Rename fixed-size queries to leader-selected queries
jhoyla committed Nov 28, 2024
1 parent 8be8c80 commit 129cdc6
Showing 19 changed files with 194 additions and 166 deletions.
2 changes: 1 addition & 1 deletion crates/dapf/src/acceptance/load_testing.rs
@@ -424,7 +424,7 @@ pub async fn execute_single_combination_from_env(
println!("\t- load_control: {load_control:?}");

let batch_id = BatchId(thread_rng().gen());
- let part_batch_sel = PartialBatchSelector::FixedSizeByBatchId { batch_id };
+ let part_batch_sel = PartialBatchSelector::LeaderSelectedByBatchId { batch_id };

let run = async {
let (test_task_config, hpke_config_fetch_time) = t
6 changes: 3 additions & 3 deletions crates/dapf/src/acceptance/mod.rs
@@ -380,7 +380,7 @@ impl Test {
time_precision: 3600,
lifetime: 60,
min_batch_size: reports_per_batch.try_into().unwrap(),
- query: DapQueryConfig::FixedSize {
+ query: DapQueryConfig::LeaderSelected {
max_batch_size: NonZeroU32::new(reports_per_batch.try_into().unwrap()),
},
vdaf: self.vdaf_config,
@@ -554,7 +554,7 @@ impl Test {
) -> Result<Duration> {
// Prepare AggregateShareReq.
let agg_share_req = AggregateShareReq {
- batch_sel: BatchSelector::FixedSizeByBatchId { batch_id },
+ batch_sel: BatchSelector::LeaderSelectedByBatchId { batch_id },
agg_param: Vec::new(),
report_count: leader_agg_share.report_count,
checksum: leader_agg_share.checksum,
@@ -636,7 +636,7 @@ impl Test {
////

let batch_id = BatchId(rngs::OsRng.gen());
- let part_batch_sel = PartialBatchSelector::FixedSizeByBatchId { batch_id };
+ let part_batch_sel = PartialBatchSelector::LeaderSelectedByBatchId { batch_id };

let (leader_agg_share, agg_job_duration) = self
.run_agg_jobs(
8 changes: 5 additions & 3 deletions crates/dapf/src/cli_parsers.rs
@@ -153,8 +153,8 @@ impl FromStr for CliDapQueryConfig {
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
if s == "time-interval" {
Ok(Self(DapQueryConfig::TimeInterval))
- } else if let Some(size) = s.strip_prefix("fixed-size") {
- Ok(Self(DapQueryConfig::FixedSize {
+ } else if let Some(size) = s.strip_prefix("leader-selected") {
+ Ok(Self(DapQueryConfig::LeaderSelected {
max_batch_size: if let Some(size) = size.strip_prefix("-") {
Some(
size.parse()
@@ -163,7 +163,9 @@
} else if size.is_empty() {
None
} else {
- return Err(format!("{size} is an invalid fixed size max batch size"));
+ return Err(format!(
+ "{size} is an invalid leader selected max batch size"
+ ));
},
}))
} else {
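For context, the renamed parser above accepts three query-config spellings on the dapf command line: "time-interval", "leader-selected", and "leader-selected-<N>" (where N is the maximum batch size). Below is a minimal, self-contained sketch of that parsing logic; it uses simplified stand-in types rather than the crate's CliDapQueryConfig and DapQueryConfig.

use std::num::NonZeroU32;
use std::str::FromStr;

// Simplified stand-in mirroring the DapQueryConfig variants shown in this diff.
#[derive(Debug, PartialEq)]
enum QueryConfig {
    TimeInterval,
    LeaderSelected { max_batch_size: Option<NonZeroU32> },
}

impl FromStr for QueryConfig {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "time-interval" {
            Ok(Self::TimeInterval)
        } else if let Some(rest) = s.strip_prefix("leader-selected") {
            // "leader-selected" alone means no maximum; "leader-selected-<N>" sets one.
            let max_batch_size = if let Some(size) = rest.strip_prefix('-') {
                Some(size.parse().map_err(|e| format!("{size}: {e}"))?)
            } else if rest.is_empty() {
                None
            } else {
                return Err(format!("{rest} is an invalid leader selected max batch size"));
            };
            Ok(Self::LeaderSelected { max_batch_size })
        } else {
            Err(format!("unrecognized query config: {s}"))
        }
    }
}

fn main() {
    assert_eq!(
        "time-interval".parse::<QueryConfig>().unwrap(),
        QueryConfig::TimeInterval
    );
    assert_eq!(
        "leader-selected".parse::<QueryConfig>().unwrap(),
        QueryConfig::LeaderSelected { max_batch_size: None }
    );
    assert_eq!(
        "leader-selected-100".parse::<QueryConfig>().unwrap(),
        QueryConfig::LeaderSelected { max_batch_size: NonZeroU32::new(100) }
    );
}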
10 changes: 5 additions & 5 deletions crates/dapf/src/main.rs
@@ -771,8 +771,8 @@ async fn handle_decode_actions(action: DecodeAction) -> anyhow::Result<()> {
let batch_selector = batch_selector.unwrap_or_else(||
match message.part_batch_sel {
PartialBatchSelector::TimeInterval => panic!("can't deduce the time interval, please provide a batch selector"),
- PartialBatchSelector::FixedSizeByBatchId { batch_id } => {
- BatchSelector::FixedSizeByBatchId { batch_id }
+ PartialBatchSelector::LeaderSelectedByBatchId { batch_id } => {
+ BatchSelector::LeaderSelectedByBatchId { batch_id }
}
}
);
@@ -844,7 +844,7 @@ async fn handle_test_routes(action: TestAction, http_client: HttpClient) -> anyh
let vdaf_verify_key = encode_base64url(vdaf.gen_verify_key());
let CliDapQueryConfig(query) = use_or_request_from_user_or_default(
query,
- || DapQueryConfig::FixedSize {
+ || DapQueryConfig::LeaderSelected {
max_batch_size: None,
},
"query",
@@ -876,7 +876,7 @@ async fn handle_test_routes(action: TestAction, http_client: HttpClient) -> anyh
vdaf_verify_key,
query_type: match query {
DapQueryConfig::TimeInterval => 1,
- DapQueryConfig::FixedSize { .. } => 2,
+ DapQueryConfig::LeaderSelected { .. } => 2,
},
min_batch_size: use_or_request_from_user_or_default(
min_batch_size,
@@ -885,7 +885,7 @@ async fn handle_test_routes(action: TestAction, http_client: HttpClient) -> anyh
)?,
max_batch_size: match query {
DapQueryConfig::TimeInterval => None,
- DapQueryConfig::FixedSize { max_batch_size } => max_batch_size,
+ DapQueryConfig::LeaderSelected { max_batch_size } => max_batch_size,
},
time_precision: use_or_request_from_user_or_default(
time_precision,
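The handle_test_routes hunks above map the query config onto the numeric query_type used by the internal test endpoints (1 for time-interval, 2 for leader-selected) and forward the optional maximum batch size only for leader-selected tasks. A small sketch of that mapping follows; the enum is a simplified stand-in for DapQueryConfig and the helper name is illustrative.

use std::num::NonZeroU32;

// Simplified stand-in for DapQueryConfig.
enum QueryConfig {
    TimeInterval,
    LeaderSelected { max_batch_size: Option<NonZeroU32> },
}

/// Returns the numeric query type plus the optional max batch size,
/// mirroring the match arms shown in the main.rs hunks above.
fn query_type_and_max(query: &QueryConfig) -> (u8, Option<NonZeroU32>) {
    match query {
        QueryConfig::TimeInterval => (1, None),
        QueryConfig::LeaderSelected { max_batch_size } => (2, *max_batch_size),
    }
}

fn main() {
    let q = QueryConfig::LeaderSelected {
        max_batch_size: NonZeroU32::new(100),
    };
    assert_eq!(query_type_and_max(&q), (2, NonZeroU32::new(100)));
    assert_eq!(query_type_and_max(&QueryConfig::TimeInterval), (1, None));
}

The same 2 ↔ leader-selected mapping appears again below in roles/mod.rs and test_runner.rs.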
2 changes: 1 addition & 1 deletion crates/daphne-server/src/roles/aggregator.rs
@@ -294,7 +294,7 @@ impl DapAggregator for crate::App {
}))?;
let version = task_config.as_ref().version;

- let agg_span = task_config.batch_span_for_sel(&BatchSelector::FixedSizeByBatchId {
+ let agg_span = task_config.batch_span_for_sel(&BatchSelector::LeaderSelectedByBatchId {
batch_id: *batch_id,
})?;

2 changes: 1 addition & 1 deletion crates/daphne-server/src/roles/mod.rs
@@ -283,7 +283,7 @@ mod test_utils {
err = "command failed: unexpected max batch size"
))
}
- (2, max_batch_size) => DapQueryConfig::FixedSize { max_batch_size },
+ (2, max_batch_size) => DapQueryConfig::LeaderSelected { max_batch_size },
_ => {
return Err(fatal_error!(
err = "command failed: unrecognized query type"
12 changes: 6 additions & 6 deletions crates/daphne-server/tests/e2e/e2e.rs
@@ -1162,9 +1162,9 @@ async fn leader_collect_abort_overlapping_batch_interval(version: DapVersion) {
async_test_versions! { leader_collect_abort_overlapping_batch_interval }

#[tokio::test]
- async fn fixed_size() {
+ async fn leader_selected() {
let version = DapVersion::Draft09;
- let t = TestRunner::fixed_size(version).await;
+ let t = TestRunner::leader_selected(version).await;
let path = t.upload_path();
let client = t.http_client();
let hpke_config_list = t.get_hpke_configs(version, client).await.unwrap();
@@ -1204,7 +1204,7 @@ async fn fixed_size() {
// Collector: Get the collect URI.
let agg_param = DapAggregationParam::Empty;
let collect_req = CollectionReq {
- query: Query::FixedSizeCurrentBatch,
+ query: Query::LeaderSelectedCurrentBatch,
agg_param: agg_param.get_encoded().unwrap(),
};
let collect_uri = t
@@ -1247,7 +1247,7 @@ async fn fixed_size() {
.consume_encrypted_agg_shares(
&t.collector_hpke_receiver,
&t.task_id,
- &BatchSelector::FixedSizeByBatchId { batch_id },
+ &BatchSelector::LeaderSelectedByBatchId { batch_id },
collection.report_count,
&agg_param,
collection.encrypted_agg_shares.to_vec(),
@@ -1302,7 +1302,7 @@ async fn fixed_size() {
// Collector: Get the collect URI.
let agg_param = DapAggregationParam::Empty;
let collect_req = CollectionReq {
- query: Query::FixedSizeCurrentBatch,
+ query: Query::LeaderSelectedCurrentBatch,
agg_param: agg_param.get_encoded().unwrap(),
};
let collect_uri = t
@@ -1329,7 +1329,7 @@ async fn fixed_size() {
DapMediaType::CollectionReq,
None,
CollectionReq {
- query: Query::FixedSizeByBatchId {
+ query: Query::LeaderSelectedByBatchId {
batch_id: prev_batch_id,
},
agg_param: Vec::new(),
6 changes: 3 additions & 3 deletions crates/daphne-server/tests/e2e/test_runner.rs
@@ -62,10 +62,10 @@ impl TestRunner {
Self::with(version, &DapQueryConfig::TimeInterval).await
}

- pub async fn fixed_size(version: DapVersion) -> Self {
+ pub async fn leader_selected(version: DapVersion) -> Self {
Self::with(
version,
- &DapQueryConfig::FixedSize {
+ &DapQueryConfig::LeaderSelected {
max_batch_size: Some(NonZeroU32::new(MAX_BATCH_SIZE).unwrap()),
},
)
@@ -176,7 +176,7 @@ impl TestRunner {

let (query_type, max_batch_size) = match t.task_config.query {
DapQueryConfig::TimeInterval => (1, None),
- DapQueryConfig::FixedSize { max_batch_size } => (2, Some(max_batch_size)),
+ DapQueryConfig::LeaderSelected { max_batch_size } => (2, Some(max_batch_size)),
};

const MAX_ATTEMPTS: usize = 10;
@@ -144,7 +144,7 @@ mod tests {
"batch/IiIiIiIiIiIiIiIiIiIiIiIiIiIiIiIiIiIiIiIiIiI",
format!(
"{}",
- DapBatchBucket::FixedSize {
+ DapBatchBucket::LeaderSelected {
batch_id: BatchId([34; 32]),
shard: 0,
}
@@ -164,7 +164,7 @@ mod tests {
"batch/IiIiIiIiIiIiIiIiIiIiIiIiIiIiIiIiIiIiIiIiIiI/shard/2323",
format!(
"{}",
- DapBatchBucket::FixedSize {
+ DapBatchBucket::LeaderSelected {
batch_id: BatchId([34; 32]),
shard: 2323,
}
4 changes: 2 additions & 2 deletions crates/daphne/src/hpke.rs
@@ -629,7 +629,7 @@ mod test {
sender: DapAggregatorRole::Leader,
task_id: &crate::messages::TaskId(rand::random()),
agg_param: &crate::DapAggregationParam::Empty,
- batch_selector: &crate::messages::BatchSelector::FixedSizeByBatchId {
+ batch_selector: &crate::messages::BatchSelector::LeaderSelectedByBatchId {
batch_id: BatchId(rand::random()),
},
};
@@ -696,7 +696,7 @@ mod test {
duration: rand::random(),
},
},
- BatchSelector::FixedSizeByBatchId {
+ BatchSelector::LeaderSelectedByBatchId {
batch_id: BatchId(rand::random()),
},
];
42 changes: 22 additions & 20 deletions crates/daphne/src/lib.rs
@@ -232,10 +232,10 @@ pub enum DapQueryConfig {
/// specified by the query.
TimeInterval,

/// The "fixed-size" query type where by the Leader assigns reports to arbitrary batches
/// identified by batch IDs. This type includes an optional maximum batch size: if set, then
/// The "leader-selected" query type where by the Leader assigns reports to arbitrary batches
/// identified by batch IDs. In draft-09 this type includes an optional maximum batch size: if set, then
/// Aggregators are meant to stop aggregating reports when this limit is reached.
- FixedSize {
+ LeaderSelected {
#[serde(default)]
max_batch_size: Option<NonZeroU32>,
},
@@ -249,8 +249,8 @@ impl DapQueryConfig {
Self::TimeInterval { .. },
PartialBatchSelector::TimeInterval
) | (
- Self::FixedSize { .. },
- PartialBatchSelector::FixedSizeByBatchId { .. }
+ Self::LeaderSelected { .. },
+ PartialBatchSelector::LeaderSelectedByBatchId { .. }
)
)
}
@@ -262,8 +262,8 @@ impl DapQueryConfig {
Self::TimeInterval { .. },
BatchSelector::TimeInterval { .. }
) | (
- Self::FixedSize { .. },
- BatchSelector::FixedSizeByBatchId { .. }
+ Self::LeaderSelected { .. },
+ BatchSelector::LeaderSelectedByBatchId { .. }
)
)
}
@@ -273,7 +273,7 @@ impl std::fmt::Display for DapQueryConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::TimeInterval => write!(f, "time_interval"),
- Self::FixedSize { .. } => write!(f, "fixed_size"),
+ Self::LeaderSelected { .. } => write!(f, "leader_selected"),
}
}
}
@@ -282,12 +282,12 @@ impl std::fmt::Display for DapQueryConfig {
///
/// A bucket is the smallest, disjoint set of reports that can be queried: For time-interval
/// queries, the bucket to which a report is assigned is determined by truncating its timestamp by
- /// the task's `time_precision` parameter; for fixed-size queries, the span consists of a single
+ /// the task's `time_precision` parameter; for `leader_selected` queries, the span consists of a single
/// bucket, which is the batch determined by the batch ID (i.e., the partial batch selector).
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
#[cfg_attr(any(test, feature = "test-utils"), derive(deepsize::DeepSizeOf))]
pub enum DapBatchBucket {
- FixedSize { batch_id: BatchId, shard: usize },
+ LeaderSelected { batch_id: BatchId, shard: usize },
TimeInterval { batch_window: Time, shard: usize },
}

@@ -298,7 +298,7 @@ impl DapBatchBucket {
batch_window: _,
shard,
}
- | Self::FixedSize { batch_id: _, shard } => *shard,
+ | Self::LeaderSelected { batch_id: _, shard } => *shard,
}
}
}
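As the doc comment above explains, an aggregate span is keyed by batch buckets: a time-interval report lands in the bucket whose window is its timestamp truncated to the task's time_precision, while a leader-selected report lands in the single bucket named by the Leader-chosen batch ID; both carry a shard index. Below is a rough illustration with simplified stand-in types; the truncation helper is an assumption that mirrors the quantized_time_lower_bound call shown further down.

// Simplified stand-in for DapBatchBucket.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum BatchBucket {
    LeaderSelected { batch_id: [u8; 32], shard: usize },
    TimeInterval { batch_window: u64, shard: usize },
}

/// Truncate a report timestamp to the start of its time_precision window,
/// as described for time-interval buckets (assumed truncation rule).
fn quantized_time_lower_bound(time: u64, time_precision: u64) -> u64 {
    time - (time % time_precision)
}

fn main() {
    // A report at t = 7205 with time_precision = 3600 falls in the 7200 window.
    let bucket = BatchBucket::TimeInterval {
        batch_window: quantized_time_lower_bound(7205, 3600),
        shard: 0,
    };
    assert_eq!(
        bucket,
        BatchBucket::TimeInterval { batch_window: 7200, shard: 0 }
    );

    // For a leader-selected task, every report in the batch shares the batch ID.
    let bucket = BatchBucket::LeaderSelected { batch_id: [34; 32], shard: 0 };
    println!("{bucket:?}");
}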
@@ -307,7 +307,7 @@ impl std::fmt::Display for DapBatchBucket {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::TimeInterval { batch_window, .. } => write!(f, "window/{batch_window}")?,
- Self::FixedSize { batch_id, .. } => write!(f, "batch/{batch_id}")?,
+ Self::LeaderSelected { batch_id, .. } => write!(f, "batch/{batch_id}")?,
};

let shard = self.shard();
@@ -389,10 +389,10 @@ impl DapAggregateSpan<DapAggregateShare> {
batch_window: task_config.quantized_time_lower_bound(time),
shard,
},
- PartialBatchSelector::FixedSizeByBatchId { batch_id } => DapBatchBucket::FixedSize {
- batch_id: *batch_id,
- shard,
- },
+ PartialBatchSelector::LeaderSelectedByBatchId { batch_id } => {
+ DapBatchBucket::LeaderSelected {
+ batch_id: *batch_id,
+ shard,
+ }
+ }
};

let (agg_share, reports) = self.span.entry(bucket).or_default();
@@ -707,10 +709,10 @@ impl DapTaskConfig {
}
Ok(span)
}
- BatchSelector::FixedSizeByBatchId { batch_id } => {
+ BatchSelector::LeaderSelectedByBatchId { batch_id } => {
let mut span = HashSet::with_capacity(num_agg_span_shards);
for shard in 0..num_agg_span_shards {
- span.insert(DapBatchBucket::FixedSize {
+ span.insert(DapBatchBucket::LeaderSelected {
batch_id: *batch_id,
shard,
});
@@ -727,7 +729,7 @@ impl DapTaskConfig {
report_count: u64,
) -> Result<bool, DapAbort> {
match self.query {
- DapQueryConfig::FixedSize {
+ DapQueryConfig::LeaderSelected {
max_batch_size: Some(max_batch_size),
} => {
if report_count > u64::from(max_batch_size.get()) {
Expand All @@ -740,7 +742,7 @@ impl DapTaskConfig {
}
}
DapQueryConfig::TimeInterval
- | DapQueryConfig::FixedSize {
+ | DapQueryConfig::LeaderSelected {
max_batch_size: None,
} => (),
};
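This last hunk keeps the draft-09 semantics described in the doc comment earlier: when a leader-selected task sets max_batch_size, a batch whose report count exceeds that bound is rejected, while a task with no maximum (or a time-interval task) passes this particular check. A hedged sketch of just that upper-bound rule, again with a stand-in enum and an illustrative helper name:

use std::num::NonZeroU32;

// Simplified stand-in for DapQueryConfig.
enum QueryConfig {
    TimeInterval,
    LeaderSelected { max_batch_size: Option<NonZeroU32> },
}

/// Mirrors the upper-bound half of is_report_count_compatible shown above:
/// only a leader-selected task with a configured max_batch_size can fail here.
fn report_count_within_max(query: &QueryConfig, report_count: u64) -> bool {
    match query {
        QueryConfig::LeaderSelected {
            max_batch_size: Some(max),
        } => report_count <= u64::from(max.get()),
        QueryConfig::TimeInterval
        | QueryConfig::LeaderSelected {
            max_batch_size: None,
        } => true,
    }
}

fn main() {
    let q = QueryConfig::LeaderSelected {
        max_batch_size: NonZeroU32::new(100),
    };
    assert!(report_count_within_max(&q, 100)); // at the limit: allowed
    assert!(!report_count_within_max(&q, 150)); // over the limit: rejected
    assert!(report_count_within_max(&QueryConfig::TimeInterval, 1_000_000));
}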
(Diffs for the remaining changed files were not loaded.)
