From a71ff3f6a2f22fe46c48562d29fac9d60c0881a2 Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Tue, 1 Aug 2023 12:15:22 -0700 Subject: [PATCH] use a pipe to push to registry with progress switch to a monolithic upload instead of a chunk upload through a pipe to report progress --- server/images.go | 109 +++++++++++++++++++++-------------------------- 1 file changed, 48 insertions(+), 61 deletions(-) diff --git a/server/images.go b/server/images.go index 73448a36..f566c7dd 100644 --- a/server/images.go +++ b/server/images.go @@ -906,71 +906,58 @@ func uploadBlobChunked(mp ModelPath, url string, layer *Layer, regOpts *Registry return err } + totalUploaded := 0 + + r, w := io.Pipe() + defer r.Close() + + go func() { + defer w.Close() + for { + n, err := io.CopyN(w, f, 1024*1024) + if err != nil && !errors.Is(err, io.EOF) { + fn(api.ProgressResponse{ + Status: fmt.Sprintf("error copying pipe: %v", err), + Digest: layer.Digest, + Total: layer.Size, + Completed: totalUploaded, + }) + return + } + + totalUploaded += int(n) + + fn(api.ProgressResponse{ + Status: fmt.Sprintf("uploading %s", layer.Digest), + Digest: layer.Digest, + Total: layer.Size, + Completed: totalUploaded, + }) + + if totalUploaded >= layer.Size { + return + } + } + }() + + url = fmt.Sprintf("%s&digest=%s", url, layer.Digest) + headers := make(map[string]string) headers["Content-Type"] = "application/octet-stream" + headers["Content-Range"] = fmt.Sprintf("0-%d", layer.Size-1) + headers["Content-Length"] = strconv.Itoa(int(layer.Size)) - chunkSize := 1 << 20 - buf := make([]byte, chunkSize) - var totalUploaded int + // finish the upload + resp, err := makeRequest("PUT", url, headers, r, regOpts) + if err != nil { + log.Printf("couldn't finish upload: %v", err) + return err + } + defer resp.Body.Close() - for { - n, err := f.Read(buf) - if err != nil { - return err - } - - headers["Content-Length"] = fmt.Sprintf("%d", n) - headers["Content-Range"] = fmt.Sprintf("%d-%d", totalUploaded, totalUploaded+n-1) - - fn(api.ProgressResponse{ - Status: fmt.Sprintf("uploading %s", layer.Digest), - Digest: layer.Digest, - Total: int(layer.Size), - Completed: int(totalUploaded), - }) - - // change the buffersize for the last chunk - if n < chunkSize { - buf = buf[:n] - } - resp, err := makeRequest("PATCH", url, headers, bytes.NewReader(buf), regOpts) - if err != nil { - log.Printf("couldn't upload blob: %v", err) - return err - } - defer resp.Body.Close() - url = resp.Header.Get("Location") - - // Check for success: For a successful upload, the Docker registry will respond with a 201 Created - if resp.StatusCode != http.StatusAccepted { - fn(api.ProgressResponse{ - Status: "error uploading layer", - Digest: layer.Digest, - Total: int(layer.Size), - Completed: int(totalUploaded), - }) - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("on layer upload registry responded with code %d: %v", resp.StatusCode, string(body)) - } - - totalUploaded += n - if totalUploaded >= layer.Size { - url = fmt.Sprintf("%s&digest=%s", url, layer.Digest) - - // finish the upload - resp, err := makeRequest("PUT", url, nil, nil, regOpts) - if err != nil { - log.Printf("couldn't finish upload: %v", err) - return err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusCreated { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("on finish upload registry responded with code %d: %v", resp.StatusCode, string(body)) - } - break - } + if resp.StatusCode != http.StatusCreated { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("on finish upload registry responded with code %d: %v", resp.StatusCode, string(body)) } return nil }