Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion apps/webapp/app/models/workflowListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { SchedulerSource } from ".prisma/client";
import { ScheduleSourceSchema } from "@trigger.dev/common-schemas";
import cronstrue from "cronstrue";
import type { DisplayProperties } from "internal-integrations";
import { github } from "internal-integrations";
import { airtable, github } from "internal-integrations";
import invariant from "tiny-invariant";
import { triggerLabel } from "~/components/triggers/triggerLabel";
import type { PrismaClient } from "~/db.server";
Expand Down Expand Up @@ -142,6 +142,11 @@ function triggerProperties(
externalSource.source
);
break;
case "airtable":
displayProperties = airtable.webhooks.displayProperties(
externalSource.source
);
break;
default:
throw new Error(`Unsupported service ${externalSource.service}`);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,14 @@
import type {
HandledExternalEventResponse,
NormalizedRequest,
} from "internal-integrations";
import { airtable, github } from "internal-integrations";
import type { PrismaClient } from "~/db.server";
import { prisma } from "~/db.server";
import { github } from "internal-integrations";
import type { ExternalSourceWithConnection } from "~/models/externalSource.server";
import type { NormalizedRequest } from "internal-integrations";
import { getAccessInfo } from "../accessInfo.server";
import { IngestEvent } from "../events/ingest.server";

type IgnoredEventResponse = {
status: "ignored";
reason: string;
};

type ErrorEventResponse = {
status: "error";
error: string;
};

type TriggeredEventResponse = {
status: "ok";
data: {
id: string;
payload: any;
event: string;
timestamp?: string;
context?: any;
};
};

export type HandledExternalEventResponse =
| TriggeredEventResponse
| IgnoredEventResponse
| ErrorEventResponse;

export class HandleExternalSource {
#prismaClient: PrismaClient;

Expand Down Expand Up @@ -66,22 +44,41 @@ export class HandleExternalSource {

switch (possibleEvent.status) {
case "ok": {
const { id, payload, event, timestamp, context } = possibleEvent.data;

const ingestService = new IngestEvent();

await ingestService.call(
{
id,
payload,
name: event,
type: externalSource.type,
service: serviceIdentifier,
timestamp,
context,
},
externalSource.organization
);
//todo try update ExternalSource, setting lastDelivery, increment version
const updatedSources =
await this.#prismaClient.externalSource.updateMany({
where: { id: externalSource.id, version: externalSource.version },
data: {
lastDelivery: possibleEvent.lastDelivery,
version: {
increment: 1,
},
},
});

//todo if updatedSources.count === 0, then just return

//todo move to another event called deliverWebhookEvents
//loop over the events and ingest them
for (let index = 0; index < possibleEvent.data.length; index++) {
const { id, payload, event, timestamp, context } =
possibleEvent.data[index];

const ingestService = new IngestEvent();

await ingestService.call(
{
id,
payload,
name: event,
type: externalSource.type,
service: serviceIdentifier,
timestamp,
context,
},
externalSource.organization
);
}

return true;
}
Expand Down Expand Up @@ -118,16 +115,58 @@ export class HandleExternalSource {
}
}

//todo Schema: add lastDelivery JSON column
//todo Schema: add version int column (default 0)
//todo pass lastDelivery into handleWebhookRequest (parsed by the webhook handler)

