diff --git a/server/download.go b/server/download.go index 4985fb53..d8a841e1 100644 --- a/server/download.go +++ b/server/download.go @@ -152,6 +152,36 @@ func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *regis _ = file.Truncate(b.Total) g, inner := NewLimitGroup(ctx, numDownloadParts) + + go func() { + ticker := time.NewTicker(time.Second) + var n int64 = 1 + var maxDelta float64 + var buckets []int64 + for { + select { + case <-ticker.C: + buckets = append(buckets, b.Completed.Load()) + if len(buckets) < 2 { + continue + } else if len(buckets) > 10 { + buckets = buckets[1:] + } + + delta := float64((buckets[len(buckets)-1] - buckets[0])) / float64(len(buckets)) + slog.Debug(fmt.Sprintf("delta: %s/s max_delta: %s/s", format.HumanBytes(int64(delta)), format.HumanBytes(int64(maxDelta)))) + if delta > maxDelta*1.5 { + maxDelta = delta + g.SetLimit(n) + n++ + } + + case <-ctx.Done(): + return + } + } + }() + for i := range b.Parts { part := b.Parts[i] if part.Completed == part.Size { @@ -413,6 +443,7 @@ func (g *LimitGroup) Go(fn func() error) { func (g *LimitGroup) SetLimit(n int64) { if n > 0 { + slog.Debug(fmt.Sprintf("setting limit to %d", n)) g.weight = g.max_weight / n } }