diff --git a/server/sched.go b/server/sched.go
index 3c7ab66e..0a6738a2 100644
--- a/server/sched.go
+++ b/server/sched.go
@@ -149,6 +149,14 @@ func (s *Scheduler) processPending(ctx context.Context) {
 						break
 					}
 
+					// If we're CPU only mode, just limit by loadedMax above
+					// TODO handle system memory exhaustion
+					if (len(gpus) == 1 && gpus[0].Library == "cpu") || pending.opts.NumGPU == 0 {
+						slog.Debug("cpu mode with existing models, loading")
+						s.loadFn(pending, ggml, gpus)
+						break
+					}
+
 					// No models loaded. Load the model but prefer the best fit.
 					if loadedCount == 0 {
 						slog.Debug("loading first model", "model", pending.model.ModelPath)
diff --git a/server/sched_test.go b/server/sched_test.go
index 86bd7846..32a80674 100644
--- a/server/sched_test.go
+++ b/server/sched_test.go
@@ -28,19 +28,33 @@ func TestInitScheduler(t *testing.T) {
 	ctx, done := context.WithCancel(context.Background())
 	defer done()
 	initialMax := loadedMax
+	initialParallel := numParallel
 	s := InitScheduler(ctx)
 	require.Equal(t, initialMax, loadedMax)
+	s.loadedMu.Lock()
 	require.NotNil(t, s.loaded)
+	s.loadedMu.Unlock()
 
 	os.Setenv("OLLAMA_MAX_LOADED_MODELS", "blue")
 	s = InitScheduler(ctx)
 	require.Equal(t, initialMax, loadedMax)
+	s.loadedMu.Lock()
 	require.NotNil(t, s.loaded)
+	s.loadedMu.Unlock()
 
 	os.Setenv("OLLAMA_MAX_LOADED_MODELS", "0")
 	s = InitScheduler(ctx)
 	require.Equal(t, 0, loadedMax)
+	s.loadedMu.Lock()
 	require.NotNil(t, s.loaded)
+	s.loadedMu.Unlock()
+
+	os.Setenv("OLLAMA_NUM_PARALLEL", "blue")
+	_ = InitScheduler(ctx)
+	require.Equal(t, initialParallel, numParallel)
+	os.Setenv("OLLAMA_NUM_PARALLEL", "10")
+	_ = InitScheduler(ctx)
+	require.Equal(t, 10, numParallel)
 }
 
 func TestLoad(t *testing.T) {
@@ -51,6 +65,7 @@ func TestLoad(t *testing.T) {
 	req := &LlmRequest{
 		ctx:             ctx,
 		model:           &Model{ModelPath: "foo"},
+		opts:            api.DefaultOptions(),
 		successCh:       make(chan *runnerRef, 1),
 		errCh:           make(chan error, 1),
 		sessionDuration: 2,
@@ -63,7 +78,9 @@ func TestLoad(t *testing.T) {
 	s.load(req, ggml, gpus)
 	require.Len(t, req.successCh, 0)
 	require.Len(t, req.errCh, 1)
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 0)
+	s.loadedMu.Unlock()
 	err := <-req.errCh
 	require.Contains(t, err.Error(), "this model may be incompatible")
 
@@ -78,7 +95,9 @@ func TestLoad(t *testing.T) {
 	case resp := <-req.successCh:
 		require.Equal(t, uint64(10), resp.estimatedVRAM)
 		require.Equal(t, uint(1), resp.refCount)
+		s.loadedMu.Lock()
 		require.Len(t, s.loaded, 1)
+		s.loadedMu.Unlock()
 	}
 
 	req.model.ModelPath = "dummy_model_path"
@@ -90,7 +109,9 @@ func TestLoad(t *testing.T) {
 	case resp := <-req.successCh:
 		t.Errorf("unexpected success %v", resp)
 	}
+	s.loadedMu.Lock()
 	runner := s.loaded["dummy_model_path"]
+	s.loadedMu.Unlock()
 	require.NotNil(t, runner)
 	require.Equal(t, uint(0), runner.refCount)
 	time.Sleep(1 * time.Millisecond)
@@ -143,6 +164,7 @@ func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedV
 	scenario.req = &LlmRequest{
 		ctx:             scenario.ctx,
 		model:           model,
+		opts:            api.DefaultOptions(),
 		sessionDuration: 5 * time.Millisecond,
 		successCh:       make(chan *runnerRef, 1),
 		errCh:           make(chan error, 1),
@@ -171,7 +193,9 @@ func TestRequests(t *testing.T) {
 	// Multiple loaded models
 	scenario3a := newScenario(t, ctx, "ollama-model-3a", 1*format.GigaByte)
 	scenario3b := newScenario(t, ctx, "ollama-model-3b", 24*format.GigaByte)
-	scenario3c := newScenario(t, ctx, "ollama-model-3c", 30) // Needs prior unloaded
+	scenario3c := newScenario(t, ctx, "ollama-model-4a", 30)
+	scenario3c.req.opts.NumGPU = 0 // CPU load, will be allowed
+	scenario3d := newScenario(t, ctx, "ollama-model-3c", 30) // Needs prior unloaded
 
 	s := InitScheduler(ctx)
 	s.getGpuFn = func() gpu.GpuInfoList {
@@ -240,7 +264,9 @@ func TestRequests(t *testing.T) {
 	case <-ctx.Done():
 		t.Errorf("timeout")
 	}
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 1)
+	s.loadedMu.Unlock()
 
 	loadedMax = 0
 	s.newServerFn = scenario3b.newServer
@@ -254,19 +280,14 @@ func TestRequests(t *testing.T) {
 	case <-ctx.Done():
 		t.Errorf("timeout")
 	}
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 2)
+	s.loadedMu.Unlock()
 
-	// Try to load a model that wont fit
+	// This is a CPU load with NumGPU = 0 so it should load
 	s.newServerFn = scenario3c.newServer
 	slog.Info("scenario3c")
-	require.Len(t, s.loaded, 2)
-	scenario3a.ctxDone() // Won't help since this one isn't big enough to make room
-	time.Sleep(2 * time.Millisecond)
 	s.pendingReqCh <- scenario3c.req
-	// finish prior request, so new model can load
-	time.Sleep(6 * time.Millisecond)
-	require.Len(t, s.loaded, 1)
-	scenario3b.ctxDone()
 	select {
 	case resp := <-scenario3c.req.successCh:
 		require.Equal(t, resp.llama, scenario3c.srv)
@@ -275,7 +296,36 @@ func TestRequests(t *testing.T) {
 	case <-ctx.Done():
 		t.Errorf("timeout")
 	}
-	require.Len(t, s.loaded, 1)
+	s.loadedMu.Lock()
+	require.Len(t, s.loaded, 3)
+	s.loadedMu.Unlock()
+
+	// Try to load a model that wont fit
+	s.newServerFn = scenario3d.newServer
+	slog.Info("scenario3d")
+	s.loadedMu.Lock()
+	require.Len(t, s.loaded, 3)
+	s.loadedMu.Unlock()
+	scenario3a.ctxDone() // Won't help since this one isn't big enough to make room
+	time.Sleep(2 * time.Millisecond)
+	s.pendingReqCh <- scenario3d.req
+	// finish prior request, so new model can load
+	time.Sleep(6 * time.Millisecond)
+	s.loadedMu.Lock()
+	require.Len(t, s.loaded, 2)
+	s.loadedMu.Unlock()
+	scenario3b.ctxDone()
+	select {
+	case resp := <-scenario3d.req.successCh:
+		require.Equal(t, resp.llama, scenario3d.srv)
+		require.Len(t, s.pendingReqCh, 0)
+		require.Len(t, scenario3d.req.errCh, 0)
+	case <-ctx.Done():
+		t.Errorf("timeout")
+	}
+	s.loadedMu.Lock()
+	require.Len(t, s.loaded, 2)
+	s.loadedMu.Unlock()
 }
 
 func TestGetRunner(t *testing.T) {
@@ -318,7 +368,9 @@ func TestGetRunner(t *testing.T) {
 		t.Errorf("timeout")
 	}
 	scenario1a.ctxDone()
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 1)
+	s.loadedMu.Unlock()
 
 	scenario1c.req.model.ModelPath = "bad path"
 	slog.Info("scenario1c")
@@ -328,7 +380,9 @@ func TestGetRunner(t *testing.T) {
 	require.Len(t, errCh1c, 0)
 
 	time.Sleep(5 * time.Millisecond)
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 0)
+	s.loadedMu.Unlock()
 	require.Len(t, errCh1c, 1)
 	err = <-errCh1c
 	require.Contains(t, err.Error(), "bad path")
@@ -358,7 +412,9 @@ func TestPrematureExpired(t *testing.T) {
 		require.Equal(t, resp.llama, scenario1a.srv)
 		require.Len(t, s.pendingReqCh, 0)
 		require.Len(t, errCh1a, 0)
+		s.loadedMu.Lock()
 		require.Len(t, s.loaded, 1)
+		s.loadedMu.Unlock()
 		slog.Info("sending premature expired event now")
 		s.expiredCh <- resp // Shouldn't happen in real life, but make sure its safe
 	case <-ctx.Done():
@@ -383,6 +439,7 @@ func TestUseLoadedRunner(t *testing.T) {
 	ctx, done := context.WithTimeout(context.Background(), 5*time.Millisecond)
 	req := &LlmRequest{
 		ctx:             ctx,
+		opts:            api.DefaultOptions(),
 		successCh:       make(chan *runnerRef, 1),
 		sessionDuration: 2,
 	}
@@ -426,8 +483,10 @@ func TestUpdateFreeSpace(t *testing.T) {
 	r2 := &runnerRef{llama: llm2, gpus: gpus}
 
 	s := InitScheduler(ctx)
+	s.loadedMu.Lock()
 	s.loaded["a"] = r1
 	s.loaded["b"] = r2
+	s.loadedMu.Unlock()
 
 	s.updateFreeSpace(gpus)
 	require.Equal(t, uint64(850), gpus[0].FreeMemory)
@@ -437,13 +496,18 @@ func TestUpdateFreeSpace(t *testing.T) {
 func TestFindRunnerToUnload(t *testing.T) {
 	ctx, done := context.WithTimeout(context.Background(), 5*time.Millisecond)
 	defer done()
-	req := &LlmRequest{ctx: ctx}
+	req := &LlmRequest{
+		ctx:  ctx,
+		opts: api.DefaultOptions(),
+	}
 	r1 := &runnerRef{refCount: 1, sessionDuration: 1}
 	r2 := &runnerRef{sessionDuration: 2}
 
 	s := InitScheduler(ctx)
+	s.loadedMu.Lock()
 	s.loaded["a"] = r1
 	s.loaded["b"] = r2
+	s.loadedMu.Unlock()
 
 	resp := s.findRunnerToUnload(req)
 	require.Equal(t, r2, resp)
@@ -458,10 +522,11 @@ func TestNeedsReload(t *testing.T) {
 	defer done()
 
 	llm := &mockLlm{}
+	do := api.DefaultOptions()
 	runner := &runnerRef{
 		adapters:   []string{"adapter1"},
 		projectors: []string{"projector1"},
-		Options:    &api.Options{},
+		Options:    &do,
 		llama:      llm,
 	}
 	req := &LlmRequest{
@@ -469,7 +534,7 @@ func TestNeedsReload(t *testing.T) {
 			AdapterPaths:   []string{"adapter2"},
 			ProjectorPaths: []string{"projector2"},
 		},
-		opts: api.Options{},
+		opts: api.DefaultOptions(),
 	}
 	resp := runner.needsReload(ctx, req)
 	require.True(t, resp)
@@ -508,8 +573,10 @@ func TestUnloadAllRunners(t *testing.T) {
 
 	r1 := &runnerRef{llama: llm1}
 	r2 := &runnerRef{llama: llm2}
+	s.loadedMu.Lock()
 	s.loaded["a"] = r1
 	s.loaded["b"] = r2
+	s.loadedMu.Unlock()
 	s.unloadAllRunners()
 
 	require.True(t, llm1.closeCalled)