Skip to content

Commit c727b19

Browse files
committed
WIP unsuccessful attempt upgrading Axum in CARL's lib.rs.
A relevant example in Axum has not yet been updated, so might not yet be possible: https://github.com/tokio-rs/axum/blob/main/examples/rest-grpc-multiplex/src/main.rs Used alternative example: tokio-rs/axum#2736 (comment) But lifetime issues prevent it from compiling.
1 parent ec065ba commit c727b19

File tree

17 files changed

+472
-460
lines changed

17 files changed

+472
-460
lines changed

Cargo.lock

+196-258
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+16-11
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ anyhow = "1.0.79"
4949
assert_fs = "1.1.1"
5050
async-trait = "0.1.77"
5151
axum = "0.7.5"
52-
axum-server = "0.6.0"
53-
axum-server-dual-protocol = "0.6.0"
52+
axum-server = "0.7.1"
53+
axum-server-dual-protocol = "0.7.0"
5454
backon = { version = "1.2.0", default-features = false }
5555
base64 = "0.22.1"
5656
brotli = "7.0.0"
@@ -77,11 +77,15 @@ fs-err = "3.0.0"
7777
fs_extra = "1.3.0"
7878
futures = "0.3.30"
7979
glob = "0.3.1"
80-
gloo-net = { version = "0.5.0" }
80+
gloo-net = { version = "0.6.0" }
8181
gloo-timers = { version = "0.3.0" }
8282
googletest = { version = "0.12.0" }
8383
home = "0.5.5"
8484
http = "1.1.0"
85+
http-body = "1.0.1"
86+
http-body-util = "0.1.2"
87+
hyper = "1.4.1"
88+
hyper-util = "0.1.6"
8589
indicatif = "0.17.7"
8690
indoc = "2.0.4"
8791
jsonwebtoken = "9.2.0"
@@ -97,13 +101,14 @@ nix = "0.29.0"
97101
oauth2 = { version = "5.0.0-alpha.4", default-features = false }
98102
openidconnect = { version = "4.0.0-alpha.2", default-features = false }
99103
openssl-sys = { version = "0.9.102", features = ["vendored"] }
100-
opentelemetry = "0.23.0"
101-
opentelemetry-appender-tracing = "0.4.0"
102-
opentelemetry-otlp = "0.16.0"
103-
opentelemetry_sdk = "0.23.0"
104-
opentelemetry-semantic-conventions = "0.15.0"
104+
opentelemetry = "0.24.0"
105+
opentelemetry-appender-tracing = "0.5.0"
106+
opentelemetry-otlp = "0.17.0"
107+
opentelemetry_sdk = "0.24.1"
108+
opentelemetry-semantic-conventions = "0.16.0"
105109
pem = { version = "3.0.3", features = ["serde"] }
106110
phf = { version = "0.11", features = ["macros"] }
111+
pin-project = "1.1.7"
107112
ping-rs = { version = "0.1.2" }
108113
pq-sys = { version = "0.6.1", features = ["bundled"] }
109114
predicates = "3.0.4"
@@ -142,12 +147,12 @@ toml_edit = "0.22.15"
142147
tonic = { version = "0.12.0", default-features = false }
143148
tonic-build = { version = "0.12.0", default-features = false }
144149
tonic-web = "0.12.0"
145-
tonic-web-wasm-client = { version = "0.5.1" }
146-
tonic-async-interceptor = { version = "0.11.0" }
150+
tonic-web-wasm-client = { version = "0.6.0" }
151+
tonic-async-interceptor = { version = "0.12.0" }
147152
tower = "0.4.13"
148153
tower-http = { version = "0.5.2", features = ["cors", "fs"] }
149154
tracing = { version = "0.1.40" }
150-
tracing-opentelemetry = "0.24.0"
155+
tracing-opentelemetry = "0.25.0"
151156
tracing-subscriber = { version = "0.3.18", default-features = false }
152157
tracing-web = { version = "0.1.3" }
153158
url = "2.5.0"

opendut-carl/Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,17 @@ flate2 = { workspace = true }
2828
futures = { workspace = true }
2929
googletest = { workspace = true }
3030
http = { workspace = true }
31+
http-body = { workspace = true }
32+
http-body-util = { workspace = true }
33+
hyper = { workspace = true }
3134
indoc = { workspace = true }
3235
jsonwebtoken = { workspace = true}
3336
openidconnect = { workspace = true }
3437
openssl-sys = { workspace = true }
3538
opentelemetry = { workspace = true }
3639
opentelemetry_sdk = { workspace = true }
3740
pem = { workspace = true, features = ["serde"]}
41+
pin-project = { workspace = true }
3842
pq-sys = { workspace = true }
3943
reqwest = { workspace = true, features = ["json"] }
4044
serde = { workspace = true, features = ["derive"] }

