diff --git a/xcd/src/image/mod.rs b/xcd/src/image/mod.rs new file mode 100644 index 0000000..bfb0a74 --- /dev/null +++ b/xcd/src/image/mod.rs @@ -0,0 +1,596 @@ +// Copyright (c) 2023 Yan Ka, Chiu. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// 1. Redistributions of source code must retain the above copyright +// notice, this list of conditions, and the following disclaimer, +// without modification, immediately at the beginning of the file. +// 2. The name of the author may not be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE FOR +// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +// OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +// OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +// SUCH DAMAGE. 
// NOTE(review): this file was recovered from a mangled patch. Generic type
// parameters inside `<...>` have been stripped throughout (e.g. `Arc>>`,
// `NotificationStore`, `Receiver>`, `collect::>()`, `impl AsRef`). The
// original type arguments must be restored from upstream before this
// module will compile; the token stream below is otherwise unchanged.

pub mod pull;
pub mod push;

use crate::registry::*;
use crate::task::*;
use self::pull::*;
use self::push::*;

use freebsd::fs::zfs::{ZfsError, ZfsHandle};
use oci_util::digest::{DigestAlgorithm, Hasher, OciDigest};
use oci_util::distribution::client::*;
use oci_util::layer::ChainId;
use oci_util::models::Descriptor;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::watch::Receiver;
use tokio::sync::Mutex;
use tracing::{debug, info};
use xc::image_store::sqlite::SqliteImageStore;
use xc::image_store::{DiffIdMap, ImageRecord, ImageStore, ImageStoreError};
use xc::models::jail_image::JailImage;
use xc::tasks::{DownloadLayerStatus, ImportImageState, ImportImageStatus};

/// Pairs a layer's uncompressed diff-id with the OCI descriptor of the
/// (possibly compressed) archive that produces it.
struct DiffMap {
    // Digest of the uncompressed layer content (the "diff id").
    diff_id: OciDigest,
    // OCI descriptor of the blob as stored/fetched from the registry.
    descriptor: Descriptor,
}

/// Shared environment accessible to workers
#[derive(Clone)]
pub(self) struct SharedContext {
    // Handle to the SQLite-backed image store; shared behind a tokio Mutex
    // so it can be held across `.await` points.
    image_store: Arc>>,
    // Registry provider, consulted by the pull/push workers.
    registries: Arc>>,
    // ZFS dataset under which per-chain-id rootfs datasets are created.
    image_dataset: PathBuf,
    // Directory holding downloaded layer archive files, named by digest.
    layers_dir: PathBuf,
}

impl SharedContext {
    /// Build a `SharedContext`, converting the path-like dataset/layers-dir
    /// arguments into owned `PathBuf`s.
    fn new(
        image_store: Arc>>,
        image_dataset: impl AsRef,
        layers_dir: impl AsRef,
        registries: Arc>>,
    ) -> SharedContext {
        SharedContext {
            image_store,
            image_dataset: image_dataset.as_ref().to_path_buf(),
            layers_dir: layers_dir.as_ref().to_path_buf(),
            registries,
        }
    }
}

/// Coordinates image pull/push work: tracks in-flight layer downloads,
/// rootfs staging, whole-image imports, and pushes via notification stores
/// keyed by digest / chain-id / task id.
pub struct ImageManager {
    context: SharedContext,
    // In-flight layer downloads, keyed by layer digest.
    layers: NotificationStore,
    // In-flight rootfs staging tasks, keyed by chain-id.
    rootfs: NotificationStore,
    // In-flight image import tasks, keyed by task id.
    images: NotificationStore,
    // In-flight image push tasks, keyed by task id.
    push_image: NotificationStore,
}

