Skip to content

Commit

Permalink
Revert "[pytorch/et] Allow ET to save additional resources for completing a trace like generated kernels and index tensor data (pytorch#143430)"
Browse files Browse the repository at this point in the history

This reverts commit 33dd4f1.

Reverted pytorch#143430 on behalf of https://github.com/huydhn due to The internal diff D58707846 has been backed out ([comment](pytorch#143430 (comment)))
  • Loading branch information
pytorchmergebot committed Dec 21, 2024
1 parent 47c4e01 commit bee47b0
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 278 deletions.
169 changes: 0 additions & 169 deletions test/profiler/test_execution_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,92 +204,6 @@ def trace_handler(p):
f" rf_ids_kineto = {rf_ids_kineto}\n",
)

@unittest.skipIf(not kineto_available(), "Kineto is required")
@skipIfHpu
@skipIfTorchDynamo("profiler gets ignored if dynamo activated")
def test_execution_trace_env_enabled_with_kineto(self, device):
    """
    Verify that setting ENABLE_PYTORCH_EXECUTION_TRACE(_EXTRAS)=1 attaches an
    Execution Trace observer to the profiler, produces an (initially empty)
    resources directory, and yields rf_ids that match the Kineto trace.
    """
    import os

    os.environ["ENABLE_PYTORCH_EXECUTION_TRACE"] = "1"
    os.environ["ENABLE_PYTORCH_EXECUTION_TRACE_EXTRAS"] = "1"
    trace_called_num = 0

    def trace_handler(p):
        nonlocal trace_called_num
        trace_called_num += 1

    # BUG FIX: the first term used to be the bare enum member
    # `torch.profiler.ProfilerActivity.CUDA`, which is always truthy, so the
    # `or` chain short-circuited and use_device was unconditionally true.
    # Each activity must be tested for membership in supported_activities().
    use_device = (
        torch.profiler.ProfilerActivity.CUDA in supported_activities()
        or torch.profiler.ProfilerActivity.XPU in supported_activities()
        or torch.profiler.ProfilerActivity.HPU in supported_activities()
    )
    # Create a temp file to save kineto data.
    kt = tempfile.NamedTemporaryFile(
        mode="w+t", suffix=".kineto.json", delete=False
    )
    kt.close()

    with profile(
        activities=supported_activities(),
        schedule=torch.profiler.schedule(
            skip_first=3, wait=1, warmup=1, active=2, repeat=1
        ),
        on_trace_ready=trace_handler,
    ) as p:
        for idx in range(10):
            with record_function(f"## LOOP {idx} ##"):
                self.payload(device, use_device=use_device)
            p.step()

    # Uncomment for debugging
    # print("Output kineto = ", kt.name)

    p.export_chrome_trace(kt.name)
    self.assertEqual(trace_called_num, 1)
    et_path = p.execution_trace_observer.get_output_file_path()
    et_res_path = p.execution_trace_observer.get_resources_dir(et_path)
    # the path should be set up due to our env variables
    self.assertTrue(et_path is not None)
    # et_res_path should be an empty directory (no extras were generated)
    self.assertTrue(os.path.isdir(et_res_path))
    self.assertEqual(len(os.listdir(et_res_path)), 0)
    # Compare the collected Execution Trace and Kineto Trace
    # in terms of record func
    nodes = self.get_execution_trace_root(et_path)
    loop_count = 0
    found_root_node = False
    for n in nodes:
        assert "name" in n
        if "[pytorch|profiler|execution_trace|process]" in n["name"]:
            found_root_node = True
        if n["name"].startswith("## LOOP "):
            loop_count += 1
    self.assertTrue(found_root_node)
    # Since profiler trace is active for 2 iterations
    self.assertEqual(loop_count, 2)

    # Compare the collected Execution Trace and Kineto Trace
    # in terms of record func ID (rf_id) and External IDs
    # both of these should match for the same trace window.
    with open(kt.name) as f:
        kineto = json.load(f)
    events = kineto["traceEvents"]

    # Look up rf_ids in both Execution and Kineto trace as two lists.
    rf_ids_et = self.get_execution_trace_rf_ids(nodes)
    rf_ids_kineto = self.get_kineto_rf_ids(events)

    self.assertCountEqual(rf_ids_et, rf_ids_kineto)
    self.assertListEqual(
        rf_ids_et,
        rf_ids_kineto,
        msg=f"ET and kineto rf_id should exactly match\n"
        f" rf_ids_et = {rf_ids_et}\n"
        f" rf_ids_kineto = {rf_ids_kineto}\n",
    )

