reimplement chunked uploads

This commit is contained in:
Michael Yang 2023-08-14 17:50:06 -07:00
parent 9f944c00f1
commit 5dfe91be8b

View file

@ -1173,7 +1173,6 @@ func checkBlobExistence(ctx context.Context, mp ModelPath, digest string, regOpt
func uploadBlobChunked(ctx context.Context, mp ModelPath, url string, layer *Layer, regOpts *RegistryOptions, fn func(api.ProgressResponse)) error { func uploadBlobChunked(ctx context.Context, mp ModelPath, url string, layer *Layer, regOpts *RegistryOptions, fn func(api.ProgressResponse)) error {
// TODO allow resumability // TODO allow resumability
// TODO allow canceling uploads via DELETE // TODO allow canceling uploads via DELETE
// TODO allow cross repo blob mount
fp, err := GetBlobsPath(layer.Digest) fp, err := GetBlobsPath(layer.Digest)
if err != nil { if err != nil {
@ -1186,49 +1185,78 @@ func uploadBlobChunked(ctx context.Context, mp ModelPath, url string, layer *Lay
} }
defer f.Close() defer f.Close()
totalUploaded := 0 completed := 0
chunkSize := 10 * 1024 * 1024
r, w := io.Pipe() for {
defer r.Close() r, w := io.Pipe()
defer r.Close()
limit := completed + chunkSize
if chunkSize >= layer.Size-completed {
limit = layer.Size
chunkSize = layer.Size - completed
}
go func() {
defer w.Close()
for {
n, err := io.CopyN(w, f, 1024*1024)
if err != nil && !errors.Is(err, io.EOF) {
fn(api.ProgressResponse{
Status: fmt.Sprintf("error copying pipe: %v", err),
Digest: layer.Digest,
Total: layer.Size,
Completed: completed,
})
return
}
completed += int(n)
go func() {
defer w.Close()
for {
n, err := io.CopyN(w, f, 1024*1024)
if err != nil && !errors.Is(err, io.EOF) {
fn(api.ProgressResponse{ fn(api.ProgressResponse{
Status: fmt.Sprintf("error copying pipe: %v", err), Status: fmt.Sprintf("uploading %s", layer.Digest),
Digest: layer.Digest, Digest: layer.Digest,
Total: layer.Size, Total: layer.Size,
Completed: totalUploaded, Completed: completed,
}) })
return
if completed >= limit {
return
}
} }
}()
totalUploaded += int(n) headers := make(map[string]string)
headers["Content-Type"] = "application/octet-stream"
headers["Content-Length"] = strconv.Itoa(chunkSize)
headers["Content-Range"] = fmt.Sprintf("%d-%d", completed, limit-1)
fn(api.ProgressResponse{ resp, err := makeRequest(ctx, "PATCH", url, headers, r, regOpts)
Status: fmt.Sprintf("uploading %s", layer.Digest), if err != nil {
Digest: layer.Digest, return err
Total: layer.Size,
Completed: totalUploaded,
})
if totalUploaded >= layer.Size {
return
}
} }
}() defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("on finish upload registry responded with code %d: %v", resp.StatusCode, string(body))
}
url = resp.Header.Get("Location")
if completed >= layer.Size {
break
}
}
url = fmt.Sprintf("%s&digest=%s", url, layer.Digest) url = fmt.Sprintf("%s&digest=%s", url, layer.Digest)
headers := make(map[string]string) headers := make(map[string]string)
headers["Content-Type"] = "application/octet-stream" headers["Content-Type"] = "application/octet-stream"
headers["Content-Range"] = fmt.Sprintf("0-%d", layer.Size-1) headers["Content-Length"] = "0"
headers["Content-Length"] = strconv.Itoa(int(layer.Size))
// finish the upload // finish the upload
resp, err := makeRequest(ctx, "PUT", url, headers, r, regOpts) resp, err := makeRequest(ctx, "PUT", url, headers, nil, regOpts)
if err != nil { if err != nil {
log.Printf("couldn't finish upload: %v", err) log.Printf("couldn't finish upload: %v", err)
return err return err