ollama/llm/llama.go
Patrick Devine 910e9401d0
Multimodal support (#1216)
---------

Co-authored-by: Matt Apperson <mattapperson@Matts-MacBook-Pro.local>
2023-12-11 13:56:22 -08:00

852 lines
23 KiB
Go

package llm
import (
"bufio"
"bytes"
"context"
"embed"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"log"
"math/rand"
"net/http"
"os"
"os/exec"
"path"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"time"
"github.com/jmorganca/ollama/api"
"github.com/jmorganca/ollama/format"
)
const jsonGrammar = `
root ::= object
value ::= object | array | string | number | ("true" | "false" | "null") ws
object ::=
"{" ws (
string ":" ws value
("," ws string ":" ws value)*
)? "}" ws
array ::=
"[" ws (
value
("," ws value)*
)? "]" ws
string ::=
"\"" (
[^"\\] |
"\\" (["\\/bfnrt] | "u" [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F]) # escapes
)* "\"" ws
number ::= ("-"? ([0-9] | [1-9] [0-9]*)) ("." [0-9]+)? ([eE] [-+]? [0-9]+)? ws
# Optional space: by convention, applied in this grammar after literal chars when allowed
ws ::= ([ \t\n] ws)?
`
//go:embed llama.cpp/*/build/*/bin/*
var llamaCppEmbed embed.FS
type ModelRunner struct {
Type string // "gguf" or "ggml"
Path string // path to the model runner executable
Accelerated bool
}
func chooseRunners(workDir, runnerType string) []ModelRunner {
buildPath := path.Join("llama.cpp", runnerType, "build")
var runners []ModelRunner
// set the runners based on the OS
// IMPORTANT: the order of the runners in the array is the priority order
switch runtime.GOOS {
case "darwin":
if runtime.GOARCH == "arm64" {
runners = []ModelRunner{{Type: runnerType, Path: path.Join(buildPath, "metal", "bin", "ollama-runner")}}
} else {
runners = []ModelRunner{{Type: runnerType, Path: path.Join(buildPath, "cpu", "bin", "ollama-runner")}}
}
case "linux":
runners = []ModelRunner{
{Type: runnerType, Path: path.Join(buildPath, "cuda", "bin", "ollama-runner"), Accelerated: true},
{Type: runnerType, Path: path.Join(buildPath, "cpu", "bin", "ollama-runner")},
}
case "windows":
// TODO: select windows GPU runner here when available
runners = []ModelRunner{
{Type: runnerType, Path: path.Join(buildPath, "cuda", "bin", "Release", "ollama-runner.exe"), Accelerated: true},
{Type: runnerType, Path: path.Join(buildPath, "cpu", "bin", "Release", "ollama-runner.exe")},
}
default:
log.Printf("unknown OS, running on CPU: %s", runtime.GOOS)
runners = []ModelRunner{
{Type: runnerType, Path: path.Join(buildPath, "cpu", "bin", "ollama-runner")},
}
}
runnerAvailable := false // if no runner files are found in the embed, this flag will cause a fast fail
for _, r := range runners {
// find all the files in the runner's bin directory
files, err := fs.Glob(llamaCppEmbed, path.Join(path.Dir(r.Path), "*"))
if err != nil {
// this is expected, ollama may be compiled without all runners packed in
log.Printf("%s runner not found: %v", r.Path, err)
continue
}
for _, f := range files {
runnerAvailable = true
srcFile, err := llamaCppEmbed.Open(f)
if err != nil {
log.Fatalf("read llama runner %s: %v", f, err)
}
defer srcFile.Close()
// create the directory in case it does not exist, filepath.Dir() converts the file path to the OS's format
destPath := filepath.Join(workDir, filepath.Dir(f))
if err := os.MkdirAll(destPath, 0o755); err != nil {
log.Fatalf("create runner temp dir %s: %v", filepath.Dir(f), err)
}
// create the path to the destination file, filepath.Base() converts the file path to the OS's format
destFile := filepath.Join(destPath, filepath.Base(f))
_, err = os.Stat(destFile)
switch {
case errors.Is(err, os.ErrNotExist):
destFile, err := os.OpenFile(destFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o755)
if err != nil {
log.Fatalf("write llama runner %s: %v", f, err)
}
defer destFile.Close()
if _, err := io.Copy(destFile, srcFile); err != nil {
log.Fatalf("copy llama runner %s: %v", f, err)
}
case err != nil:
log.Fatalf("stat llama runner %s: %v", f, err)
}
}
}
if !runnerAvailable {
log.Fatalf("%s runner not found", runnerType)
}
// return the runners to try in priority order
localRunnersByPriority := []ModelRunner{}
for _, r := range runners {
// clean the ModelRunner paths so that they match the OS we are running on
localRunnersByPriority = append(localRunnersByPriority, ModelRunner{
Type: r.Type,
Path: filepath.Clean(path.Join(workDir, r.Path)),
Accelerated: r.Accelerated,
})
}
return localRunnersByPriority
}
type llamaModel struct {
hyperparameters llamaHyperparameters
}
func (llm *llamaModel) ModelFamily() string {
return "llama"
}
func llamaModelType(numLayer uint32) string {
switch numLayer {
case 26:
return "3B"
case 32:
return "7B"
case 40:
return "13B"
case 48:
return "34B"
case 60:
return "30B"
case 80:
return "65B"
default:
return "unknown"
}
}
func (llm *llamaModel) ModelType() string {
return llamaModelType(llm.hyperparameters.NumLayer)
}
func (llm *llamaModel) FileType() string {
return fileType(llm.hyperparameters.FileType)
}
func (llm *llamaModel) NumLayers() int64 {
return int64(llm.hyperparameters.NumLayer)
}
type llamaHyperparameters struct {
// NumVocab is the size of the model's vocabulary.
NumVocab uint32
// NumEmbd is the size of the model's embedding layer.
NumEmbd uint32
NumMult uint32
NumHead uint32
// NumLayer is the number of layers in the model.
NumLayer uint32
NumRot uint32
// FileType describes the quantization level of the model, e.g. Q4_0, Q5_K, etc.
FileType uint32
}
type Running struct {
Port int
Cmd *exec.Cmd
Cancel context.CancelFunc
exitOnce sync.Once
exitCh chan error // channel to receive the exit status of the subprocess
*StatusWriter // captures error messages from the llama runner process
}
type ImageData struct {
Data []byte `json:"data"`
ID int `json:"id"`
}
type llama struct {
api.Options
ImageData []ImageData
Running
}
var (
errNvidiaSMI = errors.New("warning: gpu support may not be enabled, check that you have installed GPU drivers: nvidia-smi command failed")
errAvailableVRAM = errors.New("not enough VRAM available, falling back to CPU only")
)
// CheckVRAM returns the free VRAM in bytes on Linux machines with NVIDIA GPUs
func CheckVRAM() (int64, error) {
cmd := exec.Command("nvidia-smi", "--query-gpu=memory.free", "--format=csv,noheader,nounits")
var stdout bytes.Buffer
cmd.Stdout = &stdout
err := cmd.Run()
if err != nil {
return 0, errNvidiaSMI
}
var freeMiB int64
scanner := bufio.NewScanner(&stdout)
for scanner.Scan() {
line := scanner.Text()
if strings.Contains(line, "[Insufficient Permissions]") {
return 0, fmt.Errorf("GPU support may not enabled, check you have installed GPU drivers and have the necessary permissions to run nvidia-smi")
}
vram, err := strconv.ParseInt(strings.TrimSpace(line), 10, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse available VRAM: %v", err)
}
freeMiB += vram
}
freeBytes := freeMiB * 1024 * 1024
if freeBytes < 2*format.GigaByte {
log.Printf("less than 2 GB VRAM available")
return 0, errAvailableVRAM
}
return freeBytes, nil
}
func NumGPU(numLayer, fileSizeBytes int64, opts api.Options) int {
if opts.NumGPU != -1 {
return opts.NumGPU
}
if runtime.GOOS == "linux" || runtime.GOOS == "windows" {
freeBytes, err := CheckVRAM()
if err != nil {
if !errors.Is(err, errNvidiaSMI) {
log.Print(err.Error())
}
// nvidia driver not installed or no nvidia GPU found
return 0
}
/*
Calculate bytes per layer, this will roughly be the size of the model file divided by the number of layers.
We can store the model weights and the kv cache in vram,
to enable kv chache vram storage add two additional layers to the number of layers retrieved from the model file.
*/
bytesPerLayer := fileSizeBytes / numLayer
// 75% of the absolute max number of layers we can fit in available VRAM, off-loading too many layers to the GPU can cause OOM errors
layers := int(freeBytes/bytesPerLayer) * 3 / 4
log.Printf("%d MB VRAM available, loading up to %d GPU layers", freeBytes/(1024*1024), layers)
return layers
}
// default to enable metal on macOS
return 1
}
// StatusWriter is a writer that captures error messages from the llama runner process
type StatusWriter struct {
ErrCh chan error
LastErrMsg string
}
func NewStatusWriter() *StatusWriter {
return &StatusWriter{
ErrCh: make(chan error, 1),
}
}
func (w *StatusWriter) Write(b []byte) (int, error) {
var errMsg string
if _, after, ok := bytes.Cut(b, []byte("error:")); ok {
errMsg = string(bytes.TrimSpace(after))
} else if _, after, ok := bytes.Cut(b, []byte("CUDA error")); ok {
errMsg = string(bytes.TrimSpace(after))
}
if errMsg != "" {
w.LastErrMsg = errMsg
w.ErrCh <- fmt.Errorf("llama runner: %s", errMsg)
}
return os.Stderr.Write(b)
}
func newLlama(model string, adapters, projectors []string, runners []ModelRunner, numLayers int64, opts api.Options) (*llama, error) {
fileInfo, err := os.Stat(model)
if err != nil {
return nil, err
}
if len(adapters) > 1 {
return nil, errors.New("ollama supports only one lora adapter, but multiple were provided")
}
numGPU := NumGPU(numLayers, fileInfo.Size(), opts)
params := []string{
"--model", model,
"--ctx-size", fmt.Sprintf("%d", opts.NumCtx),
"--batch-size", fmt.Sprintf("%d", opts.NumBatch),
"--n-gpu-layers", fmt.Sprintf("%d", numGPU),
"--embedding",
}
if opts.MainGPU > 0 {
params = append(params, "--main-gpu", fmt.Sprintf("%d", opts.MainGPU))
}
if opts.RopeFrequencyBase > 0 {
params = append(params, "--rope-freq-base", fmt.Sprintf("%f", opts.RopeFrequencyBase))
}
if opts.RopeFrequencyScale > 0 {
params = append(params, "--rope-freq-scale", fmt.Sprintf("%f", opts.RopeFrequencyScale))
}
if opts.NumGQA > 0 {
params = append(params, "--gqa", fmt.Sprintf("%d", opts.NumGQA))
}
if len(adapters) > 0 {
// TODO: applying multiple adapters is not supported by the llama.cpp server yet
params = append(params, "--lora", adapters[0])
}
if len(projectors) > 0 {
// TODO: applying multiple projectors is not supported by the llama.cpp server yet
params = append(params, "--mmproj", projectors[0])
}
if opts.NumThread > 0 {
params = append(params, "--threads", fmt.Sprintf("%d", opts.NumThread))
}
if !opts.F16KV {
params = append(params, "--memory-f32")
}
if opts.UseMLock {
params = append(params, "--mlock")
}
if !opts.UseMMap {
params = append(params, "--no-mmap")
}
if opts.UseNUMA {
params = append(params, "--numa")
}
var runnerErr error
// start the llama.cpp server with a retry in case the port is already in use
for _, runner := range runners {
if runner.Accelerated && numGPU == 0 {
log.Printf("skipping accelerated runner because num_gpu=0")
continue
}
if _, err := os.Stat(runner.Path); err != nil {
log.Printf("llama runner not found: %v", err)
continue
}
port := rand.Intn(65535-49152) + 49152 // get a random port in the ephemeral range
params := append(params, "--port", strconv.Itoa(port))
if runner.Type == "gguf" {
params = append(params, "--parallel", "2")
}
ctx, cancel := context.WithCancel(context.Background())
cmd := exec.CommandContext(
ctx,
runner.Path,
params...,
)
var libraryPaths []string
if libraryPath, ok := os.LookupEnv("LD_LIBRARY_PATH"); ok {
libraryPaths = append(libraryPaths, libraryPath)
}
libraryPaths = append(libraryPaths, filepath.Dir(runner.Path))
cmd.Env = append(os.Environ(), fmt.Sprintf("LD_LIBRARY_PATH=%s", strings.Join(libraryPaths, ":")))
cmd.Stdout = os.Stderr
statusWriter := NewStatusWriter()
cmd.Stderr = statusWriter
llm := &llama{Options: opts, Running: Running{Port: port, Cmd: cmd, Cancel: cancel, exitCh: make(chan error)}}
log.Print("starting llama runner")
if err := llm.Cmd.Start(); err != nil {
log.Printf("error starting the external llama runner: %v", err)
continue
}
// monitor the llama runner process and signal when it exits
go func() {
err := llm.Cmd.Wait()
// default to printing the exit message of the command process, it will probably just say 'exit staus 1'
errMsg := err.Error()
// try to set a better error message if llama runner logs captured an error
if statusWriter.LastErrMsg != "" {
errMsg = statusWriter.LastErrMsg
}
log.Println(errMsg)
// llm.Cmd.Wait() can only be called once, use this exit channel to signal that the process has exited
llm.exitOnce.Do(func() {
close(llm.exitCh)
})
}()
if err := waitForServer(llm); err != nil {
log.Printf("error starting llama runner: %v", err)
llm.Close()
// default the runnerErr to the error returned by the most recent llama runner process
runnerErr = err
// capture the error directly from the runner process, if any
select {
case runnerErr = <-statusWriter.ErrCh:
default:
// the runner process probably timed out
}
// try again
continue
}
// server started successfully
return llm, nil
}
if runnerErr != nil {
// this is the error returned from the llama runner process that failed most recently
return nil, runnerErr
}
return nil, fmt.Errorf("failed to start a llama runner")
}
func waitForServer(llm *llama) error {
start := time.Now()
expiresAt := time.Now().Add(3 * time.Minute) // be generous with timeout, large models can take a while to load
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
log.Print("waiting for llama runner to start responding")
for {
select {
case <-llm.exitCh:
// failed to start subprocess
return fmt.Errorf("llama runner process has terminated")
case <-ticker.C:
if time.Now().After(expiresAt) {
// timeout
return fmt.Errorf("timed out waiting for llama runner to start")
}
if err := llm.Ping(context.Background()); err == nil {
// success
log.Printf("llama runner started in %f seconds", time.Since(start).Seconds())
return nil
}
}
}
}
func (llm *llama) Close() {
// signal the sub-process to terminate
llm.Cancel()
// wait for the command to exit to prevent race conditions with the next run
<-llm.exitCh
if llm.StatusWriter != nil && llm.StatusWriter.LastErrMsg != "" {
log.Printf("llama runner stopped with error: %v", llm.StatusWriter.LastErrMsg)
} else {
log.Print("llama runner stopped successfully")
}
}
func (llm *llama) SetOptions(opts api.Options) {
llm.Options = opts
}
type prediction struct {
Content string `json:"content"`
Model string `json:"model"`
Prompt string `json:"prompt"`
Stop bool `json:"stop"`
Timings struct {
PredictedN int `json:"predicted_n"`
PredictedMS float64 `json:"predicted_ms"`
PromptN int `json:"prompt_n"`
PromptMS float64 `json:"prompt_ms"`
}
}
const maxBufferSize = 512 * format.KiloByte
type PredictOpts struct {
Prompt string
Format string
Images []api.ImageData
CheckpointStart time.Time
CheckpointLoaded time.Time
}
type PredictResult struct {
CreatedAt time.Time
TotalDuration time.Duration
LoadDuration time.Duration
Content string
Done bool
PromptEvalCount int
PromptEvalDuration time.Duration
EvalCount int
EvalDuration time.Duration
}
func (llm *llama) Predict(ctx context.Context, predict PredictOpts, fn func(PredictResult)) error {
imageData := llm.ImageData
if len(predict.Images) > 0 {
for cnt, i := range predict.Images {
imageData = append(imageData, ImageData{Data: i, ID: cnt})
}
}
log.Printf("loaded %d images", len(imageData))
request := map[string]any{
"prompt": predict.Prompt,
"stream": true,
"n_predict": llm.NumPredict,
"n_keep": llm.NumKeep,
"main_gpu": llm.MainGPU,
"temperature": llm.Temperature,
"top_k": llm.TopK,
"top_p": llm.TopP,
"tfs_z": llm.TFSZ,
"typical_p": llm.TypicalP,
"repeat_last_n": llm.RepeatLastN,
"repeat_penalty": llm.RepeatPenalty,
"presence_penalty": llm.PresencePenalty,
"frequency_penalty": llm.FrequencyPenalty,
"mirostat": llm.Mirostat,
"mirostat_tau": llm.MirostatTau,
"mirostat_eta": llm.MirostatEta,
"penalize_nl": llm.PenalizeNewline,
"seed": llm.Seed,
"stop": llm.Stop,
"image_data": imageData,
}
if predict.Format == "json" {
request["grammar"] = jsonGrammar
}
// Handling JSON marshaling with special characters unescaped.
buffer := &bytes.Buffer{}
enc := json.NewEncoder(buffer)
enc.SetEscapeHTML(false)
if err := enc.Encode(request); err != nil {
return fmt.Errorf("failed to marshal data: %v", err)
}
endpoint := fmt.Sprintf("http://127.0.0.1:%d/completion", llm.Port)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, buffer)
if err != nil {
return fmt.Errorf("error creating POST request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("POST predict: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed reading llm error response: %w", err)
}
log.Printf("llm predict error: %s", bodyBytes)
return fmt.Errorf("%s", bodyBytes)
}
scanner := bufio.NewScanner(resp.Body)
// increase the buffer size to avoid running out of space
buf := make([]byte, 0, maxBufferSize)
scanner.Buffer(buf, maxBufferSize)
for scanner.Scan() {
select {
case <-ctx.Done():
// This handles the request cancellation
return ctx.Err()
default:
line := scanner.Bytes()
if len(line) == 0 {
continue
}
evt, ok := bytes.CutPrefix(line, []byte("data: "))
if !ok {
return fmt.Errorf("error parsing llm response stream: %s", line)
}
var p prediction
if err := json.Unmarshal(evt, &p); err != nil {
return fmt.Errorf("error unmarshaling llm prediction response: %v", err)
}
if p.Content != "" {
fn(PredictResult{
CreatedAt: time.Now().UTC(),
Content: p.Content,
})
}
if p.Stop {
fn(PredictResult{
CreatedAt: time.Now().UTC(),
TotalDuration: time.Since(predict.CheckpointStart),
Done: true,
PromptEvalCount: p.Timings.PromptN,
PromptEvalDuration: parseDurationMs(p.Timings.PromptMS),
EvalCount: p.Timings.PredictedN,
EvalDuration: parseDurationMs(p.Timings.PredictedMS),
})
return nil
}
}
}
if err := scanner.Err(); err != nil {
if strings.Contains(err.Error(), "unexpected EOF") {
// this means the llama runner subprocess crashed
llm.Close()
if llm.StatusWriter != nil && llm.StatusWriter.LastErrMsg != "" {
return fmt.Errorf("llama runner exited: %v", llm.StatusWriter.LastErrMsg)
}
return fmt.Errorf("llama runner exited, you may not have enough available memory to run this model")
}
return fmt.Errorf("error reading llm response: %v", err)
}
return nil
}
type TokenizeRequest struct {
Content string `json:"content"`
}
type TokenizeResponse struct {
Tokens []int `json:"tokens"`
}
func (llm *llama) Encode(ctx context.Context, prompt string) ([]int, error) {
endpoint := fmt.Sprintf("http://127.0.0.1:%d/tokenize", llm.Port)
data, err := json.Marshal(TokenizeRequest{Content: prompt})
if err != nil {
return nil, fmt.Errorf("marshaling encode data: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewBuffer(data))
if err != nil {
return nil, fmt.Errorf("encode request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("do encode request: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("read encode request: %w", err)
}
if resp.StatusCode >= 400 {
log.Printf("llm encode error: %s", body)
return nil, fmt.Errorf("%s", body)
}
var encoded TokenizeResponse
if err := json.Unmarshal(body, &encoded); err != nil {
return nil, fmt.Errorf("unmarshal encode response: %w", err)
}
return encoded.Tokens, nil
}
type DetokenizeRequest struct {
Tokens []int `json:"tokens"`
}
type DetokenizeResponse struct {
Content string `json:"content"`
}
func (llm *llama) Decode(ctx context.Context, tokens []int) (string, error) {
if len(tokens) == 0 {
return "", nil
}
endpoint := fmt.Sprintf("http://127.0.0.1:%d/detokenize", llm.Port)
data, err := json.Marshal(DetokenizeRequest{Tokens: tokens})
if err != nil {
return "", fmt.Errorf("marshaling decode data: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewBuffer(data))
if err != nil {
return "", fmt.Errorf("decode request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", fmt.Errorf("do decode request: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("read decode request: %w", err)
}
if resp.StatusCode >= 400 {
log.Printf("llm decode error: %s", body)
return "", fmt.Errorf("%s", body)
}
var decoded DetokenizeResponse
if err := json.Unmarshal(body, &decoded); err != nil {
return "", fmt.Errorf("unmarshal encode response: %w", err)
}
return decoded.Content, nil
}
type EmbeddingRequest struct {
Content string `json:"content"`
}
type EmbeddingResponse struct {
Embedding []float64 `json:"embedding"`
}
func (llm *llama) Embedding(ctx context.Context, input string) ([]float64, error) {
endpoint := fmt.Sprintf("http://127.0.0.1:%d/embedding", llm.Port)
data, err := json.Marshal(TokenizeRequest{Content: input})
if err != nil {
return nil, fmt.Errorf("error marshaling embed data: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewBuffer(data))
if err != nil {
return nil, fmt.Errorf("error creating embed request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("POST embedding: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading embed response: %w", err)
}
if resp.StatusCode >= 400 {
log.Printf("llm encode error: %s", body)
return nil, fmt.Errorf("%s", body)
}
var embedding EmbeddingResponse
if err := json.Unmarshal(body, &embedding); err != nil {
return nil, fmt.Errorf("unmarshal tokenize response: %w", err)
}
return embedding.Embedding, nil
}
// Ping checks that the server subprocess is still running and responding to requests
func (llm *llama) Ping(ctx context.Context) error {
resp, err := http.Head(fmt.Sprintf("http://127.0.0.1:%d", llm.Port))
if err != nil {
return fmt.Errorf("ping resp: %w", err)
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected ping status: %s", resp.Status)
}
return nil
}