def test_execution_trace_alone(self, device):
use_device = (
torch.profiler.ProfilerActivity.CUDA
Expand Down Expand Up @@ -330,31 +244,6 @@ def test_execution_trace_alone(self, device):
assert found_root_node
assert loop_count == expected_loop_events

def test_execution_trace_env_disabled(self, device):
    """
    With ENABLE_PYTORCH_EXECUTION_TRACE(_EXTRAS)=0 the profiler must not
    attach an Execution Trace observer.
    """
    import os

    os.environ["ENABLE_PYTORCH_EXECUTION_TRACE"] = "0"
    os.environ["ENABLE_PYTORCH_EXECUTION_TRACE_EXTRAS"] = "0"
    # BUG FIX: the first term used to be the bare enum member
    # `torch.profiler.ProfilerActivity.CUDA` (always truthy), which
    # short-circuited the `or` chain; test membership in
    # supported_activities() like the other two terms.
    use_device = (
        torch.profiler.ProfilerActivity.CUDA in supported_activities()
        or torch.profiler.ProfilerActivity.HPU in supported_activities()
        or torch.profiler.ProfilerActivity.XPU in supported_activities()
    )

    with profile(
        activities=torch.profiler.supported_activities(),
        record_shapes=True,
        schedule=torch.profiler.schedule(
            skip_first=3, wait=1, warmup=1, active=2, repeat=1
        ),
    ) as p:
        for idx in range(10):
            with record_function(f"## LOOP {idx} ##"):
                self.payload(device, use_device=use_device)
            p.step()

    # No env enablement -> no observer should have been built.
    self.assertTrue(p.execution_trace_observer is None)

@unittest.skipIf(IS_WINDOWS, "torch.compile does not support WINDOWS")
@unittest.skipIf(
sys.version_info >= (3, 12), "torch.compile is not supported on python 3.12+"
Expand Down Expand Up @@ -408,64 +297,6 @@ def fn(a, b, c):
assert len(n["outputs"]["values"]) == 0
assert found_captured_triton_kernel_node

@unittest.skipIf(IS_WINDOWS, "torch.compile does not support WINDOWS")
@unittest.skipIf(
    sys.version_info >= (3, 12), "torch.compile is not supported on python 3.12+"
)
@unittest.skipIf(
    (not has_triton()) or (not TEST_CUDA and not TEST_XPU),
    "need triton and device(CUDA or XPU) availability to run",
)
@skipCPUIf(True, "skip CPU device for testing profiling triton")
def test_execution_trace_env_enabled_with_pt2(self, device):
    """
    With env-driven ET collection plus extras enabled, profiling a PT2
    (inductor) compiled function should save the generated triton kernel
    files into the trace's resources directory and record kernel_file
    attributes on the triton nodes.
    """
    import os

    # Enable the env-based observer and extra-resource collection.
    os.environ["ENABLE_PYTORCH_EXECUTION_TRACE"] = "1"
    os.environ["ENABLE_PYTORCH_EXECUTION_TRACE_EXTRAS"] = "1"

    @torchdynamo.optimize("inductor")
    def fn(a, b, c):
        x = torch.nn.functional.linear(a, b)
        x = x + c
        return x.cos()

    inputs = [
        torch.randn(4, 4, requires_grad=True).to(device) for _ in range(3)
    ]
    # Compile once up front so profiling below sees steady-state execution.
    with torch._inductor.config.patch(compile_threads=1):
        fn(*inputs)

    with profile(
        activities=torch.profiler.supported_activities(),
        record_shapes=True,
        schedule=torch.profiler.schedule(
            skip_first=3, wait=1, warmup=1, active=2, repeat=1
        ),
    ) as prof:
        for loop_idx in range(10):
            with record_function(f"## LOOP {loop_idx} ##"):
                fn(*inputs)
            prof.step()

    et_path = prof.execution_trace_observer.get_output_file_path()
    et_res_path = prof.execution_trace_observer.get_resources_dir(et_path)
    # The env variables above must have set up an output path...
    self.assertTrue(et_path is not None)
    # ...and the resources directory should hold the two captured artifacts.
    self.assertTrue(os.path.isdir(et_res_path))
    self.assertEqual(len(os.listdir(et_res_path)), 2)

    found_captured_triton_kernel_node = False
    for node in self.get_execution_trace_root(et_path):
        assert "name" in node
        if "triton_" not in node["name"]:
            continue
        for attr in node["attrs"]:
            if attr["name"] == "kernel_file" and attr["value"] != "":
                found_captured_triton_kernel_node = True
                assert len(node["inputs"]["values"]) > 0
                assert len(node["outputs"]["values"]) == 0
    assert found_captured_triton_kernel_node

def test_execution_trace_start_stop(self, device):
use_device = (
torch.profiler.ProfilerActivity.CUDA
Expand Down
125 changes: 16 additions & 109 deletions torch/profiler/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,9 +703,7 @@ def __init__(
with_flops=with_flops,
with_modules=with_modules,
experimental_config=experimental_config,
execution_trace_observer=execution_trace_observer
if execution_trace_observer
else ExecutionTraceObserver.build_execution_trace_obs_from_env(),
execution_trace_observer=execution_trace_observer,
acc_events=acc_events,
custom_trace_id_callback=custom_trace_id_callback,
)
Expand Down Expand Up @@ -880,129 +878,29 @@ def __init__(self) -> None:
"""
self._registered = False
self._execution_trace_running = False
self.extra_resources_collection = False
self.resources_dir: str = ""
self.output_file_path: str = ""

def __del__(self):
    """
    Finalizer: delegates to unregister_callback() so that any pending
    trace output is flushed before the observer is garbage-collected.
    """
    self.unregister_callback()

@staticmethod
def build_execution_trace_obs_from_env() -> Optional["ExecutionTraceObserver"]:
"""
Returns an ExecutionTraceObserver instance if the environment variable
ENABLE_PYTORCH_EXECUTION_TRACE is set to 1, otherwise returns None.
Configures the observer to also collect extra resources if the environment variable
``ENABLE_PYTORCH_EXECUTION_TRACE_EXTRAS=1``. These are resources such as generated kernels,
index tensor data etc. that are required to make the Execution Trace replayable.
"""
if os.environ.get("ENABLE_PYTORCH_EXECUTION_TRACE", "0") == "1":
try:
fp = tempfile.NamedTemporaryFile("w+t", suffix=".et.json", delete=False)
except Exception as e:
warn(
f"Execution trace will not be recorded. Exception on creating default temporary file: {e}"
)
return None
fp.close()
et = ExecutionTraceObserver()
et.register_callback(fp.name)
# additionally, check if the env requires us to collect extra resources
if os.environ.get("ENABLE_PYTORCH_EXECUTION_TRACE_EXTRAS", "0") == "1":
et.set_extra_resource_collection(True)
else:
et.set_extra_resource_collection(False)
return et
return None

def set_extra_resource_collection(self, val) -> None:
    """
    Toggle collection of extra resources (generated kernels, index tensor
    data, and other metadata needed to complete the Execution Trace).

    Callers that want the extras should invoke this with ``val=True`` after
    ``register_callback()``; enabling it eagerly creates the resources
    directory next to the trace output.
    """
    self.extra_resources_collection = val
    if val:
        self.get_resources_dir(can_create=True)

def register_callback(self, output_file_path: str) -> Self:
    """
    Adds ET observer to record function callbacks. The data will be
    written to output_file_path.

    Returns self so the call can be chained after construction.
    """
    # Idempotent: a second call while already registered is a no-op.
    if not self._registered:
        # NOTE(review): both a public `output_file_path` and a private
        # `_output_file_path` attribute are assigned here — this span looks
        # like a merged old/new diff artifact; confirm which attribute the
        # rest of the class is meant to read before removing either line.
        self.output_file_path = output_file_path
        self._output_file_path = output_file_path
        self._registered = _add_execution_trace_observer(output_file_path)
    return self

def get_resources_dir(self, can_create=False) -> Optional[str]:
    """
    Generates the resources directory for the generated kernels,
    or index tensor data or any other metadata that is required
    to complete the Execution Trace content.
    The directory is created right where the ET file is being output.
    Only works if the observer has called set_extra_resource_collection(val=True).
    Returns None if the observer is not configured with extra resource collection.
    """
    # Extras disabled -> there is deliberately no resources directory.
    if not self.extra_resources_collection:
        return None
    if self.resources_dir:
        # already created and cached on this instance
        return self.resources_dir
    # Derive (and optionally create) the directory next to the ET file.
    generated_path = ExecutionTraceObserver.get_resources_dir_for_et_path(
        self.output_file_path, create_dir=can_create
    )
    if not generated_path:
        # could not find or create the resources dir
        return None
    self.resources_dir = generated_path
    return self.resources_dir

@staticmethod
def get_resources_dir_for_et_path(
trace_path, create_dir: bool = False
) -> Optional[str]:
work_dir, file_name = os.path.split(trace_path)
resource_dir = os.path.join(
work_dir, os.path.splitext(file_name)[0] + "_resources"
)
if not os.path.exists(resource_dir):
if create_dir:
try:
os.mkdir(resource_dir)
except Exception:
warn(f"Execution trace exception when creating {resource_dir}")
return None
else:
return None
return resource_dir

def unregister_callback(self):
"""
Removes ET observer from record function callbacks.
"""

def _save_triton_kernels():
try:
resource_dir = self.get_resources_dir()
except Exception as e:
warn(
f"Execution trace exception when generating resource directory: {e}"
)
return
if not resource_dir:
return

# Save the kernel paths for the generated kernels
from torch._inductor.codecache import PyCodeCache as PyCodeCache

Expand All @@ -1011,6 +909,12 @@ def _save_triton_kernels():
for v in PyCodeCache.modules
if getattr(v, "__file__", None) is not None
]
work_dir, file_name = os.path.split(self._output_file_path)
resource_dir = os.path.join(
work_dir, os.path.splitext(file_name)[0] + "_resources"
)
if not os.path.exists(resource_dir):
os.mkdir(resource_dir)

for kernel_file in kernel_files:
if kernel_file is None:
Expand Down Expand Up @@ -1064,14 +968,17 @@ def cleanup(self):
"""
self.unregister_callback()

def get_output_file_path(self) -> Optional[str]:
    """
    Return the trace output file path, or None when none is available.

    NOTE(review): the original span contained two merged variants of this
    method (a diff artifact — two signatures and two bodies back to back).
    This reconstruction keeps the variant consistent with the public
    `output_file_path` attribute used elsewhere in the visible class, and
    falls back to the registered private path when the public one is unset.
    """
    if self.output_file_path:
        return self.output_file_path
    if self.is_registered:
        return self._output_file_path
    else:
        return None

def _record_pg_config(self) -> None:
# Records the PG config info to the trace as node:
Expand Down

0 comments on commit bee47b0

Please sign in to comment.