From 261d4bd84cf1cde18c06b695fccd8595f16a77a5 Mon Sep 17 00:00:00 2001 From: Hampus Kraft Date: Wed, 18 Feb 2026 17:27:51 +0000 Subject: [PATCH] fix: eliminate more gateway bottlenecks --- .../src/gateway/gateway_rpc_guild.erl | 8 +- fluxer_gateway/src/guild/guild_manager.erl | 233 ++++++++++++++++-- .../src/guild/guild_manager_shard.erl | 40 +++ .../src/guild/guild_request_members.erl | 2 +- .../src/session/session_connection.erl | 2 +- 5 files changed, 262 insertions(+), 23 deletions(-) diff --git a/fluxer_gateway/src/gateway/gateway_rpc_guild.erl b/fluxer_gateway/src/gateway/gateway_rpc_guild.erl index c78d1a17..b80e141e 100644 --- a/fluxer_gateway/src/gateway/gateway_rpc_guild.erl +++ b/fluxer_gateway/src/gateway/gateway_rpc_guild.erl @@ -124,7 +124,7 @@ execute_method(<<"guild.list_members_cursor">>, Request) -> end); execute_method(<<"guild.start">>, #{<<"guild_id">> := GuildIdBin}) -> GuildId = validation:snowflake_or_throw(<<"guild_id">>, GuildIdBin), - case gen_server:call(guild_manager, {start_or_lookup, GuildId}, ?GUILD_LOOKUP_TIMEOUT) of + case guild_manager:start_or_lookup(GuildId, ?GUILD_LOOKUP_TIMEOUT) of {ok, _Pid} -> true; {error, Reason} -> throw({error, <<"guild_start_error:", (error_term_to_binary(Reason))/binary>>}); @@ -143,7 +143,7 @@ execute_method(<<"guild.reload">>, #{<<"guild_id">> := GuildIdBin}) -> ok -> true; {error, not_found} -> - case gen_server:call(guild_manager, {start_or_lookup, GuildId}, 20000) of + case guild_manager:start_or_lookup(GuildId, 20000) of {ok, _Pid} -> true; _ -> throw({error, <<"guild_reload_error">>}) end; @@ -717,7 +717,7 @@ lookup_guild_pid_from_cache(GuildId) -> -spec lookup_guild_pid_from_manager(integer()) -> {ok, pid()} | error. lookup_guild_pid_from_manager(GuildId) -> - case gen_server:call(guild_manager, {start_or_lookup, GuildId}, ?GUILD_LOOKUP_TIMEOUT) of + case guild_manager:start_or_lookup(GuildId, ?GUILD_LOOKUP_TIMEOUT) of {ok, Pid} when is_pid(Pid) -> {ok, Pid}; _ -> @@ -962,7 +962,7 @@ parse_voice_update( -spec process_voice_update({integer(), integer(), boolean(), boolean(), term()}) -> map(). process_voice_update({GuildId, UserId, Mute, Deaf, ConnectionId}) -> - case gen_server:call(guild_manager, {start_or_lookup, GuildId}, ?GUILD_LOOKUP_TIMEOUT) of + case guild_manager:start_or_lookup(GuildId, ?GUILD_LOOKUP_TIMEOUT) of {ok, GuildPid} -> VoicePid = resolve_voice_pid(GuildId, GuildPid), Request = #{ diff --git a/fluxer_gateway/src/guild/guild_manager.erl b/fluxer_gateway/src/guild/guild_manager.erl index 97d54a6f..35dad0fc 100644 --- a/fluxer_gateway/src/guild/guild_manager.erl +++ b/fluxer_gateway/src/guild/guild_manager.erl @@ -20,10 +20,11 @@ -include_lib("fluxer_gateway/include/timeout_config.hrl"). --export([start_link/0]). +-export([start_link/0, start_or_lookup/1, start_or_lookup/2, lookup/1, lookup/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(GUILD_PID_CACHE, guild_pid_cache). +-define(SHARD_TABLE, guild_manager_shard_table). -type guild_id() :: integer(). -type shard_map() :: #{pid := pid(), ref := reference()}. @@ -36,18 +37,45 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +-spec start_or_lookup(guild_id()) -> {ok, pid()} | {error, term()}. +start_or_lookup(GuildId) -> + start_or_lookup(GuildId, ?DEFAULT_GEN_SERVER_TIMEOUT). + +-spec start_or_lookup(guild_id(), pos_integer()) -> {ok, pid()} | {error, term()}. +start_or_lookup(GuildId, Timeout) -> + call_shard(GuildId, {start_or_lookup, GuildId}, Timeout). + +-spec lookup(guild_id()) -> {ok, pid()} | {error, term()}. +lookup(GuildId) -> + lookup(GuildId, ?DEFAULT_GEN_SERVER_TIMEOUT). + +-spec lookup(guild_id(), pos_integer()) -> {ok, pid()} | {error, term()}. +lookup(GuildId, Timeout) -> + case lookup_cached_guild_pid(GuildId) of + {ok, GuildPid} -> + {ok, GuildPid}; + not_found -> + call_shard(GuildId, {lookup, GuildId}, Timeout) + end. + -spec init(list()) -> {ok, state()}. init([]) -> process_flag(trap_exit, true), + ensure_shard_table(), ets:new(?GUILD_PID_CACHE, [named_table, public, set, {read_concurrency, true}]), {ShardCount, _Source} = determine_shard_count(), ShardMap = start_shards(ShardCount), - {ok, #{shards => ShardMap, shard_count => ShardCount}}. + State = #{shards => ShardMap, shard_count => ShardCount}, + sync_shard_table(State), + {ok, State}. -spec handle_call(term(), gen_server:from(), state()) -> {reply, term(), state()}. handle_call({start_or_lookup, GuildId}, _From, State) -> {Reply, NewState} = forward_call(GuildId, {start_or_lookup, GuildId}, State), {reply, Reply, NewState}; +handle_call({lookup, GuildId}, _From, State) -> + {Reply, NewState} = forward_call(GuildId, {lookup, GuildId}, State), + {reply, Reply, NewState}; handle_call({stop_guild, GuildId}, _From, State) -> {Reply, NewState} = forward_call(GuildId, {stop_guild, GuildId}, State), {reply, Reply, NewState}; @@ -106,11 +134,13 @@ terminate(_Reason, State) -> end, maps:values(Shards) ), + catch ets:delete(?SHARD_TABLE), catch ets:delete(?GUILD_PID_CACHE), ok. -spec code_change(term(), term(), term()) -> {ok, state()}. code_change(_OldVsn, State, _Extra) when is_map(State) -> + sync_shard_table(State), {ok, State}. -spec determine_shard_count() -> {pos_integer(), configured | auto}. @@ -150,6 +180,7 @@ start_shard(Index) -> case guild_manager_shard:start_link(Index) of {ok, Pid} -> Ref = erlang:monitor(process, Pid), + put_shard_pid(Index, Pid), {ok, #{pid => Pid, ref => Ref}}; Error -> Error @@ -161,25 +192,48 @@ restart_shard(Index, State) -> case start_shard(Index) of {ok, Shard} -> Updated = State#{shards => maps:put(Index, Shard, Shards)}, + sync_shard_table(Updated), {Shard, Updated}; {error, _Reason} -> + clear_shard_pid(Index), DummyPid = spawn(fun() -> ok end), Dummy = #{pid => DummyPid, ref => make_ref()}, {Dummy, State} end. +-spec call_shard(guild_id(), term(), pos_integer()) -> term(). +call_shard(GuildId, Request, Timeout) -> + case shard_pid_from_table(GuildId) of + {ok, Pid} -> + case catch gen_server:call(Pid, Request, Timeout) of + {'EXIT', {timeout, _}} -> + {error, timeout}; + {'EXIT', _} -> + call_via_manager(Request, Timeout); + Reply -> + maybe_cache_guild_pid(GuildId, Request, Reply) + end; + error -> + call_via_manager(Request, Timeout) + end. + +-spec call_via_manager(term(), pos_integer()) -> term(). +call_via_manager(Request, Timeout) -> + gen_server:call(?MODULE, Request, Timeout + 1000). + -spec forward_call(guild_id(), term(), state()) -> {term(), state()}. forward_call(GuildId, {start_or_lookup, _} = Request, State) -> - case ets:lookup(?GUILD_PID_CACHE, GuildId) of - [{GuildId, GuildPid}] when is_pid(GuildPid) -> - case erlang:is_process_alive(GuildPid) of - true -> - {{ok, GuildPid}, State}; - false -> - ets:delete(?GUILD_PID_CACHE, GuildId), - forward_call_to_shard(GuildId, Request, State) - end; - [] -> + case lookup_cached_guild_pid(GuildId) of + {ok, GuildPid} -> + {{ok, GuildPid}, State}; + not_found -> + forward_call_to_shard(GuildId, Request, State) + end; +forward_call(GuildId, {lookup, _} = Request, State) -> + case lookup_cached_guild_pid(GuildId) of + {ok, GuildPid} -> + {{ok, GuildPid}, State}; + not_found -> forward_call_to_shard(GuildId, Request, State) end; forward_call(GuildId, Request, State) -> @@ -200,12 +254,8 @@ forward_call_to_shard(GuildId, Request, State) -> {_Shard, State2} = restart_shard(Index, State1), forward_call_to_shard(GuildId, Request, State2) end; - {ok, GuildPid} = Reply -> - ets:insert(?GUILD_PID_CACHE, {GuildId, GuildPid}), - erlang:monitor(process, GuildPid), - {Reply, State1}; Reply -> - {Reply, State1} + {maybe_cache_guild_pid(GuildId, Request, Reply), State1} end. -spec ensure_shard(guild_id(), state()) -> {non_neg_integer(), state()}. @@ -290,6 +340,99 @@ handle_reload_all(GuildIds, State) -> group_ids_by_shard(GuildIds, ShardCount) -> rendezvous_router:group_keys(GuildIds, ShardCount). +-spec ensure_shard_table() -> ok. +ensure_shard_table() -> + case ets:whereis(?SHARD_TABLE) of + undefined -> + _ = ets:new(?SHARD_TABLE, [named_table, public, set, {read_concurrency, true}]), + ok; + _ -> + ok + end. + +-spec sync_shard_table(state()) -> ok. +sync_shard_table(State) -> + ensure_shard_table(), + _ = ets:delete_all_objects(?SHARD_TABLE), + ShardCount = maps:get(shard_count, State), + ets:insert(?SHARD_TABLE, {shard_count, ShardCount}), + Shards = maps:get(shards, State), + lists:foreach( + fun({Index, #{pid := Pid}}) -> + put_shard_pid(Index, Pid) + end, + maps:to_list(Shards) + ), + ok. + +-spec put_shard_pid(non_neg_integer(), pid()) -> ok. +put_shard_pid(Index, Pid) -> + ensure_shard_table(), + ets:insert(?SHARD_TABLE, {{shard_pid, Index}, Pid}), + ok. + +-spec clear_shard_pid(non_neg_integer()) -> ok. +clear_shard_pid(Index) -> + try ets:delete(?SHARD_TABLE, {shard_pid, Index}) of + _ -> + ok + catch + error:badarg -> + ok + end. + +-spec shard_pid_from_table(guild_id()) -> {ok, pid()} | error. +shard_pid_from_table(GuildId) -> + try + case ets:lookup(?SHARD_TABLE, shard_count) of + [{shard_count, ShardCount}] when is_integer(ShardCount), ShardCount > 0 -> + Index = select_shard(GuildId, ShardCount), + case ets:lookup(?SHARD_TABLE, {shard_pid, Index}) of + [{{shard_pid, Index}, Pid}] when is_pid(Pid) -> + case erlang:is_process_alive(Pid) of + true -> + {ok, Pid}; + false -> + error + end; + _ -> + error + end; + _ -> + error + end + catch + error:badarg -> + error + end. + +-spec lookup_cached_guild_pid(guild_id()) -> {ok, pid()} | not_found. +lookup_cached_guild_pid(GuildId) -> + case catch ets:lookup(?GUILD_PID_CACHE, GuildId) of + [{GuildId, GuildPid}] when is_pid(GuildPid) -> + case erlang:is_process_alive(GuildPid) of + true -> + {ok, GuildPid}; + false -> + ets:delete(?GUILD_PID_CACHE, GuildId), + not_found + end; + _ -> + not_found + end. + +-spec maybe_cache_guild_pid(guild_id(), term(), term()) -> term(). +maybe_cache_guild_pid(GuildId, {start_or_lookup, GuildId}, {ok, GuildPid} = Reply) + when is_pid(GuildPid) +-> + ets:insert(?GUILD_PID_CACHE, {GuildId, GuildPid}), + Reply; +maybe_cache_guild_pid(GuildId, {lookup, GuildId}, {ok, GuildPid} = Reply) when is_pid(GuildPid) -> + ets:insert(?GUILD_PID_CACHE, {GuildId, GuildPid}), + Reply; +maybe_cache_guild_pid(_GuildId, _Request, Reply) -> + Reply. + -spec find_shard_by_ref(reference(), #{non_neg_integer() => shard_map()}) -> {ok, non_neg_integer()} | not_found. find_shard_by_ref(Ref, Shards) -> @@ -422,4 +565,60 @@ cleanup_guild_from_cache_does_not_remove_new_pid_test() -> catch ets:delete(guild_pid_cache) end. +start_or_lookup_uses_shard_table_without_manager_test_() -> + {timeout, 10, fun() -> + catch ets:delete(guild_pid_cache), + catch ets:delete(guild_manager_shard_table), + ets:new(guild_pid_cache, [named_table, public, set, {read_concurrency, true}]), + ets:new(guild_manager_shard_table, [named_table, public, set, {read_concurrency, true}]), + GuildId = 101, + GuildPid = spawn(fun() -> timer:sleep(1000) end), + ShardPid = spawn(fun() -> shard_stub_loop(GuildId, GuildPid) end), + ets:insert(guild_manager_shard_table, {shard_count, 1}), + ets:insert(guild_manager_shard_table, {{shard_pid, 0}, ShardPid}), + try + ?assertEqual({ok, GuildPid}, start_or_lookup(GuildId)) + after + ShardPid ! stop, + catch ets:delete(guild_manager_shard_table), + catch ets:delete(guild_pid_cache) + end + end}. + +call_shard_timeout_returns_error_timeout_test_() -> + {timeout, 10, fun() -> + catch ets:delete(guild_pid_cache), + catch ets:delete(guild_manager_shard_table), + ets:new(guild_pid_cache, [named_table, public, set, {read_concurrency, true}]), + ets:new(guild_manager_shard_table, [named_table, public, set, {read_concurrency, true}]), + GuildId = 202, + SlowShardPid = spawn(fun() -> slow_shard_loop() end), + ets:insert(guild_manager_shard_table, {shard_count, 1}), + ets:insert(guild_manager_shard_table, {{shard_pid, 0}, SlowShardPid}), + try + ?assertEqual({error, timeout}, call_shard(GuildId, {start_or_lookup, GuildId}, 20)) + after + SlowShardPid ! stop, + catch ets:delete(guild_manager_shard_table), + catch ets:delete(guild_pid_cache) + end + end}. + +shard_stub_loop(GuildId, GuildPid) -> + receive + stop -> + ok; + {'$gen_call', From, {start_or_lookup, GuildId}} -> + gen_server:reply(From, {ok, GuildPid}), + shard_stub_loop(GuildId, GuildPid); + {'$gen_call', From, {lookup, GuildId}} -> + gen_server:reply(From, {ok, GuildPid}), + shard_stub_loop(GuildId, GuildPid); + {'$gen_call', From, _Request} -> + gen_server:reply(From, {error, unsupported}), + shard_stub_loop(GuildId, GuildPid); + _ -> + shard_stub_loop(GuildId, GuildPid) + end. + -endif. diff --git a/fluxer_gateway/src/guild/guild_manager_shard.erl b/fluxer_gateway/src/guild/guild_manager_shard.erl index 97f14d3a..7721bea1 100644 --- a/fluxer_gateway/src/guild/guild_manager_shard.erl +++ b/fluxer_gateway/src/guild/guild_manager_shard.erl @@ -55,6 +55,8 @@ init(Args) -> {reply, term(), state()} | {noreply, state()}. handle_call({start_or_lookup, GuildId}, From, State) -> do_start_or_lookup(GuildId, From, State); +handle_call({lookup, GuildId}, _From, State) -> + do_lookup(GuildId, State); handle_call({stop_guild, GuildId}, _From, State) -> do_stop_guild(GuildId, State); handle_call({reload_guild, GuildId}, From, State) -> @@ -114,6 +116,24 @@ do_start_or_lookup(GuildId, From, State) -> lookup_or_fetch(GuildId, From, State) end. +-spec do_lookup(guild_id(), state()) -> {reply, {ok, pid()} | {error, not_found}, state()}. +do_lookup(GuildId, State) -> + Guilds = maps:get(guilds, State), + case maps:get(GuildId, Guilds, undefined) of + {Pid, _Ref} -> + {reply, {ok, Pid}, State}; + loading -> + {reply, {error, not_found}, State}; + undefined -> + GuildName = process_registry:build_process_name(guild, GuildId), + case process_registry:lookup_or_monitor(GuildName, GuildId, Guilds) of + {ok, Pid, _Ref, NewGuilds} -> + {reply, {ok, Pid}, State#{guilds => NewGuilds}}; + {error, not_found} -> + {reply, {error, not_found}, State} + end + end. + -spec lookup_or_fetch(guild_id(), gen_server:from(), state()) -> {reply, {ok, pid()}, state()} | {noreply, state()}. lookup_or_fetch(GuildId, From, State) -> @@ -458,6 +478,26 @@ do_start_or_lookup_loading_deduplicates_requests_test() -> ?assert(lists:member(From1, Requests)), ?assert(lists:member(From2, Requests)). +do_lookup_returns_existing_pid_from_state_test() -> + GuildId = 5151, + GuildPid = self(), + State0 = #{ + guilds => #{GuildId => {GuildPid, make_ref()}}, + pending_requests => #{}, + shard_index => 0 + }, + {reply, {ok, GuildPid}, State1} = do_lookup(GuildId, State0), + ?assertEqual(State0, State1). + +do_lookup_returns_not_found_when_loading_test() -> + GuildId = 6161, + State0 = #{ + guilds => #{GuildId => loading}, + pending_requests => #{}, + shard_index => 0 + }, + ?assertEqual({reply, {error, not_found}, State0}, do_lookup(GuildId, State0)). + start_new_guild_skips_start_when_already_registered_test() -> GuildId = 77777, GuildName = process_registry:build_process_name(guild, GuildId), diff --git a/fluxer_gateway/src/guild/guild_request_members.erl b/fluxer_gateway/src/guild/guild_request_members.erl index c6dae30c..db1f35a9 100644 --- a/fluxer_gateway/src/guild/guild_request_members.erl +++ b/fluxer_gateway/src/guild/guild_request_members.erl @@ -184,7 +184,7 @@ lookup_guild(GuildId, SessionState) -> {Pid, _Ref} when is_pid(Pid) -> {ok, Pid}; undefined -> - case gen_server:call(guild_manager, {lookup, GuildId}, 5000) of + case guild_manager:lookup(GuildId) of {ok, Pid} when is_pid(Pid) -> {ok, Pid}; _ -> {error, not_found} end; diff --git a/fluxer_gateway/src/session/session_connection.erl b/fluxer_gateway/src/session/session_connection.erl index ed0141a5..c7c1318d 100644 --- a/fluxer_gateway/src/session/session_connection.erl +++ b/fluxer_gateway/src/session/session_connection.erl @@ -288,7 +288,7 @@ maybe_spawn_guild_connect(GuildId, Attempt, SessionId, UserId, State) -> do_guild_connect(SessionPid, GuildId, Attempt, SessionId, UserId, Bot, InitialGuildId, UserData) -> Result = try - case gen_server:call(guild_manager, {start_or_lookup, GuildId}, 5000) of + case guild_manager:start_or_lookup(GuildId) of {ok, GuildPid} -> case maybe_build_unavailable_response_from_cache(GuildId, UserData) of {ok, UnavailableResponse} ->