Skip to content

Commit 0ddb3d9

Browse files
committed
Add streaming server
Signed-off-by: Sascha Grunert <[email protected]>
1 parent 1e8095d commit 0ddb3d9

File tree

11 files changed

+1573
-243
lines changed

11 files changed

+1573
-243
lines changed

Cargo.lock

Lines changed: 267 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

conmon-rs/common/proto/conmon.capnp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,4 +194,22 @@ interface Conmon {
194194
key @0 :Text;
195195
value @1 :Text;
196196
}
197+
198+
###############################################
199+
# ServeExecContainer
200+
struct ServeExecContainerRequest {
201+
metadata @0 :Metadata; # Standard metadata to carry.
202+
id @1 :Text;
203+
cmd @2 :List(Text);
204+
tty @3 :Bool;
205+
stdin @4 :Bool;
206+
stdout @5 :Bool;
207+
stderr @6 :Bool;
208+
}
209+
210+
struct ServeExecContainerResponse {
211+
url @0 :Text;
212+
}
213+
214+
serveExecContainer @8 (request: ServeExecContainerRequest) -> (response: ServeExecContainerResponse);
197215
}

conmon-rs/server/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ path = "src/main.rs"
99

1010
[dependencies]
1111
anyhow = "1.0.81"
12+
axum = { version = "0.7.4", features = ["ws"]}
1213
capnp = "0.17.2"
1314
capnp-rpc = "0.17.0"
1415
clap = { version = "4.3.8", features = ["color", "cargo", "deprecated", "derive", "deprecated", "env", "string", "unicode", "wrap_help"] }
@@ -41,6 +42,7 @@ tokio = { version = "1.36.0", features = ["fs", "io-std", "io-util", "macros", "
4142
tokio-eventfd = "0.2.1"
4243
tokio-seqpacket = "0.7.1"
4344
tokio-util = { version = "0.7.10", features = ["compat"] }
45+
tower-http = { version = "0.5.2", features = ["trace"] }
4446
tracing = "0.1.40"
4547
tracing-opentelemetry = "0.23.0"
4648
tracing-subscriber = "0.3.18"

conmon-rs/server/src/config.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ macro_rules! prefix {
1212
};
1313
}
1414

15-
#[derive(CopyGetters, Debug, Deserialize, Eq, Getters, Parser, PartialEq, Serialize, Setters)]
15+
#[derive(
16+
Clone, CopyGetters, Debug, Deserialize, Eq, Getters, Parser, PartialEq, Serialize, Setters,
17+
)]
1618
#[serde(rename_all = "kebab-case")]
1719
#[command(
1820
after_help("More info at: https://github.com/containers/conmon-rs"),

conmon-rs/server/src/container_io.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ impl SharedContainerIO {
5959
pub async fn attach(&self) -> SharedContainerAttach {
6060
self.0.read().await.attach().clone()
6161
}
62+
63+
/// Retrieve the underlying ContainerIO instance.
64+
pub fn container_io(&mut self) -> Arc<RwLock<ContainerIO>> {
65+
self.0.clone()
66+
}
6267
}
6368

6469
#[derive(Debug, Getters, MutGetters)]
@@ -155,6 +160,28 @@ impl ContainerIO {
155160
Ok(path)
156161
}
157162

163+
pub fn channels(
164+
&mut self,
165+
) -> Result<(
166+
&mut UnboundedReceiver<Message>,
167+
Option<&mut UnboundedReceiver<Message>>,
168+
)> {
169+
match self.typ_mut() {
170+
ContainerIOType::Terminal(t) => {
171+
if let Some(message_rx) = t.message_rx_mut() {
172+
Ok((message_rx, None))
173+
} else {
174+
bail!("read_all_with_timeout called before message_rx was registered");
175+
}
176+
}
177+
ContainerIOType::Streams(s) => {
178+
let stdout_rx = &mut s.message_rx_stdout;
179+
let stderr_rx = &mut s.message_rx_stderr;
180+
Ok((stdout_rx, Some(stderr_rx)))
181+
}
182+
}
183+
}
184+
158185
pub async fn read_all_with_timeout(
159186
&mut self,
160187
time_to_timeout: Option<Instant>,

conmon-rs/server/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ mod oom_watcher;
2424
mod pause;
2525
mod rpc;
2626
mod server;
27+
mod streaming_server;
2728
mod streams;
2829
mod telemetry;
2930
mod terminal;

conmon-rs/server/src/rpc.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,4 +458,46 @@ impl conmon::Server for Server {
458458
.instrument(debug_span!("promise")),
459459
)
460460
}
461+
462+
fn serve_exec_container(
463+
&mut self,
464+
params: conmon::ServeExecContainerParams,
465+
mut results: conmon::ServeExecContainerResults,
466+
) -> Promise<(), capnp::Error> {
467+
debug!("Got a serve exec container request");
468+
let req = pry!(pry!(params.get()).get_request());
469+
470+
let span = debug_span!(
471+
"serve_exec_container",
472+
uuid = Uuid::new_v4().to_string().as_str()
473+
);
474+
let _enter = span.enter();
475+
pry_err!(Telemetry::set_parent_context(pry!(req.get_metadata())));
476+
477+
let id = pry!(req.get_id()).to_string();
478+
let cmd = capnp_vec_str!(req.get_cmd());
479+
let (tty, stdin, stdout, stderr) = (
480+
req.get_tty(),
481+
req.get_stdin(),
482+
req.get_stdout(),
483+
req.get_stderr(),
484+
);
485+
let mut streaming_server = self.streaming_server().clone();
486+
let child_reaper = self.reaper().clone();
487+
let config = self.config().clone();
488+
489+
Promise::from_future(
490+
async move {
491+
let url = capnp_err!(
492+
streaming_server
493+
.exec_url(child_reaper, config, id, cmd, tty, stdin, stdout, stderr)
494+
.await
495+
)?;
496+
497+
results.get().init_response().set_url(&url);
498+
Ok(())
499+
}
500+
.instrument(debug_span!("promise")),
501+
)
502+
}
461503
}

conmon-rs/server/src/server.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::{
99
journal::Journal,
1010
listener::{DefaultListener, Listener},
1111
pause::Pause,
12+
streaming_server::StreamingServer,
1213
telemetry::Telemetry,
1314
version::Version,
1415
};
@@ -53,6 +54,9 @@ pub struct Server {
5354
/// Fd socket instance.
5455
#[getset(get = "pub(crate)")]
5556
fd_socket: Arc<FdSocket>,
57+
58+
#[getset(get = "pub(crate)")]
59+
streaming_server: Box<StreamingServer>,
5660
}
5761

5862
impl Server {
@@ -62,6 +66,7 @@ impl Server {
6266
config: Default::default(),
6367
reaper: Default::default(),
6468
fd_socket: Default::default(),
69+
streaming_server: Default::default(),
6570
};
6671

6772
if let Some(v) = server.config().version() {
@@ -273,7 +278,12 @@ impl Server {
273278
.context("remove existing fd socket file")
274279
}
275280

276-
async fn start_backend(self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<()> {
281+
async fn start_backend(mut self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<()> {
282+
self.streaming_server
283+
.start()
284+
.await
285+
.context("start streaming server")?;
286+
277287
let listener =
278288
Listener::<DefaultListener>::default().bind_long_path(self.config().socket())?;
279289
let client: conmon::Client = capnp_rpc::new_client(self);
@@ -357,6 +367,17 @@ impl GenerateRuntimeArgs<'_> {
357367

358368
/// Generate the OCI runtime CLI arguments from the provided parameters.
359369
pub(crate) fn exec_sync_args(&self, command: Reader) -> Result<Vec<String>> {
370+
let mut args = self.exec_sync_args_without_command();
371+
372+
for arg in command {
373+
args.push(arg?.to_string());
374+
}
375+
376+
debug!("Exec args {:?}", args.join(" "));
377+
Ok(args)
378+
}
379+
380+
pub(crate) fn exec_sync_args_without_command(&self) -> Vec<String> {
360381
let mut args = vec![];
361382

362383
if let Some(rr) = self.config.runtime_root() {
@@ -378,11 +399,6 @@ impl GenerateRuntimeArgs<'_> {
378399
args.push(format!("--pid-file={}", self.pidfile.display()));
379400
args.push(self.id.into());
380401

381-
for arg in command {
382-
args.push(arg?.to_string());
383-
}
384-
385-
debug!("Exec args {:?}", args.join(" "));
386-
Ok(args)
402+
args
387403
}
388404
}

0 commit comments

Comments
 (0)