This commit contains multiple pretty big changes:

- Adjust how ZFS image datasets are created
- Preliminary support for caching build layers by creating a cache directory that
  preserves the state of the files that may affect the generated layers
- Implement remove image and purge ipc calls
This commit is contained in:
Yan Ka, Chiu
2023-07-15 14:33:58 -04:00
parent ebbf75d1ab
commit d5555d2ee5
24 changed files with 489 additions and 162 deletions

View File

@@ -200,7 +200,27 @@ impl ZfsHandle {
Err(ZfsError::Generic(output.status, stderr.to_string()))
}
}
/// List the snapshots directly under `dataset` (depth 1), returning the
/// snapshot names exactly as printed by `zfs list -H -t snap -o name`.
pub fn list_snapshots(&self, dataset: impl AsRef<Path>) -> Result<Vec<PathBuf>> {
    let output = self.use_command_with_output(|cmd| {
        cmd.arg("list")
            .arg("-H")
            .arg("-t")
            .arg("snap")
            .arg("-o")
            .arg("name")
            .arg("-d")
            .arg("1")
            .arg(dataset.as_ref());
    })?;
    // Every successfully-read line is one snapshot name; read failures on
    // individual lines are skipped, matching the previous behavior.
    let snapshots = output.lines().flatten().map(PathBuf::from).collect();
    Ok(snapshots)
}
pub fn list_direct_children(&self, dataset: impl AsRef<Path>) -> Result<Vec<PathBuf>> {
let output = self.use_command_with_output(|cmd| {
cmd.arg("list")

View File

@@ -81,7 +81,7 @@ async fn main() -> Result<()> {
.expect("manifest not found");
let config_descriptor = manifest.config;
let config: FreeOciConfig<Value> = session
let config: Value = session
.fetch_blob_as(&config_descriptor.digest)
.await?
.expect("config not found");

View File

@@ -132,6 +132,14 @@ pub struct FreeOciConfig<T> {
pub os: String,
pub config: Option<T>,
pub rootfs: OciConfigRootFs,
pub history: Vec<Histroy>,
}
/// One entry of an OCI image config's `history` array.
///
/// NOTE(review): the type name is misspelled ("Histroy" instead of
/// "History"); renaming it would touch every use site (e.g. the
/// `history: Vec<Histroy>` field above), so it is only flagged here.
///
/// NOTE(review): the OCI image spec makes all history fields optional,
/// so configs that omit `created` or `created_by` would fail to
/// deserialize with these non-`Option` fields — confirm against the
/// registries this is expected to pull from before tightening/loosening.
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Debug)]
pub struct Histroy {
    // Timestamp of when the layer was created (string as stored in the
    // config; presumably RFC 3339 per the OCI spec — TODO confirm).
    created: String,
    // The command that produced the layer.
    created_by: String,
    // Set (usually to true) when this entry did not produce a filesystem
    // layer (e.g. ENV/LABEL-style directives).
    empty_layer: Option<bool>,
}
impl<T> FreeOciConfig<T> {

View File

@@ -90,7 +90,6 @@ impl<R: Read> Read for DigestReaderHandle<R> {
}
}
pub struct DigestSink<W: Write> {
sink: W,
digest: Rc<RefCell<Sha256>>,
@@ -100,10 +99,7 @@ pub struct DigestSink<W: Write> {
impl<T: Write> DigestSink<T> {
pub fn new<W: Write>(sink: W, digest: Rc<RefCell<Sha256>>) -> DigestSink<W> {
DigestSink {
sink,
digest,
}
DigestSink { sink, digest }
}
}
impl<W: Write> Write for DigestSink<W> {

View File

@@ -7,6 +7,9 @@ edition = "2021"
name = "xc"
path = "src/main.rs"
[dev-dependencies]
serial_test = "0.8.0"
[dependencies]
anyhow = "1"
clap = { version = "3", features = ["derive"] }

View File

@@ -52,6 +52,9 @@ pub(crate) enum ImageAction {
GetConfig {
image_id: String,
},
Remove {
image_id: ImageReference,
},
SetConfig {
image_id: String,
meta_path: String,
@@ -377,6 +380,9 @@ pub(crate) fn use_image_action(
}
}
}
ImageAction::Remove { image_id } => {
do_remove_image(conn, image_id)?;
}
ImageAction::SetConfig {
image_id,
meta_path,

View File

@@ -40,6 +40,9 @@ pub(crate) struct AddEnvDirective {
}
impl Directive for AddEnvDirective {
fn up_to_date(&self) -> bool {
true
}
fn from_action(action: &Action) -> Result<AddEnvDirective> {
let mut args = vec!["dummy".to_string()];
args.extend(action.args.clone());

View File

@@ -43,6 +43,9 @@ pub(crate) struct CopyDirective {
}
impl Directive for CopyDirective {
fn up_to_date(&self) -> bool {
true
}
fn from_action(action: &Action) -> Result<CopyDirective> {
if action.directive_name != "COPY" {
bail!("directive_name is not COPY")

View File

@@ -39,6 +39,11 @@ pub(crate) struct FromDirective {
}
impl Directive for FromDirective {
fn up_to_date(&self) -> bool {
// XXX: Technically, we should check against the image manifest digest to verify the same
// tag is still pointing to the same image, but hey, I'm lazy!
true
}
fn from_action(action: &Action) -> Result<FromDirective> {
if action.directive_name != "FROM" {
bail!("directive_name is not FROM");

View File

@@ -39,6 +39,7 @@ use xc::models::{EntryPoint, EnvSpec, MountSpec, SystemVPropValue};
pub(crate) trait Directive: Sized {
fn from_action(action: &Action) -> Result<Self, anyhow::Error>;
fn run_in_context(&self, context: &mut JailContext) -> Result<(), anyhow::Error>;
fn up_to_date(&self) -> bool;
}
#[derive(Clone, Debug, PartialEq, Eq)]
@@ -202,6 +203,10 @@ impl ConfigMod {
}
impl Directive for ConfigMod {
fn up_to_date(&self) -> bool {
true
}
fn from_action(action: &Action) -> Result<Self, anyhow::Error> {
match action.directive_name.as_str() {
"WORKDIR" => {

View File

@@ -95,6 +95,9 @@ pub(crate) struct RunDirective {
}
impl Directive for RunDirective {
fn up_to_date(&self) -> bool {
true
}
fn from_action(action: &Action) -> Result<RunDirective> {
let mut args_iter = action.args.iter();
let mut envs = HashMap::new();

View File

@@ -43,6 +43,9 @@ pub(crate) struct VolumeDirective {
name: Option<String>,
}
impl Directive for VolumeDirective {
fn up_to_date(&self) -> bool {
true
}
fn from_action(action: &Action) -> Result<VolumeDirective> {
if action.directive_name != "VOLUME" {
bail!("directive_name is not VOLUME")

View File

@@ -23,6 +23,7 @@
// SUCH DAMAGE.
pub mod directives;
pub mod parse;
pub mod statefile;
use ipc::packet::codec::{Fd, Maybe};
use oci_util::image_reference::ImageReference;

View File

@@ -142,6 +142,7 @@ enum Action {
no_print_header: bool,
format: Option<String>,
},
Purge,
Pull {
image_id: ImageReference,
local_reference: Option<ImageReference>,
@@ -438,6 +439,9 @@ fn main() -> Result<(), ActionError> {
Action::Network(action) => {
_ = use_network_action(&mut conn, action);
}
Action::Purge => {
do_purge(&mut conn, ())?.unwrap();
}
Action::Ps {
no_print_header,
format,

View File

@@ -100,7 +100,7 @@ macro_rules! impl_undos {
paste! {
#[doc = $doc]
#[doc = ""]
#[doc = "# Parameters"]
#[doc = "# Arguments"]
$(
#[doc = "* `" $arg "` - " $($adoc)? ""]
)*

View File

@@ -62,6 +62,8 @@ pub trait ImageStore {
fn register_manifest(&self, manifest: &JailImage) -> Result<OciDigest, ImageStoreError>;
fn purge_all_untagged_manifest(&self) -> Result<(), ImageStoreError>;
fn tag_manifest(
&self,
manifest: &OciDigest,

View File

@@ -74,7 +74,8 @@ impl SqliteImageStore {
create table if not exists image_manifests (
manifest text not null,
digest text not null primary key
digest text not null primary key,
origin text
);
create table if not exists image_manifest_tags (
@@ -86,13 +87,9 @@ impl SqliteImageStore {
references image_manifests(digest)
on delete cascade
);
create table if not exists commit_assoc (
commit_id text not null,
digest text not null
);
",
);
)?;
// only for people installed and used xc prior to July 6th, 2023
if self
.db
@@ -106,6 +103,15 @@ impl SqliteImageStore {
}
impl ImageStore for SqliteImageStore {
fn purge_all_untagged_manifest(&self) -> Result<(), ImageStoreError> {
self.db.execute(
"
delete from image_manifests where digest not in (select digest from image_manifest_tags)
",
[],
)?;
Ok(())
}
fn query_diff_id(&self, digest: &OciDigest) -> Result<Option<DiffIdMap>, ImageStoreError> {
let mut stmt = self.db.prepare_cached(
"

View File

@@ -193,6 +193,7 @@ impl Default for JailImage {
typ: "layers".to_string(),
diff_ids: Vec::new(),
},
history: Vec::new(),
})
}
}
@@ -242,6 +243,7 @@ impl JailConfig {
typ: "layers".to_string(),
diff_ids,
},
history: Vec::new(),
})
}
pub fn from_json(value: serde_json::Value) -> Option<JailImage> {

View File

@@ -38,6 +38,23 @@ where
Ok(opt.unwrap_or_default())
}
/// Compute the SHA-256 digest of the file at `path`, returning the raw
/// 32-byte digest.
///
/// NOTE(review): despite the `_hex_` in the name this returns raw digest
/// bytes, not a hex string — confirm that callers expect bytes.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or read.
pub fn sha256_hex_file_r_bytes(path: impl AsRef<Path>) -> Result<[u8; 32], anyhow::Error> {
    use sha2::{Digest, Sha256};
    use std::io::Read;
    let mut file = std::fs::OpenOptions::new().read(true).open(path.as_ref())?;
    let mut buf = [0u8; 4096];
    let mut hasher = Sha256::new();
    // Read until EOF instead of trusting a pre-read metadata size: the
    // previous `remaining -= nread` loop could spin forever (or underflow)
    // if the file shrank while being hashed, and silently skipped any bytes
    // appended after the `metadata()` call. A zero-length read means EOF.
    loop {
        let nread = file.read(&mut buf)?;
        if nread == 0 {
            break;
        }
        hasher.update(&buf[..nread]);
    }
    Ok(hasher.finalize().into())
}
#[derive(Debug, Clone)]
pub enum PathComp {
RootDir,
@@ -349,7 +366,7 @@ mod tests {
let root = "/usr/home/yuuji/test_xc_realpath/root";
let path = "/a/b/c";
let expected = "/usr/home/yuuji/test_xc_realpath/root/usr";
let result = realpath(root, path, 16);
let result = realpath(root, path);
eprintln!("result: {result:#?}");
assert!(false);
// assert_eq!(result, Ok(Some(std::path::PathBuf::from(expected))))

View File

@@ -39,7 +39,7 @@ use crate::util::TwoWayMap;
use anyhow::Context;
use freebsd::fs::zfs::{ZfsHandle, ZfsSnapshot};
use freebsd::net::pf;
use oci_util::digest::OciDigest;
use oci_util::digest::{DigestAlgorithm, OciDigest};
use oci_util::image_reference::ImageReference;
use oci_util::layer::ChainId;
use std::collections::HashMap;
@@ -170,6 +170,94 @@ impl ServerContext {
self.sites.get(id).cloned()
}
// XXX: Potential race condition when trying to import/commit/pull images during purge
/// Remove all untagged manifests, then garbage-collect layer archive files
/// and image datasets that are no longer reachable from any tagged image.
///
/// # Errors
///
/// Returns an error if the image store, the layers directory, or the ZFS
/// handle cannot be queried; dataset destruction failures are ignored
/// (best-effort), matching the previous behavior.
pub(crate) async fn purge_images(&self) -> anyhow::Result<()> {
    let config = self.config();
    let layers_dir = &config.layers_dir;
    let im = self.image_manager.read().await;
    _ = im.purge().await?;
    // Candidate files: every entry in layers_dir whose name parses as an
    // OCI digest. Anything still referenced by a tagged image is removed
    // from this set below; the remainder is deleted.
    let files = std::fs::read_dir(&layers_dir).and_then(|dir| {
        let mut files = Vec::new();
        for entry in dir {
            let entry = entry?;
            if entry.file_type()?.is_file() {
                if let Some(digest) = entry
                    .file_name()
                    .to_str()
                    .and_then(|s| s.parse::<OciDigest>().ok())
                {
                    files.push(digest);
                }
            }
        }
        Ok(files)
    })?;
    let zfs = ZfsHandle::default();
    // Candidate datasets: direct children of the image dataset whose names
    // parse as OCI chain ids.
    let chain_ids = zfs
        .list_direct_children(&config.image_dataset)?
        .into_iter()
        .filter_map(|pb| {
            pb.file_name()
                .and_then(|oss| oss.to_str())
                .and_then(|s| s.parse::<ChainId>().ok())
        });
    let mut file_set: std::collections::HashSet<OciDigest> =
        std::collections::HashSet::from_iter(files.into_iter());
    let mut chain_id_set: std::collections::HashSet<ChainId> =
        std::collections::HashSet::from_iter(chain_ids);
    // Drop everything still reachable from a tagged image out of the
    // candidate sets.
    let records = im.list_all_tagged().await?;
    for record in records.iter() {
        if !file_set.is_empty() {
            let files = record.manifest.layers();
            for file in files.iter() {
                for repr in im.query_archives(file).await?.iter() {
                    file_set.remove(&repr.archive_digest);
                    file_set.remove(&repr.diff_id);
                }
            }
        }
        if !chain_id_set.is_empty() {
            if let Some(cid) = record.manifest.chain_id() {
                chain_id_set.remove(&cid);
                // Walk up the ZFS clone-origin chain and keep every ancestor
                // dataset alive as well. The previous implementation queried
                // the properties of the top dataset once and then looped on a
                // condition that never changed, spinning forever whenever an
                // origin parsed as a chain id; re-query per ancestor instead.
                // NOTE(review): the chain id is parsed from the part of the
                // `origin` property after '@' (the snapshot name) — confirm
                // snapshots of image datasets are actually tagged with chain
                // ids; this mirrors the original extraction exactly.
                let mut current = cid;
                loop {
                    let props =
                        zfs.get_props(format!("{}/{current}", config.image_dataset))?;
                    let parent = match props.get("origin") {
                        Some(Some(origin)) => origin
                            .split_once('@')
                            .and_then(|(_, c)| ChainId::from_str(c).ok()),
                        _ => None,
                    };
                    match parent {
                        Some(parent) => {
                            chain_id_set.remove(&parent);
                            current = parent;
                        }
                        None => break,
                    }
                }
            }
        }
    }
    // Whatever survives in the candidate sets is unreachable: delete it.
    for garbage in file_set.iter() {
        std::fs::remove_file(format!("{layers_dir}/{garbage}"))?;
    }
    for chain_id in chain_id_set.iter() {
        _ = zfs.destroy(
            format!("{}/{chain_id}", config.image_dataset),
            false,
            false,
            false,
        );
    }
    Ok(())
}
pub async fn resolve_image(
&self,
name: impl AsRef<str>,
@@ -260,51 +348,17 @@ impl ServerContext {
}
ret
}
pub(crate) async fn do_commit_file(
&mut self,
container_name: &str,
file_fd: RawFd,
) -> Result<OciDigest, anyhow::Error> {
let file = unsafe { std::fs::File::from_raw_fd(file_fd) };
let container = self
.resolve_container_by_name(container_name)
.await
.expect("no such container");
let container_id = container.id;
let commit_id = xc::util::gen_id();
let config = self.config();
let running_dataset = format!("{}/{}", config.container_dataset, container_id);
let dst_dataset = format!("{}/{}", config.container_dataset, commit_id);
let zfs_origin = container.zfs_origin.expect("missing zfs origin");
let zfs = ZfsHandle::default();
zfs.snapshot2(&running_dataset, &commit_id).unwrap();
debug!("taking zfs snapshot for {dst_dataset}@xc");
let child = std::process::Command::new("ocitar")
.arg("-cf-")
.arg("--write-to-stderr")
.stdout(file)
.stderr(Stdio::piped())
.arg("--zfs-diff")
.arg(format!("{zfs_origin}@xc"))
.arg(format!("{running_dataset}@{commit_id}"))
.spawn()
.expect("fail to spawn ocitar");
let output = child.wait_with_output()?;
let (diff_id, _digest) = {
let mut results = std::str::from_utf8(&output.stdout).unwrap().trim().lines();
let diff_id = results.next().expect("unexpected output");
let digest = results.next().expect("unexpected output");
eprintln!("diff_id: {diff_id}");
(
OciDigest::from_str(diff_id).unwrap(),
OciDigest::from_str(digest).unwrap()
)
};
zfs.destroy(format!("{running_dataset}@{commit_id}"), true, true, true)?;
let site = self.get_site(container_name).context("no surch site")?;
let mut site = site.write().await;
let snapshot = site.snapshot_with_generated_tag()?;
let (diff_id, _digest) = site.commit_to_file("init", &snapshot, file)?;
Ok(diff_id)
}
@@ -313,88 +367,121 @@ impl ServerContext {
container_name: &str,
name: &str,
tag: &str,
) -> Result<String, anyhow::Error> {
let container = self
.resolve_container_by_name(container_name)
.await
.expect("no such container");
let container_id = container.id.to_string();
let commit_id = xc::util::gen_id();
) -> Result<OciDigest, anyhow::Error> {
let config = self.config();
let layers_dir = &config.layers_dir;
let temp_file = format!("{layers_dir}/{commit_id}");
//
let running_dataset = format!("{}/{}", config.container_dataset, container_id);
let dst_dataset = format!("{}/{}", config.container_dataset, commit_id);
let zfs_origin = container.zfs_origin.clone().expect("missing zfs origin");
let zfs = ZfsHandle::default();
let mut manifest = container
.origin_image
.clone()
.expect("missing origin image");
{
zfs.snapshot2(running_dataset.clone(), &commit_id.to_string())
.unwrap();
zfs.clone2(
running_dataset.clone(),
&commit_id.to_string(),
dst_dataset.clone(),
)
.unwrap();
zfs.promote(dst_dataset.clone()).unwrap();
}
let snapshot = ZfsSnapshot::new(&dst_dataset, "xc");
debug!("taking zfs snapshot for {dst_dataset}@xc");
snapshot.execute(&zfs)?;
let prev_chain_id = zfs_origin.rsplit_once('/').expect("").1;
debug!("prev_chain_id: {prev_chain_id:#?}");
let mut chain_id = ChainId::from_str(prev_chain_id)?;
let output = std::process::Command::new("ocitar")
.arg("-cf")
.arg(temp_file.clone())
.arg("--compression")
.arg("zstd")
.arg("--zfs-diff")
.arg(format!("{zfs_origin}@xc"))
.arg(format!("{dst_dataset}@xc"))
.output()
.expect("fail to spawn ocitar");
let (diff_id, digest) = {
let mut results = std::str::from_utf8(&output.stdout).unwrap().trim().lines();
let diff_id = results.next().expect("unexpected output");
let digest = results.next().expect("unexpected output");
eprintln!("diff_id: {diff_id}");
eprintln!("rename: {temp_file} -> {}/{digest}", config.layers_dir);
std::fs::rename(temp_file, format!("{}/{digest}", config.layers_dir))?;
(
OciDigest::from_str(diff_id).unwrap(),
OciDigest::from_str(digest).unwrap()
)
};
chain_id.consume_diff_id(oci_util::digest::DigestAlgorithm::Sha256, &diff_id);
let new_name = format!("{}/{chain_id}", config.image_dataset);
zfs.rename(&dst_dataset, new_name)?;
manifest.push_layer(&diff_id);
let commit_id = xc::util::gen_id();
let site = self.get_site(container_name).context("no such site")?;
let mut site = site.write().await;
let snapshot = site.snapshot_with_generated_tag()?;
let temp_file_path = format!("{layers_dir}/{commit_id}");
let temp_file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.open(&temp_file_path)?;
let (diff_id, digest) = site.commit_to_file("init", &snapshot, temp_file)?;
// XXX: otherwise default to null image
let mut image = site.container_dump().and_then(|c| c.origin_image).unwrap();
image.push_layer(&diff_id);
let chain_id = image.chain_id().unwrap();
let dst_dataset = format!("{}/{chain_id}", config.image_dataset);
site.promote_snapshot(&snapshot, &dst_dataset)?;
site.zfs.snapshot2(&dst_dataset, "xc")?;
let context = self.image_manager.read().await;
_ = context
.register_and_tag_manifest(name, tag, &manifest)
.await;
_ = context.register_and_tag_manifest(name, &tag, &image).await;
context.map_diff_id(&diff_id, &digest, "zstd").await?;
Ok(commit_id)
std::fs::rename(&temp_file_path, format!("{layers_dir}/{digest}"))?;
Ok(diff_id)
}
/*
pub(crate) async fn do_commit(
&mut self,
container_name: &str,
name: &str,
tag: &str,
) -> Result<String, anyhow::Error> {
let container = self
.resolve_container_by_name(container_name)
.await
.expect("no such container");
let container_id = container.id.to_string();
let commit_id = xc::util::gen_id();
let config = self.config();
let layers_dir = &config.layers_dir;
let temp_file = format!("{layers_dir}/{commit_id}");
//
let running_dataset = format!("{}/{}", config.container_dataset, container_id);
let dst_dataset = format!("{}/{}", config.container_dataset, commit_id);
let zfs_origin = container.zfs_origin.clone().expect("missing zfs origin");
let zfs = ZfsHandle::default();
let mut manifest = container
.origin_image
.clone()
.expect("missing origin image");
{
zfs.snapshot2(running_dataset.clone(), &commit_id.to_string())
.unwrap();
zfs.clone2(
running_dataset.clone(),
&commit_id.to_string(),
dst_dataset.clone(),
)
.unwrap();
zfs.promote(dst_dataset.clone()).unwrap();
}
let snapshot = ZfsSnapshot::new(&dst_dataset, "xc");
debug!("taking zfs snapshot for {dst_dataset}@xc");
snapshot.execute(&zfs)?;
let prev_chain_id = zfs_origin.rsplit_once('/').expect("").1;
debug!("prev_chain_id: {prev_chain_id:#?}");
let mut chain_id = ChainId::from_str(prev_chain_id)?;
let output = std::process::Command::new("ocitar")
.arg("-cf")
.arg(temp_file.clone())
.arg("--compression")
.arg("zstd")
.arg("--zfs-diff")
.arg(format!("{zfs_origin}@xc"))
.arg(format!("{dst_dataset}@xc"))
.output()
.expect("fail to spawn ocitar");
let (diff_id, digest) = {
let mut results = std::str::from_utf8(&output.stdout).unwrap().trim().lines();
let diff_id = results.next().expect("unexpected output");
let digest = results.next().expect("unexpected output");
eprintln!("diff_id: {diff_id}");
eprintln!("rename: {temp_file} -> {}/{digest}", config.layers_dir);
std::fs::rename(temp_file, format!("{}/{digest}", config.layers_dir))?;
(
OciDigest::from_str(diff_id).unwrap(),
OciDigest::from_str(digest).unwrap()
)
};
chain_id.consume_diff_id(oci_util::digest::DigestAlgorithm::Sha256, &diff_id);
let new_name = format!("{}/{chain_id}", config.image_dataset);
zfs.rename(&dst_dataset, new_name)?;
manifest.push_layer(&diff_id);
let context = self.image_manager.read().await;
_ = context
.register_and_tag_manifest(name, tag, &manifest)
.await;
context.map_diff_id(&diff_id, &digest, "zstd").await?;
Ok(commit_id)
}
*/
pub(crate) async fn do_rdr(
&mut self,
name: &str,

View File

@@ -48,9 +48,10 @@ use xc::image_store::{DiffIdMap, ImageRecord, ImageStore, ImageStoreError};
use xc::models::jail_image::JailImage;
use xc::tasks::{DownloadLayerStatus, ImportImageState, ImportImageStatus};
struct DiffMap {
diff_id: OciDigest,
descriptor: Descriptor,
#[derive(Clone)]
pub struct DiffMap {
pub diff_id: OciDigest,
pub descriptor: Descriptor,
}
/// Shared environment accessible to workers
@@ -169,19 +170,6 @@ impl ImageManager {
.map_diff_id(diff_id, archive, content_type)
}
#[allow(dead_code)]
pub async fn associate_commit_manifest(
&self,
commit_id: &str,
manifest: &JailImage,
) -> Result<(), ImageStoreError> {
self.context
.image_store
.lock()
.await
.associate_commit_manifest(commit_id, manifest)
}
pub async fn query_archives(
&self,
diff_id: &OciDigest,
@@ -193,6 +181,19 @@ impl ImageManager {
.query_archives(diff_id)
}
pub async fn untag_image(&self, name: &str, tag: &str) -> Result<(), ImageStoreError> {
self.context.image_store.lock().await.untag(name, tag)
}
pub async fn purge(&self) -> Result<(), ImageStoreError> {
self.context
.image_store
.lock()
.await
.purge_all_untagged_manifest()?;
Ok(())
}
pub fn get_upload_state(&mut self, id: &str) -> PushImageStatusDesc {
match self.push_image.get(&id.to_string()) {
None => PushImageStatusDesc::default(),
@@ -436,13 +437,14 @@ impl ImageManager {
/// desired rootfs
#[derive(Clone, Debug)]
pub struct RootFsRecipe {
chain_id: ChainId,
/// expected chain_id
pub chain_id: ChainId,
/// The dataset to be cloned
source: Option<ChainId>,
pub source: Option<ChainId>,
/// The filesystem layers to be extracted at the cloned root
digests: Vec<Descriptor>, // digests: Vec<OciDigest>,
pub digests: Vec<Descriptor>, // digests: Vec<OciDigest>,
diff_ids: Vec<OciDigest>,
pub diff_ids: Vec<OciDigest>,
}
/// Given a dataset, get all the children datasets that have their name conform to the OCI chain id
@@ -543,7 +545,10 @@ impl RootFsRecipe {
if !handle.exists(&source_dataset) {
return Err(StageLayerError::SourceDatasetNotFound(id.clone()));
}
handle.clone2(&source_dataset, "xc", &target_dataset)?;
// TODO: rollback the dataset first
handle.snapshot2(&source_dataset, "xc2")?;
handle.clone2(&source_dataset, "xc2", &target_dataset)?;
handle.promote(&target_dataset)?;
} else {
debug!("creating new dataset as no ancestors found");
handle.create2(&target_dataset, false, false)?;

View File

@@ -421,6 +421,34 @@ async fn describe_images(
Ok(rows)
}
// IPC handler: untag the requested image reference. The underlying
// manifest is not deleted here — it becomes eligible for deletion by
// `purge` once it has no remaining tags.
#[ipc_method(method = "remove_image")]
async fn remove_image(
    context: Arc<RwLock<ServerContext>>,
    local_context: &mut ConnectionContext<Variables>,
    request: ImageReference,
) -> GenericResult<()> {
    // XXX: Handle @<digest> tag
    // NOTE(review): the trailing `unwrap()` turns an image-store failure
    // into a server panic — consider mapping it into the GenericResult
    // error path instead.
    context
        .read()
        .await
        .image_manager
        .read()
        .await
        .untag_image(&request.name, &request.tag.to_string())
        .await
        .unwrap();
    Ok(())
}
// IPC handler: run a server-wide image purge (untagged manifests, orphaned
// layer files, and orphaned image datasets) via
// `ServerContext::purge_images`.
#[ipc_method(method = "purge")]
async fn purge(
    context: Arc<RwLock<ServerContext>>,
    local_context: &mut ConnectionContext<Variables>,
    request: (),
) -> GenericResult<()> {
    // NOTE(review): `unwrap()` panics the server on any purge failure —
    // consider propagating the error through GenericResult instead.
    context.read().await.purge_images().await.unwrap();
    Ok(())
}
#[derive(Serialize, Deserialize, Debug)]
pub struct ListNetworkRequest {}
@@ -729,13 +757,12 @@ async fn commit_container(
) -> GenericResult<CommitResponse> {
let mut ctx = context.write().await;
let result = if let Maybe::Some(fd) = request.alt_out {
ctx.do_commit_file(&request.container_name, fd.0)
.await
.map(|a| a.to_string())
ctx.do_commit_file(&request.container_name, fd.0).await
} else {
ctx.do_commit(&request.container_name, &request.name, &request.tag)
.await
};
}
.map(|s| s.to_string());
match result {
Ok(commit_id) => {
let response = CommitResponse { commit_id };
@@ -967,6 +994,8 @@ pub(crate) async fn register_to_service(
service: &mut Service<tokio::sync::RwLock<ServerContext>, Variables>,
) {
service.register_event_delegate(on_channel_closed).await;
service.register(purge).await;
service.register(remove_image).await;
service.register(create_channel).await;
service.register(exec).await;
service.register(fd_import).await;

View File

@@ -26,6 +26,7 @@ mod config_manager;
mod context;
mod devfs_store;
mod image;
mod layer_manager;
mod network_manager;
mod port;
mod registry;

View File

@@ -22,15 +22,21 @@
// OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
// SUCH DAMAGE.
use crate::context::instantiate::InstantiateBlueprint;
use anyhow::{anyhow, bail, Context};
use freebsd::event::{EventFdNotify, Notify};
use freebsd::fs::zfs::ZfsHandle;
use ipc::packet::Packet;
use ipc::proto::Request;
use ipc::transport::PacketTransport;
use oci_util::digest::OciDigest;
use std::ffi::OsString;
use std::fs::File;
use std::os::fd::{FromRawFd, RawFd};
use std::os::unix::net::UnixStream;
use std::path::Path;
use std::process::Stdio;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::watch::Receiver;
use tracing::{error, info};
@@ -52,9 +58,13 @@ pub struct Site {
id: String,
undo: UndoStack,
config: Receiver<XcConfig>,
zfs: ZfsHandle,
pub(crate) zfs: ZfsHandle,
root: Option<OsString>,
/// The dataset contains the root of the container
pub(crate) root_dataset: Option<String>,
/// The dataset where the root dataset cloned from
zfs_origin: Option<String>,
zfs_snapshots: Vec<String>,
container: Option<Receiver<ContainerManifest>>,
notify: Arc<Notify>,
pub main_notify: Option<Arc<EventFdNotify>>,
@@ -88,7 +98,9 @@ impl Site {
config,
zfs: ZfsHandle::default(),
root: None,
root_dataset: None,
zfs_origin: None,
zfs_snapshots: Vec::new(),
container: None,
notify: Arc::new(Notify::new()),
main_notify: None,
@@ -110,6 +122,108 @@ impl Site {
}
}
/// Clone a snapshot of this site's root dataset to `dst_dataset` and promote
/// the clone; any earlier snapshots become snapshots of the promoted dataset.
///
/// # Arguments
///
/// * `tag` - the snapshot tag to promote; must have been recorded by
///   [`Site::snapshot`]
/// * `dst_dataset` - the dataset the promoted snapshot should become
///
/// Silently succeeds without doing anything when the container is not
/// backed by a ZFS root dataset.
pub fn promote_snapshot(
    &mut self,
    tag: &str,
    dst_dataset: impl AsRef<Path>,
) -> anyhow::Result<()> {
    let Some(root_dataset) = &self.root_dataset else {
        return Ok(());
    };
    // Refuse to promote a tag we never took.
    self.zfs_snapshots
        .iter()
        .position(|s| s == tag)
        .context("no such snapshot")?;
    self.zfs.clone2(root_dataset, &tag, dst_dataset.as_ref())?;
    self.zfs.promote(dst_dataset.as_ref())?;
    // The promotion transferred our snapshots to the new dataset, so our
    // local bookkeeping is now stale — forget all of them.
    self.zfs_snapshots.clear();
    Ok(())
}
/// Take a snapshot of the root dataset under a freshly generated tag that
/// does not collide with any existing snapshot tag, returning the tag.
pub fn snapshot_with_generated_tag(&mut self) -> anyhow::Result<String> {
    // Keep generating ids until one is unused as a snapshot tag.
    let tag = loop {
        let candidate = xc::util::gen_id();
        if !self.zfs_snapshots.contains(&candidate) {
            break candidate;
        }
    };
    self.snapshot(&tag)?;
    Ok(tag)
}
/// Take a snapshot of this site's root dataset under `tag` and record the
/// tag. Fails on a duplicate tag; silently no-ops when the container is not
/// backed by a ZFS root dataset.
pub fn snapshot(&mut self, tag: &str) -> anyhow::Result<()> {
    if self.zfs_snapshots.iter().any(|existing| existing == tag) {
        bail!("duplicate tag");
    }
    if let Some(root_dataset) = &self.root_dataset {
        self.zfs.snapshot2(&root_dataset.to_string(), tag)?;
        self.zfs_snapshots.push(tag.to_string());
    }
    Ok(())
}
pub fn commit_to_file(
&mut self,
start_tag: &str,
end_tag: &str,
file: File,
) -> anyhow::Result<(OciDigest, OciDigest)> {
let Some(root_dataset) = &self.root_dataset else {
bail!("container is not backed by zfs");
};
let mut contains_start_tag = false;
let mut contains_end_tag = false;
for tag in self.zfs_snapshots.iter() {
if tag == start_tag {
contains_start_tag = true;
} else if tag == end_tag {
if !contains_start_tag {
bail!("end tag detected before start tag");
}
contains_end_tag = true;
}
}
if !contains_start_tag {
bail!("no such start tag");
}
if !contains_end_tag {
bail!("no such end tag");
}
let output = std::process::Command::new("ocitar")
.arg("-cf-")
.arg("--write-to-stderr")
.stdout(file)
.arg("--compression")
.arg("zstd")
.stderr(Stdio::piped())
.arg("--zfs-diff")
.arg(format!("{root_dataset}@{start_tag}"))
.arg(format!("{root_dataset}@{end_tag}"))
.output()
.context("cannot spawn ocitar")?;
let (diff_id, digest) = {
let mut results = std::str::from_utf8(&output.stderr).unwrap().trim().lines();
let diff_id = results.next().expect("unexpceted output");
let digest = results.next().expect("unexpected output");
(
OciDigest::from_str(diff_id).unwrap(),
OciDigest::from_str(digest).unwrap(),
)
};
Ok((diff_id, digest))
}
pub fn unwind(&mut self) -> anyhow::Result<()> {
self.undo.pop_all().context("failure on undo")?;
self.state = SiteState::Terminated;
@@ -259,11 +373,8 @@ impl Site {
pub fn stage(&mut self, oci_config: &JailImage) -> anyhow::Result<()> {
if let SiteState::Empty = self.state {
guard!(self, {
let (root, zfs_origin) = self
.create_rootfs(oci_config)
self.create_rootfs(oci_config)
.context("cannot create root file system")?;
self.root = Some(root);
self.zfs_origin = zfs_origin;
self.state = SiteState::RootFsOnly;
Ok(())
})
@@ -272,7 +383,7 @@ impl Site {
}
}
fn create_rootfs(&mut self, image: &JailImage) -> anyhow::Result<(OsString, Option<String>)> {
fn create_rootfs(&mut self, image: &JailImage) -> anyhow::Result<()> {
let config = self.config.borrow().clone();
let image_dataset = config.image_dataset;
let container_dataset = config.container_dataset;
@@ -298,12 +409,19 @@ impl Site {
.context("while cloning dataset for container")?;
}
}
let mount_point = self
.zfs
.mount_point(dest_dataset.clone())
.with_context(|| format!("cannot get mount point for {dest_dataset}"))?
.with_context(|| format!("dataset {dest_dataset} does not have a mount point"))?
.into_os_string();
Ok((mount_point, zfs_origin))
self.root_dataset = Some(dest_dataset);
self.root = Some(mount_point);
self.zfs_origin = zfs_origin;
self.snapshot("init").context("fail on initial snapshot")?;
Ok(())
}
}