impl ImageManager {
    /// Create a manager whose notification stores all share one
    /// `SharedContext` (store, registries, dataset, layers dir).
    pub fn new(
        image_store: Arc>>,
        image_dataset: impl AsRef,
        layers_dir: impl AsRef,
        registries: Arc>>,
    ) -> ImageManager {
        let shared_context = SharedContext::new(image_store, image_dataset, layers_dir, registries);
        ImageManager {
            layers: NotificationStore::new(shared_context.clone()),
            rootfs: NotificationStore::new(shared_context.clone()),
            images: NotificationStore::new(shared_context.clone()),
            push_image: NotificationStore::new(shared_context.clone()),
            context: shared_context,
        }
    }

    /// Delegate: register `manifest` in the image store and tag it `name:tag`.
    pub async fn register_and_tag_manifest(
        &self,
        name: &str,
        tag: &str,
        manifest: &JailImage,
    ) -> Result {
        self.context
            .image_store
            .lock()
            .await
            .register_and_tag_manifest(name, tag, manifest)
    }

    /// Delegate: look up the manifest record for `name:tag`.
    pub async fn query_manifest(
        &self,
        name: &str,
        tag: &str,
    ) -> Result {
        self.context
            .image_store
            .lock()
            .await
            .query_manifest(name, tag)
    }

    /// Delegate: list every tag recorded for image `name`.
    pub async fn query_tags(&self, name: &str) -> Result, ImageStoreError> {
        self.context.image_store.lock().await.list_all_tags(name)
    }

    /// Delegate: list every tagged image record in the store.
    pub async fn list_all_tagged(&self) -> Result, ImageStoreError> {
        self.context.image_store.lock().await.list_all_tagged()
    }

    /// Delegate: find image records referencing container commit `commit_id`.
    #[allow(dead_code)]
    pub async fn query_records_using_commit(
        &self,
        commit_id: &str,
    ) -> Result, ImageStoreError> {
        self.context
            .image_store
            .lock()
            .await
            .query_records_using_commit(commit_id)
    }

    /// Delegate: record that archive `archive` (of `content_type`) expands to
    /// the uncompressed layer `diff_id`.
    pub async fn map_diff_id(
        &self,
        diff_id: &OciDigest,
        archive: &OciDigest,
        content_type: &str,
    ) -> Result<(), ImageStoreError> {
        self.context
            .image_store
            .lock()
            .await
            .map_diff_id(diff_id, archive, content_type)
    }

    /// Delegate: associate a container commit with a manifest.
    #[allow(dead_code)]
    pub async fn associate_commit_manifest(
        &self,
        commit_id: &str,
        manifest: &JailImage,
    ) -> Result<(), ImageStoreError> {
        self.context
            .image_store
            .lock()
            .await
            .associate_commit_manifest(commit_id, manifest)
    }

    /// Delegate: list the known archives whose expansion is `diff_id`.
    pub async fn query_archives(
        &self,
        diff_id: &OciDigest,
    ) -> Result, ImageStoreError> {
        self.context
            .image_store
            .lock()
            .await
            .query_archives(diff_id)
    }

    /// Report the status of the push task identified by `id`; returns the
    /// default (empty) descriptor when no such task is registered.
    pub fn get_upload_state(&mut self, id: &str) -> PushImageStatusDesc {
        match self.push_image.get(&id.to_string()) {
            None => PushImageStatusDesc::default(),
            Some(value) => {
                // Snapshot the last reported state and attach any fault
                // recorded by the worker before converting to the
                // client-facing descriptor.
                let mut status = value.borrow().last_state.clone();
                status.fault = value.borrow().fault();
                status.to_desc()
            }
        }
    }

