package server

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"reflect"
	"runtime"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/ollama/ollama/api"
	"github.com/ollama/ollama/envconfig"
	"github.com/ollama/ollama/format"
	"github.com/ollama/ollama/gpu"
	"github.com/ollama/ollama/llm"
)

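// LlmRequest is a single pending request for a model runner. The scheduler
// answers on successCh or errCh, and the request's ctx controls how long the
// caller holds the runner once it has been granted.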
type LlmRequest struct {
	ctx             context.Context //nolint:containedctx
	model           *Model
	opts            api.Options
	origNumCtx      int // Track the initial ctx request
	sessionDuration time.Duration
	successCh       chan *runnerRef
	errCh           chan error
	schedAttempts   uint
}

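// Scheduler serializes model loads and unloads. processPending drains
// pendingReqCh and decides whether to reuse, load, or evict a runner, while
// processCompleted handles finished requests, expiration timers, and unloads.
// The function-valued fields (loadFn, newServerFn, getGpuFn, getCpuFn) default
// to the real implementations and can be overridden, e.g. in tests.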
type Scheduler struct {
	pendingReqCh  chan *LlmRequest
	finishedReqCh chan *LlmRequest
	expiredCh     chan *runnerRef
	unloadedCh    chan interface{}

	loaded   map[string]*runnerRef
	loadedMu sync.Mutex

	loadFn       func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList, numParallel int)
	newServerFn  func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error)
	getGpuFn     func() gpu.GpuInfoList
	getCpuFn     func() gpu.GpuInfoList
	reschedDelay time.Duration
}

// Default automatic value for number of models we allow per GPU
// Model will still need to fit in VRAM, but loading many small models
// on a large GPU can cause stalling
var defaultModelsPerGPU = 3

// Default automatic value for parallel setting
// Model will still need to fit in VRAM. If this setting won't fit
// we'll back off down to 1 to try to get it to fit
var defaultParallel = 4

var ErrMaxQueue = fmt.Errorf("server busy, please try again. maximum pending requests exceeded")

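// InitScheduler builds a Scheduler whose channels are sized from
// envconfig.MaxQueuedRequests and whose hooks point at the real llm/gpu
// implementations. Run must be called to start the background loops.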
func InitScheduler(ctx context.Context) *Scheduler {
	sched := &Scheduler{
		pendingReqCh:  make(chan *LlmRequest, envconfig.MaxQueuedRequests),
		finishedReqCh: make(chan *LlmRequest, envconfig.MaxQueuedRequests),
		expiredCh:     make(chan *runnerRef, envconfig.MaxQueuedRequests),
		unloadedCh:    make(chan interface{}, envconfig.MaxQueuedRequests),
		loaded:        make(map[string]*runnerRef),
		newServerFn:   llm.NewLlamaServer,
		getGpuFn:      gpu.GetGPUInfo,
		getCpuFn:      gpu.GetCPUInfo,
		reschedDelay:  250 * time.Millisecond,
	}
	sched.loadFn = sched.load
	return sched
}

// context must be canceled to decrement ref count and release the runner
func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration time.Duration) (chan *runnerRef, chan error) {
	if opts.NumCtx < 4 {
		opts.NumCtx = 4
	}

	req := &LlmRequest{
		ctx:             c,
		model:           model,
		opts:            opts,
		sessionDuration: sessionDuration,
		successCh:       make(chan *runnerRef),
		errCh:           make(chan error, 1),
	}

	select {
	case s.pendingReqCh <- req:
	default:
		req.errCh <- ErrMaxQueue
	}
	return req.successCh, req.errCh
}

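// A minimal caller-side sketch (variable names are illustrative, not part of
// this package's API):
//
//	sched := InitScheduler(ctx)
//	sched.Run(ctx)
//	successCh, errCh := sched.GetRunner(reqCtx, model, opts, 5*time.Minute)
//	select {
//	case runner := <-successCh:
//		_ = runner // use the runner, then cancel reqCtx to release it
//	case err := <-errCh:
//		slog.Error("unable to schedule runner", "error", err)
//	}
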
// Run returns immediately and spawns the scheduler's goroutines, which shut down when ctx is done
func (s *Scheduler) Run(ctx context.Context) {
	slog.Debug("starting llm scheduler")
	go func() {
		s.processPending(ctx)
	}()
	go func() {
		s.processCompleted(ctx)
	}()
}

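// processPending is the scheduling loop: it pulls a pending request, reconciles
// parallelism with NumCtx, and then either reuses an already loaded runner,
// loads the model onto suitable GPUs (or CPU), or expires an existing runner to
// make room.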
func (s *Scheduler) processPending(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			slog.Debug("shutting down scheduler pending loop")
			return
		case pending := <-s.pendingReqCh:
			// Block other requests until we get this pending request running
			pending.schedAttempts++
			if pending.origNumCtx == 0 {
				pending.origNumCtx = pending.opts.NumCtx
			}

			if pending.ctx.Err() != nil {
				slog.Debug("pending request cancelled or timed out, skipping scheduling")
				continue
			}
			numParallel := envconfig.NumParallel
			// TODO (jmorganca): multimodal models don't support parallel yet
			// see https://github.com/ollama/ollama/issues/4165
			if len(pending.model.ProjectorPaths) > 0 && numParallel != 1 {
				numParallel = 1
				slog.Warn("multimodal models don't support parallel requests yet")
			}
			// Keep NumCtx and numParallel in sync
			if numParallel > 1 {
				pending.opts.NumCtx = pending.origNumCtx * numParallel
			}
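			// For example, origNumCtx=2048 with numParallel=4 yields opts.NumCtx=8192,
			// i.e. one 2048-token context per parallel slot; needsReload divides by
			// numParallel again when comparing options (illustration only).
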
			for {
				var runnerToExpire *runnerRef
				s.loadedMu.Lock()
				runner := s.loaded[pending.model.ModelPath]
				loadedCount := len(s.loaded)
				s.loadedMu.Unlock()
				if runner != nil {
					if runner.needsReload(ctx, pending) {
						runnerToExpire = runner
					} else {
						// Runner is usable, return it
						pending.useLoadedRunner(runner, s.finishedReqCh)
						break
					}
				} else if envconfig.MaxRunners > 0 && loadedCount >= envconfig.MaxRunners {
					slog.Debug("max runners achieved, unloading one to make room", "runner_count", loadedCount)
					runnerToExpire = s.findRunnerToUnload()
				} else {
					// Either no models are loaded or below envconfig.MaxRunners
					// Get a refreshed GPU list
					var gpus gpu.GpuInfoList
					if pending.opts.NumGPU == 0 {
						gpus = s.getCpuFn()
					} else {
						gpus = s.getGpuFn()
					}
2024-06-19 20:35:38 +00:00
if envconfig . MaxRunners <= 0 {
// No user specified MaxRunners, so figure out what automatic setting to use
// If all GPUs have reliable free memory reporting, defaultModelsPerGPU * the number of GPUs
// if any GPU has unreliable free memory reporting, 1x the number of GPUs
allReliable := true
for _ , gpu := range gpus {
if gpu . UnreliableFreeMemory {
allReliable = false
break
}
}
if allReliable {
envconfig . MaxRunners = defaultModelsPerGPU * len ( gpus )
slog . Debug ( "updating default concurrency" , "OLLAMA_MAX_LOADED_MODELS" , envconfig . MaxRunners , "gpu_count" , len ( gpus ) )
} else {
slog . Info ( "one or more GPUs detected that are unable to accurately report free memory - disabling default concurrency" )
envconfig . MaxRunners = len ( gpus )
}
}
2024-04-24 23:17:24 +00:00
2024-04-24 23:37:03 +00:00

					// Load model for fitting
					ggml, err := llm.LoadModel(pending.model.ModelPath, 0)
					if err != nil {
						pending.errCh <- err
						break
					}

					// Evaluate if the model will fit in the available system memory, or if we should unload a model first
					if len(gpus) == 1 && gpus[0].Library == "cpu" {
						// simplifying assumption of defaultParallel when in CPU mode
						if numParallel <= 0 {
							numParallel = defaultParallel
							pending.opts.NumCtx = pending.origNumCtx * numParallel
						}

						if loadedCount == 0 {
							slog.Debug("cpu mode with first model, loading")
							s.loadFn(pending, ggml, gpus, numParallel)
							break
						}
						runnerToExpire = s.maybeFindCPURunnerToUnload(pending, ggml, gpus)
						if runnerToExpire == nil {
							slog.Debug("cpu mode with available system memory or first model, loading")
							s.loadFn(pending, ggml, gpus, numParallel)
							break
						}
						// else we need to expire a runner
					} else if loadedCount == 0 {
						// No models loaded. Load the model but prefer the best fit.
						slog.Debug("loading first model", "model", pending.model.ModelPath)
						g := pickBestFitGPUs(pending, ggml, gpus, &numParallel)
						if g != nil {
							gpus = g
						}
						s.loadFn(pending, ggml, gpus, numParallel)
						break
					}

					if runnerToExpire == nil {
						// More than one loaded model, so we have to see if the
						// new one fits
						//
						// We want to avoid loading on any GPUs that have other
						// models still loading on them to avoid potential races
						// with VRAM consumption ramping up during load
						availGpus := s.filterGPUsWithoutLoadingModels(gpus)

						// Update free memory from currently loaded models
						s.updateFreeSpace(availGpus)
						fitGpus := pickBestFitGPUs(pending, ggml, availGpus, &numParallel)
						if fitGpus != nil {
							slog.Debug("new model fits with existing models, loading")
							s.loadFn(pending, ggml, fitGpus, numParallel)
							break
						}

						// We couldn't find a set of GPUs to fully load the new
						// model. If no other models are loading (both GPU lists
						// are the same) then we need to unload another model to
						// make room
						if len(availGpus) < len(gpus) {
							// There are other requests pending, and this one
							// needs more time, so put it on the back of the
							// queue so that we might satisfy other pending
							// requests that aren't blocked
							go func() {
								// Process in a goroutine to avoid deadlocking
								// the scheduler if our queue is full
								slog.Debug("delaying scheduling while other models finish loading", "attempts", pending.schedAttempts, "model", pending.model.ModelPath)
								time.Sleep(s.reschedDelay)
								s.pendingReqCh <- pending
							}()
							break
						}
						runnerToExpire = s.findRunnerToUnload()
					}
				}

				if runnerToExpire == nil {
					// Shouldn't happen
					slog.Error("runner to expire was nil!")
					continue
				}
				// Trigger an expiration to unload once it's done
				runnerToExpire.refMu.Lock()
				slog.Debug("resetting model to expire immediately to make room", "modelPath", runnerToExpire.modelPath, "refCount", runnerToExpire.refCount)
				if runnerToExpire.expireTimer != nil {
					runnerToExpire.expireTimer.Stop()
					runnerToExpire.expireTimer = nil
				}
				runnerToExpire.sessionDuration = 0
				if runnerToExpire.refCount <= 0 {
					s.expiredCh <- runnerToExpire
				}
				runnerToExpire.refMu.Unlock()
				// Wait for the unload to happen
				// Note: at this point we're queueing up all incoming requests, even if they were for
				// a different model that's loaded and not scheduled to be removed.
				slog.Debug("waiting for pending requests to complete and unload to occur", "modelPath", runnerToExpire.modelPath)
				select {
				case <-ctx.Done():
					slog.Debug("shutting down scheduler pending loop")
					return
				case <-s.unloadedCh:
					slog.Debug("unload completed", "modelPath", runnerToExpire.modelPath)
					continue
				}
			}
		case <-s.unloadedCh:
			// An unload request when there are no pending requests can be ignored
			slog.Debug("ignoring unload event with no pending requests")
		}
	}
}

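// processCompleted drives a runner's post-request lifecycle: it decrements the
// reference count when a request finishes, arms or resets the expiration timer
// based on sessionDuration, and performs the actual unload (including the
// VRAM-recovery wait and unloadedCh notification) once an expired runner has
// no remaining references.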
func (s *Scheduler) processCompleted(ctx context.Context) {
	// Process completed requests, expired timers, and unloading models
	for {
		select {
		case <-ctx.Done():
			slog.Debug("shutting down scheduler completed loop")
			return
		case finished := <-s.finishedReqCh:
			s.loadedMu.Lock()
			runner := s.loaded[finished.model.ModelPath]
			s.loadedMu.Unlock()
			if runner == nil {
				slog.Error("finished request signal received after model unloaded", "modelPath", finished.model.ModelPath)
				continue
			}
			runner.refMu.Lock()
			runner.refCount--
			if runner.refCount <= 0 {
				if runner.sessionDuration <= 0 {
					slog.Debug("runner with zero duration has gone idle, expiring to unload", "modelPath", runner.modelPath)
					if runner.expireTimer != nil {
						runner.expireTimer.Stop()
						runner.expireTimer = nil
					}
					s.expiredCh <- runner
				} else if runner.expireTimer == nil {
					slog.Debug("runner with non-zero duration has gone idle, adding timer", "modelPath", runner.modelPath, "duration", runner.sessionDuration)
					runner.expireTimer = time.AfterFunc(runner.sessionDuration, func() {
						slog.Debug("timer expired, expiring to unload", "modelPath", runner.modelPath)
						runner.refMu.Lock()
						defer runner.refMu.Unlock()
						if runner.expireTimer != nil {
							runner.expireTimer.Stop()
							runner.expireTimer = nil
						}
						s.expiredCh <- runner
					})
					runner.expiresAt = time.Now().Add(runner.sessionDuration)
				} else {
					slog.Debug("runner with non-zero duration has gone idle, resetting timer", "modelPath", runner.modelPath, "duration", runner.sessionDuration)
					runner.expireTimer.Reset(runner.sessionDuration)
					runner.expiresAt = time.Now().Add(runner.sessionDuration)
				}
			}
			slog.Debug("after processing request finished event", "modelPath", runner.modelPath, "refCount", runner.refCount)
			runner.refMu.Unlock()
		case runner := <-s.expiredCh:
			slog.Debug("runner expired event received", "modelPath", runner.modelPath)
			runner.refMu.Lock()
			if runner.refCount > 0 {
				// Shouldn't happen, but safeguard to ensure no leaked runners
				slog.Debug("expired event with positive ref count, retrying", "modelPath", runner.modelPath, "refCount", runner.refCount)
				go func(runner *runnerRef) {
					// We can't unload yet, but want to as soon as the current request completes
					// So queue up another expired event
					time.Sleep(10 * time.Millisecond)
					s.expiredCh <- runner
				}(runner)
				runner.refMu.Unlock()
				continue
			}

			s.loadedMu.Lock()
			slog.Debug("got lock to unload", "modelPath", runner.modelPath)
			finished := runner.waitForVRAMRecovery()
			runner.unload()
			delete(s.loaded, runner.modelPath)
			s.loadedMu.Unlock()
			slog.Debug("runner released", "modelPath", runner.modelPath)
			runner.refMu.Unlock()

			<-finished
			slog.Debug("sending an unloaded event", "modelPath", runner.modelPath)
			s.unloadedCh <- struct{}{}
		}
	}
}

// useLoadedRunner completes the pending request with an already loaded runner:
// it bumps the ref count, updates the session duration, resets any expiration
// timer, and wires up a finished event for when the request context completes.
func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *LlmRequest) {
	runner.refMu.Lock()
	defer runner.refMu.Unlock()
	runner.refCount++
	if runner.expireTimer != nil {
		runner.expireTimer.Stop()
		runner.expireTimer = nil
	}
	runner.sessionDuration = pending.sessionDuration
	pending.successCh <- runner
	go func() {
		<-pending.ctx.Done()
		slog.Debug("context for request finished")
		finished <- pending
	}()
}

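// load starts a new llama server for the request on the chosen GPUs and
// registers the runner in s.loaded while it is still loading; the runner is
// handed to the caller (or an error reported and the runner expired) once
// WaitUntilRunning resolves.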
func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList, numParallel int) {
	if numParallel < 1 {
		numParallel = 1
	}
	llama, err := s.newServerFn(gpus, req.model.ModelPath, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts, numParallel)
	if err != nil {
		// some older models are not compatible with newer versions of llama.cpp
		// show a generalized compatibility error until there is a better way to
		// check for model compatibility
		if errors.Is(err, llm.ErrUnsupportedFormat) || strings.Contains(err.Error(), "failed to load model") {
			err = fmt.Errorf("%v: this model may be incompatible with your version of Ollama. If you previously pulled this model, try updating it by running `ollama pull %s`", err, req.model.ShortName)
		}
		slog.Info("NewLlamaServer failed", "model", req.model.ModelPath, "error", err)
		req.errCh <- err
		return
	}

	runner := &runnerRef{
		model:           req.model,
		modelPath:       req.model.ModelPath,
		llama:           llama,
		Options:         &req.opts,
		sessionDuration: req.sessionDuration,
		gpus:            gpus,
		estimatedVRAM:   llama.EstimatedVRAM(),
		estimatedTotal:  llama.EstimatedTotal(),
		loading:         true,
		refCount:        1,
	}
	runner.numParallel = numParallel
	runner.refMu.Lock()

	s.loadedMu.Lock()
	s.loaded[req.model.ModelPath] = runner
	slog.Info("loaded runners", "count", len(s.loaded))
	s.loadedMu.Unlock()

	go func() {
		defer runner.refMu.Unlock()
		if err = llama.WaitUntilRunning(req.ctx); err != nil {
			slog.Error("error loading llama server", "error", err)
			runner.refCount--
			req.errCh <- err
			slog.Debug("triggering expiration for failed load", "model", runner.modelPath)
			s.expiredCh <- runner
			return
		}
		slog.Debug("finished setting up runner", "model", req.model.ModelPath)
		runner.loading = false
		go func() {
			<-req.ctx.Done()
			slog.Debug("context for request finished")
			s.finishedReqCh <- req
		}()
		req.successCh <- runner
	}()
}

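// updateFreeSpace sums the VRAM estimates of all loaded runners per GPU and
// lowers each GPU's FreeMemory to total minus that estimate whenever the
// driver reports a more optimistic number, so placement decisions account for
// allocations the driver has not yet reflected.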
func (s *Scheduler) updateFreeSpace(allGpus gpu.GpuInfoList) {
	type predKey struct {
		Library string
		ID      string
	}
	predMap := map[predKey]uint64{} // Sum up the total predicted usage per GPU for all runners
	s.loadedMu.Lock()
	for _, r := range s.loaded {
		r.refMu.Lock()
		if r.llama != nil {
			for _, gpu := range allGpus {
				predMap[predKey{gpu.Library, gpu.ID}] += r.llama.EstimatedVRAMByGPU(gpu.ID)
			}
		} else {
			slog.Warn("unexpected nil runner reference, memory prediction may be incorrect")
		}
		r.refMu.Unlock()
	}
	s.loadedMu.Unlock()

	// Now that we've summed up all the GPU usage predictions across all the loaded runners, update the gpu list
	for i := range allGpus {
		if p, ok := predMap[predKey{allGpus[i].Library, allGpus[i].ID}]; ok {
			slog.Debug("gpu reported", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "available", format.HumanBytes2(allGpus[i].FreeMemory))
			if p > allGpus[i].TotalMemory {
				// Shouldn't happen
				slog.Warn("predicted usage exceeds VRAM", "gpu", allGpus[i].ID, "totalMemory", allGpus[i].TotalMemory, "predicted", p)
				allGpus[i].FreeMemory = 0
			} else if (allGpus[i].TotalMemory - p) < allGpus[i].FreeMemory { // predicted free is smaller than reported free, use it
				// TODO maybe we should just always trust our numbers, since cuda's free memory reporting is laggy
				// and we might unload models we didn't actually need to. The risk is if some other GPU intensive app is loaded
				// after we start our first runner, then we'll never account for that, so picking the smallest free value seems prudent.
				allGpus[i].FreeMemory = allGpus[i].TotalMemory - p
			}
			slog.Info("updated VRAM based on existing loaded models", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "total", format.HumanBytes2(allGpus[i].TotalMemory), "available", format.HumanBytes2(allGpus[i].FreeMemory))
		}
	}
}

// While models are loading the VRAM consumption numbers will be indeterminate, so we have
// to avoid scheduling another model on the same GPU(s) until those numbers have stabilized.
// This routine returns the set of GPUs that do not have an active loading model.
// If all GPUs have loading models, an empty list will be returned (not a single CPU entry)
func (s *Scheduler) filterGPUsWithoutLoadingModels(allGpus gpu.GpuInfoList) gpu.GpuInfoList {
	ret := append(gpu.GpuInfoList{}, allGpus...)
	s.loadedMu.Lock()
	defer s.loadedMu.Unlock()
	for _, runner := range s.loaded {
		if runner.loading {
			slog.Debug("overlapping loads detected", "gpus", runner.gpus, "model", runner.modelPath)
			for _, busyGPU := range runner.gpus {
				for i := range ret {
					if ret[i].ID == busyGPU.ID {
						ret = append(ret[:i], ret[i+1:]...)
						break
					}
				}
			}
		}
	}
	return ret
}

// TODO consolidate sched_types.go
type runnerRef struct {
	refMu sync.Mutex
	// refCond   sync.Cond // Signaled on transition from 1 -> 0 refCount
	refCount uint // prevent unloading if > 0
	// unloading bool      // set to true when we are trying to unload the runner

	llama          llm.LlamaServer
	loading        bool            // True only during initial load, then false forever
	gpus           gpu.GpuInfoList // Recorded at time of provisioning
	estimatedVRAM  uint64
	estimatedTotal uint64

	sessionDuration time.Duration
	expireTimer     *time.Timer
	expiresAt       time.Time

	model       *Model
	modelPath   string
	numParallel int
	*api.Options
}

// The refMu must already be held when calling unload
func (runner *runnerRef) unload() {
	if runner.expireTimer != nil {
		runner.expireTimer.Stop()
		runner.expireTimer = nil
	}
	if runner.llama != nil {
		runner.llama.Close()
	}
	runner.model = nil
	runner.llama = nil
	runner.Options = nil
	runner.gpus = nil
}

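// needsReload reports whether the loaded runner cannot serve the request as-is:
// changed adapters, projectors, or runner options (with NumGPU and NumCtx
// normalized first), or a failed health ping, all force an unload and reload.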
func (runner *runnerRef) needsReload(ctx context.Context, req *LlmRequest) bool {
	slog.Debug("evaluating already loaded", "model", req.model.ModelPath)
	runner.refMu.Lock()
	defer runner.refMu.Unlock()

	timeout := 10 * time.Second
	if runner.loading {
		timeout = 2 * time.Minute // Initial load can take a long time for big models on slow systems...
	}

	if runner.Options == nil {
		return true
	}

	// Don't reload runner if num_gpu=-1 was provided
	optsExisting := runner.Options.Runner
	optsNew := req.opts.Runner
	if optsNew.NumGPU < 0 {
		optsExisting.NumGPU = -1
		optsNew.NumGPU = -1
	}

	// Normalize the NumCtx for parallelism
	optsExisting.NumCtx = optsExisting.NumCtx / runner.numParallel

	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if !reflect.DeepEqual(runner.model.AdapterPaths, req.model.AdapterPaths) || // have the adapters changed?
		!reflect.DeepEqual(runner.model.ProjectorPaths, req.model.ProjectorPaths) || // have the projectors changed?
		!reflect.DeepEqual(optsExisting, optsNew) || // have the runner options changed?
		runner.llama.Ping(ctx) != nil {
		return true
	}

	return false
}

// Free memory reporting on GPUs can lag for a while even after the runner
// exits, so we have to keep checking until we see the available memory recover,
// otherwise subsequent model loads will get far fewer layers loaded or, worst
// case, may completely fall back to CPU mode.
// This routine must be called before the runner unloads so it can establish
// a before and after GPU memory allocation. The returned channel
// will be notified when we're done waiting, or have timed out and should
// proceed anyway
func (runner *runnerRef) waitForVRAMRecovery() chan interface{} {
	finished := make(chan interface{}, 1)

	// CPU or Metal don't need checking, so no waiting required
	// windows can page VRAM, only cuda currently can report accurate used vram usage
	if len(runner.gpus) == 0 ||
		(len(runner.gpus) == 1 && (runner.gpus[0].Library == "cpu" || runner.gpus[0].Library == "metal")) ||
		(runtime.GOOS == "windows" && runner.gpus[0].Library != "cuda") {
		finished <- struct{}{}
		return finished
	}
	start := time.Now()

	// Establish a baseline before we unload
	gpusBefore := gpu.GetGPUInfo()
	var totalMemoryBefore, freeMemoryBefore uint64
	for _, gpu := range gpusBefore {
		totalMemoryBefore += gpu.TotalMemory
		freeMemoryBefore += gpu.FreeMemory
	}
	go func() {
		expiresAt := start.Add(5 * time.Second) // typical convergence is 0.5-1.5s
		ticker := time.NewTicker(250 * time.Millisecond)
		defer ticker.Stop()
		for {
			<-ticker.C
			if time.Now().After(expiresAt) {
				slog.Warn("gpu VRAM usage didn't recover within timeout", "seconds", time.Since(start).Seconds(), "model", runner.modelPath)
				finished <- struct{}{}
			}
			// Query GPUs, look for free to go back up
			gpusNow := gpu.GetGPUInfo()
			var totalMemoryNow, freeMemoryNow uint64
			for _, gpu := range gpusNow {
				totalMemoryNow += gpu.TotalMemory
				freeMemoryNow += gpu.FreeMemory
			}
			// If we're within ~80% of the estimated memory usage recovered, bail out
			if float32(freeMemoryNow-freeMemoryBefore) > float32(runner.estimatedVRAM)*0.8 {
				slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds()), "model", runner.modelPath)
				finished <- struct{}{}
				return
			}
		}
	}()
	return finished
}

type ByDuration []*runnerRef

func (a ByDuration) Len() int      { return len(a) }
func (a ByDuration) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByDuration) Less(i, j int) bool {
	// uint64 to turn negative time (never unload) to largest
	return uint64(a[i].sessionDuration) < uint64(a[j].sessionDuration)
}

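// For example, a runner kept alive with a negative sessionDuration ("never
// unload") wraps around to a very large uint64, so it sorts after every finite
// duration and becomes the last candidate findRunnerToUnload will pick.
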
// TODO - future consideration to pick runners based on size
// type BySize []*runnerRef
// func (a BySize) Len() int { return len(a) }
// func (a BySize) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// func (a BySize) Less(i, j int) bool { return a[i].estimatedVRAM < a[j].estimatedVRAM }

// pickBestFitGPUs will try to find the optimal placement of the model in the available GPUs where the model fully fits
// If the model cannot be fit fully within the available GPU(s), nil is returned
// If numParallel is <= 0, this will try to optimize parallelism based on available VRAM, and adjust
// opts.NumCtx accordingly
func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList, numParallel *int) gpu.GpuInfoList {
	var estimatedVRAM uint64

	var numParallelToTry []int
	if *numParallel <= 0 {
		// If no specific parallel setting was provided, try larger, then smaller, always ending with 1
		numParallelToTry = append(numParallelToTry, defaultParallel, 1)
	} else {
		numParallelToTry = []int{*numParallel}
	}

	for _, gl := range gpus.ByLibrary() {
		var ok bool
		sgl := append(make(gpu.GpuInfoList, 0, len(gl)), gl...)

		// TODO - potentially sort by performance capability, existing models loaded, etc.
		// TODO - Eliminate any GPUs that already have envconfig.MaxRunners loaded on them
		// Note: at present, this will favor more VRAM over faster GPU speed in mixed setups
		sort.Sort(sort.Reverse(gpu.ByFreeMemory(sgl)))

		// First attempt to fit the model into a single GPU
		for _, p := range numParallelToTry {
			req.opts.NumCtx = req.origNumCtx * p
			if !envconfig.SchedSpread {
				for _, g := range sgl {
					if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
						slog.Info("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "parallel", p, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM))
						*numParallel = p
						return []gpu.GpuInfo{g}
					}
				}
			}
		}

		// TODO future refinements
		// - if multiple Libraries, see if any single GPU in any Library will fit
		// - try subsets of GPUs instead of just falling back to 1 or all in a family

		// Now try all the GPUs
		for _, p := range numParallelToTry {
			req.opts.NumCtx = req.origNumCtx * p
			if ok, estimatedVRAM = llm.PredictServerFit(sgl, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
				slog.Info("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", sgl[0].Library, "parallel", p, "required", format.HumanBytes2(estimatedVRAM))
				*numParallel = p
				return sgl
			}
		}
	}
	return nil
}

// findRunnerToUnload finds a runner to unload to make room for a new model
func (s *Scheduler) findRunnerToUnload() *runnerRef {
	s.loadedMu.Lock()
	runnerList := make([]*runnerRef, 0, len(s.loaded))
	for _, r := range s.loaded {
		runnerList = append(runnerList, r)
	}
	s.loadedMu.Unlock()
	if len(runnerList) == 0 {
		slog.Debug("no loaded runner to unload")
		return nil
	}

	// In the future we can enhance the algorithm to be smarter about picking the optimal runner to unload
	// e.g., if we have multiple options, will one make room for the request?
	sort.Sort(ByDuration(runnerList))

	// First try to find a runner that's already idle
	for _, runner := range runnerList {
		runner.refMu.Lock()
		rc := runner.refCount
		runner.refMu.Unlock()
		if rc == 0 {
			slog.Debug("found an idle runner to unload")
			return runner
		}
	}

	// None appear idle, just wait for the one with the shortest duration
	slog.Debug("no idle runners, picking the shortest duration", "count", len(runnerList))
	return runnerList[0]
}

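// unloadAllRunners closes every loaded llama server. It does not remove
// entries from s.loaded, so it is presumably only called when the whole
// scheduler is being torn down (an assumption from this file, not an enforced
// contract).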
func (s *Scheduler) unloadAllRunners() {
	s.loadedMu.Lock()
	defer s.loadedMu.Unlock()
	for model, runner := range s.loaded {
		if runner.llama != nil {
			slog.Debug("shutting down runner", "model", model)
			runner.llama.Close()
		}
	}
}

// If other runners are loaded, make sure the pending request will fit in system memory
// If not, pick a runner to unload, else return nil and the request can be loaded
func (s *Scheduler) maybeFindCPURunnerToUnload(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) *runnerRef {
	slog.Debug("evaluating if CPU model load will fit in available system memory")
	estimate := llm.EstimateGPULayers(gpus, ggml, req.model.ProjectorPaths, req.opts)
	if estimate.TotalSize <= gpus[0].FreeMemory {
		slog.Debug("cpu inference mode, model fits in available system memory", "model", format.HumanBytes2(estimate.TotalSize), "available", format.HumanBytes2(gpus[0].FreeMemory))
		return nil
	}

	// TODO - optimization: try to find CPU only runners first, or partial offloads with enough in system memory to make room
	return s.findRunnerToUnload()
}