Skip to content

chore: use threaded worker util from toolkit for msg parsing #85

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@
],
"outputCapture": "std",
"console": "internalConsole",
"runtimeVersion": "22",
"nodeVersionHint": 22,
},
{
"name": "Chunk parser prototyping",
Expand Down
33 changes: 18 additions & 15 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
"main": "index.js",
"author": "Hiro Systems PBC <[email protected]> (https://hiro.so)",
"license": "GPL-3.0",
"engines": {
"node": ">=22.0.0"
},
"scripts": {
"build": "rimraf ./dist && tsc --project tsconfig.build.json",
"build:tests": "tsc -p tsconfig.json --noEmit",
Expand Down Expand Up @@ -56,8 +59,8 @@
"@fastify/cors": "^9.0.1",
"@fastify/swagger": "^8.15.0",
"@fastify/type-provider-typebox": "^4.1.0",
"@hirosystems/api-toolkit": "^1.9.0",
"@hirosystems/salt-n-pepper-client": "^1.0.4-beta.1",
"@hirosystems/api-toolkit": "^1.9.1",
"@hirosystems/salt-n-pepper-client": "^1.1.1",
"@noble/secp256k1": "^2.2.3",
"@sinclair/typebox": "^0.28.17",
"@stacks/transactions": "^7.0.6",
Expand Down
34 changes: 11 additions & 23 deletions src/event-stream/event-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,48 +6,34 @@ import {
CoreNodeNakamotoBlockMessage,
StackerDbChunk,
} from './core-node-message';
import { logger as defaultLogger, stopwatch } from '@hirosystems/api-toolkit';
import { logger as defaultLogger, stopwatch, WorkerThreadManager } from '@hirosystems/api-toolkit';
import { ENV } from '../env';
import {
ParsedNakamotoBlock,
ParsedStackerDbChunk,
parseNakamotoBlockMsg,
parseStackerDbChunk,
} from './msg-parsing';
import { ParsedNakamotoBlock, ParsedStackerDbChunk } from './msg-parsing';
import { SignerMessagesEventPayload } from '../pg/types';
import { ThreadedParser } from './threaded-parser';
import { SERVER_VERSION } from '@hirosystems/api-toolkit';
import { EventEmitter } from 'node:events';

// TODO: move this into the @hirosystems/salt-n-pepper-client lib
function sanitizeRedisClientName(value: string): string {
const nameSanitizer = /[^!-~]+/g;
return value.trim().replace(nameSanitizer, '-');
}
import * as msgParserWorkerBlocks from './threaded-parser-worker';

export class EventStreamHandler {
db: PgStore;
logger = defaultLogger.child({ name: 'EventStreamHandler' });
eventStream: StacksEventStream;
threadedParser: ThreadedParser;
threadedParser = new WorkerThreadManager(msgParserWorkerBlocks);

readonly events = new EventEmitter<{
processedMessage: [{ msgId: string }];
}>();

constructor(opts: { db: PgStore; lastMessageId: string }) {
this.db = opts.db;
const appName = sanitizeRedisClientName(
`signer-metrics-api ${SERVER_VERSION.tag} (${SERVER_VERSION.branch}:${SERVER_VERSION.commit})`
);
const appName = `signer-metrics-api ${SERVER_VERSION.tag} (${SERVER_VERSION.branch}:${SERVER_VERSION.commit})`;
this.eventStream = new StacksEventStream({
redisUrl: ENV.REDIS_URL,
redisStreamPrefix: ENV.REDIS_STREAM_KEY_PREFIX,
eventStreamType: StacksEventStreamType.all,
lastMessageId: opts.lastMessageId,
appName,
});
this.threadedParser = new ThreadedParser();
}

async start() {
Expand All @@ -71,8 +57,9 @@ export class EventStreamHandler {
);
}
if ('signer_signature_hash' in blockMsg) {
const parsed = await this.threadedParser.parseNakamotoBlock(nakamotoBlockMsg);
await this.handleNakamotoBlockMsg(messageId, parseInt(timestamp), parsed);
const parsed = await this.threadedParser.exec({ kind: 'block', msg: nakamotoBlockMsg });
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: only a single worker thread is used at a time right now, which is helpful because it prevents the main thread/loop from blocking on cpu. However, the msg parsing could be performed faster if multiple workers were used in parallel. This would require some changes in the S&P client library to allow consumers (like this project) to process msgs in batch. Right now only one at a time is supported.

const result = parsed.result as ParsedNakamotoBlock;
await this.handleNakamotoBlockMsg(messageId, parseInt(timestamp), result);
} else {
// ignore pre-Nakamoto blocks
}
Expand All @@ -81,8 +68,9 @@ export class EventStreamHandler {

case '/stackerdb_chunks': {
const msg = body as StackerDbChunk;
const parsed = await this.threadedParser.parseStackerDbChunk(msg);
await this.handleStackerDbMsg(messageId, parseInt(timestamp), parsed);
const parsed = await this.threadedParser.exec({ kind: 'chunk', msg });
const result = parsed.result as ParsedStackerDbChunk[];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These casts always safe?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case yes, and tests would catch any accidental breakages. However, it could be made more type safe if the WorkerThreadManager implementation could understand conditional return types when the type for this .exec(...) function are inferred. That's a bit tricky though. Right now the type inference code extracts the args and response type separately, e.g.:

type WorkerPoolModuleInterface<TArgs extends unknown[], TResp> = {
  workerModule: NodeJS.Module;
  processTask: (...args: TArgs) => Promise<TResp> | TResp; // <-- any type interface involving both TArgs and TResp are broken here
}

If the worker module function was called directly, fully type safety could be achieved with:

type TaskArg =
  | { kind: 'block'; msg: CoreNodeNakamotoBlockMessage }
  | { kind: 'chunk'; msg: StackerDbChunk };

type TaskResult<T extends TaskArg> = T['kind'] extends 'block'
  ? { kind: 'block'; result: ReturnType<typeof parseNakamotoBlockMsg> }
  : { kind: 'chunk'; result: ReturnType<typeof parseStackerDbChunk> };

export function processTask<T extends TaskArg>(args: T): TaskResult<T> {
  if (args.kind === 'block') {
    return {
      kind: 'block',
      result: parseNakamotoBlockMsg(args.msg),
    } as TaskResult<T>;
  } else {
    return {
      kind: 'chunk',
      result: parseStackerDbChunk(args.msg),
    } as TaskResult<T>;
  }
}

But right now that code is broken by the WorkerPoolModuleInterface type inference. I'll make it a TODO.

await this.handleStackerDbMsg(messageId, parseInt(timestamp), result);
break;
}

Expand Down
104 changes: 14 additions & 90 deletions src/event-stream/threaded-parser-worker.ts
Original file line number Diff line number Diff line change
@@ -1,92 +1,16 @@
import * as WorkerThreads from 'node:worker_threads';
import { CoreNodeNakamotoBlockMessage, StackerDbChunk } from './core-node-message';
import {
ParsedNakamotoBlock,
ParsedStackerDbChunk,
parseNakamotoBlockMsg,
parseStackerDbChunk,
} from './msg-parsing';

export const workerFile = __filename;

export enum ThreadedParserMsgType {
NakamotoBlock = 'NakamotoBlock',
StackerDbChunk = 'StackerDbChunk',
}

interface ThreadMsg {
type: ThreadedParserMsgType;
msgId: number;
}

export interface NakamotoBlockMsgRequest extends ThreadMsg {
type: ThreadedParserMsgType.NakamotoBlock;
msgId: number;
block: CoreNodeNakamotoBlockMessage;
}

export interface NakamotoBlockMsgReply extends ThreadMsg {
type: ThreadedParserMsgType.NakamotoBlock;
msgId: number;
block: ParsedNakamotoBlock;
}

export interface StackerDbChunkMsgRequest extends ThreadMsg {
type: ThreadedParserMsgType.StackerDbChunk;
msgId: number;
chunk: StackerDbChunk;
}

export interface StackerDbChunkMsgReply extends ThreadMsg {
type: ThreadedParserMsgType.StackerDbChunk;
msgId: number;
chunk: ParsedStackerDbChunk[];
}

export type ThreadedParserMsgRequest = NakamotoBlockMsgRequest | StackerDbChunkMsgRequest;
export type ThreadedParserMsgReply = NakamotoBlockMsgReply | StackerDbChunkMsgReply;

if (!WorkerThreads.isMainThread) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const mainThreadPort = WorkerThreads.parentPort!;
mainThreadPort.on('messageerror', err => {
console.error(`Worker thread message error`, err);
});
mainThreadPort.on('message', (msg: ThreadedParserMsgRequest) => {
try {
handleWorkerMsg(msg);
} catch (err) {
console.error(`Failed to parse message: ${JSON.stringify(msg)}`);
console.error(`Error handling message from main thread`, err);
}
});
}

function handleWorkerMsg(msg: ThreadedParserMsgRequest) {
let reply: ThreadedParserMsgReply;
switch (msg.type) {
case ThreadedParserMsgType.NakamotoBlock: {
reply = {
type: ThreadedParserMsgType.NakamotoBlock,
msgId: msg.msgId,
block: parseNakamotoBlockMsg(msg.block),
} satisfies NakamotoBlockMsgReply;
break;
}
case ThreadedParserMsgType.StackerDbChunk: {
reply = {
type: ThreadedParserMsgType.StackerDbChunk,
msgId: msg.msgId,
chunk: parseStackerDbChunk(msg.chunk),
} satisfies StackerDbChunkMsgReply;
break;
}
default: {
const _exhaustiveCheck: never = msg;
throw new Error(`Unhandled message type: ${msg}`);
}
import type { CoreNodeNakamotoBlockMessage, StackerDbChunk } from './core-node-message';
import { parseNakamotoBlockMsg, parseStackerDbChunk } from './msg-parsing';

export function processTask(
args:
| { kind: 'block'; msg: CoreNodeNakamotoBlockMessage }
| { kind: 'chunk'; msg: StackerDbChunk }
) {
if (args.kind === 'block') {
return { kind: 'block', result: parseNakamotoBlockMsg(args.msg) };
} else {
return { kind: 'chunk', result: parseStackerDbChunk(args.msg) };

Check warning on line 12 in src/event-stream/threaded-parser-worker.ts

View check run for this annotation

Codecov / codecov/patch

src/event-stream/threaded-parser-worker.ts#L4-L12

Added lines #L4 - L12 were not covered by tests
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const mainThreadPort = WorkerThreads.parentPort!;
mainThreadPort.postMessage(reply);
}

export const workerModule = module;
82 changes: 0 additions & 82 deletions src/event-stream/threaded-parser.ts

This file was deleted.

Loading