From 5addd9f0234349fb68e26b1b7684a1da542b1278 Mon Sep 17 00:00:00 2001
From: CahidArda <cahidardaooz@hotmail.com>
Date: Mon, 23 Dec 2024 16:19:46 +0300
Subject: [PATCH 1/4] fix: pass all params to context in failureFunction

---
 src/serve/index.ts          |  5 ++++-
 src/workflow-parser.test.ts | 32 ++++++++++++++++++++++++++------
 src/workflow-parser.ts      |  6 +++++-
 3 files changed, 35 insertions(+), 8 deletions(-)

diff --git a/src/serve/index.ts b/src/serve/index.ts
index af8473f..a495eba 100644
--- a/src/serve/index.ts
+++ b/src/serve/index.ts
@@ -107,7 +107,10 @@ export const serveBase = <
       qstashClient,
       initialPayloadParser,
       routeFunction,
-      failureFunction
+      failureFunction,
+      env,
+      retries,
+      debug
     );
     if (failureCheck.isErr()) {
       // unexpected error during handleFailure
diff --git a/src/workflow-parser.test.ts b/src/workflow-parser.test.ts
index 5206d01..bd4a8b1 100644
--- a/src/workflow-parser.test.ts
+++ b/src/workflow-parser.test.ts
@@ -753,7 +753,16 @@ describe("Workflow Parser", () => {
       };
 
       // no failureFunction
-      const result1 = await handleFailure(request, "", client, initialPayloadParser, routeFunction);
+      const result1 = await handleFailure(
+        request,
+        "",
+        client,
+        initialPayloadParser,
+        routeFunction,
+        undefined,
+        {},
+        3
+      );
       expect(result1.isOk()).toBeTrue();
       expect(result1.isOk() && result1.value === "not-failure-callback").toBeTrue();
 
@@ -764,7 +773,9 @@ describe("Workflow Parser", () => {
         client,
         initialPayloadParser,
         routeFunction,
-        failureFunction
+        failureFunction,
+        {},
+        0
       );
       expect(result2.isOk()).toBeTrue();
       expect(result2.isOk() && result2.value === "not-failure-callback").toBeTrue();
@@ -789,7 +800,10 @@ describe("Workflow Parser", () => {
         "",
         client,
         initialPayloadParser,
-        routeFunction
+        routeFunction,
+        undefined,
+        {},
+        0
       );
       expect(result.isErr()).toBeTrue();
       expect(result.isErr() && result.error.name).toBe(WorkflowError.name);
@@ -817,7 +831,9 @@ describe("Workflow Parser", () => {
         client,
         initialPayloadParser,
         routeFunction,
-        failureFunction
+        failureFunction,
+        {},
+        3
       );
       expect(result.isErr()).toBeTrue();
       expect(result.isErr() && result.error.message).toBe("my-error");
@@ -847,7 +863,9 @@ describe("Workflow Parser", () => {
         client,
         initialPayloadParser,
         routeFunction,
-        failureFunction
+        failureFunction,
+        {},
+        0
       );
       expect(result.isOk()).toBeTrue();
       expect(result.isOk() && result.value).toBe("is-failure-callback");
@@ -867,7 +885,9 @@ describe("Workflow Parser", () => {
         client,
         initialPayloadParser,
         routeFunctionWithoutSteps,
-        failureFunction
+        failureFunction,
+        {},
+        3
       );
 
       expect(result.isErr());
diff --git a/src/workflow-parser.ts b/src/workflow-parser.ts
index 329de5a..9596ff0 100644
--- a/src/workflow-parser.ts
+++ b/src/workflow-parser.ts
@@ -304,7 +304,9 @@ export const handleFailure = async <TInitialPayload>(
     WorkflowServeOptions<Response, TInitialPayload>
   >["initialPayloadParser"],
   routeFunction: RouteFunction<TInitialPayload>,
-  failureFunction?: WorkflowServeOptions<Response, TInitialPayload>["failureFunction"],
+  failureFunction: WorkflowServeOptions<Response, TInitialPayload>["failureFunction"],
+  env: WorkflowServeOptions["env"],
+  retries: WorkflowServeOptions["retries"],
   debug?: WorkflowLogger
 ): Promise<Ok<"is-failure-callback" | "not-failure-callback", never> | Err<never, Error>> => {
   if (request.headers.get(WORKFLOW_FAILURE_HEADER) !== "true") {
@@ -350,6 +352,8 @@ export const handleFailure = async <TInitialPayload>(
       url: url,
       failureUrl: url,
       debug,
+      env,
+      retries,
     });
 
     // attempt running routeFunction until the first step

From 4c4ec7bab5d061c5dae83d97518046ad9414997e Mon Sep 17 00:00:00 2001
From: CahidArda <cahidardaooz@hotmail.com>
Date: Mon, 23 Dec 2024 21:14:06 +0300
Subject: [PATCH 2/4] feat: add workflow telemetry

---
 platforms/astro.ts            |   8 +-
 platforms/cloudflare.ts       |  16 +++-
 platforms/express.ts          |   8 +-
 platforms/h3.ts               |  11 ++-
 platforms/hono.ts             |  21 +++--
 platforms/nextjs.ts           |  16 +++-
 platforms/solidjs.ts          |  11 ++-
 platforms/svelte.ts           |  16 +++-
 src/client/index.ts           |   8 +-
 src/constants.ts              |  14 +++
 src/context/auto-executor.ts  |  59 +++++++-----
 src/context/context.ts        |   5 +-
 src/serve/index.ts            |  29 ++++--
 src/serve/serve.test.ts       | 123 ++++++++++++++++++++++++-
 src/test-utils.ts             |  10 +-
 src/types.ts                  |  82 +++++++++++++++++
 src/workflow-parser.ts        |   1 +
 src/workflow-requests.test.ts | 167 +++++++++++++++++++++-------------
 src/workflow-requests.ts      | 127 ++++++++++++++++----------
 19 files changed, 565 insertions(+), 167 deletions(-)

diff --git a/platforms/astro.ts b/platforms/astro.ts
index 72f6bcb..871fbb8 100644
--- a/platforms/astro.ts
+++ b/platforms/astro.ts
@@ -1,7 +1,8 @@
 import type { APIContext, APIRoute } from "astro";
 
-import type { PublicServeOptions, WorkflowContext } from "../src";
+import { PublicServeOptions, WorkflowContext } from "../src";
 import { serveBase } from "../src/serve";
+import { SDK_TELEMETRY } from "../src/constants";
 
 export function serve<TInitialPayload = unknown>(
   routeFunction: (
@@ -13,6 +14,11 @@ export function serve<TInitialPayload = unknown>(
   const POST: APIRoute = (apiContext) => {
     const { handler } = serveBase<TInitialPayload>(
       (workflowContext) => routeFunction(workflowContext, apiContext),
+      {
+        sdk: SDK_TELEMETRY,
+        platform: "astro",
+        runtime: `node@${process.version}`,
+      },
       options
     );
 
diff --git a/platforms/cloudflare.ts b/platforms/cloudflare.ts
index e5c9f5d..2bda636 100644
--- a/platforms/cloudflare.ts
+++ b/platforms/cloudflare.ts
@@ -1,4 +1,5 @@
 import type { PublicServeOptions, RouteFunction } from "../src";
+import { SDK_TELEMETRY } from "../src/constants";
 import { serveBase } from "../src/serve";
 
 export type WorkflowBindings = {
@@ -62,10 +63,17 @@ export const serve = <TInitialPayload = unknown>(
 ): { fetch: (...args: PagesHandlerArgs | WorkersHandlerArgs) => Promise<Response> } => {
   const fetch = async (...args: PagesHandlerArgs | WorkersHandlerArgs) => {
     const { request, env } = getArgs(args);
-    const { handler: serveHandler } = serveBase(routeFunction, {
-      env,
-      ...options,
-    });
+    const { handler: serveHandler } = serveBase(
+      routeFunction,
+      {
+        sdk: SDK_TELEMETRY,
+        platform: "cloudflare",
+      },
+      {
+        env,
+        ...options,
+      }
+    );
     return await serveHandler(request);
   };
   return { fetch };
diff --git a/platforms/express.ts b/platforms/express.ts
index 518bc14..c6ada83 100644
--- a/platforms/express.ts
+++ b/platforms/express.ts
@@ -1,4 +1,5 @@
 import type { WorkflowServeOptions, RouteFunction } from "../src";
+import { SDK_TELEMETRY } from "../src/constants";
 import { serveBase } from "../src/serve";
 import {
   Request as ExpressRequest,
@@ -43,7 +44,12 @@ export function serve<TInitialPayload = unknown>(
 
     // create handler
     const { handler: serveHandler } = serveBase<TInitialPayload>(
-      (workflowContext) => routeFunction(workflowContext),
+      routeFunction,
+      {
+        sdk: SDK_TELEMETRY,
+        platform: "express",
+        runtime: `node@${process.version}`,
+      },
       {
         ...options,
         useJSONContent: true,
diff --git a/platforms/h3.ts b/platforms/h3.ts
index f647d09..54e0dd4 100644
--- a/platforms/h3.ts
+++ b/platforms/h3.ts
@@ -3,6 +3,7 @@ import { defineEventHandler, readRawBody } from "h3";
 import type { PublicServeOptions, RouteFunction } from "../src";
 import { serveBase } from "../src/serve";
 import type { IncomingHttpHeaders } from "node:http";
+import { SDK_TELEMETRY } from "../src/constants";
 
 function transformHeaders(headers: IncomingHttpHeaders): [string, string][] {
   const formattedHeaders = Object.entries(headers).map(([key, value]) => [
@@ -37,7 +38,15 @@ export const serve = <TInitialPayload = unknown>(
       method: "POST",
     });
 
-    const { handler: serveHandler } = serveBase<TInitialPayload>(routeFunction, options);
+    const { handler: serveHandler } = serveBase<TInitialPayload>(
+      routeFunction,
+      {
+        sdk: SDK_TELEMETRY,
+        platform: "h3",
+        runtime: `node@${process.version}`,
+      },
+      options
+    );
     return await serveHandler(request);
   });
 
diff --git a/platforms/hono.ts b/platforms/hono.ts
index e9d5413..cb7921e 100644
--- a/platforms/hono.ts
+++ b/platforms/hono.ts
@@ -2,6 +2,7 @@ import type { Context } from "hono";
 import type { PublicServeOptions, RouteFunction } from "../src";
 import { serveBase } from "../src/serve";
 import { Variables } from "hono/types";
+import { SDK_TELEMETRY } from "../src/constants";
 
 export type WorkflowBindings = {
   QSTASH_TOKEN: string;
@@ -32,12 +33,20 @@ export const serve = <
     const environment = context.env;
     const request = context.req.raw;
 
-    const { handler: serveHandler } = serveBase(routeFunction, {
-      // when hono is used without cf workers, it sends a DebugHTTPServer
-      // object in `context.env`. don't pass env if this is the case:
-      env: "QSTASH_TOKEN" in environment ? environment : undefined,
-      ...options,
-    });
+    const { handler: serveHandler } = serveBase(
+      routeFunction,
+      {
+        sdk: SDK_TELEMETRY,
+        platform: "hono",
+        runtime: `node@${process.version}`,
+      },
+      {
+        // when hono is used without cf workers, it sends a DebugHTTPServer
+        // object in `context.env`. don't pass env if this is the case:
+        env: "QSTASH_TOKEN" in environment ? environment : undefined,
+        ...options,
+      }
+    );
     return await serveHandler(request);
   };
   return handler;
diff --git a/platforms/nextjs.ts b/platforms/nextjs.ts
index 362826b..d3b3e24 100644
--- a/platforms/nextjs.ts
+++ b/platforms/nextjs.ts
@@ -4,6 +4,7 @@ import type { NextApiHandler, NextApiRequest, NextApiResponse } from "next";
 
 import type { RouteFunction, PublicServeOptions } from "../src";
 import { serveBase } from "../src/serve";
+import { SDK_TELEMETRY } from "../src/constants";
 
 /**
  * Serve method to serve a Upstash Workflow in a Nextjs project
@@ -20,6 +21,11 @@ export const serve = <TInitialPayload = unknown>(
 ): { POST: (request: Request) => Promise<Response> } => {
   const { handler: serveHandler } = serveBase<TInitialPayload, Request, Response>(
     routeFunction,
+    {
+      sdk: SDK_TELEMETRY,
+      platform: "nextjs",
+      runtime: `node@${process.version}`,
+    },
     options
   );
 
@@ -34,7 +40,15 @@ export const servePagesRouter = <TInitialPayload = unknown>(
   routeFunction: RouteFunction<TInitialPayload>,
   options?: PublicServeOptions<TInitialPayload>
 ): { handler: NextApiHandler } => {
-  const { handler: serveHandler } = serveBase(routeFunction, options);
+  const { handler: serveHandler } = serveBase(
+    routeFunction,
+    {
+      sdk: SDK_TELEMETRY,
+      platform: "nextjs-pages",
+      runtime: `node@${process.version}`,
+    },
+    options
+  );
 
   const handler = async (request_: NextApiRequest, res: NextApiResponse) => {
     if (request_.method?.toUpperCase() !== "POST") {
diff --git a/platforms/solidjs.ts b/platforms/solidjs.ts
index 9da7706..d8cd22b 100644
--- a/platforms/solidjs.ts
+++ b/platforms/solidjs.ts
@@ -2,6 +2,7 @@ import type { APIEvent } from "@solidjs/start/server";
 
 import type { PublicServeOptions, RouteFunction } from "../src";
 import { serveBase } from "../src/serve";
+import { SDK_TELEMETRY } from "../src/constants";
 
 /**
  * Serve method to serve a Upstash Workflow in a Nextjs project
@@ -28,7 +29,15 @@ export const serve = <TInitialPayload = unknown>(
     }
 
     // create serve handler
-    const { handler: serveHandler } = serveBase<TInitialPayload>(routeFunction, options);
+    const { handler: serveHandler } = serveBase<TInitialPayload>(
+      routeFunction,
+      {
+        sdk: SDK_TELEMETRY,
+        platform: "solidjs",
+        runtime: `node@${process.version}`,
+      },
+      options
+    );
 
     return await serveHandler(event.request);
   };
diff --git a/platforms/svelte.ts b/platforms/svelte.ts
index 9aa7b34..157b1b3 100644
--- a/platforms/svelte.ts
+++ b/platforms/svelte.ts
@@ -2,6 +2,7 @@ import type { RequestHandler } from "@sveltejs/kit";
 
 import type { PublicServeOptions, RouteFunction } from "../src";
 import { serveBase } from "../src/serve";
+import { SDK_TELEMETRY } from "../src/constants";
 
 /**
  * Serve method to serve a Upstash Workflow in a Nextjs project
@@ -19,10 +20,17 @@ export const serve = <TInitialPayload = unknown>(
   }
 ): { POST: RequestHandler } => {
   const handler: RequestHandler = async ({ request }) => {
-    const { handler: serveHandler } = serveBase<TInitialPayload>(routeFunction, {
-      ...options,
-      useJSONContent: true,
-    });
+    const { handler: serveHandler } = serveBase<TInitialPayload>(
+      routeFunction,
+      {
+        sdk: SDK_TELEMETRY,
+        platform: "svelte",
+      },
+      {
+        ...options,
+        useJSONContent: true,
+      }
+    );
     return await serveHandler(request);
   };
 
diff --git a/src/client/index.ts b/src/client/index.ts
index 6beea9c..2e19a2e 100644
--- a/src/client/index.ts
+++ b/src/client/index.ts
@@ -4,7 +4,6 @@ import { makeGetWaitersRequest, makeNotifyRequest } from "./utils";
 import { getWorkflowRunId } from "../utils";
 import { triggerFirstInvocation } from "../workflow-requests";
 import { WorkflowContext } from "../context";
-import { DEFAULT_RETRIES } from "../constants";
 
 type ClientConfig = ConstructorParameters<typeof QStashClient>[0];
 
@@ -214,8 +213,13 @@ export class Client {
       steps: [],
       url,
       workflowRunId: finalWorkflowRunId,
+      retries,
+      telemetry: undefined, // can't know workflow telemetry here
+    });
+    const result = await triggerFirstInvocation({
+      workflowContext: context,
+      telemetry: undefined, // can't know workflow telemetry here
     });
-    const result = await triggerFirstInvocation(context, retries ?? DEFAULT_RETRIES);
     if (result.isOk()) {
       return { workflowRunId: finalWorkflowRunId };
     } else {
diff --git a/src/constants.ts b/src/constants.ts
index f4a7ebc..8ce66b6 100644
--- a/src/constants.ts
+++ b/src/constants.ts
@@ -1,3 +1,5 @@
+import { Telemetry } from "./types";
+
 export const WORKFLOW_ID_HEADER = "Upstash-Workflow-RunId";
 export const WORKFLOW_INIT_HEADER = "Upstash-Workflow-Init";
 export const WORKFLOW_URL_HEADER = "Upstash-Workflow-Url";
@@ -12,3 +14,15 @@ export const DEFAULT_CONTENT_TYPE = "application/json";
 export const NO_CONCURRENCY = 1;
 export const NOT_SET = "not-set";
 export const DEFAULT_RETRIES = 3;
+
+export const VERSION = "v0.2.3";
+export const SDK_TELEMETRY = `@upstash/workflow@${VERSION}`;
+
+export const TELEMETRY_HEADER_SDK = "Upstash-Telemetry-Sdk" as const;
+export const TELEMETRY_HEADER_PLATFORM = "Upstash-Telemetry-Platform" as const;
+export const TELEMETRY_HEADER_RUNTIME = "Upstash-Telemetry-Runtime" as const;
+
+export const MOCK_TELEMETRY: Telemetry = {
+  platform: "mock",
+  sdk: "mock",
+};
diff --git a/src/context/auto-executor.ts b/src/context/auto-executor.ts
index 0534e9e..cfc88c1 100644
--- a/src/context/auto-executor.ts
+++ b/src/context/auto-executor.ts
@@ -1,6 +1,6 @@
 import { WorkflowAbort, WorkflowError } from "../error";
 import type { WorkflowContext } from "./context";
-import type { StepFunction, ParallelCallState, Step, WaitRequest } from "../types";
+import type { StepFunction, ParallelCallState, Step, WaitRequest, Telemetry } from "../types";
 import { LazyCallStep, type BaseLazyStep } from "./steps";
 import { getHeaders } from "../workflow-requests";
 import type { WorkflowLogger } from "../logger";
@@ -15,15 +15,24 @@ export class AutoExecutor {
   private readonly nonPlanStepCount: number;
   private readonly steps: Step[];
   private indexInCurrentList = 0;
+  private telemetry?: Telemetry;
+
   public stepCount = 0;
   public planStepCount = 0;
 
   protected executingStep: string | false = false;
 
-  constructor(context: WorkflowContext, steps: Step[], debug?: WorkflowLogger) {
+  constructor(
+    context: WorkflowContext,
+    steps: Step[],
+    telemetry?: Telemetry,
+    debug?: WorkflowLogger
+  ) {
     this.context = context;
-    this.debug = debug;
     this.steps = steps;
+    this.telemetry = telemetry;
+    this.debug = debug;
+
     this.nonPlanStepCount = this.steps.filter((step) => !step.targetStep).length;
   }
 
@@ -323,18 +332,21 @@ export class AutoExecutor {
       steps,
     });
 
+    // must check length to be 1, otherwise was the if would return
+    // true for plan steps.
     if (steps[0].waitEventId && steps.length === 1) {
       const waitStep = steps[0];
 
-      const { headers, timeoutHeaders } = getHeaders(
-        "false",
-        this.context.workflowRunId,
-        this.context.url,
-        this.context.headers,
-        waitStep,
-        this.context.failureUrl,
-        this.context.retries
-      );
+      const { headers, timeoutHeaders } = getHeaders({
+        initHeaderValue: "false",
+        workflowRunId: this.context.workflowRunId,
+        workflowUrl: this.context.url,
+        userHeaders: this.context.headers,
+        step: waitStep,
+        failureUrl: this.context.failureUrl,
+        retries: this.context.retries,
+        telemetry: this.telemetry,
+      });
 
       // call wait
       const waitBody: WaitRequest = {
@@ -366,17 +378,18 @@ export class AutoExecutor {
     const result = await this.context.qstashClient.batchJSON(
       steps.map((singleStep, index) => {
         const lazyStep = lazySteps[index];
-        const { headers } = getHeaders(
-          "false",
-          this.context.workflowRunId,
-          this.context.url,
-          this.context.headers,
-          singleStep,
-          this.context.failureUrl,
-          this.context.retries,
-          lazyStep instanceof LazyCallStep ? lazyStep.retries : undefined,
-          lazyStep instanceof LazyCallStep ? lazyStep.timeout : undefined
-        );
+        const { headers } = getHeaders({
+          initHeaderValue: "false",
+          workflowRunId: this.context.workflowRunId,
+          workflowUrl: this.context.url,
+          userHeaders: this.context.headers,
+          step: singleStep,
+          failureUrl: this.context.failureUrl,
+          retries: this.context.retries,
+          callRetries: lazyStep instanceof LazyCallStep ? lazyStep.retries : undefined,
+          callTimeout: lazyStep instanceof LazyCallStep ? lazyStep.timeout : undefined,
+          telemetry: this.telemetry,
+        });
 
         // if the step is a single step execution or a plan step, we can add sleep headers
         const willWait = singleStep.concurrent === NO_CONCURRENCY || singleStep.stepId === 0;
diff --git a/src/context/context.ts b/src/context/context.ts
index 974cc1a..31b8349 100644
--- a/src/context/context.ts
+++ b/src/context/context.ts
@@ -2,6 +2,7 @@ import type {
   CallResponse,
   CallSettings,
   NotifyStepResponse,
+  Telemetry,
   WaitEventOptions,
   WaitStepResponse,
   WorkflowClient,
@@ -162,6 +163,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
     initialPayload,
     env,
     retries,
+    telemetry,
   }: {
     qstashClient: WorkflowClient;
     workflowRunId: string;
@@ -173,6 +175,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
     initialPayload: TInitialPayload;
     env?: Record<string, string | undefined>;
     retries?: number;
+    telemetry?: Telemetry;
   }) {
     this.qstashClient = qstashClient;
     this.workflowRunId = workflowRunId;
@@ -184,7 +187,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
     this.env = env ?? {};
     this.retries = retries ?? DEFAULT_RETRIES;
 
-    this.executor = new AutoExecutor(this, this.steps, debug);
+    this.executor = new AutoExecutor(this, this.steps, telemetry, debug);
   }
 
   /**
diff --git a/src/serve/index.ts b/src/serve/index.ts
index a495eba..818ec35 100644
--- a/src/serve/index.ts
+++ b/src/serve/index.ts
@@ -1,8 +1,9 @@
 import { makeCancelRequest } from "../client/utils";
+import { SDK_TELEMETRY } from "../constants";
 import { WorkflowContext } from "../context";
 import { formatWorkflowError } from "../error";
 import { WorkflowLogger } from "../logger";
-import { RouteFunction, WorkflowServeOptions } from "../types";
+import { RouteFunction, Telemetry, WorkflowServeOptions } from "../types";
 import { getPayload, handleFailure, parseRequest, validateRequest } from "../workflow-parser";
 import {
   handleThirdPartyCallResult,
@@ -33,6 +34,7 @@ export const serveBase = <
   TResponse extends Response = Response,
 >(
   routeFunction: RouteFunction<TInitialPayload>,
+  telemetry: Telemetry,
   options?: WorkflowServeOptions<TResponse, TInitialPayload>
 ): { handler: (request: TRequest) => Promise<TResponse> } => {
   // Prepares options with defaults if they are not provided.
@@ -133,6 +135,7 @@ export const serveBase = <
       debug,
       env,
       retries,
+      telemetry,
     });
 
     // attempt running routeFunction until the first step
@@ -155,15 +158,16 @@ export const serveBase = <
     }
 
     // check if request is a third party call result
-    const callReturnCheck = await handleThirdPartyCallResult(
+    const callReturnCheck = await handleThirdPartyCallResult({
       request,
-      rawInitialPayload,
-      qstashClient,
+      requestPayload: rawInitialPayload,
+      client: qstashClient,
       workflowUrl,
-      workflowFailureUrl,
+      failureUrl: workflowFailureUrl,
       retries,
-      debug
-    );
+      telemetry,
+      debug,
+    });
     if (callReturnCheck.isErr()) {
       // error while checking
       await debug?.log("ERROR", "SUBMIT_THIRD_PARTY_RESULT", {
@@ -173,7 +177,7 @@ export const serveBase = <
     } else if (callReturnCheck.value === "continue-workflow") {
       // request is not third party call. Continue workflow as usual
       const result = isFirstInvocation
-        ? await triggerFirstInvocation(workflowContext, retries, useJSONContent, debug)
+        ? await triggerFirstInvocation({ workflowContext, useJSONContent, telemetry, debug })
         : await triggerRouteFunction({
             onStep: async () => routeFunction(workflowContext),
             onCleanup: async () => {
@@ -232,5 +236,12 @@ export const serve = <
   routeFunction: RouteFunction<TInitialPayload>,
   options?: Omit<WorkflowServeOptions<TResponse, TInitialPayload>, "useJSONContent">
 ): { handler: (request: TRequest) => Promise<TResponse> } => {
-  return serveBase(routeFunction, options);
+  return serveBase(
+    routeFunction,
+    {
+      sdk: SDK_TELEMETRY,
+      platform: "unknown",
+    },
+    options
+  );
 };
diff --git a/src/serve/serve.test.ts b/src/serve/serve.test.ts
index a31c2c7..4b73f20 100644
--- a/src/serve/serve.test.ts
+++ b/src/serve/serve.test.ts
@@ -114,13 +114,44 @@ describe("serve", () => {
     ];
 
     await driveWorkflow({
-      execute: async (initialPayload, steps) => {
-        const request = getRequest(WORKFLOW_ENDPOINT, workflowRunId, initialPayload, steps);
+      execute: async (initialPayload, steps, first) => {
+        const request = first
+          ? new Request(WORKFLOW_ENDPOINT, {
+              body: JSON.stringify(initialPayload),
+              method: "POST",
+            })
+          : getRequest(WORKFLOW_ENDPOINT, workflowRunId, initialPayload, steps);
+
         const response = await endpoint(request);
         expect(response.status).toBe(200);
       },
       initialPayload,
       iterations: [
+        {
+          stepsToAdd: [],
+          responseFields: {
+            body: { messageId: "some-message-id" },
+            status: 200,
+          },
+          receivesRequest: {
+            method: "POST",
+            url: `${MOCK_QSTASH_SERVER_URL}/v2/publish/https://requestcatcher.com/api`,
+            token,
+            body: initialPayload,
+            headers: {
+              "upstash-feature-set": "LazyFetch,InitialBody",
+              "upstash-forward-upstash-workflow-sdk-version": "1",
+              "upstash-retries": "3",
+              "upstash-failure-callback-retries": "3",
+              "upstash-method": "POST",
+              "upstash-workflow-init": "true",
+              "upstash-workflow-url": WORKFLOW_ENDPOINT,
+              "upstash-telemetry-platform": "unknown",
+              "upstash-telemetry-runtime": "unknown",
+              "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3",
+            },
+          },
+        },
         {
           stepsToAdd: [],
           responseFields: {
@@ -145,6 +176,9 @@ describe("serve", () => {
                   "upstash-workflow-runid": workflowRunId,
                   "upstash-workflow-init": "false",
                   "upstash-workflow-url": WORKFLOW_ENDPOINT,
+                  "upstash-telemetry-platform": "unknown",
+                  "upstash-telemetry-runtime": "unknown",
+                  "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3",
                 },
               },
             ],
@@ -173,6 +207,9 @@ describe("serve", () => {
                   "upstash-workflow-runid": workflowRunId,
                   "upstash-workflow-init": "false",
                   "upstash-workflow-url": WORKFLOW_ENDPOINT,
+                  "upstash-telemetry-platform": "unknown",
+                  "upstash-telemetry-runtime": "unknown",
+                  "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3",
                 },
                 body: JSON.stringify(steps[1]),
               },
@@ -349,6 +386,9 @@ describe("serve", () => {
                 "upstash-workflow-init": "false",
                 "upstash-workflow-runid": "wfr-foo",
                 "upstash-workflow-url": WORKFLOW_ENDPOINT,
+                "upstash-telemetry-platform": "unknown",
+                "upstash-telemetry-runtime": "unknown",
+                "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3",
               },
               body: '{"stepId":3,"stepName":"step 3","stepType":"Run","out":"\\"combined results: result 1,result 2\\"","concurrent":1}',
             },
@@ -396,6 +436,9 @@ describe("serve", () => {
                 "upstash-workflow-init": "false",
                 "upstash-workflow-runid": "wfr-bar",
                 "upstash-workflow-url": WORKFLOW_ENDPOINT,
+                "upstash-telemetry-platform": "unknown",
+                "upstash-telemetry-runtime": "unknown",
+                "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3",
               },
               body: '{"stepId":1,"stepName":"sleep-step","stepType":"SleepFor","sleepFor":1,"concurrent":1}',
             },
@@ -441,6 +484,9 @@ describe("serve", () => {
                 "upstash-workflow-url": WORKFLOW_ENDPOINT,
                 "upstash-failure-callback": myFailureEndpoint,
                 "upstash-failure-callback-forward-upstash-workflow-is-failure": "true",
+                "upstash-telemetry-platform": "unknown",
+                "upstash-telemetry-runtime": "unknown",
+                "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3",
               },
               body: '{"stepId":1,"stepName":"sleep-step","stepType":"SleepFor","sleepFor":1,"concurrent":1}',
             },
@@ -488,6 +534,9 @@ describe("serve", () => {
                 "upstash-workflow-url": WORKFLOW_ENDPOINT,
                 "upstash-failure-callback": WORKFLOW_ENDPOINT,
                 "upstash-failure-callback-forward-upstash-workflow-is-failure": "true",
+                "upstash-telemetry-platform": "unknown",
+                "upstash-telemetry-runtime": "unknown",
+                "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3",
               },
               body: '{"stepId":1,"stepName":"sleep-step","stepType":"SleepFor","sleepFor":1,"concurrent":1}',
             },
@@ -686,6 +735,9 @@ describe("serve", () => {
             "Upstash-Workflow-RunId": ["wfr-bar"],
             "Upstash-Workflow-Runid": ["wfr-bar"],
             "Upstash-Workflow-Url": [WORKFLOW_ENDPOINT],
+            "Upstash-Telemetry-Platform": ["unknown"],
+            "Upstash-Telemetry-Runtime": ["unknown"],
+            "Upstash-Telemetry-Sdk": ["@upstash/workflow@v0.2.3"],
           },
           timeoutUrl: WORKFLOW_ENDPOINT,
           url: WORKFLOW_ENDPOINT,
@@ -905,4 +957,71 @@ describe("serve", () => {
       });
     });
   });
+
+  test("should forward client headers", async () => {
+    const request = getRequest(WORKFLOW_ENDPOINT, "wfr-bar", "my-payload", []);
+    let called = false;
+    const myFailureFunction: WorkflowServeOptions["failureFunction"] = async () => {
+      return;
+    };
+
+    const header = "test-header";
+    const headerValue = `test-header-value-${nanoid()}`;
+    const qstashClient = new Client({
+      baseUrl: MOCK_QSTASH_SERVER_URL,
+      token,
+      headers: {
+        [header]: headerValue,
+      },
+    });
+
+    const { handler: endpoint } = serve(
+      async (context) => {
+        await context.sleep("sleep-step", 1);
+      },
+      {
+        qstashClient,
+        receiver: undefined,
+        failureFunction: myFailureFunction,
+      }
+    );
+    await mockQStashServer({
+      execute: async () => {
+        const response = await endpoint(request);
+        expect(response.status).toBe(200);
+        called = true;
+      },
+      responseFields: { body: { messageId: "some-message-id" }, status: 200 },
+      receivesRequest: {
+        method: "POST",
+        url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`,
+        token,
+        body: [
+          {
+            destination: WORKFLOW_ENDPOINT,
+            headers: {
+              "content-type": "application/json",
+              "upstash-feature-set": "LazyFetch,InitialBody",
+              "upstash-delay": "1s",
+              "upstash-forward-upstash-workflow-sdk-version": "1",
+              "upstash-method": "POST",
+              "upstash-retries": "3",
+              "upstash-failure-callback-retries": "3",
+              "upstash-workflow-init": "false",
+              "upstash-workflow-runid": "wfr-bar",
+              "upstash-workflow-url": WORKFLOW_ENDPOINT,
+              "upstash-failure-callback": WORKFLOW_ENDPOINT,
+              "upstash-failure-callback-forward-upstash-workflow-is-failure": "true",
+              "upstash-forward-test-header": headerValue,
+              "upstash-telemetry-platform": "unknown",
+              "upstash-telemetry-runtime": "unknown",
+              "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3",
+            },
+            body: '{"stepId":1,"stepName":"sleep-step","stepType":"SleepFor","sleepFor":1,"concurrent":1}',
+          },
+        ],
+      },
+    });
+    expect(called).toBeTrue();
+  });
 });
diff --git a/src/test-utils.ts b/src/test-utils.ts
index ad41af1..a8336a2 100644
--- a/src/test-utils.ts
+++ b/src/test-utils.ts
@@ -53,7 +53,7 @@ export const mockQStashServer = async ({
 
       if (!receivesRequest) {
         return new Response("assertion in mock QStash failed. fetch shouldn't have been called.", {
-          status: 400,
+          status: 500,
         });
       }
       const { method, url, token, body } = receivesRequest;
@@ -85,7 +85,7 @@ export const mockQStashServer = async ({
         if (error instanceof Error) {
           console.error(error);
           return new Response(`assertion in mock QStash failed.`, {
-            status: 400,
+            status: 500,
           });
         }
       }
@@ -120,18 +120,20 @@ export const driveWorkflow = async ({
   initialPayload,
   iterations,
 }: {
-  execute: (intialPayload: unknown, steps: Step[]) => Promise<void>;
+  execute: (intialPayload: unknown, steps: Step[], first: boolean) => Promise<void>;
   initialPayload: unknown;
   iterations: IterationTape;
 }) => {
   const steps: Step[] = [];
+  let counter = 0;
   for (const { stepsToAdd, responseFields, receivesRequest } of iterations) {
     steps.push(...stepsToAdd);
     await mockQStashServer({
-      execute: async () => execute(initialPayload, steps),
+      execute: async () => execute(initialPayload, steps, counter === 0),
       responseFields,
       receivesRequest,
     });
+    counter++;
   }
 };
 
diff --git a/src/types.ts b/src/types.ts
index 027f060..c69350d 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -229,6 +229,21 @@ export type WorkflowServeOptions<
   useJSONContent?: boolean;
 };
 
+export type Telemetry = {
+  /**
+   * sdk version
+   */
+  sdk: string;
+  /**
+   * platform (such as nextjs/cloudflare)
+   */
+  platform: string;
+  /**
+   * node version
+   */
+  runtime?: string;
+};
+
 export type PublicServeOptions<
   TInitialPayload = unknown,
   TResponse extends Response = Response,
@@ -337,3 +352,70 @@ export type CallSettings<TBody = unknown> = {
   retries?: number;
   timeout?: Duration | number;
 };
+
+export type HeaderParams = {
+  /**
+   * whether the request is a first invocation request.
+   */
+  initHeaderValue: "true" | "false";
+  /**
+   * run id of the workflow
+   */
+  workflowRunId: string;
+  /**
+   * url where the workflow is hosted
+   */
+  workflowUrl: string;
+  /**
+   * user headers which will be forwarded in the request
+   */
+  userHeaders?: Headers;
+  /**
+   * failure url to call incase of failure
+   */
+  failureUrl?: WorkflowServeOptions["failureUrl"];
+  /**
+   * retry setting of requests except context.call
+   */
+  retries?: number;
+  /**
+   * telemetry to include in timeoutHeaders.
+   *
+   * Only needed/used when the step is a waitForEvent step
+   */
+  telemetry?: Telemetry;
+} & (
+  | {
+      /**
+       * step to generate headers for
+       */
+      step: Step;
+      /**
+       * number of retries in context.call
+       */
+      callRetries?: number;
+      /**
+       * timeout duration in context.call
+       */
+      callTimeout?: number | Duration;
+    }
+  | {
+      /**
+       * step not passed. Either first invocation or simply getting headers for
+       * third party callack.
+       */
+      step?: never;
+      /**
+       * number of retries in context.call
+       *
+       * set to never because this is not a context.call step
+       */
+      callRetries?: never;
+      /**
+       * timeout duration in context.call
+       *
+       * set to never because this is not a context.call step
+       */
+      callTimeout?: never;
+    }
+);
diff --git a/src/workflow-parser.ts b/src/workflow-parser.ts
index 9596ff0..52a1e11 100644
--- a/src/workflow-parser.ts
+++ b/src/workflow-parser.ts
@@ -354,6 +354,7 @@ export const handleFailure = async <TInitialPayload>(
       debug,
       env,
       retries,
+      telemetry: undefined, // not going to make requests in authentication check
     });
 
     // attempt running routeFunction until the first step
diff --git a/src/workflow-requests.test.ts b/src/workflow-requests.test.ts
index e0d7fa3..4cdd17c 100644
--- a/src/workflow-requests.test.ts
+++ b/src/workflow-requests.test.ts
@@ -46,11 +46,12 @@ describe("Workflow Requests", () => {
       headers: new Headers({}) as Headers,
       steps: [],
       url: WORKFLOW_ENDPOINT,
+      retries: 0,
     });
 
     await mockQStashServer({
       execute: async () => {
-        const result = await triggerFirstInvocation(context, 0);
+        const result = await triggerFirstInvocation({ workflowContext: context });
         expect(result.isOk()).toBeTrue();
       },
       responseFields: {
@@ -270,14 +271,18 @@ describe("Workflow Requests", () => {
       // create mock server and run the code
       await mockQStashServer({
         execute: async () => {
-          const result = await handleThirdPartyCallResult(
+          const result = await handleThirdPartyCallResult({
             request,
-            await request.text(),
+            requestPayload: await request.text(),
             client,
-            WORKFLOW_ENDPOINT,
-            WORKFLOW_ENDPOINT,
-            2
-          );
+            workflowUrl: WORKFLOW_ENDPOINT,
+            failureUrl: WORKFLOW_ENDPOINT,
+            retries: 2,
+            telemetry: {
+              platform: "some-platform",
+              sdk: "some-sdk",
+            },
+          });
           expect(result.isOk()).toBeTrue();
           // @ts-expect-error value will be set since stepFinish isOk
           expect(result.value).toBe("is-call-return");
@@ -343,14 +348,18 @@ describe("Workflow Requests", () => {
       });
 
       const spy = spyOn(client, "publishJSON");
-      const result = await handleThirdPartyCallResult(
+      const result = await handleThirdPartyCallResult({
         request,
-        await request.text(),
+        requestPayload: await request.text(),
         client,
-        WORKFLOW_ENDPOINT,
-        WORKFLOW_ENDPOINT,
-        3
-      );
+        workflowUrl: WORKFLOW_ENDPOINT,
+        failureUrl: WORKFLOW_ENDPOINT,
+        retries: 3,
+        telemetry: {
+          platform: "some-platform",
+          sdk: "some-sdk",
+        },
+      });
       expect(result.isOk()).toBeTrue();
       // @ts-expect-error value will be set since stepFinish isOk
       expect(result.value).toBe("call-will-retry");
@@ -393,28 +402,36 @@ describe("Workflow Requests", () => {
       });
 
       const spy = spyOn(client, "publishJSON");
-      const initialResult = await handleThirdPartyCallResult(
-        initialRequest,
-        await initialRequest.text(),
+      const initialResult = await handleThirdPartyCallResult({
+        request: initialRequest,
+        requestPayload: await initialRequest.text(),
         client,
-        WORKFLOW_ENDPOINT,
-        WORKFLOW_ENDPOINT,
-        5
-      );
+        workflowUrl: WORKFLOW_ENDPOINT,
+        failureUrl: WORKFLOW_ENDPOINT,
+        retries: 5,
+        telemetry: {
+          platform: "some-platform",
+          sdk: "some-sdk",
+        },
+      });
       expect(initialResult.isOk()).toBeTrue();
       // @ts-expect-error value will be set since stepFinish isOk
       expect(initialResult.value).toBe("continue-workflow");
       expect(spy).toHaveBeenCalledTimes(0);
 
       // second call
-      const result = await handleThirdPartyCallResult(
-        workflowRequest,
-        await workflowRequest.text(),
+      const result = await handleThirdPartyCallResult({
+        request: workflowRequest,
+        requestPayload: await workflowRequest.text(),
         client,
-        WORKFLOW_ENDPOINT,
-        WORKFLOW_ENDPOINT,
-        0
-      );
+        workflowUrl: WORKFLOW_ENDPOINT,
+        failureUrl: WORKFLOW_ENDPOINT,
+        retries: 0,
+        telemetry: {
+          platform: "some-platform",
+          sdk: "some-sdk",
+        },
+      });
       expect(result.isOk()).toBeTrue();
       // @ts-expect-error value will be set since stepFinish isOk
       expect(result.value).toBe("continue-workflow");
@@ -425,7 +442,11 @@ describe("Workflow Requests", () => {
   describe("getHeaders", () => {
     const workflowRunId = nanoid();
     test("should create headers without step passed", () => {
-      const { headers, timeoutHeaders } = getHeaders("true", workflowRunId, WORKFLOW_ENDPOINT);
+      const { headers, timeoutHeaders } = getHeaders({
+        initHeaderValue: "true",
+        workflowRunId,
+        workflowUrl: WORKFLOW_ENDPOINT,
+      });
       expect(headers).toEqual({
         [WORKFLOW_INIT_HEADER]: "true",
         [WORKFLOW_ID_HEADER]: workflowRunId,
@@ -441,18 +462,17 @@ describe("Workflow Requests", () => {
       const stepName = "some step";
       const stepType: StepType = "Run";
 
-      const { headers, timeoutHeaders } = getHeaders(
-        "false",
+      const { headers, timeoutHeaders } = getHeaders({
+        initHeaderValue: "false",
         workflowRunId,
-        WORKFLOW_ENDPOINT,
-        undefined,
-        {
+        workflowUrl: WORKFLOW_ENDPOINT,
+        step: {
           stepId,
           stepName,
           stepType: stepType,
           concurrent: 1,
-        }
-      );
+        },
+      });
       expect(headers).toEqual({
         [WORKFLOW_INIT_HEADER]: "false",
         [WORKFLOW_ID_HEADER]: workflowRunId,
@@ -474,12 +494,11 @@ describe("Workflow Requests", () => {
       };
       const callBody = undefined;
 
-      const { headers, timeoutHeaders } = getHeaders(
-        "false",
+      const { headers, timeoutHeaders } = getHeaders({
+        initHeaderValue: "false",
         workflowRunId,
-        WORKFLOW_ENDPOINT,
-        undefined,
-        {
+        workflowUrl: WORKFLOW_ENDPOINT,
+        step: {
           stepId,
           stepName,
           stepType: stepType,
@@ -488,8 +507,8 @@ describe("Workflow Requests", () => {
           callMethod,
           callHeaders,
           callBody,
-        }
-      );
+        },
+      });
       expect(headers).toEqual({
         [WORKFLOW_INIT_HEADER]: "false",
         [WORKFLOW_ID_HEADER]: workflowRunId,
@@ -516,14 +535,13 @@ describe("Workflow Requests", () => {
 
     test("should include failure header", () => {
       const failureUrl = "https://my-failure-endpoint.com";
-      const { headers, timeoutHeaders } = getHeaders(
-        "true",
+      const { headers, timeoutHeaders } = getHeaders({
+        initHeaderValue: "true",
         workflowRunId,
-        WORKFLOW_ENDPOINT,
-        new Headers() as Headers,
-        undefined,
-        failureUrl
-      );
+        workflowUrl: WORKFLOW_ENDPOINT,
+        userHeaders: new Headers() as Headers,
+        failureUrl,
+      });
       expect(headers).toEqual({
         [WORKFLOW_INIT_HEADER]: "true",
         [WORKFLOW_ID_HEADER]: workflowRunId,
@@ -537,20 +555,19 @@ describe("Workflow Requests", () => {
     });
 
     test("should return timeout headers for wait step", () => {
-      const { headers, timeoutHeaders } = getHeaders(
-        "false",
+      const { headers, timeoutHeaders } = getHeaders({
+        initHeaderValue: "false",
         workflowRunId,
-        WORKFLOW_ENDPOINT,
-        undefined,
-        {
+        workflowUrl: WORKFLOW_ENDPOINT,
+        step: {
           stepId: 1,
           stepName: "waiting-step-name",
           stepType: "Wait",
           concurrent: 1,
           waitEventId: "wait event id",
           timeout: "20s",
-        }
-      );
+        },
+      });
       expect(headers).toEqual({
         "Upstash-Workflow-Init": "false",
         "Upstash-Workflow-RunId": workflowRunId,
@@ -596,7 +613,7 @@ describe("Workflow Requests", () => {
           url: WORKFLOW_ENDPOINT,
         });
 
-        await triggerFirstInvocation(context, 3);
+        await triggerFirstInvocation({ workflowContext: context });
         const debug = new WorkflowLogger({ logLevel: "INFO", logOutput: "console" });
         const spy = spyOn(debug, "log");
 
@@ -639,7 +656,11 @@ describe("Workflow Requests", () => {
         const debug = new WorkflowLogger({ logLevel: "INFO", logOutput: "console" });
         const spy = spyOn(debug, "log");
 
-        await triggerFirstInvocation(context, 3, false, debug);
+        await triggerFirstInvocation({
+          workflowContext: context,
+          useJSONContent: false,
+          debug,
+        });
         expect(spy).toHaveBeenCalledTimes(1);
 
         await workflowClient.cancel({ ids: [workflowRunId] });
@@ -690,7 +711,11 @@ describe("Workflow Requests", () => {
         const debug = new WorkflowLogger({ logLevel: "INFO", logOutput: "console" });
         const spy = spyOn(debug, "log");
 
-        await triggerFirstInvocation(context, 3, false, debug);
+        await triggerFirstInvocation({
+          workflowContext: context,
+          useJSONContent: false,
+          debug,
+        });
         expect(spy).toHaveBeenCalledTimes(1);
 
         await workflowClient.cancel({ ids: [workflowRunId] });
@@ -740,14 +765,31 @@ describe("Workflow Requests", () => {
         const debug = new WorkflowLogger({ logLevel: "INFO", logOutput: "console" });
         const spy = spyOn(debug, "log");
 
-        const resultOne = await triggerFirstInvocation(context, 3, false, debug);
+        const resultOne = await triggerFirstInvocation({
+          workflowContext: context,
+          useJSONContent: false,
+          debug,
+        });
         expect(resultOne.isOk()).toBeTrue();
         // @ts-expect-error value will exist because of isOk
         expect(resultOne.value).toBe("success");
 
         expect(spy).toHaveBeenCalledTimes(1);
 
-        const resultTwo = await triggerFirstInvocation(context, 0, false, debug);
+        const noRetryContext = new WorkflowContext({
+          qstashClient,
+          workflowRunId: workflowRunId,
+          initialPayload: undefined,
+          headers: new Headers({}) as Headers,
+          steps: [],
+          url: WORKFLOW_ENDPOINT,
+          retries: 0,
+        });
+        const resultTwo = await triggerFirstInvocation({
+          workflowContext: noRetryContext,
+          useJSONContent: false,
+          debug,
+        });
         expect(resultTwo.isOk()).toBeTrue();
         // @ts-expect-error value will exist because of isOk
         expect(resultTwo.value).toBe("workflow-run-already-exists");
@@ -771,6 +813,9 @@ describe("Workflow Requests", () => {
 
         const deleteResult = await triggerWorkflowDelete(context, debug);
         expect(deleteResult).toEqual({ deleted: true });
+
+        const deleteResultSecond = await triggerWorkflowDelete(noRetryContext, debug);
+        expect(deleteResultSecond).toEqual({ deleted: false });
       },
       {
         timeout: 10000,
diff --git a/src/workflow-requests.ts b/src/workflow-requests.ts
index 3f91661..2cd1e12 100644
--- a/src/workflow-requests.ts
+++ b/src/workflow-requests.ts
@@ -4,6 +4,9 @@ import { WorkflowAbort, WorkflowError } from "./error";
 import type { WorkflowContext } from "./context";
 import {
   DEFAULT_CONTENT_TYPE,
+  TELEMETRY_HEADER_PLATFORM,
+  TELEMETRY_HEADER_RUNTIME,
+  TELEMETRY_HEADER_SDK,
   WORKFLOW_FAILURE_HEADER,
   WORKFLOW_FEATURE_HEADER,
   WORKFLOW_ID_HEADER,
@@ -14,9 +17,10 @@ import {
 } from "./constants";
 import type {
   CallResponse,
-  Duration,
+  HeaderParams,
   Step,
   StepType,
+  Telemetry,
   WorkflowClient,
   WorkflowReceiver,
   WorkflowServeOptions,
@@ -26,21 +30,26 @@ import type { WorkflowLogger } from "./logger";
 import { QstashError } from "@upstash/qstash";
 import { getSteps } from "./client/utils";
 
-export const triggerFirstInvocation = async <TInitialPayload>(
-  workflowContext: WorkflowContext<TInitialPayload>,
-  retries: number,
-  useJSONContent?: boolean,
-  debug?: WorkflowLogger
-): Promise<Ok<"success" | "workflow-run-already-exists", never> | Err<never, Error>> => {
-  const { headers } = getHeaders(
-    "true",
-    workflowContext.workflowRunId,
-    workflowContext.url,
-    workflowContext.headers,
-    undefined,
-    workflowContext.failureUrl,
-    retries
-  );
+export const triggerFirstInvocation = async <TInitialPayload>({
+  workflowContext,
+  useJSONContent,
+  telemetry,
+  debug,
+}: {
+  workflowContext: WorkflowContext<TInitialPayload>;
+  useJSONContent?: boolean;
+  telemetry?: Telemetry;
+  debug?: WorkflowLogger;
+}): Promise<Ok<"success" | "workflow-run-already-exists", never> | Err<never, Error>> => {
+  const { headers } = getHeaders({
+    initHeaderValue: "true",
+    workflowRunId: workflowContext.workflowRunId,
+    workflowUrl: workflowContext.url,
+    userHeaders: workflowContext.headers,
+    failureUrl: workflowContext.failureUrl,
+    retries: workflowContext.retries,
+    telemetry,
+  });
 
   if (useJSONContent) {
     headers["content-type"] = "application/json";
@@ -207,15 +216,25 @@ export const recreateUserHeaders = (headers: Headers): Headers => {
  * @param client QStash client
  * @returns
  */
-export const handleThirdPartyCallResult = async (
-  request: Request,
-  requestPayload: string,
-  client: WorkflowClient,
-  workflowUrl: string,
-  failureUrl: WorkflowServeOptions["failureUrl"],
-  retries: number,
-  debug?: WorkflowLogger
-): Promise<
+export const handleThirdPartyCallResult = async ({
+  request,
+  requestPayload,
+  client,
+  workflowUrl,
+  failureUrl,
+  retries,
+  telemetry,
+  debug,
+}: {
+  request: Request;
+  requestPayload: string;
+  client: WorkflowClient;
+  workflowUrl: string;
+  failureUrl: WorkflowServeOptions["failureUrl"];
+  retries: number;
+  telemetry: Telemetry;
+  debug?: WorkflowLogger;
+}): Promise<
   | Ok<"is-call-return" | "continue-workflow" | "call-will-retry" | "workflow-ended", never>
   | Err<never, Error>
 > => {
@@ -311,15 +330,15 @@ export const handleThirdPartyCallResult = async (
       }
 
       const userHeaders = recreateUserHeaders(request.headers as Headers);
-      const { headers: requestHeaders } = getHeaders(
-        "false",
+      const { headers: requestHeaders } = getHeaders({
+        initHeaderValue: "false",
         workflowRunId,
         workflowUrl,
         userHeaders,
-        undefined,
         failureUrl,
-        retries
-      );
+        retries,
+        telemetry,
+      });
 
       const callResponse: CallResponse = {
         status: callbackMessage.status,
@@ -368,32 +387,39 @@ export type HeadersResponse = {
   timeoutHeaders?: Record<string, string[]>;
 };
 
+export const getTelemetryHeaders = (telemetry: Telemetry) => {
+  return {
+    [TELEMETRY_HEADER_SDK]: telemetry.sdk,
+    [TELEMETRY_HEADER_PLATFORM]: telemetry.platform,
+    [TELEMETRY_HEADER_RUNTIME]: telemetry.runtime ?? "unknown",
+  };
+};
+
 /**
  * Gets headers for calling QStash
  *
- * @param initHeaderValue Whether the invocation should create a new workflow
- * @param workflowRunId id of the workflow
- * @param workflowUrl url of the workflow endpoint
- * @param step step to get headers for. If the step is a third party call step, more
- *       headers are added.
+ * See HeaderParams for more details about parameters.
+ *
  * @returns headers to submit
  */
-export const getHeaders = (
-  initHeaderValue: "true" | "false",
-  workflowRunId: string,
-  workflowUrl: string,
-  userHeaders?: Headers,
-  step?: Step,
-  failureUrl?: WorkflowServeOptions["failureUrl"],
-  retries?: number,
-  callRetries?: number,
-  callTimeout?: number | Duration
-): HeadersResponse => {
+export const getHeaders = ({
+  initHeaderValue,
+  workflowRunId,
+  workflowUrl,
+  userHeaders,
+  failureUrl,
+  retries,
+  step,
+  callRetries,
+  callTimeout,
+  telemetry,
+}: HeaderParams): HeadersResponse => {
   const baseHeaders: Record<string, string> = {
     [WORKFLOW_INIT_HEADER]: initHeaderValue,
     [WORKFLOW_ID_HEADER]: workflowRunId,
     [WORKFLOW_URL_HEADER]: workflowUrl,
     [WORKFLOW_FEATURE_HEADER]: "LazyFetch,InitialBody",
+    ...(telemetry ? getTelemetryHeaders(telemetry) : {}),
   };
 
   if (!step?.callUrl) {
@@ -483,6 +509,15 @@ export const getHeaders = (
         ...Object.fromEntries(
           Object.entries(baseHeaders).map(([header, value]) => [header, [value]])
         ),
+        // to include telemetry headers:
+        ...(telemetry
+          ? Object.fromEntries(
+              Object.entries(getTelemetryHeaders(telemetry)).map(([header, value]) => [
+                header,
+                [value],
+              ])
+            )
+          : {}),
         // note: using WORKFLOW_ID_HEADER doesn't work, because Runid -> RunId:
         "Upstash-Workflow-Runid": [workflowRunId],
         [WORKFLOW_INIT_HEADER]: ["false"],

From 8826bbd75cd3c9cb79ce677d3b15da3163fb1e7d Mon Sep 17 00:00:00 2001
From: CahidArda <cahidardaooz@hotmail.com>
Date: Mon, 23 Dec 2024 21:28:55 +0300
Subject: [PATCH 3/4] fix: rm process from hono

---
 platforms/hono.ts | 1 -
 1 file changed, 1 deletion(-)

diff --git a/platforms/hono.ts b/platforms/hono.ts
index cb7921e..7ec3fa6 100644
--- a/platforms/hono.ts
+++ b/platforms/hono.ts
@@ -38,7 +38,6 @@ export const serve = <
       {
         sdk: SDK_TELEMETRY,
         platform: "hono",
-        runtime: `node@${process.version}`,
       },
       {
         // when hono is used without cf workers, it sends a DebugHTTPServer

From d8a5e1d04fdeb62a842ed264471e157617152a90 Mon Sep 17 00:00:00 2001
From: CahidArda <cahidardaooz@hotmail.com>
Date: Mon, 30 Dec 2024 13:31:34 +0300
Subject: [PATCH 4/4] fix: add disable telemetry option and change platform to
 telemetry

---
 platforms/astro.ts            |  6 ++++--
 platforms/cloudflare.ts       |  2 +-
 platforms/express.ts          |  6 ++++--
 platforms/h3.ts               |  6 ++++--
 platforms/hono.ts             |  2 +-
 platforms/nextjs.ts           |  8 +++++---
 platforms/solidjs.ts          |  6 ++++--
 platforms/svelte.ts           |  2 +-
 src/constants.ts              |  4 ++--
 src/serve/index.ts            |  6 ++++--
 src/serve/options.ts          |  1 +
 src/serve/serve.test.ts       | 22 +++++++---------------
 src/types.ts                  |  8 +++++++-
 src/workflow-requests.test.ts |  8 ++++----
 src/workflow-requests.ts      |  6 +++---
 15 files changed, 52 insertions(+), 41 deletions(-)

diff --git a/platforms/astro.ts b/platforms/astro.ts
index 871fbb8..4fd3bbc 100644
--- a/platforms/astro.ts
+++ b/platforms/astro.ts
@@ -16,8 +16,10 @@ export function serve<TInitialPayload = unknown>(
       (workflowContext) => routeFunction(workflowContext, apiContext),
       {
         sdk: SDK_TELEMETRY,
-        platform: "astro",
-        runtime: `node@${process.version}`,
+        framework: "astro",
+        runtime: process.versions.bun
+          ? `bun@${process.versions.bun}/node@${process.version}`
+          : `node@${process.version}`,
       },
       options
     );
diff --git a/platforms/cloudflare.ts b/platforms/cloudflare.ts
index 2bda636..9061bbf 100644
--- a/platforms/cloudflare.ts
+++ b/platforms/cloudflare.ts
@@ -67,7 +67,7 @@ export const serve = <TInitialPayload = unknown>(
       routeFunction,
       {
         sdk: SDK_TELEMETRY,
-        platform: "cloudflare",
+        framework: "cloudflare",
       },
       {
         env,
diff --git a/platforms/express.ts b/platforms/express.ts
index c6ada83..a7a3781 100644
--- a/platforms/express.ts
+++ b/platforms/express.ts
@@ -47,8 +47,10 @@ export function serve<TInitialPayload = unknown>(
       routeFunction,
       {
         sdk: SDK_TELEMETRY,
-        platform: "express",
-        runtime: `node@${process.version}`,
+        framework: "express",
+        runtime: process.versions.bun
+          ? `bun@${process.versions.bun}/node@${process.version}`
+          : `node@${process.version}`,
       },
       {
         ...options,
diff --git a/platforms/h3.ts b/platforms/h3.ts
index 54e0dd4..ec0b570 100644
--- a/platforms/h3.ts
+++ b/platforms/h3.ts
@@ -42,8 +42,10 @@ export const serve = <TInitialPayload = unknown>(
       routeFunction,
       {
         sdk: SDK_TELEMETRY,
-        platform: "h3",
-        runtime: `node@${process.version}`,
+        framework: "h3",
+        runtime: process.versions.bun
+          ? `bun@${process.versions.bun}/node@${process.version}`
+          : `node@${process.version}`,
       },
       options
     );
diff --git a/platforms/hono.ts b/platforms/hono.ts
index 7ec3fa6..a0ab5bd 100644
--- a/platforms/hono.ts
+++ b/platforms/hono.ts
@@ -37,7 +37,7 @@ export const serve = <
       routeFunction,
       {
         sdk: SDK_TELEMETRY,
-        platform: "hono",
+        framework: "hono",
       },
       {
         // when hono is used without cf workers, it sends a DebugHTTPServer
diff --git a/platforms/nextjs.ts b/platforms/nextjs.ts
index d3b3e24..8869cb3 100644
--- a/platforms/nextjs.ts
+++ b/platforms/nextjs.ts
@@ -23,7 +23,7 @@ export const serve = <TInitialPayload = unknown>(
     routeFunction,
     {
       sdk: SDK_TELEMETRY,
-      platform: "nextjs",
+      framework: "nextjs",
       runtime: `node@${process.version}`,
     },
     options
@@ -44,8 +44,10 @@ export const servePagesRouter = <TInitialPayload = unknown>(
     routeFunction,
     {
       sdk: SDK_TELEMETRY,
-      platform: "nextjs-pages",
-      runtime: `node@${process.version}`,
+      framework: "nextjs-pages",
+      runtime: process.versions.bun
+        ? `bun@${process.versions.bun}/node@${process.version}`
+        : `node@${process.version}`,
     },
     options
   );
diff --git a/platforms/solidjs.ts b/platforms/solidjs.ts
index d8cd22b..6382fc8 100644
--- a/platforms/solidjs.ts
+++ b/platforms/solidjs.ts
@@ -33,8 +33,10 @@ export const serve = <TInitialPayload = unknown>(
       routeFunction,
       {
         sdk: SDK_TELEMETRY,
-        platform: "solidjs",
-        runtime: `node@${process.version}`,
+        framework: "solidjs",
+        runtime: process.versions.bun
+          ? `bun@${process.versions.bun}/node@${process.version}`
+          : `node@${process.version}`,
       },
       options
     );
diff --git a/platforms/svelte.ts b/platforms/svelte.ts
index 157b1b3..8490336 100644
--- a/platforms/svelte.ts
+++ b/platforms/svelte.ts
@@ -24,7 +24,7 @@ export const serve = <TInitialPayload = unknown>(
       routeFunction,
       {
         sdk: SDK_TELEMETRY,
-        platform: "svelte",
+        framework: "svelte",
       },
       {
         ...options,
diff --git a/src/constants.ts b/src/constants.ts
index 8ce66b6..7b8e1d4 100644
--- a/src/constants.ts
+++ b/src/constants.ts
@@ -19,10 +19,10 @@ export const VERSION = "v0.2.3";
 export const SDK_TELEMETRY = `@upstash/workflow@${VERSION}`;
 
 export const TELEMETRY_HEADER_SDK = "Upstash-Telemetry-Sdk" as const;
-export const TELEMETRY_HEADER_PLATFORM = "Upstash-Telemetry-Platform" as const;
+export const TELEMETRY_HEADER_FRAMEWORK = "Upstash-Telemetry-Framework" as const;
 export const TELEMETRY_HEADER_RUNTIME = "Upstash-Telemetry-Runtime" as const;
 
 export const MOCK_TELEMETRY: Telemetry = {
-  platform: "mock",
+  framework: "mock",
   sdk: "mock",
 };
diff --git a/src/serve/index.ts b/src/serve/index.ts
index 818ec35..a544218 100644
--- a/src/serve/index.ts
+++ b/src/serve/index.ts
@@ -34,7 +34,7 @@ export const serveBase = <
   TResponse extends Response = Response,
 >(
   routeFunction: RouteFunction<TInitialPayload>,
-  telemetry: Telemetry,
+  telemetry?: Telemetry,
   options?: WorkflowServeOptions<TResponse, TInitialPayload>
 ): { handler: (request: TRequest) => Promise<TResponse> } => {
   // Prepares options with defaults if they are not provided.
@@ -51,7 +51,9 @@ export const serveBase = <
     env,
     retries,
     useJSONContent,
+    disableTelemetry,
   } = processOptions<TResponse, TInitialPayload>(options);
+  telemetry = disableTelemetry ? undefined : telemetry;
   const debug = WorkflowLogger.getLogger(verbose);
 
   /**
@@ -240,7 +242,7 @@ export const serve = <
     routeFunction,
     {
       sdk: SDK_TELEMETRY,
-      platform: "unknown",
+      framework: "unknown",
     },
     options
   );
diff --git a/src/serve/options.ts b/src/serve/options.ts
index a49dc43..855f96a 100644
--- a/src/serve/options.ts
+++ b/src/serve/options.ts
@@ -82,6 +82,7 @@ export const processOptions = <TResponse extends Response = Response, TInitialPa
     env: environment,
     retries: DEFAULT_RETRIES,
     useJSONContent: false,
+    disableTelemetry: false,
     ...options,
   };
 };
diff --git a/src/serve/serve.test.ts b/src/serve/serve.test.ts
index 4b73f20..b6716da 100644
--- a/src/serve/serve.test.ts
+++ b/src/serve/serve.test.ts
@@ -92,6 +92,7 @@ describe("serve", () => {
         qstashClient,
         verbose: true,
         receiver: undefined,
+        disableTelemetry: true,
       }
     );
 
@@ -146,9 +147,6 @@ describe("serve", () => {
               "upstash-method": "POST",
               "upstash-workflow-init": "true",
               "upstash-workflow-url": WORKFLOW_ENDPOINT,
-              "upstash-telemetry-platform": "unknown",
-              "upstash-telemetry-runtime": "unknown",
-              "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3",
             },
           },
         },
@@ -176,9 +174,6 @@ describe("serve", () => {
                   "upstash-workflow-runid": workflowRunId,
                   "upstash-workflow-init": "false",
                   "upstash-workflow-url": WORKFLOW_ENDPOINT,
-                  "upstash-telemetry-platform": "unknown",
-                  "upstash-telemetry-runtime": "unknown",
-                  "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3",
                 },
               },
             ],
@@ -207,9 +202,6 @@ describe("serve", () => {
                   "upstash-workflow-runid": workflowRunId,
                   "upstash-workflow-init": "false",
                   "upstash-workflow-url": WORKFLOW_ENDPOINT,
-                  "upstash-telemetry-platform": "unknown",
-                  "upstash-telemetry-runtime": "unknown",
-                  "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3",
                 },
                 body: JSON.stringify(steps[1]),
               },
@@ -386,7 +378,7 @@ describe("serve", () => {
                 "upstash-workflow-init": "false",
                 "upstash-workflow-runid": "wfr-foo",
                 "upstash-workflow-url": WORKFLOW_ENDPOINT,
-                "upstash-telemetry-platform": "unknown",
+                "upstash-telemetry-framework": "unknown",
                 "upstash-telemetry-runtime": "unknown",
                 "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3",
               },
@@ -436,7 +428,7 @@ describe("serve", () => {
                 "upstash-workflow-init": "false",
                 "upstash-workflow-runid": "wfr-bar",
                 "upstash-workflow-url": WORKFLOW_ENDPOINT,
-                "upstash-telemetry-platform": "unknown",
+                "upstash-telemetry-framework": "unknown",
                 "upstash-telemetry-runtime": "unknown",
                 "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3",
               },
@@ -484,7 +476,7 @@ describe("serve", () => {
                 "upstash-workflow-url": WORKFLOW_ENDPOINT,
                 "upstash-failure-callback": myFailureEndpoint,
                 "upstash-failure-callback-forward-upstash-workflow-is-failure": "true",
-                "upstash-telemetry-platform": "unknown",
+                "upstash-telemetry-framework": "unknown",
                 "upstash-telemetry-runtime": "unknown",
                 "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3",
               },
@@ -534,7 +526,7 @@ describe("serve", () => {
                 "upstash-workflow-url": WORKFLOW_ENDPOINT,
                 "upstash-failure-callback": WORKFLOW_ENDPOINT,
                 "upstash-failure-callback-forward-upstash-workflow-is-failure": "true",
-                "upstash-telemetry-platform": "unknown",
+                "upstash-telemetry-framework": "unknown",
                 "upstash-telemetry-runtime": "unknown",
                 "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3",
               },
@@ -735,7 +727,7 @@ describe("serve", () => {
             "Upstash-Workflow-RunId": ["wfr-bar"],
             "Upstash-Workflow-Runid": ["wfr-bar"],
             "Upstash-Workflow-Url": [WORKFLOW_ENDPOINT],
-            "Upstash-Telemetry-Platform": ["unknown"],
+            "Upstash-Telemetry-Framework": ["unknown"],
             "Upstash-Telemetry-Runtime": ["unknown"],
             "Upstash-Telemetry-Sdk": ["@upstash/workflow@v0.2.3"],
           },
@@ -1013,7 +1005,7 @@ describe("serve", () => {
               "upstash-failure-callback": WORKFLOW_ENDPOINT,
               "upstash-failure-callback-forward-upstash-workflow-is-failure": "true",
               "upstash-forward-test-header": headerValue,
-              "upstash-telemetry-platform": "unknown",
+              "upstash-telemetry-framework": "unknown",
               "upstash-telemetry-runtime": "unknown",
               "upstash-telemetry-sdk": "@upstash/workflow@v0.2.3",
             },
diff --git a/src/types.ts b/src/types.ts
index c69350d..8c11a39 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -227,6 +227,12 @@ export type WorkflowServeOptions<
    * Not part of the public API. Only available in serveBase, which is not exported.
    */
   useJSONContent?: boolean;
+  /**
+   * By default, Workflow SDK sends telemetry about SDK version, framework or runtime.
+   *
+   * Set `disableTelemetry` to disable this behavior.
+   */
+  disableTelemetry?: boolean;
 };
 
 export type Telemetry = {
@@ -237,7 +243,7 @@ export type Telemetry = {
   /**
    * platform (such as nextjs/cloudflare)
    */
-  platform: string;
+  framework: string;
   /**
    * node version
    */
diff --git a/src/workflow-requests.test.ts b/src/workflow-requests.test.ts
index 4cdd17c..4735d58 100644
--- a/src/workflow-requests.test.ts
+++ b/src/workflow-requests.test.ts
@@ -279,7 +279,7 @@ describe("Workflow Requests", () => {
             failureUrl: WORKFLOW_ENDPOINT,
             retries: 2,
             telemetry: {
-              platform: "some-platform",
+              framework: "some-platform",
               sdk: "some-sdk",
             },
           });
@@ -356,7 +356,7 @@ describe("Workflow Requests", () => {
         failureUrl: WORKFLOW_ENDPOINT,
         retries: 3,
         telemetry: {
-          platform: "some-platform",
+          framework: "some-platform",
           sdk: "some-sdk",
         },
       });
@@ -410,7 +410,7 @@ describe("Workflow Requests", () => {
         failureUrl: WORKFLOW_ENDPOINT,
         retries: 5,
         telemetry: {
-          platform: "some-platform",
+          framework: "some-platform",
           sdk: "some-sdk",
         },
       });
@@ -428,7 +428,7 @@ describe("Workflow Requests", () => {
         failureUrl: WORKFLOW_ENDPOINT,
         retries: 0,
         telemetry: {
-          platform: "some-platform",
+          framework: "some-platform",
           sdk: "some-sdk",
         },
       });
diff --git a/src/workflow-requests.ts b/src/workflow-requests.ts
index 2cd1e12..7ac3341 100644
--- a/src/workflow-requests.ts
+++ b/src/workflow-requests.ts
@@ -4,7 +4,7 @@ import { WorkflowAbort, WorkflowError } from "./error";
 import type { WorkflowContext } from "./context";
 import {
   DEFAULT_CONTENT_TYPE,
-  TELEMETRY_HEADER_PLATFORM,
+  TELEMETRY_HEADER_FRAMEWORK,
   TELEMETRY_HEADER_RUNTIME,
   TELEMETRY_HEADER_SDK,
   WORKFLOW_FAILURE_HEADER,
@@ -232,7 +232,7 @@ export const handleThirdPartyCallResult = async ({
   workflowUrl: string;
   failureUrl: WorkflowServeOptions["failureUrl"];
   retries: number;
-  telemetry: Telemetry;
+  telemetry?: Telemetry;
   debug?: WorkflowLogger;
 }): Promise<
   | Ok<"is-call-return" | "continue-workflow" | "call-will-retry" | "workflow-ended", never>
@@ -390,7 +390,7 @@ export type HeadersResponse = {
 export const getTelemetryHeaders = (telemetry: Telemetry) => {
   return {
     [TELEMETRY_HEADER_SDK]: telemetry.sdk,
-    [TELEMETRY_HEADER_PLATFORM]: telemetry.platform,
+    [TELEMETRY_HEADER_FRAMEWORK]: telemetry.framework,
     [TELEMETRY_HEADER_RUNTIME]: telemetry.runtime ?? "unknown",
   };
 };