Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][Core] Execute user requested actor exit in C++ side #49918

Conversation

MortalHappiness
Copy link
Member

@MortalHappiness MortalHappiness commented Jan 17, 2025

Why are these changes needed?

When an asyncio task creates another asyncio task, raising AsyncioActorExit cannot make the caller exit because they are not the same task. Therefore, this PR makes exit_actor to request actor exit in core worker context, which will be checked regularly by core worker.

This PR also removes AsyncioActorExit exception because it is no longer needed.

Related issue number

Closes: #49451

Follow-up: Unify actor exit codepaths

Core worker exit codepaths:

Either ForceExit or Exit:

  • Cancel task (called by ray.cancel python function):
    ForceExit(rpc::WorkerExitType::INTENDED_USER_EXIT,
  • Requested to kill actor by GCS:
  • Received ExitRequest RPC call:
    Exit(rpc::WorkerExitType::INTENDED_SYSTEM_EXIT,
  • Based on the status of task_execution_callback application-language callback:
    • status = options_.task_execution_callback(
    • if (status.IsCreationTaskError()) {
      Exit(rpc::WorkerExitType::USER_ERROR,
      absl::StrCat(
      "Worker exits because there was an exception in the initialization method "
      "(e.g., __init__). Fix the exceptions from the initialization to resolve "
      "the issue. ",
      status.message()),
      creation_task_exception_pb_bytes);
      } else if (status.IsIntentionalSystemExit()) {
      Exit(rpc::WorkerExitType::INTENDED_USER_EXIT,
      absl::StrCat("Worker exits by an user request. ", status.message()),
      creation_task_exception_pb_bytes);
      } else if (status.IsUnexpectedSystemExit()) {
      Exit(rpc::WorkerExitType::SYSTEM_ERROR,
      absl::StrCat("Worker exits unexpectedly. ", status.message()),
      creation_task_exception_pb_bytes);
      } else if (!status.ok()) {
      RAY_LOG(FATAL) << "Unexpected task status type : " << status;
      }
      return status;
  • Based on the status of check_signals application-language callback:
    • auto status = options_.check_signals();
      if (status.IsIntentionalSystemExit()) {
      Exit(rpc::WorkerExitType::INTENDED_USER_EXIT,
      absl::StrCat("Worker exits by a signal. ", status.message()),
      nullptr);
      }
      if (status.IsUnexpectedSystemExit()) {
      Exit(
      rpc::WorkerExitType::SYSTEM_ERROR,
      absl::StrCat("Worker exits unexpectedly by a signal. ", status.message()),
      nullptr);
      }

Actor exit codepaths (in GCS):

Either DestroyActor or KillActor

Examples:

  • Normal actor exit (process completed): GCS will poll owner for actor ref deleted, and then notify core worker to delete actor.
  • exit_actor is called (this PR): core worker exits based on application-language callback.
  • ray.kill is called: DestroyActor in GCS will be called, and then it'll notify core worker to exit.

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@MortalHappiness MortalHappiness added the go add ONLY when ready to merge, run all tests label Jan 17, 2025
@MortalHappiness MortalHappiness force-pushed the bugfix/#49451-exit-actor-async-background branch 4 times, most recently from 8eac3e4 to ad9caea Compare January 20, 2025 12:24
@edoakes
Copy link
Collaborator

edoakes commented Feb 5, 2025

@MortalHappiness let me know when this is ready for review

@edoakes
Copy link
Collaborator

edoakes commented Feb 5, 2025

At first glance, I think we need to use a threading.Event here, not an asyncio.Event. This is because we need to communicate across threads, not just across asyncio tasks within the same thread.

@MortalHappiness
Copy link
Member Author

MortalHappiness commented Feb 6, 2025

@edoakes What do you think would be a better approach? Implementing it purely on the Python side using threading.Event, or implementing it on the C++ side, as you mentioned in #49451 (comment)?

If implemented on the C++ side, I will export a function to Python and call this function when exit_actor is triggered. Inside this function, I will modify some state and periodically check whether this state indicates that an exit is required.

@edoakes
Copy link
Collaborator

edoakes commented Feb 6, 2025

@edoakes What do you think would be a better approach? Implementing it purely on the Python side using threading.Event, or implementing it on the C++ side, as you mentioned in #49451 (comment)?

If implemented on the C++ side, I will export a function to Python and call this function when exit_actor is triggered. Inside this function, I will modify some state and periodically check whether this state indicates that an exit is required.

If it's possible to do it in C++ without significantly more code change I'd prefer to do it there. That is more future proof and allows us to reuse it across different language frontends in the future.

@MortalHappiness MortalHappiness force-pushed the bugfix/#49451-exit-actor-async-background branch 2 times, most recently from 81246c5 to 545ca38 Compare February 25, 2025 10:35
@MortalHappiness MortalHappiness changed the title [Fix][Core] Use asyncio.Event instead of AsyncioActorExit exception to terminate async actor [Fix][Core] Execute user requested task exit in C++ side Feb 25, 2025
@MortalHappiness MortalHappiness changed the title [Fix][Core] Execute user requested task exit in C++ side [Fix][Core] Execute user requested actor exit in C++ side Feb 25, 2025
@MortalHappiness MortalHappiness force-pushed the bugfix/#49451-exit-actor-async-background branch from 545ca38 to 9f4b19c Compare February 25, 2025 11:02
@MortalHappiness MortalHappiness marked this pull request as ready for review February 25, 2025 13:17
@edoakes edoakes requested a review from a team February 25, 2025 14:10
Copy link
Collaborator

@edoakes edoakes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MortalHappiness great start!

Let's please unify as many of the actor exit codepaths as possible to share the same logic. For example, can we also unify the single threaded and multi threaded actor exit logic to use the same flag?

@MortalHappiness MortalHappiness requested a review from a team as a code owner February 26, 2025 00:26
@MortalHappiness
Copy link
Member Author

Let's please unify as many of the actor exit codepaths as possible to share the same logic. For example, can we also unify the single threaded and multi threaded actor exit logic to use the same flag?

Should I do this in this PR or in follow-up PRs?

@edoakes
Copy link
Collaborator

edoakes commented Feb 26, 2025

Let's please unify as many of the actor exit codepaths as possible to share the same logic. For example, can we also unify the single threaded and multi threaded actor exit logic to use the same flag?

Should I do this in this PR or in follow-up PRs?

I'm ok to do it in follow-up PRs, but could you summarize for me all of the existing exit paths before this PR and what you see as the end goal state?

@MortalHappiness
Copy link
Member Author

Hi @edoakes I've tried to summarize exit codepaths in PR description. Let me know if I missed something.

@edoakes
Copy link
Collaborator

edoakes commented Feb 27, 2025

Looks good now -- my only concern is that we don't have any behavior change for the sync worker case. Let's make sure that still raises immediately and fails the current task w/ an RayActorError

It seems like the "raising immediately" behavior might not be covered by a current test case (if my understanding is correct). If so, please add a test case for that. For example:

def method(self):
    ray.exit_actor()
    do_something() # This shouldn't be called and this method should raise `RayActorError` when `ray.get` is called.

@MortalHappiness
Copy link
Member Author

Hi @edoakes I've addressed all comments. Ready for another review. Thanks.

Comment on lines 1789 to 1791
if not worker.core_worker.current_actor_is_asyncio():
raise_sys_exit_with_custom_error_message("exit_actor() is called.")
worker.core_worker.set_current_actor_should_exit()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little confused at why this is conditional because we should try to make the behavior as uniform as possible:

  • For non-async actors, don't we still want to set the field in the core worker? Otherwise, if users accidentally or intentionally catch the SystemExit exception that is raised, the actor won't actually be exited.
  • For async actors, don't we still want to raise the SystemExit exception immediately?

Comment on lines 921 to 924
raise AssertionError(
"This line should not be reached because non-async actor"
"should exit immediately after exit_actor is called."
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this assertion would actually catch a regression because this line might execute and then the exit method would still return a RayActorError. For any condition like this, you should verify that if you intentionally break the behavior (in this case, make it so that sync actors don't raise immediately), the test fails. I don't think it will in this case.

Instead, what you will need to do is add an "out of bounds" check -- so for example catch the SystemExit exception and then send a signal on an actor or write a tempfile like the other test cases. Then after the actor exits, assert that the exception was raised and therefore the out of band behavior happened.

Comment on lines +941 to +1037
assert (
# Exited when task execution returns
"exit_actor()" in str(exc_info.value)
# Exited during periodical check in worker
or "User requested to exit the actor" in str(exc_info.value)
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few things for this test:

  1. We should add the same check that we have for the sync case that exit_actor() immediately raises an exception).
  2. I might be misguided, but I think we can have two test cases here instead of just checking for both errors. The first one can call exit_actor on the task directly executing the actor method and that should always return "exit_actor() was called...". The second one can call exit_actor in a background asyncio task and then assert that `"User requested to exit the actor" is eventually raised.

Comment on lines 1022 to 1024
ray.get(a.__ray_ready__.remote())
with pytest.raises(ray.exceptions.RayActorError) as exc_info:
ray.get(a.start_exit_task.remote())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm this test doesn't look correct. the method starts a background task so there is no guarantee that runs before the method finishes successfully. I think what you need to do is start the background task and have it block on a SignalActor before it actually calls exit_actor(), check that start_exit_task completes successfully, then unblock the background task so it calls exit_actor(), then use wait_for_condition on ray.get(a.noop_method.remote()) to check that eventually subsequent methods will return the actor error

@MortalHappiness
Copy link
Member Author

MortalHappiness commented Mar 4, 2025

@edoakes I've addressed all of your comments, but the actual implementation differs slightly. Let me describe the changes here:

  • I added back the AsyncioActorExit exception. Since you mentioned that a normal async task should raise immediately, I can't just call set_current_actor_should_exit because this flag is checked periodically rather than triggering an immediate exception. I also can't raise SystemExit, as noted in the original comment

    ray/python/ray/actor.py

    Lines 1784 to 1786 in e151dc2

    # In asyncio actor mode, we can't raise SystemExit because it will just
    # quit the asycnio event loop thread, not the main thread. Instead, we
    # raise a custom error to the main thread to tell it to exit.

  • I split the test into 6 separate tests:

    1. test_exit_actor_invalid_usage_error:
      • This is the original test for invalid call of exit_actor in non-actor functions.
    2. test_exit_actor_normal_actor_raise_immediately:
      • This is the original test for a normal actor, but I added one line to ensure that any code after exit_actor is not executed.
    3. test_exit_actor_normal_actor_user_catch_err_should_still_exit:
      • This is an "out-of-bound" test for a normal actor.
      • In this case, the user catches SystemExit, so the program does not raise immediately.
      • However, the actor will still exit shortly after the flag is checked by the core worker, similar to the exit path for an async background task.
    4. test_exit_actor_async_actor_raise_immediately:
      • This is the original test for an async actor, but I added one line to verify that any code after exit_actor is not executed.
      • Why does this test still check both "exit_actor()" in str(exc_info.value)" and "User requested to exit the actor" in str(exc_info.value)"?
        • Most of the time, the first case occurs.
        • However, since we call set_current_actor_should_exit (which is checked periodically by the core worker), there is a small chance that the second case occurs instead.
    5. test_exit_actor_async_actor_user_catch_err_should_still_exit:
      • This is an "out-of-bound" test for an async actor.
      • In this case, the user catches AsyncioActorExit, so the program does not raise immediately.
      • However, the actor will still exit shortly after the flag is checked by the core worker, similar to the exit path for an async background task.
    6. test_exit_actor_async_actor_nested_task:
      • I updated this test to use SignalActor.
      • No error is raised in this case, but the actor still exits short after when the flag is checked by the core worker.
      • Therefore the atexit handler is triggered.

@edoakes
Copy link
Collaborator

edoakes commented Mar 4, 2025

@MortalHappiness why can't we set the should_exit flag and then raise the normal SystemExit? AFAICT this makes the behavior as consistent as possible:

  • The currently-running task will raise and exit immediately (same as a sync actor method).
  • The actor will exit soon after once the should_exit flag is checked.

Am I missing something?

@MortalHappiness
Copy link
Member Author

MortalHappiness commented Mar 5, 2025

@MortalHappiness why can't we set the should_exit flag and then raise the normal SystemExit? AFAICT this makes the behavior as consistent as possible:

  • The currently-running task will raise and exit immediately (same as a sync actor method).
  • The actor will exit soon after once the should_exit flag is checked.

Am I missing something?

@edoakes After some debug printing and some experiment, I found that asyncio.run_coroutine_thread_safe handles SystemExit and KeyboardInterrupt differently.

CPython source code: https://github.com/python/cpython/blob/e53d105872fafa77507ea33b7ecf0faddd4c3b60/Lib/asyncio/tasks.py#L991-L1011

The following is a toy example.

import time
import asyncio
import threading

async def f():
    print("Inside coroutine f()")
    # raise ValueError
    # raise BaseException
    raise SystemExit

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def main():
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=start_loop, args=(loop,))
    thread.start()

    future = asyncio.run_coroutine_threadsafe(f(), loop)

    try:
        time.sleep(2)
        print("loop alive?", loop.is_running())
        future.result()
    except BaseException as e:
        print("Caught exception:", repr(e))

    print("about to stop loop")
    loop.call_soon_threadsafe(loop.stop)
    thread.join()

if __name__ == "__main__":
    main()

image

Explanation:

  • raise ValueError or BaseException in f: Loop alive, and exception was caught.
  • raise SystemExit in f: Loop dead. Program hangs forever on future.result() because future.set_exception(exc) is not called inside asyncio.run_coroutine_thread_safe

And in Ray, we use run_coroutine_thread_safe to run async tasks:

future = asyncio.run_coroutine_threadsafe(async_func(), eventloop)

This is why we cannot raise SystemExit in exit_actor.

MortalHappiness and others added 16 commits March 5, 2025 16:17
Co-authored-by: Edward Oakes <[email protected]>
Signed-off-by: Chi-Sheng Liu <[email protected]>
Closes: ray-project#49451
Signed-off-by: Chi-Sheng Liu <[email protected]>
Closes: ray-project#49451
Signed-off-by: Chi-Sheng Liu <[email protected]>
Closes: ray-project#49451
Signed-off-by: Chi-Sheng Liu <[email protected]>
This reverts commit 516bf49.

Signed-off-by: Chi-Sheng Liu <[email protected]>
Closes: ray-project#49451
Signed-off-by: Chi-Sheng Liu <[email protected]>
@MortalHappiness MortalHappiness force-pushed the bugfix/#49451-exit-actor-async-background branch from 5945cfc to 04cf1ee Compare March 5, 2025 08:17
@edoakes
Copy link
Collaborator

edoakes commented Mar 5, 2025

@MortalHappiness I see, thanks for the explanation. Is it possible to add a top-level exception handler for SystemExit in the async_func() that will call future.set_exception()?

@MortalHappiness
Copy link
Member Author

MortalHappiness commented Mar 6, 2025

@MortalHappiness I see, thanks for the explanation. Is it possible to add a top-level exception handler for SystemExit in the async_func() that will call future.set_exception()?

@edoakes Not easy to do that because the future object is returned by asyncio.run_coroutine_threadsafe, so we cannot call future.set_exception inside async_func.

It's easier to raise a custom exception in this case. My idea is that exit_actor only raises SystemExit, then async_func catches SystemExit, raises a custom exception, and then future.result() catches the custom exception and raises SystemExit. Although this approach is similar to the previous custom exception class AsyncioActorExit, the advantage is that the external file will never see this custom exception class. It only needs to be declared inside this function. WDYT?

Copy link
Collaborator

@edoakes edoakes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code changes LGTM, just a few nits

Your latest suggestion sounds better than what is in the code, but given that we already have the existing behavior with AsyncioActorExit and it would be a very minor API breakage to change, better to leave it as is.

Comment on lines 120 to 122
void SetCurrentActorShouldExit() ABSL_LOCKS_EXCLUDED(mutex_);

bool GetCurrentActorShouldExit() const ABSL_LOCKS_EXCLUDED(mutex_);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add header comments for these indicating what they're used for

Copy link
Member Author

@MortalHappiness MortalHappiness Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Verify the basic case.
"""

def test_exit_actor_normal_actor_raise_immediately(shutdown_only, tmp_path):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test case for calling ray.exit_actor() within the constructor if there isn't one already?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added 3 tests: test_exit_actor_normal_actor_in_constructor_should_exit, test_exit_actor_async_actor_in_constructor_should_exit, and test_exit_actor_async_actor_nested_task_in_constructor_should_exit

@edoakes edoakes enabled auto-merge (squash) March 7, 2025 00:18
Copy link
Collaborator

@edoakes edoakes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚢

Closes: ray-project#49451
Signed-off-by: Chi-Sheng Liu <[email protected]>
@github-actions github-actions bot disabled auto-merge March 7, 2025 00:19
@MortalHappiness
Copy link
Member Author

@edoakes I found that you enabled auto-merged. But I found that I accidentally changed 1 line that should not be changed so I pushed it again and the auto-merge is disabled. Could you enable it again? Thanks.

@edoakes edoakes enabled auto-merge (squash) March 7, 2025 00:24
@edoakes edoakes merged commit 9ed6ec6 into ray-project:master Mar 7, 2025
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
go add ONLY when ready to merge, run all tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Core] ray.actor.exit_actor() does not seem to work from within an async background thread
2 participants