diff --git a/examples/nextjs-webhook-stripe/app/api/workflow/onboarding/route.ts b/examples/nextjs-webhook-stripe/app/api/workflow/onboarding/route.ts index cf67099..36898b9 100644 --- a/examples/nextjs-webhook-stripe/app/api/workflow/onboarding/route.ts +++ b/examples/nextjs-webhook-stripe/app/api/workflow/onboarding/route.ts @@ -8,180 +8,180 @@ import { Webhook } from 'svix'; const stripe = new Stripe(process.env.STRIPE_SECRET_KEY ?? ""); const resend = new Resend(process.env.RESEND_API_KEY ?? "") export type OnboardingPayload = { - event: string; - clerkUserId: string; - email: string; - firstName: string; - lastName: string; + event: string; + clerkUserId: string; + email: string; + firstName: string; + lastName: string; } async function validateRequest(payloadString: string, headerPayload: Headers) { - const svixHeaders = { - "svix-id": headerPayload.get("svix-id") as string, - "svix-timestamp": headerPayload.get("svix-timestamp") as string, - "svix-signature": headerPayload.get("svix-signature") as string, - } - const wh = new Webhook(process.env.CLERK_WEBHOOK_SECRET ?? ""); - return wh.verify(payloadString, svixHeaders) as WebhookEvent; + const svixHeaders = { + "svix-id": headerPayload.get("svix-id") as string, + "svix-timestamp": headerPayload.get("svix-timestamp") as string, + "svix-signature": headerPayload.get("svix-signature") as string, + } + const wh = new Webhook(process.env.CLERK_WEBHOOK_SECRET ?? ""); + return wh.verify(payloadString, svixHeaders) as WebhookEvent; } export const { POST } = serve(async (context) => { - const payloadString = context.requestPayload; - const headerPayload = context.headers; - - let event: WebhookEvent; - try { - event = await validateRequest(payloadString, headerPayload); - } catch { - return - } - - const user = await context.run("handle-clerk-webhook-event", async () => { - if (event.type === "user.created") { - const { id: clerkUserId, email_addresses, first_name, last_name } = event.data; - const primaryEmail = email_addresses.find(email => email.id === event.data.primary_email_address_id) - - if (!primaryEmail) { - return false - } - - return { - event: event.type, - clerkUserId: clerkUserId, - email: primaryEmail.email_address, - firstName: first_name, - lastName: last_name, - } as OnboardingPayload - } - return false - }) - - if (!user) { - return - } - - const customer = await context.run("create-stripe-customer", async () => { - return await stripe.customers.create({ - email: user.email, - name: `${user.firstName} ${user.lastName}`, - metadata: { - clerkUserId: user.clerkUserId - } - }) - }) - - await context.run("send-welcome-email", async () => { - console.log("Sending welcome email to:", user.email) - - await resend.emails.send({ - from: 'welcome@yourdomain.com', - to: user.email, - subject: 'Welcome to Your Trial!', - html: ` -

Welcome ${user.firstName || 'there'}!

-

Thanks for signing up! Your trial starts now.

-

You have 7 days to explore all our premium features.

-

What you get with your trial:

- -

Get started now: Visit Dashboard

- ` - }); - - }) - - const subscription = await context.run("create-trial", async () => { - return await stripe.subscriptions.create({ - customer: customer.id, - items: [{ price: "price_1QQQWaCKnqweyLP9MPbARyG" }], - trial_period_days: 7, - metadata: { - clerkUserId: user.clerkUserId, - workflowRunId: context.workflowRunId - } - }) - }) - - await context.run("store-subscription", async () => { - console.log(subscription) - }) - - - /** - * This is where we start waiting for the payment method to be added to the subscription. - * If the payment method is added within 7 days, workflow on the `api/stripe/route` will notify this workflow with `payment_method_`. - * If the payment method is not added within 7 days, we will handle the trial end. - */ - const { timeout } = await context.waitForEvent("await-payment-method", `payment_method_${subscription.id}`, { - timeout: "7d" - }) - - - if (!timeout) { - await context.run("send-subscription-start-welcome-mail", async () => { - console.log("Sending subscription started email to:", user.email) - - await resend.emails.send({ - from: 'billing@yourdomain.com', - to: user.email, - subject: 'Payment Method Added Successfully!', - html: ` -

Thank you for adding your payment method!

-

Your subscription will continue automatically after the trial period.

-

Your trial benefits:

-
    -
  • Unlimited access to all features
  • -
  • Priority support
  • -
  • No interruption in service
  • -
-

Need help? Reply to this email or visit our support center.

- ` - }); - }) - - } else { - await context.run("handle-trial-end", async () => { - await stripe.subscriptions.update(subscription.id, { - cancel_at_period_end: true - }) - - return { status: 'trial_ended' } - }) - - - await context.run("send-trial-ending-mail", async () => { - console.log("Sending trial ending email to:", user.email) - - await resend.emails.send({ - from: 'billing@yourdomain.com', - to: user.email, - subject: 'Your Trial is Ending Soon', - html: ` -

Don't Lose Access!

-

Your trial is coming to an end. Add a payment method to keep your access:

-
    -
  • Keep all your data and settings
  • -
  • Continue using premium features
  • -
  • No interruption in service
  • -
- Add Payment Method -

Questions? Contact our support team!

- ` - }); - - }) - } + const payloadString = context.requestPayload; + const headerPayload = context.headers; + + let event: WebhookEvent; + try { + event = await validateRequest(payloadString, headerPayload); + } catch { + return + } + + const user = await context.run("handle-clerk-webhook-event", async () => { + if (event.type === "user.created") { + const { id: clerkUserId, email_addresses, first_name, last_name } = event.data; + const primaryEmail = email_addresses.find(email => email.id === event.data.primary_email_address_id) + + if (!primaryEmail) { + return false + } + + return { + event: event.type, + clerkUserId: clerkUserId, + email: primaryEmail.email_address, + firstName: first_name, + lastName: last_name, + } as OnboardingPayload + } + return false + }) + + if (!user) { + return + } + + const customer = await context.run("create-stripe-customer", async () => { + return await stripe.customers.create({ + email: user.email, + name: `${user.firstName} ${user.lastName}`, + metadata: { + clerkUserId: user.clerkUserId + } + }) + }) + + await context.run("send-welcome-email", async () => { + console.log("Sending welcome email to:", user.email) + + await resend.emails.send({ + from: 'welcome@yourdomain.com', + to: user.email, + subject: 'Welcome to Your Trial!', + html: ` +

Welcome ${user.firstName || 'there'}!

+

Thanks for signing up! Your trial starts now.

+

You have 7 days to explore all our premium features.

+

What you get with your trial:

+
    +
  • Feature 1
  • +
  • Feature 2
  • +
  • Feature 3
  • +
+

Get started now: Visit Dashboard

+ ` + }); + + }) + + const subscription = await context.run("create-trial", async () => { + return await stripe.subscriptions.create({ + customer: customer.id, + items: [{ price: "price_1QQQWaCKnqweyLP9MPbARyG" }], + trial_period_days: 7, + metadata: { + clerkUserId: user.clerkUserId, + workflowRunId: context.workflowRunId + } + }) + }) + + await context.run("store-subscription", async () => { + console.log(subscription) + }) + + + /** + * This is where we start waiting for the payment method to be added to the subscription. + * If the payment method is added within 7 days, workflow on the `api/stripe/route` will notify this workflow with `payment_method_`. + * If the payment method is not added within 7 days, we will handle the trial end. + */ + const { timeout } = await context.waitForEvent("await-payment-method", `payment_method_${subscription.id}`, { + timeout: "7d" + }) + + + if (!timeout) { + await context.run("send-subscription-start-welcome-mail", async () => { + console.log("Sending subscription started email to:", user.email) + + await resend.emails.send({ + from: 'billing@yourdomain.com', + to: user.email, + subject: 'Payment Method Added Successfully!', + html: ` +

Thank you for adding your payment method!

+

Your subscription will continue automatically after the trial period.

+

Your trial benefits:

+
    +
  • Unlimited access to all features
  • +
  • Priority support
  • +
  • No interruption in service
  • +
+

Need help? Reply to this email or visit our support center.

+ ` + }); + }) + + } else { + await context.run("handle-trial-end", async () => { + await stripe.subscriptions.update(subscription.id, { + cancel_at_period_end: true + }) + + return { status: 'trial_ended' } + }) + + + await context.run("send-trial-ending-mail", async () => { + console.log("Sending trial ending email to:", user.email) + + await resend.emails.send({ + from: 'billing@yourdomain.com', + to: user.email, + subject: 'Your Trial is Ending Soon', + html: ` +

Don't Lose Access!

+

Your trial is coming to an end. Add a payment method to keep your access:

+
    +
  • Keep all your data and settings
  • +
  • Continue using premium features
  • +
  • No interruption in service
  • +
+ Add Payment Method +

Questions? Contact our support team!

+ ` + }); + + }) + } }, { baseUrl: "", initialPayloadParser: (payload) => { return payload } }) \ No newline at end of file diff --git a/examples/nextjs-webhook-stripe/app/api/workflow/stripe/route.ts b/examples/nextjs-webhook-stripe/app/api/workflow/stripe/route.ts index 3223679..fc49e51 100644 --- a/examples/nextjs-webhook-stripe/app/api/workflow/stripe/route.ts +++ b/examples/nextjs-webhook-stripe/app/api/workflow/stripe/route.ts @@ -5,50 +5,50 @@ import { Client } from "@upstash/workflow" const stripe = new Stripe(process.env.STRIPE_SECRET_KEY!); const wc = new Client({ - token: process.env.QSTASH_TOKEN ?? "" + token: process.env.QSTASH_TOKEN ?? "" }); export async function POST(request: Request) { - const body = await request.text(); - const headerList = await headers(); - const signature = headerList.get("stripe-signature") as string; - - try { - const event = stripe.webhooks.constructEvent(body, signature, process.env.STRIPE_WEBHOOK_SECRET ?? ""); - - if (event.type === "payment_method.attached") { - const paymentMethod = event.data.object; - const customer = await stripe.customers.retrieve(paymentMethod.customer as string); - - const subscriptions = await stripe.subscriptions.list({ - customer: paymentMethod.customer as string, - status: "trialing" - }) - - const trialSubscription = subscriptions.data[0]; - - if (trialSubscription) { - /** - * This is where we notify the Workflow on the `api/workflow/onboarding` endpoint when a payment method is attached to a customer. - * Whether this event is notified within 7 days(arbitrary, customizable timeout), can be handled in onboarding workflow. - */ - await wc.notify({ - eventId: `payment_method_${trialSubscription.id}`, eventData: { - customerId: customer.id, - paymentMethodId: paymentMethod.id, - addedAt: new Date().toISOString() - } - }) - } - - } - - return Response.json({ received: true }); - } catch (error) { - console.error('Stripe webhook error:', error); - return Response.json( - { error: 'Webhook error occurred' }, - { status: 400 } - ); - } + const body = await request.text(); + const headerList = await headers(); + const signature = headerList.get("stripe-signature") as string; + + try { + const event = stripe.webhooks.constructEvent(body, signature, process.env.STRIPE_WEBHOOK_SECRET ?? ""); + + if (event.type === "payment_method.attached") { + const paymentMethod = event.data.object; + const customer = await stripe.customers.retrieve(paymentMethod.customer as string); + + const subscriptions = await stripe.subscriptions.list({ + customer: paymentMethod.customer as string, + status: "trialing" + }) + + const trialSubscription = subscriptions.data[0]; + + if (trialSubscription) { + /** + * This is where we notify the Workflow on the `api/workflow/onboarding` endpoint when a payment method is attached to a customer. + * Whether this event is notified within 7 days(arbitrary, customizable timeout), can be handled in onboarding workflow. + */ + await wc.notify({ + eventId: `payment_method_${trialSubscription.id}`, eventData: { + customerId: customer.id, + paymentMethodId: paymentMethod.id, + addedAt: new Date().toISOString() + } + }) + } + + } + + return Response.json({ received: true }); + } catch (error) { + console.error('Stripe webhook error:', error); + return Response.json( + { error: 'Webhook error occurred' }, + { status: 400 } + ); + } } \ No newline at end of file diff --git a/examples/nextjs/app/vercel-ai-sdk/route.ts b/examples/nextjs/app/vercel-ai-sdk/route.ts new file mode 100644 index 0000000..5371319 --- /dev/null +++ b/examples/nextjs/app/vercel-ai-sdk/route.ts @@ -0,0 +1,115 @@ +import { createOpenAI } from '@ai-sdk/openai'; +import { WorkflowContext } from '@upstash/workflow'; +import { QStashWorkflowAbort } from '@upstash/qstash'; +import { HTTPMethods } from '@upstash/qstash'; +import { generateText, tool, ToolExecutionError } from 'ai'; +import { z } from 'zod'; +import { serve } from "@upstash/workflow/nextjs"; + +const createWorkflowOpenAI = (context: WorkflowContext) => { + return createOpenAI({ + apiKey: process.env.OPENAI_API_KEY ?? "", + compatibility: "strict", + fetch: async (input, init) => { + try { + // Prepare headers from init.headers + const headers = init?.headers + ? Object.fromEntries(new Headers(init.headers).entries()) + : {}; + + // Prepare body from init.body + const body = init?.body ? JSON.parse(init.body as string) : undefined; + + // Call the workflow context + const responseInfo = await context.call("call step", { + url: input.toString(), + method: init?.method as HTTPMethods, + headers, + body, + }); + + // Construct headers for the response + const responseHeaders = new Headers( + Object.entries(responseInfo.header).reduce((acc, [key, values]) => { + acc[key] = values.join(", "); + return acc; + }, {} as Record) + ); + + // Return the constructed response + return new Response(JSON.stringify(responseInfo.body), { + status: responseInfo.status, + headers: responseHeaders, + }); + } catch (error) { + if (error instanceof QStashWorkflowAbort) { + throw error + } else { + console.error("Error in fetch implementation:", error); + throw error; // Rethrow error for further handling + } + } + }, + }); +}; + +export const { POST } = serve<{ prompt: string }>(async (context) => { + + const openai = createWorkflowOpenAI(context) + + const prompt = await context.run("get prompt", async () => { + return context.requestPayload.prompt + }) + + try { + const result = await generateText({ + model: openai('gpt-3.5-turbo'), + + maxTokens: 2048, + tools: { + weather: tool({ + description: 'Get the weather in a location', + parameters: z.object({ + latitude: z.number(), + longitude: z.number(), + }), + execute: async ({ latitude, longitude }) => context.call("weather tool", { + url: `https://api.open-meteo.com/v1/forecast?latitude=${latitude}&longitude=${longitude}¤t=temperature_2m&hourly=temperature_2m&daily=sunrise,sunset&timezone=auto`, + method: 'GET', + }) + }), + cityAttractions: tool({ + description: 'Get tourist attractions in a city', + parameters: z.object({ + city: z.string().describe('The city to get attractions for') + }), + execute: async ({ city }) => context.call("attractions tool", { + url: 'https://places.googleapis.com/v1/places:searchText', + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Goog-Api-Key': process.env.GOOGLE_PLACES_API_KEY ?? "", + 'X-Goog-FieldMask': 'places.displayName,places.formattedAddress' + }, + body: { + textQuery: `tourist attractions in ${city}` + } + }) + }), + }, + prompt, + maxSteps: 6 + }); + await context.run("text", () => { + console.log(`TEXT: ${result.text}`); + return result.text + }) + + } catch (error) { + if (error instanceof ToolExecutionError && error.cause instanceof QStashWorkflowAbort) { + throw error.cause + } else { + throw error + } + } +}) \ No newline at end of file diff --git a/examples/nextjs/package.json b/examples/nextjs/package.json index 39e8538..a28c521 100644 --- a/examples/nextjs/package.json +++ b/examples/nextjs/package.json @@ -9,8 +9,10 @@ "lint": "next lint" }, "dependencies": { + "@ai-sdk/openai": "^1.0.8", "@upstash/qstash": "^2.7.12", "@upstash/workflow": "latest", + "ai": "^4.0.16", "clsx": "^2.1.1", "next": "14.2.4", "react": "^18.3.1",