Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC #166

Closed
wants to merge 31 commits into from
Closed

RPC #166

wants to merge 31 commits into from

Conversation

zenhack
Copy link

@zenhack zenhack commented Aug 24, 2021

I've rebased @fasterthanlime's rpc work on top of master, fixing some merge conflicts. Tests pass.

I have another branch I've been working off of where I switch the bluebird promises over to standard js promises, and fixed a handful of bugs I found trying to apply this to real-world schema, as well as some bugs it looks like you've separately fixed on master. I plan to cherry-pick those where relevant, but wanted to get you in the loop sooner rather than later.

fasterthanlime and others added 30 commits August 19, 2021 01:54
This was giving us this type error:

packages/capnp-ts/test/unit/rpc/queue.spec.ts:2:21 - error TS2307: Cannot find module '../../util' or its corresponding type declarations.

2 import { tap } from "../../util";

(as well as some cascading errors about an any type).
We do, in fact, now generate code for interfaces.

Not sure whether I botched the merge or Amos never deleted this.
...to where all the others are.
@jdiaz5513
Copy link
Owner

Look a quick look and it looks amazing so far. Sorry it's taken this long to get around to - it's been a tough year.

I have a renewed interest (and free time) available to see this through, finally - it looks like there are some use cases for wasm and elsewhere that would really benefit from this kind of RPC mechanism.

I am about to land a somewhat big patch in the next day or so that will add native bigint support for 64 bit integer values. I may try to rebase this work on top of that and see what happens from there.

I switch the bluebird promises over to standard js promises

Bluebird is already gone on master so I'm going to assume this comment is stale.

fixed a handful of bugs I found trying to apply this to real-world schema

I'll be especially interested to work with you on fixing these. I suspect some will be resolved with the aforementioned bigint patch.

@zenhack
Copy link
Author

zenhack commented Jan 6, 2022 via email

@jdiaz5513
Copy link
Owner

I now have an RPC branch with the bigint changes merged in. Tests do indeed still pass, but most of the new code isn't covered. I tried creating a test case based off the Go RPC example (HashFactory) and was unsuccessful - it's complaining about a missing target and I see there might be an answer missing from the client's connection object. It's entirely possible I'm not using the RPC classes correctly, which emphasizes how important it will be to document this well.

Seems like there are quite a few sharp edges (unimplemented methods and TODO comments) remaining, so maybe the implementation is still too incomplete to be usable. Were you successful with using it at all?

Here's the test case I wrote, maybe you can see if I'm doing something wrong:

hash-factory.capnp

@0xbba26a2caa3411e8;

interface HashFactory {
    newSha1 @0 () -> (hash :Hash);
}

interface Hash {
    write @0 (data :Data) -> ();
    sum @1 () -> (hash :Data);
}

hash-factory.spec.ts

import {
  Hash,
  HashFactory,
  Hash$Server$Target,
  HashFactory$Server$Target,
  HashFactory_NewSha1$Params,
  HashFactory_NewSha1$Results,
  Hash_Sum$Params,
  Hash_Sum$Results,
  Hash_Write$Params,
} from "./hash-factory.capnp.js";
import { createHash } from "crypto";
import { Conn, Message, RPCMessage, Transport } from "capnp-ts";
import { MessageChannel, MessagePort } from "worker_threads";
import tap from "tap";

class HashFactoryServer implements HashFactory$Server$Target {
  newSha1(_params: HashFactory_NewSha1$Params, results: HashFactory_NewSha1$Results): Promise<void> {
    results.setHash(new Hash.Server(new HashServer()).client());
    return Promise.resolve();
  }
}

class HashServer implements Hash$Server$Target {
  constructor(private hash = createHash("sha1")) {}

  sum(_params: Hash_Sum$Params, results: Hash_Sum$Results): Promise<void> {
    const digest = this.hash.digest();
    results.initHash(digest.length).copyBuffer(digest);
    return Promise.resolve();
  }

  write(params: Hash_Write$Params): Promise<void> {
    return new Promise((resolve, reject) => {
      const data = params.getData().toDataView();
      this.hash.write(data, undefined, (err) => {
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      });
    });
  }
}

