diff --git a/fluxer_gateway/src/gateway/gateway_rpc_guild.erl b/fluxer_gateway/src/gateway/gateway_rpc_guild.erl index e8fb1722..b27d6939 100644 --- a/fluxer_gateway/src/gateway/gateway_rpc_guild.erl +++ b/fluxer_gateway/src/gateway/gateway_rpc_guild.erl @@ -36,7 +36,7 @@ execute_method(<<"guild.dispatch">>, #{ with_guild(GuildId, fun(Pid) -> EventAtom = constants:dispatch_event_atom(Event), IsAlive = erlang:is_process_alive(Pid), - logger:info("rpc guild.dispatch: guild_id=~p event=~p pid=~p alive=~p", + logger:debug("rpc guild.dispatch: guild_id=~p event=~p pid=~p alive=~p", [GuildId, EventAtom, Pid, IsAlive]), gen_server:cast(Pid, {dispatch, #{event => EventAtom, data => Data}}), true diff --git a/fluxer_gateway/src/guild/guild.erl b/fluxer_gateway/src/guild/guild.erl index 39592944..a9fa2d72 100644 --- a/fluxer_gateway/src/guild/guild.erl +++ b/fluxer_gateway/src/guild/guild.erl @@ -169,7 +169,7 @@ handle_cast({dispatch, Request}, State) -> PendingCount = maps:fold(fun(_, S, Acc) -> case maps:get(pending_connect, S, false) of true -> Acc + 1; _ -> Acc end end, 0, Sessions), - logger:info("guild dispatch: event=~p guild_id=~p sessions=~p pending=~p", + logger:debug("guild dispatch: event=~p guild_id=~p sessions=~p pending=~p", [Event, maps:get(id, State, unknown), SessionCount, PendingCount]), ParsedEventData = parse_event_data(EventData), {noreply, NewState} = guild_dispatch:handle_dispatch(Event, ParsedEventData, State), diff --git a/fluxer_gateway/src/guild/guild_dispatch.erl b/fluxer_gateway/src/guild/guild_dispatch.erl index 0a0d1762..2dbe672a 100644 --- a/fluxer_gateway/src/guild/guild_dispatch.erl +++ b/fluxer_gateway/src/guild/guild_dispatch.erl @@ -74,7 +74,7 @@ process_dispatch(Event, EventData, State) -> FilteredSessions = filter_sessions_for_event( Event, FinalData, SessionIdOpt, Sessions, FilterState ), - logger:info("process_dispatch: event=~p guild_id=~p total_sessions=~p filtered_sessions=~p", + logger:debug("process_dispatch: event=~p guild_id=~p total_sessions=~p filtered_sessions=~p", [Event, GuildId, map_size(Sessions), length(FilteredSessions)]), DispatchSuccess = dispatch_to_sessions(FilteredSessions, Event, FinalData, UpdatedState), track_dispatch_metrics(Event, DispatchSuccess), @@ -262,7 +262,7 @@ dispatch_bulk_to_session(_, _, _, _, Acc) -> -spec dispatch_standard([session_pair()], event(), event_data(), guild_id(), guild_state()) -> non_neg_integer(). dispatch_standard(FilteredSessions, Event, FinalData, GuildId, State) -> - logger:info("dispatch_standard: event=~p guild_id=~p filtered_sessions=~p member_count=~p", + logger:debug("dispatch_standard: event=~p guild_id=~p filtered_sessions=~p member_count=~p", [Event, GuildId, length(FilteredSessions), maps:get(member_count, State, undefined)]), SuccessCount = lists:foldl( fun({Sid, SessionData}, Acc) -> @@ -281,7 +281,7 @@ dispatch_standard(FilteredSessions, Event, FinalData, GuildId, State) -> _:_ -> Acc end; false -> - logger:info("dispatch_standard skip: sid=~p is_pid=~p passive=~p small=~p", + logger:debug("dispatch_standard skip: sid=~p is_pid=~p passive=~p small=~p", [Sid, is_pid(Pid), session_passive:is_passive(GuildId, SessionData), diff --git a/fluxer_gateway/src/guild/guild_manager.erl b/fluxer_gateway/src/guild/guild_manager.erl index 35dad0fc..eb451daa 100644 --- a/fluxer_gateway/src/guild/guild_manager.erl +++ b/fluxer_gateway/src/guild/guild_manager.erl @@ -20,7 +20,15 @@ -include_lib("fluxer_gateway/include/timeout_config.hrl"). --export([start_link/0, start_or_lookup/1, start_or_lookup/2, lookup/1, lookup/2]). +-export([ + start_link/0, + start_or_lookup/1, + start_or_lookup/2, + lookup/1, + lookup/2, + ensure_started/1, + ensure_started/2 +]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(GUILD_PID_CACHE, guild_pid_cache). @@ -58,6 +66,24 @@ lookup(GuildId, Timeout) -> call_shard(GuildId, {lookup, GuildId}, Timeout) end. +-spec ensure_started(guild_id()) -> ok | {error, term()}. +ensure_started(GuildId) -> + ensure_started(GuildId, ?DEFAULT_GEN_SERVER_TIMEOUT). + +-spec ensure_started(guild_id(), pos_integer()) -> ok | {error, term()}. +ensure_started(GuildId, Timeout) -> + case call_shard(GuildId, {ensure_started, GuildId}, Timeout) of + ok -> + ok; + {ok, GuildPid} when is_pid(GuildPid) -> + ets:insert(?GUILD_PID_CACHE, {GuildId, GuildPid}), + ok; + {error, _} = Error -> + Error; + _ -> + {error, unavailable} + end. + -spec init(list()) -> {ok, state()}. init([]) -> process_flag(trap_exit, true), @@ -76,6 +102,9 @@ handle_call({start_or_lookup, GuildId}, _From, State) -> handle_call({lookup, GuildId}, _From, State) -> {Reply, NewState} = forward_call(GuildId, {lookup, GuildId}, State), {reply, Reply, NewState}; +handle_call({ensure_started, GuildId}, _From, State) -> + {Reply, NewState} = forward_call(GuildId, {ensure_started, GuildId}, State), + {reply, Reply, NewState}; handle_call({stop_guild, GuildId}, _From, State) -> {Reply, NewState} = forward_call(GuildId, {stop_guild, GuildId}, State), {reply, Reply, NewState}; diff --git a/fluxer_gateway/src/guild/guild_manager_shard.erl b/fluxer_gateway/src/guild/guild_manager_shard.erl index 7721bea1..6d39b6aa 100644 --- a/fluxer_gateway/src/guild/guild_manager_shard.erl +++ b/fluxer_gateway/src/guild/guild_manager_shard.erl @@ -57,6 +57,8 @@ handle_call({start_or_lookup, GuildId}, From, State) -> do_start_or_lookup(GuildId, From, State); handle_call({lookup, GuildId}, _From, State) -> do_lookup(GuildId, State); +handle_call({ensure_started, GuildId}, _From, State) -> + do_ensure_started(GuildId, State); handle_call({stop_guild, GuildId}, _From, State) -> do_stop_guild(GuildId, State); handle_call({reload_guild, GuildId}, From, State) -> @@ -134,6 +136,30 @@ do_lookup(GuildId, State) -> end end. +-spec do_ensure_started(guild_id(), state()) -> + {reply, ok | {ok, pid()} | {error, term()}, state()}. +do_ensure_started(GuildId, State) -> + Guilds = maps:get(guilds, State), + case maps:get(GuildId, Guilds, undefined) of + {Pid, _Ref} -> + {reply, {ok, Pid}, State}; + loading -> + {reply, ok, State}; + undefined -> + GuildName = process_registry:build_process_name(guild, GuildId), + case whereis(GuildName) of + undefined -> + {reply, ok, start_fetch_without_pending(GuildId, State)}; + _ExistingPid -> + case process_registry:lookup_or_monitor(GuildName, GuildId, Guilds) of + {ok, Pid, _Ref, NewGuilds} -> + {reply, {ok, Pid}, State#{guilds => NewGuilds}}; + {error, not_found} -> + {reply, {error, process_died}, State} + end + end + end. + -spec lookup_or_fetch(guild_id(), gen_server:from(), state()) -> {reply, {ok, pid()}, state()} | {noreply, state()}. lookup_or_fetch(GuildId, From, State) -> @@ -161,6 +187,14 @@ start_fetch(GuildId, From, State) -> spawn_fetch(GuildId, State), {noreply, NewState}. +-spec start_fetch_without_pending(guild_id(), state()) -> state(). +start_fetch_without_pending(GuildId, State) -> + Guilds = maps:get(guilds, State), + NewGuilds = maps:put(GuildId, loading, Guilds), + NewState = State#{guilds => NewGuilds}, + spawn_fetch(GuildId, State), + NewState. + -spec spawn_fetch(guild_id(), state()) -> pid(). spawn_fetch(GuildId, _State) -> Manager = self(), diff --git a/fluxer_gateway/src/presence/presence_manager.erl b/fluxer_gateway/src/presence/presence_manager.erl index 636a5418..378b9e14 100644 --- a/fluxer_gateway/src/presence/presence_manager.erl +++ b/fluxer_gateway/src/presence/presence_manager.erl @@ -21,9 +21,17 @@ -include_lib("fluxer_gateway/include/timeout_config.hrl"). -define(PID_CACHE_TABLE, presence_pid_cache). +-define(SHARD_TABLE, presence_manager_shard_table). -define(CACHE_TTL_MS, 300000). --export([start_link/0, lookup/1, lookup_async/2, dispatch_to_user/3, terminate_all_sessions/1]). +-export([ + start_link/0, + lookup/1, + lookup_async/2, + start_or_lookup/1, + dispatch_to_user/3, + terminate_all_sessions/1 +]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -type user_id() :: integer(). @@ -44,6 +52,11 @@ lookup(UserId) -> lookup_and_cache(UserId) end. +-spec start_or_lookup(map()) -> {ok, pid()} | {error, term()}. +start_or_lookup(Request) when is_map(Request) -> + UserId = maps:get(user_id, Request, 0), + call_shard(UserId, {start_or_lookup, Request}, ?DEFAULT_GEN_SERVER_TIMEOUT). + -spec lookup_async(user_id(), term()) -> ok. lookup_async(UserId, Message) -> case check_cache(UserId) of @@ -73,35 +86,43 @@ check_cache(UserId) -> -spec lookup_and_cache(user_id()) -> {ok, pid()} | {error, not_found}. lookup_and_cache(UserId) -> - case gen_server:call(?MODULE, {lookup, UserId}, ?DEFAULT_GEN_SERVER_TIMEOUT) of + case call_shard(UserId, {lookup, UserId}, ?DEFAULT_GEN_SERVER_TIMEOUT) of {ok, Pid} -> ets:insert(?PID_CACHE_TABLE, {UserId, Pid, erlang:monotonic_time(millisecond)}), {ok, Pid}; - {error, not_found} = Error -> - Error + _ -> + {error, not_found} end. -spec lookup_and_cast(user_id(), term()) -> ok. lookup_and_cast(UserId, Message) -> - case gen_server:call(?MODULE, {lookup, UserId}, ?DEFAULT_GEN_SERVER_TIMEOUT) of + case call_shard(UserId, {lookup, UserId}, ?DEFAULT_GEN_SERVER_TIMEOUT) of {ok, Pid} -> ets:insert(?PID_CACHE_TABLE, {UserId, Pid, erlang:monotonic_time(millisecond)}), gen_server:cast(Pid, Message); - {error, not_found} -> + _ -> ok end. -spec terminate_all_sessions(user_id()) -> ok | {error, term()}. terminate_all_sessions(UserId) -> - gen_server:call(?MODULE, {terminate_all_sessions, UserId}, ?DEFAULT_GEN_SERVER_TIMEOUT). + call_shard(UserId, {terminate_all_sessions, UserId}, ?DEFAULT_GEN_SERVER_TIMEOUT). -spec dispatch_to_user(user_id(), event_type(), term()) -> ok | {error, not_found}. dispatch_to_user(UserId, Event, Data) -> - gen_server:call(?MODULE, {dispatch, UserId, Event, Data}, ?DEFAULT_GEN_SERVER_TIMEOUT). + case call_shard(UserId, {dispatch, UserId, Event, Data}, ?DEFAULT_GEN_SERVER_TIMEOUT) of + ok -> + ok; + {error, not_found} -> + {error, not_found}; + _ -> + {error, not_found} + end. -spec init(list()) -> {ok, state()}. init([]) -> process_flag(trap_exit, true), + ensure_shard_table(), ets:new(?PID_CACHE_TABLE, [named_table, public, set]), {ShardCount, _Source} = determine_shard_count(), {ShardMap, _} = lists:foldl( @@ -116,7 +137,9 @@ init([]) -> {#{}, 0}, lists:seq(0, ShardCount - 1) ), - {ok, #{shards => ShardMap, shard_count => ShardCount}}. + State = #{shards => ShardMap, shard_count => ShardCount}, + sync_shard_table(State), + {ok, State}. -spec handle_call(term(), gen_server:from(), state()) -> {reply, term(), state()}. handle_call({lookup, UserId}, _From, State) -> @@ -171,6 +194,7 @@ handle_info(_Info, State) -> -spec terminate(term(), state()) -> ok. terminate(_Reason, State) -> + catch ets:delete(?SHARD_TABLE), catch ets:delete(?PID_CACHE_TABLE), Shards = maps:get(shards, State), lists:foreach( @@ -183,6 +207,7 @@ terminate(_Reason, State) -> -spec code_change(term(), term(), term()) -> {ok, state()}. code_change(_OldVsn, State, _Extra) when is_map(State) -> + sync_shard_table(State), {ok, State}; code_change(_OldVsn, {state, Shards, ShardCount}, _Extra) -> ConvertedShards = maps:map( @@ -191,7 +216,9 @@ code_change(_OldVsn, {state, Shards, ShardCount}, _Extra) -> end, Shards ), - {ok, #{shards => ConvertedShards, shard_count => ShardCount}}. + ConvertedState = #{shards => ConvertedShards, shard_count => ShardCount}, + sync_shard_table(ConvertedState), + {ok, ConvertedState}. -spec determine_shard_count() -> {pos_integer(), configured | auto}. determine_shard_count() -> @@ -207,6 +234,7 @@ start_shard(Index) -> case presence_manager_shard:start_link(Index) of {ok, Pid} -> Ref = erlang:monitor(process, Pid), + put_shard_pid(Index, Pid), {ok, #{pid => Pid, ref => Ref}}; Error -> Error @@ -218,12 +246,41 @@ restart_shard(Index, State) -> {ok, Shard} -> Shards = maps:get(shards, State), Updated = State#{shards := maps:put(Index, Shard, Shards)}, + sync_shard_table(Updated), {Shard, Updated}; {error, _Reason} -> + clear_shard_pid(Index), Dummy = #{pid => spawn(fun() -> exit(normal) end), ref => make_ref()}, {Dummy, State} end. +-spec call_shard(user_id(), term(), pos_integer()) -> term(). +call_shard(Key, Request, Timeout) -> + case shard_pid_from_table(Key) of + {ok, Pid} -> + case catch gen_server:call(Pid, Request, Timeout) of + {'EXIT', {timeout, _}} -> + {error, timeout}; + {'EXIT', _} -> + call_via_manager(Request, Timeout); + Reply -> + Reply + end; + error -> + call_via_manager(Request, Timeout) + end. + +-spec call_via_manager(term(), pos_integer()) -> term(). +call_via_manager(Request, Timeout) -> + case catch gen_server:call(?MODULE, Request, Timeout + 1000) of + {'EXIT', {timeout, _}} -> + {error, timeout}; + {'EXIT', _} -> + {error, unavailable}; + Reply -> + Reply + end. + -spec forward_call(user_id(), term(), state()) -> {term(), state()}. forward_call(Key, Request, State) -> {ShardIndex, State1} = ensure_shard(Key, State), @@ -279,6 +336,72 @@ select_shard(Key, Count) when Count > 0 -> extract_user_id({start_or_lookup, #{user_id := UserId}}) -> UserId; extract_user_id(_) -> 0. +-spec ensure_shard_table() -> ok. +ensure_shard_table() -> + case ets:whereis(?SHARD_TABLE) of + undefined -> + _ = ets:new(?SHARD_TABLE, [named_table, public, set, {read_concurrency, true}]), + ok; + _ -> + ok + end. + +-spec sync_shard_table(state()) -> ok. +sync_shard_table(State) -> + ensure_shard_table(), + _ = ets:delete_all_objects(?SHARD_TABLE), + ShardCount = maps:get(shard_count, State), + ets:insert(?SHARD_TABLE, {shard_count, ShardCount}), + Shards = maps:get(shards, State), + lists:foreach( + fun({Index, #{pid := Pid}}) -> + put_shard_pid(Index, Pid) + end, + maps:to_list(Shards) + ), + ok. + +-spec put_shard_pid(non_neg_integer(), pid()) -> ok. +put_shard_pid(Index, Pid) -> + ensure_shard_table(), + ets:insert(?SHARD_TABLE, {{shard_pid, Index}, Pid}), + ok. + +-spec clear_shard_pid(non_neg_integer()) -> ok. +clear_shard_pid(Index) -> + try ets:delete(?SHARD_TABLE, {shard_pid, Index}) of + _ -> + ok + catch + error:badarg -> + ok + end. + +-spec shard_pid_from_table(user_id()) -> {ok, pid()} | error. +shard_pid_from_table(Key) -> + try + case ets:lookup(?SHARD_TABLE, shard_count) of + [{shard_count, ShardCount}] when is_integer(ShardCount), ShardCount > 0 -> + Index = select_shard(Key, ShardCount), + case ets:lookup(?SHARD_TABLE, {shard_pid, Index}) of + [{{shard_pid, Index}, Pid}] when is_pid(Pid) -> + case erlang:is_process_alive(Pid) of + true -> + {ok, Pid}; + false -> + error + end; + _ -> + error + end; + _ -> + error + end + catch + error:badarg -> + error + end. + -spec find_shard_by_ref(reference(), #{non_neg_integer() => shard()}) -> {ok, non_neg_integer()} | not_found. find_shard_by_ref(Ref, Shards) -> diff --git a/fluxer_gateway/src/session/session_connection.erl b/fluxer_gateway/src/session/session_connection.erl index c7c1318d..39a227f2 100644 --- a/fluxer_gateway/src/session/session_connection.erl +++ b/fluxer_gateway/src/session/session_connection.erl @@ -29,6 +29,8 @@ -define(MAX_RETRY_ATTEMPTS, 25). -define(MAX_CALL_RETRY_ATTEMPTS, 15). -define(GUILD_CONNECT_ASYNC_TIMEOUT_MS, 30000). +-define(GUILD_MANAGER_START_TIMEOUT_MS, 20000). +-define(GUILD_MANAGER_LOOKUP_FALLBACK_TIMEOUT_MS, 200). -define(MAX_GUILD_UNAVAILABLE_RETRY_DELAY_MS, 30000). -define(MAX_GUILD_UNAVAILABLE_BACKOFF_ATTEMPT, 5). -define(GUILD_UNAVAILABLE_JITTER_DIVISOR, 5). @@ -58,17 +60,16 @@ handle_presence_connect(Attempt, State) -> SocketPid = maps:get(socket_pid, State, undefined), FriendIds = presence_targets:friend_ids_from_state(State), GroupDmRecipients = presence_targets:group_dm_recipients_from_state(State), - Message = - {start_or_lookup, #{ - user_id => UserId, - user_data => UserData, - guild_ids => maps:keys(Guilds), - status => Status, - friend_ids => FriendIds, - group_dm_recipients => GroupDmRecipients, - custom_status => maps:get(custom_status, State, null) - }}, - try gen_server:call(presence_manager, Message, 5000) of + Request = #{ + user_id => UserId, + user_data => UserData, + guild_ids => maps:keys(Guilds), + status => Status, + friend_ids => FriendIds, + group_dm_recipients => GroupDmRecipients, + custom_status => maps:get(custom_status, State, null) + }, + try presence_manager:start_or_lookup(Request) of {ok, Pid} -> try_presence_session_connect( Pid, @@ -288,34 +289,27 @@ maybe_spawn_guild_connect(GuildId, Attempt, SessionId, UserId, State) -> do_guild_connect(SessionPid, GuildId, Attempt, SessionId, UserId, Bot, InitialGuildId, UserData) -> Result = try - case guild_manager:start_or_lookup(GuildId) of + case guild_manager:lookup(GuildId, ?GUILD_MANAGER_LOOKUP_FALLBACK_TIMEOUT_MS) of {ok, GuildPid} -> - case maybe_build_unavailable_response_from_cache(GuildId, UserData) of - {ok, UnavailableResponse} -> - {ok_cached_unavailable, UnavailableResponse}; - not_unavailable -> - ActiveGuilds = build_initial_active_guilds(InitialGuildId, GuildId), - IsStaff = maps:get(<<"is_staff">>, UserData, false), - Request = #{ - session_id => SessionId, - user_id => UserId, - session_pid => SessionPid, - bot => Bot, - is_staff => IsStaff, - initial_guild_id => InitialGuildId, - active_guilds => ActiveGuilds - }, - gen_server:cast(GuildPid, {session_connect_async, #{ - guild_id => GuildId, - attempt => Attempt, - request => Request - }}), - _ = erlang:send_after( - ?GUILD_CONNECT_ASYNC_TIMEOUT_MS, - SessionPid, - {guild_connect_timeout, GuildId, Attempt} - ), - pending + start_guild_session_connect_async( + GuildPid, + SessionPid, + GuildId, + Attempt, + SessionId, + UserId, + Bot, + InitialGuildId, + UserData + ); + {error, not_found} -> + case guild_manager:ensure_started(GuildId, ?GUILD_MANAGER_START_TIMEOUT_MS) of + ok -> + {error, {guild_manager_failed, {error, loading}}}; + {error, timeout} -> + {error, {guild_manager_failed, {error, timeout}}}; + {error, EnsureReason} -> + {error, {guild_manager_failed, {error, EnsureReason}}} end; Error -> {error, {guild_manager_failed, Error}} @@ -336,6 +330,41 @@ do_guild_connect(SessionPid, GuildId, Attempt, SessionId, UserId, Bot, InitialGu end, ok. +-spec start_guild_session_connect_async( + pid(), pid(), guild_id(), attempt(), binary(), integer(), boolean(), guild_id() | undefined, map() +) -> + pending | {ok_cached_unavailable, map()}. +start_guild_session_connect_async( + GuildPid, SessionPid, GuildId, Attempt, SessionId, UserId, Bot, InitialGuildId, UserData +) -> + case maybe_build_unavailable_response_from_cache(GuildId, UserData) of + {ok, UnavailableResponse} -> + {ok_cached_unavailable, UnavailableResponse}; + not_unavailable -> + ActiveGuilds = build_initial_active_guilds(InitialGuildId, GuildId), + IsStaff = maps:get(<<"is_staff">>, UserData, false), + Request = #{ + session_id => SessionId, + user_id => UserId, + session_pid => SessionPid, + bot => Bot, + is_staff => IsStaff, + initial_guild_id => InitialGuildId, + active_guilds => ActiveGuilds + }, + gen_server:cast(GuildPid, {session_connect_async, #{ + guild_id => GuildId, + attempt => Attempt, + request => Request + }}), + _ = erlang:send_after( + ?GUILD_CONNECT_ASYNC_TIMEOUT_MS, + SessionPid, + {guild_connect_timeout, GuildId, Attempt} + ), + pending + end. + -spec maybe_build_unavailable_response_from_cache(guild_id(), map()) -> {ok, map()} | not_unavailable. maybe_build_unavailable_response_from_cache(GuildId, UserData) -> @@ -374,6 +403,50 @@ handle_guild_connect_result_internal(GuildId, Attempt, {error, {session_connect_ [GuildId, UserId, Attempt, Reason] ), retry_or_fail(GuildId, Attempt, State, fun(_GId, St) -> {noreply, St} end); +handle_guild_connect_result_internal( + GuildId, + Attempt, + {error, {guild_manager_failed, {error, timeout}}}, + State +) -> + UserId = maps:get(user_id, State), + case Attempt of + 0 -> + logger:debug( + "guild_connect_deferred_timeout: guild_id=~p user_id=~p attempt=~p", + [GuildId, UserId, Attempt] + ); + _ when Attempt rem 5 =:= 0 -> + logger:debug( + "guild_connect_deferred_timeout: guild_id=~p user_id=~p attempt=~p", + [GuildId, UserId, Attempt] + ); + _ -> + ok + end, + retry_timeout_without_penalty(GuildId, Attempt, State); +handle_guild_connect_result_internal( + GuildId, + Attempt, + {error, {guild_manager_failed, {error, loading}}}, + State +) -> + UserId = maps:get(user_id, State), + case Attempt of + 0 -> + logger:debug( + "guild_connect_deferred_loading: guild_id=~p user_id=~p attempt=~p", + [GuildId, UserId, Attempt] + ); + _ when Attempt rem 5 =:= 0 -> + logger:debug( + "guild_connect_deferred_loading: guild_id=~p user_id=~p attempt=~p", + [GuildId, UserId, Attempt] + ); + _ -> + ok + end, + retry_timeout_without_penalty(GuildId, Attempt, State); handle_guild_connect_result_internal(GuildId, Attempt, {error, Reason}, State) -> UserId = maps:get(user_id, State), logger:warning( @@ -384,6 +457,14 @@ handle_guild_connect_result_internal(GuildId, Attempt, {error, Reason}, State) - session_ready:mark_guild_unavailable(GId, St) end). +-spec retry_timeout_without_penalty(guild_id(), attempt(), session_state()) -> + {noreply, session_state()}. +retry_timeout_without_penalty(GuildId, Attempt, State) -> + NextAttempt = min(Attempt + 1, 4), + DelayMs = backoff_utils:calculate(NextAttempt), + erlang:send_after(DelayMs, self(), {guild_connect, GuildId, NextAttempt}), + {noreply, State}. + -spec finalize_guild_connection(guild_id(), pid(), session_state(), fun( (session_state()) -> {noreply, session_state()} )) -> @@ -732,6 +813,12 @@ manager_stub_loop(GuildId, GuildPid) -> receive stop -> ok; + {'$gen_call', From, {lookup, GuildId}} -> + gen_server:reply(From, {ok, GuildPid}), + manager_stub_loop(GuildId, GuildPid); + {'$gen_call', From, {ensure_started, GuildId}} -> + gen_server:reply(From, ok), + manager_stub_loop(GuildId, GuildPid); {'$gen_call', From, {start_or_lookup, GuildId}} -> gen_server:reply(From, {ok, GuildPid}), manager_stub_loop(GuildId, GuildPid);