Skip to content

Commit

Permalink
Implement Leader async aggregation. (#3564)
Browse files Browse the repository at this point in the history
This change includes unit tests, but no integration tests -- those will
need to come with the Helper async aggregation implementation, as
without it we do not have anything to integration test against.

A few implementation notes:
 * I renamed the report aggregation states to better match their
   functionality (IMO).
 * If the Helper does not provide a retry-after header, the Leader will
   poll each "processing" aggregation job (at most) once per minute.
  • Loading branch information
branlwyd authored Dec 13, 2024
1 parent c26487c commit e4aab44
Show file tree
Hide file tree
Showing 17 changed files with 3,172 additions and 447 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ regex = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
rustls = { workspace = true }
rustls-pemfile = { workspace = true }
retry-after.workspace = true
sec1.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
207 changes: 108 additions & 99 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use janus_core::vdaf::Prio3FixedPointBoundedL2VecSumBitSize;
use janus_core::{
auth_tokens::AuthenticationToken,
hpke::{self, HpkeApplicationInfo, Label},
retries::retry_http_request_notify,
retries::{retry_http_request_notify, HttpResponse},
time::{Clock, DurationExt, IntervalExt, TimeExt},
vdaf::{
new_prio3_sum_vec_field64_multiproof_hmacsha256_aes128, vdaf_application_context,
Expand Down Expand Up @@ -1752,105 +1752,106 @@ impl VdafOps {
C: Clock,
for<'a> A::PrepareState: ParameterizedDecode<(&'a A, usize)>,
{
if let Some(existing_aggregation_job) = tx
let existing_aggregation_job = match tx
.get_aggregation_job::<SEED_SIZE, B, A>(task_id, aggregation_job_id)
.await?
{
if existing_aggregation_job.state() == &AggregationJobState::Deleted {
return Err(datastore::Error::User(
Error::DeletedAggregationJob(*task_id, *aggregation_job_id).into(),
));
}
Some(existing_aggregation_job) => existing_aggregation_job,
None => return Ok(None),
};

if existing_aggregation_job.last_request_hash() != Some(request_hash) {
if let Some(log_forbidden_mutations) = log_forbidden_mutations {
let original_report_ids: Vec<_> = tx
.get_report_aggregations_for_aggregation_job(
vdaf,
&Role::Helper,
task_id,
aggregation_job_id,
)
.await?
.iter()
.map(|ra| *ra.report_id())
.collect();
let mutating_request_report_ids: Vec<_> = req
.prepare_inits()
.iter()
.map(|pi| *pi.report_share().metadata().id())
.collect();
let event = AggregationJobInitForbiddenMutationEvent {
task_id: *task_id,
aggregation_job_id: *aggregation_job_id,
original_request_hash: existing_aggregation_job.last_request_hash(),
original_report_ids,
original_batch_id: format!(
"{:?}",
existing_aggregation_job.partial_batch_identifier()
),
original_aggregation_parameter: existing_aggregation_job
.aggregation_parameter()
.get_encoded()
.map_err(|e| datastore::Error::User(e.into()))?,
mutating_request_hash: Some(request_hash),
mutating_request_report_ids,
mutating_request_batch_id: format!(
"{:?}",
req.batch_selector().batch_identifier()
),
mutating_request_aggregation_parameter: req
.aggregation_parameter()
.to_vec(),
};
let event_id = crate::diagnostic::write_event(
log_forbidden_mutations,
"agg-job-illegal-mutation",
event,
)
.await
.map(|event_id| format!("{event_id:?}"))
.unwrap_or_else(|error| {
tracing::error!(?error, "failed to write hash mismatch event");
"no event id".to_string()
});

tracing::info!(
?event_id,
original_request_hash = existing_aggregation_job
.last_request_hash()
.map(hex::encode),
mutating_request_hash = hex::encode(request_hash),
"request hash mismatch on retried aggregation job request",
);
}
return Err(datastore::Error::User(
Error::ForbiddenMutation {
resource_type: "aggregation job",
identifier: aggregation_job_id.to_string(),
}
.into(),
));
}
if existing_aggregation_job.state() == &AggregationJobState::Deleted {
return Err(datastore::Error::User(
Error::DeletedAggregationJob(*task_id, *aggregation_job_id).into(),
));
}

// This is a repeated request. Send the same response we computed last time.
return Ok(Some(AggregationJobResp::Finished {
prepare_resps: tx
if existing_aggregation_job.last_request_hash() != Some(request_hash) {
if let Some(log_forbidden_mutations) = log_forbidden_mutations {
let original_report_ids: Vec<_> = tx
.get_report_aggregations_for_aggregation_job(
vdaf,
&Role::Helper,
task_id,
aggregation_job_id,
existing_aggregation_job.aggregation_parameter(),
)
.await?
.iter()
.filter_map(ReportAggregation::last_prep_resp)
.cloned()
.collect(),
}));
.map(|ra| *ra.report_id())
.collect();
let mutating_request_report_ids: Vec<_> = req
.prepare_inits()
.iter()
.map(|pi| *pi.report_share().metadata().id())
.collect();
let event = AggregationJobInitForbiddenMutationEvent {
task_id: *task_id,
aggregation_job_id: *aggregation_job_id,
original_request_hash: existing_aggregation_job.last_request_hash(),
original_report_ids,
original_batch_id: format!(
"{:?}",
existing_aggregation_job.partial_batch_identifier()
),
original_aggregation_parameter: existing_aggregation_job
.aggregation_parameter()
.get_encoded()
.map_err(|e| datastore::Error::User(e.into()))?,
mutating_request_hash: Some(request_hash),
mutating_request_report_ids,
mutating_request_batch_id: format!(
"{:?}",
req.batch_selector().batch_identifier()
),
mutating_request_aggregation_parameter: req.aggregation_parameter().to_vec(),
};
let event_id = crate::diagnostic::write_event(
log_forbidden_mutations,
"agg-job-illegal-mutation",
event,
)
.await
.map(|event_id| format!("{event_id:?}"))
.unwrap_or_else(|error| {
tracing::error!(?error, "failed to write hash mismatch event");
"no event id".to_string()
});

tracing::info!(
?event_id,
original_request_hash = existing_aggregation_job
.last_request_hash()
.map(hex::encode),
mutating_request_hash = hex::encode(request_hash),
"request hash mismatch on retried aggregation job request",
);
}
return Err(datastore::Error::User(
Error::ForbiddenMutation {
resource_type: "aggregation job",
identifier: aggregation_job_id.to_string(),
}
.into(),
));
}

Ok(None)
// This is a repeated request. Send the same response we computed last time.
return Ok(Some(AggregationJobResp::Finished {
prepare_resps: tx
.get_report_aggregations_for_aggregation_job(
vdaf,
&Role::Helper,
task_id,
aggregation_job_id,
existing_aggregation_job.aggregation_parameter(),
)
.await?
.iter()
.filter_map(ReportAggregation::last_prep_resp)
.cloned()
.collect(),
}));
}

/// Implements [helper aggregate initialization][1].
Expand Down Expand Up @@ -2202,7 +2203,7 @@ impl VdafOps {
// Helper is not finished. Await the next message from the Leader to advance to
// the next step.
(
ReportAggregationState::WaitingHelper { prepare_state },
ReportAggregationState::HelperContinue { prepare_state },
PrepareStepResult::Continue {
message: outgoing_message,
},
Expand Down Expand Up @@ -2421,22 +2422,24 @@ impl VdafOps {

Box::pin(async move {
// Read existing state.
let (aggregation_job, report_aggregations) = try_join!(
tx.get_aggregation_job::<SEED_SIZE, B, A>(task.id(), &aggregation_job_id),
tx.get_report_aggregations_for_aggregation_job(
let aggregation_job = tx
.get_aggregation_job::<SEED_SIZE, B, A>(task.id(), &aggregation_job_id)
.await?
.ok_or_else(|| {
datastore::Error::User(
Error::UnrecognizedAggregationJob(*task.id(), aggregation_job_id)
.into(),
)
})?;
let report_aggregations = tx
.get_report_aggregations_for_aggregation_job(
vdaf.as_ref(),
&Role::Helper,
task.id(),
&aggregation_job_id,
aggregation_job.aggregation_parameter(),
)
)?;

let aggregation_job = aggregation_job.ok_or_else(|| {
datastore::Error::User(
Error::UnrecognizedAggregationJob(*task.id(), aggregation_job_id)
.into(),
)
})?;
.await?;

// Deleted aggregation jobs cannot be stepped
if *aggregation_job.state() == AggregationJobState::Deleted {
Expand Down Expand Up @@ -3201,6 +3204,12 @@ fn write_task_aggregation_counter<C: Clock>(
task_id: TaskId,
counters: TaskAggregationCounter,
) {
if counters.is_zero() {
// Don't spawn a task or interact with the datastore if doing so won't change the state of
// the datastore.
return;
}

// We write task aggregation counters back in a separate tokio task & datastore transaction,
// so that any slowness induced by writing the counters (e.g. due to transaction retry) does
// not slow the main processing. The lack of transactionality between writing the updated
Expand Down Expand Up @@ -3354,7 +3363,7 @@ async fn send_request_to_helper(
request_body: Option<RequestBody>,
auth_token: &AuthenticationToken,
http_request_duration_histogram: &Histogram<f64>,
) -> Result<Bytes, Error> {
) -> Result<HttpResponse, Error> {
let (auth_header, auth_value) = auth_token.request_authentication();
let domain = Arc::from(url.domain().unwrap_or_default());
let method_str = Arc::from(method.as_str());
Expand Down Expand Up @@ -3383,7 +3392,7 @@ async fn send_request_to_helper(
// Successful response.
Ok(response) => {
timer.finish_attempt("success");
Ok(response.body().clone())
Ok(response)
}

// HTTP-level error.
Expand Down
16 changes: 9 additions & 7 deletions aggregator/src/aggregator/aggregation_job_continue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl VdafOps {
// the report was dropped (if it's not already in an error state) and continue.
if matches!(
report_agg.state(),
ReportAggregationState::WaitingHelper { .. }
ReportAggregationState::HelperContinue { .. }
) {
report_aggregations_to_write.push(WritableReportAggregation::new(
report_agg
Expand All @@ -99,11 +99,11 @@ impl VdafOps {
};

let prep_state = match report_aggregation.state() {
ReportAggregationState::WaitingHelper { prepare_state } => prepare_state.clone(),
ReportAggregationState::WaitingLeader { .. } => {
ReportAggregationState::HelperContinue { prepare_state } => prepare_state.clone(),
ReportAggregationState::LeaderContinue { .. } => {
return Err(datastore::Error::User(
Error::Internal(
"helper encountered unexpected ReportAggregationState::WaitingLeader"
"helper encountered unexpected ReportAggregationState::LeaderContinue"
.to_string(),
)
.into(),
Expand All @@ -128,7 +128,7 @@ impl VdafOps {
// the report was dropped (if it's not already in an error state) and continue.
if matches!(
report_aggregation.state(),
ReportAggregationState::WaitingHelper { .. }
ReportAggregationState::HelperContinue { .. }
) {
report_aggregations_to_write.push(WritableReportAggregation::new(
report_aggregation
Expand Down Expand Up @@ -189,7 +189,7 @@ impl VdafOps {
// state and await the next message from
// the Leader to advance preparation.
PingPongState::Continued(prepare_state) => (
ReportAggregationState::WaitingHelper {
ReportAggregationState::HelperContinue {
prepare_state,
},
None,
Expand Down Expand Up @@ -517,7 +517,7 @@ mod tests {
*prepare_init.report_share().metadata().time(),
0,
None,
ReportAggregationState::WaitingHelper {
ReportAggregationState::HelperContinue {
prepare_state: *transcript.helper_prepare_transitions[0]
.prepare_state(),
},
Expand Down Expand Up @@ -743,6 +743,7 @@ mod tests {
&Role::Helper,
&task_id,
&aggregation_job_id,
&test_case.aggregation_parameter,
)
.await
.unwrap();
Expand Down Expand Up @@ -795,6 +796,7 @@ mod tests {
&Role::Helper,
&task_id,
&aggregation_job_id,
&test_case.aggregation_parameter,
)
.await
.unwrap();
Expand Down
Loading

0 comments on commit e4aab44

Please sign in to comment.