    /// Given an id representing a import image task, try query its download status. If the
    /// task is on-going, walk through the sub-task required by this download task and report
    /// the status of each
    ///
    pub fn get_download_state(&mut self, id: &str) -> ImportImageStatus {
        match self.images.get(&id.to_string()) {
            None => ImportImageStatus::unavailable(),
            Some(value) => {
                // If the value is in the images notification store, this means we have
                // ended the end state of the state machine, which means either downloaded
                // the image, or we encountered a fault
                let completed = value.borrow().is_completed();
                let val = value.borrow().last_state.clone();

                let mut status = ImportImageStatus {
                    manifest: val.manifest,
                    config: val.config,
                    fault: value.borrow().fault(),
                    ..ImportImageStatus::default()
                };

                // Derive the coarse state from what has been fetched so far:
                // manifest -> config -> per-layer downloads -> extraction -> done.
                let state = if completed {
                    ImportImageState::Done
                } else if status.config.is_some() {
                    // Collect per-layer progress for every layer of the
                    // manifest that is still downloading.
                    let mut dls = Vec::new();
                    for desc in status.manifest.clone().unwrap().layers.iter() {
                        if let Some(c) = self.layers.get(&desc.digest) {
                            let t = c.borrow().is_completed();
                            let v = c.borrow().last_state.clone();
                            if !t {
                                dls.push(DownloadLayerStatus {
                                    digest: desc.digest.clone(),
                                    downloaded: v.written,
                                    total: Some(v.total),
                                })
                            }
                        }
                    }
                    if dls.is_empty() {
                        // NOTE(review): `completed` is necessarily false on
                        // this branch (handled above), so this inner `Done`
                        // arm looks unreachable — confirm intent upstream.
                        if completed {
                            ImportImageState::Done
                        } else {
                            ImportImageState::ExtractLayers
                        }
                    } else {
                        status.layers = Some(dls);
                        ImportImageState::DownloadLayers
                    }
                } else if status.manifest.is_some() {
                    ImportImageState::DownloadConfig
                } else {
                    ImportImageState::DownloadManifest
                };

                status.state = state;
                status
            }
        }
    }

    /// Start (or join) the download of one layer blob.
    ///
    /// If a task for `descriptor.digest` is already registered, its watch
    /// receiver is returned as-is. Otherwise a new task is registered and a
    /// tokio task is spawned that:
    ///   1. streams the blob via `session.fetch_blob`,
    ///   2. tees the raw bytes into `<layers_dir>/<digest>.progress` while a
    ///      shell pipeline (`tee | {gzcat|zstdcat|cat} | sha256 -q`) computes
    ///      the digest of the *uncompressed* content (the diff-id),
    ///   3. records the diff-id -> archive mapping in the image store, and
    ///   4. renames the finished file to `<layers_dir>/<digest>`.
    fn get_layer(
        &mut self,
        mut session: Session,
        descriptor: Descriptor,
    ) -> Receiver> {
        let digest = descriptor.digest.clone();
        if let Some(rx) = self.layers.get(&descriptor.digest) {
            rx
        } else {
            let (mut emitter, rx) = self.layers.register(&descriptor.digest);
            let context = self.context.clone();

            if !emitter.is_completed() {
                // Publish the expected byte count so observers can compute
                // download progress.
                _ = emitter.use_try(|state| {
                    state.total = descriptor.size;
                    Ok(())
                });

                tokio::spawn(async move {
                    let mut emitter = emitter;
                    // Final resting place of the verified archive.
                    let target_path = {
                        let mut parent = context.layers_dir.clone();
                        parent.push(format!("{digest}"));
                        parent
                    };

                    // Temporary file used while the download is in flight.
                    let in_progress_path = {
                        let mut parent = context.layers_dir.clone();
                        parent.push(format!("{digest}.progress"));
                        parent
                    };

                    // Update task state via the emitter; bail out of the
                    // spawned task entirely if the task has been torn down.
                    macro_rules! use_state {
                        ($f:expr) => {{
                            let res = emitter.use_try($f);
                            if res.is_err() {
                                return;
                            }
                            res.unwrap()
                        }};
                    }

                    if let Ok(mut response) = session.fetch_blob(&digest).await {
                        let in_progress_path_string =
                            in_progress_path.to_string_lossy().to_string();
                        // Hashes the *compressed* stream as received, to
                        // verify/derive the archive digest.
                        let mut hasher = Hasher::new(digest.algorithm());

                        // Pick the decompressor from the media type suffix;
                        // `format` is the content-type label stored alongside
                        // the diff-id mapping.
                        let (cat, format) = if descriptor.media_type.ends_with("gzip") {
                            ("gzcat", "gzip")
                        } else if descriptor.media_type.ends_with("zstd") {
                            ("zstdcat", "zstd")
                        } else {
                            ("cat", "plain")
                        };

                        let shell_script =
                            format!("tee {in_progress_path_string} | {cat} - | sha256 -q");

                        // NOTE(review): blocking `std::process` + `write_all`
                        // inside an async task; presumably acceptable here —
                        // consider `tokio::process` if this stalls the runtime.
                        let mut helper = Command::new("sh")
                            .arg("-c")
                            .arg(shell_script)
                            .stdout(Stdio::piped())
                            .stdin(Stdio::piped())
                            .spawn()
                            .expect("cannot spawn sh helper");

                        let stdin = helper.stdin.as_mut().unwrap();

                        // Stream chunks to the helper while tracking progress.
                        while let Ok(Some(chunk)) = response.chunk().await {
                            hasher.update(&chunk);
                            stdin.write_all(&chunk).unwrap();
                            use_state!(|state| {
                                state.written += chunk.len();
                                Ok(())
                            });
                        }

                        let output = helper.wait_with_output().unwrap();

                        // `sha256 -q` prints the bare hex digest of the
                        // uncompressed stream, i.e. the layer's diff-id.
                        let diff_id = {
                            let string = std::str::from_utf8(&output.stdout).unwrap().trim();
                            format!("sha256:{string}")
                        };

                        info!("get_layer: dffid={diff_id}");
                        let digest = hasher.finalize();
                        let diff_id = OciDigest::from_str(&diff_id).unwrap();

                        // Persist diff-id -> archive mapping for later staging.
                        context
                            .image_store
                            .lock()
                            .await
                            .map_diff_id(&diff_id, &digest, format)
                            .unwrap();

                        // Atomically promote the finished download.
                        use_state!(|_| std::fs::rename(&in_progress_path, &target_path)
                            .map_err(|_| "failed to mv file".to_string()));
                        emitter.set_completed()
                    }
                });
            }

            rx
        }
    }

