-
-
Notifications
You must be signed in to change notification settings - Fork 559
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(rust): added support for multiple concurrent routes to tcp inlets #8832
base: develop
Are you sure you want to change the base?
Conversation
1a7d9ea
to
1087b99
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.
47b7701
to
57004e0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds support for multiple concurrent routes to TCP inlets, enabling the use of multiple MultiAddr values along with new parameters for target redundancy and ping timeouts. Key changes include updating CLI parameters and models, modifying repository and registry types to handle multiple addresses, and adjusting connection logic and API endpoints accordingly.
- Updated access control, CLI, and storage modules to support multiple outlet addresses.
- Refactored InletStatus to InletStatusView with active route tracking.
- Modified connection instantiation and secure channel logic, along with minor logging and debug improvements.
Reviewed Changes
Copilot reviewed 53 out of 53 changed files in this pull request and generated 1 comment.
Show a summary per file
File | Description |
---|---|
implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/access_control.rs | Introduces optional authority handling using original MultiAddr. |
implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository_sql.rs | Adjusts binding of outlet addresses by joining them. |
implementations/rust/ockam/ockam_api/src/control_api/backend/inlet.rs | Adds validation for empty to and manages multiaddresses properly. |
implementations/rust/ockam/ockam_api/src/nodes/registry.rs | Renames InletInfo to TcpInletHandle and updates related fields. |
implementations/rust/ockam/ockam_api/src/nodes/service/in_memory_node.rs | Updates stop method to gracefully terminate inlets and relays. |
implementations/rust/ockam/ockam_api/src/control_api/protocol/inlet.rs | Transitions to InletStatusView with active_routes and target redundancy. |
implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs | Updates CreateInlet and InletStatus (now InletStatusView) models to handle multiple routes. |
implementations/rust/ockam/ockam_api/src/nodes/connection/mod.rs | Revises multiaddr matching logic in ConnectionBuilder. |
implementations/rust/ockam/ockam_api/src/influxdb/portal.rs | Adjusts outlet address handling for InfluxDB portals. |
implementations/rust/ockam/ockam_api/src/nodes/connection/plain_tcp.rs, plain_udp.rs | Adds Debug derives for instantiators. |
implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/background_node_client.rs | Updates payload creation to support multiple outlet addresses. |
implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs | Changes outlet address handling in Kafka services. |
implementations/rust/ockam/ockam_api/src/nodes/connection/project.rs | Adds Debug derive for project instantiator. |
implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs | Updates Session::create invocation to include new parameters. |
implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository.rs | Adapts TcpInlet definition to manage a list of outlet addresses. |
implementations/rust/ockam/ockam_api/src/nodes/connection/secure.rs | Enhances debug logging for secure channel instantiator. |
implementations/rust/ockam/ockam_api/src/kafka/inlet_controller.rs | Updates outlet address handling for Kafka inlet payloads. |
implementations/rust/ockam/ockam_api/src/nodes/models/node.rs | Reflects inlet changes using the new InletStatusView. |
implementations/rust/ockam/ockam_api/src/cli_state/tcp_portals.rs | Adjusts CLI state to persist TcpInlet with multiple outlet addresses. |
4c380e1
to
418cf46
Compare
implementations/rust/ockam/ockam_api/src/cli_state/storage/tcp_portals_repository.rs
Outdated
Show resolved
Hide resolved
@@ -13,6 +13,10 @@ fn retry_wait_default() -> u64 { | |||
20000 | |||
} | |||
|
|||
fn ping_timeout_default() -> u64 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use a Duration
instead? At least I'd include the units I the function name. If it's because of some API arguments constraints the duration parser we use in the command?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, I took a more "classical" API approach, where the API is low-level as possible, and everything else can be added on top via libraries.
Given the last meeting, I probably took the wrong approach and we should lift the API level.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duration
is a part of core
so I don't mind using it everywhere where applicable
implementations/rust/ockam/ockam_api/src/control_api/protocol/inlet.rs
Outdated
Show resolved
Hide resolved
implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs
Outdated
Show resolved
Hide resolved
implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_shared_state.rs
Show resolved
Hide resolved
implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_shared_state.rs
Show resolved
Hide resolved
implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs
Show resolved
Hide resolved
implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer/mod.rs
Outdated
Show resolved
Hide resolved
implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer/mod.rs
Outdated
Show resolved
Hide resolved
implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer/mod.rs
Show resolved
Hide resolved
implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer/selector.rs
Show resolved
Hide resolved
implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer/mod.rs
Show resolved
Hide resolved
418cf46
to
bcd6803
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a big PR, could only consume a part of it so far. Submitting what I have
alias: String, | ||
privileged: bool, | ||
} | ||
|
||
impl TcpInlet { | ||
pub fn new( | ||
bind_addr: &SocketAddr, | ||
outlet_addr: &MultiAddr, | ||
outlet_addresses: &[MultiAddr], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: Let's make it a Vector here
} | ||
|
||
pub fn outlet_addr(&self) -> MultiAddr { | ||
self.outlet_addr.clone() | ||
pub fn outlet_addresses(&self) -> &Vec<MultiAddr> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: I would return a slice here, unless a caller of that function rely on the fact that it's a Vec (I can't think of such case)
} | ||
|
||
pub fn alias(&self) -> String { | ||
self.alias.clone() | ||
pub fn alias(&self) -> &str { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice
.outlet_addresses() | ||
.iter() | ||
.map(|x| x.to_string()) | ||
.join("//"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: A small comment would be great, but overall was easy to understand
fn outlet_addresses(&self) -> Result<Vec<MultiAddr>> { | ||
let mut multiaddresses = Vec::new(); | ||
|
||
for addr in self.outlet_addr.split("//") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could that be shrank into self.outlet_addr.split("//").map()
? Or is error handling ruining it?
@@ -88,11 +79,6 @@ impl Processor for InternalProcessor { | |||
raw_socket_read_result.ipv4_info.source_ip(), | |||
raw_socket_read_result.tcp_info.source_port(), | |||
); | |||
|
|||
if connection.their_identifier != inlet_shared_state.their_identifier() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you removing that check just because it's checked in a different place, or some other reason?
|
||
let mut session = | ||
Session::create(ctx, replacer, additional_session_options, ping_timeout)?; | ||
session.start_monitoring()?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to start with a first session instead?
} | ||
// let's wait until something changes and try again | ||
debug!("No active route found for the inlet, waiting for a connection"); | ||
self.notify.notified().await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think for both regular and privileged portals choose_active_route
would better return an error instead of waiting for a connection. WDYT?
@@ -122,15 +110,15 @@ impl Processor for InternalProcessor { | |||
|
|||
let portal_packet = OckamPortalPacket::from_tcp_packet( | |||
connection.connection_identifier.clone(), | |||
inlet_shared_state.route_index(), | |||
connection.inlet_route_state.route_index(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It uses locking internally and locks again to get the route ~10 lines below that. That's a small performance penalty, but also brings a possibility of a data race.
Ok(()) | ||
} | ||
|
||
#[allow(non_snake_case)] | ||
#[ockam_macros::test(timeout = 5000)] | ||
#[ockam_macros::test(timeout = 5000_0000)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably left from debugging
This PR adds the possibility of multiple MultiAddr to the inlets.
There are few new parameters:
--ping-timeout
, this is a proxy for ping interval and gets divided by the max. amount of failed pings--target-redundancy
, the amount of redundant routes instantiated, 0 means only one route will be instantiated, can still be specified when a single MultiAddr since dns could resolve into multiple IP addresses. By default it inherits the number of--to
parameters.--to
can now be passed multiple timesA single MultiAddr can turn into multiple variants:
/project/default/...
=>/dnsaddr/xx.projects.orchestrator.io/...
=> [/ip4/1.1.1.1/...
,/ip4/2.2.2.2/...
]A session is created for each route (target redundancy+1) , every time it reconnects it selects a MultiAddr from the selector, making sure every MultiAddr variant gets used(unless redundancy+1 > available variants).
When DNS fails (or return empty) and no other MultiAddr is available, the session will retry every 15 seconds.
The inlet port will be bind upon creation, when a client connection reaches the inlet, a random route will be picked from the available ones, if no route is available it'll wait until one is available.