From 865fceb73c7a8284d7bad3cace39fca977a25f3e Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Sat, 26 Aug 2023 08:28:35 -0700 Subject: [PATCH] chunked pipe --- server/upload.go | 102 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 71 insertions(+), 31 deletions(-) diff --git a/server/upload.go b/server/upload.go index 455c8b33..aef688f5 100644 --- a/server/upload.go +++ b/server/upload.go @@ -55,49 +55,89 @@ func uploadBlobChunked(ctx context.Context, mp ModelPath, requestURL *url.URL, l } defer f.Close() - var completed int64 // 95MB chunk size chunkSize := 95 * 1024 * 1024 - for { - chunk := int64(layer.Size) - completed + for offset := int64(0); offset < int64(layer.Size); { + chunk := int64(layer.Size) - offset if chunk > int64(chunkSize) { chunk = int64(chunkSize) } - sectionReader := io.NewSectionReader(f, int64(completed), chunk) + sectionReader := io.NewSectionReader(f, int64(offset), chunk) + for try := 0; try < MaxRetries; try++ { + r, w := io.Pipe() + defer r.Close() + go func() { + defer w.Close() - headers := make(http.Header) - headers.Set("Content-Type", "application/octet-stream") - headers.Set("Content-Length", strconv.Itoa(int(chunk))) - headers.Set("Content-Range", fmt.Sprintf("%d-%d", completed, completed+sectionReader.Size()-1)) - resp, err := makeRequestWithRetry(ctx, "PATCH", requestURL, headers, sectionReader, regOpts) - if err != nil && !errors.Is(err, io.EOF) { - fn(api.ProgressResponse{ - Status: fmt.Sprintf("error uploading chunk: %v", err), - Digest: layer.Digest, - Total: layer.Size, - Completed: int(completed), - }) + for chunked := int64(0); chunked < chunk; { + n, err := io.CopyN(w, sectionReader, 1024*1024) + if err != nil && !errors.Is(err, io.EOF) { + fn(api.ProgressResponse{ + Status: fmt.Sprintf("error reading chunk: %v", err), + Digest: layer.Digest, + Total: layer.Size, + Completed: int(offset), + }) - return err - } - defer resp.Body.Close() + return + } - completed += sectionReader.Size() - fn(api.ProgressResponse{ - Status: fmt.Sprintf("uploading %s", layer.Digest), - Digest: layer.Digest, - Total: layer.Size, - Completed: int(completed), - }) + chunked += n + fn(api.ProgressResponse{ + Status: fmt.Sprintf("uploading %s", layer.Digest), + Digest: layer.Digest, + Total: layer.Size, + Completed: int(offset) + int(chunked), + }) + } + }() - requestURL, err = url.Parse(resp.Header.Get("Location")) - if err != nil { - return err - } + headers := make(http.Header) + headers.Set("Content-Type", "application/octet-stream") + headers.Set("Content-Length", strconv.Itoa(int(chunk))) + headers.Set("Content-Range", fmt.Sprintf("%d-%d", offset, offset+sectionReader.Size()-1)) + resp, err := makeRequest(ctx, "PATCH", requestURL, headers, r, regOpts) + if err != nil && !errors.Is(err, io.EOF) { + fn(api.ProgressResponse{ + Status: fmt.Sprintf("error uploading chunk: %v", err), + Digest: layer.Digest, + Total: layer.Size, + Completed: int(offset), + }) + + return err + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusAccepted, http.StatusCreated: + case http.StatusUnauthorized: + auth := resp.Header.Get("www-authenticate") + authRedir := ParseAuthRedirectString(auth) + token, err := getAuthToken(ctx, authRedir, regOpts) + if err != nil { + return err + } + + regOpts.Token = token + if _, err := sectionReader.Seek(0, io.SeekStart); err != nil { + return err + } + + continue + default: + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("on upload registry responded with code %d: %s", resp.StatusCode, body) + } + + offset += sectionReader.Size() + requestURL, err = url.Parse(resp.Header.Get("Location")) + if err != nil { + return err + } - if completed >= int64(layer.Size) { break } }