fix(search): make closed DM search work correctly (#37)
This commit is contained in:
parent
ea0a2d8aae
commit
cbe914cf6f
@ -69,6 +69,11 @@ pub const flag_app_store_reviewer = Flag(
|
||||
9_007_199_254_740_992,
|
||||
)
|
||||
|
||||
pub const flag_dm_history_backfilled = Flag(
|
||||
"DM_HISTORY_BACKFILLED",
|
||||
18_014_398_509_481_984,
|
||||
)
|
||||
|
||||
pub fn get_patchable_flags() -> List(Flag) {
|
||||
[
|
||||
flag_staff,
|
||||
@ -84,6 +89,7 @@ pub fn get_patchable_flags() -> List(Flag) {
|
||||
flag_pending_manual_verification,
|
||||
flag_used_mobile_client,
|
||||
flag_app_store_reviewer,
|
||||
flag_dm_history_backfilled,
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
@ -161,6 +161,7 @@ import {
|
||||
USER_BY_USERNAME_COLUMNS,
|
||||
USER_COLUMNS,
|
||||
USER_CONTACT_CHANGE_LOG_COLUMNS,
|
||||
USER_DM_HISTORY_COLUMNS,
|
||||
USER_GUILD_SETTINGS_COLUMNS,
|
||||
USER_HARVEST_COLUMNS,
|
||||
USER_SETTINGS_COLUMNS,
|
||||
@ -171,6 +172,7 @@ import {
|
||||
type UserByStripeSubscriptionIdRow,
|
||||
type UserByUsernameRow,
|
||||
type UserContactChangeLogRow,
|
||||
type UserDmHistoryRow,
|
||||
type UserGuildSettingsRow,
|
||||
type UserHarvestRow,
|
||||
type UserRow,
|
||||
@ -200,6 +202,12 @@ export const UsersPendingDeletion = defineTable<
|
||||
primaryKey: ['deletion_date', 'pending_deletion_at', 'user_id'],
|
||||
});
|
||||
|
||||
export const UserDmHistory = defineTable<UserDmHistoryRow, 'user_id' | 'channel_id'>({
|
||||
name: 'user_dm_history',
|
||||
columns: USER_DM_HISTORY_COLUMNS,
|
||||
primaryKey: ['user_id', 'channel_id'],
|
||||
});
|
||||
|
||||
export const UserByUsername = defineTable<UserByUsernameRow, 'username' | 'discriminator' | 'user_id'>({
|
||||
name: 'users_by_username',
|
||||
columns: USER_BY_USERNAME_COLUMNS,
|
||||
|
||||
@ -54,6 +54,13 @@ export const getDmChannelIdsForScope = async ({
|
||||
channelIdStrings.add(summary.channelId.toString());
|
||||
}
|
||||
|
||||
if (scope === 'all_dms') {
|
||||
const historicalIds = await userRepository.listHistoricalDmChannelIds(userId);
|
||||
for (const channelId of historicalIds) {
|
||||
channelIdStrings.add(channelId.toString());
|
||||
}
|
||||
}
|
||||
|
||||
if (includeChannelId) {
|
||||
channelIdStrings.add(includeChannelId.toString());
|
||||
}
|
||||
|
||||
@ -55,6 +55,7 @@ export const UserFlags = {
|
||||
HAS_DISMISSED_PREMIUM_ONBOARDING: 1n << 51n,
|
||||
USED_MOBILE_CLIENT: 1n << 52n,
|
||||
APP_STORE_REVIEWER: 1n << 53n,
|
||||
HAS_DM_HISTORY_BACKFILLED: 1n << 54n,
|
||||
} as const;
|
||||
|
||||
export const PUBLIC_USER_FLAGS = UserFlags.STAFF | UserFlags.CTP_MEMBER | UserFlags.PARTNER | UserFlags.BUG_HUNTER;
|
||||
|
||||
@ -592,3 +592,12 @@ export const USERS_PENDING_DELETION_COLUMNS = [
|
||||
'user_id',
|
||||
'deletion_reason_code',
|
||||
] as const satisfies ReadonlyArray<keyof UsersPendingDeletionRow>;
|
||||
|
||||
export interface UserDmHistoryRow {
|
||||
user_id: UserID;
|
||||
channel_id: ChannelID;
|
||||
}
|
||||
|
||||
export const USER_DM_HISTORY_COLUMNS = ['user_id', 'channel_id'] as const satisfies ReadonlyArray<
|
||||
keyof UserDmHistoryRow
|
||||
>;
|
||||
|
||||
@ -28,6 +28,7 @@ import {
|
||||
createMessageID,
|
||||
createUserID,
|
||||
type GuildID,
|
||||
type MessageID,
|
||||
type UserID,
|
||||
userIdToChannelId,
|
||||
vanityCodeToInviteCode,
|
||||
@ -152,6 +153,8 @@ interface UserData {
|
||||
pinnedDMs: Array<ChannelID>;
|
||||
}
|
||||
|
||||
const DM_HISTORY_BATCH_SIZE = 1000;
|
||||
|
||||
export class RpcService {
|
||||
private readonly customStatusValidator: CustomStatusValidator;
|
||||
|
||||
@ -219,6 +222,47 @@ export class RpcService {
|
||||
});
|
||||
}
|
||||
|
||||
private async backfillHistoricalDmChannels(userId: UserID): Promise<void> {
|
||||
const processedChannels = new Set<string>();
|
||||
let lastChannelId: ChannelID | undefined;
|
||||
let lastMessageId: MessageID | undefined;
|
||||
|
||||
while (true) {
|
||||
const messageRefs = await this.channelRepository.listMessagesByAuthor(
|
||||
userId,
|
||||
DM_HISTORY_BATCH_SIZE,
|
||||
lastChannelId,
|
||||
lastMessageId,
|
||||
);
|
||||
if (messageRefs.length === 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
for (const {channelId} of messageRefs) {
|
||||
const channelKey = channelId.toString();
|
||||
if (processedChannels.has(channelKey)) {
|
||||
continue;
|
||||
}
|
||||
processedChannels.add(channelKey);
|
||||
|
||||
const channel = await this.channelRepository.channelData.findUnique(channelId);
|
||||
if (!channel || channel.guildId || channel.type !== ChannelTypes.DM) {
|
||||
continue;
|
||||
}
|
||||
|
||||
await this.userRepository.recordHistoricalDmChannel(userId, channelId, false);
|
||||
}
|
||||
|
||||
const lastRef = messageRefs[messageRefs.length - 1];
|
||||
lastChannelId = lastRef.channelId;
|
||||
lastMessageId = lastRef.messageId;
|
||||
|
||||
if (messageRefs.length < DM_HISTORY_BATCH_SIZE) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async updateGuildMemberCount(guild: Guild, actualMemberCount: number): Promise<Guild> {
|
||||
if (guild.memberCount === actualMemberCount) {
|
||||
return guild;
|
||||
@ -651,6 +695,15 @@ export class RpcService {
|
||||
});
|
||||
}
|
||||
|
||||
if (!(user.flags & UserFlags.HAS_DM_HISTORY_BACKFILLED)) {
|
||||
try {
|
||||
await this.backfillHistoricalDmChannels(user.id);
|
||||
flagsToUpdate = (flagsToUpdate ?? user.flags) | UserFlags.HAS_DM_HISTORY_BACKFILLED;
|
||||
} catch (error) {
|
||||
Logger.warn({userId: user.id, error}, 'Failed to backfill DM history');
|
||||
}
|
||||
}
|
||||
|
||||
if (flagsToUpdate !== null && flagsToUpdate !== user.flags) {
|
||||
await this.userRepository.patchUpsert(user.id, {
|
||||
flags: flagsToUpdate,
|
||||
|
||||
@ -32,6 +32,8 @@ export interface IUserChannelRepository {
|
||||
listPrivateChannels(userId: UserID): Promise<Array<Channel>>;
|
||||
deleteAllPrivateChannels(userId: UserID): Promise<void>;
|
||||
listPrivateChannelSummaries(userId: UserID): Promise<Array<PrivateChannelSummary>>;
|
||||
listHistoricalDmChannelIds(userId: UserID): Promise<Array<ChannelID>>;
|
||||
recordHistoricalDmChannel(userId: UserID, channelId: ChannelID, isGroupDm: boolean): Promise<void>;
|
||||
|
||||
findExistingDmState(user1Id: UserID, user2Id: UserID): Promise<Channel | null>;
|
||||
createDmChannelAndState(user1Id: UserID, user2Id: UserID, channelId: ChannelID): Promise<Channel>;
|
||||
|
||||
@ -22,7 +22,7 @@ import {ChannelTypes} from '~/Constants';
|
||||
import {BatchBuilder, deleteOneOrMany, fetchMany, fetchManyInChunks, fetchOne, upsertOne} from '~/database/Cassandra';
|
||||
import type {ChannelRow, DmStateRow, PrivateChannelRow} from '~/database/CassandraTypes';
|
||||
import {Channel} from '~/Models';
|
||||
import {Channels, DmStates, PinnedDms, PrivateChannels, ReadStates} from '~/Tables';
|
||||
import {Channels, DmStates, PinnedDms, PrivateChannels, ReadStates, UserDmHistory} from '~/Tables';
|
||||
import type {IUserChannelRepository, PrivateChannelSummary} from './IUserChannelRepository';
|
||||
|
||||
interface PinnedDmRow {
|
||||
@ -77,6 +77,11 @@ const FETCH_PRIVATE_CHANNELS_CQL = PrivateChannels.selectCql({
|
||||
where: PrivateChannels.where.eq('user_id'),
|
||||
});
|
||||
|
||||
const HISTORICAL_DM_CHANNELS_CQL = UserDmHistory.selectCql({
|
||||
columns: ['channel_id'],
|
||||
where: UserDmHistory.where.eq('user_id'),
|
||||
});
|
||||
|
||||
const FETCH_CHANNEL_METADATA_CQL = Channels.selectCql({
|
||||
columns: ['channel_id', 'type', 'last_message_id', 'soft_deleted'],
|
||||
where: [Channels.where.in('channel_id', 'channel_ids'), {kind: 'eq', col: 'soft_deleted', param: 'soft_deleted'}],
|
||||
@ -325,6 +330,13 @@ export class UserChannelRepository implements IUserChannelRepository {
|
||||
});
|
||||
}
|
||||
|
||||
async listHistoricalDmChannelIds(userId: UserID): Promise<Array<ChannelID>> {
|
||||
const rows = await fetchMany<{channel_id: ChannelID}>(HISTORICAL_DM_CHANNELS_CQL, {
|
||||
user_id: userId,
|
||||
});
|
||||
return rows.map((row) => row.channel_id);
|
||||
}
|
||||
|
||||
async openDmForUser(userId: UserID, channelId: ChannelID, isGroupDm?: boolean): Promise<void> {
|
||||
let resolvedIsGroupDm: boolean;
|
||||
if (isGroupDm !== undefined) {
|
||||
@ -337,6 +349,8 @@ export class UserChannelRepository implements IUserChannelRepository {
|
||||
resolvedIsGroupDm = channelRow?.type === ChannelTypes.GROUP_DM;
|
||||
}
|
||||
|
||||
await this.recordHistoricalDmChannel(userId, channelId, resolvedIsGroupDm);
|
||||
|
||||
await upsertOne(
|
||||
PrivateChannels.upsertAll({
|
||||
user_id: userId,
|
||||
@ -346,6 +360,19 @@ export class UserChannelRepository implements IUserChannelRepository {
|
||||
);
|
||||
}
|
||||
|
||||
async recordHistoricalDmChannel(userId: UserID, channelId: ChannelID, isGroupDm: boolean): Promise<void> {
|
||||
if (isGroupDm) {
|
||||
return;
|
||||
}
|
||||
|
||||
await upsertOne(
|
||||
UserDmHistory.upsertAll({
|
||||
user_id: userId,
|
||||
channel_id: channelId,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
async removePinnedDm(userId: UserID, channelId: ChannelID): Promise<Array<ChannelID>> {
|
||||
await deleteOneOrMany(
|
||||
PinnedDms.deleteByPk({
|
||||
|
||||
@ -435,6 +435,14 @@ export class UserRepository implements IUserRepositoryAggregate {
|
||||
return this.channelRepo.listPrivateChannels(userId);
|
||||
}
|
||||
|
||||
async listHistoricalDmChannelIds(userId: UserID): Promise<Array<ChannelID>> {
|
||||
return this.channelRepo.listHistoricalDmChannelIds(userId);
|
||||
}
|
||||
|
||||
async recordHistoricalDmChannel(userId: UserID, channelId: ChannelID, isGroupDm: boolean): Promise<void> {
|
||||
return this.channelRepo.recordHistoricalDmChannel(userId, channelId, isGroupDm);
|
||||
}
|
||||
|
||||
async listPrivateChannelSummaries(userId: UserID): Promise<Array<PrivateChannelSummary>> {
|
||||
return this.channelRepo.listPrivateChannelSummaries(userId);
|
||||
}
|
||||
|
||||
@ -0,0 +1,5 @@
|
||||
CREATE TABLE IF NOT EXISTS fluxer.user_dm_history (
|
||||
user_id bigint,
|
||||
channel_id bigint,
|
||||
PRIMARY KEY ((user_id), channel_id)
|
||||
);
|
||||
Loading…
x
Reference in New Issue
Block a user