From e47cd21a96b4859da47a8b9a7d9028feb7af49a3 Mon Sep 17 00:00:00 2001 From: Kiran K Date: Fri, 27 Dec 2024 20:24:27 +0530 Subject: [PATCH 01/15] Shopify integration mvp --- .../cron/shopify/checkout-completed/route.ts | 243 ++++++++++++++++++ .../api/shopify/webhook/customer-created.ts | 16 ++ .../web/app/api/shopify/webhook/order-paid.ts | 10 + apps/web/app/api/shopify/webhook/route.ts | 51 ++++ apps/web/app/api/shopify/webhook/utils.ts | 22 ++ apps/web/app/api/track/shopify/route.ts | 55 ++++ 6 files changed, 397 insertions(+) create mode 100644 apps/web/app/api/cron/shopify/checkout-completed/route.ts create mode 100644 apps/web/app/api/shopify/webhook/customer-created.ts create mode 100644 apps/web/app/api/shopify/webhook/order-paid.ts create mode 100644 apps/web/app/api/shopify/webhook/route.ts create mode 100644 apps/web/app/api/shopify/webhook/utils.ts create mode 100644 apps/web/app/api/track/shopify/route.ts diff --git a/apps/web/app/api/cron/shopify/checkout-completed/route.ts b/apps/web/app/api/cron/shopify/checkout-completed/route.ts new file mode 100644 index 0000000000..bc4c471f91 --- /dev/null +++ b/apps/web/app/api/cron/shopify/checkout-completed/route.ts @@ -0,0 +1,243 @@ +import { DubApiError, handleAndReturnErrorResponse } from "@/lib/api/errors"; +import { notifyPartnerSale } from "@/lib/api/partners/notify-partner-sale"; +import { createSaleData } from "@/lib/api/sales/sale"; +import { createId } from "@/lib/api/utils"; +import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; +import { generateRandomName } from "@/lib/names"; +import { getClickEvent, recordLead, recordSale } from "@/lib/tinybird"; +import { redis } from "@/lib/upstash"; +import z from "@/lib/zod"; +import { prisma } from "@dub/prisma"; +import { nanoid } from "@dub/utils"; +import { Customer } from "@prisma/client"; + +export const dynamic = "force-dynamic"; + +const schema = z.object({ + clickId: z.string(), + checkoutToken: z.string(), +}); + +const checkoutSchema = z.object({ + customer: z.object({ + id: z.number(), + }), // TODO: Test this with guest checkout + total_price: z.string(), + currency: z.string(), + confirmation_number: z.string(), +}); + +const workspace = { + id: "cl7pj5kq4006835rbjlt2ofka", +}; + +// POST /api/cron/shopify/checkout-completed +export async function POST(req: Request) { + try { + const body = await req.json(); + await verifyQstashSignature(req, body); + + const { clickId, checkoutToken } = schema.parse(body); + + // upstash-retried header + // console.log("Request headers", req.headers); + + // Find click event + const clickEvent = await getClickEvent({ clickId }); + + if (!clickEvent || clickEvent.data.length === 0) { + throw new DubApiError({ + code: "not_found", + message: `Click event not found for clickId: ${clickId}`, + }); + } + + // Find Shopify order + const order = await redis.get(`shopify:checkout:${checkoutToken}`); + + if (!order) { + throw new DubApiError({ + code: "bad_request", + message: "Shopify order not found. Waiting for order...", + }); + } + + const parsedOrder = checkoutSchema.parse(order); + const customerExternalId = parsedOrder.customer.id.toString(); + + // Fetch or create customer + let customer: Customer | null = await prisma.customer.findUnique({ + where: { + projectId_externalId: { + projectId: workspace.id, + externalId: customerExternalId, + }, + }, + }); + + const clickData = clickEvent.data[0]; + const { link_id: linkId, country, timestamp } = clickData; + + if (!customer) { + customer = await prisma.customer.create({ + data: { + id: createId({ prefix: "cus_" }), + name: generateRandomName(), + externalId: customerExternalId, + projectId: workspace.id, + clickedAt: new Date(timestamp + "Z"), + clickId, + linkId, + country, + }, + }); + + await Promise.all([ + // record lead + recordLead({ + ...clickData, + event_id: nanoid(16), + event_name: "Account created", + customer_id: customer.id, + }), + + // update link leads count + prisma.link.update({ + where: { + id: linkId, + }, + data: { + leads: { + increment: 1, + }, + }, + }), + + // update workspace usage + prisma.project.update({ + where: { + id: workspace.id, + }, + data: { + usage: { + increment: 1, + }, + }, + }), + ]); + } + + const amount = Number(parsedOrder.total_price); + const currency = parsedOrder.currency; + const invoiceId = parsedOrder.confirmation_number; + + const eventId = nanoid(16); + const paymentProcessor = "shopify"; + + const [_sale, link, _project] = await Promise.all([ + // record sale + recordSale({ + ...clickData, + event_id: eventId, + event_name: "Purchase", + customer_id: customer.id, + payment_processor: paymentProcessor, + amount, + currency, + invoice_id: invoiceId, + metadata: JSON.stringify({ + parsedOrder, + }), + }), + + // update link sales count + prisma.link.update({ + where: { + id: linkId, + }, + data: { + sales: { + increment: 1, + }, + saleAmount: { + increment: amount, + }, + }, + }), + + // update workspace sales usage + prisma.project.update({ + where: { + id: workspace.id, + }, + data: { + usage: { + increment: 1, + }, + salesUsage: { + increment: amount, + }, + }, + }), + ]); + + // for program links + if (link.programId) { + const { program, partner } = + await prisma.programEnrollment.findUniqueOrThrow({ + where: { + linkId: link.id, + }, + select: { + program: true, + partner: { + select: { + id: true, + }, + }, + }, + }); + + const saleRecord = createSaleData({ + customerId: customer.id, + linkId: link.id, + clickId: clickData.click_id, + invoiceId, + eventId, + paymentProcessor, + amount, + currency, + partnerId: partner.id, + program, + metadata: clickData, + }); + + await Promise.allSettled([ + prisma.sale.create({ + data: saleRecord, + }), + + notifyPartnerSale({ + partner: { + id: partner.id, + referralLink: link.shortLink, + }, + program, + sale: { + amount: saleRecord.amount, + earnings: saleRecord.earnings, + }, + }), + ]); + } + + // TODO: + // Send webhook event + + await redis.del(`shopify:checkout:${checkoutToken}`); + + return new Response("Shopify order tracked.", { status: 200 }); + } catch (error) { + return handleAndReturnErrorResponse(error); + } +} diff --git a/apps/web/app/api/shopify/webhook/customer-created.ts b/apps/web/app/api/shopify/webhook/customer-created.ts new file mode 100644 index 0000000000..a285d6d762 --- /dev/null +++ b/apps/web/app/api/shopify/webhook/customer-created.ts @@ -0,0 +1,16 @@ +import { generateRandomName } from "@/lib/names"; +import { prisma } from "@dub/prisma"; + +// TODO: We probably don't need this event + +export async function customerCreated(event: any) { + console.log("customerCreated", event); + + await prisma.customer.create({ + data: { + projectId: "cl7pj5kq4006835rbjlt2ofka", + externalId: event.id.toString(), + name: generateRandomName(), + }, + }); +} diff --git a/apps/web/app/api/shopify/webhook/order-paid.ts b/apps/web/app/api/shopify/webhook/order-paid.ts new file mode 100644 index 0000000000..59b240e378 --- /dev/null +++ b/apps/web/app/api/shopify/webhook/order-paid.ts @@ -0,0 +1,10 @@ +import { redis } from "@/lib/upstash"; + +// TODO: +// Instead of checkout_token, maybe use order_number or checkout_id +// Add an expiry to the redis key + +export async function orderPaid(event: any) { + const checkoutToken = event.checkout_token; + await redis.set(`shopify:checkout:${checkoutToken}`, event); +} diff --git a/apps/web/app/api/shopify/webhook/route.ts b/apps/web/app/api/shopify/webhook/route.ts new file mode 100644 index 0000000000..57b158f015 --- /dev/null +++ b/apps/web/app/api/shopify/webhook/route.ts @@ -0,0 +1,51 @@ +import { parseRequestBody } from "@/lib/api/utils"; +import { log } from "@dub/utils"; +import { NextResponse } from "next/server"; +import { customerCreated } from "./customer-created"; +import { orderPaid } from "./order-paid"; + +// TODO: +// Verify the webhook signature + +const relevantTopics = new Set([ + "customers/create", + "customers/update", + "orders/paid", +]); + +// POST /api/shopify/webhook – Listen to Shopify webhook events +export const POST = async (req: Request) => { + const body = await parseRequestBody(req); + + // Find the topic from the headers + const headers = req.headers; + const topic = headers.get("x-shopify-topic") || ""; + + if (!relevantTopics.has(topic)) { + return new Response("Unsupported topic, skipping...", { + status: 200, + }); + } + + try { + switch (topic) { + case "customers/create": + await customerCreated(body); + break; + case "orders/paid": + await orderPaid(body); + break; + } + } catch (error) { + await log({ + message: `Shopify webhook failed. Error: ${error.message}`, + type: "errors", + }); + + return new Response('Webhook error: "Webhook handler failed. View logs."', { + status: 400, + }); + } + + return NextResponse.json("OK"); +}; diff --git a/apps/web/app/api/shopify/webhook/utils.ts b/apps/web/app/api/shopify/webhook/utils.ts new file mode 100644 index 0000000000..0747f3897d --- /dev/null +++ b/apps/web/app/api/shopify/webhook/utils.ts @@ -0,0 +1,22 @@ +import crypto from "crypto"; +import { NextRequest } from "next/server"; + +export async function verifyWebhookSignature(req: NextRequest) { + const signature = req.headers["x-shopify-hmac-sha256"]; + + const genSig = crypto + .createHmac("sha256", `${process.env.SHOPIFY_WEBHOOK_SECRET}`) + .update(JSON.stringify(req)) + .digest("base64"); + + console.log({ + genSig, + signature, + }); + + if (genSig !== signature) { + throw new Error("Invalid webhook signature."); + } + + return true; +} diff --git a/apps/web/app/api/track/shopify/route.ts b/apps/web/app/api/track/shopify/route.ts new file mode 100644 index 0000000000..0f2d128e91 --- /dev/null +++ b/apps/web/app/api/track/shopify/route.ts @@ -0,0 +1,55 @@ +import { DubApiError, handleAndReturnErrorResponse } from "@/lib/api/errors"; +import { parseRequestBody } from "@/lib/api/utils"; +import { qstash } from "@/lib/cron"; +import { APP_DOMAIN_WITH_NGROK } from "@dub/utils"; +import { waitUntil } from "@vercel/functions"; +import { NextResponse } from "next/server"; + +export const runtime = "edge"; + +const CORS_HEADERS = { + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "POST, OPTIONS", + "Access-Control-Allow-Headers": "Content-Type, Authorization", +}; + +// TODO: +// Add rate limiting + +// POST /api/track/shopify – Track a Shopify event +export const POST = async (req: Request) => { + try { + const { clickId, checkoutToken } = await parseRequestBody(req); + + if (!clickId || !checkoutToken) { + throw new DubApiError({ + code: "bad_request", + message: "Missing clickId or checkoutToken", + }); + } + + waitUntil( + qstash.publishJSON({ + url: `${APP_DOMAIN_WITH_NGROK}/api/cron/shopify/checkout-completed`, + body: { + clickId, + checkoutToken, + }, + retries: 5, + }), + ); + + return NextResponse.json("OK", { + headers: CORS_HEADERS, + }); + } catch (error) { + return handleAndReturnErrorResponse(error, CORS_HEADERS); + } +}; + +export const OPTIONS = () => { + return new Response(null, { + status: 204, + headers: CORS_HEADERS, + }); +}; From 9f81e273ead7356d07fc41ddfae0aa279a7f6b27 Mon Sep 17 00:00:00 2001 From: Kiran K Date: Fri, 27 Dec 2024 23:17:44 +0530 Subject: [PATCH 02/15] some fixes --- .../cron/shopify/checkout-completed/route.ts | 11 +++++----- .../api/shopify/webhook/customer-created.ts | 16 -------------- apps/web/app/api/shopify/webhook/route.ts | 20 ++++++------------ apps/web/app/api/shopify/webhook/utils.ts | 21 +++++++++++-------- 4 files changed, 23 insertions(+), 45 deletions(-) delete mode 100644 apps/web/app/api/shopify/webhook/customer-created.ts diff --git a/apps/web/app/api/cron/shopify/checkout-completed/route.ts b/apps/web/app/api/cron/shopify/checkout-completed/route.ts index bc4c471f91..9d0ed9649d 100644 --- a/apps/web/app/api/cron/shopify/checkout-completed/route.ts +++ b/apps/web/app/api/cron/shopify/checkout-completed/route.ts @@ -127,13 +127,13 @@ export async function POST(req: Request) { ]); } - const amount = Number(parsedOrder.total_price); + const eventId = nanoid(16); + const amount = Number(parsedOrder.total_price) * 100; const currency = parsedOrder.currency; const invoiceId = parsedOrder.confirmation_number; - - const eventId = nanoid(16); const paymentProcessor = "shopify"; + const [_sale, link, _project] = await Promise.all([ // record sale recordSale({ @@ -145,9 +145,8 @@ export async function POST(req: Request) { amount, currency, invoice_id: invoiceId, - metadata: JSON.stringify({ - parsedOrder, - }), + metadata: JSON.stringify(parsedOrder), + }), // update link sales count diff --git a/apps/web/app/api/shopify/webhook/customer-created.ts b/apps/web/app/api/shopify/webhook/customer-created.ts deleted file mode 100644 index a285d6d762..0000000000 --- a/apps/web/app/api/shopify/webhook/customer-created.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { generateRandomName } from "@/lib/names"; -import { prisma } from "@dub/prisma"; - -// TODO: We probably don't need this event - -export async function customerCreated(event: any) { - console.log("customerCreated", event); - - await prisma.customer.create({ - data: { - projectId: "cl7pj5kq4006835rbjlt2ofka", - externalId: event.id.toString(), - name: generateRandomName(), - }, - }); -} diff --git a/apps/web/app/api/shopify/webhook/route.ts b/apps/web/app/api/shopify/webhook/route.ts index 57b158f015..c6c9dc786e 100644 --- a/apps/web/app/api/shopify/webhook/route.ts +++ b/apps/web/app/api/shopify/webhook/route.ts @@ -1,25 +1,20 @@ -import { parseRequestBody } from "@/lib/api/utils"; import { log } from "@dub/utils"; import { NextResponse } from "next/server"; -import { customerCreated } from "./customer-created"; import { orderPaid } from "./order-paid"; +import { verifyShopifySignature } from "./utils"; -// TODO: -// Verify the webhook signature - -const relevantTopics = new Set([ - "customers/create", - "customers/update", - "orders/paid", -]); +const relevantTopics = new Set(["orders/paid"]); // POST /api/shopify/webhook – Listen to Shopify webhook events export const POST = async (req: Request) => { - const body = await parseRequestBody(req); + const body = await req.json(); // Find the topic from the headers const headers = req.headers; const topic = headers.get("x-shopify-topic") || ""; + const signature = headers.get("x-shopify-hmac-sha256") || ""; + + await verifyShopifySignature({ body, signature }); if (!relevantTopics.has(topic)) { return new Response("Unsupported topic, skipping...", { @@ -29,9 +24,6 @@ export const POST = async (req: Request) => { try { switch (topic) { - case "customers/create": - await customerCreated(body); - break; case "orders/paid": await orderPaid(body); break; diff --git a/apps/web/app/api/shopify/webhook/utils.ts b/apps/web/app/api/shopify/webhook/utils.ts index 0747f3897d..e1902d6957 100644 --- a/apps/web/app/api/shopify/webhook/utils.ts +++ b/apps/web/app/api/shopify/webhook/utils.ts @@ -1,22 +1,25 @@ import crypto from "crypto"; -import { NextRequest } from "next/server"; -export async function verifyWebhookSignature(req: NextRequest) { - const signature = req.headers["x-shopify-hmac-sha256"]; - - const genSig = crypto +export const verifyShopifySignature = async ({ + body, + signature, +}: { + body: Record; + signature: string; +}) => { + const generatedSignature = crypto .createHmac("sha256", `${process.env.SHOPIFY_WEBHOOK_SECRET}`) - .update(JSON.stringify(req)) + .update(JSON.stringify(body)) .digest("base64"); console.log({ - genSig, + generatedSignature, signature, }); - if (genSig !== signature) { + if (generatedSignature !== signature) { throw new Error("Invalid webhook signature."); } return true; -} +}; From c3a1feeb2ab2d91cc081e0a782bbe324bf664da7 Mon Sep 17 00:00:00 2001 From: Kiran K Date: Sun, 29 Dec 2024 18:04:19 +0530 Subject: [PATCH 03/15] add /api/shopify/integration/callback --- apps/web/app/api/oauth/userinfo/route.ts | 2 + .../api/shopify/integration/callback/route.ts | 91 +++++++++++++++++++ .../{ => integration}/webhook/order-paid.ts | 0 .../{ => integration}/webhook/route.ts | 2 +- .../{ => integration}/webhook/utils.ts | 7 -- packages/prisma/schema/workspace.prisma | 1 + packages/utils/src/constants/integrations.ts | 1 + 7 files changed, 96 insertions(+), 8 deletions(-) create mode 100644 apps/web/app/api/shopify/integration/callback/route.ts rename apps/web/app/api/shopify/{ => integration}/webhook/order-paid.ts (100%) rename apps/web/app/api/shopify/{ => integration}/webhook/route.ts (93%) rename apps/web/app/api/shopify/{ => integration}/webhook/utils.ts (84%) diff --git a/apps/web/app/api/oauth/userinfo/route.ts b/apps/web/app/api/oauth/userinfo/route.ts index cb15d13369..7fd3bccab8 100644 --- a/apps/web/app/api/oauth/userinfo/route.ts +++ b/apps/web/app/api/oauth/userinfo/route.ts @@ -30,6 +30,7 @@ export async function GET(req: NextRequest) { }, project: { select: { + id: true, name: true, slug: true, logo: true, @@ -52,6 +53,7 @@ export async function GET(req: NextRequest) { name: user.name, image: user.image, workspace: { + id: `ws_${tokenRecord.project.id}`, slug: tokenRecord.project.slug, name: tokenRecord.project.name, logo: tokenRecord.project.logo, diff --git a/apps/web/app/api/shopify/integration/callback/route.ts b/apps/web/app/api/shopify/integration/callback/route.ts new file mode 100644 index 0000000000..564bd6da2d --- /dev/null +++ b/apps/web/app/api/shopify/integration/callback/route.ts @@ -0,0 +1,91 @@ +import { DubApiError } from "@/lib/api/errors"; +import { parseRequestBody } from "@/lib/api/utils"; +import { withWorkspace } from "@/lib/auth"; +import { installIntegration } from "@/lib/integrations/install"; +import { prisma } from "@dub/prisma"; +import { SHOPIFY_INTEGRATION_ID } from "@dub/utils"; +import { waitUntil } from "@vercel/functions"; +import { NextResponse } from "next/server"; +import { z } from "zod"; + +const updateWorkspaceSchema = z.object({ + shopifyStoreId: z.string().nullable(), +}); + +// PATCH /api/shopify/integration/callback – update a shopify store id +export const PATCH = withWorkspace( + async ({ req, workspace, session }) => { + const body = await parseRequestBody(req); + const { shopifyStoreId } = updateWorkspaceSchema.parse(body); + + try { + const response = await prisma.project.update({ + where: { + id: workspace.id, + }, + data: { + shopifyStoreId, + }, + select: { + shopifyStoreId: true, + }, + }); + + waitUntil( + (async () => { + const installation = await prisma.installedIntegration.findUnique({ + where: { + userId_integrationId_projectId: { + userId: session.user.id, + projectId: workspace.id, + integrationId: SHOPIFY_INTEGRATION_ID, + }, + }, + select: { + id: true, + }, + }); + + // Install the integration if it doesn't exist + if (!installation) { + await installIntegration({ + userId: session.user.id, + workspaceId: workspace.id, + integrationId: SHOPIFY_INTEGRATION_ID, + credentials: { + shopifyStoreId, + }, + }); + } + + // Uninstall the integration if the shopify store id is null + if (installation && shopifyStoreId === null) { + await prisma.installedIntegration.delete({ + where: { + id: installation.id, + }, + }); + } + })(), + ); + + return NextResponse.json(response); + } catch (error) { + if (error.code === "P2002") { + throw new DubApiError({ + code: "conflict", + message: `The shopify store "${shopifyStoreId}" is already in use.`, + }); + } + + throw new DubApiError({ + code: "internal_server_error", + message: error.message, + }); + } + }, + { + requiredPermissions: ["workspaces.write"], + requiredAddOn: "conversion", + }, +); diff --git a/apps/web/app/api/shopify/webhook/order-paid.ts b/apps/web/app/api/shopify/integration/webhook/order-paid.ts similarity index 100% rename from apps/web/app/api/shopify/webhook/order-paid.ts rename to apps/web/app/api/shopify/integration/webhook/order-paid.ts diff --git a/apps/web/app/api/shopify/webhook/route.ts b/apps/web/app/api/shopify/integration/webhook/route.ts similarity index 93% rename from apps/web/app/api/shopify/webhook/route.ts rename to apps/web/app/api/shopify/integration/webhook/route.ts index c6c9dc786e..1f8ce4c4b8 100644 --- a/apps/web/app/api/shopify/webhook/route.ts +++ b/apps/web/app/api/shopify/integration/webhook/route.ts @@ -5,7 +5,7 @@ import { verifyShopifySignature } from "./utils"; const relevantTopics = new Set(["orders/paid"]); -// POST /api/shopify/webhook – Listen to Shopify webhook events +// POST /api/shopify/integration/webhook – Listen to Shopify webhook events export const POST = async (req: Request) => { const body = await req.json(); diff --git a/apps/web/app/api/shopify/webhook/utils.ts b/apps/web/app/api/shopify/integration/webhook/utils.ts similarity index 84% rename from apps/web/app/api/shopify/webhook/utils.ts rename to apps/web/app/api/shopify/integration/webhook/utils.ts index e1902d6957..2212aad314 100644 --- a/apps/web/app/api/shopify/webhook/utils.ts +++ b/apps/web/app/api/shopify/integration/webhook/utils.ts @@ -12,14 +12,7 @@ export const verifyShopifySignature = async ({ .update(JSON.stringify(body)) .digest("base64"); - console.log({ - generatedSignature, - signature, - }); - if (generatedSignature !== signature) { throw new Error("Invalid webhook signature."); } - - return true; }; diff --git a/packages/prisma/schema/workspace.prisma b/packages/prisma/schema/workspace.prisma index 18cd572653..f8bf448e7e 100644 --- a/packages/prisma/schema/workspace.prisma +++ b/packages/prisma/schema/workspace.prisma @@ -11,6 +11,7 @@ model Project { paymentFailedAt DateTime? stripeConnectId String? @unique // for Stripe Integration payoutMethodId String? // for Stripe Connect payouts + shopifyStoreId String? @unique // for Shopify Integration usage Int @default(0) usageLimit Int @default(1000) diff --git a/packages/utils/src/constants/integrations.ts b/packages/utils/src/constants/integrations.ts index b09dabc288..2b06e8225a 100644 --- a/packages/utils/src/constants/integrations.ts +++ b/packages/utils/src/constants/integrations.ts @@ -1,2 +1,3 @@ export const SLACK_INTEGRATION_ID = "clzu59rx9000110bm5fnlzwuj"; export const STRIPE_INTEGRATION_ID = "clzra1ya60001wnj4a89zcg9h"; +export const SHOPIFY_INTEGRATION_ID = "int_edz1HUu12L3kL7s9kLRLycuM"; \ No newline at end of file From b552cff68ecafe95ddf50d774e41b3141b73e52a Mon Sep 17 00:00:00 2001 From: Kiran K Date: Sun, 29 Dec 2024 19:00:57 +0530 Subject: [PATCH 04/15] wip --- .../cron/shopify/checkout-completed/route.ts | 85 +++++++++++-------- .../shopify/integration/webhook/order-paid.ts | 14 ++- .../api/shopify/integration/webhook/route.ts | 34 ++++++-- apps/web/app/api/track/shopify/route.ts | 3 +- 4 files changed, 87 insertions(+), 49 deletions(-) diff --git a/apps/web/app/api/cron/shopify/checkout-completed/route.ts b/apps/web/app/api/cron/shopify/checkout-completed/route.ts index 9d0ed9649d..2b0409f6da 100644 --- a/apps/web/app/api/cron/shopify/checkout-completed/route.ts +++ b/apps/web/app/api/cron/shopify/checkout-completed/route.ts @@ -4,7 +4,12 @@ import { createSaleData } from "@/lib/api/sales/sale"; import { createId } from "@/lib/api/utils"; import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; import { generateRandomName } from "@/lib/names"; -import { getClickEvent, recordLead, recordSale } from "@/lib/tinybird"; +import { + getClickEvent, + getLeadEvent, + recordLead, + recordSale, +} from "@/lib/tinybird"; import { redis } from "@/lib/upstash"; import z from "@/lib/zod"; import { prisma } from "@dub/prisma"; @@ -19,18 +24,15 @@ const schema = z.object({ }); const checkoutSchema = z.object({ - customer: z.object({ - id: z.number(), - }), // TODO: Test this with guest checkout total_price: z.string(), currency: z.string(), confirmation_number: z.string(), + workspaceId: z.string(), + customer: z.object({ + id: z.number(), + }), }); -const workspace = { - id: "cl7pj5kq4006835rbjlt2ofka", -}; - // POST /api/cron/shopify/checkout-completed export async function POST(req: Request) { try { @@ -39,17 +41,13 @@ export async function POST(req: Request) { const { clickId, checkoutToken } = schema.parse(body); - // upstash-retried header - // console.log("Request headers", req.headers); - // Find click event const clickEvent = await getClickEvent({ clickId }); if (!clickEvent || clickEvent.data.length === 0) { - throw new DubApiError({ - code: "not_found", - message: `Click event not found for clickId: ${clickId}`, - }); + return new Response( + `[Shopify] Click event not found for clickId: ${clickId}`, + ); } // Find Shopify order @@ -58,18 +56,21 @@ export async function POST(req: Request) { if (!order) { throw new DubApiError({ code: "bad_request", - message: "Shopify order not found. Waiting for order...", + message: "Shopify order not found. Waiting for order...", // This will be retried by Qstash }); } const parsedOrder = checkoutSchema.parse(order); + const workspaceId = parsedOrder.workspaceId; const customerExternalId = parsedOrder.customer.id.toString(); - // Fetch or create customer + console.log("parsedOrder", parsedOrder); + + // Fetch customer let customer: Customer | null = await prisma.customer.findUnique({ where: { projectId_externalId: { - projectId: workspace.id, + projectId: workspaceId, externalId: customerExternalId, }, }, @@ -78,13 +79,17 @@ export async function POST(req: Request) { const clickData = clickEvent.data[0]; const { link_id: linkId, country, timestamp } = clickData; + // Handle the lead if (!customer) { + // TODO: + // Fetch customer from Shopify and use their email & name + customer = await prisma.customer.create({ data: { id: createId({ prefix: "cus_" }), name: generateRandomName(), externalId: customerExternalId, - projectId: workspace.id, + projectId: workspaceId, clickedAt: new Date(timestamp + "Z"), clickId, linkId, @@ -116,7 +121,7 @@ export async function POST(req: Request) { // update workspace usage prisma.project.update({ where: { - id: workspace.id, + id: workspaceId, }, data: { usage: { @@ -127,27 +132,33 @@ export async function POST(req: Request) { ]); } + // Find lead + const leadEvent = await getLeadEvent({ customerId: customer.id }); + if (!leadEvent || leadEvent.data.length === 0) { + return `[Shopify] Lead event with customer ID ${customer.id} not found, skipping...`; + } + + // Handle the sale const eventId = nanoid(16); const amount = Number(parsedOrder.total_price) * 100; const currency = parsedOrder.currency; const invoiceId = parsedOrder.confirmation_number; const paymentProcessor = "shopify"; + const saleData = { + ...leadEvent.data[0], + event_id: nanoid(16), + event_name: "Purchase", + payment_processor: "shopify", + amount, + currency, + invoice_id: invoiceId, + metadata: JSON.stringify(parsedOrder), + }; const [_sale, link, _project] = await Promise.all([ // record sale - recordSale({ - ...clickData, - event_id: eventId, - event_name: "Purchase", - customer_id: customer.id, - payment_processor: paymentProcessor, - amount, - currency, - invoice_id: invoiceId, - metadata: JSON.stringify(parsedOrder), - - }), + recordSale(saleData), // update link sales count prisma.link.update({ @@ -167,7 +178,7 @@ export async function POST(req: Request) { // update workspace sales usage prisma.project.update({ where: { - id: workspace.id, + id: workspaceId, }, data: { usage: { @@ -185,7 +196,7 @@ export async function POST(req: Request) { const { program, partner } = await prisma.programEnrollment.findUniqueOrThrow({ where: { - linkId: link.id, + linkId, }, select: { program: true, @@ -199,8 +210,8 @@ export async function POST(req: Request) { const saleRecord = createSaleData({ customerId: customer.id, - linkId: link.id, - clickId: clickData.click_id, + linkId, + clickId, invoiceId, eventId, paymentProcessor, @@ -235,7 +246,7 @@ export async function POST(req: Request) { await redis.del(`shopify:checkout:${checkoutToken}`); - return new Response("Shopify order tracked.", { status: 200 }); + return new Response("[Shopify] Order event processed successfully."); } catch (error) { return handleAndReturnErrorResponse(error); } diff --git a/apps/web/app/api/shopify/integration/webhook/order-paid.ts b/apps/web/app/api/shopify/integration/webhook/order-paid.ts index 59b240e378..ff8360bae2 100644 --- a/apps/web/app/api/shopify/integration/webhook/order-paid.ts +++ b/apps/web/app/api/shopify/integration/webhook/order-paid.ts @@ -4,7 +4,17 @@ import { redis } from "@/lib/upstash"; // Instead of checkout_token, maybe use order_number or checkout_id // Add an expiry to the redis key -export async function orderPaid(event: any) { +export async function orderPaid({ + event, + workspaceId, +}: { + event: any; + workspaceId: string; +}) { const checkoutToken = event.checkout_token; - await redis.set(`shopify:checkout:${checkoutToken}`, event); + + await redis.set(`shopify:checkout:${checkoutToken}`, { + ...event, + workspaceId, + }); } diff --git a/apps/web/app/api/shopify/integration/webhook/route.ts b/apps/web/app/api/shopify/integration/webhook/route.ts index 1f8ce4c4b8..2d8c825999 100644 --- a/apps/web/app/api/shopify/integration/webhook/route.ts +++ b/apps/web/app/api/shopify/integration/webhook/route.ts @@ -1,3 +1,4 @@ +import { prisma } from "@dub/prisma"; import { log } from "@dub/utils"; import { NextResponse } from "next/server"; import { orderPaid } from "./order-paid"; @@ -8,24 +9,39 @@ const relevantTopics = new Set(["orders/paid"]); // POST /api/shopify/integration/webhook – Listen to Shopify webhook events export const POST = async (req: Request) => { const body = await req.json(); - - // Find the topic from the headers const headers = req.headers; const topic = headers.get("x-shopify-topic") || ""; const signature = headers.get("x-shopify-hmac-sha256") || ""; + const shop = headers.get("x-shopify-shop-domain") || ""; await verifyShopifySignature({ body, signature }); if (!relevantTopics.has(topic)) { - return new Response("Unsupported topic, skipping...", { - status: 200, - }); + return new Response(`[Shopify] Unsupported topic: ${topic}. Skipping...`); + } + + const workspace = await prisma.project.findUnique({ + where: { + shopifyStoreId: shop, + }, + select: { + id: true, + }, + }); + + if (!workspace) { + return new Response( + `[Shopify] Workspace not found for shop: ${shop}. Skipping...`, + ); } try { switch (topic) { case "orders/paid": - await orderPaid(body); + await orderPaid({ + event: body, + workspaceId: workspace.id, + }); break; } } catch (error) { @@ -34,9 +50,9 @@ export const POST = async (req: Request) => { type: "errors", }); - return new Response('Webhook error: "Webhook handler failed. View logs."', { - status: 400, - }); + return new Response( + `[Shopify] Webhook error: "Webhook handler failed. View logs."`, + ); } return NextResponse.json("OK"); diff --git a/apps/web/app/api/track/shopify/route.ts b/apps/web/app/api/track/shopify/route.ts index 0f2d128e91..3388727a1d 100644 --- a/apps/web/app/api/track/shopify/route.ts +++ b/apps/web/app/api/track/shopify/route.ts @@ -15,8 +15,9 @@ const CORS_HEADERS = { // TODO: // Add rate limiting +// Finalize the endpoint (Maybe move to /api/shopify/track) -// POST /api/track/shopify – Track a Shopify event +// POST /api/track/shopify – Handle the Shopify Pixel events export const POST = async (req: Request) => { try { const { clickId, checkoutToken } = await parseRequestBody(req); From 94c0081637b9184583c4cdb65eb6595cc0f89fe6 Mon Sep 17 00:00:00 2001 From: Kiran K Date: Sun, 29 Dec 2024 23:42:56 +0530 Subject: [PATCH 05/15] improve the webhooks --- .../cron/shopify/checkout-completed/route.ts | 253 -------------- .../app/api/cron/shopify/order-paid/route.ts | 320 ++++++++++++++++++ .../shopify/integration/webhook/order-paid.ts | 24 +- .../api/shopify/integration/webhook/route.ts | 2 +- apps/web/app/api/track/shopify/route.ts | 23 +- 5 files changed, 351 insertions(+), 271 deletions(-) delete mode 100644 apps/web/app/api/cron/shopify/checkout-completed/route.ts create mode 100644 apps/web/app/api/cron/shopify/order-paid/route.ts diff --git a/apps/web/app/api/cron/shopify/checkout-completed/route.ts b/apps/web/app/api/cron/shopify/checkout-completed/route.ts deleted file mode 100644 index 2b0409f6da..0000000000 --- a/apps/web/app/api/cron/shopify/checkout-completed/route.ts +++ /dev/null @@ -1,253 +0,0 @@ -import { DubApiError, handleAndReturnErrorResponse } from "@/lib/api/errors"; -import { notifyPartnerSale } from "@/lib/api/partners/notify-partner-sale"; -import { createSaleData } from "@/lib/api/sales/sale"; -import { createId } from "@/lib/api/utils"; -import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; -import { generateRandomName } from "@/lib/names"; -import { - getClickEvent, - getLeadEvent, - recordLead, - recordSale, -} from "@/lib/tinybird"; -import { redis } from "@/lib/upstash"; -import z from "@/lib/zod"; -import { prisma } from "@dub/prisma"; -import { nanoid } from "@dub/utils"; -import { Customer } from "@prisma/client"; - -export const dynamic = "force-dynamic"; - -const schema = z.object({ - clickId: z.string(), - checkoutToken: z.string(), -}); - -const checkoutSchema = z.object({ - total_price: z.string(), - currency: z.string(), - confirmation_number: z.string(), - workspaceId: z.string(), - customer: z.object({ - id: z.number(), - }), -}); - -// POST /api/cron/shopify/checkout-completed -export async function POST(req: Request) { - try { - const body = await req.json(); - await verifyQstashSignature(req, body); - - const { clickId, checkoutToken } = schema.parse(body); - - // Find click event - const clickEvent = await getClickEvent({ clickId }); - - if (!clickEvent || clickEvent.data.length === 0) { - return new Response( - `[Shopify] Click event not found for clickId: ${clickId}`, - ); - } - - // Find Shopify order - const order = await redis.get(`shopify:checkout:${checkoutToken}`); - - if (!order) { - throw new DubApiError({ - code: "bad_request", - message: "Shopify order not found. Waiting for order...", // This will be retried by Qstash - }); - } - - const parsedOrder = checkoutSchema.parse(order); - const workspaceId = parsedOrder.workspaceId; - const customerExternalId = parsedOrder.customer.id.toString(); - - console.log("parsedOrder", parsedOrder); - - // Fetch customer - let customer: Customer | null = await prisma.customer.findUnique({ - where: { - projectId_externalId: { - projectId: workspaceId, - externalId: customerExternalId, - }, - }, - }); - - const clickData = clickEvent.data[0]; - const { link_id: linkId, country, timestamp } = clickData; - - // Handle the lead - if (!customer) { - // TODO: - // Fetch customer from Shopify and use their email & name - - customer = await prisma.customer.create({ - data: { - id: createId({ prefix: "cus_" }), - name: generateRandomName(), - externalId: customerExternalId, - projectId: workspaceId, - clickedAt: new Date(timestamp + "Z"), - clickId, - linkId, - country, - }, - }); - - await Promise.all([ - // record lead - recordLead({ - ...clickData, - event_id: nanoid(16), - event_name: "Account created", - customer_id: customer.id, - }), - - // update link leads count - prisma.link.update({ - where: { - id: linkId, - }, - data: { - leads: { - increment: 1, - }, - }, - }), - - // update workspace usage - prisma.project.update({ - where: { - id: workspaceId, - }, - data: { - usage: { - increment: 1, - }, - }, - }), - ]); - } - - // Find lead - const leadEvent = await getLeadEvent({ customerId: customer.id }); - if (!leadEvent || leadEvent.data.length === 0) { - return `[Shopify] Lead event with customer ID ${customer.id} not found, skipping...`; - } - - // Handle the sale - const eventId = nanoid(16); - const amount = Number(parsedOrder.total_price) * 100; - const currency = parsedOrder.currency; - const invoiceId = parsedOrder.confirmation_number; - const paymentProcessor = "shopify"; - - const saleData = { - ...leadEvent.data[0], - event_id: nanoid(16), - event_name: "Purchase", - payment_processor: "shopify", - amount, - currency, - invoice_id: invoiceId, - metadata: JSON.stringify(parsedOrder), - }; - - const [_sale, link, _project] = await Promise.all([ - // record sale - recordSale(saleData), - - // update link sales count - prisma.link.update({ - where: { - id: linkId, - }, - data: { - sales: { - increment: 1, - }, - saleAmount: { - increment: amount, - }, - }, - }), - - // update workspace sales usage - prisma.project.update({ - where: { - id: workspaceId, - }, - data: { - usage: { - increment: 1, - }, - salesUsage: { - increment: amount, - }, - }, - }), - ]); - - // for program links - if (link.programId) { - const { program, partner } = - await prisma.programEnrollment.findUniqueOrThrow({ - where: { - linkId, - }, - select: { - program: true, - partner: { - select: { - id: true, - }, - }, - }, - }); - - const saleRecord = createSaleData({ - customerId: customer.id, - linkId, - clickId, - invoiceId, - eventId, - paymentProcessor, - amount, - currency, - partnerId: partner.id, - program, - metadata: clickData, - }); - - await Promise.allSettled([ - prisma.sale.create({ - data: saleRecord, - }), - - notifyPartnerSale({ - partner: { - id: partner.id, - referralLink: link.shortLink, - }, - program, - sale: { - amount: saleRecord.amount, - earnings: saleRecord.earnings, - }, - }), - ]); - } - - // TODO: - // Send webhook event - - await redis.del(`shopify:checkout:${checkoutToken}`); - - return new Response("[Shopify] Order event processed successfully."); - } catch (error) { - return handleAndReturnErrorResponse(error); - } -} diff --git a/apps/web/app/api/cron/shopify/order-paid/route.ts b/apps/web/app/api/cron/shopify/order-paid/route.ts new file mode 100644 index 0000000000..9969d15568 --- /dev/null +++ b/apps/web/app/api/cron/shopify/order-paid/route.ts @@ -0,0 +1,320 @@ +import { DubApiError, handleAndReturnErrorResponse } from "@/lib/api/errors"; +import { notifyPartnerSale } from "@/lib/api/partners/notify-partner-sale"; +import { createSaleData } from "@/lib/api/sales/sale"; +import { createId } from "@/lib/api/utils"; +import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; +import { generateRandomName } from "@/lib/names"; +import { + getClickEvent, + getLeadEvent, + recordLead, + recordSale, +} from "@/lib/tinybird"; +import { redis } from "@/lib/upstash"; +import z from "@/lib/zod"; +import { leadEventSchemaTB } from "@/lib/zod/schemas/leads"; +import { prisma } from "@dub/prisma"; +import { nanoid } from "@dub/utils"; +import { Customer } from "@prisma/client"; + +export const dynamic = "force-dynamic"; + +const schema = z.object({ + workspaceId: z.string(), + checkoutToken: z.string(), +}); + +const orderSchema = z.object({ + total_price: z.string(), + currency: z.string(), + confirmation_number: z.string(), + customer: z.object({ + id: z.number(), + }), +}); + +// POST /api/cron/shopify/order-paid +export async function POST(req: Request) { + try { + const body = await req.json(); + await verifyQstashSignature(req, body); + + const { workspaceId, checkoutToken } = schema.parse(body); + + // Find Shopify order + const order = await redis.hget( + `shopify:checkout:${checkoutToken}`, + "order", + ); + + if (!order) { + return new Response(`[Shopify] Shopify order not found. Skipping...`); + } + + const orderData = orderSchema.parse(order); + const externalId = orderData.customer.id.toString(); + + // Fetch customer + const customer: Customer | null = await prisma.customer.findUnique({ + where: { + projectId_externalId: { + projectId: workspaceId, + externalId, + }, + }, + }); + + if (customer) { + // Existing customer + await createSale({ + order, + workspaceId, + customerId: customer.id, + }); + } else { + const clickId = await redis.hget( + `shopify:checkout:${checkoutToken}`, + "clickId", + ); + + if (!clickId) { + // Wait for the click event to come from Shopify pixel + throw new DubApiError({ + code: "bad_request", + message: + "[Shopify] Click event not found. Waiting for Shopify pixel event...", + }); + } + + const leadData = await createLead({ + clickId, + externalId, + workspaceId, + }); + + await createSale({ + order, + workspaceId, + leadData, + customerId: leadData.customer_id, + }); + } + + await redis.del(`shopify:checkout:${checkoutToken}`); + + return new Response("[Shopify] Order event processed successfully."); + } catch (error) { + return handleAndReturnErrorResponse(error); + } +} + +const createLead = async ({ + clickId, + externalId, + workspaceId, +}: { + clickId: string; + externalId: string; + workspaceId: string; +}) => { + // find click + const clickEvent = await getClickEvent({ clickId }); + + const clickData = clickEvent.data[0]; + const { link_id: linkId, country, timestamp } = clickData; + + // create customer + const customer = await prisma.customer.create({ + data: { + id: createId({ prefix: "cus_" }), + name: generateRandomName(), + externalId, + projectId: workspaceId, + clickedAt: new Date(timestamp + "Z"), + clickId, + linkId, + country, + }, + }); + + const leadData = leadEventSchemaTB.parse({ + ...clickData, + event_id: nanoid(16), + event_name: "Account created", + customer_id: customer.id, + }); + + await Promise.all([ + // record lead + recordLead(leadData), + + // update link leads count + prisma.link.update({ + where: { + id: linkId, + }, + data: { + leads: { + increment: 1, + }, + }, + }), + + // update workspace usage + prisma.project.update({ + where: { + id: workspaceId, + }, + data: { + usage: { + increment: 1, + }, + }, + }), + ]); + + return leadData; +}; + +const createSale = async ({ + order, + customerId, + workspaceId, + leadData, +}: { + order: any; + customerId: string; + workspaceId: string; + leadData?: z.infer; +}) => { + if (!leadData) { + const leadEvent = await getLeadEvent({ customerId }); + + if (!leadEvent || leadEvent.data.length === 0) { + return new Response( + `[Shopify] Lead event with customer ID ${customerId} not found, skipping...`, + ); + } + + leadData = leadEvent.data[0]; + } + + const { link_id: linkId, click_id: clickId } = leadData; + + const orderData = orderSchema.parse(order); + const eventId = nanoid(16); + const paymentProcessor = "shopify"; + + const amount = Number(orderData.total_price) * 100; + const currency = orderData.currency; + const invoiceId = orderData.confirmation_number; + + const sale = await prisma.sale.findFirst({ + where: { + invoiceId, + clickId, + }, + }); + + if (sale) { + return new Response( + `[Shopify] Order has been processed already. Skipping...`, + ); + } + + const saleData = { + ...leadData, + event_id: eventId, + event_name: "Purchase", + payment_processor: paymentProcessor, + amount, + currency, + invoice_id: invoiceId, + metadata: JSON.stringify(order), + }; + + const [_sale, link, _project] = await Promise.all([ + // record sale + recordSale(saleData), + + // update link sales count + prisma.link.update({ + where: { + id: linkId, + }, + data: { + sales: { + increment: 1, + }, + saleAmount: { + increment: amount, + }, + }, + }), + + // update workspace sales usage + prisma.project.update({ + where: { + id: workspaceId, + }, + data: { + usage: { + increment: 1, + }, + salesUsage: { + increment: amount, + }, + }, + }), + ]); + + // for program links + if (link.programId) { + const { program, partner } = + await prisma.programEnrollment.findUniqueOrThrow({ + where: { + linkId, + }, + select: { + program: true, + partner: { + select: { + id: true, + }, + }, + }, + }); + + const saleRecord = createSaleData({ + customerId, + linkId, + clickId, + invoiceId, + eventId, + paymentProcessor, + amount, + currency, + partnerId: partner.id, + program, + metadata: order, + }); + + await Promise.allSettled([ + prisma.sale.create({ + data: saleRecord, + }), + + notifyPartnerSale({ + partner: { + id: partner.id, + referralLink: link.shortLink, + }, + program, + sale: { + amount: saleRecord.amount, + earnings: saleRecord.earnings, + }, + }), + ]); + } +}; diff --git a/apps/web/app/api/shopify/integration/webhook/order-paid.ts b/apps/web/app/api/shopify/integration/webhook/order-paid.ts index ff8360bae2..580b1b1bc6 100644 --- a/apps/web/app/api/shopify/integration/webhook/order-paid.ts +++ b/apps/web/app/api/shopify/integration/webhook/order-paid.ts @@ -1,20 +1,30 @@ +import { qstash } from "@/lib/cron"; import { redis } from "@/lib/upstash"; +import { APP_DOMAIN_WITH_NGROK } from "@dub/utils"; // TODO: // Instead of checkout_token, maybe use order_number or checkout_id -// Add an expiry to the redis key export async function orderPaid({ - event, + order, workspaceId, }: { - event: any; + order: any; workspaceId: string; }) { - const checkoutToken = event.checkout_token; + const checkoutToken = order.checkout_token; - await redis.set(`shopify:checkout:${checkoutToken}`, { - ...event, - workspaceId, + await redis.hset(`shopify:checkout:${checkoutToken}`, { + order, + }); + + await qstash.publishJSON({ + url: `${APP_DOMAIN_WITH_NGROK}/api/cron/shopify/order-paid`, + body: { + checkoutToken, + workspaceId, + }, + retries: 3, + delay: 3, }); } diff --git a/apps/web/app/api/shopify/integration/webhook/route.ts b/apps/web/app/api/shopify/integration/webhook/route.ts index 2d8c825999..f08f4198f4 100644 --- a/apps/web/app/api/shopify/integration/webhook/route.ts +++ b/apps/web/app/api/shopify/integration/webhook/route.ts @@ -39,7 +39,7 @@ export const POST = async (req: Request) => { switch (topic) { case "orders/paid": await orderPaid({ - event: body, + order: body, workspaceId: workspace.id, }); break; diff --git a/apps/web/app/api/track/shopify/route.ts b/apps/web/app/api/track/shopify/route.ts index 3388727a1d..b447233234 100644 --- a/apps/web/app/api/track/shopify/route.ts +++ b/apps/web/app/api/track/shopify/route.ts @@ -1,7 +1,7 @@ import { DubApiError, handleAndReturnErrorResponse } from "@/lib/api/errors"; import { parseRequestBody } from "@/lib/api/utils"; -import { qstash } from "@/lib/cron"; -import { APP_DOMAIN_WITH_NGROK } from "@dub/utils"; +import { getClickEvent } from "@/lib/tinybird"; +import { redis } from "@/lib/upstash"; import { waitUntil } from "@vercel/functions"; import { NextResponse } from "next/server"; @@ -15,7 +15,7 @@ const CORS_HEADERS = { // TODO: // Add rate limiting -// Finalize the endpoint (Maybe move to /api/shopify/track) +// Finalize the endpoint (Maybe move to /api/shopify/pixel) // POST /api/track/shopify – Handle the Shopify Pixel events export const POST = async (req: Request) => { @@ -29,14 +29,17 @@ export const POST = async (req: Request) => { }); } + const clickEvent = await getClickEvent({ clickId }); + + if (!clickEvent || clickEvent.data.length === 0) { + return new Response( + `[Shopify] Click event not found for clickId: ${clickId}`, + ); + } + waitUntil( - qstash.publishJSON({ - url: `${APP_DOMAIN_WITH_NGROK}/api/cron/shopify/checkout-completed`, - body: { - clickId, - checkoutToken, - }, - retries: 5, + redis.hset(`shopify:checkout:${checkoutToken}`, { + clickId, }), ); From 858e33113feda201926f36ee49905c283b6ca52c Mon Sep 17 00:00:00 2001 From: Kiran K Date: Sun, 29 Dec 2024 23:43:58 +0530 Subject: [PATCH 06/15] format --- packages/utils/src/constants/integrations.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/utils/src/constants/integrations.ts b/packages/utils/src/constants/integrations.ts index 2b06e8225a..be141540a4 100644 --- a/packages/utils/src/constants/integrations.ts +++ b/packages/utils/src/constants/integrations.ts @@ -1,3 +1,3 @@ export const SLACK_INTEGRATION_ID = "clzu59rx9000110bm5fnlzwuj"; export const STRIPE_INTEGRATION_ID = "clzra1ya60001wnj4a89zcg9h"; -export const SHOPIFY_INTEGRATION_ID = "int_edz1HUu12L3kL7s9kLRLycuM"; \ No newline at end of file +export const SHOPIFY_INTEGRATION_ID = "int_edz1HUu12L3kL7s9kLRLycuM"; From 428359c57abe0c607768dd52ec83dc754e3aa62d Mon Sep 17 00:00:00 2001 From: Kiran K Date: Mon, 30 Dec 2024 12:20:55 +0530 Subject: [PATCH 07/15] handle mandate webhook --- .../webhook/customers-data-request.ts | 34 +++++++++++++ .../integration/webhook/customers-redact.ts | 34 +++++++++++++ .../shopify/integration/webhook/order-paid.ts | 11 ++-- .../api/shopify/integration/webhook/route.ts | 51 ++++++++++++++++--- .../integration/webhook/shop-redact.ts | 20 ++++++++ .../api/shopify/integration/webhook/utils.ts | 18 ------- 6 files changed, 136 insertions(+), 32 deletions(-) create mode 100644 apps/web/app/api/shopify/integration/webhook/customers-data-request.ts create mode 100644 apps/web/app/api/shopify/integration/webhook/customers-redact.ts create mode 100644 apps/web/app/api/shopify/integration/webhook/shop-redact.ts delete mode 100644 apps/web/app/api/shopify/integration/webhook/utils.ts diff --git a/apps/web/app/api/shopify/integration/webhook/customers-data-request.ts b/apps/web/app/api/shopify/integration/webhook/customers-data-request.ts new file mode 100644 index 0000000000..0fb2e84e4f --- /dev/null +++ b/apps/web/app/api/shopify/integration/webhook/customers-data-request.ts @@ -0,0 +1,34 @@ +import { waitUntil } from "@vercel/functions"; +import { sendEmail } from "emails"; +import { z } from "zod"; + +const schema = z.object({ + shop_domain: z.string(), + orders_requested: z.array(z.number()), + customer: z.object({ + id: z.number(), + email: z.string(), + phone: z.string(), + }), +}); + +export async function customersDataRequest({ body }: { body: any }) { + const { + customer, + shop_domain: shopDomain, + orders_requested: ordersRequested, + } = schema.parse(body); + + waitUntil( + sendEmail({ + email: "steven@dub.co", + from: "Steven Tey ", + subject: "[Shopify] - Customer Data Request received", + text: `Customer Data Request received for shop: ${shopDomain}. + Customer ID: ${customer.id}, + Email: ${customer.email}, + Phone: ${customer.phone}, + Orders Requested: ${ordersRequested.join(", ")}`, + }), + ); +} diff --git a/apps/web/app/api/shopify/integration/webhook/customers-redact.ts b/apps/web/app/api/shopify/integration/webhook/customers-redact.ts new file mode 100644 index 0000000000..511766e725 --- /dev/null +++ b/apps/web/app/api/shopify/integration/webhook/customers-redact.ts @@ -0,0 +1,34 @@ +import { waitUntil } from "@vercel/functions"; +import { sendEmail } from "emails"; +import { z } from "zod"; + +const schema = z.object({ + shop_domain: z.string(), + orders_to_redact: z.array(z.number()), + customer: z.object({ + id: z.number(), + email: z.string(), + phone: z.string(), + }), +}); + +export async function customersRedact({ body }: { body: any }) { + const { + customer, + shop_domain: shopDomain, + orders_to_redact: ordersToRedact, + } = schema.parse(body); + + waitUntil( + sendEmail({ + email: "steven@dub.co", + from: "Steven Tey ", + subject: "[Shopify] - Customer Redacted request received", + text: `Customer Redacted request received for shop: ${shopDomain}. + Customer ID: ${customer.id}, + Email: ${customer.email}, + Phone: ${customer.phone}, + Orders to Redact: ${ordersToRedact.join(", ")}`, + }), + ); +} diff --git a/apps/web/app/api/shopify/integration/webhook/order-paid.ts b/apps/web/app/api/shopify/integration/webhook/order-paid.ts index 580b1b1bc6..8385478d1e 100644 --- a/apps/web/app/api/shopify/integration/webhook/order-paid.ts +++ b/apps/web/app/api/shopify/integration/webhook/order-paid.ts @@ -2,20 +2,17 @@ import { qstash } from "@/lib/cron"; import { redis } from "@/lib/upstash"; import { APP_DOMAIN_WITH_NGROK } from "@dub/utils"; -// TODO: -// Instead of checkout_token, maybe use order_number or checkout_id - export async function orderPaid({ - order, + body, workspaceId, }: { - order: any; + body: any; workspaceId: string; }) { - const checkoutToken = order.checkout_token; + const checkoutToken = body.checkout_token; await redis.hset(`shopify:checkout:${checkoutToken}`, { - order, + order: body, }); await qstash.publishJSON({ diff --git a/apps/web/app/api/shopify/integration/webhook/route.ts b/apps/web/app/api/shopify/integration/webhook/route.ts index f08f4198f4..40b57af1d8 100644 --- a/apps/web/app/api/shopify/integration/webhook/route.ts +++ b/apps/web/app/api/shopify/integration/webhook/route.ts @@ -1,10 +1,20 @@ import { prisma } from "@dub/prisma"; import { log } from "@dub/utils"; +import crypto from "crypto"; import { NextResponse } from "next/server"; +import { customersDataRequest } from "./customers-data-request"; +import { customersRedact } from "./customers-redact"; import { orderPaid } from "./order-paid"; -import { verifyShopifySignature } from "./utils"; +import { shopRedact } from "./shop-redact"; -const relevantTopics = new Set(["orders/paid"]); +const relevantTopics = new Set([ + "orders/paid", + + // Mandatory compliance webhooks + "customers/data_request", + "customers/redact", + "shop/redact", +]); // POST /api/shopify/integration/webhook – Listen to Shopify webhook events export const POST = async (req: Request) => { @@ -12,17 +22,29 @@ export const POST = async (req: Request) => { const headers = req.headers; const topic = headers.get("x-shopify-topic") || ""; const signature = headers.get("x-shopify-hmac-sha256") || ""; - const shop = headers.get("x-shopify-shop-domain") || ""; + const shopDomain = headers.get("x-shopify-shop-domain") || ""; - await verifyShopifySignature({ body, signature }); + // Verify signature + const generatedSignature = crypto + .createHmac("sha256", `${process.env.SHOPIFY_WEBHOOK_SECRET}`) + .update(JSON.stringify(body)) + .digest("base64"); + if (generatedSignature !== signature) { + return new Response(`[Shopify] Invalid signature. Skipping...`, { + status: 401, + }); + } + + // Check if topic is relevant if (!relevantTopics.has(topic)) { return new Response(`[Shopify] Unsupported topic: ${topic}. Skipping...`); } + // Find workspace const workspace = await prisma.project.findUnique({ where: { - shopifyStoreId: shop, + shopifyStoreId: shopDomain, }, select: { id: true, @@ -31,7 +53,7 @@ export const POST = async (req: Request) => { if (!workspace) { return new Response( - `[Shopify] Workspace not found for shop: ${shop}. Skipping...`, + `[Shopify] Workspace not found for shop: ${shopDomain}. Skipping...`, ); } @@ -39,10 +61,25 @@ export const POST = async (req: Request) => { switch (topic) { case "orders/paid": await orderPaid({ - order: body, + body, workspaceId: workspace.id, }); break; + case "customers/data_request": + await customersDataRequest({ + body, + }); + break; + case "customers/redact": + await customersRedact({ + body, + }); + break; + case "shop/redact": + await shopRedact({ + body, + }); + break; } } catch (error) { await log({ diff --git a/apps/web/app/api/shopify/integration/webhook/shop-redact.ts b/apps/web/app/api/shopify/integration/webhook/shop-redact.ts new file mode 100644 index 0000000000..d4fb50b131 --- /dev/null +++ b/apps/web/app/api/shopify/integration/webhook/shop-redact.ts @@ -0,0 +1,20 @@ +import { waitUntil } from "@vercel/functions"; +import { sendEmail } from "emails"; +import { z } from "zod"; + +const schema = z.object({ + shop_domain: z.string(), +}); + +export async function shopRedact({ body }: { body: any }) { + const { shop_domain: shopDomain } = schema.parse(body); + + waitUntil( + sendEmail({ + email: "steven@dub.co", + from: "Steven Tey ", + subject: "[Shopify] - Shop Redacted request received", + text: `Shop Redacted request received for shop: ${shopDomain}`, + }), + ); +} diff --git a/apps/web/app/api/shopify/integration/webhook/utils.ts b/apps/web/app/api/shopify/integration/webhook/utils.ts deleted file mode 100644 index 2212aad314..0000000000 --- a/apps/web/app/api/shopify/integration/webhook/utils.ts +++ /dev/null @@ -1,18 +0,0 @@ -import crypto from "crypto"; - -export const verifyShopifySignature = async ({ - body, - signature, -}: { - body: Record; - signature: string; -}) => { - const generatedSignature = crypto - .createHmac("sha256", `${process.env.SHOPIFY_WEBHOOK_SECRET}`) - .update(JSON.stringify(body)) - .digest("base64"); - - if (generatedSignature !== signature) { - throw new Error("Invalid webhook signature."); - } -}; From 95813bcea17826bebc8e27e0ca73cefc8e8fbaa5 Mon Sep 17 00:00:00 2001 From: Kiran K Date: Mon, 30 Dec 2024 13:07:40 +0530 Subject: [PATCH 08/15] update webhooks --- .../app/api/cron/shopify/order-paid/route.ts | 42 +++++++++++-------- .../integration/webhook/app-uninstalled.ts | 17 ++++++++ .../shopify/integration/webhook/order-paid.ts | 7 +++- .../api/shopify/integration/webhook/route.ts | 15 +++++-- 4 files changed, 59 insertions(+), 22 deletions(-) create mode 100644 apps/web/app/api/shopify/integration/webhook/app-uninstalled.ts diff --git a/apps/web/app/api/cron/shopify/order-paid/route.ts b/apps/web/app/api/cron/shopify/order-paid/route.ts index 9969d15568..0be70737f8 100644 --- a/apps/web/app/api/cron/shopify/order-paid/route.ts +++ b/apps/web/app/api/cron/shopify/order-paid/route.ts @@ -1,6 +1,6 @@ import { DubApiError, handleAndReturnErrorResponse } from "@/lib/api/errors"; import { notifyPartnerSale } from "@/lib/api/partners/notify-partner-sale"; -import { createSaleData } from "@/lib/api/sales/sale"; +import { createSaleData } from "@/lib/api/sales/create-sale-data"; import { createId } from "@/lib/api/utils"; import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; import { generateRandomName } from "@/lib/names"; @@ -270,33 +270,39 @@ const createSale = async ({ // for program links if (link.programId) { - const { program, partner } = + const { program, partnerId, commissionAmount } = await prisma.programEnrollment.findUniqueOrThrow({ where: { linkId, }, select: { program: true, - partner: { - select: { - id: true, - }, - }, + partnerId: true, + commissionAmount: true, }, }); const saleRecord = createSaleData({ - customerId, - linkId, - clickId, - invoiceId, - eventId, - paymentProcessor, - amount, - currency, - partnerId: partner.id, program, - metadata: order, + partner: { + id: partnerId, + commissionAmount, + }, + customer: { + id: customerId, + linkId, + clickId, + }, + sale: { + amount, + currency, + invoiceId, + eventId, + paymentProcessor, + }, + metadata: { + ...order, + }, }); await Promise.allSettled([ @@ -306,7 +312,7 @@ const createSale = async ({ notifyPartnerSale({ partner: { - id: partner.id, + id: partnerId, referralLink: link.shortLink, }, program, diff --git a/apps/web/app/api/shopify/integration/webhook/app-uninstalled.ts b/apps/web/app/api/shopify/integration/webhook/app-uninstalled.ts new file mode 100644 index 0000000000..c7d95a8bc6 --- /dev/null +++ b/apps/web/app/api/shopify/integration/webhook/app-uninstalled.ts @@ -0,0 +1,17 @@ +import { redis } from "@/lib/upstash"; +import { prisma } from "@dub/prisma"; + +export async function appUninstalled({ shopDomain }: { shopDomain: string }) { + await Promise.all([ + prisma.project.update({ + where: { + shopifyStoreId: shopDomain, + }, + data: { + shopifyStoreId: null, + }, + }), + + redis.del(`shopify:shop:${shopDomain}`), + ]); +} diff --git a/apps/web/app/api/shopify/integration/webhook/order-paid.ts b/apps/web/app/api/shopify/integration/webhook/order-paid.ts index 8385478d1e..9c94a00394 100644 --- a/apps/web/app/api/shopify/integration/webhook/order-paid.ts +++ b/apps/web/app/api/shopify/integration/webhook/order-paid.ts @@ -1,6 +1,11 @@ import { qstash } from "@/lib/cron"; import { redis } from "@/lib/upstash"; import { APP_DOMAIN_WITH_NGROK } from "@dub/utils"; +import { z } from "zod"; + +const schema = z.object({ + checkout_token: z.string(), +}); export async function orderPaid({ body, @@ -9,7 +14,7 @@ export async function orderPaid({ body: any; workspaceId: string; }) { - const checkoutToken = body.checkout_token; + const { checkout_token: checkoutToken } = schema.parse(body); await redis.hset(`shopify:checkout:${checkoutToken}`, { order: body, diff --git a/apps/web/app/api/shopify/integration/webhook/route.ts b/apps/web/app/api/shopify/integration/webhook/route.ts index 40b57af1d8..a230d9c2d4 100644 --- a/apps/web/app/api/shopify/integration/webhook/route.ts +++ b/apps/web/app/api/shopify/integration/webhook/route.ts @@ -2,6 +2,7 @@ import { prisma } from "@dub/prisma"; import { log } from "@dub/utils"; import crypto from "crypto"; import { NextResponse } from "next/server"; +import { appUninstalled } from "./app-uninstalled"; import { customersDataRequest } from "./customers-data-request"; import { customersRedact } from "./customers-redact"; import { orderPaid } from "./order-paid"; @@ -11,6 +12,7 @@ const relevantTopics = new Set([ "orders/paid", // Mandatory compliance webhooks + "app/uninstalled", "customers/data_request", "customers/redact", "shop/redact", @@ -18,16 +20,15 @@ const relevantTopics = new Set([ // POST /api/shopify/integration/webhook – Listen to Shopify webhook events export const POST = async (req: Request) => { - const body = await req.json(); + const data = await req.text(); const headers = req.headers; const topic = headers.get("x-shopify-topic") || ""; const signature = headers.get("x-shopify-hmac-sha256") || ""; - const shopDomain = headers.get("x-shopify-shop-domain") || ""; // Verify signature const generatedSignature = crypto .createHmac("sha256", `${process.env.SHOPIFY_WEBHOOK_SECRET}`) - .update(JSON.stringify(body)) + .update(data, "utf8") .digest("base64"); if (generatedSignature !== signature) { @@ -41,6 +42,9 @@ export const POST = async (req: Request) => { return new Response(`[Shopify] Unsupported topic: ${topic}. Skipping...`); } + const body = JSON.parse(data); + const shopDomain = headers.get("x-shopify-shop-domain") || ""; + // Find workspace const workspace = await prisma.project.findUnique({ where: { @@ -80,6 +84,11 @@ export const POST = async (req: Request) => { body, }); break; + case "app/uninstalled": + await appUninstalled({ + shopDomain, + }); + break; } } catch (error) { await log({ From 9e2320eba790c35d4466aa7d97cca7f2103507f2 Mon Sep 17 00:00:00 2001 From: Steven Tey Date: Mon, 30 Dec 2024 12:47:51 -0800 Subject: [PATCH 09/15] update shopify integration ID --- apps/web/app/api/cron/year-in-review/route.ts | 2 +- packages/utils/src/constants/integrations.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/web/app/api/cron/year-in-review/route.ts b/apps/web/app/api/cron/year-in-review/route.ts index 041c2cf716..8d93d6be7e 100644 --- a/apps/web/app/api/cron/year-in-review/route.ts +++ b/apps/web/app/api/cron/year-in-review/route.ts @@ -10,7 +10,7 @@ export const dynamic = "force-dynamic"; const BATCH_SIZE = 100; // POST /api/cron/year-in-review -export async function POST(req: Request) { +export async function POST() { try { if (process.env.VERCEL === "1") { return new Response("Not available in production."); diff --git a/packages/utils/src/constants/integrations.ts b/packages/utils/src/constants/integrations.ts index be141540a4..67de874ace 100644 --- a/packages/utils/src/constants/integrations.ts +++ b/packages/utils/src/constants/integrations.ts @@ -1,3 +1,3 @@ export const SLACK_INTEGRATION_ID = "clzu59rx9000110bm5fnlzwuj"; export const STRIPE_INTEGRATION_ID = "clzra1ya60001wnj4a89zcg9h"; -export const SHOPIFY_INTEGRATION_ID = "int_edz1HUu12L3kL7s9kLRLycuM"; +export const SHOPIFY_INTEGRATION_ID = "int_iWOtrZgmcyU6XDwKr4AYYqLN"; From 2513e46a9fdf1865394d6cc19a5cae782a65ea95 Mon Sep 17 00:00:00 2001 From: Kiran K Date: Tue, 31 Dec 2024 14:00:25 +0530 Subject: [PATCH 10/15] improve the order processing --- .../app/api/cron/shopify/order-paid/route.ts | 312 ++---------------- .../webhook/customers-data-request.ts | 4 +- .../integration/webhook/customers-redact.ts | 4 +- .../shopify/integration/webhook/order-paid.ts | 42 ++- .../api/shopify/integration/webhook/route.ts | 10 +- .../integration/webhook/shop-redact.ts | 4 +- .../lib/integrations/shopify/process-order.ts | 286 ++++++++++++++++ apps/web/lib/integrations/shopify/schema.ts | 11 + 8 files changed, 363 insertions(+), 310 deletions(-) create mode 100644 apps/web/lib/integrations/shopify/process-order.ts create mode 100644 apps/web/lib/integrations/shopify/schema.ts diff --git a/apps/web/app/api/cron/shopify/order-paid/route.ts b/apps/web/app/api/cron/shopify/order-paid/route.ts index 0be70737f8..8e96582a3b 100644 --- a/apps/web/app/api/cron/shopify/order-paid/route.ts +++ b/apps/web/app/api/cron/shopify/order-paid/route.ts @@ -1,21 +1,8 @@ import { DubApiError, handleAndReturnErrorResponse } from "@/lib/api/errors"; -import { notifyPartnerSale } from "@/lib/api/partners/notify-partner-sale"; -import { createSaleData } from "@/lib/api/sales/create-sale-data"; -import { createId } from "@/lib/api/utils"; import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; -import { generateRandomName } from "@/lib/names"; -import { - getClickEvent, - getLeadEvent, - recordLead, - recordSale, -} from "@/lib/tinybird"; +import { processOrder } from "@/lib/integrations/shopify/process-order"; import { redis } from "@/lib/upstash"; -import z from "@/lib/zod"; -import { leadEventSchemaTB } from "@/lib/zod/schemas/leads"; -import { prisma } from "@dub/prisma"; -import { nanoid } from "@dub/utils"; -import { Customer } from "@prisma/client"; +import { z } from "zod"; export const dynamic = "force-dynamic"; @@ -24,15 +11,6 @@ const schema = z.object({ checkoutToken: z.string(), }); -const orderSchema = z.object({ - total_price: z.string(), - currency: z.string(), - confirmation_number: z.string(), - customer: z.object({ - id: z.number(), - }), -}); - // POST /api/cron/shopify/order-paid export async function POST(req: Request) { try { @@ -42,64 +20,37 @@ export async function POST(req: Request) { const { workspaceId, checkoutToken } = schema.parse(body); // Find Shopify order - const order = await redis.hget( + const event = await redis.hget( `shopify:checkout:${checkoutToken}`, "order", ); - if (!order) { - return new Response(`[Shopify] Shopify order not found. Skipping...`); - } - - const orderData = orderSchema.parse(order); - const externalId = orderData.customer.id.toString(); - - // Fetch customer - const customer: Customer | null = await prisma.customer.findUnique({ - where: { - projectId_externalId: { - projectId: workspaceId, - externalId, - }, - }, - }); - - if (customer) { - // Existing customer - await createSale({ - order, - workspaceId, - customerId: customer.id, - }); - } else { - const clickId = await redis.hget( - `shopify:checkout:${checkoutToken}`, - "clickId", + if (!event) { + return new Response( + `[Shopify] Order with checkout token ${checkoutToken} not found. Skipping...`, ); + } - if (!clickId) { - // Wait for the click event to come from Shopify pixel - throw new DubApiError({ - code: "bad_request", - message: - "[Shopify] Click event not found. Waiting for Shopify pixel event...", - }); - } - - const leadData = await createLead({ - clickId, - externalId, - workspaceId, - }); + const clickId = await redis.hget( + `shopify:checkout:${checkoutToken}`, + "clickId", + ); - await createSale({ - order, - workspaceId, - leadData, - customerId: leadData.customer_id, + // Wait for the click event to come from Shopify pixel + if (!clickId) { + throw new DubApiError({ + code: "bad_request", + message: + "[Shopify] Click event not found. Waiting for Shopify pixel event...", }); } + await processOrder({ + event, + workspaceId, + clickId, + }); + await redis.del(`shopify:checkout:${checkoutToken}`); return new Response("[Shopify] Order event processed successfully."); @@ -107,220 +58,3 @@ export async function POST(req: Request) { return handleAndReturnErrorResponse(error); } } - -const createLead = async ({ - clickId, - externalId, - workspaceId, -}: { - clickId: string; - externalId: string; - workspaceId: string; -}) => { - // find click - const clickEvent = await getClickEvent({ clickId }); - - const clickData = clickEvent.data[0]; - const { link_id: linkId, country, timestamp } = clickData; - - // create customer - const customer = await prisma.customer.create({ - data: { - id: createId({ prefix: "cus_" }), - name: generateRandomName(), - externalId, - projectId: workspaceId, - clickedAt: new Date(timestamp + "Z"), - clickId, - linkId, - country, - }, - }); - - const leadData = leadEventSchemaTB.parse({ - ...clickData, - event_id: nanoid(16), - event_name: "Account created", - customer_id: customer.id, - }); - - await Promise.all([ - // record lead - recordLead(leadData), - - // update link leads count - prisma.link.update({ - where: { - id: linkId, - }, - data: { - leads: { - increment: 1, - }, - }, - }), - - // update workspace usage - prisma.project.update({ - where: { - id: workspaceId, - }, - data: { - usage: { - increment: 1, - }, - }, - }), - ]); - - return leadData; -}; - -const createSale = async ({ - order, - customerId, - workspaceId, - leadData, -}: { - order: any; - customerId: string; - workspaceId: string; - leadData?: z.infer; -}) => { - if (!leadData) { - const leadEvent = await getLeadEvent({ customerId }); - - if (!leadEvent || leadEvent.data.length === 0) { - return new Response( - `[Shopify] Lead event with customer ID ${customerId} not found, skipping...`, - ); - } - - leadData = leadEvent.data[0]; - } - - const { link_id: linkId, click_id: clickId } = leadData; - - const orderData = orderSchema.parse(order); - const eventId = nanoid(16); - const paymentProcessor = "shopify"; - - const amount = Number(orderData.total_price) * 100; - const currency = orderData.currency; - const invoiceId = orderData.confirmation_number; - - const sale = await prisma.sale.findFirst({ - where: { - invoiceId, - clickId, - }, - }); - - if (sale) { - return new Response( - `[Shopify] Order has been processed already. Skipping...`, - ); - } - - const saleData = { - ...leadData, - event_id: eventId, - event_name: "Purchase", - payment_processor: paymentProcessor, - amount, - currency, - invoice_id: invoiceId, - metadata: JSON.stringify(order), - }; - - const [_sale, link, _project] = await Promise.all([ - // record sale - recordSale(saleData), - - // update link sales count - prisma.link.update({ - where: { - id: linkId, - }, - data: { - sales: { - increment: 1, - }, - saleAmount: { - increment: amount, - }, - }, - }), - - // update workspace sales usage - prisma.project.update({ - where: { - id: workspaceId, - }, - data: { - usage: { - increment: 1, - }, - salesUsage: { - increment: amount, - }, - }, - }), - ]); - - // for program links - if (link.programId) { - const { program, partnerId, commissionAmount } = - await prisma.programEnrollment.findUniqueOrThrow({ - where: { - linkId, - }, - select: { - program: true, - partnerId: true, - commissionAmount: true, - }, - }); - - const saleRecord = createSaleData({ - program, - partner: { - id: partnerId, - commissionAmount, - }, - customer: { - id: customerId, - linkId, - clickId, - }, - sale: { - amount, - currency, - invoiceId, - eventId, - paymentProcessor, - }, - metadata: { - ...order, - }, - }); - - await Promise.allSettled([ - prisma.sale.create({ - data: saleRecord, - }), - - notifyPartnerSale({ - partner: { - id: partnerId, - referralLink: link.shortLink, - }, - program, - sale: { - amount: saleRecord.amount, - earnings: saleRecord.earnings, - }, - }), - ]); - } -}; diff --git a/apps/web/app/api/shopify/integration/webhook/customers-data-request.ts b/apps/web/app/api/shopify/integration/webhook/customers-data-request.ts index 0fb2e84e4f..1cc1c20f9d 100644 --- a/apps/web/app/api/shopify/integration/webhook/customers-data-request.ts +++ b/apps/web/app/api/shopify/integration/webhook/customers-data-request.ts @@ -12,12 +12,12 @@ const schema = z.object({ }), }); -export async function customersDataRequest({ body }: { body: any }) { +export async function customersDataRequest({ event }: { event: any }) { const { customer, shop_domain: shopDomain, orders_requested: ordersRequested, - } = schema.parse(body); + } = schema.parse(event); waitUntil( sendEmail({ diff --git a/apps/web/app/api/shopify/integration/webhook/customers-redact.ts b/apps/web/app/api/shopify/integration/webhook/customers-redact.ts index 511766e725..c12ff0c7f9 100644 --- a/apps/web/app/api/shopify/integration/webhook/customers-redact.ts +++ b/apps/web/app/api/shopify/integration/webhook/customers-redact.ts @@ -12,12 +12,12 @@ const schema = z.object({ }), }); -export async function customersRedact({ body }: { body: any }) { +export async function customersRedact({ event }: { event: any }) { const { customer, shop_domain: shopDomain, orders_to_redact: ordersToRedact, - } = schema.parse(body); + } = schema.parse(event); waitUntil( sendEmail({ diff --git a/apps/web/app/api/shopify/integration/webhook/order-paid.ts b/apps/web/app/api/shopify/integration/webhook/order-paid.ts index 9c94a00394..83b5906668 100644 --- a/apps/web/app/api/shopify/integration/webhook/order-paid.ts +++ b/apps/web/app/api/shopify/integration/webhook/order-paid.ts @@ -1,23 +1,45 @@ import { qstash } from "@/lib/cron"; +import { processOrder } from "@/lib/integrations/shopify/process-order"; +import { orderSchema } from "@/lib/integrations/shopify/schema"; import { redis } from "@/lib/upstash"; +import { prisma } from "@dub/prisma"; import { APP_DOMAIN_WITH_NGROK } from "@dub/utils"; -import { z } from "zod"; - -const schema = z.object({ - checkout_token: z.string(), -}); export async function orderPaid({ - body, + event, workspaceId, }: { - body: any; + event: any; workspaceId: string; }) { - const { checkout_token: checkoutToken } = schema.parse(body); + const { + customer: { id: externalId }, + checkout_token: checkoutToken, + } = orderSchema.parse(event); + + const customer = await prisma.customer.findUnique({ + where: { + projectId_externalId: { + projectId: workspaceId, + externalId: externalId.toString(), + }, + }, + }); + + // customer is found, process the order right away + if (customer) { + await processOrder({ + event, + workspaceId, + customerId: customer.id, + }); + + return; + } + // if no customer is found, wait for the Pixel event to come in so that we can decide if the order is from a Dub link or not await redis.hset(`shopify:checkout:${checkoutToken}`, { - order: body, + order: event, }); await qstash.publishJSON({ @@ -26,7 +48,7 @@ export async function orderPaid({ checkoutToken, workspaceId, }, - retries: 3, + retries: 5, delay: 3, }); } diff --git a/apps/web/app/api/shopify/integration/webhook/route.ts b/apps/web/app/api/shopify/integration/webhook/route.ts index a230d9c2d4..d9918331af 100644 --- a/apps/web/app/api/shopify/integration/webhook/route.ts +++ b/apps/web/app/api/shopify/integration/webhook/route.ts @@ -42,7 +42,7 @@ export const POST = async (req: Request) => { return new Response(`[Shopify] Unsupported topic: ${topic}. Skipping...`); } - const body = JSON.parse(data); + const event = JSON.parse(data); const shopDomain = headers.get("x-shopify-shop-domain") || ""; // Find workspace @@ -65,23 +65,23 @@ export const POST = async (req: Request) => { switch (topic) { case "orders/paid": await orderPaid({ - body, + event, workspaceId: workspace.id, }); break; case "customers/data_request": await customersDataRequest({ - body, + event, }); break; case "customers/redact": await customersRedact({ - body, + event, }); break; case "shop/redact": await shopRedact({ - body, + event, }); break; case "app/uninstalled": diff --git a/apps/web/app/api/shopify/integration/webhook/shop-redact.ts b/apps/web/app/api/shopify/integration/webhook/shop-redact.ts index d4fb50b131..26fb13141a 100644 --- a/apps/web/app/api/shopify/integration/webhook/shop-redact.ts +++ b/apps/web/app/api/shopify/integration/webhook/shop-redact.ts @@ -6,8 +6,8 @@ const schema = z.object({ shop_domain: z.string(), }); -export async function shopRedact({ body }: { body: any }) { - const { shop_domain: shopDomain } = schema.parse(body); +export async function shopRedact({ event }: { event: any }) { + const { shop_domain: shopDomain } = schema.parse(event); waitUntil( sendEmail({ diff --git a/apps/web/lib/integrations/shopify/process-order.ts b/apps/web/lib/integrations/shopify/process-order.ts new file mode 100644 index 0000000000..9ff0cd0a03 --- /dev/null +++ b/apps/web/lib/integrations/shopify/process-order.ts @@ -0,0 +1,286 @@ +import { handleAndReturnErrorResponse } from "@/lib/api/errors"; +import { notifyPartnerSale } from "@/lib/api/partners/notify-partner-sale"; +import { createSaleData } from "@/lib/api/sales/create-sale-data"; +import { createId } from "@/lib/api/utils"; +import { generateRandomName } from "@/lib/names"; +import { + getClickEvent, + getLeadEvent, + recordLead, + recordSale, +} from "@/lib/tinybird"; +import z from "@/lib/zod"; +import { leadEventSchemaTB } from "@/lib/zod/schemas/leads"; +import { prisma } from "@dub/prisma"; +import { nanoid } from "@dub/utils"; +import { orderSchema } from "./schema"; + +async function createLead({ + clickId, + externalId, + workspaceId, +}: { + clickId: string; + externalId: string; + workspaceId: string; +}) { + // find click + const clickEvent = await getClickEvent({ clickId }); + + const clickData = clickEvent.data[0]; + const { link_id: linkId, country, timestamp } = clickData; + + // create customer + const customer = await prisma.customer.create({ + data: { + id: createId({ prefix: "cus_" }), + name: generateRandomName(), + externalId, + projectId: workspaceId, + clickedAt: new Date(timestamp + "Z"), + clickId, + linkId, + country, + }, + }); + + const leadData = leadEventSchemaTB.parse({ + ...clickData, + event_id: nanoid(16), + event_name: "Account created", + customer_id: customer.id, + }); + + await Promise.all([ + // record lead + recordLead(leadData), + + // update link leads count + prisma.link.update({ + where: { + id: linkId, + }, + data: { + leads: { + increment: 1, + }, + }, + }), + + // update workspace usage + prisma.project.update({ + where: { + id: workspaceId, + }, + data: { + usage: { + increment: 1, + }, + }, + }), + ]); + + return leadData; +} + +async function createSale({ + order, + customerId, + workspaceId, + leadData, +}: { + order: any; + customerId: string; + workspaceId: string; + leadData: z.infer; +}) { + const orderData = orderSchema.parse(order); + const eventId = nanoid(16); + const paymentProcessor = "shopify"; + + const amount = Number(orderData.total_price) * 100; + const currency = orderData.currency; + const invoiceId = orderData.confirmation_number; + + const { link_id: linkId, click_id: clickId } = leadData; + + const sale = await prisma.sale.findFirst({ + where: { + invoiceId, + clickId, + }, + }); + + if (sale) { + return new Response( + `[Shopify] Order has been processed already. Skipping...`, + ); + } + + const saleData = { + ...leadData, + event_id: eventId, + event_name: "Purchase", + payment_processor: paymentProcessor, + amount, + currency, + invoice_id: invoiceId, + metadata: JSON.stringify(order), + }; + + const [_sale, link, _project] = await Promise.all([ + // record sale + recordSale(saleData), + + // update link sales count + prisma.link.update({ + where: { + id: linkId, + }, + data: { + sales: { + increment: 1, + }, + saleAmount: { + increment: amount, + }, + }, + }), + + // update workspace sales usage + prisma.project.update({ + where: { + id: workspaceId, + }, + data: { + usage: { + increment: 1, + }, + salesUsage: { + increment: amount, + }, + }, + }), + ]); + + // for program links + if (link.programId) { + const { program, partnerId, commissionAmount } = + await prisma.programEnrollment.findUniqueOrThrow({ + where: { + linkId, + }, + select: { + program: true, + partnerId: true, + commissionAmount: true, + }, + }); + + const saleRecord = createSaleData({ + program, + partner: { + id: partnerId, + commissionAmount, + }, + customer: { + id: customerId, + linkId, + clickId, + }, + sale: { + amount, + currency, + invoiceId, + eventId, + paymentProcessor, + }, + metadata: { + ...order, + }, + }); + + await Promise.allSettled([ + prisma.sale.create({ + data: saleRecord, + }), + + notifyPartnerSale({ + partner: { + id: partnerId, + referralLink: link.shortLink, + }, + program, + sale: { + amount: saleRecord.amount, + earnings: saleRecord.earnings, + }, + }), + ]); + } +} + +// Process the order from Shopify webhook +export async function processOrder({ + event, + workspaceId, + customerId, + clickId, +}: { + event: unknown; + workspaceId: string; + customerId?: string; // ID of the customer in Dub + clickId?: string; // ID of the click event from Shopify pixel +}) { + try { + const order = orderSchema.parse(event); + + // for existing customer + if (customerId) { + const leadEvent = await getLeadEvent({ customerId }); + + if (!leadEvent || leadEvent.data.length === 0) { + return new Response( + `[Shopify] Lead event with customer ID ${customerId} not found, skipping...`, + ); + } + + const leadData = leadEvent.data[0]; + + await createSale({ + leadData, + order, + workspaceId, + customerId, + }); + + return; + } + + // for new customer + if (clickId) { + const { + customer: { id: externalId }, + } = orderSchema.parse(event); + + const leadData = await createLead({ + clickId, + workspaceId, + externalId: externalId.toString(), + }); + + const { customer_id: customerId } = leadData; + + await createSale({ + leadData, + order, + workspaceId, + customerId, + }); + } + + return new Response("[Shopify] Order event processed successfully."); + } catch (error) { + return handleAndReturnErrorResponse(error); + } +} diff --git a/apps/web/lib/integrations/shopify/schema.ts b/apps/web/lib/integrations/shopify/schema.ts new file mode 100644 index 0000000000..ced02b6b87 --- /dev/null +++ b/apps/web/lib/integrations/shopify/schema.ts @@ -0,0 +1,11 @@ +import { z } from "zod"; + +export const orderSchema = z.object({ + total_price: z.string(), + currency: z.string(), + confirmation_number: z.string(), + checkout_token: z.string(), + customer: z.object({ + id: z.number(), + }), +}); From 4890adb1492bdf0a551787a47df81908bd376047 Mon Sep 17 00:00:00 2001 From: Kiran K Date: Tue, 31 Dec 2024 18:18:58 +0530 Subject: [PATCH 11/15] Update order-paid.ts --- .../shopify/integration/webhook/order-paid.ts | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/apps/web/app/api/shopify/integration/webhook/order-paid.ts b/apps/web/app/api/shopify/integration/webhook/order-paid.ts index 83b5906668..7dc5e193bf 100644 --- a/apps/web/app/api/shopify/integration/webhook/order-paid.ts +++ b/apps/web/app/api/shopify/integration/webhook/order-paid.ts @@ -37,7 +37,26 @@ export async function orderPaid({ return; } - // if no customer is found, wait for the Pixel event to come in so that we can decide if the order is from a Dub link or not + // Check the cache to see the pixel event for this checkout token exist before publishing the event to the queue + const clickId = await redis.hget( + `shopify:checkout:${checkoutToken}`, + "clickId", + ); + + // clickId is found, process the order for the new customer + if (clickId) { + await processOrder({ + event, + workspaceId, + clickId, + }); + + await redis.del(`shopify:checkout:${checkoutToken}`); + + return; + } + + // Wait for the pixel event to come in so that we can decide if the order is from a Dub link or not await redis.hset(`shopify:checkout:${checkoutToken}`, { order: event, }); From a32c8e410d7f4e903e48e4619646a960fef42897 Mon Sep 17 00:00:00 2001 From: Kiran K Date: Tue, 31 Dec 2024 18:24:29 +0530 Subject: [PATCH 12/15] use subtotal_price --- apps/web/lib/integrations/shopify/process-order.ts | 2 +- apps/web/lib/integrations/shopify/schema.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/web/lib/integrations/shopify/process-order.ts b/apps/web/lib/integrations/shopify/process-order.ts index 9ff0cd0a03..bbdb61cffa 100644 --- a/apps/web/lib/integrations/shopify/process-order.ts +++ b/apps/web/lib/integrations/shopify/process-order.ts @@ -98,7 +98,7 @@ async function createSale({ const eventId = nanoid(16); const paymentProcessor = "shopify"; - const amount = Number(orderData.total_price) * 100; + const amount = Number(orderData.subtotal_price) * 100; const currency = orderData.currency; const invoiceId = orderData.confirmation_number; diff --git a/apps/web/lib/integrations/shopify/schema.ts b/apps/web/lib/integrations/shopify/schema.ts index ced02b6b87..9e32fd41bb 100644 --- a/apps/web/lib/integrations/shopify/schema.ts +++ b/apps/web/lib/integrations/shopify/schema.ts @@ -1,7 +1,7 @@ import { z } from "zod"; export const orderSchema = z.object({ - total_price: z.string(), + subtotal_price: z.string(), currency: z.string(), confirmation_number: z.string(), checkout_token: z.string(), From 3145e5e7286aff5899fb112172f7819f652ea5ed Mon Sep 17 00:00:00 2001 From: Kiran K Date: Tue, 31 Dec 2024 18:42:14 +0530 Subject: [PATCH 13/15] use current_subtotal_price --- apps/web/lib/integrations/shopify/process-order.ts | 2 +- apps/web/lib/integrations/shopify/schema.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/web/lib/integrations/shopify/process-order.ts b/apps/web/lib/integrations/shopify/process-order.ts index bbdb61cffa..01154bc84c 100644 --- a/apps/web/lib/integrations/shopify/process-order.ts +++ b/apps/web/lib/integrations/shopify/process-order.ts @@ -98,7 +98,7 @@ async function createSale({ const eventId = nanoid(16); const paymentProcessor = "shopify"; - const amount = Number(orderData.subtotal_price) * 100; + const amount = Number(orderData.current_subtotal_price) * 100; const currency = orderData.currency; const invoiceId = orderData.confirmation_number; diff --git a/apps/web/lib/integrations/shopify/schema.ts b/apps/web/lib/integrations/shopify/schema.ts index 9e32fd41bb..2db2e3918b 100644 --- a/apps/web/lib/integrations/shopify/schema.ts +++ b/apps/web/lib/integrations/shopify/schema.ts @@ -1,7 +1,7 @@ import { z } from "zod"; export const orderSchema = z.object({ - subtotal_price: z.string(), + current_subtotal_price: z.string(), currency: z.string(), confirmation_number: z.string(), checkout_token: z.string(), From 8e3df44b31411ec9e8cb653314e39a7fb7c8a4f9 Mon Sep 17 00:00:00 2001 From: Steven Tey Date: Wed, 1 Jan 2025 17:20:24 -0800 Subject: [PATCH 14/15] Update app-uninstalled.ts --- .../integration/webhook/app-uninstalled.ts | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/apps/web/app/api/shopify/integration/webhook/app-uninstalled.ts b/apps/web/app/api/shopify/integration/webhook/app-uninstalled.ts index c7d95a8bc6..c203d01927 100644 --- a/apps/web/app/api/shopify/integration/webhook/app-uninstalled.ts +++ b/apps/web/app/api/shopify/integration/webhook/app-uninstalled.ts @@ -1,17 +1,12 @@ -import { redis } from "@/lib/upstash"; import { prisma } from "@dub/prisma"; export async function appUninstalled({ shopDomain }: { shopDomain: string }) { - await Promise.all([ - prisma.project.update({ - where: { - shopifyStoreId: shopDomain, - }, - data: { - shopifyStoreId: null, - }, - }), - - redis.del(`shopify:shop:${shopDomain}`), - ]); + await prisma.project.update({ + where: { + shopifyStoreId: shopDomain, + }, + data: { + shopifyStoreId: null, + }, + }); } From 32b71ed3ecdb81ac8ed2aa14197e16830e4c2f8b Mon Sep 17 00:00:00 2001 From: Steven Tey Date: Wed, 1 Jan 2025 17:29:35 -0800 Subject: [PATCH 15/15] revert --- .../integration/webhook/app-uninstalled.ts | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/apps/web/app/api/shopify/integration/webhook/app-uninstalled.ts b/apps/web/app/api/shopify/integration/webhook/app-uninstalled.ts index c203d01927..156d23e6e5 100644 --- a/apps/web/app/api/shopify/integration/webhook/app-uninstalled.ts +++ b/apps/web/app/api/shopify/integration/webhook/app-uninstalled.ts @@ -1,12 +1,16 @@ +import { redis } from "@/lib/upstash"; import { prisma } from "@dub/prisma"; export async function appUninstalled({ shopDomain }: { shopDomain: string }) { - await prisma.project.update({ - where: { - shopifyStoreId: shopDomain, - }, - data: { - shopifyStoreId: null, - }, - }); + await Promise.all([ + prisma.project.update({ + where: { + shopifyStoreId: shopDomain, + }, + data: { + shopifyStoreId: null, + }, + }), + redis.del(`shopify:shop:${shopDomain}`), + ]); }