✨ rabbitmq
This commit is contained in:
parent
cfab1d34d1
commit
421cf4c3db
2
.vscode/launch.json
vendored
2
.vscode/launch.json
vendored
@ -9,7 +9,7 @@
|
||||
"type": "node",
|
||||
"request": "launch",
|
||||
"name": "Launch Server",
|
||||
"program": "${workspaceFolder}/dist/index.js",
|
||||
"program": "${workspaceFolder}/dist/start.js",
|
||||
"preLaunchTask": "tsc: build - tsconfig.json",
|
||||
"outFiles": ["${workspaceFolder}/dist/**/*.js"]
|
||||
}
|
||||
|
||||
BIN
package-lock.json
generated
BIN
package-lock.json
generated
Binary file not shown.
@ -13,8 +13,9 @@
|
||||
"author": "Fosscord",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"@fosscord/server-util": "^1.3.43",
|
||||
"@fosscord/server-util": "^1.3.51",
|
||||
"ajv": "^8.5.0",
|
||||
"amqplib": "^0.8.0",
|
||||
"dotenv": "^8.2.0",
|
||||
"jsonwebtoken": "^8.5.1",
|
||||
"lambert-server": "^1.2.8",
|
||||
@ -26,6 +27,7 @@
|
||||
"ws": "^7.4.2"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/amqplib": "^0.8.1",
|
||||
"@types/jsonwebtoken": "^8.5.0",
|
||||
"@types/mongoose-autopopulate": "^0.10.1",
|
||||
"@types/uuid": "^8.3.0",
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import "missing-native-js-functions";
|
||||
import dotenv from "dotenv";
|
||||
dotenv.config();
|
||||
import { Config, db } from "@fosscord/server-util";
|
||||
import { Config, db, RabbitMQ } from "@fosscord/server-util";
|
||||
import { Server as WebSocketServer } from "ws";
|
||||
import { Connection } from "./events/Connection";
|
||||
import http from "http";
|
||||
@ -40,6 +40,7 @@ export class Server {
|
||||
await (db as Promise<Connection>);
|
||||
await this.setupSchema();
|
||||
await Config.init();
|
||||
await RabbitMQ.init();
|
||||
console.log("[Database] connected");
|
||||
if (!this.server.listening) {
|
||||
this.server.listen(this.port);
|
||||
|
||||
@ -40,6 +40,7 @@ export async function Connection(this: Server, socket: WebSocket, request: Incom
|
||||
socket.deflate.on("data", (chunk) => socket.send(chunk));
|
||||
}
|
||||
|
||||
socket.permissions = {};
|
||||
socket.sequence = 0;
|
||||
|
||||
setHeartbeat(socket);
|
||||
|
||||
@ -25,8 +25,6 @@ export async function Message(this: WebSocket, buffer: Data) {
|
||||
|
||||
check.call(this, PayloadSchema, data);
|
||||
|
||||
console.log(data);
|
||||
|
||||
// @ts-ignore
|
||||
const OPCodeHandler = OPCodeHandlers[data.op];
|
||||
if (!OPCodeHandler) {
|
||||
@ -36,6 +34,8 @@ export async function Message(this: WebSocket, buffer: Data) {
|
||||
return;
|
||||
}
|
||||
|
||||
console.log("got: " + OPCodeHandler.name);
|
||||
|
||||
try {
|
||||
return await OPCodeHandler.call(this, data);
|
||||
} catch (error) {
|
||||
|
||||
@ -1,8 +1,19 @@
|
||||
import { db, Event, MongooseCache, UserModel, getPermission, Permissions, ChannelModel } from "@fosscord/server-util";
|
||||
import {
|
||||
db,
|
||||
Event,
|
||||
MongooseCache,
|
||||
UserModel,
|
||||
getPermission,
|
||||
Permissions,
|
||||
ChannelModel,
|
||||
RabbitMQ,
|
||||
EVENT,
|
||||
} from "@fosscord/server-util";
|
||||
import { OPCODES } from "../util/Constants";
|
||||
import { Send } from "../util/Send";
|
||||
import WebSocket from "../util/WebSocket";
|
||||
import "missing-native-js-functions";
|
||||
import { ConsumeMessage } from "amqplib";
|
||||
|
||||
// TODO: close connection on Invalidated Token
|
||||
// TODO: check intent
|
||||
@ -35,40 +46,176 @@ function getPipeline(this: WebSocket, guilds: string[], channels: string[] = [])
|
||||
];
|
||||
}
|
||||
|
||||
// TODO: use already required guilds/channels of Identify and don't fetch them again
|
||||
export async function setupListener(this: WebSocket) {
|
||||
const channels = await ChannelModel.find({ recipient_ids: this.user_id }, { id: true }).exec();
|
||||
console.log(
|
||||
"subscribe to channels",
|
||||
channels.map((x) => x.id)
|
||||
);
|
||||
const user = await UserModel.findOne({ id: this.user_id }).lean().exec();
|
||||
var guilds = user!.guilds;
|
||||
const user = await UserModel.findOne({ id: this.user_id }, { guilds: true }).exec();
|
||||
const channels = await ChannelModel.find(
|
||||
{ $or: [{ recipient_ids: this.user_id }, { guild_id: { $in: user.guilds } }] },
|
||||
{ id: true, permission_overwrites: true }
|
||||
).exec();
|
||||
const dm_channels = channels.filter((x) => !x.guild_id);
|
||||
const guild_channels = channels.filter((x) => x.guild_id);
|
||||
|
||||
const eventStream = new MongooseCache(
|
||||
db.collection("events"),
|
||||
getPipeline.call(
|
||||
this,
|
||||
guilds,
|
||||
channels.map((x) => x.id)
|
||||
),
|
||||
{
|
||||
onlyEvents: true,
|
||||
if (RabbitMQ.connection) {
|
||||
this.rabbitCh = await RabbitMQ.connection.createChannel();
|
||||
this.rabbitCh!.assertQueue(this.user_id).then(() => this.rabbitCh!.consume(this.user_id, consume.bind(this)));
|
||||
|
||||
for (const channel of dm_channels) {
|
||||
this.rabbitCh!.assertQueue(channel.id).then(() => this.rabbitCh!.consume(channel.id, consume.bind(this)));
|
||||
}
|
||||
);
|
||||
for (const guild of user.guilds) {
|
||||
// contains guild and dm channels
|
||||
|
||||
await eventStream.init();
|
||||
eventStream.on("insert", (document: Event) => dispatch.call(this, document, { eventStream, guilds }));
|
||||
getPermission(this.user_id, guild)
|
||||
.then((x) => {
|
||||
this.permissions[guild] = x;
|
||||
this.rabbitCh!.assertQueue(guild).then(() => this.rabbitCh!.consume(guild, consume.bind(this)));
|
||||
for (const channel of guild_channels) {
|
||||
if (x.overwriteChannel(channel.permission_overwrites).has("VIEW_CHANNEL")) {
|
||||
this.rabbitCh!.assertQueue(channel.id).then(() =>
|
||||
this.rabbitCh!.consume(channel.id, consume.bind(this))
|
||||
);
|
||||
}
|
||||
}
|
||||
})
|
||||
.catch((e) => {});
|
||||
}
|
||||
|
||||
this.once("close", () => eventStream.destroy());
|
||||
this.once("close", () => {
|
||||
this.rabbitCh!.close();
|
||||
});
|
||||
} else {
|
||||
const eventStream = new MongooseCache(
|
||||
db.collection("events"),
|
||||
getPipeline.call(
|
||||
this,
|
||||
user.guilds,
|
||||
channels.map((x) => x.id)
|
||||
),
|
||||
{
|
||||
onlyEvents: true,
|
||||
}
|
||||
);
|
||||
|
||||
await eventStream.init();
|
||||
eventStream.on("insert", (document: Event) =>
|
||||
dispatch.call(this, document, { eventStream, guilds: user.guilds })
|
||||
);
|
||||
|
||||
this.once("close", () => eventStream.destroy());
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: use rabbitmq to only receive events that are included in intents
|
||||
function consume(this: WebSocket, opts: ConsumeMessage | null) {
|
||||
if (!opts) return;
|
||||
if (!this.rabbitCh) return;
|
||||
const data = JSON.parse(opts.content.toString());
|
||||
const id = data.id as string;
|
||||
const event = opts.properties.type as EVENT;
|
||||
const permission = this.permissions[id] || new Permissions("ADMINISTRATOR"); // default permission for dm
|
||||
|
||||
console.log("rabbitmq event", event);
|
||||
|
||||
// subscription managment
|
||||
switch (event) {
|
||||
case "CHANNEL_DELETE":
|
||||
case "GUILD_DELETE":
|
||||
this.rabbitCh.cancel(id);
|
||||
break;
|
||||
case "CHANNEL_CREATE":
|
||||
// TODO: check if user has permission to channel
|
||||
case "GUILD_CREATE":
|
||||
this.rabbitCh!.assertQueue(id).then(() => this.rabbitCh!.consume(id, consume.bind(this)));
|
||||
break;
|
||||
case "CHANNEL_UPDATE":
|
||||
// @ts-ignore
|
||||
const exists = this.rabbitCh.consumers[id];
|
||||
if (permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) {
|
||||
if (exists) break;
|
||||
this.rabbitCh!.assertQueue(id).then(() => this.rabbitCh!.consume(id, consume.bind(this)));
|
||||
} else {
|
||||
if (!exists) break;
|
||||
this.rabbitCh.cancel(id);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// permission checking
|
||||
switch (event) {
|
||||
case "INVITE_CREATE":
|
||||
case "INVITE_DELETE":
|
||||
case "GUILD_INTEGRATIONS_UPDATE":
|
||||
if (!permission.has("MANAGE_GUILD")) return;
|
||||
break;
|
||||
case "WEBHOOKS_UPDATE":
|
||||
if (!permission.has("MANAGE_WEBHOOKS")) return;
|
||||
break;
|
||||
case "GUILD_MEMBER_ADD":
|
||||
case "GUILD_MEMBER_REMOVE":
|
||||
case "GUILD_MEMBER_UPDATE":
|
||||
// only send them, if the user subscribed for this part of the member list, or is a bot
|
||||
case "PRESENCE_UPDATE": // exception if user is friend
|
||||
break;
|
||||
case "GUILD_BAN_ADD":
|
||||
case "GUILD_BAN_REMOVE":
|
||||
if (!permission.has("BAN_MEMBERS")) break;
|
||||
break;
|
||||
case "VOICE_STATE_UPDATE":
|
||||
case "MESSAGE_CREATE":
|
||||
case "MESSAGE_DELETE":
|
||||
case "MESSAGE_DELETE_BULK":
|
||||
case "MESSAGE_UPDATE":
|
||||
case "CHANNEL_PINS_UPDATE":
|
||||
case "MESSAGE_REACTION_ADD":
|
||||
case "MESSAGE_REACTION_REMOVE":
|
||||
case "MESSAGE_REACTION_REMOVE_ALL":
|
||||
case "MESSAGE_REACTION_REMOVE_EMOJI":
|
||||
case "TYPING_START":
|
||||
// only gets send if the user is alowed to view the current channel
|
||||
if (!permission.has("VIEW_CHANNEL")) return;
|
||||
break;
|
||||
case "GUILD_CREATE":
|
||||
case "GUILD_DELETE":
|
||||
case "GUILD_UPDATE":
|
||||
case "GUILD_ROLE_CREATE":
|
||||
case "GUILD_ROLE_UPDATE":
|
||||
case "GUILD_ROLE_DELETE":
|
||||
case "CHANNEL_CREATE":
|
||||
case "CHANNEL_DELETE":
|
||||
case "CHANNEL_UPDATE":
|
||||
case "GUILD_EMOJI_UPDATE":
|
||||
case "READY": // will be sent by the gateway
|
||||
case "USER_UPDATE":
|
||||
case "APPLICATION_COMMAND_CREATE":
|
||||
case "APPLICATION_COMMAND_DELETE":
|
||||
case "APPLICATION_COMMAND_UPDATE":
|
||||
default:
|
||||
// always gets sent
|
||||
// Any events not defined in an intent are considered "passthrough" and will always be sent
|
||||
break;
|
||||
}
|
||||
|
||||
Send(this, {
|
||||
op: OPCODES.Dispatch,
|
||||
t: event,
|
||||
d: data,
|
||||
s: this.sequence++,
|
||||
});
|
||||
this.rabbitCh.ack(opts);
|
||||
}
|
||||
|
||||
// TODO: cache permission
|
||||
// we shouldn't fetch the permission for every event, as a message send event with many channel members would result in many thousand db queries.
|
||||
// instead we should calculate all (guild, channel) permissions once and dynamically update if it changes.
|
||||
// TODO: only subscribe for events that are in the connection intents
|
||||
// TODO: only subscribe for channel/guilds that the user has access to (and re-subscribe if it changes)
|
||||
|
||||
export async function dispatch(this: WebSocket, document: Event, { eventStream, guilds }: DispatchOpts) {
|
||||
var permission = new Permissions("ADMINISTRATOR"); // default permission for dms
|
||||
console.log("event", document);
|
||||
var channel_id = document.channel_id || document.data?.channel_id;
|
||||
|
||||
// TODO: clean up
|
||||
if (document.event === "GUILD_CREATE") {
|
||||
guilds.push(document.data.id);
|
||||
eventStream.changeStream(getPipeline.call(this, guilds));
|
||||
|
||||
@ -23,6 +23,7 @@ export async function onLazyRequest(this: WebSocket, { d }: Payload) {
|
||||
const { guild_id, typing, channels, activities } = d as LazyRequest;
|
||||
|
||||
const permissions = await getPermission(this.user_id, guild_id);
|
||||
permissions.hasThrow("VIEW_CHANNEL");
|
||||
|
||||
// MongoDB query to retrieve all hoisted roles and join them with the members and users collection
|
||||
const roles = toObject(
|
||||
@ -70,16 +71,16 @@ export async function onLazyRequest(this: WebSocket, { d }: Payload) {
|
||||
const items = [];
|
||||
|
||||
for (const role of roles) {
|
||||
items.push({
|
||||
group: {
|
||||
count: role.members.length,
|
||||
id: role.id === guild_id ? "online" : role.name
|
||||
}
|
||||
});
|
||||
for (const member of role.members) {
|
||||
member.roles.remove(guild_id);
|
||||
items.push({ member });
|
||||
}
|
||||
items.push({
|
||||
group: {
|
||||
count: role.members.length,
|
||||
id: role.id === guild_id ? "online" : role.name,
|
||||
},
|
||||
});
|
||||
for (const member of role.members) {
|
||||
member.roles.remove(guild_id);
|
||||
items.push({ member });
|
||||
}
|
||||
}
|
||||
|
||||
return Send(this, {
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import { Intents } from "@fosscord/server-util";
|
||||
import { Intents, Permissions } from "@fosscord/server-util";
|
||||
import WS, { Server, Data } from "ws";
|
||||
import { Deflate } from "zlib";
|
||||
import { Channel } from "amqplib";
|
||||
|
||||
interface WebSocket extends WS {
|
||||
version: number;
|
||||
@ -14,6 +15,8 @@ interface WebSocket extends WS {
|
||||
readyTimeout: NodeJS.Timeout;
|
||||
intents: Intents;
|
||||
sequence: number;
|
||||
rabbitCh?: Channel;
|
||||
permissions: Record<string, Permissions>;
|
||||
}
|
||||
|
||||
export default WebSocket;
|
||||
|
||||
Reference in New Issue
Block a user