change push to chunked uploads from monolithic (#179)

This commit is contained in:
Patrick Devine 2023-07-22 17:31:26 -07:00 committed by GitHub
parent c448443813
commit 88c55199f8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 94 additions and 52 deletions

View file

@ -94,9 +94,25 @@ func PushHandler(cmd *cobra.Command, args []string) error {
return err return err
} }
var currentDigest string
var bar *progressbar.ProgressBar
request := api.PushRequest{Name: args[0], Insecure: insecure} request := api.PushRequest{Name: args[0], Insecure: insecure}
fn := func(resp api.ProgressResponse) error { fn := func(resp api.ProgressResponse) error {
if resp.Digest != currentDigest && resp.Digest != "" {
currentDigest = resp.Digest
bar = progressbar.DefaultBytes(
int64(resp.Total),
fmt.Sprintf("pushing %s...", resp.Digest[7:19]),
)
bar.Set(resp.Completed)
} else if resp.Digest == currentDigest && resp.Digest != "" {
bar.Set(resp.Completed)
} else {
currentDigest = ""
fmt.Println(resp.Status) fmt.Println(resp.Status)
}
return nil return nil
} }

View file

@ -582,14 +582,10 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon
} }
var layers []*Layer var layers []*Layer
var total int
var completed int
for _, layer := range manifest.Layers { for _, layer := range manifest.Layers {
layers = append(layers, layer) layers = append(layers, layer)
total += layer.Size
} }
layers = append(layers, &manifest.Config) layers = append(layers, &manifest.Config)
total += manifest.Config.Size
for _, layer := range layers { for _, layer := range layers {
exists, err := checkBlobExistence(mp, layer.Digest, regOpts) exists, err := checkBlobExistence(mp, layer.Digest, regOpts)
@ -598,21 +594,20 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon
} }
if exists { if exists {
completed += layer.Size
fn(api.ProgressResponse{ fn(api.ProgressResponse{
Status: "using existing layer", Status: "using existing layer",
Digest: layer.Digest, Digest: layer.Digest,
Total: total, Total: layer.Size,
Completed: completed, Completed: layer.Size,
}) })
log.Printf("Layer %s already exists", layer.Digest)
continue continue
} }
fn(api.ProgressResponse{ fn(api.ProgressResponse{
Status: "starting upload", Status: "starting upload",
Digest: layer.Digest, Digest: layer.Digest,
Total: total, Total: layer.Size,
Completed: completed,
}) })
location, err := startUpload(mp, regOpts) location, err := startUpload(mp, regOpts)
@ -621,25 +616,14 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon
return err return err
} }
err = uploadBlob(location, layer, regOpts) err = uploadBlobChunked(mp, location, layer, regOpts, fn)
if err != nil { if err != nil {
log.Printf("error uploading blob: %v", err) log.Printf("error uploading blob: %v", err)
return err return err
} }
completed += layer.Size
fn(api.ProgressResponse{
Status: "upload complete",
Digest: layer.Digest,
Total: total,
Completed: completed,
})
} }
fn(api.ProgressResponse{ fn(api.ProgressResponse{Status: "pushing manifest"})
Status: "pushing manifest",
Total: total,
Completed: completed,
})
url := fmt.Sprintf("%s/v2/%s/manifests/%s", mp.Registry, mp.GetNamespaceRepository(), mp.Tag) url := fmt.Sprintf("%s/v2/%s/manifests/%s", mp.Registry, mp.GetNamespaceRepository(), mp.Tag)
headers := map[string]string{ headers := map[string]string{
"Content-Type": "application/vnd.docker.distribution.manifest.v2+json", "Content-Type": "application/vnd.docker.distribution.manifest.v2+json",
@ -662,11 +646,7 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon
return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body)) return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body))
} }
fn(api.ProgressResponse{ fn(api.ProgressResponse{Status: "success"})
Status: "success",
Total: total,
Completed: completed,
})
return nil return nil
} }
@ -828,19 +808,14 @@ func checkBlobExistence(mp ModelPath, digest string, regOpts *RegistryOptions) (
return resp.StatusCode == http.StatusOK, nil return resp.StatusCode == http.StatusOK, nil
} }
func uploadBlob(location string, layer *Layer, regOpts *RegistryOptions) error { func uploadBlobChunked(mp ModelPath, location string, layer *Layer, regOpts *RegistryOptions, fn func(api.ProgressResponse)) error {
// Create URL
url := fmt.Sprintf("%s&digest=%s", location, layer.Digest)
headers := make(map[string]string)
headers["Content-Length"] = fmt.Sprintf("%d", layer.Size)
headers["Content-Type"] = "application/octet-stream"
// TODO change from monolithic uploads to chunked uploads
// TODO allow resumability // TODO allow resumability
// TODO allow canceling uploads via DELETE // TODO allow canceling uploads via DELETE
// TODO allow cross repo blob mount // TODO allow cross repo blob mount
// Create URL
url := fmt.Sprintf("%s", location)
fp, err := GetBlobsPath(layer.Digest) fp, err := GetBlobsPath(layer.Digest)
if err != nil { if err != nil {
return err return err
@ -851,19 +826,72 @@ func uploadBlob(location string, layer *Layer, regOpts *RegistryOptions) error {
return err return err
} }
resp, err := makeRequest("PUT", url, headers, f, regOpts) headers := make(map[string]string)
headers["Content-Type"] = "application/octet-stream"
chunkSize := 1 << 20
buf := make([]byte, chunkSize)
var totalUploaded int
for {
n, err := f.Read(buf)
if err != nil {
return err
}
headers["Content-Length"] = fmt.Sprintf("%d", n)
headers["Content-Range"] = fmt.Sprintf("%d-%d", totalUploaded, totalUploaded+n-1)
fn(api.ProgressResponse{
Status: fmt.Sprintf("uploading %s", layer.Digest),
Digest: layer.Digest,
Total: int(layer.Size),
Completed: int(totalUploaded),
})
// change the buffersize for the last chunk
if n < chunkSize {
buf = buf[:n]
}
resp, err := makeRequest("PATCH", url, headers, bytes.NewReader(buf), regOpts)
if err != nil { if err != nil {
log.Printf("couldn't upload blob: %v", err) log.Printf("couldn't upload blob: %v", err)
return err return err
} }
defer resp.Body.Close() defer resp.Body.Close()
url = resp.Header.Get("Location")
// Check for success: For a successful upload, the Docker registry will respond with a 201 Created // Check for success: For a successful upload, the Docker registry will respond with a 201 Created
if resp.StatusCode != http.StatusCreated { if resp.StatusCode != http.StatusAccepted {
fn(api.ProgressResponse{
Status: fmt.Sprintf("error uploading layer"),
Digest: layer.Digest,
Total: int(layer.Size),
Completed: int(totalUploaded),
})
body, _ := io.ReadAll(resp.Body) body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body)) return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body))
} }
totalUploaded += n
if totalUploaded >= layer.Size {
url = fmt.Sprintf("%s&digest=%s", url, layer.Digest)
// finish the upload
resp, err := makeRequest("PUT", url, nil, nil, regOpts)
if err != nil {
log.Printf("couldn't finish upload: %v", err)
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body))
}
break
}
}
return nil return nil
} }
@ -974,8 +1002,6 @@ func makeRequest(method, url string, headers map[string]string, body io.Reader,
} }
} }
log.Printf("url = %s", url)
req, err := http.NewRequest(method, url, body) req, err := http.NewRequest(method, url, body)
if err != nil { if err != nil {
return nil, err return nil, err