diff --git a/common/arg.cpp b/common/arg.cpp
index 2b5e509d1d538..37b1c8eea44eb 100644
--- a/common/arg.cpp
+++ b/common/arg.cpp
@@ -17,8 +17,6 @@
 #endif
 
 #define JSON_ASSERT GGML_ASSERT
-#include <nlohmann/json.hpp>
-
 #include <algorithm>
 #include <climits>
 #include <cstdarg>
@@ -26,6 +24,8 @@
 #include <filesystem>
 #include <fstream>
 #include <list>
+#include <mutex>
+#include <nlohmann/json.hpp>
 #include <regex>
 #include <set>
 #include <string>
@@ -37,6 +37,11 @@
 #if defined(LLAMA_USE_CURL)
 #include <curl/curl.h>
 #include <curl/easy.h>
+#include <future>
+# ifndef _WIN32
+# include <sys/ioctl.h>
+# include <unistd.h>
+# endif
 #endif
 
 #ifdef __linux__
@@ -251,6 +256,7 @@ struct common_load_model_from_url_headers {
     std::string etag;
     std::string last_modified;
     std::string accept_ranges;
+    int64_t     content_length = -1;
 };
 
 struct FILE_deleter {
@@ -263,6 +269,7 @@ static size_t common_header_callback(char * buffer, size_t, size_t n_items, void
     static std::regex etag_regex("ETag", std::regex_constants::icase);
     static std::regex last_modified_regex("Last-Modified", std::regex_constants::icase);
     static std::regex accept_ranges_regex("Accept-Ranges", std::regex_constants::icase);
+    static std::regex content_length_regex("Content-Length", std::regex_constants::icase);
     std::string header(buffer, n_items);
     std::smatch match;
     if (std::regex_match(header, match, header_regex)) {
@@ -274,6 +281,12 @@ static size_t common_header_callback(char * buffer, size_t, size_t n_items, void
             headers->last_modified = value;
         } else if (std::regex_match(key, match, accept_ranges_regex)) {
             headers->accept_ranges = value;
+        } else if (std::regex_match(key, match, content_length_regex)) {
+            try {
+                headers->content_length = std::stoll(value);
+            } catch (const std::exception & e) {
+                LOG_WRN("%s: failed to parse Content-Length '%s': %s\n", __func__, value.c_str(), e.what());
+            }
         }
     }
 
@@ -367,6 +380,385 @@ static bool common_download_head(CURL * curl,
     return common_curl_perf(curl) == CURLE_OK;
 }
 
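+// Multi-connection download scheme: the file is split into one byte range per
+// connection, each range is fetched by its own CURL handle into a separate
+// <path>.downloadInProgress.chunkN file, and the chunk files are concatenated
+// in index order once every range has completed. Progress from all
+// connections is aggregated behind a mutex so a single consolidated bar is drawn.
+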
+// Shared progress state for multi-connection downloads
+struct common_multiconn_progress {
+    std::mutex           progress_mutex;
+    int64_t              total_content_length;
+    std::vector<int64_t> chunk_downloaded_bytes;
+    std::vector<int64_t> initial_downloaded_bytes;
+    std::chrono::steady_clock::time_point start_time;
+    bool progress_enabled;
+    bool printed;
+
+    common_multiconn_progress(int64_t content_length, int num_chunks, bool enable_progress) :
+        total_content_length(content_length),
+        chunk_downloaded_bytes(num_chunks, 0),
+        initial_downloaded_bytes(num_chunks, 0),
+        start_time(std::chrono::steady_clock::now()),
+        progress_enabled(enable_progress),
+        printed(false) {}
+
+    void update_chunk_progress(int chunk_idx, int64_t downloaded_bytes) {
+        if (!progress_enabled) {
+            return;
+        }
+
+        std::lock_guard<std::mutex> lock(progress_mutex);
+        chunk_downloaded_bytes[chunk_idx] = initial_downloaded_bytes[chunk_idx] + downloaded_bytes;
+
+        // Calculate the total downloaded across all chunks
+        int64_t total_downloaded = 0;
+        for (int64_t bytes : chunk_downloaded_bytes) {
+            total_downloaded += bytes;
+        }
+
+        display_progress(total_downloaded);
+    }
+
+  private:
+
+    // Format a byte count as a human-readable size
+    template <typename T>
+    std::string format_size(T bytes) {
+        const char * units[] = { "B", "KB", "MB", "GB" };
+        double size     = bytes;
+        int    unit_idx = 0;
+        while (size >= 1024.0 && unit_idx < 3) {
+            size /= 1024.0;
+            ++unit_idx;
+        }
+
+        return string_format("%.1f%s", size, units[unit_idx]);
+    }
+
+    // Format a transfer speed
+    std::string format_speed(double bytes_per_sec) {
+        return string_format("%s/s", format_size(bytes_per_sec).c_str());
+    }
+
+    void display_progress(int64_t total_downloaded) {
+        if (total_content_length <= 0) {
+            return;
+        }
+
+        // Calculate the percentage
+        const int64_t percentage = (total_downloaded * 100) / total_content_length;
+
+        // Calculate the speed
+        auto   now     = std::chrono::steady_clock::now();
+        auto   elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time).count();
+        double speed   = 0.0;
+        if (elapsed > 0) {
+            speed = (total_downloaded * 1000.0) / elapsed;  // bytes per second
+        }
+
+        // Calculate the ETA
+        double eta = 0.0;
+        if (speed > 0) {
+            eta = (total_content_length - total_downloaded) / speed;
+        }
+
+        // Format the progress display
+        std::string progress_prefix = string_format("%3lld%%", (long long) percentage);
+        std::string progress_suffix =
+            string_format("%s/%s %s ETA:%ds", format_size(total_downloaded).c_str(),
+                          format_size(total_content_length).c_str(), format_speed(speed).c_str(), (int) eta);
+
+        // Determine the terminal width for the progress bar
+        int terminal_width = 80;  // Default fallback
+# ifdef _WIN32
+        // On Windows only the codepage is switched (so the block character prints); the default width is kept
+        UINT originalOutputCP = GetConsoleOutputCP();
+        SetConsoleOutputCP(CP_UTF8);
+# else
+        struct winsize w;
+        if (ioctl(STDOUT_FILENO, TIOCGWINSZ, &w) == 0) {
+            terminal_width = w.ws_col;
+        }
+# endif
+
+        int prefix_suffix_len  = progress_prefix.length() + progress_suffix.length() + 4;  // " | "
+        int progress_bar_width = std::max(10, terminal_width - prefix_suffix_len);
+
+        // Generate the progress bar
+        std::string progress_bar;
+        const long  pos = (percentage * progress_bar_width) / 100;
+        for (int i = 0; i < progress_bar_width; ++i) {
+            progress_bar.append((i < pos) ? "█" : " ");
+        }
+
+        // Return to the start of the line and overwrite it with the new progress
+        fprintf(stderr, "\r%s %s| %s", progress_prefix.c_str(), progress_bar.c_str(), progress_suffix.c_str());
+        fflush(stderr);
+        printed = true;
+
+# ifdef _WIN32
+        SetConsoleOutputCP(originalOutputCP);
+# endif
+    }
+};
+
+// Progress callback for individual chunks
+static int common_multiconn_progress_callback(void * ptr,
+                                              curl_off_t,
+                                              curl_off_t now_downloaded,
+                                              curl_off_t,
+                                              curl_off_t) {
+    auto * progress_info = static_cast<std::pair<common_multiconn_progress *, int> *>(ptr);
+    common_multiconn_progress * progress  = progress_info->first;
+    int                         chunk_idx = progress_info->second;
+
+    if (progress) {
+        progress->update_chunk_progress(chunk_idx, now_downloaded);
+    }
+
+    return 0;
+}
+
+// Structure to manage a single download chunk
+struct common_download_chunk {
+    std::string url;
+    std::string bearer_token;
+    std::string path_temporary;
+    int64_t     start_byte;
+    int64_t     end_byte;
+    int64_t     downloaded_bytes;
+    curl_ptr    curl;
+    std::unique_ptr<FILE, FILE_deleter> file;
+    bool        completed;
+    std::pair<common_multiconn_progress *, int> progress_info;
+
+    common_download_chunk(const std::string & url_,
+                          const std::string & bearer_token_,
+                          const std::string & path_,
+                          int64_t start,
+                          int64_t end,
+                          common_multiconn_progress * progress,
+                          int chunk_idx) :
+        url(url_),
+        bearer_token(bearer_token_),
+        path_temporary(path_),
+        start_byte(start),
+        end_byte(end),
+        downloaded_bytes(0),
+        curl(nullptr, &curl_easy_cleanup),
+        file(nullptr),
+        completed(false),
+        progress_info(progress, chunk_idx) {}
+};
+
+// Multi-connection download manager
+static bool common_download_file_multiconn(const std::string & url,
+                                           const std::string & path,
+                                           const std::string & bearer_token,
+                                           int64_t content_length,
+                                           bool enable_progress = true,
+                                           int num_connections = 4) {
+    // Minimum chunk size (2 MiB) - don't use multi-connection for small files
+    const int64_t min_chunk_size = 2 * 1024 * 1024;
+    const int64_t min_file_size  = min_chunk_size * 2;  // Minimum file size to enable multi-connection
+
+    if (content_length < min_file_size) {
+        LOG_DBG("%s: file too small (%lld bytes) for multi-connection download\n", __func__,
+                (long long) content_length);
+        return false;
+    }
+
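+    // Worked example: a 10 MiB file with the default num_connections = 4 gives
+    // chunk_size = 2.5 MiB and the byte ranges 0-2621439, 2621440-5242879,
+    // 5242880-7864319 and 7864320-10485759; the last chunk always ends at
+    // content_length - 1 and so absorbs any division remainder.
+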
+    // Adjust the number of connections based on the file size
+    const int64_t chunk_size = content_length / num_connections;
+    if (chunk_size < min_chunk_size) {
+        num_connections = static_cast<int>(content_length / min_chunk_size);
+        if (num_connections < 2) {
+            LOG_DBG("%s: adjusted connection count results in less than 2 connections\n", __func__);
+            return false;
+        }
+    }
+
+    LOG_INF("%s: starting multi-connection download with %d connections for %lld bytes (chunk size: %lld)\n",
+            __func__, num_connections, (long long) content_length, (long long) (content_length / num_connections));
+
+    // Create the shared progress tracker
+    common_multiconn_progress progress_tracker(content_length, num_connections, enable_progress);
+
+    std::vector<common_download_chunk> chunks;
+    std::vector<std::future<bool>>     futures;
+
+    // Create the chunk files and prepare for download
+    for (int i = 0; i < num_connections; ++i) {
+        int64_t start = static_cast<int64_t>(i) * (content_length / num_connections);
+        int64_t end;
+        if (i == num_connections - 1) {
+            end = content_length - 1;
+        } else {
+            end = start + (content_length / num_connections) - 1;
+        }
+
+        std::string chunk_path = path + ".downloadInProgress.chunk" + std::to_string(i);
+        chunks.emplace_back(url, bearer_token, chunk_path, start, end, &progress_tracker, i);
+
+        // Check whether the chunk file already exists (resume support)
+        if (std::filesystem::exists(chunks.back().path_temporary)) {
+            chunks.back().downloaded_bytes = std::filesystem::file_size(chunks.back().path_temporary);
+            progress_tracker.initial_downloaded_bytes[i] = chunks.back().downloaded_bytes;
+            LOG_DBG("%s: resuming chunk %d from byte %lld\n", __func__, i,
+                    (long long) chunks.back().downloaded_bytes);
+        }
+    }
+
+    // Download the chunks in parallel
+    for (size_t i = 0; i < chunks.size(); ++i) {
+        futures.push_back(std::async(
+            std::launch::async,
+            [&](size_t chunk_idx) -> bool {
+                auto & chunk = chunks[chunk_idx];
+
+                // Initialize CURL for this chunk
+                chunk.curl.reset(curl_easy_init());
+                if (!chunk.curl.get()) {
+                    LOG_ERR("%s: failed to initialize CURL for chunk %zu\n", __func__, chunk_idx);
+                    return false;
+                }
+
+                // Open the chunk file for appending
+                chunk.file.reset(fopen(chunk.path_temporary.c_str(), "ab"));
+                if (!chunk.file) {
+                    LOG_ERR("%s: failed to open chunk file %s\n", __func__, chunk.path_temporary.c_str());
+                    return false;
+                }
+
+                // Set up the CURL options
+                curl_easy_setopt(chunk.curl.get(), CURLOPT_URL, chunk.url.c_str());
+                curl_easy_setopt(chunk.curl.get(), CURLOPT_FOLLOWLOCATION, 1L);
+                curl_easy_setopt(chunk.curl.get(), CURLOPT_WRITEDATA, chunk.file.get());
+                curl_easy_setopt(chunk.curl.get(), CURLOPT_WRITEFUNCTION, common_write_callback);
+
+                // Set up the progress callback
+                if (enable_progress) {
+                    curl_easy_setopt(chunk.curl.get(), CURLOPT_NOPROGRESS, 0L);
+                    curl_easy_setopt(chunk.curl.get(), CURLOPT_XFERINFOFUNCTION, common_multiconn_progress_callback);
+                    curl_easy_setopt(chunk.curl.get(), CURLOPT_XFERINFODATA, &chunk.progress_info);
+                } else {
+                    curl_easy_setopt(chunk.curl.get(), CURLOPT_NOPROGRESS, 1L);
+                }
+
+                // Set the byte range for this chunk, skipping whatever was already downloaded
+                int64_t actual_start = chunk.start_byte + chunk.downloaded_bytes;
+                if (actual_start <= chunk.end_byte) {
+                    std::string range_str = std::to_string(actual_start) + "-" + std::to_string(chunk.end_byte);
+                    curl_easy_setopt(chunk.curl.get(), CURLOPT_RANGE, range_str.c_str());
+
+                    LOG_DBG("%s: downloading chunk %zu range %s\n", __func__, chunk_idx, range_str.c_str());
+                } else {
+                    // Chunk already completed
+                    chunk.completed = true;
+                    LOG_DBG("%s: chunk %zu already completed\n", __func__, chunk_idx);
+                    return true;
+                }
+
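+                // HTTP ranges are inclusive on both ends, so "0-4" requests
+                // exactly five bytes; a server that honors the range replies
+                // 206 Partial Content, which passes the 2xx check below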
+                // Add the authorization header if needed
+                curl_slist * headers = nullptr;
+                headers = curl_slist_append(headers, "User-Agent: llama-cpp");
+                if (!chunk.bearer_token.empty()) {
+                    std::string auth_header = "Authorization: Bearer " + chunk.bearer_token;
+                    headers = curl_slist_append(headers, auth_header.c_str());
+                }
+                curl_easy_setopt(chunk.curl.get(), CURLOPT_HTTPHEADER, headers);
+
+# ifdef _WIN32
+                curl_easy_setopt(chunk.curl.get(), CURLOPT_SSL_OPTIONS, CURLSSLOPT_NATIVE_CA);
+# endif
+
+                // Perform the download
+                CURLcode res     = curl_easy_perform(chunk.curl.get());
+                bool     success = (res == CURLE_OK);
+
+                if (success) {
+                    long http_code = 0;
+                    curl_easy_getinfo(chunk.curl.get(), CURLINFO_RESPONSE_CODE, &http_code);
+                    if (http_code < 200 || http_code >= 400) {
+                        LOG_ERR("%s: chunk %zu failed with HTTP code %ld\n", __func__, chunk_idx, http_code);
+                        success = false;
+                    } else {
+                        LOG_DBG("%s: chunk %zu completed successfully (HTTP %ld)\n", __func__, chunk_idx, http_code);
+                    }
+                } else {
+                    LOG_ERR("%s: chunk %zu failed: %s\n", __func__, chunk_idx, curl_easy_strerror(res));
+                }
+
+                // Cleanup
+                if (headers) {
+                    curl_slist_free_all(headers);
+                }
+                chunk.file.reset();
+
+                chunk.completed = success;
+                return success;
+            },
+            i));
+    }
+
+    // Wait for all chunks to complete
+    bool all_success = true;
+    for (size_t i = 0; i < futures.size(); ++i) {
+        if (!futures[i].get()) {
+            LOG_ERR("%s: chunk %zu failed\n", __func__, i);
+            all_success = false;
+        }
+    }
+
+    if (!all_success) {
+        LOG_ERR("%s: one or more chunks failed to download\n", __func__);
+        // Clean up any partial chunk files
+        for (const auto & chunk : chunks) {
+            if (std::filesystem::exists(chunk.path_temporary)) {
+                std::filesystem::remove(chunk.path_temporary);
+            }
+        }
+        return false;
+    }
+
+    // Finish the progress line with a newline if one was drawn
+    if (enable_progress && progress_tracker.printed) {
+        fprintf(stderr, "\n");
+    }
+
+    // Combine the chunks into the final file
+    const std::string path_temporary = path + ".downloadInProgress";
+    std::ofstream final_file(path_temporary, std::ios::binary);
+    if (!final_file) {
+        LOG_ERR("%s: failed to create final file %s\n", __func__, path_temporary.c_str());
+        return false;
+    }
+
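+    // Chunks are appended in index order; chunk i covers the byte range
+    // starting at i * (content_length / num_connections), so concatenating
+    // them in order reproduces the original byte stream exactly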
+    LOG_INF("%s: combining %zu chunks into the final file\n", __func__, chunks.size());
+    for (size_t i = 0; i < chunks.size(); ++i) {
+        std::ifstream chunk_file(chunks[i].path_temporary, std::ios::binary);
+        if (!chunk_file) {
+            LOG_ERR("%s: failed to open chunk file %s for combining\n", __func__, chunks[i].path_temporary.c_str());
+            final_file.close();
+            std::filesystem::remove(path_temporary);
+            return false;
+        }
+
+        // Stream the whole chunk into the final file
+        final_file << chunk_file.rdbuf();
+        chunk_file.close();
+
+        // Verify that the chunk was written properly
+        if (final_file.fail()) {
+            LOG_ERR("%s: failed to write chunk %zu to final file\n", __func__, i);
+            final_file.close();
+            std::filesystem::remove(path_temporary);
+            return false;
+        }
+
+        // Remove the chunk file after a successful combination
+        std::filesystem::remove(chunks[i].path_temporary);
+        LOG_DBG("%s: combined and removed chunk %zu\n", __func__, i);
+    }
+
+    final_file.close();
+    LOG_INF("%s: multi-connection download completed successfully\n", __func__);
+    return true;
+}
+
 // download one single file from remote URL to local path
 static bool common_download_file_single_online(const std::string & url,
                                                const std::string & path,
@@ -476,9 +868,10 @@ static bool common_download_file_single_online(const std::string & url,
 
         // Write the updated JSON metadata file.
         metadata.update({
-            { "url",          url },
-            { "etag",         headers.etag },
-            { "lastModified", headers.last_modified }
+            { "url",           url },
+            { "etag",          headers.etag },
+            { "lastModified",  headers.last_modified },
+            { "contentLength", headers.content_length }
         });
         write_file(metadata_path, metadata.dump(4));
         LOG_DBG("%s: file metadata saved: %s\n", __func__, metadata_path.c_str());
@@ -487,7 +880,56 @@ static bool common_download_file_single_online(const std::string & url,
         LOG_INF("%s: trying to download model from %s to %s (server_etag:%s, server_last_modified:%s)...\n",
                 __func__, llama_download_hide_password_in_url(url).c_str(), path_temporary.c_str(),
                 headers.etag.c_str(), headers.last_modified.c_str());
-        const bool was_pull_successful = common_pull_file(curl.get(), path_temporary);
+
+        bool was_pull_successful = false;
+
+        // Try a multi-connection download if the conditions are met
+        if (accept_ranges_supported && headers.content_length > 0) {
+            LOG_INF("%s: server supports range requests with content length %lld bytes\n", __func__,
+                    (long long) headers.content_length);
+
+            // Store the chunk info in the metadata for progress tracking
+            metadata["multiconn"] = {
+                { "content_length", headers.content_length },
+                { "chunks_used",    true },
+                { "attempt_time",   std::time(nullptr) }
+            };
+            write_file(metadata_path, metadata.dump(4));
+
+            was_pull_successful =
+                common_download_file_multiconn(url, path, bearer_token, headers.content_length, true);
+
+            if (!was_pull_successful) {
+                LOG_WRN("%s: multi-connection download failed, falling back to single connection\n", __func__);
+                // Remove the failed chunk metadata
+                metadata.erase("multiconn");
+                write_file(metadata_path, metadata.dump(4));
+
+                // Clean up any remaining chunk files
+                try {
+                    for (int i = 0; i < 10; ++i) {  // Check up to 10 possible chunks (the default is 4 connections)
+                        std::string chunk_path = path + ".downloadInProgress.chunk" + std::to_string(i);
+                        if (std::filesystem::exists(chunk_path)) {
+                            std::filesystem::remove(chunk_path);
+                            LOG_DBG("%s: cleaned up chunk file %s\n", __func__, chunk_path.c_str());
+                        }
+                    }
+                } catch (const std::exception & e) {
+                    LOG_WRN("%s: error cleaning up chunk files: %s\n", __func__, e.what());
+                }
+            }
+        } else {
+            LOG_DBG(
+                "%s: multi-connection download not attempted: accept_ranges=%d, content_length=%lld, "
+                "from_scratch=%d\n",
+                __func__, accept_ranges_supported ? 1 : 0, (long long) headers.content_length,
+                should_download_from_scratch ? 1 : 0);
+        }
+
+        // Fall back to a single-connection download if multi-connection failed or was not attempted
+        if (!was_pull_successful) {
+            was_pull_successful = common_pull_file(curl.get(), path_temporary);
+        }
         if (!was_pull_successful) {
             if (i + 1 < max_attempts) {
                 const int exponential_backoff_delay = std::pow(retry_delay_seconds, i) * 1000;