Make maximum pending request configurable

This also bumps up the default to be 50 queued requests
instead of 10.
This commit is contained in:
Daniel Hiltgen 2024-05-03 16:25:57 -07:00
parent 371f5e52aa
commit 20f6c06569
3 changed files with 37 additions and 23 deletions

View file

@ -232,3 +232,9 @@ curl http://localhost:11434/api/generate -d '{"model": "llama3", "keep_alive": 0
Alternatively, you can change the amount of time all models are loaded into memory by setting the `OLLAMA_KEEP_ALIVE` environment variable when starting the Ollama server. The `OLLAMA_KEEP_ALIVE` variable uses the same parameter types as the `keep_alive` parameter types mentioned above. Refer to section explaining [how to configure the Ollama server](#how-do-i-configure-ollama-server) to correctly set the environment variable. Alternatively, you can change the amount of time all models are loaded into memory by setting the `OLLAMA_KEEP_ALIVE` environment variable when starting the Ollama server. The `OLLAMA_KEEP_ALIVE` variable uses the same parameter types as the `keep_alive` parameter types mentioned above. Refer to section explaining [how to configure the Ollama server](#how-do-i-configure-ollama-server) to correctly set the environment variable.
If you wish to override the `OLLAMA_KEEP_ALIVE` setting, use the `keep_alive` API parameter with the `/api/generate` or `/api/chat` API endpoints. If you wish to override the `OLLAMA_KEEP_ALIVE` setting, use the `keep_alive` API parameter with the `/api/generate` or `/api/chat` API endpoints.
## How do I manage the maximum number of requests the server can queue
If too many requests are sent to the server, it will respond with a 503 error
indicating the server is overloaded. You can adjust how many requests may be
queue by setting `OLLAMA_MAX_QUEUE`

View file

@ -146,12 +146,7 @@ func (s *Server) GenerateHandler(c *gin.Context) {
select { select {
case runner = <-rCh: case runner = <-rCh:
case err = <-eCh: case err = <-eCh:
if errors.Is(err, context.Canceled) { handleErrorResponse(c, err)
c.JSON(499, gin.H{"error": "request canceled"})
return
}
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return return
} }
@ -394,12 +389,7 @@ func (s *Server) EmbeddingsHandler(c *gin.Context) {
select { select {
case runner = <-rCh: case runner = <-rCh:
case err = <-eCh: case err = <-eCh:
if errors.Is(err, context.Canceled) { handleErrorResponse(c, err)
c.JSON(499, gin.H{"error": "request canceled"})
return
}
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return return
} }
@ -1212,12 +1202,7 @@ func (s *Server) ChatHandler(c *gin.Context) {
select { select {
case runner = <-rCh: case runner = <-rCh:
case err = <-eCh: case err = <-eCh:
if errors.Is(err, context.Canceled) { handleErrorResponse(c, err)
c.JSON(499, gin.H{"error": "request canceled"})
return
}
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return return
} }
@ -1338,3 +1323,15 @@ func (s *Server) ChatHandler(c *gin.Context) {
streamResponse(c, ch) streamResponse(c, ch)
} }
func handleErrorResponse(c *gin.Context, err error) {
if errors.Is(err, context.Canceled) {
c.JSON(499, gin.H{"error": "request canceled"})
return
}
if errors.Is(err, ErrMaxQueue) {
c.JSON(http.StatusServiceUnavailable, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
}

View file

@ -43,10 +43,13 @@ type Scheduler struct {
getGpuFn func() gpu.GpuInfoList getGpuFn func() gpu.GpuInfoList
} }
var (
// TODO set this to zero after a release or two, to enable multiple models by default // TODO set this to zero after a release or two, to enable multiple models by default
var loadedMax = 1 // Maximum runners; < 1 maps to as many as will fit in VRAM (unlimited for CPU runners) loadedMax = 1 // Maximum runners; < 1 maps to as many as will fit in VRAM (unlimited for CPU runners)
var maxQueuedRequests = 10 // TODO configurable maxQueuedRequests = 512
var numParallel = 1 numParallel = 1
ErrMaxQueue = fmt.Errorf("server busy, please try again. maximum pending requests exceeded")
)
func InitScheduler(ctx context.Context) *Scheduler { func InitScheduler(ctx context.Context) *Scheduler {
maxRunners := os.Getenv("OLLAMA_MAX_LOADED_MODELS") maxRunners := os.Getenv("OLLAMA_MAX_LOADED_MODELS")
@ -66,6 +69,14 @@ func InitScheduler(ctx context.Context) *Scheduler {
numParallel = p numParallel = p
} }
} }
if onp := os.Getenv("OLLAMA_MAX_QUEUE"); onp != "" {
p, err := strconv.Atoi(onp)
if err != nil || p <= 0 {
slog.Error("invalid setting", "OLLAMA_MAX_QUEUE", onp, "error", err)
} else {
maxQueuedRequests = p
}
}
sched := &Scheduler{ sched := &Scheduler{
pendingReqCh: make(chan *LlmRequest, maxQueuedRequests), pendingReqCh: make(chan *LlmRequest, maxQueuedRequests),
@ -95,7 +106,7 @@ func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options,
select { select {
case s.pendingReqCh <- req: case s.pendingReqCh <- req:
default: default:
req.errCh <- fmt.Errorf("server busy, please try again. maximum pending requests exceeded") req.errCh <- ErrMaxQueue
} }
return req.successCh, req.errCh return req.successCh, req.errCh
} }