Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 80 additions & 71 deletions src/gt4py/next/program_processors/runners/dace/workflow/translation.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ def _make_if_region_for_metrics_collection(
return if_region, then_state


def add_instrumentation(sdfg: dace.SDFG, gpu: bool) -> None:
def add_instrumentation(
sdfg: dace.SDFG, gpu: bool, sync_states: tuple[dace.SDFGState, dace.SDFGState] | None
) -> None:
"""
Instrument SDFG with measurement of total execution time.

Expand All @@ -169,16 +171,51 @@ def add_instrumentation(sdfg: dace.SDFG, gpu: bool) -> None:

The execution time is measured in seconds and represented as a 'float64' value.
It is written to the global array 'SDFG_ARG_METRIC_COMPUTE_TIME'.

Args:
sdfg: The SDFG to be instrumented with time measurements.
gpu: Flag that specifies if the SDFG is targeting GPU execution.
sync_states: If provided, a tuple of two states, the source state and the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by "source state"? If you mean the first state in the SDFG, then I would suggest using `sdfg.start_block`/`sdfg.start_state`.

sync state of the SDFG, containing tasklets with GPU device synchronization.
"""
output, _ = sdfg.add_array(gtx_wfdcommon.SDFG_ARG_METRIC_COMPUTE_TIME, [1], dace.float64)
start_time, _ = sdfg.add_scalar("gt_start_time", dace.int64, transient=True)
metrics_level = sdfg.add_symbol(gtx_wfdcommon.SDFG_ARG_METRIC_LEVEL, dace.int32)

#### 1. Synchronize the CUDA device, in order to wait for kernels completion.
entry_if_region, begin_state = _make_if_region_for_metrics_collection(
"metrics_entry", metrics_level, sdfg
)
exit_if_region, end_state = _make_if_region_for_metrics_collection(
"metrics_exit", metrics_level, sdfg
)
if sync_states is None:
# Use the newly created entry if-region as new source node
for source_state in sdfg.source_nodes():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes that all nodes in the SDFG are states and that we do not have nested control flow regions, which is currently true.
This means that if we ever change the design, we will have to modify this code as well.
So I would either generalize it or add an `if not isinstance(source_state, dace.SDFGState): raise NotImplementedError()` check, so that we get notified.

if source_state not in [entry_if_region, exit_if_region]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would slightly restructure the code to something like:

if sync_states is None:
    sync_states = __find_sync_states__()
    # Variable for the GPU case below

the-else-branch-of-the-if

sdfg.add_edge(entry_if_region, source_state, dace.InterstateEdge())
source_state.is_start_block = False
assert sdfg.out_degree(entry_if_region) > 0
entry_if_region.is_start_block = True
# Similarly, the exit if-region as sink node.
for sink_state in sdfg.sink_nodes():
if sink_state not in [entry_if_region, exit_if_region]:
sdfg.add_edge(sink_state, exit_if_region, dace.InterstateEdge())
assert sdfg.in_degree(exit_if_region) > 0
else:
# Keep the existing synchronization points, and put the entry if-region after the entry state
entry_state, exit_state = sync_states
for edge in list(sdfg.out_edges(entry_state)):
sdfg.add_edge(entry_if_region, edge.dst, edge.data)
sdfg.remove_edge(edge)
sdfg.add_edge(entry_state, entry_if_region, dace.InterstateEdge())
# Put the exit if-region right after the exit state.
sdfg.add_edge(exit_state, exit_if_region, dace.InterstateEdge())

#### 1. Synchronize the CUDA device if the sync states are not provided.
# Even when the target device is GPU, it can happen that dace emits code without
# GPU kernels. In this case, the cuda headers are not imported and the SDFG is
# compiled as plain C++. Therefore, we also check here the schedule of SDFG maps.
if gpu and _has_gpu_schedule(sdfg):
if gpu and _has_gpu_schedule(sdfg) and sync_states is None:
dace_gpu_backend = dace.Config.get("compiler.cuda.backend")
assert dace_gpu_backend in ["cuda", "hip"], f"GPU backend '{dace_gpu_backend}' is unknown."

Expand All @@ -193,29 +230,19 @@ def add_instrumentation(sdfg: dace.SDFG, gpu: bool) -> None:
has_side_effects = False

#### 2. Timestamp the SDFG entry point.
entry_if_region, begin_state = _make_if_region_for_metrics_collection(
"program_entry", metrics_level, sdfg
)

for source_state in sdfg.source_nodes():
if source_state is entry_if_region:
continue
sdfg.add_edge(entry_if_region, source_state, dace.InterstateEdge())
source_state.is_start_block = False
assert sdfg.out_degree(entry_if_region) > 0
entry_if_region.is_start_block = True

tlet_start_timer = begin_state.add_tasklet(
"gt_start_timer",
inputs={},
outputs={"time"},
code="""\
code=sync_code
+ """\
auto now = std::chrono::high_resolution_clock::now();
time = std::chrono::duration_cast<std::chrono::nanoseconds>(
now.time_since_epoch()
).count();
""",
language=dace.dtypes.Language.CPP,
side_effects=has_side_effects,
)
begin_state.add_edge(
tlet_start_timer,
Expand All @@ -226,17 +253,6 @@ def add_instrumentation(sdfg: dace.SDFG, gpu: bool) -> None:
)

#### 3. Collect the SDFG end timestamp and produce the compute metric.
exit_if_region, end_state = _make_if_region_for_metrics_collection(
"program_exit", metrics_level, sdfg
)

for sink_state in sdfg.sink_nodes():
if sink_state is exit_if_region:
continue
sdfg.add_edge(sink_state, exit_if_region, dace.InterstateEdge())
assert sdfg.in_degree(exit_if_region) > 0

# Populate the branch that computes the stencil time metric
tlet_stop_timer = end_state.add_tasklet(
"gt_stop_timer",
inputs={"run_cpp_start_time"},
Expand Down Expand Up @@ -286,70 +302,62 @@ def add_instrumentation(sdfg: dace.SDFG, gpu: bool) -> None:
sdfg.validate()


def make_sdfg_call_sync(sdfg: dace.SDFG, gpu: bool) -> None:
def make_sdfg_call_sync(sdfg: dace.SDFG, gpu: bool) -> tuple[dace.SDFGState, dace.SDFGState] | None:
"""Process the SDFG such that the call is synchronous.

This means that `CompiledSDFG.fast_call()` will return only after all computations
have _finished_ and the results are available. This function only has an effect for
work that runs on the GPU. Furthermore, all work is scheduled on the default stream.

Todo: Revisit this function once DaCe changes its behaviour in this regard.
Returns:
The SDFG entry and exit states, each calling the GPU primitive for device synchronization.
"""

if not gpu:
# This is only a problem on GPU. Dace uses OpenMP on CPU and
# the OpenMP parallel region creates a synchronization point.
return
return None
elif not _has_gpu_schedule(sdfg):
# Even when the target device is GPU, it can happen that dace
# emits code without GPU kernels. In this case, the cuda headers
# are not imported and the SDFG is compiled as plain C++.
return
return None

assert dace.Config.get("compiler.cuda.max_concurrent_streams") == -1, (
f"Expected `max_concurrent_streams == -1` but it was `{dace.Config.get('compiler.cuda.max_concurrent_streams')}`."
)
dace_gpu_backend = dace.Config.get("compiler.cuda.backend")
assert dace_gpu_backend in ["cuda", "hip"], f"GPU backend '{dace_gpu_backend}' is unknown."

# If we are using the default stream, things are a bit simpler/harder. For some
# reason when using the default stream, DaCe seems to skip _all_ synchronization,
# for more see [DaCe issue#2120](https://github.com/spcl/dace/issues/2120).
# Thus the `CompiledSDFG.fast_call()` call is truly asynchronous, i.e. it just
# launches the kernels and then exits. Thus we have to add a synchronization
# at the end to have a synchronous call. We cannot use `SDFG.append_exit_code()`
# because that code is only run at the `exit()` stage, not after a call. Thus we
# will generate an SDFGState that contains a Tasklet with the sync call.
sync_state = sdfg.add_state("sync_state")
for sink_state in sdfg.sink_nodes():
if sink_state is sync_state:
entry_state = sdfg.add_state("sync_entry")
for source_state in sdfg.source_nodes():
if source_state is entry_state:
continue
sdfg.add_edge(sink_state, sync_state, dace.InterstateEdge())
assert sdfg.in_degree(sync_state) > 0
sdfg.add_edge(entry_state, source_state, dace.InterstateEdge())
source_state.is_start_block = False
assert sdfg.out_degree(entry_state) > 0
entry_state.is_start_block = True

# NOTE: Since the synchronization is done through the Tasklet explicitly,
# we can disable synchronization for the last state. Might be useless.
sync_state.nosync = True
exit_state = sdfg.add_state("sync_exit")
for sink_state in sdfg.sink_nodes():
if sink_state is exit_state:
continue
sdfg.add_edge(sink_state, exit_state, dace.InterstateEdge())
assert sdfg.in_degree(exit_state) > 0

# NOTE: We should actually wrap the `StreamSynchronize` function inside a
# NOTE: We should actually wrap the `DeviceSynchronize` function inside a
# `DACE_GPU_CHECK()` macro. However, this only works in GPU context, but
# here we are in CPU context. Thus we can not do it.
dace_gpu_backend = dace.Config.get("compiler.cuda.backend")
assert dace_gpu_backend in ["cuda", "hip"], f"GPU backend '{dace_gpu_backend}' is unknown."
sync_state.add_tasklet(
"sync_tlet",
inputs=set(),
outputs=set(),
code=f"{dace_gpu_backend}StreamSynchronize({dace_gpu_backend}StreamDefault);",
language=dace.dtypes.Language.CPP,
side_effects=True,
)

# DaCe [still generates a stream](https://github.com/spcl/dace/blob/54c935cfe74a52c5107dc91680e6201ddbf86821/dace/codegen/targets/cuda.py#L467)
# despite not using it. Thus to be absolutely sure, we will now set that stream
# to the default stream.
sdfg.append_init_code(
f"__dace_gpu_set_all_streams(__state, {dace_gpu_backend}StreamDefault);",
location="cuda",
)
# NOTE: Since the synchronization is done through the Tasklet explicitly,
# we can disable synchronization for the state.
for state in [entry_state, exit_state]:
state.add_tasklet(
"sync_tlet",
inputs=set(),
outputs=set(),
code=f"{dace_gpu_backend}DeviceSynchronize();",
language=dace.dtypes.Language.CPP,
side_effects=True,
)
state.nosync = True
return entry_state, exit_state


@dataclasses.dataclass(frozen=True)
Expand Down Expand Up @@ -436,11 +444,12 @@ def _generate_sdfg_without_configuring_dace(

if self.async_sdfg_call:
make_sdfg_call_async(sdfg, on_gpu)
sync_states = None
else:
make_sdfg_call_sync(sdfg, on_gpu)
sync_states = make_sdfg_call_sync(sdfg, on_gpu)

if self.use_metrics:
add_instrumentation(sdfg, on_gpu)
add_instrumentation(sdfg, on_gpu, sync_states)

return sdfg

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,17 @@ def _are_streams_set_to_default_stream(sdfg: dace.SDFG) -> bool:
)


def _is_device_synchronized(sdfg: dace.SDFG) -> bool:
    """Tell whether the generated host code contains a device synchronization call.

    Args:
        sdfg: The SDFG whose generated code objects are inspected.

    Returns:
        `True` if at least one generated code object whose language is "cpp"
        contains a call to `cudaDeviceSynchronize()` or `hipDeviceSynchronize()`,
        `False` otherwise.
    """
    # NOTE: There must be no `\b` after the closing parenthesis. Both `)` and the
    #  `;` that follows it are non-word characters, so there is no word boundary
    #  at that position and such a pattern could never match an actual call.
    re_device_sync = re.compile(r"\b(cuda|hip)DeviceSynchronize\(\)")
    # The synchronization calls are in the CPU not the GPU code.
    # `search()` is needed because the call is located somewhere inside the
    #  generated code; `match()` would only look at the very start of the string.
    return any(
        re_device_sync.search(code.clean_code)
        for code in sdfg.generate_code()
        if code.language == "cpp"
    )


def _check_sdfg_with_async_call(sdfg: dace.SDFG) -> None:
def _check_sdfg_async_call(sdfg: dace.SDFG) -> None:
# Because we are using the default stream, the launch is asynchronous. Thus we
# have to check if there is no synchronization state. However, we will do a
# stronger test. Instead we will make sure that there are no synchronization
Expand All @@ -158,44 +158,59 @@ def _check_sdfg_with_async_call(sdfg: dace.SDFG) -> None:
# edge. However, we do not have that case.

assert not any(
state.label == "sync_state"
state.label == "sync_entry"
for state in sdfg.source_nodes()
if isinstance(state, dace.SDFGState)
)
assert not any(
state.label == "sync_exit"
for state in sdfg.sink_nodes()
if isinstance(state, dace.SDFGState)
)
assert not _are_streams_synchronized(sdfg)
assert not _is_device_synchronized(sdfg)
assert _are_streams_set_to_default_stream(sdfg)


def _check_sdfg_without_async_call(sdfg: dace.SDFG) -> None:
def _check_sdfg_sync_call(sdfg: dace.SDFG) -> None:
states = sdfg.states()
sink_states = sdfg.sink_nodes()

# Test if the distinctive sink node is present.
# Test if the distinctive source and sink nodes are present.
assert len(states) > 2
source_states = sdfg.source_nodes()
assert len(source_states) == 1
entry_state = source_states[0]
assert isinstance(entry_state, dace.SDFGState)
assert entry_state.label == "sync_entry"
sink_states = sdfg.sink_nodes()
assert len(sink_states) == 1
assert len(sink_states) < len(states)
sync_state = sink_states[0]
assert isinstance(sync_state, dace.SDFGState)
assert sync_state.label == "sync_state"
assert sync_state.nosync == True # Because sync is done through the tasklet.
assert sync_state.number_of_nodes() == 1

sync_tlet = next(iter(sync_state.nodes()))
assert isinstance(sync_tlet, dace_nodes.Tasklet)
assert sync_tlet.side_effects
assert sync_tlet.label == "sync_tlet"

assert re.match(r"(cuda|hip)StreamSynchronize\(\1StreamDefault\)", sync_tlet.code.as_string)
assert _are_streams_set_to_default_stream(sdfg)
exit_state = sink_states[0]
assert isinstance(exit_state, dace.SDFGState)
assert exit_state.label == "sync_exit"

for state in [entry_state, exit_state]:
assert state.nosync == True # Because sync is done through the tasklet.
assert state.number_of_nodes() == 1

sync_tlet = next(iter(state.nodes()))
assert isinstance(sync_tlet, dace_nodes.Tasklet)
assert sync_tlet.side_effects
assert sync_tlet.label == "sync_tlet"
assert re.match(r"(cuda|hip)DeviceSynchronize\(\);", sync_tlet.code.as_string)


def _check_cpu_sdfg_call(sdfg: dace.SDFG) -> None:
# CPU is always synchronous execution, thus we check that there is no sync state.
assert not any(
state.label == "sync_state"
state.label == "sync_entry"
for state in sdfg.source_nodes()
if isinstance(state, dace.SDFGState)
)
assert not any(
state.label == "sync_exit"
for state in sdfg.sink_nodes()
if isinstance(state, dace.SDFGState)
)
assert not _are_streams_synchronized(sdfg)
assert not _is_device_synchronized(sdfg)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -234,9 +249,9 @@ def test_generate_sdfg_async_call(make_async_sdfg_call: bool, device_type: core_
if device_type == core_defs.DeviceType.CPU:
_check_cpu_sdfg_call(sdfg)
elif make_async_sdfg_call:
_check_sdfg_with_async_call(sdfg)
_check_sdfg_async_call(sdfg)
else:
_check_sdfg_without_async_call(sdfg)
_check_sdfg_sync_call(sdfg)


def test_generate_sdfg_async_call_no_map(device_type: core_defs.DeviceType):
Expand Down Expand Up @@ -270,7 +285,7 @@ def test_generate_sdfg_async_call_no_map(device_type: core_defs.DeviceType):
if device_type == core_defs.DeviceType.CPU:
_check_cpu_sdfg_call(sdfg)
else:
_check_sdfg_with_async_call(sdfg)
_check_sdfg_async_call(sdfg)


def _make_multi_state_sdfg_0(
Expand Down Expand Up @@ -412,7 +427,7 @@ def test_generate_sdfg_async_call_multi_state(
# by this. See https://github.com/spcl/dace/issues/2120 for more.
# In the case of `_make_multi_state_sdfg_3()` there would be a sync after the Map, before
# the Tasklet, if the default stream was not used!
assert not _are_streams_synchronized(sdfg)
assert not _is_device_synchronized(sdfg)
else:
# There is no dependency between the states, so no sync.
assert not _are_streams_synchronized(sdfg)
assert not _is_device_synchronized(sdfg)
Loading