From 17a9b193eb190d73f6fd9a9b1cf2f42cd22439ac Mon Sep 17 00:00:00 2001 From: Arun Babu Neelicattu Date: Fri, 19 Jun 2026 01:45:38 +0200 Subject: [PATCH] feat(server): persist model pin status and auto-load pinned models on boot --- src/cpp/include/lemon/model_manager.h | 1 + src/cpp/include/lemon/router.h | 1 + src/cpp/include/lemon/server.h | 6 + src/cpp/server/model_manager.cpp | 21 ++- src/cpp/server/router.cpp | 34 +++++ src/cpp/server/server.cpp | 67 +++++++++- test/server_pinning.py | 184 ++++++++++++++++++++++++++ 7 files changed, 307 insertions(+), 7 deletions(-) diff --git a/src/cpp/include/lemon/model_manager.h b/src/cpp/include/lemon/model_manager.h index cdcf844dc..8d32f6e05 100644 --- a/src/cpp/include/lemon/model_manager.h +++ b/src/cpp/include/lemon/model_manager.h @@ -314,6 +314,7 @@ class ModelManager { // Cache of all models with their download status mutable std::mutex models_cache_mutex_; + mutable std::mutex recipe_options_mutex_; mutable std::map models_cache_; mutable std::map public_model_aliases_; // public name -> canonical name mutable std::map canonical_public_names_; // canonical name -> public name diff --git a/src/cpp/include/lemon/router.h b/src/cpp/include/lemon/router.h index e98a8b11d..e3f641838 100644 --- a/src/cpp/include/lemon/router.h +++ b/src/cpp/include/lemon/router.h @@ -177,6 +177,7 @@ class Router { void evict_if_committed(const std::string& model_name); std::unique_ptr create_backend_server(const ModelInfo& model_info); std::string resolve_model_name(const std::string& model_name) const; + void persist_pinned_state(const std::string& model_name, bool pinned); ModelTelemetryIdentity get_telemetry_identity(WrappedServer* server) const; void record_telemetry_for_model(const ModelTelemetryIdentity& identity, int input_tokens, diff --git a/src/cpp/include/lemon/server.h b/src/cpp/include/lemon/server.h index 5ec2974be..55a251b54 100644 --- a/src/cpp/include/lemon/server.h +++ b/src/cpp/include/lemon/server.h @@ -128,6 +128,8 @@ class Server { void handle_shutdown(const httplib::Request& req, httplib::Response& res); void handle_simulate_vram_pressure(const httplib::Request& req, httplib::Response& res); + void load_pinned_models_async(); + // Backend management endpoint handlers void handle_install(const httplib::Request& req, httplib::Response& res); void handle_uninstall(const httplib::Request& req, httplib::Response& res); @@ -249,6 +251,10 @@ class Server { std::thread http_v4_thread_; std::thread http_v6_thread_; std::thread model_cache_warmup_thread_; + std::thread pinned_models_autoload_thread_; + + std::mutex shutdown_mutex_; + std::condition_variable shutdown_cv_; // Routed servers (all routes/handlers; never listen) and the main-port diff --git a/src/cpp/server/model_manager.cpp b/src/cpp/server/model_manager.cpp index 4a96620f6..0570dcbaa 100644 --- a/src/cpp/server/model_manager.cpp +++ b/src/cpp/server/model_manager.cpp @@ -1908,9 +1908,12 @@ void ModelManager::save_model_options(const ModelInfo& info) { LOG(INFO, "ModelManager") << "Saving options for model: " << info.model_name << std::endl; // Persist under canonical ID (built-ins are keyed bare in cache but // recipe_options.json stores them as builtin.). - recipe_options_[cache_key_to_canonical_id(info.model_name)] = info.recipe_options.to_json(); + { + std::lock_guard lock(recipe_options_mutex_); + recipe_options_[cache_key_to_canonical_id(info.model_name)] = info.recipe_options.to_json(); + save_user_json(get_recipe_options_file(), recipe_options_); + } update_model_options_in_cache(info); - save_user_json(get_recipe_options_file(), recipe_options_); } std::map ModelManager::get_supported_models() { @@ -2237,9 +2240,12 @@ void ModelManager::build_cache() { // Populate recipe options. recipe_options.json is keyed by canonical ID // (user.*, extra.*, builtin.*) — built-ins are keyed bare in the cache, so // we translate before lookup. - for (auto& [name, info] : all_models) { - json jro = json_recipe_options.count(name) ? json_recipe_options[name] : json(nullptr); - info.recipe_options = build_recipe_options(info, jro, cache_key_to_canonical_id(name), recipe_options_); + { + std::lock_guard lock(recipe_options_mutex_); + for (auto& [name, info] : all_models) { + json jro = json_recipe_options.count(name) ? json_recipe_options[name] : json(nullptr); + info.recipe_options = build_recipe_options(info, jro, cache_key_to_canonical_id(name), recipe_options_); + } } // Step 2: Filter by backend availability @@ -2329,7 +2335,10 @@ void ModelManager::add_model_to_cache(const std::string& model_name) { parse_image_defaults(info, *model_json); json jro = (model_json->contains("recipe_options") && (*model_json)["recipe_options"].is_object()) ? (*model_json)["recipe_options"] : json(nullptr); - info.recipe_options = build_recipe_options(info, jro, cache_key_to_canonical_id(model_name), recipe_options_); + { + std::lock_guard lock(recipe_options_mutex_); + info.recipe_options = build_recipe_options(info, jro, cache_key_to_canonical_id(model_name), recipe_options_); + } info.suggested = JsonUtils::get_or_default(*model_json, "suggested", is_user_model); info.hf_load = JsonUtils::get_or_default(*model_json, "hf_load", false); diff --git a/src/cpp/server/router.cpp b/src/cpp/server/router.cpp index b3ec22c3b..26a47e391 100644 --- a/src/cpp/server/router.cpp +++ b/src/cpp/server/router.cpp @@ -239,6 +239,10 @@ void Router::evict_server(WrappedServer* server, int timeout_seconds) { if (!server) return; std::string model_name = server->get_model_name(); + if (server->is_pinned()) { + LOG(WARNING, "Router") << "Evicting pinned model: " << model_name + << " (due to NPU exclusivity or capacity limits)" << std::endl; + } LOG(INFO, "Router") << "Evicting model: " << model_name << std::endl; // Wait for any ongoing inference to complete. For watchdog-reset/dead @@ -416,6 +420,9 @@ void Router::load_model(const std::string& model_name, existing->update_access_time(); is_loading_ = false; load_cv_.notify_all(); + if (pinned.has_value()) { + persist_pinned_state(canonical_model_name, pinned.value()); + } return; } } @@ -543,6 +550,10 @@ void Router::load_model(const std::string& model_name, LOG(INFO, "Router") << "Model loaded successfully. Total loaded: " << loaded_servers_.size() << std::endl; + + if (pinned.has_value()) { + persist_pinned_state(canonical_model_name, pinned.value()); + } } else { // ERROR HANDLING (from spec: Error Handling section) // Check if error is "file not found" (exception to nuclear policy) @@ -590,6 +601,10 @@ void Router::load_model(const std::string& model_name, load_cv_.notify_all(); LOG(DEBUG, "Router") << "Retry successful in " << retry_server->get_load_duration_ms() << "ms!" << std::endl; + + if (pinned.has_value()) { + persist_pinned_state(canonical_model_name, pinned.value()); + } } catch (const std::exception& retry_error) { lock.lock(); is_loading_ = false; @@ -1475,6 +1490,25 @@ void Router::set_model_pinned(const std::string& model_name, bool pinned) { throw std::runtime_error("Model not loaded: " + model_name); } server->set_pinned(pinned); + persist_pinned_state(model_name, pinned); +} + +void Router::persist_pinned_state(const std::string& model_name, bool pinned) { + try { + ModelInfo info = model_manager_->get_model_info(model_name); + bool already_pinned = false; + json p_opt = info.recipe_options.get_option("pinned"); + if (p_opt.is_boolean()) { + already_pinned = p_opt.get(); + } + if (pinned == already_pinned) { + return; // Avoid redundant write + } + info.recipe_options.set_option("pinned", pinned); + model_manager_->save_model_options(info); + } catch (const std::exception& e) { + LOG(WARNING, "Router") << "Failed to persist pinned state for model " << model_name << ": " << e.what() << std::endl; + } } } // namespace lemon diff --git a/src/cpp/server/server.cpp b/src/cpp/server/server.cpp index a4b33f307..de44cb49c 100644 --- a/src/cpp/server/server.cpp +++ b/src/cpp/server/server.cpp @@ -1336,6 +1336,9 @@ void Server::run() { << "or hostname that resolves to RFC1918 IPv4." << std::endl; } + // Load pinned models gracefully in the background on boot + load_pinned_models_async(); + // Wait for listener threads, but check periodically for shutdown or rebind signals. // The threads are blocked in listen_after_bind(), which only returns when // the server is stopped or an error occurs. @@ -1410,7 +1413,11 @@ bool Server::should_shutdown() const { } void Server::set_shutdown_requested(bool requested) { - shutdown_requested_.store(requested); + { + std::lock_guard lock(shutdown_mutex_); + shutdown_requested_.store(requested); + } + shutdown_cv_.notify_all(); } bool Server::is_running() const { @@ -1423,6 +1430,17 @@ void Server::stop() { udp_beacon_.stopBroadcasting(); stop_http_listeners(); running_ = false; + + { + std::lock_guard lock(shutdown_mutex_); + shutdown_requested_.store(true); + } + shutdown_cv_.notify_all(); + + if (pinned_models_autoload_thread_.joinable()) { + pinned_models_autoload_thread_.join(); + } + shutdown_requested_ = false; // Reset for potential future use // Stop WebSocket server @@ -5304,4 +5322,51 @@ void Server::enrich_recipes(json& recipes) { } } +void Server::load_pinned_models_async() { + if (pinned_models_autoload_thread_.joinable()) { + return; + } + pinned_models_autoload_thread_ = std::thread([this]() { + try { + // Brief interruptible sleep to defer log interleaving, allowing startup logs and + // listener addresses to print fully before background model loading output begins. + { + std::unique_lock lock(shutdown_mutex_); + if (shutdown_cv_.wait_for(lock, std::chrono::milliseconds(100), [this]() { return shutdown_requested_.load(); })) { + return; + } + } + + LOG(INFO, "Server") << "Checking for pinned models to auto-load..." << std::endl; + auto models = model_manager_->get_supported_models(); + for (const auto& [name, info] : models) { + if (shutdown_requested_.load()) { + break; + } + + bool is_pinned = info.recipe_options.get_option("pinned").is_boolean() && + info.recipe_options.get_option("pinned").get(); + + if (is_pinned) { + LOG(INFO, "Server") << "Auto-loading pinned model: " << name << std::endl; + try { + router_->load_model(name, info, info.recipe_options, true, false, true); + LOG(INFO, "Server") << "Successfully auto-loaded pinned model: " << name << std::endl; + } catch (const std::exception& e) { + LOG(ERROR, "Server") << "Failed to auto-load pinned model '" << name << "': " << e.what() << std::endl; + } + } + } + } catch (const std::exception& e) { + LOG(WARNING, "Server") << "Error during pinned models auto-load: " << e.what() << std::endl; + } + + // KEEP THREAD ALIVE to prevent Linux PR_SET_PDEATHSIG from killing child processes. + // On Linux, the parent task is the specific thread that spawned the child. + // If this thread exits, the kernel sends SIGTERM to the spawned llama-servers. + std::unique_lock lock(shutdown_mutex_); + shutdown_cv_.wait(lock, [this]() { return shutdown_requested_.load(); }); + }); +} + } // namespace lemon diff --git a/test/server_pinning.py b/test/server_pinning.py index dda8656e0..f0414a3c8 100644 --- a/test/server_pinning.py +++ b/test/server_pinning.py @@ -13,6 +13,7 @@ ENDPOINT_TEST_MODEL, TIMEOUT_MODEL_OPERATION, TIMEOUT_DEFAULT, + get_default_lemond_binary, ) EVICTION_POLL_INTERVAL = 0.5 @@ -29,6 +30,14 @@ def _headers(): return headers +def get_free_port(): + import socket + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("localhost", 0)) + return s.getsockname()[1] + + class PinningTests(ServerTestBase): """Integration tests for model pinning capabilities.""" @@ -387,6 +396,181 @@ def test_pinning_idempotency(self): "Model should be unpinned by explicit load with pinned=False", ) + def test_pinned_model_persists_and_autoloads_on_restart(self): + """A pinned model's pinned status persists in recipe_options.json and is autoloaded on boot.""" + import tempfile + import shutil + import subprocess + import json + + # Create a temp directory for cache/config + tmp_dir = tempfile.mkdtemp() + port = get_free_port() + lemond_bin = get_default_lemond_binary() + + server_proc = None + log1 = open(os.path.join(tmp_dir, "lemond1.log"), "w") + log2 = open(os.path.join(tmp_dir, "lemond2.log"), "w") + try: + # 1. Start lemond on custom port with temp directory + env = os.environ.copy() + # Link to the existing models directory so we don't have to re-download the model + real_config = requests.get( + f"{self.base_url.replace('/api/v1', '')}/internal/config", + headers=_headers(), + ).json() + models_dir = real_config.get("models_dir", "") + + # Write a custom config.json to the temp dir to use the same models directory + config_json_path = os.path.join(tmp_dir, "config.json") + with open(config_json_path, "w") as f: + json.dump({"models_dir": models_dir}, f) + + # Start the server + server_proc = subprocess.Popen( + [lemond_bin, tmp_dir, "--port", str(port)], + stdout=log1, + stderr=log1, + text=True, + env=env, + ) + + # Wait for server to be ready + url = f"http://localhost:{port}/api/v1" + for _ in range(30): + try: + requests.get(f"{url}/models", timeout=1) + break + except Exception: + time.sleep(1) + if server_proc.poll() is not None: + self.fail("Server process died early during start") + else: + self.fail("Server timed out starting up") + + # 2. Pin model via /internal/pin + # First, load the model unpinned + response = requests.post( + f"{url}/load", + json={"model_name": ENDPOINT_TEST_MODEL, "pinned": False}, + timeout=TIMEOUT_MODEL_OPERATION, + ) + self.assertEqual(response.status_code, 200) + + # Pin it via the /pin endpoint to test dynamic pin persistence + response = requests.post( + f"http://localhost:{port}/internal/pin", + json={"model_name": ENDPOINT_TEST_MODEL, "pinned": True}, + headers=_headers(), + timeout=TIMEOUT_DEFAULT, + ) + self.assertEqual(response.status_code, 200) + + # Verify the model is loaded and is pinned in registry/options + models_response = requests.get( + f"{url}/models", timeout=TIMEOUT_DEFAULT + ).json() + model_info = None + for m in models_response.get("data", []): + if m.get("id") == ENDPOINT_TEST_MODEL: + model_info = m + break + self.assertIsNotNone(model_info) + self.assertTrue(model_info.get("recipe_options", {}).get("pinned")) + + # 3. Kill server and its children to prevent orphaned backends + import psutil + + try: + parent = psutil.Process(server_proc.pid) + children = parent.children(recursive=True) + except psutil.NoSuchProcess: + children = [] + + server_proc.terminate() + try: + server_proc.wait(timeout=3) + except subprocess.TimeoutExpired: + server_proc.kill() + server_proc.wait() + server_proc = None + + for child in children: + try: + child.kill() + except psutil.NoSuchProcess: + pass + + # 4. Restart server + server_proc = subprocess.Popen( + [lemond_bin, tmp_dir, "--port", str(port)], + stdout=log2, + stderr=log2, + text=True, + env=env, + ) + + # Wait for server to be ready + for _ in range(30): + try: + requests.get(f"{url}/models", timeout=1) + break + except Exception: + time.sleep(1) + else: + self.fail("Server timed out starting up after restart") + + # 5. Check if the model is automatically loaded in the background + loaded = False + for _ in range(30): + health = requests.get(f"{url}/health", timeout=TIMEOUT_DEFAULT).json() + for m in health.get("all_models_loaded", []): + if m.get("model_name") == ENDPOINT_TEST_MODEL and m.get( + "loaded", True + ): + loaded = True + break + if loaded: + break + time.sleep(1) + + if not loaded: + # Read and print log files for debugging + log1.close() + log2.close() + with open(os.path.join(tmp_dir, "lemond1.log"), "r") as f: + print(f"\n--- lemond1.log ---\n{f.read()}") + with open(os.path.join(tmp_dir, "lemond2.log"), "r") as f: + print(f"\n--- lemond2.log ---\n{f.read()}") + + self.assertTrue(loaded, "Pinned model did not autoload on server restart") + + except Exception as e: + # Read and print log files for debugging + log1.close() + log2.close() + try: + with open(os.path.join(tmp_dir, "lemond1.log"), "r") as f: + print(f"\n--- lemond1.log ---\n{f.read()}") + except Exception: + pass + try: + with open(os.path.join(tmp_dir, "lemond2.log"), "r") as f: + print(f"\n--- lemond2.log ---\n{f.read()}") + except Exception: + pass + raise e + finally: + log1.close() + log2.close() + if server_proc: + server_proc.terminate() + try: + server_proc.wait(timeout=3) + except Exception: + server_proc.kill() + shutil.rmtree(tmp_dir) + if __name__ == "__main__": run_server_tests(