Description
I think this is a perfectly reasonable interaction:
```js
import { pbStream } from 'it-pb-stream'

const stream = await node.dialProtocol(remotePeer, '/my/protocol/1.0.0')
const pb = pbStream(stream)
pb.writePB({ hello: 'world' }, Message)
stream.close()
```
The thing is, it doesn't work - the following error is thrown on the remote:
```
Error: stream ended before 1 bytes became available
    at file:///Users/alex/Documents/Workspaces/libp2p/js-libp2p/node_modules/it-reader/src/index.ts:70:9
    at Object.next (file:///Users/alex/Documents/Workspaces/libp2p/js-libp2p/node_modules/@libp2p/multistream-select/src/multistream.ts:62:23)
    ... more stack trace here
```
Behind the scenes `it-pb-stream` has passed an `it-pushable` to `stream.sink`. `pb.writePB` pushes some bytes onto the pushable, but because `stream.sink` consumes the passed source using `for await..of`, those bytes won't be read until the microtask queue is processed, since everything is a promise. That never happens in time because `stream.close` is synchronous and causes the `CLOSE` message to be sent before the `DATA` message - it's queued for sending in the current macrotask.
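The ordering is easy to reproduce outside of libp2p. Here's a minimal sketch, assuming only `it-pushable` from npm - the `console.log` calls are stand-ins for the muxer queuing frames:

```ts
import { pushable } from 'it-pushable'

const source = pushable()

// models stream.sink: the for await..of body only runs when the
// microtask queue is processed
void (async () => {
  for await (const buf of source) {
    console.log('send DATA message', buf)
  }
})()

// models pb.writePB: the bytes are queued but not yet consumed
source.push(Uint8Array.from([0x68, 0x69]))

// models stream.close: runs synchronously, before the sink gets a turn
console.log('send CLOSE message')
```

Running this prints `send CLOSE message` before `send DATA message`, matching the frame order in the trace below.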
We can see this if we enable trace logging:
```
libp2p:mplex new initiator stream 0 +0ms
libp2p:mplex:trace initiator stream 0 send { id: 0, type: 'NEW_STREAM (0)', data: '0' } +0ms

-- local pb.writePB(...) --
-- local stream.close() --

libp2p:stream:trace outbound stream i0 close +0ms
libp2p:stream:trace outbound stream i0 closeRead +0ms
libp2p:stream:trace outbound stream i0 source end - err: undefined +0ms
libp2p:stream:trace outbound stream i0 closeWrite +0ms

-- local sends close message --

libp2p:mplex:trace initiator stream 0 send { id: 0, type: 'CLOSE_INITIATOR (4)' } +2ms
libp2p:stream:trace outbound stream i0 sink end - err: undefined +0ms
libp2p:mplex initiator stream with id 0 and protocol undefined ended +3ms

-- local sends data message --

libp2p:mplex:trace initiator stream 0 send {
  id: 0,
  type: 'MESSAGE_INITIATOR (2)',
  data: '110a0b68656c6c6f20776f726c6410051801'
} +1ms

-- remote node notices incoming stream --

libp2p:mplex:trace incoming message { id: 0, type: 'NEW_STREAM (0)', data: '0' } +1ms
libp2p:mplex new receiver stream 0 +2ms

-- remote stream receives stream close --

libp2p:mplex:trace incoming message { id: 0, type: 'CLOSE_INITIATOR (4)' } +0ms
libp2p:stream:trace inbound stream r0 closeRead +2ms
libp2p:stream:trace inbound stream r0 source end - err: undefined +0ms

-- remote stream receives data message but stream is already closed --

libp2p:mplex:trace incoming message {
  id: 0,
  type: 'MESSAGE_INITIATOR (2)',
  data: '110a0b68656c6c6f20776f726c6410051801'
} +0ms
```
If we defer `stream.close()` until after the microtask queue has been processed, it gets queued behind the task that sends the `DATA` message and everything works:
```js
import delay from 'delay'

pb.writePB({ hello: 'world' }, Message)
await delay(0)
stream.close()
```
`.close` should be a graceful close and ensure that all data is sent before the stream is closed (subject to a timeout). If we need to end the stream abruptly we have `.abort` for that.
The problem is that according to the interface `.close` is synchronous - we can't block the thread while the data is flushed, so we'll need to make `.close`, `.closeRead` and `.closeWrite` asynchronous (`.abort` should remain synchronous as it's really for error handling).
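As a rough sketch (illustrative only - not the final interface definition), the stream interface might end up looking something like this:

```ts
interface Stream {
  // graceful: resolves once buffered data has been sent,
  // subject to a timeout supplied via an abort signal
  close(options?: { signal?: AbortSignal }): Promise<void>
  closeRead(options?: { signal?: AbortSignal }): Promise<void>
  closeWrite(options?: { signal?: AbortSignal }): Promise<void>

  // abrupt: stays synchronous since it's really for error handling
  abort(err: Error): void
}
```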
This is a breaking change so it will cause [email protected] to be released. I do wonder if this is a good time to make our streams more "webby" and have `{ readable: ReadableStream, writable: WritableStream }` instead of `{ source: AsyncGenerator, sink: (source: AsyncGenerator) => Promise<void> }`?
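For comparison, the two duplex shapes side by side (types simplified here, assuming `Uint8Array` chunks):

```ts
// what we have today (it-stream-types style)
interface DuplexToday {
  source: AsyncGenerator<Uint8Array>
  sink: (source: AsyncGenerator<Uint8Array>) => Promise<void>
}

// the more "webby" alternative built on WHATWG streams
interface DuplexWebby {
  readable: ReadableStream<Uint8Array>
  writable: WritableStream<Uint8Array>
}
```

A nice property of the web streams shape is that `WritableStream`'s `close()` is already asynchronous and only resolves after queued writes have been processed, which is the graceful-close semantic we're after here.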
Either way we should get #1792 in first so we can deliver this as the minimum number of PRs rather than several spread across here, libp2p/js-libp2p-interfaces, mplex and yamux.
- interfaces
- mplex
- yamux
todo:
- other components in the layers above and below