Commit

Add CPU offload server
mendess committed Jan 15, 2025
1 parent 7c5209a commit b01c4e4
Showing 29 changed files with 1,389 additions and 384 deletions.
733 changes: 409 additions & 324 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions crates/daphne-server/Cargo.toml
@@ -1,4 +1,4 @@
-# Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
+# Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause

[package]
@@ -17,7 +17,7 @@ all-features = true

[dependencies]
daphne = { path = "../daphne" }
-daphne-service-utils = { path = "../daphne-service-utils", features = ["durable_requests"] }
+daphne-service-utils = { path = "../daphne-service-utils", features = ["durable_requests", "compute-offload"] }
either.workspace = true
futures.workspace = true
hex.workspace = true
@@ -33,6 +33,7 @@ tokio.workspace = true
tower = { workspace = true, features = ["util"] }
tracing.workspace = true
url.workspace = true
+rayon.workspace = true

[dependencies.axum]
workspace = true
7 changes: 7 additions & 0 deletions crates/daphne-server/docker/example-service.Dockerfile
@@ -31,3 +31,10 @@ RUN sed -i 's/localhost/leader_storage/g' configuration.toml
COPY --from=builder /dap/target/debug/examples/service .

ENTRYPOINT ["./service"]

+FROM debian:bookworm AS compute-offload
+
+COPY ./crates/daphne-server/examples/configuration-cpu-offload.toml configuration.toml
+COPY --from=builder /dap/target/debug/examples/service .
+
+ENTRYPOINT ["./service"]
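
Not part of the diff, but as a usage note: the new stage can be selected with Docker's --target flag, building from the repository root as the COPY paths above assume (the image tag below is just an illustrative name):

docker build --target compute-offload -f crates/daphne-server/docker/example-service.Dockerfile -t daphne-compute-offload .
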
34 changes: 34 additions & 0 deletions crates/daphne-server/examples/configuration-cpu-offload.toml
@@ -0,0 +1,34 @@
# Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause

port = 5000

# None of these settings are relevant and can be deleted later when
# daphne-server stops being an aggregator
[storage_proxy]
url = "http://localhost:4001"
# SECRET: This is a test secret. In production, we'll generate and securely provision the token.
auth_token = 'this-is-the-storage-proxy-auth-token'

[service]
env = "oxy"
role = "helper"
max_batch_duration = 360000
min_batch_interval_start = 259200
max_batch_interval_end = 259200
supported_hpke_kems = ["x25519_hkdf_sha256"]
default_version = "v09"
report_storage_epoch_duration = 300000
base_url = "http://127.0.0.1:8788"
default_num_agg_span_shards = 4

[service.taskprov]
peer_auth.leader.expected_token = "I-am-the-leader" # SECRET
vdaf_verify_key_init = "b029a72fa327931a5cb643dcadcaafa098fcbfac07d990cb9e7c9a8675fafb18" # SECRET
hpke_collector_config = """{
"id": 23,
"kem_id": "p256_hkdf_sha256",
"kdf_id": "hkdf_sha256",
"aead_id": "aes128_gcm",
"public_key": "047dab625e0d269abcc28c611bebf5a60987ddf7e23df0e0aa343e5774ad81a1d0160d9252b82b4b5c52354205f5ec945645cb79facff8d85c9c31b490cdf35466"
}"""
4 changes: 2 additions & 2 deletions crates/daphne-server/src/roles/helper.rs
@@ -10,12 +10,12 @@ use daphne::{

#[async_trait]
impl DapHelper for crate::App {
-async fn assert_agg_job_is_immutable(
+async fn assert_agg_job_is_legal(
&self,
_id: AggregationJobId,
_version: DapVersion,
_task_id: &TaskId,
-_req: &AggregationJobRequestHash,
+_req_hash: &AggregationJobRequestHash,
) -> Result<(), DapError> {
// the server implementation can't check for this
Ok(())
84 changes: 84 additions & 0 deletions crates/daphne-server/src/router/compute_offload.rs
@@ -0,0 +1,84 @@
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

use crate::App;
use axum::{async_trait, extract::FromRequest, response::IntoResponse, routing::post};
use daphne::{error::DapAbort, InitializedReport};
use daphne_service_utils::{
capnproto::{CapnprotoPayloadDecode, CapnprotoPayloadDecodeExt, CapnprotoPayloadEncodeExt},
compute_offload::{InitializeReports, InitializedReports},
};
use http::StatusCode;
use prio::codec::ParameterizedDecode;
use rayon::iter::{IntoParallelIterator as _, ParallelIterator};

pub(super) fn add_routes(router: super::Router<App>) -> super::Router<App> {
router.route(
"/compute_offload/initialize_reports",
post(initialize_reports),
)
}

struct CapnprotoExtractor<T>(T);

#[async_trait]
impl<S, T> FromRequest<S> for CapnprotoExtractor<T>
where
T: CapnprotoPayloadDecode,
{
type Rejection = StatusCode;

async fn from_request(
req: http::Request<axum::body::Body>,
_state: &S,
) -> Result<Self, Self::Rejection> {
let bytes = axum::body::to_bytes(req.into_body(), usize::MAX)
.await
.map_err(|_| StatusCode::BAD_REQUEST)?;
let t = T::decode_from_bytes(&bytes).map_err(|_| StatusCode::BAD_REQUEST)?;

Ok(CapnprotoExtractor(t))
}
}

#[tracing::instrument(skip_all, fields(%task_id, report_count = prep_inits.len()))]
async fn initialize_reports(
CapnprotoExtractor(InitializeReports {
hpke_keys,
valid_report_range,
task_id,
task_config,
agg_param,
prep_inits,
}): CapnprotoExtractor<InitializeReports<'static>>,
) -> impl IntoResponse {
tracing::info!("initializing reports");
let initialized_reports = prep_inits
.into_par_iter()
.map(|prep_init| {
InitializedReport::from_leader(
&hpke_keys.as_ref(),
valid_report_range.clone(),
&task_id,
&task_config,
prep_init.report_share,
prep_init.payload,
&daphne::DapAggregationParam::get_decoded_with_param(&task_config.vdaf, &agg_param)
.map_err(|e| DapAbort::from_codec_error(e, task_id))?,
)
})
.collect::<Result<Vec<_>, _>>();

match initialized_reports {
Ok(reports) => {
let body = InitializedReports {
vdaf: task_config.vdaf.into_owned(),
reports,
}
.encode_to_bytes();

(StatusCode::OK, body).into_response()
}
Err(error) => (StatusCode::BAD_REQUEST, axum::Json(error)).into_response(),
}
}
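
For context (not part of this commit): report initialization is the CPU-heavy part of an aggregation job (HPKE decryption of the input share plus the first VDAF preparation step), which is why the handler above fans the work out across threads with rayon. Below is a rough caller-side sketch of driving the new endpoint. It assumes InitializeReports implements CapnprotoPayloadEncode and InitializedReports implements CapnprotoPayloadDecode (only the opposite directions are exercised in this file), and that reqwest and anyhow are available to the caller; none of this is shown in the diff.

use daphne_service_utils::capnproto::{CapnprotoPayloadDecodeExt, CapnprotoPayloadEncodeExt};
use daphne_service_utils::compute_offload::{InitializeReports, InitializedReports};

/// Hypothetical caller-side helper, not from this commit.
async fn offload_report_initialization(
    http: &reqwest::Client,
    base_url: &str, // e.g. "http://compute-offload:5000"; the deployment address is assumed
    request: InitializeReports<'_>,
) -> anyhow::Result<InitializedReports> {
    let response = http
        .post(format!("{base_url}/compute_offload/initialize_reports"))
        .body(request.encode_to_bytes())
        .send()
        .await?
        // the handler above answers 400 Bad Request with a JSON-encoded error on rejection
        .error_for_status()?;
    let body = response.bytes().await?;
    Ok(InitializedReports::decode_from_bytes(&body)?)
}
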
5 changes: 4 additions & 1 deletion crates/daphne-server/src/router/mod.rs
@@ -1,7 +1,8 @@
-// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
+// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

mod aggregator;
+mod compute_offload;
mod extractor;
mod helper;
mod leader;
@@ -99,6 +100,8 @@ pub fn new(role: DapAggregatorRole, aggregator: App) -> axum::Router<()> {
DapAggregatorRole::Helper => helper::add_helper_routes(router),
};

+let router = compute_offload::add_routes(router);

#[cfg(feature = "test-utils")]
let router = test_routes::add_test_routes(router, role);

2 changes: 2 additions & 0 deletions crates/daphne-service-utils/Cargo.toml
@@ -17,6 +17,7 @@ daphne = { path = "../daphne", default-features = false }
prio_draft09 = { workspace = true, optional = true }
prio = { workspace = true, optional = true }
serde.workspace = true
+serde_json = { workspace = true, optional = true }
url = { workspace = true, optional = true }

[dev-dependencies]
@@ -28,6 +29,7 @@ capnpc = { workspace = true, optional = true }

[features]
test-utils = ["dep:url", "daphne/prometheus", "daphne/test-utils"]
+compute-offload = ["dep:capnp", "dep:capnpc", "dep:serde_json", "dep:prio"]
durable_requests = [
"dep:capnp",
"dep:capnpc",
23 changes: 16 additions & 7 deletions crates/daphne-service-utils/build.rs
@@ -2,11 +2,20 @@
// SPDX-License-Identifier: BSD-3-Clause

fn main() {
-#[cfg(feature = "durable_requests")]
-::capnpc::CompilerCommand::new()
-.file("./src/capnproto/base.capnp")
-.file("./src/durable_requests/durable_request.capnp")
-.file("./src/durable_requests/bindings/aggregation_job_store.capnp")
-.run()
-.expect("compiling schema");
+#[cfg(any(feature = "durable_requests", feature = "compute-offload"))]
+{
+let mut compiler = ::capnpc::CompilerCommand::new();
+
+compiler.file("./src/capnproto/base.capnp");
+
+#[cfg(feature = "durable_requests")]
+compiler
+.file("./src/durable_requests/durable_request.capnp")
+.file("./src/durable_requests/bindings/aggregation_job_store.capnp");
+
+#[cfg(feature = "compute-offload")]
+compiler.file("./src/compute_offload/compute_offload.capnp");
+
+compiler.run().expect("compiling schema");
+}
}
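
For reference, the generated bindings would be consumed elsewhere in the crate through the usual capnpc include pattern; the sketch below is an assumption, not part of the diff, and the exact path under OUT_DIR depends on how CompilerCommand maps the schema path (no src_prefix is configured above).

#[cfg(feature = "compute-offload")]
mod compute_offload_capnp {
    // Assumed location of the capnpc output; the OUT_DIR subpath is illustrative.
    include!(concat!(
        env!("OUT_DIR"),
        "/src/compute_offload/compute_offload_capnp.rs"
    ));
}
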
120 changes: 120 additions & 0 deletions crates/daphne-service-utils/src/compute_offload/compute_offload.capnp
@@ -0,0 +1,120 @@
# Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause

@0xd932f3d934afce3b;

# Utilities

using Base = import "../capnproto/base.capnp";

using VdafConfig = Text; # json encoded
using VdafVerifyKey = Base.U8L32;

struct TimeRange @0xf0d27aaa9b1959f7 {
start @0 :UInt64;
end @1 :UInt64;
}

# Top level message
struct InitializeReports @0x90aadb2f44c9fb78 {
hpkeKeys @0 :List(HpkeReceiverConfig);
validReportRange @1 :TimeRange;
taskId @2 :Base.TaskId;
taskConfig @3 :PartialDapTaskConfig;
aggParam @4 :Data; # encoded
prepInits @5 :List(PrepareInit);
}

struct HpkeReceiverConfig @0xeec9b4a50458edb7 {
struct HpkeConfig @0xa546066418a5cdc7 {
enum HpkeKemId @0xf4bbeaed8d1fd18a {
p256HkdfSha256 @0; x25519HkdfSha256 @1;
}
enum HpkeKdfId @0x9336afc63df27ba3 { hkdfSha256 @0; }
enum HpkeAeadId @0xd68d403e118c806c { aes128Gcm @0; }

id @0 :UInt8;
kemId @1 :HpkeKemId;
kdfId @2 :HpkeKdfId;
aeadId @3 :HpkeAeadId;
publicKey @4 :Data;
}

config @0 :HpkeConfig;
privateKey @1 :Data;
}

struct PartialDapTaskConfig @0xdcc9bf18fc62d406 {

version @0 :Base.DapVersion;
methodIsTaskprov @1 :Bool;
notAfter @2 :Base.Time;
vdaf @3 :VdafConfig;
vdafVerifyKey @4 :VdafVerifyKey;
}

struct ReportMetadata @0xefba178ad4584bc4 {

id @0 :Base.ReportId;
time @1 :Base.Time;
}

struct PrepareInit @0x8192568cb3d03f59 {

struct HpkeCiphertext @0xf0813319decf7eaf {
configId @0 :UInt8;
enc @1 :Data;
payload @2 :Data;
}

struct ReportShare @0xb4134aa2db41ef60 {
reportMetadata @0 :ReportMetadata;
publicShare @1 :Data;
encryptedInputShare @2 :HpkeCiphertext;
}

reportShare @0 :ReportShare;
payload @1 :Data;
}



struct InitializedReports {
struct InitializedReport {
using VdafPrepShare = Data;
using VdafPrepState = Data;

enum ReportError {
reserved @0;
batchCollected @1;
reportReplayed @2;
reportDropped @3;
hpkeUnknownConfigId @4;
hpkeDecryptError @5;
vdafPrepError @6;
batchSaturated @7;
taskExpired @8;
invalidMessage @9;
reportTooEarly @10;
taskNotStarted @11;
}


union {
ready :group {
metadata @0 :ReportMetadata;
publicShare @1 :Data;
prepShare @2 :VdafPrepShare;
prepState @3 :VdafPrepState;
peerPrepShare @4 :Data;
}
rejected :group {
metadata @5 :ReportMetadata;
failure @6 :ReportError;
}
}
}

vdafConfig @0 :VdafConfig;
reports @1 :List(InitializedReport);
}