Enable concurrency by default

This adjusts our default settings to enable multiple loaded models and
parallel requests to a single model.  Users can still override these
defaults with the same env vars as before.  The parallel setting has a
direct impact on num_ctx, which in turn can have a significant impact on
small-VRAM GPUs, so this change also refines the algorithm: when parallel
is not explicitly set by the user, we try to find a reasonable default
that fits the model on their GPU(s).  As before, multiple models will
only load concurrently if they fully fit in VRAM.
Daniel Hiltgen 2024-05-06 17:47:52 -07:00
parent c7c2f3bc22
commit 17b7186cd7
4 changed files with 135 additions and 72 deletions
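
The sizing logic this change introduces, in brief: the KV cache is allocated for num_ctx multiplied by the parallel degree, so higher parallelism costs proportionally more VRAM, and when OLLAMA_NUM_PARALLEL is left unset the scheduler now tries a larger degree first and falls back to 1 rather than refusing to fit. Below is a minimal, self-contained sketch of that idea only, not the scheduler code itself; pickParallel and fitsInVRAM are illustrative stand-ins for pickBestFitGPUs and llm.PredictServerFit, and the VRAM figures are invented for the example.

package main

import "fmt"

// pickParallel tries candidate parallel degrees, largest first, and returns the
// first one whose scaled context (and therefore KV cache) fits in VRAM.
func pickParallel(userParallel, baseNumCtx int, fitsInVRAM func(numCtx int) bool) (parallel, effectiveCtx int, ok bool) {
	candidates := []int{4, 1} // unset: try 4 first, always end with 1
	if userParallel > 0 {
		candidates = []int{userParallel} // an explicit setting is honored as-is
	}
	for _, p := range candidates {
		ctx := baseNumCtx * p // every parallel slot needs its own slice of context
		if fitsInVRAM(ctx) {
			return p, ctx, true
		}
	}
	return 0, 0, false // no fit even at parallel=1; caller must fall back (e.g. CPU)
}

func main() {
	// Hypothetical 8 GiB card: a 5 GiB model plus ~64 MiB of KV cache per 1024
	// tokens of context, purely for illustration.
	freeMiB := 8 * 1024
	fits := func(numCtx int) bool { return 5*1024+(numCtx/1024)*64 <= freeMiB }

	p, ctx, ok := pickParallel(0, 2048, fits) // OLLAMA_NUM_PARALLEL left unset
	fmt.Println(p, ctx, ok)                   // prints: 4 8192 true
}

An explicit OLLAMA_NUM_PARALLEL skips the fallback list entirely, which matches the single-candidate branch added to the scheduler below.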

envconfig/config.go

@@ -85,13 +85,13 @@ func AsMap() map[string]EnvVar {
 		"OLLAMA_HOST":              {"OLLAMA_HOST", Host, "IP Address for the ollama server (default 127.0.0.1:11434)"},
 		"OLLAMA_KEEP_ALIVE":        {"OLLAMA_KEEP_ALIVE", KeepAlive, "The duration that models stay loaded in memory (default \"5m\")"},
 		"OLLAMA_LLM_LIBRARY":       {"OLLAMA_LLM_LIBRARY", LLMLibrary, "Set LLM library to bypass autodetection"},
-		"OLLAMA_MAX_LOADED_MODELS": {"OLLAMA_MAX_LOADED_MODELS", MaxRunners, "Maximum number of loaded models (default 1)"},
+		"OLLAMA_MAX_LOADED_MODELS": {"OLLAMA_MAX_LOADED_MODELS", MaxRunners, "Maximum number of loaded models per GPU (default 4)"},
 		"OLLAMA_MAX_QUEUE":         {"OLLAMA_MAX_QUEUE", MaxQueuedRequests, "Maximum number of queued requests"},
 		"OLLAMA_MAX_VRAM":          {"OLLAMA_MAX_VRAM", MaxVRAM, "Maximum VRAM"},
 		"OLLAMA_MODELS":            {"OLLAMA_MODELS", ModelsDir, "The path to the models directory"},
 		"OLLAMA_NOHISTORY":         {"OLLAMA_NOHISTORY", NoHistory, "Do not preserve readline history"},
 		"OLLAMA_NOPRUNE":           {"OLLAMA_NOPRUNE", NoPrune, "Do not prune model blobs on startup"},
-		"OLLAMA_NUM_PARALLEL":      {"OLLAMA_NUM_PARALLEL", NumParallel, "Maximum number of parallel requests (default 1)"},
+		"OLLAMA_NUM_PARALLEL":      {"OLLAMA_NUM_PARALLEL", NumParallel, "Maximum number of parallel requests"},
 		"OLLAMA_ORIGINS":           {"OLLAMA_ORIGINS", AllowOrigins, "A comma separated list of allowed origins"},
 		"OLLAMA_RUNNERS_DIR":       {"OLLAMA_RUNNERS_DIR", RunnersDir, "Location for runners"},
 		"OLLAMA_SCHED_SPREAD":      {"OLLAMA_SCHED_SPREAD", SchedSpread, "Always schedule model across all GPUs"},
@@ -129,8 +129,8 @@ func clean(key string) string {
 func init() {
 	// default values
-	NumParallel = 1
-	MaxRunners = 1
+	NumParallel = 0
+	MaxRunners = 4
 	MaxQueuedRequests = 512
 	LoadConfig()
@@ -205,8 +205,8 @@ func LoadConfig() {
 	if onp := clean("OLLAMA_NUM_PARALLEL"); onp != "" {
 		val, err := strconv.Atoi(onp)
-		if err != nil || val <= 0 {
-			slog.Error("invalid setting must be greater than zero", "OLLAMA_NUM_PARALLEL", onp, "error", err)
+		if err != nil {
+			slog.Error("invalid setting, ignoring", "OLLAMA_NUM_PARALLEL", onp, "error", err)
 		} else {
 			NumParallel = val
 		}
@@ -251,7 +251,7 @@ func LoadConfig() {
 	if maxRunners != "" {
 		m, err := strconv.Atoi(maxRunners)
 		if err != nil {
-			slog.Error("invalid setting", "OLLAMA_MAX_LOADED_MODELS", maxRunners, "error", err)
+			slog.Error("invalid setting, ignoring", "OLLAMA_MAX_LOADED_MODELS", maxRunners, "error", err)
 		} else {
 			MaxRunners = m
 		}
@@ -260,7 +260,7 @@ func LoadConfig() {
 	if onp := os.Getenv("OLLAMA_MAX_QUEUE"); onp != "" {
 		p, err := strconv.Atoi(onp)
 		if err != nil || p <= 0 {
-			slog.Error("invalid setting", "OLLAMA_MAX_QUEUE", onp, "error", err)
+			slog.Error("invalid setting, ignoring", "OLLAMA_MAX_QUEUE", onp, "error", err)
 		} else {
 			MaxQueuedRequests = p
 		}
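
The new defaults above (NumParallel = 0, meaning "let the scheduler pick", and MaxRunners = 4 per GPU) can still be pinned back to the old behavior through the same variables LoadConfig reads. A small sketch of doing that from Go, with the envconfig import path assumed from the repository layout; in practice these would simply be exported in the server's environment.

package main

import (
	"os"

	"github.com/ollama/ollama/envconfig" // assumed import path
)

func main() {
	// Pin the pre-change behavior before (re)loading the config.
	os.Setenv("OLLAMA_NUM_PARALLEL", "1")      // one request per model at a time
	os.Setenv("OLLAMA_MAX_LOADED_MODELS", "1") // cap loaded models (applied per GPU by the new scheduler)
	envconfig.LoadConfig()
}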

llm/server.go

@@ -77,7 +77,7 @@ func LoadModel(model string) (*GGML, error) {
 // NewLlamaServer will run a server for the given GPUs
 // The gpu list must be a single family.
-func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, projectors []string, opts api.Options) (LlamaServer, error) {
+func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, projectors []string, opts api.Options, numParallel int) (LlamaServer, error) {
 	var err error
 	var cpuRunner string
 	var estimate MemoryEstimate
@@ -213,8 +213,10 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr
 	// Windows CUDA should not use mmap for best performance
 	// Linux with a model larger than free space, mmap leads to thrashing
+	// For CPU loads we want the memory to be allocated, not FS cache
 	if (runtime.GOOS == "windows" && gpus[0].Library == "cuda" && opts.UseMMap == api.TriStateUndefined) ||
 		(runtime.GOOS == "linux" && systemFreeMemory < estimate.TotalSize && opts.UseMMap == api.TriStateUndefined) ||
+		(gpus[0].Library == "cpu" && opts.UseMMap == api.TriStateUndefined) ||
 		opts.UseMMap == api.TriStateFalse {
 		params = append(params, "--no-mmap")
 	}
@@ -227,15 +229,6 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr
 		params = append(params, "--numa")
 	}

-	numParallel := envconfig.NumParallel
-
-	// TODO (jmorganca): multimodal models don't support parallel yet
-	// see https://github.com/ollama/ollama/issues/4165
-	if len(projectors) > 0 {
-		numParallel = 1
-		slog.Warn("multimodal models don't support parallel requests yet")
-	}
-
 	params = append(params, "--parallel", fmt.Sprintf("%d", numParallel))

 	if estimate.TensorSplit != "" {

server/sched.go

@@ -23,6 +23,7 @@ type LlmRequest struct {
 	ctx             context.Context //nolint:containedctx
 	model           *Model
 	opts            api.Options
+	origNumCTX      int // Track the initial ctx request
 	sessionDuration time.Duration
 	successCh       chan *runnerRef
 	errCh           chan error
@@ -38,8 +39,8 @@ type Scheduler struct {
 	loaded   map[string]*runnerRef
 	loadedMu sync.Mutex

-	loadFn       func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
-	newServerFn  func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
+	loadFn       func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList, numParallel int)
+	newServerFn  func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error)
 	getGpuFn     func() gpu.GpuInfoList
 	getCpuFn     func() gpu.GpuInfoList
 	reschedDelay time.Duration
@@ -65,13 +66,10 @@ func InitScheduler(ctx context.Context) *Scheduler {
 // context must be canceled to decrement ref count and release the runner
 func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration time.Duration) (chan *runnerRef, chan error) {
-	// allocate a large enough kv cache for all parallel requests
 	if opts.NumCtx < 4 {
 		opts.NumCtx = 4
 	}
-	opts.NumCtx *= envconfig.NumParallel
-
 	req := &LlmRequest{
 		ctx:             c,
 		model:           model,
@@ -102,6 +100,7 @@ func (s *Scheduler) Run(ctx context.Context) {
 }

 func (s *Scheduler) processPending(ctx context.Context) {
+	maxRunnerFactor := 1 // number of GPUs or 1
 	for {
 		select {
 		case <-ctx.Done():
@@ -110,11 +109,25 @@ func (s *Scheduler) processPending(ctx context.Context) {
 		case pending := <-s.pendingReqCh:
 			// Block other requests until we get this pending request running
 			pending.schedAttempts++
+			if pending.origNumCTX == 0 {
+				pending.origNumCTX = pending.opts.NumCtx
+			}

 			if pending.ctx.Err() != nil {
 				slog.Debug("pending request cancelled or timed out, skipping scheduling")
 				continue
 			}
+			numParallel := envconfig.NumParallel
+			// TODO (jmorganca): multimodal models don't support parallel yet
+			// see https://github.com/ollama/ollama/issues/4165
+			if len(pending.model.ProjectorPaths) > 0 && numParallel != 1 {
+				numParallel = 1
+				slog.Warn("multimodal models don't support parallel requests yet")
+			}
+			// Keep NumCtx and numParallel in sync
+			if numParallel > 1 {
+				pending.opts.NumCtx = pending.origNumCTX * numParallel
+			}

 			for {
 				var runnerToExpire *runnerRef
@@ -130,7 +143,7 @@ func (s *Scheduler) processPending(ctx context.Context) {
 						pending.useLoadedRunner(runner, s.finishedReqCh)
 						break
 					}
-				} else if envconfig.MaxRunners > 0 && loadedCount >= envconfig.MaxRunners {
+				} else if envconfig.MaxRunners > 0 && loadedCount >= (maxRunnerFactor*envconfig.MaxRunners) {
 					slog.Debug("max runners achieved, unloading one to make room", "runner_count", loadedCount)
 					runnerToExpire = s.findRunnerToUnload()
 				} else {
@@ -142,6 +155,7 @@ func (s *Scheduler) processPending(ctx context.Context) {
 					} else {
 						gpus = s.getGpuFn()
 					}
+					maxRunnerFactor = max(len(gpus), 1)

 					// Load model for fitting
 					ggml, err := llm.LoadModel(pending.model.ModelPath)
@@ -152,26 +166,32 @@ func (s *Scheduler) processPending(ctx context.Context) {
 					// Evaluate if the model will fit in the available system memory, or if we should unload a model first
 					if len(gpus) == 1 && gpus[0].Library == "cpu" {
+						// simplifying assumption of defaultParallel when in CPU mode
+						if numParallel <= 0 {
+							numParallel = defaultParallel
+							pending.opts.NumCtx = pending.origNumCTX * numParallel
+						}

 						if loadedCount == 0 {
 							slog.Debug("cpu mode with first model, loading")
-							s.loadFn(pending, ggml, gpus)
+							s.loadFn(pending, ggml, gpus, numParallel)
 							break
 						}
 						runnerToExpire = s.maybeFindCPURunnerToUnload(pending, ggml, gpus)
 						if runnerToExpire == nil {
 							slog.Debug("cpu mode with available system memory or first model, loading")
-							s.loadFn(pending, ggml, gpus)
+							s.loadFn(pending, ggml, gpus, numParallel)
 							break
 						}
 						// else we need to expire a runner
 					} else if loadedCount == 0 {
 						// No models loaded. Load the model but prefer the best fit.
 						slog.Debug("loading first model", "model", pending.model.ModelPath)
-						g := pickBestFitGPUs(pending, ggml, gpus)
+						g := pickBestFitGPUs(pending, ggml, gpus, &numParallel)
 						if g != nil {
 							gpus = g
 						}
-						s.loadFn(pending, ggml, gpus)
+						s.loadFn(pending, ggml, gpus, numParallel)
 						break
 					}
@@ -186,10 +206,10 @@ func (s *Scheduler) processPending(ctx context.Context) {
 						// Update free memory from currently loaded models
 						s.updateFreeSpace(availGpus)
-						fitGpus := pickBestFitGPUs(pending, ggml, availGpus)
+						fitGpus := pickBestFitGPUs(pending, ggml, availGpus, &numParallel)
 						if fitGpus != nil {
 							slog.Debug("new model fits with existing models, loading")
-							s.loadFn(pending, ggml, fitGpus)
+							s.loadFn(pending, ggml, fitGpus, numParallel)
 							break
 						}
@@ -350,8 +370,11 @@ func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *Llm
 	}()
 }

-func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) {
-	llama, err := s.newServerFn(gpus, req.model.ModelPath, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts)
+func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList, numParallel int) {
+	if numParallel < 1 {
+		numParallel = 1
+	}
+	llama, err := s.newServerFn(gpus, req.model.ModelPath, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts, numParallel)
 	if err != nil {
 		// some older models are not compatible with newer versions of llama.cpp
 		// show a generalized compatibility error until there is a better way to
@@ -375,6 +398,7 @@ func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
 		loading:         true,
 		refCount:        1,
 	}
+	runner.numParallel = numParallel
 	runner.refMu.Lock()
 	s.loadedMu.Lock()
@@ -483,8 +507,9 @@ type runnerRef struct {
 	expireTimer *time.Timer
 	expiresAt   time.Time

 	model       *Model
 	modelPath   string
+	numParallel int
 	*api.Options
 }
@@ -525,6 +550,9 @@ func (runner *runnerRef) needsReload(ctx context.Context, req *LlmRequest) bool
 		optsNew.NumGPU = -1
 	}

+	// Normalize the NumCtx for parallelism
+	optsExisting.NumCtx = optsExisting.NumCtx / runner.numParallel
+
 	ctx, cancel := context.WithTimeout(ctx, timeout)
 	defer cancel()
 	if !reflect.DeepEqual(runner.model.AdapterPaths, req.model.AdapterPaths) || // have the adapters changed?
@@ -611,22 +639,38 @@ func (a ByDuration) Less(i, j int) bool {
 // pickBestFitGPUs will try to find the optimal placement of the model in the available GPUs where the model fully fits
 // If the model can not be fit fully within the available GPU(s) nil is returned
-func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu.GpuInfoList {
+// If numParallel is <= 0, this will attempt try to optimize parallism based on available VRAM, and adjust
+// opts.NumCtx accordingly
+func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList, numParallel *int) gpu.GpuInfoList {
 	var estimatedVRAM uint64
+
+	var numParallelToTry []int
+	if *numParallel <= 0 {
+		// If no specific parallel setting was provided, try larger then smaller, always end with 1
+		numParallelToTry = append(numParallelToTry, 4, 1)
+	} else {
+		numParallelToTry = []int{*numParallel}
+	}

 	for _, gl := range gpus.ByLibrary() {
 		var ok bool
 		sgl := append(make(gpu.GpuInfoList, 0, len(gl)), gl...)

 		// TODO - potentially sort by performance capability, existing models loaded, etc.
+		// TODO - Eliminate any GPUs that already have envconfig.MaxRunners loaded on them
 		// Note: at present, this will favor more VRAM over faster GPU speed in mixed setups
 		sort.Sort(sort.Reverse(gpu.ByFreeMemory(sgl)))

 		// First attempt to fit the model into a single GPU
-		if !envconfig.SchedSpread {
-			for _, g := range sgl {
-				if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
-					slog.Debug("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM))
-					return []gpu.GpuInfo{g}
+		for _, p := range numParallelToTry {
+			req.opts.NumCtx = req.origNumCTX * p
+			if !envconfig.SchedSpread {
+				for _, g := range sgl {
+					if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
+						slog.Info("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "parallel", p, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM))
+						*numParallel = p
+						return []gpu.GpuInfo{g}
+					}
 				}
 			}
 		}
@@ -636,9 +680,13 @@ func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu.
 		//   - try subsets of GPUs instead of just falling back to 1 or all in a family

 		// Now try all the GPUs
-		if ok, estimatedVRAM = llm.PredictServerFit(sgl, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
-			slog.Debug("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", sgl[0].Library, "required", format.HumanBytes2(estimatedVRAM))
-			return sgl
+		for _, p := range numParallelToTry {
+			req.opts.NumCtx = req.origNumCTX * p
+			if ok, estimatedVRAM = llm.PredictServerFit(sgl, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
+				slog.Info("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", sgl[0].Library, "parallel", p, "required", format.HumanBytes2(estimatedVRAM))
+				*numParallel = p
+				return sgl
+			}
 		}
 	}
 	return nil

server/sched_test.go

@@ -47,11 +47,11 @@ func TestLoad(t *testing.T) {
 		sessionDuration: 2,
 	}
 	// Fail to load model first
-	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
+	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
 		return nil, fmt.Errorf("something failed to load model blah")
 	}
 	gpus := gpu.GpuInfoList{}
-	s.load(req, ggml, gpus)
+	s.load(req, ggml, gpus, 0)
 	require.Empty(t, req.successCh)
 	require.Len(t, req.errCh, 1)
 	s.loadedMu.Lock()
@@ -61,10 +61,10 @@ func TestLoad(t *testing.T) {
 	require.Contains(t, err.Error(), "this model may be incompatible")

 	server := &mockLlm{estimatedVRAM: 10, estimatedVRAMByGPU: map[string]uint64{}}
-	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
+	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
 		return server, nil
 	}
-	s.load(req, ggml, gpus)
+	s.load(req, ggml, gpus, 0)
 	select {
 	case err := <-req.errCh:
 		require.NoError(t, err)
@@ -78,12 +78,12 @@ func TestLoad(t *testing.T) {
 	req.model.ModelPath = "dummy_model_path"
 	server.waitResp = fmt.Errorf("wait failure")
-	s.load(req, ggml, gpus)
+	s.load(req, ggml, gpus, 0)
 	select {
 	case err := <-req.errCh:
 		require.Contains(t, err.Error(), "wait failure")
 	case resp := <-req.successCh:
-		t.Errorf("unexpected success %v", resp)
+		t.Fatalf("unexpected success %v", resp)
 	}
 	s.loadedMu.Lock()
 	runner := s.loaded["dummy_model_path"]
@@ -102,7 +102,7 @@ type bundle struct {
 	ggml *llm.GGML
 }

-func (scenario *bundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
+func (scenario *bundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
 	return scenario.srv, nil
 }
@@ -200,7 +200,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario1a.req.errCh)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}

 	// Same runner as first request due to not needing a reload
@@ -213,7 +213,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario1b.req.errCh)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}

 	// Trigger a reload
@@ -231,7 +231,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario2a.req.errCh)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}

 	envconfig.MaxRunners = 1
@@ -247,7 +247,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario3a.req.errCh)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 1)
@@ -263,7 +263,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario3b.req.errCh)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 2)
@@ -279,7 +279,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario3c.req.errCh)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 3)
@@ -306,7 +306,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario3d.req.errCh)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 2)
@@ -349,7 +349,7 @@ func TestGetRunner(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, errCh1a)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	scenario1a.ctxDone()
 	s.loadedMu.Lock()
@@ -400,7 +400,7 @@ func TestPrematureExpired(t *testing.T) {
 		slog.Info("sending premature expired event now")
 		s.expiredCh <- resp // Shouldn't happen in real life, but make sure its safe
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	time.Sleep(scenario1a.req.sessionDuration)
 	scenario1a.ctxDone()
@@ -427,7 +427,7 @@ func TestUseLoadedRunner(t *testing.T) {
 	}
 	finished := make(chan *LlmRequest)
 	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
-	r1 := &runnerRef{llama: llm1, sessionDuration: 1}
+	r1 := &runnerRef{llama: llm1, sessionDuration: 1, numParallel: 1}
 	req.useLoadedRunner(r1, finished)
 	require.Equal(t, uint(1), r1.refCount)
 	require.Equal(t, time.Duration(2), r1.sessionDuration)
@@ -435,7 +435,7 @@ func TestUseLoadedRunner(t *testing.T) {
 	case success := <-req.successCh:
 		require.Equal(t, r1, success)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	done()
 	fin := <-finished
@@ -461,8 +461,8 @@ func TestUpdateFreeSpace(t *testing.T) {
 	gpus[1].FreeMemory = 1900
 	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 50, "2": 50}}
 	llm2 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 125, "2": 75}}
-	r1 := &runnerRef{llama: llm1, gpus: gpus}
-	r2 := &runnerRef{llama: llm2, gpus: gpus}
+	r1 := &runnerRef{llama: llm1, gpus: gpus, numParallel: 1}
+	r2 := &runnerRef{llama: llm2, gpus: gpus, numParallel: 1}

 	s := InitScheduler(ctx)
 	s.loadedMu.Lock()
@@ -513,8 +513,8 @@ func TestFindRunnerToUnload(t *testing.T) {
 	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
 	defer done()

-	r1 := &runnerRef{refCount: 1, sessionDuration: 1}
-	r2 := &runnerRef{sessionDuration: 2}
+	r1 := &runnerRef{refCount: 1, sessionDuration: 1, numParallel: 1}
+	r2 := &runnerRef{sessionDuration: 2, numParallel: 1}

 	s := InitScheduler(ctx)
 	s.loadedMu.Lock()
@@ -536,9 +536,13 @@ func TestNeedsReload(t *testing.T) {
 	llm := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
 	do := api.DefaultOptions()
 	runner := &runnerRef{
-		model:   &Model{AdapterPaths: []string{"adapter1"}, ProjectorPaths: []string{"projector1"}},
-		Options: &do,
-		llama:   llm,
+		model: &Model{
+			AdapterPaths:   []string{"adapter1"},
+			ProjectorPaths: []string{"projector1"},
+		},
+		Options:     &do,
+		llama:       llm,
+		numParallel: 1,
 	}
 	req := &LlmRequest{
 		model: &Model{
@@ -581,8 +585,8 @@ func TestUnloadAllRunners(t *testing.T) {
 	s := InitScheduler(ctx)
 	s.unloadAllRunners()

-	r1 := &runnerRef{llama: llm1}
-	r2 := &runnerRef{llama: llm2}
+	r1 := &runnerRef{llama: llm1, numParallel: 1}
+	r2 := &runnerRef{llama: llm2, numParallel: 1}

 	s.loadedMu.Lock()
 	s.loaded["a"] = r1
@@ -596,14 +600,32 @@ func TestUnloadAllRunners(t *testing.T) {

 func TestUnload(t *testing.T) {
 	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
-	r1 := &runnerRef{llama: llm1}
-	r2 := &runnerRef{model: &Model{AdapterPaths: []string{"A"}}}
+	r1 := &runnerRef{llama: llm1, numParallel: 1}
+	r2 := &runnerRef{model: &Model{AdapterPaths: []string{"A"}}, numParallel: 1}
 	r1.unload()
 	require.True(t, llm1.closeCalled)
 	r2.unload()
 	require.Nil(t, r2.model)
 }

+func TestAlreadyCanceled(t *testing.T) {
+	ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer done()
+	dctx, done2 := context.WithCancel(ctx)
+	done2()
+	scenario1a := newScenario(t, dctx, "ollama-model-1", 10)
+	scenario1a.req.sessionDuration = 0
+	s := InitScheduler(ctx)
+	slog.Info("scenario1a")
+	s.pendingReqCh <- scenario1a.req
+	require.Len(t, s.pendingReqCh, 1)
+	s.Run(ctx)
+	time.Sleep(5 * time.Millisecond)
+	require.Empty(t, s.pendingReqCh)
+	require.Empty(t, scenario1a.req.errCh)
+	require.Empty(t, scenario1a.req.successCh)
+}
+
 type mockLlm struct {
 	pingResp error
 	waitResp error