fix(api): various subtle memory leaks

(and some not so subtle ones, *cough* ReportService *cough*)
This commit is contained in:
Hampus Kraft 2026-02-27 04:50:21 +00:00
parent 848269a4d4
commit 7b1aa6ff2e
No known key found for this signature in database
GPG Key ID: 6090864C465A454D
8 changed files with 193 additions and 41 deletions

View File

@ -25,7 +25,7 @@ import {KVAccountDeletionQueueService} from '@fluxer/api/src/infrastructure/KVAc
import {initializeMetricsService} from '@fluxer/api/src/infrastructure/MetricsService';
import {InstanceConfigRepository} from '@fluxer/api/src/instance/InstanceConfigRepository';
import {ipBanCache} from '@fluxer/api/src/middleware/IpBanMiddleware';
import {initializeServiceSingletons} from '@fluxer/api/src/middleware/ServiceMiddleware';
import {initializeServiceSingletons, shutdownReportService} from '@fluxer/api/src/middleware/ServiceMiddleware';
import {
ensureVoiceResourcesInitialized,
getKVClient,
@ -207,6 +207,13 @@ export function createShutdown(logger: ILogger): () => Promise<void> {
logger.error({error}, 'Error shutting down IP ban cache');
}
try {
shutdownReportService();
logger.info('Report service shut down');
} catch (error) {
logger.error({error}, 'Error shutting down report service');
}
logger.info('API service shutdown complete');
};
}

View File

