Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/per-org-stream-basins.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Per-org S2 stream basins with retention tied to the org's billing plan, gated by `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED`. Stops basin retention from deleting streams out from under live chat sessions and unlocks per-org cost attribution.
19 changes: 19 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ import { MachinePresetName } from "@trigger.dev/core/v3";
import { BoolEnv } from "./utils/boolEnv";
import { isValidDatabaseUrl } from "./utils/db";
import { isValidRegex } from "./utils/regex";
import { isValidDuration } from "./services/realtime/duration.server";

// `z.string()` constrained to a `parseDuration`-parseable string (e.g.
// `7d`, `1h`). Validated at boot so a typo'd duration fails fast.
function durationString() {
return z
.string()
.refine(isValidDuration, "must be a duration like 7d, 30d, 365d, 1h, 1y");
}

// Parses a CSV of machine preset names (e.g. "small-1x,small-2x") into a
// non-empty array of MachinePresetName. Used by COMPUTE_TEMPLATE_MACHINE_PRESETS
Expand Down Expand Up @@ -1506,6 +1515,16 @@ const EnvironmentSchema = z
REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100),
REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10),
REALTIME_STREAMS_S2_WAIT_SECONDS: z.coerce.number().int().default(60),
// When "true", provision a dedicated S2 basin per org and stamp
// `streamBasinName` on new rows. Off keeps everything on the single
// basin defined by `REALTIME_STREAMS_S2_BASIN`.
REALTIME_STREAMS_PER_ORG_BASINS_ENABLED: z.enum(["true", "false"]).default("false"),
// Per-org basin name = `{prefix}-{env}-org-{orgId}`.
REALTIME_STREAMS_BASIN_NAME_PREFIX: z.string().default("triggerdotdev"),
REALTIME_STREAMS_BASIN_NAME_ENV: z.string().default("dev"),
REALTIME_STREAMS_BASIN_DEFAULT_RETENTION: durationString().default("30d"),
REALTIME_STREAMS_BASIN_STORAGE_CLASS: z.enum(["express", "standard"]).default("express"),
REALTIME_STREAMS_BASIN_DELETE_ON_EMPTY_MIN_AGE: durationString().default("1h"),
REALTIME_STREAMS_DEFAULT_VERSION: z.enum(["v1", "v2"]).default("v1"),
WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().default(600_000),

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { z } from "zod";
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
import { isValidDuration } from "~/services/realtime/duration.server";
import {
deprovisionBasinForOrg,
ensureBasinForOrg,
} from "~/services/realtime/streamBasinProvisioner.server";

const ParamsSchema = z.object({ organizationId: z.string() });

const BodySchema = z.discriminatedUnion("action", [
z.object({
action: z.literal("ensure"),
retention: z
.string()
.refine(isValidDuration, "retention must be a duration like 7d, 30d, 365d, 1h, 1y"),
}),
z.object({ action: z.literal("deprovision") }),
]);

export async function action({ request, params }: ActionFunctionArgs) {
await requireAdminApiRequest(request);

const { organizationId } = ParamsSchema.parse(params);

let parsed: z.infer<typeof BodySchema>;
try {
const text = await request.text();
const raw = text.length > 0 ? JSON.parse(text) : {};
const result = BodySchema.safeParse(raw);
if (!result.success) {
return json({ ok: false, error: result.error.flatten() }, { status: 400 });
}
parsed = result.data;
} catch {
return json({ ok: false, error: "Invalid JSON body" }, { status: 400 });
}

if (parsed.action === "ensure") {
const result = await ensureBasinForOrg(organizationId, parsed.retention);
return json({ ok: true, ...result });
}

const result = await deprovisionBasinForOrg(organizationId);
return json({ ok: true, ...result });
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const { action, loader } = createActionApiRoute(
id: true,
friendlyId: true,
realtimeStreamsVersion: true,
streamBasinName: true,
},
});

