Skip to content

Commit

Permalink
feat: add workflow telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
CahidArda committed Dec 23, 2024
1 parent 5addd9f commit 4c4ec7b
Show file tree
Hide file tree
Showing 19 changed files with 565 additions and 167 deletions.
8 changes: 7 additions & 1 deletion platforms/astro.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import type { APIContext, APIRoute } from "astro";

import type { PublicServeOptions, WorkflowContext } from "../src";
import { PublicServeOptions, WorkflowContext } from "../src";
import { serveBase } from "../src/serve";
import { SDK_TELEMETRY } from "../src/constants";

export function serve<TInitialPayload = unknown>(
routeFunction: (
Expand All @@ -13,6 +14,11 @@ export function serve<TInitialPayload = unknown>(
const POST: APIRoute = (apiContext) => {
const { handler } = serveBase<TInitialPayload>(
(workflowContext) => routeFunction(workflowContext, apiContext),
{
sdk: SDK_TELEMETRY,
platform: "astro",
runtime: `node@${process.version}`,
},
options
);

Expand Down
16 changes: 12 additions & 4 deletions platforms/cloudflare.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { PublicServeOptions, RouteFunction } from "../src";
import { SDK_TELEMETRY } from "../src/constants";
import { serveBase } from "../src/serve";

export type WorkflowBindings = {
Expand Down Expand Up @@ -62,10 +63,17 @@ export const serve = <TInitialPayload = unknown>(
): { fetch: (...args: PagesHandlerArgs | WorkersHandlerArgs) => Promise<Response> } => {
const fetch = async (...args: PagesHandlerArgs | WorkersHandlerArgs) => {
const { request, env } = getArgs(args);
const { handler: serveHandler } = serveBase(routeFunction, {
env,
...options,
});
const { handler: serveHandler } = serveBase(
routeFunction,
{
sdk: SDK_TELEMETRY,
platform: "cloudflare",
},
{
env,
...options,
}
);
return await serveHandler(request);
};
return { fetch };
Expand Down
8 changes: 7 additions & 1 deletion platforms/express.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { WorkflowServeOptions, RouteFunction } from "../src";
import { SDK_TELEMETRY } from "../src/constants";
import { serveBase } from "../src/serve";
import {
Request as ExpressRequest,
Expand Down Expand Up @@ -43,7 +44,12 @@ export function serve<TInitialPayload = unknown>(

// create handler
const { handler: serveHandler } = serveBase<TInitialPayload>(
(workflowContext) => routeFunction(workflowContext),
routeFunction,
{
sdk: SDK_TELEMETRY,
platform: "express",
runtime: `node@${process.version}`,
},
{
...options,
useJSONContent: true,
Expand Down
11 changes: 10 additions & 1 deletion platforms/h3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { defineEventHandler, readRawBody } from "h3";
import type { PublicServeOptions, RouteFunction } from "../src";
import { serveBase } from "../src/serve";
import type { IncomingHttpHeaders } from "node:http";
import { SDK_TELEMETRY } from "../src/constants";

function transformHeaders(headers: IncomingHttpHeaders): [string, string][] {
const formattedHeaders = Object.entries(headers).map(([key, value]) => [
Expand Down Expand Up @@ -37,7 +38,15 @@ export const serve = <TInitialPayload = unknown>(
method: "POST",
});

const { handler: serveHandler } = serveBase<TInitialPayload>(routeFunction, options);
const { handler: serveHandler } = serveBase<TInitialPayload>(
routeFunction,
{
sdk: SDK_TELEMETRY,
platform: "h3",
runtime: `node@${process.version}`,
},
options
);
return await serveHandler(request);
});

Expand Down
21 changes: 15 additions & 6 deletions platforms/hono.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Context } from "hono";
import type { PublicServeOptions, RouteFunction } from "../src";
import { serveBase } from "../src/serve";
import { Variables } from "hono/types";
import { SDK_TELEMETRY } from "../src/constants";

export type WorkflowBindings = {
QSTASH_TOKEN: string;
Expand Down Expand Up @@ -32,12 +33,20 @@ export const serve = <
const environment = context.env;
const request = context.req.raw;

const { handler: serveHandler } = serveBase(routeFunction, {
// when hono is used without cf workers, it sends a DebugHTTPServer
// object in `context.env`. don't pass env if this is the case:
env: "QSTASH_TOKEN" in environment ? environment : undefined,
...options,
});
const { handler: serveHandler } = serveBase(
routeFunction,
{
sdk: SDK_TELEMETRY,
platform: "hono",
runtime: `node@${process.version}`,
},
{
// when hono is used without cf workers, it sends a DebugHTTPServer
// object in `context.env`. don't pass env if this is the case:
env: "QSTASH_TOKEN" in environment ? environment : undefined,
...options,
}
);
return await serveHandler(request);
};
return handler;
Expand Down
16 changes: 15 additions & 1 deletion platforms/nextjs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { NextApiHandler, NextApiRequest, NextApiResponse } from "next";

import type { RouteFunction, PublicServeOptions } from "../src";
import { serveBase } from "../src/serve";
import { SDK_TELEMETRY } from "../src/constants";

/**
* Serve method to serve a Upstash Workflow in a Nextjs project
Expand All @@ -20,6 +21,11 @@ export const serve = <TInitialPayload = unknown>(
): { POST: (request: Request) => Promise<Response> } => {
const { handler: serveHandler } = serveBase<TInitialPayload, Request, Response>(
routeFunction,
{
sdk: SDK_TELEMETRY,
platform: "nextjs",
runtime: `node@${process.version}`,
},
options
);

Expand All @@ -34,7 +40,15 @@ export const servePagesRouter = <TInitialPayload = unknown>(
routeFunction: RouteFunction<TInitialPayload>,
options?: PublicServeOptions<TInitialPayload>
): { handler: NextApiHandler } => {
const { handler: serveHandler } = serveBase(routeFunction, options);
const { handler: serveHandler } = serveBase(
routeFunction,
{
sdk: SDK_TELEMETRY,
platform: "nextjs-pages",
runtime: `node@${process.version}`,
},
options
);

const handler = async (request_: NextApiRequest, res: NextApiResponse) => {
if (request_.method?.toUpperCase() !== "POST") {
Expand Down
11 changes: 10 additions & 1 deletion platforms/solidjs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { APIEvent } from "@solidjs/start/server";

import type { PublicServeOptions, RouteFunction } from "../src";
import { serveBase } from "../src/serve";
import { SDK_TELEMETRY } from "../src/constants";

/**
* Serve method to serve a Upstash Workflow in a Nextjs project
Expand All @@ -28,7 +29,15 @@ export const serve = <TInitialPayload = unknown>(
}

// create serve handler
const { handler: serveHandler } = serveBase<TInitialPayload>(routeFunction, options);
const { handler: serveHandler } = serveBase<TInitialPayload>(
routeFunction,
{
sdk: SDK_TELEMETRY,
platform: "solidjs",
runtime: `node@${process.version}`,
},
options
);

return await serveHandler(event.request);
};
Expand Down
16 changes: 12 additions & 4 deletions platforms/svelte.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { RequestHandler } from "@sveltejs/kit";

import type { PublicServeOptions, RouteFunction } from "../src";
import { serveBase } from "../src/serve";
import { SDK_TELEMETRY } from "../src/constants";

/**
* Serve method to serve a Upstash Workflow in a Nextjs project
Expand All @@ -19,10 +20,17 @@ export const serve = <TInitialPayload = unknown>(
}
): { POST: RequestHandler } => {
const handler: RequestHandler = async ({ request }) => {
const { handler: serveHandler } = serveBase<TInitialPayload>(routeFunction, {
...options,
useJSONContent: true,
});
const { handler: serveHandler } = serveBase<TInitialPayload>(
routeFunction,
{
sdk: SDK_TELEMETRY,
platform: "svelte",
},
{
...options,
useJSONContent: true,
}
);
return await serveHandler(request);
};

Expand Down
8 changes: 6 additions & 2 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { makeGetWaitersRequest, makeNotifyRequest } from "./utils";
import { getWorkflowRunId } from "../utils";
import { triggerFirstInvocation } from "../workflow-requests";
import { WorkflowContext } from "../context";
import { DEFAULT_RETRIES } from "../constants";

type ClientConfig = ConstructorParameters<typeof QStashClient>[0];

Expand Down Expand Up @@ -214,8 +213,13 @@ export class Client {
steps: [],
url,
workflowRunId: finalWorkflowRunId,
retries,
telemetry: undefined, // can't know workflow telemetry here
});
const result = await triggerFirstInvocation({
workflowContext: context,
telemetry: undefined, // can't know workflow telemetry here
});
const result = await triggerFirstInvocation(context, retries ?? DEFAULT_RETRIES);
if (result.isOk()) {
return { workflowRunId: finalWorkflowRunId };
} else {
Expand Down
14 changes: 14 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { Telemetry } from "./types";

export const WORKFLOW_ID_HEADER = "Upstash-Workflow-RunId";
export const WORKFLOW_INIT_HEADER = "Upstash-Workflow-Init";
export const WORKFLOW_URL_HEADER = "Upstash-Workflow-Url";
Expand All @@ -12,3 +14,15 @@ export const DEFAULT_CONTENT_TYPE = "application/json";
export const NO_CONCURRENCY = 1;
export const NOT_SET = "not-set";
export const DEFAULT_RETRIES = 3;

export const VERSION = "v0.2.3";
export const SDK_TELEMETRY = `@upstash/workflow@${VERSION}`;

export const TELEMETRY_HEADER_SDK = "Upstash-Telemetry-Sdk" as const;
export const TELEMETRY_HEADER_PLATFORM = "Upstash-Telemetry-Platform" as const;
export const TELEMETRY_HEADER_RUNTIME = "Upstash-Telemetry-Runtime" as const;

export const MOCK_TELEMETRY: Telemetry = {
platform: "mock",
sdk: "mock",
};
59 changes: 36 additions & 23 deletions src/context/auto-executor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { WorkflowAbort, WorkflowError } from "../error";
import type { WorkflowContext } from "./context";
import type { StepFunction, ParallelCallState, Step, WaitRequest } from "../types";
import type { StepFunction, ParallelCallState, Step, WaitRequest, Telemetry } from "../types";
import { LazyCallStep, type BaseLazyStep } from "./steps";
import { getHeaders } from "../workflow-requests";
import type { WorkflowLogger } from "../logger";
Expand All @@ -15,15 +15,24 @@ export class AutoExecutor {
private readonly nonPlanStepCount: number;
private readonly steps: Step[];
private indexInCurrentList = 0;
private telemetry?: Telemetry;

public stepCount = 0;
public planStepCount = 0;

protected executingStep: string | false = false;

constructor(context: WorkflowContext, steps: Step[], debug?: WorkflowLogger) {
constructor(
context: WorkflowContext,
steps: Step[],
telemetry?: Telemetry,
debug?: WorkflowLogger
) {
this.context = context;
this.debug = debug;
this.steps = steps;
this.telemetry = telemetry;
this.debug = debug;

this.nonPlanStepCount = this.steps.filter((step) => !step.targetStep).length;
}

Expand Down Expand Up @@ -323,18 +332,21 @@ export class AutoExecutor {
steps,
});

// must check length to be 1, otherwise was the if would return
// true for plan steps.
if (steps[0].waitEventId && steps.length === 1) {
const waitStep = steps[0];

const { headers, timeoutHeaders } = getHeaders(
"false",
this.context.workflowRunId,
this.context.url,
this.context.headers,
waitStep,
this.context.failureUrl,
this.context.retries
);
const { headers, timeoutHeaders } = getHeaders({
initHeaderValue: "false",
workflowRunId: this.context.workflowRunId,
workflowUrl: this.context.url,
userHeaders: this.context.headers,
step: waitStep,
failureUrl: this.context.failureUrl,
retries: this.context.retries,
telemetry: this.telemetry,
});

// call wait
const waitBody: WaitRequest = {
Expand Down Expand Up @@ -366,17 +378,18 @@ export class AutoExecutor {
const result = await this.context.qstashClient.batchJSON(
steps.map((singleStep, index) => {
const lazyStep = lazySteps[index];
const { headers } = getHeaders(
"false",
this.context.workflowRunId,
this.context.url,
this.context.headers,
singleStep,
this.context.failureUrl,
this.context.retries,
lazyStep instanceof LazyCallStep ? lazyStep.retries : undefined,
lazyStep instanceof LazyCallStep ? lazyStep.timeout : undefined
);
const { headers } = getHeaders({
initHeaderValue: "false",
workflowRunId: this.context.workflowRunId,
workflowUrl: this.context.url,
userHeaders: this.context.headers,
step: singleStep,
failureUrl: this.context.failureUrl,
retries: this.context.retries,
callRetries: lazyStep instanceof LazyCallStep ? lazyStep.retries : undefined,
callTimeout: lazyStep instanceof LazyCallStep ? lazyStep.timeout : undefined,
telemetry: this.telemetry,
});

// if the step is a single step execution or a plan step, we can add sleep headers
const willWait = singleStep.concurrent === NO_CONCURRENCY || singleStep.stepId === 0;
Expand Down
5 changes: 4 additions & 1 deletion src/context/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type {
CallResponse,
CallSettings,
NotifyStepResponse,
Telemetry,
WaitEventOptions,
WaitStepResponse,
WorkflowClient,
Expand Down Expand Up @@ -162,6 +163,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
initialPayload,
env,
retries,
telemetry,
}: {
qstashClient: WorkflowClient;
workflowRunId: string;
Expand All @@ -173,6 +175,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
initialPayload: TInitialPayload;
env?: Record<string, string | undefined>;
retries?: number;
telemetry?: Telemetry;
}) {
this.qstashClient = qstashClient;
this.workflowRunId = workflowRunId;
Expand All @@ -184,7 +187,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
this.env = env ?? {};
this.retries = retries ?? DEFAULT_RETRIES;

this.executor = new AutoExecutor(this, this.steps, debug);
this.executor = new AutoExecutor(this, this.steps, telemetry, debug);
}

/**
Expand Down
Loading

0 comments on commit 4c4ec7b

Please sign in to comment.