Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/per-org-stream-basins.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Per-org S2 stream basins with plan-tied retention (free 7d / hobby 30d / pro 365d), gated by `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED`. Stops basin retention from deleting streams out from under live chat sessions and unlocks per-org cost attribution via S2 basin metrics.
28 changes: 28 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,34 @@ const EnvironmentSchema = z
REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100),
REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10),
REALTIME_STREAMS_S2_WAIT_SECONDS: z.coerce.number().int().default(60),
/// Per-org basin migration. When "true", the webapp provisions a
/// dedicated S2 basin per org with plan-tied retention and stamps
/// `streamBasinName` on new TaskRun / Session rows. OSS / s2-lite
/// installs leave this off and keep using the single basin defined
/// by `REALTIME_STREAMS_S2_BASIN`.
REALTIME_STREAMS_PER_ORG_BASINS_ENABLED: z.enum(["true", "false"]).default("false"),
/// Naming pattern for per-org basins: `{prefix}-{env}-org-{slug}`
/// e.g. `triggerdotdev-prod-org-acme-corp`. Cluster + tier shorthand
/// — kept short to stay under S2's basin-name length limit.
REALTIME_STREAMS_BASIN_NAME_PREFIX: z.string().default("triggerdotdev"),
REALTIME_STREAMS_BASIN_NAME_ENV: z.string().default("dev"),
/// Default retention for new basins (S2 duration syntax: 7d / 30d / 1y).
/// Used at org-create and as the fallback when no plan-specific
/// retention is resolved. Operators that don't run a billing API
/// only need this one.
REALTIME_STREAMS_BASIN_DEFAULT_RETENTION: z.string().default("30d"),
/// Plan-specific retention overrides — only consulted by the
/// optional `streamBasinRetentionByPlan` shim. Operators that
/// don't map plans to retention (OSS, self-hosted) can ignore
/// these and rely on the default above.
REALTIME_STREAMS_BASIN_RETENTION_FREE: z.string().default("7d"),
REALTIME_STREAMS_BASIN_RETENTION_HOBBY: z.string().default("30d"),
REALTIME_STREAMS_BASIN_RETENTION_PRO: z.string().default("365d"),
/// Storage class applied to per-org basins at create time.
REALTIME_STREAMS_BASIN_STORAGE_CLASS: z.enum(["express", "standard"]).default("express"),
/// `delete_on_empty_min_age` applied to per-org basins. Streams
/// that go empty for this long are reaped automatically.
REALTIME_STREAMS_BASIN_DELETE_ON_EMPTY_MIN_AGE: z.string().default("1h"),
Comment thread
ericallam marked this conversation as resolved.
Outdated
REALTIME_STREAMS_DEFAULT_VERSION: z.enum(["v1", "v2"]).default("v1"),
WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().default(600_000),

Expand Down
23 changes: 23 additions & 0 deletions apps/webapp/app/models/organization.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import { env } from "~/env.server";
import { featuresForUrl } from "~/features.server";
import { createApiKeyForEnv, createPkApiKeyForEnv, envSlug } from "./api-key.server";
import { getDefaultEnvironmentConcurrencyLimit } from "~/services/platform.v3.server";
import { logger } from "~/services/logger.server";
import { provisionBasinForOrg } from "~/services/realtime/streamBasinProvisioner.server";
export type { Organization };

const nanoid = customAlphabet("1234567890abcdef", 4);
Expand Down Expand Up @@ -82,6 +84,27 @@ export async function createOrganization(
},
});

// Provision the org's S2 basin synchronously so the very first run
// gets `streamBasinName` stamped via the existing org read. New orgs
// get the default retention; the plan-change path updates retention
// later if the operator runs a billing-aware install. Soft-fail on
// S2 errors so a transient outage doesn't block signup — the
// backfill reconciler picks up any org left with `streamBasinName: null`.
// No-op when `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED=false` (OSS mode).
try {
await provisionBasinForOrg({
id: organization.id,
slug: organization.slug,
streamBasinName: organization.streamBasinName,
// No `retention` — provisioner uses `defaultRetention()`.
});
} catch (error) {
logger.warn("[createOrganization] streamBasin provisioning failed; backfill will retry", {
orgId: organization.id,
error: error instanceof Error ? error.message : String(error),
});
}

