Merged
35 commits
74fd76d
Implement organization-scoped ClickHouse instances
matt-aitken Mar 26, 2026
d0d1f0e
Better replication performance
matt-aitken Mar 26, 2026
94c9a83
Removed dynamic imports
matt-aitken Mar 26, 2026
49bf0f7
otlpExporter.server reverted to main
matt-aitken Mar 27, 2026
795d603
Switch to a DataStore registry
matt-aitken Mar 31, 2026
adc95ea
Admin page for adding data stores
matt-aitken Apr 1, 2026
99cf100
New admin page, lots of improvements to make it more robust and testable
matt-aitken Apr 2, 2026
2ccc5e6
Use the clickhouseFactory directly. WIP on new event repository
matt-aitken Apr 2, 2026
d91e90c
Errors switched to using org-specific clickhouses
matt-aitken Apr 2, 2026
e024d3b
RunPresenter: use org event repository
matt-aitken Apr 2, 2026
606420b
SpanPresenter: use org event repository
matt-aitken Apr 2, 2026
eb661b4
Move metrics sending to the event repository
matt-aitken Apr 4, 2026
dd233f3
We don't need separate stores for metrics
matt-aitken Apr 4, 2026
9febc1a
Back to main runReplicationService
matt-aitken Apr 4, 2026
4928d0c
Run Replication using the factory
matt-aitken Apr 6, 2026
3dacce7
Get the tracer from the provider
matt-aitken Apr 7, 2026
16761c1
Use resolveEventRepositoryForStore everywhere
matt-aitken Apr 9, 2026
b9dfc43
Admin data stores editing
matt-aitken Apr 14, 2026
ef21148
Reload the data store every minute
matt-aitken Apr 14, 2026
3a1d6f1
Work with Postgres EventRepository for self-hosters
matt-aitken Apr 16, 2026
8b2c5db
Error fingerprint should use the logs client
matt-aitken Apr 16, 2026
8fa3153
Retry the initial boot using p-retry
matt-aitken Apr 16, 2026
e837a1b
Fix Devin Review bugs: postgres event repository fallback and error p…
devin-ai-integration[bot] Apr 16, 2026
7f473c6
Fall back to Postgres eventRepository for non-ClickHouse stores
devin-ai-integration[bot] Apr 17, 2026
8f3c622
Move Postgres event repository fallback out of ClickHouse factory
devin-ai-integration[bot] Apr 17, 2026
00e4cfb
Make OrganizationDataStoresRegistry deterministic on overlap
devin-ai-integration[bot] Apr 17, 2026
506cf5a
try/catch getting the clickhouse client
matt-aitken Apr 17, 2026
8c789ab
Better handling for no org id on task run
matt-aitken Apr 17, 2026
ff6f53f
Test for deterministic data store resolution if there are multiple en…
matt-aitken Apr 17, 2026
6752197
Separate the clickhouseFactoryInstance so the db client isn't pulled …
matt-aitken Apr 17, 2026
635ab84
Use the same ClickHouse settings for org specific clients
matt-aitken Apr 17, 2026
e8282a0
Update apps/webapp/app/services/dataStores/organizationDataStoresRegi…
matt-aitken Apr 17, 2026
f235b8e
Merge remote-tracking branch 'origin/main' into claude/merge-clickhou…
claude May 20, 2026
d9506f7
Route sessions replication via ClickhouseFactory, gate replication se…
matt-aitken May 22, 2026
d03b1fa
Fix CI: stale clickhouseInstance imports + Devin events-default routing
matt-aitken May 22, 2026
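
Commit 00e4cfb makes `OrganizationDataStoresRegistry` deterministic when an organization is listed under more than one data store, and ff6f53f adds a test for that case. The rule the PR actually applies is not visible in this view. The sketch below only illustrates the idea under an assumed tie-break (the lexicographically smallest key wins); `DataStoreEntry` and `buildOrgLookup` are hypothetical names, not the PR's code.

```typescript
// Hypothetical shape of a data store row; not the PR's actual schema.
type DataStoreEntry = {
  key: string;
  organizationIds: string[];
};

// Build an org -> data store key lookup that does not depend on input order.
// Assumed tie-break: when an org appears in several entries, the entry with
// the lexicographically smallest key wins.
function buildOrgLookup(entries: DataStoreEntry[]): Map<string, string> {
  const sorted = entries
    .slice()
    .sort((a, b) => (a.key < b.key ? -1 : a.key > b.key ? 1 : 0));
  const lookup = new Map<string, string>();
  for (const entry of sorted) {
    for (const orgId of entry.organizationIds) {
      // First writer wins, and "first" is fixed by the sort above.
      if (!lookup.has(orgId)) {
        lookup.set(orgId, entry.key);
      }
    }
  }
  return lookup;
}

const forward = buildOrgLookup([
  { key: "store-a", organizationIds: ["org_1"] },
  { key: "store-b", organizationIds: ["org_1", "org_2"] },
]);
const reversed = buildOrgLookup([
  { key: "store-b", organizationIds: ["org_1", "org_2"] },
  { key: "store-a", organizationIds: ["org_1"] },
]);
```

Both `forward` and `reversed` resolve the overlapping `org_1` to the same store, which is the property a test for deterministic resolution would check.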
6 changes: 5 additions & 1 deletion .cursor/mcp.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
{
-"mcpServers": {}
+"mcpServers": {
+"linear": {
+"url": "https://mcp.linear.app/mcp"
+}
+}
}
6 changes: 6 additions & 0 deletions .server-changes/organization-scoped-clickhouse.md
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: feature
+---
+
+Organization-scoped ClickHouse routing enables customers with HIPAA and other data security requirements to use dedicated database instances
11 changes: 11 additions & 0 deletions CLAUDE.md
@@ -69,6 +69,17 @@ containerTest("should use both", async ({ prisma, redisOptions }) => {
});
```

+## Code Style
+
+### Imports
+
+**Prefer static imports over dynamic imports.** Only use dynamic `import()` when:
+- Circular dependencies cannot be resolved otherwise
+- Code splitting is genuinely needed for performance
+- The module must be loaded conditionally at runtime
+
+Dynamic imports add unnecessary overhead in hot paths and make code harder to analyze. If you find yourself using `await import()`, ask if a regular `import` statement would work instead.
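
The guideline above can be illustrated with a minimal sketch. It is not taken from the repository: `fileNameStatic` and `fileNameLazy` are hypothetical helpers, and Node's built-in `node:path` module stands in for any dependency.

```typescript
// Static import: resolved once at module load and visible to bundlers,
// type checkers, and dependency analysis.
import { basename } from "node:path";

function fileNameStatic(filePath: string): string {
  return basename(filePath);
}

// Dynamic import: the caller has to await the module lookup on every call
// and the function becomes async. Worth it only for the cases listed above
// (unresolvable cycles, real code splitting, conditional loading).
async function fileNameLazy(filePath: string): Promise<string> {
  const pathModule = await import("node:path");
  return pathModule.basename(filePath);
}
```

Both helpers return the same value; the difference is that the lazy version forces every caller to be async, which is the cost the guideline asks you to justify.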

## Changesets and Server Changes

When modifying any public package (`packages/*` or `integrations/*`), add a changeset:
42 changes: 5 additions & 37 deletions apps/webapp/app/entry.server.tsx
@@ -24,44 +24,12 @@ import {
registerRunEngineEventBusHandlers,
setupBatchQueueCallbacks,
} from "./v3/runEngineHandlers.server";
+// Touch the sessions replication singleton at entry so it boots deterministically
+// on webapp startup. The singleton's initializer wires start (gated on
+// `clickhouseFactory.isReady()`) and SIGTERM/SIGINT shutdown — mirrors
+// runsReplicationInstance.
import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server";
import { signalsEmitter } from "./services/signals.server";

-// Start the sessions replication service (subscribes to the logical replication
-// slot, runs leader election, flushes to ClickHouse). Done at entry level so it
-// runs deterministically on webapp boot rather than lazily via a singleton
-// reference elsewhere in the module graph.
-if (sessionsReplicationInstance && env.SESSION_REPLICATION_ENABLED === "1") {
-// Capture a non-nullable reference so the shutdown closure below
-// doesn't need to re-null-check (TS narrowing doesn't follow through
-// an inner function scope).
-const replicator = sessionsReplicationInstance;
-replicator
-.start()
-.then(() => {
-console.log("🗃️ Sessions replication service started");
-})
-.catch((error) => {
-console.error("🗃️ Sessions replication service failed to start", {
-error,
-});
-});
-
-// Wrap the async shutdown in a sync handler that catches rejections —
-// SIGTERM/SIGINT fire during process teardown, and an unhandled
-// promise rejection from `_replicationClient.stop()` there would
-// bubble up past the process exit. Matches the pattern in
-// dynamicFlushScheduler.server.ts.
-const shutdownSessionsReplication = () => {
-replicator.shutdown().catch((error) => {
-console.error("🗃️ Sessions replication service shutdown error", {
-error,
-});
-});
-};
-signalsEmitter.on("SIGTERM", shutdownSessionsReplication);
-signalsEmitter.on("SIGINT", shutdownSessionsReplication);
-}
+void sessionsReplicationInstance;

const ABORT_DELAY = 30000;
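
The `void sessionsReplicationInstance;` line in this hunk relies on the singleton's initializer doing the work. Referencing the imported binding as a value presumably also keeps TypeScript from dropping the import as unused. The sketch below shows the general pattern only: the `singleton` helper is a hypothetical stand-in (the implementation in `~/utils/singleton` is not shown in this diff), and `setupReplication` is an invented initializer.

```typescript
// Hypothetical stand-in for a process-wide singleton helper; the PR's
// `~/utils/singleton` implementation is not shown in this diff.
const instances = new Map<string, unknown>();

function singleton<T>(name: string, init: () => T): T {
  if (!instances.has(name)) {
    instances.set(name, init());
  }
  return instances.get(name) as T;
}

let initializerRuns = 0;

// Hypothetical initializer: the real one would wire start and shutdown.
function setupReplication(): { name: string } {
  initializerRuns += 1;
  return { name: "sessions-replication" };
}

const replicationInstance = singleton("sessionsReplication", setupReplication);

// Evaluating the binding is enough: the initializer already ran when the
// singleton was created, so the entry point needs no further call.
void replicationInstance;

// A second lookup returns the cached instance without re-running the initializer.
const again = singleton("sessionsReplication", setupReplication);
```

With this shape the entry file only needs to mention the instance; start, gating, and signal handling stay next to the service they belong to.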

16 changes: 14 additions & 2 deletions apps/webapp/app/env.server.ts
@@ -459,7 +459,10 @@ const EnvironmentSchema = z
// If specified, you must configure the corresponding provider using OBJECT_STORE_{PROTOCOL}_* env vars.
// Example: OBJECT_STORE_DEFAULT_PROTOCOL=s3 requires OBJECT_STORE_S3_BASE_URL, OBJECT_STORE_S3_ACCESS_KEY_ID, etc.
// Enables zero-downtime migration between providers (old data keeps working, new data uses new provider).
-OBJECT_STORE_DEFAULT_PROTOCOL: z.string().regex(/^[a-z0-9]+$/).optional(),
+OBJECT_STORE_DEFAULT_PROTOCOL: z
+.string()
+.regex(/^[a-z0-9]+$/)
+.optional(),

ARTIFACTS_OBJECT_STORE_BUCKET: z.string().optional(),
ARTIFACTS_OBJECT_STORE_BASE_URL: z.string().optional(),
@@ -1489,9 +1492,18 @@ const EnvironmentSchema = z
EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(5_000),
EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING: z.coerce.number().int().default(2000),

+// Organization data stores registry
+ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS: z.coerce
+.number()
+.int()
+.default(60 * 1000), // 1 minute
+
// LLM cost tracking
LLM_COST_TRACKING_ENABLED: BoolEnv.default(true),
-LLM_PRICING_RELOAD_INTERVAL_MS: z.coerce.number().int().default(5 * 60 * 1000), // 5 minutes
+LLM_PRICING_RELOAD_INTERVAL_MS: z.coerce
+.number()
+.int()
+.default(5 * 60 * 1000), // 5 minutes
LLM_PRICING_RELOAD_CHANNEL: z.string().default("llm-registry:reload"),
LLM_PRICING_RELOAD_DEBOUNCE_MS: z.coerce.number().int().default(1000),
// Whether to subscribe this process to the LLM_PRICING_RELOAD_CHANNEL.
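
The new `ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS` setting (default one minute) pairs with commit ef21148, "Reload the data store every minute". How the registry consumes it is not shown in this view. As a rough sketch of an interval-driven reload, assuming a loader that returns an org-to-store mapping (`ReloadingRegistry`, `LoadOrgStores`, and `storeFor` are hypothetical names):

```typescript
// Hypothetical loader signature; the real registry reads from the database.
type LoadOrgStores = () => Promise<Record<string, string>>;

class ReloadingRegistry {
  private current: Record<string, string> = {};
  private timer: ReturnType<typeof setInterval> | undefined;
  private readonly load: LoadOrgStores;

  constructor(load: LoadOrgStores) {
    this.load = load;
  }

  // Swap in the new snapshot only after the load succeeds, so a failed
  // reload leaves the previous mapping in place.
  async reload(): Promise<void> {
    this.current = await this.load();
  }

  // intervalMs would come from ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS.
  start(intervalMs: number): void {
    this.timer = setInterval(() => {
      this.reload().catch((error) => console.error("data store reload failed", error));
    }, intervalMs);
  }

  stop(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer);
    }
    this.timer = undefined;
  }

  storeFor(organizationId: string): string | undefined {
    return this.current[organizationId];
  }
}
```

Keeping the last good snapshot on failure means a transient database error does not reroute organizations; whether the PR's registry behaves this way is an assumption.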
36 changes: 21 additions & 15 deletions apps/webapp/app/presenters/v3/AgentListPresenter.server.ts
@@ -3,10 +3,10 @@ import {
type RuntimeEnvironmentType,
type TaskTriggerSource,
} from "@trigger.dev/database";
-import { ClickHouse } from "@internal/clickhouse";
+import { type ClickHouse } from "@internal/clickhouse";
import { z } from "zod";
import { $replica } from "~/db.server";
-import { clickhouseClient } from "~/services/clickhouseInstance.server";
+import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
import { singleton } from "~/utils/singleton";
import { findCurrentWorkerFromEnvironment } from "~/v3/models/workerDeployment.server";

@@ -24,10 +24,7 @@ export type AgentActiveState = {
};

export class AgentListPresenter {
-constructor(
-private readonly clickhouse: ClickHouse,
-private readonly _replica: PrismaClientOrTransaction
-) {}
+constructor(private readonly _replica: PrismaClientOrTransaction) {}

public async call({
organizationId,
@@ -40,6 +37,11 @@ export class AgentListPresenter {
environmentId: string;
environmentType: RuntimeEnvironmentType;
}) {
+const clickhouse = await clickhouseFactory.getClickhouseForOrganization(
+organizationId,
+"standard"
+);
+
const currentWorker = await findCurrentWorkerFromEnvironment(
{
id: environmentId,
@@ -89,20 +91,21 @@ export class AgentListPresenter {
}

// All queries are deferred for streaming
-const activeStates = this.#getActiveStates(environmentId, slugs);
-const conversationSparklines = this.#getConversationSparklines(environmentId, slugs);
-const costSparklines = this.#getCostSparklines(environmentId, slugs);
-const tokenSparklines = this.#getTokenSparklines(environmentId, slugs);
+const activeStates = this.#getActiveStates(clickhouse, environmentId, slugs);
+const conversationSparklines = this.#getConversationSparklines(clickhouse, environmentId, slugs);
+const costSparklines = this.#getCostSparklines(clickhouse, environmentId, slugs);
+const tokenSparklines = this.#getTokenSparklines(clickhouse, environmentId, slugs);

return { agents, activeStates, conversationSparklines, costSparklines, tokenSparklines };
}

/** Count runs currently executing vs suspended per agent */
async #getActiveStates(
+clickhouse: ClickHouse,
environmentId: string,
slugs: string[]
): Promise<Record<string, AgentActiveState>> {
-const queryFn = this.clickhouse.reader.query({
+const queryFn = clickhouse.reader.query({
name: "agentActiveStates",
query: `SELECT
task_identifier,
@@ -140,10 +143,11 @@ export class AgentListPresenter {

/** 24h hourly sparkline of conversation (run) count per agent */
async #getConversationSparklines(
+clickhouse: ClickHouse,
environmentId: string,
slugs: string[]
): Promise<Record<string, number[]>> {
-const queryFn = this.clickhouse.reader.query({
+const queryFn = clickhouse.reader.query({
name: "agentConversationSparklines",
query: `SELECT
task_identifier,
@@ -172,10 +176,11 @@ export class AgentListPresenter {

/** 24h hourly sparkline of LLM cost per agent */
async #getCostSparklines(
+clickhouse: ClickHouse,
environmentId: string,
slugs: string[]
): Promise<Record<string, number[]>> {
-const queryFn = this.clickhouse.reader.query({
+const queryFn = clickhouse.reader.query({
name: "agentCostSparklines",
query: `SELECT
task_identifier,
@@ -203,10 +208,11 @@ export class AgentListPresenter {

/** 24h hourly sparkline of total tokens per agent */
async #getTokenSparklines(
+clickhouse: ClickHouse,
environmentId: string,
slugs: string[]
): Promise<Record<string, number[]>> {
-const queryFn = this.clickhouse.reader.query({
+const queryFn = clickhouse.reader.query({
name: "agentTokenSparklines",
query: `SELECT
task_identifier,
@@ -284,5 +290,5 @@ export class AgentListPresenter {
export const agentListPresenter = singleton("agentListPresenter", setupAgentListPresenter);

function setupAgentListPresenter() {
-return new AgentListPresenter(clickhouseClient, $replica);
+return new AgentListPresenter($replica);
}
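
The change above moves the ClickHouse client from a constructor argument to a per-call lookup. That fits a presenter that is itself a singleton and therefore serves many organizations. A minimal sketch of the pattern follows, with loud assumptions: `FakeClickHouse`, `FakeClickhouseFactory`, and `ExamplePresenter` are hypothetical stand-ins, the factory is injected here only to keep the sketch self-contained (the PR imports a module-level `clickhouseFactory`), and falling back to a shared client for organizations without a dedicated instance is assumed rather than confirmed by this diff.

```typescript
// Hypothetical stand-ins for the ClickHouse client and factory in the PR.
type FakeClickHouse = { label: string };
type ClientType = "standard"; // the only value visible in this diff

class FakeClickhouseFactory {
  private readonly fallback: FakeClickHouse;
  private readonly dedicated: Map<string, FakeClickHouse>;

  constructor(fallback: FakeClickHouse, dedicated: Map<string, FakeClickHouse>) {
    this.fallback = fallback;
    this.dedicated = dedicated;
  }

  // Assumed behaviour: orgs without a dedicated instance get the shared client.
  async getClickhouseForOrganization(
    organizationId: string,
    _clientType: ClientType
  ): Promise<FakeClickHouse> {
    return this.dedicated.get(organizationId) ?? this.fallback;
  }
}

class ExamplePresenter {
  private readonly factory: FakeClickhouseFactory;

  constructor(factory: FakeClickhouseFactory) {
    this.factory = factory;
  }

  // Resolve the client per call, then thread it through the helpers
  // instead of reading it from `this`.
  async call(args: { organizationId: string }): Promise<string> {
    const clickhouse = await this.factory.getClickhouseForOrganization(
      args.organizationId,
      "standard"
    );
    return this.describe(clickhouse);
  }

  private describe(clickhouse: FakeClickHouse): string {
    return `querying ${clickhouse.label}`;
  }
}

const factory = new FakeClickhouseFactory(
  { label: "shared" },
  new Map([["org_hipaa", { label: "dedicated" }]])
);
const presenter = new ExamplePresenter(factory);
```

One presenter instance can then serve `org_hipaa` from its dedicated instance and every other organization from the shared one, without holding a client in instance state.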
5 changes: 3 additions & 2 deletions apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts
@@ -9,7 +9,7 @@ import { type Project, type RuntimeEnvironment, type TaskRunStatus } from "@trig
import assertNever from "assert-never";
import { z } from "zod";
import { API_VERSIONS, RunStatusUnspecifiedApiVersion } from "~/api/versions";
-import { clickhouseClient } from "~/services/clickhouseInstance.server";
+import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
import { logger } from "~/services/logger.server";
import { CoercedDate } from "~/utils/zod";
import { ServiceValidationError } from "~/v3/services/baseService.server";
@@ -269,7 +269,8 @@ export class ApiRunListPresenter extends BasePresenter {
options.machines = searchParams["filter[machine]"];
}

-const presenter = new NextRunListPresenter(this._replica, clickhouseClient);
+const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "standard");
+const presenter = new NextRunListPresenter(this._replica, clickhouse);

logger.debug("Calling RunListPresenter", { options });

@@ -1,6 +1,6 @@
import { type PrismaClient } from "@trigger.dev/database";
import { CreateBulkActionSearchParams } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction";
-import { clickhouseClient } from "~/services/clickhouseInstance.server";
+import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { getRunFiltersFromRequest } from "../RunFilters.server";
import { BasePresenter } from "./basePresenter.server";
@@ -24,8 +24,9 @@ export class CreateBulkActionPresenter extends BasePresenter {
Object.fromEntries(new URL(request.url).searchParams)
);

+const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "standard");
const runsRepository = new RunsRepository({
-clickhouse: clickhouseClient,
+clickhouse,
prisma: this._replica as PrismaClient,
});

11 changes: 7 additions & 4 deletions apps/webapp/app/presenters/v3/RunPresenter.server.ts
@@ -3,11 +3,11 @@ import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/Tr
import { prisma, type PrismaClient } from "~/db.server";
import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents";
import { getUsername } from "~/utils/username";
-import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
import { SpanSummary } from "~/v3/eventRepository/eventRepository.types";
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
import { isFinalRunStatus } from "~/v3/taskStatus";
import { env } from "~/env.server";
+import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";

type Result = Awaited<ReturnType<RunPresenter["call"]>>;
export type Run = Result["run"];
@@ -145,10 +145,13 @@ export class RunPresenter {
};
}

-const eventRepository = resolveEventRepositoryForStore(run.taskEventStore);
+const repository = await getEventRepositoryForStore(
+run.taskEventStore,
+run.runtimeEnvironment.organizationId
+);

// get the events
-let traceSummary = await eventRepository.getTraceSummary(
+let traceSummary = await repository.getTraceSummary(
getTaskEventStoreTableForRun(run),
run.runtimeEnvironment.id,
run.traceId,
@@ -272,7 +275,7 @@ export class RunPresenter {
overridesBySpanId: traceSummary.overridesBySpanId,
linkedRunIdBySpanId,
},
-maximumLiveReloadingSetting: eventRepository.maximumLiveReloadingSetting,
+maximumLiveReloadingSetting: repository.maximumLiveReloadingSetting,
};
}
}
5 changes: 3 additions & 2 deletions apps/webapp/app/presenters/v3/RunTagListPresenter.server.ts
@@ -1,6 +1,6 @@
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { BasePresenter } from "./basePresenter.server";
-import { clickhouseClient } from "~/services/clickhouseInstance.server";
+import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
import { type PrismaClient } from "@trigger.dev/database";
import { timeFilters } from "~/components/runs/v3/SharedFilters";

@@ -37,8 +37,9 @@ export class RunTagListPresenter extends BasePresenter {
}: TagListOptions) {
const hasFilters = Boolean(name?.trim());

+const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "standard");
const runsRepository = new RunsRepository({
-clickhouse: clickhouseClient,
+clickhouse,
prisma: this._replica as PrismaClient,
});

11 changes: 7 additions & 4 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
@@ -23,7 +23,6 @@ import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
import { BasePresenter } from "./basePresenter.server";
import { WaitpointPresenter } from "./WaitpointPresenter.server";
import { engine } from "~/v3/runEngine.server";
-import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
import { IEventRepository, SpanDetail } from "~/v3/eventRepository/eventRepository.types";
import { safeJsonParse } from "~/utils/json";
import {
@@ -32,6 +31,7 @@ import {
extractAIToolCallData,
extractAIEmbedData,
} from "~/components/runs/v3/ai";
+import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";

export type PromptSpanData = {
slug: string;
@@ -132,14 +132,17 @@ export class SpanPresenter extends BasePresenter {

const { traceId } = parentRun;

-const eventRepository = resolveEventRepositoryForStore(parentRun.taskEventStore);
+const repository = await getEventRepositoryForStore(
+parentRun.taskEventStore,
+project.organizationId
+);

const eventStore = getTaskEventStoreTableForRun(parentRun);

const run = await this.getRun({
eventStore,
traceId,
-eventRepository,
+eventRepository: repository,
spanId,
linkedRunId,
createdAt: parentRun.createdAt,
@@ -161,7 +164,7 @@ export class SpanPresenter extends BasePresenter {
projectId: parentRun.projectId,
createdAt: parentRun.createdAt,
completedAt: parentRun.completedAt,
-eventRepository,
+eventRepository: repository,
});

if (!span) {