From b599946b74f4d3226325e06f4c412274475775c6 Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Wed, 11 Oct 2023 10:55:07 -0700 Subject: [PATCH 1/4] add format bytes --- api/client.go | 2 +- format/bytes.go | 16 ++++++++++++++++ llm/llama.go | 2 +- llm/llm.go | 36 ++++++++++++++++++------------------ 4 files changed, 36 insertions(+), 20 deletions(-) create mode 100644 format/bytes.go diff --git a/api/client.go b/api/client.go index 4a5b97c9..f308b233 100644 --- a/api/client.go +++ b/api/client.go @@ -127,7 +127,7 @@ func (c *Client) do(ctx context.Context, method, path string, reqData, respData return nil } -const maxBufferSize = 512 * 1024 // 512KB +const maxBufferSize = 512 * 1000 // 512KB func (c *Client) stream(ctx context.Context, method, path string, data any, fn func([]byte) error) error { var buf *bytes.Buffer diff --git a/format/bytes.go b/format/bytes.go new file mode 100644 index 00000000..63cc7b00 --- /dev/null +++ b/format/bytes.go @@ -0,0 +1,16 @@ +package format + +import "fmt" + +func HumanBytes(b int64) string { + switch { + case b > 1000*1000*1000: + return fmt.Sprintf("%d GB", b/1000/1000/1000) + case b > 1000*1000: + return fmt.Sprintf("%d MB", b/1000/1000) + case b > 1000: + return fmt.Sprintf("%d KB", b/1000) + default: + return fmt.Sprintf("%d B", b) + } +} diff --git a/llm/llama.go b/llm/llama.go index 8bd11f53..33468cba 100644 --- a/llm/llama.go +++ b/llm/llama.go @@ -454,7 +454,7 @@ type PredictRequest struct { Stop []string `json:"stop,omitempty"` } -const maxBufferSize = 512 * 1024 // 512KB +const maxBufferSize = 512 * 1000 // 512KB func (llm *llama) Predict(ctx context.Context, prevContext []int, prompt string, fn func(api.GenerateResponse)) error { prevConvo, err := llm.Decode(ctx, prevContext) diff --git a/llm/llm.go b/llm/llm.go index ef424b5d..6df2a47c 100644 --- a/llm/llm.go +++ b/llm/llm.go @@ -60,33 +60,33 @@ func New(workDir, model string, adapters []string, opts api.Options) (LLM, error totalResidentMemory := memory.TotalMemory() switch ggml.ModelType() { case "3B", "7B": - if ggml.FileType() == "F16" && totalResidentMemory < 16*1024*1024 { - return nil, fmt.Errorf("F16 model requires at least 16GB of memory") - } else if totalResidentMemory < 8*1024*1024 { - return nil, fmt.Errorf("model requires at least 8GB of memory") + if ggml.FileType() == "F16" && totalResidentMemory < 16*1000*1000 { + return nil, fmt.Errorf("F16 model requires at least 16 GB of memory") + } else if totalResidentMemory < 8*1000*1000 { + return nil, fmt.Errorf("model requires at least 8 GB of memory") } case "13B": - if ggml.FileType() == "F16" && totalResidentMemory < 32*1024*1024 { - return nil, fmt.Errorf("F16 model requires at least 32GB of memory") - } else if totalResidentMemory < 16*1024*1024 { - return nil, fmt.Errorf("model requires at least 16GB of memory") + if ggml.FileType() == "F16" && totalResidentMemory < 32*1000*1000 { + return nil, fmt.Errorf("F16 model requires at least 32 GB of memory") + } else if totalResidentMemory < 16*1000*1000 { + return nil, fmt.Errorf("model requires at least 16 GB of memory") } case "30B", "34B", "40B": - if ggml.FileType() == "F16" && totalResidentMemory < 64*1024*1024 { - return nil, fmt.Errorf("F16 model requires at least 64GB of memory") - } else if totalResidentMemory < 32*1024*1024 { - return nil, fmt.Errorf("model requires at least 32GB of memory") + if ggml.FileType() == "F16" && totalResidentMemory < 64*1000*1000 { + return nil, fmt.Errorf("F16 model requires at least 64 GB of memory") + } else if totalResidentMemory < 32*1000*1000 { + return nil, fmt.Errorf("model requires at least 32 GB of memory") } case "65B", "70B": - if ggml.FileType() == "F16" && totalResidentMemory < 128*1024*1024 { - return nil, fmt.Errorf("F16 model requires at least 128GB of memory") - } else if totalResidentMemory < 64*1024*1024 { - return nil, fmt.Errorf("model requires at least 64GB of memory") + if ggml.FileType() == "F16" && totalResidentMemory < 128*1000*1000 { + return nil, fmt.Errorf("F16 model requires at least 128 GB of memory") + } else if totalResidentMemory < 64*1000*1000 { + return nil, fmt.Errorf("model requires at least 64 GB of memory") } case "180B": - if ggml.FileType() == "F16" && totalResidentMemory < 512*1024*1024 { + if ggml.FileType() == "F16" && totalResidentMemory < 512*1000*1000 { return nil, fmt.Errorf("F16 model requires at least 512GB of memory") - } else if totalResidentMemory < 128*1024*1024 { + } else if totalResidentMemory < 128*1000*1000 { return nil, fmt.Errorf("model requires at least 128GB of memory") } } From a2055a1e9397e0bff38314ec4976f56295ba4114 Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Mon, 9 Oct 2023 10:14:49 -0700 Subject: [PATCH 2/4] update download --- server/download.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/server/download.go b/server/download.go index 80914ae3..dd7753b3 100644 --- a/server/download.go +++ b/server/download.go @@ -30,6 +30,7 @@ type blobDownload struct { Total int64 Completed atomic.Int64 + done bool Parts []*blobDownloadPart @@ -124,7 +125,7 @@ func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *Regis file.Truncate(b.Total) - g, ctx := errgroup.WithContext(ctx) + g, _ := errgroup.WithContext(ctx) // TODO(mxyng): download concurrency should be configurable g.SetLimit(64) for i := range b.Parts { @@ -168,7 +169,12 @@ func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *Regis } } - return os.Rename(file.Name(), b.Name) + if err := os.Rename(file.Name(), b.Name); err != nil { + return err + } + + b.done = true + return nil } func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart, opts *RegistryOptions) error { @@ -267,11 +273,8 @@ func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse)) Completed: b.Completed.Load(), }) - if b.Completed.Load() >= b.Total { - // wait for the file to get renamed - if _, err := os.Stat(b.Name); err == nil { - return nil - } + if b.done { + return nil } } } From 630bb75d2acd2ae6bc5182e2f53dc8b27b7877b3 Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Tue, 10 Oct 2023 17:22:44 -0700 Subject: [PATCH 3/4] dynamically size download parts based on file size --- server/download.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/server/download.go b/server/download.go index dd7753b3..531219fd 100644 --- a/server/download.go +++ b/server/download.go @@ -20,6 +20,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/jmorganca/ollama/api" + "github.com/jmorganca/ollama/format" ) var blobDownloadManager sync.Map @@ -47,6 +48,12 @@ type blobDownloadPart struct { *blobDownload `json:"-"` } +const ( + numDownloadParts = 64 + minDownloadPartSize int64 = 32 * 1000 * 1000 + maxDownloadPartSize int64 = 256 * 1000 * 1000 +) + func (p *blobDownloadPart) Name() string { return strings.Join([]string{ p.blobDownload.Name, "partial", strconv.Itoa(p.N), @@ -92,9 +99,15 @@ func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *R b.Total, _ = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64) - var offset int64 - var size int64 = 64 * 1024 * 1024 + var size = b.Total / numDownloadParts + switch { + case size < minDownloadPartSize: + size = minDownloadPartSize + case size > maxDownloadPartSize: + size = maxDownloadPartSize + } + var offset int64 for offset < b.Total { if offset+size > b.Total { size = b.Total - offset @@ -108,7 +121,7 @@ func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *R } } - log.Printf("downloading %s in %d part(s)", b.Digest[7:19], len(b.Parts)) + log.Printf("downloading %s in %d %s part(s)", b.Digest[7:19], len(b.Parts), format.HumanBytes(b.Parts[0].Size)) return nil } @@ -126,8 +139,7 @@ func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *Regis file.Truncate(b.Total) g, _ := errgroup.WithContext(ctx) - // TODO(mxyng): download concurrency should be configurable - g.SetLimit(64) + g.SetLimit(numDownloadParts) for i := range b.Parts { part := b.Parts[i] if part.Completed == part.Size { From c413a5509313b6c9800d60ecf926a368f141d42b Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Wed, 11 Oct 2023 13:49:01 -0700 Subject: [PATCH 4/4] download: handle inner errors --- server/download.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/server/download.go b/server/download.go index 531219fd..0c3beb7e 100644 --- a/server/download.go +++ b/server/download.go @@ -31,11 +31,13 @@ type blobDownload struct { Total int64 Completed atomic.Int64 - done bool Parts []*blobDownloadPart context.CancelFunc + + done bool + err error references atomic.Int32 } @@ -125,7 +127,11 @@ func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *R return nil } -func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) (err error) { +func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) { + b.err = b.run(ctx, requestURL, opts) +} + +func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) error { defer blobDownloadManager.Delete(b.Digest) ctx, b.CancelFunc = context.WithCancel(ctx) @@ -285,8 +291,8 @@ func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse)) Completed: b.Completed.Load(), }) - if b.done { - return nil + if b.done || b.err != nil { + return b.err } } }