Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,27 @@ jobs:
unset A3S_DEPS_STUB
chmod +x bench/bench.sh
bench/bench.sh race
# SDK programmable-CI pipeline against REAL microVMs: warm + CoW fork-per-step,
# parallel collect-all, `::metric` extraction, fork isolation, leak-freeness,
# and crash-orphan recovery (sweep_orphans). The unit suite only drives a fake
# CLI, so this is the only gate that the real fork/snapshot path actually works.
- name: SDK pipeline integration — real fork-per-step + sweep
env:
A3S_BOX: ${{ github.workspace }}/src/target/release/a3s-box
A3S_SDK_TEST_IMAGE: ${{ vars.KVM_CI_AGENT_IMAGE || 'docker.m.daocloud.io/library/alpine:latest' }}
run: |
unset A3S_DEPS_STUB
cd src
cargo test --release -p a3s-box-sdk --test integration_kvm -- --ignored --nocapture --test-threads=1
# Soak the fork-eval loop: sustained churn must stay leak-free (no orphan
# boxes/snapshots, snapshot count flat) and memory-stable. Light here (120
# fork-evals); crank A3S_SDK_SOAK_FORKS for a manual long soak.
- name: SDK pipeline soak — leak-free under churn
env:
A3S_BOX: ${{ github.workspace }}/src/target/release/a3s-box
A3S_SDK_TEST_IMAGE: ${{ vars.KVM_CI_AGENT_IMAGE || 'docker.m.daocloud.io/library/alpine:latest' }}
A3S_SDK_SOAK_FORKS: "120"
run: |
unset A3S_DEPS_STUB
cd src
cargo test --release -p a3s-box-sdk --test soak_kvm -- --ignored --nocapture --test-threads=1
52 changes: 52 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,58 @@ All notable changes to A3S Box will be documented in this file.

## [Unreleased]

### Added

