Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub(crate) struct AgentConfig {
pub goal_tracking: bool,
/// Optional hook engine for firing lifecycle events (PreToolUse, PostToolUse, etc.)
pub hook_engine: Option<Arc<dyn HookExecutor>>,
/// Optional structured JSONL trajectory recorder for RL training and service data capture.
pub rl_trajectory_recorder: crate::rl_trajectory::RlTrajectoryRecorder,
/// Optional skill registry for tool permission enforcement
pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
/// When true, active skill `allowed-tools` restrict ordinary session tool calls.
Expand Down Expand Up @@ -183,6 +185,7 @@ impl std::fmt::Debug for AgentConfig {
.field("planning_mode", &self.planning_mode)
.field("goal_tracking", &self.goal_tracking)
.field("hook_engine", &self.hook_engine.is_some())
.field("rl_trajectory", &self.rl_trajectory_recorder.is_enabled())
.field(
"skill_registry",
&self.skill_registry.as_ref().map(|r| r.len()),
Expand Down Expand Up @@ -230,6 +233,7 @@ impl Default for AgentConfig {
planning_mode: PlanningMode::default(),
goal_tracking: false,
hook_engine: None,
rl_trajectory_recorder: crate::rl_trajectory::RlTrajectoryRecorder::disabled(),
skill_registry: Some(Arc::new(crate::skills::SkillRegistry::with_builtins())),
enforce_active_skill_tool_restrictions: false,
max_parse_retries: 2,
Expand Down
16 changes: 16 additions & 0 deletions core/src/agent/execution_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,14 @@ impl AgentLoop {
a3s.llm.total_tokens = r.usage.total_tokens,
"a3s.agent.execute completed"
);
self.config.rl_trajectory_recorder.record_execution_end(
session_id.unwrap_or(""),
true,
Some(&r.text),
Some(&r.usage),
Some(r.tool_calls_count),
None,
);
self.fire_post_response(
session_id.unwrap_or(""),
&r.text,
Expand All @@ -204,6 +212,14 @@ impl AgentLoop {
error = %e,
"a3s.agent.execute failed"
);
self.config.rl_trajectory_recorder.record_execution_end(
session_id.unwrap_or(""),
false,
None,
None,
None,
Some(&e.to_string()),
);
self.fire_on_error(
session_id.unwrap_or(""),
ErrorType::Other,
Expand Down
4 changes: 4 additions & 0 deletions core/src/agent/execution_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ impl ExecutionLoopState {
self.turn
}

pub(super) fn current_turn(&self) -> usize {
self.turn
}

pub(super) fn continuation_count(&self) -> u32 {
self.continuation_count
}
Expand Down
27 changes: 27 additions & 0 deletions core/src/agent/llm_turn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ impl AgentLoop {
"LLM completion started"
);

let selected_tool_names =
crate::tools::select_tools_for_messages(&self.config.tools, &state.messages)
.into_iter()
.map(|tool| tool.name)
.collect::<Vec<_>>();
self.config.rl_trajectory_recorder.record_llm_request(
session_id.unwrap_or(""),
turn,
&state.messages,
augmented_system.as_deref(),
&selected_tool_names,
estimate_prompt_tokens(&state.messages, augmented_system.as_deref()),
);

self.fire_generate_start(session_id.unwrap_or(""), effective_prompt, augmented_system)
.await;

Expand Down Expand Up @@ -255,6 +269,12 @@ impl AgentLoop {
a3s.llm.total_tokens = response.usage.total_tokens,
"Turn token usage"
);
self.config.rl_trajectory_recorder.record_llm_response(
session_id.unwrap_or(""),
turn,
response,
llm_duration.as_millis() as u64,
);
}

async fn emit_turn_end(
Expand Down Expand Up @@ -318,6 +338,13 @@ impl AgentLoop {
state.messages = compacted;
}

self.config.rl_trajectory_recorder.record_context_compacted(
session_id.unwrap_or(""),
before_len,
&state.messages,
percent_before,
);

if let Some(tx) = event_tx {
tx.send(AgentEvent::ContextCompacted {
session_id: session_id.unwrap_or("").to_string(),
Expand Down
10 changes: 10 additions & 0 deletions core/src/agent/loop_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ impl AgentLoop {
let effective_prompt = turn_context.effective_prompt.as_str();
let augmented_system = turn_context.augmented_system;

self.config.rl_trajectory_recorder.record_execution_start(
session_id.unwrap_or(""),
&self.tool_context.workspace,
effective_prompt,
history,
augmented_system.as_deref(),
self.config.max_tool_rounds,
&format!("{:?}", self.config.planning_mode),
);

// Add user message
if !msg_prompt.is_empty() {
state.messages.push(Message::user(msg_prompt));
Expand Down
15 changes: 15 additions & 0 deletions core/src/agent/tool_completion_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ impl AgentLoop {
)
.await;

self.config.rl_trajectory_recorder.record_tool_result(
session_id.unwrap_or(""),
state.current_turn(),
&tool_call.id,
&tool_call.name,
&output,
normalized.exit_code,
tool_duration.as_millis() as u64,
&normalized.metadata,
normalized
.error_kind
.as_ref()
.map(|kind| format!("{kind:?}")),
);

self.remember_tool_result(
effective_prompt,
&tool_call.name,
Expand Down
26 changes: 26 additions & 0 deletions core/src/agent/tool_execution_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::tool_result_runtime::NormalizedToolResult;
use super::{AgentEvent, AgentLoop, ToolCommand};
use crate::llm::ToolCall;
use crate::tools::{ToolContext, ToolStreamEvent};
use serde_json::Value;
use std::sync::Arc;
Expand Down Expand Up @@ -41,6 +42,16 @@ impl AgentLoop {
event_tx: &Option<mpsc::Sender<AgentEvent>>,
) -> (String, i32, bool, Option<Value>) {
let call_id = format!("plan-{}-{}", tool_name, uuid::Uuid::new_v4());
let synthetic_call = ToolCall {
id: call_id.clone(),
name: tool_name.to_string(),
args: args.clone(),
};
self.config.rl_trajectory_recorder.record_tool_call(
session_id.unwrap_or(""),
0,
&synthetic_call,
);
if let Some(tx) = event_tx {
tx.send(AgentEvent::ToolStart {
id: call_id.clone(),
Expand All @@ -51,9 +62,24 @@ impl AgentLoop {
}

let ctx = self.tool_context_for_plan(session_id);
let started = std::time::Instant::now();
let normalized = NormalizedToolResult::from_execution(
self.execute_tool_timed(tool_name, args, &ctx).await,
);
self.config.rl_trajectory_recorder.record_tool_result(
session_id.unwrap_or(""),
0,
&call_id,
tool_name,
&normalized.output,
normalized.exit_code,
started.elapsed().as_millis() as u64,
&normalized.metadata,
normalized
.error_kind
.as_ref()
.map(|kind| format!("{kind:?}")),
);

if let Some(tx) = event_tx {
tx.send(AgentEvent::ToolEnd {
Expand Down
23 changes: 23 additions & 0 deletions core/src/agent/tool_guard_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ impl AgentLoop {
tool_call: &ToolCall,
state: &mut ExecutionLoopState,
event_tx: &Option<mpsc::Sender<AgentEvent>>,
session_id: Option<&str>,
) -> anyhow::Result<bool> {
if let Some((duplicate_count, error_msg)) = state.duplicate_tool_call(
&tool_call.name,
Expand All @@ -37,6 +38,17 @@ impl AgentLoop {
state
.messages
.push(Message::tool_result(&tool_call.id, &error_msg, true));
self.config.rl_trajectory_recorder.record_tool_result(
session_id.unwrap_or(""),
state.current_turn(),
&tool_call.id,
&tool_call.name,
&error_msg,
1,
0,
&None,
Some("duplicate_tool_call".to_string()),
);
return Ok(true);
}

Expand Down Expand Up @@ -68,6 +80,17 @@ impl AgentLoop {
&parse_outcome.output,
true,
));
self.config.rl_trajectory_recorder.record_tool_result(
session_id.unwrap_or(""),
state.current_turn(),
&tool_call.id,
&tool_call.name,
&parse_outcome.output,
1,
0,
&None,
Some("parse_error".to_string()),
);

if let Some(msg) = parse_outcome.fatal_message {
tracing::error!("{}", msg);
Expand Down
8 changes: 7 additions & 1 deletion core/src/agent/tool_turn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ impl AgentLoop {
) -> anyhow::Result<()> {
state.record_tool_call();
let tool_start = std::time::Instant::now();
let turn = state.current_turn();
self.config.rl_trajectory_recorder.record_tool_call(
session_id.unwrap_or(""),
turn,
&tool_call,
);

tracing::info!(
tool_name = tool_call.name.as_str(),
Expand All @@ -51,7 +57,7 @@ impl AgentLoop {
);

if self
.handle_tool_preflight_guard(&tool_call, state, event_tx)
.handle_tool_preflight_guard(&tool_call, state, event_tx, session_id)
.await?
{
return Ok(());
Expand Down
7 changes: 7 additions & 0 deletions core/src/agent_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ pub struct SessionOptions {
/// tasks). `None` (default) keeps everything — fine for short
/// sessions, a memory leak for hours-long cluster workloads.
pub retention_limits: Option<crate::retention::SessionRetentionLimits>,
/// Optional structured JSONL trajectory config.
///
/// When set, a3s-code records user prompts, LLM turns, tool calls,
/// tool observations, token usage, and execution end status for RL
/// training or service data collection. If unset, the same config can
/// be enabled by `A3S_CODE_TRAJECTORY_PATH`.
pub rl_trajectory: Option<crate::rl_trajectory::RlTrajectoryConfig>,
/// Auto-save after each completed `send()` or default-history `stream()` call.
pub auto_save: bool,
/// Optional artifact retention limits for large tool/program outputs.
Expand Down
7 changes: 7 additions & 0 deletions core/src/agent_api/session_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ pub(super) fn build_agent_session(

let base = agent.config.clone();
let auto_delegation = resolve_auto_delegation_config(&agent.code_config, opts);
let rl_trajectory_config = match opts.rl_trajectory.clone() {
Some(config) => Some(config),
None => crate::rl_trajectory::RlTrajectoryConfig::from_env()?,
};
let rl_trajectory_recorder =
crate::rl_trajectory::RlTrajectoryRecorder::from_config(rl_trajectory_config)?;
let config = AgentConfig {
prompt_slots,
tools: tool_defs,
Expand Down Expand Up @@ -180,6 +186,7 @@ pub(super) fn build_agent_session(
agent_registry: Some(Arc::clone(&agent_registry)),
max_execution_time_ms: opts.max_execution_time_ms.or(base.max_execution_time_ms),
budget_guard: opts.budget_guard.clone().or(base.budget_guard.clone()),
rl_trajectory_recorder,
host_env: opts
.host_env
.clone()
Expand Down
11 changes: 11 additions & 0 deletions core/src/agent_api/session_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl std::fmt::Debug for SessionOptions {
.field("memory_store", &self.memory_store.is_some())
.field("session_store", &self.session_store.is_some())
.field("session_id", &self.session_id)
.field("rl_trajectory", &self.rl_trajectory)
.field("auto_save", &self.auto_save)
.field("artifact_store_limits", &self.artifact_store_limits)
.field("max_parse_retries", &self.max_parse_retries)
Expand Down Expand Up @@ -365,6 +366,16 @@ impl SessionOptions {
self
}

/// Enable structured JSONL trajectory capture for this session.
///
/// This is the preferred programmatic path for RL training and deployed
/// service data collection. Environment-only deployments can instead set
/// `A3S_CODE_TRAJECTORY_PATH`.
pub fn with_rl_trajectory(mut self, config: crate::rl_trajectory::RlTrajectoryConfig) -> Self {
self.rl_trajectory = Some(config);
self
}

/// Enable auto-save after each `send()` call
pub fn with_auto_save(mut self, enabled: bool) -> Self {
self.auto_save = enabled;
Expand Down
36 changes: 36 additions & 0 deletions core/src/agent_api/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2682,6 +2682,42 @@ async fn test_active_skill_tool_restriction_option_defaults_and_overrides() {
assert!(legacy_session.config.enforce_active_skill_tool_restrictions);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_session_options_with_rl_trajectory_records_jsonl() {
let dir = tempfile::TempDir::new().unwrap();
let trajectory_path = dir.path().join("trajectory.jsonl");
let agent = Agent::from_config(test_config()).await.unwrap();

let opts = SessionOptions::new().with_rl_trajectory(
crate::rl_trajectory::RlTrajectoryConfig::new(&trajectory_path).with_max_text_bytes(4),
);
let session = agent
.session("/tmp/test-ws-rl-trajectory", Some(opts))
.unwrap();

assert!(session.config.rl_trajectory_recorder.is_enabled());
session
.config
.rl_trajectory_recorder
.record_execution_start(
"sess-rl",
std::path::Path::new("/tmp/test-ws-rl-trajectory"),
"abcdef",
&[],
None,
16,
"disabled",
);

let content = std::fs::read_to_string(&trajectory_path).unwrap();
let record: serde_json::Value = serde_json::from_str(content.lines().next().unwrap()).unwrap();
assert_eq!(record["schema"], crate::rl_trajectory::RL_TRAJECTORY_SCHEMA);
assert_eq!(record["event_type"], "execution_start");
assert_eq!(record["session_id"], "sess-rl");
assert_eq!(record["payload"]["prompt"]["text"], "abcd");
assert_eq!(record["payload"]["prompt"]["truncated"], true);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_session_max_parallel_tasks_config_and_override() {
let mut config = test_config();
Expand Down
2 changes: 2 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ pub(crate) mod prompts;
pub mod queue;
pub mod retention;
pub(crate) mod retry;
pub mod rl_trajectory;
pub mod run;
pub(crate) mod safety_gate;
pub mod sandbox;
Expand Down Expand Up @@ -144,6 +145,7 @@ pub use orchestration::{
WorkflowStepRecord, WORKFLOW_CHECKPOINT_SCHEMA_VERSION,
};
pub use prompts::{AgentStyle, DetectionConfidence, PlanningMode, SystemPromptSlots};
pub use rl_trajectory::{RlTrajectoryConfig, RlTrajectoryMode, RlTrajectoryRecorder};
pub use run::{
ActiveToolSnapshot, InMemoryRunStore, RunEventRecord, RunHandle, RunRecord, RunSnapshot,
RunStatus,
Expand Down
Loading
Loading