Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dashboard] Remove ReportHead usage of DataSource.agents. #49878

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
4 changes: 2 additions & 2 deletions python/ray/dashboard/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ async def run(self):
http_port = -1 if not self.http_server else self.http_server.http_port
grpc_port = -1 if not self.server else self.grpc_port
await self.gcs_aio_client.internal_kv_put(
f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{self.node_id}".encode(),
json.dumps([http_port, grpc_port]).encode(),
f"{dashboard_consts.DASHBOARD_AGENT_ADDR_PREFIX}{self.node_id}".encode(),
json.dumps([self.ip, http_port, grpc_port]).encode(),
True,
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
)
Expand Down
27 changes: 16 additions & 11 deletions python/ray/dashboard/client/src/common/ProfilingLink.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { ClassNameProps } from "./props";
type CpuProfilingLinkProps = PropsWithChildren<
{
pid: string | number | null | undefined;
ip: string | null | undefined;
nodeId: string | null | undefined;
type: string | null;
} & ClassNameProps
>;
Expand All @@ -34,7 +34,7 @@ type TaskProfilingStackTraceProps = {
type MemoryProfilingProps = PropsWithChildren<
{
pid: string | number | null | undefined;
ip: string | null | undefined;
nodeId: string | null | undefined;
type?: string | null;
} & ClassNameProps
>;
Expand Down Expand Up @@ -92,15 +92,20 @@ export const TaskCpuStackTraceLink = ({

export const CpuStackTraceLink = ({
pid,
ip,
nodeId,
type = "",
}: CpuProfilingLinkProps) => {
if (!pid || !ip || typeof pid === "undefined" || typeof ip === "undefined") {
if (
!pid ||
!nodeId ||
typeof pid === "undefined" ||
typeof nodeId === "undefined"
) {
return <div></div>;
}
return (
<Link
href={`worker/traceback?pid=${pid}&ip=${ip}&native=0`}
href={`worker/traceback?pid=${pid}&node_id=${nodeId}&native=0`}
target="_blank"
title="Sample the current Python stack trace for this worker."
rel="noreferrer"
Expand All @@ -112,16 +117,16 @@ export const CpuStackTraceLink = ({

export const CpuProfilingLink = ({
pid,
ip,
nodeId,
type = "",
}: CpuProfilingLinkProps) => {
if (!pid || !ip) {
if (!pid || !nodeId) {
return <div></div>;
}

return (
<Link
href={`worker/cpu_profile?pid=${pid}&ip=${ip}&duration=5&native=0`}
href={`worker/cpu_profile?pid=${pid}&node_id=${nodeId}&duration=5&native=0`}
target="_blank"
title="Profile the Python worker for 5 seconds (default) and display a CPU flame graph."
rel="noreferrer"
Expand Down Expand Up @@ -283,13 +288,13 @@ export const ProfilerButton = ({

export const MemoryProfilingButton = ({
pid,
ip,
nodeId,
type = "",
}: MemoryProfilingProps) => {
if (!pid || !ip) {
if (!pid || !nodeId) {
return <div></div>;
}
const profilerUrl = `memory_profile?pid=${pid}&ip=${ip}`;
const profilerUrl = `memory_profile?pid=${pid}&node_id=${nodeId}`;

return <ProfilerButton profilerUrl={profilerUrl} type={type} />;
};
Expand Down
6 changes: 3 additions & 3 deletions python/ray/dashboard/client/src/components/ActorTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -609,19 +609,19 @@ const ActorTable = ({
<br />
<CpuProfilingLink
pid={pid}
ip={address?.ipAddress}
nodeId={address?.rayletId}
type=""
/>
<br />
<CpuStackTraceLink
pid={pid}
ip={address?.ipAddress}
nodeId={address?.rayletId}
type=""
/>
<br />
<MemoryProfilingButton
pid={pid}
ip={address?.ipAddress}
nodeId={address?.rayletId}
/>
</React.Fragment>
</TableCell>
Expand Down
6 changes: 3 additions & 3 deletions python/ray/dashboard/client/src/pages/actor/ActorDetail.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -188,19 +188,19 @@ const ActorDetailPage = () => {
<div>
<CpuStackTraceLink
pid={actorDetail.pid}
ip={actorDetail.address?.ipAddress}
nodeId={actorDetail.address?.rayletId}
type=""
/>
<br />
<CpuProfilingLink
pid={actorDetail.pid}
ip={actorDetail.address?.ipAddress}
nodeId={actorDetail.address?.rayletId}
type=""
/>
<br />
<MemoryProfilingButton
pid={actorDetail.pid}
ip={actorDetail.address?.ipAddress}
nodeId={actorDetail.address?.rayletId}
type=""
/>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,19 +172,19 @@ export const JobMetadataSection = ({ job }: JobMetadataSectionProps) => {
<div>
<CpuStackTraceLink
pid={job.driver_info?.pid}
ip={job.driver_info?.node_ip_address}
nodeId={job.driver_info?.node_id}
type="Driver"
/>
<br />
<CpuProfilingLink
pid={job.driver_info?.pid}
ip={job.driver_info?.node_ip_address}
nodeId={job.driver_info?.node_id}
type="Driver"
/>
<br />
<MemoryProfilingButton
pid={job.driver_info?.pid}
ip={job.driver_info?.node_ip_address}
nodeId={job.driver_info?.node_id}
type="Driver"
/>
</div>
Expand Down
6 changes: 3 additions & 3 deletions python/ray/dashboard/client/src/pages/job/JobRow.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,19 @@ export const JobRow = ({ job }: JobRowProps) => {
)}
<CpuStackTraceLink
pid={job.driver_info?.pid}
ip={job.driver_info?.node_ip_address}
nodeId={job.driver_info?.node_id}
type="Driver"
/>
<br />
<CpuProfilingLink
pid={job.driver_info?.pid}
ip={job.driver_info?.node_ip_address}
nodeId={job.driver_info?.node_id}
type="Driver"
/>
<br />
<MemoryProfilingButton
pid={job.driver_info?.pid}
ip={job.driver_info?.node_ip_address}
nodeId={job.driver_info?.node_id}
type="Driver"
/>
</TableCell>
Expand Down
7 changes: 3 additions & 4 deletions python/ray/dashboard/client/src/pages/node/NodeRow.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ type WorkerRowProps = {
*/
export const WorkerRow = ({ node, worker }: WorkerRowProps) => {
const {
ip,
mem,
raylet: { nodeId },
} = node;
Expand Down Expand Up @@ -278,11 +277,11 @@ export const WorkerRow = ({ node, worker }: WorkerRowProps) => {
Log
</Link>
<br />
<CpuProfilingLink pid={pid} ip={ip} type="" />
<CpuProfilingLink pid={pid} nodeId={nodeId} type="" />
<br />
<CpuStackTraceLink pid={pid} ip={ip} type="" />
<CpuStackTraceLink pid={pid} nodeId={nodeId} type="" />
<br />
<MemoryProfilingButton pid={pid} ip={ip} />
<MemoryProfilingButton pid={pid} nodeId={nodeId} />
</TableCell>
<TableCell>
<PercentageBar num={Number(cpu)} total={100}>
Expand Down
2 changes: 1 addition & 1 deletion python/ray/dashboard/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from ray._private.ray_constants import env_bool, env_integer

DASHBOARD_LOG_FILENAME = "dashboard.log"
DASHBOARD_AGENT_PORT_PREFIX = "DASHBOARD_AGENT_PORT_PREFIX:"
DASHBOARD_AGENT_ADDR_PREFIX = "DASHBOARD_AGENT_ADDR_PREFIX:"
DASHBOARD_AGENT_LOG_FILENAME = "dashboard_agent.log"
DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_S_ENV_NAME = (
"RAY_DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_S" # noqa
Expand Down
3 changes: 1 addition & 2 deletions python/ray/dashboard/datacenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ async def get_agent_infos(
return {}

def _create_agent_info(node_id: str):
(http_port, grpc_port) = DataSource.agents[node_id]
node_ip = DataSource.nodes[node_id]["nodeManagerAddress"]
(node_ip, http_port, grpc_port) = DataSource.agents[node_id]

return dict(
ipAddress=node_ip,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ def add_agent(agent):
http_port = agent[1]["httpPort"]
grpc_port = agent[1]["grpcPort"]
DataSource.nodes[node_id] = {"nodeManagerAddress": node_ip}
DataSource.agents[node_id] = (http_port, grpc_port)
DataSource.agents[node_id] = (node_ip, http_port, grpc_port)

def del_agent(agent):
node_id = agent[0]
Expand Down
40 changes: 36 additions & 4 deletions python/ray/dashboard/modules/node/node_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
DEBUG_AUTOSCALING_STATUS,
env_integer,
)
from ray._private.gcs_pubsub import GcsAioResourceUsageSubscriber
from ray._private.utils import get_or_create_event_loop
from ray.autoscaler._private.util import (
LoadMetricsSummary,
Expand Down Expand Up @@ -215,10 +216,10 @@ async def _update_agent(self, node_id):
present until agent.py starts, so we need to loop waiting for agent.py writes
its port to internal kv.
"""
key = f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{node_id}".encode()
key = f"{dashboard_consts.DASHBOARD_AGENT_ADDR_PREFIX}{node_id}".encode()
while True:
try:
agent_port = await self.gcs_aio_client.internal_kv_get(
agent_addr = await self.gcs_aio_client.internal_kv_get(
key,
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
timeout=None,
Expand All @@ -227,8 +228,8 @@ async def _update_agent(self, node_id):
# node is still alive.
if DataSource.nodes.get(node_id, {}).get("state") != "ALIVE":
return
if agent_port:
DataSource.agents[node_id] = json.loads(agent_port)
if agent_addr:
DataSource.agents[node_id] = json.loads(agent_addr)
return
except Exception:
logger.exception(f"Error getting agent port for node {node_id}.")
Expand Down Expand Up @@ -451,10 +452,41 @@ def postprocess(node_id_response_tuples):
for node_id, new_stat in new_node_stats.items():
DataSource.node_stats[node_id] = new_stat

async def _update_node_physical_stats(self):
"""
Update DataSource.node_physical_stats by subscribing to the GCS resource usage.
"""
subscriber = GcsAioResourceUsageSubscriber(address=self.gcs_address)
await subscriber.subscribe()

loop = get_or_create_event_loop()

while True:
try:
# The key is b'RAY_REPORTER:{node id hex}',
# e.g. b'RAY_REPORTER:2b4fbd...'
key, data = await subscriber.poll()
if key is None:
continue

# NOTE: Every iteration is executed inside the thread-pool executor
# (TPE) to avoid blocking the Dashboard's event-loop
parsed_data = await loop.run_in_executor(
self._executor, json.loads, data
)

node_id = key.split(":")[-1]
DataSource.node_physical_stats[node_id] = parsed_data
except Exception:
logger.exception(
"Error receiving node physical stats from _update_node_physical_stats."
)

async def run(self, server):
await asyncio.gather(
self._update_nodes(),
self._update_node_stats(),
self._update_node_physical_stats(),
)

@staticmethod
Expand Down
47 changes: 47 additions & 0 deletions python/ray/dashboard/modules/node/tests/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,5 +252,52 @@ def verify():
assert success


@pytest.mark.skipif(
sys.platform == "win32", reason="setproctitle does not change psutil.cmdline"
)
def test_node_physical_stats(enable_test_module, shutdown_only):
"""
Tests NodeHead._update_node_physical_stats.
"""
addresses = ray.init(include_dashboard=True, num_cpus=6)

@ray.remote(num_cpus=1)
class Actor:
def getpid(self):
return os.getpid()

actors = [Actor.remote() for _ in range(6)]
actor_pids = ray.get([actor.getpid.remote() for actor in actors])
actor_pids = set(actor_pids)

webui_url = addresses["webui_url"]
assert wait_until_server_available(webui_url) is True
webui_url = format_web_url(webui_url)

def _check_workers():
try:
resp = requests.get(webui_url + "/test/dump?key=node_physical_stats")
resp.raise_for_status()
result = resp.json()
assert result["result"] is True
node_physical_stats = result["data"]["nodePhysicalStats"]
assert len(node_physical_stats) == 1
current_stats = node_physical_stats[addresses["node_id"]]
# Check Actor workers
current_actor_pids = set()
for worker in current_stats["workers"]:
if "ray::Actor" in worker["cmdline"][0]:
current_actor_pids.add(worker["pid"])
assert current_actor_pids == actor_pids
# Check raylet cmdline
assert "raylet" in current_stats["cmdline"][0]
return True
except Exception as ex:
logger.info(ex)
return False

wait_for_condition(_check_workers, timeout=10)


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
Loading
Loading