chore: use threaded worker util from toolkit for msg parsing #85

Open
wants to merge 4 commits into
base: main
from

zone117x
Member

@zone117x zone117x commented Jun 4, 2025

Use the WorkerThread code from the toolkit library (which was originally developed in this repo then extracted out).

Note: only a single worker thread is used at a time right now, which is still helpful because it keeps CPU-bound parsing from blocking the main thread/event loop. However, msg parsing could be faster if multiple workers were used in parallel. This would require some changes in the S&P client library to allow consumers (like this project) to process msgs in batches. Right now only one at a time is supported.
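
A rough sketch of what batched, parallel parsing could look like if the client library handed over several msgs at once. Everything here is hypothetical: `ParseFn` and `parseBatch` are illustrative names, each worker is modeled as a plain async function rather than a real worker thread, and none of this reflects the toolkit's or the client library's actual API.

```typescript
// Hypothetical: a worker is modeled as an async function that parses one msg.
type ParseFn<TMsg, TParsed> = (msg: TMsg) => Promise<TParsed>;

// Parse a batch with several workers while keeping results in input order.
async function parseBatch<TMsg, TParsed>(
  msgs: readonly TMsg[],
  workers: readonly ParseFn<TMsg, TParsed>[]
): Promise<TParsed[]> {
  if (workers.length === 0) {
    throw new Error('At least one worker is required');
  }
  const results = new Array<TParsed>(msgs.length);
  let next = 0;
  // Each worker claims the next unprocessed index until the batch is drained.
  const drain = async (parse: ParseFn<TMsg, TParsed>): Promise<void> => {
    while (next < msgs.length) {
      const index = next++;
      results[index] = await parse(msgs[index]);
    }
  };
  await Promise.all(workers.map(worker => drain(worker)));
  return results;
}
```

Results are written back by index, so downstream handlers would still see msgs in their original order even when a later msg finishes parsing first.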

github-actions bot commented Jun 4, 2025

Vercel deployment URL: https://signer-metrics-vuiwsbd41-hirosystems.vercel.app 🚀

@@ -491,20 +490,18 @@ export class PgStore extends BasePgStore {
}

async getSignerDataForBlock({ sql, blockId }: { sql: PgSqlClient; blockId: BlockIdParam }) {
let blockFilter: Fragment;
Member Author

Workaround for a type error caused by the api-toolkit using an older version of the postgres library.
TODO: update the postgres library in the toolkit repo

codecov bot commented Jun 4, 2025

Codecov Report

Attention: Patch coverage is 43.24324% with 21 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| src/pg/pg-store.ts | 0.00% | 12 Missing ⚠️ |
| src/event-stream/threaded-parser-worker.ts | 35.71% | 9 Missing ⚠️ |

Contributor

@janniks janniks left a comment

Nice code reduction 👏

- const parsed = await this.threadedParser.parseStackerDbChunk(msg);
- await this.handleStackerDbMsg(messageId, parseInt(timestamp), parsed);
+ const parsed = await this.threadedParser.exec({ kind: 'chunk', msg });
+ const result = parsed.result as ParsedStackerDbChunk[];
Contributor

Are these casts always safe?

Member Author

In this case yes, and tests would catch any accidental breakage. However, it could be made more type-safe if the WorkerThreadManager implementation understood conditional return types when the types for this .exec(...) function are inferred. That's a bit tricky though. Right now the type inference code extracts the args and response types separately, e.g.:

type WorkerPoolModuleInterface<TArgs extends unknown[], TResp> = {
  workerModule: NodeJS.Module;
  processTask: (...args: TArgs) => Promise<TResp> | TResp; // <-- any type interface involving both TArgs and TResp is broken here
}

If the worker module function were called directly, full type safety could be achieved with:

type TaskArg =
  | { kind: 'block'; msg: CoreNodeNakamotoBlockMessage }
  | { kind: 'chunk'; msg: StackerDbChunk };

type TaskResult<T extends TaskArg> = T['kind'] extends 'block'
  ? { kind: 'block'; result: ReturnType<typeof parseNakamotoBlockMsg> }
  : { kind: 'chunk'; result: ReturnType<typeof parseStackerDbChunk> };

export function processTask<T extends TaskArg>(args: T): TaskResult<T> {
  if (args.kind === 'block') {
    return {
      kind: 'block',
      result: parseNakamotoBlockMsg(args.msg),
    } as TaskResult<T>;
  } else {
    return {
      kind: 'chunk',
      result: parseStackerDbChunk(args.msg),
    } as TaskResult<T>;
  }
}

But right now that code is broken by the WorkerPoolModuleInterface type inference. I'll make it a TODO.
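
Until that TODO is resolved, one possible interim approach is to hide the loosely typed call behind a small overloaded wrapper, so call sites no longer need their own casts. This is only a sketch under stated assumptions: `BlockMsg`, `ChunkMsg`, `ParsedBlock`, `ParsedChunk`, `UntypedExec`, and `makeTypedParser` are hypothetical stand-ins, not the repo's real types or the toolkit's API.

```typescript
// Hypothetical stand-ins for the real msg and parsed-result types.
type BlockMsg = { blockHash: string };
type ChunkMsg = { chunkData: string[] };
type ParsedBlock = { hash: string };
type ParsedChunk = { data: string };

type BlockTask = { kind: 'block'; msg: BlockMsg };
type ChunkTask = { kind: 'chunk'; msg: ChunkMsg };
type TaskArg = BlockTask | ChunkTask;
type TaskResult =
  | { kind: 'block'; result: ParsedBlock }
  | { kind: 'chunk'; result: ParsedChunk[] };

// Stand-in for an exec signature whose args and response types are inferred
// separately, so the link between input kind and output kind is lost.
type UntypedExec = (task: TaskArg) => Promise<TaskResult>;

// Overloads restore the input/output pairing for callers, and a runtime
// check guards against the worker returning the wrong kind.
function makeTypedParser(exec: UntypedExec) {
  function parse(task: BlockTask): Promise<ParsedBlock>;
  function parse(task: ChunkTask): Promise<ParsedChunk[]>;
  async function parse(task: TaskArg): Promise<ParsedBlock | ParsedChunk[]> {
    const resp = await exec(task);
    if (resp.kind !== task.kind) {
      throw new Error(`Expected '${task.kind}' result but got '${resp.kind}'`);
    }
    return resp.result;
  }
  return parse;
}
```

With a wrapper like this, `parse({ kind: 'chunk', msg })` would resolve to `ParsedChunk[]` without an `as` cast at the call site. The trade-off is that the overload list has to be kept in sync with the worker's task kinds by hand.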

@@ -71,8 +57,9 @@ export class EventStreamHandler {
);
}
if ('signer_signature_hash' in blockMsg) {
- const parsed = await this.threadedParser.parseNakamotoBlock(nakamotoBlockMsg);
- await this.handleNakamotoBlockMsg(messageId, parseInt(timestamp), parsed);
+ const parsed = await this.threadedParser.exec({ kind: 'block', msg: nakamotoBlockMsg });
Member Author

Note: only a single worker thread is used at a time right now, which is still helpful because it keeps CPU-bound parsing from blocking the main thread/event loop. However, msg parsing could be faster if multiple workers were used in parallel. This would require some changes in the S&P client library to allow consumers (like this project) to process msgs in batches. Right now only one at a time is supported.

2 participants