Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 11 additions & 17 deletions src/channel/Base.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import EventEmitter from 'events';
import { w3cwebsocket as W3CWebSocket } from 'websocket';
import { snakeToPascal } from '../utils/string';
import { buildTx, unpackTx } from '../tx/builder';
import { unpackTx } from '../tx/builder';
import { Tag } from '../tx/builder/constants';
import * as handlers from './handlers';
import {
Expand All @@ -20,7 +20,7 @@ import {
ChannelMessage,
ChannelEvents,
} from './internal';
import { ChannelError } from '../utils/errors';
import { ChannelError, IllegalArgumentError } from '../utils/errors';
import { Encoded } from '../utils/encoder';
import { TxUnpacked } from '../tx/builder/schema.generated';
import { EntryTag } from '../tx/builder/entry/constants';
Expand Down Expand Up @@ -108,9 +108,16 @@ export default class Channel {
}

static async _initialize<T extends Channel>(channel: T, options: ChannelOptions): Promise<T> {
const reconnect = (options.existingFsmId ?? options.existingChannelId) != null;
if (reconnect && (options.existingFsmId == null || options.existingChannelId == null)) {
throw new IllegalArgumentError('`existingChannelId`, `existingFsmId` should be both provided or missed');
}
const reconnectHandler = handlers[
options.reestablish === true ? 'awaitingReestablish' : 'awaitingReconnection'
];
await initialize(
channel,
options.existingFsmId != null ? handlers.awaitingReconnection : handlers.awaitingConnection,
reconnect ? reconnectHandler : handlers.awaitingConnection,
handlers.channelOpen,
options,
);
Expand Down Expand Up @@ -249,8 +256,7 @@ export default class Channel {
* signed state and then terminates.
*
* The channel can be reestablished by instantiating another Channel instance
* with two extra params: existingChannelId and offchainTx (returned from leave
* method as channelId and signedTx respectively).
* with two extra params: existingChannelId and existingFsmId.
*
* @example
* ```js
Expand Down Expand Up @@ -290,16 +296,4 @@ export default class Channel {
};
});
}

static async reconnect(options: ChannelOptions, txParams: any): Promise<Channel> {
const { sign } = options;

return Channel.initialize({
...options,
reconnectTx: await sign(
'reconnect',
buildTx({ ...txParams, tag: Tag.ChannelClientReconnectTx }),
),
});
}
}
45 changes: 37 additions & 8 deletions src/channel/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,40 @@ export function awaitingConnection(
}
}

export async function awaitingReestablish(
channel: Channel,
message: ChannelMessage,
state: ChannelState,
): Promise<ChannelFsm> {
if (message.method === 'channels.info' && message.params.data.event === 'fsm_up') {
channel._fsmId = message.params.data.fsm_id;
return {
handler: function awaitingChannelReestablished(
_: Channel,
message2: ChannelMessage,
state2: ChannelState,
): ChannelFsm | undefined {
if (
message2.method === 'channels.info'
&& message2.params.data.event === 'channel_reestablished'
) return { handler: awaitingOpenConfirmation };
return handleUnexpectedMessage(channel, message2, state2);
},
};
}
return handleUnexpectedMessage(channel, message, state);
}

export async function awaitingReconnection(
channel: Channel,
message: ChannelMessage,
state: ChannelState,
): Promise<ChannelFsm> {
if (message.method === 'channels.info') {
if (message.params.data.event === 'fsm_up') {
channel._fsmId = message.params.data.fsm_id;
const { signedTx } = await channel.state();
changeState(channel, signedTx == null ? '' : buildTx(signedTx));
return { handler: channelOpen };
}
if (message.method === 'channels.info' && message.params.data.event === 'fsm_up') {
channel._fsmId = message.params.data.fsm_id;
const { signedTx } = await channel.state();
changeState(channel, signedTx == null ? '' : buildTx(signedTx));
return { handler: channelOpen };
}
return handleUnexpectedMessage(channel, message, state);
}
Expand Down Expand Up @@ -220,18 +242,25 @@ export function awaitingOnChainTx(
function awaitingOpenConfirmation(
channel: Channel,
message: ChannelMessage,
state: ChannelState,
): ChannelFsm | undefined {
if (message.method === 'channels.info' && message.params.data.event === 'open') {
channel._channelId = message.params.channel_id;
return {
handler(_: Channel, message2: ChannelMessage): ChannelFsm | undefined {
handler: function awaitingChannelsUpdate(
_: Channel,
message2: ChannelMessage,
state2: ChannelState,
): ChannelFsm | undefined {
if (message2.method === 'channels.update') {
changeState(channel, message2.params.data.state);
return { handler: channelOpen };
}
return handleUnexpectedMessage(channel, message2, state2);
},
};
}
return handleUnexpectedMessage(channel, message, state);
}

export async function channelOpen(
Expand Down
24 changes: 8 additions & 16 deletions src/channel/internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import {
ChannelError,
} from '../utils/errors';
import { encodeContractAddress } from '../utils/crypto';
import { buildTx } from '../tx/builder';
import { ensureError } from '../utils/other';

export interface ChannelEvents {
Expand Down Expand Up @@ -53,7 +52,6 @@ export type SignTx = (tx: Encoded.Transaction, options?: SignOptions) => (
* @see {@link https://github.com/aeternity/protocol/blob/6734de2e4c7cce7e5e626caa8305fb535785131d/node/api/channels_api_usage.md#channel-establishing-parameters}
*/
interface CommonChannelOptions {
existingFsmId?: Encoded.Bytearray;
/**
* Channel url (for example: "ws://localhost:3001")
*/
Expand Down Expand Up @@ -118,10 +116,14 @@ interface CommonChannelOptions {
*/
existingChannelId?: Encoded.Channel;
/**
* Offchain transaction (required if reestablishing a channel)
* Existing FSM id (required if reestablishing a channel)
*/
existingFsmId?: Encoded.Bytearray;
/**
* Needs to be provided if reconnecting with calling `leave` before
*/
offChainTx?: Encoded.Transaction;
reconnectTx?: Encoded.Transaction;
// TODO: remove after solving https://github.com/aeternity/aeternity/issues/4399
reestablish?: boolean;
/**
* The time waiting for a new event to be initiated (default: 600000)
*/
Expand Down Expand Up @@ -174,7 +176,6 @@ interface CommonChannelOptions {
* Function which verifies and signs transactions
*/
sign: SignTxWithTag;
offchainTx?: Encoded.Transaction;
}

export type ChannelOptions = CommonChannelOptions & ({
Expand Down Expand Up @@ -439,16 +440,7 @@ export async function initialize(
onopen: async (event: Event) => {
resolve();
changeStatus(channel, 'connected', event);
if (channelOptions.reconnectTx != null) {
enterState(channel, { handler: openHandler });
const { signedTx } = await channel.state();
if (signedTx == null) {
throw new ChannelError('`signedTx` missed in state while reconnection');
}
changeState(channel, buildTx(signedTx));
} else {
enterState(channel, { handler: connectionHandler });
}
enterState(channel, { handler: connectionHandler });
ping(channel);
},
onclose: (event: ICloseEvent) => {
Expand Down
2 changes: 0 additions & 2 deletions src/tx/builder/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ export enum Tag {
ContractCreateTx = 42,
ContractCallTx = 43,
ChannelCreateTx = 50,
// ChannelSetDelegatesTx = 501,
ChannelDepositTx = 51,
ChannelWithdrawTx = 52,
ChannelForceProgressTx = 521,
Expand All @@ -114,7 +113,6 @@ export enum Tag {
ChannelSlashTx = 55,
ChannelSettleTx = 56,
ChannelOffChainTx = 57,
ChannelClientReconnectTx = 575,
ChannelSnapshotSoloTx = 59,
GaAttachTx = 80,
GaMetaTx = 81,
Expand Down
7 changes: 0 additions & 7 deletions src/tx/builder/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,6 @@ export const txSchema = [{
ttl,
fee,
nonce: nonce('fromId'),
}, {
tag: shortUIntConst(Tag.ChannelClientReconnectTx),
version: shortUIntConst(1, true),
channelId: address(Encoding.Channel),
round: shortUInt,
role: string,
pubkey: address(Encoding.AccountAddress),
}, {
tag: shortUIntConst(Tag.GaAttachTx),
version: shortUIntConst(1, true),
Expand Down
35 changes: 16 additions & 19 deletions test/integration/channel-other.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ describe('Channel other', () => {
const [initiatorBalanceBeforeClose, responderBalanceBeforeClose] = await getBalances();
const closeSoloTx = await aeSdk.buildTx({
tag: Tag.ChannelCloseSoloTx,
channelId: await initiatorCh.id(),
channelId: initiatorCh.id(),
fromId: initiator.address,
poi,
payload: signedTx,
Expand All @@ -85,7 +85,7 @@ describe('Channel other', () => {

const settleTx = await aeSdk.buildTx({
tag: Tag.ChannelSettleTx,
channelId: await initiatorCh.id(),
channelId: initiatorCh.id(),
fromId: initiator.address,
initiatorAmountFinal: balances[initiator.address],
responderAmountFinal: balances[responder.address],
Expand Down Expand Up @@ -164,36 +164,33 @@ describe('Channel other', () => {
.should.be.equal(true);
}).timeout(timeoutBlock);

// https://github.com/aeternity/protocol/blob/d634e7a3f3110657900759b183d0734e61e5803a/node/api/channels_api_usage.md#reestablish
it('can reconnect', async () => {
expect(await initiatorCh.round()).to.be.equal(1);
const result = await initiatorCh.update(
initiator.address,
responder.address,
100,
initiatorSign,
);
expect(result.accepted).to.equal(true);
const channelId = await initiatorCh.id();
it('can reconnect a channel without leave', async () => {
expect(initiatorCh.round()).to.be.equal(1);
await initiatorCh.update(initiator.address, responder.address, 100, initiatorSign);
expect(initiatorCh.round()).to.be.equal(2);
const channelId = initiatorCh.id();
const fsmId = initiatorCh.fsmId();
initiatorCh.disconnect();
await waitForChannel(initiatorCh, ['disconnected']);
const ch = await Channel.initialize({
...sharedParams,
...initiatorParams,
existingChannelId: channelId,
existingFsmId: fsmId,
});
await waitForChannel(ch);
await waitForChannel(ch, ['open']);
expect(ch.fsmId()).to.be.equal(fsmId);
expect(await ch.round()).to.be.equal(2);
expect(ch.round()).to.be.equal(2);
const state = await ch.state();
ch.disconnect();
assertNotNull(state.signedTx);
expect(state.signedTx.encodedTx.tag).to.be.equal(Tag.ChannelOffChainTx);
await ch.update(initiator.address, responder.address, 100, initiatorSign);
expect(ch.round()).to.be.equal(3);
ch.disconnect();
});

it('can post backchannel update', async () => {
expect(await responderCh.round()).to.be.equal(1);
expect(responderCh.round()).to.be.equal(1);
initiatorCh.disconnect();
const { accepted } = await responderCh.update(
initiator.address,
Expand All @@ -202,7 +199,7 @@ describe('Channel other', () => {
responderSign,
);
expect(accepted).to.equal(false);
expect(await responderCh.round()).to.be.equal(1);
expect(responderCh.round()).to.be.equal(1);
const result = await responderCh.update(
initiator.address,
responder.address,
Expand All @@ -212,7 +209,7 @@ describe('Channel other', () => {
),
);
result.accepted.should.equal(true);
expect(await responderCh.round()).to.be.equal(2);
expect(responderCh.round()).to.be.equal(2);
expect(result.signedTx).to.be.a('string');
});
});
27 changes: 15 additions & 12 deletions test/integration/channel-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ import {
} from '../../src';
import { ChannelOptions, SignTxWithTag } from '../../src/channel/internal';

export async function waitForChannel(channel: Channel): Promise<void> {
export async function waitForChannel(channel: Channel, statuses: string[]): Promise<void> {
return new Promise((resolve, reject) => {
channel.on('statusChanged', (status: string) => {
switch (status) {
case 'open':
resolve();
break;
case 'disconnected':
reject(new Error('Unexpected SC status: disconnected'));
break;
default:
function handler(status: string): void {
const expectedStatus = statuses.shift();
if (status !== expectedStatus) {
reject(new Error(`Expected SC status ${expectedStatus}, got ${status} instead`));
channel.off('statusChanged', handler);
} else if (statuses.length === 0) {
resolve();
channel.off('statusChanged', handler);
}
});
}
channel.on('statusChanged', handler);
});
}

Expand Down Expand Up @@ -46,7 +46,10 @@ export async function initializeChannels(
...sharedParams,
...responderParams,
});
await Promise.all([waitForChannel(initiatorCh), waitForChannel(responderCh)]);
await Promise.all([
waitForChannel(initiatorCh, ['accepted', 'signed', 'open']),
waitForChannel(responderCh, ['halfSigned', 'open']),
]);
return [initiatorCh, responderCh];
}

Expand Down
21 changes: 10 additions & 11 deletions test/integration/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -572,33 +572,32 @@ describe('Channel', () => {
// TODO: check `initiatorAmountFinal` and `responderAmountFinal`
});

let existingChannelId: Encoded.Channel;
let offchainTx: Encoded.Transaction;
it('can leave a channel', async () => {
initiatorCh.disconnect();
responderCh.disconnect();
[initiatorCh, responderCh] = await initializeChannels(initiatorParams, responderParams);
initiatorCh.round(); // existingChannelRound
await initiatorCh.update(initiator.address, responder.address, 100, initiatorSign);
const result = await initiatorCh.leave();
expect(result.channelId).to.satisfy((t: string) => t.startsWith('ch_'));
expect(result.signedTx).to.satisfy((t: string) => t.startsWith('tx_'));
existingChannelId = result.channelId;
offchainTx = result.signedTx;
});

// https://github.com/aeternity/protocol/blob/d634e7a3f3110657900759b183d0734e61e5803a/node/api/channels_api_usage.md#reestablish
it('can reestablish a channel', async () => {
expect(initiatorCh.round()).to.be.equal(2);
initiatorCh = await Channel.initialize({
...sharedParams,
...initiatorParams,
// @ts-expect-error TODO: use existingChannelId instead existingFsmId
existingFsmId: existingChannelId,
offchainTx,
reestablish: true,
existingChannelId: initiatorCh.id(),
existingFsmId: initiatorCh.fsmId(),
});
await waitForChannel(initiatorCh);
// TODO: why node doesn't return signed_tx when channel is reestablished?
// initiatorCh.round().should.equal(existingChannelRound)
await waitForChannel(initiatorCh, ['open']);
expect(initiatorCh.round()).to.be.equal(2);
sinon.assert.notCalled(initiatorSignTag);
sinon.assert.notCalled(responderSignTag);
await initiatorCh.update(initiator.address, responder.address, 100, initiatorSign);
expect(initiatorCh.round()).to.be.equal(3);
});

describe('throws errors', () => {
Expand Down