diff --git a/server/sched_test.go b/server/sched_test.go index 3fbd188a..c16b407d 100644 --- a/server/sched_test.go +++ b/server/sched_test.go @@ -7,6 +7,7 @@ import ( "fmt" "log/slog" "os" + "runtime" "testing" "time" @@ -94,7 +95,7 @@ func TestLoad(t *testing.T) { require.Len(t, s.expiredCh, 1) } -type bundle struct { +type reqBundle struct { ctx context.Context //nolint:containedctx ctxDone func() srv *mockLlm @@ -102,13 +103,13 @@ type bundle struct { ggml *llm.GGML } -func (scenario *bundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) { +func (scenario *reqBundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) { return scenario.srv, nil } -func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedVRAM uint64) *bundle { - scenario := &bundle{} - scenario.ctx, scenario.ctxDone = context.WithCancel(ctx) +func newScenarioRequest(t *testing.T, ctx context.Context, modelName string, estimatedVRAM uint64, duration *api.Duration) *reqBundle { + b := &reqBundle{} + b.ctx, b.ctxDone = context.WithCancel(ctx) t.Helper() f, err := os.CreateTemp(t.TempDir(), modelName) @@ -135,124 +136,154 @@ func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedV fname := f.Name() model := &Model{Name: modelName, ModelPath: fname} - scenario.ggml, err = llm.LoadModel(model.ModelPath, 0) + b.ggml, err = llm.LoadModel(model.ModelPath, 0) require.NoError(t, err) - scenario.req = &LlmRequest{ - ctx: scenario.ctx, + if duration == nil { + duration = &api.Duration{Duration: 5 * time.Millisecond} + } + b.req = &LlmRequest{ + ctx: b.ctx, model: model, opts: api.DefaultOptions(), - sessionDuration: &api.Duration{Duration: 5 * time.Millisecond}, + sessionDuration: duration, successCh: make(chan *runnerRef, 1), errCh: make(chan error, 1), } - scenario.srv = &mockLlm{estimatedVRAM: estimatedVRAM, estimatedVRAMByGPU: map[string]uint64{"": estimatedVRAM}} - return scenario + b.srv = &mockLlm{estimatedVRAM: estimatedVRAM, estimatedVRAMByGPU: map[string]uint64{"": estimatedVRAM}} + return b } -func TestRequests(t *testing.T) { - ctx, done := context.WithTimeout(context.Background(), 10*time.Second) +func getGpuFn() gpu.GpuInfoList { + g := gpu.GpuInfo{Library: "metal"} + g.TotalMemory = 24 * format.GigaByte + g.FreeMemory = 12 * format.GigaByte + return []gpu.GpuInfo{g} +} + +func getCpuFn() gpu.GpuInfoList { + g := gpu.GpuInfo{Library: "cpu"} + g.TotalMemory = 32 * format.GigaByte + g.FreeMemory = 26 * format.GigaByte + return []gpu.GpuInfo{g} +} + +func TestRequestsSameModelSameRequest(t *testing.T) { + ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond) defer done() - - // Same model, same request - scenario1a := newScenario(t, ctx, "ollama-model-1", 10) - scenario1a.req.sessionDuration = &api.Duration{Duration: 5 * time.Millisecond} - scenario1b := newScenario(t, ctx, "ollama-model-1", 11) - scenario1b.req.model = scenario1a.req.model - scenario1b.ggml = scenario1a.ggml - scenario1b.req.sessionDuration = &api.Duration{Duration: 0} - - // simple reload of same model - scenario2a := newScenario(t, ctx, "ollama-model-1", 20) - tmpModel := *scenario1a.req.model - scenario2a.req.model = &tmpModel - scenario2a.ggml = scenario1a.ggml - scenario2a.req.sessionDuration = &api.Duration{Duration: 5 * time.Millisecond} - - // Multiple loaded models - scenario3a := newScenario(t, ctx, "ollama-model-3a", 1*format.GigaByte) - scenario3b := newScenario(t, ctx, "ollama-model-3b", 24*format.GigaByte) - scenario3c := newScenario(t, ctx, "ollama-model-4a", 30) - scenario3c.req.opts.NumGPU = 0 // CPU load, will be allowed - scenario3d := newScenario(t, ctx, "ollama-model-3c", 30) // Needs prior unloaded - s := InitScheduler(ctx) - s.getGpuFn = func() gpu.GpuInfoList { - g := gpu.GpuInfo{Library: "metal"} - g.TotalMemory = 24 * format.GigaByte - g.FreeMemory = 12 * format.GigaByte - return []gpu.GpuInfo{g} - } - s.getCpuFn = func() gpu.GpuInfoList { - g := gpu.GpuInfo{Library: "cpu"} - g.TotalMemory = 32 * format.GigaByte - g.FreeMemory = 26 * format.GigaByte - return []gpu.GpuInfo{g} - } - s.newServerFn = scenario1a.newServer - slog.Info("scenario1a") - s.pendingReqCh <- scenario1a.req + s.getGpuFn = getGpuFn + s.getCpuFn = getCpuFn + a := newScenarioRequest(t, ctx, "ollama-model-1", 10, &api.Duration{Duration: 5 * time.Millisecond}) + b := newScenarioRequest(t, ctx, "ollama-model-1", 11, &api.Duration{Duration: 0}) + b.req.model = a.req.model + b.ggml = a.ggml + + s.newServerFn = a.newServer + slog.Info("a") + s.pendingReqCh <- a.req require.Len(t, s.pendingReqCh, 1) s.Run(ctx) select { - case resp := <-scenario1a.req.successCh: - require.Equal(t, resp.llama, scenario1a.srv) + case resp := <-a.req.successCh: + require.Equal(t, resp.llama, a.srv) require.Empty(t, s.pendingReqCh) - require.Empty(t, scenario1a.req.errCh) - case err := <-scenario1a.req.errCh: + require.Empty(t, a.req.errCh) + case err := <-a.req.errCh: t.Fatal(err.Error()) case <-ctx.Done(): t.Fatal("timeout") } // Same runner as first request due to not needing a reload - s.newServerFn = scenario1b.newServer - slog.Info("scenario1b") - s.pendingReqCh <- scenario1b.req + s.newServerFn = b.newServer + slog.Info("b") + s.pendingReqCh <- b.req select { - case resp := <-scenario1b.req.successCh: - require.Equal(t, resp.llama, scenario1a.srv) + case resp := <-b.req.successCh: + require.Equal(t, resp.llama, a.srv) require.Empty(t, s.pendingReqCh) - require.Empty(t, scenario1b.req.errCh) - case err := <-scenario1b.req.errCh: + require.Empty(t, b.req.errCh) + case err := <-b.req.errCh: + t.Fatal(err.Error()) + case <-ctx.Done(): + t.Fatal("timeout") + } +} + +func TestRequestsSimpleReloadSameModel(t *testing.T) { + ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer done() + s := InitScheduler(ctx) + s.getGpuFn = getGpuFn + s.getCpuFn = getCpuFn + a := newScenarioRequest(t, ctx, "ollama-model-1", 10, &api.Duration{Duration: 5 * time.Millisecond}) + b := newScenarioRequest(t, ctx, "ollama-model-1", 20, &api.Duration{Duration: 5 * time.Millisecond}) + tmpModel := *a.req.model + b.req.model = &tmpModel + b.ggml = a.ggml + + s.newServerFn = a.newServer + slog.Info("a") + s.pendingReqCh <- a.req + require.Len(t, s.pendingReqCh, 1) + s.Run(ctx) + select { + case resp := <-a.req.successCh: + require.Equal(t, resp.llama, a.srv) + require.Empty(t, s.pendingReqCh) + require.Empty(t, a.req.errCh) + case err := <-a.req.errCh: t.Fatal(err.Error()) case <-ctx.Done(): t.Fatal("timeout") } // Trigger a reload - s.newServerFn = scenario2a.newServer - scenario2a.req.model.AdapterPaths = []string{"new"} - slog.Info("scenario2a") - s.pendingReqCh <- scenario2a.req + s.newServerFn = b.newServer + b.req.model.AdapterPaths = []string{"new"} + slog.Info("b") + s.pendingReqCh <- b.req // finish first two requests, so model can reload time.Sleep(1 * time.Millisecond) - scenario1a.ctxDone() - scenario1b.ctxDone() + a.ctxDone() select { - case resp := <-scenario2a.req.successCh: - require.Equal(t, resp.llama, scenario2a.srv) + case resp := <-b.req.successCh: + require.Equal(t, resp.llama, b.srv) require.Empty(t, s.pendingReqCh) - require.Empty(t, scenario2a.req.errCh) - case err := <-scenario2a.req.errCh: + require.Empty(t, b.req.errCh) + case err := <-b.req.errCh: t.Fatal(err.Error()) case <-ctx.Done(): t.Fatal("timeout") } +} + +func TestRequestsMultipleLoadedModels(t *testing.T) { + ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer done() + s := InitScheduler(ctx) + s.getGpuFn = getGpuFn + s.getCpuFn = getCpuFn + + // Multiple loaded models + a := newScenarioRequest(t, ctx, "ollama-model-3a", 1*format.GigaByte, nil) + b := newScenarioRequest(t, ctx, "ollama-model-3b", 24*format.GigaByte, nil) + c := newScenarioRequest(t, ctx, "ollama-model-4a", 30, nil) + c.req.opts.NumGPU = 0 // CPU load, will be allowed + d := newScenarioRequest(t, ctx, "ollama-model-3c", 30, nil) // Needs prior unloaded envconfig.MaxRunners = 1 - s.newServerFn = scenario3a.newServer - slog.Info("scenario3a") - s.pendingReqCh <- scenario3a.req - // finish prior request, so new model can load - time.Sleep(1 * time.Millisecond) - scenario2a.ctxDone() + s.newServerFn = a.newServer + slog.Info("a") + s.pendingReqCh <- a.req + s.Run(ctx) select { - case resp := <-scenario3a.req.successCh: - require.Equal(t, resp.llama, scenario3a.srv) + case resp := <-a.req.successCh: + require.Equal(t, resp.llama, a.srv) require.Empty(t, s.pendingReqCh) - require.Empty(t, scenario3a.req.errCh) - case err := <-scenario3a.req.errCh: + require.Empty(t, a.req.errCh) + case err := <-a.req.errCh: t.Fatal(err.Error()) case <-ctx.Done(): t.Fatal("timeout") @@ -262,15 +293,15 @@ func TestRequests(t *testing.T) { s.loadedMu.Unlock() envconfig.MaxRunners = 0 - s.newServerFn = scenario3b.newServer - slog.Info("scenario3b") - s.pendingReqCh <- scenario3b.req + s.newServerFn = b.newServer + slog.Info("b") + s.pendingReqCh <- b.req select { - case resp := <-scenario3b.req.successCh: - require.Equal(t, resp.llama, scenario3b.srv) + case resp := <-b.req.successCh: + require.Equal(t, resp.llama, b.srv) require.Empty(t, s.pendingReqCh) - require.Empty(t, scenario3b.req.errCh) - case err := <-scenario3b.req.errCh: + require.Empty(t, b.req.errCh) + case err := <-b.req.errCh: t.Fatal(err.Error()) case <-ctx.Done(): t.Fatal("timeout") @@ -280,15 +311,15 @@ func TestRequests(t *testing.T) { s.loadedMu.Unlock() // This is a CPU load with NumGPU = 0 so it should load - s.newServerFn = scenario3c.newServer - slog.Info("scenario3c") - s.pendingReqCh <- scenario3c.req + s.newServerFn = c.newServer + slog.Info("c") + s.pendingReqCh <- c.req select { - case resp := <-scenario3c.req.successCh: - require.Equal(t, resp.llama, scenario3c.srv) + case resp := <-c.req.successCh: + require.Equal(t, resp.llama, c.srv) require.Empty(t, s.pendingReqCh) - require.Empty(t, scenario3c.req.errCh) - case err := <-scenario3c.req.errCh: + require.Empty(t, c.req.errCh) + case err := <-c.req.errCh: t.Fatal(err.Error()) case <-ctx.Done(): t.Fatal("timeout") @@ -298,25 +329,25 @@ func TestRequests(t *testing.T) { s.loadedMu.Unlock() // Try to load a model that wont fit - s.newServerFn = scenario3d.newServer - slog.Info("scenario3d") + s.newServerFn = d.newServer + slog.Info("d") s.loadedMu.Lock() require.Len(t, s.loaded, 3) s.loadedMu.Unlock() - scenario3a.ctxDone() // Won't help since this one isn't big enough to make room + a.ctxDone() // Won't help since this one isn't big enough to make room time.Sleep(2 * time.Millisecond) - s.pendingReqCh <- scenario3d.req + s.pendingReqCh <- d.req // finish prior request, so new model can load time.Sleep(6 * time.Millisecond) s.loadedMu.Lock() require.Len(t, s.loaded, 2) s.loadedMu.Unlock() - scenario3b.ctxDone() + b.ctxDone() select { - case resp := <-scenario3d.req.successCh: - require.Equal(t, resp.llama, scenario3d.srv) + case resp := <-d.req.successCh: + require.Equal(t, resp.llama, d.srv) require.Empty(t, s.pendingReqCh) - require.Empty(t, scenario3d.req.errCh) + require.Empty(t, d.req.errCh) case <-ctx.Done(): t.Fatal("timeout") } @@ -325,30 +356,59 @@ func TestRequests(t *testing.T) { s.loadedMu.Unlock() } +func TestRequestsModelTooBigForSystem(t *testing.T) { + ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer done() + s := InitScheduler(ctx) + s.getGpuFn = func() gpu.GpuInfoList { + g := gpu.GpuInfo{Library: "metal"} + g.TotalMemory = 4 * format.MebiByte + g.FreeMemory = 3 * format.MebiByte + return []gpu.GpuInfo{g} + } + + s.getCpuFn = func() gpu.GpuInfoList { + g := gpu.GpuInfo{Library: "cpu"} + g.TotalMemory = 4 * format.MebiByte + g.FreeMemory = 2 * format.MebiByte + return []gpu.GpuInfo{g} + } + a := newScenarioRequest(t, ctx, "ollama-model-1", 10, &api.Duration{Duration: 5 * time.Millisecond}) + + s.newServerFn = a.newServer + slog.Info("a") + s.pendingReqCh <- a.req + require.Len(t, s.pendingReqCh, 1) + s.Run(ctx) + select { + case <-a.req.successCh: + if runtime.GOOS == "linux" { + t.Fatal("request should have been rejected with out of space") + } + // else - Darwin and Windows don't reject right now + case err := <-a.req.errCh: + require.Contains(t, err.Error(), "too large") + case <-ctx.Done(): + t.Fatal("timeout") + } +} func TestGetRunner(t *testing.T) { ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond) defer done() - scenario1a := newScenario(t, ctx, "ollama-model-1a", 10) - scenario1a.req.sessionDuration = &api.Duration{Duration: 0} - scenario1b := newScenario(t, ctx, "ollama-model-1b", 10) - scenario1b.req.sessionDuration = &api.Duration{Duration: 0} - scenario1c := newScenario(t, ctx, "ollama-model-1c", 10) - scenario1c.req.sessionDuration = &api.Duration{Duration: 0} + a := newScenarioRequest(t, ctx, "ollama-model-1a", 10, &api.Duration{Duration: 2 * time.Millisecond}) + b := newScenarioRequest(t, ctx, "ollama-model-1b", 10, &api.Duration{Duration: 2 * time.Millisecond}) + c := newScenarioRequest(t, ctx, "ollama-model-1c", 10, &api.Duration{Duration: 2 * time.Millisecond}) envconfig.MaxQueuedRequests = 1 s := InitScheduler(ctx) - s.getGpuFn = func() gpu.GpuInfoList { - g := gpu.GpuInfo{Library: "metal"} - g.TotalMemory = 24 * format.GigaByte - g.FreeMemory = 12 * format.GigaByte - return []gpu.GpuInfo{g} - } - s.newServerFn = scenario1a.newServer - slog.Info("scenario1a") - successCh1a, errCh1a := s.GetRunner(scenario1a.ctx, scenario1a.req.model, scenario1a.req.opts, scenario1a.req.sessionDuration) + s.getGpuFn = getGpuFn + s.getCpuFn = getCpuFn + s.newServerFn = a.newServer + slog.Info("a") + successCh1a, errCh1a := s.GetRunner(a.ctx, a.req.model, a.req.opts, a.req.sessionDuration) require.Len(t, s.pendingReqCh, 1) - slog.Info("scenario1b") - successCh1b, errCh1b := s.GetRunner(scenario1b.ctx, scenario1b.req.model, scenario1b.req.opts, scenario1b.req.sessionDuration) + slog.Info("b") + successCh1b, errCh1b := s.GetRunner(b.ctx, b.req.model, b.req.opts, b.req.sessionDuration) require.Len(t, s.pendingReqCh, 1) require.Empty(t, successCh1b) require.Len(t, errCh1b, 1) @@ -357,22 +417,24 @@ func TestGetRunner(t *testing.T) { s.Run(ctx) select { case resp := <-successCh1a: - require.Equal(t, resp.llama, scenario1a.srv) + require.Equal(t, resp.llama, a.srv) require.Empty(t, s.pendingReqCh) require.Empty(t, errCh1a) + case err := <-errCh1a: + t.Fatal(err.Error()) case <-ctx.Done(): t.Fatal("timeout") } - scenario1a.ctxDone() + a.ctxDone() // Set "a" model to idle so it can unload s.loadedMu.Lock() require.Len(t, s.loaded, 1) s.loadedMu.Unlock() - scenario1c.req.model.ModelPath = "bad path" - slog.Info("scenario1c") - successCh1c, errCh1c := s.GetRunner(scenario1c.ctx, scenario1c.req.model, scenario1c.req.opts, scenario1c.req.sessionDuration) + c.req.model.ModelPath = "bad path" + slog.Info("c") + successCh1c, errCh1c := s.GetRunner(c.ctx, c.req.model, c.req.opts, c.req.sessionDuration) // Starts in pending channel, then should be quickly processsed to return an error - time.Sleep(5 * time.Millisecond) + time.Sleep(20 * time.Millisecond) // Long enough for the "a" model to expire and unload require.Empty(t, successCh1c) s.loadedMu.Lock() require.Empty(t, s.loaded) @@ -380,7 +442,7 @@ func TestGetRunner(t *testing.T) { require.Len(t, errCh1c, 1) err = <-errCh1c require.Contains(t, err.Error(), "bad path") - scenario1b.ctxDone() + b.ctxDone() } // TODO - add one scenario that triggers the bogus finished event with positive ref count @@ -389,7 +451,7 @@ func TestPrematureExpired(t *testing.T) { defer done() // Same model, same request - scenario1a := newScenario(t, ctx, "ollama-model-1a", 10) + scenario1a := newScenarioRequest(t, ctx, "ollama-model-1a", 10, nil) s := InitScheduler(ctx) s.getGpuFn = func() gpu.GpuInfoList { g := gpu.GpuInfo{Library: "metal"} @@ -411,6 +473,8 @@ func TestPrematureExpired(t *testing.T) { s.loadedMu.Unlock() slog.Info("sending premature expired event now") s.expiredCh <- resp // Shouldn't happen in real life, but make sure its safe + case err := <-errCh1a: + t.Fatal(err.Error()) case <-ctx.Done(): t.Fatal("timeout") } @@ -446,6 +510,8 @@ func TestUseLoadedRunner(t *testing.T) { select { case success := <-req.successCh: require.Equal(t, r1, success) + case err := <-req.errCh: + t.Fatal(err.Error()) case <-ctx.Done(): t.Fatal("timeout") } @@ -625,8 +691,7 @@ func TestAlreadyCanceled(t *testing.T) { defer done() dctx, done2 := context.WithCancel(ctx) done2() - scenario1a := newScenario(t, dctx, "ollama-model-1", 10) - scenario1a.req.sessionDuration = &api.Duration{Duration: 0} + scenario1a := newScenarioRequest(t, dctx, "ollama-model-1", 10, &api.Duration{Duration: 0}) s := InitScheduler(ctx) slog.Info("scenario1a") s.pendingReqCh <- scenario1a.req