Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Implement redirection without tee and rotation on windows #49906

Open
wants to merge 32 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
dcc508b
redirection utils
dentiny Jan 13, 2025
10f30e4
add windows open
dentiny Jan 15, 2025
893defc
cooperative destruction
dentiny Jan 15, 2025
6e11142
capsulate synchronization logic inside of RedirectionFileHandle
dentiny Jan 15, 2025
1d3f168
Merge branch 'master' into hjiang/redirection-util
dentiny Jan 15, 2025
b1a9db1
single close API to synchronize
dentiny Jan 15, 2025
6e1b198
combine close and flush
dentiny Jan 15, 2025
072f52a
RAY_CHECK for windows syscall
dentiny Jan 15, 2025
d15af0b
doc
dentiny Jan 15, 2025
896a895
fix windows
dentiny Jan 16, 2025
a1f51fc
fix windows
dentiny Jan 16, 2025
84d00a8
fix windows
dentiny Jan 16, 2025
e20fa51
format
dentiny Jan 16, 2025
32130e7
fix windowes
dentiny Jan 16, 2025
ff64321
linux implementation for tee
dentiny Jan 16, 2025
b05e10c
make windows compile
dentiny Jan 16, 2025
6704a45
rename
dentiny Jan 16, 2025
ee2d500
add test for more combinations
dentiny Jan 16, 2025
5a3176d
rename log option
dentiny Jan 16, 2025
ad7b292
capture stdout/stderr in test
dentiny Jan 16, 2025
c51129f
cleanup unused windows function
dentiny Jan 16, 2025
0aea412
set thread name
dentiny Jan 16, 2025
c8334ee
invalid_fd to save code
dentiny Jan 16, 2025
6e0bc8b
internalize stream fd
dentiny Jan 16, 2025
afaa709
fix build
dentiny Jan 16, 2025
e61b9e2
rename log -> stream
dentiny Jan 16, 2025
15c5372
fix windows build
dentiny Jan 17, 2025
db56818
stream redirection on windows
dentiny Jan 17, 2025
59cdf29
Merge branch 'master' into hjiang/windows-redirection
dentiny Jan 17, 2025
f1b15c6
fix build
dentiny Jan 17, 2025
b668f3a
fix windows test utils
dentiny Jan 17, 2025
80220aa
use STL for cross platform
dentiny Jan 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/ray/util/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "ray/util/filesystem.h"

#include <cstdlib>
#include <fstream>

#include "ray/util/logging.h"

Expand Down Expand Up @@ -60,4 +61,18 @@ std::string GetUserTempDir() {
return result;
}

std::string CompleteReadFile(const std::string &fname) {
std::ifstream file(fname);
RAY_CHECK(file.good()) << "Fails to open file " << fname;

std::ostringstream buffer;
buffer << file.rdbuf();
RAY_CHECK(file.good()) << "Fails to read from file " << fname;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reading, the file should be in the eof state which is not good()?


std::string content = buffer.str();
file.close();

return content;
}

} // namespace ray
5 changes: 5 additions & 0 deletions src/ray/util/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ std::string JoinPaths(std::string base, const Paths &...components) {
(join(base, std::string_view(components)), ...);
return base;
}

// Read the whole content for the given [fname], and return as string.
// If any error happens, throw exception.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is exception thrown?

