Skip to content

Commit

Permalink
Add Workflow binding module
Browse files Browse the repository at this point in the history
  • Loading branch information
sidharthachatterjee committed Sep 9, 2024
1 parent 616dfc5 commit e27705f
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 9 deletions.
169 changes: 165 additions & 4 deletions src/cloudflare/internal/workflows-api.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,173 @@
// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

export class NonRetryableError extends Error {
// `__brand` is needed for Workflows' engine to validate if the user returned a NonRetryableError
// this provides better DX because they can just extend NonRetryableError for their own Errors
// and override name.
// This needs to be a public field because it's serialized over RPC to the Workflows' engine
// `__brand` is needed for engine to validate if the user returned a NonRetryableError
// This provides a better DX because they can just extend NonRetryableError for their own Errors
// and override name
// Private fields are not serialized over RPC
public readonly __brand: string = 'NonRetryableError';

public constructor(message: string, name = 'NonRetryableError') {
super(message);
this.name = name;
}
}

type InstanceStatus = {
status:
| 'queued'
| 'running'
| 'paused'
| 'errored'
| 'terminated'
| 'complete'
| 'unknown';
error?: string;
output?: object;
};

interface Fetcher {
fetch: typeof fetch;
}

async function callFetcher<T>(
fetcher: Fetcher,
path: string,
body: object
): Promise<T> {
const response = await fetcher.fetch(`http://workflow-binding.local${path}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Version': '1',
},
body: JSON.stringify(body),
});

// TODO: Throw if result is not 200 or ok is false

return response.json() as Promise<T>;
}

export class Instance {
private readonly fetcher: Fetcher;
private readonly instanceId: string;

public constructor(instanceId: string, fetcher: Fetcher) {
this.instanceId = instanceId;
this.fetcher = fetcher;
}

public async pause(): Promise<void> {
try {
await callFetcher(this.fetcher, '/pause-instance', {
instanceId: this.instanceId,
});
} catch (_error) {
throw new WorkflowError(
'Failed to pause Workflow instance ' + this.instanceId
);
}
}
public async resume(): Promise<void> {
try {
await callFetcher(this.fetcher, '/resume-instance', {
instanceId: this.instanceId,
});
} catch (_error) {
throw new WorkflowError(
'Failed to resume Workflow instance ' + this.instanceId
);
}
}

public async abort(): Promise<void> {
try {
await callFetcher(this.fetcher, '/abort-instance', {
instanceId: this.instanceId,
});
} catch (_error) {
throw new WorkflowError(
'Failed to abort Workflow instance ' + this.instanceId
);
}
}

public async restart(): Promise<void> {
try {
await callFetcher(this.fetcher, '/restart-instance', {
instanceId: this.instanceId,
});
} catch (_error) {
throw new WorkflowError(
'Failed to restart Workflow instance ' + this.instanceId
);
}
}

public async status(): Promise<InstanceStatus> {
try {
const result = await callFetcher<InstanceStatus>(
this.fetcher,
'/instance-status',
{
instanceId: this.instanceId,
}
);
return result;
} catch (_error) {
throw new WorkflowError(
'Failed to get status for Workflow instance ' + this.instanceId
);
}
}
}

export class WorkflowError extends Error {
public constructor(message: string, name = 'WorkflowError') {
super(message);
this.name = name;
}
}

export class Workflow {
private readonly fetcher: Fetcher;

public constructor(fetcher: Fetcher) {
this.fetcher = fetcher;
}

public async get(name: string): Promise<Instance> {
try {
const result = await callFetcher<{ instanceId: string }>(
this.fetcher,
'/get-instance',
{ name }
);

return new Instance(result.instanceId, this.fetcher);
} catch (_error) {
throw new WorkflowError('Failed to get Workflow instance for ' + name);
}
}

public async create(name: string, params: object): Promise<Instance> {
try {
const result = await callFetcher<{ instanceId: string }>(
this.fetcher,
'/create-instance',
{ name, params }
);

return new Instance(result.instanceId, this.fetcher);
} catch (_error) {
throw new WorkflowError('Failed to create Workflow instance for ' + name);
}
}
}

export default function makeBinding(env: { fetcher: Fetcher }): Workflow {
return new Workflow(env.fetcher);
}
9 changes: 4 additions & 5 deletions types/defines/workflows.d.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
/**
* NonRetryableError allows for a Workflow to throw a "fatal" error as in,
* an error that makes the instance fail immediately without triggering a retry.
* NonRetryableError allows for a Workflow to throw a "fatal" error,
* an error that makes the instance fail immediately without triggering a retry
*/
export class NonRetryableError extends Error {
// __brand has been explicity omitted because it's a internal brand used for
// the Workflows' engine and user's shouldn't be able to override it
// (at least, in a direct way)
// `__brand` is used to differentiate between `NonRetryableError` and `Error`
// It is omitted from the constructor because users should not be able to set it

public constructor(message: string, name?: string);
}

0 comments on commit e27705f

Please sign in to comment.