This commit is contained in:
Yan Ka, Chiu
2023-07-21 21:41:25 -04:00
parent 0d9bf93a10
commit 72d21ea33d
12 changed files with 475 additions and 309 deletions

View File

@@ -112,6 +112,9 @@ impl<T: FromPacket> List<T> {
/// Consume the list and return the underlying `Vec`.
// NOTE(review): this consumes `self`, so Rust naming convention would call it
// `into_vec`; keeping the existing name for caller compatibility.
pub fn to_vec(self) -> Vec<T> {
self.0
}
/// Append an item to the end of the list; delegates to `Vec::push`.
pub fn push(&mut self, item: T) {
self.0.push(item)
}
}
impl<T: FromPacket> Default for List<T> {

View File

@@ -217,6 +217,41 @@ impl FromStr for PublishSpec {
}
}
const GB: usize = 1 << 30;
const MB: usize = 1 << 20;
const KB: usize = 1 << 10;
const GB_F64: f64 = GB as f64;
const MB_F64: f64 = MB as f64;
const KB_F64: f64 = KB as f64;

/// Format a byte count as a human-readable capacity string with two decimal
/// places, e.g. `"2.00 GB"`, `"512.00 B"`.
///
/// Uses binary units (1 KB = 1024 B). A value exactly on a unit boundary is
/// rendered in the next-smaller unit (`size > GB`, not `>=`), matching the
/// original behavior.
pub fn format_capacity(size: usize) -> String {
    let bytes = size as f64;
    if size > GB {
        format!("{:.2} GB", bytes / GB_F64)
    } else if size > MB {
        format!("{:.2} MB", bytes / MB_F64)
    } else if size > KB {
        format!("{:.2} KB", bytes / KB_F64)
    } else {
        format!("{:.2} B", bytes)
    }
}

