@@ -565,6 +565,43 @@ struct PTO2SchedulerState {
   // Ready queues remain global (scheduling is ring-agnostic)
   PTO2ReadyQueue ready_queues[PTO2_NUM_RESOURCE_SHAPES];
 
+  bool enqueue_ready_once(PTO2TaskSlotState &slot_state, PTO2LocalReadyBuffer *local_bufs = nullptr) {
+    PTO2TaskState expected = PTO2_TASK_PENDING;
+    if (!slot_state.task_state.compare_exchange_strong(
+            expected, PTO2_TASK_READY, std::memory_order_acq_rel, std::memory_order_acquire
+        )) {
+      return false;
+    }
+
+    PTO2ResourceShape shape = slot_state.active_mask.to_shape();
+    if (!local_bufs || !local_bufs[static_cast<int32_t>(shape)].try_push(&slot_state)) {
+      ready_queues[static_cast<int32_t>(shape)].push(&slot_state);
+    }
+    return true;
+  }
+
+#if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING
+  bool enqueue_ready_once(
+      PTO2TaskSlotState &slot_state, uint64_t &atomic_count, uint64_t &push_wait,
+      PTO2LocalReadyBuffer *local_bufs = nullptr
+  ) {
+    PTO2TaskState expected = PTO2_TASK_PENDING;
+    if (!slot_state.task_state.compare_exchange_strong(
+            expected, PTO2_TASK_READY, std::memory_order_acq_rel, std::memory_order_acquire
+        )) {
+      atomic_count += 1; // failed CAS
+      return false;
+    }
+
+    atomic_count += 1; // successful CAS
+    PTO2ResourceShape shape = slot_state.active_mask.to_shape();
+    if (!local_bufs || !local_bufs[static_cast<int32_t>(shape)].try_push(&slot_state)) {
+      ready_queues[static_cast<int32_t>(shape)].push(&slot_state, atomic_count, push_wait);
+    }
+    return true;
+  }
+#endif
+
   // Wiring subsystem — groups all wiring-related state for cache-line isolation.
   //
   // Three cache-line regions by writer:
@@ -680,11 +717,11 @@ struct PTO2SchedulerState {
     int32_t init_rc = early_finished + 1;
     int32_t new_rc = ws->fanin_refcount.fetch_add(init_rc, std::memory_order_acq_rel) + init_rc;
     if (new_rc >= ws->fanin_count) {
-      ready_queues[static_cast<int32_t>(ws->active_mask.to_shape())].push(ws);
+      enqueue_ready_once(*ws);
     }
   } else {
     ws->fanin_refcount.fetch_add(1, std::memory_order_acq_rel);
-    ready_queues[static_cast<int32_t>(ws->active_mask.to_shape())].push(ws);
+    enqueue_ready_once(*ws);
   }
 
   ws->dep_pool_mark = rss.dep_pool.top;
@@ -773,13 +810,7 @@ struct PTO2SchedulerState {
     int32_t new_refcount = slot_state.fanin_refcount.fetch_add(1, std::memory_order_acq_rel) + 1;
 
     if (new_refcount == slot_state.fanin_count) {
-      // Local-first: try per-CoreType thread-local buffer before global queue
-      // Route by active_mask: AIC-containing tasks → buf[0], AIV-only → buf[1]
-      PTO2ResourceShape shape = slot_state.active_mask.to_shape();
-      if (!local_bufs || !local_bufs[static_cast<int32_t>(shape)].try_push(&slot_state)) {
-        ready_queues[static_cast<int32_t>(shape)].push(&slot_state);
-      }
-      return true;
+      return enqueue_ready_once(slot_state, local_bufs);
    }
    return false;
  }
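The branch removed here, together with its "Local-first" comment, is the routing that enqueue_ready_once now owns: a ready task goes to a per-thread bounded buffer first and falls back to the shared queue only when that buffer is full or absent. Below is a minimal self-contained sketch of the fallback idiom; the LocalReadyBuffer type, its size, and all other names are illustrative stand-ins, not the PR's actual PTO2LocalReadyBuffer.

#include <array>
#include <cstddef>
#include <deque>

struct Task {};

// Stand-in for a bounded per-thread buffer: try_push is non-blocking and
// simply reports failure when full.
struct LocalReadyBuffer {
  std::array<Task *, 16> slots{};
  std::size_t count = 0;
  bool try_push(Task *t) {
    if (count == slots.size()) return false; // full -> caller falls back
    slots[count++] = t;
    return true;
  }
};

std::deque<Task *> global_ready_queue; // stand-in for the shared ready queue

// Local-first publish: cheap thread-local push when possible, shared queue
// only on overflow or when the caller has no local buffer.
void publish_ready(Task *t, LocalReadyBuffer *local) {
  if (!local || !local->try_push(t)) {
    global_ready_queue.push_back(t);
  }
}

int main() {
  LocalReadyBuffer buf;
  Task tasks[20];
  for (Task &t : tasks) publish_ready(&t, &buf);
  // 16 tasks land in the local buffer; the remaining 4 spill to the global queue.
  return global_ready_queue.size() == 4 ? 0 : 1;
}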
@@ -793,18 +824,7 @@ struct PTO2SchedulerState {
     atomic_count += 1; // fanin_refcount.fetch_add
 
     if (new_refcount == slot_state.fanin_count) {
-      PTO2TaskState expected = PTO2_TASK_PENDING;
-      if (slot_state.task_state.compare_exchange_strong(
-              expected, PTO2_TASK_READY, std::memory_order_acq_rel, std::memory_order_acquire
-          )) {
-        atomic_count += 1; // CAS(task_state PENDING→READY)
-        // Local-first: try per-CoreType thread-local buffer before global queue
-        PTO2ResourceShape shape = slot_state.active_mask.to_shape();
-        if (!local_bufs || !local_bufs[static_cast<int32_t>(shape)].try_push(&slot_state)) {
-          ready_queues[static_cast<int32_t>(shape)].push(&slot_state, atomic_count, push_wait);
-        }
-        return true;
-      }
+      return enqueue_ready_once(slot_state, atomic_count, push_wait, local_bufs);
     }
     return false;
   }
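Taken together, these hunks funnel every ready notification through a single PENDING -> READY compare-exchange, which is what makes the enqueue idempotent when several releasers race. A self-contained sketch of that exactly-once gate, using simplified illustrative names (TaskState, Slot, enqueue_count stand in for the PR's types and queue push):

#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

enum class TaskState { Pending, Ready, Running };

struct Slot {
  std::atomic<TaskState> state{TaskState::Pending};
};

std::atomic<int> enqueue_count{0};

// Returns true for exactly one caller, no matter how many threads race here.
bool enqueue_ready_once(Slot &slot) {
  TaskState expected = TaskState::Pending;
  if (!slot.state.compare_exchange_strong(
          expected, TaskState::Ready,
          std::memory_order_acq_rel, std::memory_order_acquire)) {
    return false; // another thread already published READY
  }
  enqueue_count.fetch_add(1, std::memory_order_relaxed); // stand-in for the queue push
  return true;
}

int main() {
  Slot slot;
  std::vector<std::thread> threads;
  for (int i = 0; i < 8; i++) threads.emplace_back([&] { enqueue_ready_once(slot); });
  for (auto &t : threads) t.join();
  assert(enqueue_count.load() == 1); // published exactly once
  return 0;
}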
@@ -354,6 +354,12 @@ void SchedulerContext::drain_worker_dispatch(int32_t block_num) {
     return;
   }
   PTO2ResourceShape shape = slot_state->active_mask.to_shape();
+  if (slot_state->next_block_idx == 0) {
+    PTO2TaskState expected = PTO2_TASK_READY;
+    slot_state->task_state.compare_exchange_strong(
+        expected, PTO2_TASK_RUNNING, std::memory_order_acq_rel, std::memory_order_acquire
+    );
+  }
 
   for (int32_t t = 0; t < active_sched_threads_ && slot_state->next_block_idx < block_num; t++) {
     auto valid = core_trackers_[t].get_idle_core_offset_states(shape);
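The drain path advances READY -> RUNNING only for a task's first block and deliberately discards the CAS result: losing the race just means another dispatcher already moved the state along. The sketch below shows why ignoring the result is safe; two threads race the same transition and the slot still ends up RUNNING (all names here are illustrative):

#include <atomic>
#include <cassert>
#include <thread>

enum class TaskState { Pending, Ready, Running };

struct Slot {
  std::atomic<TaskState> state{TaskState::Ready};
  int next_block_idx = 0;
};

// Best-effort transition: at most one caller's CAS succeeds; the losers see
// expected overwritten with Running and simply move on.
void mark_running_on_first_dispatch(Slot &slot) {
  if (slot.next_block_idx != 0) return; // later blocks never touch the state
  TaskState expected = TaskState::Ready;
  slot.state.compare_exchange_strong(
      expected, TaskState::Running,
      std::memory_order_acq_rel, std::memory_order_acquire);
}

int main() {
  Slot slot;
  std::thread a([&] { mark_running_on_first_dispatch(slot); });
  std::thread b([&] { mark_running_on_first_dispatch(slot); });
  a.join();
  b.join();
  assert(slot.state.load() == TaskState::Running); // exactly one CAS won; no harm done
  return 0;
}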
@@ -223,6 +223,16 @@ void SchedulerContext::dispatch_block(
 #endif
 }
 
+namespace {
+inline void mark_task_running_on_first_dispatch(PTO2TaskSlotState &slot_state) {
+  if (slot_state.next_block_idx != 0) return;
+  PTO2TaskState expected = PTO2_TASK_READY;
+  slot_state.task_state.compare_exchange_strong(
+      expected, PTO2_TASK_RUNNING, std::memory_order_acq_rel, std::memory_order_acquire
+  );
+}
+} // namespace
+
 void SchedulerContext::dispatch_shape(
     int32_t thread_idx, PTO2ResourceShape shape, CoreTracker::DispatchPhase phase, PTO2LocalReadyBuffer &local_buf,
     CoreTracker &tracker, bool &entered_drain, bool &made_progress, bool &try_pushed
@@ -274,6 +284,7 @@ void SchedulerContext::dispatch_shape(
 #if PTO2_SCHED_PROFILING
     uint64_t t_setup_start = get_sys_cnt_aicpu();
 #endif
+    mark_task_running_on_first_dispatch(*slot_state);
     do {
       auto core_offset = cores.pop_first();
       dispatch_block(thread_idx, core_offset, *slot_state, shape, is_pending);
@@ -550,6 +561,12 @@ int32_t SchedulerContext::resolve_and_dispatch(Runtime *runtime, int32_t thread_
     if (made_progress) {
       idle_iterations = 0;
     } else {
+      int32_t global_completed = completed_tasks_.load(std::memory_order_relaxed);
+      if (global_completed != last_progress_count) {
+        last_progress_count = global_completed;
+        idle_iterations = 0;
+        continue;
+      }
       while (deferred_release_count > 0) {
 #if PTO2_SCHED_PROFILING
         int32_t fe =
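The new else branch snapshots completed_tasks_ with a relaxed load and compares it to the last value this thread observed; any movement means another thread finished work, so the idle backoff resets instead of escalating to the deferred-release path. A sketch of the snapshot-and-compare idiom, with hypothetical names (completed_tasks, global_progress_since):

#include <atomic>
#include <cstdint>

std::atomic<int32_t> completed_tasks{0}; // bumped once per finished task, by any thread

// Returns true (and refreshes the caller's snapshot) when some other thread
// has completed tasks since the caller last looked. A relaxed load matches the
// PR's usage: the counter only steers a backoff heuristic, it publishes no data.
bool global_progress_since(int32_t &last_progress_count) {
  int32_t now = completed_tasks.load(std::memory_order_relaxed);
  if (now == last_progress_count) return false;
  last_progress_count = now; // remember the new high-water mark
  return true;
}

int main() {
  int32_t snapshot = 0;
  completed_tasks.fetch_add(3, std::memory_order_relaxed); // simulate other threads
  bool progressed = global_progress_since(snapshot);       // true, snapshot -> 3
  bool again = global_progress_since(snapshot);            // false, nothing new
  return (progressed && !again) ? 0 : 1;
}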
@@ -565,6 +565,43 @@ struct PTO2SchedulerState {
   // Ready queues remain global (scheduling is ring-agnostic)
   PTO2ReadyQueue ready_queues[PTO2_NUM_RESOURCE_SHAPES];
 
+  bool enqueue_ready_once(PTO2TaskSlotState &slot_state, PTO2LocalReadyBuffer *local_bufs = nullptr) {
+    PTO2TaskState expected = PTO2_TASK_PENDING;
+    if (!slot_state.task_state.compare_exchange_strong(
+            expected, PTO2_TASK_READY, std::memory_order_acq_rel, std::memory_order_acquire
+        )) {
+      return false;
+    }
+
+    PTO2ResourceShape shape = slot_state.active_mask.to_shape();
+    if (!local_bufs || !local_bufs[static_cast<int32_t>(shape)].try_push(&slot_state)) {
+      ready_queues[static_cast<int32_t>(shape)].push(&slot_state);
+    }
+    return true;
+  }
+
+#if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING
+  bool enqueue_ready_once(
+      PTO2TaskSlotState &slot_state, uint64_t &atomic_count, uint64_t &push_wait,
+      PTO2LocalReadyBuffer *local_bufs = nullptr
+  ) {
+    PTO2TaskState expected = PTO2_TASK_PENDING;
+    if (!slot_state.task_state.compare_exchange_strong(
+            expected, PTO2_TASK_READY, std::memory_order_acq_rel, std::memory_order_acquire
+        )) {
+      atomic_count += 1; // failed CAS
+      return false;
+    }
+
+    atomic_count += 1; // successful CAS
+    PTO2ResourceShape shape = slot_state.active_mask.to_shape();
+    if (!local_bufs || !local_bufs[static_cast<int32_t>(shape)].try_push(&slot_state)) {
+      ready_queues[static_cast<int32_t>(shape)].push(&slot_state, atomic_count, push_wait);
+    }
+    return true;
+  }
+#endif
+
   // Wiring subsystem — groups all wiring-related state for cache-line isolation.
   //
   // Three cache-line regions by writer:
@@ -680,11 +717,11 @@ struct PTO2SchedulerState {
     int32_t init_rc = early_finished + 1;
     int32_t new_rc = ws->fanin_refcount.fetch_add(init_rc, std::memory_order_acq_rel) + init_rc;
     if (new_rc >= ws->fanin_count) {
-      ready_queues[static_cast<int32_t>(ws->active_mask.to_shape())].push(ws);
+      enqueue_ready_once(*ws);
     }
   } else {
     ws->fanin_refcount.fetch_add(1, std::memory_order_acq_rel);
-    ready_queues[static_cast<int32_t>(ws->active_mask.to_shape())].push(ws);
+    enqueue_ready_once(*ws);
   }
 
   ws->dep_pool_mark = rss.dep_pool.top;
@@ -773,13 +810,7 @@ struct PTO2SchedulerState {
     int32_t new_refcount = slot_state.fanin_refcount.fetch_add(1, std::memory_order_acq_rel) + 1;
 
     if (new_refcount == slot_state.fanin_count) {
-      // Local-first: try per-CoreType thread-local buffer before global queue
-      // Route by active_mask: AIC-containing tasks → buf[0], AIV-only → buf[1]
-      PTO2ResourceShape shape = slot_state.active_mask.to_shape();
-      if (!local_bufs || !local_bufs[static_cast<int32_t>(shape)].try_push(&slot_state)) {
-        ready_queues[static_cast<int32_t>(shape)].push(&slot_state);
-      }
-      return true;
+      return enqueue_ready_once(slot_state, local_bufs);
    }
    return false;
  }
@@ -793,18 +824,7 @@ struct PTO2SchedulerState {
     atomic_count += 1; // fanin_refcount.fetch_add
 
     if (new_refcount == slot_state.fanin_count) {
-      PTO2TaskState expected = PTO2_TASK_PENDING;
-      if (slot_state.task_state.compare_exchange_strong(
-              expected, PTO2_TASK_READY, std::memory_order_acq_rel, std::memory_order_acquire
-          )) {
-        atomic_count += 1; // CAS(task_state PENDING→READY)
-        // Local-first: try per-CoreType thread-local buffer before global queue
-        PTO2ResourceShape shape = slot_state.active_mask.to_shape();
-        if (!local_bufs || !local_bufs[static_cast<int32_t>(shape)].try_push(&slot_state)) {
-          ready_queues[static_cast<int32_t>(shape)].push(&slot_state, atomic_count, push_wait);
-        }
-        return true;
-      }
+      return enqueue_ready_once(slot_state, atomic_count, push_wait, local_bufs);
     }
     return false;
   }
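Both copies of the header keep the instrumented variant as a separate overload behind #if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING, so the non-profiling hot path never pays for counter bookkeeping. A toy sketch of that guarded-overload pattern follows; the PROFILING macro and the try_step functions are hypothetical, not part of the PR:

#include <cstdint>
#include <cstdio>

#define PROFILING 1 // flip to 0 and the counting overload is not even compiled

// Hot-path version: no instrumentation to maintain or pay for.
inline bool try_step(int &x) { return ++x < 4; }

#if PROFILING
// Instrumented overload: identical logic, with counters threaded by reference
// so the caller accumulates them across calls.
inline bool try_step(int &x, uint64_t &op_count) {
  op_count += 1; // one step counted
  return ++x < 4;
}
#endif

int main() {
  int x = 0;
#if PROFILING
  uint64_t ops = 0;
  while (try_step(x, ops)) {
  }
  std::printf("steps counted: %llu\n", static_cast<unsigned long long>(ops));
#else
  while (try_step(x)) {
  }
#endif
  return 0;
}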
@@ -363,6 +363,12 @@ void SchedulerContext::drain_worker_dispatch(Runtime *runtime, int32_t block_num
     return;
   }
   PTO2ResourceShape shape = slot_state->active_mask.to_shape();
+  if (slot_state->next_block_idx == 0) {
+    PTO2TaskState expected = PTO2_TASK_READY;
+    slot_state->task_state.compare_exchange_strong(
+        expected, PTO2_TASK_RUNNING, std::memory_order_acq_rel, std::memory_order_acquire
+    );
+  }
 
   for (int32_t t = 0; t < active_sched_threads_ && slot_state->next_block_idx < block_num; t++) {
     auto valid = core_trackers_[t].get_idle_core_offset_states(shape);
@@ -240,6 +240,16 @@ void SchedulerContext::dispatch_block(
 #endif
 }
 
+namespace {
+inline void mark_task_running_on_first_dispatch(PTO2TaskSlotState &slot_state) {
+  if (slot_state.next_block_idx != 0) return;
+  PTO2TaskState expected = PTO2_TASK_READY;
+  slot_state.task_state.compare_exchange_strong(
+      expected, PTO2_TASK_RUNNING, std::memory_order_acq_rel, std::memory_order_acquire
+  );
+}
+} // namespace
+
 void SchedulerContext::dispatch_shape(
     Runtime *runtime, int32_t thread_idx, PTO2ResourceShape shape, CoreTracker::DispatchPhase phase,
     PTO2LocalReadyBuffer &local_buf, CoreTracker &tracker, bool &entered_drain, bool &made_progress, bool &try_pushed
@@ -291,6 +301,7 @@ void SchedulerContext::dispatch_shape(
 #if PTO2_SCHED_PROFILING
     uint64_t t_setup_start = get_sys_cnt_aicpu();
 #endif
+    mark_task_running_on_first_dispatch(*slot_state);
     do {
       auto core_offset = cores.pop_first();
       dispatch_block(runtime, thread_idx, core_offset, *slot_state, shape, is_pending);
@@ -565,6 +576,12 @@ int32_t SchedulerContext::resolve_and_dispatch(Runtime *runtime, int32_t thread_
     if (made_progress) {
       idle_iterations = 0;
     } else {
+      int32_t global_completed = completed_tasks_.load(std::memory_order_relaxed);
+      if (global_completed != last_progress_count) {
+        last_progress_count = global_completed;
+        idle_iterations = 0;
+        continue;
+      }
       while (deferred_release_count > 0) {
 #if PTO2_SCHED_PROFILING
         int32_t fe =
17 changes: 7 additions & 10 deletions tests/ut/cpp/a2a3/test_task_state.cpp
@@ -19,7 +19,7 @@
  *
  * This file focuses on:
  * - Full lifecycle through src API
- * - Non-profiling ready path behavior (task_state stays PENDING)
+ * - Non-profiling ready path behavior (task_state transitions to READY)
  * - Double subtask completion (counter-model weakness)
  */
 
@@ -93,23 +93,20 @@ TEST_F(TaskStateTest, FullLifecycleThroughAPI) {
 }
 
 // =============================================================================
-// Non-profiling release_fanin does not CAS task_state to READY.
+// Non-profiling release_fanin transitions task_state to READY.
 //
-// Readiness is determined solely by fanin_refcount reaching fanin_count.
-// task_state stays PENDING after the non-profiling ready path. This is
-// correct by design -- the profiling overload adds the CAS only to count
-// atomic operations.
+// Readiness is detected by fanin_refcount reaching fanin_count, then recorded
+// with a PENDING -> READY CAS so the ready notification is exactly once and the
+// task state matches the scheduler state machine in both profiling modes.
 // =============================================================================
-TEST_F(TaskStateTest, NonProfilingReadyPathStaysPending) {
+TEST_F(TaskStateTest, NonProfilingReadyPathMarksReady) {
   alignas(64) PTO2TaskSlotState slot;
   init_slot(slot, PTO2_TASK_PENDING, 1, 1);
 
   bool ready = sched.release_fanin_and_check_ready(slot);
   ASSERT_TRUE(ready) << "Task should be detected as ready via refcount";
 
-  // task_state remains PENDING -- this is correct by design.
-  EXPECT_EQ(slot.task_state.load(), PTO2_TASK_PENDING)
-      << "Non-profiling path intentionally does not transition task_state to READY";
+  EXPECT_EQ(slot.task_state.load(), PTO2_TASK_READY) << "Ready path must publish READY state";
 }
 
 // =============================================================================
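Since the updated test pins the READY transition, one more property of enqueue_ready_once could be pinned as well: over-releasing a slot must not report ready twice, because the refcount equality check and the single-shot CAS both guard against it. A sketch of such a test, reusing the fixture helpers shown above (init_slot, sched); it is illustrative, not part of the PR:

// Sketch of a follow-up test (not in this PR): the refcount equality check and
// the single-shot PENDING -> READY CAS make the ready notification one-shot,
// so an extra release must not report ready again.
TEST_F(TaskStateTest, DoubleReleaseReportsReadyOnce) {
  alignas(64) PTO2TaskSlotState slot;
  init_slot(slot, PTO2_TASK_PENDING, 1, 1);

  EXPECT_TRUE(sched.release_fanin_and_check_ready(slot));   // first release: ready
  EXPECT_FALSE(sched.release_fanin_and_check_ready(slot));  // over-release: refcount overshoots fanin_count
  EXPECT_EQ(slot.task_state.load(), PTO2_TASK_READY);       // state stays READY
}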