diff --git a/apps/web/app/api/cron/shopify/order-paid/route.ts b/apps/web/app/api/cron/shopify/order-paid/route.ts new file mode 100644 index 0000000000..8e96582a3b --- /dev/null +++ b/apps/web/app/api/cron/shopify/order-paid/route.ts @@ -0,0 +1,60 @@ +import { DubApiError, handleAndReturnErrorResponse } from "@/lib/api/errors"; +import { verifyQstashSignature } from "@/lib/cron/verify-qstash"; +import { processOrder } from "@/lib/integrations/shopify/process-order"; +import { redis } from "@/lib/upstash"; +import { z } from "zod"; + +export const dynamic = "force-dynamic"; + +const schema = z.object({ + workspaceId: z.string(), + checkoutToken: z.string(), +}); + +// POST /api/cron/shopify/order-paid +export async function POST(req: Request) { + try { + const body = await req.json(); + await verifyQstashSignature(req, body); + + const { workspaceId, checkoutToken } = schema.parse(body); + + // Find Shopify order + const event = await redis.hget( + `shopify:checkout:${checkoutToken}`, + "order", + ); + + if (!event) { + return new Response( + `[Shopify] Order with checkout token ${checkoutToken} not found. Skipping...`, + ); + } + + const clickId = await redis.hget( + `shopify:checkout:${checkoutToken}`, + "clickId", + ); + + // Wait for the click event to come from Shopify pixel + if (!clickId) { + throw new DubApiError({ + code: "bad_request", + message: + "[Shopify] Click event not found. Waiting for Shopify pixel event...", + }); + } + + await processOrder({ + event, + workspaceId, + clickId, + }); + + await redis.del(`shopify:checkout:${checkoutToken}`); + + return new Response("[Shopify] Order event processed successfully."); + } catch (error) { + return handleAndReturnErrorResponse(error); + } +} diff --git a/apps/web/app/api/cron/year-in-review/route.ts b/apps/web/app/api/cron/year-in-review/route.ts index 041c2cf716..8d93d6be7e 100644 --- a/apps/web/app/api/cron/year-in-review/route.ts +++ b/apps/web/app/api/cron/year-in-review/route.ts @@ -10,7 +10,7 @@ export const dynamic = "force-dynamic"; const BATCH_SIZE = 100; // POST /api/cron/year-in-review -export async function POST(req: Request) { +export async function POST() { try { if (process.env.VERCEL === "1") { return new Response("Not available in production."); diff --git a/apps/web/app/api/oauth/userinfo/route.ts b/apps/web/app/api/oauth/userinfo/route.ts index cb15d13369..7fd3bccab8 100644 --- a/apps/web/app/api/oauth/userinfo/route.ts +++ b/apps/web/app/api/oauth/userinfo/route.ts @@ -30,6 +30,7 @@ export async function GET(req: NextRequest) { }, project: { select: { + id: true, name: true, slug: true, logo: true, @@ -52,6 +53,7 @@ export async function GET(req: NextRequest) { name: user.name, image: user.image, workspace: { + id: `ws_${tokenRecord.project.id}`, slug: tokenRecord.project.slug, name: tokenRecord.project.name, logo: tokenRecord.project.logo, diff --git a/apps/web/app/api/shopify/integration/callback/route.ts b/apps/web/app/api/shopify/integration/callback/route.ts new file mode 100644 index 0000000000..564bd6da2d --- /dev/null +++ b/apps/web/app/api/shopify/integration/callback/route.ts @@ -0,0 +1,91 @@ +import { DubApiError } from "@/lib/api/errors"; +import { parseRequestBody } from "@/lib/api/utils"; +import { withWorkspace } from "@/lib/auth"; +import { installIntegration } from "@/lib/integrations/install"; +import { prisma } from "@dub/prisma"; +import { SHOPIFY_INTEGRATION_ID } from "@dub/utils"; +import { waitUntil } from "@vercel/functions"; +import { NextResponse } from "next/server"; +import { z } from "zod"; + +const updateWorkspaceSchema = z.object({ + shopifyStoreId: z.string().nullable(), +}); + +// PATCH /api/shopify/integration/callback – update a shopify store id +export const PATCH = withWorkspace( + async ({ req, workspace, session }) => { + const body = await parseRequestBody(req); + const { shopifyStoreId } = updateWorkspaceSchema.parse(body); + + try { + const response = await prisma.project.update({ + where: { + id: workspace.id, + }, + data: { + shopifyStoreId, + }, + select: { + shopifyStoreId: true, + }, + }); + + waitUntil( + (async () => { + const installation = await prisma.installedIntegration.findUnique({ + where: { + userId_integrationId_projectId: { + userId: session.user.id, + projectId: workspace.id, + integrationId: SHOPIFY_INTEGRATION_ID, + }, + }, + select: { + id: true, + }, + }); + + // Install the integration if it doesn't exist + if (!installation) { + await installIntegration({ + userId: session.user.id, + workspaceId: workspace.id, + integrationId: SHOPIFY_INTEGRATION_ID, + credentials: { + shopifyStoreId, + }, + }); + } + + // Uninstall the integration if the shopify store id is null + if (installation && shopifyStoreId === null) { + await prisma.installedIntegration.delete({ + where: { + id: installation.id, + }, + }); + } + })(), + ); + + return NextResponse.json(response); + } catch (error) { + if (error.code === "P2002") { + throw new DubApiError({ + code: "conflict", + message: `The shopify store "${shopifyStoreId}" is already in use.`, + }); + } + + throw new DubApiError({ + code: "internal_server_error", + message: error.message, + }); + } + }, + { + requiredPermissions: ["workspaces.write"], + requiredAddOn: "conversion", + }, +); diff --git a/apps/web/app/api/shopify/integration/webhook/app-uninstalled.ts b/apps/web/app/api/shopify/integration/webhook/app-uninstalled.ts new file mode 100644 index 0000000000..156d23e6e5 --- /dev/null +++ b/apps/web/app/api/shopify/integration/webhook/app-uninstalled.ts @@ -0,0 +1,16 @@ +import { redis } from "@/lib/upstash"; +import { prisma } from "@dub/prisma"; + +export async function appUninstalled({ shopDomain }: { shopDomain: string }) { + await Promise.all([ + prisma.project.update({ + where: { + shopifyStoreId: shopDomain, + }, + data: { + shopifyStoreId: null, + }, + }), + redis.del(`shopify:shop:${shopDomain}`), + ]); +} diff --git a/apps/web/app/api/shopify/integration/webhook/customers-data-request.ts b/apps/web/app/api/shopify/integration/webhook/customers-data-request.ts new file mode 100644 index 0000000000..1cc1c20f9d --- /dev/null +++ b/apps/web/app/api/shopify/integration/webhook/customers-data-request.ts @@ -0,0 +1,34 @@ +import { waitUntil } from "@vercel/functions"; +import { sendEmail } from "emails"; +import { z } from "zod"; + +const schema = z.object({ + shop_domain: z.string(), + orders_requested: z.array(z.number()), + customer: z.object({ + id: z.number(), + email: z.string(), + phone: z.string(), + }), +}); + +export async function customersDataRequest({ event }: { event: any }) { + const { + customer, + shop_domain: shopDomain, + orders_requested: ordersRequested, + } = schema.parse(event); + + waitUntil( + sendEmail({ + email: "steven@dub.co", + from: "Steven Tey ", + subject: "[Shopify] - Customer Data Request received", + text: `Customer Data Request received for shop: ${shopDomain}. + Customer ID: ${customer.id}, + Email: ${customer.email}, + Phone: ${customer.phone}, + Orders Requested: ${ordersRequested.join(", ")}`, + }), + ); +} diff --git a/apps/web/app/api/shopify/integration/webhook/customers-redact.ts b/apps/web/app/api/shopify/integration/webhook/customers-redact.ts new file mode 100644 index 0000000000..c12ff0c7f9 --- /dev/null +++ b/apps/web/app/api/shopify/integration/webhook/customers-redact.ts @@ -0,0 +1,34 @@ +import { waitUntil } from "@vercel/functions"; +import { sendEmail } from "emails"; +import { z } from "zod"; + +const schema = z.object({ + shop_domain: z.string(), + orders_to_redact: z.array(z.number()), + customer: z.object({ + id: z.number(), + email: z.string(), + phone: z.string(), + }), +}); + +export async function customersRedact({ event }: { event: any }) { + const { + customer, + shop_domain: shopDomain, + orders_to_redact: ordersToRedact, + } = schema.parse(event); + + waitUntil( + sendEmail({ + email: "steven@dub.co", + from: "Steven Tey ", + subject: "[Shopify] - Customer Redacted request received", + text: `Customer Redacted request received for shop: ${shopDomain}. + Customer ID: ${customer.id}, + Email: ${customer.email}, + Phone: ${customer.phone}, + Orders to Redact: ${ordersToRedact.join(", ")}`, + }), + ); +} diff --git a/apps/web/app/api/shopify/integration/webhook/order-paid.ts b/apps/web/app/api/shopify/integration/webhook/order-paid.ts new file mode 100644 index 0000000000..7dc5e193bf --- /dev/null +++ b/apps/web/app/api/shopify/integration/webhook/order-paid.ts @@ -0,0 +1,73 @@ +import { qstash } from "@/lib/cron"; +import { processOrder } from "@/lib/integrations/shopify/process-order"; +import { orderSchema } from "@/lib/integrations/shopify/schema"; +import { redis } from "@/lib/upstash"; +import { prisma } from "@dub/prisma"; +import { APP_DOMAIN_WITH_NGROK } from "@dub/utils"; + +export async function orderPaid({ + event, + workspaceId, +}: { + event: any; + workspaceId: string; +}) { + const { + customer: { id: externalId }, + checkout_token: checkoutToken, + } = orderSchema.parse(event); + + const customer = await prisma.customer.findUnique({ + where: { + projectId_externalId: { + projectId: workspaceId, + externalId: externalId.toString(), + }, + }, + }); + + // customer is found, process the order right away + if (customer) { + await processOrder({ + event, + workspaceId, + customerId: customer.id, + }); + + return; + } + + // Check the cache to see the pixel event for this checkout token exist before publishing the event to the queue + const clickId = await redis.hget( + `shopify:checkout:${checkoutToken}`, + "clickId", + ); + + // clickId is found, process the order for the new customer + if (clickId) { + await processOrder({ + event, + workspaceId, + clickId, + }); + + await redis.del(`shopify:checkout:${checkoutToken}`); + + return; + } + + // Wait for the pixel event to come in so that we can decide if the order is from a Dub link or not + await redis.hset(`shopify:checkout:${checkoutToken}`, { + order: event, + }); + + await qstash.publishJSON({ + url: `${APP_DOMAIN_WITH_NGROK}/api/cron/shopify/order-paid`, + body: { + checkoutToken, + workspaceId, + }, + retries: 5, + delay: 3, + }); +} diff --git a/apps/web/app/api/shopify/integration/webhook/route.ts b/apps/web/app/api/shopify/integration/webhook/route.ts new file mode 100644 index 0000000000..d9918331af --- /dev/null +++ b/apps/web/app/api/shopify/integration/webhook/route.ts @@ -0,0 +1,105 @@ +import { prisma } from "@dub/prisma"; +import { log } from "@dub/utils"; +import crypto from "crypto"; +import { NextResponse } from "next/server"; +import { appUninstalled } from "./app-uninstalled"; +import { customersDataRequest } from "./customers-data-request"; +import { customersRedact } from "./customers-redact"; +import { orderPaid } from "./order-paid"; +import { shopRedact } from "./shop-redact"; + +const relevantTopics = new Set([ + "orders/paid", + + // Mandatory compliance webhooks + "app/uninstalled", + "customers/data_request", + "customers/redact", + "shop/redact", +]); + +// POST /api/shopify/integration/webhook – Listen to Shopify webhook events +export const POST = async (req: Request) => { + const data = await req.text(); + const headers = req.headers; + const topic = headers.get("x-shopify-topic") || ""; + const signature = headers.get("x-shopify-hmac-sha256") || ""; + + // Verify signature + const generatedSignature = crypto + .createHmac("sha256", `${process.env.SHOPIFY_WEBHOOK_SECRET}`) + .update(data, "utf8") + .digest("base64"); + + if (generatedSignature !== signature) { + return new Response(`[Shopify] Invalid signature. Skipping...`, { + status: 401, + }); + } + + // Check if topic is relevant + if (!relevantTopics.has(topic)) { + return new Response(`[Shopify] Unsupported topic: ${topic}. Skipping...`); + } + + const event = JSON.parse(data); + const shopDomain = headers.get("x-shopify-shop-domain") || ""; + + // Find workspace + const workspace = await prisma.project.findUnique({ + where: { + shopifyStoreId: shopDomain, + }, + select: { + id: true, + }, + }); + + if (!workspace) { + return new Response( + `[Shopify] Workspace not found for shop: ${shopDomain}. Skipping...`, + ); + } + + try { + switch (topic) { + case "orders/paid": + await orderPaid({ + event, + workspaceId: workspace.id, + }); + break; + case "customers/data_request": + await customersDataRequest({ + event, + }); + break; + case "customers/redact": + await customersRedact({ + event, + }); + break; + case "shop/redact": + await shopRedact({ + event, + }); + break; + case "app/uninstalled": + await appUninstalled({ + shopDomain, + }); + break; + } + } catch (error) { + await log({ + message: `Shopify webhook failed. Error: ${error.message}`, + type: "errors", + }); + + return new Response( + `[Shopify] Webhook error: "Webhook handler failed. View logs."`, + ); + } + + return NextResponse.json("OK"); +}; diff --git a/apps/web/app/api/shopify/integration/webhook/shop-redact.ts b/apps/web/app/api/shopify/integration/webhook/shop-redact.ts new file mode 100644 index 0000000000..26fb13141a --- /dev/null +++ b/apps/web/app/api/shopify/integration/webhook/shop-redact.ts @@ -0,0 +1,20 @@ +import { waitUntil } from "@vercel/functions"; +import { sendEmail } from "emails"; +import { z } from "zod"; + +const schema = z.object({ + shop_domain: z.string(), +}); + +export async function shopRedact({ event }: { event: any }) { + const { shop_domain: shopDomain } = schema.parse(event); + + waitUntil( + sendEmail({ + email: "steven@dub.co", + from: "Steven Tey ", + subject: "[Shopify] - Shop Redacted request received", + text: `Shop Redacted request received for shop: ${shopDomain}`, + }), + ); +} diff --git a/apps/web/app/api/track/shopify/route.ts b/apps/web/app/api/track/shopify/route.ts new file mode 100644 index 0000000000..b447233234 --- /dev/null +++ b/apps/web/app/api/track/shopify/route.ts @@ -0,0 +1,59 @@ +import { DubApiError, handleAndReturnErrorResponse } from "@/lib/api/errors"; +import { parseRequestBody } from "@/lib/api/utils"; +import { getClickEvent } from "@/lib/tinybird"; +import { redis } from "@/lib/upstash"; +import { waitUntil } from "@vercel/functions"; +import { NextResponse } from "next/server"; + +export const runtime = "edge"; + +const CORS_HEADERS = { + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "POST, OPTIONS", + "Access-Control-Allow-Headers": "Content-Type, Authorization", +}; + +// TODO: +// Add rate limiting +// Finalize the endpoint (Maybe move to /api/shopify/pixel) + +// POST /api/track/shopify – Handle the Shopify Pixel events +export const POST = async (req: Request) => { + try { + const { clickId, checkoutToken } = await parseRequestBody(req); + + if (!clickId || !checkoutToken) { + throw new DubApiError({ + code: "bad_request", + message: "Missing clickId or checkoutToken", + }); + } + + const clickEvent = await getClickEvent({ clickId }); + + if (!clickEvent || clickEvent.data.length === 0) { + return new Response( + `[Shopify] Click event not found for clickId: ${clickId}`, + ); + } + + waitUntil( + redis.hset(`shopify:checkout:${checkoutToken}`, { + clickId, + }), + ); + + return NextResponse.json("OK", { + headers: CORS_HEADERS, + }); + } catch (error) { + return handleAndReturnErrorResponse(error, CORS_HEADERS); + } +}; + +export const OPTIONS = () => { + return new Response(null, { + status: 204, + headers: CORS_HEADERS, + }); +}; diff --git a/apps/web/lib/integrations/shopify/process-order.ts b/apps/web/lib/integrations/shopify/process-order.ts new file mode 100644 index 0000000000..01154bc84c --- /dev/null +++ b/apps/web/lib/integrations/shopify/process-order.ts @@ -0,0 +1,286 @@ +import { handleAndReturnErrorResponse } from "@/lib/api/errors"; +import { notifyPartnerSale } from "@/lib/api/partners/notify-partner-sale"; +import { createSaleData } from "@/lib/api/sales/create-sale-data"; +import { createId } from "@/lib/api/utils"; +import { generateRandomName } from "@/lib/names"; +import { + getClickEvent, + getLeadEvent, + recordLead, + recordSale, +} from "@/lib/tinybird"; +import z from "@/lib/zod"; +import { leadEventSchemaTB } from "@/lib/zod/schemas/leads"; +import { prisma } from "@dub/prisma"; +import { nanoid } from "@dub/utils"; +import { orderSchema } from "./schema"; + +async function createLead({ + clickId, + externalId, + workspaceId, +}: { + clickId: string; + externalId: string; + workspaceId: string; +}) { + // find click + const clickEvent = await getClickEvent({ clickId }); + + const clickData = clickEvent.data[0]; + const { link_id: linkId, country, timestamp } = clickData; + + // create customer + const customer = await prisma.customer.create({ + data: { + id: createId({ prefix: "cus_" }), + name: generateRandomName(), + externalId, + projectId: workspaceId, + clickedAt: new Date(timestamp + "Z"), + clickId, + linkId, + country, + }, + }); + + const leadData = leadEventSchemaTB.parse({ + ...clickData, + event_id: nanoid(16), + event_name: "Account created", + customer_id: customer.id, + }); + + await Promise.all([ + // record lead + recordLead(leadData), + + // update link leads count + prisma.link.update({ + where: { + id: linkId, + }, + data: { + leads: { + increment: 1, + }, + }, + }), + + // update workspace usage + prisma.project.update({ + where: { + id: workspaceId, + }, + data: { + usage: { + increment: 1, + }, + }, + }), + ]); + + return leadData; +} + +async function createSale({ + order, + customerId, + workspaceId, + leadData, +}: { + order: any; + customerId: string; + workspaceId: string; + leadData: z.infer; +}) { + const orderData = orderSchema.parse(order); + const eventId = nanoid(16); + const paymentProcessor = "shopify"; + + const amount = Number(orderData.current_subtotal_price) * 100; + const currency = orderData.currency; + const invoiceId = orderData.confirmation_number; + + const { link_id: linkId, click_id: clickId } = leadData; + + const sale = await prisma.sale.findFirst({ + where: { + invoiceId, + clickId, + }, + }); + + if (sale) { + return new Response( + `[Shopify] Order has been processed already. Skipping...`, + ); + } + + const saleData = { + ...leadData, + event_id: eventId, + event_name: "Purchase", + payment_processor: paymentProcessor, + amount, + currency, + invoice_id: invoiceId, + metadata: JSON.stringify(order), + }; + + const [_sale, link, _project] = await Promise.all([ + // record sale + recordSale(saleData), + + // update link sales count + prisma.link.update({ + where: { + id: linkId, + }, + data: { + sales: { + increment: 1, + }, + saleAmount: { + increment: amount, + }, + }, + }), + + // update workspace sales usage + prisma.project.update({ + where: { + id: workspaceId, + }, + data: { + usage: { + increment: 1, + }, + salesUsage: { + increment: amount, + }, + }, + }), + ]); + + // for program links + if (link.programId) { + const { program, partnerId, commissionAmount } = + await prisma.programEnrollment.findUniqueOrThrow({ + where: { + linkId, + }, + select: { + program: true, + partnerId: true, + commissionAmount: true, + }, + }); + + const saleRecord = createSaleData({ + program, + partner: { + id: partnerId, + commissionAmount, + }, + customer: { + id: customerId, + linkId, + clickId, + }, + sale: { + amount, + currency, + invoiceId, + eventId, + paymentProcessor, + }, + metadata: { + ...order, + }, + }); + + await Promise.allSettled([ + prisma.sale.create({ + data: saleRecord, + }), + + notifyPartnerSale({ + partner: { + id: partnerId, + referralLink: link.shortLink, + }, + program, + sale: { + amount: saleRecord.amount, + earnings: saleRecord.earnings, + }, + }), + ]); + } +} + +// Process the order from Shopify webhook +export async function processOrder({ + event, + workspaceId, + customerId, + clickId, +}: { + event: unknown; + workspaceId: string; + customerId?: string; // ID of the customer in Dub + clickId?: string; // ID of the click event from Shopify pixel +}) { + try { + const order = orderSchema.parse(event); + + // for existing customer + if (customerId) { + const leadEvent = await getLeadEvent({ customerId }); + + if (!leadEvent || leadEvent.data.length === 0) { + return new Response( + `[Shopify] Lead event with customer ID ${customerId} not found, skipping...`, + ); + } + + const leadData = leadEvent.data[0]; + + await createSale({ + leadData, + order, + workspaceId, + customerId, + }); + + return; + } + + // for new customer + if (clickId) { + const { + customer: { id: externalId }, + } = orderSchema.parse(event); + + const leadData = await createLead({ + clickId, + workspaceId, + externalId: externalId.toString(), + }); + + const { customer_id: customerId } = leadData; + + await createSale({ + leadData, + order, + workspaceId, + customerId, + }); + } + + return new Response("[Shopify] Order event processed successfully."); + } catch (error) { + return handleAndReturnErrorResponse(error); + } +} diff --git a/apps/web/lib/integrations/shopify/schema.ts b/apps/web/lib/integrations/shopify/schema.ts new file mode 100644 index 0000000000..2db2e3918b --- /dev/null +++ b/apps/web/lib/integrations/shopify/schema.ts @@ -0,0 +1,11 @@ +import { z } from "zod"; + +export const orderSchema = z.object({ + current_subtotal_price: z.string(), + currency: z.string(), + confirmation_number: z.string(), + checkout_token: z.string(), + customer: z.object({ + id: z.number(), + }), +}); diff --git a/packages/prisma/schema/workspace.prisma b/packages/prisma/schema/workspace.prisma index de9d925fe5..cca5031877 100644 --- a/packages/prisma/schema/workspace.prisma +++ b/packages/prisma/schema/workspace.prisma @@ -11,6 +11,7 @@ model Project { paymentFailedAt DateTime? stripeConnectId String? @unique // for Stripe Integration payoutMethodId String? // for Stripe Connect payouts + shopifyStoreId String? @unique // for Shopify Integration usage Int @default(0) usageLimit Int @default(1000) diff --git a/packages/utils/src/constants/integrations.ts b/packages/utils/src/constants/integrations.ts index b09dabc288..67de874ace 100644 --- a/packages/utils/src/constants/integrations.ts +++ b/packages/utils/src/constants/integrations.ts @@ -1,2 +1,3 @@ export const SLACK_INTEGRATION_ID = "clzu59rx9000110bm5fnlzwuj"; export const STRIPE_INTEGRATION_ID = "clzra1ya60001wnj4a89zcg9h"; +export const SHOPIFY_INTEGRATION_ID = "int_iWOtrZgmcyU6XDwKr4AYYqLN";