diff --git a/integration/max_queue_test.go b/integration/max_queue_test.go new file mode 100644 index 00000000..43b15c6c --- /dev/null +++ b/integration/max_queue_test.go @@ -0,0 +1,117 @@ +//go:build integration + +package integration + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/ollama/ollama/api" + "github.com/stretchr/testify/require" +) + +func TestMaxQueue(t *testing.T) { + // Note: This test can be quite slow when running in CPU mode, so keep the threadCount low unless your on GPU + // Also note that by default Darwin can't sustain > ~128 connections without adjusting limits + threadCount := 32 + mq := os.Getenv("OLLAMA_MAX_QUEUE") + if mq != "" { + var err error + threadCount, err = strconv.Atoi(mq) + require.NoError(t, err) + } else { + os.Setenv("OLLAMA_MAX_QUEUE", fmt.Sprintf("%d", threadCount)) + } + + req := api.GenerateRequest{ + Model: "orca-mini", + Prompt: "write a long historical fiction story about christopher columbus. use at least 10 facts from his actual journey", + Options: map[string]interface{}{ + "seed": 42, + "temperature": 0.0, + }, + } + resp := []string{"explore", "discover", "ocean"} + + // CPU mode takes much longer at the limit with a large queue setting + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + client, _, cleanup := InitServerConnection(ctx, t) + defer cleanup() + + require.NoError(t, PullIfMissing(ctx, client, req.Model)) + + // Context for the worker threads so we can shut them down + // embedCtx, embedCancel := context.WithCancel(ctx) + embedCtx := ctx + + var genwg sync.WaitGroup + go func() { + genwg.Add(1) + defer genwg.Done() + slog.Info("Starting generate request") + DoGenerate(ctx, t, client, req, resp, 45*time.Second, 5*time.Second) + slog.Info("generate completed") + }() + + // Give the generate a chance to get started before we start hammering on embed requests + time.Sleep(5 * time.Millisecond) + + threadCount += 10 // Add a few extra to ensure we push the queue past its limit + busyCount := 0 + resetByPeerCount := 0 + canceledCount := 0 + succesCount := 0 + counterMu := sync.Mutex{} + var embedwg sync.WaitGroup + for i := 0; i < threadCount; i++ { + go func(i int) { + embedwg.Add(1) + defer embedwg.Done() + slog.Info("embed started", "id", i) + embedReq := api.EmbeddingRequest{ + Model: req.Model, + Prompt: req.Prompt, + Options: req.Options, + } + // Fresh client for every request + client, _ = GetTestEndpoint() + + resp, genErr := client.Embeddings(embedCtx, &embedReq) + counterMu.Lock() + defer counterMu.Unlock() + switch { + case genErr == nil: + succesCount++ + require.Greater(t, len(resp.Embedding), 5) // somewhat arbitrary, but sufficient to be reasonable + case errors.Is(genErr, context.Canceled): + canceledCount++ + case strings.Contains(genErr.Error(), "busy"): + busyCount++ + case strings.Contains(genErr.Error(), "connection reset by peer"): + resetByPeerCount++ + default: + require.NoError(t, genErr, "%d request failed", i) + } + + slog.Info("embed finished", "id", i) + }(i) + } + genwg.Wait() + slog.Info("generate done, waiting for embeds") + embedwg.Wait() + + require.Equal(t, resetByPeerCount, 0, "Connections reset by peer, have you updated your fd and socket limits?") + require.True(t, busyCount > 0, "no requests hit busy error but some should have") + require.True(t, canceledCount == 0, "no requests should have been canceled due to timeout") + + slog.Info("embeds completed", "success", succesCount, "busy", busyCount, "reset", resetByPeerCount, "canceled", canceledCount) +}