use a pipe to push to registry with progress

switch to a monolithic upload instead of a chunk upload through a pipe
to report progress
This commit is contained in:
Michael Yang 2023-08-01 12:15:22 -07:00
parent f0b365a478
commit a71ff3f6a2

View file

@ -906,71 +906,58 @@ func uploadBlobChunked(mp ModelPath, url string, layer *Layer, regOpts *Registry
return err
}
totalUploaded := 0
r, w := io.Pipe()
defer r.Close()
go func() {
defer w.Close()
for {
n, err := io.CopyN(w, f, 1024*1024)
if err != nil && !errors.Is(err, io.EOF) {
fn(api.ProgressResponse{
Status: fmt.Sprintf("error copying pipe: %v", err),
Digest: layer.Digest,
Total: layer.Size,
Completed: totalUploaded,
})
return
}
totalUploaded += int(n)
fn(api.ProgressResponse{
Status: fmt.Sprintf("uploading %s", layer.Digest),
Digest: layer.Digest,
Total: layer.Size,
Completed: totalUploaded,
})
if totalUploaded >= layer.Size {
return
}
}
}()
url = fmt.Sprintf("%s&digest=%s", url, layer.Digest)
headers := make(map[string]string)
headers["Content-Type"] = "application/octet-stream"
headers["Content-Range"] = fmt.Sprintf("0-%d", layer.Size-1)
headers["Content-Length"] = strconv.Itoa(int(layer.Size))
chunkSize := 1 << 20
buf := make([]byte, chunkSize)
var totalUploaded int
// finish the upload
resp, err := makeRequest("PUT", url, headers, r, regOpts)
if err != nil {
log.Printf("couldn't finish upload: %v", err)
return err
}
defer resp.Body.Close()
for {
n, err := f.Read(buf)
if err != nil {
return err
}
headers["Content-Length"] = fmt.Sprintf("%d", n)
headers["Content-Range"] = fmt.Sprintf("%d-%d", totalUploaded, totalUploaded+n-1)
fn(api.ProgressResponse{
Status: fmt.Sprintf("uploading %s", layer.Digest),
Digest: layer.Digest,
Total: int(layer.Size),
Completed: int(totalUploaded),
})
// change the buffersize for the last chunk
if n < chunkSize {
buf = buf[:n]
}
resp, err := makeRequest("PATCH", url, headers, bytes.NewReader(buf), regOpts)
if err != nil {
log.Printf("couldn't upload blob: %v", err)
return err
}
defer resp.Body.Close()
url = resp.Header.Get("Location")
// Check for success: For a successful upload, the Docker registry will respond with a 201 Created
if resp.StatusCode != http.StatusAccepted {
fn(api.ProgressResponse{
Status: "error uploading layer",
Digest: layer.Digest,
Total: int(layer.Size),
Completed: int(totalUploaded),
})
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("on layer upload registry responded with code %d: %v", resp.StatusCode, string(body))
}
totalUploaded += n
if totalUploaded >= layer.Size {
url = fmt.Sprintf("%s&digest=%s", url, layer.Digest)
// finish the upload
resp, err := makeRequest("PUT", url, nil, nil, regOpts)
if err != nil {
log.Printf("couldn't finish upload: %v", err)
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("on finish upload registry responded with code %d: %v", resp.StatusCode, string(body))
}
break
}
if resp.StatusCode != http.StatusCreated {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("on finish upload registry responded with code %d: %v", resp.StatusCode, string(body))
}
return nil
}