From d6e3b64582d93dfa0c836965b70d66c8487c2f8b Mon Sep 17 00:00:00 2001
From: Daniel Hiltgen
Date: Sun, 28 Apr 2024 13:40:31 -0700
Subject: [PATCH] Fix concurrency for CPU mode

Prior refactoring passes accidentally removed the logic to bypass VRAM
checks for CPU loads. This adds that back, along with test coverage.

This also fixes loaded map access in the unit tests to be behind the
mutex; unguarded access was likely the cause of various flakes in the
tests.
---
 server/sched.go      |  8 ++++
 server/sched_test.go | 93 +++++++++++++++++++++++++++++++++++++-------
 2 files changed, 88 insertions(+), 13 deletions(-)
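
Note: the heart of the change is the CPU-load guard added to
processPending in the first hunk below. A minimal standalone sketch of
that predicate, with gpuInfo and options as simplified stand-ins for the
real gpu.GpuInfoList and api.Options types (not the actual scheduler
API):

    package sched

    // Simplified stand-ins for gpu.GpuInfoList and api.Options.
    type gpuInfo struct{ Library string }
    type options struct{ NumGPU int }

    // isCPULoad mirrors the new guard in processPending: a request counts
    // as a CPU load when the only "GPU" discovered is the CPU library, or
    // when the request explicitly pins NumGPU to 0. Such requests skip the
    // VRAM fit checks and are only limited by loadedMax.
    func isCPULoad(gpus []gpuInfo, opts options) bool {
        return (len(gpus) == 1 && gpus[0].Library == "cpu") || opts.NumGPU == 0
    }

When the guard fires, the scheduler calls its load function immediately
instead of measuring free VRAM or evicting an already-loaded model.
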
diff --git a/server/sched.go b/server/sched.go
index 3c7ab66e..0a6738a2 100644
--- a/server/sched.go
+++ b/server/sched.go
@@ -149,6 +149,14 @@ func (s *Scheduler) processPending(ctx context.Context) {
 					break
 				}
 
+				// If we're in CPU-only mode, just limit by loadedMax above
+				// TODO handle system memory exhaustion
+				if (len(gpus) == 1 && gpus[0].Library == "cpu") || pending.opts.NumGPU == 0 {
+					slog.Debug("cpu mode with existing models, loading")
+					s.loadFn(pending, ggml, gpus)
+					break
+				}
+
 				// No models loaded. Load the model but prefer the best fit.
 				if loadedCount == 0 {
 					slog.Debug("loading first model", "model", pending.model.ModelPath)
diff --git a/server/sched_test.go b/server/sched_test.go
index 86bd7846..32a80674 100644
--- a/server/sched_test.go
+++ b/server/sched_test.go
@@ -28,19 +28,33 @@ func TestInitScheduler(t *testing.T) {
 	ctx, done := context.WithCancel(context.Background())
 	defer done()
 	initialMax := loadedMax
+	initialParallel := numParallel
 	s := InitScheduler(ctx)
 	require.Equal(t, initialMax, loadedMax)
+	s.loadedMu.Lock()
 	require.NotNil(t, s.loaded)
+	s.loadedMu.Unlock()
 
 	os.Setenv("OLLAMA_MAX_LOADED_MODELS", "blue")
 	s = InitScheduler(ctx)
 	require.Equal(t, initialMax, loadedMax)
+	s.loadedMu.Lock()
 	require.NotNil(t, s.loaded)
+	s.loadedMu.Unlock()
 
 	os.Setenv("OLLAMA_MAX_LOADED_MODELS", "0")
 	s = InitScheduler(ctx)
 	require.Equal(t, 0, loadedMax)
+	s.loadedMu.Lock()
 	require.NotNil(t, s.loaded)
+	s.loadedMu.Unlock()
+
+	os.Setenv("OLLAMA_NUM_PARALLEL", "blue")
+	_ = InitScheduler(ctx)
+	require.Equal(t, initialParallel, numParallel)
+	os.Setenv("OLLAMA_NUM_PARALLEL", "10")
+	_ = InitScheduler(ctx)
+	require.Equal(t, 10, numParallel)
 }
 
 func TestLoad(t *testing.T) {
@@ -51,6 +65,7 @@
 	req := &LlmRequest{
 		ctx:             ctx,
 		model:           &Model{ModelPath: "foo"},
+		opts:            api.DefaultOptions(),
 		successCh:       make(chan *runnerRef, 1),
 		errCh:           make(chan error, 1),
 		sessionDuration: 2,
 	}
@@ -63,7 +78,9 @@
 	s.load(req, ggml, gpus)
 	require.Len(t, req.successCh, 0)
 	require.Len(t, req.errCh, 1)
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 0)
+	s.loadedMu.Unlock()
 	err := <-req.errCh
 	require.Contains(t, err.Error(), "this model may be incompatible")
@@ -78,7 +95,9 @@
 	case resp := <-req.successCh:
 		require.Equal(t, uint64(10), resp.estimatedVRAM)
 		require.Equal(t, uint(1), resp.refCount)
+		s.loadedMu.Lock()
 		require.Len(t, s.loaded, 1)
+		s.loadedMu.Unlock()
 	}
 
 	req.model.ModelPath = "dummy_model_path"
@@ -90,7 +109,9 @@
 	case resp := <-req.successCh:
 		t.Errorf("unexpected success %v", resp)
 	}
+	s.loadedMu.Lock()
 	runner := s.loaded["dummy_model_path"]
+	s.loadedMu.Unlock()
 	require.NotNil(t, runner)
 	require.Equal(t, uint(0), runner.refCount)
 	time.Sleep(1 * time.Millisecond)
@@ -143,6 +164,7 @@
 	scenario.req = &LlmRequest{
 		ctx:             scenario.ctx,
 		model:           model,
+		opts:            api.DefaultOptions(),
 		sessionDuration: 5 * time.Millisecond,
 		successCh:       make(chan *runnerRef, 1),
 		errCh:           make(chan error, 1),
@@ -171,7 +193,9 @@ func TestRequests(t *testing.T) {
 	// Multiple loaded models
 	scenario3a := newScenario(t, ctx, "ollama-model-3a", 1*format.GigaByte)
 	scenario3b := newScenario(t, ctx, "ollama-model-3b", 24*format.GigaByte)
-	scenario3c := newScenario(t, ctx, "ollama-model-3c", 30) // Needs prior unloaded
+	scenario3c := newScenario(t, ctx, "ollama-model-4a", 30)
+	scenario3c.req.opts.NumGPU = 0 // CPU load, will be allowed
+	scenario3d := newScenario(t, ctx, "ollama-model-3c", 30) // Needs prior unloaded
 
 	s := InitScheduler(ctx)
 	s.getGpuFn = func() gpu.GpuInfoList {
@@ -240,7 +264,9 @@
 	case <-ctx.Done():
 		t.Errorf("timeout")
 	}
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 1)
+	s.loadedMu.Unlock()
 
 	loadedMax = 0
 	s.newServerFn = scenario3b.newServer
@@ -254,19 +280,14 @@
 	case <-ctx.Done():
 		t.Errorf("timeout")
 	}
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 2)
+	s.loadedMu.Unlock()
 
-	// Try to load a model that wont fit
+	// This is a CPU load with NumGPU = 0, so it should load
 	s.newServerFn = scenario3c.newServer
 	slog.Info("scenario3c")
-	require.Len(t, s.loaded, 2)
-	scenario3a.ctxDone() // Won't help since this one isn't big enough to make room
-	time.Sleep(2 * time.Millisecond)
 	s.pendingReqCh <- scenario3c.req
-	// finish prior request, so new model can load
-	time.Sleep(6 * time.Millisecond)
-	require.Len(t, s.loaded, 1)
-	scenario3b.ctxDone()
 	select {
 	case resp := <-scenario3c.req.successCh:
 		require.Equal(t, resp.llama, scenario3c.srv)
@@ -275,7 +296,36 @@
 		require.Len(t, s.pendingReqCh, 0)
 		require.Len(t, scenario3c.req.errCh, 0)
 	case <-ctx.Done():
 		t.Errorf("timeout")
 	}
-	require.Len(t, s.loaded, 1)
+	s.loadedMu.Lock()
+	require.Len(t, s.loaded, 3)
+	s.loadedMu.Unlock()
+
+	// Try to load a model that won't fit
+	s.newServerFn = scenario3d.newServer
+	slog.Info("scenario3d")
+	s.loadedMu.Lock()
+	require.Len(t, s.loaded, 3)
+	s.loadedMu.Unlock()
+	scenario3a.ctxDone() // Won't help since this one isn't big enough to make room
+	time.Sleep(2 * time.Millisecond)
+	s.pendingReqCh <- scenario3d.req
+	// finish prior request, so new model can load
+	time.Sleep(6 * time.Millisecond)
+	s.loadedMu.Lock()
+	require.Len(t, s.loaded, 2)
+	s.loadedMu.Unlock()
+	scenario3b.ctxDone()
+	select {
+	case resp := <-scenario3d.req.successCh:
+		require.Equal(t, resp.llama, scenario3d.srv)
+		require.Len(t, s.pendingReqCh, 0)
+		require.Len(t, scenario3d.req.errCh, 0)
+	case <-ctx.Done():
+		t.Errorf("timeout")
+	}
+	s.loadedMu.Lock()
+	require.Len(t, s.loaded, 2)
+	s.loadedMu.Unlock()
 }
@@ -318,7 +368,9 @@ func TestGetRunner(t *testing.T) {
 		t.Errorf("timeout")
 	}
 	scenario1a.ctxDone()
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 1)
+	s.loadedMu.Unlock()
 
 	scenario1c.req.model.ModelPath = "bad path"
 	slog.Info("scenario1c")
@@ -328,7 +380,9 @@
 	require.Len(t, errCh1c, 0)
 	time.Sleep(5 * time.Millisecond)
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 0)
+	s.loadedMu.Unlock()
 	require.Len(t, errCh1c, 1)
 	err = <-errCh1c
 	require.Contains(t, err.Error(), "bad path")
@@ -358,7 +412,9 @@ func TestPrematureExpired(t *testing.T) {
 		require.Equal(t, resp.llama, scenario1a.srv)
 		require.Len(t, s.pendingReqCh, 0)
 		require.Len(t, errCh1a, 0)
+		s.loadedMu.Lock()
 		require.Len(t, s.loaded, 1)
+		s.loadedMu.Unlock()
 		slog.Info("sending premature expired event now")
 		s.expiredCh <- resp // Shouldn't happen in real life, but make sure its safe
 	case <-ctx.Done():
@@ -383,6 +439,7 @@ func TestUseLoadedRunner(t *testing.T) {
 	ctx, done := context.WithTimeout(context.Background(), 5*time.Millisecond)
 	req := &LlmRequest{
 		ctx:             ctx,
+		opts:            api.DefaultOptions(),
 		successCh:       make(chan *runnerRef, 1),
 		sessionDuration: 2,
 	}
@@ -426,8 +483,10 @@ func TestUpdateFreeSpace(t *testing.T) {
 	r2 := &runnerRef{llama: llm2, gpus: gpus}
 	s := InitScheduler(ctx)
+	s.loadedMu.Lock()
 	s.loaded["a"] = r1
 	s.loaded["b"] = r2
+	s.loadedMu.Unlock()
 	s.updateFreeSpace(gpus)
 
 	require.Equal(t, uint64(850), gpus[0].FreeMemory)
@@ -437,13 +496,18 @@ func TestFindRunnerToUnload(t *testing.T) {
 	ctx, done := context.WithTimeout(context.Background(), 5*time.Millisecond)
 	defer done()
-	req := &LlmRequest{ctx: ctx}
+	req := &LlmRequest{
+		ctx:  ctx,
+		opts: api.DefaultOptions(),
+	}
 	r1 := &runnerRef{refCount: 1, sessionDuration: 1}
 	r2 := &runnerRef{sessionDuration: 2}
 	s := InitScheduler(ctx)
+	s.loadedMu.Lock()
 	s.loaded["a"] = r1
 	s.loaded["b"] = r2
+	s.loadedMu.Unlock()
 
 	resp := s.findRunnerToUnload(req)
 	require.Equal(t, r2, resp)
@@ -458,10 +522,11 @@ func TestNeedsReload(t *testing.T) {
 	defer done()
 
 	llm := &mockLlm{}
+	do := api.DefaultOptions()
 	runner := &runnerRef{
 		adapters:   []string{"adapter1"},
 		projectors: []string{"projector1"},
-		Options:    &api.Options{},
+		Options:    &do,
 		llama:      llm,
 	}
 	req := &LlmRequest{
@@ -469,7 +534,7 @@
 		model: &Model{
 			AdapterPaths:   []string{"adapter2"},
 			ProjectorPaths: []string{"projector2"},
 		},
-		opts: api.Options{},
+		opts: api.DefaultOptions(),
 	}
 	resp := runner.needsReload(ctx, req)
 	require.True(t, resp)
@@ -508,8 +573,10 @@ func TestUnloadAllRunners(t *testing.T) {
 	r1 := &runnerRef{llama: llm1}
 	r2 := &runnerRef{llama: llm2}
+	s.loadedMu.Lock()
 	s.loaded["a"] = r1
 	s.loaded["b"] = r2
+	s.loadedMu.Unlock()
 
 	s.unloadAllRunners()
 	require.True(t, llm1.closeCalled)
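
Note: the test-side changes all apply one pattern: every read of s.loaded
is now wrapped in s.loadedMu.Lock()/Unlock(), because the scheduler
goroutine mutates that map concurrently with the test's assertions. A
minimal sketch of the pattern, assuming a hypothetical Scheduler stand-in
with just the relevant fields:

    package sched

    import "sync"

    // Scheduler here is a hypothetical stand-in; the real type carries far
    // more state than the mutex and the map it guards.
    type Scheduler struct {
        loadedMu sync.Mutex
        loaded   map[string]any // keyed by model path, like s.loaded
    }

    // loadedCount performs the guarded read the tests now do inline: take
    // the lock, read the map, release the lock, then assert on the result.
    func (s *Scheduler) loadedCount() int {
        s.loadedMu.Lock()
        defer s.loadedMu.Unlock()
        return len(s.loaded)
    }

An unguarded len(s.loaded) while another goroutine writes the map is a
data race, which fits the intermittent flakes described in the commit
message; go test -race will typically flag exactly this access pattern.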