- **Programmable-CI pipeline: parallel fan-out + typed JSON report (`a3s-box-sdk`).**
`Base::run_parallel(steps, max_concurrency)` runs steps concurrently as isolated
copy-on-write MicroVM forks (bounded, collect-all, results in input order) and returns a
`Report` with a dependency-free `to_json()`. `StepResult` now carries separated
`stdout`/`stderr`, `duration_ms`, and `metrics` parsed from `::metric <key>=<value>`
guest-stdout lines (a machine-readable scoring channel for matrix/selection workloads).
Steps run via `&self` (atomic fork counter), so fan-out no longer needs hand-rolled
threads. The base auto-removes its snapshot on `Drop` (`--force`), and each fork is
removed on every path (including panic). Box/snapshot names now carry per-process +
per-instance entropy, so concurrent pipelines from the same image+setup can no longer
collide and tear down each other's boxes. A fork that hits a *transient*
infrastructure failure (restore/start/boot) is retried — `WarmBase::infra_retries`,
default 2 — since its command never ran, which keeps sustained high-concurrency
churn green. Validated end-to-end on a real `/dev/kvm` host.
- **Crash-orphan recovery + real-VM integration & soak tests.** `sweep_orphans()`
reclaims `ci-base-*` boxes/snapshots left behind when a pipeline process is
`SIGKILL`ed / OOM-killed (its RAII cleanup never runs), by matching the dead owner
pid embedded in the resource name — and it never touches a live peer's resources.
Added `#[ignore]`'d real-microVM integration tests (`tests/integration_kvm.rs`:
warm + fork-per-step, cache, parallel order/metrics, fork isolation, leak-freeness,
sweep) and a soak test (`tests/soak_kvm.rs`: sustained fork-eval churn stays
leak-free and RSS-stable), both wired into the KVM CI gate.
- **`a3s-box-ci` runner + `warm_base` retry.** A dependency-free `a3s-box-ci` binary
(shipped by the `a3s-box-sdk` crate) bridges any agent/tool to the pipeline: a
line-based spec on stdin → a JSON `Report` on stdout (`a3s-box-ci run -`), plus
`a3s-box-ci sweep` for crash-orphan recovery. `warm_base` now also retries on a
transient infrastructure failure (sharing the step-fork's `retry_infra` budget),
so concurrent same-image warms are more robust under load.

### Changed

- **`StepResult.logs` is replaced by separated `stdout` / `stderr` fields**
(use `StepResult::combined()` for the old concatenated view). Breaking for
direct `.logs` field access on the `a3s-box-sdk` pipeline API.

### Fixed

- **Concurrent same-image pipelines could corrupt each other's rootfs cache.**
`RootfsCache::prune` (run after a cache-miss `put`) evicted least-recently-used
entries with no in-use guard, so it could `remove_dir_all` a cache entry that
another box was simultaneously using as its overlayfs **lowerdir** — the peer's
`mount(2)` then failed with `No such file or directory (os error 2)`, and the
failure persisted through retries (the backing was gone). Added the same in-use
guard `SnapshotStore::prune` already applies to live copy-on-write lowers: each
overlay box records the cache key it holds in a `<box_dir>/.rootfs-cache-key`
marker (removed with the box dir), and `prune` skips any still-referenced key.
Found via a concurrent-pipeline chaos test driven through a3s-code; root-caused
and verified on a real `/dev/kvm` host (the concurrency scenario went from ~50%
failure to reliably green).

## [2.6.0] — 2026-06-26

### Added
Expand Down
73 changes: 70 additions & 3 deletions src/runtime/src/cache/rootfs_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,27 @@ impl RootfsCache {
Ok(())
}

/// Prune the cache to stay within the given entry count limit.
/// Prune the cache to stay within the given entry count / byte limit.
///
/// Evicts least-recently-accessed entries first.
/// Returns the number of entries evicted.
/// Evicts least-recently-accessed entries first. Returns the number evicted.
pub fn prune(&self, max_entries: usize, max_bytes: u64) -> Result<usize> {
self.prune_protecting(max_entries, max_bytes, &std::collections::HashSet::new())
}

/// Like [`RootfsCache::prune`], but never evicts an entry whose key is in
/// `protected`. Such an entry is currently serving as a box's overlayfs
/// **lowerdir**, and `remove_dir_all`-ing it out from under a concurrent box's
/// `mount(2)` makes the mount fail with ENOENT ("No such file or directory").
/// This is the same in-use guard [`crate::SnapshotStore::prune`] applies to
/// live copy-on-write lowers — without it, two pipelines built from the same
/// image (one cache-hit overlay box, one cache-miss box that prunes after its
/// put) can race and corrupt each other.
pub fn prune_protecting(
&self,
max_entries: usize,
max_bytes: u64,
protected: &std::collections::HashSet<String>,
) -> Result<usize> {
let mut entries = self.list_entries()?;

if entries.len() <= max_entries {
Expand All @@ -215,6 +231,11 @@ impl RootfsCache {
if current_count <= max_entries && current_size <= max_bytes {
break;
}
// Never evict an entry in use as a live overlay lower — deleting the
// lowerdir under a concurrent box's mount(2) is the bug this guards.
if protected.contains(&entry.key) {
continue;
}
self.invalidate(&entry.key)?;
current_count -= 1;
current_size = current_size.saturating_sub(entry.size_bytes);
Expand Down Expand Up @@ -495,6 +516,52 @@ mod tests {
assert_eq!(cache.entry_count().unwrap(), 1);
}

#[test]
fn prune_protecting_never_evicts_in_use_key() {
let tmp = TempDir::new().unwrap();
let cache = RootfsCache::new(tmp.path()).unwrap();
for i in 0..4 {
let src = tmp.path().join(format!("s{i}"));
create_test_rootfs(&src, &[("f", "x")]);
cache.put(&format!("k{i}"), &src, &format!("e{i}")).unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
}
// k0 is the OLDEST (normally evicted first) but is in use as an overlay lower.
let mut protected = std::collections::HashSet::new();
protected.insert("k0".to_string());
// keep=2 over 4 entries evicts two; the protected k0 is never one of them.
// (last_accessed is second-resolution, so WHICH two unprotected entries go
// is not asserted — only that the in-use lower survives.)
let evicted = cache.prune_protecting(2, u64::MAX, &protected).unwrap();
assert_eq!(evicted, 2, "two unprotected entries evicted to meet keep=2");
assert!(
cache.get("k0").unwrap().is_some(),
"the in-use (protected) lower must survive prune"
);
assert_eq!(
cache.entry_count().unwrap(),
2,
"k0 + one unprotected remain"
);
}

#[test]
fn prune_protecting_keeps_all_when_all_in_use() {
let tmp = TempDir::new().unwrap();
let cache = RootfsCache::new(tmp.path()).unwrap();
for i in 0..2 {
let src = tmp.path().join(format!("s{i}"));
create_test_rootfs(&src, &[("f", "x")]);
cache.put(&format!("k{i}"), &src, "e").unwrap();
}
let protected: std::collections::HashSet<String> =
["k0", "k1"].iter().map(|s| s.to_string()).collect();
// Even asked to keep 0, nothing is evicted — every entry is a live lower.
let evicted = cache.prune_protecting(0, 0, &protected).unwrap();
assert_eq!(evicted, 0, "all in-use -> nothing evicted");
assert_eq!(cache.entry_count().unwrap(), 2);
}

#[test]
fn test_rootfs_cache_metadata_persists() {
let tmp = TempDir::new().unwrap();
Expand Down
37 changes: 35 additions & 2 deletions src/runtime/src/vm/layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ impl VmManager {
let cache_key = RootfsCache::compute_key(reference, &[], &[], &[]);
if let Some(cached_path) = self.try_rootfs_cache_path(&cache_key)? {
let rootfs_path = self.rootfs_provider.prepare(&box_dir, &cached_path)?;
// Record that this box holds `cache_key` as its overlay lower, so a
// concurrent box's cache prune won't evict it mid-mount (ENOENT).
self.mark_rootfs_cache_key(&box_dir, &cache_key);
let tee_instance_config = self.generate_tee_config(&box_dir)?;
return Ok(BoxLayout {
rootfs_path,
Expand Down Expand Up @@ -204,6 +207,9 @@ impl VmManager {
prom.rootfs_cache_hits.inc();
}
let rootfs_path = self.rootfs_provider.prepare(&box_dir, &cached_path)?;
// Record that this box holds `cache_key` as its overlay lower, so a
// concurrent box's cache prune won't evict it mid-mount (ENOENT).
self.mark_rootfs_cache_key(&box_dir, &cache_key);

if let Ok(guest_init_path) = Self::find_guest_init() {
tracing::info!(
Expand Down Expand Up @@ -377,10 +383,14 @@ impl VmManager {
description = %description,
"Stored rootfs in cache"
);
// Prune if needed
if let Err(e) = cache.prune(
// Prune if needed — but never evict a cache entry that is in use as
// a live overlay lower for a concurrent box (deleting the lowerdir
// under its mount(2) is the same-image concurrency bug this guards).
let protected = self.referenced_rootfs_cache_keys();
if let Err(e) = cache.prune_protecting(
self.config.cache.max_rootfs_entries,
self.config.cache.max_cache_bytes,
&protected,
) {
tracing::warn!(error = %e, "Failed to prune rootfs cache");
}
Expand All @@ -391,6 +401,29 @@ impl VmManager {
}
}

/// Record which rootfs-cache key this box holds as its overlay lower, in a
/// `<box_dir>/.rootfs-cache-key` marker (mirror of the snapshot store's
/// `.snapshot-lower`). Read back by [`Self::referenced_rootfs_cache_keys`] so
/// the cache prune never evicts a live lower. Best-effort; removed with box_dir.
fn mark_rootfs_cache_key(&self, box_dir: &Path, cache_key: &str) {
let _ = std::fs::write(box_dir.join(".rootfs-cache-key"), cache_key);
}

/// Rootfs-cache keys currently in use as an overlay lower by some live box.
/// Boxes live under `<home>/boxes/<id>/`; a removed box's marker is gone with
/// its dir, so an evictable key is simply one no live box references.
fn referenced_rootfs_cache_keys(&self) -> std::collections::HashSet<String> {
let mut set = std::collections::HashSet::new();
if let Ok(entries) = std::fs::read_dir(self.home_dir.join("boxes")) {
for entry in entries.flatten() {
if let Ok(k) = std::fs::read_to_string(entry.path().join(".rootfs-cache-key")) {
set.insert(k.trim().to_string());
}
}
}
set
}

/// Resolve the cache directory from config or default.
pub(crate) fn resolve_cache_dir(&self) -> PathBuf {
self.config
Expand Down
6 changes: 6 additions & 0 deletions src/sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ description = "Rust SDK for a3s-box. Includes a programmable CI/CD pipeline API

[dependencies]
# none — a thin, dependency-free wrapper over the `a3s-box` CLI.

# Thin runner that bridges any agent (a3s-code / Claude Code / Codex) to the
# pipeline API: a line-based spec on stdin -> JSON Report on stdout.
[[bin]]
name = "a3s-box-ci"
path = "src/bin/a3s-box-ci.rs"
36 changes: 28 additions & 8 deletions src/sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,48 @@ state). Set `A3S_BOX` if `a3s-box` is not on `PATH`.

## Pipelines

Warm a base box **once** (clone + install deps), snapshot it, fork per step:
Warm a base box **once** (clone + install deps), snapshot it, fork per step.
Run steps **sequentially** (fail-fast) or **in parallel** (collect-all → a typed
report):

```rust
use a3s_box_sdk::pipeline::{warm_base, WarmBase, FileCache, Step};

fn main() -> Result<(), a3s_box_sdk::pipeline::PipelineError> {
let cache = FileCache::new(".ci-cache")?; // skip a step when its inputs are unchanged
let mut base = warm_base(
let base = warm_base(
WarmBase::new("node:20", "git clone $REPO /w && cd /w && npm ci") // runs ONCE
.env("REPO", "https://github.com/me/app")
.cache(&cache),
)?;

// Sequential, fail-fast: a non-zero exit returns Err.
base.step(Step::new("lint", "cd /w && npm run lint"))?;
base.step(Step::new("test", "cd /w && npm test"))?; // nonzero exit -> Err (fail-fast)
base.step(Step::new("build", "cd /w && npm run build"))?;
base.dispose(); // drops the snapshot
Ok(())

// Parallel, collect-all: each step is an isolated CoW fork; <=4 at a time.
let report = base.run_parallel(vec![
Step::new("test", "cd /w && npm test"),
Step::new("build", "cd /w && npm run build"),
], 4);

println!("{}", report.to_json()); // {"passed":..,"total_ms":..,"steps":[..]}
if !report.passed { /* inspect report.failures() */ }
Ok(()) // `base` drops here -> snapshot auto-removed (or call base.dispose())
}
```

`Step::allow_failure()` keeps the pipeline going on a non-zero exit; `Step::input(..)`
adds extra cache-key parts. Parallel steps = spawn threads (each `step` is blocking).
- `run_parallel` is the way to use a3s-box's cheap (~ms) CoW fork at scale — a
matrix / evolution-style batch — without hand-rolling threads (every method
takes `&self`).
- A step reports a metric by printing `::metric <key>=<number>` to stdout; it
surfaces as `StepResult::metrics` (the scoring channel for a selection loop).
- `StepResult` carries separated `stdout`/`stderr`, `duration_ms`, and `cached`;
`Report::to_json()` is the machine-readable handoff to an agent/scorer.
- `Step::allow_failure()` keeps a non-zero step from failing the run; `Step::input(..)`
adds extra cache-key parts.

The base **auto-disposes** its snapshot on drop, and each per-step box is removed
on every path (including a panic), so a long-running batch doesn't leak.

## Why forking is cheap

Expand Down
37 changes: 24 additions & 13 deletions src/sdk/examples/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,36 @@ fn main() -> Result<(), a3s_box_sdk::pipeline::PipelineError> {
let cache = FileCache::new("/tmp/.a3s-ci-demo")?;

// Warm the base once (here: write a marker = "deps installed"), snapshot it.
let mut base = warm_base(WarmBase::new(image, "echo DEPS-INSTALLED > /warmed").cache(&cache))?;
let base = warm_base(WarmBase::new(image, "echo DEPS-INSTALLED > /warmed").cache(&cache))?;

// Sequential, fail-fast: a non-zero exit returns Err.
let r = base.step(Step::new("read", "cat /warmed"))?;
println!(
"read -> code={} out={:?} cached={}",
r.exit_code,
r.logs.trim(),
r.cached
);

println!("read -> code={} out={:?}", r.exit_code, r.stdout.trim());
let r2 = base.step(Step::new("read", "cat /warmed"))?; // identical -> cache hit
println!("read#2 -> cached={}", r2.cached);

match base.step(Step::new("fail", "exit 7")) {
Err(e) => println!("fail-fast ok: {e}"),
Ok(_) => println!("ERROR: fail step did not error"),
}
// Parallel matrix, collect-all: each step is an isolated CoW fork of the base.
// A step reports a metric by printing `::metric key=value`.
let report = base.run_parallel(
vec![
Step::new("ok", "echo fine"),
Step::new("perf", "echo '::metric duration_ms=12.5'"),
Step::new("fail", "exit 7"), // collected, not fatal
],
4,
);
println!("report -> {}", report.to_json());
println!(
"passed={} failures={:?}",
report.passed,
report
.failures()
.iter()
.map(|s| &s.name)
.collect::<Vec<_>>()
);

base.dispose();
// base.dispose() is optional — the snapshot is removed when `base` drops.
println!("demo ok");
Ok(())
}
Loading
Loading