Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][Core] Execute user requested actor exit in C++ side #49918

Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1886,6 +1886,7 @@ cdef void execute_task(
# Execute the task.
with core_worker.profile_event(b"task:execute"):
task_exception = True
task_exception_instance = None
try:
if debugger_breakpoint != b"":
ray.util.pdb.set_trace(
Expand Down Expand Up @@ -1986,10 +1987,14 @@ cdef void execute_task(
" {}.".format(
core_worker.get_current_task_id()),
exc_info=True)
raise e
task_exception_instance = e
finally:
# Record the end of the task log.
worker.record_task_log_end(task_id, attempt_number)
if task_exception_instance is not None:
raise task_exception_instance
if core_worker.get_current_actor_should_exit():
raise_sys_exit_with_custom_error_message("exit_actor() is called.")

if (returns[0].size() == 1
and not inspect.isgenerator(outputs)
Expand Down Expand Up @@ -4671,6 +4676,14 @@ cdef class CoreWorker:
return (CCoreWorkerProcess.GetCoreWorker().GetWorkerContext()
.CurrentActorIsAsync())

def set_current_actor_should_exit(self):
return (CCoreWorkerProcess.GetCoreWorker().GetWorkerContext()
.SetCurrentActorShouldExit())

def get_current_actor_should_exit(self):
return (CCoreWorkerProcess.GetCoreWorker().GetWorkerContext()
.GetCurrentActorShouldExit())

def current_actor_max_concurrency(self):
return (CCoreWorkerProcess.GetCoreWorker().GetWorkerContext()
.CurrentActorMaxConcurrency())
Expand Down
6 changes: 5 additions & 1 deletion python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1771,7 +1771,10 @@ def exit_actor():
This API can be used only inside an actor. Use ray.kill
API if you'd like to kill an actor using actor handle.

When the API is called, the actor raises an exception and exits.
When this API is called, an exception is raised and the actor
will exit immediately. For asyncio actors, there may be a short
delay before the actor exits if the API is called from a background
task.
Any queued methods will fail. Any ``atexit``
handlers installed in the actor will be run.

Expand All @@ -1781,6 +1784,7 @@ def exit_actor():
"""
worker = ray._private.worker.global_worker
if worker.mode == ray.WORKER_MODE and not worker.actor_id.is_nil():
worker.core_worker.set_current_actor_should_exit()
# In asyncio actor mode, we can't raise SystemExit because it will just
# quit the asycnio event loop thread, not the main thread. Instead, we
# raise a custom error to the main thread to tell it to exit.
Expand Down
2 changes: 2 additions & 0 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ cdef extern from "ray/core_worker/experimental_mutable_object_manager.h" nogil:
cdef extern from "ray/core_worker/context.h" nogil:
cdef cppclass CWorkerContext "ray::core::WorkerContext":
c_bool CurrentActorIsAsync()
void SetCurrentActorShouldExit()
c_bool GetCurrentActorShouldExit()
const c_string &GetCurrentSerializedRuntimeEnv()
int CurrentActorMaxConcurrency()
const CActorID &GetRootDetachedActorID()
Expand Down
253 changes: 216 additions & 37 deletions python/ray/tests/test_actor_failures.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import ray
from ray.actor import exit_actor
from ray.exceptions import AsyncioActorExit
import ray.cluster_utils
from ray._private.test_utils import (
wait_for_condition,
Expand Down Expand Up @@ -891,7 +892,7 @@ def foo():
ray.get(ref)


def test_exit_actor(shutdown_only, tmp_path):
def test_exit_actor_invalid_usage_error(shutdown_only):
"""
Verify TypeError is raised when exit_actor is not used
inside an actor.
Expand All @@ -910,83 +911,261 @@ def f():
):
ray.get(f.remote())

"""
Verify the basic case.
"""

def test_exit_actor_normal_actor_raise_immediately(shutdown_only, tmp_path):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test case for calling ray.exit_actor() within the constructor if there isn't one already?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added 3 tests: test_exit_actor_normal_actor_in_constructor_should_exit, test_exit_actor_async_actor_in_constructor_should_exit, and test_exit_actor_async_actor_nested_task_in_constructor_should_exit

temp_file_atexit = tmp_path / "atexit.log"
temp_file_after_exit_actor = tmp_path / "after_exit_actor.log"
assert not temp_file_atexit.exists()
assert not temp_file_after_exit_actor.exists()

@ray.remote
class Actor:
def exit(self):
exit_actor()
def __init__(self):
def f():
temp_file_atexit.touch()

@ray.remote
class AsyncActor:
async def exit(self):
atexit.register(f)

def exit(self):
exit_actor()
# The following code should not be executed.
temp_file_after_exit_actor.touch()

a = Actor.remote()
ray.get(a.__ray_ready__.remote())
with pytest.raises(ray.exceptions.RayActorError) as exc_info:
ray.get(a.exit.remote())
assert "exit_actor()" in str(exc_info.value)

b = AsyncActor.remote()
ray.get(b.__ray_ready__.remote())
with pytest.raises(ray.exceptions.RayActorError) as exc_info:
ray.get(b.exit.remote())
assert "exit_actor()" in str(exc_info.value)
def verify():
return temp_file_atexit.exists()

"""
Verify atexit handler is called correctly.
"""
sync_temp_file = tmp_path / "actor.log"
async_temp_file = tmp_path / "async_actor.log"
sync_temp_file.touch()
async_temp_file.touch()
wait_for_condition(verify)
time.sleep(3)
assert not temp_file_after_exit_actor.exists()


def test_exit_actor_normal_actor_in_constructor_should_exit(shutdown_only, tmp_path):
temp_file_atexit = tmp_path / "atexit.log"
temp_file_after_exit_actor = tmp_path / "after_exit_actor.log"
assert not temp_file_atexit.exists()
assert not temp_file_after_exit_actor.exists()

@ray.remote
class Actor:
def __init__(self):
def f():
print("atexit handler")
with open(sync_temp_file, "w") as f:
f.write("Actor\n")
temp_file_atexit.touch()

atexit.register(f)
exit_actor()
# The following code should not be executed.
temp_file_after_exit_actor.touch()

a = Actor.remote() # noqa: F841 # Need to preserve the reference.

def verify():
return temp_file_atexit.exists()

wait_for_condition(verify)
time.sleep(3)
assert not temp_file_after_exit_actor.exists()


def test_exit_actor_normal_actor_user_catch_err_should_still_exit(
shutdown_only, tmp_path
):
temp_file = tmp_path / "actor.log"
assert not temp_file.exists()

@ray.remote
class Actor:
def exit(self):
exit_actor()
try:
exit_actor()
except SystemExit:
pass

def create(self):
temp_file.touch()

a = Actor.remote()
ray.get(a.__ray_ready__.remote())
with pytest.raises(ray.exceptions.RayActorError):
ray.get(a.exit.remote())

with pytest.raises(ray.exceptions.RayActorError):
ray.get(a.create.remote())

assert not temp_file.exists()


def test_exit_actor_async_actor_raise_immediately(shutdown_only, tmp_path):
temp_file_atexit = tmp_path / "atexit.log"
temp_file_after_exit_actor = tmp_path / "after_exit_actor.log"
assert not temp_file_atexit.exists()
assert not temp_file_after_exit_actor.exists()

@ray.remote
class AsyncActor:
def __init__(self):
def f():
print("atexit handler")
with open(async_temp_file, "w") as f:
f.write("Async Actor\n")
temp_file_atexit.touch()

atexit.register(f)

async def exit(self):
exit_actor()
# The following code should not be executed.
temp_file_after_exit_actor.touch()

a = Actor.remote()
a = AsyncActor.remote()
ray.get(a.__ray_ready__.remote())

try:
ray.get(a.exit.remote())
except Exception:
pass

with pytest.raises(ray.exceptions.RayActorError) as exc_info:
ray.get(a.exit.remote())
assert (
# Exited when task execution returns
"exit_actor()" in str(exc_info.value)
# Exited during periodical check in worker
or "User requested to exit the actor" in str(exc_info.value)
)
Comment on lines +1032 to +1037
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few things for this test:

  1. We should add the same check that we have for the sync case that exit_actor() immediately raises an exception).
  2. I might be misguided, but I think we can have two test cases here instead of just checking for both errors. The first one can call exit_actor on the task directly executing the actor method and that should always return "exit_actor() was called...". The second one can call exit_actor in a background asyncio task and then assert that `"User requested to exit the actor" is eventually raised.


def verify():
return temp_file_atexit.exists()

wait_for_condition(verify)
time.sleep(3)
assert not temp_file_after_exit_actor.exists()


def test_exit_actor_async_actor_in_constructor_should_exit(shutdown_only, tmp_path):
temp_file_atexit = tmp_path / "atexit.log"
temp_file_after_exit_actor = tmp_path / "after_exit_actor.log"
assert not temp_file_atexit.exists()
assert not temp_file_after_exit_actor.exists()

@ray.remote
class AsyncActor:
def __init__(self):
def f():
temp_file_atexit.touch()

atexit.register(f)
exit_actor()
# The following code should not be executed.
temp_file_after_exit_actor.touch()

a = AsyncActor.remote() # noqa: F841 # Need to preserve the reference.

def verify():
return temp_file_atexit.exists()

wait_for_condition(verify)
time.sleep(3)
assert not temp_file_after_exit_actor.exists()


def test_exit_actor_async_actor_user_catch_err_should_still_exit(
shutdown_only, tmp_path
):
temp_file = tmp_path / "actor.log"
assert not temp_file.exists()

@ray.remote
class AsyncActor:
async def exit(self):
try:
exit_actor()
except AsyncioActorExit:
pass

async def create(self):
temp_file.touch()

a = AsyncActor.remote()
ray.get(a.__ray_ready__.remote())
b = AsyncActor.remote()
ray.get(b.__ray_ready__.remote())
with pytest.raises(ray.exceptions.RayActorError):
ray.get(a.exit.remote())

with pytest.raises(ray.exceptions.RayActorError):
ray.get(b.exit.remote())
ray.get(a.create.remote())
assert not temp_file.exists()


def test_exit_actor_async_actor_nested_task(shutdown_only, tmp_path):
temp_file_atexit = tmp_path / "atexit.log"
temp_file_after_exit_actor = tmp_path / "after_exit_actor.log"
assert not temp_file_atexit.exists()
assert not temp_file_after_exit_actor.exists()

signal = SignalActor.remote()

@ray.remote
class AsyncActor:
def __init__(self):
def f():
temp_file_atexit.touch()

atexit.register(f)

async def start_exit_task(self, signal):
asyncio.create_task(self.exit(signal))

async def exit(self, signal):
await signal.wait.remote()
exit_actor()
# The following code should not be executed.
temp_file_after_exit_actor.touch()

a = AsyncActor.remote()
ray.get(a.__ray_ready__.remote())
ray.get(a.start_exit_task.remote(signal))
ray.get(signal.send.remote())

def verify():
return temp_file_atexit.exists()

wait_for_condition(verify)
time.sleep(3)
assert not temp_file_after_exit_actor.exists()


def test_exit_actor_async_actor_nested_task_in_constructor_should_exit(
shutdown_only, tmp_path
):
temp_file_atexit = tmp_path / "atexit.log"
temp_file_after_exit_actor = tmp_path / "after_exit_actor.log"
assert not temp_file_atexit.exists()
assert not temp_file_after_exit_actor.exists()

@ray.remote
class AsyncActor:
def __init__(self):
def f():
temp_file_atexit.touch()

atexit.register(f)
asyncio.create_task(self.exit())

async def exit(self):
exit_actor()
# The following code should not be executed.
temp_file_after_exit_actor.touch()

a = AsyncActor.remote() # noqa: F841 # Need to preserve the reference.

def verify():
with open(async_temp_file) as f:
assert f.readlines() == ["Async Actor\n"]
with open(sync_temp_file) as f:
assert f.readlines() == ["Actor\n"]
return True
return temp_file_atexit.exists()

wait_for_condition(verify)
time.sleep(3)
assert not temp_file_after_exit_actor.exists()


def test_exit_actor_queued(shutdown_only):
Expand Down
Loading