Skip to content

Commit 7b261e2

Browse files
committed
feat(rust): switching key negotiation to a double secure channel
1 parent c85eb64 commit 7b261e2

File tree

25 files changed

+371
-151
lines changed

25 files changed

+371
-151
lines changed

Cargo.lock

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

implementations/rust/ockam/ockam_api/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ tracing-error = "0.2.0"
112112
tracing-opentelemetry = "0.27.0"
113113
tracing-subscriber = { version = "0.3", features = ["json"] }
114114
url = "2.5.2"
115-
zeroize = { version = "1.8.1", features = ["zeroize_derive"] }
116115

117116
ockam_multiaddr = { path = "../ockam_multiaddr", version = "0.66.0", features = ["cbor", "serde"] }
118117
ockam_transport_core = { path = "../ockam_transport_core", version = "^0.99.0" }

implementations/rust/ockam/ockam_api/src/kafka/key_exchange/controller.rs

+33-17
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,6 @@ mod test {
251251
use crate::kafka::key_exchange::listener::KafkaKeyExchangeListener;
252252
use crate::kafka::{ConsumerPublishing, ConsumerResolution};
253253
use crate::test_utils::{AuthorityConfiguration, TestNode};
254-
use crate::DefaultAddress;
255254
use ockam::identity::Identifier;
256255
use ockam_abac::{Action, Env, Resource, ResourceType};
257256
use ockam_core::compat::clock::test::TestClock;
@@ -266,7 +265,6 @@ mod test {
266265
pub fn rekey_rotation() -> ockam_core::Result<()> {
267266
let runtime = Arc::new(Runtime::new().unwrap());
268267
let runtime_cloned = runtime.clone();
269-
std::env::set_var("OCKAM_LOGGING", "false");
270268

271269
runtime_cloned.block_on(async move {
272270
let test_body = async move {
@@ -291,30 +289,33 @@ mod test {
291289
)
292290
.await;
293291

294-
let consumer_secure_channel_listener_flow_control_id = consumer_node
295-
.context
296-
.flow_controls()
297-
.get_flow_control_with_spawner(&DefaultAddress::SECURE_CHANNEL_LISTENER.into())
298-
.unwrap();
292+
let test_clock = TestClock::new(0);
299293

300294
KafkaKeyExchangeListener::create(
295+
test_clock.clone(),
301296
&consumer_node.context,
302297
consumer_node
303298
.node_manager
304299
.secure_channels
305300
.vault()
306301
.encryption_at_rest_vault,
302+
consumer_node
303+
.node_manager
304+
.secure_channels
305+
.vault()
306+
.secure_channel_vault,
307+
consumer_node
308+
.node_manager
309+
.secure_channels
310+
.secure_channel_registry(),
311+
Duration::from_secs(5 * 60), //rotation
312+
Duration::from_secs(10 * 60), //validity
307313
Duration::from_secs(60),
308-
Duration::from_secs(60),
309-
Duration::from_secs(60),
310-
&consumer_secure_channel_listener_flow_control_id,
311314
AllowAll,
312315
AllowAll,
313316
)
314317
.await?;
315318

316-
let test_clock = TestClock::new(0);
317-
318319
let destination = consumer_node.listen_address().await.multi_addr().unwrap();
319320
let producer_secure_channel_controller = create_secure_channel_controller(
320321
test_clock.clone(),
@@ -355,7 +356,10 @@ mod test {
355356
.await?;
356357

357358
assert_eq!(third_key.rekey_counter, 1);
358-
assert_eq!(first_key.secret_key_handle, third_key.secret_key_handle);
359+
assert_eq!(
360+
first_key.key_identifier_for_consumer,
361+
third_key.key_identifier_for_consumer
362+
);
359363

360364
// 04:00 - yet another rekey should happen, but no rotation
361365
test_clock.add_seconds(60 * 3);
@@ -365,7 +369,10 @@ mod test {
365369
.await?;
366370

367371
assert_eq!(fourth_key.rekey_counter, 2);
368-
assert_eq!(first_key.secret_key_handle, fourth_key.secret_key_handle);
372+
assert_eq!(
373+
first_key.key_identifier_for_consumer,
374+
fourth_key.key_identifier_for_consumer
375+
);
369376

370377
// 05:00 - the default duration of the key is 10 minutes,
371378
// but the rotation should happen after 5 minutes
@@ -375,7 +382,10 @@ mod test {
375382
.get_or_exchange_key(&mut producer_node.context, "topic_name")
376383
.await?;
377384

378-
assert_ne!(third_key.secret_key_handle, fifth_key.secret_key_handle);
385+
assert_ne!(
386+
third_key.key_identifier_for_consumer,
387+
fifth_key.key_identifier_for_consumer
388+
);
379389
assert_eq!(fifth_key.rekey_counter, 0);
380390

381391
// Now let's simulate a failure to rekey by shutting down the consumer
@@ -389,7 +399,10 @@ mod test {
389399
.await?;
390400

391401
assert_eq!(sixth_key.rekey_counter, 1);
392-
assert_eq!(fifth_key.secret_key_handle, sixth_key.secret_key_handle);
402+
assert_eq!(
403+
fifth_key.key_identifier_for_consumer,
404+
sixth_key.key_identifier_for_consumer
405+
);
393406

394407
// 10:00 - Rotation fails, but the existing key is still valid
395408
// and needs to be rekeyed
@@ -400,7 +413,10 @@ mod test {
400413
.await?;
401414

402415
assert_eq!(seventh_key.rekey_counter, 2);
403-
assert_eq!(fifth_key.secret_key_handle, seventh_key.secret_key_handle);
416+
assert_eq!(
417+
fifth_key.key_identifier_for_consumer,
418+
seventh_key.key_identifier_for_consumer
419+
);
404420

405421
// 15:00 - Rotation fails, and the existing key is no longer valid
406422
test_clock.add_seconds(60 * 5);

implementations/rust/ockam/ockam_api/src/kafka/key_exchange/listener.rs

+68-19
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,42 @@
11
use crate::DefaultAddress;
22
use minicbor::{CborLen, Decode, Encode};
3-
use ockam::identity::TimestampInSeconds;
4-
use ockam_core::flow_control::FlowControlId;
3+
use ockam::identity::{
4+
SecureChannelApiRequest, SecureChannelApiResponse, SecureChannelRegistry, TimestampInSeconds,
5+
};
6+
use ockam_core::compat::clock::Clock;
7+
use ockam_core::errcode::{Kind, Origin};
58
use ockam_core::{
6-
async_trait, Address, Decodable, Encodable, Encoded, IncomingAccessControl, Message,
9+
async_trait, route, Address, Decodable, Encodable, Encoded, IncomingAccessControl, Message,
710
OutgoingAccessControl, Routed, Worker,
811
};
912
use ockam_node::{Context, WorkerBuilder};
10-
use ockam_vault::VaultForEncryptionAtRest;
11-
use rand::Rng;
13+
use ockam_vault::{VaultForEncryptionAtRest, VaultForSecureChannels};
1214
use std::sync::Arc;
1315
use std::time::Duration;
1416

1517
pub(crate) struct KafkaKeyExchangeListener {
1618
encryption_at_rest: Arc<dyn VaultForEncryptionAtRest>,
19+
secure_channel_vault: Arc<dyn VaultForSecureChannels>,
20+
secure_channel_registry: SecureChannelRegistry,
1721
rekey_period: Duration,
1822
key_validity: Duration,
1923
key_rotation: Duration,
24+
clock: Box<dyn Clock>,
2025
}
2126

2227
#[derive(Debug, CborLen, Encode, Decode)]
2328
#[rustfmt::skip]
2429
pub(crate) struct KeyExchangeRequest {
30+
#[n(1)] pub local_decryptor_address: Address,
2531
}
2632

2733
#[derive(Debug, CborLen, Encode, Decode)]
2834
#[rustfmt::skip]
2935
pub(crate) struct KeyExchangeResponse {
3036
#[n(0)] pub key_identifier_for_consumer: Vec<u8>,
31-
#[n(1)] pub secret_key: [u8; 32],
32-
#[n(2)] pub valid_until: TimestampInSeconds,
33-
#[n(3)] pub rotate_after: TimestampInSeconds,
34-
#[n(4)] pub rekey_period: Duration,
37+
#[n(1)] pub valid_until: TimestampInSeconds,
38+
#[n(2)] pub rotate_after: TimestampInSeconds,
39+
#[n(3)] pub rekey_period: Duration,
3540
}
3641

3742
impl Encodable for KeyExchangeRequest {
@@ -70,14 +75,43 @@ impl Worker for KafkaKeyExchangeListener {
7075
context: &mut Self::Context,
7176
message: Routed<Self::Message>,
7277
) -> ockam_core::Result<()> {
73-
let mut secret_key = [0u8; 32];
74-
rand::thread_rng().fill(&mut secret_key[..]);
75-
let handle = self
76-
.encryption_at_rest
77-
.import_aead_key(secret_key.to_vec())
78-
.await?;
78+
let request: KeyExchangeRequest = minicbor::decode(message.payload())?;
79+
let local_decryptor = Address::from_string(request.local_decryptor_address);
80+
81+
let entry = self
82+
.secure_channel_registry
83+
.get_channel_by_decryptor_address(&local_decryptor);
84+
let handle = match entry {
85+
None => {
86+
warn!("No secure channel found for local decryptor {local_decryptor}",);
87+
return Ok(());
88+
}
89+
Some(entry) => {
90+
let response: SecureChannelApiResponse = context
91+
.send_and_receive(
92+
route![entry.decryptor_api_address().clone()],
93+
SecureChannelApiRequest::ExtractKey,
94+
)
95+
.await?;
96+
97+
let key_identifier = match response {
98+
SecureChannelApiResponse::Ok(key_identifier) => key_identifier,
99+
SecureChannelApiResponse::Err(error) => {
100+
error!("Error extracting key: {error}");
101+
return Ok(());
102+
}
103+
};
79104

80-
let now = TimestampInSeconds(ockam_core::compat::time::now()?);
105+
let secret = self
106+
.secure_channel_vault
107+
.export_rekey(&key_identifier)
108+
.await?;
109+
110+
self.encryption_at_rest.import_aead_key(secret).await?
111+
}
112+
};
113+
114+
let now = TimestampInSeconds(self.clock.now()?);
81115
let valid_until = now + self.key_validity;
82116
let rotate_after = now + self.key_rotation;
83117

@@ -86,7 +120,6 @@ impl Worker for KafkaKeyExchangeListener {
86120
message.return_route().clone(),
87121
KeyExchangeResponse {
88122
key_identifier_for_consumer: handle.into_vec(),
89-
secret_key,
90123
valid_until,
91124
rotate_after,
92125
rekey_period: self.rekey_period,
@@ -101,25 +134,41 @@ impl Worker for KafkaKeyExchangeListener {
101134
impl KafkaKeyExchangeListener {
102135
#[allow(clippy::too_many_arguments)]
103136
pub async fn create(
137+
clock: impl Clock,
104138
context: &Context,
105139
encryption_at_rest: Arc<dyn VaultForEncryptionAtRest>,
140+
secure_channel_vault: Arc<dyn VaultForSecureChannels>,
141+
secure_channel_registry: SecureChannelRegistry,
106142
key_rotation: Duration,
107143
key_validity: Duration,
108144
rekey_period: Duration,
109-
secure_channel_flow_control: &FlowControlId,
110145
incoming_access_control: impl IncomingAccessControl,
111146
outgoing_access_control: impl OutgoingAccessControl,
112147
) -> ockam_core::Result<()> {
113148
let address = Address::from_string(DefaultAddress::KAFKA_CUSTODIAN);
149+
let secure_channel_flow_control = context
150+
.flow_controls()
151+
.get_flow_control_with_spawner(&DefaultAddress::SECURE_CHANNEL_LISTENER.into())
152+
.ok_or_else(|| {
153+
ockam_core::Error::new(
154+
Origin::Channel,
155+
Kind::NotFound,
156+
"Secure channel listener flow control not found",
157+
)
158+
})?;
159+
114160
context
115161
.flow_controls()
116-
.add_consumer(address.clone(), secure_channel_flow_control);
162+
.add_consumer(address.clone(), &secure_channel_flow_control);
117163

118164
WorkerBuilder::new(KafkaKeyExchangeListener {
119165
encryption_at_rest,
166+
secure_channel_vault,
120167
key_rotation,
121168
key_validity,
122169
rekey_period,
170+
secure_channel_registry,
171+
clock: Box::new(clock),
123172
})
124173
.with_address(address)
125174
.with_incoming_access_control(incoming_access_control)

implementations/rust/ockam/ockam_api/src/kafka/key_exchange/secure_channels.rs

+76-7
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ use crate::kafka::key_exchange::controller::{
55
use crate::kafka::key_exchange::listener::{KeyExchangeRequest, KeyExchangeResponse};
66
use crate::kafka::ConsumerResolution;
77
use crate::DefaultAddress;
8-
use ockam::identity::TimestampInSeconds;
8+
use ockam::identity::{SecureChannelApiRequest, SecureChannelApiResponse, TimestampInSeconds};
99
use ockam_core::errcode::{Kind, Origin};
10-
use ockam_core::{Error, Result};
10+
use ockam_core::{route, Error, Result};
1111
use ockam_multiaddr::proto::{Secure, Service};
1212
use ockam_multiaddr::MultiAddr;
1313
use ockam_node::{Context, MessageSendReceiveOptions};
@@ -35,6 +35,72 @@ impl KafkaKeyExchangeControllerImpl {
3535
destination.push_back(Secure::new(DefaultAddress::SECURE_CHANNEL_LISTENER))?;
3636
destination.push_back(Service::new(DefaultAddress::KAFKA_CUSTODIAN))?;
3737
if let Some(node_manager) = inner.node_manager.upgrade() {
38+
// create a second secure channel to be used for key exchange
39+
let (aead_secret_key_handle, their_decryptor_address) = {
40+
let key_exchange_connection = node_manager
41+
.make_connection(context, &destination, node_manager.identifier(), None, None)
42+
.await?;
43+
44+
let encryptor_address = key_exchange_connection
45+
.secure_channel_encryptors
46+
.first()
47+
.expect("encryptor should be present");
48+
49+
let entry = node_manager
50+
.secure_channels
51+
.secure_channel_registry()
52+
.get_channel_by_encryptor_address(encryptor_address)
53+
.expect("channel should be present");
54+
55+
if !inner
56+
.consumer_policy_access_control
57+
.is_identity_authorized(entry.their_id())
58+
.await?
59+
{
60+
key_exchange_connection
61+
.close(context, &node_manager)
62+
.await?;
63+
return Err(Error::new(
64+
Origin::Channel,
65+
Kind::Invalid,
66+
"Consumer is not authorized to use the secure channel",
67+
));
68+
}
69+
70+
let response: SecureChannelApiResponse = context
71+
.send_and_receive(
72+
route![entry.encryptor_api_address().clone()],
73+
SecureChannelApiRequest::ExtractKey,
74+
)
75+
.await?;
76+
77+
match response {
78+
SecureChannelApiResponse::Ok(secret_handle) => {
79+
let secret = node_manager
80+
.secure_channels
81+
.vault()
82+
.secure_channel_vault
83+
.export_rekey(&secret_handle)
84+
.await?;
85+
86+
key_exchange_connection
87+
.close(context, &node_manager)
88+
.await?;
89+
90+
(
91+
self.encryption_at_rest.import_aead_key(secret).await?,
92+
entry.their_decryptor_address(),
93+
)
94+
}
95+
SecureChannelApiResponse::Err(error) => {
96+
key_exchange_connection
97+
.close(context, &node_manager)
98+
.await?;
99+
return Err(error);
100+
}
101+
}
102+
};
103+
38104
let connection = node_manager
39105
.make_connection(context, &destination, node_manager.identifier(), None, None)
40106
.await?;
@@ -52,14 +118,17 @@ impl KafkaKeyExchangeControllerImpl {
52118

53119
let route = connection.route()?;
54120
let response: KeyExchangeResponse = context
55-
.send_and_receive_extended(route, KeyExchangeRequest {}, send_and_receive_options)
121+
.send_and_receive_extended(
122+
route,
123+
KeyExchangeRequest {
124+
local_decryptor_address: their_decryptor_address,
125+
},
126+
send_and_receive_options,
127+
)
56128
.await?
57129
.into_body()?;
58130

59-
let aead_secret_key_handle = self
60-
.encryption_at_rest
61-
.import_aead_key(response.secret_key.to_vec())
62-
.await?;
131+
connection.close(context, &node_manager).await?;
63132

64133
Ok(ExchangedKey {
65134
secret_key_handler: aead_secret_key_handle,

0 commit comments

Comments
 (0)