    /// Start (or join) staging of the rootfs identified by `chain_id`.
    ///
    /// If the chain-id's dataset already exists the task completes
    /// immediately; otherwise a `RootFsRecipe` is resolved against the
    /// existing datasets, the missing layers are downloaded via
    /// `get_layer`, and once all of them finish (or fault) the recipe is
    /// applied to clone/extract the dataset.
    fn stage_root(
        &mut self,
        session: &Session,
        chain_id: &ChainId,
        diff_maps: &[DiffMap], // descriptors: &[Descriptor],
    ) -> Receiver> {
        if let Some(rx) = self.rootfs.get(chain_id) {
            rx
        } else {
            let (mut emitter, rx) = self.rootfs.register(chain_id);

            if !emitter.is_completed() {
                let dataset = self.context.image_dataset.clone();
                let existing = zfs_list_chain_ids(dataset);

                // Already materialized: nothing to stage.
                if existing.contains(chain_id) {
                    emitter.set_completed();
                    return rx;
                }

                let recipe = RootFsRecipe::resolve(&existing, diff_maps);

                // Kick off downloads for the layers the recipe needs that
                // are not already complete.
                let mut layers = Vec::with_capacity(recipe.digests.len());

                for digest in recipe.digests.iter() {
                    let layer = self.get_layer(session.clone(), digest.clone());
                    if !(*layer.borrow()).is_completed() {
                        layers.push(layer);
                    }
                }

                let context = self.context.clone();

                tokio::spawn(async move {
                    let mut emitter = emitter;
                    // Wait for every pending layer download to signal.
                    let n2 = layers
                        .iter()
                        .map(|r| r.borrow().notify.clone())
                        .collect::>();

                    let notifies = n2.iter().map(|r| r.notified());
                    futures::future::join_all(notifies).await;

                    // Aggregate layer faults into one newline-joined reason;
                    // only stage the dataset if no layer faulted.
                    let faults = layers.iter().filter_map(|s| (*s.borrow()).fault());
                    if let Some(reason) = faults.reduce(|a, b| format!("{a}\n{b}")) {
                        emitter.set_faulted(&reason);
                    } else if let Err(reason) = recipe
                        .stage_layers_assume_existed(&context.image_dataset, &context.layers_dir)
                        .await
                    {
                        emitter.set_faulted(&format!("{reason:?}"));
                    } else {
                        emitter.set_completed();
                    }
                });
            }
            rx
        }
    }
}

