Refine subprocess reaping

Daniel Hiltgen 2024-05-09 11:10:28 -07:00
parent 920a4b0794
commit 84ac7ce139


@@ -53,6 +53,7 @@ type llmServer struct {
 	estimatedTotal uint64 // Total size of model
 	totalLayers    uint64
 	gpuCount       int
+	loadDuration   time.Duration // Record how long it took the model to load
 	sem *semaphore.Weighted
 }
@@ -291,6 +292,7 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr
 		sem:         semaphore.NewWeighted(int64(numParallel)),
 		totalLayers: ggml.KV().BlockCount() + 1,
 		gpuCount:    gpuCount,
+		done:        make(chan error, 1),
 	}
 	s.cmd.Env = os.Environ()
@@ -339,6 +341,11 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr
 			continue
 		}
+		// reap subprocess when it exits
+		go func() {
+			s.done <- s.cmd.Wait()
+		}()
+
 		return s, nil
 	}
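The goroutine added here is the standard Go pattern for reaping a child process: a single background call to cmd.Wait() collects the exit status and publishes it on a channel. A minimal, self-contained sketch of the same pattern (the sleep command and the timeout are illustrative, not from this commit):

package main

import (
	"log"
	"os/exec"
	"time"
)

func main() {
	cmd := exec.Command("sleep", "1") // stand-in for the llama runner subprocess
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}

	// Capacity 1 so the reaper goroutine can send the exit status and
	// terminate even if no one ever receives from the channel.
	done := make(chan error, 1)
	go func() {
		done <- cmd.Wait() // reap the subprocess when it exits
	}()

	select {
	case err := <-done:
		log.Printf("subprocess exited: %v", err)
	case <-time.After(5 * time.Second):
		log.Print("subprocess still running")
	}
}

The buffer matters: with an unbuffered channel the goroutine would block forever (and leak) whenever nothing receives, which is exactly the case Close handles below when the process has already been reaped.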
@@ -486,6 +493,7 @@ func (s *llmServer) WaitUntilRunning(ctx context.Context) error {
 	expiresAt := time.Now().Add(10 * time.Minute) // be generous with timeout, large models can take a while to load
 	slog.Info("waiting for llama runner to start responding")
+	var lastStatus ServerStatus = -1
 	for {
 		select {
@@ -500,12 +508,6 @@ func (s *llmServer) WaitUntilRunning(ctx context.Context) error {
 			return fmt.Errorf("llama runner process has terminated: %v %s", err, msg)
 		default:
 		}
-		ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
-		defer cancel()
-		status, err := s.getServerStatus(ctx)
-		if err != nil {
-			slog.Debug("server not yet available", "error", err)
-		}
 		if time.Now().After(expiresAt) {
 			// timeout
 			msg := ""
@@ -521,14 +523,20 @@ func (s *llmServer) WaitUntilRunning(ctx context.Context) error {
 			}
 			return fmt.Errorf("llama runner process no longer running: %d %s", s.cmd.ProcessState.ExitCode(), msg)
 		}
+		ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
+		defer cancel()
+		status, _ := s.getServerStatus(ctx)
+		if lastStatus != status && status != ServerStatusReady {
+			// Only log on status changes
+			slog.Info("waiting for server to become available", "status", status.ToString())
+		}
 		switch status {
-		case ServerStatusLoadingModel:
-			time.Sleep(time.Millisecond * 250)
-			slog.Debug("loading model")
 		case ServerStatusReady:
-			slog.Info(fmt.Sprintf("llama runner started in %0.2f seconds", time.Since(start).Seconds()))
+			s.loadDuration = time.Since(start)
+			slog.Info(fmt.Sprintf("llama runner started in %0.2f seconds", s.loadDuration.Seconds()))
 			return nil
 		default:
+			lastStatus = status
 			time.Sleep(time.Millisecond * 250)
 			continue
 		}
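Two behavioral changes in this loop: the 200 ms probe timeout is now derived from the caller's ctx instead of context.Background(), so cancelling the caller's context also cancels an in-flight health check, and transitions are logged at Info level only when the status actually changes. A rough sketch of that shape, where probe and Status are hypothetical stand-ins for getServerStatus and ServerStatus:

package main

import (
	"context"
	"log/slog"
	"time"
)

type Status int

const (
	StatusLoading Status = iota
	StatusReady
)

// waitReady polls probe until it reports ready or ctx is cancelled.
func waitReady(ctx context.Context, probe func(context.Context) (Status, error)) error {
	lastStatus := Status(-1)
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		// Derive the short probe timeout from ctx, not context.Background(),
		// so the caller's cancellation propagates into the probe.
		probeCtx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
		status, _ := probe(probeCtx)
		cancel()
		if status == StatusReady {
			return nil
		}
		if status != lastStatus {
			// Log only on status changes, not once per 250 ms poll.
			slog.Info("waiting for server to become available", "status", status)
			lastStatus = status
		}
		time.Sleep(250 * time.Millisecond)
	}
}

func main() {
	calls := 0
	probe := func(context.Context) (Status, error) {
		calls++
		if calls < 4 {
			return StatusLoading, nil
		}
		return StatusReady, nil
	}
	start := time.Now()
	if err := waitReady(context.Background(), probe); err != nil {
		slog.Error("wait failed", "error", err)
		return
	}
	slog.Info("runner started", "seconds", time.Since(start).Seconds())
}

One deliberate deviation from the diff: the sketch calls cancel() right after each probe instead of deferring it, since a defer inside a loop holds every cancel func until the enclosing function returns.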
@@ -930,8 +938,11 @@ func (s *llmServer) Close() error {
 		if err := s.cmd.Process.Kill(); err != nil {
 			return err
 		}
-		_ = s.cmd.Wait()
+		// if ProcessState is already populated, Wait already completed, no need to wait again
+		if s.cmd.ProcessState == nil {
+			slog.Debug("waiting for llama server to exit")
+			<-s.done
+		}
 		slog.Debug("llama server stopped")
 	}
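The Close path now avoids calling Wait a second time on an already-reaped process: os/exec populates cmd.ProcessState when Wait returns, so Close only blocks on the done channel if the reaper goroutine has not yet collected the exit status. A minimal sketch of that shutdown sequence, with shutdown as a hypothetical stand-in for the method above:

package main

import (
	"log/slog"
	"os/exec"
	"time"
)

// shutdown kills the child, then waits for the reaper goroutine
// (the `done <- cmd.Wait()` sender) to finish collecting it.
func shutdown(cmd *exec.Cmd, done <-chan error) error {
	if cmd != nil && cmd.Process != nil {
		if err := cmd.Process.Kill(); err != nil {
			return err
		}
		// ProcessState is non-nil once Wait has returned; if it is still
		// nil, the reaper has not finished, so block until it does.
		if cmd.ProcessState == nil {
			slog.Debug("waiting for subprocess to exit")
			<-done
		}
	}
	return nil
}

func main() {
	cmd := exec.Command("sleep", "60")
	if err := cmd.Start(); err != nil {
		slog.Error("start failed", "error", err)
		return
	}
	done := make(chan error, 1)
	go func() { done <- cmd.Wait() }() // reaper, as in the commit

	time.Sleep(100 * time.Millisecond)
	if err := shutdown(cmd, done); err != nil {
		slog.Error("shutdown failed", "error", err)
		return
	}
	slog.Info("subprocess stopped")
}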