From 67f9490c726ec528349e13e382ca86b6d54393f6 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Wed, 22 Jan 2025 09:09:14 -0800 Subject: [PATCH] [observability][export-api] Env var config to write export events of specific source types (#49541) Signed-off-by: Nikita Vemuri --- .../ray/_private/event/export_event_logger.py | 20 +++ python/ray/_private/ray_constants.py | 15 +++ .../tests/test_generate_export_events.py | 117 +++++++++++++++++- python/ray/dashboard/modules/job/common.py | 7 +- src/ray/common/ray_config_def.h | 12 +- src/ray/core_worker/task_event_buffer.cc | 5 +- src/ray/core_worker/task_event_buffer.h | 10 ++ src/ray/gcs/gcs_server/gcs_actor_manager.cc | 6 +- src/ray/gcs/gcs_server/gcs_actor_manager.h | 12 ++ src/ray/gcs/gcs_server/gcs_job_manager.cc | 2 +- src/ray/gcs/gcs_server/gcs_job_manager.h | 15 ++- src/ray/gcs/gcs_server/gcs_node_manager.cc | 6 +- src/ray/gcs/gcs_server/gcs_node_manager.h | 11 ++ src/ray/util/event.cc | 15 +++ src/ray/util/event.h | 5 + src/ray/util/tests/BUILD | 1 + src/ray/util/tests/event_test.cc | 29 +++++ 17 files changed, 271 insertions(+), 17 deletions(-) diff --git a/python/ray/_private/event/export_event_logger.py b/python/ray/_private/event/export_event_logger.py index 95df7fea6a3a0..6cc5d0faed5dd 100644 --- a/python/ray/_private/event/export_event_logger.py +++ b/python/ray/_private/event/export_event_logger.py @@ -142,3 +142,23 @@ def get_export_event_logger( _export_event_logger[source_name] = ExportEventLoggerAdapter(source, logger) return _export_event_logger[source_name] + + +def check_export_api_enabled( + source: ExportEvent.SourceType, +) -> bool: + """ + Check RAY_ENABLE_EXPORT_API_WRITE and RAY_ENABLE_EXPORT_API_WRITE_CONFIG environment + variables to verify if export events should be written for the given source type. + + Args: + source: The source of the export event. + """ + if ray_constants.RAY_ENABLE_EXPORT_API_WRITE: + return True + source_name = ExportEvent.SourceType.Name(source) + return ( + source_name in ray_constants.RAY_ENABLE_EXPORT_API_WRITE_CONFIG + if ray_constants.RAY_ENABLE_EXPORT_API_WRITE_CONFIG + else False + ) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index 18c525412e6e6..166fc42791b77 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -530,8 +530,23 @@ def gcs_actor_scheduling_enabled(): RAY_BACKEND_LOG_JSON_ENV_VAR = "RAY_BACKEND_LOG_JSON" +# Write export API event of all resource types to file if enabled. +# RAY_enable_export_api_write_config will not be considered if +# this is enabled. RAY_ENABLE_EXPORT_API_WRITE = env_bool("RAY_enable_export_api_write", False) +# Comma separated string containing individual resource +# to write export API events for. This configuration is only used if +# RAY_enable_export_api_write is not enabled. Full list of valid +# resource types in ExportEvent.SourceType enum in +# src/ray/protobuf/export_api/export_event.proto +# Example config: +# `export RAY_enable_export_api_write_config='EXPORT_SUBMISSION_JOB,EXPORT_ACTOR'` +RAY_ENABLE_EXPORT_API_WRITE_CONFIG_STR = os.environ.get( + "RAY_enable_export_api_write_config", "" +) +RAY_ENABLE_EXPORT_API_WRITE_CONFIG = RAY_ENABLE_EXPORT_API_WRITE_CONFIG_STR.split(",") + RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES = env_bool( "RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES", 100 * 1e6 ) diff --git a/python/ray/dashboard/modules/event/tests/test_generate_export_events.py b/python/ray/dashboard/modules/event/tests/test_generate_export_events.py index 9894a7de07ffe..f1ac91690eb3e 100644 --- a/python/ray/dashboard/modules/event/tests/test_generate_export_events.py +++ b/python/ray/dashboard/modules/event/tests/test_generate_export_events.py @@ -6,10 +6,10 @@ import pytest -# RAY_enable_export_api_write env var must be set before importing -# `ray` so the correct value is set for RAY_ENABLE_EXPORT_API_WRITE +# RAY_enable_export_api_write_config env var must be set before importing +# `ray` so the correct value is set for RAY_ENABLE_EXPORT_API_WRITE_CONFIG # even outside a Ray driver. -os.environ["RAY_enable_export_api_write"] = "true" +os.environ["RAY_enable_export_api_write_config"] = "EXPORT_SUBMISSION_JOB" import ray from ray._private.gcs_utils import GcsAioClient @@ -32,6 +32,45 @@ async def check_job_succeeded(job_manager, job_id): return status == JobStatus.SUCCEEDED +@pytest.mark.asyncio +@pytest.mark.parametrize( + "call_ray_start", + [ + { + "env": { + "RAY_enable_export_api_write_config": "EXPORT_SUBMISSION_JOB,EXPORT_TASK", + }, + "cmd": "ray start --head", + } + ], + indirect=True, +) +async def test_check_export_api_enabled(call_ray_start, tmp_path): # noqa: F811 + """ + Test check_export_api_enabled is True for EXPORT_SUBMISSION_JOB and EXPORT_TASK but + not for EXPORT_ACTOR because of the value of RAY_enable_export_api_write_config. + """ + + @ray.remote + def test_check_export_api_enabled_remote(): + from ray._private.event.export_event_logger import check_export_api_enabled + from ray.core.generated.export_event_pb2 import ExportEvent + + success = True + success = success and check_export_api_enabled( + ExportEvent.SourceType.EXPORT_SUBMISSION_JOB + ) + success = success and check_export_api_enabled( + ExportEvent.SourceType.EXPORT_TASK + ) + success = success and ( + not check_export_api_enabled(ExportEvent.SourceType.EXPORT_ACTOR) + ) + return success + + assert ray.get(test_check_export_api_enabled_remote.remote()) + + @pytest.mark.asyncio @pytest.mark.parametrize( "call_ray_start", @@ -45,6 +84,78 @@ async def check_job_succeeded(job_manager, job_id): ], indirect=True, ) +async def test_check_export_api_enabled_global(call_ray_start, tmp_path): # noqa: F811 + """ + Test check_export_api_enabled always returns True because RAY_enable_export_api_write + is set to True. + """ + + @ray.remote + def test_check_export_api_enabled_remote(): + from ray._private.event.export_event_logger import check_export_api_enabled + from ray.core.generated.export_event_pb2 import ExportEvent + + success = True + success = success and check_export_api_enabled( + ExportEvent.SourceType.EXPORT_SUBMISSION_JOB + ) + success = success and check_export_api_enabled( + ExportEvent.SourceType.EXPORT_ACTOR + ) + return success + + assert ray.get(test_check_export_api_enabled_remote.remote()) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "call_ray_start", + [ + { + "env": { + "RAY_enable_export_api_write_config": "invalid source type", + }, + "cmd": "ray start --head", + } + ], + indirect=True, +) +async def test_check_export_api_empty_config(call_ray_start, tmp_path): # noqa: F811 + """ + Test check_export_api_enabled is False for all sources because + RAY_enable_export_api_write_config is not a vaild source type. + """ + + @ray.remote + def test_check_export_api_enabled_remote(): + from ray._private.event.export_event_logger import check_export_api_enabled + from ray.core.generated.export_event_pb2 import ExportEvent + + success = True + success = success and not ( + check_export_api_enabled(ExportEvent.SourceType.EXPORT_SUBMISSION_JOB) + ) + success = success and ( + not check_export_api_enabled(ExportEvent.SourceType.EXPORT_ACTOR) + ) + return success + + assert ray.get(test_check_export_api_enabled_remote.remote()) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "call_ray_start", + [ + { + "env": { + "RAY_enable_export_api_write_config": "EXPORT_SUBMISSION_JOB", + }, + "cmd": "ray start --head", + } + ], + indirect=True, +) async def test_submission_job_export_events(call_ray_start, tmp_path): # noqa: F811 """ Test submission job events are correctly generated and written to file diff --git a/python/ray/dashboard/modules/job/common.py b/python/ray/dashboard/modules/job/common.py index b928baab3aa2b..8b308ded25d27 100644 --- a/python/ray/dashboard/modules/job/common.py +++ b/python/ray/dashboard/modules/job/common.py @@ -8,7 +8,10 @@ from typing import Any, Dict, Optional, Tuple, Union from ray._private import ray_constants -from ray._private.event.export_event_logger import get_export_event_logger +from ray._private.event.export_event_logger import ( + check_export_api_enabled, + get_export_event_logger, +) from ray._private.gcs_utils import GcsAioClient from ray._private.runtime_env.packaging import parse_uri from ray.core.generated.export_event_pb2 import ExportEvent @@ -213,7 +216,7 @@ def __init__( self._export_submission_job_event_logger: logging.Logger = None try: if ( - ray_constants.RAY_ENABLE_EXPORT_API_WRITE + check_export_api_enabled(ExportEvent.SourceType.EXPORT_SUBMISSION_JOB) and export_event_log_dir_root is not None ): self._export_submission_job_event_logger = get_export_event_logger( diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 3bc2955f8cf72..d97b5089ba660 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -927,5 +927,15 @@ RAY_CONFIG(int, object_manager_client_connection_num, 4) // Update this to overwrite it. RAY_CONFIG(int, object_manager_rpc_threads_num, 0) -// Write export API events to file if enabled +// Write export API event of all resource types to file if enabled. +// RAY_enable_export_api_write_config will not be considered if +// this is enabled. RAY_CONFIG(bool, enable_export_api_write, false) + +// Comma separated string containing individual resource +// types to write export API events for. This configuration is only used if +// RAY_enable_export_api_write is not enabled. Full list of valid +// resource types in ExportEvent.SourceType enum in +// src/ray/protobuf/export_api/export_event.proto +// Example config: `export RAY_enable_export_api_write_config='EXPORT_ACTOR,EXPORT_TASK'` +RAY_CONFIG(std::vector, enable_export_api_write_config, {}) diff --git a/src/ray/core_worker/task_event_buffer.cc b/src/ray/core_worker/task_event_buffer.cc index ed8dba3c264bd..2054d676f422c 100644 --- a/src/ray/core_worker/task_event_buffer.cc +++ b/src/ray/core_worker/task_event_buffer.cc @@ -14,9 +14,6 @@ #include "ray/core_worker/task_event_buffer.h" -#include "ray/gcs/pb_util.h" -#include "ray/util/event.h" - namespace ray { namespace core { @@ -239,7 +236,7 @@ TaskEventBufferImpl::~TaskEventBufferImpl() { Stop(); } Status TaskEventBufferImpl::Start(bool auto_flush) { absl::MutexLock lock(&mutex_); - export_event_write_enabled_ = RayConfig::instance().enable_export_api_write(); + export_event_write_enabled_ = TaskEventBufferImpl::IsExportAPIEnabledTask(); auto report_interval_ms = RayConfig::instance().task_events_report_interval_ms(); RAY_CHECK(report_interval_ms > 0) << "RAY_task_events_report_interval_ms should be > 0 to use TaskEventBuffer."; diff --git a/src/ray/core_worker/task_event_buffer.h b/src/ray/core_worker/task_event_buffer.h index adfbafa42e18a..96a1e4573486e 100644 --- a/src/ray/core_worker/task_event_buffer.h +++ b/src/ray/core_worker/task_event_buffer.h @@ -27,7 +27,9 @@ #include "ray/common/id.h" #include "ray/common/task/task_spec.h" #include "ray/gcs/gcs_client/gcs_client.h" +#include "ray/gcs/pb_util.h" #include "ray/util/counter_map.h" +#include "ray/util/event.h" #include "src/ray/protobuf/export_api/export_task_event.pb.h" #include "src/ray/protobuf/gcs.pb.h" @@ -378,6 +380,14 @@ class TaskEventBufferImpl : public TaskEventBuffer { const std::vector> &status_events_to_write_for_export, const std::vector> &profile_events_to_send); + // Verify if export events should be written for EXPORT_TASK source types + bool IsExportAPIEnabledTask() const { + return IsExportAPIEnabledSourceType( + "EXPORT_TASK", + ::RayConfig::instance().enable_export_api_write(), + ::RayConfig::instance().enable_export_api_write_config()); + } + /// Reset the counters during flushing data to GCS. void ResetCountersForFlush(); diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 7a76b96423369..2f6f8a91334fb 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -242,9 +242,9 @@ const rpc::ActorTableData &GcsActor::GetActorTableData() const { rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_data_; } void GcsActor::WriteActorExportEvent() const { - /// Write actor_table_data_ as a export actor event if - /// enable_export_api_write() is enabled. - if (!RayConfig::instance().enable_export_api_write()) { + /// Verify actor export events should be written to file + /// and then write actor_table_data_ as an export event. + if (!export_event_write_enabled_) { return; } std::shared_ptr export_actor_data_ptr = diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index 7431b1976cfb8..26e5e0f1f6f11 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -52,6 +52,7 @@ class GcsActor { counter) : actor_table_data_(std::move(actor_table_data)), counter_(counter) { RefreshMetrics(); + export_event_write_enabled_ = IsExportAPIEnabledActor(); } /// Create a GcsActor by actor_table_data and task_spec. @@ -70,6 +71,7 @@ class GcsActor { counter_(counter) { RAY_CHECK(actor_table_data_.state() != rpc::ActorTableData::DEAD); RefreshMetrics(); + export_event_write_enabled_ = IsExportAPIEnabledActor(); } /// Create a GcsActor by TaskSpec. @@ -140,6 +142,7 @@ class GcsActor { actor_table_data_.set_call_site(task_spec.call_site()); } RefreshMetrics(); + export_event_write_enabled_ = IsExportAPIEnabledActor(); } ~GcsActor() { @@ -196,6 +199,13 @@ class GcsActor { /// Write an event containing this actor's ActorTableData /// to file for the Export API. void WriteActorExportEvent() const; + // Verify if export events should be written for EXPORT_ACTOR source types + bool IsExportAPIEnabledActor() const { + return IsExportAPIEnabledSourceType( + "EXPORT_ACTOR", + RayConfig::instance().enable_export_api_write(), + RayConfig::instance().enable_export_api_write_config()); + } const ResourceRequest &GetAcquiredResources() const; void SetAcquiredResources(ResourceRequest &&resource_request); @@ -256,6 +266,8 @@ class GcsActor { bool grant_or_reject_ = false; /// The last recorded metric state. std::optional last_metric_state_; + /// If true, actor events are exported for Export API + bool export_event_write_enabled_ = false; }; using RegisterActorCallback = diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.cc b/src/ray/gcs/gcs_server/gcs_job_manager.cc index e89fc0f7be414..fcad3e3d8ff5e 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_job_manager.cc @@ -37,7 +37,7 @@ void GcsJobManager::WriteDriverJobExportEvent(rpc::JobTableData job_data) const /// Write job_data as a export driver job event if /// enable_export_api_write() is enabled and if this job is /// not in the _ray_internal_ namespace. - if (!RayConfig::instance().enable_export_api_write()) { + if (!export_event_write_enabled_) { return; } if (job_data.config().ray_namespace().find(kRayInternalNamespacePrefix) == 0) { diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.h b/src/ray/gcs/gcs_server/gcs_job_manager.h index 9534a475e0925..f06d9724f675e 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.h +++ b/src/ray/gcs/gcs_server/gcs_job_manager.h @@ -63,7 +63,9 @@ class GcsJobManager : public rpc::JobInfoHandler { function_manager_(function_manager), internal_kv_(internal_kv), io_context_(io_context), - core_worker_clients_(client_factory) {} + core_worker_clients_(client_factory) { + export_event_write_enabled_ = IsExportAPIEnabledDriverJob(); + } void Initialize(const GcsInitData &gcs_init_data); @@ -99,6 +101,14 @@ class GcsJobManager : public rpc::JobInfoHandler { void WriteDriverJobExportEvent(rpc::JobTableData job_data) const; + // Verify if export events should be written for EXPORT_DRIVER_JOB source types + bool IsExportAPIEnabledDriverJob() const { + return IsExportAPIEnabledSourceType( + "EXPORT_DRIVER_JOB", + RayConfig::instance().enable_export_api_write(), + RayConfig::instance().enable_export_api_write_config()); + } + /// Record metrics. /// For job manager, (1) running jobs count gauge and (2) new finished jobs (whether /// succeed or fail) will be reported periodically. @@ -135,6 +145,9 @@ class GcsJobManager : public rpc::JobInfoHandler { instrumented_io_context &io_context_; /// The cached core worker clients which are used to communicate with workers. rpc::CoreWorkerClientPool core_worker_clients_; + + /// If true, driver job events are exported for Export API + bool export_event_write_enabled_ = false; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index b3f8e151e18d5..8a5ce487c7b39 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -38,12 +38,14 @@ GcsNodeManager::GcsNodeManager(GcsPublisher *gcs_publisher, gcs_table_storage_(gcs_table_storage), io_context_(io_context), raylet_client_pool_(raylet_client_pool), - cluster_id_(cluster_id) {} + cluster_id_(cluster_id) { + export_event_write_enabled_ = IsExportAPIEnabledNode(); +} void GcsNodeManager::WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const { /// Write node_info as a export node event if /// enable_export_api_write() is enabled. - if (!RayConfig::instance().enable_export_api_write()) { + if (!export_event_write_enabled_) { return; } std::shared_ptr export_node_data_ptr = diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index f2ee24be3ae10..2de6fe29fe9d9 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -183,6 +183,14 @@ class GcsNodeManager : public rpc::NodeInfoHandler { void WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const; + // Verify if export events should be written for EXPORT_NODE source types + bool IsExportAPIEnabledNode() const { + return IsExportAPIEnabledSourceType( + "EXPORT_NODE", + RayConfig::instance().enable_export_api_write(), + RayConfig::instance().enable_export_api_write_config()); + } + rpc::ExportNodeData::GcsNodeState ConvertGCSNodeStateToExport( rpc::GcsNodeInfo::GcsNodeState node_state) const { switch (node_state) { @@ -269,6 +277,9 @@ class GcsNodeManager : public rpc::NodeInfoHandler { boost::bimaps::unordered_multiset_of>; NodeIDAddrBiMap node_map_; + /// If true, node events are exported for Export API + bool export_event_write_enabled_ = false; + friend GcsAutoscalerStateManagerTest; friend GcsStateTest; }; diff --git a/src/ray/util/event.cc b/src/ray/util/event.cc index ee22e93cde71a..4de0e14ea41da 100644 --- a/src/ray/util/event.cc +++ b/src/ray/util/event.cc @@ -512,4 +512,19 @@ void RayEventInit(const std::vector source_types, }); } +bool IsExportAPIEnabledSourceType( + std::string source_type, + bool enable_export_api_write_global, + std::vector enable_export_api_write_config) { + if (enable_export_api_write_global) { + return true; + } + for (const auto &element : enable_export_api_write_config) { + if (element == source_type) { + return true; + } + } + return false; +} + } // namespace ray diff --git a/src/ray/util/event.h b/src/ray/util/event.h index bcbabfdbf0e0c..75a43237a556c 100644 --- a/src/ray/util/event.h +++ b/src/ray/util/event.h @@ -342,6 +342,11 @@ class RayExportEvent { ExportEventDataPtr event_data_ptr_; }; +bool IsExportAPIEnabledSourceType( + std::string source_type, + bool enable_export_api_write_global, + std::vector enable_export_api_write_config_str); + /// Ray Event initialization. /// /// This function should be called when the main thread starts. diff --git a/src/ray/util/tests/BUILD b/src/ray/util/tests/BUILD index 22a582833d3b3..7159a0f61a69f 100644 --- a/src/ray/util/tests/BUILD +++ b/src/ray/util/tests/BUILD @@ -71,6 +71,7 @@ ray_cc_test( "@boost//:range", "@com_google_googletest//:gtest_main", "//src/ray/protobuf:gcs_cc_proto", + "//src/ray/common:ray_config", ], ) diff --git a/src/ray/util/tests/event_test.cc b/src/ray/util/tests/event_test.cc index f29b4ca809a53..8684f9e58ff87 100644 --- a/src/ray/util/tests/event_test.cc +++ b/src/ray/util/tests/event_test.cc @@ -24,6 +24,9 @@ #include #include +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "ray/common/ray_config.h" #include "ray/util/event_label.h" #include "ray/util/random.h" #include "ray/util/string_utils.h" @@ -534,6 +537,32 @@ TEST_F(EventTest, TestExportEvent) { EXPECT_EQ(raylet_event_as_json["message"].get(), "test warning"); } +TEST_F(EventTest, TestIsExportAPIEnabledSourceType) { + EXPECT_EQ( + IsExportAPIEnabledSourceType( + "EXPORT_TASK", false, std::vector{"EXPORT_TASK", "EXPORT_ACTOR"}), + true); + EXPECT_EQ( + IsExportAPIEnabledSourceType( + "EXPORT_TASK", true, std::vector{"EXPORT_TASK", "EXPORT_ACTOR"}), + true); + EXPECT_EQ(IsExportAPIEnabledSourceType( + "EXPORT_TASK", false, std::vector{"EXPORT_ACTOR"}), + false); + EXPECT_EQ(IsExportAPIEnabledSourceType( + "EXPORT_TASK", true, std::vector{"EXPORT_ACTOR"}), + true); + + EXPECT_EQ(IsExportAPIEnabledSourceType( + "EXPORT_TASK", false, std::vector{"invalid resource type"}), + false); + + const std::string input = "EXPORT_TASK,EXPORT_ACTOR"; + const std::vector expected_output{"EXPORT_TASK", "EXPORT_ACTOR"}; + auto output = ConvertValue>("std::vector", input); + ASSERT_EQ(output, expected_output); +} + TEST_F(EventTest, TestRayCheckAbort) { auto custom_fields = absl::flat_hash_map(); custom_fields.emplace("node_id", "node 1");