
Remove INITIALIZE and MARK_AGGREGATED from ReportsProcessed #415

Merged: 2 commits, Oct 27, 2023

Changes from all commits
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -35,6 +35,7 @@
 prio = "0.15.3"
 prometheus = "0.13.3"
 rand = "0.8.5"
 reqwest = "0.11.22"
+replace_with = "0.1.7"
 ring = "0.16.20"
 serde = { version = "1.0.188", features = ["derive"] }
 serde_json = "1.0.107"
1 change: 1 addition & 0 deletions daphne/Cargo.toml
@@ -34,6 +34,7 @@
 hpke-rs-rust-crypto.workspace = true
 prio = { workspace = true, features = ["prio2"] }
 prometheus.workspace = true
 rand.workspace = true
+replace_with.workspace = true
 ring.workspace = true
 serde.workspace = true
 serde_json.workspace = true
67 changes: 48 additions & 19 deletions daphne/src/lib.rs
@@ -366,6 +366,32 @@ impl<T> Extend<(DapBatchBucket, (T, Vec<(ReportId, Time)>))> for DapAggregateSpan<T>
     }
 }
 
+impl FromIterator<(DapBatchBucket, (ReportId, Time))> for DapAggregateSpan<()> {
+    fn from_iter<I>(iter: I) -> Self
+    where
+        I: IntoIterator<Item = (DapBatchBucket, (ReportId, Time))>,
+    {
+        let mut this = Self::default();
+        this.extend(iter);
+        this
+    }
+}
+
+impl Extend<(DapBatchBucket, (ReportId, Time))> for DapAggregateSpan<()> {
+    fn extend<I>(&mut self, iter: I)
+    where
+        I: IntoIterator<Item = (DapBatchBucket, (ReportId, Time))>,
+    {
+        for (k, v) in iter {
+            self.span
+                .entry(k)
+                .or_insert_with(|| ((), Vec::new()))
+                .1
+                .push(v);
+        }
+    }
+}
+
 /// Per-task DAP parameters.
 #[derive(Clone, Deserialize, Serialize)]
 pub struct DapTaskConfig {
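Note: the new FromIterator impl simply delegates to Extend, so collect() and incremental extension share one code path, and each (ReportId, Time) pair is grouped under its bucket with a unit () aggregate. Below is a minimal, self-contained sketch of the same idiom; the Bucket, ReportId, and Span names here are illustrative stand-ins, not daphne's API.

use std::collections::HashMap;

// Hypothetical stand-ins for DapBatchBucket / ReportId / Time.
type Bucket = u64;
type ReportId = [u8; 16];
type Time = u64;

#[derive(Default, Debug)]
struct Span {
    span: HashMap<Bucket, ((), Vec<(ReportId, Time)>)>,
}

impl Extend<(Bucket, (ReportId, Time))> for Span {
    fn extend<I: IntoIterator<Item = (Bucket, (ReportId, Time))>>(&mut self, iter: I) {
        for (bucket, id_and_time) in iter {
            // Group each (id, time) pair under its bucket, creating the entry on first use.
            self.span
                .entry(bucket)
                .or_insert_with(|| ((), Vec::new()))
                .1
                .push(id_and_time);
        }
    }
}

impl FromIterator<(Bucket, (ReportId, Time))> for Span {
    fn from_iter<I: IntoIterator<Item = (Bucket, (ReportId, Time))>>(iter: I) -> Self {
        // Delegate to Extend so both construction paths stay in sync.
        let mut this = Self::default();
        this.extend(iter);
        this
    }
}

fn main() {
    let span: Span = [(0, ([0u8; 16], 10)), (0, ([1u8; 16], 12)), (1, ([2u8; 16], 95))]
        .into_iter()
        .collect();
    assert_eq!(span.span[&0].1.len(), 2);
    println!("{span:?}");
}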
@@ -484,32 +510,35 @@ impl DapTaskConfig {
         &self,
         part_batch_sel: &'sel PartialBatchSelector,
         consumed_reports: impl Iterator<Item = &'rep EarlyReportStateConsumed<'rep>>,
-    ) -> Result<HashMap<DapBatchBucket, Vec<&'rep EarlyReportStateConsumed<'rep>>>, DapError> {
+    ) -> Result<DapAggregateSpan<()>, DapError> {
         if !self.query.is_valid_part_batch_sel(part_batch_sel) {
             return Err(fatal_error!(
                 err = "partial batch selector not compatible with task",
             ));
         }
-
-        let mut span: HashMap<_, Vec<_>> = HashMap::new();
-        for consumed_report in consumed_reports.filter(|consumed_report| consumed_report.is_ready())
-        {
-            let bucket = match part_batch_sel {
-                PartialBatchSelector::TimeInterval => DapBatchBucket::TimeInterval {
-                    batch_window: self.quantized_time_lower_bound(consumed_report.metadata().time),
-                },
-                PartialBatchSelector::FixedSizeByBatchId { batch_id } => {
-                    DapBatchBucket::FixedSize {
-                        batch_id: batch_id.clone(),
-                    }
-                }
-            };
-
-            let consumed_reports_per_bucket = span.entry(bucket).or_default();
-            consumed_reports_per_bucket.push(consumed_report);
-        }
-
-        Ok(span)
+        Ok(consumed_reports
+            .filter(|consumed_report| consumed_report.is_ready())
+            .map(|consumed_report| {
+                let bucket = self.bucket_for(part_batch_sel, consumed_report);
+                let metadata = consumed_report.metadata();
+                (bucket, (metadata.id.clone(), metadata.time))
+            })
+            .collect())
+    }
+
+    pub fn bucket_for(
+        &self,
+        part_batch_sel: &PartialBatchSelector,
+        consumed_report: &EarlyReportStateConsumed<'_>,
+    ) -> DapBatchBucket {
+        match part_batch_sel {
+            PartialBatchSelector::TimeInterval => DapBatchBucket::TimeInterval {
+                batch_window: self.quantized_time_lower_bound(consumed_report.metadata().time),
+            },
+            PartialBatchSelector::FixedSizeByBatchId { batch_id } => DapBatchBucket::FixedSize {
+                batch_id: batch_id.clone(),
+            },
+        }
     }
 
     /// Check if the batch size is too small. Returns an error if the report count is too large.
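Note: for PartialBatchSelector::TimeInterval, bucket_for keys the bucket on quantized_time_lower_bound, which rounds the report's timestamp down so that all reports in the same window share a bucket. A rough sketch of that rounding, assuming truncation to a fixed time_precision (the exact behavior of daphne's quantized_time_lower_bound may differ):

// Hypothetical, simplified bucketing: round a timestamp down to the start of
// its batch window, assuming windows are `time_precision` seconds wide.
fn quantized_time_lower_bound(time: u64, time_precision: u64) -> u64 {
    time - (time % time_precision)
}

fn main() {
    let time_precision = 3600; // one-hour windows (illustrative)
    assert_eq!(quantized_time_lower_bound(7305, time_precision), 3600);
    assert_eq!(quantized_time_lower_bound(3599, time_precision), 0);
    // Reports at 7305 and 7200 share a bucket; the one at 3599 does not.
    assert_eq!(
        quantized_time_lower_bound(7305, time_precision),
        quantized_time_lower_bound(7200, time_precision)
    );
}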
60 changes: 30 additions & 30 deletions daphne/src/roles/mod.rs
@@ -9,7 +9,7 @@ mod leader;
 
 use crate::{
     constants::DapMediaType,
-    messages::{BatchSelector, ReportMetadata, TaskId, Time, TransitionFailure},
+    messages::{BatchSelector, ReportMetadata, TaskId, Time},
     taskprov::{self, TaskprovVersion},
     DapAbort, DapError, DapQueryConfig, DapRequest, DapTaskConfig,
 };

@@ -103,34 +103,6 @@ async fn check_batch<S>(
     Ok(())
 }
 
-/// Check for transition failures due to:
-///
-/// * the report having already been processed
-/// * the report having already been collected
-/// * the report not being within time bounds
-///
-/// Returns `Some(TransitionFailure)` if there is a problem,
-/// or `None` if no transition failure occurred.
-pub fn early_metadata_check(
-    metadata: &ReportMetadata,
-    processed: bool,
-    collected: bool,
-    min_time: u64,
-    max_time: u64,
-) -> Option<TransitionFailure> {
-    if processed {
-        Some(TransitionFailure::ReportReplayed)
-    } else if collected {
-        Some(TransitionFailure::BatchCollected)
-    } else if metadata.time < min_time {
-        Some(TransitionFailure::ReportDropped)
-    } else if metadata.time > max_time {
-        Some(TransitionFailure::ReportTooEarly)
-    } else {
-        None
-    }
-}
-
 fn check_request_content_type<S>(
     req: &DapRequest<S>,
     expected: DapMediaType,

@@ -195,7 +167,7 @@ async fn resolve_taskprov<S>(
 
 #[cfg(test)]
 mod test {
-    use super::{early_metadata_check, DapAggregator, DapAuthorizedSender, DapHelper, DapLeader};
+    use super::{DapAggregator, DapAuthorizedSender, DapHelper, DapLeader};
     use crate::{
         assert_metrics_include, async_test_version, async_test_versions,
         auth::BearerToken,

@@ -234,6 +206,34 @@ mod test {
         }};
     }
 
+    /// Check for transition failures due to:
+    ///
+    /// * the report having already been processed
+    /// * the report having already been collected
+    /// * the report not being within time bounds
+    ///
+    /// Returns `Some(TransitionFailure)` if there is a problem,
+    /// or `None` if no transition failure occurred.
+    pub fn early_metadata_check(
+        metadata: &ReportMetadata,
+        processed: bool,
+        collected: bool,
+        min_time: u64,
+        max_time: u64,
+    ) -> Option<TransitionFailure> {
+        if processed {
+            Some(TransitionFailure::ReportReplayed)
+        } else if collected {
+            Some(TransitionFailure::BatchCollected)
+        } else if metadata.time < min_time {
+            Some(TransitionFailure::ReportDropped)
+        } else if metadata.time > max_time {
+            Some(TransitionFailure::ReportTooEarly)
+        } else {
+            None
+        }
+    }
+
     pub(super) struct TestData {
         pub now: Time,
         global_config: DapGlobalConfig,
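Note: the checks in early_metadata_check are ordered, so a replayed report is reported as ReportReplayed even if it is also outside the time window. A small, self-contained usage sketch follows, with stripped-down stand-ins for ReportMetadata and TransitionFailure (the real daphne types carry more fields and variants):

// Stand-ins for daphne's types, reduced to what the check needs.
struct ReportMetadata {
    time: u64,
}

#[derive(Debug, PartialEq)]
enum TransitionFailure {
    ReportReplayed,
    BatchCollected,
    ReportDropped,
    ReportTooEarly,
}

fn early_metadata_check(
    metadata: &ReportMetadata,
    processed: bool,
    collected: bool,
    min_time: u64,
    max_time: u64,
) -> Option<TransitionFailure> {
    if processed {
        Some(TransitionFailure::ReportReplayed)
    } else if collected {
        Some(TransitionFailure::BatchCollected)
    } else if metadata.time < min_time {
        Some(TransitionFailure::ReportDropped)
    } else if metadata.time > max_time {
        Some(TransitionFailure::ReportTooEarly)
    } else {
        None
    }
}

fn main() {
    // Replay takes precedence even when the report is also outside the window.
    assert_eq!(
        early_metadata_check(&ReportMetadata { time: 50 }, true, true, 100, 200),
        Some(TransitionFailure::ReportReplayed)
    );
    // An in-window, unseen report passes.
    assert_eq!(
        early_metadata_check(&ReportMetadata { time: 150 }, false, false, 100, 200),
        None
    );
}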
22 changes: 9 additions & 13 deletions daphne/src/testing.rs
@@ -13,7 +13,7 @@ use crate::{
         AggregationJobContinueReq, AggregationJobId, AggregationJobInitReq, AggregationJobResp,
         BatchId, BatchSelector, Collection, CollectionJobId, CollectionReq,
         Draft02AggregationJobId, HpkeCiphertext, Interval, PartialBatchSelector, Report, ReportId,
-        ReportMetadata, TaskId, Time, TransitionFailure,
+        TaskId, Time, TransitionFailure,
     },
     metrics::DaphneMetrics,
     roles::{DapAggregator, DapAuthorizedSender, DapHelper, DapLeader, DapReportInitializer},

@@ -697,7 +697,7 @@ impl MockAggregator {
         &self,
         task_id: &TaskId,
         bucket: &DapBatchBucket,
-        metadata: &ReportMetadata,
+        id: &ReportId,
     ) -> Option<TransitionFailure> {
         // Check AggStateStore to see whether the report is part of a batch that has already
        // been collected.

@@ -713,7 +713,7 @@
             .lock()
             .expect("report_store: failed to lock");
         let report_store = guard.entry(task_id.clone()).or_default();
-        if report_store.processed.contains(&metadata.id) {
+        if report_store.processed.contains(id) {
             return Some(TransitionFailure::ReportReplayed);
         }

@@ -920,17 +920,13 @@ impl DapReportInitializer for MockAggregator {
         )?;
 
         let mut early_fails = HashMap::new();
-        for (bucket, reports_consumed_per_bucket) in span.iter() {
-            for metadata in reports_consumed_per_bucket
-                .iter()
-                .map(|report| report.metadata())
-            {
+        for (bucket, ((), report_ids_and_time)) in span.iter() {
+            for (id, _) in report_ids_and_time {
                 // Check whether Report has been collected or replayed.
-                if let Some(transition_failure) = self
-                    .check_report_early_fail(task_id, bucket, metadata)
-                    .await
+                if let Some(transition_failure) =
+                    self.check_report_early_fail(task_id, bucket, id).await
                 {
-                    early_fails.insert(metadata.id.clone(), transition_failure);
+                    early_fails.insert(id.clone(), transition_failure);
                 };
             }
         }

@@ -1233,7 +1229,7 @@ impl DapLeader<BearerToken> for MockAggregator {
 
         // Check whether Report has been collected or replayed.
         if let Some(transition_failure) = self
-            .check_report_early_fail(task_id, &bucket, &report.report_metadata)
+            .check_report_early_fail(task_id, &bucket, &report.report_metadata.id)
             .await
         {
             return Err(DapError::Transition(transition_failure));
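Note: narrowing check_report_early_fail to a &ReportId makes explicit that the replay check only ever consults the report's ID. A minimal sketch of such a replay check against a set of processed IDs (hypothetical stand-ins, not MockAggregator's full collected-batch logic):

use std::collections::HashSet;

// Hypothetical stand-in for daphne's ReportId.
type ReportId = [u8; 16];

#[derive(Debug, PartialEq)]
enum TransitionFailure {
    ReportReplayed,
}

#[derive(Default)]
struct ReportStore {
    processed: HashSet<ReportId>,
}

impl ReportStore {
    // Mirror of the replay check: only the report's ID is consulted.
    fn check_replay(&self, id: &ReportId) -> Option<TransitionFailure> {
        if self.processed.contains(id) {
            return Some(TransitionFailure::ReportReplayed);
        }
        None
    }
}

fn main() {
    let mut store = ReportStore::default();
    let id = [7u8; 16];
    assert_eq!(store.check_replay(&id), None);
    store.processed.insert(id);
    assert_eq!(store.check_replay(&id), Some(TransitionFailure::ReportReplayed));
}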
27 changes: 27 additions & 0 deletions daphne/src/vdaf/mod.rs
@@ -42,6 +42,7 @@ use prio::{
     },
 };
 use rand::prelude::*;
+use replace_with::replace_with_or_abort;
 use serde::{Deserialize, Serialize, Serializer};
 use std::{
     borrow::Cow,

@@ -185,6 +186,20 @@ impl<'req> EarlyReportStateConsumed<'req> {
             input_share: input_share.payload,
         })
     }
 
+    /// Convert this `EarlyReportStateConsumed` into a rejected
+    /// [`EarlyReportStateInitialized`] using `failure` as the reason. If this is
+    /// already a rejected report, the passed-in `failure` value overwrites the
+    /// previous one.
+    pub fn into_initialized_rejected_due_to(
+        self,
+        failure: TransitionFailure,
+    ) -> EarlyReportStateInitialized<'req> {
+        let metadata = match self {
+            Self::Ready { metadata, .. } => metadata,
+            Self::Rejected { metadata, .. } => metadata,
+        };
+        EarlyReportStateInitialized::Rejected { metadata, failure }
+    }
 }
 
 impl EarlyReportState for EarlyReportStateConsumed<'_> {

Review thread on into_initialized_rejected_due_to:

Contributor: Add doc comment.

Contributor: If self was Self::Rejected, then we'll end up overwriting the failure here. Is this intended behavior? If so, we should document why this is the behavior we want. My gut feeling is we probably don't want to override the failure reason, but I could be wrong.

Collaborator (author): I think we should override, principle of least surprise: if I pass a parameter, I expect it to be used.
@@ -307,6 +322,18 @@ impl<'req> EarlyReportStateInitialized<'req> {
         };
         Ok(early_report_state_initialized)
     }
 
+    /// Turn this report into a rejected report using `failure` as the reason for its
+    /// rejection.
+    pub fn reject_due_to(&mut self, failure: TransitionFailure) {
+        // This never aborts because the closure never panics.
+        replace_with_or_abort(self, |self_| {
+            let metadata = match self_ {
+                Self::Rejected { metadata, .. } => metadata,
+                Self::Ready { metadata, .. } => metadata,
+            };
+            Self::Rejected { metadata, failure }
+        })
+    }
 }
 
 impl EarlyReportState for EarlyReportStateInitialized<'_> {

Contributor: Here again, just want to double-check that overwriting the failure reason in case it was already rejected is intended.
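Note: reject_due_to must move metadata out of *self while holding only &mut self, which safe Rust ordinarily forbids; replace_with_or_abort bridges the gap by temporarily taking the value and aborting the process if the closure panics (hence the comment that the closure never panics). A standalone sketch of the same state-transition pattern on a toy enum, using the real replace_with crate (the enum and its fields here are illustrative):

use replace_with::replace_with_or_abort;

#[derive(Debug)]
enum State {
    Ready { payload: String },
    Rejected { payload: String, reason: u8 },
}

impl State {
    fn reject(&mut self, reason: u8) {
        // Take `self` by value inside the closure so `payload` can be moved,
        // not cloned, into the new variant. The closure must not panic.
        replace_with_or_abort(self, |this| {
            let payload = match this {
                State::Ready { payload } => payload,
                State::Rejected { payload, .. } => payload,
            };
            State::Rejected { payload, reason }
        });
    }
}

fn main() {
    let mut s = State::Ready { payload: "report".into() };
    s.reject(3);
    // A second rejection overwrites the reason, matching the override
    // semantics settled on in the review thread above.
    s.reject(7);
    println!("{s:?}"); // Rejected { payload: "report", reason: 7 }
}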