From 8036c89bb5e86f04752ace403142b25c586245c6 Mon Sep 17 00:00:00 2001 From: a3s-release Date: Fri, 26 Jun 2026 14:54:18 +0800 Subject: [PATCH 1/6] =?UTF-8?q?release:=20v2.6.0=20=E2=80=94=20containerd?= =?UTF-8?q?=20RuntimeClass=20shim=20+=20VMM=20shim=20session=20isolation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add containerd-shim-a3s-box-v2: route runtimeClassName=a3s-box pods to the a3s-box MicroVM runtime via a containerd runtime handler (deploy/shim/ manifests). - Fix: setsid the libkrun VMM shim so it survives launcher teardown (keeps exec.sock). - Make a3s-libkrun-sys libkrunfw download resilient (curl retry + abort-on-stall). - Bump workspace version 2.5.2 -> 2.6.0; CHANGELOG. --- CHANGELOG.md | 30 ++ containerd-shim/Cargo.toml | 27 + containerd-shim/src/logic.rs | 274 ++++++++++ containerd-shim/src/main.rs | 25 + containerd-shim/src/service.rs | 756 ++++++++++++++++++++++++++++ deploy/shim/containerd-a3s-box.toml | 17 + deploy/shim/runtimeclass.yaml | 19 + deploy/shim/soak-complex.yaml | 83 +++ deploy/shim/test-pod.yaml | 24 + src/Cargo.toml | 2 +- src/deps/libkrun-sys/build.rs | 21 +- src/runtime/src/vmm/controller.rs | 17 + 12 files changed, 1293 insertions(+), 2 deletions(-) create mode 100644 containerd-shim/Cargo.toml create mode 100644 containerd-shim/src/logic.rs create mode 100644 containerd-shim/src/main.rs create mode 100644 containerd-shim/src/service.rs create mode 100644 deploy/shim/containerd-a3s-box.toml create mode 100644 deploy/shim/runtimeclass.yaml create mode 100644 deploy/shim/soak-complex.yaml create mode 100644 deploy/shim/test-pod.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index eda0677c..e9aa4986 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,36 @@ All notable changes to A3S Box will be documented in this file. 
## [Unreleased] +## [2.6.0] — 2026-06-26 + +### Added + +- **`containerd-shim-a3s-box-v2` — Kubernetes RuntimeClass integration.** A new + containerd runtime-v2 shim (standalone `containerd-shim/` crate) that lets a vanilla + Kubernetes cluster route `runtimeClassName: a3s-box` pods to the a3s-box MicroVM + runtime via a containerd runtime handler, without replacing the node CRI. It maps the + containerd Task API onto the `a3s-box` CLI (pod sandbox → placeholder; workload → + detached MicroVM; `kubectl exec` → `a3s-box exec`). Deploy manifests under + `deploy/shim/` (RuntimeClass, additive containerd config, example pod). Validated on a + real `/dev/kvm` Kubernetes node: a `runtimeClassName: a3s-box` pod reaches Running on + a real libkrun MicroVM. Still experimental — `kubectl exec`/log streaming depend on the + guest exec control channel and are not yet fully validated; single-container, + TSI-networked pods are the supported shape. + +### Fixed + +- **VMM shim now survives teardown of its launcher's session.** `VmController` puts the + libkrun shim in its own session (`setsid` via `pre_exec`) so a process-group/cgroup + kill of a foreground launcher (e.g. a containerd-shim `a3s-box run`) no longer reaps + the shim and removes the box's `exec.sock`, which previously caused `a3s-box exec` to + fail with "exec socket missing". + +### Changed + +- **`a3s-libkrun-sys` build downloads are resilient.** The libkrunfw fetch now retries and + aborts stalled transfers (`curl --retry --speed-limit/--speed-time`) instead of a bare, + unbounded `curl` that could hang forever on a flaky network. + ## [2.5.2] — 2026-06-22 ### Changed diff --git a/containerd-shim/Cargo.toml b/containerd-shim/Cargo.toml new file mode 100644 index 00000000..7c849ba8 --- /dev/null +++ b/containerd-shim/Cargo.toml @@ -0,0 +1,27 @@ +# Standalone crate: NOT part of the box workspace (own [workspace] table below). 
+# It links no a3s-box crate — it drives the installed `a3s-box` CLI — so it builds +# fast and in isolation, and never perturbs the runtime's Cargo.lock. +[package] +name = "a3s-box-containerd-shim" +version = "0.1.0" +edition = "2021" +license = "MIT" +description = "containerd runtime shim v2 that routes RuntimeClass=a3s-box pods to the a3s-box MicroVM runtime" + +[[bin]] +name = "containerd-shim-a3s-box-v2" +path = "src/main.rs" + +[dependencies] +# ttrpc + protobuf come transitively via containerd-shim-protos; reference them +# through its re-exports (containerd_shim_protos::{ttrpc, protobuf}) so versions +# always match the generated Task service. +containerd-shim = { version = "0.11", features = ["async"] } +containerd-shim-protos = { version = "0.11", features = ["async"] } +async-trait = "0.1" +tokio = { version = "1", features = ["rt-multi-thread", "macros", "process", "sync", "io-util", "fs", "time"] } +log = "0.4" +serde = { version = "1", features = ["derive"] } +serde_json = "1" + +[workspace] diff --git a/containerd-shim/src/logic.rs b/containerd-shim/src/logic.rs new file mode 100644 index 00000000..404c0d43 --- /dev/null +++ b/containerd-shim/src/logic.rs @@ -0,0 +1,274 @@ +//! Pure, side-effect-free logic for the a3s-box containerd shim. +//! +//! Everything here is free of process spawning, ttrpc, and filesystem state so it +//! can be unit-tested directly. `service.rs` is the thin async layer that wires +//! these into the containerd Task API and spawns the `a3s-box` CLI. + +use serde_json::Value; + +/// Annotations the containerd CRI plugin sets on the OCI spec. +pub const ANN_CONTAINER_TYPE: &str = "io.kubernetes.cri.container-type"; +pub const ANN_IMAGE_NAME: &str = "io.kubernetes.cri.image-name"; + +/// Fields extracted from an OCI runtime spec (config.json) the shim needs. 
+#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct ParsedSpec { + pub is_sandbox: bool, + pub image: Option, + pub args: Vec, + pub env: Vec, + pub cpus: Option, + pub memory_mb: Option, +} + +/// Read a string annotation from an OCI spec JSON value. +pub fn annotation<'a>(spec: &'a Value, key: &str) -> Option<&'a str> { + spec.get("annotations")?.get(key)?.as_str() +} + +/// Extract the shim-relevant fields from a parsed config.json value. +pub fn parse_spec(spec: &Value) -> ParsedSpec { + let is_sandbox = annotation(spec, ANN_CONTAINER_TYPE) + .map(|t| t == "sandbox") + .unwrap_or(false); + let image = annotation(spec, ANN_IMAGE_NAME).map(|s| s.to_string()); + let args = string_array(spec.pointer("/process/args")); + let env = string_array(spec.pointer("/process/env")); + let memory_mb = spec + .pointer("/linux/resources/memory/limit") + .and_then(|v| v.as_u64()) + .map(|b| (b / (1024 * 1024)).max(1)); + let cpus = match ( + spec.pointer("/linux/resources/cpu/quota").and_then(|v| v.as_i64()), + spec.pointer("/linux/resources/cpu/period").and_then(|v| v.as_u64()), + ) { + (Some(q), Some(p)) if q > 0 && p > 0 => Some(((q as f64 / p as f64).ceil()) as u32), + _ => None, + }; + ParsedSpec { is_sandbox, image, args, env, cpus, memory_mb } +} + +fn string_array(v: Option<&Value>) -> Vec { + v.and_then(|a| a.as_array()) + .map(|a| a.iter().filter_map(|x| x.as_str().map(String::from)).collect()) + .unwrap_or_default() +} + +/// Build the `a3s-box` argv (after the binary) for a detached workload run. 
+pub fn run_args( + id: &str, + image: &str, + cpus: Option, + memory_mb: Option, + env: &[String], + cmd: &[String], +) -> Vec { + let mut v = vec![ + "run".to_string(), + "-d".to_string(), + "--name".to_string(), + id.to_string(), + ]; + if let Some(c) = cpus { + v.push("--cpus".to_string()); + v.push(c.to_string()); + } + if let Some(m) = memory_mb { + v.push("--memory".to_string()); + v.push(format!("{m}m")); + } + for e in env { + v.push("-e".to_string()); + v.push(e.clone()); + } + v.push(image.to_string()); + if !cmd.is_empty() { + v.push("--".to_string()); + v.extend(cmd.iter().cloned()); + } + v +} + +/// Build the `a3s-box exec` argv (after the binary) for an exec process. +pub fn exec_args(container_id: &str, cmd: &[String]) -> Vec { + let mut v = vec![ + "exec".to_string(), + container_id.to_string(), + "--".to_string(), + ]; + v.extend(cmd.iter().cloned()); + v +} + +/// Parse the OCI Process command from the JSON-encoded protobuf Any value that +/// containerd sends with an Exec request. +pub fn parse_exec_command(spec_value: &[u8]) -> Vec { + serde_json::from_slice::(spec_value) + .ok() + .map(|v| string_array(v.get("args"))) + .unwrap_or_default() +} + +/// Whether `a3s-box inspect` stdout reports the box as running. +pub fn is_running(inspect_stdout: &str) -> bool { + inspect_stdout.contains("\"running\"") +} + +/// The stable A3S_HOME used so `run`/`exec`/`wait`/`stop`/`rm` share one +/// boxes.json regardless of the (often empty) env containerd gives the shim. +pub fn a3s_home() -> String { + std::env::var("A3S_HOME").unwrap_or_else(|_| "/var/lib/a3s-box".to_string()) +} + +/// Numeric box status -> name (mirrors the protobuf Status enum we report). +pub fn status_name(s: i32) -> &'static str { + match s { + 1 => "created", + 2 => "running", + 3 => "stopped", + _ => "unknown", + } +} + +/// Parse an exit code printed by `a3s-box wait` (its stdout is the integer code). 
+pub fn parse_wait_exit_code(stdout: &str) -> u32 { + stdout.trim().parse::().unwrap_or(0) as u32 +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn annotation_present_and_absent() { + let s = json!({"annotations": {"k": "v"}}); + assert_eq!(annotation(&s, "k"), Some("v")); + assert_eq!(annotation(&s, "missing"), None); + assert_eq!(annotation(&json!({}), "k"), None); + assert_eq!(annotation(&json!(null), "k"), None); + } + + #[test] + fn parse_spec_sandbox() { + let s = json!({"annotations": {ANN_CONTAINER_TYPE: "sandbox"}}); + let p = parse_spec(&s); + assert!(p.is_sandbox); + assert_eq!(p.image, None); + assert!(p.args.is_empty()); + } + + #[test] + fn parse_spec_workload_full() { + let s = json!({ + "annotations": { + ANN_CONTAINER_TYPE: "container", + ANN_IMAGE_NAME: "docker.io/library/redis:7" + }, + "process": { + "args": ["redis-server", "--port", "0"], + "env": ["A=1", "B=2"] + }, + "linux": {"resources": { + "memory": {"limit": 536870912u64}, // 512 MiB + "cpu": {"quota": 150000, "period": 100000} // 1.5 -> ceil 2 + }} + }); + let p = parse_spec(&s); + assert!(!p.is_sandbox); + assert_eq!(p.image.as_deref(), Some("docker.io/library/redis:7")); + assert_eq!(p.args, vec!["redis-server", "--port", "0"]); + assert_eq!(p.env, vec!["A=1", "B=2"]); + assert_eq!(p.memory_mb, Some(512)); + assert_eq!(p.cpus, Some(2)); + } + + #[test] + fn parse_spec_defaults_when_missing() { + let p = parse_spec(&json!({})); + assert_eq!(p, ParsedSpec::default()); + // zero/invalid cpu quota -> None + let s = json!({"linux": {"resources": {"cpu": {"quota": -1, "period": 100000}}}}); + assert_eq!(parse_spec(&s).cpus, None); + // sub-MiB memory clamps to at least 1 + let s = json!({"linux": {"resources": {"memory": {"limit": 1024}}}}); + assert_eq!(parse_spec(&s).memory_mb, Some(1)); + } + + #[test] + fn run_args_minimal() { + assert_eq!( + run_args("box1", "alpine", None, None, &[], &[]), + vec!["run", "-d", "--name", "box1", "alpine"] + ); + } 
+ + #[test] + fn run_args_full() { + let v = run_args( + "box1", + "redis:7", + Some(2), + Some(256), + &["A=1".into()], + &["redis-server".into(), "--port".into(), "0".into()], + ); + assert_eq!( + v, + vec![ + "run", "-d", "--name", "box1", "--cpus", "2", "--memory", "256m", "-e", "A=1", + "redis:7", "--", "redis-server", "--port", "0" + ] + ); + } + + #[test] + fn exec_args_builds_separator() { + assert_eq!( + exec_args("box1", &["sh".into(), "-c".into(), "echo hi".into()]), + vec!["exec", "box1", "--", "sh", "-c", "echo hi"] + ); + assert_eq!(exec_args("box1", &[]), vec!["exec", "box1", "--"]); + } + + #[test] + fn parse_exec_command_from_any() { + let any = br#"{"args":["ls","-la"],"cwd":"/"}"#; + assert_eq!(parse_exec_command(any), vec!["ls", "-la"]); + assert!(parse_exec_command(b"not json").is_empty()); + assert!(parse_exec_command(br#"{"no_args":true}"#).is_empty()); + } + + #[test] + fn is_running_detects_status() { + assert!(is_running(r#"{"status": "running"}"#)); + assert!(is_running(r#"{"status":"running","pid":1}"#)); + assert!(!is_running(r#"{"status": "created"}"#)); + assert!(!is_running("")); + } + + #[test] + fn status_name_maps() { + assert_eq!(status_name(1), "created"); + assert_eq!(status_name(2), "running"); + assert_eq!(status_name(3), "stopped"); + assert_eq!(status_name(0), "unknown"); + assert_eq!(status_name(99), "unknown"); + } + + #[test] + fn parse_wait_exit_code_handles_garbage() { + assert_eq!(parse_wait_exit_code("0\n"), 0); + assert_eq!(parse_wait_exit_code(" 137 "), 137); + assert_eq!(parse_wait_exit_code("notanumber"), 0); + assert_eq!(parse_wait_exit_code(""), 0); + } + + #[test] + fn a3s_home_default() { + // Default applies when unset (don't mutate global env in parallel tests). 
+ if std::env::var("A3S_HOME").is_err() { + assert_eq!(a3s_home(), "/var/lib/a3s-box"); + } + } +} diff --git a/containerd-shim/src/main.rs b/containerd-shim/src/main.rs new file mode 100644 index 00000000..e44ed40b --- /dev/null +++ b/containerd-shim/src/main.rs @@ -0,0 +1,25 @@ +//! containerd-shim-a3s-box-v2 — a containerd runtime shim v2 that routes +//! `RuntimeClass=a3s-box` pods to the a3s-box MicroVM runtime. +//! +//! Model: containerd maps the runtime handler `a3s-box` (runtime_type +//! `io.containerd.a3s-box.v2`) to this binary. It is a THIN shim: it drives the +//! installed `a3s-box` CLI rather than linking the runtime, so it builds fast +//! and never perturbs the runtime's build. Each workload container becomes one +//! `a3s-box run` MicroVM; the CRI pod-sandbox (pause) container is a lightweight +//! placeholder (a3s-box has no pause container — the VM itself is the sandbox). +//! +//! Known MVP limitation: a3s-box manages its own image rootfs, so the shim +//! resolves the image from the CRI annotation `io.kubernetes.cri.image-name` +//! and lets a3s-box pull it, rather than consuming containerd's prepared +//! rootfs. This works for the RuntimeClass pod path; raw `ctr run` (no image +//! annotation) is not supported. + +mod logic; +mod service; + +use service::Service; + +#[tokio::main] +async fn main() { + containerd_shim::asynchronous::run::("io.containerd.a3s-box.v2", None).await; +} diff --git a/containerd-shim/src/service.rs b/containerd-shim/src/service.rs new file mode 100644 index 00000000..45f2fd5d --- /dev/null +++ b/containerd-shim/src/service.rs @@ -0,0 +1,756 @@ +//! Thin containerd shim v2 service for a3s-box. +//! +//! Maps the containerd runtime-v2 Task API onto the `a3s-box` CLI: +//! * CRI pod-sandbox (pause) container -> a host `sleep` placeholder (a3s-box +//! has no pause container; the MicroVM itself is the sandbox). +//! * workload container -> `a3s-box run -d --name -- ` +//! 
(detached so the box registers in a3s-box state and is exec-able). +//! * kubectl exec -> `a3s-box exec -- ` (Task.exec/start +//! create+run an exec process with stdio wired to the containerd FIFOs). +//! +//! State for all tasks served by this shim process lives in a shared map. +//! `Shim::wait` blocks until `shutdown` fires the ExitSignal. + +use std::collections::HashMap; +use std::os::unix::io::{FromRawFd, IntoRawFd}; +use std::process::Stdio; +use std::sync::Arc; + +use async_trait::async_trait; +use containerd_shim::asynchronous::{spawn, ExitSignal, Shim}; +use containerd_shim::publisher::RemotePublisher; +use containerd_shim::{Config, Error, Flags, StartOpts, TtrpcContext, TtrpcResult}; +use containerd_shim_protos::shim_async::Task; +use containerd_shim_protos::{api, protobuf, ttrpc}; +use tokio::process::{Child, Command}; +use tokio::sync::Mutex; + +use crate::logic; + +/// CLI binary; override with A3S_BOX_BIN for non-PATH installs. +fn a3s_box_bin() -> String { + std::env::var("A3S_BOX_BIN").unwrap_or_else(|_| "a3s-box".to_string()) +} + +/// Build an `a3s-box` command with a STABLE A3S_HOME so run/exec/wait/stop/rm all +/// share one boxes.json. containerd hands the shim a near-empty env (often no +/// HOME), so without this each invocation resolves a different state file and +/// `exec` reports "No such box". Override with A3S_HOME if the host sets one. +fn a3s_box_cmd() -> Command { + let mut c = Command::new(a3s_box_bin()); + c.env("A3S_HOME", logic::a3s_home()); + c +} + +#[derive(Default)] +struct Proc { + bundle: String, + is_sandbox: bool, + image: Option, + args: Vec, + env: Vec, + cpus: Option, + memory_mb: Option, + stdout: String, + stderr: String, + pid: u32, + status: i32, // api::Status enum value: 0 unknown,1 created,2 running,3 stopped + exit_code: u32, + child: Option, +} + +/// An exec process (kubectl exec) running inside a box via `a3s-box exec`. 
+#[derive(Default)] +struct ExecProc { + container_id: String, + args: Vec, + stdin: String, + stdout: String, + stderr: String, + pid: u32, + exit_code: u32, + child: Option, +} + +#[derive(Default)] +struct State { + procs: HashMap, + execs: HashMap, // keyed by exec_id (globally unique) +} + +#[derive(Clone)] +pub struct Service { + id: String, + namespace: String, + // ExitSignal isn't Clone; share one across the Shim + its task service. + exit: Arc, + state: Arc>, +} + +// ---- helpers --------------------------------------------------------------- + +/// Parse the OCI bundle's config.json for the fields we need. +fn read_spec(bundle: &str) -> serde_json::Value { + let path = format!("{bundle}/config.json"); + std::fs::read_to_string(&path) + .ok() + .and_then(|s| serde_json::from_str(&s).ok()) + .unwrap_or(serde_json::Value::Null) +} + +/// Open a containerd stdio FIFO path for writing and hand it to a child as Stdio. +/// Empty path => inherit null (detached). +fn open_fifo_write(path: &str) -> Stdio { + if path.is_empty() { + return Stdio::null(); + } + match std::fs::OpenOptions::new().write(true).open(path) { + // SAFETY: we own the fd and pass it straight to the child. + Ok(f) => unsafe { Stdio::from_raw_fd(f.into_raw_fd()) }, + Err(_) => Stdio::null(), + } +} + +/// Open a containerd stdin FIFO path for reading. Empty path => null. +fn open_fifo_read(path: &str) -> Stdio { + if path.is_empty() { + return Stdio::null(); + } + match std::fs::OpenOptions::new().read(true).open(path) { + // SAFETY: we own the fd and pass it straight to the child. 
+ Ok(f) => unsafe { Stdio::from_raw_fd(f.into_raw_fd()) }, + Err(_) => Stdio::null(), + } +} + +impl Service { + async fn populate_from_bundle(&self, id: &str, bundle: &str, stdout: &str, stderr: &str) { + let parsed = logic::parse_spec(&read_spec(bundle)); + let mut st = self.state.lock().await; + st.procs.insert( + id.to_string(), + Proc { + bundle: bundle.to_string(), + is_sandbox: parsed.is_sandbox, + image: parsed.image, + args: parsed.args, + env: parsed.env, + cpus: parsed.cpus, + memory_mb: parsed.memory_mb, + stdout: stdout.to_string(), + stderr: stderr.to_string(), + status: 1, // CREATED + ..Default::default() + }, + ); + } +} + +// ---- Shim (process lifecycle) --------------------------------------------- + +#[async_trait] +impl Shim for Service { + type T = Service; + + async fn new(_runtime_id: &str, args: &Flags, _config: &mut Config) -> Self { + Service { + id: args.id.clone(), + namespace: args.namespace.clone(), + exit: Arc::new(ExitSignal::default()), + state: Arc::new(Mutex::new(State::default())), + } + } + + async fn start_shim(&mut self, opts: StartOpts) -> Result { + let grouping = opts.id.clone(); + let address = spawn(opts, &grouping, Vec::new()).await?; + Ok(address) + } + + async fn delete_shim(&mut self) -> Result { + // Out-of-band cleanup: force-remove the box if it lingers. 
+ let _ = a3s_box_cmd() + .args(["rm", "-f", &self.id]) + .output() + .await; + let mut r = api::DeleteResponse::new(); + r.set_exit_status(0); + Ok(r) + } + + async fn wait(&mut self) { + self.exit.wait().await; + } + + async fn create_task_service(&self, _publisher: RemotePublisher) -> Self::T { + self.clone() + } +} + +// ---- Task (per-container RPCs) -------------------------------------------- + +#[async_trait] +impl Task for Service { + async fn create( + &self, + _ctx: &TtrpcContext, + req: api::CreateTaskRequest, + ) -> TtrpcResult { + self.populate_from_bundle(req.id(), req.bundle(), req.stdout(), req.stderr()) + .await; + let mut resp = api::CreateTaskResponse::new(); + resp.set_pid(std::process::id()); // provisional; real pid set on start + Ok(resp) + } + + async fn start( + &self, + _ctx: &TtrpcContext, + req: api::StartRequest, + ) -> TtrpcResult { + let id = req.id().to_string(); + let exec_id = req.exec_id().to_string(); + + // kubectl exec: start a previously-created exec process inside the box. + if !exec_id.is_empty() { + let mut st = self.state.lock().await; + let e = st + .execs + .get_mut(&exec_id) + .ok_or_else(|| ttrpc_err(format!("unknown exec {exec_id}")))?; + let mut cmd = a3s_box_cmd(); + cmd.args(logic::exec_args(&e.container_id, &e.args)); + cmd.stdin(open_fifo_read(&e.stdin)) + .stdout(open_fifo_write(&e.stdout)) + .stderr(open_fifo_write(&e.stderr)); + let child = cmd + .spawn() + .map_err(|err| ttrpc_err(format!("a3s-box exec spawn failed: {err}")))?; + e.pid = child.id().unwrap_or(0); + e.child = Some(child); + let mut resp = api::StartResponse::new(); + resp.set_pid(e.pid); + return Ok(resp); + } + + let mut st = self.state.lock().await; + let p = st + .procs + .get_mut(&id) + .ok_or_else(|| ttrpc_err(format!("unknown container {id}")))?; + + if p.is_sandbox { + // Pod sandbox: no MicroVM; hold a placeholder process for lifetime. 
+ let child = Command::new("sleep") + .arg("infinity") + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .map_err(|e| ttrpc_err(format!("spawn sandbox placeholder failed: {e}")))?; + p.pid = child.id().unwrap_or(0); + p.child = Some(child); + } else { + // Workload: launch DETACHED via `setsid a3s-box run -d`, fire-and-forget. + // This replicates the interactive-shell `run -d` that BINDS exec.sock and + // orphans the libkrun shim to init. (Foreground run — and a plain + // shim-spawned `run -d` held as a child — do NOT bind exec.sock, so + // kubectl exec can't reach the guest.) We then poll the box record until + // it reports running, by which point boot's exec-readiness has completed. + let image = p + .image + .clone() + .ok_or_else(|| ttrpc_err("no image annotation; raw ctr is unsupported".into()))?; + let cpus = p.cpus; + let memory = p.memory_mb; + let env = p.env.clone(); + let args = p.args.clone(); + drop(st); // release the lock across spawn + readiness poll + + let mut cmd = Command::new("setsid"); + cmd.env("A3S_HOME", logic::a3s_home()); + cmd.arg(a3s_box_bin()); + cmd.args(logic::run_args(&id, &image, cpus, memory, &env, &args)); + cmd.stdin(Stdio::null()).stdout(Stdio::null()).stderr(Stdio::null()); + cmd.spawn() + .map_err(|e| ttrpc_err(format!("setsid a3s-box run -d spawn failed: {e}")))?; + + let mut running = false; + for _ in 0..120 { + if let Ok(o) = a3s_box_cmd().args(["inspect", &id]).output().await { + if o.status.success() && logic::is_running(&String::from_utf8_lossy(&o.stdout)) { + running = true; + break; + } + } + tokio::time::sleep(std::time::Duration::from_millis(700)).await; + } + if !running { + return Err(ttrpc_err(format!("box {id} did not become running"))); + } + let mut st = self.state.lock().await; + if let Some(p) = st.procs.get_mut(&id) { + p.status = 2; // RUNNING + p.pid = std::process::id(); + } + let mut resp = api::StartResponse::new(); + resp.set_pid(std::process::id()); + return 
Ok(resp); + } + p.status = 2; // RUNNING (sandbox) + + let mut resp = api::StartResponse::new(); + resp.set_pid(p.pid); + Ok(resp) + } + + async fn exec( + &self, + _ctx: &TtrpcContext, + req: api::ExecProcessRequest, + ) -> TtrpcResult { + let container_id = req.id().to_string(); + let exec_id = req.exec_id().to_string(); + // containerd marshals the OCI Process spec as JSON inside the protobuf Any. + let args = logic::parse_exec_command(&req.spec().value); + let mut st = self.state.lock().await; + st.execs.insert( + exec_id, + ExecProc { + container_id, + args, + stdin: req.stdin().to_string(), + stdout: req.stdout().to_string(), + stderr: req.stderr().to_string(), + ..Default::default() + }, + ); + Ok(api::Empty::new()) + } + + async fn wait( + &self, + _ctx: &TtrpcContext, + req: api::WaitRequest, + ) -> TtrpcResult { + let id = req.id().to_string(); + let exec_id = req.exec_id().to_string(); + + // kubectl exec: wait for the exec process to exit. + if !exec_id.is_empty() { + let child = { + let mut st = self.state.lock().await; + st.execs.get_mut(&exec_id).and_then(|e| e.child.take()) + }; + let code = if let Some(mut c) = child { + c.wait().await.ok().and_then(|s| s.code()).unwrap_or(0) as u32 + } else { + 0 + }; + let mut st = self.state.lock().await; + if let Some(e) = st.execs.get_mut(&exec_id) { + e.exit_code = code; + } + let mut resp = api::WaitResponse::new(); + resp.set_exit_status(code); + return Ok(resp); + } + + // Take the child out so we can await it without holding the lock. + let child = { + let mut st = self.state.lock().await; + st.procs.get_mut(&id).and_then(|p| p.child.take()) + }; + let code = if let Some(mut c) = child { + c.wait().await.ok().and_then(|s| s.code()).unwrap_or(0) as u32 + } else { + // Workload box (detached): block on `a3s-box wait ` until it exits. 
+ a3s_box_cmd() + .args(["wait", &id]) + .output() + .await + .ok() + .map(|o| logic::parse_wait_exit_code(&String::from_utf8_lossy(&o.stdout))) + .unwrap_or(0) + }; + let mut st = self.state.lock().await; + if let Some(p) = st.procs.get_mut(&id) { + p.status = 3; // STOPPED + p.exit_code = code; + } + let mut resp = api::WaitResponse::new(); + resp.set_exit_status(code); + Ok(resp) + } + + async fn state( + &self, + _ctx: &TtrpcContext, + req: api::StateRequest, + ) -> TtrpcResult { + let exec_id = req.exec_id().to_string(); + let st = self.state.lock().await; + let mut resp = api::StateResponse::new(); + resp.set_id(req.id().to_string()); + if !exec_id.is_empty() { + resp.set_exec_id(exec_id.clone()); + if let Some(e) = st.execs.get(&exec_id) { + resp.set_pid(e.pid); + resp.set_exit_status(e.exit_code); + resp.set_stdin(e.stdin.clone()); + resp.set_stdout(e.stdout.clone()); + resp.set_stderr(e.stderr.clone()); + let s = if e.child.is_some() { + 2 // running + } else if e.pid > 0 { + 3 // stopped + } else { + 1 // created + }; + resp.status = protobuf_enum_status(s); + } + return Ok(resp); + } + if let Some(p) = st.procs.get(req.id()) { + resp.set_pid(p.pid); + resp.set_exit_status(p.exit_code); + resp.set_bundle(p.bundle.clone()); + resp.set_stdout(p.stdout.clone()); + resp.set_stderr(p.stderr.clone()); + resp.status = protobuf_enum_status(p.status); + } + Ok(resp) + } + + async fn kill(&self, _ctx: &TtrpcContext, req: api::KillRequest) -> TtrpcResult { + let id = req.id().to_string(); + let exec_id = req.exec_id().to_string(); + + // kubectl exec cancel/timeout: signal just the exec process. 
+ if !exec_id.is_empty() { + let st = self.state.lock().await; + if let Some(e) = st.execs.get(&exec_id) { + if e.pid > 0 { + let _ = Command::new("kill").arg(e.pid.to_string()).output().await; + } + } + return Ok(api::Empty::new()); + } + + let st = self.state.lock().await; + if let Some(p) = st.procs.get(&id) { + if p.is_sandbox { + let _ = Command::new("kill").arg(p.pid.to_string()).output().await; + } else { + let _ = a3s_box_cmd() + .args(["stop", &id]) + .output() + .await; + } + } + Ok(api::Empty::new()) + } + + async fn delete( + &self, + _ctx: &TtrpcContext, + req: api::DeleteRequest, + ) -> TtrpcResult { + let id = req.id().to_string(); + let exec_id = req.exec_id().to_string(); + + // kubectl exec teardown: just drop the exec record. + if !exec_id.is_empty() { + let mut st = self.state.lock().await; + let (pid, code) = st + .execs + .remove(&exec_id) + .map(|e| (e.pid, e.exit_code)) + .unwrap_or((0, 0)); + let mut resp = api::DeleteResponse::new(); + resp.set_pid(pid); + resp.set_exit_status(code); + return Ok(resp); + } + + let mut st = self.state.lock().await; + let (pid, code) = st + .procs + .remove(&id) + .map(|p| (p.pid, p.exit_code)) + .unwrap_or((0, 0)); + drop(st); + let _ = a3s_box_cmd() + .args(["rm", "-f", &id]) + .output() + .await; + let mut resp = api::DeleteResponse::new(); + resp.set_pid(pid); + resp.set_exit_status(code); + Ok(resp) + } + + async fn connect( + &self, + _ctx: &TtrpcContext, + _req: api::ConnectRequest, + ) -> TtrpcResult { + let mut resp = api::ConnectResponse::new(); + resp.set_shim_pid(std::process::id()); + Ok(resp) + } + + async fn shutdown( + &self, + _ctx: &TtrpcContext, + _req: api::ShutdownRequest, + ) -> TtrpcResult { + self.exit.signal(); + Ok(api::Empty::new()) + } +} + +fn ttrpc_err(msg: String) -> ttrpc::Error { + ttrpc::Error::Others(msg) +} + +/// Map our small status int to the protobuf Status enum field type. 
+fn protobuf_enum_status(s: i32) -> protobuf::EnumOrUnknown { + let st = match s { + 1 => api::Status::CREATED, + 2 => api::Status::RUNNING, + 3 => api::Status::STOPPED, + _ => api::Status::UNKNOWN, + }; + protobuf::EnumOrUnknown::new(st) +} + +#[cfg(test)] +mod tests { + use super::*; + use containerd_shim::TtrpcContext; + + fn svc() -> Service { + Service { + id: "shim-test".into(), + namespace: "k8s.io".into(), + exit: Arc::new(ExitSignal::default()), + state: Arc::new(Mutex::new(State::default())), + } + } + + fn ctx() -> TtrpcContext { + TtrpcContext { + mh: Default::default(), + metadata: Default::default(), + timeout_nano: 0, + } + } + + // Write a minimal OCI bundle (config.json) and return its dir. + fn bundle(json: &str) -> String { + let dir = std::env::temp_dir().join(format!("a3sshim-{}", std::process::id())); + let sub = dir.join(format!("{:p}", json)); + std::fs::create_dir_all(&sub).unwrap(); + std::fs::write(sub.join("config.json"), json).unwrap(); + sub.to_string_lossy().to_string() + } + + #[tokio::test] + async fn create_workload_then_state() { + let s = svc(); + let b = bundle( + r#"{"annotations":{"io.kubernetes.cri.image-name":"alpine", + "io.kubernetes.cri.container-type":"container"}, + "process":{"args":["sleep","1"]}}"#, + ); + let mut req = api::CreateTaskRequest::new(); + req.set_id("c1".into()); + req.set_bundle(b); + let resp = s.create(&ctx(), req).await.unwrap(); + assert!(resp.pid() > 0); + // state reports the created container + let mut sreq = api::StateRequest::new(); + sreq.set_id("c1".into()); + let st = s.state(&ctx(), sreq).await.unwrap(); + assert_eq!(st.id(), "c1"); + } + + #[tokio::test] + async fn create_sandbox_is_marked() { + let s = svc(); + let b = bundle(r#"{"annotations":{"io.kubernetes.cri.container-type":"sandbox"}}"#); + let mut req = api::CreateTaskRequest::new(); + req.set_id("sb".into()); + req.set_bundle(b); + s.create(&ctx(), req).await.unwrap(); + let g = s.state.lock().await; + 
assert!(g.procs.get("sb").unwrap().is_sandbox); + } + + #[tokio::test] + async fn exec_registers_then_delete_removes() { + let s = svc(); + let mut req = api::ExecProcessRequest::new(); + req.set_id("c1".into()); + req.set_exec_id("e1".into()); + let mut any = containerd_shim_protos::protobuf::well_known_types::any::Any::new(); + any.value = br#"{"args":["echo","hi"]}"#.to_vec(); + req.spec = containerd_shim_protos::protobuf::MessageField::some(any); + s.exec(&ctx(), req).await.unwrap(); + { + let g = s.state.lock().await; + assert_eq!(g.execs.get("e1").unwrap().args, vec!["echo", "hi"]); + } + // delete exec branch removes the record + let mut dreq = api::DeleteRequest::new(); + dreq.set_id("c1".into()); + dreq.set_exec_id("e1".into()); + s.delete(&ctx(), dreq).await.unwrap(); + assert!(s.state.lock().await.execs.get("e1").is_none()); + } + + #[tokio::test] + async fn state_exec_branch_reports_created() { + let s = svc(); + s.state.lock().await.execs.insert( + "e9".into(), + ExecProc { container_id: "c1".into(), ..Default::default() }, + ); + let mut req = api::StateRequest::new(); + req.set_id("c1".into()); + req.set_exec_id("e9".into()); + let st = s.state(&ctx(), req).await.unwrap(); + assert_eq!(st.exec_id(), "e9"); + } + + #[tokio::test] + async fn connect_and_shutdown() { + let s = svc(); + let c = s.connect(&ctx(), api::ConnectRequest::new()).await.unwrap(); + assert!(c.shim_pid() > 0); + s.shutdown(&ctx(), api::ShutdownRequest::new()).await.unwrap(); + // shutdown fired the exit signal; Shim::wait would now return. 
+ } + + #[tokio::test] + async fn kill_exec_branch_no_pid_is_noop() { + let s = svc(); + s.state.lock().await.execs.insert( + "e0".into(), + ExecProc { container_id: "c1".into(), pid: 0, ..Default::default() }, + ); + let mut req = api::KillRequest::new(); + req.set_id("c1".into()); + req.set_exec_id("e0".into()); + s.kill(&ctx(), req).await.unwrap(); // pid 0 => no signal sent + } + + #[tokio::test] + async fn start_unknown_container_errors() { + let s = svc(); + let mut req = api::StartRequest::new(); + req.set_id("missing".into()); + assert!(s.start(&ctx(), req).await.is_err()); + } + + #[tokio::test] + async fn start_unknown_exec_errors() { + let s = svc(); + let mut req = api::StartRequest::new(); + req.set_id("c1".into()); + req.set_exec_id("nope".into()); + assert!(s.start(&ctx(), req).await.is_err()); + } + + #[tokio::test] + async fn wait_exec_no_child_returns_zero() { + let s = svc(); + s.state.lock().await.execs.insert( + "e2".into(), + ExecProc { container_id: "c1".into(), ..Default::default() }, + ); + let mut req = api::WaitRequest::new(); + req.set_id("c1".into()); + req.set_exec_id("e2".into()); + let r = s.wait(&ctx(), req).await.unwrap(); + assert_eq!(r.exit_status(), 0); + } + + #[tokio::test] + async fn start_sandbox_then_kill_reaps() { + let s = svc(); + s.state.lock().await.procs.insert( + "sb".into(), + Proc { is_sandbox: true, status: 1, ..Default::default() }, + ); + let mut sreq = api::StartRequest::new(); + sreq.set_id("sb".into()); + let r = s.start(&ctx(), sreq).await.unwrap(); + assert!(r.pid() > 0); + // kill sandbox branch signals the placeholder pid. + let mut kreq = api::KillRequest::new(); + kreq.set_id("sb".into()); + s.kill(&ctx(), kreq).await.unwrap(); + } + + #[tokio::test] + async fn start_exec_spawns_then_wait_reaps() { + let s = svc(); + // `a3s-box exec` on a missing box errors fast, but the spawn + fifo paths + // are exercised. stdout points at a real temp file (open-for-write branch). 
+ let tmp = std::env::temp_dir().join(format!("a3sout-{}.log", std::process::id())); + let _ = std::fs::File::create(&tmp); + s.state.lock().await.execs.insert( + "ex".into(), + ExecProc { + container_id: "nobox".into(), + args: vec!["true".into()], + stdout: tmp.to_string_lossy().to_string(), + ..Default::default() + }, + ); + let mut sreq = api::StartRequest::new(); + sreq.set_id("nobox".into()); + sreq.set_exec_id("ex".into()); + assert!(s.start(&ctx(), sreq).await.unwrap().pid() > 0); + let mut wreq = api::WaitRequest::new(); + wreq.set_id("nobox".into()); + wreq.set_exec_id("ex".into()); + let _ = s.wait(&ctx(), wreq).await.unwrap(); // reaps the exec child + let _ = std::fs::remove_file(&tmp); + } + + #[tokio::test] + async fn delete_container_runs_rm() { + let s = svc(); + s.state + .lock() + .await + .procs + .insert("cdel".into(), Proc { status: 2, ..Default::default() }); + let mut req = api::DeleteRequest::new(); + req.set_id("cdel".into()); + s.delete(&ctx(), req).await.unwrap(); // spawns `a3s-box rm -f cdel` + assert!(s.state.lock().await.procs.get("cdel").is_none()); + } + + #[tokio::test] + async fn kill_container_runs_stop() { + let s = svc(); + s.state.lock().await.procs.insert( + "ckill".into(), + Proc { is_sandbox: false, status: 2, ..Default::default() }, + ); + let mut req = api::KillRequest::new(); + req.set_id("ckill".into()); + s.kill(&ctx(), req).await.unwrap(); // spawns `a3s-box stop ckill` + } + + #[tokio::test] + async fn delete_shim_runs_rm() { + let mut s = svc(); + let r = s.delete_shim().await.unwrap(); + assert_eq!(r.exit_status(), 0); + } +} diff --git a/deploy/shim/containerd-a3s-box.toml b/deploy/shim/containerd-a3s-box.toml new file mode 100644 index 00000000..dbd2a742 --- /dev/null +++ b/deploy/shim/containerd-a3s-box.toml @@ -0,0 +1,17 @@ +# Additive containerd runtime handler for a3s-box (containerd 2.x config schema). +# +# Merge this block into /etc/containerd/config.toml. 
It ONLY adds a new runtime +# named `a3s-box`; it does NOT change `default_runtime_name` (stays `runc`), so +# every existing pod keeps using runc. Only pods with runtimeClassName: a3s-box +# are routed to the shim. +# +# After merging: systemctl restart containerd (running containers survive; +# brief CRI-unavailable window only). +# +# Verify: containerd config dump | grep -A3 runtimes.a3s-box + +[plugins.'io.containerd.cri.v1.runtime'.containerd.runtimes.a3s-box] + runtime_type = 'io.containerd.a3s-box.v2' + # pod = one sandbox; the shim maps the CRI pause container to a placeholder + # and each workload container to an `a3s-box run` MicroVM. + [plugins.'io.containerd.cri.v1.runtime'.containerd.runtimes.a3s-box.options] diff --git a/deploy/shim/runtimeclass.yaml b/deploy/shim/runtimeclass.yaml new file mode 100644 index 00000000..f78f6b3f --- /dev/null +++ b/deploy/shim/runtimeclass.yaml @@ -0,0 +1,19 @@ +# RuntimeClass that routes pods to a3s-box via the containerd shim. +# `handler: a3s-box` must match the runtime name registered in containerd's +# config.toml (plugins.'io.containerd.cri.v1.runtime'.containerd.runtimes.a3s-box), +# which points at runtime_type io.containerd.a3s-box.v2 -> binary +# containerd-shim-a3s-box-v2 on the node PATH. +apiVersion: node.k8s.io/v1 +kind: RuntimeClass +metadata: + name: a3s-box +handler: a3s-box +# Per-pod overhead the scheduler accounts for the MicroVM wrapper. +overhead: + podFixed: + cpu: "50m" + memory: "64Mi" +# Only schedule onto nodes that actually run the a3s-box shim + have /dev/kvm. +scheduling: + nodeSelector: + a3s-box.io/runtime: "true" diff --git a/deploy/shim/soak-complex.yaml b/deploy/shim/soak-complex.yaml new file mode 100644 index 00000000..05499ec3 --- /dev/null +++ b/deploy/shim/soak-complex.yaml @@ -0,0 +1,83 @@ +# Complex containers exercised in-place via UNIX sockets (TSI networking cannot +# reach a container's own TCP loopback, so clients use local sockets, which TSI +# does not intercept). 
redis + postgres run continuous verified load; nginx + +# python run as complex multi-process services. +--- +apiVersion: v1 +kind: Pod +metadata: { name: cplx-redis, namespace: default, labels: { soak: cplx } } +spec: + runtimeClassName: a3s-box + nodeName: worker-5 + tolerations: [{ key: node.kubernetes.io/unschedulable, operator: Exists, effect: NoSchedule }] + restartPolicy: Never + containers: + - name: c + image: docker.m.daocloud.io/library/redis:7-alpine + command: + - sh + - -c + - | + redis-server --unixsocket /tmp/r.sock --port 0 --save "" --appendonly no & + sleep 4 + n=0; ok=0; bad=0 + while true; do + redis-cli -s /tmp/r.sock set k$((n%500)) v$n >/dev/null 2>&1 + v=$(redis-cli -s /tmp/r.sock get k$((n%500)) 2>/dev/null) + if [ "$v" = "v$n" ]; then ok=$((ok+1)); else bad=$((bad+1)); fi + n=$((n+1)) + [ $((n%1000)) -eq 0 ] && echo "REDIS_SOAK ops=$n ok=$ok bad=$bad" + done +--- +apiVersion: v1 +kind: Pod +metadata: { name: cplx-postgres, namespace: default, labels: { soak: cplx } } +spec: + runtimeClassName: a3s-box + nodeName: worker-5 + tolerations: [{ key: node.kubernetes.io/unschedulable, operator: Exists, effect: NoSchedule }] + restartPolicy: Never + containers: + - name: c + image: docker.m.daocloud.io/library/postgres:16-alpine + env: [{ name: POSTGRES_PASSWORD, value: soaktest }] + command: + - sh + - -c + - | + docker-entrypoint.sh postgres & + for i in $(seq 1 90); do pg_isready -q && break; sleep 1; done + psql -U postgres -c "CREATE TABLE IF NOT EXISTS t(id serial primary key, v text)" + n=0 + while true; do + psql -U postgres -c "INSERT INTO t(v) VALUES ('soak'||$n)" >/dev/null 2>&1 + c=$(psql -U postgres -tAc "SELECT count(*) FROM t" 2>/dev/null) + n=$((n+1)) + [ $((n%200)) -eq 0 ] && echo "PG_SOAK inserts=$n rows=$c" + done +--- +apiVersion: v1 +kind: Pod +metadata: { name: cplx-nginx, namespace: default, labels: { soak: cplx } } +spec: + runtimeClassName: a3s-box + nodeName: worker-5 + tolerations: [{ key: 
node.kubernetes.io/unschedulable, operator: Exists, effect: NoSchedule }] + restartPolicy: Never + containers: + - name: c + image: docker.m.daocloud.io/library/nginx:1.27-alpine + command: ["nginx", "-g", "daemon off;"] +--- +apiVersion: v1 +kind: Pod +metadata: { name: cplx-python, namespace: default, labels: { soak: cplx } } +spec: + runtimeClassName: a3s-box + nodeName: worker-5 + tolerations: [{ key: node.kubernetes.io/unschedulable, operator: Exists, effect: NoSchedule }] + restartPolicy: Never + containers: + - name: c + image: docker.m.daocloud.io/library/python:3.12-alpine + command: ["python3", "-c", "import http.server,socketserver,threading,time; threading.Thread(target=lambda:socketserver.TCPServer(('127.0.0.1',8080),http.server.SimpleHTTPRequestHandler).serve_forever(),daemon=True).start(); i=0\nwhile True:\n i+=1; time.sleep(2)\n if i%30==0: print('PY_SOAK alive=%ds'%(i*2),flush=True)"] diff --git a/deploy/shim/test-pod.yaml b/deploy/shim/test-pod.yaml new file mode 100644 index 00000000..e0f1ae91 --- /dev/null +++ b/deploy/shim/test-pod.yaml @@ -0,0 +1,24 @@ +# Smoke pod: routed to a3s-box via RuntimeClass, pinned to the test node. +# worker-5 is cordoned, so we tolerate the unschedulable taint and set nodeName +# to land there directly. worker-5 must be labeled a3s-box.io/runtime=true +# (the RuntimeClass scheduling.nodeSelector is injected into the pod). 
+apiVersion: v1 +kind: Pod +metadata: + name: a3s-box-smoke + namespace: default +spec: + runtimeClassName: a3s-box + nodeName: worker-5 + tolerations: + - key: node.kubernetes.io/unschedulable + operator: Exists + effect: NoSchedule + containers: + - name: hello + image: docker.io/library/alpine:latest + command: ["sh", "-c", "echo HELLO_FROM_A3S_BOX; uname -a; sleep 3600"] + resources: + requests: { cpu: "250m", memory: "128Mi" } + limits: { cpu: "1", memory: "512Mi" } + restartPolicy: Never diff --git a/src/Cargo.toml b/src/Cargo.toml index 31c1d51c..05a48b6c 100644 --- a/src/Cargo.toml +++ b/src/Cargo.toml @@ -23,7 +23,7 @@ resolver = "2" h2 = { path = "third_party/h2" } [workspace.package] -version = "2.5.2" +version = "2.6.0" edition = "2021" authors = ["A3S Lab Team"] license = "MIT" diff --git a/src/deps/libkrun-sys/build.rs b/src/deps/libkrun-sys/build.rs index e794909e..846c0bd8 100644 --- a/src/deps/libkrun-sys/build.rs +++ b/src/deps/libkrun-sys/build.rs @@ -419,8 +419,27 @@ fn configure_linking(libkrun_dir: &Path, libkrunfw_dir: &Path) { fn download_file(url: &str, dest: &Path) -> io::Result<()> { println!("cargo:warning=Downloading {}...", url); + // Retry + abort-on-stall: some networks intermittently stall on large GitHub + // release downloads, and a bare curl with no timeout hangs forever. Retry and + // kill transfers that drop below 2KB/s for 30s so a stalled pull self-heals. 
let output = Command::new("curl") - .args(["-fsSL", "-o", dest.to_str().unwrap(), url]) + .args([ + "-fsSL", + "--retry", + "20", + "--retry-all-errors", + "--retry-delay", + "3", + "--connect-timeout", + "20", + "--speed-limit", + "2048", + "--speed-time", + "30", + "-o", + dest.to_str().unwrap(), + url, + ]) .output()?; if !output.status.success() { diff --git a/src/runtime/src/vmm/controller.rs b/src/runtime/src/vmm/controller.rs index dfb416ab..b50be96f 100644 --- a/src/runtime/src/vmm/controller.rs +++ b/src/runtime/src/vmm/controller.rs @@ -475,6 +475,23 @@ impl VmmProvider for VmController { cmd.env("PATH", path); } + // Put the VMM shim in its own session/process-group so it survives teardown + // of the launcher's session — e.g. a containerd-shim foreground `a3s-box run` + // whose process group is reaped on container kill. Without this the libkrun + // shim (which owns the box's exec.sock) dies with the launcher and `a3s-box + // exec` fails with "exec socket missing". + #[cfg(unix)] + { + use std::os::unix::process::CommandExt; + unsafe { + cmd.pre_exec(|| { + // setsid() fails harmlessly if already a group leader; ignore. + libc::setsid(); + Ok(()) + }); + } + } + let child = cmd.spawn().map_err(|e| BoxError::BoxBootError { message: format!("Failed to spawn shim: {}", e), hint: Some(format!("Shim path: {}", self.shim_path.display())), From adb8116200dd55f40d795a430e2ec309d1598669 Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Sat, 27 Jun 2026 09:31:52 +0800 Subject: [PATCH 2/6] fix(shim): kubectl exec works end-to-end on RuntimeClass=a3s-box Makes the containerd-shim-a3s-box-v2 path actually serve `kubectl exec` against libkrun MicroVMs, plus the v2.6.0 network/block-IO stats feature. containerd shim: - Stage container env in a rootfs file (BOX_EXEC_ENV_FILE) instead of inlining it: Kubernetes injects ~150 service env vars per pod, which overflow the guest kernel cmdline (COMMAND_LINE_SIZE) and silently break boot. 
Only a small pointer rides the cmdline now. - Gate Task.wait()'s inspect-poll on the box being confirmed running: containerd calls Wait() concurrently with (and before) Start finishes, during which `inspect` returns "No such container"; treating that as a terminal exit made containerd kill the box it was still starting. - Launch the box as a transient systemd unit so it runs under a clean rlimit/memlock context (libkrun must mlock guest RAM for KVM). - Hold the CRI stdout/stderr FIFO write ends open so the task persists. Also includes the network + block-IO stats feature for `a3s-box stats/top/ps` (net_stats_path threaded through vmm/netproxy/passt). Validated: kubectl exec end-to-end 4/4 on the production cluster; full `src` workspace lib tests green (cargo test --workspace --lib). --- containerd-shim/Cargo.toml | 1 + containerd-shim/src/logic.rs | 83 +++--- containerd-shim/src/main.rs | 14 +- containerd-shim/src/service.rs | 379 ++++++++++++++++++++---- src/Cargo.lock | 21 +- src/cli/src/commands/ps.rs | 82 +++++- src/cli/src/commands/stats.rs | 492 +++++++++++++++++++++++++++++-- src/cli/src/commands/top.rs | 172 ++++++++++- src/core/src/vmm.rs | 9 + src/guest/init/src/main.rs | 26 +- src/netproxy/Cargo.toml | 1 + src/netproxy/src/lib.rs | 142 ++++++++- src/runtime/src/network/passt.rs | 20 ++ src/runtime/src/vm/network.rs | 10 +- src/runtime/src/vm/spec.rs | 59 ++-- src/shim/src/main.rs | 2 + 16 files changed, 1328 insertions(+), 185 deletions(-) diff --git a/containerd-shim/Cargo.toml b/containerd-shim/Cargo.toml index 7c849ba8..595fa7d9 100644 --- a/containerd-shim/Cargo.toml +++ b/containerd-shim/Cargo.toml @@ -21,6 +21,7 @@ containerd-shim-protos = { version = "0.11", features = ["async"] } async-trait = "0.1" tokio = { version = "1", features = ["rt-multi-thread", "macros", "process", "sync", "io-util", "fs", "time"] } log = "0.4" +libc = "0.2" serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/containerd-shim/src/logic.rs 
b/containerd-shim/src/logic.rs index 404c0d43..561ed04a 100644 --- a/containerd-shim/src/logic.rs +++ b/containerd-shim/src/logic.rs @@ -39,18 +39,31 @@ pub fn parse_spec(spec: &Value) -> ParsedSpec { .and_then(|v| v.as_u64()) .map(|b| (b / (1024 * 1024)).max(1)); let cpus = match ( - spec.pointer("/linux/resources/cpu/quota").and_then(|v| v.as_i64()), - spec.pointer("/linux/resources/cpu/period").and_then(|v| v.as_u64()), + spec.pointer("/linux/resources/cpu/quota") + .and_then(|v| v.as_i64()), + spec.pointer("/linux/resources/cpu/period") + .and_then(|v| v.as_u64()), ) { (Some(q), Some(p)) if q > 0 && p > 0 => Some(((q as f64 / p as f64).ceil()) as u32), _ => None, }; - ParsedSpec { is_sandbox, image, args, env, cpus, memory_mb } + ParsedSpec { + is_sandbox, + image, + args, + env, + cpus, + memory_mb, + } } fn string_array(v: Option<&Value>) -> Vec { v.and_then(|a| a.as_array()) - .map(|a| a.iter().filter_map(|x| x.as_str().map(String::from)).collect()) + .map(|a| { + a.iter() + .filter_map(|x| x.as_str().map(String::from)) + .collect() + }) .unwrap_or_default() } @@ -63,12 +76,12 @@ pub fn run_args( env: &[String], cmd: &[String], ) -> Vec { - let mut v = vec![ - "run".to_string(), - "-d".to_string(), - "--name".to_string(), - id.to_string(), - ]; + // Foreground `run` (no -d). This is launched as the main process of a transient + // systemd unit (see service.rs): foreground run blocks until the box exits, so + // the unit stays active for the container's whole life. A detached `run -d` + // would exit right after boot, the unit would deactivate, and that exit gets + // reported as the container exiting — tearing the task down seconds after start. 
+ let mut v = vec!["run".to_string(), "--name".to_string(), id.to_string()]; if let Some(c) = cpus { v.push("--cpus".to_string()); v.push(c.to_string()); @@ -120,21 +133,6 @@ pub fn a3s_home() -> String { std::env::var("A3S_HOME").unwrap_or_else(|_| "/var/lib/a3s-box".to_string()) } -/// Numeric box status -> name (mirrors the protobuf Status enum we report). -pub fn status_name(s: i32) -> &'static str { - match s { - 1 => "created", - 2 => "running", - 3 => "stopped", - _ => "unknown", - } -} - -/// Parse an exit code printed by `a3s-box wait` (its stdout is the integer code). -pub fn parse_wait_exit_code(stdout: &str) -> u32 { - stdout.trim().parse::().unwrap_or(0) as u32 -} - #[cfg(test)] mod tests { use super::*; @@ -199,7 +197,7 @@ mod tests { fn run_args_minimal() { assert_eq!( run_args("box1", "alpine", None, None, &[], &[]), - vec!["run", "-d", "--name", "box1", "alpine"] + vec!["run", "--name", "box1", "alpine"] ); } @@ -216,8 +214,20 @@ mod tests { assert_eq!( v, vec![ - "run", "-d", "--name", "box1", "--cpus", "2", "--memory", "256m", "-e", "A=1", - "redis:7", "--", "redis-server", "--port", "0" + "run", + "--name", + "box1", + "--cpus", + "2", + "--memory", + "256m", + "-e", + "A=1", + "redis:7", + "--", + "redis-server", + "--port", + "0" ] ); } @@ -247,23 +257,6 @@ mod tests { assert!(!is_running("")); } - #[test] - fn status_name_maps() { - assert_eq!(status_name(1), "created"); - assert_eq!(status_name(2), "running"); - assert_eq!(status_name(3), "stopped"); - assert_eq!(status_name(0), "unknown"); - assert_eq!(status_name(99), "unknown"); - } - - #[test] - fn parse_wait_exit_code_handles_garbage() { - assert_eq!(parse_wait_exit_code("0\n"), 0); - assert_eq!(parse_wait_exit_code(" 137 "), 137); - assert_eq!(parse_wait_exit_code("notanumber"), 0); - assert_eq!(parse_wait_exit_code(""), 0); - } - #[test] fn a3s_home_default() { // Default applies when unset (don't mutate global env in parallel tests). 
diff --git a/containerd-shim/src/main.rs b/containerd-shim/src/main.rs index e44ed40b..614ed4db 100644 --- a/containerd-shim/src/main.rs +++ b/containerd-shim/src/main.rs @@ -21,5 +21,17 @@ use service::Service; #[tokio::main] async fn main() { - containerd_shim::asynchronous::run::("io.containerd.a3s-box.v2", None).await; + // Pass the Config via `opts` (NOT from Shim::new — the framework reads it to + // set up signals + subreaper BEFORE calling Shim::new, so changes there are + // no-ops). Disable the framework's child reaper and subreaper: it sets + // PR_SET_CHILD_SUBREAPER and a SIGCHLD `waitpid(-1)` loop that reaps the + // short-lived `a3s-box run -d` CLI and publishes a spurious container-exit + // seconds after start, making containerd kill the still-running box. We hold + // each task's process as a tokio Child and reap it ourselves in Task.wait(). + let config = containerd_shim::Config { + no_reaper: true, + no_sub_reaper: true, + ..Default::default() + }; + containerd_shim::asynchronous::run::("io.containerd.a3s-box.v2", Some(config)).await; } diff --git a/containerd-shim/src/service.rs b/containerd-shim/src/service.rs index 45f2fd5d..7369451f 100644 --- a/containerd-shim/src/service.rs +++ b/containerd-shim/src/service.rs @@ -39,9 +39,42 @@ fn a3s_box_bin() -> String { fn a3s_box_cmd() -> Command { let mut c = Command::new(a3s_box_bin()); c.env("A3S_HOME", logic::a3s_home()); + harden_child(&mut c); c } +/// Prepare a child (a3s-box → its libkrun VMM) to match a shell-launched one. +/// +/// containerd runs the shim under tight rlimits: RLIMIT_MEMLOCK is the 8 MiB +/// system default. libkrun must mlock the guest's RAM for KVM, so at 8 MiB the +/// guest boots only partially — no console output, and the exec-server vsock +/// never comes up, so exec.sock is never bound and `kubectl exec` can't reach +/// the guest. A root shell has ~unlimited memlock and works; we raise the +/// child's MEMLOCK to match. NOFILE is bumped for large VMs. 
We also close +/// inherited fds (the ttrpc socket, tokio epoll, FIFOs) so the VMM starts with +/// a clean table. All ops here are async-signal-safe (safe post-fork/pre-exec). +fn harden_child(cmd: &mut Command) { + // tokio::process::Command provides `pre_exec` inherently (no CommandExt import). + unsafe { + cmd.pre_exec(|| { + let unlimited = libc::rlimit { + rlim_cur: libc::RLIM_INFINITY, + rlim_max: libc::RLIM_INFINITY, + }; + libc::setrlimit(libc::RLIMIT_MEMLOCK, &unlimited); + let nofile = libc::rlimit { + rlim_cur: 1_048_576, + rlim_max: 1_048_576, + }; + libc::setrlimit(libc::RLIMIT_NOFILE, &nofile); + for fd in 3..1024 { + libc::close(fd); + } + Ok(()) + }); + } +} + #[derive(Default)] struct Proc { bundle: String, @@ -85,6 +118,21 @@ pub struct Service { // ExitSignal isn't Clone; share one across the Shim + its task service. exit: Arc, state: Arc>, + // Set on the task service (create_task_service) so we can emit the runtime-v2 + // lifecycle events (TaskStart/TaskExit) containerd's CRI expects. + publisher: Option>, +} + +impl Service { + /// Publish a runtime-v2 task event to containerd (best effort). + async fn publish(&self, topic: &str, event: Box) { + if let Some(p) = &self.publisher { + let ctx = ttrpc::context::Context::default(); + if let Err(e) = p.publish(ctx, topic, &self.namespace, event).await { + log::warn!("publish {topic} failed: {e}"); + } + } + } } // ---- helpers --------------------------------------------------------------- @@ -153,26 +201,38 @@ impl Shim for Service { type T = Service; async fn new(_runtime_id: &str, args: &Flags, _config: &mut Config) -> Self { + // NOTE: reaper/subreaper are disabled via the Config passed to `run()` in + // main.rs — the framework reads that Config before this method runs, so + // setting it here would be a no-op. 
Service { id: args.id.clone(), namespace: args.namespace.clone(), exit: Arc::new(ExitSignal::default()), state: Arc::new(Mutex::new(State::default())), + publisher: None, } } async fn start_shim(&mut self, opts: StartOpts) -> Result { - let grouping = opts.id.clone(); + // Group every container of a pod under the SAME shim as its sandbox (the CRI + // model: one shim per pod). The bundle's config.json carries the sandbox id; + // a workload container groups by it, the sandbox itself groups by its own id. + // A separate shim per container has the wrong lifecycle — the workload's shim + // tears down right after Start. + let grouping = std::fs::read_to_string("config.json") + .ok() + .and_then(|s| serde_json::from_str::(&s).ok()) + .and_then(|v| { + logic::annotation(&v, "io.kubernetes.cri.sandbox-id").map(|s| s.to_string()) + }) + .unwrap_or_else(|| opts.id.clone()); let address = spawn(opts, &grouping, Vec::new()).await?; Ok(address) } async fn delete_shim(&mut self) -> Result { // Out-of-band cleanup: force-remove the box if it lingers. - let _ = a3s_box_cmd() - .args(["rm", "-f", &self.id]) - .output() - .await; + let _ = a3s_box_cmd().args(["rm", "-f", &self.id]).output().await; let mut r = api::DeleteResponse::new(); r.set_exit_status(0); Ok(r) @@ -182,8 +242,10 @@ impl Shim for Service { self.exit.wait().await; } - async fn create_task_service(&self, _publisher: RemotePublisher) -> Self::T { - self.clone() + async fn create_task_service(&self, publisher: RemotePublisher) -> Self::T { + let mut s = self.clone(); + s.publisher = Some(Arc::new(publisher)); + s } } @@ -251,12 +313,12 @@ impl Task for Service { p.pid = child.id().unwrap_or(0); p.child = Some(child); } else { - // Workload: launch DETACHED via `setsid a3s-box run -d`, fire-and-forget. - // This replicates the interactive-shell `run -d` that BINDS exec.sock and - // orphans the libkrun shim to init. 
(Foreground run — and a plain - // shim-spawned `run -d` held as a child — do NOT bind exec.sock, so - // kubectl exec can't reach the guest.) We then poll the box record until - // it reports running, by which point boot's exec-readiness has completed. + // Workload: launch the MicroVM DETACHED (`a3s-box run -d`). The libkrun + // VMM daemonizes and reparents to init — OUT of this shim's process tree. + // That isolation is essential: a box kept inside the containerd-launched + // shim's tree (e.g. held as a foreground `a3s-box run`) is killed a few + // seconds after boot by the shim's process context, whereas a detached box + // survives for its full lifetime exactly as a standalone `run -d` does. let image = p .image .clone() @@ -265,42 +327,144 @@ impl Task for Service { let memory = p.memory_mb; let env = p.env.clone(); let args = p.args.clone(); + let stdout_fifo = p.stdout.clone(); + let stderr_fifo = p.stderr.clone(); drop(st); // release the lock across spawn + readiness poll - let mut cmd = Command::new("setsid"); - cmd.env("A3S_HOME", logic::a3s_home()); - cmd.arg(a3s_box_bin()); - cmd.args(logic::run_args(&id, &image, cpus, memory, &env, &args)); - cmd.stdin(Stdio::null()).stdout(Stdio::null()).stderr(Stdio::null()); - cmd.spawn() - .map_err(|e| ttrpc_err(format!("setsid a3s-box run -d spawn failed: {e}")))?; + // Hold the container's stdout/stderr FIFO write ends open. containerd's CRI + // creates these FIFOs and pumps them to the container log; the box's real + // output goes to its systemd journal, so without a writer the CRI's pump + // hits "reading from a closed fifo" and tears the container down. Opening + // them O_WRONLY|O_NONBLOCK (a reader is present) and leaking the fds keeps + // the pipes alive for the shim's lifetime. 
+ for path in [&stdout_fifo, &stderr_fifo] { + if !path.is_empty() { + if let Ok(c) = std::ffi::CString::new(path.as_str()) { + let fd = + unsafe { libc::open(c.as_ptr(), libc::O_WRONLY | libc::O_NONBLOCK) }; + if fd >= 0 { + std::mem::forget(unsafe { std::fs::File::from_raw_fd(fd) }); + } + } + } + } + + // Launch the box as a transient systemd SERVICE (NOT `--scope`). A scope + // runs the command as a child of systemd-run — i.e. of THIS shim — so it + // inherits the shim's constrained context (RLIMIT_MEMLOCK=8 MiB, and other + // limits we could not fully enumerate) that makes the guest OOM ~5 s into + // boot ("workqueue: Failed to create a worker thread: -12"). A transient + // service is started by systemd (PID 1) with systemd's OWN default + // context, escaping the shim entirely; we also set MEMLOCK/NOFILE + // explicitly. KillMode=process keeps the daemonized libkrun VMM alive when + // `a3s-box run -d` (the unit's main process) exits. A3S_HOME is passed + // because the service does not inherit our env. (Verified: a box launched + // from the shim's context dies; one launched in a clean context survives.) + let unit = format!("a3sbox-{}", &id[..id.len().min(32)]); + let mut rcmd = Command::new("systemd-run"); + rcmd.arg("--quiet") + .arg("--no-block") // enqueue the unit and return at once; do not let + // systemd-run linger as our child until `run -d` exits (its exit would + // be read as the container exiting). + .arg(format!("--unit={unit}")) + // KillMode=process: don't kill the daemonized VMM when the unit's + // main process (`a3s-box run -d`) exits. RemainAfterExit=yes: keep the + // unit (and thus its cgroup, which holds the VMM) ACTIVE after that + // main exits — otherwise systemd garbage-collects the finished unit's + // cgroup and the VMM dies with it ~2 s after boot. 
+ .arg("--property=KillMode=process") + .arg("--property=RemainAfterExit=yes") + .arg("--property=LimitMEMLOCK=infinity") + .arg("--property=LimitNOFILE=1048576") + .arg(format!("--setenv=A3S_HOME={}", logic::a3s_home())) + .arg(a3s_box_bin()) + .args(logic::run_args(&id, &image, cpus, memory, &env, &args)) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()); + // Service mode returns as soon as systemd has started the unit; the box + // then boots in the background and we poll for it below. + let run_status = rcmd + .status() + .await + .map_err(|e| ttrpc_err(format!("systemd-run a3s-box failed: {e}")))?; + if !run_status.success() { + return Err(ttrpc_err(format!( + "systemd-run a3s-box exited {run_status}" + ))); + } + // Poll until the box reports RUNNING — i.e. the guest has booted and bound + // its exec socket. Only then is `a3s-box wait` guaranteed to block (on a + // freshly-created, still-booting box it returns immediately, which the held + // child below would read as an instant task-exit and let containerd kill + // the box). The box runs in a clean systemd scope, so this ~5 s wait does + // not risk it dying meanwhile. 
let mut running = false; - for _ in 0..120 { + for _ in 0..160 { if let Ok(o) = a3s_box_cmd().args(["inspect", &id]).output().await { - if o.status.success() && logic::is_running(&String::from_utf8_lossy(&o.stdout)) { + if o.status.success() && logic::is_running(&String::from_utf8_lossy(&o.stdout)) + { running = true; break; } } - tokio::time::sleep(std::time::Duration::from_millis(700)).await; + tokio::time::sleep(std::time::Duration::from_millis(300)).await; } if !running { return Err(ttrpc_err(format!("box {id} did not become running"))); } - let mut st = self.state.lock().await; - if let Some(p) = st.procs.get_mut(&id) { - p.status = 2; // RUNNING - p.pid = std::process::id(); + + // Report the foreground `a3s-box run` (the systemd unit's MainPID) as the + // task's init pid: a real, distinct, long-lived host process that lives + // exactly as long as the box. containerd loads the task by this pid for + // exec, so it must NOT be the shim's own pid (that gives "no running + // task"). The framework reaper is off (no_reaper), so this non-child pid + // is never waitpid'd into a false exit; Task.wait() polls inspect for the + // real exit. + let unit_main = format!("a3sbox-{}.service", &id[..id.len().min(32)]); + let main_pid = Command::new("systemctl") + .args(["show", &unit_main, "-p", "MainPID", "--value"]) + .output() + .await + .ok() + .and_then(|o| { + String::from_utf8_lossy(&o.stdout) + .trim() + .parse::() + .ok() + }) + .filter(|&p| p > 0) + .unwrap_or_else(std::process::id); + + { + let mut st = self.state.lock().await; + if let Some(p) = st.procs.get_mut(&id) { + p.status = 2; // RUNNING + p.pid = main_pid; + p.child = None; + } } + // Tell containerd the task is running (runtime-v2 lifecycle event). The + // CRI plugin relies on this to register the task as exec-able. 
+ let mut ev = containerd_shim_protos::events::task::TaskStart::new(); + ev.set_container_id(id.clone()); + ev.set_pid(main_pid); + self.publish("/tasks/start", Box::new(ev)).await; let mut resp = api::StartResponse::new(); - resp.set_pid(std::process::id()); + resp.set_pid(main_pid); return Ok(resp); } p.status = 2; // RUNNING (sandbox) + let sandbox_pid = p.pid; + drop(st); + let mut ev = containerd_shim_protos::events::task::TaskStart::new(); + ev.set_container_id(id.clone()); + ev.set_pid(sandbox_pid); + self.publish("/tasks/start", Box::new(ev)).await; let mut resp = api::StartResponse::new(); - resp.set_pid(p.pid); + resp.set_pid(sandbox_pid); Ok(resp) } @@ -364,20 +528,84 @@ impl Task for Service { let code = if let Some(mut c) = child { c.wait().await.ok().and_then(|s| s.code()).unwrap_or(0) as u32 } else { - // Workload box (detached): block on `a3s-box wait ` until it exits. - a3s_box_cmd() - .args(["wait", &id]) - .output() - .await - .ok() - .map(|o| logic::parse_wait_exit_code(&String::from_utf8_lossy(&o.stdout))) - .unwrap_or(0) + // Workload box: block until the box TRULY exits. `a3s-box wait` can return + // early while the box is still mid-transition (just after `run -d` + // detaches it), and reporting that to containerd as a task-exit makes it + // kill the still-live box. So after each `a3s-box wait` return, confirm + // via `inspect` that the box is actually gone; if it still reports + // running, the wait was spurious — pause and wait again. + // Workload box: block until the box reaches a TERMINAL status. Match the + // status field specifically (`"status": "stopped"|"exited"`) — a bare + // search for "stopped"/"exited" also hits a RUNNING box's exited_at/ + // exit_code fields (false exit → containerd kills the live box), and + // `!is_running` would fire on a still-booting "created" box (wait() can be + // called before boot completes). Only an explicit terminal status, or the + // box record vanishing (two failed inspects), ends the wait. 
+ let terminal = |s: &str| { + s.contains("\"status\": \"stopped\"") + || s.contains("\"status\":\"stopped\"") + || s.contains("\"status\": \"exited\"") + || s.contains("\"status\":\"exited\"") + }; + let mut missing = 0; + // containerd calls Task.Wait() right after Create — concurrently with, + // and often before, Task.Start finishes booting the box. In that window + // the box isn't registered in boxes.json yet, so `inspect` returns "No + // such container"; treating that as a terminal exit makes containerd kill + // the box it's still starting. So gate the inspect poll on the in-memory + // status (which Start sets to running once boot is confirmed): until we've + // seen the box running, just wait. None => task deleted; 3 => stopped. + let mut seen_running = false; + loop { + let mem_status = { + let st = self.state.lock().await; + st.procs.get(&id).map(|p| p.status) + }; + match mem_status { + None => break 0, // task deleted out from under us + Some(3) => break 0, // marked stopped elsewhere + Some(s) if s >= 2 => seen_running = true, + Some(_) => {} // created/unknown: not booted yet + } + if !seen_running { + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + continue; + } + let brk = match a3s_box_cmd().args(["inspect", &id]).output().await { + Ok(o) if o.status.success() => { + let term = terminal(&String::from_utf8_lossy(&o.stdout)); + if !term { + missing = 0; + } + term + } + // Two consecutive inspect failures => the box record is gone. 
+ _ => { + missing += 1; + missing >= 2 + } + }; + if brk { + break 0; + } + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + } }; - let mut st = self.state.lock().await; - if let Some(p) = st.procs.get_mut(&id) { - p.status = 3; // STOPPED - p.exit_code = code; - } + let pid = { + let mut st = self.state.lock().await; + st.procs.get_mut(&id).map(|p| { + p.status = 3; // STOPPED + p.exit_code = code; + p.pid + }) + }; + // Now that the box has truly exited, tell containerd (runtime-v2 lifecycle). + let mut ev = containerd_shim_protos::events::task::TaskExit::new(); + ev.set_container_id(id.clone()); + ev.set_id(id.clone()); + ev.set_pid(pid.unwrap_or(0)); + ev.set_exit_status(code); + self.publish("/tasks/exit", Box::new(ev)).await; let mut resp = api::WaitResponse::new(); resp.set_exit_status(code); Ok(resp) @@ -442,10 +670,7 @@ impl Task for Service { if p.is_sandbox { let _ = Command::new("kill").arg(p.pid.to_string()).output().await; } else { - let _ = a3s_box_cmd() - .args(["stop", &id]) - .output() - .await; + let _ = a3s_box_cmd().args(["stop", &id]).output().await; } } Ok(api::Empty::new()) @@ -480,8 +705,17 @@ impl Task for Service { .map(|p| (p.pid, p.exit_code)) .unwrap_or((0, 0)); drop(st); - let _ = a3s_box_cmd() - .args(["rm", "-f", &id]) + let _ = a3s_box_cmd().args(["rm", "-f", &id]).output().await; + // Tear down the transient systemd unit that hosts the box (RemainAfterExit + // keeps it active, so it must be stopped explicitly) and forget any failed + // state, so the unit name is free for a future container with the same id. 
+ let unit = format!("a3sbox-{}.service", &id[..id.len().min(32)]); + let _ = Command::new("systemctl") + .args(["stop", &unit]) + .output() + .await; + let _ = Command::new("systemctl") + .args(["reset-failed", &unit]) .output() .await; let mut resp = api::DeleteResponse::new(); @@ -536,6 +770,7 @@ mod tests { namespace: "k8s.io".into(), exit: Arc::new(ExitSignal::default()), state: Arc::new(Mutex::new(State::default())), + publisher: None, } } @@ -607,7 +842,7 @@ mod tests { dreq.set_id("c1".into()); dreq.set_exec_id("e1".into()); s.delete(&ctx(), dreq).await.unwrap(); - assert!(s.state.lock().await.execs.get("e1").is_none()); + assert!(!s.state.lock().await.execs.contains_key("e1")); } #[tokio::test] @@ -615,7 +850,10 @@ mod tests { let s = svc(); s.state.lock().await.execs.insert( "e9".into(), - ExecProc { container_id: "c1".into(), ..Default::default() }, + ExecProc { + container_id: "c1".into(), + ..Default::default() + }, ); let mut req = api::StateRequest::new(); req.set_id("c1".into()); @@ -629,7 +867,9 @@ mod tests { let s = svc(); let c = s.connect(&ctx(), api::ConnectRequest::new()).await.unwrap(); assert!(c.shim_pid() > 0); - s.shutdown(&ctx(), api::ShutdownRequest::new()).await.unwrap(); + s.shutdown(&ctx(), api::ShutdownRequest::new()) + .await + .unwrap(); // shutdown fired the exit signal; Shim::wait would now return. 
} @@ -638,7 +878,11 @@ mod tests { let s = svc(); s.state.lock().await.execs.insert( "e0".into(), - ExecProc { container_id: "c1".into(), pid: 0, ..Default::default() }, + ExecProc { + container_id: "c1".into(), + pid: 0, + ..Default::default() + }, ); let mut req = api::KillRequest::new(); req.set_id("c1".into()); @@ -668,7 +912,10 @@ mod tests { let s = svc(); s.state.lock().await.execs.insert( "e2".into(), - ExecProc { container_id: "c1".into(), ..Default::default() }, + ExecProc { + container_id: "c1".into(), + ..Default::default() + }, ); let mut req = api::WaitRequest::new(); req.set_id("c1".into()); @@ -682,7 +929,11 @@ mod tests { let s = svc(); s.state.lock().await.procs.insert( "sb".into(), - Proc { is_sandbox: true, status: 1, ..Default::default() }, + Proc { + is_sandbox: true, + status: 1, + ..Default::default() + }, ); let mut sreq = api::StartRequest::new(); sreq.set_id("sb".into()); @@ -724,15 +975,17 @@ mod tests { #[tokio::test] async fn delete_container_runs_rm() { let s = svc(); - s.state - .lock() - .await - .procs - .insert("cdel".into(), Proc { status: 2, ..Default::default() }); + s.state.lock().await.procs.insert( + "cdel".into(), + Proc { + status: 2, + ..Default::default() + }, + ); let mut req = api::DeleteRequest::new(); req.set_id("cdel".into()); s.delete(&ctx(), req).await.unwrap(); // spawns `a3s-box rm -f cdel` - assert!(s.state.lock().await.procs.get("cdel").is_none()); + assert!(!s.state.lock().await.procs.contains_key("cdel")); } #[tokio::test] @@ -740,7 +993,11 @@ mod tests { let s = svc(); s.state.lock().await.procs.insert( "ckill".into(), - Proc { is_sandbox: false, status: 2, ..Default::default() }, + Proc { + is_sandbox: false, + status: 2, + ..Default::default() + }, ); let mut req = api::KillRequest::new(); req.set_id("ckill".into()); diff --git a/src/Cargo.lock b/src/Cargo.lock index e15034bf..fec1b763 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -4,7 +4,7 @@ version = 4 [[package]] name = "a3s-box-cli" -version = 
"2.5.2" +version = "2.6.0" dependencies = [ "a3s-box-core", "a3s-box-runtime", @@ -36,7 +36,7 @@ dependencies = [ [[package]] name = "a3s-box-core" -version = "2.5.2" +version = "2.6.0" dependencies = [ "a3s-common", "async-trait", @@ -54,7 +54,7 @@ dependencies = [ [[package]] name = "a3s-box-cri" -version = "2.5.2" +version = "2.6.0" dependencies = [ "a3s-box-core", "a3s-box-runtime", @@ -84,7 +84,7 @@ dependencies = [ [[package]] name = "a3s-box-guest-init" -version = "2.5.2" +version = "2.6.0" dependencies = [ "a3s-box-core", "a3s-common", @@ -107,7 +107,7 @@ dependencies = [ [[package]] name = "a3s-box-lambda" -version = "2.5.2" +version = "2.6.0" dependencies = [ "a3s-box-core", "a3s-box-runtime", @@ -126,10 +126,11 @@ dependencies = [ [[package]] name = "a3s-box-netproxy" -version = "2.5.2" +version = "2.6.0" dependencies = [ "a3s-box-core", "libc", + "serde_json", "smoltcp", "tempfile", "tracing", @@ -137,7 +138,7 @@ dependencies = [ [[package]] name = "a3s-box-runtime" -version = "2.5.2" +version = "2.6.0" dependencies = [ "a3s-box-core", "a3s-box-netproxy", @@ -184,11 +185,11 @@ dependencies = [ [[package]] name = "a3s-box-sdk" -version = "2.5.2" +version = "2.6.0" [[package]] name = "a3s-box-shim" -version = "2.5.2" +version = "2.6.0" dependencies = [ "a3s-box-core", "a3s-box-netproxy", @@ -222,7 +223,7 @@ dependencies = [ [[package]] name = "a3s-libkrun-sys" -version = "2.5.2" +version = "2.6.0" dependencies = [ "libc", "num_cpus", diff --git a/src/cli/src/commands/ps.rs b/src/cli/src/commands/ps.rs index f550df0c..41d404da 100644 --- a/src/cli/src/commands/ps.rs +++ b/src/cli/src/commands/ps.rs @@ -16,8 +16,8 @@ pub struct PsArgs { #[arg(short, long)] pub quiet: bool, - /// Format output using placeholders: {{.ID}}, {{.Image}}, {{.Status}}, - /// {{.Created}}, {{.Names}}, {{.Ports}}, {{.Command}} + /// Format output as `json` or using placeholders: {{.ID}}, {{.Image}}, + /// {{.Status}}, {{.Created}}, {{.Names}}, {{.Ports}}, {{.Command}} #[arg(long)] 
pub format: Option, @@ -44,8 +44,13 @@ pub async fn execute(args: PsArgs) -> Result<(), Box> { return Ok(()); } - // --format: custom template output + // --format: json or custom template output if let Some(ref fmt) = args.format { + if fmt.trim().eq_ignore_ascii_case("json") { + print_json(&boxes)?; + return Ok(()); + } + for record in &boxes { println!("{}", apply_format(record, fmt)); } @@ -72,6 +77,38 @@ pub async fn execute(args: PsArgs) -> Result<(), Box> { Ok(()) } +fn print_json(records: &[&BoxRecord]) -> Result<(), serde_json::Error> { + let rows = records + .iter() + .map(|record| ps_json(record)) + .collect::>(); + println!("{}", serde_json::to_string(&rows)?); + Ok(()) +} + +fn ps_json(record: &BoxRecord) -> serde_json::Value { + let status = status::format_status(record); + let ports = record.port_map.join(", "); + serde_json::json!({ + "id": &record.id, + "short_id": &record.short_id, + "name": &record.name, + "names": &record.name, + "image": &record.image, + "status": status, + "raw_status": &record.status, + "created": output::format_ago(&record.created_at), + "created_at": record.created_at.to_rfc3339(), + "started_at": record.started_at.as_ref().map(|ts| ts.to_rfc3339()), + "ports": &record.port_map, + "ports_text": ports, + "command": record.cmd.join(" "), + "labels": &record.labels, + "health": &record.health_status, + "pid": record.pid, + }) +} + /// Check if a box record matches all the given filters. 
/// /// Supported filters: @@ -337,6 +374,45 @@ mod tests { assert_eq!(result, ""); } + #[test] + fn test_ps_json_record() { + let mut labels = HashMap::new(); + labels.insert("env".to_string(), "prod".to_string()); + let mut record = make_record("box1", "running", labels); + record.pid = Some(4242); + record.port_map = vec!["8080:80".to_string()]; + record.cmd = vec!["sleep".to_string(), "3600".to_string()]; + record.started_at = Some(record.created_at); + record.health_check = Some(crate::state::HealthCheck { + cmd: vec!["true".to_string()], + interval_secs: 30, + timeout_secs: 5, + retries: 3, + start_period_secs: 0, + }); + record.health_status = "healthy".to_string(); + + let value = ps_json(&record); + + assert_eq!(value["id"], "test-id-box1"); + assert_eq!(value["name"], "box1"); + assert_eq!(value["names"], "box1"); + assert_eq!(value["status"], "running (healthy)"); + assert_eq!(value["raw_status"], "running"); + assert_eq!(value["ports"][0], "8080:80"); + assert_eq!(value["ports_text"], "8080:80"); + assert_eq!(value["command"], "sleep 3600"); + assert_eq!(value["labels"]["env"], "prod"); + assert_eq!(value["health"], "healthy"); + assert_eq!(value["pid"], 4242); + assert!(value["created_at"] + .as_str() + .is_some_and(|s| s.contains('T'))); + assert!(value["started_at"] + .as_str() + .is_some_and(|s| s.contains('T'))); + } + // --- existing filter tests --- #[test] diff --git a/src/cli/src/commands/stats.rs b/src/cli/src/commands/stats.rs index 2a75e854..4806830d 100644 --- a/src/cli/src/commands/stats.rs +++ b/src/cli/src/commands/stats.rs @@ -1,11 +1,19 @@ //! `a3s-box stats` command — Display live resource usage statistics. //! -//! Shows CPU and memory usage for active boxes, similar to `docker stats`. +//! Shows CPU, memory, network, and block I/O usage for active boxes, similar to `docker stats`. //! By default streams updates every second; use `--no-stream` for a single snapshot. 
-use clap::Args; +use clap::{Args, ValueEnum}; +use serde::Serialize; +use std::path::Path; +use std::time::Duration; use sysinfo::{Pid, System}; +#[cfg(not(windows))] +use a3s_box_core::exec::{ExecRequest, DEFAULT_EXEC_TIMEOUT_NS}; +#[cfg(not(windows))] +use a3s_box_runtime::ExecClient; + use crate::output; use crate::resolve; use crate::state::{BoxRecord, StateFile}; @@ -19,24 +27,71 @@ pub struct StatsArgs { /// Disable streaming and print a single snapshot #[arg(long)] pub no_stream: bool, + + /// Output format + #[arg(long, value_enum, default_value_t = StatsFormat::Table)] + format: StatsFormat, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, ValueEnum)] +enum StatsFormat { + Table, + Json, } +const PIDS_CURRENT_TIMEOUT: Duration = Duration::from_millis(750); + /// Collected stats for a single box. +#[derive(Serialize)] struct BoxStats { + id: String, name: String, short_id: String, status: String, pid: u32, + cpus: u32, cpu_percent: f32, memory_bytes: u64, memory_limit_bytes: u64, + network_rx_bytes: u64, + network_tx_bytes: u64, + block_read_bytes: u64, + block_write_bytes: u64, + pids_current: Option, +} + +impl BoxStats { + fn mem_percent(&self) -> f64 { + if self.memory_limit_bytes > 0 { + (self.memory_bytes as f64 / self.memory_limit_bytes as f64) * 100.0 + } else { + 0.0 + } + } + + fn scaled_cpu_percent(&self) -> f64 { + self.cpu_percent as f64 / self.cpus.max(1) as f64 + } +} + +struct ResourceStats { + cpu_percent: f32, + memory_bytes: u64, + block_read_bytes: u64, + block_write_bytes: u64, +} + +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +struct NetworkStats { + rx_bytes: u64, + tx_bytes: u64, } /// Collect stats for a process by PID. /// /// Requires two `refresh_process` calls with a delay between them /// for accurate CPU measurement (sysinfo computes CPU as a delta). 
-fn collect_stats(sys: &mut System, pid: u32, memory_limit_mb: u32) -> Option<(f32, u64)> { +fn collect_stats(sys: &mut System, pid: u32) -> Option { let spid = Pid::from_u32(pid); // First refresh to establish baseline @@ -46,10 +101,13 @@ fn collect_stats(sys: &mut System, pid: u32, memory_limit_mb: u32) -> Option<(f3 sys.refresh_process(spid); sys.process(spid).map(|proc_info| { - let cpu = proc_info.cpu_usage(); - let mem = proc_info.memory(); - let _ = memory_limit_mb; // used by caller - (cpu, mem) + let disk = proc_info.disk_usage(); + ResourceStats { + cpu_percent: proc_info.cpu_usage(), + memory_bytes: proc_info.memory(), + block_read_bytes: disk.total_read_bytes, + block_write_bytes: disk.total_written_bytes, + } }) } @@ -63,15 +121,11 @@ fn print_stats(stats: &[BoxStats]) { "MEM USAGE / LIMIT", "MEM %", "PID", + "NET I/O", + "IO", ]); for s in stats { - let mem_pct = if s.memory_limit_bytes > 0 { - (s.memory_bytes as f64 / s.memory_limit_bytes as f64) * 100.0 - } else { - 0.0 - }; - table.add_row([ &s.short_id, &s.name, @@ -82,14 +136,55 @@ fn print_stats(stats: &[BoxStats]) { output::format_bytes(s.memory_bytes), output::format_bytes(s.memory_limit_bytes) ), - &format!("{:.1}%", mem_pct), + &format!("{:.1}%", s.mem_percent()), &s.pid.to_string(), + &format_io_usage(s.network_rx_bytes, s.network_tx_bytes), + &format_io_usage(s.block_read_bytes, s.block_write_bytes), ]); } println!("{table}"); } +fn print_stats_json(stats: &[BoxStats]) -> Result<(), serde_json::Error> { + let rows = stats.iter().map(stats_json).collect::>(); + println!("{}", serde_json::to_string(&rows)?); + Ok(()) +} + +fn stats_json(stats: &BoxStats) -> serde_json::Value { + serde_json::json!({ + "id": stats.id, + "short_id": stats.short_id, + "name": stats.name, + "status": stats.status, + "pid": stats.pid, + "cpus": stats.cpus, + "cpu_count": stats.cpus, + "cpu_percent": stats.cpu_percent, + "cpu_percent_scaled": stats.scaled_cpu_percent(), + "memory_bytes": stats.memory_bytes, + 
"memory_limit_bytes": stats.memory_limit_bytes, + "memory_percent": stats.mem_percent(), + "network_rx_bytes": stats.network_rx_bytes, + "network_tx_bytes": stats.network_tx_bytes, + "block_read_bytes": stats.block_read_bytes, + "block_write_bytes": stats.block_write_bytes, + "pids_current": stats.pids_current, + "pids": { + "current": stats.pids_current, + }, + }) +} + +fn format_io_usage(read_bytes: u64, write_bytes: u64) -> String { + format!( + "{} / {}", + output::format_bytes(read_bytes), + output::format_bytes(write_bytes) + ) +} + fn select_targets( state: &StateFile, query: Option<&str>, @@ -112,17 +207,198 @@ fn select_targets( fn build_box_stats(sys: &mut System, record: &BoxRecord) -> Option { let pid = record.pid?; let memory_limit_bytes = (record.memory_mb as u64) * 1024 * 1024; - collect_stats(sys, pid, record.memory_mb).map(|(cpu, mem)| BoxStats { + let network = collect_network_stats(record); + collect_stats(sys, pid).map(|stats| BoxStats { + id: record.id.clone(), name: record.name.clone(), short_id: record.short_id.clone(), status: record.status.clone(), pid, - cpu_percent: cpu, - memory_bytes: mem, + cpus: record.cpus, + cpu_percent: stats.cpu_percent, + memory_bytes: stats.memory_bytes, memory_limit_bytes, + network_rx_bytes: network.rx_bytes, + network_tx_bytes: network.tx_bytes, + block_read_bytes: stats.block_read_bytes, + block_write_bytes: stats.block_write_bytes, + pids_current: None, + }) +} + +async fn collect_pids_current(record: &BoxRecord) -> Option { + #[cfg(not(windows))] + { + tokio::time::timeout(PIDS_CURRENT_TIMEOUT, collect_guest_process_count(record)) + .await + .ok() + .flatten() + } + #[cfg(windows)] + { + let _ = record; + None + } +} + +#[cfg(not(windows))] +async fn collect_guest_process_count(record: &BoxRecord) -> Option { + let exec_socket_path = + crate::socket_paths::runtime_socket(record, crate::socket_paths::RuntimeSocket::Exec); + let client = ExecClient::connect(&exec_socket_path).await.ok()?; + let request = 
ExecRequest { + cmd: vec!["ps".to_string(), "-eo".to_string(), "pid,args".to_string()], + timeout_ns: DEFAULT_EXEC_TIMEOUT_NS, + env: vec![], + working_dir: None, + rootfs: None, + stdin: None, + stdin_streaming: false, + user: None, + streaming: false, + }; + let output = client.exec_command(&request).await.ok()?; + if output.exit_code != 0 { + return None; + } + Some(count_guest_processes(&String::from_utf8_lossy( + &output.stdout, + ))) +} + +#[cfg(not(windows))] +fn count_guest_processes(text: &str) -> u64 { + text.lines() + .filter_map(parse_guest_process_command) + .filter(|command| !is_process_count_probe(command)) + .count() as u64 +} + +#[cfg(not(windows))] +fn parse_guest_process_command(line: &str) -> Option<&str> { + let line = line.trim(); + if line.is_empty() { + return None; + } + let mut parts = line.splitn(2, char::is_whitespace); + let pid = parts.next()?; + pid.parse::().ok()?; + Some(parts.next().unwrap_or("").trim()) +} + +#[cfg(not(windows))] +fn is_process_count_probe(command: &str) -> bool { + let command = command.trim(); + command == "ps" + || command.starts_with("ps -eo pid,args") + || command.starts_with("/bin/ps -eo pid,args") + || command.starts_with("/usr/bin/ps -eo pid,args") +} + +fn collect_network_stats(record: &BoxRecord) -> NetworkStats { + read_network_stats_file(&record.box_dir.join("sockets").join("net.stats.json")) + .or_else(|| collect_passt_pcap_stats(record)) + .unwrap_or_default() +} + +fn read_network_stats_file(path: &Path) -> Option { + let data = std::fs::read_to_string(path).ok()?; + let json: serde_json::Value = serde_json::from_str(&data).ok()?; + Some(NetworkStats { + rx_bytes: json.get("rx_bytes")?.as_u64()?, + tx_bytes: json.get("tx_bytes")?.as_u64()?, }) } +fn collect_passt_pcap_stats(record: &BoxRecord) -> Option { + let pcap_path = record.exec_socket_path.parent()?.join("passt.pcap"); + let guest_mac = guest_mac_address(record)?; + read_passt_pcap_stats(&pcap_path, guest_mac) +} + +fn 
guest_mac_address(record: &BoxRecord) -> Option<[u8; 6]> { + let network_name = crate::cleanup::record_network_name(record)?; + let store = a3s_box_runtime::NetworkStore::default_path().ok()?; + let network = store.get(network_name).ok()??; + let endpoint = network.endpoints.get(&record.id)?; + parse_mac_address(&endpoint.mac_address) +} + +fn parse_mac_address(value: &str) -> Option<[u8; 6]> { + let mut mac = [0u8; 6]; + let mut parts = value.split(':'); + for byte in &mut mac { + *byte = u8::from_str_radix(parts.next()?, 16).ok()?; + } + parts.next().is_none().then_some(mac) +} + +fn read_passt_pcap_stats(path: &Path, guest_mac: [u8; 6]) -> Option { + let data = std::fs::read(path).ok()?; + parse_passt_pcap_stats(&data, guest_mac) +} + +fn parse_passt_pcap_stats(data: &[u8], guest_mac: [u8; 6]) -> Option { + let endian = PcapEndian::from_magic(data.get(..4)?)?; + if data.len() < 24 { + return None; + } + + let mut offset = 24; + let mut stats = NetworkStats::default(); + while offset + 16 <= data.len() { + let incl_len = endian.read_u32(data.get(offset + 8..offset + 12)?) as usize; + let orig_len = endian.read_u32(data.get(offset + 12..offset + 16)?) 
as u64; + offset += 16; + if offset + incl_len > data.len() { + break; + } + + let frame = &data[offset..offset + incl_len]; + if frame.len() >= 14 { + let frame_len = if orig_len == 0 { + incl_len as u64 + } else { + orig_len + }; + if frame[6..12] == guest_mac { + stats.tx_bytes = stats.tx_bytes.saturating_add(frame_len); + } else if frame[0..6] == guest_mac || frame[0..6] == [0xff; 6] { + stats.rx_bytes = stats.rx_bytes.saturating_add(frame_len); + } + } + + offset += incl_len; + } + + Some(stats) +} + +#[derive(Clone, Copy)] +enum PcapEndian { + Little, + Big, +} + +impl PcapEndian { + fn from_magic(bytes: &[u8]) -> Option { + match bytes { + [0xd4, 0xc3, 0xb2, 0xa1] | [0x4d, 0x3c, 0xb2, 0xa1] => Some(Self::Little), + [0xa1, 0xb2, 0xc3, 0xd4] | [0xa1, 0xb2, 0x3c, 0x4d] => Some(Self::Big), + _ => None, + } + } + + fn read_u32(self, bytes: &[u8]) -> u32 { + let mut arr = [0u8; 4]; + arr.copy_from_slice(&bytes[..4]); + match self { + Self::Little => u32::from_le_bytes(arr), + Self::Big => u32::from_be_bytes(arr), + } + } +} + pub async fn execute(args: StatsArgs) -> Result<(), Box> { let mut sys = System::new(); @@ -133,26 +409,36 @@ pub async fn execute(args: StatsArgs) -> Result<(), Box> let targets = select_targets(&state, args.r#box.as_deref())?; if targets.is_empty() { - println!("No active boxes"); + match args.format { + StatsFormat::Table => println!("No active boxes"), + StatsFormat::Json => println!("[]"), + } return Ok(()); } // Collect stats for each active box. 
let mut stats = Vec::new(); for record in &targets { - if let Some(box_stats) = build_box_stats(&mut sys, record) { + if let Some(mut box_stats) = build_box_stats(&mut sys, record) { + if args.format == StatsFormat::Json { + box_stats.pids_current = collect_pids_current(record).await; + } stats.push(box_stats); } } - // Clear screen for streaming mode (except first iteration) - if !args.no_stream { - // Use ANSI escape to move cursor to top and clear - print!("\x1B[2J\x1B[H"); + match args.format { + StatsFormat::Table => { + // Clear screen for streaming mode (except first iteration) + if !args.no_stream { + // Use ANSI escape to move cursor to top and clear + print!("\x1B[2J\x1B[H"); + } + print_stats(&stats); + } + StatsFormat::Json => print_stats_json(&stats)?, } - print_stats(&stats); - if args.no_stream { break; } @@ -202,4 +488,160 @@ mod tests { assert!(error.to_string().contains("Cannot show stats for")); assert!(error.to_string().contains("a3s-box start stopped_box")); } + + #[test] + fn test_format_io_usage_matches_stats_table_pair() { + assert_eq!(format_io_usage(1024, 2 * 1024 * 1024), "1.0 KB / 2.0 MB"); + } + + #[test] + fn test_read_network_stats_file_reads_netproxy_snapshot() { + let tmp = tempfile::tempdir().unwrap(); + let path = tmp.path().join("net.stats.json"); + std::fs::write( + &path, + r#"{"schema":"a3s-box.netproxy.stats.v1","rx_bytes":1024,"tx_bytes":2048,"rx_packets":3,"tx_packets":4}"#, + ) + .unwrap(); + + let stats = read_network_stats_file(&path).unwrap(); + + assert_eq!( + stats, + NetworkStats { + rx_bytes: 1024, + tx_bytes: 2048 + } + ); + } + + #[test] + fn test_stats_json_exposes_machine_readable_fields() { + let row = BoxStats { + id: "abcdef1234567890".to_string(), + name: "dev".to_string(), + short_id: "abc123".to_string(), + status: "running".to_string(), + pid: 4242, + cpus: 2, + cpu_percent: 12.5, + memory_bytes: 64 * 1024 * 1024, + memory_limit_bytes: 512 * 1024 * 1024, + network_rx_bytes: 1024, + network_tx_bytes: 2048, 
+ block_read_bytes: 4096, + block_write_bytes: 8192, + pids_current: Some(7), + }; + + let json = stats_json(&row); + + assert_eq!(json["id"], "abcdef1234567890"); + assert_eq!(json["short_id"], "abc123"); + assert_eq!(json["name"], "dev"); + assert_eq!(json["pid"], 4242); + assert_eq!(json["cpus"], 2); + assert_eq!(json["cpu_count"], 2); + assert_eq!(json["cpu_percent"], 12.5); + assert_eq!(json["cpu_percent_scaled"], 6.25); + assert_eq!(json["memory_bytes"], 64 * 1024 * 1024); + assert_eq!(json["memory_limit_bytes"], 512 * 1024 * 1024); + assert_eq!(json["memory_percent"], 12.5); + assert_eq!(json["network_rx_bytes"], 1024); + assert_eq!(json["network_tx_bytes"], 2048); + assert_eq!(json["block_read_bytes"], 4096); + assert_eq!(json["block_write_bytes"], 8192); + assert_eq!(json["pids_current"], 7); + assert_eq!(json["pids"]["current"], 7); + } + + #[cfg(not(windows))] + #[test] + fn test_count_guest_processes_ignores_probe_process() { + let count = count_guest_processes( + "PID COMMAND\n1 /sbin/init\n7 worker --serve\n9 ps -eo pid,args\n", + ); + + assert_eq!(count, 2); + } + + #[test] + fn test_parse_mac_address() { + assert_eq!( + parse_mac_address("02:42:0a:58:00:02"), + Some([0x02, 0x42, 0x0a, 0x58, 0x00, 0x02]) + ); + assert_eq!(parse_mac_address("02:42:0a:58:00"), None); + assert_eq!(parse_mac_address("02:42:0a:58:00:zz"), None); + } + + #[test] + fn test_parse_passt_pcap_stats_classifies_guest_rx_tx() { + let guest = [0x02, 0x42, 0x0a, 0x58, 0x00, 0x02]; + let gateway = [0x9a, 0x55, 0x9a, 0x55, 0x9a, 0x55]; + let other = [0x52, 0x54, 0x00, 0x12, 0x34, 0x56]; + let data = little_endian_pcap(&[ + ethernet_frame(gateway, guest, 60), + ethernet_frame(guest, gateway, 74), + ethernet_frame([0xff; 6], other, 90), + ]); + + let stats = parse_passt_pcap_stats(&data, guest).unwrap(); + + assert_eq!( + stats, + NetworkStats { + rx_bytes: 164, + tx_bytes: 60 + } + ); + } + + #[test] + fn test_parse_passt_pcap_stats_ignores_truncated_tail() { + let guest = [0x02, 
0x42, 0x0a, 0x58, 0x00, 0x02]; + let gateway = [0x9a, 0x55, 0x9a, 0x55, 0x9a, 0x55]; + let mut data = little_endian_pcap(&[ethernet_frame(guest, gateway, 64)]); + data.extend_from_slice(&[0, 0, 0, 0, 10, 0, 0]); + + let stats = parse_passt_pcap_stats(&data, guest).unwrap(); + + assert_eq!( + stats, + NetworkStats { + rx_bytes: 64, + tx_bytes: 0 + } + ); + } + + fn little_endian_pcap(frames: &[Vec]) -> Vec { + let mut data = Vec::new(); + data.extend_from_slice(&0xa1b2c3d4u32.to_le_bytes()); + data.extend_from_slice(&2u16.to_le_bytes()); + data.extend_from_slice(&4u16.to_le_bytes()); + data.extend_from_slice(&0i32.to_le_bytes()); + data.extend_from_slice(&0u32.to_le_bytes()); + data.extend_from_slice(&65535u32.to_le_bytes()); + data.extend_from_slice(&1u32.to_le_bytes()); + + for frame in frames { + data.extend_from_slice(&0u32.to_le_bytes()); + data.extend_from_slice(&0u32.to_le_bytes()); + data.extend_from_slice(&(frame.len() as u32).to_le_bytes()); + data.extend_from_slice(&(frame.len() as u32).to_le_bytes()); + data.extend_from_slice(frame); + } + + data + } + + fn ethernet_frame(dst: [u8; 6], src: [u8; 6], len: usize) -> Vec { + assert!(len >= 14); + let mut frame = vec![0u8; len]; + frame[..6].copy_from_slice(&dst); + frame[6..12].copy_from_slice(&src); + frame[12..14].copy_from_slice(&0x0800u16.to_be_bytes()); + frame + } } diff --git a/src/cli/src/commands/top.rs b/src/cli/src/commands/top.rs index 408d9edd..94525490 100644 --- a/src/cli/src/commands/top.rs +++ b/src/cli/src/commands/top.rs @@ -2,7 +2,8 @@ //! //! Convenience wrapper that runs `ps` inside the box via the exec channel. -use clap::Args; +use clap::{Args, ValueEnum}; +use serde::Serialize; #[cfg(not(windows))] use a3s_box_core::exec::{ExecRequest, DEFAULT_EXEC_TIMEOUT_NS}; @@ -17,17 +18,40 @@ use crate::state::StateFile; /// Default ps arguments when none are specified. 
#[cfg(not(windows))] const DEFAULT_PS_ARGS: &[&str] = &["aux"]; +#[cfg(not(windows))] +const JSON_PS_ARGS: &[&str] = &["-eo", "pid,ppid,pcpu,pmem,etime,args"]; #[derive(Args)] pub struct TopArgs { /// Box name or ID pub r#box: String, + /// Output format + #[arg(long, value_enum, default_value_t = TopFormat::Table)] + pub format: TopFormat, + /// Arguments to pass to ps (default: aux) #[arg(last = true)] pub ps_args: Vec, } +#[derive(Clone, Copy, Debug, PartialEq, Eq, ValueEnum)] +pub enum TopFormat { + Table, + Json, +} + +#[cfg(not(windows))] +#[derive(Debug, Clone, PartialEq, Serialize)] +struct TopProcess { + pid: String, + ppid: Option, + cpu_percent: Option, + memory_percent: Option, + elapsed: Option, + command: String, +} + #[cfg(windows)] pub async fn execute(_args: TopArgs) -> Result<(), Box> { Err(crate::platform::unsupported_command( @@ -48,7 +72,9 @@ pub async fn execute(args: TopArgs) -> Result<(), Box> { let client = ExecClient::connect(&exec_socket_path).await?; - let ps_args = if args.ps_args.is_empty() { + let ps_args = if args.ps_args.is_empty() && args.format == TopFormat::Json { + JSON_PS_ARGS.iter().map(|s| s.to_string()).collect() + } else if args.ps_args.is_empty() { DEFAULT_PS_ARGS.iter().map(|s| s.to_string()).collect() } else { args.ps_args @@ -71,11 +97,6 @@ pub async fn execute(args: TopArgs) -> Result<(), Box> { let output = client.exec_command(&request).await?; - if !output.stdout.is_empty() { - let stdout = String::from_utf8_lossy(&output.stdout); - print!("{stdout}"); - } - if !output.stderr.is_empty() { let stderr = String::from_utf8_lossy(&output.stderr); eprint!("{stderr}"); @@ -85,14 +106,106 @@ pub async fn execute(args: TopArgs) -> Result<(), Box> { std::process::exit(output.exit_code); } + let stdout = String::from_utf8_lossy(&output.stdout); + match args.format { + TopFormat::Table => print!("{stdout}"), + TopFormat::Json => print_top_json(&stdout)?, + } + + Ok(()) +} + +#[cfg(not(windows))] +fn print_top_json(stdout: &str) 
-> Result<(), serde_json::Error> { + let rows = parse_ps_table(stdout); + println!("{}", serde_json::to_string(&rows)?); Ok(()) } +#[cfg(not(windows))] +fn parse_ps_table(text: &str) -> Vec { + let mut lines = text.lines().filter(|line| !line.trim().is_empty()); + let Some(header) = lines.next() else { + return Vec::new(); + }; + let headers = header + .split_whitespace() + .map(|part| part.trim().to_ascii_uppercase()) + .collect::>(); + let pid_idx = headers.iter().position(|part| part == "PID"); + let ppid_idx = headers.iter().position(|part| part == "PPID"); + let cpu_idx = headers + .iter() + .position(|part| matches!(part.as_str(), "%CPU" | "PCPU" | "CPU%")); + let mem_idx = headers + .iter() + .position(|part| matches!(part.as_str(), "%MEM" | "PMEM" | "MEM%")); + let elapsed_idx = headers + .iter() + .position(|part| matches!(part.as_str(), "ELAPSED" | "ETIME" | "TIME")); + let command_idx = headers + .iter() + .position(|part| matches!(part.as_str(), "COMMAND" | "CMD" | "ARGS")); + + lines + .filter_map(|line| { + parse_ps_line( + line, + pid_idx?, + ppid_idx, + cpu_idx, + mem_idx, + elapsed_idx, + command_idx, + ) + }) + .collect() +} + +#[cfg(not(windows))] +fn parse_ps_line( + line: &str, + pid_idx: usize, + ppid_idx: Option, + cpu_idx: Option, + mem_idx: Option, + elapsed_idx: Option, + command_idx: Option, +) -> Option { + let parts = line.split_whitespace().collect::>(); + let pid = parts.get(pid_idx)?.to_string(); + Some(TopProcess { + pid, + ppid: ppid_idx + .and_then(|idx| parts.get(idx)) + .map(|part| (*part).to_string()), + cpu_percent: cpu_idx + .and_then(|idx| parts.get(idx)) + .and_then(|value| parse_percent(value)), + memory_percent: mem_idx + .and_then(|idx| parts.get(idx)) + .and_then(|value| parse_percent(value)), + elapsed: elapsed_idx + .and_then(|idx| parts.get(idx)) + .map(|part| (*part).to_string()), + command: command_idx + .and_then(|idx| (idx < parts.len()).then(|| parts[idx..].join(" "))) + .unwrap_or_else(|| "-".to_string()), + }) 
+} + +#[cfg(not(windows))] +fn parse_percent(value: &str) -> Option { + value.trim_end_matches('%').parse().ok() +} + /// Build the ps command from user-provided arguments or defaults. #[cfg(all(test, not(windows)))] -fn build_ps_command(ps_args: &[String]) -> Vec { +fn build_ps_command(format: TopFormat, ps_args: &[String]) -> Vec { let mut cmd = vec!["ps".to_string()]; - if ps_args.is_empty() { + if ps_args.is_empty() && format == TopFormat::Json { + cmd.extend(JSON_PS_ARGS.iter().map(|s| s.to_string())); + } else if ps_args.is_empty() { cmd.extend(DEFAULT_PS_ARGS.iter().map(|s| s.to_string())); } else { cmd.extend_from_slice(ps_args); @@ -106,21 +219,27 @@ mod tests { #[test] fn test_build_ps_command_default() { - let cmd = build_ps_command(&[]); + let cmd = build_ps_command(TopFormat::Table, &[]); assert_eq!(cmd, vec!["ps", "aux"]); } + #[test] + fn test_build_ps_command_json_default() { + let cmd = build_ps_command(TopFormat::Json, &[]); + assert_eq!(cmd, vec!["ps", "-eo", "pid,ppid,pcpu,pmem,etime,args"]); + } + #[test] fn test_build_ps_command_custom() { let args = vec!["-eo".to_string(), "pid,user,%cpu,%mem".to_string()]; - let cmd = build_ps_command(&args); + let cmd = build_ps_command(TopFormat::Table, &args); assert_eq!(cmd, vec!["ps", "-eo", "pid,user,%cpu,%mem"]); } #[test] fn test_build_ps_command_single_arg() { let args = vec!["-ef".to_string()]; - let cmd = build_ps_command(&args); + let cmd = build_ps_command(TopFormat::Table, &args); assert_eq!(cmd, vec!["ps", "-ef"]); } @@ -128,4 +247,33 @@ mod tests { fn test_default_ps_args_constant() { assert_eq!(DEFAULT_PS_ARGS, &["aux"]); } + + #[test] + fn parses_ps_table_for_json_output() { + let rows = parse_ps_table( + "PID PPID %CPU %MEM ELAPSED COMMAND\n1 0 0.0 0.1 02:00 /sbin/init\n42 1 1.5 0.3 00:01 node server.js\n", + ); + + assert_eq!(rows.len(), 2); + assert_eq!(rows[0].pid, "1"); + assert_eq!(rows[0].ppid.as_deref(), Some("0")); + assert_eq!(rows[0].cpu_percent, Some(0.0)); + 
assert_eq!(rows[0].memory_percent, Some(0.1)); + assert_eq!(rows[0].elapsed.as_deref(), Some("02:00")); + assert_eq!(rows[1].command, "node server.js"); + } + + #[test] + fn parses_aux_style_table_for_json_output() { + let rows = + parse_ps_table("USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND\nroot 7 2.5 0.4 1 2 ? S 10:00 00:01 worker --serve\n"); + + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].pid, "7"); + assert_eq!(rows[0].ppid, None); + assert_eq!(rows[0].cpu_percent, Some(2.5)); + assert_eq!(rows[0].memory_percent, Some(0.4)); + assert_eq!(rows[0].elapsed.as_deref(), Some("00:01")); + assert_eq!(rows[0].command, "worker --serve"); + } } diff --git a/src/core/src/vmm.rs b/src/core/src/vmm.rs index 437fe728..6ef681a5 100644 --- a/src/core/src/vmm.rs +++ b/src/core/src/vmm.rs @@ -59,6 +59,10 @@ pub struct NetworkInstanceConfig { /// Path to the network backend Unix socket (passt on Linux, gvproxy on macOS). pub net_socket_path: PathBuf, + /// Optional JSON stats file written by the userspace network backend. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub net_stats_path: Option, + /// Pre-opened Unix datagram socket fd inherited by the shim on macOS. 
#[cfg(target_os = "macos")] #[serde(default)] @@ -498,6 +502,7 @@ mod tests { let spec = InstanceSpec { network: Some(NetworkInstanceConfig { net_socket_path: PathBuf::from("/tmp/net.sock"), + net_stats_path: Some(PathBuf::from("/tmp/net.stats.json")), #[cfg(target_os = "macos")] net_socket_fd: Some(42), #[cfg(target_os = "macos")] @@ -515,6 +520,10 @@ mod tests { let deserialized: InstanceSpec = serde_json::from_str(&json).unwrap(); let net = deserialized.network.unwrap(); + assert_eq!( + net.net_stats_path, + Some(PathBuf::from("/tmp/net.stats.json")) + ); #[cfg(target_os = "macos")] assert_eq!(net.net_socket_fd, Some(42)); #[cfg(target_os = "macos")] diff --git a/src/guest/init/src/main.rs b/src/guest/init/src/main.rs index e0a219ee..ffe63583 100644 --- a/src/guest/init/src/main.rs +++ b/src/guest/init/src/main.rs @@ -295,14 +295,36 @@ impl ExecConfig { .map(&decode) .filter(|u| !u.is_empty()); - // Collect BOX_EXEC_ENV_* variables (values decoded as above). - let env: Vec<(String, String)> = std::env::vars() + // Collect BOX_EXEC_ENV_* variables (values decoded as above). Skip + // BOX_EXEC_ENV_FILE — it's the pointer to the staged env file, not a + // container variable. Kept for backward compatibility with a runtime that + // still passes container env inline. + let mut env: Vec<(String, String)> = std::env::vars() .filter_map(|(key, value)| { key.strip_prefix("BOX_EXEC_ENV_") + .filter(|stripped| *stripped != "FILE") .map(|stripped| (stripped.to_string(), decode(value))) }) .collect(); + // Bulk container env is staged in a file (runtime/src/vm/spec.rs): K8s + // injects ~150 service env vars, which overflow the guest kernel cmdline + // if passed inline, so the runtime writes them to a file and points here. + // Each line is `KEY=base64(value)`; the key may itself contain `=`-free + // bytes only (env names are a safe charset), so split on the first `=`. 
+ if let Ok(path) = std::env::var("BOX_EXEC_ENV_FILE") { + match std::fs::read_to_string(&path) { + Ok(contents) => { + for line in contents.lines() { + if let Some((k, v)) = line.split_once('=') { + env.push((k.to_string(), decode(v.to_string()))); + } + } + } + Err(e) => eprintln!("init.krun: failed to read BOX_EXEC_ENV_FILE {path}: {e}"), + } + } + Self { executable, args, diff --git a/src/netproxy/Cargo.toml b/src/netproxy/Cargo.toml index 37187314..10184449 100644 --- a/src/netproxy/Cargo.toml +++ b/src/netproxy/Cargo.toml @@ -26,4 +26,5 @@ smoltcp = { version = "0.11", default-features = false, features = [ ] } [dev-dependencies] +serde_json = { workspace = true } tempfile = { workspace = true } diff --git a/src/netproxy/src/lib.rs b/src/netproxy/src/lib.rs index ec73fd73..011de059 100644 --- a/src/netproxy/src/lib.rs +++ b/src/netproxy/src/lib.rs @@ -19,7 +19,7 @@ use std::os::fd::{FromRawFd, IntoRawFd, RawFd}; use std::os::unix::net::UnixDatagram; use std::path::{Path, PathBuf}; use std::sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }; use std::time::Duration; @@ -55,6 +55,44 @@ const GATEWAY_MAC: EthernetAddress = EthernetAddress([0x02, 0x00, 0x00, 0x00, 0x const MAX_FRAME: usize = 1514; /// Ephemeral port range start for outbound TCP connections from the gateway. const EPHEMERAL_BASE: u16 = 49152; +/// How often the proxy refreshes its stats file. 
+const STATS_WRITE_INTERVAL: Duration = Duration::from_secs(1); + +#[derive(Default)] +struct NetStats { + rx_bytes: AtomicU64, + tx_bytes: AtomicU64, + rx_packets: AtomicU64, + tx_packets: AtomicU64, +} + +struct NetStatsSnapshot { + rx_bytes: u64, + tx_bytes: u64, + rx_packets: u64, + tx_packets: u64, +} + +impl NetStats { + fn record_rx(&self, bytes: usize) { + self.rx_bytes.fetch_add(bytes as u64, Ordering::Relaxed); + self.rx_packets.fetch_add(1, Ordering::Relaxed); + } + + fn record_tx(&self, bytes: usize) { + self.tx_bytes.fetch_add(bytes as u64, Ordering::Relaxed); + self.tx_packets.fetch_add(1, Ordering::Relaxed); + } + + fn snapshot(&self) -> NetStatsSnapshot { + NetStatsSnapshot { + rx_bytes: self.rx_bytes.load(Ordering::Relaxed), + tx_bytes: self.tx_bytes.load(Ordering::Relaxed), + rx_packets: self.rx_packets.load(Ordering::Relaxed), + tx_packets: self.tx_packets.load(Ordering::Relaxed), + } + } +} // ── smoltcp phy::Device ─────────────────────────────────────────────────────── @@ -71,6 +109,7 @@ const EPHEMERAL_BASE: u16 = 49152; struct UnixgramDevice { socket: UnixDatagram, rx_queue: VecDeque>, + stats: Arc, } impl UnixgramDevice { @@ -84,6 +123,7 @@ impl UnixgramDevice { bytes = n, "NetProxy received ethernet frame from guest/libkrun" ); + self.stats.record_tx(n); self.rx_queue.push_back(buf[..n].to_vec()) } Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, @@ -114,6 +154,7 @@ impl smoltcp::phy::RxToken for OwnedRxToken { /// an explicit destination address. 
struct TxToken { socket: UnixDatagram, + stats: Arc, } impl smoltcp::phy::TxToken for TxToken { @@ -129,6 +170,8 @@ impl smoltcp::phy::TxToken for TxToken { ); if let Err(e) = self.socket.send(&buf) { tracing::warn!(error = %e, len, "NetProxy: send to libkrun failed"); + } else { + self.stats.record_rx(len); } result } @@ -148,6 +191,7 @@ impl smoltcp::phy::Device for UnixgramDevice { let frame = self.rx_queue.pop_front()?; let tx = TxToken { socket: self.socket.try_clone().ok()?, + stats: Arc::clone(&self.stats), }; Some((OwnedRxToken(frame), tx)) } @@ -155,6 +199,7 @@ impl smoltcp::phy::Device for UnixgramDevice { fn transmit(&mut self, _ts: Instant) -> Option> { Some(TxToken { socket: self.socket.try_clone().ok()?, + stats: Arc::clone(&self.stats), }) } @@ -190,6 +235,9 @@ struct ProxyEngine { port_forwards: Vec, next_ephemeral: u16, shutdown: Arc, + stats: Arc, + stats_path: Option, + last_stats_write: std::time::Instant, } impl ProxyEngine { @@ -200,10 +248,13 @@ impl ProxyEngine { dns_servers: Vec, port_forwards: Vec, shutdown: Arc, + stats: Arc, + stats_path: Option, ) -> Self { let mut device = UnixgramDevice { socket, rx_queue: VecDeque::new(), + stats: Arc::clone(&stats), }; // Configure smoltcp interface as the gateway. @@ -232,10 +283,14 @@ impl ProxyEngine { port_forwards, next_ephemeral: EPHEMERAL_BASE, shutdown, + stats, + stats_path, + last_stats_write: std::time::Instant::now(), } } fn run(&mut self) { + self.write_stats_snapshot(); loop { if self.shutdown.load(Ordering::Relaxed) { break; @@ -264,13 +319,34 @@ impl ProxyEngine { // 7. Remove closed connections and release their smoltcp sockets. self.cleanup(); - // 8. Sleep until the next smoltcp event or at most 5 ms. + // 8. Publish resource counters for `a3s-box stats`. + self.maybe_write_stats_snapshot(); + + // 9. Sleep until the next smoltcp event or at most 5 ms. 
let delay = self .iface .poll_delay(now, &self.sockets) .unwrap_or(smoltcp::time::Duration::from_millis(1)); std::thread::sleep(Duration::from_micros(delay.micros().min(5_000))); } + self.write_stats_snapshot(); + } + + fn maybe_write_stats_snapshot(&mut self) { + if self.last_stats_write.elapsed() < STATS_WRITE_INTERVAL { + return; + } + self.last_stats_write = std::time::Instant::now(); + self.write_stats_snapshot(); + } + + fn write_stats_snapshot(&self) { + let Some(path) = self.stats_path.as_deref() else { + return; + }; + if let Err(e) = write_stats_file(path, self.stats.snapshot()) { + tracing::debug!(error = %e, path = %path.display(), "NetProxy: failed to write stats file"); + } } // ── Accept new host connections ─────────────────────────────────────────── @@ -472,6 +548,7 @@ impl ProxyEngine { /// Drop calls `stop()` automatically. pub struct NetProxyManager { socket_path: PathBuf, + stats_path: PathBuf, net_socket_fd: Option, net_proxy_fd: Option, } @@ -480,8 +557,10 @@ impl NetProxyManager { /// Create a new manager. Socket will be placed at /// `~/.a3s/boxes//sockets/net.sock`. 
pub fn new(box_dir: &Path) -> Self { + let socket_dir = box_dir.join("sockets"); Self { - socket_path: box_dir.join("sockets").join("net.sock"), + socket_path: socket_dir.join("net.sock"), + stats_path: socket_dir.join("net.stats.json"), net_socket_fd: None, net_proxy_fd: None, } @@ -491,6 +570,10 @@ impl NetProxyManager { &self.socket_path } + pub fn stats_path(&self) -> &Path { + &self.stats_path + } + pub fn net_socket_fd(&self) -> Option { self.net_socket_fd } @@ -529,6 +612,7 @@ impl NetProxyManager { } } std::fs::remove_file(&self.socket_path).ok(); + std::fs::remove_file(&self.stats_path).ok(); } pub fn is_running(&mut self) -> bool { @@ -551,17 +635,19 @@ pub fn spawn_inherited_netproxy( prefix_len: u8, dns_servers: &[Ipv4Addr], port_map: &[String], + stats_path: Option, ) -> Result<()> { let socket = unsafe { UnixDatagram::from_raw_fd(fd) }; let port_forwards = parse_port_forwards(port_map, guest_ip) .map_err(|e| BoxError::NetworkError(format!("invalid port_map: {}", e)))?; let dns_servers = dns_servers.to_vec(); let shutdown = Arc::new(AtomicBool::new(false)); + let stats = Arc::new(NetStats::default()); std::thread::Builder::new() .name("a3s-netproxy".to_string()) .spawn(move || { - tracing::info!(fd, gateway = %gateway, guest_ip = %guest_ip, "NetProxy thread started"); + tracing::info!(fd, gateway = %gateway, guest_ip = %guest_ip, stats = ?stats_path, "NetProxy thread started"); if let Err(e) = socket.set_nonblocking(true) { tracing::error!(error = %e, "NetProxy: set_nonblocking failed"); return; @@ -574,6 +660,8 @@ pub fn spawn_inherited_netproxy( dns_servers, port_forwards, shutdown, + stats, + stats_path, ); engine.run(); tracing::info!("NetProxy thread exiting"); @@ -597,6 +685,23 @@ fn socketpair_unixgram() -> Result<(UnixDatagram, RawFd)> { Ok((proxy_socket, fds[1])) } +fn write_stats_file(path: &Path, stats: NetStatsSnapshot) -> io::Result<()> { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent)?; + } + let updated_at_ms = 
std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + let body = format!( + "{{\"schema\":\"a3s-box.netproxy.stats.v1\",\"rx_bytes\":{},\"tx_bytes\":{},\"rx_packets\":{},\"tx_packets\":{},\"updated_at_ms\":{}}}\n", + stats.rx_bytes, stats.tx_bytes, stats.rx_packets, stats.tx_packets, updated_at_ms + ); + let tmp = path.with_extension("json.tmp"); + std::fs::write(&tmp, body)?; + std::fs::rename(tmp, path) +} + /// Parse `["8088:80", "443:443"]` into `Vec`. /// /// Each rule maps `host_port → guest_ip:guest_port`. Guest IP is always the @@ -724,6 +829,10 @@ mod tests { mgr.socket_path(), dir.path().join("sockets").join("net.sock") ); + assert_eq!( + mgr.stats_path(), + dir.path().join("sockets").join("net.stats.json") + ); assert_eq!(mgr.net_socket_fd(), None); } @@ -755,6 +864,31 @@ mod tests { assert!(!socket_path.exists()); } + #[test] + fn test_write_stats_file_writes_json_snapshot() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("sockets").join("net.stats.json"); + + write_stats_file( + &path, + NetStatsSnapshot { + rx_bytes: 1024, + tx_bytes: 2048, + rx_packets: 3, + tx_packets: 4, + }, + ) + .unwrap(); + + let json: serde_json::Value = + serde_json::from_str(&std::fs::read_to_string(path).unwrap()).unwrap(); + assert_eq!(json["schema"], "a3s-box.netproxy.stats.v1"); + assert_eq!(json["rx_bytes"], 1024); + assert_eq!(json["tx_bytes"], 2048); + assert_eq!(json["rx_packets"], 3); + assert_eq!(json["tx_packets"], 4); + } + #[test] fn test_parse_port_forwards_valid() { let guest = Ipv4Addr::new(10, 89, 0, 2); diff --git a/src/runtime/src/network/passt.rs b/src/runtime/src/network/passt.rs index f965f59f..fb8f6bc2 100644 --- a/src/runtime/src/network/passt.rs +++ b/src/runtime/src/network/passt.rs @@ -14,6 +14,8 @@ use std::process::{Child, Command}; pub struct PasstManager { /// Path to the passt Unix socket. socket_path: PathBuf, + /// Path to passt's guest-side packet capture. 
+ pcap_path: PathBuf, /// Child process handle (None if not started). child: Option, /// PID file path for the passt process. @@ -35,6 +37,7 @@ impl PasstManager { pub fn new(socket_dir: &Path) -> Self { Self { socket_path: socket_dir.join("passt.sock"), + pcap_path: socket_dir.join("passt.pcap"), pid_file: socket_dir.join("passt.pid"), child: None, } @@ -45,6 +48,11 @@ impl PasstManager { &self.socket_path } + /// Get the passt packet capture path. + pub fn pcap_path(&self) -> &Path { + &self.pcap_path + } + /// Spawn the passt daemon. /// /// Configures passt with: @@ -96,12 +104,17 @@ impl PasstManager { if self.socket_path.exists() { std::fs::remove_file(&self.socket_path).ok(); } + if self.pcap_path.exists() { + std::fs::remove_file(&self.pcap_path).ok(); + } let mut cmd = Command::new("passt"); cmd.arg("--socket") .arg(&self.socket_path) .arg("--pid") .arg(&self.pid_file) + .arg("--pcap") + .arg(&self.pcap_path) // Run in foreground (we manage the process) .arg("--foreground") // Configure the network @@ -248,6 +261,7 @@ impl PasstManager { // Clean up socket and PID file std::fs::remove_file(&self.socket_path).ok(); + std::fs::remove_file(&self.pcap_path).ok(); std::fs::remove_file(&self.pid_file).ok(); } @@ -295,6 +309,7 @@ pub fn terminate_passt(socket_dir: &Path) { } let _ = std::fs::remove_file(&pid_file); let _ = std::fs::remove_file(socket_dir.join("passt.sock")); + let _ = std::fs::remove_file(socket_dir.join("passt.pcap")); } /// Best-effort check that `pid` is actually a passt process, to avoid SIGTERM-ing @@ -354,6 +369,7 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let mgr = PasstManager::new(dir.path()); assert_eq!(mgr.socket_path(), dir.path().join("passt.sock")); + assert_eq!(mgr.pcap_path(), dir.path().join("passt.pcap")); } #[test] @@ -378,6 +394,7 @@ mod tests { let box_dir = dir.path().join("boxes").join("test-box-id"); let mgr = PasstManager::new(&box_dir); assert_eq!(mgr.socket_path(), box_dir.join("passt.sock")); + 
assert_eq!(mgr.pcap_path(), box_dir.join("passt.pcap")); } #[test] @@ -385,14 +402,17 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let socket_path = dir.path().join("passt.sock"); let pid_path = dir.path().join("passt.pid"); + let pcap_path = dir.path().join("passt.pcap"); // A non-existent PID so the SIGTERM is a harmless no-op (ESRCH). std::fs::write(&socket_path, "fake").unwrap(); std::fs::write(&pid_path, "2147483647").unwrap(); + std::fs::write(&pcap_path, "fake pcap").unwrap(); terminate_passt(dir.path()); assert!(!socket_path.exists()); assert!(!pid_path.exists()); + assert!(!pcap_path.exists()); } } diff --git a/src/runtime/src/vm/network.rs b/src/runtime/src/vm/network.rs index cf77c5ae..d0168d67 100644 --- a/src/runtime/src/vm/network.rs +++ b/src/runtime/src/vm/network.rs @@ -98,7 +98,7 @@ impl VmManager { let box_dir = self.home_dir.join("boxes").join(&self.box_id); #[cfg(target_os = "linux")] - let (socket_path, _net_socket_fd, _net_proxy_fd) = { + let (socket_path, net_stats_path, _net_socket_fd, _net_proxy_fd) = { // passt drops privileges to `nobody` when launched as root, so its // socket must live in the world-traversable runtime socket directory // (next to the exec/PTY sockets), not under the box's 0700 home. 
@@ -108,23 +108,25 @@ impl VmManager { let path = passt.socket_path().to_path_buf(); self.net_manager = Some(Box::new(passt)); tracing::info!(network = network_name, ip = %ip, gateway = %gateway, "Bridge networking configured via passt"); - (path, None::, None::) + (path, None, None::, None::) }; #[cfg(target_os = "macos")] - let (socket_path, net_socket_fd, net_proxy_fd) = { + let (socket_path, net_stats_path, net_socket_fd, net_proxy_fd) = { let mut netproxy = crate::network::NetProxyManager::new(&box_dir); netproxy.spawn(ip, gateway, prefix_len, &dns_servers, &self.config.port_map)?; let fd = netproxy.net_socket_fd(); let proxy_fd = netproxy.net_proxy_fd(); let path = netproxy.socket_path().to_path_buf(); + let stats_path = netproxy.stats_path().to_path_buf(); self.net_manager = Some(Box::new(netproxy)); tracing::info!(network = network_name, ip = %ip, gateway = %gateway, "Bridge networking configured via built-in netproxy"); - (path, fd, proxy_fd) + (path, Some(stats_path), fd, proxy_fd) }; Ok(NetworkInstanceConfig { net_socket_path: socket_path, + net_stats_path, #[cfg(target_os = "macos")] net_socket_fd, #[cfg(target_os = "macos")] diff --git a/src/runtime/src/vm/spec.rs b/src/runtime/src/vm/spec.rs index 67225bd1..60ed42f0 100644 --- a/src/runtime/src/vm/spec.rs +++ b/src/runtime/src/vm/spec.rs @@ -174,12 +174,30 @@ impl VmManager { env.push(("BOX_EXEC_USER".to_string(), b64(user))); } - // Pass container environment variables with BOX_EXEC_ENV_ prefix (values - // base64-encoded like the rest, so a value containing `"`/spaces/etc. - // reaches the container intact). The key stays raw — env names are - // restricted to a safe character set. - for (key, value) in container_env { - env.push((format!("BOX_EXEC_ENV_{}", key), b64(&value))); + // Container environment variables. Values are base64-encoded like the + // rest (so `"`/spaces/etc. survive); the key stays raw (env names are a + // safe charset). 
These are staged in a FILE in the guest rootfs rather + // than passed inline: Kubernetes injects ~150 service env vars per pod + // (one set per Service via enableServiceLinks), and inline they bloat + // the env block libkrun packs into the guest kernel cmdline, overflow + // COMMAND_LINE_SIZE, and the guest silently fails to boot. Guest-init + // reads the file via the BOX_EXEC_ENV_FILE pointer below; only that + // small pointer rides the cmdline. Each line is `KEY=base64(value)`. + let env_file_body: String = container_env + .iter() + .map(|(key, value)| format!("{}={}\n", key, b64(value))) + .collect(); + if !env_file_body.is_empty() { + let host_path = layout.rootfs_path.join(".a3s-box-env"); + std::fs::write(&host_path, env_file_body).map_err(|e| BoxError::BoxBootError { + message: format!( + "failed to stage guest env file {}: {}", + host_path.display(), + e + ), + hint: None, + })?; + env.push(("BOX_EXEC_ENV_FILE".to_string(), "/.a3s-box-env".to_string())); } // Pass user volume mounts to guest init for mounting inside the VM. @@ -1324,26 +1342,31 @@ mod tests { let spec = vm.build_instance_spec(&layout).unwrap(); + // Container env is staged in a file in the rootfs, not inlined: only a + // small BOX_EXEC_ENV_FILE pointer rides the env block (cmdline overflow). assert!(spec .entrypoint .env .iter() - .any(|(key, value)| key == "BOX_EXEC_ENV_FOO" && b64d(value) == "cli")); - assert!(spec - .entrypoint - .env - .iter() - .any(|(key, value)| key == "BOX_EXEC_ENV_BAR" && b64d(value) == "image")); - assert!(spec - .entrypoint - .env - .iter() - .any(|(key, value)| key == "BOX_EXEC_ENV_BAZ" && b64d(value) == "cli")); + .any(|(key, value)| key == "BOX_EXEC_ENV_FILE" && value == "/.a3s-box-env")); + // No raw container env keys leak into the inline env block. 
assert!(!spec .entrypoint .env .iter() - .any(|(key, _)| key == "FOO" || key == "BAZ")); + .any(|(key, _)| key == "FOO" || key == "BAR" || key == "BAZ")); + + // The staged file holds one `KEY=base64(value)` line per var, with the + // CLI extra_env overriding the image's env (FOO/BAZ from cli, BAR from image). + let staged = std::fs::read_to_string(layout.rootfs_path.join(".a3s-box-env")).unwrap(); + let entries: std::collections::HashMap<&str, String> = staged + .lines() + .filter_map(|l| l.split_once('=')) + .map(|(k, v)| (k, b64d(v))) + .collect(); + assert_eq!(entries.get("FOO").map(String::as_str), Some("cli")); + assert_eq!(entries.get("BAR").map(String::as_str), Some("image")); + assert_eq!(entries.get("BAZ").map(String::as_str), Some("cli")); } #[test] diff --git a/src/shim/src/main.rs b/src/shim/src/main.rs index 168c7c3e..3c8256e9 100644 --- a/src/shim/src/main.rs +++ b/src/shim/src/main.rs @@ -666,6 +666,7 @@ unsafe fn configure_and_start_vm(spec: &InstanceSpec) -> Result<()> { net_config.prefix_len, &net_config.dns_servers, &spec.port_map, + net_config.net_stats_path.clone(), )?; } log_inherited_net_fd(fd); @@ -1030,6 +1031,7 @@ mod tests { fn test_network_config() -> a3s_box_core::vmm::NetworkInstanceConfig { a3s_box_core::vmm::NetworkInstanceConfig { net_socket_path: std::path::PathBuf::from("/tmp/a3s-box-test-net.sock"), + net_stats_path: Some(std::path::PathBuf::from("/tmp/a3s-box-test-net.stats.json")), #[cfg(target_os = "macos")] net_socket_fd: Some(42), #[cfg(target_os = "macos")] From a8e60cde6a4aabb73d9481f599adeb8e24a8b051 Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Sat, 27 Jun 2026 10:37:03 +0800 Subject: [PATCH 3/6] docs(k8s): one-click RuntimeClass node installer + README MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit deploy/scripts/install-runtimeclass.sh provisions a node to run RuntimeClass=a3s-box pods: installs the a3s-box CLI + helpers + libkrun and the containerd runtime-v2 shim 
(containerd-shim-a3s-box-v2), registers the io.containerd.a3s-box.v2 runtime via an /etc/containerd/conf.d drop-in, restarts containerd, and warms the one-time per-node boot cache so the first pod boots fast. Download (default) and --from-dir (air-gapped) modes; idempotent. README: document the deploy flow — create the RuntimeClass, run the installer on each node, label the node (a3s-box.io/runtime=true), run a pod. Validated end-to-end on a 5-node cluster: installer ran on every worker, each then started an a3s-box pod and served `kubectl exec` on the first try. Note: v2.6.0's release tarball ships the CLI/libkrun but not the containerd shim (it is a separate cargo project release.yml does not build); the shim is published as the release asset containerd-shim-a3s-box-v2-linux-x86_64. Follow-up: have release.yml build + attach it so future releases are self-contained. --- README.md | 73 ++++++++++++ deploy/scripts/install-runtimeclass.sh | 159 +++++++++++++++++++++++++ 2 files changed, 232 insertions(+) create mode 100755 deploy/scripts/install-runtimeclass.sh diff --git a/README.md b/README.md index dde3ed8d..acc1a767 100644 --- a/README.md +++ b/README.md @@ -347,6 +347,79 @@ helm install a3s-box deploy/helm/a3s-box/ -n a3s-box-system --create-namespace Windows CRI is intentionally unsupported. +## Deploy as a Kubernetes RuntimeClass + +Run selected pods as a3s-box MicroVMs by setting `runtimeClassName: a3s-box`. Each +pod's containers become libkrun MicroVMs under containerd, and `kubectl exec` works +against them. This is opt-in per node — a node must have the runtime installed **and** +carry the label `a3s-box.io/runtime=true` before a3s-box pods schedule there. + +**1. Create the RuntimeClass (once per cluster):** + +```bash +kubectl apply -f - <<'EOF' +apiVersion: node.k8s.io/v1 +kind: RuntimeClass +metadata: + name: a3s-box +handler: a3s-box +scheduling: + nodeSelector: + a3s-box.io/runtime: "true" # only labeled nodes run a3s-box pods +EOF +``` + +**2. 
Provision each node** — run the installer as root on every node that should +host a3s-box workloads. It installs the a3s-box CLI + helpers, libkrun, and the +containerd runtime-v2 shim (`containerd-shim-a3s-box-v2`), registers the +`io.containerd.a3s-box.v2` runtime via an `/etc/containerd/conf.d` drop-in, restarts +containerd, and warms the one-time per-node boot cache. Requires containerd ≥ 2.0 +and `/dev/kvm`. + +```bash +# one-liner (downloads the pinned release artifacts from GitHub): +curl -fsSL https://raw.githubusercontent.com/AI45Lab/Box/main/deploy/scripts/install-runtimeclass.sh | sudo bash + +# or from a checkout: +sudo deploy/scripts/install-runtimeclass.sh # default version +sudo deploy/scripts/install-runtimeclass.sh --version v2.6.0 # pin a version +``` + +Then label the node from a machine with `kubectl`: + +```bash +kubectl label node <node-name> a3s-box.io/runtime=true +``` + +Notes: +- **Control-plane nodes** carry a `NoSchedule` taint and are normally excluded — + leave them unlabeled unless you intentionally run workloads there. +- The installer warms up with `busybox:latest` so the *first* pod boots fast (the + first box on a fresh node builds a one-time cache that can exceed the shim's boot + window). Use `--warmup-image <image>` to point at a mirror, or `--no-warmup` to skip. +- **Air-gapped:** pre-stage the release tarball + (`a3s-box-<version>-linux-<arch>.tar.gz`) and `containerd-shim-a3s-box-v2-linux-<arch>` + in a directory and pass `--from-dir <dir>` (no network needed). + +**3. 
Run a pod:** + +```bash +kubectl apply -f - <<'EOF' +apiVersion: v1 +kind: Pod +metadata: + name: hello-a3s-box +spec: + runtimeClassName: a3s-box + containers: + - name: app + image: busybox:latest + command: ["sleep", "3600"] +EOF + +kubectl exec hello-a3s-box -- sh -c 'echo "hello from $(hostname)"; uname -m' +``` + ## Architecture ```text diff --git a/deploy/scripts/install-runtimeclass.sh b/deploy/scripts/install-runtimeclass.sh new file mode 100755 index 00000000..bd88dacd --- /dev/null +++ b/deploy/scripts/install-runtimeclass.sh @@ -0,0 +1,159 @@ +#!/usr/bin/env bash +# +# install-runtimeclass.sh — provision a Kubernetes node to run RuntimeClass=a3s-box pods. +# +# Run as root ON each node that should host a3s-box MicroVM workloads. It installs: +# * the a3s-box CLI + helpers (a3s-box, a3s-box-cri, a3s-box-guest-init, a3s-box-shim) +# * libkrun / libkrunfw (the MicroVM VMM, into the system lib dir + ldconfig) +# * the containerd runtime-v2 shim (containerd-shim-a3s-box-v2) +# and registers the `io.containerd.a3s-box.v2` runtime with containerd, then restarts it. +# +# After running this on a node, label it from a control-plane so the RuntimeClass +# nodeSelector (a3s-box.io/runtime=true) lets a3s-box pods schedule there: +# +# kubectl label node <node-name> a3s-box.io/runtime=true +# +# Usage: +# install-runtimeclass.sh [--version vX.Y.Z] [--repo OWNER/REPO] [--from-dir DIR] +# +# --version release tag to install (default: v2.6.0) +# --repo GitHub repo to download artifacts from (default: AI45Lab/Box) +# --from-dir install from a local directory instead of downloading; the dir must +# contain a3s-box-<version>-linux-<arch>.tar.gz and a containerd shim +# binary (containerd-shim-a3s-box-v2[-linux-<arch>]). +# +# Idempotent: safe to re-run (re-installs binaries, rewrites the containerd drop-in). 
+set -euo pipefail + +VERSION="v2.6.0" +REPO="AI45Lab/Box" +FROM_DIR="" +WARMUP_IMAGE="busybox:latest" # first box on a fresh node builds a one-time cache + # (~40s+); booting one here primes it so the first + # real pod doesn't exceed the shim's boot poll. Best + # effort — skipped silently if the image can't pull. + +while [ $# -gt 0 ]; do + case "$1" in + --version) VERSION="$2"; shift 2 ;; + --repo) REPO="$2"; shift 2 ;; + --from-dir) FROM_DIR="$2"; shift 2 ;; + --warmup-image) WARMUP_IMAGE="$2"; shift 2 ;; + --no-warmup) WARMUP_IMAGE=""; shift ;; + -h|--help) sed -n '2,33p' "$0"; exit 0 ;; + *) echo "unknown arg: $1" >&2; exit 2 ;; + esac +done + +log() { printf '\033[1;34m==>\033[0m %s\n' "$*"; } +die() { printf '\033[1;31mERROR:\033[0m %s\n' "$*" >&2; exit 1; } + +# ── preflight ─────────────────────────────────────────────────────────────── +[ "$(id -u)" = 0 ] || die "must run as root" +command -v containerd >/dev/null || die "containerd not found on this node" +[ -e /dev/kvm ] || die "/dev/kvm missing — this node has no KVM virtualization; a3s-box cannot run here" + +case "$(uname -m)" in + x86_64) ARCH=x86_64 ;; + aarch64|arm64) ARCH=arm64 ;; + *) die "unsupported architecture: $(uname -m)" ;; +esac + +TARBALL="a3s-box-${VERSION}-linux-${ARCH}.tar.gz" +SHIM_ASSET="containerd-shim-a3s-box-v2-linux-${ARCH}" + +work="$(mktemp -d)" +trap 'rm -rf "$work"' EXIT + +# ── obtain artifacts ──────────────────────────────────────────────────────── +if [ -n "$FROM_DIR" ]; then + log "Installing from local dir: $FROM_DIR" + [ -f "$FROM_DIR/$TARBALL" ] || die "missing $TARBALL in $FROM_DIR" + cp "$FROM_DIR/$TARBALL" "$work/$TARBALL" + if [ -f "$FROM_DIR/$SHIM_ASSET" ]; then cp "$FROM_DIR/$SHIM_ASSET" "$work/shim" + elif [ -f "$FROM_DIR/containerd-shim-a3s-box-v2" ]; then cp "$FROM_DIR/containerd-shim-a3s-box-v2" "$work/shim" + else die "missing containerd-shim-a3s-box-v2 in $FROM_DIR"; fi +else + base="https://github.com/${REPO}/releases/download/${VERSION}" + log 
"Downloading $TARBALL" + curl -fsSL "$base/$TARBALL" -o "$work/$TARBALL" || die "download failed: $base/$TARBALL" + log "Downloading $SHIM_ASSET" + curl -fsSL "$base/$SHIM_ASSET" -o "$work/shim" || die "download failed: $base/$SHIM_ASSET" +fi + +tar xzf "$work/$TARBALL" -C "$work" +src="$work/a3s-box-${VERSION}-linux-${ARCH}" +[ -d "$src" ] || die "unexpected tarball layout (no $src)" + +# ── install binaries + libkrun ────────────────────────────────────────────── +log "Installing a3s-box binaries to /usr/local/bin" +install -m0755 "$src/a3s-box" "$src/a3s-box-cri" "$src/a3s-box-guest-init" "$src/a3s-box-shim" /usr/local/bin/ + +log "Installing libkrun to /usr/lib + ldconfig" +cp -a "$src"/lib/libkrun* /usr/lib/ +ldconfig + +log "Installing containerd-shim-a3s-box-v2 (/usr/local/bin + /opt/containerd/bin)" +install -m0755 "$work/shim" /usr/local/bin/containerd-shim-a3s-box-v2 +install -d /opt/containerd/bin +install -m0755 "$work/shim" /opt/containerd/bin/containerd-shim-a3s-box-v2 + +install -d /var/lib/a3s-box # shared A3S_HOME for the shim's a3s-box invocations + +# ── register the runtime with containerd ──────────────────────────────────── +cfg=/etc/containerd/config.toml +runtime_block() { + cat <<'TOML' +[plugins.'io.containerd.cri.v1.runtime'.containerd.runtimes.a3s-box] + runtime_type = 'io.containerd.a3s-box.v2' + [plugins.'io.containerd.cri.v1.runtime'.containerd.runtimes.a3s-box.options] +TOML +} + +if grep -qE "^imports[[:space:]]*=.*conf\.d" "$cfg" 2>/dev/null; then + # containerd merges /etc/containerd/conf.d/*.toml — register via a drop-in so we + # never touch the main config (clean + idempotent). 
+ install -d /etc/containerd/conf.d + { echo "version = 3"; echo; runtime_block; } > /etc/containerd/conf.d/a3s-box.toml + log "Registered runtime via /etc/containerd/conf.d/a3s-box.toml" +elif grep -q "runtimes.a3s-box\]" "$cfg" 2>/dev/null; then + log "Runtime already present in $cfg — leaving as-is" +else + # No conf.d imports: append the runtime table to the main config. + { echo; runtime_block; } >> "$cfg" + log "Registered runtime in $cfg" +fi + +log "Restarting containerd" +systemctl restart containerd +sleep 2 + +# ── verify ────────────────────────────────────────────────────────────────── +log "Verification" +systemctl is-active --quiet containerd || die "containerd is not active after restart" +"$src/a3s-box" --version >/dev/null 2>&1 || /usr/local/bin/a3s-box --version >/dev/null 2>&1 || die "a3s-box CLI not runnable" +echo " a3s-box: $(/usr/local/bin/a3s-box --version 2>/dev/null)" +echo " libkrun: $(ldconfig -p | awk '/libkrun\.so/{print $1; exit}')" +echo " shim: $(command -v containerd-shim-a3s-box-v2)" +echo " /dev/kvm: present" +echo " containerd: active" + +# ── warm up (prime the one-time per-node boot cache) ──────────────────────── +if [ -n "$WARMUP_IMAGE" ]; then + log "Warming up with $WARMUP_IMAGE (primes first-boot cache; --no-warmup to skip)" + if A3S_HOME=/var/lib/a3s-box timeout 240 /usr/local/bin/a3s-box run \ + --name a3sbox-warmup "$WARMUP_IMAGE" -- true >/dev/null 2>&1; then + echo " warm-up OK — first pod will boot fast" + else + echo " warm-up skipped (could not pull $WARMUP_IMAGE) — first pod may cold-start slowly" + fi + A3S_HOME=/var/lib/a3s-box /usr/local/bin/a3s-box rm -f a3sbox-warmup >/dev/null 2>&1 || true +fi + +printf '\n\033[1;32mDone.\033[0m a3s-box runtime installed on %s.\n' "$(hostname)" +cat < Date: Sat, 27 Jun 2026 10:38:35 +0800 Subject: [PATCH 4/6] docs(k8s): point installer one-liner at release/2.6.0 (where the script lives until merged to main) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
diff --git a/README.md b/README.md index acc1a767..eea45741 100644 --- a/README.md +++ b/README.md @@ -378,7 +378,7 @@ and `/dev/kvm`. ```bash # one-liner (downloads the pinned release artifacts from GitHub): -curl -fsSL https://raw.githubusercontent.com/AI45Lab/Box/main/deploy/scripts/install-runtimeclass.sh | sudo bash +curl -fsSL https://raw.githubusercontent.com/AI45Lab/Box/release/2.6.0/deploy/scripts/install-runtimeclass.sh | sudo bash # or from a checkout: sudo deploy/scripts/install-runtimeclass.sh # default version From 9ccf4682a560232563a55ea4996df442d3d6b23f Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Sat, 27 Jun 2026 10:46:39 +0800 Subject: [PATCH 5/6] docs(k8s): note validated multi-node RuntimeClass deployment in status --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index eea45741..359e97bd 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ As of **v2.4.0**, three adversarial audits — production-operability (24 findin | Networking | Default TSI networking, TCP `host:guest` publishing, user-defined bridge networks, network inspect/connect/disconnect/rm, and `/etc/hosts` peer discovery are implemented with documented platform boundaries. | | Compose | A useful local subset is implemented: image, command, entrypoint, env, env_file, ports, volumes, depends_on, networks, DNS, tmpfs, workdir, hostname, extra_hosts, labels, healthcheck, restart, CPU/memory, capabilities, and privileged mode. | | TEE | AMD SEV-SNP-oriented attestation, RA-TLS, sealing, and secret injection flows exist, plus simulation mode for development. Hardware-backed operation depends on SEV-SNP-capable hosts and libkrun support. TDX is not a productized path. | -| Kubernetes CRI | Reachable by `crictl`/kubelet over its Unix socket. 
Verified on a `/dev/kvm` host: pod + container lifecycle (`RunPodSandbox` → `CreateContainer` → `StartContainer` → `Stop`/`Remove`), `exec` over Kubernetes SPDY/3.1 `remotecommand` (TTY and non-TTY, stdin/stdout/stderr, exit codes), and container log capture to `log_path`. Not yet conformant: `attach` and the stricter `critest` specs (log format, Linux SecurityContext, seccomp/AppArmor, namespaces, mount propagation). Linux-only; not the core completion target. | +| Kubernetes CRI | Reachable by `crictl`/kubelet over its Unix socket. Verified on a `/dev/kvm` host: pod + container lifecycle (`RunPodSandbox` → `CreateContainer` → `StartContainer` → `Stop`/`Remove`), `exec` over Kubernetes SPDY/3.1 `remotecommand` (TTY and non-TTY, stdin/stdout/stderr, exit codes), and container log capture to `log_path`. Not yet conformant: `attach` and the stricter `critest` specs (log format, Linux SecurityContext, seccomp/AppArmor, namespaces, mount propagation). Linux-only; not the core completion target. **RuntimeClass:** a one-command per-node installer (`deploy/scripts/install-runtimeclass.sh`) registers the `io.containerd.a3s-box.v2` runtime, and `runtimeClassName: a3s-box` is validated end-to-end (pod start + `kubectl exec`) across a 5-node cluster — see [Deploy as a Kubernetes RuntimeClass](#deploy-as-a-kubernetes-runtimeclass). | | Windows | Native WHPX backend through libkrun. The Windows package runs directly on Windows with Windows Hypervisor Platform enabled; it does not require WSL. Windows CRI is intentionally out of scope. | ## What A3S Box is From 70e628511979abf5b61c0413160ec8b9c744be30 Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Sat, 27 Jun 2026 10:51:02 +0800 Subject: [PATCH 6/6] docs(k8s): point installer one-liner at main (durable URL post-merge) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 359e97bd..edb33eea 100644 --- a/README.md +++ b/README.md @@ -378,7 +378,7 @@ and `/dev/kvm`. 
```bash # one-liner (downloads the pinned release artifacts from GitHub): -curl -fsSL https://raw.githubusercontent.com/AI45Lab/Box/release/2.6.0/deploy/scripts/install-runtimeclass.sh | sudo bash +curl -fsSL https://raw.githubusercontent.com/AI45Lab/Box/main/deploy/scripts/install-runtimeclass.sh | sudo bash # or from a checkout: sudo deploy/scripts/install-runtimeclass.sh # default version