12 changes: 6 additions & 6 deletions .github/workflows/ci.yml
@@ -80,8 +80,8 @@ jobs:
env:
ASCEND_HOME_PATH: /usr/local/Ascend/cann-8.5.0
PTOAS_ROOT: ${{ github.workspace }}/ptoas-bin
-PTOAS_VERSION: v0.36
-PTOAS_SHA256: 698f753b67ca4387e2e4ef96dfdbb35d71295e94f9f5b20a545b34647f548efc
+PTOAS_VERSION: v0.37
+PTOAS_SHA256: 60ddce76c69b6aba847f96dbbdbce2b3173cb5fb4143c6c5bd1a87a7176d8514
CMAKE_BUILD_PARALLEL_LEVEL: 16
CMAKE_C_COMPILER_LAUNCHER: ccache
CMAKE_CXX_COMPILER_LAUNCHER: ccache
@@ -171,8 +171,8 @@ jobs:
env:
ASCEND_HOME_PATH: /usr/local/Ascend/cann-8.5.0
PTOAS_ROOT: ${{ github.workspace }}/ptoas-bin
-PTOAS_VERSION: v0.36
-PTOAS_SHA256: 698f753b67ca4387e2e4ef96dfdbb35d71295e94f9f5b20a545b34647f548efc
+PTOAS_VERSION: v0.37
+PTOAS_SHA256: 60ddce76c69b6aba847f96dbbdbce2b3173cb5fb4143c6c5bd1a87a7176d8514
CMAKE_BUILD_PARALLEL_LEVEL: 16
CMAKE_C_COMPILER_LAUNCHER: ccache
CMAKE_CXX_COMPILER_LAUNCHER: ccache
@@ -264,8 +264,8 @@ jobs:
runs-on: ubuntu-latest
env:
PTOAS_ROOT: ${{ github.workspace }}/ptoas-bin
-PTOAS_VERSION: v0.36
-PTOAS_SHA256: 07bfedea5a9ba70266925ead70d87d129fc143f4e9ea280651d28cb2942a1055
+PTOAS_VERSION: v0.37
+PTOAS_SHA256: a4f52a6f2088e451ebb06a783141d556001d320df0a8081f09e4d68f361a12c2
container:
image: ghcr.io/hw-native-sys/pypto/github-ci:latest
steps:
21 changes: 21 additions & 0 deletions docs/en/dev/ir/05-operators.md
@@ -406,6 +406,27 @@ class CrossCoreExample:

See [TPUSH/TPOP ISA Reference](../../reference/pto-isa/01-tpush_tpop.md) and [Buffer Management](../../reference/pto-isa/02-buffer_management.md) for hardware details.

### Cross-Rank Signal Operations

| Operation | Args | Description | Kwargs |
| --------- | ---- | ----------- | ------ |
| `tile.comm_notify` | 2 (signal, value) | Write or atomic-add an INT32 value into a remote rank's signal slot | `op` (`"atomic_add"` or `"set"`) |
| `tile.comm_wait` | 2 (signal, cmp_value) | Block until a local INT32 signal slot satisfies the given comparison | `cmp` (`"eq"`, `"ne"`, `"gt"`, `"ge"`, `"lt"`, `"le"`) |
| `tile.comm_test` | 2 (signal, cmp_value) | Non-blocking poll: returns BOOL = (local INT32 signal slot `<cmp>` cmp_value) | `cmp` (`"eq"`, `"ne"`, `"gt"`, `"ge"`, `"lt"`, `"le"`) |

For all three ops, `signal` is a 1-element INT32 tensor that views a GM signal slot. `tile.comm_notify` targets a remote rank's slot (typically obtained via `pl.import_peer_buffer`); `tile.comm_wait` and `tile.comm_test` poll the local rank's slot. The integer operand (`value` or `cmp_value`) may be a Python `int`, a `Scalar`, or an `Expr`. On the AIV side the three ops lower to `pto::comm::TNOTIFY`, `pto::comm::TWAIT`, and `pto::comm::TTEST`, respectively. `tile.comm_test` returns `pl.Scalar[pl.BOOL]` (PTO `i1`); the other two have no return value.
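
The semantics in the table above can be modeled on the host in a few lines of plain Python. This is only an illustrative sketch: `CMP_FUNCS`, `model_notify`, and `model_test` are hypothetical names that do not exist in PyPTO, a 1-element list stands in for the INT32 signal slot, and INT32 overflow is not modeled.

```python
import operator

# Hypothetical host-side model; none of these names exist in PyPTO.
CMP_FUNCS = {
    "eq": operator.eq,
    "ne": operator.ne,
    "gt": operator.gt,
    "ge": operator.ge,
    "lt": operator.lt,
    "le": operator.le,
}


def model_notify(slot: list[int], value: int, *, op: str) -> None:
    """Apply a notify to a 1-element list standing in for the signal slot."""
    if op == "set":
        slot[0] = value
    elif op == "atomic_add":
        slot[0] += value  # INT32 wraparound is not modeled here
    else:
        raise ValueError(f"op must be 'atomic_add' or 'set', got {op!r}")


def model_test(slot: list[int], cmp_value: int, *, cmp: str) -> bool:
    """Return (slot <cmp> cmp_value) without blocking."""
    return CMP_FUNCS[cmp](slot[0], cmp_value)


slot = [0]
model_notify(slot, 1, op="atomic_add")
model_notify(slot, 1, op="atomic_add")
print(model_test(slot, 2, cmp="ge"))  # True: two increments reached the threshold
model_notify(slot, 0, op="set")
print(model_test(slot, 2, cmp="ge"))  # False: the slot was reset to 0
```

In this model `comm_wait` would simply loop until `model_test` returns true.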

**Pipeline ordering note.** Cross-rank communication ops require pipe-level ordering between GM payload writes and signal writes (the cross-rank done-barrier pattern). Consistent with the rest of PyPTO, pipe synchronization is **not** inserted at the IR or codegen layer — it is the responsibility of the downstream PTOAS lowering. PyPTO users do not need to (and cannot) manually insert pipe barriers around `comm_notify` / `comm_wait` / `comm_test`.

```python
import pypto.language as pl

# inside an InCore function on AIV side:
pl.tile.comm_notify(remote_signal, 1, op="atomic_add") # producer side
pl.tile.comm_wait(local_signal, 1, cmp="ge") # consumer side (blocking)
ok = pl.tile.comm_test(local_signal, 1, cmp="ge") # consumer side (non-blocking, BOOL)
```
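
The done-barrier pattern mentioned in the ordering note can be sketched with host threads. This is a rough analogy rather than PyPTO code: `SignalSlot`, `producer`, and the `payload` list are hypothetical, and the ordering that PTOAS is said to provide on hardware comes here from the lock inside `threading.Condition`.

```python
import threading


class SignalSlot:
    """Hypothetical stand-in for a local INT32 signal slot."""

    def __init__(self) -> None:
        self._value = 0
        self._cond = threading.Condition()

    def notify(self, value: int, *, op: str) -> None:
        with self._cond:
            if op == "set":
                self._value = value
            elif op == "atomic_add":
                self._value += value
            else:
                raise ValueError(f"unknown op {op!r}")
            self._cond.notify_all()

    def wait_ge(self, cmp_value: int) -> None:
        # Blocks like comm_wait(..., cmp="ge") in this model.
        with self._cond:
            self._cond.wait_for(lambda: self._value >= cmp_value)


NUM_PEERS = 4
payload = [0] * NUM_PEERS
done = SignalSlot()


def producer(rank: int) -> None:
    payload[rank] = rank + 1         # 1. write the payload
    done.notify(1, op="atomic_add")  # 2. then signal completion


threads = [threading.Thread(target=producer, args=(r,)) for r in range(NUM_PEERS)]
for t in threads:
    t.start()

done.wait_ge(NUM_PEERS)  # consumer: wait until every peer has signalled
total = sum(payload)     # all payload writes are visible at this point
for t in threads:
    t.join()
print(total)  # 10
```

Each producer writes its payload before it signals, so once the counter reaches the number of peers the consumer can read every payload entry.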

## File Organization

| Directory/File | Contents |
21 changes: 21 additions & 0 deletions docs/zh-cn/dev/ir/05-operators.md
@@ -400,6 +400,27 @@ class CrossCoreExample:

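See [TPUSH/TPOP ISA Reference](../../reference/pto-isa/01-tpush_tpop.md) and [Buffer Management](../../reference/pto-isa/02-buffer_management.md) for hardware details.

### Cross-Rank Signal Operations

| Operation | Args | Description | Kwargs |
| --------- | ---- | ----------- | ------ |
| `tile.comm_notify` | 2 (signal, value) | Write or atomic-add an INT32 value into a remote rank's signal slot | `op` (`"atomic_add"` or `"set"`) |
| `tile.comm_wait` | 2 (signal, cmp_value) | Block until a local INT32 signal slot satisfies the given comparison | `cmp` (`"eq"`, `"ne"`, `"gt"`, `"ge"`, `"lt"`, `"le"`) |
| `tile.comm_test` | 2 (signal, cmp_value) | Non-blocking poll: returns BOOL = (local INT32 signal slot `<cmp>` cmp_value) | `cmp` (`"eq"`, `"ne"`, `"gt"`, `"ge"`, `"lt"`, `"le"`) |

For all three ops, `signal` is a 1-element INT32 tensor whose view points at a signal slot in GM: `tile.comm_notify` writes a remote rank's slot (typically obtained via `pl.import_peer_buffer`), while `tile.comm_wait` / `tile.comm_test` poll the local rank's slot. The integer operand (`value` / `cmp_value`) can be a Python `int`, a `Scalar`, or an `Expr`. On the AIV side the ops lower to `pto::comm::TNOTIFY` / `pto::comm::TWAIT` / `pto::comm::TTEST`, respectively. `tile.comm_test` returns `pl.Scalar[pl.BOOL]` (PTO `i1`); the other two have no return value.

**Pipeline ordering note.** Cross-rank communication ops need pipe-level ordering between the GM payload write and the signal write (the cross-rank done-barrier pattern). Consistent with the rest of PyPTO, pipe synchronization is **not** inserted at the IR or codegen layer; the downstream PTOAS handles it during lowering. PyPTO users do not need to (and cannot) manually insert pipe barriers around `comm_notify` / `comm_wait` / `comm_test`.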

```python
import pypto.language as pl

# inside an InCore function on the AIV side:
pl.tile.comm_notify(remote_signal, 1, op="atomic_add") # producer
pl.tile.comm_wait(local_signal, 1, cmp="ge") # consumer (blocking)
ok = pl.tile.comm_test(local_signal, 1, cmp="ge") # consumer (non-blocking, returns BOOL)
```

## File Organization

| Directory/File | Contents |
66 changes: 66 additions & 0 deletions python/pypto/ir/op/tile_ops.py
@@ -2323,6 +2323,72 @@ def tpop_from_aiv(
    return _ir_core.create_op_call("tile.tpop_from_aiv", [], kwargs, actual_span)


_NOTIFY_OPS = ("atomic_add", "set")


def comm_notify(signal: Expr, value: Expr, *, op: str, span: Span | None = None) -> Call:
    """Send a flag notification to a remote rank's signal slot.

    Lowers to ``pto::comm::TNOTIFY`` via PTOAS ``pto.comm.tnotify``. The
    signal is a 1-element INT32 Tensor (GM) that views the destination rank's
    signal location in its HCCL window, typically obtained via
    ``pl.import_peer_buffer``.

    Args:
        signal: Destination signal tensor (1-element INT32) in remote rank's window
        value: INT32 scalar value to write or atomic-add
        op: Notify operation, ``"atomic_add"`` or ``"set"``
        span: Optional source span
    """
    if op not in _NOTIFY_OPS:
        raise ValueError(f"tile.comm_notify: op must be one of {_NOTIFY_OPS}, got {op!r}")
    actual_span = _get_span_or_capture(span, frame_offset=1)
    return _ir_core.create_op_call("tile.comm_notify", [signal, value], {"op": op}, actual_span)


_WAIT_CMPS = ("eq", "ne", "gt", "ge", "lt", "le")


def comm_wait(signal: Expr, cmp_value: Expr, *, cmp: str, span: Span | None = None) -> Call:
    """Block until a local INT32 signal slot satisfies a comparison.

    Lowers to ``pto::comm::TWAIT`` via PTOAS ``pto.comm.twait``. The signal
    is a 1-element INT32 Tensor (GM) in the local rank's window: the slot
    that peer ranks write to with ``tile.comm_notify``.

    Args:
        signal: Local signal tensor (1-element INT32) to poll
        cmp_value: INT32 scalar comparison value
        cmp: Comparison predicate, one of ``"eq"`` | ``"ne"`` | ``"gt"`` |
            ``"ge"`` | ``"lt"`` | ``"le"``
        span: Optional source span
    """
    if cmp not in _WAIT_CMPS:
        raise ValueError(f"tile.comm_wait: cmp must be one of {_WAIT_CMPS}, got {cmp!r}")
    actual_span = _get_span_or_capture(span, frame_offset=1)
    return _ir_core.create_op_call("tile.comm_wait", [signal, cmp_value], {"cmp": cmp}, actual_span)


def comm_test(signal: Expr, cmp_value: Expr, *, cmp: str, span: Span | None = None) -> Call:
    """Non-blocking poll of a local INT32 signal slot, returning a BOOL.

    Lowers to ``pto::comm::TTEST`` via PTOAS ``pto.comm.ttest``. Same operand
    shape as :func:`comm_wait`, but does not block; the result is BOOL and
    equals ``signal <cmp> cmp_value``.

    Args:
        signal: Local signal tensor (1-element INT32) to poll
        cmp_value: INT32 scalar comparison value
        cmp: Comparison predicate, one of ``"eq"`` | ``"ne"`` | ``"gt"`` |
            ``"ge"`` | ``"lt"`` | ``"le"``
        span: Optional source span
    """
    if cmp not in _WAIT_CMPS:
        raise ValueError(f"tile.comm_test: cmp must be one of {_WAIT_CMPS}, got {cmp!r}")
    actual_span = _get_span_or_capture(span, frame_offset=1)
    return _ir_core.create_op_call("tile.comm_test", [signal, cmp_value], {"cmp": cmp}, actual_span)


# ============================================================================
# Sorting Operations
# ============================================================================
121 changes: 119 additions & 2 deletions python/pypto/language/op/system_ops.py
@@ -15,6 +15,7 @@
"""

from pypto.ir.op import system_ops as _ir_ops
from pypto.ir.op import tile_ops as _ir_tile_ops
from pypto.ir.op.system_ops import (
    AUTO,
    aic_initialize_pipe,
@@ -26,9 +27,9 @@
    sync_src,
)
from pypto.pypto_core import DataType
-from pypto.pypto_core.ir import Call, Span
+from pypto.pypto_core.ir import Call, ConstInt, Expr, Span

-from ..typing import Scalar, Tile
+from ..typing import Scalar, Tensor, Tile

__all__ = [
    "AUTO",
@@ -47,6 +48,9 @@
    "import_peer_buffer",
    "tfree_to_aic",
    "tfree_to_aiv",
    "comm_notify",
    "comm_wait",
    "comm_test",
]


@@ -143,3 +147,116 @@ def import_peer_buffer(*, name: str, peer_func: str, span: Span | None = None) -
    """
    call = _ir_ops.import_peer_buffer(name=name, peer_func=peer_func, span=span)
    return Scalar(DataType.INT32, call)


def _value_to_int32_expr(value: int | Scalar | Expr, arg_name: str) -> Expr:
    """Coerce an ``int | Scalar | Expr`` argument to an INT32 ``Expr``.

    Frontend callers can pass any of the three forms; the IR binding expects a
    single ``Expr`` whose ScalarType dtype is ``INT32``. The DSL parser turns
    literal Python ints into ``ConstInt`` with ``DataType.INDEX`` by default,
    so an integer constant arriving here is rewrapped as ``INT32`` to satisfy
    the IR-level contract of ``tile.comm_notify`` / ``tile.comm_wait`` /
    ``tile.comm_test``. Non-constant ``Expr`` and ``Scalar`` values are
    passed through unchanged.
    """
    if isinstance(value, Scalar):
        return value.unwrap()
    if isinstance(value, ConstInt):
        return ConstInt(int(value.value), DataType.INT32, value.span or Span.unknown())
    if isinstance(value, Expr):
        return value
    if isinstance(value, int) and not isinstance(value, bool):
        return ConstInt(value, DataType.INT32, Span.unknown())
    raise TypeError(f"Argument '{arg_name}' must be int, pl.Scalar, or pl.Expr, got {type(value).__name__}")


def comm_notify(
    signal: Tensor,
    value: int | Scalar | Expr,
    *,
    op: str,
    span: Span | None = None,
) -> Call:
    """Send a flag notification to a remote rank's signal slot.

    Lowers to ``pto::comm::TNOTIFY`` via PTOAS ``pto.comm.tnotify``. The
    signal is a 1-element INT32 ``pl.Tensor`` (GM) that views the destination
    rank's signal location in its HCCL window, typically obtained via
    :func:`import_peer_buffer`.

    Note:
        Cross-rank communication ops require pipe-level ordering between GM
        payload writes and signal writes (the cross-rank done-barrier
        pattern). Consistent with the rest of PyPTO, pipe synchronization is
        **not** inserted at the IR or codegen layer; it is the
        responsibility of the downstream PTOAS lowering. Users do not need
        to (and cannot) manually insert pipe barriers around ``comm_notify``.

    Args:
        signal: Destination signal tensor (1-element INT32) in remote rank's window.
        value: INT32 scalar value to write or atomic-add (Python int, Scalar, or Expr).
        op: Notify operation, ``"atomic_add"`` or ``"set"``.
        span: Optional source span.

    Returns:
        The IR ``Call`` for ``tile.comm_notify`` (used for its side effect; no return value).
    """
    return _ir_tile_ops.comm_notify(signal.unwrap(), _value_to_int32_expr(value, "value"), op=op, span=span)


def comm_wait(
    signal: Tensor,
    cmp_value: int | Scalar | Expr,
    *,
    cmp: str,
    span: Span | None = None,
) -> Call:
    """Block until a local INT32 signal slot satisfies a comparison.

    Lowers to ``pto::comm::TWAIT`` via PTOAS ``pto.comm.twait``. The signal
    is a 1-element INT32 ``pl.Tensor`` (GM) in the local rank's window: the
    slot that peer ranks write to with ``pl.tile.comm_notify``.

    Args:
        signal: Local signal tensor (1-element INT32) to poll.
        cmp_value: INT32 scalar comparison value (Python int, Scalar, or Expr).
        cmp: Comparison predicate, one of ``"eq"`` | ``"ne"`` | ``"gt"`` |
            ``"ge"`` | ``"lt"`` | ``"le"``.
        span: Optional source span.

    Returns:
        The IR ``Call`` for ``tile.comm_wait`` (used for its side effect; no return value).
    """
    return _ir_tile_ops.comm_wait(
        signal.unwrap(), _value_to_int32_expr(cmp_value, "cmp_value"), cmp=cmp, span=span
    )


def comm_test(
    signal: Tensor,
    cmp_value: int | Scalar | Expr,
    *,
    cmp: str,
    span: Span | None = None,
) -> Scalar:
    """Non-blocking poll of a local INT32 signal slot, returning a BOOL Scalar.

    Lowers to ``pto::comm::TTEST`` via PTOAS ``pto.comm.ttest``. Same operand
    shape as :func:`comm_wait`, but does not block; the result is
    ``pl.Scalar[pl.BOOL]`` and equals ``signal <cmp> cmp_value``.

    Args:
        signal: Local signal tensor (1-element INT32) to poll.
        cmp_value: INT32 scalar comparison value (Python int, Scalar, or Expr).
        cmp: Comparison predicate, one of ``"eq"`` | ``"ne"`` | ``"gt"`` |
            ``"ge"`` | ``"lt"`` | ``"le"``.
        span: Optional source span.

    Returns:
        ``pl.Scalar[pl.BOOL]`` wrapping the ``tile.comm_test`` IR call (PTO ``... -> i1``).
    """
    call = _ir_tile_ops.comm_test(
        signal.unwrap(), _value_to_int32_expr(cmp_value, "cmp_value"), cmp=cmp, span=span
    )
    return Scalar(DataType.BOOL, call)
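
The check order in `_value_to_int32_expr` can be exercised without a PyPTO build by using stand-in classes. The sketch below is an assumption-laden mirror of that helper: `FakeExpr`, `FakeConstInt`, `FakeScalar`, and `to_int32_expr` are hypothetical, dtypes are plain strings, and spans are omitted.

```python
from dataclasses import dataclass


class FakeExpr:
    """Hypothetical stand-in for the IR Expr base class."""


@dataclass
class FakeConstInt(FakeExpr):
    """Hypothetical stand-in for ConstInt (span omitted)."""

    value: int
    dtype: str


@dataclass
class FakeScalar:
    """Hypothetical stand-in for pl.Scalar, wrapping an expression."""

    expr: FakeExpr

    def unwrap(self) -> FakeExpr:
        return self.expr


def to_int32_expr(value, arg_name: str) -> FakeExpr:
    if isinstance(value, FakeScalar):
        return value.unwrap()                           # Scalars pass through
    if isinstance(value, FakeConstInt):
        return FakeConstInt(int(value.value), "INT32")  # constants are rewrapped
    if isinstance(value, FakeExpr):
        return value                                    # other Exprs pass through
    if isinstance(value, int) and not isinstance(value, bool):
        return FakeConstInt(value, "INT32")             # plain ints become INT32
    raise TypeError(f"Argument '{arg_name}' must be int, Scalar, or Expr, got {type(value).__name__}")


print(to_int32_expr(7, "value"))                         # FakeConstInt(value=7, dtype='INT32')
print(to_int32_expr(FakeConstInt(3, "INDEX"), "value"))  # FakeConstInt(value=3, dtype='INT32')
```

The constant check has to come before the generic expression check because the constant type is itself an expression. The explicit `bool` exclusion is needed because `bool` is a subclass of `int` in Python.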
6 changes: 6 additions & 0 deletions python/pypto/language/op/tile_ops.py
@@ -122,6 +122,9 @@
    "tpush_to_aic",
    "tpop_from_aic",
    "tpop_from_aiv",
    "comm_notify",
    "comm_wait",
    "comm_test",
    "sort32",
    "gather",
    "gather_mask",
@@ -139,6 +142,9 @@

from ..typing import IntLike, Scalar, Tensor, Tile
from .system_ops import ( # noqa: F401
    comm_notify,
    comm_test,
    comm_wait,
    tpop_from_aic,
    tpop_from_aiv,
    tpush_to_aic,