@ -42,8 +42,17 @@ export class ClamAV {
const socket = createConnection(this.port, this.host);
let response = '';
let isResolved = false;
const MAX_RESPONSE_SIZE = 10 * 1024 * 1024;
const CONNECT_TIMEOUT_MS = 5000;
const connectTimeout = setTimeout(() => {
if (!isResolved) {
doReject(new Error('ClamAV connection timeout (5s)'));
}
}, CONNECT_TIMEOUT_MS);
const cleanup = () => {
clearTimeout(connectTimeout);
if (!socket.destroyed) {
socket.destroy();
}
@ -64,6 +73,7 @@ export class ClamAV {
};
socket.on('connect', () => {
clearTimeout(connectTimeout);
try {
socket.write('zINSTREAM\0');
@ -92,6 +102,9 @@ export class ClamAV {
socket.on('data', (data) => {
response += data.toString();
if (response.length > MAX_RESPONSE_SIZE) {
doReject(new Error(`ClamAV response exceeded ${(MAX_RESPONSE_SIZE / 1024 / 1024).toFixed(0)} MB limit`));
}
});
socket.on('end', () => {

View File

@ -246,6 +246,7 @@ export class GatewayService {
private circuitBreakerOpenUntilMs = 0;
private readonly CIRCUIT_BREAKER_FAILURE_THRESHOLD = 5;
private readonly CIRCUIT_BREAKER_COOLDOWN_MS = ms('10 seconds');
private readonly PENDING_REQUEST_TIMEOUT_MS = ms('30 seconds');
constructor() {
this.rpcClient = GatewayRpcClient.getInstance();
@ -260,9 +261,29 @@ export class GatewayService {
this.circuitBreakerOpenUntilMs = 0;
return false;
}
this.rejectAllPendingRequests(new ServiceUnavailableError('Gateway circuit breaker open'));
return true;
}
private rejectAllPendingRequests(error: Error): void {
this.pendingGuildDataRequests.forEach((requests) => {
requests.forEach((req) => req.reject(error));
});
this.pendingGuildDataRequests.clear();
this.pendingGuildMemberRequests.forEach((requests) => {
requests.forEach((req) => req.reject(error));
});
this.pendingGuildMemberRequests.clear();
this.pendingPermissionRequests.forEach((requests) => {
requests.forEach((req) => req.reject(error));
});
this.pendingPermissionRequests.clear();
this.pendingBatchRequestCount = 0;
}
private recordCircuitBreakerSuccess(): void {
this.circuitBreakerConsecutiveFailures = 0;
}
@ -626,8 +647,25 @@ export class GatewayService {
return;
}
let timeoutId: NodeJS.Timeout | null = setTimeout(() => {
reject(new GatewayTimeoutError());
this.removePendingRequest(this.pendingGuildDataRequests, key, wrappedResolve, wrappedReject);
}, this.PENDING_REQUEST_TIMEOUT_MS);
const wrappedResolve = (value: GuildResponse) => {
if (timeoutId) clearTimeout(timeoutId);
timeoutId = null;
resolve(value);
};
const wrappedReject = (error: Error) => {
if (timeoutId) clearTimeout(timeoutId);
timeoutId = null;
reject(error);
};
const pending = this.pendingGuildDataRequests.get(key) || [];
pending.push({resolve, reject});
pending.push({resolve: wrappedResolve, reject: wrappedReject});
this.pendingGuildDataRequests.set(key, pending);
this.pendingBatchRequestCount += 1;
@ -651,8 +689,25 @@ export class GatewayService {
return;
}
let timeoutId: NodeJS.Timeout | null = setTimeout(() => {
reject(new GatewayTimeoutError());
this.removePendingRequest(this.pendingGuildMemberRequests, key, wrappedResolve, wrappedReject);
}, this.PENDING_REQUEST_TIMEOUT_MS);
const wrappedResolve = (value: {success: boolean; memberData?: GuildMemberResponse}) => {
if (timeoutId) clearTimeout(timeoutId);
timeoutId = null;
resolve(value);
};
const wrappedReject = (error: Error) => {
if (timeoutId) clearTimeout(timeoutId);
timeoutId = null;
reject(error);
};
const pending = this.pendingGuildMemberRequests.get(key) || [];
pending.push({resolve, reject});
pending.push({resolve: wrappedResolve, reject: wrappedReject});
this.pendingGuildMemberRequests.set(key, pending);
this.pendingBatchRequestCount += 1;
@ -804,8 +859,25 @@ export class GatewayService {
return;
}
let timeoutId: NodeJS.Timeout | null = setTimeout(() => {
reject(new GatewayTimeoutError());
this.removePendingRequest(this.pendingPermissionRequests, key, wrappedResolve, wrappedReject);
}, this.PENDING_REQUEST_TIMEOUT_MS);
const wrappedResolve = (value: boolean) => {
if (timeoutId) clearTimeout(timeoutId);
timeoutId = null;
resolve(value);
};
const wrappedReject = (error: Error) => {
if (timeoutId) clearTimeout(timeoutId);
timeoutId = null;
reject(error);
};
const pending = this.pendingPermissionRequests.get(key) || [];
pending.push({resolve, reject});
pending.push({resolve: wrappedResolve, reject: wrappedReject});
this.pendingPermissionRequests.set(key, pending);
this.pendingBatchRequestCount += 1;
@ -817,6 +889,25 @@ export class GatewayService {
});
}
private removePendingRequest<T>(
map: Map<string, Array<PendingRequest<T>>>,
key: string,
resolve: (value: T) => void,
reject: (error: Error) => void,
): void {
const pending = map.get(key);
if (pending) {
const index = pending.findIndex((r) => r.resolve === resolve || r.reject === reject);
if (index >= 0) {
pending.splice(index, 1);
this.pendingBatchRequestCount--;
if (pending.length === 0) {
map.delete(key);
}
}
}
}
async canManageRoles({guildId, userId, targetUserId, roleId}: CanManageRolesParams): Promise<boolean> {
const result = await this.call<{can_manage: boolean}>('guild.can_manage_roles', {
guild_id: guildId.toString(),

View File

@ -30,6 +30,7 @@ export class SnowflakeReservationService {
private initialized = false;
private reloadPromise: Promise<void> | null = null;
private kvSubscription: IKVSubscription | null = null;
private messageHandler: ((channel: string) => void) | null = null;
constructor(
private repository: SnowflakeReservationRepository,
@ -50,13 +51,14 @@ export class SnowflakeReservationService {
this.kvSubscription = subscription;
await subscription.connect();
await subscription.subscribe(SNOWFLAKE_RESERVATION_REFRESH_CHANNEL);
subscription.on('message', (channel) => {
this.messageHandler = (channel: string) => {
if (channel === SNOWFLAKE_RESERVATION_REFRESH_CHANNEL) {
this.reload().catch((error) => {
Logger.error({error}, 'Failed to reload snowflake reservations');
});
}
});
};
subscription.on('message', this.messageHandler);
} catch (error) {
Logger.error({error}, 'Failed to subscribe to snowflake reservation refresh channel');
}
@ -99,9 +101,13 @@ export class SnowflakeReservationService {
}
shutdown(): void {
if (this.kvSubscription && this.messageHandler) {
this.kvSubscription.removeAllListeners('message');
}
if (this.kvSubscription) {
this.kvSubscription.disconnect();
this.kvSubscription = null;
}
this.messageHandler = null;
}
}

View File

@ -45,6 +45,7 @@ export class LimitConfigService {
private kvSubscription: IKVSubscription | null = null;
private subscriberInitialized = false;
private readonly cacheKey: string;
private messageHandler: ((channel: string) => void) | null = null;
constructor(repository: InstanceConfigRepository, cacheService: ICacheService, kvClient: IKVProvider | null = null) {
this.repository = repository;
@ -144,17 +145,21 @@ export class LimitConfigService {
const subscription = this.kvClient.duplicate();
this.kvSubscription = subscription;
this.messageHandler = (channel: string) => {
if (channel === LIMIT_CONFIG_REFRESH_CHANNEL) {
this.refreshCache().catch((err) => {
Logger.error({err}, 'Failed to refresh limit config from pubsub');
});
}
};
subscription
.connect()
.then(() => subscription.subscribe(LIMIT_CONFIG_REFRESH_CHANNEL))
.then(() => {
subscription.on('message', (channel) => {
if (channel === LIMIT_CONFIG_REFRESH_CHANNEL) {
this.refreshCache().catch((err) => {
Logger.error({err}, 'Failed to refresh limit config from pubsub');
});
}
});
if (this.messageHandler) {
subscription.on('message', this.messageHandler);
}
})
.catch((error) => {
Logger.error({error}, 'Failed to subscribe to limit config refresh channel');
@ -164,12 +169,16 @@ export class LimitConfigService {
}
shutdown(): void {
if (this.kvSubscription && this.messageHandler) {
this.kvSubscription.removeAllListeners('message');
}
if (this.kvSubscription) {
this.kvSubscription.quit().catch((err) => {
Logger.error({err}, 'Failed to close KV subscription');
});
this.kvSubscription = null;
}
this.messageHandler = null;
}
}

View File

@ -52,6 +52,7 @@ class IpBanCache {
private kvClient: IKVProvider | null = null;
private kvSubscription: IKVSubscription | null = null;
private subscriberInitialized = false;
private messageHandler: ((channel: string) => void) | null = null;
constructor() {
this.singleIpBans = this.createFamilyMaps();
@ -78,23 +79,27 @@ class IpBanCache {
const subscription = this.kvClient.duplicate();
this.kvSubscription = subscription;
this.messageHandler = (channel: string) => {
if (channel === IP_BAN_REFRESH_CHANNEL) {
this.refresh().catch((err) => {
this.consecutiveFailures++;
const message = err instanceof Error ? err.message : String(err);
if (this.consecutiveFailures >= this.maxConsecutiveFailures) {
Logger.error({error: message}, 'Failed to refresh IP ban cache after notification');
} else {
Logger.warn({error: message}, 'Failed to refresh IP ban cache after notification');
}
});
}
};
subscription
.connect()
.then(() => subscription.subscribe(IP_BAN_REFRESH_CHANNEL))
.then(() => {
subscription.on('message', (channel) => {
if (channel === IP_BAN_REFRESH_CHANNEL) {
this.refresh().catch((err) => {
this.consecutiveFailures++;
const message = err instanceof Error ? err.message : String(err);
if (this.consecutiveFailures >= this.maxConsecutiveFailures) {
Logger.error({error: message}, 'Failed to refresh IP ban cache after notification');
} else {
Logger.warn({error: message}, 'Failed to refresh IP ban cache after notification');
}
});
}
});
if (this.messageHandler) {
subscription.on('message', this.messageHandler);
}
})
.catch((error) => {
Logger.error({error}, 'Failed to subscribe to IP ban refresh channel');
@ -203,10 +208,14 @@ class IpBanCache {
}
shutdown(): void {
if (this.kvSubscription && this.messageHandler) {
this.kvSubscription.removeAllListeners('message');
}
if (this.kvSubscription) {
this.kvSubscription.disconnect();
this.kvSubscription = null;
}
this.messageHandler = null;
}
}

View File

@ -173,6 +173,15 @@ import {createMiddleware} from 'hono/factory';
const errorI18nService = new ErrorI18nService();
let _reportService: ReportService | null = null;
export function shutdownReportService(): void {
if (_reportService) {
_reportService.shutdown();
_reportService = null;
}
}
let _testEmailService: TestEmailService | null = null;
function getTestEmailService(): TestEmailService {
if (!_testEmailService) {
@ -617,19 +626,21 @@ export const ServiceMiddleware = createMiddleware<HonoEnv>(async (ctx, next) =>
const desktopHandoffService = new DesktopHandoffService(cacheService);
const authRequestService = new AuthRequestService(authService, ssoService, cacheService, desktopHandoffService);
const reportSearchService = getReportSearchService();
const reportService = new ReportService(
reportRepository,
channelRepository,
guildRepository,
userRepository,
inviteRepository,
emailService,
emailDnsValidationService,
snowflakeService,
storageService,
reportSearchService,
);
if (!_reportService) {
_reportService = new ReportService(
reportRepository,
channelRepository,
guildRepository,
userRepository,
inviteRepository,
emailService,
emailDnsValidationService,
snowflakeService,
storageService,
getReportSearchService(),
);
}
const reportService = _reportService;
const reportRequestService = new ReportRequestService(reportService);
const adminService = new AdminService(

View File

@ -34,6 +34,7 @@ export class VoiceTopology {
private subscribers: Set<Subscriber> = new Set();
private serverRotationIndex: Map<string, number> = new Map();
private kvSubscription: IKVSubscription | null = null;
private messageHandler: ((channel: string) => void) | null = null;
constructor(
private voiceRepository: IVoiceRepository,
@ -53,13 +54,14 @@ export class VoiceTopology {
this.kvSubscription = subscription;
await subscription.connect();
await subscription.subscribe(VOICE_CONFIGURATION_CHANNEL);
subscription.on('message', (channel) => {
this.messageHandler = (channel: string) => {
if (channel === VOICE_CONFIGURATION_CHANNEL) {
this.reload().catch((error) => {
Logger.error({error}, 'Failed to reload voice topology from KV notification');
});
}
});
};
subscription.on('message', this.messageHandler);
} catch (error) {
Logger.error({error}, 'Failed to subscribe to voice configuration channel');
}
@ -239,9 +241,13 @@ export class VoiceTopology {
}
shutdown(): void {
if (this.kvSubscription && this.messageHandler) {
this.kvSubscription.removeAllListeners('message');
}
if (this.kvSubscription) {
this.kvSubscription.disconnect();
this.kvSubscription = null;
}
this.messageHandler = null;
}
}