Skip to content

Commit

Permalink
[Fix][Core] Execute user requested actor exit in C++ side (#49918)
Browse files Browse the repository at this point in the history
When an asyncio task creates another asyncio task, raising
`AsyncioActorExit` in the new task cannot make the caller exit because
they are not the same task. Therefore, this PR makes `exit_actor` request
actor exit in the core worker context, which the core worker checks regularly.

Closes: #49451

---------

Signed-off-by: Chi-Sheng Liu <[email protected]>
Co-authored-by: Edward Oakes <[email protected]>
  • Loading branch information
MortalHappiness and edoakes authored Mar 7, 2025
1 parent 9b45c99 commit 9ed6ec6
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 39 deletions.
15 changes: 14 additions & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1886,6 +1886,7 @@ cdef void execute_task(
# Execute the task.
with core_worker.profile_event(b"task:execute"):
task_exception = True
task_exception_instance = None
try:
if debugger_breakpoint != b"":
ray.util.pdb.set_trace(
Expand Down Expand Up @@ -1986,10 +1987,14 @@ cdef void execute_task(
" {}.".format(
core_worker.get_current_task_id()),
exc_info=True)
raise e
task_exception_instance = e
finally:
# Record the end of the task log.
worker.record_task_log_end(task_id, attempt_number)
if task_exception_instance is not None:
raise task_exception_instance
if core_worker.get_current_actor_should_exit():
raise_sys_exit_with_custom_error_message("exit_actor() is called.")

if (returns[0].size() == 1
and not inspect.isgenerator(outputs)
Expand Down Expand Up @@ -4671,6 +4676,14 @@ cdef class CoreWorker:
return (CCoreWorkerProcess.GetCoreWorker().GetWorkerContext()
.CurrentActorIsAsync())

def set_current_actor_should_exit(self):
return (CCoreWorkerProcess.GetCoreWorker().GetWorkerContext()
.SetCurrentActorShouldExit())

def get_current_actor_should_exit(self):
return (CCoreWorkerProcess.GetCoreWorker().GetWorkerContext()
.GetCurrentActorShouldExit())

def current_actor_max_concurrency(self):
return (CCoreWorkerProcess.GetCoreWorker().GetWorkerContext()
.CurrentActorMaxConcurrency())
Expand Down
6 changes: 5 additions & 1 deletion python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1771,7 +1771,10 @@ def exit_actor():
This API can be used only inside an actor. Use ray.kill
API if you'd like to kill an actor using actor handle.
When the API is called, the actor raises an exception and exits.
When this API is called, an exception is raised and the actor
will exit immediately. For asyncio actors, there may be a short
delay before the actor exits if the API is called from a background
task.
Any queued methods will fail. Any ``atexit``
handlers installed in the actor will be run.
Expand All @@ -1781,6 +1784,7 @@ def exit_actor():
"""
worker = ray._private.worker.global_worker
if worker.mode == ray.WORKER_MODE and not worker.actor_id.is_nil():
worker.core_worker.set_current_actor_should_exit()
# In asyncio actor mode, we can't raise SystemExit because it will just
# quit the asyncio event loop thread, not the main thread. Instead, we
# raise a custom error to the main thread to tell it to exit.
Expand Down
2 changes: 2 additions & 0 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ cdef extern from "ray/core_worker/experimental_mutable_object_manager.h" nogil:
cdef extern from "ray/core_worker/context.h" nogil:
cdef cppclass CWorkerContext "ray::core::WorkerContext":
c_bool CurrentActorIsAsync()
void SetCurrentActorShouldExit()
c_bool GetCurrentActorShouldExit()
const c_string &GetCurrentSerializedRuntimeEnv()
int CurrentActorMaxConcurrency()
const CActorID &GetRootDetachedActorID()
Expand Down
253 changes: 216 additions & 37 deletions python/ray/tests/test_actor_failures.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import ray
from ray.actor import exit_actor
from ray.exceptions import AsyncioActorExit
import ray.cluster_utils
from ray._private.test_utils import (
wait_for_condition,
Expand Down Expand Up @@ -891,7 +892,7 @@ def foo():
ray.get(ref)


def test_exit_actor(shutdown_only, tmp_path):
def test_exit_actor_invalid_usage_error(shutdown_only):
"""
Verify TypeError is raised when exit_actor is not used
inside an actor.
Expand All @@ -910,83 +911,261 @@ def f():
):
ray.get(f.remote())

"""
Verify the basic case.
"""

def test_exit_actor_normal_actor_raise_immediately(shutdown_only, tmp_path):
temp_file_atexit = tmp_path / "atexit.log"
temp_file_after_exit_actor = tmp_path / "after_exit_actor.log"
assert not temp_file_atexit.exists()
assert not temp_file_after_exit_actor.exists()

@ray.remote
class Actor:
def exit(self):
exit_actor()
def __init__(self):
def f():
temp_file_atexit.touch()

@ray.remote
class AsyncActor:
async def exit(self):
atexit.register(f)

def exit(self):
exit_actor()
# The following code should not be executed.
temp_file_after_exit_actor.touch()

a = Actor.remote()
ray.get(a.__ray_ready__.remote())
with pytest.raises(ray.exceptions.RayActorError) as exc_info:
ray.get(a.exit.remote())
assert "exit_actor()" in str(exc_info.value)

b = AsyncActor.remote()
ray.get(b.__ray_ready__.remote())
with pytest.raises(ray.exceptions.RayActorError) as exc_info:
ray.get(b.exit.remote())
assert "exit_actor()" in str(exc_info.value)
def verify():
return temp_file_atexit.exists()

"""
Verify atexit handler is called correctly.
"""
sync_temp_file = tmp_path / "actor.log"
async_temp_file = tmp_path / "async_actor.log"
sync_temp_file.touch()
async_temp_file.touch()
wait_for_condition(verify)
time.sleep(3)
assert not temp_file_after_exit_actor.exists()


def test_exit_actor_normal_actor_in_constructor_should_exit(shutdown_only, tmp_path):
temp_file_atexit = tmp_path / "atexit.log"
temp_file_after_exit_actor = tmp_path / "after_exit_actor.log"
assert not temp_file_atexit.exists()
assert not temp_file_after_exit_actor.exists()

@ray.remote
class Actor:
def __init__(self):
def f():
print("atexit handler")
with open(sync_temp_file, "w") as f:
f.write("Actor\n")
temp_file_atexit.touch()

atexit.register(f)
exit_actor()
# The following code should not be executed.
temp_file_after_exit_actor.touch()

a = Actor.remote() # noqa: F841 # Need to preserve the reference.

def verify():
return temp_file_atexit.exists()

wait_for_condition(verify)
time.sleep(3)
assert not temp_file_after_exit_actor.exists()


def test_exit_actor_normal_actor_user_catch_err_should_still_exit(
shutdown_only, tmp_path
):
temp_file = tmp_path / "actor.log"
assert not temp_file.exists()

@ray.remote
class Actor:
def exit(self):
exit_actor()
try:
exit_actor()
except SystemExit:
pass

def create(self):
temp_file.touch()

a = Actor.remote()
ray.get(a.__ray_ready__.remote())
with pytest.raises(ray.exceptions.RayActorError):
ray.get(a.exit.remote())

with pytest.raises(ray.exceptions.RayActorError):
ray.get(a.create.remote())

assert not temp_file.exists()


def test_exit_actor_async_actor_raise_immediately(shutdown_only, tmp_path):
temp_file_atexit = tmp_path / "atexit.log"
temp_file_after_exit_actor = tmp_path / "after_exit_actor.log"
assert not temp_file_atexit.exists()
assert not temp_file_after_exit_actor.exists()

@ray.remote
class AsyncActor:
def __init__(self):
def f():
print("atexit handler")
with open(async_temp_file, "w") as f:
f.write("Async Actor\n")
temp_file_atexit.touch()

atexit.register(f)

async def exit(self):
exit_actor()
# The following code should not be executed.
temp_file_after_exit_actor.touch()

a = Actor.remote()
a = AsyncActor.remote()
ray.get(a.__ray_ready__.remote())

try:
ray.get(a.exit.remote())
except Exception:
pass

with pytest.raises(ray.exceptions.RayActorError) as exc_info:
ray.get(a.exit.remote())
assert (
# Exited when task execution returns
"exit_actor()" in str(exc_info.value)
# Exited during the periodic check in the worker
or "User requested to exit the actor" in str(exc_info.value)
)

def verify():
return temp_file_atexit.exists()

wait_for_condition(verify)
time.sleep(3)
assert not temp_file_after_exit_actor.exists()


def test_exit_actor_async_actor_in_constructor_should_exit(shutdown_only, tmp_path):
temp_file_atexit = tmp_path / "atexit.log"
temp_file_after_exit_actor = tmp_path / "after_exit_actor.log"
assert not temp_file_atexit.exists()
assert not temp_file_after_exit_actor.exists()

@ray.remote
class AsyncActor:
def __init__(self):
def f():
temp_file_atexit.touch()

atexit.register(f)
exit_actor()
# The following code should not be executed.
temp_file_after_exit_actor.touch()

a = AsyncActor.remote() # noqa: F841 # Need to preserve the reference.

def verify():
return temp_file_atexit.exists()

wait_for_condition(verify)
time.sleep(3)
assert not temp_file_after_exit_actor.exists()


def test_exit_actor_async_actor_user_catch_err_should_still_exit(
shutdown_only, tmp_path
):
temp_file = tmp_path / "actor.log"
assert not temp_file.exists()

@ray.remote
class AsyncActor:
async def exit(self):
try:
exit_actor()
except AsyncioActorExit:
pass

async def create(self):
temp_file.touch()

a = AsyncActor.remote()
ray.get(a.__ray_ready__.remote())
b = AsyncActor.remote()
ray.get(b.__ray_ready__.remote())
with pytest.raises(ray.exceptions.RayActorError):
ray.get(a.exit.remote())

with pytest.raises(ray.exceptions.RayActorError):
ray.get(b.exit.remote())
ray.get(a.create.remote())
assert not temp_file.exists()


def test_exit_actor_async_actor_nested_task(shutdown_only, tmp_path):
temp_file_atexit = tmp_path / "atexit.log"
temp_file_after_exit_actor = tmp_path / "after_exit_actor.log"
assert not temp_file_atexit.exists()
assert not temp_file_after_exit_actor.exists()

signal = SignalActor.remote()

@ray.remote
class AsyncActor:
def __init__(self):
def f():
temp_file_atexit.touch()

atexit.register(f)

async def start_exit_task(self, signal):
asyncio.create_task(self.exit(signal))

async def exit(self, signal):
await signal.wait.remote()
exit_actor()
# The following code should not be executed.
temp_file_after_exit_actor.touch()

a = AsyncActor.remote()
ray.get(a.__ray_ready__.remote())
ray.get(a.start_exit_task.remote(signal))
ray.get(signal.send.remote())

def verify():
return temp_file_atexit.exists()

wait_for_condition(verify)
time.sleep(3)
assert not temp_file_after_exit_actor.exists()


def test_exit_actor_async_actor_nested_task_in_constructor_should_exit(
shutdown_only, tmp_path
):
temp_file_atexit = tmp_path / "atexit.log"
temp_file_after_exit_actor = tmp_path / "after_exit_actor.log"
assert not temp_file_atexit.exists()
assert not temp_file_after_exit_actor.exists()

@ray.remote
class AsyncActor:
def __init__(self):
def f():
temp_file_atexit.touch()

atexit.register(f)
asyncio.create_task(self.exit())

async def exit(self):
exit_actor()
# The following code should not be executed.
temp_file_after_exit_actor.touch()

a = AsyncActor.remote() # noqa: F841 # Need to preserve the reference.

def verify():
with open(async_temp_file) as f:
assert f.readlines() == ["Async Actor\n"]
with open(sync_temp_file) as f:
assert f.readlines() == ["Actor\n"]
return True
return temp_file_atexit.exists()

wait_for_condition(verify)
time.sleep(3)
assert not temp_file_after_exit_actor.exists()


def test_exit_actor_queued(shutdown_only):
Expand Down
Loading

0 comments on commit 9ed6ec6

Please sign in to comment.