Merge pull request #975 from jmorganca/mxyng/downloads
update downloads to use retry wrapper
This commit is contained in:
commit
1fd511e661
5 changed files with 24 additions and 39 deletions
|
@ -72,7 +72,7 @@ func ClientFromEnvironment() (*Client, error) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
mockRequest, err := http.NewRequest("HEAD", client.base.String(), nil)
|
mockRequest, err := http.NewRequest(http.MethodHead, client.base.String(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,7 +91,7 @@ func getAuthToken(ctx context.Context, redirData AuthRedirect) (string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
s := SignatureData{
|
s := SignatureData{
|
||||||
Method: "GET",
|
Method: http.MethodGet,
|
||||||
Path: redirectURL.String(),
|
Path: redirectURL.String(),
|
||||||
Data: nil,
|
Data: nil,
|
||||||
}
|
}
|
||||||
|
@ -103,7 +103,7 @@ func getAuthToken(ctx context.Context, redirData AuthRedirect) (string, error) {
|
||||||
|
|
||||||
headers := make(http.Header)
|
headers := make(http.Header)
|
||||||
headers.Set("Authorization", sig)
|
headers.Set("Authorization", sig)
|
||||||
resp, err := makeRequest(ctx, "GET", redirectURL, headers, nil, nil)
|
resp, err := makeRequest(ctx, http.MethodGet, redirectURL, headers, nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("couldn't get token: %q", err)
|
log.Printf("couldn't get token: %q", err)
|
||||||
return "", err
|
return "", err
|
||||||
|
|
|
@ -89,17 +89,12 @@ func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *R
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(b.Parts) == 0 {
|
if len(b.Parts) == 0 {
|
||||||
resp, err := makeRequest(ctx, "HEAD", requestURL, nil, nil, opts)
|
resp, err := makeRequestWithRetry(ctx, http.MethodHead, requestURL, nil, nil, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode >= http.StatusBadRequest {
|
|
||||||
body, _ := io.ReadAll(resp.Body)
|
|
||||||
return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body))
|
|
||||||
}
|
|
||||||
|
|
||||||
b.Total, _ = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
|
b.Total, _ = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
|
||||||
|
|
||||||
var size = b.Total / numDownloadParts
|
var size = b.Total / numDownloadParts
|
||||||
|
@ -199,7 +194,7 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *Regis
|
||||||
func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart, opts *RegistryOptions) error {
|
func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart, opts *RegistryOptions) error {
|
||||||
headers := make(http.Header)
|
headers := make(http.Header)
|
||||||
headers.Set("Range", fmt.Sprintf("bytes=%d-%d", part.StartsAt(), part.StopsAt()-1))
|
headers.Set("Range", fmt.Sprintf("bytes=%d-%d", part.StartsAt(), part.StopsAt()-1))
|
||||||
resp, err := makeRequest(ctx, "GET", requestURL, headers, nil, opts)
|
resp, err := makeRequestWithRetry(ctx, http.MethodGet, requestURL, headers, nil, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1002,7 +1002,7 @@ func PushModel(ctx context.Context, name string, regOpts *RegistryOptions, fn fu
|
||||||
|
|
||||||
headers := make(http.Header)
|
headers := make(http.Header)
|
||||||
headers.Set("Content-Type", "application/vnd.docker.distribution.manifest.v2+json")
|
headers.Set("Content-Type", "application/vnd.docker.distribution.manifest.v2+json")
|
||||||
resp, err := makeRequestWithRetry(ctx, "PUT", requestURL, headers, bytes.NewReader(manifestJSON), regOpts)
|
resp, err := makeRequestWithRetry(ctx, http.MethodPut, requestURL, headers, bytes.NewReader(manifestJSON), regOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1124,22 +1124,12 @@ func pullModelManifest(ctx context.Context, mp ModelPath, regOpts *RegistryOptio
|
||||||
|
|
||||||
headers := make(http.Header)
|
headers := make(http.Header)
|
||||||
headers.Set("Accept", "application/vnd.docker.distribution.manifest.v2+json")
|
headers.Set("Accept", "application/vnd.docker.distribution.manifest.v2+json")
|
||||||
resp, err := makeRequest(ctx, "GET", requestURL, headers, nil, regOpts)
|
resp, err := makeRequestWithRetry(ctx, http.MethodGet, requestURL, headers, nil, regOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("couldn't get manifest: %v", err)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode >= http.StatusBadRequest {
|
|
||||||
if resp.StatusCode == http.StatusNotFound {
|
|
||||||
return nil, fmt.Errorf("model not found")
|
|
||||||
}
|
|
||||||
|
|
||||||
body, _ := io.ReadAll(resp.Body)
|
|
||||||
return nil, fmt.Errorf("on pull registry responded with code %d: %s", resp.StatusCode, body)
|
|
||||||
}
|
|
||||||
|
|
||||||
var m *ManifestV2
|
var m *ManifestV2
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
|
if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -1202,15 +1192,19 @@ func makeRequestWithRetry(ctx context.Context, method string, requestURL *url.UR
|
||||||
|
|
||||||
regOpts.Token = token
|
regOpts.Token = token
|
||||||
if body != nil {
|
if body != nil {
|
||||||
if _, err := body.Seek(0, io.SeekStart); err != nil {
|
body.Seek(0, io.SeekStart)
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
case resp.StatusCode == http.StatusNotFound:
|
||||||
|
return nil, os.ErrNotExist
|
||||||
case resp.StatusCode >= http.StatusBadRequest:
|
case resp.StatusCode >= http.StatusBadRequest:
|
||||||
body, _ := io.ReadAll(resp.Body)
|
body, err := io.ReadAll(resp.Body)
|
||||||
return nil, fmt.Errorf("on upload registry responded with code %d: %s", resp.StatusCode, body)
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("%d: %s", resp.StatusCode, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("%d: %s", resp.StatusCode, body)
|
||||||
default:
|
default:
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,7 +67,7 @@ func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *Reg
|
||||||
requestURL.RawQuery = values.Encode()
|
requestURL.RawQuery = values.Encode()
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := makeRequestWithRetry(ctx, "POST", requestURL, nil, nil, opts)
|
resp, err := makeRequestWithRetry(ctx, http.MethodPost, requestURL, nil, nil, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -187,7 +187,7 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
|
||||||
headers.Set("Content-Type", "application/octet-stream")
|
headers.Set("Content-Type", "application/octet-stream")
|
||||||
headers.Set("Content-Length", "0")
|
headers.Set("Content-Length", "0")
|
||||||
|
|
||||||
resp, err := makeRequestWithRetry(ctx, "PUT", requestURL, headers, nil, opts)
|
resp, err := makeRequestWithRetry(ctx, http.MethodPut, requestURL, headers, nil, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.err = err
|
b.err = err
|
||||||
return
|
return
|
||||||
|
@ -334,15 +334,13 @@ func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *RegistryO
|
||||||
requestURL := mp.BaseURL()
|
requestURL := mp.BaseURL()
|
||||||
requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs", layer.Digest)
|
requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs", layer.Digest)
|
||||||
|
|
||||||
resp, err := makeRequest(ctx, "HEAD", requestURL, nil, nil, opts)
|
resp, err := makeRequestWithRetry(ctx, http.MethodHead, requestURL, nil, nil, opts)
|
||||||
if err != nil {
|
switch {
|
||||||
|
case errors.Is(err, os.ErrNotExist):
|
||||||
|
case err != nil:
|
||||||
return err
|
return err
|
||||||
}
|
default:
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
switch resp.StatusCode {
|
|
||||||
case http.StatusNotFound:
|
|
||||||
case http.StatusOK:
|
|
||||||
fn(api.ProgressResponse{
|
fn(api.ProgressResponse{
|
||||||
Status: fmt.Sprintf("uploading %s", layer.Digest),
|
Status: fmt.Sprintf("uploading %s", layer.Digest),
|
||||||
Digest: layer.Digest,
|
Digest: layer.Digest,
|
||||||
|
@ -351,8 +349,6 @@ func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *RegistryO
|
||||||
})
|
})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
default:
|
|
||||||
return fmt.Errorf("unexpected status code %d", resp.StatusCode)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
data, ok := blobUploadManager.LoadOrStore(layer.Digest, &blobUpload{Layer: layer})
|
data, ok := blobUploadManager.LoadOrStore(layer.Digest, &blobUpload{Layer: layer})
|
||||||
|
|
Loading…
Reference in a new issue