From 88c55199f8f75ca39a84c4df2058e8e3cf73731e Mon Sep 17 00:00:00 2001 From: Patrick Devine Date: Sat, 22 Jul 2023 17:31:26 -0700 Subject: [PATCH] change push to chunked uploads from monolithic (#179) --- cmd/cmd.go | 18 ++++++- server/images.go | 128 ++++++++++++++++++++++++++++------------------- 2 files changed, 94 insertions(+), 52 deletions(-) diff --git a/cmd/cmd.go b/cmd/cmd.go index a40f68b2..f94e3964 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -94,9 +94,25 @@ func PushHandler(cmd *cobra.Command, args []string) error { return err } + var currentDigest string + var bar *progressbar.ProgressBar + request := api.PushRequest{Name: args[0], Insecure: insecure} fn := func(resp api.ProgressResponse) error { - fmt.Println(resp.Status) + if resp.Digest != currentDigest && resp.Digest != "" { + currentDigest = resp.Digest + bar = progressbar.DefaultBytes( + int64(resp.Total), + fmt.Sprintf("pushing %s...", resp.Digest[7:19]), + ) + + bar.Set(resp.Completed) + } else if resp.Digest == currentDigest && resp.Digest != "" { + bar.Set(resp.Completed) + } else { + currentDigest = "" + fmt.Println(resp.Status) + } return nil } diff --git a/server/images.go b/server/images.go index 8b52a3e6..c0b3996e 100644 --- a/server/images.go +++ b/server/images.go @@ -582,14 +582,10 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon } var layers []*Layer - var total int - var completed int for _, layer := range manifest.Layers { layers = append(layers, layer) - total += layer.Size } layers = append(layers, &manifest.Config) - total += manifest.Config.Size for _, layer := range layers { exists, err := checkBlobExistence(mp, layer.Digest, regOpts) @@ -598,21 +594,20 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon } if exists { - completed += layer.Size fn(api.ProgressResponse{ Status: "using existing layer", Digest: layer.Digest, - Total: total, - Completed: completed, + Total: layer.Size, + Completed: layer.Size, }) + log.Printf("Layer %s already exists", layer.Digest) continue } fn(api.ProgressResponse{ - Status: "starting upload", - Digest: layer.Digest, - Total: total, - Completed: completed, + Status: "starting upload", + Digest: layer.Digest, + Total: layer.Size, }) location, err := startUpload(mp, regOpts) @@ -621,25 +616,14 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon return err } - err = uploadBlob(location, layer, regOpts) + err = uploadBlobChunked(mp, location, layer, regOpts, fn) if err != nil { log.Printf("error uploading blob: %v", err) return err } - completed += layer.Size - fn(api.ProgressResponse{ - Status: "upload complete", - Digest: layer.Digest, - Total: total, - Completed: completed, - }) } - fn(api.ProgressResponse{ - Status: "pushing manifest", - Total: total, - Completed: completed, - }) + fn(api.ProgressResponse{Status: "pushing manifest"}) url := fmt.Sprintf("%s/v2/%s/manifests/%s", mp.Registry, mp.GetNamespaceRepository(), mp.Tag) headers := map[string]string{ "Content-Type": "application/vnd.docker.distribution.manifest.v2+json", @@ -662,11 +646,7 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body)) } - fn(api.ProgressResponse{ - Status: "success", - Total: total, - Completed: completed, - }) + fn(api.ProgressResponse{Status: "success"}) return nil } @@ -828,19 +808,14 @@ func checkBlobExistence(mp ModelPath, digest string, regOpts *RegistryOptions) ( return resp.StatusCode == http.StatusOK, nil } -func uploadBlob(location string, layer *Layer, regOpts *RegistryOptions) error { - // Create URL - url := fmt.Sprintf("%s&digest=%s", location, layer.Digest) - - headers := make(map[string]string) - headers["Content-Length"] = fmt.Sprintf("%d", layer.Size) - headers["Content-Type"] = "application/octet-stream" - - // TODO change from monolithic uploads to chunked uploads +func uploadBlobChunked(mp ModelPath, location string, layer *Layer, regOpts *RegistryOptions, fn func(api.ProgressResponse)) error { // TODO allow resumability // TODO allow canceling uploads via DELETE // TODO allow cross repo blob mount + // Create URL + url := fmt.Sprintf("%s", location) + fp, err := GetBlobsPath(layer.Digest) if err != nil { return err @@ -851,19 +826,72 @@ func uploadBlob(location string, layer *Layer, regOpts *RegistryOptions) error { return err } - resp, err := makeRequest("PUT", url, headers, f, regOpts) - if err != nil { - log.Printf("couldn't upload blob: %v", err) - return err - } - defer resp.Body.Close() + headers := make(map[string]string) + headers["Content-Type"] = "application/octet-stream" - // Check for success: For a successful upload, the Docker registry will respond with a 201 Created - if resp.StatusCode != http.StatusCreated { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body)) - } + chunkSize := 1 << 20 + buf := make([]byte, chunkSize) + var totalUploaded int + for { + n, err := f.Read(buf) + if err != nil { + return err + } + + headers["Content-Length"] = fmt.Sprintf("%d", n) + headers["Content-Range"] = fmt.Sprintf("%d-%d", totalUploaded, totalUploaded+n-1) + + fn(api.ProgressResponse{ + Status: fmt.Sprintf("uploading %s", layer.Digest), + Digest: layer.Digest, + Total: int(layer.Size), + Completed: int(totalUploaded), + }) + + // change the buffersize for the last chunk + if n < chunkSize { + buf = buf[:n] + } + resp, err := makeRequest("PATCH", url, headers, bytes.NewReader(buf), regOpts) + if err != nil { + log.Printf("couldn't upload blob: %v", err) + return err + } + defer resp.Body.Close() + url = resp.Header.Get("Location") + + // Check for success: For a successful upload, the Docker registry will respond with a 201 Created + if resp.StatusCode != http.StatusAccepted { + fn(api.ProgressResponse{ + Status: fmt.Sprintf("error uploading layer"), + Digest: layer.Digest, + Total: int(layer.Size), + Completed: int(totalUploaded), + }) + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body)) + } + + totalUploaded += n + if totalUploaded >= layer.Size { + url = fmt.Sprintf("%s&digest=%s", url, layer.Digest) + + // finish the upload + resp, err := makeRequest("PUT", url, nil, nil, regOpts) + if err != nil { + log.Printf("couldn't finish upload: %v", err) + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusCreated { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body)) + } + break + } + } return nil } @@ -974,8 +1002,6 @@ func makeRequest(method, url string, headers map[string]string, body io.Reader, } } - log.Printf("url = %s", url) - req, err := http.NewRequest(method, url, body) if err != nil { return nil, err