Skip to content

Commit 18f42b7

Browse files
committed
Add CPU offload server
1 parent 6e576ff commit 18f42b7

File tree

22 files changed

+1379
-366
lines changed

22 files changed

+1379
-366
lines changed

Cargo.lock

+409-321
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/daphne-server/Cargo.toml

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
1+
# Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
22
# SPDX-License-Identifier: BSD-3-Clause
33

44
[package]
@@ -17,7 +17,7 @@ all-features = true
1717

1818
[dependencies]
1919
daphne = { path = "../daphne" }
20-
daphne-service-utils = { path = "../daphne-service-utils", features = ["durable_requests"] }
20+
daphne-service-utils = { path = "../daphne-service-utils", features = ["durable_requests", "compute-offload"] }
2121
either.workspace = true
2222
futures.workspace = true
2323
hex.workspace = true
@@ -33,6 +33,7 @@ tokio.workspace = true
3333
tower = { workspace = true, features = ["util"] }
3434
tracing.workspace = true
3535
url.workspace = true
36+
rayon.workspace = true
3637

3738
[dependencies.axum]
3839
workspace = true

crates/daphne-server/docker/example-service.Dockerfile

+7
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,10 @@ RUN sed -i 's/localhost/leader_storage/g' configuration.toml
3131
COPY --from=builder /dap/target/debug/examples/service .
3232

