Skip to content

Commit e7bcdba

Browse files
cratelynconorsch
authored andcommitted
pd: 🔨 rework RootCommand::start auto-https logic
## 👀 overview fixes #3627. this reorganizes the logic in pd's startup code related to automatically managed https functionality. ## 🎨 background & motivation this PR, besides cleaning up the `rustls-acme`-related auto-https logic, is also interested in *creating a state-of-affairs that will dovetail into pr #3522*. in particular, this expression to start the GRPC serve given a bound listener... ```rust tokio::task::Builder::new() .name("grpc_server") .spawn(grpc_server.serve_with_incoming(tls_incoming)) .expect("failed to spawn grpc server") ``` ...should be adjusted so as to work with an `axum::Router`. ### ⚖️ `rustls-acme` and `tokio-rustls-acme` quoth the #3627 description, citing an earlier comment: > In the ~year since this code was written, there may be better options. > `tokio-rustls-acme` seems promising \- <#3522 (comment)> for reference, the repositories for each live here, and here: - <https://github.com/FlorianUekermann/rustls-acme> - <https://github.com/n0-computer/tokio-rustls-acme> after some comparison, i have come to the opinion that `rustls-acme` will still be adequate for our purposes. the latter is a fork of the former, but active development appears to have continued in the former, and i did not see any particular "_must-have_" features for us in the latter. ## 🎴 changes this commit moves some of the auto-https related code from the `main` entrypoint, into standalone functions in `pd::main`. some constants are defined, to keep control flow clear and to help facilitate the addition of future options e.g. a flag to control the LetsEncrypt environment to use. ## 🚰 dropping down to `axum`; a brief note on future upgrades as stated above, we want to switch to an `axum::Router`. this means that we won't be able to use the `AcmeConfig::incoming` function. the `rustls-acme` library provides some "low-level" examples this work is based upon - <https://github.com/FlorianUekermann/rustls-acme/blob/main/examples/low_level_tokio.rs> - <https://github.com/FlorianUekermann/rustls-acme/blob/main/examples/low_level_axum.rs> we also use `tonic` 0.10.2 in pd, and elsewhere in the penumbra monorepo. tonic isn't using hyper 1.x yet. this was being worked on in hyperium/tonic#1583, continued on in hyperium/tonic#1595, and tracked in hyperium/tonic#1579. that work also depends upon hyperium/hyper#3461. we will need to be wait for tonic to finish its migration over to hyper 1.0, see: hyperium/tonic#1579 (comment) this is understandable, but i make note of this situation as a signpost for our future selves when considering a migration to recent versions of axum-server, axum, rustls-acme, or hyper. for now, it's easiest to stay in lock-step with tonic, and we can revisit the upgrade path(s) at a future date. === Refs: #3627 Refs: #3646 Refs: #3522
1 parent 1d368d4 commit e7bcdba

File tree

4 files changed

+137
-93
lines changed

4 files changed

+137
-93
lines changed

Cargo.lock

+19-13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/bin/pd/Cargo.toml

+5-1
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,14 @@ console-subscriber = "0.2"
122122
metrics-tracing-context = "0.11.0"
123123
metrics-util = "0.13"
124124
clap = { version = "3", features = ["derive", "env"] }
125-
rustls-acme = "0.6"
126125
atty = "0.2"
127126
fs_extra = "1.3.0"
128127

