Implement draft-ietf-ppm-dap-taskprov-01. #3509

Merged
merged 2 commits on Nov 27, 2024
1 change: 1 addition & 0 deletions Cargo.lock


2 changes: 1 addition & 1 deletion aggregator/Cargo.toml
@@ -110,7 +110,7 @@ url.workspace = true
uuid = { workspace = true, features = ["serde"] }

[dev-dependencies]
janus_aggregator = { path = ".", features = ["fpvec_bounded_l2", "test-util"] }
janus_aggregator = { workspace = true, features = ["fpvec_bounded_l2", "test-util"] }
janus_aggregator_core = { workspace = true, features = ["test-util"] }
mockito = { workspace = true }
opentelemetry_sdk = { workspace = true, features = ["testing"] }
49 changes: 16 additions & 33 deletions aggregator/src/aggregator.rs
@@ -47,7 +47,7 @@ use janus_aggregator_core::{
},
Datastore, Error as DatastoreError, Transaction,
},
task::{self, AggregatorTask, VerifyKey},
task::{self, AggregatorTask, BatchMode, VerifyKey},
taskprov::PeerAggregator,
};
#[cfg(feature = "fpvec_bounded_l2")]
@@ -65,7 +65,7 @@ use janus_core::{
};
use janus_messages::{
batch_mode::{LeaderSelected, TimeInterval},
taskprov::{DpMechanism, TaskConfig},
taskprov::TaskConfig,
AggregateShare, AggregateShareAad, AggregateShareReq, AggregationJobContinueReq,
AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, AggregationJobStep,
BatchSelector, Collection, CollectionJobId, CollectionReq, Duration, ExtensionType, HpkeConfig,
@@ -691,44 +691,26 @@ impl<C: Clock> Aggregator<C> {
// TODO(#1647): Check whether task config parameters are acceptable for privacy and
// availability of the system.

if let DpMechanism::Unrecognized { .. } =
task_config.vdaf_config().dp_config().dp_mechanism()
{
if !self
.cfg
.taskprov_config
.ignore_unknown_differential_privacy_mechanism
{
return Err(Error::InvalidTask(
*task_id,
OptOutReason::InvalidParameter("unrecognized DP mechanism".into()),
));
}
}

let vdaf_instance =
task_config
.vdaf_config()
.vdaf_type()
.try_into()
.map_err(|err: &str| {
Error::InvalidTask(*task_id, OptOutReason::InvalidParameter(err.to_string()))
})?;
let vdaf_instance = task_config.vdaf_config().try_into().map_err(|err: &str| {
Error::InvalidTask(*task_id, OptOutReason::InvalidParameter(err.to_string()))
})?;

let vdaf_verify_key = peer_aggregator.derive_vdaf_verify_key(task_id, &vdaf_instance);

let task_end = task_config.task_start().add(task_config.task_duration())?;

let task = Arc::new(
AggregatorTask::new(
*task_id,
leader_url,
task_config.query_config().query().try_into()?,
BatchMode::try_from(*task_config.batch_mode())?,
vdaf_instance,
vdaf_verify_key,
None, // TODO(#3636): update taskprov implementation to specify task start
Some(*task_config.task_end()),
Some(*task_config.task_start()),
Some(task_end),
peer_aggregator.report_expiry_age().cloned(),
task_config.query_config().min_batch_size() as u64,
*task_config.query_config().time_precision(),
u64::from(*task_config.min_batch_size()),
*task_config.time_precision(),
*peer_aggregator.tolerable_clock_skew(),
task::AggregatorTaskParameters::TaskprovHelper,
)
@@ -795,7 +777,8 @@ impl<C: Clock> Aggregator<C> {
return Err(Error::UnauthorizedRequest(*task_id));
}

if self.clock.now() > *task_config.task_end() {
let task_end = task_config.task_start().add(task_config.task_duration())?;
if self.clock.now() > task_end {
Collaborator:
I think we should be checking report timestamps against task_start + task_duration. The spec requirement is "Aggregators MUST reject reports that have timestamps later than the end time, and MAY choose to opt out of the task if task_duration is too long." However, that means we can't take care of this check in this function, and instead would have to handle it separately in the aggregation endpoint and upload endpoint (if we ever implement it).

Contributor Author:
This code is implementing the requirement that "A protocol participant MUST opt out if ... [t]he task has ended."[1] I think using the server's clock, rather than the timestamp from a given report, is appropriate for this specific check.

You're correct that we should also be checking each report's timestamp against the end (and start) of the task, and we MUST reject reports that don't fall into the acceptable time interval. Those checks are implemented in handle_upload_generic.

Interestingly, I don't think the equivalent Helper-side timestamp check has ever been implemented, though we do implement checking that the report isn't from too far in the future. I filed #3524 for this, arguing that it is not mandatory. Still, it's a small change, so I'll address it if I have time.

[1] https://www.ietf.org/archive/id/draft-ietf-ppm-dap-taskprov-01.html#name-opting-into-a-task
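The per-report timestamp rule discussed in this thread can be sketched standalone. This is a minimal illustration, not the janus implementation: times are modeled as plain seconds since the Unix epoch, whereas the real code uses the `janus_messages` time types, and the helper name is hypothetical.

```rust
// A report's timestamp must fall within [task_start, task_start + task_duration):
// reports before the task starts, or at/after the task end, are rejected.
fn report_timestamp_in_task_interval(
    report_time: u64,
    task_start: u64,
    task_duration: u64,
) -> bool {
    match task_start.checked_add(task_duration) {
        // Accept only timestamps inside the half-open task interval.
        Some(task_end) => report_time >= task_start && report_time < task_end,
        // Overflow computing the end time: treat the interval as invalid.
        None => false,
    }
}

fn main() {
    assert!(report_timestamp_in_task_interval(1_000, 900, 200));
    assert!(!report_timestamp_in_task_interval(1_100, 900, 200)); // exactly at task_end
    assert!(!report_timestamp_in_task_interval(800, 900, 200)); // before task_start
    println!("ok");
}
```

Note that this check is per-report and uses the report's own timestamp; the opt-out check in the diff above instead compares the server's current clock against the task end, which is the appropriate input for deciding whether to join the task at all.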

return Err(Error::InvalidTask(*task_id, OptOutReason::TaskEnded));
}

@@ -2098,7 +2081,7 @@ impl VdafOps {

if require_taskprov_extension {
let valid_taskprov_extension_present = extensions
.get(&ExtensionType::Taskprov)
.get(&ExtensionType::Taskbind)
.map(|data| data.is_empty())
.unwrap_or(false);
if !valid_taskprov_extension_present {
@@ -2117,7 +2100,7 @@
);
return Err(ReportError::InvalidMessage);
}
} else if extensions.contains_key(&ExtensionType::Taskprov) {
} else if extensions.contains_key(&ExtensionType::Taskbind) {
// taskprov not enabled, but the taskprov extension is present.
debug!(
task_id = %task.id(),
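The extension check in the hunk above (renamed here from `Taskprov` to `Taskbind` to match the -01 draft) requires that a taskprov-provisioned report carry the Taskbind extension with an empty payload. A dependency-free sketch of that rule, using a simplified stand-in for the `janus_messages` `ExtensionType` enum:

```rust
use std::collections::HashMap;

// Simplified stand-in for janus_messages::ExtensionType.
#[derive(PartialEq, Eq, Hash)]
#[allow(dead_code)]
enum ExtensionType {
    Taskbind,
    Other(u16),
}

// True only if the Taskbind extension is present AND its payload is empty,
// mirroring the .get(...).map(|data| data.is_empty()).unwrap_or(false) chain
// in the diff above.
fn has_valid_taskbind_extension(extensions: &HashMap<ExtensionType, Vec<u8>>) -> bool {
    extensions
        .get(&ExtensionType::Taskbind)
        .map(|data| data.is_empty())
        .unwrap_or(false)
}

fn main() {
    let mut extensions: HashMap<ExtensionType, Vec<u8>> = HashMap::new();
    assert!(!has_valid_taskbind_extension(&extensions)); // absent: rejected

    extensions.insert(ExtensionType::Taskbind, Vec::new());
    assert!(has_valid_taskbind_extension(&extensions)); // present and empty: accepted

    extensions.insert(ExtensionType::Taskbind, vec![0u8]);
    assert!(!has_valid_taskbind_extension(&extensions)); // non-empty payload: rejected
    println!("ok");
}
```

The converse branch in the diff handles the non-taskprov case: if taskprov is not in use but the Taskbind extension appears anyway, the report is rejected as an invalid message.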
2 changes: 1 addition & 1 deletion aggregator/src/aggregator/aggregate_init_tests.rs
@@ -401,7 +401,7 @@ async fn aggregation_job_init_unexpected_taskprov_extension() {
.prepare_init_generator
.clone()
.with_private_extensions(Vec::from([Extension::new(
ExtensionType::Taskprov,
ExtensionType::Taskbind,
Vec::new(),
)]))
.next(&0)
66 changes: 32 additions & 34 deletions aggregator/src/aggregator/http_handlers.rs
@@ -6,7 +6,7 @@ use super::{
use crate::aggregator::problem_details::{ProblemDetailsConnExt, ProblemDocument};
use async_trait::async_trait;
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use janus_aggregator_core::{datastore::Datastore, instrumented};
use janus_aggregator_core::{datastore::Datastore, instrumented, taskprov::taskprov_task_id};
use janus_core::{
auth_tokens::{AuthenticationToken, DAP_AUTH_HEADER},
http::extract_bearer_token,
@@ -25,10 +25,9 @@
KeyValue,
};
use prio::codec::Encode;
use ring::digest::{digest, SHA256};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::{borrow::Cow, time::Duration as StdDuration};
use std::{io::Cursor, sync::Arc};
use tracing::warn;
use trillium::{Conn, Handler, KnownHeaderName, Status};
use trillium_api::{api, State, TryFromConn};
@@ -757,38 +756,37 @@ fn parse_taskprov_header<C: Clock>(
task_id: &TaskId,
conn: &Conn,
) -> Result<Option<TaskConfig>, Error> {
if aggregator.cfg.taskprov_config.enabled {
match conn.request_headers().get(TASKPROV_HEADER) {
Some(taskprov_header) => {
let task_config_encoded =
&URL_SAFE_NO_PAD.decode(taskprov_header).map_err(|_| {
Error::InvalidMessage(
Some(*task_id),
"taskprov header could not be decoded",
)
})?;

if task_id.as_ref() != digest(&SHA256, task_config_encoded).as_ref() {
Err(Error::InvalidMessage(
Some(*task_id),
"derived taskprov task ID does not match task config",
))
} else {
// TODO(#1684): Parsing the taskprov header like this before we've been able
// to actually authenticate the client is undesireable. We should rework this
// such that the authorization header is handled before parsing the untrusted
// input.
Ok(Some(
TaskConfig::decode(&mut Cursor::new(task_config_encoded))
.map_err(Error::MessageDecode)?,
))
}
}
None => Ok(None),
}
} else {
Ok(None)
if !aggregator.cfg.taskprov_config.enabled {
return Ok(None);
}

let taskprov_header = match conn.request_headers().get(TASKPROV_HEADER) {
Some(taskprov_header) => taskprov_header,
None => return Ok(None),
};

let task_config_encoded = URL_SAFE_NO_PAD.decode(taskprov_header).map_err(|_| {
Error::InvalidMessage(
Some(*task_id),
"taskprov header could not be base64-decoded",
)
})?;

// Compute expected task ID & verify it matches the task ID from the request.
let expected_task_id = taskprov_task_id(&task_config_encoded);
if task_id != &expected_task_id {
return Err(Error::InvalidMessage(
Some(*task_id),
"derived taskprov task ID does not match task config",
));
}

// TODO(#1684): Parsing the taskprov header like this before we've been able to actually
// authenticate the client is undesireable. We should rework this such that the authorization
// header is handled before parsing the untrusted input.
Ok(Some(
TaskConfig::get_decoded(&task_config_encoded).map_err(Error::MessageDecode)?,
))
}

struct BodyBytes(Vec<u8>);
@@ -164,10 +164,7 @@ async fn hpke_config_with_taskprov() {
.unwrap();

let cfg = Config {
taskprov_config: TaskprovConfig {
enabled: true,
ignore_unknown_differential_privacy_mechanism: false,
},
taskprov_config: TaskprovConfig { enabled: true },
hpke_config_signing_key: Some(hpke_config_signing_key()),
..Default::default()
};