diff --git a/src/core/currency/change-server-connection.ts b/src/core/currency/change-server-connection.ts new file mode 100644 index 00000000..1d7038de --- /dev/null +++ b/src/core/currency/change-server-connection.ts @@ -0,0 +1,81 @@ +import { utf8 } from '../../util/encoding' +import { + Address, + changeProtocol, + SubscribeResult +} from './change-server-protocol' + +interface ChangeServerCallbacks { + handleChange: (address: Address) => void + handleClose: () => void + handleConnect: () => void +} + +export interface ChangeServerConnection { + subscribe: (params: Address[]) => Promise + unsubscribe: (params: Address[]) => Promise + close: () => void + connected: boolean +} + +/** + * Bundles a change-server Websocket and codec pair. + */ +export function connectChangeServer( + url: string, + callbacks: ChangeServerCallbacks +): ChangeServerConnection { + const ws = new WebSocket(url) + ws.binaryType = 'arraybuffer' + + const codec = changeProtocol.makeClientCodec({ + // We failed to send a message, so shut down the socket: + handleError() { + ws.close() + }, + + async handleSend(text) { + ws.send(text) + }, + + localMethods: { + update(params) { + callbacks.handleChange(params) + } + } + }) + + ws.addEventListener('message', ev => { + const text = utf8.stringify(new Uint8Array(ev.data as ArrayBuffer)) + codec.handleMessage(text) + }) + + ws.addEventListener('close', () => { + out.connected = false + codec.handleClose() + callbacks.handleClose() + }) + + ws.addEventListener('error', () => ws.close()) + ws.addEventListener('open', () => { + out.connected = true + callbacks.handleConnect() + }) + + const out: ChangeServerConnection = { + async subscribe(params) { + return await codec.remoteMethods.subscribe(params) + }, + + async unsubscribe(params) { + await codec.remoteMethods.unsubscribe(params) + }, + + close() { + ws.close() + }, + + connected: false + } + return out +} diff --git a/src/core/currency/change-server-protocol.ts b/src/core/currency/change-server-protocol.ts new file mode 100644 index 00000000..eef6ad96 --- /dev/null +++ b/src/core/currency/change-server-protocol.ts @@ -0,0 +1,60 @@ +import { + asArray, + asOptional, + asString, + asTuple, + asValue, + Cleaner +} from 'cleaners' + +import { makeRpcProtocol } from '../../util/json-rpc' + +/** + * A chain and address identifier, like `['bitcoin', '19z88q...']` + */ +export type Address = [ + pluginId: string, + address: string, + + /** + * Block height or similar. + * Might be missing the first time we scan an address. + */ + checkpoint?: string +] + +const asAddress = asTuple
(asString, asString, asOptional(asString)) + +export type SubscribeResult = + /** Subscribe failed */ + | 0 + /** Subscribe succeeded, no changes */ + | 1 + /** Subscribed succeeded, changes present */ + | 2 + +const asSubscribeResult: Cleaner = asValue(0, 1, 2) + +export const changeProtocol = makeRpcProtocol({ + serverMethods: { + subscribe: { + asParams: asArray(asAddress), + asResult: asArray(asSubscribeResult) + }, + + unsubscribe: { + asParams: asArray(asAddress), + asResult: asValue(undefined) + } + }, + + clientMethods: { + update: { + asParams: asAddress + } + } +}) + +export type ChangeClientCodec = ReturnType< + (typeof changeProtocol)['makeClientCodec'] +> diff --git a/src/core/currency/currency-pixie.ts b/src/core/currency/currency-pixie.ts index ed887da1..d9165754 100644 --- a/src/core/currency/currency-pixie.ts +++ b/src/core/currency/currency-pixie.ts @@ -3,11 +3,16 @@ import { combinePixies, mapPixie, TamePixie } from 'redux-pixies' import { matchJson } from '../../util/match-json' import { InfoCacheFile } from '../context/info-cache-file' import { ApiInput, RootProps } from '../root-pixie' +import { + ChangeServerConnection, + connectChangeServer +} from './change-server-connection' import { CurrencyWalletOutput, CurrencyWalletProps, walletPixie } from './wallet/currency-wallet-pixie' +import { CurrencyWalletState } from './wallet/currency-wallet-reducer' export interface CurrencyOutput { readonly wallets: { [walletId: string]: CurrencyWalletOutput } @@ -51,5 +56,49 @@ export const currency: TamePixie = combinePixies({ } lastInfo = infoCache } + }, + + changeSocket(input: ApiInput) { + let lastWallets: { [walletId: string]: CurrencyWalletState } | undefined + let socket: ChangeServerConnection | undefined + + return async () => { + // Grab the wallet state, and bail out if there are no changes: + const { wallets } = input.props.state.currency + if (wallets === lastWallets) return + lastWallets = wallets + + const subs = new Set() + + // Diff the wallet state with the current subscriptions: + // todo + + // Connect the socket if we have 1 or more subscriptions: + if (socket == null && subs.size > 1) { + socket = connectChangeServer('wss://change1.edge.app', { + handleChange() { + // Send to wallets! + }, + handleClose() { + // TODO: Reconnect logic + }, + handleConnect() { + // Do we even care? + } + }) + } + + // Disconnect the socket if we have 0 subscriptions: + if (socket != null && subs.size === 0) { + socket.close() + socket = undefined + } + + // Subscribe what's new: + if (socket?.connected === true) await socket.subscribe([]) + + // Unsubscribe what's gone: + if (socket?.connected === true) await socket.unsubscribe([]) + } } }) diff --git a/src/types/types.ts b/src/types/types.ts index 295d7155..1f3ffe7a 100644 --- a/src/types/types.ts +++ b/src/types/types.ts @@ -950,11 +950,32 @@ export interface EdgeCurrencyEngineOptions { export interface EdgeCurrencyEngine { readonly changeUserSettings: (settings: JsonObject) => Promise - // Engine status: + /** + * Starts any persistent resources the engine needs, such as WebSockets. + * Engines should use `syncNetwork` for periodic tasks (polling), + * rather than trying to manage those by itself. + */ readonly startEngine: () => Promise + + /** + * Shut down the engine, including open sockets, timers, + * and any in-progress tasks that support cancellation. + */ readonly killEngine: () => Promise - readonly resyncBlockchain: () => Promise + + /** + * Polls the blockchain for updates. + * The return value is the delay (in ms) the engine wants to wait + * before its next poll. For engines with active address subscriptions, + * the core will ignore this number and simply wait for the next + * on-chain update. + * Engines with `EdgeCurrencyInfo.unsafeSyncNetwork` + * will receive their private keys in the arguments. + */ readonly syncNetwork?: (opts: EdgeEnginePrivateKeyOptions) => Promise + + // Engine status: + readonly resyncBlockchain: () => Promise readonly dumpData: () => Promise // Chain state: diff --git a/src/util/json-rpc.ts b/src/util/json-rpc.ts new file mode 100644 index 00000000..5c196c40 --- /dev/null +++ b/src/util/json-rpc.ts @@ -0,0 +1,380 @@ +import { + asMaybe, + asNumber, + asObject, + asOptional, + asString, + asUnknown, + asValue, + Cleaner, + uncleaner +} from 'cleaners' + +/** + * A codec object can make calls to the remote system, + * and can process incoming messages from the remote system. + */ +export interface RpcCodec { + /** Cancels all pending method calls if the connection closes. */ + handleClose: () => void + + /** Processes an incoming message from the remote side. */ + handleMessage: (message: string) => void + + /** Call these methods to send messages to the remote side. */ + remoteMethods: RemoteMethods +} + +/** + * To construct a codec, provide a way to send outgoing messages, + * and implementations of any methods our side supports. + */ +export interface RpcCodecOpts { + /** Called if `handleSend` fails. */ + handleError: (error: unknown) => void + + /** Sends a message to the remote side. */ + handleSend: (text: string) => Promise + + /** Implement any messages this side supports receiving. */ + localMethods: LocalMethods +} + +/** + * The protocol object can construct client and server instances, + * depending on which side you want to be. + */ +export interface RpcProtocol { + makeServerCodec: ( + opts: RpcCodecOpts + ) => RpcCodec + + makeClientCodec: ( + opts: RpcCodecOpts + ) => RpcCodec +} + +/** + * Type definitions for remote methods. + * + * For example: + * ``` + * { + * method: { asParams: asTuple(asString, asNumber), asResult: asString } + * notification: { asParams: asTuple(asString, asNumber) } + * } + * ``` + */ +interface MethodCleaners { + [name: string]: { + asParams: Cleaner + + /** Cleans the method return value. Not present for notifications. */ + asResult?: Cleaner + } +} + +/** + * Accepts cleaners for the two sides of a protocol, + * and returns a codec factory. + */ +export function makeRpcProtocol< + ServerCleaners extends MethodCleaners, + ClientCleaners extends MethodCleaners +>(opts: { + /** + * Methods supported on the server side. + */ + serverMethods: ServerCleaners + + /** + * Methods supported on the client side. + */ + clientMethods: ClientCleaners + + /** + * Optionally used if the protocol differs from strict JSON-RPC 2.0. + */ + asCall?: Cleaner + asReturn?: Cleaner +}): RpcProtocol, Methods> { + const { + serverMethods, + clientMethods, + asCall = asJsonRpcCall, + asReturn = asJsonRpcReturn + } = opts + + return { + makeServerCodec(opts) { + return makeCodec(serverMethods, clientMethods, asCall, asReturn, opts) + }, + makeClientCodec(opts) { + return makeCodec(clientMethods, serverMethods, asCall, asReturn, opts) + } + } +} + +type Methods = { + // Normal methods with return values: + [Name in keyof T]: T[Name] extends { + asParams: Cleaner + asResult: Cleaner + } + ? (params: P) => Promise + : // Notifications, which have no return value: + T[Name] extends { + asParams: Cleaner + } + ? (params: P) => void + : // This should never happen: + never +} + +function makeCodec( + localCleaners: MethodCleaners, + remoteCleaners: MethodCleaners, + asCall: Cleaner, + asReturn: Cleaner, + opts: RpcCodecOpts +): RpcCodec { + const { handleError, handleSend, localMethods } = opts + const wasCall = uncleaner(asCall) + const wasReturn = uncleaner(asReturn) + + const sendError = async ( + code: number, + message: string, + id: RpcId = null + ): Promise => + await handleSend( + JSON.stringify( + wasReturn({ + jsonrpc: '2.0', + result: undefined, + error: { code, message, data: undefined }, + id + }) + ) + ) + + // Remote state: + let nextRemoteId = 0 + const remoteCalls = new Map() + + // Create proxy functions for each remote method: + const remoteMethods: { + [name: string]: (params: unknown) => unknown + } = {} + for (const name of Object.keys(remoteCleaners)) { + const { asParams, asResult } = remoteCleaners[name] + const wasParams = uncleaner(asParams) + + if (asResult == null) { + // It's a notification, so send the message with no result handling: + remoteMethods[name] = (params: unknown): void => { + handleSend( + JSON.stringify( + wasCall({ + jsonrpc: '2.0', + method: name, + params: wasParams(params), + id: undefined + }) + ) + ).catch(handleError) + } + } else { + // It's a method call, so sign up to receive a result: + remoteMethods[name] = (params: unknown): unknown => { + const id = nextRemoteId++ + const out = new Promise((resolve, reject) => { + remoteCalls.set(id, { + asResult, + resolve, + reject + }) + }) + + handleSend( + JSON.stringify( + wasCall({ + jsonrpc: '2.0', + method: name, + params: wasParams(params), + id + }) + ) + ).catch(handleError) + return out + } + } + } + + function handleMessage(message: string): void { + let json: unknown + try { + json = JSON.parse(message) + } catch (error) { + sendError(-32700, `Parse error: ${errorMessage(error)}`).catch( + handleError + ) + return + } + + // TODO: We need to add support for batch calls. + const call = asMaybe(asCall)(json) + const response = asMaybe(asReturn)(json) + + if (call != null) { + const { method, id, params } = call + const cleaners = localCleaners[method] + if (cleaners == null) { + sendError(-32601, `Method not found: ${method}`).catch(handleError) + return + } + + if (cleaners.asResult != null && id == null) { + sendError(-32600, `Invalid JSON-RPC request: missing id`).catch( + handleError + ) + return + } + + if (cleaners.asResult == null && id != null) { + sendError( + -32600, + `Invalid JSON-RPC request: notification has an id` + ).catch(handleError) + return + } + + let cleanParams: unknown + try { + cleanParams = cleaners.asParams(params) + } catch (error) { + sendError(-32602, `Invalid params: ${errorMessage(error)}`).catch( + handleError + ) + return + } + + const out = localMethods[method](cleanParams) + if (out.then != null && cleaners.asResult != null) { + const wasResult = uncleaner(cleaners.asResult) + out.then( + (result: unknown) => { + handleSend( + JSON.stringify( + wasReturn({ + jsonrpc: '2.0', + result: wasResult(result), + error: undefined, + id: id ?? null + }) + ) + ).catch(handleError) + }, + (error: unknown) => { + sendError(1, errorMessage(error), id).catch(handleError) + } + ) + } + } else if (response != null) { + const { error, id, result } = response + if (typeof id !== 'number') { + // It's not a call we made... + sendError(-32603, `Cannot find id ${String(id)}`, id).catch(handleError) + return + } + const pendingCall = remoteCalls.get(id) + if (pendingCall == null) { + sendError(-32603, `Cannot find id ${String(id)}`, id).catch(handleError) + return + } + remoteCalls.delete(id) + + if (error != null) { + pendingCall.reject(new Error(error.message)) + } else { + const { asResult } = pendingCall + let cleanResult: unknown + try { + cleanResult = asResult(result) + } catch (error) { + pendingCall.reject(error) + } + pendingCall.resolve(cleanResult) + } + } else { + sendError(-32600, `Invalid JSON-RPC request / response`).catch( + handleError + ) + } + } + + function handleClose(): void { + for (const call of remoteCalls.values()) { + call.reject(new Error('JSON-RPC connection closed')) + } + } + + return { + handleClose, + handleMessage, + remoteMethods: remoteMethods as any + } +} + +function errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error) +} + +interface PendingRemoteCall { + resolve: (result: unknown) => void + reject: (error: unknown) => void + asResult: Cleaner +} + +type RpcId = number | string | null + +export interface JsonRpcCall { + jsonrpc: '2.0' + method: string + params: unknown + id?: RpcId // Missing for notifications +} + +export interface JsonRpcReturn { + jsonrpc: '2.0' + result: unknown + error?: { code: number; message: string; data?: unknown } + id: RpcId // Null for protocol errors +} + +const asRpcId = (raw: unknown): RpcId => { + if (raw === null) return raw + if (typeof raw === 'string') return raw + if (typeof raw === 'number' && Number.isInteger(raw)) return raw + throw new TypeError('Expected a string or an integer') +} + +const asJsonRpcCall = asObject({ + jsonrpc: asValue('2.0'), + method: asString, + params: asUnknown, + id: asOptional(asRpcId) +}) + +const asJsonRpcReturn = asObject({ + jsonrpc: asValue('2.0'), + result: asUnknown, + error: asOptional( + asObject({ + code: asNumber, + message: asString, + data: asUnknown + }) + ), + id: asRpcId +})