diff --git a/assets/schemas.json b/assets/schemas.json index e95b0ba7..a30c3f46 100755 Binary files a/assets/schemas.json and b/assets/schemas.json differ diff --git a/package-lock.json b/package-lock.json index a4dccca9..b2d2f39b 100644 Binary files a/package-lock.json and b/package-lock.json differ diff --git a/package.json b/package.json index 69779141..c792f768 100644 --- a/package.json +++ b/package.json @@ -67,6 +67,7 @@ "husky": "^9.1.7", "prettier": "^3.5.3", "pretty-quick": "^4.1.1", + "spacebar-webrtc-types": "github:spacebarchat/spacebar-webrtc-types", "typescript": "^5.8.3" }, "dependencies": { @@ -118,7 +119,8 @@ "@spacebar/api": "dist/api", "@spacebar/cdn": "dist/cdn", "@spacebar/gateway": "dist/gateway", - "@spacebar/util": "dist/util" + "@spacebar/util": "dist/util", + "@spacebar/webrtc": "dist/webrtc" }, "optionalDependencies": { "@yukikaze-bot/erlpack": "^1.0.1", @@ -130,4 +132,4 @@ "pg": "^8.14.1", "sqlite3": "^5.1.7" } -} +} \ No newline at end of file diff --git a/src/api/routes/guilds/#guild_id/voice-states/#user_id/index.ts b/src/api/routes/guilds/#guild_id/voice-states/#user_id/index.ts index 60c69075..00820b06 100644 --- a/src/api/routes/guilds/#guild_id/voice-states/#user_id/index.ts +++ b/src/api/routes/guilds/#guild_id/voice-states/#user_id/index.ts @@ -93,7 +93,7 @@ router.patch( voice_state.save(), emitEvent({ event: "VOICE_STATE_UPDATE", - data: voice_state, + data: voice_state.toPublicVoiceState(), guild_id, } as VoiceStateUpdateEvent), ]); diff --git a/src/bundle/Server.ts b/src/bundle/Server.ts index d281120d..c3cfa04a 100644 --- a/src/bundle/Server.ts +++ b/src/bundle/Server.ts @@ -22,6 +22,7 @@ process.on("uncaughtException", console.error); import http from "http"; import * as Api from "@spacebar/api"; import * as Gateway from "@spacebar/gateway"; +import * as Webrtc from "@spacebar/webrtc"; import { CDNServer } from "@spacebar/cdn"; import express from "express"; import { green, bold } from "picocolors"; @@ -30,18 +31,25 @@ import { Config, initDatabase, Sentry } from "@spacebar/util"; const app = express(); const server = http.createServer(); const port = Number(process.env.PORT) || 3001; +const wrtcWsPort = Number(process.env.WRTC_WS_PORT) || 3004; const production = process.env.NODE_ENV == "development" ? false : true; server.on("request", app); const api = new Api.SpacebarServer({ server, port, production, app }); const cdn = new CDNServer({ server, port, production, app }); const gateway = new Gateway.Server({ server, port, production }); +const webrtc = new Webrtc.Server({ + server: undefined, + port: wrtcWsPort, + production, +}); process.on("SIGTERM", async () => { console.log("Shutting down due to SIGTERM"); await gateway.stop(); await cdn.stop(); await api.stop(); + await webrtc.stop(); server.close(); Sentry.close(); }); @@ -54,7 +62,12 @@ async function main() { await new Promise((resolve) => server.listen({ port }, () => resolve(undefined)), ); - await Promise.all([api.start(), cdn.start(), gateway.start()]); + await Promise.all([ + api.start(), + cdn.start(), + gateway.start(), + webrtc.start(), + ]); Sentry.errorHandler(app); diff --git a/src/gateway/Server.ts b/src/gateway/Server.ts index 9fba2d4c..94b4fbe9 100644 --- a/src/gateway/Server.ts +++ b/src/gateway/Server.ts @@ -29,6 +29,7 @@ import { import ws from "ws"; import { Connection } from "./events/Connection"; import http from "http"; +import { cleanupOnStartup } from "./util/Utils"; export class Server { public ws: ws.Server; @@ -74,6 +75,8 @@ export class Server { await Config.init(); await initEvent(); await Sentry.init(); + // temporary fix + await cleanupOnStartup(); if (!this.server.listening) { this.server.listen(this.port); diff --git a/src/gateway/events/Close.ts b/src/gateway/events/Close.ts index 311ed32a..dbbc41d8 100644 --- a/src/gateway/events/Close.ts +++ b/src/gateway/events/Close.ts @@ -24,6 +24,8 @@ import { Session, SessionsReplace, User, + VoiceState, + VoiceStateUpdateEvent, } from "@spacebar/util"; export async function Close(this: WebSocket, code: number, reason: Buffer) { @@ -36,6 +38,39 @@ export async function Close(this: WebSocket, code: number, reason: Buffer) { if (this.session_id) { await Session.delete({ session_id: this.session_id }); + + const voiceState = await VoiceState.findOne({ + where: { user_id: this.user_id }, + }); + + // clear the voice state for this session if user was in voice channel + if ( + voiceState && + voiceState.session_id === this.session_id && + voiceState.channel_id + ) { + const prevGuildId = voiceState.guild_id; + const prevChannelId = voiceState.channel_id; + + // @ts-expect-error channel_id is nullable + voiceState.channel_id = null; + // @ts-expect-error guild_id is nullable + voiceState.guild_id = null; + voiceState.self_stream = false; + voiceState.self_video = false; + await voiceState.save(); + + // let the users in previous guild/channel know that user disconnected + await emitEvent({ + event: "VOICE_STATE_UPDATE", + data: { + ...voiceState.toPublicVoiceState(), + guild_id: prevGuildId, // have to send the previous guild_id because that's what client expects for disconnect messages + }, + guild_id: prevGuildId, + channel_id: prevChannelId, + } as VoiceStateUpdateEvent); + } } if (this.user_id) { diff --git a/src/gateway/opcodes/Identify.ts b/src/gateway/opcodes/Identify.ts index 4f1c7e2d..fbf579ff 100644 --- a/src/gateway/opcodes/Identify.ts +++ b/src/gateway/opcodes/Identify.ts @@ -183,6 +183,7 @@ export async function onIdentify(this: WebSocket, data: Payload) { "guild.emojis", "guild.roles", "guild.stickers", + "guild.voice_states", "roles", // For these entities, `user` is always just the logged in user we fetched above @@ -485,6 +486,18 @@ export async function onIdentify(this: WebSocket, data: Payload) { }), ); + const readySupplementalGuilds = ( + guilds.filter((guild) => !guild.unavailable) as Guild[] + ).map((guild) => { + return { + voice_states: guild.voice_states.map((state) => + state.toPublicVoiceState(), + ), + id: guild.id, + embedded_activities: [], + }; + }); + // TODO: ready supplemental await Send(this, { op: OPCodes.DISPATCH, @@ -498,7 +511,7 @@ export async function onIdentify(this: WebSocket, data: Payload) { // these merged members seem to be all users currently in vc in your guilds merged_members: [], lazy_private_channels: [], - guilds: [], // { voice_states: [], id: string, embedded_activities: [] } + guilds: readySupplementalGuilds, // { voice_states: [], id: string, embedded_activities: [] } // embedded_activities are users currently in an activity? disclose: [], // Config.get().general.uniqueUsernames ? ["pomelo"] : [] }, diff --git a/src/gateway/opcodes/StreamCreate.ts b/src/gateway/opcodes/StreamCreate.ts new file mode 100644 index 00000000..80325a2f --- /dev/null +++ b/src/gateway/opcodes/StreamCreate.ts @@ -0,0 +1,131 @@ +import { + genVoiceToken, + Payload, + WebSocket, + generateStreamKey, +} from "@spacebar/gateway"; +import { + Channel, + Config, + emitEvent, + Member, + Region, + Snowflake, + Stream, + StreamCreateEvent, + StreamCreateSchema, + StreamServerUpdateEvent, + StreamSession, + VoiceState, + VoiceStateUpdateEvent, +} from "@spacebar/util"; +import { check } from "./instanceOf"; + +export async function onStreamCreate(this: WebSocket, data: Payload) { + check.call(this, StreamCreateSchema, data.d); + const body = data.d as StreamCreateSchema; + + if (body.channel_id.trim().length === 0) return; + + // first check if we are in a voice channel already. cannot create a stream if there's no existing voice connection + const voiceState = await VoiceState.findOne({ + where: { user_id: this.user_id }, + }); + + if (!voiceState || !voiceState.channel_id) return; + + if (body.guild_id) { + voiceState.member = await Member.findOneOrFail({ + where: { id: voiceState.user_id, guild_id: voiceState.guild_id }, + relations: ["user", "roles"], + }); + } + + // TODO: permissions check - if it's a guild, check if user is allowed to create stream in this guild + + const channel = await Channel.findOne({ + where: { id: body.channel_id }, + }); + + if ( + !channel || + (body.type === "guild" && channel.guild_id != body.guild_id) + ) + return this.close(4000, "invalid channel"); + + // TODO: actually apply preferred_region from the event payload + const regions = Config.get().regions; + const guildRegion = regions.available.filter( + (r) => r.id === regions.default, + )[0]; + + // first make sure theres no other streams for this user that somehow didnt get cleared + await Stream.delete({ + owner_id: this.user_id, + }); + + // create a new entry in db containing the token for authenticating user in stream gateway IDENTIFY + const stream = Stream.create({ + id: Snowflake.generate(), + owner_id: this.user_id, + channel_id: body.channel_id, + endpoint: guildRegion.endpoint, + }); + + await stream.save(); + + const token = genVoiceToken(); + + const streamSession = StreamSession.create({ + stream_id: stream.id, + user_id: this.user_id, + session_id: this.session_id, + token, + }); + + await streamSession.save(); + + const streamKey = generateStreamKey( + body.type, + body.guild_id, + body.channel_id, + this.user_id, + ); + + await emitEvent({ + event: "STREAM_CREATE", + data: { + stream_key: streamKey, + rtc_server_id: stream.id, // for voice connections in guilds it is guild_id, for dm voice calls it seems to be DM channel id, for GoLive streams a generated number + viewer_ids: [], + region: guildRegion.name, + paused: false, + }, + user_id: this.user_id, + } as StreamCreateEvent); + + await emitEvent({ + event: "STREAM_SERVER_UPDATE", + data: { + token: streamSession.token, + stream_key: streamKey, + guild_id: null, // not sure why its always null + endpoint: stream.endpoint, + }, + user_id: this.user_id, + } as StreamServerUpdateEvent); + + voiceState.self_stream = true; + await voiceState.save(); + + await emitEvent({ + event: "VOICE_STATE_UPDATE", + data: voiceState.toPublicVoiceState(), + guild_id: voiceState.guild_id, + channel_id: voiceState.channel_id, + } as VoiceStateUpdateEvent); +} + +//stream key: +// guild:${guild_id}:${channel_id}:${user_id} +// call:${channel_id}:${user_id} diff --git a/src/gateway/opcodes/StreamDelete.ts b/src/gateway/opcodes/StreamDelete.ts new file mode 100644 index 00000000..76c87029 --- /dev/null +++ b/src/gateway/opcodes/StreamDelete.ts @@ -0,0 +1,76 @@ +import { parseStreamKey, Payload, WebSocket } from "@spacebar/gateway"; +import { + emitEvent, + Stream, + StreamDeleteEvent, + StreamDeleteSchema, + VoiceState, + VoiceStateUpdateEvent, +} from "@spacebar/util"; +import { check } from "./instanceOf"; + +export async function onStreamDelete(this: WebSocket, data: Payload) { + check.call(this, StreamDeleteSchema, data.d); + const body = data.d as StreamDeleteSchema; + + let parsedKey: { + type: "guild" | "call"; + channelId: string; + guildId?: string; + userId: string; + }; + + try { + parsedKey = parseStreamKey(body.stream_key); + } catch (e) { + return this.close(4000, "Invalid stream key"); + } + + const { userId, channelId, guildId, type } = parsedKey; + + // when a user selects to stop watching another user stream, this event gets triggered + // just disconnect user without actually deleting stream + if (this.user_id !== userId) { + await emitEvent({ + event: "STREAM_DELETE", + data: { + stream_key: body.stream_key, + }, + user_id: this.user_id, + } as StreamDeleteEvent); + return; + } + + const stream = await Stream.findOne({ + where: { channel_id: channelId, owner_id: userId }, + }); + + if (!stream) return; + + await stream.remove(); + + const voiceState = await VoiceState.findOne({ + where: { user_id: this.user_id }, + }); + + if (voiceState) { + voiceState.self_stream = false; + await voiceState.save(); + + await emitEvent({ + event: "VOICE_STATE_UPDATE", + data: voiceState.toPublicVoiceState(), + guild_id: guildId, + channel_id: channelId, + } as VoiceStateUpdateEvent); + } + + await emitEvent({ + event: "STREAM_DELETE", + data: { + stream_key: body.stream_key, + }, + guild_id: guildId, + channel_id: channelId, + } as StreamDeleteEvent); +} diff --git a/src/gateway/opcodes/StreamWatch.ts b/src/gateway/opcodes/StreamWatch.ts new file mode 100644 index 00000000..163dbeaf --- /dev/null +++ b/src/gateway/opcodes/StreamWatch.ts @@ -0,0 +1,98 @@ +import { + genVoiceToken, + parseStreamKey, + Payload, + WebSocket, +} from "@spacebar/gateway"; +import { + Config, + emitEvent, + Stream, + StreamCreateEvent, + StreamServerUpdateEvent, + StreamSession, + StreamWatchSchema, +} from "@spacebar/util"; +import { check } from "./instanceOf"; +import { Not } from "typeorm"; + +export async function onStreamWatch(this: WebSocket, data: Payload) { + check.call(this, StreamWatchSchema, data.d); + const body = data.d as StreamWatchSchema; + + // TODO: apply perms: check if user is allowed to watch + + let parsedKey: { + type: "guild" | "call"; + channelId: string; + guildId?: string; + userId: string; + }; + + try { + parsedKey = parseStreamKey(body.stream_key); + } catch (e) { + return this.close(4000, "Invalid stream key"); + } + + const { type, channelId, guildId, userId } = parsedKey; + + const stream = await Stream.findOne({ + where: { channel_id: channelId, owner_id: userId }, + relations: ["channel"], + }); + + if (!stream) return this.close(4000, "Invalid stream key"); + + if (type === "guild" && stream.channel.guild_id != guildId) + return this.close(4000, "Invalid stream key"); + + const regions = Config.get().regions; + const guildRegion = regions.available.find( + (r) => r.endpoint === stream.endpoint, + ); + + if (!guildRegion) return this.close(4000, "Unknown region"); + + const streamSession = StreamSession.create({ + stream_id: stream.id, + user_id: this.user_id, + session_id: this.session_id, + token: genVoiceToken(), + }); + + await streamSession.save(); + + // get the viewers: stream session tokens for this stream that have been used but not including stream owner + const viewers = await StreamSession.find({ + where: { + stream_id: stream.id, + used: true, + user_id: Not(stream.owner_id), + }, + }); + + await emitEvent({ + event: "STREAM_CREATE", + data: { + stream_key: body.stream_key, + rtc_server_id: stream.id, // for voice connections in guilds it is guild_id, for dm voice calls it seems to be DM channel id, for GoLive streams a generated number + viewer_ids: viewers.map((v) => v.user_id), + region: guildRegion.name, + paused: false, + }, + channel_id: channelId, + user_id: this.user_id, + } as StreamCreateEvent); + + await emitEvent({ + event: "STREAM_SERVER_UPDATE", + data: { + token: streamSession.token, + stream_key: body.stream_key, + guild_id: null, // not sure why its always null + endpoint: stream.endpoint, + }, + user_id: this.user_id, + } as StreamServerUpdateEvent); +} diff --git a/src/gateway/opcodes/VoiceStateUpdate.ts b/src/gateway/opcodes/VoiceStateUpdate.ts index b45c8203..61dad0cd 100644 --- a/src/gateway/opcodes/VoiceStateUpdate.ts +++ b/src/gateway/opcodes/VoiceStateUpdate.ts @@ -17,19 +17,19 @@ */ import { Payload, WebSocket } from "@spacebar/gateway"; -import { genVoiceToken } from "../util/SessionUtils"; -import { check } from "./instanceOf"; import { Config, emitEvent, Guild, Member, + Region, VoiceServerUpdateEvent, VoiceState, VoiceStateUpdateEvent, VoiceStateUpdateSchema, - Region, } from "@spacebar/util"; +import { genVoiceToken } from "../util/SessionUtils"; +import { check } from "./instanceOf"; // TODO: check if a voice server is setup // Notice: Bot users respect the voice channel's user limit, if set. @@ -39,6 +39,10 @@ import { export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { check.call(this, VoiceStateUpdateSchema, data.d); const body = data.d as VoiceStateUpdateSchema; + const isNew = body.channel_id === null && body.guild_id === null; + let isChanged = false; + + let prevState; let voiceState: VoiceState; try { @@ -54,20 +58,24 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { return; } + if (voiceState.channel_id !== body.channel_id) isChanged = true; + //If a user change voice channel between guild we should send a left event first if ( + voiceState.guild_id && voiceState.guild_id !== body.guild_id && voiceState.session_id === this.session_id ) { await emitEvent({ event: "VOICE_STATE_UPDATE", - data: { ...voiceState, channel_id: null }, + data: { ...voiceState.toPublicVoiceState(), channel_id: null }, guild_id: voiceState.guild_id, }); } //The event send by Discord's client on channel leave has both guild_id and channel_id as null - if (body.guild_id === null) body.guild_id = voiceState.guild_id; + //if (body.guild_id === null) body.guild_id = voiceState.guild_id; + prevState = { ...voiceState }; voiceState.assign(body); } catch (error) { voiceState = VoiceState.create({ @@ -79,39 +87,58 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { }); } - // 'Fix' for this one voice state error. TODO: Find out why this is sent - // It seems to be sent on client load, - // so maybe its trying to find which server you were connected to before disconnecting, if any? - if (body.guild_id == null) { - return; + // if user left voice channel, send an update to previous channel/guild to let other people know that the user left + if ( + voiceState.session_id === this.session_id && + body.guild_id == null && + body.channel_id == null && + (prevState?.guild_id || prevState?.channel_id) + ) { + await emitEvent({ + event: "VOICE_STATE_UPDATE", + data: { + ...voiceState.toPublicVoiceState(), + channel_id: null, + guild_id: null, + }, + guild_id: prevState?.guild_id, + channel_id: prevState?.channel_id, + }); } //TODO the member should only have these properties: hoisted_role, deaf, joined_at, mute, roles, user //TODO the member.user should only have these properties: avatar, discriminator, id, username //TODO this may fail - voiceState.member = await Member.findOneOrFail({ - where: { id: voiceState.user_id, guild_id: voiceState.guild_id }, - relations: ["user", "roles"], - }); + if (body.guild_id) { + voiceState.member = await Member.findOneOrFail({ + where: { id: voiceState.user_id, guild_id: voiceState.guild_id }, + relations: ["user", "roles"], + }); + } //If the session changed we generate a new token if (voiceState.session_id !== this.session_id) voiceState.token = genVoiceToken(); voiceState.session_id = this.session_id; - const { id, ...newObj } = voiceState; + const { member } = voiceState; await Promise.all([ voiceState.save(), emitEvent({ event: "VOICE_STATE_UPDATE", - data: newObj, + data: { + ...voiceState.toPublicVoiceState(), + member: member?.toPublicMember(), + }, guild_id: voiceState.guild_id, + channel_id: voiceState.channel_id, + user_id: voiceState.user_id, } as VoiceStateUpdateEvent), ]); //If it's null it means that we are leaving the channel and this event is not needed - if (voiceState.channel_id !== null) { + if ((isNew || isChanged) && voiceState.channel_id !== null) { const guild = await Guild.findOne({ where: { id: voiceState.guild_id }, }); @@ -133,8 +160,11 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { token: voiceState.token, guild_id: voiceState.guild_id, endpoint: guildRegion.endpoint, + channel_id: voiceState.guild_id + ? undefined + : voiceState.channel_id, // only DM voice calls have this set, and DM channel is one where guild_id is null }, - guild_id: voiceState.guild_id, + user_id: voiceState.user_id, } as VoiceServerUpdateEvent); } } diff --git a/src/gateway/opcodes/index.ts b/src/gateway/opcodes/index.ts index e925134d..cba9e545 100644 --- a/src/gateway/opcodes/index.ts +++ b/src/gateway/opcodes/index.ts @@ -25,6 +25,9 @@ import { onRequestGuildMembers } from "./RequestGuildMembers"; import { onResume } from "./Resume"; import { onVoiceStateUpdate } from "./VoiceStateUpdate"; import { onGuildSubscriptionsBulk } from "./GuildSubscriptionsBulk"; +import { onStreamCreate } from "./StreamCreate"; +import { onStreamDelete } from "./StreamDelete"; +import { onStreamWatch } from "./StreamWatch"; export type OPCodeHandler = (this: WebSocket, data: Payload) => unknown; @@ -41,5 +44,8 @@ export default { // 10: Hello // 13: Dm_update 14: onLazyRequest, + 18: onStreamCreate, + 19: onStreamDelete, + 20: onStreamWatch, 37: onGuildSubscriptionsBulk, } as { [key: number]: OPCodeHandler }; diff --git a/src/gateway/util/Constants.ts b/src/gateway/util/Constants.ts index 5c0f134a..26c90dbe 100644 --- a/src/gateway/util/Constants.ts +++ b/src/gateway/util/Constants.ts @@ -16,8 +16,6 @@ along with this program. If not, see . */ -// import { VoiceOPCodes } from "@spacebar/webrtc"; - export enum OPCODES { Dispatch = 0, Heartbeat = 1, @@ -63,7 +61,7 @@ export enum CLOSECODES { } export interface Payload { - op: OPCODES /* | VoiceOPCodes */; + op: OPCODES; // eslint-disable-next-line @typescript-eslint/no-explicit-any d?: any; s?: number; diff --git a/src/gateway/util/Utils.ts b/src/gateway/util/Utils.ts new file mode 100644 index 00000000..dac1c805 --- /dev/null +++ b/src/gateway/util/Utils.ts @@ -0,0 +1,63 @@ +import { VoiceState } from "@spacebar/util"; + +export function parseStreamKey(streamKey: string): { + type: "guild" | "call"; + channelId: string; + guildId?: string; + userId: string; +} { + const streamKeyArray = streamKey.split(":"); + + const type = streamKeyArray.shift(); + + if (type !== "guild" && type !== "call") { + throw new Error(`Invalid stream key type: ${type}`); + } + + if ( + (type === "guild" && streamKeyArray.length < 3) || + (type === "call" && streamKeyArray.length < 2) + ) + throw new Error(`Invalid stream key: ${streamKey}`); // invalid stream key + + let guildId: string | undefined; + if (type === "guild") { + guildId = streamKeyArray.shift(); + } + const channelId = streamKeyArray.shift(); + const userId = streamKeyArray.shift(); + + if (!channelId || !userId) { + throw new Error(`Invalid stream key: ${streamKey}`); + } + return { type, channelId, guildId, userId }; +} + +export function generateStreamKey( + type: "guild" | "call", + guildId: string | undefined, + channelId: string, + userId: string, +): string { + const streamKey = `${type}${type === "guild" ? `:${guildId}` : ""}:${channelId}:${userId}`; + + return streamKey; +} + +// Temporary cleanup function until shutdown cleanup function is fixed. +// Currently when server is shut down the voice states are not cleared +// TODO: remove this when Server.stop() is fixed so that it waits for all websocket connections to run their +// respective Close event listener function for session cleanup +export async function cleanupOnStartup(): Promise { + await VoiceState.update( + {}, + { + // @ts-expect-error channel_id is nullable + channel_id: null, + // @ts-expect-error guild_id is nullable + guild_id: null, + self_stream: false, + self_video: false, + }, + ); +} diff --git a/src/gateway/util/WebSocket.ts b/src/gateway/util/WebSocket.ts index 8cfc5e08..5c110840 100644 --- a/src/gateway/util/WebSocket.ts +++ b/src/gateway/util/WebSocket.ts @@ -20,7 +20,6 @@ import { Intents, ListenEventOpts, Permissions } from "@spacebar/util"; import WS from "ws"; import { Deflate, Inflate } from "fast-zlib"; import { Capabilities } from "./Capabilities"; -// import { Client } from "@spacebar/webrtc"; export interface WebSocket extends WS { version: number; @@ -42,6 +41,5 @@ export interface WebSocket extends WS { member_events: Record unknown>; listen_options: ListenEventOpts; capabilities?: Capabilities; - // client?: Client; large_threshold: number; } diff --git a/src/gateway/util/index.ts b/src/gateway/util/index.ts index 6ef694d9..5a8c906b 100644 --- a/src/gateway/util/index.ts +++ b/src/gateway/util/index.ts @@ -22,3 +22,4 @@ export * from "./SessionUtils"; export * from "./Heartbeat"; export * from "./WebSocket"; export * from "./Capabilities"; +export * from "./Utils"; diff --git a/src/util/entities/Stream.ts b/src/util/entities/Stream.ts new file mode 100644 index 00000000..2787e3ce --- /dev/null +++ b/src/util/entities/Stream.ts @@ -0,0 +1,42 @@ +import { + Column, + Entity, + JoinColumn, + ManyToOne, + OneToMany, + RelationId, +} from "typeorm"; +import { BaseClass } from "./BaseClass"; +import { dbEngine } from "../util/Database"; +import { User } from "./User"; +import { Channel } from "./Channel"; +import { StreamSession } from "./StreamSession"; + +@Entity({ + name: "streams", + engine: dbEngine, +}) +export class Stream extends BaseClass { + @Column() + @RelationId((stream: Stream) => stream.owner) + owner_id: string; + + @JoinColumn({ name: "owner_id" }) + @ManyToOne(() => User, { + onDelete: "CASCADE", + }) + owner: User; + + @Column() + @RelationId((stream: Stream) => stream.channel) + channel_id: string; + + @JoinColumn({ name: "channel_id" }) + @ManyToOne(() => Channel, { + onDelete: "CASCADE", + }) + channel: Channel; + + @Column() + endpoint: string; +} diff --git a/src/util/entities/StreamSession.ts b/src/util/entities/StreamSession.ts new file mode 100644 index 00000000..6d7ccf9d --- /dev/null +++ b/src/util/entities/StreamSession.ts @@ -0,0 +1,48 @@ +import { + Column, + Entity, + JoinColumn, + ManyToOne, + OneToMany, + RelationId, +} from "typeorm"; +import { BaseClass } from "./BaseClass"; +import { dbEngine } from "../util/Database"; +import { User } from "./User"; +import { Stream } from "./Stream"; + +@Entity({ + name: "stream_sessions", + engine: dbEngine, +}) +export class StreamSession extends BaseClass { + @Column() + @RelationId((session: StreamSession) => session.stream) + stream_id: string; + + @JoinColumn({ name: "stream_id" }) + @ManyToOne(() => Stream, { + onDelete: "CASCADE", + }) + stream: Stream; + + @Column() + @RelationId((session: StreamSession) => session.user) + user_id: string; + + @JoinColumn({ name: "user_id" }) + @ManyToOne(() => User, { + onDelete: "CASCADE", + }) + user: User; + + @Column({ nullable: true }) + token: string; + + // this is for gateway session + @Column() + session_id: string; + + @Column({ default: false }) + used: boolean; +} diff --git a/src/util/entities/VoiceState.ts b/src/util/entities/VoiceState.ts index 83a0af63..549d667f 100644 --- a/src/util/entities/VoiceState.ts +++ b/src/util/entities/VoiceState.ts @@ -24,6 +24,29 @@ import { Member } from "./Member"; import { User } from "./User"; import { dbEngine } from "../util/Database"; +export enum PublicVoiceStateEnum { + user_id, + suppress, + session_id, + self_video, + self_mute, + self_deaf, + self_stream, + request_to_speak_timestamp, + mute, + deaf, + channel_id, + guild_id, +} + +export type PublicVoiceStateKeys = keyof typeof PublicVoiceStateEnum; + +export const PublicVoiceStateProjection = Object.values( + PublicVoiceStateEnum, +).filter((x) => typeof x === "string") as PublicVoiceStateKeys[]; + +export type PublicVoiceState = Pick; + //https://gist.github.com/vassjozsef/e482c65df6ee1facaace8b3c9ff66145#file-voice_state-ex @Entity({ name: "voice_states", @@ -96,4 +119,13 @@ export class VoiceState extends BaseClass { @Column({ nullable: true, default: null }) request_to_speak_timestamp?: Date; + + toPublicVoiceState() { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const voiceState: any = {}; + PublicVoiceStateProjection.forEach((x) => { + voiceState[x] = this[x]; + }); + return voiceState as PublicVoiceState; + } } diff --git a/src/util/entities/index.ts b/src/util/entities/index.ts index b2356aa7..6f132084 100644 --- a/src/util/entities/index.ts +++ b/src/util/entities/index.ts @@ -47,6 +47,8 @@ export * from "./SecurityKey"; export * from "./Session"; export * from "./Sticker"; export * from "./StickerPack"; +export * from "./Stream"; +export * from "./StreamSession"; export * from "./Team"; export * from "./TeamMember"; export * from "./Template"; diff --git a/src/util/index.ts b/src/util/index.ts index dba69812..9a84d1af 100644 --- a/src/util/index.ts +++ b/src/util/index.ts @@ -28,4 +28,4 @@ export * from "./schemas"; export * from "./imports"; export * from "./config"; export * from "./connections"; -export * from "./Signing" \ No newline at end of file +export * from "./Signing"; diff --git a/src/util/interfaces/Event.ts b/src/util/interfaces/Event.ts index 253a013c..5ef3b05d 100644 --- a/src/util/interfaces/Event.ts +++ b/src/util/interfaces/Event.ts @@ -21,7 +21,6 @@ import { ConnectedAccount, Interaction, ApplicationCommand, - VoiceState, Message, PartialEmoji, Invite, @@ -43,6 +42,7 @@ import { ReadyPrivateChannel, GuildOrUnavailable, GuildCreateResponse, + PublicVoiceState, } from "@spacebar/util"; export interface Event { @@ -431,7 +431,7 @@ export interface UserConnectionsUpdateEvent extends Event { export interface VoiceStateUpdateEvent extends Event { event: "VOICE_STATE_UPDATE"; - data: VoiceState & { + data: PublicVoiceState & { member: PublicMember; }; } @@ -440,8 +440,37 @@ export interface VoiceServerUpdateEvent extends Event { event: "VOICE_SERVER_UPDATE"; data: { token: string; - guild_id: string; + guild_id: string | null; endpoint: string; + channel_id?: string; + }; +} + +export interface StreamCreateEvent extends Event { + event: "STREAM_CREATE"; + data: { + stream_key: string; + rtc_server_id: string; + viewer_ids: string[]; + region: string; + paused: boolean; + }; +} + +export interface StreamServerUpdateEvent extends Event { + event: "STREAM_SERVER_UPDATE"; + data: { + token: string; + stream_key: string; + endpoint: string; + guild_id: string | null; + }; +} + +export interface StreamDeleteEvent extends Event { + event: "STREAM_DELETE"; + data: { + stream_key: string; }; } @@ -681,6 +710,9 @@ export type EVENT = | "INTERACTION_CREATE" | "VOICE_STATE_UPDATE" | "VOICE_SERVER_UPDATE" + | "STREAM_CREATE" + | "STREAM_SERVER_UPDATE" + | "STREAM_DELETE" | "APPLICATION_COMMAND_CREATE" | "APPLICATION_COMMAND_UPDATE" | "APPLICATION_COMMAND_DELETE" diff --git a/src/util/migration/postgres/1745625724865-voice.ts b/src/util/migration/postgres/1745625724865-voice.ts new file mode 100644 index 00000000..d9f7101f --- /dev/null +++ b/src/util/migration/postgres/1745625724865-voice.ts @@ -0,0 +1,43 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class Voice1745625724865 implements MigrationInterface { + name = "Voice1745625724865"; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE "streams" ("id" character varying NOT NULL, "owner_id" character varying NOT NULL, "channel_id" character varying NOT NULL, "endpoint" character varying NOT NULL, CONSTRAINT "PK_40440b6f569ebc02bc71c25c499" PRIMARY KEY ("id"))`, + ); + await queryRunner.query( + `CREATE TABLE "stream_sessions" ("id" character varying NOT NULL, "stream_id" character varying NOT NULL, "user_id" character varying NOT NULL, "token" character varying, "session_id" character varying NOT NULL, "used" boolean NOT NULL DEFAULT false, CONSTRAINT "PK_49bdc3f66394c12478f8371c546" PRIMARY KEY ("id"))`, + ); + await queryRunner.query( + `ALTER TABLE "streams" ADD CONSTRAINT "FK_1b566f9b54d1cda271da53ac82f" FOREIGN KEY ("owner_id") REFERENCES "users"("id") ON DELETE CASCADE ON UPDATE NO ACTION`, + ); + await queryRunner.query( + `ALTER TABLE "streams" ADD CONSTRAINT "FK_5101f0cded27ff0aae78fc4eed7" FOREIGN KEY ("channel_id") REFERENCES "channels"("id") ON DELETE CASCADE ON UPDATE NO ACTION`, + ); + await queryRunner.query( + `ALTER TABLE "stream_sessions" ADD CONSTRAINT "FK_8b5a028a34dae9ee54af37c9c32" FOREIGN KEY ("stream_id") REFERENCES "streams"("id") ON DELETE CASCADE ON UPDATE NO ACTION`, + ); + await queryRunner.query( + `ALTER TABLE "stream_sessions" ADD CONSTRAINT "FK_13ae5c29aff4d0890c54179511a" FOREIGN KEY ("user_id") REFERENCES "users"("id") ON DELETE CASCADE ON UPDATE NO ACTION`, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE "stream_sessions" DROP CONSTRAINT "FK_13ae5c29aff4d0890c54179511a"`, + ); + await queryRunner.query( + `ALTER TABLE "stream_sessions" DROP CONSTRAINT "FK_8b5a028a34dae9ee54af37c9c32"`, + ); + await queryRunner.query( + `ALTER TABLE "streams" DROP CONSTRAINT "FK_5101f0cded27ff0aae78fc4eed7"`, + ); + await queryRunner.query( + `ALTER TABLE "streams" DROP CONSTRAINT "FK_1b566f9b54d1cda271da53ac82f"`, + ); + await queryRunner.query(`DROP TABLE "stream_sessions"`); + await queryRunner.query(`DROP TABLE "streams"`); + } +} diff --git a/src/util/schemas/SelectProtocolSchema.ts b/src/util/schemas/SelectProtocolSchema.ts index 09283619..d04adf71 100644 --- a/src/util/schemas/SelectProtocolSchema.ts +++ b/src/util/schemas/SelectProtocolSchema.ts @@ -31,7 +31,7 @@ export interface SelectProtocolSchema { type: "audio" | "video"; priority: number; payload_type: number; - rtx_payload_type?: number | null; + rtx_payload_type?: number; }[]; rtc_connection_id?: string; // uuid } diff --git a/src/util/schemas/StreamCreateSchema.ts b/src/util/schemas/StreamCreateSchema.ts new file mode 100644 index 00000000..bb650791 --- /dev/null +++ b/src/util/schemas/StreamCreateSchema.ts @@ -0,0 +1,13 @@ +export interface StreamCreateSchema { + type: "guild" | "call"; + channel_id: string; + guild_id?: string; + preferred_region?: string; +} + +export const StreamCreateSchema = { + type: String, + channel_id: String, + $guild_id: String, + $preferred_region: String, +}; diff --git a/src/util/schemas/StreamDeleteSchema.ts b/src/util/schemas/StreamDeleteSchema.ts new file mode 100644 index 00000000..0e2aff75 --- /dev/null +++ b/src/util/schemas/StreamDeleteSchema.ts @@ -0,0 +1,7 @@ +export interface StreamDeleteSchema { + stream_key: string; +} + +export const StreamDeleteSchema = { + stream_key: String, +}; diff --git a/src/util/schemas/StreamWatchSchema.ts b/src/util/schemas/StreamWatchSchema.ts new file mode 100644 index 00000000..263bb11f --- /dev/null +++ b/src/util/schemas/StreamWatchSchema.ts @@ -0,0 +1,7 @@ +export interface StreamWatchSchema { + stream_key: string; +} + +export const StreamWatchSchema = { + stream_key: String, +}; diff --git a/src/util/schemas/VoiceIdentifySchema.ts b/src/util/schemas/VoiceIdentifySchema.ts index 618d6591..82f846c3 100644 --- a/src/util/schemas/VoiceIdentifySchema.ts +++ b/src/util/schemas/VoiceIdentifySchema.ts @@ -23,8 +23,9 @@ export interface VoiceIdentifySchema { token: string; video?: boolean; streams?: { - type: string; + type: "video" | "audio" | "screen"; rid: string; quality: number; }[]; + max_secure_frames_version?: number; } diff --git a/src/util/schemas/VoiceVideoSchema.ts b/src/util/schemas/VoiceVideoSchema.ts index 0f43adc0..c621431b 100644 --- a/src/util/schemas/VoiceVideoSchema.ts +++ b/src/util/schemas/VoiceVideoSchema.ts @@ -22,7 +22,7 @@ export interface VoiceVideoSchema { rtx_ssrc?: number; user_id?: string; streams?: { - type: "video" | "audio"; + type: "video" | "audio" | "screen"; rid: string; ssrc: number; active: boolean; diff --git a/src/util/schemas/index.ts b/src/util/schemas/index.ts index 9701faec..f19eef0d 100644 --- a/src/util/schemas/index.ts +++ b/src/util/schemas/index.ts @@ -68,6 +68,9 @@ export * from "./responses"; export * from "./RoleModifySchema"; export * from "./RolePositionUpdateSchema"; export * from "./SelectProtocolSchema"; +export * from "./StreamCreateSchema"; +export * from "./StreamDeleteSchema"; +export * from "./StreamWatchSchema"; export * from "./TeamCreateSchema"; export * from "./TemplateCreateSchema"; export * from "./TemplateModifySchema"; diff --git a/src/util/util/Constants.ts b/src/util/util/Constants.ts index 34e925e5..df4501fc 100644 --- a/src/util/util/Constants.ts +++ b/src/util/util/Constants.ts @@ -52,23 +52,6 @@ export const WsStatus = { RESUMING: 8, }; -/** - * The current status of a voice connection. Here are the available statuses: - * * CONNECTED: 0 - * * CONNECTING: 1 - * * AUTHENTICATING: 2 - * * RECONNECTING: 3 - * * DISCONNECTED: 4 - * @typedef {number} VoiceStatus - */ -export const VoiceStatus = { - CONNECTED: 0, - CONNECTING: 1, - AUTHENTICATING: 2, - RECONNECTING: 3, - DISCONNECTED: 4, -}; - export const OPCodes = { DISPATCH: 0, HEARTBEAT: 1, @@ -84,22 +67,6 @@ export const OPCodes = { HEARTBEAT_ACK: 11, }; -export const VoiceOPCodes = { - IDENTIFY: 0, - SELECT_PROTOCOL: 1, - READY: 2, - HEARTBEAT: 3, - SESSION_DESCRIPTION: 4, - SPEAKING: 5, - HEARTBEAT_ACK: 6, - RESUME: 7, - HELLO: 8, - RESUMED: 9, - CLIENT_CONNECT: 12, // incorrect, op 12 is probably used for video - CLIENT_DISCONNECT: 13, // incorrect - VERSION: 16, //not documented -}; - export const Events = { RATE_LIMIT: "rateLimit", CLIENT_READY: "ready", diff --git a/src/util/util/Event.ts b/src/util/util/Event.ts index bbc93aac..f56d6664 100644 --- a/src/util/util/Event.ts +++ b/src/util/util/Event.ts @@ -23,9 +23,9 @@ import { EVENT, Event } from "../interfaces"; export const events = new EventEmitter(); export async function emitEvent(payload: Omit) { - const id = (payload.channel_id || - payload.user_id || - payload.guild_id) as string; + const id = (payload.guild_id || + payload.channel_id || + payload.user_id) as string; if (!id) return console.error("event doesn't contain any id", payload); if (RabbitMQ.connection) { diff --git a/src/webrtc/Server.ts b/src/webrtc/Server.ts index 0ba2e41b..08f9439f 100644 --- a/src/webrtc/Server.ts +++ b/src/webrtc/Server.ts @@ -21,6 +21,14 @@ import dotenv from "dotenv"; import http from "http"; import ws from "ws"; import { Connection } from "./events/Connection"; +import { + loadWebRtcLibrary, + mediaServer, + WRTC_PORT_MAX, + WRTC_PORT_MIN, + WRTC_PUBLIC_IP, +} from "./util/MediaServer"; +import { green, yellow } from "picocolors"; dotenv.config(); export class Server { @@ -69,14 +77,25 @@ export class Server { await initDatabase(); await Config.init(); await initEvent(); + + // try to load webrtc library, if failed just don't start webrtc endpoint + try { + await loadWebRtcLibrary(); + } catch (e) { + console.log(`[WebRTC] ${yellow("WEBRTC disabled")}`); + return; + } + + await mediaServer.start(WRTC_PUBLIC_IP, WRTC_PORT_MIN, WRTC_PORT_MAX); if (!this.server.listening) { this.server.listen(this.port); - console.log(`[WebRTC] online on 0.0.0.0:${this.port}`); + console.log(`[WebRTC] ${green(`online on 0.0.0.0:${this.port}`)}`); } } async stop() { closeDatabase(); this.server.close(); + mediaServer?.stop(); } } diff --git a/src/webrtc/events/Close.ts b/src/webrtc/events/Close.ts index 7b71e9ce..0419a70e 100644 --- a/src/webrtc/events/Close.ts +++ b/src/webrtc/events/Close.ts @@ -17,11 +17,9 @@ */ import { WebSocket } from "@spacebar/gateway"; -import { Session } from "@spacebar/util"; export async function onClose(this: WebSocket, code: number, reason: string) { console.log("[WebRTC] closed", code, reason.toString()); - if (this.session_id) await Session.delete({ session_id: this.session_id }); this.removeAllListeners(); } diff --git a/src/webrtc/events/Connection.ts b/src/webrtc/events/Connection.ts index 6c5bab03..a068a8fd 100644 --- a/src/webrtc/events/Connection.ts +++ b/src/webrtc/events/Connection.ts @@ -16,11 +16,11 @@ along with this program. If not, see . */ -import { CLOSECODES, Send, setHeartbeat, WebSocket } from "@spacebar/gateway"; +import { CLOSECODES, setHeartbeat } from "@spacebar/gateway"; import { IncomingMessage } from "http"; import { URL } from "url"; import WS from "ws"; -import { VoiceOPCodes } from "../util"; +import { VoiceOPCodes, WebRtcWebSocket, Send } from "../util"; import { onClose } from "./Close"; import { onMessage } from "./Message"; @@ -30,7 +30,7 @@ import { onMessage } from "./Message"; export async function Connection( this: WS.Server, - socket: WebSocket, + socket: WebRtcWebSocket, request: IncomingMessage, ) { try { diff --git a/src/webrtc/events/Message.ts b/src/webrtc/events/Message.ts index 22189e95..f503bd1e 100644 --- a/src/webrtc/events/Message.ts +++ b/src/webrtc/events/Message.ts @@ -16,10 +16,10 @@ along with this program. If not, see . */ -import { CLOSECODES, Payload, WebSocket } from "@spacebar/gateway"; +import { CLOSECODES } from "@spacebar/gateway"; import { Tuple } from "lambert-server"; import OPCodeHandlers from "../opcodes"; -import { VoiceOPCodes } from "../util"; +import { VoiceOPCodes, VoicePayload, WebRtcWebSocket } from "../util"; const PayloadSchema = { op: Number, @@ -28,16 +28,14 @@ const PayloadSchema = { $t: String, }; -export async function onMessage(this: WebSocket, buffer: Buffer) { +export async function onMessage(this: WebRtcWebSocket, buffer: Buffer) { try { - var data: Payload = JSON.parse(buffer.toString()); + const data: VoicePayload = JSON.parse(buffer.toString()); if (data.op !== VoiceOPCodes.IDENTIFY && !this.user_id) return this.close(CLOSECODES.Not_authenticated); - // @ts-ignore const OPCodeHandler = OPCodeHandlers[data.op]; if (!OPCodeHandler) { - // @ts-ignore console.error("[WebRTC] Unkown opcode " + VoiceOPCodes[data.op]); // TODO: if all opcodes are implemented comment this out: // this.close(CloseCodes.Unknown_opcode); @@ -49,7 +47,6 @@ export async function onMessage(this: WebSocket, buffer: Buffer) { data.op as VoiceOPCodes, ) ) { - // @ts-ignore console.log("[WebRTC] Opcode " + VoiceOPCodes[data.op]); } diff --git a/src/webrtc/opcodes/BackendVersion.ts b/src/webrtc/opcodes/BackendVersion.ts index 60de3e58..c97f4b49 100644 --- a/src/webrtc/opcodes/BackendVersion.ts +++ b/src/webrtc/opcodes/BackendVersion.ts @@ -16,10 +16,12 @@ along with this program. If not, see . */ -import { Payload, Send, WebSocket } from "@spacebar/gateway"; -import { VoiceOPCodes } from "../util"; +import { VoiceOPCodes, VoicePayload, WebRtcWebSocket, Send } from "../util"; -export async function onBackendVersion(this: WebSocket, data: Payload) { +export async function onBackendVersion( + this: WebRtcWebSocket, + data: VoicePayload, +) { await Send(this, { op: VoiceOPCodes.VOICE_BACKEND_VERSION, d: { voice: "0.8.43", rtc_worker: "0.3.26" }, diff --git a/src/webrtc/opcodes/Heartbeat.ts b/src/webrtc/opcodes/Heartbeat.ts index 3d8e187b..ef3cae44 100644 --- a/src/webrtc/opcodes/Heartbeat.ts +++ b/src/webrtc/opcodes/Heartbeat.ts @@ -16,16 +16,10 @@ along with this program. If not, see . */ -import { - CLOSECODES, - Payload, - Send, - setHeartbeat, - WebSocket, -} from "@spacebar/gateway"; -import { VoiceOPCodes } from "../util"; +import { CLOSECODES, setHeartbeat } from "@spacebar/gateway"; +import { VoiceOPCodes, VoicePayload, WebRtcWebSocket, Send } from "../util"; -export async function onHeartbeat(this: WebSocket, data: Payload) { +export async function onHeartbeat(this: WebRtcWebSocket, data: VoicePayload) { setHeartbeat(this); if (isNaN(data.d)) return this.close(CLOSECODES.Decode_error); diff --git a/src/webrtc/opcodes/Identify.ts b/src/webrtc/opcodes/Identify.ts index 3f65127e..065813fb 100644 --- a/src/webrtc/opcodes/Identify.ts +++ b/src/webrtc/opcodes/Identify.ts @@ -16,76 +16,128 @@ along with this program. If not, see . */ -import { CLOSECODES, Payload, Send, WebSocket } from "@spacebar/gateway"; +import { CLOSECODES } from "@spacebar/gateway"; import { + StreamSession, validateSchema, VoiceIdentifySchema, VoiceState, } from "@spacebar/util"; -import { endpoint, getClients, VoiceOPCodes, PublicIP } from "@spacebar/webrtc"; -import SemanticSDP from "semantic-sdp"; -const defaultSDP = require("./sdp.json"); +import { + mediaServer, + VoiceOPCodes, + VoicePayload, + WebRtcWebSocket, + Send, + generateSsrc, +} from "@spacebar/webrtc"; +import { subscribeToProducers } from "./Video"; +import { SSRCs } from "spacebar-webrtc-types"; -export async function onIdentify(this: WebSocket, data: Payload) { +export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) { clearTimeout(this.readyTimeout); const { server_id, user_id, session_id, token, streams, video } = validateSchema("VoiceIdentifySchema", data.d) as VoiceIdentifySchema; - const voiceState = await VoiceState.findOne({ - where: { guild_id: server_id, user_id, token, session_id }, + // server_id can be one of the following: a unique id for a GO Live stream, a channel id for a DM voice call, or a guild id for a guild voice channel + // not sure if there's a way to determine whether a snowflake is a channel id or a guild id without checking if it exists in db + // luckily we will only have to determine this once + let type: "guild-voice" | "dm-voice" | "stream" = "guild-voice"; + let authenticated = false; + + // first check if its a guild voice connection or DM voice call + let voiceState = await VoiceState.findOne({ + where: [ + { guild_id: server_id, user_id, token, session_id }, + { channel_id: server_id, user_id, token, session_id }, + ], }); - if (!voiceState) return this.close(CLOSECODES.Authentication_failed); + + if (voiceState) { + type = voiceState.guild_id === server_id ? "guild-voice" : "dm-voice"; + authenticated = true; + } else { + // if its not a guild/dm voice connection, check if it is a go live stream + const streamSession = await StreamSession.findOne({ + where: { + stream_id: server_id, + user_id, + token, + session_id, + used: false, + }, + relations: ["stream"], + }); + + if (streamSession) { + type = "stream"; + authenticated = true; + streamSession.used = true; + await streamSession.save(); + + this.once("close", async () => { + await streamSession.remove(); + }); + } + } + + // if it doesnt match any then not valid token + if (!authenticated) return this.close(CLOSECODES.Authentication_failed); this.user_id = user_id; this.session_id = session_id; - const sdp = SemanticSDP.SDPInfo.expand(defaultSDP); - sdp.setDTLS( - SemanticSDP.DTLSInfo.expand({ - setup: "actpass", - hash: "sha-256", - fingerprint: endpoint.getDTLSFingerprint(), - }), + + this.type = type; + + const voiceRoomId = type === "stream" ? server_id : voiceState!.channel_id; + this.webRtcClient = await mediaServer.join( + voiceRoomId, + this.user_id, + this, + type!, ); - this.client = { - websocket: this, - out: { - tracks: new Map(), - }, - in: { - audio_ssrc: 0, - video_ssrc: 0, - rtx_ssrc: 0, - }, - sdp, - channel_id: voiceState.channel_id, - }; - - const clients = getClients(voiceState.channel_id)!; - clients.add(this.client); - this.on("close", () => { - clients.delete(this.client!); + // ice-lite media server relies on this to know when the peer went away + mediaServer.onClientClose(this.webRtcClient!); }); + // once connected subscribe to tracks from other users + this.webRtcClient.emitter.once("connected", async () => { + await subscribeToProducers.call(this); + }); + + // the server generates a unique ssrc for the audio and video stream. Must be unique among users connected to same server + // UDP clients will respect this ssrc, but websocket clients will generate and replace it with their own + const generatedSsrc: SSRCs = { + audio_ssrc: generateSsrc(), + video_ssrc: generateSsrc(), + rtx_ssrc: generateSsrc(), + }; + this.webRtcClient.initIncomingSSRCs(generatedSsrc); + await Send(this, { op: VoiceOPCodes.READY, d: { - streams: [ - // { type: "video", ssrc: this.ssrc + 1, rtx_ssrc: this.ssrc + 2, rid: "100", quality: 100, active: false } - ], - ssrc: -1, - port: endpoint.getLocalPort(), + ssrc: generatedSsrc.audio_ssrc, + port: mediaServer.port, modes: [ "aead_aes256_gcm_rtpsize", "aead_aes256_gcm", + "aead_xchacha20_poly1305_rtpsize", "xsalsa20_poly1305_lite_rtpsize", "xsalsa20_poly1305_lite", "xsalsa20_poly1305_suffix", "xsalsa20_poly1305", ], - ip: PublicIP, + ip: mediaServer.ip, experiments: [], + streams: streams?.map((x) => ({ + ...x, + ssrc: generatedSsrc.video_ssrc, + rtx_ssrc: generatedSsrc.rtx_ssrc, + type: "video", // client expects this to be overriden for some reason??? + })), }, }); } diff --git a/src/webrtc/opcodes/SelectProtocol.ts b/src/webrtc/opcodes/SelectProtocol.ts index 0a06e722..4a2ee85d 100644 --- a/src/webrtc/opcodes/SelectProtocol.ts +++ b/src/webrtc/opcodes/SelectProtocol.ts @@ -15,51 +15,41 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ - -import { Payload, Send, WebSocket } from "@spacebar/gateway"; import { SelectProtocolSchema, validateSchema } from "@spacebar/util"; -import { PublicIP, VoiceOPCodes, endpoint } from "@spacebar/webrtc"; -import SemanticSDP, { MediaInfo, SDPInfo } from "semantic-sdp"; +import { + VoiceOPCodes, + VoicePayload, + WebRtcWebSocket, + mediaServer, + Send, +} from "@spacebar/webrtc"; -export async function onSelectProtocol(this: WebSocket, payload: Payload) { - if (!this.client) return; +export async function onSelectProtocol( + this: WebRtcWebSocket, + payload: VoicePayload, +) { + if (!this.webRtcClient) return; const data = validateSchema( "SelectProtocolSchema", payload.d, ) as SelectProtocolSchema; - const offer = SemanticSDP.SDPInfo.parse("m=audio\n" + data.sdp!); - this.client.sdp!.setICE(offer.getICE()); - this.client.sdp!.setDTLS(offer.getDTLS()); + // UDP protocol not currently supported. Maybe in the future? + if (data.protocol !== "webrtc") + return this.close(4000, "only webrtc protocol supported currently"); - const transport = endpoint.createTransport(this.client.sdp!); - this.client.transport = transport; - transport.setRemoteProperties(this.client.sdp!); - transport.setLocalProperties(this.client.sdp!); - - const dtls = transport.getLocalDTLSInfo(); - const ice = transport.getLocalICEInfo(); - const port = endpoint.getLocalPort(); - const fingerprint = dtls.getHash() + " " + dtls.getFingerprint(); - const candidates = transport.getLocalCandidates(); - const candidate = candidates[0]; - - const answer = - `m=audio ${port} ICE/SDP` + - `a=fingerprint:${fingerprint}` + - `c=IN IP4 ${PublicIP}` + - `a=rtcp:${port}` + - `a=ice-ufrag:${ice.getUfrag()}` + - `a=ice-pwd:${ice.getPwd()}` + - `a=fingerprint:${fingerprint}` + - `a=candidate:1 1 ${candidate.getTransport()} ${candidate.getFoundation()} ${candidate.getAddress()} ${candidate.getPort()} typ host`; + const response = await mediaServer.onOffer( + this.webRtcClient, + data.sdp!, + data.codecs ?? [], + ); await Send(this, { op: VoiceOPCodes.SESSION_DESCRIPTION, d: { - video_codec: "H264", - sdp: answer, + video_codec: response.selectedVideoCodec, + sdp: response.sdp, media_session_id: this.session_id, audio_codec: "opus", }, diff --git a/src/webrtc/opcodes/Speaking.ts b/src/webrtc/opcodes/Speaking.ts index 97055e0a..bff0db97 100644 --- a/src/webrtc/opcodes/Speaking.ts +++ b/src/webrtc/opcodes/Speaking.ts @@ -16,25 +16,37 @@ along with this program. If not, see . */ -import { Payload, Send, WebSocket } from "@spacebar/gateway"; -import { getClients, VoiceOPCodes } from "../util"; +import { + mediaServer, + VoiceOPCodes, + VoicePayload, + WebRtcWebSocket, + Send, +} from "../util"; // {"speaking":1,"delay":5,"ssrc":2805246727} -export async function onSpeaking(this: WebSocket, data: Payload) { - if (!this.client) return; +export async function onSpeaking(this: WebRtcWebSocket, data: VoicePayload) { + if (!this.webRtcClient) return; - getClients(this.client.channel_id).forEach((client) => { - if (client === this.client) return; - const ssrc = this.client!.out.tracks.get(client.websocket.user_id); + await Promise.all( + Array.from( + mediaServer.getClientsForRtcServer( + this.webRtcClient.voiceRoomId, + ), + ).map((client) => { + if (client.user_id === this.user_id) return Promise.resolve(); - Send(client.websocket, { - op: VoiceOPCodes.SPEAKING, - d: { - user_id: client.websocket.user_id, - speaking: data.d.speaking, - ssrc: ssrc?.audio_ssrc || 0, - }, - }); - }); + const ssrc = client.getOutgoingStreamSSRCsForUser(this.user_id); + + return Send(client.websocket, { + op: VoiceOPCodes.SPEAKING, + d: { + user_id: this.user_id, + speaking: data.d.speaking, + ssrc: ssrc.audio_ssrc ?? 0, + }, + }); + }), + ); } diff --git a/src/webrtc/opcodes/Video.ts b/src/webrtc/opcodes/Video.ts index 3228d4ee..d78827a4 100644 --- a/src/webrtc/opcodes/Video.ts +++ b/src/webrtc/opcodes/Video.ts @@ -15,137 +15,243 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ +import { Stream, validateSchema, VoiceVideoSchema } from "@spacebar/util"; +import { + mediaServer, + VoiceOPCodes, + VoicePayload, + WebRtcWebSocket, + Send, +} from "@spacebar/webrtc"; +import type { WebRtcClient } from "spacebar-webrtc-types"; -import { Payload, Send, WebSocket } from "@spacebar/gateway"; -import { validateSchema, VoiceVideoSchema } from "@spacebar/util"; -import { channels, getClients, VoiceOPCodes } from "@spacebar/webrtc"; -import { IncomingStreamTrack, SSRCs } from "medooze-media-server"; -import SemanticSDP from "semantic-sdp"; +export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) { + if (!this.webRtcClient) return; + + const { voiceRoomId } = this.webRtcClient; -export async function onVideo(this: WebSocket, payload: Payload) { - if (!this.client) return; - const { transport, channel_id } = this.client; - if (!transport) return; const d = validateSchema("VoiceVideoSchema", payload.d) as VoiceVideoSchema; + if (this.type === "stream") { + const stream = await Stream.findOne({ + where: { id: voiceRoomId }, + }); + + if (!stream) return; + + // only the stream owner can publish to a go live stream + if (stream?.owner_id != this.user_id) { + return; + } + } + + const stream = d.streams?.find((element) => element.active); + + const clientsThatNeedUpdate = new Set>(); + const wantsToProduceAudio = d.audio_ssrc !== 0; + const wantsToProduceVideo = d.video_ssrc !== 0 && stream?.active; + + // this is to handle a really weird case where the client sends audio info before the + // dtls ice connection is completely connected. Wait for connection for 3 seconds + // and if no connection, just ignore this message + if (!this.webRtcClient.webrtcConnected) { + if (wantsToProduceAudio) { + try { + await Promise.race([ + new Promise((resolve, reject) => { + this.webRtcClient?.emitter.once("connected", () => + resolve(), + ); + }), + new Promise((resolve, reject) => { + // Reject after 3 seconds if still not connected + setTimeout(() => { + if (this.webRtcClient?.webrtcConnected) resolve(); + else reject(); + }, 3000); + }), + ]); + } catch (e) { + return; // just ignore this message if client didn't connect within 3 seconds + } + } else return; + } + await Send(this, { op: VoiceOPCodes.MEDIA_SINK_WANTS, d: { any: 100 } }); - const id = "stream" + this.user_id; + // first check if we need stop any tracks + if (!wantsToProduceAudio && this.webRtcClient.isProducingAudio()) { + this.webRtcClient.stopPublishingTrack("audio"); + } - var stream = this.client.in.stream!; - if (!stream) { - stream = this.client.transport!.createIncomingStream( - // @ts-ignore - SemanticSDP.StreamInfo.expand({ - id, - // @ts-ignore - tracks: [], - }), - ); - this.client.in.stream = stream; + if (!wantsToProduceVideo && this.webRtcClient.isProducingVideo()) { + this.webRtcClient.stopPublishingTrack("video"); + } - const interval = setInterval(() => { - for (const track of stream.getTracks()) { - for (const layer of Object.values(track.getStats())) { - console.log(track.getId(), layer.total); - } - } - }, 5000); - - stream.on("stopped", () => { - console.log("stream stopped"); - clearInterval(interval); - }); - this.on("close", () => { - transport!.stop(); - }); - const out = transport.createOutgoingStream( - // @ts-ignore - SemanticSDP.StreamInfo.expand({ - id: "out" + this.user_id, - // @ts-ignore - tracks: [], - }), - ); - this.client.out.stream = out; - - const clients = channels.get(channel_id)!; - - clients.forEach((client) => { - if (client.websocket.user_id === this.user_id) return; - if (!client.in.stream) return; - - client.in.stream?.getTracks().forEach((track) => { - attachTrack.call(this, track, client.websocket.user_id); + // check if client has signaled that it will send audio + if (wantsToProduceAudio) { + // check if we are already producing audio, if not, publish a new audio track for it + if (!this.webRtcClient!.isProducingAudio()) { + console.log( + `[${this.user_id}] publishing new audio track ssrc:${d.audio_ssrc}`, + ); + await this.webRtcClient.publishTrack("audio", { + audio_ssrc: d.audio_ssrc, }); - }); + } + + // now check that all clients have subscribed to our audio + for (const client of mediaServer.getClientsForRtcServer( + voiceRoomId, + )) { + if (client.user_id === this.user_id) continue; + + if (!client.isSubscribedToTrack(this.user_id, "audio")) { + console.log( + `[${client.user_id}] subscribing to audio track ssrcs: ${d.audio_ssrc}`, + ); + await client.subscribeToTrack( + this.webRtcClient.user_id, + "audio", + ); + + clientsThatNeedUpdate.add(client); + } + } + } + // check if client has signaled that it will send video + if (wantsToProduceVideo) { + this.webRtcClient!.videoStream = { ...stream, type: "video" }; // client sends "screen" on go live but expects "video" on response + // check if we are already publishing video, if not, publish a new video track for it + if (!this.webRtcClient!.isProducingVideo()) { + console.log( + `[${this.user_id}] publishing new video track ssrc:${d.video_ssrc}`, + ); + await this.webRtcClient.publishTrack("video", { + video_ssrc: d.video_ssrc, + rtx_ssrc: d.rtx_ssrc, + }); + } + + // now check that all clients have subscribed to our video track + for (const client of mediaServer.getClientsForRtcServer( + voiceRoomId, + )) { + if (client.user_id === this.user_id) continue; + + if (!client.isSubscribedToTrack(this.user_id, "video")) { + console.log( + `[${client.user_id}] subscribing to video track ssrc: ${d.video_ssrc}`, + ); + await client.subscribeToTrack( + this.webRtcClient.user_id, + "video", + ); + + clientsThatNeedUpdate.add(client); + } + } } - if (d.audio_ssrc) { - handleSSRC.call(this, "audio", { - media: d.audio_ssrc, - rtx: d.audio_ssrc + 1, - }); - } - if (d.video_ssrc && d.rtx_ssrc) { - handleSSRC.call(this, "video", { - media: d.video_ssrc, - rtx: d.rtx_ssrc, - }); - } -} + await Promise.all( + Array.from(clientsThatNeedUpdate).map((client) => { + const ssrcs = client.getOutgoingStreamSSRCsForUser(this.user_id); -function attachTrack( - this: WebSocket, - track: IncomingStreamTrack, - user_id: string, -) { - if (!this.client) return; - const outTrack = this.client.transport!.createOutgoingStreamTrack( - track.getMedia(), + return Send(client.websocket, { + op: VoiceOPCodes.VIDEO, + d: { + user_id: this.user_id, + // can never send audio ssrc as 0, it will mess up client state for some reason. send server generated ssrc as backup + audio_ssrc: + ssrcs.audio_ssrc ?? + this.webRtcClient!.getIncomingStreamSSRCs().audio_ssrc, + video_ssrc: ssrcs.video_ssrc ?? 0, + rtx_ssrc: ssrcs.rtx_ssrc ?? 0, + streams: d.streams?.map((x) => ({ + ...x, + ssrc: ssrcs.video_ssrc ?? 0, + rtx_ssrc: ssrcs.rtx_ssrc ?? 0, + type: "video", + })), + } as VoiceVideoSchema, + }); + }), ); - outTrack.attachTo(track); - this.client.out.stream!.addTrack(outTrack); - var ssrcs = this.client.out.tracks.get(user_id)!; - if (!ssrcs) - ssrcs = this.client.out.tracks - .set(user_id, { audio_ssrc: 0, rtx_ssrc: 0, video_ssrc: 0 }) - .get(user_id)!; - - if (track.getMedia() === "audio") { - ssrcs.audio_ssrc = outTrack.getSSRCs().media!; - } else if (track.getMedia() === "video") { - ssrcs.video_ssrc = outTrack.getSSRCs().media!; - ssrcs.rtx_ssrc = outTrack.getSSRCs().rtx!; - } - - Send(this, { - op: VoiceOPCodes.VIDEO, - d: { - user_id: user_id, - ...ssrcs, - } as VoiceVideoSchema, - }); } -function handleSSRC(this: WebSocket, type: "audio" | "video", ssrcs: SSRCs) { - if (!this.client) return; - const stream = this.client.in.stream!; - const transport = this.client.transport!; +// check if we are not subscribed to producers in this server, if not, subscribe +export async function subscribeToProducers( + this: WebRtcWebSocket, +): Promise { + if (!this.webRtcClient || !this.webRtcClient.webrtcConnected) return; - const id = type + ssrcs.media; - var track = stream.getTrack(id); - if (!track) { - console.log("createIncomingStreamTrack", id); - track = transport.createIncomingStreamTrack(type, { id, ssrcs }); - stream.addTrack(track); + const clients = mediaServer.getClientsForRtcServer( + this.webRtcClient.voiceRoomId, + ); - const clients = getClients(this.client.channel_id)!; - clients.forEach((client) => { - if (client.websocket.user_id === this.user_id) return; - if (!client.out.stream) return; + await Promise.all( + Array.from(clients).map(async (client) => { + let needsUpdate = false; - attachTrack.call(this, track, client.websocket.user_id); - }); - } + if (client.user_id === this.user_id) return; // cannot subscribe to self + + if ( + client.isProducingAudio() && + !this.webRtcClient!.isSubscribedToTrack(client.user_id, "audio") + ) { + await this.webRtcClient!.subscribeToTrack( + client.user_id, + "audio", + ); + needsUpdate = true; + } + + if ( + client.isProducingVideo() && + !this.webRtcClient!.isSubscribedToTrack(client.user_id, "video") + ) { + await this.webRtcClient!.subscribeToTrack( + client.user_id, + "video", + ); + needsUpdate = true; + } + + if (!needsUpdate) return; + + const ssrcs = this.webRtcClient!.getOutgoingStreamSSRCsForUser( + client.user_id, + ); + + await Send(this, { + op: VoiceOPCodes.VIDEO, + d: { + user_id: client.user_id, + // can never send audio ssrc as 0, it will mess up client state for some reason. send server generated ssrc as backup + audio_ssrc: + ssrcs.audio_ssrc ?? + client.getIncomingStreamSSRCs().audio_ssrc, + video_ssrc: ssrcs.video_ssrc ?? 0, + rtx_ssrc: ssrcs.rtx_ssrc ?? 0, + streams: [ + client.videoStream ?? { + type: "video", + rid: "100", + ssrc: ssrcs.video_ssrc ?? 0, + active: client.isProducingVideo(), + quality: 100, + rtx_ssrc: ssrcs.rtx_ssrc ?? 0, + max_bitrate: 2500000, + max_framerate: 20, + max_resolution: { + type: "fixed", + width: 1280, + height: 720, + }, + }, + ], + } as VoiceVideoSchema, + }); + }), + ); } diff --git a/src/webrtc/opcodes/index.ts b/src/webrtc/opcodes/index.ts index 34681055..71c5f2e7 100644 --- a/src/webrtc/opcodes/index.ts +++ b/src/webrtc/opcodes/index.ts @@ -16,8 +16,7 @@ along with this program. If not, see . */ -import { Payload, WebSocket } from "@spacebar/gateway"; -import { VoiceOPCodes } from "../util"; +import { VoiceOPCodes, VoicePayload, WebRtcWebSocket } from "../util"; import { onBackendVersion } from "./BackendVersion"; import { onHeartbeat } from "./Heartbeat"; import { onIdentify } from "./Identify"; @@ -25,7 +24,7 @@ import { onSelectProtocol } from "./SelectProtocol"; import { onSpeaking } from "./Speaking"; import { onVideo } from "./Video"; -export type OPCodeHandler = (this: WebSocket, data: Payload) => any; +export type OPCodeHandler = (this: WebRtcWebSocket, data: VoicePayload) => any; export default { [VoiceOPCodes.HEARTBEAT]: onHeartbeat, @@ -34,4 +33,4 @@ export default { [VoiceOPCodes.VIDEO]: onVideo, [VoiceOPCodes.SPEAKING]: onSpeaking, [VoiceOPCodes.SELECT_PROTOCOL]: onSelectProtocol, -}; +} as { [key: number]: OPCodeHandler }; diff --git a/src/webrtc/opcodes/sdp.json b/src/webrtc/opcodes/sdp.json deleted file mode 100644 index 5f7eba38..00000000 --- a/src/webrtc/opcodes/sdp.json +++ /dev/null @@ -1,420 +0,0 @@ -{ - "version": 0, - "streams": [], - "medias": [ - { - "id": "0", - "type": "audio", - "direction": "sendrecv", - "codecs": [ - { - "codec": "opus", - "type": 111, - "channels": 2, - "params": { - "minptime": "10", - "useinbandfec": "1" - }, - "rtcpfbs": [ - { - "id": "transport-cc" - } - ] - } - ], - "extensions": { - "1": "urn:ietf:params:rtp-hdrext:ssrc-audio-level", - "2": "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time", - "3": "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01", - "4": "urn:ietf:params:rtp-hdrext:sdes:mid" - } - }, - { - "id": "1", - "type": "video", - "direction": "sendrecv", - "codecs": [ - { - "codec": "VP8", - "type": 96, - "rtx": 97, - "rtcpfbs": [ - { - "id": "goog-remb" - }, - { - "id": "transport-cc" - }, - { - "id": "ccm", - "params": ["fir"] - }, - { - "id": "nack" - }, - { - "id": "nack", - "params": ["pli"] - } - ] - }, - { - "codec": "VP9", - "type": 98, - "rtx": 99, - "params": { - "profile-id": "0" - }, - "rtcpfbs": [ - { - "id": "goog-remb" - }, - { - "id": "transport-cc" - }, - { - "id": "ccm", - "params": ["fir"] - }, - { - "id": "nack" - }, - { - "id": "nack", - "params": ["pli"] - } - ] - }, - { - "codec": "VP9", - "type": 100, - "rtx": 101, - "params": { - "profile-id": "2" - }, - "rtcpfbs": [ - { - "id": "goog-remb" - }, - { - "id": "transport-cc" - }, - { - "id": "ccm", - "params": ["fir"] - }, - { - "id": "nack" - }, - { - "id": "nack", - "params": ["pli"] - } - ] - }, - { - "codec": "VP9", - "type": 102, - "rtx": 122, - "params": { - "profile-id": "1" - }, - "rtcpfbs": [ - { - "id": "goog-remb" - }, - { - "id": "transport-cc" - }, - { - "id": "ccm", - "params": ["fir"] - }, - { - "id": "nack" - }, - { - "id": "nack", - "params": ["pli"] - } - ] - }, - { - "codec": "H264", - "type": 127, - "rtx": 121, - "params": { - "level-asymmetry-allowed": "1", - "packetization-mode": "1", - "profile-level-id": "42001f" - }, - "rtcpfbs": [ - { - "id": "goog-remb" - }, - { - "id": "transport-cc" - }, - { - "id": "ccm", - "params": ["fir"] - }, - { - "id": "nack" - }, - { - "id": "nack", - "params": ["pli"] - } - ] - }, - { - "codec": "H264", - "type": 125, - "rtx": 107, - "params": { - "level-asymmetry-allowed": "1", - "packetization-mode": "0", - "profile-level-id": "42001f" - }, - "rtcpfbs": [ - { - "id": "goog-remb" - }, - { - "id": "transport-cc" - }, - { - "id": "ccm", - "params": ["fir"] - }, - { - "id": "nack" - }, - { - "id": "nack", - "params": ["pli"] - } - ] - }, - { - "codec": "H264", - "type": 108, - "rtx": 109, - "params": { - "level-asymmetry-allowed": "1", - "packetization-mode": "1", - "profile-level-id": "42e01f" - }, - "rtcpfbs": [ - { - "id": "goog-remb" - }, - { - "id": "transport-cc" - }, - { - "id": "ccm", - "params": ["fir"] - }, - { - "id": "nack" - }, - { - "id": "nack", - "params": ["pli"] - } - ] - }, - { - "codec": "H264", - "type": 124, - "rtx": 120, - "params": { - "level-asymmetry-allowed": "1", - "packetization-mode": "0", - "profile-level-id": "42e01f" - }, - "rtcpfbs": [ - { - "id": "goog-remb" - }, - { - "id": "transport-cc" - }, - { - "id": "ccm", - "params": ["fir"] - }, - { - "id": "nack" - }, - { - "id": "nack", - "params": ["pli"] - } - ] - }, - { - "codec": "H264", - "type": 123, - "rtx": 119, - "params": { - "level-asymmetry-allowed": "1", - "packetization-mode": "1", - "profile-level-id": "4d001f" - }, - "rtcpfbs": [ - { - "id": "goog-remb" - }, - { - "id": "transport-cc" - }, - { - "id": "ccm", - "params": ["fir"] - }, - { - "id": "nack" - }, - { - "id": "nack", - "params": ["pli"] - } - ] - }, - { - "codec": "H264", - "type": 35, - "rtx": 36, - "params": { - "level-asymmetry-allowed": "1", - "packetization-mode": "0", - "profile-level-id": "4d001f" - }, - "rtcpfbs": [ - { - "id": "goog-remb" - }, - { - "id": "transport-cc" - }, - { - "id": "ccm", - "params": ["fir"] - }, - { - "id": "nack" - }, - { - "id": "nack", - "params": ["pli"] - } - ] - }, - { - "codec": "H264", - "type": 37, - "rtx": 38, - "params": { - "level-asymmetry-allowed": "1", - "packetization-mode": "1", - "profile-level-id": "f4001f" - }, - "rtcpfbs": [ - { - "id": "goog-remb" - }, - { - "id": "transport-cc" - }, - { - "id": "ccm", - "params": ["fir"] - }, - { - "id": "nack" - }, - { - "id": "nack", - "params": ["pli"] - } - ] - }, - { - "codec": "H264", - "type": 39, - "rtx": 40, - "params": { - "level-asymmetry-allowed": "1", - "packetization-mode": "0", - "profile-level-id": "f4001f" - }, - "rtcpfbs": [ - { - "id": "goog-remb" - }, - { - "id": "transport-cc" - }, - { - "id": "ccm", - "params": ["fir"] - }, - { - "id": "nack" - }, - { - "id": "nack", - "params": ["pli"] - } - ] - }, - { - "codec": "H264", - "type": 114, - "rtx": 115, - "params": { - "level-asymmetry-allowed": "1", - "packetization-mode": "1", - "profile-level-id": "64001f" - }, - "rtcpfbs": [ - { - "id": "goog-remb" - }, - { - "id": "transport-cc" - }, - { - "id": "ccm", - "params": ["fir"] - }, - { - "id": "nack" - }, - { - "id": "nack", - "params": ["pli"] - } - ] - } - ], - "extensions": { - "2": "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time", - "3": "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01", - "4": "urn:ietf:params:rtp-hdrext:sdes:mid", - "5": "http://www.webrtc.org/experiments/rtp-hdrext/playout-delay", - "6": "http://www.webrtc.org/experiments/rtp-hdrext/video-content-type", - "7": "http://www.webrtc.org/experiments/rtp-hdrext/video-timing", - "8": "http://www.webrtc.org/experiments/rtp-hdrext/color-space", - "10": "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id", - "11": "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id", - "13": "urn:3gpp:video-orientation", - "14": "urn:ietf:params:rtp-hdrext:toffset" - } - } - ], - "candidates": [] -} diff --git a/src/webrtc/util/Constants.ts b/src/webrtc/util/Constants.ts index dba1c511..f11c95fd 100644 --- a/src/webrtc/util/Constants.ts +++ b/src/webrtc/util/Constants.ts @@ -16,13 +16,7 @@ along with this program. If not, see . */ -export enum VoiceStatus { - CONNECTED = 0, - CONNECTING = 1, - AUTHENTICATING = 2, - RECONNECTING = 3, - DISCONNECTED = 4, -} +import { Payload } from "@spacebar/gateway"; export enum VoiceOPCodes { IDENTIFY = 0, @@ -42,3 +36,5 @@ export enum VoiceOPCodes { VOICE_BACKEND_VERSION = 16, CHANNEL_OPTIONS_UPDATE = 17, } + +export type VoicePayload = Omit & { op: VoiceOPCodes }; diff --git a/src/webrtc/util/MediaServer.ts b/src/webrtc/util/MediaServer.ts index 0c12876c..848b3f73 100644 --- a/src/webrtc/util/MediaServer.ts +++ b/src/webrtc/util/MediaServer.ts @@ -16,62 +16,62 @@ along with this program. If not, see . */ -import { WebSocket } from "@spacebar/gateway"; -import MediaServer, { - IncomingStream, - OutgoingStream, - Transport, -} from "medooze-media-server"; -import SemanticSDP from "semantic-sdp"; -MediaServer.enableLog(true); +import type { SignalingDelegate } from "spacebar-webrtc-types"; +import { green, red } from "picocolors"; -export const PublicIP = process.env.PUBLIC_IP || "127.0.0.1"; +export let mediaServer: SignalingDelegate; -try { - const range = process.env.WEBRTC_PORT_RANGE || "4000"; - var ports = range.split("-"); - const min = Number(ports[0]); - const max = Number(ports[1]); +export const WRTC_PUBLIC_IP = process.env.WRTC_PUBLIC_IP ?? "127.0.0.1"; +export const WRTC_PORT_MIN = process.env.WRTC_PORT_MIN + ? parseInt(process.env.WRTC_PORT_MIN) + : 2000; +export const WRTC_PORT_MAX = process.env.WRTC_PORT_MAX + ? parseInt(process.env.WRTC_PORT_MAX) + : 65000; - MediaServer.setPortRange(min, max); -} catch (error) { - console.error( - "Invalid env var: WEBRTC_PORT_RANGE", - process.env.WEBRTC_PORT_RANGE, - error, - ); - process.exit(1); +const selectedWrtcLibrary = process.env.WRTC_LIBRARY; + +// could not find a way to hide stack trace from base Error object +class NoConfiguredLibraryError implements Error { + name: string; + message: string; + stack?: string | undefined; + cause?: unknown; + + constructor(message: string) { + this.name = "NoConfiguredLibraryError"; + this.message = message; + } } -export const endpoint = MediaServer.createEndpoint(PublicIP); +export const loadWebRtcLibrary = async () => { + try { + //mediaServer = require('medooze-spacebar-wrtc'); + if (!selectedWrtcLibrary) + throw new NoConfiguredLibraryError("No library configured in .env"); -export const channels = new Map>(); + mediaServer = new // @ts-ignore + (await import(selectedWrtcLibrary)).default(); -export interface Client { - transport?: Transport; - websocket: WebSocket; - out: { - stream?: OutgoingStream; - tracks: Map< - string, - { - audio_ssrc: number; - video_ssrc: number; - rtx_ssrc: number; - } - >; - }; - in: { - stream?: IncomingStream; - audio_ssrc: number; - video_ssrc: number; - rtx_ssrc: number; - }; - sdp: SemanticSDP.SDPInfo; - channel_id: string; -} + console.log( + `[WebRTC] ${green(`Succesfully loaded ${selectedWrtcLibrary}`)}`, + ); + return Promise.resolve(); + } catch (error) { + console.log( + `[WebRTC] ${red(`Failed to import ${selectedWrtcLibrary}: ${error instanceof NoConfiguredLibraryError ? error.message : ""}`)}`, + ); -export function getClients(channel_id: string) { - if (!channels.has(channel_id)) channels.set(channel_id, new Set()); - return channels.get(channel_id)!; -} + return Promise.reject(); + } +}; + +const MAX_INT32BIT = 2 ** 32; + +let count = 1; +export const generateSsrc = () => { + count++; + if (count >= MAX_INT32BIT) count = 1; + + return count; +}; diff --git a/src/webrtc/util/Send.ts b/src/webrtc/util/Send.ts new file mode 100644 index 00000000..7f8ab4dd --- /dev/null +++ b/src/webrtc/util/Send.ts @@ -0,0 +1,27 @@ +import { JSONReplacer } from "@spacebar/util"; +import { VoicePayload } from "./Constants"; +import { WebRtcWebSocket } from "./WebRtcWebSocket"; + +export function Send(socket: WebRtcWebSocket, data: VoicePayload) { + if (process.env.WRTC_WS_VERBOSE) + console.log(`[WebRTC] Outgoing message: ${JSON.stringify(data)}`); + + let buffer: Buffer | string; + + // TODO: encode circular object + if (socket.encoding === "json") buffer = JSON.stringify(data, JSONReplacer); + else return; + + return new Promise((res, rej) => { + if (socket.readyState !== 1) { + // return rej("socket not open"); + socket.close(); + return; + } + + socket.send(buffer, (err) => { + if (err) return rej(err); + return res(null); + }); + }); +} diff --git a/src/webrtc/util/WebRtcWebSocket.ts b/src/webrtc/util/WebRtcWebSocket.ts new file mode 100644 index 00000000..5bb2da46 --- /dev/null +++ b/src/webrtc/util/WebRtcWebSocket.ts @@ -0,0 +1,7 @@ +import { WebSocket } from "@spacebar/gateway"; +import type { WebRtcClient } from "spacebar-webrtc-types"; + +export interface WebRtcWebSocket extends WebSocket { + type: "guild-voice" | "dm-voice" | "stream"; + webRtcClient?: WebRtcClient; +} diff --git a/src/webrtc/util/index.ts b/src/webrtc/util/index.ts index 66126c1f..264f1ecd 100644 --- a/src/webrtc/util/index.ts +++ b/src/webrtc/util/index.ts @@ -18,3 +18,5 @@ export * from "./Constants"; export * from "./MediaServer"; +export * from "./WebRtcWebSocket"; +export * from "./Send"; diff --git a/tsconfig.json b/tsconfig.json index 63b5e96c..5d408b24 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,5 +1,4 @@ { - "exclude": ["./src/webrtc"], "include": ["./src"], "compilerOptions": { /* Visit https://aka.ms/tsconfig to read more about this file */ @@ -37,7 +36,8 @@ "@spacebar/api*": ["./api"], "@spacebar/gateway*": ["./gateway"], "@spacebar/cdn*": ["./cdn"], - "@spacebar/util*": ["./util"] + "@spacebar/util*": ["./util"], + "@spacebar/webrtc*": ["./webrtc"] } /* Specify a set of entries that re-map imports to additional lookup locations. */, // "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */ // "typeRoots": [], /* Specify multiple folders that act like './node_modules/@types'. */