128+
axum = { version = "0.6.20", features = ["tokio", "http2"] }
129+
axum-server = { version = "0.4.7", features = ["tls-rustls"] }
130+
rustls = "0.20.9"
131+
rustls-acme = { version = "0.6.0", features = ["axum"] }
132+
129133
[dev-dependencies]
130134
penumbra-proof-params = { path = "../../crypto/proof-params", features = [
131135
"bundled-proving-keys",

crates/bin/pd/src/auto_https.rs

+84-44
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,94 @@
1-
use std::{
2-
pin::Pin,
3-
task::{Context, Poll},
4-
};
1+
//! Automatic HTTPS certificate management facilities.
2+
//!
3+
//! See [`axum_acceptor`] for more information.
54
6-
use pin_project::pin_project;
7-
use rustls_acme::futures_rustls::server::TlsStream;
8-
use tokio::{
9-
io::{AsyncRead, AsyncWrite, ReadBuf},
10-
net::TcpStream,
5+
use {
6+
anyhow::Error,
7+
futures::Future,
8+
rustls::ServerConfig,
9+
rustls_acme::{axum::AxumAcceptor, caches::DirCache, AcmeConfig, AcmeState},
10+
std::{fmt::Debug, path::PathBuf, sync::Arc},
1111
};
12-
use tokio_util::compat::Compat;
13-
use tonic::transport::server::Connected;
14-
15-
/// Wrapper type needed to convert between futures_io and tokio traits
16-
#[pin_project]
17-
pub struct Wrapper {
18-
#[pin]
19-
pub inner: Compat<TlsStream<Compat<TcpStream>>>,
20-
}
2112

22-
impl Connected for Wrapper {
23-
type ConnectInfo = <TcpStream as Connected>::ConnectInfo;
13+
/// Protocols supported by this server, in order of preference.
14+
///
15+
/// See [rfc7301] for more info on ALPN.
16+
///
17+
/// [rfc7301]: https://datatracker.ietf.org/doc/html/rfc7301
18+
//
19+
// We also permit HTTP1.1 for backwards-compatibility, specifically for grpc-web.
20+
const ALPN_PROTOCOLS: [&'static [u8]; 2] = [b"h2", b"http/1.1"];
2421

25-
fn connect_info(&self) -> Self::ConnectInfo {
26-
self.inner.get_ref().get_ref().0.get_ref().connect_info()
27-
}
22+
/// The location of the file-based certificate cache.
23+
// NB: this must not be an absolute path see [Path::join].
24+
const CACHE_DIR: &'static str = "tokio_rustls_acme_cache";
25+
26+
/// If true, use the production Let's Encrypt environment.
27+
///
28+
/// If false, the ACME resolver will use the [staging environment].
29+
///
30+
/// [staging environment]: https://letsencrypt.org/docs/staging-environment/
31+
const PRODUCTION_LETS_ENCRYPT: bool = true;
32+
33+
/// Use ACME to resolve certificates and handle new connections.
34+
///
35+
/// This returns a tuple containing an [`AxumAcceptor`] that may be used with [`axum_server`], and
36+
/// a [`Future`] that represents the background task to poll and log for changes in the
37+
/// certificate environment.
38+
pub fn axum_acceptor(
39+
home: PathBuf,
40+
domain: String,
41+
) -> (AxumAcceptor, impl Future<Output = Result<(), Error>>) {
42+
// Use a file-based cache located within the home directory.
43+
let cache = home.join(CACHE_DIR);
44+
let cache = DirCache::new(cache);
45+
46+
// Create an ACME client, which we will use to resolve certificates.
47+
let state = AcmeConfig::new(vec![domain])
48+
.cache(cache)
49+
.directory_lets_encrypt(PRODUCTION_LETS_ENCRYPT)
50+
.state();
51+
52+
// Define our server configuration, using the ACME certificate resolver.
53+
let mut rustls_config = ServerConfig::builder()
54+
.with_safe_defaults()
55+
.with_no_client_auth()
56+
.with_cert_resolver(state.resolver());
57+
rustls_config.alpn_protocols = self::alpn_protocols();
58+
let rustls_config = Arc::new(rustls_config);
59+
60+
// Return our connection acceptor and our background worker task.
61+
let acceptor = state.axum_acceptor(rustls_config.clone());
62+
let worker = self::acme_worker(state);
63+
(acceptor, worker)
2864
}
2965

30-
impl AsyncRead for Wrapper {
31-
fn poll_read(
32-
self: Pin<&mut Self>,
33-
cx: &mut Context<'_>,
34-
buf: &mut ReadBuf<'_>,
35-
) -> Poll<std::io::Result<()>> {
36-
self.project().inner.poll_read(cx, buf)
66+
/// This function defines the task responsible for handling ACME events.
67+
///
68+
/// This function will never return, unless an error is encountered.
69+
#[tracing::instrument(level = "error", skip_all)]
70+
async fn acme_worker<EC, EA>(mut state: AcmeState<EC, EA>) -> Result<(), anyhow::Error>
71+
where
72+
EC: Debug + 'static,
73+
EA: Debug + 'static,
74+
{
75+
use futures::StreamExt;
76+
loop {
77+
match state.next().await {
78+
Some(Ok(ok)) => tracing::debug!("received acme event: {:?}", ok),
79+
Some(Err(err)) => tracing::error!("acme error: {:?}", err),
80+
None => {
81+
debug_assert!(false, "acme worker unexpectedly reached end-of-stream");
82+
tracing::error!("acme worker unexpectedly reached end-of-stream");
83+
anyhow::bail!("unexpected end-of-stream");
84+
}
85+
}
3786
}
3887
}
3988

40-
impl AsyncWrite for Wrapper {
41-
fn poll_write(
42-
self: Pin<&mut Self>,
43-
cx: &mut Context<'_>,
44-
buf: &[u8],
45-
) -> Poll<std::io::Result<usize>> {
46-
self.project().inner.poll_write(cx, buf)
47-
}
48-
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
49-
self.project().inner.poll_flush(cx)
50-
}
51-
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
52-
self.project().inner.poll_shutdown(cx)
53-
}
89+
/// Returns a vector of the protocols supported by this server.
90+
///
91+
/// This is a convenience method to retrieve an owned copy of [`ALPN_PROTOCOLS`].
92+
fn alpn_protocols() -> Vec<Vec<u8>> {
93+
ALPN_PROTOCOLS.into_iter().map(<[u8]>::to_vec).collect()
5494
}

crates/bin/pd/src/main.rs

+29-35
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use metrics_util::layers::Stack;
99

1010
use anyhow::Context;
1111
use cnidarium::{StateDelta, Storage};
12-
use futures::stream::TryStreamExt;
1312
use ibc_proto::ibc::core::channel::v1::query_server::QueryServer as ChannelQueryServer;
1413
use ibc_proto::ibc::core::client::v1::query_server::QueryServer as ClientQueryServer;
1514
use ibc_proto::ibc::core::connection::v1::query_server::QueryServer as ConnectionQueryServer;
@@ -32,7 +31,7 @@ use penumbra_tower_trace::remote_addr;
3231
use rand::Rng;
3332
use rand_core::OsRng;
3433
use tendermint_config::net::Address as TendermintAddress;
35-
use tokio::{net::TcpListener, runtime};
34+
use tokio::runtime;
3635
use tonic::transport::Server;
3736
use tower_http::cors::CorsLayer;
3837
use tracing_subscriber::{prelude::*, EnvFilter};
@@ -276,39 +275,34 @@ async fn main() -> anyhow::Result<()> {
276275
)));
277276
}
278277

279-
let grpc_server = if let Some(domain) = grpc_auto_https {
280-
use pd::auto_https::Wrapper;
281-
use rustls_acme::{caches::DirCache, AcmeConfig};
282-
use tokio_stream::wrappers::TcpListenerStream;
283-
use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
284-
285-
let mut acme_cache = pd_home.clone();
286-
acme_cache.push("rustls_acme_cache");
287-
288-
let bound_listener = TcpListener::bind(grpc_bind)
289-
.await
290-
.context(format!("Failed to bind HTTPS listener on {}", grpc_bind))?;
291-
let listener = TcpListenerStream::new(bound_listener);
292-
// Configure HTTP2 support for the TLS negotiation; we also permit HTTP1.1
293-
// for backwards-compatibility, specifically for grpc-web.
294-
let alpn_config = vec!["h2".into(), "http/1.1".into()];
295-
let tls_incoming = AcmeConfig::new([domain.as_str()])
296-
.cache(DirCache::new(acme_cache))
297-
.directory_lets_encrypt(true) // Use the production LE environment
298-
.incoming(listener.map_ok(|conn| conn.compat()), alpn_config)
299-
.map_ok(|incoming| Wrapper {
300-
inner: incoming.compat(),
301-
});
302-
303-
tokio::task::Builder::new()
304-
.name("grpc_server")
305-
.spawn(grpc_server.serve_with_incoming(tls_incoming))
306-
.expect("failed to spawn grpc server")
307-
} else {
308-
tokio::task::Builder::new()
309-
.name("grpc_server")
310-
.spawn(grpc_server.serve(grpc_bind))
311-
.expect("failed to spawn grpc server")
278+
// Now we drop down a layer of abstraction, from tonic to axum.
279+
//
280+
// TODO(kate): this is where we may attach additional routes upon this router in the
281+
// future. see #3646 for more information.
282+
let router = grpc_server.into_router();
283+
let make_svc = router.into_make_service();
284+
285+
// Now start the GRPC server, initializing an ACME client to use as a certificate
286+
// resolver if auto-https has been enabled.
287+
macro_rules! spawn_grpc_server {
288+
($server:expr) => {
289+
tokio::task::Builder::new()
290+
.name("grpc_server")
291+
.spawn($server.serve(make_svc))
292+
.expect("failed to spawn grpc server")
293+
};
294+
}
295+
let grpc_server = axum_server::bind(grpc_bind);
296+
let grpc_server = match grpc_auto_https {
297+
Some(domain) => {
298+
let (acceptor, acme_worker) = pd::auto_https::axum_acceptor(pd_home, domain);
299+
// TODO(kate): we should eventually propagate errors from the ACME worker task.
300+
tokio::spawn(acme_worker);
301+
spawn_grpc_server!(grpc_server.acceptor(acceptor))
302+
}
303+
None => {
304+
spawn_grpc_server!(grpc_server)
305+
}
312306
};
313307

314308
// Configure a Prometheus recorder and exporter.

0 commit comments

Comments
 (0)