From bbe41ce41a49099097f30fcdb59f08e707d166e1 Mon Sep 17 00:00:00 2001
From: Bruce MacDonald
Date: Sat, 9 Dec 2023 14:14:02 -0500
Subject: [PATCH] fix: parallel queueing race condition caused silent failure
 (#1445)

* fix: queued request failures

- increase parallel requests to 2 so a queued request can complete;
  queueing is managed in ollama

* log stream errors
---
 llm/llama.go | 54 +++++++++++++++++++++++++++++-------------------------
 1 file changed, 29 insertions(+), 25 deletions(-)

diff --git a/llm/llama.go b/llm/llama.go
index b2f1571f..9dcf5bac 100644
--- a/llm/llama.go
+++ b/llm/llama.go
@@ -341,6 +341,7 @@ func newLlama(model string, adapters, projectors []string, runners []ModelRunner
 		"--ctx-size", fmt.Sprintf("%d", opts.NumCtx),
 		"--batch-size", fmt.Sprintf("%d", opts.NumBatch),
 		"--n-gpu-layers", fmt.Sprintf("%d", numGPU),
+		"--parallel", "2",
 		"--embedding",
 	}
 
@@ -631,34 +632,37 @@ func (llm *llama) Predict(ctx context.Context, predict PredictOpts, fn func(Pred
 			continue
 		}
 
-		if evt, ok := bytes.CutPrefix(line, []byte("data: ")); ok {
-			var p prediction
-			if err := json.Unmarshal(evt, &p); err != nil {
-				return fmt.Errorf("error unmarshaling llm prediction response: %v", err)
-			}
+		evt, ok := bytes.CutPrefix(line, []byte("data: "))
+		if !ok {
+			return fmt.Errorf("error parsing llm response stream: %s", line)
+		}
 
-			if p.Content != "" {
-				fn(PredictResult{
-					Model:     predict.Model,
-					CreatedAt: time.Now().UTC(),
-					Content:   p.Content,
-				})
-			}
+		var p prediction
+		if err := json.Unmarshal(evt, &p); err != nil {
+			return fmt.Errorf("error unmarshaling llm prediction response: %v", err)
+		}
 
-			if p.Stop {
-				fn(PredictResult{
-					Model:         predict.Model,
-					CreatedAt:     time.Now().UTC(),
-					TotalDuration: time.Since(predict.CheckpointStart),
+		if p.Content != "" {
+			fn(PredictResult{
+				Model:     predict.Model,
+				CreatedAt: time.Now().UTC(),
+				Content:   p.Content,
+			})
+		}
 
-					Done:               true,
-					PromptEvalCount:    p.Timings.PromptN,
-					PromptEvalDuration: parseDurationMs(p.Timings.PromptMS),
-					EvalCount:          p.Timings.PredictedN,
-					EvalDuration:       parseDurationMs(p.Timings.PredictedMS),
-				})
-				return nil
-			}
+		if p.Stop {
+			fn(PredictResult{
+				Model:         predict.Model,
+				CreatedAt:     time.Now().UTC(),
+				TotalDuration: time.Since(predict.CheckpointStart),
+
+				Done:               true,
+				PromptEvalCount:    p.Timings.PromptN,
+				PromptEvalDuration: parseDurationMs(p.Timings.PromptMS),
+				EvalCount:          p.Timings.PredictedN,
+				EvalDuration:       parseDurationMs(p.Timings.PredictedMS),
+			})
+			return nil
 		}
 	}
 }
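
Why "--parallel 2": ollama serializes its own request queue, but with a single
llama.cpp server slot a request handed off while the previous response is
still draining can find no free slot and fail without an error. A toy Go
sketch of that failure mode follows; this is not ollama code, and the slot
semaphore, timings, and drop behavior are illustrative assumptions only.

package main

import (
	"fmt"
	"time"
)

// request tries to take a server slot; if none is free it is dropped
// without an error, mimicking the silent failure described in this fix.
func request(slots chan struct{}, id int) {
	select {
	case slots <- struct{}{}: // acquire a slot
		time.Sleep(50 * time.Millisecond) // simulated generation
		<-slots                           // release the slot
		fmt.Printf("request %d: completed\n", id)
	default:
		fmt.Printf("request %d: dropped, no free slot\n", id)
	}
}

func main() {
	for _, n := range []int{1, 2} {
		fmt.Printf("server with %d slot(s):\n", n)
		slots := make(chan struct{}, n)
		go request(slots, 1)
		time.Sleep(10 * time.Millisecond) // request 2 arrives mid-generation
		request(slots, 2)
		time.Sleep(100 * time.Millisecond) // let request 1 finish printing
	}
}

With one slot the second request is dropped; with two slots both complete,
which is the overlap the extra server slot buys for a queued request.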
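
The Predict change also makes stream parsing strict: a non-blank line without
the "data: " prefix now returns an error instead of being skipped, so
server-side failures surface to the caller rather than ending the stream
silently. A minimal sketch of that behavior, assuming a hypothetical
scanStream helper and sample input; only the prefix check mirrors the
patched code.

package main

import (
	"bufio"
	"bytes"
	"fmt"
	"strings"
)

// scanStream reads SSE-style lines and hands "data: " payloads to fn.
func scanStream(sc *bufio.Scanner, fn func(evt []byte)) error {
	for sc.Scan() {
		line := bytes.TrimSpace(sc.Bytes())
		if len(line) == 0 {
			continue // blank keep-alive lines are still ignored
		}
		evt, ok := bytes.CutPrefix(line, []byte("data: "))
		if !ok {
			// previously such a line was dropped silently; now it fails loudly
			return fmt.Errorf("error parsing llm response stream: %s", line)
		}
		fn(evt)
	}
	return sc.Err()
}

func main() {
	in := "data: {\"content\":\"Hello\"}\nerror: slot unavailable\n"
	err := scanStream(bufio.NewScanner(strings.NewReader(in)), func(evt []byte) {
		fmt.Printf("event: %s\n", evt)
	})
	// prints: error parsing llm response stream: error: slot unavailable
	fmt.Println(err)
}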