One corrupt manifest should not wedge model operations (#7515)
One potential failure mode is an empty file which bubbles up as an EOF error, leading to all pulls and listing operations failing. Instead, continue and warn about the corrupt manifest. This also allows re-pulling the corrupt manifest to repair the system.
This commit is contained in:
parent
34a75102f7
commit
a4c70fe157
5 changed files with 34 additions and 21 deletions
|
@ -690,7 +690,8 @@ func CopyModel(src, dst model.Name) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteUnusedLayers(deleteMap map[string]struct{}) error {
|
func deleteUnusedLayers(deleteMap map[string]struct{}) error {
|
||||||
manifests, err := Manifests()
|
// Ignore corrupt manifests to avoid blocking deletion of layers that are freshly orphaned
|
||||||
|
manifests, err := Manifests(true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -853,8 +854,8 @@ func PullModel(ctx context.Context, name string, regOpts *registryOptions, fn fu
|
||||||
manifest, _, err := GetManifest(mp)
|
manifest, _, err := GetManifest(mp)
|
||||||
if errors.Is(err, os.ErrNotExist) {
|
if errors.Is(err, os.ErrNotExist) {
|
||||||
// noop
|
// noop
|
||||||
} else if err != nil && !errors.Is(err, os.ErrNotExist) {
|
} else if err != nil {
|
||||||
return err
|
slog.Warn("pulling model with bad existing manifest", "name", name, "error", err)
|
||||||
} else {
|
} else {
|
||||||
for _, l := range manifest.Layers {
|
for _, l := range manifest.Layers {
|
||||||
deleteMap[l.Digest] = struct{}{}
|
deleteMap[l.Digest] = struct{}{}
|
||||||
|
|
|
@ -106,7 +106,8 @@ func (l *Layer) Remove() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ms, err := Manifests()
|
// Ignore corrupt manifests to avoid blocking deletion of layers that are freshly orphaned
|
||||||
|
ms, err := Manifests(true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -123,7 +123,7 @@ func WriteManifest(name model.Name, config Layer, layers []Layer) error {
|
||||||
return json.NewEncoder(f).Encode(m)
|
return json.NewEncoder(f).Encode(m)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Manifests() (map[model.Name]*Manifest, error) {
|
func Manifests(continueOnError bool) (map[model.Name]*Manifest, error) {
|
||||||
manifests, err := GetManifestPath()
|
manifests, err := GetManifestPath()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -145,22 +145,29 @@ func Manifests() (map[model.Name]*Manifest, error) {
|
||||||
if !fi.IsDir() {
|
if !fi.IsDir() {
|
||||||
rel, err := filepath.Rel(manifests, match)
|
rel, err := filepath.Rel(manifests, match)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if !continueOnError {
|
||||||
|
return nil, fmt.Errorf("%s %w", match, err)
|
||||||
|
}
|
||||||
slog.Warn("bad filepath", "path", match, "error", err)
|
slog.Warn("bad filepath", "path", match, "error", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
n := model.ParseNameFromFilepath(rel)
|
n := model.ParseNameFromFilepath(rel)
|
||||||
if !n.IsValid() {
|
if !n.IsValid() {
|
||||||
|
if !continueOnError {
|
||||||
|
return nil, fmt.Errorf("%s %w", rel, err)
|
||||||
|
}
|
||||||
slog.Warn("bad manifest name", "path", rel)
|
slog.Warn("bad manifest name", "path", rel)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
m, err := ParseNamedManifest(n)
|
m, err := ParseNamedManifest(n)
|
||||||
if syntax := &(json.SyntaxError{}); errors.As(err, &syntax) {
|
if err != nil {
|
||||||
|
if !continueOnError {
|
||||||
|
return nil, fmt.Errorf("%s %w", n, err)
|
||||||
|
}
|
||||||
slog.Warn("bad manifest", "name", n, "error", err)
|
slog.Warn("bad manifest", "name", n, "error", err)
|
||||||
continue
|
continue
|
||||||
} else if err != nil {
|
|
||||||
return nil, fmt.Errorf("%s: %w", n, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ms[n] = m
|
ms[n] = m
|
||||||
|
|
|
@ -112,7 +112,7 @@ func TestManifests(t *testing.T) {
|
||||||
createManifest(t, d, p)
|
createManifest(t, d, p)
|
||||||
}
|
}
|
||||||
|
|
||||||
ms, err := Manifests()
|
ms, err := Manifests(true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -622,7 +622,7 @@ func (s *Server) PushHandler(c *gin.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkNameExists(name model.Name) error {
|
func checkNameExists(name model.Name) error {
|
||||||
names, err := Manifests()
|
names, err := Manifests(true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -894,7 +894,7 @@ func getKVData(digest string, verbose bool) (llm.KV, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) ListHandler(c *gin.Context) {
|
func (s *Server) ListHandler(c *gin.Context) {
|
||||||
ms, err := Manifests()
|
ms, err := Manifests(true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||||
return
|
return
|
||||||
|
@ -1211,6 +1211,9 @@ func Serve(ln net.Listener) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if !envconfig.NoPrune() {
|
if !envconfig.NoPrune() {
|
||||||
|
if _, err := Manifests(false); err != nil {
|
||||||
|
slog.Warn("corrupt manifests detected, skipping prune operation. Re-pull or delete to clear", "error", err)
|
||||||
|
} else {
|
||||||
// clean up unused layers and manifests
|
// clean up unused layers and manifests
|
||||||
if err := PruneLayers(); err != nil {
|
if err := PruneLayers(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -1225,6 +1228,7 @@ func Serve(ln net.Listener) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ctx, done := context.WithCancel(context.Background())
|
ctx, done := context.WithCancel(context.Background())
|
||||||
schedCtx, schedDone := context.WithCancel(ctx)
|
schedCtx, schedDone := context.WithCancel(ctx)
|
||||||
|
|
Loading…
Reference in a new issue