Prevent multiple concurrent loads on the same gpus

While models are loading, the VRAM metrics are dynamic, so try
to load on a GPU that doesn't have a model actively loading, or wait
to avoid races that lead to OOMs
This commit is contained in:
Daniel Hiltgen 2024-06-04 14:08:36 -07:00
parent fc37c192ae
commit ff4f0cbd1d

View file

@ -26,6 +26,7 @@ type LlmRequest struct {
sessionDuration time.Duration sessionDuration time.Duration
successCh chan *runnerRef successCh chan *runnerRef
errCh chan error errCh chan error
schedAttempts uint
} }
type Scheduler struct { type Scheduler struct {
@ -37,10 +38,11 @@ type Scheduler struct {
loaded map[string]*runnerRef loaded map[string]*runnerRef
loadedMu sync.Mutex loadedMu sync.Mutex
loadFn func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) loadFn func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
newServerFn func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) newServerFn func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
getGpuFn func() gpu.GpuInfoList getGpuFn func() gpu.GpuInfoList
getCpuFn func() gpu.GpuInfoList getCpuFn func() gpu.GpuInfoList
reschedDelay time.Duration
} }
var ErrMaxQueue = fmt.Errorf("server busy, please try again. maximum pending requests exceeded") var ErrMaxQueue = fmt.Errorf("server busy, please try again. maximum pending requests exceeded")
@ -55,6 +57,7 @@ func InitScheduler(ctx context.Context) *Scheduler {
newServerFn: llm.NewLlamaServer, newServerFn: llm.NewLlamaServer,
getGpuFn: gpu.GetGPUInfo, getGpuFn: gpu.GetGPUInfo,
getCpuFn: gpu.GetCPUInfo, getCpuFn: gpu.GetCPUInfo,
reschedDelay: 250 * time.Millisecond,
} }
sched.loadFn = sched.load sched.loadFn = sched.load
return sched return sched
@ -106,6 +109,7 @@ func (s *Scheduler) processPending(ctx context.Context) {
return return
case pending := <-s.pendingReqCh: case pending := <-s.pendingReqCh:
// Block other requests until we get this pending request running // Block other requests until we get this pending request running
pending.schedAttempts++
if pending.ctx.Err() != nil { if pending.ctx.Err() != nil {
slog.Debug("pending request cancelled or timed out, skipping scheduling") slog.Debug("pending request cancelled or timed out, skipping scheduling")
@ -172,13 +176,39 @@ func (s *Scheduler) processPending(ctx context.Context) {
} }
if runnerToExpire == nil { if runnerToExpire == nil {
// More than one loaded model, so we have to see if the new one fits // More than one loaded model, so we have to see if the
// new one fits
//
// We want to avoid loading on any GPUs that have other
// models still loading on them to avoid potential races
// with VRAM consumption ramping up during load
availGpus := s.filterGPUsWithLoadingModels(gpus)
// Update free memory from currently loaded models // Update free memory from currently loaded models
s.updateFreeSpace(gpus) s.updateFreeSpace(availGpus)
gpus = pickBestFitGPUs(pending, ggml, gpus) fitGpus := pickBestFitGPUs(pending, ggml, availGpus)
if gpus != nil { if fitGpus != nil {
slog.Debug("new model fits with existing models, loading") slog.Debug("new model fits with existing models, loading")
s.loadFn(pending, ggml, gpus) s.loadFn(pending, ggml, fitGpus)
break
}
// We couldn't find a set of GPUs to fully load the new
// model. If no other models are loading (both GPU lists
// are the same) then we need to unload another model to
// make room
if len(availGpus) < len(gpus) {
// There are other requests pending, and this one
// needs more time, so put it on the back of the
// queue so that we might satisfy other pending
// requests that aren't blocked
go func() {
// Process in a go routine to avoid deadlocking
// the scheduler if our queue is full
slog.Debug("delaying scheduling while other models finish loading", "attempts", pending.schedAttempts, "model", pending.model.ModelPath)
time.Sleep(s.reschedDelay)
s.pendingReqCh <- pending
}()
break break
} }
runnerToExpire = s.findRunnerToUnload() runnerToExpire = s.findRunnerToUnload()
@ -409,11 +439,36 @@ func (s *Scheduler) updateFreeSpace(allGpus gpu.GpuInfoList) {
// after we start our first runner, then we'll never acount for that, so picking the smallest free value seems prudent. // after we start our first runner, then we'll never acount for that, so picking the smallest free value seems prudent.
allGpus[i].FreeMemory = allGpus[i].TotalMemory - p allGpus[i].FreeMemory = allGpus[i].TotalMemory - p
} }
slog.Info("updated VRAM", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "total", format.HumanBytes2(allGpus[i].TotalMemory), "available", format.HumanBytes2(allGpus[i].FreeMemory)) slog.Info("updated VRAM based on existing loaded models", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "total", format.HumanBytes2(allGpus[i].TotalMemory), "available", format.HumanBytes2(allGpus[i].FreeMemory))
} }
} }
} }
// While models are loading the VRAM consumption numbers will be indeterminate, so we have
// to avoid scheduling another model on the same GPU(s) that haven't stabilized.
// This routine returns the set of GPUs that do not have an active loading model.
// If all GPUs have loading models, an empty list will be returned (not a single CPU entry)
func (s *Scheduler) filterGPUsWithLoadingModels(allGpus gpu.GpuInfoList) gpu.GpuInfoList {
ret := append(gpu.GpuInfoList{}, allGpus...)
s.loadedMu.Lock()
defer s.loadedMu.Unlock()
for _, runner := range s.loaded {
if runner.loading {
slog.Debug("overlapping loads detected", "gpus", runner.gpus, "model", runner.modelPath)
for _, busyGPU := range runner.gpus {
for i := range ret {
if ret[i].ID == busyGPU.ID {
ret = append(ret[:i], ret[i+1:]...)
break
}
}
}
}
}
return ret
}
// TODO consolidate sched_types.go
type runnerRef struct { type runnerRef struct {
refMu sync.Mutex refMu sync.Mutex
// refCond sync.Cond // Signaled on transition from 1 -> 0 refCount // refCond sync.Cond // Signaled on transition from 1 -> 0 refCount
@ -519,7 +574,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan interface{} {
for { for {
<-ticker.C <-ticker.C
if time.Now().After(expiresAt) { if time.Now().After(expiresAt) {
slog.Warn("gpu VRAM usage didn't recover within timeout", "seconds", time.Since(start).Seconds()) slog.Warn("gpu VRAM usage didn't recover within timeout", "seconds", time.Since(start).Seconds(), "model", runner.modelPath)
finished <- struct{}{} finished <- struct{}{}
} }
@ -532,7 +587,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan interface{} {
} }
// If we're within ~80% of the estimated memory usage recovered, bail out // If we're within ~80% of the estimated memory usage recovered, bail out
if float32(freeMemoryNow-freeMemoryBefore) > float32(runner.estimatedVRAM)*0.8 { if float32(freeMemoryNow-freeMemoryBefore) > float32(runner.estimatedVRAM)*0.8 {
slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds())) slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds()), "model", runner.modelPath)
finished <- struct{}{} finished <- struct{}{}
return return
} }