Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 110 additions & 135 deletions python/simpler/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,54 +274,36 @@ def _ensure_prepared(cw, registry, prepared, cid: int, *, lazy: bool, device_id:
prepared.add(cid)


def _chip_process_loop(
def _run_chip_main_loop(
cw: ChipWorker,
buf: memoryview,
bins,
mailbox_addr: int,
state_addr: int,
device_id: int,
registry: dict,
log_level: int = 1,
log_info_v: int = 5,
*,
on_task_done_success=None,
) -> None:
"""Runs in forked child process. Loads host_runtime.so in own address space.

Reads the unified mailbox layout (same offsets as _sub_worker_loop, but
this loop also consumes config fields + args_blob).

`log_level` / `log_info_v` are the parent's snapshot of the simpler logger
(computed via `_log.get_current_config()`); the child cannot read the
parent's logger after fork, so the values are passed explicitly.

Per-callable_id dispatch: TASK_READY carries a cid in OFF_CALLABLE; the
child looks the cid up in the COW-inherited Python ``registry`` to get
the ChipCallable, calls ``cw.prepare_callable(cid, callable)`` once,
then ``cw.run_prepared(cid, args, cfg)``. ``_CTRL_PREPARE`` is the
explicit pre-warm path (parent pushes after init() to amortise the
first H2D upload).
"""Unified TASK_READY / CONTROL_REQUEST / SHUTDOWN state machine.

Used by both ``_chip_process_loop`` (no extra side effects on task
success) and ``_chip_process_loop_with_bootstrap`` (flushes
``store_to_host`` buffers before publishing TASK_DONE).

`on_task_done_success`, if provided, is invoked after a successful
``run_prepared_from_blob`` and before publishing TASK_DONE. It must
return ``(code, msg)`` — typically ``(0, "")`` on success, or an
error tuple if the hook itself failed (e.g. D2H staging error).
Returning a non-zero code overrides the kernel's success.

Per-callable_id dispatch: TASK_READY carries a cid in OFF_CALLABLE;
the child looks the cid up in the COW-inherited Python ``registry``
to get the ChipCallable, calls ``cw.prepare_callable(cid, callable)``
once, then ``cw.run_prepared(cid, args, cfg)``. ``_CTRL_PREPARE`` is
the explicit pre-warm path (parent pushes after init() to amortise
the first H2D upload); TASK_READY also lazy-prepares as a safety net.
"""
import traceback as _tb # noqa: PLC0415

try:
cw = ChipWorker()
cw.init(device_id, bins, log_level=log_level, log_info_v=log_info_v)
except Exception as e:
_tb.print_exc()
# Write the message so any parent reader that *does* inspect this
# path sees the real cause. State handshake for this init-time
# failure is broken — see KNOWN_ISSUES.md — and that is not part
# of the L4 scope.
_write_error(buf, 1, _format_exc(f"chip_process dev={device_id} init", e))
return

mailbox_addr = ctypes.addressof(ctypes.c_char.from_buffer(buf))
state_addr = mailbox_addr + _OFF_STATE
sys.stderr.write(f"[chip_process pid={os.getpid()} dev={device_id}] ready\n")
sys.stderr.flush()

# Per-child set of cids already prepared on this device. The parent
# pre-warms via _CTRL_PREPARE, but TASK_READY also lazy-prepares as a
# safety net (e.g. registrations that bypassed the prefetch path).
prepared: set[int] = set()

while True:
state = _mailbox_load_i32(state_addr)
if state == _TASK_READY:
Expand All @@ -341,6 +323,15 @@ def _chip_process_loop(
except Exception as e: # noqa: BLE001
code = 1
msg = _format_exc(f"chip_process dev={device_id}", e)

# On a successful kernel run, give the caller a chance to do
# post-run work (e.g. store_to_host D2H staging) before the
# parent sees TASK_DONE. The kernel's failure path skips the
# hook because the device output region is undefined and
# staging garbage would mask the real error in post-mortems.
if code == 0 and on_task_done_success is not None:
code, msg = on_task_done_success()

_write_error(buf, code, msg)
_mailbox_store_i32(state_addr, _TASK_DONE)
elif state == _CONTROL_REQUEST:
Expand Down Expand Up @@ -374,11 +365,52 @@ def _chip_process_loop(
_write_error(buf, code, msg)
_mailbox_store_i32(state_addr, _CONTROL_DONE)
elif state == _SHUTDOWN:
cw.finalize()
break


def _chip_process_loop_with_bootstrap( # noqa: PLR0912, PLR0915
def _chip_process_loop(
buf: memoryview,
bins,
device_id: int,
registry: dict,
log_level: int = 1,
log_info_v: int = 5,
) -> None:
"""Runs in forked child process. Loads host_runtime.so in own address space.

`log_level` / `log_info_v` are the parent's snapshot of the simpler logger
(computed via `_log.get_current_config()`); the child cannot read the
parent's logger after fork, so the values are passed explicitly.

The main loop is delegated to ``_run_chip_main_loop`` — see its docstring
for the TASK_READY / CONTROL_REQUEST / SHUTDOWN state machine.
"""
import traceback as _tb # noqa: PLC0415

try:
cw = ChipWorker()
cw.init(device_id, bins, log_level=log_level, log_info_v=log_info_v)
except Exception as e:
_tb.print_exc()
# Write the message so any parent reader that *does* inspect this
# path sees the real cause. State handshake for this init-time
# failure is broken — see KNOWN_ISSUES.md — and that is not part
# of the L4 scope.
_write_error(buf, 1, _format_exc(f"chip_process dev={device_id} init", e))
return

mailbox_addr = ctypes.addressof(ctypes.c_char.from_buffer(buf))
state_addr = mailbox_addr + _OFF_STATE
sys.stderr.write(f"[chip_process pid={os.getpid()} dev={device_id}] ready\n")
sys.stderr.flush()

try:
_run_chip_main_loop(cw, buf, mailbox_addr, state_addr, device_id, registry)
finally:
cw.finalize()


def _chip_process_loop_with_bootstrap(
buf: memoryview,
bins,
device_id: int,
Expand All @@ -393,12 +425,12 @@ def _chip_process_loop_with_bootstrap( # noqa: PLR0912, PLR0915

The child constructs its own ``ChipBootstrapChannel`` wrapping the
pre-fork shared-memory region, calls ``bootstrap_context`` (which
publishes SUCCESS/ERROR on the channel), and on success enters the same
task / control polling loop as ``_chip_process_loop``. On any failure
before the main loop starts, the channel has already been written by the
callee and the function returns — the ``os._exit(0)`` in the fork
branch reaps the process without an extra non-zero exit code that would
confuse the parent's ``waitpid`` teardown.
publishes SUCCESS/ERROR on the channel), and on success enters the
shared task / control polling loop. On any failure before the main
loop starts, the channel has already been written by the callee and
the function returns — the ``os._exit(0)`` in the fork branch reaps
the process without an extra non-zero exit code that would confuse
the parent's ``waitpid`` teardown.
"""
channel = ChipBootstrapChannel(bootstrap_mailbox_addr, max_buffer_count)

Expand Down Expand Up @@ -427,101 +459,44 @@ def _chip_process_loop_with_bootstrap( # noqa: PLR0912, PLR0915
# buffer with store_to_host=True. Processed after every task completion
# so the parent can read results from SharedMemory without a cross-fork
# host-pointer copy_from (which is broken across processes).
_store_to_host: list[tuple[int, object]] = []
store_to_host: list[tuple[int, object]] = []
for spec, ptr in zip(bootstrap_cfg.buffers, result.buffer_ptrs):
if spec.store_to_host:
_store_to_host.append((ptr, bootstrap_cfg.output_staging(spec.name)))
store_to_host.append((ptr, bootstrap_cfg.output_staging(spec.name)))

mailbox_addr = ctypes.addressof(ctypes.c_char.from_buffer(buf))
state_addr = mailbox_addr + _OFF_STATE
sys.stderr.write(f"[chip_process pid={os.getpid()} dev={device_id} bootstrap] ready\n")
sys.stderr.flush()

# Per-child set of cids already prepared on this device. Mirrors
# `_chip_process_loop`'s `prepared`.
prepared: set[int] = set()
def flush_store_to_host() -> tuple[int, str]:
# Runs *before* publishing TASK_DONE so the parent cannot observe
# the mailbox transition (and start reading the output
# SharedMemory) while the D2H DMA is still in flight.
for dev_ptr, staging in store_to_host:
# Skip zero-byte stagings up-front — mirrors the
# load_from_host H2D path in task_interface.py and avoids a
# spurious ValueError from ``ctypes.c_char.from_buffer`` on
# an empty buffer.
if staging.size == 0:
continue
try:
shm = SharedMemory(name=staging.shm_name)
try:
shm_buf = shm.buf
assert shm_buf is not None
host_ptr = ctypes.addressof(ctypes.c_char.from_buffer(shm_buf))
cw.copy_from(host_ptr, dev_ptr, staging.size)
finally:
shm.close()
except Exception as e: # noqa: BLE001
return 1, _format_exc(f"chip_process dev={device_id} store_to_host={staging.name!r}", e)
return 0, ""

try:
while True:
state = _mailbox_load_i32(state_addr)
if state == _TASK_READY:
cid = int(struct.unpack_from("Q", buf, _OFF_CALLABLE)[0]) & 0xFFFFFFFF
cfg = _read_config_from_mailbox(buf)

code = 0
msg = ""
try:
_ensure_prepared(cw, registry, prepared, cid, lazy=True, device_id=device_id)
# Hand the mailbox bytes straight to C++ (zero-copy zero-decode);
# see the matching comment in `_chip_process_loop`.
cw._impl.run_prepared_from_blob(cid, mailbox_addr + _OFF_ARGS, _MAILBOX_ARGS_CAPACITY, cfg)
except Exception as e: # noqa: BLE001
code = 1
msg = _format_exc(f"chip_process dev={device_id}", e)

# Flush store_to_host buffers *before* publishing TASK_DONE so
# the parent cannot observe the mailbox transition (and start
# reading the output SharedMemory) while the D2H DMA is still
# in flight. Only flush on a successful kernel run: on
# failure the device output region is undefined and stamping
# garbage into the parent's SharedMemory would mask the real
# error in any post-mortem.
if code == 0:
for dev_ptr, staging in _store_to_host:
# Skip zero-byte stagings up-front — mirrors the
# load_from_host H2D path in task_interface.py and
# avoids a spurious ValueError from
# ``ctypes.c_char.from_buffer`` on an empty buffer.
if staging.size == 0:
continue
try:
shm = SharedMemory(name=staging.shm_name)
try:
shm_buf = shm.buf
assert shm_buf is not None
host_ptr = ctypes.addressof(ctypes.c_char.from_buffer(shm_buf))
cw.copy_from(host_ptr, dev_ptr, staging.size)
finally:
shm.close()
except Exception as e: # noqa: BLE001
code = 1
msg = _format_exc(f"chip_process dev={device_id} store_to_host={staging.name!r}", e)
break

_write_error(buf, code, msg)
_mailbox_store_i32(state_addr, _TASK_DONE)
elif state == _CONTROL_REQUEST:
sub_cmd = struct.unpack_from("Q", buf, _OFF_CALLABLE)[0]
code = 0
msg = ""
try:
if sub_cmd == _CTRL_MALLOC:
size = struct.unpack_from("Q", buf, _CTRL_OFF_ARG0)[0]
ptr = cw._impl.malloc(size)
struct.pack_into("Q", buf, _CTRL_OFF_RESULT, ptr)
elif sub_cmd == _CTRL_FREE:
ptr = struct.unpack_from("Q", buf, _CTRL_OFF_ARG0)[0]
cw._impl.free(ptr)
elif sub_cmd == _CTRL_COPY_TO:
dst = struct.unpack_from("Q", buf, _CTRL_OFF_ARG0)[0]
src = struct.unpack_from("Q", buf, _CTRL_OFF_ARG1)[0]
n = struct.unpack_from("Q", buf, _CTRL_OFF_ARG2)[0]
cw._impl.copy_to(dst, src, n)
elif sub_cmd == _CTRL_COPY_FROM:
dst = struct.unpack_from("Q", buf, _CTRL_OFF_ARG0)[0]
src = struct.unpack_from("Q", buf, _CTRL_OFF_ARG1)[0]
n = struct.unpack_from("Q", buf, _CTRL_OFF_ARG2)[0]
cw._impl.copy_from(dst, src, n)
elif sub_cmd == _CTRL_PREPARE:
cid = int(struct.unpack_from("Q", buf, _CTRL_OFF_ARG0)[0]) & 0xFFFFFFFF
_ensure_prepared(cw, registry, prepared, cid, lazy=False, device_id=device_id)
except Exception as e: # noqa: BLE001
code = 1
msg = _format_exc(f"chip_process dev={device_id} ctrl={int(sub_cmd)}", e)
_write_error(buf, code, msg)
_mailbox_store_i32(state_addr, _CONTROL_DONE)
elif state == _SHUTDOWN:
break
_run_chip_main_loop(
cw, buf, mailbox_addr, state_addr, device_id, registry, on_task_done_success=flush_store_to_host
)
finally:
# Teardown contract: release the comm handle before finalize so HCCL
# state is torn down in LIFO order; the channel shm the parent may
Expand Down
Loading
Loading