Skip to content

Commit 538c7ab

Browse files
committed
Change fifo to pipe in shim like go shim
use tokio pipe to resovle pipe's problem Signed-off-by: jokemanfire <[email protected]>
1 parent 217f0ee commit 538c7ab

File tree

8 files changed

+175
-100
lines changed

8 files changed

+175
-100
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ simple_logger = { version = "5.0", default-features = false }
4444
tempfile = "3.6"
4545
thiserror = "1.0"
4646
time = { version = "0.3.29", features = ["serde", "std", "formatting"] }
47-
tokio = "1.26"
47+
tokio = "1.40"
4848
tonic = "0.11"
4949
tonic-build = "0.11"
5050
tower = "0.4"

crates/runc-shim/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ license.workspace = true
1515
repository.workspace = true
1616
homepage.workspace = true
1717

18+
1819
[[bin]]
1920
# Overwrite the binary name so it can be referred as "io.containerd.runc.v2-rs" from containerd.
2021
# Note: the runtime's binary name must start with "io.containerd.runc" in order to

crates/runc-shim/src/common.rs

+23-12
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ use std::{
2929

3030
use containerd_shim::{
3131
api::{ExecProcessRequest, Options},
32-
io_error, other, other_error,
33-
util::IntoOption,
34-
Error,
32+
io_error, other, other_error, Error,
3533
};
3634
use log::{debug, warn};
3735
use nix::{
@@ -43,7 +41,7 @@ use nix::{
4341
};
4442
use oci_spec::runtime::{LinuxNamespaceType, Spec};
4543
use runc::{
46-
io::{Io, NullIo, FIFO},
44+
io::{IOOption, Io, NullIo, PipedIo},
4745
options::GlobalOpts,
4846
Runc, Spawner,
4947
};
@@ -76,8 +74,8 @@ pub struct ProcessIO {
7674

7775
pub fn create_io(
7876
id: &str,
79-
_io_uid: u32,
80-
_io_gid: u32,
77+
io_uid: u32,
78+
io_gid: u32,
8179
stdio: &Stdio,
8280
) -> containerd_shim::Result<ProcessIO> {
8381
let mut pio = ProcessIO::default();
@@ -100,19 +98,32 @@ pub fn create_io(
10098

10199
if scheme == FIFO_SCHEME {
102100
debug!(
103-
"create named pipe io for container {}, stdin: {}, stdout: {}, stderr: {}",
101+
"create pipe io for container {}, stdin: {}, stdout: {}, stderr: {}",
104102
id,
105103
stdio.stdin.as_str(),
106104
stdio.stdout.as_str(),
107105
stdio.stderr.as_str()
108106
);
109-
let io = FIFO {
110-
stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()),
111-
stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()),
112-
stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()),
107+
108+
// let io = FIFO {
109+
// stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()),
110+
// stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()),
111+
// stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()),
112+
// };
113+
// pio.copy = false;
114+
115+
if stdio.stdin.is_empty() {
116+
debug!("stdin is empty");
117+
}
118+
let opts = IOOption {
119+
open_stdin: !stdio.stdin.is_empty(),
120+
open_stdout: !stdio.stdout.is_empty(),
121+
open_stderr: !stdio.stderr.is_empty(),
113122
};
123+
let io = PipedIo::new(io_uid, io_gid, &opts).unwrap();
124+
pio.copy = true;
125+
114126
pio.io = Some(Arc::new(io));
115-
pio.copy = false;
116127
}
117128
Ok(pio)
118129
}

crates/runc-shim/src/processes.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use tokio::{
3737
sync::oneshot::{channel, Receiver, Sender},
3838
};
3939

40-
use crate::io::Stdio;
40+
use crate::{common::ProcessIO, io::Stdio};
4141

4242
#[async_trait]
4343
pub trait Process {
@@ -71,6 +71,7 @@ pub struct ProcessTemplate<S> {
7171
pub state: Status,
7272
pub id: String,
7373
pub stdio: Stdio,
74+
pub io: Option<Arc<ProcessIO>>,
7475
pub pid: i32,
7576
pub exit_code: i32,
7677
pub exited_at: Option<OffsetDateTime>,
@@ -86,6 +87,7 @@ impl<S> ProcessTemplate<S> {
8687
state: Status::CREATED,
8788
id: id.to_string(),
8889
stdio,
90+
io: None,
8991
pid: 0,
9092
exit_code: 0,
9193
exited_at: None,

crates/runc-shim/src/runc.rs

+28-5
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,10 @@ impl RuncFactory {
163163
(Some(s), None)
164164
} else {
165165
let pio = create_io(&id, opts.io_uid, opts.io_gid, stdio)?;
166-
create_opts.io = pio.io.as_ref().cloned();
167-
(None, Some(pio))
166+
let ref_pio = Arc::new(pio);
167+
create_opts.io = ref_pio.io.clone();
168+
init.io = Some(ref_pio.clone());
169+
(None, Some(ref_pio))
168170
};
169171

170172
let resp = init
@@ -178,6 +180,22 @@ impl RuncFactory {
178180
}
179181
return Err(runtime_error(bundle, e, "OCI runtime create failed").await);
180182
}
183+
if !init.stdio.stdin.is_empty() {
184+
let stdin_clone = init.stdio.stdin.clone();
185+
let stdin_w = init.stdin.clone();
186+
// Open the write side in advance to make sure read side will not block,
187+
// open it in another thread otherwise it will block too.
188+
tokio::spawn(async move {
189+
if let Ok(stdin_w_file) = OpenOptions::new()
190+
.write(true)
191+
.open(stdin_clone.as_str())
192+
.await
193+
{
194+
let mut lock_guard = stdin_w.lock().unwrap();
195+
*lock_guard = Some(stdin_w_file);
196+
}
197+
});
198+
}
181199
copy_io_or_console(init, socket, pio, init.lifecycle.exit_signal.clone()).await?;
182200
let pid = read_file_to_str(pid_path).await?.parse::<i32>()?;
183201
init.pid = pid;
@@ -232,6 +250,7 @@ impl ProcessFactory<ExecProcess> for RuncExecFactory {
232250
stderr: req.stderr.to_string(),
233251
terminal: req.terminal,
234252
},
253+
io: None,
235254
pid: 0,
236255
exit_code: 0,
237256
exited_at: None,
@@ -299,6 +318,7 @@ impl ProcessLifecycle<InitProcess> for RuncInitLifecycle {
299318
);
300319
}
301320
}
321+
302322
self.exit_signal.signal();
303323
Ok(())
304324
}
@@ -394,8 +414,10 @@ impl ProcessLifecycle<ExecProcess> for RuncExecLifecycle {
394414
(Some(s), None)
395415
} else {
396416
let pio = create_io(&p.id, self.io_uid, self.io_gid, &p.stdio)?;
397-
exec_opts.io = pio.io.as_ref().cloned();
398-
(None, Some(pio))
417+
let ref_pio = Arc::new(pio);
418+
exec_opts.io = ref_pio.io.clone();
419+
p.io = Some(ref_pio.clone());
420+
(None, Some(ref_pio))
399421
};
400422
//TODO checkpoint support
401423
let exec_result = self
@@ -632,7 +654,7 @@ where
632654
async fn copy_io_or_console<P>(
633655
p: &mut ProcessTemplate<P>,
634656
socket: Option<ConsoleSocket>,
635-
pio: Option<ProcessIO>,
657+
pio: Option<Arc<ProcessIO>>,
636658
exit_signal: Arc<ExitSignal>,
637659
) -> Result<()> {
638660
if p.stdio.terminal {
@@ -670,6 +692,7 @@ impl Spawner for ShimExecutor {
670692
}
671693
};
672694
let pid = child.id().unwrap();
695+
673696
let (stdout, stderr, exit_code) = tokio::join!(
674697
read_std(child.stdout),
675698
read_std(child.stderr),

crates/runc-shim/src/service.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@ use containerd_shim::{
3232
convert_to_timestamp, read_options, read_pid_from_file, read_runtime, read_spec, timestamp,
3333
write_str_to_file,
3434
},
35-
Config, Context, DeleteResponse, Error, Flags, StartOpts,
35+
Config,Context, DeleteResponse, Error, Flags, StartOpts,
3636
};
37+
3738
use log::{debug, error, warn};
39+
3840
use tokio::sync::mpsc::{channel, Receiver, Sender};
3941

4042
use crate::{

0 commit comments

Comments
 (0)