Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/cpp/include/lemon/model_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ class ModelManager {

// Cache of all models with their download status
mutable std::mutex models_cache_mutex_;
mutable std::mutex recipe_options_mutex_;
mutable std::map<std::string, ModelInfo> models_cache_;
mutable std::map<std::string, std::string> public_model_aliases_; // public name -> canonical name
mutable std::map<std::string, std::string> canonical_public_names_; // canonical name -> public name
Expand Down
1 change: 1 addition & 0 deletions src/cpp/include/lemon/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class Router {
void evict_if_committed(const std::string& model_name);
std::unique_ptr<WrappedServer> create_backend_server(const ModelInfo& model_info);
std::string resolve_model_name(const std::string& model_name) const;
void persist_pinned_state(const std::string& model_name, bool pinned);
ModelTelemetryIdentity get_telemetry_identity(WrappedServer* server) const;
void record_telemetry_for_model(const ModelTelemetryIdentity& identity,
int input_tokens,
Expand Down
6 changes: 6 additions & 0 deletions src/cpp/include/lemon/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class Server {
void handle_shutdown(const httplib::Request& req, httplib::Response& res);
void handle_simulate_vram_pressure(const httplib::Request& req, httplib::Response& res);

void load_pinned_models_async();

// Backend management endpoint handlers
void handle_install(const httplib::Request& req, httplib::Response& res);
void handle_uninstall(const httplib::Request& req, httplib::Response& res);
Expand Down Expand Up @@ -249,6 +251,10 @@ class Server {
std::thread http_v4_thread_;
std::thread http_v6_thread_;
std::thread model_cache_warmup_thread_;
std::thread pinned_models_autoload_thread_;

std::mutex shutdown_mutex_;
std::condition_variable shutdown_cv_;


// Routed servers (all routes/handlers; never listen) and the main-port
Expand Down
21 changes: 15 additions & 6 deletions src/cpp/server/model_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1908,9 +1908,12 @@ void ModelManager::save_model_options(const ModelInfo& info) {
LOG(INFO, "ModelManager") << "Saving options for model: " << info.model_name << std::endl;
// Persist under canonical ID (built-ins are keyed bare in cache but
// recipe_options.json stores them as builtin.<name>).
recipe_options_[cache_key_to_canonical_id(info.model_name)] = info.recipe_options.to_json();
{
std::lock_guard<std::mutex> lock(recipe_options_mutex_);
recipe_options_[cache_key_to_canonical_id(info.model_name)] = info.recipe_options.to_json();
save_user_json(get_recipe_options_file(), recipe_options_);
}
update_model_options_in_cache(info);
save_user_json(get_recipe_options_file(), recipe_options_);
}

std::map<std::string, ModelInfo> ModelManager::get_supported_models() {
Expand Down Expand Up @@ -2237,9 +2240,12 @@ void ModelManager::build_cache() {
// Populate recipe options. recipe_options.json is keyed by canonical ID
// (user.*, extra.*, builtin.*) — built-ins are keyed bare in the cache, so
// we translate before lookup.
for (auto& [name, info] : all_models) {
json jro = json_recipe_options.count(name) ? json_recipe_options[name] : json(nullptr);
info.recipe_options = build_recipe_options(info, jro, cache_key_to_canonical_id(name), recipe_options_);
{
std::lock_guard<std::mutex> lock(recipe_options_mutex_);
for (auto& [name, info] : all_models) {
json jro = json_recipe_options.count(name) ? json_recipe_options[name] : json(nullptr);
info.recipe_options = build_recipe_options(info, jro, cache_key_to_canonical_id(name), recipe_options_);
}
}

// Step 2: Filter by backend availability
Expand Down Expand Up @@ -2329,7 +2335,10 @@ void ModelManager::add_model_to_cache(const std::string& model_name) {
parse_image_defaults(info, *model_json);
json jro = (model_json->contains("recipe_options") && (*model_json)["recipe_options"].is_object())
? (*model_json)["recipe_options"] : json(nullptr);
info.recipe_options = build_recipe_options(info, jro, cache_key_to_canonical_id(model_name), recipe_options_);
{
std::lock_guard<std::mutex> lock(recipe_options_mutex_);
info.recipe_options = build_recipe_options(info, jro, cache_key_to_canonical_id(model_name), recipe_options_);
}

info.suggested = JsonUtils::get_or_default<bool>(*model_json, "suggested", is_user_model);
info.hf_load = JsonUtils::get_or_default<bool>(*model_json, "hf_load", false);
Expand Down
34 changes: 34 additions & 0 deletions src/cpp/server/router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ void Router::evict_server(WrappedServer* server, int timeout_seconds) {
if (!server) return;

std::string model_name = server->get_model_name();
if (server->is_pinned()) {
LOG(WARNING, "Router") << "Evicting pinned model: " << model_name
<< " (due to NPU exclusivity or capacity limits)" << std::endl;
}
LOG(INFO, "Router") << "Evicting model: " << model_name << std::endl;

// Wait for any ongoing inference to complete. For watchdog-reset/dead
Expand Down Expand Up @@ -416,6 +420,9 @@ void Router::load_model(const std::string& model_name,
existing->update_access_time();
is_loading_ = false;
load_cv_.notify_all();
if (pinned.has_value()) {
persist_pinned_state(canonical_model_name, pinned.value());
}
return;
}
}
Expand Down Expand Up @@ -543,6 +550,10 @@ void Router::load_model(const std::string& model_name,

LOG(INFO, "Router") << "Model loaded successfully. Total loaded: "
<< loaded_servers_.size() << std::endl;

if (pinned.has_value()) {
persist_pinned_state(canonical_model_name, pinned.value());
}
} else {
// ERROR HANDLING (from spec: Error Handling section)
// Check if error is "file not found" (exception to nuclear policy)
Expand Down Expand Up @@ -590,6 +601,10 @@ void Router::load_model(const std::string& model_name,
load_cv_.notify_all();

LOG(DEBUG, "Router") << "Retry successful in " << retry_server->get_load_duration_ms() << "ms!" << std::endl;

if (pinned.has_value()) {
persist_pinned_state(canonical_model_name, pinned.value());
}
} catch (const std::exception& retry_error) {
lock.lock();
is_loading_ = false;
Expand Down Expand Up @@ -1475,6 +1490,25 @@ void Router::set_model_pinned(const std::string& model_name, bool pinned) {
throw std::runtime_error("Model not loaded: " + model_name);
}
server->set_pinned(pinned);
persist_pinned_state(model_name, pinned);
}

void Router::persist_pinned_state(const std::string& model_name, bool pinned) {
try {
ModelInfo info = model_manager_->get_model_info(model_name);
bool already_pinned = false;
json p_opt = info.recipe_options.get_option("pinned");
if (p_opt.is_boolean()) {
already_pinned = p_opt.get<bool>();
}
if (pinned == already_pinned) {
return; // Avoid redundant write
}
info.recipe_options.set_option("pinned", pinned);
model_manager_->save_model_options(info);
} catch (const std::exception& e) {
LOG(WARNING, "Router") << "Failed to persist pinned state for model " << model_name << ": " << e.what() << std::endl;
}
}

} // namespace lemon
67 changes: 66 additions & 1 deletion src/cpp/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1336,6 +1336,9 @@ void Server::run() {
<< "or hostname that resolves to RFC1918 IPv4." << std::endl;
}

// Load pinned models gracefully in the background on boot
load_pinned_models_async();

// Wait for listener threads, but check periodically for shutdown or rebind signals.
// The threads are blocked in listen_after_bind(), which only returns when
// the server is stopped or an error occurs.
Expand Down Expand Up @@ -1410,7 +1413,11 @@ bool Server::should_shutdown() const {
}

void Server::set_shutdown_requested(bool requested) {
shutdown_requested_.store(requested);
{
std::lock_guard<std::mutex> lock(shutdown_mutex_);
shutdown_requested_.store(requested);
}
shutdown_cv_.notify_all();
}

bool Server::is_running() const {
Expand All @@ -1423,6 +1430,17 @@ void Server::stop() {
udp_beacon_.stopBroadcasting();
stop_http_listeners();
running_ = false;

{
std::lock_guard<std::mutex> lock(shutdown_mutex_);
shutdown_requested_.store(true);
}
shutdown_cv_.notify_all();

if (pinned_models_autoload_thread_.joinable()) {
pinned_models_autoload_thread_.join();
}

shutdown_requested_ = false; // Reset for potential future use

// Stop WebSocket server
Expand Down Expand Up @@ -5304,4 +5322,51 @@ void Server::enrich_recipes(json& recipes) {
}
}

void Server::load_pinned_models_async() {
if (pinned_models_autoload_thread_.joinable()) {
return;
}
pinned_models_autoload_thread_ = std::thread([this]() {
try {
// Brief interruptible sleep to defer log interleaving, allowing startup logs and
// listener addresses to print fully before background model loading output begins.
{
std::unique_lock<std::mutex> lock(shutdown_mutex_);
if (shutdown_cv_.wait_for(lock, std::chrono::milliseconds(100), [this]() { return shutdown_requested_.load(); })) {
return;
}
}

LOG(INFO, "Server") << "Checking for pinned models to auto-load..." << std::endl;
auto models = model_manager_->get_supported_models();
for (const auto& [name, info] : models) {
if (shutdown_requested_.load()) {
break;
}

bool is_pinned = info.recipe_options.get_option("pinned").is_boolean() &&
info.recipe_options.get_option("pinned").get<bool>();

if (is_pinned) {
LOG(INFO, "Server") << "Auto-loading pinned model: " << name << std::endl;
try {
router_->load_model(name, info, info.recipe_options, true, false, true);
LOG(INFO, "Server") << "Successfully auto-loaded pinned model: " << name << std::endl;
} catch (const std::exception& e) {
LOG(ERROR, "Server") << "Failed to auto-load pinned model '" << name << "': " << e.what() << std::endl;
}
}
}
} catch (const std::exception& e) {
LOG(WARNING, "Server") << "Error during pinned models auto-load: " << e.what() << std::endl;
}

// KEEP THREAD ALIVE to prevent Linux PR_SET_PDEATHSIG from killing child processes.
// On Linux, the parent task is the specific thread that spawned the child.
// If this thread exits, the kernel sends SIGTERM to the spawned llama-servers.
std::unique_lock<std::mutex> lock(shutdown_mutex_);
shutdown_cv_.wait(lock, [this]() { return shutdown_requested_.load(); });
});
}

} // namespace lemon
Loading
Loading