From b73f0a84bdcabb75dbb7ebde64fb628d20b3776f Mon Sep 17 00:00:00 2001 From: Riley W Date: Thu, 6 Mar 2025 14:42:39 +0800 Subject: [PATCH] Dev/x11 refactor (#421) Send SIGKILL if users call ccancel. Remove useless codes. Refactor Xauth executing. Refactor logs. Fix xauth folder permission error. Fix bugs. Add x11 forwarding option for backend. Add log for where TaskCompletionAckReply is not sent. Fix bugs and x11 barely runs. Fix bugs and x11 runs successfully. Finish coding. Start debugging. Add subprocess.h. Improve Xauthority file generation and add deletion for it. feat: x11 socket listen and x11 forward Add Xauthority file generation. Add Display environment variable and import subprocess library. Merge the proto changes from front end. Fix errors from rebasing. Remove useless headers. --- dependencies/pre_installed/CMakeLists.txt | 1 + .../pre_installed/subprocess/CMakeLists.txt | 2 + .../include/subprocess/subprocess.h | 1203 +++++++++++++++++ protos/Crane.proto | 27 + protos/CraneSubprocess.proto | 1 + protos/PublicDefs.proto | 16 +- src/CraneCtld/CranedKeeper.cpp | 16 +- src/CraneCtld/CtldGrpcServer.cpp | 19 +- src/CraneCtld/CtldPublicDefs.h | 27 +- src/Craned/CMakeLists.txt | 6 +- src/Craned/CforedClient.cpp | 298 +++- src/Craned/CforedClient.h | 58 +- src/Craned/CranedPreCompiledHeader.h | 6 + src/Craned/TaskManager.cpp | 227 +++- src/Craned/TaskManager.h | 29 +- src/Utilities/PublicHeader/OS.cpp | 35 + src/Utilities/PublicHeader/include/crane/OS.h | 3 + 17 files changed, 1804 insertions(+), 170 deletions(-) create mode 100644 dependencies/pre_installed/subprocess/CMakeLists.txt create mode 100644 dependencies/pre_installed/subprocess/include/subprocess/subprocess.h diff --git a/dependencies/pre_installed/CMakeLists.txt b/dependencies/pre_installed/CMakeLists.txt index 7b8fcee6d..c4b93f3a5 100644 --- a/dependencies/pre_installed/CMakeLists.txt +++ b/dependencies/pre_installed/CMakeLists.txt @@ -3,6 +3,7 @@ add_compile_options(-w) add_subdirectory(concurrentqueue) add_subdirectory(pevents-1.22.11) +add_subdirectory(subprocess) include(${CMAKE_SOURCE_DIR}/CMakeModule/SuppressHeaderWarning.cmake) suppress_header_warning() diff --git a/dependencies/pre_installed/subprocess/CMakeLists.txt b/dependencies/pre_installed/subprocess/CMakeLists.txt new file mode 100644 index 000000000..00c99cb22 --- /dev/null +++ b/dependencies/pre_installed/subprocess/CMakeLists.txt @@ -0,0 +1,2 @@ +add_library(subprocess INTERFACE) +target_include_directories(subprocess INTERFACE include) \ No newline at end of file diff --git a/dependencies/pre_installed/subprocess/include/subprocess/subprocess.h b/dependencies/pre_installed/subprocess/include/subprocess/subprocess.h new file mode 100644 index 000000000..3e40bae04 --- /dev/null +++ b/dependencies/pre_installed/subprocess/include/subprocess/subprocess.h @@ -0,0 +1,1203 @@ +/* + The latest version of this library is available on GitHub; + https://github.com/sheredom/subprocess.h +*/ + +/* + This is free and unencumbered software released into the public domain. + + Anyone is free to copy, modify, publish, use, compile, sell, or + distribute this software, either in source code form or as a compiled + binary, for any purpose, commercial or non-commercial, and by any + means. + + In jurisdictions that recognize copyright laws, the author or authors + of this software dedicate any and all copyright interest in the + software to the public domain. We make this dedication for the benefit + of the public at large and to the detriment of our heirs and + successors. We intend this dedication to be an overt act of + relinquishment in perpetuity of all present and future rights to this + software under copyright law. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR + OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, + ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + OTHER DEALINGS IN THE SOFTWARE. + + For more information, please refer to +*/ + +#ifndef SHEREDOM_SUBPROCESS_H_INCLUDED +#define SHEREDOM_SUBPROCESS_H_INCLUDED + +#if defined(_MSC_VER) +#pragma warning(push, 1) + +/* disable warning: '__cplusplus' is not defined as a preprocessor macro, + * replacing with '0' for '#if/#elif' */ +#pragma warning(disable : 4668) +#endif + +#include +#include + +#if defined(_MSC_VER) +#pragma warning(pop) +#endif + +#if defined(__TINYC__) +#define SUBPROCESS_ATTRIBUTE(a) __attribute((a)) +#else +#define SUBPROCESS_ATTRIBUTE(a) __attribute__((a)) +#endif + +#if defined(_MSC_VER) +#define subprocess_pure +#define subprocess_weak __inline +#define subprocess_tls __declspec(thread) +#elif defined(__MINGW32__) +#define subprocess_pure SUBPROCESS_ATTRIBUTE(pure) +#define subprocess_weak static SUBPROCESS_ATTRIBUTE(used) +#define subprocess_tls __thread +#elif defined(__clang__) || defined(__GNUC__) || defined(__TINYC__) +#define subprocess_pure SUBPROCESS_ATTRIBUTE(pure) +#define subprocess_weak SUBPROCESS_ATTRIBUTE(weak) +#define subprocess_tls __thread +#else +#error Non clang, non gcc, non MSVC compiler found! +#endif + +struct subprocess_s; + +enum subprocess_option_e { + // stdout and stderr are the same FILE. + subprocess_option_combined_stdout_stderr = 0x1, + + // The child process should inherit the environment variables of the parent. + subprocess_option_inherit_environment = 0x2, + + // Enable asynchronous reading of stdout/stderr before it has completed. + subprocess_option_enable_async = 0x4, + + // Enable the child process to be spawned with no window visible if supported + // by the platform. + subprocess_option_no_window = 0x8, + + // Search for program names in the PATH variable. Always enabled on Windows. + // Note: this will **not** search for paths in any provided custom environment + // and instead uses the PATH of the spawning process. + subprocess_option_search_user_path = 0x10 +}; + +#if defined(__cplusplus) +extern "C" { +#endif + +/// @brief Create a process. +/// @param command_line An array of strings for the command line to execute for +/// this process. The last element must be NULL to signify the end of the array. +/// The memory backing this parameter only needs to persist until this function +/// returns. +/// @param options A bit field of subprocess_option_e's to pass. +/// @param out_process The newly created process. +/// @return On success zero is returned. +subprocess_weak int subprocess_create(const char *const command_line[], + int options, + struct subprocess_s *const out_process); + +/// @brief Create a process (extended create). +/// @param command_line An array of strings for the command line to execute for +/// this process. The last element must be NULL to signify the end of the array. +/// The memory backing this parameter only needs to persist until this function +/// returns. +/// @param options A bit field of subprocess_option_e's to pass. +/// @param environment An optional array of strings for the environment to use +/// for a child process (each element of the form FOO=BAR). The last element +/// must be NULL to signify the end of the array. +/// @param out_process The newly created process. +/// @return On success zero is returned. +/// +/// If `options` contains `subprocess_option_inherit_environment`, then +/// `environment` must be NULL. +subprocess_weak int +subprocess_create_ex(const char *const command_line[], int options, + const char *const environment[], + struct subprocess_s *const out_process); + +/// @brief Get the standard input file for a process. +/// @param process The process to query. +/// @return The file for standard input of the process. +/// +/// The file returned can be written to by the parent process to feed data to +/// the standard input of the process. +subprocess_pure subprocess_weak FILE * +subprocess_stdin(const struct subprocess_s *const process); + +/// @brief Get the standard output file for a process. +/// @param process The process to query. +/// @return The file for standard output of the process. +/// +/// The file returned can be read from by the parent process to read data from +/// the standard output of the child process. +subprocess_pure subprocess_weak FILE * +subprocess_stdout(const struct subprocess_s *const process); + +/// @brief Get the standard error file for a process. +/// @param process The process to query. +/// @return The file for standard error of the process. +/// +/// The file returned can be read from by the parent process to read data from +/// the standard error of the child process. +/// +/// If the process was created with the subprocess_option_combined_stdout_stderr +/// option bit set, this function will return NULL, and the subprocess_stdout +/// function should be used for both the standard output and error combined. +subprocess_pure subprocess_weak FILE * +subprocess_stderr(const struct subprocess_s *const process); + +/// @brief Wait for a process to finish execution. +/// @param process The process to wait for. +/// @param out_return_code The return code of the returned process (can be +/// NULL). +/// @return On success zero is returned. +/// +/// Joining a process will close the stdin pipe to the process. +subprocess_weak int subprocess_join(struct subprocess_s *const process, + int *const out_return_code); + +/// @brief Destroy a previously created process. +/// @param process The process to destroy. +/// @return On success zero is returned. +/// +/// If the process to be destroyed had not finished execution, it may out live +/// the parent process. +subprocess_weak int subprocess_destroy(struct subprocess_s *const process); + +/// @brief Terminate a previously created process. +/// @param process The process to terminate. +/// @return On success zero is returned. +/// +/// If the process to be destroyed had not finished execution, it will be +/// terminated (i.e killed). +subprocess_weak int subprocess_terminate(struct subprocess_s *const process); + +/// @brief Read the standard output from the child process. +/// @param process The process to read from. +/// @param buffer The buffer to read into. +/// @param size The maximum number of bytes to read. +/// @return The number of bytes actually read into buffer. Can only be 0 if the +/// process has complete. +/// +/// The only safe way to read from the standard output of a process during it's +/// execution is to use the `subprocess_option_enable_async` option in +/// conjunction with this method. +subprocess_weak unsigned +subprocess_read_stdout(struct subprocess_s *const process, char *const buffer, + unsigned size); + +/// @brief Read the standard error from the child process. +/// @param process The process to read from. +/// @param buffer The buffer to read into. +/// @param size The maximum number of bytes to read. +/// @return The number of bytes actually read into buffer. Can only be 0 if the +/// process has complete. +/// +/// The only safe way to read from the standard error of a process during it's +/// execution is to use the `subprocess_option_enable_async` option in +/// conjunction with this method. +subprocess_weak unsigned +subprocess_read_stderr(struct subprocess_s *const process, char *const buffer, + unsigned size); + +/// @brief Returns if the subprocess is currently still alive and executing. +/// @param process The process to check. +/// @return If the process is still alive non-zero is returned. +subprocess_weak int subprocess_alive(struct subprocess_s *const process); + +#if defined(__cplusplus) +#define SUBPROCESS_CAST(type, x) static_cast(x) +#define SUBPROCESS_PTR_CAST(type, x) reinterpret_cast(x) +#define SUBPROCESS_CONST_CAST(type, x) const_cast(x) +#define SUBPROCESS_NULL NULL +#else +#define SUBPROCESS_CAST(type, x) ((type)(x)) +#define SUBPROCESS_PTR_CAST(type, x) ((type)(x)) +#define SUBPROCESS_CONST_CAST(type, x) ((type)(x)) +#define SUBPROCESS_NULL 0 +#endif + +#if !defined(_WIN32) +#include +#include +#include +#include +#include +#include +#endif + +#if defined(_WIN32) + +#if (_MSC_VER < 1920) +#ifdef _WIN64 +typedef __int64 subprocess_intptr_t; +typedef unsigned __int64 subprocess_size_t; +#else +typedef int subprocess_intptr_t; +typedef unsigned int subprocess_size_t; +#endif +#else +#include + +typedef intptr_t subprocess_intptr_t; +typedef size_t subprocess_size_t; +#endif + +#ifdef __clang__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wreserved-identifier" +#endif + +typedef struct _PROCESS_INFORMATION *LPPROCESS_INFORMATION; +typedef struct _SECURITY_ATTRIBUTES *LPSECURITY_ATTRIBUTES; +typedef struct _STARTUPINFOA *LPSTARTUPINFOA; +typedef struct _OVERLAPPED *LPOVERLAPPED; + +#ifdef __clang__ +#pragma clang diagnostic pop +#endif + +#ifdef _MSC_VER +#pragma warning(push, 1) +#endif +#ifdef __MINGW32__ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wpedantic" +#endif + +struct subprocess_subprocess_information_s { + void *hProcess; + void *hThread; + unsigned long dwProcessId; + unsigned long dwThreadId; +}; + +struct subprocess_security_attributes_s { + unsigned long nLength; + void *lpSecurityDescriptor; + int bInheritHandle; +}; + +struct subprocess_startup_info_s { + unsigned long cb; + char *lpReserved; + char *lpDesktop; + char *lpTitle; + unsigned long dwX; + unsigned long dwY; + unsigned long dwXSize; + unsigned long dwYSize; + unsigned long dwXCountChars; + unsigned long dwYCountChars; + unsigned long dwFillAttribute; + unsigned long dwFlags; + unsigned short wShowWindow; + unsigned short cbReserved2; + unsigned char *lpReserved2; + void *hStdInput; + void *hStdOutput; + void *hStdError; +}; + +struct subprocess_overlapped_s { + uintptr_t Internal; + uintptr_t InternalHigh; + union { + struct { + unsigned long Offset; + unsigned long OffsetHigh; + } DUMMYSTRUCTNAME; + void *Pointer; + } DUMMYUNIONNAME; + + void *hEvent; +}; + +#ifdef __MINGW32__ +#pragma GCC diagnostic pop +#endif +#ifdef _MSC_VER +#pragma warning(pop) +#endif + +__declspec(dllimport) unsigned long __stdcall GetLastError(void); +__declspec(dllimport) int __stdcall SetHandleInformation(void *, unsigned long, + unsigned long); +__declspec(dllimport) int __stdcall CreatePipe(void **, void **, + LPSECURITY_ATTRIBUTES, + unsigned long); +__declspec(dllimport) void *__stdcall CreateNamedPipeA( + const char *, unsigned long, unsigned long, unsigned long, unsigned long, + unsigned long, unsigned long, LPSECURITY_ATTRIBUTES); +__declspec(dllimport) int __stdcall ReadFile(void *, void *, unsigned long, + unsigned long *, LPOVERLAPPED); +__declspec(dllimport) unsigned long __stdcall GetCurrentProcessId(void); +__declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void); +__declspec(dllimport) void *__stdcall CreateFileA(const char *, unsigned long, + unsigned long, + LPSECURITY_ATTRIBUTES, + unsigned long, unsigned long, + void *); +__declspec(dllimport) void *__stdcall CreateEventA(LPSECURITY_ATTRIBUTES, int, + int, const char *); +__declspec(dllimport) int __stdcall CreateProcessA( + const char *, char *, LPSECURITY_ATTRIBUTES, LPSECURITY_ATTRIBUTES, int, + unsigned long, void *, const char *, LPSTARTUPINFOA, LPPROCESS_INFORMATION); +__declspec(dllimport) int __stdcall CloseHandle(void *); +__declspec(dllimport) unsigned long __stdcall WaitForSingleObject( + void *, unsigned long); +__declspec(dllimport) int __stdcall GetExitCodeProcess( + void *, unsigned long *lpExitCode); +__declspec(dllimport) int __stdcall TerminateProcess(void *, unsigned int); +__declspec(dllimport) unsigned long __stdcall WaitForMultipleObjects( + unsigned long, void *const *, int, unsigned long); +__declspec(dllimport) int __stdcall GetOverlappedResult(void *, LPOVERLAPPED, + unsigned long *, int); + +#if defined(_DLL) +#define SUBPROCESS_DLLIMPORT __declspec(dllimport) +#else +#define SUBPROCESS_DLLIMPORT +#endif + +#ifdef __clang__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wreserved-identifier" +#endif + +SUBPROCESS_DLLIMPORT int __cdecl _fileno(FILE *); +SUBPROCESS_DLLIMPORT int __cdecl _open_osfhandle(subprocess_intptr_t, int); +SUBPROCESS_DLLIMPORT subprocess_intptr_t __cdecl _get_osfhandle(int); + +#ifndef __MINGW32__ +void *__cdecl _alloca(subprocess_size_t); +#else +#include +#endif + +#ifdef __clang__ +#pragma clang diagnostic pop +#endif + +#else +typedef size_t subprocess_size_t; +#endif + +#ifdef __clang__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wpadded" +#endif +struct subprocess_s { + FILE *stdin_file; + FILE *stdout_file; + FILE *stderr_file; + +#if defined(_WIN32) + void *hProcess; + void *hStdInput; + void *hEventOutput; + void *hEventError; +#else + pid_t child; + int return_status; +#endif + + subprocess_size_t alive; +}; +#ifdef __clang__ +#pragma clang diagnostic pop +#endif + +#if defined(__clang__) +#if __has_warning("-Wunsafe-buffer-usage") +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wunsafe-buffer-usage" +#endif +#endif + +#if defined(_WIN32) +subprocess_weak int subprocess_create_named_pipe_helper(void **rd, void **wr); +int subprocess_create_named_pipe_helper(void **rd, void **wr) { + const unsigned long pipeAccessInbound = 0x00000001; + const unsigned long fileFlagOverlapped = 0x40000000; + const unsigned long pipeTypeByte = 0x00000000; + const unsigned long pipeWait = 0x00000000; + const unsigned long genericWrite = 0x40000000; + const unsigned long openExisting = 3; + const unsigned long fileAttributeNormal = 0x00000080; + const void *const invalidHandleValue = + SUBPROCESS_PTR_CAST(void *, ~(SUBPROCESS_CAST(subprocess_intptr_t, 0))); + struct subprocess_security_attributes_s saAttr = {sizeof(saAttr), + SUBPROCESS_NULL, 1}; + char name[256] = {0}; + static subprocess_tls long index = 0; + const long unique = index++; + +#if defined(_MSC_VER) && _MSC_VER < 1900 +#pragma warning(push, 1) +#pragma warning(disable : 4996) + _snprintf(name, sizeof(name) - 1, + "\\\\.\\pipe\\sheredom_subprocess_h.%08lx.%08lx.%ld", + GetCurrentProcessId(), GetCurrentThreadId(), unique); +#pragma warning(pop) +#else + snprintf(name, sizeof(name) - 1, + "\\\\.\\pipe\\sheredom_subprocess_h.%08lx.%08lx.%ld", + GetCurrentProcessId(), GetCurrentThreadId(), unique); +#endif + + *rd = + CreateNamedPipeA(name, pipeAccessInbound | fileFlagOverlapped, + pipeTypeByte | pipeWait, 1, 4096, 4096, SUBPROCESS_NULL, + SUBPROCESS_PTR_CAST(LPSECURITY_ATTRIBUTES, &saAttr)); + + if (invalidHandleValue == *rd) { + return -1; + } + + *wr = CreateFileA(name, genericWrite, SUBPROCESS_NULL, + SUBPROCESS_PTR_CAST(LPSECURITY_ATTRIBUTES, &saAttr), + openExisting, fileAttributeNormal, SUBPROCESS_NULL); + + if (invalidHandleValue == *wr) { + return -1; + } + + return 0; +} +#endif + +int subprocess_create(const char *const commandLine[], int options, + struct subprocess_s *const out_process) { + return subprocess_create_ex(commandLine, options, SUBPROCESS_NULL, + out_process); +} + +int subprocess_create_ex(const char *const commandLine[], int options, + const char *const environment[], + struct subprocess_s *const out_process) { +#if defined(_WIN32) + int fd; + void *rd, *wr; + char *commandLineCombined; + subprocess_size_t len; + int i, j; + int need_quoting; + unsigned long flags = 0; + const unsigned long startFUseStdHandles = 0x00000100; + const unsigned long handleFlagInherit = 0x00000001; + const unsigned long createNoWindow = 0x08000000; + struct subprocess_subprocess_information_s processInfo; + struct subprocess_security_attributes_s saAttr = {sizeof(saAttr), + SUBPROCESS_NULL, 1}; + char *used_environment = SUBPROCESS_NULL; + struct subprocess_startup_info_s startInfo = {0, + SUBPROCESS_NULL, + SUBPROCESS_NULL, + SUBPROCESS_NULL, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + SUBPROCESS_NULL, + SUBPROCESS_NULL, + SUBPROCESS_NULL, + SUBPROCESS_NULL}; + + startInfo.cb = sizeof(startInfo); + startInfo.dwFlags = startFUseStdHandles; + + if (subprocess_option_no_window == (options & subprocess_option_no_window)) { + flags |= createNoWindow; + } + + if (subprocess_option_inherit_environment != + (options & subprocess_option_inherit_environment)) { + if (SUBPROCESS_NULL == environment) { + used_environment = SUBPROCESS_CONST_CAST(char *, "\0\0"); + } else { + // We always end with two null terminators. + len = 2; + + for (i = 0; environment[i]; i++) { + for (j = 0; '\0' != environment[i][j]; j++) { + len++; + } + + // For the null terminator too. + len++; + } + + used_environment = SUBPROCESS_CAST(char *, _alloca(len)); + + // Re-use len for the insertion position + len = 0; + + for (i = 0; environment[i]; i++) { + for (j = 0; '\0' != environment[i][j]; j++) { + used_environment[len++] = environment[i][j]; + } + + used_environment[len++] = '\0'; + } + + // End with the two null terminators. + used_environment[len++] = '\0'; + used_environment[len++] = '\0'; + } + } else { + if (SUBPROCESS_NULL != environment) { + return -1; + } + } + + if (!CreatePipe(&rd, &wr, SUBPROCESS_PTR_CAST(LPSECURITY_ATTRIBUTES, &saAttr), + 0)) { + return -1; + } + + if (!SetHandleInformation(wr, handleFlagInherit, 0)) { + return -1; + } + + fd = _open_osfhandle(SUBPROCESS_PTR_CAST(subprocess_intptr_t, wr), 0); + + if (-1 != fd) { + out_process->stdin_file = _fdopen(fd, "wb"); + + if (SUBPROCESS_NULL == out_process->stdin_file) { + return -1; + } + } + + startInfo.hStdInput = rd; + + if (options & subprocess_option_enable_async) { + if (subprocess_create_named_pipe_helper(&rd, &wr)) { + return -1; + } + } else { + if (!CreatePipe(&rd, &wr, + SUBPROCESS_PTR_CAST(LPSECURITY_ATTRIBUTES, &saAttr), 0)) { + return -1; + } + } + + if (!SetHandleInformation(rd, handleFlagInherit, 0)) { + return -1; + } + + fd = _open_osfhandle(SUBPROCESS_PTR_CAST(subprocess_intptr_t, rd), 0); + + if (-1 != fd) { + out_process->stdout_file = _fdopen(fd, "rb"); + + if (SUBPROCESS_NULL == out_process->stdout_file) { + return -1; + } + } + + startInfo.hStdOutput = wr; + + if (subprocess_option_combined_stdout_stderr == + (options & subprocess_option_combined_stdout_stderr)) { + out_process->stderr_file = out_process->stdout_file; + startInfo.hStdError = startInfo.hStdOutput; + } else { + if (options & subprocess_option_enable_async) { + if (subprocess_create_named_pipe_helper(&rd, &wr)) { + return -1; + } + } else { + if (!CreatePipe(&rd, &wr, + SUBPROCESS_PTR_CAST(LPSECURITY_ATTRIBUTES, &saAttr), 0)) { + return -1; + } + } + + if (!SetHandleInformation(rd, handleFlagInherit, 0)) { + return -1; + } + + fd = _open_osfhandle(SUBPROCESS_PTR_CAST(subprocess_intptr_t, rd), 0); + + if (-1 != fd) { + out_process->stderr_file = _fdopen(fd, "rb"); + + if (SUBPROCESS_NULL == out_process->stderr_file) { + return -1; + } + } + + startInfo.hStdError = wr; + } + + if (options & subprocess_option_enable_async) { + out_process->hEventOutput = + CreateEventA(SUBPROCESS_PTR_CAST(LPSECURITY_ATTRIBUTES, &saAttr), 1, 1, + SUBPROCESS_NULL); + out_process->hEventError = + CreateEventA(SUBPROCESS_PTR_CAST(LPSECURITY_ATTRIBUTES, &saAttr), 1, 1, + SUBPROCESS_NULL); + } else { + out_process->hEventOutput = SUBPROCESS_NULL; + out_process->hEventError = SUBPROCESS_NULL; + } + + // Combine commandLine together into a single string + len = 0; + for (i = 0; commandLine[i]; i++) { + // for the trailing \0 + len++; + + // Quote the argument if it has a space in it + if (strpbrk(commandLine[i], "\t\v ") != SUBPROCESS_NULL || + commandLine[i][0] == SUBPROCESS_NULL) + len += 2; + + for (j = 0; '\0' != commandLine[i][j]; j++) { + switch (commandLine[i][j]) { + default: + break; + case '\\': + if (commandLine[i][j + 1] == '"') { + len++; + } + + break; + case '"': + len++; + break; + } + len++; + } + } + + commandLineCombined = SUBPROCESS_CAST(char *, _alloca(len)); + + if (!commandLineCombined) { + return -1; + } + + // Gonna re-use len to store the write index into commandLineCombined + len = 0; + + for (i = 0; commandLine[i]; i++) { + if (0 != i) { + commandLineCombined[len++] = ' '; + } + + need_quoting = strpbrk(commandLine[i], "\t\v ") != SUBPROCESS_NULL || + commandLine[i][0] == SUBPROCESS_NULL; + if (need_quoting) { + commandLineCombined[len++] = '"'; + } + + for (j = 0; '\0' != commandLine[i][j]; j++) { + switch (commandLine[i][j]) { + default: + break; + case '\\': + if (commandLine[i][j + 1] == '"') { + commandLineCombined[len++] = '\\'; + } + + break; + case '"': + commandLineCombined[len++] = '\\'; + break; + } + + commandLineCombined[len++] = commandLine[i][j]; + } + if (need_quoting) { + commandLineCombined[len++] = '"'; + } + } + + commandLineCombined[len] = '\0'; + + if (!CreateProcessA( + SUBPROCESS_NULL, + commandLineCombined, // command line + SUBPROCESS_NULL, // process security attributes + SUBPROCESS_NULL, // primary thread security attributes + 1, // handles are inherited + flags, // creation flags + used_environment, // used environment + SUBPROCESS_NULL, // use parent's current directory + SUBPROCESS_PTR_CAST(LPSTARTUPINFOA, + &startInfo), // STARTUPINFO pointer + SUBPROCESS_PTR_CAST(LPPROCESS_INFORMATION, &processInfo))) { + return -1; + } + + out_process->hProcess = processInfo.hProcess; + + out_process->hStdInput = startInfo.hStdInput; + + // We don't need the handle of the primary thread in the called process. + CloseHandle(processInfo.hThread); + + if (SUBPROCESS_NULL != startInfo.hStdOutput) { + CloseHandle(startInfo.hStdOutput); + + if (startInfo.hStdError != startInfo.hStdOutput) { + CloseHandle(startInfo.hStdError); + } + } + + out_process->alive = 1; + + return 0; +#else + int stdinfd[2]; + int stdoutfd[2]; + int stderrfd[2]; + pid_t child; + extern char **environ; + char *const empty_environment[1] = {SUBPROCESS_NULL}; + posix_spawn_file_actions_t actions; + char *const *used_environment; + + if (subprocess_option_inherit_environment == + (options & subprocess_option_inherit_environment)) { + if (SUBPROCESS_NULL != environment) { + return -1; + } + } + + if (0 != pipe(stdinfd)) { + return -1; + } + + if (0 != pipe(stdoutfd)) { + return -1; + } + + if (subprocess_option_combined_stdout_stderr != + (options & subprocess_option_combined_stdout_stderr)) { + if (0 != pipe(stderrfd)) { + return -1; + } + } + + if (environment) { +#ifdef __clang__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wcast-qual" +#pragma clang diagnostic ignored "-Wold-style-cast" +#endif + used_environment = SUBPROCESS_CONST_CAST(char *const *, environment); +#ifdef __clang__ +#pragma clang diagnostic pop +#endif + } else if (subprocess_option_inherit_environment == + (options & subprocess_option_inherit_environment)) { + used_environment = environ; + } else { + used_environment = empty_environment; + } + + if (0 != posix_spawn_file_actions_init(&actions)) { + return -1; + } + + // Close the stdin write end + if (0 != posix_spawn_file_actions_addclose(&actions, stdinfd[1])) { + posix_spawn_file_actions_destroy(&actions); + return -1; + } + + // Map the read end to stdin + if (0 != + posix_spawn_file_actions_adddup2(&actions, stdinfd[0], STDIN_FILENO)) { + posix_spawn_file_actions_destroy(&actions); + return -1; + } + + // Close the stdout read end + if (0 != posix_spawn_file_actions_addclose(&actions, stdoutfd[0])) { + posix_spawn_file_actions_destroy(&actions); + return -1; + } + + // Map the write end to stdout + if (0 != + posix_spawn_file_actions_adddup2(&actions, stdoutfd[1], STDOUT_FILENO)) { + posix_spawn_file_actions_destroy(&actions); + return -1; + } + + if (subprocess_option_combined_stdout_stderr == + (options & subprocess_option_combined_stdout_stderr)) { + if (0 != posix_spawn_file_actions_adddup2(&actions, STDOUT_FILENO, + STDERR_FILENO)) { + posix_spawn_file_actions_destroy(&actions); + return -1; + } + } else { + // Close the stderr read end + if (0 != posix_spawn_file_actions_addclose(&actions, stderrfd[0])) { + posix_spawn_file_actions_destroy(&actions); + return -1; + } + // Map the write end to stdout + if (0 != posix_spawn_file_actions_adddup2(&actions, stderrfd[1], + STDERR_FILENO)) { + posix_spawn_file_actions_destroy(&actions); + return -1; + } + } + +#ifdef __clang__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wcast-qual" +#pragma clang diagnostic ignored "-Wold-style-cast" +#endif + if (subprocess_option_search_user_path == + (options & subprocess_option_search_user_path)) { + if (0 != posix_spawnp(&child, commandLine[0], &actions, SUBPROCESS_NULL, + SUBPROCESS_CONST_CAST(char *const *, commandLine), + used_environment)) { + posix_spawn_file_actions_destroy(&actions); + return -1; + } + } else { + if (0 != posix_spawn(&child, commandLine[0], &actions, SUBPROCESS_NULL, + SUBPROCESS_CONST_CAST(char *const *, commandLine), + used_environment)) { + posix_spawn_file_actions_destroy(&actions); + return -1; + } + } +#ifdef __clang__ +#pragma clang diagnostic pop +#endif + + // Close the stdin read end + close(stdinfd[0]); + // Store the stdin write end + out_process->stdin_file = fdopen(stdinfd[1], "wb"); + + // Close the stdout write end + close(stdoutfd[1]); + // Store the stdout read end + out_process->stdout_file = fdopen(stdoutfd[0], "rb"); + + if (subprocess_option_combined_stdout_stderr == + (options & subprocess_option_combined_stdout_stderr)) { + out_process->stderr_file = out_process->stdout_file; + } else { + // Close the stderr write end + close(stderrfd[1]); + // Store the stderr read end + out_process->stderr_file = fdopen(stderrfd[0], "rb"); + } + + // Store the child's pid + out_process->child = child; + + out_process->alive = 1; + + posix_spawn_file_actions_destroy(&actions); + return 0; +#endif +} + +FILE *subprocess_stdin(const struct subprocess_s *const process) { + return process->stdin_file; +} + +FILE *subprocess_stdout(const struct subprocess_s *const process) { + return process->stdout_file; +} + +FILE *subprocess_stderr(const struct subprocess_s *const process) { + if (process->stdout_file != process->stderr_file) { + return process->stderr_file; + } else { + return SUBPROCESS_NULL; + } +} + +int subprocess_join(struct subprocess_s *const process, + int *const out_return_code) { +#if defined(_WIN32) + const unsigned long infinite = 0xFFFFFFFF; + + if (process->stdin_file) { + fclose(process->stdin_file); + process->stdin_file = SUBPROCESS_NULL; + } + + if (process->hStdInput) { + CloseHandle(process->hStdInput); + process->hStdInput = SUBPROCESS_NULL; + } + + WaitForSingleObject(process->hProcess, infinite); + + if (out_return_code) { + if (!GetExitCodeProcess( + process->hProcess, + SUBPROCESS_PTR_CAST(unsigned long *, out_return_code))) { + return -1; + } + } + + process->alive = 0; + + return 0; +#else + int status; + + if (process->stdin_file) { + fclose(process->stdin_file); + process->stdin_file = SUBPROCESS_NULL; + } + + if (process->child) { + if (process->child != waitpid(process->child, &status, 0)) { + return -1; + } + + process->child = 0; + + if (WIFEXITED(status)) { + process->return_status = WEXITSTATUS(status); + } else { + process->return_status = EXIT_FAILURE; + } + + process->alive = 0; + } + + if (out_return_code) { + *out_return_code = process->return_status; + } + + return 0; +#endif +} + +int subprocess_destroy(struct subprocess_s *const process) { + if (process->stdin_file) { + fclose(process->stdin_file); + process->stdin_file = SUBPROCESS_NULL; + } + + if (process->stdout_file) { + fclose(process->stdout_file); + + if (process->stdout_file != process->stderr_file) { + fclose(process->stderr_file); + } + + process->stdout_file = SUBPROCESS_NULL; + process->stderr_file = SUBPROCESS_NULL; + } + +#if defined(_WIN32) + if (process->hProcess) { + CloseHandle(process->hProcess); + process->hProcess = SUBPROCESS_NULL; + + if (process->hStdInput) { + CloseHandle(process->hStdInput); + } + + if (process->hEventOutput) { + CloseHandle(process->hEventOutput); + } + + if (process->hEventError) { + CloseHandle(process->hEventError); + } + } +#endif + + return 0; +} + +int subprocess_terminate(struct subprocess_s *const process) { +#if defined(_WIN32) + unsigned int killed_process_exit_code; + int success_terminate; + int windows_call_result; + + killed_process_exit_code = 99; + windows_call_result = + TerminateProcess(process->hProcess, killed_process_exit_code); + success_terminate = (windows_call_result == 0) ? 1 : 0; + return success_terminate; +#else + int result; + result = kill(process->child, 9); + return result; +#endif +} + +unsigned subprocess_read_stdout(struct subprocess_s *const process, + char *const buffer, unsigned size) { +#if defined(_WIN32) + void *handle; + unsigned long bytes_read = 0; + struct subprocess_overlapped_s overlapped = {0, 0, {{0, 0}}, SUBPROCESS_NULL}; + overlapped.hEvent = process->hEventOutput; + + handle = SUBPROCESS_PTR_CAST(void *, + _get_osfhandle(_fileno(process->stdout_file))); + + if (!ReadFile(handle, buffer, size, &bytes_read, + SUBPROCESS_PTR_CAST(LPOVERLAPPED, &overlapped))) { + const unsigned long errorIoPending = 997; + unsigned long error = GetLastError(); + + // Means we've got an async read! + if (error == errorIoPending) { + if (!GetOverlappedResult(handle, + SUBPROCESS_PTR_CAST(LPOVERLAPPED, &overlapped), + &bytes_read, 1)) { + const unsigned long errorIoIncomplete = 996; + const unsigned long errorHandleEOF = 38; + error = GetLastError(); + + if ((error != errorIoIncomplete) && (error != errorHandleEOF)) { + return 0; + } + } + } + } + + return SUBPROCESS_CAST(unsigned, bytes_read); +#else + const int fd = fileno(process->stdout_file); + const ssize_t bytes_read = read(fd, buffer, size); + + if (bytes_read < 0) { + return 0; + } + + return SUBPROCESS_CAST(unsigned, bytes_read); +#endif +} + +unsigned subprocess_read_stderr(struct subprocess_s *const process, + char *const buffer, unsigned size) { +#if defined(_WIN32) + void *handle; + unsigned long bytes_read = 0; + struct subprocess_overlapped_s overlapped = {0, 0, {{0, 0}}, SUBPROCESS_NULL}; + overlapped.hEvent = process->hEventError; + + handle = SUBPROCESS_PTR_CAST(void *, + _get_osfhandle(_fileno(process->stderr_file))); + + if (!ReadFile(handle, buffer, size, &bytes_read, + SUBPROCESS_PTR_CAST(LPOVERLAPPED, &overlapped))) { + const unsigned long errorIoPending = 997; + unsigned long error = GetLastError(); + + // Means we've got an async read! + if (error == errorIoPending) { + if (!GetOverlappedResult(handle, + SUBPROCESS_PTR_CAST(LPOVERLAPPED, &overlapped), + &bytes_read, 1)) { + const unsigned long errorIoIncomplete = 996; + const unsigned long errorHandleEOF = 38; + error = GetLastError(); + + if ((error != errorIoIncomplete) && (error != errorHandleEOF)) { + return 0; + } + } + } + } + + return SUBPROCESS_CAST(unsigned, bytes_read); +#else + const int fd = fileno(process->stderr_file); + const ssize_t bytes_read = read(fd, buffer, size); + + if (bytes_read < 0) { + return 0; + } + + return SUBPROCESS_CAST(unsigned, bytes_read); +#endif +} + +int subprocess_alive(struct subprocess_s *const process) { + int is_alive = SUBPROCESS_CAST(int, process->alive); + + if (!is_alive) { + return 0; + } +#if defined(_WIN32) + { + const unsigned long zero = 0x0; + const unsigned long wait_object_0 = 0x00000000L; + + is_alive = wait_object_0 != WaitForSingleObject(process->hProcess, zero); + } +#else + { + int status; + is_alive = 0 == waitpid(process->child, &status, WNOHANG); + + // If the process was successfully waited on we need to cleanup now. + if (!is_alive) { + if (WIFEXITED(status)) { + process->return_status = WEXITSTATUS(status); + } else { + process->return_status = EXIT_FAILURE; + } + + // Since we've already successfully waited on the process, we need to wipe + // the child now. + process->child = 0; + + if (subprocess_join(process, SUBPROCESS_NULL)) { + return -1; + } + } + } +#endif + + if (!is_alive) { + process->alive = 0; + } + + return is_alive; +} + +#if defined(__clang__) +#if __has_warning("-Wunsafe-buffer-usage") +#pragma clang diagnostic pop +#endif +#endif + +#if defined(__cplusplus) +} // extern "C" +#endif + +#endif /* SHEREDOM_SUBPROCESS_H_INCLUDED */ diff --git a/protos/Crane.proto b/protos/Crane.proto index b869c6e35..b71877189 100644 --- a/protos/Crane.proto +++ b/protos/Crane.proto @@ -626,6 +626,7 @@ message StreamCrunRequest { TASK_REQUEST = 0; TASK_COMPLETION_REQUEST = 1; TASK_IO_FORWARD = 2; + TASK_X11_FORWARD = 3; } message TaskReq { @@ -643,12 +644,18 @@ message StreamCrunRequest { string msg = 2; } + message TaskX11ForwardReq { + uint32 task_id = 1; + bytes msg = 2; + } + CrunRequestType type = 1; oneof payload { TaskReq payload_task_req = 2; TaskCompleteReq payload_task_complete_req = 3; TaskIOForwardReq payload_task_io_forward_req = 4; + TaskX11ForwardReq payload_task_x11_forward_req = 5; } } @@ -660,6 +667,7 @@ message StreamCrunReply { TASK_COMPLETION_ACK_REPLY = 3; TASK_IO_FORWARD = 4; TASK_IO_FORWARD_READY = 5; + TASK_X11_FORWARD = 6; } message TaskIdReply { @@ -689,6 +697,10 @@ message StreamCrunReply { string msg = 1; } + message TaskX11ForwardReply{ + bytes msg = 1; + } + CforedCrunReplyType type = 1 ; oneof payload { @@ -698,6 +710,7 @@ message StreamCrunReply { TaskCompletionAckReply payload_task_completion_ack_reply = 5; TaskIOForwardReadyReply payload_task_io_forward_ready_reply = 6; TaskIOForwardReply payload_task_io_forward_reply = 7; + TaskX11ForwardReply payload_task_x11_forward_reply = 8; } } @@ -706,6 +719,7 @@ message StreamTaskIORequest { CRANED_REGISTER = 0; CRANED_TASK_OUTPUT = 1; CRANED_UNREGISTER = 2; + CRANED_TASK_X11_OUTPUT = 3; } message CranedRegisterReq { @@ -721,11 +735,17 @@ message StreamTaskIORequest { string craned_id = 1; } + message CranedTaskX11OutputReq { + uint32 task_id = 1; + bytes msg = 2; + } + CranedRequestType type = 1; oneof payload { CranedRegisterReq payload_register_req = 2; CranedTaskOutputReq payload_task_output_req = 3; CranedUnRegisterReq payload_unregister_req = 4; + CranedTaskX11OutputReq payload_task_x11_output_req = 5; } } @@ -734,6 +754,7 @@ message StreamTaskIOReply { CRANED_REGISTER_REPLY = 0; CRANED_TASK_INPUT = 1; CRANED_UNREGISTER_REPLY = 2; + CRANED_TASK_X11_INPUT = 3; } message CranedRegisterReply { @@ -749,12 +770,18 @@ message StreamTaskIOReply { bool ok = 1; } + message CranedTaskX11InputReq { + uint32 task_id = 1; + bytes msg = 2; + } + CranedReplyType type = 1; oneof payload { CranedRegisterReply payload_craned_register_reply = 2; CranedTaskInputReq payload_task_input_req = 3; CranedUnregisterReply payload_craned_unregister_reply = 4; + CranedTaskX11InputReq payload_task_x11_input_req = 5; } } diff --git a/protos/CraneSubprocess.proto b/protos/CraneSubprocess.proto index 812f1059d..430310890 100644 --- a/protos/CraneSubprocess.proto +++ b/protos/CraneSubprocess.proto @@ -23,6 +23,7 @@ option go_package = "/protos/subprocess"; message CanStartMessage { bool ok = 1; + uint32 x11_port = 2; } message ChildProcessReady { diff --git a/protos/PublicDefs.proto b/protos/PublicDefs.proto index 8f5647f3b..8e1860e4b 100644 --- a/protos/PublicDefs.proto +++ b/protos/PublicDefs.proto @@ -221,6 +221,14 @@ message BatchTaskAdditionalMeta { string error_file_pattern = 4; } +message X11Meta { + string target = 1; + uint32 port = 2; + string cookie = 3; + + bool enable_forwarding = 4; +} + message InteractiveTaskAdditionalMeta { string cfored_name = 1; string sh_script = 2; @@ -228,14 +236,8 @@ message InteractiveTaskAdditionalMeta { InteractiveTaskType interactive_type = 4; bool pty = 5; - bool x11 = 6; - - message X11Meta { - string target = 1; - uint32 port = 2; - string cookie = 3; - } + bool x11 = 6; X11Meta x11_meta = 7; } diff --git a/src/CraneCtld/CranedKeeper.cpp b/src/CraneCtld/CranedKeeper.cpp index bd6545896..08e3eb076 100644 --- a/src/CraneCtld/CranedKeeper.cpp +++ b/src/CraneCtld/CranedKeeper.cpp @@ -314,21 +314,9 @@ crane::grpc::ExecuteTasksRequest CranedStub::NewExecuteTasksRequests( mutable_meta->set_error_file_pattern(meta_in_ctld.error_file_pattern); mutable_meta->set_sh_script(meta_in_ctld.sh_script); } else { - auto &meta_in_ctld = std::get(task->meta); + const auto &proto_ia_meta = task->TaskToCtld().interactive_meta(); auto *mutable_meta = mutable_task->mutable_interactive_meta(); - mutable_meta->set_cfored_name(meta_in_ctld.cfored_name); - mutable_meta->set_sh_script(meta_in_ctld.sh_script); - mutable_meta->set_term_env(meta_in_ctld.term_env); - mutable_meta->set_interactive_type(meta_in_ctld.interactive_type); - mutable_meta->set_pty(meta_in_ctld.pty); - - mutable_meta->set_x11(meta_in_ctld.x11); - if (meta_in_ctld.x11) { - auto *x11_meta = mutable_meta->mutable_x11_meta(); - x11_meta->set_cookie(meta_in_ctld.x11_meta.cookie); - x11_meta->set_target(meta_in_ctld.x11_meta.target); - x11_meta->set_port(meta_in_ctld.x11_meta.port); - } + mutable_meta->CopyFrom(proto_ia_meta); } } diff --git a/src/CraneCtld/CtldGrpcServer.cpp b/src/CraneCtld/CtldGrpcServer.cpp index fca996964..6cb7a7d6a 100644 --- a/src/CraneCtld/CtldGrpcServer.cpp +++ b/src/CraneCtld/CtldGrpcServer.cpp @@ -833,12 +833,21 @@ grpc::Status CraneCtldServiceImpl::CforedStream( writer->WriteTaskCancelRequest(task_id); }; - meta.cb_task_completed = [this, i_type, cfored_name, writer_weak_ptr]( + meta.cb_task_completed = [this, cfored_name, writer_weak_ptr]( task_id_t task_id, bool send_completion_ack) { + CRANE_TRACE("The completion callback of task #{} has been called.", + task_id); if (auto writer = writer_weak_ptr.lock(); - writer && send_completion_ack) + writer && send_completion_ack) { writer->WriteTaskCompletionAckReply(task_id); + } else { + CRANE_ERROR( + "Stream writer of ia task #{} has been destroyed. " + "TaskCompletionAckReply will not be sent.", + task_id); + } + m_ctld_server_->m_mtx_.Lock(); // If cfored disconnected, the cfored_name should have be @@ -886,6 +895,12 @@ grpc::Status CraneCtldServiceImpl::CforedStream( if (g_task_scheduler->TerminatePendingOrRunningIaTask( payload.task_id()) != CraneErrCode::SUCCESS) stream_writer->WriteTaskCompletionAckReply(payload.task_id()); + else { + CRANE_TRACE( + "Termination of task #{} succeeded. " + "Leave TaskCompletionAck to TaskStatusChange.", + payload.task_id()); + } } break; case StreamCforedRequest::CFORED_GRACEFUL_EXIT: { diff --git a/src/CraneCtld/CtldPublicDefs.h b/src/CraneCtld/CtldPublicDefs.h index 604e0eade..fda03fc5f 100644 --- a/src/CraneCtld/CtldPublicDefs.h +++ b/src/CraneCtld/CtldPublicDefs.h @@ -230,7 +230,6 @@ struct PartitionMeta { }; struct InteractiveMetaInTask { - std::string cfored_name; crane::grpc::InteractiveTaskType interactive_type; std::string sh_script; @@ -239,13 +238,6 @@ struct InteractiveMetaInTask { bool pty; bool x11; - struct X11Meta { - std::string cookie; - std::string target; - uint32_t port; - }; - X11Meta x11_meta; - std::function const&)> cb_task_res_allocated; @@ -511,22 +503,9 @@ struct TaskInCtld { .error_file_pattern = val.batch_meta().error_file_pattern(), }); } else { - auto& int_meta = std::get(meta); - int_meta.cfored_name = val.interactive_meta().cfored_name(); - int_meta.sh_script = val.interactive_meta().sh_script(); - - int_meta.interactive_type = val.interactive_meta().interactive_type(); - if (int_meta.interactive_type == crane::grpc::InteractiveTaskType::Crun) { - int_meta.term_env = val.interactive_meta().term_env(); - int_meta.pty = val.interactive_meta().pty(); - - int_meta.x11 = val.interactive_meta().x11(); - if (int_meta.x11) { - int_meta.x11_meta.cookie = val.interactive_meta().x11_meta().cookie(); - int_meta.x11_meta.target = val.interactive_meta().x11_meta().target(); - int_meta.x11_meta.port = val.interactive_meta().x11_meta().port(); - } - } + auto& InteractiveMeta = std::get(meta); + InteractiveMeta.interactive_type = + val.interactive_meta().interactive_type(); } node_num = val.node_num(); diff --git a/src/Craned/CMakeLists.txt b/src/Craned/CMakeLists.txt index ede243c21..f7b7b4954 100644 --- a/src/Craned/CMakeLists.txt +++ b/src/Craned/CMakeLists.txt @@ -7,14 +7,15 @@ add_executable(craned CforedClient.cpp TaskManager.h TaskManager.cpp + DeviceManager.cpp + DeviceManager.h CranedServer.h CranedServer.cpp CranedPublicDefs.h Craned.cpp CranedPreCompiledHeader.h - DeviceManager.cpp - DeviceManager.h) +) target_precompile_headers(craned PRIVATE CranedPreCompiledHeader.h) add_dependencies(craned libcgroup) @@ -53,6 +54,7 @@ target_link_libraries(craned absl::synchronization uvw + subprocess yaml-cpp diff --git a/src/Craned/CforedClient.cpp b/src/Craned/CforedClient.cpp index 08a23a13e..feb57ae02 100644 --- a/src/Craned/CforedClient.cpp +++ b/src/Craned/CforedClient.cpp @@ -63,30 +63,56 @@ void CforedClient::CleanOutputQueueAndWriteToStreamThread_( std::atomic* write_pending) { CRANE_TRACE("CleanOutputQueueThread started."); std::pair output; + std::tuple, size_t> x11_output; bool ok = m_output_queue_.try_dequeue(output); + bool x11_ok = m_x11_output_queue_.try_dequeue(x11_output); // Make sure before exit all output has been drained. - while (!m_stopped_ || ok) { - if (!ok) { + while (!m_stopped_ || ok || x11_ok) { + if (!ok && !x11_ok) { std::this_thread::sleep_for(std::chrono::milliseconds(75)); ok = m_output_queue_.try_dequeue(output); + x11_ok = m_x11_output_queue_.try_dequeue(x11_output); continue; } - StreamTaskIORequest request; - request.set_type(StreamTaskIORequest::CRANED_TASK_OUTPUT); + if (ok) { + StreamTaskIORequest request; + request.set_type(StreamTaskIORequest::CRANED_TASK_OUTPUT); - auto* payload = request.mutable_payload_task_output_req(); - payload->set_msg(output.second), payload->set_task_id(output.first); + auto* payload = request.mutable_payload_task_output_req(); + payload->set_msg(output.second), payload->set_task_id(output.first); - while (write_pending->load(std::memory_order::acquire)) - std::this_thread::sleep_for(std::chrono::milliseconds(25)); + while (write_pending->load(std::memory_order::acquire)) + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + + CRANE_TRACE("Writing output..."); + write_pending->store(true, std::memory_order::release); + stream->Write(request, (void*)Tag::Write); - CRANE_TRACE("Writing output..."); - write_pending->store(true, std::memory_order::release); - stream->Write(request, (void*)Tag::Write); + ok = m_output_queue_.try_dequeue(output); + } - ok = m_output_queue_.try_dequeue(output); + if (x11_ok) { + StreamTaskIORequest request; + request.set_type(StreamTaskIORequest::CRANED_TASK_X11_OUTPUT); + + auto* payload = request.mutable_payload_task_x11_output_req(); + + auto& [task_id, p, len] = x11_output; + payload->set_msg(p.get(), len); + payload->set_task_id(task_id); + + while (write_pending->load(std::memory_order::acquire)) + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + + CRANE_TRACE("Forwarding x11 output from task to cfored {}", + this->m_cfored_name_); + write_pending->store(true, std::memory_order::release); + stream->Write(request, (void*)Tag::Write); + + x11_ok = m_x11_output_queue_.try_dequeue(x11_output); + } } CRANE_TRACE("CleanOutputQueueThread exited."); @@ -217,25 +243,39 @@ void CforedClient::AsyncSendRecvThread_() { } CRANE_ASSERT(tag == Tag::Read); - if (reply.type() != StreamTaskIOReply::CRANED_TASK_INPUT) { - CRANE_ERROR("Expect TASK_INPUT, but got {}", (int)reply.type()); + + task_id_t task_id; + const std::string* msg; + + if (reply.type() == StreamTaskIOReply::CRANED_TASK_X11_INPUT) { + task_id = reply.payload_task_x11_input_req().task_id(); + msg = &reply.payload_task_x11_input_req().msg(); + CRANE_TRACE("TASK_X11_INPUT: task id #{}.", task_id); + } else if (reply.type() == StreamTaskIOReply::CRANED_TASK_INPUT) { + task_id = reply.payload_task_input_req().task_id(); + msg = &reply.payload_task_input_req().msg(); + CRANE_TRACE("TASK_INPUT: task id #{}.", task_id); + } else [[unlikely]] { + CRANE_ERROR("Expect TASK_INPUT or TASK_X11_INPUT, but got {}", + (int)reply.type()); break; } - task_id_t task_id = reply.payload_task_input_req().task_id(); - const std::string& msg = reply.payload_task_input_req().msg(); - m_mtx_.Lock(); auto fwd_meta_it = m_task_fwd_meta_map_.find(task_id); if (fwd_meta_it != m_task_fwd_meta_map_.end()) { TaskFwdMeta& meta = fwd_meta_it->second; - if (!meta.input_stopped) meta.input_stopped = !meta.input_cb(msg); + if (reply.type() == StreamTaskIOReply::CRANED_TASK_X11_INPUT) { + if (!meta.x11_input_stopped) + meta.x11_input_stopped = !meta.x11_input_cb(*msg); + } else // CRANED_TASK_INPUT + if (!meta.input_stopped) meta.input_stopped = !meta.input_cb(*msg); + } else { - CRANE_ERROR("Cfored {} trying to send msg to unknown task #{}", + CRANE_ERROR("Cfored {} trying to send data to unknown task #{}", m_cfored_name_, task_id); } - m_mtx_.Unlock(); reply.Clear(); @@ -278,6 +318,13 @@ void CforedClient::InitTaskFwdAndSetInputCb( m_task_fwd_meta_map_[task_id].input_cb = std::move(task_input_cb); } +void CforedClient::SetX11FwdInputCb( + task_id_t task_id, + std::function task_x11_input_cb) { + absl::MutexLock lock(&m_mtx_); + m_task_fwd_meta_map_[task_id].x11_input_cb = std::move(task_x11_input_cb); +} + bool CforedClient::TaskOutputFinish(task_id_t task_id) { absl::MutexLock lock(&m_mtx_); auto& task_fwd_meta = m_task_fwd_meta_map_.at(task_id); @@ -298,6 +345,13 @@ void CforedClient::TaskOutPutForward(task_id_t task_id, m_output_queue_.enqueue({task_id, msg}); } +void CforedClient::TaskX11OutPutForward(task_id_t task_id, + std::unique_ptr&& data, + size_t len) { + CRANE_TRACE("Receive TaskX11OutPutForward from task #{}.", task_id); + m_x11_output_queue_.enqueue({task_id, std::move(data), len}); +} + bool CforedManager::Init() { m_loop_ = uvw::loop::create(); @@ -345,24 +399,20 @@ void CforedManager::EvLoopThread_(const std::shared_ptr& uvw_loop) { m_loop_->run(); } -void CforedManager::RegisterIOForward(std::string const& cfored, - task_id_t task_id, int task_in_fd, - int task_out_fd, bool pty) { - RegisterElem elem{.cfored = cfored, - .task_id = task_id, - .task_input_fd = task_in_fd, - .task_output_fd = task_out_fd, - .pty = pty}; - std::promise done; - std::future done_fut = done.get_future(); +void CforedManager::RegisterIOForward(const RegisterElem& elem, + RegisterResult* result) { + std::promise done; + std::future done_fut = done.get_future(); m_register_queue_.enqueue(std::make_pair(std::move(elem), std::move(done))); m_register_handle_->send(); - done_fut.wait(); + *result = done_fut.get(); } void CforedManager::RegisterCb_() { - std::pair> p; + std::pair> p; + RegisterResult result{.ok = true}; + while (m_register_queue_.try_dequeue(p)) { RegisterElem& elem = p.first; if (m_cfored_client_map_.contains(elem.cfored)) { @@ -376,8 +426,7 @@ void CforedManager::RegisterCb_() { } m_cfored_client_map_[elem.cfored]->InitTaskFwdAndSetInputCb( - elem.task_id, - [fd = elem.task_input_fd](const std::string& msg) -> bool { + elem.task_id, [fd = elem.task_in_fd](const std::string& msg) -> bool { ssize_t sz_sent = 0, sz_written; while (sz_sent != msg.size()) { sz_written = write(fd, msg.c_str() + sz_sent, msg.size() - sz_sent); @@ -391,18 +440,17 @@ void CforedManager::RegisterCb_() { return true; }); - CRANE_TRACE("Registering fd {} for outputs of task #{}", - elem.task_output_fd, elem.task_id); - auto poll_handle = m_loop_->resource(elem.task_output_fd); - poll_handle->on([this, elem = std::move(elem)]( - const uvw::poll_event&, - uvw::poll_handle& h) { + CRANE_TRACE("Registering fd {} for outputs of task #{}", elem.task_out_fd, + elem.task_id); + auto poll_handle = m_loop_->resource(elem.task_out_fd); + poll_handle->on([this, elem = elem](const uvw::poll_event&, + uvw::poll_handle& h) { CRANE_TRACE("Detect task #{} output.", elem.task_id); constexpr int MAX_BUF_SIZE = 4096; char buf[MAX_BUF_SIZE]; - auto ret = read(elem.task_output_fd, buf, MAX_BUF_SIZE); + auto ret = read(elem.task_out_fd, buf, MAX_BUF_SIZE); bool read_finished{false}; if (ret == 0) { @@ -441,7 +489,7 @@ void CforedManager::RegisterCb_() { CRANE_TRACE("Task #{} to cfored {} finished its output.", elem.task_id, elem.cfored); h.close(); - close(elem.task_output_fd); + close(elem.task_out_fd); bool ok_to_free = m_cfored_client_map_[elem.cfored]->TaskOutputFinish(elem.task_id); @@ -458,11 +506,19 @@ void CforedManager::RegisterCb_() { m_cfored_client_map_[elem.cfored]->TaskOutPutForward(elem.task_id, output); }); + int ret = poll_handle->start(uvw::poll_handle::poll_event_flags::READABLE); - if (ret < 0) + if (ret < 0) { CRANE_ERROR("poll_handle->start() error: {}", uv_strerror(ret)); + result.ok = false; + } + + if (elem.x11_enable_forwarding) { + CRANE_TRACE("Registering X11 forwarding for task #{}", elem.task_id); + result.x11_port = SetupX11forwarding_(elem.cfored, elem.task_id); + } - p.second.set_value(true); + p.second.set_value(result); } } @@ -491,6 +547,151 @@ void CforedManager::TaskStopCb_() { } } +uint16_t CforedManager::SetupX11forwarding_(std::string const& cfored, + task_id_t task_id) { + int ret; + + auto x11_proxy_h = m_loop_->resource(); + constexpr uint16_t x11_port_begin = 6020; + constexpr uint16_t x11_port_end = 6100; + + auto x11_data = std::make_shared(); + x11_data->proxy_handle = x11_proxy_h; + + x11_proxy_h->data(x11_data); + m_task_id_to_x11_map_.emplace(task_id, x11_data); + + uint16_t port; + for (port = x11_port_begin; port < x11_port_end; ++port) { + ret = x11_proxy_h->bind("127.0.0.1", port); + if (ret == -1) { + CRANE_TRACE("Failed to bind x11 proxy port {} for task #{}: {}.", port, + task_id, strerror(errno)); + } else + break; + } + + if (port == x11_port_end) { + CRANE_ERROR("Failed to bind x11 proxy port within {}~{}!", x11_port_begin, + x11_port_end - 1); + x11_proxy_h->close(); + return 0; + } + + CRANE_TRACE("X11 proxy for task #{} was bound to port {}. Start listening.", + task_id, port); + ret = x11_proxy_h->listen(); + if (ret == -1) { + CRANE_ERROR("Failed to listening on x11 proxy port {}: {}.", port, + strerror(errno)); + x11_proxy_h->close(); + return 0; + } + + x11_proxy_h->on( + [this, cfored, task_id](uvw::listen_event& e, uvw::tcp_handle& h) { + CRANE_TRACE("Accepting connection on x11 proxy of task #{}", task_id); + + auto sock = h.parent().resource(); + + sock->on([this, cfored, task_id](uvw::data_event& e, + uvw::tcp_handle& s) { + CRANE_TRACE("Read x11 output from task #{}. Forwarding...", task_id); + m_cfored_client_map_[cfored]->TaskX11OutPutForward( + task_id, std::move(e.data), e.length); + }); + sock->on( + [this, task_id](const uvw::write_event&, uvw::tcp_handle&) { + CRANE_TRACE("Write x11 input to task #{} done.", task_id); + }); + + sock->on( + [task_id](uvw::end_event&, uvw::tcp_handle& h) { + // EOF + CRANE_TRACE("X11 proxy connection of task #{} ended. Closing it.", + task_id); + if (auto* p = h.data().get(); p) + p->sock_stopped = true; + h.close(); + }); + sock->on( + [task_id](uvw::error_event& e, uvw::tcp_handle& h) { + CRANE_ERROR("Error on x11 proxy of task #{}: {}. Closing it.", + task_id, e.what()); + if (auto* p = h.data().get(); p) + p->sock_stopped = true; + h.close(); + }); + sock->on([task_id](uvw::close_event&, + uvw::tcp_handle& h) { + CRANE_TRACE("X11 proxy connection of task #{} was closed.", task_id); + }); + + h.accept(*sock); + + h.data()->fd = sock->fd(); + h.data()->sock = sock; + sock->read(); + + // Currently only 1 connection of x11 client will be accepted. + // Close it once we accept one x11 client. + h.close(); + }); + + x11_proxy_h->on( + [task_id](const uvw::close_event&, uvw::tcp_handle&) { + CRANE_TRACE("X11 proxy listening port of task #{} closed.", task_id); + }); + + x11_proxy_h->on( + [task_id](const uvw::error_event& e, uvw::tcp_handle& h) { + CRANE_ERROR("Error on x11 proxy of task #{}: {}", task_id, e.what()); + h.close(); + }); + x11_proxy_h->on([task_id](uvw::end_event&, + uvw::tcp_handle& h) { + CRANE_TRACE("X11 proxy listening port of task #{} received EOF.", task_id); + h.close(); + }); + + m_cfored_client_map_[cfored]->SetX11FwdInputCb( + task_id, [x11_data](const std::string& msg) -> bool { + int fd = x11_data->fd; + ssize_t sz_sent = 0, sz_written; + while (sz_sent != msg.size()) { + sz_written = write(fd, msg.c_str() + sz_sent, msg.size() - sz_sent); + if (sz_written < 0) { + CRANE_ERROR("Pipe to task x11 client was broken."); + return false; + } + + sz_sent += sz_written; + } + return true; + // if (x11_proxy_fd < 0) { + // CRANE_ERROR("Invalid x11 proxy fd: {}", x11_proxy_fd); + // } + // + // if (x11_data->sock_stopped) { + // CRANE_TRACE("Sock has stopped. Ignoring forwarding.."); + // return false; + // } + // + // CRANE_TRACE("Writing X11 to fd {}", x11_proxy_fd); + // + // std::unique_ptr data(new char[msg.size()]); + // memcpy(data.get(), msg.data(), msg.size()); + // int r = x11_data->sock->write(std::move(data), msg.size()); + // + // CRANE_TRACE("Writing X11 to fd {} result: {}", x11_proxy_fd, r); + // return r < 0; + }); + + CRANE_TRACE("Registering x11 outputs of task #{}", task_id); + + return port; +} + void CforedManager::UnregisterIOForward_(const std::string& cfored, task_id_t task_id) { UnregisterElem elem{.cfored = cfored, .task_id = task_id}; @@ -504,6 +705,15 @@ void CforedManager::UnregisterCb_() { const std::string& cfored = elem.cfored; task_id_t task_id = elem.task_id; + auto it = m_task_id_to_x11_map_.find(elem.task_id); + if (it != m_task_id_to_x11_map_.end()) { + X11FdInfo* x11_fd_info = it->second.get(); + x11_fd_info->proxy_handle->close(); + if (x11_fd_info->sock) x11_fd_info->sock->close(); + + m_task_id_to_x11_map_.erase(it); + } + auto count = m_cfored_client_ref_count_map_[cfored]; if (count == 1) { m_cfored_client_ref_count_map_.erase(cfored); diff --git a/src/Craned/CforedClient.h b/src/Craned/CforedClient.h index b44239aac..3db2d939a 100644 --- a/src/Craned/CforedClient.h +++ b/src/Craned/CforedClient.h @@ -40,8 +40,15 @@ class CforedClient { void InitTaskFwdAndSetInputCb( task_id_t task_id, std::function task_input_cb); + void SetX11FwdInputCb( + task_id_t task_id, + std::function task_x11_input_cb); + void TaskOutPutForward(task_id_t task_id, const std::string& msg); + void TaskX11OutPutForward(task_id_t task_id, std::unique_ptr&& data, + size_t len); + bool TaskOutputFinish(task_id_t task_id); bool TaskProcessStop(task_id_t task_id); @@ -50,6 +57,10 @@ class CforedClient { struct TaskFwdMeta { std::function input_cb; bool input_stopped{false}; + + std::function x11_input_cb; + bool x11_input_stopped{false}; + bool output_stopped{false}; bool proc_stopped{false}; }; @@ -59,7 +70,14 @@ class CforedClient { crane::grpc::StreamTaskIOReply>* stream, std::atomic* write_pending); + ConcurrentQueue> m_input_queue_; ConcurrentQueue> m_output_queue_; + + ConcurrentQueue, size_t>> + m_x11_input_queue_; + ConcurrentQueue, size_t>> + m_x11_output_queue_; + std::thread m_fwd_thread_; std::atomic m_stopped_{false}; @@ -80,24 +98,30 @@ class CforedManager { using ConcurrentQueue = moodycamel::ConcurrentQueue; public: + struct RegisterElem { + std::string cfored; + task_id_t task_id; + int task_in_fd; + int task_out_fd; + bool pty; + + bool x11_enable_forwarding; + }; + + struct RegisterResult { + bool ok; + uint16_t x11_port; + }; + CforedManager() = default; ~CforedManager(); bool Init(); - void RegisterIOForward(std::string const& cfored, task_id_t task_id, - int task_in_fd, int task_out_fd, bool pty); + void RegisterIOForward(const RegisterElem& elem, RegisterResult* result); void TaskProcOnCforedStopped(std::string const& cfored, task_id_t task_id); private: - struct RegisterElem { - std::string cfored; - task_id_t task_id; - int task_input_fd; - int task_output_fd; - bool pty; - }; - struct TaskStopElem { std::string cfored; task_id_t task_id; @@ -108,6 +132,15 @@ class CforedManager { task_id_t task_id; }; + struct X11FdInfo { + int fd; + std::shared_ptr sock; + std::shared_ptr proxy_handle; + std::atomic sock_stopped; + }; + + uint16_t SetupX11forwarding_(std::string const& cfored, task_id_t task_id); + void UnregisterIOForward_(std::string const& cfored, task_id_t task_id); void EvLoopThread_(const std::shared_ptr& uvw_loop); @@ -117,7 +150,7 @@ class CforedManager { std::thread m_ev_loop_thread_; std::shared_ptr m_register_handle_; - ConcurrentQueue>> + ConcurrentQueue>> m_register_queue_; void RegisterCb_(); @@ -134,6 +167,9 @@ class CforedManager { std::unordered_map m_cfored_client_ref_count_map_; + + std::unordered_map> + m_task_id_to_x11_map_; }; } // namespace Craned diff --git a/src/Craned/CranedPreCompiledHeader.h b/src/Craned/CranedPreCompiledHeader.h index e8a431b5f..ad974d9be 100644 --- a/src/Craned/CranedPreCompiledHeader.h +++ b/src/Craned/CranedPreCompiledHeader.h @@ -73,9 +73,15 @@ // fpm #include +// fmt +#include + // Concurrent queue #include +// subprocess +#include + // Include the header which defines the static log level #include "crane/Logger.h" diff --git a/src/Craned/TaskManager.cpp b/src/Craned/TaskManager.cpp index f115f5455..01f910c9c 100644 --- a/src/Craned/TaskManager.cpp +++ b/src/Craned/TaskManager.cpp @@ -43,6 +43,10 @@ bool TaskInstance::IsCalloc() const { crane::grpc::Calloc; } +CrunMetaInTaskInstance* TaskInstance::GetCrunMeta() const { + return dynamic_cast(this->meta.get()); +} + EnvMap TaskInstance::GetTaskEnvMap() const { std::unordered_map env_map; // Crane Env will override user task env; @@ -89,15 +93,19 @@ EnvMap TaskInstance::GetTaskEnvMap() const { if (ia_meta.x11()) { auto const& x11_meta = ia_meta.x11_meta(); + + std::string target = + ia_meta.x11_meta().enable_forwarding() ? "" : x11_meta.target(); env_map["DISPLAY"] = - fmt::format("{}:{}", x11_meta.target(), x11_meta.port()); + fmt::format("{}:{}", target, this->GetCrunMeta()->x11_port - 6000); + env_map["XAUTHORITY"] = this->GetCrunMeta()->x11_auth_path; } } int64_t time_limit_sec = this->task.time_limit().seconds(); - int hours = time_limit_sec / 3600; - int minutes = (time_limit_sec % 3600) / 60; - int seconds = time_limit_sec % 60; + int64_t hours = time_limit_sec / 3600; + int64_t minutes = (time_limit_sec % 3600) / 60; + int64_t seconds = time_limit_sec % 60; std::string time_limit = fmt::format("{:0>2}:{:0>2}:{:0>2}", hours, minutes, seconds); env_map.emplace("CRANE_TIMELIMIT", time_limit); @@ -494,7 +502,7 @@ void TaskManager::Wait() { } CraneErrCode TaskManager::KillProcessInstance_(const ProcessInstance* proc, - int signum) { + int signum) { // Todo: Add timer which sends SIGTERM for those tasks who // will not quit when receiving SIGINT. if (proc) { @@ -519,7 +527,7 @@ void TaskManager::SetSigintCallback(std::function cb) { } CraneErrCode TaskManager::SpawnProcessInInstance_(TaskInstance* instance, - ProcessInstance* process) { + ProcessInstance* process) { using google::protobuf::io::FileInputStream; using google::protobuf::io::FileOutputStream; using google::protobuf::util::ParseDelimitedFromZeroCopyStream; @@ -546,6 +554,36 @@ CraneErrCode TaskManager::SpawnProcessInInstance_(TaskInstance* instance, return CraneErrCode::ERR_CGROUP; } + if (instance->IsCrun() && instance->task.interactive_meta().x11()) { + auto* inst_crun_meta = instance->GetCrunMeta(); + const PasswordEntry& pwd_entry = instance->pwd_entry; + + inst_crun_meta->x11_auth_path = + fmt::sprintf("%s/.crane/xauth/.Xauthority-XXXXXX", pwd_entry.HomeDir()); + + bool ok = util::os::CreateFoldersForFileEx( + inst_crun_meta->x11_auth_path, pwd_entry.Uid(), pwd_entry.Gid(), 0700); + if (!ok) { + CRANE_ERROR("Failed to create xauth source file for task #{}", + instance->task.task_id()); + return CraneErrCode::ERR_SYSTEM_ERR; + } + + // Default file permission is 0600. + int xauth_fd = mkstemp(inst_crun_meta->x11_auth_path.data()); + if (xauth_fd == -1) { + CRANE_ERROR("mkstemp() for xauth file failed: {}\n", strerror(errno)); + return CraneErrCode::ERR_SYSTEM_ERR; + } + + int ret = + fchown(xauth_fd, instance->pwd_entry.Uid(), instance->pwd_entry.Gid()); + if (ret == -1) { + CRANE_ERROR("fchown() for xauth file failed: {}\n", strerror(errno)); + return CraneErrCode::ERR_SYSTEM_ERR; + } + } + if (socketpair(AF_UNIX, SOCK_STREAM, 0, ctrl_sock_pair) != 0) { CRANE_ERROR("[Task #{}] Failed to create socket pair: {}", instance->task.task_id(), strerror(errno)); @@ -559,6 +597,7 @@ CraneErrCode TaskManager::SpawnProcessInInstance_(TaskInstance* instance, if (instance->IsCrun()) { auto* crun_meta = dynamic_cast(instance->meta.get()); + launch_pty = instance->task.interactive_meta().pty(); CRANE_DEBUG("[Task #{}] Launch crun pty: {}", instance->task.task_id(), launch_pty); @@ -597,6 +636,9 @@ CraneErrCode TaskManager::SpawnProcessInInstance_(TaskInstance* instance, CRANE_DEBUG("[Task #{}] Subprocess was created with pid: {}", instance->task.task_id(), child_pid); + CanStartMessage msg; + ChildProcessReady child_process_ready; + if (instance->IsCrun()) { auto* meta = dynamic_cast(instance->meta.get()); if (launch_pty) { @@ -605,10 +647,35 @@ CraneErrCode TaskManager::SpawnProcessInInstance_(TaskInstance* instance, meta->task_input_fd = crun_pty_fd; meta->task_output_fd = crun_pty_fd; } - g_cfored_manager->RegisterIOForward( - instance->task.interactive_meta().cfored_name(), - instance->task.task_id(), meta->task_input_fd, meta->task_output_fd, - launch_pty); + + const auto& proto_ia_meta = instance->task.interactive_meta(); + CforedManager::RegisterElem reg_elem{ + .cfored = proto_ia_meta.cfored_name(), + .task_id = instance->task.task_id(), + .task_in_fd = meta->task_input_fd, + .task_out_fd = meta->task_output_fd, + .pty = launch_pty, + .x11_enable_forwarding = proto_ia_meta.x11() && + proto_ia_meta.x11_meta().enable_forwarding(), + }; + + CforedManager::RegisterResult result; + g_cfored_manager->RegisterIOForward(reg_elem, &result); + + instance->GetCrunMeta()->x11_port = result.x11_port; + if (reg_elem.x11_enable_forwarding) { + instance->GetCrunMeta()->x11_port = result.x11_port; + msg.set_x11_port(result.x11_port); + } else { + uint32_t port = proto_ia_meta.x11_meta().port(); + instance->GetCrunMeta()->x11_port = port; + msg.set_x11_port(port); + } + + CRANE_TRACE("Crun task #{} x11 enabled: {}, forwarding: {}, port: {}", + reg_elem.task_id, proto_ia_meta.x11(), + proto_ia_meta.x11_meta().enable_forwarding(), + instance->GetCrunMeta()->x11_port); close(craned_crun_pipe[0]); close(crun_craned_pipe[1]); } @@ -619,25 +686,6 @@ CraneErrCode TaskManager::SpawnProcessInInstance_(TaskInstance* instance, bool ok; FileInputStream istream(ctrl_fd); FileOutputStream ostream(ctrl_fd); - CanStartMessage msg; - ChildProcessReady child_process_ready; - - // Add event for stdout/stderr of the new subprocess - // struct bufferevent* ev_buf_event; - // ev_buf_event = - // bufferevent_socket_new(m_ev_base_, fd, BEV_OPT_CLOSE_ON_FREE); - // if (!ev_buf_event) { - // CRANE_ERROR( - // "Error constructing bufferevent for the subprocess of task #!", - // instance->task.task_id()); - // err = CraneErrCode::kLibEventError; - // goto AskChildToSuicide; - // } - // bufferevent_setcb(ev_buf_event, EvSubprocessReadCb_, nullptr, nullptr, - // (void*)process.get()); - // bufferevent_enable(ev_buf_event, EV_READ); - // bufferevent_disable(ev_buf_event, EV_WRITE); - // process->SetEvBufEvent(ev_buf_event); // Migrate the new subprocess to newly created cgroup if (!instance->cgroup->MigrateProcIn(child_pid)) { @@ -852,41 +900,91 @@ CraneErrCode TaskManager::SpawnProcessInInstance_(TaskInstance* instance, if (instance->task.type() == crane::grpc::Batch) close(0); util::os::CloseFdFrom(3); + // Set up x11 authority file if enabled. + if (instance->IsCrun() && instance->task.interactive_meta().x11()) { + auto* inst_crun_meta = instance->GetCrunMeta(); + const auto& proto_x11_meta = instance->task.interactive_meta().x11_meta(); + + // Overwrite x11_port with real value from parent process. + inst_crun_meta->x11_port = msg.x11_port(); + + std::string x11_target = proto_x11_meta.enable_forwarding() + ? g_config.Hostname + : proto_x11_meta.target(); + std::string x11_disp_fmt = + proto_x11_meta.enable_forwarding() ? "%s/unix:%u" : "%s:%u"; + + std::string display = fmt::sprintf(x11_disp_fmt, x11_target, + inst_crun_meta->x11_port - 6000); + + std::vector xauth_argv{ + "/usr/bin/xauth", + "-v", + "-f", + inst_crun_meta->x11_auth_path.c_str(), + "add", + display.c_str(), + "MIT-MAGIC-COOKIE-1", + proto_x11_meta.cookie().c_str(), + }; + std::string xauth_cmd = absl::StrJoin(xauth_argv, ","); + + xauth_argv.push_back(nullptr); + + subprocess_s subprocess; + int result = subprocess_create(xauth_argv.data(), 0, &subprocess); + if (0 != result) { + fmt::print( + stderr, + "[Craned Subprocess] xauth subprocess creation failed: {}.\n", + strerror(errno)); + std::abort(); + } + + auto buf = std::make_unique(4096); + std::string xauth_stdout_str, xauth_stderr_str; + + std::FILE* cmd_fd = subprocess_stdout(&subprocess); + while (std::fgets(buf.get(), 4096, cmd_fd) != nullptr) + xauth_stdout_str.append(buf.get()); + + cmd_fd = subprocess_stderr(&subprocess); + while (std::fgets(buf.get(), 4096, cmd_fd) != nullptr) + xauth_stderr_str.append(buf.get()); + + if (0 != subprocess_join(&subprocess, &result)) + fmt::print(stderr, "[Craned Subprocess] xauth join failed.\n"); + + if (0 != subprocess_destroy(&subprocess)) + fmt::print(stderr, "[Craned Subprocess] xauth destroy failed.\n"); + + if (result != 0) { + fmt::print(stderr, "[Craned Subprocess] xauth return with {}.\n", + result); + fmt::print(stderr, "[Craned Subprocess] xauth stdout: {}\n", + xauth_stdout_str); + fmt::print(stderr, "[Craned Subprocess] xauth stderr: {}\n", + xauth_stderr_str); + } + } + EnvMap task_env_map = instance->GetTaskEnvMap(); EnvMap res_env_map = CgroupManager::GetResourceEnvMapByResInNode(res_in_node.value()); - if (clearenv()) { - fmt::print(stderr, "[Craned Subprocess] Warning: clearenv() failed.\n"); - } - - auto FuncSetEnv = - [](const std::unordered_map& v) { - for (const auto& [name, value] : v) - if (setenv(name.c_str(), value.c_str(), 1)) - fmt::print( - stderr, - "[Craned Subprocess] Warning: setenv() for {}={} failed.\n", - name, value); - }; + // clearenv() should be called just before fork! + if (clearenv()) + fmt::print(stderr, "[Craned Subprocess] clearenv() failed.\n"); + auto FuncSetEnv = [](const EnvMap& v) { + for (const auto& [name, value] : v) + if (setenv(name.c_str(), value.c_str(), 1)) + fmt::print(stderr, "[Craned Subprocess] setenv() for {}={} failed.\n", + name, value); + }; FuncSetEnv(task_env_map); FuncSetEnv(res_env_map); - if (instance->IsCrun() && instance->task.interactive_meta().x11()) { - auto const& x11_meta = instance->task.interactive_meta().x11_meta(); - std::string xauth_cmd = - fmt::format("xauth add {}:{} . {}", x11_meta.target(), - x11_meta.port(), x11_meta.cookie()); - - // FIXME: Shell injection vulnerability - rc = std::system(xauth_cmd.c_str()); - if (rc != 0) { - fmt::print(stderr, "[Craned Subprocess] Error: xauth failed.\n"); - std::abort(); - } - } - // Prepare the command line arguments. std::vector argv; @@ -914,7 +1012,7 @@ CraneErrCode TaskManager::SpawnProcessInInstance_(TaskInstance* instance, strerror(errno)); // TODO: See https://tldp.org/LDP/abs/html/exitcodes.html, return standard // exit codes - abort(); + std::abort(); } } @@ -1215,7 +1313,8 @@ void TaskManager::EvCleanGrpcQueryTaskIdFromPidQueueCb_() { auto task_iter = m_pid_task_map_.find(elem.pid); if (task_iter == m_pid_task_map_.end()) - elem.task_id_prom.set_value(std::unexpected(CraneErrCode::ERR_SYSTEM_ERR)); + elem.task_id_prom.set_value( + std::unexpected(CraneErrCode::ERR_SYSTEM_ERR)); else { TaskInstance* instance = task_iter->second; uint32_t task_id = instance->task.task_id(); @@ -1295,12 +1394,18 @@ void TaskManager::EvCleanTerminateTaskQueueCb_() { TaskInstance* task_instance = iter->second.get(); - if (elem.terminated_by_user) task_instance->cancelled_by_user = true; + int sig = SIGTERM; // For BatchTask + if (task_instance->IsCrun()) sig = SIGHUP; + if (elem.mark_as_orphaned) task_instance->orphaned = true; if (elem.terminated_by_timeout) task_instance->terminated_by_timeout = true; + if (elem.terminated_by_user) { + task_instance->cancelled_by_user = true; - int sig = SIGTERM; // For BatchTask - if (task_instance->IsCrun()) sig = SIGHUP; + // If termination request is sent by user, send SIGKILL to ensure that + // even freezing processes will be terminated immediately. + sig = SIGKILL; + } if (!task_instance->processes.empty()) { // For an Interactive task with a process running or a Batch task, we diff --git a/src/Craned/TaskManager.h b/src/Craned/TaskManager.h index c9d312719..d0d6caca7 100644 --- a/src/Craned/TaskManager.h +++ b/src/Craned/TaskManager.h @@ -119,9 +119,14 @@ struct BatchMetaInTaskInstance : MetaInTaskInstance { }; struct CrunMetaInTaskInstance : MetaInTaskInstance { + ~CrunMetaInTaskInstance() override = default; + int task_input_fd; int task_output_fd; - ~CrunMetaInTaskInstance() override = default; + + std::string x11_target; + uint16_t x11_port; + std::string x11_auth_path; }; // also arg for EvSigchldTimerCb_ @@ -141,16 +146,30 @@ struct TaskInstance { } if (this->IsCrun()) { - auto* ia_meta = dynamic_cast(meta.get()); - close(ia_meta->task_input_fd); + auto* crun_meta = GetCrunMeta(); + + close(crun_meta->task_input_fd); // For crun pty job, avoid close same fd twice - if (ia_meta->task_output_fd != ia_meta->task_input_fd) - close(ia_meta->task_output_fd); + if (crun_meta->task_output_fd != crun_meta->task_input_fd) + close(crun_meta->task_output_fd); + + if (!crun_meta->x11_auth_path.empty() && + !absl::EndsWith(crun_meta->x11_auth_path, "XXXXXX")) { + std::error_code ec; + bool ok = std::filesystem::remove(crun_meta->x11_auth_path, ec); + if (!ok) + CRANE_ERROR("Failed to remove x11 auth {} for task #{}: {}", + crun_meta->x11_auth_path, this->task.task_id(), + ec.message()); + } } } bool IsCrun() const; bool IsCalloc() const; + + CrunMetaInTaskInstance* GetCrunMeta() const; + EnvMap GetTaskEnvMap() const; crane::grpc::TaskToD task; diff --git a/src/Utilities/PublicHeader/OS.cpp b/src/Utilities/PublicHeader/OS.cpp index 127ac0fbc..e46652665 100644 --- a/src/Utilities/PublicHeader/OS.cpp +++ b/src/Utilities/PublicHeader/OS.cpp @@ -19,6 +19,7 @@ #include "crane/OS.h" #if defined(__linux__) || defined(__unix__) +# include # include # include #elif defined(_WIN32) @@ -63,6 +64,40 @@ bool CreateFoldersForFile(std::string const& p) { return true; } +bool CreateFoldersForFileEx(const std::string& p, uid_t owner, gid_t group, + mode_t permissions = 0755) { + namespace fs = std::filesystem; + + try { + fs::path dir_path = p; + dir_path = dir_path.parent_path(); + + fs::path current_dir; + for (auto& part : dir_path) { + current_dir /= part; + if (!fs::exists(current_dir)) { + if (mkdir(current_dir.c_str(), permissions) != 0) { + CRANE_ERROR("Failed to create directory {}: {}", current_dir.c_str(), + strerror(errno)); + return false; + } + } + + if (chown(current_dir.c_str(), owner, group) != 0) { + CRANE_ERROR("Failed to change ownership of directory {}: {}", + current_dir.c_str(), strerror(errno)); + return false; + } + } + } catch (const std::exception& e) { + CRANE_ERROR("Failed to create folder for {}: {}", p.c_str(), + e.what()); + return false; + } + + return true; +} + int GetFdOpenMax() { return static_cast(sysconf(_SC_OPEN_MAX)); } void CloseFdRange(int fd_begin, int fd_end) { diff --git a/src/Utilities/PublicHeader/include/crane/OS.h b/src/Utilities/PublicHeader/include/crane/OS.h index 03f05fa23..1bc3f6824 100644 --- a/src/Utilities/PublicHeader/include/crane/OS.h +++ b/src/Utilities/PublicHeader/include/crane/OS.h @@ -44,6 +44,9 @@ bool CreateFolders(std::string const& p); bool CreateFoldersForFile(std::string const& p); +bool CreateFoldersForFileEx(const std::string& p, uid_t owner, gid_t group, + mode_t permissions); + // Close file descriptors within [fd_begin, fd_end) void CloseFdRange(int fd_begin, int fd_end);