Merge pull request #4009 from dhiltgen/cpu_concurrency

Fix concurrency for CPU mode
Daniel Hiltgen 2024-04-28 14:20:27 -07:00 committed by GitHub
commit 1e6a28bf5b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 88 additions and 13 deletions

server/sched.go

@@ -149,6 +149,14 @@ func (s *Scheduler) processPending(ctx context.Context) {
 				break
 			}
+			// If we're CPU only mode, just limit by loadedMax above
+			// TODO handle system memory exhaustion
+			if (len(gpus) == 1 && gpus[0].Library == "cpu") || pending.opts.NumGPU == 0 {
+				slog.Debug("cpu mode with existing models, loading")
+				s.loadFn(pending, ggml, gpus)
+				break
+			}
+
 			// No models loaded. Load the model but prefer the best fit.
 			if loadedCount == 0 {
 				slog.Debug("loading first model", "model", pending.model.ModelPath)

server/sched_test.go

@@ -28,19 +28,33 @@ func TestInitScheduler(t *testing.T) {
 	ctx, done := context.WithCancel(context.Background())
 	defer done()
 	initialMax := loadedMax
+	initialParallel := numParallel
 	s := InitScheduler(ctx)
 	require.Equal(t, initialMax, loadedMax)
+	s.loadedMu.Lock()
 	require.NotNil(t, s.loaded)
+	s.loadedMu.Unlock()

 	os.Setenv("OLLAMA_MAX_LOADED_MODELS", "blue")
 	s = InitScheduler(ctx)
 	require.Equal(t, initialMax, loadedMax)
+	s.loadedMu.Lock()
 	require.NotNil(t, s.loaded)
+	s.loadedMu.Unlock()

 	os.Setenv("OLLAMA_MAX_LOADED_MODELS", "0")
 	s = InitScheduler(ctx)
 	require.Equal(t, 0, loadedMax)
+	s.loadedMu.Lock()
 	require.NotNil(t, s.loaded)
+	s.loadedMu.Unlock()
+
+	os.Setenv("OLLAMA_NUM_PARALLEL", "blue")
+	_ = InitScheduler(ctx)
+	require.Equal(t, initialParallel, numParallel)
+	os.Setenv("OLLAMA_NUM_PARALLEL", "10")
+	_ = InitScheduler(ctx)
+	require.Equal(t, 10, numParallel)
 }
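
The new assertions imply how the scheduler reads OLLAMA_NUM_PARALLEL: an unparseable value ("blue") leaves the package-level default untouched, while a valid integer overrides it. A minimal sketch of that fallback behavior, assuming a hypothetical envInt helper rather than the actual InitScheduler code:

package main

import (
	"fmt"
	"os"
	"strconv"
)

// envInt is a hypothetical helper: return the env var as an int if it
// parses, otherwise keep the caller's current value.
func envInt(key string, current int) int {
	if v := os.Getenv(key); v != "" {
		if n, err := strconv.Atoi(v); err == nil {
			return n
		}
		// e.g. "blue": fall through and keep the existing setting
	}
	return current
}

func main() {
	numParallel := 1 // stand-in default
	os.Setenv("OLLAMA_NUM_PARALLEL", "blue")
	numParallel = envInt("OLLAMA_NUM_PARALLEL", numParallel)
	fmt.Println(numParallel) // 1: invalid value ignored
	os.Setenv("OLLAMA_NUM_PARALLEL", "10")
	numParallel = envInt("OLLAMA_NUM_PARALLEL", numParallel)
	fmt.Println(numParallel) // 10
}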
@@ -51,6 +65,7 @@ func TestLoad(t *testing.T) {
 	req := &LlmRequest{
 		ctx: ctx,
 		model: &Model{ModelPath: "foo"},
+		opts: api.DefaultOptions(),
 		successCh: make(chan *runnerRef, 1),
 		errCh: make(chan error, 1),
 		sessionDuration: 2,
@@ -63,7 +78,9 @@ func TestLoad(t *testing.T) {
 	s.load(req, ggml, gpus)
 	require.Len(t, req.successCh, 0)
 	require.Len(t, req.errCh, 1)
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 0)
+	s.loadedMu.Unlock()
 	err := <-req.errCh
 	require.Contains(t, err.Error(), "this model may be incompatible")
@@ -78,7 +95,9 @@ func TestLoad(t *testing.T) {
 	case resp := <-req.successCh:
 		require.Equal(t, uint64(10), resp.estimatedVRAM)
 		require.Equal(t, uint(1), resp.refCount)
+		s.loadedMu.Lock()
 		require.Len(t, s.loaded, 1)
+		s.loadedMu.Unlock()
 	}
 	req.model.ModelPath = "dummy_model_path"
@@ -90,7 +109,9 @@ func TestLoad(t *testing.T) {
 	case resp := <-req.successCh:
 		t.Errorf("unexpected success %v", resp)
 	}
+	s.loadedMu.Lock()
 	runner := s.loaded["dummy_model_path"]
+	s.loadedMu.Unlock()
 	require.NotNil(t, runner)
 	require.Equal(t, uint(0), runner.refCount)
 	time.Sleep(1 * time.Millisecond)
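
Every hunk in this test file applies the same pattern: reads and writes of the shared s.loaded map are now bracketed by s.loadedMu, since the scheduler's goroutines and the test body presumably touch the map concurrently. A minimal sketch of the pattern, with runnerRef reduced to an empty struct and loadedCount as a hypothetical helper:

package main

import (
	"fmt"
	"sync"
)

type runnerRef struct{}

// Scheduler is reduced here to the two fields the pattern involves.
type Scheduler struct {
	loadedMu sync.Mutex
	loaded   map[string]*runnerRef
}

// loadedCount shows the guarded-read side of the pattern.
func (s *Scheduler) loadedCount() int {
	s.loadedMu.Lock()
	defer s.loadedMu.Unlock()
	return len(s.loaded)
}

func main() {
	s := &Scheduler{loaded: make(map[string]*runnerRef)}
	s.loadedMu.Lock() // guarded write, as in the test hunks above
	s.loaded["dummy_model_path"] = &runnerRef{}
	s.loadedMu.Unlock()
	fmt.Println(s.loadedCount()) // 1
}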
@@ -143,6 +164,7 @@ func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedV
 	scenario.req = &LlmRequest{
 		ctx: scenario.ctx,
 		model: model,
+		opts: api.DefaultOptions(),
 		sessionDuration: 5 * time.Millisecond,
 		successCh: make(chan *runnerRef, 1),
 		errCh: make(chan error, 1),
@@ -171,7 +193,9 @@ func TestRequests(t *testing.T) {
 	// Multiple loaded models
 	scenario3a := newScenario(t, ctx, "ollama-model-3a", 1*format.GigaByte)
 	scenario3b := newScenario(t, ctx, "ollama-model-3b", 24*format.GigaByte)
-	scenario3c := newScenario(t, ctx, "ollama-model-3c", 30) // Needs prior unloaded
+	scenario3c := newScenario(t, ctx, "ollama-model-4a", 30)
+	scenario3c.req.opts.NumGPU = 0 // CPU load, will be allowed
+	scenario3d := newScenario(t, ctx, "ollama-model-3c", 30) // Needs prior unloaded

 	s := InitScheduler(ctx)
 	s.getGpuFn = func() gpu.GpuInfoList {
@@ -240,7 +264,9 @@ func TestRequests(t *testing.T) {
 	case <-ctx.Done():
 		t.Errorf("timeout")
 	}
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 1)
+	s.loadedMu.Unlock()

 	loadedMax = 0
 	s.newServerFn = scenario3b.newServer
@@ -254,19 +280,14 @@ func TestRequests(t *testing.T) {
 	case <-ctx.Done():
 		t.Errorf("timeout")
 	}
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 2)
+	s.loadedMu.Unlock()

-	// Try to load a model that wont fit
+	// This is a CPU load with NumGPU = 0 so it should load
 	s.newServerFn = scenario3c.newServer
 	slog.Info("scenario3c")
-	require.Len(t, s.loaded, 2)
-	scenario3a.ctxDone() // Won't help since this one isn't big enough to make room
-	time.Sleep(2 * time.Millisecond)
 	s.pendingReqCh <- scenario3c.req
-	// finish prior request, so new model can load
-	time.Sleep(6 * time.Millisecond)
-	require.Len(t, s.loaded, 1)
-	scenario3b.ctxDone()
 	select {
 	case resp := <-scenario3c.req.successCh:
 		require.Equal(t, resp.llama, scenario3c.srv)
@@ -275,7 +296,36 @@ func TestRequests(t *testing.T) {
 	case <-ctx.Done():
 		t.Errorf("timeout")
 	}
-	require.Len(t, s.loaded, 1)
+	s.loadedMu.Lock()
+	require.Len(t, s.loaded, 3)
+	s.loadedMu.Unlock()
+
+	// Try to load a model that wont fit
+	s.newServerFn = scenario3d.newServer
+	slog.Info("scenario3d")
+	s.loadedMu.Lock()
+	require.Len(t, s.loaded, 3)
+	s.loadedMu.Unlock()
+	scenario3a.ctxDone() // Won't help since this one isn't big enough to make room
+	time.Sleep(2 * time.Millisecond)
+	s.pendingReqCh <- scenario3d.req
+	// finish prior request, so new model can load
+	time.Sleep(6 * time.Millisecond)
+	s.loadedMu.Lock()
+	require.Len(t, s.loaded, 2)
+	s.loadedMu.Unlock()
+	scenario3b.ctxDone()
+	select {
+	case resp := <-scenario3d.req.successCh:
+		require.Equal(t, resp.llama, scenario3d.srv)
+		require.Len(t, s.pendingReqCh, 0)
+		require.Len(t, scenario3d.req.errCh, 0)
+	case <-ctx.Done():
+		t.Errorf("timeout")
+	}
+	s.loadedMu.Lock()
+	require.Len(t, s.loaded, 2)
+	s.loadedMu.Unlock()
 }
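
Taken together, the reworked TestRequests flow exercises the new gate end to end: scenario3c (NumGPU = 0) loads even though scenario3a and scenario3b already occupy the GPU, so the loaded count climbs to 3; scenario3d still needs VRAM, so it only succeeds after 3a and 3b expire, leaving the CPU-resident 3c plus 3d loaded (count 2).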
@@ -318,7 +368,9 @@ func TestGetRunner(t *testing.T) {
 		t.Errorf("timeout")
 	}
 	scenario1a.ctxDone()
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 1)
+	s.loadedMu.Unlock()

 	scenario1c.req.model.ModelPath = "bad path"
 	slog.Info("scenario1c")
@@ -328,7 +380,9 @@ func TestGetRunner(t *testing.T) {
 	require.Len(t, errCh1c, 0)
 	time.Sleep(5 * time.Millisecond)
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 0)
+	s.loadedMu.Unlock()
 	require.Len(t, errCh1c, 1)
 	err = <-errCh1c
 	require.Contains(t, err.Error(), "bad path")
@@ -358,7 +412,9 @@ func TestPrematureExpired(t *testing.T) {
 		require.Equal(t, resp.llama, scenario1a.srv)
 		require.Len(t, s.pendingReqCh, 0)
 		require.Len(t, errCh1a, 0)
+		s.loadedMu.Lock()
 		require.Len(t, s.loaded, 1)
+		s.loadedMu.Unlock()
 		slog.Info("sending premature expired event now")
 		s.expiredCh <- resp // Shouldn't happen in real life, but make sure its safe
 	case <-ctx.Done():
@@ -383,6 +439,7 @@ func TestUseLoadedRunner(t *testing.T) {
 	ctx, done := context.WithTimeout(context.Background(), 5*time.Millisecond)
 	req := &LlmRequest{
 		ctx: ctx,
+		opts: api.DefaultOptions(),
 		successCh: make(chan *runnerRef, 1),
 		sessionDuration: 2,
 	}
@@ -426,8 +483,10 @@ func TestUpdateFreeSpace(t *testing.T) {
 	r2 := &runnerRef{llama: llm2, gpus: gpus}

 	s := InitScheduler(ctx)
+	s.loadedMu.Lock()
 	s.loaded["a"] = r1
 	s.loaded["b"] = r2
+	s.loadedMu.Unlock()

 	s.updateFreeSpace(gpus)
 	require.Equal(t, uint64(850), gpus[0].FreeMemory)
@@ -437,13 +496,18 @@
 func TestFindRunnerToUnload(t *testing.T) {
 	ctx, done := context.WithTimeout(context.Background(), 5*time.Millisecond)
 	defer done()
-	req := &LlmRequest{ctx: ctx}
+	req := &LlmRequest{
+		ctx: ctx,
+		opts: api.DefaultOptions(),
+	}
 	r1 := &runnerRef{refCount: 1, sessionDuration: 1}
 	r2 := &runnerRef{sessionDuration: 2}

 	s := InitScheduler(ctx)
+	s.loadedMu.Lock()
 	s.loaded["a"] = r1
 	s.loaded["b"] = r2
+	s.loadedMu.Unlock()

 	resp := s.findRunnerToUnload(req)
 	require.Equal(t, r2, resp)
@@ -458,10 +522,11 @@ func TestNeedsReload(t *testing.T) {
 	defer done()

 	llm := &mockLlm{}
+	do := api.DefaultOptions()
 	runner := &runnerRef{
 		adapters: []string{"adapter1"},
 		projectors: []string{"projector1"},
-		Options: &api.Options{},
+		Options: &do,
 		llama: llm,
 	}
 	req := &LlmRequest{
@@ -469,7 +534,7 @@ func TestNeedsReload(t *testing.T) {
 			AdapterPaths: []string{"adapter2"},
 			ProjectorPaths: []string{"projector2"},
 		},
-		opts: api.Options{},
+		opts: api.DefaultOptions(),
 	}
 	resp := runner.needsReload(ctx, req)
 	require.True(t, resp)
@@ -508,8 +573,10 @@ func TestUnloadAllRunners(t *testing.T) {
 	r1 := &runnerRef{llama: llm1}
 	r2 := &runnerRef{llama: llm2}

+	s.loadedMu.Lock()
 	s.loaded["a"] = r1
 	s.loaded["b"] = r2
+	s.loadedMu.Unlock()

 	s.unloadAllRunners()
 	require.True(t, llm1.closeCalled)