prevent waiting on exited command (#752)
* prevent waiting on exited command * close llama runner once
This commit is contained in:
parent
f2ba1311aa
commit
77295f716e
1 changed file with 37 additions and 27 deletions
54
llm/llama.go
54
llm/llama.go
|
@ -20,6 +20,7 @@ import (
|
|||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/jmorganca/ollama/api"
|
||||
|
@ -180,6 +181,9 @@ type Running struct {
|
|||
Port int
|
||||
Cmd *exec.Cmd
|
||||
Cancel context.CancelFunc
|
||||
exitOnce sync.Once
|
||||
exitCh chan error // channel to receive the exit status of the subprocess
|
||||
exitErr error // error returned by the subprocess
|
||||
}
|
||||
|
||||
type llama struct {
|
||||
|
@ -308,7 +312,7 @@ func newLlama(model string, adapters []string, runners []ModelRunner, numLayers
|
|||
cmd.Stdout = os.Stderr
|
||||
cmd.Stderr = os.Stderr
|
||||
|
||||
llm := &llama{Options: opts, Running: Running{Port: port, Cmd: cmd, Cancel: cancel}}
|
||||
llm := &llama{Options: opts, Running: Running{Port: port, Cmd: cmd, Cancel: cancel, exitCh: make(chan error)}}
|
||||
|
||||
log.Print("starting llama runner")
|
||||
if err := llm.Cmd.Start(); err != nil {
|
||||
|
@ -316,14 +320,14 @@ func newLlama(model string, adapters []string, runners []ModelRunner, numLayers
|
|||
continue
|
||||
}
|
||||
|
||||
// monitor the command, it is blocking, so if it exits we need to capture that
|
||||
// monitor the llama runner process and signal when it exits
|
||||
go func() {
|
||||
err := llm.Cmd.Wait() // this will block until the command exits
|
||||
if err != nil {
|
||||
log.Printf("llama runner exited with error: %v", err)
|
||||
} else {
|
||||
log.Printf("llama runner exited")
|
||||
}
|
||||
err := llm.Cmd.Wait()
|
||||
llm.exitErr = err
|
||||
// llm.Cmd.Wait() can only be called once, use this exit channel to signal that the process has exited
|
||||
llm.exitOnce.Do(func() {
|
||||
close(llm.exitCh)
|
||||
})
|
||||
}()
|
||||
|
||||
if err := waitForServer(llm); err != nil {
|
||||
|
@ -341,38 +345,44 @@ func newLlama(model string, adapters []string, runners []ModelRunner, numLayers
|
|||
}
|
||||
|
||||
func waitForServer(llm *llama) error {
|
||||
// wait for the server to start responding
|
||||
start := time.Now()
|
||||
expiresAt := time.Now().Add(2 * time.Minute) // be generous with timeout, large models can take a while to load
|
||||
ticker := time.NewTicker(200 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
log.Print("waiting for llama runner to start responding")
|
||||
for range ticker.C {
|
||||
if time.Now().After(expiresAt) {
|
||||
return fmt.Errorf("llama runner did not start within alloted time, retrying")
|
||||
}
|
||||
|
||||
// check if the server process has terminated
|
||||
if llm.Cmd.ProcessState != nil && llm.Cmd.ProcessState.Exited() {
|
||||
for {
|
||||
select {
|
||||
case <-llm.exitCh:
|
||||
// failed to start subprocess
|
||||
return fmt.Errorf("llama runner process has terminated")
|
||||
case <-ticker.C:
|
||||
if time.Now().After(expiresAt) {
|
||||
// timeout
|
||||
return fmt.Errorf("llama runner did not start within allotted time, retrying")
|
||||
}
|
||||
|
||||
if err := llm.Ping(context.Background()); err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// success
|
||||
log.Printf("llama runner started in %f seconds", time.Since(start).Seconds())
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (llm *llama) Close() {
|
||||
// signal the sub-process to terminate
|
||||
llm.Cancel()
|
||||
|
||||
// wait for the command to exit to prevent race conditions with the next run
|
||||
if err := llm.Cmd.Wait(); err != nil {
|
||||
log.Printf("llama runner exited: %v", err)
|
||||
<-llm.exitCh
|
||||
err := llm.exitErr
|
||||
|
||||
if err != nil {
|
||||
log.Printf("llama runner stopped with error: %v", err)
|
||||
} else {
|
||||
log.Print("llama runner stopped successfully")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue