-
Notifications
You must be signed in to change notification settings - Fork 347
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adding class for new pipelines product
- Loading branch information
Showing
8 changed files
with
310 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
// Copyright (c) 2024 Cloudflare, Inc. | ||
// Licensed under the Apache 2.0 license found in the LICENSE file or at: | ||
// https://opensource.org/licenses/Apache-2.0 | ||
|
||
import entrypoints from 'cloudflare-internal:workers' | ||
|
||
async function* readLines(stream: ReadableStream<string>): AsyncGenerator<string> { | ||
let start, end = 0 | ||
let partial = '' | ||
|
||
// @ts-ignore | ||
for await (const chunk of stream) { | ||
const full = chunk+partial as string | ||
for (const char of full) { | ||
if (char === '\n') { | ||
yield chunk.substring(start, end) | ||
end++ | ||
start = end | ||
} else { | ||
end++ | ||
} | ||
} | ||
|
||
partial = chunk.substring(start, end) | ||
start = 0 | ||
end = 0 | ||
} | ||
|
||
if (partial.length > 0) { | ||
yield partial | ||
} | ||
} | ||
|
||
type Batch = { | ||
id: string // unique identifier for the batch | ||
shard: string // assigned shard | ||
ts: number // timestamp of the event | ||
|
||
format: Format | ||
size: { | ||
bytes: number | ||
rows: number | ||
} | ||
data: unknown | ||
} | ||
|
||
type JsonStream = Batch & { | ||
format: Format.JSON_STREAM, | ||
data: ReadableStream<Uint8Array> | ||
} | ||
|
||
enum Format { | ||
JSON_STREAM = 'json_stream', //jsonl | ||
} | ||
|
||
export class PipelineTransformImpl extends entrypoints.WorkerEntrypoint { | ||
#batch?: Batch | ||
#initalized: boolean = false | ||
|
||
constructor(ctx: unknown, env: unknown) { | ||
super(ctx, env) | ||
} | ||
|
||
// stub overriden on the sub class | ||
// @ts-ignore | ||
public async transformJson(data: object[]): Promise<object[]> { | ||
throw new Error('should be implemented by parent') | ||
} | ||
|
||
// called by the dispatcher which then calls the subclass methods | ||
// @ts-ignore | ||
private async _ping(): Promise<void> { | ||
// making sure the function was overriden by an implementing subclass | ||
if (this.transformJson !== PipelineTransformImpl.prototype.transformJson) { | ||
return | ||
} else { | ||
throw new Error('the transformJson method must be overridden by the PipelineTransform subclass') | ||
} | ||
} | ||
|
||
// called by the dispatcher which then calls the subclass methods | ||
// the reason this is typescript private and not javascript private is that this must be | ||
// able to be called by the dispatcher but should not be called by the class implementer | ||
// @ts-ignore | ||
private async _transform(batch: Batch): Promise<JsonStream> { | ||
if (this.#initalized) { | ||
throw new Error('pipeline entrypoint has already been initialized') | ||
} | ||
|
||
this.#batch = batch | ||
this.#initalized = true | ||
|
||
switch (this.#batch!.format) { | ||
case Format.JSON_STREAM: | ||
const data = await this.#readJsonStream() | ||
const transformed = await this.transformJson(data) | ||
return this.#sendJson(transformed) | ||
default: | ||
throw new Error('unsupported batch format') | ||
} | ||
} | ||
|
||
async #readJsonStream(): Promise<object[]> { | ||
if (this.#batch!.format !== Format.JSON_STREAM) { | ||
throw new Error(`expected JSON_STREAM not ${this.#batch!.format}`) | ||
} | ||
|
||
const batch = this.#batch!.data as ReadableStream<Uint8Array> | ||
const decoder = batch.pipeThrough(new TextDecoderStream()) | ||
|
||
const data: object[] = [] | ||
for await (const line of readLines(decoder)) { | ||
data.push(JSON.parse(line)) | ||
} | ||
|
||
return data | ||
} | ||
|
||
async #sendJson(data: object[]): Promise<JsonStream> { | ||
let written = 0 | ||
const encoder = new TextEncoder() | ||
const readable = new ReadableStream<Uint8Array>({ | ||
start(controller) { | ||
for (const obj of data) { | ||
const encoded = encoder.encode(`${JSON.stringify(obj)}\n`) | ||
written += encoded.length | ||
controller.enqueue(encoded) | ||
} | ||
|
||
controller.close() | ||
} | ||
}) | ||
|
||
return { | ||
id: this.#batch!.id, | ||
shard: this.#batch!.shard, | ||
ts: this.#batch!.ts, | ||
format: Format.JSON_STREAM, | ||
size: { | ||
bytes: written, | ||
rows: data.length | ||
}, | ||
data: readable | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
load("//:build/wd_test.bzl", "wd_test") | ||
|
||
wd_test( | ||
src = "transform.wd-test", | ||
args = ["--experimental"], | ||
data = glob(["*.js"]), | ||
) |
108 changes: 108 additions & 0 deletions
108
src/cloudflare/internal/test/pipeline-transform/transform-test.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
// @ts-nocheck | ||
// Copyright (c) 2024 Cloudflare, Inc. | ||
// Licensed under the Apache 2.0 license found in the LICENSE file or at: | ||
// https://opensource.org/licenses/Apache-2.0 | ||
|
||
// @ts-ignore | ||
import assert from 'node:assert' | ||
import { PipelineTransform } from 'cloudflare:pipeline-transform' | ||
|
||
// this is how "Pipeline" would be implemented by the user | ||
const customTransform = class MyEntrypoint extends PipelineTransform { | ||
/** | ||
* @param {any} batch | ||
* @override | ||
*/ | ||
async transformJson(batch) { | ||
for (const obj of batch) { | ||
obj.dispatcher = 'was here!' | ||
} | ||
|
||
return batch | ||
} | ||
} | ||
|
||
const lines = [ | ||
`${JSON.stringify({ name: 'jimmy', age: '42' })}\n`, | ||
`${JSON.stringify({ name: 'jonny', age: '9' })}\n`, | ||
`${JSON.stringify({ name: 'joey', age: '108' })}\n`, | ||
] | ||
|
||
function newBatch() { | ||
return { | ||
id: 'test', | ||
shard: '0', | ||
ts: Date.now(), | ||
format: 'json_stream', | ||
data: new ReadableStream({ | ||
start(controller) { | ||
const encoder = new TextEncoder() | ||
for (const line of lines) { | ||
controller.enqueue(encoder.encode(line)) | ||
} | ||
controller.close() | ||
} | ||
}) | ||
} | ||
} | ||
|
||
// bazel test //src/cloudflare/internal/test/pipeline-transform:transform --test_output=errors --sandbox_debug | ||
export const tests = { | ||
async test(ctr, env, ctx) { | ||
{ | ||
// should fail dispatcher test call when PipelineTransform class not extended | ||
const transformer = new PipelineTransform(ctx, env) | ||
await assert.rejects(transformer._ping(), (err) => { | ||
assert.strictEqual(err.message, 'the transformJson method must be overridden by the PipelineTransform subclass') | ||
return true | ||
}) | ||
} | ||
|
||
{ | ||
// should correctly handle dispatcher test call | ||
const transform = new customTransform(ctx, env) | ||
await assert.doesNotReject(transform._ping()) | ||
} | ||
|
||
{ | ||
// should return mutated batch | ||
const transformer = new customTransform(ctx, env) | ||
const batch = newBatch() | ||
|
||
const result = await transformer._transform(batch) | ||
assert.equal(true, result.data instanceof ReadableStream) | ||
|
||
const reader = result.data | ||
.pipeThrough(new TextDecoderStream()) | ||
.getReader() | ||
|
||
let data = '' | ||
while (true) { | ||
const { done, value } = await reader.read() | ||
if (done) { | ||
break | ||
} else { | ||
data += value | ||
} | ||
} | ||
|
||
assert.notEqual(data.length, 0) | ||
|
||
const objects = [] | ||
const resultLines = data.split('\n') | ||
resultLines.pop() | ||
for (const line of resultLines) { | ||
objects.push(JSON.parse(line)) | ||
} | ||
|
||
let index = 0 | ||
for (const obj of objects) { | ||
assert.equal(obj.dispatcher, 'was here!') | ||
delete obj.dispatcher | ||
|
||
assert.equal(`${JSON.stringify(obj)}\n`, lines[index]) | ||
index++ | ||
} | ||
} | ||
}, | ||
} |
15 changes: 15 additions & 0 deletions
15
src/cloudflare/internal/test/pipeline-transform/transform.wd-test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
using Workerd = import "/workerd/workerd.capnp"; | ||
|
||
const unitTests :Workerd.Config = ( | ||
services = [ | ||
( name = "transform-test", | ||
worker = ( | ||
modules = [ | ||
(name = "worker", esModule = embed "transform-test.js") | ||
], | ||
compatibilityDate = "2024-08-15", | ||
compatibilityFlags = ["nodejs_compat", "experimental"] | ||
) | ||
) | ||
] | ||
); |
12 changes: 12 additions & 0 deletions
12
src/cloudflare/internal/test/pipeline-transform/tsconfig.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
{ | ||
"extends": "../../../tsconfig.json", | ||
"compilerOptions": { | ||
"checkJs": true, | ||
"noEmit": true | ||
}, | ||
"include": [ | ||
"*.ts", | ||
"*.js", | ||
], | ||
"files": ["../../../../../types/defines/pipeline-transform.d.ts"] | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
import { PipelineTransformImpl } from 'cloudflare-internal:pipeline-transform' | ||
|
||
export const PipelineTransform = PipelineTransformImpl |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
// Copyright (c) 2022-2023 Cloudflare, Inc. | ||
// Licensed under the Apache 2.0 license found in the LICENSE file or at: | ||
// https://opensource.org/licenses/Apache-2.0 | ||
|
||
import { WorkerEntrypoint } from "cloudflare:workers"; | ||
|
||
/** | ||
* The Pipelines class is called by the Pipelines product to support transformations | ||
*/ | ||
declare abstract class PipelineTransform extends WorkerEntrypoint { | ||
/** | ||
* transformJson recieves an array of javascript objects which can be | ||
* mutated and returned to the pipeline | ||
* @param data The data to be mutated | ||
* @returns A promise containing the mutated data | ||
*/ | ||
public transformJson(data: object[]): Promise<object[]> | ||
} |