From 3b49315f971dca74cc3109c9362e93e70812df8c Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Fri, 18 Aug 2023 10:03:29 -0700 Subject: [PATCH] retry on unauthorized chunk push The token printed for authorized requests has a lifetime of 1h. If an upload exceeds 1h, a chunk push will fail since the token is created on a "start upload" request. This replaces the Pipe with SectionReader which is simpler and implements Seek, a requirement for makeRequestWithRetry. This is slightly worse than using a Pipe since the progress update is directly tied to the chunk size instead of controlled separately. --- server/images.go | 69 +++++++++++++++++------------------------------- 1 file changed, 24 insertions(+), 45 deletions(-) diff --git a/server/images.go b/server/images.go index 8e9c1a13..2c14ec8d 100644 --- a/server/images.go +++ b/server/images.go @@ -1181,66 +1181,45 @@ func uploadBlobChunked(ctx context.Context, mp ModelPath, url string, layer *Lay } defer f.Close() - completed := 0 + var completed int64 chunkSize := 10 * 1024 * 1024 for { - r, w := io.Pipe() - defer r.Close() - - limit := completed + chunkSize - if chunkSize >= layer.Size-completed { - limit = layer.Size - chunkSize = layer.Size - completed + chunk := int64(layer.Size) - completed + if chunk > int64(chunkSize) { + chunk = int64(chunkSize) } - go func() { - defer w.Close() - for { - n, err := io.CopyN(w, f, 1024*1024) - if err != nil && !errors.Is(err, io.EOF) { - fn(api.ProgressResponse{ - Status: fmt.Sprintf("error copying pipe: %v", err), - Digest: layer.Digest, - Total: layer.Size, - Completed: completed, - }) - return - } - - completed += int(n) - - fn(api.ProgressResponse{ - Status: fmt.Sprintf("uploading %s", layer.Digest), - Digest: layer.Digest, - Total: layer.Size, - Completed: completed, - }) - - if completed >= limit { - return - } - } - }() + sectionReader := io.NewSectionReader(f, int64(completed), chunk) headers := make(map[string]string) headers["Content-Type"] = "application/octet-stream" - headers["Content-Length"] = strconv.Itoa(chunkSize) - headers["Content-Range"] = fmt.Sprintf("%d-%d", completed, limit-1) + headers["Content-Length"] = strconv.Itoa(int(chunk)) + headers["Content-Range"] = fmt.Sprintf("%d-%d", completed, completed+sectionReader.Size()-1) + + resp, err := makeRequestWithRetry(ctx, "PATCH", url, headers, sectionReader, regOpts) + if err != nil && !errors.Is(err, io.EOF) { + fn(api.ProgressResponse{ + Status: fmt.Sprintf("error uploading chunk: %v", err), + Digest: layer.Digest, + Total: layer.Size, + Completed: int(completed), + }) - resp, err := makeRequest(ctx, "PATCH", url, headers, r, regOpts) - if err != nil { return err } defer resp.Body.Close() - if resp.StatusCode != http.StatusAccepted { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("on finish upload registry responded with code %d: %v", resp.StatusCode, string(body)) - } + completed += sectionReader.Size() + fn(api.ProgressResponse{ + Status: fmt.Sprintf("uploading %s", layer.Digest), + Digest: layer.Digest, + Total: layer.Size, + Completed: int(completed), + }) url = resp.Header.Get("Location") - if completed >= layer.Size { + if completed >= int64(layer.Size) { break } }