diff --git a/fluxer_gateway/src/guild/guild_client.erl b/fluxer_gateway/src/guild/guild_client.erl index 600a3299..1e7b3f4c 100644 --- a/fluxer_gateway/src/guild/guild_client.erl +++ b/fluxer_gateway/src/guild/guild_client.erl @@ -136,7 +136,7 @@ update_circuit_state(GuildPid, Result, PrevState) -> IsSuccess = is_success_result(Result), case {IsSuccess, PrevState} of {true, half_open} -> - ets:delete(?CIRCUIT_BREAKER_TABLE, GuildPid), + safe_delete(GuildPid), ok; {true, closed} -> reset_failures(GuildPid); @@ -152,7 +152,7 @@ is_success_result(_) -> false. reset_failures(GuildPid) -> case safe_lookup(GuildPid) of [{_, State}] -> - ets:insert(?CIRCUIT_BREAKER_TABLE, {GuildPid, State#{failures => 0}}), + safe_insert(GuildPid, State#{failures => 0}), ok; [] -> ok @@ -163,27 +163,21 @@ record_failure(GuildPid) -> Now = erlang:system_time(millisecond), case safe_lookup(GuildPid) of [] -> - ets:insert( - ?CIRCUIT_BREAKER_TABLE, - {GuildPid, #{ - state => closed, - failures => 1, - concurrent => 0 - }} - ), + safe_insert(GuildPid, #{ + state => closed, + failures => 1, + concurrent => 0 + }), ok; [{_, #{failures := F} = State}] when F + 1 >= ?FAILURE_THRESHOLD -> - ets:insert( - ?CIRCUIT_BREAKER_TABLE, - {GuildPid, State#{ - state => open, - failures => F + 1, - opened_at => Now - }} - ), + safe_insert(GuildPid, State#{ + state => open, + failures => F + 1, + opened_at => Now + }), ok; [{_, #{failures := F} = State}] -> - ets:insert(?CIRCUIT_BREAKER_TABLE, {GuildPid, State#{failures => F + 1}}), + safe_insert(GuildPid, State#{failures => F + 1}), ok end. @@ -191,19 +185,16 @@ record_failure(GuildPid) -> acquire_slot(GuildPid) -> case safe_lookup(GuildPid) of [] -> - ets:insert( - ?CIRCUIT_BREAKER_TABLE, - {GuildPid, #{ - state => closed, - failures => 0, - concurrent => 1 - }} - ), + safe_insert(GuildPid, #{ + state => closed, + failures => 0, + concurrent => 1 + }), ok; [{_, #{concurrent := C}}] when C >= ?MAX_CONCURRENT -> {error, too_many_requests}; [{_, #{concurrent := C} = State}] -> - ets:insert(?CIRCUIT_BREAKER_TABLE, {GuildPid, State#{concurrent => C + 1}}), + safe_insert(GuildPid, State#{concurrent => C + 1}), ok end. @@ -211,12 +202,36 @@ acquire_slot(GuildPid) -> release_slot(GuildPid) -> case safe_lookup(GuildPid) of [{_, #{concurrent := C} = State}] when C > 0 -> - ets:insert(?CIRCUIT_BREAKER_TABLE, {GuildPid, State#{concurrent => C - 1}}), + safe_insert(GuildPid, State#{concurrent => C - 1}), ok; _ -> ok end. +-spec safe_insert(pid(), map()) -> ok. +safe_insert(GuildPid, State) -> + ensure_table(), + try ets:insert(?CIRCUIT_BREAKER_TABLE, {GuildPid, State}) of + true -> ok + catch + error:badarg -> + ensure_table(), + try ets:insert(?CIRCUIT_BREAKER_TABLE, {GuildPid, State}) of + true -> ok + catch + error:badarg -> ok + end + end. + +-spec safe_delete(pid()) -> ok. +safe_delete(GuildPid) -> + ensure_table(), + try ets:delete(?CIRCUIT_BREAKER_TABLE, GuildPid) of + true -> ok + catch + error:badarg -> ok + end. + -spec safe_lookup(pid()) -> list(). safe_lookup(GuildPid) -> try ets:lookup(?CIRCUIT_BREAKER_TABLE, GuildPid) of @@ -369,6 +384,18 @@ record_failure_opens_circuit_test() -> ?assertEqual(open, maps:get(state, State)), Pid ! done. +record_failure_recreates_missing_table_test() -> + catch ets:delete(?CIRCUIT_BREAKER_TABLE), + Pid = spawn(fun() -> + receive + done -> ok + end + end), + ?assertEqual(ok, record_failure(Pid)), + [{Pid, State}] = ets:lookup(?CIRCUIT_BREAKER_TABLE, Pid), + ?assertEqual(1, maps:get(failures, State)), + Pid ! done. + is_success_result_test() -> ?assertEqual(true, is_success_result({ok, #{}})), ?assertEqual(false, is_success_result({error, timeout})), diff --git a/fluxer_gateway/src/guild/guild_manager.erl b/fluxer_gateway/src/guild/guild_manager.erl index eb451daa..a4dbcbee 100644 --- a/fluxer_gateway/src/guild/guild_manager.erl +++ b/fluxer_gateway/src/guild/guild_manager.erl @@ -248,7 +248,14 @@ call_shard(GuildId, Request, Timeout) -> -spec call_via_manager(term(), pos_integer()) -> term(). call_via_manager(Request, Timeout) -> - gen_server:call(?MODULE, Request, Timeout + 1000). + case catch gen_server:call(?MODULE, Request, Timeout + 1000) of + {'EXIT', {timeout, _}} -> + {error, timeout}; + {'EXIT', _} -> + {error, unavailable}; + Reply -> + Reply + end. -spec forward_call(guild_id(), term(), state()) -> {term(), state()}. forward_call(GuildId, {start_or_lookup, _} = Request, State) -> diff --git a/fluxer_gateway/src/session/session_manager.erl b/fluxer_gateway/src/session/session_manager.erl index db8ad4ef..ae53c42f 100644 --- a/fluxer_gateway/src/session/session_manager.erl +++ b/fluxer_gateway/src/session/session_manager.erl @@ -136,6 +136,8 @@ call_shard(SessionId, Request, Timeout) -> case shard_pid_from_table(SessionId) of {ok, Pid} -> case catch gen_server:call(Pid, Request, Timeout) of + {'EXIT', {timeout, _}} -> + {error, timeout}; {'EXIT', _} -> call_via_manager(SessionId, Request, Timeout); Reply -> @@ -147,7 +149,14 @@ call_shard(SessionId, Request, Timeout) -> -spec call_via_manager(session_id(), term(), pos_integer()) -> term(). call_via_manager(SessionId, Request, Timeout) -> - gen_server:call(?MODULE, {proxy_call, SessionId, Request, Timeout}, Timeout + 1000). + case catch gen_server:call(?MODULE, {proxy_call, SessionId, Request, Timeout}, Timeout + 1000) of + {'EXIT', {timeout, _}} -> + {error, timeout}; + {'EXIT', _} -> + {error, unavailable}; + Reply -> + Reply + end. -spec forward_call(session_id(), term(), pos_integer(), state()) -> {term(), state()}. forward_call(SessionId, Request, Timeout, State) ->