Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 78 additions & 74 deletions src/gt4py/next/program_processors/runners/dace/workflow/translation.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def _make_if_region_for_metrics_collection(
return if_region, then_state


def add_instrumentation(sdfg: dace.SDFG, gpu: bool) -> None:
def add_instrumentation(sdfg: dace.SDFG, gpu: bool, sdfg_has_synchronization: bool) -> None:
"""
Instrument SDFG with measurement of total execution time.

Expand All @@ -169,16 +169,51 @@ def add_instrumentation(sdfg: dace.SDFG, gpu: bool) -> None:

The execution time is measured in seconds and represented as a 'float64' value.
It is written to the global array 'SDFG_ARG_METRIC_COMPUTE_TIME'.

Args:
sdfg: The SDFG to be instrumented with time measurements.
gpu: Flag that specifies if the SDFG is targeting GPU execution.
sdfg_has_synchronization: Whether the SDFG contains start and stop synchronization points.
"""
output, _ = sdfg.add_array(gtx_wfdcommon.SDFG_ARG_METRIC_COMPUTE_TIME, [1], dace.float64)
start_time, _ = sdfg.add_scalar("gt_start_time", dace.int64, transient=True)
metrics_level = sdfg.add_symbol(gtx_wfdcommon.SDFG_ARG_METRIC_LEVEL, dace.int32)

#### 1. Synchronize the CUDA device, in order to wait for kernels completion.
# retrieve the sink nodes before adding the if-region nodes, because these will
# become new sink nodes.
sink_nodes = sdfg.sink_nodes()
start_block = sdfg.start_block

entry_if_region, begin_state = _make_if_region_for_metrics_collection(
"metrics_entry", metrics_level, sdfg
)
exit_if_region, end_state = _make_if_region_for_metrics_collection(
"metrics_exit", metrics_level, sdfg
)

if sdfg_has_synchronization:
# Keep the existing synchronization points, and put the entry if-region
# after the start block.
for edge in list(sdfg.out_edges(start_block)):
sdfg.add_edge(entry_if_region, edge.dst, edge.data)
sdfg.remove_edge(edge)
sdfg.add_edge(start_block, entry_if_region, dace.InterstateEdge())
# Put the exit if-region right after the last block.
assert len(sink_nodes) == 1
sdfg.add_edge(sink_nodes[0], exit_if_region, dace.InterstateEdge())
else:
# Use the newly created entry if-region as new start block.
sdfg.add_edge(entry_if_region, start_block, dace.InterstateEdge())
# Similarly, use the exit if-region as the last block.
for sink_node in sink_nodes:
sdfg.add_edge(sink_node, exit_if_region, dace.InterstateEdge())
assert sdfg.in_degree(exit_if_region) > 0

#### 1. Synchronize the CUDA device if the sync states are not provided.
# Even when the target device is GPU, it can happen that dace emits code without
# GPU kernels. In this case, the cuda headers are not imported and the SDFG is
# compiled as plain C++. Therefore, we also check here the schedule of SDFG maps.
if gpu and _has_gpu_schedule(sdfg):
if gpu and _has_gpu_schedule(sdfg) and not sdfg_has_synchronization:
dace_gpu_backend = dace.Config.get("compiler.cuda.backend")
assert dace_gpu_backend in ["cuda", "hip"], f"GPU backend '{dace_gpu_backend}' is unknown."

Expand All @@ -189,33 +224,23 @@ def add_instrumentation(sdfg: dace.SDFG, gpu: bool) -> None:
has_side_effects = True

else:
sync_code = ""
sync_code = "// The SDFG execution should already be synchronized"
has_side_effects = False

#### 2. Timestamp the SDFG entry point.
entry_if_region, begin_state = _make_if_region_for_metrics_collection(
"program_entry", metrics_level, sdfg
)

for source_state in sdfg.source_nodes():
if source_state is entry_if_region:
continue
sdfg.add_edge(entry_if_region, source_state, dace.InterstateEdge())
source_state.is_start_block = False
assert sdfg.out_degree(entry_if_region) > 0
entry_if_region.is_start_block = True

tlet_start_timer = begin_state.add_tasklet(
"gt_start_timer",
inputs={},
outputs={"time"},
code="""\
code=sync_code
+ """
auto now = std::chrono::high_resolution_clock::now();
time = std::chrono::duration_cast<std::chrono::nanoseconds>(
now.time_since_epoch()
).count();
""",
language=dace.dtypes.Language.CPP,
side_effects=has_side_effects,
)
begin_state.add_edge(
tlet_start_timer,
Expand All @@ -226,17 +251,6 @@ def add_instrumentation(sdfg: dace.SDFG, gpu: bool) -> None:
)

#### 3. Collect the SDFG end timestamp and produce the compute metric.
exit_if_region, end_state = _make_if_region_for_metrics_collection(
"program_exit", metrics_level, sdfg
)

for sink_state in sdfg.sink_nodes():
if sink_state is exit_if_region:
continue
sdfg.add_edge(sink_state, exit_if_region, dace.InterstateEdge())
assert sdfg.in_degree(exit_if_region) > 0

# Populate the branch that computes the stencil time metric
tlet_stop_timer = end_state.add_tasklet(
"gt_stop_timer",
inputs={"run_cpp_start_time"},
Expand Down Expand Up @@ -269,13 +283,13 @@ def add_instrumentation(sdfg: dace.SDFG, gpu: bool) -> None:

if gpu and _has_gpu_schedule(sdfg) and config.ADD_GPU_TRACE_MARKERS:
sdfg.instrument = dace.dtypes.InstrumentationType.GPU_TX_MARKERS
for node, _ in sdfg.all_nodes_recursive():
for sink_node, _ in sdfg.all_nodes_recursive():
if isinstance(
node, dace.nodes.MapEntry
sink_node, dace.nodes.MapEntry
): # Add ranges to scopes and maps that are NOT scheduled to the GPU
node.instrument = dace.dtypes.InstrumentationType.GPU_TX_MARKERS
elif isinstance(node, dace.sdfg.state.SDFGState):
node.instrument = dace.dtypes.InstrumentationType.GPU_TX_MARKERS
sink_node.instrument = dace.dtypes.InstrumentationType.GPU_TX_MARKERS
elif isinstance(sink_node, dace.sdfg.state.SDFGState):
sink_node.instrument = dace.dtypes.InstrumentationType.GPU_TX_MARKERS

# Check SDFG validity after applying the above changes.
# Normally, we do not call `SDFGState.add_tasklet()` directly, instead we call
Expand All @@ -292,8 +306,6 @@ def make_sdfg_call_sync(sdfg: dace.SDFG, gpu: bool) -> None:
This means that `CompiledSDFG.fast_call()` will return only after all computations
have _finished_ and the results are available. This function only has an effect for
work that runs on the GPU. Furthermore, all work is scheduled on the default stream.

Todo: Revisit this function once DaCe changes its behaviour in this regard.
"""

if not gpu:
Expand All @@ -306,50 +318,38 @@ def make_sdfg_call_sync(sdfg: dace.SDFG, gpu: bool) -> None:
# are not imported and the SDFG is compiled as plain C++.
return

assert dace.Config.get("compiler.cuda.max_concurrent_streams") == -1, (
f"Expected `max_concurrent_streams == -1` but it was `{dace.Config.get('compiler.cuda.max_concurrent_streams')}`."
)
dace_gpu_backend = dace.Config.get("compiler.cuda.backend")
assert dace_gpu_backend in ["cuda", "hip"], f"GPU backend '{dace_gpu_backend}' is unknown."

# If we are using the default stream, things are a bit simpler/harder. For some
# reasons when using the default stream, DaCe seems to skip _all_ synchronization,
# for more see [DaCe issue#2120](https://github.com/spcl/dace/issues/2120).
# Thus the `CompiledSDFG.fast_call()` call is truly asynchronous, i.e. just
# launches the kernels and then exist. Thus we have to add a synchronization
# at the end to have a synchronous call. We can not use `SDFG.append_exit_code()`
# because that code is only run at the `exit()` stage, not after a call. Thus we
# will generate an SDFGState that contains a Tasklet with the sync call.
sync_state = sdfg.add_state("sync_state")
for sink_state in sdfg.sink_nodes():
if sink_state is sync_state:
entry_state = sdfg.add_state("sync_entry")
for source_node in sdfg.source_nodes():
if source_node is entry_state:
continue
sdfg.add_edge(sink_state, sync_state, dace.InterstateEdge())
assert sdfg.in_degree(sync_state) > 0
sdfg.add_edge(entry_state, source_node, dace.InterstateEdge())
assert sdfg.out_degree(entry_state) > 0

# NOTE: Since the synchronization is done through the Tasklet explicitly,
# we can disable synchronization for the last state. Might be useless.
sync_state.nosync = True
exit_state = sdfg.add_state("sync_exit")
for sink_node in sdfg.sink_nodes():
if sink_node is exit_state:
continue
sdfg.add_edge(sink_node, exit_state, dace.InterstateEdge())
assert sdfg.in_degree(exit_state) > 0

# NOTE: We should actually wrap the `StreamSynchronize` function inside a
# NOTE: We should actually wrap the `DeviceSynchronize` function inside a
# `DACE_GPU_CHECK()` macro. However, this only works in GPU context, but
# here we are in CPU context. Thus we can not do it.
dace_gpu_backend = dace.Config.get("compiler.cuda.backend")
assert dace_gpu_backend in ["cuda", "hip"], f"GPU backend '{dace_gpu_backend}' is unknown."
sync_state.add_tasklet(
"sync_tlet",
inputs=set(),
outputs=set(),
code=f"{dace_gpu_backend}StreamSynchronize({dace_gpu_backend}StreamDefault);",
language=dace.dtypes.Language.CPP,
side_effects=True,
)

# DaCe [still generates a stream](https://github.com/spcl/dace/blob/54c935cfe74a52c5107dc91680e6201ddbf86821/dace/codegen/targets/cuda.py#L467)
# despite not using it. Thus to be absolutely sure, we will not set that stream
# to the default stream.
sdfg.append_init_code(
f"__dace_gpu_set_all_streams(__state, {dace_gpu_backend}StreamDefault);",
location="cuda",
)
# NOTE: Since the synchronization is done through the Tasklet explicitly,
# we can disable synchronization for the state.
for state in [entry_state, exit_state]:
state.add_tasklet(
"sync_tlet",
inputs=set(),
outputs=set(),
code=f"{dace_gpu_backend}DeviceSynchronize();",
language=dace.dtypes.Language.CPP,
side_effects=True,
)
state.nosync = True


@dataclasses.dataclass(frozen=True)
Expand Down Expand Up @@ -440,7 +440,11 @@ def _generate_sdfg_without_configuring_dace(
make_sdfg_call_sync(sdfg, on_gpu)

if self.use_metrics:
add_instrumentation(sdfg, on_gpu)
add_instrumentation(
sdfg=sdfg,
gpu=on_gpu,
sdfg_has_synchronization=(on_gpu and not self.async_sdfg_call),
)

return sdfg

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,17 @@ def _are_streams_set_to_default_stream(sdfg: dace.SDFG) -> bool:
)


def _are_streams_synchronized(sdfg: dace.SDFG) -> bool:
re_stream_sync = re.compile(r"\b(cuda|hip)StreamSynchronize\b")
def _is_device_synchronized(sdfg: dace.SDFG) -> bool:
re_device_sync = re.compile(r"\b(cuda|hip)DeviceSynchronize\(\)\b")
# The synchronization calls are in the CPU not the GPU code.
return any(
re_stream_sync.match(code.clean_code)
re_device_sync.match(code.clean_code)
for code in sdfg.generate_code()
if code.language == "cpp"
)


def _check_sdfg_with_async_call(sdfg: dace.SDFG) -> None:
def _check_sdfg_async_call(sdfg: dace.SDFG) -> None:
# Because we are using the default stream, the launch is asynchronous. Thus we
# have to check if there is no synchronization state. However, we will do a
# stronger test. Instead we will make sure that there are no synchronization
Expand All @@ -158,44 +158,59 @@ def _check_sdfg_with_async_call(sdfg: dace.SDFG) -> None:
# edge. However, we do not have that case.

assert not any(
state.label == "sync_state"
state.label == "sync_entry"
for state in sdfg.source_nodes()
if isinstance(state, dace.SDFGState)
)
assert not any(
state.label == "sync_exit"
for state in sdfg.sink_nodes()
if isinstance(state, dace.SDFGState)
)
assert not _are_streams_synchronized(sdfg)
assert not _is_device_synchronized(sdfg)
assert _are_streams_set_to_default_stream(sdfg)


def _check_sdfg_without_async_call(sdfg: dace.SDFG) -> None:
def _check_sdfg_sync_call(sdfg: dace.SDFG) -> None:
states = sdfg.states()
sink_states = sdfg.sink_nodes()

# Test if the distinctive sink node is present.
# Test if the distinctive source and sink nodes are present.
assert len(states) > 2
source_states = sdfg.source_nodes()
assert len(source_states) == 1
entry_state = source_states[0]
assert isinstance(entry_state, dace.SDFGState)
assert entry_state.label == "sync_entry"
sink_states = sdfg.sink_nodes()
assert len(sink_states) == 1
assert len(sink_states) < len(states)
sync_state = sink_states[0]
assert isinstance(sync_state, dace.SDFGState)
assert sync_state.label == "sync_state"
assert sync_state.nosync == True # Because sync is done through the tasklet.
assert sync_state.number_of_nodes() == 1

sync_tlet = next(iter(sync_state.nodes()))
assert isinstance(sync_tlet, dace_nodes.Tasklet)
assert sync_tlet.side_effects
assert sync_tlet.label == "sync_tlet"

assert re.match(r"(cuda|hip)StreamSynchronize\(\1StreamDefault\)", sync_tlet.code.as_string)
assert _are_streams_set_to_default_stream(sdfg)
exit_state = sink_states[0]
assert isinstance(exit_state, dace.SDFGState)
assert exit_state.label == "sync_exit"

for state in [entry_state, exit_state]:
assert state.nosync == True # Because sync is done through the tasklet.
assert state.number_of_nodes() == 1

sync_tlet = next(iter(state.nodes()))
assert isinstance(sync_tlet, dace_nodes.Tasklet)
assert sync_tlet.side_effects
assert sync_tlet.label == "sync_tlet"
assert re.match(r"(cuda|hip)DeviceSynchronize\(\);", sync_tlet.code.as_string)


def _check_cpu_sdfg_call(sdfg: dace.SDFG) -> None:
# CPU is always synchronous execution, thus we check that there is no sync state.
assert not any(
state.label == "sync_state"
state.label == "sync_entry"
for state in sdfg.source_nodes()
if isinstance(state, dace.SDFGState)
)
assert not any(
state.label == "sync_exit"
for state in sdfg.sink_nodes()
if isinstance(state, dace.SDFGState)
)
assert not _are_streams_synchronized(sdfg)
assert not _is_device_synchronized(sdfg)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -234,9 +249,9 @@ def test_generate_sdfg_async_call(make_async_sdfg_call: bool, device_type: core_
if device_type == core_defs.DeviceType.CPU:
_check_cpu_sdfg_call(sdfg)
elif make_async_sdfg_call:
_check_sdfg_with_async_call(sdfg)
_check_sdfg_async_call(sdfg)
else:
_check_sdfg_without_async_call(sdfg)
_check_sdfg_sync_call(sdfg)


def test_generate_sdfg_async_call_no_map(device_type: core_defs.DeviceType):
Expand Down Expand Up @@ -270,7 +285,7 @@ def test_generate_sdfg_async_call_no_map(device_type: core_defs.DeviceType):
if device_type == core_defs.DeviceType.CPU:
_check_cpu_sdfg_call(sdfg)
else:
_check_sdfg_with_async_call(sdfg)
_check_sdfg_async_call(sdfg)


def _make_multi_state_sdfg_0(
Expand Down Expand Up @@ -412,7 +427,7 @@ def test_generate_sdfg_async_call_multi_state(
# by this. See https://github.com/spcl/dace/issues/2120 for more.
# In the case of `_make_multi_state_sdfg_3()` there would be a sync after the Map, before
# the Tasklet, if the default stream was not used!
assert not _are_streams_synchronized(sdfg)
assert not _is_device_synchronized(sdfg)
else:
# There is no dependency between the states, so no sync.
assert not _are_streams_synchronized(sdfg)
assert not _is_device_synchronized(sdfg)
Loading