Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): added support for multiple concurrent routes to tcp inlets #8832

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,37 +71,37 @@ impl<T: TcpPortalsRepository> TcpPortalsRepository for AutoRetry<T> {

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TcpInlet {
bind_addr: SocketAddr,
outlet_addr: MultiAddr,
bind_address: SocketAddr,
outlet_addresses: Vec<MultiAddr>,
alias: String,
privileged: bool,
}

impl TcpInlet {
pub fn new(
bind_addr: &SocketAddr,
outlet_addr: &MultiAddr,
outlet_addresses: &[MultiAddr],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Let's make it a Vector here

alias: &str,
privileged: bool,
) -> TcpInlet {
Self {
bind_addr: *bind_addr,
outlet_addr: outlet_addr.clone(),
bind_address: *bind_addr,
outlet_addresses: outlet_addresses.to_owned(),
alias: alias.to_string(),
privileged,
}
}

pub fn bind_addr(&self) -> SocketAddr {
self.bind_addr
pub fn bind_address(&self) -> SocketAddr {
self.bind_address
}

pub fn outlet_addr(&self) -> MultiAddr {
self.outlet_addr.clone()
pub fn outlet_addresses(&self) -> &Vec<MultiAddr> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: I would return a slice here, unless a caller of that function rely on the fact that it's a Vec (I can't think of such case)

&self.outlet_addresses
}

pub fn alias(&self) -> String {
self.alias.clone()
pub fn alias(&self) -> &str {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

&self.alias
}

pub fn privileged(&self) -> bool {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use itertools::Itertools;
use sqlx::*;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;

use sqlx::*;
use tracing::debug;

use crate::cli_state::storage::tcp_portals_repository::TcpPortalsRepository;
Expand Down Expand Up @@ -62,8 +62,14 @@ impl TcpPortalsRepository for TcpPortalsSqlxDatabase {
ON CONFLICT DO NOTHING"#,
)
.bind(node_name)
.bind(tcp_inlet.bind_addr().to_string())
.bind(tcp_inlet.outlet_addr().to_string())
.bind(tcp_inlet.bind_address().to_string())
.bind(
tcp_inlet
.outlet_addresses()
.iter()
.map(|x| x.to_string())
.join("//"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: A small comment would be great, but overall was easy to understand

)
.bind(tcp_inlet.alias())
.bind(tcp_inlet.privileged());
query.execute(&*self.database.pool).await.void()?;
Expand Down Expand Up @@ -158,18 +164,26 @@ struct TcpInletRow {
impl TcpInletRow {
fn bind_addr(&self) -> Result<SocketAddr> {
SocketAddr::from_str(&self.bind_addr)
.map_err(|e| ockam_core::Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))
.map_err(|e| Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))
}

fn outlet_addr(&self) -> Result<MultiAddr> {
MultiAddr::from_str(&self.outlet_addr)
.map_err(|e| ockam_core::Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))
fn outlet_addresses(&self) -> Result<Vec<MultiAddr>> {
let mut multiaddresses = Vec::new();

for addr in self.outlet_addr.split("//") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could that be shrank into self.outlet_addr.split("//").map()? Or is error handling ruining it?

multiaddresses.push(
MultiAddr::from_str(addr)
.map_err(|e| Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))?,
);
}

Ok(multiaddresses)
}

fn tcp_inlet(&self) -> Result<TcpInlet> {
Ok(TcpInlet::new(
&self.bind_addr()?,
&self.outlet_addr()?,
&self.outlet_addresses()?,
&self.alias,
self.privileged.to_bool(),
))
Expand Down Expand Up @@ -212,7 +226,7 @@ mod tests {

let tcp_inlet = TcpInlet::new(
&SocketAddr::from_str("127.0.0.1:80").unwrap(),
&MultiAddr::from_str("/node/outlet").unwrap(),
&["/node/outlet1".parse()?, "/node/outlet2".parse()?],
"alias",
true,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ impl CliState {
&self,
node_name: &str,
bind_addr: &SocketAddr,
outlet_addr: &MultiAddr,
outlet_addresses: &[MultiAddr],
alias: &str,
privileged: bool,
) -> Result<TcpInlet> {
let tcp_inlet = TcpInlet::new(bind_addr, outlet_addr, alias, privileged);
let tcp_inlet = TcpInlet::new(bind_addr, outlet_addresses, alias, privileged);
self.tcp_portals_repository()
.store_tcp_inlet(node_name, &tcp_inlet)
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use ockam_core::Route;
use ockam_multiaddr::MultiAddr;
use ockam_node::Context;
use std::sync::Arc;
use std::time::Duration;

impl HttpControlNodeApiBackend {
pub(super) async fn handle_tcp_inlet(
Expand All @@ -36,7 +37,7 @@ impl HttpControlNodeApiBackend {
},
Method::DELETE => match resource_id {
None => ControlApiHttpResponse::missing_resource_id(ResourceKind::TcpInlets),
Some(id) => handle_tcp_inlet_delete(&self.node_manager, id).await,
Some(id) => handle_tcp_inlet_delete(context, &self.node_manager, id).await,
},
_ => {
warn!("Invalid method: {method}");
Expand Down Expand Up @@ -155,16 +156,30 @@ async fn handle_tcp_inlet_create(
)?),
};

if request.to.is_empty() {
return ControlApiHttpResponse::bad_request("`to` must not be empty");
}

let to = {
let mut to = Vec::new();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Same question here. Or also possible to reserve capacity at least

for address in request.to.iter() {
to.push(address.parse()?);
}
to
};

let result = node_manager
.create_inlet(
context,
request.from.try_into()?,
Route::default(),
Route::default(),
request.to.parse()?,
request.target_redundancy.unwrap_or(to.len() - 1),
to,
request.name.unwrap_or_else(random_string),
allow,
None,
Some(Duration::from_millis(request.ping_timeout)),
authorized,
false,
None,
Expand Down Expand Up @@ -292,10 +307,11 @@ async fn handle_tcp_inlet_list(
)
)]
async fn handle_tcp_inlet_delete(
context: &Context,
node_manager: &Arc<NodeManager>,
resource_id: &str,
) -> Result<ControlApiHttpResponse, ControlApiError> {
let result = node_manager.delete_inlet(resource_id).await;
let result = node_manager.delete_inlet(context, resource_id).await;
match result {
Ok(_) => Ok(ControlApiHttpResponse::without_body(
StatusCode::NO_CONTENT,
Expand Down Expand Up @@ -368,11 +384,13 @@ mod test {
hostname: "127.0.0.1".to_string(),
port: 0,
},
to: "/service/outlet".to_string(),
to: vec!["/service/outlet".to_string()],
target_redundancy: None,
identity: None,
authorized: None,
allow: None,
retry_wait: 1000,
retry_wait: 1_000,
ping_timeout: 1_000,
})
.unwrap(),
),
Expand All @@ -390,8 +408,8 @@ mod test {
let inlet_status: InletStatus = serde_json::from_slice(response.body.as_slice()).unwrap();
assert_eq!(inlet_status.name, "inlet-name");
assert_eq!(inlet_status.status, ConnectionStatus::Down);
assert_eq!(inlet_status.current_route, None);
assert_eq!(inlet_status.to, "/service/outlet");
assert!(inlet_status.active_routes.is_empty());
assert_eq!(inlet_status.to, vec!["/service/outlet"]);
assert_eq!(inlet_status.bind_address.hostname, "127.0.0.1");
assert!(inlet_status.bind_address.port > 0);

Expand All @@ -414,8 +432,8 @@ mod test {
let inlet_status: InletStatus = serde_json::from_slice(response.body.as_slice()).unwrap();
assert_eq!(inlet_status.name, "inlet-name");
assert_eq!(inlet_status.status, ConnectionStatus::Up);
assert_eq!(inlet_status.current_route, Some("0#outlet".to_string()));
assert_eq!(inlet_status.to, "/service/outlet");
assert_eq!(inlet_status.active_routes, vec!["0#outlet".to_string()]);
assert_eq!(inlet_status.to, vec!["/service/outlet"]);

let request = ControlApiHttpRequest {
method: "GET".to_string(),
Expand All @@ -435,8 +453,8 @@ mod test {
assert_eq!(inlets.len(), 1);
assert_eq!(inlets[0].name, "inlet-name");
assert_eq!(inlets[0].status, ConnectionStatus::Up);
assert_eq!(inlets[0].current_route, Some("0#outlet".to_string()));
assert_eq!(inlets[0].to, "/service/outlet");
assert_eq!(inlets[0].active_routes, vec!["0#outlet".to_string()]);
assert_eq!(inlets[0].to, vec!["/service/outlet"]);

let request = ControlApiHttpRequest {
method: "DELETE".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ fn retry_wait_default() -> u64 {
20000
}

fn ping_timeout_default() -> u64 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use a Duration instead? At least I'd include the units I the function name. If it's because of some API arguments constraints the duration parser we use in the command?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, I took a more "classical" API approach, where the API is low-level as possible, and everything else can be added on top via libraries.
Given the last meeting, I probably took the wrong approach and we should lift the API level.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duration is a part of core so I don't mind using it everywhere where applicable

10_000
}

#[derive(Debug, Serialize, Deserialize, Default, ToSchema)]
#[serde(rename_all = "kebab-case")]
pub enum InletKind {
Expand Down Expand Up @@ -65,9 +69,13 @@ pub struct CreateInletRequest {
#[serde(default = "tcp_inlet_default_bind_address")]
#[schema(default = tcp_inlet_default_bind_address)]
pub from: HostnamePort,
/// Multiaddress to a TCP Outlet
/// Multiaddresses to a TCP Outlet
#[schema(example = "/project/default/service/forward_to_node1/secure/api/service/outlet")]
pub to: String,
pub to: Vec<String>,
/// Target redundancy for the TCP Inlet routes; 0 means only one route is instantiated;
/// When omitted, the number of provided Multiaddresses minus one applies
#[serde(default)]
pub target_redundancy: Option<usize>,
/// Identity to be used to create the secure channel;
/// When omitted, the node's identity will be used
pub identity: Option<String>,
Expand All @@ -84,6 +92,11 @@ pub struct CreateInletRequest {
#[serde(default = "retry_wait_default")]
#[schema(default = retry_wait_default)]
pub retry_wait: u64,
/// How long until the outlet route is considered disconnected;
/// In milliseconds
#[serde(default = "ping_timeout_default")]
#[schema(default = ping_timeout_default)]
pub ping_timeout: u64,
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "kebab-case")]
Expand All @@ -103,24 +116,33 @@ pub struct InletStatus {
pub status: ConnectionStatus,
/// Bind address of the TCP Inlet
pub bind_address: HostnamePort,
/// The current route of the TCP Inlet, populated only when the status is `up`
pub current_route: Option<String>,
/// Multiaddress to the TCP Outlet
pub to: String,
/// The active route of the TCP Inlet, empty when the connection is down
pub active_routes: Vec<String>,
/// The number of target redundant routes, 0 means only one route is instantiated
pub target_redundancy: usize,
/// Multiaddresses to the TCP Outlet
pub to: Vec<String>,
}

impl TryFrom<crate::nodes::models::portal::InletStatus> for InletStatus {
impl TryFrom<crate::nodes::models::portal::InletStatusView> for InletStatus {
type Error = ockam_core::Error;

fn try_from(status: crate::nodes::models::portal::InletStatus) -> Result<Self, Self::Error> {
let bind_address = HostnamePort::try_from(status.bind_addr.as_str())?;
fn try_from(
status: crate::nodes::models::portal::InletStatusView,
) -> Result<Self, Self::Error> {
let bind_address = HostnamePort::try_from(status.bind_address.as_str())?;

Ok(InletStatus {
status: status.status.into(),
status: status.connection.into(),
bind_address,
name: status.alias,
current_route: status.outlet_route.map(|r| r.to_string()),
to: status.outlet_addr,
active_routes: status
.outlet_routes
.into_iter()
.map(|r| r.to_string())
.collect(),
target_redundancy: status.target_redundancy,
to: status.outlet_addresses,
})
}
}
Loading
Loading