retry on unauthorized chunk push

The token printed for authorized requests has a lifetime of 1h. If an
upload exceeds 1h, a chunk push will fail since the token is created on
a "start upload" request.

This replaces the Pipe with SectionReader which is simpler and
implements Seek, a requirement for makeRequestWithRetry. This is
slightly worse than using a Pipe since the progress update is directly
tied to the chunk size instead of controlled separately.
This commit is contained in:
Michael Yang 2023-08-18 10:03:29 -07:00
parent 5ca05c2e88
commit 3b49315f97

View file

@ -1181,66 +1181,45 @@ func uploadBlobChunked(ctx context.Context, mp ModelPath, url string, layer *Lay
} }
defer f.Close() defer f.Close()
completed := 0 var completed int64
chunkSize := 10 * 1024 * 1024 chunkSize := 10 * 1024 * 1024
for { for {
r, w := io.Pipe() chunk := int64(layer.Size) - completed
defer r.Close() if chunk > int64(chunkSize) {
chunk = int64(chunkSize)
limit := completed + chunkSize
if chunkSize >= layer.Size-completed {
limit = layer.Size
chunkSize = layer.Size - completed
} }
go func() { sectionReader := io.NewSectionReader(f, int64(completed), chunk)
defer w.Close()
for {
n, err := io.CopyN(w, f, 1024*1024)
if err != nil && !errors.Is(err, io.EOF) {
fn(api.ProgressResponse{
Status: fmt.Sprintf("error copying pipe: %v", err),
Digest: layer.Digest,
Total: layer.Size,
Completed: completed,
})
return
}
completed += int(n)
fn(api.ProgressResponse{
Status: fmt.Sprintf("uploading %s", layer.Digest),
Digest: layer.Digest,
Total: layer.Size,
Completed: completed,
})
if completed >= limit {
return
}
}
}()
headers := make(map[string]string) headers := make(map[string]string)
headers["Content-Type"] = "application/octet-stream" headers["Content-Type"] = "application/octet-stream"
headers["Content-Length"] = strconv.Itoa(chunkSize) headers["Content-Length"] = strconv.Itoa(int(chunk))
headers["Content-Range"] = fmt.Sprintf("%d-%d", completed, limit-1) headers["Content-Range"] = fmt.Sprintf("%d-%d", completed, completed+sectionReader.Size()-1)
resp, err := makeRequestWithRetry(ctx, "PATCH", url, headers, sectionReader, regOpts)
if err != nil && !errors.Is(err, io.EOF) {
fn(api.ProgressResponse{
Status: fmt.Sprintf("error uploading chunk: %v", err),
Digest: layer.Digest,
Total: layer.Size,
Completed: int(completed),
})
resp, err := makeRequest(ctx, "PATCH", url, headers, r, regOpts)
if err != nil {
return err return err
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted { completed += sectionReader.Size()
body, _ := io.ReadAll(resp.Body) fn(api.ProgressResponse{
return fmt.Errorf("on finish upload registry responded with code %d: %v", resp.StatusCode, string(body)) Status: fmt.Sprintf("uploading %s", layer.Digest),
} Digest: layer.Digest,
Total: layer.Size,
Completed: int(completed),
})
url = resp.Header.Get("Location") url = resp.Header.Get("Location")
if completed >= layer.Size { if completed >= int64(layer.Size) {
break break
} }
} }