Implement the aggregation flow as in draft07
In draft07, the Leader sends its prep share in the
AggregationJobInitReq; the Helper computes its prep share, computes the
prep message, and commits. For 1-round VDAFs, the flow is complete after
one request.
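
For a 1-round VDAF, the happy path now reduces to one request/response
pair. A sketch in terms of the `AggregationJobTest` harness used in the
benchmarks below (the return type of the harness's
`handle_agg_job_init_req` is assumed here to mirror the new
`DapHelperAggregationJobTransition`):

    // Leader: compute its prep shares and bundle them into the
    // AggregationJobInitReq along with the report shares.
    let DapLeaderAggregationJobTransition::Continued(leader_state, agg_job_init_req) =
        agg_job_test.produce_agg_job_init_req(reports).await
    else {
        panic!("unexpected transition");
    };

    // Helper: compute its prep shares and the prep messages, commit its
    // output shares, and respond in one shot.
    let DapHelperAggregationJobTransition::Finished(agg_share_span, agg_job_resp) =
        agg_job_test.handle_agg_job_init_req(&agg_job_init_req).await
    else {
        panic!("unexpected transition");
    };

    // Leader: consume `leader_state` and the Helper's response to commit
    // its own output shares.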

The main goal of this change is to implement this new control flow.
Other changes:

* Clean up and unify data structures for aggregation job state.

* Rename the `aggregation_job_continue_repeats_due_to_replays` metric,
  since replays now need to be counted during AggregationJobInitReq
  processing as well.

* Regression: Remove the `handle_agg_job_req_init_expired_task` tests.
  Ideally we'd like to keep these, but it's difficult to do so without
  major refactoring.

* Ensure the Leader handles an `Err` from `try_put_agg_share_span()` as
  fatal (see the sketch after this list).
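
A rough sketch of the error handling intended by the last point (the call
site isn't shown in this diff, so the exact signature of
`try_put_agg_share_span()` and the surrounding names, e.g. `aggregator`,
are illustrative assumptions):

    match aggregator
        .try_put_agg_share_span(&task_id, &task_config, agg_share_span)
        .await
    {
        // Per-report outcomes (e.g., replays) are handled as before.
        Ok(per_report_outcomes) => { /* ... */ }
        // A storage-level error is fatal for the whole aggregation job.
        Err(e) => return Err(e),
    }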

Work-in-progress: The code is not yet wire-compatible with draft07.
cjpatton committed Oct 27, 2023
1 parent d88c55f commit 2834966
Showing 10 changed files with 1,142 additions and 742 deletions.
daphne/Cargo.toml (2 changes: 1 addition & 1 deletion)
@@ -26,7 +26,6 @@ async-trait.workspace = true
 assert_matches = { workspace = true, optional = true }
 base64.workspace = true
 deepsize = { version = "0.2.0", optional = true }
-futures.workspace = true
 hex.workspace = true
 hpke-rs = { workspace = true, features = ["hazmat", "serialization"] }
 hpke-rs-crypto.workspace = true
@@ -44,6 +43,7 @@ url.workspace = true
 [dev-dependencies]
 assert_matches.workspace = true
 criterion.workspace = true
+futures.workspace = true
 matchit.workspace = true
 paste.workspace = true
 regex = "1.10.0"
daphne/benches/aggregation.rs (8 changes: 4 additions & 4 deletions)
@@ -3,8 +3,8 @@
 
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
 use daphne::{
-    hpke::HpkeKemId, testing::AggregationJobTest, DapLeaderTransition, DapMeasurement, DapVersion,
-    Prio3Config, VdafConfig,
+    hpke::HpkeKemId, testing::AggregationJobTest, DapLeaderAggregationJobTransition,
+    DapMeasurement, DapVersion, Prio3Config, VdafConfig,
 };
 
 fn handle_agg_job_init_req(c: &mut Criterion) {
@@ -38,15 +38,15 @@ fn handle_agg_job_init_req(c: &mut Criterion) {
 
     let agg_job_init_req = rt.block_on(async {
         let reports = agg_job_test.produce_reports(vec![measurement; batch_size]);
-        let DapLeaderTransition::Continue(_leader_state, agg_job_init_req) =
+        let DapLeaderAggregationJobTransition::Continued(_leader_state, agg_job_init_req) =
             agg_job_test.produce_agg_job_init_req(reports).await
         else {
             panic!("unexpected transition");
         };
         agg_job_init_req
     });
 
-    c.bench_function(&format!("handle_agg_job_init_req {vdaf:?}"), |b| {
+    c.bench_function(&format!("handle_agg_job_init_req {vdaf}"), |b| {
         b.to_async(&rt).iter(|| async {
             black_box(agg_job_test.handle_agg_job_init_req(&agg_job_init_req)).await
         })
daphne/src/lib.rs (101 changes: 57 additions & 44 deletions)
@@ -561,56 +561,74 @@ pub enum DapAggregateResult {
     U128Vec(Vec<u128>),
 }
 
-/// The Leader's state after sending an AggregateInitReq.
-#[derive(Debug)]
-pub struct DapLeaderState {
-    pub(crate) seq: Vec<(VdafPrepState, VdafPrepMessage, Time, ReportId)>,
-    part_batch_sel: PartialBatchSelector,
+#[derive(Clone)]
+#[cfg_attr(any(test, feature = "test-utils"), derive(Debug, deepsize::DeepSizeOf))]
+pub(crate) struct AggregationJobReportState {
+    // draft02 compatibility: The Leader does not transmit its prep share.
+    draft02_prep_share: Option<VdafPrepMessage>,
+    prep_state: VdafPrepState,
+    time: Time,
+    report_id: ReportId,
 }
 
-/// The Leader's state after sending an AggregateContReq.
-#[derive(Debug)]
-pub struct DapLeaderUncommitted {
-    pub(crate) seq: Vec<(DapOutputShare, ReportId)>,
+/// Aggregator state during an aggregation job.
+#[derive(Clone)]
+#[cfg_attr(any(test, feature = "test-utils"), derive(Debug, deepsize::DeepSizeOf))]
+pub struct DapAggregationJobState {
+    pub(crate) seq: Vec<AggregationJobReportState>,
     part_batch_sel: PartialBatchSelector,
 }
 
-/// The Helper's state during the aggregation flow.
-#[derive(Clone, Debug, PartialEq)]
-#[cfg_attr(any(test, feature = "test-utils"), derive(deepsize::DeepSizeOf))]
-pub struct DapHelperState {
-    pub(crate) part_batch_sel: PartialBatchSelector,
-    pub(crate) seq: Vec<(VdafPrepState, Time, ReportId)>,
+/// Leader state during an aggregation job in which it has computed the output shares but is
+/// waiting for the Helper's response before it commits them.
+#[derive(Debug)]
+pub struct DapAggregationJobUncommitted {
+    pub(crate) seq: Vec<DapOutputShare>,
+    part_batch_sel: PartialBatchSelector,
 }
 
-impl Encode for DapHelperState {
+impl Encode for DapAggregationJobState {
     fn encode(&self, bytes: &mut Vec<u8>) {
         self.part_batch_sel.encode(bytes);
-        for (state, time, report_id) in self.seq.iter() {
-            state.encode(bytes);
-            time.encode(bytes);
-            report_id.encode(bytes);
+        for report_state in self.seq.iter() {
+            if report_state.draft02_prep_share.is_some() {
+                // draft02 compatibility: The prep share is kept in this data structure for
+                // backwards compatibility. It's only used by the Leader, so we don't ever expect
+                // to encode it.
+                unreachable!("Tried to encode DapAggregationJobState with leader prep share");
+            }
+            report_state.prep_state.encode(bytes);
+            report_state.time.encode(bytes);
+            report_state.report_id.encode(bytes);
         }
     }
 }
 
-impl DapHelperState {
+// TODO(cjpatton) Consider replacing this with an implementation of
+// `ParameterizedDecode<VdafConfig>`. This requires changing the wire format to make the sequence
+// of report states length-prefixed.
+impl DapAggregationJobState {
     /// Decode the Helper state from a byte string.
     pub fn get_decoded(vdaf_config: &VdafConfig, data: &[u8]) -> Result<Self, DapError> {
         let mut r = std::io::Cursor::new(data);
         let part_batch_sel = PartialBatchSelector::decode(&mut r)
             .map_err(|e| DapAbort::from_codec_error(e, None))?;
         let mut seq = vec![];
         while (r.position() as usize) < data.len() {
-            let state = VdafPrepState::decode_with_param(&(vdaf_config, false), &mut r)
+            let prep_state = VdafPrepState::decode_with_param(&(vdaf_config, false), &mut r)
                 .map_err(|e| DapAbort::from_codec_error(e, None))?;
             let time = Time::decode(&mut r).map_err(|e| DapAbort::from_codec_error(e, None))?;
             let report_id =
                 ReportId::decode(&mut r).map_err(|e| DapAbort::from_codec_error(e, None))?;
-            seq.push((state, time, report_id))
+            seq.push(AggregationJobReportState {
+                draft02_prep_share: None,
+                prep_state,
+                time,
+                report_id,
+            });
         }
 
-        Ok(DapHelperState {
+        Ok(Self {
             part_batch_sel,
             seq,
         })
@@ -721,31 +739,26 @@ impl DapAggregateShare {
 }
 
 /// Leader state transition during the aggregation flow.
-#[derive(Debug)]
-pub enum DapLeaderTransition<M: Debug> {
-    /// The Leader has produced the next outbound message and its state has been updated.
-    Continue(DapLeaderState, M),
+#[cfg_attr(any(test, feature = "test-utils"), derive(Debug))]
+pub enum DapLeaderAggregationJobTransition<M: Debug> {
+    /// Waiting for a response from the Helper.
+    Continued(DapAggregationJobState, M),
 
-    /// The leader has computed output shares, but is waiting on an AggregateResp from the hepler
-    /// before committing them.
-    Uncommitted(DapLeaderUncommitted, M),
+    /// Output shares computed, but waiting for a response from the Helper before committing.
+    Uncommitted(DapAggregationJobUncommitted, M),
 
-    /// The Leader has completed the aggregation flow without computing an aggregate share.
-    Skip,
+    /// Committed to the output shares.
+    Finished(DapAggregateSpan<DapAggregateShare>),
 }
 
-/// Helper state transition during the aggregation flow.
-#[derive(Debug)]
-pub enum DapHelperTransition<M: Debug> {
-    /// The Helper has produced the next outbound message and its state has been updated.
-    Continue(DapHelperState, M),
+/// Helper state transitions during the aggregation flow.
+#[cfg_attr(any(test, feature = "test-utils"), derive(Debug))]
+pub enum DapHelperAggregationJobTransition<M: Debug> {
+    /// Waiting for a response from the Leader.
+    Continued(DapAggregationJobState, M),
 
-    /// The Helper has produced the last outbound message and has computed a sequence of output
-    /// shares.
-    //
-    // TODO Instead of merging all output shares into a single aggregate share, return a collection
-    // of aggregat shares, each corresponding to a different batch interval.
-    Finish(Vec<DapOutputShare>, M),
+    /// Committed to the output shares.
+    Finished(DapAggregateSpan<DapAggregateShare>, M),
 }
 
 /// Specification of a concrete VDAF.
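
For reference, a minimal sketch of the Helper-state round trip under the
new types (assumptions: `helper_state` is a Helper's
`DapAggregationJobState`, in which `draft02_prep_share` is always `None`;
`vdaf_config` is the task's `VdafConfig`; and `get_encoded()` is the
provided method of the `Encode` trait):

    // Encode the Helper's per-job state for storage between requests. The
    // `Encode` impl panics if a draft02 Leader prep share is present, which
    // never happens for Helper state.
    let bytes = helper_state.get_encoded();

    // Restore the state on the next request. The decoder reads report
    // states until the buffer is exhausted, so it needs the task's VDAF
    // config to size each `VdafPrepState`.
    let restored = DapAggregationJobState::get_decoded(&vdaf_config, &bytes)?;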