
ctx.run_node() sequential calls to multiple single_turn agents — second agent receives empty/ignored node_input #5686

@tofucheng

Description


🔴 Required Information

Is your feature request related to a specific problem?

Yes. When using a Dynamic Workflow with @node and ctx.run_node() to sequentially execute multiple single_turn agents, the second and subsequent agents do not receive their node_input correctly. The agent behaves as if no input was provided, even though node_input is explicitly passed.

Concrete scenario:

import json

@node(rerun_on_resume=True)
async def my_pipeline(ctx: Context):
    # Step 1: works correctly — agent receives the user query and returns structured output
    analysis = await ctx.run_node(query_analyzer, ctx.user_content)

    # Step 2: FAILS — agent responds as if it has no input
    sql_input = json.dumps(analysis, ensure_ascii=False)
    generated_sql = await ctx.run_node(sql_generator, sql_input)
    # generated_sql contains: "I haven't received any input yet, please provide..."

The query_analyzer (first call) correctly receives its node_input and returns valid output. But the sql_generator (second call) responds as if it received no input — it generates a "please provide input" response despite sql_input being a valid JSON string.

Both agents are configured with mode="single_turn" and output_schema=<BaseModel>.

Describe the Solution You'd Like

ctx.run_node(agent, node_input) should reliably deliver node_input to the target agent regardless of how many prior run_node calls have been made in the same @node function. Each sequential run_node call should create a clean, isolated execution context where the agent sees exactly and only its node_input.
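To make the desired contract concrete, here is a minimal, framework-free Python sketch. run_node_isolated and echo_agent are hypothetical stand-ins, not ADK API: each call builds a fresh event list containing only its own input, so no state leaks between sequential calls.

```python
def run_node_isolated(agent_fn, node_input):
    # Build a fresh, single-event context for every call: the agent
    # sees exactly and only the node_input passed to this invocation.
    events = [{"author": "user", "text": node_input}]
    return agent_fn(events)

def echo_agent(events):
    # Toy agent: returns the user inputs visible in its context.
    return [e["text"] for e in events if e["author"] == "user"]

print(run_node_isolated(echo_agent, "input A"))  # ['input A']
print(run_node_isolated(echo_agent, "input B"))  # ['input B'], no residue from the first call
```

This is what "clean, isolated execution context" means in practice: the second call's result is independent of how many calls preceded it.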

Impact on your work

This is critical β€” it makes Dynamic Workflows with sequential agent chaining unusable. The primary use case for ctx.run_node() is building pipelines like:

User Query → Analyzer Agent → SQL Generator Agent → Executor → Report Writer Agent

Without this working, the entire Workflow + Collaborative Agents architecture (a headline feature of ADK 2.0) cannot be used for multi-step pipelines. We had to fall back to a single monolithic Agent, effectively not using any ADK 2.0 Workflow features.

Willingness to contribute

Yes — happy to submit a PR if the team confirms the root cause analysis below and agrees on the fix direction.


🟡 Recommended Information

Describe Alternatives You've Considered

  1. Changed agent mode to task or chat — does not work. prepare_llm_agent_input() only injects node_input when agent.mode == 'single_turn'. Other modes silently ignore the node_input parameter entirely.

  2. Used use_sub_branch=True in ctx.run_node() — did not resolve the issue.

  3. Fell back to a single orchestrator Agent with Skills — this is our current workaround. We split the monolithic prompt into 6 SKILL.md files loaded via SkillToolset, but use a single Agent instead of a Workflow. This works but loses all Workflow benefits (deterministic flow control, parallel execution, structured I/O between steps).

Proposed API / Implementation

Root cause analysis (traced through google-adk==2.0.0b1 source):

The issue is in google/adk/workflow/_llm_agent_wrapper.py. When run_llm_agent_as_node() is called:

# Step 1: prepare_llm_agent_context() creates a new Context for single_turn agents
def prepare_llm_agent_context(agent, ctx):
    if agent.mode != 'single_turn':
        return ctx
    ic = ctx._invocation_context.model_copy()
    agent_ctx = Context(invocation_context=ic, ...)
    ic.session = ic.session.model_copy(deep=False)  # ← shallow copy
    return agent_ctx

# Step 2: prepare_llm_agent_input() injects node_input as a user event
def prepare_llm_agent_input(agent, ctx, node_input):
    if node_input is not None and agent.mode == 'single_turn':
        agent_input = _node_input_to_content(node_input)
        user_event = Event(author='user', message=agent_input)
        user_event.branch = ctx._invocation_context.branch  # ← parent's branch
        ctx.session.events.append(user_event)

The injected user event gets branch = ctx._invocation_context.branch (the parent pipeline's branch, e.g., "my_pipeline").

But when the agent actually runs via agent.run_async(ic), the framework assigns it a child branch (e.g., "my_pipeline.sql_generator.@1").

Then in google/adk/flows/llm_flows/contents.py, _get_current_turn_contents() filters events by branch:

def _get_current_turn_contents(current_branch, events, agent_name, ...):
    for i in range(len(events) - 1, -1, -1):
        event = events[i]
        if _should_include_event_in_context(current_branch, event) and (
            event.author == 'user' or ...
        ):
            return _get_contents(current_branch, events[i:], ...)
    return []  # ← returns empty if no matching event found

On the first run_node call, the events list is clean and the user event is found. On the second call, the events list contains residual events from the first agent's execution (on a different child branch like "my_pipeline.query_analyzer.@1"), and the branch filtering logic in _is_event_belongs_to_branch() may fail to locate the correct user event for the second agent.
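The failure mode can be reproduced in isolation with a toy version of the backward scan. This is a simplified stand-in, not ADK code: Event is a plain dataclass, and the visibility rule (exact branch match) is one plausible rule consistent with the reported symptom; the real _is_event_belongs_to_branch() logic may differ, and this sketch does not model why the first call succeeds. It only shows how a branch mismatch makes the scan come up empty.

```python
from dataclasses import dataclass

@dataclass
class Event:
    author: str
    branch: str
    text: str

def current_turn_contents(current_branch, events):
    # Scan backwards for the most recent visible user event and return
    # the history from there on; an empty list means "no input found".
    for i in range(len(events) - 1, -1, -1):
        e = events[i]
        if e.branch == current_branch and e.author == "user":
            return [ev.text for ev in events[i:]]
    return []

# Session state just before the second agent runs:
events = [
    Event("user", "my_pipeline", "input A"),                          # injected for agent 1
    Event("query_analyzer", "my_pipeline.query_analyzer.@1", "..."),  # agent 1's reply
    Event("user", "my_pipeline", "input B"),                          # injected for agent 2, parent branch
]

# The second agent runs on a child branch; its user event is tagged with
# the parent branch, so the scan never matches and returns empty.
print(current_turn_contents("my_pipeline.sql_generator.@1", events))  # []
```

With an empty contents list, the LLM call carries no user turn at all, which matches the observed "please provide input" responses.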

Proposed fix direction:

In prepare_llm_agent_input(), the injected user event's branch should match the child agent's branch (not the parent's), or the session should be fully isolated (deep copy of events list) so each single_turn agent starts with a clean event list containing only its own user event.

# Option A: Deep copy events for single_turn isolation
def prepare_llm_agent_context(agent, ctx):
    if agent.mode != 'single_turn':
        return ctx
    ic = ctx._invocation_context.model_copy()
    agent_ctx = Context(invocation_context=ic, ...)
    ic.session = ic.session.model_copy(deep=True)  # ← deep copy instead of shallow
    ic.session.events = []  # ← start with clean events
    return agent_ctx

# Option B: Set branch to match the child agent's assigned branch
# (requires knowing the branch before agent.run_async, which may need refactoring)
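A quick sanity check of the Option A direction, using plain dicts instead of ADK's session model (the structure here is assumed, not the real API): deep-copying the session and resetting its event list gives the child exactly one candidate user event while leaving the parent's history untouched.

```python
import copy

parent_session = {
    "state": {"run_id": 42},  # stand-in for other session fields worth protecting
    "events": [
        {"author": "user", "branch": "my_pipeline", "text": "input A"},
        {"author": "query_analyzer", "branch": "my_pipeline.query_analyzer.@1", "text": "analysis"},
    ],
}

# Deep-copy so the child cannot mutate the parent's state, then start the
# child with a clean event list holding only its own injected user event.
child_session = copy.deepcopy(parent_session)
child_session["events"] = [
    {"author": "user", "branch": "my_pipeline.sql_generator.@1", "text": "input B"},
]

user_events = [e for e in child_session["events"] if e["author"] == "user"]
print(len(user_events), user_events[0]["text"])  # exactly one candidate user event
print(len(parent_session["events"]))             # parent history is unchanged
```

The deep copy matters for the non-event session fields: with the shallow copy in the current code, any mutation by the child would be visible to the parent and to subsequent run_node calls.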

Additional Context

Environment:

  • google-adk==2.0.0b1
  • Python 3.12.2
  • Windows 11
  • agents-cli v0.1.3

Reproduction steps:

  1. Create a Workflow with a single @node(rerun_on_resume=True) entry point
  2. Inside the node, call ctx.run_node(agent_A, "input A") followed by ctx.run_node(agent_B, "input B")
  3. Both agents should have mode="single_turn" and optionally output_schema
  4. Agent A will work correctly; Agent B will respond as if it received no input

Screenshot from ADK Web Playground (omitted): the playground trace shows the failing second call, and the graph visualization shows both agents at the same level under the pipeline node, both labeled @1, confirming they are dynamic child runs of the same parent node.

Metadata


Labels

v2 (Affects only 2.0 version)
