From 9f4b19c0ff344ecfbc0b34c97409929a7488b430 Mon Sep 17 00:00:00 2001 From: Chi-Sheng Liu Date: Tue, 25 Feb 2025 17:20:27 +0800 Subject: [PATCH] [Fix][Core] Execute user requested task exit in C++ side Closes: ray-project/ray#49451 Signed-off-by: Chi-Sheng Liu --- python/ray/_raylet.pyx | 4 +++ python/ray/actor.py | 1 + python/ray/includes/libcoreworker.pxd | 1 + python/ray/tests/test_actor_failures.py | 39 +++++++++++++++++++++++++ src/ray/core_worker/context.cc | 11 +++++++ src/ray/core_worker/context.h | 5 ++++ src/ray/core_worker/core_worker.cc | 5 ++++ 7 files changed, 66 insertions(+) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 5efe19e3615f..d6251f5ff97e 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -4621,6 +4621,10 @@ cdef class CoreWorker: return (CCoreWorkerProcess.GetCoreWorker().GetWorkerContext() .CurrentActorIsAsync()) + def exit_current_actor(self): + return (CCoreWorkerProcess.GetCoreWorker().GetWorkerContext() + .SetCurrentActorShouldExit()) + def current_actor_max_concurrency(self): return (CCoreWorkerProcess.GetCoreWorker().GetWorkerContext() .CurrentActorMaxConcurrency()) diff --git a/python/ray/actor.py b/python/ray/actor.py index 2a27389d2ce0..41ea2057f194 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -1781,6 +1781,7 @@ def exit_actor(): # In asyncio actor mode, we can't raise SystemExit because it will just # quit the asycnio event loop thread, not the main thread. Instead, we # raise a custom error to the main thread to tell it to exit. + worker.core_worker.exit_current_actor() if worker.core_worker.current_actor_is_asyncio(): raise AsyncioActorExit() diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 9892bbf3f99e..d1d9bf49478b 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -92,6 +92,7 @@ cdef extern from "ray/core_worker/experimental_mutable_object_manager.h" nogil: cdef extern from "ray/core_worker/context.h" nogil: cdef cppclass CWorkerContext "ray::core::WorkerContext": c_bool CurrentActorIsAsync() + void SetCurrentActorShouldExit() const c_string &GetCurrentSerializedRuntimeEnv() int CurrentActorMaxConcurrency() const CActorID &GetRootDetachedActorID() diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index bf603b2197ec..29d0c0ce9180 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -989,6 +989,45 @@ def verify(): wait_for_condition(verify) +def test_exit_actor_async_actor_nested_task(shutdown_only, tmp_path): + async_temp_file = tmp_path / "async_actor.log" + async_temp_file.touch() + + @ray.remote + class AsyncActor: + def __init__(self): + def f(): + print("atexit handler") + with open(async_temp_file, "w") as f: + f.write("Async Actor\n") + + atexit.register(f) + + async def start_exit_task(self): + asyncio.create_task(self.exit()) + + async def exit(self): + exit_actor() + + async def normal_task(self): + pass + + a = AsyncActor.remote() + ray.get(a.__ray_ready__.remote()) + ray.get(a.start_exit_task.remote()) + # Wait for the actor to exit. + time.sleep(1) + with pytest.raises(ray.exceptions.RayActorError): + ray.get(a.normal_task.remote()) + + def verify(): + with open(async_temp_file) as f: + assert f.readlines() == ["Async Actor\n"] + return True + + wait_for_condition(verify) + + def test_exit_actor_queued(shutdown_only): """Verify after exit_actor is called the queued tasks won't execute.""" diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index d2b35724bf9c..59d0556039e4 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -394,6 +394,17 @@ bool WorkerContext::CurrentActorIsAsync() const { return current_actor_is_asyncio_; } +void WorkerContext::SetCurrentActorShouldExit() ABSL_LOCKS_EXCLUDED(mutex_) { + absl::WriterMutexLock lock(&mutex_); + RAY_CHECK(!current_actor_id_.IsNil()); + current_actor_should_exit_ = true; +} + +bool WorkerContext::CurrentActorShouldExit() const { + absl::ReaderMutexLock lock(&mutex_); + return current_actor_should_exit_; +} + bool WorkerContext::CurrentActorDetached() const { absl::ReaderMutexLock lock(&mutex_); return is_detached_actor_; diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h index 7b9cc010131f..6c49f86e5b86 100644 --- a/src/ray/core_worker/context.h +++ b/src/ray/core_worker/context.h @@ -117,6 +117,10 @@ class WorkerContext { bool CurrentActorIsAsync() const ABSL_LOCKS_EXCLUDED(mutex_); + void SetCurrentActorShouldExit() ABSL_LOCKS_EXCLUDED(mutex_); + + bool CurrentActorShouldExit() const ABSL_LOCKS_EXCLUDED(mutex_); + bool CurrentActorDetached() const ABSL_LOCKS_EXCLUDED(mutex_); uint64_t GetNextTaskIndex(); @@ -145,6 +149,7 @@ class WorkerContext { ActorID current_actor_id_ ABSL_GUARDED_BY(mutex_); int current_actor_max_concurrency_ ABSL_GUARDED_BY(mutex_) = 1; bool current_actor_is_asyncio_ ABSL_GUARDED_BY(mutex_) = false; + bool current_actor_should_exit_ ABSL_GUARDED_BY(mutex_) = false; bool is_detached_actor_ ABSL_GUARDED_BY(mutex_) = false; // The placement group id that the current actor belongs to. PlacementGroupID current_actor_placement_group_id_ ABSL_GUARDED_BY(mutex_); diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 944bf84b05c4..f098f98daf89 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -3156,6 +3156,11 @@ void CoreWorker::RunTaskExecutionLoop() { signal_checker->RunFnPeriodically( [this] { /// The overhead of this is only a single digit microsecond. + if (worker_context_.CurrentActorShouldExit()) { + Exit(rpc::WorkerExitType::INTENDED_USER_EXIT, + "User requested to exit the actor.", + nullptr); + } auto status = options_.check_signals(); if (status.IsIntentionalSystemExit()) { Exit(rpc::WorkerExitType::INTENDED_USER_EXIT,