Conversation
|
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughIntroduces per-organization S2 stream basins: adds nullable Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Move from a single shared basin with a fixed retention to per-org
basins with retention tied to the org's billing plan (free 7d /
hobby 30d / pro 365d). Stops S2 from deleting streams out from
under live chat sessions when basin retention fires before the
chat ends, unlocks per-org cost attribution via S2's basin metrics
API, and narrows the blast radius of any leaked scoped token.
OSS / s2-lite installs are unaffected: provisioning is gated by
`REALTIME_STREAMS_PER_ORG_BASINS_ENABLED` and the read precedence
falls back to the global basin env var when an entity has no
stamped basin.
Schema: nullable `streamBasinName` on Organization / TaskRun /
Session (migration `20260504071227_add_stream_basin_name`).
Stamped at provisioning / trigger / session-create. Reads resolve
via `run.streamBasinName ?? session.streamBasinName ?? legacy`.
Provisioner: new `streamBasinProvisioner.server.ts` creates basins
via S2's `POST /v1/basins`, reconfigures via `PATCH /v1/basins/{name}`,
maps plan codes to retention durations. Idempotent on race /
pre-existing basin (treats 409 as success). Org create wires it
synchronously with soft-fail. Plan changes in `setPlan` enqueue
`v3.reconfigureStreamBasinForOrg` next to existing billing-cache
invalidations.
Worker jobs: `v3.provisionStreamBasinForOrg` (backfill / retry)
and `v3.reconfigureStreamBasinForOrg` (plan change) on commonWorker.
Read path: `getRealtimeStreamInstance` becomes a factory keyed by
`{ run, session }` basin context; stream prefix drops `org/{orgId}`
segment for per-org basins (basin already isolates) and keeps it
for the legacy basin (orgs share). Access-token cache key includes
basin to prevent cross-contamination.
Admin routes:
POST /admin/api/v1/stream-basins/backfill — fan out provisioning
jobs for every org with `streamBasinName: null`. dryRun + limit
flags. GET returns progress (`provisioned / totalOrgs`).
POST /admin/api/v1/stream-basins/reconfigure — enqueue worker job
(queued mode) or run inline with `tier` override (escape hatch).
Run-engine: `streamBasinName` added to `TriggerParams` (optional);
the V2 trigger path stamps it onto the new TaskRun. No changes to
`MinimalAuthenticatedEnvironment` — stamping is a trigger-time
concern, not a queue concern.
Verified end-to-end with chat.agent locally: backfill creates
basins with right retention (7d free), reconfigure flips retention
via plan change (30d hobby / 365d pro), chat streams land in the
per-org basin, zero leakage to the global fallback basin, multi-turn
reuses the same in/out stream pair.
599803f to
a1d4564
Compare
- reconfigure admin route: guard `JSON.parse` with try/catch + empty-body
check so a malformed POST returns 400 instead of an unhandled 500
(mirror of the backfill route).
- session-streams.wait race-check: select `streamBasinName` on the run
and pass `{ run, session }` to `getRealtimeStreamInstance` so the
resolver picks up the run's stamped basin when the session row is
unavailable.
- streamBasinProvisioner: 10s `AbortSignal.timeout()` on both `s2CreateBasin`
and `s2ReconfigureBasin` so the synchronous org-create path can't hang
signup forever on a slow/unresponsive S2.
- commonWorker basin handlers: throw when `getCurrentPlan` returns
undefined (billing API failure) so redis-worker retries instead of
silently defaulting to "free" tier — a reconfigure landing during a
transient billing outage would otherwise clip a pro org's retention
from 365d to 7d.
The provisioner is now purely retention-string-driven: callers pass a
duration like "30d" and it does the S2 round-trip. No tier types, no
plan-name matching, no billing imports.
The plan-aware mapping moves into a new
`streamBasinRetentionByPlan.server.ts` shim that's the only file in the
webapp that knows about plan codes. Callers that resolve retention
from a plan (the worker's backfill / reconfigure handlers) import the
shim; callers that just want a default (the org-create path) call the
provisioner without `retention`.
Also addresses two review concerns:
- `basinNameForOrg` now throws when the configured prefix + env-name
leave zero or negative budget for the org slug. Without the guard a
too-long prefix would produce `slice(0, 0) = ""` for every org and
silently collide their basins via S2's idempotent-create path.
- The plan-code → retention mapping uses an exact-match switch instead
of substring matching. Substring matching against future plan codes
could grant the wrong retention (e.g. `"approved"` matching `"pro"`).
The known set is small and explicit; new plan codes go in the switch
at launch.
Net surface change:
- `streamBasinProvisioner.server.ts`: drops `StreamBasinTier`,
`planTierFor`, `retentionFor` exports. Adds `defaultRetention()`.
`provisionBasinForOrg` takes `{ retention?: string }` instead of
`{ tier?: StreamBasinTier }`. `reconfigureBasinForOrg` takes a
retention string instead of a tier.
- `streamBasinRetentionByPlan.server.ts` (new): exports
`resolveRetentionForOrg(orgId)` and `retentionForPlanCode(code)`.
- `commonWorker.server.ts`: handlers call the shim, hand a string to
the provisioner.
- Admin reconfigure route: replaces the `tier` body field with a
direct `retention` duration override.
- Org create: no longer passes `tier: "free"`; provisioner uses the
default.
- New env var `REALTIME_STREAMS_BASIN_DEFAULT_RETENTION` (default
`30d`). Existing per-plan vars are still consulted by the shim
only.
Verified end-to-end with chat.agent locally — fresh chat lands in the
per-org basin, multi-turn behaves the same, no leakage to the global
fallback.
- Use `org.id` (cuid, fixed-length, unique-by-construction) as the basin-name suffix instead of a truncated `org.slug`. The slug approach could silently collide two orgs whose slugs share a prefix past the truncation point, since the create call treats S2's 409 as success — a real cross-tenant isolation risk. - `resolveRetentionForOrg` now distinguishes "billing not configured" from "billing call failed". OSS / self-hosted installs (no billing client) get `defaultRetention()` and the worker job converges; cloud installs that experience a transient billing failure throw and get retried by redis-worker. Previously every install without billing hit a permafail loop. - `reconfigureBasinForOrg` throws when no S2 access token is configured instead of silently returning, so a misconfigured cloud install surfaces as a worker failure rather than stale retention. - Duration env vars (`*_RETENTION*`, `*_DELETE_ON_EMPTY_MIN_AGE`) validated at boot via a `durationString()` Zod schema, so a misconfigured value fails fast at startup instead of at first basin operation. - Admin reconfigure route's `retention` body field validated against the same duration shape — bad input is now a clean 400 rather than a 500 from `parseDuration`. - Extract duration parsing into a shared `duration.server.ts` so the env validator and the provisioner share one source of truth. Verified end-to-end with chat.agent locally — fresh chat lands in the per-org basin, no leakage to the global fallback.
Free orgs share the global stream basin (the existing legacy fallback
path); paid orgs get a dedicated per-org basin with retention tied to
their tier. Cleaner story, much smaller S2 footprint, and basin
existence becomes a real tier benefit rather than a default for
everyone.
A single `v3.reconcileStreamBasinForOrg` worker job handles every
plan transition idempotently:
free → paid: provision a new basin, stamp `Organization.streamBasinName`.
paid → paid: reconfigure retention (tier-change). S2 retention only
takes effect on new streams, but that's fine — old
streams age out on their original retention.
paid → free: null `Organization.streamBasinName`. Future runs/sessions
for this org route through the shared global basin via
the existing read-precedence fallback. The per-org basin
lingers; existing streams there respect their original
retention until they age out.
free → free: no-op.
Replaces the previous `provisionStreamBasinForOrg` /
`reconfigureStreamBasinForOrg` job pair so callers don't have to
choose the right job for the transition. `setPlan` enqueues
`reconcile` from all three plan-changed branches; the admin backfill
route enqueues `reconcile` for every non-deleted org (idempotent — the
worker decides per-org what to do).
Org create no longer provisions synchronously — new orgs start free
and use the shared basin until their first paid upgrade.
Verified locally: backfill correctly deprovisioned 4 free orgs (column
nulled, basins left intact) and kept the 1 hobby-tier org's basin.
A fresh chat for a free org streams into the shared basin under the
legacy prefix `org/{orgId}/env/.../sessions/{chatId}/{io}` with no
new streams in the old per-org basin.
a38eb4f to
fc88017
Compare
Two correctness fixes flagged in PR review.
- Session-stream race-check resolves basin from `{ session }` only.
The append-side writer in
`realtime.v1.sessions.$session.$io.append.ts` passes only
`{ session }`, and `resolveStreamBasin` prefers `run` over `session`
when both are present. During the migration window
`run.streamBasinName` and `session.streamBasinName` can differ —
writes land in the session's basin, so the race-check has to read
from the same one or it falls through to the redis path silently.
- Backfill admin route now supports cursor pagination via `afterOrgId`
+ `nextAfterOrgId`, so deployments with more orgs than `limit` (max
10k per call) can actually page through. `remaining` now counts orgs
strictly past the cursor returned, matching the dry-run semantics.
Without this guard `reconcileBasinForOrg` would still call into
`provisionBasinForOrg` / `reconfigureBasinForOrg`, which both no-op
behind the feature flag, but the reconciler then logged
"provisioned (paid upgrade)" and returned `{ kind: "provisioned" }`.
Misleading on a cloud install where billing is wired but per-org
basins are off — the logs claim work that didn't happen, and we paid
for a billing API round-trip we couldn't act on.
Bail at the top with `{ kind: "skipped", reason: "feature-disabled" }`
so the result and the logs match the actual no-op behaviour.
PUT /realtime/v1/sessions/:session/:io and the SSE GET loader on the same path are row-optional — `:session` may be a `chatId` (externalId) that hasn't been upserted yet. When the row is missing, both used to fall through `resolveStreamBasin` to the legacy global basin. If the row was then created with a per-org basin stamp, follow-up appends and SSE subscribes resolved to per-org while the PUT-returned headers still pointed at legacy — caller writes via those headers landed in the wrong place. Resolve via the org's current basin when the row is absent. A fresh session row would be stamped with that same basin at create time, so all subsequent ops converge. Pre-migration rows (row exists, column null) keep their legacy fallback because `organization` is only passed in the no-row branch — `session.streamBasinName === null` still falls through to the env var, not to the org column. Verified by curl: PUT against a fresh externalId for an org with a per-org basin returns `X-S2-Basin: triggerdotdev-dev-org-<orgId>`; same call for a free-org key still returns the legacy basin.
…paths JSDoc referenced `v3.reconfigureStreamBasinForOrg` (a job that doesn't exist — the actual one is `v3.reconcileStreamBasinForOrg`) and didn't make clear that the default path runs the full reconciler, which can deprovision a basin if the org is now on a free plan. Spell that out so an operator hitting this route by hand isn't surprised.
Pass over the basin-related code to drop running commentary and cloud-product-specific phrasing. No behaviour change. - streamBasinProvisioner.server.ts: shorter module + helper docblocks; drop stale "synchronous org-create call site" rationale that no longer applies after the paid-only refactor. - streamBasinRetentionByPlan.server.ts: tighter module doc; collapse the reconcile-transitions narrative into a short table; drop the "cloud-flavored" framing. - v1StreamsGlobal.server.ts: short doc on resolveStreamBasin; drop references to specific operational state. - env.server.ts: terse one-liner per env var; drop sample basin name example. - platform.v3.server.ts, commonWorker.server.ts, runEngine/types.ts + index.ts, triggerTask.server.ts, api.v1.sessions.ts, the two admin routes and the two row-optional session-channel handlers: drop inline rationale paragraphs that re-explained the reconciler / read-precedence chain at every call site.
The cloud billing app now drives basin lifecycle operations via a
single webapp admin endpoint instead of the webapp enqueuing a
reconcile worker that called back into billing to resolve plan +
retention. Plan vocabulary now lives in cloud
(`Limits.streamBasinRetention`); S2 access stays in the webapp.
New: POST /admin/api/v1/orgs/:orgId/stream-basin
- { action: "ensure", retention } — provision or PATCH retention
- { action: "deprovision" } — null the column
Helpers: ensureBasinForOrg / deprovisionBasinForOrg in the
provisioner.
Removed:
- streamBasinRetentionByPlan.server.ts (plan vocabulary)
- v3.reconcileStreamBasinForOrg worker job
- enqueueStreamBasinReconcile from the three setPlan branches
- admin.api.v1.stream-basins.{backfill,reconfigure}.ts (replaced by
the per-org sync endpoint)
- REALTIME_STREAMS_BASIN_RETENTION_FREE/HOBBY/PRO env vars
The .server-changes entry stays accurate — feature behaviour is
unchanged from a customer's perspective.
When the session row is absent (externalId addressing before the row is upserted), the race-check resolved to the legacy basin while the writer side resolves to the org's basin. Fall back to the org so both sides land in the same place — matches the PUT/GET sister routes.
Summary
Move from a single shared S2 basin to per-org basins with retention tied to the org's billing plan. Stops S2 from deleting streams out from under live chat sessions when basin retention fires before the chat ends, and unlocks per-org cost attribution.
OSS / s2-lite installs are unaffected: provisioning is gated by
REALTIME_STREAMS_PER_ORG_BASINS_ENABLED(defaultfalse), and the read precedence falls back to the global basin env var when an entity has no stamped basin.Design
Three nullable
streamBasinNamecolumns (Organization,TaskRun,Session) plus a provisioner that idempotently creates the basin and reconfigures retention on plan changes. The trigger and session-create paths stamp the org's basin onto new rows; the realtime read path picks the basin from the entity context.Admin routes back-fill existing orgs and force-reconfigure a single org.
Test plan
pnpm run typecheck --filter webapp --filter @internal/run-engine