From 67267d509d0df3689e3d644f7cde8e7626201151 Mon Sep 17 00:00:00 2001 From: Hampus Kraft Date: Wed, 18 Feb 2026 20:50:11 +0000 Subject: [PATCH] feat: improve guild collection rpcs --- .../src/guild/guild_manager_shard.erl | 195 ++++++++++++++- packages/api/src/rpc/RpcService.tsx | 224 +++++++++++++++++- .../schema/src/domains/rpc/RpcSchemas.tsx | 29 +++ 3 files changed, 435 insertions(+), 13 deletions(-) diff --git a/fluxer_gateway/src/guild/guild_manager_shard.erl b/fluxer_gateway/src/guild/guild_manager_shard.erl index 6d39b6aa..4f5ef37f 100644 --- a/fluxer_gateway/src/guild/guild_manager_shard.erl +++ b/fluxer_gateway/src/guild/guild_manager_shard.erl @@ -22,6 +22,16 @@ -define(BATCH_SIZE, 10). -define(BATCH_DELAY_MS, 100). +-define(GUILD_COLLECTION_FETCH_TIMEOUT_MS, 120000). +-define(GUILD_MEMBER_COLLECTION_LIMIT, 250). +-define(GUILD_COLLECTIONS, [ + <<"guild">>, + <<"roles">>, + <<"channels">>, + <<"emojis">>, + <<"stickers">>, + <<"members">> +]). -export([start_link/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -468,16 +478,195 @@ lookup_existing_guild(GuildId, GuildName, State) -> -spec fetch_guild_data(guild_id()) -> fetch_result(). fetch_guild_data(GuildId) -> + Parent = self(), + Ref = make_ref(), + _ = [ + spawn_monitor(fun() -> + Parent ! {Ref, Collection, fetch_guild_collection(GuildId, Collection)} + end) + || Collection <- ?GUILD_COLLECTIONS + ], + DeadlineMs = erlang:monotonic_time(millisecond) + ?GUILD_COLLECTION_FETCH_TIMEOUT_MS, + collect_guild_collection_results(Ref, ?GUILD_COLLECTIONS, #{}, DeadlineMs). + +-spec collect_guild_collection_results(reference(), [binary()], guild_data(), integer()) -> + fetch_result(). +collect_guild_collection_results(_Ref, [], Acc, _DeadlineMs) -> + {ok, Acc}; +collect_guild_collection_results(Ref, PendingCollections, Acc, DeadlineMs) -> + NowMs = erlang:monotonic_time(millisecond), + RemainingMs = DeadlineMs - NowMs, + case RemainingMs > 0 of + false -> + {error, {guild_collection_fetch_timeout, PendingCollections}}; + true -> + receive + {Ref, Collection, {ok, Data}} -> + Key = guild_collection_result_key(Collection), + NewPending = lists:delete(Collection, PendingCollections), + NewAcc = maps:put(Key, Data, Acc), + collect_guild_collection_results(Ref, NewPending, NewAcc, DeadlineMs); + {Ref, Collection, {error, Reason}} -> + {error, {guild_collection_fetch_failed, Collection, Reason}}; + {'DOWN', _, process, _, _} -> + collect_guild_collection_results(Ref, PendingCollections, Acc, DeadlineMs) + after RemainingMs -> + {error, {guild_collection_fetch_timeout, PendingCollections}} + end + end. + +-spec guild_collection_result_key(binary()) -> binary(). +guild_collection_result_key(<<"guild">>) -> <<"guild">>; +guild_collection_result_key(<<"roles">>) -> <<"roles">>; +guild_collection_result_key(<<"channels">>) -> <<"channels">>; +guild_collection_result_key(<<"emojis">>) -> <<"emojis">>; +guild_collection_result_key(<<"stickers">>) -> <<"stickers">>; +guild_collection_result_key(<<"members">>) -> <<"members">>; +guild_collection_result_key(Collection) -> Collection. + +-spec fetch_guild_collection(guild_id(), binary()) -> {ok, term()} | {error, term()}. +fetch_guild_collection(GuildId, <<"members">>) -> + fetch_guild_members_collection_stream(GuildId, undefined, []); +fetch_guild_collection(GuildId, Collection) -> RpcRequest = #{ - <<"type">> => <<"guild">>, + <<"type">> => <<"guild_collection">>, <<"guild_id">> => type_conv:to_binary(GuildId), - <<"version">> => 1 + <<"collection">> => Collection }, - rpc_client:call(RpcRequest). + case rpc_client:call(RpcRequest) of + {ok, Data} -> + case maps:get(Collection, Data, undefined) of + undefined -> {error, {invalid_collection_response, Collection}}; + Value -> {ok, Value} + end; + {error, Reason} -> + {error, Reason} + end. + +-spec fetch_guild_members_collection_stream(guild_id(), binary() | undefined, [[map()]]) -> + {ok, [map()]} | {error, term()}. +fetch_guild_members_collection_stream(GuildId, AfterUserId, ChunksAcc) -> + RpcRequest0 = #{ + <<"type">> => <<"guild_collection">>, + <<"guild_id">> => type_conv:to_binary(GuildId), + <<"collection">> => <<"members">>, + <<"limit">> => ?GUILD_MEMBER_COLLECTION_LIMIT + }, + RpcRequest = maybe_put_after_user_id(AfterUserId, RpcRequest0), + case rpc_client:call(RpcRequest) of + {ok, Data} -> + parse_members_collection_page(GuildId, Data, ChunksAcc); + {error, Reason} -> + {error, Reason} + end. + +-spec parse_members_collection_page(guild_id(), map(), [[map()]]) -> {ok, [map()]} | {error, term()}. +parse_members_collection_page(GuildId, Data, ChunksAcc) -> + Members = maps:get(<<"members">>, Data, undefined), + HasMore = maps:get(<<"has_more">>, Data, false), + NextAfterUserId = maps:get(<<"next_after_user_id">>, Data, null), + case Members of + MemberList when is_list(MemberList) -> + parse_members_collection_page_result( + GuildId, + MemberList, + HasMore, + NextAfterUserId, + ChunksAcc + ); + _ -> + {error, invalid_members_collection_payload} + end. + +-spec parse_members_collection_page_result( + guild_id(), + [map()], + term(), + term(), + [[map()]] +) -> + {ok, [map()]} | {error, term()}. +parse_members_collection_page_result( + GuildId, + MemberList, + true, + NextAfterUserId, + ChunksAcc +) when is_binary(NextAfterUserId), MemberList =/= [] -> + fetch_guild_members_collection_stream( + GuildId, + NextAfterUserId, + [MemberList | ChunksAcc] + ); +parse_members_collection_page_result( + _GuildId, + [], + true, + _NextAfterUserId, + _ChunksAcc +) -> + {error, invalid_members_collection_empty_page}; +parse_members_collection_page_result( + _GuildId, + _MemberList, + true, + _NextAfterUserId, + _ChunksAcc +) -> + {error, invalid_members_collection_cursor}; +parse_members_collection_page_result( + _GuildId, + MemberList, + false, + _NextAfterUserId, + ChunksAcc +) -> + {ok, lists:append(lists:reverse([MemberList | ChunksAcc]))}; +parse_members_collection_page_result( + _GuildId, + _MemberList, + _HasMore, + _NextAfterUserId, + _ChunksAcc +) -> + {error, invalid_members_collection_has_more}. + +-spec maybe_put_after_user_id(binary() | undefined, map()) -> map(). +maybe_put_after_user_id(undefined, RpcRequest) -> + RpcRequest; +maybe_put_after_user_id(AfterUserId, RpcRequest) when is_binary(AfterUserId) -> + maps:put(<<"after_user_id">>, AfterUserId, RpcRequest). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). +parse_members_collection_page_result_final_page_test() -> + Members = [ + #{<<"user">> => #{<<"id">> => <<"1">>}} + ], + ?assertEqual( + {ok, Members}, + parse_members_collection_page_result(42, Members, false, null, []) + ). + +parse_members_collection_page_result_invalid_cursor_test() -> + Members = [ + #{<<"user">> => #{<<"id">> => <<"1">>}} + ], + ?assertEqual( + {error, invalid_members_collection_cursor}, + parse_members_collection_page_result(42, Members, true, null, []) + ). + +maybe_put_after_user_id_test() -> + BaseRequest = #{ + <<"type">> => <<"guild_collection">>, + <<"collection">> => <<"members">> + }, + ?assertEqual(BaseRequest, maybe_put_after_user_id(undefined, BaseRequest)), + WithCursor = maybe_put_after_user_id(<<"100">>, BaseRequest), + ?assertEqual(<<"100">>, maps:get(<<"after_user_id">>, WithCursor)). + select_guilds_to_reload_empty_ids_test() -> Guilds = #{1 => {self(), make_ref()}, 2 => {self(), make_ref()}}, Result = select_guilds_to_reload([], Guilds), diff --git a/packages/api/src/rpc/RpcService.tsx b/packages/api/src/rpc/RpcService.tsx index e7d10ed6..f4f44957 100644 --- a/packages/api/src/rpc/RpcService.tsx +++ b/packages/api/src/rpc/RpcService.tsx @@ -106,8 +106,10 @@ import type {IRateLimitService} from '@fluxer/rate_limit/src/IRateLimitService'; import type {ChannelResponse} from '@fluxer/schema/src/domains/channel/ChannelSchemas'; import type {GuildMemberResponse} from '@fluxer/schema/src/domains/guild/GuildMemberSchemas'; import type { + RpcGuildCollectionType, RpcRequest, RpcResponse, + RpcResponseGuildCollectionData, RpcResponseGuildData, RpcResponseSessionData, } from '@fluxer/schema/src/domains/rpc/RpcSchemas'; @@ -135,6 +137,14 @@ interface HandleGuildRequestParams { requestCache: RequestCache; } +interface HandleGuildCollectionRequestParams { + guildId: GuildID; + collection: RpcGuildCollectionType; + requestCache: RequestCache; + afterUserId?: UserID; + limit?: number; +} + interface GetGuildDataParams { guildId: GuildID; } @@ -166,6 +176,9 @@ interface UserData { pinnedDMs: Array; } +const GUILD_COLLECTION_DEFAULT_LIMIT = 250; +const GUILD_COLLECTION_MAX_LIMIT = 1000; + export class RpcService { private readonly customStatusValidator: CustomStatusValidator; @@ -414,6 +427,17 @@ export class RpcService { requestCache, }), }; + case 'guild_collection': + return { + type: 'guild_collection', + data: await this.handleGuildCollectionRequest({ + guildId: createGuildID(request.guild_id), + collection: request.collection, + requestCache, + afterUserId: request.after_user_id ? createUserID(request.after_user_id) : undefined, + limit: request.limit, + }), + }; case 'get_user_guild_settings': { const result = await this.getUserGuildSettings({ userIds: request.user_ids.map(createUserID), @@ -1214,6 +1238,195 @@ export class RpcService { }; } + private async handleGuildCollectionRequest({ + guildId, + collection, + requestCache, + afterUserId, + limit, + }: HandleGuildCollectionRequestParams): Promise { + switch (collection) { + case 'guild': + return await this.handleGuildCollectionGuildRequest({guildId}); + case 'roles': + return await this.handleGuildCollectionRolesRequest({guildId}); + case 'channels': + return await this.handleGuildCollectionChannelsRequest({guildId, requestCache}); + case 'emojis': + return await this.handleGuildCollectionEmojisRequest({guildId}); + case 'stickers': + return await this.handleGuildCollectionStickersRequest({guildId}); + case 'members': + return await this.handleGuildCollectionMembersRequest({guildId, requestCache, afterUserId, limit}); + default: { + const exhaustiveCheck: never = collection; + throw new Error(`Unknown guild collection: ${String(exhaustiveCheck)}`); + } + } + } + + private createGuildCollectionResponse(collection: RpcGuildCollectionType): RpcResponseGuildCollectionData { + return { + collection, + guild: undefined, + roles: undefined, + channels: undefined, + emojis: undefined, + stickers: undefined, + members: undefined, + has_more: false, + next_after_user_id: null, + }; + } + + private async getGuildOrThrow(guildId: GuildID): Promise { + const guild = await this.guildRepository.findUnique(guildId); + if (!guild) { + throw new UnknownGuildError(); + } + return guild; + } + + private resolveGuildCollectionLimit(limit?: number): number { + if (!limit || !Number.isInteger(limit) || limit < 1) { + return GUILD_COLLECTION_DEFAULT_LIMIT; + } + return Math.min(limit, GUILD_COLLECTION_MAX_LIMIT); + } + + private async handleGuildCollectionGuildRequest({ + guildId, + }: { + guildId: GuildID; + }): Promise { + const guild = await this.getGuildOrThrow(guildId); + const repairedBannerGuild = await this.repairGuildBannerHeight(guild); + const repairedSplashGuild = await this.repairGuildSplashDimensions(repairedBannerGuild); + const repairedEmbedSplashGuild = await this.repairGuildEmbedSplashDimensions(repairedSplashGuild); + return { + ...this.createGuildCollectionResponse('guild'), + guild: mapGuildToGuildResponse(repairedEmbedSplashGuild), + }; + } + + private async handleGuildCollectionRolesRequest({ + guildId, + }: { + guildId: GuildID; + }): Promise { + await this.getGuildOrThrow(guildId); + const roles = await this.guildRepository.listRoles(guildId); + return { + ...this.createGuildCollectionResponse('roles'), + roles: roles.map(mapGuildRoleToResponse), + }; + } + + private async handleGuildCollectionChannelsRequest({ + guildId, + requestCache, + }: { + guildId: GuildID; + requestCache: RequestCache; + }): Promise { + const guild = await this.getGuildOrThrow(guildId); + const channels = await this.channelRepository.listGuildChannels(guildId); + const repairedGuild = await this.repairDanglingChannelReferences({guild, channels}); + this.repairOrphanedInvitesAndWebhooks({guild: repairedGuild, channels}).catch((error) => { + Logger.warn({guildId: guildId.toString(), error}, 'Failed to repair orphaned invites/webhooks'); + }); + const mappedChannels = await Promise.all( + channels.map((channel) => + mapChannelToResponse({ + channel, + currentUserId: null, + userCacheService: this.userCacheService, + requestCache, + }), + ), + ); + return { + ...this.createGuildCollectionResponse('channels'), + channels: mappedChannels, + }; + } + + private async handleGuildCollectionEmojisRequest({ + guildId, + }: { + guildId: GuildID; + }): Promise { + await this.getGuildOrThrow(guildId); + const emojis = await this.guildRepository.listEmojis(guildId); + return { + ...this.createGuildCollectionResponse('emojis'), + emojis: emojis.map(mapGuildEmojiToResponse), + }; + } + + private async handleGuildCollectionStickersRequest({ + guildId, + }: { + guildId: GuildID; + }): Promise { + await this.getGuildOrThrow(guildId); + const stickers = await this.guildRepository.listStickers(guildId); + const migratedStickers = await this.migrateGuildStickersForRpc(guildId, stickers); + return { + ...this.createGuildCollectionResponse('stickers'), + stickers: migratedStickers.map(mapGuildStickerToResponse), + }; + } + + private async handleGuildCollectionMembersRequest({ + guildId, + requestCache, + afterUserId, + limit, + }: { + guildId: GuildID; + requestCache: RequestCache; + afterUserId?: UserID; + limit?: number; + }): Promise { + await this.getGuildOrThrow(guildId); + const chunkSize = this.resolveGuildCollectionLimit(limit); + const members = await this.guildRepository.listMembersPaginated(guildId, chunkSize + 1, afterUserId); + const hasMore = members.length > chunkSize; + const pageMembers = hasMore ? members.slice(0, chunkSize) : members; + const mappedMembers = await this.mapRpcGuildMembers({guildId, members: pageMembers, requestCache}); + let nextAfterUserId: string | null = null; + if (hasMore) { + const lastMember = pageMembers[pageMembers.length - 1]; + if (!lastMember) { + throw new Error('Failed to build next member collection cursor'); + } + nextAfterUserId = lastMember.userId.toString(); + } + return { + ...this.createGuildCollectionResponse('members'), + members: mappedMembers, + has_more: hasMore, + next_after_user_id: nextAfterUserId, + }; + } + + private async migrateGuildStickersForRpc( + guildId: GuildID, + stickers: Array, + ): Promise> { + const needsMigration = stickers.filter((sticker) => sticker.animated === null || sticker.animated === undefined); + if (needsMigration.length === 0) { + return stickers; + } + Logger.info({count: needsMigration.length, guildId}, 'Migrating sticker animated fields'); + const migrated = await Promise.all(needsMigration.map((sticker) => this.migrateStickerAnimated(sticker))); + return stickers.map((sticker) => { + const migratedSticker = migrated.find((candidate) => candidate.id === sticker.id); + return migratedSticker ?? sticker; + }); + } + private async getGuildData({guildId}: GetGuildDataParams): Promise { const [guildResult, channels, emojis, stickers, members, roles] = await Promise.all([ this.guildRepository.findUnique(guildId), @@ -1225,16 +1438,7 @@ export class RpcService { ]); if (!guildResult) return null; - let migratedStickers = stickers; - const needsMigration = stickers.filter((s) => s.animated === null || s.animated === undefined); - if (needsMigration.length > 0) { - Logger.info({count: needsMigration.length, guildId}, 'Migrating sticker animated fields'); - const migrated = await Promise.all(needsMigration.map((s) => this.migrateStickerAnimated(s))); - migratedStickers = stickers.map((s) => { - const migratedSticker = migrated.find((m) => m.id === s.id); - return migratedSticker ?? s; - }); - } + const migratedStickers = await this.migrateGuildStickersForRpc(guildId, stickers); const repairedChannelRefsGuild = await this.repairDanglingChannelReferences({guild: guildResult, channels}); const repairedBannerGuild = await this.repairGuildBannerHeight(repairedChannelRefsGuild); diff --git a/packages/schema/src/domains/rpc/RpcSchemas.tsx b/packages/schema/src/domains/rpc/RpcSchemas.tsx index e2ff9bfc..03f664ef 100644 --- a/packages/schema/src/domains/rpc/RpcSchemas.tsx +++ b/packages/schema/src/domains/rpc/RpcSchemas.tsx @@ -35,6 +35,10 @@ import { import {createStringType, SnowflakeStringType, SnowflakeType} from '@fluxer/schema/src/primitives/SchemaPrimitives'; import {z} from 'zod'; +export const RpcGuildCollectionType = z.enum(['guild', 'roles', 'channels', 'emojis', 'stickers', 'members']); + +export type RpcGuildCollectionType = z.infer; + export const ReadStateResponse = z.object({ id: SnowflakeStringType.describe('The channel ID for this read state'), mention_count: z.number().describe('Number of unread mentions in the channel'), @@ -57,6 +61,13 @@ export const RpcRequest = z.discriminatedUnion('type', [ type: z.literal('guild').describe('Request type for fetching guild data'), guild_id: SnowflakeType.describe('ID of the guild to fetch'), }), + z.object({ + type: z.literal('guild_collection').describe('Request type for fetching a single guild collection chunk'), + guild_id: SnowflakeType.describe('ID of the guild to fetch'), + collection: RpcGuildCollectionType.describe('Guild collection to fetch'), + limit: z.number().int().min(1).max(1000).optional().describe('Maximum number of items to return'), + after_user_id: SnowflakeType.optional().describe('Cursor for member collection pagination'), + }), z.object({ type: z.literal('log_guild_crash').describe('Request type for logging guild crashes'), guild_id: SnowflakeType.describe('ID of the guild that crashed'), @@ -204,6 +215,20 @@ export const RpcResponseGuildData = z.object({ export type RpcResponseGuildData = z.infer; +export const RpcResponseGuildCollectionData = z.object({ + collection: RpcGuildCollectionType.describe('Guild collection returned in this response'), + guild: GuildResponse.nullish().describe('Guild information'), + roles: z.array(GuildRoleResponse).nullish().describe('List of roles in the guild'), + channels: z.array(ChannelResponse).nullish().describe('List of channels in the guild'), + emojis: z.array(GuildEmojiResponse).nullish().describe('List of custom emojis in the guild'), + stickers: z.array(GuildStickerResponse).nullish().describe('List of custom stickers in the guild'), + members: z.array(GuildMemberResponse).nullish().describe('List of guild members in this chunk'), + has_more: z.boolean().describe('Whether more data is available for this collection'), + next_after_user_id: SnowflakeStringType.nullish().describe('Cursor for the next member chunk'), +}); + +export type RpcResponseGuildCollectionData = z.infer; + export const RpcResponseValidateCustomStatus = z.object({ custom_status: CustomStatusResponse.nullish().describe('Validated custom status or null if invalid'), }); @@ -227,6 +252,10 @@ export const RpcResponse = z.discriminatedUnion('type', [ type: z.literal('guild').describe('Response type for guild data'), data: RpcResponseGuildData.describe('Guild data'), }), + z.object({ + type: z.literal('guild_collection').describe('Response type for guild collection chunks'), + data: RpcResponseGuildCollectionData.describe('Guild collection chunk data'), + }), z.object({ type: z.literal('get_user_guild_settings').describe('Response type for user guild settings'), data: z