Skip to content

Commit

Permalink
[observability][export-api] Env var config to write export events of …
Browse files Browse the repository at this point in the history
…specific source types (#49541)

Signed-off-by: Nikita Vemuri <[email protected]>
  • Loading branch information
nikitavemuri authored Jan 22, 2025
1 parent 8d83686 commit 67f9490
Show file tree
Hide file tree
Showing 17 changed files with 271 additions and 17 deletions.
20 changes: 20 additions & 0 deletions python/ray/_private/event/export_event_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,23 @@ def get_export_event_logger(
_export_event_logger[source_name] = ExportEventLoggerAdapter(source, logger)

return _export_event_logger[source_name]


def check_export_api_enabled(
source: ExportEvent.SourceType,
) -> bool:
"""
Check RAY_ENABLE_EXPORT_API_WRITE and RAY_ENABLE_EXPORT_API_WRITE_CONFIG environment
variables to verify if export events should be written for the given source type.
Args:
source: The source of the export event.
"""
if ray_constants.RAY_ENABLE_EXPORT_API_WRITE:
return True
source_name = ExportEvent.SourceType.Name(source)
return (
source_name in ray_constants.RAY_ENABLE_EXPORT_API_WRITE_CONFIG
if ray_constants.RAY_ENABLE_EXPORT_API_WRITE_CONFIG
else False
)
15 changes: 15 additions & 0 deletions python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,23 @@ def gcs_actor_scheduling_enabled():

RAY_BACKEND_LOG_JSON_ENV_VAR = "RAY_BACKEND_LOG_JSON"

# Write export API event of all resource types to file if enabled.
# RAY_enable_export_api_write_config will not be considered if
# this is enabled.
RAY_ENABLE_EXPORT_API_WRITE = env_bool("RAY_enable_export_api_write", False)

# Comma separated string containing individual resource
# to write export API events for. This configuration is only used if
# RAY_enable_export_api_write is not enabled. Full list of valid
# resource types in ExportEvent.SourceType enum in
# src/ray/protobuf/export_api/export_event.proto
# Example config:
# `export RAY_enable_export_api_write_config='EXPORT_SUBMISSION_JOB,EXPORT_ACTOR'`
RAY_ENABLE_EXPORT_API_WRITE_CONFIG_STR = os.environ.get(
"RAY_enable_export_api_write_config", ""
)
RAY_ENABLE_EXPORT_API_WRITE_CONFIG = RAY_ENABLE_EXPORT_API_WRITE_CONFIG_STR.split(",")

RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES = env_bool(
"RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES", 100 * 1e6
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

import pytest

# RAY_enable_export_api_write env var must be set before importing
# `ray` so the correct value is set for RAY_ENABLE_EXPORT_API_WRITE
# RAY_enable_export_api_write_config env var must be set before importing
# `ray` so the correct value is set for RAY_ENABLE_EXPORT_API_WRITE_CONFIG
# even outside a Ray driver.
os.environ["RAY_enable_export_api_write"] = "true"
os.environ["RAY_enable_export_api_write_config"] = "EXPORT_SUBMISSION_JOB"

import ray
from ray._private.gcs_utils import GcsAioClient
Expand All @@ -32,6 +32,45 @@ async def check_job_succeeded(job_manager, job_id):
return status == JobStatus.SUCCEEDED


@pytest.mark.asyncio
@pytest.mark.parametrize(
"call_ray_start",
[
{
"env": {
"RAY_enable_export_api_write_config": "EXPORT_SUBMISSION_JOB,EXPORT_TASK",
},
"cmd": "ray start --head",
}
],
indirect=True,
)
async def test_check_export_api_enabled(call_ray_start, tmp_path): # noqa: F811
"""
Test check_export_api_enabled is True for EXPORT_SUBMISSION_JOB and EXPORT_TASK but
not for EXPORT_ACTOR because of the value of RAY_enable_export_api_write_config.
"""

@ray.remote
def test_check_export_api_enabled_remote():
from ray._private.event.export_event_logger import check_export_api_enabled
from ray.core.generated.export_event_pb2 import ExportEvent

success = True
success = success and check_export_api_enabled(
ExportEvent.SourceType.EXPORT_SUBMISSION_JOB
)
success = success and check_export_api_enabled(
ExportEvent.SourceType.EXPORT_TASK
)
success = success and (
not check_export_api_enabled(ExportEvent.SourceType.EXPORT_ACTOR)
)
return success

assert ray.get(test_check_export_api_enabled_remote.remote())


@pytest.mark.asyncio
@pytest.mark.parametrize(
"call_ray_start",
Expand All @@ -45,6 +84,78 @@ async def check_job_succeeded(job_manager, job_id):
],
indirect=True,
)
async def test_check_export_api_enabled_global(call_ray_start, tmp_path): # noqa: F811
"""
Test check_export_api_enabled always returns True because RAY_enable_export_api_write
is set to True.
"""

@ray.remote
def test_check_export_api_enabled_remote():
from ray._private.event.export_event_logger import check_export_api_enabled
from ray.core.generated.export_event_pb2 import ExportEvent

success = True
success = success and check_export_api_enabled(
ExportEvent.SourceType.EXPORT_SUBMISSION_JOB
)
success = success and check_export_api_enabled(
ExportEvent.SourceType.EXPORT_ACTOR
)
return success

assert ray.get(test_check_export_api_enabled_remote.remote())


@pytest.mark.asyncio
@pytest.mark.parametrize(
"call_ray_start",
[
{
"env": {
"RAY_enable_export_api_write_config": "invalid source type",
},
"cmd": "ray start --head",
}
],
indirect=True,
)
async def test_check_export_api_empty_config(call_ray_start, tmp_path): # noqa: F811
"""
Test check_export_api_enabled is False for all sources because
RAY_enable_export_api_write_config is not a vaild source type.
"""

@ray.remote
def test_check_export_api_enabled_remote():
from ray._private.event.export_event_logger import check_export_api_enabled
from ray.core.generated.export_event_pb2 import ExportEvent

success = True
success = success and not (
check_export_api_enabled(ExportEvent.SourceType.EXPORT_SUBMISSION_JOB)
)
success = success and (
not check_export_api_enabled(ExportEvent.SourceType.EXPORT_ACTOR)
)
return success

assert ray.get(test_check_export_api_enabled_remote.remote())


@pytest.mark.asyncio
@pytest.mark.parametrize(
"call_ray_start",
[
{
"env": {
"RAY_enable_export_api_write_config": "EXPORT_SUBMISSION_JOB",
},
"cmd": "ray start --head",
}
],
indirect=True,
)
async def test_submission_job_export_events(call_ray_start, tmp_path): # noqa: F811
"""
Test submission job events are correctly generated and written to file
Expand Down
7 changes: 5 additions & 2 deletions python/ray/dashboard/modules/job/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
from typing import Any, Dict, Optional, Tuple, Union

from ray._private import ray_constants
from ray._private.event.export_event_logger import get_export_event_logger
from ray._private.event.export_event_logger import (
check_export_api_enabled,
get_export_event_logger,
)
from ray._private.gcs_utils import GcsAioClient
from ray._private.runtime_env.packaging import parse_uri
from ray.core.generated.export_event_pb2 import ExportEvent
Expand Down Expand Up @@ -213,7 +216,7 @@ def __init__(
self._export_submission_job_event_logger: logging.Logger = None
try:
if (
ray_constants.RAY_ENABLE_EXPORT_API_WRITE
check_export_api_enabled(ExportEvent.SourceType.EXPORT_SUBMISSION_JOB)
and export_event_log_dir_root is not None
):
self._export_submission_job_event_logger = get_export_event_logger(
Expand Down
12 changes: 11 additions & 1 deletion src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -927,5 +927,15 @@ RAY_CONFIG(int, object_manager_client_connection_num, 4)
// Update this to overwrite it.
RAY_CONFIG(int, object_manager_rpc_threads_num, 0)

// Write export API events to file if enabled
// Write export API event of all resource types to file if enabled.
// RAY_enable_export_api_write_config will not be considered if
// this is enabled.
RAY_CONFIG(bool, enable_export_api_write, false)

// Comma separated string containing individual resource
// types to write export API events for. This configuration is only used if
// RAY_enable_export_api_write is not enabled. Full list of valid
// resource types in ExportEvent.SourceType enum in
// src/ray/protobuf/export_api/export_event.proto
// Example config: `export RAY_enable_export_api_write_config='EXPORT_ACTOR,EXPORT_TASK'`
RAY_CONFIG(std::vector<std::string>, enable_export_api_write_config, {})
5 changes: 1 addition & 4 deletions src/ray/core_worker/task_event_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@

#include "ray/core_worker/task_event_buffer.h"

#include "ray/gcs/pb_util.h"
#include "ray/util/event.h"

namespace ray {
namespace core {

Expand Down Expand Up @@ -239,7 +236,7 @@ TaskEventBufferImpl::~TaskEventBufferImpl() { Stop(); }

Status TaskEventBufferImpl::Start(bool auto_flush) {
absl::MutexLock lock(&mutex_);
export_event_write_enabled_ = RayConfig::instance().enable_export_api_write();
export_event_write_enabled_ = TaskEventBufferImpl::IsExportAPIEnabledTask();
auto report_interval_ms = RayConfig::instance().task_events_report_interval_ms();
RAY_CHECK(report_interval_ms > 0)
<< "RAY_task_events_report_interval_ms should be > 0 to use TaskEventBuffer.";
Expand Down
10 changes: 10 additions & 0 deletions src/ray/core_worker/task_event_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
#include "ray/common/id.h"
#include "ray/common/task/task_spec.h"
#include "ray/gcs/gcs_client/gcs_client.h"
#include "ray/gcs/pb_util.h"
#include "ray/util/counter_map.h"
#include "ray/util/event.h"
#include "src/ray/protobuf/export_api/export_task_event.pb.h"
#include "src/ray/protobuf/gcs.pb.h"

Expand Down Expand Up @@ -378,6 +380,14 @@ class TaskEventBufferImpl : public TaskEventBuffer {
const std::vector<std::shared_ptr<TaskEvent>> &status_events_to_write_for_export,
const std::vector<std::shared_ptr<TaskEvent>> &profile_events_to_send);

// Verify if export events should be written for EXPORT_TASK source types
bool IsExportAPIEnabledTask() const {
return IsExportAPIEnabledSourceType(
"EXPORT_TASK",
::RayConfig::instance().enable_export_api_write(),
::RayConfig::instance().enable_export_api_write_config());
}

/// Reset the counters during flushing data to GCS.
void ResetCountersForFlush();

Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,9 @@ const rpc::ActorTableData &GcsActor::GetActorTableData() const {
rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_data_; }

void GcsActor::WriteActorExportEvent() const {
/// Write actor_table_data_ as a export actor event if
/// enable_export_api_write() is enabled.
if (!RayConfig::instance().enable_export_api_write()) {
/// Verify actor export events should be written to file
/// and then write actor_table_data_ as an export event.
if (!export_event_write_enabled_) {
return;
}
std::shared_ptr<rpc::ExportActorData> export_actor_data_ptr =
Expand Down
12 changes: 12 additions & 0 deletions src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class GcsActor {
counter)
: actor_table_data_(std::move(actor_table_data)), counter_(counter) {
RefreshMetrics();
export_event_write_enabled_ = IsExportAPIEnabledActor();
}

/// Create a GcsActor by actor_table_data and task_spec.
Expand All @@ -70,6 +71,7 @@ class GcsActor {
counter_(counter) {
RAY_CHECK(actor_table_data_.state() != rpc::ActorTableData::DEAD);
RefreshMetrics();
export_event_write_enabled_ = IsExportAPIEnabledActor();
}

/// Create a GcsActor by TaskSpec.
Expand Down Expand Up @@ -140,6 +142,7 @@ class GcsActor {
actor_table_data_.set_call_site(task_spec.call_site());
}
RefreshMetrics();
export_event_write_enabled_ = IsExportAPIEnabledActor();
}

~GcsActor() {
Expand Down Expand Up @@ -196,6 +199,13 @@ class GcsActor {
/// Write an event containing this actor's ActorTableData
/// to file for the Export API.
void WriteActorExportEvent() const;
// Verify if export events should be written for EXPORT_ACTOR source types
bool IsExportAPIEnabledActor() const {
return IsExportAPIEnabledSourceType(
"EXPORT_ACTOR",
RayConfig::instance().enable_export_api_write(),
RayConfig::instance().enable_export_api_write_config());
}

const ResourceRequest &GetAcquiredResources() const;
void SetAcquiredResources(ResourceRequest &&resource_request);
Expand Down Expand Up @@ -256,6 +266,8 @@ class GcsActor {
bool grant_or_reject_ = false;
/// The last recorded metric state.
std::optional<rpc::ActorTableData::ActorState> last_metric_state_;
/// If true, actor events are exported for Export API
bool export_event_write_enabled_ = false;
};

using RegisterActorCallback =
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_job_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void GcsJobManager::WriteDriverJobExportEvent(rpc::JobTableData job_data) const
/// Write job_data as a export driver job event if
/// enable_export_api_write() is enabled and if this job is
/// not in the _ray_internal_ namespace.
if (!RayConfig::instance().enable_export_api_write()) {
if (!export_event_write_enabled_) {
return;
}
if (job_data.config().ray_namespace().find(kRayInternalNamespacePrefix) == 0) {
Expand Down
15 changes: 14 additions & 1 deletion src/ray/gcs/gcs_server/gcs_job_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ class GcsJobManager : public rpc::JobInfoHandler {
function_manager_(function_manager),
internal_kv_(internal_kv),
io_context_(io_context),
core_worker_clients_(client_factory) {}
core_worker_clients_(client_factory) {
export_event_write_enabled_ = IsExportAPIEnabledDriverJob();
}

void Initialize(const GcsInitData &gcs_init_data);

Expand Down Expand Up @@ -99,6 +101,14 @@ class GcsJobManager : public rpc::JobInfoHandler {

void WriteDriverJobExportEvent(rpc::JobTableData job_data) const;

// Verify if export events should be written for EXPORT_DRIVER_JOB source types
bool IsExportAPIEnabledDriverJob() const {
return IsExportAPIEnabledSourceType(
"EXPORT_DRIVER_JOB",
RayConfig::instance().enable_export_api_write(),
RayConfig::instance().enable_export_api_write_config());
}

/// Record metrics.
/// For job manager, (1) running jobs count gauge and (2) new finished jobs (whether
/// succeed or fail) will be reported periodically.
Expand Down Expand Up @@ -135,6 +145,9 @@ class GcsJobManager : public rpc::JobInfoHandler {
instrumented_io_context &io_context_;
/// The cached core worker clients which are used to communicate with workers.
rpc::CoreWorkerClientPool core_worker_clients_;

/// If true, driver job events are exported for Export API
bool export_event_write_enabled_ = false;
};

} // namespace gcs
Expand Down
6 changes: 4 additions & 2 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ GcsNodeManager::GcsNodeManager(GcsPublisher *gcs_publisher,
gcs_table_storage_(gcs_table_storage),
io_context_(io_context),
raylet_client_pool_(raylet_client_pool),
cluster_id_(cluster_id) {}
cluster_id_(cluster_id) {
export_event_write_enabled_ = IsExportAPIEnabledNode();
}

void GcsNodeManager::WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const {
/// Write node_info as a export node event if
/// enable_export_api_write() is enabled.
if (!RayConfig::instance().enable_export_api_write()) {
if (!export_event_write_enabled_) {
return;
}
std::shared_ptr<rpc::ExportNodeData> export_node_data_ptr =
Expand Down
Loading

0 comments on commit 67f9490

Please sign in to comment.