118 lines
3.3 KiB
Go
118 lines
3.3 KiB
Go
|
//go:build integration
|
||
|
|
||
|
package integration
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"log/slog"
|
||
|
"os"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"testing"
|
||
|
"time"
|
||
|
|
||
|
"github.com/ollama/ollama/api"
|
||
|
"github.com/stretchr/testify/require"
|
||
|
)
|
||
|
|
||
|
func TestMaxQueue(t *testing.T) {
|
||
|
// Note: This test can be quite slow when running in CPU mode, so keep the threadCount low unless your on GPU
|
||
|
// Also note that by default Darwin can't sustain > ~128 connections without adjusting limits
|
||
|
threadCount := 32
|
||
|
mq := os.Getenv("OLLAMA_MAX_QUEUE")
|
||
|
if mq != "" {
|
||
|
var err error
|
||
|
threadCount, err = strconv.Atoi(mq)
|
||
|
require.NoError(t, err)
|
||
|
} else {
|
||
|
os.Setenv("OLLAMA_MAX_QUEUE", fmt.Sprintf("%d", threadCount))
|
||
|
}
|
||
|
|
||
|
req := api.GenerateRequest{
|
||
|
Model: "orca-mini",
|
||
|
Prompt: "write a long historical fiction story about christopher columbus. use at least 10 facts from his actual journey",
|
||
|
Options: map[string]interface{}{
|
||
|
"seed": 42,
|
||
|
"temperature": 0.0,
|
||
|
},
|
||
|
}
|
||
|
resp := []string{"explore", "discover", "ocean"}
|
||
|
|
||
|
// CPU mode takes much longer at the limit with a large queue setting
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||
|
defer cancel()
|
||
|
client, _, cleanup := InitServerConnection(ctx, t)
|
||
|
defer cleanup()
|
||
|
|
||
|
require.NoError(t, PullIfMissing(ctx, client, req.Model))
|
||
|
|
||
|
// Context for the worker threads so we can shut them down
|
||
|
// embedCtx, embedCancel := context.WithCancel(ctx)
|
||
|
embedCtx := ctx
|
||
|
|
||
|
var genwg sync.WaitGroup
|
||
|
go func() {
|
||
|
genwg.Add(1)
|
||
|
defer genwg.Done()
|
||
|
slog.Info("Starting generate request")
|
||
|
DoGenerate(ctx, t, client, req, resp, 45*time.Second, 5*time.Second)
|
||
|
slog.Info("generate completed")
|
||
|
}()
|
||
|
|
||
|
// Give the generate a chance to get started before we start hammering on embed requests
|
||
|
time.Sleep(5 * time.Millisecond)
|
||
|
|
||
|
threadCount += 10 // Add a few extra to ensure we push the queue past its limit
|
||
|
busyCount := 0
|
||
|
resetByPeerCount := 0
|
||
|
canceledCount := 0
|
||
|
succesCount := 0
|
||
|
counterMu := sync.Mutex{}
|
||
|
var embedwg sync.WaitGroup
|
||
|
for i := 0; i < threadCount; i++ {
|
||
|
go func(i int) {
|
||
|
embedwg.Add(1)
|
||
|
defer embedwg.Done()
|
||
|
slog.Info("embed started", "id", i)
|
||
|
embedReq := api.EmbeddingRequest{
|
||
|
Model: req.Model,
|
||
|
Prompt: req.Prompt,
|
||
|
Options: req.Options,
|
||
|
}
|
||
|
// Fresh client for every request
|
||
|
client, _ = GetTestEndpoint()
|
||
|
|
||
|
resp, genErr := client.Embeddings(embedCtx, &embedReq)
|
||
|
counterMu.Lock()
|
||
|
defer counterMu.Unlock()
|
||
|
switch {
|
||
|
case genErr == nil:
|
||
|
succesCount++
|
||
|
require.Greater(t, len(resp.Embedding), 5) // somewhat arbitrary, but sufficient to be reasonable
|
||
|
case errors.Is(genErr, context.Canceled):
|
||
|
canceledCount++
|
||
|
case strings.Contains(genErr.Error(), "busy"):
|
||
|
busyCount++
|
||
|
case strings.Contains(genErr.Error(), "connection reset by peer"):
|
||
|
resetByPeerCount++
|
||
|
default:
|
||
|
require.NoError(t, genErr, "%d request failed", i)
|
||
|
}
|
||
|
|
||
|
slog.Info("embed finished", "id", i)
|
||
|
}(i)
|
||
|
}
|
||
|
genwg.Wait()
|
||
|
slog.Info("generate done, waiting for embeds")
|
||
|
embedwg.Wait()
|
||
|
|
||
|
require.Equal(t, resetByPeerCount, 0, "Connections reset by peer, have you updated your fd and socket limits?")
|
||
|
require.True(t, busyCount > 0, "no requests hit busy error but some should have")
|
||
|
require.True(t, canceledCount == 0, "no requests should have been canceled due to timeout")
|
||
|
|
||
|
slog.Info("embeds completed", "success", succesCount, "busy", busyCount, "reset", resetByPeerCount, "canceled", canceledCount)
|
||
|
}
|