diff --git a/server/upload.go b/server/upload.go index ddf4321d..aecddfbb 100644 --- a/server/upload.go +++ b/server/upload.go @@ -95,6 +95,7 @@ func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *Reg size = fi.Size() - offset } + // set part.N to the current number of parts b.Parts = append(b.Parts, blobUploadPart{N: len(b.Parts), Offset: offset, Size: size}) offset += size } @@ -111,22 +112,22 @@ func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *Reg return nil } +// Run uploads blob parts to the upstream. If the upstream supports redirection, parts will be uploaded +// in parallel as defined by Prepare. Otherwise, parts will be uploaded serially. Run sets b.err on error. func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) { - b.err = b.run(ctx, opts) -} - -func (b *blobUpload) run(ctx context.Context, opts *RegistryOptions) error { defer blobUploadManager.Delete(b.Digest) ctx, b.CancelFunc = context.WithCancel(ctx) p, err := GetBlobsPath(b.Digest) if err != nil { - return err + b.err = err + return } f, err := os.Open(p) if err != nil { - return err + b.err = err + return } defer f.Close() @@ -134,30 +135,34 @@ func (b *blobUpload) run(ctx context.Context, opts *RegistryOptions) error { g.SetLimit(numUploadParts) for i := range b.Parts { part := &b.Parts[i] - requestURL := <-b.nextURL - g.Go(func() error { - for try := 0; try < maxRetries; try++ { - r := io.NewSectionReader(f, part.Offset, part.Size) - err := b.uploadChunk(inner, http.MethodPatch, requestURL, r, part, opts) - switch { - case errors.Is(err, context.Canceled): - return err - case errors.Is(err, errMaxRetriesExceeded): - return err - case err != nil: - log.Printf("%s part %d attempt %d failed: %v, retrying", b.Digest[7:19], part.N, try, err) - continue + select { + case <-inner.Done(): + case requestURL := <-b.nextURL: + g.Go(func() error { + for try := 0; try < maxRetries; try++ { + r := io.NewSectionReader(f, part.Offset, part.Size) + err := b.uploadChunk(inner, http.MethodPatch, requestURL, r, part, opts) + switch { + case errors.Is(err, context.Canceled): + return err + case errors.Is(err, errMaxRetriesExceeded): + return err + case err != nil: + log.Printf("%s part %d attempt %d failed: %v, retrying", b.Digest[7:19], part.N, try, err) + continue + } + + return nil } - return nil - } - - return errMaxRetriesExceeded - }) + return errMaxRetriesExceeded + }) + } } if err := g.Wait(); err != nil { - return err + b.err = err + return } requestURL := <-b.nextURL @@ -172,12 +177,12 @@ func (b *blobUpload) run(ctx context.Context, opts *RegistryOptions) error { resp, err := makeRequest(ctx, "PUT", requestURL, headers, nil, opts) if err != nil { - return err + b.err = err + return } defer resp.Body.Close() b.done = true - return nil } func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, rs io.ReadSeeker, part *blobUploadPart, opts *RegistryOptions) error { @@ -219,6 +224,7 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL for try := 0; try < maxRetries; try++ { rs.Seek(0, io.SeekStart) b.Completed.Add(-buw.written) + buw.written = 0 err := b.uploadChunk(ctx, http.MethodPut, redirectURL, rs, part, nil) switch { case errors.Is(err, context.Canceled): @@ -253,6 +259,7 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL rs.Seek(0, io.SeekStart) b.Completed.Add(-buw.written) + buw.written = 0 return fmt.Errorf("http status %d %s: %s", resp.StatusCode, resp.Status, body) }