diff --git a/xc/src/container/runner.rs b/xc/src/container/runner.rs index 1ecc41d..c1bbefe 100644 --- a/xc/src/container/runner.rs +++ b/xc/src/container/runner.rs @@ -197,14 +197,21 @@ pub struct ProcessRunner { container: RunningContainer, - should_run_main: bool, main_started: bool, + main_exited: bool, + + // a queue containing the processes to be spawn by the end of event loop + spawn_queue: VecDeque<(String, Jexec)>, + + inits: SerialExec, + + deinits: SerialExec, } /// Processes that should start synchronously; that the next process should start if and only if /// the previous process (but not all the descentdant of previous proces) exited #[derive(Debug)] -struct SyncProcesses { +struct SerialExec { base_id: String, idx: usize, execs: VecDeque, @@ -212,9 +219,9 @@ struct SyncProcesses { activated: bool, } -impl SyncProcesses { - fn new(base_id: &str, execs: Vec, activated: bool) -> SyncProcesses { - SyncProcesses { +impl SerialExec { + fn new(base_id: &str, execs: Vec, activated: bool) -> SerialExec { + SerialExec { base_id: base_id.to_string(), execs: VecDeque::from(execs), idx: 0, @@ -443,32 +450,28 @@ impl ProcessRunner { pub fn new(kq: i32, container: RunningContainer) -> ProcessRunner { ProcessRunner { kq, //: kqueue().unwrap(), - container, named_process: Vec::new(), pmap: HashMap::new(), rpmap: HashMap::new(), control_streams: HashMap::new(), main_started: false, - should_run_main: false, + spawn_queue: VecDeque::new(), + inits: SerialExec::new("init", container.init_proto.clone(), !container.init_norun), + deinits: SerialExec::new("deinit", container.deinit_proto.clone(), false), + main_exited: false, + container, } } + #[inline] pub fn run_main(&mut self) { if let Some(main) = self.container.main_proto.clone() { - match self.spawn_process("main", &main, None) { - Ok(spawn_info) => { - debug!("main spawn: {spawn_info:#?}"); - } - Err(error) => error!("cannot spawn main: {error:#?}"), - } - self.container.main_started_notify.notify_waiters(); - self.main_started = true; - self.should_run_main = false; + self.spawn_queue.push_back(("main".to_string(), main)); } } fn handle_control_stream_cmd(&mut self, mut fd: i32, method: String, request: JsonPacket) { - use ipc::proto::{write_response, Response}; + use ipc::proto::write_response; use ipc::transport::PacketTransport; use std::io::Write; if method == "exec" { @@ -493,7 +496,12 @@ impl ProcessRunner { } } } else if method == "run_main" { - self.should_run_main = true; + if let Some(main) = self.container.main_proto.clone() { + self.spawn_queue.push_back(("main".to_string(), main)); + } else { + let packet = write_response(0, ()).unwrap(); + _ = fd.send_packet(&packet).unwrap() + } } else if method == "write_hosts" { let recv: Vec = serde_json::from_value(request.data).unwrap(); if let Ok(host_path) = crate::util::realpath(&self.container.root, "/etc/hosts") { @@ -503,10 +511,10 @@ impl ProcessRunner { .create(true) .open(host_path) { - writeln!(&mut file, "::1 localhost"); - writeln!(&mut file, "127.0.0.1 localhost"); + _ = writeln!(&mut file, "::1 localhost"); + _ = writeln!(&mut file, "127.0.0.1 localhost"); for entry in recv.iter() { - writeln!(&mut file, "{} {}", entry.ip_addr, entry.hostname); + _ = writeln!(&mut file, "{} {}", entry.ip_addr, entry.hostname); } } } @@ -515,14 +523,7 @@ impl ProcessRunner { } } - fn handle_pid_event( - &mut self, - event: KEvent, - inits: &mut SyncProcesses, - deinits: &mut SyncProcesses, - last_deinit: &mut Option, - next_processes: &mut VecDeque<(String, Jexec)>, - ) -> bool { + fn handle_pid_event(&mut self, event: KEvent, last_deinit: &mut Option) -> bool { let fflag = event.fflags(); let pid = event.ident() as u32; if fflag.contains(FilterFlag::NOTE_EXIT) { @@ -551,14 +552,22 @@ impl ProcessRunner { stat.set_exited(event.data() as i32); unsafe { nix::libc::waitpid(pid as i32, std::ptr::null_mut(), 0) }; - if inits.try_drain_proc_queue(stat.id(), next_processes) + if self + .inits + .try_drain_proc_queue(stat.id(), &mut self.spawn_queue) && !self.container.main_norun { - self.should_run_main = true; + info!("SHOULD RUN MAIN"); + if let Some(main) = self.container.main_proto.clone() { + self.spawn_queue.push_back(("main".to_string(), main)); + } } - if deinits.try_drain_proc_queue(stat.id(), next_processes) { - *last_deinit = deinits.last_spawn.clone(); + if self + .deinits + .try_drain_proc_queue(stat.id(), &mut self.spawn_queue) + { + *last_deinit = self.deinits.last_spawn.clone(); // allow for the last deinit action to run at most // 15 seconds let event = KEvent::from_timer_seconds_oneshot(1486, 15); @@ -568,14 +577,15 @@ impl ProcessRunner { if descentdant_gone { stat.set_tree_exited(); if stat.id() == "main" { - if (self.container.deinit_norun || deinits.is_empty()) + self.main_exited = true; + if (self.container.deinit_norun || self.deinits.is_empty()) && !self.container.persist { return true; } else { debug!("activating deinit queue"); - deinits.activate(); - deinits.try_drain_proc_queue("", next_processes); + self.deinits.activate(); + self.deinits.try_drain_proc_queue("", &mut self.spawn_queue); } } else if let Some(last_deinit) = last_deinit.clone() { if last_deinit == stat.id() { @@ -612,21 +622,29 @@ impl ProcessRunner { _ = kevent_ts(kq, &[kill_event], &mut [], None); - let mut inits = SyncProcesses::new( - "init", - self.container.init_proto.clone(), - !self.container.init_norun, - ); - let mut deinits = SyncProcesses::new("deinit", self.container.deinit_proto.clone(), false); let mut last_deinit = None; - if inits.is_empty() && !self.container.main_norun { + if self.inits.is_empty() && !self.container.main_norun { self.run_main(); - } else if let Some((id, jexec)) = inits.pop_front() { + } else if let Some((id, jexec)) = self.inits.pop_front() { + self.inits.activate(); _ = self.spawn_process(&id, &jexec, None); } 'kq: loop { + while let Some((id, process)) = self.spawn_queue.pop_front() { + match self.spawn_process(&id, &process, None) { + Ok(spawn_info) => { + debug!("{id} spawn: {spawn_info:#?}"); + if id == "main" { + self.main_started = true; + self.container.main_started_notify.notify_waiters(); + } + } + Err(error) => error!("cannot spawn {id}: {error:#?}"), + } + } + sender.send_if_modified(|x| { *x = self.container.serialized(); true @@ -634,18 +652,10 @@ impl ProcessRunner { let nevx = kevent_ts(kq, &[], &mut events, None); let nev = nevx.unwrap(); - let mut next_processes = VecDeque::new(); - for event in &events[..nev] { match event.filter().unwrap() { EventFilter::EVFILT_PROC => { - if self.handle_pid_event( - *event, - &mut inits, - &mut deinits, - &mut last_deinit, - &mut next_processes, - ) { + if self.handle_pid_event(*event, &mut last_deinit) { break 'kq; } } @@ -672,12 +682,12 @@ impl ProcessRunner { } EventFilter::EVFILT_USER => { debug!("{event:#?}"); - if self.container.deinit_norun || deinits.is_empty() { + if self.container.deinit_norun || self.deinits.is_empty() { break 'kq; } else { debug!("activating deinit queue"); - deinits.activate(); - deinits.try_drain_proc_queue("", &mut next_processes); + self.deinits.activate(); + self.deinits.try_drain_proc_queue("", &mut self.spawn_queue); } } _ => { @@ -685,13 +695,6 @@ impl ProcessRunner { } } } - if self.should_run_main { - warn!("run main"); - self.run_main(); - } - while let Some((id, process)) = next_processes.pop_front() { - _ = self.spawn_process(&id, &process, None); - } } self.cleanup(sender); diff --git a/xcd/src/ipc.rs b/xcd/src/ipc.rs index 2026004..c3e47f3 100644 --- a/xcd/src/ipc.rs +++ b/xcd/src/ipc.rs @@ -206,9 +206,7 @@ pub struct InstantiateRequest { pub no_clean: bool, pub name: Option, pub dns: DnsSetting, - pub extra_layers: List, - pub main_started_notify: Maybe, }