From 2799784ac8d0d0e8542d2484513d1e101cb77dbe Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Tue, 21 Nov 2023 12:12:04 -0800 Subject: [PATCH 1/4] revert checksum calculation to calculate-as-you-go --- server/upload.go | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/server/upload.go b/server/upload.go index 04cd5ac0..db0cedb9 100644 --- a/server/upload.go +++ b/server/upload.go @@ -5,6 +5,7 @@ import ( "crypto/md5" "errors" "fmt" + "hash" "io" "log" "math" @@ -102,7 +103,7 @@ func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *Reg } // set part.N to the current number of parts - b.Parts = append(b.Parts, blobUploadPart{blobUpload: b, N: len(b.Parts), Offset: offset, Size: size}) + b.Parts = append(b.Parts, blobUploadPart{blobUpload: b, N: len(b.Parts), Offset: offset, Size: size, Hash: md5.New()}) offset += size } @@ -147,14 +148,13 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) { g.Go(func() error { var err error for try := 0; try < maxRetries; try++ { - err = b.uploadChunk(inner, http.MethodPatch, requestURL, part, opts) + err = b.uploadPart(inner, http.MethodPatch, requestURL, part, opts) switch { case errors.Is(err, context.Canceled): return err case errors.Is(err, errMaxRetriesExceeded): return err case err != nil: - part.Reset() sleep := time.Second * time.Duration(math.Pow(2, float64(try))) log.Printf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep) time.Sleep(sleep) @@ -176,17 +176,10 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) { requestURL := <-b.nextURL - var sb strings.Builder - // calculate md5 checksum and add it to the commit request + var sb strings.Builder for _, part := range b.Parts { - hash := md5.New() - if _, err := io.Copy(hash, io.NewSectionReader(b.file, part.Offset, part.Size)); err != nil { - b.err = err - return - } - - sb.Write(hash.Sum(nil)) + sb.Write(part.Sum(nil)) } md5sum := md5.Sum([]byte(sb.String())) @@ -221,7 +214,10 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) { } } -func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *RegistryOptions) error { +func (b *blobUpload) uploadPart(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *RegistryOptions) error { + // reset the part here to ensure alignment + part.Reset() + headers := make(http.Header) headers.Set("Content-Type", "application/octet-stream") headers.Set("Content-Length", fmt.Sprintf("%d", part.Size)) @@ -232,7 +228,7 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL } sr := io.NewSectionReader(b.file, part.Offset, part.Size) - resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(sr, part), opts) + resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(sr, io.MultiWriter(part, part.Hash)), opts) if err != nil { return err } @@ -259,14 +255,13 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL // retry uploading to the redirect URL for try := 0; try < maxRetries; try++ { - err = b.uploadChunk(ctx, http.MethodPut, redirectURL, part, nil) + err = b.uploadPart(ctx, http.MethodPut, redirectURL, part, nil) switch { case errors.Is(err, context.Canceled): return err case errors.Is(err, errMaxRetriesExceeded): return err case err != nil: - part.Reset() sleep := time.Second * time.Duration(math.Pow(2, float64(try))) log.Printf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep) time.Sleep(sleep) @@ -345,7 +340,10 @@ type blobUploadPart struct { Offset int64 Size int64 written int64 + *blobUpload + + hash.Hash } func (p *blobUploadPart) Write(b []byte) (n int, err error) { @@ -356,6 +354,7 @@ func (p *blobUploadPart) Write(b []byte) (n int, err error) { } func (p *blobUploadPart) Reset() { + p.Hash.Reset() p.Completed.Add(-int64(p.written)) p.written = 0 } From 26c63418e029bb5b6dac75ca6148582788b0b7ce Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Wed, 29 Nov 2023 14:52:12 -0800 Subject: [PATCH 2/4] new hasher --- server/upload.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/upload.go b/server/upload.go index db0cedb9..1ed95bfa 100644 --- a/server/upload.go +++ b/server/upload.go @@ -103,7 +103,7 @@ func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *Reg } // set part.N to the current number of parts - b.Parts = append(b.Parts, blobUploadPart{blobUpload: b, N: len(b.Parts), Offset: offset, Size: size, Hash: md5.New()}) + b.Parts = append(b.Parts, blobUploadPart{blobUpload: b, N: len(b.Parts), Offset: offset, Size: size}) offset += size } @@ -227,8 +227,9 @@ func (b *blobUpload) uploadPart(ctx context.Context, method string, requestURL * headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1)) } + md5sum := md5.New() sr := io.NewSectionReader(b.file, part.Offset, part.Size) - resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(sr, io.MultiWriter(part, part.Hash)), opts) + resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(sr, io.MultiWriter(part, md5sum)), opts) if err != nil { return err } @@ -296,6 +297,7 @@ func (b *blobUpload) uploadPart(ctx context.Context, method string, requestURL * b.nextURL <- nextURL } + part.Hash = md5sum return nil } @@ -354,7 +356,6 @@ func (p *blobUploadPart) Write(b []byte) (n int, err error) { } func (p *blobUploadPart) Reset() { - p.Hash.Reset() p.Completed.Add(-int64(p.written)) p.written = 0 } From c4bdfffd96da9fc4b0f7c97d2a79f5497469539b Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Wed, 29 Nov 2023 15:04:23 -0800 Subject: [PATCH 3/4] upload: separate progress tracking --- server/upload.go | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/server/upload.go b/server/upload.go index 1ed95bfa..6ed3a9c5 100644 --- a/server/upload.go +++ b/server/upload.go @@ -103,7 +103,7 @@ func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *Reg } // set part.N to the current number of parts - b.Parts = append(b.Parts, blobUploadPart{blobUpload: b, N: len(b.Parts), Offset: offset, Size: size}) + b.Parts = append(b.Parts, blobUploadPart{N: len(b.Parts), Offset: offset, Size: size}) offset += size } @@ -215,9 +215,6 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) { } func (b *blobUpload) uploadPart(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *RegistryOptions) error { - // reset the part here to ensure alignment - part.Reset() - headers := make(http.Header) headers.Set("Content-Type", "application/octet-stream") headers.Set("Content-Length", fmt.Sprintf("%d", part.Size)) @@ -227,10 +224,14 @@ func (b *blobUpload) uploadPart(ctx context.Context, method string, requestURL * headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1)) } - md5sum := md5.New() sr := io.NewSectionReader(b.file, part.Offset, part.Size) - resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(sr, io.MultiWriter(part, md5sum)), opts) + + md5sum := md5.New() + w := &progressWriter{blobUpload: b} + + resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(sr, io.MultiWriter(w, md5sum)), opts) if err != nil { + w.Rollback() return err } defer resp.Body.Close() @@ -242,11 +243,13 @@ func (b *blobUpload) uploadPart(ctx context.Context, method string, requestURL * nextURL, err := url.Parse(location) if err != nil { + w.Rollback() return err } switch { case resp.StatusCode == http.StatusTemporaryRedirect: + w.Rollback() b.nextURL <- nextURL redirectURL, err := resp.Location() @@ -275,6 +278,7 @@ func (b *blobUpload) uploadPart(ctx context.Context, method string, requestURL * return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err) case resp.StatusCode == http.StatusUnauthorized: + w.Rollback() auth := resp.Header.Get("www-authenticate") authRedir := ParseAuthRedirectString(auth) token, err := getAuthToken(ctx, authRedir) @@ -285,6 +289,7 @@ func (b *blobUpload) uploadPart(ctx context.Context, method string, requestURL * opts.Token = token fallthrough case resp.StatusCode >= http.StatusBadRequest: + w.Rollback() body, err := io.ReadAll(resp.Body) if err != nil { return err @@ -338,25 +343,26 @@ func (b *blobUpload) Wait(ctx context.Context, fn func(api.ProgressResponse)) er type blobUploadPart struct { // N is the part number - N int - Offset int64 - Size int64 - written int64 - - *blobUpload - + N int + Offset int64 + Size int64 hash.Hash } -func (p *blobUploadPart) Write(b []byte) (n int, err error) { +type progressWriter struct { + written int64 + *blobUpload +} + +func (p *progressWriter) Write(b []byte) (n int, err error) { n = len(b) p.written += int64(n) p.Completed.Add(int64(n)) return n, nil } -func (p *blobUploadPart) Reset() { - p.Completed.Add(-int64(p.written)) +func (p *progressWriter) Rollback() { + p.Completed.Add(-p.written) p.written = 0 } From 13efd5f2188f9e43600b9efef14fce21995b7a51 Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Wed, 29 Nov 2023 15:18:53 -0800 Subject: [PATCH 4/4] upload: fix PUT retry --- server/upload.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/server/upload.go b/server/upload.go index 6ed3a9c5..721c5e7c 100644 --- a/server/upload.go +++ b/server/upload.go @@ -194,24 +194,22 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) { headers.Set("Content-Length", "0") for try := 0; try < maxRetries; try++ { - resp, err := makeRequestWithRetry(ctx, http.MethodPut, requestURL, headers, nil, opts) - if err != nil { - b.err = err - if errors.Is(err, context.Canceled) { - return - } - + var resp *http.Response + resp, err = makeRequestWithRetry(ctx, http.MethodPut, requestURL, headers, nil, opts) + if errors.Is(err, context.Canceled) { + break + } else if err != nil { sleep := time.Second * time.Duration(math.Pow(2, float64(try))) log.Printf("%s complete upload attempt %d failed: %v, retrying in %s", b.Digest[7:19], try, err, sleep) time.Sleep(sleep) continue } defer resp.Body.Close() - - b.err = nil - b.done = true - return + break } + + b.err = err + b.done = true } func (b *blobUpload) uploadPart(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *RegistryOptions) error {