opendut-carl/src/http/router/cleo.rs

+11-24
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,39 @@
1-
use axum::body::StreamBody;
1+
use axum::body::Body;
22
use axum::extract::{Path, State};
33
use axum::response::IntoResponse;
4-
use axum_server_dual_protocol::tokio_util::io::ReaderStream;
5-
use http::{header, StatusCode};
6-
use crate::{CarlInstallDirectory};
4+
use http::Request;
5+
use tower_http::services::ServeFile;
76

7+
use crate::CarlInstallDirectory;
88
use crate::util::{CLEO_IDENTIFIER, CleoArch};
99

1010
pub async fn download_cleo(
1111
Path(architecture): Path<CleoArch>,
1212
State(carl_install_directory): State<CarlInstallDirectory>,
1313
) -> impl IntoResponse {
1414
let cleo_file_name = format!("{}-{}.tar.gz", &architecture.distribution_name(), crate::app_info::CRATE_VERSION);
15-
let cleo_dir = carl_install_directory.path.join(CLEO_IDENTIFIER).join(&cleo_file_name);
15+
let cleo_file_path = carl_install_directory.path.join(CLEO_IDENTIFIER).join(&cleo_file_name);
1616

17-
let file = match tokio::fs::File::open(cleo_dir).await {
18-
Ok(file) => { file }
19-
Err(_) => { return StatusCode::NOT_FOUND.into_response(); }
20-
};
21-
22-
let stream = ReaderStream::new(file);
23-
let body = StreamBody::new(stream);
24-
let content_disposition = format!("attachment; filename=\"{}\"", cleo_file_name);
25-
let headers = [
26-
(header::CONTENT_TYPE, "application/gzip"),
27-
(
28-
header::CONTENT_DISPOSITION,
29-
content_disposition.as_str(),
30-
),
31-
];
32-
(headers, body).into_response()
17+
let request = Request::new(Body::empty());
18+
ServeFile::new(cleo_file_path).try_call(request).await.unwrap()
3319
}
3420

3521
#[cfg(test)]
3622
mod test {
3723
use std::fs;
3824
use std::fs::File;
25+
3926
use assert_fs::fixture::PathChild;
4027
use assert_fs::TempDir;
4128
use axum::extract::{Path, State};
4229
use axum::response::IntoResponse;
4330
use googletest::assert_that;
4431
use googletest::matchers::eq;
4532
use http::header;
46-
use crate::CarlInstallDirectory;
4733

48-
use crate::util::{CLEO_IDENTIFIER, CleoArch};
34+
use crate::CarlInstallDirectory;
4935
use crate::router::cleo::download_cleo;
36+
use crate::util::{CLEO_IDENTIFIER, CleoArch};
5037

5138
#[tokio::test()]
5239
async fn download_cleo_succeeds() -> anyhow::Result<()> {
@@ -70,4 +57,4 @@ mod test {
7057

7158
Ok(())
7259
}
73-
}
60+
}

opendut-carl/src/http/router/edgar.rs

+7-21
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use axum::body::StreamBody;
1+
use axum::body::{Body};
22
use axum::extract::{Path, State};
33
use axum::response::IntoResponse;
4-
use axum_server_dual_protocol::tokio_util::io::ReaderStream;
5-
use http::{header, StatusCode};
4+
use http::Request;
5+
use tower_http::services::ServeFile;
66
use crate::http::state::CarlInstallDirectory;
77
use crate::util::{EDGAR_IDENTIFIER, EdgarArch};
88

@@ -12,24 +12,10 @@ pub async fn download_edgar(
1212
) -> impl IntoResponse {
1313

1414
let file_name = format!("{}-{}.tar.gz", &architecture.distribution_name(), crate::app_info::CRATE_VERSION);
15-
let edgar_dir = carl_install_directory.path.join(EDGAR_IDENTIFIER).join(&file_name);
15+
let edgar_path = carl_install_directory.path.join(EDGAR_IDENTIFIER).join(&file_name);
1616

17-
let file = match tokio::fs::File::open(edgar_dir).await {
18-
Ok(file) => { file }
19-
Err(_) => { return StatusCode::NOT_FOUND.into_response(); }
20-
};
21-
22-
let stream = ReaderStream::new(file);
23-
let body = StreamBody::new(stream);
24-
let content_disposition = format!("attachment; filename=\"{}\"", file_name);
25-
let headers = [
26-
(header::CONTENT_TYPE, "application/gzip"),
27-
(
28-
header::CONTENT_DISPOSITION,
29-
content_disposition.as_str(),
30-
),
31-
];
32-
(headers, body).into_response()
17+
let request = Request::new(Body::empty());
18+
ServeFile::new(edgar_path).try_call(request).await.unwrap()
3319
}
3420