/// The source dataset to be cloned from, followed by the extraction of layers to create the
/// desired rootfs
#[derive(Clone, Debug)]
pub struct RootFsRecipe {
    chain_id: ChainId,
    /// The dataset to be cloned
    source: Option,
    /// The filesystem layers to be extracted at the cloned root
    digests: Vec, // digests: Vec,

    diff_ids: Vec,
}

/// Given a dataset, get all the children
/// datasets that have their name conform to the OCI chain id
/// format
fn zfs_list_chain_ids(dataset: impl AsRef) -> Vec {
    let handle = ZfsHandle::default();
    let mut chain_ids = Vec::new();
    // NOTE(review): `unwrap()` here will panic if the dataset cannot be
    // listed — confirm callers guarantee the dataset exists.
    for path in handle.list_direct_children(dataset).unwrap().iter() {
        // Keep only children whose basename parses as a chain id; other
        // datasets (and unparsable names) are silently skipped.
        let name = path.file_name().map(|n| n.to_string_lossy().to_string());
        if let Some(chain_id) = name.and_then(|n| n.parse::().ok()) {
            chain_ids.push(chain_id);
        }
    }
    chain_ids
}

impl RootFsRecipe {
    /// Compute the cheapest way to materialize the rootfs for `diff_id_maps`:
    /// find the deepest ancestor chain-id that already exists in `existed`,
    /// use it as the clone `source`, and keep only the layers past it in
    /// `digests` for extraction. Panics when `diff_id_maps` is empty.
    fn resolve(existed: &[ChainId], diff_id_maps: &[DiffMap]) -> RootFsRecipe {
        if diff_id_maps.is_empty() {
            panic!()
        }

        let algorithm = DigestAlgorithm::Sha256;
        let mut ancestors = Vec::with_capacity(diff_id_maps.len());
        let mut chain_id = oci_util::layer::ChainId::new(&diff_id_maps[0].diff_id);

        // for each layers[..i], calculate the chain_id
        {
            ancestors.push(chain_id.clone());
            for diff_id_map in &diff_id_maps[1..] {
                chain_id.consume_diff_id(algorithm, &diff_id_map.diff_id);
                ancestors.push(chain_id.clone());
            }
        }

        // from the ancestors vector, find the first item from the end of the vector that
        // already exists in `datasets`
        {
            for (i, id) in ancestors.iter().enumerate().rev() {
                if existed.contains(id) {
                    return RootFsRecipe {
                        // `chain_id` is now the chain id of the full layer
                        // stack (all diff ids consumed above).
                        chain_id,
                        source: Some(id.clone()),
                        diff_ids: diff_id_maps
                            .iter()
                            .map(|di| di.diff_id.clone())
                            .collect::>(),
                        // Only the layers *after* the matched ancestor still
                        // need to be extracted; none if it matched the tip.
                        digests: if i + 1 == diff_id_maps.len() {
                            Vec::new()
                        } else {
                            diff_id_maps[i + 1..]
                                .iter()
                                .map(|di| di.descriptor.clone())
                                .collect::>()
                        },
                    };
                }
            }
        }

        // No ancestor exists: start from scratch and extract every layer.
        RootFsRecipe {
            chain_id,
            source: None,
            digests: diff_id_maps
                .iter()
                .map(|di| di.descriptor.clone())
                .collect::>(),
            diff_ids: diff_id_maps
                .iter()
                .map(|di| di.diff_id.clone())
                .collect::>(),
        }
    }

