Skip to content

Commit 7e5330f

Browse files
committed
Initial implementation.
Includes ES5 style callback bindings.
1 parent c4a8847 commit 7e5330f

22 files changed

+1273
-30
lines changed

.npmignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
node_modules/
22
.git/
3-
output/
3+
output/tests/
44
tests/
55
.vscode/
6+
output/.editorconfig
67
.editorconfig
78
.gitignore
89
Dockerfile

.vscode/launch.json

+2-11
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,9 @@
99
"request": "launch",
1010
"name": "Mocha Tests",
1111
"program": "${workspaceRoot}/node_modules/mocha/bin/_mocha",
12-
"args": [
13-
"-u",
14-
"tdd",
15-
"--timeout",
16-
"999999",
17-
"--colors",
18-
"${workspaceRoot}/output/tests"
19-
],
12+
"args": [ "--timeout", "999999", "--recursive", "--colors", "${workspaceRoot}/output/tests" ],
2013
"sourceMaps": true,
21-
"outFiles": [
22-
"${workspaceRoot}/output/**/*.js"
23-
],
14+
"outFiles": [ "${workspaceRoot}/output/**/*.js" ],
2415
"internalConsoleOptions": "openOnSessionStart"
2516
}
2617
]

output/.editorconfig

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
root = true

package.json

+12-3
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
"name": "ethereumjs-blockstream",
33
"version": "1.0.0",
44
"description": "A library to turn an unreliably remote source of Ethereum blocks into a reliable stream of blocks with removals on re-orgs and backfills on skips.",
5-
"main": "output/index.js",
6-
"types": "output/index.d.ts",
5+
"main": "output/source/index.js",
6+
"types": "output/source/index.d.ts",
77
"scripts": {
8-
"build": "tsc",
8+
"build": "copyfiles tests/es5.js output/ && tsc",
99
"pretest": "npm run build",
1010
"test": "mocha output/tests"
1111
},
@@ -21,9 +21,18 @@
2121
"homepage": "https://github.com/ethereumjs/ethereumjs-blockstream#readme",
2222
"devDependencies": {
2323
"@types/chai": "3.4.35",
24+
"@types/chai-as-promised": "0.0.30",
2425
"@types/mocha": "2.2.40",
2526
"chai": "3.5.0",
27+
"chai-as-promised": "6.0.0",
28+
"chai-immutable": "1.6.0",
29+
"copyfiles": "^1.2.0",
2630
"mocha": "3.2.0",
2731
"typescript": "2.2.1"
32+
},
33+
"dependencies": {
34+
"@types/source-map-support": "0.2.28",
35+
"immutable": "3.8.1",
36+
"source-map-support": "0.4.14"
2837
}
2938
}

source/block-and-log-reconciler.ts

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import { Block } from "./models/block";
2+
import { Log } from "./models/log";
3+
import { BlockHistory } from "./models/block-history";
4+
import { LogHistory } from "./models/log-history";
5+
import { BlockAndLogHistory } from "./models/block-and-log-history";
6+
import { Filter, FilterOptions } from "./models/filters";
7+
import { reconcileBlockHistory } from "./block-reconciler";
8+
import { reconcileLogHistoryWithAddedBlock, reconcileLogHistoryWithRemovedBlock } from "./log-reconciler";
9+
import { List as ImmutableList } from "immutable";
10+
11+
export async function reconcileBlocksAndLogs(
12+
getBlockByHash: (hash: string) => Promise<Block | null>,
13+
getLogs: (filterOptions: FilterOptions[]) => Promise<Log[]>,
14+
history: BlockAndLogHistory | null,
15+
newBlock: Block,
16+
onLogAdded: (log: Log) => Promise<void>,
17+
onLogRemoved: (log: Log) => Promise<void>,
18+
filters: Filter[] = [],
19+
blockRetention: number = 100,
20+
): Promise<BlockAndLogHistory> {
21+
const blockHistory = (history) ? history.blockHistory : ImmutableList<Block>();
22+
let newLogHistory = (history) ? history.logHistory : ImmutableList<Log>();
23+
const onBlockAdded = async (block: Block) => { newLogHistory = await reconcileLogHistoryWithAddedBlock(getLogs, newLogHistory, block, onLogAdded, filters, blockRetention); };
24+
const onBlockRemoved = async (block: Block) => { newLogHistory = await reconcileLogHistoryWithRemovedBlock(newLogHistory, block, onLogRemoved); };
25+
const newBlockHistory = await reconcileBlockHistory(getBlockByHash, blockHistory, newBlock, onBlockAdded, onBlockRemoved, blockRetention);
26+
return { blockHistory: newBlockHistory, logHistory: newLogHistory };
27+
}

