Skip to content

Commit a34470b

Browse files
netsiriusgithub-actions[bot]iduartgomezclaude
authored
feat: implement transaction atomicity with parent-child relationship for sub-operations (#2009)
Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com> Co-authored-by: nacho.d.g <[email protected]> Co-authored-by: Ignacio Duart <[email protected]> Co-authored-by: Claude <[email protected]>
1 parent abd0c7b commit a34470b

File tree

14 files changed

+1459
-540
lines changed

14 files changed

+1459
-540
lines changed

Cargo.lock

Lines changed: 382 additions & 368 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/freenet-ping/Cargo.lock

Lines changed: 127 additions & 56 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/src/config/secret.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::path::Path;
22

3-
use aes_gcm::KeyInit;
4-
use blake3::traits::digest::generic_array::GenericArray;
3+
#[allow(deprecated)]
4+
use aes_gcm::{aead::generic_array::GenericArray, KeyInit};
55
use chacha20poly1305::{XChaCha20Poly1305, XNonce};
66
use freenet_stdlib::client_api::DelegateRequest;
77
use rsa::pkcs8::DecodePrivateKey;
@@ -148,6 +148,7 @@ impl Default for Secrets {
148148
}
149149
}
150150

151+
#[allow(deprecated)]
151152
impl Secrets {
152153
#[inline]
153154
pub fn nonce(&self) -> XNonce {

crates/core/src/contract/executor.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use std::path::PathBuf;
99
use std::sync::Arc;
1010
use std::time::{Duration, Instant};
1111

12-
use blake3::traits::digest::generic_array::GenericArray;
1312
use either::Either;
1413
use freenet_stdlib::client_api::{
1514
ClientError as WsClientError, ClientRequest, ContractError as StdContractError,

crates/core/src/contract/executor/runtime.rs

Lines changed: 46 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -371,25 +371,29 @@ impl ContractExecutor for Executor<Runtime> {
371371
cipher,
372372
nonce,
373373
} => {
374-
use chacha20poly1305::{KeyInit, XChaCha20Poly1305};
375-
let key = delegate.key().clone();
376-
let arr = GenericArray::from_slice(&cipher);
377-
let cipher = XChaCha20Poly1305::new(arr);
378-
let nonce = GenericArray::from_slice(&nonce).to_owned();
379-
if let Some(contract) = attested_contract {
380-
self.delegate_attested_ids
381-
.entry(key.clone())
382-
.or_default()
383-
.push(*contract);
384-
}
385-
match self.runtime.register_delegate(delegate, cipher, nonce) {
386-
Ok(_) => Ok(DelegateResponse {
387-
key,
388-
values: Vec::new(),
389-
}),
390-
Err(err) => {
391-
tracing::warn!("failed registering delegate `{key}`: {err}");
392-
Err(ExecutorError::other(StdDelegateError::RegisterError(key)))
374+
#[allow(deprecated)]
375+
{
376+
use aes_gcm::aead::generic_array::GenericArray;
377+
use chacha20poly1305::{KeyInit, XChaCha20Poly1305};
378+
let key = delegate.key().clone();
379+
let arr = GenericArray::from_slice(&cipher);
380+
let cipher = XChaCha20Poly1305::new(arr);
381+
let nonce = GenericArray::from_slice(&nonce).to_owned();
382+
if let Some(contract) = attested_contract {
383+
self.delegate_attested_ids
384+
.entry(key.clone())
385+
.or_default()
386+
.push(*contract);
387+
}
388+
match self.runtime.register_delegate(delegate, cipher, nonce) {
389+
Ok(_) => Ok(DelegateResponse {
390+
key,
391+
values: Vec::new(),
392+
}),
393+
Err(err) => {
394+
tracing::warn!("failed registering delegate `{key}`: {err}");
395+
Err(ExecutorError::other(StdDelegateError::RegisterError(key)))
396+
}
393397
}
394398
}
395399
}
@@ -657,25 +661,29 @@ impl Executor<Runtime> {
657661
cipher,
658662
nonce,
659663
} => {
660-
use chacha20poly1305::{KeyInit, XChaCha20Poly1305};
661-
let key = delegate.key().clone();
662-
let arr = GenericArray::from_slice(&cipher);
663-
let cipher = XChaCha20Poly1305::new(arr);
664-
let nonce = GenericArray::from_slice(&nonce).to_owned();
665-
if let Some(contract) = attested_contract {
666-
self.delegate_attested_ids
667-
.entry(key.clone())
668-
.or_default()
669-
.push(*contract);
670-
}
671-
match self.runtime.register_delegate(delegate, cipher, nonce) {
672-
Ok(_) => Ok(DelegateResponse {
673-
key,
674-
values: Vec::new(),
675-
}),
676-
Err(err) => {
677-
tracing::warn!("failed registering delegate `{key}`: {err}");
678-
Err(ExecutorError::other(StdDelegateError::RegisterError(key)))
664+
#[allow(deprecated)]
665+
{
666+
use aes_gcm::aead::generic_array::GenericArray;
667+
use chacha20poly1305::{KeyInit, XChaCha20Poly1305};
668+
let key = delegate.key().clone();
669+
let arr = GenericArray::from_slice(&cipher);
670+
let cipher = XChaCha20Poly1305::new(arr);
671+
let nonce = GenericArray::from_slice(&nonce).to_owned();
672+
if let Some(contract) = attested_contract {
673+
self.delegate_attested_ids
674+
.entry(key.clone())
675+
.or_default()
676+
.push(*contract);
677+
}
678+
match self.runtime.register_delegate(delegate, cipher, nonce) {
679+
Ok(_) => Ok(DelegateResponse {
680+
key,
681+
values: Vec::new(),
682+
}),
683+
Err(err) => {
684+
tracing::warn!("failed registering delegate `{key}`: {err}");
685+
Err(ExecutorError::other(StdDelegateError::RegisterError(key)))
686+
}
679687
}
680688
}
681689
}

crates/core/src/message.rs

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,39 @@ use ulid::Ulid;
3535
#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Copy)]
3636
pub struct Transaction {
3737
id: Ulid,
38+
/// Parent transaction ID for child operations spawned by this transaction.
39+
/// Enables atomicity tracking for composite operations (e.g., PUT with SUBSCRIBE).
40+
parent: Option<Ulid>,
3841
}
3942

4043
impl Transaction {
41-
pub const NULL: &'static Transaction = &Transaction { id: Ulid(0) };
44+
pub const NULL: &'static Transaction = &Transaction {
45+
id: Ulid(0),
46+
parent: None,
47+
};
4248

4349
pub(crate) fn new<T: TxType>() -> Self {
4450
let ty = <T as TxType>::tx_type_id();
4551
let id = Ulid::new();
46-
Self::update(ty.0, id)
47-
// Self { id }
52+
Self::update(ty.0, id, None)
53+
}
54+
55+
/// Creates a child transaction with the specified type, linked to the parent
56+
/// for atomicity tracking in composite operations.
57+
pub(crate) fn new_child_of<T: TxType>(parent: &Transaction) -> Self {
58+
let ty = <T as TxType>::tx_type_id();
59+
let id = Ulid::new();
60+
Self::update(ty.0, id, Some(parent.id))
61+
}
62+
63+
/// Returns the parent transaction ID for child operations.
64+
pub fn parent_id(&self) -> Option<&Ulid> {
65+
self.parent.as_ref()
66+
}
67+
68+
/// Returns true if this transaction is a child operation.
69+
pub fn is_sub_operation(&self) -> bool {
70+
self.parent.is_some()
4871
}
4972

5073
pub(crate) fn transaction_type(&self) -> TransactionType {
@@ -100,18 +123,24 @@ impl Transaction {
100123
// Clear the ts significant bits of the ULID and replace them with the new cutoff ts.
101124
const TIMESTAMP_MASK: u128 = 0x00000000000000000000FFFFFFFFFFFFFFFF;
102125
let new_ulid = (id.0 & TIMESTAMP_MASK) | ((ttl_epoch as u128) << 80);
103-
Self { id: Ulid(new_ulid) }
126+
Self {
127+
id: Ulid(new_ulid),
128+
parent: None,
129+
}
104130
}
105131

106-
fn update(ty: TransactionType, id: Ulid) -> Self {
132+
fn update(ty: TransactionType, id: Ulid, parent: Option<Ulid>) -> Self {
107133
const TYPE_MASK: u128 = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF00u128;
108134
// Clear the last byte
109135
let cleared = id.0 & TYPE_MASK;
110136
// Set the last byte with the transaction type
111137
let updated = cleared | (ty as u8) as u128;
112138

113139
// 2 words size for 64-bits platforms
114-
Self { id: Ulid(updated) }
140+
Self {
141+
id: Ulid(updated),
142+
parent,
143+
}
115144
}
116145
}
117146

@@ -120,7 +149,7 @@ impl<'a> arbitrary::Arbitrary<'a> for Transaction {
120149
fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
121150
let ty: TransactionTypeId = u.arbitrary()?;
122151
let bytes: u128 = Ulid::new().0;
123-
Ok(Self::update(ty.0, Ulid(bytes)))
152+
Ok(Self::update(ty.0, Ulid(bytes), None))
124153
}
125154
}
126155

@@ -509,9 +538,9 @@ mod tests {
509538
fn pack_transaction_type() {
510539
let ts_0 = Ulid::new();
511540
std::thread::sleep(Duration::from_millis(1));
512-
let tx = Transaction::update(TransactionType::Connect, Ulid::new());
541+
let tx = Transaction::update(TransactionType::Connect, Ulid::new(), None);
513542
assert_eq!(tx.transaction_type(), TransactionType::Connect);
514-
let tx = Transaction::update(TransactionType::Subscribe, Ulid::new());
543+
let tx = Transaction::update(TransactionType::Subscribe, Ulid::new(), None);
515544
assert_eq!(tx.transaction_type(), TransactionType::Subscribe);
516545
std::thread::sleep(Duration::from_millis(1));
517546
let ts_1 = Ulid::new();

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use crate::{
4646
ring::PeerKeyLocation,
4747
tracing::NetEventLog,
4848
};
49+
use freenet_stdlib::client_api::{ContractResponse, HostResponse};
4950

5051
type P2pBridgeEvent = Either<(PeerId, Box<NetMessage>), NodeEvent>;
5152

@@ -797,22 +798,22 @@ impl P2pConnManager {
797798
key,
798799
subscribed,
799800
} => {
800-
tracing::info!("Received LocalSubscribeComplete event for transaction: {tx}, contract: {key}");
801-
802-
// Deliver SubscribeResponse directly to result router
803-
tracing::info!("Sending SubscribeResponse to result router for transaction: {tx}");
804-
use freenet_stdlib::client_api::{ContractResponse, HostResponse};
805-
let response = Ok(HostResponse::ContractResponse(
806-
ContractResponse::SubscribeResponse { key, subscribed },
807-
));
808-
809-
match op_manager.result_router_tx.send((tx, response)).await {
810-
Ok(()) => {
811-
tracing::info!("Successfully sent SubscribeResponse to result router for transaction: {tx}");
812-
// Clean up client subscription after successful delivery
813-
state.tx_to_client.remove(&tx);
801+
tracing::debug!(%tx, %key, "local subscribe complete");
802+
803+
if !op_manager.is_sub_operation(tx) {
804+
let response = Ok(HostResponse::ContractResponse(
805+
ContractResponse::SubscribeResponse { key, subscribed },
806+
));
807+
808+
match op_manager.result_router_tx.send((tx, response)).await {
809+
Ok(()) => {
810+
tracing::debug!(%tx, "sent subscribe response to client");
811+
state.tx_to_client.remove(&tx);
812+
}
813+
Err(e) => {
814+
tracing::error!(%tx, error = %e, "failed to send subscribe response")
815+
}
814816
}
815-
Err(e) => tracing::error!("Failed to send local subscribe response to result router: {}", e),
816817
}
817818
}
818819
NodeEvent::Disconnect { cause } => {

0 commit comments

Comments
 (0)