Skip to content

Commit cf0bb7f

Browse files
committed
Adds support for a wider variety of Block and Log models.
Depending on the library you are using to provide blocks/logs to blockstream, your Block and Log model may not align exactly with the model this library was using. Now as long as you meet some very basic minimum interface requirements you can use any Block/Log model you want.
1 parent 0a0fa6e commit cf0bb7f

11 files changed

+89
-106
lines changed

.travis.yml

+1-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ sudo: false
33
language: node_js
44

55
node_js:
6-
- "7"
7-
- "6"
6+
- "8"
87

98
before_script:
109
- npm install

package-lock.json

+12-12
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
"typescript": "2.2.2"
3636
},
3737
"dependencies": {
38-
"immutable": "3.8.1",
38+
"immutable": "3.8.2",
3939
"source-map-support": "0.4.14",
4040
"uuid": "3.0.1"
4141
}

source/block-and-log-streamer.ts

+36-34
Original file line numberDiff line numberDiff line change
@@ -9,66 +9,68 @@ import { reconcileLogHistoryWithAddedBlock, reconcileLogHistoryWithRemovedBlock
99
import { List as ImmutableList } from "immutable";
1010
import * as createUuid from "uuid";
1111

12-
export class BlockAndLogStreamer {
13-
private blockHistory: Promise<BlockHistory> = Promise.resolve(ImmutableList<Block>());
14-
private logHistory: Promise<LogHistory> = Promise.resolve(ImmutableList<Log>());
15-
private latestBlock: Block | null = null;
12+
export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> {
13+
private blockHistory: Promise<BlockHistory<TBlock>> = Promise.resolve(ImmutableList<TBlock>());
14+
private logHistory: Promise<LogHistory<TLog>> = Promise.resolve(ImmutableList<TLog>());
15+
private latestBlock: TBlock | null = null;
1616

1717
private readonly blockRetention: number;
1818

19-
private readonly getBlockByHash: (hash: string) => Promise<Block | null>;
20-
private readonly getLogs: (filterOptions: FilterOptions) => Promise<Log[]>;
19+
private readonly getBlockByHash: (hash: string) => Promise<TBlock | null>;
20+
private readonly getLogs: (filterOptions: FilterOptions) => Promise<TLog[]>;
2121

2222
private readonly logFilters: { [propName: string]: Filter } = {}
23-
private readonly onBlockAddedSubscribers: { [propName: string]: (block: Block) => void } = {};
24-
private readonly onBlockRemovedSubscribers: { [propName: string]: (block: Block) => void } = {};
25-
private readonly onLogAddedSubscribers: { [propName: string]: (log: Log) => void } = {};
26-
private readonly onLogRemovedSubscribers: { [propName: string]: (log: Log) => void } = {};
23+
private readonly onBlockAddedSubscribers: { [propName: string]: (block: TBlock) => void } = {};
24+
private readonly onBlockRemovedSubscribers: { [propName: string]: (block: TBlock) => void } = {};
25+
private readonly onLogAddedSubscribers: { [propName: string]: (log: TLog) => void } = {};
26+
private readonly onLogRemovedSubscribers: { [propName: string]: (log: TLog) => void } = {};
2727

2828
constructor(
29-
getBlockByHash: (hash: string) => Promise<Block | null>,
30-
getLogs: (filterOptions: FilterOptions) => Promise<Log[]>,
29+
getBlockByHash: (hash: string) => Promise<TBlock | null>,
30+
getLogs: (filterOptions: FilterOptions) => Promise<TLog[]>,
3131
configuration?: { blockRetention?: number },
3232
) {
3333
this.getBlockByHash = getBlockByHash;
3434
this.getLogs = getLogs;
3535
this.blockRetention = (configuration && configuration.blockRetention) ? configuration.blockRetention : 100;
3636
}
3737

38-
static createCallbackStyle = (
39-
getBlockByHash: (hash: string, callback: (error?: Error, block?: Block | null) => void) => void,
40-
getLogs: (filterOptions: FilterOptions, callback: (error?: Error, logs?: Log[]) => void) => void,
38+
static createCallbackStyle = <TBlock extends Block, TLog extends Log>(
39+
getBlockByHash: (hash: string, callback: (error?: Error, block?: TBlock | null) => void) => void,
40+
getLogs: (filterOptions: FilterOptions, callback: (error?: Error, logs?: TLog[]) => void) => void,
4141
configuration?: { blockRetention?: number },
42-
): BlockAndLogStreamer => {
43-
const wrappedGetBlockByHash = (hash: string): Promise<Block | null> => new Promise<Block | null>((resolve, reject) => {
44-
getBlockByHash(hash, (error, block) => {
45-
if (error) throw error;
46-
else resolve(block);
42+
): BlockAndLogStreamer<TBlock, TLog> => {
43+
const wrappedGetBlockByHash = (hash: string): Promise<TBlock | null> => {
44+
return new Promise<TBlock | null>((resolve, reject) => {
45+
getBlockByHash(hash, (error, block) => {
46+
if (error) throw error;
47+
else resolve(block);
48+
});
4749
});
48-
});
49-
const wrappedGetLogs = (filterOptions: FilterOptions): Promise<Log[]> => new Promise<Log[]>((resolve, reject) => {
50+
};
51+
const wrappedGetLogs = (filterOptions: FilterOptions): Promise<Array<TLog>> => new Promise<Array<TLog>>((resolve, reject) => {
5052
getLogs(filterOptions, (error, logs) => {
5153
if (error) throw error;
5254
if (!logs) throw new Error("Received null/undefined logs and no error.");
5355
resolve(logs);
5456
});
5557
});
56-
return new BlockAndLogStreamer(wrappedGetBlockByHash, wrappedGetLogs, configuration);
58+
return new BlockAndLogStreamer<TBlock, TLog>(wrappedGetBlockByHash, wrappedGetLogs, configuration);
5759
}
5860

59-
public readonly reconcileNewBlock = async (block: Block): Promise<void> => {
61+
public readonly reconcileNewBlock = async (block: TBlock): Promise<void> => {
6062
this.blockHistory = reconcileBlockHistory(this.getBlockByHash, this.blockHistory, block, this.onBlockAdded, this.onBlockRemoved, this.blockRetention);
6163
const blockHistory = await this.blockHistory;
6264
this.latestBlock = blockHistory.last();
6365
};
6466

65-
public readonly reconcileNewBlockCallbackStyle = async (block: Block, callback: (error?: Error) => void): Promise<void> => {
67+
public readonly reconcileNewBlockCallbackStyle = async (block: TBlock, callback: (error?: Error) => void): Promise<void> => {
6668
this.reconcileNewBlock(block)
6769
.then(() => callback(undefined))
6870
.catch(error => callback(error));
6971
};
7072

71-
private readonly onBlockAdded = async (block: Block): Promise<void> => {
73+
private readonly onBlockAdded = async (block: TBlock): Promise<void> => {
7274
const logFilters = Object.keys(this.logFilters).map(key => this.logFilters[key]);
7375
this.logHistory = reconcileLogHistoryWithAddedBlock(this.getLogs, this.logHistory, block, this.onLogAdded, logFilters, this.blockRetention);
7476

@@ -79,7 +81,7 @@ export class BlockAndLogStreamer {
7981
.forEach(callback => callback(block));
8082
};
8183

82-
private readonly onBlockRemoved = async (block: Block): Promise<void> => {
84+
private readonly onBlockRemoved = async (block: TBlock): Promise<void> => {
8385
this.logHistory = reconcileLogHistoryWithRemovedBlock(this.logHistory, block, this.onLogRemoved);
8486

8587
await this.logHistory;
@@ -89,22 +91,22 @@ export class BlockAndLogStreamer {
8991
.forEach(callback => callback(block));
9092
};
9193

92-
private readonly onLogAdded = async (log: Log): Promise<void> => {
94+
private readonly onLogAdded = async (log: TLog): Promise<void> => {
9395
Object.keys(this.onLogAddedSubscribers)
9496
.map((key: string) => this.onLogAddedSubscribers[key])
9597
.map(callback => logAndSwallowWrapper(callback))
9698
.forEach(callback => callback(log));
9799
};
98100

99-
private readonly onLogRemoved = async (log: Log): Promise<void> => {
101+
private readonly onLogRemoved = async (log: TLog): Promise<void> => {
100102
Object.keys(this.onLogRemovedSubscribers)
101103
.map((key: string) => this.onLogRemovedSubscribers[key])
102104
.map(callback => logAndSwallowWrapper(callback))
103105
.forEach(callback => callback(log));
104106
};
105107

106108

107-
public readonly getLatestReconciledBlock = (): Block | null => {
109+
public readonly getLatestReconciledBlock = (): TBlock | null => {
108110
return this.latestBlock;
109111
};
110112

@@ -121,7 +123,7 @@ export class BlockAndLogStreamer {
121123
};
122124

123125

124-
public readonly subscribeToOnBlockAdded = (onBlockAdded: (block: Block) => void): string => {
126+
public readonly subscribeToOnBlockAdded = (onBlockAdded: (block: TBlock) => void): string => {
125127
const uuid = `on block added token ${createUuid()}`;
126128
this.onBlockAddedSubscribers[uuid] = onBlockAdded;
127129
return uuid;
@@ -133,7 +135,7 @@ export class BlockAndLogStreamer {
133135
};
134136

135137

136-
public readonly subscribeToOnBlockRemoved = (onBlockRemoved: (block: Block) => void): string => {
138+
public readonly subscribeToOnBlockRemoved = (onBlockRemoved: (block: TBlock) => void): string => {
137139
const uuid = `on block removed token ${createUuid()}`;
138140
this.onBlockRemovedSubscribers[uuid] = onBlockRemoved;
139141
return uuid;
@@ -145,7 +147,7 @@ export class BlockAndLogStreamer {
145147
};
146148

147149

148-
public readonly subscribeToOnLogAdded = (onLogAdded: (log: Log) => void): string => {
150+
public readonly subscribeToOnLogAdded = (onLogAdded: (log: TLog) => void): string => {
149151
const uuid = `on log added token ${createUuid()}`;
150152
this.onLogAddedSubscribers[uuid] = onLogAdded;
151153
return uuid;
@@ -157,7 +159,7 @@ export class BlockAndLogStreamer {
157159
};
158160

159161

160-
public readonly subscribeToOnLogRemoved = (onLogRemoved: (log: Log) => void): string => {
162+
public readonly subscribeToOnLogRemoved = (onLogRemoved: (log: TLog) => void): string => {
161163
const uuid = `on log removed token ${createUuid()}`;
162164
this.onLogRemovedSubscribers[uuid] = onLogRemoved;
163165
return uuid;

source/block-reconciler.ts

+18-16
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@ import { Block } from "./models/block";
22
import { BlockHistory } from "./models/block-history";
33
import { List as ImmutableList } from "immutable";
44

5-
export const reconcileBlockHistory = async (
6-
getBlockByHash: (hash: string) => Promise<Block|null>,
7-
blockHistory: BlockHistory|Promise<BlockHistory>,
8-
newBlock: Block,
9-
onBlockAdded: (block: Block) => Promise<void>,
10-
onBlockRemoved: (block: Block) => Promise<void>,
5+
type GetBlockByHash<TBlock> = (hash: string) => Promise<TBlock|null>;
6+
7+
export const reconcileBlockHistory = async <TBlock extends Block>(
8+
getBlockByHash: GetBlockByHash<TBlock>,
9+
blockHistory: BlockHistory<TBlock>|Promise<BlockHistory<TBlock>>,
10+
newBlock: TBlock,
11+
onBlockAdded: (block: TBlock) => Promise<void>,
12+
onBlockRemoved: (block: TBlock) => Promise<void>,
1113
blockRetention: number = 100,
12-
): Promise<BlockHistory> => {
14+
): Promise<BlockHistory<TBlock>> => {
1315
blockHistory = await blockHistory;
1416
if (isFirstBlock(blockHistory))
1517
return await addNewHeadBlock(blockHistory, newBlock, onBlockAdded, blockRetention);
@@ -30,15 +32,15 @@ export const reconcileBlockHistory = async (
3032
return await backfill(getBlockByHash, blockHistory, newBlock, onBlockAdded, onBlockRemoved, blockRetention);
3133
}
3234

33-
const rollback = async (blockHistory: BlockHistory, onBlockRemoved: (block: Block) => Promise<void>): Promise<BlockHistory> => {
35+
const rollback = async <TBlock extends Block>(blockHistory: BlockHistory<TBlock>, onBlockRemoved: (block: TBlock) => Promise<void>): Promise<BlockHistory<TBlock>> => {
3436
while (!blockHistory.isEmpty()) {
3537
// CONSIDER: if this throws an exception, removals may have been announced that are actually still in history since throwing will result in no history update. we can't catch errors here because there isn't a clear way to recover from them, the failure may be a downstream system telling us that the block removal isn't possible because they are in a bad state. we could try re-announcing the successfully added blocks, but there would still be a problem with the failed block (should it be re-announced?) and the addition announcements may also fail
3638
blockHistory = await removeHeadBlock(blockHistory, onBlockRemoved);
3739
}
3840
return blockHistory;
3941
}
4042

41-
const backfill = async (getBlockByHash: (hash: string) => Promise<Block|null>, blockHistory: BlockHistory, newBlock: Block, onBlockAdded: (block: Block) => Promise<void>, onBlockRemoved: (block: Block) => Promise<void>, blockRetention: number) => {
43+
const backfill = async <TBlock extends Block>(getBlockByHash: GetBlockByHash<TBlock>, blockHistory: BlockHistory<TBlock>, newBlock: TBlock, onBlockAdded: (block: TBlock) => Promise<void>, onBlockRemoved: (block: TBlock) => Promise<void>, blockRetention: number) => {
4244
if (newBlock.parentHash === "0x0000000000000000000000000000000000000000000000000000000000000000")
4345
return rollback(blockHistory, onBlockRemoved);
4446
const parentBlock = await getBlockByHash(newBlock.parentHash);
@@ -49,36 +51,36 @@ const backfill = async (getBlockByHash: (hash: string) => Promise<Block|null>, b
4951
return await reconcileBlockHistory(getBlockByHash, blockHistory, newBlock, onBlockAdded, onBlockRemoved, blockRetention);
5052
}
5153

52-
const addNewHeadBlock = async (blockHistory: BlockHistory, newBlock: Block, onBlockAdded: (block: Block) => Promise<void>, blockRetention: number): Promise<BlockHistory> => {
54+
const addNewHeadBlock = async <TBlock extends Block>(blockHistory: BlockHistory<TBlock>, newBlock: TBlock, onBlockAdded: (block: TBlock) => Promise<void>, blockRetention: number): Promise<BlockHistory<TBlock>> => {
5355
// this is here as a final sanity check, in case we somehow got into an unexpected state, there are no known (and should never be) ways to reach this exception
5456
if (!blockHistory.isEmpty() && blockHistory.last().hash !== newBlock.parentHash) throw new Error("New head block's parent isn't our current head.");
55-
// CONSIDER: the user getting this notification won't have any visibility into the updated block history yet. should we announce new blocks in a `setTimeout`? should we provide block history with new logs? an announcement failure will result in unwinding the stack and returning the original blockHistory, if we are in the process of backfilling we may have already announced previous blocks that won't actually end up in history (they won't get removed if a re-org occurs and may be re-announced). we can't catch errors thrown by the callback be cause it may be trying to signal to use that the block has become invalid and is un-processable
57+
// CONSIDER: the user getting this notification won't have any visibility into the updated block history yet. should we announce new blocks in a `setTimeout`? should we provide block history with new logs? an announcement failure will result in unwinding the stack and returning the original blockHistory, if we are in the process of backfilling we may have already announced previous blocks that won't actually end up in history (they won't get removed if a re-org occurs and may be re-announced). we can't catch errors thrown by the callback because it may be trying to signal to use that the block has become invalid and is un-processable
5658
await onBlockAdded(newBlock);
5759
blockHistory = blockHistory.push(newBlock);
5860
return blockHistory.takeLast(blockRetention).toList();
5961
}
6062

61-
const removeHeadBlock = async (blockHistory: BlockHistory, onBlockRemoved: (block: Block) => Promise<void>): Promise<BlockHistory> => {
63+
const removeHeadBlock = async <TBlock extends Block>(blockHistory: BlockHistory<TBlock>, onBlockRemoved: (block: TBlock) => Promise<void>): Promise<BlockHistory<TBlock>> => {
6264
let removedBlock = blockHistory.last();
6365
blockHistory = blockHistory.pop();
6466
await onBlockRemoved(removedBlock);
6567
return blockHistory;
6668
}
6769

68-
const isFirstBlock = (blockHistory: BlockHistory, ): boolean => {
70+
const isFirstBlock = <TBlock extends Block>(blockHistory: BlockHistory<TBlock>, ): boolean => {
6971
return blockHistory.isEmpty();
7072
}
7173

72-
const isAlreadyInHistory = (blockHistory: BlockHistory, newBlock: Block): boolean => {
74+
const isAlreadyInHistory = <TBlock extends Block>(blockHistory: BlockHistory<TBlock>, newBlock: TBlock): boolean => {
7375
// `block!` is required until the next version of `immutable` is published to NPM (current version 3.8.1) which improves the type definitions
7476
return blockHistory.some(block => block!.hash === newBlock.hash);
7577
}
7678

77-
const isNewHeadBlock = (blockHistory: BlockHistory, newBlock: Block): boolean => {
79+
const isNewHeadBlock = <TBlock extends Block>(blockHistory: BlockHistory<TBlock>, newBlock: TBlock): boolean => {
7880
return blockHistory.last().hash === newBlock.parentHash;
7981
}
8082

81-
const parentHashIsInHistory = (blockHistory: BlockHistory, newBlock: Block): boolean => {
83+
const parentHashIsInHistory = <TBlock extends Block>(blockHistory: BlockHistory<TBlock>, newBlock: TBlock): boolean => {
8284
// `block!` is required until the next version of `immutable` is published to NPM (current version 3.8.1) which improves the type definitions
8385
return blockHistory.some(block => block!.hash === newBlock.parentHash);
8486
}

0 commit comments

Comments
 (0)