Skip to content

William/change server #642

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions src/core/currency/change-server-connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import { utf8 } from '../../util/encoding'
import {
Address,
changeProtocol,
SubscribeResult
} from './change-server-protocol'

interface ChangeServerCallbacks {
handleChange: (address: Address) => void
handleClose: () => void
handleConnect: () => void
}

export interface ChangeServerConnection {
subscribe: (params: Address[]) => Promise<SubscribeResult[]>
unsubscribe: (params: Address[]) => Promise<void>
close: () => void
connected: boolean
}

/**
* Bundles a change-server Websocket and codec pair.
*/
export function connectChangeServer(
url: string,
callbacks: ChangeServerCallbacks
): ChangeServerConnection {
const ws = new WebSocket(url)
ws.binaryType = 'arraybuffer'

const codec = changeProtocol.makeClientCodec({
// We failed to send a message, so shut down the socket:
handleError() {
ws.close()
},

async handleSend(text) {
ws.send(text)
},

localMethods: {
update(params) {
callbacks.handleChange(params)
}
}
})

ws.addEventListener('message', ev => {
const text = utf8.stringify(new Uint8Array(ev.data as ArrayBuffer))
codec.handleMessage(text)
})

ws.addEventListener('close', () => {
out.connected = false
codec.handleClose()
callbacks.handleClose()
})

ws.addEventListener('error', () => ws.close())
ws.addEventListener('open', () => {
out.connected = true
callbacks.handleConnect()
})

const out: ChangeServerConnection = {
async subscribe(params) {
return await codec.remoteMethods.subscribe(params)
},

async unsubscribe(params) {
await codec.remoteMethods.unsubscribe(params)
},

close() {
ws.close()
},

connected: false
}
return out
}
60 changes: 60 additions & 0 deletions src/core/currency/change-server-protocol.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import {
asArray,
asOptional,
asString,
asTuple,
asValue,
Cleaner
} from 'cleaners'

import { makeRpcProtocol } from '../../util/json-rpc'

/**
* A chain and address identifier, like `['bitcoin', '19z88q...']`
*/
export type Address = [
pluginId: string,
address: string,

/**
* Block height or similar.
* Might be missing the first time we scan an address.
*/
checkpoint?: string
]

const asAddress = asTuple<Address>(asString, asString, asOptional(asString))

export type SubscribeResult =
/** Subscribe failed */
| 0
/** Subscribe succeeded, no changes */
| 1
/** Subscribed succeeded, changes present */
| 2

const asSubscribeResult: Cleaner<SubscribeResult> = asValue(0, 1, 2)

export const changeProtocol = makeRpcProtocol({
serverMethods: {
subscribe: {
asParams: asArray(asAddress),
asResult: asArray(asSubscribeResult)
},

unsubscribe: {
asParams: asArray(asAddress),
asResult: asValue(undefined)
}
},

clientMethods: {
update: {
asParams: asAddress
}
}
})

export type ChangeClientCodec = ReturnType<
(typeof changeProtocol)['makeClientCodec']
>
49 changes: 49 additions & 0 deletions src/core/currency/currency-pixie.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@ import { combinePixies, mapPixie, TamePixie } from 'redux-pixies'
import { matchJson } from '../../util/match-json'
import { InfoCacheFile } from '../context/info-cache-file'
import { ApiInput, RootProps } from '../root-pixie'
import {
ChangeServerConnection,
connectChangeServer
} from './change-server-connection'
import {
CurrencyWalletOutput,
CurrencyWalletProps,
walletPixie
} from './wallet/currency-wallet-pixie'
import { CurrencyWalletState } from './wallet/currency-wallet-reducer'

export interface CurrencyOutput {
readonly wallets: { [walletId: string]: CurrencyWalletOutput }
Expand Down Expand Up @@ -51,5 +56,49 @@ export const currency: TamePixie<RootProps> = combinePixies({
}
lastInfo = infoCache
}
},

changeSocket(input: ApiInput) {
let lastWallets: { [walletId: string]: CurrencyWalletState } | undefined
let socket: ChangeServerConnection | undefined

return async () => {
// Grab the wallet state, and bail out if there are no changes:
const { wallets } = input.props.state.currency
if (wallets === lastWallets) return
lastWallets = wallets

const subs = new Set()

// Diff the wallet state with the current subscriptions:
// todo

// Connect the socket if we have 1 or more subscriptions:
if (socket == null && subs.size > 1) {
socket = connectChangeServer('wss://change1.edge.app', {
handleChange() {
// Send to wallets!
},
handleClose() {
// TODO: Reconnect logic
},
handleConnect() {
// Do we even care?
}
})
}

// Disconnect the socket if we have 0 subscriptions:
if (socket != null && subs.size === 0) {
socket.close()
socket = undefined
}

// Subscribe what's new:
if (socket?.connected === true) await socket.subscribe([])

// Unsubscribe what's gone:
if (socket?.connected === true) await socket.unsubscribe([])
}
}
})
25 changes: 23 additions & 2 deletions src/types/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -950,11 +950,32 @@ export interface EdgeCurrencyEngineOptions {
export interface EdgeCurrencyEngine {
readonly changeUserSettings: (settings: JsonObject) => Promise<void>

// Engine status:
/**
* Starts any persistent resources the engine needs, such as WebSockets.
* Engines should use `syncNetwork` for periodic tasks (polling),
* rather than trying to manage those by itself.
*/
readonly startEngine: () => Promise<void>

/**
* Shut down the engine, including open sockets, timers,
* and any in-progress tasks that support cancellation.
*/
readonly killEngine: () => Promise<void>
readonly resyncBlockchain: () => Promise<void>

/**
* Polls the blockchain for updates.
* The return value is the delay (in ms) the engine wants to wait
* before its next poll. For engines with active address subscriptions,
* the core will ignore this number and simply wait for the next
* on-chain update.
* Engines with `EdgeCurrencyInfo.unsafeSyncNetwork`
* will receive their private keys in the arguments.
*/
readonly syncNetwork?: (opts: EdgeEnginePrivateKeyOptions) => Promise<number>

// Engine status:
readonly resyncBlockchain: () => Promise<void>
readonly dumpData: () => Promise<EdgeDataDump>

// Chain state:
Expand Down
Loading
Loading