Skip to content

Commit c5ba76c

Browse files
committed
fix exec hang when tx channel is full
1 parent 202c756 commit c5ba76c

File tree

2 files changed

+45
-45
lines changed

2 files changed

+45
-45
lines changed

crates/runc-shim/src/service.rs

+40 -41
Original file line number | Diff line number | Diff line change
@@ -159,52 +159,51 @@ async fn process_exits(
159159
if let Subject::Pid(pid) = e.subject {
160160
debug!("receive exit event: {}", &e);
161161
let exit_code = e.exit_code;
162-
for (_k, cont) in containers.lock().await.iter_mut() {
163-
let bundle = cont.bundle.to_string();
164-
// pid belongs to container init process
165-
if cont.init.pid == pid {
166-
// kill all children process if the container has a private PID namespace
167-
if should_kill_all_on_exit(&bundle).await {
168-
cont.kill(None, 9, true).await.unwrap_or_else(|e| {
169-
error!("failed to kill init's children: {}", e)
170-
});
171-
}
172-
// set exit for init process
173-
cont.init.set_exited(exit_code).await;
174-
175-
// publish event
176-
let (_, code, exited_at) = match cont.get_exit_info(None).await {
177-
Ok(info) => info,
178-
Err(_) => break,
179-
};
180-
181-
let ts = convert_to_timestamp(exited_at);
182-
let event = TaskExit {
183-
container_id: cont.id.to_string(),
184-
id: cont.id.to_string(),
185-
pid: cont.pid().await as u32,
186-
exit_status: code as u32,
187-
exited_at: Some(ts).into(),
188-
..Default::default()
189-
};
190-
let topic = event.topic();
191-
tx.send((topic.to_string(), Box::new(event)))
192-
.await
193-
.unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e));
162+
let containers = containers.clone();
163+
let tx = tx.clone();
164+
tokio::spawn(async move {
165+
for (_k, cont) in containers.lock().await.iter_mut() {
166+
let bundle = cont.bundle.to_string();
167+
// pid belongs to container init process
168+
if cont.init.pid == pid {
169+
// kill all children process if the container has a private PID namespace
170+
if should_kill_all_on_exit(&bundle).await {
171+
cont.kill(None, 9, true).await.unwrap_or_else(|e| {
172+
error!("failed to kill init's children: {}", e)
173+
});
174+
}
175+
// set exit for init process
176+
cont.init.set_exited(exit_code).await;
177+
178+
// publish event
179+
let (_, code, exited_at) = match cont.get_exit_info(None).await {
180+
Ok(info) => info,
181+
Err(_) => break,
182+
};
183+
184+
let ts = convert_to_timestamp(exited_at);
185+
let event = TaskExit {
186+
container_id: cont.id.to_string(),
187+
id: cont.id.to_string(),
188+
pid: cont.pid().await as u32,
189+
exit_status: code as u32,
190+
exited_at: Some(ts).into(),
191+
..Default::default()
192+
};
193+
let topic = event.topic();
194+
tx.send((topic.to_string(), Box::new(event)))
195+
.await
196+
.unwrap_or_else(|e| warn!("send {} to publisher: {}", topic, e));
194197

195-
break;
196-
}
198+
break;
199+
}
197200

198-
// pid belongs to container common process
199-
for (_exec_id, p) in cont.processes.iter_mut() {
200-
// set exit for exec process
201-
if p.pid == pid {
201+
// pid belongs to container common process
202+
if let Some(p) = cont.processes.values_mut().find(|p| p.pid == pid) {
202203
p.set_exited(exit_code).await;
203-
// TODO: publish event
204-
break;
205204
}
206205
}
207-
}
206+
});
208207
}
209208
}
210209
monitor_unsubscribe(s.id).await.unwrap_or_default();

crates/shim/src/asynchronous/monitor.rs

+5 -4
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ use std::collections::HashMap;
1919
use lazy_static::lazy_static;
2020
use log::error;
2121
use tokio::sync::{
22-
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
22+
mpsc::{channel, Receiver, Sender},
2323
Mutex,
2424
};
2525

@@ -68,17 +68,17 @@ pub struct Monitor {
6868

6969
pub(crate) struct Subscriber {
7070
pub(crate) topic: Topic,
71-
pub(crate) tx: UnboundedSender<ExitEvent>,
71+
pub(crate) tx: Sender<ExitEvent>,
7272
}
7373

7474
pub struct Subscription {
7575
pub id: i64,
76-
pub rx: UnboundedReceiver<ExitEvent>,
76+
pub rx: Receiver<ExitEvent>,
7777
}
7878

7979
impl Monitor {
8080
pub fn subscribe(&mut self, topic: Topic) -> Result<Subscription> {
81-
let (tx, rx) = unbounded_channel::<ExitEvent>();
81+
let (tx, rx) = channel::<ExitEvent>(128);
8282
let id = self.seq_id;
8383
self.seq_id += 1;
8484
let subscriber = Subscriber {
@@ -117,6 +117,7 @@ impl Monitor {
117117
subject: subject.clone(),
118118
exit_code,
119119
})
120+
.await
120121
.map_err(other_error!(e, "failed to send exit code"));
121122
results.push(res);
122123
}

0 commit comments

Comments
 (0)