diff --git a/src/gpu/aggregate.cu b/src/gpu/aggregate.cu index eeac8f0..0236cd8 100644 --- a/src/gpu/aggregate.cu +++ b/src/gpu/aggregate.cu @@ -195,12 +195,10 @@ gather_batch_k(const void* __restrict__ d_compressed, extern "C" int aggregate_batch_slot_init(struct aggregate_slot* slot, - uint64_t batch_chunk_count, uint64_t batch_covering_count, size_t comp_pool_bytes) { uint64_t C = batch_covering_count; - (void)batch_chunk_count; CHECK(Error, slot); memset(slot, 0, sizeof(*slot)); diff --git a/src/gpu/aggregate.h b/src/gpu/aggregate.h index 91c970d..874fd2c 100644 --- a/src/gpu/aggregate.h +++ b/src/gpu/aggregate.h @@ -39,7 +39,6 @@ extern "C" void aggregate_slot_destroy(struct aggregate_slot* slot); int aggregate_batch_slot_init(struct aggregate_slot* slot, - uint64_t batch_chunk_count, uint64_t batch_covering_count, size_t comp_pool_bytes); diff --git a/src/gpu/flush.compress_agg.c b/src/gpu/flush.compress_agg.c index 32ce1aa..f6de404 100644 --- a/src/gpu/flush.compress_agg.c +++ b/src/gpu/flush.compress_agg.c @@ -40,7 +40,6 @@ kick_compress(struct compress_agg_stage* stage, // --- Init / Destroy --- - int compress_agg_init(struct compress_agg_stage* stage, const struct computed_stream_layouts* cl, @@ -78,9 +77,9 @@ compress_agg_init(struct compress_agg_stage* stage, const int need_compressed = (stage->codec.type != CODEC_NONE); for (int fc = 0; fc < 2; ++fc) { if (need_compressed) - CU(Fail, - cuMemAlloc(&stage->d_compressed[fc], - M * stage->codec.max_output_size)); + CU( + Fail, + cuMemAlloc(&stage->d_compressed[fc], M * stage->codec.max_output_size)); CU(Fail, cuEventCreate(&stage->t_compress_start[fc], CU_EVENT_DEFAULT)); CU(Fail, cuEventCreate(&stage->t_compress_end[fc], CU_EVENT_DEFAULT)); CU(Fail, cuEventCreate(&stage->t_aggregate_end[fc], CU_EVENT_DEFAULT)); @@ -95,11 +94,11 @@ compress_agg_init(struct compress_agg_stage* stage, // them per-array. Each layout's GPU-side d_lifted_shape/strides are uploaded. for (int lv = 0; lv < cl->levels.nlod; ++lv) { stage->per_lod_agg_layouts[lv] = cl->per_level[lv].agg_layout; - CHECK(Fail, - aggregate_layout_upload(&stage->per_lod_agg_layouts[lv]) == 0); + CHECK(Fail, aggregate_layout_upload(&stage->per_lod_agg_layouts[lv]) == 0); } - // Cached max batch layout assuming each LOD fires its worst-case active count. + // Cached max batch layout assuming each LOD fires its worst-case active + // count. { uint32_t per_lod_max[LOD_MAX_LEVELS] = { 0 }; for (int lv = 0; lv < cl->levels.nlod; ++lv) @@ -124,17 +123,11 @@ compress_agg_init(struct compress_agg_stage* stage, // aggregate_batch_luts_unified). The CUB exclusive scan fills all // sentinel positions, so no per-LOD write_total fixup is needed. if (stage->max_total_batch_chunks > 0) { - // aggregate_batch_slot_init allocates (C+1) entries for d_offsets / - // d_permuted_sizes / h_offsets and C entries for h_permuted_sizes. We - // want all to fit total_batch_covering + nlod entries, so pass - // C = max_total_batch_covering + nlod (h_permuted_sizes ends up exactly - // matching the scan output length). const uint64_t C_max = stage->max_total_batch_covering + (uint64_t)stage->nlod; for (int fc = 0; fc < 2; ++fc) { CHECK(Fail, aggregate_batch_slot_init(&stage->agg[fc], - stage->max_total_batch_chunks, C_max, stage->max_total_data_bytes) == 0); CU(Fail, cuEventRecord(stage->agg[fc].ready, compute)); @@ -225,15 +218,13 @@ compress_agg_init(struct compress_agg_stage* stage, // Tail-carry buffer: total_shards * page_size bytes; uniform layout // across LODs (sink page size is uniform). if (stage->shards.page_size > 0) { - stage->shards.tail_carry_bytes = - total_shards * stage->shards.page_size; + stage->shards.tail_carry_bytes = total_shards * stage->shards.page_size; CU(Fail, cuMemAlloc(&stage->shards.d_tail_carry, stage->shards.tail_carry_bytes)); CU(Fail, - cuMemsetD8(stage->shards.d_tail_carry, - 0, - stage->shards.tail_carry_bytes)); + cuMemsetD8( + stage->shards.d_tail_carry, 0, stage->shards.tail_carry_bytes)); } } } @@ -241,8 +232,7 @@ compress_agg_init(struct compress_agg_stage* stage, // Per-LOD shard_state (writers + tail/footer pools + generation // bookkeeping). The unified kick/D2H path iterates this directly. for (int lv = 0; lv < cl->levels.nlod; ++lv) - CHECK(Fail, - init_shard_state(&stage->shard[lv], &cl->per_level[lv]) == 0); + CHECK(Fail, init_shard_state(&stage->shard[lv], &cl->per_level[lv]) == 0); // Seed events for (int fc = 0; fc < 2; ++fc) { @@ -368,13 +358,9 @@ int compress_agg_kick(struct compress_agg_stage* stage, const struct compress_agg_input* in, const struct level_geometry* levels, - const struct batch_state* batch, - const struct dim_info* dims, CUstream compress_stream, struct flush_handoff* out) { - (void)batch; - (void)dims; const int fc = in->fc; const uint32_t n_epochs = in->n_epochs; const uint8_t nlod = stage->nlod; @@ -400,28 +386,27 @@ compress_agg_kick(struct compress_agg_stage* stage, // Page size is uniform across LODs (sink-driven); read from LOD 0. struct batch_aggregate_layout layout; const size_t page_size = stage->per_lod_agg_layouts[0].page_size; - CHECK(Error, - batch_aggregate_layout_init(&layout, - stage->per_lod_agg_layouts, - per_lod_n_active, - nlod, - page_size) == 0); + CHECK( + Error, + batch_aggregate_layout_init( + &layout, stage->per_lod_agg_layouts, per_lod_n_active, nlod, page_size) == + 0); CHECK(Error, layout.total_data_bytes <= stage->max_total_data_bytes); CHECK(Error, layout.total_batch_chunks <= stage->max_total_batch_chunks); - CHECK(Error, - layout.total_batch_covering <= stage->max_total_batch_covering); + CHECK(Error, layout.total_batch_covering <= stage->max_total_batch_covering); // --- Phase 3: build & upload unified LUTs (cached in steady state) ------- // Cache key: per-LOD active count AND each LOD's pool_epoch values. Counts // alone are insufficient — the gather LUT encodes the actual epoch indices, // so two batches with identical counts but different active-epoch positions // (mid-stream phase shifts when K doesn't divide an LOD's append period) - // would mis-hit and reuse stale gather indices. See [[ok-let-s-make-a-curious-prism]]. - int lut_steady = stage->lut_cache_valid && - memcmp(stage->cached_per_lod_n_active, - per_lod_n_active, - (size_t)nlod * sizeof(uint32_t)) == 0; + // would mis-hit and reuse stale gather indices. See + // [[ok-let-s-make-a-curious-prism]]. + int lut_steady = + stage->lut_cache_valid && memcmp(stage->cached_per_lod_n_active, + per_lod_n_active, + (size_t)nlod * sizeof(uint32_t)) == 0; for (uint8_t lv = 0; lut_steady && lv < nlod; ++lv) { const uint32_t n_lv = per_lod_n_active[lv]; if (n_lv == 0) @@ -536,13 +521,13 @@ compress_agg_kick(struct compress_agg_stage* stage, out->active_levels_mask = in->active_levels_mask; out->batch_active_masks = in->batch_active_masks; out->nlod = nlod; - memcpy(out->per_lod_n_active, - per_lod_n_active, - (size_t)nlod * sizeof(uint32_t)); + memcpy( + out->per_lod_n_active, per_lod_n_active, (size_t)nlod * sizeof(uint32_t)); out->t_aggregate_end = stage->t_aggregate_end[fc]; out->t_compress_start = stage->t_compress_start[fc]; out->t_compress_end = stage->t_compress_end[fc]; out->max_output_size = stage->codec.max_output_size; + out->passthrough = (stage->codec.type == CODEC_NONE); out->agg = &stage->agg[fc]; out->layout = layout; out->per_lod_agg_layouts = stage->per_lod_agg_layouts; diff --git a/src/gpu/flush.compress_agg.h b/src/gpu/flush.compress_agg.h index ee6664f..e39eaf6 100644 --- a/src/gpu/flush.compress_agg.h +++ b/src/gpu/flush.compress_agg.h @@ -4,24 +4,18 @@ struct computed_stream_layouts; -// Initialize the compress+aggregate stage. Returns 0 on success. int compress_agg_init(struct compress_agg_stage* stage, const struct computed_stream_layouts* cl, const struct tile_stream_configuration* config, CUstream compute); -// Destroy the compress+aggregate stage. void compress_agg_destroy(struct compress_agg_stage* stage, int nlod); -// Kick compress+aggregate for a batch. Populates handoff for D2H stage. -// Returns 0 on success. int compress_agg_kick(struct compress_agg_stage* stage, const struct compress_agg_input* in, const struct level_geometry* levels, - const struct batch_state* batch, - const struct dim_info* dims, CUstream compress_stream, struct flush_handoff* out); diff --git a/src/gpu/flush.d2h_deliver.c b/src/gpu/flush.d2h_deliver.c index 63ad541..e12cbdb 100644 --- a/src/gpu/flush.d2h_deliver.c +++ b/src/gpu/flush.d2h_deliver.c @@ -9,6 +9,15 @@ #include +#define D2H_TRY(err_flag, name, call) \ + do { \ + CUresult _r = (call); \ + if (_r != CUDA_SUCCESS) { \ + handle_curesult(LOG_ERROR, _r, __FILE__, __LINE__, (name)); \ + (err_flag) = 1; \ + } \ + } while (0) + // --- Init / Destroy --- int @@ -21,8 +30,10 @@ d2h_deliver_init(struct d2h_deliver_stage* stage, for (int fc = 0; fc < 2; ++fc) { CU(Fail, cuEventCreate(&stage->t_d2h_start[fc], CU_EVENT_DEFAULT)); + CU(Fail, cuEventCreate(&stage->h_chunk_index_ready[fc], CU_EVENT_DEFAULT)); CU(Fail, cuEventCreate(&stage->ready[fc], CU_EVENT_DEFAULT)); CU(Fail, cuEventRecord(stage->t_d2h_start[fc], compute)); + CU(Fail, cuEventRecord(stage->h_chunk_index_ready[fc], compute)); CU(Fail, cuEventRecord(stage->ready[fc], compute)); } @@ -40,6 +51,7 @@ d2h_deliver_destroy(struct d2h_deliver_stage* stage) return; for (int fc = 0; fc < 2; ++fc) { cu_event_destroy(stage->t_d2h_start[fc]); + cu_event_destroy(stage->h_chunk_index_ready[fc]); cu_event_destroy(stage->ready[fc]); } } @@ -138,11 +150,8 @@ record_flush_metrics(const struct flush_handoff* handoff, handoff->t_aggregate_end, agg_bytes, agg_bytes); - accumulate_metric_cu(&metrics->d2h, - t_d2h_start, - t_d2h_ready, - agg_bytes, - agg_bytes); + accumulate_metric_cu( + &metrics->d2h, t_d2h_start, t_d2h_ready, agg_bytes, agg_bytes); } } @@ -157,14 +166,54 @@ lod_view(const struct flush_handoff* handoff, uint8_t lv, size_t data_base) struct aggregate_result ar = { .data = (uint8_t*)slot->h_aggregated + data_base, .offsets = slot->h_offsets + seg->batch_covering_offset + lv, - .chunk_sizes = - slot->h_permuted_sizes + seg->batch_covering_offset + lv, + .chunk_sizes = slot->h_permuted_sizes + seg->batch_covering_offset + lv, }; return ar; } -// Sync on ready[fc] (near-zero in steady state), record metrics, deliver -// to sinks. +static int +poll_event(CUevent ev, int fc) +{ + for (;;) { + CUresult r = cuEventQuery(ev); + if (r == CUDA_SUCCESS) + return 0; + if (r == CUDA_ERROR_DEINITIALIZED) { + log_debug("d2h_deliver: cuEventQuery returned DEINITIALIZED (fc=%d)", fc); + return 0; + } + if (r != CUDA_ERROR_NOT_READY) { + handle_curesult(LOG_ERROR, r, __FILE__, __LINE__, "cuEventQuery"); + return 1; + } + platform_sleep_ns(50000); + } +} + +// h_offsets must be pre-rebase (absolute, slot-relative) values here. +static int +lod_actual_bytes(const struct flush_handoff* handoff, + uint8_t lv, + size_t* out_bytes) +{ + *out_bytes = 0; + const struct lod_segment* seg = &handoff->layout.lods[lv]; + if (seg->n_active == 0) + return 0; + const struct aggregate_slot* slot = handoff->agg; + const uint64_t total = (uint64_t)seg->n_active * seg->covering_count; + const size_t last = seg->batch_covering_offset + (size_t)lv + total - 1; + const size_t end = slot->h_offsets[last] + slot->h_permuted_sizes[last]; + CHECK(Error, slot->h_offsets[last] >= seg->data_segment_offset); + const size_t actual = end - seg->data_segment_offset; + CHECK(Error, actual <= seg->data_segment_bytes); + *out_bytes = actual; + return 0; + +Error: + return 1; +} + static struct writer_result sync_and_deliver(struct d2h_deliver_stage* stage, const struct flush_handoff* handoff, @@ -175,9 +224,12 @@ sync_and_deliver(struct d2h_deliver_stage* stage, struct shard_sink* sink, const struct lod_state* lod, const struct lod_shared_state* lod_shared, - struct stream_metrics* metrics) + struct stream_metrics* metrics, + CUstream d2h_stream) { const int fc = handoff->fc; + struct aggregate_slot* slot = handoff->agg; + const struct batch_aggregate_layout* alayout = &handoff->layout; if (sink->has_error && sink->has_error(sink)) goto Error; @@ -185,21 +237,59 @@ sync_and_deliver(struct d2h_deliver_stage* stage, { struct platform_clock kick_clk = { 0 }; platform_toc(&kick_clk); - for (;;) { - CUresult r = cuEventQuery(stage->ready[fc]); - if (r == CUDA_SUCCESS) - break; - if (r == CUDA_ERROR_DEINITIALIZED) { - log_debug("d2h_deliver: cuEventQuery returned DEINITIALIZED (fc=%d)", - fc); - break; - } - if (r != CUDA_ERROR_NOT_READY) { - handle_curesult(LOG_ERROR, r, __FILE__, __LINE__, "cuEventQuery"); + + if (handoff->passthrough) { + if (poll_event(stage->ready[fc], fc)) goto Error; + } else { + if (poll_event(stage->h_chunk_index_ready[fc], fc)) + goto Error; + + int dispatch_err = 0; + if (alayout->page_size > 0) { + for (uint8_t lv = 0; lv < handoff->nlod && !dispatch_err; ++lv) { + if (handoff->per_lod_n_active[lv] == 0) + continue; + const struct lod_segment* seg = &alayout->lods[lv]; + size_t actual = 0; + if (lod_actual_bytes(handoff, lv, &actual)) + goto Error; + if (actual == 0) + continue; + D2H_TRY(dispatch_err, + "cuMemcpyDtoHAsync", + cuMemcpyDtoHAsync( + (uint8_t*)slot->h_aggregated + seg->data_segment_offset, + (CUdeviceptr)slot->d_aggregated + seg->data_segment_offset, + actual, + d2h_stream)); + } + } else if (alayout->total_batch_covering > 0) { + const size_t n = alayout->total_batch_covering + (size_t)handoff->nlod; + const size_t total = + slot->h_offsets[n - 1] + slot->h_permuted_sizes[n - 1]; + if (total > 0) + D2H_TRY(dispatch_err, + "cuMemcpyDtoHAsync", + cuMemcpyDtoHAsync(slot->h_aggregated, + (CUdeviceptr)slot->d_aggregated, + total, + d2h_stream)); } - platform_sleep_ns(50000); + + // Always record completion events, even if the D2H dispatch above + // failed: cap-stacking waiters block on slot->ready and would hang + // otherwise. Record-on-error is harmless because the stream is + // already in an error state and the next op will short-circuit. + CU(Error, cuEventRecord(stage->ready[fc], d2h_stream)); + CU(Error, cuEventRecord(slot->ready, d2h_stream)); + + if (dispatch_err) + goto Error; + if (poll_event(stage->ready[fc], fc)) + goto Error; } + float kick_ms = platform_toc(&kick_clk) * 1000.0f; accumulate_metric_ms(&metrics->kick_sync_stall, kick_ms, 0, 0); } @@ -326,7 +416,6 @@ maybe_update_metadata(const struct flush_handoff* handoff, struct shard_sink* sink, struct platform_clock* metadata_update_clock) { - (void)config; if (!sink->update_append) return 0; @@ -357,61 +446,59 @@ maybe_update_metadata(const struct flush_handoff* handoff, int d2h_deliver_kick(struct d2h_deliver_stage* stage, const struct flush_handoff* handoff, - const struct level_geometry* levels, - const struct batch_state* batch, - const struct dim_info* dims, struct shard_sink* sink, CUstream d2h_stream) { - (void)levels; - (void)batch; - (void)dims; const int fc = handoff->fc; struct aggregate_slot* slot = handoff->agg; const struct batch_aggregate_layout* layout = &handoff->layout; - // Block on the prior IO retiring this slot (single fence across all LODs). wait_io_fences(slot, sink, stage->metrics); - // d2h stream waits for aggregate to finish writing the unified buffer. CU(Error, cuStreamWaitEvent(d2h_stream, handoff->t_aggregate_end, 0)); CU(Error, cuEventRecord(stage->t_d2h_start[fc], d2h_stream)); - // One D2H for the unified offsets array (covers all LOD ranges). The - // unified prefix-sum produces total_batch_covering + nlod offsets — each - // LOD's tail sentinel sits at position - // (batch_covering_offset + n_active*covering + lv), all within the scan's - // output range. + // Compressed codecs use these in drain to size exact per-LOD transfers; + // pass-through copies them too so delivery's chunk-index walk is uniform. + int dispatch_err = 0; if (layout->total_batch_covering > 0) { - const size_t n = - layout->total_batch_covering + (size_t)handoff->nlod; - CU(Error, - cuMemcpyDtoHAsync(slot->h_offsets, - (CUdeviceptr)slot->d_offsets, - n * sizeof(size_t), - d2h_stream)); - CU(Error, - cuMemcpyDtoHAsync(slot->h_permuted_sizes, - (CUdeviceptr)slot->d_permuted_sizes, - n * sizeof(size_t), - d2h_stream)); + const size_t n = layout->total_batch_covering + (size_t)handoff->nlod; + D2H_TRY(dispatch_err, + "cuMemcpyDtoHAsync", + cuMemcpyDtoHAsync(slot->h_offsets, + (CUdeviceptr)slot->d_offsets, + n * sizeof(size_t), + d2h_stream)); + if (!dispatch_err) + D2H_TRY(dispatch_err, + "cuMemcpyDtoHAsync", + cuMemcpyDtoHAsync(slot->h_permuted_sizes, + (CUdeviceptr)slot->d_permuted_sizes, + n * sizeof(size_t), + d2h_stream)); } - - // One D2H for the unified aggregated data buffer. Sized to - // total_data_bytes (max across all LODs' page-aligned segments). - if (layout->total_data_bytes > 0) { - CU(Error, - cuMemcpyDtoHAsync(slot->h_aggregated, - (CUdeviceptr)slot->d_aggregated, - layout->total_data_bytes, - d2h_stream)); + if (!dispatch_err) + D2H_TRY(dispatch_err, + "cuEventRecord", + cuEventRecord(stage->h_chunk_index_ready[fc], d2h_stream)); + + // Compressed defers bulk D2H to sync_and_deliver once the chunk index lands. + if (!dispatch_err && handoff->passthrough && layout->total_data_bytes > 0) + D2H_TRY(dispatch_err, + "cuMemcpyDtoHAsync", + cuMemcpyDtoHAsync(slot->h_aggregated, + (CUdeviceptr)slot->d_aggregated, + layout->total_data_bytes, + d2h_stream)); + + // Always record passthrough completion events even on dispatch error: + // cap-stacking waiters block on slot->ready and would hang otherwise. + if (handoff->passthrough) { + CU(Error, cuEventRecord(stage->ready[fc], d2h_stream)); + CU(Error, cuEventRecord(slot->ready, d2h_stream)); } - CU(Error, cuEventRecord(stage->ready[fc], d2h_stream)); - // Also signal the slot's own ready event so the next compress that reuses - // this slot waits on it. - CU(Error, cuEventRecord(slot->ready, d2h_stream)); - return 0; + return dispatch_err; Error: return 1; @@ -421,7 +508,6 @@ struct writer_result d2h_deliver_drain(struct d2h_deliver_stage* stage, const struct flush_handoff* handoff, const struct level_geometry* levels, - const struct batch_state* batch, const struct dim_info* dims, const struct tile_stream_layout* layout, const struct tile_stream_configuration* config, @@ -429,9 +515,9 @@ d2h_deliver_drain(struct d2h_deliver_stage* stage, const struct lod_state* lod, const struct lod_shared_state* lod_shared, struct stream_metrics* metrics, - struct platform_clock* metadata_update_clock) + struct platform_clock* metadata_update_clock, + CUstream d2h_stream) { - (void)batch; struct writer_result r = sync_and_deliver(stage, handoff, levels, @@ -441,9 +527,11 @@ d2h_deliver_drain(struct d2h_deliver_stage* stage, sink, lod, lod_shared, - metrics); + metrics, + d2h_stream); if (!r.error) { - if (maybe_update_metadata(handoff, dims, config, sink, metadata_update_clock)) + if (maybe_update_metadata( + handoff, dims, config, sink, metadata_update_clock)) return writer_error(); } return r; diff --git a/src/gpu/flush.d2h_deliver.h b/src/gpu/flush.d2h_deliver.h index 40c103b..cf474a6 100644 --- a/src/gpu/flush.d2h_deliver.h +++ b/src/gpu/flush.d2h_deliver.h @@ -3,36 +3,26 @@ #include "gpu/stream.internal.h" #include "stream/dim_info.h" -// Initialize the D2H+deliver stage. Returns 0 on success. int d2h_deliver_init(struct d2h_deliver_stage* stage, size_t shard_alignment, CUstream compute); -// Destroy the D2H+deliver stage. void d2h_deliver_destroy(struct d2h_deliver_stage* stage); -// Enqueue the full D2H (offset + bulk) for this batch on the d2h stream, -// non-blocking. Briefly host-syncs on the offset D2H to size the bulk -// transfer, and waits on prior sink IO fences for this fc. ready[fc] is -// recorded after bulk D2H completes. Returns 0 on success. +// Pass-through codecs complete the D2H here; compressed codecs only land +// the chunk index and finish in drain (bulk D2H is sized by actual bytes). int d2h_deliver_kick(struct d2h_deliver_stage* stage, const struct flush_handoff* handoff, - const struct level_geometry* levels, - const struct batch_state* batch, - const struct dim_info* dims, struct shard_sink* sink, CUstream d2h_stream); -// Synchronize D2H, record metrics, deliver to sinks. -// Returns writer_ok() on success. struct writer_result d2h_deliver_drain(struct d2h_deliver_stage* stage, const struct flush_handoff* handoff, const struct level_geometry* levels, - const struct batch_state* batch, const struct dim_info* dims, const struct tile_stream_layout* layout, const struct tile_stream_configuration* config, @@ -40,4 +30,5 @@ d2h_deliver_drain(struct d2h_deliver_stage* stage, const struct lod_state* lod, const struct lod_shared_state* lod_shared, struct stream_metrics* metrics, - struct platform_clock* metadata_update_clock); + struct platform_clock* metadata_update_clock, + CUstream d2h_stream); diff --git a/src/gpu/flush.handoff.h b/src/gpu/flush.handoff.h index e12a997..65508f8 100644 --- a/src/gpu/flush.handoff.h +++ b/src/gpu/flush.handoff.h @@ -23,21 +23,26 @@ struct shard_tables; // must read per-LOD active counts from `per_lod_n_active` instead. struct flush_handoff { - int fc; // flush slot index - uint32_t n_epochs; // epochs in batch - uint32_t active_levels_mask; // which levels active - const uint32_t* batch_active_masks; // borrowed [K] per-epoch masks - uint32_t per_lod_n_active[LOD_MAX_LEVELS]; // owned, for delivery sizing + int fc; // flush slot index + uint32_t n_epochs; // epochs in batch + uint32_t active_levels_mask; // which levels active + const uint32_t* batch_active_masks; // borrowed [K] per-epoch masks + uint32_t per_lod_n_active[LOD_MAX_LEVELS]; // owned, for delivery sizing uint8_t nlod; CUevent t_aggregate_end; // D2H waits on this CUevent t_compress_start; // for metrics CUevent t_compress_end; // for metrics - struct aggregate_slot* agg; // borrowed: unified slot for fc - struct batch_aggregate_layout layout; // owned (by-value snapshot) + struct aggregate_slot* agg; // borrowed: unified slot for fc + struct batch_aggregate_layout layout; // owned (by-value snapshot) const struct aggregate_layout* per_lod_agg_layouts; // borrowed [nlod] struct shard_state* shards_by_lod[LOD_MAX_LEVELS]; // borrowed - struct shard_tables* shards; // borrowed (for tail HtoD) - size_t max_output_size; // codec bound + struct shard_tables* shards; // borrowed (for tail HtoD) + size_t max_output_size; // codec bound + + // Pass-through codec (CODEC_NONE): per-LOD bytes equal worst-case, so + // delivery skips the exact-size sync and keeps the kick-time bulk D2H + // path that overlaps with the next batch's compute. + uint8_t passthrough; }; diff --git a/src/gpu/stream.engine.h b/src/gpu/stream.engine.h index 55dbd2a..2d02004 100644 --- a/src/gpu/stream.engine.h +++ b/src/gpu/stream.engine.h @@ -146,10 +146,10 @@ struct shard_tables // Per-shard parameters used by add_shard_bias_unified_k and // copy_leading_tail_unified_k. Host shadows are owned; device buffers are // allocated at init sized to the max across arrays. - size_t* h_base_offsets; // base byte offset in d_aggregated - size_t* h_shard_capacity; // per shard - uint64_t* h_tps_group; // chunks-per-shard within a batch - uint64_t* h_offsets_base; // base index in d_offsets / d_permuted_sizes + size_t* h_base_offsets; // base byte offset in d_aggregated + size_t* h_shard_capacity; // per shard + uint64_t* h_tps_group; // chunks-per-shard within a batch + uint64_t* h_offsets_base; // base index in d_offsets / d_permuted_sizes size_t* d_base_offsets; size_t* d_shard_capacity; @@ -199,8 +199,8 @@ struct compress_agg_stage uint32_t* h_lut_gather_scratch; // for building unified LUT host-side uint32_t* h_lut_perm_scratch; uint32_t cached_per_lod_n_active[LOD_MAX_LEVELS]; // last uploaded counts - uint32_t* cached_pool_epochs; // [LOD_MAX_LEVELS * pool_epochs_stride] - uint32_t pool_epochs_stride; // max K used by scratch + cache + uint32_t* cached_pool_epochs; // [LOD_MAX_LEVELS * pool_epochs_stride] + uint32_t pool_epochs_stride; // max K used by scratch + cache int lut_cache_valid; uint64_t lut_steady_count; uint64_t lut_recompute_count; @@ -229,7 +229,8 @@ struct compress_agg_stage struct d2h_deliver_stage { CUevent t_d2h_start[2]; - CUevent ready[2]; // unified D2H completion (offsets+sizes+data) + CUevent h_chunk_index_ready[2]; // h_offsets + h_permuted_sizes on host + CUevent ready[2]; // full D2H done; gates slot reuse size_t shard_alignment; // from sink; 0 = no alignment struct stream_metrics* metrics; // borrowed, for stall-time accumulation diff --git a/src/gpu/stream.flush.c b/src/gpu/stream.flush.c index 627b02c..0928637 100644 --- a/src/gpu/stream.flush.c +++ b/src/gpu/stream.flush.c @@ -87,7 +87,6 @@ drain_fc(struct stream_engine* e, struct stream_context* ctx, int fc) struct writer_result r = d2h_deliver_drain(&e->d2h_deliver, &e->flush.pending_handoff[fc], &ctx->levels, - &e->batch, &ctx->dims, &ctx->layout, &ctx->config, @@ -95,7 +94,8 @@ drain_fc(struct stream_engine* e, struct stream_context* ctx, int fc) &e->lod, &e->lod_shared, &e->metrics, - &e->metadata_update_clock); + &e->metadata_update_clock, + e->streams.d2h); float ms = (float)(platform_toc(&stall_clk) * 1000.0); accumulate_metric_ms(&e->metrics.flush_stall, ms, 0, 0); @@ -127,7 +127,6 @@ drain_kick_and_swap(struct stream_engine* e, struct stream_context* ctx) return r; } - // Phase 2: kick compress+aggregate for the new batch (compress stream). fs->batch_epoch_count = (int)e->batch.accumulated; struct compress_agg_input in = make_compress_input(e, ctx, completed_pool, e->batch.accumulated); @@ -136,18 +135,12 @@ drain_kick_and_swap(struct stream_engine* e, struct stream_context* ctx) compress_agg_kick(&e->compress_agg, &in, &ctx->levels, - &e->batch, - &ctx->dims, e->streams.compress, &new_handoff) == 0); - // Phase 3: kick the full D2H (offset sync + bulk queue) — non-blocking. CHECK(Error, d2h_deliver_kick(&e->d2h_deliver, &new_handoff, - &ctx->levels, - &e->batch, - &ctx->dims, ctx->sink, e->streams.d2h) == 0); @@ -238,17 +231,12 @@ flush_kick_batch(struct stream_engine* e, compress_agg_kick(&e->compress_agg, &in, &ctx->levels, - &e->batch, - &ctx->dims, e->streams.compress, &handoff) == 0); CHECK(Error, d2h_deliver_kick(&e->d2h_deliver, &handoff, - &ctx->levels, - &e->batch, - &ctx->dims, ctx->sink, e->streams.d2h) == 0); diff --git a/src/multiarray/stream.gpu.c b/src/multiarray/stream.gpu.c index 1833548..d77028c 100644 --- a/src/multiarray/stream.gpu.c +++ b/src/multiarray/stream.gpu.c @@ -528,7 +528,6 @@ init_shared_resources(struct multiarray_tile_stream_gpu* ms, for (int fc = 0; fc < 2; ++fc) { CHECK(Fail, aggregate_batch_slot_init(&e->compress_agg.agg[fc], - mx->u_max_total_batch_chunks, C_max, mx->u_max_total_data_bytes) == 0); CU(Fail, cuEventRecord(e->compress_agg.agg[fc].ready, e->streams.compute)); diff --git a/src/types.stream.h b/src/types.stream.h index 8cc2e72..f64c6cf 100644 --- a/src/types.stream.h +++ b/src/types.stream.h @@ -12,9 +12,9 @@ struct stream_metric { const char* name; - float ms; // cumulative - float best_ms; // best single measurement (1e30f = not yet measured) - double best_input_bytes; // input_bytes at the best-ms call (for best GiB/s) + float ms; // cumulative + float best_ms; // best single measurement (1e30f = not yet measured) + double best_input_bytes; // input_bytes at the best-ms call (for best GiB/s) double best_output_bytes; // output_bytes at the best-ms call double input_bytes; // cumulative bytes read by stage double output_bytes; // cumulative bytes written by stage @@ -44,10 +44,12 @@ struct stream_metrics // flush_stall + kick_sync_stall + sink over-counts wall time. To isolate // the drain-side overhead not already attributed elsewhere, compute // flush_stall - kick_sync_stall - (sink portion inside drain). - struct stream_metric flush_stall; // whole d2h_deliver_drain call - // (drain_fc in stream.flush.c) - struct stream_metric kick_sync_stall; // cuEventSynchronize(ready[fc]) - // in sync_and_deliver + struct stream_metric flush_stall; // whole d2h_deliver_drain call + // (drain_fc in stream.flush.c) + // Passthrough: ready[fc] poll only. Compressed: h_chunk_index_ready + // poll + per-LOD bulk D2H dispatch + ready[fc] poll (the second poll + // now includes DMA transfer wall time). + struct stream_metric kick_sync_stall; struct stream_metric io_fence_stall; // GPU: wait_io_fences in // d2h_deliver_kick. CPU: wait_fence // before aggregate in diff --git a/tests/test_aggregate_contract_gpu.c b/tests/test_aggregate_contract_gpu.c index 4be6cb3..b7e17e9 100644 --- a/tests/test_aggregate_contract_gpu.c +++ b/tests/test_aggregate_contract_gpu.c @@ -126,7 +126,7 @@ run_gpu_aggregate(struct gpu_run* r, const size_t comp_pool_bytes = N * g->max_comp; CHECK(Fail, - aggregate_batch_slot_init(&r->slot, N, batch_C, comp_pool_bytes) == 0); + aggregate_batch_slot_init(&r->slot, batch_C, comp_pool_bytes) == 0); // Synthetic compressed pool: chunk i has size (10 + i%7), filled with // value (i+1)&0xff. Same shape as the CPU test. diff --git a/tests/test_compress_agg.c b/tests/test_compress_agg.c index e703519..e6e4851 100644 --- a/tests/test_compress_agg.c +++ b/tests/test_compress_agg.c @@ -143,8 +143,6 @@ ca_ctx_kick(struct ca_test_ctx* c, compress_agg_kick(&c->stage, &in, &c->cl.levels, - &c->batch, - &c->cl.dims, c->compute, handoff) == 0); CU(Fail, cuStreamSynchronize(c->compute)); @@ -766,8 +764,6 @@ test_compress_agg_lut_cache_position_shift(void) compress_agg_kick(&c.stage, &in, &c.cl.levels, - &c.batch, - &c.cl.dims, c.compute, &handoff) == 0); CU(Fail, cuStreamSynchronize(c.compute)); @@ -796,8 +792,6 @@ test_compress_agg_lut_cache_position_shift(void) compress_agg_kick(&c.stage, &in, &c.cl.levels, - &c.batch, - &c.cl.dims, c.compute, &handoff2) == 0); CU(Fail, cuStreamSynchronize(c.compute)); diff --git a/tests/test_d2h_deliver.c b/tests/test_d2h_deliver.c index 50b10ab..acf4d42 100644 --- a/tests/test_d2h_deliver.c +++ b/tests/test_d2h_deliver.c @@ -81,9 +81,7 @@ test_ctx_setup(struct test_ctx* c, c->ca_inited = 1; CHECK(Fail, - d2h_deliver_init(&c->d2h, - platform_page_alignment(), - c->compute) == 0); + d2h_deliver_init(&c->d2h, platform_page_alignment(), c->compute) == 0); c->d2h_inited = 1; size_t pool_bytes = (uint64_t)n_pool_epochs * c->cl.levels.total_chunks * @@ -150,24 +148,15 @@ test_ctx_kick_and_drain(struct test_ctx* c, compress_agg_kick(&c->ca, &in, &c->cl.levels, - &c->batch, - &c->cl.dims, c->compute, handoff) == 0); CHECK(Fail, - d2h_deliver_kick(&c->d2h, - handoff, - &c->cl.levels, - &c->batch, - &c->cl.dims, - sink, - c->d2h_stream) == 0); + d2h_deliver_kick(&c->d2h, handoff, sink, c->d2h_stream) == 0); struct writer_result r = d2h_deliver_drain(&c->d2h, handoff, &c->cl.levels, - &c->batch, &c->cl.dims, &c->cl.layouts[0], config, @@ -175,7 +164,8 @@ test_ctx_kick_and_drain(struct test_ctx* c, &c->lod, &c->lod_shared, &c->metrics, - &c->metadata_clock); + &c->metadata_clock, + c->d2h_stream); CHECK(Fail, r.error == 0); return 0; @@ -652,7 +642,202 @@ test_d2h_double_buffer(void) return ok ? 0 : 1; } +// Three-cycle compressed run: cycle 3 reuses fc=0, so slot->ready +// recorded in cycle 1 must survive into cycle 3's compress wait. Two +// cycles would never reuse a slot. +static int +test_d2h_zstd_double_buffer(void) +{ + log_info("=== test_d2h_zstd_double_buffer ==="); + + struct dimension dims[3]; + struct tile_stream_configuration config; + make_test_config(&config, dims, (struct codec_config){ .id = CODEC_ZSTD }, 1); + + struct test_shard_sink sink; + test_sink_init(&sink, TEST_SHARD_SINK_MAX_SHARDS, 512 * 1024); + + struct test_ctx c; + test_ctx_init(&c); + uint8_t* decomp_buf = NULL; + int ok = 0; + + CHECK(Fail, test_ctx_setup(&c, &config, 2) == 0); + + const uint64_t total_chunks = c.cl.levels.total_chunks; + const uint64_t chunk_stride = c.cl.layouts[0].chunk_stride; + const size_t bytes_per_element = dtype_bpe(config.dtype); + const size_t chunk_bytes = chunk_stride * bytes_per_element; + size_t epoch_pool_bytes = total_chunks * chunk_stride * bytes_per_element; + + CHECK( + Fail, + fill_pool_epoch( + c.d_pool, total_chunks, chunk_stride, bytes_per_element, fill_epoch0) == + 0); + CU(Fail, cuEventRecord(c.pool_ready, c.compute)); + + { + struct flush_handoff handoff; + CHECK(Fail, + test_ctx_kick_and_drain( + &c, &config, &sink.base, 0, 1, c.d_pool, c.pool_ready, &handoff) == + 0); + } + + CHECK(Fail, sink.finalize_count == 0); + + CHECK(Fail, + fill_pool_epoch(c.d_pool + epoch_pool_bytes, + total_chunks, + chunk_stride, + bytes_per_element, + fill_epoch1) == 0); + CU(Fail, cuEventRecord(c.pool_ready, c.compute)); + + { + struct flush_handoff handoff; + CHECK(Fail, + test_ctx_kick_and_drain(&c, + &config, + &sink.base, + 1, + 1, + c.d_pool + epoch_pool_bytes, + c.pool_ready, + &handoff) == 0); + } + + CHECK(Fail, sink.finalize_count == 1); + + // Cycle 3: reuse fc=0. compress_agg's wait on prev_d2h_done depends on + // sync_and_deliver having recorded slot->ready in cycle 1. + CHECK( + Fail, + fill_pool_epoch( + c.d_pool, total_chunks, chunk_stride, bytes_per_element, fill_epoch2) == + 0); + CU(Fail, cuEventRecord(c.pool_ready, c.compute)); + + { + struct flush_handoff handoff; + CHECK(Fail, + test_ctx_kick_and_drain( + &c, &config, &sink.base, 0, 1, c.d_pool, c.pool_ready, &handoff) == + 0); + } + + CHECK(Fail, sink.finalize_count == 1); + + // Cycle 4 finalizes shard 1 so cycle 3's data lands on disk and can be + // verified — without this, cycle 3 corruption would pass silently. + CHECK(Fail, + fill_pool_epoch(c.d_pool + epoch_pool_bytes, + total_chunks, + chunk_stride, + bytes_per_element, + fill_epoch3) == 0); + CU(Fail, cuEventRecord(c.pool_ready, c.compute)); + + { + struct flush_handoff handoff; + CHECK(Fail, + test_ctx_kick_and_drain(&c, + &config, + &sink.base, + 1, + 1, + c.d_pool + epoch_pool_bytes, + c.pool_ready, + &handoff) == 0); + } + + CHECK(Fail, sink.finalize_count == 2); + + { + struct shard_state* ss = &c.ca.shard[0]; + uint64_t tps_total = ss->chunks_per_shard_total; + size_t index_data_bytes = tps_total * 2 * sizeof(uint64_t); + size_t index_total_bytes = index_data_bytes + 4; + + const struct aggregate_layout* al = &c.ca.per_lod_agg_layouts[0]; + uint64_t cps_inner = ss->chunks_per_shard_inner; + + decomp_buf = (uint8_t*)malloc(chunk_bytes); + CHECK(Fail, decomp_buf); + + uint16_t (*fills[4])(uint64_t) = { + fill_epoch0, fill_epoch1, fill_epoch2, fill_epoch3 + }; + int errors = 0; + for (int shard = 0; shard < 2; ++shard) { + CHECK(Fail, sink.writers[0][shard].size >= index_total_bytes); + size_t index_start = sink.writers[0][shard].size - index_total_bytes; + const uint64_t* idx = + (const uint64_t*)(sink.writers[0][shard].buf + index_start); + + for (int local_epoch = 0; local_epoch < 2; ++local_epoch) { + const int global_epoch = shard * 2 + local_epoch; + uint16_t (*fill_fn)(uint64_t) = fills[global_epoch]; + for (uint64_t t = 0; t < total_chunks; ++t) { + uint32_t pi = cpu_perm( + t, al->lifted_rank, al->lifted_shape, al->lifted_strides); + uint64_t slot_idx = (uint64_t)local_epoch * cps_inner + pi; + uint64_t tile_off = idx[2 * slot_idx]; + uint64_t tile_sz = idx[2 * slot_idx + 1]; + + CHECK(Fail, tile_sz > 0); + CHECK(Fail, tile_off + tile_sz <= sink.writers[0][shard].size); + + size_t result = ZSTD_decompress(decomp_buf, + chunk_bytes, + sink.writers[0][shard].buf + tile_off, + tile_sz); + if (ZSTD_isError(result)) { + log_error(" shard %d epoch %d chunk %lu: ZSTD_decompress: %s", + shard, + global_epoch, + (unsigned long)t, + ZSTD_getErrorName(result)); + errors++; + continue; + } + CHECK(Fail, result == chunk_bytes); + + uint16_t expected_val = fill_fn(t); + const uint16_t* got = (const uint16_t*)decomp_buf; + for (uint64_t e = 0; e < chunk_stride; ++e) { + if (got[e] != expected_val) { + if (errors < 5) + log_error(" shard %d epoch %d chunk %lu elem %lu: " + "expected %u got %u", + shard, + global_epoch, + (unsigned long)t, + (unsigned long)e, + expected_val, + got[e]); + errors++; + } + } + } + } + } + CHECK(Fail, errors == 0); + } + + ok = 1; + +Fail: + free(decomp_buf); + test_ctx_destroy(&c); + test_sink_free(&sink); + log_info(" %s", ok ? "PASS" : "FAIL"); + return ok ? 0 : 1; +} + RUN_GPU_TESTS({ "d2h_single_epoch_none", test_d2h_single_epoch_none }, { "d2h_batch_none", test_d2h_batch_none }, { "d2h_zstd_single_epoch", test_d2h_zstd_single_epoch }, - { "d2h_double_buffer", test_d2h_double_buffer }, ) + { "d2h_double_buffer", test_d2h_double_buffer }, + { "d2h_zstd_double_buffer", test_d2h_zstd_double_buffer }, )