diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 01e50777..e1b624d0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -244,3 +244,27 @@ jobs: unset A3S_DEPS_STUB chmod +x bench/bench.sh bench/bench.sh race + # SDK programmable-CI pipeline against REAL microVMs: warm + CoW fork-per-step, + # parallel collect-all, `::metric` extraction, fork isolation, leak-freeness, + # and crash-orphan recovery (sweep_orphans). The unit suite only drives a fake + # CLI, so this is the only gate that the real fork/snapshot path actually works. + - name: SDK pipeline integration — real fork-per-step + sweep + env: + A3S_BOX: ${{ github.workspace }}/src/target/release/a3s-box + A3S_SDK_TEST_IMAGE: ${{ vars.KVM_CI_AGENT_IMAGE || 'docker.m.daocloud.io/library/alpine:latest' }} + run: | + unset A3S_DEPS_STUB + cd src + cargo test --release -p a3s-box-sdk --test integration_kvm -- --ignored --nocapture --test-threads=1 + # Soak the fork-eval loop: sustained churn must stay leak-free (no orphan + # boxes/snapshots, snapshot count flat) and memory-stable. Light here (120 + # fork-evals); crank A3S_SDK_SOAK_FORKS for a manual long soak. + - name: SDK pipeline soak — leak-free under churn + env: + A3S_BOX: ${{ github.workspace }}/src/target/release/a3s-box + A3S_SDK_TEST_IMAGE: ${{ vars.KVM_CI_AGENT_IMAGE || 'docker.m.daocloud.io/library/alpine:latest' }} + A3S_SDK_SOAK_FORKS: "120" + run: | + unset A3S_DEPS_STUB + cd src + cargo test --release -p a3s-box-sdk --test soak_kvm -- --ignored --nocapture --test-threads=1 diff --git a/CHANGELOG.md b/CHANGELOG.md index e9aa4986..bdd333be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,58 @@ All notable changes to A3S Box will be documented in this file. 
## [Unreleased] +### Added + +- **Programmable-CI pipeline: parallel fan-out + typed JSON report (`a3s-box-sdk`).** + `Base::run_parallel(steps, max_concurrency)` runs steps concurrently as isolated + copy-on-write MicroVM forks (bounded, collect-all, results in input order) and returns a + `Report` with a dependency-free `to_json()`. `StepResult` now carries separated + `stdout`/`stderr`, `duration_ms`, and `metrics` parsed from `::metric <key>=<value>` + guest-stdout lines (a machine-readable scoring channel for matrix/selection workloads). + Steps run via `&self` (atomic fork counter), so fan-out no longer needs hand-rolled + threads. The base auto-removes its snapshot on `Drop` (`--force`), and each fork is + removed on every path (including panic). Box/snapshot names now carry per-process + + per-instance entropy, so concurrent pipelines from the same image+setup can no longer + collide and tear down each other's boxes. A fork that hits a *transient* + infrastructure failure (restore/start/boot) is retried — `WarmBase::infra_retries`, + default 2 — since its command never ran, which keeps sustained high-concurrency + churn green. Validated end-to-end on a real `/dev/kvm` host. +- **Crash-orphan recovery + real-VM integration & soak tests.** `sweep_orphans()` + reclaims `ci-base-*` boxes/snapshots left behind when a pipeline process is + `SIGKILL`ed / OOM-killed (its RAII cleanup never runs), by matching the dead owner + pid embedded in the resource name — and it never touches a live peer's resources. + Added `#[ignore]`'d real-microVM integration tests (`tests/integration_kvm.rs`: + warm + fork-per-step, cache, parallel order/metrics, fork isolation, leak-freeness, + sweep) and a soak test (`tests/soak_kvm.rs`: sustained fork-eval churn stays + leak-free and RSS-stable), both wired into the KVM CI gate.
+- **`a3s-box-ci` runner + `warm_base` retry.** A dependency-free `a3s-box-ci` binary + (shipped by the `a3s-box-sdk` crate) bridges any agent/tool to the pipeline: a + line-based spec on stdin → a JSON `Report` on stdout (`a3s-box-ci run -`), plus + `a3s-box-ci sweep` for crash-orphan recovery. `warm_base` now also retries on a + transient infrastructure failure (sharing the step-fork's `retry_infra` budget), + so concurrent same-image warms are more robust under load. + +### Changed + +- **`StepResult.logs` is replaced by separated `stdout` / `stderr` fields** + (use `StepResult::combined()` for the old concatenated view). Breaking for + direct `.logs` field access on the `a3s-box-sdk` pipeline API. + +### Fixed + +- **Concurrent same-image pipelines could corrupt each other's rootfs cache.** + `RootfsCache::prune` (run after a cache-miss `put`) evicted least-recently-used + entries with no in-use guard, so it could `remove_dir_all` a cache entry that + another box was simultaneously using as its overlayfs **lowerdir** — the peer's + `mount(2)` then failed with `No such file or directory (os error 2)`, and the + failure persisted through retries (the backing was gone). Added the same in-use + guard `SnapshotStore::prune` already applies to live copy-on-write lowers: each + overlay box records the cache key it holds in a `<box_dir>/.rootfs-cache-key` + marker (removed with the box dir), and `prune` skips any still-referenced key. + Found via a concurrent-pipeline chaos test driven through a3s-code; root-caused + and verified on a real `/dev/kvm` host (the concurrency scenario went from ~50% + failure to reliably green).
+ ## [2.6.0] — 2026-06-26 ### Added diff --git a/src/runtime/src/cache/rootfs_cache.rs b/src/runtime/src/cache/rootfs_cache.rs index 54ae5ec0..e01dc6d6 100644 --- a/src/runtime/src/cache/rootfs_cache.rs +++ b/src/runtime/src/cache/rootfs_cache.rs @@ -190,11 +190,27 @@ impl RootfsCache { Ok(()) } - /// Prune the cache to stay within the given entry count limit. + /// Prune the cache to stay within the given entry count / byte limit. /// - /// Evicts least-recently-accessed entries first. - /// Returns the number of entries evicted. + /// Evicts least-recently-accessed entries first. Returns the number evicted. pub fn prune(&self, max_entries: usize, max_bytes: u64) -> Result { + self.prune_protecting(max_entries, max_bytes, &std::collections::HashSet::new()) + } + + /// Like [`RootfsCache::prune`], but never evicts an entry whose key is in + /// `protected`. Such an entry is currently serving as a box's overlayfs + /// **lowerdir**, and `remove_dir_all`-ing it out from under a concurrent box's + /// `mount(2)` makes the mount fail with ENOENT ("No such file or directory"). + /// This is the same in-use guard [`crate::SnapshotStore::prune`] applies to + /// live copy-on-write lowers — without it, two pipelines built from the same + /// image (one cache-hit overlay box, one cache-miss box that prunes after its + /// put) can race and corrupt each other. + pub fn prune_protecting( + &self, + max_entries: usize, + max_bytes: u64, + protected: &std::collections::HashSet, + ) -> Result { let mut entries = self.list_entries()?; if entries.len() <= max_entries { @@ -215,6 +231,11 @@ impl RootfsCache { if current_count <= max_entries && current_size <= max_bytes { break; } + // Never evict an entry in use as a live overlay lower — deleting the + // lowerdir under a concurrent box's mount(2) is the bug this guards. 
+ if protected.contains(&entry.key) { + continue; + } self.invalidate(&entry.key)?; current_count -= 1; current_size = current_size.saturating_sub(entry.size_bytes); @@ -495,6 +516,52 @@ mod tests { assert_eq!(cache.entry_count().unwrap(), 1); } + #[test] + fn prune_protecting_never_evicts_in_use_key() { + let tmp = TempDir::new().unwrap(); + let cache = RootfsCache::new(tmp.path()).unwrap(); + for i in 0..4 { + let src = tmp.path().join(format!("s{i}")); + create_test_rootfs(&src, &[("f", "x")]); + cache.put(&format!("k{i}"), &src, &format!("e{i}")).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(10)); + } + // k0 is the OLDEST (normally evicted first) but is in use as an overlay lower. + let mut protected = std::collections::HashSet::new(); + protected.insert("k0".to_string()); + // keep=2 over 4 entries evicts two; the protected k0 is never one of them. + // (last_accessed is second-resolution, so WHICH two unprotected entries go + // is not asserted — only that the in-use lower survives.) + let evicted = cache.prune_protecting(2, u64::MAX, &protected).unwrap(); + assert_eq!(evicted, 2, "two unprotected entries evicted to meet keep=2"); + assert!( + cache.get("k0").unwrap().is_some(), + "the in-use (protected) lower must survive prune" + ); + assert_eq!( + cache.entry_count().unwrap(), + 2, + "k0 + one unprotected remain" + ); + } + + #[test] + fn prune_protecting_keeps_all_when_all_in_use() { + let tmp = TempDir::new().unwrap(); + let cache = RootfsCache::new(tmp.path()).unwrap(); + for i in 0..2 { + let src = tmp.path().join(format!("s{i}")); + create_test_rootfs(&src, &[("f", "x")]); + cache.put(&format!("k{i}"), &src, "e").unwrap(); + } + let protected: std::collections::HashSet = + ["k0", "k1"].iter().map(|s| s.to_string()).collect(); + // Even asked to keep 0, nothing is evicted — every entry is a live lower. 
+ let evicted = cache.prune_protecting(0, 0, &protected).unwrap(); + assert_eq!(evicted, 0, "all in-use -> nothing evicted"); + assert_eq!(cache.entry_count().unwrap(), 2); + } + #[test] fn test_rootfs_cache_metadata_persists() { let tmp = TempDir::new().unwrap(); diff --git a/src/runtime/src/vm/layout.rs b/src/runtime/src/vm/layout.rs index 225e4a90..4f2ede20 100644 --- a/src/runtime/src/vm/layout.rs +++ b/src/runtime/src/vm/layout.rs @@ -156,6 +156,9 @@ impl VmManager { let cache_key = RootfsCache::compute_key(reference, &[], &[], &[]); if let Some(cached_path) = self.try_rootfs_cache_path(&cache_key)? { let rootfs_path = self.rootfs_provider.prepare(&box_dir, &cached_path)?; + // Record that this box holds `cache_key` as its overlay lower, so a + // concurrent box's cache prune won't evict it mid-mount (ENOENT). + self.mark_rootfs_cache_key(&box_dir, &cache_key); let tee_instance_config = self.generate_tee_config(&box_dir)?; return Ok(BoxLayout { rootfs_path, @@ -204,6 +207,9 @@ impl VmManager { prom.rootfs_cache_hits.inc(); } let rootfs_path = self.rootfs_provider.prepare(&box_dir, &cached_path)?; + // Record that this box holds `cache_key` as its overlay lower, so a + // concurrent box's cache prune won't evict it mid-mount (ENOENT). + self.mark_rootfs_cache_key(&box_dir, &cache_key); if let Ok(guest_init_path) = Self::find_guest_init() { tracing::info!( @@ -377,10 +383,14 @@ impl VmManager { description = %description, "Stored rootfs in cache" ); - // Prune if needed - if let Err(e) = cache.prune( + // Prune if needed — but never evict a cache entry that is in use as + // a live overlay lower for a concurrent box (deleting the lowerdir + // under its mount(2) is the same-image concurrency bug this guards). 
+ let protected = self.referenced_rootfs_cache_keys(); + if let Err(e) = cache.prune_protecting( self.config.cache.max_rootfs_entries, self.config.cache.max_cache_bytes, + &protected, ) { tracing::warn!(error = %e, "Failed to prune rootfs cache"); } @@ -391,6 +401,29 @@ impl VmManager { } } + /// Record which rootfs-cache key this box holds as its overlay lower, in a + /// `/.rootfs-cache-key` marker (mirror of the snapshot store's + /// `.snapshot-lower`). Read back by [`Self::referenced_rootfs_cache_keys`] so + /// the cache prune never evicts a live lower. Best-effort; removed with box_dir. + fn mark_rootfs_cache_key(&self, box_dir: &Path, cache_key: &str) { + let _ = std::fs::write(box_dir.join(".rootfs-cache-key"), cache_key); + } + + /// Rootfs-cache keys currently in use as an overlay lower by some live box. + /// Boxes live under `/boxes//`; a removed box's marker is gone with + /// its dir, so an evictable key is simply one no live box references. + fn referenced_rootfs_cache_keys(&self) -> std::collections::HashSet { + let mut set = std::collections::HashSet::new(); + if let Ok(entries) = std::fs::read_dir(self.home_dir.join("boxes")) { + for entry in entries.flatten() { + if let Ok(k) = std::fs::read_to_string(entry.path().join(".rootfs-cache-key")) { + set.insert(k.trim().to_string()); + } + } + } + set + } + /// Resolve the cache directory from config or default. pub(crate) fn resolve_cache_dir(&self) -> PathBuf { self.config diff --git a/src/sdk/Cargo.toml b/src/sdk/Cargo.toml index 8552bcde..a9d21a47 100644 --- a/src/sdk/Cargo.toml +++ b/src/sdk/Cargo.toml @@ -9,3 +9,9 @@ description = "Rust SDK for a3s-box. Includes a programmable CI/CD pipeline API [dependencies] # none — a thin, dependency-free wrapper over the `a3s-box` CLI. + +# Thin runner that bridges any agent (a3s-code / Claude Code / Codex) to the +# pipeline API: a line-based spec on stdin -> JSON Report on stdout. 
+[[bin]] +name = "a3s-box-ci" +path = "src/bin/a3s-box-ci.rs" diff --git a/src/sdk/README.md b/src/sdk/README.md index 1200906e..7a8c5480 100644 --- a/src/sdk/README.md +++ b/src/sdk/README.md @@ -11,28 +11,48 @@ state). Set `A3S_BOX` if `a3s-box` is not on `PATH`. ## Pipelines -Warm a base box **once** (clone + install deps), snapshot it, fork per step: +Warm a base box **once** (clone + install deps), snapshot it, fork per step. +Run steps **sequentially** (fail-fast) or **in parallel** (collect-all → a typed +report): ```rust use a3s_box_sdk::pipeline::{warm_base, WarmBase, FileCache, Step}; fn main() -> Result<(), a3s_box_sdk::pipeline::PipelineError> { let cache = FileCache::new(".ci-cache")?; // skip a step when its inputs are unchanged - let mut base = warm_base( + let base = warm_base( WarmBase::new("node:20", "git clone $REPO /w && cd /w && npm ci") // runs ONCE .env("REPO", "https://github.com/me/app") .cache(&cache), )?; + + // Sequential, fail-fast: a non-zero exit returns Err. base.step(Step::new("lint", "cd /w && npm run lint"))?; - base.step(Step::new("test", "cd /w && npm test"))?; // nonzero exit -> Err (fail-fast) - base.step(Step::new("build", "cd /w && npm run build"))?; - base.dispose(); // drops the snapshot - Ok(()) + + // Parallel, collect-all: each step is an isolated CoW fork; <=4 at a time. + let report = base.run_parallel(vec![ + Step::new("test", "cd /w && npm test"), + Step::new("build", "cd /w && npm run build"), + ], 4); + + println!("{}", report.to_json()); // {"passed":..,"total_ms":..,"steps":[..]} + if !report.passed { /* inspect report.failures() */ } + Ok(()) // `base` drops here -> snapshot auto-removed (or call base.dispose()) } ``` -`Step::allow_failure()` keeps the pipeline going on a non-zero exit; `Step::input(..)` -adds extra cache-key parts. Parallel steps = spawn threads (each `step` is blocking). 
+- `run_parallel` is the way to use a3s-box's cheap (~ms) CoW fork at scale — a + matrix / evolution-style batch — without hand-rolling threads (every method + takes `&self`). +- A step reports a metric by printing `::metric =` to stdout; it + surfaces as `StepResult::metrics` (the scoring channel for a selection loop). +- `StepResult` carries separated `stdout`/`stderr`, `duration_ms`, and `cached`; + `Report::to_json()` is the machine-readable handoff to an agent/scorer. +- `Step::allow_failure()` keeps a non-zero step from failing the run; `Step::input(..)` + adds extra cache-key parts. + +The base **auto-disposes** its snapshot on drop, and each per-step box is removed +on every path (including a panic), so a long-running batch doesn't leak. ## Why forking is cheap diff --git a/src/sdk/examples/pipeline.rs b/src/sdk/examples/pipeline.rs index b26a5aa7..a79e4432 100644 --- a/src/sdk/examples/pipeline.rs +++ b/src/sdk/examples/pipeline.rs @@ -12,25 +12,36 @@ fn main() -> Result<(), a3s_box_sdk::pipeline::PipelineError> { let cache = FileCache::new("/tmp/.a3s-ci-demo")?; // Warm the base once (here: write a marker = "deps installed"), snapshot it. - let mut base = warm_base(WarmBase::new(image, "echo DEPS-INSTALLED > /warmed").cache(&cache))?; + let base = warm_base(WarmBase::new(image, "echo DEPS-INSTALLED > /warmed").cache(&cache))?; + // Sequential, fail-fast: a non-zero exit returns Err. let r = base.step(Step::new("read", "cat /warmed"))?; - println!( - "read -> code={} out={:?} cached={}", - r.exit_code, - r.logs.trim(), - r.cached - ); - + println!("read -> code={} out={:?}", r.exit_code, r.stdout.trim()); let r2 = base.step(Step::new("read", "cat /warmed"))?; // identical -> cache hit println!("read#2 -> cached={}", r2.cached); - match base.step(Step::new("fail", "exit 7")) { - Err(e) => println!("fail-fast ok: {e}"), - Ok(_) => println!("ERROR: fail step did not error"), - } + // Parallel matrix, collect-all: each step is an isolated CoW fork of the base. 
+ // A step reports a metric by printing `::metric key=value`. + let report = base.run_parallel( + vec![ + Step::new("ok", "echo fine"), + Step::new("perf", "echo '::metric duration_ms=12.5'"), + Step::new("fail", "exit 7"), // collected, not fatal + ], + 4, + ); + println!("report -> {}", report.to_json()); + println!( + "passed={} failures={:?}", + report.passed, + report + .failures() + .iter() + .map(|s| &s.name) + .collect::>() + ); - base.dispose(); + // base.dispose() is optional — the snapshot is removed when `base` drops. println!("demo ok"); Ok(()) } diff --git a/src/sdk/src/bin/a3s-box-ci.rs b/src/sdk/src/bin/a3s-box-ci.rs new file mode 100644 index 00000000..459a03d6 --- /dev/null +++ b/src/sdk/src/bin/a3s-box-ci.rs @@ -0,0 +1,252 @@ +//! `a3s-box-ci` — thin runner bridging any agent (a3s-code / Claude Code / Codex) +//! to the programmable-CI pipeline: a line-based spec in, a JSON `Report` out. +//! +//! ```text +//! a3s-box-ci run [SPEC|-] # run a pipeline; prints Report JSON; exit 0 iff passed +//! a3s-box-ci sweep # reclaim crashed-pipeline orphans; prints {"removed":[...]} +//! ``` +//! +//! Spec format (line-based; `#` comments; reads stdin when SPEC is `-` or omitted): +//! ```text +//! image # required +//! setup # optional (default: true) +//! env # repeatable +//! concurrency # default 4 +//! retries # infra retries (default 2) +//! max-output # cap per-step stdout/stderr +//! cache # content-addressed step cache +//! step :: # repeatable; >= 1 required +//! ``` +//! Output JSON is `Report::to_json()`; the process exits non-zero when the report +//! did not pass, so a shell/agent caller can branch on it. 
+ +use a3s_box_sdk::pipeline::{sweep_orphans, warm_base, FileCache, Step, WarmBase}; +use std::io::Read; + +#[derive(Debug)] +struct Spec { + image: String, + setup: String, + env: Vec<(String, String)>, + steps: Vec<(String, String)>, + concurrency: usize, + retries: Option, + max_output: Option, + cache: Option, +} + +fn parse_spec(text: &str) -> Result { + let mut s = Spec { + image: String::new(), + setup: "true".into(), + env: Vec::new(), + steps: Vec::new(), + concurrency: 4, + retries: None, + max_output: None, + cache: None, + }; + for (n, raw) in text.lines().enumerate() { + let line = raw.trim(); + if line.is_empty() || line.starts_with('#') { + continue; + } + let (key, rest) = match line.split_once(char::is_whitespace) { + Some((k, r)) => (k, r.trim()), + None => (line, ""), + }; + let bad = |what: &str| format!("line {}: {what}", n + 1); + match key { + "image" => s.image = rest.to_string(), + "setup" => s.setup = rest.to_string(), + "cache" => s.cache = Some(rest.to_string()), + "concurrency" => s.concurrency = rest.parse().map_err(|_| bad("bad concurrency"))?, + "retries" => s.retries = Some(rest.parse().map_err(|_| bad("bad retries"))?), + "max-output" => s.max_output = Some(rest.parse().map_err(|_| bad("bad max-output"))?), + "env" => { + let (k, v) = rest + .split_once('=') + .ok_or_else(|| bad("env needs KEY=VALUE"))?; + s.env.push((k.trim().to_string(), v.to_string())); + } + "step" => { + let (name, cmd) = rest + .split_once("::") + .ok_or_else(|| bad("step needs `name :: cmd`"))?; + s.steps + .push((name.trim().to_string(), cmd.trim().to_string())); + } + other => return Err(bad(&format!("unknown key `{other}`"))), + } + } + if s.image.is_empty() { + return Err("spec missing required `image`".into()); + } + if s.steps.is_empty() { + return Err("spec has no `step`".into()); + } + Ok(s) +} + +fn read_input(arg: Option<&str>) -> std::io::Result { + match arg { + None | Some("-") => { + let mut buf = String::new(); + 
std::io::stdin().read_to_string(&mut buf)?; + Ok(buf) + } + Some(path) => std::fs::read_to_string(path), + } +} + +fn run(spec_arg: Option<&str>) -> i32 { + let text = match read_input(spec_arg) { + Ok(t) => t, + Err(e) => { + eprintln!("a3s-box-ci: read spec: {e}"); + return 2; + } + }; + let spec = match parse_spec(&text) { + Ok(s) => s, + Err(e) => { + eprintln!("a3s-box-ci: spec error: {e}"); + return 2; + } + }; + let cache = match &spec.cache { + Some(dir) => match FileCache::new(dir) { + Ok(c) => Some(c), + Err(e) => { + eprintln!("a3s-box-ci: cache dir: {e}"); + return 2; + } + }, + None => None, + }; + + let mut wb = WarmBase::new(spec.image.as_str(), spec.setup.as_str()); + for (k, v) in &spec.env { + wb = wb.env(k.as_str(), v.as_str()); + } + if let Some(c) = &cache { + wb = wb.cache(c); + } + if let Some(r) = spec.retries { + wb = wb.infra_retries(r); + } + if let Some(m) = spec.max_output { + wb = wb.max_output(m); + } + + let base = match warm_base(wb) { + Ok(b) => b, + Err(e) => { + eprintln!("a3s-box-ci: warm_base: {e}"); + return 3; + } + }; + let steps: Vec = spec + .steps + .iter() + .map(|(name, cmd)| Step::new(name.as_str(), cmd.as_str())) + .collect(); + let report = base.run_parallel(steps, spec.concurrency); + println!("{}", report.to_json()); + i32::from(!report.passed) // 0 iff passed +} + +fn sweep() -> i32 { + let removed = sweep_orphans(); + let items: Vec = removed + .iter() + .map(|n| format!("\"{}\"", n.replace('\\', "\\\\").replace('"', "\\\""))) + .collect(); + println!("{{\"removed\":[{}]}}", items.join(",")); + 0 +} + +fn main() { + let args: Vec = std::env::args().collect(); + let code = match args.get(1).map(String::as_str) { + Some("run") => run(args.get(2).map(String::as_str)), + Some("sweep") => sweep(), + _ => { + eprintln!("usage: a3s-box-ci "); + 2 + } + }; + std::process::exit(code); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_a_full_spec() { + let s = parse_spec( + "# a pipeline\n\ + image 
node:20\n\ + setup git clone $REPO /w && cd /w && npm ci\n\ + env REPO=https://x/y\n\ + env TOKEN=a=b\n\ + concurrency 6\n\ + retries 1\n\ + max-output 4096\n\ + cache .ci\n\ + step lint :: cd /w && npm run lint\n\ + step test :: cd /w && npm test\n", + ) + .unwrap(); + assert_eq!(s.image, "node:20"); + assert_eq!(s.setup, "git clone $REPO /w && cd /w && npm ci"); + assert_eq!( + s.env, + vec![ + ("REPO".to_string(), "https://x/y".to_string()), + ("TOKEN".to_string(), "a=b".to_string()), // only the FIRST '=' splits + ] + ); + assert_eq!(s.concurrency, 6); + assert_eq!(s.retries, Some(1)); + assert_eq!(s.max_output, Some(4096)); + assert_eq!(s.cache.as_deref(), Some(".ci")); + assert_eq!( + s.steps, + vec![ + ("lint".to_string(), "cd /w && npm run lint".to_string()), + ("test".to_string(), "cd /w && npm test".to_string()), + ] + ); + } + + #[test] + fn defaults_when_omitted() { + let s = parse_spec("image alpine\nstep go :: true\n").unwrap(); + assert_eq!(s.setup, "true"); + assert_eq!(s.concurrency, 4); + assert!(s.retries.is_none() && s.max_output.is_none() && s.cache.is_none()); + assert!(s.env.is_empty()); + } + + #[test] + fn rejects_bad_specs() { + assert!(parse_spec("step go :: true").unwrap_err().contains("image")); + assert!(parse_spec("image alpine") + .unwrap_err() + .contains("no `step`")); + assert!(parse_spec("image alpine\nstep oops") + .unwrap_err() + .contains("name :: cmd")); + assert!(parse_spec("image alpine\nenv NOEQ\nstep g :: true") + .unwrap_err() + .contains("KEY=VALUE")); + assert!(parse_spec("image alpine\nbogus x\nstep g :: true") + .unwrap_err() + .contains("unknown key")); + assert!(parse_spec("image alpine\nconcurrency NaN\nstep g :: true") + .unwrap_err() + .contains("concurrency")); + } +} diff --git a/src/sdk/src/pipeline.rs b/src/sdk/src/pipeline.rs index 166c3d33..db977701 100644 --- a/src/sdk/src/pipeline.rs +++ b/src/sdk/src/pipeline.rs @@ -10,8 +10,14 @@ //! 
Model: warm a base box once (clone + install deps), `snapshot` it, then fork //! the snapshot per step. With a3s-box's copy-on-write restore (overlay lower) //! each fork shares the snapshot's pristine rootfs read-only and writes to its -//! own upper — near-instant, space-cheap, isolated. The DAG is your code: -//! sequence with plain calls, fan out with threads. No YAML, no engine. +//! own upper — near-instant, space-cheap, isolated. Steps are *independent* +//! forks of the same base, so they run sequentially with [`Base::step`] +//! (fail-fast) or concurrently with [`Base::run_parallel`] (collect-all), which +//! returns a typed, JSON-serializable [`Report`]. The base auto-disposes its +//! snapshot on drop; per-step boxes are removed on every path (incl. panic). +//! +//! A step reports metrics by printing `::metric <key>=<value>` lines to stdout; +//! they surface as [`StepResult::metrics`] for scoring (e.g. an evolution loop). //! //! Set `A3S_BOX` to the CLI path if `a3s-box` is not on `PATH`. //! @@ -20,22 +26,63 @@ //! //! # fn main() -> Result<(), a3s_box_sdk::pipeline::PipelineError> { //! let cache = FileCache::new(".ci-cache")?; -//! let mut base = warm_base( +//! let base = warm_base( //! WarmBase::new("node:20", "git clone $REPO /w && cd /w && npm ci") //! .env("REPO", "https://github.com/me/app") //! .cache(&cache), //! )?; -//! base.step(Step::new("lint", "cd /w && npm run lint"))?; -//! base.step(Step::new("test", "cd /w && npm test"))?; // nonzero exit -> Err (fail-fast) -//! base.step(Step::new("build", "cd /w && npm run build"))?; -//! base.dispose(); -//! # Ok(()) } +//! // Fan out in parallel (each step is an isolated CoW fork of the base), +//! // bounded to 4 at a time; collect a typed report. +//! let report = base.run_parallel( +//! vec![ +//! Step::new("lint", "cd /w && npm run lint"), +//! Step::new("test", "cd /w && npm test"), +//! Step::new("build", "cd /w && npm run build"), +//! ], +//! 4, +//! ); +//!
println!("{}", report.to_json()); // {"passed":true,"total_ms":...,"steps":[...]} +//! assert!(report.passed); +//! # Ok(()) } // base drops here -> snapshot auto-removed //! ``` -use std::collections::BTreeMap; +use std::collections::{BTreeMap, VecDeque}; use std::fmt; use std::path::PathBuf; use std::process::{Command, Output}; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::sync::Mutex; +use std::time::{Duration, Instant}; + +/// Default per-step cap on captured stdout/stderr — bounds in-memory [`Report`] +/// size when fanning hundreds of forks out. Override via [`WarmBase::max_output`]. +const DEFAULT_MAX_OUTPUT: usize = 1 << 20; // 1 MiB + +/// Default number of times to retry a fork that hits an *infrastructure* failure +/// (restore/start/boot/exec-spawn). The step's command never ran, so re-forking is +/// safe and idempotent; this absorbs the transient boot hiccups that surface under +/// sustained, highly-concurrent churn. Override via [`WarmBase::infra_retries`]. +const DEFAULT_INFRA_RETRIES: usize = 2; + +/// `exec_step` returns this exit code when the step never ran because of an +/// *infrastructure* failure (restore/start/exec), distinct from any real exit +/// code (a host process killed by a signal surfaces as -1, a guest `exit -1` as +/// 255), so a scorer can tell "never ran" from "ran and failed". +pub const INFRA_FAILURE: i32 = i32::MIN; + +/// Per-process instance seed. Folding it (plus the pid) into box/snapshot NAMES +/// gives two Bases from the same image+setup — or two processes on one host — +/// DISJOINT names. The CLI enforces name uniqueness, and a shared name would let +/// one Base's cleanup tear down another's live box (cross-talk → false CI). 
+static INSTANCE_SEQ: AtomicU32 = AtomicU32::new(0); + +fn instance_token() -> String { + format!( + "{}-{}", + std::process::id(), + INSTANCE_SEQ.fetch_add(1, Ordering::Relaxed) + ) +} fn box_bin() -> String { std::env::var("A3S_BOX").unwrap_or_else(|_| "a3s-box".to_string()) @@ -109,6 +156,15 @@ fn box_cleanup(args: &[&str]) { let _ = Command::new(box_bin()).args(args).output(); } +/// RAII: removes a per-step box on every exit path (normal return, `?`, panic), +/// so a step that fails after `start`/before exec never leaks a box. +struct BoxGuard(String); +impl Drop for BoxGuard { + fn drop(&mut self) { + box_cleanup(&["rm", "-f", &self.0]); + } +} + /// Content-addressed cache key: FNV-1a over NUL-delimited parts (stable, dep-free). fn key(parts: &[&str]) -> String { let mut h: u64 = 0xcbf2_9ce4_8422_2325; @@ -147,6 +203,69 @@ fn slug(s: &str) -> String { .collect() } +/// Truncate output to `cap` bytes on a UTF-8 boundary, appending a marker, so one +/// chatty/looping step can't balloon an in-memory [`Report`] during large fan-out. +fn cap_output(mut s: String, cap: usize) -> String { + if s.len() <= cap { + return s; + } + let mut end = cap; + while end > 0 && !s.is_char_boundary(end) { + end -= 1; + } + let dropped = s.len() - end; + s.truncate(end); + s.push_str(&format!("\n[..truncated {dropped} bytes]")); + s +} + +/// Parse `::metric =` lines from a step's stdout into a metric map. +/// Non-numeric and non-finite (NaN/inf) values are ignored; this is the scoring +/// channel for callers (e.g. an evolution loop that selects on `perf_ms`/`tests`). 
+fn parse_metrics(stdout: &str) -> BTreeMap { + let mut m = BTreeMap::new(); + for line in stdout.lines() { + if let Some(rest) = line.trim().strip_prefix("::metric ") { + if let Some((k, v)) = rest.split_once('=') { + if let Ok(val) = v.trim().parse::() { + if val.is_finite() { + m.insert(k.trim().to_string(), val); + } + } + } + } + } + m +} + +/// Minimal, correct JSON string encoder (dep-free): quotes + escapes. +fn json_str(s: &str) -> String { + let mut o = String::with_capacity(s.len() + 2); + o.push('"'); + for c in s.chars() { + match c { + '"' => o.push_str("\\\""), + '\\' => o.push_str("\\\\"), + '\n' => o.push_str("\\n"), + '\r' => o.push_str("\\r"), + '\t' => o.push_str("\\t"), + c if (c as u32) < 0x20 => o.push_str(&format!("\\u{:04x}", c as u32)), + c => o.push(c), + } + } + o.push('"'); + o +} + +/// A finite JSON number, or `null` for NaN/±inf (which are not valid JSON). +fn json_num(v: f64) -> String { + if v.is_finite() { + format!("{v}") + } else { + "null".to_string() + } +} + /// Poll `exec -- true` until the box is ready (box has no "wait-ready" verb). fn wait_ready(box_name: &str) -> Result<()> { // ~30s total budget. A box is usually ready in ~100-300ms, so poll fast first @@ -205,6 +324,8 @@ pub struct WarmBase<'a> { setup: String, env: BTreeMap, cache: Option<&'a FileCache>, + max_output: usize, + infra_retries: usize, } impl<'a> WarmBase<'a> { @@ -214,6 +335,8 @@ impl<'a> WarmBase<'a> { setup: setup.into(), env: BTreeMap::new(), cache: None, + max_output: DEFAULT_MAX_OUTPUT, + infra_retries: DEFAULT_INFRA_RETRIES, } } pub fn env(mut self, k: impl Into, v: impl Into) -> Self { @@ -224,6 +347,17 @@ impl<'a> WarmBase<'a> { self.cache = Some(cache); self } + /// Cap captured stdout/stderr per step (default 1 MiB) to bound report memory. + pub fn max_output(mut self, bytes: usize) -> Self { + self.max_output = bytes; + self + } + /// Retries for a fork that hits an infrastructure failure (default 2). 
The + /// step's command never ran on such a failure, so re-forking is idempotent. + pub fn infra_retries(mut self, n: usize) -> Self { + self.infra_retries = n; + self + } } /// A pipeline step: a command run in its own kernel forked from the base. @@ -261,46 +395,129 @@ impl Step { } } -/// Outcome of a step. +/// Outcome of a step. `stdout`/`stderr` are separated (and capped, see +/// [`WarmBase::max_output`]); `metrics` are parsed from `::metric k=v` stdout +/// lines; `duration_ms` is the command's wall-clock time. An infrastructure +/// failure surfaces as `exit_code == `[`INFRA_FAILURE`]. #[derive(Debug, Clone)] pub struct StepResult { pub name: String, pub exit_code: i32, - pub logs: String, + pub stdout: String, + pub stderr: String, pub cached: bool, + pub allow_failure: bool, + pub duration_ms: u128, + pub metrics: BTreeMap, +} + +impl StepResult { + /// A passing step: exit 0, or a non-zero step that opted into `allow_failure`. + pub fn ok(&self) -> bool { + self.exit_code == 0 || self.allow_failure + } + /// stdout followed by stderr (the legacy combined log view). + pub fn combined(&self) -> String { + let mut s = self.stdout.clone(); + s.push_str(&self.stderr); + s + } + /// Serialize to a JSON object (dep-free). 
+ pub fn to_json(&self) -> String { + let mut metrics = String::from("{"); + for (i, (k, v)) in self.metrics.iter().enumerate() { + if i > 0 { + metrics.push(','); + } + metrics.push_str(&json_str(k)); + metrics.push(':'); + metrics.push_str(&json_num(*v)); + } + metrics.push('}'); + format!( + "{{\"name\":{},\"exit_code\":{},\"cached\":{},\"allow_failure\":{},\"duration_ms\":{},\"metrics\":{},\"stdout\":{},\"stderr\":{}}}", + json_str(&self.name), + self.exit_code, + self.cached, + self.allow_failure, + self.duration_ms, + metrics, + json_str(&self.stdout), + json_str(&self.stderr), + ) + } +} + +/// Aggregate outcome of a [`Base::run_parallel`] batch: per-step results in input +/// order, total wall-clock, and a `passed` gate (all steps [`StepResult::ok`]). +#[derive(Debug, Clone)] +pub struct Report { + pub steps: Vec, + pub total_ms: u128, + pub passed: bool, +} + +impl Report { + /// The steps that did not pass (non-zero exit without `allow_failure`). + pub fn failures(&self) -> Vec<&StepResult> { + self.steps.iter().filter(|r| !r.ok()).collect() + } + /// Serialize the whole report to a JSON object (dep-free) for an agent/scorer. + pub fn to_json(&self) -> String { + let steps: Vec = self.steps.iter().map(|s| s.to_json()).collect(); + format!( + "{{\"passed\":{},\"total_ms\":{},\"steps\":[{}]}}", + self.passed, + self.total_ms, + steps.join(",") + ) + } } -/// A warmed, forkable base — a snapshot plus an optional step cache. +/// A warmed, forkable base — a snapshot plus an optional step cache. Methods take +/// `&self` (the fork counter is atomic), so steps can run concurrently across +/// threads. The snapshot is removed on drop (or eagerly via [`Base::dispose`]). pub struct Base<'a> { snapshot_name: String, snapshot_id: String, cache: Option<&'a FileCache>, - n: u32, + max_output: usize, + infra_retries: usize, + n: AtomicU32, + disposed: AtomicBool, } impl Base<'_> { - /// Run one step in its own kernel forked from the warmed base. 
A non-zero exit - /// returns `Err(StepFailed)` (fail-fast) unless `Step::allow_failure` was set. - pub fn step(&mut self, step: Step) -> Result { + /// Fork the base, run one step in its own kernel, return its [`StepResult`]. + /// `Err` only on an infrastructure failure (restore/start/exec); a non-zero + /// *step* exit is returned as a `StepResult` (caller decides what it means). + fn exec_step(&self, step: &Step) -> Result { let mut parts: Vec<&str> = vec![&step.name, &step.cmd]; parts.extend(step.inputs.iter().map(|s| s.as_str())); let k = key(&parts); if let Some(c) = self.cache { if c.has(&k) { return Ok(StepResult { - name: step.name, + name: step.name.clone(), exit_code: 0, - logs: String::new(), + stdout: String::new(), + stderr: String::new(), cached: true, + allow_failure: step.allow_failure, + duration_ms: 0, + metrics: BTreeMap::new(), }); } } - self.n += 1; - let box_name = format!("{}-job{}-{}", self.snapshot_name, self.n, slug(&step.name)); - box_cleanup(&["rm", "-f", &box_name]); // idempotent: clear a crashed prior run + // Names are globally unique (snapshot_name carries pid+instance entropy, + // `i` is per-fork), so no cross-process pre-clean is needed; the guard + // removes the fork on every exit path. + let i = self.n.fetch_add(1, Ordering::Relaxed) + 1; + let box_name = format!("{}-job{}-{}", self.snapshot_name, i, slug(&step.name)); + let _guard = BoxGuard(box_name.clone()); - // Fork = restore + start. Since a3s-box's CoW restore, this mounts the + // Fork = restore + start. Thanks to a3s-box's CoW restore, this mounts the // snapshot's rootfs as a read-only overlay lower with a fresh per-box upper. 
box_run(&[ "snapshot", @@ -313,38 +530,130 @@ impl Base<'_> { wait_ready(&box_name)?; let full = with_env(&step.cmd, &step.env); + let t = Instant::now(); let out = Command::new(box_bin()) .args(["exec", &box_name, "--", "sh", "-c", &full]) .output()?; - box_cleanup(&["rm", "-f", &box_name]); + let duration_ms = t.elapsed().as_millis(); let code = out.status.code().unwrap_or(-1); - let mut logs = String::from_utf8_lossy(&out.stdout).into_owned(); - logs.push_str(&String::from_utf8_lossy(&out.stderr)); + let stdout = String::from_utf8_lossy(&out.stdout).into_owned(); + let stderr = String::from_utf8_lossy(&out.stderr).into_owned(); + let metrics = parse_metrics(&stdout); // parse BEFORE truncating if code == 0 { if let Some(c) = self.cache { c.put(&k); } } - if code != 0 && !step.allow_failure { - return Err(PipelineError::StepFailed { - name: step.name, - code, - logs, - }); - } Ok(StepResult { - name: step.name, + name: step.name.clone(), exit_code: code, - logs, + stdout: cap_output(stdout, self.max_output), + stderr: cap_output(stderr, self.max_output), cached: false, + allow_failure: step.allow_failure, + duration_ms, + metrics, }) } - /// Remove the snapshot (by ID — `snapshot rm` does not accept names). + /// `exec_step` with bounded retry on *infrastructure* failure (the step's + /// command never ran, so re-forking is idempotent). A non-zero step exit is + /// returned as `Ok` and never retried — only a failed fork is. + fn exec_step_retrying(&self, step: &Step) -> Result { + retry_infra(self.infra_retries, || self.exec_step(step)) + } + + /// Run one step, **fail-fast**: a non-zero exit returns `Err(StepFailed)` + /// unless `Step::allow_failure` was set. Use for a sequential pipeline. 
+ pub fn step(&self, step: Step) -> Result { + let r = self.exec_step_retrying(&step)?; + if r.exit_code != 0 && !step.allow_failure { + return Err(PipelineError::StepFailed { + name: r.name.clone(), + code: r.exit_code, + logs: r.combined(), + }); + } + Ok(r) + } + + /// Run one step for a batch: **never** propagates — an infra failure becomes a + /// `StepResult{ exit_code: `[`INFRA_FAILURE`]`, stderr: }` so one bad + /// fork can't abort the batch. + fn run_one(&self, step: Step) -> StepResult { + match self.exec_step_retrying(&step) { + Ok(r) => r, + Err(e) => StepResult { + name: step.name.clone(), + exit_code: INFRA_FAILURE, + stdout: String::new(), + stderr: e.to_string(), + cached: false, + allow_failure: step.allow_failure, + duration_ms: 0, + metrics: BTreeMap::new(), + }, + } + } + + /// Fan steps out concurrently — each is an isolated CoW fork of the base — + /// bounded to `max_concurrency` at a time. **Collect-all** (not fail-fast): + /// every step runs and is returned in input order. This is the primitive that + /// makes a3s-box's cheap fork (~ms) usable for matrix / evolution-scale CI. + /// Do not call [`Base::dispose`] while a `run_parallel` is in flight. 
+ pub fn run_parallel(&self, steps: Vec, max_concurrency: usize) -> Report { + let start = Instant::now(); + let n = steps.len(); + if n == 0 { + return Report { + steps: Vec::new(), + total_ms: 0, + passed: true, + }; + } + let conc = max_concurrency.max(1).min(n); + let queue: Mutex> = + Mutex::new(steps.into_iter().enumerate().collect()); + let results: Mutex> = Mutex::new(Vec::with_capacity(n)); + + std::thread::scope(|s| { + for _ in 0..conc { + s.spawn(|| loop { + let next = { queue.lock().unwrap().pop_front() }; + let Some((idx, step)) = next else { break }; + let r = self.run_one(step); + results.lock().unwrap().push((idx, r)); + }); + } + }); + + let mut v = results.into_inner().unwrap(); + v.sort_by_key(|(i, _)| *i); + let steps: Vec = v.into_iter().map(|(_, r)| r).collect(); + let passed = steps.iter().all(|r| r.ok()); + Report { + steps, + total_ms: start.elapsed().as_millis(), + passed, + } + } + + /// Eagerly remove the snapshot (by ID, `--force` since forks are CoW lowers). + /// Idempotent with the `Drop` impl, so calling it early is safe. pub fn dispose(&self) { - box_cleanup(&["snapshot", "rm", &self.snapshot_id]); + if !self.disposed.swap(true, Ordering::SeqCst) { + box_cleanup(&["snapshot", "rm", "--force", &self.snapshot_id]); + } + } +} + +impl Drop for Base<'_> { + fn drop(&mut self) { + if !self.disposed.swap(true, Ordering::SeqCst) { + box_cleanup(&["snapshot", "rm", "--force", &self.snapshot_id]); + } } } @@ -372,15 +681,42 @@ fn parse_snapshot_id(ls_output: &str, name: &str) -> Option { None } -/// Warm a base box: run `setup` once, snapshot the result, return a forkable [`Base`]. +/// Retry `f` on an infrastructure failure, with backoff, up to `retries` extra +/// attempts. Shared by the per-step fork and `warm_base` — both re-run idempotent +/// work (a fresh box/fork each call), so a transient boot/restore hiccup under +/// concurrent load is absorbed rather than surfaced. 
+fn retry_infra(retries: usize, mut f: impl FnMut() -> Result) -> Result { + let mut last: Option = None; + for attempt in 0..=retries { + match f() { + Ok(v) => return Ok(v), + Err(e) => { + last = Some(e); + if attempt < retries { + std::thread::sleep(Duration::from_millis(150 * (attempt as u64 + 1))); + } + } + } + } + Err(last.expect("loop runs at least once")) +} + +/// Warm a base box: run `setup` once, snapshot the result, return a forkable +/// [`Base`]. Retried on a transient infrastructure failure (the spec's +/// `infra_retries` budget) so concurrent same-image warms stay robust under load. pub fn warm_base(spec: WarmBase<'_>) -> Result> { - let base_box = format!("ci-base-{}", key(&[&spec.image, &spec.setup])); - let snap = format!("{base_box}-snap"); + retry_infra(spec.infra_retries, || warm_once(&spec)) +} - box_cleanup(&["rm", "-f", &base_box]); // idempotent rerun - if let Ok(old) = snapshot_id(&snap) { - box_cleanup(&["snapshot", "rm", &old]); - } +/// One warm attempt. Fresh pid+instance-tagged names each call, so a retry can't +/// collide with its own partial leftovers nor tear down a concurrent peer's box. +fn warm_once<'a>(spec: &WarmBase<'a>) -> Result> { + let base_box = format!( + "ci-base-{}-{}", + key(&[&spec.image, &spec.setup]), + instance_token() + ); + let snap = format!("{base_box}-snap"); box_run(&[ "run", @@ -400,15 +736,88 @@ pub fn warm_base(spec: WarmBase<'_>) -> Result> { snapshot_id(&snap) })(); box_cleanup(&["rm", "-f", &base_box]); + // If `snapshot create` succeeded but ID resolution failed, no Base is built, + // so Drop never fires — best-effort remove the orphan snapshot. 
+ if prepared.is_err() { + if let Ok(id) = snapshot_id(&snap) { + box_cleanup(&["snapshot", "rm", "--force", &id]); + } + } Ok(Base { snapshot_name: snap, snapshot_id: prepared?, cache: spec.cache, - n: 0, + max_output: spec.max_output, + infra_retries: spec.infra_retries, + n: AtomicU32::new(0), + disposed: AtomicBool::new(false), }) } +/// The owner pid embedded in a `ci-base---...` resource name. +fn parse_owner_pid(name: &str) -> Option { + let rest = name.strip_prefix("ci-base-")?; + let mut it = rest.split('-'); + it.next()?; // key hash + it.next()?.parse().ok() +} + +/// `Some(true/false)` where pid-liveness is knowable (Linux `/proc`); `None` +/// otherwise — so a sweep never reclaims resources whose owner it can't confirm dead. +fn pid_alive(pid: u32) -> Option { + if !std::path::Path::new("/proc").is_dir() { + return None; + } + Some(std::path::Path::new(&format!("/proc/{pid}")).exists()) +} + +/// If `name` is a ci-base resource owned by a confirmed-DEAD pid other than `me`, +/// return that pid (it is an orphan safe to reclaim). +fn orphan_pid(name: &str, me: u32) -> Option { + let pid = parse_owner_pid(name)?; + if pid == me { + return None; // ours — never sweep a live self + } + match pid_alive(pid) { + Some(false) => Some(pid), // confirmed dead + _ => None, // alive or unknowable — leave it + } +} + +/// Reclaim `ci-base-*` boxes and snapshots leaked by a **crashed** pipeline +/// process. `Drop`/guards clean up graceful exits, but `SIGKILL`/OOM/power-loss +/// skip them; resource names embed the owner pid, so a resource whose pid is no +/// longer alive (and isn't this process) is removed. Returns the names reclaimed. +/// Safe to call concurrently with live pipelines: it only touches dead-pid orphans. 
+pub fn sweep_orphans() -> Vec { + let me = std::process::id(); + let mut removed = Vec::new(); + + if let Ok(out) = box_run(&["ps", "-a", "--format", "{{.Names}}"]) { + for name in String::from_utf8_lossy(&out.stdout).lines() { + let name = name.trim(); + if orphan_pid(name, me).is_some() { + box_cleanup(&["rm", "-f", name]); + removed.push(name.to_string()); + } + } + } + if let Ok(out) = box_run(&["snapshot", "ls"]) { + let text = String::from_utf8_lossy(&out.stdout); + for line in text.lines() { + let mut cols = line.split_whitespace(); + if let (Some(id), Some(name)) = (cols.next(), cols.next()) { + if orphan_pid(name, me).is_some() { + box_cleanup(&["snapshot", "rm", "--force", id]); + removed.push(name.to_string()); + } + } + } + } + removed +} + #[cfg(test)] mod tests { use super::*; @@ -421,6 +830,11 @@ mod tests { assert_ne!(key(&["ab", "c"]), key(&["a", "bc"])); } + #[test] + fn instance_token_is_unique_per_call() { + assert_ne!(instance_token(), instance_token()); + } + #[test] fn with_env_escapes_and_prefixes() { let mut e = BTreeMap::new(); @@ -437,6 +851,86 @@ mod tests { assert_eq!(slug("a/b c"), "a-b-c"); } + #[test] + fn cap_output_truncates_on_char_boundary() { + assert_eq!(cap_output("hello".into(), 10), "hello"); // under cap: untouched + let capped = cap_output("a".repeat(100), 10); + assert!(capped.starts_with("aaaaaaaaaa")); + assert!(capped.contains("[..truncated 90 bytes]")); + // a multi-byte char ("é" = 2 bytes) must not be split mid-codepoint. 
+ let c = cap_output("é".repeat(10), 5); // floors to a 4-byte boundary + assert!(std::str::from_utf8(c.as_bytes()).is_ok()); + assert!(c.contains("truncated")); + } + + #[test] + fn parse_metrics_extracts_finite_numeric_only() { + let m = parse_metrics( + "noise\n::metric perf_ms=84.2\n ::metric tests=412 \n::metric bad=xyz\n\ + ::metric inf_m=inf\n::metric nan_m=NaN\ntrailing", + ); + assert_eq!(m.get("perf_ms"), Some(&84.2)); + assert_eq!(m.get("tests"), Some(&412.0)); + assert!(!m.contains_key("bad")); // non-numeric ignored + assert!(!m.contains_key("inf_m")); // non-finite ignored + assert!(!m.contains_key("nan_m")); + assert_eq!(m.len(), 2); + } + + #[test] + fn json_str_escapes_quotes_backslash_control() { + assert_eq!(json_str("a\"b\\c\n"), "\"a\\\"b\\\\c\\n\""); + assert_eq!(json_str("\u{0001}"), "\"\\u0001\""); + assert_eq!(json_num(1.5), "1.5"); + assert_eq!(json_num(f64::NAN), "null"); + } + + #[test] + fn report_and_step_json_shape() { + let mut metrics = BTreeMap::new(); + metrics.insert("k".to_string(), 2.0); + let rep = Report { + passed: false, + total_ms: 5, + steps: vec![StepResult { + name: "t".into(), + exit_code: 1, + stdout: "o".into(), + stderr: "e\"x".into(), + cached: false, + allow_failure: false, + duration_ms: 3, + metrics, + }], + }; + let j = rep.to_json(); + assert!(j.contains("\"passed\":false")); + assert!(j.contains("\"total_ms\":5")); + assert!(j.contains("\"name\":\"t\"")); + assert!(j.contains("\"exit_code\":1")); + assert!(j.contains("\"metrics\":{\"k\":2}")); + assert!(j.contains("\"stderr\":\"e\\\"x\"")); // escaped quote in stderr + assert_eq!(rep.failures().len(), 1); + } + + #[test] + fn step_result_ok_respects_allow_failure() { + let mk = |code: i32, allow: bool| StepResult { + name: "s".into(), + exit_code: code, + stdout: String::new(), + stderr: String::new(), + cached: false, + allow_failure: allow, + duration_ms: 0, + metrics: BTreeMap::new(), + }; + assert!(mk(0, false).ok()); + assert!(!mk(1, false).ok()); + 
assert!(mk(1, true).ok()); // allowed failure still "ok" for the gate + assert!(!mk(INFRA_FAILURE, false).ok()); + } + #[test] fn file_cache_roundtrip() { let dir = std::env::temp_dir().join(format!("a3s-ci-test-{}", key(&["cache"]))); @@ -484,6 +978,31 @@ mod tests { ); } + #[test] + fn parse_owner_pid_and_orphan_detection() { + assert_eq!(parse_owner_pid("ci-base-deadbeef-4242-0-snap"), Some(4242)); + assert_eq!( + parse_owner_pid("ci-base-deadbeef-4242-7-snap-job3-make"), + Some(4242) + ); + assert_eq!(parse_owner_pid("other-box"), None); + assert_eq!(parse_owner_pid("ci-base-onlykey"), None); + // this process's own resources are never orphans, regardless of liveness. + let me = std::process::id(); + let mine = format!("ci-base-deadbeef-{me}-0-snap"); + assert_eq!(orphan_pid(&mine, me), None); + // a confirmed-dead pid is an orphan — but only where liveness is knowable + // (Linux /proc); on hosts where it isn't, sweep must leave it untouched. + if pid_alive(4_000_000_000).is_some() { + assert_eq!( + orphan_pid("ci-base-deadbeef-4000000000-0-snap", me), + Some(4_000_000_000) + ); + } else { + assert_eq!(orphan_pid("ci-base-deadbeef-4000000000-0-snap", me), None); + } + } + #[test] fn ci_error_display_covers_each_variant() { let io = PipelineError::from(std::io::Error::other("boom")); @@ -505,21 +1024,33 @@ mod tests { } // A POSIX-sh stub standing in for `a3s-box`: enough to drive warm_base + step - // through their happy paths (run/exec/start/rm succeed, `snapshot create` - // records name->id in a state file that `snapshot ls` echoes, an exec whose - // command contains "boom" exits 7). Lets us cover the orchestration locally, - // without a real box or KVM. 
+ // through their happy paths (run/exec/start/rm succeed; `snapshot create` + // records name->id in a state file that `snapshot ls` echoes; an exec whose + // command contains "boom" exits 7, "emitmetric" prints a `::metric` line; + // a `snapshot restore` whose target name contains "infrafail" exits 1). #[cfg(unix)] const FAKE_BOX: &str = r#"#!/bin/sh state="$(dirname "$0")/.snaps" case "$1" in run|start|rm) exit 0 ;; exec) last=""; for a in "$@"; do last="$a"; done - case "$last" in *boom*) exit 7 ;; *) exit 0 ;; esac ;; + case "$last" in + *emitmetric*) printf '::metric score=9\n'; exit 0 ;; + *boom*) exit 7 ;; + *) exit 0 ;; + esac ;; snapshot) case "$2" in create) name=""; while [ "$#" -gt 0 ]; do [ "$1" = "--name" ] && name="$2"; shift; done printf '%s %s\n' "snap-fake1" "$name" >> "$state"; printf 'snap-fake1\n'; exit 0 ;; + restore) + for a in "$@"; do + case "$a" in + *infrafail*) exit 1 ;; + *flaky*) c="$(dirname "$0")/.flaky"; n=$(cat "$c" 2>/dev/null || echo 0); n=$((n+1)); echo "$n" > "$c"; [ "$n" -le 2 ] && exit 1 || exit 0 ;; + esac + done + exit 0 ;; ls) printf 'ID NAME SRC\n'; cat "$state" 2>/dev/null; exit 0 ;; *) exit 0 ;; esac ;; @@ -543,7 +1074,7 @@ esac // Happy path against the stub. std::env::set_var("A3S_BOX", &fake); let cache = FileCache::new(dir.join("cache")).unwrap(); - let mut base = warm_base(WarmBase::new("img", "echo hi").cache(&cache)).expect("warm_base"); + let base = warm_base(WarmBase::new("img", "echo hi").cache(&cache)).expect("warm_base"); let r = base.step(Step::new("build", "make")).expect("step"); assert_eq!(r.exit_code, 0); assert!(!r.cached); @@ -552,23 +1083,74 @@ esac base.step(Step::new("fail", "boom")), Err(PipelineError::StepFailed { code: 7, .. }) )); - // box_run surfaces a non-zero exit as Cli (the stub exits 7 on a "boom" exec). - assert!(matches!( - box_run(&["exec", "x", "--", "boom"]), - Err(PipelineError::Cli { code: Some(7), .. 
}) - )); // allow_failure: a non-zero step returns Ok with the code instead of Err. let allowed = base .step(Step::new("maybe", "boom").allow_failure()) .expect("allow_failure step"); assert_eq!(allowed.exit_code, 7); - assert!(!allowed.cached); - // A second warm_base exercises the stale-snapshot pre-clean path. - let _ = warm_base(WarmBase::new("img", "echo hi").cache(&cache)).expect("warm_base #2"); + + // run_parallel: collect-all, input order preserved, metrics parsed, gate derived. + let rep = base.run_parallel( + vec![ + Step::new("a", "make"), + Step::new("b", "boom"), // exits 7 -> not ok + Step::new("m", "emitmetric"), // prints ::metric score=9 + ], + 2, + ); + assert_eq!(rep.steps.len(), 3); + assert_eq!(rep.steps[0].name, "a"); // order preserved despite concurrency + assert_eq!(rep.steps[1].exit_code, 7); + assert_eq!(rep.steps[2].metrics.get("score"), Some(&9.0)); + assert!(!rep.passed); + assert_eq!(rep.failures().len(), 1); + assert!(rep.to_json().contains("\"score\":9")); + + // an allowed-failure step must NOT fail the batch gate. + let rep2 = base.run_parallel( + vec![ + Step::new("ok", "make"), + Step::new("soft", "boom").allow_failure(), + ], + 2, + ); + assert!(rep2.passed, "allow_failure step must not fail the batch"); + + // an infra failure (restore refuses) surfaces as INFRA_FAILURE, not a panic + // (it is retried infra_retries times first, then gives up). + let rep3 = base.run_parallel(vec![Step::new("infrafail", "make")], 1); + assert_eq!(rep3.steps[0].exit_code, INFRA_FAILURE); + assert!(!rep3.steps[0].stderr.is_empty()); + assert!(!rep3.passed); + + // a TRANSIENT infra failure is retried: "flaky" restore fails twice then + // succeeds, so with the default 2 retries (3 attempts) the step recovers. 
+ let _ = std::fs::remove_file(dir.join(".flaky")); + let okr = base.run_parallel(vec![Step::new("flaky", "make")], 1); + assert_eq!( + okr.steps[0].exit_code, 0, + "transient infra failure should be retried to success" + ); + assert!(okr.passed); + + // empty batch is trivially passed. + assert!(base.run_parallel(Vec::new(), 4).passed); + + // Two Bases from the SAME spec must get disjoint names (no cross-talk). + let b1 = warm_base(WarmBase::new("u", "echo").cache(&cache)).expect("b1"); + let b2 = warm_base(WarmBase::new("u", "echo").cache(&cache)).expect("b2"); + assert_ne!( + b1.snapshot_name, b2.snapshot_name, + "same spec must yield disjoint snapshot names" + ); + b1.dispose(); + b2.dispose(); + base.dispose(); + base.dispose(); // idempotent: second dispose + Drop must not double-remove // A cache-less base exercises the no-cache branch in step(). - let mut nocache = warm_base(WarmBase::new("img", "echo hi")).expect("warm_base nocache"); + let nocache = warm_base(WarmBase::new("img", "echo hi")).expect("warm_base nocache"); assert!( !nocache .step(Step::new("x", "make")) @@ -598,7 +1180,11 @@ esac // Exercise every builder method (fields are private; chaining covers the bodies). let c = FileCache::new(std::env::temp_dir().join(format!("a3s-ci-bld-{}", key(&["bld"])))) .unwrap(); - let _ = WarmBase::new("img", "setup").env("A", "1").cache(&c); + let _ = WarmBase::new("img", "setup") + .env("A", "1") + .cache(&c) + .max_output(4096) + .infra_retries(1); let _ = Step::new("n", "cmd") .input("x") .env("K", "V") diff --git a/src/sdk/tests/integration_kvm.rs b/src/sdk/tests/integration_kvm.rs new file mode 100644 index 00000000..7fc58509 --- /dev/null +++ b/src/sdk/tests/integration_kvm.rs @@ -0,0 +1,229 @@ +//! Real-microVM integration tests for the programmable-CI pipeline. +//! +//! `#[ignore]` by default. Run on a `/dev/kvm` host: +//! ```text +//! A3S_BOX=/path/to/a3s-box \ +//! 
cargo test -p a3s-box-sdk --test integration_kvm -- --ignored --nocapture --test-threads=1 +//! ``` +//! Override the image with `A3S_SDK_TEST_IMAGE` (default: a daocloud alpine mirror). +//! Each test self-skips if no usable `a3s-box`/virtualization is present. + +use a3s_box_sdk::pipeline::{sweep_orphans, warm_base, FileCache, Step, WarmBase}; +use std::process::Command; + +fn a3s_box() -> String { + std::env::var("A3S_BOX").unwrap_or_else(|_| "a3s-box".into()) +} + +fn image() -> String { + std::env::var("A3S_SDK_TEST_IMAGE") + .unwrap_or_else(|_| "docker.m.daocloud.io/library/alpine:latest".into()) +} + +/// Run `a3s-box `, returning (success, stdout+stderr). +fn run(args: &[&str]) -> (bool, String) { + let out = Command::new(a3s_box()) + .args(args) + .output() + .expect("spawn a3s-box"); + let mut s = String::from_utf8_lossy(&out.stdout).into_owned(); + s.push_str(&String::from_utf8_lossy(&out.stderr)); + (out.status.success(), s) +} + +/// True only when a working `a3s-box` reports virtualization is available. +fn kvm_ready() -> bool { + match Command::new(a3s_box()).arg("info").output() { + Ok(o) => o.status.success() && String::from_utf8_lossy(&o.stdout).contains("available"), + Err(_) => false, + } +} + +/// Resource names embed this process's pid; scope leak counts to `--` so a +/// concurrent pipeline on the shared host can't perturb the assertions. 
+fn marker() -> String { + format!("-{}-", std::process::id()) +} + +fn ci_base_boxes() -> usize { + let m = marker(); + run(&["ps", "-a", "--format", "{{.Names}}"]) + .1 + .lines() + .filter(|l| l.trim().starts_with("ci-base-") && l.contains(&m)) + .count() +} + +fn ci_base_snaps() -> usize { + let m = marker(); + run(&["snapshot", "ls"]) + .1 + .lines() + .filter(|l| l.contains("ci-base-") && l.contains(&m)) + .count() +} + +#[test] +#[ignore = "needs a real a3s-box + /dev/kvm"] +fn warm_fork_exec_and_cache() { + if !kvm_ready() { + eprintln!("SKIP warm_fork_exec_and_cache: no A3S_BOX/KVM"); + return; + } + let cdir = std::env::temp_dir().join(format!("a3s-sdk-it-cache-{}", std::process::id())); + let _ = std::fs::remove_dir_all(&cdir); + let cache = FileCache::new(&cdir).unwrap(); + + let base = warm_base(WarmBase::new(image(), "echo DEPS-INSTALLED > /warmed").cache(&cache)) + .expect("warm_base"); + let r = base + .step(Step::new("read", "cat /warmed")) + .expect("step read"); + assert_eq!(r.exit_code, 0); + assert!(r.stdout.contains("DEPS-INSTALLED"), "stdout={:?}", r.stdout); + assert!(!r.cached); + let r2 = base + .step(Step::new("read", "cat /warmed")) + .expect("step read#2"); + assert!(r2.cached, "an identical step must hit the cache"); + base.dispose(); + let _ = std::fs::remove_dir_all(&cdir); +} + +#[test] +#[ignore = "needs a real a3s-box + /dev/kvm"] +fn parallel_collect_all_ordered_with_metrics() { + if !kvm_ready() { + eprintln!("SKIP parallel_collect_all_ordered_with_metrics: no A3S_BOX/KVM"); + return; + } + let base = warm_base(WarmBase::new(image(), "true")).expect("warm_base"); + let rep = base.run_parallel( + vec![ + Step::new("a", "echo first"), + Step::new( + "perf", + "echo '::metric duration_ms=12.5'; echo '::metric tests=7'", + ), + Step::new("b", "exit 3"), + Step::new("c", "echo third"), + ], + 4, + ); + assert_eq!(rep.steps.len(), 4); + assert_eq!(rep.steps[0].name, "a"); // input order preserved despite concurrency + 
assert_eq!(rep.steps[1].name, "perf"); + assert_eq!(rep.steps[1].metrics.get("duration_ms"), Some(&12.5)); + assert_eq!(rep.steps[1].metrics.get("tests"), Some(&7.0)); + assert_eq!(rep.steps[2].exit_code, 3); + assert!(!rep.passed); + assert_eq!(rep.failures().len(), 1); + assert!(rep.steps[0].duration_ms < 120_000); + assert!(rep.to_json().contains("\"duration_ms\":12.5")); + base.dispose(); +} + +#[test] +#[ignore = "needs a real a3s-box + /dev/kvm"] +fn forks_are_isolated() { + if !kvm_ready() { + eprintln!("SKIP forks_are_isolated: no A3S_BOX/KVM"); + return; + } + let base = warm_base(WarmBase::new(image(), "true")).expect("warm_base"); + // Each step is a fresh CoW fork of the base; a file written in one fork must + // not be visible to a sibling. + let rep = base.run_parallel( + vec![ + Step::new("writer", "echo HELLO > /marker; cat /marker"), + Step::new("reader", "cat /marker 2>/dev/null || echo MISSING"), + ], + 2, + ); + assert!(rep.steps[0].stdout.contains("HELLO")); + assert!( + rep.steps[1].stdout.contains("MISSING"), + "sibling fork leaked state: {:?}", + rep.steps[1].stdout + ); + base.dispose(); +} + +#[test] +#[ignore = "needs a real a3s-box + /dev/kvm"] +fn leak_free_under_churn() { + if !kvm_ready() { + eprintln!("SKIP leak_free_under_churn: no A3S_BOX/KVM"); + return; + } + let boxes0 = ci_base_boxes(); + let snaps0 = ci_base_snaps(); + let base = warm_base(WarmBase::new(image(), "true")).expect("warm_base"); + for gen in 0..3 { + let steps: Vec = (0..6) + .map(|i| Step::new(format!("g{gen}s{i}"), "true")) + .collect(); + let rep = base.run_parallel(steps, 4); + assert!(rep.passed, "gen {gen} unexpectedly failed"); + let m = marker(); + let lingering = run(&["ps", "-a", "--format", "{{.Names}}"]) + .1 + .lines() + .filter(|l| l.contains("-snap-job") && l.contains(&m)) + .count(); + assert_eq!(lingering, 0, "leaked fork boxes mid-churn at gen {gen}"); + } + base.dispose(); + assert_eq!(ci_base_boxes(), boxes0, "leaked ci-base boxes after run"); 
+ assert_eq!( + ci_base_snaps(), + snaps0, + "leaked ci-base snapshots after dispose" + ); +} + +#[test] +#[ignore = "needs a real a3s-box + /dev/kvm"] +fn sweep_reclaims_dead_pid_orphan_but_spares_live() { + if !kvm_ready() { + eprintln!("SKIP sweep_reclaims_dead_pid_orphan_but_spares_live: no A3S_BOX/KVM"); + return; + } + // A live base (owned by THIS pid) must survive the sweep. + let base = warm_base(WarmBase::new(image(), "true")).expect("warm_base"); + + // Forge a running box named as if owned by a reaped (dead) pid. + let mut child = Command::new("true").spawn().expect("spawn true"); + let dead = child.id(); + child.wait().ok(); // reap -> /proc/ goes away -> confirmed dead + let orphan = format!("ci-base-deadbeef-{dead}-0-snap-job1-x"); + let (ok, out) = run(&[ + "run", + "-d", + "--name", + &orphan, + &image(), + "--", + "sleep", + "300", + ]); + assert!(ok, "could not create forged orphan box: {out}"); + + let removed = sweep_orphans(); + assert!( + removed.iter().any(|n| n == &orphan), + "sweep did not reclaim the dead-pid orphan; removed={removed:?}" + ); + let names = run(&["ps", "-a", "--format", "{{.Names}}"]).1; + assert!( + !names.lines().any(|l| l.trim() == orphan), + "orphan box still present after sweep" + ); + // The live base was spared — a step on it still works. + assert!( + base.step(Step::new("alive", "true")).is_ok(), + "sweep wrongly reclaimed a live base" + ); + run(&["rm", "-f", &orphan]); // belt-and-suspenders + base.dispose(); +} diff --git a/src/sdk/tests/soak_kvm.rs b/src/sdk/tests/soak_kvm.rs new file mode 100644 index 00000000..e7d10ab2 --- /dev/null +++ b/src/sdk/tests/soak_kvm.rs @@ -0,0 +1,160 @@ +//! Soak test: sustained fork-eval churn must stay leak-free and memory-stable. +//! +//! `#[ignore]` by default. Run on a `/dev/kvm` host: +//! ```text +//! A3S_BOX=/path/to/a3s-box A3S_SDK_SOAK_FORKS=2000 \ +//! cargo test -p a3s-box-sdk --test soak_kvm -- --ignored --nocapture --test-threads=1 +//! ``` +//! 
Knobs: `A3S_SDK_SOAK_FORKS` (total fork-evals, default 200), +//! `A3S_SDK_SOAK_CONC` (max concurrency, default 8), `A3S_SDK_TEST_IMAGE`. + +use a3s_box_sdk::pipeline::{warm_base, Step, WarmBase}; +use std::process::Command; +use std::time::Instant; + +fn a3s_box() -> String { + std::env::var("A3S_BOX").unwrap_or_else(|_| "a3s-box".into()) +} + +fn image() -> String { + std::env::var("A3S_SDK_TEST_IMAGE") + .unwrap_or_else(|_| "docker.m.daocloud.io/library/alpine:latest".into()) +} + +fn kvm_ready() -> bool { + match Command::new(a3s_box()).arg("info").output() { + Ok(o) => o.status.success() && String::from_utf8_lossy(&o.stdout).contains("available"), + Err(_) => false, + } +} + +/// Resource names embed this process's pid (`ci-base---...`); scope +/// every leak count to `--` so a *concurrent* pipeline on the same host can't +/// perturb the assertions (the global namespace is shared). +fn marker() -> String { + format!("-{}-", std::process::id()) +} + +/// Count THIS run's boxes whose name also contains `needle`. +fn count_box_names(needle: &str) -> usize { + let m = marker(); + let out = Command::new(a3s_box()) + .args(["ps", "-a", "--format", "{{.Names}}"]) + .output() + .expect("a3s-box ps"); + String::from_utf8_lossy(&out.stdout) + .lines() + .filter(|l| l.contains(needle) && l.contains(&m)) + .count() +} + +/// Count THIS run's ci-base snapshots. 
+fn count_my_snaps() -> usize { + let m = marker(); + let out = Command::new(a3s_box()) + .args(["snapshot", "ls"]) + .output() + .expect("a3s-box snapshot ls"); + String::from_utf8_lossy(&out.stdout) + .lines() + .filter(|l| l.contains("ci-base-") && l.contains(&m)) + .count() +} + +fn rss_kib() -> Option { + let s = std::fs::read_to_string("/proc/self/status").ok()?; + s.lines() + .find_map(|l| l.strip_prefix("VmRSS:")) + .and_then(|v| v.split_whitespace().next()) + .and_then(|n| n.parse().ok()) +} + +fn env_usize(key: &str, default: usize) -> usize { + std::env::var(key) + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(default) +} + +#[test] +#[ignore = "needs a real a3s-box + /dev/kvm; long-running"] +fn soak_fork_eval_is_leak_free_and_stable() { + if !kvm_ready() { + eprintln!("SKIP soak_fork_eval_is_leak_free_and_stable: no A3S_BOX/KVM"); + return; + } + let target = env_usize("A3S_SDK_SOAK_FORKS", 200); + let conc = env_usize("A3S_SDK_SOAK_CONC", 8); + let batch = 20usize; + + let snaps0 = count_my_snaps(); + let rss0 = rss_kib(); + + let base = warm_base(WarmBase::new(image(), "true")).expect("warm_base"); + let snap_base = count_my_snaps(); + assert_eq!( + snap_base, + snaps0 + 1, + "warm_base should add exactly one snapshot" + ); + + let start = Instant::now(); + let mut done = 0usize; + let mut gen = 0usize; + while done < target { + let n = batch.min(target - done); + let steps: Vec = (0..n) + .map(|i| Step::new(format!("g{gen}s{i}"), "echo '::metric ok=1'")) + .collect(); + let rep = base.run_parallel(steps, conc); + assert!( + rep.passed, + "gen {gen} had failures: {:?}", + rep.failures() + .iter() + .map(|s| { + ( + s.name.clone(), + s.exit_code, + s.stderr.chars().take(160).collect::(), + ) + }) + .collect::>() + ); + assert_eq!(rep.steps.len(), n); + // every step parsed its metric (proves the channel survives churn). 
+ assert!(rep.steps.iter().all(|s| s.metrics.get("ok") == Some(&1.0))); + // leak gate per generation: no fork box lingers, snapshot count stays flat. + assert_eq!( + count_box_names("-snap-job"), + 0, + "leaked fork boxes after gen {gen}" + ); + assert_eq!(count_my_snaps(), snap_base, "snapshot drift at gen {gen}"); + done += n; + gen += 1; + } + + let elapsed = start.elapsed().as_secs_f64(); + eprintln!( + "SOAK: {done} fork-evals across {gen} generations in {elapsed:.1}s = {:.1} forks/s", + done as f64 / elapsed + ); + + base.dispose(); + assert_eq!(count_my_snaps(), snaps0, "snapshot leaked after dispose"); + assert_eq!( + count_box_names("ci-base-"), + 0, + "ci-base box leaked after soak" + ); + + if let (Some(a), Some(b)) = (rss0, rss_kib()) { + let grow = b.saturating_sub(a); + eprintln!("SOAK RSS: {a} KiB -> {b} KiB (+{grow} KiB over {done} fork-evals)"); + assert!( + grow < 200_000, + "RSS grew {grow} KiB over {done} fork-evals — possible orchestrator leak" + ); + } +}