Expand Down Expand Up @@ -98,7 +99,8 @@ const { action, loader } = createActionApiRoute(
try {
const realtimeStream = getRealtimeStreamInstance(
authentication.environment,
run.realtimeStreamsVersion
run.realtimeStreamsVersion,
{ run }
);

const records = await realtimeStream.readRecords(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,13 @@ const { action, loader } = createActionApiRoute(
// and remove the pending registration.
if (!result.isCached) {
try {
// Session streams are always v2 (S2) — the writer in
// `appendPartToSessionStream` and the SSE subscribe both
// hardcode "v2", so the race-check reader has to match.
// Don't fall through to the run's own `realtimeStreamsVersion`,
// which only describes the run's run-scoped streams.
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2");
// Session streams are hardcoded v2 by the append-side writer
// and SSE subscribe, so the race-check reader matches. Basin
// comes from `session` only — the writer side passes the same
// and we have to read from the same basin to find the record.
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
session: maybeSession,
});
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
ericallam marked this conversation as resolved.
Comment thread
ericallam marked this conversation as resolved.

if (realtimeStream instanceof S2RealtimeStreams) {
const records = await realtimeStream.readSessionStreamRecords(
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/routes/api.v1.sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ const { action } = createActionApiRoute(
runtimeEnvironmentId: authentication.environment.id,
environmentType: authentication.environment.type,
organizationId: authentication.environment.organizationId,
streamBasinName: authentication.environment.organization.streamBasinName,
},
update: { triggerConfig: triggerConfigJson },
});
Expand All @@ -186,6 +187,7 @@ const { action } = createActionApiRoute(
runtimeEnvironmentId: authentication.environment.id,
environmentType: authentication.environment.type,
organizationId: authentication.environment.organizationId,
streamBasinName: authentication.environment.organization.streamBasinName,
},
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ const { action, loader } = createActionApiRoute(
);
}

const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2");
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
session,
});

if (!(realtimeStream instanceof S2RealtimeStreams)) {
return json(
Expand Down
14 changes: 12 additions & 2 deletions apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ const { action } = createActionApiRoute(
});
}

const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2");
// No-row form: resolve via the org so the stream initialised here
// matches what later appends/subscribes will land on once the row
// is created.
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
session: maybeSession,
organization: maybeSession ? null : authentication.environment.organization,
});
Comment thread
ericallam marked this conversation as resolved.

