From 77295f716ef3f4cade6d93232d3b933db7c57dd7 Mon Sep 17 00:00:00 2001
From: Bruce MacDonald
Date: Wed, 11 Oct 2023 12:32:13 -0400
Subject: [PATCH] prevent waiting on exited command (#752)

* prevent waiting on exited command

* close llama runner once
---
 llm/llama.go | 64 ++++++++++++++++++++++++++++++----------------------
 1 file changed, 37 insertions(+), 27 deletions(-)

diff --git a/llm/llama.go b/llm/llama.go
index 8ddaadef..8bd11f53 100644
--- a/llm/llama.go
+++ b/llm/llama.go
@@ -20,6 +20,7 @@ import (
 	"runtime"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/jmorganca/ollama/api"
@@ -177,9 +178,12 @@ type llamaHyperparameters struct {
 }
 
 type Running struct {
-	Port   int
-	Cmd    *exec.Cmd
-	Cancel context.CancelFunc
+	Port     int
+	Cmd      *exec.Cmd
+	Cancel   context.CancelFunc
+	exitOnce sync.Once
+	exitCh   chan error // channel to receive the exit status of the subprocess
+	exitErr  error      // error returned by the subprocess
 }
 
 type llama struct {
@@ -308,7 +312,7 @@ func newLlama(model string, adapters []string, runners []ModelRunner, numLayers
 		cmd.Stdout = os.Stderr
 		cmd.Stderr = os.Stderr
 
-		llm := &llama{Options: opts, Running: Running{Port: port, Cmd: cmd, Cancel: cancel}}
+		llm := &llama{Options: opts, Running: Running{Port: port, Cmd: cmd, Cancel: cancel, exitCh: make(chan error)}}
 
 		log.Print("starting llama runner")
 		if err := llm.Cmd.Start(); err != nil {
@@ -316,14 +320,14 @@ func newLlama(model string, adapters []string, runners []ModelRunner, numLayers
 			continue
 		}
 
-		// monitor the command, it is blocking, so if it exits we need to capture that
+		// monitor the llama runner process and signal when it exits
 		go func() {
-			err := llm.Cmd.Wait() // this will block until the command exits
-			if err != nil {
-				log.Printf("llama runner exited with error: %v", err)
-			} else {
-				log.Printf("llama runner exited")
-			}
+			err := llm.Cmd.Wait()
+			llm.exitErr = err
+			// llm.Cmd.Wait() can only be called once, use this exit channel to signal that the process has exited
+			llm.exitOnce.Do(func() {
+				close(llm.exitCh)
+			})
 		}()
 
 		if err := waitForServer(llm); err != nil {
@@ -341,29 +345,30 @@ func newLlama(model string, adapters []string, runners []ModelRunner, numLayers
 }
 
 func waitForServer(llm *llama) error {
-	// wait for the server to start responding
 	start := time.Now()
 	expiresAt := time.Now().Add(2 * time.Minute) // be generous with timeout, large models can take a while to load
 	ticker := time.NewTicker(200 * time.Millisecond)
+	defer ticker.Stop()
 
 	log.Print("waiting for llama runner to start responding")
-	for range ticker.C {
-		if time.Now().After(expiresAt) {
-			return fmt.Errorf("llama runner did not start within alloted time, retrying")
-		}
-
-		// check if the server process has terminated
-		if llm.Cmd.ProcessState != nil && llm.Cmd.ProcessState.Exited() {
+	for {
+		select {
+		case <-llm.exitCh:
+			// failed to start subprocess
 			return fmt.Errorf("llama runner process has terminated")
-		}
+		case <-ticker.C:
+			if time.Now().After(expiresAt) {
+				// timeout
+				return fmt.Errorf("llama runner did not start within allotted time, retrying")
+			}
 
-		if err := llm.Ping(context.Background()); err == nil {
-			break
+			if err := llm.Ping(context.Background()); err == nil {
+				// success
+				log.Printf("llama runner started in %f seconds", time.Since(start).Seconds())
+				return nil
+			}
 		}
 	}
-
-	log.Printf("llama runner started in %f seconds", time.Since(start).Seconds())
-	return nil
 }
 
 func (llm *llama) Close() {
@@ -371,8 +376,13 @@ func (llm *llama) Close() {
 	llm.Cancel()
 
 	// wait for the command to exit to prevent race conditions with the next run
-	if err := llm.Cmd.Wait(); err != nil {
-		log.Printf("llama runner exited: %v", err)
+	<-llm.exitCh
+	err := llm.exitErr
+
+	if err != nil {
+		log.Printf("llama runner stopped with error: %v", err)
+	} else {
+		log.Print("llama runner stopped successfully")
 	}
 }
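
Note (not part of the patch): the change above boils down to one pattern — call Cmd.Wait exactly once in a goroutine, record its error, and close an exit channel guarded by sync.Once so that both the readiness poll and Close can observe the exit without racing on a second Wait. The minimal, standalone Go sketch below illustrates that pattern under stated assumptions: the runner type, start/waitReady/close names, and the "sleep 2" placeholder subprocess are illustrative only and do not appear in ollama.

package main

import (
	"context"
	"fmt"
	"log"
	"os/exec"
	"sync"
	"time"
)

type runner struct {
	cmd      *exec.Cmd
	cancel   context.CancelFunc
	exitOnce sync.Once
	exitCh   chan error // closed exactly once when the subprocess exits
	exitErr  error      // exit status recorded before exitCh is closed
}

func start() (*runner, error) {
	ctx, cancel := context.WithCancel(context.Background())
	r := &runner{
		// "sleep 2" stands in for the real runner binary (assumption for the sketch)
		cmd:    exec.CommandContext(ctx, "sleep", "2"),
		cancel: cancel,
		exitCh: make(chan error),
	}
	if err := r.cmd.Start(); err != nil {
		cancel()
		return nil, err
	}
	go func() {
		// Wait may only be called once; record the result, then signal via the channel
		r.exitErr = r.cmd.Wait()
		r.exitOnce.Do(func() { close(r.exitCh) })
	}()
	return r, nil
}

func (r *runner) waitReady(timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-r.exitCh:
			// subprocess died before it ever became ready
			return fmt.Errorf("subprocess exited before becoming ready: %v", r.exitErr)
		case <-ticker.C:
			if time.Now().After(deadline) {
				return fmt.Errorf("subprocess did not become ready in time")
			}
			// a real implementation would ping the server here and return nil on success
		}
	}
}

func (r *runner) close() {
	r.cancel()
	// receiving from the closed channel is safe for any number of callers
	// and never calls Wait a second time
	<-r.exitCh
	if r.exitErr != nil {
		log.Printf("subprocess stopped with error: %v", r.exitErr)
		return
	}
	log.Print("subprocess stopped successfully")
}

func main() {
	r, err := start()
	if err != nil {
		log.Fatal(err)
	}
	if err := r.waitReady(5 * time.Second); err != nil {
		log.Printf("not ready: %v", err)
	}
	r.close()
}

Closing the channel, rather than sending the error on it, is what lets an arbitrary number of goroutines (the startup poll, Close, any future watcher) observe the exit; the write to exitErr happens before close(exitCh), so readers that first receive from the channel see a consistent value without extra locking.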