Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 243 additions & 30 deletions sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ const ORACLE_DEFAULT_ID = getOracleId(
export class WebSocketDriftClientAccountSubscriberV2
implements DriftClientAccountSubscriber
{
// Global counters for diagnostics
static totalSocketsOpened = 0;
static currentActiveSockets = 0;
static instancesCreated = 0;
static subscribeCalls = 0;
isSubscribed: boolean;
program: Program;
commitment?: Commitment;
Expand Down Expand Up @@ -82,6 +87,12 @@ export class WebSocketDriftClientAccountSubscriberV2
spotOracleMap = new Map<number, PublicKey>();
spotOracleStringMap = new Map<number, string>();
oracleSubscribers = new Map<string, AccountSubscriber<OraclePriceData>>();
// Track in-flight oracle subscriptions to prevent duplicate subscriptions/races
private oracleSubscribePromises = new Map<string, Promise<boolean>>();
// Debounced refresh handles to avoid side-effects in getters
private perpOracleMapRefreshTimeout?: ReturnType<typeof setTimeout>;
private spotOracleMapRefreshTimeout?: ReturnType<typeof setTimeout>;
private oracleReconcileTimeout?: ReturnType<typeof setTimeout>;
delistedMarketSetting: DelistedMarketSetting;

initialPerpMarketAccountData: Map<number, PerpMarketAccount>;
Expand Down Expand Up @@ -124,10 +135,108 @@ export class WebSocketDriftClientAccountSubscriberV2
});
this.rpc = rpc;
this.rpcSubscriptions = rpcSubscriptions;

// Instance counter
WebSocketDriftClientAccountSubscriberV2.instancesCreated += 1;
if (this.resubOpts?.logResubMessages) {
console.log(
`[WS] WebSocketDriftClientAccountSubscriberV2 instance created #${WebSocketDriftClientAccountSubscriberV2.instancesCreated}`
);
}

