perf(api): replace polling for in-memory caches with signals (#35)

This commit is contained in:
hampus-fluxer 2026-01-06 00:22:30 +01:00 committed by GitHub
parent 9c665413ac
commit 8f9daac8b0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 106 additions and 38 deletions

View File

@ -263,6 +263,8 @@ app.route('/', routes);
app.onError(AppErrorHandler);
app.notFound(AppNotFoundHandler);
const ipBanSubscriber = new Redis(Config.redis.url);
ipBanCache.setRefreshSubscriber(ipBanSubscriber);
await ipBanCache.initialize();
initializeMetricsService(Config.metrics.host ?? null);

View File

@ -19,21 +19,25 @@
import type {IAdminRepository} from '~/admin/IAdminRepository';
import type {UserID} from '~/BrandedTypes';
import {IP_BAN_REFRESH_CHANNEL} from '~/constants/IpBan';
import type {ICacheService} from '~/infrastructure/ICacheService';
import {ipBanCache} from '~/middleware/IpBanMiddleware';
import type {AdminAuditService} from './AdminAuditService';
interface AdminBanManagementServiceDeps {
adminRepository: IAdminRepository;
auditService: AdminAuditService;
cacheService: ICacheService;
}
export class AdminBanManagementService {
constructor(private readonly deps: AdminBanManagementServiceDeps) {}
async banIp(data: {ip: string}, adminUserId: UserID, auditLogReason: string | null) {
const {adminRepository, auditService} = this.deps;
const {adminRepository, auditService, cacheService} = this.deps;
await adminRepository.banIp(data.ip);
ipBanCache.ban(data.ip);
await cacheService.publish(IP_BAN_REFRESH_CHANNEL, 'refresh');
await auditService.createAuditLog({
adminUserId,
@ -46,9 +50,10 @@ export class AdminBanManagementService {
}
async unbanIp(data: {ip: string}, adminUserId: UserID, auditLogReason: string | null) {
const {adminRepository, auditService} = this.deps;
const {adminRepository, auditService, cacheService} = this.deps;
await adminRepository.unbanIp(data.ip);
ipBanCache.unban(data.ip);
await cacheService.publish(IP_BAN_REFRESH_CHANNEL, 'refresh');
await auditService.createAuditLog({
adminUserId,

View File

@ -159,6 +159,7 @@ export class AdminUserService {
this.banManagementService = new AdminBanManagementService({
adminRepository: deps.adminRepository,
auditService: deps.auditService,
cacheService: deps.cacheService,
});
this.registrationService = new AdminUserRegistrationService({

View File

@ -28,7 +28,6 @@ export const ALL_FEATURE_FLAGS: Array<FeatureFlag> = Object.values(FeatureFlags)
export const FEATURE_FLAG_KEY_PREFIX = 'feature_flag:';
export const FEATURE_FLAG_REDIS_KEY = 'feature_flags:config';
export const FEATURE_FLAG_POLL_INTERVAL_MS = 30000;
export const FEATURE_FLAG_POLL_JITTER_MS = 5000;
export const FEATURE_FLAG_USER_CACHE_PREFIX = 'feature_flag:user';
export const FEATURE_FLAG_USER_CACHE_TTL_SECONDS = 30;
export const FEATURE_FLAG_REFRESH_CHANNEL = 'feature_flags:refresh';

View File

@ -0,0 +1,20 @@
/*
* Copyright (C) 2026 Fluxer Contributors
*
* This file is part of Fluxer.
*
* Fluxer is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Fluxer is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Fluxer. If not, see <https://www.gnu.org/licenses/>.
*/
export const IP_BAN_REFRESH_CHANNEL = 'ip_bans:refresh';

View File

@ -17,13 +17,13 @@
* along with Fluxer. If not, see <https://www.gnu.org/licenses/>.
*/
import type {Redis} from 'ioredis';
import type {GuildID, UserID} from '~/BrandedTypes';
import {Config} from '~/Config';
import {
ALL_FEATURE_FLAGS,
FEATURE_FLAG_POLL_INTERVAL_MS,
FEATURE_FLAG_POLL_JITTER_MS,
FEATURE_FLAG_REDIS_KEY,
FEATURE_FLAG_REFRESH_CHANNEL,
FEATURE_FLAG_USER_CACHE_PREFIX,
FEATURE_FLAG_USER_CACHE_TTL_SECONDS,
type FeatureFlag,
@ -38,13 +38,15 @@ interface SerializedFeatureFlagConfig {
export class FeatureFlagService {
private inMemoryCache: Map<FeatureFlag, Set<string>> = new Map();
private pollInterval: ReturnType<typeof setInterval> | null = null;
private repository: FeatureFlagRepository;
private cacheService: ICacheService;
private redisSubscriber: Redis | null;
private subscriberInitialized = false;
constructor(repository: FeatureFlagRepository, cacheService: ICacheService) {
constructor(repository: FeatureFlagRepository, cacheService: ICacheService, redisSubscriber: Redis | null = null) {
this.repository = repository;
this.cacheService = cacheService;
this.redisSubscriber = redisSubscriber;
for (const flag of ALL_FEATURE_FLAGS) {
this.inMemoryCache.set(flag, new Set());
@ -53,26 +55,36 @@ export class FeatureFlagService {
async initialize(): Promise<void> {
await this.refreshCache();
this.startPolling();
this.initializeSubscriber();
Logger.info('FeatureFlagService initialized');
}
shutdown(): void {
if (this.pollInterval) {
clearInterval(this.pollInterval);
this.pollInterval = null;
}
// Subscriber connections are managed externally.
}
private startPolling(): void {
const jitter = Math.random() * FEATURE_FLAG_POLL_JITTER_MS;
const interval = FEATURE_FLAG_POLL_INTERVAL_MS + jitter;
private initializeSubscriber(): void {
if (this.subscriberInitialized || !this.redisSubscriber) {
return;
}
this.pollInterval = setInterval(() => {
this.refreshCache().catch((err) => {
Logger.error({err}, 'Failed to refresh feature flag cache');
const subscriber = this.redisSubscriber;
subscriber
.subscribe(FEATURE_FLAG_REFRESH_CHANNEL)
.then(() => {
subscriber.on('message', (channel) => {
if (channel === FEATURE_FLAG_REFRESH_CHANNEL) {
this.refreshCache().catch((err) => {
Logger.error({err}, 'Failed to refresh feature flag cache from pubsub');
});
}
});
})
.catch((error) => {
Logger.error({error}, 'Failed to subscribe to feature flag refresh channel');
});
}, interval);
this.subscriberInitialized = true;
}
private async refreshCache(): Promise<void> {
@ -139,8 +151,10 @@ export class FeatureFlagService {
async setFeatureGuildIds(flag: FeatureFlag, guildIds: Set<string>): Promise<void> {
await this.repository.setFeatureFlag(flag, guildIds);
await this.invalidateUserFeatureCache(flag);
await this.cacheService.delete(FEATURE_FLAG_REDIS_KEY);
await this.refreshCache();
await this.cacheService.publish(FEATURE_FLAG_REFRESH_CHANNEL, 'refresh');
Logger.info({flag, guildCount: guildIds.size}, 'Feature flag guild IDs updated');
}
@ -155,4 +169,9 @@ export class FeatureFlagService {
getGuildIdsForFlag(flag: FeatureFlag): Set<string> {
return new Set(this.inMemoryCache.get(flag) ?? []);
}
private async invalidateUserFeatureCache(flag: FeatureFlag): Promise<void> {
const pattern = `${FEATURE_FLAG_USER_CACHE_PREFIX}:${flag}:*`;
await this.cacheService.deletePattern(pattern);
}
}

View File

@ -18,8 +18,10 @@
*/
import {createMiddleware} from 'hono/factory';
import type {Redis} from 'ioredis';
import type {HonoEnv} from '~/App';
import {AdminRepository} from '~/admin/AdminRepository';
import {IP_BAN_REFRESH_CHANNEL} from '~/constants/IpBan';
import {IpBannedError} from '~/Errors';
import {Logger} from '~/Logger';
import {type IpFamily, parseIpBanEntry, tryParseSingleIp} from '~/utils/IpRangeUtils';
@ -42,38 +44,57 @@ class IpBanCache {
private singleIpBans: FamilyMap<SingleCacheEntry>;
private rangeIpBans: FamilyMap<RangeCacheEntry>;
private isInitialized = false;
private refreshIntervalMs = 30 * 1000;
private adminRepository = new AdminRepository();
private consecutiveFailures = 0;
private maxConsecutiveFailures = 5;
private redisSubscriber: Redis | null = null;
private subscriberInitialized = false;
constructor() {
this.singleIpBans = this.createFamilyMaps();
this.rangeIpBans = this.createFamilyMaps();
}
setRefreshSubscriber(subscriber: Redis | null): void {
this.redisSubscriber = subscriber;
}
async initialize(): Promise<void> {
if (this.isInitialized) return;
await this.refresh();
this.isInitialized = true;
this.setupSubscriber();
}
setInterval(() => {
this.refresh().catch((err) => {
this.consecutiveFailures++;
private setupSubscriber(): void {
if (this.subscriberInitialized || !this.redisSubscriber) {
return;
}
if (this.consecutiveFailures >= this.maxConsecutiveFailures) {
console.error(
`Failed to refresh IP ban cache ${this.consecutiveFailures} times in a row. ` +
`Last error: ${err.message}. Cache may be stale.`,
);
} else {
console.warn(
`Failed to refresh IP ban cache (${this.consecutiveFailures}/${this.maxConsecutiveFailures}): ${err.message}`,
);
}
const subscriber = this.redisSubscriber;
subscriber
.subscribe(IP_BAN_REFRESH_CHANNEL)
.then(() => {
subscriber.on('message', (channel) => {
if (channel === IP_BAN_REFRESH_CHANNEL) {
this.refresh().catch((err) => {
this.consecutiveFailures++;
const message = err instanceof Error ? err.message : String(err);
if (this.consecutiveFailures >= this.maxConsecutiveFailures) {
Logger.error({error: message}, 'Failed to refresh IP ban cache after notification');
} else {
Logger.warn({error: message}, 'Failed to refresh IP ban cache after notification');
}
});
}
});
})
.catch((error) => {
Logger.error({error}, 'Failed to subscribe to IP ban refresh channel');
});
}, this.refreshIntervalMs);
this.subscriberInitialized = true;
}
async refresh(): Promise<void> {

View File

@ -145,7 +145,8 @@ const cloudflarePurgeQueue: ICloudflarePurgeQueue = Config.cloudflare.purgeEnabl
const assetDeletionQueue: IAssetDeletionQueue = new AssetDeletionQueue(redis);
const featureFlagRepository = new FeatureFlagRepository();
const featureFlagService = new FeatureFlagService(featureFlagRepository, cacheService);
const featureFlagSubscriber = new Redis(Config.redis.url);
const featureFlagService = new FeatureFlagService(featureFlagRepository, cacheService, featureFlagSubscriber);
let featureFlagServiceInitialized = false;
const snowflakeReservationRepository = new SnowflakeReservationRepository();
const snowflakeReservationSubscriber = new Redis(Config.redis.url);

View File

@ -289,7 +289,7 @@ export class RpcService {
return {
type: 'geoip_lookup',
data: {
country_code: geoip.countryCode,
country_code: geoip.countryCode ?? 'US',
},
};
}
@ -689,7 +689,7 @@ export class RpcService {
let countryCode = 'US';
if (ip) {
const geoip = await lookupGeoip(ip);
countryCode = geoip.countryCode;
countryCode = geoip.countryCode ?? countryCode;
} else {
Logger.warn({context: 'rpc_geoip', reason: 'ip_missing'}, 'RPC session request missing IP for GeoIP');
}