Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 38 additions & 21 deletions pkg/inference/scheduling/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ const (
// being it is almost certainly greater than the number of models that most
// developers' systems will be able to load.
maximumRunnerSlots = 8
// runnerIdleTimeout is the maximum amount of time that a runner can sit
// idle (i.e. without any requests) before being terminated.
runnerIdleTimeout = 5 * time.Minute
// defaultRunnerIdleTimeout is the default maximum amount of time that a
// runner can sit idle (i.e. without any requests) before being terminated.
defaultRunnerIdleTimeout = 5 * time.Minute
)

var (
Expand Down Expand Up @@ -58,6 +58,8 @@ type loader struct {
backends map[string]inference.Backend
// modelManager is the shared model manager.
modelManager *models.Manager
// runnerIdleTimeout is the loader-specific default runner idle timeout.
runnerIdleTimeout time.Duration
// totalMemory is the total system memory allocated to the loader.
totalMemory uint64
// idleCheck is used to signal the run loop when timestamps have updated.
Expand Down Expand Up @@ -103,6 +105,19 @@ func newLoader(
// tune this heuristic for systems with enormous amounts of VRAM.
nSlots := min(runtime.NumCPU(), maximumRunnerSlots)

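// Check whether we're running in a GPU-enabled cloud environment, i.e. the
// cloud environment with NVIDIA_VISIBLE_DEVICES set to a non-empty value.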
isGPUEnabledCloudEnvironment := environment.Get() == environment.EnvironmentCloud &&
os.Getenv("NVIDIA_VISIBLE_DEVICES") != ""

// Compute the idle runner timeout.
//
// HACK: On GPU-enabled cloud engines, we'll bump this to 8 hours. We can
// remove this once we have configurable TTLs.
runnerIdleTimeout := defaultRunnerIdleTimeout
if isGPUEnabledCloudEnvironment {
runnerIdleTimeout = 8 * time.Hour
}

// Compute the amount of available memory.
//
// TODO: For now, we treat the system as having memory size 1 and all models
Expand All @@ -113,28 +128,30 @@ func newLoader(
// computing model size through estimation (using parameter count and
// quantization data type size).
//
// HACK: On GPU-enabled cloud engines, we'll temporarily bump this to 2.
// HACK: On GPU-enabled cloud engines, we'll bump this to 2. We can remove
// this once we have VRAM estimation.
totalMemory := uint64(1)
if environment.Get() == environment.EnvironmentCloud && os.Getenv("NVIDIA_VISIBLE_DEVICES") != "" {
if isGPUEnabledCloudEnvironment {
totalMemory = 2
}

// Create the loader.
l := &loader{
log: log,
backends: backends,
modelManager: modelManager,
totalMemory: totalMemory,
idleCheck: make(chan struct{}, 1),
guard: make(chan struct{}, 1),
availableMemory: totalMemory,
waiters: make(map[chan<- struct{}]bool),
runners: make(map[runnerKey]int, nSlots),
slots: make([]*runner, nSlots),
references: make([]uint, nSlots),
allocations: make([]uint64, nSlots),
timestamps: make([]time.Time, nSlots),
runnerConfigs: make(map[runnerKey]inference.BackendConfiguration),
log: log,
backends: backends,
modelManager: modelManager,
runnerIdleTimeout: runnerIdleTimeout,
totalMemory: totalMemory,
idleCheck: make(chan struct{}, 1),
guard: make(chan struct{}, 1),
availableMemory: totalMemory,
waiters: make(map[chan<- struct{}]bool),
runners: make(map[runnerKey]int, nSlots),
slots: make([]*runner, nSlots),
references: make([]uint, nSlots),
allocations: make([]uint64, nSlots),
timestamps: make([]time.Time, nSlots),
runnerConfigs: make(map[runnerKey]inference.BackendConfiguration),
}
l.guard <- struct{}{}
return l
Expand Down Expand Up @@ -175,7 +192,7 @@ func (l *loader) evict(idleOnly bool) int {
now := time.Now()
for r, slot := range l.runners {
unused := l.references[slot] == 0
idle := unused && now.Sub(l.timestamps[slot]) > runnerIdleTimeout
idle := unused && now.Sub(l.timestamps[slot]) > l.runnerIdleTimeout
defunct := false
select {
case <-l.slots[slot].done:
Expand Down Expand Up @@ -282,7 +299,7 @@ func (l *loader) idleCheckDuration() time.Duration {
// Compute the remaining duration. If negative, check immediately, otherwise
// wait until 100 milliseconds after expiration time (to avoid checking
// right on the expiration boundary).
if remaining := runnerIdleTimeout - time.Since(oldest); remaining < 0 {
if remaining := l.runnerIdleTimeout - time.Since(oldest); remaining < 0 {
return 0
} else {
return remaining + 100*time.Millisecond
Expand Down