-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.js
More file actions
130 lines (116 loc) · 3.21 KB
/
client.js
File metadata and controls
130 lines (116 loc) · 3.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
const { LaserstreamClient: NapiClient, CommitmentLevel, shutdownAllStreams, getActiveStreamCount } = require('./index');
const { initProtobuf, decodeSubscribeUpdate, decodeSubscribePreprocessedUpdate } = require('./proto-decoder');
// Compression algorithms enum
const CompressionAlgorithms = {
identity: 0,
deflate: 1,
gzip: 2,
zstd: 3 // zstd is supported in our Rust NAPI bindings
};
// Initialize protobuf on module load
let protobufInitialized = false;
async function ensureProtobufInitialized() {
if (!protobufInitialized) {
await initProtobuf();
protobufInitialized = true;
}
}
// Single subscribe function using NAPI directly
async function subscribe(config, request, onData, onError) {
// Ensure protobuf is initialized
await ensureProtobufInitialized();
// Create NAPI client instance directly
const napiClient = new NapiClient(
config.endpoint,
config.apiKey,
config.maxReconnectAttempts,
config.channelOptions,
config.replay
);
// Wrap the callbacks to decode protobuf bytes
const wrappedCallback = (error, updateBytes) => {
if (error) {
if (onError) {
onError(error);
}
return;
}
try {
// Decode the protobuf bytes to JavaScript object
const decodedUpdate = decodeSubscribeUpdate(updateBytes);
if (onData) {
onData(decodedUpdate);
}
} catch (decodeError) {
if (onError) {
onError(decodeError);
}
}
};
// Call the NAPI client directly with the wrapped callback
try {
const streamHandle = await napiClient.subscribe(request, wrappedCallback);
return streamHandle;
} catch (error) {
if (onError) {
onError(error);
}
throw error;
}
}
// Subscribe to preprocessed transactions
async function subscribePreprocessed(config, request, onData, onError) {
// Ensure protobuf is initialized
await ensureProtobufInitialized();
// Create NAPI client instance directly
const napiClient = new NapiClient(
config.endpoint,
config.apiKey,
config.maxReconnectAttempts,
config.channelOptions,
false // replay is not used for preprocessed subscriptions
);
// Wrap the callbacks to decode protobuf bytes
const wrappedCallback = (error, updateBytes) => {
if (error) {
if (onError) {
onError(error);
}
return;
}
try {
// Decode the preprocessed protobuf bytes to JavaScript object
const decodedUpdate = decodeSubscribePreprocessedUpdate(updateBytes);
if (onData) {
onData(decodedUpdate);
}
} catch (decodeError) {
if (onError) {
onError(decodeError);
}
}
};
// Call the NAPI client's subscribePreprocessed method
try {
const streamHandle = await napiClient.subscribePreprocessed(request, wrappedCallback);
return streamHandle;
} catch (error) {
if (onError) {
onError(error);
}
throw error;
}
}
// Export clean API with only NAPI-based subscribe
module.exports = {
subscribe,
subscribePreprocessed,
CommitmentLevel,
CompressionAlgorithms,
initProtobuf,
decodeSubscribeUpdate,
decodeSubscribePreprocessedUpdate,
// re-export lifecycle helpers from native binding
shutdownAllStreams,
getActiveStreamCount,
};