From 421cf4c3db9a2810285b123dfd82f116f81d0fa7 Mon Sep 17 00:00:00 2001 From: Flam3rboy <34555296+Flam3rboy@users.noreply.github.com> Date: Thu, 12 Aug 2021 16:47:52 +0200 Subject: [PATCH] :sparkles: rabbitmq --- .vscode/launch.json | 2 +- package-lock.json | Bin 136577 -> 144289 bytes package.json | 4 +- src/Server.ts | 3 +- src/events/Connection.ts | 1 + src/events/Message.ts | 4 +- src/listener/listener.ts | 191 ++++++++++++++++++++++++++++++++----- src/opcodes/LazyRequest.ts | 21 ++-- src/util/WebSocket.ts | 5 +- 9 files changed, 193 insertions(+), 38 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 07fd32ac..29bdde13 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -9,7 +9,7 @@ "type": "node", "request": "launch", "name": "Launch Server", - "program": "${workspaceFolder}/dist/index.js", + "program": "${workspaceFolder}/dist/start.js", "preLaunchTask": "tsc: build - tsconfig.json", "outFiles": ["${workspaceFolder}/dist/**/*.js"] } diff --git a/package-lock.json b/package-lock.json index dac9cbcd5c0a833c0c6afb895a2d1112054e1faa..f3b31763d0c85cc857a77e8c1b1a811b0f0ed653 100644 GIT binary patch delta 3961 zcmd6pS!^4}8OJfQWF3wy+p#W7vMAavT&>AHxV+-p4R2BuFL75q0wmt$v0N^f$1ay5 zi8_d0n)V@Sf+@OhfeoZZf&f8m6(w2}XwabWOVKt!`cMgId^}*r;co{QCK$0+E%B`WDV_dV%xK%10GGj& z_Zf}2{t`3}GVry5H-H)5L{Zo{a#0&GI^BBP$~iWQHrh)`EW_4Yp&E-?#R5-JRRObR zgCT`TGpl4PM@5}$M6_BKvZj?<%Ab*HfmSG=^MtuJ<*h{487 zZOgifUL@;J+ftU5NX75QouP6wiq#ZHInzkj$PEWtiepwHTuj;2$!DL#aO;Bx5{;<*%jRLm=QN(`b%S@ z+E35!6|N^mlhJA<`ZG#CpOXw?MamiYa$WAN)mC=b!hh*zkKx9q?&=j>^Ja(9?@TW~ zZ#EKL(e64{q>`b^NOEp(of*D*wTm^3f6pn4{hq|@E;Z4*00&d7h zDKFJX2T0PhQ3$07Q`~7L6Rd;F3NZGMvubJ^pws9Cd}A` zxA6Icosmz_UAsxW3p8m9AA$Kj17w0{=Tb_*-SBt`*0)h<u1<(|vhpx3)hvVCt*C28+Fm!V8Y9t1Oi&LD3sgI7hyj*pO{@ zG#||%rKpaP}c0+tpDv$0}S;LxjgT z1>Wb1CCV}vm#yi-iY?zRZN!jB+Y^$pP}S`SNqC`IW=mBwLpC!0t^l_(%Xi)!uPHgH zCD$ds%<*~k-uJ*HMBXzk9x}JI;0_z$rwbR>y@486Bs_G3VytUsOu;0d+@kXJb}=P3 zP{veX60Rkx-dK(xNY+xc*HMR4tSNywR&&!tHDwV>7Kaj%R?QK?mGp;P z-f#{tIx~ucUm*scQBPb0r+V>@>=UJnV$_oqEcX4Lm?gC~1b9>7Ue+|j_UVfkne%Vf{h+(WRe{_|(x zEd15>0{n3MbaWrh+8gOGocn!3o7wH*&t-F&N;W6;D7B02-7H71A0(?zd<16I^bR;a z-223YcLT)X%bzUv4c%lP#&GrN|A48^Yb3CZ^s)DtDXuaIm{))CeK5TInJ>kMLmUtt zO^p{o=l9); await this.setupSchema(); await Config.init(); + await RabbitMQ.init(); console.log("[Database] connected"); if (!this.server.listening) { this.server.listen(this.port); diff --git a/src/events/Connection.ts b/src/events/Connection.ts index 473c992c..1ef9fb48 100644 --- a/src/events/Connection.ts +++ b/src/events/Connection.ts @@ -40,6 +40,7 @@ export async function Connection(this: Server, socket: WebSocket, request: Incom socket.deflate.on("data", (chunk) => socket.send(chunk)); } + socket.permissions = {}; socket.sequence = 0; setHeartbeat(socket); diff --git a/src/events/Message.ts b/src/events/Message.ts index 51c5a294..2ca82b3c 100644 --- a/src/events/Message.ts +++ b/src/events/Message.ts @@ -25,8 +25,6 @@ export async function Message(this: WebSocket, buffer: Data) { check.call(this, PayloadSchema, data); - console.log(data); - // @ts-ignore const OPCodeHandler = OPCodeHandlers[data.op]; if (!OPCodeHandler) { @@ -36,6 +34,8 @@ export async function Message(this: WebSocket, buffer: Data) { return; } + console.log("got: " + OPCodeHandler.name); + try { return await OPCodeHandler.call(this, data); } catch (error) { diff --git a/src/listener/listener.ts b/src/listener/listener.ts index ae15c971..692c12b6 100644 --- a/src/listener/listener.ts +++ b/src/listener/listener.ts @@ -1,8 +1,19 @@ -import { db, Event, MongooseCache, UserModel, getPermission, Permissions, ChannelModel } from "@fosscord/server-util"; +import { + db, + Event, + MongooseCache, + UserModel, + getPermission, + Permissions, + ChannelModel, + RabbitMQ, + EVENT, +} from "@fosscord/server-util"; import { OPCODES } from "../util/Constants"; import { Send } from "../util/Send"; import WebSocket from "../util/WebSocket"; import "missing-native-js-functions"; +import { ConsumeMessage } from "amqplib"; // TODO: close connection on Invalidated Token // TODO: check intent @@ -35,40 +46,176 @@ function getPipeline(this: WebSocket, guilds: string[], channels: string[] = []) ]; } +// TODO: use already required guilds/channels of Identify and don't fetch them again export async function setupListener(this: WebSocket) { - const channels = await ChannelModel.find({ recipient_ids: this.user_id }, { id: true }).exec(); - console.log( - "subscribe to channels", - channels.map((x) => x.id) - ); - const user = await UserModel.findOne({ id: this.user_id }).lean().exec(); - var guilds = user!.guilds; + const user = await UserModel.findOne({ id: this.user_id }, { guilds: true }).exec(); + const channels = await ChannelModel.find( + { $or: [{ recipient_ids: this.user_id }, { guild_id: { $in: user.guilds } }] }, + { id: true, permission_overwrites: true } + ).exec(); + const dm_channels = channels.filter((x) => !x.guild_id); + const guild_channels = channels.filter((x) => x.guild_id); - const eventStream = new MongooseCache( - db.collection("events"), - getPipeline.call( - this, - guilds, - channels.map((x) => x.id) - ), - { - onlyEvents: true, + if (RabbitMQ.connection) { + this.rabbitCh = await RabbitMQ.connection.createChannel(); + this.rabbitCh!.assertQueue(this.user_id).then(() => this.rabbitCh!.consume(this.user_id, consume.bind(this))); + + for (const channel of dm_channels) { + this.rabbitCh!.assertQueue(channel.id).then(() => this.rabbitCh!.consume(channel.id, consume.bind(this))); } - ); + for (const guild of user.guilds) { + // contains guild and dm channels - await eventStream.init(); - eventStream.on("insert", (document: Event) => dispatch.call(this, document, { eventStream, guilds })); + getPermission(this.user_id, guild) + .then((x) => { + this.permissions[guild] = x; + this.rabbitCh!.assertQueue(guild).then(() => this.rabbitCh!.consume(guild, consume.bind(this))); + for (const channel of guild_channels) { + if (x.overwriteChannel(channel.permission_overwrites).has("VIEW_CHANNEL")) { + this.rabbitCh!.assertQueue(channel.id).then(() => + this.rabbitCh!.consume(channel.id, consume.bind(this)) + ); + } + } + }) + .catch((e) => {}); + } - this.once("close", () => eventStream.destroy()); + this.once("close", () => { + this.rabbitCh!.close(); + }); + } else { + const eventStream = new MongooseCache( + db.collection("events"), + getPipeline.call( + this, + user.guilds, + channels.map((x) => x.id) + ), + { + onlyEvents: true, + } + ); + + await eventStream.init(); + eventStream.on("insert", (document: Event) => + dispatch.call(this, document, { eventStream, guilds: user.guilds }) + ); + + this.once("close", () => eventStream.destroy()); + } +} + +// TODO: use rabbitmq to only receive events that are included in intents +function consume(this: WebSocket, opts: ConsumeMessage | null) { + if (!opts) return; + if (!this.rabbitCh) return; + const data = JSON.parse(opts.content.toString()); + const id = data.id as string; + const event = opts.properties.type as EVENT; + const permission = this.permissions[id] || new Permissions("ADMINISTRATOR"); // default permission for dm + + console.log("rabbitmq event", event); + + // subscription managment + switch (event) { + case "CHANNEL_DELETE": + case "GUILD_DELETE": + this.rabbitCh.cancel(id); + break; + case "CHANNEL_CREATE": + // TODO: check if user has permission to channel + case "GUILD_CREATE": + this.rabbitCh!.assertQueue(id).then(() => this.rabbitCh!.consume(id, consume.bind(this))); + break; + case "CHANNEL_UPDATE": + // @ts-ignore + const exists = this.rabbitCh.consumers[id]; + if (permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) { + if (exists) break; + this.rabbitCh!.assertQueue(id).then(() => this.rabbitCh!.consume(id, consume.bind(this))); + } else { + if (!exists) break; + this.rabbitCh.cancel(id); + } + break; + } + + // permission checking + switch (event) { + case "INVITE_CREATE": + case "INVITE_DELETE": + case "GUILD_INTEGRATIONS_UPDATE": + if (!permission.has("MANAGE_GUILD")) return; + break; + case "WEBHOOKS_UPDATE": + if (!permission.has("MANAGE_WEBHOOKS")) return; + break; + case "GUILD_MEMBER_ADD": + case "GUILD_MEMBER_REMOVE": + case "GUILD_MEMBER_UPDATE": + // only send them, if the user subscribed for this part of the member list, or is a bot + case "PRESENCE_UPDATE": // exception if user is friend + break; + case "GUILD_BAN_ADD": + case "GUILD_BAN_REMOVE": + if (!permission.has("BAN_MEMBERS")) break; + break; + case "VOICE_STATE_UPDATE": + case "MESSAGE_CREATE": + case "MESSAGE_DELETE": + case "MESSAGE_DELETE_BULK": + case "MESSAGE_UPDATE": + case "CHANNEL_PINS_UPDATE": + case "MESSAGE_REACTION_ADD": + case "MESSAGE_REACTION_REMOVE": + case "MESSAGE_REACTION_REMOVE_ALL": + case "MESSAGE_REACTION_REMOVE_EMOJI": + case "TYPING_START": + // only gets send if the user is alowed to view the current channel + if (!permission.has("VIEW_CHANNEL")) return; + break; + case "GUILD_CREATE": + case "GUILD_DELETE": + case "GUILD_UPDATE": + case "GUILD_ROLE_CREATE": + case "GUILD_ROLE_UPDATE": + case "GUILD_ROLE_DELETE": + case "CHANNEL_CREATE": + case "CHANNEL_DELETE": + case "CHANNEL_UPDATE": + case "GUILD_EMOJI_UPDATE": + case "READY": // will be sent by the gateway + case "USER_UPDATE": + case "APPLICATION_COMMAND_CREATE": + case "APPLICATION_COMMAND_DELETE": + case "APPLICATION_COMMAND_UPDATE": + default: + // always gets sent + // Any events not defined in an intent are considered "passthrough" and will always be sent + break; + } + + Send(this, { + op: OPCODES.Dispatch, + t: event, + d: data, + s: this.sequence++, + }); + this.rabbitCh.ack(opts); } // TODO: cache permission +// we shouldn't fetch the permission for every event, as a message send event with many channel members would result in many thousand db queries. +// instead we should calculate all (guild, channel) permissions once and dynamically update if it changes. +// TODO: only subscribe for events that are in the connection intents +// TODO: only subscribe for channel/guilds that the user has access to (and re-subscribe if it changes) export async function dispatch(this: WebSocket, document: Event, { eventStream, guilds }: DispatchOpts) { var permission = new Permissions("ADMINISTRATOR"); // default permission for dms console.log("event", document); var channel_id = document.channel_id || document.data?.channel_id; - + // TODO: clean up if (document.event === "GUILD_CREATE") { guilds.push(document.data.id); eventStream.changeStream(getPipeline.call(this, guilds)); diff --git a/src/opcodes/LazyRequest.ts b/src/opcodes/LazyRequest.ts index 8a7bb8c4..b1d553b9 100644 --- a/src/opcodes/LazyRequest.ts +++ b/src/opcodes/LazyRequest.ts @@ -23,6 +23,7 @@ export async function onLazyRequest(this: WebSocket, { d }: Payload) { const { guild_id, typing, channels, activities } = d as LazyRequest; const permissions = await getPermission(this.user_id, guild_id); + permissions.hasThrow("VIEW_CHANNEL"); // MongoDB query to retrieve all hoisted roles and join them with the members and users collection const roles = toObject( @@ -70,16 +71,16 @@ export async function onLazyRequest(this: WebSocket, { d }: Payload) { const items = []; for (const role of roles) { - items.push({ - group: { - count: role.members.length, - id: role.id === guild_id ? "online" : role.name - } - }); - for (const member of role.members) { - member.roles.remove(guild_id); - items.push({ member }); - } + items.push({ + group: { + count: role.members.length, + id: role.id === guild_id ? "online" : role.name, + }, + }); + for (const member of role.members) { + member.roles.remove(guild_id); + items.push({ member }); + } } return Send(this, { diff --git a/src/util/WebSocket.ts b/src/util/WebSocket.ts index 347d78cb..11db47e0 100644 --- a/src/util/WebSocket.ts +++ b/src/util/WebSocket.ts @@ -1,6 +1,7 @@ -import { Intents } from "@fosscord/server-util"; +import { Intents, Permissions } from "@fosscord/server-util"; import WS, { Server, Data } from "ws"; import { Deflate } from "zlib"; +import { Channel } from "amqplib"; interface WebSocket extends WS { version: number; @@ -14,6 +15,8 @@ interface WebSocket extends WS { readyTimeout: NodeJS.Timeout; intents: Intents; sequence: number; + rabbitCh?: Channel; + permissions: Record; } export default WebSocket;