add webrtc support (#1284)

Co-authored-by: MaddyUnderStars <46743919+MaddyUnderStars@users.noreply.github.com>
This commit is contained in:
dank074 2025-06-21 21:41:13 -05:00 committed by GitHub
parent f03c6209a4
commit 526a8da8f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
51 changed files with 1227 additions and 782 deletions

Binary file not shown.

BIN
package-lock.json generated

Binary file not shown.

View File

@ -67,6 +67,7 @@
"husky": "^9.1.7",
"prettier": "^3.5.3",
"pretty-quick": "^4.1.1",
"spacebar-webrtc-types": "github:spacebarchat/spacebar-webrtc-types",
"typescript": "^5.8.3"
},
"dependencies": {
@ -118,7 +119,8 @@
"@spacebar/api": "dist/api",
"@spacebar/cdn": "dist/cdn",
"@spacebar/gateway": "dist/gateway",
"@spacebar/util": "dist/util"
"@spacebar/util": "dist/util",
"@spacebar/webrtc": "dist/webrtc"
},
"optionalDependencies": {
"@yukikaze-bot/erlpack": "^1.0.1",
@ -130,4 +132,4 @@
"pg": "^8.14.1",
"sqlite3": "^5.1.7"
}
}
}

View File

@ -93,7 +93,7 @@ router.patch(
voice_state.save(),
emitEvent({
event: "VOICE_STATE_UPDATE",
data: voice_state,
data: voice_state.toPublicVoiceState(),
guild_id,
} as VoiceStateUpdateEvent),
]);

View File

