From 72266c7684842b9720a4facf186a012312f506d9 Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Fri, 25 Aug 2023 15:38:39 -0700 Subject: [PATCH 1/4] bump chunk size to 95MB --- server/upload.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/upload.go b/server/upload.go index 7a1b0046..455c8b33 100644 --- a/server/upload.go +++ b/server/upload.go @@ -56,7 +56,8 @@ func uploadBlobChunked(ctx context.Context, mp ModelPath, requestURL *url.URL, l defer f.Close() var completed int64 - chunkSize := 10 * 1024 * 1024 + // 95MB chunk size + chunkSize := 95 * 1024 * 1024 for { chunk := int64(layer.Size) - completed From 865fceb73c7a8284d7bad3cace39fca977a25f3e Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Sat, 26 Aug 2023 08:28:35 -0700 Subject: [PATCH 2/4] chunked pipe --- server/upload.go | 102 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 71 insertions(+), 31 deletions(-) diff --git a/server/upload.go b/server/upload.go index 455c8b33..aef688f5 100644 --- a/server/upload.go +++ b/server/upload.go @@ -55,49 +55,89 @@ func uploadBlobChunked(ctx context.Context, mp ModelPath, requestURL *url.URL, l } defer f.Close() - var completed int64 // 95MB chunk size chunkSize := 95 * 1024 * 1024 - for { - chunk := int64(layer.Size) - completed + for offset := int64(0); offset < int64(layer.Size); { + chunk := int64(layer.Size) - offset if chunk > int64(chunkSize) { chunk = int64(chunkSize) } - sectionReader := io.NewSectionReader(f, int64(completed), chunk) + sectionReader := io.NewSectionReader(f, int64(offset), chunk) + for try := 0; try < MaxRetries; try++ { + r, w := io.Pipe() + defer r.Close() + go func() { + defer w.Close() - headers := make(http.Header) - headers.Set("Content-Type", "application/octet-stream") - headers.Set("Content-Length", strconv.Itoa(int(chunk))) - headers.Set("Content-Range", fmt.Sprintf("%d-%d", completed, completed+sectionReader.Size()-1)) - resp, err := makeRequestWithRetry(ctx, "PATCH", requestURL, headers, sectionReader, regOpts) - if err != nil && !errors.Is(err, io.EOF) { - fn(api.ProgressResponse{ - Status: fmt.Sprintf("error uploading chunk: %v", err), - Digest: layer.Digest, - Total: layer.Size, - Completed: int(completed), - }) + for chunked := int64(0); chunked < chunk; { + n, err := io.CopyN(w, sectionReader, 1024*1024) + if err != nil && !errors.Is(err, io.EOF) { + fn(api.ProgressResponse{ + Status: fmt.Sprintf("error reading chunk: %v", err), + Digest: layer.Digest, + Total: layer.Size, + Completed: int(offset), + }) - return err - } - defer resp.Body.Close() + return + } - completed += sectionReader.Size() - fn(api.ProgressResponse{ - Status: fmt.Sprintf("uploading %s", layer.Digest), - Digest: layer.Digest, - Total: layer.Size, - Completed: int(completed), - }) + chunked += n + fn(api.ProgressResponse{ + Status: fmt.Sprintf("uploading %s", layer.Digest), + Digest: layer.Digest, + Total: layer.Size, + Completed: int(offset) + int(chunked), + }) + } + }() - requestURL, err = url.Parse(resp.Header.Get("Location")) - if err != nil { - return err - } + headers := make(http.Header) + headers.Set("Content-Type", "application/octet-stream") + headers.Set("Content-Length", strconv.Itoa(int(chunk))) + headers.Set("Content-Range", fmt.Sprintf("%d-%d", offset, offset+sectionReader.Size()-1)) + resp, err := makeRequest(ctx, "PATCH", requestURL, headers, r, regOpts) + if err != nil && !errors.Is(err, io.EOF) { + fn(api.ProgressResponse{ + Status: fmt.Sprintf("error uploading chunk: %v", err), + Digest: layer.Digest, + Total: layer.Size, + Completed: int(offset), + }) + + return err + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusAccepted, http.StatusCreated: + case http.StatusUnauthorized: + auth := resp.Header.Get("www-authenticate") + authRedir := ParseAuthRedirectString(auth) + token, err := getAuthToken(ctx, authRedir, regOpts) + if err != nil { + return err + } + + regOpts.Token = token + if _, err := sectionReader.Seek(0, io.SeekStart); err != nil { + return err + } + + continue + default: + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("on upload registry responded with code %d: %s", resp.StatusCode, body) + } + + offset += sectionReader.Size() + requestURL, err = url.Parse(resp.Header.Get("Location")) + if err != nil { + return err + } - if completed >= int64(layer.Size) { break } } From 246dc654173d011dc5506b97715e61fe0fd17634 Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Sat, 26 Aug 2023 21:55:21 -0700 Subject: [PATCH 3/4] loosen http status code checks --- api/client.go | 4 ++-- server/auth.go | 2 +- server/download.go | 2 +- server/images.go | 16 ++++++++-------- server/upload.go | 9 ++++----- 5 files changed, 16 insertions(+), 17 deletions(-) diff --git a/api/client.go b/api/client.go index 480022c0..dc69f689 100644 --- a/api/client.go +++ b/api/client.go @@ -29,7 +29,7 @@ type Client struct { } func checkError(resp *http.Response, body []byte) error { - if resp.StatusCode >= 200 && resp.StatusCode < 400 { + if resp.StatusCode < http.StatusBadRequest { return nil } @@ -165,7 +165,7 @@ func (c *Client) stream(ctx context.Context, method, path string, data any, fn f return fmt.Errorf(errorResponse.Error) } - if response.StatusCode >= 400 { + if response.StatusCode >= http.StatusBadRequest { return StatusError{ StatusCode: response.StatusCode, Status: response.Status, diff --git a/server/auth.go b/server/auth.go index be8ac7d0..3e35178f 100644 --- a/server/auth.go +++ b/server/auth.go @@ -109,7 +109,7 @@ func getAuthToken(ctx context.Context, redirData AuthRedirect, regOpts *Registry } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { + if resp.StatusCode >= http.StatusBadRequest { body, _ := io.ReadAll(resp.Body) return "", fmt.Errorf("on pull registry responded with code %d: %s", resp.StatusCode, body) } diff --git a/server/download.go b/server/download.go index 67dc46ed..97f24da3 100644 --- a/server/download.go +++ b/server/download.go @@ -168,7 +168,7 @@ func doDownload(ctx context.Context, opts downloadOpts, f *FileDownload) error { } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { + if resp.StatusCode >= http.StatusBadRequest { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("%w: on download registry responded with code %d: %v", errDownload, resp.StatusCode, string(body)) } diff --git a/server/images.go b/server/images.go index f847e09e..9fc2be94 100644 --- a/server/images.go +++ b/server/images.go @@ -1086,11 +1086,11 @@ func pullModelManifest(ctx context.Context, mp ModelPath, regOpts *RegistryOptio } defer resp.Body.Close() - // Check for success: For a successful upload, the Docker registry will respond with a 201 Created - if resp.StatusCode != http.StatusOK { + if resp.StatusCode >= http.StatusBadRequest { if resp.StatusCode == http.StatusNotFound { return nil, fmt.Errorf("model not found") } + body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("on pull registry responded with code %d: %s", resp.StatusCode, body) } @@ -1151,7 +1151,7 @@ func checkBlobExistence(ctx context.Context, mp ModelPath, digest string, regOpt defer resp.Body.Close() // Check for success: If the blob exists, the Docker registry will respond with a 200 OK - return resp.StatusCode == http.StatusOK, nil + return resp.StatusCode < http.StatusBadRequest, nil } func makeRequestWithRetry(ctx context.Context, method string, requestURL *url.URL, headers http.Header, body io.ReadSeeker, regOpts *RegistryOptions) (*http.Response, error) { @@ -1165,10 +1165,8 @@ func makeRequestWithRetry(ctx context.Context, method string, requestURL *url.UR status = resp.Status - switch resp.StatusCode { - case http.StatusAccepted, http.StatusCreated: - return resp, nil - case http.StatusUnauthorized: + switch { + case resp.StatusCode == http.StatusUnauthorized: auth := resp.Header.Get("www-authenticate") authRedir := ParseAuthRedirectString(auth) token, err := getAuthToken(ctx, authRedir, regOpts) @@ -1184,9 +1182,11 @@ func makeRequestWithRetry(ctx context.Context, method string, requestURL *url.UR } continue - default: + case resp.StatusCode >= http.StatusBadRequest: body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("on upload registry responded with code %d: %s", resp.StatusCode, body) + default: + return resp, nil } } diff --git a/server/upload.go b/server/upload.go index aef688f5..e1d8a17d 100644 --- a/server/upload.go +++ b/server/upload.go @@ -111,9 +111,8 @@ func uploadBlobChunked(ctx context.Context, mp ModelPath, requestURL *url.URL, l } defer resp.Body.Close() - switch resp.StatusCode { - case http.StatusAccepted, http.StatusCreated: - case http.StatusUnauthorized: + switch { + case resp.StatusCode == http.StatusUnauthorized: auth := resp.Header.Get("www-authenticate") authRedir := ParseAuthRedirectString(auth) token, err := getAuthToken(ctx, authRedir, regOpts) @@ -127,7 +126,7 @@ func uploadBlobChunked(ctx context.Context, mp ModelPath, requestURL *url.URL, l } continue - default: + case resp.StatusCode >= http.StatusBadRequest: body, _ := io.ReadAll(resp.Body) return fmt.Errorf("on upload registry responded with code %d: %s", resp.StatusCode, body) } @@ -158,7 +157,7 @@ func uploadBlobChunked(ctx context.Context, mp ModelPath, requestURL *url.URL, l } defer resp.Body.Close() - if resp.StatusCode != http.StatusCreated { + if resp.StatusCode >= http.StatusBadRequest { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("on finish upload registry responded with code %d: %v", resp.StatusCode, string(body)) } From 16b06699fd83b182a19ccf13078583120a55d714 Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Mon, 28 Aug 2023 18:35:18 -0400 Subject: [PATCH 4/4] remove unused parameter --- server/images.go | 2 +- server/upload.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/images.go b/server/images.go index 9fc2be94..651411e2 100644 --- a/server/images.go +++ b/server/images.go @@ -974,7 +974,7 @@ func PushModel(ctx context.Context, name string, regOpts *RegistryOptions, fn fu continue } - if err := uploadBlobChunked(ctx, mp, location, layer, regOpts, fn); err != nil { + if err := uploadBlobChunked(ctx, location, layer, regOpts, fn); err != nil { log.Printf("error uploading blob: %v", err) return err } diff --git a/server/upload.go b/server/upload.go index e1d8a17d..cf51eea6 100644 --- a/server/upload.go +++ b/server/upload.go @@ -40,7 +40,7 @@ func startUpload(ctx context.Context, mp ModelPath, layer *Layer, regOpts *Regis return url.Parse(location) } -func uploadBlobChunked(ctx context.Context, mp ModelPath, requestURL *url.URL, layer *Layer, regOpts *RegistryOptions, fn func(api.ProgressResponse)) error { +func uploadBlobChunked(ctx context.Context, requestURL *url.URL, layer *Layer, regOpts *RegistryOptions, fn func(api.ProgressResponse)) error { // TODO allow resumability // TODO allow canceling uploads via DELETE