source/block-reconciler.ts

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import { Block } from "./models/block";
2+
import { BlockHistory } from "./models/block-history";
3+
import { List as ImmutableList } from "immutable";
4+
5+
export const reconcileBlockHistory = async (
6+
getBlockByHash: (hash: string) => Promise<Block|null>,
7+
blockHistory: BlockHistory | null,
8+
newBlock: Block,
9+
onBlockAdded: (block: Block) => Promise<void>,
10+
onBlockRemoved: (block: Block) => Promise<void>,
11+
blockRetention: number = 100,
12+
): Promise<BlockHistory> => {
13+
if (blockHistory === null) blockHistory = ImmutableList<Block>();
14+
if (isFirstBlock(blockHistory))
15+
return await addNewHeadBlock(blockHistory, newBlock, onBlockAdded, blockRetention);
16+
17+
if (isAlreadyInHistory(blockHistory, newBlock))
18+
return blockHistory;
19+
20+
if (isNewHeadBlock(blockHistory, newBlock))
21+
return await addNewHeadBlock(blockHistory, newBlock, onBlockAdded, blockRetention);
22+
23+
if (parentHashIsInHistory(blockHistory, newBlock)) {
24+
while (blockHistory.last().hash !== newBlock.parentHash) {
25+
blockHistory = await removeHeadBlock(blockHistory, onBlockRemoved);
26+
}
27+
return await addNewHeadBlock(blockHistory, newBlock, onBlockAdded, blockRetention);
28+
}
29+
30+
return await backfill(getBlockByHash, blockHistory, newBlock, onBlockAdded, onBlockRemoved, blockRetention);
31+
}
32+
33+
const rollback = async (blockHistory: BlockHistory, onBlockRemoved: (block: Block) => Promise<void>): Promise<BlockHistory> => {
34+
while (!blockHistory.isEmpty()) {
35+
// CONSIDER: if this throws an exception, removals may have been announced that are actually still in history since throwing will result in no history update. we can't catch errors here because there isn't a clear way to recover from them, the failure may be a downstream system telling us that the block removal isn't possible because they are in a bad state. we could try re-announcing the successfully added blocks, but there would still be a problem with the failed block (should it be re-announced?) and the addition announcements may also fail
36+
blockHistory = await removeHeadBlock(blockHistory, onBlockRemoved);
37+
}
38+
return blockHistory;
39+
}
40+
41+
const backfill = async (getBlockByHash: (hash: string) => Promise<Block|null>, blockHistory: BlockHistory, newBlock: Block, onBlockAdded: (block: Block) => Promise<void>, onBlockRemoved: (block: Block) => Promise<void>, blockRetention: number) => {
42+
const parentBlock = await getBlockByHash(newBlock.parentHash);
43+
if (parentBlock === null) throw new Error("Failed to fetch parent block.");
44+
if (parseInt(parentBlock.number, 16) + blockRetention < parseInt(blockHistory.last().number, 16))
45+
return rollback(blockHistory, onBlockRemoved);
46+
blockHistory = await reconcileBlockHistory(getBlockByHash, blockHistory, parentBlock, onBlockAdded, onBlockRemoved, blockRetention);
47+
return await reconcileBlockHistory(getBlockByHash, blockHistory, newBlock, onBlockAdded, onBlockRemoved, blockRetention);
48+
}
49+
50+
const addNewHeadBlock = async (blockHistory: BlockHistory, newBlock: Block, onBlockAdded: (block: Block) => Promise<void>, blockRetention: number): Promise<BlockHistory> => {
51+
// this is here as a final sanity check, in case we somehow got into an unexpected state, there are no known (and should never be) ways to reach this exception
52+
if (!blockHistory.isEmpty() && blockHistory.last().hash !== newBlock.parentHash) throw new Error("New head block's parent isn't our current head.");
53+
// CONSIDER: the user getting this notification won't have any visibility into the updated block history yet. should we announce new blocks in a `setTimeout`? should we provide block history with new logs? an announcement failure will result in unwinding the stack and returning the original blockHistory, if we are in the process of backfilling we may have already announced previous blocks that won't actually end up in history (they won't get removed if a re-org occurs and may be re-announced). we can't catch errors thrown by the callback be cause it may be trying to signal to use that the block has become invalid and is un-processable
54+
await onBlockAdded(newBlock);
55+
blockHistory = blockHistory.push(newBlock);
56+
return blockHistory.takeLast(blockRetention).toList();
57+
}
58+
59+
const removeHeadBlock = async (blockHistory: BlockHistory, onBlockRemoved: (block: Block) => Promise<void>): Promise<BlockHistory> => {
60+
let removedBlock = blockHistory.last();
61+
blockHistory = blockHistory.pop();
62+
await onBlockRemoved(removedBlock);
63+
return blockHistory;
64+
}
65+
66+
const isFirstBlock = (blockHistory: BlockHistory, ): boolean => {
67+
return blockHistory.isEmpty();
68+
}
69+
70+
const isAlreadyInHistory = (blockHistory: BlockHistory, newBlock: Block): boolean => {
71+
// `block!` is required until the next version of `immutable` is published to NPM (current version 3.8.1) which improves the type definitions
72+
return blockHistory.some(block => block!.hash === newBlock.hash);
73+
}
74+
75+
const isNewHeadBlock = (blockHistory: BlockHistory, newBlock: Block): boolean => {
76+
return blockHistory.last().hash === newBlock.parentHash;
77+
}
78+
79+
const parentHashIsInHistory = (blockHistory: BlockHistory, newBlock: Block): boolean => {
80+
// `block!` is required until the next version of `immutable` is published to NPM (current version 3.8.1) which improves the type definitions
81+
return blockHistory.some(block => block!.hash === newBlock.parentHash);
82+
}

