Move ggml loading to when we attempt fitting

This commit is contained in:
Bryce Reitano 2024-04-24 17:17:24 -06:00
parent ade4b55520
commit 284e02bed0
2 changed files with 37 additions and 28 deletions

View file

@ -23,7 +23,6 @@ import (
type LlmRequest struct { type LlmRequest struct {
ctx context.Context //nolint:containedctx ctx context.Context //nolint:containedctx
model *Model model *Model
ggml *llm.GGML // TODO - how large is this, and do we need to free it after we've finished loading?
opts api.Options opts api.Options
sessionDuration time.Duration sessionDuration time.Duration
successCh chan *runnerRef successCh chan *runnerRef
@ -39,7 +38,7 @@ type Scheduler struct {
loaded map[string]*runnerRef loaded map[string]*runnerRef
loadedMu sync.Mutex loadedMu sync.Mutex
loadFn func(req *LlmRequest, gpus gpu.GpuInfoList) loadFn func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
newServerFn func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) newServerFn func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
getGpuFn func() gpu.GpuInfoList getGpuFn func() gpu.GpuInfoList
} }
@ -74,20 +73,14 @@ func InitScheduler(ctx context.Context) *Scheduler {
// context must be canceled to decrement ref count and release the runner // context must be canceled to decrement ref count and release the runner
func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration time.Duration) (chan *runnerRef, chan error) { func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration time.Duration) (chan *runnerRef, chan error) {
ggml, err := llm.LoadModel(model.ModelPath)
req := &LlmRequest{ req := &LlmRequest{
ctx: c, ctx: c,
model: model, model: model,
ggml: ggml,
opts: opts, opts: opts,
sessionDuration: sessionDuration, sessionDuration: sessionDuration,
successCh: make(chan *runnerRef), successCh: make(chan *runnerRef),
errCh: make(chan error, 1), errCh: make(chan error, 1),
} }
if err != nil {
req.errCh <- err
return req.successCh, req.errCh
}
select { select {
case s.pendingReqCh <- req: case s.pendingReqCh <- req:
default: default:
@ -133,11 +126,17 @@ func (s *Scheduler) processPending(ctx context.Context) {
} else if loadedCount == 0 { } else if loadedCount == 0 {
slog.Debug("loading first model", "model", pending.model.ModelPath) slog.Debug("loading first model", "model", pending.model.ModelPath)
gpus := s.getGpuFn() gpus := s.getGpuFn()
g := pickBestFitGPUs(pending, gpus)
ggml, err := llm.LoadModel(pending.model.ModelPath)
if err != nil {
pending.errCh <- err
break
}
g := pickBestFitGPUs(pending, ggml, gpus)
if g != nil { if g != nil {
gpus = g gpus = g
} }
s.loadFn(pending, gpus) s.loadFn(pending, ggml, gpus)
break break
} else if loadedMax > 0 && loadedCount >= loadedMax { } else if loadedMax > 0 && loadedCount >= loadedMax {
slog.Debug("max runners achieved, unloading one to make room", "runner_count", loadedCount) slog.Debug("max runners achieved, unloading one to make room", "runner_count", loadedCount)
@ -148,10 +147,16 @@ func (s *Scheduler) processPending(ctx context.Context) {
gpus := s.getGpuFn() gpus := s.getGpuFn()
// Update free memory from currently loaded models // Update free memory from currently loaded models
s.updateFreeSpace(gpus) s.updateFreeSpace(gpus)
gpus = pickBestFitGPUs(pending, gpus)
ggml, err := llm.LoadModel(pending.model.ModelPath)
if err != nil {
pending.errCh <- err
break
}
gpus = pickBestFitGPUs(pending, ggml, gpus)
if gpus != nil { if gpus != nil {
slog.Debug("new model fits with existing models, loading") slog.Debug("new model fits with existing models, loading")
s.loadFn(pending, gpus) s.loadFn(pending, ggml, gpus)
break break
} }
runnerToExpire = s.findRunnerToUnload(pending) runnerToExpire = s.findRunnerToUnload(pending)
@ -282,8 +287,8 @@ func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *Llm
}() }()
} }
func (s *Scheduler) load(req *LlmRequest, gpus gpu.GpuInfoList) { func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) {
llama, err := s.newServerFn(gpus, req.model.ModelPath, req.ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts) llama, err := s.newServerFn(gpus, req.model.ModelPath, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts)
if err != nil { if err != nil {
// some older models are not compatible with newer versions of llama.cpp // some older models are not compatible with newer versions of llama.cpp
// show a generalized compatibility error until there is a better way to // show a generalized compatibility error until there is a better way to
@ -454,7 +459,7 @@ func (a ByDuration) Less(i, j int) bool {
// pickBestFitGPUs will try to find the optimal placement of the model in the available GPUs where the model fully fits // pickBestFitGPUs will try to find the optimal placement of the model in the available GPUs where the model fully fits
// If the model can not be fit fully within the available GPU(s) nil is returned // If the model can not be fit fully within the available GPU(s) nil is returned
func pickBestFitGPUs(req *LlmRequest, gpus gpu.GpuInfoList) gpu.GpuInfoList { func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu.GpuInfoList {
var estimatedVRAM uint64 var estimatedVRAM uint64
for _, gl := range gpus.ByLibrary() { for _, gl := range gpus.ByLibrary() {
var ok bool var ok bool
@ -466,7 +471,7 @@ func pickBestFitGPUs(req *LlmRequest, gpus gpu.GpuInfoList) gpu.GpuInfoList {
// First attempt to fit the model into a single GPU // First attempt to fit the model into a single GPU
for _, g := range sgl { for _, g := range sgl {
if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, req.ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok { if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
slog.Debug("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM)) slog.Debug("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM))
return []gpu.GpuInfo{g} return []gpu.GpuInfo{g}
} }
@ -477,7 +482,7 @@ func pickBestFitGPUs(req *LlmRequest, gpus gpu.GpuInfoList) gpu.GpuInfoList {
// - try subsets of GPUs instead of just falling back to 1 or all in a family // - try subsets of GPUs instead of just falling back to 1 or all in a family
// Now try all the GPUs // Now try all the GPUs
if ok, estimatedVRAM = llm.PredictServerFit(gl, req.ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok { if ok, estimatedVRAM = llm.PredictServerFit(gl, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
slog.Debug("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", gl[0].Library, "required", format.HumanBytes2(estimatedVRAM)) slog.Debug("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", gl[0].Library, "required", format.HumanBytes2(estimatedVRAM))
return gl return gl
} }

View file

@ -59,7 +59,7 @@ func TestLoad(t *testing.T) {
return nil, fmt.Errorf("something failed to load model blah") return nil, fmt.Errorf("something failed to load model blah")
} }
gpus := gpu.GpuInfoList{} gpus := gpu.GpuInfoList{}
s.load(req, gpus) s.load(req, nil, gpus)
require.Len(t, req.successCh, 0) require.Len(t, req.successCh, 0)
require.Len(t, req.errCh, 1) require.Len(t, req.errCh, 1)
require.Len(t, s.loaded, 0) require.Len(t, s.loaded, 0)
@ -70,7 +70,7 @@ func TestLoad(t *testing.T) {
s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) { s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
return server, nil return server, nil
} }
s.load(req, gpus) s.load(req, nil, gpus)
select { select {
case err := <-req.errCh: case err := <-req.errCh:
require.NoError(t, err) require.NoError(t, err)
@ -82,7 +82,7 @@ func TestLoad(t *testing.T) {
req.model.ModelPath = "dummy_model_path" req.model.ModelPath = "dummy_model_path"
server.waitResp = fmt.Errorf("wait failure") server.waitResp = fmt.Errorf("wait failure")
s.load(req, gpus) s.load(req, nil, gpus)
select { select {
case err := <-req.errCh: case err := <-req.errCh:
require.Contains(t, err.Error(), "wait failure") require.Contains(t, err.Error(), "wait failure")
@ -101,6 +101,7 @@ type bundle struct {
ctxDone func() ctxDone func()
srv *mockLlm srv *mockLlm
req *LlmRequest req *LlmRequest
ggml *llm.GGML
} }
func (scenario *bundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) { func (scenario *bundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
@ -132,14 +133,15 @@ func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedV
{Name: "blk.0.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}}, {Name: "blk.0.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}},
}) })
assert.Nil(t, err) assert.Nil(t, err)
fname := f.Name() fname := f.Name()
model := &Model{Name: modelName, ModelPath: fname} model := &Model{Name: modelName, ModelPath: fname}
ggml, err := llm.LoadModel(model.ModelPath) scenario.ggml, err = llm.LoadModel(model.ModelPath)
require.NoError(t, err) require.NoError(t, err)
scenario.req = &LlmRequest{ scenario.req = &LlmRequest{
ctx: scenario.ctx, ctx: scenario.ctx,
model: model, model: model,
ggml: ggml,
sessionDuration: 5 * time.Millisecond, sessionDuration: 5 * time.Millisecond,
successCh: make(chan *runnerRef, 1), successCh: make(chan *runnerRef, 1),
errCh: make(chan error, 1), errCh: make(chan error, 1),
@ -157,13 +159,13 @@ func TestRequests(t *testing.T) {
scenario1a.req.sessionDuration = 0 scenario1a.req.sessionDuration = 0
scenario1b := newScenario(t, ctx, "ollama-model-1", 11) scenario1b := newScenario(t, ctx, "ollama-model-1", 11)
scenario1b.req.model = scenario1a.req.model scenario1b.req.model = scenario1a.req.model
scenario1b.req.ggml = scenario1a.req.ggml scenario1b.ggml = scenario1a.ggml
scenario1b.req.sessionDuration = 0 scenario1b.req.sessionDuration = 0
// simple reload of same model // simple reload of same model
scenario2a := newScenario(t, ctx, "ollama-model-1", 20) scenario2a := newScenario(t, ctx, "ollama-model-1", 20)
scenario2a.req.model = scenario1a.req.model scenario2a.req.model = scenario1a.req.model
scenario2a.req.ggml = scenario1a.req.ggml scenario2a.ggml = scenario1a.ggml
// Multiple loaded models // Multiple loaded models
scenario3a := newScenario(t, ctx, "ollama-model-3a", 1*format.GigaByte) scenario3a := newScenario(t, ctx, "ollama-model-3a", 1*format.GigaByte)
@ -322,13 +324,14 @@ func TestGetRunner(t *testing.T) {
successCh1c, errCh1c := s.GetRunner(scenario1c.ctx, scenario1c.req.model, scenario1c.req.opts, scenario1c.req.sessionDuration) successCh1c, errCh1c := s.GetRunner(scenario1c.ctx, scenario1c.req.model, scenario1c.req.opts, scenario1c.req.sessionDuration)
require.Len(t, s.pendingReqCh, 0) require.Len(t, s.pendingReqCh, 0)
require.Len(t, successCh1c, 0) require.Len(t, successCh1c, 0)
require.Len(t, errCh1c, 0)
time.Sleep(5 * time.Millisecond)
require.Len(t, s.loaded, 0)
require.Len(t, errCh1c, 1) require.Len(t, errCh1c, 1)
err = <-errCh1c err = <-errCh1c
require.Contains(t, err.Error(), "bad path") require.Contains(t, err.Error(), "bad path")
scenario1b.ctxDone() scenario1b.ctxDone()
time.Sleep(5 * time.Millisecond)
require.Len(t, s.loaded, 0)
} }
// TODO - add one scenario that triggers the bogus finished event with positive ref count // TODO - add one scenario that triggers the bogus finished event with positive ref count
@ -366,7 +369,9 @@ func TestPrematureExpired(t *testing.T) {
require.LessOrEqual(t, len(s.finishedReqCh), 1) require.LessOrEqual(t, len(s.finishedReqCh), 1)
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
require.Len(t, s.finishedReqCh, 0) require.Len(t, s.finishedReqCh, 0)
s.loadedMu.Lock()
require.Len(t, s.loaded, 0) require.Len(t, s.loaded, 0)
s.loadedMu.Unlock()
// also shouldn't happen in real life // also shouldn't happen in real life
s.finishedReqCh <- scenario1a.req s.finishedReqCh <- scenario1a.req
@ -426,7 +431,6 @@ func TestUpdateFreeSpace(t *testing.T) {
s.updateFreeSpace(gpus) s.updateFreeSpace(gpus)
require.Equal(t, uint64(850), gpus[0].FreeMemory) require.Equal(t, uint64(850), gpus[0].FreeMemory)
require.Equal(t, uint64(1850), gpus[1].FreeMemory) require.Equal(t, uint64(1850), gpus[1].FreeMemory)
} }
func TestFindRunnerToUnload(t *testing.T) { func TestFindRunnerToUnload(t *testing.T) {