runner.go: Increase survivability of main processing loop

Currently, if an error occurs during the prep stages (such as
tokenizing) of a single request, it will only affect that request.
However, if an error happens during decoding, it can take down the
entire runner.

Instead, it's better to drop the tokens that triggered the error and try to
keep going. However, we also need to stop when we run out of tokens;
otherwise, this just causes an infinite loop. This is likely the cause
of at least some of the hanging issues that have been reported.

Bug 
This commit is contained in:
Jesse Gross 2024-11-14 15:01:48 -08:00 committed by Jesse Gross
parent a0ea067b63
commit 8a35bb926e

View file

@ -14,6 +14,7 @@ import (
"path/filepath" "path/filepath"
"regexp" "regexp"
"runtime" "runtime"
"runtime/debug"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -339,6 +340,15 @@ func (s *Server) run(ctx context.Context) {
// it should only be responsible for accepting tokens or embeddings and // it should only be responsible for accepting tokens or embeddings and
// processing batches as fast as possible // processing batches as fast as possible
func (s *Server) processBatch(tokenBatch *llama.Batch, embedBatch *llama.Batch) { func (s *Server) processBatch(tokenBatch *llama.Batch, embedBatch *llama.Batch) {
// Try to keep going even if we hit a panic so that corner cases don't take the whole
// runner down. In most cases, this will result in dropping the tokens that we are currently
// processing and then continuing with what is remaining.
defer func() {
if err := recover(); err != nil {
slog.Error("error while processing batch", "error", err, "stack", debug.Stack())
}
}()
s.mu.Lock() s.mu.Lock()
for s.allNil() { for s.allNil() {
s.cond.Wait() // Wait until an item is added s.cond.Wait() // Wait until an item is added
@ -357,6 +367,14 @@ func (s *Server) processBatch(tokenBatch *llama.Batch, embedBatch *llama.Batch)
continue continue
} }
// If an error occurred during the processing of a previous batch then we may have emptied the inputs
// without adding a new one. In this case, end the sequence rather than infinite looping.
if len(seq.inputs) == 0 {
slog.Error("removing sequence due to no input tokens", "index", seqIdx, "cache id", seq.cache.Id)
s.removeSequence(seqIdx, "error")
continue
}
// if past the num predict limit // if past the num predict limit
if seq.numPredict > 0 && seq.numPredicted >= seq.numPredict { if seq.numPredict > 0 && seq.numPredicted >= seq.numPredict {
s.removeSequence(seqIdx, "limit") s.removeSequence(seqIdx, "limit")