diff --git a/src/routes/evm/index.ts b/src/routes/evm/index.ts index a744540..a13db0a 100644 --- a/src/routes/evm/index.ts +++ b/src/routes/evm/index.ts @@ -410,7 +410,7 @@ export default async function (fastify: FastifyInstance, opts: TelosEvmConfig) { } else { const hexBlockNum = removeLeftZeros(blockHex); const hexGas = removeLeftZeros(numToHex(receipt['gas_limit'])); - const hexGasPrice = removeLeftZeros(numToHex(receipt['charged_gas_price'])); + const hexGasPrice = removeLeftZeros(numToHex(receipt['gas_price'])); const hexNonce = removeLeftZeros(numToHex(receipt['nonce'])); const hexTransactionIndex = removeLeftZeros(numToHex(receipt['trx_index'])); const hexValue = addHexPrefix(receipt['value']); @@ -1144,6 +1144,7 @@ export default async function (fastify: FastifyInstance, opts: TelosEvmConfig) { blockNumber: removeLeftZeros(numToHex(receipt['block'])), contractAddress: toChecksumAddress(_contractAddr)?.toLowerCase(), cumulativeGasUsed: removeLeftZeros(_gas), + effectiveGasPrice: removeLeftZeros(numToHex(receipt['charged_gas_price'])), from: toChecksumAddress(receipt['from'])?.toLowerCase(), gasUsed: removeLeftZeros(_gas), logsBloom: _logsBloom, @@ -1188,7 +1189,7 @@ export default async function (fastify: FastifyInstance, opts: TelosEvmConfig) { blockNumber: removeLeftZeros(_blockNum), from: toChecksumAddress(receipt['from']).toLowerCase(), gas: removeLeftZeros(numToHex(receipt.gas_limit)), - gasPrice: removeLeftZeros(numToHex(receipt.charged_gas_price)), + gasPrice: removeLeftZeros(numToHex(receipt.gas_price)), hash: receipt['hash'], input: receipt['input_data'], nonce: removeLeftZeros(numToHex(receipt['nonce'])), diff --git a/src/ws/WebsocketRPC.ts b/src/ws/WebsocketRPC.ts index b5fc324..58ea04c 100644 --- a/src/ws/WebsocketRPC.ts +++ b/src/ws/WebsocketRPC.ts @@ -5,7 +5,6 @@ import {TelosEvmConfig} from "../types"; import Subscription from "./Subscription"; import LogSubscription from "./LogSubscription"; import {createLogger} from "../util/logger"; -import {BigNumber} from 'ethers'; const NEW_HEADS_SUBSCRIPTION = "0x9ce59a13059e417087c02d3236a0b1cd" const logger = createLogger('telos-evm-rpc-ws'); @@ -40,7 +39,7 @@ export default class WebsocketRPC { initUWS(): void { const host = this.config.rpcWebsocketHost; const port = this.config.rpcWebsocketPort; - let ip, origin; + let ip: String, origin : String; this.websocketRPC = uWS.App({}).ws('/evm', { compression: 0, maxPayloadLength: 16 * 1024 * 1024, @@ -48,8 +47,8 @@ export default class WebsocketRPC { upgrade: (res: uWS.HttpResponse, req: uWS.HttpRequest, context: uWS.us_socket_context_t) => { const tRef = process.hrtime.bigint(); const buffer = Buffer.from(res.getRemoteAddressAsText()); - const string = buffer.toString(); - ip = req.getHeader('x-forwarded-for') || string || ''; + const remoteAddress = buffer.toString(); + ip = req.getHeader('x-forwarded-for') || remoteAddress || ''; if (req.getHeader('origin') === 'chrome-extension://nkbihfbeogaeaoehlefnkodbefgpgknn') { origin = 'MetaMask'; @@ -78,9 +77,13 @@ export default class WebsocketRPC { drain: () => { }, close: (ws: uWS.WebSocket) => { + console.log(`WSCLOSE: ${new Date().toISOString()} - ${ip} (0/0) - ${ws.clientInfo.origin} - close`); + if(ws.readyState !== ws.CLOSED){ + ws.close(); + } this.headSubscription.removeWs(ws, true); for (const [subId, sub] of this.logSubscriptions) - sub.removeWs(ws, true) + sub.removeWs(ws, true); }, }).listen(host, port, (token: uWS.us_listen_socket) => { if (token) { @@ -91,21 +94,26 @@ export default class WebsocketRPC { }); } - makeResponse(result, originalMessage) { + makeResponse(result: any, originalMessage: any) { return {"jsonrpc": "2.0", result, id: originalMessage.id}; } - makeError(message, id=null, code=-32600) { + makeError(message: String, id=null, code=-32600) { return {"jsonrpc": "2.0", "error": {code, message}, id}; } - async handleMessage(ws, msg, ip, origin) { + async handleMessage(ws: WebSocket, msg: ArrayBuffer, ip: String, origin: String) { const tRef = process.hrtime.bigint(); const buffer = Buffer.from(msg); const string = buffer.toString(); try { + if(string === "CLOSE" || string === "INVALID_DATA"){ + ws.close(); + return; + } const msgObj = JSON.parse(string); if (!msgObj.method) { + this.attemptSend(ws, JSON.stringify(this.makeError("Invalid Request, no method specified", msgObj.id ? msgObj.id : null))); ws.send(this.makeError("Invalid Request, no method specified", msgObj.id ? msgObj.id : null)); return; } @@ -117,7 +125,7 @@ export default class WebsocketRPC { return; } else if (method === "eth_unsubscribe") { if (!msgObj?.params?.length) { - ws.send(JSON.stringify(this.makeError("Subscription ID should be provided as first parameter", msgObj.id))) + this.attemptSend(ws, JSON.stringify(this.makeError("Subscription ID should be provided as first parameter", msgObj.id))) return; } const subscriptionId = msgObj.params[0]; @@ -132,7 +140,7 @@ export default class WebsocketRPC { this.logSubscriptions.delete(sub.getId()); }); } - ws.send(JSON.stringify(this.makeResponse(true, msgObj))); + this.attemptSend(ws, JSON.stringify(this.makeResponse(true, msgObj))); const duration = ((Number(process.hrtime.bigint()) - Number(tRef)) / 1000).toFixed(3); console.log(`WSUNSUBSCRIBE: ${new Date().toISOString()} - ${duration} μs - ${ip} (0/0) - ${origin} - ${msgObj.params[0]}`); return; @@ -143,22 +151,41 @@ export default class WebsocketRPC { if (!sub.hasClients()) this.logSubscriptions.delete(sub.getId()); }); - ws.send(JSON.stringify(this.makeResponse(true, msgObj))); + this.attemptSend(ws, JSON.stringify(this.makeResponse(true, msgObj))); const duration = ((Number(process.hrtime.bigint()) - Number(tRef)) / 1000).toFixed(3); console.log(`WSUNSUBSCRIBE: ${new Date().toISOString()} - ${duration} μs - ${ip} (0/0) - ${origin} - all`); return; } const rpcResponse = await this.rpcHandlerContainer.handler(msgObj, ws.clientInfo); - ws.send(JSON.stringify(rpcResponse)); + this.attemptSend(ws, JSON.stringify(rpcResponse)); const duration = ((Number(process.hrtime.bigint()) - Number(tRef)) / 1000).toFixed(3); console.log(`WS: ${new Date().toISOString()} - ${duration} μs - ${ip} (0/0) - ${origin} - ${msgObj.method}`); } catch (e) { console.error(`Failed to parse websocket message: ${string} error: ${e.message}`); } } - - async handleSubscription(ws, msgObj): Promise { + async attemptSend(ws: WebSocket, message: string, retries = 0, maxRetries = 30) { + if (ws.readyState !== ws.OPEN) { + console.log('WebSocket is not open. Cannot send message.'); + return; + } + + const success = ws.send(message); + if (!success) { + console.log('Failed to send message'); + if (retries < maxRetries) { + retries++; + console.log(`Failed to send message. Retrying... Attempt ${retries} of ${maxRetries}`); + const self = this; + setTimeout(async () => await self.attemptSend(ws, message, retries, maxRetries), 1000); // Wait 1 second before retrying + } else { + console.log('Failed to send message. Max retries reached. Closing connection.'); + ws.close(1011, 'Repeated send failures'); + } + } + } + async handleSubscription(ws: WebSocket, msgObj: any): Promise { switch (msgObj.params[0]) { case 'logs': this.handleLogSubscription(ws, msgObj); @@ -167,15 +194,15 @@ export default class WebsocketRPC { this.handleNewHeadsSubscription(ws, msgObj); break; default: - ws.send(JSON.stringify(this.makeError(`Subscription type ${msgObj.params[0]} is not supported`, msgObj.id))); + await this.attemptSend(ws, JSON.stringify(this.makeError(`Subscription type ${msgObj.params[0]} is not supported`, msgObj.id))); break; } } - async handleLogSubscription(ws, msgObj): Promise { + async handleLogSubscription(ws: WebSocket, msgObj: any): Promise { const filter = msgObj.params[1]; if(!filter?.address){ - ws.send(JSON.stringify(this.makeError("address should be provided in params", msgObj.id))); + await this.attemptSend(ws, JSON.stringify(this.makeError("address should be provided in params", msgObj.id))); } const subscriptionId = LogSubscription.makeId(filter); if (!this.logSubscriptions.has(subscriptionId)) { @@ -183,12 +210,13 @@ export default class WebsocketRPC { } this.logSubscriptions.get(subscriptionId).addWs(ws); - ws.send(JSON.stringify(this.makeResponse(subscriptionId, msgObj))); + + await this.attemptSend(ws, JSON.stringify(this.makeResponse(subscriptionId, msgObj))); } - async handleNewHeadsSubscription(ws, msgObj): Promise { + async handleNewHeadsSubscription(ws: WebSocket, msgObj: Object): Promise { this.headSubscription.addWs(ws); - ws.send(JSON.stringify(this.makeResponse(this.headSubscription.getId(), msgObj))); + await this.attemptSend(ws, JSON.stringify(this.makeResponse(this.headSubscription.getId(), msgObj))); } handleIndexerMessage(data): void{ @@ -208,7 +236,7 @@ export default class WebsocketRPC { } } - handleRawMessage(data): void { + handleRawMessage(data: any): void { for (const [subId, sub] of this.logSubscriptions) { const tRef = process.hrtime.bigint(); sub.handleRawAction(data); @@ -219,7 +247,7 @@ export default class WebsocketRPC { } } - handleHeadMessage(head): void { + handleHeadMessage(head: any): void { const tRef = process.hrtime.bigint(); if(this.headSubscription.hasClients()){ const headMessage = {