refactor some container lifecycle logic

This commit is contained in:
Yan Ka, Chiu
2023-07-20 13:44:41 -04:00
parent 0bab596dc0
commit 0d9bf93a10
2 changed files with 66 additions and 65 deletions

View File

@@ -197,14 +197,21 @@ pub struct ProcessRunner {
container: RunningContainer,
should_run_main: bool,
main_started: bool,
main_exited: bool,
// a queue containing the processes to be spawn by the end of event loop
spawn_queue: VecDeque<(String, Jexec)>,
inits: SerialExec,
deinits: SerialExec,
}
/// Processes that should start synchronously; that the next process should start if and only if
/// the previous process (but not all the descentdant of previous proces) exited
#[derive(Debug)]
struct SyncProcesses {
struct SerialExec {
base_id: String,
idx: usize,
execs: VecDeque<Jexec>,
@@ -212,9 +219,9 @@ struct SyncProcesses {
activated: bool,
}
impl SyncProcesses {
fn new(base_id: &str, execs: Vec<Jexec>, activated: bool) -> SyncProcesses {
SyncProcesses {
impl SerialExec {
fn new(base_id: &str, execs: Vec<Jexec>, activated: bool) -> SerialExec {
SerialExec {
base_id: base_id.to_string(),
execs: VecDeque::from(execs),
idx: 0,
@@ -443,32 +450,28 @@ impl ProcessRunner {
pub fn new(kq: i32, container: RunningContainer) -> ProcessRunner {
ProcessRunner {
kq, //: kqueue().unwrap(),
container,
named_process: Vec::new(),
pmap: HashMap::new(),
rpmap: HashMap::new(),
control_streams: HashMap::new(),
main_started: false,
should_run_main: false,
spawn_queue: VecDeque::new(),
inits: SerialExec::new("init", container.init_proto.clone(), !container.init_norun),
deinits: SerialExec::new("deinit", container.deinit_proto.clone(), false),
main_exited: false,
container,
}
}
#[inline]
pub fn run_main(&mut self) {
if let Some(main) = self.container.main_proto.clone() {
match self.spawn_process("main", &main, None) {
Ok(spawn_info) => {
debug!("main spawn: {spawn_info:#?}");
}
Err(error) => error!("cannot spawn main: {error:#?}"),
}
self.container.main_started_notify.notify_waiters();
self.main_started = true;
self.should_run_main = false;
self.spawn_queue.push_back(("main".to_string(), main));
}
}
fn handle_control_stream_cmd(&mut self, mut fd: i32, method: String, request: JsonPacket) {
use ipc::proto::{write_response, Response};
use ipc::proto::write_response;
use ipc::transport::PacketTransport;
use std::io::Write;
if method == "exec" {
@@ -493,7 +496,12 @@ impl ProcessRunner {
}
}
} else if method == "run_main" {
self.should_run_main = true;
if let Some(main) = self.container.main_proto.clone() {
self.spawn_queue.push_back(("main".to_string(), main));
} else {
let packet = write_response(0, ()).unwrap();
_ = fd.send_packet(&packet).unwrap()
}
} else if method == "write_hosts" {
let recv: Vec<HostEntry> = serde_json::from_value(request.data).unwrap();
if let Ok(host_path) = crate::util::realpath(&self.container.root, "/etc/hosts") {
@@ -503,10 +511,10 @@ impl ProcessRunner {
.create(true)
.open(host_path)
{
writeln!(&mut file, "::1 localhost");
writeln!(&mut file, "127.0.0.1 localhost");
_ = writeln!(&mut file, "::1 localhost");
_ = writeln!(&mut file, "127.0.0.1 localhost");
for entry in recv.iter() {
writeln!(&mut file, "{} {}", entry.ip_addr, entry.hostname);
_ = writeln!(&mut file, "{} {}", entry.ip_addr, entry.hostname);
}
}
}
@@ -515,14 +523,7 @@ impl ProcessRunner {
}
}
fn handle_pid_event(
&mut self,
event: KEvent,
inits: &mut SyncProcesses,
deinits: &mut SyncProcesses,
last_deinit: &mut Option<String>,
next_processes: &mut VecDeque<(String, Jexec)>,
) -> bool {
fn handle_pid_event(&mut self, event: KEvent, last_deinit: &mut Option<String>) -> bool {
let fflag = event.fflags();
let pid = event.ident() as u32;
if fflag.contains(FilterFlag::NOTE_EXIT) {
@@ -551,14 +552,22 @@ impl ProcessRunner {
stat.set_exited(event.data() as i32);
unsafe { nix::libc::waitpid(pid as i32, std::ptr::null_mut(), 0) };
if inits.try_drain_proc_queue(stat.id(), next_processes)
if self
.inits
.try_drain_proc_queue(stat.id(), &mut self.spawn_queue)
&& !self.container.main_norun
{
self.should_run_main = true;
info!("SHOULD RUN MAIN");
if let Some(main) = self.container.main_proto.clone() {
self.spawn_queue.push_back(("main".to_string(), main));
}
}
if deinits.try_drain_proc_queue(stat.id(), next_processes) {
*last_deinit = deinits.last_spawn.clone();
if self
.deinits
.try_drain_proc_queue(stat.id(), &mut self.spawn_queue)
{
*last_deinit = self.deinits.last_spawn.clone();
// allow for the last deinit action to run at most
// 15 seconds
let event = KEvent::from_timer_seconds_oneshot(1486, 15);
@@ -568,14 +577,15 @@ impl ProcessRunner {
if descentdant_gone {
stat.set_tree_exited();
if stat.id() == "main" {
if (self.container.deinit_norun || deinits.is_empty())
self.main_exited = true;
if (self.container.deinit_norun || self.deinits.is_empty())
&& !self.container.persist
{
return true;
} else {
debug!("activating deinit queue");
deinits.activate();
deinits.try_drain_proc_queue("", next_processes);
self.deinits.activate();
self.deinits.try_drain_proc_queue("", &mut self.spawn_queue);
}
} else if let Some(last_deinit) = last_deinit.clone() {
if last_deinit == stat.id() {
@@ -612,21 +622,29 @@ impl ProcessRunner {
_ = kevent_ts(kq, &[kill_event], &mut [], None);
let mut inits = SyncProcesses::new(
"init",
self.container.init_proto.clone(),
!self.container.init_norun,
);
let mut deinits = SyncProcesses::new("deinit", self.container.deinit_proto.clone(), false);
let mut last_deinit = None;
if inits.is_empty() && !self.container.main_norun {
if self.inits.is_empty() && !self.container.main_norun {
self.run_main();
} else if let Some((id, jexec)) = inits.pop_front() {
} else if let Some((id, jexec)) = self.inits.pop_front() {
self.inits.activate();
_ = self.spawn_process(&id, &jexec, None);
}
'kq: loop {
while let Some((id, process)) = self.spawn_queue.pop_front() {
match self.spawn_process(&id, &process, None) {
Ok(spawn_info) => {
debug!("{id} spawn: {spawn_info:#?}");
if id == "main" {
self.main_started = true;
self.container.main_started_notify.notify_waiters();
}
}
Err(error) => error!("cannot spawn {id}: {error:#?}"),
}
}
sender.send_if_modified(|x| {
*x = self.container.serialized();
true
@@ -634,18 +652,10 @@ impl ProcessRunner {
let nevx = kevent_ts(kq, &[], &mut events, None);
let nev = nevx.unwrap();
let mut next_processes = VecDeque::new();
for event in &events[..nev] {
match event.filter().unwrap() {
EventFilter::EVFILT_PROC => {
if self.handle_pid_event(
*event,
&mut inits,
&mut deinits,
&mut last_deinit,
&mut next_processes,
) {
if self.handle_pid_event(*event, &mut last_deinit) {
break 'kq;
}
}
@@ -672,12 +682,12 @@ impl ProcessRunner {
}
EventFilter::EVFILT_USER => {
debug!("{event:#?}");
if self.container.deinit_norun || deinits.is_empty() {
if self.container.deinit_norun || self.deinits.is_empty() {
break 'kq;
} else {
debug!("activating deinit queue");
deinits.activate();
deinits.try_drain_proc_queue("", &mut next_processes);
self.deinits.activate();
self.deinits.try_drain_proc_queue("", &mut self.spawn_queue);
}
}
_ => {
@@ -685,13 +695,6 @@ impl ProcessRunner {
}
}
}
if self.should_run_main {
warn!("run main");
self.run_main();
}
while let Some((id, process)) = next_processes.pop_front() {
_ = self.spawn_process(&id, &process, None);
}
}
self.cleanup(sender);

View File

@@ -206,9 +206,7 @@ pub struct InstantiateRequest {
pub no_clean: bool,
pub name: Option<String>,
pub dns: DnsSetting,
pub extra_layers: List<Fd>,
pub main_started_notify: Maybe<Fd>,
}