From 7b1aa6ff2e026953f0ad08306c1f4685474a6e69 Mon Sep 17 00:00:00 2001 From: Hampus Kraft Date: Fri, 27 Feb 2026 04:50:21 +0000 Subject: [PATCH] fix(api): various subtle memory leaks (and some not so subtle ones, *cough* ReportService *cough*) --- packages/api/src/app/APILifecycle.tsx | 9 +- packages/api/src/infrastructure/ClamAV.tsx | 13 +++ .../api/src/infrastructure/GatewayService.tsx | 97 ++++++++++++++++++- .../instance/SnowflakeReservationService.tsx | 10 +- .../api/src/limits/LimitConfigService.tsx | 23 +++-- .../api/src/middleware/IpBanMiddleware.tsx | 35 ++++--- .../api/src/middleware/ServiceMiddleware.tsx | 37 ++++--- packages/api/src/voice/VoiceTopology.tsx | 10 +- 8 files changed, 193 insertions(+), 41 deletions(-) diff --git a/packages/api/src/app/APILifecycle.tsx b/packages/api/src/app/APILifecycle.tsx index 557acf58..23d7827d 100644 --- a/packages/api/src/app/APILifecycle.tsx +++ b/packages/api/src/app/APILifecycle.tsx @@ -25,7 +25,7 @@ import {KVAccountDeletionQueueService} from '@fluxer/api/src/infrastructure/KVAc import {initializeMetricsService} from '@fluxer/api/src/infrastructure/MetricsService'; import {InstanceConfigRepository} from '@fluxer/api/src/instance/InstanceConfigRepository'; import {ipBanCache} from '@fluxer/api/src/middleware/IpBanMiddleware'; -import {initializeServiceSingletons} from '@fluxer/api/src/middleware/ServiceMiddleware'; +import {initializeServiceSingletons, shutdownReportService} from '@fluxer/api/src/middleware/ServiceMiddleware'; import { ensureVoiceResourcesInitialized, getKVClient, @@ -207,6 +207,13 @@ export function createShutdown(logger: ILogger): () => Promise { logger.error({error}, 'Error shutting down IP ban cache'); } + try { + shutdownReportService(); + logger.info('Report service shut down'); + } catch (error) { + logger.error({error}, 'Error shutting down report service'); + } + logger.info('API service shutdown complete'); }; } diff --git a/packages/api/src/infrastructure/ClamAV.tsx b/packages/api/src/infrastructure/ClamAV.tsx index 3b54915b..8c3d0a1c 100644 --- a/packages/api/src/infrastructure/ClamAV.tsx +++ b/packages/api/src/infrastructure/ClamAV.tsx @@ -42,8 +42,17 @@ export class ClamAV { const socket = createConnection(this.port, this.host); let response = ''; let isResolved = false; + const MAX_RESPONSE_SIZE = 10 * 1024 * 1024; + const CONNECT_TIMEOUT_MS = 5000; + + const connectTimeout = setTimeout(() => { + if (!isResolved) { + doReject(new Error('ClamAV connection timeout (5s)')); + } + }, CONNECT_TIMEOUT_MS); const cleanup = () => { + clearTimeout(connectTimeout); if (!socket.destroyed) { socket.destroy(); } @@ -64,6 +73,7 @@ export class ClamAV { }; socket.on('connect', () => { + clearTimeout(connectTimeout); try { socket.write('zINSTREAM\0'); @@ -92,6 +102,9 @@ export class ClamAV { socket.on('data', (data) => { response += data.toString(); + if (response.length > MAX_RESPONSE_SIZE) { + doReject(new Error(`ClamAV response exceeded ${(MAX_RESPONSE_SIZE / 1024 / 1024).toFixed(0)} MB limit`)); + } }); socket.on('end', () => { diff --git a/packages/api/src/infrastructure/GatewayService.tsx b/packages/api/src/infrastructure/GatewayService.tsx index de77268d..cca0ac34 100644 --- a/packages/api/src/infrastructure/GatewayService.tsx +++ b/packages/api/src/infrastructure/GatewayService.tsx @@ -246,6 +246,7 @@ export class GatewayService { private circuitBreakerOpenUntilMs = 0; private readonly CIRCUIT_BREAKER_FAILURE_THRESHOLD = 5; private readonly CIRCUIT_BREAKER_COOLDOWN_MS = ms('10 seconds'); + private readonly PENDING_REQUEST_TIMEOUT_MS = ms('30 seconds'); constructor() { this.rpcClient = GatewayRpcClient.getInstance(); @@ -260,9 +261,29 @@ export class GatewayService { this.circuitBreakerOpenUntilMs = 0; return false; } + this.rejectAllPendingRequests(new ServiceUnavailableError('Gateway circuit breaker open')); return true; } + private rejectAllPendingRequests(error: Error): void { + this.pendingGuildDataRequests.forEach((requests) => { + requests.forEach((req) => req.reject(error)); + }); + this.pendingGuildDataRequests.clear(); + + this.pendingGuildMemberRequests.forEach((requests) => { + requests.forEach((req) => req.reject(error)); + }); + this.pendingGuildMemberRequests.clear(); + + this.pendingPermissionRequests.forEach((requests) => { + requests.forEach((req) => req.reject(error)); + }); + this.pendingPermissionRequests.clear(); + + this.pendingBatchRequestCount = 0; + } + private recordCircuitBreakerSuccess(): void { this.circuitBreakerConsecutiveFailures = 0; } @@ -626,8 +647,25 @@ export class GatewayService { return; } + let timeoutId: NodeJS.Timeout | null = setTimeout(() => { + reject(new GatewayTimeoutError()); + this.removePendingRequest(this.pendingGuildDataRequests, key, wrappedResolve, wrappedReject); + }, this.PENDING_REQUEST_TIMEOUT_MS); + + const wrappedResolve = (value: GuildResponse) => { + if (timeoutId) clearTimeout(timeoutId); + timeoutId = null; + resolve(value); + }; + + const wrappedReject = (error: Error) => { + if (timeoutId) clearTimeout(timeoutId); + timeoutId = null; + reject(error); + }; + const pending = this.pendingGuildDataRequests.get(key) || []; - pending.push({resolve, reject}); + pending.push({resolve: wrappedResolve, reject: wrappedReject}); this.pendingGuildDataRequests.set(key, pending); this.pendingBatchRequestCount += 1; @@ -651,8 +689,25 @@ export class GatewayService { return; } + let timeoutId: NodeJS.Timeout | null = setTimeout(() => { + reject(new GatewayTimeoutError()); + this.removePendingRequest(this.pendingGuildMemberRequests, key, wrappedResolve, wrappedReject); + }, this.PENDING_REQUEST_TIMEOUT_MS); + + const wrappedResolve = (value: {success: boolean; memberData?: GuildMemberResponse}) => { + if (timeoutId) clearTimeout(timeoutId); + timeoutId = null; + resolve(value); + }; + + const wrappedReject = (error: Error) => { + if (timeoutId) clearTimeout(timeoutId); + timeoutId = null; + reject(error); + }; + const pending = this.pendingGuildMemberRequests.get(key) || []; - pending.push({resolve, reject}); + pending.push({resolve: wrappedResolve, reject: wrappedReject}); this.pendingGuildMemberRequests.set(key, pending); this.pendingBatchRequestCount += 1; @@ -804,8 +859,25 @@ export class GatewayService { return; } + let timeoutId: NodeJS.Timeout | null = setTimeout(() => { + reject(new GatewayTimeoutError()); + this.removePendingRequest(this.pendingPermissionRequests, key, wrappedResolve, wrappedReject); + }, this.PENDING_REQUEST_TIMEOUT_MS); + + const wrappedResolve = (value: boolean) => { + if (timeoutId) clearTimeout(timeoutId); + timeoutId = null; + resolve(value); + }; + + const wrappedReject = (error: Error) => { + if (timeoutId) clearTimeout(timeoutId); + timeoutId = null; + reject(error); + }; + const pending = this.pendingPermissionRequests.get(key) || []; - pending.push({resolve, reject}); + pending.push({resolve: wrappedResolve, reject: wrappedReject}); this.pendingPermissionRequests.set(key, pending); this.pendingBatchRequestCount += 1; @@ -817,6 +889,25 @@ export class GatewayService { }); } + private removePendingRequest( + map: Map>>, + key: string, + resolve: (value: T) => void, + reject: (error: Error) => void, + ): void { + const pending = map.get(key); + if (pending) { + const index = pending.findIndex((r) => r.resolve === resolve || r.reject === reject); + if (index >= 0) { + pending.splice(index, 1); + this.pendingBatchRequestCount--; + if (pending.length === 0) { + map.delete(key); + } + } + } + } + async canManageRoles({guildId, userId, targetUserId, roleId}: CanManageRolesParams): Promise { const result = await this.call<{can_manage: boolean}>('guild.can_manage_roles', { guild_id: guildId.toString(), diff --git a/packages/api/src/instance/SnowflakeReservationService.tsx b/packages/api/src/instance/SnowflakeReservationService.tsx index 158207bb..0808bf7e 100644 --- a/packages/api/src/instance/SnowflakeReservationService.tsx +++ b/packages/api/src/instance/SnowflakeReservationService.tsx @@ -30,6 +30,7 @@ export class SnowflakeReservationService { private initialized = false; private reloadPromise: Promise | null = null; private kvSubscription: IKVSubscription | null = null; + private messageHandler: ((channel: string) => void) | null = null; constructor( private repository: SnowflakeReservationRepository, @@ -50,13 +51,14 @@ export class SnowflakeReservationService { this.kvSubscription = subscription; await subscription.connect(); await subscription.subscribe(SNOWFLAKE_RESERVATION_REFRESH_CHANNEL); - subscription.on('message', (channel) => { + this.messageHandler = (channel: string) => { if (channel === SNOWFLAKE_RESERVATION_REFRESH_CHANNEL) { this.reload().catch((error) => { Logger.error({error}, 'Failed to reload snowflake reservations'); }); } - }); + }; + subscription.on('message', this.messageHandler); } catch (error) { Logger.error({error}, 'Failed to subscribe to snowflake reservation refresh channel'); } @@ -99,9 +101,13 @@ export class SnowflakeReservationService { } shutdown(): void { + if (this.kvSubscription && this.messageHandler) { + this.kvSubscription.removeAllListeners('message'); + } if (this.kvSubscription) { this.kvSubscription.disconnect(); this.kvSubscription = null; } + this.messageHandler = null; } } diff --git a/packages/api/src/limits/LimitConfigService.tsx b/packages/api/src/limits/LimitConfigService.tsx index 37727d42..56d7cd6f 100644 --- a/packages/api/src/limits/LimitConfigService.tsx +++ b/packages/api/src/limits/LimitConfigService.tsx @@ -45,6 +45,7 @@ export class LimitConfigService { private kvSubscription: IKVSubscription | null = null; private subscriberInitialized = false; private readonly cacheKey: string; + private messageHandler: ((channel: string) => void) | null = null; constructor(repository: InstanceConfigRepository, cacheService: ICacheService, kvClient: IKVProvider | null = null) { this.repository = repository; @@ -144,17 +145,21 @@ export class LimitConfigService { const subscription = this.kvClient.duplicate(); this.kvSubscription = subscription; + this.messageHandler = (channel: string) => { + if (channel === LIMIT_CONFIG_REFRESH_CHANNEL) { + this.refreshCache().catch((err) => { + Logger.error({err}, 'Failed to refresh limit config from pubsub'); + }); + } + }; + subscription .connect() .then(() => subscription.subscribe(LIMIT_CONFIG_REFRESH_CHANNEL)) .then(() => { - subscription.on('message', (channel) => { - if (channel === LIMIT_CONFIG_REFRESH_CHANNEL) { - this.refreshCache().catch((err) => { - Logger.error({err}, 'Failed to refresh limit config from pubsub'); - }); - } - }); + if (this.messageHandler) { + subscription.on('message', this.messageHandler); + } }) .catch((error) => { Logger.error({error}, 'Failed to subscribe to limit config refresh channel'); @@ -164,12 +169,16 @@ export class LimitConfigService { } shutdown(): void { + if (this.kvSubscription && this.messageHandler) { + this.kvSubscription.removeAllListeners('message'); + } if (this.kvSubscription) { this.kvSubscription.quit().catch((err) => { Logger.error({err}, 'Failed to close KV subscription'); }); this.kvSubscription = null; } + this.messageHandler = null; } } diff --git a/packages/api/src/middleware/IpBanMiddleware.tsx b/packages/api/src/middleware/IpBanMiddleware.tsx index 98c0fec3..794dfe40 100644 --- a/packages/api/src/middleware/IpBanMiddleware.tsx +++ b/packages/api/src/middleware/IpBanMiddleware.tsx @@ -52,6 +52,7 @@ class IpBanCache { private kvClient: IKVProvider | null = null; private kvSubscription: IKVSubscription | null = null; private subscriberInitialized = false; + private messageHandler: ((channel: string) => void) | null = null; constructor() { this.singleIpBans = this.createFamilyMaps(); @@ -78,23 +79,27 @@ class IpBanCache { const subscription = this.kvClient.duplicate(); this.kvSubscription = subscription; + this.messageHandler = (channel: string) => { + if (channel === IP_BAN_REFRESH_CHANNEL) { + this.refresh().catch((err) => { + this.consecutiveFailures++; + const message = err instanceof Error ? err.message : String(err); + if (this.consecutiveFailures >= this.maxConsecutiveFailures) { + Logger.error({error: message}, 'Failed to refresh IP ban cache after notification'); + } else { + Logger.warn({error: message}, 'Failed to refresh IP ban cache after notification'); + } + }); + } + }; + subscription .connect() .then(() => subscription.subscribe(IP_BAN_REFRESH_CHANNEL)) .then(() => { - subscription.on('message', (channel) => { - if (channel === IP_BAN_REFRESH_CHANNEL) { - this.refresh().catch((err) => { - this.consecutiveFailures++; - const message = err instanceof Error ? err.message : String(err); - if (this.consecutiveFailures >= this.maxConsecutiveFailures) { - Logger.error({error: message}, 'Failed to refresh IP ban cache after notification'); - } else { - Logger.warn({error: message}, 'Failed to refresh IP ban cache after notification'); - } - }); - } - }); + if (this.messageHandler) { + subscription.on('message', this.messageHandler); + } }) .catch((error) => { Logger.error({error}, 'Failed to subscribe to IP ban refresh channel'); @@ -203,10 +208,14 @@ class IpBanCache { } shutdown(): void { + if (this.kvSubscription && this.messageHandler) { + this.kvSubscription.removeAllListeners('message'); + } if (this.kvSubscription) { this.kvSubscription.disconnect(); this.kvSubscription = null; } + this.messageHandler = null; } } diff --git a/packages/api/src/middleware/ServiceMiddleware.tsx b/packages/api/src/middleware/ServiceMiddleware.tsx index 61d3fca9..3852bb73 100644 --- a/packages/api/src/middleware/ServiceMiddleware.tsx +++ b/packages/api/src/middleware/ServiceMiddleware.tsx @@ -173,6 +173,15 @@ import {createMiddleware} from 'hono/factory'; const errorI18nService = new ErrorI18nService(); +let _reportService: ReportService | null = null; + +export function shutdownReportService(): void { + if (_reportService) { + _reportService.shutdown(); + _reportService = null; + } +} + let _testEmailService: TestEmailService | null = null; function getTestEmailService(): TestEmailService { if (!_testEmailService) { @@ -617,19 +626,21 @@ export const ServiceMiddleware = createMiddleware(async (ctx, next) => const desktopHandoffService = new DesktopHandoffService(cacheService); const authRequestService = new AuthRequestService(authService, ssoService, cacheService, desktopHandoffService); - const reportSearchService = getReportSearchService(); - const reportService = new ReportService( - reportRepository, - channelRepository, - guildRepository, - userRepository, - inviteRepository, - emailService, - emailDnsValidationService, - snowflakeService, - storageService, - reportSearchService, - ); + if (!_reportService) { + _reportService = new ReportService( + reportRepository, + channelRepository, + guildRepository, + userRepository, + inviteRepository, + emailService, + emailDnsValidationService, + snowflakeService, + storageService, + getReportSearchService(), + ); + } + const reportService = _reportService; const reportRequestService = new ReportRequestService(reportService); const adminService = new AdminService( diff --git a/packages/api/src/voice/VoiceTopology.tsx b/packages/api/src/voice/VoiceTopology.tsx index a1ffb82e..06ed18c6 100644 --- a/packages/api/src/voice/VoiceTopology.tsx +++ b/packages/api/src/voice/VoiceTopology.tsx @@ -34,6 +34,7 @@ export class VoiceTopology { private subscribers: Set = new Set(); private serverRotationIndex: Map = new Map(); private kvSubscription: IKVSubscription | null = null; + private messageHandler: ((channel: string) => void) | null = null; constructor( private voiceRepository: IVoiceRepository, @@ -53,13 +54,14 @@ export class VoiceTopology { this.kvSubscription = subscription; await subscription.connect(); await subscription.subscribe(VOICE_CONFIGURATION_CHANNEL); - subscription.on('message', (channel) => { + this.messageHandler = (channel: string) => { if (channel === VOICE_CONFIGURATION_CHANNEL) { this.reload().catch((error) => { Logger.error({error}, 'Failed to reload voice topology from KV notification'); }); } - }); + }; + subscription.on('message', this.messageHandler); } catch (error) { Logger.error({error}, 'Failed to subscribe to voice configuration channel'); } @@ -239,9 +241,13 @@ export class VoiceTopology { } shutdown(): void { + if (this.kvSubscription && this.messageHandler) { + this.kvSubscription.removeAllListeners('message'); + } if (this.kvSubscription) { this.kvSubscription.disconnect(); this.kvSubscription = null; } + this.messageHandler = null; } }