Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/gpu/aggregate.cu
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,10 @@ gather_batch_k(const void* __restrict__ d_compressed,

extern "C" int
aggregate_batch_slot_init(struct aggregate_slot* slot,
uint64_t batch_chunk_count,
uint64_t batch_covering_count,
size_t comp_pool_bytes)
{
uint64_t C = batch_covering_count;
(void)batch_chunk_count;

CHECK(Error, slot);
memset(slot, 0, sizeof(*slot));
Expand Down
1 change: 0 additions & 1 deletion src/gpu/aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ extern "C"
void aggregate_slot_destroy(struct aggregate_slot* slot);

int aggregate_batch_slot_init(struct aggregate_slot* slot,
uint64_t batch_chunk_count,
uint64_t batch_covering_count,
size_t comp_pool_bytes);

Expand Down
65 changes: 25 additions & 40 deletions src/gpu/flush.compress_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ kick_compress(struct compress_agg_stage* stage,

// --- Init / Destroy ---


int
compress_agg_init(struct compress_agg_stage* stage,
const struct computed_stream_layouts* cl,
Expand Down Expand Up @@ -78,9 +77,9 @@ compress_agg_init(struct compress_agg_stage* stage,
const int need_compressed = (stage->codec.type != CODEC_NONE);
for (int fc = 0; fc < 2; ++fc) {
if (need_compressed)
CU(Fail,
cuMemAlloc(&stage->d_compressed[fc],
M * stage->codec.max_output_size));
CU(
Fail,
cuMemAlloc(&stage->d_compressed[fc], M * stage->codec.max_output_size));
CU(Fail, cuEventCreate(&stage->t_compress_start[fc], CU_EVENT_DEFAULT));
CU(Fail, cuEventCreate(&stage->t_compress_end[fc], CU_EVENT_DEFAULT));
CU(Fail, cuEventCreate(&stage->t_aggregate_end[fc], CU_EVENT_DEFAULT));
Expand All @@ -95,11 +94,11 @@ compress_agg_init(struct compress_agg_stage* stage,
// them per-array. Each layout's GPU-side d_lifted_shape/strides are uploaded.
for (int lv = 0; lv < cl->levels.nlod; ++lv) {
stage->per_lod_agg_layouts[lv] = cl->per_level[lv].agg_layout;
CHECK(Fail,
aggregate_layout_upload(&stage->per_lod_agg_layouts[lv]) == 0);
CHECK(Fail, aggregate_layout_upload(&stage->per_lod_agg_layouts[lv]) == 0);
}

// Cached max batch layout assuming each LOD fires its worst-case active count.
// Cached max batch layout assuming each LOD fires its worst-case active
// count.
{
uint32_t per_lod_max[LOD_MAX_LEVELS] = { 0 };
for (int lv = 0; lv < cl->levels.nlod; ++lv)
Expand All @@ -124,17 +123,11 @@ compress_agg_init(struct compress_agg_stage* stage,
// aggregate_batch_luts_unified). The CUB exclusive scan fills all
// sentinel positions, so no per-LOD write_total fixup is needed.
if (stage->max_total_batch_chunks > 0) {
// aggregate_batch_slot_init allocates (C+1) entries for d_offsets /
// d_permuted_sizes / h_offsets and C entries for h_permuted_sizes. We
// want all to fit total_batch_covering + nlod entries, so pass
// C = max_total_batch_covering + nlod (h_permuted_sizes ends up exactly
// matching the scan output length).
const uint64_t C_max =
stage->max_total_batch_covering + (uint64_t)stage->nlod;
for (int fc = 0; fc < 2; ++fc) {
CHECK(Fail,
aggregate_batch_slot_init(&stage->agg[fc],
stage->max_total_batch_chunks,
C_max,
stage->max_total_data_bytes) == 0);
CU(Fail, cuEventRecord(stage->agg[fc].ready, compute));
Expand Down Expand Up @@ -225,24 +218,21 @@ compress_agg_init(struct compress_agg_stage* stage,
// Tail-carry buffer: total_shards * page_size bytes; uniform layout
// across LODs (sink page size is uniform).
if (stage->shards.page_size > 0) {
stage->shards.tail_carry_bytes =
total_shards * stage->shards.page_size;
stage->shards.tail_carry_bytes = total_shards * stage->shards.page_size;
CU(Fail,
cuMemAlloc(&stage->shards.d_tail_carry,
stage->shards.tail_carry_bytes));
CU(Fail,
cuMemsetD8(stage->shards.d_tail_carry,
0,
stage->shards.tail_carry_bytes));
cuMemsetD8(
stage->shards.d_tail_carry, 0, stage->shards.tail_carry_bytes));
}
}
}

// Per-LOD shard_state (writers + tail/footer pools + generation
// bookkeeping). The unified kick/D2H path iterates this directly.
for (int lv = 0; lv < cl->levels.nlod; ++lv)
CHECK(Fail,
init_shard_state(&stage->shard[lv], &cl->per_level[lv]) == 0);
CHECK(Fail, init_shard_state(&stage->shard[lv], &cl->per_level[lv]) == 0);

// Seed events
for (int fc = 0; fc < 2; ++fc) {
Expand Down Expand Up @@ -368,13 +358,9 @@ int
compress_agg_kick(struct compress_agg_stage* stage,
const struct compress_agg_input* in,
const struct level_geometry* levels,
const struct batch_state* batch,
const struct dim_info* dims,
CUstream compress_stream,
struct flush_handoff* out)
{
(void)batch;
(void)dims;
const int fc = in->fc;
const uint32_t n_epochs = in->n_epochs;
const uint8_t nlod = stage->nlod;
Expand All @@ -400,28 +386,27 @@ compress_agg_kick(struct compress_agg_stage* stage,
// Page size is uniform across LODs (sink-driven); read from LOD 0.
struct batch_aggregate_layout layout;
const size_t page_size = stage->per_lod_agg_layouts[0].page_size;
CHECK(Error,
batch_aggregate_layout_init(&layout,
stage->per_lod_agg_layouts,
per_lod_n_active,
nlod,
page_size) == 0);
CHECK(
Error,
batch_aggregate_layout_init(
&layout, stage->per_lod_agg_layouts, per_lod_n_active, nlod, page_size) ==
0);

CHECK(Error, layout.total_data_bytes <= stage->max_total_data_bytes);
CHECK(Error, layout.total_batch_chunks <= stage->max_total_batch_chunks);
CHECK(Error,
layout.total_batch_covering <= stage->max_total_batch_covering);
CHECK(Error, layout.total_batch_covering <= stage->max_total_batch_covering);

// --- Phase 3: build & upload unified LUTs (cached in steady state) -------
// Cache key: per-LOD active count AND each LOD's pool_epoch values. Counts
// alone are insufficient — the gather LUT encodes the actual epoch indices,
// so two batches with identical counts but different active-epoch positions
// (mid-stream phase shifts when K doesn't divide an LOD's append period)
// would mis-hit and reuse stale gather indices. See [[ok-let-s-make-a-curious-prism]].
int lut_steady = stage->lut_cache_valid &&
memcmp(stage->cached_per_lod_n_active,
per_lod_n_active,
(size_t)nlod * sizeof(uint32_t)) == 0;
// would mis-hit and reuse stale gather indices. See
// [[ok-let-s-make-a-curious-prism]].
int lut_steady =
stage->lut_cache_valid && memcmp(stage->cached_per_lod_n_active,
per_lod_n_active,
(size_t)nlod * sizeof(uint32_t)) == 0;
for (uint8_t lv = 0; lut_steady && lv < nlod; ++lv) {
const uint32_t n_lv = per_lod_n_active[lv];
if (n_lv == 0)
Expand Down Expand Up @@ -536,13 +521,13 @@ compress_agg_kick(struct compress_agg_stage* stage,
out->active_levels_mask = in->active_levels_mask;
out->batch_active_masks = in->batch_active_masks;
out->nlod = nlod;
memcpy(out->per_lod_n_active,
per_lod_n_active,
(size_t)nlod * sizeof(uint32_t));
memcpy(
out->per_lod_n_active, per_lod_n_active, (size_t)nlod * sizeof(uint32_t));
out->t_aggregate_end = stage->t_aggregate_end[fc];
out->t_compress_start = stage->t_compress_start[fc];
out->t_compress_end = stage->t_compress_end[fc];
out->max_output_size = stage->codec.max_output_size;
out->passthrough = (stage->codec.type == CODEC_NONE);
out->agg = &stage->agg[fc];
out->layout = layout;
out->per_lod_agg_layouts = stage->per_lod_agg_layouts;
Expand Down
6 changes: 0 additions & 6 deletions src/gpu/flush.compress_agg.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,18 @@

struct computed_stream_layouts;

// Initialize the compress+aggregate stage. Returns 0 on success.
int
compress_agg_init(struct compress_agg_stage* stage,
const struct computed_stream_layouts* cl,
const struct tile_stream_configuration* config,
CUstream compute);

// Destroy the compress+aggregate stage.
void
compress_agg_destroy(struct compress_agg_stage* stage, int nlod);

// Kick compress+aggregate for a batch. Populates handoff for D2H stage.
// Returns 0 on success.
int
compress_agg_kick(struct compress_agg_stage* stage,
const struct compress_agg_input* in,
const struct level_geometry* levels,
const struct batch_state* batch,
const struct dim_info* dims,
CUstream compress_stream,
struct flush_handoff* out);
Loading
Loading