Skip to content

Commit bcd6803

Browse files
committed
feat(rust): added support for multiple concurrent routes to tcp inlets
1 parent 1504383 commit bcd6803

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+2318
-1249
lines changed

implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository.rs

+11-11
Original file line numberDiff line numberDiff line change
@@ -71,37 +71,37 @@ impl<T: TcpPortalsRepository> TcpPortalsRepository for AutoRetry<T> {
7171

7272
#[derive(Debug, Clone, PartialEq, Eq)]
7373
pub struct TcpInlet {
74-
bind_addr: SocketAddr,
75-
outlet_addr: MultiAddr,
74+
bind_address: SocketAddr,
75+
outlet_addresses: Vec<MultiAddr>,
7676
alias: String,
7777
privileged: bool,
7878
}
7979

8080
impl TcpInlet {
8181
pub fn new(
8282
bind_addr: &SocketAddr,
83-
outlet_addr: &MultiAddr,
83+
outlet_addresses: &[MultiAddr],
8484
alias: &str,
8585
privileged: bool,
8686
) -> TcpInlet {
8787
Self {
88-
bind_addr: *bind_addr,
89-
outlet_addr: outlet_addr.clone(),
88+
bind_address: *bind_addr,
89+
outlet_addresses: outlet_addresses.to_owned(),
9090
alias: alias.to_string(),
9191
privileged,
9292
}
9393
}
9494

95-
pub fn bind_addr(&self) -> SocketAddr {
96-
self.bind_addr
95+
pub fn bind_address(&self) -> SocketAddr {
96+
self.bind_address
9797
}
9898

99-
pub fn outlet_addr(&self) -> MultiAddr {
100-
self.outlet_addr.clone()
99+
pub fn outlet_addresses(&self) -> &Vec<MultiAddr> {
100+
&self.outlet_addresses
101101
}
102102

103-
pub fn alias(&self) -> String {
104-
self.alias.clone()
103+
pub fn alias(&self) -> &str {
104+
&self.alias
105105
}
106106

107107
pub fn privileged(&self) -> bool {

implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository_sql.rs

+24-10
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1+
use itertools::Itertools;
2+
use sqlx::*;
13
use std::net::SocketAddr;
24
use std::str::FromStr;
35
use std::sync::Arc;
4-
5-
use sqlx::*;
66
use tracing::debug;
77

88
use crate::cli_state::storage::tcp_portals_repository::TcpPortalsRepository;
@@ -62,8 +62,14 @@ impl TcpPortalsRepository for TcpPortalsSqlxDatabase {
6262
ON CONFLICT DO NOTHING"#,
6363
)
6464
.bind(node_name)
65-
.bind(tcp_inlet.bind_addr().to_string())
66-
.bind(tcp_inlet.outlet_addr().to_string())
65+
.bind(tcp_inlet.bind_address().to_string())
66+
.bind(
67+
tcp_inlet
68+
.outlet_addresses()
69+
.iter()
70+
.map(|x| x.to_string())
71+
.join("//"),
72+
)
6773
.bind(tcp_inlet.alias())
6874
.bind(tcp_inlet.privileged());
6975
query.execute(&*self.database.pool).await.void()?;
@@ -158,18 +164,26 @@ struct TcpInletRow {
158164
impl TcpInletRow {
159165
fn bind_addr(&self) -> Result<SocketAddr> {
160166
SocketAddr::from_str(&self.bind_addr)
161-
.map_err(|e| ockam_core::Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))
167+
.map_err(|e| Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))
162168
}
163169

164-
fn outlet_addr(&self) -> Result<MultiAddr> {
165-
MultiAddr::from_str(&self.outlet_addr)
166-
.map_err(|e| ockam_core::Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))
170+
fn outlet_addresses(&self) -> Result<Vec<MultiAddr>> {
171+
let mut multiaddresses = Vec::new();
172+
173+
for addr in self.outlet_addr.split("//") {
174+
multiaddresses.push(
175+
MultiAddr::from_str(addr)
176+
.map_err(|e| Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))?,
177+
);
178+
}
179+
180+
Ok(multiaddresses)
167181
}
168182

169183
fn tcp_inlet(&self) -> Result<TcpInlet> {
170184
Ok(TcpInlet::new(
171185
&self.bind_addr()?,
172-
&self.outlet_addr()?,
186+
&self.outlet_addresses()?,
173187
&self.alias,
174188
self.privileged.to_bool(),
175189
))
@@ -212,7 +226,7 @@ mod tests {
212226

213227
let tcp_inlet = TcpInlet::new(
214228
&SocketAddr::from_str("127.0.0.1:80").unwrap(),
215-
&MultiAddr::from_str("/node/outlet").unwrap(),
229+
&["/node/outlet1".parse()?, "/node/outlet2".parse()?],
216230
"alias",
217231
true,
218232
);

implementations/rust/ockam/ockam_api/src/cli_state/tcp_portals.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ impl CliState {
1515
&self,
1616
node_name: &str,
1717
bind_addr: &SocketAddr,
18-
outlet_addr: &MultiAddr,
18+
outlet_addresses: &[MultiAddr],
1919
alias: &str,
2020
privileged: bool,
2121
) -> Result<TcpInlet> {
22-
let tcp_inlet = TcpInlet::new(bind_addr, outlet_addr, alias, privileged);
22+
let tcp_inlet = TcpInlet::new(bind_addr, outlet_addresses, alias, privileged);
2323
self.tcp_portals_repository()
2424
.store_tcp_inlet(node_name, &tcp_inlet)
2525
.await?;

implementations/rust/ockam/ockam_api/src/control_api/backend/inlet.rs

+29-11
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use ockam_core::Route;
1515
use ockam_multiaddr::MultiAddr;
1616
use ockam_node::Context;
1717
use std::sync::Arc;
18+
use std::time::Duration;
1819

1920
impl HttpControlNodeApiBackend {
2021
pub(super) async fn handle_tcp_inlet(
@@ -36,7 +37,7 @@ impl HttpControlNodeApiBackend {
3637
},
3738
Method::DELETE => match resource_id {
3839
None => ControlApiHttpResponse::missing_resource_id(ResourceKind::TcpInlets),
39-
Some(id) => handle_tcp_inlet_delete(&self.node_manager, id).await,
40+
Some(id) => handle_tcp_inlet_delete(context, &self.node_manager, id).await,
4041
},
4142
_ => {
4243
warn!("Invalid method: {method}");
@@ -155,16 +156,30 @@ async fn handle_tcp_inlet_create(
155156
)?),
156157
};
157158

159+
if request.to.is_empty() {
160+
return ControlApiHttpResponse::bad_request("`to` must not be empty");
161+
}
162+
163+
let to = {
164+
let mut to = Vec::new();
165+
for address in request.to.iter() {
166+
to.push(address.parse()?);
167+
}
168+
to
169+
};
170+
158171
let result = node_manager
159172
.create_inlet(
160173
context,
161174
request.from.try_into()?,
162175
Route::default(),
163176
Route::default(),
164-
request.to.parse()?,
177+
request.target_redundancy.unwrap_or(to.len() - 1),
178+
to,
165179
request.name.unwrap_or_else(random_string),
166180
allow,
167181
None,
182+
Some(Duration::from_millis(request.ping_timeout)),
168183
authorized,
169184
false,
170185
None,
@@ -292,10 +307,11 @@ async fn handle_tcp_inlet_list(
292307
)
293308
)]
294309
async fn handle_tcp_inlet_delete(
310+
context: &Context,
295311
node_manager: &Arc<NodeManager>,
296312
resource_id: &str,
297313
) -> Result<ControlApiHttpResponse, ControlApiError> {
298-
let result = node_manager.delete_inlet(resource_id).await;
314+
let result = node_manager.delete_inlet(context, resource_id).await;
299315
match result {
300316
Ok(_) => Ok(ControlApiHttpResponse::without_body(
301317
StatusCode::NO_CONTENT,
@@ -368,11 +384,13 @@ mod test {
368384
hostname: "127.0.0.1".to_string(),
369385
port: 0,
370386
},
371-
to: "/service/outlet".to_string(),
387+
to: vec!["/service/outlet".to_string()],
388+
target_redundancy: None,
372389
identity: None,
373390
authorized: None,
374391
allow: None,
375-
retry_wait: 1000,
392+
retry_wait: 1_000,
393+
ping_timeout: 1_000,
376394
})
377395
.unwrap(),
378396
),
@@ -390,8 +408,8 @@ mod test {
390408
let inlet_status: InletStatus = serde_json::from_slice(response.body.as_slice()).unwrap();
391409
assert_eq!(inlet_status.name, "inlet-name");
392410
assert_eq!(inlet_status.status, ConnectionStatus::Down);
393-
assert_eq!(inlet_status.current_route, None);
394-
assert_eq!(inlet_status.to, "/service/outlet");
411+
assert!(inlet_status.active_routes.is_empty());
412+
assert_eq!(inlet_status.to, vec!["/service/outlet"]);
395413
assert_eq!(inlet_status.bind_address.hostname, "127.0.0.1");
396414
assert!(inlet_status.bind_address.port > 0);
397415

@@ -414,8 +432,8 @@ mod test {
414432
let inlet_status: InletStatus = serde_json::from_slice(response.body.as_slice()).unwrap();
415433
assert_eq!(inlet_status.name, "inlet-name");
416434
assert_eq!(inlet_status.status, ConnectionStatus::Up);
417-
assert_eq!(inlet_status.current_route, Some("0#outlet".to_string()));
418-
assert_eq!(inlet_status.to, "/service/outlet");
435+
assert_eq!(inlet_status.active_routes, vec!["0#outlet".to_string()]);
436+
assert_eq!(inlet_status.to, vec!["/service/outlet"]);
419437

420438
let request = ControlApiHttpRequest {
421439
method: "GET".to_string(),
@@ -435,8 +453,8 @@ mod test {
435453
assert_eq!(inlets.len(), 1);
436454
assert_eq!(inlets[0].name, "inlet-name");
437455
assert_eq!(inlets[0].status, ConnectionStatus::Up);
438-
assert_eq!(inlets[0].current_route, Some("0#outlet".to_string()));
439-
assert_eq!(inlets[0].to, "/service/outlet");
456+
assert_eq!(inlets[0].active_routes, vec!["0#outlet".to_string()]);
457+
assert_eq!(inlets[0].to, vec!["/service/outlet"]);
440458

441459
let request = ControlApiHttpRequest {
442460
method: "DELETE".to_string(),

implementations/rust/ockam/ockam_api/src/control_api/protocol/inlet.rs

+34-12
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ fn retry_wait_default() -> u64 {
1313
20000
1414
}
1515

16+
fn ping_timeout_default() -> u64 {
17+
10_000
18+
}
19+
1620
#[derive(Debug, Serialize, Deserialize, Default, ToSchema)]
1721
#[serde(rename_all = "kebab-case")]
1822
pub enum InletKind {
@@ -65,9 +69,13 @@ pub struct CreateInletRequest {
6569
#[serde(default = "tcp_inlet_default_bind_address")]
6670
#[schema(default = tcp_inlet_default_bind_address)]
6771
pub from: HostnamePort,
68-
/// Multiaddress to a TCP Outlet
72+
/// Multiaddresses to a TCP Outlet
6973
#[schema(example = "/project/default/service/forward_to_node1/secure/api/service/outlet")]
70-
pub to: String,
74+
pub to: Vec<String>,
75+
/// Target redundancy for the TCP Inlet routes; 0 means only one route is instantiated;
76+
/// When omitted, the number of provided Multiaddresses minus one applies
77+
#[serde(default)]
78+
pub target_redundancy: Option<usize>,
7179
/// Identity to be used to create the secure channel;
7280
/// When omitted, the node's identity will be used
7381
pub identity: Option<String>,
@@ -84,6 +92,11 @@ pub struct CreateInletRequest {
8492
#[serde(default = "retry_wait_default")]
8593
#[schema(default = retry_wait_default)]
8694
pub retry_wait: u64,
95+
/// How long until the outlet route is considered disconnected;
96+
/// In milliseconds
97+
#[serde(default = "ping_timeout_default")]
98+
#[schema(default = ping_timeout_default)]
99+
pub ping_timeout: u64,
87100
}
88101
#[derive(Debug, Serialize, Deserialize, ToSchema)]
89102
#[serde(rename_all = "kebab-case")]
@@ -103,24 +116,33 @@ pub struct InletStatus {
103116
pub status: ConnectionStatus,
104117
/// Bind address of the TCP Inlet
105118
pub bind_address: HostnamePort,
106-
/// The current route of the TCP Inlet, populated only when the status is `up`
107-
pub current_route: Option<String>,
108-
/// Multiaddress to the TCP Outlet
109-
pub to: String,
119+
/// The active route of the TCP Inlet, empty when the connection is down
120+
pub active_routes: Vec<String>,
121+
/// The number of target redundant routes, 0 means only one route is instantiated
122+
pub target_redundancy: usize,
123+
/// Multiaddresses to the TCP Outlet
124+
pub to: Vec<String>,
110125
}
111126

112-
impl TryFrom<crate::nodes::models::portal::InletStatus> for InletStatus {
127+
impl TryFrom<crate::nodes::models::portal::InletStatusView> for InletStatus {
113128
type Error = ockam_core::Error;
114129

115-
fn try_from(status: crate::nodes::models::portal::InletStatus) -> Result<Self, Self::Error> {
116-
let bind_address = HostnamePort::try_from(status.bind_addr.as_str())?;
130+
fn try_from(
131+
status: crate::nodes::models::portal::InletStatusView,
132+
) -> Result<Self, Self::Error> {
133+
let bind_address = HostnamePort::try_from(status.bind_address.as_str())?;
117134

118135
Ok(InletStatus {
119-
status: status.status.into(),
136+
status: status.connection.into(),
120137
bind_address,
121138
name: status.alias,
122-
current_route: status.outlet_route.map(|r| r.to_string()),
123-
to: status.outlet_addr,
139+
active_routes: status
140+
.outlet_routes
141+
.into_iter()
142+
.map(|r| r.to_string())
143+
.collect(),
144+
target_redundancy: status.target_redundancy,
145+
to: status.outlet_addresses,
124146
})
125147
}
126148
}

0 commit comments

Comments
 (0)