; direction?: 'ASC' | 'DESC' | undefined};
export interface QueryTemplate {
cql: string;
bind(params: P): PreparedQuery
;
}
export interface TableSelectOptions {
columns?: ReadonlyArray> | undefined;
where?: WhereExpr | ReadonlyArray> | undefined;
orderBy?: OrderBy | undefined;
limit?: number | undefined;
}
export interface TableDeleteOptions {
where?: WhereExpr | ReadonlyArray> | undefined;
}
export interface TableCountOptions {
where?: WhereExpr | ReadonlyArray> | undefined;
}
export interface TableDefinition, PartKey extends ColumnName = PK> {
name: string;
columns: ReadonlyArray>;
primaryKey: ReadonlyArray;
partitionKey?: ReadonlyArray | undefined;
}
export type TablePatch> = Partial<{
[K in Exclude, PK>]: DbOp>;
}>;
export interface Table, PartKey extends ColumnName = PK> {
name: string;
columns: ReadonlyArray>;
primaryKey: ReadonlyArray;
partitionKey: ReadonlyArray;
selectCql(opts?: TableSelectOptions): string;
select(opts?: TableSelectOptions): QueryTemplate;
updateAllCql(): string;
paramsFromRow(row: Row): CassandraParams;
upsertAll(row: Row): PreparedQuery;
patchByPk(pk: Pick, patch: TablePatch): PreparedQuery;
deleteCql(opts?: TableDeleteOptions): string;
delete(opts?: TableDeleteOptions): QueryTemplate;
deleteByPk(pk: Pick): PreparedQuery;
deletePartition(pk: Pick): PreparedQuery;
insertCql(opts?: {ttlParam?: string | undefined}): string;
insert(row: Row): PreparedQuery;
insertWithTtl(row: Row, ttlSeconds: number): PreparedQuery;
insertWithTtlParam(row: Row, ttlParamName: string): PreparedQuery;
insertIfNotExists(row: Row): PreparedQuery;
selectCountCql(opts?: TableCountOptions): string;
selectCount(opts?: TableCountOptions): QueryTemplate;
insertWithNow>(row: Omit, nowColumn: NowCol): PreparedQuery;
patchByPkWithTtl(pk: Pick, patch: TablePatch, ttlSeconds: number): PreparedQuery;
patchByPkWithTtlParam(
pk: Pick,
patch: TablePatch,
ttlParamName: string,
ttlValue: number,
): PreparedQuery;
upsertAllWithTtl(row: Row, ttlSeconds: number): PreparedQuery;
upsertAllWithTtlParam(row: Row, ttlParamName: string, ttlValue: number): PreparedQuery;
patchByPkIf, PK>>(
pk: Pick,
patch: TablePatch,
condition: {col: CondCol; expectedParam: string; expectedValue: RowValue},
): PreparedQuery;
where: {
eq: >(col: K, param?: string) => WhereExpr;
in: >(col: K, param: string) => WhereExpr;
lt: >(col: K, param?: string) => WhereExpr;
gt: >(col: K, param?: string) => WhereExpr;
lte: >(col: K, param?: string) => WhereExpr;
gte: >(col: K, param?: string) => WhereExpr;
tokenGt: >(col: K, param: string) => WhereExpr;
tupleGt: >(cols: ReadonlyArray, params: ReadonlyArray) => WhereExpr;
};
}
function prepared(cql: string, params: P): PreparedQuery
{
return {cql, params};
}
function asWhereArray(
where: WhereExpr | ReadonlyArray> | undefined,
): Array> {
if (where === undefined) {
return [];
}
if (Array.isArray(where)) {
return [...where];
}
return [where as WhereExpr];
}
function compileWhere(where: WhereExpr): string {
switch (where.kind) {
case 'eq':
return `${where.col} = :${where.param}`;
case 'in':
return `${where.col} IN :${where.param}`;
case 'lt':
return `${where.col} < :${where.param}`;
case 'gt':
return `${where.col} > :${where.param}`;
case 'lte':
return `${where.col} <= :${where.param}`;
case 'gte':
return `${where.col} >= :${where.param}`;
case 'tokenGt':
return `TOKEN(${where.col}) > TOKEN(:${where.param})`;
case 'tupleGt': {
if (where.cols.length === 0 || where.cols.length !== where.params.length) {
throw new Error('tupleGt requires equal-length, non-empty cols and params.');
}
const cols = `(${where.cols.join(', ')})`;
const params = `(${where.params.map((paramName) => `:${paramName}`).join(', ')})`;
return `${cols} > ${params}`;
}
default: {
const exhaustive: never = where;
return exhaustive;
}
}
}
function compileWhereClause(
where: WhereExpr | ReadonlyArray> | undefined,
): string {
const clauses = asWhereArray(where);
if (clauses.length === 0) {
return '';
}
return ` WHERE ${clauses.map((clause) => compileWhere(clause)).join(' AND ')}`;
}
function toCassandraValue(op: DbOp, tableName: string, columnName: string): CassandraValue {
if (op.kind === 'clear') {
return null;
}
if (op.value === undefined) {
throw new Error(`Patch value for "${tableName}.${columnName}" is undefined. Use Db.clear() to write null.`);
}
return op.value as CassandraValue;
}
function ensureUnique(values: ReadonlyArray, fieldName: string, tableName: string): void {
const seen = new Set();
for (const value of values) {
if (seen.has(value)) {
throw new Error(`Table "${tableName}" contains duplicate ${fieldName} value "${value}".`);
}
seen.add(value);
}
}
function assertValidTableDefinition, PartKey extends ColumnName>(
definition: TableDefinition,
): void {
const tableName = definition.name;
if (definition.columns.length === 0) {
throw new Error(`Table "${tableName}" must define at least one column.`);
}
if (definition.primaryKey.length === 0) {
throw new Error(`Table "${tableName}" must define at least one primary key column.`);
}
const columns = [...definition.columns] as Array;
const primaryKey = [...definition.primaryKey] as Array;
const partitionKey = [...(definition.partitionKey ?? definition.primaryKey)] as Array;
ensureUnique(columns, 'column', tableName);
ensureUnique(primaryKey, 'primary key', tableName);
ensureUnique(partitionKey, 'partition key', tableName);
const columnSet = new Set(columns);
for (const keyColumn of primaryKey) {
if (!columnSet.has(keyColumn)) {
throw new Error(`Primary key column "${tableName}.${keyColumn}" must exist in table columns.`);
}
}
const primaryKeySet = new Set(primaryKey);
for (const partitionColumn of partitionKey) {
if (!columnSet.has(partitionColumn)) {
throw new Error(`Partition key column "${tableName}.${partitionColumn}" must exist in table columns.`);
}
if (!primaryKeySet.has(partitionColumn)) {
throw new Error(`Partition key column "${tableName}.${partitionColumn}" must also be part of the primary key.`);
}
}
}
interface PatchCqlOptions {
ttlSeconds?: number | undefined;
ttlParamName?: string | undefined;
condition?: {col: ColumnName; expectedParam: string} | undefined;
}
class CassandraTable, PartKey extends ColumnName = PK>
implements Table
{
public readonly name: string;
public readonly columns: ReadonlyArray>;
public readonly primaryKey: ReadonlyArray;
public readonly partitionKey: ReadonlyArray;
public readonly where: Table['where'];
private readonly nonPrimaryKeyColumns: Array, PK>>;
private readonly updateAllStatement: string;
private readonly deleteByPrimaryKeyStatement: string;
private readonly insertBaseStatement: string;
public constructor(definition: TableDefinition) {
assertValidTableDefinition(definition);
this.name = definition.name;
this.columns = [...definition.columns];
this.primaryKey = [...definition.primaryKey];
this.partitionKey = definition.partitionKey
? [...definition.partitionKey]
: ([...definition.primaryKey] as unknown as Array);
this.nonPrimaryKeyColumns = this.columns.filter((column) => {
return !this.primaryKey.includes(column as PK);
}) as Array, PK>>;
this.updateAllStatement = this.buildUpdateAllStatement();
this.deleteByPrimaryKeyStatement = `DELETE FROM ${this.name} WHERE ${this.primaryKeyWhereClause()}`;
this.insertBaseStatement = `INSERT INTO ${this.name} (${this.columns.join(', ')}) VALUES (${this.columns.map((column) => `:${column}`).join(', ')})`;
this.where = {
eq>(col: K, param?: string): WhereExpr {
return {kind: 'eq', col, param: param ?? col};
},
in>(col: K, param: string): WhereExpr {
return {kind: 'in', col, param};
},
lt>(col: K, param?: string): WhereExpr {
return {kind: 'lt', col, param: param ?? col};
},
gt>(col: K, param?: string): WhereExpr {
return {kind: 'gt', col, param: param ?? col};
},
lte>(col: K, param?: string): WhereExpr {
return {kind: 'lte', col, param: param ?? col};
},
gte>(col: K, param?: string): WhereExpr {
return {kind: 'gte', col, param: param ?? col};
},
tokenGt>(col: K, param: string): WhereExpr {
return {kind: 'tokenGt', col, param};
},
tupleGt>(cols: ReadonlyArray, params: ReadonlyArray): WhereExpr {
return {kind: 'tupleGt', cols, params};
},
};
}
public selectCql(options: TableSelectOptions = {}): string {
const selectedColumns = (options.columns ?? this.columns).join(', ');
const whereClause = compileWhereClause(options.where);
const orderByClause = options.orderBy
? ` ORDER BY ${options.orderBy.col} ${options.orderBy.direction ?? 'ASC'}`
: '';
const limitClause = this.limitClause(options.limit);
return `SELECT ${selectedColumns} FROM ${this.name}${whereClause}${orderByClause}${limitClause}`;
}
public select(options: TableSelectOptions = {}): QueryTemplate {
const cql = this.selectCql(options);
return this.toTemplate(cql);
}
public updateAllCql(): string {
return this.updateAllStatement;
}
public paramsFromRow(row: Row): CassandraParams {
return this.collectRowParams(row, true);
}
public upsertAll(row: Row): PreparedQuery {
if (this.hasAllColumns(row)) {
return prepared(this.updateAllStatement, this.collectRowParams(row, true));
}
return this.buildDynamicUpsert(row);
}
public patchByPk(pk: Pick, patch: TablePatch): PreparedQuery {
const patchColumns = this.patchColumns(patch, 'PATCH update');
const cql = this.buildPatchStatement(patchColumns);
const params = this.primaryKeyParams(pk);
this.appendPatchParams(params, patch, patchColumns);
return prepared(cql, params);
}
public deleteCql(options: TableDeleteOptions = {}): string {
const whereClause = options.where ? compileWhereClause(options.where) : ` WHERE ${this.primaryKeyWhereClause()}`;
return `DELETE FROM ${this.name}${whereClause}`;
}
public delete(options: TableDeleteOptions = {}): QueryTemplate {
const cql = this.deleteCql(options);
return this.toTemplate(cql);
}
public deleteByPk(pk: Pick): PreparedQuery {
return prepared(this.deleteByPrimaryKeyStatement, this.primaryKeyParams(pk));
}
public deletePartition(pk: Pick): PreparedQuery {
if (this.partitionKey.length === 0) {
throw new Error(`Table "${this.name}" has no partition key columns.`);
}
const cql = `DELETE FROM ${this.name} WHERE ${this.partitionKeyWhereClause()}`;
return prepared(cql, this.partitionKeyParams(pk));
}
public insertCql(options: {ttlParam?: string | undefined} = {}): string {
if (!options.ttlParam) {
return this.insertBaseStatement;
}
return `${this.insertBaseStatement} USING TTL :${options.ttlParam}`;
}
public insert(row: Row): PreparedQuery {
return prepared(this.insertBaseStatement, this.collectRowParams(row, true));
}
public insertWithTtl(row: Row, ttlSeconds: number): PreparedQuery {
const cql = `${this.insertBaseStatement} USING TTL ${ttlSeconds}`;
return prepared(cql, this.collectRowParams(row, true));
}
public insertWithTtlParam(row: Row, ttlParamName: string): PreparedQuery {
const cql = `${this.insertBaseStatement} USING TTL :${ttlParamName}`;
const params = this.collectRowParams(row, true);
const rowRecord = row as Record;
const ttlValue = rowRecord[ttlParamName];
if (ttlValue === undefined) {
throw new Error(
`Row is missing TTL param value "${this.name}.${ttlParamName}". Include this field or use insertWithTtl().`,
);
}
params[ttlParamName] = ttlValue as CassandraValue;
return prepared(cql, params);
}
public insertIfNotExists(row: Row): PreparedQuery {
const cql = `${this.insertBaseStatement} IF NOT EXISTS`;
return prepared(cql, this.collectRowParams(row, true));
}
public selectCountCql(options: TableCountOptions = {}): string {
const whereClause = compileWhereClause(options.where);
return `SELECT COUNT(*) as count FROM ${this.name}${whereClause}`;
}
public selectCount(options: TableCountOptions = {}): QueryTemplate {
const cql = this.selectCountCql(options);
return this.toTemplate(cql);
}
public insertWithNow>(row: Omit, nowColumn: NowCol): PreparedQuery {
if (!this.columns.includes(nowColumn)) {
throw new Error(`Column "${this.name}.${nowColumn}" does not exist.`);
}
const columns = this.columns.filter((column) => {
return column !== nowColumn;
});
const cql = `INSERT INTO ${this.name} (${[...columns, nowColumn].join(', ')}) VALUES (${columns.map((column) => `:${column}`).join(', ')}, now())`;
const rowRecord = row as Record;
const params: CassandraParams = {};
for (const column of columns) {
const value = rowRecord[column];
if (value === undefined) {
throw new Error(`Row is missing value for "${this.name}.${column}". INSERT requires all non-now columns.`);
}
params[column] = value as CassandraValue;
}
return prepared(cql, params);
}
public patchByPkWithTtl(pk: Pick, patch: TablePatch, ttlSeconds: number): PreparedQuery {
const patchColumns = this.patchColumns(patch, 'PATCH update');
const cql = this.buildPatchStatement(patchColumns, {ttlSeconds});
const params = this.primaryKeyParams(pk);
this.appendPatchParams(params, patch, patchColumns);
return prepared(cql, params);
}
public patchByPkWithTtlParam(
pk: Pick,
patch: TablePatch,
ttlParamName: string,
ttlValue: number,
): PreparedQuery {
const patchColumns = this.patchColumns(patch, 'PATCH update');
const cql = this.buildPatchStatement(patchColumns, {ttlParamName});
const params = this.primaryKeyParams(pk);
this.appendPatchParams(params, patch, patchColumns);
params[ttlParamName] = ttlValue;
return prepared(cql, params);
}
public upsertAllWithTtl(row: Row, ttlSeconds: number): PreparedQuery {
const cql = this.nonPrimaryKeyColumns.length
? `UPDATE ${this.name} USING TTL ${ttlSeconds} SET ${this.nonPrimaryKeyColumns.map((column) => `${column} = :${column}`).join(', ')} WHERE ${this.primaryKeyWhereClause()}`
: `${this.insertBaseStatement} USING TTL ${ttlSeconds}`;
return prepared(cql, this.collectRowParams(row, true));
}
public upsertAllWithTtlParam(row: Row, ttlParamName: string, ttlValue: number): PreparedQuery {
const cql = this.nonPrimaryKeyColumns.length
? `UPDATE ${this.name} USING TTL :${ttlParamName} SET ${this.nonPrimaryKeyColumns.map((column) => `${column} = :${column}`).join(', ')} WHERE ${this.primaryKeyWhereClause()}`
: `${this.insertBaseStatement} USING TTL :${ttlParamName}`;
const params = this.collectRowParams(row, true);
params[ttlParamName] = ttlValue;
return prepared(cql, params);
}
public patchByPkIf, PK>>(
pk: Pick,
patch: TablePatch,
condition: {col: CondCol; expectedParam: string; expectedValue: RowValue},
): PreparedQuery {
const patchColumns = this.patchColumns(patch, 'conditional PATCH update');
const cql = this.buildPatchStatement(patchColumns, {
condition: {
col: condition.col,
expectedParam: condition.expectedParam,
},
});
const params = this.primaryKeyParams(pk);
this.appendPatchParams(params, patch, patchColumns);
params[condition.expectedParam] = condition.expectedValue as CassandraValue;
return prepared(cql, params);
}
private toTemplate(cql: string): QueryTemplate {
return {
cql,
bind(params: P): PreparedQuery
{
return prepared(cql, params);
},
};
}
private buildUpdateAllStatement(): string {
if (this.nonPrimaryKeyColumns.length === 0) {
return this.insertBaseStatement;
}
const setClause = this.nonPrimaryKeyColumns.map((column) => `${column} = :${column}`).join(', ');
return `UPDATE ${this.name} SET ${setClause} WHERE ${this.primaryKeyWhereClause()}`;
}
private collectRowParams(row: Row, requireAllColumns: boolean): CassandraParams {
const rowRecord = row as Record;
const params: CassandraParams = {};
for (const column of this.columns) {
const value = rowRecord[column];
if (value === undefined) {
if (requireAllColumns) {
throw new Error(
`Row is missing value for "${this.name}.${column}". Full-row operations require every column to be present.`,
);
}
continue;
}
params[column] = value as CassandraValue;
}
return params;
}
private hasAllColumns(row: Row): boolean {
const rowRecord = row as Record;
for (const column of this.columns) {
if (rowRecord[column] === undefined) {
return false;
}
}
return true;
}
private buildDynamicUpsert(row: Row): PreparedQuery {
const rowRecord = row as Record;
const params: CassandraParams = {};
const presentColumns: Array> = [];
for (const column of this.columns) {
const value = rowRecord[column];
if (value === undefined) {
continue;
}
presentColumns.push(column);
params[column] = value as CassandraValue;
}
for (const keyColumn of this.primaryKey) {
if (params[keyColumn] === undefined) {
throw new Error(
`Row is missing value for "${this.name}.${keyColumn}". Dynamic upserts require all primary key columns to be present.`,
);
}
}
const mutableColumns = presentColumns.filter((column) => {
return !this.primaryKey.includes(column as PK);
});
const cql = mutableColumns.length
? `UPDATE ${this.name} SET ${mutableColumns.map((column) => `${column} = :${column}`).join(', ')} WHERE ${this.primaryKeyWhereClause()}`
: `INSERT INTO ${this.name} (${this.primaryKey.join(', ')}) VALUES (${this.primaryKey.map((column) => `:${column}`).join(', ')})`;
return prepared(cql, params);
}
private patchColumns(patch: TablePatch, actionName: string): Array, PK>> {
const columns = Object.keys(patch) as Array, PK>>;
if (columns.length === 0) {
throw new Error(`Refusing to execute empty ${actionName} on table "${this.name}".`);
}
columns.sort((left, right) => {
return this.columns.indexOf(left) - this.columns.indexOf(right);
});
return columns;
}
private buildPatchStatement(
patchColumns: Array, PK>>,
options: PatchCqlOptions = {},
): string {
const setClause = patchColumns.map((column) => `${column} = :${column}`).join(', ');
const ttlClause =
options.ttlSeconds !== undefined
? ` USING TTL ${options.ttlSeconds}`
: options.ttlParamName
? ` USING TTL :${options.ttlParamName}`
: '';
const conditionClause = options.condition
? ` IF ${options.condition.col} = :${options.condition.expectedParam}`
: '';
return `UPDATE ${this.name}${ttlClause} SET ${setClause} WHERE ${this.primaryKeyWhereClause()}${conditionClause}`;
}
private appendPatchParams(
params: CassandraParams,
patch: TablePatch,
patchColumns: Array, PK>>,
): void {
for (const column of patchColumns) {
const op = patch[column];
if (!op) {
throw new Error(`Patch operation for "${this.name}.${column}" is missing.`);
}
params[column] = toCassandraValue(op, this.name, column);
}
}
private primaryKeyParams(pk: Pick): CassandraParams {
const record = pk as Record;
const params: CassandraParams = {};
for (const keyColumn of this.primaryKey) {
const value = record[keyColumn];
if (value === undefined) {
throw new Error(`Primary key value is missing for "${this.name}.${keyColumn}".`);
}
params[keyColumn] = value as CassandraValue;
}
return params;
}
private partitionKeyParams(pk: Pick): CassandraParams {
const record = pk as Record;
const params: CassandraParams = {};
for (const keyColumn of this.partitionKey) {
const value = record[keyColumn];
if (value === undefined) {
throw new Error(`Partition key value is missing for "${this.name}.${keyColumn}".`);
}
params[keyColumn] = value as CassandraValue;
}
return params;
}
private primaryKeyWhereClause(): string {
return this.primaryKey.map((column) => `${column} = :${column}`).join(' AND ');
}
private partitionKeyWhereClause(): string {
return this.partitionKey.map((column) => `${column} = :${column}`).join(' AND ');
}
private limitClause(limit: number | undefined): string {
if (limit === undefined) {
return '';
}
if (!Number.isInteger(limit) || limit <= 0) {
throw new Error(`SELECT limit for "${this.name}" must be a positive integer.`);
}
return ` LIMIT ${limit}`;
}
}
export function defineTable, PartKey extends ColumnName = PK>(
definition: TableDefinition,
): Table {
return new CassandraTable(definition);
}