Merge pull request #3895 from brycereitano/shiftloading
Move ggml loading to when attempting to fit
Commit f503a848c2 · 2 changed files with 45 additions and 36 deletions
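The hunks below appear to come from the scheduler and its tests (server/sched.go and server/sched_test.go in the ollama tree; the file names are inferred from the hunk context, since the extraction dropped the per-file headers). The change moves the llm.LoadModel call out of GetRunner and into processPending: the GGML metadata is parsed only when the scheduler actually tries to fit the model onto GPUs, and it is passed explicitly to pickBestFitGPUs and load instead of riding along on LlmRequest. One caller-visible consequence is that an unreadable model file no longer fails synchronously inside GetRunner; the error now arrives on the request's error channel once processPending attempts the load. A minimal sketch of how a caller would consume the two channels after this change (handleRequest and the package/import lines are illustrative assumptions, not part of the diff):

package server

import (
	"context"
	"time"

	"github.com/ollama/ollama/api"
)

// handleRequest is a hypothetical caller, not part of this PR. It shows that
// GetRunner still returns immediately with two channels, and that after this
// change a model file that cannot be read surfaces on errCh asynchronously
// (once the scheduler tries to fit it) instead of failing inside GetRunner.
func handleRequest(ctx context.Context, s *Scheduler, m *Model, opts api.Options) error {
	successCh, errCh := s.GetRunner(ctx, m, opts, 5*time.Minute)
	select {
	case runner := <-successCh:
		_ = runner // hand the loaded runner to the completion/embedding path
		return nil
	case err := <-errCh:
		return err // includes llm.LoadModel failures after this PR
	case <-ctx.Done():
		return ctx.Err()
	}
}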
server/sched.go

@@ -23,7 +23,6 @@ import (
 type LlmRequest struct {
 	ctx             context.Context //nolint:containedctx
 	model           *Model
-	ggml            *llm.GGML // TODO - how large is this, and do we need to free it after we've finished loading?
 	opts            api.Options
 	sessionDuration time.Duration
 	successCh       chan *runnerRef
@@ -39,7 +38,7 @@ type Scheduler struct {
 	loaded   map[string]*runnerRef
 	loadedMu sync.Mutex

-	loadFn      func(req *LlmRequest, gpus gpu.GpuInfoList)
+	loadFn      func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
 	newServerFn func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
 	getGpuFn    func() gpu.GpuInfoList
 }
@@ -74,20 +73,14 @@ func InitScheduler(ctx context.Context) *Scheduler {

 // context must be canceled to decrement ref count and release the runner
 func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration time.Duration) (chan *runnerRef, chan error) {
-	ggml, err := llm.LoadModel(model.ModelPath)
 	req := &LlmRequest{
 		ctx:             c,
 		model:           model,
-		ggml:            ggml,
 		opts:            opts,
 		sessionDuration: sessionDuration,
 		successCh:       make(chan *runnerRef),
 		errCh:           make(chan error, 1),
 	}
-	if err != nil {
-		req.errCh <- err
-		return req.successCh, req.errCh
-	}
 	select {
 	case s.pendingReqCh <- req:
 	default:
@@ -130,28 +123,39 @@ func (s *Scheduler) processPending(ctx context.Context) {
 					pending.useLoadedRunner(runner, s.finishedReqCh)
 					break
 				}
-			} else if loadedCount == 0 {
-				slog.Debug("loading first model", "model", pending.model.ModelPath)
-				gpus := s.getGpuFn()
-				g := pickBestFitGPUs(pending, gpus)
-				if g != nil {
-					gpus = g
-				}
-				s.loadFn(pending, gpus)
-				break
 			} else if loadedMax > 0 && loadedCount >= loadedMax {
 				slog.Debug("max runners achieved, unloading one to make room", "runner_count", loadedCount)
 				runnerToExpire = s.findRunnerToUnload(pending)
 			} else {
-				// More than one loaded model, so we have to see if the new one fits
+				// Either no models are loaded or below loadedMax
 				// Get a refreshed GPU list
 				gpus := s.getGpuFn()

+				// Load model for fitting
+				ggml, err := llm.LoadModel(pending.model.ModelPath)
+				if err != nil {
+					pending.errCh <- err
+					break
+				}
+
+				// No models loaded. Load the model but prefer the best fit.
+				if loadedCount == 0 {
+					slog.Debug("loading first model", "model", pending.model.ModelPath)
+					g := pickBestFitGPUs(pending, ggml, gpus)
+					if g != nil {
+						gpus = g
+					}
+					s.loadFn(pending, ggml, gpus)
+					break
+				}
+
+				// More than one loaded model, so we have to see if the new one fits
 				// Update free memory from currently loaded models
 				s.updateFreeSpace(gpus)
-				gpus = pickBestFitGPUs(pending, gpus)
+				gpus = pickBestFitGPUs(pending, ggml, gpus)
 				if gpus != nil {
 					slog.Debug("new model fits with existing models, loading")
-					s.loadFn(pending, gpus)
+					s.loadFn(pending, ggml, gpus)
 					break
 				}
 				runnerToExpire = s.findRunnerToUnload(pending)
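Condensing the hunk above: the dedicated loadedCount == 0 branch is folded into the final else branch, so the GGML is parsed exactly once per scheduling pass and both fitting decisions see the same parsed metadata; a LoadModel failure is reported to the waiting request instead of being discovered earlier in GetRunner. A standalone sketch of that "parse once, then fit" ordering follows; scheduleOnce is a hypothetical helper name (same in-package assumptions as the earlier sketch), not the scheduler code itself:

// scheduleOnce shows the ordering the new else branch follows: refresh GPUs,
// parse the GGML once, then either place the first model on the best fit or
// check whether it fits next to the models that are already loaded.
func scheduleOnce(s *Scheduler, pending *LlmRequest, loadedCount int) bool {
	gpus := s.getGpuFn()

	ggml, err := llm.LoadModel(pending.model.ModelPath) // now loaded here, not in GetRunner
	if err != nil {
		pending.errCh <- err // the waiting GetRunner caller sees the failure on errCh
		return true
	}

	if loadedCount == 0 {
		// First model: prefer the single best-fitting GPU set, else use them all.
		if g := pickBestFitGPUs(pending, ggml, gpus); g != nil {
			gpus = g
		}
		s.loadFn(pending, ggml, gpus)
		return true
	}

	// Otherwise see whether the new model fits alongside the loaded ones.
	s.updateFreeSpace(gpus)
	if g := pickBestFitGPUs(pending, ggml, gpus); g != nil {
		s.loadFn(pending, ggml, g)
		return true
	}
	return false // caller must pick a runner to unload and retry
}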
@@ -282,8 +286,8 @@ func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *Llm
 	}()
 }

-func (s *Scheduler) load(req *LlmRequest, gpus gpu.GpuInfoList) {
-	llama, err := s.newServerFn(gpus, req.model.ModelPath, req.ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts)
+func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) {
+	llama, err := s.newServerFn(gpus, req.model.ModelPath, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts)
 	if err != nil {
 		// some older models are not compatible with newer versions of llama.cpp
 		// show a generalized compatibility error until there is a better way to
@@ -454,7 +458,7 @@ func (a ByDuration) Less(i, j int) bool {

 // pickBestFitGPUs will try to find the optimal placement of the model in the available GPUs where the model fully fits
 // If the model can not be fit fully within the available GPU(s) nil is returned
-func pickBestFitGPUs(req *LlmRequest, gpus gpu.GpuInfoList) gpu.GpuInfoList {
+func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu.GpuInfoList {
 	var estimatedVRAM uint64
 	for _, gl := range gpus.ByLibrary() {
 		var ok bool
@@ -466,7 +470,7 @@ func pickBestFitGPUs(req *LlmRequest, gpus gpu.GpuInfoList) gpu.GpuInfoList {

 		// First attempt to fit the model into a single GPU
 		for _, g := range sgl {
-			if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, req.ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
+			if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
 				slog.Debug("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM))
 				return []gpu.GpuInfo{g}
 			}
@@ -477,7 +481,7 @@ func pickBestFitGPUs(req *LlmRequest, gpus gpu.GpuInfoList) gpu.GpuInfoList {
 		// - try subsets of GPUs instead of just falling back to 1 or all in a family

 		// Now try all the GPUs
-		if ok, estimatedVRAM = llm.PredictServerFit(gl, req.ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
+		if ok, estimatedVRAM = llm.PredictServerFit(gl, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
 			slog.Debug("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", gl[0].Library, "required", format.HumanBytes2(estimatedVRAM))
 			return gl
 		}
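Taken together, these hunks replace the stored LlmRequest.ggml field with an explicit parameter threaded down the chain, so the parsed *llm.GGML lives only for the duration of a scheduling attempt instead of sitting on every queued request. A side-by-side summary of the reshaped signatures, copied from the diff and written as a throwaway var block purely for illustration (the *Sig names are not real declarations in the file):

var (
	// Scheduler.loadFn and the package-level pickBestFitGPUs now take the GGML explicitly:
	loadFnSig          func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
	pickBestFitGPUsSig func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu.GpuInfoList

	// newServerFn already received the GGML as an argument and keeps its shape:
	newServerFnSig func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
)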
server/sched_test.go

@@ -47,6 +47,7 @@ func TestLoad(t *testing.T) {
 	ctx, done := context.WithTimeout(context.Background(), 5*time.Millisecond)
 	defer done()
 	s := InitScheduler(ctx)
+	var ggml *llm.GGML // value not used in tests
 	req := &LlmRequest{
 		ctx:   ctx,
 		model: &Model{ModelPath: "foo"},
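TestLoad now declares a nil *llm.GGML and hands it straight to load; that is only safe because the test swaps newServerFn for stubs that never touch the argument, as the next hunks show. A sketch of the pattern as it sits inside the test function (the stub body is taken from the surrounding test context):

	var ggml *llm.GGML // nil is fine: the stub below ignores it
	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
		return nil, fmt.Errorf("something failed to load model blah")
	}
	s.load(req, ggml, gpus) // the nil GGML is never dereferenced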
@@ -59,7 +60,7 @@ func TestLoad(t *testing.T) {
 		return nil, fmt.Errorf("something failed to load model blah")
 	}
 	gpus := gpu.GpuInfoList{}
-	s.load(req, gpus)
+	s.load(req, ggml, gpus)
 	require.Len(t, req.successCh, 0)
 	require.Len(t, req.errCh, 1)
 	require.Len(t, s.loaded, 0)
@@ -70,7 +71,7 @@ func TestLoad(t *testing.T) {
 	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
 		return server, nil
 	}
-	s.load(req, gpus)
+	s.load(req, ggml, gpus)
 	select {
 	case err := <-req.errCh:
 		require.NoError(t, err)
@@ -82,7 +83,7 @@ func TestLoad(t *testing.T) {

 	req.model.ModelPath = "dummy_model_path"
 	server.waitResp = fmt.Errorf("wait failure")
-	s.load(req, gpus)
+	s.load(req, ggml, gpus)
 	select {
 	case err := <-req.errCh:
 		require.Contains(t, err.Error(), "wait failure")
@@ -101,6 +102,7 @@ type bundle struct {
 	ctxDone func()
 	srv     *mockLlm
 	req     *LlmRequest
+	ggml    *llm.GGML
 }

 func (scenario *bundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
@@ -132,14 +134,15 @@ func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedV
 		{Name: "blk.0.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}},
 	})
 	assert.Nil(t, err)

 	fname := f.Name()
 	model := &Model{Name: modelName, ModelPath: fname}
-	ggml, err := llm.LoadModel(model.ModelPath)
+	scenario.ggml, err = llm.LoadModel(model.ModelPath)
 	require.NoError(t, err)

 	scenario.req = &LlmRequest{
 		ctx:             scenario.ctx,
 		model:           model,
-		ggml:            ggml,
 		sessionDuration: 5 * time.Millisecond,
 		successCh:       make(chan *runnerRef, 1),
 		errCh:           make(chan error, 1),
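In the fixtures the parsed GGML moves from the request onto the bundle: newScenario now assigns scenario.ggml, and tests that drive Scheduler.load directly pass it as the new second argument. A hypothetical direct-load snippet under those assumptions (TestRequests itself goes through GetRunner, so this exact call does not appear in the diff):

	// Inside a test function: the bundle carries the parsed GGML, the request no longer does.
	scenario := newScenario(t, ctx, "ollama-model-1", 10)
	s.newServerFn = scenario.newServer
	s.load(scenario.req, scenario.ggml, gpu.GpuInfoList{})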
@@ -157,13 +160,13 @@ func TestRequests(t *testing.T) {
 	scenario1a.req.sessionDuration = 0
 	scenario1b := newScenario(t, ctx, "ollama-model-1", 11)
 	scenario1b.req.model = scenario1a.req.model
-	scenario1b.req.ggml = scenario1a.req.ggml
+	scenario1b.ggml = scenario1a.ggml
 	scenario1b.req.sessionDuration = 0

 	// simple reload of same model
 	scenario2a := newScenario(t, ctx, "ollama-model-1", 20)
 	scenario2a.req.model = scenario1a.req.model
-	scenario2a.req.ggml = scenario1a.req.ggml
+	scenario2a.ggml = scenario1a.ggml

 	// Multiple loaded models
 	scenario3a := newScenario(t, ctx, "ollama-model-3a", 1*format.GigaByte)
@@ -322,13 +325,14 @@ func TestGetRunner(t *testing.T) {
 	successCh1c, errCh1c := s.GetRunner(scenario1c.ctx, scenario1c.req.model, scenario1c.req.opts, scenario1c.req.sessionDuration)
 	require.Len(t, s.pendingReqCh, 0)
 	require.Len(t, successCh1c, 0)
+	require.Len(t, errCh1c, 0)
+
+	time.Sleep(5 * time.Millisecond)
+	require.Len(t, s.loaded, 0)
 	require.Len(t, errCh1c, 1)
 	err = <-errCh1c
 	require.Contains(t, err.Error(), "bad path")
 	scenario1b.ctxDone()
-
-	time.Sleep(5 * time.Millisecond)
-	require.Len(t, s.loaded, 0)
 }

 // TODO - add one scenario that triggers the bogus finished event with positive ref count
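The reshuffled assertions in TestGetRunner follow from the main change: with llm.LoadModel now called inside processPending, a request for a bad model path is still accepted by GetRunner (errCh1c is empty immediately after the call), and the error only lands on the channel once the scheduler has had a chance to run. Hence the added check that errCh1c starts empty, the 5 ms sleep, and the confirmation that nothing ended up in s.loaded before asserting on the delivered error.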
@@ -366,7 +370,9 @@ func TestPrematureExpired(t *testing.T) {
 	require.LessOrEqual(t, len(s.finishedReqCh), 1)
 	time.Sleep(10 * time.Millisecond)
 	require.Len(t, s.finishedReqCh, 0)
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 0)
+	s.loadedMu.Unlock()

 	// also shouldn't happen in real life
 	s.finishedReqCh <- scenario1a.req
@@ -426,7 +432,6 @@ func TestUpdateFreeSpace(t *testing.T) {
 	s.updateFreeSpace(gpus)
 	require.Equal(t, uint64(850), gpus[0].FreeMemory)
 	require.Equal(t, uint64(1850), gpus[1].FreeMemory)
-
 }

 func TestFindRunnerToUnload(t *testing.T) {