From 4ea73bb64edb8a54ebf50fa9a2950d45766d1138 Mon Sep 17 00:00:00 2001 From: Jiralite <33201955+Jiralite@users.noreply.github.com> Date: Sun, 2 Jun 2024 20:53:31 +0100 Subject: [PATCH] revert: refactor: native zlib support (#10314) Revert "refactor: native zlib support (#10243)" This reverts commit 20258f94bf1a62ec1989ef04d839e2800f6e6d28. --- packages/ws/README.md | 5 +- packages/ws/src/utils/constants.ts | 9 +- packages/ws/src/ws/WebSocketManager.ts | 10 +- packages/ws/src/ws/WebSocketShard.ts | 195 ++++++++----------------- 4 files changed, 61 insertions(+), 158 deletions(-) diff --git a/packages/ws/README.md b/packages/ws/README.md index 61be4e93573b..c1d3b70a13e4 100644 --- a/packages/ws/README.md +++ b/packages/ws/README.md @@ -50,10 +50,7 @@ const manager = new WebSocketManager({ intents: 0, // for no intents rest, // uncomment if you have zlib-sync installed and want to use compression - // compression: CompressionMethod.ZlibSync, - - // alternatively, we support compression using node's native `node:zlib` module: - // compression: CompressionMethod.ZlibNative, + // compression: CompressionMethod.ZlibStream, }); manager.on(WebSocketShardEvents.Dispatch, (event) => { diff --git a/packages/ws/src/utils/constants.ts b/packages/ws/src/utils/constants.ts index 2917090eaaff..c901c23b276c 100644 --- a/packages/ws/src/utils/constants.ts +++ b/packages/ws/src/utils/constants.ts @@ -18,19 +18,13 @@ export enum Encoding { * Valid compression methods */ export enum CompressionMethod { - ZlibNative, - ZlibSync, + ZlibStream = 'zlib-stream', } export const DefaultDeviceProperty = `@discordjs/ws [VI]{{inject}}[/VI]` as `@discordjs/ws ${string}`; const getDefaultSessionStore = lazy(() => new Collection()); -export const CompressionParameterMap = { - [CompressionMethod.ZlibNative]: 'zlib-stream', - [CompressionMethod.ZlibSync]: 'zlib-stream', -} as const satisfies Record; - /** * Default options used by the manager */ @@ -52,7 +46,6 @@ export const DefaultWebSocketManagerOptions = { version: APIVersion, encoding: Encoding.JSON, compression: null, - useIdentifyCompression: false, retrieveSessionInfo(shardId) { const store = getDefaultSessionStore(); return store.get(shardId) ?? null; diff --git a/packages/ws/src/ws/WebSocketManager.ts b/packages/ws/src/ws/WebSocketManager.ts index 2bc8a601c006..f4a80bec778c 100644 --- a/packages/ws/src/ws/WebSocketManager.ts +++ b/packages/ws/src/ws/WebSocketManager.ts @@ -96,9 +96,9 @@ export interface OptionalWebSocketManagerOptions { */ buildStrategy(manager: WebSocketManager): IShardingStrategy; /** - * The transport compression method to use - mutually exclusive with `useIdentifyCompression` + * The compression method to use * - * @defaultValue `null` (no transport compression) + * @defaultValue `null` (no compression) */ compression: CompressionMethod | null; /** @@ -176,12 +176,6 @@ export interface OptionalWebSocketManagerOptions { * Function used to store session information for a given shard */ updateSessionInfo(shardId: number, sessionInfo: SessionInfo | null): Awaitable; - /** - * Whether to use the `compress` option when identifying - * - * @defaultValue `false` - */ - useIdentifyCompression: boolean; /** * The gateway version to use * diff --git a/packages/ws/src/ws/WebSocketShard.ts b/packages/ws/src/ws/WebSocketShard.ts index 5126002c4a31..1eeecc2715ff 100644 --- a/packages/ws/src/ws/WebSocketShard.ts +++ b/packages/ws/src/ws/WebSocketShard.ts @@ -1,10 +1,11 @@ +/* eslint-disable id-length */ import { Buffer } from 'node:buffer'; import { once } from 'node:events'; import { clearInterval, clearTimeout, setInterval, setTimeout } from 'node:timers'; import { setTimeout as sleep } from 'node:timers/promises'; import { URLSearchParams } from 'node:url'; import { TextDecoder } from 'node:util'; -import type * as nativeZlib from 'node:zlib'; +import { inflate } from 'node:zlib'; import { Collection } from '@discordjs/collection'; import { lazy, shouldUseGlobalFetchAndWebSocket } from '@discordjs/util'; import { AsyncQueue } from '@sapphire/async-queue'; @@ -20,20 +21,13 @@ import { type GatewaySendPayload, } from 'discord-api-types/v10'; import { WebSocket, type Data } from 'ws'; -import type * as ZlibSync from 'zlib-sync'; -import type { IContextFetchingStrategy } from '../strategies/context/IContextFetchingStrategy'; -import { - CompressionMethod, - CompressionParameterMap, - ImportantGatewayOpcodes, - getInitialSendRateLimitState, -} from '../utils/constants.js'; +import type { Inflate } from 'zlib-sync'; +import type { IContextFetchingStrategy } from '../strategies/context/IContextFetchingStrategy.js'; +import { ImportantGatewayOpcodes, getInitialSendRateLimitState } from '../utils/constants.js'; import type { SessionInfo } from './WebSocketManager.js'; -/* eslint-disable promise/prefer-await-to-then */ +// eslint-disable-next-line promise/prefer-await-to-then const getZlibSync = lazy(async () => import('zlib-sync').then((mod) => mod.default).catch(() => null)); -const getNativeZlib = lazy(async () => import('node:zlib').then((mod) => mod).catch(() => null)); -/* eslint-enable promise/prefer-await-to-then */ export enum WebSocketShardEvents { Closed = 'closed', @@ -92,9 +86,9 @@ const WebSocketConstructor: typeof WebSocket = shouldUseGlobalFetchAndWebSocket( export class WebSocketShard extends AsyncEventEmitter { private connection: WebSocket | null = null; - private nativeInflate: nativeZlib.Inflate | null = null; + private useIdentifyCompress = false; - private zLibSyncInflate: ZlibSync.Inflate | null = null; + private inflate: Inflate | null = null; private readonly textDecoder = new TextDecoder(); @@ -126,18 +120,6 @@ export class WebSocketShard extends AsyncEventEmitter { #status: WebSocketShardStatus = WebSocketShardStatus.Idle; - private identifyCompressionEnabled = false; - - /** - * @privateRemarks - * - * This is needed because `this.strategy.options.compression` is not an actual reflection of the compression method - * used, but rather the compression method that the user wants to use. This is because the libraries could just be missing. - */ - private get transportCompressionEnabled() { - return this.strategy.options.compression !== null && (this.nativeInflate ?? this.zLibSyncInflate) !== null; - } - public get status(): WebSocketShardStatus { return this.#status; } @@ -179,63 +161,21 @@ export class WebSocketShard extends AsyncEventEmitter { throw new Error("Tried to connect a shard that wasn't idle"); } - const { version, encoding, compression, useIdentifyCompression } = this.strategy.options; - this.identifyCompressionEnabled = useIdentifyCompression; - - // eslint-disable-next-line id-length + const { version, encoding, compression } = this.strategy.options; const params = new URLSearchParams({ v: version, encoding }); - if (compression !== null) { - if (useIdentifyCompression) { - console.warn('WebSocketShard: transport compression is enabled, disabling identify compression'); - this.identifyCompressionEnabled = false; - } - - params.append('compress', CompressionParameterMap[compression]); - - switch (compression) { - case CompressionMethod.ZlibNative: { - const zlib = await getNativeZlib(); - if (zlib) { - const inflate = zlib.createInflate({ - chunkSize: 65_535, - flush: zlib.constants.Z_SYNC_FLUSH, - }); - - inflate.on('error', (error) => { - this.emit(WebSocketShardEvents.Error, { error }); - }); - - this.nativeInflate = inflate; - } else { - console.warn('WebSocketShard: Compression is set to native but node:zlib is not available.'); - params.delete('compress'); - } - - break; - } - - case CompressionMethod.ZlibSync: { - const zlib = await getZlibSync(); - if (zlib) { - this.zLibSyncInflate = new zlib.Inflate({ - chunkSize: 65_535, - to: 'string', - }); - } else { - console.warn('WebSocketShard: Compression is set to zlib-sync, but it is not installed.'); - params.delete('compress'); - } - - break; - } - } - } - - if (this.identifyCompressionEnabled) { - const zlib = await getNativeZlib(); - if (!zlib) { - console.warn('WebSocketShard: Identify compression is enabled, but node:zlib is not available.'); - this.identifyCompressionEnabled = false; + if (compression) { + const zlib = await getZlibSync(); + if (zlib) { + params.append('compress', compression); + this.inflate = new zlib.Inflate({ + chunkSize: 65_535, + to: 'string', + }); + } else if (!this.useIdentifyCompress) { + this.useIdentifyCompress = true; + console.warn( + 'WebSocketShard: Compression is enabled but zlib-sync is not installed, falling back to identify compress', + ); } } @@ -511,29 +451,28 @@ export class WebSocketShard extends AsyncEventEmitter { `shard id: ${this.id.toString()}`, `shard count: ${this.strategy.options.shardCount}`, `intents: ${this.strategy.options.intents}`, - `compression: ${this.transportCompressionEnabled ? CompressionParameterMap[this.strategy.options.compression!] : this.identifyCompressionEnabled ? 'identify' : 'none'}`, + `compression: ${this.inflate ? 'zlib-stream' : this.useIdentifyCompress ? 'identify' : 'none'}`, ]); - const data: GatewayIdentifyData = { + const d: GatewayIdentifyData = { token: this.strategy.options.token, properties: this.strategy.options.identifyProperties, intents: this.strategy.options.intents, - compress: this.identifyCompressionEnabled, + compress: this.useIdentifyCompress, shard: [this.id, this.strategy.options.shardCount], }; if (this.strategy.options.largeThreshold) { - data.large_threshold = this.strategy.options.largeThreshold; + d.large_threshold = this.strategy.options.largeThreshold; } if (this.strategy.options.initialPresence) { - data.presence = this.strategy.options.initialPresence; + d.presence = this.strategy.options.initialPresence; } await this.send({ op: GatewayOpcodes.Identify, - // eslint-disable-next-line id-length - d: data, + d, }); await this.waitForEvent(WebSocketShardEvents.Ready, this.strategy.options.readyTimeout); @@ -551,7 +490,6 @@ export class WebSocketShard extends AsyncEventEmitter { this.replayedEvents = 0; return this.send({ op: GatewayOpcodes.Resume, - // eslint-disable-next-line id-length d: { token: this.strategy.options.token, seq: session.sequence, @@ -569,7 +507,6 @@ export class WebSocketShard extends AsyncEventEmitter { await this.send({ op: GatewayOpcodes.Heartbeat, - // eslint-disable-next-line id-length d: session?.sequence ?? null, }); @@ -577,14 +514,6 @@ export class WebSocketShard extends AsyncEventEmitter { this.isAck = false; } - private parseInflateResult(result: any): GatewayReceivePayload | null { - if (!result) { - return null; - } - - return JSON.parse(typeof result === 'string' ? result : this.textDecoder.decode(result)) as GatewayReceivePayload; - } - private async unpackMessage(data: Data, isBinary: boolean): Promise { // Deal with no compression if (!isBinary) { @@ -599,12 +528,10 @@ export class WebSocketShard extends AsyncEventEmitter { const decompressable = new Uint8Array(data as ArrayBuffer); // Deal with identify compress - if (this.identifyCompressionEnabled) { - // eslint-disable-next-line no-async-promise-executor - return new Promise(async (resolve, reject) => { - const zlib = (await getNativeZlib())!; + if (this.useIdentifyCompress) { + return new Promise((resolve, reject) => { // eslint-disable-next-line promise/prefer-await-to-callbacks - zlib.inflate(decompressable, { chunkSize: 65_535 }, (err, result) => { + inflate(decompressable, { chunkSize: 65_535 }, (err, result) => { if (err) { reject(err); return; @@ -615,50 +542,42 @@ export class WebSocketShard extends AsyncEventEmitter { }); } - // Deal with transport compression - if (this.transportCompressionEnabled) { + // Deal with gw wide zlib-stream compression + if (this.inflate) { + const l = decompressable.length; const flush = - decompressable.length >= 4 && - decompressable.at(-4) === 0x00 && - decompressable.at(-3) === 0x00 && - decompressable.at(-2) === 0xff && - decompressable.at(-1) === 0xff; + l >= 4 && + decompressable[l - 4] === 0x00 && + decompressable[l - 3] === 0x00 && + decompressable[l - 2] === 0xff && + decompressable[l - 1] === 0xff; - if (this.nativeInflate) { - this.nativeInflate.write(decompressable, 'binary'); + const zlib = (await getZlibSync())!; + this.inflate.push(Buffer.from(decompressable), flush ? zlib.Z_SYNC_FLUSH : zlib.Z_NO_FLUSH); - if (!flush) { - return null; - } - - const [result] = await once(this.nativeInflate, 'data'); - return this.parseInflateResult(result); - } else if (this.zLibSyncInflate) { - const zLibSync = (await getZlibSync())!; - this.zLibSyncInflate.push(Buffer.from(decompressable), flush ? zLibSync.Z_SYNC_FLUSH : zLibSync.Z_NO_FLUSH); - - if (this.zLibSyncInflate.err) { - this.emit(WebSocketShardEvents.Error, { - error: new Error( - `${this.zLibSyncInflate.err}${this.zLibSyncInflate.msg ? `: ${this.zLibSyncInflate.msg}` : ''}`, - ), - }); - } + if (this.inflate.err) { + this.emit(WebSocketShardEvents.Error, { + error: new Error(`${this.inflate.err}${this.inflate.msg ? `: ${this.inflate.msg}` : ''}`), + }); + } - if (!flush) { - return null; - } + if (!flush) { + return null; + } - const { result } = this.zLibSyncInflate; - return this.parseInflateResult(result); + const { result } = this.inflate; + if (!result) { + return null; } + + return JSON.parse(typeof result === 'string' ? result : this.textDecoder.decode(result)) as GatewayReceivePayload; } this.debug([ 'Received a message we were unable to decompress', `isBinary: ${isBinary.toString()}`, - `identifyCompressionEnabled: ${this.identifyCompressionEnabled.toString()}`, - `inflate: ${this.transportCompressionEnabled ? CompressionMethod[this.strategy.options.compression!] : 'none'}`, + `useIdentifyCompress: ${this.useIdentifyCompress.toString()}`, + `inflate: ${Boolean(this.inflate).toString()}`, ]); return null; @@ -919,7 +838,7 @@ export class WebSocketShard extends AsyncEventEmitter { messages.length > 1 ? `\n${messages .slice(1) - .map((message) => ` ${message}`) + .map((m) => ` ${m}`) .join('\n')}` : '' }`;