/*
* Copyright (C) 2026 Fluxer Contributors
*
* This file is part of Fluxer.
*
* Fluxer is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Fluxer is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Fluxer. If not, see <https://www.gnu.org/licenses/>.
*/
import crypto from 'node:crypto';
import fs from 'node:fs';
import net from 'node:net';
import path from 'node:path';
import {parseArgs} from 'node:util';
import cassandra from 'cassandra-driver';
// Name of the migration bookkeeping table.
// NOTE(review): its use is outside this section — presumably where applied migrations are recorded; confirm.
const MIGRATION_TABLE = 'schema_migrations';
// Keyspace name: read from the CASSANDRA_KEYSPACE environment variable, falling back to 'fluxer'
// only when the variable is unset (an empty string is kept as-is because `??` is used).
const MIGRATION_KEYSPACE = process.env['CASSANDRA_KEYSPACE'] ?? 'fluxer';
// Relative path to the directory holding the migration files. Because it is relative, file-system
// calls resolve it against the process's current working directory.
const MIGRATIONS_DIR = '../fluxer_devops/cassandra/migrations';
/** A statement shape that migrations may not contain, paired with the error text reported for it. */
interface ForbiddenPattern {
	/** Expression tested against the migration text to detect the statement. */
	pattern: RegExp;
	/** Human-readable explanation appended to the validation error. */
	message: string;
}

/**
 * Statements rejected during migration validation.
 *
 * Every pattern is case-insensitive, anchored on word boundaries, and tolerant of
 * any amount of whitespace between keywords. None of them carries the `g` flag, so
 * calling `pattern.test()` repeatedly is stateless.
 */
const FORBIDDEN_PATTERNS: Array<ForbiddenPattern> = [
	{pattern: /\bCREATE\s+INDEX\b/i, message: 'Secondary indexes are forbidden (CREATE INDEX)'},
	{pattern: /\bCREATE\s+CUSTOM\s+INDEX\b/i, message: 'Custom indexes are forbidden (CREATE CUSTOM INDEX)'},
	{
		pattern: /\bCREATE\s+MATERIALIZED\s+VIEW\b/i,
		message: 'Materialized views are forbidden (CREATE MATERIALIZED VIEW)',
	},
	{pattern: /\bDROP\s+TABLE\b/i, message: 'DROP TABLE is forbidden'},
	{pattern: /\bDROP\s+KEYSPACE\b/i, message: 'DROP KEYSPACE is forbidden'},
	{pattern: /\bDROP\s+TYPE\b/i, message: 'DROP TYPE is forbidden'},
	{pattern: /\bDROP\s+INDEX\b/i, message: 'DROP INDEX is forbidden'},
	{pattern: /\bDROP\s+MATERIALIZED\s+VIEW\b/i, message: 'DROP MATERIALIZED VIEW is forbidden'},
	{pattern: /\bDROP\s+COLUMN\b/i, message: 'DROP COLUMN is forbidden (use ALTER TABLE ... DROP ...)'},
	{pattern: /\bTRUNCATE\b/i, message: 'TRUNCATE is forbidden'},
];
/**
 * Returns the directory that holds the migration files.
 *
 * @returns The value of `MIGRATIONS_DIR`, unchanged.
 */
function getMigrationsDir(): string {
	const migrationsDir = MIGRATIONS_DIR;
	return migrationsDir;
}
/**
 * Builds the path of a migration file inside the migrations directory.
 *
 * @param filename - File name to append to the migrations directory.
 * @returns The directory and file name joined with `path.join`.
 */
function getMigrationPath(filename: string): string {
	const baseDir = getMigrationsDir();
	return path.join(baseDir, filename);
}
/**
 * Normalises free-form text into a lowercase identifier made of `a-z`, `0-9` and `_`.
 *
 * Spaces and hyphens become underscores, every other character outside the allowed
 * set is dropped, runs of underscores collapse to one, and leading/trailing
 * underscores are removed.
 *
 * @param name - Text to normalise.
 * @returns The sanitised identifier; may be empty when nothing usable remains.
 */
function sanitizeName(name: string): string {
	// Separators are mapped to underscores before lower-casing.
	const lowered = name.replace(/[ -]/g, '_').toLowerCase();
	// Drop everything that is not a lowercase letter, digit or underscore.
	const allowedOnly = lowered.replace(/[^a-z0-9_]/g, '');
	// Squash underscore runs, then strip them from both ends.
	const collapsed = allowedOnly.replace(/_+/g, '_');
	return collapsed.replace(/^_+|_+$/g, '');
}
/**
 * Strips `--` line comments and blank lines from migration text.
 *
 * Each line is cut at its first `--`, trimmed, and kept only if something remains.
 * The cut is purely textual: a `--` inside a quoted string literal is also treated
 * as the start of a comment.
 *
 * @param content - Raw migration text.
 * @returns The remaining lines joined with `\n`.
 */
function removeComments(content: string): string {
	const kept: string[] = [];
	for (const rawLine of content.split('\n')) {
		const markerAt = rawLine.indexOf('--');
		const withoutComment = markerAt === -1 ? rawLine : rawLine.slice(0, markerAt);
		const code = withoutComment.trim();
		if (code.length > 0) {
			kept.push(code);
		}
	}
	return kept.join('\n');
}
/**
 * Splits migration text into individual statements.
 *
 * Lines are cut at their first `--`, trimmed, and blank results are skipped. The
 * remaining fragments are joined with single spaces until a fragment ends with `;`,
 * which closes the current statement. Text left over after the last `;` is returned
 * as a final statement.
 *
 * Splitting only happens at line ends, so several statements on one line stay
 * together, and a `--` inside a quoted string literal is treated as a comment.
 *
 * @param content - Raw migration text.
 * @returns The statements in file order, each trimmed.
 */