return { ...organization };
}

Expand Down
165 changes: 165 additions & 0 deletions apps/webapp/app/routes/admin.api.v1.stream-basins.backfill.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import { json, type ActionFunctionArgs } from "@remix-run/server-runtime";
import { z } from "zod";
import { prisma } from "~/db.server";
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
import { isPerOrgBasinsEnabled } from "~/services/realtime/streamBasinProvisioner.server";
import { commonWorker } from "~/v3/commonWorker.server";
import { logger } from "~/services/logger.server";

/**
* One-shot backfill that enqueues `v3.provisionStreamBasinForOrg` for
* every org with `streamBasinName: null`. Idempotent — re-running picks
* up only the orgs that haven't been provisioned yet, and the worker
* job itself is also idempotent (the provisioner short-circuits if the
* org column is already set).
*
* - Admin auth via `requireAdminApiRequest` (PAT in `Authorization`).
* - Refuses to run when `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED=false`
* so OSS / s2-lite installs can't accidentally trigger basin
* creation against a misconfigured backend.
* - `dryRun=true` (default false) returns the count without enqueueing.
* - `limit` (default 1000, max 10000) caps a single invocation. Run
* again to process more — the column filter naturally walks the
* queue forward each call.
* - Each job is keyed `provisionStreamBasin:<orgId>` so concurrent
* backfill calls converge to one job per org instead of duplicating.
*
 * Run from a shell (options go in the JSON body — query params are ignored):
 *   curl -X POST -H "Authorization: Bearer $PAT" -H "Content-Type: application/json" \
 *     -d '{"limit": 200, "dryRun": true}' \
 *     "https://api.trigger.dev/admin/api/v1/stream-basins/backfill"
*/

// POST body. Both fields are optional and unknown keys are rejected
// (`.strict()`), so typos like `dryrun` fail loudly instead of
// silently running a real backfill.
//   `limit`  — max orgs handled in one invocation (1..10_000).
//   `dryRun` — report counts without enqueueing anything.
const BodySchema = z
  .object({
    limit: z.number().int().min(1).max(10_000).optional().default(1000),
    dryRun: z.boolean().optional().default(false),
  })
  .strict();

// Response shape shared by the dry-run and real-run paths of `action`.
//   `enqueued`  — jobs actually queued by this call (always 0 on dry runs).
//   `pending`   — candidate orgs found by this call (<= `limit`).
//   `remaining` — best-effort count of unprovisioned orgs left after this
//                 call; computed from a count taken before enqueueing, so
//                 it's an estimate under concurrent writes.
//   `orgIds`    — ids of the candidate orgs examined by this call.
type BackfillResponse = {
  ok: true;
  dryRun: boolean;
  enqueued: number;
  pending: number;
  remaining: number;
  orgIds: string[];
};