void tap.test("Hash Factory demo", async (t) => {
  const handleTransportClose = (err: Error) => {
    if (err instanceof TransportClosedError) {
      return t.pass();
    }
    throw err;
  };

  function server(port: MessagePort): Conn {
    const conn = new Conn(new MessageChannelTransport(port));
    conn.addExport(new HashFactory.Server(new HashFactoryServer()));
    conn.onError = handleTransportClose;
    return conn;
  }

  async function client(port: MessagePort): Promise<void> {
    const conn = new Conn(new MessageChannelTransport(port));
    conn.onError = handleTransportClose;
    const hashFactory = new HashFactory.Client(conn.bootstrap());
    const hasher = hashFactory.newSha1().getHash();
    hasher.write((params) => {
      const buf = str2Buf("hello ");
      params.initData(buf.byteLength).copyBuffer(buf);
    });
    hasher.write((params) => {
      const buf = str2Buf("world");
      params.initData(buf.byteLength).copyBuffer(buf);
    });
    try {
      const sum = await hasher.sum().promise();
      t.equal(sum.getHash(), null /* TODO: compare with expected hash value */);
    } finally {
      conn.shutdown(new Error("done"));
    }
  }

  const c = new MessageChannel();
  const srv = server(c.port1);
  await client(c.port2);
  srv.shutdown(new Error("done"));
  t.end();
});

class MessageChannelTransport implements Transport {
  private resolve: (msg: RPCMessage) => void = () => null;
  private reject: (err: unknown) => void = () => null;

  constructor(public port: MessagePort) {
    this.port.on("message", this.readMessage);
    this.port.on("messageerror", this.handleMessageError);
  }

  private handleMessageError = (err: unknown): void => {
    this.reject(err);
  };

  private readMessage = (buf: ArrayBuffer): void => {
    try {
      this.resolve(new Message(buf, false).getRoot(RPCMessage));
    } catch (err) {
      this.reject(err);
    }
  };

  sendMessage(msg: RPCMessage): void {
    const m = new Message();
    m.setRoot(msg);
    const buf = m.toArrayBuffer();
    this.port.postMessage(buf, [buf]);
  }

  recvMessage(): Promise<RPCMessage> {
    return new Promise((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }

  close(): void {
    this.port.off("message", this.readMessage);
    this.port.off("messageerror", this.handleMessageError);
    this.reject(new TransportClosedError());
  }
}

class TransportClosedError extends Error {
  constructor() {
    super("transport closed");
  }
}

function str2Buf(s: string) {
  const buf = new ArrayBuffer(s.length * 2);
  const view = new Uint16Array(buf);
  for (let i = 0; i < s.length; i++) {
    view[i] = s.charCodeAt(i);
  }
  return buf;
}

@zenhack
Copy link
Author

zenhack commented Jan 6, 2022

It wouldn't greatly surprise me to find that there are still gaps. Most of the issues I found were with errors in the generated code -- things like missing imports for certain edge cases, not all of which were rpc related.

If he's interesting in weighing in, I @fasterthanlime might be able to enlighten us as to where this was when he left it, and whether there are any specific known issues worth calling out.

@CGamesPlay reported having gotten it working, see his comment here: #92 (comment).

I'll see if I can play with your example soonish.

@jdiaz5513
Copy link
Owner

I can see now there are unimplemented pieces that affect bootstrapping an interface and pipelining for sure (FINISH and BOOTSTRAP messages are dropped). I began implementing those but started to hit a wall so I'm backing off for now. Pipelining is a level 2 RPC feature and does not necessarily have to land right now.

What's here might be enough for level 0 - I plan on trying a smaller test case to see if that works, and landing the PR (unimplemented sharp edges and everything) if so.

Long term I'm not sure that copying the implementation details of the Go port is the right call; TypeScript is a far more expressive language and there's likely opportunity here to simplify (and possibly speed up) the code a great deal. At least this patch gets things started, though.

@zenhack
Copy link
Author

zenhack commented Jan 15, 2022 via email

@jdiaz5513
Copy link
Owner

I managed to fix the missing bits and a few other nasty bugs, and in the process of troubleshooting cleaned up the interfaces a good bit. Consuming interfaces is much nicer: https://github.com/jdiaz5513/capnp-ts/pull/169/files#diff-dfb224d5f2d329b0fe4014703d43f62084cf5d0b13ae406965015880670bad0aR7-R52

Level 0 RPC appears to be working great and some of level 1 might be as well. That PR will close out this one once I get a chance to clean it up + take care of CI. Thanks for all the help!

@zenhack
Copy link
Author

zenhack commented Jan 27, 2022 via email

@zenhack zenhack closed this Jan 27, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants