@@ -7,20 +7,15 @@ use std::sync::Arc;
7
7
use anyhow:: { anyhow, Context } ;
8
8
use axum:: routing:: get;
9
9
use axum_server:: tls_rustls:: RustlsConfig ;
10
- use axum_server_dual_protocol:: ServerExt ;
11
- use futures:: future:: BoxFuture ;
12
- use futures:: TryFutureExt ;
13
- use :: http:: { header:: CONTENT_TYPE , Request } ;
14
10
use pem:: Pem ;
15
- use tonic:: transport:: Server ;
16
11
use tonic_async_interceptor:: async_interceptor;
17
- use tower:: { make :: Shared , steer :: Steer , BoxError , ServiceExt } ;
12
+ use tower:: ServiceExt ;
18
13
use tower_http:: services:: { ServeDir , ServeFile } ;
19
- use tracing:: { debug, info, warn } ;
14
+ use tracing:: { debug, info} ;
20
15
use uuid:: Uuid ;
21
16
22
17
use opendut_auth:: confidential:: pem:: PemFromConfig ;
23
- use opendut_auth:: registration:: client:: { RegistrationClient , RegistrationClientRef } ;
18
+ use opendut_auth:: registration:: client:: RegistrationClient ;
24
19
use opendut_auth:: registration:: resources:: ResourceHomeUrl ;
25
20
use opendut_util:: settings:: LoadedConfig ;
26
21
use opendut_util:: telemetry:: logging:: LoggingConfig ;
@@ -30,15 +25,15 @@ use util::in_memory_cache::CustomInMemoryCache;
30
25
31
26
use crate :: auth:: grpc_auth_layer:: GrpcAuthenticationLayer ;
32
27
use crate :: auth:: json_web_key:: JwkCacheValue ;
33
- use crate :: cluster:: manager:: { ClusterManager , ClusterManagerOptions , ClusterManagerRef } ;
28
+ use crate :: cluster:: manager:: { ClusterManager , ClusterManagerOptions } ;
34
29
use crate :: grpc:: { ClusterManagerFacade , MetadataProviderFacade , PeerManagerFacade , PeerMessagingBrokerFacade } ;
35
30
use crate :: http:: router;
36
31
use crate :: http:: state:: { CarlInstallDirectory , HttpState , LeaConfig , LeaIdentityProviderConfig } ;
37
- use crate :: peer:: broker:: { PeerMessagingBroker , PeerMessagingBrokerOptions , PeerMessagingBrokerRef } ;
32
+ use crate :: multiplex_service:: GrpcMultiplexLayer ;
33
+ use crate :: peer:: broker:: { PeerMessagingBroker , PeerMessagingBrokerOptions } ;
38
34
use crate :: provisioning:: cleo_script:: CleoScript ;
39
- use crate :: resources:: manager:: { ResourcesManager , ResourcesManagerRef } ;
35
+ use crate :: resources:: manager:: ResourcesManager ;
40
36
use crate :: resources:: storage:: PersistenceOptions ;
41
- use crate :: vpn:: Vpn ;
42
37
43
38
pub mod grpc;
44
39
pub mod util;
@@ -55,6 +50,7 @@ mod vpn;
55
50
mod http;
56
51
mod provisioning;
57
52
mod auth;
53
+ mod multiplex_service;
58
54
59
55
#[ tracing:: instrument]
60
56
pub async fn create_with_telemetry ( settings_override : config:: Config ) -> anyhow:: Result < ( ) > {
@@ -148,40 +144,10 @@ pub async fn create(settings: LoadedConfig) -> anyhow::Result<()> {
148
144
}
149
145
} ;
150
146
151
- info ! ( "Server listening at {address}..." ) ;
152
- spawn_server (
153
- address,
154
- tls_config,
155
- resources_manager,
156
- cluster_manager,
157
- peer_messaging_broker,
158
- vpn,
159
- carl_url,
160
- settings. config ,
161
- ca_certificate,
162
- oidc_registration_client,
163
- grpc_auth_layer,
164
- ) . await . unwrap ( ) ;
165
-
166
- Ok ( ( ) )
167
- }
147
+ //TODO remove
148
+ let ca = ca_certificate;
149
+ let settings = settings. config ;
168
150
169
- /// Isolation in function returning BoxFuture needed due to this: https://github.com/rust-lang/rust/issues/102211#issuecomment-1397600424
170
- #[ allow( clippy:: too_many_arguments) ]
171
- #[ tracing:: instrument( skip_all, level="TRACE" ) ]
172
- fn spawn_server (
173
- address : SocketAddr ,
174
- tls_config : TlsConfig ,
175
- resources_manager : ResourcesManagerRef ,
176
- cluster_manager : ClusterManagerRef ,
177
- peer_messaging_broker : PeerMessagingBrokerRef ,
178
- vpn : Vpn ,
179
- carl_url : ResourceHomeUrl ,
180
- settings : config:: Config ,
181
- ca : Pem ,
182
- oidc_registration_client : Option < RegistrationClientRef > ,
183
- grpc_auth_layer : GrpcAuthenticationLayer ,
184
- ) -> BoxFuture < ' static , anyhow:: Result < ( ) > > {
185
151
let oidc_enabled = settings. get_bool ( "network.oidc.enabled" ) . unwrap_or ( false ) ;
186
152
187
153
let cluster_manager_facade = ClusterManagerFacade :: new ( Arc :: clone ( & cluster_manager) , Arc :: clone ( & resources_manager) ) ;
@@ -196,19 +162,6 @@ fn spawn_server(
196
162
) ;
197
163
let peer_messaging_broker_facade = PeerMessagingBrokerFacade :: new ( Arc :: clone ( & peer_messaging_broker) ) ;
198
164
199
- let grpc = Server :: builder ( )
200
- . layer ( async_interceptor ( move |request| {
201
- Clone :: clone ( & grpc_auth_layer) . auth_interceptor ( request)
202
- } ) )
203
- . accept_http1 ( true ) //gRPC-web uses HTTP1
204
- . add_service ( cluster_manager_facade. into_grpc_service ( ) )
205
- . add_service ( metadata_provider_facade. into_grpc_service ( ) )
206
- . add_service ( peer_manager_facade. into_grpc_service ( ) )
207
- . add_service ( peer_messaging_broker_facade. into_grpc_service ( ) )
208
- . into_service ( )
209
- . map_response ( |response| response. map ( axum:: body:: boxed) )
210
- . boxed_clone ( ) ;
211
-
212
165
let lea_dir = project:: make_path_absolute ( settings. get_string ( "serve.ui.directory" )
213
166
. expect ( "Failed to find configuration for `serve.ui.directory`." ) )
214
167
. expect ( "Failure while making path absolute." ) ;
@@ -258,52 +211,38 @@ fn spawn_server(
258
211
}
259
212
260
213
let http = axum:: Router :: new ( )
261
- . fallback_service (
262
- axum:: Router :: new ( )
263
- . nest_service (
264
- "/api/licenses" ,
265
- ServeDir :: new ( & licenses_dir)
266
- . fallback ( ServeFile :: new ( licenses_dir. join ( "index.json" ) ) )
267
- )
268
- . route ( "/api/cleo/:architecture/download" , get ( router:: cleo:: download_cleo) )
269
- . route ( "/api/edgar/:architecture/download" , get ( router:: edgar:: download_edgar) )
270
- . route ( "/api/lea/config" , get ( router:: lea_config) )
271
- . nest_service (
272
- "/" ,
273
- ServeDir :: new ( & lea_dir)
274
- . fallback ( ServeFile :: new ( lea_index_html) )
275
- )
276
- . with_state ( app_state)
214
+ . nest_service (
215
+ "/api/licenses" ,
216
+ ServeDir :: new ( & licenses_dir)
217
+ . fallback ( ServeFile :: new ( licenses_dir. join ( "index.json" ) ) )
277
218
)
278
- . map_err ( BoxError :: from)
279
- . boxed_clone ( ) ;
280
-
281
- let http_grpc = Steer :: new ( vec ! [ grpc, http] , |request : & Request < _ > , _services : & [ _ ] | {
282
- request. headers ( )
283
- . get ( CONTENT_TYPE )
284
- . map ( |content_type| {
285
- let content_type = content_type. as_bytes ( ) ;
286
-
287
- if content_type. starts_with ( b"application/grpc" ) {
288
- 0
289
- } else {
290
- 1
291
- }
292
- } ) . unwrap_or ( 1 )
293
- } ) ;
294
-
295
- match tls_config {
296
- TlsConfig :: Enabled ( tls_config) => {
297
- Box :: pin ( axum_server_dual_protocol:: bind_dual_protocol ( address, tls_config)
298
- . set_upgrade ( true ) //http -> https
299
- . serve ( Shared :: new ( http_grpc) )
300
- . map_err ( |cause| anyhow ! ( cause) ) )
301
- }
302
- TlsConfig :: Disabled => {
303
- // Disable TLS in case a load balancer with TLS termination is present
304
- Box :: pin ( axum_server:: bind ( address) . serve ( Shared :: new ( http_grpc) ) . map_err ( From :: from) )
305
- }
306
- }
219
+ . route ( "/api/cleo/:architecture/download" , get ( router:: cleo:: download_cleo) )
220
+ . route ( "/api/edgar/:architecture/download" , get ( router:: edgar:: download_edgar) )
221
+ . route ( "/api/lea/config" , get ( router:: lea_config) )
222
+ . nest_service (
223
+ "/" ,
224
+ ServeDir :: new ( & lea_dir)
225
+ . fallback ( ServeFile :: new ( lea_index_html) )
226
+ )
227
+ . with_state ( app_state)
228
+ . into_service ( )
229
+ . map_response ( |response| response. map ( tonic:: body:: boxed) ) ;
230
+
231
+ let grpc = tonic:: transport:: Server :: builder ( )
232
+ . layer ( async_interceptor ( move |request| {
233
+ Clone :: clone ( & grpc_auth_layer) . auth_interceptor ( request)
234
+ } ) )
235
+ . accept_http1 ( true ) //gRPC-web uses HTTP1
236
+ . layer ( GrpcMultiplexLayer :: new ( http) )
237
+ . add_service ( cluster_manager_facade. into_grpc_service ( ) )
238
+ . add_service ( metadata_provider_facade. into_grpc_service ( ) )
239
+ . add_service ( peer_manager_facade. into_grpc_service ( ) )
240
+ . add_service ( peer_messaging_broker_facade. into_grpc_service ( ) ) ;
241
+
242
+ info ! ( "Server listening at {address}..." ) ;
243
+ grpc. serve ( address) . await ?; //TODO tls_config?
244
+
245
+ Ok ( ( ) )
307
246
}
308
247
309
248
enum TlsConfig {
0 commit comments