ZSTD on gateway

This commit is contained in:
Rory& 2025-10-17 11:58:53 +02:00
parent eef3fee049
commit 37594a5bec
8 changed files with 42 additions and 27 deletions

View File

@ -5,7 +5,7 @@
<scripts> <scripts>
<script value="start:gateway" /> <script value="start:gateway" />
</scripts> </scripts>
<node-interpreter value="/nix/store/dcdc33kdjdhjnzg6rkmd0cx4kpwl8cac-nodejs-20.17.0/bin/node" /> <node-interpreter value="/run/current-system/sw/bin/node" />
<package-manager value="npm" /> <package-manager value="npm" />
<envs /> <envs />
<EXTENSION ID="com.fapiko.jetbrains.plugins.better_direnv.runconfigs.NodeRunConfiguration"> <EXTENSION ID="com.fapiko.jetbrains.plugins.better_direnv.runconfigs.NodeRunConfiguration">

3
.idea/workspace.xml generated
View File

@ -111,6 +111,7 @@
<value> <value>
<map> <map>
<entry key="Start API" value="STOPPED" /> <entry key="Start API" value="STOPPED" />
<entry key="Start Gateway" value="STOPPED" />
<entry key="build" value="STOPPED" /> <entry key="build" value="STOPPED" />
</map> </map>
</value> </value>
@ -118,7 +119,7 @@
</map> </map>
</option> </option>
</component> </component>
<component name="RunManager" selected="npm.Start API"> <component name="RunManager" selected="npm.Start Gateway">
<list> <list>
<item itemvalue="Compound.Start separated" /> <item itemvalue="Compound.Start separated" />
<item itemvalue="npm.Start bundle" /> <item itemvalue="npm.Start bundle" />

BIN
package-lock.json generated

Binary file not shown.

View File

@ -72,6 +72,7 @@
}, },
"dependencies": { "dependencies": {
"@aws-sdk/client-s3": "^3.911.0", "@aws-sdk/client-s3": "^3.911.0",
"@toondepauw/node-zstd": "^1.2.0",
"ajv": "^8.17.1", "ajv": "^8.17.1",
"ajv-formats": "^3.0.1", "ajv-formats": "^3.0.1",
"amqplib": "^0.10.9", "amqplib": "^0.10.9",
@ -102,10 +103,12 @@
"murmurhash-js": "^1.0.0", "murmurhash-js": "^1.0.0",
"node-2fa": "^2.0.3", "node-2fa": "^2.0.3",
"node-fetch-commonjs": "^3.3.2", "node-fetch-commonjs": "^3.3.2",
"pg": "^8.16.3",
"picocolors": "^1.1.1", "picocolors": "^1.1.1",
"probe-image-size": "^7.2.3", "probe-image-size": "^7.2.3",
"proxy-agent": "^6.5.0", "proxy-agent": "^6.5.0",
"reflect-metadata": "^0.2.2", "reflect-metadata": "^0.2.2",
"sqlite3": "^5.1.7",
"tslib": "^2.8.1", "tslib": "^2.8.1",
"typeorm": "^0.3.27", "typeorm": "^0.3.27",
"typescript-json-schema": "^0.65.1", "typescript-json-schema": "^0.65.1",
@ -127,8 +130,6 @@
"jimp": "^1.6.0", "jimp": "^1.6.0",
"mailgun.js": "^12.1.1", "mailgun.js": "^12.1.1",
"node-mailjet": "^6.0.9", "node-mailjet": "^6.0.9",
"nodemailer": "^7.0.9", "nodemailer": "^7.0.9"
"pg": "^8.16.3",
"sqlite3": "^5.1.7"
} }
} }

View File