async #handleWebhook(
externalSource: NonNullable<ExternalSourceWithConnection>,
serviceIdentifier: string,
request: NormalizedRequest
): Promise<HandledExternalEventResponse> {
if (externalSource.connection === null) {
return {
status: "error",
error: `Could not handle webhook with no API connection. ExternalSource id: ${externalSource.id}`,
};
}

const accessInfo = await getAccessInfo(externalSource.connection);

if (accessInfo === undefined) {
return {
status: "error",
error: `Could not handle webhook with no AccessInfo. ExternalSource id: ${externalSource.id}. Connection id: ${externalSource.connection.id}`,
};
}

switch (serviceIdentifier) {
case "github": {
return github.webhooks.handleWebhookRequest({
return await github.webhooks.handleWebhookRequest({
accessInfo,
request,
secret: externalSource.secret ?? undefined,
lastDelivery: externalSource.lastDelivery,
});
}
case "airtable": {
const latestTriggerEvent =
await this.#prismaClient.triggerEvent.findFirst({
where: {
key: request.body.base.id,
},
orderBy: {
createdAt: "desc",
},
});

return await airtable.webhooks.handleWebhookRequest({
accessInfo,
request,
secret: externalSource.secret ?? undefined,
options: {
latestTriggerEvent,
},
});
}
default: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { APIConnection, ExternalSource } from ".prisma/client";
import type { AccessInfo } from "internal-integrations";
import { AccessInfo, airtable } from "internal-integrations";
import { github } from "internal-integrations";
import crypto from "node:crypto";
import type { PrismaClient } from "~/db.server";
Expand Down Expand Up @@ -131,6 +131,16 @@ export class RegisterExternalSource {
data
);
}
case "airtable": {
return airtable.webhooks.registerWebhook(
{
callbackUrl,
secret,
accessInfo,
},
data
);
}
default: {
throw new Error(
`Could not register webhook with unsupported service identifier: ${serviceIdentifier}`
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { github } from "internal-integrations";
import { github, airtable } from "internal-integrations";
import type { WorkflowMetadata } from "internal-platform";
import { WorkflowMetadataSchema } from "internal-platform";
import type { PrismaClient } from "~/db.server";
Expand Down Expand Up @@ -241,6 +241,9 @@ export class RegisterWorkflow {
case "github": {
return github.webhooks.keyForSource(payload.trigger.source);
}
case "airtable": {
return airtable.webhooks.keyForSource(payload.trigger.source);
}
default: {
return payload.trigger.service;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "TriggerEvent" ADD COLUMN "key" TEXT;
1 change: 1 addition & 0 deletions apps/webapp/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ model TriggerEvent {
timestamp DateTime @default(now())
payload Json
context Json?
key String?

organization Organization? @relation(fields: [organizationId], references: [id], onDelete: Cascade, onUpdate: Cascade)
organizationId String?
Expand Down
19 changes: 19 additions & 0 deletions examples/airtable/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"private": true,
"name": "@examples/airtable",
"version": "0.0.1",
"description": "Example trigger.dev workflow that uses the Airtable integration",
"dependencies": {
"@trigger.dev/integrations": "workspace:*",
"@trigger.dev/sdk": "workspace:*",
"zod": "^3.20.2"
},
"devDependencies": {
"@trigger.dev/tsconfig": "workspace:*",
"@types/node": "16",
"tsx": "^3.12.0"
},
"scripts": {
"dev": "tsx src/index.ts"
}
}
17 changes: 17 additions & 0 deletions examples/airtable/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Trigger } from "@trigger.dev/sdk";
import { airtable } from "@trigger.dev/integrations";

new Trigger({
id: "airtable-webhook-1",
name: "Airtable webhook: appBlf3KsalIQeMUo",
apiKey: "trigger_dev_zC25mKNn6c0q",
endpoint: "ws://localhost:8889/ws",
logLevel: "debug",
on: airtable.events.all({
baseId: "appBlf3KsalIQeMUo",
}),
run: async (event, ctx) => {
await ctx.logger.info(`Received webhook!`);
return event;
},
}).listen();
5 changes: 5 additions & 0 deletions examples/airtable/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"extends": "@trigger.dev/tsconfig/examples.json",
"include": ["src/**/*.ts"],
"exclude": ["node_modules", "**/*.test.*"]
}
3 changes: 2 additions & 1 deletion packages/internal-integrations/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
"main": "./src/index.ts",
"types": "./src/index.ts",
"devDependencies": {
"@trigger.dev/providers": "workspace:*",
"@trigger.dev/tsconfig": "workspace:*",
"@types/debug": "^4.1.7",
"@types/node": "16",
"@trigger.dev/providers": "workspace:*",
"typescript": "^4.9.4"
},
"scripts": {},
Expand All @@ -20,6 +20,7 @@
"@urql/core": "^3.1.1",
"debug": "^4.3.4",
"graphql": "^16.6.0",
"ulid": "^2.3.0",
"urql": "^3.0.3",
"zod": "^3.20.2"
},
Expand Down
Loading