3521
#[cfg(test)]
@@ -70,4 +56,4 @@ mod test {
7056

7157
Ok(())
7258
}
73-
}
59+
}

opendut-carl/src/lib.rs

+42-103
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,15 @@ use std::sync::Arc;
77
use anyhow::{anyhow, Context};
88
use axum::routing::get;
99
use axum_server::tls_rustls::RustlsConfig;
10-
use axum_server_dual_protocol::ServerExt;
11-
use futures::future::BoxFuture;
12-
use futures::TryFutureExt;
13-
use ::http::{header::CONTENT_TYPE, Request};
1410
use pem::Pem;
15-
use tonic::transport::Server;
1611
use tonic_async_interceptor::async_interceptor;
17-
use tower::{make::Shared, steer::Steer, BoxError, ServiceExt};
12+
use tower::ServiceExt;
1813
use tower_http::services::{ServeDir, ServeFile};
19-
use tracing::{debug, info, warn};
14+
use tracing::{debug, info};
2015
use uuid::Uuid;
2116

2217
use opendut_auth::confidential::pem::PemFromConfig;
23-
use opendut_auth::registration::client::{RegistrationClient, RegistrationClientRef};
18+
use opendut_auth::registration::client::RegistrationClient;
2419
use opendut_auth::registration::resources::ResourceHomeUrl;
2520
use opendut_util::settings::LoadedConfig;
2621
use opendut_util::telemetry::logging::LoggingConfig;
@@ -30,15 +25,15 @@ use util::in_memory_cache::CustomInMemoryCache;
3025

3126
use crate::auth::grpc_auth_layer::GrpcAuthenticationLayer;
3227
use crate::auth::json_web_key::JwkCacheValue;
33-
use crate::cluster::manager::{ClusterManager, ClusterManagerOptions, ClusterManagerRef};
28+
use crate::cluster::manager::{ClusterManager, ClusterManagerOptions};
3429
use crate::grpc::{ClusterManagerFacade, MetadataProviderFacade, PeerManagerFacade, PeerMessagingBrokerFacade};
3530
use crate::http::router;
3631
use crate::http::state::{CarlInstallDirectory, HttpState, LeaConfig, LeaIdentityProviderConfig};
37-
use crate::peer::broker::{PeerMessagingBroker, PeerMessagingBrokerOptions, PeerMessagingBrokerRef};
32+
use crate::multiplex_service::GrpcMultiplexLayer;
33+
use crate::peer::broker::{PeerMessagingBroker, PeerMessagingBrokerOptions};
3834
use crate::provisioning::cleo_script::CleoScript;
39-
use crate::resources::manager::{ResourcesManager, ResourcesManagerRef};
35+
use crate::resources::manager::ResourcesManager;
4036
use crate::resources::storage::PersistenceOptions;
41-
use crate::vpn::Vpn;
4237

4338
pub mod grpc;
4439
pub mod util;
@@ -55,6 +50,7 @@ mod vpn;
5550
mod http;
5651
mod provisioning;
5752
mod auth;
53+
mod multiplex_service;
5854

5955
#[tracing::instrument]
6056
pub async fn create_with_telemetry(settings_override: config::Config) -> anyhow::Result<()> {
@@ -148,40 +144,10 @@ pub async fn create(settings: LoadedConfig) -> anyhow::Result<()> {
148144
}
149145
};
150146

151-
info!("Server listening at {address}...");
152-
spawn_server(
153-
address,
154-
tls_config,
155-
resources_manager,
156-
cluster_manager,
157-
peer_messaging_broker,
158-
vpn,
159-
carl_url,
160-
settings.config,
161-
ca_certificate,
162-
oidc_registration_client,
163-
grpc_auth_layer,
164-
).await.unwrap();
165-
166-
Ok(())
167-
}
147+
//TODO remove
148+
let ca = ca_certificate;
149+
let settings = settings.config;
168150

169-
/// Isolation in function returning BoxFuture needed due to this: https://github.com/rust-lang/rust/issues/102211#issuecomment-1397600424
170-
#[allow(clippy::too_many_arguments)]
171-
#[tracing::instrument(skip_all, level="TRACE")]
172-
fn spawn_server(
173-
address: SocketAddr,
174-
tls_config: TlsConfig,
175-
resources_manager: ResourcesManagerRef,
176-
cluster_manager: ClusterManagerRef,
177-
peer_messaging_broker: PeerMessagingBrokerRef,
178-
vpn: Vpn,
179-
carl_url: ResourceHomeUrl,
180-
settings: config::Config,
181-
ca: Pem,
182-
oidc_registration_client: Option<RegistrationClientRef>,
183-
grpc_auth_layer: GrpcAuthenticationLayer,
184-
) -> BoxFuture<'static, anyhow::Result<()>> {
185151
let oidc_enabled = settings.get_bool("network.oidc.enabled").unwrap_or(false);
186152

