Implement the async worker #760

Draft · wants to merge 1 commit into base: main
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -90,8 +90,9 @@ tracing = "0.1.40"
tracing-core = "0.1.32"
tracing-subscriber = "0.3.18"
url = { version = "2.5.4", features = ["serde"] }
wasm-bindgen = "0.2.99"
webpki = "0.22.4"
worker = { version = "0.5", features = ["http"] }
worker = "0.5"
x509-parser = "0.15.1"

[workspace.dependencies.sentry]
10 changes: 10 additions & 0 deletions Makefile
@@ -21,6 +21,16 @@ helper:
-c ./crates/daphne-server/examples/configuration-helper.toml
h: helper

compute-offload:
RUST_LOG=hyper=off,debug cargo run \
--profile release-symbols \
--features test-utils \
--example service \
-- \
-c ./crates/daphne-server/examples/configuration-cpu-offload.toml
co: compute-offload


helper-worker:
cd ./crates/daphne-worker-test/ && \
wrangler dev -c wrangler.aggregator.toml --port 8788 -e helper
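Usage note: like the existing `h: helper` shortcut, the new target added above can be invoked as `make compute-offload` or through its `co` alias.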
45 changes: 37 additions & 8 deletions crates/dapf/src/acceptance/mod.rs
@@ -27,6 +27,7 @@ use daphne::{
BatchId, BatchSelector, PartialBatchSelector, TaskId,
},
metrics::DaphneMetrics,
protocol::ReadyAggregationJobResp,
testing::report_generator::ReportGenerator,
vdaf::VdafConfig,
DapAggregateShare, DapAggregateSpan, DapAggregationParam, DapBatchMode, DapMeasurement,
@@ -511,7 +512,7 @@ impl Test {
let _guard = load_control.wait().await;
info!("Starting AggregationJobInitReq");
let start = Instant::now();
let agg_job_resp = self
let mut agg_job_resp = self
.http_client
.submit_aggregation_job_init_req(
self.helper_url.join(&format!(
@@ -528,14 +529,42 @@
)
.await?;
let duration = start.elapsed();
info!("Finished AggregationJobInitReq in {duration:#?}");
info!("Finished submitting AggregationJobInitReq in {duration:#?}");
let mut poll_count = 1;
let ready = loop {
agg_job_resp = match agg_job_resp {
messages::AggregationJobResp::Ready { prep_resps } => {
if poll_count != 1 {
info!(
"Finished polling for AggregationJobResp after {:#?}",
start.elapsed()
);
}
break ReadyAggregationJobResp { prep_resps };
}
messages::AggregationJobResp::Processing => {
if poll_count == 1 {
info!("Polling for AggregationJobResp");
}
tokio::time::sleep(Duration::from_millis(poll_count * 200)).await;
poll_count += 1;
self.http_client
.poll_aggregation_job_init(
self.helper_url
.join(&format!("tasks/{task_id}/aggregation_jobs/{agg_job_id}"))?,
task_config.version,
functions::helper::Options {
taskprov_advertisement: taskprov_advertisement.as_ref(),
bearer_token: self.bearer_token.as_ref(),
},
)
.await?
}
};
};

let agg_share_span = task_config.consume_agg_job_resp(
task_id,
agg_job_state,
agg_job_resp.unwrap_ready(), // TODO: implement polling
self.metrics(),
)?;
let agg_share_span =
task_config.consume_agg_job_resp(task_id, agg_job_state, ready, self.metrics())?;

let aggregated_report_count = agg_share_span
.iter()
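For reference, here is a minimal, standalone sketch of the poll-until-ready loop the hunk above adds to the acceptance test. The `JobStatus` enum and `fetch_status` function are hypothetical stand-ins for `AggregationJobResp` and the `poll_aggregation_job_init` call; only the control flow and the linear backoff mirror the diff.

```rust
// Hedged sketch only: not part of the diff. Assumes the tokio runtime.
use std::time::Duration;

enum JobStatus {
    Processing,                    // helper is still working on the job
    Ready { prep_resps: Vec<u8> }, // stand-in for the real prep responses
}

// Hypothetical stand-in for `poll_aggregation_job_init`.
async fn fetch_status(poll_count: u64) -> JobStatus {
    if poll_count < 3 {
        JobStatus::Processing
    } else {
        JobStatus::Ready { prep_resps: vec![1, 2, 3] }
    }
}

#[tokio::main]
async fn main() {
    let mut poll_count: u64 = 1;
    let prep_resps = loop {
        match fetch_status(poll_count).await {
            JobStatus::Ready { prep_resps } => break prep_resps,
            JobStatus::Processing => {
                // Back off linearly: 200 ms, 400 ms, 600 ms, ...
                tokio::time::sleep(Duration::from_millis(poll_count * 200)).await;
                poll_count += 1;
            }
        }
    };
    println!("aggregation job ready with {} prep responses", prep_resps.len());
}
```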
12 changes: 10 additions & 2 deletions crates/daphne-server/src/roles/aggregator.rs
@@ -1,4 +1,4 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

use std::{future::ready, num::NonZeroUsize, ops::Range, time::SystemTime};
@@ -79,6 +79,7 @@ impl DapAggregator for crate::App {
#[tracing::instrument(skip(self))]
async fn get_agg_share(
&self,
_version: DapVersion,
task_id: &TaskId,
batch_sel: &BatchSelector,
) -> Result<DapAggregateShare, DapError> {
@@ -115,6 +116,7 @@ impl DapAggregator for crate::App {
#[tracing::instrument(skip(self))]
async fn mark_collected(
&self,
_version: DapVersion,
task_id: &TaskId,
batch_sel: &BatchSelector,
) -> Result<(), DapError> {
@@ -255,6 +257,7 @@ impl DapAggregator for crate::App {

async fn is_batch_overlapping(
&self,
_version: DapVersion,
task_id: &TaskId,
batch_sel: &BatchSelector,
) -> Result<bool, DapError> {
@@ -288,7 +291,12 @@
)
}

async fn batch_exists(&self, task_id: &TaskId, batch_id: &BatchId) -> Result<bool, DapError> {
async fn batch_exists(
&self,
_version: DapVersion,
task_id: &TaskId,
batch_id: &BatchId,
) -> Result<bool, DapError> {
let task_config = self
.get_task_config_for(task_id)
.await?
12 changes: 11 additions & 1 deletion crates/daphne-server/src/roles/helper.rs
@@ -3,7 +3,8 @@

use axum::async_trait;
use daphne::{
messages::{AggregationJobId, TaskId},
fatal_error,
messages::{AggregationJobId, AggregationJobResp, TaskId},
roles::{helper::AggregationJobRequestHash, DapHelper},
DapError, DapVersion,
};
@@ -20,4 +21,13 @@ impl DapHelper for crate::App {
// the server implementation can't check for this
Ok(())
}

async fn poll_aggregated(
&self,
_version: DapVersion,
_task_id: &TaskId,
_agg_job_id: &AggregationJobId,
) -> Result<AggregationJobResp, DapError> {
Err(fatal_error!(err = "polling not implemented"))
}
}
4 changes: 4 additions & 0 deletions crates/daphne-server/src/roles/leader.rs
@@ -121,6 +121,10 @@ impl DapLeader for crate::App {
{
self.send_http(meta, Method::PUT, url, payload).await
}

async fn send_http_get(&self, meta: DapRequestMeta, url: Url) -> Result<DapResponse, DapError> {
self.send_http(meta, Method::GET, url, ()).await
}
}

impl crate::App {
2 changes: 1 addition & 1 deletion crates/daphne-server/src/router/extractor.rs
@@ -72,12 +72,12 @@ impl DecodeFromDapHttpBody for HashedAggregationJobReq {
}
}

/// Using `()` ignores the body of a request.
impl DecodeFromDapHttpBody for CollectionPollReq {
fn decode_from_http_body(_bytes: Bytes, _meta: &DapRequestMeta) -> Result<Self, DapAbort> {
Ok(Self)
}
}

/// Using `()` ignores the body of a request.
impl DecodeFromDapHttpBody for () {
fn decode_from_http_body(_bytes: Bytes, _meta: &DapRequestMeta) -> Result<Self, DapAbort> {
5 changes: 4 additions & 1 deletion crates/daphne-service-utils/build.rs
@@ -11,7 +11,10 @@ fn main() {
#[cfg(feature = "durable_requests")]
compiler
.file("./src/durable_requests/durable_request.capnp")
.file("./src/durable_requests/bindings/aggregation_job_store.capnp");
.file("./src/durable_requests/bindings/aggregation_job_store.capnp")
.file("./src/durable_requests/bindings/aggregate_store_v2.capnp")
.file("./src/durable_requests/bindings/agg_job_response_store.capnp")
.file("./src/durable_requests/bindings/replay_checker.capnp");

#[cfg(feature = "compute-offload")]
compiler.file("./src/compute_offload/compute_offload.capnp");
18 changes: 17 additions & 1 deletion crates/daphne-service-utils/src/capnproto/base.capnp
@@ -22,13 +22,29 @@ struct U8L16 @0x9e3f65b13f71cfcb {
snd @1 :UInt64;
}

struct PartialBatchSelector {
struct PartialBatchSelector @0xae86084e56c22fc0 {
union {
timeInterval @0 :Void;
leaderSelectedByBatchId @1 :BatchId;
}
}

enum ReportError @0xa76428617779e659 {
reserved @0;
batchCollected @1;
reportReplayed @2;
reportDropped @3;
hpkeUnknownConfigId @4;
hpkeDecryptError @5;
vdafPrepError @6;
batchSaturated @7;
taskExpired @8;
invalidMessage @9;
reportTooEarly @10;
taskNotStarted @11;
}


using ReportId = U8L16;
using BatchId = U8L32;
using TaskId = U8L32;
39 changes: 39 additions & 0 deletions crates/daphne-service-utils/src/capnproto/mod.rs
@@ -4,6 +4,7 @@
use crate::base_capnp::{self, partial_batch_selector, u8_l16, u8_l32};
use capnp::struct_list;
use capnp::traits::{FromPointerBuilder, FromPointerReader};
use daphne::messages;
use daphne::{
messages::{AggregationJobId, BatchId, PartialBatchSelector, ReportId, TaskId},
DapVersion,
@@ -204,6 +205,44 @@ impl CapnprotoPayloadDecode for PartialBatchSelector {
}
}

impl From<messages::ReportError> for base_capnp::ReportError {
fn from(failure: messages::ReportError) -> Self {
match failure {
messages::ReportError::Reserved => Self::Reserved,
messages::ReportError::BatchCollected => Self::BatchCollected,
messages::ReportError::ReportReplayed => Self::ReportReplayed,
messages::ReportError::ReportDropped => Self::ReportDropped,
messages::ReportError::HpkeUnknownConfigId => Self::HpkeUnknownConfigId,
messages::ReportError::HpkeDecryptError => Self::HpkeDecryptError,
messages::ReportError::VdafPrepError => Self::VdafPrepError,
messages::ReportError::BatchSaturated => Self::BatchSaturated,
messages::ReportError::TaskExpired => Self::TaskExpired,
messages::ReportError::InvalidMessage => Self::InvalidMessage,
messages::ReportError::ReportTooEarly => Self::ReportTooEarly,
messages::ReportError::TaskNotStarted => Self::TaskNotStarted,
}
}
}

impl From<base_capnp::ReportError> for messages::ReportError {
fn from(val: base_capnp::ReportError) -> Self {
match val {
base_capnp::ReportError::Reserved => Self::Reserved,
base_capnp::ReportError::BatchCollected => Self::BatchCollected,
base_capnp::ReportError::ReportReplayed => Self::ReportReplayed,
base_capnp::ReportError::ReportDropped => Self::ReportDropped,
base_capnp::ReportError::HpkeUnknownConfigId => Self::HpkeUnknownConfigId,
base_capnp::ReportError::HpkeDecryptError => Self::HpkeDecryptError,
base_capnp::ReportError::VdafPrepError => Self::VdafPrepError,
base_capnp::ReportError::BatchSaturated => Self::BatchSaturated,
base_capnp::ReportError::TaskExpired => Self::TaskExpired,
base_capnp::ReportError::InvalidMessage => Self::InvalidMessage,
base_capnp::ReportError::ReportTooEarly => Self::ReportTooEarly,
base_capnp::ReportError::TaskNotStarted => Self::TaskNotStarted,
}
}
}

pub fn encode_list<I, O>(list: I, mut builder: struct_list::Builder<'_, O>)
where
I: IntoIterator<Item: CapnprotoPayloadEncode>,
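For illustration, here is a self-contained sketch of the mirrored-enum pattern behind the two `From` impls above: the wire-format enum duplicates the domain enum, and explicit match arms keep the mapping lossless in both directions. `ReportError` is trimmed to three variants, and `WireError` stands in for the generated `base_capnp::ReportError`.

```rust
// Hedged sketch only: names are stand-ins, not the actual Daphne/Cap'n Proto types.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ReportError {
    Reserved,
    BatchCollected,
    ReportReplayed,
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum WireError {
    Reserved,
    BatchCollected,
    ReportReplayed,
}

impl From<ReportError> for WireError {
    fn from(e: ReportError) -> Self {
        match e {
            ReportError::Reserved => Self::Reserved,
            ReportError::BatchCollected => Self::BatchCollected,
            ReportError::ReportReplayed => Self::ReportReplayed,
        }
    }
}

impl From<WireError> for ReportError {
    fn from(e: WireError) -> Self {
        match e {
            WireError::Reserved => Self::Reserved,
            WireError::BatchCollected => Self::BatchCollected,
            WireError::ReportReplayed => Self::ReportReplayed,
        }
    }
}

fn main() {
    let original = ReportError::BatchCollected;
    let wire: WireError = original.into(); // domain -> wire
    let back: ReportError = wire.into();   // wire -> domain
    assert_eq!(original, back);            // the round trip is lossless
}
```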
crates/daphne-service-utils/src/compute_offload/compute_offload.capnp
@@ -3,8 +3,6 @@

@0xd932f3d934afce3b;

# Utilities

using Base = import "../capnproto/base.capnp";

using VdafConfig = Text; # json encoded
@@ -94,27 +92,11 @@ struct PrepareInit @0x8192568cb3d03f59 {



struct InitializedReports {
struct InitializedReport {
struct InitializedReports @0xf36341397ae4a146 {
struct InitializedReport @0xfa833aa6b5d03d6d {
using VdafPrepShare = Data;
using VdafPrepState = Data;

enum ReportError {
reserved @0;
batchCollected @1;
reportReplayed @2;
reportDropped @3;
hpkeUnknownConfigId @4;
hpkeDecryptError @5;
vdafPrepError @6;
batchSaturated @7;
taskExpired @8;
invalidMessage @9;
reportTooEarly @10;
taskNotStarted @11;
}


union {
ready :group {
metadata @0 :ReportMetadata;
@@ -125,7 +107,7 @@ struct InitializedReports {
}
rejected :group {
metadata @5 :ReportMetadata;
failure @6 :ReportError;
failure @6 :Base.ReportError;
}
}
}
38 changes: 0 additions & 38 deletions crates/daphne-service-utils/src/compute_offload/mod.rs
@@ -479,44 +479,6 @@ impl CapnprotoPayloadDecode for InitializedReports {
}
}

impl From<messages::ReportError> for initialized_report::ReportError {
fn from(failure: messages::ReportError) -> Self {
match failure {
messages::ReportError::Reserved => Self::Reserved,
messages::ReportError::BatchCollected => Self::BatchCollected,
messages::ReportError::ReportReplayed => Self::ReportReplayed,
messages::ReportError::ReportDropped => Self::ReportDropped,
messages::ReportError::HpkeUnknownConfigId => Self::HpkeUnknownConfigId,
messages::ReportError::HpkeDecryptError => Self::HpkeDecryptError,
messages::ReportError::VdafPrepError => Self::VdafPrepError,
messages::ReportError::BatchSaturated => Self::BatchSaturated,
messages::ReportError::TaskExpired => Self::TaskExpired,
messages::ReportError::InvalidMessage => Self::InvalidMessage,
messages::ReportError::ReportTooEarly => Self::ReportTooEarly,
messages::ReportError::TaskNotStarted => Self::TaskNotStarted,
}
}
}

impl From<initialized_report::ReportError> for messages::ReportError {
fn from(val: initialized_report::ReportError) -> Self {
match val {
initialized_report::ReportError::Reserved => Self::Reserved,
initialized_report::ReportError::BatchCollected => Self::BatchCollected,
initialized_report::ReportError::ReportReplayed => Self::ReportReplayed,
initialized_report::ReportError::ReportDropped => Self::ReportDropped,
initialized_report::ReportError::HpkeUnknownConfigId => Self::HpkeUnknownConfigId,
initialized_report::ReportError::HpkeDecryptError => Self::HpkeDecryptError,
initialized_report::ReportError::VdafPrepError => Self::VdafPrepError,
initialized_report::ReportError::BatchSaturated => Self::BatchSaturated,
initialized_report::ReportError::TaskExpired => Self::TaskExpired,
initialized_report::ReportError::InvalidMessage => Self::InvalidMessage,
initialized_report::ReportError::ReportTooEarly => Self::ReportTooEarly,
initialized_report::ReportError::TaskNotStarted => Self::TaskNotStarted,
}
}
}

fn to_capnp<E: ToString>(e: E) -> capnp::Error {
capnp::Error {
kind: capnp::ErrorKind::Failed,