From bd54b08261c15e927234d03e2b1020e528b38afe Mon Sep 17 00:00:00 2001
From: ManniX-ITA <20623405+mann1x@users.noreply.github.com>
Date: Wed, 17 Apr 2024 17:39:52 +0200
Subject: [PATCH 1/3] Streamlined WaitUntilRunning

---
 llm/server.go | 68 ++++++++++++++++++++-------------------------------
 1 file changed, 27 insertions(+), 41 deletions(-)

diff --git a/llm/server.go b/llm/server.go
index 4e808085..25122572 100644
--- a/llm/server.go
+++ b/llm/server.go
@@ -381,56 +381,42 @@ func (s *LlamaServer) Ping(ctx context.Context) error {
 
 func (s *LlamaServer) WaitUntilRunning() error {
 	start := time.Now()
-	// TODO we need to wire up a better way to detect hangs during model load and startup of the server
 	expiresAt := time.Now().Add(10 * time.Minute) // be generous with timeout, large models can take a while to load
-	ticker := time.NewTicker(50 * time.Millisecond)
-	defer ticker.Stop()
 
 	slog.Info("waiting for llama runner to start responding")
-	var lastStatus ServerStatus = -1
+
 	for {
-		select {
-		case err := <-s.done:
+		ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
+		defer cancel()
+		status, err := s.getServerStatus(ctx)
+		if err != nil {
+			slog.Debug("server not yet available", "error", err)
+		}
+		if time.Now().After(expiresAt) {
+			// timeout
 			msg := ""
 			if s.status != nil && s.status.LastErrMsg != "" {
 				msg = s.status.LastErrMsg
 			}
-			return fmt.Errorf("llama runner process has terminated: %v %s", err, msg)
-		case <-ticker.C:
-			if time.Now().After(expiresAt) {
-				// timeout
-				msg := ""
-				if s.status != nil && s.status.LastErrMsg != "" {
-					msg = s.status.LastErrMsg
-				}
-				return fmt.Errorf("timed out waiting for llama runner to start: %s", msg)
-			}
-			if s.cmd.ProcessState != nil {
-				msg := ""
-				if s.status != nil && s.status.LastErrMsg != "" {
-					msg = s.status.LastErrMsg
-				}
-				return fmt.Errorf("llama runner process no longer running: %d %s", s.cmd.ProcessState.ExitCode(), msg)
-			}
-
-			ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
-			defer cancel()
-			status, err := s.getServerStatus(ctx)
-			if err != nil && lastStatus != status {
-				slog.Debug("server not yet available", "error", err)
-				lastStatus = status
-				continue
-			}
-
-			switch status {
-			case ServerStatusLoadingModel:
-				// TODO - this state never seems to happen with the current server.cpp code (bug?)
-				// it doesn't respond to the health endpoint until after the model is loaded
-				slog.Debug("loading model")
-			case ServerStatusReady:
-				slog.Debug(fmt.Sprintf("llama runner started in %f seconds", time.Since(start).Seconds()))
-				return nil
+			return fmt.Errorf("timed out waiting for llama runner to start: %s", msg)
+		}
+		if s.cmd.ProcessState != nil {
+			msg := ""
+			if s.status != nil && s.status.LastErrMsg != "" {
+				msg = s.status.LastErrMsg
 			}
+			return fmt.Errorf("llama runner process no longer running: %d %s", s.cmd.ProcessState.ExitCode(), msg)
+		}
+		switch status {
+		case ServerStatusLoadingModel:
+			time.Sleep(time.Millisecond * 250)
+			slog.Debug("loading model")
+		case ServerStatusReady:
+			slog.Info(fmt.Sprintf("llama runner started in %0.2f seconds", time.Since(start).Seconds()))
+			return nil
+		default:
+			time.Sleep(time.Millisecond * 250)
+			continue
 		}
 	}
 }

From c942e4a07b91dc6b78bb245241ea514b752e3d4d Mon Sep 17 00:00:00 2001
From: ManniX-ITA <20623405+mann1x@users.noreply.github.com>
Date: Wed, 17 Apr 2024 17:40:32 +0200
Subject: [PATCH 2/3] Fixed startup sequence to report model loading

---
 llm/ext_server/server.cpp | 42 +++++++++++++++++++--------------------
 1 file changed, 21 insertions(+), 21 deletions(-)

diff --git a/llm/ext_server/server.cpp b/llm/ext_server/server.cpp
index 22117037..96df9f4b 100644
--- a/llm/ext_server/server.cpp
+++ b/llm/ext_server/server.cpp
@@ -2726,7 +2726,7 @@ static json format_detokenized_response(std::string content)
 static void log_server_request(const httplib::Request &req, const httplib::Response &res)
 {
     // skip GH copilot requests when using default port
-    if (req.path == "/v1/health" || req.path == "/v1/completions")
+    if (req.path == "/health" || req.path == "/v1/health" || req.path == "/v1/completions")
     {
         return;
     }
@@ -3053,6 +3053,26 @@ int main(int argc, char **argv) {
         log_data["api_key"] = "api_key: " + std::to_string(sparams.api_keys.size()) + " keys loaded";
     }
 
+    if (sparams.n_threads_http < 1) {
+        // +2 threads for monitoring endpoints
+        sparams.n_threads_http = std::max(params.n_parallel + 2, (int32_t) std::thread::hardware_concurrency() - 1);
+    }
+    log_data["n_threads_http"] = std::to_string(sparams.n_threads_http);
+    svr.new_task_queue = [&sparams] { return new httplib::ThreadPool(sparams.n_threads_http); };
+
+    LOG_INFO("HTTP server listening", log_data);
+    // run the HTTP server in a thread - see comment below
+    std::thread t([&]()
+            {
+                if (!svr.listen_after_bind())
+                {
+                    state.store(SERVER_STATE_ERROR);
+                    return 1;
+                }
+
+                return 0;
+            });
+
     // load the model
     if (!llama.load_model(params))
     {
@@ -3257,26 +3277,6 @@ int main(int argc, char **argv) {
             }*/
     //);
 
-    if (sparams.n_threads_http < 1) {
-        // +2 threads for monitoring endpoints
-        sparams.n_threads_http = std::max(params.n_parallel + 2, (int32_t) std::thread::hardware_concurrency() - 1);
-    }
-    log_data["n_threads_http"] = std::to_string(sparams.n_threads_http);
-    svr.new_task_queue = [&sparams] { return new httplib::ThreadPool(sparams.n_threads_http); };
-
-    LOG_INFO("HTTP server listening", log_data);
-    // run the HTTP server in a thread - see comment below
-    std::thread t([&]()
-            {
-                if (!svr.listen_after_bind())
-                {
-                    state.store(SERVER_STATE_ERROR);
-                    return 1;
-                }
-
-                return 0;
-            });
-
     llama.queue_tasks.on_new_task(std::bind(
         &llama_server_context::process_single_task, &llama, std::placeholders::_1));
     llama.queue_tasks.on_finish_multitask(std::bind(

From 84ac7ce139252506d77115a3152f36a5a4f3541a Mon Sep 17 00:00:00 2001
From: Daniel Hiltgen
Date: Thu, 9 May 2024 11:10:28 -0700
Subject: [PATCH 3/3] Refine subprocess reaping

---
 llm/server.go | 35 +++++++++++++++++++++++------------
 1 file changed, 23 insertions(+), 12 deletions(-)

diff --git a/llm/server.go b/llm/server.go
index b452434e..4600d00f 100644
--- a/llm/server.go
+++ b/llm/server.go
@@ -53,6 +53,7 @@ type llmServer struct {
 	estimatedTotal uint64 // Total size of model
 	totalLayers    uint64
 	gpuCount       int
+	loadDuration   time.Duration // Record how long it took the model to load
 
 	sem *semaphore.Weighted
 }
@@ -291,6 +292,7 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr
 			sem:         semaphore.NewWeighted(int64(numParallel)),
 			totalLayers: ggml.KV().BlockCount() + 1,
 			gpuCount:    gpuCount,
+			done:        make(chan error, 1),
 		}
 
 		s.cmd.Env = os.Environ()
@@ -339,6 +341,11 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr
 			continue
 		}
 
+		// reap subprocess when it exits
+		go func() {
+			s.done <- s.cmd.Wait()
+		}()
+
 		return s, nil
 	}
 
@@ -486,6 +493,7 @@ func (s *llmServer) WaitUntilRunning(ctx context.Context) error {
 	expiresAt := time.Now().Add(10 * time.Minute) // be generous with timeout, large models can take a while to load
 
 	slog.Info("waiting for llama runner to start responding")
+	var lastStatus ServerStatus = -1
 
 	for {
 		select {
@@ -500,12 +508,6 @@ func (s *llmServer) WaitUntilRunning(ctx context.Context) error {
 			return fmt.Errorf("llama runner process has terminated: %v %s", err, msg)
 		default:
 		}
-		ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
-		defer cancel()
-		status, err := s.getServerStatus(ctx)
-		if err != nil {
-			slog.Debug("server not yet available", "error", err)
-		}
 		if time.Now().After(expiresAt) {
 			// timeout
 			msg := ""
@@ -521,14 +523,20 @@ func (s *llmServer) WaitUntilRunning(ctx context.Context) error {
 			}
 			return fmt.Errorf("llama runner process no longer running: %d %s", s.cmd.ProcessState.ExitCode(), msg)
 		}
+		ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
+		defer cancel()
+		status, _ := s.getServerStatus(ctx)
+		if lastStatus != status && status != ServerStatusReady {
+			// Only log on status changes
+			slog.Info("waiting for server to become available", "status", status.ToString())
+		}
 		switch status {
-		case ServerStatusLoadingModel:
-			time.Sleep(time.Millisecond * 250)
-			slog.Debug("loading model")
 		case ServerStatusReady:
-			slog.Info(fmt.Sprintf("llama runner started in %0.2f seconds", time.Since(start).Seconds()))
+			s.loadDuration = time.Since(start)
+			slog.Info(fmt.Sprintf("llama runner started in %0.2f seconds", s.loadDuration.Seconds()))
 			return nil
 		default:
+			lastStatus = status
 			time.Sleep(time.Millisecond * 250)
 			continue
 		}
@@ -930,8 +938,11 @@ func (s *llmServer) Close() error {
 		if err := s.cmd.Process.Kill(); err != nil {
 			return err
 		}
-
-		_ = s.cmd.Wait()
+		// if ProcessState is already populated, Wait already completed, no need to wait again
+		if s.cmd.ProcessState == nil {
+			slog.Debug("waiting for llama server to exit")
+			<-s.done
+		}
 
 		slog.Debug("llama server stopped")
 	}