Skip to content

Commit 57004e0

Browse files
committed
feat(rust): added support for multiple concurrent routes to tcp inlets
1 parent b3cd57a commit 57004e0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+2290
-1237
lines changed

implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -72,21 +72,21 @@ impl<T: TcpPortalsRepository> TcpPortalsRepository for AutoRetry<T> {
7272
#[derive(Debug, Clone, PartialEq, Eq)]
7373
pub struct TcpInlet {
7474
bind_addr: SocketAddr,
75-
outlet_addr: MultiAddr,
75+
outlet_addresses: Vec<MultiAddr>,
7676
alias: String,
7777
privileged: bool,
7878
}
7979

8080
impl TcpInlet {
8181
pub fn new(
8282
bind_addr: &SocketAddr,
83-
outlet_addr: &MultiAddr,
83+
outlet_addresses: &[MultiAddr],
8484
alias: &str,
8585
privileged: bool,
8686
) -> TcpInlet {
8787
Self {
8888
bind_addr: *bind_addr,
89-
outlet_addr: outlet_addr.clone(),
89+
outlet_addresses: outlet_addresses.to_owned(),
9090
alias: alias.to_string(),
9191
privileged,
9292
}
@@ -96,12 +96,12 @@ impl TcpInlet {
9696
self.bind_addr
9797
}
9898

99-
pub fn outlet_addr(&self) -> MultiAddr {
100-
self.outlet_addr.clone()
99+
pub fn outlet_addr(&self) -> &Vec<MultiAddr> {
100+
&self.outlet_addresses
101101
}
102102

103-
pub fn alias(&self) -> String {
104-
self.alias.clone()
103+
pub fn alias(&self) -> &str {
104+
&self.alias
105105
}
106106

107107
pub fn privileged(&self) -> bool {

implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository_sql.rs

+23-9
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1+
use itertools::Itertools;
2+
use sqlx::*;
13
use std::net::SocketAddr;
24
use std::str::FromStr;
35
use std::sync::Arc;
4-
5-
use sqlx::*;
66
use tracing::debug;
77

88
use crate::cli_state::storage::tcp_portals_repository::TcpPortalsRepository;
@@ -63,7 +63,13 @@ impl TcpPortalsRepository for TcpPortalsSqlxDatabase {
6363
)
6464
.bind(node_name)
6565
.bind(tcp_inlet.bind_addr().to_string())
66-
.bind(tcp_inlet.outlet_addr().to_string())
66+
.bind(
67+
tcp_inlet
68+
.outlet_addr()
69+
.iter()
70+
.map(|x| x.to_string())
71+
.join("//"),
72+
)
6773
.bind(tcp_inlet.alias())
6874
.bind(tcp_inlet.privileged());
6975
query.execute(&*self.database.pool).await.void()?;
@@ -158,18 +164,26 @@ struct TcpInletRow {
158164
impl TcpInletRow {
159165
fn bind_addr(&self) -> Result<SocketAddr> {
160166
SocketAddr::from_str(&self.bind_addr)
161-
.map_err(|e| ockam_core::Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))
167+
.map_err(|e| Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))
162168
}
163169

164-
fn outlet_addr(&self) -> Result<MultiAddr> {
165-
MultiAddr::from_str(&self.outlet_addr)
166-
.map_err(|e| ockam_core::Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))
170+
fn outlet_addresses(&self) -> Result<Vec<MultiAddr>> {
171+
let mut multiaddresses = Vec::new();
172+
173+
for addr in self.outlet_addr.split("//") {
174+
multiaddresses.push(
175+
MultiAddr::from_str(addr)
176+
.map_err(|e| Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))?,
177+
);
178+
}
179+
180+
Ok(multiaddresses)
167181
}
168182

169183
fn tcp_inlet(&self) -> Result<TcpInlet> {
170184
Ok(TcpInlet::new(
171185
&self.bind_addr()?,
172-
&self.outlet_addr()?,
186+
&self.outlet_addresses()?,
173187
&self.alias,
174188
self.privileged.to_bool(),
175189
))
@@ -212,7 +226,7 @@ mod tests {
212226

213227
let tcp_inlet = TcpInlet::new(
214228
&SocketAddr::from_str("127.0.0.1:80").unwrap(),
215-
&MultiAddr::from_str("/node/outlet").unwrap(),
229+
&["/node/outlet1".parse()?, "/node/outlet2".parse()?],
216230
"alias",
217231
true,
218232
);

implementations/rust/ockam/ockam_api/src/cli_state/tcp_portals.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ impl CliState {
1515
&self,
1616
node_name: &str,
1717
bind_addr: &SocketAddr,
18-
outlet_addr: &MultiAddr,
18+
outlet_addresses: &[MultiAddr],
1919
alias: &str,
2020
privileged: bool,
2121
) -> Result<TcpInlet> {
22-
let tcp_inlet = TcpInlet::new(bind_addr, outlet_addr, alias, privileged);
22+
let tcp_inlet = TcpInlet::new(bind_addr, outlet_addresses, alias, privileged);
2323
self.tcp_portals_repository()
2424
.store_tcp_inlet(node_name, &tcp_inlet)
2525
.await?;

implementations/rust/ockam/ockam_api/src/control_api/backend/inlet.rs

+29-11
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use ockam_core::Route;
1414
use ockam_multiaddr::MultiAddr;
1515
use ockam_node::Context;
1616
use std::sync::Arc;
17+
use std::time::Duration;
1718

1819
impl HttpControlNodeApiBackend {
1920
pub(super) async fn handle_tcp_inlet(
@@ -35,7 +36,7 @@ impl HttpControlNodeApiBackend {
3536
},
3637
Method::DELETE => match resource_id {
3738
None => ControlApiHttpResponse::missing_resource_id(ResourceKind::TcpInlets),
38-
Some(id) => handle_tcp_inlet_delete(&self.node_manager, id).await,
39+
Some(id) => handle_tcp_inlet_delete(context, &self.node_manager, id).await,
3940
},
4041
_ => {
4142
warn!("Invalid method: {method}");
@@ -153,16 +154,30 @@ async fn handle_tcp_inlet_create(
153154
)?),
154155
};
155156

157+
if request.to.is_empty() {
158+
return ControlApiHttpResponse::bad_request("`to` must not be empty");
159+
}
160+
161+
let to = {
162+
let mut to = Vec::new();
163+
for address in request.to.iter() {
164+
to.push(address.parse()?);
165+
}
166+
to
167+
};
168+
156169
let result = node_manager
157170
.create_inlet(
158171
context,
159172
request.from.try_into()?,
160173
Route::default(),
161174
Route::default(),
162-
request.to.parse()?,
175+
request.target_redundancy.unwrap_or(to.len() - 1),
176+
to,
163177
request.name.unwrap_or_else(random_string),
164178
allow,
165179
None,
180+
Some(Duration::from_millis(request.ping_timeout)),
166181
authorized,
167182
false,
168183
None,
@@ -286,10 +301,11 @@ async fn handle_tcp_inlet_list(
286301
)
287302
)]
288303
async fn handle_tcp_inlet_delete(
304+
context: &Context,
289305
node_manager: &Arc<NodeManager>,
290306
resource_id: &str,
291307
) -> Result<ControlApiHttpResponse, ControlApiError> {
292-
let result = node_manager.delete_inlet(resource_id).await;
308+
let result = node_manager.delete_inlet(context, resource_id).await;
293309
match result {
294310
Ok(_) => Ok(ControlApiHttpResponse::without_body(
295311
StatusCode::NO_CONTENT,
@@ -362,11 +378,13 @@ mod test {
362378
hostname: "127.0.0.1".to_string(),
363379
port: 0,
364380
},
365-
to: "/service/outlet".to_string(),
381+
to: vec!["/service/outlet".to_string()],
382+
target_redundancy: None,
366383
identity: None,
367384
authorized: None,
368385
allow: None,
369-
retry_wait: 1000,
386+
retry_wait: 1_000,
387+
ping_timeout: 1_000,
370388
})
371389
.unwrap(),
372390
),
@@ -384,8 +402,8 @@ mod test {
384402
let inlet_status: InletStatus = serde_json::from_slice(response.body.as_slice()).unwrap();
385403
assert_eq!(inlet_status.name, "inlet-name");
386404
assert_eq!(inlet_status.status, ConnectionStatus::Down);
387-
assert_eq!(inlet_status.current_route, None);
388-
assert_eq!(inlet_status.to, "/service/outlet");
405+
assert!(inlet_status.active_routes.is_empty());
406+
assert_eq!(inlet_status.to, vec!["/service/outlet"]);
389407
assert_eq!(inlet_status.bind_address.hostname, "127.0.0.1");
390408
assert!(inlet_status.bind_address.port > 0);
391409

@@ -408,8 +426,8 @@ mod test {
408426
let inlet_status: InletStatus = serde_json::from_slice(response.body.as_slice()).unwrap();
409427
assert_eq!(inlet_status.name, "inlet-name");
410428
assert_eq!(inlet_status.status, ConnectionStatus::Up);
411-
assert_eq!(inlet_status.current_route, Some("0#outlet".to_string()));
412-
assert_eq!(inlet_status.to, "/service/outlet");
429+
assert_eq!(inlet_status.active_routes, vec!["0#outlet".to_string()]);
430+
assert_eq!(inlet_status.to, vec!["/service/outlet"]);
413431

414432
let request = ControlApiHttpRequest {
415433
method: "GET".to_string(),
@@ -429,8 +447,8 @@ mod test {
429447
assert_eq!(inlets.len(), 1);
430448
assert_eq!(inlets[0].name, "inlet-name");
431449
assert_eq!(inlets[0].status, ConnectionStatus::Up);
432-
assert_eq!(inlets[0].current_route, Some("0#outlet".to_string()));
433-
assert_eq!(inlets[0].to, "/service/outlet");
450+
assert_eq!(inlets[0].active_routes, vec!["0#outlet".to_string()]);
451+
assert_eq!(inlets[0].to, vec!["/service/outlet"]);
434452

435453
let request = ControlApiHttpRequest {
436454
method: "DELETE".to_string(),

implementations/rust/ockam/ockam_api/src/control_api/protocol/inlet.rs

+34-12
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ fn retry_wait_default() -> u64 {
1313
20000
1414
}
1515

16+
fn ping_timeout_default() -> u64 {
17+
10_000
18+
}
19+
1620
#[derive(Debug, Serialize, Deserialize, Default, ToSchema)]
1721
#[serde(rename_all = "kebab-case")]
1822
pub enum InletKind {
@@ -65,9 +69,13 @@ pub struct CreateInletRequest {
6569
#[serde(default = "tcp_inlet_default_bind_address")]
6670
#[schema(default = tcp_inlet_default_bind_address)]
6771
pub from: HostnamePort,
68-
/// Multiaddress to a TCP Outlet
72+
/// Multiaddresses to a TCP Outlet
6973
#[schema(example = "/project/default/service/forward_to_node1/secure/api/service/outlet")]
70-
pub to: String,
74+
pub to: Vec<String>,
75+
/// Target redundancy for the TCP Inlet routes; 0 means only one route is instantiated
76+
/// When omitted, the number of provided Multiaddresses minus one applies
77+
#[serde(default)]
78+
pub target_redundancy: Option<usize>,
7179
/// Identity to be used to create the secure channel;
7280
/// When omitted, the node's identity will be used
7381
pub identity: Option<String>,
@@ -84,6 +92,11 @@ pub struct CreateInletRequest {
8492
#[serde(default = "retry_wait_default")]
8593
#[schema(default = retry_wait_default)]
8694
pub retry_wait: u64,
95+
/// How long until the outlet route is considered disconnected;
96+
/// In milliseconds
97+
#[serde(default = "ping_timeout_default")]
98+
#[schema(default = ping_timeout_default)]
99+
pub ping_timeout: u64,
87100
}
88101
#[derive(Debug, Serialize, Deserialize, ToSchema)]
89102
#[serde(rename_all = "kebab-case")]
@@ -103,24 +116,33 @@ pub struct InletStatus {
103116
pub status: ConnectionStatus,
104117
/// Bind address of the TCP Inlet
105118
pub bind_address: HostnamePort,
106-
/// The current route of the TCP Inlet, populated only when the status is `up`
107-
pub current_route: Option<String>,
108-
/// Multiaddress to the TCP Outlet
109-
pub to: String,
119+
/// The active route of the TCP Inlet, empty when the connection is down
120+
pub active_routes: Vec<String>,
121+
/// The number of target redundant routes, 0 means only one route is instantiated
122+
pub target_redundancy: usize,
123+
/// Multiaddresses to the TCP Outlet
124+
pub to: Vec<String>,
110125
}
111126

112-
impl TryFrom<crate::nodes::models::portal::InletStatus> for InletStatus {
127+
impl TryFrom<crate::nodes::models::portal::InletStatusView> for InletStatus {
113128
type Error = ockam_core::Error;
114129

115-
fn try_from(status: crate::nodes::models::portal::InletStatus) -> Result<Self, Self::Error> {
116-
let bind_address = HostnamePort::try_from(status.bind_addr.as_str())?;
130+
fn try_from(
131+
status: crate::nodes::models::portal::InletStatusView,
132+
) -> Result<Self, Self::Error> {
133+
let bind_address = HostnamePort::try_from(status.bind_address.as_str())?;
117134

118135
Ok(InletStatus {
119-
status: status.status.into(),
136+
status: status.connection.into(),
120137
bind_address,
121138
name: status.alias,
122-
current_route: status.outlet_route.map(|r| r.to_string()),
123-
to: status.outlet_addr,
139+
active_routes: status
140+
.outlet_routes
141+
.into_iter()
142+
.map(|r| r.to_string())
143+
.collect(),
144+
target_redundancy: status.target_redundancy,
145+
to: status.outlet_addresses,
124146
})
125147
}
126148
}

0 commit comments

Comments
 (0)