Skip to content

Commit 7ac6517

Browse files
committed
Changes logs added/removed callbacks to receive an array of all logs in a particular block.
Also fires callback with empty array when block contains no logs matching filters. Also includes blockHash with callback. This change makes it easier for consumers to know when they are done receiving logs for a particular block. With this knowledge, they can now process block + logs as a single atomic operation if they desire by saving off the block and waiting for a matching set of logs to come through (or notification of block removal in case log fetching fails). This change somewhat constrains the problem being solved by this library to just "getting ordering right", rather than trying to provide a nice interface into the stream. It isn't difficult for users of this library to split apart the logs into multiple callbacks with a _very_ tiny callback wrapper, yet this change opens the doors to a number of new use cases.
1 parent 2576905 commit 7ac6517

File tree

4 files changed

+67
-85
lines changed

4 files changed

+67
-85
lines changed

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "ethereumjs-blockstream",
3-
"version": "6.0.1",
3+
"version": "7.0.0",
44
"description": "A library to turn an unreliable remote source of Ethereum blocks into a reliable stream of blocks with removals on re-orgs and backfills on skips.",
55
"main": "output/source/index.js",
66
"types": "output/source/index.d.ts",

source/block-and-log-streamer.ts

+23-23
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> {
3030
private readonly logFilters: { [propName: string]: Filter } = {}
3131
private readonly onBlockAddedSubscribers: { [propName: string]: (block: TBlock) => void } = {};
3232
private readonly onBlockRemovedSubscribers: { [propName: string]: (block: TBlock) => void } = {};
33-
private readonly onLogAddedSubscribers: { [propName: string]: (log: TLog) => void } = {};
34-
private readonly onLogRemovedSubscribers: { [propName: string]: (log: TLog) => void } = {};
33+
private readonly onLogsAddedSubscribers: { [propName: string]: (blockHash: string, logs: Array<TLog>) => void } = {};
34+
private readonly onLogsRemovedSubscribers: { [propName: string]: (blockHash: string, logs: Array<TLog>) => void } = {};
3535

3636
/**
3737
* @param getBlockByHash async function that returns a block given a particular hash or null/throws if the block is not found
@@ -81,12 +81,12 @@ export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> {
8181
.forEach(callback => this.pendingCallbacks.push(() => callback(block)));
8282

8383
const logFilters = Object.keys(this.logFilters).map(key => this.logFilters[key]);
84-
this.logHistory = reconcileLogHistoryWithAddedBlock(this.getLogs, this.logHistory, block, this.onLogAdded, logFilters, this.blockRetention);
84+
this.logHistory = reconcileLogHistoryWithAddedBlock(this.getLogs, this.logHistory, block, this.onLogsAdded, logFilters, this.blockRetention);
8585
await this.logHistory;
8686
};
8787

8888
private readonly onBlockRemoved = async (block: TBlock): Promise<void> => {
89-
this.logHistory = reconcileLogHistoryWithRemovedBlock(this.logHistory, block, this.onLogRemoved);
89+
this.logHistory = reconcileLogHistoryWithRemovedBlock(this.logHistory, block, this.onLogsRemoved);
9090
await this.logHistory;
9191

9292
Object.keys(this.onBlockRemovedSubscribers)
@@ -95,18 +95,18 @@ export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> {
9595
.forEach(callback => this.pendingCallbacks.push(() => callback(block)));
9696
};
9797

98-
private readonly onLogAdded = async (log: TLog): Promise<void> => {
99-
Object.keys(this.onLogAddedSubscribers)
100-
.map((key: string) => this.onLogAddedSubscribers[key])
98+
private readonly onLogsAdded = async (blockHash: string, logs: Array<TLog>): Promise<void> => {
99+
Object.keys(this.onLogsAddedSubscribers)
100+
.map((key: string) => this.onLogsAddedSubscribers[key])
101101
.map(callback => logAndSwallowWrapper(callback, this.onError))
102-
.forEach(callback => this.pendingCallbacks.push(() => callback(log)));
102+
.forEach(callback => this.pendingCallbacks.push(() => callback(blockHash, logs)));
103103
};
104104

105-
private readonly onLogRemoved = async (log: TLog): Promise<void> => {
106-
Object.keys(this.onLogRemovedSubscribers)
107-
.map((key: string) => this.onLogRemovedSubscribers[key])
105+
private readonly onLogsRemoved = async (blockHash: string, logs: Array<TLog>): Promise<void> => {
106+
Object.keys(this.onLogsRemovedSubscribers)
107+
.map((key: string) => this.onLogsRemovedSubscribers[key])
108108
.map(callback => logAndSwallowWrapper(callback, this.onError))
109-
.forEach(callback => this.pendingCallbacks.push(() => callback(log)));
109+
.forEach(callback => this.pendingCallbacks.push(() => callback(blockHash, logs)));
110110
};
111111

112112

@@ -151,34 +151,34 @@ export class BlockAndLogStreamer<TBlock extends Block, TLog extends Log> {
151151
};
152152

153153

154-
public readonly subscribeToOnLogAdded = (onLogAdded: (log: TLog) => void): string => {
154+
public readonly subscribeToOnLogsAdded = (onLogsAdded: (blockHash: string, logs: Array<TLog>) => void): string => {
155155
const uuid = `on log added token ${createUuid()}`;
156-
this.onLogAddedSubscribers[uuid] = onLogAdded;
156+
this.onLogsAddedSubscribers[uuid] = onLogsAdded;
157157
return uuid;
158158
};
159159

160-
public readonly unsubscribeFromOnLogAdded = (token: string) => {
160+
public readonly unsubscribeFromOnLogsAdded = (token: string) => {
161161
if (!token.startsWith("on log added token ")) throw new Error(`Expected a log added subscription token. Actual: ${token}`);
162-
delete this.onLogAddedSubscribers[token];
162+
delete this.onLogsAddedSubscribers[token];
163163
};
164164

165165

166-
public readonly subscribeToOnLogRemoved = (onLogRemoved: (log: TLog) => void): string => {
166+
public readonly subscribeToOnLogsRemoved = (onLogsRemoved: (blockHash: string, logs: Array<TLog>) => void): string => {
167167
const uuid = `on log removed token ${createUuid()}`;
168-
this.onLogRemovedSubscribers[uuid] = onLogRemoved;
168+
this.onLogsRemovedSubscribers[uuid] = onLogsRemoved;
169169
return uuid;
170170
};
171171

172-
public readonly unsubscribeFromOnLogRemoved = (token: string) => {
172+
public readonly unsubscribeFromOnLogsRemoved = (token: string) => {
173173
if (!token.startsWith("on log removed token ")) throw new Error(`Expected a log added subscription token. Actual: ${token}`);
174-
delete this.onLogRemovedSubscribers[token];
174+
delete this.onLogsRemovedSubscribers[token];
175175
};
176176
}
177177

178-
function logAndSwallowWrapper<T>(callback: (arg: T) => void, onError: (error: Error) => void): (arg: T) => void {
179-
return function (parameter) {
178+
function logAndSwallowWrapper<T, U>(callback: (arg1?: T, arg2?: U) => void, onError: (error: Error) => void): (arg1?: T, arg2?: U) => void {
179+
return function (parameter1, parameter2) {
180180
try {
181-
callback(parameter);
181+
callback(parameter1, parameter2);
182182
} catch (error) {
183183
onError(error);
184184
}

source/log-reconciler.ts

+12-13
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ export const reconcileLogHistoryWithAddedBlock = async <TBlock extends Block, TL
88
getLogs: (filterOptions: FilterOptions) => Promise<TLog[]>,
99
logHistory: LogHistory<TLog> | Promise<LogHistory<TLog>>,
1010
newBlock: TBlock,
11-
onLogAdded: (log: TLog) => Promise<void>,
11+
onLogsAdded: (blockHash: string, logs: Array<TLog>) => Promise<void>,
1212
filters: Filter[] = [],
1313
historyBlockLength: number = 100,
1414
): Promise<LogHistory<TLog>> => {
1515
logHistory = await logHistory;
1616
const logs = await getFilteredLogs(getLogs, newBlock, filters);
17-
logHistory = await addNewLogsToHead(logHistory, logs, onLogAdded);
17+
logHistory = await addNewLogsToHead(newBlock.hash, logHistory, logs, onLogsAdded);
1818
logHistory = await pruneOldLogs(logHistory, newBlock, historyBlockLength);
1919
return logHistory;
2020
}
@@ -27,14 +27,18 @@ const getFilteredLogs = async <TBlock extends Block, TLog extends Log>(getLogs:
2727
return nestedLogs.reduce((allLogs, logs) => allLogs.concat(logs), []);
2828
}
2929

30-
const addNewLogsToHead = async <TLog extends Log>(logHistory: LogHistory<TLog>, newLogs: Array<TLog>, onLogAdded: (log: TLog) => Promise<void>): Promise<LogHistory<TLog>> => {
30+
const addNewLogsToHead = async <TLog extends Log>(blockHash: string, logHistory: LogHistory<TLog>, newLogs: Array<TLog>, onLogsAdded: (blockHash: string, logs: Array<TLog>) => Promise<void>): Promise<LogHistory<TLog>> => {
3131
const sortedLogs = newLogs.sort((logA, logB) => parseHexInt(logA.logIndex) - parseHexInt(logB.logIndex));
32+
const addedLogs: Array<TLog> = []
3233
for (const logToAdd of sortedLogs) {
3334
// we may already have this log because two filters can return the same log
3435
if (logHistory.some(logInHistory => logInHistory!.blockHash === logToAdd.blockHash && logInHistory!.logIndex === logToAdd.logIndex)) continue;
3536
ensureOrder(logHistory.last(), logToAdd);
36-
logHistory = await addNewLogToHead(logHistory, logToAdd, onLogAdded);
37+
logHistory = logHistory.push(logToAdd)
38+
addedLogs.push(logToAdd)
3739
}
40+
// CONSIDER: the user getting this notification won't have any visibility into the updated log history yet. should we announce new logs in a `setTimeout`? should we provide log history with new logs?
41+
await onLogsAdded(blockHash, addedLogs)
3842
return logHistory;
3943
}
4044

@@ -43,13 +47,6 @@ const pruneOldLogs = async <TBlock extends Block, TLog extends Log>(logHistory:
4347
return logHistory.skipUntil(log => parseHexInt(newBlock.number) - parseHexInt(log!.blockNumber) < historyBlockLength).toList();
4448
}
4549

46-
const addNewLogToHead = async <TLog extends Log>(logHistory: LogHistory<TLog>, newLog: TLog, onLogAdded: (log: TLog) => Promise<void>): Promise<LogHistory<TLog>> => {
47-
logHistory = logHistory.push(newLog);
48-
// CONSIDER: the user getting this notification won't have any visibility into the updated log history yet. should we announce new logs in a `setTimeout`? should we provide log history with new logs?
49-
await onLogAdded(newLog);
50-
return logHistory;
51-
}
52-
5350
const ensureOrder = <TLog extends Log>(headLog: TLog | undefined, newLog: TLog) => {
5451
if (headLog === undefined) return;
5552
const headBlockNumber = parseHexInt(headLog.blockNumber);
@@ -64,14 +61,16 @@ const ensureOrder = <TLog extends Log>(headLog: TLog | undefined, newLog: TLog)
6461
export const reconcileLogHistoryWithRemovedBlock = async <TBlock extends Block, TLog extends Log>(
6562
logHistory: LogHistory<TLog>|Promise<LogHistory<TLog>>,
6663
removedBlock: TBlock,
67-
onLogRemoved: (log: TLog) => Promise<void>,
64+
onLogsRemoved: (blockHash: string, logs: Array<TLog>) => Promise<void>,
6865
): Promise<LogHistory<TLog>> => {
6966
logHistory = await logHistory;
7067

68+
const removedLogs = []
7169
while (!logHistory.isEmpty() && logHistory.last().blockHash === removedBlock.hash) {
72-
await onLogRemoved(logHistory.last());
70+
removedLogs.push(logHistory.last());
7371
logHistory = logHistory.pop();
7472
}
73+
await onLogsRemoved(removedBlock.hash, removedLogs);
7574

7675
// sanity check, no known way to trigger the error
7776
if (logHistory.some(log => log!.blockHash === removedBlock.hash)) throw new Error("found logs for removed block not at head of log history");

0 commit comments

Comments
 (0)