diff --git a/Cargo.lock b/Cargo.lock index 115a36bc..06b29ac6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -237,6 +237,12 @@ dependencies = [ "cc", ] +[[package]] +name = "ambient-authority" +version = "0.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9d4ee0d472d1cd2e28c97dfa124b3d8d992e10eb0a035f33f5d12e3a177ba3b" + [[package]] name = "anes" version = "0.1.6" @@ -410,6 +416,36 @@ dependencies = [ "libbz2-rs-sys", ] +[[package]] +name = "cap-primitives" +version = "3.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6cf3aea8a5081171859ef57bc1606b1df6999df4f1110f8eef68b30098d1d3a" +dependencies = [ + "ambient-authority", + "fs-set-times", + "io-extras", + "io-lifetimes", + "ipnet", + "maybe-owned", + "rustix", + "rustix-linux-procfs", + "windows-sys 0.52.0", + "winx", +] + +[[package]] +name = "cap-std" +version = "3.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6dc3090992a735d23219de5c204927163d922f42f575a0189b005c62d37549a" +dependencies = [ + "cap-primitives", + "io-extras", + "io-lifetimes", + "rustix", +] + [[package]] name = "cast" version = "0.3.0" @@ -851,6 +887,17 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs-set-times" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94e7099f6313ecacbe1256e8ff9d617b75d1bcb16a6fddef94866d225a01a14a" +dependencies = [ + "io-lifetimes", + "rustix", + "windows-sys 0.52.0", +] + [[package]] name = "futures-core" version = "0.3.32" @@ -1002,6 +1049,8 @@ dependencies = [ "futures-core", "futures-util", "harmonia-file-core", + "harmonia-file-fd", + "harmonia-file-io-pure", "harmonia-file-nar", "harmonia-store-db", "harmonia-store-derivation", @@ -1065,10 +1114,34 @@ dependencies = [ name = "harmonia-file-core" version = "3.1.0" dependencies = [ - "harmonia-utils-test", "serde", "serde_json", +] + +[[package]] +name = "harmonia-file-fd" +version = "3.1.0" +dependencies = [ + "cap-std", + "futures-core", + "harmonia-file-io-pure", + "nix", + "tempfile", "thiserror", + "tokio", +] + +[[package]] +name = "harmonia-file-io-pure" +version = "3.1.0" +dependencies = [ + "futures-core", + "futures-util", + "harmonia-file-core", + "serde", + "serde_json", + "thiserror", + "tokio", ] [[package]] @@ -1082,6 +1155,8 @@ dependencies = [ "futures-sink", "futures-util", "harmonia-file-core", + "harmonia-file-fd", + "harmonia-file-io-pure", "harmonia-utils-io", "harmonia-utils-test", "nix", @@ -1096,7 +1171,6 @@ dependencies = [ "tokio-test", "tokio-util", "tracing", - "walkdir", ] [[package]] @@ -1601,6 +1675,28 @@ dependencies = [ "hashbrown 0.17.0", ] +[[package]] +name = "io-extras" +version = "0.18.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2285ddfe3054097ef4b2fe909ef8c3bcd1ea52a8f0d274416caebeef39f04a65" +dependencies = [ + "io-lifetimes", + "windows-sys 0.52.0", +] + +[[package]] +name = "io-lifetimes" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06432fb54d3be7964ecd3649233cddf80db2832f47fec34c01f65b3d9d774983" + +[[package]] +name = "ipnet" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" + [[package]] name = "itertools" version = "0.13.0" @@ -1713,6 +1809,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "maybe-owned" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4facc753ae494aeb6e3c22f839b158aebd4f9270f55cd3c79906c45476c47ab4" + [[package]] name = "md5" version = "0.8.0" @@ -2242,6 +2344,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "rustix-linux-procfs" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fc84bf7e9aa16c4f2c758f27412dc9841341e16aa682d9c7ac308fe3ee12056" +dependencies = [ + "once_cell", + "rustix", +] + [[package]] name = "rustls" version = "0.23.40" @@ -3121,6 +3233,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "winx" +version = "0.36.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f3fd376f71958b862e7afb20cfe5a22830e1963462f3a17f49d82a6c1d1f42d" +dependencies = [ + "bitflags", + "windows-sys 0.52.0", +] + [[package]] name = "wit-bindgen" version = "0.57.1" diff --git a/Cargo.toml b/Cargo.toml index a990a29b..a48d8373 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,8 @@ members = [ "harmonia-store-remote", "harmonia-store-aterm", "harmonia-file-core", + "harmonia-file-fd", + "harmonia-file-io-pure", "harmonia-file-nar", "harmonia-protocol", "harmonia-protocol-derive", @@ -45,6 +47,7 @@ async-stream = "0.3" blake3 = "1.8" bstr = "1.11" bytes = "1.11" +cap-std = "3" data-encoding = "2.11" derive_more = { version = "2.1", features = [ "display" ] } ed25519-dalek = "2" @@ -98,6 +101,8 @@ tokio-test = "0.4" # Internal crates harmonia-file-core = { path = "harmonia-file-core" } +harmonia-file-fd = { path = "harmonia-file-fd" } +harmonia-file-io-pure = { path = "harmonia-file-io-pure" } harmonia-file-nar = { path = "harmonia-file-nar" } harmonia-protocol = { path = "harmonia-protocol" } harmonia-protocol-derive = { path = "harmonia-protocol-derive" } diff --git a/docs/architecture/harmonia-store-structure.md b/docs/architecture/harmonia-store-structure.md index 76b5b18c..a5415b96 100644 --- a/docs/architecture/harmonia-store-structure.md +++ b/docs/architecture/harmonia-store-structure.md @@ -22,8 +22,8 @@ composed. ┌────────────────────────────┬─────────────────────────┐ │ Format / File │ Database │ │ harmonia-file-nar │ harmonia-store-db │ -│ harmonia-file-core │ │ -│ NAR pack/unpack, types │ SQLite store metadata │ +│ harmonia-file-core/io/fd │ │ +│ NAR pack/unpack, file I/O │ SQLite store metadata │ └────────────────────────────┴─────────────────────────┘ ↓ ┌──────────────────────────────────────────────────────┐ @@ -51,7 +51,9 @@ composed. | [harmonia-store-aterm](../../harmonia-store-aterm/) | ATerm derivation parser | | [harmonia-store-path-info](../../harmonia-store-path-info/) | ValidPathInfo types (pure) | | [harmonia-store-db](../../harmonia-store-db/README.md) | SQLite store metadata | -| [harmonia-file-core](../../harmonia-file-core/) | File tree types and serde (pure) | +| [harmonia-file-core](../../harmonia-file-core/) | File tree data types and serde | +| [harmonia-file-io-pure](../../harmonia-file-io-pure/) | Async IO traits, in-memory impls, listing | +| [harmonia-file-fd](../../harmonia-file-fd/) | Filesystem source/sink via cap-std | | [harmonia-file-nar](../../harmonia-file-nar/README.md) | NAR archive format | | [harmonia-protocol](../../harmonia-protocol/README.md) | Daemon wire protocol | | [harmonia-protocol-derive](../../harmonia-protocol-derive/README.md) | Derive macros for protocol types | @@ -100,16 +102,20 @@ graph BT end subgraph "File" file-core + file-fd + file-io-pure file-nar end bench client ssh-store - file-nar --> file-core - file-nar --> utils-io + file-io-pure --> file-core utils-hash --> utils-base-encoding utils-signature --> utils-base-encoding + file-fd --> file-io-pure store-path --> utils-hash + file-nar --> file-fd + file-nar --> utils-io store-content-address --> store-path store-ref-scan --> store-path store-derivation --> store-content-address @@ -155,9 +161,11 @@ intra-workspace dependencies. - `harmonia-store-derivation`: derivations, derived paths, placeholders, realisations — depends on `harmonia-store-content-address`. - `harmonia-store-aterm`, `harmonia-store-path-info`, `harmonia-store-build-result`, `harmonia-store-nar-info`: format/metadata crates kept separate to avoid coupling. -**File** (`harmonia-file-core`, `harmonia-file-nar`) -- `harmonia-file-core`: pure file tree types and serde matching nix's JSON format. -- `harmonia-file-nar`: NAR pack/unpack against generic `AsyncRead`/`AsyncWrite`. Streaming; never requires the full input in memory. Knows nothing about derivations or signatures. +**File** (`harmonia-file-core`, `harmonia-file-io-pure`, `harmonia-file-fd`, `harmonia-file-nar`) +- `harmonia-file-core`: pure file tree data types (`FileTree`, `FileSystemObject`), serde matching nix's JSON format. +- `harmonia-file-io-pure`: async IO traits (`FileSystemSource`/`FileSystemSink`), in-memory implementations, listing functions. +- `harmonia-file-fd`: real filesystem source/sink via `cap-std` (openat, no symlink following). +- `harmonia-file-nar`: NAR pack/unpack against generic `FileSystemSource`/`FileSystemSink` traits. Streaming; never requires the full input in memory. Knows nothing about derivations or signatures. **Database** (`harmonia-store-db`) - Schema matches Nix's `db.sqlite` exactly. diff --git a/harmonia-cache/Cargo.toml b/harmonia-cache/Cargo.toml index cf5e9bc1..27e7309e 100644 --- a/harmonia-cache/Cargo.toml +++ b/harmonia-cache/Cargo.toml @@ -6,6 +6,7 @@ edition.workspace = true license.workspace = true homepage.workspace = true repository.workspace = true +readme = "README.md" [[bin]] name = "harmonia-cache" @@ -42,6 +43,8 @@ bytes = { workspace = true } futures-core = { workspace = true } futures-util = { workspace = true } harmonia-file-core = { workspace = true } +harmonia-file-fd = { workspace = true } +harmonia-file-io-pure = { workspace = true } harmonia-file-nar = { workspace = true } harmonia-store-db = { workspace = true } harmonia-store-derivation = { workspace = true } diff --git a/harmonia-cache/src/error.rs b/harmonia-cache/src/error.rs index b6396e41..c18f1e3d 100644 --- a/harmonia-cache/src/error.rs +++ b/harmonia-cache/src/error.rs @@ -71,6 +71,7 @@ pub enum ServeError { }, #[error("Access denied: {path}")] + #[allow(dead_code)] AccessDenied { path: String }, } diff --git a/harmonia-cache/src/narlist.rs b/harmonia-cache/src/narlist.rs index ee4cc080..7f3e0e67 100644 --- a/harmonia-cache/src/narlist.rs +++ b/harmonia-cache/src/narlist.rs @@ -1,139 +1,33 @@ use crate::ServerResult; -use crate::error::{CacheError, IoErrorContext, Result, ServeError}; +use crate::error::{CacheError, ServeError}; use actix_web::{HttpResponse, http, web}; -use harmonia_file_core::{Directory, FileSystemObject, FileTree, Regular, Symlink}; -use harmonia_file_nar::NarFileInfo; +use harmonia_file_core::FileTree; +use harmonia_file_fd::DirSource; +use harmonia_file_io_pure::{Stat, list_deep}; use serde::Serialize; -use std::fs::Metadata; -use std::os::unix::fs::PermissionsExt; -use std::path::Path; use crate::config::Config; use crate::{cache_control_max_age_1y, nixhash, some_or_404}; use std::path::PathBuf; -use tokio::fs::symlink_metadata; #[derive(Debug, Serialize)] struct NarList { version: u16, - root: FileTree, + root: FileTree, } -fn file_entry(metadata: Metadata) -> FileTree { - FileTree(FileSystemObject::Regular(Regular { - executable: metadata.permissions().mode() & 0o111 != 0, - contents: NarFileInfo { - size: metadata.len(), - nar_offset: None, - }, - })) -} - -async fn symlink_entry(path: &Path) -> Result> { - let target = tokio::fs::read_link(&path) +async fn get_nar_list(path: PathBuf) -> crate::error::Result { + let source = DirSource::open_path(&path) .await - .io_context(format!("Failed to read link {}", path.display()))?; - Ok(FileTree(FileSystemObject::Symlink(Symlink { - target: target.to_string_lossy().into_owned(), - }))) -} - -struct Frame { - path: PathBuf, - entries: std::collections::BTreeMap>>, - dir_entry: tokio::fs::ReadDir, -} - -async fn get_nar_list(path: PathBuf) -> Result { - let st = symlink_metadata(&path).await.io_context(format!( - "Failed to get symlink metadata for {}", - path.display() - ))?; - - let file_type = st.file_type(); - let root = if file_type.is_file() { - file_entry(st) - } else if file_type.is_symlink() { - symlink_entry(&path).await? - } else if file_type.is_dir() { - let dir_entry = tokio::fs::read_dir(&path) - .await - .io_context(format!("Failed to read directory {}", path.display()))?; - let mut stack = vec![Frame { - path, - entries: std::collections::BTreeMap::new(), - dir_entry, - }]; - - let mut root: Option> = None; - - while let Some(frame) = stack.last_mut() { - if let Some(entry) = frame - .dir_entry - .next_entry() - .await - .io_context("Failed to read next directory entry")? - { - let name = entry.file_name().to_string_lossy().into_owned(); - let entry_path = entry.path(); - let entry_st = symlink_metadata(&entry_path).await.io_context(format!( - "Failed to get metadata for {}", - entry_path.display() - ))?; - let entry_file_type = entry_st.file_type(); - - if entry_file_type.is_file() { - frame.entries.insert(name, Box::new(file_entry(entry_st))); - } else if entry_file_type.is_symlink() { - frame - .entries - .insert(name, Box::new(symlink_entry(&entry_path).await?)); - } else if entry_file_type.is_dir() { - let dir_entry = tokio::fs::read_dir(&entry_path) - .await - .io_context(format!("Failed to read directory {}", entry_path.display()))?; - stack.push(Frame { - path: entry_path, - entries: std::collections::BTreeMap::new(), - dir_entry, - }); - } - } else { - let frame = stack - .pop() - .expect("stack should not be empty inside loop iteration"); - let dir_tree = FileTree(FileSystemObject::Directory(Directory { - entries: frame.entries, - })); - if let Some(parent) = stack.last_mut() { - let name = match frame.path.file_name() { - Some(name) => name.to_string_lossy().into_owned(), - None => { - return Err(ServeError::AccessDenied { - path: frame.path.display().to_string(), - } - .into()); - } - }; - parent.entries.insert(name, Box::new(dir_tree)); - } else { - root = Some(dir_tree); - } - } - } - - root.expect("root should be set after processing directory stack") - } else { - return Err(ServeError::ServeFailed { - source: std::io::Error::other(format!( - "Unsupported file type for path: {}", - path.display() - )), - } - .into()); - }; - + .map_err(|e| ServeError::ServeFailed { + source: std::io::Error::other(format!("Failed to open {}: {e}", path.display())), + })?; + let root = list_deep(&source) + .await + .map_err(|e| ServeError::ServeFailed { + source: std::io::Error::other(e.to_string()), + })?; Ok(NarList { version: 1, root }) } @@ -154,7 +48,9 @@ pub(crate) async fn get(hash: web::Path, settings: web::Data) -> #[cfg(test)] mod test { use super::*; + use crate::error::{IoErrorContext, Result}; use std::fs; + use std::os::unix::fs::PermissionsExt; use std::process::Command; #[tokio::test] @@ -237,6 +133,13 @@ mod test { assert_eq!(ours_normalized, reference_normalized); + // Also parse nix's own `nar ls --json`, not just produce it. + let parsed = serde_json::from_slice::>(&res2.stdout) + .expect("harmonia should parse nix's nar ls --json output"); + let mut reparsed = serde_json::to_value(&parsed).unwrap(); + normalize(&mut reparsed); + assert_eq!(reparsed, reference_normalized); + Ok(()) } } diff --git a/harmonia-file-core/Cargo.toml b/harmonia-file-core/Cargo.toml index 023d2ebd..57226e2f 100644 --- a/harmonia-file-core/Cargo.toml +++ b/harmonia-file-core/Cargo.toml @@ -9,14 +9,3 @@ repository.workspace = true [dependencies] serde.workspace = true serde_json.workspace = true -thiserror.workspace = true - -[dev-dependencies] -harmonia-utils-test = { workspace = true } - -[[test]] -name = "json_upstream" -path = "tests/json_upstream/mod.rs" - -[lints] -workspace = true diff --git a/harmonia-file-core/README.md b/harmonia-file-core/README.md index 31f0129d..f641bbae 100644 --- a/harmonia-file-core/README.md +++ b/harmonia-file-core/README.md @@ -11,7 +11,6 @@ matching nix's `FileSystemObject` type hierarchy. - `FileSystemObject` — tagged enum: `Regular`, `Directory`, or `Symlink` - `FileTree` — recursive tree (newtype wrapping `FileSystemObject>>`) -- `ShallowTree` — one-level tree with `Opaque` children - `MemoryTree` — in-memory tree (`FileTree>`) ## Serde diff --git a/harmonia-file-core/src/lib.rs b/harmonia-file-core/src/lib.rs index 7438518a..2735dc97 100644 --- a/harmonia-file-core/src/lib.rs +++ b/harmonia-file-core/src/lib.rs @@ -20,12 +20,16 @@ use std::collections::BTreeMap; /// A regular file. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Regular { - #[serde(default)] + #[serde(default, skip_serializing_if = "is_false")] pub executable: bool, #[serde(flatten)] pub contents: C, } +fn is_false(b: &bool) -> bool { + !*b +} + /// A directory whose children are of type `Child`. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Directory { @@ -56,10 +60,3 @@ pub struct FileTree(pub FileSystemObject>>); /// An in-memory file tree with byte-vector contents. pub type MemoryTree = FileTree>; - -/// An opaque placeholder used in shallow listings. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct Opaque; - -/// A shallow (one-level) file tree — directory children are [`Opaque`]. -pub type ShallowTree = FileSystemObject; diff --git a/harmonia-file-core/src/serde_impl.rs b/harmonia-file-core/src/serde_impl.rs index 6f50d8f8..a45089f8 100644 --- a/harmonia-file-core/src/serde_impl.rs +++ b/harmonia-file-core/src/serde_impl.rs @@ -3,25 +3,7 @@ use serde::de::DeserializeOwned; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use crate::{FileSystemObject, FileTree, Opaque}; - -// --------------------------------------------------------------------------- -// Opaque — serializes as `{}` -// --------------------------------------------------------------------------- - -impl Serialize for Opaque { - fn serialize(&self, serializer: S) -> Result { - use serde::ser::SerializeMap; - serializer.serialize_map(Some(0))?.end() - } -} - -impl<'de> Deserialize<'de> for Opaque { - fn deserialize>(deserializer: D) -> Result { - let _ = >::deserialize(deserializer)?; - Ok(Opaque) - } -} +use crate::{FileSystemObject, FileTree}; // --------------------------------------------------------------------------- // FileTree — newtype delegates to FileSystemObject @@ -35,8 +17,7 @@ impl Serialize for FileTree { impl<'de, C: DeserializeOwned> Deserialize<'de> for FileTree { fn deserialize>(deserializer: D) -> Result { - let inner: FileSystemObject>> = - FileSystemObject::deserialize(deserializer)?; + let inner = FileSystemObject::>>::deserialize(deserializer)?; Ok(FileTree(inner)) } } diff --git a/harmonia-file-fd/Cargo.toml b/harmonia-file-fd/Cargo.toml new file mode 100644 index 00000000..79860a42 --- /dev/null +++ b/harmonia-file-fd/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "harmonia-file-fd" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +description = "Filesystem-backed FileSystemSource and FileSystemSink via cap-std" + +[dependencies] +cap-std = { workspace = true } +futures-core = { workspace = true } +harmonia-file-io-pure = { workspace = true } +nix = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } + +[lints] +workspace = true diff --git a/harmonia-file-fd/README.md b/harmonia-file-fd/README.md new file mode 100644 index 00000000..bf785fd6 --- /dev/null +++ b/harmonia-file-fd/README.md @@ -0,0 +1,25 @@ +# harmonia-file-fd + +Filesystem-backed `FileSystemSource` and `FileSystemSink` via this crate's async `cap-std` wrapper. + +## Overview + +This crate provides capability-based async filesystem access through +the `harmonia-file-io-pure` traits. All navigation uses `openat`/`fstatat` +syscalls — no path assembly and no symlink following on intermediate +components. + +## Read side (`DirSource`) + +`DirSource` wraps a `cap_tokio::fs::Dir` and implements `FileSystemSource`. +`entries()` collects and sorts entry names + metadata but does NOT open +child directory handles — each child is a lazy thunk that opens on demand. + +Large files (>256 KiB) are memory-mapped for zero-copy reads. + +## Write side (`DirSlotSink`) + +`DirSlotSink` implements `FileSystemSink` for a slot (parent dir + name). +`create_directory()` creates the dir and returns a `DirDirSink` for +populating children. `create_regular_file()` creates and opens the file. +`create_symlink()` creates the symlink. diff --git a/harmonia-file-fd/src/cap.rs b/harmonia-file-fd/src/cap.rs new file mode 100644 index 00000000..9611e950 --- /dev/null +++ b/harmonia-file-fd/src/cap.rs @@ -0,0 +1,128 @@ +//! Async wrapper over sync `cap-std`, offloading each operation to `spawn_blocking`. + +use std::ffi::OsString; +use std::io; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::vec::IntoIter; + +use tokio::task::spawn_blocking; + +pub use cap_std::AmbientAuthority; +pub use cap_std::ambient_authority; +pub use cap_std::fs::{Metadata, MetadataExt, OpenOptions}; + +/// Opened under a [`Dir`] capability, so a plain async [`tokio`] file suffices. +pub type File = tokio::fs::File; + +/// Runs one blocking cap-std call off the async runtime and folds any join failure back into an io error. +async fn blocking(f: F) -> io::Result +where + F: FnOnce() -> io::Result + Send + 'static, + T: Send + 'static, +{ + match spawn_blocking(f).await { + Ok(result) => result, + Err(join_error) => Err(io::Error::other(join_error)), + } +} + +#[derive(Clone, Debug)] +pub struct Dir(Arc); + +impl Dir { + /// This runs once at the root, after which all navigation stays confined to the capability it returns. + pub async fn open_ambient_dir(path: &Path, authority: AmbientAuthority) -> io::Result { + let path = path.to_owned(); + let dir = blocking(move || cap_std::fs::Dir::open_ambient_dir(&path, authority)).await?; + Ok(Self(Arc::new(dir))) + } + + /// Reads the metadata of the directory this handle points at. + pub async fn dir_metadata(&self) -> io::Result { + let dir = self.0.clone(); + blocking(move || dir.dir_metadata()).await + } + + /// Reads a child's metadata without following it when it happens to be a symlink. + pub async fn symlink_metadata(&self, name: &str) -> io::Result { + let dir = self.0.clone(); + let name = name.to_owned(); + blocking(move || dir.symlink_metadata(name)).await + } + + /// Opens a child directory relative to this one. + pub async fn open_dir(&self, name: &str) -> io::Result { + let dir = self.0.clone(); + let name = name.to_owned(); + let opened = blocking(move || dir.open_dir(name)).await?; + Ok(Self(Arc::new(opened))) + } + + /// Opens a child file for reading. + pub async fn open(&self, name: &str) -> io::Result { + let dir = self.0.clone(); + let name = name.to_owned(); + let std_file = blocking(move || dir.open(name).map(cap_std::fs::File::into_std)).await?; + Ok(File::from_std(std_file)) + } + + /// Opens a child file with whatever options the caller chose. + pub async fn open_with(&self, name: &str, options: &OpenOptions) -> io::Result { + let dir = self.0.clone(); + let name = name.to_owned(); + let options = options.clone(); + let std_file = blocking(move || { + dir.open_with(name, &options) + .map(cap_std::fs::File::into_std) + }) + .await?; + Ok(File::from_std(std_file)) + } + + /// Returns the link target verbatim, including absolute paths that point outside the capability. + pub async fn read_link(&self, name: &str) -> io::Result { + let dir = self.0.clone(); + let name = name.to_owned(); + blocking(move || dir.read_link_contents(name)).await + } + + /// Lists a directory, capturing every child name before the handle is dropped. + pub async fn read_dir(&self, name: &str) -> io::Result>> { + let dir = self.0.clone(); + let name = name.to_owned(); + blocking(move || { + let entries = dir + .read_dir(name)? + .map(|entry| entry.map(|entry| DirEntry(entry.file_name()))) + .collect::>(); + Ok(entries.into_iter()) + }) + .await + } + + /// Creates a child directory relative to this one. + pub async fn create_dir(&self, name: &str) -> io::Result<()> { + let dir = self.0.clone(); + let name = name.to_owned(); + blocking(move || dir.create_dir(name)).await + } + + /// Creates a symlink named `link` that points at `original`. + pub async fn symlink(&self, original: &str, link: &str) -> io::Result<()> { + let dir = self.0.clone(); + let original = original.to_owned(); + let link = link.to_owned(); + blocking(move || dir.symlink(original, link)).await + } +} + +/// Just the name, copied out so nothing touches [`ReadDir`](cap_std::fs::ReadDir) after [`blocking`]. +pub struct DirEntry(OsString); + +impl DirEntry { + /// This hands back an owned copy because the entry only keeps the name around. + pub fn file_name(&self) -> OsString { + self.0.clone() + } +} diff --git a/harmonia-file-fd/src/lib.rs b/harmonia-file-fd/src/lib.rs new file mode 100644 index 00000000..f4ef53eb --- /dev/null +++ b/harmonia-file-fd/src/lib.rs @@ -0,0 +1,41 @@ +//! Filesystem-backed [`FileSystemSource`] and [`FileSystemSink`] over the +//! capability-confined async [`cap`] surface. +//! +//! All navigation uses `openat`/`fstatat` syscalls — no path assembly +//! and no symlink following on intermediate components. +//! +//! # Read side ([`DirSource`]) +//! +//! [`DirSource::entries`](FileSystemSource::entries) collects and sorts +//! entry names + metadata but does NOT open child directory handles. +//! Each child is returned as a lazy `Entry` thunk — the actual `openat` +//! only happens when you call [`entries`](FileSystemSource::entries) or +//! [`open`](FileSystemSource::open) on the child. +//! +//! [`DirSource::read_file`](FileSystemSource::read_file) uses +//! memory-mapped IO for files larger than 256 KiB (see [`mmap`]). +//! +//! # Write side ([`DirSlotSink`]) +//! +//! [`DirSlotSink`] creates a node at a slot (parent dir + name). +//! [`DirDirSink`] populates a directory's children one at a time. +//! [`DirFileSink`] streams file contents via [`AsyncWrite`](tokio::io::AsyncWrite). + +pub mod cap; +pub mod mmap; +mod sink; +mod source; + +pub use sink::{DirDirSink, DirFileSink, DirSlotSink}; +pub use source::{DirSource, DirSourceEntries, DirSourceError, FileReader}; + +#[cfg(unix)] +fn is_executable(meta: &cap::Metadata) -> bool { + use cap::MetadataExt; + meta.mode() & 0o111 != 0 +} + +#[cfg(not(unix))] +fn is_executable(_meta: &cap::Metadata) -> bool { + false +} diff --git a/harmonia-file-nar/src/archive/mmap.rs b/harmonia-file-fd/src/mmap.rs similarity index 87% rename from harmonia-file-nar/src/archive/mmap.rs rename to harmonia-file-fd/src/mmap.rs index 3279b1c4..7e24b31a 100644 --- a/harmonia-file-nar/src/archive/mmap.rs +++ b/harmonia-file-fd/src/mmap.rs @@ -8,12 +8,11 @@ use std::io; use std::os::fd::AsFd; -use std::path::Path; /// A read-only memory-mapped file region. /// POSIX guarantees that mmap adds its own reference to the underlying file /// object, so the fd can be closed immediately after mapping (IEEE Std 1003.1). -pub(crate) struct MappedFile { +pub struct MappedFile { ptr: *mut std::ffi::c_void, len: usize, } @@ -30,9 +29,13 @@ pub(crate) struct MappedFile { unsafe impl Send for MappedFile {} unsafe impl Sync for MappedFile {} +/// Files up to this size are read into a heap buffer; larger files are +/// memory-mapped for zero-copy reads. +pub const MMAP_THRESHOLD: u64 = 256 * 1024; + impl MappedFile { - /// Memory-map a file for reading. Returns an empty mapping for zero-length files. - pub fn open(path: &Path, size: u64) -> io::Result { + /// Memory-map a file for reading from an open file descriptor. + pub fn from_fd(fd: &impl AsFd, size: u64) -> io::Result { if size == 0 { return Ok(Self { ptr: std::ptr::null_mut(), @@ -40,7 +43,6 @@ impl MappedFile { }); } - let file = std::fs::File::open(path)?; let len = usize::try_from(size) .map_err(|_| io::Error::other("file too large to mmap on this platform"))?; @@ -53,7 +55,7 @@ impl MappedFile { .ok_or_else(|| io::Error::other("file size is 0"))?, nix::sys::mman::ProtFlags::PROT_READ, nix::sys::mman::MapFlags::MAP_PRIVATE, - file.as_fd(), + fd.as_fd(), 0, ) } @@ -78,6 +80,14 @@ impl MappedFile { // SAFETY: ptr is valid for len bytes (from mmap), read-only, and lives until drop. unsafe { std::slice::from_raw_parts(self.ptr as *const u8, self.len) } } + + pub fn len(&self) -> usize { + self.len + } + + pub fn is_empty(&self) -> bool { + self.len == 0 + } } // Allow `Bytes::from_owner(MappedFile)` so the mapping can be handed out as a @@ -92,7 +102,6 @@ impl AsRef<[u8]> for MappedFile { impl Drop for MappedFile { fn drop(&mut self) { if self.len > 0 { - // SAFETY: ptr and len come from a successful mmap call. let _ = unsafe { nix::sys::mman::munmap( std::ptr::NonNull::new(self.ptr).expect("mmap returned null"), diff --git a/harmonia-file-fd/src/sink.rs b/harmonia-file-fd/src/sink.rs new file mode 100644 index 00000000..ca0c5baa --- /dev/null +++ b/harmonia-file-fd/src/sink.rs @@ -0,0 +1,97 @@ +use harmonia_file_io_pure::{DirectorySink, FileSystemSink, RegularFileSink}; + +use crate::cap::{Dir, File}; + +/// A slot in a directory where a new node will be created. +pub struct DirSlotSink { + parent: Dir, + name: String, +} + +impl DirSlotSink { + /// Create a sink for a new entry in the given directory. + pub fn new(parent: Dir, name: String) -> Self { + Self { parent, name } + } +} + +impl FileSystemSink for DirSlotSink { + type Error = std::io::Error; + type Directory = DirDirSink; + type File = DirFileSink; + + async fn create_directory(self) -> Result { + self.parent.create_dir(&self.name).await?; + let dir = self.parent.open_dir(&self.name).await?; + Ok(DirDirSink { dir }) + } + + async fn create_regular_file(self, executable: bool) -> Result { + use crate::cap::OpenOptions; + let mut opts = OpenOptions::new(); + opts.write(true).create_new(true); + let file = self.parent.open_with(&self.name, &opts).await?; + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let mode = if executable { 0o755 } else { 0o644 }; + file.set_permissions(std::fs::Permissions::from_mode(mode)) + .await?; + } + Ok(DirFileSink { file }) + } + + async fn create_symlink(self, target: &str) -> Result<(), Self::Error> { + self.parent.symlink(target, &self.name).await + } +} + +/// An open directory being populated with children. +pub struct DirDirSink { + dir: Dir, +} + +impl DirectorySink for DirDirSink { + type Error = std::io::Error; + type Child<'a> = DirSlotSink; + + async fn create_child(&mut self, name: &str) -> Result { + Ok(DirSlotSink { + parent: self.dir.clone(), + name: name.to_owned(), + }) + } +} + +/// An open file being written. +pub struct DirFileSink { + file: File, +} + +impl tokio::io::AsyncWrite for DirFileSink { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + std::pin::Pin::new(&mut self.file).poll_write(cx, buf) + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::pin::Pin::new(&mut self.file).poll_flush(cx) + } + + fn poll_shutdown( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::pin::Pin::new(&mut self.file).poll_shutdown(cx) + } +} + +impl RegularFileSink for DirFileSink { + type Error = std::io::Error; +} diff --git a/harmonia-file-fd/src/source.rs b/harmonia-file-fd/src/source.rs new file mode 100644 index 00000000..e9db13ec --- /dev/null +++ b/harmonia-file-fd/src/source.rs @@ -0,0 +1,236 @@ +use std::path::Path; + +use harmonia_file_io_pure::{FileSystemSource, FileType, Stat}; + +use crate::cap::{Dir, Metadata, ambient_authority}; +use crate::mmap; + +/// A node in the filesystem acting as a [`FileSystemSource`]. +/// +/// Navigation uses `openat`/`fstatat` syscalls — no path assembly, no +/// symlink following on intermediate components. +#[derive(Debug)] +pub struct DirSource(pub(crate) DirSourceInner); + +#[derive(Debug)] +pub(crate) enum DirSourceInner { + /// A directory — we hold an open `Dir` handle. + Dir { dir: Dir, meta: Metadata }, + /// A file or symlink — we hold the parent dir and child name. + Entry { + parent: Dir, + name: String, + meta: Metadata, + }, +} + +#[derive(Debug, thiserror::Error)] +pub enum DirSourceError { + #[error(transparent)] + Io(#[from] std::io::Error), + #[error("not a regular file")] + NotAFile, + #[error("not a symlink")] + NotASymlink, + #[error("not a directory")] + NotADirectory, + #[error("unsupported file type (not a regular file, directory, or symlink)")] + UnsupportedFileType, +} + +impl DirSource { + /// Open a directory as a source. + pub async fn open(dir: Dir) -> Result { + let meta = dir.dir_metadata().await?; + Ok(Self(DirSourceInner::Dir { dir, meta })) + } + + /// Open any store path. A directory is opened directly, whereas a file or + /// symlink is reached from its parent so a root symlink is not followed. + pub async fn open_path(path: &Path) -> Result { + if tokio::fs::symlink_metadata(path).await?.is_dir() { + let dir = Dir::open_ambient_dir(path, ambient_authority()).await?; + Self::open(dir).await + } else { + let parent = path + .parent() + .ok_or_else(|| std::io::Error::other("path has no parent directory"))?; + let name = path + .file_name() + .and_then(|n| n.to_str()) + .ok_or_else(|| std::io::Error::other("path has no valid file name"))?; + let dir = Dir::open_ambient_dir(parent, ambient_authority()).await?; + let parent_source = Self::open(dir).await?; + FileSystemSource::open(&parent_source, name).await + } + } + + pub(crate) fn meta(&self) -> &Metadata { + match &self.0 { + DirSourceInner::Dir { meta, .. } => meta, + DirSourceInner::Entry { meta, .. } => meta, + } + } + + /// Get or open the underlying `Dir` handle. For `Dir` variants, + /// returns the existing handle. For `Entry` variants pointing to a + /// directory, opens it from the parent. + pub(crate) async fn open_dir(&self) -> Result { + match &self.0 { + DirSourceInner::Dir { dir, .. } => Ok(dir.clone()), + DirSourceInner::Entry { parent, name, meta } if meta.is_dir() => { + Ok(parent.open_dir(name).await?) + } + _ => Err(DirSourceError::NotADirectory), + } + } +} + +/// Reader returned by [`DirSource::read_file`]. +/// +/// Small files are read via an async [`cap::File`](crate::cap::File) (buffered +/// async IO). Large files (above [`mmap::MMAP_THRESHOLD`]) are memory-mapped +/// for zero-copy reads. +pub enum FileReader { + /// Normal async file read. + File(crate::cap::File), + /// Memory-mapped file, wrapped in a cursor for `AsyncRead`. + Mmap(std::io::Cursor), +} + +impl tokio::io::AsyncRead for FileReader { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + match self.get_mut() { + FileReader::File(f) => std::pin::Pin::new(f).poll_read(cx, buf), + FileReader::Mmap(cursor) => { + let slice = cursor.get_ref().as_slice(); + let pos = cursor.position() as usize; + let remaining = &slice[pos..]; + let n = remaining.len().min(buf.remaining()); + buf.put_slice(&remaining[..n]); + cursor.set_position((pos + n) as u64); + std::task::Poll::Ready(Ok(())) + } + } + } +} + +impl FileSystemSource for DirSource { + type Error = DirSourceError; + type Reader = FileReader; + type Child = Self; + type Entries<'a> = DirSourceEntries; + + async fn lstat(&self) -> Result { + let meta = self.meta(); + let file_type = if meta.is_dir() { + FileType::Directory + } else if meta.is_symlink() { + FileType::Symlink + } else if meta.is_file() { + FileType::Regular + } else { + return Err(DirSourceError::UnsupportedFileType); + }; + let file_size = if file_type == FileType::Regular { + Some(meta.len()) + } else { + None + }; + Ok(Stat { + file_type, + file_size, + executable: super::is_executable(meta), + }) + } + + async fn read_file(&self) -> Result { + match &self.0 { + DirSourceInner::Entry { parent, name, meta } if meta.is_file() => { + let size = meta.len(); + let file = parent.open(name).await?; + if size > mmap::MMAP_THRESHOLD { + let mapped = mmap::MappedFile::from_fd(&file, size)?; + Ok(FileReader::Mmap(std::io::Cursor::new(mapped))) + } else { + Ok(FileReader::File(file)) + } + } + _ => Err(DirSourceError::NotAFile), + } + } + + async fn read_link(&self) -> Result { + match &self.0 { + DirSourceInner::Entry { parent, name, meta } if meta.is_symlink() => { + let target = parent.read_link(name).await?; + target.into_os_string().into_string().map_err(|t| { + std::io::Error::other(format!("non-UTF-8 symlink target: {t:?}")).into() + }) + } + _ => Err(DirSourceError::NotASymlink), + } + } + + async fn entries(&self) -> Result, Self::Error> { + let dir = self.open_dir().await?; + let read_dir = dir.read_dir(".").await?; + let mut names = Vec::new(); + for entry in read_dir { + let entry = entry?; + names.push( + entry + .file_name() + .into_string() + .map_err(|n| std::io::Error::other(format!("non-UTF-8 filename: {n:?}")))?, + ); + } + names.sort(); + let mut children = Vec::with_capacity(names.len()); + for name in names { + let meta = dir.symlink_metadata(&name).await?; + let parent = dir.clone(); + children.push(( + name.clone(), + DirSource(DirSourceInner::Entry { parent, name, meta }), + )); + } + Ok(DirSourceEntries(children.into_iter())) + } + + async fn open(&self, name: &str) -> Result { + let dir = self.open_dir().await?; + let meta = dir.symlink_metadata(name).await?; + let parent = dir.clone(); + Ok(DirSource(DirSourceInner::Entry { + parent, + name: name.to_owned(), + meta, + })) + } +} + +/// Pre-collected children of a [`DirSource`] directory, yielded as a stream. +/// +/// Each entry is already constructed with cached metadata, but its underlying +/// directory handle is NOT opened until someone calls `entries()`/`open()` on it. +pub struct DirSourceEntries(std::vec::IntoIter<(String, DirSource)>); + +impl futures_core::Stream for DirSourceEntries { + type Item = Result<(String, DirSource), DirSourceError>; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(self.0.next().map(Ok)) + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} diff --git a/harmonia-file-io-pure/Cargo.toml b/harmonia-file-io-pure/Cargo.toml new file mode 100644 index 00000000..b47bcd07 --- /dev/null +++ b/harmonia-file-io-pure/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "harmonia-file-io-pure" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +description = "Async file tree IO traits and in-memory implementations" + +[dependencies] +futures-core.workspace = true +futures-util.workspace = true +harmonia-file-core = { workspace = true } +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true +tokio.workspace = true + +[dev-dependencies] +serde_json.workspace = true + +[lints] +workspace = true diff --git a/harmonia-file-io-pure/README.md b/harmonia-file-io-pure/README.md new file mode 100644 index 00000000..374e612e --- /dev/null +++ b/harmonia-file-io-pure/README.md @@ -0,0 +1,30 @@ +# harmonia-file-io-pure + +Async file tree IO traits, in-memory implementations, and listing functions. + +## Traits + +- **`FileSystemSource`** — async read-side interface. A node in a file + tree; navigate to children with `open()`, iterate with `entries()`. + Entries yield lazy `ChildThunk` futures so directory handles are only + opened on demand. + +- **`FileSystemSink`** — async write-side interface. Creates a single + node (file, dir, or symlink). `create_directory()` returns a + `DirectorySink` for populating children one level at a time. + +- **`RegularFileSink`** — `AsyncWrite` for streaming file contents. + +Mirrors nix's `SourceAccessor` / `FileSystemObjectSink` architecture +([NixOS/nix#15392](https://github.com/NixOS/nix/pull/15392)) in Rust +with async traits. + +## In-memory implementation + +`MemoryTreeSource` and `MemoryTreeBuilder` provide a pure in-memory +implementation for testing and for building trees from parsed NARs. + +## Listing + +- `list_deep` — fully recursive listing from a `FileSystemSource` +- `list_shallow` — one-level listing with `Opaque` children diff --git a/harmonia-file-io-pure/src/lib.rs b/harmonia-file-io-pure/src/lib.rs new file mode 100644 index 00000000..91c9f2fd --- /dev/null +++ b/harmonia-file-io-pure/src/lib.rs @@ -0,0 +1,43 @@ +//! Async file tree IO traits, in-memory implementations, and listing functions. +//! +//! This does not do any "real" IO, but unlike `harmonia-file-core`, +//! it does use `async` and creates some slightly opinionated IO +//! abstractions. +//! +//! # Read side +//! +//! [`FileSystemSource`] represents a node in a file tree. You navigate +//! to children with [`open`](FileSystemSource::open) and iterate them +//! with [`entries`](FileSystemSource::entries), which yields `(name, child)` +//! pairs. A child carries cached metadata, and its directory handle is +//! opened only when you descend into it. +//! +//! # Write side +//! +//! [`FileSystemSink`] creates a single node (file, dir, or symlink). +//! For directories, it returns a [`DirectorySink`] whose +//! [`create_child`](DirectorySink::create_child) method yields a +//! sub-sink for each child — one level at a time, matching the NAR +//! format's depth-first structure. [`RegularFileSink`] implements +//! [`AsyncWrite`](tokio::io::AsyncWrite) for streaming file contents. +//! +//! # In-memory implementation +//! +//! [`MemoryTreeSource`] and [`MemoryTreeBuilder`] provide a pure +//! in-memory implementation useful for testing and for building +//! trees programmatically (e.g. from parsed NARs). +//! +//! See also [`harmonia-file-core`](harmonia_file_core) for the +//! underlying data types. + +mod listing; +mod serde_impl; +mod sink; +mod source; + +#[cfg(test)] +mod tests; + +pub use listing::{Opaque, ShallowTree, list_deep, list_shallow}; +pub use sink::{DirectorySink, FileSystemSink, MemoryTreeBuilder, RegularFileSink}; +pub use source::{FileSystemSource, FileType, MemoryTreeSource, Stat}; diff --git a/harmonia-file-io-pure/src/listing.rs b/harmonia-file-io-pure/src/listing.rs new file mode 100644 index 00000000..4da99085 --- /dev/null +++ b/harmonia-file-io-pure/src/listing.rs @@ -0,0 +1,71 @@ +use futures_util::StreamExt; + +use harmonia_file_core::{Directory, FileSystemObject, FileTree, Regular, Symlink}; + +use crate::{FileSystemSource, FileType, Stat}; + +/// An opaque placeholder used in shallow listings. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct Opaque; + +/// A shallow (one-level) file tree — directory children are [`Opaque`]. +pub type ShallowTree = FileSystemObject; + +/// Produce a fully recursive listing from a [`FileSystemSource`]. +pub async fn list_deep( + source: &impl FileSystemSource, +) -> Result, Box> { + let stat = source.lstat().await.map_err(box_err)?; + match stat.file_type { + FileType::Regular => Ok(FileTree(FileSystemObject::Regular(Regular { + executable: stat.executable, + contents: stat, + }))), + FileType::Symlink => { + let target = source.read_link().await.map_err(box_err)?; + Ok(FileTree(FileSystemObject::Symlink(Symlink { target }))) + } + FileType::Directory => { + let mut entries = std::collections::BTreeMap::new(); + let mut stream = source.entries().await.map_err(box_err)?; + while let Some(item) = stream.next().await { + let (name, child) = item.map_err(box_err)?; + let listing = Box::pin(list_deep(&child)).await?; + entries.insert(name, Box::new(listing)); + } + Ok(FileTree(FileSystemObject::Directory(Directory { entries }))) + } + } +} + +/// Produce a shallow (one-level) listing from a [`FileSystemSource`]. +/// +/// Directory children are represented as [`Opaque`] placeholders. +pub async fn list_shallow( + source: &impl FileSystemSource, +) -> Result, Box> { + let stat = source.lstat().await.map_err(box_err)?; + match stat.file_type { + FileType::Regular => Ok(FileSystemObject::Regular(Regular { + executable: stat.executable, + contents: stat, + })), + FileType::Symlink => { + let target = source.read_link().await.map_err(box_err)?; + Ok(FileSystemObject::Symlink(Symlink { target })) + } + FileType::Directory => { + let mut entries = std::collections::BTreeMap::new(); + let mut stream = source.entries().await.map_err(box_err)?; + while let Some(item) = stream.next().await { + let (name, _child) = item.map_err(box_err)?; + entries.insert(name, Opaque); + } + Ok(FileSystemObject::Directory(Directory { entries })) + } + } +} + +fn box_err(e: impl std::error::Error + 'static) -> Box { + Box::new(e) +} diff --git a/harmonia-file-io-pure/src/serde_impl.rs b/harmonia-file-io-pure/src/serde_impl.rs new file mode 100644 index 00000000..ae8b73d0 --- /dev/null +++ b/harmonia-file-io-pure/src/serde_impl.rs @@ -0,0 +1,67 @@ +//! Serde implementations for IO types that can't derive. + +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +use crate::listing::Opaque; +use crate::source::{FileType, Stat}; + +// --------------------------------------------------------------------------- +// Opaque — serializes as `{}` +// --------------------------------------------------------------------------- + +impl Serialize for Opaque { + fn serialize(&self, serializer: S) -> Result { + use serde::ser::SerializeMap; + serializer.serialize_map(Some(0))?.end() + } +} + +impl<'de> Deserialize<'de> for Opaque { + fn deserialize>(deserializer: D) -> Result { + serde::de::IgnoredAny::deserialize(deserializer)?; + Ok(Opaque) + } +} + +// --------------------------------------------------------------------------- +// Stat — flattened into Regular as `{"size": N}` +// --------------------------------------------------------------------------- + +impl Serialize for Stat { + fn serialize(&self, serializer: S) -> Result { + use serde::ser::SerializeMap; + let mut map = serializer.serialize_map(None)?; + if let Some(size) = self.file_size { + map.serialize_entry("size", &size)?; + } + map.end() + } +} + +impl<'de> Deserialize<'de> for Stat { + fn deserialize>(deserializer: D) -> Result { + #[derive(Deserialize)] + struct Helper { + #[serde(rename = "type", default)] + file_type: Option, + executable: Option, + size: Option, + } + let h = Helper::deserialize(deserializer)?; + let file_type = match h.file_type.as_deref() { + Some("regular") | None => FileType::Regular, + Some("directory") => FileType::Directory, + Some("symlink") => FileType::Symlink, + Some(other) => { + return Err(::custom(format!( + "invalid file type: {other}" + ))); + } + }; + Ok(Stat { + file_type, + file_size: h.size, + executable: h.executable.unwrap_or(false), + }) + } +} diff --git a/harmonia-file-io-pure/src/sink.rs b/harmonia-file-io-pure/src/sink.rs new file mode 100644 index 00000000..0b24feec --- /dev/null +++ b/harmonia-file-io-pure/src/sink.rs @@ -0,0 +1,187 @@ +use tokio::io::AsyncWrite; + +use harmonia_file_core::{Directory, FileSystemObject, FileTree, MemoryTree, Regular, Symlink}; + +// --------------------------------------------------------------------------- +// Traits +// --------------------------------------------------------------------------- + +/// Write-side interface: choose what kind of node to create at this position. +/// +/// One level at a time — `create_directory` returns a [`DirectorySink`] +/// for populating children, `create_regular_file` returns a [`RegularFileSink`] +/// for streaming contents. +// TODO: desugar to `fn foo() -> impl Future<...> + Send` once we need +// to spawn tasks that hold these futures across await points. +#[allow(async_fn_in_trait)] +pub trait FileSystemSink: Sized { + type Error: std::error::Error; + type Directory: DirectorySink; + type File: RegularFileSink; + + async fn create_directory(self) -> Result; + async fn create_regular_file(self, executable: bool) -> Result; + async fn create_symlink(self, target: &str) -> Result<(), Self::Error>; +} + +/// Populate a directory's children one at a time. +#[allow(async_fn_in_trait)] +pub trait DirectorySink: Sized { + type Error: std::error::Error; + type Child<'a>: FileSystemSink + where + Self: 'a; + + async fn create_child(&mut self, name: &str) -> Result, Self::Error>; +} + +/// Sink for streaming contents into a regular file. +pub trait RegularFileSink: AsyncWrite + Unpin { + type Error: std::error::Error; +} + +// --------------------------------------------------------------------------- +// In-memory implementation +// --------------------------------------------------------------------------- + +/// Errors from in-memory tree building. +#[derive(Debug, thiserror::Error)] +pub enum MemoryBuildError { + #[error("IO error reading contents: {0}")] + Io(#[from] std::io::Error), +} + +fn placeholder() -> MemoryTree { + FileTree(FileSystemObject::Directory(Directory { + entries: Default::default(), + })) +} + +/// Builder that owns a [`MemoryTree`] and provides a [`FileSystemSink`] +/// for populating it. +pub struct MemoryTreeBuilder { + root: MemoryTree, +} + +impl MemoryTreeBuilder { + pub fn new() -> Self { + Self { + root: placeholder(), + } + } + + /// Get a sink for the root slot. + pub fn sink(&mut self) -> MemorySlotSink<'_> { + MemorySlotSink(&mut self.root) + } + + /// Consume the builder and return the finished tree. + pub fn build(self) -> MemoryTree { + self.root + } +} + +impl Default for MemoryTreeBuilder { + fn default() -> Self { + Self::new() + } +} + +/// A mutable reference to a tree node that will be overwritten. +pub struct MemorySlotSink<'a>(&'a mut MemoryTree); + +impl<'a> FileSystemSink for MemorySlotSink<'a> { + type Error = MemoryBuildError; + type Directory = MemoryDirSink<'a>; + type File = MemoryFileSink<'a>; + + async fn create_directory(self) -> Result { + *self.0 = FileTree(FileSystemObject::Directory(Directory { + entries: Default::default(), + })); + Ok(MemoryDirSink(self.0)) + } + + async fn create_regular_file(self, executable: bool) -> Result { + Ok(MemoryFileSink { + slot: self.0, + executable, + data: Vec::new(), + }) + } + + async fn create_symlink(self, target: &str) -> Result<(), Self::Error> { + *self.0 = FileTree(FileSystemObject::Symlink(Symlink { + target: target.to_owned(), + })); + Ok(()) + } +} + +/// A directory being populated with children. +pub struct MemoryDirSink<'a>(&'a mut MemoryTree); + +impl<'a> DirectorySink for MemoryDirSink<'a> { + type Error = MemoryBuildError; + type Child<'b> + = MemorySlotSink<'b> + where + Self: 'b; + + async fn create_child(&mut self, name: &str) -> Result, Self::Error> { + let FileTree(FileSystemObject::Directory(dir)) = &mut *self.0 else { + unreachable!("MemoryDirSink always wraps a directory"); + }; + dir.entries + .entry(name.to_owned()) + .or_insert_with(|| Box::new(placeholder())); + let child = dir.entries.get_mut(name).unwrap(); + Ok(MemorySlotSink(child.as_mut())) + } +} + +/// A regular file being written. +pub struct MemoryFileSink<'a> { + slot: &'a mut MemoryTree, + executable: bool, + data: Vec, +} + +impl<'a> AsyncWrite for MemoryFileSink<'a> { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + self.data.extend_from_slice(buf); + std::task::Poll::Ready(Ok(buf.len())) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } +} + +impl<'a> RegularFileSink for MemoryFileSink<'a> { + type Error = MemoryBuildError; +} + +impl<'a> Drop for MemoryFileSink<'a> { + fn drop(&mut self) { + let data = std::mem::take(&mut self.data); + *self.slot = FileTree(FileSystemObject::Regular(Regular { + executable: self.executable, + contents: data, + })); + } +} diff --git a/harmonia-file-io-pure/src/source.rs b/harmonia-file-io-pure/src/source.rs new file mode 100644 index 00000000..13340f25 --- /dev/null +++ b/harmonia-file-io-pure/src/source.rs @@ -0,0 +1,187 @@ +use tokio::io::AsyncRead; + +use harmonia_file_core::{Directory, FileSystemObject, FileTree, MemoryTree, Regular}; + +// --------------------------------------------------------------------------- +// Stat / FileType +// --------------------------------------------------------------------------- + +/// The type of a file-system entry. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FileType { + Regular, + Directory, + Symlink, +} + +/// Metadata for a file-system entry. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Stat { + pub file_type: FileType, + pub file_size: Option, + pub executable: bool, +} + +// --------------------------------------------------------------------------- +// FileSystemSource trait +// --------------------------------------------------------------------------- + +/// Read-side interface for accessing a file tree. +/// +/// The accessor represents a node in the tree. Use [`open`](Self::open) +/// to navigate to children, or [`entries`](Self::entries) to iterate them. +// TODO: desugar `async fn` in trait to explicit associated type to give downstream code more +// expressive power at some point. +#[allow(async_fn_in_trait)] +pub trait FileSystemSource: Sized { + type Error: std::error::Error + 'static; + type Reader: AsyncRead + Unpin; + type Child: FileSystemSource; + type Entries<'a>: futures_core::Stream> + Unpin + where + Self: 'a; + + /// Get metadata for this node (does not follow symlinks). + async fn lstat(&self) -> Result; + + /// Read the contents of this node (must be a regular file). + async fn read_file(&self) -> Result; + + /// Read the symlink target of this node (must be a symlink). + async fn read_link(&self) -> Result; + + /// Iterate over children of this directory node. + /// + /// Each item is a `(name, child)` pair where the child is already + /// opened as a [`Self::Child`]. Returns an error if this node is not + /// a directory. Individual entries may also fail (e.g. permission + /// errors on a filesystem source). + /// + /// Entries are yielded in sorted order. + async fn entries(&self) -> Result, Self::Error>; + + /// Open a child entry by name. + async fn open(&self, name: &str) -> Result; +} + +// --------------------------------------------------------------------------- +// MemoryTree implementation +// --------------------------------------------------------------------------- + +/// A reference into a [`MemoryTree`] acting as a [`FileSystemSource`]. +#[derive(Debug, Clone)] +pub struct MemoryTreeSource<'a> { + node: &'a MemoryTree, +} + +impl<'a> MemoryTreeSource<'a> { + pub fn new(tree: &'a MemoryTree) -> Self { + Self { node: tree } + } +} + +/// Errors from in-memory tree access. +#[derive(Debug, thiserror::Error)] +#[allow(clippy::enum_variant_names)] +pub enum MemorySourceError { + #[error("entry not found: {0}")] + NotFound(String), + #[error("not a regular file")] + NotAFile, + #[error("not a symlink")] + NotASymlink, + #[error("not a directory")] + NotADirectory, +} + +impl<'a> FileSystemSource for MemoryTreeSource<'a> { + type Error = MemorySourceError; + type Reader = std::io::Cursor>; + type Child = Self; + type Entries<'b> + = MemoryTreeEntries<'a> + where + Self: 'b; + + async fn lstat(&self) -> Result { + Ok(stat_of(self.node)) + } + + async fn read_file(&self) -> Result { + match self.node { + FileTree(FileSystemObject::Regular(r)) => Ok(std::io::Cursor::new(r.contents.clone())), + _ => Err(MemorySourceError::NotAFile), + } + } + + async fn read_link(&self) -> Result { + match self.node { + FileTree(FileSystemObject::Symlink(s)) => Ok(s.target.clone()), + _ => Err(MemorySourceError::NotASymlink), + } + } + + async fn entries(&self) -> Result, Self::Error> { + match self.node { + FileTree(FileSystemObject::Directory(d)) => Ok(MemoryTreeEntries(d.entries.iter())), + _ => Err(MemorySourceError::NotADirectory), + } + } + + async fn open(&self, name: &str) -> Result { + match self.node { + FileTree(FileSystemObject::Directory(Directory { entries })) => { + let child = entries + .get(name) + .ok_or_else(|| MemorySourceError::NotFound(name.to_owned()))?; + Ok(MemoryTreeSource { node: child }) + } + _ => Err(MemorySourceError::NotADirectory), + } + } +} + +/// Iterator over children of a [`MemoryTreeSource`] directory. +pub struct MemoryTreeEntries<'a>(std::collections::btree_map::Iter<'a, String, Box>); + +impl<'a> futures_core::Stream for MemoryTreeEntries<'a> { + type Item = Result<(String, MemoryTreeSource<'a>), MemorySourceError>; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready( + self.0 + .next() + .map(|(name, child)| Ok((name.clone(), MemoryTreeSource { node: child }))), + ) + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + +fn stat_of(node: &MemoryTree) -> Stat { + match node { + FileTree(FileSystemObject::Regular(Regular { + executable, + contents, + })) => Stat { + file_type: FileType::Regular, + file_size: Some(contents.len() as u64), + executable: *executable, + }, + FileTree(FileSystemObject::Directory(_)) => Stat { + file_type: FileType::Directory, + file_size: None, + executable: false, + }, + FileTree(FileSystemObject::Symlink(_)) => Stat { + file_type: FileType::Symlink, + file_size: None, + executable: false, + }, + } +} diff --git a/harmonia-file-io-pure/src/tests.rs b/harmonia-file-io-pure/src/tests.rs new file mode 100644 index 00000000..5eaa760f --- /dev/null +++ b/harmonia-file-io-pure/src/tests.rs @@ -0,0 +1,205 @@ +use tokio::io::AsyncWriteExt; + +use harmonia_file_core::*; + +use crate::sink::{DirectorySink, FileSystemSink}; +use crate::source::MemoryTreeSource; +use crate::*; + +/// Build a test tree: +/// ```text +/// . +/// ├── bar/ +/// │ ├── baz (executable, 19 bytes) +/// │ └── quux -> /over/there +/// └── foo (15 bytes) +/// ``` +async fn sample_tree() -> MemoryTree { + let mut builder = MemoryTreeBuilder::new(); + { + let mut root = builder.sink().create_directory().await.unwrap(); + + { + let mut bar = root + .create_child("bar") + .await + .unwrap() + .create_directory() + .await + .unwrap(); + bar.create_child("baz") + .await + .unwrap() + .create_regular_file(true) + .await + .unwrap() + .write_all(&[0u8; 19]) + .await + .unwrap(); + bar.create_child("quux") + .await + .unwrap() + .create_symlink("/over/there") + .await + .unwrap(); + } + + root.create_child("foo") + .await + .unwrap() + .create_regular_file(false) + .await + .unwrap() + .write_all(&[0u8; 15]) + .await + .unwrap(); + } + builder.build() +} + +#[tokio::test] +async fn source_lstat() { + let tree = sample_tree().await; + let src = MemoryTreeSource::new(&tree); + + let root = src.lstat().await.unwrap(); + assert_eq!(root.file_type, FileType::Directory); + + let foo = src.open("foo").await.unwrap(); + let foo_stat = foo.lstat().await.unwrap(); + assert_eq!(foo_stat.file_type, FileType::Regular); + assert_eq!(foo_stat.file_size, Some(15)); + assert!(!foo_stat.executable); + + let baz = src.open("bar").await.unwrap().open("baz").await.unwrap(); + let baz_stat = baz.lstat().await.unwrap(); + assert_eq!(baz_stat.file_type, FileType::Regular); + assert_eq!(baz_stat.file_size, Some(19)); + assert!(baz_stat.executable); + + let quux = src.open("bar").await.unwrap().open("quux").await.unwrap(); + let quux_stat = quux.lstat().await.unwrap(); + assert_eq!(quux_stat.file_type, FileType::Symlink); +} + +#[tokio::test] +async fn source_read_ops() { + use tokio::io::AsyncReadExt; + + let tree = sample_tree().await; + let src = MemoryTreeSource::new(&tree); + + let mut data = Vec::new(); + src.open("foo") + .await + .unwrap() + .read_file() + .await + .unwrap() + .read_to_end(&mut data) + .await + .unwrap(); + assert_eq!(data.len(), 15); + + let target = src + .open("bar") + .await + .unwrap() + .open("quux") + .await + .unwrap() + .read_link() + .await + .unwrap(); + assert_eq!(target, "/over/there"); + + use futures_util::StreamExt; + let entries = src + .entries() + .await + .unwrap() + .map(|r| r.unwrap().0) + .collect::>() + .await; + assert_eq!(entries, ["bar", "foo"]); +} + +#[tokio::test] +async fn list_deep_matches_nix_json() { + let tree = sample_tree().await; + let src = MemoryTreeSource::new(&tree); + let listing = list_deep(&src).await.unwrap(); + let json = serde_json::to_value(&listing).unwrap(); + + let expected = serde_json::from_str::( + r#"{ + "type": "directory", + "entries": { + "bar": { + "type": "directory", + "entries": { + "baz": { + "type": "regular", + "executable": true, + "size": 19 + }, + "quux": { + "type": "symlink", + "target": "/over/there" + } + } + }, + "foo": { + "type": "regular", + "size": 15 + } + } + }"#, + ) + .unwrap(); + + assert_eq!(json, expected); +} + +#[tokio::test] +async fn list_shallow_matches_nix_json() { + let tree = sample_tree().await; + let src = MemoryTreeSource::new(&tree); + let listing = list_shallow(&src).await.unwrap(); + let json = serde_json::to_value(&listing).unwrap(); + + let expected = serde_json::from_str::( + r#"{ + "type": "directory", + "entries": { + "bar": {}, + "foo": {} + } + }"#, + ) + .unwrap(); + + assert_eq!(json, expected); +} + +#[tokio::test] +async fn deep_listing_roundtrip_serde() { + let tree = sample_tree().await; + let src = MemoryTreeSource::new(&tree); + let listing = list_deep(&src).await.unwrap(); + let json = serde_json::to_string(&listing).unwrap(); + let parsed = serde_json::from_str::>(&json).unwrap(); + let json2 = serde_json::to_string(&parsed).unwrap(); + assert_eq!(json, json2); +} + +#[tokio::test] +async fn shallow_listing_roundtrip_serde() { + let tree = sample_tree().await; + let src = MemoryTreeSource::new(&tree); + let listing = list_shallow(&src).await.unwrap(); + let json = serde_json::to_string(&listing).unwrap(); + let parsed = serde_json::from_str::>(&json).unwrap(); + let json2 = serde_json::to_string(&parsed).unwrap(); + assert_eq!(json, json2); +} diff --git a/harmonia-file-nar/Cargo.toml b/harmonia-file-nar/Cargo.toml index 7296d839..50a6ae1b 100644 --- a/harmonia-file-nar/Cargo.toml +++ b/harmonia-file-nar/Cargo.toml @@ -13,23 +13,24 @@ repository.workspace = true description = "NAR (Nix ARchive) format packing and unpacking" [dependencies] -bstr = { workspace = true } -bytes = { workspace = true } -derive_more = { workspace = true } -futures-core = { workspace = true } -futures-sink = { workspace = true } -futures-util = { workspace = true } -harmonia-file-core = { workspace = true } -harmonia-utils-io = { workspace = true } -nix.workspace = true -pin-project-lite = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -thiserror = { workspace = true } -tokio = { workspace = true, features = [ "fs", "io-util" ] } -tokio-util = { workspace = true, features = [ "io", "io-util" ] } -tracing = { workspace = true } -walkdir = "2.5" +bstr = { workspace = true } +bytes = { workspace = true } +derive_more = { workspace = true } +futures-core = { workspace = true } +futures-sink = { workspace = true } +futures-util = { workspace = true } +harmonia-file-core = { workspace = true } +harmonia-file-fd = { workspace = true } +harmonia-file-io-pure = { workspace = true } +harmonia-utils-io = { workspace = true } +nix.workspace = true +pin-project-lite = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true, features = [ "fs", "io-util", "sync" ] } +tokio-util = { workspace = true, features = [ "io", "io-util" ] } +tracing = { workspace = true } [dev-dependencies] harmonia-utils-test = { workspace = true } diff --git a/harmonia-file-nar/README.md b/harmonia-file-nar/README.md index a5bddd88..1a00fa02 100644 --- a/harmonia-file-nar/README.md +++ b/harmonia-file-nar/README.md @@ -1,23 +1,39 @@ # harmonia-file-nar -NAR (Nix ARchive) format handling. +NAR (Nix ARchive) format handling through `harmonia-file-io-pure` traits. ## Overview -This crate packs and unpacks NAR archives, the archive format used by -Nix for representing store paths as byte streams. +This crate packs and unpacks NAR archives using the generic +`FileSystemSource` and `FileSystemSink` traits from `harmonia-file-io-pure`. +Any source (filesystem, memory tree, remote store) can be dumped to NAR, +and any NAR can be restored into any sink. -## Key types +## Key functions -- **`NarDumper`** — stream that walks a filesystem path and produces `NarEvent`s -- **`NarRestorer`** — sink that restores `NarEvent`s to a filesystem path -- **`NarByteStream`** — streaming NAR byte output for HTTP serving -- **`NarParser`** — parse NAR bytes into `NarEvent`s -- **`parse_nar_listing`** — parse NAR bytes into a `FileTree` listing +- **`dump_source`** — write a NAR archive from a `FileSystemSource` to an `AsyncWrite` +- **`restore_to_sink`** — parse a NAR archive and write to a `FileSystemSink` +- **`parse_nar_listing`** — parse a NAR archive into a `FileTree` (JSON-compatible listing) +- **`NarByteStream`** — streaming NAR byte output from a filesystem path + +## Example + +```rust +use harmonia_file_core::{MemoryTreeSource, list_deep}; +use harmonia_file_nar::{dump_source, restore_to_sink}; + +// Dump any FileSystemSource to NAR bytes +let mut nar = Vec::new(); +dump_source(&source, &mut nar).await?; + +// Restore NAR bytes into any FileSystemSink +restore_to_sink(reader, sink).await?; +``` ## Design - **Streaming**: Never requires entire NAR in memory -- **Async**: Built on tokio +- **Trait-based**: Dump/restore go through `FileSystemSource`/`FileSystemSink`, + not hardcoded to filesystem paths - **Format-focused**: Only concerned with NAR archive structure - **Composable**: Can be used independently of the daemon diff --git a/harmonia-file-nar/src/archive/byte_stream.rs b/harmonia-file-nar/src/archive/byte_stream.rs index 0f2ba242..31762adb 100644 --- a/harmonia-file-nar/src/archive/byte_stream.rs +++ b/harmonia-file-nar/src/archive/byte_stream.rs @@ -1,182 +1,84 @@ +//! Streaming NAR byte output from a filesystem path. + use std::io; use std::path::PathBuf; use std::pin::Pin; -use std::task::{Context, Poll, ready}; +use std::task::{Context, Poll}; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::Bytes; use futures_core::Stream; +use tokio::io::AsyncReadExt; +use tokio::sync::mpsc; -use super::NarEvent; -use super::dumper::{DumpOptions, NarDumper}; -use super::read_nar::{ - TOK_DIR, TOK_ENTRY, TOK_FILE, TOK_FILE_E, TOK_NODE, TOK_PAR, TOK_ROOT, TOK_SYM, -}; -use crate::wire::calc_padding; - -/// Flush accumulated framing once it exceeds this many bytes so a long run of -/// tiny entries (symlinks, empty dirs) does not buffer unbounded amounts of -/// metadata before yielding to the consumer. -const FRAME_FLUSH_THRESHOLD: usize = 32 * 1024; - -/// Max size of a single file-content chunk yielded downstream. -/// -/// Large mmap'd files are sliced into pieces of this size so the HTTP layer -/// can interleave socket writes with further NAR encoding work and so a slow -/// client does not pin a multi-GiB `Bytes` in actix's write buffer at once. -/// Slicing a `Bytes` is just a refcount bump — no copy. -const FILE_CHUNK_SIZE: usize = 256 * 1024; - -enum Phase { - /// Pull the next [`NarEvent`] from the dumper and encode framing. - Event, - /// Yield file content (possibly in multiple slices), then append the - /// trailing padding/`)` tokens to `frame` and go back to [`Phase::Event`]. - Emit { - data: Bytes, - size: u64, - }, - Done, -} +use super::dumper::dump_source; /// A [`Stream`] of [`Bytes`] chunks containing NAR-encoded data for a path. /// -/// Drives a [`NarDumper`] directly and emits the NAR wire format without -/// intermediate copies: framing tokens are accumulated in a small reusable -/// buffer, and file payloads are forwarded as the `Bytes` already loaded by -/// the dumper (heap buffer for small files, mmap-backed for large ones). The -/// only per-byte copy on the serving path is the one the HTTP layer performs -/// into its socket write buffer. +/// Opens the path as a [`DirSource`](harmonia_file_fd::DirSource) and +/// drives [`dump_source`] on a spawned task. The NAR bytes are streamed +/// back via a channel. pub struct NarByteStream { - dumper: NarDumper, - /// Scratch buffer for NAR structure tokens between file payloads. - frame: BytesMut, - /// Directory nesting depth, mirroring [`NarWriter`]'s `level` so the - /// emitted token sequence is byte-identical. - level: u32, - phase: Phase, + rx: mpsc::Receiver>, } impl NarByteStream { /// Create a new `NarByteStream` for the given filesystem path. pub fn new(path: PathBuf) -> Self { - Self { - dumper: DumpOptions::new().dump(path), - frame: BytesMut::with_capacity(FRAME_FLUSH_THRESHOLD), - level: 0, - phase: Phase::Event, - } + let (tx, rx) = mpsc::channel(8); + tokio::spawn(async move { + let result = Self::produce(path, tx.clone()).await; + if let Err(e) = result { + let _ = tx.send(Err(e)).await; + } + }); + Self { rx } } - /// Append the entry-prefix tokens that precede every non-root node. - fn put_entry_header(frame: &mut BytesMut, name: &[u8]) { - frame.put_slice(TOK_ENTRY); - put_nix_slice(frame, name); - frame.put_slice(TOK_NODE); - } -} + async fn produce(path: PathBuf, tx: mpsc::Sender>) -> io::Result<()> { + use harmonia_file_fd::DirSource; -fn put_nix_slice(buf: &mut BytesMut, src: &[u8]) { - buf.put_u64_le(src.len() as u64); - buf.put_slice(src); - buf.put_bytes(0, calc_padding(src.len() as u64)); -} + let source = DirSource::open_path(&path) + .await + .map_err(io::Error::other)?; -impl Stream for NarByteStream { - type Item = io::Result; + // Write NAR to a duplex stream, read chunks and send to channel + let (mut writer, mut reader) = tokio::io::duplex(64 * 1024); - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - loop { - match &mut this.phase { - Phase::Event => { - if this.frame.len() >= FRAME_FLUSH_THRESHOLD { - return Poll::Ready(Some(Ok(this.frame.split().freeze()))); - } - match ready!(Pin::new(&mut this.dumper).poll_next(cx)) { - Some(Ok(event)) => { - if this.level == 0 { - this.frame.put_slice(TOK_ROOT); - } - match event { - NarEvent::StartDirectory { name } => { - if this.level > 0 { - Self::put_entry_header(&mut this.frame, &name); - } - this.frame.put_slice(TOK_DIR); - this.level += 1; - } - NarEvent::EndDirectory => { - this.frame.put_slice(TOK_PAR); - this.level -= 1; - if this.level > 0 { - this.frame.put_slice(TOK_PAR); - } - } - NarEvent::Symlink { name, target } => { - if this.level > 0 { - Self::put_entry_header(&mut this.frame, &name); - } - this.frame.put_slice(TOK_SYM); - put_nix_slice(&mut this.frame, &target); - this.frame.put_slice(TOK_PAR); - if this.level > 0 { - this.frame.put_slice(TOK_PAR); - } - } - NarEvent::File { - name, - executable, - size, - reader, - } => { - if this.level > 0 { - Self::put_entry_header(&mut this.frame, &name); - } - this.frame.put_slice(if executable { - TOK_FILE_E - } else { - TOK_FILE - }); - this.frame.put_u64_le(size); - this.phase = Phase::Emit { - data: reader.into_bytes(), - size, - }; - // Flush framing now so file bytes follow - // immediately without being copied into - // the frame buffer. - if !this.frame.is_empty() { - return Poll::Ready(Some(Ok(this.frame.split().freeze()))); - } - } - } - } - Some(Err(e)) => return Poll::Ready(Some(Err(e))), - None => { - this.phase = Phase::Done; - if !this.frame.is_empty() { - return Poll::Ready(Some(Ok(this.frame.split().freeze()))); - } - } + let dump_handle = tokio::spawn(async move { dump_source(&source, &mut writer).await }); + + let outcome = loop { + let mut buf = vec![0u8; 256 * 1024]; + match reader.read(&mut buf).await { + Ok(0) => break Ok(()), + Ok(n) => { + buf.truncate(n); + if tx.send(Ok(Bytes::from(buf))).await.is_err() { + break Err(None); // receiver dropped } } - Phase::Emit { data, size } => { - if !data.is_empty() { - let n = data.len().min(FILE_CHUNK_SIZE); - let chunk = data.split_to(n); - return Poll::Ready(Some(Ok(chunk))); - } - // File fully emitted; append trailer and resume framing. - this.frame.put_bytes(0, calc_padding(*size)); - this.frame.put_slice(TOK_PAR); - if this.level > 0 { - this.frame.put_slice(TOK_PAR); - } - this.phase = Phase::Event; + Err(e) => break Err(Some(e)), + } + }; + + match outcome { + Ok(()) => dump_handle.await.map_err(io::Error::other)??, + // The dumper would block forever writing into the undrained duplex, so abort instead of awaiting. + Err(reason) => { + dump_handle.abort(); + if let Some(e) = reason { + return Err(e); } - Phase::Done => return Poll::Ready(None), } } + Ok(()) + } +} + +impl Stream for NarByteStream { + type Item = io::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.rx.poll_recv(cx) } } @@ -196,19 +98,14 @@ mod tests { out } - /// The zero-copy byte stream must produce the exact same bytes as the - /// reference [`NarWriter`] for every test fixture. + /// The byte stream must produce valid NAR matching `nix-store --dump`. #[tokio::test] - async fn byte_stream_matches_nar_writer() { + async fn byte_stream_matches_nix_store_dump() { let dir = tempfile::Builder::new() .prefix("nar_byte_stream") .tempdir() .unwrap(); let path = dir.path().join("nar"); - // `NarByteStream::new` uses the platform default for case-hack - // (enabled on macOS, disabled elsewhere); the on-disk fixture must be - // created with the matching flag so the case-colliding `Deep`/`deep` - // pair round-trips on case-insensitive filesystems. let case_hack = cfg!(target_os = "macos"); test_data::create_dir_example(&path, case_hack).unwrap(); @@ -228,17 +125,31 @@ mod tests { assert_eq!(got, want.to_vec()); } - /// Exercise the mmap-backed `Bytes::from_owner` path and the - /// `FILE_CHUNK_SIZE` slicing of large payloads. + /// A root that is itself a symlink must be dumped as the symlink, not the + /// directory it points at. + #[tokio::test] + async fn byte_stream_root_symlink_not_followed() { + let dir = tempfile::tempdir().unwrap(); + std::fs::create_dir(dir.path().join("target")).unwrap(); + let link = dir.path().join("link"); + std::os::unix::fs::symlink("target", &link).unwrap(); + + let got = collect(link.clone()).await; + let want = std::process::Command::new("nix-store") + .arg("--dump") + .arg(&link) + .output() + .expect("nix-store --dump failed"); + assert!(want.status.success()); + assert_eq!(got, want.stdout); + } + #[tokio::test] async fn byte_stream_large_file_matches_nix_store_dump() { - // Larger than SMALL_FILE_THRESHOLD and not a multiple of - // FILE_CHUNK_SIZE / 8, so padding and the final short slice are both - // covered. const LEN: usize = 300 * 1024 + 5; let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("big"); - let data: Vec = (0..LEN).map(|i| (i % 251) as u8).collect(); + let data = (0..LEN).map(|i| (i % 251) as u8).collect::>(); std::fs::write(&path, &data).unwrap(); let got = collect(path.clone()).await; diff --git a/harmonia-file-nar/src/archive/dumper.rs b/harmonia-file-nar/src/archive/dumper.rs index bc1ec0f2..06851385 100644 --- a/harmonia-file-nar/src/archive/dumper.rs +++ b/harmonia-file-nar/src/archive/dumper.rs @@ -1,509 +1,186 @@ -use std::cmp::Ordering; -use std::ffi::OsStr; -use std::fs::read_link; -use std::future::Future as _; -use std::os::unix::ffi::OsStrExt; -#[cfg(unix)] -use std::os::unix::ffi::OsStringExt; -#[cfg(unix)] -use std::os::unix::fs::PermissionsExt as _; -use std::path::{Path, PathBuf}; -use std::pin::Pin; -use std::task::{Context, Poll, ready}; -use std::{collections::VecDeque, io}; - -use bstr::ByteSlice as _; -use bytes::Bytes; -use futures_core::Stream; -use tokio::io::AsyncRead; -use tokio::task::{JoinHandle, spawn_blocking}; -use tracing::debug; -use walkdir::{DirEntry, IntoIter}; - -use super::{CASE_HACK_SUFFIX, NarEvent}; - -pub struct DumpOptions { - use_case_hack: bool, -} - -impl DumpOptions { - pub fn new() -> Self { - #[cfg(target_os = "macos")] - let use_case_hack = true; - #[cfg(not(target_os = "macos"))] - let use_case_hack = false; - Self { use_case_hack } - } - - pub fn use_case_hack(mut self, use_case_hack: bool) -> Self { - self.use_case_hack = use_case_hack; - self - } - - pub fn dump>(self, path: P) -> NarDumper { - let root = path.into(); - let mut walker = walkdir::WalkDir::new(&root) - .follow_links(false) - .follow_root_links(false); - walker = if self.use_case_hack { - walker.sort_by(sort_case_hack) - } else { - walker.sort_by(|a, b| fast_file_name(a.path()).cmp(fast_file_name(b.path()))) - }; - let walker = walker.into_iter(); - NarDumper { - state: State::Idle(Some(Box::new(BatchState { - buf: VecDeque::with_capacity(CHUNK_SIZE), - walker, - remain: true, - use_case_hack: self.use_case_hack, - }))), - next: None, - level: 0, - } - } -} - -impl Default for DumpOptions { - fn default() -> Self { - Self::new() - } -} - -pub fn dump>(path: P) -> NarDumper { - DumpOptions::new().dump(path) -} - -/// Return the final path segment as raw bytes. -/// -/// Uses a byte-level search instead of `Path::file_name`, which performs a -/// full `Components` parse on every call. This is safe because walkdir builds -/// entry paths as `parent.join(file_name)`, so on Unix the segment after the -/// last `/` is exactly the file name with no `.`/`..` to normalise. -#[cfg(unix)] -fn fast_file_name(p: &Path) -> &[u8] { - let b = p.as_os_str().as_bytes(); - match b.rfind_byte(b'/') { - Some(i) => &b[i + 1..], - None => b, - } -} - -fn sort_case_hack(left: &DirEntry, right: &DirEntry) -> Ordering { - let left_file_name = left.file_name(); - let right_file_name = right.file_name(); - remove_case_hack_osstr(left_file_name) - .unwrap_or(left_file_name) - .cmp(remove_case_hack_osstr(right_file_name).unwrap_or(right_file_name)) -} - -fn remove_case_hack_osstr(name: &OsStr) -> Option<&OsStr> { - if let Some(n) = <[u8]>::from_os_str(name) - && let Some(pos) = n.rfind(CASE_HACK_SUFFIX) - { - return Some(OsStr::from_bytes(&n[..pos])); - } - None -} - -fn remove_case_hack(name: &mut Bytes) { - if let Some(pos) = name.rfind(CASE_HACK_SUFFIX) { - debug!("removing case hack suffix from '{:?}'", name); - name.truncate(pos); - } -} - -/// Turn a walkdir entry into the NAR entry name. -/// -/// The root node carries no name in the NAR format. For deeper entries the -/// final path segment is copied into a fresh small `Bytes`; the entry's -/// `PathBuf` is then dropped here on the blocking thread. -fn entry_name(entry: DirEntry, use_case_hack: bool) -> Bytes { - if entry.depth() == 0 { - return Bytes::new(); - } - let mut name = Bytes::copy_from_slice(fast_file_name(entry.path())); - if use_case_hack { - remove_case_hack(&mut name); - } - name -} - -/// Convert an `OsString` (symlink target) into `Bytes` by moving its buffer. -#[cfg(unix)] -fn os_string_into_bytes(s: std::ffi::OsString) -> Bytes { - Bytes::from(s.into_vec()) -} - -use super::mmap::MappedFile; - -/// Files up to this size are read into a heap buffer; larger files are -/// memory-mapped so streaming a multi-gigabyte store path stays bounded. -const SMALL_FILE_THRESHOLD: u64 = 256 * 1024; // 256 KiB - -/// Load a file as a single `Bytes` value without intermediate copies: -/// small files become `Bytes::from(Vec)`, large files become a refcounted -/// view over the mmap via `Bytes::from_owner`. -/// -/// `size` comes from the directory walk's `lstat`, so the small-file branch -/// allocates exactly once at the right capacity instead of letting -/// `std::fs::read` issue its own `fstat` to size the buffer. -fn load_file_bytes(path: &Path, size: u64) -> io::Result { - if size <= SMALL_FILE_THRESHOLD { - use std::io::Read as _; - let mut f = std::fs::File::open(path)?; - let mut buf = Vec::with_capacity(size as usize); - f.read_to_end(&mut buf)?; - // The NAR length prefix has already been derived from `size`; if the - // file changed under us the archive would silently desync, so surface - // it as an explicit error instead. - if buf.len() as u64 != size { - return Err(io::Error::other(format!( - "file {path:?} changed size during dump: stat {size}, read {}", - buf.len() - ))); +//! NAR dump: serialize a [`FileSystemSource`] to NAR wire format. + +use std::io; + +use futures_util::StreamExt; +use harmonia_file_io_pure::FileSystemSource; +use tokio::io::{AsyncWrite, AsyncWriteExt}; + +use super::read_nar::{ + TOK_DIR, TOK_ENTRY, TOK_FILE, TOK_FILE_E, TOK_NODE, TOK_PAR, TOK_ROOT, TOK_SYM, +}; + +/// Write a NAR archive from a [`FileSystemSource`] to an async writer. +pub async fn dump_source( + source: &S, + writer: &mut W, +) -> io::Result<()> { + writer.write_all(TOK_ROOT).await?; + dump_node(source, writer).await?; + writer.write_all(TOK_PAR).await?; + Ok(()) +} + +async fn dump_node( + source: &S, + w: &mut W, +) -> io::Result<()> { + let stat = source.lstat().await.map_err(to_io)?; + match stat.file_type { + harmonia_file_io_pure::FileType::Regular => { + w.write_all(if stat.executable { + TOK_FILE_E + } else { + TOK_FILE + }) + .await?; + let size = stat.file_size.unwrap_or(0); + w.write_all(&size.to_le_bytes()).await?; + let mut reader = source.read_file().await.map_err(to_io)?; + let copied = tokio::io::copy(&mut reader, w).await?; + if copied != size { + return Err(io::Error::other(format!( + "file size mismatch: expected {size}, wrote {copied}" + ))); + } + let padding = crate::wire::calc_padding(size); + if padding > 0 { + w.write_all(&crate::wire::ZEROS[..padding]).await?; + } } - Ok(Bytes::from(buf)) - } else { - Ok(Bytes::from_owner(MappedFile::open(path, size)?)) - } -} - -/// File contents for a [`NarEvent::File`], already loaded into memory (or -/// memory-mapped) by the same blocking task that walked the directory. -/// -/// The data is fetched eagerly so the async consumer never has to round-trip -/// to the blocking pool per file; neither representation holds an open file -/// descriptor, so at most [`CHUNK_SIZE`] entries worth of small-file buffers -/// plus mappings are resident at a time. -pub struct DumpedFile { - data: Bytes, -} - -impl DumpedFile { - fn new(data: Bytes) -> Self { - Self { data } - } - - /// Take the file content as a zero-copy [`Bytes`]. - pub fn into_bytes(self) -> Bytes { - self.data - } -} - -impl AsyncRead for DumpedFile { - fn poll_read( - mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - let n = self.data.len().min(buf.remaining()); - buf.put_slice(&self.data[..n]); - // `Bytes::advance` on an owned/mmap-backed buffer just bumps an offset. - bytes::Buf::advance(&mut self.data, n); - Poll::Ready(Ok(())) - } -} - -/// A fully prepared directory entry: everything the async side needs to emit -/// the corresponding [`NarEvent`] without touching the filesystem again. -enum Entry { - File { - depth: usize, - name: Bytes, - size: u64, - executable: bool, - data: Bytes, - }, - Symlink { - depth: usize, - name: Bytes, - target: Bytes, - }, - Directory { - depth: usize, - name: Bytes, - }, -} - -impl Entry { - fn depth(&self) -> usize { - match self { - Entry::File { depth, .. } => *depth, - Entry::Symlink { depth, .. } => *depth, - Entry::Directory { depth, .. } => *depth, + harmonia_file_io_pure::FileType::Symlink => { + w.write_all(TOK_SYM).await?; + let target = source.read_link().await.map_err(to_io)?; + write_str(w, target.as_bytes()).await?; } - } -} - -struct BatchState { - buf: VecDeque>, - walker: IntoIter, - remain: bool, - use_case_hack: bool, -} - -#[allow(clippy::large_enum_variant)] -enum State { - Idle(Option>), - Pending(JoinHandle>), -} - -const CHUNK_SIZE: usize = 25; - -impl State { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - loop { - match self { - State::Idle(data) => { - let st = data.as_mut().unwrap(); - if let Some(entry) = st.buf.pop_front() { - return Poll::Ready(Some(entry)); - } else if !st.remain { - return Poll::Ready(None); - } - let mut st = data.take().unwrap(); - *self = State::Pending(spawn_blocking(move || { - st.remain = - State::next_chunk(&mut st.buf, &mut st.walker, st.use_case_hack); - st - })); - } - State::Pending(handler) => { - *self = State::Idle(Some(ready!(Pin::new(handler).poll(cx))?)); - } + harmonia_file_io_pure::FileType::Directory => { + w.write_all(TOK_DIR).await?; + let mut entries = source.entries().await.map_err(to_io)?; + let mut unhacked = Vec::<(String, S::Child)>::new(); + while let Some(item) = entries.next().await { + let (name, child) = item.map_err(to_io)?; + let name = if cfg!(target_os = "macos") { + strip_case_hack(&name).to_owned() + } else { + name + }; + unhacked.push((name, child)); } - } - } - fn next_chunk( - buf: &mut VecDeque>, - iter: &mut IntoIter, - use_case_hack: bool, - ) -> bool { - for _ in 0..CHUNK_SIZE { - match iter.next() { - Some(res) => { - let res = res.map_err(io::Error::from).and_then(|entry| { - let depth = entry.depth(); - // `file_type()` is cached from `readdir`'s `d_type` - // and needs no syscall; only regular files require an - // additional `lstat` for size and the exec bit. - let ft = entry.file_type(); - let entry = if ft.is_dir() { - Entry::Directory { - depth, - name: entry_name(entry, use_case_hack), - } - } else if ft.is_file() { - let m = entry.metadata()?; - let executable; - #[cfg(unix)] - { - executable = m.permissions().mode() & 0o100 == 0o100; - } - #[cfg(not(unix))] - { - executable = false; - } - let size = m.len(); - // Load the content here, in the same blocking - // task that is already iterating the directory, - // so the async side receives ready-to-stream - // bytes without a second pool round-trip. - let data = load_file_bytes(entry.path(), size)?; - Entry::File { - depth, - name: entry_name(entry, use_case_hack), - size, - executable, - data, - } - } else if ft.is_symlink() { - let target = - os_string_into_bytes(read_link(entry.path())?.into_os_string()); - Entry::Symlink { - depth, - name: entry_name(entry, use_case_hack), - target, - } - } else { - return Err(io::Error::other(format!("unsupported file type {ft:?}"))); - }; - Ok(entry) - }); - buf.push_back(res); - } - None => return false, + // The restorer appends a case-hack suffix on case-insensitive + // filesystems, so re-sort by the cleaned name to match a NAR + // produced on a case-sensitive one. + unhacked.sort_by(|a, b| a.0.cmp(&b.0)); + if let Some(dup) = unhacked.windows(2).find(|w| w[0].0 == w[1].0) { + return Err(io::Error::other(format!( + "case-hack collision on entry name {:?}", + dup[0].0 + ))); + } + for (name, child) in unhacked { + w.write_all(TOK_ENTRY).await?; + write_str(w, name.as_bytes()).await?; + w.write_all(TOK_NODE).await?; + Box::pin(dump_node(&child, w)).await?; + w.write_all(TOK_PAR).await?; // close node + w.write_all(TOK_PAR).await?; // close entry } } - true } + Ok(()) } -pub struct NarDumper { - state: State, - next: Option, - level: u32, +/// Removes everything from the case-hack suffix onward, recovering the original +/// name the restorer disambiguated on a case-insensitive filesystem. +fn strip_case_hack(name: &str) -> &str { + match name.find(crate::archive::CASE_HACK_SUFFIX) { + Some(pos) => &name[..pos], + None => name, + } } -impl NarDumper { - pub fn new

(root: P) -> Self - where - P: Into, - { - DumpOptions::new().dump(root) +/// Write a single wire-encoded string (length prefix + data + padding). +/// Only used for variable-length data (entry names, symlink targets). +async fn write_str(w: &mut W, s: &[u8]) -> io::Result<()> { + w.write_all(&(s.len() as u64).to_le_bytes()).await?; + w.write_all(s).await?; + let padding = crate::wire::calc_padding(s.len() as u64); + if padding > 0 { + w.write_all(&crate::wire::ZEROS[..padding]).await?; } + Ok(()) } -impl Stream for NarDumper { - type Item = io::Result>; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - loop { - // Close directories until the pending entry is at the current - // nesting level. walkdir's `depth()` gives this directly, so no - // path comparison is needed. - if let Some(entry) = self.next.as_ref() - && entry.depth() < self.level as usize - { - self.level -= 1; - return Poll::Ready(Some(Ok(NarEvent::EndDirectory))); - } - if let Some(entry) = self.next.take() { - let event = match entry { - Entry::Directory { name, .. } => { - self.level += 1; - NarEvent::StartDirectory { name } - } - Entry::File { - name, - size, - executable, - data, - .. - } => NarEvent::File { - name, - executable, - size, - reader: DumpedFile::new(data), - }, - Entry::Symlink { name, target, .. } => NarEvent::Symlink { name, target }, - }; - return Poll::Ready(Some(Ok(event))); - } - match ready!(self.state.poll_next(cx)) { - Some(Ok(entry)) => { - self.next = Some(entry); - } - Some(Err(err)) => return Poll::Ready(Some(Err(err))), - None => { - if self.level > 0 { - self.level -= 1; - return Poll::Ready(Some(Ok(NarEvent::EndDirectory))); - } - return Poll::Ready(None); - } - } - } - } +fn to_io(e: impl std::error::Error) -> io::Error { + io::Error::other(e.to_string()) } #[cfg(test)] -mod unittests { - use std::fs::create_dir_all; - - use futures_util::TryStreamExt as _; - use tempfile::Builder; +mod tests { + use futures_util::StreamExt as _; + use harmonia_file_io_pure::*; + use harmonia_utils_io::BytesReader; use super::*; + use crate::archive::parser::NarParser; use crate::archive::test_data; + use crate::archive::write_nar; + + /// Build a MemoryTree from test events via NarWriter + restore_to_sink, + /// then dump it with dump_source and parse back to events. + async fn dump_and_collect(events: &test_data::TestNarEvents) -> test_data::TestNarEvents { + // Events → NAR bytes + let nar_bytes = write_nar(events.iter()); + + // NAR bytes → MemoryTree + let mut builder = MemoryTreeBuilder::new(); + let reader = BytesReader::new(std::io::Cursor::new(nar_bytes)); + crate::archive::restorer::restore_to_sink(reader, builder.sink()) + .await + .unwrap(); + let tree = builder.build(); + + // MemoryTree → NAR bytes (via dump_source) + let src = MemoryTreeSource::new(&tree); + let mut nar2 = Vec::new(); + dump_source(&src, &mut nar2).await.unwrap(); + + // NAR bytes → events + let reader = BytesReader::new(std::io::Cursor::new(nar2)); + let mut parser = NarParser::new(reader); + let mut result = test_data::TestNarEvents::new(); + while let Some(event) = parser.next().await { + result.push(event.unwrap().read_file().await.unwrap()); + } + result + } #[tokio::test] async fn test_dump_dir() { - let dir = Builder::new().prefix("test_dump_dir").tempdir().unwrap(); - let path = dir.path().join("nar"); - test_data::create_dir_example(&path, true).unwrap(); - - let s = DumpOptions::new() - .use_case_hack(true) - .dump(path) - .and_then(|entry| entry.read_file()) - .try_collect::() - .await - .unwrap(); - assert_eq!(s, test_data::dir_example()); + let result = dump_and_collect(&test_data::dir_example()).await; + assert_eq!(result, test_data::dir_example()); } #[tokio::test] async fn test_dump_text_file() { - let dir = Builder::new() - .prefix("test_dump_text_file") - .tempdir() - .unwrap(); - let path = dir.path().join("nar"); - test_data::create_dir_example(&path, true).unwrap(); - - let s = dump(path.join("testing.txt")) - .and_then(|entry| entry.read_file()) - .try_collect::() - .await - .unwrap(); - assert_eq!(s, test_data::text_file()); + let result = dump_and_collect(&test_data::text_file()).await; + assert_eq!(result, test_data::text_file()); } #[tokio::test] async fn test_dump_exec_file() { - let dir = Builder::new() - .prefix("test_dump_exec_file") - .tempdir() - .unwrap(); - let path = dir.path().join("nar"); - test_data::create_dir_example(&path, true).unwrap(); - - let s = dump(path.join("dir/more/Deep")) - .and_then(|entry| entry.read_file()) - .try_collect::() - .await - .unwrap(); - assert_eq!(s, test_data::exec_file()); + let result = dump_and_collect(&test_data::exec_file()).await; + assert_eq!(result, test_data::exec_file()); } #[tokio::test] async fn test_dump_empty_file() { - let dir = Builder::new() - .prefix("test_dump_empty_file") - .tempdir() - .unwrap(); - let path = dir.path().join("empty.keep"); - std::fs::write(&path, b"").unwrap(); - - let s = dump(path) - .and_then(|entry| entry.read_file()) - .try_collect::() - .await - .unwrap(); - assert_eq!(s, test_data::empty_file()); + let result = dump_and_collect(&test_data::empty_file()).await; + assert_eq!(result, test_data::empty_file()); } #[tokio::test] async fn test_dump_symlink() { - let dir = Builder::new() - .prefix("test_dump_symlink") - .tempdir() - .unwrap(); - let deep = dir.path().join("deep"); - create_dir_all(&deep).unwrap(); - let path = deep.join("loop"); - std::os::unix::fs::symlink("../deep", &path).unwrap(); - - let s = dump(path) - .and_then(|entry| entry.read_file()) - .try_collect::() - .await - .unwrap(); - assert_eq!(s, test_data::symlink()); + let result = dump_and_collect(&test_data::symlink()).await; + assert_eq!(result, test_data::symlink()); } } diff --git a/harmonia-file-nar/src/archive/mod.rs b/harmonia-file-nar/src/archive/mod.rs index 14ed44c1..30368448 100644 --- a/harmonia-file-nar/src/archive/mod.rs +++ b/harmonia-file-nar/src/archive/mod.rs @@ -2,7 +2,6 @@ use crate::ByteString; mod byte_stream; mod dumper; -pub(crate) mod mmap; mod parser; pub(crate) mod radix_tree; mod read_nar; @@ -11,12 +10,12 @@ pub mod test_data; mod writer; pub use byte_stream::NarByteStream; -pub use dumper::{DumpOptions, DumpedFile, NarDumper, dump}; +pub use dumper::dump_source; #[cfg(any(test, feature = "test"))] pub use parser::read_nar; pub use parser::{NarParser, parse_nar}; pub use read_nar::{NarBytesReader, NarReader}; -pub use restorer::{NarRestorer, NarWriteError, RestoreOptions, restore}; +pub use restorer::restore_to_sink; pub use writer::NarWriter; #[cfg(any(test, feature = "test"))] pub use writer::write_nar; diff --git a/harmonia-file-nar/src/archive/restorer.rs b/harmonia-file-nar/src/archive/restorer.rs index 62b526fd..5960ee10 100644 --- a/harmonia-file-nar/src/archive/restorer.rs +++ b/harmonia-file-nar/src/archive/restorer.rs @@ -1,333 +1,220 @@ -use std::collections::HashMap; -use std::fmt; -use std::io; -use std::path::{Path, PathBuf}; - -use bstr::ByteSlice as _; -use bytes::Bytes; -use derive_more::Display; -use futures_core::Stream; -use thiserror::Error; -use tokio::io::{AsyncBufRead, AsyncBufReadExt as _, AsyncWriteExt as _}; -use tracing::{debug, trace}; +//! NAR restore: parse NAR format and write to a [`FileSystemSink`]. -use super::{CASE_HACK_SUFFIX, NarEvent}; +use std::io; -#[derive(Display, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] -pub enum NarWriteOperation { - #[display("creating directory")] - CreateDirectory, - #[display("creating symlink")] - CreateSymlink, - #[display("creating file")] - CreateFile, - #[display("path contains invalid UTF-8")] - PathUTF8, -} +use futures_util::StreamExt as _; +use harmonia_file_io_pure::{DirectorySink, FileSystemSink}; +use harmonia_utils_io::AsyncBytesRead; +use tokio::io::AsyncWriteExt; -#[derive(Error, Debug)] -#[error("{operation} {path}: {source}")] -pub struct NarWriteError { - operation: NarWriteOperation, - path: PathBuf, - #[source] - source: io::Error, -} +use super::NarEvent; +use super::parser::NarParser; -impl NarWriteError { - pub fn new(operation: NarWriteOperation, path: PathBuf, source: io::Error) -> Self { - Self { - operation, - path, - source, - } - } - pub fn path_utf8_error(path: PathBuf, err: bstr::Utf8Error) -> Self { - Self::new( - NarWriteOperation::PathUTF8, - path, - io::Error::new(io::ErrorKind::InvalidData, err), - ) - } - pub fn create_dir_error(path: PathBuf, err: io::Error) -> Self { - Self::new(NarWriteOperation::CreateDirectory, path, err) - } - pub fn create_symlink_error(path: PathBuf, err: io::Error) -> Self { - Self::new(NarWriteOperation::CreateSymlink, path, err) - } - pub fn create_file_error(path: PathBuf, err: io::Error) -> Self { - Self::new(NarWriteOperation::CreateFile, path, err) +/// Restore a NAR archive from an async reader into a [`FileSystemSink`]. +/// +/// Uses the existing [`NarParser`] to parse the NAR format, then maps +/// the event stream to [`FileSystemSink`] calls. +pub async fn restore_to_sink(reader: R, sink: S) -> io::Result<()> +where + R: AsyncBytesRead + Unpin, + S: FileSystemSink, + S::Error: Send + Sync + 'static, +{ + let mut parser = NarParser::new(reader); + let first = parser + .next() + .await + .ok_or_else(|| io::Error::other("empty NAR"))??; + restore_sink_event(&mut parser, first, sink).await?; + if let Some(result) = parser.next().await { + result?; + return Err(io::Error::other("trailing data after NAR root")); } + Ok(()) } -pub struct NarRestorer { - path: PathBuf, - use_case_hack: bool, - entries: Entries, - dir_stack: Vec, -} - -impl NarRestorer { - pub fn new>(path: P) -> Self { - Self::new_restorer(path, false) - } - - pub fn with_case_hack>(path: P) -> Self { - Self::new_restorer(path, true) - } - - fn new_restorer

(path: P, use_case_hack: bool) -> Self - where - P: Into, - { - let path = path.into(); - Self { - path, - use_case_hack, - entries: Default::default(), - dir_stack: Default::default(), +/// Process one NarEvent and recursively handle directory children. +async fn restore_sink_event( + parser: &mut NarParser, + event: NarEvent, + sink: impl FileSystemSink, +) -> io::Result<()> +where + R: AsyncBytesRead + Unpin, + EV: tokio::io::AsyncRead + Unpin, +{ + match event { + NarEvent::File { + executable, + mut reader, + .. + } => { + let mut file_sink = sink + .create_regular_file(executable) + .await + .map_err(sink_to_io)?; + tokio::io::copy(&mut reader, &mut file_sink).await?; + file_sink.shutdown().await?; } - } - - /// Process a single NAR event, writing to the filesystem. - async fn process_event(&mut self, event: NarEvent) -> Result<(), NarWriteError> - where - R: AsyncBufRead + Unpin, - { - match event { - NarEvent::File { - name, - executable, - size: _, - mut reader, - } => { - let name = if self.use_case_hack { - self.entries.hack_name(name) - } else { - name - }; - - let path = join_name(&self.path, &name)?; - let mut options = tokio::fs::OpenOptions::new(); - options.write(true); - options.create_new(true); - #[cfg(unix)] - { - if executable { - options.mode(0o777); - } else { - options.mode(0o666); - } - } - let mut file = options - .open(&path) - .await - .map_err(|err| NarWriteError::create_file_error(path.clone(), err))?; - loop { - trace!("Writing to file {:?}", path); - let buf = reader - .fill_buf() - .await - .map_err(|err| NarWriteError::create_file_error(path.clone(), err))?; - if buf.is_empty() { - break; - } - let amt = buf.len(); - file.write_all(buf) - .await - .map_err(|err| NarWriteError::create_file_error(path.clone(), err))?; - reader.consume(amt); - } - file.flush() - .await - .map_err(|err| NarWriteError::create_file_error(path.clone(), err))?; - } - NarEvent::Symlink { name, target } => { - let name = if self.use_case_hack { - self.entries.hack_name(name) - } else { - name - }; - - let path = join_name(&self.path, &name)?; - let target_os = target - .to_os_str() - .map_err(|err| { - let lossy = target.to_os_str_lossy().into_owned(); - let path = PathBuf::from(lossy); - NarWriteError::path_utf8_error(path, err) - })? - .to_owned(); - #[cfg(unix)] - { - tokio::fs::symlink(target_os, &path) - .await - .map_err(|err| NarWriteError::create_symlink_error(path, err))?; - } - } - NarEvent::StartDirectory { name } => { - let name = if self.use_case_hack { - let name = self.entries.hack_name(name); - - #[allow(clippy::mutable_key_type)] - let entries = std::mem::take(&mut self.entries); - self.dir_stack.push(entries); - name - } else { - name - }; - - let path = join_name(&self.path, &name)?; - self.path = path; - let path = self.path.clone(); - tokio::fs::create_dir(&path) - .await - .map_err(|err| NarWriteError::create_dir_error(path, err))?; - } - NarEvent::EndDirectory => { - if self.use_case_hack { - self.entries = self.dir_stack.pop().unwrap_or_default(); + NarEvent::Symlink { target, .. } => { + let target = std::str::from_utf8(&target) + .map_err(|e| io::Error::other(format!("non-UTF-8 symlink target: {e}")))?; + sink.create_symlink(target).await.map_err(sink_to_io)?; + } + NarEvent::StartDirectory { .. } => { + let mut dir = sink.create_directory().await.map_err(sink_to_io)?; + while let Some(result) = parser.next().await { + let event = result?; + if matches!(event, NarEvent::EndDirectory) { + return Ok(()); } - self.path.pop(); + let name = event_name(&event)?; + let child_sink = dir.create_child(&name).await.map_err(sink_to_io)?; + Box::pin(restore_sink_event(parser, event, child_sink)).await?; } + return Err(io::Error::other("unexpected end of NAR stream")); } - Ok(()) - } - - /// Consume a stream of NAR events and restore them to the filesystem. - pub async fn restore(mut self, stream: S) -> Result<(), NarWriteError> - where - S: Stream, - U: Into, NarWriteError>>, - R: AsyncBufRead + Send + Unpin, - { - use futures_util::StreamExt as _; - futures_util::pin_mut!(stream); - while let Some(item) = stream.next().await { - let event = item.into()?; - self.process_event(event).await?; + NarEvent::EndDirectory => { + return Err(io::Error::other("unexpected EndDirectory")); } - Ok(()) } + Ok(()) } -fn join_name(path: &Path, name: &[u8]) -> Result { - if name.is_empty() { - Ok(path.to_owned()) - } else { - let name_os = name.to_os_str().map_err(|err| { - let lossy = name.to_os_str_lossy(); - let path = path.join(lossy); - NarWriteError::path_utf8_error(path, err) - })?; - Ok(path.join(name_os)) - } +fn event_name(event: &NarEvent) -> io::Result { + let name_bytes = match event { + NarEvent::File { name, .. } + | NarEvent::Symlink { name, .. } + | NarEvent::StartDirectory { name } => name, + NarEvent::EndDirectory => return Err(io::Error::other("EndDirectory has no name")), + }; + std::str::from_utf8(name_bytes) + .map(|s| s.to_owned()) + .map_err(|e| io::Error::other(format!("non-UTF-8 entry name: {e}"))) } -struct CIString(Bytes, String); - -impl PartialEq for CIString { - fn eq(&self, other: &Self) -> bool { - self.1.eq(&other.1) - } +fn sink_to_io(e: impl std::error::Error) -> io::Error { + io::Error::other(e.to_string()) } -impl fmt::Display for CIString { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let bstr = bstr::BStr::new(&self.0); - write!(f, "{bstr}") - } -} - -impl Eq for CIString {} - -impl std::hash::Hash for CIString { - fn hash(&self, state: &mut H) { - self.1.hash(state) - } -} - -#[derive(Default)] -struct Entries(HashMap); - -impl Entries { - fn hack_name(&mut self, name: Bytes) -> Bytes { - use std::collections::hash_map::Entry; - use std::io::Write; - - let lower = String::from_utf8_lossy(&name).to_lowercase(); - let ci_str = CIString(name.clone(), lower); - match self.0.entry(ci_str) { - Entry::Occupied(mut o) => { - let b_name = bstr::BStr::new(&name); - debug!("case collision between '{}' and '{}'", o.key(), b_name); - let idx = o.get() + 1; - let mut new_name = name.to_vec(); - write!(new_name, "{CASE_HACK_SUFFIX}{idx}").unwrap(); - o.insert(idx); - Bytes::from(new_name) - } - Entry::Vacant(v) => { - v.insert(0); - name +#[cfg(test)] +mod tests { + use harmonia_file_core::*; + use harmonia_file_io_pure::*; + use tokio::io::AsyncWriteExt; + + use super::restore_to_sink; + use crate::archive::dumper::dump_source; + + async fn sample_tree() -> MemoryTree { + let mut builder = MemoryTreeBuilder::new(); + { + let mut root = builder.sink().create_directory().await.unwrap(); + { + let mut sub = root + .create_child("dir") + .await + .unwrap() + .create_directory() + .await + .unwrap(); + sub.create_child("file") + .await + .unwrap() + .create_regular_file(true) + .await + .unwrap() + .write_all(b"hello") + .await + .unwrap(); + sub.create_child("link") + .await + .unwrap() + .create_symlink("/target") + .await + .unwrap(); } + root.create_child("readme") + .await + .unwrap() + .create_regular_file(false) + .await + .unwrap() + .write_all(b"world") + .await + .unwrap(); } - } -} - -pub struct RestoreOptions { - use_case_hack: bool, -} - -impl RestoreOptions { - pub fn new() -> Self { - #[cfg(target_os = "macos")] - let use_case_hack = true; - #[cfg(not(target_os = "macos"))] - let use_case_hack = false; - Self { use_case_hack } + builder.build() } - pub fn use_case_hack(mut self, use_case_hack: bool) -> Self { - self.use_case_hack = use_case_hack; - self - } - - pub async fn restore(self, stream: S, path: P) -> Result<(), NarWriteError> - where - S: Stream, - U: Into, NarWriteError>>, - P: Into, - R: AsyncBufRead + Send + Unpin, - { - let restorer = NarRestorer::new_restorer(path, self.use_case_hack); - restorer.restore(stream).await - } -} - -impl Default for RestoreOptions { - fn default() -> Self { - Self::new() + #[tokio::test] + async fn round_trip_dump_restore() { + let tree = sample_tree().await; + let src = MemoryTreeSource::new(&tree); + + // Dump to NAR bytes + let mut nar = Vec::new(); + dump_source(&src, &mut nar).await.unwrap(); + + // Restore from NAR bytes into a new MemoryTree + let mut builder = MemoryTreeBuilder::new(); + let reader = harmonia_utils_io::BytesReader::new(std::io::Cursor::new(nar)); + restore_to_sink(reader, builder.sink()).await.unwrap(); + let restored = builder.build(); + + // Compare via JSON serialization + let orig_json = serde_json::to_value(list_deep(&src).await.unwrap()).unwrap(); + let restored_src = MemoryTreeSource::new(&restored); + let restored_json = serde_json::to_value(list_deep(&restored_src).await.unwrap()).unwrap(); + assert_eq!(orig_json, restored_json); } } -pub async fn restore(stream: S, path: P) -> Result<(), NarWriteError> -where - S: Stream, - U: Into, NarWriteError>>, - P: Into, - R: AsyncBufRead + Send + Unpin, -{ - RestoreOptions::new().restore(stream, path).await -} - +/// Restore-then-dump round-trip tests using test_data fixtures. +/// +/// These correspond to the old `NarRestorer` unit tests: write test +/// events as NAR via `NarWriter`, restore into a `MemoryTree` via +/// `restore_to_sink`, dump back via `dump_source`, parse back into +/// events, and compare. #[cfg(test)] -mod unittests { - use super::*; - use crate::archive::{NarEvent, dump, test_data}; - use futures_util::stream::{StreamExt as _, TryStreamExt as _, iter}; +mod fixture_tests { + use futures_util::StreamExt as _; + use harmonia_file_io_pure::*; + use harmonia_utils_io::BytesReader; use rstest::rstest; - use tempfile::Builder; + + use super::restore_to_sink; + use crate::archive::dumper::dump_source; + use crate::archive::parser::NarParser; + use crate::archive::test_data; + use crate::archive::write_nar; + + /// Write test events as NAR bytes, restore into MemoryTree, dump + /// back to NAR, parse into events, compare. + async fn round_trip_via_memory(events: test_data::TestNarEvents) -> test_data::TestNarEvents { + // Events → NAR bytes (via NarWriter) + let nar_bytes = write_nar(events.iter()); + + // NAR bytes → MemoryTree (via restore_to_sink) + let mut builder = MemoryTreeBuilder::new(); + let reader = BytesReader::new(std::io::Cursor::new(nar_bytes)); + restore_to_sink(reader, builder.sink()).await.unwrap(); + let tree = builder.build(); + + // MemoryTree → NAR bytes (via dump_source) + let src = MemoryTreeSource::new(&tree); + let mut nar2 = Vec::new(); + dump_source(&src, &mut nar2).await.unwrap(); + + // NAR bytes → events (via NarParser) + let reader = BytesReader::new(std::io::Cursor::new(nar2)); + let mut parser = NarParser::new(reader); + let mut result = test_data::TestNarEvents::new(); + while let Some(event) = parser.next().await { + result.push(event.unwrap().read_file().await.unwrap()); + } + result + } #[tokio::test] #[rstest] @@ -341,32 +228,25 @@ mod unittests { #[case::dir_example(test_data::dir_example())] #[case::case_hack_sorting(test_data::case_hack_sorting())] async fn test_restore(#[case] events: test_data::TestNarEvents) { - let dir = Builder::new().prefix("test_restore").tempdir().unwrap(); - let path = dir.path().join("output"); - - let events_s = iter(events.clone().into_iter()) - .map(|e| Ok(e) as Result); - restore(events_s, &path).await.unwrap(); - - let s = dump(path) - .and_then(NarEvent::read_file) - .try_collect::() - .await - .unwrap(); - assert_eq!(s, events); + let result = round_trip_via_memory(events.clone()).await; + assert_eq!(result, events); } } #[cfg(test)] mod proptests { - use futures_util::stream::iter; - use futures_util::{StreamExt as _, TryStreamExt as _}; + use futures_util::StreamExt as _; + use harmonia_file_io_pure::*; + use harmonia_utils_io::BytesReader; + use proptest::prop_assert_eq; use proptest::proptest; - use tempfile::tempdir; - use crate::archive::{NarEvent, NarWriteError, dump, restore, test_data}; + use super::restore_to_sink; + use crate::archive::dumper::dump_source; + use crate::archive::parser::NarParser; + use crate::archive::test_data; + use crate::archive::write_nar; use crate::test::arbitrary::archive::arb_nar_events; - use proptest::prop_assert_eq; #[test] fn proptest_restore_dump() { @@ -376,20 +256,31 @@ mod proptests { .unwrap(); proptest!(|(events in arb_nar_events(8, 256, 10))| { r.block_on(async { - let dir = tempdir()?; - let path = dir.path().join("output"); - - let event_s = iter(events.clone().into_iter()) - .map(|e| Ok(e) as Result ); - restore(event_s, &path).await.unwrap(); + // Events → NAR bytes + let nar_bytes = write_nar(events.iter()); + + // NAR bytes → MemoryTree + let mut builder = MemoryTreeBuilder::new(); + let reader = BytesReader::new(std::io::Cursor::new(nar_bytes)); + restore_to_sink(reader, builder.sink()).await.unwrap(); + let tree = builder.build(); + + // MemoryTree → NAR bytes + let src = MemoryTreeSource::new(&tree); + let mut nar2 = Vec::new(); + dump_source(&src, &mut nar2).await.unwrap(); + + // NAR bytes → events + let reader = BytesReader::new(std::io::Cursor::new(nar2)); + let mut parser = NarParser::new(reader); + let mut result = test_data::TestNarEvents::new(); + while let Some(event) = parser.next().await { + result.push(event.unwrap().read_file().await.unwrap()); + } - let s = dump(path) - .and_then(NarEvent::read_file) - .try_collect::().await?; - prop_assert_eq!(&s, &events); + prop_assert_eq!(&result, &events); Ok(()) })?; - }); } } diff --git a/harmonia-file-nar/src/lib.rs b/harmonia-file-nar/src/lib.rs index b2091636..20078ac9 100644 --- a/harmonia-file-nar/src/lib.rs +++ b/harmonia-file-nar/src/lib.rs @@ -5,24 +5,42 @@ // This crate is derived from Nix.rs (https://github.com/griff/Nix.rs) // Upstream commit: f5d129b71bb30b476ce21e6da2a53dcb28607a89 -//! NAR (Nix ARchive) format handling. +//! NAR (Nix ARchive) format handling through [`harmonia-file-core`] traits. //! -//! This crate provides functionality for packing and unpacking NAR archives, -//! the archive format used by Nix for representing store paths as byte streams. +//! # Dump and restore //! -//! # Key Features +//! [`dump_source`] writes a NAR archive from any [`FileSystemSource`] to +//! an [`AsyncWrite`](tokio::io::AsyncWrite). [`restore_to_sink`] parses +//! a NAR archive from any [`AsyncBytesRead`](harmonia_utils_io::AsyncBytesRead) +//! and writes to any [`FileSystemSink`]. //! -//! - Streaming NAR pack/unpack (bounded memory usage) -//! - Async/await support via tokio -//! - Works with any `AsyncRead`/`AsyncWrite` source/sink -//! - NAR listing via [`parse_nar_listing`] producing [`FileTree`] +//! ```rust,ignore +//! // Dump a DirSource to NAR bytes +//! dump_source(&dir_source, &mut writer).await?; //! -//! # Design Principles +//! // Restore NAR bytes into a MemoryTree +//! restore_to_sink(reader, builder.sink()).await?; +//! ``` +//! +//! # Listing +//! +//! [`parse_nar_listing`] produces a [`FileTree`] from a +//! NAR stream — the same JSON format as `nix nar ls --json --recursive`. +//! +//! # Streaming +//! +//! [`NarByteStream`] produces a `Stream` of NAR-encoded +//! data for a filesystem path, suitable for HTTP streaming. +//! +//! # Design principles //! //! 1. **Streaming**: Never require entire NAR in memory -//! 2. **IO-agnostic**: Work with trait objects (AsyncRead/AsyncWrite) +//! 2. **Trait-based**: Dump/restore go through [`FileSystemSource`]/[`FileSystemSink`] //! 3. **Format-focused**: Only concerned with archive structure //! 4. **Composable**: Can be used independently of daemon +//! +//! [`FileSystemSource`]: harmonia_file_io_pure::FileSystemSource +//! [`FileSystemSink`]: harmonia_file_io_pure::FileSystemSink /// Byte string type alias. pub type ByteString = bytes::Bytes; @@ -37,8 +55,8 @@ pub mod archive; // Re-export commonly used types from archive pub use archive::{ - CASE_HACK_SUFFIX, DumpOptions, DumpedFile, NarByteStream, NarDumper, NarEvent, NarParser, - NarReader, NarRestorer, NarWriteError, NarWriter, RestoreOptions, dump, parse_nar, restore, + CASE_HACK_SUFFIX, NarByteStream, NarEvent, NarParser, NarReader, NarWriter, dump_source, + parse_nar, restore_to_sink, }; pub use listing::{NarFileInfo, parse_nar_listing}; diff --git a/harmonia-file-nar/src/listing.rs b/harmonia-file-nar/src/listing.rs index f7524f63..cc8fd58e 100644 --- a/harmonia-file-nar/src/listing.rs +++ b/harmonia-file-nar/src/listing.rs @@ -10,6 +10,9 @@ use harmonia_utils_io::AsyncBytesRead; use crate::archive::{NarEvent, NarParser}; +/// One directory frame on the build stack, holding its name and the children gathered so far. +type DirFrame = (Option, BTreeMap>>); + /// Metadata for a file entry within a NAR archive. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct NarFileInfo { @@ -28,8 +31,8 @@ where let mut parser = NarParser::new(reader); // Stack of (name, entries) for directories being built. - let mut stack: Vec = Vec::new(); - let mut result: Option> = None; + let mut stack = Vec::::new(); + let mut result = Option::>::None; while let Some(event) = parser.next().await { let event = event?; @@ -79,11 +82,8 @@ where result.ok_or_else(|| std::io::Error::other("empty NAR")) } -type DirFrame = (Option, BTreeMap>>); - -#[allow(clippy::ptr_arg)] fn insert_node( - stack: &mut Vec, + stack: &mut [DirFrame], result: &mut Option>, name: String, node: FileTree, @@ -126,8 +126,11 @@ mod tests { // NarByteStream produces raw NAR bytes let byte_stream = crate::archive::NarByteStream::new(dir.to_owned()); - let chunks: Vec = byte_stream.try_collect().await.unwrap(); - let nar_bytes: Vec = chunks.into_iter().flatten().collect(); + let chunks = byte_stream + .try_collect::>() + .await + .unwrap(); + let nar_bytes = chunks.into_iter().flatten().collect::>(); // Parse the listing let reader = BytesReader::new(std::io::Cursor::new(nar_bytes));