From cbe914cf6f16d2bbf49991c2dfa23515bac7096c Mon Sep 17 00:00:00 2001 From: hampus-fluxer Date: Tue, 6 Jan 2026 01:32:00 +0100 Subject: [PATCH] fix(search): make closed DM search work correctly (#37) --- fluxer_admin/src/fluxer_admin/constants.gleam | 6 +++ fluxer_api/src/Tables.ts | 8 +++ .../channel/services/message/dmScopeUtils.ts | 7 +++ fluxer_api/src/constants/User.ts | 1 + fluxer_api/src/database/types/UserTypes.ts | 9 ++++ fluxer_api/src/rpc/RpcService.ts | 53 +++++++++++++++++++ .../repositories/IUserChannelRepository.ts | 2 + .../repositories/UserChannelRepository.ts | 29 +++++++++- .../src/user/repositories/UserRepository.ts | 8 +++ .../20260106001944_user_dm_history.cql | 5 ++ 10 files changed, 127 insertions(+), 1 deletion(-) create mode 100644 fluxer_devops/cassandra/migrations/20260106001944_user_dm_history.cql diff --git a/fluxer_admin/src/fluxer_admin/constants.gleam b/fluxer_admin/src/fluxer_admin/constants.gleam index ec03824d..8b5a10bc 100644 --- a/fluxer_admin/src/fluxer_admin/constants.gleam +++ b/fluxer_admin/src/fluxer_admin/constants.gleam @@ -69,6 +69,11 @@ pub const flag_app_store_reviewer = Flag( 9_007_199_254_740_992, ) +pub const flag_dm_history_backfilled = Flag( + "DM_HISTORY_BACKFILLED", + 18_014_398_509_481_984, +) + pub fn get_patchable_flags() -> List(Flag) { [ flag_staff, @@ -84,6 +89,7 @@ pub fn get_patchable_flags() -> List(Flag) { flag_pending_manual_verification, flag_used_mobile_client, flag_app_store_reviewer, + flag_dm_history_backfilled, ] } diff --git a/fluxer_api/src/Tables.ts b/fluxer_api/src/Tables.ts index e39a70af..6a67cb08 100644 --- a/fluxer_api/src/Tables.ts +++ b/fluxer_api/src/Tables.ts @@ -161,6 +161,7 @@ import { USER_BY_USERNAME_COLUMNS, USER_COLUMNS, USER_CONTACT_CHANGE_LOG_COLUMNS, + USER_DM_HISTORY_COLUMNS, USER_GUILD_SETTINGS_COLUMNS, USER_HARVEST_COLUMNS, USER_SETTINGS_COLUMNS, @@ -171,6 +172,7 @@ import { type UserByStripeSubscriptionIdRow, type UserByUsernameRow, type UserContactChangeLogRow, + type UserDmHistoryRow, type UserGuildSettingsRow, type UserHarvestRow, type UserRow, @@ -200,6 +202,12 @@ export const UsersPendingDeletion = defineTable< primaryKey: ['deletion_date', 'pending_deletion_at', 'user_id'], }); +export const UserDmHistory = defineTable({ + name: 'user_dm_history', + columns: USER_DM_HISTORY_COLUMNS, + primaryKey: ['user_id', 'channel_id'], +}); + export const UserByUsername = defineTable({ name: 'users_by_username', columns: USER_BY_USERNAME_COLUMNS, diff --git a/fluxer_api/src/channel/services/message/dmScopeUtils.ts b/fluxer_api/src/channel/services/message/dmScopeUtils.ts index f8c16f87..acc2cc6c 100644 --- a/fluxer_api/src/channel/services/message/dmScopeUtils.ts +++ b/fluxer_api/src/channel/services/message/dmScopeUtils.ts @@ -54,6 +54,13 @@ export const getDmChannelIdsForScope = async ({ channelIdStrings.add(summary.channelId.toString()); } + if (scope === 'all_dms') { + const historicalIds = await userRepository.listHistoricalDmChannelIds(userId); + for (const channelId of historicalIds) { + channelIdStrings.add(channelId.toString()); + } + } + if (includeChannelId) { channelIdStrings.add(includeChannelId.toString()); } diff --git a/fluxer_api/src/constants/User.ts b/fluxer_api/src/constants/User.ts index ca651f69..bbbf4128 100644 --- a/fluxer_api/src/constants/User.ts +++ b/fluxer_api/src/constants/User.ts @@ -55,6 +55,7 @@ export const UserFlags = { HAS_DISMISSED_PREMIUM_ONBOARDING: 1n << 51n, USED_MOBILE_CLIENT: 1n << 52n, APP_STORE_REVIEWER: 1n << 53n, + HAS_DM_HISTORY_BACKFILLED: 1n << 54n, } as const; export const PUBLIC_USER_FLAGS = UserFlags.STAFF | UserFlags.CTP_MEMBER | UserFlags.PARTNER | UserFlags.BUG_HUNTER; diff --git a/fluxer_api/src/database/types/UserTypes.ts b/fluxer_api/src/database/types/UserTypes.ts index 05dc0f8d..6d8ad205 100644 --- a/fluxer_api/src/database/types/UserTypes.ts +++ b/fluxer_api/src/database/types/UserTypes.ts @@ -592,3 +592,12 @@ export const USERS_PENDING_DELETION_COLUMNS = [ 'user_id', 'deletion_reason_code', ] as const satisfies ReadonlyArray; + +export interface UserDmHistoryRow { + user_id: UserID; + channel_id: ChannelID; +} + +export const USER_DM_HISTORY_COLUMNS = ['user_id', 'channel_id'] as const satisfies ReadonlyArray< + keyof UserDmHistoryRow +>; diff --git a/fluxer_api/src/rpc/RpcService.ts b/fluxer_api/src/rpc/RpcService.ts index 9fdb553e..434802fa 100644 --- a/fluxer_api/src/rpc/RpcService.ts +++ b/fluxer_api/src/rpc/RpcService.ts @@ -28,6 +28,7 @@ import { createMessageID, createUserID, type GuildID, + type MessageID, type UserID, userIdToChannelId, vanityCodeToInviteCode, @@ -152,6 +153,8 @@ interface UserData { pinnedDMs: Array; } +const DM_HISTORY_BATCH_SIZE = 1000; + export class RpcService { private readonly customStatusValidator: CustomStatusValidator; @@ -219,6 +222,47 @@ export class RpcService { }); } + private async backfillHistoricalDmChannels(userId: UserID): Promise { + const processedChannels = new Set(); + let lastChannelId: ChannelID | undefined; + let lastMessageId: MessageID | undefined; + + while (true) { + const messageRefs = await this.channelRepository.listMessagesByAuthor( + userId, + DM_HISTORY_BATCH_SIZE, + lastChannelId, + lastMessageId, + ); + if (messageRefs.length === 0) { + break; + } + + for (const {channelId} of messageRefs) { + const channelKey = channelId.toString(); + if (processedChannels.has(channelKey)) { + continue; + } + processedChannels.add(channelKey); + + const channel = await this.channelRepository.channelData.findUnique(channelId); + if (!channel || channel.guildId || channel.type !== ChannelTypes.DM) { + continue; + } + + await this.userRepository.recordHistoricalDmChannel(userId, channelId, false); + } + + const lastRef = messageRefs[messageRefs.length - 1]; + lastChannelId = lastRef.channelId; + lastMessageId = lastRef.messageId; + + if (messageRefs.length < DM_HISTORY_BATCH_SIZE) { + break; + } + } + } + private async updateGuildMemberCount(guild: Guild, actualMemberCount: number): Promise { if (guild.memberCount === actualMemberCount) { return guild; @@ -651,6 +695,15 @@ export class RpcService { }); } + if (!(user.flags & UserFlags.HAS_DM_HISTORY_BACKFILLED)) { + try { + await this.backfillHistoricalDmChannels(user.id); + flagsToUpdate = (flagsToUpdate ?? user.flags) | UserFlags.HAS_DM_HISTORY_BACKFILLED; + } catch (error) { + Logger.warn({userId: user.id, error}, 'Failed to backfill DM history'); + } + } + if (flagsToUpdate !== null && flagsToUpdate !== user.flags) { await this.userRepository.patchUpsert(user.id, { flags: flagsToUpdate, diff --git a/fluxer_api/src/user/repositories/IUserChannelRepository.ts b/fluxer_api/src/user/repositories/IUserChannelRepository.ts index 787e383c..fa15d1fa 100644 --- a/fluxer_api/src/user/repositories/IUserChannelRepository.ts +++ b/fluxer_api/src/user/repositories/IUserChannelRepository.ts @@ -32,6 +32,8 @@ export interface IUserChannelRepository { listPrivateChannels(userId: UserID): Promise>; deleteAllPrivateChannels(userId: UserID): Promise; listPrivateChannelSummaries(userId: UserID): Promise>; + listHistoricalDmChannelIds(userId: UserID): Promise>; + recordHistoricalDmChannel(userId: UserID, channelId: ChannelID, isGroupDm: boolean): Promise; findExistingDmState(user1Id: UserID, user2Id: UserID): Promise; createDmChannelAndState(user1Id: UserID, user2Id: UserID, channelId: ChannelID): Promise; diff --git a/fluxer_api/src/user/repositories/UserChannelRepository.ts b/fluxer_api/src/user/repositories/UserChannelRepository.ts index 87c43565..e070a634 100644 --- a/fluxer_api/src/user/repositories/UserChannelRepository.ts +++ b/fluxer_api/src/user/repositories/UserChannelRepository.ts @@ -22,7 +22,7 @@ import {ChannelTypes} from '~/Constants'; import {BatchBuilder, deleteOneOrMany, fetchMany, fetchManyInChunks, fetchOne, upsertOne} from '~/database/Cassandra'; import type {ChannelRow, DmStateRow, PrivateChannelRow} from '~/database/CassandraTypes'; import {Channel} from '~/Models'; -import {Channels, DmStates, PinnedDms, PrivateChannels, ReadStates} from '~/Tables'; +import {Channels, DmStates, PinnedDms, PrivateChannels, ReadStates, UserDmHistory} from '~/Tables'; import type {IUserChannelRepository, PrivateChannelSummary} from './IUserChannelRepository'; interface PinnedDmRow { @@ -77,6 +77,11 @@ const FETCH_PRIVATE_CHANNELS_CQL = PrivateChannels.selectCql({ where: PrivateChannels.where.eq('user_id'), }); +const HISTORICAL_DM_CHANNELS_CQL = UserDmHistory.selectCql({ + columns: ['channel_id'], + where: UserDmHistory.where.eq('user_id'), +}); + const FETCH_CHANNEL_METADATA_CQL = Channels.selectCql({ columns: ['channel_id', 'type', 'last_message_id', 'soft_deleted'], where: [Channels.where.in('channel_id', 'channel_ids'), {kind: 'eq', col: 'soft_deleted', param: 'soft_deleted'}], @@ -325,6 +330,13 @@ export class UserChannelRepository implements IUserChannelRepository { }); } + async listHistoricalDmChannelIds(userId: UserID): Promise> { + const rows = await fetchMany<{channel_id: ChannelID}>(HISTORICAL_DM_CHANNELS_CQL, { + user_id: userId, + }); + return rows.map((row) => row.channel_id); + } + async openDmForUser(userId: UserID, channelId: ChannelID, isGroupDm?: boolean): Promise { let resolvedIsGroupDm: boolean; if (isGroupDm !== undefined) { @@ -337,6 +349,8 @@ export class UserChannelRepository implements IUserChannelRepository { resolvedIsGroupDm = channelRow?.type === ChannelTypes.GROUP_DM; } + await this.recordHistoricalDmChannel(userId, channelId, resolvedIsGroupDm); + await upsertOne( PrivateChannels.upsertAll({ user_id: userId, @@ -346,6 +360,19 @@ export class UserChannelRepository implements IUserChannelRepository { ); } + async recordHistoricalDmChannel(userId: UserID, channelId: ChannelID, isGroupDm: boolean): Promise { + if (isGroupDm) { + return; + } + + await upsertOne( + UserDmHistory.upsertAll({ + user_id: userId, + channel_id: channelId, + }), + ); + } + async removePinnedDm(userId: UserID, channelId: ChannelID): Promise> { await deleteOneOrMany( PinnedDms.deleteByPk({ diff --git a/fluxer_api/src/user/repositories/UserRepository.ts b/fluxer_api/src/user/repositories/UserRepository.ts index eae8e98d..c06f281e 100644 --- a/fluxer_api/src/user/repositories/UserRepository.ts +++ b/fluxer_api/src/user/repositories/UserRepository.ts @@ -435,6 +435,14 @@ export class UserRepository implements IUserRepositoryAggregate { return this.channelRepo.listPrivateChannels(userId); } + async listHistoricalDmChannelIds(userId: UserID): Promise> { + return this.channelRepo.listHistoricalDmChannelIds(userId); + } + + async recordHistoricalDmChannel(userId: UserID, channelId: ChannelID, isGroupDm: boolean): Promise { + return this.channelRepo.recordHistoricalDmChannel(userId, channelId, isGroupDm); + } + async listPrivateChannelSummaries(userId: UserID): Promise> { return this.channelRepo.listPrivateChannelSummaries(userId); } diff --git a/fluxer_devops/cassandra/migrations/20260106001944_user_dm_history.cql b/fluxer_devops/cassandra/migrations/20260106001944_user_dm_history.cql new file mode 100644 index 00000000..272d11ef --- /dev/null +++ b/fluxer_devops/cassandra/migrations/20260106001944_user_dm_history.cql @@ -0,0 +1,5 @@ +CREATE TABLE IF NOT EXISTS fluxer.user_dm_history ( + user_id bigint, + channel_id bigint, + PRIMARY KEY ((user_id), channel_id) +);