From 05320e49448d88550c81ce0ba62f1664e8e51118 Mon Sep 17 00:00:00 2001 From: Billy O'Neal Date: Wed, 14 Feb 2024 17:07:19 -0800 Subject: [PATCH] Try to fix macOS hang when building Python 2 (#1343) --- include/vcpkg/base/files.h | 8 ++ include/vcpkg/base/parallel-algorithms.h | 175 +++++++++++++++++------ src/vcpkg/base/files.cpp | 119 +++++++++++++-- src/vcpkg/postbuildlint.cpp | 36 ++--- 4 files changed, 262 insertions(+), 76 deletions(-) diff --git a/include/vcpkg/base/files.h b/include/vcpkg/base/files.h index 74e16627bf..bfb88e4758 100644 --- a/include/vcpkg/base/files.h +++ b/include/vcpkg/base/files.h @@ -76,6 +76,7 @@ namespace vcpkg long long tell() const noexcept; int eof() const noexcept; std::error_code error() const noexcept; + int error_raw() const noexcept; const Path& path() const; ExpectedL try_seek_to(long long offset); @@ -97,6 +98,9 @@ namespace vcpkg ExpectedL try_getc(); ExpectedL try_read_all_from(long long offset, void* buffer, std::uint32_t size); std::string read_to_end(std::error_code& ec); + // reads any remaining chunks of the file; used to implement read_to_end + void read_to_end_suffix( + std::string& output, std::error_code& ec, char* buffer, size_t buffer_size, size_t last_read); }; struct WriteFilePointer : FilePointer @@ -135,6 +139,10 @@ namespace vcpkg ExpectedL try_read_contents(const Path& file_path) const; + // Tries to read `file_path`, and if the file starts with a shebang sequence #!, returns the contents of the + // file. If an I/O error occurs or the file does not start with a shebang sequence, returns an empty string. + virtual std::string best_effort_read_contents_if_shebang(const Path& file_path) const = 0; + virtual Path find_file_recursively_up(const Path& starting_dir, const Path& filename, std::error_code& ec) const = 0; diff --git a/include/vcpkg/base/parallel-algorithms.h b/include/vcpkg/base/parallel-algorithms.h index 47b2472746..406683e278 100644 --- a/include/vcpkg/base/parallel-algorithms.h +++ b/include/vcpkg/base/parallel-algorithms.h @@ -1,85 +1,170 @@ #pragma once - +#if defined(_WIN32) +#include +#else // ^^^ _WIN32 / !_WIN32 vvv +#include +#include +#include +#include +#endif // ^^^ _WIN32 #include +#include + +#include #include -#include -#include namespace vcpkg { template - inline void execute_in_parallel(size_t work_count, F&& work) noexcept + struct WorkCallbackContext { - const size_t thread_count = static_cast(get_concurrency()); - const size_t num_threads = std::max(static_cast(1), std::min(thread_count, work_count)); + F work; + size_t work_count; + std::atomic next_offset; - std::vector> workers; - workers.reserve(num_threads - 1); + WorkCallbackContext(F init_f, size_t work_count) : work(init_f), work_count(work_count), next_offset(0) { } - for (size_t i = 0; i < num_threads - 1; ++i) + // pre: run() is called at most SIZE_MAX - work_count times + void run() { - workers.emplace_back(std::async(std::launch::async | std::launch::deferred, [&work]() { work(); })); + for (;;) + { + auto offset = next_offset.fetch_add(1, std::memory_order_relaxed); + if (offset >= work_count) + { + return; + } + + work(offset); + } } - work(); + }; - for (auto&& w : workers) +#if defined(_WIN32) + struct PtpWork + { + PtpWork(_In_ PTP_WORK_CALLBACK pfnwk, _Inout_opt_ PVOID pv, _In_opt_ PTP_CALLBACK_ENVIRON pcbe) + : ptp_work(CreateThreadpoolWork(pfnwk, pv, pcbe)) { - w.get(); } - } + PtpWork(const PtpWork&) = delete; + PtpWork& operator=(const PtpWork&) = delete; + ~PtpWork() + { + if (ptp_work) + { + ::WaitForThreadpoolWorkCallbacks(ptp_work, TRUE); + ::CloseThreadpoolWork(ptp_work); + } + } - template - void parallel_for_each(Container&& c, F cb) noexcept + explicit operator bool() { return ptp_work != nullptr; } + + void submit() { ::SubmitThreadpoolWork(ptp_work); } + + private: + PTP_WORK ptp_work; + }; + + template + inline void execute_in_parallel(size_t work_count, F work) noexcept { - if (c.size() == 0) + if (work_count == 0) { return; } - if (c.size() == 1) + + if (work_count == 1) { - cb(c[0]); - return; + work(size_t{}); } - std::atomic_size_t next{0}; - - execute_in_parallel(c.size(), [&]() { - size_t i = 0; - while (i < c.size()) + WorkCallbackContext context{work, work_count}; + PtpWork ptp_work([](PTP_CALLBACK_INSTANCE, + void* context, + PTP_WORK) noexcept { static_cast*>(context)->run(); }, + &context, + nullptr); + if (ptp_work) + { + auto max_threads = (std::min)(work_count, static_cast(get_concurrency())); + max_threads = (std::min)(max_threads, (SIZE_MAX - work_count) + 1u); // to avoid overflow in fetch_add + // start at 1 to account for the running thread + for (size_t i = 1; i < max_threads; ++i) { - if (next.compare_exchange_weak(i, i + 1, std::memory_order_relaxed)) - { - cb(c[i]); - } + ptp_work.submit(); } - }); + } + + context.run(); } +#else // ^^^ _WIN32 / !_WIN32 vvv + struct JThread + { + template>::value, int> = 0> + JThread(Arg0&& arg0) : m_thread(std::forward(arg0)) + { + } - template - void parallel_transform(const Container& c, RanItTarget out_begin, F&& cb) noexcept + ~JThread() { m_thread.join(); } + + JThread(const JThread&) = delete; + JThread& operator=(const JThread&) = delete; + JThread(JThread&&) = default; + JThread& operator=(JThread&&) = default; + + private: + std::thread m_thread; + }; + + template + inline void execute_in_parallel(size_t work_count, F work) noexcept { - if (c.size() == 0) + if (work_count == 0) { return; } - if (c.size() == 1) + + if (work_count == 1) { - *out_begin = cb(c[0]); + work(size_t{}); return; } - std::atomic_size_t next{0}; - - execute_in_parallel(c.size(), [&]() { - size_t i = 0; - while (i < c.size()) + WorkCallbackContext context{work, work_count}; + auto max_threads = std::min(work_count, static_cast(get_concurrency())); + max_threads = std::min(max_threads, (SIZE_MAX - work_count) + 1u); // to avoid overflow in fetch_add + auto bg_thread_count = max_threads - 1; + std::vector bg_threads; + bg_threads.reserve(bg_thread_count); + for (size_t i = 0; i < bg_thread_count; ++i) + { + try { - if (next.compare_exchange_weak(i, i + 1, std::memory_order_relaxed)) - { - *(out_begin + i) = cb(c[i]); - } + bg_threads.emplace_back([&]() { context.run(); }); } - }); + catch (const std::system_error&) + { + // ok, just give up trying to create threads + break; + } + } + + context.run(); + // destroying workers joins + } +#endif // ^^^ !_WIN32 + + template + void parallel_for_each(Container&& c, F cb) noexcept + { + execute_in_parallel(c.size(), [&](size_t offset) { cb(c[offset]); }); + } + + template + void parallel_transform(const Container& c, RanItTarget out_begin, F cb) noexcept + { + execute_in_parallel(c.size(), [&](size_t offset) { out_begin[offset] = cb(c[offset]); }); } } diff --git a/src/vcpkg/base/files.cpp b/src/vcpkg/base/files.cpp index eb7d14d703..b305e9f2f1 100644 --- a/src/vcpkg/base/files.cpp +++ b/src/vcpkg/base/files.cpp @@ -1355,6 +1355,8 @@ namespace vcpkg return std::error_code(::ferror(m_fs), std::generic_category()); } + int FilePointer::error_raw() const noexcept { return ::ferror(m_fs); } + const Path& FilePointer::path() const { return m_path; } ExpectedL FilePointer::try_seek_to(long long offset) { return try_seek_to(offset, SEEK_SET); } @@ -1453,28 +1455,72 @@ namespace vcpkg std::string output; constexpr std::size_t buffer_size = 1024 * 32; char buffer[buffer_size]; - do + auto this_read = this->read(buffer, 1, buffer_size); + if (this_read != buffer_size) { - const auto this_read = this->read(buffer, 1, buffer_size); - if (this_read != 0) + auto maybe_error = ::ferror(m_fs); + if (maybe_error) { - output.append(buffer, this_read); + ec.assign(maybe_error, std::generic_category()); + return output; } - else if ((ec = this->error())) + + if (!this->eof()) { - return std::string(); + Checks::unreachable(VCPKG_LINE_INFO, "Got a partial read without an error or end"); } - } while (!this->eof()); + } - if (Strings::starts_with(output, "\xEF\xBB\xBF")) { - // remove byte-order mark from the beginning of the string - output.erase(output.begin(), output.begin() + 3); + const char* to_append = buffer; + size_t to_append_size = this_read; + if (to_append_size >= 3 && ::memcmp(to_append, "\xEF\xBB\xBF", 3) == 0) + { + // remove byte-order mark from the beginning of the string + to_append_size -= 3; + to_append += 3; + } + + output.append(to_append, to_append_size); } + read_to_end_suffix(output, ec, buffer, buffer_size, this_read); return output; } + void ReadFilePointer::read_to_end_suffix( + std::string& output, std::error_code& ec, char* buffer, size_t buffer_size, size_t last_read) + { + if (last_read == buffer_size) + { + for (;;) + { + last_read = this->read(buffer, 1, buffer_size); + if (last_read != buffer_size) + { + break; + } + + output.append(buffer, last_read); + } + + if (auto maybe_error = ::ferror(m_fs)) + { + ec.assign(maybe_error, std::generic_category()); + output.clear(); + return; + } + + output.append(buffer, last_read); + if (!this->eof()) + { + Checks::unreachable(VCPKG_LINE_INFO, "Got a partial read without an error or end"); + } + } + + ec.clear(); + } + WriteFilePointer::WriteFilePointer() noexcept = default; WriteFilePointer::WriteFilePointer(WriteFilePointer&&) noexcept = default; @@ -2204,6 +2250,58 @@ namespace vcpkg return file.read_to_end(ec); } + + virtual std::string best_effort_read_contents_if_shebang(const Path& file_path) const override + { + std::error_code ec; + StatsTimer t(g_us_filesystem_stats); + ReadFilePointer file{file_path, ec}; + std::string output; + if (ec) + { + Debug::print("Failed to open: ", file_path, '\n'); + return output; + } + + constexpr std::size_t buffer_size = 1024 * 32; + char buffer[buffer_size]; + auto this_read = file.read(buffer, 1, buffer_size); + if (this_read != buffer_size) + { + if (file.error_raw()) + { + return output; + } + + if (!file.eof()) + { + Checks::unreachable(VCPKG_LINE_INFO, "Got a partial read without an error or end"); + } + } + + { + const char* to_append = buffer; + size_t to_append_size = this_read; + if (to_append_size >= 3 && ::memcmp(to_append, "\xEF\xBB\xBF", 3) == 0) + { + // remove byte-order mark from the beginning of the string + to_append_size -= 3; + to_append += 3; + } + + if (to_append_size < 2 || ::memcmp(to_append, "#!", 2) != 0) + { + // doesn't start with shebang + return output; + } + + output.append(to_append, to_append_size); + } + + file.read_to_end_suffix(output, ec, buffer, buffer_size, this_read); + return output; + } + virtual ExpectedL> read_lines(const Path& file_path) const override { StatsTimer t(g_us_filesystem_stats); @@ -2225,6 +2323,7 @@ namespace vcpkg { output.on_data({buffer, this_read}); } + else if ((ec = file.error())) { return format_filesystem_call_error(ec, "read_lines_read", {file_path}); diff --git a/src/vcpkg/postbuildlint.cpp b/src/vcpkg/postbuildlint.cpp index b3d26a057b..3effd2326e 100644 --- a/src/vcpkg/postbuildlint.cpp +++ b/src/vcpkg/postbuildlint.cpp @@ -1245,19 +1245,10 @@ namespace vcpkg if (extension.empty()) { - std::error_code ec; - ReadFilePointer read_file(file, ec); - if (ec) return false; - char buffer[5]; - if (read_file.read(buffer, 1, sizeof(buffer)) < sizeof(buffer)) return false; - if (Strings::starts_with(StringView(buffer, sizeof(buffer)), "#!") || - Strings::starts_with(StringView(buffer, sizeof(buffer)), "\xEF\xBB\xBF#!") /* ignore byte-order mark */) - { - const auto contents = fs.read_contents(file, IgnoreErrors{}); - return Strings::contains_any_ignoring_hash_comments(contents, searcher_paths); - } - return false; + const auto contents = fs.best_effort_read_contents_if_shebang(file); + return Strings::contains_any_ignoring_hash_comments(contents, searcher_paths); } + return false; } @@ -1287,22 +1278,25 @@ namespace vcpkg string_paths, [](std::string& s) { return Strings::boyer_moore_horspool_searcher(s.begin(), s.end()); }); std::vector failing_files; - std::mutex mtx; - auto files = fs.get_regular_files_recursive(dir, IgnoreErrors{}); + { + std::mutex mtx; + auto files = fs.get_regular_files_recursive(dir, IgnoreErrors{}); - parallel_for_each(files, [&](const Path& file) { - if (file_contains_absolute_paths(fs, file, searcher_paths)) - { - std::lock_guard lock{mtx}; - failing_files.push_back(file); - } - }); + parallel_for_each(files, [&](const Path& file) { + if (file_contains_absolute_paths(fs, file, searcher_paths)) + { + std::lock_guard lock{mtx}; + failing_files.push_back(file); + } + }); + } // destroy mtx if (failing_files.empty()) { return LintStatus::SUCCESS; } + Util::sort(failing_files); auto error_message = msg::format(msgFilesContainAbsolutePath1); for (auto&& absolute_path : absolute_paths) {