export async function action({ request }: ActionFunctionArgs) {
  await requireAdminApiRequest(request);

  // Hard refusal when the feature flag is off — prevents accidental
  // basin creation against a misconfigured backend.
  if (!isPerOrgBasinsEnabled()) {
    return json(
      {
        ok: false,
        error:
          "Per-org stream basins are disabled. Set REALTIME_STREAMS_PER_ORG_BASINS_ENABLED=true before running the backfill.",
      },
      { status: 400 }
    );
  }

  // Accept an `application/json` body; an empty body means "use the
  // defaults", so a bare POST performs the standard backfill.
  let rawBody: unknown;
  try {
    const text = await request.text();
    rawBody = text.length > 0 ? JSON.parse(text) : {};
  } catch {
    return json({ ok: false, error: "Invalid JSON body" }, { status: 400 });
  }

  const bodyResult = BodySchema.safeParse(rawBody);
  if (!bodyResult.success) {
    return json({ ok: false, error: bodyResult.error.flatten() }, { status: 400 });
  }
  const { dryRun, limit } = bodyResult.data;

  // Candidate orgs: unprovisioned (`streamBasinName: null`) and not
  // soft-deleted. Oldest-first so repeated invocations walk the
  // backlog forward deterministically.
  const candidates = await prisma.organization.findMany({
    where: { streamBasinName: null, deletedAt: null },
    orderBy: { createdAt: "asc" },
    take: limit,
    select: { id: true },
  });

  // Total unprovisioned orgs, used only for progress reporting.
  const remainingTotal = await prisma.organization.count({
    where: { streamBasinName: null, deletedAt: null },
  });

  if (dryRun) {
    const dryRunResponse: BackfillResponse = {
      ok: true,
      dryRun: true,
      enqueued: 0,
      pending: candidates.length,
      remaining: Math.max(0, remainingTotal - candidates.length),
      orgIds: candidates.map((candidate) => candidate.id),
    };
    return json(dryRunResponse);
  }

  // One worker job per org, keyed per-org so concurrent backfill
  // calls collapse into a single pending job. A job for an already
  // provisioned org is a no-op on the worker side. Failures are
  // logged and skipped so one bad org doesn't abort the batch.
  let queued = 0;
  for (const candidate of candidates) {
    try {
      await commonWorker.enqueue({
        job: "v3.provisionStreamBasinForOrg",
        payload: { orgId: candidate.id },
        id: `provisionStreamBasin:${candidate.id}`,
      });
      queued += 1;
    } catch (error) {
      logger.error("[stream-basins-backfill] enqueue failed", {
        orgId: candidate.id,
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }

  const response: BackfillResponse = {
    ok: true,
    dryRun: false,
    enqueued: queued,
    pending: candidates.length,
    remaining: Math.max(0, remainingTotal - queued),
    orgIds: candidates.map((candidate) => candidate.id),
  };

  logger.info("[stream-basins-backfill] enqueued provisioning jobs", {
    enqueued: queued,
    candidates: candidates.length,
    remaining: response.remaining,
  });

  return json(response);
}

// GET returns the current state without doing anything — useful for
// monitoring "is the backfill done yet?" from a dashboard / curl.
export async function loader({ request }: ActionFunctionArgs) {
  await requireAdminApiRequest(request);

  // The two counts are independent reads, so issue them concurrently
  // instead of awaiting them back-to-back.
  const [totalOrgs, provisioned] = await Promise.all([
    prisma.organization.count({ where: { deletedAt: null } }),
    prisma.organization.count({
      where: { deletedAt: null, NOT: { streamBasinName: null } },
    }),
  ]);
  const remaining = totalOrgs - provisioned;

  return json({
    ok: true,
    perOrgBasinsEnabled: isPerOrgBasinsEnabled(),
    totalOrgs,
    provisioned,
    remaining,
    // Zero orgs counts as "complete" — avoids a 0/0 NaN in the ratio.
    completion: totalOrgs === 0 ? 1 : provisioned / totalOrgs,
  });
}
72 changes: 72 additions & 0 deletions apps/webapp/app/routes/admin.api.v1.stream-basins.reconfigure.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { json, type ActionFunctionArgs } from "@remix-run/server-runtime";
import { z } from "zod";
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
import {
isPerOrgBasinsEnabled,
reconfigureBasinForOrg,
} from "~/services/realtime/streamBasinProvisioner.server";
import { commonWorker } from "~/v3/commonWorker.server";

/**
* Admin trigger for `v3.reconfigureStreamBasinForOrg`. The plan-change
* path in `setPlan` enqueues this automatically when billing is wired;
Comment thread
ericallam marked this conversation as resolved.
Outdated
* this route exists for ops + e2e testing.
*
* - Default (`{ orgId }`): enqueues the worker job which resolves the
* retention from the org's plan and PATCHes the basin to match.
* No-op when billing isn't configured (OSS).
* - With `retention`: bypasses the billing lookup and runs reconfigure
* inline against the given duration string (e.g. `"7d"`, `"30d"`,
* `"365d"`, `"1y"`). Useful for validating the PATCH wire shape
* end-to-end and as a manual override (e.g. enterprise contracts).
*/
const BodySchema = z
.object({
orgId: z.string(),
retention: z.string().optional(),
})
Comment thread
ericallam marked this conversation as resolved.
Outdated
.strict();

export async function action({ request }: ActionFunctionArgs) {
  await requireAdminApiRequest(request);

  if (!isPerOrgBasinsEnabled()) {
    return json(
      { ok: false, error: "Per-org stream basins are disabled." },
      { status: 400 }
    );
  }

  // `application/json` body; an empty body is treated as `{}` so the
  // schema error below (missing `orgId`) is reported rather than a
  // JSON parse failure.
  let raw: unknown;
  try {
    const text = await request.text();
    raw = text.length > 0 ? JSON.parse(text) : {};
  } catch {
    return json({ ok: false, error: "Invalid JSON body" }, { status: 400 });
  }

  const parsed = BodySchema.safeParse(raw);
  if (!parsed.success) {
    return json({ ok: false, error: parsed.error.flatten() }, { status: 400 });
  }

  const { orgId, retention } = parsed.data;

  if (retention) {
    // Direct, synchronous reconfigure with the explicit retention.
    // Skips the worker queue + billing lookup so the PATCH is
    // verifiable in the response. Errors surface as 500.
    await reconfigureBasinForOrg(orgId, retention);
    return json({ ok: true, mode: "inline", orgId, retention });
  }

  // Default path: hand off to the worker job, which resolves the
  // retention from the org's plan. The per-org job id dedupes
  // repeated requests into a single pending job.
  await commonWorker.enqueue({
    job: "v3.reconfigureStreamBasinForOrg",
    payload: { orgId },
    id: `reconfigureStreamBasin:${orgId}`,
  });

  // `enqueued` (holding the org id) is kept for backward
  // compatibility; prefer `orgId`, which matches the inline-mode
  // response shape above.
  return json({ ok: true, mode: "queued", orgId, enqueued: orgId });
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const { action, loader } = createActionApiRoute(
id: true,
friendlyId: true,
realtimeStreamsVersion: true,
streamBasinName: true,
},
});

Expand Down Expand Up @@ -98,7 +99,8 @@ const { action, loader } = createActionApiRoute(
try {
const realtimeStream = getRealtimeStreamInstance(
authentication.environment,
run.realtimeStreamsVersion
run.realtimeStreamsVersion,
{ run }
);

const records = await realtimeStream.readRecords(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const { action, loader } = createActionApiRoute(
id: true,
friendlyId: true,
realtimeStreamsVersion: true,
streamBasinName: true,
},
});

Expand Down Expand Up @@ -128,7 +129,10 @@ const { action, loader } = createActionApiRoute(
// hardcode "v2", so the race-check reader has to match.
// Don't fall through to the run's own `realtimeStreamsVersion`,
// which only describes the run's run-scoped streams.
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2");
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
run,
session: maybeSession,
});
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
ericallam marked this conversation as resolved.
Comment thread
ericallam marked this conversation as resolved.

if (realtimeStream instanceof S2RealtimeStreams) {
const records = await realtimeStream.readSessionStreamRecords(
Expand Down
8 changes: 8 additions & 0 deletions apps/webapp/app/routes/api.v1.sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ const { action } = createActionApiRoute(
runtimeEnvironmentId: authentication.environment.id,
environmentType: authentication.environment.type,
organizationId: authentication.environment.organizationId,
// Stamp the org's S2 basin so realtime reads on this
// session's `.in/.out` channels resolve without joining
// Organization. Null until per-org basins are provisioned.
streamBasinName: authentication.environment.organization.streamBasinName,
},
update: { triggerConfig: triggerConfigJson },
});
Expand All @@ -186,6 +190,10 @@ const { action } = createActionApiRoute(
runtimeEnvironmentId: authentication.environment.id,
environmentType: authentication.environment.type,
organizationId: authentication.environment.organizationId,
// Stamp the org's S2 basin so realtime reads on this
// session's `.in/.out` channels resolve without joining
// Organization. Null until per-org basins are provisioned.
streamBasinName: authentication.environment.organization.streamBasinName,
},
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ const { action, loader } = createActionApiRoute(
);
}

const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2");
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
session,
});

if (!(realtimeStream instanceof S2RealtimeStreams)) {
return json(
Expand Down
Loading
Loading