From 3478b2cf14c3fa2661c03f7fd5764a63a496293a Mon Sep 17 00:00:00 2001 From: Jesse Gross Date: Fri, 22 Nov 2024 15:17:15 -0800 Subject: [PATCH] runner.go: Fix deadlock with many concurrent requests If there are no available slots for new sequences then a request will not be added to the processing queue but will continue on to wait for a response that never comes. Besides never giving a response to the request, this prevents the model from being unloaded due to the outstanding request. To prevent this, there are semaphores that prevent more requests from being processed than there are slots - one in the Ollama server and one in the runner. - The Ollama server one works but it is not designed to protect the runner's internal data structures and the runner can return a final response before clearing its data structures. - The internal runner semaphore has similar behavior where it can release the semaphore when it issues a response. This is wrong - it should only release the semaphore after it has cleared the data structure. In addition, we should return an error if a slot is not found rather than deadlocking in the event we ever get to this spot. 
Fixes #7779 --- llama/runner/runner.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/llama/runner/runner.go b/llama/runner/runner.go index c3d0353f..db8092f3 100644 --- a/llama/runner/runner.go +++ b/llama/runner/runner.go @@ -300,6 +300,7 @@ func (s *Server) removeSequence(seqIndex int, reason string) { close(seq.embedding) seq.cache.InUse = false s.seqs[seqIndex] = nil + s.seqsSem.Release(1) } func (s *Server) run(ctx context.Context) { @@ -649,7 +650,7 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) { return } - // Ensure that a place to put the sequence is available + // Ensure there is a place to put the sequence, released when removed from s.seqs if err := s.seqsSem.Acquire(r.Context(), 1); err != nil { if errors.Is(err, context.Canceled) { slog.Info("aborting completion request due to client closing the connection") @@ -658,9 +659,9 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) { } return } - defer s.seqsSem.Release(1) s.mu.Lock() + found := false for i, sq := range s.seqs { if sq == nil { seq.cache, seq.inputs, err = s.cache.LoadCacheSlot(seq.inputs, req.CachePrompt) @@ -674,11 +675,17 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) { s.seqs[i] = seq s.cond.Signal() + found = true break } } s.mu.Unlock() + if !found { + http.Error(w, "could not find an available sequence", http.StatusInternalServerError) + return + } + for { select { case <-r.Context().Done(): @@ -742,7 +749,7 @@ func (s *Server) embeddings(w http.ResponseWriter, r *http.Request) { return } - // Ensure that a place to put the sequence is available + // Ensure there is a place to put the sequence, released when removed from s.seqs if err := s.seqsSem.Acquire(r.Context(), 1); err != nil { if errors.Is(err, context.Canceled) { slog.Info("aborting embeddings request due to client closing the connection") @@ -751,9 +758,9 @@ func (s *Server) embeddings(w http.ResponseWriter, r 
*http.Request) { } return } - defer s.seqsSem.Release(1) s.mu.Lock() + found := false for i, sq := range s.seqs { if sq == nil { seq.cache, seq.inputs, err = s.cache.LoadCacheSlot(seq.inputs, req.CachePrompt) @@ -764,11 +771,17 @@ func (s *Server) embeddings(w http.ResponseWriter, r *http.Request) { } s.seqs[i] = seq s.cond.Signal() + found = true break } } s.mu.Unlock() + if !found { + http.Error(w, "could not find an available sequence", http.StatusInternalServerError) + return + } + embedding := <-seq.embedding if err := json.NewEncoder(w).Encode(&EmbeddingResponse{