From bea007deb7bf3ba013aa503c9d9e3f23074c76ac Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Fri, 26 Jan 2024 15:10:45 -0800 Subject: [PATCH] use LimitGroup for uploads --- server/upload.go | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/server/upload.go b/server/upload.go index 4da34052..b090721e 100644 --- a/server/upload.go +++ b/server/upload.go @@ -18,7 +18,6 @@ import ( "github.com/jmorganca/ollama/api" "github.com/jmorganca/ollama/format" - "golang.org/x/sync/errgroup" ) var blobUploadManager sync.Map @@ -137,8 +136,37 @@ func (b *blobUpload) Run(ctx context.Context, opts *registryOptions) { } defer b.file.Close() - g, inner := errgroup.WithContext(ctx) - g.SetLimit(numUploadParts) + g, inner := NewLimitGroup(ctx, numUploadParts) + + go func() { + ticker := time.NewTicker(time.Second) + var n int64 = 1 + var maxDelta float64 + var buckets []int64 + for { + select { + case <-ticker.C: + buckets = append(buckets, b.Completed.Load()) + if len(buckets) < 2 { + continue + } else if len(buckets) > 10 { + buckets = buckets[1:] + } + + delta := float64((buckets[len(buckets)-1] - buckets[0])) / float64(len(buckets)) + slog.Debug(fmt.Sprintf("delta: %s/s max_delta: %s/s", format.HumanBytes(int64(delta)), format.HumanBytes(int64(maxDelta)))) + if delta > maxDelta*1.5 { + maxDelta = delta + g.SetLimit(n) + n++ + } + + case <-ctx.Done(): + return + } + } + }() + for i := range b.Parts { part := &b.Parts[i] select {