Skip to content

Commit

Permalink
Replace DapSender with DapRole
Browse files Browse the repository at this point in the history
  • Loading branch information
mendess committed Nov 27, 2024
1 parent d310786 commit 61db156
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 48 deletions.
13 changes: 7 additions & 6 deletions crates/daphne-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use std::sync::Arc;
use config::{DaphneServiceConfig, PeerBearerToken};
use daphne::{
audit_log::{AuditLog, NoopAuditLog},
constants::DapRole,
fatal_error,
messages::{Base64Encode, TaskId},
roles::{leader::in_memory_leader::InMemoryLeaderState, DapAggregator},
DapError, DapSender,
DapError,
};
use daphne_service_utils::bearer_token::BearerToken;
use either::Either::{self, Left, Right};
Expand Down Expand Up @@ -114,7 +115,7 @@ impl router::DaphneService for App {
async fn check_bearer_token(
&self,
presented_token: &BearerToken,
sender: DapSender,
sender: DapRole,
task_id: TaskId,
is_taskprov: bool,
) -> Result<(), Either<String, DapError>> {
Expand All @@ -131,16 +132,16 @@ impl router::DaphneService for App {
.filter(|_| self.service_config.taskprov.is_some() && is_taskprov)
{
match (&taskprov.peer_auth, sender) {
(PeerBearerToken::Leader { expected_token }, DapSender::Leader)
| (PeerBearerToken::Collector { expected_token }, DapSender::Collector)
(PeerBearerToken::Leader { expected_token }, DapRole::Leader)
| (PeerBearerToken::Collector { expected_token }, DapRole::Collector)
if expected_token == presented_token =>
{
Ok(())
}
(PeerBearerToken::Leader { .. }, DapSender::Collector) => Err(Right(fatal_error!(
(PeerBearerToken::Leader { .. }, DapRole::Collector) => Err(Right(fatal_error!(
err = "expected a leader sender but got a collector sender"
))),
(PeerBearerToken::Collector { .. }, DapSender::Leader) => Err(Right(fatal_error!(
(PeerBearerToken::Collector { .. }, DapRole::Leader) => Err(Right(fatal_error!(
err = "expected a collector sender but got a leader sender"
))),
_ => reject(format_args!("using taskprov")),
Expand Down
4 changes: 2 additions & 2 deletions crates/daphne-server/src/roles/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{borrow::Cow, time::Instant};

use axum::{async_trait, http::Method};
use daphne::{
constants::DapMediaType,
constants::{DapMediaType, DapRole},
error::DapAbort,
fatal_error,
messages::{BatchId, BatchSelector, Collection, CollectionJobId, Report, TaskId},
Expand Down Expand Up @@ -171,7 +171,7 @@ impl crate::App {
}
} else if let Some(bearer_token) = self
.bearer_tokens()
.get(daphne::DapSender::Leader, meta.task_id)
.get(DapRole::Leader, meta.task_id)
.await
.map_err(|e| fatal_error!(err = ?e, "failed to get leader bearer token"))?
{
Expand Down
22 changes: 11 additions & 11 deletions crates/daphne-server/src/roles/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

use daphne::{messages::TaskId, DapSender, ReplayProtection};
use daphne::{constants::DapRole, messages::TaskId, ReplayProtection};
use daphne_service_utils::bearer_token::BearerToken;

use crate::storage_proxy_connection::{
Expand Down Expand Up @@ -54,12 +54,12 @@ impl BearerTokens<'_> {
#[cfg(feature = "test-utils")]
pub async fn put_if_not_exists(
&self,
role: DapSender,
sender: DapRole,
task_id: TaskId,
token: BearerToken,
) -> Result<Option<BearerToken>, storage_proxy_connection::Error> {
self.kv
.put_if_not_exists::<kv::prefix::KvBearerToken>(&(role, task_id).into(), token)
.put_if_not_exists::<kv::prefix::KvBearerToken>(&(sender, task_id).into(), token)
.await
}

Expand All @@ -72,13 +72,13 @@ impl BearerTokens<'_> {
/// - `Err(error)` if any io error occurs while fetching
pub async fn matches(
&self,
role: DapSender,
sender: DapRole,
task_id: TaskId,
token: &BearerToken,
) -> Result<bool, Marc<storage_proxy_connection::Error>> {
self.kv
.peek::<kv::prefix::KvBearerToken, _, _>(
&(role, task_id).into(),
&(sender, task_id).into(),
&kv::KvGetOptions {
cache_not_found: false,
},
Expand All @@ -90,12 +90,12 @@ impl BearerTokens<'_> {

pub async fn get(
&self,
role: DapSender,
sender: DapRole,
task_id: TaskId,
) -> Result<Option<BearerToken>, Marc<storage_proxy_connection::Error>> {
self.kv
.get_cloned::<kv::prefix::KvBearerToken>(
&(role, task_id).into(),
&(sender, task_id).into(),
&kv::KvGetOptions {
cache_not_found: false,
},
Expand All @@ -107,13 +107,13 @@ impl BearerTokens<'_> {
#[cfg(feature = "test-utils")]
mod test_utils {
use daphne::{
constants::DapAggregatorRole,
constants::{DapAggregatorRole, DapRole},
fatal_error,
hpke::{HpkeConfig, HpkeReceiverConfig},
messages::decode_base64url_vec,
roles::DapAggregator,
vdaf::{Prio3Config, VdafConfig},
DapError, DapQueryConfig, DapSender, DapTaskConfig, DapVersion,
DapError, DapQueryConfig, DapTaskConfig, DapVersion,
};
use daphne_service_utils::{
bearer_token::BearerToken,
Expand Down Expand Up @@ -236,7 +236,7 @@ mod test_utils {
let token = BearerToken::from(cmd.leader_authentication_token);
if self
.bearer_tokens()
.put_if_not_exists(DapSender::Leader, cmd.task_id, token)
.put_if_not_exists(DapRole::Leader, cmd.task_id, token)
.await
.is_err()
{
Expand All @@ -252,7 +252,7 @@ mod test_utils {
let token = BearerToken::from(token_string);
if self
.bearer_tokens()
.put_if_not_exists(DapSender::Collector, cmd.task_id, token)
.put_if_not_exists(DapRole::Collector, cmd.task_id, token)
.await
.is_err()
{
Expand Down
16 changes: 8 additions & 8 deletions crates/daphne-server/src/router/extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,12 @@ pub mod dap_sender {
pub const FROM_HELPER: DapSender = 1 << 2;
pub const FROM_LEADER: DapSender = 1 << 3;

pub const fn to_enum(id: DapSender) -> daphne::DapSender {
pub const fn to_enum(id: DapSender) -> daphne::constants::DapRole {
match id {
FROM_CLIENT => daphne::DapSender::Client,
FROM_COLLECTOR => daphne::DapSender::Collector,
FROM_HELPER => daphne::DapSender::Helper,
FROM_LEADER => daphne::DapSender::Leader,
FROM_CLIENT => daphne::constants::DapRole::Client,
FROM_COLLECTOR => daphne::constants::DapRole::Collector,
FROM_HELPER => daphne::constants::DapRole::Helper,
FROM_LEADER => daphne::constants::DapRole::Leader,
_ => panic!("invalid dap sender. Please specify a valid dap_sender from the crate::extractor::dap_sender module"),
}
}
Expand Down Expand Up @@ -344,14 +344,14 @@ mod test {
};
use daphne::{
async_test_versions,
constants::DapMediaType,
constants::{DapMediaType, DapRole},
messages::{
request::{CollectionPollReq, RequestBody},
taskprov::TaskprovAdvertisement,
AggregationJobId, AggregationJobInitReq, Base64Encode, CollectionJobId, CollectionReq,
TaskId,
},
DapError, DapRequest, DapRequestMeta, DapSender, DapVersion,
DapError, DapRequest, DapRequestMeta, DapVersion,
};
use daphne_service_utils::{bearer_token::BearerToken, http_headers};
use either::Either::{self, Left};
Expand Down Expand Up @@ -409,7 +409,7 @@ mod test {
async fn check_bearer_token(
&self,
token: &BearerToken,
_sender: DapSender,
_sender: DapRole,
_task_id: TaskId,
_is_taskprov: bool,
) -> Result<(), Either<String, DapError>> {
Expand Down
11 changes: 7 additions & 4 deletions crates/daphne-server/src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ use axum::{
Json,
};
use daphne::{
constants::DapAggregatorRole, error::DapAbort, fatal_error, messages::TaskId, DapError,
DapRequestMeta, DapResponse, DapSender,
constants::{DapAggregatorRole, DapRole},
error::DapAbort,
fatal_error,
messages::TaskId,
DapError, DapRequestMeta, DapResponse,
};
use daphne_service_utils::bearer_token::BearerToken;
use either::Either;
Expand Down Expand Up @@ -49,7 +52,7 @@ pub trait DaphneService {
async fn check_bearer_token(
&self,
presented_token: &BearerToken,
sender: DapSender,
sender: DapRole,
task_id: TaskId,
is_taskprov: bool,
) -> Result<(), Either<String, DapError>>;
Expand All @@ -74,7 +77,7 @@ where
async fn check_bearer_token(
&self,
presented_token: &BearerToken,
sender: DapSender,
sender: DapRole,
task_id: TaskId,
is_taskprov: bool,
) -> Result<(), Either<String, DapError>> {
Expand Down
17 changes: 9 additions & 8 deletions crates/daphne-server/src/storage_proxy_connection/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ pub mod prefix {
};

use daphne::{
constants::DapRole,
messages::{Base64Encode, TaskId},
taskprov, DapSender, DapTaskConfig, DapVersion,
taskprov, DapTaskConfig, DapVersion,
};
use daphne_service_utils::bearer_token::BearerToken;
use serde::{de::DeserializeOwned, Serialize};
Expand Down Expand Up @@ -134,9 +135,9 @@ pub mod prefix {
}

#[derive(Debug)]
pub struct KvBearerTokenKey(DapSender, TaskId);
impl From<(DapSender, TaskId)> for KvBearerTokenKey {
fn from((s, t): (DapSender, TaskId)) -> Self {
pub struct KvBearerTokenKey(DapRole, TaskId);
impl From<(DapRole, TaskId)> for KvBearerTokenKey {
fn from((s, t): (DapRole, TaskId)) -> Self {
Self(s, t)
}
}
Expand All @@ -145,10 +146,10 @@ pub mod prefix {
let Self(sender, task_id) = self;
let task_id = task_id.to_base64url();
match sender {
DapSender::Client => write!(f, "client/task/{task_id}"),
DapSender::Collector => write!(f, "collector/task/{task_id}"),
DapSender::Helper => write!(f, "helper/task/{task_id}"),
DapSender::Leader => write!(f, "leader/task/{task_id}"),
DapRole::Client => write!(f, "client/task/{task_id}"),
DapRole::Collector => write!(f, "collector/task/{task_id}"),
DapRole::Helper => write!(f, "helper/task/{task_id}"),
DapRole::Leader => write!(f, "leader/task/{task_id}"),
}
}
}
Expand Down
9 changes: 0 additions & 9 deletions crates/daphne/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -963,15 +963,6 @@ impl DapAggregateShare {
}
}

/// DAP sender role.
#[derive(Clone, Copy, Debug)]
pub enum DapSender {
Client,
Collector,
Helper,
Leader,
}

/// Status of a collect job.
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
Expand Down

0 comments on commit 61db156

Please sign in to comment.