@ -28,7 +28,8 @@ import { Message } from "./Message";
import { Deflate, Inflate } from "fast-zlib"; import { Deflate, Inflate } from "fast-zlib";
import { URL } from "url"; import { URL } from "url";
import { Config, ErlpackType } from "@spacebar/util"; import { Config, ErlpackType } from "@spacebar/util";
// import zlib from "node:zlib"; import zlib from "node:zlib";
import { Decoder, Encoder } from "@toondepauw/node-zstd";
let erlpack: ErlpackType | null = null; let erlpack: ErlpackType | null = null;
try { try {
@ -108,7 +109,7 @@ export async function Connection(
return socket.close(CLOSECODES.Decode_error); return socket.close(CLOSECODES.Decode_error);
if (socket.encoding === "etf" && !erlpack) if (socket.encoding === "etf" && !erlpack)
throw new Error("Erlpack is not installed: 'npm i erlpack'"); throw new Error("Erlpack is not installed: 'npm i @yukikaze-bot/erlpack'");
socket.version = Number(searchParams.get("version")) || 8; socket.version = Number(searchParams.get("version")) || 8;
if (socket.version != 8) if (socket.version != 8)
@ -121,11 +122,8 @@ export async function Connection(
socket.deflate = new Deflate(); socket.deflate = new Deflate();
socket.inflate = new Inflate(); socket.inflate = new Inflate();
} else if (socket.compress === "zstd-stream") { } else if (socket.compress === "zstd-stream") {
// TODO socket.zstdEncoder = new Encoder(6);
return socket.close( socket.zstdDecoder = new Decoder();
CLOSECODES.Decode_error
);
} else { } else {
return socket.close(CLOSECODES.Decode_error); return socket.close(CLOSECODES.Decode_error);
} }

View File

@ -45,9 +45,15 @@ export async function Message(this: WebSocket, buffer: WS.Data) {
) { ) {
data = bigIntJson.parse(buffer.toString()); data = bigIntJson.parse(buffer.toString());
} else if (this.encoding === "json" && buffer instanceof Buffer) { } else if (this.encoding === "json" && buffer instanceof Buffer) {
if (this.inflate) { if (this.compress === "zlib-stream") {
try { try {
buffer = this.inflate.process(buffer); buffer = this.inflate!.process(buffer);
} catch {
buffer = buffer.toString();
}
} else if (this.compress === "zstd-stream") {
try {
buffer = await this.zstdDecoder!.decode(buffer);
} catch { } catch {
buffer = buffer.toString(); buffer = buffer.toString();
} }

View File

@ -43,14 +43,13 @@ const recurseJsonReplace = (json: any) => {
return json; return json;
}; };
export function Send(socket: WebSocket, data: Payload) { export async function Send(socket: WebSocket, data: Payload) {
if (process.env.WS_VERBOSE) if (process.env.WS_VERBOSE)
console.log(`[Websocket] Outgoing message: ${JSON.stringify(data)}`); console.log(`[Websocket] Outgoing message: ${JSON.stringify(data)}`);
if (process.env.WS_DUMP) { if (process.env.WS_DUMP) {
const id = socket.session_id || "unknown"; const id = socket.session_id || "unknown";
(async () => {
await fs.mkdir(path.join("dump", id), { await fs.mkdir(path.join("dump", id), {
recursive: true, recursive: true,
}); });
@ -58,7 +57,6 @@ export function Send(socket: WebSocket, data: Payload) {
path.join("dump", id, `${Date.now()}.out.json`), path.join("dump", id, `${Date.now()}.out.json`),
JSON.stringify(data, null, 2), JSON.stringify(data, null, 2),
); );
})();
} }
let buffer: Buffer | string; let buffer: Buffer | string;
@ -71,9 +69,15 @@ export function Send(socket: WebSocket, data: Payload) {
else if (socket.encoding === "json") else if (socket.encoding === "json")
buffer = JSON.stringify(data, JSONReplacer); buffer = JSON.stringify(data, JSONReplacer);
else return; else return;
// TODO: compression // TODO: compression
if (socket.deflate) { if (socket.compress === "zlib-stream") {
buffer = socket.deflate.process(buffer) as Buffer; buffer = socket.deflate!.process(buffer) as Buffer;
} else if (socket.compress === "zstd-stream") {
if (typeof(buffer) === "string")
buffer = Buffer.from(buffer as string);
buffer = await socket.zstdEncoder!.encode(buffer as Buffer) as Buffer;
} }
return new Promise((res, rej) => { return new Promise((res, rej) => {

View File

@ -20,19 +20,24 @@ import { Intents, ListenEventOpts, Permissions } from "@spacebar/util";
import WS from "ws"; import WS from "ws";
import { Deflate, Inflate } from "fast-zlib"; import { Deflate, Inflate } from "fast-zlib";
import { Capabilities } from "./Capabilities"; import { Capabilities } from "./Capabilities";
import { ZstdCompress } from "zlib";
import { ZstdDecompress } from "node:zlib";
import { Decoder, Encoder } from "@toondepauw/node-zstd";
export interface WebSocket extends WS { export interface WebSocket extends WS {
version: number; version: number;
user_id: string; user_id: string;
session_id: string; session_id: string;
encoding: "etf" | "json"; encoding: "etf" | "json";
compress?: "zlib-stream"; compress?: "zlib-stream" | "zstd-stream";
ipAddress?: string; ipAddress?: string;
userAgent?: string; // for cdn request signing userAgent?: string; // for cdn request signing
shard_count?: bigint; shard_count?: bigint;
shard_id?: bigint; shard_id?: bigint;
deflate?: Deflate; deflate?: Deflate;
inflate?: Inflate; inflate?: Inflate;
zstdEncoder?: Encoder;
zstdDecoder?: Decoder;
heartbeatTimeout: NodeJS.Timeout; heartbeatTimeout: NodeJS.Timeout;
readyTimeout: NodeJS.Timeout; readyTimeout: NodeJS.Timeout;
intents: Intents; intents: Intents;