From bf146fb072b8dbf49efa2f874959ad978c48bf29 Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Thu, 7 Sep 2023 12:01:50 -0700 Subject: [PATCH] fix retry on unauthorized chunk --- server/upload.go | 43 +++++++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/server/upload.go b/server/upload.go index cf51eea6..a8c62827 100644 --- a/server/upload.go +++ b/server/upload.go @@ -66,31 +66,39 @@ func uploadBlobChunked(ctx context.Context, requestURL *url.URL, layer *Layer, r sectionReader := io.NewSectionReader(f, int64(offset), chunk) for try := 0; try < MaxRetries; try++ { + ch := make(chan error, 1) + r, w := io.Pipe() defer r.Close() go func() { defer w.Close() for chunked := int64(0); chunked < chunk; { - n, err := io.CopyN(w, sectionReader, 1024*1024) - if err != nil && !errors.Is(err, io.EOF) { + select { + case err := <-ch: + log.Printf("chunk interrupted: %v", err) + return + default: + n, err := io.CopyN(w, sectionReader, 1024*1024) + if err != nil && !errors.Is(err, io.EOF) { + fn(api.ProgressResponse{ + Status: fmt.Sprintf("error reading chunk: %v", err), + Digest: layer.Digest, + Total: layer.Size, + Completed: int(offset), + }) + + return + } + + chunked += n fn(api.ProgressResponse{ - Status: fmt.Sprintf("error reading chunk: %v", err), + Status: fmt.Sprintf("uploading %s", layer.Digest), Digest: layer.Digest, Total: layer.Size, - Completed: int(offset), + Completed: int(offset) + int(chunked), }) - - return } - - chunked += n - fn(api.ProgressResponse{ - Status: fmt.Sprintf("uploading %s", layer.Digest), - Digest: layer.Digest, - Total: layer.Size, - Completed: int(offset) + int(chunked), - }) } }() @@ -113,6 +121,8 @@ func uploadBlobChunked(ctx context.Context, requestURL *url.URL, layer *Layer, r switch { case resp.StatusCode == http.StatusUnauthorized: + ch <- errors.New("unauthorized") + auth := resp.Header.Get("www-authenticate") authRedir := ParseAuthRedirectString(auth) token, err := getAuthToken(ctx, authRedir, regOpts) @@ -121,10 +131,7 @@ func uploadBlobChunked(ctx context.Context, requestURL *url.URL, layer *Layer, r } regOpts.Token = token - if _, err := sectionReader.Seek(0, io.SeekStart); err != nil { - return err - } - + sectionReader = io.NewSectionReader(f, int64(offset), chunk) continue case resp.StatusCode >= http.StatusBadRequest: body, _ := io.ReadAll(resp.Body)