✨ event emit
This commit is contained in:
parent
0a08938d18
commit
644430921a
BIN
util/package-lock.json
generated
BIN
util/package-lock.json
generated
Binary file not shown.
@ -1,10 +1,11 @@
|
||||
{
|
||||
"name": "@fosscord/server-util",
|
||||
"name": "@fosscord/util",
|
||||
"version": "1.3.52",
|
||||
"description": "Utility functions for the all server repositories",
|
||||
"description": "Utility functions and database models for fosscord",
|
||||
"main": "dist/index.js",
|
||||
"types": "dist/index.d.ts",
|
||||
"scripts": {
|
||||
"link": "npm run build && npm link",
|
||||
"build": "tsc -b ."
|
||||
},
|
||||
"repository": {
|
||||
@ -36,7 +37,7 @@
|
||||
"jsonwebtoken": "^8.5.1",
|
||||
"missing-native-js-functions": "^1.2.2",
|
||||
"mongodb": "^3.6.9",
|
||||
"mongoose": "^5.12.3",
|
||||
"mongoose": "^5.13.7",
|
||||
"mongoose-autopopulate": "^0.12.3",
|
||||
"typescript": "^4.1.3"
|
||||
},
|
||||
|
||||
@ -12,6 +12,9 @@ type UpdateAggregationStage =
|
||||
type EnforceDocument<T, TMethods> = T extends Document ? T : T & Document & TMethods;
|
||||
|
||||
declare module "mongoose" {
|
||||
interface SchemaOptions {
|
||||
removeResponse?: string[];
|
||||
}
|
||||
interface Model<T, TQueryHelpers = {}, TMethods = {}> {
|
||||
// removed null -> always return document -> throw error if it doesn't exist
|
||||
findOne(
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import { Schema, model, Types, Document } from "mongoose";
|
||||
import "missing-native-js-functions";
|
||||
import db, { MongooseCache } from "./Database";
|
||||
import db from "./Database";
|
||||
import { Snowflake } from "./Snowflake";
|
||||
import crypto from "crypto";
|
||||
|
||||
@ -15,6 +15,7 @@ export default {
|
||||
return config as DefaultOptions;
|
||||
},
|
||||
set: function set(val: any) {
|
||||
config = val.merge(config);
|
||||
return db.collection("config").updateOne({}, { $set: val }, { upsert: true });
|
||||
},
|
||||
};
|
||||
|
||||
@ -14,6 +14,9 @@ const connection = mongoose.createConnection(uri, {
|
||||
useFindAndModify: false,
|
||||
});
|
||||
console.log(`[Database] connect: mongodb://${url.username}@${url.host}${url.pathname}${url.search}`);
|
||||
connection.once("open", () => {
|
||||
console.log("[Database] connected");
|
||||
});
|
||||
|
||||
export default <Connection>connection;
|
||||
|
||||
|
||||
94
util/src/util/Event.ts
Normal file
94
util/src/util/Event.ts
Normal file
@ -0,0 +1,94 @@
|
||||
import { Channel, ConsumeMessage } from "amqplib";
|
||||
import { EVENT, Event, EventModel } from "../models";
|
||||
import { RabbitMQ } from "./RabbitMQ";
|
||||
import EventEmitter from "events";
|
||||
const events = new EventEmitter();
|
||||
|
||||
export async function emitEvent(payload: Omit<Event, "created_at">) {
|
||||
const id = (payload.channel_id || payload.user_id || payload.guild_id) as string;
|
||||
if (!id) console.error("event doesn't contain any id", payload);
|
||||
|
||||
if (RabbitMQ.connection) {
|
||||
const data = typeof payload.data === "object" ? JSON.stringify(payload.data) : payload.data; // use rabbitmq for event transmission
|
||||
await RabbitMQ.channel?.assertExchange(id, "fanout", { durable: false });
|
||||
|
||||
// assertQueue isn't needed, because a queue will automatically created if it doesn't exist
|
||||
const successful = RabbitMQ.channel?.publish(id, "", Buffer.from(`${data}`), { type: payload.event });
|
||||
if (!successful) throw new Error("failed to send event");
|
||||
} else {
|
||||
events.emit(id, payload);
|
||||
}
|
||||
}
|
||||
|
||||
export async function initEvent() {
|
||||
await RabbitMQ.init(); // does nothing if rabbitmq is not setup
|
||||
if (RabbitMQ.connection) {
|
||||
} else {
|
||||
// use event emitter
|
||||
}
|
||||
}
|
||||
|
||||
export interface EventOpts extends Event {
|
||||
acknowledge?: Function;
|
||||
channel?: Channel;
|
||||
cancel: Function;
|
||||
}
|
||||
|
||||
export interface ListenEventOpts {
|
||||
channel?: Channel;
|
||||
acknowledge?: boolean;
|
||||
}
|
||||
|
||||
export async function listenEvent(event: string, callback: (event: EventOpts) => any, opts?: ListenEventOpts) {
|
||||
if (RabbitMQ.connection) {
|
||||
// @ts-ignore
|
||||
return rabbitListen(opts?.channel || RabbitMQ.channel, event, callback, { acknowledge: opts?.acknowledge });
|
||||
} else {
|
||||
const cancel = () => events.removeListener(event, callback);
|
||||
events.addListener(event, (opts) => callback({ ...opts, cancel }));
|
||||
|
||||
return cancel;
|
||||
}
|
||||
}
|
||||
|
||||
async function rabbitListen(
|
||||
channel: Channel,
|
||||
id: string,
|
||||
callback: (event: EventOpts) => any,
|
||||
opts?: { acknowledge?: boolean }
|
||||
) {
|
||||
await channel.assertExchange(id, "fanout", { durable: false });
|
||||
const q = await channel.assertQueue("", { exclusive: true, autoDelete: true });
|
||||
|
||||
const cancel = () => {
|
||||
channel.cancel(q.queue);
|
||||
channel.unbindQueue(q.queue, id, "");
|
||||
};
|
||||
|
||||
channel.bindQueue(q.queue, id, "");
|
||||
channel.consume(
|
||||
q.queue,
|
||||
(opts) => {
|
||||
if (!opts) return;
|
||||
|
||||
const data = JSON.parse(opts.content.toString());
|
||||
const event = opts.properties.type as EVENT;
|
||||
|
||||
callback({
|
||||
event,
|
||||
data,
|
||||
acknowledge() {
|
||||
channel.ack(opts);
|
||||
},
|
||||
channel,
|
||||
cancel,
|
||||
});
|
||||
// rabbitCh.ack(opts);
|
||||
},
|
||||
{
|
||||
noAck: !opts?.acknowledge,
|
||||
}
|
||||
);
|
||||
|
||||
return cancel;
|
||||
}
|
||||
@ -7,3 +7,4 @@ export * from "./Snowflake";
|
||||
export * from "./UserFlags";
|
||||
export * from "./toBigInt";
|
||||
export * from "./RabbitMQ";
|
||||
export * from "./Event";
|
||||
|
||||
Reference in New Issue
Block a user