Merge pull request #109 from jmorganca/fix-create-memory

fix memory leak in create
This commit is contained in:
Michael Yang 2023-07-18 17:25:19 -07:00 committed by GitHub
commit 6e36f948df
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -3,7 +3,6 @@ package server
import ( import (
"bytes" "bytes"
"crypto/sha256" "crypto/sha256"
"encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -42,10 +41,9 @@ type Layer struct {
Size int `json:"size"` Size int `json:"size"`
} }
type LayerWithBuffer struct { type LayerReader struct {
Layer Layer
io.Reader
Buffer *bytes.Buffer
} }
type ConfigV2 struct { type ConfigV2 struct {
@ -161,7 +159,7 @@ func CreateModel(name string, mf io.Reader, fn func(status string)) error {
return err return err
} }
var layers []*LayerWithBuffer var layers []*LayerReader
params := make(map[string]string) params := make(map[string]string)
for _, c := range commands { for _, c := range commands {
@ -274,7 +272,7 @@ func CreateModel(name string, mf io.Reader, fn func(status string)) error {
return nil return nil
} }
func removeLayerFromLayers(layers []*LayerWithBuffer, mediaType string) []*LayerWithBuffer { func removeLayerFromLayers(layers []*LayerReader, mediaType string) []*LayerReader {
j := 0 j := 0
for _, l := range layers { for _, l := range layers {
if l.MediaType != mediaType { if l.MediaType != mediaType {
@ -285,7 +283,7 @@ func removeLayerFromLayers(layers []*LayerWithBuffer, mediaType string) []*Layer
return layers[:j] return layers[:j]
} }
func SaveLayers(layers []*LayerWithBuffer, fn func(status string), force bool) error { func SaveLayers(layers []*LayerReader, fn func(status string), force bool) error {
// Write each of the layers to disk // Write each of the layers to disk
for _, layer := range layers { for _, layer := range layers {
fp, err := GetBlobsPath(layer.Digest) fp, err := GetBlobsPath(layer.Digest)
@ -303,10 +301,10 @@ func SaveLayers(layers []*LayerWithBuffer, fn func(status string), force bool) e
} }
defer out.Close() defer out.Close()
_, err = io.Copy(out, layer.Buffer) if _, err = io.Copy(out, layer.Reader); err != nil {
if err != nil {
return err return err
} }
} else { } else {
fn(fmt.Sprintf("using already created layer %s", layer.Digest)) fn(fmt.Sprintf("using already created layer %s", layer.Digest))
} }
@ -315,7 +313,7 @@ func SaveLayers(layers []*LayerWithBuffer, fn func(status string), force bool) e
return nil return nil
} }
func CreateManifest(name string, cfg *LayerWithBuffer, layers []*Layer) error { func CreateManifest(name string, cfg *LayerReader, layers []*Layer) error {
mp := ParseModelPath(name) mp := ParseModelPath(name)
manifest := ManifestV2{ manifest := ManifestV2{
@ -341,7 +339,7 @@ func CreateManifest(name string, cfg *LayerWithBuffer, layers []*Layer) error {
return os.WriteFile(fp, manifestJSON, 0o644) return os.WriteFile(fp, manifestJSON, 0o644)
} }
func GetLayerWithBufferFromLayer(layer *Layer) (*LayerWithBuffer, error) { func GetLayerWithBufferFromLayer(layer *Layer) (*LayerReader, error) {
fp, err := GetBlobsPath(layer.Digest) fp, err := GetBlobsPath(layer.Digest)
if err != nil { if err != nil {
return nil, err return nil, err
@ -361,7 +359,7 @@ func GetLayerWithBufferFromLayer(layer *Layer) (*LayerWithBuffer, error) {
return newLayer, nil return newLayer, nil
} }
func paramsToReader(params map[string]string) (io.Reader, error) { func paramsToReader(params map[string]string) (io.ReadSeeker, error) {
opts := api.DefaultOptions() opts := api.DefaultOptions()
typeOpts := reflect.TypeOf(opts) typeOpts := reflect.TypeOf(opts)
@ -419,7 +417,7 @@ func paramsToReader(params map[string]string) (io.Reader, error) {
return bytes.NewReader(bts), nil return bytes.NewReader(bts), nil
} }
func getLayerDigests(layers []*LayerWithBuffer) ([]string, error) { func getLayerDigests(layers []*LayerReader) ([]string, error) {
var digests []string var digests []string
for _, l := range layers { for _, l := range layers {
if l.Digest == "" { if l.Digest == "" {
@ -431,22 +429,17 @@ func getLayerDigests(layers []*LayerWithBuffer) ([]string, error) {
} }
// CreateLayer creates a Layer object from a given file // CreateLayer creates a Layer object from a given file
func CreateLayer(f io.Reader) (*LayerWithBuffer, error) { func CreateLayer(f io.ReadSeeker) (*LayerReader, error) {
buf := new(bytes.Buffer) digest, size := GetSHA256Digest(f)
_, err := io.Copy(buf, f) f.Seek(0, 0)
if err != nil {
return nil, err
}
digest, size := GetSHA256Digest(buf) layer := &LayerReader{
layer := &LayerWithBuffer{
Layer: Layer{ Layer: Layer{
MediaType: "application/vnd.docker.image.rootfs.diff.tar", MediaType: "application/vnd.docker.image.rootfs.diff.tar",
Digest: digest, Digest: digest,
Size: size, Size: size,
}, },
Buffer: buf, Reader: f,
} }
return layer, nil return layer, nil
@ -609,7 +602,7 @@ func pullModelManifest(mp ModelPath, username, password string) (*ManifestV2, er
return m, err return m, err
} }
func createConfigLayer(layers []string) (*LayerWithBuffer, error) { func createConfigLayer(layers []string) (*LayerReader, error) {
// TODO change architecture and OS // TODO change architecture and OS
config := ConfigV2{ config := ConfigV2{
Architecture: "arm64", Architecture: "arm64",
@ -628,22 +621,26 @@ func createConfigLayer(layers []string) (*LayerWithBuffer, error) {
buf := bytes.NewBuffer(configJSON) buf := bytes.NewBuffer(configJSON)
digest, size := GetSHA256Digest(buf) digest, size := GetSHA256Digest(buf)
layer := &LayerWithBuffer{ layer := &LayerReader{
Layer: Layer{ Layer: Layer{
MediaType: "application/vnd.docker.container.image.v1+json", MediaType: "application/vnd.docker.container.image.v1+json",
Digest: digest, Digest: digest,
Size: size, Size: size,
}, },
Buffer: buf, Reader: buf,
} }
return layer, nil return layer, nil
} }
// GetSHA256Digest returns the SHA256 hash of a given buffer and returns it, and the size of buffer // GetSHA256Digest returns the SHA256 hash of a given buffer and returns it, and the size of buffer
func GetSHA256Digest(data *bytes.Buffer) (string, int) { func GetSHA256Digest(r io.Reader) (string, int) {
layerBytes := data.Bytes() h := sha256.New()
hash := sha256.Sum256(layerBytes) n, err := io.Copy(h, r)
return "sha256:" + hex.EncodeToString(hash[:]), len(layerBytes) if err != nil {
log.Fatal(err)
}
return fmt.Sprintf("sha256:%x", h.Sum(nil)), int(n)
} }
func startUpload(mp ModelPath, username string, password string) (string, error) { func startUpload(mp ModelPath, username string, password string) (string, error) {