3333
ENTRYPOINT ["./service"]
34+
35+
FROM debian:bookworm AS compute-offload
36+
37+
COPY ./crates/daphne-server/examples/configuration-cpu-offload.toml configuration.toml
38+
COPY --from=builder /dap/target/debug/examples/service .
39+
40+
ENTRYPOINT ["./service"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
2+
# SPDX-License-Identifier: BSD-3-Clause
3+
4+
port = 5000
5+
6+
# None of these settings are relevant and can be deleted later when
7+
# daphne-server stops being an aggregator
8+
[storage_proxy]
9+
url = "http://localhost:4001"
10+
# SECRET: This is a test secret. In production, we'll generate and securely provision the token.
11+
auth_token = 'this-is-the-storage-proxy-auth-token'
12+
13+
[service]
14+
env = "oxy"
15+
role = "helper"
16+
max_batch_duration = 360000
17+
min_batch_interval_start = 259200
18+
max_batch_interval_end = 259200
19+
supported_hpke_kems = ["x25519_hkdf_sha256"]
20+
default_version = "v09"
21+
report_storage_epoch_duration = 300000
22+
base_url = "http://127.0.0.1:8788"
23+
default_num_agg_span_shards = 4
24+
25+
[service.taskprov]
26+
peer_auth.leader.expected_token = "I-am-the-leader" # SECRET
27+
vdaf_verify_key_init = "b029a72fa327931a5cb643dcadcaafa098fcbfac07d990cb9e7c9a8675fafb18" # SECRET
28+
hpke_collector_config = """{
29+
"id": 23,
30+
"kem_id": "p256_hkdf_sha256",
31+
"kdf_id": "hkdf_sha256",
32+
"aead_id": "aes128_gcm",
33+
"public_key": "047dab625e0d269abcc28c611bebf5a60987ddf7e23df0e0aa343e5774ad81a1d0160d9252b82b4b5c52354205f5ec945645cb79facff8d85c9c31b490cdf35466"
34+
}"""
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
2+
// SPDX-License-Identifier: BSD-3-Clause
3+
4+
use crate::App;
5+
use axum::{async_trait, extract::FromRequest, response::IntoResponse, routing::post};
6+
use daphne::{error::DapAbort, InitializedReport};
7+
use daphne_service_utils::{
8+
capnproto::{CapnprotoPayloadDecode, CapnprotoPayloadDecodeExt, CapnprotoPayloadEncodeExt},
9+
compute_offload::{InitializeReports, InitializedReports},
10+
};
11+
use http::StatusCode;
12+
use prio::codec::ParameterizedDecode;
13+
use rayon::iter::{IntoParallelIterator as _, ParallelIterator};
14+
15+
pub(super) fn add_routes(router: super::Router<App>) -> super::Router<App> {
16+
router.route(
17+
"/compute_offload/initialize_reports",
18+
post(initialize_reports),
19+
)
20+
}
21+
22+
struct CapnprotoExtractor<T>(T);
23+
24+
#[async_trait]
25+
impl<S, T> FromRequest<S> for CapnprotoExtractor<T>
26+
where
27+
T: CapnprotoPayloadDecode,
28+
{
29+
type Rejection = StatusCode;
30+
31+
async fn from_request(
32+
req: http::Request<axum::body::Body>,
33+
_state: &S,
34+
) -> Result<Self, Self::Rejection> {
35+
let bytes = axum::body::to_bytes(req.into_body(), usize::MAX)
36+
.await
37+
.map_err(|_| StatusCode::BAD_REQUEST)?;
38+
let t = T::decode_from_bytes(&bytes).map_err(|_| StatusCode::BAD_REQUEST)?;
39+
40+
Ok(CapnprotoExtractor(t))
41+
}
42+
}
43+
44+
#[tracing::instrument(skip_all, fields(%task_id, report_count = prep_inits.len()))]
45+
async fn initialize_reports(
46+
CapnprotoExtractor(InitializeReports {
47+
hpke_keys,
48+
valid_report_range,
49+
task_id,
50+
task_config,
51+
agg_param,
52+
prep_inits,
53+
}): CapnprotoExtractor<InitializeReports<'static>>,
54+
) -> impl IntoResponse {
55+
tracing::info!("initializing reports");
56+
let initialized_reports = prep_inits
57+
.into_par_iter()
58+
.map(|prep_init| {
59+
InitializedReport::from_leader(
60+
&hpke_keys.as_ref(),
61+
valid_report_range.clone(),
62+
&task_id,
63+
&task_config,
64+
prep_init.report_share,
65+
prep_init.payload,
66+
&daphne::DapAggregationParam::get_decoded_with_param(&task_config.vdaf, &agg_param)
67+
.map_err(|e| DapAbort::from_codec_error(e, task_id))?,
68+
)
69+
})
70+
.collect::<Result<Vec<_>, _>>();
71+
72+
match initialized_reports {
73+
Ok(reports) => {
74+
let body = InitializedReports {
75+
vdaf: task_config.vdaf.into_owned(),
76+
reports,
77+
}
78+
.encode_to_bytes();
79+
80+
(StatusCode::OK, body).into_response()
81+
}
82+
Err(error) => (StatusCode::BAD_REQUEST, axum::Json(error)).into_response(),
83+
}
84+
}

crates/daphne-server/src/router/mod.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
1+
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
22
// SPDX-License-Identifier: BSD-3-Clause
33

44
mod aggregator;
5+
mod compute_offload;
56
mod extractor;
67
mod helper;
78
mod leader;
@@ -99,6 +100,8 @@ pub fn new(role: DapAggregatorRole, aggregator: App) -> axum::Router<()> {
99100
DapAggregatorRole::Helper => helper::add_helper_routes(router),
100101
};
101102

103+
let router = compute_offload::add_routes(router);
104+
102105
#[cfg(feature = "test-utils")]
103106
let router = test_routes::add_test_routes(router, role);
104107

crates/daphne-service-utils/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ daphne = { path = "../daphne", default-features = false }
1717
prio_draft09 = { workspace = true, optional = true }
1818
prio = { workspace = true, optional = true }
1919
serde.workspace = true
20+
serde_json = { workspace = true, optional = true }
2021
url = { workspace = true, optional = true }
2122

2223
[dev-dependencies]
@@ -28,6 +29,7 @@ capnpc = { workspace = true, optional = true }
2829

2930
[features]
3031
test-utils = ["dep:url", "daphne/prometheus", "daphne/test-utils"]
32+
compute-offload = ["dep:capnp", "dep:capnpc", "dep:serde_json", "dep:prio"]
3133
durable_requests = [
3234
"dep:capnp",
3335
"dep:capnpc",

crates/daphne-service-utils/build.rs

+16-7
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,20 @@
22
// SPDX-License-Identifier: BSD-3-Clause
33

44
fn main() {
5-
#[cfg(feature = "durable_requests")]
6-
::capnpc::CompilerCommand::new()
7-
.file("./src/capnproto/base.capnp")
8-
.file("./src/durable_requests/durable_request.capnp")
9-
.file("./src/durable_requests/bindings/aggregation_job_store.capnp")
10-
.run()
11-
.expect("compiling schema");
5+
#[cfg(any(feature = "durable_requests", feature = "compute-offload"))]
6+
{
7+
let mut compiler = ::capnpc::CompilerCommand::new();
8+
9+
compiler.file("./src/capnproto/base.capnp");
10+
11+
#[cfg(feature = "durable_requests")]
12+
compiler
13+
.file("./src/durable_requests/durable_request.capnp")
14+
.file("./src/durable_requests/bindings/aggregation_job_store.capnp");
15+
16+
#[cfg(feature = "compute-offload")]
17+
compiler.file("./src/compute_offload/compute_offload.capnp");
18+
19+
compiler.run().expect("compiling schema");
20+
}
1221
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
# Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
2+
# SPDX-License-Identifier: BSD-3-Clause
3+
4+
@0xd932f3d934afce3b;
5+
6+
# Utilities
7+
8+
using Base = import "../capnproto/base.capnp";
9+
10+
using VdafConfig = Text; # json encoded
11+
12+
struct TimeRange @0xf0d27aaa9b1959f7 {
13+
start @0 :UInt64;
14+
end @1 :UInt64;
15+
}
16+
17+
# Top level message
18+
struct InitializeReports @0x90aadb2f44c9fb78 {
19+
hpkeKeys @0 :List(HpkeReceiverConfig);
20+
validReportRange @1 :TimeRange;
21+
taskId @2 :Base.TaskId;
22+
taskConfig @3 :PartialDapTaskConfig;
23+
aggParam @4 :Data; # encoded
24+
prepInits @5 :List(PrepareInit);
25+
}
26+
27+
struct HpkeReceiverConfig @0xeec9b4a50458edb7 {
28+
struct HpkeConfig @0xa546066418a5cdc7 {
29+
enum HpkeKemId @0xf4bbeaed8d1fd18a {
30+
p256HkdfSha256 @0; x25519HkdfSha256 @1;
31+
}
32+
enum HpkeKdfId @0x9336afc63df27ba3 { hkdfSha256 @0; }
33+
enum HpkeAeadId @0xd68d403e118c806c { aes128Gcm @0; }
34+
35+
id @0 :UInt8;
36+
kemId @1 :HpkeKemId;
37+
kdfId @2 :HpkeKdfId;
38+
aeadId @3 :HpkeAeadId;
39+
publicKey @4 :Data;
40+
}
41+
42+
config @0 :HpkeConfig;
43+
privateKey @1 :Data;
44+
}
45+
46+
struct PartialDapTaskConfig @0xdcc9bf18fc62d406 {
47+
48+
version @0 :Base.DapVersion;
49+
methodIsTaskprov @1 :Bool;
50+
notAfter @2 :Base.Time;
51+
vdaf @3 :VdafConfig;
52+
vdafVerifyKey @4 :VdafVerifyKey;
53+
}
54+
55+
struct VdafVerifyKey @0xf890ee7dfa2e36b8 {
56+
union {
57+
l16 @0 :Base.U8L16;
58+
l32 @1 :Base.U8L32;
59+
}
60+
}
61+
62+
struct ReportMetadata @0xefba178ad4584bc4 {
63+
64+
id @0 :Base.ReportId;
65+
time @1 :Base.Time;
66+
}
67+
68+
struct PrepareInit @0x8192568cb3d03f59 {
69+
70+
struct HpkeCiphertext @0xf0813319decf7eaf {
71+
configId @0 :UInt8;
72+
enc @1 :Data;
73+
payload @2 :Data;
74+
}
75+
76+
struct ReportShare @0xb4134aa2db41ef60 {
77+
reportMetadata @0 :ReportMetadata;
78+
publicShare @1 :Data;
79+
encryptedInputShare @2 :HpkeCiphertext;
80+
}
81+
82+
reportShare @0 :ReportShare;
83+
payload @1 :Data;
84+
}
85+
86+
87+
88+
struct InitializedReports {
89+
struct InitializedReport {
90+
using VdafPrepShare = Data;
91+
using VdafPrepState = Data;
92+
93+
enum ReportError {
94+
reserved @0;
95+
batchCollected @1;
96+
reportReplayed @2;
97+
reportDropped @3;
98+
hpkeUnknownConfigId @4;
99+
hpkeDecryptError @5;
100+
vdafPrepError @6;
101+
batchSaturated @7;
102+
taskExpired @8;
103+
invalidMessage @9;
104+
reportTooEarly @10;
105+
taskNotStarted @11;
106+
}
107+
108+
109+
union {
110+
ready :group {
111+
metadata @0 :ReportMetadata;
112+
publicShare @1 :Data;
113+
prepShare @2 :VdafPrepShare;
114+
prepState @3 :VdafPrepState;
115+
peerPrepShare @4 :Data;
116+
}
117+
rejected :group {
118+
metadata @5 :ReportMetadata;
119+
failure @6 :ReportError;
120+
}
121+
}
122+
}
123+
124+
vdafConfig @0 :VdafConfig;
125+
reports @1 :List(InitializedReport);
126+
}

0 commit comments

Comments
 (0)