2024-03-14 10:24:13 -07:00
package llm
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"log/slog"
"math/rand"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"
2024-03-30 09:50:05 -07:00
"golang.org/x/sync/semaphore"
2024-03-14 10:24:13 -07:00
"github.com/ollama/ollama/api"
2024-05-28 08:21:10 +08:00
"github.com/ollama/ollama/envconfig"
2024-03-14 10:24:13 -07:00
"github.com/ollama/ollama/format"
"github.com/ollama/ollama/gpu"
)
2024-03-30 09:50:05 -07:00
type LlamaServer interface {
Ping ( ctx context . Context ) error
WaitUntilRunning ( ctx context . Context ) error
Completion ( ctx context . Context , req CompletionRequest , fn func ( CompletionResponse ) ) error
Embedding ( ctx context . Context , prompt string ) ( [ ] float64 , error )
Tokenize ( ctx context . Context , content string ) ( [ ] int , error )
Detokenize ( ctx context . Context , tokens [ ] int ) ( string , error )
Close ( ) error
2024-06-03 19:09:23 -07:00
EstimatedVRAM ( ) uint64 // Total VRAM across all GPUs
2024-05-13 17:17:36 -07:00
EstimatedTotal ( ) uint64
2024-06-05 12:07:20 -07:00
EstimatedVRAMByGPU ( gpuID string ) uint64
2024-03-30 09:50:05 -07:00
}
// llmServer is an instance of the llama.cpp server
type llmServer struct {
2024-03-14 10:24:13 -07:00
port int
cmd * exec . Cmd
done chan error // Channel to signal when the process exits
status * StatusWriter
2024-04-02 16:44:10 -07:00
options api . Options
2024-03-30 09:50:05 -07:00
2024-06-03 19:09:23 -07:00
estimate MemoryEstimate
totalLayers uint64
// gpuCount int
gpus gpu . GpuInfoList // Recorded just before the model loaded, free space will be incorrect
loadDuration time . Duration // Record how long it took the model to load
2024-05-18 12:34:31 -07:00
loadProgress float32
2024-03-30 09:50:05 -07:00
sem * semaphore . Weighted
2024-03-14 10:24:13 -07:00
}
2024-06-24 21:47:52 -07:00
// LoadModel will load a model from disk. The model must be in the GGML format.
//
// It collects array values for arrays with a size less than or equal to
// maxArraySize. If maxArraySize is 0, the default value of 1024 is used. If
// the maxArraySize is negative, all arrays are collected.
func LoadModel ( model string , maxArraySize int ) ( * GGML , error ) {
2024-03-30 09:50:05 -07:00
if _ , err := os . Stat ( model ) ; err != nil {
return nil , err
}
2024-03-14 10:24:13 -07:00
f , err := os . Open ( model )
if err != nil {
return nil , err
}
defer f . Close ( )
2024-06-24 21:47:52 -07:00
ggml , _ , err := DecodeGGML ( f , maxArraySize )
2024-03-30 09:50:05 -07:00
return ggml , err
}
2024-03-14 10:24:13 -07:00
2024-03-30 09:50:05 -07:00
// NewLlamaServer will run a server for the given GPUs
// The gpu list must be a single family.
2024-05-06 17:47:52 -07:00
func NewLlamaServer ( gpus gpu . GpuInfoList , model string , ggml * GGML , adapters , projectors [ ] string , opts api . Options , numParallel int ) ( LlamaServer , error ) {
2024-03-30 09:50:05 -07:00
var err error
2024-05-10 10:17:12 -07:00
var cpuRunner string
2024-05-18 12:34:31 -07:00
var estimate MemoryEstimate
2024-06-20 11:07:04 -07:00
var systemTotalMemory uint64
var systemFreeMemory uint64
systemMemInfo , err := gpu . GetCPUMem ( )
if err != nil {
slog . Error ( "failed to lookup system memory" , "error" , err )
} else {
systemTotalMemory = systemMemInfo . TotalMemory
systemFreeMemory = systemMemInfo . FreeMemory
slog . Debug ( "system memory" , "total" , format . HumanBytes2 ( systemTotalMemory ) , "free" , systemFreeMemory )
}
2024-03-14 10:24:13 -07:00
2024-06-03 19:09:23 -07:00
// If the user wants zero GPU layers, reset the gpu list to be CPU/system ram info
if opts . NumGPU == 0 {
gpus = gpu . GetCPUInfo ( )
}
if len ( gpus ) == 1 && gpus [ 0 ] . Library == "cpu" {
2024-03-30 09:50:05 -07:00
cpuRunner = serverForCpu ( )
2024-05-18 12:34:31 -07:00
estimate = EstimateGPULayers ( gpus , ggml , projectors , opts )
2024-03-30 09:50:05 -07:00
} else {
2024-05-18 12:34:31 -07:00
estimate = EstimateGPULayers ( gpus , ggml , projectors , opts )
2024-03-30 09:50:05 -07:00
2024-05-21 22:07:57 -07:00
switch {
2024-06-20 11:07:04 -07:00
case gpus [ 0 ] . Library == "metal" && estimate . VRAMSize > systemTotalMemory :
2024-03-30 09:50:05 -07:00
// disable partial offloading when model is greater than total system memory as this
// can lead to locking up the system
opts . NumGPU = 0
2024-05-18 12:34:31 -07:00
case gpus [ 0 ] . Library != "metal" && estimate . Layers == 0 :
2024-05-10 15:09:48 -07:00
// Don't bother loading into the GPU if no layers can fit
cpuRunner = serverForCpu ( )
2024-06-03 19:09:23 -07:00
gpus = gpu . GetCPUInfo ( )
2024-05-18 12:34:31 -07:00
case opts . NumGPU < 0 && estimate . Layers > 0 && gpus [ 0 ] . Library != "cpu" :
opts . NumGPU = estimate . Layers
2024-04-17 10:29:12 -07:00
}
}
2024-06-17 18:39:48 -07:00
estimate . log ( )
2024-03-30 09:50:05 -07:00
// Loop through potential servers
2024-05-21 22:07:57 -07:00
finalErr := errors . New ( "no suitable llama servers found" )
2024-03-14 10:24:13 -07:00
if len ( adapters ) > 1 {
return nil , errors . New ( "ollama supports only one lora adapter, but multiple were provided" )
}
2024-07-03 13:10:14 -07:00
availableServers := getAvailableServers ( )
if len ( availableServers ) == 0 {
if runtime . GOOS != "windows" {
slog . Warn ( "llama server binary disappeared, reinitializing payloads" )
err = Init ( )
if err != nil {
slog . Warn ( "failed to reinitialize payloads" , "error" , err )
return nil , err
}
availableServers = getAvailableServers ( )
} else {
return nil , finalErr
}
}
2024-03-30 09:50:05 -07:00
var servers [ ] string
if cpuRunner != "" {
servers = [ ] string { cpuRunner }
} else {
servers = serversForGpu ( gpus [ 0 ] ) // All GPUs in the list are matching Library and Variant
}
2024-05-04 11:46:01 -07:00
demandLib := envconfig . LLMLibrary
2024-03-14 10:24:13 -07:00
if demandLib != "" {
serverPath := availableServers [ demandLib ]
if serverPath == "" {
slog . Info ( fmt . Sprintf ( "Invalid OLLAMA_LLM_LIBRARY %s - not found" , demandLib ) )
} else {
slog . Info ( "user override" , "OLLAMA_LLM_LIBRARY" , demandLib , "path" , serverPath )
servers = [ ] string { demandLib }
2024-05-04 09:15:31 -07:00
if strings . HasPrefix ( demandLib , "cpu" ) {
// Omit the GPU flag to silence the warning
opts . NumGPU = - 1
}
2024-03-14 10:24:13 -07:00
}
}
if len ( servers ) == 0 {
2024-03-30 09:50:05 -07:00
return nil , fmt . Errorf ( "no servers found for %v" , gpus )
2024-03-14 10:24:13 -07:00
}
params := [ ] string {
"--model" , model ,
"--ctx-size" , fmt . Sprintf ( "%d" , opts . NumCtx ) ,
"--batch-size" , fmt . Sprintf ( "%d" , opts . NumBatch ) ,
"--embedding" ,
}
2024-05-09 13:52:56 -07:00
params = append ( params , "--log-disable" )
2024-03-14 10:24:13 -07:00
2024-04-02 16:06:45 -07:00
if opts . NumGPU >= 0 {
2024-03-14 10:24:13 -07:00
params = append ( params , "--n-gpu-layers" , fmt . Sprintf ( "%d" , opts . NumGPU ) )
}
2024-05-04 11:46:01 -07:00
if envconfig . Debug {
2024-03-14 10:24:13 -07:00
params = append ( params , "--verbose" )
}
if opts . MainGPU > 0 {
params = append ( params , "--main-gpu" , fmt . Sprintf ( "%d" , opts . MainGPU ) )
}
if len ( adapters ) > 0 {
// TODO: applying multiple adapters is not supported by the llama.cpp server yet
params = append ( params , "--lora" , adapters [ 0 ] )
}
if len ( projectors ) > 0 {
// TODO: applying multiple projectors is not supported by the llama.cpp server yet
params = append ( params , "--mmproj" , projectors [ 0 ] )
}
if opts . NumThread > 0 {
params = append ( params , "--threads" , fmt . Sprintf ( "%d" , opts . NumThread ) )
}
if ! opts . F16KV {
params = append ( params , "--memory-f32" )
}
2024-05-22 21:52:09 -07:00
flashAttnEnabled := envconfig . FlashAttention
2024-05-21 06:36:03 +10:00
for _ , g := range gpus {
2024-05-30 16:58:01 -07:00
// only cuda (compute capability 7+) and metal support flash attention
2024-05-21 06:36:03 +10:00
if g . Library != "metal" && ( g . Library != "cuda" || g . DriverMajor < 7 ) {
2024-05-22 21:52:09 -07:00
flashAttnEnabled = false
2024-05-21 06:36:03 +10:00
}
2024-05-30 16:58:01 -07:00
// mmap has issues with partial offloading on metal
if g . Library == "metal" &&
uint64 ( opts . NumGPU ) > 0 &&
uint64 ( opts . NumGPU ) < ggml . KV ( ) . BlockCount ( ) + 1 {
2024-06-17 12:14:42 -07:00
opts . UseMMap = api . TriStateFalse
2024-05-30 16:58:01 -07:00
}
2024-05-21 06:36:03 +10:00
}
2024-05-30 16:58:01 -07:00
2024-05-22 21:52:09 -07:00
if flashAttnEnabled {
2024-05-21 06:36:03 +10:00
params = append ( params , "--flash-attn" )
}
2024-06-17 12:14:42 -07:00
// Windows CUDA should not use mmap for best performance
2024-06-20 11:07:04 -07:00
// Linux with a model larger than free space, mmap leads to thrashing
2024-05-06 17:47:52 -07:00
// For CPU loads we want the memory to be allocated, not FS cache
2024-06-20 11:07:04 -07:00
if ( runtime . GOOS == "windows" && gpus [ 0 ] . Library == "cuda" && opts . UseMMap == api . TriStateUndefined ) ||
( runtime . GOOS == "linux" && systemFreeMemory < estimate . TotalSize && opts . UseMMap == api . TriStateUndefined ) ||
2024-05-06 17:47:52 -07:00
( gpus [ 0 ] . Library == "cpu" && opts . UseMMap == api . TriStateUndefined ) ||
2024-06-20 11:07:04 -07:00
opts . UseMMap == api . TriStateFalse {
2024-05-30 16:58:01 -07:00
params = append ( params , "--no-mmap" )
}
if opts . UseMLock {
params = append ( params , "--mlock" )
}
if opts . UseNUMA {
params = append ( params , "--numa" )
}
2024-03-30 09:50:05 -07:00
params = append ( params , "--parallel" , fmt . Sprintf ( "%d" , numParallel ) )
2024-05-18 12:34:31 -07:00
if estimate . TensorSplit != "" {
params = append ( params , "--tensor-split" , estimate . TensorSplit )
}
if estimate . TensorSplit != "" {
params = append ( params , "--tensor-split" , estimate . TensorSplit )
}
2024-05-21 22:21:04 -07:00
for i := range len ( servers ) {
2024-03-14 10:24:13 -07:00
dir := availableServers [ servers [ i ] ]
2024-04-22 16:22:05 -07:00
if dir == "" {
// Shouldn't happen
finalErr = fmt . Errorf ( "[%d] server %s not listed in available servers %v" , i , servers [ i ] , availableServers )
2024-05-09 16:23:37 -07:00
slog . Error ( "server list inconsistent" , "error" , finalErr )
2024-04-22 16:22:05 -07:00
continue
}
2024-03-14 10:24:13 -07:00
2024-05-04 09:15:31 -07:00
if strings . HasPrefix ( servers [ i ] , "cpu" ) {
2024-06-03 19:09:23 -07:00
gpus = gpu . GetCPUInfo ( )
2024-05-04 09:15:31 -07:00
}
2024-05-28 08:21:10 +08:00
// Find an availableServers port, retry on each iteration in case the failure was a port conflict race
2024-03-14 10:24:13 -07:00
port := 0
if a , err := net . ResolveTCPAddr ( "tcp" , "localhost:0" ) ; err == nil {
var l * net . TCPListener
if l , err = net . ListenTCP ( "tcp" , a ) ; err == nil {
port = l . Addr ( ) . ( * net . TCPAddr ) . Port
l . Close ( )
}
}
if port == 0 {
slog . Debug ( "ResolveTCPAddr failed " , "error" , err )
port = rand . Intn ( 65535 - 49152 ) + 49152 // get a random port in the ephemeral range
}
finalParams := append ( params , "--port" , strconv . Itoa ( port ) )
pathEnv := "LD_LIBRARY_PATH"
if runtime . GOOS == "windows" {
pathEnv = "PATH"
}
2024-06-15 13:17:20 -07:00
// prepend the server directory to LD_LIBRARY_PATH/PATH and the parent dir for common dependencies
libraryPaths := [ ] string { dir , filepath . Dir ( dir ) }
2024-03-30 09:50:05 -07:00
2024-03-14 10:24:13 -07:00
if libraryPath , ok := os . LookupEnv ( pathEnv ) ; ok {
// Append our runner directory to the path
// This will favor system libraries over our bundled library dependencies
2024-05-05 17:45:43 -07:00
libraryPaths = append ( libraryPaths , filepath . SplitList ( libraryPath ) ... )
2024-03-14 10:24:13 -07:00
}
2024-03-30 09:50:05 -07:00
// Note: we always put the dependency path first
// since this was the exact version we verified for AMD GPUs
// and we favor what the user had in their path
if gpus [ 0 ] . DependencyPath != "" {
// TODO refine for multi-gpu support
libraryPaths = append ( [ ] string { gpus [ 0 ] . DependencyPath } , libraryPaths ... )
}
2024-03-14 10:24:13 -07:00
server := filepath . Join ( dir , "ollama_llama_server" )
if runtime . GOOS == "windows" {
2024-05-21 22:07:57 -07:00
server += ".exe"
2024-03-14 10:24:13 -07:00
}
2024-04-23 10:05:26 -07:00
// Detect tmp cleaners wiping out the file
_ , err := os . Stat ( server )
if errors . Is ( err , os . ErrNotExist ) {
slog . Warn ( "llama server disappeared, reinitializing payloads" , "path" , server , "error" , err )
err = Init ( )
if err != nil {
slog . Warn ( "failed to reinitialize payloads" , "error" , err )
return nil , err
}
}
2024-03-30 09:50:05 -07:00
s := & llmServer {
2024-05-18 12:34:31 -07:00
port : port ,
cmd : exec . Command ( server , finalParams ... ) ,
status : NewStatusWriter ( os . Stderr ) ,
options : opts ,
estimate : estimate ,
sem : semaphore . NewWeighted ( int64 ( numParallel ) ) ,
totalLayers : ggml . KV ( ) . BlockCount ( ) + 1 ,
2024-06-03 19:09:23 -07:00
gpus : gpus ,
2024-05-18 12:34:31 -07:00
done : make ( chan error , 1 ) ,
2024-03-14 10:24:13 -07:00
}
2024-03-30 09:50:05 -07:00
2024-05-10 22:53:21 -07:00
s . cmd . Env = os . Environ ( )
2024-03-14 10:24:13 -07:00
s . cmd . Stdout = os . Stdout
s . cmd . Stderr = s . status
2024-05-31 16:15:21 -07:00
envWorkarounds := [ ] [ 2 ] string { }
for _ , gpu := range gpus {
envWorkarounds = append ( envWorkarounds , gpu . EnvWorkarounds ... )
}
2024-05-21 22:21:04 -07:00
visibleDevicesEnv , visibleDevicesEnvVal := gpus . GetVisibleDevicesEnv ( )
2024-05-10 22:53:21 -07:00
pathEnvVal := strings . Join ( libraryPaths , string ( filepath . ListSeparator ) )
// Update or add the path and visible devices variable with our adjusted version
pathNeeded := true
devicesNeeded := visibleDevicesEnv != ""
for i := range s . cmd . Env {
cmp := strings . SplitN ( s . cmd . Env [ i ] , "=" , 2 )
if strings . EqualFold ( cmp [ 0 ] , pathEnv ) {
s . cmd . Env [ i ] = pathEnv + "=" + pathEnvVal
pathNeeded = false
} else if devicesNeeded && strings . EqualFold ( cmp [ 0 ] , visibleDevicesEnv ) {
s . cmd . Env [ i ] = visibleDevicesEnv + "=" + visibleDevicesEnvVal
devicesNeeded = false
2024-05-31 16:15:21 -07:00
} else if len ( envWorkarounds ) != 0 {
for _ , kv := range envWorkarounds {
if strings . EqualFold ( cmp [ 0 ] , kv [ 0 ] ) {
s . cmd . Env [ i ] = kv [ 0 ] + "=" + kv [ 1 ]
}
}
2024-05-10 22:53:21 -07:00
}
2024-05-05 17:45:43 -07:00
}
2024-05-10 22:53:21 -07:00
if pathNeeded {
s . cmd . Env = append ( s . cmd . Env , pathEnv + "=" + pathEnvVal )
2024-05-05 17:45:43 -07:00
}
2024-05-10 22:53:21 -07:00
if devicesNeeded {
s . cmd . Env = append ( s . cmd . Env , visibleDevicesEnv + "=" + visibleDevicesEnvVal )
2024-03-30 09:50:05 -07:00
}
2024-03-14 10:24:13 -07:00
slog . Info ( "starting llama server" , "cmd" , s . cmd . String ( ) )
2024-05-15 14:42:57 -07:00
if envconfig . Debug {
filteredEnv := [ ] string { }
for _ , ev := range s . cmd . Env {
if strings . HasPrefix ( ev , "CUDA_" ) ||
strings . HasPrefix ( ev , "ROCM_" ) ||
strings . HasPrefix ( ev , "HIP_" ) ||
strings . HasPrefix ( ev , "HSA_" ) ||
strings . HasPrefix ( ev , "GGML_" ) ||
strings . HasPrefix ( ev , "PATH=" ) ||
strings . HasPrefix ( ev , "LD_LIBRARY_PATH=" ) {
filteredEnv = append ( filteredEnv , ev )
}
}
// Log at debug as the environment is inherited and might contain sensitive information
slog . Debug ( "subprocess" , "environment" , filteredEnv )
}
2024-03-14 10:24:13 -07:00
if err = s . cmd . Start ( ) ; err != nil {
2024-05-07 16:46:15 -07:00
// Detect permission denied and augment them essage about noexec
if errors . Is ( err , os . ErrPermission ) {
finalErr = fmt . Errorf ( "unable to start server %w. %s may have noexec set. Set OLLAMA_TMPDIR for server to a writable executable directory" , err , dir )
continue
}
2024-03-14 10:24:13 -07:00
msg := ""
if s . status != nil && s . status . LastErrMsg != "" {
msg = s . status . LastErrMsg
}
err = fmt . Errorf ( "error starting the external llama server: %v %s" , err , msg )
finalErr = err
continue
}
2024-05-09 11:10:28 -07:00
// reap subprocess when it exits
go func ( ) {
s . done <- s . cmd . Wait ( )
} ( )
2024-03-14 10:24:13 -07:00
return s , nil
}
slog . Error ( "unable to load any llama server" , "error" , finalErr )
return nil , finalErr
}
2024-04-05 14:50:38 -07:00
func projectorMemoryRequirements ( filename string ) uint64 {
2024-03-14 10:24:13 -07:00
file , err := os . Open ( filename )
if err != nil {
return 0
}
defer file . Close ( )
2024-06-24 21:47:52 -07:00
ggml , _ , err := DecodeGGML ( file , 0 )
2024-03-14 10:24:13 -07:00
if err != nil {
return 0
}
2024-04-03 15:00:31 -07:00
var mem uint64
for _ , layer := range ggml . Tensors ( ) . Layers ( ) {
mem += layer . size ( )
2024-03-14 10:24:13 -07:00
}
2024-04-05 14:50:38 -07:00
return mem
2024-03-14 10:24:13 -07:00
}
type ServerStatus int
const ( // iota is reset to 0
ServerStatusReady ServerStatus = iota
2024-05-06 14:22:53 -07:00
ServerStatusNoSlotsAvailable
2024-03-14 10:24:13 -07:00
ServerStatusLoadingModel
ServerStatusNotResponding
ServerStatusError
)
2024-03-30 09:50:05 -07:00
func ( s ServerStatus ) ToString ( ) string {
switch s {
case ServerStatusReady :
return "llm server ready"
2024-05-06 14:22:53 -07:00
case ServerStatusNoSlotsAvailable :
2024-03-30 09:50:05 -07:00
return "llm busy - no slots available"
case ServerStatusLoadingModel :
return "llm server loading model"
case ServerStatusNotResponding :
return "llm server not responding"
default :
return "llm server error"
}
}
2024-03-14 10:24:13 -07:00
type ServerStatusResp struct {
2024-05-20 16:41:43 -07:00
Status string ` json:"status" `
SlotsIdle int ` json:"slots_idle" `
SlotsProcessing int ` json:"slots_processing" `
Error string ` json:"error" `
Progress float32 ` json:"progress" `
2024-03-14 10:24:13 -07:00
}
2024-03-30 09:50:05 -07:00
func ( s * llmServer ) getServerStatus ( ctx context . Context ) ( ServerStatus , error ) {
2024-03-14 10:24:13 -07:00
// Fail fast if its exited
if s . cmd . ProcessState != nil {
msg := ""
if s . status != nil && s . status . LastErrMsg != "" {
msg = s . status . LastErrMsg
}
2024-05-07 16:46:15 -07:00
if s . cmd . ProcessState . ExitCode ( ) == - 1 {
// Most likely a signal killed it, log some more details to try to help troubleshoot
slog . Warn ( "llama runner process no longer running" , "sys" , s . cmd . ProcessState . Sys ( ) , "string" , s . cmd . ProcessState . String ( ) )
}
2024-03-14 10:24:13 -07:00
return ServerStatusError , fmt . Errorf ( "llama runner process no longer running: %d %s" , s . cmd . ProcessState . ExitCode ( ) , msg )
}
req , err := http . NewRequestWithContext ( ctx , http . MethodGet , fmt . Sprintf ( "http://127.0.0.1:%d/health" , s . port ) , nil )
if err != nil {
return ServerStatusError , fmt . Errorf ( "error creating GET request: %v" , err )
}
req . Header . Set ( "Content-Type" , "application/json" )
resp , err := http . DefaultClient . Do ( req )
if err != nil {
if errors . Is ( err , context . DeadlineExceeded ) {
2024-05-21 22:07:57 -07:00
return ServerStatusNotResponding , errors . New ( "server not responding" )
2024-03-14 10:24:13 -07:00
}
return ServerStatusError , fmt . Errorf ( "health resp: %w" , err )
}
defer resp . Body . Close ( )
body , err := io . ReadAll ( resp . Body )
if err != nil {
return ServerStatusError , fmt . Errorf ( "read health request: %w" , err )
}
var status ServerStatusResp
if err := json . Unmarshal ( body , & status ) ; err != nil {
return ServerStatusError , fmt . Errorf ( "health unmarshal encode response: %w" , err )
}
switch status . Status {
case "ok" :
return ServerStatusReady , nil
case "no slot available" :
2024-05-06 14:22:53 -07:00
return ServerStatusNoSlotsAvailable , nil
2024-03-14 10:24:13 -07:00
case "loading model" :
2024-05-20 16:41:43 -07:00
s . loadProgress = status . Progress
2024-03-14 10:24:13 -07:00
return ServerStatusLoadingModel , nil
default :
return ServerStatusError , fmt . Errorf ( "server error: %+v" , status )
}
}
2024-05-06 14:22:53 -07:00
// getServerStatusRetry will retry if ServerStatusNoSlotsAvailable is received
func ( s * llmServer ) getServerStatusRetry ( ctx context . Context ) ( ServerStatus , error ) {
var retries int
for {
status , err := s . getServerStatus ( ctx )
if err != nil {
return status , err
}
if status == ServerStatusNoSlotsAvailable {
if retries >= 10 {
return status , fmt . Errorf ( "no slots available after %d retries" , retries )
}
time . Sleep ( 5 * time . Millisecond )
retries ++
continue
}
return status , nil
}
}
2024-03-30 09:50:05 -07:00
func ( s * llmServer ) Ping ( ctx context . Context ) error {
2024-03-14 10:24:13 -07:00
_ , err := s . getServerStatus ( ctx )
if err != nil {
slog . Debug ( "server unhealthy" , "error" , err )
return err
}
return nil
}
2024-03-30 09:50:05 -07:00
func ( s * llmServer ) WaitUntilRunning ( ctx context . Context ) error {
2024-03-14 10:24:13 -07:00
start := time . Now ( )
2024-05-28 08:56:18 -07:00
stallDuration := 5 * time . Minute // If no progress happens
finalLoadDuration := 5 * time . Minute // After we hit 100%, give the runner more time to come online
stallTimer := time . Now ( ) . Add ( stallDuration ) // give up if we stall
2024-03-14 10:24:13 -07:00
slog . Info ( "waiting for llama runner to start responding" )
var lastStatus ServerStatus = - 1
2024-05-28 08:56:18 -07:00
fullyLoaded := false
2024-04-17 17:39:52 +02:00
2024-03-14 10:24:13 -07:00
for {
select {
2024-03-30 09:50:05 -07:00
case <- ctx . Done ( ) :
2024-05-25 09:23:28 -07:00
slog . Warn ( "client connection closed before server finished loading, aborting load" )
2024-04-26 17:38:29 -04:00
return fmt . Errorf ( "timed out waiting for llama runner to start: %w" , ctx . Err ( ) )
2024-03-14 10:24:13 -07:00
case err := <- s . done :
msg := ""
if s . status != nil && s . status . LastErrMsg != "" {
msg = s . status . LastErrMsg
}
2024-07-01 16:04:13 -07:00
if strings . Contains ( msg , "unknown model" ) {
return fmt . Errorf ( "this model is not supported by your version of Ollama. You may need to upgrade" )
}
2024-03-14 10:24:13 -07:00
return fmt . Errorf ( "llama runner process has terminated: %v %s" , err , msg )
2024-05-08 16:44:35 -07:00
default :
}
2024-05-20 16:41:43 -07:00
if time . Now ( ) . After ( stallTimer ) {
2024-04-17 17:39:52 +02:00
// timeout
2024-03-14 10:24:13 -07:00
msg := ""
if s . status != nil && s . status . LastErrMsg != "" {
msg = s . status . LastErrMsg
}
2024-05-20 16:41:43 -07:00
return fmt . Errorf ( "timed out waiting for llama runner to start - progress %0.2f - %s" , s . loadProgress , msg )
2024-04-17 17:39:52 +02:00
}
if s . cmd . ProcessState != nil {
msg := ""
if s . status != nil && s . status . LastErrMsg != "" {
msg = s . status . LastErrMsg
2024-03-14 10:24:13 -07:00
}
2024-04-17 17:39:52 +02:00
return fmt . Errorf ( "llama runner process no longer running: %d %s" , s . cmd . ProcessState . ExitCode ( ) , msg )
}
2024-05-09 11:10:28 -07:00
ctx , cancel := context . WithTimeout ( ctx , 200 * time . Millisecond )
defer cancel ( )
2024-05-20 16:41:43 -07:00
priorProgress := s . loadProgress
2024-05-09 11:10:28 -07:00
status , _ := s . getServerStatus ( ctx )
if lastStatus != status && status != ServerStatusReady {
// Only log on status changes
slog . Info ( "waiting for server to become available" , "status" , status . ToString ( ) )
}
2024-04-17 17:39:52 +02:00
switch status {
case ServerStatusReady :
2024-05-09 11:10:28 -07:00
s . loadDuration = time . Since ( start )
slog . Info ( fmt . Sprintf ( "llama runner started in %0.2f seconds" , s . loadDuration . Seconds ( ) ) )
2024-04-17 17:39:52 +02:00
return nil
default :
2024-05-09 11:10:28 -07:00
lastStatus = status
2024-05-20 16:41:43 -07:00
// Reset the timer as long as we're making forward progress on the load
if priorProgress != s . loadProgress {
slog . Debug ( fmt . Sprintf ( "model load progress %0.2f" , s . loadProgress ) )
stallTimer = time . Now ( ) . Add ( stallDuration )
2024-05-28 08:56:18 -07:00
} else if ! fullyLoaded && int ( s . loadProgress * 100.0 ) >= 100 {
slog . Debug ( "model load completed, waiting for server to become available" , "status" , status . ToString ( ) )
stallTimer = time . Now ( ) . Add ( finalLoadDuration )
fullyLoaded = true
2024-05-20 16:41:43 -07:00
}
2024-04-17 17:39:52 +02:00
time . Sleep ( time . Millisecond * 250 )
continue
2024-03-14 10:24:13 -07:00
}
}
}
const jsonGrammar = `
root : := object
value : := object | array | string | number | ( "true" | "false" | "null" ) ws
object : :=
"{" ws (
string ":" ws value
( "," ws string ":" ws value ) *
) ? "}" ws
array : :=
"[" ws (
value
( "," ws value ) *
) ? "]" ws
string : :=
"\"" (
2024-06-09 13:57:09 -04:00
[ ^ " \ \ \ x7F \ x00 - \ x1F ] |
2024-03-14 10:24:13 -07:00
"\\" ( [ "\\/bfnrt] | " u " [ 0 - 9 a - fA - F ] [ 0 - 9 a - fA - F ] [ 0 - 9 a - fA - F ] [ 0 - 9 a - fA - F ] ) # escapes
) * "\"" ws
number : := ( "-" ? ( [ 0 - 9 ] | [ 1 - 9 ] [ 0 - 9 ] * ) ) ( "." [ 0 - 9 ] + ) ? ( [ eE ] [ - + ] ? [ 0 - 9 ] + ) ? ws
# Optional space : by convention , applied in this grammar after literal chars when allowed
ws : := ( [ \ t \ n ] ws ) ?
`
const maxBufferSize = 512 * format . KiloByte
type ImageData struct {
Data [ ] byte ` json:"data" `
ID int ` json:"id" `
}
type completion struct {
2024-05-09 13:30:14 -07:00
Content string ` json:"content" `
Model string ` json:"model" `
Prompt string ` json:"prompt" `
Stop bool ` json:"stop" `
StoppedLimit bool ` json:"stopped_limit" `
2024-03-14 10:24:13 -07:00
Timings struct {
PredictedN int ` json:"predicted_n" `
PredictedMS float64 ` json:"predicted_ms" `
PromptN int ` json:"prompt_n" `
PromptMS float64 ` json:"prompt_ms" `
}
}
type CompletionRequest struct {
Prompt string
Format string
Images [ ] ImageData
Options api . Options
}
type CompletionResponse struct {
Content string
2024-05-09 13:30:14 -07:00
DoneReason string
2024-03-14 10:24:13 -07:00
Done bool
PromptEvalCount int
PromptEvalDuration time . Duration
EvalCount int
EvalDuration time . Duration
}
2024-03-30 09:50:05 -07:00
func ( s * llmServer ) Completion ( ctx context . Context , req CompletionRequest , fn func ( CompletionResponse ) ) error {
if err := s . sem . Acquire ( ctx , 1 ) ; err != nil {
slog . Error ( "Failed to acquire semaphore" , "error" , err )
return err
}
defer s . sem . Release ( 1 )
2024-04-25 19:02:30 -04:00
// only allow maximum 10 "context shifts" to avoid infinite generation
if req . Options . NumPredict < 0 || req . Options . NumPredict > 10 * s . options . NumCtx {
req . Options . NumPredict = 10 * s . options . NumCtx
slog . Debug ( "setting token limit to 10x num_ctx" , "num_ctx" , s . options . NumCtx , "num_predict" , req . Options . NumPredict )
}
2024-03-14 10:24:13 -07:00
request := map [ string ] any {
"prompt" : req . Prompt ,
"stream" : true ,
"n_predict" : req . Options . NumPredict ,
"n_keep" : req . Options . NumKeep ,
"main_gpu" : req . Options . MainGPU ,
"temperature" : req . Options . Temperature ,
"top_k" : req . Options . TopK ,
"top_p" : req . Options . TopP ,
"tfs_z" : req . Options . TFSZ ,
"typical_p" : req . Options . TypicalP ,
"repeat_last_n" : req . Options . RepeatLastN ,
"repeat_penalty" : req . Options . RepeatPenalty ,
"presence_penalty" : req . Options . PresencePenalty ,
"frequency_penalty" : req . Options . FrequencyPenalty ,
"mirostat" : req . Options . Mirostat ,
"mirostat_tau" : req . Options . MirostatTau ,
"mirostat_eta" : req . Options . MirostatEta ,
"penalize_nl" : req . Options . PenalizeNewline ,
"seed" : req . Options . Seed ,
"stop" : req . Options . Stop ,
"image_data" : req . Images ,
"cache_prompt" : true ,
}
// Make sure the server is ready
2024-05-06 14:22:53 -07:00
status , err := s . getServerStatusRetry ( ctx )
2024-03-14 10:24:13 -07:00
if err != nil {
return err
} else if status != ServerStatusReady {
2024-03-30 09:50:05 -07:00
return fmt . Errorf ( "unexpected server status: %s" , status . ToString ( ) )
2024-03-14 10:24:13 -07:00
}
if req . Format == "json" {
request [ "grammar" ] = jsonGrammar
if ! strings . Contains ( strings . ToLower ( req . Prompt ) , "json" ) {
slog . Warn ( "Prompt does not specify that the LLM should response in JSON, but JSON format is expected. For best results specify that JSON is expected in the system prompt." )
}
}
2024-05-06 14:22:53 -07:00
// Handling JSON marshaling with special characters unescaped.
buffer := & bytes . Buffer { }
enc := json . NewEncoder ( buffer )
enc . SetEscapeHTML ( false )
2024-03-14 10:24:13 -07:00
2024-05-06 14:22:53 -07:00
if err := enc . Encode ( request ) ; err != nil {
return fmt . Errorf ( "failed to marshal data: %v" , err )
}
2024-03-14 10:24:13 -07:00
2024-05-06 14:22:53 -07:00
endpoint := fmt . Sprintf ( "http://127.0.0.1:%d/completion" , s . port )
serverReq , err := http . NewRequestWithContext ( ctx , http . MethodPost , endpoint , buffer )
if err != nil {
return fmt . Errorf ( "error creating POST request: %v" , err )
}
serverReq . Header . Set ( "Content-Type" , "application/json" )
2024-03-14 10:24:13 -07:00
2024-05-06 14:22:53 -07:00
res , err := http . DefaultClient . Do ( serverReq )
if err != nil {
return fmt . Errorf ( "POST predict: %v" , err )
}
defer res . Body . Close ( )
2024-03-14 10:24:13 -07:00
2024-05-06 14:22:53 -07:00
if res . StatusCode >= 400 {
bodyBytes , err := io . ReadAll ( res . Body )
2024-03-14 10:24:13 -07:00
if err != nil {
2024-05-06 14:22:53 -07:00
return fmt . Errorf ( "failed reading llm error response: %w" , err )
2024-03-14 10:24:13 -07:00
}
2024-05-06 14:22:53 -07:00
log . Printf ( "llm predict error: %s" , bodyBytes )
return fmt . Errorf ( "%s" , bodyBytes )
}
2024-03-14 10:24:13 -07:00
2024-05-06 14:22:53 -07:00
scanner := bufio . NewScanner ( res . Body )
buf := make ( [ ] byte , 0 , maxBufferSize )
scanner . Buffer ( buf , maxBufferSize )
2024-03-14 10:24:13 -07:00
2024-05-06 14:22:53 -07:00
// keep track of the last token generated, this is used to abort if the model starts looping
var lastToken string
var tokenRepeat int
2024-03-14 10:24:13 -07:00
2024-05-06 14:22:53 -07:00
for scanner . Scan ( ) {
select {
case <- ctx . Done ( ) :
// This handles the request cancellation
return ctx . Err ( )
default :
line := scanner . Bytes ( )
if len ( line ) == 0 {
continue
}
2024-03-14 10:24:13 -07:00
2024-05-06 14:22:53 -07:00
evt , ok := bytes . CutPrefix ( line , [ ] byte ( "data: " ) )
if ! ok {
return fmt . Errorf ( "error parsing llm response stream: %s" , line )
}
2024-03-14 10:24:13 -07:00
2024-05-06 14:22:53 -07:00
var c completion
if err := json . Unmarshal ( evt , & c ) ; err != nil {
2024-05-28 08:21:10 +08:00
return fmt . Errorf ( "error unmarshalling llm prediction response: %v" , err )
2024-05-06 14:22:53 -07:00
}
2024-03-14 10:24:13 -07:00
2024-05-06 14:22:53 -07:00
switch {
case strings . TrimSpace ( c . Content ) == lastToken :
tokenRepeat ++
default :
lastToken = strings . TrimSpace ( c . Content )
tokenRepeat = 0
}
2024-03-14 10:24:13 -07:00
2024-05-06 14:22:53 -07:00
// 30 picked as an arbitrary max token repeat limit, modify as needed
if tokenRepeat > 30 {
slog . Debug ( "prediction aborted, token repeat limit reached" )
return ctx . Err ( )
}
2024-03-14 10:24:13 -07:00
2024-05-06 14:22:53 -07:00
if c . Content != "" {
fn ( CompletionResponse {
Content : c . Content ,
} )
}
2024-03-14 10:24:13 -07:00
2024-05-06 14:22:53 -07:00
if c . Stop {
2024-05-09 13:30:14 -07:00
doneReason := "stop"
if c . StoppedLimit {
doneReason = "length"
}
2024-05-06 14:22:53 -07:00
fn ( CompletionResponse {
Done : true ,
2024-05-09 13:30:14 -07:00
DoneReason : doneReason ,
2024-05-06 14:22:53 -07:00
PromptEvalCount : c . Timings . PromptN ,
PromptEvalDuration : parseDurationMs ( c . Timings . PromptMS ) ,
EvalCount : c . Timings . PredictedN ,
EvalDuration : parseDurationMs ( c . Timings . PredictedMS ) ,
} )
return nil
2024-03-14 10:24:13 -07:00
}
}
2024-05-06 14:22:53 -07:00
}
2024-03-14 10:24:13 -07:00
2024-05-06 14:22:53 -07:00
if err := scanner . Err ( ) ; err != nil {
if strings . Contains ( err . Error ( ) , "unexpected EOF" ) {
s . Close ( )
msg := ""
if s . status != nil && s . status . LastErrMsg != "" {
msg = s . status . LastErrMsg
2024-03-14 10:24:13 -07:00
}
2024-05-06 14:22:53 -07:00
return fmt . Errorf ( "an unknown error was encountered while running the model %s" , msg )
2024-03-14 10:24:13 -07:00
}
2024-05-06 14:22:53 -07:00
return fmt . Errorf ( "error reading llm response: %v" , err )
2024-03-14 10:24:13 -07:00
}
2024-05-06 14:22:53 -07:00
return nil
2024-03-14 10:24:13 -07:00
}
type EmbeddingRequest struct {
Content string ` json:"content" `
}
type EmbeddingResponse struct {
Embedding [ ] float64 ` json:"embedding" `
}
2024-03-30 09:50:05 -07:00
func ( s * llmServer ) Embedding ( ctx context . Context , prompt string ) ( [ ] float64 , error ) {
if err := s . sem . Acquire ( ctx , 1 ) ; err != nil {
slog . Error ( "Failed to acquire semaphore" , "error" , err )
return nil , err
}
defer s . sem . Release ( 1 )
2024-05-06 14:22:53 -07:00
2024-03-14 10:24:13 -07:00
// Make sure the server is ready
2024-05-06 14:22:53 -07:00
status , err := s . getServerStatusRetry ( ctx )
2024-03-14 10:24:13 -07:00
if err != nil {
return nil , err
} else if status != ServerStatusReady {
2024-03-30 09:50:05 -07:00
return nil , fmt . Errorf ( "unexpected server status: %s" , status . ToString ( ) )
2024-03-14 10:24:13 -07:00
}
2024-05-31 18:54:21 -07:00
data , err := json . Marshal ( TokenizeRequest { Content : prompt } )
if err != nil {
2024-03-14 10:24:13 -07:00
return nil , fmt . Errorf ( "error marshaling embed data: %w" , err )
}
2024-05-31 18:54:21 -07:00
req , err := http . NewRequestWithContext ( ctx , http . MethodPost , fmt . Sprintf ( "http://127.0.0.1:%d/embedding" , s . port ) , bytes . NewBuffer ( data ) )
2024-03-14 10:24:13 -07:00
if err != nil {
return nil , fmt . Errorf ( "error creating embed request: %w" , err )
}
req . Header . Set ( "Content-Type" , "application/json" )
resp , err := http . DefaultClient . Do ( req )
if err != nil {
return nil , fmt . Errorf ( "do embedding request: %w" , err )
}
defer resp . Body . Close ( )
body , err := io . ReadAll ( resp . Body )
if err != nil {
return nil , fmt . Errorf ( "error reading embed response: %w" , err )
}
if resp . StatusCode >= 400 {
log . Printf ( "llm encode error: %s" , body )
return nil , fmt . Errorf ( "%s" , body )
}
var embedding EmbeddingResponse
if err := json . Unmarshal ( body , & embedding ) ; err != nil {
return nil , fmt . Errorf ( "unmarshal tokenize response: %w" , err )
}
return embedding . Embedding , nil
}
2024-05-31 18:54:21 -07:00
type TokenizeRequest struct {
Content string ` json:"content" `
}
type TokenizeResponse struct {
Tokens [ ] int ` json:"tokens" `
}
2024-03-30 09:50:05 -07:00
func ( s * llmServer ) Tokenize ( ctx context . Context , content string ) ( [ ] int , error ) {
2024-05-31 18:54:21 -07:00
// Make sure the server is ready
status , err := s . getServerStatus ( ctx )
if err != nil {
return nil , err
} else if status != ServerStatusReady && status != ServerStatusNoSlotsAvailable {
return nil , fmt . Errorf ( "unexpected server status: %s" , status . ToString ( ) )
}
data , err := json . Marshal ( TokenizeRequest { Content : content } )
if err != nil {
return nil , fmt . Errorf ( "marshaling encode data: %w" , err )
}
req , err := http . NewRequestWithContext ( ctx , http . MethodPost , fmt . Sprintf ( "http://127.0.0.1:%d/tokenize" , s . port ) , bytes . NewBuffer ( data ) )
if err != nil {
return nil , fmt . Errorf ( "encode request: %w" , err )
}
req . Header . Set ( "Content-Type" , "application/json" )
resp , err := http . DefaultClient . Do ( req )
if err != nil {
return nil , fmt . Errorf ( "do encode request: %w" , err )
}
defer resp . Body . Close ( )
body , err := io . ReadAll ( resp . Body )
if err != nil {
return nil , fmt . Errorf ( "read encode request: %w" , err )
}
if resp . StatusCode >= 400 {
log . Printf ( "llm encode error: %s" , body )
return nil , fmt . Errorf ( "%s" , body )
}
var encoded TokenizeResponse
if err := json . Unmarshal ( body , & encoded ) ; err != nil {
return nil , fmt . Errorf ( "unmarshal encode response: %w" , err )
}
return encoded . Tokens , nil
}
type DetokenizeRequest struct {
Tokens [ ] int ` json:"tokens" `
}
type DetokenizeResponse struct {
Content string ` json:"content" `
2024-03-14 10:24:13 -07:00
}
2024-03-30 09:50:05 -07:00
func ( s * llmServer ) Detokenize ( ctx context . Context , tokens [ ] int ) ( string , error ) {
2024-05-31 18:54:21 -07:00
// Make sure the server is ready
status , err := s . getServerStatus ( ctx )
if err != nil {
return "" , err
} else if status != ServerStatusReady && status != ServerStatusNoSlotsAvailable {
return "" , fmt . Errorf ( "unexpected server status: %s" , status . ToString ( ) )
}
data , err := json . Marshal ( DetokenizeRequest { Tokens : tokens } )
if err != nil {
return "" , fmt . Errorf ( "marshaling decode data: %w" , err )
}
req , err := http . NewRequestWithContext ( ctx , http . MethodPost , fmt . Sprintf ( "http://127.0.0.1:%d/detokenize" , s . port ) , bytes . NewBuffer ( data ) )
if err != nil {
return "" , fmt . Errorf ( "decode request: %w" , err )
}
req . Header . Set ( "Content-Type" , "application/json" )
resp , err := http . DefaultClient . Do ( req )
if err != nil {
return "" , fmt . Errorf ( "do decode request: %w" , err )
}
defer resp . Body . Close ( )
body , err := io . ReadAll ( resp . Body )
if err != nil {
return "" , fmt . Errorf ( "read decode request: %w" , err )
}
if resp . StatusCode >= 400 {
log . Printf ( "llm decode error: %s" , body )
return "" , fmt . Errorf ( "%s" , body )
}
var decoded DetokenizeResponse
if err := json . Unmarshal ( body , & decoded ) ; err != nil {
return "" , fmt . Errorf ( "unmarshal encode response: %w" , err )
}
return decoded . Content , nil
2024-03-14 10:24:13 -07:00
}
2024-03-30 09:50:05 -07:00
func ( s * llmServer ) Close ( ) error {
2024-03-14 10:24:13 -07:00
if s . cmd != nil {
slog . Debug ( "stopping llama server" )
2024-04-28 16:41:38 +00:00
if err := s . cmd . Process . Kill ( ) ; err != nil {
return err
}
2024-05-09 11:10:28 -07:00
// if ProcessState is already populated, Wait already completed, no need to wait again
if s . cmd . ProcessState == nil {
slog . Debug ( "waiting for llama server to exit" )
<- s . done
}
2024-04-29 18:06:56 +00:00
slog . Debug ( "llama server stopped" )
2024-03-14 10:24:13 -07:00
}
return nil
}
2024-03-30 09:50:05 -07:00
func ( s * llmServer ) EstimatedVRAM ( ) uint64 {
2024-05-18 12:34:31 -07:00
return s . estimate . VRAMSize
2024-03-30 09:50:05 -07:00
}
2024-05-13 17:17:36 -07:00
func ( s * llmServer ) EstimatedTotal ( ) uint64 {
2024-05-18 12:34:31 -07:00
return s . estimate . TotalSize
2024-05-13 17:17:36 -07:00
}
2024-06-05 12:07:20 -07:00
func ( s * llmServer ) EstimatedVRAMByGPU ( gpuID string ) uint64 {
2024-06-03 19:09:23 -07:00
for i , gpu := range s . gpus {
if gpu . ID == gpuID {
return s . estimate . GPUSizes [ i ]
}
}
return 0
}
2024-03-14 10:24:13 -07:00
func parseDurationMs ( ms float64 ) time . Duration {
dur , err := time . ParseDuration ( fmt . Sprintf ( "%fms" , ms ) )
if err != nil {
panic ( err )
}
return dur
}