// Monkey-patch rpcSubscriptions internals for minimal WS open/close logging
try {
const subsAny = this.rpcSubscriptions as any;
const PATCHED_FLAG = '__driftWsPatched';

const attachLoggingToWs = (ws: any) => {
if (!ws || ws.__driftPatched) return;
ws.__driftPatched = true;
WebSocketDriftClientAccountSubscriberV2.totalSocketsOpened += 1;
WebSocketDriftClientAccountSubscriberV2.currentActiveSockets += 1;
const readyState = ws.readyState;
const activeSubs =
(subsAny?._activeSubscriptions && subsAny._activeSubscriptions.size) || 0;
if (this.resubOpts?.logResubMessages) {
console.log(
`[WS OPEN] readyState=${readyState} activeSubs=${activeSubs} totalOpened=${WebSocketDriftClientAccountSubscriberV2.totalSocketsOpened} currentActive=${WebSocketDriftClientAccountSubscriberV2.currentActiveSockets}`
);
}
const onClose = () => {
WebSocketDriftClientAccountSubscriberV2.currentActiveSockets = Math.max(
0,
WebSocketDriftClientAccountSubscriberV2.currentActiveSockets - 1
);
const rs = ws.readyState;
const act =
(subsAny?._activeSubscriptions &&
subsAny._activeSubscriptions.size) || 0;
if (this.resubOpts?.logResubMessages) {
console.log(
`[WS CLOSE] readyState=${rs} activeSubs=${act} totalOpened=${WebSocketDriftClientAccountSubscriberV2.totalSocketsOpened} currentActive=${WebSocketDriftClientAccountSubscriberV2.currentActiveSockets}`
);
}
};
// Support browser/node ws
if (typeof ws.addEventListener === 'function') {
ws.addEventListener('close', onClose);
}
if (typeof ws.on === 'function') {
ws.on('close', onClose);
}
};

if (!subsAny[PATCHED_FLAG]) {
subsAny[PATCHED_FLAG] = true;
const originalConnect = subsAny._connectWebsocket;
if (typeof originalConnect === 'function') {
subsAny._connectWebsocket = (...args: any[]) => {
const prevWs = subsAny._ws;
const result = originalConnect.apply(subsAny, args);
const hook = () => {
try {
const ws = subsAny._ws;
// Only treat as new if object identity changed or not patched yet
if (ws && ws !== prevWs) {
// may not be open yet; still attach listeners and count as opened
attachLoggingToWs(ws);
}
} catch (_e) {
// swallow
}
};
try {
if (result && typeof result.then === 'function') {
result.then(() => hook()).catch(() => hook());
} else {
queueMicrotask(hook);
}
} catch (_e) {
setTimeout(hook, 0);
}
return result;
};
}

// Fallback: if a WS already exists, attach once and treat as opened
if (subsAny._ws && !subsAny._ws.__driftPatched) {
attachLoggingToWs(subsAny._ws);
}
}
Comment on lines +147 to +225
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

🌐 Web query:

gill npm RpcSubscriptions _connectWebsocket _ws API documentation v0.10.2

💡 Result:

In gill v0.10.2, the rpcSubscriptions object you get from createSolanaClient() is not a Gill-specific RpcSubscriptions class with documented internals—it’s the Solana Kit (@solana/kit) RPC Subscriptions client that Gill wraps/re-exports. Gill’s docs describe creating rpcSubscriptions, but they don’t document private internals like _connectWebsocket or _ws. [1] [2]

What is documented / supported

  • Gill: create a client that includes rpcSubscriptions:
    • const { rpc, rpcSubscriptions } = createSolanaClient({ urlOrMoniker }) (Gill docs / README-level guidance). [1] [2]
  • Solana Kit: create subscriptions directly via:
    • createSolanaRpcSubscriptions('ws://...')
    • and consume subscriptions as AsyncIterators (for await...of). [3]

Where the subscriptions API is specified

  • The typed Subscriptions surface area (what methods exist and how subscription execution is shaped) is described by @solana/rpc-subscriptions-spec (and re-exported via Kit). [4]

About _connectWebsocket / _ws

Those names look like private implementation details (underscore-prefixed) and are not part of the public, stable API in the Kit/Gill docs—so there isn’t official “API documentation” for them; the supported approach is to use the exported creators (e.g. createSolanaRpcSubscriptions, or the transport/channel factory functions shown in Kit docs). [3]

Sources
[1] Gill API Reference (overview / install / exports): (gillsdk.com)
[2] Gill package overview (rpc + rpcSubscriptions via createSolanaClient): (socket.dev)
[3] Anza Kit docs (RPC Subscriptions, createSolanaRpcSubscriptions, AsyncIterator usage, transport customization): (github.com)
[4] @solana/rpc-subscriptions-spec (purpose + usage pattern around createSolanaRpcSubscriptions): (npmjs.com)

Citations:


🌐 Web query:

gill library websocket public API alternatives

💡 Result:

If you’re using gill (Solana JS/TS SDK) and want alternatives for WebSocket (PubSub) / “public API” access, here are the main options:

  1. Use Solana’s standard JSON-RPC PubSub WebSockets directly (no SDK required)
    Solana exposes WebSocket subscription methods like accountSubscribe, logsSubscribe, programSubscribe, etc. via the RPC PubSub endpoint. [1], [2]

  2. Use Anza’s @solana/kit / @solana/rpc-subscriptions (modern, low-level primitives)
    Provides a TypeScript interface over Solana RPC subscriptions and supports the standard Solana WebSocket methods. [3]

  3. Use a managed Solana WebSocket RPC provider (standard + value-add streams)

  • Helius Enhanced WebSockets (transaction/account streaming + filtering; their docs distinguish Enhanced vs Standard Solana WebSockets). [4], [5]
  • Surfpool WebSockets (documents Solana-style subscription endpoints like logsSubscribe, programSubscribe, blockSubscribe). [6]
  • SolanaTracker WebSockets (documents accountSubscribe over WSS with API key). [7]

Sources
[1] Solana RPC WebSocket methods overview: (solana.com)
[2] Example method detail (accountSubscribe): (solana.com)
[3] @solana/kit RPC subscriptions: (solanakit.com)
[4] Helius WebSockets/Webhooks product page: (helius.dev)
[5] Helius Enhanced WebSockets docs: (helius.dev)
[6] Surfpool WebSocket RPC docs: (docs.surfpool.run)
[7] SolanaTracker accountSubscribe docs: (docs.solanatracker.io)

Citations:


🏁 Script executed:

# Search for any references to gill in the codebase to understand version and usage
rg "gill" --type json,ts --max-count=20

Repository: drift-labs/protocol-v2

Length of output: 97


🏁 Script executed:

# Look for package.json to see gill dependency version
find . -name "package.json" -type f -exec grep -l "gill" {} \;

Repository: drift-labs/protocol-v2

Length of output: 84


🏁 Script executed:

# Check the gill version in package.json
cat ./sdk/package.json | grep -A 2 -B 2 "gill"

Repository: drift-labs/protocol-v2

Length of output: 204


🏁 Script executed:

# Look for any existing public API usage patterns for rpcSubscriptions in the codebase
rg "createSolanaRpc|rpcSubscriptions" sdk/src --type ts -B 2 -A 2 | head -40

Repository: drift-labs/protocol-v2

Length of output: 3218


Refactor to use documented public APIs instead of monkey-patching private internals.

Relying on _connectWebsocket, _ws, and _activeSubscriptions (lines 191, 198, 159) couples this code to undocumented private implementation details of @solana/kit's RPC Subscriptions client, which Gill wraps. These fields are not part of any stable API contract and can change without notice in dependency updates. The broad try/catch (lines 209, 216, 222, 224) will silently swallow breaking changes, making failures difficult to diagnose.

Instead, use the public subscription methods (rpcSubscriptions.accountNotifications(), etc.) and their AsyncIterator interface, or create a custom transport/observer wrapper via the documented Solana Kit patterns for instrumentation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts` around lines 147
- 225, The code is monkey-patching private internals (_connectWebsocket, _ws,
_activeSubscriptions) of rpcSubscriptions to track WS open/close which is
brittle and swallows errors; replace this with instrumentation that uses the
public RPC subscription APIs (e.g., rpcSubscriptions.accountNotifications() or
other public subscription methods) or a documented transport/observer wrapper so
you no longer touch private symbols. Concretely, remove the monkey-patch logic
(the PATCHED_FLAG branch and attachLoggingToWs usage), and instead hook into the
public AsyncIterator/event interface returned by
rpcSubscriptions.accountNotifications() (or the equivalent public subscription
factory) to increment/decrement
WebSocketDriftClientAccountSubscriberV2.totalSocketsOpened and
currentActiveSockets and emit the same log messages; ensure errors are surfaced
(no broad swallowing try/catch) and keep logging gated by
this.resubOpts?.logResubMessages.

} catch (_e) {
// swallow
}
}

public async subscribe(): Promise<boolean> {
try {
// Subscribe call counter
WebSocketDriftClientAccountSubscriberV2.subscribeCalls += 1;
if (this.resubOpts?.logResubMessages) {
console.log(
`[WS] WebSocketDriftClientAccountSubscriberV2.subscribe() call #${WebSocketDriftClientAccountSubscriberV2.subscribeCalls}`
);
}
const startTime = performance.now();
if (this.isSubscribed) {
console.log(
Expand Down Expand Up @@ -312,21 +421,6 @@ export class WebSocketDriftClientAccountSubscriberV2
})(),
]);