if (!(realtimeStream instanceof S2RealtimeStreams)) {
return new Response("Session channels require the S2 realtime backend", {
Expand Down Expand Up @@ -122,7 +128,11 @@ const loader = createLoaderApiRoute(
},
},
async ({ params, request, authentication, resource }) => {
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2");
// Same no-row fallback as PUT above.
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
session: resource.row,
organization: resource.row ? null : authentication.environment.organization,
});
Comment thread
ericallam marked this conversation as resolved.
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.

if (!(realtimeStream instanceof S2RealtimeStreams)) {
return new Response("Session channels require the S2 realtime backend", {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
select: {
id: true,
friendlyId: true,
streamBasinName: true,
runtimeEnvironment: {
include: {
project: true,
Expand Down Expand Up @@ -64,7 +65,9 @@ export async function action({ request, params }: ActionFunctionArgs) {
}

// The runtimeEnvironment from the run is already in the correct shape for AuthenticatedEnvironment
const realtimeStream = getRealtimeStreamInstance(run.runtimeEnvironment, streamVersion);
const realtimeStream = getRealtimeStreamInstance(run.runtimeEnvironment, streamVersion, {
run,
});

return realtimeStream.ingestData(
request.body,
Expand Down Expand Up @@ -127,7 +130,8 @@ export const loader = createLoaderApiRoute(

const realtimeStream = getRealtimeStreamInstance(
authentication.environment,
run.realtimeStreamsVersion
run.realtimeStreamsVersion,
{ run }
);

return realtimeStream.streamResponse(request, run.friendlyId, params.streamId, getRequestAbortSignal(), {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const { action } = createActionApiRoute(
realtimeStreamsVersion: true,
completedAt: true,
id: true,
streamBasinName: true,
},
});

Expand Down Expand Up @@ -102,7 +103,8 @@ const { action } = createActionApiRoute(

const realtimeStream = getRealtimeStreamInstance(
authentication.environment,
targetRun.realtimeStreamsVersion
targetRun.realtimeStreamsVersion,
{ run: targetRun }
);

const partId = request.headers.get("X-Part-Id") ?? nanoid(7);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@ const { action } = createActionApiRoute(
select: {
id: true,
friendlyId: true,
streamBasinName: true,
parentTaskRun: {
select: {
friendlyId: true,
streamBasinName: true,
},
},
rootTaskRun: {
select: {
friendlyId: true,
streamBasinName: true,
},
},
},
Expand All @@ -43,17 +46,20 @@ const { action } = createActionApiRoute(
return new Response("Run not found", { status: 404 });
}

const targetId =
const targetRun =
params.target === "self"
? run.friendlyId
? run
: params.target === "parent"
? run.parentTaskRun?.friendlyId
: run.rootTaskRun?.friendlyId;
? run.parentTaskRun
: run.rootTaskRun;

if (!targetId) {
if (!targetRun?.friendlyId) {
return new Response("Target not found", { status: 404 });
}

const targetId = targetRun.friendlyId;
const basinContext = { run: { streamBasinName: targetRun.streamBasinName ?? null } };

if (request.method === "PUT") {
// This is the "create" endpoint
const updatedRun = await prisma.taskRun.update({
Expand All @@ -80,7 +86,8 @@ const { action } = createActionApiRoute(

const realtimeStream = getRealtimeStreamInstance(
authentication.environment,
updatedRun.realtimeStreamsVersion
updatedRun.realtimeStreamsVersion,
basinContext
);

const { responseHeaders } = await realtimeStream.initializeStream(targetId, params.streamId);
Expand Down Expand Up @@ -112,7 +119,11 @@ const { action } = createActionApiRoute(
resumeFromChunkNumber = parsed;
}

const realtimeStream = getRealtimeStreamInstance(authentication.environment, streamVersion);
const realtimeStream = getRealtimeStreamInstance(
authentication.environment,
streamVersion,
basinContext
);

return realtimeStream.ingestData(
request.body,
Expand All @@ -139,14 +150,17 @@ const loader = createLoaderApiRoute(
select: {
id: true,
friendlyId: true,
streamBasinName: true,
parentTaskRun: {
select: {
friendlyId: true,
streamBasinName: true,
},
},
rootTaskRun: {
select: {
friendlyId: true,
streamBasinName: true,
},
},
},
Expand All @@ -158,17 +172,19 @@ const loader = createLoaderApiRoute(
return new Response("Run not found", { status: 404 });
}

const targetId =
const targetRun =
params.target === "self"
? run.friendlyId
? run
: params.target === "parent"
? run.parentTaskRun?.friendlyId
: run.rootTaskRun?.friendlyId;
? run.parentTaskRun
: run.rootTaskRun;

if (!targetId) {
if (!targetRun?.friendlyId) {
return new Response("Target not found", { status: 404 });
}

const targetId = targetRun.friendlyId;

// Handle HEAD request to get last chunk index
if (request.method !== "HEAD") {
return new Response("Only HEAD requests are allowed for this endpoint", { status: 405 });
Expand All @@ -178,7 +194,11 @@ const loader = createLoaderApiRoute(
const clientId = request.headers.get("X-Client-Id") || "default";
const streamVersion = request.headers.get("X-Stream-Version") || "v1";

const realtimeStream = getRealtimeStreamInstance(authentication.environment, streamVersion);
const realtimeStream = getRealtimeStreamInstance(
authentication.environment,
streamVersion,
{ run: { streamBasinName: targetRun.streamBasinName ?? null } }
);

const lastChunkIndex = await realtimeStream.getLastChunkIndex(
targetId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const { action } = createActionApiRoute(
friendlyId: true,
completedAt: true,
realtimeStreamsVersion: true,
streamBasinName: true,
},
});

Expand All @@ -68,7 +69,8 @@ const { action } = createActionApiRoute(

const realtimeStream = getRealtimeStreamInstance(
authentication.environment,
run.realtimeStreamsVersion
run.realtimeStreamsVersion,
{ run }
);

// Build the input stream record (raw user data, no wrapper)
Expand Down Expand Up @@ -155,7 +157,8 @@ const loader = createLoaderApiRoute(

const realtimeStream = getRealtimeStreamInstance(
authentication.environment,
run.realtimeStreamsVersion
run.realtimeStreamsVersion,
{ run }
);

// Read from the internal S2 stream name (prefixed to avoid user stream collisions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {

const realtimeStream = getRealtimeStreamInstance(
run.runtimeEnvironment,
run.realtimeStreamsVersion
run.realtimeStreamsVersion,
{ run }
);

return realtimeStream.streamResponse(request, run.friendlyId, streamKey, getRequestAbortSignal(), {
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ export class RunEngineTriggerTaskService {
bulkActionId: body.options?.bulkActionId,
planType,
realtimeStreamsVersion: options.realtimeStreamsVersion,
streamBasinName: environment.organization.streamBasinName,
debounce: body.options?.debounce,
annotations,
// When debouncing with triggerAndWait, create a span for the debounced trigger
Expand Down
Loading
Loading