@ -22,6 +22,7 @@ process.on("uncaughtException", console.error);
import http from "http";
import * as Api from "@spacebar/api";
import * as Gateway from "@spacebar/gateway";
import * as Webrtc from "@spacebar/webrtc";
import { CDNServer } from "@spacebar/cdn";
import express from "express";
import { green, bold } from "picocolors";
@ -30,18 +31,25 @@ import { Config, initDatabase, Sentry } from "@spacebar/util";
const app = express();
const server = http.createServer();
const port = Number(process.env.PORT) || 3001;
const wrtcWsPort = Number(process.env.WRTC_WS_PORT) || 3004;
const production = process.env.NODE_ENV == "development" ? false : true;
server.on("request", app);
const api = new Api.SpacebarServer({ server, port, production, app });
const cdn = new CDNServer({ server, port, production, app });
const gateway = new Gateway.Server({ server, port, production });
const webrtc = new Webrtc.Server({
server: undefined,
port: wrtcWsPort,
production,
});
process.on("SIGTERM", async () => {
console.log("Shutting down due to SIGTERM");
await gateway.stop();
await cdn.stop();
await api.stop();
await webrtc.stop();
server.close();
Sentry.close();
});
@ -54,7 +62,12 @@ async function main() {
await new Promise((resolve) =>
server.listen({ port }, () => resolve(undefined)),
);
await Promise.all([api.start(), cdn.start(), gateway.start()]);
await Promise.all([
api.start(),
cdn.start(),
gateway.start(),
webrtc.start(),
]);
Sentry.errorHandler(app);

View File

@ -29,6 +29,7 @@ import {
import ws from "ws";
import { Connection } from "./events/Connection";
import http from "http";
import { cleanupOnStartup } from "./util/Utils";
export class Server {
public ws: ws.Server;
@ -74,6 +75,8 @@ export class Server {
await Config.init();
await initEvent();
await Sentry.init();
// temporary fix
await cleanupOnStartup();
if (!this.server.listening) {
this.server.listen(this.port);

View File

@ -24,6 +24,8 @@ import {
Session,
SessionsReplace,
User,
VoiceState,
VoiceStateUpdateEvent,
} from "@spacebar/util";
export async function Close(this: WebSocket, code: number, reason: Buffer) {
@ -36,6 +38,39 @@ export async function Close(this: WebSocket, code: number, reason: Buffer) {
if (this.session_id) {
await Session.delete({ session_id: this.session_id });
const voiceState = await VoiceState.findOne({
where: { user_id: this.user_id },
});
// clear the voice state for this session if user was in voice channel
if (
voiceState &&
voiceState.session_id === this.session_id &&
voiceState.channel_id
) {
const prevGuildId = voiceState.guild_id;
const prevChannelId = voiceState.channel_id;
// @ts-expect-error channel_id is nullable
voiceState.channel_id = null;
// @ts-expect-error guild_id is nullable
voiceState.guild_id = null;
voiceState.self_stream = false;
voiceState.self_video = false;
await voiceState.save();
// let the users in previous guild/channel know that user disconnected
await emitEvent({
event: "VOICE_STATE_UPDATE",
data: {
...voiceState.toPublicVoiceState(),
guild_id: prevGuildId, // have to send the previous guild_id because that's what client expects for disconnect messages
},
guild_id: prevGuildId,
channel_id: prevChannelId,
} as VoiceStateUpdateEvent);
}
}
if (this.user_id) {

View File

@ -183,6 +183,7 @@ export async function onIdentify(this: WebSocket, data: Payload) {
"guild.emojis",
"guild.roles",
"guild.stickers",
"guild.voice_states",
"roles",
// For these entities, `user` is always just the logged in user we fetched above
@ -485,6 +486,18 @@ export async function onIdentify(this: WebSocket, data: Payload) {
}),
);
const readySupplementalGuilds = (
guilds.filter((guild) => !guild.unavailable) as Guild[]
).map((guild) => {
return {
voice_states: guild.voice_states.map((state) =>
state.toPublicVoiceState(),
),
id: guild.id,
embedded_activities: [],
};
});
// TODO: ready supplemental
await Send(this, {
op: OPCodes.DISPATCH,
@ -498,7 +511,7 @@ export async function onIdentify(this: WebSocket, data: Payload) {
// these merged members seem to be all users currently in vc in your guilds
merged_members: [],
lazy_private_channels: [],
guilds: [], // { voice_states: [], id: string, embedded_activities: [] }
guilds: readySupplementalGuilds, // { voice_states: [], id: string, embedded_activities: [] }
// embedded_activities are users currently in an activity?
disclose: [], // Config.get().general.uniqueUsernames ? ["pomelo"] : []
},

View File

@ -0,0 +1,131 @@
import {
genVoiceToken,
Payload,
WebSocket,
generateStreamKey,
} from "@spacebar/gateway";
import {
Channel,
Config,
emitEvent,
Member,
Region,
Snowflake,
Stream,
StreamCreateEvent,
StreamCreateSchema,
StreamServerUpdateEvent,
StreamSession,
VoiceState,
VoiceStateUpdateEvent,
} from "@spacebar/util";
import { check } from "./instanceOf";
/**
 * Gateway handler that starts a stream for the calling user.
 *
 * Flow, in order:
 *  1. validate the payload against `StreamCreateSchema`;
 *  2. require an existing voice state with a channel for this user;
 *  3. validate the target channel (and, for guild streams, its guild);
 *  4. resolve the default voice region;
 *  5. drop any stale streams owned by the user, then persist a new `Stream`
 *     plus a `StreamSession` holding a freshly generated token;
 *  6. emit STREAM_CREATE and STREAM_SERVER_UPDATE to the user, flag the voice
 *     state as streaming and emit VOICE_STATE_UPDATE.
 *
 * The socket is closed with code 4000 when the channel is invalid or when no
 * region matches the configured default.
 *
 * @param data gateway payload; `data.d` must satisfy `StreamCreateSchema`
 */
export async function onStreamCreate(this: WebSocket, data: Payload) {
	check.call(this, StreamCreateSchema, data.d);
	const body = data.d as StreamCreateSchema;

	if (body.channel_id.trim().length === 0) return;

	// first check if we are in a voice channel already. cannot create a stream if there's no existing voice connection
	const voiceState = await VoiceState.findOne({
		where: { user_id: this.user_id },
	});

	if (!voiceState || !voiceState.channel_id) return;

	if (body.guild_id) {
		voiceState.member = await Member.findOneOrFail({
			where: { id: voiceState.user_id, guild_id: voiceState.guild_id },
			relations: ["user", "roles"],
		});
	}

	// TODO: permissions check - if it's a guild, check if user is allowed to create stream in this guild

	const channel = await Channel.findOne({
		where: { id: body.channel_id },
	});

	// A guild stream must name a guild, and that guild must own the channel.
	// Without the explicit `!body.guild_id` test a guild-less channel would
	// pass the loose comparison and produce a "guild:undefined:..." stream key.
	if (
		!channel ||
		(body.type === "guild" &&
			(!body.guild_id || channel.guild_id != body.guild_id))
	)
		return this.close(4000, "invalid channel");

	// TODO: actually apply preferred_region from the event payload
	const regions = Config.get().regions;
	const guildRegion = regions.available.find(
		(r) => r.id === regions.default,
	);

	// The configured default may not be among the available regions; bail out
	// before touching the database instead of crashing on `guildRegion.endpoint`.
	if (!guildRegion) return this.close(4000, "Unknown region");

	// first make sure theres no other streams for this user that somehow didnt get cleared
	await Stream.delete({
		owner_id: this.user_id,
	});

	// create a new entry in db containing the token for authenticating user in stream gateway IDENTIFY
	const stream = Stream.create({
		id: Snowflake.generate(),
		owner_id: this.user_id,
		channel_id: body.channel_id,
		endpoint: guildRegion.endpoint,
	});

	await stream.save();

	const token = genVoiceToken();

	const streamSession = StreamSession.create({
		stream_id: stream.id,
		user_id: this.user_id,
		session_id: this.session_id,
		token,
	});

	await streamSession.save();

	const streamKey = generateStreamKey(
		body.type,
		body.guild_id,
		body.channel_id,
		this.user_id,
	);

	await emitEvent({
		event: "STREAM_CREATE",
		data: {
			stream_key: streamKey,
			rtc_server_id: stream.id, // for voice connections in guilds it is guild_id, for dm voice calls it seems to be DM channel id, for GoLive streams a generated number
			viewer_ids: [],
			region: guildRegion.name,
			paused: false,
		},
		user_id: this.user_id,
	} as StreamCreateEvent);

	await emitEvent({
		event: "STREAM_SERVER_UPDATE",
		data: {
			token: streamSession.token,
			stream_key: streamKey,
			guild_id: null, // not sure why its always null
			endpoint: stream.endpoint,
		},
		user_id: this.user_id,
	} as StreamServerUpdateEvent);

	// Persist the streaming flag before announcing it to the channel/guild.
	voiceState.self_stream = true;
	await voiceState.save();

	await emitEvent({
		event: "VOICE_STATE_UPDATE",
		data: voiceState.toPublicVoiceState(),
		guild_id: voiceState.guild_id,
		channel_id: voiceState.channel_id,
	} as VoiceStateUpdateEvent);
}
//stream key:
// guild:${guild_id}:${channel_id}:${user_id}
// call:${channel_id}:${user_id}

View File

@ -0,0 +1,76 @@
import { parseStreamKey, Payload, WebSocket } from "@spacebar/gateway";
import {
emitEvent,
Stream,
StreamDeleteEvent,
StreamDeleteSchema,
VoiceState,
VoiceStateUpdateEvent,
} from "@spacebar/util";
import { check } from "./instanceOf";
/**
 * Gateway handler that stops a stream, or stops watching someone else's.
 *
 * - If the stream key belongs to another user, the caller is only a viewer:
 *   a STREAM_DELETE is sent back to the caller and the stream itself is kept.
 * - If the caller owns the stream, the `Stream` row is removed, the caller's
 *   voice state has `self_stream` cleared (with a VOICE_STATE_UPDATE), and a
 *   STREAM_DELETE is emitted for the guild/channel named in the key.
 *
 * The socket is closed with code 4000 when the key cannot be parsed or when a
 * guild key names a guild that does not own the stream's channel.
 *
 * @param data gateway payload; `data.d` must satisfy `StreamDeleteSchema`
 */
export async function onStreamDelete(this: WebSocket, data: Payload) {
	check.call(this, StreamDeleteSchema, data.d);
	const body = data.d as StreamDeleteSchema;

	let parsedKey: {
		type: "guild" | "call";
		channelId: string;
		guildId?: string;
		userId: string;
	};

	try {
		parsedKey = parseStreamKey(body.stream_key);
	} catch {
		return this.close(4000, "Invalid stream key");
	}

	const { userId, channelId, guildId, type } = parsedKey;

	// when a user selects to stop watching another user stream, this event gets triggered
	// just disconnect user without actually deleting stream
	if (this.user_id !== userId) {
		await emitEvent({
			event: "STREAM_DELETE",
			data: {
				stream_key: body.stream_key,
			},
			user_id: this.user_id,
		} as StreamDeleteEvent);
		return;
	}

	const stream = await Stream.findOne({
		where: { channel_id: channelId, owner_id: userId },
		relations: ["channel"],
	});

	if (!stream) return;

	// The guild id in the key is client-supplied and is used below to route
	// events, so make sure it really is the guild that owns the stream's
	// channel (same check as the stream-watch handler).
	if (type === "guild" && stream.channel.guild_id != guildId)
		return this.close(4000, "Invalid stream key");

	await stream.remove();

	const voiceState = await VoiceState.findOne({
		where: { user_id: this.user_id },
	});

	if (voiceState) {
		voiceState.self_stream = false;
		await voiceState.save();

		await emitEvent({
			event: "VOICE_STATE_UPDATE",
			data: voiceState.toPublicVoiceState(),
			guild_id: guildId,
			channel_id: channelId,
		} as VoiceStateUpdateEvent);
	}

	await emitEvent({
		event: "STREAM_DELETE",
		data: {
			stream_key: body.stream_key,
		},
		guild_id: guildId,
		channel_id: channelId,
	} as StreamDeleteEvent);
}

View File

@ -0,0 +1,98 @@
import {
genVoiceToken,
parseStreamKey,
Payload,
WebSocket,
} from "@spacebar/gateway";
import {
Config,
emitEvent,
Stream,
StreamCreateEvent,
StreamServerUpdateEvent,
StreamSession,
StreamWatchSchema,
} from "@spacebar/util";
import { check } from "./instanceOf";
import { Not } from "typeorm";
/**
 * Gateway handler that lets the calling user start watching an existing stream.
 *
 * Resolves the stream from the supplied key, creates a `StreamSession` with a
 * new token for the caller, then emits STREAM_CREATE (including the current
 * viewer ids) and STREAM_SERVER_UPDATE (token + endpoint).
 *
 * The socket is closed with code 4000 when the key is malformed, the stream
 * does not exist, the guild in the key does not match the stream's channel, or
 * no configured region matches the stream's endpoint.
 *
 * @param data gateway payload; `data.d` must satisfy `StreamWatchSchema`
 */
export async function onStreamWatch(this: WebSocket, data: Payload) {
	check.call(this, StreamWatchSchema, data.d);
	const request = data.d as StreamWatchSchema;
	const streamKey = request.stream_key;

	// TODO: apply perms: check if user is allowed to watch

	let key: ReturnType<typeof parseStreamKey>;
	try {
		key = parseStreamKey(streamKey);
	} catch (e) {
		return this.close(4000, "Invalid stream key");
	}

	const target = await Stream.findOne({
		where: { channel_id: key.channelId, owner_id: key.userId },
		relations: ["channel"],
	});

	// unknown stream, or a guild key whose guild does not own the channel
	const guildMismatch =
		!!target &&
		key.type === "guild" &&
		target.channel.guild_id != key.guildId;
	if (!target || guildMismatch) {
		return this.close(4000, "Invalid stream key");
	}

	// map the endpoint stored on the stream back to its region entry
	const region = Config.get().regions.available.find(
		(candidate) => candidate.endpoint === target.endpoint,
	);
	if (!region) {
		return this.close(4000, "Unknown region");
	}

	const watcherSession = StreamSession.create({
		stream_id: target.id,
		user_id: this.user_id,
		session_id: this.session_id,
		token: genVoiceToken(),
	});
	await watcherSession.save();

	// get the viewers: stream session tokens for this stream that have been used but not including stream owner
	const usedSessions = await StreamSession.find({
		where: {
			stream_id: target.id,
			used: true,
			user_id: Not(target.owner_id),
		},
	});
	const viewerIds = usedSessions.map((session) => session.user_id);

	const createEvent = {
		event: "STREAM_CREATE",
		data: {
			stream_key: streamKey,
			rtc_server_id: target.id, // for voice connections in guilds it is guild_id, for dm voice calls it seems to be DM channel id, for GoLive streams a generated number
			viewer_ids: viewerIds,
			region: region.name,
			paused: false,
		},
		channel_id: key.channelId,
		user_id: this.user_id,
	} as StreamCreateEvent;
	await emitEvent(createEvent);

	const serverUpdateEvent = {
		event: "STREAM_SERVER_UPDATE",
		data: {
			token: watcherSession.token,
			stream_key: streamKey,
			guild_id: null, // not sure why its always null
			endpoint: target.endpoint,
		},
		user_id: this.user_id,
	} as StreamServerUpdateEvent;
	await emitEvent(serverUpdateEvent);
}

View File

@ -17,19 +17,19 @@
*/
import { Payload, WebSocket } from "@spacebar/gateway";
import { genVoiceToken } from "../util/SessionUtils";
import { check } from "./instanceOf";
import {
Config,
emitEvent,
Guild,
Member,
Region,
VoiceServerUpdateEvent,
VoiceState,
VoiceStateUpdateEvent,
VoiceStateUpdateSchema,
Region,
} from "@spacebar/util";
import { genVoiceToken } from "../util/SessionUtils";
import { check } from "./instanceOf";
// TODO: check if a voice server is setup
// Notice: Bot users respect the voice channel's user limit, if set.
@ -39,6 +39,10 @@ import {
export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
check.call(this, VoiceStateUpdateSchema, data.d);
const body = data.d as VoiceStateUpdateSchema;
const isNew = body.channel_id === null && body.guild_id === null;
let isChanged = false;
let prevState;
let voiceState: VoiceState;
try {
@ -54,20 +58,24 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
return;
}
if (voiceState.channel_id !== body.channel_id) isChanged = true;
//If a user change voice channel between guild we should send a left event first
if (
voiceState.guild_id &&
voiceState.guild_id !== body.guild_id &&
voiceState.session_id === this.session_id
) {
await emitEvent({
event: "VOICE_STATE_UPDATE",
data: { ...voiceState, channel_id: null },
data: { ...voiceState.toPublicVoiceState(), channel_id: null },
guild_id: voiceState.guild_id,
});
}
//The event send by Discord's client on channel leave has both guild_id and channel_id as null
if (body.guild_id === null) body.guild_id = voiceState.guild_id;
//if (body.guild_id === null) body.guild_id = voiceState.guild_id;
prevState = { ...voiceState };
voiceState.assign(body);
} catch (error) {
voiceState = VoiceState.create({
@ -79,39 +87,58 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
});
}
// 'Fix' for this one voice state error. TODO: Find out why this is sent
// It seems to be sent on client load,
// so maybe its trying to find which server you were connected to before disconnecting, if any?
if (body.guild_id == null) {
return;
// if user left voice channel, send an update to previous channel/guild to let other people know that the user left
if (
voiceState.session_id === this.session_id &&
body.guild_id == null &&
body.channel_id == null &&
(prevState?.guild_id || prevState?.channel_id)
) {
await emitEvent({
event: "VOICE_STATE_UPDATE",
data: {
...voiceState.toPublicVoiceState(),
channel_id: null,
guild_id: null,
},
guild_id: prevState?.guild_id,
channel_id: prevState?.channel_id,
});
}
//TODO the member should only have these properties: hoisted_role, deaf, joined_at, mute, roles, user
//TODO the member.user should only have these properties: avatar, discriminator, id, username
//TODO this may fail
voiceState.member = await Member.findOneOrFail({
where: { id: voiceState.user_id, guild_id: voiceState.guild_id },
relations: ["user", "roles"],
});
if (body.guild_id) {
voiceState.member = await Member.findOneOrFail({
where: { id: voiceState.user_id, guild_id: voiceState.guild_id },
relations: ["user", "roles"],
});
}
//If the session changed we generate a new token
if (voiceState.session_id !== this.session_id)
voiceState.token = genVoiceToken();
voiceState.session_id = this.session_id;
const { id, ...newObj } = voiceState;
const { member } = voiceState;
await Promise.all([
voiceState.save(),
emitEvent({
event: "VOICE_STATE_UPDATE",
data: newObj,
data: {
...voiceState.toPublicVoiceState(),
member: member?.toPublicMember(),
},
guild_id: voiceState.guild_id,
channel_id: voiceState.channel_id,
user_id: voiceState.user_id,
} as VoiceStateUpdateEvent),
]);
//If it's null it means that we are leaving the channel and this event is not needed
if (voiceState.channel_id !== null) {
if ((isNew || isChanged) && voiceState.channel_id !== null) {
const guild = await Guild.findOne({
where: { id: voiceState.guild_id },
});
@ -133,8 +160,11 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
token: voiceState.token,
guild_id: voiceState.guild_id,
endpoint: guildRegion.endpoint,
channel_id: voiceState.guild_id
? undefined
: voiceState.channel_id, // only DM voice calls have this set, and DM channel is one where guild_id is null
},
guild_id: voiceState.guild_id,
user_id: voiceState.user_id,
} as VoiceServerUpdateEvent);
}
}

View File

@ -25,6 +25,9 @@ import { onRequestGuildMembers } from "./RequestGuildMembers";
import { onResume } from "./Resume";
import { onVoiceStateUpdate } from "./VoiceStateUpdate";
import { onGuildSubscriptionsBulk } from "./GuildSubscriptionsBulk";
import { onStreamCreate } from "./StreamCreate";
import { onStreamDelete } from "./StreamDelete";
import { onStreamWatch } from "./StreamWatch";
export type OPCodeHandler = (this: WebSocket, data: Payload) => unknown;
@ -41,5 +44,8 @@ export default {
// 10: Hello
// 13: Dm_update
14: onLazyRequest,
18: onStreamCreate,
19: onStreamDelete,
20: onStreamWatch,
37: onGuildSubscriptionsBulk,
} as { [key: number]: OPCodeHandler };

View File

@ -16,8 +16,6 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
// import { VoiceOPCodes } from "@spacebar/webrtc";
export enum OPCODES {
Dispatch = 0,
Heartbeat = 1,
@ -63,7 +61,7 @@ export enum CLOSECODES {
}
export interface Payload {
op: OPCODES /* | VoiceOPCodes */;
op: OPCODES;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
d?: any;
s?: number;

63
src/gateway/util/Utils.ts Normal file
View File

@ -0,0 +1,63 @@
import { VoiceState } from "@spacebar/util";
/**
 * Splits a stream key into its components.
 *
 * Accepted formats:
 *  - `guild:<guild_id>:<channel_id>:<user_id>`
 *  - `call:<channel_id>:<user_id>`
 *
 * The key must contain exactly the segments listed above and none of them may
 * be empty; anything else is rejected rather than partially parsed.
 *
 * @param streamKey raw stream key as received from the client
 * @returns the parsed parts; `guildId` is `undefined` for `call` keys
 * @throws Error when the type prefix is unknown or the key is malformed
 */
export function parseStreamKey(streamKey: string): {
	type: "guild" | "call";
	channelId: string;
	guildId?: string;
	userId: string;
} {
	const [type, ...ids] = streamKey.split(":");

	if (type !== "guild" && type !== "call") {
		throw new Error(`Invalid stream key type: ${type}`);
	}

	// guild keys carry three ids (guild, channel, user), call keys two.
	// Ids never contain ":", so extra or empty segments mean a corrupt key.
	const expectedIds = type === "guild" ? 3 : 2;
	if (ids.length !== expectedIds || ids.some((id) => id.length === 0)) {
		throw new Error(`Invalid stream key: ${streamKey}`);
	}

	const guildId = type === "guild" ? ids[0] : undefined;
	const [channelId, userId] = ids.slice(-2);

	return { type, channelId, guildId, userId };
}
/**
 * Builds the stream key for a stream.
 *
 * Produces `guild:<guildId>:<channelId>:<userId>` for guild streams and
 * `call:<channelId>:<userId>` for calls; `guildId` is ignored for calls.
 *
 * @param type whether the stream belongs to a guild channel or a call
 * @param guildId guild id, only used when `type` is `"guild"`
 * @param channelId id of the channel the stream runs in
 * @param userId id of the user who owns the stream
 * @returns the colon-separated stream key
 */
export function generateStreamKey(
	type: "guild" | "call",
	guildId: string | undefined,
	channelId: string,
	userId: string,
): string {
	// only guild keys carry the guild segment
	const prefix = type === "guild" ? `${type}:${guildId}` : type;
	return `${prefix}:${channelId}:${userId}`;
}
/**
 * Temporary startup cleanup, needed until the shutdown cleanup is fixed.
 *
 * Currently, when the server is shut down, the voice states are not cleared,
 * so stale "connected" states would survive a restart. This resets the
 * channel, guild and stream/video flags of stored voice states.
 *
 * TODO: remove this when Server.stop() is fixed so that it waits for all
 * websocket connections to run their respective Close event listener function
 * for session cleanup.
 */
export async function cleanupOnStartup(): Promise<void> {
await VoiceState.update(
// empty criteria: intended to apply the reset to every stored voice state
{},
{
// @ts-expect-error channel_id is nullable
channel_id: null,
// @ts-expect-error guild_id is nullable
guild_id: null,
self_stream: false,
self_video: false,
},
);
}

View File

@ -20,7 +20,6 @@ import { Intents, ListenEventOpts, Permissions } from "@spacebar/util";
import WS from "ws";
import { Deflate, Inflate } from "fast-zlib";
import { Capabilities } from "./Capabilities";
// import { Client } from "@spacebar/webrtc";
export interface WebSocket extends WS {
version: number;
@ -42,6 +41,5 @@ export interface WebSocket extends WS {
member_events: Record<string, () => unknown>;
listen_options: ListenEventOpts;
capabilities?: Capabilities;
// client?: Client;
large_threshold: number;
}

View File

@ -22,3 +22,4 @@ export * from "./SessionUtils";
export * from "./Heartbeat";
export * from "./WebSocket";
export * from "./Capabilities";
export * from "./Utils";

View File

@ -0,0 +1,42 @@
import {
Column,
Entity,
JoinColumn,
ManyToOne,
OneToMany,
RelationId,
} from "typeorm";
import { BaseClass } from "./BaseClass";
import { dbEngine } from "../util/Database";
import { User } from "./User";
import { Channel } from "./Channel";
import { StreamSession } from "./StreamSession";
/**
 * Database entity for a stream, stored in the "streams" table.
 * A row records who owns the stream, which channel it runs in and which
 * endpoint serves it.
 */
@Entity({
name: "streams",
engine: dbEngine,
})
export class Stream extends BaseClass {
// id of the owning user (foreign key behind the `owner` relation)
@Column()
@RelationId((stream: Stream) => stream.owner)
owner_id: string;
// owning user; the stream row is deleted together with the user
@JoinColumn({ name: "owner_id" })
@ManyToOne(() => User, {
onDelete: "CASCADE",
})
owner: User;
// id of the channel the stream runs in (foreign key behind `channel`)
@Column()
@RelationId((stream: Stream) => stream.channel)
channel_id: string;
// channel relation; the stream row is deleted together with the channel
@JoinColumn({ name: "channel_id" })
@ManyToOne(() => Channel, {
onDelete: "CASCADE",
})
channel: Channel;
// endpoint string serving this stream (the stream-create handler stores the region's endpoint here)
@Column()
endpoint: string;
}

View File

@ -0,0 +1,48 @@
import {
Column,
Entity,
JoinColumn,
ManyToOne,
OneToMany,
RelationId,
} from "typeorm";
import { BaseClass } from "./BaseClass";
import { dbEngine } from "../util/Database";
import { User } from "./User";
import { Stream } from "./Stream";
/**
 * Database entity for one user's session on a stream, stored in the
 * "stream_sessions" table. It ties a user and a gateway session to a stream
 * and carries the token handed out for that stream.
 */
@Entity({
name: "stream_sessions",
engine: dbEngine,
})
export class StreamSession extends BaseClass {
// id of the stream this session belongs to (foreign key behind `stream`)
@Column()
@RelationId((session: StreamSession) => session.stream)
stream_id: string;
// stream relation; sessions are deleted together with their stream
@JoinColumn({ name: "stream_id" })
@ManyToOne(() => Stream, {
onDelete: "CASCADE",
})
stream: Stream;
// id of the user holding this session (foreign key behind `user`)
@Column()
@RelationId((session: StreamSession) => session.user)
user_id: string;
// user relation; sessions are deleted together with their user
@JoinColumn({ name: "user_id" })
@ManyToOne(() => User, {
onDelete: "CASCADE",
})
user: User;
// token issued for this stream session; the column is nullable
@Column({ nullable: true })
token: string;
// id of the gateway session that requested this stream session
@Column()
session_id: string;
// defaults to false; presumably set once the token has been used to connect — TODO confirm where it is flipped
@Column({ default: false })
used: boolean;
}

View File

@ -24,6 +24,29 @@ import { Member } from "./Member";
import { User } from "./User";
import { dbEngine } from "../util/Database";
/**
 * Names of the VoiceState fields that make up the public voice state.
 * Only the member NAMES matter; the numeric values are never used. The enum
 * exists so the same list is available both as a type (via `keyof typeof`)
 * and as a runtime array (via `Object.values`).
 */
export enum PublicVoiceStateEnum {
user_id,
suppress,
session_id,
self_video,
self_mute,
self_deaf,
self_stream,
request_to_speak_timestamp,
mute,
deaf,
channel_id,
guild_id,
}
// union of the field names listed in the enum above
export type PublicVoiceStateKeys = keyof typeof PublicVoiceStateEnum;
// Object.values() on a numeric enum yields both the names and the numbers
// (reverse mapping); keeping only the strings leaves the field names.
export const PublicVoiceStateProjection = Object.values(
PublicVoiceStateEnum,
).filter((x) => typeof x === "string") as PublicVoiceStateKeys[];
// VoiceState narrowed to the public fields
export type PublicVoiceState = Pick<VoiceState, PublicVoiceStateKeys>;
//https://gist.github.com/vassjozsef/e482c65df6ee1facaace8b3c9ff66145#file-voice_state-ex
@Entity({
name: "voice_states",
@ -96,4 +119,13 @@ export class VoiceState extends BaseClass {
@Column({ nullable: true, default: null })
request_to_speak_timestamp?: Date;
/**
 * Returns a plain object holding only the fields named in
 * `PublicVoiceStateProjection`, copied from this voice state.
 */
toPublicVoiceState() {
	// eslint-disable-next-line @typescript-eslint/no-explicit-any
	const projected: any = {};
	for (const field of PublicVoiceStateProjection) {
		projected[field] = this[field];
	}
	return projected as PublicVoiceState;
}
}

View File

@ -47,6 +47,8 @@ export * from "./SecurityKey";
export * from "./Session";
export * from "./Sticker";
export * from "./StickerPack";
export * from "./Stream";
export * from "./StreamSession";
export * from "./Team";
export * from "./TeamMember";
export * from "./Template";

View File

@ -28,4 +28,4 @@ export * from "./schemas";
export * from "./imports";
export * from "./config";
export * from "./connections";
export * from "./Signing"
export * from "./Signing";

View File

@ -21,7 +21,6 @@ import {
ConnectedAccount,
Interaction,
ApplicationCommand,
VoiceState,
Message,
PartialEmoji,
Invite,
@ -43,6 +42,7 @@ import {
ReadyPrivateChannel,
GuildOrUnavailable,
GuildCreateResponse,
PublicVoiceState,
} from "@spacebar/util";
export interface Event {
@ -431,7 +431,7 @@ export interface UserConnectionsUpdateEvent extends Event {
export interface VoiceStateUpdateEvent extends Event {
event: "VOICE_STATE_UPDATE";
data: VoiceState & {
data: PublicVoiceState & {
member: PublicMember;
};
}
@ -440,8 +440,37 @@ export interface VoiceServerUpdateEvent extends Event {
event: "VOICE_SERVER_UPDATE";
data: {
token: string;
guild_id: string;
guild_id: string | null;
endpoint: string;
channel_id?: string;
};
}
/** Event emitted when a stream is created for, or joined by, a user. */
export interface StreamCreateEvent extends Event {
event: "STREAM_CREATE";
data: {
// key identifying the stream (guild:... or call:... form)
stream_key: string;
// id of the RTC server for the stream; the stream handlers send the stream's id here
rtc_server_id: string;
// ids of users currently watching the stream
viewer_ids: string[];
// name of the voice region serving the stream
region: string;
paused: boolean;
};
}
/** Event carrying the connection details (token and endpoint) for a stream. */
export interface StreamServerUpdateEvent extends Event {
event: "STREAM_SERVER_UPDATE";
data: {
// token of the stream session created for the receiving user
token: string;
// key identifying the stream this update refers to
stream_key: string;
// endpoint the client should connect to for this stream
endpoint: string;
// nullable; the stream handlers always send null here
guild_id: string | null;
};
}
/** Event emitted when a stream ends or a viewer stops watching it. */
export interface StreamDeleteEvent extends Event {
event: "STREAM_DELETE";
data: {
// key identifying the stream that was deleted / left
stream_key: string;
};
}
@ -681,6 +710,9 @@ export type EVENT =
| "INTERACTION_CREATE"
| "VOICE_STATE_UPDATE"
| "VOICE_SERVER_UPDATE"
| "STREAM_CREATE"
| "STREAM_SERVER_UPDATE"
| "STREAM_DELETE"
| "APPLICATION_COMMAND_CREATE"
| "APPLICATION_COMMAND_UPDATE"
| "APPLICATION_COMMAND_DELETE"

View File

@ -0,0 +1,43 @@
import { MigrationInterface, QueryRunner } from "typeorm";
/**
 * Migration that introduces the "streams" and "stream_sessions" tables and
 * their foreign keys to "users", "channels" and "streams".
 */
export class Voice1745625724865 implements MigrationInterface {
	name = "Voice1745625724865";

	/** Creates both tables, then attaches the foreign key constraints. */
	public async up(queryRunner: QueryRunner): Promise<void> {
		const statements = [
			`CREATE TABLE "streams" ("id" character varying NOT NULL, "owner_id" character varying NOT NULL, "channel_id" character varying NOT NULL, "endpoint" character varying NOT NULL, CONSTRAINT "PK_40440b6f569ebc02bc71c25c499" PRIMARY KEY ("id"))`,
			`CREATE TABLE "stream_sessions" ("id" character varying NOT NULL, "stream_id" character varying NOT NULL, "user_id" character varying NOT NULL, "token" character varying, "session_id" character varying NOT NULL, "used" boolean NOT NULL DEFAULT false, CONSTRAINT "PK_49bdc3f66394c12478f8371c546" PRIMARY KEY ("id"))`,
			`ALTER TABLE "streams" ADD CONSTRAINT "FK_1b566f9b54d1cda271da53ac82f" FOREIGN KEY ("owner_id") REFERENCES "users"("id") ON DELETE CASCADE ON UPDATE NO ACTION`,
			`ALTER TABLE "streams" ADD CONSTRAINT "FK_5101f0cded27ff0aae78fc4eed7" FOREIGN KEY ("channel_id") REFERENCES "channels"("id") ON DELETE CASCADE ON UPDATE NO ACTION`,
			`ALTER TABLE "stream_sessions" ADD CONSTRAINT "FK_8b5a028a34dae9ee54af37c9c32" FOREIGN KEY ("stream_id") REFERENCES "streams"("id") ON DELETE CASCADE ON UPDATE NO ACTION`,
			`ALTER TABLE "stream_sessions" ADD CONSTRAINT "FK_13ae5c29aff4d0890c54179511a" FOREIGN KEY ("user_id") REFERENCES "users"("id") ON DELETE CASCADE ON UPDATE NO ACTION`,
		];

		// run strictly one after another: constraints need the tables to exist
		for (const sql of statements) {
			await queryRunner.query(sql);
		}
	}

	/** Drops the constraints in reverse order, then the tables. */
	public async down(queryRunner: QueryRunner): Promise<void> {
		const statements = [
			`ALTER TABLE "stream_sessions" DROP CONSTRAINT "FK_13ae5c29aff4d0890c54179511a"`,
			`ALTER TABLE "stream_sessions" DROP CONSTRAINT "FK_8b5a028a34dae9ee54af37c9c32"`,
			`ALTER TABLE "streams" DROP CONSTRAINT "FK_5101f0cded27ff0aae78fc4eed7"`,
			`ALTER TABLE "streams" DROP CONSTRAINT "FK_1b566f9b54d1cda271da53ac82f"`,
			`DROP TABLE "stream_sessions"`,
			`DROP TABLE "streams"`,
		];

		for (const sql of statements) {
			await queryRunner.query(sql);
		}
	}
}

View File

@ -31,7 +31,7 @@ export interface SelectProtocolSchema {
type: "audio" | "video";
priority: number;
payload_type: number;
rtx_payload_type?: number | null;
rtx_payload_type?: number;
}[];
rtc_connection_id?: string; // uuid
}

View File

@ -0,0 +1,13 @@
/** Shape of the payload a client sends to create a stream. */
export interface StreamCreateSchema {
// "guild" for a stream in a guild channel, "call" otherwise
type: "guild" | "call";
// channel the stream should run in
channel_id: string;
// guild owning the channel; optional in the schema
guild_id?: string;
// region the client would like to use; optional
preferred_region?: string;
}
// Runtime counterpart of the interface above, passed to the payload checker.
// Keys prefixed with "$" correspond to the optional (`?`) interface fields.
export const StreamCreateSchema = {
type: String,
channel_id: String,
$guild_id: String,
$preferred_region: String,
};

View File

@ -0,0 +1,7 @@
/** Shape of the payload a client sends to delete (or stop watching) a stream. */
export interface StreamDeleteSchema {
// key identifying the stream
stream_key: string;
}
// Runtime counterpart of the interface above, passed to the payload checker.
export const StreamDeleteSchema = {
stream_key: String,
};

View File

@ -0,0 +1,7 @@
/** Shape of the payload a client sends to start watching a stream. */
export interface StreamWatchSchema {
// key identifying the stream to watch
stream_key: string;
}
// Runtime counterpart of the interface above, passed to the payload checker.
export const StreamWatchSchema = {
stream_key: String,
};

View File

@ -23,8 +23,9 @@ export interface VoiceIdentifySchema {
token: string;
video?: boolean;
streams?: {
type: string;
type: "video" | "audio" | "screen";
rid: string;
quality: number;
}[];
max_secure_frames_version?: number;
}

View File

@ -22,7 +22,7 @@ export interface VoiceVideoSchema {
rtx_ssrc?: number;
user_id?: string;
streams?: {
type: "video" | "audio";
type: "video" | "audio" | "screen";
rid: string;
ssrc: number;
active: boolean;

View File

@ -68,6 +68,9 @@ export * from "./responses";
export * from "./RoleModifySchema";
export * from "./RolePositionUpdateSchema";
export * from "./SelectProtocolSchema";
export * from "./StreamCreateSchema";
export * from "./StreamDeleteSchema";
export * from "./StreamWatchSchema";
export * from "./TeamCreateSchema";
export * from "./TemplateCreateSchema";
export * from "./TemplateModifySchema";

View File

@ -52,23 +52,6 @@ export const WsStatus = {
RESUMING: 8,
};
/**
* The current status of a voice connection. Here are the available statuses:
* * CONNECTED: 0
* * CONNECTING: 1
* * AUTHENTICATING: 2
* * RECONNECTING: 3
* * DISCONNECTED: 4
* @typedef {number} VoiceStatus
*/
export const VoiceStatus = {
CONNECTED: 0,
CONNECTING: 1,
AUTHENTICATING: 2,
RECONNECTING: 3,
DISCONNECTED: 4,
};
export const OPCodes = {
DISPATCH: 0,
HEARTBEAT: 1,
@ -84,22 +67,6 @@ export const OPCodes = {
HEARTBEAT_ACK: 11,
};
export const VoiceOPCodes = {
IDENTIFY: 0,
SELECT_PROTOCOL: 1,
READY: 2,
HEARTBEAT: 3,
SESSION_DESCRIPTION: 4,
SPEAKING: 5,
HEARTBEAT_ACK: 6,
RESUME: 7,
HELLO: 8,
RESUMED: 9,
CLIENT_CONNECT: 12, // incorrect, op 12 is probably used for video
CLIENT_DISCONNECT: 13, // incorrect
VERSION: 16, //not documented
};
export const Events = {
RATE_LIMIT: "rateLimit",
CLIENT_READY: "ready",

View File

@ -23,9 +23,9 @@ import { EVENT, Event } from "../interfaces";
export const events = new EventEmitter();
export async function emitEvent(payload: Omit<Event, "created_at">) {
const id = (payload.channel_id ||
payload.user_id ||
payload.guild_id) as string;
const id = (payload.guild_id ||
payload.channel_id ||
payload.user_id) as string;
if (!id) return console.error("event doesn't contain any id", payload);
if (RabbitMQ.connection) {

View File

@ -21,6 +21,14 @@ import dotenv from "dotenv";
import http from "http";
import ws from "ws";
import { Connection } from "./events/Connection";
import {
loadWebRtcLibrary,
mediaServer,
WRTC_PORT_MAX,
WRTC_PORT_MIN,
WRTC_PUBLIC_IP,
} from "./util/MediaServer";
import { green, yellow } from "picocolors";
dotenv.config();
export class Server {
@ -69,14 +77,25 @@ export class Server {
await initDatabase();
await Config.init();
await initEvent();
// try to load webrtc library, if failed just don't start webrtc endpoint
try {
await loadWebRtcLibrary();
} catch (e) {
console.log(`[WebRTC] ${yellow("WEBRTC disabled")}`);
return;
}
await mediaServer.start(WRTC_PUBLIC_IP, WRTC_PORT_MIN, WRTC_PORT_MAX);
if (!this.server.listening) {
this.server.listen(this.port);
console.log(`[WebRTC] online on 0.0.0.0:${this.port}`);
console.log(`[WebRTC] ${green(`online on 0.0.0.0:${this.port}`)}`);
}
}
async stop() {
closeDatabase();
this.server.close();
mediaServer?.stop();
}
}

View File

@ -17,11 +17,9 @@
*/
import { WebSocket } from "@spacebar/gateway";
import { Session } from "@spacebar/util";
export async function onClose(this: WebSocket, code: number, reason: string) {
console.log("[WebRTC] closed", code, reason.toString());
if (this.session_id) await Session.delete({ session_id: this.session_id });
this.removeAllListeners();
}

View File

@ -16,11 +16,11 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { CLOSECODES, Send, setHeartbeat, WebSocket } from "@spacebar/gateway";
import { CLOSECODES, setHeartbeat } from "@spacebar/gateway";
import { IncomingMessage } from "http";
import { URL } from "url";
import WS from "ws";
import { VoiceOPCodes } from "../util";
import { VoiceOPCodes, WebRtcWebSocket, Send } from "../util";
import { onClose } from "./Close";
import { onMessage } from "./Message";
@ -30,7 +30,7 @@ import { onMessage } from "./Message";
export async function Connection(
this: WS.Server,
socket: WebSocket,
socket: WebRtcWebSocket,
request: IncomingMessage,
) {
try {

View File

@ -16,10 +16,10 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { CLOSECODES, Payload, WebSocket } from "@spacebar/gateway";
import { CLOSECODES } from "@spacebar/gateway";
import { Tuple } from "lambert-server";
import OPCodeHandlers from "../opcodes";
import { VoiceOPCodes } from "../util";
import { VoiceOPCodes, VoicePayload, WebRtcWebSocket } from "../util";
const PayloadSchema = {
op: Number,
@ -28,16 +28,14 @@ const PayloadSchema = {
$t: String,
};
export async function onMessage(this: WebSocket, buffer: Buffer) {
export async function onMessage(this: WebRtcWebSocket, buffer: Buffer) {
try {
var data: Payload = JSON.parse(buffer.toString());
const data: VoicePayload = JSON.parse(buffer.toString());
if (data.op !== VoiceOPCodes.IDENTIFY && !this.user_id)
return this.close(CLOSECODES.Not_authenticated);
// @ts-ignore
const OPCodeHandler = OPCodeHandlers[data.op];
if (!OPCodeHandler) {
// @ts-ignore
console.error("[WebRTC] Unkown opcode " + VoiceOPCodes[data.op]);
// TODO: if all opcodes are implemented comment this out:
// this.close(CloseCodes.Unknown_opcode);
@ -49,7 +47,6 @@ export async function onMessage(this: WebSocket, buffer: Buffer) {
data.op as VoiceOPCodes,
)
) {
// @ts-ignore
console.log("[WebRTC] Opcode " + VoiceOPCodes[data.op]);
}

View File

@ -16,10 +16,12 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { Payload, Send, WebSocket } from "@spacebar/gateway";
import { VoiceOPCodes } from "../util";
import { VoiceOPCodes, VoicePayload, WebRtcWebSocket, Send } from "../util";
export async function onBackendVersion(this: WebSocket, data: Payload) {
export async function onBackendVersion(
this: WebRtcWebSocket,
data: VoicePayload,
) {
await Send(this, {
op: VoiceOPCodes.VOICE_BACKEND_VERSION,
d: { voice: "0.8.43", rtc_worker: "0.3.26" },

View File

@ -16,16 +16,10 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import {
CLOSECODES,
Payload,
Send,
setHeartbeat,
WebSocket,
} from "@spacebar/gateway";
import { VoiceOPCodes } from "../util";
import { CLOSECODES, setHeartbeat } from "@spacebar/gateway";
import { VoiceOPCodes, VoicePayload, WebRtcWebSocket, Send } from "../util";
export async function onHeartbeat(this: WebSocket, data: Payload) {
export async function onHeartbeat(this: WebRtcWebSocket, data: VoicePayload) {
setHeartbeat(this);
if (isNaN(data.d)) return this.close(CLOSECODES.Decode_error);

View File

@ -16,76 +16,128 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { CLOSECODES, Payload, Send, WebSocket } from "@spacebar/gateway";
import { CLOSECODES } from "@spacebar/gateway";
import {
StreamSession,
validateSchema,
VoiceIdentifySchema,
VoiceState,
} from "@spacebar/util";
import { endpoint, getClients, VoiceOPCodes, PublicIP } from "@spacebar/webrtc";
import SemanticSDP from "semantic-sdp";
const defaultSDP = require("./sdp.json");
import {
mediaServer,
VoiceOPCodes,
VoicePayload,
WebRtcWebSocket,
Send,
generateSsrc,
} from "@spacebar/webrtc";
import { subscribeToProducers } from "./Video";
import { SSRCs } from "spacebar-webrtc-types";
export async function onIdentify(this: WebSocket, data: Payload) {
export async function onIdentify(this: WebRtcWebSocket, data: VoicePayload) {
clearTimeout(this.readyTimeout);
const { server_id, user_id, session_id, token, streams, video } =
validateSchema("VoiceIdentifySchema", data.d) as VoiceIdentifySchema;
const voiceState = await VoiceState.findOne({
where: { guild_id: server_id, user_id, token, session_id },
// server_id can be one of the following: a unique id for a GO Live stream, a channel id for a DM voice call, or a guild id for a guild voice channel
// not sure if there's a way to determine whether a snowflake is a channel id or a guild id without checking if it exists in db
// luckily we will only have to determine this once
let type: "guild-voice" | "dm-voice" | "stream" = "guild-voice";
let authenticated = false;
// first check if its a guild voice connection or DM voice call
let voiceState = await VoiceState.findOne({
where: [
{ guild_id: server_id, user_id, token, session_id },
{ channel_id: server_id, user_id, token, session_id },
],
});
if (!voiceState) return this.close(CLOSECODES.Authentication_failed);
if (voiceState) {
type = voiceState.guild_id === server_id ? "guild-voice" : "dm-voice";
authenticated = true;
} else {
// if its not a guild/dm voice connection, check if it is a go live stream
const streamSession = await StreamSession.findOne({
where: {
stream_id: server_id,
user_id,
token,
session_id,
used: false,
},
relations: ["stream"],
});
if (streamSession) {
type = "stream";
authenticated = true;
streamSession.used = true;
await streamSession.save();
this.once("close", async () => {
await streamSession.remove();
});
}
}
// if it doesnt match any then not valid token
if (!authenticated) return this.close(CLOSECODES.Authentication_failed);
this.user_id = user_id;
this.session_id = session_id;
const sdp = SemanticSDP.SDPInfo.expand(defaultSDP);
sdp.setDTLS(
SemanticSDP.DTLSInfo.expand({
setup: "actpass",
hash: "sha-256",
fingerprint: endpoint.getDTLSFingerprint(),
}),
this.type = type;
const voiceRoomId = type === "stream" ? server_id : voiceState!.channel_id;
this.webRtcClient = await mediaServer.join(
voiceRoomId,
this.user_id,
this,
type!,
);
this.client = {
websocket: this,
out: {
tracks: new Map(),
},
in: {
audio_ssrc: 0,
video_ssrc: 0,
rtx_ssrc: 0,
},
sdp,
channel_id: voiceState.channel_id,
};
const clients = getClients(voiceState.channel_id)!;
clients.add(this.client);
this.on("close", () => {
clients.delete(this.client!);
// ice-lite media server relies on this to know when the peer went away
mediaServer.onClientClose(this.webRtcClient!);
});
// once connected subscribe to tracks from other users
this.webRtcClient.emitter.once("connected", async () => {
await subscribeToProducers.call(this);
});
// the server generates a unique ssrc for the audio and video stream. Must be unique among users connected to same server
// UDP clients will respect this ssrc, but websocket clients will generate and replace it with their own
const generatedSsrc: SSRCs = {
audio_ssrc: generateSsrc(),
video_ssrc: generateSsrc(),
rtx_ssrc: generateSsrc(),
};
this.webRtcClient.initIncomingSSRCs(generatedSsrc);
await Send(this, {
op: VoiceOPCodes.READY,
d: {
streams: [
// { type: "video", ssrc: this.ssrc + 1, rtx_ssrc: this.ssrc + 2, rid: "100", quality: 100, active: false }
],
ssrc: -1,
port: endpoint.getLocalPort(),
ssrc: generatedSsrc.audio_ssrc,
port: mediaServer.port,
modes: [
"aead_aes256_gcm_rtpsize",
"aead_aes256_gcm",
"aead_xchacha20_poly1305_rtpsize",
"xsalsa20_poly1305_lite_rtpsize",
"xsalsa20_poly1305_lite",
"xsalsa20_poly1305_suffix",
"xsalsa20_poly1305",
],
ip: PublicIP,
ip: mediaServer.ip,
experiments: [],
streams: streams?.map((x) => ({
...x,
ssrc: generatedSsrc.video_ssrc,
rtx_ssrc: generatedSsrc.rtx_ssrc,
type: "video", // client expects this to be overriden for some reason???
})),
},
});
}

View File

@ -15,51 +15,41 @@
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { Payload, Send, WebSocket } from "@spacebar/gateway";
import { SelectProtocolSchema, validateSchema } from "@spacebar/util";
import { PublicIP, VoiceOPCodes, endpoint } from "@spacebar/webrtc";
import SemanticSDP, { MediaInfo, SDPInfo } from "semantic-sdp";
import {
VoiceOPCodes,
VoicePayload,
WebRtcWebSocket,
mediaServer,
Send,
} from "@spacebar/webrtc";
export async function onSelectProtocol(this: WebSocket, payload: Payload) {
if (!this.client) return;
export async function onSelectProtocol(
this: WebRtcWebSocket,
payload: VoicePayload,
) {
if (!this.webRtcClient) return;
const data = validateSchema(
"SelectProtocolSchema",
payload.d,
) as SelectProtocolSchema;
const offer = SemanticSDP.SDPInfo.parse("m=audio\n" + data.sdp!);
this.client.sdp!.setICE(offer.getICE());
this.client.sdp!.setDTLS(offer.getDTLS());
// UDP protocol not currently supported. Maybe in the future?
if (data.protocol !== "webrtc")
return this.close(4000, "only webrtc protocol supported currently");
const transport = endpoint.createTransport(this.client.sdp!);
this.client.transport = transport;
transport.setRemoteProperties(this.client.sdp!);
transport.setLocalProperties(this.client.sdp!);
const dtls = transport.getLocalDTLSInfo();
const ice = transport.getLocalICEInfo();
const port = endpoint.getLocalPort();
const fingerprint = dtls.getHash() + " " + dtls.getFingerprint();
const candidates = transport.getLocalCandidates();
const candidate = candidates[0];
const answer =
`m=audio ${port} ICE/SDP` +
`a=fingerprint:${fingerprint}` +
`c=IN IP4 ${PublicIP}` +
`a=rtcp:${port}` +
`a=ice-ufrag:${ice.getUfrag()}` +
`a=ice-pwd:${ice.getPwd()}` +
`a=fingerprint:${fingerprint}` +
`a=candidate:1 1 ${candidate.getTransport()} ${candidate.getFoundation()} ${candidate.getAddress()} ${candidate.getPort()} typ host`;
const response = await mediaServer.onOffer(
this.webRtcClient,
data.sdp!,
data.codecs ?? [],
);
await Send(this, {
op: VoiceOPCodes.SESSION_DESCRIPTION,
d: {
video_codec: "H264",
sdp: answer,
video_codec: response.selectedVideoCodec,
sdp: response.sdp,
media_session_id: this.session_id,
audio_codec: "opus",
},

View File

@ -16,25 +16,37 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { Payload, Send, WebSocket } from "@spacebar/gateway";
import { getClients, VoiceOPCodes } from "../util";
import {
mediaServer,
VoiceOPCodes,
VoicePayload,
WebRtcWebSocket,
Send,
} from "../util";
// {"speaking":1,"delay":5,"ssrc":2805246727}
export async function onSpeaking(this: WebSocket, data: Payload) {
if (!this.client) return;
export async function onSpeaking(this: WebRtcWebSocket, data: VoicePayload) {
if (!this.webRtcClient) return;
getClients(this.client.channel_id).forEach((client) => {
if (client === this.client) return;
const ssrc = this.client!.out.tracks.get(client.websocket.user_id);
await Promise.all(
Array.from(
mediaServer.getClientsForRtcServer<WebRtcWebSocket>(
this.webRtcClient.voiceRoomId,
),
).map((client) => {
if (client.user_id === this.user_id) return Promise.resolve();
Send(client.websocket, {
op: VoiceOPCodes.SPEAKING,
d: {
user_id: client.websocket.user_id,
speaking: data.d.speaking,
ssrc: ssrc?.audio_ssrc || 0,
},
});
});
const ssrc = client.getOutgoingStreamSSRCsForUser(this.user_id);
return Send(client.websocket, {
op: VoiceOPCodes.SPEAKING,
d: {
user_id: this.user_id,
speaking: data.d.speaking,
ssrc: ssrc.audio_ssrc ?? 0,
},
});
}),
);
}

View File

@ -15,137 +15,243 @@
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { Stream, validateSchema, VoiceVideoSchema } from "@spacebar/util";
import {
mediaServer,
VoiceOPCodes,
VoicePayload,
WebRtcWebSocket,
Send,
} from "@spacebar/webrtc";
import type { WebRtcClient } from "spacebar-webrtc-types";
import { Payload, Send, WebSocket } from "@spacebar/gateway";
import { validateSchema, VoiceVideoSchema } from "@spacebar/util";
import { channels, getClients, VoiceOPCodes } from "@spacebar/webrtc";
import { IncomingStreamTrack, SSRCs } from "medooze-media-server";
import SemanticSDP from "semantic-sdp";
export async function onVideo(this: WebRtcWebSocket, payload: VoicePayload) {
if (!this.webRtcClient) return;
const { voiceRoomId } = this.webRtcClient;
export async function onVideo(this: WebSocket, payload: Payload) {
if (!this.client) return;
const { transport, channel_id } = this.client;
if (!transport) return;
const d = validateSchema("VoiceVideoSchema", payload.d) as VoiceVideoSchema;
if (this.type === "stream") {
const stream = await Stream.findOne({
where: { id: voiceRoomId },
});
if (!stream) return;
// only the stream owner can publish to a go live stream
if (stream?.owner_id != this.user_id) {
return;
}
}
const stream = d.streams?.find((element) => element.active);
const clientsThatNeedUpdate = new Set<WebRtcClient<WebRtcWebSocket>>();
const wantsToProduceAudio = d.audio_ssrc !== 0;
const wantsToProduceVideo = d.video_ssrc !== 0 && stream?.active;
// this is to handle a really weird case where the client sends audio info before the
// dtls ice connection is completely connected. Wait for connection for 3 seconds
// and if no connection, just ignore this message
if (!this.webRtcClient.webrtcConnected) {
if (wantsToProduceAudio) {
try {
await Promise.race([
new Promise<void>((resolve, reject) => {
this.webRtcClient?.emitter.once("connected", () =>
resolve(),
);
}),
new Promise<void>((resolve, reject) => {
// Reject after 3 seconds if still not connected
setTimeout(() => {
if (this.webRtcClient?.webrtcConnected) resolve();
else reject();
}, 3000);
}),
]);
} catch (e) {
return; // just ignore this message if client didn't connect within 3 seconds
}
} else return;
}
await Send(this, { op: VoiceOPCodes.MEDIA_SINK_WANTS, d: { any: 100 } });
const id = "stream" + this.user_id;
// first check if we need stop any tracks
if (!wantsToProduceAudio && this.webRtcClient.isProducingAudio()) {
this.webRtcClient.stopPublishingTrack("audio");
}
var stream = this.client.in.stream!;
if (!stream) {
stream = this.client.transport!.createIncomingStream(
// @ts-ignore
SemanticSDP.StreamInfo.expand({
id,
// @ts-ignore
tracks: [],
}),
);
this.client.in.stream = stream;
if (!wantsToProduceVideo && this.webRtcClient.isProducingVideo()) {
this.webRtcClient.stopPublishingTrack("video");
}
const interval = setInterval(() => {
for (const track of stream.getTracks()) {
for (const layer of Object.values(track.getStats())) {
console.log(track.getId(), layer.total);
}
}
}, 5000);
stream.on("stopped", () => {
console.log("stream stopped");
clearInterval(interval);
});
this.on("close", () => {
transport!.stop();
});
const out = transport.createOutgoingStream(
// @ts-ignore
SemanticSDP.StreamInfo.expand({
id: "out" + this.user_id,
// @ts-ignore
tracks: [],
}),
);
this.client.out.stream = out;
const clients = channels.get(channel_id)!;
clients.forEach((client) => {
if (client.websocket.user_id === this.user_id) return;
if (!client.in.stream) return;
client.in.stream?.getTracks().forEach((track) => {
attachTrack.call(this, track, client.websocket.user_id);
// check if client has signaled that it will send audio
if (wantsToProduceAudio) {
// check if we are already producing audio, if not, publish a new audio track for it
if (!this.webRtcClient!.isProducingAudio()) {
console.log(
`[${this.user_id}] publishing new audio track ssrc:${d.audio_ssrc}`,
);
await this.webRtcClient.publishTrack("audio", {
audio_ssrc: d.audio_ssrc,
});
});
}
// now check that all clients have subscribed to our audio
for (const client of mediaServer.getClientsForRtcServer<WebRtcWebSocket>(
voiceRoomId,
)) {
if (client.user_id === this.user_id) continue;
if (!client.isSubscribedToTrack(this.user_id, "audio")) {
console.log(
`[${client.user_id}] subscribing to audio track ssrcs: ${d.audio_ssrc}`,
);
await client.subscribeToTrack(
this.webRtcClient.user_id,
"audio",
);
clientsThatNeedUpdate.add(client);
}
}
}
// check if client has signaled that it will send video
if (wantsToProduceVideo) {
this.webRtcClient!.videoStream = { ...stream, type: "video" }; // client sends "screen" on go live but expects "video" on response
// check if we are already publishing video, if not, publish a new video track for it
if (!this.webRtcClient!.isProducingVideo()) {
console.log(
`[${this.user_id}] publishing new video track ssrc:${d.video_ssrc}`,
);
await this.webRtcClient.publishTrack("video", {
video_ssrc: d.video_ssrc,
rtx_ssrc: d.rtx_ssrc,
});
}
// now check that all clients have subscribed to our video track
for (const client of mediaServer.getClientsForRtcServer<WebRtcWebSocket>(
voiceRoomId,
)) {
if (client.user_id === this.user_id) continue;
if (!client.isSubscribedToTrack(this.user_id, "video")) {
console.log(
`[${client.user_id}] subscribing to video track ssrc: ${d.video_ssrc}`,
);
await client.subscribeToTrack(
this.webRtcClient.user_id,
"video",
);
clientsThatNeedUpdate.add(client);
}
}
}
if (d.audio_ssrc) {
handleSSRC.call(this, "audio", {
media: d.audio_ssrc,
rtx: d.audio_ssrc + 1,
});
}
if (d.video_ssrc && d.rtx_ssrc) {
handleSSRC.call(this, "video", {
media: d.video_ssrc,
rtx: d.rtx_ssrc,
});
}
}
await Promise.all(
Array.from(clientsThatNeedUpdate).map((client) => {
const ssrcs = client.getOutgoingStreamSSRCsForUser(this.user_id);
function attachTrack(
this: WebSocket,
track: IncomingStreamTrack,
user_id: string,
) {
if (!this.client) return;
const outTrack = this.client.transport!.createOutgoingStreamTrack(
track.getMedia(),
return Send(client.websocket, {
op: VoiceOPCodes.VIDEO,
d: {
user_id: this.user_id,
// can never send audio ssrc as 0, it will mess up client state for some reason. send server generated ssrc as backup
audio_ssrc:
ssrcs.audio_ssrc ??
this.webRtcClient!.getIncomingStreamSSRCs().audio_ssrc,
video_ssrc: ssrcs.video_ssrc ?? 0,
rtx_ssrc: ssrcs.rtx_ssrc ?? 0,
streams: d.streams?.map((x) => ({
...x,
ssrc: ssrcs.video_ssrc ?? 0,
rtx_ssrc: ssrcs.rtx_ssrc ?? 0,
type: "video",
})),
} as VoiceVideoSchema,
});
}),
);
outTrack.attachTo(track);
this.client.out.stream!.addTrack(outTrack);
var ssrcs = this.client.out.tracks.get(user_id)!;
if (!ssrcs)
ssrcs = this.client.out.tracks
.set(user_id, { audio_ssrc: 0, rtx_ssrc: 0, video_ssrc: 0 })
.get(user_id)!;
if (track.getMedia() === "audio") {
ssrcs.audio_ssrc = outTrack.getSSRCs().media!;
} else if (track.getMedia() === "video") {
ssrcs.video_ssrc = outTrack.getSSRCs().media!;
ssrcs.rtx_ssrc = outTrack.getSSRCs().rtx!;
}
Send(this, {
op: VoiceOPCodes.VIDEO,
d: {
user_id: user_id,
...ssrcs,
} as VoiceVideoSchema,
});
}
function handleSSRC(this: WebSocket, type: "audio" | "video", ssrcs: SSRCs) {
if (!this.client) return;
const stream = this.client.in.stream!;
const transport = this.client.transport!;
// check if we are not subscribed to producers in this server, if not, subscribe
export async function subscribeToProducers(
this: WebRtcWebSocket,
): Promise<void> {
if (!this.webRtcClient || !this.webRtcClient.webrtcConnected) return;
const id = type + ssrcs.media;
var track = stream.getTrack(id);
if (!track) {
console.log("createIncomingStreamTrack", id);
track = transport.createIncomingStreamTrack(type, { id, ssrcs });
stream.addTrack(track);
const clients = mediaServer.getClientsForRtcServer<WebRtcWebSocket>(
this.webRtcClient.voiceRoomId,
);
const clients = getClients(this.client.channel_id)!;
clients.forEach((client) => {
if (client.websocket.user_id === this.user_id) return;
if (!client.out.stream) return;
await Promise.all(
Array.from(clients).map(async (client) => {
let needsUpdate = false;
attachTrack.call(this, track, client.websocket.user_id);
});
}
if (client.user_id === this.user_id) return; // cannot subscribe to self
if (
client.isProducingAudio() &&
!this.webRtcClient!.isSubscribedToTrack(client.user_id, "audio")
) {
await this.webRtcClient!.subscribeToTrack(
client.user_id,
"audio",
);
needsUpdate = true;
}
if (
client.isProducingVideo() &&
!this.webRtcClient!.isSubscribedToTrack(client.user_id, "video")
) {
await this.webRtcClient!.subscribeToTrack(
client.user_id,
"video",
);
needsUpdate = true;
}
if (!needsUpdate) return;
const ssrcs = this.webRtcClient!.getOutgoingStreamSSRCsForUser(
client.user_id,
);
await Send(this, {
op: VoiceOPCodes.VIDEO,
d: {
user_id: client.user_id,
// can never send audio ssrc as 0, it will mess up client state for some reason. send server generated ssrc as backup
audio_ssrc:
ssrcs.audio_ssrc ??
client.getIncomingStreamSSRCs().audio_ssrc,
video_ssrc: ssrcs.video_ssrc ?? 0,
rtx_ssrc: ssrcs.rtx_ssrc ?? 0,
streams: [
client.videoStream ?? {
type: "video",
rid: "100",
ssrc: ssrcs.video_ssrc ?? 0,
active: client.isProducingVideo(),
quality: 100,
rtx_ssrc: ssrcs.rtx_ssrc ?? 0,
max_bitrate: 2500000,
max_framerate: 20,
max_resolution: {
type: "fixed",
width: 1280,
height: 720,
},
},
],
} as VoiceVideoSchema,
});
}),
);
}

View File

@ -16,8 +16,7 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { Payload, WebSocket } from "@spacebar/gateway";
import { VoiceOPCodes } from "../util";
import { VoiceOPCodes, VoicePayload, WebRtcWebSocket } from "../util";
import { onBackendVersion } from "./BackendVersion";
import { onHeartbeat } from "./Heartbeat";
import { onIdentify } from "./Identify";
@ -25,7 +24,7 @@ import { onSelectProtocol } from "./SelectProtocol";
import { onSpeaking } from "./Speaking";
import { onVideo } from "./Video";
export type OPCodeHandler = (this: WebSocket, data: Payload) => any;
export type OPCodeHandler = (this: WebRtcWebSocket, data: VoicePayload) => any;
export default {
[VoiceOPCodes.HEARTBEAT]: onHeartbeat,
@ -34,4 +33,4 @@ export default {
[VoiceOPCodes.VIDEO]: onVideo,
[VoiceOPCodes.SPEAKING]: onSpeaking,
[VoiceOPCodes.SELECT_PROTOCOL]: onSelectProtocol,
};
} as { [key: number]: OPCodeHandler };

View File

@ -1,420 +0,0 @@
{
"version": 0,
"streams": [],
"medias": [
{
"id": "0",
"type": "audio",
"direction": "sendrecv",
"codecs": [
{
"codec": "opus",
"type": 111,
"channels": 2,
"params": {
"minptime": "10",
"useinbandfec": "1"
},
"rtcpfbs": [
{
"id": "transport-cc"
}
]
}
],
"extensions": {
"1": "urn:ietf:params:rtp-hdrext:ssrc-audio-level",
"2": "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
"3": "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
"4": "urn:ietf:params:rtp-hdrext:sdes:mid"
}
},
{
"id": "1",
"type": "video",
"direction": "sendrecv",
"codecs": [
{
"codec": "VP8",
"type": 96,
"rtx": 97,
"rtcpfbs": [
{
"id": "goog-remb"
},
{
"id": "transport-cc"
},
{
"id": "ccm",
"params": ["fir"]
},
{
"id": "nack"
},
{
"id": "nack",
"params": ["pli"]
}
]
},
{
"codec": "VP9",
"type": 98,
"rtx": 99,
"params": {
"profile-id": "0"
},
"rtcpfbs": [
{
"id": "goog-remb"
},
{
"id": "transport-cc"
},
{
"id": "ccm",
"params": ["fir"]
},
{
"id": "nack"
},
{
"id": "nack",
"params": ["pli"]
}
]
},
{
"codec": "VP9",
"type": 100,
"rtx": 101,
"params": {
"profile-id": "2"
},
"rtcpfbs": [
{
"id": "goog-remb"
},
{
"id": "transport-cc"
},
{
"id": "ccm",
"params": ["fir"]
},
{
"id": "nack"
},
{
"id": "nack",
"params": ["pli"]
}
]
},
{
"codec": "VP9",
"type": 102,
"rtx": 122,
"params": {
"profile-id": "1"
},
"rtcpfbs": [
{
"id": "goog-remb"
},
{
"id": "transport-cc"
},
{
"id": "ccm",
"params": ["fir"]
},
{
"id": "nack"
},
{
"id": "nack",
"params": ["pli"]
}
]
},
{
"codec": "H264",
"type": 127,
"rtx": 121,
"params": {
"level-asymmetry-allowed": "1",
"packetization-mode": "1",
"profile-level-id": "42001f"
},
"rtcpfbs": [
{
"id": "goog-remb"
},
{
"id": "transport-cc"
},
{
"id": "ccm",
"params": ["fir"]
},
{
"id": "nack"
},
{
"id": "nack",
"params": ["pli"]
}
]
},
{
"codec": "H264",
"type": 125,
"rtx": 107,
"params": {
"level-asymmetry-allowed": "1",
"packetization-mode": "0",
"profile-level-id": "42001f"
},
"rtcpfbs": [
{
"id": "goog-remb"
},
{
"id": "transport-cc"
},
{
"id": "ccm",
"params": ["fir"]
},
{
"id": "nack"
},
{
"id": "nack",
"params": ["pli"]
}
]
},
{
"codec": "H264",
"type": 108,
"rtx": 109,
"params": {
"level-asymmetry-allowed": "1",
"packetization-mode": "1",
"profile-level-id": "42e01f"
},
"rtcpfbs": [
{
"id": "goog-remb"
},
{
"id": "transport-cc"
},
{
"id": "ccm",
"params": ["fir"]
},
{
"id": "nack"
},
{
"id": "nack",
"params": ["pli"]
}
]
},
{
"codec": "H264",
"type": 124,
"rtx": 120,
"params": {
"level-asymmetry-allowed": "1",
"packetization-mode": "0",
"profile-level-id": "42e01f"
},
"rtcpfbs": [
{
"id": "goog-remb"
},
{
"id": "transport-cc"
},
{
"id": "ccm",
"params": ["fir"]
},
{
"id": "nack"
},
{
"id": "nack",
"params": ["pli"]
}
]
},
{
"codec": "H264",
"type": 123,
"rtx": 119,
"params": {
"level-asymmetry-allowed": "1",
"packetization-mode": "1",
"profile-level-id": "4d001f"
},
"rtcpfbs": [
{
"id": "goog-remb"
},
{
"id": "transport-cc"
},
{
"id": "ccm",
"params": ["fir"]
},
{
"id": "nack"
},
{
"id": "nack",
"params": ["pli"]
}
]
},
{
"codec": "H264",
"type": 35,
"rtx": 36,
"params": {
"level-asymmetry-allowed": "1",
"packetization-mode": "0",
"profile-level-id": "4d001f"
},
"rtcpfbs": [
{
"id": "goog-remb"
},
{
"id": "transport-cc"
},
{
"id": "ccm",
"params": ["fir"]
},
{
"id": "nack"
},
{
"id": "nack",
"params": ["pli"]
}
]
},
{
"codec": "H264",
"type": 37,
"rtx": 38,
"params": {
"level-asymmetry-allowed": "1",
"packetization-mode": "1",
"profile-level-id": "f4001f"
},
"rtcpfbs": [
{
"id": "goog-remb"
},
{
"id": "transport-cc"
},
{
"id": "ccm",
"params": ["fir"]
},
{
"id": "nack"
},
{
"id": "nack",
"params": ["pli"]
}
]
},
{
"codec": "H264",
"type": 39,
"rtx": 40,
"params": {
"level-asymmetry-allowed": "1",
"packetization-mode": "0",
"profile-level-id": "f4001f"
},
"rtcpfbs": [
{
"id": "goog-remb"
},
{
"id": "transport-cc"
},
{
"id": "ccm",
"params": ["fir"]
},
{
"id": "nack"
},
{
"id": "nack",
"params": ["pli"]
}
]
},
{
"codec": "H264",
"type": 114,
"rtx": 115,
"params": {
"level-asymmetry-allowed": "1",
"packetization-mode": "1",
"profile-level-id": "64001f"
},
"rtcpfbs": [
{
"id": "goog-remb"
},
{
"id": "transport-cc"
},
{
"id": "ccm",
"params": ["fir"]
},
{
"id": "nack"
},
{
"id": "nack",
"params": ["pli"]
}
]
}
],
"extensions": {
"2": "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
"3": "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
"4": "urn:ietf:params:rtp-hdrext:sdes:mid",
"5": "http://www.webrtc.org/experiments/rtp-hdrext/playout-delay",
"6": "http://www.webrtc.org/experiments/rtp-hdrext/video-content-type",
"7": "http://www.webrtc.org/experiments/rtp-hdrext/video-timing",
"8": "http://www.webrtc.org/experiments/rtp-hdrext/color-space",
"10": "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id",
"11": "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id",
"13": "urn:3gpp:video-orientation",
"14": "urn:ietf:params:rtp-hdrext:toffset"
}
}
],
"candidates": []
}

View File

@ -16,13 +16,7 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
export enum VoiceStatus {
CONNECTED = 0,
CONNECTING = 1,
AUTHENTICATING = 2,
RECONNECTING = 3,
DISCONNECTED = 4,
}
import { Payload } from "@spacebar/gateway";
export enum VoiceOPCodes {
IDENTIFY = 0,
@ -42,3 +36,5 @@ export enum VoiceOPCodes {
VOICE_BACKEND_VERSION = 16,
CHANNEL_OPTIONS_UPDATE = 17,
}
/**
 * The gateway `Payload` shape with its `op` field re-typed as a `VoiceOPCodes`
 * member; every other `Payload` field is carried over unchanged.
 */
export type VoicePayload = Omit<Payload, "op"> & { op: VoiceOPCodes };

View File

@ -16,62 +16,62 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { WebSocket } from "@spacebar/gateway";
import MediaServer, {
IncomingStream,
OutgoingStream,
Transport,
} from "medooze-media-server";
import SemanticSDP from "semantic-sdp";
MediaServer.enableLog(true);
import type { SignalingDelegate } from "spacebar-webrtc-types";
import { green, red } from "picocolors";
export const PublicIP = process.env.PUBLIC_IP || "127.0.0.1";
export let mediaServer: SignalingDelegate;
try {
const range = process.env.WEBRTC_PORT_RANGE || "4000";
var ports = range.split("-");
const min = Number(ports[0]);
const max = Number(ports[1]);
/** Public IP for the WebRTC media server, read from `WRTC_PUBLIC_IP`; defaults to loopback. */
export const WRTC_PUBLIC_IP = process.env.WRTC_PUBLIC_IP ?? "127.0.0.1";

/**
 * Reads a port number from the raw value of an environment variable.
 *
 * An unset or empty value yields `fallback`. A value that does not parse
 * (base 10) to an integer in 0..65535 also yields `fallback`, with a warning,
 * so that `NaN` or an impossible port never reaches the media server.
 *
 * @param name - variable name, used only in the warning message
 * @param raw - raw environment value
 * @param fallback - port to use when `raw` is missing or invalid
 * @returns the parsed port, or `fallback`
 */
const parseWrtcPort = (
	name: string,
	raw: string | undefined,
	fallback: number,
): number => {
	if (!raw) return fallback;

	const port = parseInt(raw, 10);
	if (Number.isInteger(port) && port >= 0 && port <= 65535) return port;

	console.warn(
		`[WebRTC] ignoring invalid ${name}=${JSON.stringify(raw)}, using ${fallback}`,
	);
	return fallback;
};

/** Lower bound of the media port range, read from `WRTC_PORT_MIN`; defaults to 2000. */
export const WRTC_PORT_MIN = parseWrtcPort(
	"WRTC_PORT_MIN",
	process.env.WRTC_PORT_MIN,
	2000,
);

/** Upper bound of the media port range, read from `WRTC_PORT_MAX`; defaults to 65000. */
export const WRTC_PORT_MAX = parseWrtcPort(
	"WRTC_PORT_MAX",
	process.env.WRTC_PORT_MAX,
	65000,
);
MediaServer.setPortRange(min, max);
} catch (error) {
console.error(
"Invalid env var: WEBRTC_PORT_RANGE",
process.env.WEBRTC_PORT_RANGE,
error,
);
process.exit(1);
const selectedWrtcLibrary = process.env.WRTC_LIBRARY;
/**
 * Error-shaped object thrown when no WebRTC library has been configured.
 *
 * It only implements the `Error` interface instead of extending the built-in
 * class: no way was found to hide the stack trace of a real `Error`, and a
 * plain object never captures one. As a consequence `instanceof Error` is
 * false for instances of this class.
 */
class NoConfiguredLibraryError implements Error {
	name: string = "NoConfiguredLibraryError";
	message: string;
	// Present only to satisfy the `Error` shape; never assigned.
	stack?: string | undefined;
	cause?: unknown;

	constructor(message: string) {
		this.message = message;
	}
}
export const endpoint = MediaServer.createEndpoint(PublicIP);
export const loadWebRtcLibrary = async () => {
try {
//mediaServer = require('medooze-spacebar-wrtc');
if (!selectedWrtcLibrary)
throw new NoConfiguredLibraryError("No library configured in .env");
export const channels = new Map<string, Set<Client>>();
mediaServer = new // @ts-ignore
(await import(selectedWrtcLibrary)).default();
export interface Client {
transport?: Transport;
websocket: WebSocket;
out: {
stream?: OutgoingStream;
tracks: Map<
string,
{
audio_ssrc: number;
video_ssrc: number;
rtx_ssrc: number;
}
>;
};
in: {
stream?: IncomingStream;
audio_ssrc: number;
video_ssrc: number;
rtx_ssrc: number;
};
sdp: SemanticSDP.SDPInfo;
channel_id: string;
}
console.log(
`[WebRTC] ${green(`Succesfully loaded ${selectedWrtcLibrary}`)}`,
);
return Promise.resolve();
} catch (error) {
console.log(
`[WebRTC] ${red(`Failed to import ${selectedWrtcLibrary}: ${error instanceof NoConfiguredLibraryError ? error.message : ""}`)}`,
);
export function getClients(channel_id: string) {
if (!channels.has(channel_id)) channels.set(channel_id, new Set());
return channels.get(channel_id)!;
}
return Promise.reject();
}
};
// Exclusive upper bound (2^32) for generated values; the counter wraps before reaching it.
const MAX_INT32BIT = 2 ** 32;

// Most recently issued value. It starts at 1, so the first generated value is 2.
let count = 1;

/**
 * Returns the next value of a module-wide sequential counter, for use as an SSRC.
 * Once 2^32 - 1 has been handed out, the following call restarts the sequence at 1.
 */
export const generateSsrc = () => {
	const next = count + 1;
	count = next < MAX_INT32BIT ? next : 1;
	return count;
};

27
src/webrtc/util/Send.ts Normal file
View File

@ -0,0 +1,27 @@
import { JSONReplacer } from "@spacebar/util";
import { VoicePayload } from "./Constants";
import { WebRtcWebSocket } from "./WebRtcWebSocket";
/**
 * Serialises a voice payload and writes it to a WebRTC signalling socket.
 *
 * Only JSON-encoded sockets are supported: for any other `socket.encoding`
 * nothing is sent and `undefined` is returned.
 *
 * @param socket - connection to write to
 * @param data - payload to serialise and send
 * @returns `undefined` for non-JSON sockets. Otherwise a promise that resolves
 * with `null` once the send callback reports success, or once a socket that
 * was not open has been closed. It rejects with the error reported by the
 * send callback.
 */
export function Send(socket: WebRtcWebSocket, data: VoicePayload) {
	if (process.env.WRTC_WS_VERBOSE)
		console.log(`[WebRTC] Outgoing message: ${JSON.stringify(data)}`);

	// TODO: encode circular object
	if (socket.encoding !== "json") return;
	const buffer = JSON.stringify(data, JSONReplacer);

	return new Promise((res, rej) => {
		// readyState 1 is OPEN; anything else cannot accept a frame.
		if (socket.readyState !== 1) {
			// A dead peer is deliberately not reported as a rejection. The promise
			// must still settle, otherwise every caller awaiting this send (and any
			// Promise.all it is part of) would hang forever.
			socket.close();
			return res(null);
		}

		socket.send(buffer, (err) => {
			if (err) return rej(err);
			return res(null);
		});
	});
}

View File

@ -0,0 +1,7 @@
import { WebSocket } from "@spacebar/gateway";
import type { WebRtcClient } from "spacebar-webrtc-types";
/**
 * Gateway `WebSocket` extended with the state the WebRTC server keeps for each
 * voice connection.
 */
export interface WebRtcWebSocket extends WebSocket {
	/** Media-server client handle bound to this socket; absent until one has been assigned. */
	webRtcClient?: WebRtcClient<WebRtcWebSocket>;
	/** Kind of voice connection this socket carries. */
	type: "guild-voice" | "dm-voice" | "stream";
}

View File

@ -18,3 +18,5 @@
export * from "./Constants";
export * from "./MediaServer";
export * from "./WebRtcWebSocket";
export * from "./Send";

View File

@ -1,5 +1,4 @@
{
"exclude": ["./src/webrtc"],
"include": ["./src"],
"compilerOptions": {
/* Visit https://aka.ms/tsconfig to read more about this file */
@ -37,7 +36,8 @@
"@spacebar/api*": ["./api"],
"@spacebar/gateway*": ["./gateway"],
"@spacebar/cdn*": ["./cdn"],
"@spacebar/util*": ["./util"]
"@spacebar/util*": ["./util"],
"@spacebar/webrtc*": ["./webrtc"]
} /* Specify a set of entries that re-map imports to additional lookup locations. */,
// "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */
// "typeRoots": [], /* Specify multiple folders that act like './node_modules/@types'. */