Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 106 additions & 7 deletions api/jobs/mutationManagerJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ interface JobConfig {

interface MutationTask extends Document {
_id: ObjectId | string;
type: 'delete' | 'update';
type: 'delete' | 'update' | 'native_ch';
db: string;
collection: string;
query: Record<string, unknown>;
update?: Record<string, unknown>;
native_sql?: string;
running: boolean;
status: string;
hb?: number;
Expand Down Expand Up @@ -272,7 +273,7 @@ class MutationManagerJob extends Job {
*/
async processTask(task: MutationTask, summary: SummaryEntry[], jobConfig: JobConfig = jobConfigState || DEFAULT_JOB_CONFIG): Promise<void> {
const type = task.type;
if (type !== 'delete' && type !== 'update') {
if (type !== 'delete' && type !== 'update' && type !== 'native_ch') {
await common.db.collection('mutation_manager').updateOne(
{ _id: task._id },
{
Expand All @@ -288,7 +289,7 @@ class MutationManagerJob extends Job {
const clickhouseEnabled = mutationManager.isClickhouseEnabled();
const hasClickhouseDelete = clickhouseEnabled && !!(clickHouseRunner && clickHouseRunner.deleteGranularDataByQuery);
const hasClickhouseUpdate = clickhouseEnabled && !!(clickHouseRunner && clickHouseRunner.updateGranularDataByQuery);
const hasClickhouse = (type === 'update' ? hasClickhouseUpdate : hasClickhouseDelete);
const hasClickhouse = type === 'native_ch' ? clickhouseEnabled : (type === 'update' ? hasClickhouseUpdate : hasClickhouseDelete);

if (!mongoDb && !hasClickhouse) {
const reason = `mongo_db_unavailable:${task.db || 'missing'}`;
Expand Down Expand Up @@ -317,7 +318,11 @@ class MutationManagerJob extends Job {
}

let mongoOk = true;
if (mongoDb) {
if (type === 'native_ch') {
// Native CH mutations skip MongoDB entirely
log.d('Native CH mutation - skipping MongoDB', { taskId: task._id });
}
else if (mongoDb) {
if (type === 'update') {
mongoOk = await this.updateMongo(task, mongoDb);
}
Expand All @@ -330,7 +335,10 @@ class MutationManagerJob extends Job {
}

let chScheduledOk = true;
if (type === 'update' && hasClickhouseUpdate) {
if (type === 'native_ch' && clickhouseEnabled) {
chScheduledOk = await this.executeNativeClickhouse(task);
}
else if (type === 'update' && hasClickhouseUpdate) {
chScheduledOk = await this.updateClickhouse(task);
}
else if (type === 'delete' && hasClickhouseDelete) {
Expand Down Expand Up @@ -422,8 +430,10 @@ class MutationManagerJob extends Job {
for (const task of awaiting) {
try {
if (chHealth && typeof chHealth.getMutationStatus === 'function') {
// In cluster mode, mutations target _local tables, so validation must check _local
const validationTable = isClusterMode ? task.collection + '_local' : task.collection;
// In cluster mode, mutations target _local tables, so validation must check _local.
// native_ch tasks may already have _local in collection name — avoid doubling.
const needsLocalSuffix = isClusterMode && !task.collection.endsWith('_local');
const validationTable = needsLocalSuffix ? task.collection + '_local' : task.collection;
const status = await chHealth.getMutationStatus({ validation_command_id: task.validation_command_id, table: validationTable, database: task.db });
if (status && status.is_done) {
await common.db.collection('mutation_manager').updateOne(
Expand Down Expand Up @@ -677,6 +687,95 @@ class MutationManagerJob extends Job {
}
}

/**
 * Build validated ClickHouse mutation SQL with an embedded command-id for tracking.
 *
 * Processing steps:
 * - trims the input and strips one trailing semicolon;
 * - requires the statement to start with `ALTER TABLE ` and to contain the
 *   keywords DELETE or UPDATE, and WHERE (case-insensitive, whole-word);
 * - appends the tautology `AND '<commandId>' = '<commandId>'`, placed right
 *   before the first SETTINGS keyword when one exists, otherwise at the end.
 *
 * NOTE(review): all keyword checks are purely textual. A WHERE/SETTINGS word
 * inside a quoted literal or used as an identifier is matched as well, and
 * `commandId` is interpolated between single quotes without escaping, so
 * callers must pass an id that contains no quote characters.
 *
 * @param baseSql - Raw mutation statement to validate and extend
 * @param commandId - Tracking id embedded verbatim into the statement
 * @returns Final SQL string, or null if the shape is invalid
 */
private buildValidatedNativeClickhouseSql(baseSql: string, commandId: string): string | null {
    if (!baseSql || typeof baseSql !== 'string') {
        return null;
    }
    let sql = baseSql.trim();
    if (sql.endsWith(';')) {
        sql = sql.slice(0, -1).trimEnd();
    }
    // The upper-cased copy is used ONLY for yes/no keyword checks, never for offsets.
    const upper = sql.toUpperCase();
    if (!upper.startsWith('ALTER TABLE ')) {
        return null;
    }
    if (!/\b(DELETE|UPDATE)\b/.test(upper)) {
        return null;
    }
    if (!/\bWHERE\b/.test(upper)) {
        return null;
    }
    // Find SETTINGS clause (if any) — inject command-id BEFORE it.
    // The offset must come from `sql` itself: toUpperCase() can change the string
    // length (e.g. 'ß' -> 'SS'), so an index taken from the upper-cased copy may
    // point at the wrong character of `sql` and split the SETTINGS keyword.
    const settingsIdx = sql.search(/\bSETTINGS\b/i);
    const injection = ` AND '${commandId}' = '${commandId}'`;
    if (settingsIdx !== -1) {
        return sql.slice(0, settingsIdx) + injection + sql.slice(settingsIdx);
    }
    return sql + injection;
}

/**
 * Runs the raw SQL stored on a `native_ch` task as a ClickHouse mutation.
 *
 * Flow, in order:
 * 1. reject tasks without a string `native_sql`;
 * 2. reject when `common.clickhouseQueryService` is not set;
 * 3. derive a command id from the task id and its `fail_count`, and build the
 *    final statement through buildValidatedNativeClickhouseSql;
 * 4. store the command id on the task document, then execute the mutation.
 *
 * Every rejection and every thrown error is routed to markFailedOrRetry.
 *
 * @param task - The mutation task carrying the `native_sql` statement
 * @returns true once executeMutation resolved, false on any rejection or error
 */
async executeNativeClickhouse(task: MutationTask): Promise<boolean> {
    const rawSql = task.native_sql;
    if (!rawSql || typeof rawSql !== 'string') {
        log.e('Skipping native CH mutation (empty sql)', { taskId: task._id });
        await this.markFailedOrRetry(task, 'empty_native_sql');
        return false;
    }

    if (!common.clickhouseQueryService) {
        log.e('ClickHouse query service not available for native mutation', { taskId: task._id });
        await this.markFailedOrRetry(task, 'ch_query_service_unavailable');
        return false;
    }

    try {
        // A new id per attempt: the number of earlier failures is part of the id.
        const attempt = Number(task.fail_count || 0);
        const commandId = `nm_${String(task._id)}_${attempt}`;

        const finalSql = this.buildValidatedNativeClickhouseSql(rawSql, commandId);
        if (!finalSql) {
            log.e('Skipping native CH mutation (invalid SQL shape)', {
                taskId: task._id,
                native_sql: rawSql
            });
            await this.markFailedOrRetry(task, 'invalid_native_sql_shape');
            return false;
        }

        // The id is written to the task document before the mutation is sent,
        // so it is already stored if the process stops right after execution.
        const taskFilter = { _id: task._id };
        const idPatch = { $set: { validation_command_id: commandId } };
        await common.db.collection('mutation_manager').updateOne(taskFilter, idPatch);

        await common.clickhouseQueryService.executeMutation({ query: finalSql });
        log.d('Native CH mutation scheduled', { taskId: task._id, commandId });
        return true;
    }
    catch (err) {
        const logPayload = {
            taskId: task._id,
            error: (err as Error)?.message || String(err)
        };
        log.e('Native CH mutation failed', logPayload);

        const reason = 'native_ch_error: ' + ((err as Error)?.message || err + '');
        await this.markFailedOrRetry(task, reason);
        return false;
    }
}

/**
* Marks a task as failed or schedules it for a retry based on the number of previous failures.
* @param task - The task object to update.
Expand Down
62 changes: 62 additions & 0 deletions api/utils/mutationManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,68 @@ plugins.register('/core/update_granular_data', async function(ob: { db: string;
});
});

/**
 * Validate that a native ClickHouse SQL statement is a safe mutation.
 * Only allows: ALTER TABLE ... DELETE ... WHERE ... or ALTER TABLE ... UPDATE ... WHERE ...
 * Rejects multi-statement SQL, forbidden DDL (DROP, TRUNCATE, RENAME, DETACH, ATTACH).
 *
 * All checks are textual and run on a whitespace-collapsed, upper-cased copy of
 * the statement, so a forbidden word or a semicolon inside a quoted literal is
 * rejected too.
 *
 * @param sql - Raw statement submitted for queuing
 * @returns null when the statement is accepted, otherwise a short reason code
 */
function isSafeNativeChMutation(sql: string): string | null {
    if (!sql || typeof sql !== 'string') {
        return 'missing_or_invalid_sql';
    }
    const trimmed = sql.trim();
    if (!trimmed) {
        return 'empty_sql';
    }
    // Reject multi-statement: only allow optional trailing semicolon
    const firstSemicolon = trimmed.indexOf(';');
    if (firstSemicolon !== -1 && firstSemicolon !== trimmed.length - 1) {
        return 'multiple_statements_not_allowed';
    }
    const upper = trimmed.replace(/\s+/g, ' ').toUpperCase();
    // Reject forbidden DDL
    if (/\b(DROP|TRUNCATE|RENAME|DETACH|ATTACH)\b/.test(upper)) {
        return 'forbidden_command';
    }
    // Must start with ALTER TABLE
    if (!upper.startsWith('ALTER TABLE ')) {
        return 'only_alter_table_mutations_allowed';
    }
    // Must contain DELETE or UPDATE
    if (!/\b(DELETE|UPDATE)\b/.test(upper)) {
        return 'must_be_delete_or_update_mutation';
    }
    // Must have WHERE clause (required for command-id injection).
    // Whole-word match so that `WHERE(...)` is recognised, and at least one
    // character other than whitespace or the trailing semicolon must follow,
    // so an empty predicate such as `... WHERE ;` is not accepted.
    if (!/\bWHERE\b\s*[^\s;]/.test(upper)) {
        return 'missing_where_clause';
    }
    return null;
}

/**
 * Handler for '/core/execute_native_ch_mutation'.
 * Checks the statement with isSafeNativeChMutation, throws when it is rejected,
 * and otherwise inserts a queued `native_ch` task into `mutation_manager`.
 * The optional `metadata` object is stored in the task's `query` field.
 */
plugins.register('/core/execute_native_ch_mutation', async function(ob: { sql: string; db: string; collection: string; metadata?: Record<string, unknown> }) {
    const rejection = isSafeNativeChMutation(ob.sql);
    if (rejection) {
        const errMsg = `Native CH mutation rejected: ${rejection}`;
        log.e(errMsg);
        throw new Error(errMsg);
    }

    const { db, collection } = ob;
    log.d('Mutation (native_ch) queued:' + JSON.stringify({ db, collection }));

    // The task starts as queued and not running.
    await common.db.collection('mutation_manager').insertOne({
        db,
        collection,
        type: 'native_ch',
        native_sql: ob.sql,
        query: ob.metadata || {},
        ts: Date.now(),
        status: MUTATION_STATUS.QUEUED,
        running: false
    });
});

plugins.register('/system/observability/collect', async function(ob: { params?: Params }): Promise<ObservabilityResult> {
try {
const filters = buildQueueFilters(ob && ob.params);
Expand Down
Loading
Loading