Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions packages/workflow-executor/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,15 @@ export class InvalidPreRecordedArgsError extends WorkflowExecutorError {
}
}

// The "Related to" source step ran but loaded no record, so this step has nothing to load from
// (PRD-550 "no source record"). Distinct from a bad config — the user can continue without.
// A "Related to" / "On record" source step ran but loaded no record, so the step that uses it has
// no source to act on (PRD-550 / PRD-469 "no source record"). Distinct from a bad config — the
// user can continue without. Wording is step-type-neutral (shared by load-related and trigger-action).
export class SourceRecordMissingError extends WorkflowExecutorError {
constructor(sourceTitle?: string) {
const from = sourceTitle ? `"${sourceTitle}"` : 'its source step';
super(
`Source step ${from} loaded no record`,
`This step loads from ${from}, but that step didn't load any record, so nothing can be loaded here.`,
`This step uses ${from} as its source, but that step didn't load any record.`,
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,10 @@ import {
NoRelationshipFieldsError,
RelatedRecordNotFoundError,
RelationNotFoundError,
SourceRecordMissingError,
StepStateError,
} from '../errors';
import RecordStepExecutor from './record-step-executor';
import {
StepExecutionMode,
StepType,
WORKFLOW_START_STEP_ID,
} from '../types/validated/step-definition';
import { StepExecutionMode } from '../types/validated/step-definition';

const SELECT_RELATION_SYSTEM_PROMPT = `You are an AI agent loading a related record based on a user request.
You are given relations to follow, each shown as "<source record> → <relation> (→ <target collection>)".
Expand Down Expand Up @@ -191,43 +186,6 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor<Lo
};
}

// Revise-safe source resolution. The "Related to" reference is a stable BPMN step id (or the
// WORKFLOW_START_STEP_ID sentinel), not a runtime index — so it survives the index shifts a
// revision causes (clones keep their step id) and is knowable by the editor at build time.
// previousSteps are already restricted to the live path; in a loop the same id can appear more
// than once, so we take the most recent occurrence.
private async resolveSourceRecordRef(stepId: string): Promise<RecordRef> {
if (stepId === WORKFLOW_START_STEP_ID) {
return this.context.baseRecordRef;
}

const matches = this.context.previousSteps.filter(
step =>
step.stepDefinition.type === StepType.LoadRelatedRecord &&
step.stepOutcome.stepId === stepId,
);
const sourceStep = matches[matches.length - 1];

if (sourceStep) {
const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId);
const execution = this.resolveStepExecution(sourceStep, stepExecutions);

if (
execution?.type === 'load-related-record' &&
execution.executionResult !== undefined &&
'record' in execution.executionResult
) {
return execution.executionResult.record;
}

// The source step exists but loaded nothing → clear "no source record" message (PRD-550),
// distinct from a config pointing at a non-existent step.
throw new SourceRecordMissingError(sourceStep.stepDefinition.title);
}

throw new InvalidPreRecordedArgsError(`No source record found for step "${stepId}"`);
}

private async buildRelationCandidates(records: RecordRef[]): Promise<RelationCandidate[]> {
const candidates: RelationCandidate[] = [];

Expand Down
47 changes: 45 additions & 2 deletions packages/workflow-executor/src/executors/record-step-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ import type { RecordStepStatus } from '../types/validated/step-outcome';
import { DynamicStructuredTool, HumanMessage, SystemMessage } from '@forestadmin/ai-proxy';
import { z } from 'zod';

import { InvalidAIResponseError, InvalidPreRecordedArgsError, NoRecordsError } from '../errors';
import {
InvalidAIResponseError,
InvalidPreRecordedArgsError,
NoRecordsError,
SourceRecordMissingError,
} from '../errors';
import BaseStepExecutor from './base-step-executor';
import { StepType } from '../types/validated/step-definition';
import { StepType, WORKFLOW_START_STEP_ID } from '../types/validated/step-definition';

export default abstract class RecordStepExecutor<
TStep extends StepDefinition = StepDefinition,
Expand Down Expand Up @@ -47,6 +52,44 @@ export default abstract class RecordStepExecutor<
return this.selectRecordRef(records, prompt);
}

// Revise-safe source resolution shared by deterministic steps (load-related "Related to",
// trigger-action "On record"). The reference is a stable BPMN step id (or the
// WORKFLOW_START_STEP_ID sentinel), not a runtime index — so it survives the index shifts a
// revision causes (clones keep their step id) and is knowable by the editor at build time.
// previousSteps are already restricted to the live path; in a loop the same id can appear more
// than once, so we take the most recent occurrence.
protected async resolveSourceRecordRef(stepId: string): Promise<RecordRef> {
if (stepId === WORKFLOW_START_STEP_ID) {
return this.context.baseRecordRef;
}

const matches = this.context.previousSteps.filter(
step =>
step.stepDefinition.type === StepType.LoadRelatedRecord &&
step.stepOutcome.stepId === stepId,
);
const sourceStep = matches[matches.length - 1];

if (sourceStep) {
const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId);
const execution = this.resolveStepExecution(sourceStep, stepExecutions);

if (
execution?.type === 'load-related-record' &&
execution.executionResult !== undefined &&
'record' in execution.executionResult
) {
return execution.executionResult.record;
}

// The source step exists but loaded nothing → clear "no source record" message (PRD-550),
// distinct from a config pointing at a non-existent step.
throw new SourceRecordMissingError(sourceStep.stepDefinition.title);
}

throw new InvalidPreRecordedArgsError(`No source record found for step "${stepId}"`);
}

// Candidate sources for the AI: the base record plus the record each live prior
// load-related step resolved — own stepIndex first, falling back to a clone's
// originalStepIndex.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,12 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor<
private async handleFirstCall(): Promise<StepExecutionResult> {
const { stepDefinition: step } = this.context;
const { preRecordedArgs } = step;
const records = await this.getAvailableRecordRefs();

const selectedRecordRef = await this.resolveRecordRef(
records,
step.prompt,
preRecordedArgs?.selectedRecordStepIndex,
);
// "On record" pins the source by stable step id (revise-safe); legacy steps without it fall
// back to AI record selection among the available source records (PRD-469).
const selectedRecordRef = preRecordedArgs?.selectedRecordStepId
? await this.resolveSourceRecordRef(preRecordedArgs.selectedRecordStepId)
: await this.resolveRecordRef(await this.getAvailableRecordRefs(), step.prompt);
const schema = await this.getCollectionSchema(selectedRecordRef.collectionName);
const recordedAction = preRecordedArgs?.actionName;
const actionName = recordedAction ?? (await this.selectAction(schema, step.prompt)).actionName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ export const TriggerActionStepDefinitionSchema = z.object({
.catch(AutomatedWithConfirmation),
preRecordedArgs: z
.object({
selectedRecordStepIndex: z.number().int().optional(),
/**
* "On record" — the source record the action is triggered on, referenced by the stable BPMN
* step id of the previous Load Related Record step that loaded it, or WORKFLOW_START_STEP_ID
* for the trigger record. Stable across revisions, unlike the runtime stepIndex (PRD-469).
*/
selectedRecordStepId: z.string().optional(),
/** Technical name of the action to trigger */
actionName: z.string().optional(),
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1351,6 +1351,61 @@ describe('TriggerRecordActionStepExecutor', () => {

expect(mockModel.bindTools).toHaveBeenCalledTimes(1);
});

it('pins the source record from selectedRecordStepId=workflow-start without AI (PRD-469)', async () => {
const mockModel = makeMockModel();
const agentPort = makeMockAgentPort();
const context = makeContext({
model: mockModel.model,
agentPort,
stepDefinition: makeStep({
executionType: StepExecutionMode.FullyAutomated,
preRecordedArgs: {
selectedRecordStepId: 'workflow-start',
actionName: 'send-welcome-email',
},
}),
});
const executor = new TriggerRecordActionStepExecutor(context);

const result = await executor.execute();

expect(result.stepOutcome.status).toBe('success');
// Neither record nor action selection invoked the AI.
expect(mockModel.bindTools).not.toHaveBeenCalled();
// Action runs on the workflow-start (base) record, recordId [42].
expect(agentPort.executeAction).toHaveBeenCalledWith(
expect.objectContaining({
collection: 'customers',
action: 'send-welcome-email',
id: [42],
}),
context.user,
);
});

it('errors when the pinned source step (a Load Related Record) loaded no record (PRD-469)', async () => {
const agentPort = makeMockAgentPort();
// The source Load Related Record step is on the live path but has no execution record stored
// (it loaded nothing) → SourceRecordMissingError, no action triggered.
const runStore = makeMockRunStore({ getStepExecutions: jest.fn().mockResolvedValue([]) });
const context = makeContext({
agentPort,
runStore,
previousSteps: [makeLoadRelatedPreviousStep(2)],
stepDefinition: makeStep({
executionType: StepExecutionMode.FullyAutomated,
preRecordedArgs: { selectedRecordStepId: 'load-2', actionName: 'send-welcome-email' },
}),
});
const executor = new TriggerRecordActionStepExecutor(context);

const result = await executor.execute();

expect(result.stepOutcome.status).toBe('error');
expect(result.stepOutcome.error).toContain("didn't load any record");
expect(agentPort.executeAction).not.toHaveBeenCalled();
});
});

describe('idempotency', () => {
Expand Down
Loading