cargo fmt

This commit is contained in:
elliptic
2024-02-25 23:07:37 -05:00
parent 8080395fd2
commit 701ca34903
23 changed files with 546 additions and 314 deletions

View File

@@ -177,7 +177,7 @@ pub fn kevent_classic(
match res {
-1 => {
if errno != nix::libc::EINTR {
break Err(nix::errno::Errno::from_i32(errno))
break Err(nix::errno::Errno::from_i32(errno));
}
}
size => break Ok(size as usize),
@@ -191,7 +191,7 @@ impl KqueueExt for nix::sys::event::Kqueue {
match self.kevent(changelist, eventlist, None) {
Ok(size) => break Ok(size),
Err(errno) if errno != Errno::EINTR => break Err(errno),
_ => continue
_ => continue,
}
}
}

View File

@@ -222,9 +222,17 @@ pub fn create_tap() -> Result<String, IfconfigError> {
.output()
.map_err(IfconfigError::RunError)?;
if output.status.success() {
Ok(std::str::from_utf8(&output.stdout).unwrap().trim_end().to_string())
Ok(std::str::from_utf8(&output.stdout)
.unwrap()
.trim_end()
.to_string())
} else {
Err(IfconfigError::CliError(std::str::from_utf8(&output.stderr).unwrap().trim_end().to_string()))
Err(IfconfigError::CliError(
std::str::from_utf8(&output.stderr)
.unwrap()
.trim_end()
.to_string(),
))
}
}
@@ -233,8 +241,16 @@ pub fn create_tun<A: AsRef<str>>() -> Result<String, IfconfigError> {
.output()
.map_err(IfconfigError::RunError)?;
if output.status.success() {
Ok(std::str::from_utf8(&output.stdout).unwrap().trim_end().to_string())
Ok(std::str::from_utf8(&output.stdout)
.unwrap()
.trim_end()
.to_string())
} else {
Err(IfconfigError::CliError(std::str::from_utf8(&output.stderr).unwrap().trim_end().to_string()))
Err(IfconfigError::CliError(
std::str::from_utf8(&output.stderr)
.unwrap()
.trim_end()
.to_string(),
))
}
}

View File

@@ -25,6 +25,7 @@ pub mod json;
use crate::packet::TypedPacket;
use freebsd::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::os::fd::{AsRawFd, RawFd};
@@ -35,6 +36,18 @@ pub struct FdRef(usize);
#[derive(Debug, PartialEq, Eq, Clone, Serialize)]
pub struct Fd(pub RawFd);
impl Fd {
pub fn stdout() -> Fd {
Fd(STDOUT_FILENO)
}
pub fn stderr() -> Fd {
Fd(STDERR_FILENO)
}
pub fn stdin() -> Fd {
Fd(STDIN_FILENO)
}
}
impl FromPacket for Fd {
type Dual = FdRef;
fn decode_from_dual(value: Self::Dual, fds: &[RawFd]) -> Self {

View File

@@ -35,7 +35,7 @@ use std::path::Path;
use std::time::{Duration, SystemTime};
use thiserror::Error;
use tokio::sync::watch::Sender;
use tracing::{debug, info};
use tracing::{debug, error, info};
#[derive(Debug, Error)]
pub enum ClientError {
@@ -57,6 +57,8 @@ pub enum ClientError {
IoError(std::io::Error),
#[error("Missing Bearer Token")]
MissingBearerToken,
#[error("No credential")]
MissingCredential,
#[error("digest mismatch, expected: {0}, got: {1}")]
DigestMismatched(OciDigest, OciDigest),
}
@@ -192,6 +194,7 @@ impl Registry {
Session {
registry: self.clone(),
repository,
use_basic_auth: false,
bearer_token: None,
}
}
@@ -201,6 +204,7 @@ impl Registry {
pub struct Session {
registry: Registry,
repository: String,
use_basic_auth: bool,
bearer_token: Option<crate::models::DockerAuthToken>,
}
@@ -238,17 +242,15 @@ impl Session {
if let Some(basic_auth) = self.registry.basic_auth.clone() {
request = request.basic_auth(basic_auth.username, Some(basic_auth.password));
}
debug!("try_authenticate request: {request:#?}");
let response = request.send().await?;
debug!("try_authenticate response: {response:#?}");
let auth_token: crate::models::DockerAuthToken = response.json().await?;
debug!("try_authenticate auth_token: {auth_token:#?}");
if auth_token.token().is_none() {
return Err(ClientError::MissingBearerToken);
} else {
debug!("try_authenticate setting bearer token: {auth_token:#?}");
self.bearer_token = Some(auth_token);
}
} else if fields.get("Basic realm").is_some() {
self.use_basic_auth = true;
}
Ok(())
}
@@ -257,6 +259,17 @@ impl Session {
let mut req = request;
if let Some(bearer) = self.bearer_token.clone() {
req = req.bearer_auth(bearer.token().unwrap());
} else if self.use_basic_auth {
if let Some(basic_auth) = self.registry.basic_auth.clone() {
debug!(
username = basic_auth.username,
password = basic_auth.password,
"using basic auth"
);
req = req.basic_auth(basic_auth.username, Some(basic_auth.password));
} else {
return Err(ClientError::MissingCredential);
}
}
Ok(req.send().await?)
}
@@ -272,7 +285,7 @@ impl Session {
let response = self.request(request).await?;
if response.status().as_u16() == 401 {
if let Some(www_auth) = response.headers().get("www-authenticate") {
debug!("www-auth: {www_auth:#?}");
debug!("www-authenticate: {www_auth:#?}");
self.try_authenticate(www_auth.to_str()?).await?;
Ok(self.request(cloned).await?)
} else {
@@ -383,6 +396,8 @@ impl Session {
}
}
/// Attempt to upload a layer with known OciDigest, returns Ok(None) if we can found the layer
/// on the registry (skips upload), or Ok(Some(descriptor)) if the upload is successful
pub async fn upload_content_known_digest(
&mut self,
progress: Option<Sender<UploadStat>>,
@@ -411,6 +426,7 @@ impl Session {
.await?;
if !init_res.status().is_success() {
error!("unsuccessful response");
return Err(ClientError::UnsuccessfulResponse(init_res));
}

View File

@@ -183,14 +183,34 @@ impl FromStr for ImageReference {
#[cfg(test)]
mod tests {
use super::ImageReference;
use super::{ImageReference, ImageTag};
use crate::digest::OciDigest;
use std::str::FromStr;
#[test]
fn test_parser() {
fn test_to_string() {
let input = "127.0.0.1/helloworld:1234567";
let output = ImageReference::from_str(input).unwrap();
eprintln!("output: {output:#?}");
assert_eq!(output.to_string(), input.to_string())
}
#[test]
fn test_parse_localhost() {
let input = "localhost:5000/helloworld:1234567";
let reference = ImageReference::from_str(input).unwrap();
assert_eq!(reference.hostname, Some("localhost:5000".to_string()))
}
#[test]
fn test_parse_multiple_components() {
let digest = "sha256:deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef";
let input = format!("a/b/c/d/e/f/g@{digest}");
let reference = ImageReference::from_str(&input).unwrap();
assert_eq!(reference.hostname, None);
assert_eq!(reference.name, "a/b/c/d/e/f/g");
assert_eq!(
reference.tag,
ImageTag::Digest(OciDigest::from_str(digest).unwrap())
);
}
}

View File

@@ -24,7 +24,7 @@
use crate::buffer::Buffer;
use freebsd::event::{KEventExt, KqueueExt};
use freebsd::nix::pty::openpty;
use freebsd::nix::sys::event::{Kqueue, EventFilter, EventFlag, KEvent};
use freebsd::nix::sys::event::{EventFilter, EventFlag, KEvent, Kqueue};
use freebsd::FreeBSDCommandExt;
use std::io::Write;
use std::os::fd::OwnedFd;

View File

@@ -23,7 +23,7 @@
// SUCH DAMAGE.
use freebsd::event::{KEventExt, KqueueExt};
use freebsd::nix::libc::{STDIN_FILENO, STDOUT_FILENO, VMIN, VTIME};
use freebsd::nix::sys::event::{Kqueue, EventFilter, EventFlag, KEvent};
use freebsd::nix::sys::event::{EventFilter, EventFlag, KEvent, Kqueue};
use freebsd::nix::sys::socket::{recv, send, MsgFlags};
use freebsd::nix::sys::termios::{tcsetattr, InputFlags, LocalFlags, OutputFlags, SetArg, Termios};
use freebsd::nix::unistd::{read, write};
@@ -112,7 +112,9 @@ impl ForwardState {
} else {
self.escaped = false;
}
} else if byte == &0x10 /* ctrl-p */ {
} else if byte == &0x10
/* ctrl-p */
{
self.escaped = true;
} else {
self.local_to_stream.push(*byte);

View File

@@ -263,9 +263,9 @@ pub fn format_capacity(size: usize) -> String {
}
}
pub fn format_bandwidth(size: usize, secs: u64) -> String {
pub fn format_bandwidth(size: usize, ms: u128) -> String {
let bits = (size * 8) as f64;
let ss = secs as f64;
let ss = (ms as f64) / 1000.0;
if size > GB {
format!("{:.2} gbps", bits / GB_F64 / ss)
} else if size > MB {

View File

@@ -228,7 +228,8 @@ impl Directive for RunDirective {
let bytes_to_write =
writer.read_to(&mut write_buf[..writable.min(8192)]);
match freebsd::nix::unistd::write(fd, &write_buf[..bytes_to_write]) {
match freebsd::nix::unistd::write(fd, &write_buf[..bytes_to_write])
{
Err(err) => {
error!("cannot write to remote process stdin: {err}");
_ = freebsd::nix::unistd::close(fd);

View File

@@ -490,6 +490,10 @@ fn main() -> Result<(), ActionError> {
match do_push_image(&mut conn, req)? {
Ok(_) => {
let mut lines_count = 0;
let mut last_pulled_ms: Option<u128> = None;
let mut last_pulled_bytes: std::collections::HashMap<OciDigest, usize> =
std::collections::HashMap::new();
// let mut last_pulled_ms: std::collections::HashMap<OciDigest, u128> = std::collections::HashMap::new();
loop {
std::thread::sleep(std::time::Duration::from_millis(500));
if lines_count > 0 {
@@ -519,18 +523,48 @@ fn main() -> Result<(), ActionError> {
Ordering::Equal => {
let uploaded =
res.bytes.map(format_capacity).unwrap_or_default();
let bandwidth = res
let (avg, bandwidth) = res
.bytes
.and_then(|bytes| {
res.duration_secs
.map(|sec| format_bandwidth(bytes, sec))
let b = bytes
- last_pulled_bytes.get(digest).unwrap_or(&0);
last_pulled_bytes.insert(digest.clone(), bytes);
res.duration_ms.map(|ms| {
(
format_bandwidth(bytes, ms),
format_bandwidth(
b,
ms - last_pulled_ms.unwrap_or_default(),
),
)
})
})
.unwrap_or_default();
/*
let (avg, bandwidth) = res
.bytes
.and_then(|bytes| {
let b = last_pulled_bytes.get(digest).map(|b| bytes - b).unwrap_or(*bytes);
let (avg, s) = res.duration_ms
.map(|ms| {
(
format_bandwidth(bytes, ms),
format_bandwidth(b, last_pulled_ms.map(|l| ms - l).unwrap_or_else(|| ms))
)
});
// .map(|ms| format_bandwidth(bytes, ms));
last_pulled_bytes.insert(digest.clone(), bytes);
(avg, s)
})
.unwrap_or_default();
*/
let total = res
.current_layer_size
.map(format_capacity)
.unwrap_or_default();
eprintln!("{digest} ... uploading {uploaded}/{total} @ {bandwidth}");
eprintln!("{digest} ... uploading {uploaded}/{total} @ {bandwidth} (avg {avg})");
}
Ordering::Greater => eprintln!("{digest}"),
};
@@ -546,6 +580,9 @@ fn main() -> Result<(), ActionError> {
eprintln!("Image manifest")
}
}
if let Some(ms) = res.duration_ms {
last_pulled_ms = Some(ms);
}
}
}
Err(err) => {
@@ -609,7 +646,13 @@ fn main() -> Result<(), ActionError> {
Some(unsafe { eventfd(0, freebsd::nix::libc::EFD_NONBLOCK) })
};
let reqt = InstantiateRequest {
let stdin = if args.detach || !args.interactive {
Maybe::None
} else {
Maybe::Some(Fd::stdin())
};
let mut reqt = InstantiateRequest {
dns,
create_only: false,
main_norun: false,
@@ -622,9 +665,27 @@ fn main() -> Result<(), ActionError> {
}),
main_exited_fd: Maybe::from_option(main_exited_notify.map(Fd)),
port_redirections: publish.into_iter().map(|p| p.to_host_spec()).collect(),
use_tty: args.terminal,
stdin,
stdout: if args.detach {
Maybe::None
} else {
Maybe::Some(Fd::stdout())
},
stderr: if args.detach {
Maybe::None
} else {
Maybe::Some(Fd::stderr())
},
..create.create_request()?
};
if args.terminal {
if let Ok(term) = std::env::var("TERM") {
reqt.envs.insert("TERM".to_string(), term.to_string());
}
}
let res = do_instantiate(&mut conn, reqt)?;
let exit_notify = main_exited_notify.map(EventFdNotify::from_fd);
(res, main_started_notify, exit_notify)
@@ -660,36 +721,47 @@ fn main() -> Result<(), ActionError> {
}
let id = res.id;
if let Ok(container) =
do_show_container_nocache(&mut conn, ShowContainerRequest { id })?
{
if let Some(reason) = container.running_container.fault {
error!("Container faulted {reason}");
} else {
let spawn_info = container
.running_container
.processes
.get("main")
.as_ref()
.and_then(|proc| proc.spawn_info.as_ref())
.expect("process not started yet or not found");
if let Some(socket) = &spawn_info.terminal_socket {
if let Ok(exit_by_user) = attach::run(socket) {
if !exit_by_user {
if let Some(notify) = main_exited_notify {
if let Ok(exit_value) = notify.notified_sync_take_value() {
let exit_status = decode_exit_code(exit_value);
std::process::exit(exit_status.code().unwrap_or(EXIT_FAILURE))
if args.terminal {
if let Ok(container) =
do_show_container_nocache(&mut conn, ShowContainerRequest { id })?
{
if let Some(reason) = container.running_container.fault {
error!("Container faulted {reason}");
} else {
let spawn_info = container
.running_container
.processes
.get("main")
.as_ref()
.and_then(|proc| proc.spawn_info.as_ref())
.expect("process not started yet or not found");
if let Some(socket) = &spawn_info.terminal_socket {
if let Ok(exit_by_user) = attach::run(socket) {
if !exit_by_user {
if let Some(notify) = main_exited_notify {
if let Ok(exit_value) =
notify.notified_sync_take_value()
{
let exit_status = decode_exit_code(exit_value);
std::process::exit(
exit_status.code().unwrap_or(EXIT_FAILURE),
)
}
}
}
}
} else {
info!("main process is not running with tty");
}
} else {
info!("main process is not running with tty");
}
} else {
panic!("cannot find container");
}
} else if let Some(notify) = main_exited_notify {
if let Ok(exit_value) = notify.notified_sync_take_value() {
let exit_status = decode_exit_code(exit_value);
std::process::exit(exit_status.code().unwrap_or(EXIT_FAILURE))
}
} else {
panic!("cannot find container");
}
}
} else {
@@ -795,17 +867,17 @@ fn main() -> Result<(), ActionError> {
stdin: if terminal {
Maybe::None
} else {
Maybe::Some(ipc::packet::codec::Fd(0))
Maybe::Some(Fd::stdin())
},
stdout: if terminal {
Maybe::None
} else {
Maybe::Some(ipc::packet::codec::Fd(1))
Maybe::Some(Fd::stdout())
},
stderr: if terminal {
Maybe::None
} else {
Maybe::Some(ipc::packet::codec::Fd(2))
Maybe::Some(Fd::stderr())
},
user,
group,

View File

@@ -95,6 +95,12 @@ pub(crate) struct RunArg {
pub(crate) entry_point: Option<String>,
pub(crate) entry_point_args: Vec<String>,
#[arg(short = 't', action)]
pub(crate) terminal: bool,
#[arg(short = 'i', long = "interactive", action)]
pub(crate) interactive: bool,
}
#[derive(Parser, Debug)]
@@ -160,8 +166,11 @@ pub(crate) struct CreateArgs {
#[arg(long = "tun")]
pub(crate) tun_ifaces: Vec<String>,
#[arg(long = "max.children", default_value="0")]
pub(crate) max_children: u32
#[arg(long = "enable-pf", default_value_t)]
pub(crate) enable_pf: bool,
#[arg(long = "max.children", default_value = "0")]
pub(crate) max_children: u32,
}
impl CreateArgs {
@@ -259,10 +268,7 @@ impl CreateArgs {
ips.push(IpWant(xc::models::network::IpAssign {
network: None,
interface: "lo0".to_string(),
addresses: vec![
"127.0.0.1/8".parse().unwrap(),
"::1/128".parse().unwrap()
]
addresses: vec!["127.0.0.1/8".parse().unwrap(), "::1/128".parse().unwrap()],
}))
}
@@ -292,6 +298,7 @@ impl CreateArgs {
tun_interfaces: Some(self.tun_ifaces),
tap_interfaces: Some(self.tap_ifaces),
children_max: self.max_children,
enable_pf: self.enable_pf,
..InstantiateRequest::default()
})
}

View File

@@ -429,7 +429,7 @@ impl CreateContainer {
finished_at: None,
jailed_datasets: self.jailed_datasets,
main_ip_selector: self.main_ip_selector,
envs: self.envs
envs: self.envs,
})
}
}

View File

@@ -196,7 +196,7 @@ impl ProcessRunner {
let exit_notify = exit_notify.or(exec.notify.map(|e| Arc::new(EventFdNotify::from_fd(e))));
debug!(exit_notify=format!("{exit_notify:?}"), "==");
debug!(exit_notify = format!("{exit_notify:?}"), "==");
let mut envs = self.container.envs.clone();
@@ -252,7 +252,6 @@ impl ProcessRunner {
},
};
for (key, value) in exec.envs.iter() {
envs.insert(key.to_string(), value.to_string());
}
@@ -273,14 +272,17 @@ impl ProcessRunner {
let network_name = network.network.as_ref().unwrap();
envs.insert(
format!("XC_NETWORK_{network_name}_ADDR_COUNT"),
network.addresses.len().to_string()
network.addresses.len().to_string(),
);
envs.insert(
format!("XC_NETWORK_{network_name}_IFACE"),
network.interface.to_string(),
);
for (i, addr) in network.addresses.iter().enumerate() {
envs.insert(format!("XC_NETWORK_{network_name}_ADDR_{i}"), addr.to_string());
envs.insert(
format!("XC_NETWORK_{network_name}_ADDR_{i}"),
addr.to_string(),
);
}
}
@@ -288,7 +290,6 @@ impl ProcessRunner {
envs.insert("XC_ID".to_string(), self.container.id.to_string());
let mut cmd = std::process::Command::new(&exec.arg0);
cmd.env_clear()
@@ -502,11 +503,7 @@ impl ProcessRunner {
self.parent_map.remove(&pid);
let descentdant = self.descentdant_map.get_mut(&ancestor).unwrap();
trace!(
pid,
ancestor,
"NOTE_EXIT"
);
trace!(pid, ancestor, "NOTE_EXIT");
if let Some(pos) = descentdant.iter().position(|x| *x == pid) {
descentdant.remove(pos);
@@ -521,7 +518,7 @@ impl ProcessRunner {
if stat.pid() == ancestor {
if ancestor == pid {
stat.set_exited(event.data() as i32);
info!(pid, exit_code=event.data(), "pid exited");
info!(pid, exit_code = event.data(), "pid exited");
unsafe {
freebsd::nix::libc::waitpid(pid as i32, std::ptr::null_mut(), 0)
};
@@ -550,7 +547,11 @@ impl ProcessRunner {
if descentdant_gone {
stat.set_tree_exited();
if stat.id() == "main" {
info!(id=self.container.id, jid=self.container.jid, "main process exited");
info!(
id = self.container.id,
jid = self.container.jid,
"main process exited"
);
self.main_exited = true;
self.container.finished_at = Some(epoch_now_nano());
if (self.container.deinit_norun || self.deinits.is_empty())
@@ -568,7 +569,10 @@ impl ProcessRunner {
}
}
} else {
debug!(descentdant=format!("{descentdant:?}"), "remaining descentdants");
debug!(
descentdant = format!("{descentdant:?}"),
"remaining descentdants"
);
}
}
}
@@ -789,7 +793,8 @@ pub fn run(
});
match fork_result {
freebsd::nix::unistd::ForkResult::Child => {
let title = std::ffi::CString::new(format!("worker jid={}", container.jid)).unwrap();
let title =
std::ffi::CString::new(format!("worker jid={}", container.jid)).unwrap();
unsafe { freebsd::libc::setproctitle(title.as_ptr()) };
let kq = unsafe { freebsd::nix::libc::kqueue() };
let mut pr = ProcessRunner::new(kq, container, auto_start);

View File

@@ -25,8 +25,8 @@
use crate::container::ProcessStat;
use freebsd::event::EventFdNotify;
use std::sync::Arc;
use std::os::unix::process::ExitStatusExt;
use std::sync::Arc;
use tokio::sync::watch::Sender;
use tracing::debug;
@@ -56,13 +56,13 @@ impl ProcessRunnerStat {
}
pub(super) fn set_exited(&mut self, exit_code: i32) {
debug!(pid=self.pid, exit_code=exit_code, "set_exited");
debug!(pid = self.pid, exit_code = exit_code, "set_exited");
self.process_stat.send_if_modified(|status| {
status.set_exited(exit_code);
true
});
if let Some(notify) = &self.exit_notify {
debug!(pid=self.pid, exit_code=exit_code, "notifing listeners");
debug!(pid = self.pid, exit_code = exit_code, "notifing listeners");
notify
.clone()
.notify_waiters_with_value(encode_exit_code(exit_code));

View File

@@ -27,7 +27,7 @@ use crate::container::request::Mount;
use crate::container::ContainerManifest;
use crate::models::exec::Jexec;
use crate::models::jail_image::JailImage;
use crate::models::network::{DnsSetting, IpAssign, MainAddressSelector, AssignedAddress};
use crate::models::network::{AssignedAddress, DnsSetting, IpAssign, MainAddressSelector};
use crate::util::realpath;
use anyhow::Context;
@@ -105,11 +105,11 @@ impl<'a> Iterator for ContainerNetworkIter<'a> {
loop {
match self.0.next() {
None => return None,
x@Some(assign) => {
x @ Some(assign) => {
if assign.network.is_none() {
continue
continue;
} else {
return x
return x;
}
}
}

View File

@@ -160,7 +160,7 @@ pub struct HostEntry {
#[derive(Clone, Debug)]
pub enum MainAddressSelector {
Network(String),
Ip(IpAddr)
Ip(IpAddr),
}
pub struct AssignedAddress {
@@ -170,45 +170,52 @@ pub struct AssignedAddress {
impl AssignedAddress {
pub fn new(interface: String, address: IpAddr) -> AssignedAddress {
AssignedAddress {
interface,
address,
}
AssignedAddress { interface, address }
}
}
impl MainAddressSelector {
pub fn select<'a, I: Iterator<Item = &'a IpAssign>>(selector: &Option<Self>, pool: I) -> Option<AssignedAddress> {
pub fn select<'a, I: Iterator<Item = &'a IpAssign>>(
selector: &Option<Self>,
pool: I,
) -> Option<AssignedAddress> {
match selector {
None => {
for alloc in pool {
if alloc.network.is_some() {
if let Some(address) = alloc.addresses.first() {
return Some(AssignedAddress::new(alloc.interface.to_string(), address.addr()))
return Some(AssignedAddress::new(
alloc.interface.to_string(),
address.addr(),
));
}
}
}
None
},
Some(MainAddressSelector::Ip(address)) => /*Some(*address)*/ {
}
Some(MainAddressSelector::Ip(address)) =>
/*Some(*address)*/
{
for alloc in pool {
if alloc.addresses.iter().any(|addr| addr.addr() == *address) {
return Some(AssignedAddress::new(alloc.interface.to_string(), *address))
return Some(AssignedAddress::new(alloc.interface.to_string(), *address));
}
}
None
},
}
Some(MainAddressSelector::Network(network)) => {
for alloc in pool {
match alloc.network.as_ref() {
Some(_network) if network == _network => {
match alloc.addresses.first() {
None => continue,
Some(addr) =>
return Some(AssignedAddress::new(alloc.interface.to_string(), addr.addr()))
Some(_network) if network == _network => match alloc.addresses.first() {
None => continue,
Some(addr) => {
return Some(AssignedAddress::new(
alloc.interface.to_string(),
addr.addr(),
))
}
},
_ => continue
_ => continue,
}
}
None
@@ -221,7 +228,9 @@ impl std::fmt::Display for MainAddressSelector {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MainAddressSelector::Ip(ip) => formatter.write_fmt(format_args!("ipadd/{ip}")),
MainAddressSelector::Network(network) => formatter.write_fmt(format_args!("network/{network}"))
MainAddressSelector::Network(network) => {
formatter.write_fmt(format_args!("network/{network}"))
}
}
}
}
@@ -229,21 +238,24 @@ impl std::fmt::Display for MainAddressSelector {
impl std::str::FromStr for MainAddressSelector {
type Err = anyhow::Error;
fn from_str(input: &str) -> Result<Self, Self::Err> {
input.split_once('/').ok_or(anyhow::anyhow!("malformed")).and_then(|(proto, val)| {
match proto {
"ipaddr" => val.parse::<IpAddr>()
input
.split_once('/')
.ok_or(anyhow::anyhow!("malformed"))
.and_then(|(proto, val)| match proto {
"ipaddr" => val
.parse::<IpAddr>()
.map_err(anyhow::Error::new)
.map(MainAddressSelector::Ip),
"network" => Ok(MainAddressSelector::Network(val.to_string())),
_ => Err(anyhow::anyhow!("malformed"))
}
})
_ => Err(anyhow::anyhow!("malformed")),
})
}
}
impl Serialize for MainAddressSelector {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: serde::ser::Serializer
where
S: serde::ser::Serializer,
{
serializer.serialize_str(&self.to_string())
}
@@ -257,7 +269,8 @@ impl<'de> serde::de::Visitor<'de> for MainAddressSelectorVisitor {
formatter.write_str("expecting ip/<ipadr> or network/<network-name>")
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where E: serde::de::Error
where
E: serde::de::Error,
{
value.parse().map_err(|e| E::custom(format!("{e:?}")))
}
@@ -265,7 +278,8 @@ impl<'de> serde::de::Visitor<'de> for MainAddressSelectorVisitor {
impl<'de> Deserialize<'de> for MainAddressSelector {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where D: serde::de::Deserializer<'de>
where
D: serde::de::Deserializer<'de>,
{
deserializer.deserialize_str(MainAddressSelectorVisitor)
}

View File

@@ -172,8 +172,8 @@ fn _realpath(
PathComp::RootDir => {
current.push(head);
real_path = root.clone();
continue
},
continue;
}
PathComp::CurDir => continue,
PathComp::ParentDir => {
if !current.pop() {

View File

@@ -570,11 +570,16 @@ impl ServerContext {
warn!("pf is disabled");
}
if let Some(address) = MainAddressSelector::select(&blueprint.main_ip_selector, blueprint.ip_alloc.iter()) {
if let Some(address) =
MainAddressSelector::select(&blueprint.main_ip_selector, blueprint.ip_alloc.iter())
{
if !blueprint.port_redirections.is_empty() {
for rdr in blueprint.port_redirections.iter() {
let mut rdr = rdr.clone();
rdr.with_host_info(&this.config.ext_ifs, ipcidr::IpCidr::from_singleton(address.address));
rdr.with_host_info(
&this.config.ext_ifs,
ipcidr::IpCidr::from_singleton(address.address),
);
this.port_forward_table.append_rule(id, rdr);
}
this.reload_pf_rdr_anchor()?;
@@ -587,7 +592,7 @@ impl ServerContext {
for dataset in jailing_datasets.iter() {
res.dataset_tracker.unjail(dataset);
}
return Err(error)
return Err(error);
}
};

View File

@@ -599,17 +599,19 @@ impl RootFsRecipe {
if !status.success() {
debug!(
file_path,
root=root.to_str(),
exit_code=status.code(),
"failed to extract file to root: ocitar exit with non-zero exit code")
root = root.to_str(),
exit_code = status.code(),
"failed to extract file to root: ocitar exit with non-zero exit code"
)
}
},
}
Err(error) => {
debug!(
file_path,
root=root.to_str(),
error=error.to_string(),
"failed to extract file to root");
root = root.to_str(),
error = error.to_string(),
"failed to extract file to root"
);
}
}
debug!(file_path, "finished");

View File

@@ -36,7 +36,7 @@ use std::sync::Arc;
use thiserror::Error;
use tokio::sync::watch::Receiver;
use tokio::sync::RwLock;
use tracing::{debug, info};
use tracing::{debug, error, info};
#[derive(Error, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum PushImageError {
@@ -62,7 +62,7 @@ pub struct PushImageStatusDesc {
pub fault: Option<String>,
pub bytes: Option<usize>,
pub duration_secs: Option<u64>,
pub duration_ms: Option<u128>,
pub current_layer_size: Option<usize>,
}
@@ -81,13 +81,13 @@ pub struct PushImageStatus {
impl PushImageStatus {
pub(super) fn to_desc(&self) -> PushImageStatusDesc {
let (bytes, duration_secs) = match &self.upload_status {
let (bytes, duration_ms) = match &self.upload_status {
None => (None, None),
Some(receiver) => {
let stat = receiver.borrow();
if let Some((bytes, elapsed)) = stat.started_at.and_then(|started_at| {
stat.uploaded
.map(|bytes| (bytes, started_at.elapsed().unwrap().as_secs()))
.map(|bytes| (bytes, started_at.elapsed().unwrap().as_millis()))
}) {
(Some(bytes), Some(elapsed))
} else {
@@ -104,10 +104,190 @@ impl PushImageStatus {
fault: self.fault.clone(),
current_layer_size: self.current_layer_size,
bytes,
duration_secs,
duration_ms,
}
}
}
async fn push_image_impl(
this: Arc<RwLock<ImageManager>>,
registry: Registry,
record: super::ImageRecord,
name: String,
tag: String,
layers_dir: std::path::PathBuf,
emitter: &mut TaskHandle<String, PushImageStatus>,
) -> Result<(), anyhow::Error> {
let mut session = registry.new_session(name.to_string());
let layers = record.manifest.layers().clone();
let mut selections = Vec::new();
#[allow(unreachable_code)]
'layer_loop: for layer in layers.iter() {
let maps = {
let this = this.clone();
let this = this.read().await;
this.query_archives(layer).await?
};
for map in maps.iter() {
let layer_file_path = {
let mut path = layers_dir.to_path_buf();
path.push(map.archive_digest.as_str());
path
};
if layer_file_path.exists() {
selections.push(map.clone());
continue 'layer_loop;
}
}
// XXX: we actually can recover by generating those layers, just not in this version
{
tracing::error!("cannot find archive layer for {layer}");
emitter.set_faulted(&format!("cannot find archive layer for {layer}"));
anyhow::bail!("cannot find archive layer for {layer}");
}
break;
}
_ = emitter.use_try(|state| {
state.layers = layers.clone();
Ok(())
});
if layers.len() != selections.len() {
todo!()
}
let mut uploads = Vec::new();
for map in selections.iter() {
let content_type = match map.algorithm.as_str() {
"gzip" => "application/vnd.oci.image.layer.v1.tar+gzip",
"zstd" => "application/vnd.oci.image.layer.v1.tar+zstd",
"plain" => "application/vnd.oci.image.layer.v1.tar",
_ => unreachable!(),
};
let mut path = layers_dir.to_path_buf();
path.push(map.archive_digest.as_str());
let file = std::fs::OpenOptions::new().read(true).open(&path)?;
let metadata = file.metadata().unwrap();
let layer_size = metadata.len() as usize;
let dedup_check = session.exists_digest(&map.archive_digest).await;
_ = emitter.use_try(|state| {
if state.current_upload_idx.is_none() {
state.current_upload_idx = Some(0);
}
state.current_layer_size = Some(layer_size);
Ok(())
});
let descriptor = if dedup_check.is_ok() && dedup_check.unwrap() {
_ = emitter.use_try(|state| {
if state.current_upload_idx.is_none() {
state.current_upload_idx = Some(0);
}
Ok(())
});
Descriptor {
digest: map.archive_digest.clone(),
media_type: content_type.to_string(),
size: layer_size,
}
} else {
info!("pushing {path:?}");
let (tx, upload_status) = tokio::sync::watch::channel(UploadStat::default());
_ = emitter.use_try(|state| {
state.upload_status = Some(upload_status.clone());
Ok(())
});
match session
.upload_content_known_digest(
Some(tx),
&map.archive_digest,
content_type.to_string(),
true,
map.origin.clone(),
file,
)
.await?
{
Some(descriptor) => descriptor,
None => Descriptor {
digest: map.archive_digest.clone(),
media_type: content_type.to_string(),
size: layer_size,
},
}
};
uploads.push(descriptor);
_ = emitter.use_try(|state| {
state.current_upload_idx = Some(state.current_upload_idx.unwrap() + 1);
Ok(())
});
}
// upload config
let config = serde_json::to_vec(&record.manifest)?;
_ = emitter.use_try(|state| {
state.push_config = true;
Ok(())
});
debug!("pushing config");
let config_descriptor = session
.upload_content(
None,
"application/vnd.oci.image.config.v1+json".to_string(),
config.as_slice(),
)
.await?;
let manifest = oci_util::models::ImageManifest {
schema_version: 2,
media_type: "application/vnd.oci.image.manifest.v1+json".to_owned(),
config: config_descriptor, //,config.unwrap(),
layers: uploads,
};
debug!("Registering manifest: {manifest:#?}");
_ = emitter.use_try(|state| {
state.push_manifest = true;
Ok(())
});
debug!("Registering manifest: {manifest:#?}");
let arch = record.manifest.architecture();
let arch_tag = format!("{tag}-{arch}");
let platform = Platform {
os: record.manifest.os().to_string(),
architecture: arch.to_string(),
os_version: None,
os_features: Vec::new(),
variant: None,
features: Vec::new(),
};
let descriptor = session.register_manifest(&arch_tag, &manifest).await?;
session
.merge_manifest_list(&descriptor, &platform, &tag)
.await?;
_ = emitter.use_try(|state| {
state.done = true;
Ok(())
});
emitter.set_completed();
Ok::<(), anyhow::Error>(())
}
pub async fn push_image(
this: Arc<RwLock<ImageManager>>,
layers_dir: impl AsRef<std::path::Path>,
@@ -115,7 +295,7 @@ pub async fn push_image(
remote_reference: ImageReference,
) -> Result<Receiver<Task<String, PushImageStatus>>, PushImageError> {
let id = format!("{reference}->{remote_reference}");
info!("push image: {id}");
info!(id, "push image");
let name = remote_reference.name;
let tag = remote_reference.tag.to_string();
@@ -129,10 +309,14 @@ pub async fn push_image(
.await
.map_err(|_| PushImageError::NoSuchLocalReference)?;
let registry = remote_reference
.hostname
.and_then(|registry| reg.get_registry_by_name(&registry))
.ok_or(PushImageError::RegistryNotFound)?;
let registry = match remote_reference.hostname {
None => reg
.default_registry()
.ok_or(PushImageError::RegistryNotFound)?,
Some(hostname) => reg
.get_registry_by_name(&hostname)
.unwrap_or_else(|| Registry::new(hostname.to_string(), None)),
};
(registry, record)
};
@@ -150,179 +334,31 @@ pub async fn push_image(
let layers_dir = layers_dir.as_ref().to_path_buf();
tokio::spawn(async move {
let this = this.clone();
let mut session = registry.new_session(name.to_string());
let layers = record.manifest.layers().clone();
let mut selections = Vec::new();
// let (tx, upload_status) = tokio::sync::watch::channel(UploadStat::default());
#[allow(unreachable_code)]
'layer_loop: for layer in layers.iter() {
let maps = {
let this = this.clone();
let this = this.read().await;
this.query_archives(layer).await?
};
for map in maps.iter() {
/*
let layer_file = format!("{}/{}", layers_dir, map.archive_digest);
let layer_file_path = std::path::Path::new(&layer_file);
*/
let layer_file_path = {
let mut path = layers_dir.to_path_buf();
path.push(map.archive_digest.as_str());
path
};
if layer_file_path.exists() {
selections.push(map.clone());
continue 'layer_loop;
}
}
// XXX: we actually can recover by generating those layers, just not in this version
{
tracing::error!("cannot find archive layer for {layer}");
emitter.set_faulted(&format!("cannot find archive layer for {layer}"));
anyhow::bail!("cannot find archive layer for {layer}");
}
break;
}
_ = emitter.use_try(|state| {
state.layers = layers.clone();
Ok(())
});
if layers.len() != selections.len() {
todo!()
}
let mut uploads = Vec::new();
for map in selections.iter() {
let content_type = match map.algorithm.as_str() {
"gzip" => "application/vnd.oci.image.layer.v1.tar+gzip",
"zstd" => "application/vnd.oci.image.layer.v1.tar+zstd",
"plain" => "application/vnd.oci.image.layer.v1.tar",
_ => unreachable!(),
};
let mut path = layers_dir.to_path_buf();
path.push(map.archive_digest.as_str());
let file = std::fs::OpenOptions::new().read(true).open(&path)?;
let metadata = file.metadata().unwrap();
let layer_size = metadata.len() as usize;
let dedup_check = session.exists_digest(&map.archive_digest).await;
_ = emitter.use_try(|state| {
if state.current_upload_idx.is_none() {
state.current_upload_idx = Some(0);
}
state.current_layer_size = Some(layer_size);
Ok(())
});
let descriptor = if dedup_check.is_ok() && dedup_check.unwrap() {
match push_image_impl(
this.clone(),
registry,
record,
name.to_string(),
tag.to_string(),
layers_dir,
&mut emitter,
)
.await
{
Ok(_) => {
_ = emitter.use_try(|state| {
if state.current_upload_idx.is_none() {
state.current_upload_idx = Some(0);
}
state.done = true;
Ok(())
});
Descriptor {
digest: map.archive_digest.clone(),
media_type: content_type.to_string(),
size: layer_size,
}
} else {
info!("pushing {path:?}");
let (tx, upload_status) = tokio::sync::watch::channel(UploadStat::default());
}
Err(error) => {
_ = emitter.use_try(|state| {
state.upload_status = Some(upload_status.clone());
Ok(())
state.fault = Some(format!("Upload failed: {error}"));
state.done = true;
Err::<(), String>(error.to_string())
});
let maybe_descriptor = session
.upload_content_known_digest(
Some(tx),
&map.archive_digest,
content_type.to_string(),
true,
map.origin.clone(),
file,
)
.await?;
if let Some(descriptor) = maybe_descriptor {
descriptor
} else {
Descriptor {
digest: map.archive_digest.clone(),
media_type: content_type.to_string(),
size: layer_size,
}
}
};
uploads.push(descriptor);
_ = emitter.use_try(|state| {
state.current_upload_idx = Some(state.current_upload_idx.unwrap() + 1);
Ok(())
});
}
}
// upload config
let config = serde_json::to_vec(&record.manifest)?;
_ = emitter.use_try(|state| {
state.push_config = true;
Ok(())
});
debug!("pushing config");
let config_descriptor = session
.upload_content(
None,
"application/vnd.oci.image.config.v1+json".to_string(),
config.as_slice(),
)
.await?;
let manifest = oci_util::models::ImageManifest {
schema_version: 2,
media_type: "application/vnd.oci.image.manifest.v1+json".to_owned(),
config: config_descriptor, //,config.unwrap(),
layers: uploads,
};
debug!("Registering manifest: {manifest:#?}");
_ = emitter.use_try(|state| {
state.push_manifest = true;
Ok(())
});
debug!("Registering manifest: {manifest:#?}");
let arch = record.manifest.architecture();
let arch_tag = format!("{tag}-{arch}");
let platform = Platform {
os: record.manifest.os().to_string(),
architecture: arch.to_string(),
os_version: None,
os_features: Vec::new(),
variant: None,
features: Vec::new(),
};
let descriptor = session.register_manifest(&arch_tag, &manifest).await?;
session
.merge_manifest_list(&descriptor, &platform, &tag)
.await?;
_ = emitter.use_try(|state| {
state.done = true;
Ok(())
});
emitter.set_completed();
Ok::<(), anyhow::Error>(())
});
Ok(rx)
}

View File

@@ -98,7 +98,6 @@ impl CheckedInstantiateRequest {
}
}
for assign in request.ips.iter() {
let iface = &assign.interface;
if !existing_ifaces.contains(iface) {
@@ -249,7 +248,7 @@ pub struct InstantiateBlueprint {
pub children_max: u32,
pub main_ip_selector: Option<MainAddressSelector>,
pub created_interfaces: Vec<String>,
pub port_redirections: Vec<PortRedirection>
pub port_redirections: Vec<PortRedirection>,
}
impl InstantiateBlueprint {
@@ -315,7 +314,7 @@ impl InstantiateBlueprint {
ipc::packet::codec::Maybe::None => None,
ipc::packet::codec::Maybe::Some(x) => Some(EventFdNotify::from_fd(x.as_raw_fd())),
};
let main_exited_notify = match request.request.main_exited_fd {
ipc::packet::codec::Maybe::None => None,
ipc::packet::codec::Maybe::Some(x) => Some(x.as_raw_fd()),
@@ -365,14 +364,22 @@ impl InstantiateBlueprint {
let interface = freebsd::net::ifconfig::create_tap()?;
tuntap_ifaces.push(interface.to_string());
envs.insert(tap, interface.clone());
ip_alloc.push(IpAssign { network: None, addresses: Vec::new(), interface });
ip_alloc.push(IpAssign {
network: None,
addresses: Vec::new(),
interface,
});
}
for tun in request.request.tun_interfaces.unwrap_or_default() {
let interface = freebsd::net::ifconfig::create_tap()?;
tuntap_ifaces.push(interface.to_string());
envs.insert(tun, interface.clone());
ip_alloc.push(IpAssign { network: None, addresses: Vec::new(), interface });
ip_alloc.push(IpAssign {
network: None,
addresses: Vec::new(),
interface,
});
}
let mut mount_req = Vec::new();
@@ -456,6 +463,10 @@ impl InstantiateBlueprint {
devfs_rules.push("path dtrace/helper unhide".to_string());
}
if request.request.enable_pf {
devfs_rules.push("path pf unhide".to_string());
}
for rule in request.devfs_rules.iter() {
devfs_rules.push(rule.to_string());
}
@@ -511,7 +522,15 @@ impl InstantiateBlueprint {
}
let mut jexec = entry_point.resolve_args(&envs, &spec.entry_point_args)?;
jexec.output_mode = StdioMode::Terminal;
if request.request.use_tty {
jexec.output_mode = StdioMode::Terminal;
} else {
jexec.output_mode = StdioMode::Forward {
stdin: request.request.stdin.to_option().map(|fd| fd.as_raw_fd()),
stdout: request.request.stdout.to_option().map(|fd| fd.as_raw_fd()),
stderr: request.request.stderr.to_option().map(|fd| fd.as_raw_fd()),
};
}
jexec.notify = main_exited_notify.map(|a| a.as_raw_fd());
tracing::warn!("jexec: {jexec:#?}");
Some(jexec)

View File

@@ -30,7 +30,7 @@ use crate::resources::network::Network;
use crate::resources::volume::{Volume, VolumeDriverKind};
use freebsd::event::EventFdNotify;
use freebsd::libc::{EINVAL, EIO, EPERM, ENOENT};
use freebsd::libc::{EINVAL, EIO, ENOENT, EPERM};
use ipc::packet::codec::{Fd, FromPacket, List, Maybe};
use ipc::proto::{enoent, ipc_err, GenericResult};
use ipc::service::{ConnectionContext, Service};
@@ -53,7 +53,7 @@ use xc::container::request::NetworkAllocRequest;
use xc::image_store::ImageStoreError;
use xc::models::exec::{IpcJexec, IpcStdioMode};
use xc::models::jail_image::JailConfig;
use xc::models::network::{DnsSetting, IpAssign, PortRedirection, MainAddressSelector};
use xc::models::network::{DnsSetting, IpAssign, MainAddressSelector, PortRedirection};
use xc::util::{gen_id, CompressionFormat, CompressionFormatExt};
#[derive(FromPacket, Debug)]
@@ -245,8 +245,11 @@ pub struct InstantiateRequest {
pub stdin: Maybe<Fd>,
pub stdout: Maybe<Fd>,
pub stderr: Maybe<Fd>,
pub use_tty: bool,
pub port_redirections: Vec<PortRedirection>,
pub enable_pf: bool,
}
impl InstantiateRequest {
@@ -313,7 +316,9 @@ impl Default for InstantiateRequest {
stdin: Maybe::None,
stdout: Maybe::None,
stderr: Maybe::None,
use_tty: false,
port_redirections: Vec::new(),
enable_pf: false,
}
}
}
@@ -817,7 +822,7 @@ async fn fd_import(
Err(error) => {
return ipc_err(
EPERM,
&format!("cannot open file for write and create at {tempfile_path:?}: {error}")
&format!("cannot open file for write and create at {tempfile_path:?}: {error}"),
)
}
};
@@ -830,8 +835,8 @@ async fn fd_import(
error!("cannot create temporary zfs dataset at {tempdataset}: {error}");
return ipc_err(
EPERM,
&format!("cannot create temporary zfs dataset at {tempdataset}: {error}")
)
&format!("cannot create temporary zfs dataset at {tempdataset}: {error}"),
);
}
};
@@ -841,15 +846,15 @@ async fn fd_import(
error!("dataset {tempdataset} do not have a mountpoint");
return ipc_err(
ENOENT,
&format!("dataset {tempdataset} do not have a mountpoint")
)
},
&format!("dataset {tempdataset} do not have a mountpoint"),
);
}
Err(error) => {
error!("zfs exec failure: {error} when querying mountpoint for {tempdataset}");
return ipc_err(
EINVAL,
&format!("zfs exec failure: {error} when querying mountpoint for {tempdataset}")
)
&format!("zfs exec failure: {error} when querying mountpoint for {tempdataset}"),
);
}
};

View File

@@ -37,7 +37,6 @@ pub trait RegistriesProvider {
}
pub struct JsonRegistryProvider {
// file: std::fs::File,
path: std::path::PathBuf,
data: RegistriesJsonScheme,
}