187153
let cluster_manager_facade = ClusterManagerFacade::new(Arc::clone(&cluster_manager), Arc::clone(&resources_manager));
@@ -196,19 +162,6 @@ fn spawn_server(
196162
);
197163
let peer_messaging_broker_facade = PeerMessagingBrokerFacade::new(Arc::clone(&peer_messaging_broker));
198164

199-
let grpc = Server::builder()
200-
.layer(async_interceptor(move |request| {
201-
Clone::clone(&grpc_auth_layer).auth_interceptor(request)
202-
}))
203-
.accept_http1(true) //gRPC-web uses HTTP1
204-
.add_service(cluster_manager_facade.into_grpc_service())
205-
.add_service(metadata_provider_facade.into_grpc_service())
206-
.add_service(peer_manager_facade.into_grpc_service())
207-
.add_service(peer_messaging_broker_facade.into_grpc_service())
208-
.into_service()
209-
.map_response(|response| response.map(axum::body::boxed))
210-
.boxed_clone();
211-
212165
let lea_dir = project::make_path_absolute(settings.get_string("serve.ui.directory")
213166
.expect("Failed to find configuration for `serve.ui.directory`."))
214167
.expect("Failure while making path absolute.");
@@ -258,52 +211,38 @@ fn spawn_server(
258211
}
259212

260213
let http = axum::Router::new()
261-
.fallback_service(
262-
axum::Router::new()
263-
.nest_service(
264-
"/api/licenses",
265-
ServeDir::new(&licenses_dir)
266-
.fallback(ServeFile::new(licenses_dir.join("index.json")))
267-
)
268-
.route("/api/cleo/:architecture/download", get(router::cleo::download_cleo))
269-
.route("/api/edgar/:architecture/download", get(router::edgar::download_edgar))
270-
.route("/api/lea/config", get(router::lea_config))
271-
.nest_service(
272-
"/",
273-
ServeDir::new(&lea_dir)
274-
.fallback(ServeFile::new(lea_index_html))
275-
)
276-
.with_state(app_state)
214+
.nest_service(
215+
"/api/licenses",
216+
ServeDir::new(&licenses_dir)
217+
.fallback(ServeFile::new(licenses_dir.join("index.json")))
277218
)
278-
.map_err(BoxError::from)
279-
.boxed_clone();
280-
281-
let http_grpc = Steer::new(vec![grpc, http], |request: &Request<_>, _services: &[_]| {
282-
request.headers()
283-
.get(CONTENT_TYPE)
284-
.map(|content_type| {
285-
let content_type = content_type.as_bytes();
286-
287-
if content_type.starts_with(b"application/grpc") {
288-
0
289-
} else {
290-
1
291-
}
292-
}).unwrap_or(1)
293-
});
294-
295-
match tls_config {
296-
TlsConfig::Enabled(tls_config) => {
297-
Box::pin(axum_server_dual_protocol::bind_dual_protocol(address, tls_config)
298-
.set_upgrade(true) //http -> https
299-
.serve(Shared::new(http_grpc))
300-
.map_err(|cause| anyhow!(cause)))
301-
}
302-
TlsConfig::Disabled => {
303-
// Disable TLS in case a load balancer with TLS termination is present
304-
Box::pin(axum_server::bind(address).serve(Shared::new(http_grpc)).map_err(From::from))
305-
}
306-
}
219+
.route("/api/cleo/:architecture/download", get(router::cleo::download_cleo))
220+
.route("/api/edgar/:architecture/download", get(router::edgar::download_edgar))
221+
.route("/api/lea/config", get(router::lea_config))
222+
.nest_service(
223+
"/",
224+
ServeDir::new(&lea_dir)
225+
.fallback(ServeFile::new(lea_index_html))
226+
)
227+
.with_state(app_state)
228+
.into_service()
229+
.map_response(|response| response.map(tonic::body::boxed));
230+
231+
let grpc = tonic::transport::Server::builder()
232+
.layer(async_interceptor(move |request| {
233+
Clone::clone(&grpc_auth_layer).auth_interceptor(request)
234+
}))
235+
.accept_http1(true) //gRPC-web uses HTTP1
236+
.layer(GrpcMultiplexLayer::new(http))
237+
.add_service(cluster_manager_facade.into_grpc_service())
238+
.add_service(metadata_provider_facade.into_grpc_service())
239+
.add_service(peer_manager_facade.into_grpc_service())
240+
.add_service(peer_messaging_broker_facade.into_grpc_service());
241+
242+
info!("Server listening at {address}...");
243+
grpc.serve(address).await?; //TODO tls_config?
244+
245+
Ok(())
307246
}
308247

309248
enum TlsConfig {

0 commit comments

Comments
 (0)