/// Format a transfer rate as a human-readable string, e.g. `"4.00 gbps"`.
///
/// `size` is the number of bytes transferred over `secs` seconds.
///
/// Fixes relative to the previous version:
/// - `secs == 0` no longer divides by zero (which printed `"inf bps"` /
///   `"NaN"`); a zero-duration sample reports `"0.00 bps"`.
/// - The unit is chosen from the computed bit-rate rather than the raw byte
///   count, so a large payload over a long duration is no longer mislabeled
///   (e.g. 2 GiB over 1000 s used to print `"0.02 gbps"` instead of mbps).
pub fn format_bandwidth(size: usize, secs: u64) -> String {
    // Rate in bits per second; guard the zero-duration case explicitly.
    let rate = if secs == 0 {
        0.0
    } else {
        (size * 8) as f64 / secs as f64
    };
    if rate > GB_F64 {
        format!("{:.2} gbps", rate / GB_F64)
    } else if rate > MB_F64 {
        format!("{:.2} mbps", rate / MB_F64)
    } else if rate > KB_F64 {
        format!("{:.2} kbps", rate / KB_F64)
    } else {
        format!("{:.2} bps", rate)
    }
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -73,29 +73,24 @@ impl Directive for FromDirective {
}
}
let name = format!("build-{}", gen_id());
/* create container */
let req = InstantiateRequest {
alt_root: None,
name: Some(name.to_string()),
hostname: None,
copies: List::new(),
dns: context.dns.clone(),
image_reference: self.image_reference.clone(),
no_clean: false,
main_norun: true,
init_norun: true,
deinit_norun: true,
persist: true,
ips: Vec::new(),
main_started_notify: Maybe::None,
entry_point: "main".to_string(),
entry_point_args: Vec::new(),
envs: HashMap::new(),
vnet: false,
mount_req: Vec::new(),
extra_layers: List::new(),
ipreq: context.network.clone(),
..InstantiateRequest::default()
};
eprintln!("before instantiate");
match do_instantiate(&mut context.conn, req)? {
Ok(response) => {

View File

@@ -32,7 +32,7 @@ mod redirect;
use crate::channel::{use_channel_action, ChannelAction};
use crate::error::ActionError;
use crate::format::{BindMount, EnvPair, IpWant, PublishSpec};
use crate::format::{format_bandwidth, format_capacity, BindMount, EnvPair, IpWant, PublishSpec};
use crate::image::{use_image_action, ImageAction};
use crate::jailfile::directives::volume::VolumeDirective;
use crate::network::{use_network_action, NetworkAction};
@@ -510,84 +510,70 @@ fn main() -> Result<(), ActionError> {
image_reference: image_reference.clone(),
remote_reference: new_image_reference.clone(),
};
if let Ok(_res) = do_push_image(&mut conn, req)? {
let mut lines_count = 0;
loop {
std::thread::sleep(std::time::Duration::from_millis(500));
if lines_count > 0 {
eprint!("{}\x1B[0J", "\x1B[F".repeat(lines_count));
}
match do_push_image(&mut conn, req)? {
Ok(_) => {
let mut lines_count = 0;
loop {
std::thread::sleep(std::time::Duration::from_millis(500));
if lines_count > 0 {
eprint!("{}\x1B[0J", "\x1B[F".repeat(lines_count));
}
let reqt = UploadStat {
image_reference: image_reference.clone(),
remote_reference: new_image_reference.clone(),
};
let res = do_upload_stat(&mut conn, reqt)?.unwrap();
if let Some(error) = res.fault {
eprintln!("{error}");
return Ok(());
} else if res.layers.is_empty() {
lines_count = 1;
eprintln!("initializing");
} else if res.done {
eprintln!("Completed");
return Ok(());
} else {
lines_count = res.layers.len() + 2;
let x = res.current_upload.unwrap_or(0);
for (i, digest) in res.layers.iter().enumerate() {
match i.cmp(&x) {
Ordering::Less => eprintln!("{digest} ... done"),
Ordering::Equal => {
let speed = res.duration_secs.and_then(|secs| {
res.bytes.map(|bytes| (bytes * 8) as f64 / secs as f64)
});
let uploaded = res
.bytes
.map(|bytes| {
let bytes = bytes as f64;
if bytes > 1000000000.0 {
format!("{:.2} GB", bytes / 1000000000.0)
} else if bytes > 1000000.0 {
format!("{:.2} MB", bytes / 1000000.0)
} else if bytes > 1000.0 {
format!("{:.2} KB", bytes / 1000.0)
} else {
format!("{:.2} B", bytes)
}
})
.unwrap_or_else(|| "".to_string());
let label = match speed {
None => "".to_string(),
Some(speed) => {
if speed > 1000000000.0 {
format!("{:.2} gbps", speed / 1000000000.0)
} else if speed > 1000000.0 {
format!("{:.2} mbps", speed / 1000000.0)
} else if speed > 1000.0 {
format!("{:.2} kbps", speed / 1000.0)
} else {
format!("{:.2} bps", speed)
}
}
};
eprintln!("{digest} ... uploading {uploaded} @ {label}");
}
Ordering::Greater => eprintln!("{digest}"),
};
}
if res.push_config {
eprintln!("Image config ... done");
let reqt = UploadStat {
image_reference: image_reference.clone(),
remote_reference: new_image_reference.clone(),
};
let res = do_upload_stat(&mut conn, reqt)?.unwrap();
if let Some(error) = res.fault {
eprintln!("{error}");
return Ok(());
} else if res.layers.is_empty() {
lines_count = 1;
eprintln!("initializing");
} else if res.done {
eprintln!("Completed");
return Ok(());
} else {
eprintln!("Image config");
}
if res.push_manifest {
eprintln!("Image manifest ... done")
} else {
eprintln!("Image manifest")
lines_count = res.layers.len() + 2;
let x = res.current_upload_idx.unwrap_or(0);
for (i, digest) in res.layers.iter().enumerate() {
match i.cmp(&x) {
Ordering::Less => eprintln!("{digest} ... done"),
Ordering::Equal => {
let uploaded =
res.bytes.map(format_capacity).unwrap_or_default();
let bandwidth = res
.bytes
.and_then(|bytes| {
res.duration_secs
.map(|sec| format_bandwidth(bytes, sec))
})
.unwrap_or_default();
let total = res
.current_layer_size
.map(format_capacity)
.unwrap_or_default();
eprintln!("{digest} ... uploading {uploaded}/{total} @ {bandwidth}");
}
Ordering::Greater => eprintln!("{digest}"),
};
}
if res.push_config {
eprintln!("Image config ... done");
} else {
eprintln!("Image config");
}
if res.push_manifest {
eprintln!("Image manifest ... done")
} else {
eprintln!("Image manifest")
}
}
}
}
Err(err) => {
eprintln!("cannot push image: {err:#?}")
}
}
}
Action::Rdr(rdr) => {
@@ -698,7 +684,6 @@ fn main() -> Result<(), ActionError> {
Maybe::Some(Fd(fd))
};
let mut reqt = InstantiateRequest {
alt_root: None,
name,
hostname,
copies,
@@ -710,14 +695,12 @@ fn main() -> Result<(), ActionError> {
entry_point_args,
extra_layers,
no_clean,
main_norun: false,
init_norun: false,
deinit_norun: false,
persist,
dns,
image_reference,
ips: ips.into_iter().map(|v| v.0).collect(),
main_started_notify: main_started_notify.clone(),
..InstantiateRequest::default()
};
if create_only {

View File

@@ -33,7 +33,7 @@ use self::process::ProcessStat;
use crate::container::running::RunningContainer;
use crate::models::exec::Jexec;
use crate::models::jail_image::JailImage;
use crate::models::network::{DnsSetting, IpAssign};
use crate::models::network::IpAssign;
use crate::util::realpath;
use anyhow::Context;
@@ -44,19 +44,17 @@ use ipcidr::IpCidr;
use jail::param::Value;
use jail::StoppedJail;
use oci_util::image_reference::ImageReference;
use request::{CopyFileReq, Mount};
use request::Mount;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::ffi::OsString;
use std::net::IpAddr;
use std::os::fd::{AsRawFd, FromRawFd};
use std::path::PathBuf;
use std::sync::Arc;
use tracing::info;
/// Represents an instance of container
#[derive(Debug, Clone)]
pub struct Container {
pub struct CreateContainer {
pub id: String,
pub name: String,
pub hostname: String,
@@ -75,54 +73,23 @@ pub struct Container {
pub deinit_norun: bool,
pub persist: bool,
pub no_clean: bool,
/// Do not create /proc automatically and abort mounting procfs if the directory is missing.
pub linux_no_create_proc_dir: bool,
/// Do not create /sys automatically and abort mounting sysfs if the directory is missing.
pub linux_no_create_sys_dir: bool,
/// Do not mount linux sysfs
pub linux_no_mount_sys: bool,
/// Do not mount linux procfs
pub linux_no_mount_proc: bool,
pub zfs_origin: Option<String>,
pub dns: DnsSetting,
pub origin_image: Option<JailImage>,
pub allowing: Vec<String>,
pub image_reference: Option<ImageReference>,
pub copies: Vec<CopyFileReq>,
pub default_router: Option<IpAddr>,
}
impl Container {
fn setup_resolv_conf(&self) -> anyhow::Result<()> {
let resolv_conf_path = realpath(&self.root, "/etc/resolv.conf")
.with_context(|| format!("failed finding /etc/resolv.conf in jail {}", self.id))?;
match &self.dns {
DnsSetting::Nop => {}
DnsSetting::Inherit => {
std::fs::copy("/etc/resolv.conf", resolv_conf_path).with_context(|| {
format!(
"failed copying resolv.conf to destination container: {}",
self.id
)
})?;
}
DnsSetting::Specified {
servers,
search_domains,
} => {
let servers = servers
.iter()
.map(|host| format!("nameserver {host}"))
.collect::<Vec<_>>()
.join("\n");
let domains = search_domains
.iter()
.map(|host| format!("domain {host}"))
.collect::<Vec<_>>()
.join("\n");
let resolv_conf = format!("{domains}\n{servers}\n");
std::fs::write(resolv_conf_path, resolv_conf)?;
}
}
Ok(())
}
pub fn start_transactionally(&self, undo: &mut UndoStack) -> anyhow::Result<RunningContainer> {
impl CreateContainer {
pub fn create_transactionally(&self, undo: &mut UndoStack) -> anyhow::Result<RunningContainer> {
info!(name = self.name, "starting jail");
let root = &self.root;
@@ -131,8 +98,6 @@ impl Container {
undo.pf_create_anchor(anchor)
.with_context(|| format!("failed to create pf anchor for container {}", self.id))?;
self.setup_resolv_conf()?;
let devfs_ruleset = self.devfs_ruleset_id;
let mut proto = StoppedJail::new(root).name(&self.name).param(
@@ -161,33 +126,6 @@ impl Container {
)?;
}
for copy in self.copies.iter() {
let dest = realpath(root, &copy.destination)?;
let in_fd = copy.source;
let file = unsafe { std::fs::File::from_raw_fd(copy.source) };
let metadata = file.metadata().unwrap();
let sink = std::fs::OpenOptions::new()
.write(true)
.create(true)
.open(dest)
.unwrap();
let sfd = sink.as_raw_fd();
let size = unsafe {
nix::libc::copy_file_range(
in_fd,
std::ptr::null_mut(),
sfd,
std::ptr::null_mut(),
metadata.len() as usize,
0,
)
};
eprintln!("copied: {size}");
}
let mut ifaces_to_move = HashMap::new();
if !self.vnet {
@@ -275,7 +213,7 @@ impl Container {
let proc_path = format!("{root}/proc");
let sys_path = format!("{root}/sys");
{
if !self.linux_no_mount_proc {
let path = std::path::Path::new(&proc_path);
let path_existed = path.exists();
@@ -291,7 +229,8 @@ impl Container {
)?;
}
}
{
if !self.linux_no_mount_sys {
let path = std::path::Path::new(&sys_path);
let path_existed = path.exists();
@@ -310,9 +249,10 @@ impl Container {
}
let jail = proto.start()?;
let dmillis = std::time::Duration::from_millis(10);
if self.vnet {
let dmillis = std::time::Duration::from_millis(10);
for (iface, addresses) in ifaces_to_move.iter() {
undo.move_if(iface.to_owned(), jail.jid)?;
std::thread::sleep(dmillis);

View File

@@ -0,0 +1,142 @@
// Copyright (c) 2023 Yan Ka, Chiu.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions, and the following disclaimer,
// without modification, immediately at the beginning of the file.
// 2. The name of the author may not be used to endorse or promote products
// derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
// OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
// OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
// SUCH DAMAGE.
use ipc::packet::codec::json::JsonPacket;
use ipc::packet::Packet;
use ipc::proto::Request;
use std::io::Read;
use std::os::fd::{AsRawFd, RawFd};
use std::os::unix::net::UnixStream;
/// A packet in the process of being read from a control socket.
///
/// `buffer` is pre-sized to the payload length announced in the 16-byte
/// packet header; `read_len` tracks how many payload bytes have arrived so
/// far, and `fds` holds file descriptors received alongside the payload.
#[derive(Debug)]
struct ReadingPacket {
    buffer: Vec<u8>,
    read_len: usize,
    expected_len: usize,
    fds: Vec<RawFd>,
}

impl ReadingPacket {
    /// True once the entire announced payload has been received.
    fn ready(&self) -> bool {
        self.read_len == self.expected_len
    }

    /// Pull up to `known_avail` more payload bytes from `socket` into the
    /// buffer; no-op if the packet is already complete.
    fn read(&mut self, socket: &mut UnixStream, known_avail: usize) -> Result<(), std::io::Error> {
        if !self.ready() {
            let len = socket.read(&mut self.buffer[self.read_len..][..known_avail])?;
            self.read_len += len;
        }
        Ok(())
    }

    /// Read the 16-byte header (payload length, fd count — both big-endian
    /// u64) and begin receiving the packet body.
    fn new(socket: &mut UnixStream) -> Result<ReadingPacket, anyhow::Error> {
        let mut header_bytes = [0u8; 16];
        // read_exact: the previous `_ = socket.read(..)` ignored short reads,
        // which would leave garbage in the length fields below.
        socket.read_exact(&mut header_bytes)?;
        let expected_len = u64::from_be_bytes(header_bytes[0..8].try_into().unwrap()) as usize;
        let fds_count = u64::from_be_bytes(header_bytes[8..].try_into().unwrap()) as usize;
        // Sanity cap so a misbehaving peer cannot make us receive an
        // unbounded number of descriptors; report an error instead of the
        // previous message-less panic!("").
        if fds_count > 64 {
            anyhow::bail!("packet announces {fds_count} file descriptors; limit is 64");
        }
        let mut buffer = vec![0u8; expected_len];
        let mut fds = Vec::new();
        let read_len =
            ipc::transport::recv_packet_once(socket.as_raw_fd(), fds_count, &mut buffer, &mut fds)?;
        Ok(Self {
            buffer,
            read_len,
            expected_len,
            fds,
        })
    }
}
/// Outcome of a non-blocking read attempt: either more bytes are still
/// needed, or a complete value of `T` is available.
pub(crate) enum Readiness<T> {
Pending,
Ready(T),
}
// XXX: naively pretend writes always successful
/// A control-channel connection over a Unix socket, with at most one packet
/// (`processing`) being assembled at a time.
#[derive(Debug)]
pub(crate) struct ControlStream {
socket: UnixStream,
processing: Option<ReadingPacket>,
}
impl ControlStream {
    /// Wrap an accepted Unix socket with no packet in flight.
    pub(crate) fn new(socket: UnixStream) -> ControlStream {
        ControlStream {
            socket,
            processing: None,
        }
    }

    /// Raw fd of the underlying socket (used for kqueue registration).
    pub(crate) fn socket_fd(&self) -> RawFd {
        self.socket.as_raw_fd()
    }

    /// Feed `known_avail` bytes into the packet assembler and, once a packet
    /// completes, decode it as a JSON `Request`, returning the method name
    /// and its payload.
    pub(crate) fn try_get_request(
        &mut self,
        known_avail: usize,
    ) -> Result<Readiness<(String, JsonPacket)>, anyhow::Error> {
        self.pour_in_bytes(known_avail)
            .and_then(|readiness| match readiness {
                Readiness::Pending => Ok(Readiness::Pending),
                Readiness::Ready(packet) => {
                    let request: ipc::packet::TypedPacket<Request> =
                        packet.map_failable(|vec| serde_json::from_slice(vec))?;
                    let method = request.data.method.to_string();
                    let packet = request.map(|req| req.value.clone());
                    Ok(Readiness::Ready((method, packet)))
                }
            })
    }

    /// Advance the in-flight packet by up to `known_avail` bytes, starting a
    /// new packet if none is in progress. Returns `Ready` with the completed
    /// packet (clearing the in-flight slot) or `Pending` otherwise.
    pub(crate) fn pour_in_bytes(
        &mut self,
        known_avail: usize,
    ) -> Result<Readiness<Packet>, anyhow::Error> {
        match &mut self.processing {
            Some(reading_packet) => {
                if reading_packet.ready() {
                    // A completed packet must be taken by the caller before
                    // more bytes arrive; error out instead of panicking so a
                    // misbehaving client cannot bring down the runner thread.
                    anyhow::bail!("the client is sending more bytes than expected");
                }
                // Propagate socket errors; previously this unwrapped and
                // panicked even though the fn already returns Result.
                reading_packet.read(&mut self.socket, known_avail)?;
            }
            None => {
                self.processing = Some(ReadingPacket::new(&mut self.socket)?);
            }
        }
        // Both branches above guarantee `processing` is Some at this point.
        let processing = self.processing.take().expect("packet in progress");
        if processing.ready() {
            Ok(Readiness::Ready(Packet {
                data: processing.buffer,
                fds: processing.fds,
            }))
        } else {
            self.processing = Some(processing);
            Ok(Readiness::Pending)
        }
    }
}

View File

@@ -22,6 +22,10 @@
// OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
// SUCH DAMAGE.
mod control_stream;
use self::control_stream::{ControlStream, Readiness};
use crate::container::error::ExecError;
use crate::container::process::*;
use crate::container::running::RunningContainer;
@@ -35,125 +39,16 @@ use anyhow::Context;
use freebsd::event::{EventFdNotify, KEventExt};
use freebsd::FreeBSDCommandExt;
use ipc::packet::codec::json::JsonPacket;
use ipc::packet::Packet;
use ipc::proto::Request;
use jail::process::Jailed;
use nix::libc::intptr_t;
use nix::sys::event::{kevent_ts, EventFilter, EventFlag, FilterFlag, KEvent};
use std::collections::{HashMap, VecDeque};
use std::io::Read;
use std::os::fd::{AsRawFd, RawFd};
use std::os::unix::net::UnixStream;
use std::path::{Component, Path, PathBuf};
use std::sync::Arc;
use tokio::sync::watch::{channel, Receiver, Sender};
use tracing::{debug, error, info, trace, warn};
// NOTE(review): this whole section appears to be the pre-extraction copy of
// the `control_stream` module (the commit moves it to its own file); logic
// here matches that module line-for-line aside from visibility and the added
// `socket_fd` accessor.
/// A packet in the process of being read from a control socket; `buffer` is
/// pre-sized to the length announced in the packet header.
#[derive(Debug)]
struct ReadingPacket {
buffer: Vec<u8>,
read_len: usize,
expected_len: usize,
fds: Vec<RawFd>,
}
impl ReadingPacket {
// True once the entire announced payload has been received.
fn ready(&self) -> bool {
self.read_len == self.expected_len
}
// Pull up to `known_avail` more payload bytes from the socket.
fn read(&mut self, socket: &mut UnixStream, known_avail: usize) -> Result<(), std::io::Error> {
if !self.ready() {
let len = socket.read(&mut self.buffer[self.read_len..][..known_avail])?;
self.read_len += len;
}
Ok(())
}
// Read the 16-byte header (payload length, fd count; big-endian u64 each)
// and begin receiving the packet body.
fn new(socket: &mut UnixStream) -> Result<ReadingPacket, anyhow::Error> {
let mut header_bytes = [0u8; 16];
// NOTE(review): the read result is discarded — a short read would leave
// garbage in the length fields below; read_exact would be safer.
_ = socket.read(&mut header_bytes)?;
let expected_len = u64::from_be_bytes(header_bytes[0..8].try_into().unwrap()) as usize;
let fds_count = u64::from_be_bytes(header_bytes[8..].try_into().unwrap()) as usize;
if fds_count > 64 {
// NOTE(review): message-less panic; an Err would suit the Result return.
panic!("")
}
let mut buffer = vec![0u8; expected_len];
let mut fds = Vec::new();
let read_len =
ipc::transport::recv_packet_once(socket.as_raw_fd(), fds_count, &mut buffer, &mut fds)?;
Ok(Self {
buffer,
read_len,
expected_len,
fds,
})
}
}
/// Outcome of a non-blocking read attempt: still waiting, or a complete `T`.
enum Readiness<T> {
Pending,
Ready(T),
}
// XXX: naively pretend writes always successful
/// A control-channel connection with at most one packet being assembled.
#[derive(Debug)]
pub struct ControlStream {
socket: UnixStream,
processing: Option<ReadingPacket>,
}
impl ControlStream {
// Wrap an accepted Unix socket with no packet in flight.
pub fn new(socket: UnixStream) -> ControlStream {
ControlStream {
socket,
processing: None,
}
}
// Feed bytes into the assembler and, once a packet completes, decode it as a
// JSON `Request`, returning the method name and payload packet.
fn try_get_request(
&mut self,
known_avail: usize,
) -> Result<Readiness<(String, JsonPacket)>, anyhow::Error> {
self.pour_in_bytes(known_avail)
.and_then(|readiness| match readiness {
Readiness::Pending => Ok(Readiness::Pending),
Readiness::Ready(packet) => {
let request: ipc::packet::TypedPacket<Request> =
packet.map_failable(|vec| serde_json::from_slice(vec))?;
let method = request.data.method.to_string();
let packet = request.map(|req| req.value.clone());
Ok(Readiness::Ready((method, packet)))
}
})
}
// Advance the in-flight packet (or start a new one); Ready clears the slot.
fn pour_in_bytes(&mut self, known_avail: usize) -> Result<Readiness<Packet>, anyhow::Error> {
if let Some(reading_packet) = &mut self.processing {
if reading_packet.ready() {
panic!("the client is sending more bytes than expected");
}
// NOTE(review): unwrap panics on socket errors although this fn returns
// Result; `?` would propagate them instead.
reading_packet.read(&mut self.socket, known_avail).unwrap();
} else {
let reading_packet = ReadingPacket::new(&mut self.socket).unwrap();
self.processing = Some(reading_packet);
}
// Both branches above leave `processing` as Some, so the panic is unreachable.
let Some(processing) = self.processing.take() else { panic!() };
if processing.ready() {
Ok(Readiness::Ready(Packet {
data: processing.buffer,
fds: processing.fds,
}))
} else {
self.processing = Some(processing);
Ok(Readiness::Pending)
}
}
}
#[derive(Debug)]
pub struct ProcessRunnerStat {
pub(super) id: String,
@@ -195,6 +90,14 @@ pub struct ProcessRunner {
pub(super) control_streams: HashMap<i32, ControlStream>,
/// This field records the epoch seconds when the container is "started", which defined by a
/// container that has completed its init-routine
pub(super) started: Option<u64>,
/// If `auto_start` is true, the container executes its init routine automatically after
/// creation
pub(super) auto_start: bool,
container: RunningContainer,
main_started: bool,
@@ -276,9 +179,9 @@ impl SerialExec {
}
impl ProcessRunner {
pub fn add_control_stream(&mut self, control_stream: ControlStream) {
pub(crate) fn add_control_stream(&mut self, control_stream: ControlStream) {
debug!("adding control stream");
let fd = control_stream.socket.as_raw_fd();
let fd = control_stream.socket_fd();
self.control_streams.insert(fd, control_stream);
let read_event = KEvent::from_read(fd);
_ = kevent_ts(self.kq, &[read_event], &mut [], None);
@@ -447,9 +350,9 @@ impl ProcessRunner {
_ = kevent_ts(self.kq, &[event], &mut [], None);
}
pub fn new(kq: i32, container: RunningContainer) -> ProcessRunner {
pub fn new(kq: i32, container: RunningContainer, auto_start: bool) -> ProcessRunner {
ProcessRunner {
kq, //: kqueue().unwrap(),
kq,
named_process: Vec::new(),
pmap: HashMap::new(),
rpmap: HashMap::new(),
@@ -460,6 +363,8 @@ impl ProcessRunner {
deinits: SerialExec::new("deinit", container.deinit_proto.clone(), false),
main_exited: false,
container,
started: None,
auto_start,
}
}
@@ -502,6 +407,7 @@ impl ProcessRunner {
let packet = write_response(0, ()).unwrap();
_ = fd.send_packet(&packet).unwrap()
}
} else if method == "start" {
} else if method == "write_hosts" {
let recv: Vec<HostEntry> = serde_json::from_value(request.data).unwrap();
if let Ok(host_path) = crate::util::realpath(&self.container.root, "/etc/hosts") {
@@ -523,6 +429,21 @@ impl ProcessRunner {
}
}
/// Begin the container's init routine and record the start time.
///
/// Stamps `started` with the current epoch seconds, then either runs the
/// main process directly (no init steps queued and main is allowed to run)
/// or spawns the first queued init process.
fn start(&mut self) {
self.started = Some(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
// Safe unless the system clock is before the Unix epoch.
.unwrap()
.as_secs(),
);
if self.inits.is_empty() && !self.container.main_norun {
self.run_main();
} else if let Some((id, jexec)) = self.inits.pop_front() {
self.inits.activate();
// Spawn result intentionally ignored; failures surface via kqueue events.
_ = self.spawn_process(&id, &jexec, None);
}
}
fn handle_pid_event(&mut self, event: KEvent, last_deinit: &mut Option<String>) -> bool {
let fflag = event.fflags();
let pid = event.ident() as u32;
@@ -624,11 +545,8 @@ impl ProcessRunner {
let mut last_deinit = None;
if self.inits.is_empty() && !self.container.main_norun {
self.run_main();
} else if let Some((id, jexec)) = self.inits.pop_front() {
self.inits.activate();
_ = self.spawn_process(&id, &jexec, None);
if self.auto_start {
self.start();
}
'kq: loop {
@@ -727,10 +645,11 @@ impl ProcessRunner {
pub fn run(
container: RunningContainer,
control_stream: UnixStream,
auto_start: bool,
) -> (i32, Receiver<ContainerManifest>) {
let kq = nix::sys::event::kqueue().unwrap();
let (tx, rx) = channel(container.serialized());
let mut pr = ProcessRunner::new(kq, container);
let mut pr = ProcessRunner::new(kq, container, auto_start);
pr.add_control_stream(ControlStream::new(control_stream));
let kq = pr.kq;
std::thread::spawn(move || {

View File

@@ -27,15 +27,20 @@ use crate::container::request::Mount;
use crate::container::ContainerManifest;
use crate::models::exec::Jexec;
use crate::models::jail_image::JailImage;
use crate::models::network::IpAssign;
use crate::models::network::{DnsSetting, IpAssign};
use crate::util::realpath;
use anyhow::Context;
use freebsd::event::EventFdNotify;
use oci_util::image_reference::ImageReference;
use std::collections::HashMap;
use std::net::IpAddr;
use std::os::fd::{AsRawFd, FromRawFd};
use std::sync::Arc;
use tokio::sync::watch::Receiver;
use super::request::CopyFileReq;
#[derive(Clone, Debug)]
pub struct RunningContainer {
pub devfs_ruleset_id: u16,
@@ -76,6 +81,69 @@ pub struct RunningContainer {
}
impl RunningContainer {
/// Copy a file into the container using `copy_file_range(2)`: the source is
/// an already-open fd from the request, the destination path is resolved
/// inside the container root.
pub fn copyin(&self, req: &CopyFileReq) -> anyhow::Result<()> {
let dest = realpath(&self.root, &req.destination)?;
let in_fd = req.source;
// NOTE(review): File::from_raw_fd takes ownership of `in_fd` and will close
// it when `file` drops, yet `in_fd` is still used below — confirm the
// caller does not expect the fd to stay open (double-close risk).
let file = unsafe { std::fs::File::from_raw_fd(in_fd) };
// NOTE(review): unwraps on metadata/open will panic on I/O errors even
// though this fn returns anyhow::Result.
let metadata = file.metadata().unwrap();
let sink = std::fs::OpenOptions::new()
.write(true)
.create(true)
.open(dest)
.unwrap();
let sfd = sink.as_raw_fd();
// SAFETY-relevant: raw syscall; both fds are valid for the duration of the
// call. NOTE(review): copy_file_range may copy fewer bytes than requested
// and may return -1; the result is discarded, so partial copies and errors
// go unnoticed — a loop with error checking would be more robust.
let _size = unsafe {
nix::libc::copy_file_range(
in_fd,
std::ptr::null_mut(),
sfd,
std::ptr::null_mut(),
metadata.len() as usize,
0,
)
};
Ok(())
// eprintln!("copied: {size}");
}
/// Populate `/etc/resolv.conf` inside the container according to `dns`.
///
/// `DnsSetting::Nop` leaves the file untouched, `Inherit` copies the host's
/// resolv.conf into the container, and `Specified` renders the given
/// nameservers and search domains.
pub fn setup_resolv_conf(&self, dns: &DnsSetting) -> anyhow::Result<()> {
    let resolv_conf_path = realpath(&self.root, "/etc/resolv.conf")
        .with_context(|| format!("failed finding /etc/resolv.conf in jail {}", self.id))?;
    match dns {
        DnsSetting::Nop => Ok(()),
        DnsSetting::Inherit => {
            std::fs::copy("/etc/resolv.conf", resolv_conf_path).with_context(|| {
                format!(
                    "failed copying resolv.conf to destination container: {}",
                    self.id
                )
            })?;
            Ok(())
        }
        DnsSetting::Specified {
            servers,
            search_domains,
        } => {
            // One "domain" line per search domain, then one "nameserver" line
            // per server, each group newline-joined.
            let mut domain_lines = Vec::with_capacity(search_domains.len());
            for host in search_domains.iter() {
                domain_lines.push(format!("domain {host}"));
            }
            let mut server_lines = Vec::with_capacity(servers.len());
            for host in servers.iter() {
                server_lines.push(format!("nameserver {host}"));
            }
            let domains = domain_lines.join("\n");
            let servers = server_lines.join("\n");
            let resolv_conf = format!("{domains}\n{servers}\n");
            std::fs::write(resolv_conf_path, resolv_conf)?;
            Ok(())
        }
    }
}
pub fn serialized(&self) -> ContainerManifest {
let mut processes = HashMap::new();

View File

@@ -68,12 +68,15 @@ pub struct InstantiateBlueprint {
pub envs: HashMap<String, String>,
pub entry_point: String,
pub entry_point_args: Vec<String>,
pub devfs_ruleset_id: u16,
pub ip_alloc: Vec<IpAssign>,
pub default_router: Option<IpAddr>,
pub main_started_notify: Option<EventFdNotify>,
pub create_only: bool,
pub linux_no_create_sys_dir: bool,
pub linux_no_create_proc_dir: bool,
pub linux_no_mount_sys: bool,
pub linux_no_mount_proc: bool,
}
impl InstantiateBlueprint {
@@ -130,11 +133,7 @@ impl InstantiateBlueprint {
required_envs: Vec::new(),
}
};
/*
let Some(entry_point) = config.entry_points.get(&request.entry_point) else {
precondition_failure!(ENOENT, "requested entry point not found: {}", request.entry_point);
};
*/
let entry_point_args = if request.entry_point_args.is_empty() {
entry_point
.default_args
@@ -367,6 +366,11 @@ impl InstantiateBlueprint {
devfs_ruleset_id,
default_router,
main_started_notify,
create_only: request.create_only,
linux_no_create_sys_dir: request.linux_no_create_sys_dir,
linux_no_create_proc_dir: request.linux_no_create_proc_dir,
linux_no_mount_sys: request.linux_no_mount_sys,
linux_no_mount_proc: request.linux_no_mount_proc,
})
}
}

View File

@@ -55,7 +55,7 @@ impl FromId<SharedContext, String> for PushImageStatus {
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PushImageStatusDesc {
pub layers: Vec<OciDigest>,
pub current_upload: Option<usize>,
pub current_upload_idx: Option<usize>,
pub push_config: bool,
pub push_manifest: bool,
pub done: bool,
@@ -63,17 +63,19 @@ pub struct PushImageStatusDesc {
pub bytes: Option<usize>,
pub duration_secs: Option<u64>,
pub current_layer_size: Option<usize>,
}
#[derive(Clone, Default)]
pub struct PushImageStatus {
pub layers: Vec<OciDigest>,
/// the layer we are pushing, identify by the position of it in the layers stack
pub current_upload: Option<usize>,
pub current_upload_idx: Option<usize>,
pub push_config: bool,
pub push_manifest: bool,
pub done: bool,
pub fault: Option<String>,
pub current_layer_size: Option<usize>,
pub upload_status: Option<Receiver<UploadStat>>,
}
@@ -95,11 +97,12 @@ impl PushImageStatus {
};
PushImageStatusDesc {
layers: self.layers.clone(),
current_upload: self.current_upload,
current_upload_idx: self.current_upload_idx,
push_config: self.push_config,
push_manifest: self.push_manifest,
done: self.done,
fault: self.fault.clone(),
current_layer_size: self.current_layer_size,
bytes,
duration_secs,
}
@@ -112,6 +115,7 @@ pub async fn push_image(
remote_reference: ImageReference,
) -> Result<Receiver<Task<String, PushImageStatus>>, PushImageError> {
let id = format!("{reference}->{remote_reference}");
info!("push image: {id}");
let name = remote_reference.name;
let tag = remote_reference.tag.to_string();
@@ -199,21 +203,24 @@ pub async fn push_image(
let path = format!("{layers_dir}/{}", map.archive_digest);
let path = std::path::Path::new(&path);
let file = std::fs::OpenOptions::new().read(true).open(path)?;
let metadata = file.metadata().unwrap();
let layer_size = metadata.len() as usize;
// let dedup_check = Ok::<bool, std::io::Error>(false);
let dedup_check = session.exists_digest(&map.archive_digest).await;
_ = emitter.use_try(|state| {
if state.current_upload.is_none() {
state.current_upload = Some(0);
if state.current_upload_idx.is_none() {
state.current_upload_idx = Some(0);
}
state.current_layer_size = Some(layer_size);
Ok(())
});
let descriptor = if dedup_check.is_ok() && dedup_check.unwrap() {
let metadata = file.metadata().unwrap();
_ = emitter.use_try(|state| {
if state.current_upload.is_none() {
state.current_upload = Some(0);
if state.current_upload_idx.is_none() {
state.current_upload_idx = Some(0);
}
Ok(())
});
@@ -221,7 +228,7 @@ pub async fn push_image(
Descriptor {
digest: map.archive_digest.clone(),
media_type: content_type.to_string(),
size: metadata.len() as usize,
size: layer_size,
}
} else {
info!("pushing {path:?}");
@@ -238,7 +245,7 @@ pub async fn push_image(
};
uploads.push(descriptor);
_ = emitter.use_try(|state| {
state.current_upload = Some(state.current_upload.unwrap() + 1);
state.current_upload_idx = Some(state.current_upload_idx.unwrap() + 1);
Ok(())
});
}

View File

@@ -34,7 +34,7 @@ use ipc::service::{ConnectionContext, Service};
use ipc_macro::{ipc_method, FromPacket};
use oci_util::digest::OciDigest;
use oci_util::distribution::client::{BasicAuth, Registry};
use oci_util::image_reference::ImageReference;
use oci_util::image_reference::{ImageReference, ImageTag};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::Seek;
@@ -208,6 +208,68 @@ pub struct InstantiateRequest {
pub dns: DnsSetting,
pub extra_layers: List<Fd>,
pub main_started_notify: Maybe<Fd>,
pub create_only: bool,
pub linux_no_create_sys_dir: bool,
pub linux_no_mount_sys: bool,
pub linux_no_create_proc_dir: bool,
pub linux_no_mount_proc: bool,
}
impl InstantiateRequest {
/// Set the DNS configuration for the container to be created.
pub fn dns(&mut self, dns: DnsSetting) {
self.dns = dns;
}
/// Queue an additional mount to perform at instantiation time.
pub fn add_mount_req(&mut self, mount_req: MountReq) {
self.mount_req.push(mount_req);
}
/// Queue a file to be copied into the container at instantiation time.
pub fn add_copyin(&mut self, req: CopyFile) {
self.copies.push(req);
}
/// Add an extra image layer by raw file descriptor.
pub fn add_extra_layer(&mut self, extra_layer_fd: i32) {
self.extra_layers.push(Fd(extra_layer_fd));
}
}
impl Default for InstantiateRequest {
/// A request with every option off/empty, targeting the predefined
/// placeholder image `xc-predefine:empty`.
fn default() -> InstantiateRequest {
// Placeholder reference used when no image is specified by the caller.
let image_reference = ImageReference {
hostname: None,
name: "xc-predefine".to_string(),
tag: ImageTag::Tag("empty".to_string()),
};
InstantiateRequest {
image_reference,
alt_root: None,
envs: HashMap::new(),
vnet: false,
ips: Vec::new(),
ipreq: Vec::new(),
mount_req: Vec::new(),
copies: List::new(),
entry_point: String::new(),
entry_point_args: Vec::new(),
hostname: None,
main_norun: false,
init_norun: false,
deinit_norun: false,
persist: false,
no_clean: false,
name: None,
dns: DnsSetting::Nop,
extra_layers: List::new(),
main_started_notify: Maybe::None,
create_only: false,
linux_no_create_sys_dir: false,
linux_no_mount_sys: false,
linux_no_create_proc_dir: false,
linux_no_mount_proc: false,
}
}
}
#[derive(Serialize, Deserialize, Debug)]

View File

@@ -30,7 +30,6 @@ use ipc::packet::Packet;
use ipc::proto::{Request, Response};
use ipc::transport::PacketTransport;
use oci_util::digest::OciDigest;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::ffi::OsString;
use std::fs::File;
@@ -45,7 +44,7 @@ use tokio::sync::watch::Receiver;
use tracing::{error, info};
use xc::config::XcConfig;
use xc::container::effect::UndoStack;
use xc::container::{Container, ContainerManifest};
use xc::container::{ContainerManifest, CreateContainer};
use xc::models::exec::Jexec;
use xc::models::jail_image::JailImage;
use xc::models::network::HostEntry;
@@ -377,7 +376,7 @@ impl Site {
info!("no extra layers to extract");
}
let container = Container {
let container = CreateContainer {
name: blueprint.name,
hostname: blueprint.hostname,
id: blueprint.id,
@@ -395,24 +394,33 @@ impl Site {
main_norun: blueprint.main_norun,
persist: blueprint.persist,
no_clean: blueprint.no_clean,
linux_no_create_sys_dir: false,
linux_no_create_proc_dir: false,
linux_no_create_sys_dir: blueprint.linux_no_create_sys_dir,
linux_no_mount_sys: blueprint.linux_no_mount_sys,
linux_no_create_proc_dir: blueprint.linux_no_create_proc_dir,
linux_no_mount_proc: blueprint.linux_no_mount_proc,
zfs_origin,
dns: blueprint.dns,
origin_image: blueprint.origin_image,
allowing: blueprint.allowing,
image_reference: blueprint.image_reference,
copies: blueprint.copies,
default_router: blueprint.default_router,
};
let running_container = container
.start_transactionally(&mut self.undo)
.create_transactionally(&mut self.undo)
.context("fail to start container")?;
_ = running_container.setup_resolv_conf(&blueprint.dns);
for copy in blueprint.copies.into_iter() {
_ = running_container.copyin(&copy);
}
let container_notify = running_container.notify.clone();
let main_started_notify = running_container.main_started_notify.clone();
let (kq, recv) = xc::container::runner::run(running_container, sock_b);
let (kq, recv) =
xc::container::runner::run(running_container, sock_b, !blueprint.create_only);
self.container = Some(recv);
self.ctl_channel = Some(kq);
self.container_notify = Some(container_notify);