Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Telemetry #49

Merged
merged 4 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion platforms/astro.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import type { APIContext, APIRoute } from "astro";

import type { PublicServeOptions, WorkflowContext } from "../src";
import { PublicServeOptions, WorkflowContext } from "../src";
import { serveBase } from "../src/serve";
import { SDK_TELEMETRY } from "../src/constants";

export function serve<TInitialPayload = unknown>(
routeFunction: (
Expand All @@ -13,6 +14,13 @@ export function serve<TInitialPayload = unknown>(
const POST: APIRoute = (apiContext) => {
const { handler } = serveBase<TInitialPayload>(
(workflowContext) => routeFunction(workflowContext, apiContext),
{
sdk: SDK_TELEMETRY,
framework: "astro",
runtime: process.versions.bun
? `bun@${process.versions.bun}/node@${process.version}`
: `node@${process.version}`,
},
options
);

Expand Down
16 changes: 12 additions & 4 deletions platforms/cloudflare.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { PublicServeOptions, RouteFunction } from "../src";
import { SDK_TELEMETRY } from "../src/constants";
import { serveBase } from "../src/serve";

export type WorkflowBindings = {
Expand Down Expand Up @@ -62,10 +63,17 @@ export const serve = <TInitialPayload = unknown>(
): { fetch: (...args: PagesHandlerArgs | WorkersHandlerArgs) => Promise<Response> } => {
const fetch = async (...args: PagesHandlerArgs | WorkersHandlerArgs) => {
const { request, env } = getArgs(args);
const { handler: serveHandler } = serveBase(routeFunction, {
env,
...options,
});
const { handler: serveHandler } = serveBase(
routeFunction,
{
sdk: SDK_TELEMETRY,
framework: "cloudflare",
},
{
env,
...options,
}
);
return await serveHandler(request);
};
return { fetch };
Expand Down
10 changes: 9 additions & 1 deletion platforms/express.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { WorkflowServeOptions, RouteFunction } from "../src";
import { SDK_TELEMETRY } from "../src/constants";
import { serveBase } from "../src/serve";
import {
Request as ExpressRequest,
Expand Down Expand Up @@ -43,7 +44,14 @@ export function serve<TInitialPayload = unknown>(

// create handler
const { handler: serveHandler } = serveBase<TInitialPayload>(
(workflowContext) => routeFunction(workflowContext),
routeFunction,
{
sdk: SDK_TELEMETRY,
framework: "express",
runtime: process.versions.bun
? `bun@${process.versions.bun}/node@${process.version}`
: `node@${process.version}`,
},
{
...options,
useJSONContent: true,
Expand Down
13 changes: 12 additions & 1 deletion platforms/h3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { defineEventHandler, readRawBody } from "h3";
import type { PublicServeOptions, RouteFunction } from "../src";
import { serveBase } from "../src/serve";
import type { IncomingHttpHeaders } from "node:http";
import { SDK_TELEMETRY } from "../src/constants";

function transformHeaders(headers: IncomingHttpHeaders): [string, string][] {
const formattedHeaders = Object.entries(headers).map(([key, value]) => [
Expand Down Expand Up @@ -37,7 +38,17 @@ export const serve = <TInitialPayload = unknown>(
method: "POST",
});

const { handler: serveHandler } = serveBase<TInitialPayload>(routeFunction, options);
const { handler: serveHandler } = serveBase<TInitialPayload>(
routeFunction,
{
sdk: SDK_TELEMETRY,
framework: "h3",
runtime: process.versions.bun
? `bun@${process.versions.bun}/node@${process.version}`
: `node@${process.version}`,
},
options
);
return await serveHandler(request);
});

Expand Down
20 changes: 14 additions & 6 deletions platforms/hono.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Context } from "hono";
import type { PublicServeOptions, RouteFunction } from "../src";
import { serveBase } from "../src/serve";
import { Variables } from "hono/types";
import { SDK_TELEMETRY } from "../src/constants";

export type WorkflowBindings = {
QSTASH_TOKEN: string;
Expand Down Expand Up @@ -32,12 +33,19 @@ export const serve = <
const environment = context.env;
const request = context.req.raw;

const { handler: serveHandler } = serveBase(routeFunction, {
// when hono is used without cf workers, it sends a DebugHTTPServer
// object in `context.env`. don't pass env if this is the case:
env: "QSTASH_TOKEN" in environment ? environment : undefined,
...options,
});
const { handler: serveHandler } = serveBase(
routeFunction,
{
sdk: SDK_TELEMETRY,
framework: "hono",
},
{
// when hono is used without cf workers, it sends a DebugHTTPServer
// object in `context.env`. don't pass env if this is the case:
env: "QSTASH_TOKEN" in environment ? environment : undefined,
...options,
}
);
return await serveHandler(request);
};
return handler;
Expand Down
18 changes: 17 additions & 1 deletion platforms/nextjs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { NextApiHandler, NextApiRequest, NextApiResponse } from "next";

import type { RouteFunction, PublicServeOptions } from "../src";
import { serveBase } from "../src/serve";
import { SDK_TELEMETRY } from "../src/constants";

/**
* Serve method to serve a Upstash Workflow in a Nextjs project
Expand All @@ -20,6 +21,11 @@ export const serve = <TInitialPayload = unknown>(
): { POST: (request: Request) => Promise<Response> } => {
const { handler: serveHandler } = serveBase<TInitialPayload, Request, Response>(
routeFunction,
{
sdk: SDK_TELEMETRY,
framework: "nextjs",
runtime: `node@${process.version}`,
},
options
);

Expand All @@ -34,7 +40,17 @@ export const servePagesRouter = <TInitialPayload = unknown>(
routeFunction: RouteFunction<TInitialPayload>,
options?: PublicServeOptions<TInitialPayload>
): { handler: NextApiHandler } => {
const { handler: serveHandler } = serveBase(routeFunction, options);
const { handler: serveHandler } = serveBase(
routeFunction,
{
sdk: SDK_TELEMETRY,
framework: "nextjs-pages",
runtime: process.versions.bun
? `bun@${process.versions.bun}/node@${process.version}`
: `node@${process.version}`,
},
options
);

const handler = async (request_: NextApiRequest, res: NextApiResponse) => {
if (request_.method?.toUpperCase() !== "POST") {
Expand Down
13 changes: 12 additions & 1 deletion platforms/solidjs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { APIEvent } from "@solidjs/start/server";

import type { PublicServeOptions, RouteFunction } from "../src";
import { serveBase } from "../src/serve";
import { SDK_TELEMETRY } from "../src/constants";

/**
* Serve method to serve a Upstash Workflow in a Nextjs project
Expand All @@ -28,7 +29,17 @@ export const serve = <TInitialPayload = unknown>(
}

// create serve handler
const { handler: serveHandler } = serveBase<TInitialPayload>(routeFunction, options);
const { handler: serveHandler } = serveBase<TInitialPayload>(
routeFunction,
{
sdk: SDK_TELEMETRY,
framework: "solidjs",
runtime: process.versions.bun
? `bun@${process.versions.bun}/node@${process.version}`
: `node@${process.version}`,
},
options
);

return await serveHandler(event.request);
};
Expand Down
16 changes: 12 additions & 4 deletions platforms/svelte.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { RequestHandler } from "@sveltejs/kit";

import type { PublicServeOptions, RouteFunction } from "../src";
import { serveBase } from "../src/serve";
import { SDK_TELEMETRY } from "../src/constants";

/**
* Serve method to serve a Upstash Workflow in a Nextjs project
Expand All @@ -19,10 +20,17 @@ export const serve = <TInitialPayload = unknown>(
}
): { POST: RequestHandler } => {
const handler: RequestHandler = async ({ request }) => {
const { handler: serveHandler } = serveBase<TInitialPayload>(routeFunction, {
...options,
useJSONContent: true,
});
const { handler: serveHandler } = serveBase<TInitialPayload>(
routeFunction,
{
sdk: SDK_TELEMETRY,
framework: "svelte",
},
{
...options,
useJSONContent: true,
}
);
return await serveHandler(request);
};

Expand Down
8 changes: 6 additions & 2 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { makeGetWaitersRequest, makeNotifyRequest } from "./utils";
import { getWorkflowRunId } from "../utils";
import { triggerFirstInvocation } from "../workflow-requests";
import { WorkflowContext } from "../context";
import { DEFAULT_RETRIES } from "../constants";

type ClientConfig = ConstructorParameters<typeof QStashClient>[0];

Expand Down Expand Up @@ -214,8 +213,13 @@ export class Client {
steps: [],
url,
workflowRunId: finalWorkflowRunId,
retries,
telemetry: undefined, // can't know workflow telemetry here
});
const result = await triggerFirstInvocation({
workflowContext: context,
telemetry: undefined, // can't know workflow telemetry here
});
const result = await triggerFirstInvocation(context, retries ?? DEFAULT_RETRIES);
if (result.isOk()) {
return { workflowRunId: finalWorkflowRunId };
} else {
Expand Down
14 changes: 14 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { Telemetry } from "./types";

export const WORKFLOW_ID_HEADER = "Upstash-Workflow-RunId";
export const WORKFLOW_INIT_HEADER = "Upstash-Workflow-Init";
export const WORKFLOW_URL_HEADER = "Upstash-Workflow-Url";
Expand All @@ -12,3 +14,15 @@ export const DEFAULT_CONTENT_TYPE = "application/json";
export const NO_CONCURRENCY = 1;
export const NOT_SET = "not-set";
export const DEFAULT_RETRIES = 3;

export const VERSION = "v0.2.3";
export const SDK_TELEMETRY = `@upstash/workflow@${VERSION}`;

export const TELEMETRY_HEADER_SDK = "Upstash-Telemetry-Sdk" as const;
export const TELEMETRY_HEADER_FRAMEWORK = "Upstash-Telemetry-Framework" as const;
export const TELEMETRY_HEADER_RUNTIME = "Upstash-Telemetry-Runtime" as const;

export const MOCK_TELEMETRY: Telemetry = {
framework: "mock",
sdk: "mock",
};
59 changes: 36 additions & 23 deletions src/context/auto-executor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { WorkflowAbort, WorkflowError } from "../error";
import type { WorkflowContext } from "./context";
import type { StepFunction, ParallelCallState, Step, WaitRequest } from "../types";
import type { StepFunction, ParallelCallState, Step, WaitRequest, Telemetry } from "../types";
import { LazyCallStep, type BaseLazyStep } from "./steps";
import { getHeaders } from "../workflow-requests";
import type { WorkflowLogger } from "../logger";
Expand All @@ -15,15 +15,24 @@ export class AutoExecutor {
private readonly nonPlanStepCount: number;
private readonly steps: Step[];
private indexInCurrentList = 0;
private telemetry?: Telemetry;

public stepCount = 0;
public planStepCount = 0;

protected executingStep: string | false = false;

constructor(context: WorkflowContext, steps: Step[], debug?: WorkflowLogger) {
constructor(
context: WorkflowContext,
steps: Step[],
telemetry?: Telemetry,
debug?: WorkflowLogger
) {
this.context = context;
this.debug = debug;
this.steps = steps;
this.telemetry = telemetry;
this.debug = debug;

this.nonPlanStepCount = this.steps.filter((step) => !step.targetStep).length;
}

Expand Down Expand Up @@ -323,18 +332,21 @@ export class AutoExecutor {
steps,
});

// must check length to be 1, otherwise was the if would return
// true for plan steps.
if (steps[0].waitEventId && steps.length === 1) {
const waitStep = steps[0];

const { headers, timeoutHeaders } = getHeaders(
"false",
this.context.workflowRunId,
this.context.url,
this.context.headers,
waitStep,
this.context.failureUrl,
this.context.retries
);
const { headers, timeoutHeaders } = getHeaders({
initHeaderValue: "false",
workflowRunId: this.context.workflowRunId,
workflowUrl: this.context.url,
userHeaders: this.context.headers,
step: waitStep,
failureUrl: this.context.failureUrl,
retries: this.context.retries,
telemetry: this.telemetry,
});

// call wait
const waitBody: WaitRequest = {
Expand Down Expand Up @@ -366,17 +378,18 @@ export class AutoExecutor {
const result = await this.context.qstashClient.batchJSON(
steps.map((singleStep, index) => {
const lazyStep = lazySteps[index];
const { headers } = getHeaders(
"false",
this.context.workflowRunId,
this.context.url,
this.context.headers,
singleStep,
this.context.failureUrl,
this.context.retries,
lazyStep instanceof LazyCallStep ? lazyStep.retries : undefined,
lazyStep instanceof LazyCallStep ? lazyStep.timeout : undefined
);
const { headers } = getHeaders({
initHeaderValue: "false",
workflowRunId: this.context.workflowRunId,
workflowUrl: this.context.url,
userHeaders: this.context.headers,
step: singleStep,
failureUrl: this.context.failureUrl,
retries: this.context.retries,
callRetries: lazyStep instanceof LazyCallStep ? lazyStep.retries : undefined,
callTimeout: lazyStep instanceof LazyCallStep ? lazyStep.timeout : undefined,
telemetry: this.telemetry,
});

// if the step is a single step execution or a plan step, we can add sleep headers
const willWait = singleStep.concurrent === NO_CONCURRENCY || singleStep.stepId === 0;
Expand Down
Loading
Loading