std::string CompleteReadFile(const std::string &fname);

} // namespace ray
31 changes: 30 additions & 1 deletion src/ray/util/pipe_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,33 @@ RedirectionFileHandle OpenFileForRedirection(const std::string &file_path) {

return RedirectionFileHandle{fd, std::move(flush_fn), std::move(close_fn)};
}
#elif defined(_WIN32)
#include <windows.h>
RedirectionFileHandle OpenFileForRedirection(const std::string &file_path) {
HANDLE file_handle = CreateFile(file_path.c_str(),
GENERIC_WRITE,
0, // No sharing
NULL, // Default security attributes
CREATE_ALWAYS, // Always create a new file
FILE_ATTRIBUTE_NORMAL, // Normal file attributes
NULL // No template file
);
RAY_CHECK(file_handle != INVALID_HANDLE_VALUE)
<< "Fails to open file " << file_path << " with error "
<< std::to_string(GetLastError());

auto flush_fn = [file_handle]() {
RAY_CHECK(FlushFileBuffers(file_handle))
<< "Failed to flush data to disk with error: " << std::to_string(GetLastError());
};
auto close_fn = [file_handle]() {
RAY_CHECK(FlushFileBuffers(file_handle))
<< "Failed to flush data to disk with error: " << std::to_string(GetLastError());
RAY_CHECK(CloseHandle(file_handle))
<< "Failed to close file with error: " << std::to_string(GetLastError());
};
return RedirectionFileHandle{file_handle, std::move(flush_fn), std::move(close_fn)};
}
#endif

} // namespace
Expand Down Expand Up @@ -340,7 +367,9 @@ RedirectionFileHandle CreateRedirectionFileHandle(
#elif defined(_WIN32)
RedirectionFileHandle CreateRedirectionFileHandle(
const StreamRedirectionOption &stream_redirect_opt) {
return RedirectionFileHandle{};
// TODO(hjiang): For windows, we currently doesn't support redirection with rotation and
// tee to stdout/stderr.
return OpenFileForRedirection(stream_redirect_opt.file_path);
}
#endif

Expand Down
23 changes: 12 additions & 11 deletions src/ray/util/stream_redirection_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ namespace {
#if defined(__APPLE__) || defined(__linux__)
int GetStdoutHandle() { return STDOUT_FILENO; }
int GetStderrHandle() { return STDERR_FILENO; }
#elif defined(_WIN32)
int GetStdoutHandle() { return _fileno(stdout); }
int GetStderrHandle() { return _fileno(stderr); }
#endif

// TODO(hjiang): Revisit later, should be able to save some heap alllocation with
// TODO(hjiang): Revisit later, should be able to save some heap allocation with
// absl::InlinedVector.
//
// Maps from original stream file handle (i.e. stdout/stderr) to its stream redirector.
Expand All @@ -52,7 +55,6 @@ void SyncOnStreamRedirection() {
}
}

#if defined(__APPLE__) || defined(__linux__)
// Redirect the given [stream_fd] based on the specified option.
void RedirectStream(int stream_fd, const StreamRedirectionOption &opt) {
std::call_once(stream_exit_once_flag, []() {
Expand All @@ -61,14 +63,21 @@ void RedirectStream(int stream_fd, const StreamRedirectionOption &opt) {
});

RedirectionFileHandle handle = CreateRedirectionFileHandle(opt);

#if defined(__APPLE__) || defined(__linux__)
RAY_CHECK_NE(dup2(handle.GetWriteHandle(), stream_fd), -1)
<< "Fails to duplicate file descritor " << strerror(errno);
#elif defined(_WIN32)
int pipe_write_fd =
_open_osfhandle(reinterpret_cast<intptr_t>(handle.GetWriteHandle()), _O_WRONLY);
RAY_CHECK_NE(_dup2(pipe_write_fd, stream_fd), -1)
<< "Fails to duplicate file descritor.";
#endif

const bool is_new =
redirection_file_handles.emplace(stream_fd, std::move(handle)).second;
RAY_CHECK(is_new) << "Redirection has been register for stream " << stream_fd;
}
#endif

void FlushOnRedirectedStream(int stream_fd) {
auto iter = redirection_file_handles.find(stream_fd);
Expand All @@ -79,7 +88,6 @@ void FlushOnRedirectedStream(int stream_fd) {

} // namespace

#if defined(__APPLE__) || defined(__linux__)
void RedirectStdout(const StreamRedirectionOption &opt) {
RedirectStream(GetStdoutHandle(), opt);
}
Expand All @@ -89,11 +97,4 @@ void RedirectStderr(const StreamRedirectionOption &opt) {
void FlushOnRedirectedStdout() { FlushOnRedirectedStream(GetStdoutHandle()); }
void FlushOnRedirectedStderr() { FlushOnRedirectedStream(GetStderrHandle()); }

#elif defined(_WIN32)
void RedirectStdout(const StreamRedirectionOption &opt) { return; }
void RedirectStderr(const StreamRedirectionOption &opt) { return; }
void FlushOnRedirectedStdout() { return; }
void FlushOnRedirectedStderr() { return; }
#endif

} // namespace ray
15 changes: 1 addition & 14 deletions src/ray/util/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -212,22 +212,10 @@ ray_cc_test(
tags = ["team:core"],
)

ray_cc_library(
name = "unix_test_utils",
hdrs = ["unix_test_utils.h"],
srcs = ["unix_test_utils.cc"],
deps = [
"//src/ray/util",
"@boost//:scope_exit",
],
testonly = True,
)

ray_cc_test(
name = "pipe_logger_test",
srcs = ["pipe_logger_test.cc"],
deps = [
":unix_test_utils",
"//src/ray/util",
"//src/ray/util:pipe_logger",
"@com_google_googletest//:gtest_main",
Expand All @@ -240,9 +228,8 @@ ray_cc_test(
name = "stream_redirection_utils_test",
srcs = ["stream_redirection_utils_test.cc"],
deps = [
"//src/ray/util:stream_redirection_utils",
":unix_test_utils",
"//src/ray/util",
"//src/ray/util:stream_redirection_utils",
"@com_google_googletest//:gtest_main",
],
size = "small",
Expand Down
63 changes: 59 additions & 4 deletions src/ray/util/tests/pipe_logger_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#if defined(__APPLE__) || defined(__linux__)

#include "ray/util/pipe_logger.h"

#include <gtest/gtest.h>
Expand All @@ -23,9 +21,13 @@
#include <future>
#include <string_view>

#include "ray/util/tests/unix_test_utils.h"
#include "ray/util/filesystem.h"
#include "ray/util/util.h"

#if defined(__APPLE__) || defined(__linux__)

#include <unistd.h>

namespace ray {

namespace {
Expand Down Expand Up @@ -105,7 +107,29 @@ TEST_P(PipeLoggerTest, PipeWrite) {

INSTANTIATE_TEST_SUITE_P(PipeLoggerTest, PipeLoggerTest, testing::Values(1024, 3));

// TODO(hjiang): Add more test cases on different combinations.
TEST(PipeLoggerTestWithTee, RedirectionWithNoTeeAndRotation) {
// TODO(core): We should have a better test util, which allows us to create a temporary
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// TODO(core): We should have a better test util, which allows us to create a temporary
// TODO(hjiang): We should have a better test util, which allows us to create a temporary

// testing directory.
const std::string test_file_path = absl::StrFormat("%s.out", GenerateUUIDV4());

StreamRedirectionOption logging_option{};
logging_option.file_path = test_file_path;

auto log_token = CreateRedirectionFileHandle(logging_option);
ASSERT_EQ(write(log_token.GetWriteHandle(), kLogLine1.data(), kLogLine1.length()),
kLogLine1.length());
ASSERT_EQ(write(log_token.GetWriteHandle(), kLogLine2.data(), kLogLine2.length()),
kLogLine2.length());
log_token.Close();

// Check log content after completion.
EXPECT_EQ(CompleteReadFile(test_file_path),
absl::StrFormat("%s%s", kLogLine1, kLogLine2));

// Delete temporary file.
EXPECT_EQ(unlink(test_file_path.data()), 0);
}

TEST(PipeLoggerTestWithTee, RedirectionWithTee) {
// TODO(core): We should have a better test util, which allows us to create a temporary
// testing directory.
Expand Down Expand Up @@ -178,4 +202,35 @@ TEST(PipeLoggerTestWithTee, RotatedRedirectionWithTee) {

} // namespace ray

#elif defined(_WIN32)

#include <windows.h>

#include "ray/util/tests/windows_test_utils.h"

namespace ray {

TEST(PipeLoggerTestWithTee, RedirectionWithNoTeeAndRotation) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to write the same test twice? I think you can write one that's runnable in both linux and windows

const std::string test_file_path = absl::StrFormat("%s.out", GenerateUUIDV4());

StreamRedirectionOption logging_option{};
logging_option.file_path = test_file_path;

auto log_token = CreateRedirectionFileHandle(logging_option);
ASSERT_EQ(write(log_token.GetWriteHandle(), kLogLine1.data(), kLogLine1.length()),
kLogLine1.length());
ASSERT_EQ(write(log_token.GetWriteHandle(), kLogLine2.data(), kLogLine2.length()),
kLogLine2.length());
log_token.Close();

// Check log content after completion.
EXPECT_EQ(CompleteReadFile(test_file_path),
absl::StrFormat("%s%s", kLogLine1, kLogLine2));

// Delete temporary file.
EXPECT_EQ(DeleteFile(test_file_path.c_str()), TRUE);
}

} // namespace ray

#endif
2 changes: 1 addition & 1 deletion src/ray/util/tests/stream_redirection_utils_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include <iostream>
#include <thread>

#include "ray/util/tests/unix_test_utils.h"
#include "ray/util/filesystem.h"
#include "ray/util/util.h"

namespace ray {
Expand Down
46 changes: 0 additions & 46 deletions src/ray/util/tests/unix_test_utils.cc

This file was deleted.

31 changes: 0 additions & 31 deletions src/ray/util/tests/unix_test_utils.h

This file was deleted.

Loading