diff --git a/server/images.go b/server/images.go index e8111b72..bdf166de 100644 --- a/server/images.go +++ b/server/images.go @@ -1180,7 +1180,6 @@ func checkBlobExistence(ctx context.Context, mp ModelPath, digest string, regOpt func uploadBlobChunked(ctx context.Context, mp ModelPath, url string, layer *Layer, regOpts *RegistryOptions, fn func(api.ProgressResponse)) error { // TODO allow resumability // TODO allow canceling uploads via DELETE - // TODO allow cross repo blob mount fp, err := GetBlobsPath(layer.Digest) if err != nil { @@ -1193,49 +1192,78 @@ func uploadBlobChunked(ctx context.Context, mp ModelPath, url string, layer *Lay } defer f.Close() - totalUploaded := 0 + completed := 0 + chunkSize := 10 * 1024 * 1024 - r, w := io.Pipe() - defer r.Close() + for { + r, w := io.Pipe() + defer r.Close() + + limit := completed + chunkSize + if chunkSize >= layer.Size-completed { + limit = layer.Size + chunkSize = layer.Size - completed + } + + go func() { + defer w.Close() + for { + n, err := io.CopyN(w, f, 1024*1024) + if err != nil && !errors.Is(err, io.EOF) { + fn(api.ProgressResponse{ + Status: fmt.Sprintf("error copying pipe: %v", err), + Digest: layer.Digest, + Total: layer.Size, + Completed: completed, + }) + return + } + + completed += int(n) - go func() { - defer w.Close() - for { - n, err := io.CopyN(w, f, 1024*1024) - if err != nil && !errors.Is(err, io.EOF) { fn(api.ProgressResponse{ - Status: fmt.Sprintf("error copying pipe: %v", err), + Status: fmt.Sprintf("uploading %s", layer.Digest), Digest: layer.Digest, Total: layer.Size, - Completed: totalUploaded, + Completed: completed, }) - return + + if completed >= limit { + return + } } + }() - totalUploaded += int(n) + headers := make(map[string]string) + headers["Content-Type"] = "application/octet-stream" + headers["Content-Length"] = strconv.Itoa(chunkSize) + headers["Content-Range"] = fmt.Sprintf("%d-%d", completed, limit-1) - fn(api.ProgressResponse{ - Status: fmt.Sprintf("uploading %s", layer.Digest), - Digest: layer.Digest, - Total: layer.Size, - Completed: totalUploaded, - }) - - if totalUploaded >= layer.Size { - return - } + resp, err := makeRequest(ctx, "PATCH", url, headers, r, regOpts) + if err != nil { + return err } - }() + defer resp.Body.Close() + + if resp.StatusCode != http.StatusAccepted { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("on finish upload registry responded with code %d: %v", resp.StatusCode, string(body)) + } + + url = resp.Header.Get("Location") + if completed >= layer.Size { + break + } + } url = fmt.Sprintf("%s&digest=%s", url, layer.Digest) headers := make(map[string]string) headers["Content-Type"] = "application/octet-stream" - headers["Content-Range"] = fmt.Sprintf("0-%d", layer.Size-1) - headers["Content-Length"] = strconv.Itoa(int(layer.Size)) + headers["Content-Length"] = "0" // finish the upload - resp, err := makeRequest(ctx, "PUT", url, headers, r, regOpts) + resp, err := makeRequest(ctx, "PUT", url, headers, nil, regOpts) if err != nil { log.Printf("couldn't finish upload: %v", err) return err