Michael Yang 2024-01-29 17:16:37 -08:00
parent 6a4b994433
commit 084d846621
2 changed files with 69 additions and 75 deletions

View file

@@ -151,36 +151,9 @@ func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *registryOptions)
 	_ = file.Truncate(b.Total)
 
-	g, inner := NewLimitGroup(ctx, numDownloadParts)
-	go func() {
-		ticker := time.NewTicker(time.Second)
-		var n int64 = 1
-		var maxDelta float64
-		var buckets []int64
-		for {
-			select {
-			case <-ticker.C:
-				buckets = append(buckets, b.Completed.Load())
-				if len(buckets) < 2 {
-					continue
-				} else if len(buckets) > 10 {
-					buckets = buckets[1:]
-				}
-
-				delta := float64((buckets[len(buckets)-1] - buckets[0])) / float64(len(buckets))
-				slog.Debug(fmt.Sprintf("delta: %s/s max_delta: %s/s", format.HumanBytes(int64(delta)), format.HumanBytes(int64(maxDelta))))
-				if delta > maxDelta*1.5 {
-					maxDelta = delta
-					g.SetLimit(n)
-					n++
-				}
-			case <-ctx.Done():
-				return
-			}
-		}
-	}()
+	var limit int64 = 2
+	g, inner := NewLimitGroup(ctx, numDownloadParts, limit)
+	go watchDelta(inner, g, &b.Completed, limit)
 
 	for i := range b.Parts {
 		part := b.Parts[i]
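The inline ramp-up goroutine is replaced here by a fixed starting limit of 2 and the shared watchDelta helper added in the next hunk. The throttling itself rides on a weighted semaphore: each part acquires size/limit units, so at most "limit" parts hold the semaphore at once. A minimal standalone sketch of that arithmetic follows; the pool size of 64 stands in for numDownloadParts, whose value is not shown in this diff:

// Illustrative sketch only: a weighted semaphore used as an adjustable
// concurrency cap. The size of 64 is an assumption, not taken from the diff.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

func main() {
	const size, limit = 64, 2

	sem := semaphore.NewWeighted(size)

	// Each task acquires size/limit units, so at most "limit" tasks
	// can hold the semaphore at once: 64/2 = 32, and 64/32 = 2 slots.
	weight := int64(size / limit)
	fmt.Println("per-task weight:", weight)

	ctx := context.Background()
	for i := 0; i < limit; i++ {
		_ = sem.Acquire(ctx, weight) // both acquisitions succeed immediately
	}
	fmt.Println("third task would block:", !sem.TryAcquire(weight))
}

Raising the limit only shrinks the weight of future Go calls, so concurrency grows gradually as in-flight parts finish and release their larger weights.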
@@ -410,37 +383,84 @@ func downloadBlob(ctx context.Context, opts downloadOpts) error {
 type LimitGroup struct {
 	*errgroup.Group
-	Semaphore          *semaphore.Weighted
-	weight, max_weight int64
+	*semaphore.Weighted
+
+	size, limit int64
 }
 
-func NewLimitGroup(ctx context.Context, n int64) (*LimitGroup, context.Context) {
+func NewLimitGroup(ctx context.Context, size, limit int64) (*LimitGroup, context.Context) {
 	g, ctx := errgroup.WithContext(ctx)
 	return &LimitGroup{
-		Group:      g,
-		Semaphore:  semaphore.NewWeighted(n),
-		weight:     n,
-		max_weight: n,
+		Group:    g,
+		Weighted: semaphore.NewWeighted(size),
+		size:     size,
+		limit:    limit,
 	}, ctx
 }
 
 func (g *LimitGroup) Go(ctx context.Context, fn func() error) {
-	weight := g.weight
-	_ = g.Semaphore.Acquire(ctx, weight)
+	var weight int64 = 1
+	if g.limit > 0 {
+		weight = g.size / g.limit
+	}
+
+	_ = g.Acquire(ctx, weight)
 	if ctx.Err() != nil {
 		return
 	}
 
 	g.Group.Go(func() error {
-		defer g.Semaphore.Release(weight)
+		defer g.Release(weight)
 		return fn()
 	})
 }
 
-func (g *LimitGroup) SetLimit(n int64) {
-	if n > 0 {
-		slog.Debug(fmt.Sprintf("setting limit to %d", n))
-		g.weight = g.max_weight / n
+func (g *LimitGroup) SetLimit(limit int64) {
+	if limit > g.limit {
+		g.limit = limit
 	}
 }
+
+func watchDelta(ctx context.Context, g *LimitGroup, c *atomic.Int64, limit int64) {
+	var maxDelta float64
+	var buckets []int64
+
+	// 5s ramp up period
+	nextUpdate := time.Now().Add(5 * time.Second)
+
+	ticker := time.NewTicker(time.Second)
+	for {
+		select {
+		case <-ticker.C:
+			buckets = append(buckets, c.Load())
+			if len(buckets) < 2 {
+				continue
+			} else if len(buckets) > 10 {
+				buckets = buckets[1:]
+			}
+
+			delta := float64((buckets[len(buckets)-1] - buckets[0])) / float64(len(buckets))
+			slog.Debug("", "limit", limit, "delta", format.HumanBytes(int64(delta)), "max_delta", format.HumanBytes(int64(maxDelta)))
+
+			if time.Now().Before(nextUpdate) {
+				// quiet period; do not update ccy if recently updated
+				continue
+			} else if maxDelta > 0 {
+				x := delta / maxDelta
+				if x < 1.2 {
+					continue
+				}
+
+				limit += int64(x)
+				slog.Debug("setting", "limit", limit)
+				g.SetLimit(limit)
+			}
+
+			// 3s cooldown period
+			nextUpdate = time.Now().Add(3 * time.Second)
+			maxDelta = delta
+		case <-ctx.Done():
+			return
+		}
+	}
+}
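watchDelta keeps a sliding window of up to ten once-per-second readings of the cumulative completed-bytes counter. delta = (newest - oldest) / len(buckets) approximates the recent transfer rate, and the limit is raised only when that rate beats the best rate seen so far (maxDelta) by at least 20%, outside the 5s ramp-up and 3s cooldown windows. A worked example of that decision with fabricated byte counts (the numbers below are not from the commit):

// Minimal sketch of the ramp-up decision in watchDelta, with invented
// sample values; the window contents and thresholds mirror the diff above.
package main

import "fmt"

func main() {
	// Cumulative bytes completed, sampled once per second
	// (fabricated: increments of 30, 35 and 40 MB over three seconds).
	buckets := []int64{0, 30 << 20, 65 << 20, 105 << 20}

	delta := float64(buckets[len(buckets)-1]-buckets[0]) / float64(len(buckets))
	maxDelta := 20.0 * (1 << 20) // best rate seen so far: ~20 MB/s
	limit := int64(2)

	if x := delta / maxDelta; x >= 1.2 {
		limit += int64(x) // 105 MB / 4 samples ≈ 26 MB/s, x ≈ 1.3, so limit becomes 3
	}
	fmt.Println("new limit:", limit)
}

Note that the division is by the number of samples rather than the elapsed seconds, so delta slightly understates the true rate; it is only used as a relative signal against maxDelta.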

View file

@@ -136,41 +136,15 @@ func (b *blobUpload) Run(ctx context.Context, opts *registryOptions) {
 	}
 	defer b.file.Close()
 
-	g, inner := NewLimitGroup(ctx, numUploadParts)
-	go func() {
-		ticker := time.NewTicker(time.Second)
-		var n int64 = 1
-		var maxDelta float64
-		var buckets []int64
-		for {
-			select {
-			case <-ticker.C:
-				buckets = append(buckets, b.Completed.Load())
-				if len(buckets) < 2 {
-					continue
-				} else if len(buckets) > 10 {
-					buckets = buckets[1:]
-				}
-
-				delta := float64((buckets[len(buckets)-1] - buckets[0])) / float64(len(buckets))
-				slog.Debug(fmt.Sprintf("delta: %s/s max_delta: %s/s", format.HumanBytes(int64(delta)), format.HumanBytes(int64(maxDelta))))
-				if delta > maxDelta*1.5 {
-					maxDelta = delta
-					g.SetLimit(n)
-					n++
-				}
-			case <-ctx.Done():
-				return
-			}
-		}
-	}()
+	var limit int64 = 2
+	g, inner := NewLimitGroup(ctx, numUploadParts, limit)
+	go watchDelta(inner, g, &b.Completed, limit)
 
 	for i := range b.Parts {
 		part := &b.Parts[i]
 
 		select {
 		case <-inner.Done():
-			break
 		case requestURL := <-b.nextURL:
 			g.Go(inner, func() error {
 				var err error
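The upload path is rewired the same way as the download path: a fixed starting limit of 2, a shared watchDelta watcher, and per-part g.Go calls. A rough sketch of how a caller in the same package might drive this API; maxParts, the slice of part sizes, and the trivial task body are placeholders for illustration, not code from this commit:

// Hedged same-package sketch of the intended call pattern; assumes the
// package's existing "context" and "sync/atomic" imports.
func transferAll(ctx context.Context, sizes []int64) error {
	const maxParts = 64 // stand-in for numUploadParts / numDownloadParts
	var limit int64 = 2
	var completed atomic.Int64

	g, inner := NewLimitGroup(ctx, maxParts, limit)
	go watchDelta(inner, g, &completed, limit)

	for _, size := range sizes {
		size := size
		g.Go(inner, func() error {
			// transfer one part here; report progress so watchDelta can
			// raise the limit while throughput keeps improving
			completed.Add(size)
			return nil
		})
	}
	return g.Wait()
}

Wait() here is the method promoted from the embedded *errgroup.Group; this commit does not change it.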