This commit is contained in:
Michael Yang 2024-01-29 17:01:31 -08:00
parent bea007deb7
commit 6a4b994433
2 changed files with 5 additions and 8 deletions

View file

@ -188,7 +188,7 @@ func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *regis
continue continue
} }
g.Go(func() error { g.Go(inner, func() error {
var err error var err error
for try := 0; try < maxRetries; try++ { for try := 0; try < maxRetries; try++ {
w := io.NewOffsetWriter(file, part.StartsAt()) w := io.NewOffsetWriter(file, part.StartsAt())
@ -238,7 +238,6 @@ func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *regis
} }
b.done = true b.done = true
return
} }
func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart, opts *registryOptions) error { func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart, opts *registryOptions) error {
@ -411,7 +410,6 @@ func downloadBlob(ctx context.Context, opts downloadOpts) error {
type LimitGroup struct { type LimitGroup struct {
*errgroup.Group *errgroup.Group
context.Context
Semaphore *semaphore.Weighted Semaphore *semaphore.Weighted
weight, max_weight int64 weight, max_weight int64
@ -421,17 +419,16 @@ func NewLimitGroup(ctx context.Context, n int64) (*LimitGroup, context.Context)
g, ctx := errgroup.WithContext(ctx) g, ctx := errgroup.WithContext(ctx)
return &LimitGroup{ return &LimitGroup{
Group: g, Group: g,
Context: ctx,
Semaphore: semaphore.NewWeighted(n), Semaphore: semaphore.NewWeighted(n),
weight: n, weight: n,
max_weight: n, max_weight: n,
}, ctx }, ctx
} }
func (g *LimitGroup) Go(fn func() error) { func (g *LimitGroup) Go(ctx context.Context, fn func() error) {
weight := g.weight weight := g.weight
g.Semaphore.Acquire(g.Context, weight) _ = g.Semaphore.Acquire(ctx, weight)
if g.Context.Err() != nil { if ctx.Err() != nil {
return return
} }

View file

@ -172,7 +172,7 @@ func (b *blobUpload) Run(ctx context.Context, opts *registryOptions) {
select { select {
case <-inner.Done(): case <-inner.Done():
case requestURL := <-b.nextURL: case requestURL := <-b.nextURL:
g.Go(func() error { g.Go(inner, func() error {
var err error var err error
for try := 0; try < maxRetries; try++ { for try := 0; try < maxRetries; try++ {
err = b.uploadPart(inner, http.MethodPatch, requestURL, part, opts) err = b.uploadPart(inner, http.MethodPatch, requestURL, part, opts)