From 8a35bb926e63cd36e221eafc4dd4054fbdcd398b Mon Sep 17 00:00:00 2001 From: Jesse Gross Date: Thu, 14 Nov 2024 15:01:48 -0800 Subject: [PATCH] runner.go: Increase survivability of main processing loop Currently, if an error occurs during the prep stages (such as tokenizing) of a single request, it will only affect that request. However, if an error happens during decoding, it can take down the entire runner. Instead, it's better to drop the tokens that triggered the error and try to keep going. However, we also need to stop when we run out of tokens, otherwise, this just causes an infinite loop. This is likely the cause of at least some of the hanging issues that have been reported. Bug #7573 --- llama/runner/runner.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/llama/runner/runner.go b/llama/runner/runner.go index 0ae50608..3ffb57bb 100644 --- a/llama/runner/runner.go +++ b/llama/runner/runner.go @@ -14,6 +14,7 @@ import ( "path/filepath" "regexp" "runtime" + "runtime/debug" "strconv" "strings" "sync" @@ -339,6 +340,15 @@ func (s *Server) run(ctx context.Context) { // it should only be responsible for accepting tokens or embeddings and // processing batches as fast as possible func (s *Server) processBatch(tokenBatch *llama.Batch, embedBatch *llama.Batch) { + // Try to keep going even if we hit a panic so that corner cases don't take the whole + // runner down. In most cases, this will result in dropping the tokens that we are currently + // processing and then continuing with what is remaining. + defer func() { + if err := recover(); err != nil { + slog.Error("error while processing batch", "error", err, "stack", debug.Stack()) + } + }() + s.mu.Lock() for s.allNil() { s.cond.Wait() // Wait until an item is added @@ -357,6 +367,14 @@ func (s *Server) processBatch(tokenBatch *llama.Batch, embedBatch *llama.Batch) continue } + // If an error occurred during the processing of a previous batch then we may have emptied the inputs + // without adding a new one. In this case, end the sequence rather than infinite looping. + if len(seq.inputs) == 0 { + slog.Error("removing sequence due to no input tokens", "index", seqIdx, "cache id", seq.cache.Id) + s.removeSequence(seqIdx, "error") + continue + } + // if past the num predict limit if seq.numPredict > 0 && seq.numPredicted >= seq.numPredict { s.removeSequence(seqIdx, "limit")