diff --git a/envconfig/config.go b/envconfig/config.go index e86f72e6..0f0f7f05 100644 --- a/envconfig/config.go +++ b/envconfig/config.go @@ -85,13 +85,13 @@ func AsMap() map[string]EnvVar { "OLLAMA_HOST": {"OLLAMA_HOST", Host, "IP Address for the ollama server (default 127.0.0.1:11434)"}, "OLLAMA_KEEP_ALIVE": {"OLLAMA_KEEP_ALIVE", KeepAlive, "The duration that models stay loaded in memory (default \"5m\")"}, "OLLAMA_LLM_LIBRARY": {"OLLAMA_LLM_LIBRARY", LLMLibrary, "Set LLM library to bypass autodetection"}, - "OLLAMA_MAX_LOADED_MODELS": {"OLLAMA_MAX_LOADED_MODELS", MaxRunners, "Maximum number of loaded models (default 1)"}, + "OLLAMA_MAX_LOADED_MODELS": {"OLLAMA_MAX_LOADED_MODELS", MaxRunners, "Maximum number of loaded models per GPU (default auto)"}, "OLLAMA_MAX_QUEUE": {"OLLAMA_MAX_QUEUE", MaxQueuedRequests, "Maximum number of queued requests"}, "OLLAMA_MAX_VRAM": {"OLLAMA_MAX_VRAM", MaxVRAM, "Maximum VRAM"}, "OLLAMA_MODELS": {"OLLAMA_MODELS", ModelsDir, "The path to the models directory"}, "OLLAMA_NOHISTORY": {"OLLAMA_NOHISTORY", NoHistory, "Do not preserve readline history"}, "OLLAMA_NOPRUNE": {"OLLAMA_NOPRUNE", NoPrune, "Do not prune model blobs on startup"}, - "OLLAMA_NUM_PARALLEL": {"OLLAMA_NUM_PARALLEL", NumParallel, "Maximum number of parallel requests (default 1)"}, + "OLLAMA_NUM_PARALLEL": {"OLLAMA_NUM_PARALLEL", NumParallel, "Maximum number of parallel requests (default auto)"}, "OLLAMA_ORIGINS": {"OLLAMA_ORIGINS", AllowOrigins, "A comma separated list of allowed origins"}, "OLLAMA_RUNNERS_DIR": {"OLLAMA_RUNNERS_DIR", RunnersDir, "Location for runners"}, "OLLAMA_SCHED_SPREAD": {"OLLAMA_SCHED_SPREAD", SchedSpread, "Always schedule model across all GPUs"}, @@ -129,8 +129,8 @@ func clean(key string) string { func init() { // default values - NumParallel = 1 - MaxRunners = 1 + NumParallel = 0 // Autoselect + MaxRunners = 0 // Autoselect MaxQueuedRequests = 512 LoadConfig() @@ -205,8 +205,8 @@ func LoadConfig() { if onp := clean("OLLAMA_NUM_PARALLEL"); onp != "" { val, err := strconv.Atoi(onp) - if err != nil || val <= 0 { - slog.Error("invalid setting must be greater than zero", "OLLAMA_NUM_PARALLEL", onp, "error", err) + if err != nil { + slog.Error("invalid setting, ignoring", "OLLAMA_NUM_PARALLEL", onp, "error", err) } else { NumParallel = val } @@ -251,7 +251,7 @@ func LoadConfig() { if maxRunners != "" { m, err := strconv.Atoi(maxRunners) if err != nil { - slog.Error("invalid setting", "OLLAMA_MAX_LOADED_MODELS", maxRunners, "error", err) + slog.Error("invalid setting, ignoring", "OLLAMA_MAX_LOADED_MODELS", maxRunners, "error", err) } else { MaxRunners = m } @@ -260,7 +260,7 @@ func LoadConfig() { if onp := os.Getenv("OLLAMA_MAX_QUEUE"); onp != "" { p, err := strconv.Atoi(onp) if err != nil || p <= 0 { - slog.Error("invalid setting", "OLLAMA_MAX_QUEUE", onp, "error", err) + slog.Error("invalid setting, ignoring", "OLLAMA_MAX_QUEUE", onp, "error", err) } else { MaxQueuedRequests = p } diff --git a/gpu/amd_windows.go b/gpu/amd_windows.go index 21585277..8b6fabeb 100644 --- a/gpu/amd_windows.go +++ b/gpu/amd_windows.go @@ -115,8 +115,6 @@ func AMDGetGPUInfo() []RocmGPUInfo { continue } - // TODO revisit this once ROCm v6 is available on windows. - // v5.7 only reports VRAM used by this process, so it's completely wrong and unusable slog.Debug("amdgpu memory", "gpu", i, "total", format.HumanBytes2(totalMemory)) slog.Debug("amdgpu memory", "gpu", i, "available", format.HumanBytes2(freeMemory)) gpuInfo := RocmGPUInfo{ @@ -126,6 +124,9 @@ func AMDGetGPUInfo() []RocmGPUInfo { TotalMemory: totalMemory, FreeMemory: freeMemory, }, + // Free memory reporting on Windows is not reliable until we bump to ROCm v6.2 + UnreliableFreeMemory: true, + ID: strconv.Itoa(i), // TODO this is probably wrong if we specify visible devices DependencyPath: libDir, MinimumMemory: rocmMinimumMemory, diff --git a/gpu/types.go b/gpu/types.go index 9920db5f..2eaa9bae 100644 --- a/gpu/types.go +++ b/gpu/types.go @@ -29,6 +29,11 @@ type GpuInfo struct { // Extra environment variables specific to the GPU as list of [key,value] EnvWorkarounds [][2]string `json:"envs,omitempty"` + // Set to true if we can NOT reliably discover FreeMemory. A value of true indicates + // the FreeMemory is best effort, and may over or under report actual memory usage + // False indicates FreeMemory can generally be trusted on this GPU + UnreliableFreeMemory bool + // GPU information ID string `json:"gpu_id"` // string to use for selection of this specific GPU Name string `json:"name"` // user friendly name if available diff --git a/llm/server.go b/llm/server.go index ad67138b..61346069 100644 --- a/llm/server.go +++ b/llm/server.go @@ -82,7 +82,7 @@ func LoadModel(model string, maxArraySize int) (*GGML, error) { // NewLlamaServer will run a server for the given GPUs // The gpu list must be a single family. -func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, projectors []string, opts api.Options) (LlamaServer, error) { +func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, projectors []string, opts api.Options, numParallel int) (LlamaServer, error) { var err error var cpuRunner string var estimate MemoryEstimate @@ -218,8 +218,10 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr // Windows CUDA should not use mmap for best performance // Linux with a model larger than free space, mmap leads to thrashing + // For CPU loads we want the memory to be allocated, not FS cache if (runtime.GOOS == "windows" && gpus[0].Library == "cuda" && opts.UseMMap == api.TriStateUndefined) || (runtime.GOOS == "linux" && systemFreeMemory < estimate.TotalSize && opts.UseMMap == api.TriStateUndefined) || + (gpus[0].Library == "cpu" && opts.UseMMap == api.TriStateUndefined) || opts.UseMMap == api.TriStateFalse { params = append(params, "--no-mmap") } @@ -232,15 +234,6 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr params = append(params, "--numa") } - numParallel := envconfig.NumParallel - - // TODO (jmorganca): multimodal models don't support parallel yet - // see https://github.com/ollama/ollama/issues/4165 - if len(projectors) > 0 { - numParallel = 1 - slog.Warn("multimodal models don't support parallel requests yet") - } - params = append(params, "--parallel", fmt.Sprintf("%d", numParallel)) if estimate.TensorSplit != "" { diff --git a/server/routes.go b/server/routes.go index ff66663c..76ead072 100644 --- a/server/routes.go +++ b/server/routes.go @@ -1237,6 +1237,11 @@ func (s *Server) ProcessHandler(c *gin.Context) { models = append(models, mr) } + slices.SortStableFunc(models, func(i, j api.ProcessModelResponse) int { + // longest duration remaining listed first + return cmp.Compare(j.ExpiresAt.Unix(), i.ExpiresAt.Unix()) + }) + c.JSON(http.StatusOK, api.ProcessResponse{Models: models}) } diff --git a/server/sched.go b/server/sched.go index 0084b533..87da1db4 100644 --- a/server/sched.go +++ b/server/sched.go @@ -23,6 +23,7 @@ type LlmRequest struct { ctx context.Context //nolint:containedctx model *Model opts api.Options + origNumCTX int // Track the initial ctx request sessionDuration time.Duration successCh chan *runnerRef errCh chan error @@ -38,13 +39,23 @@ type Scheduler struct { loaded map[string]*runnerRef loadedMu sync.Mutex - loadFn func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) - newServerFn func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) + loadFn func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList, numParallel int) + newServerFn func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) getGpuFn func() gpu.GpuInfoList getCpuFn func() gpu.GpuInfoList reschedDelay time.Duration } +// Default automatic value for number of models we allow per GPU +// Model will still need to fit in VRAM, but loading many small models +// on a large GPU can cause stalling +var defaultModelsPerGPU = 3 + +// Default automatic value for parallel setting +// Model will still need to fit in VRAM. If this setting wont fit +// we'll back off down to 1 to try to get it to fit +var defaultParallel = 4 + var ErrMaxQueue = fmt.Errorf("server busy, please try again. maximum pending requests exceeded") func InitScheduler(ctx context.Context) *Scheduler { @@ -65,13 +76,10 @@ func InitScheduler(ctx context.Context) *Scheduler { // context must be canceled to decrement ref count and release the runner func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration time.Duration) (chan *runnerRef, chan error) { - // allocate a large enough kv cache for all parallel requests if opts.NumCtx < 4 { opts.NumCtx = 4 } - opts.NumCtx *= envconfig.NumParallel - req := &LlmRequest{ ctx: c, model: model, @@ -110,11 +118,25 @@ func (s *Scheduler) processPending(ctx context.Context) { case pending := <-s.pendingReqCh: // Block other requests until we get this pending request running pending.schedAttempts++ + if pending.origNumCTX == 0 { + pending.origNumCTX = pending.opts.NumCtx + } if pending.ctx.Err() != nil { slog.Debug("pending request cancelled or timed out, skipping scheduling") continue } + numParallel := envconfig.NumParallel + // TODO (jmorganca): multimodal models don't support parallel yet + // see https://github.com/ollama/ollama/issues/4165 + if len(pending.model.ProjectorPaths) > 0 && numParallel != 1 { + numParallel = 1 + slog.Warn("multimodal models don't support parallel requests yet") + } + // Keep NumCtx and numParallel in sync + if numParallel > 1 { + pending.opts.NumCtx = pending.origNumCTX * numParallel + } for { var runnerToExpire *runnerRef @@ -143,6 +165,26 @@ func (s *Scheduler) processPending(ctx context.Context) { gpus = s.getGpuFn() } + if envconfig.MaxRunners <= 0 { + // No user specified MaxRunners, so figure out what automatic setting to use + // If all GPUs have reliable free memory reporting, defaultModelsPerGPU * the number of GPUs + // if any GPU has unreliable free memory reporting, 1x the number of GPUs + allReliable := true + for _, gpu := range gpus { + if gpu.UnreliableFreeMemory { + allReliable = false + break + } + } + if allReliable { + envconfig.MaxRunners = defaultModelsPerGPU * len(gpus) + slog.Debug("updating default concurrency", "OLLAMA_MAX_LOADED_MODELS", envconfig.MaxRunners, "gpu_count", len(gpus)) + } else { + slog.Info("one or more GPUs detected that are unable to accurately report free memory - disabling default concurrency") + envconfig.MaxRunners = len(gpus) + } + } + // Load model for fitting ggml, err := llm.LoadModel(pending.model.ModelPath, 0) if err != nil { @@ -152,26 +194,32 @@ func (s *Scheduler) processPending(ctx context.Context) { // Evaluate if the model will fit in the available system memory, or if we should unload a model first if len(gpus) == 1 && gpus[0].Library == "cpu" { + // simplifying assumption of defaultParallel when in CPU mode + if numParallel <= 0 { + numParallel = defaultParallel + pending.opts.NumCtx = pending.origNumCTX * numParallel + } + if loadedCount == 0 { slog.Debug("cpu mode with first model, loading") - s.loadFn(pending, ggml, gpus) + s.loadFn(pending, ggml, gpus, numParallel) break } runnerToExpire = s.maybeFindCPURunnerToUnload(pending, ggml, gpus) if runnerToExpire == nil { slog.Debug("cpu mode with available system memory or first model, loading") - s.loadFn(pending, ggml, gpus) + s.loadFn(pending, ggml, gpus, numParallel) break } // else we need to expire a runner } else if loadedCount == 0 { // No models loaded. Load the model but prefer the best fit. slog.Debug("loading first model", "model", pending.model.ModelPath) - g := pickBestFitGPUs(pending, ggml, gpus) + g := pickBestFitGPUs(pending, ggml, gpus, &numParallel) if g != nil { gpus = g } - s.loadFn(pending, ggml, gpus) + s.loadFn(pending, ggml, gpus, numParallel) break } @@ -186,10 +234,10 @@ func (s *Scheduler) processPending(ctx context.Context) { // Update free memory from currently loaded models s.updateFreeSpace(availGpus) - fitGpus := pickBestFitGPUs(pending, ggml, availGpus) + fitGpus := pickBestFitGPUs(pending, ggml, availGpus, &numParallel) if fitGpus != nil { slog.Debug("new model fits with existing models, loading") - s.loadFn(pending, ggml, fitGpus) + s.loadFn(pending, ggml, fitGpus, numParallel) break } @@ -350,8 +398,11 @@ func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *Llm }() } -func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) { - llama, err := s.newServerFn(gpus, req.model.ModelPath, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts) +func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList, numParallel int) { + if numParallel < 1 { + numParallel = 1 + } + llama, err := s.newServerFn(gpus, req.model.ModelPath, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts, numParallel) if err != nil { // some older models are not compatible with newer versions of llama.cpp // show a generalized compatibility error until there is a better way to @@ -375,6 +426,7 @@ func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) loading: true, refCount: 1, } + runner.numParallel = numParallel runner.refMu.Lock() s.loadedMu.Lock() @@ -483,8 +535,9 @@ type runnerRef struct { expireTimer *time.Timer expiresAt time.Time - model *Model - modelPath string + model *Model + modelPath string + numParallel int *api.Options } @@ -525,6 +578,9 @@ func (runner *runnerRef) needsReload(ctx context.Context, req *LlmRequest) bool optsNew.NumGPU = -1 } + // Normalize the NumCtx for parallelism + optsExisting.NumCtx = optsExisting.NumCtx / runner.numParallel + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() if !reflect.DeepEqual(runner.model.AdapterPaths, req.model.AdapterPaths) || // have the adapters changed? @@ -611,22 +667,38 @@ func (a ByDuration) Less(i, j int) bool { // pickBestFitGPUs will try to find the optimal placement of the model in the available GPUs where the model fully fits // If the model can not be fit fully within the available GPU(s) nil is returned -func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu.GpuInfoList { +// If numParallel is <= 0, this will attempt try to optimize parallism based on available VRAM, and adjust +// opts.NumCtx accordingly +func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList, numParallel *int) gpu.GpuInfoList { var estimatedVRAM uint64 + + var numParallelToTry []int + if *numParallel <= 0 { + // If no specific parallel setting was provided, try larger then smaller, always end with 1 + numParallelToTry = append(numParallelToTry, defaultParallel, 1) + } else { + numParallelToTry = []int{*numParallel} + } + for _, gl := range gpus.ByLibrary() { var ok bool sgl := append(make(gpu.GpuInfoList, 0, len(gl)), gl...) // TODO - potentially sort by performance capability, existing models loaded, etc. + // TODO - Eliminate any GPUs that already have envconfig.MaxRunners loaded on them // Note: at present, this will favor more VRAM over faster GPU speed in mixed setups sort.Sort(sort.Reverse(gpu.ByFreeMemory(sgl))) // First attempt to fit the model into a single GPU - if !envconfig.SchedSpread { - for _, g := range sgl { - if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok { - slog.Debug("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM)) - return []gpu.GpuInfo{g} + for _, p := range numParallelToTry { + req.opts.NumCtx = req.origNumCTX * p + if !envconfig.SchedSpread { + for _, g := range sgl { + if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok { + slog.Info("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "parallel", p, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM)) + *numParallel = p + return []gpu.GpuInfo{g} + } } } } @@ -636,9 +708,13 @@ func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu. // - try subsets of GPUs instead of just falling back to 1 or all in a family // Now try all the GPUs - if ok, estimatedVRAM = llm.PredictServerFit(sgl, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok { - slog.Debug("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", sgl[0].Library, "required", format.HumanBytes2(estimatedVRAM)) - return sgl + for _, p := range numParallelToTry { + req.opts.NumCtx = req.origNumCTX * p + if ok, estimatedVRAM = llm.PredictServerFit(sgl, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok { + slog.Info("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", sgl[0].Library, "parallel", p, "required", format.HumanBytes2(estimatedVRAM)) + *numParallel = p + return sgl + } } } return nil diff --git a/server/sched_test.go b/server/sched_test.go index 4a1cf72a..be0830a3 100644 --- a/server/sched_test.go +++ b/server/sched_test.go @@ -47,11 +47,11 @@ func TestLoad(t *testing.T) { sessionDuration: 2, } // Fail to load model first - s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) { + s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) { return nil, fmt.Errorf("something failed to load model blah") } gpus := gpu.GpuInfoList{} - s.load(req, ggml, gpus) + s.load(req, ggml, gpus, 0) require.Empty(t, req.successCh) require.Len(t, req.errCh, 1) s.loadedMu.Lock() @@ -61,10 +61,10 @@ func TestLoad(t *testing.T) { require.Contains(t, err.Error(), "this model may be incompatible") server := &mockLlm{estimatedVRAM: 10, estimatedVRAMByGPU: map[string]uint64{}} - s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) { + s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) { return server, nil } - s.load(req, ggml, gpus) + s.load(req, ggml, gpus, 0) select { case err := <-req.errCh: require.NoError(t, err) @@ -78,12 +78,12 @@ func TestLoad(t *testing.T) { req.model.ModelPath = "dummy_model_path" server.waitResp = fmt.Errorf("wait failure") - s.load(req, ggml, gpus) + s.load(req, ggml, gpus, 0) select { case err := <-req.errCh: require.Contains(t, err.Error(), "wait failure") case resp := <-req.successCh: - t.Errorf("unexpected success %v", resp) + t.Fatalf("unexpected success %v", resp) } s.loadedMu.Lock() runner := s.loaded["dummy_model_path"] @@ -102,7 +102,7 @@ type bundle struct { ggml *llm.GGML } -func (scenario *bundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) { +func (scenario *bundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) { return scenario.srv, nil } @@ -200,7 +200,7 @@ func TestRequests(t *testing.T) { require.Empty(t, s.pendingReqCh) require.Empty(t, scenario1a.req.errCh) case <-ctx.Done(): - t.Errorf("timeout") + t.Fatal("timeout") } // Same runner as first request due to not needing a reload @@ -213,7 +213,7 @@ func TestRequests(t *testing.T) { require.Empty(t, s.pendingReqCh) require.Empty(t, scenario1b.req.errCh) case <-ctx.Done(): - t.Errorf("timeout") + t.Fatal("timeout") } // Trigger a reload @@ -231,7 +231,7 @@ func TestRequests(t *testing.T) { require.Empty(t, s.pendingReqCh) require.Empty(t, scenario2a.req.errCh) case <-ctx.Done(): - t.Errorf("timeout") + t.Fatal("timeout") } envconfig.MaxRunners = 1 @@ -247,7 +247,7 @@ func TestRequests(t *testing.T) { require.Empty(t, s.pendingReqCh) require.Empty(t, scenario3a.req.errCh) case <-ctx.Done(): - t.Errorf("timeout") + t.Fatal("timeout") } s.loadedMu.Lock() require.Len(t, s.loaded, 1) @@ -263,7 +263,7 @@ func TestRequests(t *testing.T) { require.Empty(t, s.pendingReqCh) require.Empty(t, scenario3b.req.errCh) case <-ctx.Done(): - t.Errorf("timeout") + t.Fatal("timeout") } s.loadedMu.Lock() require.Len(t, s.loaded, 2) @@ -279,7 +279,7 @@ func TestRequests(t *testing.T) { require.Empty(t, s.pendingReqCh) require.Empty(t, scenario3c.req.errCh) case <-ctx.Done(): - t.Errorf("timeout") + t.Fatal("timeout") } s.loadedMu.Lock() require.Len(t, s.loaded, 3) @@ -306,7 +306,7 @@ func TestRequests(t *testing.T) { require.Empty(t, s.pendingReqCh) require.Empty(t, scenario3d.req.errCh) case <-ctx.Done(): - t.Errorf("timeout") + t.Fatal("timeout") } s.loadedMu.Lock() require.Len(t, s.loaded, 2) @@ -349,7 +349,7 @@ func TestGetRunner(t *testing.T) { require.Empty(t, s.pendingReqCh) require.Empty(t, errCh1a) case <-ctx.Done(): - t.Errorf("timeout") + t.Fatal("timeout") } scenario1a.ctxDone() s.loadedMu.Lock() @@ -400,7 +400,7 @@ func TestPrematureExpired(t *testing.T) { slog.Info("sending premature expired event now") s.expiredCh <- resp // Shouldn't happen in real life, but make sure its safe case <-ctx.Done(): - t.Errorf("timeout") + t.Fatal("timeout") } time.Sleep(scenario1a.req.sessionDuration) scenario1a.ctxDone() @@ -427,7 +427,7 @@ func TestUseLoadedRunner(t *testing.T) { } finished := make(chan *LlmRequest) llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}} - r1 := &runnerRef{llama: llm1, sessionDuration: 1} + r1 := &runnerRef{llama: llm1, sessionDuration: 1, numParallel: 1} req.useLoadedRunner(r1, finished) require.Equal(t, uint(1), r1.refCount) require.Equal(t, time.Duration(2), r1.sessionDuration) @@ -435,7 +435,7 @@ func TestUseLoadedRunner(t *testing.T) { case success := <-req.successCh: require.Equal(t, r1, success) case <-ctx.Done(): - t.Errorf("timeout") + t.Fatal("timeout") } done() fin := <-finished @@ -461,8 +461,8 @@ func TestUpdateFreeSpace(t *testing.T) { gpus[1].FreeMemory = 1900 llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 50, "2": 50}} llm2 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 125, "2": 75}} - r1 := &runnerRef{llama: llm1, gpus: gpus} - r2 := &runnerRef{llama: llm2, gpus: gpus} + r1 := &runnerRef{llama: llm1, gpus: gpus, numParallel: 1} + r2 := &runnerRef{llama: llm2, gpus: gpus, numParallel: 1} s := InitScheduler(ctx) s.loadedMu.Lock() @@ -513,8 +513,8 @@ func TestFindRunnerToUnload(t *testing.T) { ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond) defer done() - r1 := &runnerRef{refCount: 1, sessionDuration: 1} - r2 := &runnerRef{sessionDuration: 2} + r1 := &runnerRef{refCount: 1, sessionDuration: 1, numParallel: 1} + r2 := &runnerRef{sessionDuration: 2, numParallel: 1} s := InitScheduler(ctx) s.loadedMu.Lock() @@ -536,9 +536,13 @@ func TestNeedsReload(t *testing.T) { llm := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}} do := api.DefaultOptions() runner := &runnerRef{ - model: &Model{AdapterPaths: []string{"adapter1"}, ProjectorPaths: []string{"projector1"}}, - Options: &do, - llama: llm, + model: &Model{ + AdapterPaths: []string{"adapter1"}, + ProjectorPaths: []string{"projector1"}, + }, + Options: &do, + llama: llm, + numParallel: 1, } req := &LlmRequest{ model: &Model{ @@ -581,8 +585,8 @@ func TestUnloadAllRunners(t *testing.T) { s := InitScheduler(ctx) s.unloadAllRunners() - r1 := &runnerRef{llama: llm1} - r2 := &runnerRef{llama: llm2} + r1 := &runnerRef{llama: llm1, numParallel: 1} + r2 := &runnerRef{llama: llm2, numParallel: 1} s.loadedMu.Lock() s.loaded["a"] = r1 @@ -596,14 +600,32 @@ func TestUnloadAllRunners(t *testing.T) { func TestUnload(t *testing.T) { llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}} - r1 := &runnerRef{llama: llm1} - r2 := &runnerRef{model: &Model{AdapterPaths: []string{"A"}}} + r1 := &runnerRef{llama: llm1, numParallel: 1} + r2 := &runnerRef{model: &Model{AdapterPaths: []string{"A"}}, numParallel: 1} r1.unload() require.True(t, llm1.closeCalled) r2.unload() require.Nil(t, r2.model) } +func TestAlreadyCanceled(t *testing.T) { + ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer done() + dctx, done2 := context.WithCancel(ctx) + done2() + scenario1a := newScenario(t, dctx, "ollama-model-1", 10) + scenario1a.req.sessionDuration = 0 + s := InitScheduler(ctx) + slog.Info("scenario1a") + s.pendingReqCh <- scenario1a.req + require.Len(t, s.pendingReqCh, 1) + s.Run(ctx) + time.Sleep(5 * time.Millisecond) + require.Empty(t, s.pendingReqCh) + require.Empty(t, scenario1a.req.errCh) + require.Empty(t, scenario1a.req.successCh) +} + type mockLlm struct { pingResp error waitResp error