Skip to content

Commit

Permalink
[Fix][Core] Execute user requested task exit in C++ side
Browse files Browse the repository at this point in the history
Closes: #49451
Signed-off-by: Chi-Sheng Liu <[email protected]>
  • Loading branch information
MortalHappiness committed Feb 25, 2025
1 parent 7c8e1c8 commit 9f4b19c
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 0 deletions.
4 changes: 4 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4621,6 +4621,10 @@ cdef class CoreWorker:
return (CCoreWorkerProcess.GetCoreWorker().GetWorkerContext()
.CurrentActorIsAsync())

def exit_current_actor(self):
return (CCoreWorkerProcess.GetCoreWorker().GetWorkerContext()
.SetCurrentActorShouldExit())

def current_actor_max_concurrency(self):
return (CCoreWorkerProcess.GetCoreWorker().GetWorkerContext()
.CurrentActorMaxConcurrency())
Expand Down
1 change: 1 addition & 0 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1781,6 +1781,7 @@ def exit_actor():
# In asyncio actor mode, we can't raise SystemExit because it will just
# quit the asycnio event loop thread, not the main thread. Instead, we
# raise a custom error to the main thread to tell it to exit.
worker.core_worker.exit_current_actor()
if worker.core_worker.current_actor_is_asyncio():
raise AsyncioActorExit()

Expand Down
1 change: 1 addition & 0 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ cdef extern from "ray/core_worker/experimental_mutable_object_manager.h" nogil:
cdef extern from "ray/core_worker/context.h" nogil:
cdef cppclass CWorkerContext "ray::core::WorkerContext":
c_bool CurrentActorIsAsync()
void SetCurrentActorShouldExit()
const c_string &GetCurrentSerializedRuntimeEnv()
int CurrentActorMaxConcurrency()
const CActorID &GetRootDetachedActorID()
Expand Down
39 changes: 39 additions & 0 deletions python/ray/tests/test_actor_failures.py
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,45 @@ def verify():
wait_for_condition(verify)


def test_exit_actor_async_actor_nested_task(shutdown_only, tmp_path):
async_temp_file = tmp_path / "async_actor.log"
async_temp_file.touch()

@ray.remote
class AsyncActor:
def __init__(self):
def f():
print("atexit handler")
with open(async_temp_file, "w") as f:
f.write("Async Actor\n")

atexit.register(f)

async def start_exit_task(self):
asyncio.create_task(self.exit())

async def exit(self):
exit_actor()

async def normal_task(self):
pass

a = AsyncActor.remote()
ray.get(a.__ray_ready__.remote())
ray.get(a.start_exit_task.remote())
# Wait for the actor to exit.
time.sleep(1)
with pytest.raises(ray.exceptions.RayActorError):
ray.get(a.normal_task.remote())

def verify():
with open(async_temp_file) as f:
assert f.readlines() == ["Async Actor\n"]
return True

wait_for_condition(verify)


def test_exit_actor_queued(shutdown_only):
"""Verify after exit_actor is called the queued tasks won't execute."""

Expand Down
11 changes: 11 additions & 0 deletions src/ray/core_worker/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,17 @@ bool WorkerContext::CurrentActorIsAsync() const {
return current_actor_is_asyncio_;
}

void WorkerContext::SetCurrentActorShouldExit() ABSL_LOCKS_EXCLUDED(mutex_) {
absl::WriterMutexLock lock(&mutex_);
RAY_CHECK(!current_actor_id_.IsNil());
current_actor_should_exit_ = true;
}

bool WorkerContext::CurrentActorShouldExit() const {
absl::ReaderMutexLock lock(&mutex_);
return current_actor_should_exit_;
}

bool WorkerContext::CurrentActorDetached() const {
absl::ReaderMutexLock lock(&mutex_);
return is_detached_actor_;
Expand Down
5 changes: 5 additions & 0 deletions src/ray/core_worker/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ class WorkerContext {

bool CurrentActorIsAsync() const ABSL_LOCKS_EXCLUDED(mutex_);

void SetCurrentActorShouldExit() ABSL_LOCKS_EXCLUDED(mutex_);

bool CurrentActorShouldExit() const ABSL_LOCKS_EXCLUDED(mutex_);

bool CurrentActorDetached() const ABSL_LOCKS_EXCLUDED(mutex_);

uint64_t GetNextTaskIndex();
Expand Down Expand Up @@ -145,6 +149,7 @@ class WorkerContext {
ActorID current_actor_id_ ABSL_GUARDED_BY(mutex_);
int current_actor_max_concurrency_ ABSL_GUARDED_BY(mutex_) = 1;
bool current_actor_is_asyncio_ ABSL_GUARDED_BY(mutex_) = false;
bool current_actor_should_exit_ ABSL_GUARDED_BY(mutex_) = false;
bool is_detached_actor_ ABSL_GUARDED_BY(mutex_) = false;
// The placement group id that the current actor belongs to.
PlacementGroupID current_actor_placement_group_id_ ABSL_GUARDED_BY(mutex_);
Expand Down
5 changes: 5 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3156,6 +3156,11 @@ void CoreWorker::RunTaskExecutionLoop() {
signal_checker->RunFnPeriodically(
[this] {
/// The overhead of this is only a single digit microsecond.
if (worker_context_.CurrentActorShouldExit()) {
Exit(rpc::WorkerExitType::INTENDED_USER_EXIT,
"User requested to exit the actor.",
nullptr);
}
auto status = options_.check_signals();
if (status.IsIntentionalSystemExit()) {
Exit(rpc::WorkerExitType::INTENDED_USER_EXIT,
Expand Down

0 comments on commit 9f4b19c

Please sign in to comment.