source/callback-style-wrappers.ts

+124
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import { Block } from "./models/block";
2+
import { Log } from "./models/log";
3+
import { BlockHistory } from "./models/block-history";
4+
import { LogHistory } from "./models/log-history";
5+
import { BlockAndLogHistory } from "./models/block-and-log-history";
6+
import { Filter, FilterOptions } from "./models/filters";
7+
import { reconcileBlockHistory } from "./block-reconciler";
8+
import { reconcileLogHistoryWithAddedBlock, reconcileLogHistoryWithRemovedBlock } from "./log-reconciler";
9+
import { reconcileBlocksAndLogs } from "./block-and-log-reconciler";
10+
11+
export function reconcileBlockHistoryWithCallback(
12+
getBlockByHash: (hash: string, callback: (error?: Error, block?: Block | null) => void) => void,
13+
blockHistory: BlockHistory | null,
14+
newBlock: Block,
15+
onBlockAdded: (block: Block, callback: (error?: Error) => void) => void,
16+
onBlockRemoved: (block: Block, callback: (error?: Error) => void) => void,
17+
blockRetention: number,
18+
callback: (error?: Error, newHistory?: BlockHistory) => void,
19+
): void {
20+
const wrappedGetBlockByHash = (hash: string): Promise<Block | null> => new Promise<Block | null>((resolve, reject) => {
21+
getBlockByHash(hash, (error, block) => {
22+
if (error) reject(error);
23+
else resolve(block);
24+
});
25+
});
26+
const wrappedOnBlockAdded = (block: Block): Promise<void> => new Promise<void>((resolve, reject) => {
27+
onBlockAdded(block, (error) => {
28+
if (error) reject(error);
29+
else resolve();
30+
});
31+
});
32+
const wrappedOnBlockRemoved = (block: Block): Promise<void> => new Promise<void>((resolve, reject) => {
33+
onBlockRemoved(block, (error) => {
34+
if (error) reject(error);
35+
else resolve();
36+
});
37+
});
38+
reconcileBlockHistory(wrappedGetBlockByHash, blockHistory, newBlock, wrappedOnBlockAdded, wrappedOnBlockRemoved, blockRetention)
39+
.then(newBlockHistory => callback(undefined, newBlockHistory))
40+
.catch(error => callback(error, undefined));
41+
}
42+
43+
export function reconcileLogHistoryWithAddedBlockWithCallback(
44+
getLogs: (filterOptions: FilterOptions[], callback: (error?: Error, logs?: Log[]) => void) => void,
45+
logHistory: LogHistory | null,
46+
newBlock: Block,
47+
onLogAdded: (log: Log, callback: (error?: Error) => void) => void,
48+
filters: Filter[],
49+
blockRetention: number,
50+
callback: (error?: Error, newHistory?: LogHistory) => void,
51+
): void {
52+
const wrappedGetLogs = (filterOptions: FilterOptions[]): Promise<Log[]> => new Promise<Log[]>((resolve, reject) => {
53+
getLogs(filterOptions, (error, logs) => {
54+
if (error) reject(error);
55+
else resolve(logs);
56+
})
57+
});
58+
const wrappedOnLogAdded = (log: Log): Promise<void> => new Promise<void>((resolve, reject) => {
59+
onLogAdded(log, error => {
60+
if (error) reject(error);
61+
else resolve();
62+
});
63+
});
64+
reconcileLogHistoryWithAddedBlock(wrappedGetLogs, logHistory, newBlock, wrappedOnLogAdded, filters, blockRetention)
65+
.then(newLogHistory => callback(undefined, newLogHistory))
66+
.catch(error => callback(error, undefined));
67+
}
68+
69+
export function reconcileLogHistoryWithRemovedBlockWithCallback(
70+
logHistory: LogHistory,
71+
removedBlock: Block,
72+
onLogRemoved: (log: Log, callback: (error?: Error) => void) => void,
73+
callback: (error?: Error, logHistory?: LogHistory) => void,
74+
): void {
75+
const wrappedOnLogRemoved = (log: Log): Promise<void> => new Promise<void>((resolve, reject) => {
76+
onLogRemoved(log, error => {
77+
if (error) reject(error);
78+
else resolve();
79+
});
80+
});
81+
reconcileLogHistoryWithRemovedBlock(logHistory, removedBlock, wrappedOnLogRemoved)
82+
.then(logHistory => callback(undefined, logHistory))
83+
.catch(error => callback(error, undefined));
84+
}
85+
86+
export function reconcileBlocksAndLogsWithCallback(
87+
getBlockByHash: (hash: string, callback: (error?: Error, block?: Block | null) => void) => void,
88+
getLogs: (filterOptions: FilterOptions[], callback: (error?: Error, logs?: Log[]) => void) => void,
89+
history: BlockAndLogHistory | null,
90+
newBlock: Block,
91+
onLogAdded: (log: Log, callback: (error?: Error) => void) => void,
92+
onLogRemoved: (log: Log, callback: (error?: Error) => void) => void,
93+
filters: Filter[],
94+
blockRetention: number,
95+
callback: (error?: Error, newHistory?: BlockAndLogHistory) => void,
96+
): void {
97+
const wrappedGetBlockByHash = (hash: string): Promise<Block | null> => new Promise<Block | null>((resolve, reject) => {
98+
getBlockByHash(hash, (error, block) => {
99+
if (error) reject(error);
100+
else resolve(block);
101+
});
102+
});
103+
const wrappedGetLogs = (filterOptions: FilterOptions[]): Promise<Log[]> => new Promise<Log[]>((resolve, reject) => {
104+
getLogs(filterOptions, (error, logs) => {
105+
if (error) reject(error);
106+
else resolve(logs);
107+
})
108+
});
109+
const wrappedOnLogAdded = (log: Log): Promise<void> => new Promise<void>((resolve, reject) => {
110+
onLogAdded(log, error => {
111+
if (error) reject(error);
112+
else resolve();
113+
});
114+
});
115+
const wrappedOnLogRemoved = (log: Log): Promise<void> => new Promise<void>((resolve, reject) => {
116+
onLogRemoved(log, error => {
117+
if (error) reject(error);
118+
else resolve();
119+
});
120+
});
121+
reconcileBlocksAndLogs(wrappedGetBlockByHash, wrappedGetLogs, history, newBlock, wrappedOnLogAdded, wrappedOnLogRemoved, filters, blockRetention)
122+
.then(blockAndLogHistory => callback(undefined, blockAndLogHistory))
123+
.catch(error => callback(error, undefined));
124+
}

source/index.ts

+14-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
1-
export function hello() {
2-
return "Hello World!";
3-
}
1+
// import * as sourceMapSupport from "source-map-support";
2+
// sourceMapSupport.install();
43

5-
export default hello;
4+
export { Block } from "./models/block";
5+
export { Log, } from "./models/log";
6+
export { Transaction } from "./models/transaction";
7+
export { BlockHistory } from "./models/block-history";
8+
export { LogHistory } from "./models/log-history";
9+
export { BlockAndLogHistory } from "./models/block-and-log-history";
10+
export { FilterOptions } from "./models/filters";
11+
12+
export { reconcileBlockHistory } from "./block-reconciler";
13+
export { reconcileLogHistoryWithAddedBlock, reconcileLogHistoryWithRemovedBlock } from "./log-reconciler";
14+
export { reconcileBlocksAndLogs } from "./block-and-log-reconciler";
15+
export { reconcileBlockHistoryWithCallback, reconcileLogHistoryWithAddedBlockWithCallback, reconcileLogHistoryWithRemovedBlockWithCallback, reconcileBlocksAndLogsWithCallback } from "./callback-style-wrappers";

0 commit comments

Comments
 (0)