diff --git a/ipc/src/packet/codec/mod.rs b/ipc/src/packet/codec/mod.rs index 1dbdd0e..99dfd28 100644 --- a/ipc/src/packet/codec/mod.rs +++ b/ipc/src/packet/codec/mod.rs @@ -112,6 +112,9 @@ impl List { pub fn to_vec(self) -> Vec { self.0 } + pub fn push(&mut self, item: T) { + self.0.push(item) + } } impl Default for List { diff --git a/xc-bin/src/format/mod.rs b/xc-bin/src/format/mod.rs index 5181b5a..73b5a1c 100644 --- a/xc-bin/src/format/mod.rs +++ b/xc-bin/src/format/mod.rs @@ -217,6 +217,41 @@ impl FromStr for PublishSpec { } } +const GB: usize = 1 << 30; +const MB: usize = 1 << 20; +const KB: usize = 1 << 10; + +const GB_F64: f64 = GB as f64; +const MB_F64: f64 = MB as f64; +const KB_F64: f64 = KB as f64; + +pub fn format_capacity(size: usize) -> String { + let bytes = size as f64; + if size > GB { + format!("{:.2} GB", bytes / GB_F64) + } else if size > MB { + format!("{:.2} MB", bytes / MB_F64) + } else if size > KB { + format!("{:.2} KB", bytes / KB_F64) + } else { + format!("{:.2} B", bytes) + } +} + +pub fn format_bandwidth(size: usize, secs: u64) -> String { + let bits = (size * 8) as f64; + let ss = secs as f64; + if size > GB { + format!("{:.2} gbps", bits / GB_F64 / ss) + } else if size > MB { + format!("{:.2} mbps", bits / MB_F64 / ss) + } else if size > KB { + format!("{:.2} kbps", bits / KB_F64 / ss) + } else { + format!("{:.2} bps", bits / ss) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/xc-bin/src/jailfile/directives/from.rs b/xc-bin/src/jailfile/directives/from.rs index a7a2cd1..dc49c72 100644 --- a/xc-bin/src/jailfile/directives/from.rs +++ b/xc-bin/src/jailfile/directives/from.rs @@ -73,29 +73,24 @@ impl Directive for FromDirective { } } let name = format!("build-{}", gen_id()); + /* create container */ let req = InstantiateRequest { - alt_root: None, name: Some(name.to_string()), - hostname: None, - copies: List::new(), dns: context.dns.clone(), image_reference: self.image_reference.clone(), - no_clean: false, main_norun: true, init_norun: true, deinit_norun: true, persist: true, - ips: Vec::new(), main_started_notify: Maybe::None, entry_point: "main".to_string(), entry_point_args: Vec::new(), envs: HashMap::new(), - vnet: false, - mount_req: Vec::new(), - extra_layers: List::new(), ipreq: context.network.clone(), + ..InstantiateRequest::default() }; + eprintln!("before instantiate"); match do_instantiate(&mut context.conn, req)? { Ok(response) => { diff --git a/xc-bin/src/main.rs b/xc-bin/src/main.rs index 8be157a..3908511 100644 --- a/xc-bin/src/main.rs +++ b/xc-bin/src/main.rs @@ -32,7 +32,7 @@ mod redirect; use crate::channel::{use_channel_action, ChannelAction}; use crate::error::ActionError; -use crate::format::{BindMount, EnvPair, IpWant, PublishSpec}; +use crate::format::{format_bandwidth, format_capacity, BindMount, EnvPair, IpWant, PublishSpec}; use crate::image::{use_image_action, ImageAction}; use crate::jailfile::directives::volume::VolumeDirective; use crate::network::{use_network_action, NetworkAction}; @@ -510,84 +510,70 @@ fn main() -> Result<(), ActionError> { image_reference: image_reference.clone(), remote_reference: new_image_reference.clone(), }; - if let Ok(_res) = do_push_image(&mut conn, req)? { - let mut lines_count = 0; - loop { - std::thread::sleep(std::time::Duration::from_millis(500)); - if lines_count > 0 { - eprint!("{}\x1B[0J", "\x1B[F".repeat(lines_count)); - } + match do_push_image(&mut conn, req)? { + Ok(_) => { + let mut lines_count = 0; + loop { + std::thread::sleep(std::time::Duration::from_millis(500)); + if lines_count > 0 { + eprint!("{}\x1B[0J", "\x1B[F".repeat(lines_count)); + } - let reqt = UploadStat { - image_reference: image_reference.clone(), - remote_reference: new_image_reference.clone(), - }; - let res = do_upload_stat(&mut conn, reqt)?.unwrap(); - if let Some(error) = res.fault { - eprintln!("{error}"); - return Ok(()); - } else if res.layers.is_empty() { - lines_count = 1; - eprintln!("initializing"); - } else if res.done { - eprintln!("Completed"); - return Ok(()); - } else { - lines_count = res.layers.len() + 2; - let x = res.current_upload.unwrap_or(0); - for (i, digest) in res.layers.iter().enumerate() { - match i.cmp(&x) { - Ordering::Less => eprintln!("{digest} ... done"), - Ordering::Equal => { - let speed = res.duration_secs.and_then(|secs| { - res.bytes.map(|bytes| (bytes * 8) as f64 / secs as f64) - }); - let uploaded = res - .bytes - .map(|bytes| { - let bytes = bytes as f64; - if bytes > 1000000000.0 { - format!("{:.2} GB", bytes / 1000000000.0) - } else if bytes > 1000000.0 { - format!("{:.2} MB", bytes / 1000000.0) - } else if bytes > 1000.0 { - format!("{:.2} KB", bytes / 1000.0) - } else { - format!("{:.2} B", bytes) - } - }) - .unwrap_or_else(|| "".to_string()); - let label = match speed { - None => "".to_string(), - Some(speed) => { - if speed > 1000000000.0 { - format!("{:.2} gbps", speed / 1000000000.0) - } else if speed > 1000000.0 { - format!("{:.2} mbps", speed / 1000000.0) - } else if speed > 1000.0 { - format!("{:.2} kbps", speed / 1000.0) - } else { - format!("{:.2} bps", speed) - } - } - }; - eprintln!("{digest} ... uploading {uploaded} @ {label}"); - } - Ordering::Greater => eprintln!("{digest}"), - }; - } - if res.push_config { - eprintln!("Image config ... done"); + let reqt = UploadStat { + image_reference: image_reference.clone(), + remote_reference: new_image_reference.clone(), + }; + let res = do_upload_stat(&mut conn, reqt)?.unwrap(); + if let Some(error) = res.fault { + eprintln!("{error}"); + return Ok(()); + } else if res.layers.is_empty() { + lines_count = 1; + eprintln!("initializing"); + } else if res.done { + eprintln!("Completed"); + return Ok(()); } else { - eprintln!("Image config"); - } - if res.push_manifest { - eprintln!("Image manifest ... done") - } else { - eprintln!("Image manifest") + lines_count = res.layers.len() + 2; + let x = res.current_upload_idx.unwrap_or(0); + for (i, digest) in res.layers.iter().enumerate() { + match i.cmp(&x) { + Ordering::Less => eprintln!("{digest} ... done"), + Ordering::Equal => { + let uploaded = + res.bytes.map(format_capacity).unwrap_or_default(); + let bandwidth = res + .bytes + .and_then(|bytes| { + res.duration_secs + .map(|sec| format_bandwidth(bytes, sec)) + }) + .unwrap_or_default(); + let total = res + .current_layer_size + .map(format_capacity) + .unwrap_or_default(); + eprintln!("{digest} ... uploading {uploaded}/{total} @ {bandwidth}"); + } + Ordering::Greater => eprintln!("{digest}"), + }; + } + if res.push_config { + eprintln!("Image config ... done"); + } else { + eprintln!("Image config"); + } + if res.push_manifest { + eprintln!("Image manifest ... done") + } else { + eprintln!("Image manifest") + } } } } + Err(err) => { + eprintln!("cannot push image: {err:#?}") + } } } Action::Rdr(rdr) => { @@ -698,7 +684,6 @@ fn main() -> Result<(), ActionError> { Maybe::Some(Fd(fd)) }; let mut reqt = InstantiateRequest { - alt_root: None, name, hostname, copies, @@ -710,14 +695,12 @@ fn main() -> Result<(), ActionError> { entry_point_args, extra_layers, no_clean, - main_norun: false, - init_norun: false, - deinit_norun: false, persist, dns, image_reference, ips: ips.into_iter().map(|v| v.0).collect(), main_started_notify: main_started_notify.clone(), + ..InstantiateRequest::default() }; if create_only { diff --git a/xc/src/container/mod.rs b/xc/src/container/mod.rs index eb518cc..9a1cfec 100644 --- a/xc/src/container/mod.rs +++ b/xc/src/container/mod.rs @@ -33,7 +33,7 @@ use self::process::ProcessStat; use crate::container::running::RunningContainer; use crate::models::exec::Jexec; use crate::models::jail_image::JailImage; -use crate::models::network::{DnsSetting, IpAssign}; +use crate::models::network::IpAssign; use crate::util::realpath; use anyhow::Context; @@ -44,19 +44,17 @@ use ipcidr::IpCidr; use jail::param::Value; use jail::StoppedJail; use oci_util::image_reference::ImageReference; -use request::{CopyFileReq, Mount}; +use request::Mount; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::ffi::OsString; use std::net::IpAddr; -use std::os::fd::{AsRawFd, FromRawFd}; use std::path::PathBuf; use std::sync::Arc; use tracing::info; -/// Represents an instance of container #[derive(Debug, Clone)] -pub struct Container { +pub struct CreateContainer { pub id: String, pub name: String, pub hostname: String, @@ -75,54 +73,23 @@ pub struct Container { pub deinit_norun: bool, pub persist: bool, pub no_clean: bool, + /// Do not create /proc automatically and abort mounting procfs if the directory is missing. pub linux_no_create_proc_dir: bool, + /// Do not cerate /sys automatically and abort mounting sysfs if the directory is missing pub linux_no_create_sys_dir: bool, + /// Do not mount linux sysfs + pub linux_no_mount_sys: bool, + /// Do not mount linux procfs + pub linux_no_mount_proc: bool, pub zfs_origin: Option, - pub dns: DnsSetting, pub origin_image: Option, pub allowing: Vec, pub image_reference: Option, - pub copies: Vec, pub default_router: Option, } -impl Container { - fn setup_resolv_conf(&self) -> anyhow::Result<()> { - let resolv_conf_path = realpath(&self.root, "/etc/resolv.conf") - .with_context(|| format!("failed finding /etc/resolv.conf in jail {}", self.id))?; - - match &self.dns { - DnsSetting::Nop => {} - DnsSetting::Inherit => { - std::fs::copy("/etc/resolv.conf", resolv_conf_path).with_context(|| { - format!( - "failed copying resolv.conf to destination container: {}", - self.id - ) - })?; - } - DnsSetting::Specified { - servers, - search_domains, - } => { - let servers = servers - .iter() - .map(|host| format!("nameserver {host}")) - .collect::>() - .join("\n"); - let domains = search_domains - .iter() - .map(|host| format!("domain {host}")) - .collect::>() - .join("\n"); - let resolv_conf = format!("{domains}\n{servers}\n"); - std::fs::write(resolv_conf_path, resolv_conf)?; - } - } - Ok(()) - } - - pub fn start_transactionally(&self, undo: &mut UndoStack) -> anyhow::Result { +impl CreateContainer { + pub fn create_transactionally(&self, undo: &mut UndoStack) -> anyhow::Result { info!(name = self.name, "starting jail"); let root = &self.root; @@ -131,8 +98,6 @@ impl Container { undo.pf_create_anchor(anchor) .with_context(|| format!("failed to create pf anchor for container {}", self.id))?; - self.setup_resolv_conf()?; - let devfs_ruleset = self.devfs_ruleset_id; let mut proto = StoppedJail::new(root).name(&self.name).param( @@ -161,33 +126,6 @@ impl Container { )?; } - for copy in self.copies.iter() { - let dest = realpath(root, ©.destination)?; - let in_fd = copy.source; - let file = unsafe { std::fs::File::from_raw_fd(copy.source) }; - let metadata = file.metadata().unwrap(); - - let sink = std::fs::OpenOptions::new() - .write(true) - .create(true) - .open(dest) - .unwrap(); - - let sfd = sink.as_raw_fd(); - - let size = unsafe { - nix::libc::copy_file_range( - in_fd, - std::ptr::null_mut(), - sfd, - std::ptr::null_mut(), - metadata.len() as usize, - 0, - ) - }; - - eprintln!("copied: {size}"); - } let mut ifaces_to_move = HashMap::new(); if !self.vnet { @@ -275,7 +213,7 @@ impl Container { let proc_path = format!("{root}/proc"); let sys_path = format!("{root}/sys"); - { + if !self.linux_no_mount_proc { let path = std::path::Path::new(&proc_path); let path_existed = path.exists(); @@ -291,7 +229,8 @@ impl Container { )?; } } - { + + if !self.linux_no_mount_sys { let path = std::path::Path::new(&sys_path); let path_existed = path.exists(); @@ -310,9 +249,10 @@ impl Container { } let jail = proto.start()?; - let dmillis = std::time::Duration::from_millis(10); if self.vnet { + let dmillis = std::time::Duration::from_millis(10); + for (iface, addresses) in ifaces_to_move.iter() { undo.move_if(iface.to_owned(), jail.jid)?; std::thread::sleep(dmillis); diff --git a/xc/src/container/runner/control_stream.rs b/xc/src/container/runner/control_stream.rs new file mode 100644 index 0000000..10cb43a --- /dev/null +++ b/xc/src/container/runner/control_stream.rs @@ -0,0 +1,142 @@ +// Copyright (c) 2023 Yan Ka, Chiu. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// 1. Redistributions of source code must retain the above copyright +// notice, this list of conditions, and the following disclaimer, +// without modification, immediately at the beginning of the file. +// 2. The name of the author may not be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE FOR +// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +// OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +// OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +// SUCH DAMAGE. + +use ipc::packet::codec::json::JsonPacket; +use ipc::packet::Packet; +use ipc::proto::Request; +use std::io::Read; +use std::os::fd::{AsRawFd, RawFd}; +use std::os::unix::net::UnixStream; + +#[derive(Debug)] +struct ReadingPacket { + buffer: Vec, + read_len: usize, + expected_len: usize, + fds: Vec, +} + +impl ReadingPacket { + fn ready(&self) -> bool { + self.read_len == self.expected_len + } + + fn read(&mut self, socket: &mut UnixStream, known_avail: usize) -> Result<(), std::io::Error> { + if !self.ready() { + let len = socket.read(&mut self.buffer[self.read_len..][..known_avail])?; + self.read_len += len; + } + Ok(()) + } + + fn new(socket: &mut UnixStream) -> Result { + let mut header_bytes = [0u8; 16]; + _ = socket.read(&mut header_bytes)?; + let expected_len = u64::from_be_bytes(header_bytes[0..8].try_into().unwrap()) as usize; + let fds_count = u64::from_be_bytes(header_bytes[8..].try_into().unwrap()) as usize; + if fds_count > 64 { + panic!("") + } + let mut buffer = vec![0u8; expected_len]; + let mut fds = Vec::new(); + let read_len = + ipc::transport::recv_packet_once(socket.as_raw_fd(), fds_count, &mut buffer, &mut fds)?; + + Ok(Self { + buffer, + read_len, + expected_len, + fds, + }) + } +} + +pub(crate) enum Readiness { + Pending, + Ready(T), +} + +// XXX: naively pretend writes always successful +#[derive(Debug)] +pub(crate) struct ControlStream { + socket: UnixStream, + processing: Option, +} + +impl ControlStream { + pub(crate) fn new(socket: UnixStream) -> ControlStream { + ControlStream { + socket, + processing: None, + } + } + + pub(crate) fn socket_fd(&self) -> RawFd { + self.socket.as_raw_fd() + } + + pub(crate) fn try_get_request( + &mut self, + known_avail: usize, + ) -> Result, anyhow::Error> { + self.pour_in_bytes(known_avail) + .and_then(|readiness| match readiness { + Readiness::Pending => Ok(Readiness::Pending), + Readiness::Ready(packet) => { + let request: ipc::packet::TypedPacket = + packet.map_failable(|vec| serde_json::from_slice(vec))?; + + let method = request.data.method.to_string(); + let packet = request.map(|req| req.value.clone()); + + Ok(Readiness::Ready((method, packet))) + } + }) + } + + pub(crate) fn pour_in_bytes( + &mut self, + known_avail: usize, + ) -> Result, anyhow::Error> { + if let Some(reading_packet) = &mut self.processing { + if reading_packet.ready() { + panic!("the client is sending more bytes than expected"); + } + reading_packet.read(&mut self.socket, known_avail).unwrap(); + } else { + let reading_packet = ReadingPacket::new(&mut self.socket).unwrap(); + self.processing = Some(reading_packet); + } + let Some(processing) = self.processing.take() else { panic!() }; + if processing.ready() { + Ok(Readiness::Ready(Packet { + data: processing.buffer, + fds: processing.fds, + })) + } else { + self.processing = Some(processing); + Ok(Readiness::Pending) + } + } +} diff --git a/xc/src/container/runner.rs b/xc/src/container/runner/mod.rs similarity index 86% rename from xc/src/container/runner.rs rename to xc/src/container/runner/mod.rs index c1bbefe..f07a452 100644 --- a/xc/src/container/runner.rs +++ b/xc/src/container/runner/mod.rs @@ -22,6 +22,10 @@ // OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF // SUCH DAMAGE. +mod control_stream; + +use self::control_stream::{ControlStream, Readiness}; + use crate::container::error::ExecError; use crate::container::process::*; use crate::container::running::RunningContainer; @@ -35,125 +39,16 @@ use anyhow::Context; use freebsd::event::{EventFdNotify, KEventExt}; use freebsd::FreeBSDCommandExt; use ipc::packet::codec::json::JsonPacket; -use ipc::packet::Packet; -use ipc::proto::Request; use jail::process::Jailed; use nix::libc::intptr_t; use nix::sys::event::{kevent_ts, EventFilter, EventFlag, FilterFlag, KEvent}; use std::collections::{HashMap, VecDeque}; -use std::io::Read; -use std::os::fd::{AsRawFd, RawFd}; use std::os::unix::net::UnixStream; use std::path::{Component, Path, PathBuf}; use std::sync::Arc; use tokio::sync::watch::{channel, Receiver, Sender}; use tracing::{debug, error, info, trace, warn}; -#[derive(Debug)] -struct ReadingPacket { - buffer: Vec, - read_len: usize, - expected_len: usize, - fds: Vec, -} - -impl ReadingPacket { - fn ready(&self) -> bool { - self.read_len == self.expected_len - } - - fn read(&mut self, socket: &mut UnixStream, known_avail: usize) -> Result<(), std::io::Error> { - if !self.ready() { - let len = socket.read(&mut self.buffer[self.read_len..][..known_avail])?; - self.read_len += len; - } - Ok(()) - } - - fn new(socket: &mut UnixStream) -> Result { - let mut header_bytes = [0u8; 16]; - _ = socket.read(&mut header_bytes)?; - let expected_len = u64::from_be_bytes(header_bytes[0..8].try_into().unwrap()) as usize; - let fds_count = u64::from_be_bytes(header_bytes[8..].try_into().unwrap()) as usize; - if fds_count > 64 { - panic!("") - } - let mut buffer = vec![0u8; expected_len]; - let mut fds = Vec::new(); - let read_len = - ipc::transport::recv_packet_once(socket.as_raw_fd(), fds_count, &mut buffer, &mut fds)?; - - Ok(Self { - buffer, - read_len, - expected_len, - fds, - }) - } -} - -enum Readiness { - Pending, - Ready(T), -} - -// XXX: naively pretend writes always successful -#[derive(Debug)] -pub struct ControlStream { - socket: UnixStream, - processing: Option, -} - -impl ControlStream { - pub fn new(socket: UnixStream) -> ControlStream { - ControlStream { - socket, - processing: None, - } - } - - fn try_get_request( - &mut self, - known_avail: usize, - ) -> Result, anyhow::Error> { - self.pour_in_bytes(known_avail) - .and_then(|readiness| match readiness { - Readiness::Pending => Ok(Readiness::Pending), - Readiness::Ready(packet) => { - let request: ipc::packet::TypedPacket = - packet.map_failable(|vec| serde_json::from_slice(vec))?; - - let method = request.data.method.to_string(); - let packet = request.map(|req| req.value.clone()); - - Ok(Readiness::Ready((method, packet))) - } - }) - } - - fn pour_in_bytes(&mut self, known_avail: usize) -> Result, anyhow::Error> { - if let Some(reading_packet) = &mut self.processing { - if reading_packet.ready() { - panic!("the client is sending more bytes than expected"); - } - reading_packet.read(&mut self.socket, known_avail).unwrap(); - } else { - let reading_packet = ReadingPacket::new(&mut self.socket).unwrap(); - self.processing = Some(reading_packet); - } - let Some(processing) = self.processing.take() else { panic!() }; - if processing.ready() { - Ok(Readiness::Ready(Packet { - data: processing.buffer, - fds: processing.fds, - })) - } else { - self.processing = Some(processing); - Ok(Readiness::Pending) - } - } -} - #[derive(Debug)] pub struct ProcessRunnerStat { pub(super) id: String, @@ -195,6 +90,14 @@ pub struct ProcessRunner { pub(super) control_streams: HashMap, + /// This field records the epoch seconds when the container is "started", which defined by a + /// container that has completed its init-routine + pub(super) started: Option, + + /// If `auto_start` is true, the container executes its init routine automatically after + /// creation + pub(super) auto_start: bool, + container: RunningContainer, main_started: bool, @@ -276,9 +179,9 @@ impl SerialExec { } impl ProcessRunner { - pub fn add_control_stream(&mut self, control_stream: ControlStream) { + pub(crate) fn add_control_stream(&mut self, control_stream: ControlStream) { debug!("adding control stream"); - let fd = control_stream.socket.as_raw_fd(); + let fd = control_stream.socket_fd(); self.control_streams.insert(fd, control_stream); let read_event = KEvent::from_read(fd); _ = kevent_ts(self.kq, &[read_event], &mut [], None); @@ -447,9 +350,9 @@ impl ProcessRunner { _ = kevent_ts(self.kq, &[event], &mut [], None); } - pub fn new(kq: i32, container: RunningContainer) -> ProcessRunner { + pub fn new(kq: i32, container: RunningContainer, auto_start: bool) -> ProcessRunner { ProcessRunner { - kq, //: kqueue().unwrap(), + kq, named_process: Vec::new(), pmap: HashMap::new(), rpmap: HashMap::new(), @@ -460,6 +363,8 @@ impl ProcessRunner { deinits: SerialExec::new("deinit", container.deinit_proto.clone(), false), main_exited: false, container, + started: None, + auto_start, } } @@ -502,6 +407,7 @@ impl ProcessRunner { let packet = write_response(0, ()).unwrap(); _ = fd.send_packet(&packet).unwrap() } + } else if method == "start" { } else if method == "write_hosts" { let recv: Vec = serde_json::from_value(request.data).unwrap(); if let Ok(host_path) = crate::util::realpath(&self.container.root, "/etc/hosts") { @@ -523,6 +429,21 @@ impl ProcessRunner { } } + fn start(&mut self) { + self.started = Some( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + ); + if self.inits.is_empty() && !self.container.main_norun { + self.run_main(); + } else if let Some((id, jexec)) = self.inits.pop_front() { + self.inits.activate(); + _ = self.spawn_process(&id, &jexec, None); + } + } + fn handle_pid_event(&mut self, event: KEvent, last_deinit: &mut Option) -> bool { let fflag = event.fflags(); let pid = event.ident() as u32; @@ -624,11 +545,8 @@ impl ProcessRunner { let mut last_deinit = None; - if self.inits.is_empty() && !self.container.main_norun { - self.run_main(); - } else if let Some((id, jexec)) = self.inits.pop_front() { - self.inits.activate(); - _ = self.spawn_process(&id, &jexec, None); + if self.auto_start { + self.start(); } 'kq: loop { @@ -727,10 +645,11 @@ impl ProcessRunner { pub fn run( container: RunningContainer, control_stream: UnixStream, + auto_start: bool, ) -> (i32, Receiver) { let kq = nix::sys::event::kqueue().unwrap(); let (tx, rx) = channel(container.serialized()); - let mut pr = ProcessRunner::new(kq, container); + let mut pr = ProcessRunner::new(kq, container, auto_start); pr.add_control_stream(ControlStream::new(control_stream)); let kq = pr.kq; std::thread::spawn(move || { diff --git a/xc/src/container/running.rs b/xc/src/container/running.rs index 252bf90..5f2aa8f 100644 --- a/xc/src/container/running.rs +++ b/xc/src/container/running.rs @@ -27,15 +27,20 @@ use crate::container::request::Mount; use crate::container::ContainerManifest; use crate::models::exec::Jexec; use crate::models::jail_image::JailImage; -use crate::models::network::IpAssign; +use crate::models::network::{DnsSetting, IpAssign}; +use crate::util::realpath; +use anyhow::Context; use freebsd::event::EventFdNotify; use oci_util::image_reference::ImageReference; use std::collections::HashMap; use std::net::IpAddr; +use std::os::fd::{AsRawFd, FromRawFd}; use std::sync::Arc; use tokio::sync::watch::Receiver; +use super::request::CopyFileReq; + #[derive(Clone, Debug)] pub struct RunningContainer { pub devfs_ruleset_id: u16, @@ -76,6 +81,69 @@ pub struct RunningContainer { } impl RunningContainer { + pub fn copyin(&self, req: &CopyFileReq) -> anyhow::Result<()> { + let dest = realpath(&self.root, &req.destination)?; + let in_fd = req.source; + let file = unsafe { std::fs::File::from_raw_fd(in_fd) }; + let metadata = file.metadata().unwrap(); + let sink = std::fs::OpenOptions::new() + .write(true) + .create(true) + .open(dest) + .unwrap(); + + let sfd = sink.as_raw_fd(); + + let _size = unsafe { + nix::libc::copy_file_range( + in_fd, + std::ptr::null_mut(), + sfd, + std::ptr::null_mut(), + metadata.len() as usize, + 0, + ) + }; + + Ok(()) + // eprintln!("copied: {size}"); + } + + pub fn setup_resolv_conf(&self, dns: &DnsSetting) -> anyhow::Result<()> { + let resolv_conf_path = realpath(&self.root, "/etc/resolv.conf") + .with_context(|| format!("failed finding /etc/resolv.conf in jail {}", self.id))?; + + match dns { + DnsSetting::Nop => {} + DnsSetting::Inherit => { + std::fs::copy("/etc/resolv.conf", resolv_conf_path).with_context(|| { + format!( + "failed copying resolv.conf to destination container: {}", + self.id + ) + })?; + } + DnsSetting::Specified { + servers, + search_domains, + } => { + let servers = servers + .iter() + .map(|host| format!("nameserver {host}")) + .collect::>() + .join("\n"); + let domains = search_domains + .iter() + .map(|host| format!("domain {host}")) + .collect::>() + .join("\n"); + let resolv_conf = format!("{domains}\n{servers}\n"); + std::fs::write(resolv_conf_path, resolv_conf)?; + } + } + Ok(()) + } + pub fn serialized(&self) -> ContainerManifest { let mut processes = HashMap::new(); diff --git a/xcd/src/context/instantiate.rs b/xcd/src/context/instantiate.rs index b1e04dc..9d8a76d 100644 --- a/xcd/src/context/instantiate.rs +++ b/xcd/src/context/instantiate.rs @@ -68,12 +68,15 @@ pub struct InstantiateBlueprint { pub envs: HashMap, pub entry_point: String, pub entry_point_args: Vec, - pub devfs_ruleset_id: u16, pub ip_alloc: Vec, pub default_router: Option, - pub main_started_notify: Option, + pub create_only: bool, + pub linux_no_create_sys_dir: bool, + pub linux_no_create_proc_dir: bool, + pub linux_no_mount_sys: bool, + pub linux_no_mount_proc: bool, } impl InstantiateBlueprint { @@ -130,11 +133,7 @@ impl InstantiateBlueprint { required_envs: Vec::new(), } }; - /* - let Some(entry_point) = config.entry_points.get(&request.entry_point) else { - precondition_failure!(ENOENT, "requested entry point not found: {}", request.entry_point); - }; - */ + let entry_point_args = if request.entry_point_args.is_empty() { entry_point .default_args @@ -367,6 +366,11 @@ impl InstantiateBlueprint { devfs_ruleset_id, default_router, main_started_notify, + create_only: request.create_only, + linux_no_create_sys_dir: request.linux_no_create_sys_dir, + linux_no_create_proc_dir: request.linux_no_create_proc_dir, + linux_no_mount_sys: request.linux_no_mount_sys, + linux_no_mount_proc: request.linux_no_mount_proc, }) } } diff --git a/xcd/src/image/push.rs b/xcd/src/image/push.rs index 0baf0ea..0de520c 100644 --- a/xcd/src/image/push.rs +++ b/xcd/src/image/push.rs @@ -55,7 +55,7 @@ impl FromId for PushImageStatus { #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct PushImageStatusDesc { pub layers: Vec, - pub current_upload: Option, + pub current_upload_idx: Option, pub push_config: bool, pub push_manifest: bool, pub done: bool, @@ -63,17 +63,19 @@ pub struct PushImageStatusDesc { pub bytes: Option, pub duration_secs: Option, + pub current_layer_size: Option, } #[derive(Clone, Default)] pub struct PushImageStatus { pub layers: Vec, /// the layer we are pushing, identify by the position of it in the layers stack - pub current_upload: Option, + pub current_upload_idx: Option, pub push_config: bool, pub push_manifest: bool, pub done: bool, pub fault: Option, + pub current_layer_size: Option, pub upload_status: Option>, } @@ -95,11 +97,12 @@ impl PushImageStatus { }; PushImageStatusDesc { layers: self.layers.clone(), - current_upload: self.current_upload, + current_upload_idx: self.current_upload_idx, push_config: self.push_config, push_manifest: self.push_manifest, done: self.done, fault: self.fault.clone(), + current_layer_size: self.current_layer_size, bytes, duration_secs, } @@ -112,6 +115,7 @@ pub async fn push_image( remote_reference: ImageReference, ) -> Result>, PushImageError> { let id = format!("{reference}->{remote_reference}"); + info!("push image: {id}"); let name = remote_reference.name; let tag = remote_reference.tag.to_string(); @@ -199,21 +203,24 @@ pub async fn push_image( let path = format!("{layers_dir}/{}", map.archive_digest); let path = std::path::Path::new(&path); let file = std::fs::OpenOptions::new().read(true).open(path)?; + let metadata = file.metadata().unwrap(); + let layer_size = metadata.len() as usize; // let dedup_check = Ok::(false); let dedup_check = session.exists_digest(&map.archive_digest).await; _ = emitter.use_try(|state| { - if state.current_upload.is_none() { - state.current_upload = Some(0); + if state.current_upload_idx.is_none() { + state.current_upload_idx = Some(0); } + state.current_layer_size = Some(layer_size); Ok(()) }); + let descriptor = if dedup_check.is_ok() && dedup_check.unwrap() { - let metadata = file.metadata().unwrap(); _ = emitter.use_try(|state| { - if state.current_upload.is_none() { - state.current_upload = Some(0); + if state.current_upload_idx.is_none() { + state.current_upload_idx = Some(0); } Ok(()) }); @@ -221,7 +228,7 @@ pub async fn push_image( Descriptor { digest: map.archive_digest.clone(), media_type: content_type.to_string(), - size: metadata.len() as usize, + size: layer_size, } } else { info!("pushing {path:?}"); @@ -238,7 +245,7 @@ pub async fn push_image( }; uploads.push(descriptor); _ = emitter.use_try(|state| { - state.current_upload = Some(state.current_upload.unwrap() + 1); + state.current_upload_idx = Some(state.current_upload_idx.unwrap() + 1); Ok(()) }); } diff --git a/xcd/src/ipc.rs b/xcd/src/ipc.rs index c3e47f3..79e0ecc 100644 --- a/xcd/src/ipc.rs +++ b/xcd/src/ipc.rs @@ -34,7 +34,7 @@ use ipc::service::{ConnectionContext, Service}; use ipc_macro::{ipc_method, FromPacket}; use oci_util::digest::OciDigest; use oci_util::distribution::client::{BasicAuth, Registry}; -use oci_util::image_reference::ImageReference; +use oci_util::image_reference::{ImageReference, ImageTag}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::io::Seek; @@ -208,6 +208,68 @@ pub struct InstantiateRequest { pub dns: DnsSetting, pub extra_layers: List, pub main_started_notify: Maybe, + pub create_only: bool, + + pub linux_no_create_sys_dir: bool, + pub linux_no_mount_sys: bool, + pub linux_no_create_proc_dir: bool, + pub linux_no_mount_proc: bool, +} + +impl InstantiateRequest { + pub fn dns(&mut self, dns: DnsSetting) { + self.dns = dns; + } + + pub fn add_mount_req(&mut self, mount_req: MountReq) { + self.mount_req.push(mount_req); + } + + pub fn add_copyin(&mut self, req: CopyFile) { + self.copies.push(req); + } + + pub fn add_extra_layer(&mut self, extra_layer_fd: i32) { + self.extra_layers.push(Fd(extra_layer_fd)); + } +} + +impl Default for InstantiateRequest { + fn default() -> InstantiateRequest { + let image_reference = ImageReference { + hostname: None, + name: "xc-predefine".to_string(), + tag: ImageTag::Tag("empty".to_string()), + }; + + InstantiateRequest { + image_reference, + alt_root: None, + envs: HashMap::new(), + vnet: false, + ips: Vec::new(), + ipreq: Vec::new(), + mount_req: Vec::new(), + copies: List::new(), + entry_point: String::new(), + entry_point_args: Vec::new(), + hostname: None, + main_norun: false, + init_norun: false, + deinit_norun: false, + persist: false, + no_clean: false, + name: None, + dns: DnsSetting::Nop, + extra_layers: List::new(), + main_started_notify: Maybe::None, + create_only: false, + linux_no_create_sys_dir: false, + linux_no_mount_sys: false, + linux_no_create_proc_dir: false, + linux_no_mount_proc: false, + } + } } #[derive(Serialize, Deserialize, Debug)] diff --git a/xcd/src/site.rs b/xcd/src/site.rs index b917a9c..ce0a527 100644 --- a/xcd/src/site.rs +++ b/xcd/src/site.rs @@ -30,7 +30,6 @@ use ipc::packet::Packet; use ipc::proto::{Request, Response}; use ipc::transport::PacketTransport; use oci_util::digest::OciDigest; -use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::ffi::OsString; use std::fs::File; @@ -45,7 +44,7 @@ use tokio::sync::watch::Receiver; use tracing::{error, info}; use xc::config::XcConfig; use xc::container::effect::UndoStack; -use xc::container::{Container, ContainerManifest}; +use xc::container::{ContainerManifest, CreateContainer}; use xc::models::exec::Jexec; use xc::models::jail_image::JailImage; use xc::models::network::HostEntry; @@ -377,7 +376,7 @@ impl Site { info!("no extra layers to extract"); } - let container = Container { + let container = CreateContainer { name: blueprint.name, hostname: blueprint.hostname, id: blueprint.id, @@ -395,24 +394,33 @@ impl Site { main_norun: blueprint.main_norun, persist: blueprint.persist, no_clean: blueprint.no_clean, - linux_no_create_sys_dir: false, - linux_no_create_proc_dir: false, + linux_no_create_sys_dir: blueprint.linux_no_create_sys_dir, + linux_no_mount_sys: blueprint.linux_no_mount_sys, + linux_no_create_proc_dir: blueprint.linux_no_create_proc_dir, + linux_no_mount_proc: blueprint.linux_no_mount_proc, zfs_origin, - dns: blueprint.dns, origin_image: blueprint.origin_image, allowing: blueprint.allowing, image_reference: blueprint.image_reference, - copies: blueprint.copies, default_router: blueprint.default_router, }; let running_container = container - .start_transactionally(&mut self.undo) + .create_transactionally(&mut self.undo) .context("fail to start container")?; + + _ = running_container.setup_resolv_conf(&blueprint.dns); + + for copy in blueprint.copies.into_iter() { + _ = running_container.copyin(©); + } + let container_notify = running_container.notify.clone(); let main_started_notify = running_container.main_started_notify.clone(); - let (kq, recv) = xc::container::runner::run(running_container, sock_b); + let (kq, recv) = + xc::container::runner::run(running_container, sock_b, !blueprint.create_only); + self.container = Some(recv); self.ctl_channel = Some(kq); self.container_notify = Some(container_notify);