diff --git a/.gitignore b/.gitignore index e79f203..ba6096d 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,6 @@ dist *.log .DS_Store .eslintcache + +# Archive folder for test examples +examples-archive/ diff --git a/README.md b/README.md index 0e70c5d..d71ff2b 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,11 @@ With this method, you can send a transaction and receive extremely fast response - **Viem Integration:** Built on top of Viem for robust and type-safe interactions with the blockchain. - **WebSocket Transport:** Includes a custom WebSocket transport for real-time Shreds monitoring. - **Fast Response Times:** Achieve transaction confirmations as low as 5ms when close to the sequencer. +- **Enhanced Reliability:** + - **Automatic Reconnection:** Exponential backoff reconnection for resilient connections + - **Connection Status Tracking:** Real-time WebSocket connection health monitoring + - **Request Queuing:** Priority-based request queuing with automatic retry during network interruptions + - **Dynamic Subscription Management:** Add/remove addresses and pause/resume subscriptions without interruption ## Installation @@ -202,6 +207,155 @@ const receipt = await client.sendRawTransactionSync({ }) ``` +### Connection Management + +The Shred client now includes built-in connection monitoring and resilience features: + +#### Monitoring Connection Status + +```typescript +import { createPublicShredClient, shredsWebSocket } from 'shreds/viem' + +const client = createPublicShredClient({ + transport: shredsWebSocket('ws://your-endpoint'), +}) + +// Check connection status +console.log(client.isConnected()) // true/false +console.log(client.getConnectionStatus()) // 'connecting' | 'connected' | 'disconnected' | 'error' + +// Get detailed connection statistics +const stats = client.getConnectionStats() +console.log(stats) +// { +// status: 'connected', +// connectedAt: 1234567890, +// reconnectAttempts: 0, +// totalConnections: 1, +// totalDisconnections: 0 +// } + +// Subscribe to connection changes +const unsubscribe = client.onConnectionChange((status) => { + console.log('Connection status changed:', status) +}) + +// Wait for connection with timeout +await client.waitForConnection(30000) // 30 second timeout +``` + +#### Configuring Reconnection + +By default, the WebSocket transport will automatically reconnect with exponential backoff: + +```typescript +const client = createPublicShredClient({ + transport: shredsWebSocket('ws://your-endpoint', { + // Reconnection is enabled by default with these settings: + reconnect: { + attempts: 5, // Try 5 times + delay: 2000, // Start with 2s delay + }, + // Delays will be: 2s → 4s → 8s → 16s → 30s (capped) + }), +}) + +// Disable reconnection +const clientNoReconnect = createPublicShredClient({ + transport: shredsWebSocket('ws://your-endpoint', { + reconnect: false, + }), +}) + +// Custom reconnection settings +const clientCustom = createPublicShredClient({ + transport: shredsWebSocket('ws://your-endpoint', { + reconnect: { + attempts: 10, // More attempts + delay: 5000, // Start with 5s delay + }, + keepAlive: { + interval: 10000, // Ping every 10s + }, + }), +}) +``` + +### Dynamic Subscription Management + +Manage subscriptions dynamically by adding/removing addresses or pausing event processing: + +```typescript +// Create a managed subscription +const { subscription } = await client.watchContractShredEvent({ + managed: true, // Enable dynamic management + buffered: true, // Buffer events during updates + abi: contractAbi, + eventName: 'Transfer', + address: [], // Start with no addresses + onLogs: (logs) => { + console.log('Transfer events:', logs); + } +}); + +// Dynamically add addresses +await subscription.addAddress('0x123...'); +await subscription.addAddress('0x456...'); + +// Remove an address +await subscription.removeAddress('0x123...'); + +// Pause/resume event processing +subscription.pause(); +// Events are buffered while paused +subscription.resume(); +// Buffered events are delivered + +// Get statistics +const stats = subscription.getStats(); +console.log(`Events received: ${stats.eventCount}`); + +// Unsubscribe when done +await subscription.unsubscribe(); +``` + +### Request Queuing + +Queue requests to handle network disruptions gracefully: + +```typescript +// Queue a high-priority request +await client.queueRequest({ + method: 'eth_sendRawTransaction', + params: [signedTx], + priority: 'high', + onSuccess: (result) => { + console.log('Transaction sent:', result); + }, + onError: (error) => { + console.error('Transaction failed:', error); + } +}); + +// Monitor queue statistics +const stats = client.getQueueStats(); +console.log(`Queued: ${stats.queueSize}, Processing: ${stats.processing}`); +console.log(`Success rate: ${(stats.processed / (stats.processed + stats.failed) * 100).toFixed(2)}%`); + +// Control queue processing +client.pauseQueue(); // Pause processing +client.resumeQueue(); // Resume processing + +// View queued requests +const requests = client.getQueuedRequests(); +requests.forEach(req => { + console.log(`[${req.priority}] ${req.method} - retry ${req.retryCount}/${req.maxRetries}`); +}); + +// Clear all queued requests +client.clearQueue(); +``` + ## Development To set up the development environment: diff --git a/src/viem/actions/shred/watchContractShredEvent.ts b/src/viem/actions/shred/watchContractShredEvent.ts index a648813..5b47470 100644 --- a/src/viem/actions/shred/watchContractShredEvent.ts +++ b/src/viem/actions/shred/watchContractShredEvent.ts @@ -13,8 +13,10 @@ import { type LogTopic, type Transport, } from 'viem' +import { getSubscriptionManager } from '../../utils/subscription/manager' import type { ShredsWebSocketTransport } from '../../clients/transports/shredsWebSocket' import type { ShredLog } from '../../types/log' +import type { ManagedSubscription } from '../../utils/subscription/types' import type { Abi, Address, ExtractAbiEvent } from 'abitype' /** @@ -78,12 +80,19 @@ export type WatchContractShredEventParameters< * @default false */ strict?: strict | boolean | undefined + /** Whether to create a managed subscription that supports dynamic updates. */ + managed?: boolean | undefined + /** Whether to buffer events during subscription updates (only with managed: true). */ + buffered?: boolean | undefined } /** * Return type for {@link watchContractShredEvent}. */ -export type WatchContractShredEventReturnType = () => void +export type WatchContractShredEventReturnType = (() => void) & { + /** The managed subscription object, only present when managed: true */ + subscription?: ManagedSubscription | undefined +} /** * Watches and returns emitted contract events that have been processed and confirmed as shreds @@ -93,7 +102,7 @@ export type WatchContractShredEventReturnType = () => void * @param parameters - {@link WatchContractShredEventParameters} * @returns A function that can be used to unsubscribe from the event. {@link WatchContractShredEventReturnType} */ -export function watchContractShredEvent< +export async function watchContractShredEvent< chain extends Chain | undefined, const abi_ extends Abi | readonly unknown[], eventName_ extends ContractEventName | undefined = undefined, @@ -106,7 +115,7 @@ export function watchContractShredEvent< >( client: Client, parameters: WatchContractShredEventParameters, -): WatchContractShredEventReturnType { +): Promise { const { abi, address, @@ -115,6 +124,8 @@ export function watchContractShredEvent< onError, onLogs, strict: strict_, + managed, + buffered, } = parameters const transport_ = (() => { @@ -202,5 +213,29 @@ export function watchContractShredEvent< })() return () => unsubscribe() } - return subscribeShredContractEvent() + // Handle managed subscriptions + if (managed) { + const manager = getSubscriptionManager() + const subscription = await manager.createManagedSubscription(client, { + abi, + address, + args, + eventName, + onError, + onLogs, + strict: strict_, + buffered, + }) + + // Return enhanced unsubscribe with subscription property + const enhancedUnsubscribe = Object.assign( + () => subscription.unsubscribe(), + { subscription }, + ) as WatchContractShredEventReturnType + + return enhancedUnsubscribe + } + + // Regular subscription (backward compatible) + return subscribeShredContractEvent() as WatchContractShredEventReturnType } diff --git a/src/viem/actions/shred/watchShredEvent.ts b/src/viem/actions/shred/watchShredEvent.ts index 6a2db7a..e3c9c48 100644 --- a/src/viem/actions/shred/watchShredEvent.ts +++ b/src/viem/actions/shred/watchShredEvent.ts @@ -15,8 +15,10 @@ import { type MaybeExtractEventArgsFromAbi, type Transport, } from 'viem' +import { getSubscriptionManager } from '../../utils/subscription/manager' import type { ShredsWebSocketTransport } from '../../clients/transports/shredsWebSocket' import type { ShredLog } from '../../types/log' +import type { ManagedSubscription } from '../../utils/subscription/types' /** * The parameter for the `onLogs` callback in {@link watchShredEvent}. @@ -66,6 +68,10 @@ export type WatchShredEventParameters< onError?: ((error: Error) => void) | undefined /** The callback to call when new event logs are received. */ onLogs: WatchShredEventOnLogsFn + /** Whether to create a managed subscription that supports dynamic updates. */ + managed?: boolean | undefined + /** Whether to buffer events during subscription updates (only with managed: true). */ + buffered?: boolean | undefined } & ( | { event: abiEvent @@ -98,7 +104,10 @@ export type WatchShredEventParameters< /** * Return type for {@link watchShredEvent}. */ -export type WatchShredEventReturnType = () => void +export type WatchShredEventReturnType = (() => void) & { + /** The managed subscription object, only present when managed: true */ + subscription?: ManagedSubscription | undefined +} /** * Watches and returns emitted events that have been processed and confirmed as shreds @@ -108,7 +117,7 @@ export type WatchShredEventReturnType = () => void * @param parameters - {@link WatchShredEventParameters} * @returns A function that can be used to unsubscribe from the event. {@link WatchShredEventReturnType} */ -export function watchShredEvent< +export async function watchShredEvent< chain extends Chain | undefined, const abiEvent extends AbiEvent | undefined = undefined, const abiEvents extends @@ -131,8 +140,10 @@ export function watchShredEvent< onError, onLogs, strict: strict_, + managed, + buffered, }: WatchShredEventParameters, -): WatchShredEventReturnType { +): Promise { const transport_ = (() => { if (client.transport.type === 'webSocket') return client.transport @@ -226,5 +237,29 @@ export function watchShredEvent< return () => unsubscribe() } - return subscribeShredEvents() + // Handle managed subscriptions + if (managed) { + const manager = getSubscriptionManager() + const subscription = await manager.createManagedSubscription(client, { + address, + args, + event, + events, + onError, + onLogs, + strict: strict_, + buffered, + }) + + // Return enhanced unsubscribe with subscription property + const enhancedUnsubscribe = Object.assign( + () => subscription.unsubscribe(), + { subscription }, + ) as WatchShredEventReturnType + + return enhancedUnsubscribe + } + + // Regular subscription (backward compatible) + return subscribeShredEvents() as WatchShredEventReturnType } diff --git a/src/viem/actions/shred/watchShreds.ts b/src/viem/actions/shred/watchShreds.ts index c65d594..c8181a4 100644 --- a/src/viem/actions/shred/watchShreds.ts +++ b/src/viem/actions/shred/watchShreds.ts @@ -1,6 +1,8 @@ import { formatShred } from '../../utils/formatters/shred' +import { getSubscriptionManager } from '../../utils/subscription/manager' import type { ShredsWebSocketTransport } from '../../clients/transports/shredsWebSocket' import type { RpcShred, Shred } from '../../types/shred' +import type { ManagedSubscription } from '../../utils/subscription/types' import type { Chain, Client, FallbackTransport, Transport } from 'viem' /** @@ -11,9 +13,16 @@ export interface WatchShredsParameters { onShred: (shred: Shred) => void /** The callback to call when an error occurred when trying to get for a new shred. */ onError?: ((error: Error) => void) | undefined + /** Whether to create a managed subscription that supports dynamic updates. */ + managed?: boolean | undefined + /** Whether to buffer events during subscription updates (only with managed: true). */ + buffered?: boolean | undefined } -export type WatchShredsReturnType = () => void +export type WatchShredsReturnType = (() => void) & { + /** The managed subscription object, only present when managed: true */ + subscription?: ManagedSubscription | undefined +} /** * Watches for new shreds on the RISE network. @@ -22,7 +31,7 @@ export type WatchShredsReturnType = () => void * @param parameters - {@link WatchShredsParameters} * @returns A function that can be used to unsubscribe from the shred. */ -export function watchShreds< +export async function watchShreds< chain extends Chain | undefined, transport extends | ShredsWebSocketTransport @@ -31,8 +40,8 @@ export function watchShreds< > = ShredsWebSocketTransport, >( client: Client, - { onShred, onError }: WatchShredsParameters, -): () => void { + { onShred, onError, managed, buffered }: WatchShredsParameters, +): Promise { const transport_ = (() => { if (client.transport.type === 'webSocket') return client.transport @@ -78,5 +87,24 @@ export function watchShreds< })() return () => unsubscribe() } - return subscribeShreds() + // Handle managed subscriptions + if (managed) { + const manager = getSubscriptionManager() + const subscription = await manager.createManagedSubscription(client, { + onShred, + onError, + buffered, + }) + + // Return enhanced unsubscribe with subscription property + const enhancedUnsubscribe = Object.assign( + () => subscription.unsubscribe(), + { subscription }, + ) as WatchShredsReturnType + + return enhancedUnsubscribe + } + + // Regular subscription (backward compatible) + return subscribeShreds() as WatchShredsReturnType } diff --git a/src/viem/clients/createPublicShredClient.ts b/src/viem/clients/createPublicShredClient.ts index d860829..6b9fc7f 100644 --- a/src/viem/clients/createPublicShredClient.ts +++ b/src/viem/clients/createPublicShredClient.ts @@ -13,6 +13,11 @@ import { type Transport, } from 'viem' import type { ShredRpcSchema } from '../types/rpcSchema' +import { + connectionActions, + type ConnectionActions, +} from './decorators/connection' +import { queueActions, type QueueActions } from './decorators/queue' import { shredActions, type ShredActions } from './decorators/shred' import type { ShredsWebSocketTransport } from './transports/shredsWebSocket' @@ -33,7 +38,10 @@ export type PublicShredClient< rpcSchema extends RpcSchema ? [...PublicRpcSchema, ...rpcSchema] : PublicRpcSchema, - PublicActions & ShredActions + PublicActions & + ShredActions & + ConnectionActions & + QueueActions > > @@ -52,5 +60,8 @@ export function createPublicShredClient< ParseAccount, rpcSchema > { - return createPublicClient({ ...parameters }).extend(shredActions) as any + return createPublicClient({ ...parameters }) + .extend(shredActions) + .extend(connectionActions) + .extend(queueActions) as any } diff --git a/src/viem/clients/createPublicSyncClient.ts b/src/viem/clients/createPublicSyncClient.ts index 5c616e8..4f6224a 100644 --- a/src/viem/clients/createPublicSyncClient.ts +++ b/src/viem/clients/createPublicSyncClient.ts @@ -11,6 +11,10 @@ import { type RpcSchema, } from 'viem' import type { ShredRpcSchema } from '../types/rpcSchema' +import { + connectionActions, + type ConnectionActions, +} from './decorators/connection' import { syncActions, type SyncActions } from './decorators/sync' import type { ShredsWebSocketTransport } from './transports/shredsWebSocket' @@ -27,7 +31,7 @@ export type PublicSyncClient< rpcSchema extends RpcSchema ? [...PublicRpcSchema, ...rpcSchema, ...ShredRpcSchema] : [...PublicRpcSchema, ...ShredRpcSchema], - PublicActions & SyncActions + PublicActions & SyncActions & ConnectionActions > > @@ -44,5 +48,7 @@ export function createPublicSyncClient< ParseAccount, rpcSchema > { - return createPublicClient({ ...parameters }).extend(syncActions) as any + return createPublicClient({ ...parameters }) + .extend(syncActions) + .extend(connectionActions) as any } diff --git a/src/viem/clients/decorators/connection.ts b/src/viem/clients/decorators/connection.ts new file mode 100644 index 0000000..6c3855d --- /dev/null +++ b/src/viem/clients/decorators/connection.ts @@ -0,0 +1,142 @@ +import type { ConnectionStats, ConnectionStatus } from '../../types/connection' +import type { Chain, Client, Transport } from 'viem' + +export type ConnectionActions = { + getConnectionStatus: () => ConnectionStatus + getConnectionStats: () => ConnectionStats + isConnected: () => boolean + onConnectionChange: ( + callback: (status: ConnectionStatus) => void, + ) => () => void // returns unsubscribe function + waitForConnection: (timeoutMs?: number) => Promise +} + +export function connectionActions< + TTransport extends Transport = Transport, + TChain extends Chain | undefined = Chain | undefined, +>(client: Client): ConnectionActions { + // Cache the connection manager promise to avoid multiple async calls + let managerPromise: Promise | null = null + let cachedManager: any = null + + // We need a more reliable way to get the connection manager + // Let's store it on the transport value directly + const getManager = async () => { + const transport = client.transport as any + + // Direct WebSocket transport - methods are available directly on transport after viem processing + if (transport?.getRpcClient) { + const rpcClient = await transport.getRpcClient() + return rpcClient?.connectionManager + } + + // Legacy fallback for transport.value.getRpcClient (in case some transports still use this structure) + if (transport?.value?.getRpcClient) { + const rpcClient = await transport.value.getRpcClient() + return rpcClient?.connectionManager + } + + // Fallback transport with direct methods on first transport + if (transport?.value?.transports?.[0]?.getRpcClient) { + const rpcClient = await transport.value.transports[0].getRpcClient() + return rpcClient?.connectionManager + } + + // Fallback transport with legacy value structure + if (transport?.value?.transports?.[0]?.value?.getRpcClient) { + const rpcClient = await transport.value.transports[0].value.getRpcClient() + return rpcClient?.connectionManager + } + + return null + } + + // Initialize the manager retrieval immediately + const initializeManager = async () => { + const manager = await getManager() + cachedManager = manager + return manager + } + + // Start initialization immediately + managerPromise = initializeManager() + + const getConnectionManager = () => { + return cachedManager + } + + return { + getConnectionStatus: () => { + const manager = getConnectionManager() + return manager?.getStatus() ?? 'disconnected' + }, + + getConnectionStats: () => { + const manager = getConnectionManager() + return ( + manager?.getStats() ?? { + status: 'disconnected', + reconnectAttempts: 0, + totalConnections: 0, + totalDisconnections: 0, + } + ) + }, + + isConnected: () => { + const manager = getConnectionManager() + return manager?.getStatus() === 'connected' + }, + + onConnectionChange: (callback) => { + // Store the manager reference for unsubscribe + let manager: any = null + + // Set up the subscription when manager is available + managerPromise.then((m) => { + if (!m) return + manager = m + manager.on('statusChange', callback) + }) + + // Return unsubscribe function that works immediately + return () => { + if (manager) { + manager.off('statusChange', callback) + } + } + }, + + waitForConnection: async (timeoutMs = 30000) => { + const manager = await managerPromise + if (!manager) throw new Error('No connection manager available') + + if (manager.getStatus() === 'connected') return + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + unsubscribe() + reject(new Error('Connection timeout')) + }, timeoutMs) + + const checkStatus = (status: ConnectionStatus) => { + if (status === 'connected') { + clearTimeout(timeout) + unsubscribe() + resolve() + } + } + + manager.on('statusChange', checkStatus) + const unsubscribe = () => manager.off('statusChange', checkStatus) + + // Check current status in case it changed before listener was added + if (manager.getStatus() === 'connected') { + clearTimeout(timeout) + unsubscribe() + resolve() + } + }) + }, + } +} diff --git a/src/viem/clients/decorators/queue.ts b/src/viem/clients/decorators/queue.ts new file mode 100644 index 0000000..d5598bd --- /dev/null +++ b/src/viem/clients/decorators/queue.ts @@ -0,0 +1,114 @@ +import { + getRequestQueue, + type RequestQueueManager, +} from '../../utils/queue/manager' +import type { + QueuedRequest, + RequestQueue, + RequestQueueStats, +} from '../../utils/queue/types' +import type { Client } from 'viem' + +export type QueueActions = { + /** + * Queue a request to be processed when the connection is available + */ + queueRequest: (params: { + method: string + params: any[] + priority?: 'high' | 'normal' | 'low' + onSuccess?: (result: any) => void + onError?: (error: Error) => void + }) => Promise + + /** + * Get the current request queue instance + */ + getRequestQueue: () => RequestQueue + + /** + * Get queue statistics + */ + getQueueStats: () => RequestQueueStats + + /** + * Pause request processing + */ + pauseQueue: () => void + + /** + * Resume request processing + */ + resumeQueue: () => void + + /** + * Clear all queued requests + */ + clearQueue: () => void + + /** + * Get all queued requests + */ + getQueuedRequests: () => Omit[] +} + +export function queueActions( + client: TClient, +): QueueActions { + // Get or create queue manager + let queueManager: RequestQueueManager | null = null + + const getQueue = () => { + if (!queueManager) { + // Get the underlying transport + const transport = client.transport + if (!transport) { + throw new Error('Transport not available') + } + + // Create queue with connection awareness + queueManager = getRequestQueue(transport, { + processingInterval: 200, // Slightly slower to avoid overwhelming connection checks + retryDelay: 1000, + maxRetries: 5, // Increase retries for connection issues + }) + } + return queueManager + } + + return { + queueRequest: ({ + method, + params, + priority = 'normal', + onSuccess, + onError, + }) => { + const queue = getQueue() + return queue.add({ + method, + params, + priority, + maxRetries: 3, + onSuccess, + onError, + }) + }, + + getRequestQueue: () => getQueue(), + + getQueueStats: () => getQueue().getStats(), + + pauseQueue: () => getQueue().pause(), + + resumeQueue: () => getQueue().resume(), + + clearQueue: () => getQueue().clear(), + + getQueuedRequests: () => { + const requests = getQueue().getQueuedRequests() + // Remove resolve/reject functions before returning + return requests.map(({ resolve, reject, ...rest }) => rest) + }, + } +} diff --git a/src/viem/clients/decorators/shred.ts b/src/viem/clients/decorators/shred.ts index 18500e1..c242b37 100644 --- a/src/viem/clients/decorators/shred.ts +++ b/src/viem/clients/decorators/shred.ts @@ -43,7 +43,7 @@ export type ShredActions = { strict extends boolean | undefined = undefined, >( parameters: WatchContractShredEventParameters, - ) => WatchContractShredEventReturnType + ) => Promise /** * Watches and returns emitted events that have been processed and confirmed as shreds * on the RISE network. @@ -60,14 +60,16 @@ export type ShredActions = { strict extends boolean | undefined = undefined, >( parameters: WatchShredEventParameters, - ) => WatchShredEventReturnType + ) => Promise /** * Watches for new shreds on the RISE network. * * @param parameters - {@link WatchShredsParameters} * @returns A function that can be used to unsubscribe from the shred. */ - watchShreds: (parameters: WatchShredsParameters) => WatchShredsReturnType + watchShreds: ( + parameters: WatchShredsParameters, + ) => Promise } export function shredActions< diff --git a/src/viem/clients/transports/shredsWebSocket.ts b/src/viem/clients/transports/shredsWebSocket.ts index 8f7eb0d..13c8937 100644 --- a/src/viem/clients/transports/shredsWebSocket.ts +++ b/src/viem/clients/transports/shredsWebSocket.ts @@ -70,9 +70,17 @@ export function shredsWebSocket( value: ws_.value ? { getSocket: ws_.value.getSocket, - getRpcClient: ws_.value.getRpcClient, + getRpcClient: () => getWebSocketRpcClient(url_, wsRpcClientOpts), subscribe: ws_.value.subscribe, - async riseSubscribe({ params, onData, onError }) { + async riseSubscribe({ + params, + onData, + onError, + }: { + params: any + onData: any + onError: any + }) { const rpcClient = await getWebSocketRpcClient( url_, wsRpcClientOpts, @@ -123,6 +131,6 @@ export function shredsWebSocket( }, } : undefined, - } + } as any } } diff --git a/src/viem/types/connection.ts b/src/viem/types/connection.ts new file mode 100644 index 0000000..93af5e1 --- /dev/null +++ b/src/viem/types/connection.ts @@ -0,0 +1,20 @@ +export type ConnectionStatus = + | 'connecting' + | 'connected' + | 'disconnected' + | 'error' + +export interface ConnectionStats { + status: ConnectionStatus + connectedAt?: number + disconnectedAt?: number + reconnectAttempts: number + lastError?: Error + totalConnections: number + totalDisconnections: number +} + +export interface ConnectionEventMap { + statusChange: (status: ConnectionStatus) => void + stats: (stats: ConnectionStats) => void +} diff --git a/src/viem/utils/connection/manager.ts b/src/viem/utils/connection/manager.ts new file mode 100644 index 0000000..e60199e --- /dev/null +++ b/src/viem/utils/connection/manager.ts @@ -0,0 +1,52 @@ +import { EventEmitter } from 'node:events' +import type { ConnectionStats, ConnectionStatus } from '../../types/connection' + +export class ConnectionStateManager extends EventEmitter { + private state: ConnectionStats = { + status: 'disconnected', + reconnectAttempts: 0, + totalConnections: 0, + totalDisconnections: 0, + } + + updateStatus(status: ConnectionStatus, error?: Error): void { + const previousStatus = this.state.status + this.state.status = status + + if (status === 'connected') { + this.state.connectedAt = Date.now() + this.state.totalConnections++ + this.state.reconnectAttempts = 0 + delete this.state.lastError + } else if (status === 'disconnected') { + this.state.disconnectedAt = Date.now() + if (previousStatus === 'connected') { + this.state.totalDisconnections++ + } + } else if (status === 'error') { + this.state.lastError = error + } + + if (previousStatus !== status) { + this.emit('statusChange', status) + this.emit('stats', { ...this.state }) + } + } + + incrementReconnectAttempts(): void { + this.state.reconnectAttempts++ + this.emit('stats', { ...this.state }) + } + + resetReconnectAttempts(): void { + this.state.reconnectAttempts = 0 + } + + getStats(): ConnectionStats { + return { ...this.state } + } + + getStatus(): ConnectionStatus { + return this.state.status + } +} diff --git a/src/viem/utils/queue/manager.ts b/src/viem/utils/queue/manager.ts new file mode 100644 index 0000000..df29924 --- /dev/null +++ b/src/viem/utils/queue/manager.ts @@ -0,0 +1,326 @@ +import type { + QueuedRequest, + RequestQueue, + RequestQueueConfig, + RequestQueueStats, +} from './types' + +export class RequestQueueManager implements RequestQueue { + private queue: QueuedRequest[] = [] + private processing = new Set() + private paused = false + private requestIdCounter = 0 + private processingTimer?: NodeJS.Timeout + + // Stats + private processed = 0 + private failed = 0 + private totalProcessingTime = 0 + private lastProcessedAt?: number + + // Config + private maxSize: number + private maxRetries: number + private retryDelay: number + private processingInterval: number + private priorityWeights: { high: number; normal: number; low: number } + + // Transport reference + private transport: any + private connectionManager: any = null + + constructor(transport: any, config: RequestQueueConfig = {}) { + this.transport = transport + this.maxSize = config.maxSize ?? 1000 + this.maxRetries = config.maxRetries ?? 3 + this.retryDelay = config.retryDelay ?? 1000 + this.processingInterval = config.processingInterval ?? 100 + this.priorityWeights = config.priorityWeights ?? { + high: 3, + normal: 2, + low: 1, + } + + // Initialize connection manager and start processing when ready + this.initializeConnectionManager() + } + + add( + request: Omit< + QueuedRequest, + 'id' | 'createdAt' | 'retryCount' | 'resolve' | 'reject' + >, + ): Promise { + if (this.queue.length >= this.maxSize) { + throw new Error(`Request queue is full (max: ${this.maxSize})`) + } + + return new Promise((resolve, reject) => { + const queuedRequest: QueuedRequest = { + ...request, + id: `req_${++this.requestIdCounter}`, + createdAt: Date.now(), + retryCount: 0, + resolve, + reject, + } + + // Insert based on priority + const insertIndex = this.findInsertIndex(queuedRequest.priority) + this.queue.splice(insertIndex, 0, queuedRequest) + + // Process immediately if not paused + if (!this.paused) { + this.processQueue() + } + }) + } + + private findInsertIndex(priority: 'high' | 'normal' | 'low'): number { + const weight = this.priorityWeights[priority] + + for (let i = 0; i < this.queue.length; i++) { + const itemWeight = this.priorityWeights[this.queue[i].priority] + if (weight > itemWeight) { + return i + } + } + + return this.queue.length + } + + private startProcessing(): void { + this.processingTimer = setInterval(() => { + if (!this.paused && this.queue.length > 0) { + this.processQueue() + } + }, this.processingInterval) + } + + private async processQueue(): Promise { + // Process multiple requests in parallel (up to 5) + const batchSize = Math.min(5, this.queue.length) + const batch: QueuedRequest[] = [] + + for (let i = 0; i < batchSize; i++) { + const request = this.queue.shift() + if (request && !this.processing.has(request.id)) { + batch.push(request) + this.processing.add(request.id) + } + } + + // Process batch + await Promise.all(batch.map((request) => this.processRequest(request))) + } + + private async processRequest(request: QueuedRequest): Promise { + const startTime = Date.now() + + try { + // Use improved connection checking + if (!(await this.isConnectionReady())) { + // Re-queue if not connected + if (request.retryCount < this.maxRetries) { + request.retryCount++ + // Add delay before retry based on connection status + const delay = + this.connectionManager?.getStatus() === 'connecting' + ? 500 + : this.retryDelay + setTimeout(() => { + this.queue.unshift(request) + }, delay * request.retryCount) + this.processing.delete(request.id) + return + } else { + throw new Error('WebSocket not connected after max retries') + } + } + + // Send request using transport's request method + const result = await this.transport.request({ + body: { + method: request.method, + params: request.params, + }, + }) + + // Success + this.processed++ + this.totalProcessingTime += Date.now() - startTime + this.lastProcessedAt = Date.now() + + request.resolve(result) + request.onSuccess?.(result) + } catch (error: any) { + // Handle error + if (request.retryCount < request.maxRetries) { + // Retry with delay + request.retryCount++ + setTimeout(() => { + this.queue.unshift(request) // Add back to front with higher priority + }, this.retryDelay * request.retryCount) + } else { + // Final failure + this.failed++ + request.reject(error) + request.onError?.(error) + } + } finally { + this.processing.delete(request.id) + } + } + + private async getSocket(): Promise { + try { + // Match the pattern used in connection decorators + if (this.transport?.getRpcClient) { + const rpcClient = await this.transport.getRpcClient() + return rpcClient?.socket + } + + if (this.transport?.value?.getRpcClient) { + const rpcClient = await this.transport.value.getRpcClient() + return rpcClient?.socket + } + + // Fallback for direct socket access + if (this.transport?.getSocket) { + return await this.transport.getSocket() + } + + if (this.transport?.value?.getSocket) { + return await this.transport.value.getSocket() + } + + return null + } catch { + return null + } + } + + pause(): void { + this.paused = true + } + + resume(): void { + this.paused = false + if (this.queue.length > 0) { + this.processQueue() + } + } + + private async initializeConnectionManager(): Promise { + try { + if (this.transport?.getRpcClient) { + const rpcClient = await this.transport.getRpcClient() + this.connectionManager = rpcClient?.connectionManager + } else if (this.transport?.value?.getRpcClient) { + const rpcClient = await this.transport.value.getRpcClient() + this.connectionManager = rpcClient?.connectionManager + } + + // Start processing only after connection manager is available + if (this.connectionManager) { + // Wait for connection before starting + if (this.connectionManager.getStatus() === 'connected') { + this.startProcessing() + } else { + this.connectionManager.on('statusChange', (status: string) => { + if (status === 'connected' && !this.processingTimer) { + this.startProcessing() + } else if (status !== 'connected') { + this.pause() + } + }) + } + } else { + // Fallback - start processing after delay + setTimeout(() => this.startProcessing(), 1000) + } + } catch (error) { + console.warn('Failed to initialize connection manager:', error) + // Fallback - start processing after delay + setTimeout(() => this.startProcessing(), 1000) + } + } + + private async isConnectionReady(): Promise { + try { + // First check connection manager status + if (this.connectionManager) { + const status = this.connectionManager.getStatus() + if (status !== 'connected') { + return false + } + } + + // Then verify socket is available and ready + const socket = await this.getSocket() + return Boolean(socket && socket.readyState === 1) + } catch { + return false + } + } + + clear(): void { + // Reject all pending requests + this.queue.forEach((request) => { + request.reject(new Error('Queue cleared')) + }) + this.queue = [] + this.processing.clear() + } + + getStats(): RequestQueueStats { + return { + queueSize: this.queue.length, + processing: this.processing.size, + processed: this.processed, + failed: this.failed, + avgProcessingTime: + this.processed > 0 ? this.totalProcessingTime / this.processed : 0, + lastProcessedAt: this.lastProcessedAt, + } + } + + isPaused(): boolean { + return this.paused + } + + setMaxSize(size: number): void { + this.maxSize = size + } + + getQueuedRequests(): QueuedRequest[] { + return [...this.queue] + } + + destroy(): void { + if (this.processingTimer) { + clearInterval(this.processingTimer) + } + this.clear() + } +} + +// Global instance management +let globalRequestQueue: RequestQueueManager | null = null + +export function getRequestQueue( + transport: any, + config?: RequestQueueConfig, +): RequestQueueManager { + if (!globalRequestQueue) { + globalRequestQueue = new RequestQueueManager(transport, config) + } + return globalRequestQueue +} + +export function clearGlobalRequestQueue(): void { + if (globalRequestQueue) { + globalRequestQueue.destroy() + globalRequestQueue = null + } +} diff --git a/src/viem/utils/queue/types.ts b/src/viem/utils/queue/types.ts new file mode 100644 index 0000000..5abb50e --- /dev/null +++ b/src/viem/utils/queue/types.ts @@ -0,0 +1,50 @@ +export interface QueuedRequest { + id: string + method: string + params: any[] + priority: 'high' | 'normal' | 'low' + createdAt: number + retryCount: number + maxRetries: number + onSuccess?: (result: any) => void + onError?: (error: Error) => void + resolve: (value: any) => void + reject: (reason: any) => void +} + +export interface RequestQueueConfig { + maxSize?: number + maxRetries?: number + retryDelay?: number + processingInterval?: number + priorityWeights?: { + high: number + normal: number + low: number + } +} + +export interface RequestQueueStats { + queueSize: number + processing: number + processed: number + failed: number + avgProcessingTime: number + lastProcessedAt?: number +} + +export interface RequestQueue { + add: ( + request: Omit< + QueuedRequest, + 'id' | 'createdAt' | 'retryCount' | 'resolve' | 'reject' + >, + ) => Promise + pause: () => void + resume: () => void + clear: () => void + getStats: () => RequestQueueStats + isPaused: () => boolean + setMaxSize: (size: number) => void + getQueuedRequests: () => QueuedRequest[] +} diff --git a/src/viem/utils/rpc/socket.ts b/src/viem/utils/rpc/socket.ts index 77e417d..a6a63fe 100644 --- a/src/viem/utils/rpc/socket.ts +++ b/src/viem/utils/rpc/socket.ts @@ -1,4 +1,5 @@ import { SocketClosedError, TimeoutError, withTimeout } from 'viem' +import { ConnectionStateManager } from '../connection/manager' import { createBatchScheduler, type CreateBatchSchedulerErrorType, @@ -42,6 +43,7 @@ export type SocketRpcClient = { requests: CallbackMap subscriptions: CallbackMap url: string + connectionManager: ConnectionStateManager } export type GetSocketRpcClientParameters = { @@ -125,14 +127,20 @@ export async function getSocketRpcClient( // Set up a cache for subscriptions (rise_subscribe). const subscriptions = new Map() + // Create connection state manager + const connectionManager = new ConnectionStateManager() + let error: Error | Event | undefined let socket: Socket<{}> let keepAliveTimer: ReturnType | undefined // Set up socket implementation. async function setup() { + connectionManager.updateStatus('connecting') const result = await getSocket({ onClose() { + connectionManager.updateStatus('disconnected') + // Notify all requests and subscriptions of the closure error. for (const request of requests.values()) request.onError?.(new SocketClosedError({ url })) @@ -140,11 +148,17 @@ export async function getSocketRpcClient( subscription.onError?.(new SocketClosedError({ url })) // Attempt to reconnect. - if (reconnect && reconnectCount < attempts) + if (reconnect && reconnectCount < attempts) { + const backoffDelay = Math.min( + delay * 2 ** reconnectCount, + 30000, // max 30 seconds + ) setTimeout(async () => { reconnectCount++ + connectionManager.incrementReconnectAttempts() await setup().catch(console.error) - }, delay) + }, backoffDelay) + } // Otherwise, clear all requests and subscriptions. else { requests.clear() @@ -153,6 +167,7 @@ export async function getSocketRpcClient( }, onError(error_) { error = error_ + connectionManager.updateStatus('error', error_ as Error) // Notify all requests and subscriptions of the error. for (const request of requests.values()) request.onError?.(error) @@ -163,11 +178,17 @@ export async function getSocketRpcClient( socketClient?.close() // Attempt to reconnect. - if (reconnect && reconnectCount < attempts) + if (reconnect && reconnectCount < attempts) { + const backoffDelay = Math.min( + delay * 2 ** reconnectCount, + 30000, // max 30 seconds + ) setTimeout(async () => { reconnectCount++ + connectionManager.incrementReconnectAttempts() await setup().catch(console.error) - }, delay) + }, backoffDelay) + } // Otherwise, clear all requests and subscriptions. else { requests.clear() @@ -177,6 +198,8 @@ export async function getSocketRpcClient( onOpen() { error = undefined reconnectCount = 0 + connectionManager.resetReconnectAttempts() + connectionManager.updateStatus('connected') }, onResponse(data) { const isSubscription = data.method === 'rise_subscription' @@ -268,6 +291,7 @@ export async function getSocketRpcClient( requests, subscriptions, url, + connectionManager, } socketClientCache.set(`${key}:${url}`, socketClient) diff --git a/src/viem/utils/subscription/manager.ts b/src/viem/utils/subscription/manager.ts new file mode 100644 index 0000000..b82e625 --- /dev/null +++ b/src/viem/utils/subscription/manager.ts @@ -0,0 +1,257 @@ +import type { + ManagedSubscription, + ManagedSubscriptionConfig, + SubscriptionStats, +} from './types' +import type { Address, LogTopic } from 'viem' + +class ManagedSubscriptionImpl implements ManagedSubscription { + public readonly id: string + public readonly type: 'shreds' | 'logs' + + private client: any + private currentParams: any + private onUpdate: (newParams: any) => Promise + private unsubscribeFn?: () => void + + private paused = false + private eventCount = 0 + private createdAt: number + private lastEventAt?: number + private eventBuffer: any[] = [] + private temporaryHandler?: (event: any) => void + + constructor(config: ManagedSubscriptionConfig) { + this.id = config.id + this.type = config.type + this.client = config.client + this.currentParams = { ...config.initialParams } + this.onUpdate = config.onUpdate + this.createdAt = Date.now() + } + + async start(): Promise { + if (this.unsubscribeFn) { + this.unsubscribeFn() + } + + const originalOnLogs = + this.currentParams.onLogs || this.currentParams.onShred + const wrappedHandler = (data: any) => { + this.eventCount++ + this.lastEventAt = Date.now() + + // Handle temporary buffering during updates + if (this.temporaryHandler) { + this.temporaryHandler(data) + return + } + + // Handle pause state + if (this.paused) { + this.eventBuffer.push(data) + return + } + + // Normal event handling + originalOnLogs?.(data) + } + + // Create new subscription with wrapped handler by calling client methods + if (this.type === 'shreds') { + const watchResult = await this.client.watchShreds({ + ...this.currentParams, + onShred: wrappedHandler, + managed: false, // Prevent recursive managed subscriptions + }) + this.unsubscribeFn = watchResult + } else { + const watchResult = await this.client.watchShredEvent({ + ...this.currentParams, + onLogs: wrappedHandler, + managed: false, // Prevent recursive managed subscriptions + }) + this.unsubscribeFn = watchResult + } + } + + async restart(newParams: Partial): Promise { + this.currentParams = { ...this.currentParams, ...newParams } + await this.start() + } + + async addAddress(address: Address): Promise { + const currentAddresses = this.currentParams.address + ? Array.isArray(this.currentParams.address) + ? this.currentParams.address + : [this.currentParams.address] + : [] + + if (!currentAddresses.includes(address)) { + const newAddresses = [...currentAddresses, address] + await this.onUpdate({ address: newAddresses }) + } + } + + async removeAddress(address: Address): Promise { + const currentAddresses = this.currentParams.address + ? Array.isArray(this.currentParams.address) + ? this.currentParams.address + : [this.currentParams.address] + : [] + + const newAddresses = currentAddresses.filter((a: Address) => a !== address) + if (newAddresses.length !== currentAddresses.length) { + await this.onUpdate({ address: newAddresses }) + } + } + + getAddresses(): Address[] { + if (!this.currentParams.address) return [] + return Array.isArray(this.currentParams.address) + ? this.currentParams.address + : [this.currentParams.address] + } + + async updateTopics(topics: LogTopic[]): Promise { + await this.onUpdate({ topics }) + } + + getTopics(): LogTopic[] { + return this.currentParams.topics || [] + } + + pause(): void { + this.paused = true + } + + resume(): void { + this.paused = false + + // Deliver buffered events + const buffer = this.eventBuffer + this.eventBuffer = [] + + const handler = this.currentParams.onLogs || this.currentParams.onShred + buffer.forEach((event) => handler?.(event)) + } + + isPaused(): boolean { + return this.paused + } + + getStats(): SubscriptionStats { + return { + eventCount: this.eventCount, + createdAt: this.createdAt, + lastEventAt: this.lastEventAt, + isPaused: this.paused, + addresses: this.getAddresses(), + topics: this.getTopics(), + } + } + + unsubscribe(): void { + if (this.unsubscribeFn) { + this.unsubscribeFn() + this.unsubscribeFn = undefined + } + } + + setTemporaryHandler(handler: (event: any) => void): void { + this.temporaryHandler = handler + } + + clearTemporaryHandler(): void { + this.temporaryHandler = undefined + } + + handleEvent(event: any): void { + const handler = this.currentParams.onLogs || this.currentParams.onShred + handler?.(event) + } +} + +export class SubscriptionManager { + private subscriptions = new Map() + private subscriptionIdCounter = 0 + + async createManagedSubscription( + client: any, + params: any, + ): Promise { + const subscriptionId = `sub_${++this.subscriptionIdCounter}` + const type = params.onShred ? 'shreds' : 'logs' + + // Create wrapper that manages state + const managed = new ManagedSubscriptionImpl({ + id: subscriptionId, + type, + client, + initialParams: params, + onUpdate: async (newParams) => { + // Handle dynamic updates + await this.updateSubscription(subscriptionId, newParams) + }, + }) + + this.subscriptions.set(subscriptionId, managed) + + // Start initial subscription + await managed.start() + + return managed + } + + private async updateSubscription( + id: string, + newParams: Partial, + ): Promise { + const managed = this.subscriptions.get(id) + if (!managed) throw new Error('Subscription not found') + + // Strategy: Unsubscribe and resubscribe with event buffering + const buffer: any[] = [] + const isPaused = managed.isPaused() + + // Temporarily buffer events + const tempHandler = (event: any) => buffer.push(event) + managed.setTemporaryHandler(tempHandler) + + // Perform update + await managed.restart(newParams) + + // Replay buffered events + buffer.forEach((event) => managed.handleEvent(event)) + managed.clearTemporaryHandler() + + // Restore pause state + if (isPaused) managed.pause() + } + + getSubscription(id: string): ManagedSubscription | undefined { + return this.subscriptions.get(id) + } + + getAllSubscriptions(): ManagedSubscription[] { + return Array.from(this.subscriptions.values()) + } + + async unsubscribeAll(): Promise { + const promises = Array.from(this.subscriptions.values()).map((sub) => + sub.unsubscribe(), + ) + await Promise.all(promises) + this.subscriptions.clear() + } +} + +// Global subscription manager instance +let globalSubscriptionManager: SubscriptionManager | null = null + +export function getSubscriptionManager(): SubscriptionManager { + if (!globalSubscriptionManager) { + globalSubscriptionManager = new SubscriptionManager() + } + return globalSubscriptionManager +} diff --git a/src/viem/utils/subscription/types.ts b/src/viem/utils/subscription/types.ts new file mode 100644 index 0000000..191160c --- /dev/null +++ b/src/viem/utils/subscription/types.ts @@ -0,0 +1,45 @@ +import type { Address, LogTopic } from 'viem' + +export interface ManagedSubscription { + id: string + type: 'shreds' | 'logs' + + // Dynamic management methods + addAddress: (address: Address) => Promise + removeAddress: (address: Address) => Promise + getAddresses: () => Address[] + updateTopics: (topics: LogTopic[]) => Promise + getTopics: () => LogTopic[] + + // State control + pause: () => void + resume: () => void + isPaused: () => boolean + + // Statistics + getStats: () => { + eventCount: number + createdAt: number + lastEventAt?: number + } + + // Cleanup + unsubscribe: () => void +} + +export interface SubscriptionStats { + eventCount: number + createdAt: number + lastEventAt?: number + isPaused: boolean + addresses: Address[] + topics: LogTopic[] +} + +export interface ManagedSubscriptionConfig { + id: string + type: 'shreds' | 'logs' + client: any // Will be typed as PublicShredClient + initialParams: any // Will be typed as WatchShredEventParameters + onUpdate: (newParams: any) => Promise +} diff --git a/tests/integration/backward-compatibility.test.ts b/tests/integration/backward-compatibility.test.ts new file mode 100644 index 0000000..9aabc5c --- /dev/null +++ b/tests/integration/backward-compatibility.test.ts @@ -0,0 +1,61 @@ +import { riseTestnet } from 'viem/chains' +import { describe, expect, it } from 'vitest' +import { createPublicShredClient, shredsWebSocket } from '../../src/viem' + +describe('Backward Compatibility', () => { + it('should create client without any configuration (like old code)', () => { + // This is how users would have created clients before our changes + const client = createPublicShredClient({ + transport: shredsWebSocket('ws://localhost:8545'), + }) + + // Client should be created successfully + expect(client).toBeDefined() + expect(client.watchShreds).toBeDefined() + expect(client.watchShredEvent).toBeDefined() + expect(client.watchContractShredEvent).toBeDefined() + }) + + it('should create client with chain config (common pattern)', () => { + // Another common pattern + const client = createPublicShredClient({ + chain: riseTestnet, + transport: shredsWebSocket('ws://localhost:8545'), + }) + + expect(client).toBeDefined() + expect(client.chain).toBe(riseTestnet) + }) + + it('new connection methods should be available but not required', () => { + const client = createPublicShredClient({ + transport: shredsWebSocket('ws://localhost:8545'), + }) + + // New methods exist + expect(client.getConnectionStatus).toBeDefined() + expect(client.getConnectionStats).toBeDefined() + expect(client.isConnected).toBeDefined() + expect(client.onConnectionChange).toBeDefined() + expect(client.waitForConnection).toBeDefined() + + // But they're not required - old code still works + expect(typeof client.getConnectionStatus).toBe('function') + }) + + it('should handle missing connection manager gracefully', () => { + const client = createPublicShredClient({ + transport: shredsWebSocket('ws://localhost:8545'), + }) + + // Even if connection manager isn't ready, methods should return safe defaults + expect(client.getConnectionStatus()).toBe('disconnected') + expect(client.isConnected()).toBe(false) + expect(client.getConnectionStats()).toEqual({ + status: 'disconnected', + reconnectAttempts: 0, + totalConnections: 0, + totalDisconnections: 0, + }) + }) +}) diff --git a/tests/viem/actions/shred/watchContractShredEvent.test.ts b/tests/viem/actions/shred/watchContractShredEvent.test.ts index 1aa4b8f..43a5891 100644 --- a/tests/viem/actions/shred/watchContractShredEvent.test.ts +++ b/tests/viem/actions/shred/watchContractShredEvent.test.ts @@ -41,13 +41,13 @@ describe('watchContractShredEvent', () => { mockOnError = vi.fn() }) - it('should subscribe to contract events and call onLogs', () => { + it('should subscribe to contract events and call onLogs', async () => { const mockUnsubscribe = vi.fn() mockTransport.riseSubscribe.mockResolvedValue({ unsubscribe: mockUnsubscribe, }) - const unsubscribe = watchContractShredEvent(mockClient, { + const unsubscribe = await watchContractShredEvent(mockClient, { abi: mockAbi, address: '0x123', eventName: 'Transfer', @@ -162,7 +162,7 @@ describe('watchContractShredEvent', () => { unsubscribe: mockUnsubscribe, }) - const unsubscribe = watchContractShredEvent(mockClient, { + const unsubscribe = await watchContractShredEvent(mockClient, { abi: mockAbi, address: '0x123', eventName: 'Transfer', @@ -176,13 +176,13 @@ describe('watchContractShredEvent', () => { expect(mockUnsubscribe).toHaveBeenCalled() }) - it('should handle multiple addresses', () => { + it('should handle multiple addresses', async () => { const mockUnsubscribe = vi.fn() mockTransport.riseSubscribe.mockResolvedValue({ unsubscribe: mockUnsubscribe, }) - watchContractShredEvent(mockClient, { + await watchContractShredEvent(mockClient, { abi: mockAbi, address: ['0x123', '0x456'], eventName: 'Transfer', @@ -199,7 +199,7 @@ describe('watchContractShredEvent', () => { }) }) - it('should throw error if no webSocket transport is available', () => { + it('should throw error if no webSocket transport is available', async () => { const mockClientWithoutWS = { transport: { type: 'fallback', @@ -207,13 +207,13 @@ describe('watchContractShredEvent', () => { }, } as any - expect(() => { + await expect( watchContractShredEvent(mockClientWithoutWS, { abi: mockAbi, address: '0x123', eventName: 'Transfer', onLogs: mockOnLogs, - }) - }).toThrow('A shredWebSocket transport is required') + }), + ).rejects.toThrow('A shredWebSocket transport is required') }) }) diff --git a/tests/viem/actions/shred/watchShredEvent.test.ts b/tests/viem/actions/shred/watchShredEvent.test.ts index 3eec7d9..406cac4 100644 --- a/tests/viem/actions/shred/watchShredEvent.test.ts +++ b/tests/viem/actions/shred/watchShredEvent.test.ts @@ -39,13 +39,13 @@ describe('watchShredEvent', () => { mockOnError = vi.fn() }) - it('should subscribe to shred events with single event', () => { + it('should subscribe to shred events with single event', async () => { const mockUnsubscribe = vi.fn() mockTransport.riseSubscribe.mockResolvedValue({ unsubscribe: mockUnsubscribe, }) - const unsubscribe = watchShredEvent(mockClient, { + const unsubscribe = await watchShredEvent(mockClient, { event: mockEvent, onLogs: mockOnLogs, }) @@ -59,14 +59,14 @@ describe('watchShredEvent', () => { expect(typeof unsubscribe).toBe('function') }) - it('should subscribe to shred events with multiple events', () => { + it('should subscribe to shred events with multiple events', async () => { const mockUnsubscribe = vi.fn() mockTransport.riseSubscribe.mockResolvedValue({ unsubscribe: mockUnsubscribe, }) const events = [mockEvent] - const unsubscribe = watchShredEvent(mockClient, { + const unsubscribe = await watchShredEvent(mockClient, { events, onLogs: mockOnLogs, }) @@ -80,13 +80,13 @@ describe('watchShredEvent', () => { expect(typeof unsubscribe).toBe('function') }) - it('should subscribe without specific events', () => { + it('should subscribe without specific events', async () => { const mockUnsubscribe = vi.fn() mockTransport.riseSubscribe.mockResolvedValue({ unsubscribe: mockUnsubscribe, }) - const unsubscribe = watchShredEvent(mockClient, { + const unsubscribe = await watchShredEvent(mockClient, { onLogs: mockOnLogs, }) @@ -108,7 +108,7 @@ describe('watchShredEvent', () => { return Promise.resolve({ unsubscribe: mockUnsubscribe }) }) - watchShredEvent(mockClient, { + await watchShredEvent(mockClient, { event: mockEvent, onLogs: mockOnLogs, }) @@ -140,7 +140,7 @@ describe('watchShredEvent', () => { const error = new Error('Subscription failed') mockTransport.riseSubscribe.mockRejectedValue(error) - watchShredEvent(mockClient, { + await watchShredEvent(mockClient, { event: mockEvent, onLogs: mockOnLogs, onError: mockOnError, @@ -161,7 +161,7 @@ describe('watchShredEvent', () => { return Promise.resolve({ unsubscribe: mockUnsubscribe }) }) - watchShredEvent(mockClient, { + await watchShredEvent(mockClient, { event: mockEvent, onLogs: mockOnLogs, strict: false, @@ -195,7 +195,7 @@ describe('watchShredEvent', () => { return Promise.resolve({ unsubscribe: mockUnsubscribe }) }) - watchShredEvent(mockClient, { + await watchShredEvent(mockClient, { event: mockEvent, onLogs: mockOnLogs, strict: true, @@ -227,7 +227,7 @@ describe('watchShredEvent', () => { unsubscribe: mockUnsubscribe, }) - const unsubscribe = watchShredEvent(mockClient, { + const unsubscribe = await watchShredEvent(mockClient, { event: mockEvent, onLogs: mockOnLogs, }) @@ -239,13 +239,13 @@ describe('watchShredEvent', () => { expect(mockUnsubscribe).toHaveBeenCalled() }) - it('should handle address parameter', () => { + it('should handle address parameter', async () => { const mockUnsubscribe = vi.fn() mockTransport.riseSubscribe.mockResolvedValue({ unsubscribe: mockUnsubscribe, }) - watchShredEvent(mockClient, { + await watchShredEvent(mockClient, { event: mockEvent, address: '0x123', onLogs: mockOnLogs, @@ -258,13 +258,13 @@ describe('watchShredEvent', () => { }) }) - it('should handle multiple addresses', () => { + it('should handle multiple addresses', async () => { const mockUnsubscribe = vi.fn() mockTransport.riseSubscribe.mockResolvedValue({ unsubscribe: mockUnsubscribe, }) - watchShredEvent(mockClient, { + await watchShredEvent(mockClient, { event: mockEvent, address: ['0x123', '0x456'], onLogs: mockOnLogs, @@ -280,7 +280,7 @@ describe('watchShredEvent', () => { }) }) - it('should throw error if no webSocket transport is available', () => { + it('should throw error if no webSocket transport is available', async () => { const mockClientWithoutWS = { transport: { type: 'fallback', @@ -288,15 +288,15 @@ describe('watchShredEvent', () => { }, } as any - expect(() => { + await expect( watchShredEvent(mockClientWithoutWS, { event: mockEvent, onLogs: mockOnLogs, - }) - }).toThrow('A shredWebSocket transport is required') + }), + ).rejects.toThrow('A shredWebSocket transport is required') }) - it('should handle fallback transport with webSocket', () => { + it('should handle fallback transport with webSocket', async () => { const mockUnsubscribe = vi.fn() const mockFallbackTransport = { type: 'fallback', @@ -316,7 +316,7 @@ describe('watchShredEvent', () => { transport: mockFallbackTransport, } as any - const unsubscribe = watchShredEvent(mockClientWithFallback, { + const unsubscribe = await watchShredEvent(mockClientWithFallback, { event: mockEvent, onLogs: mockOnLogs, }) diff --git a/tests/viem/actions/shred/watchShreds.test.ts b/tests/viem/actions/shred/watchShreds.test.ts index 5f553a1..4c7acca 100644 --- a/tests/viem/actions/shred/watchShreds.test.ts +++ b/tests/viem/actions/shred/watchShreds.test.ts @@ -37,13 +37,13 @@ describe('watchShreds', () => { vi.clearAllMocks() }) - it('should subscribe to shreds and call onShred', () => { + it('should subscribe to shreds and call onShred', async () => { const mockUnsubscribe = vi.fn() mockTransport.riseSubscribe.mockResolvedValue({ unsubscribe: mockUnsubscribe, }) - const unsubscribe = watchShreds(mockClient, { + const unsubscribe = await watchShreds(mockClient, { onShred: mockOnShred, }) @@ -65,7 +65,7 @@ describe('watchShreds', () => { return Promise.resolve({ unsubscribe: mockUnsubscribe }) }) - watchShreds(mockClient, { + await watchShreds(mockClient, { onShred: mockOnShred, }) @@ -115,7 +115,7 @@ describe('watchShreds', () => { const error = new Error('Subscription failed') mockTransport.riseSubscribe.mockRejectedValue(error) - watchShreds(mockClient, { + await watchShreds(mockClient, { onShred: mockOnShred, onError: mockOnError, }) @@ -135,7 +135,7 @@ describe('watchShreds', () => { return Promise.resolve({ unsubscribe: mockUnsubscribe }) }) - watchShreds(mockClient, { + await watchShreds(mockClient, { onShred: mockOnShred, onError: mockOnError, }) @@ -154,7 +154,7 @@ describe('watchShreds', () => { unsubscribe: mockUnsubscribe, }) - const unsubscribe = watchShreds(mockClient, { + const unsubscribe = await watchShreds(mockClient, { onShred: mockOnShred, }) @@ -174,7 +174,7 @@ describe('watchShreds', () => { return Promise.resolve({ unsubscribe: mockUnsubscribe }) }) - watchShreds(mockClient, { + await watchShreds(mockClient, { onShred: mockOnShred, }) @@ -192,7 +192,7 @@ describe('watchShreds', () => { expect(mockOnShred).toHaveBeenCalled() }) - it('should throw error if no webSocket transport is available', () => { + it('should throw error if no webSocket transport is available', async () => { const mockClientWithoutWS = { transport: { type: 'fallback', @@ -200,14 +200,14 @@ describe('watchShreds', () => { }, } as any - expect(() => { + await expect( watchShreds(mockClientWithoutWS, { onShred: mockOnShred, - }) - }).toThrow('A shredWebSocket transport is required') + }), + ).rejects.toThrow('A shredWebSocket transport is required') }) - it('should handle fallback transport with webSocket', () => { + it('should handle fallback transport with webSocket', async () => { const mockUnsubscribe = vi.fn() const mockFallbackTransport = { type: 'fallback', @@ -227,7 +227,7 @@ describe('watchShreds', () => { transport: mockFallbackTransport, } as any - const unsubscribe = watchShreds(mockClientWithFallback, { + const unsubscribe = await watchShreds(mockClientWithFallback, { onShred: mockOnShred, }) @@ -243,7 +243,7 @@ describe('watchShreds', () => { return Promise.resolve({ unsubscribe: mockUnsubscribe }) }) - watchShreds(mockClient, { + await watchShreds(mockClient, { onShred: mockOnShred, }) @@ -292,7 +292,7 @@ describe('watchShreds', () => { }) }) - const unsubscribe = watchShreds(mockClient, { + const unsubscribe = await watchShreds(mockClient, { onShred: mockOnShred, }) diff --git a/tests/viem/clients/decorators/connection.test.ts b/tests/viem/clients/decorators/connection.test.ts new file mode 100644 index 0000000..51d0c85 --- /dev/null +++ b/tests/viem/clients/decorators/connection.test.ts @@ -0,0 +1,121 @@ +import { beforeEach, describe, expect, it } from 'vitest' +import { ConnectionStateManager } from '../../../../src/viem/utils/connection/manager' +import type { ConnectionStatus } from '../../../../src/viem/types/connection' + +describe('Connection Status Tracking', () => { + let manager: ConnectionStateManager + + beforeEach(() => { + manager = new ConnectionStateManager() + }) + + describe('ConnectionStateManager', () => { + it('should start with disconnected status', () => { + expect(manager.getStatus()).toBe('disconnected') + expect(manager.getStats()).toEqual({ + status: 'disconnected', + reconnectAttempts: 0, + totalConnections: 0, + totalDisconnections: 0, + }) + }) + + it('should update status and emit events', () => { + const statusChanges: ConnectionStatus[] = [] + manager.on('statusChange', (status) => statusChanges.push(status)) + + manager.updateStatus('connecting') + manager.updateStatus('connected') + manager.updateStatus('disconnected') + manager.updateStatus('error', new Error('test error')) + + expect(statusChanges).toEqual([ + 'connecting', + 'connected', + 'disconnected', + 'error', + ]) + }) + + it('should track connection timestamps', () => { + const now = Date.now() + + manager.updateStatus('connected') + const stats1 = manager.getStats() + expect(stats1.connectedAt).toBeGreaterThanOrEqual(now) + expect(stats1.totalConnections).toBe(1) + + manager.updateStatus('disconnected') + const stats2 = manager.getStats() + expect(stats2.disconnectedAt).toBeGreaterThanOrEqual(stats1.connectedAt!) + expect(stats2.totalDisconnections).toBe(1) + }) + + it('should track reconnection attempts', () => { + manager.incrementReconnectAttempts() + expect(manager.getStats().reconnectAttempts).toBe(1) + + manager.incrementReconnectAttempts() + expect(manager.getStats().reconnectAttempts).toBe(2) + + manager.updateStatus('connected') + expect(manager.getStats().reconnectAttempts).toBe(0) + }) + + it('should reset reconnect attempts on successful connection', () => { + manager.incrementReconnectAttempts() + manager.incrementReconnectAttempts() + expect(manager.getStats().reconnectAttempts).toBe(2) + + manager.resetReconnectAttempts() + expect(manager.getStats().reconnectAttempts).toBe(0) + }) + + it('should store last error', () => { + const error = new Error('Connection failed') + manager.updateStatus('error', error) + + const stats = manager.getStats() + expect(stats.status).toBe('error') + expect(stats.lastError).toBe(error) + + // Error should be cleared on successful connection + manager.updateStatus('connected') + expect(manager.getStats().lastError).toBeUndefined() + }) + + it('should not emit duplicate status changes', () => { + const statusChanges: ConnectionStatus[] = [] + manager.on('statusChange', (status) => statusChanges.push(status)) + + manager.updateStatus('connecting') + manager.updateStatus('connecting') // duplicate + manager.updateStatus('connected') + manager.updateStatus('connected') // duplicate + + expect(statusChanges).toEqual(['connecting', 'connected']) + }) + + it('should track total connections and disconnections correctly', () => { + // First connection cycle + manager.updateStatus('connecting') + manager.updateStatus('connected') + manager.updateStatus('disconnected') + + expect(manager.getStats().totalConnections).toBe(1) + expect(manager.getStats().totalDisconnections).toBe(1) + + // Second connection cycle + manager.updateStatus('connecting') + manager.updateStatus('connected') + manager.updateStatus('disconnected') + + expect(manager.getStats().totalConnections).toBe(2) + expect(manager.getStats().totalDisconnections).toBe(2) + + // Error without prior connection shouldn't increment disconnections + manager.updateStatus('error') + expect(manager.getStats().totalDisconnections).toBe(2) + }) + }) +}) diff --git a/tests/viem/clients/decorators/connectionActions.test.ts b/tests/viem/clients/decorators/connectionActions.test.ts new file mode 100644 index 0000000..e8b0a45 --- /dev/null +++ b/tests/viem/clients/decorators/connectionActions.test.ts @@ -0,0 +1,279 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { connectionActions } from '../../../../src/viem/clients/decorators/connection' +import { ConnectionStateManager } from '../../../../src/viem/utils/connection/manager' +import type { ConnectionStatus } from '../../../../src/viem/types/connection' + +describe('Connection Actions Decorator', () => { + let mockClient: any + let mockConnectionManager: ConnectionStateManager + let mockRpcClient: any + + beforeEach(() => { + // Create a real connection manager for testing + mockConnectionManager = new ConnectionStateManager() + + // Mock RPC client with connection manager + mockRpcClient = { + connectionManager: mockConnectionManager, + } + + // Mock client with transport that returns the RPC client + mockClient = { + transport: { + value: { + getRpcClient: vi.fn().mockResolvedValue(mockRpcClient), + }, + }, + } + }) + + afterEach(() => { + vi.clearAllMocks() + }) + + describe('getConnectionStatus', () => { + it('should return current connection status', async () => { + const actions = connectionActions(mockClient) + + // Initially disconnected + expect(actions.getConnectionStatus()).toBe('disconnected') + + // Wait for manager to be cached + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Update status + mockConnectionManager.updateStatus('connected') + expect(actions.getConnectionStatus()).toBe('connected') + + mockConnectionManager.updateStatus('error') + expect(actions.getConnectionStatus()).toBe('error') + }) + + it('should return disconnected when no manager available', () => { + const clientNoManager = { + transport: { + value: { + getRpcClient: vi.fn().mockResolvedValue({}), + }, + }, + } + + const actions = connectionActions(clientNoManager) + expect(actions.getConnectionStatus()).toBe('disconnected') + }) + }) + + describe('getConnectionStats', () => { + it('should return connection statistics', async () => { + const actions = connectionActions(mockClient) + + // Wait for manager to be cached + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Update some stats + mockConnectionManager.updateStatus('connected') + mockConnectionManager.incrementReconnectAttempts() + + const stats = actions.getConnectionStats() + expect(stats.status).toBe('connected') + expect(stats.reconnectAttempts).toBe(1) + expect(stats.totalConnections).toBe(1) + expect(stats.connectedAt).toBeDefined() + }) + + it('should return default stats when no manager', () => { + const clientNoManager = { + transport: { value: {} }, + } + + const actions = connectionActions(clientNoManager) + const stats = actions.getConnectionStats() + + expect(stats).toEqual({ + status: 'disconnected', + reconnectAttempts: 0, + totalConnections: 0, + totalDisconnections: 0, + }) + }) + }) + + describe('isConnected', () => { + it('should return true when connected', async () => { + const actions = connectionActions(mockClient) + + // Wait for manager to be cached + await new Promise((resolve) => setTimeout(resolve, 10)) + + expect(actions.isConnected()).toBe(false) + + mockConnectionManager.updateStatus('connected') + expect(actions.isConnected()).toBe(true) + + mockConnectionManager.updateStatus('disconnected') + expect(actions.isConnected()).toBe(false) + }) + }) + + describe('onConnectionChange', () => { + it('should subscribe to connection status changes', async () => { + const actions = connectionActions(mockClient) + const statusChanges: ConnectionStatus[] = [] + + // Subscribe to changes + const unsubscribe = actions.onConnectionChange((status) => { + statusChanges.push(status) + }) + + // Wait for async subscription + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Trigger status changes + mockConnectionManager.updateStatus('connecting') + mockConnectionManager.updateStatus('connected') + mockConnectionManager.updateStatus('disconnected') + + expect(statusChanges).toEqual(['connecting', 'connected', 'disconnected']) + + // Test unsubscribe + unsubscribe() + mockConnectionManager.updateStatus('error') + + // Should not receive the error status + expect(statusChanges).toEqual(['connecting', 'connected', 'disconnected']) + }) + + it('should handle missing connection manager gracefully', () => { + const clientNoManager = { + transport: { + value: { + getRpcClient: vi.fn().mockResolvedValue({}), + }, + }, + } + + const actions = connectionActions(clientNoManager) + const unsubscribe = actions.onConnectionChange(() => {}) + + // Should not throw + expect(unsubscribe).toBeDefined() + unsubscribe() + }) + }) + + describe('waitForConnection', () => { + it('should resolve immediately when already connected', async () => { + const actions = connectionActions(mockClient) + + // Wait for manager to be cached + await new Promise((resolve) => setTimeout(resolve, 10)) + + mockConnectionManager.updateStatus('connected') + + const start = Date.now() + await actions.waitForConnection() + const duration = Date.now() - start + + expect(duration).toBeLessThan(50) // Should be nearly instant + }) + + it('should wait for connection and resolve when connected', async () => { + const actions = connectionActions(mockClient) + + // Wait for manager to be cached + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Start disconnected + mockConnectionManager.updateStatus('disconnected') + + // Start waiting + const waitPromise = actions.waitForConnection(1000) + + // Connect after 50ms + setTimeout(() => { + mockConnectionManager.updateStatus('connected') + }, 50) + + await expect(waitPromise).resolves.toBeUndefined() + }) + + it('should timeout when connection not established', async () => { + const actions = connectionActions(mockClient) + + // Wait for manager to be cached + await new Promise((resolve) => setTimeout(resolve, 10)) + + mockConnectionManager.updateStatus('disconnected') + + await expect( + actions.waitForConnection(100), // 100ms timeout + ).rejects.toThrow('Connection timeout') + }) + + it('should throw when no connection manager available', async () => { + const clientNoManager = { + transport: { + value: { + getRpcClient: vi.fn().mockResolvedValue({}), + }, + }, + } + + const actions = connectionActions(clientNoManager) + + await expect(actions.waitForConnection()).rejects.toThrow( + 'No connection manager available', + ) + }) + }) + + describe('fallback transport handling', () => { + it('should handle fallback transport with WebSocket as first transport', async () => { + const fallbackClient = { + transport: { + value: { + transports: [ + { + value: { + getRpcClient: vi.fn().mockResolvedValue(mockRpcClient), + }, + }, + // Other transports... + ], + }, + }, + } + + const actions = connectionActions(fallbackClient) + + // Wait for manager to be cached + await new Promise((resolve) => setTimeout(resolve, 10)) + + mockConnectionManager.updateStatus('connected') + expect(actions.getConnectionStatus()).toBe('connected') + }) + }) + + describe('caching behavior', () => { + it('should cache connection manager after first access', async () => { + const getRpcClientSpy = vi.spyOn( + mockClient.transport.value, + 'getRpcClient', + ) + const actions = connectionActions(mockClient) + + // First call triggers async retrieval + actions.getConnectionStatus() + + await new Promise((resolve) => setTimeout(resolve, 10)) + + // These calls should use cache + actions.getConnectionStatus() + actions.getConnectionStats() + actions.isConnected() + + // Should only be called once during initialization + expect(getRpcClientSpy).toHaveBeenCalledTimes(1) + }) + }) +}) diff --git a/tests/viem/utils/queue/manager.test.ts b/tests/viem/utils/queue/manager.test.ts new file mode 100644 index 0000000..facacdd --- /dev/null +++ b/tests/viem/utils/queue/manager.test.ts @@ -0,0 +1,351 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { + clearGlobalRequestQueue, + getRequestQueue, + RequestQueueManager, +} from '../../../../src/viem/utils/queue/manager' + +describe('Request Queue Manager', () => { + let manager: RequestQueueManager + let mockTransport: any + let mockSocket: any + + beforeEach(() => { + // Clear any existing global queue + clearGlobalRequestQueue() + + // Mock WebSocket + mockSocket = { + readyState: 1, // OPEN + send: vi.fn(), + } + + // Mock transport + mockTransport = { + request: vi.fn().mockResolvedValue({ result: 'success' }), + value: { + getSocket: vi.fn().mockResolvedValue(mockSocket), + }, + } + + manager = new RequestQueueManager(mockTransport) + }) + + afterEach(() => { + manager.destroy() + }) + + describe('add', () => { + it('should add request to queue and process it', async () => { + const result = await manager.add({ + method: 'eth_blockNumber', + params: [], + priority: 'normal', + maxRetries: 3, + }) + + expect(result).toEqual({ result: 'success' }) + expect(mockTransport.request).toHaveBeenCalledWith({ + body: { + method: 'eth_blockNumber', + params: [], + }, + }) + }) + + it('should respect priority ordering', async () => { + // Pause to queue requests + manager.pause() + + const requests = [ + manager.add({ + method: 'low', + params: [], + priority: 'low', + maxRetries: 3, + }), + manager.add({ + method: 'high', + params: [], + priority: 'high', + maxRetries: 3, + }), + manager.add({ + method: 'normal', + params: [], + priority: 'normal', + maxRetries: 3, + }), + ] + + // Check queue order + const queued = manager.getQueuedRequests() + expect(queued[0].method).toBe('high') + expect(queued[1].method).toBe('normal') + expect(queued[2].method).toBe('low') + + // Resume and process + manager.resume() + await Promise.all(requests) + }) + + it('should reject when queue is full', async () => { + manager.setMaxSize(1) + manager.pause() + + // First request should succeed in queueing + const req1 = manager.add({ + method: 'test1', + params: [], + priority: 'normal', + maxRetries: 3, + }) + + // Wait a bit to ensure first request is in queue + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Second request should fail + expect(() => { + manager.add({ + method: 'test2', + params: [], + priority: 'normal', + maxRetries: 3, + }) + }).toThrow('Request queue is full') + + manager.resume() + await req1 + }) + }) + + describe('retry mechanism', () => { + it('should retry failed requests', async () => { + let attempts = 0 + mockTransport.request.mockImplementation(() => { + attempts++ + if (attempts < 3) { + return Promise.reject(new Error('Network error')) + } + return Promise.resolve({ result: 'success' }) + }) + + const result = await manager.add({ + method: 'eth_call', + params: [], + priority: 'normal', + maxRetries: 3, + }) + + expect(result).toEqual({ result: 'success' }) + expect(attempts).toBe(3) + }) + + it('should fail after max retries', async () => { + mockTransport.request.mockRejectedValue(new Error('Persistent error')) + + await expect( + manager.add({ + method: 'eth_call', + params: [], + priority: 'normal', + maxRetries: 2, + }), + ).rejects.toThrow('Persistent error') + + expect(mockTransport.request).toHaveBeenCalledTimes(3) // Initial + 2 retries + }) + + it('should re-queue when socket is not connected', async () => { + mockSocket.readyState = 3 // CLOSED + let connectAttempts = 0 + + mockTransport.value.getSocket.mockImplementation(() => { + connectAttempts++ + if (connectAttempts > 2) { + mockSocket.readyState = 1 // OPEN + } + return Promise.resolve(mockSocket) + }) + + const result = await manager.add({ + method: 'eth_subscribe', + params: [], + priority: 'high', + maxRetries: 5, + }) + + expect(result).toEqual({ result: 'success' }) + expect(connectAttempts).toBeGreaterThan(2) + }) + }) + + describe('pause/resume', () => { + it('should pause and resume processing', async () => { + manager.pause() + expect(manager.isPaused()).toBe(true) + + let processed = false + const promise = manager.add({ + method: 'test', + params: [], + priority: 'normal', + maxRetries: 3, + onSuccess: () => { + processed = true + }, + }) + + // Give some time for processing + await new Promise((resolve) => setTimeout(resolve, 50)) + expect(processed).toBe(false) + + manager.resume() + expect(manager.isPaused()).toBe(false) + + await promise + expect(processed).toBe(true) + }) + }) + + describe('clear', () => { + it('should clear all queued requests', async () => { + manager.pause() + + const promises = [ + manager.add({ + method: 'test1', + params: [], + priority: 'normal', + maxRetries: 3, + }), + manager.add({ + method: 'test2', + params: [], + priority: 'normal', + maxRetries: 3, + }), + ] + + expect(manager.getQueuedRequests().length).toBe(2) + + manager.clear() + + expect(manager.getQueuedRequests().length).toBe(0) + + // All promises should reject + for (const promise of promises) { + await expect(promise).rejects.toThrow('Queue cleared') + } + }) + }) + + describe('statistics', () => { + it('should track queue statistics', async () => { + const initialStats = manager.getStats() + expect(initialStats).toEqual({ + queueSize: 0, + processing: 0, + processed: 0, + failed: 0, + avgProcessingTime: 0, + lastProcessedAt: undefined, + }) + + // Add delay to ensure processing time is measurable + mockTransport.request.mockImplementation(() => { + return new Promise((resolve) => + setTimeout(() => resolve({ result: 'success' }), 10), + ) + }) + + // Process some requests + await manager.add({ + method: 'test1', + params: [], + priority: 'normal', + maxRetries: 3, + }) + await manager.add({ + method: 'test2', + params: [], + priority: 'normal', + maxRetries: 3, + }) + + // Fail one request + mockTransport.request.mockRejectedValueOnce(new Error('Failed')) + await expect( + manager.add({ + method: 'test3', + params: [], + priority: 'normal', + maxRetries: 0, + }), + ).rejects.toThrow() + + const stats = manager.getStats() + expect(stats.processed).toBe(2) + expect(stats.failed).toBe(1) + expect(stats.avgProcessingTime).toBeGreaterThan(0) + expect(stats.lastProcessedAt).toBeDefined() + }) + }) + + describe('callbacks', () => { + it('should call onSuccess callback', async () => { + const onSuccess = vi.fn() + const onError = vi.fn() + + await manager.add({ + method: 'test', + params: [], + priority: 'normal', + maxRetries: 3, + onSuccess, + onError, + }) + + expect(onSuccess).toHaveBeenCalledWith({ result: 'success' }) + expect(onError).not.toHaveBeenCalled() + }) + + it('should call onError callback', async () => { + const onSuccess = vi.fn() + const onError = vi.fn() + + mockTransport.request.mockRejectedValue(new Error('Request failed')) + + await expect( + manager.add({ + method: 'test', + params: [], + priority: 'normal', + maxRetries: 0, + onSuccess, + onError, + }), + ).rejects.toThrow('Request failed') + + expect(onError).toHaveBeenCalledWith(expect.any(Error)) + expect(onSuccess).not.toHaveBeenCalled() + }) + }) + + describe('global instance', () => { + it('should return singleton instance', () => { + const queue1 = getRequestQueue(mockTransport) + const queue2 = getRequestQueue(mockTransport) + + expect(queue1).toBe(queue2) + }) + + it('should clear global instance', () => { + const queue1 = getRequestQueue(mockTransport) + clearGlobalRequestQueue() + const queue2 = getRequestQueue(mockTransport) + + expect(queue1).not.toBe(queue2) + }) + }) +}) diff --git a/tests/viem/utils/rpc/socket.exponential-backoff.test.ts b/tests/viem/utils/rpc/socket.exponential-backoff.test.ts new file mode 100644 index 0000000..c84b3d1 --- /dev/null +++ b/tests/viem/utils/rpc/socket.exponential-backoff.test.ts @@ -0,0 +1,84 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +describe('WebSocket Exponential Backoff', () => { + let originalSetTimeout: any + let timeoutCalls: Array<{ callback: Function; delay: number }> = [] + + beforeEach(() => { + // Store original setTimeout + originalSetTimeout = global.setTimeout + timeoutCalls = [] + + // Mock setTimeout to capture delays + global.setTimeout = vi.fn((callback: Function, delay: number) => { + timeoutCalls.push({ callback, delay }) + // Return a fake timer ID + return 1 as any + }) as any + }) + + afterEach(() => { + // Restore original setTimeout + global.setTimeout = originalSetTimeout + }) + + it('should calculate exponential backoff correctly', () => { + // Test the exponential backoff calculation + const baseDelay = 2000 + const maxDelay = 30000 + + // Calculate expected delays + const calculateBackoff = (attempt: number) => { + return Math.min(baseDelay * 2 ** attempt, maxDelay) + } + + // Test sequence + expect(calculateBackoff(0)).toBe(2000) // 2^0 * 2000 = 2000 + expect(calculateBackoff(1)).toBe(4000) // 2^1 * 2000 = 4000 + expect(calculateBackoff(2)).toBe(8000) // 2^2 * 2000 = 8000 + expect(calculateBackoff(3)).toBe(16000) // 2^3 * 2000 = 16000 + expect(calculateBackoff(4)).toBe(30000) // 2^4 * 2000 = 32000, capped at 30000 + expect(calculateBackoff(5)).toBe(30000) // 2^5 * 2000 = 64000, capped at 30000 + }) + + it('should verify exponential backoff implementation in socket.ts', async () => { + // Read the actual implementation to verify it's correct + const fs = await import('node:fs') + const path = await import('node:path') + const process = await import('node:process') + const socketPath = path.join(process.cwd(), 'src/viem/utils/rpc/socket.ts') + const socketContent = fs.readFileSync(socketPath, 'utf-8') + + // Verify exponential backoff code exists + expect(socketContent).toContain('2 ** reconnectCount') + expect(socketContent).toContain('30000') // max delay + expect(socketContent).toContain('backoffDelay') + + // Verify it's used in setTimeout + const backoffPattern = + /const\s+backoffDelay\s*=\s*Math\.min\s*\(\s*delay\s*\*\s*2\s*\*\*\s*reconnectCount\s*,\s*30000[^)]*\)/ + expect(socketContent).toMatch(backoffPattern) + }) + + it('should use different delays for different base values', () => { + const calculateBackoff = ( + baseDelay: number, + attempt: number, + maxDelay = 30000, + ) => { + return Math.min(baseDelay * 2 ** attempt, maxDelay) + } + + // Test with 1 second base + expect(calculateBackoff(1000, 0)).toBe(1000) + expect(calculateBackoff(1000, 1)).toBe(2000) + expect(calculateBackoff(1000, 2)).toBe(4000) + expect(calculateBackoff(1000, 3)).toBe(8000) + + // Test with 5 second base + expect(calculateBackoff(5000, 0)).toBe(5000) + expect(calculateBackoff(5000, 1)).toBe(10000) + expect(calculateBackoff(5000, 2)).toBe(20000) + expect(calculateBackoff(5000, 3)).toBe(30000) // Would be 40000 but capped + }) +}) diff --git a/tests/viem/utils/subscription/manager.test.ts b/tests/viem/utils/subscription/manager.test.ts new file mode 100644 index 0000000..cac37f8 --- /dev/null +++ b/tests/viem/utils/subscription/manager.test.ts @@ -0,0 +1,220 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { + getSubscriptionManager, + SubscriptionManager, +} from '../../../../src/viem/utils/subscription/manager' +import type { ManagedSubscription } from '../../../../src/viem/utils/subscription/types' + +describe('Subscription Manager', () => { + let manager: SubscriptionManager + let mockClient: any + + beforeEach(() => { + manager = new SubscriptionManager() + + // Mock client with watch methods and transport + mockClient = { + watchShreds: vi.fn().mockResolvedValue(() => {}), + watchShredEvent: vi.fn().mockResolvedValue(() => {}), + watchContractShredEvent: vi.fn().mockResolvedValue(() => {}), + transport: { + type: 'webSocket', + value: { + riseSubscribe: vi.fn().mockResolvedValue({ + subscriptionId: 'test-sub-123', + unsubscribe: vi.fn().mockResolvedValue({}), + }), + }, + }, + } + }) + + describe('createManagedSubscription', () => { + it('should create a managed subscription for shreds', async () => { + const onShred = vi.fn() + const subscription = await manager.createManagedSubscription(mockClient, { + onShred, + onError: vi.fn(), + }) + + expect(subscription).toBeDefined() + expect(subscription.id).toMatch(/^sub_\d+$/) + expect(subscription.type).toBe('shreds') + expect(mockClient.watchShreds).toHaveBeenCalledWith( + expect.objectContaining({ + onShred: expect.any(Function), + managed: false, + }), + ) + }) + + it('should create a managed subscription for events', async () => { + const onLogs = vi.fn() + const subscription = await manager.createManagedSubscription(mockClient, { + address: '0x123', + onLogs, + onError: vi.fn(), + }) + + expect(subscription).toBeDefined() + expect(subscription.type).toBe('logs') + expect(mockClient.watchShredEvent).toHaveBeenCalledWith( + expect.objectContaining({ + address: '0x123', + onLogs: expect.any(Function), + managed: false, + }), + ) + }) + }) + + describe('ManagedSubscription', () => { + let subscription: ManagedSubscription + let onLogs: any + + beforeEach(async () => { + onLogs = vi.fn() + subscription = await manager.createManagedSubscription(mockClient, { + address: '0x123', + onLogs, + onError: vi.fn(), + }) + }) + + describe('address management', () => { + it('should add addresses dynamically', async () => { + expect(subscription.getAddresses()).toEqual(['0x123']) + + await subscription.addAddress('0x456') + expect(subscription.getAddresses()).toEqual(['0x123', '0x456']) + + // Should restart subscription with new addresses + expect(mockClient.watchShredEvent).toHaveBeenCalledTimes(2) + }) + + it('should not add duplicate addresses', async () => { + await subscription.addAddress('0x123') + expect(subscription.getAddresses()).toEqual(['0x123']) + + // Should not restart subscription + expect(mockClient.watchShredEvent).toHaveBeenCalledTimes(1) + }) + + it('should remove addresses dynamically', async () => { + await subscription.addAddress('0x456') + expect(subscription.getAddresses()).toEqual(['0x123', '0x456']) + + await subscription.removeAddress('0x123') + expect(subscription.getAddresses()).toEqual(['0x456']) + + // Should restart subscription + expect(mockClient.watchShredEvent).toHaveBeenCalledTimes(3) + }) + + it('should handle empty initial addresses', async () => { + const sub = await manager.createManagedSubscription(mockClient, { + onLogs: vi.fn(), + }) + + expect(sub.getAddresses()).toEqual([]) + + await sub.addAddress('0x789') + expect(sub.getAddresses()).toEqual(['0x789']) + }) + }) + + describe('pause/resume', () => { + it('should pause and buffer events', () => { + // Get the wrapped handler + const wrappedHandler = + mockClient.watchShredEvent.mock.calls[0][0].onLogs + + subscription.pause() + expect(subscription.isPaused()).toBe(true) + + // Send events while paused + const event1 = { data: '0x1' } + const event2 = { data: '0x2' } + wrappedHandler(event1) + wrappedHandler(event2) + + // Original handler should not be called + expect(onLogs).not.toHaveBeenCalled() + + // Resume and check buffered events are delivered + subscription.resume() + expect(subscription.isPaused()).toBe(false) + expect(onLogs).toHaveBeenCalledWith(event1) + expect(onLogs).toHaveBeenCalledWith(event2) + }) + }) + + describe('statistics', () => { + it('should track event statistics', () => { + const stats = subscription.getStats() + expect(stats.eventCount).toBe(0) + expect(stats.createdAt).toBeLessThanOrEqual(Date.now()) + expect(stats.lastEventAt).toBeUndefined() + + // Simulate events + const wrappedHandler = + mockClient.watchShredEvent.mock.calls[0][0].onLogs + wrappedHandler({ data: '0x1' }) + wrappedHandler({ data: '0x2' }) + + const newStats = subscription.getStats() + expect(newStats.eventCount).toBe(2) + expect(newStats.lastEventAt).toBeDefined() + }) + }) + + describe('unsubscribe', () => { + it('should call underlying unsubscribe function', async () => { + const unsubscribeFn = vi.fn() + mockClient.watchShredEvent.mockReturnValue(unsubscribeFn) + + const sub = await manager.createManagedSubscription(mockClient, { + onLogs: vi.fn(), + }) + + await sub.unsubscribe() + expect(unsubscribeFn).toHaveBeenCalled() + }) + }) + }) + + describe('updateSubscription', () => { + it('should buffer events during updates', async () => { + const onLogs = vi.fn() + const subscription = await manager.createManagedSubscription(mockClient, { + address: '0x123', + onLogs, + }) + + // Get the first wrapped handler + const firstHandler = mockClient.watchShredEvent.mock.calls[0][0].onLogs + + // Start update (which will set temporary handler) + const updatePromise = subscription.addAddress('0x456') + + // Send event during update + const bufferedEvent = { data: '0xbuffered' } + firstHandler(bufferedEvent) + + // Wait for update to complete + await updatePromise + + // Verify event was replayed + expect(onLogs).toHaveBeenCalledWith(bufferedEvent) + }) + }) + + describe('global instance', () => { + it('should return singleton instance', () => { + const manager1 = getSubscriptionManager() + const manager2 = getSubscriptionManager() + + expect(manager1).toBe(manager2) + }) + }) +})