    /// Materialize this recipe: clone from `source` (or create a fresh
    /// dataset), record chain-id/layers as ZFS user properties, extract the
    /// pending layer archives with `ocitar`, then snapshot the result as
    /// `@xc`. Assumes the referenced layer files already exist in
    /// `layers_dir`.
    ///
    /// # Arguments
    /// * `dataset`: The ZFS dataset for images
    /// * `layers_dir`: The directory that contains all the layer diff files
    pub async fn stage_layers_assume_existed(
        &self,
        dataset: impl AsRef,
        layers_dir: impl AsRef,
    ) -> Result<(), StageLayerError> {
        let handle = ZfsHandle::default();
        let dataset = dataset.as_ref().to_path_buf();

        // dataset to contain this chain
        let target_dataset = {
            let mut target = dataset.clone();
            target.push(self.chain_id.as_str());
            target
        };

        if let Some(id) = &self.source {
            debug!(id = id.as_str(), "cloning from ancestor dataset");
            let mut source_dataset = dataset;
            source_dataset.push(id.as_str());
            if !handle.exists(&source_dataset) {
                return Err(StageLayerError::SourceDatasetNotFound(id.clone()));
            }
            // Clone from the ancestor's "xc" snapshot.
            handle.clone2(&source_dataset, "xc", &target_dataset)?;
        } else {
            debug!("creating new dataset as no ancestors found");
            handle.create2(&target_dataset, false, false)?;
        }

        // NOTE(review): this fold yields a leading comma (",id1,id2") —
        // the commented-out `reduce` variant below would not; confirm which
        // format consumers of the `xc:layers` property expect.
        let layers = self
            .diff_ids
            .iter()
            .fold(String::new(), |a, b| format!("{a},{b}"));
        // let diff_ids = self.diff_ids.iter().reduce(|a, b| format!("{a},{b}")).unwrap_or_else(String::new);

        // at this point, our dataset should exist
        handle.set_prop(&target_dataset, "xc:chain_id", self.chain_id.as_str())?;

        handle.set_prop(&target_dataset, "xc:layers", &layers)?;

        let root = handle
            .mount_point(&target_dataset)?
            .ok_or(StageLayerError::NoMountPoint)?;
        debug!(
            root = root.to_string_lossy().to_string(),
            "begin to extract layers"
        );

        // Extract each pending layer archive onto the mounted root, in order.
        for digest in self.digests.iter() {
            let mut file = layers_dir.as_ref().to_path_buf();
            file.push(digest.digest.as_str());
            let file_path = file.to_string_lossy().to_string();
            debug!(file_path, "extracting");
            // NOTE(review): the exit status of `ocitar` is discarded — a
            // failed extraction would go unnoticed here; confirm intent.
            _ = tokio::process::Command::new("ocitar")
                .arg("-xf")
                .arg(&file)
                .arg("-C")
                .arg(&root)
                .status()
                .await;
            debug!(file_path, "finished");
        }
        // Snapshot so descendants can clone from this chain id later.
        handle.snapshot2(target_dataset, "xc")?;
        Ok(())
    }
}

/// Failures while staging a rootfs from a recipe.
#[derive(Error, Debug)]
pub enum StageLayerError {
    // NOTE(review): error text reads oddly ("Cannot expected source
    // dataset...") — likely meant "Cannot find expected source dataset";
    // left unchanged as it is runtime output.
    #[error("Cannot expected source dataset to clone from. chain_id: {0}")]
    SourceDatasetNotFound(ChainId),
    #[error("Error on ZFS operation: {0}")]
    ZfsError(ZfsError),
    #[error("dataset has no mountpoint")]
    NoMountPoint,
}

impl From for StageLayerError {
    fn from(e: ZfsError) -> Self {
        Self::ZfsError(e)
    }
}