Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
16149b6
test: reproduce missing batch finally dispatch on terminal failure
elpete Feb 12, 2026
eb25679
fix: complete batches correctly when jobs end in failure
elpete Feb 12, 2026
1f13780
fix: make batch name optional and nullable
elpete Feb 12, 2026
b1e529b
test: load lib jars in test app and require time UUID generator
elpete Feb 12, 2026
6245c23
breaking: require successfulJobs and add batch count coverage
elpete Feb 12, 2026
53655da
Apply cfformat changes
elpete Feb 12, 2026
9389595
Do not change `failedJobIds` except for incrementing failed jobs
elpete Feb 12, 2026
3b36a73
v6.0.0-beta.1
elpete Feb 12, 2026
c7a32e2
fix: use availableDate instead of reservedDate for timeout watcher
elpete Apr 6, 2026
3f93107
v6.0.0-beta.2
elpete Apr 6, 2026
4d012f6
fix: set job attempt count in ColdBoxAsyncProvider and tighten tryToL…
elpete Apr 6, 2026
aa8bce1
6.0.0-beta.3
elpete Apr 6, 2026
7ea7222
fix: remove skip locked from DB timeout watcher query
elpete Apr 6, 2026
8574754
chore: upgrade CI to MySQL 8 and re-enable skip locked
elpete Apr 6, 2026
202c5d3
fix: remove invalid mysql docker flag in workflow services
elpete Apr 6, 2026
2320e5d
fix: set mysql8 test user auth plugin via init script
elpete Apr 6, 2026
afe2e04
fix: configure mysql8 auth plugin in workflow step
elpete Apr 6, 2026
f645060
Apply cfformat changes
elpete Apr 20, 2026
48cf9b3
fix: complete batches correctly when jobs end in failure
elpete Feb 12, 2026
eafde20
fix: make batch name optional and nullable
elpete Feb 12, 2026
32e619d
test: load lib jars in test app and require time UUID generator
elpete Feb 12, 2026
123e95d
breaking: require successfulJobs and add batch count coverage
elpete Feb 12, 2026
5772b0a
Apply cfformat changes
elpete Feb 12, 2026
9ee8593
Do not change `failedJobIds` except for incrementing failed jobs
elpete Feb 12, 2026
2a28ef8
fix: use availableDate instead of reservedDate for timeout watcher
elpete Apr 6, 2026
c5d465d
fix: set job attempt count in ColdBoxAsyncProvider and tighten tryToL…
elpete Apr 6, 2026
ce249d2
fix: remove skip locked from DB timeout watcher query
elpete Apr 6, 2026
0e7e077
chore: upgrade CI to MySQL 8 and re-enable skip locked
elpete Apr 6, 2026
03a33c8
fix: remove invalid mysql docker flag in workflow services
elpete Apr 6, 2026
76e4553
fix: set mysql8 test user auth plugin via init script
elpete Apr 6, 2026
d0f6627
fix: configure mysql8 auth plugin in workflow step
elpete Apr 6, 2026
b241236
refactor: extract processLockedRecord for testability and add max-att…
elpete Apr 18, 2026
f639c98
test: assert markJobFailed called (not DB row) when releaseJob throws
elpete Apr 19, 2026
bef5b30
test: use real subclass fixture to test releaseJob-throws path
elpete Apr 20, 2026
7046e69
Apply cfformat changes
elpete Apr 20, 2026
8e98a09
chore: include test model fixtures in cfformat script
elpete Apr 20, 2026
43fc2b5
v6.0.0-beta.4
elpete Apr 20, 2026
7ca5f8d
Fix DB max attempts failure logging
elpete May 5, 2026
ce43ca1
v6.0.0-beta.5
elpete May 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion box.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name":"cbq",
"version":"5.0.8",
"version":"6.0.0-beta.5",
"author":"Eric Peterson <eric@elpete.com>",
"location":"forgeboxStorage",
"homepage":"https://github.com/coldbox-modules/cbq",
Expand Down
34 changes: 15 additions & 19 deletions models/Providers/DBProvider.cfc
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,10 @@ component accessors="true" extends="AbstractQueueProvider" {
{ "job" : jobCFC.getMemento() }
);
}
markJobAsFailedById( arguments.record.id, arguments.pool );
ensureFailedBatchJobIsRecorded(
markJobFailed(
jobCFC,
"Job exceeded maximum attempts (#jobMaxAttempts#) before execution."
arguments.pool,
buildMaxAttemptsReachedException( jobCFC, jobMaxAttempts )
);
return;
}
Expand Down Expand Up @@ -349,22 +349,7 @@ component accessors="true" extends="AbstractQueueProvider" {
newQuery()
.table( variables.tableName )
.where( "id", arguments.id )
.update(
values = {
"failedDate" : getCurrentUnixTimestamp(),
"reservedBy" : {
"value" : "",
"null" : true,
"nulls" : true
},
"reservedDate" : {
"value" : "",
"null" : true,
"nulls" : true
}
},
options = variables.defaultQueryOptions
);
.update( values = { "failedDate" : getCurrentUnixTimestamp() }, options = variables.defaultQueryOptions );
}

