diff --git a/core/src/agent.rs b/core/src/agent.rs index c759299..2b7c8b3 100644 --- a/core/src/agent.rs +++ b/core/src/agent.rs @@ -90,6 +90,8 @@ pub(crate) struct AgentConfig { pub goal_tracking: bool, /// Optional hook engine for firing lifecycle events (PreToolUse, PostToolUse, etc.) pub hook_engine: Option>, + /// Optional structured JSONL trajectory recorder for RL training and service data capture. + pub rl_trajectory_recorder: crate::rl_trajectory::RlTrajectoryRecorder, /// Optional skill registry for tool permission enforcement pub skill_registry: Option>, /// When true, active skill `allowed-tools` restrict ordinary session tool calls. @@ -183,6 +185,7 @@ impl std::fmt::Debug for AgentConfig { .field("planning_mode", &self.planning_mode) .field("goal_tracking", &self.goal_tracking) .field("hook_engine", &self.hook_engine.is_some()) + .field("rl_trajectory", &self.rl_trajectory_recorder.is_enabled()) .field( "skill_registry", &self.skill_registry.as_ref().map(|r| r.len()), @@ -230,6 +233,7 @@ impl Default for AgentConfig { planning_mode: PlanningMode::default(), goal_tracking: false, hook_engine: None, + rl_trajectory_recorder: crate::rl_trajectory::RlTrajectoryRecorder::disabled(), skill_registry: Some(Arc::new(crate::skills::SkillRegistry::with_builtins())), enforce_active_skill_tool_restrictions: false, max_parse_retries: 2, diff --git a/core/src/agent/execution_mode.rs b/core/src/agent/execution_mode.rs index 99ada7b..9cb090a 100644 --- a/core/src/agent/execution_mode.rs +++ b/core/src/agent/execution_mode.rs @@ -190,6 +190,14 @@ impl AgentLoop { a3s.llm.total_tokens = r.usage.total_tokens, "a3s.agent.execute completed" ); + self.config.rl_trajectory_recorder.record_execution_end( + session_id.unwrap_or(""), + true, + Some(&r.text), + Some(&r.usage), + Some(r.tool_calls_count), + None, + ); self.fire_post_response( session_id.unwrap_or(""), &r.text, @@ -204,6 +212,14 @@ impl AgentLoop { error = %e, "a3s.agent.execute failed" ); + self.config.rl_trajectory_recorder.record_execution_end( + session_id.unwrap_or(""), + false, + None, + None, + None, + Some(&e.to_string()), + ); self.fire_on_error( session_id.unwrap_or(""), ErrorType::Other, diff --git a/core/src/agent/execution_state.rs b/core/src/agent/execution_state.rs index d71f23e..147a73d 100644 --- a/core/src/agent/execution_state.rs +++ b/core/src/agent/execution_state.rs @@ -68,6 +68,10 @@ impl ExecutionLoopState { self.turn } + pub(super) fn current_turn(&self) -> usize { + self.turn + } + pub(super) fn continuation_count(&self) -> u32 { self.continuation_count } diff --git a/core/src/agent/llm_turn.rs b/core/src/agent/llm_turn.rs index d727f60..309c312 100644 --- a/core/src/agent/llm_turn.rs +++ b/core/src/agent/llm_turn.rs @@ -33,6 +33,20 @@ impl AgentLoop { "LLM completion started" ); + let selected_tool_names = + crate::tools::select_tools_for_messages(&self.config.tools, &state.messages) + .into_iter() + .map(|tool| tool.name) + .collect::>(); + self.config.rl_trajectory_recorder.record_llm_request( + session_id.unwrap_or(""), + turn, + &state.messages, + augmented_system.as_deref(), + &selected_tool_names, + estimate_prompt_tokens(&state.messages, augmented_system.as_deref()), + ); + self.fire_generate_start(session_id.unwrap_or(""), effective_prompt, augmented_system) .await; @@ -255,6 +269,12 @@ impl AgentLoop { a3s.llm.total_tokens = response.usage.total_tokens, "Turn token usage" ); + self.config.rl_trajectory_recorder.record_llm_response( + session_id.unwrap_or(""), + turn, + response, + llm_duration.as_millis() as u64, + ); } async fn emit_turn_end( @@ -318,6 +338,13 @@ impl AgentLoop { state.messages = compacted; } + self.config.rl_trajectory_recorder.record_context_compacted( + session_id.unwrap_or(""), + before_len, + &state.messages, + percent_before, + ); + if let Some(tx) = event_tx { tx.send(AgentEvent::ContextCompacted { session_id: session_id.unwrap_or("").to_string(), diff --git a/core/src/agent/loop_runtime.rs b/core/src/agent/loop_runtime.rs index 5e48fee..761b905 100644 --- a/core/src/agent/loop_runtime.rs +++ b/core/src/agent/loop_runtime.rs @@ -99,6 +99,16 @@ impl AgentLoop { let effective_prompt = turn_context.effective_prompt.as_str(); let augmented_system = turn_context.augmented_system; + self.config.rl_trajectory_recorder.record_execution_start( + session_id.unwrap_or(""), + &self.tool_context.workspace, + effective_prompt, + history, + augmented_system.as_deref(), + self.config.max_tool_rounds, + &format!("{:?}", self.config.planning_mode), + ); + // Add user message if !msg_prompt.is_empty() { state.messages.push(Message::user(msg_prompt)); diff --git a/core/src/agent/tool_completion_runtime.rs b/core/src/agent/tool_completion_runtime.rs index 43921ed..ea64394 100644 --- a/core/src/agent/tool_completion_runtime.rs +++ b/core/src/agent/tool_completion_runtime.rs @@ -50,6 +50,21 @@ impl AgentLoop { ) .await; + self.config.rl_trajectory_recorder.record_tool_result( + session_id.unwrap_or(""), + state.current_turn(), + &tool_call.id, + &tool_call.name, + &output, + normalized.exit_code, + tool_duration.as_millis() as u64, + &normalized.metadata, + normalized + .error_kind + .as_ref() + .map(|kind| format!("{kind:?}")), + ); + self.remember_tool_result( effective_prompt, &tool_call.name, diff --git a/core/src/agent/tool_execution_runtime.rs b/core/src/agent/tool_execution_runtime.rs index f0789c4..ea106ad 100644 --- a/core/src/agent/tool_execution_runtime.rs +++ b/core/src/agent/tool_execution_runtime.rs @@ -1,5 +1,6 @@ use super::tool_result_runtime::NormalizedToolResult; use super::{AgentEvent, AgentLoop, ToolCommand}; +use crate::llm::ToolCall; use crate::tools::{ToolContext, ToolStreamEvent}; use serde_json::Value; use std::sync::Arc; @@ -41,6 +42,16 @@ impl AgentLoop { event_tx: &Option>, ) -> (String, i32, bool, Option) { let call_id = format!("plan-{}-{}", tool_name, uuid::Uuid::new_v4()); + let synthetic_call = ToolCall { + id: call_id.clone(), + name: tool_name.to_string(), + args: args.clone(), + }; + self.config.rl_trajectory_recorder.record_tool_call( + session_id.unwrap_or(""), + 0, + &synthetic_call, + ); if let Some(tx) = event_tx { tx.send(AgentEvent::ToolStart { id: call_id.clone(), @@ -51,9 +62,24 @@ impl AgentLoop { } let ctx = self.tool_context_for_plan(session_id); + let started = std::time::Instant::now(); let normalized = NormalizedToolResult::from_execution( self.execute_tool_timed(tool_name, args, &ctx).await, ); + self.config.rl_trajectory_recorder.record_tool_result( + session_id.unwrap_or(""), + 0, + &call_id, + tool_name, + &normalized.output, + normalized.exit_code, + started.elapsed().as_millis() as u64, + &normalized.metadata, + normalized + .error_kind + .as_ref() + .map(|kind| format!("{kind:?}")), + ); if let Some(tx) = event_tx { tx.send(AgentEvent::ToolEnd { diff --git a/core/src/agent/tool_guard_runtime.rs b/core/src/agent/tool_guard_runtime.rs index f4981a8..92d1966 100644 --- a/core/src/agent/tool_guard_runtime.rs +++ b/core/src/agent/tool_guard_runtime.rs @@ -13,6 +13,7 @@ impl AgentLoop { tool_call: &ToolCall, state: &mut ExecutionLoopState, event_tx: &Option>, + session_id: Option<&str>, ) -> anyhow::Result { if let Some((duplicate_count, error_msg)) = state.duplicate_tool_call( &tool_call.name, @@ -37,6 +38,17 @@ impl AgentLoop { state .messages .push(Message::tool_result(&tool_call.id, &error_msg, true)); + self.config.rl_trajectory_recorder.record_tool_result( + session_id.unwrap_or(""), + state.current_turn(), + &tool_call.id, + &tool_call.name, + &error_msg, + 1, + 0, + &None, + Some("duplicate_tool_call".to_string()), + ); return Ok(true); } @@ -68,6 +80,17 @@ impl AgentLoop { &parse_outcome.output, true, )); + self.config.rl_trajectory_recorder.record_tool_result( + session_id.unwrap_or(""), + state.current_turn(), + &tool_call.id, + &tool_call.name, + &parse_outcome.output, + 1, + 0, + &None, + Some("parse_error".to_string()), + ); if let Some(msg) = parse_outcome.fatal_message { tracing::error!("{}", msg); diff --git a/core/src/agent/tool_turn.rs b/core/src/agent/tool_turn.rs index 3714608..8e5b0bd 100644 --- a/core/src/agent/tool_turn.rs +++ b/core/src/agent/tool_turn.rs @@ -43,6 +43,12 @@ impl AgentLoop { ) -> anyhow::Result<()> { state.record_tool_call(); let tool_start = std::time::Instant::now(); + let turn = state.current_turn(); + self.config.rl_trajectory_recorder.record_tool_call( + session_id.unwrap_or(""), + turn, + &tool_call, + ); tracing::info!( tool_name = tool_call.name.as_str(), @@ -51,7 +57,7 @@ impl AgentLoop { ); if self - .handle_tool_preflight_guard(&tool_call, state, event_tx) + .handle_tool_preflight_guard(&tool_call, state, event_tx, session_id) .await? { return Ok(()); diff --git a/core/src/agent_api.rs b/core/src/agent_api.rs index 709e92f..50115d8 100644 --- a/core/src/agent_api.rs +++ b/core/src/agent_api.rs @@ -208,6 +208,13 @@ pub struct SessionOptions { /// tasks). `None` (default) keeps everything — fine for short /// sessions, a memory leak for hours-long cluster workloads. pub retention_limits: Option, + /// Optional structured JSONL trajectory config. + /// + /// When set, a3s-code records user prompts, LLM turns, tool calls, + /// tool observations, token usage, and execution end status for RL + /// training or service data collection. If unset, the same config can + /// be enabled by `A3S_CODE_TRAJECTORY_PATH`. + pub rl_trajectory: Option, /// Auto-save after each completed `send()` or default-history `stream()` call. pub auto_save: bool, /// Optional artifact retention limits for large tool/program outputs. diff --git a/core/src/agent_api/session_builder.rs b/core/src/agent_api/session_builder.rs index 136bdc3..e185096 100644 --- a/core/src/agent_api/session_builder.rs +++ b/core/src/agent_api/session_builder.rs @@ -138,6 +138,12 @@ pub(super) fn build_agent_session( let base = agent.config.clone(); let auto_delegation = resolve_auto_delegation_config(&agent.code_config, opts); + let rl_trajectory_config = match opts.rl_trajectory.clone() { + Some(config) => Some(config), + None => crate::rl_trajectory::RlTrajectoryConfig::from_env()?, + }; + let rl_trajectory_recorder = + crate::rl_trajectory::RlTrajectoryRecorder::from_config(rl_trajectory_config)?; let config = AgentConfig { prompt_slots, tools: tool_defs, @@ -180,6 +186,7 @@ pub(super) fn build_agent_session( agent_registry: Some(Arc::clone(&agent_registry)), max_execution_time_ms: opts.max_execution_time_ms.or(base.max_execution_time_ms), budget_guard: opts.budget_guard.clone().or(base.budget_guard.clone()), + rl_trajectory_recorder, host_env: opts .host_env .clone() diff --git a/core/src/agent_api/session_options.rs b/core/src/agent_api/session_options.rs index 4850ed5..93d1319 100644 --- a/core/src/agent_api/session_options.rs +++ b/core/src/agent_api/session_options.rs @@ -42,6 +42,7 @@ impl std::fmt::Debug for SessionOptions { .field("memory_store", &self.memory_store.is_some()) .field("session_store", &self.session_store.is_some()) .field("session_id", &self.session_id) + .field("rl_trajectory", &self.rl_trajectory) .field("auto_save", &self.auto_save) .field("artifact_store_limits", &self.artifact_store_limits) .field("max_parse_retries", &self.max_parse_retries) @@ -365,6 +366,16 @@ impl SessionOptions { self } + /// Enable structured JSONL trajectory capture for this session. + /// + /// This is the preferred programmatic path for RL training and deployed + /// service data collection. Environment-only deployments can instead set + /// `A3S_CODE_TRAJECTORY_PATH`. + pub fn with_rl_trajectory(mut self, config: crate::rl_trajectory::RlTrajectoryConfig) -> Self { + self.rl_trajectory = Some(config); + self + } + /// Enable auto-save after each `send()` call pub fn with_auto_save(mut self, enabled: bool) -> Self { self.auto_save = enabled; diff --git a/core/src/agent_api/tests.rs b/core/src/agent_api/tests.rs index 02f0ebc..cbbda6b 100644 --- a/core/src/agent_api/tests.rs +++ b/core/src/agent_api/tests.rs @@ -2682,6 +2682,42 @@ async fn test_active_skill_tool_restriction_option_defaults_and_overrides() { assert!(legacy_session.config.enforce_active_skill_tool_restrictions); } +#[tokio::test(flavor = "multi_thread")] +async fn test_session_options_with_rl_trajectory_records_jsonl() { + let dir = tempfile::TempDir::new().unwrap(); + let trajectory_path = dir.path().join("trajectory.jsonl"); + let agent = Agent::from_config(test_config()).await.unwrap(); + + let opts = SessionOptions::new().with_rl_trajectory( + crate::rl_trajectory::RlTrajectoryConfig::new(&trajectory_path).with_max_text_bytes(4), + ); + let session = agent + .session("/tmp/test-ws-rl-trajectory", Some(opts)) + .unwrap(); + + assert!(session.config.rl_trajectory_recorder.is_enabled()); + session + .config + .rl_trajectory_recorder + .record_execution_start( + "sess-rl", + std::path::Path::new("/tmp/test-ws-rl-trajectory"), + "abcdef", + &[], + None, + 16, + "disabled", + ); + + let content = std::fs::read_to_string(&trajectory_path).unwrap(); + let record: serde_json::Value = serde_json::from_str(content.lines().next().unwrap()).unwrap(); + assert_eq!(record["schema"], crate::rl_trajectory::RL_TRAJECTORY_SCHEMA); + assert_eq!(record["event_type"], "execution_start"); + assert_eq!(record["session_id"], "sess-rl"); + assert_eq!(record["payload"]["prompt"]["text"], "abcd"); + assert_eq!(record["payload"]["prompt"]["truncated"], true); +} + #[tokio::test(flavor = "multi_thread")] async fn test_session_max_parallel_tasks_config_and_override() { let mut config = test_config(); diff --git a/core/src/lib.rs b/core/src/lib.rs index a68cf3a..197606d 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -103,6 +103,7 @@ pub(crate) mod prompts; pub mod queue; pub mod retention; pub(crate) mod retry; +pub mod rl_trajectory; pub mod run; pub(crate) mod safety_gate; pub mod sandbox; @@ -144,6 +145,7 @@ pub use orchestration::{ WorkflowStepRecord, WORKFLOW_CHECKPOINT_SCHEMA_VERSION, }; pub use prompts::{AgentStyle, DetectionConfidence, PlanningMode, SystemPromptSlots}; +pub use rl_trajectory::{RlTrajectoryConfig, RlTrajectoryMode, RlTrajectoryRecorder}; pub use run::{ ActiveToolSnapshot, InMemoryRunStore, RunEventRecord, RunHandle, RunRecord, RunSnapshot, RunStatus, diff --git a/core/src/rl_trajectory.rs b/core/src/rl_trajectory.rs new file mode 100644 index 0000000..77c062f --- /dev/null +++ b/core/src/rl_trajectory.rs @@ -0,0 +1,601 @@ +//! RL trajectory recording primitives. +//! +//! The default runtime trace is intentionally lightweight and diagnostic. This +//! module records an opt-in training-oriented JSONL stream that can reconstruct LLM +//! turns, tool calls, tool observations, token usage, and termination reasons. +//! It is opt-in: normal sessions pay only a cheap disabled-recorder branch. + +use crate::llm::{ContentBlock, LlmResponse, Message, TokenUsage, ToolCall}; +use anyhow::{Context, Result}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use std::fs::{File, OpenOptions}; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, Mutex, +}; + +pub const RL_TRAJECTORY_SCHEMA: &str = "a3s.rl_trajectory.v1"; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub enum RlTrajectoryMode { + #[default] + Off, + On, +} + +impl RlTrajectoryMode { + pub fn parse(value: &str) -> Option { + match value.trim().to_ascii_lowercase().as_str() { + "" | "off" | "0" | "false" | "none" => Some(Self::Off), + "on" | "1" | "true" | "yes" | "enabled" | "rl" | "train" | "training" + | "trajectory" | "trace" | "debug" | "full" | "compact" => Some(Self::On), + _ => None, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RlTrajectoryConfig { + pub mode: RlTrajectoryMode, + pub path: PathBuf, + pub max_text_bytes: usize, + pub include_messages: bool, +} + +impl RlTrajectoryConfig { + pub fn new(path: impl Into) -> Self { + Self { + mode: RlTrajectoryMode::On, + path: path.into(), + max_text_bytes: default_max_text_bytes(RlTrajectoryMode::On), + include_messages: true, + } + } + + pub fn with_mode(mut self, mode: RlTrajectoryMode) -> Self { + self.mode = mode; + self.max_text_bytes = default_max_text_bytes(mode); + self.include_messages = mode == RlTrajectoryMode::On; + self + } + + pub fn with_max_text_bytes(mut self, max_text_bytes: usize) -> Self { + self.max_text_bytes = max_text_bytes; + self + } + + pub fn with_include_messages(mut self, include_messages: bool) -> Self { + self.include_messages = include_messages; + self + } + + pub fn from_env() -> Result> { + let mode_env = env_first(&["A3S_CODE_TRAJECTORY_MODE", "A3S_CODE_RL_TRAJECTORY_MODE"]); + let path_env = env_first(&["A3S_CODE_TRAJECTORY_PATH", "A3S_CODE_RL_TRAJECTORY_PATH"]); + if mode_env.is_none() && path_env.is_none() { + return Ok(None); + } + + let mode = match mode_env.as_deref() { + Some(raw) => RlTrajectoryMode::parse(raw) + .with_context(|| format!("invalid A3S_CODE_RL_TRAJECTORY_MODE: {raw}"))?, + None => RlTrajectoryMode::On, + }; + if mode == RlTrajectoryMode::Off { + return Ok(None); + } + + let path = path_env + .filter(|s| !s.trim().is_empty()) + .with_context(|| "A3S_CODE_TRAJECTORY_PATH is required when trajectory mode is on")?; + + let max_text_bytes = env_first(&[ + "A3S_CODE_TRAJECTORY_MAX_TEXT_BYTES", + "A3S_CODE_RL_TRAJECTORY_MAX_TEXT_BYTES", + ]) + .and_then(|value| value.parse::().ok()) + .unwrap_or_else(|| default_max_text_bytes(mode)); + + let include_messages = env_first(&[ + "A3S_CODE_TRAJECTORY_INCLUDE_MESSAGES", + "A3S_CODE_RL_TRAJECTORY_INCLUDE_MESSAGES", + ]) + .and_then(|value| parse_bool(&value)) + .unwrap_or(true); + + Ok(Some(Self { + mode, + path: PathBuf::from(path), + max_text_bytes, + include_messages, + })) + } +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct RlTrajectoryContext { + #[serde(skip_serializing_if = "Option::is_none")] + pub run_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub task_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub group_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub replica_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub sample_id: Option, +} + +impl RlTrajectoryContext { + fn from_env() -> Self { + Self { + run_id: env_first(&["A3S_CODE_RL_RUN_ID", "A3S_CODE_RUN_ID", "A3S_RUN_ID"]), + task_id: env_first(&["A3S_CODE_RL_TASK_ID", "A3S_CODE_TASK_ID", "TASK_ID"]), + group_id: env_first(&["A3S_CODE_RL_GROUP_ID", "A3S_CODE_GROUP_ID"]), + replica_id: env_first(&["A3S_CODE_RL_REPLICA_ID", "A3S_CODE_REPLICA_ID"]), + sample_id: env_first(&["A3S_CODE_RL_SAMPLE_ID", "A3S_CODE_SAMPLE_ID"]), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CapturedText { + pub byte_len: usize, + pub sha256: String, + pub truncated: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub text: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub preview: Option, +} + +#[derive(Clone)] +pub struct RlTrajectoryRecorder { + inner: Option>, +} + +struct RlTrajectoryRecorderInner { + config: RlTrajectoryConfig, + context: RlTrajectoryContext, + sequence: AtomicU64, + file: Mutex, +} + +impl std::fmt::Debug for RlTrajectoryRecorder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RlTrajectoryRecorder") + .field("enabled", &self.inner.is_some()) + .finish() + } +} + +impl Default for RlTrajectoryRecorder { + fn default() -> Self { + Self::disabled() + } +} + +impl RlTrajectoryRecorder { + pub fn disabled() -> Self { + Self { inner: None } + } + + pub fn from_config(config: Option) -> Result { + let Some(config) = config else { + return Ok(Self::disabled()); + }; + if config.mode == RlTrajectoryMode::Off { + return Ok(Self::disabled()); + } + + if let Some(parent) = config.path.parent().filter(|p| !p.as_os_str().is_empty()) { + std::fs::create_dir_all(parent).with_context(|| { + format!( + "failed to create RL trajectory directory {}", + parent.display() + ) + })?; + } + let file = OpenOptions::new() + .create(true) + .append(true) + .open(&config.path) + .with_context(|| { + format!( + "failed to open RL trajectory JSONL {}", + config.path.display() + ) + })?; + + Ok(Self { + inner: Some(Arc::new(RlTrajectoryRecorderInner { + config, + context: RlTrajectoryContext::from_env(), + sequence: AtomicU64::new(0), + file: Mutex::new(file), + })), + }) + } + + pub fn is_enabled(&self) -> bool { + self.inner.is_some() + } + + pub fn record_execution_start( + &self, + session_id: &str, + workspace: &Path, + prompt: &str, + history: &[Message], + system_prompt: Option<&str>, + max_tool_rounds: usize, + planning_mode: &str, + ) { + let Some(inner) = &self.inner else { + return; + }; + let payload = json!({ + "workspace": workspace.display().to_string(), + "prompt": inner.capture_text(prompt), + "history_message_count": history.len(), + "history": inner.capture_messages(history), + "system_prompt": system_prompt.map(|s| inner.capture_text(s)), + "max_tool_rounds": max_tool_rounds, + "planning_mode": planning_mode, + }); + inner.record("execution_start", session_id, payload); + } + + pub fn record_llm_request( + &self, + session_id: &str, + turn: usize, + messages: &[Message], + system: Option<&str>, + available_tools: &[String], + estimated_prompt_tokens: usize, + ) { + let Some(inner) = &self.inner else { + return; + }; + let payload = json!({ + "turn": turn, + "messages_count": messages.len(), + "messages": inner.capture_messages(messages), + "system_prompt": system.map(|s| inner.capture_text(s)), + "available_tools": available_tools, + "estimated_prompt_tokens": estimated_prompt_tokens, + }); + inner.record("llm_request", session_id, payload); + } + + pub fn record_llm_response( + &self, + session_id: &str, + turn: usize, + response: &LlmResponse, + duration_ms: u64, + ) { + let Some(inner) = &self.inner else { + return; + }; + let payload = json!({ + "turn": turn, + "message": inner.capture_message(&response.message), + "response_text": inner.capture_text(&response.text()), + "reasoning_content": response.message.reasoning_content.as_ref().map(|s| inner.capture_text(s)), + "tool_calls": response.tool_calls().iter().map(tool_call_value).collect::>(), + "usage": token_usage_value(&response.usage), + "stop_reason": response.stop_reason.clone(), + "meta": response.meta.clone(), + "duration_ms": duration_ms, + }); + inner.record("llm_response", session_id, payload); + } + + pub fn record_tool_call(&self, session_id: &str, turn: usize, tool_call: &ToolCall) { + let Some(inner) = &self.inner else { + return; + }; + let payload = json!({ + "turn": turn, + "tool_call_id": tool_call.id, + "tool": tool_call.name, + "args": tool_call.args, + }); + inner.record("tool_call", session_id, payload); + } + + #[allow(clippy::too_many_arguments)] + pub fn record_tool_result( + &self, + session_id: &str, + turn: usize, + tool_call_id: &str, + tool_name: &str, + output: &str, + exit_code: i32, + duration_ms: u64, + metadata: &Option, + error_kind: Option, + ) { + let Some(inner) = &self.inner else { + return; + }; + let payload = json!({ + "turn": turn, + "tool_call_id": tool_call_id, + "tool": tool_name, + "success": exit_code == 0, + "exit_code": exit_code, + "duration_ms": duration_ms, + "output": inner.capture_text(output), + "metadata": metadata, + "error_kind": error_kind, + }); + inner.record("tool_result", session_id, payload); + } + + pub fn record_context_compacted( + &self, + session_id: &str, + before_messages: usize, + after_messages: &[Message], + percent_before: f32, + ) { + let Some(inner) = &self.inner else { + return; + }; + let payload = json!({ + "before_messages": before_messages, + "after_messages": after_messages.len(), + "percent_before": percent_before, + "messages": inner.capture_messages(after_messages), + }); + inner.record("context_compacted", session_id, payload); + } + + pub fn record_execution_end( + &self, + session_id: &str, + success: bool, + response_text: Option<&str>, + usage: Option<&TokenUsage>, + tool_calls_count: Option, + error_message: Option<&str>, + ) { + let Some(inner) = &self.inner else { + return; + }; + let payload = json!({ + "success": success, + "response_text": response_text.map(|s| inner.capture_text(s)), + "usage": usage.map(token_usage_value), + "tool_calls_count": tool_calls_count, + "error_message": error_message.map(|s| inner.capture_text(s)), + }); + inner.record("execution_end", session_id, payload); + } +} + +impl RlTrajectoryRecorderInner { + fn record(&self, event_type: &str, session_id: &str, payload: Value) { + let sequence = self.sequence.fetch_add(1, Ordering::Relaxed) + 1; + let record = json!({ + "schema": RL_TRAJECTORY_SCHEMA, + "sequence": sequence, + "timestamp_ms": chrono::Utc::now().timestamp_millis(), + "event_type": event_type, + "session_id": session_id, + "mode": self.config.mode, + "context": self.context, + "payload": payload, + }); + + let line = match serde_json::to_string(&record) { + Ok(line) => line, + Err(err) => { + tracing::warn!(error = %err, "Failed to serialize RL trajectory record"); + return; + } + }; + let mut file = match self.file.lock() { + Ok(file) => file, + Err(poisoned) => poisoned.into_inner(), + }; + if let Err(err) = writeln!(file, "{line}") { + tracing::warn!(error = %err, "Failed to write RL trajectory record"); + } + } + + fn capture_messages(&self, messages: &[Message]) -> Value { + if !self.config.include_messages { + return json!({ + "included": false, + "count": messages.len(), + "roles": messages.iter().map(|m| m.role.as_str()).collect::>(), + }); + } + Value::Array( + messages + .iter() + .enumerate() + .map(|(index, message)| { + let mut value = self.capture_message(message); + if let Value::Object(ref mut object) = value { + object.insert("index".to_string(), json!(index)); + } + value + }) + .collect(), + ) + } + + fn capture_message(&self, message: &Message) -> Value { + json!({ + "role": message.role, + "content": message.content.iter().map(|block| self.capture_content_block(block)).collect::>(), + "reasoning_content": message.reasoning_content.as_ref().map(|s| self.capture_text(s)), + }) + } + + fn capture_content_block(&self, block: &ContentBlock) -> Value { + match block { + ContentBlock::Text { text } => json!({ + "type": "text", + "text": self.capture_text(text), + }), + ContentBlock::Image { source } => json!({ + "type": "image", + "source": { + "media_type": source.media_type, + "data": self.capture_text(&source.data), + } + }), + ContentBlock::ToolUse { id, name, input } => json!({ + "type": "tool_use", + "id": id, + "name": name, + "input": input, + }), + ContentBlock::ToolResult { + tool_use_id, + content, + is_error, + } => json!({ + "type": "tool_result", + "tool_use_id": tool_use_id, + "content": self.capture_text(&content.as_text()), + "is_error": is_error, + }), + } + } + + fn capture_text(&self, text: &str) -> CapturedText { + let byte_len = text.len(); + let sha256 = sha256::digest(text); + let (captured, truncated) = truncate_utf8(text, self.config.max_text_bytes); + CapturedText { + byte_len, + sha256, + truncated, + text: Some(captured), + preview: None, + } + } +} + +fn tool_call_value(tool_call: &ToolCall) -> Value { + json!({ + "tool_call_id": tool_call.id, + "tool": tool_call.name, + "args": tool_call.args, + }) +} + +fn token_usage_value(usage: &TokenUsage) -> Value { + json!({ + "prompt_tokens": usage.prompt_tokens, + "completion_tokens": usage.completion_tokens, + "total_tokens": usage.total_tokens, + "cache_read_tokens": usage.cache_read_tokens, + "cache_write_tokens": usage.cache_write_tokens, + }) +} + +fn truncate_utf8(text: &str, max_bytes: usize) -> (String, bool) { + if text.len() <= max_bytes { + return (text.to_string(), false); + } + let mut end = max_bytes.min(text.len()); + while end > 0 && !text.is_char_boundary(end) { + end -= 1; + } + (text[..end].to_string(), true) +} + +fn default_max_text_bytes(mode: RlTrajectoryMode) -> usize { + match mode { + RlTrajectoryMode::Off => 0, + RlTrajectoryMode::On => 1024 * 1024, + } +} + +fn parse_bool(value: &str) -> Option { + match value.trim().to_ascii_lowercase().as_str() { + "1" | "true" | "yes" | "on" => Some(true), + "0" | "false" | "no" | "off" => Some(false), + _ => None, + } +} + +fn env_first(names: &[&str]) -> Option { + names.iter().find_map(|name| { + std::env::var(name) + .ok() + .filter(|value| !value.trim().is_empty()) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[test] + fn rl_recorder_writes_jsonl() { + let dir = tempdir().unwrap(); + let path = dir.path().join("trajectory.jsonl"); + let recorder = + RlTrajectoryRecorder::from_config(Some(RlTrajectoryConfig::new(&path))).unwrap(); + + recorder.record_execution_start( + "sess-1", + Path::new("/workspace"), + "solve task", + &[], + Some("system"), + 64, + "disabled", + ); + recorder.record_tool_result("sess-1", 1, "tool-1", "bash", "ok", 0, 3, &None, None); + + let lines = std::fs::read_to_string(path).unwrap(); + assert_eq!(lines.lines().count(), 2); + let first: Value = serde_json::from_str(lines.lines().next().unwrap()).unwrap(); + assert_eq!(first["schema"], RL_TRAJECTORY_SCHEMA); + assert_eq!(first["event_type"], "execution_start"); + assert_eq!(first["session_id"], "sess-1"); + } + + #[test] + fn enabled_mode_records_text_with_truncation_flag() { + let dir = tempdir().unwrap(); + let path = dir.path().join("trajectory.jsonl"); + let recorder = RlTrajectoryRecorder::from_config(Some( + RlTrajectoryConfig::new(&path).with_max_text_bytes(3), + )) + .unwrap(); + + recorder.record_execution_start( + "sess-1", + Path::new("/workspace"), + "abcdef", + &[], + None, + 64, + "auto", + ); + + let text = std::fs::read_to_string(path).unwrap(); + let record: Value = serde_json::from_str(text.lines().next().unwrap()).unwrap(); + let prompt = &record["payload"]["prompt"]; + assert!(prompt.get("sha256").is_some()); + assert_eq!(prompt["text"], "abc"); + assert_eq!(prompt["truncated"], true); + } +} diff --git a/sdk/python/src/lib.rs b/sdk/python/src/lib.rs index 589f4ed..54563f8 100644 --- a/sdk/python/src/lib.rs +++ b/sdk/python/src/lib.rs @@ -4814,7 +4814,10 @@ impl PyAutoDelegationConfig { fn __repr__(&self) -> String { format!( "AutoDelegationConfig(enabled={}, auto_parallel={}, min_confidence={}, max_tasks={})", - self.enabled, self.auto_parallel, self.min_confidence, self.max_tasks + self.enabled, + self.auto_parallel, + self.min_confidence, + self.max_tasks ) } } @@ -4963,6 +4966,15 @@ struct PySessionOptions { /// long-running cluster sessions to stop in-memory state from /// growing unboundedly. retention_limits: Option, + /// Structured JSONL trajectory path. When set, records user input, + /// LLM turns, tool calls, tool observations, token usage, and episode end. + trajectory_path: Option, + /// Trajectory mode: "on" or "off". Defaults to "on" when trajectory_path is set. + trajectory_mode: Option, + /// Max bytes retained for any single text field before truncation. + trajectory_max_text_bytes: Option, + /// Whether to include full message arrays in LLM request records. + trajectory_include_messages: Option, } impl Clone for PySessionOptions { @@ -5028,6 +5040,10 @@ impl Clone for PySessionOptions { retention_limits: pyo3::Python::with_gil(|py| { self.retention_limits.as_ref().map(|o| o.clone_ref(py)) }), + trajectory_path: self.trajectory_path.clone(), + trajectory_mode: self.trajectory_mode.clone(), + trajectory_max_text_bytes: self.trajectory_max_text_bytes, + trajectory_include_messages: self.trajectory_include_messages, } } } @@ -5083,6 +5099,10 @@ impl PySessionOptions { ahp_transport: None, budget_guard: None, retention_limits: None, + trajectory_path: None, + trajectory_mode: None, + trajectory_max_text_bytes: None, + trajectory_include_messages: None, } } @@ -5639,6 +5659,54 @@ impl PySessionOptions { self.retention_limits = value; } + /// Structured JSONL trajectory output path. + /// + /// When set, a3s-code records user input, LLM turns, tool calls, tool + /// observations, token usage, and execution end status. This is the + /// programmatic equivalent of ``A3S_CODE_TRAJECTORY_PATH``. + #[getter] + fn get_trajectory_path(&self) -> Option { + self.trajectory_path.clone() + } + + #[setter] + fn set_trajectory_path(&mut self, value: Option) { + self.trajectory_path = value; + } + + /// Trajectory mode: ``"on"`` or ``"off"``. + #[getter] + fn get_trajectory_mode(&self) -> Option { + self.trajectory_mode.clone() + } + + #[setter] + fn set_trajectory_mode(&mut self, value: Option) { + self.trajectory_mode = value; + } + + /// Max bytes retained for any single text field before truncation. + #[getter] + fn get_trajectory_max_text_bytes(&self) -> Option { + self.trajectory_max_text_bytes + } + + #[setter] + fn set_trajectory_max_text_bytes(&mut self, value: Option) { + self.trajectory_max_text_bytes = value; + } + + /// Whether LLM request records include full message arrays. + #[getter] + fn get_trajectory_include_messages(&self) -> Option { + self.trajectory_include_messages + } + + #[setter] + fn set_trajectory_include_messages(&mut self, value: Option) { + self.trajectory_include_messages = value; + } + /// Register an instruction skill programmatically. /// /// Instructions are injected into the system prompt at session start. @@ -6101,6 +6169,22 @@ fn build_rust_session_options(so: PySessionOptions) -> PyResult