Skip to content

Commit ffe6799

Browse files
committed
feat(rust): switching key negotiation to a double secure channel
1 parent c85eb64 commit ffe6799

File tree

24 files changed

+331
-125
lines changed

24 files changed

+331
-125
lines changed

Cargo.lock

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

implementations/rust/ockam/ockam_api/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ tracing-error = "0.2.0"
112112
tracing-opentelemetry = "0.27.0"
113113
tracing-subscriber = { version = "0.3", features = ["json"] }
114114
url = "2.5.2"
115-
zeroize = { version = "1.8.1", features = ["zeroize_derive"] }
116115

117116
ockam_multiaddr = { path = "../ockam_multiaddr", version = "0.66.0", features = ["cbor", "serde"] }
118117
ockam_transport_core = { path = "../ockam_transport_core", version = "^0.99.0" }

implementations/rust/ockam/ockam_api/src/kafka/key_exchange/controller.rs

+35-11
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,6 @@ mod test {
266266
pub fn rekey_rotation() -> ockam_core::Result<()> {
267267
let runtime = Arc::new(Runtime::new().unwrap());
268268
let runtime_cloned = runtime.clone();
269-
std::env::set_var("OCKAM_LOGGING", "false");
270269

271270
runtime_cloned.block_on(async move {
272271
let test_body = async move {
@@ -297,24 +296,34 @@ mod test {
297296
.get_flow_control_with_spawner(&DefaultAddress::SECURE_CHANNEL_LISTENER.into())
298297
.unwrap();
299298

299+
let test_clock = TestClock::new(0);
300+
300301
KafkaKeyExchangeListener::create(
302+
test_clock.clone(),
301303
&consumer_node.context,
302304
consumer_node
303305
.node_manager
304306
.secure_channels
305307
.vault()
306308
.encryption_at_rest_vault,
307-
Duration::from_secs(60),
308-
Duration::from_secs(60),
309-
Duration::from_secs(60),
309+
consumer_node
310+
.node_manager
311+
.secure_channels
312+
.vault()
313+
.secure_channel_vault,
314+
consumer_node
315+
.node_manager
316+
.secure_channels
317+
.secure_channel_registry(),
318+
Duration::from_secs(5 * 60), //rotation
319+
Duration::from_secs(10 * 60), //validity
320+
Duration::from_secs(60), //rekey
310321
&consumer_secure_channel_listener_flow_control_id,
311322
AllowAll,
312323
AllowAll,
313324
)
314325
.await?;
315326

316-
let test_clock = TestClock::new(0);
317-
318327
let destination = consumer_node.listen_address().await.multi_addr().unwrap();
319328
let producer_secure_channel_controller = create_secure_channel_controller(
320329
test_clock.clone(),
@@ -355,7 +364,10 @@ mod test {
355364
.await?;
356365

357366
assert_eq!(third_key.rekey_counter, 1);
358-
assert_eq!(first_key.secret_key_handle, third_key.secret_key_handle);
367+
assert_eq!(
368+
first_key.key_identifier_for_consumer,
369+
third_key.key_identifier_for_consumer
370+
);
359371

360372
// 04:00 - yet another rekey should happen, but no rotation
361373
test_clock.add_seconds(60 * 3);
@@ -365,7 +377,10 @@ mod test {
365377
.await?;
366378

367379
assert_eq!(fourth_key.rekey_counter, 2);
368-
assert_eq!(first_key.secret_key_handle, fourth_key.secret_key_handle);
380+
assert_eq!(
381+
first_key.key_identifier_for_consumer,
382+
fourth_key.key_identifier_for_consumer
383+
);
369384

370385
// 05:00 - the default duration of the key is 10 minutes,
371386
// but the rotation should happen after 5 minutes
@@ -375,7 +390,10 @@ mod test {
375390
.get_or_exchange_key(&mut producer_node.context, "topic_name")
376391
.await?;
377392

378-
assert_ne!(third_key.secret_key_handle, fifth_key.secret_key_handle);
393+
assert_ne!(
394+
third_key.key_identifier_for_consumer,
395+
fifth_key.key_identifier_for_consumer
396+
);
379397
assert_eq!(fifth_key.rekey_counter, 0);
380398

381399
// Now let's simulate a failure to rekey by shutting down the consumer
@@ -389,7 +407,10 @@ mod test {
389407
.await?;
390408

391409
assert_eq!(sixth_key.rekey_counter, 1);
392-
assert_eq!(fifth_key.secret_key_handle, sixth_key.secret_key_handle);
410+
assert_eq!(
411+
fifth_key.key_identifier_for_consumer,
412+
sixth_key.key_identifier_for_consumer
413+
);
393414

394415
// 10:00 - Rotation fails, but the existing key is still valid
395416
// and needs to be rekeyed
@@ -400,7 +421,10 @@ mod test {
400421
.await?;
401422

402423
assert_eq!(seventh_key.rekey_counter, 2);
403-
assert_eq!(fifth_key.secret_key_handle, seventh_key.secret_key_handle);
424+
assert_eq!(
425+
fifth_key.key_identifier_for_consumer,
426+
seventh_key.key_identifier_for_consumer
427+
);
404428

405429
// 15:00 - Rotation fails, and the existing key is no longer valid
406430
test_clock.add_seconds(60 * 5);

implementations/rust/ockam/ockam_api/src/kafka/key_exchange/listener.rs

+55-16
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,42 @@
11
use crate::DefaultAddress;
22
use minicbor::{CborLen, Decode, Encode};
3-
use ockam::identity::TimestampInSeconds;
3+
use ockam::identity::{
4+
SecureChannelApiRequest, SecureChannelApiResponse, SecureChannelRegistry, TimestampInSeconds,
5+
};
6+
use ockam_core::compat::clock::Clock;
47
use ockam_core::flow_control::FlowControlId;
58
use ockam_core::{
6-
async_trait, Address, Decodable, Encodable, Encoded, IncomingAccessControl, Message,
9+
async_trait, route, Address, Decodable, Encodable, Encoded, IncomingAccessControl, Message,
710
OutgoingAccessControl, Routed, Worker,
811
};
912
use ockam_node::{Context, WorkerBuilder};
10-
use ockam_vault::VaultForEncryptionAtRest;
11-
use rand::Rng;
13+
use ockam_vault::{VaultForEncryptionAtRest, VaultForSecureChannels};
1214
use std::sync::Arc;
1315
use std::time::Duration;
1416

1517
pub(crate) struct KafkaKeyExchangeListener {
1618
encryption_at_rest: Arc<dyn VaultForEncryptionAtRest>,
19+
secure_channel_vault: Arc<dyn VaultForSecureChannels>,
20+
secure_channel_registry: SecureChannelRegistry,
1721
rekey_period: Duration,
1822
key_validity: Duration,
1923
key_rotation: Duration,
24+
clock: Box<dyn Clock>,
2025
}
2126

2227
#[derive(Debug, CborLen, Encode, Decode)]
2328
#[rustfmt::skip]
2429
pub(crate) struct KeyExchangeRequest {
30+
#[n(1)] pub local_decryptor_address: Address,
2531
}
2632

2733
#[derive(Debug, CborLen, Encode, Decode)]
2834
#[rustfmt::skip]
2935
pub(crate) struct KeyExchangeResponse {
3036
#[n(0)] pub key_identifier_for_consumer: Vec<u8>,
31-
#[n(1)] pub secret_key: [u8; 32],
32-
#[n(2)] pub valid_until: TimestampInSeconds,
33-
#[n(3)] pub rotate_after: TimestampInSeconds,
34-
#[n(4)] pub rekey_period: Duration,
37+
#[n(1)] pub valid_until: TimestampInSeconds,
38+
#[n(2)] pub rotate_after: TimestampInSeconds,
39+
#[n(3)] pub rekey_period: Duration,
3540
}
3641

3742
impl Encodable for KeyExchangeRequest {
@@ -70,14 +75,43 @@ impl Worker for KafkaKeyExchangeListener {
7075
context: &mut Self::Context,
7176
message: Routed<Self::Message>,
7277
) -> ockam_core::Result<()> {
73-
let mut secret_key = [0u8; 32];
74-
rand::thread_rng().fill(&mut secret_key[..]);
75-
let handle = self
76-
.encryption_at_rest
77-
.import_aead_key(secret_key.to_vec())
78-
.await?;
78+
let request: KeyExchangeRequest = minicbor::decode(message.payload())?;
79+
let local_decryptor = Address::from_string(request.local_decryptor_address);
80+
81+
let entry = self
82+
.secure_channel_registry
83+
.get_channel_by_decryptor_address(&local_decryptor);
84+
let handle = match entry {
85+
None => {
86+
warn!("No secure channel found for local decryptor {local_decryptor}",);
87+
return Ok(());
88+
}
89+
Some(entry) => {
90+
let response: SecureChannelApiResponse = context
91+
.send_and_receive(
92+
route![entry.decryptor_api_address().clone()],
93+
SecureChannelApiRequest::ExtractKey,
94+
)
95+
.await?;
96+
97+
let key_identifier = match response {
98+
SecureChannelApiResponse::Ok(key_identifier) => key_identifier,
99+
SecureChannelApiResponse::Err(error) => {
100+
error!("Error extracting key: {error}");
101+
return Ok(());
102+
}
103+
};
104+
105+
let secret = self
106+
.secure_channel_vault
107+
.export_rekey(&key_identifier)
108+
.await?;
109+
110+
self.encryption_at_rest.import_aead_key(secret).await?
111+
}
112+
};
79113

80-
let now = TimestampInSeconds(ockam_core::compat::time::now()?);
114+
let now = TimestampInSeconds(self.clock.now()?);
81115
let valid_until = now + self.key_validity;
82116
let rotate_after = now + self.key_rotation;
83117

@@ -86,7 +120,6 @@ impl Worker for KafkaKeyExchangeListener {
86120
message.return_route().clone(),
87121
KeyExchangeResponse {
88122
key_identifier_for_consumer: handle.into_vec(),
89-
secret_key,
90123
valid_until,
91124
rotate_after,
92125
rekey_period: self.rekey_period,
@@ -101,8 +134,11 @@ impl Worker for KafkaKeyExchangeListener {
101134
impl KafkaKeyExchangeListener {
102135
#[allow(clippy::too_many_arguments)]
103136
pub async fn create(
137+
clock: impl Clock,
104138
context: &Context,
105139
encryption_at_rest: Arc<dyn VaultForEncryptionAtRest>,
140+
secure_channel_vault: Arc<dyn VaultForSecureChannels>,
141+
secure_channel_registry: SecureChannelRegistry,
106142
key_rotation: Duration,
107143
key_validity: Duration,
108144
rekey_period: Duration,
@@ -117,9 +153,12 @@ impl KafkaKeyExchangeListener {
117153

118154
WorkerBuilder::new(KafkaKeyExchangeListener {
119155
encryption_at_rest,
156+
secure_channel_vault,
120157
key_rotation,
121158
key_validity,
122159
rekey_period,
160+
secure_channel_registry,
161+
clock: Box::new(clock),
123162
})
124163
.with_address(address)
125164
.with_incoming_access_control(incoming_access_control)

implementations/rust/ockam/ockam_api/src/kafka/key_exchange/secure_channels.rs

+76-7
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ use crate::kafka::key_exchange::controller::{
55
use crate::kafka::key_exchange::listener::{KeyExchangeRequest, KeyExchangeResponse};
66
use crate::kafka::ConsumerResolution;
77
use crate::DefaultAddress;
8-
use ockam::identity::TimestampInSeconds;
8+
use ockam::identity::{SecureChannelApiRequest, SecureChannelApiResponse, TimestampInSeconds};
99
use ockam_core::errcode::{Kind, Origin};
10-
use ockam_core::{Error, Result};
10+
use ockam_core::{route, Error, Result};
1111
use ockam_multiaddr::proto::{Secure, Service};
1212
use ockam_multiaddr::MultiAddr;
1313
use ockam_node::{Context, MessageSendReceiveOptions};
@@ -35,6 +35,72 @@ impl KafkaKeyExchangeControllerImpl {
3535
destination.push_back(Secure::new(DefaultAddress::SECURE_CHANNEL_LISTENER))?;
3636
destination.push_back(Service::new(DefaultAddress::KAFKA_CUSTODIAN))?;
3737
if let Some(node_manager) = inner.node_manager.upgrade() {
38+
// create a second secure channel to be used for key exchange
39+
let (aead_secret_key_handle, their_decryptor_address) = {
40+
let key_exchange_connection = node_manager
41+
.make_connection(context, &destination, node_manager.identifier(), None, None)
42+
.await?;
43+
44+
let encryptor_address = key_exchange_connection
45+
.secure_channel_encryptors
46+
.first()
47+
.expect("encryptor should be present");
48+
49+
let entry = node_manager
50+
.secure_channels
51+
.secure_channel_registry()
52+
.get_channel_by_encryptor_address(encryptor_address)
53+
.expect("channel should be present");
54+
55+
if !inner
56+
.consumer_policy_access_control
57+
.is_identity_authorized(entry.their_id())
58+
.await?
59+
{
60+
key_exchange_connection
61+
.close(context, &node_manager)
62+
.await?;
63+
return Err(Error::new(
64+
Origin::Channel,
65+
Kind::Invalid,
66+
"Consumer is not authorized to use the secure channel",
67+
));
68+
}
69+
70+
let response: SecureChannelApiResponse = context
71+
.send_and_receive(
72+
route![entry.encryptor_api_address().clone()],
73+
SecureChannelApiRequest::ExtractKey,
74+
)
75+
.await?;
76+
77+
match response {
78+
SecureChannelApiResponse::Ok(secret_handle) => {
79+
let secret = node_manager
80+
.secure_channels
81+
.vault()
82+
.secure_channel_vault
83+
.export_rekey(&secret_handle)
84+
.await?;
85+
86+
key_exchange_connection
87+
.close(context, &node_manager)
88+
.await?;
89+
90+
(
91+
self.encryption_at_rest.import_aead_key(secret).await?,
92+
entry.their_decryptor_address(),
93+
)
94+
}
95+
SecureChannelApiResponse::Err(error) => {
96+
key_exchange_connection
97+
.close(context, &node_manager)
98+
.await?;
99+
return Err(error);
100+
}
101+
}
102+
};
103+
38104
let connection = node_manager
39105
.make_connection(context, &destination, node_manager.identifier(), None, None)
40106
.await?;
@@ -52,14 +118,17 @@ impl KafkaKeyExchangeControllerImpl {
52118

53119
let route = connection.route()?;
54120
let response: KeyExchangeResponse = context
55-
.send_and_receive_extended(route, KeyExchangeRequest {}, send_and_receive_options)
121+
.send_and_receive_extended(
122+
route,
123+
KeyExchangeRequest {
124+
local_decryptor_address: their_decryptor_address,
125+
},
126+
send_and_receive_options,
127+
)
56128
.await?
57129
.into_body()?;
58130

59-
let aead_secret_key_handle = self
60-
.encryption_at_rest
61-
.import_aead_key(response.secret_key.to_vec())
62-
.await?;
131+
connection.close(context, &node_manager).await?;
63132

64133
Ok(ExchangedKey {
65134
secret_key_handler: aead_secret_key_handle,

implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs

+7
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use ockam::{Address, Context, Result};
2323
use ockam_abac::PolicyExpression;
2424
use ockam_abac::{Action, Resource, ResourceType};
2525
use ockam_core::api::{Error, Response};
26+
use ockam_core::compat::clock::ProductionClock;
2627
use ockam_core::compat::rand::random_string;
2728
use ockam_core::flow_control::FlowControls;
2829
use ockam_core::route;
@@ -197,8 +198,11 @@ impl InMemoryNode {
197198

198199
// TODO: remove key exchange from inlets
199200
KafkaKeyExchangeListener::create(
201+
ProductionClock,
200202
context,
201203
vault.encryption_at_rest_vault,
204+
self.secure_channels.vault().secure_channel_vault,
205+
self.secure_channels.secure_channel_registry(),
202206
std::time::Duration::from_secs(60 * 60 * 24),
203207
std::time::Duration::from_secs(60 * 60 * 30),
204208
std::time::Duration::from_secs(60 * 60),
@@ -395,8 +399,11 @@ impl InMemoryNode {
395399
.await?;
396400

397401
KafkaKeyExchangeListener::create(
402+
ProductionClock,
398403
context,
399404
vault.encryption_at_rest_vault,
405+
self.secure_channels.vault().secure_channel_vault,
406+
self.secure_channels.secure_channel_registry(),
400407
request.key_rotation,
401408
request.key_validity,
402409
request.rekey_period,

0 commit comments

Comments
 (0)