private void function markJobAsFailedById( required numeric id, WorkerPool pool ) {
Expand All @@ -381,6 +366,17 @@ component accessors="true" extends="AbstractQueueProvider" {
.update( values = { "failedDate" : getCurrentUnixTimestamp() }, options = variables.defaultQueryOptions );
}

private any function buildMaxAttemptsReachedException( required AbstractJob job, required numeric maxAttempts ) {
try {
throw(
type = "cbq.MaxAttemptsReached",
message = "Job ###arguments.job.getId()# exceeded maximum attempts (#arguments.maxAttempts#) before execution."
);
} catch ( any e ) {
return e;
}
}

public void function releaseJob( required AbstractJob job, required WorkerPool pool ) {
arguments.job.setCurrentAttempt( arguments.job.getCurrentAttempt() + 1 );
newQuery()
Expand Down
146 changes: 138 additions & 8 deletions tests/specs/integration/Providers/DBProviderMaxAttemptsSpec.cfc
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,44 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" {
function run() {
describe( "DBProvider maxAttempts safeguards", function() {
beforeEach( function() {
variables.provider = getWireBox().getInstance( "DBProvider@cbq" ).setProperties( {} );
variables.workerPools = [];
variables.provider = getWireBox()
.buildInstance( getWireBox().getBinder().getMapping( "DBProvider@cbq" ) )
.setProperties( {} );
getWireBox().autowire(
target = variables.provider,
mapping = getWireBox().getBinder().getMapping( "DBProvider@cbq" )
);
makePublic( variables.provider, "processLockedRecord" );
variables.pool = makeWorkerPool( variables.provider );
variables.cbqSettings = getController().getModuleSettings( "cbq" );
variables.originalLogFailedJobs = variables.cbqSettings.logFailedJobs;
variables.provider
.newQuery()
.table( "cbq_jobs" )
.delete();
variables.provider
.newQuery()
.table( "cbq_failed_jobs" )
.delete();
} );

afterEach( function() {
for ( var pool in variables.workerPools ) {
pool.shutdown( force = true, timeout = 1 );
}
variables.cbqSettings.logFailedJobs = variables.originalLogFailedJobs;
variables.provider
.newQuery()
.table( "cbq_jobs" )
.delete();
variables.provider
.newQuery()
.table( "cbq_failed_jobs" )
.delete();
} );

it( "forceFailJob sets failedDate and clears the reservation", function() {
it( "forceFailJob sets failedDate and preserves the reservation", function() {
var job = getWireBox().getInstance( "SendWelcomeEmailJob" ).setMaxAttempts( 3 );
variables.provider.push( "default", job );

Expand Down Expand Up @@ -49,8 +70,8 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" {

expect( row.failedDate ).notToBeNull( "failedDate should be set" );
expect( row.failedDate ).toBeGT( 0, "failedDate should be a unix timestamp" );
expect( row.reservedBy ?: "" ).toBe( "", "reservedBy should be cleared" );
expect( row.reservedDate ?: "" ).toBe( "", "reservedDate should be cleared" );
expect( row.reservedBy ?: "" ).toBe( variables.pool.getUniqueId(), "reservedBy should be preserved" );
expect( row.reservedDate ).toBe( now, "reservedDate should be preserved" );
} );

it( "skips dispatch and marks the job failed when attempts already meets maxAttempts", function() {
Expand Down Expand Up @@ -98,6 +119,93 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" {
.where( "id", record.id )
.first();
expect( row.failedDate ).notToBeNull( "the runaway job should be marked failed" );
expect( row.reservedBy ?: "" ).toBe(
variables.pool.getUniqueId(),
"reservedBy should be preserved after terminal failure"
);
expect( row.reservedDate ?: "" ).toBe(
"",
"reservedDate should remain unchanged after terminal failure"
);
} );

it( "logs a failed job when a timeout retry already meets maxAttempts before dispatch", function() {
variables.cbqSettings.logFailedJobs = true;
var job = getWireBox().getInstance( "AlwaysErrorJob" ).setMaxAttempts( 3 );
variables.provider.push( "default", job );

var now = javacast( "long", getTickCount() / 1000 );
variables.provider
.newQuery()
.table( "cbq_jobs" )
.update( {
"reservedBy" : variables.pool.getUniqueId(),
"reservedDate" : {
"value" : "",
"null" : true,
"nulls" : true
},
"availableDate" : now - 1,
"attempts" : 3
} );

var record = variables.provider
.newQuery()
.from( "cbq_jobs" )
.first();

variables.provider.processLockedRecord( record, variables.pool );

var failedLog = variables.provider
.newQuery()
.from( "cbq_failed_jobs" )
.first();

expect( failedLog ).notToBeNull(
"terminal maxAttempts failures discovered by the timeout watcher should be visible in the failed jobs log"
);
expect( failedLog.originalId ).toBe( record.id );
expect( failedLog.exceptionType ).toBe( "cbq.MaxAttemptsReached" );
expect( failedLog.exceptionMessage ).toInclude( "exceeded maximum attempts" );
} );

it( "persists the terminal attempt before failing a job that reaches maxAttempts during execution", function() {
var job = getWireBox().getInstance( "AlwaysErrorJob" ).setMaxAttempts( 3 );
variables.provider.push( "default", job );

var now = javacast( "long", getTickCount() / 1000 );
variables.provider
.newQuery()
.table( "cbq_jobs" )
.update( {
"reservedBy" : variables.pool.getUniqueId(),
"reservedDate" : {
"value" : "",
"null" : true,
"nulls" : true
},
"availableDate" : now - 1,
"attempts" : 2
} );

var record = variables.provider
.newQuery()
.from( "cbq_jobs" )
.first();

variables.provider.processLockedRecord( record, variables.pool );
var row = waitForFailedJobRow( record.id );

expect( row.failedDate ).notToBeNull( "the third failed run should mark the row failed" );
expect( row.attempts ).toBe( 3, "the terminal third run should be reflected in the attempts column" );
expect( row.reservedBy ?: "" ).toBe(
variables.pool.getUniqueId(),
"reservedBy should be preserved after terminal failure"
);
expect( row.reservedDate ?: "" ).notToBe(
"",
"reservedDate should be preserved after terminal failure"
);
} );

it( "still proceeds normally when attempts is below maxAttempts", function() {
Expand Down Expand Up @@ -151,7 +259,6 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" {
.newQuery()
.table( "cbq_jobs" )
.delete();

var job = getWireBox()
.getInstance( "AlwaysErrorJob" )
.setMaxAttempts( 5 )
Expand Down Expand Up @@ -253,16 +360,39 @@ component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" {
.value( "id" );
}

private struct function waitForFailedJobRow( required numeric id ) {
for ( var i = 1; i <= 20; i++ ) {
var row = variables.provider
.newQuery()
.from( "cbq_jobs" )
.where( "id", arguments.id )
.first();
if ( !isNull( row.failedDate ) && row.attempts == 3 ) {
return row;
}
sleep( 100 );
}

return variables.provider
.newQuery()
.from( "cbq_jobs" )
.where( "id", arguments.id )
.first();
}

private any function makeWorkerPool( required any provider ) {
var uniqueName = createUUID();
var connection = getInstance( "QueueConnection@cbq" )
.setName( "TestMaxAttemptsConnection" )
.setName( "TestMaxAttemptsConnection-#uniqueName#" )
.setProvider( arguments.provider );

return getInstance( "WorkerPool@cbq" )
.setName( "TestMaxAttemptsPool" )
var pool = getInstance( "WorkerPool@cbq" )
.setName( "TestMaxAttemptsPool-#uniqueName#" )
.setConnection( connection )
.setConnectionName( connection.getName() )
.startWorkers();
variables.workerPools.append( pool );
return pool;
}

}
Loading