Add round-robin load balancing #30
Conversation
I will review it tomorrow, but which tests should this pass? Are those the tests from the other PR?
Pull request overview
This PR adds the first pass of client-side load balancing for Alternator by introducing routing scopes, live-node discovery, and a per-request query plan that selects nodes in round-robin order. It extends the client/configuration surface so requests can target cluster-, datacenter-, or rack-level subsets and refresh the node list in the background.
Changes:
- Add `RoutingScope`, `LiveNodes`, and `QueryPlan` to represent routing targets, refresh live nodes via `/localnodes`, and select nodes round-robin per request.
- Wire the new routing behavior into client construction and request interception so selected node URIs can override the outgoing request target.
- Extend configuration/builders and dependencies to support seed hosts, scheme/port, refresh intervals, and URL/state management.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 17 comments.
| File | Description |
|---|---|
| src/routing_scope.rs | Adds the public routing-scope model and fallback-chain helpers. |
| src/query_plan.rs | Adds per-request round-robin node selection state stored in the Smithy config bag. |
| src/live_nodes.rs | Implements background discovery/polling of live Alternator nodes via /localnodes. |
| src/lib.rs | Exposes the new routing module and internal discovery/query-plan modules. |
| src/interceptors.rs | Injects query-plan creation and rewrites request URIs to selected nodes. |
| src/config.rs | Adds builder/config fields for discovery intervals, routing scope, scheme, port, and seed hosts; also derives discovery settings from endpoint_url. |
| src/client.rs | Instantiates LiveNodes, adds the round-robin interceptor, and starts the background task. |
| Cargo.toml | Adds runtime/support dependencies for Tokio, URL handling, arc-swap, rand, and new dev dependency entries. |
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(5))
        .connect_timeout(Duration::from_secs(2))
        .build()
        .ok()?;
    pub fn active_interval(mut self, active_interval: u64) -> Self {
        self.set_active_interval(active_interval);
        self
    }

    pub fn set_active_interval(&mut self, active_interval: u64) -> &mut Self {
        self.alternator_ext.active_interval = Some(active_interval);
        self
    }

    pub fn idle_interval(mut self, idle_interval: u64) -> Self {
        self.set_idle_interval(idle_interval);
        self
    }

    pub fn set_idle_interval(&mut self, idle_interval: u64) -> &mut Self {
        self.alternator_ext.idle_interval = Some(idle_interval);
        self
    }

    pub fn routing_scope(mut self, routing_scope: crate::routing_scope::RoutingScope) -> Self {
        self.set_routing_scope(routing_scope);
        self
    }

    pub fn set_routing_scope(
        &mut self,
        routing_scope: crate::routing_scope::RoutingScope,
    ) -> &mut Self {
        self.alternator_ext.routing_scope = Some(routing_scope);
        self
    }

    pub fn scheme(mut self, scheme: impl Into<String>) -> Self {
        self.set_scheme(scheme);
        self
    }

    pub fn set_scheme(&mut self, scheme: impl Into<String>) -> &mut Self {
        self.alternator_ext.scheme = Some(scheme.into());
        self
    }

    pub fn port(mut self, port: u16) -> Self {
        self.set_port(port);
        self
    }

    pub fn set_port(&mut self, port: u16) -> &mut Self {
        self.alternator_ext.port = Some(port);
        self
    }

    pub fn seed_hosts<I, S>(mut self, seed_hosts: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.set_seed_hosts(seed_hosts.into_iter().map(Into::into).collect());
        self
    }

    pub fn set_seed_hosts(&mut self, seed_hosts: Vec<String>) -> &mut Self {
        self.alternator_ext.seed_hosts = Some(seed_hosts);
        self
    // Take the next node from the query plan and override the request URI.
    if let Some(query_plan) = cfg.interceptor_state().load::<QueryPlan>()
        && let Some(next_node) = query_plan.next_node()
    {
        let _ = context.request_mut().set_uri(next_node.to_string());
        && let Some(host) = url.host_str()
    {
        self.set_seed_hosts(vec![host.to_string()]);
    let live_nodes = LiveNodes::new(&config);
    if let Some(nodes) = &live_nodes {
        dynamodb_config =
            dynamodb_config.interceptor(RoundRobinQueryPlanInterceptor::new(nodes.clone()));
    }
They are in #27, they should soon be merged.
    pub fn set_endpoint_url(&mut self, endpoint_url: Option<String>) -> &mut Self {
        if let Some(url_str) = endpoint_url.as_deref()
            && let Ok(url) = url::Url::parse(url_str)
            && let Some(host) = url.host_str()
        {
            self.set_seed_hosts(vec![host.to_string()]);
            self.set_scheme(format!("{}://", url.scheme()));
            if let Some(port) = url.port() {
                self.set_port(port);
            }
        }
        self.dynamodb_builder.set_endpoint_url(endpoint_url);
hyper = { version = "1.8", features = ["client", "server", "http1"] }
http = "1.0"
http-body-util = "0.1"
ctor = "0.10.0"
    if let Some(nodes) = live_nodes {
        nodes.start();
Yes, but we actually can't run them right now because they both need routing scope from this PR. Maybe I could move adding routing scope into a separate PR.
14772d6 to 0d31976
m-szymon left a comment
In general this seems a good direction, but please address the comments.
As we discussed, I would like to see the tests passing.
    }

    /// Sets the URI scheme (http or https).
    pub fn set_scheme(&mut self, scheme: impl Into<String>) -> &mut Self {
But it requires "http://" not just "http", right?
Yes, I'm changing it so that it will accept different variants like "http" or "http:".
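For illustration only, accepting the variants mentioned in this reply could look roughly like the sketch below. `normalize_scheme` is a hypothetical helper, not code from this PR, and restricting the result to `http`/`https` is an assumption based on the doc comment on `set_scheme`.

```rust
/// Hypothetical helper (not part of the PR): turns "http", "http:" or
/// "http://" (and the https equivalents) into the canonical "scheme://" form.
/// Returns `None` for anything else.
fn normalize_scheme(input: &str) -> Option<String> {
    let trimmed = input.trim();
    // Drop an optional "://" or ":" suffix to get the bare scheme name.
    let bare = trimmed
        .strip_suffix("://")
        .or_else(|| trimmed.strip_suffix(':'))
        .unwrap_or(trimmed);
    let lower = bare.to_ascii_lowercase();
    match lower.as_str() {
        "http" | "https" => Some(format!("{lower}://")),
        _ => None,
    }
}

fn main() {
    for candidate in ["http", "https:", "HTTP://", "fake_scheme"] {
        println!("{candidate:?} -> {:?}", normalize_scheme(candidate));
    }
}
```

Returning `None` for unknown input keeps the decision about invalid schemes with the caller, which matters here because later comments rely on an invalid scheme to switch discovery off in tests.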
        Ok(())
    }

    fn modify_before_signing(
It is not explained anywhere why we need two hooks. What is the benefit of adding [QueryPlan] to the config bag before request serialization and using it later?
This allows us to track the already attempted nodes. modify_before_serialization is triggered exactly once per request, while modify_before_signing launches before every retry.
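The split described in this reply can be sketched without the SDK. The code below is a simplified stand-in, not the PR's implementation: `QueryPlan` here takes `&mut self` instead of living in the config bag, and `targets_for_attempts` is a hypothetical driver that plays the role of the two hooks.

```rust
use std::collections::HashSet;

/// Simplified stand-in for the per-request plan (the real one is stored in
/// the Smithy config bag).
struct QueryPlan {
    live_nodes: Vec<String>,
    used_nodes: HashSet<String>,
}

impl QueryPlan {
    fn new(live_nodes: Vec<String>) -> Self {
        Self { live_nodes, used_nodes: HashSet::new() }
    }

    /// Returns the first node not yet tried for this request, or `None`
    /// once every known node has been attempted.
    fn next_node(&mut self) -> Option<String> {
        let node = self
            .live_nodes
            .iter()
            .find(|n| !self.used_nodes.contains(*n))?
            .clone();
        self.used_nodes.insert(node.clone());
        Some(node)
    }
}

/// Hypothetical driver: lists the target of each attempt of one request.
fn targets_for_attempts(nodes: &[&str], original: &str, attempts: usize) -> Vec<String> {
    // Plays the role of modify_before_serialization: runs once per request.
    let mut plan = QueryPlan::new(nodes.iter().map(|n| n.to_string()).collect());
    (0..attempts)
        // Plays the role of modify_before_signing: runs before every attempt.
        // When the plan is exhausted, the original target is kept.
        .map(|_| plan.next_node().unwrap_or_else(|| original.to_string()))
        .collect()
}

fn main() {
    println!("{:?}", targets_for_attempts(&["node-a", "node-b"], "seed", 3));
}
```

Because the plan outlives a single attempt, a retry never lands on a node this request has already tried, which is what a single hook without shared state could not guarantee.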
    }
}

impl Intercept for RoundRobinQueryPlanInterceptor {
Why do we need two interceptors?
Or maybe why can't this interceptor implement modify_before_signing?
I did it this way because the hook in modify_before_signing is exactly the same across all load balancing strategies, while the one in modify_before_serialization is different, so this allows us to reuse code. Also, all the optimizations are now in the same place.
        .live_nodes
        .get_live_nodes_round_robin()
        .into_iter()
        .find(|n| !used_nodes.contains(n))?;
What happens if the retry number is higher than the number of nodes?
Then the original request is kept.
    /// Returns the current list of live nodes starting with the next node in round-robin order.
    /// Used by [`crate::QueryPlan`] round-robin strategy.
    pub fn get_live_nodes_round_robin(&self) -> Vec<Url> {
I think it is quite a heavy operation, and it seems we call it quite often in the hot path.
In general there seem to be a lot of allocations involved with all the rotation, parsing, and rebuilding of URLs.
Maybe it is premature optimization, but take a look at that aspect.
I've moved selecting the node to live_nodes.rs, removed the rotation, and changed Url to Arc<Url> everywhere, so urls are allocated exactly once.
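A rough sketch of that idea, under stated assumptions: this `LiveNodes` is a stand-in, `Arc<str>` takes the place of `Arc<Url>` so the example needs only the standard library, and the atomic cursor is one possible way to avoid building a rotated copy, not necessarily what the PR does.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Stand-in for the shared node list; `Arc<str>` plays the role of `Arc<Url>`.
struct LiveNodes {
    nodes: Vec<Arc<str>>,
    cursor: AtomicUsize,
}

impl LiveNodes {
    fn new(nodes: &[&str]) -> Self {
        Self {
            // Each node is allocated once here and only shared afterwards.
            nodes: nodes.iter().map(|n| Arc::<str>::from(*n)).collect(),
            cursor: AtomicUsize::new(0),
        }
    }

    /// Picks the next node in round-robin order that is not in `used`.
    /// Walks the list from a moving start index instead of rotating a copy.
    fn next_node(&self, used: &[Arc<str>]) -> Option<Arc<str>> {
        let len = self.nodes.len();
        if len == 0 {
            return None;
        }
        let start = self.cursor.fetch_add(1, Ordering::Relaxed) % len;
        (0..len)
            .map(|offset| &self.nodes[(start + offset) % len])
            .find(|node| !used.contains(*node))
            .cloned() // clones the Arc pointer, not the underlying string
    }
}

fn main() {
    let live = LiveNodes::new(&["n1", "n2", "n3"]);
    let first = live.next_node(&[]);
    let second = live.next_node(&[]);
    println!("{first:?} {second:?}");
}
```

The linear `used.contains` check is fine for the handful of nodes a retry sequence touches; a set would only pay off for much longer lists.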
m-szymon left a comment
In general looks good.
But I am concerned whether the previous HTTP tests are still correct with load balancing.
    let scope = scope_utils::datacenter_scope_from_index(cluster, 1);
    let client = create_client_with_scope(cluster, scope.clone());

    tokio::time::sleep(ACTIVE_INTERVAL).await;
Using sleep is slow on the one hand and, on the other, does not guarantee that the live-node update has completed.
Polling (with a timeout) would be better.
I guess we don't have an API to extract the list from the driver? Then maybe we should simply loop and observe when load balancing starts, and only then run the exact test.
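A minimal sketch of the polling approach suggested here, using only the standard library. `poll_until` is a hypothetical helper and is synchronous for brevity; in the async tests the same loop would presumably use Tokio's sleep instead of `thread::sleep`.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical test helper: re-checks `condition` every `interval` until it
/// holds or `timeout` elapses. Returns `true` if the condition held in time.
fn poll_until(
    timeout: Duration,
    interval: Duration,
    mut condition: impl FnMut() -> bool,
) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if condition() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        thread::sleep(interval);
    }
}

fn main() {
    // Stand-in condition: becomes true on the third check.
    let mut checks = 0;
    let ready = poll_until(Duration::from_secs(1), Duration::from_millis(10), || {
        checks += 1;
        checks >= 3
    });
    println!("ready = {ready} after {checks} checks");
}
```

Compared with a fixed sleep, the test proceeds as soon as the condition holds and fails with a clear timeout when it never does.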
  //! The proxy is expected to live no longer than the server.
  //! When the server is closed, the future finishes.
- //! The proxy can accept many clients during its lifetime, but only one at a time.
+ //! The proxy can accept many clients during its lifetime, up to 2 at a time.
How does that influence the https tests? Are they still correct? @wkkasztan
BTW, how do the earlier tests work with load balancing?
We discussed that; those tests didn't rely on that property.
They all pass, from what I've seen. I don't know if load balancing affects them.
But I suspect there might now be a race. Those tests are short and quick, so they are likely to pass. But is the "livenodes" connection prevented from connecting to a proxy that doesn't expect it? Maybe those tests should disable load balancing?
Tests that combine the optimizations are something I have noticed is missing, but we can add them later.
Currently every test in http_content/ spawns its own proxy. These tests only use one connection at a time. What happens with load balancing is, for example in correct_line.rs: it sends a GET request, the test proxy opens a connection but fails an assert, the task handling this request panics, and live_nodes.rs gets "connection closed before message completed". The process repeats every time service discovery is attempted. Because of that, there shouldn't be any races, and load balancing does not interfere with the HTTP content tests.
While this works, it certainly isn't pretty. I don't think we want to give the user an option to disable load balancing (the other clients don't have one), but we could add a workaround for test purposes. One way to disable load balancing would be to pass an invalid configuration, for example by adding .seed_hosts(Vec::<String>::new()) or .scheme("fake_scheme") to every client creation in http_content/.
> fails an assert, the task handling this request panics
And at this point the entire test should fail, right @wkkasztan?
Now it doesn't, because we have a delay in livenodes.
The test doesn't fail because the proxy itself does not panic; only the background task handling the livenodes request does, and livenodes handles that. The test is basically watching only its own requests, and if anything goes wrong with them, the test fails on the .await.unwrap() on these requests to the proxy. That means the tests don't see any errors and pass, but @wkkasztan could verify that.
I'm not sure how load balancing is implemented, but from what I can say:
- Each test spawns its own proxy, for its own purposes.
- In each HTTP test, the proxy parses and asserts all received requests. Thus it expects only messages caused by client operation calls. If an unrelated internal process sends a message to the proxy, it may or may not unexpectedly fail.
  If this is the case, one neat solution is to edit the AsyncTestContext::setup() function in http_test.rs. Before the on_request function forwards a given (request, sender) to its inner swappable, we can discriminate messages and ignore the unrelated ones. That way the HTTP test author is freed from handling load-balancing messages inside the test's proxy.
  Though this assumes we know the format of all load-balancing messages, and that it won't change in the future.
  The other way I see is to somehow configure the client to ensure connection persistence and limit the proxy connection pool to one for HTTP tests.
- Tests expect that each and every operation request travels through the proxy, which is bound to the address specified on client construction. If the client is allowed to skip the proxy, the test may unexpectedly succeed, which is dangerous.
  The only reasonable solution I see here is to force the client to actually use the proxy.
We could also rewrite the HTTP tests so that they use a custom HttpClient instead of the proxy, as we haven't used it in our optimizations. Though someone might do so in the future, so I'd stick with the proxy.
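To make the "discriminate messages" option concrete, here is a small sketch. Everything in it is hypothetical: `RequestLine` stands in for whatever request type the test proxy sees, and the rule "discovery traffic is a GET to /localnodes" is an assumption taken from the PR overview, which is exactly the kind of format dependency the comment warns about.

```rust
/// Minimal view of a proxied request; a stand-in for the proxy's real type.
struct RequestLine<'a> {
    method: &'a str,
    path: &'a str,
}

/// Returns true for node-discovery traffic, assumed here to be
/// `GET /localnodes` with an optional query string.
fn is_discovery_request(req: &RequestLine) -> bool {
    let path_only = req.path.split('?').next().unwrap_or(req.path);
    req.method.eq_ignore_ascii_case("GET") && path_only == "/localnodes"
}

/// Keeps only the requests an HTTP content test should assert on.
fn requests_for_test<'a>(all: Vec<RequestLine<'a>>) -> Vec<RequestLine<'a>> {
    all.into_iter().filter(|r| !is_discovery_request(r)).collect()
}

fn main() {
    let seen = vec![
        RequestLine { method: "GET", path: "/localnodes" },
        RequestLine { method: "POST", path: "/" },
    ];
    for req in requests_for_test(seen) {
        println!("{} {}", req.method, req.path);
    }
}
```

A filter like this only hides discovery traffic from the assertions; it does not address the separate concern that operations must still travel through the proxy.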
@lukaszg22 I think we should disable the second connection in these tests with .seed_hosts(Vec::<String>::new()).
    /// Set the live nodes for the client.
    ///
    /// This is used by the client to keep track of the currently known live nodes in the cluster,
    /// which is updated at runtime by the client itself.
    ///
    /// This field is set automatically in [`AlternatorClient::from_conf`], where a new [`LiveNodes`] instance is constructed
    /// based on the provided config.
    ///
    /// Setting this explicitly is useful when you have multiple clients running with the
    /// same load balancing settings. By default, each client spawns its own background
    /// discovery task. To avoid redundant tasks doing the exact same work, you can
    /// construct [`LiveNodes`] once yourself, and share it across multiple clients, or
    /// use [`AlternatorClient::from_conf_with_live_nodes`] to construct them, to achieve the same result.
    ///
    /// For more information, see [`LiveNodes`].
I find this description a little chaotic.
Does that mean that the background task is owned by the LiveNodes object, and that we can share the task by sharing the object?
Start with why we may want to use that API (and say it is optional). Only then explain how it works.
And have a test for it; it will show how to use it.
And add some info in the commit message.
Exactly. I changed it and added a test in config.rs.
    let client = AlternatorClient::from_conf(
        AlternatorConfig::builder()
            .endpoint_url(format!("http://{}", ctx.get_proxy_address()))
            // A workaround to disable load balancing, so that the GETs from the discovery do not interfere with the test logic.
You added this comment in one place. If it is necessary, then it should be everywhere.
But I think it is not; maybe better to add some info in the file comment.
And the commit message needs to say why we do it, not necessarily how we do it.
I switched to a single note in http_test.rs.
    )
}

// Poll until the client's live nodes match the given IPs, or timeout.
This can stay as a separate commit, but give it a commit message explaining why we change it.
Or squash it with "Add load balancing tests".
    /// .build();
    ///
    /// let client = AlternatorClient::from_conf(config);
    /// ```
Don't remove this; it is the closing of the comment.
But this is updated in #34, should it be here as well?
It doesn't say anything about enabling load balancing. Sure, there will be a conflict, but it should appear.
Oh, ok, for some reason I thought you were talking about the request compression. I will update it.
Proxies on nodes must handle 2 connections at a time if service discovery is running: one for the main client and one for the discovery client.
This prevents background GET requests from the node discovery mechanism from interfering with the HTTP content tests logic.
This allows users to share a `LiveNodes` instance across multiple clients. This can be used to avoid redundant work - if multiple clients use the same load balancing settings, their background discovery tasks would be doing identical work.
Previously, the tests used fixed sleeps to wait for the client to refresh its live nodes list, which was both slow and did not guarantee that the background update had actually finished. Using polling makes the tests faster and more reliable.
m-szymon left a comment
Why is it still a draft?
Now that all the load balancing PRs (scylladb#30, scylladb#31, and scylladb#32) are merged, those are no longer needed.
Introduces load balancing with a round-robin strategy.
- live_nodes.rs - a background task for service discovery that refreshes the known nodes list and falls back to further scopes if needed,
- query_plan.rs - an object put in the config bag before a request is made; it chooses which node the request will be sent to.
Closes #18