From 8f9daac8b0ee03b4642211fabdafd816ee0fde83 Mon Sep 17 00:00:00 2001 From: hampus-fluxer Date: Tue, 6 Jan 2026 00:22:30 +0100 Subject: [PATCH] perf(api): replace polling for in-memory caches with signals (#35) --- fluxer_api/src/App.ts | 2 + .../services/AdminBanManagementService.ts | 9 +++- .../src/admin/services/AdminUserService.ts | 1 + fluxer_api/src/constants/FeatureFlags.ts | 3 +- fluxer_api/src/constants/IpBan.ts | 20 ++++++++ .../src/feature_flag/FeatureFlagService.ts | 51 +++++++++++++------ fluxer_api/src/middleware/IpBanMiddleware.ts | 51 +++++++++++++------ .../src/middleware/ServiceMiddleware.ts | 3 +- fluxer_api/src/rpc/RpcService.ts | 4 +- 9 files changed, 106 insertions(+), 38 deletions(-) create mode 100644 fluxer_api/src/constants/IpBan.ts diff --git a/fluxer_api/src/App.ts b/fluxer_api/src/App.ts index fb99a286..0ba9c187 100644 --- a/fluxer_api/src/App.ts +++ b/fluxer_api/src/App.ts @@ -263,6 +263,8 @@ app.route('/', routes); app.onError(AppErrorHandler); app.notFound(AppNotFoundHandler); +const ipBanSubscriber = new Redis(Config.redis.url); +ipBanCache.setRefreshSubscriber(ipBanSubscriber); await ipBanCache.initialize(); initializeMetricsService(Config.metrics.host ?? null); diff --git a/fluxer_api/src/admin/services/AdminBanManagementService.ts b/fluxer_api/src/admin/services/AdminBanManagementService.ts index 028a0392..5f64542f 100644 --- a/fluxer_api/src/admin/services/AdminBanManagementService.ts +++ b/fluxer_api/src/admin/services/AdminBanManagementService.ts @@ -19,21 +19,25 @@ import type {IAdminRepository} from '~/admin/IAdminRepository'; import type {UserID} from '~/BrandedTypes'; +import {IP_BAN_REFRESH_CHANNEL} from '~/constants/IpBan'; +import type {ICacheService} from '~/infrastructure/ICacheService'; import {ipBanCache} from '~/middleware/IpBanMiddleware'; import type {AdminAuditService} from './AdminAuditService'; interface AdminBanManagementServiceDeps { adminRepository: IAdminRepository; auditService: AdminAuditService; + cacheService: ICacheService; } export class AdminBanManagementService { constructor(private readonly deps: AdminBanManagementServiceDeps) {} async banIp(data: {ip: string}, adminUserId: UserID, auditLogReason: string | null) { - const {adminRepository, auditService} = this.deps; + const {adminRepository, auditService, cacheService} = this.deps; await adminRepository.banIp(data.ip); ipBanCache.ban(data.ip); + await cacheService.publish(IP_BAN_REFRESH_CHANNEL, 'refresh'); await auditService.createAuditLog({ adminUserId, @@ -46,9 +50,10 @@ export class AdminBanManagementService { } async unbanIp(data: {ip: string}, adminUserId: UserID, auditLogReason: string | null) { - const {adminRepository, auditService} = this.deps; + const {adminRepository, auditService, cacheService} = this.deps; await adminRepository.unbanIp(data.ip); ipBanCache.unban(data.ip); + await cacheService.publish(IP_BAN_REFRESH_CHANNEL, 'refresh'); await auditService.createAuditLog({ adminUserId, diff --git a/fluxer_api/src/admin/services/AdminUserService.ts b/fluxer_api/src/admin/services/AdminUserService.ts index e8c0cf40..e103270e 100644 --- a/fluxer_api/src/admin/services/AdminUserService.ts +++ b/fluxer_api/src/admin/services/AdminUserService.ts @@ -159,6 +159,7 @@ export class AdminUserService { this.banManagementService = new AdminBanManagementService({ adminRepository: deps.adminRepository, auditService: deps.auditService, + cacheService: deps.cacheService, }); this.registrationService = new AdminUserRegistrationService({ diff --git a/fluxer_api/src/constants/FeatureFlags.ts b/fluxer_api/src/constants/FeatureFlags.ts index 95fa99a6..fdebc1f7 100644 --- a/fluxer_api/src/constants/FeatureFlags.ts +++ b/fluxer_api/src/constants/FeatureFlags.ts @@ -28,7 +28,6 @@ export const ALL_FEATURE_FLAGS: Array = Object.values(FeatureFlags) export const FEATURE_FLAG_KEY_PREFIX = 'feature_flag:'; export const FEATURE_FLAG_REDIS_KEY = 'feature_flags:config'; -export const FEATURE_FLAG_POLL_INTERVAL_MS = 30000; -export const FEATURE_FLAG_POLL_JITTER_MS = 5000; export const FEATURE_FLAG_USER_CACHE_PREFIX = 'feature_flag:user'; export const FEATURE_FLAG_USER_CACHE_TTL_SECONDS = 30; +export const FEATURE_FLAG_REFRESH_CHANNEL = 'feature_flags:refresh'; diff --git a/fluxer_api/src/constants/IpBan.ts b/fluxer_api/src/constants/IpBan.ts new file mode 100644 index 00000000..ab85ebf8 --- /dev/null +++ b/fluxer_api/src/constants/IpBan.ts @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2026 Fluxer Contributors + * + * This file is part of Fluxer. + * + * Fluxer is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Fluxer is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Fluxer. If not, see . + */ + +export const IP_BAN_REFRESH_CHANNEL = 'ip_bans:refresh'; diff --git a/fluxer_api/src/feature_flag/FeatureFlagService.ts b/fluxer_api/src/feature_flag/FeatureFlagService.ts index 3cbb8c66..9672400d 100644 --- a/fluxer_api/src/feature_flag/FeatureFlagService.ts +++ b/fluxer_api/src/feature_flag/FeatureFlagService.ts @@ -17,13 +17,13 @@ * along with Fluxer. If not, see . */ +import type {Redis} from 'ioredis'; import type {GuildID, UserID} from '~/BrandedTypes'; import {Config} from '~/Config'; import { ALL_FEATURE_FLAGS, - FEATURE_FLAG_POLL_INTERVAL_MS, - FEATURE_FLAG_POLL_JITTER_MS, FEATURE_FLAG_REDIS_KEY, + FEATURE_FLAG_REFRESH_CHANNEL, FEATURE_FLAG_USER_CACHE_PREFIX, FEATURE_FLAG_USER_CACHE_TTL_SECONDS, type FeatureFlag, @@ -38,13 +38,15 @@ interface SerializedFeatureFlagConfig { export class FeatureFlagService { private inMemoryCache: Map> = new Map(); - private pollInterval: ReturnType | null = null; private repository: FeatureFlagRepository; private cacheService: ICacheService; + private redisSubscriber: Redis | null; + private subscriberInitialized = false; - constructor(repository: FeatureFlagRepository, cacheService: ICacheService) { + constructor(repository: FeatureFlagRepository, cacheService: ICacheService, redisSubscriber: Redis | null = null) { this.repository = repository; this.cacheService = cacheService; + this.redisSubscriber = redisSubscriber; for (const flag of ALL_FEATURE_FLAGS) { this.inMemoryCache.set(flag, new Set()); @@ -53,26 +55,36 @@ export class FeatureFlagService { async initialize(): Promise { await this.refreshCache(); - this.startPolling(); + this.initializeSubscriber(); Logger.info('FeatureFlagService initialized'); } shutdown(): void { - if (this.pollInterval) { - clearInterval(this.pollInterval); - this.pollInterval = null; - } + // Subscriber connections are managed externally. } - private startPolling(): void { - const jitter = Math.random() * FEATURE_FLAG_POLL_JITTER_MS; - const interval = FEATURE_FLAG_POLL_INTERVAL_MS + jitter; + private initializeSubscriber(): void { + if (this.subscriberInitialized || !this.redisSubscriber) { + return; + } - this.pollInterval = setInterval(() => { - this.refreshCache().catch((err) => { - Logger.error({err}, 'Failed to refresh feature flag cache'); + const subscriber = this.redisSubscriber; + subscriber + .subscribe(FEATURE_FLAG_REFRESH_CHANNEL) + .then(() => { + subscriber.on('message', (channel) => { + if (channel === FEATURE_FLAG_REFRESH_CHANNEL) { + this.refreshCache().catch((err) => { + Logger.error({err}, 'Failed to refresh feature flag cache from pubsub'); + }); + } + }); + }) + .catch((error) => { + Logger.error({error}, 'Failed to subscribe to feature flag refresh channel'); }); - }, interval); + + this.subscriberInitialized = true; } private async refreshCache(): Promise { @@ -139,8 +151,10 @@ export class FeatureFlagService { async setFeatureGuildIds(flag: FeatureFlag, guildIds: Set): Promise { await this.repository.setFeatureFlag(flag, guildIds); + await this.invalidateUserFeatureCache(flag); await this.cacheService.delete(FEATURE_FLAG_REDIS_KEY); await this.refreshCache(); + await this.cacheService.publish(FEATURE_FLAG_REFRESH_CHANNEL, 'refresh'); Logger.info({flag, guildCount: guildIds.size}, 'Feature flag guild IDs updated'); } @@ -155,4 +169,9 @@ export class FeatureFlagService { getGuildIdsForFlag(flag: FeatureFlag): Set { return new Set(this.inMemoryCache.get(flag) ?? []); } + + private async invalidateUserFeatureCache(flag: FeatureFlag): Promise { + const pattern = `${FEATURE_FLAG_USER_CACHE_PREFIX}:${flag}:*`; + await this.cacheService.deletePattern(pattern); + } } diff --git a/fluxer_api/src/middleware/IpBanMiddleware.ts b/fluxer_api/src/middleware/IpBanMiddleware.ts index 41af3a69..7ed6a928 100644 --- a/fluxer_api/src/middleware/IpBanMiddleware.ts +++ b/fluxer_api/src/middleware/IpBanMiddleware.ts @@ -18,8 +18,10 @@ */ import {createMiddleware} from 'hono/factory'; +import type {Redis} from 'ioredis'; import type {HonoEnv} from '~/App'; import {AdminRepository} from '~/admin/AdminRepository'; +import {IP_BAN_REFRESH_CHANNEL} from '~/constants/IpBan'; import {IpBannedError} from '~/Errors'; import {Logger} from '~/Logger'; import {type IpFamily, parseIpBanEntry, tryParseSingleIp} from '~/utils/IpRangeUtils'; @@ -42,38 +44,57 @@ class IpBanCache { private singleIpBans: FamilyMap; private rangeIpBans: FamilyMap; private isInitialized = false; - private refreshIntervalMs = 30 * 1000; private adminRepository = new AdminRepository(); private consecutiveFailures = 0; private maxConsecutiveFailures = 5; + private redisSubscriber: Redis | null = null; + private subscriberInitialized = false; constructor() { this.singleIpBans = this.createFamilyMaps(); this.rangeIpBans = this.createFamilyMaps(); } + setRefreshSubscriber(subscriber: Redis | null): void { + this.redisSubscriber = subscriber; + } + async initialize(): Promise { if (this.isInitialized) return; await this.refresh(); this.isInitialized = true; + this.setupSubscriber(); + } - setInterval(() => { - this.refresh().catch((err) => { - this.consecutiveFailures++; + private setupSubscriber(): void { + if (this.subscriberInitialized || !this.redisSubscriber) { + return; + } - if (this.consecutiveFailures >= this.maxConsecutiveFailures) { - console.error( - `Failed to refresh IP ban cache ${this.consecutiveFailures} times in a row. ` + - `Last error: ${err.message}. Cache may be stale.`, - ); - } else { - console.warn( - `Failed to refresh IP ban cache (${this.consecutiveFailures}/${this.maxConsecutiveFailures}): ${err.message}`, - ); - } + const subscriber = this.redisSubscriber; + subscriber + .subscribe(IP_BAN_REFRESH_CHANNEL) + .then(() => { + subscriber.on('message', (channel) => { + if (channel === IP_BAN_REFRESH_CHANNEL) { + this.refresh().catch((err) => { + this.consecutiveFailures++; + const message = err instanceof Error ? err.message : String(err); + if (this.consecutiveFailures >= this.maxConsecutiveFailures) { + Logger.error({error: message}, 'Failed to refresh IP ban cache after notification'); + } else { + Logger.warn({error: message}, 'Failed to refresh IP ban cache after notification'); + } + }); + } + }); + }) + .catch((error) => { + Logger.error({error}, 'Failed to subscribe to IP ban refresh channel'); }); - }, this.refreshIntervalMs); + + this.subscriberInitialized = true; } async refresh(): Promise { diff --git a/fluxer_api/src/middleware/ServiceMiddleware.ts b/fluxer_api/src/middleware/ServiceMiddleware.ts index da1e1a0f..68256dbd 100644 --- a/fluxer_api/src/middleware/ServiceMiddleware.ts +++ b/fluxer_api/src/middleware/ServiceMiddleware.ts @@ -145,7 +145,8 @@ const cloudflarePurgeQueue: ICloudflarePurgeQueue = Config.cloudflare.purgeEnabl const assetDeletionQueue: IAssetDeletionQueue = new AssetDeletionQueue(redis); const featureFlagRepository = new FeatureFlagRepository(); -const featureFlagService = new FeatureFlagService(featureFlagRepository, cacheService); +const featureFlagSubscriber = new Redis(Config.redis.url); +const featureFlagService = new FeatureFlagService(featureFlagRepository, cacheService, featureFlagSubscriber); let featureFlagServiceInitialized = false; const snowflakeReservationRepository = new SnowflakeReservationRepository(); const snowflakeReservationSubscriber = new Redis(Config.redis.url); diff --git a/fluxer_api/src/rpc/RpcService.ts b/fluxer_api/src/rpc/RpcService.ts index 36a16dbf..9fdb553e 100644 --- a/fluxer_api/src/rpc/RpcService.ts +++ b/fluxer_api/src/rpc/RpcService.ts @@ -289,7 +289,7 @@ export class RpcService { return { type: 'geoip_lookup', data: { - country_code: geoip.countryCode, + country_code: geoip.countryCode ?? 'US', }, }; } @@ -689,7 +689,7 @@ export class RpcService { let countryCode = 'US'; if (ip) { const geoip = await lookupGeoip(ip); - countryCode = geoip.countryCode; + countryCode = geoip.countryCode ?? countryCode; } else { Logger.warn({context: 'rpc_geoip', reason: 'ip_missing'}, 'RPC session request missing IP for GeoIP'); }