function parseStatements(content: string): Array<string> {
	const statements: Array<string> = [];
	let pending = '';
	for (const rawLine of content.split('\n')) {
		const commentAt = rawLine.indexOf('--');
		const code = (commentAt === -1 ? rawLine : rawLine.slice(0, commentAt)).trim();
		if (code.length === 0) {
			continue;
		}
		pending += `${code} `;
		if (code.endsWith(';')) {
			// `pending` holds at least this non-empty fragment, so it is never blank here.
			statements.push(pending.trim());
			pending = '';
		}
	}
	const trailing = pending.trim();
	if (trailing.length > 0) {
		statements.push(trailing);
	}
	return statements;
}
/**
 * Computes the MD5 digest of the given text.
 *
 * @param content - Text to hash.
 * @returns The digest as a lowercase hexadecimal string.
 */
function calculateChecksum(content: string): string {
	const hasher = crypto.createHash('md5');
	hasher.update(content);
	return hasher.digest('hex');
}
/**
 * Checks a migration's text against the forbidden-statement list.
 *
 * Comments are stripped with `removeComments` first, so forbidden keywords inside
 * `--` comments are not reported.
 *
 * @param filename - Name used to prefix each error message.
 * @param content - Raw migration text.
 * @returns One message per matching forbidden pattern, in list order, followed by
 *   an "empty" message when nothing remains after comment stripping. Empty when valid.
 */
function validateMigrationContent(filename: string, content: string): Array<string> {
	const cleanContent = removeComments(content);
	const errors: Array<string> = [];
	for (const {pattern, message} of FORBIDDEN_PATTERNS) {
		if (pattern.test(cleanContent)) {
			errors.push(` ${filename}: ${message}`);
		}
	}
	if (cleanContent.trim().length === 0) {
		errors.push(` ${filename}: migration file is empty`);
	}
	return errors;
}
/**
 * Lists the `.cql` migration files in the migrations directory.
 *
 * @returns File names (not paths) of regular files ending in `.cql`, sorted with the
 *   default `Array.prototype.sort` ordering. Empty when the directory does not exist.
 */
function getMigrationFiles(): Array<string> {
	const migrationsDir = getMigrationsDir();
	if (!fs.existsSync(migrationsDir)) {
		return [];
	}
	return (
		fs
			.readdirSync(migrationsDir)
			// Test the cheap suffix first so entries that are not migrations are never stat'ed.
			.filter((file) => file.endsWith('.cql') && fs.statSync(path.join(migrationsDir, file)).isFile())
			.sort()
	);
}
/**
 * Reports whether a migration file carries a "skip ci" marker near its top.
 *
 * Only the first ten lines are inspected, and the match is case-insensitive. Both
 * `-- skip ci` and `--skip ci` are accepted anywhere within a line.
 *
 * @param filename - Migration file name, resolved through `getMigrationPath`.
 * @returns `true` when one of the first ten lines contains the marker.
 */
function hasSkipCi(filename: string): boolean {
	const headerLines = fs.readFileSync(getMigrationPath(filename), 'utf-8').split('\n').slice(0, 10);
	return headerLines.some((line) => {
		const normalized = line.trim().toLowerCase();
		return normalized.includes('-- skip ci') || normalized.includes('--skip ci');
	});
}
/**
 * Waits for the given delay.
 *
 * @param ms - Delay in milliseconds, passed to `setTimeout`.
 * @returns A promise that resolves with no value once the timer fires.
 */
async function sleep(ms: number): Promise<void> {
	return new Promise<void>((resolve) => {
		setTimeout(resolve, ms);
	});
}
/**
 * Opens a connected Cassandra client, retrying on failure.
 *
 * Up to five attempts are made, ten seconds apart. Each attempt builds a fresh
 * client with a 60-second connect timeout and the local data center fixed to `dc1`.
 *
 * @param host - Contact point host.
 * @param port - Contact point port; combined with `host` as `host:port`.
 * @param username - Username for authentication.
 * @param password - Password for authentication.
 * @returns The connected client.
 * @throws Error when every attempt fails; the message includes the last failure's message.
 */
async function createSession(
	host: string,
	port: number,
	username: string,
	password: string,
): Promise<cassandra.Client> {
	const maxRetries = 5;
	const retryDelay = 10000;
	let lastError: Error | null = null;
	for (let attempt = 1; attempt <= maxRetries; attempt++) {
		if (attempt > 1) {
			console.log(`Retrying connection (attempt ${attempt}/${maxRetries})...`);
		}
		// Declared outside the `try` so a half-initialised client can be released on failure.
		let client: cassandra.Client | null = null;
		try {
			client = new cassandra.Client({
				contactPoints: [`${host}:${port}`],
				localDataCenter: 'dc1',
				credentials: {username, password},
				socketOptions: {connectTimeout: 60000},
			});
			await client.connect();
			return client;
		} catch (e) {
			lastError = e instanceof Error ? e : new Error(String(e));
			console.log(`Connection attempt ${attempt}/${maxRetries} failed: ${lastError.message}`);
			if (client !== null) {
				// Release whatever the failed client allocated before a new one is created.
				// Best-effort: a shutdown failure must not mask the connection error.
				await client.shutdown().catch(() => undefined);
			}
			if (attempt < maxRetries) {
				await sleep(retryDelay);
			}
		}
	}
	throw new Error(`Failed to connect to Cassandra after ${maxRetries} attempts: ${lastError?.message}`);
}
async function getAppliedMigrations(session: cassandra.Client): Promise