// const initialPerpMarketDataFromLatestData = new Map(
// Array.from(this.perpMarketAccountLatestData.values()).map((data) => [
// data.data.marketIndex,
// data.data,
// ])
// );
// const initialSpotMarketDataFromLatestData = new Map(
// Array.from(this.spotMarketAccountLatestData.values()).map((data) => [
// data.data.marketIndex,
// data.data,
// ])
// );
// this.initialPerpMarketAccountData = initialPerpMarketDataFromLatestData;
// this.initialSpotMarketAccountData = initialSpotMarketDataFromLatestData;

await this.handleDelistedMarketOracles();

await Promise.all([this.setPerpOracleMap(), this.setSpotOracleMap()]);
Expand Down Expand Up @@ -355,6 +449,17 @@ export class WebSocketDriftClientAccountSubscriberV2
console.error('Subscription failed:', error);
this.isSubscribing = false;
this.subscriptionPromiseResolver(false);
// Best-effort cleanup of any partially created subscriptions to avoid leaks
try {
await Promise.all([
this.stateAccountSubscriber?.unsubscribe(),
this.perpMarketAllAccountsSubscriber?.unsubscribe(),
this.spotMarketAllAccountsSubscriber?.unsubscribe(),
this.unsubscribeFromOracles(),
]);
Comment on lines +452 to +459
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Failure cleanup unsubscribes sockets but leaves stale oracle bookkeeping maps populated.

Line 458 cleanup calls unsubscribeFromOracles(), but there’s no map clear here; stale entries in oracleSubscribers / oracleSubscribePromises can cause future subscribeToOracles() to skip required subscriptions.

🔧 Proposed fix
 async unsubscribeFromOracles(): Promise<void> {
-  await Promise.all(
-    Array.from(this.oracleSubscribers.values()).map((accountSubscriber) =>
-      accountSubscriber.unsubscribe()
-    )
-  );
+  const subscribers = Array.from(this.oracleSubscribers.values());
+  await Promise.all(subscribers.map((accountSubscriber) => accountSubscriber.unsubscribe()));
+  this.oracleSubscribers.clear();
+  this.oracleSubscribePromises.clear();
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts` around lines 452
- 459, The unsubscribeFromOracles() call only unsubscribes sockets but doesn't
clear the oracle bookkeeping maps, leaving stale entries in oracleSubscribers
and oracleSubscribePromises that make future subscribeToOracles() skip
subscriptions; update unsubscribeFromOracles() (or add a clear step immediately
after its call in the cleanup block) to also clear oracleSubscribers and
oracleSubscribePromises (e.g., call oracleSubscribers.clear() and
oracleSubscribePromises.clear() or reset them) so state is fully cleaned and
subscribeToOracles() can re-subscribe correctly.

} catch (_cleanupErr) {
// swallow cleanup errors
}
return false;
}
}
Expand Down Expand Up @@ -495,6 +600,22 @@ export class WebSocketDriftClientAccountSubscriberV2
try {
const oracleId = getOracleId(oracleInfo.publicKey, oracleInfo.source);

// If already subscribed, return
if (this.oracleSubscribers.has(oracleId)) {
return true;
}
// If a subscribe is already in-flight for this oracle, reuse it
const inFlight = this.oracleSubscribePromises.get(oracleId);
if (inFlight) {
return await inFlight;
}

const subscribePromise = (async (): Promise<boolean> => {
// Double-check after we were scheduled
if (this.oracleSubscribers.has(oracleId)) {
return true;
}

const client = this.oracleClientCache.get(
oracleInfo.source,
this.program.provider.connection,
Expand All @@ -517,19 +638,25 @@ export class WebSocketDriftClientAccountSubscriberV2
if (initialOraclePriceData) {
accountSubscriber.setData(initialOraclePriceData);
}
await accountSubscriber.subscribe((data: OraclePriceData) => {
this.eventEmitter.emit(
'oraclePriceUpdate',
oracleInfo.publicKey,
oracleInfo.source,
data
);
this.eventEmitter.emit('update');
});
await accountSubscriber.subscribe((data: OraclePriceData) => {
this.eventEmitter.emit(
'oraclePriceUpdate',
oracleInfo.publicKey,
oracleInfo.source,
data
);
this.eventEmitter.emit('update');
});

this.oracleSubscribers.set(oracleId, accountSubscriber);
// Only after successful subscribe, store in map
this.oracleSubscribers.set(oracleId, accountSubscriber);
return true;
})().finally(() => {
this.oracleSubscribePromises.delete(oracleId);
});

return true;
this.oracleSubscribePromises.set(oracleId, subscribePromise);
return await subscribePromise;
} catch (error) {
console.error(
`Failed to subscribe to oracle ${oracleInfo.publicKey.toString()}:`,
Expand Down Expand Up @@ -611,6 +738,8 @@ export class WebSocketDriftClientAccountSubscriberV2
this.perpOracleStringMap.set(perpMarketIndex, oracleId);
}
await Promise.all(addOraclePromises);
// Debounce oracle reconciliation to run once after both perp/spot map updates
this.scheduleOracleReconcile();
}

async setSpotOracleMap() {
Expand All @@ -636,6 +765,8 @@ export class WebSocketDriftClientAccountSubscriberV2
this.spotOracleStringMap.set(spotMarketIndex, oracleId);
}
await Promise.all(addOraclePromises);
// Debounce oracle reconciliation to run once after both perp/spot map updates
this.scheduleOracleReconcile();
}

async handleDelistedMarketOracles(): Promise<void> {
Expand Down Expand Up @@ -710,6 +841,7 @@ export class WebSocketDriftClientAccountSubscriberV2
public getOraclePriceDataAndSlotForPerpMarket(
marketIndex: number
): DataAndSlot<OraclePriceData> | undefined {
this.assertIsSubscribed();
const perpMarketAccount = this.getMarketAccountAndSlot(marketIndex);
const oracle = this.perpOracleMap.get(marketIndex);
const oracleId = this.perpOracleStringMap.get(marketIndex);
Expand All @@ -718,8 +850,8 @@ export class WebSocketDriftClientAccountSubscriberV2
}

if (!perpMarketAccount.data.amm.oracle.equals(oracle)) {
// If the oracle has changed, we need to update the oracle map in background
this.setPerpOracleMap();
// Schedule a debounced oracle map refresh instead of immediate side-effect
this.schedulePerpOracleMapRefresh();
}

return this.getOraclePriceDataAndSlot(oracleId);
Expand All @@ -728,6 +860,7 @@ export class WebSocketDriftClientAccountSubscriberV2
public getOraclePriceDataAndSlotForSpotMarket(
marketIndex: number
): DataAndSlot<OraclePriceData> | undefined {
this.assertIsSubscribed();
const spotMarketAccount = this.getSpotMarketAccountAndSlot(marketIndex);
const oracle = this.spotOracleMap.get(marketIndex);
const oracleId = this.spotOracleStringMap.get(marketIndex);
Expand All @@ -736,10 +869,90 @@ export class WebSocketDriftClientAccountSubscriberV2
}

if (!spotMarketAccount.data.oracle.equals(oracle)) {
// If the oracle has changed, we need to update the oracle map in background
this.setSpotOracleMap();
// Schedule a debounced oracle map refresh instead of immediate side-effect
this.scheduleSpotOracleMapRefresh();
}

return this.getOraclePriceDataAndSlot(oracleId);
}

/**
* Debounced refreshers to avoid side-effects in getters and reduce races
*/
private schedulePerpOracleMapRefresh(): void {
if (this.perpOracleMapRefreshTimeout) {
clearTimeout(this.perpOracleMapRefreshTimeout);
}
this.perpOracleMapRefreshTimeout = setTimeout(async () => {
try {
await this.setPerpOracleMap();
} catch (_e) {
// swallow
}
}, 50);
}

private scheduleSpotOracleMapRefresh(): void {
if (this.spotOracleMapRefreshTimeout) {
clearTimeout(this.spotOracleMapRefreshTimeout);
}
this.spotOracleMapRefreshTimeout = setTimeout(async () => {
try {
await this.setSpotOracleMap();
} catch (_e) {
// swallow
}
}, 50);
}

private scheduleOracleReconcile(): void {
if (this.oracleReconcileTimeout) {
clearTimeout(this.oracleReconcileTimeout);
}
this.oracleReconcileTimeout = setTimeout(async () => {
try {
await this.reconcileOracleSubscribers();
} catch (_e) {
// swallow
}
}, 75);
}
Comment on lines +882 to +919
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Debounced callbacks can run after unsubscribe and reopen oracle subscriptions (leak risk).

Line 886, Line 899, and Line 912 schedule async work that can still fire after unsubscribe(). Those callbacks call set*OracleMap()/reconcileOracleSubscribers(), which can call addOracle() and resubscribe while the instance is supposed to be torn down.

🔧 Proposed fix
 private schedulePerpOracleMapRefresh(): void {
+  if (!this.isSubscribed) return;
   if (this.perpOracleMapRefreshTimeout) {
     clearTimeout(this.perpOracleMapRefreshTimeout);
   }
   this.perpOracleMapRefreshTimeout = setTimeout(async () => {
+    if (!this.isSubscribed) return;
     try {
       await this.setPerpOracleMap();
     } catch (_e) {
       // swallow
+    } finally {
+      this.perpOracleMapRefreshTimeout = undefined;
     }
   }, 50);
 }

 private scheduleSpotOracleMapRefresh(): void {
+  if (!this.isSubscribed) return;
   if (this.spotOracleMapRefreshTimeout) {
     clearTimeout(this.spotOracleMapRefreshTimeout);
   }
   this.spotOracleMapRefreshTimeout = setTimeout(async () => {
+    if (!this.isSubscribed) return;
     try {
       await this.setSpotOracleMap();
     } catch (_e) {
       // swallow
+    } finally {
+      this.spotOracleMapRefreshTimeout = undefined;
     }
   }, 50);
 }

 private scheduleOracleReconcile(): void {
+  if (!this.isSubscribed) return;
   if (this.oracleReconcileTimeout) {
     clearTimeout(this.oracleReconcileTimeout);
   }
   this.oracleReconcileTimeout = setTimeout(async () => {
+    if (!this.isSubscribed) return;
     try {
       await this.reconcileOracleSubscribers();
     } catch (_e) {
       // swallow
+    } finally {
+      this.oracleReconcileTimeout = undefined;
     }
   }, 75);
 }
 public async unsubscribe(): Promise<void> {
   if (!this.isSubscribed) {
     return;
   }

+  if (this.perpOracleMapRefreshTimeout) clearTimeout(this.perpOracleMapRefreshTimeout);
+  if (this.spotOracleMapRefreshTimeout) clearTimeout(this.spotOracleMapRefreshTimeout);
+  if (this.oracleReconcileTimeout) clearTimeout(this.oracleReconcileTimeout);
+  this.perpOracleMapRefreshTimeout = undefined;
+  this.spotOracleMapRefreshTimeout = undefined;
+  this.oracleReconcileTimeout = undefined;
+
   if (this.subscriptionPromise) {
     await this.subscriptionPromise;
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@sdk/src/accounts/webSocketDriftClientAccountSubscriberV2.ts` around lines 882
- 919, The scheduled async callbacks in schedulePerpOracleMapRefresh,
scheduleSpotOracleMapRefresh, and scheduleOracleReconcile can run after
unsubscribe() and re-open subscriptions; change each scheduled handler to no-op
if the instance is torn down by adding a guard (e.g., check a boolean flag like
this.isClosed or this.unsubscribed) before calling setPerpOracleMap,
setSpotOracleMap, or reconcileOracleSubscribers, and ensure unsubscribe() sets
that flag and clears perpOracleMapRefreshTimeout, spotOracleMapRefreshTimeout,
and oracleReconcileTimeout; alternatively capture the timeout id in a local
variable and verify it still matches the corresponding this.*Timeout before
invoking the async work so addOracle/resubscribe cannot run after teardown.


/**
* Ensure we only keep oracle subscriptions that are currently referenced by any market.
* Subscribe missing ones (if any remained due to race), and unsubscribe extras.
*/
private async reconcileOracleSubscribers(): Promise<void> {
// Build desired set from current maps
const desired = new Set<string>();
for (const id of this.perpOracleStringMap.values()) {
if (id) desired.add(id);
}
for (const id of this.spotOracleStringMap.values()) {
if (id) desired.add(id);
}

// Subscribe to any desired oracles that aren't yet active or in-flight
const subscribePromises: Promise<boolean>[] = [];
for (const id of desired) {
if (!this.oracleSubscribers.has(id) && !this.oracleSubscribePromises.has(id)) {
const { publicKey, source } = getPublicKeyAndSourceFromOracleId(id);
// Skip default oracle id
if (!publicKey.equals(PublicKey.default)) {
subscribePromises.push(this.addOracle({ publicKey, source }));
}
}
}
await Promise.all(subscribePromises);

// Unsubscribe from any oracles that are no longer desired
const unsubscribePromises: Promise<void>[] = [];
for (const [id, sub] of this.oracleSubscribers.entries()) {
if (!desired.has(id)) {
unsubscribePromises.push(sub.unsubscribe());
this.oracleSubscribers.delete(id);
}
}
await Promise.all(unsubscribePromises);
}
}
Loading