Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions redis/push.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
-- Publishes a populated test queue, fenced on the master lock value.
--
-- KEYS[1]  master status key (holds the caller's lock value during setup)
-- KEYS[2]  queue list key
-- KEYS[3]  total count key
-- KEYS[4]  current generation key
--
-- ARGV[1]  lock value the caller expects to still be stored in KEYS[1]
-- ARGV[2]  total number of tests
-- ARGV[3]  generation identifier
-- ARGV[4]  TTL (seconds) applied to every key written by this script
-- ARGV[5+] test identifiers to enqueue (optional)
--
-- Returns 0 without writing anything when the stored lock value differs from
-- ARGV[1]; returns 1 after the queue, total, status and generation are written.
local master_status_key = KEYS[1]
local queue_key = KEYS[2]
local total_key = KEYS[3]
local current_generation_key = KEYS[4]

local expected_lock_value = ARGV[1]
local total_count = ARGV[2]
local generation_uuid = ARGV[3]
local redis_ttl = ARGV[4]

-- Fence: a missing key (GET yields false) or a different value means the
-- caller no longer owns the lock, so nothing is written.
if redis.call('GET', master_status_key) ~= expected_lock_value then
  return 0
end

-- unpack() places every element on the Lua stack and raises
-- "too many results to unpack" for very large lists, so the test ids are
-- pushed in bounded batches. Consecutive LPUSH calls produce the same list
-- order as a single LPUSH with all the arguments.
local LPUSH_BATCH_SIZE = 1000
local first_test_index = 5
local last_test_index = #ARGV

local batch_start = first_test_index
while batch_start <= last_test_index do
  local batch_stop = math.min(batch_start + LPUSH_BATCH_SIZE - 1, last_test_index)
  redis.call('LPUSH', queue_key, unpack(ARGV, batch_start, batch_stop))
  batch_start = batch_stop + 1
end

redis.call('SET', total_key, total_count)
-- Overwriting the lock value with 'ready' marks the queue as published.
redis.call('SET', master_status_key, 'ready')
redis.call('SET', current_generation_key, generation_uuid)

redis.call('EXPIRE', queue_key, redis_ttl)
redis.call('EXPIRE', total_key, redis_ttl)
redis.call('EXPIRE', master_status_key, redis_ttl)
redis.call('EXPIRE', current_generation_key, redis_ttl)

return 1
9 changes: 9 additions & 0 deletions redis/refresh_master_lock.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Extends the master lock's TTL, but only while the caller still owns it.
--
-- KEYS[1]  master status key
-- ARGV[1]  lock value the caller expects to be stored
-- ARGV[2]  new TTL in seconds
--
-- Returns 1 when the lock was refreshed, 0 when the stored value differs
-- (or the key is gone) and nothing was written.
local lock_key = KEYS[1]
local owner_token = ARGV[1]
local ttl_seconds = ARGV[2]

local stored_token = redis.call('GET', lock_key)
if stored_token ~= owner_token then
  return 0
end

-- Re-set the same value with a fresh expiry.
redis.call('SET', lock_key, owner_token, 'EX', ttl_seconds)
return 1
2 changes: 1 addition & 1 deletion ruby/lib/ci/queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def initialize(
timing_redis_url: nil,
heartbeat_grace_period: 30,
heartbeat_interval: 10,
master_lock_ttl: 30,
master_lock_ttl: 120,
max_election_attempts: 3
)
@build_id = build_id
Expand Down
25 changes: 18 additions & 7 deletions ruby/lib/ci/queue/redis/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def initialize(redis_url, config)
end

# Whether the queue has been initialized and has no entries left.
# Always false while no generation is known to this instance.
def exhausted?
  if current_generation
    queue_initialized? && size == 0
  else
    false
  end
end

Expand Down Expand Up @@ -70,8 +71,12 @@ def wait_for_master(timeout: 120)

# Master lock expired during setup (died mid-population)
# Status will be nil if lock expired
if status.nil? && last_status == 'setup'
raise MasterDied, "Master lock expired during setup - master may have died"
if status.nil?
if last_status == 'setup'
raise MasterDied, "Master lock expired during setup - master may have died"
elsif !redis.exists?(key('current-generation'))
raise MasterDied, "No master has completed setup"
end
end

last_status = status
Expand Down Expand Up @@ -114,6 +119,16 @@ def master_worker_id
redis.get(key('master-worker-id'))
end

# The generation this instance operates on: @generation when set,
# otherwise @current_generation.
def current_generation
  return @generation if @generation

  @current_generation
end

# Compares the cached @current_generation with the value stored under the
# 'current-generation' key. Returns false when nothing is cached, and a falsy
# value when the stored key is absent.
def generation_stale?
  cached = @current_generation
  return false unless cached

  stored = redis.get(key('current-generation'))
  stored && stored != cached
end

private

attr_reader :redis, :redis_url
Expand All @@ -135,7 +150,7 @@ def master_status

# Builds a key scoped to the active generation: key('gen', <generation>, *args).
# The generation is @generation when set, otherwise @current_generation.
#
# NOTE(review): this method was captured from a diff view without +/- markers.
# The `return key(*args)` fallback and the `raise` below look like the old and
# new variants of the same guard. As written, the `return` fires first whenever
# no generation is set, so the `raise` is never reached - confirm which of the
# two lines is intended to remain.
def generation_key(*args)
gen = @generation || @current_generation
return key(*args) unless gen # Fallback for backwards compatibility
raise "Generation not set - call learn_generation first" unless gen
key('gen', gen, *args)
end

Expand All @@ -145,10 +160,6 @@ def learn_generation
@current_generation
end

# Returns @generation when it is set, falling back to @current_generation.
def current_generation
  return @generation if @generation

  @current_generation
end

# Runs a named script through EVALSHA, forwarding any extra arguments
# unchanged. The SHA comes from load_script.
def eval_script(script, *args)
  sha = load_script(script)
  redis.evalsha(sha, *args)
end
Expand Down
2 changes: 1 addition & 1 deletion ruby/lib/ci/queue/redis/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def wait_for_workers

puts "Aborting, it seems all workers died." if time_left_with_no_workers <= 0
exhausted?
rescue CI::Queue::Redis::LostMaster
rescue CI::Queue::Redis::LostMaster, CI::Queue::Redis::MasterDied
false
end

Expand Down
74 changes: 39 additions & 35 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ def populate(tests, random: Random.new)

store_chunk_metadata(chunks) if chunks.any?

# Refresh lock TTL before push to prevent expiry during population
unless refresh_master_lock
@master = false
warn "Lock expired during population — another master may have taken over"
next
end

all_ids = chunks.map(&:id) + individual_tests.map(&:id)
push(all_ids)
end
Expand Down Expand Up @@ -106,12 +113,6 @@ def poll
idle_state_printed = false
attempt = 0
until shutdown_required? || config.circuit_breakers.any?(&:open?) || exhausted? || max_test_failed?
# Check for generation staleness - another master may have taken over
if generation_stale?
warn "Generation changed - queue was repopulated by new master. Exiting poll loop."
break
end

if id = reserve
attempt = 0
idle_since = nil
Expand All @@ -126,6 +127,12 @@ def poll
acknowledge(id)
end
else
# Check for generation staleness when idle - another master may have taken over
if generation_stale?
warn "Generation changed - queue was repopulated by new master. Exiting poll loop."
break
end

idle_since ||= CI::Queue.time_now
if CI::Queue.time_now - idle_since > 120 && !idle_state_printed
puts "Worker #{worker_id} has been idle for 120 seconds. Printing global state..."
Expand Down Expand Up @@ -289,25 +296,6 @@ def heartbeat(test_or_id = nil)

attr_reader :index

# Builds a key scoped to the active generation: key('gen', <generation>, *args).
# Raises when neither @generation nor @current_generation is set.
def generation_key(*args)
  active_generation = @generation || @current_generation
  unless active_generation
    raise "Generation not set - call learn_generation first"
  end

  key('gen', active_generation, *args)
end

# Reads the 'current-generation' key, caches it in @current_generation and
# returns it. Raises MasterDied when the key holds no value.
def learn_generation
  fetched = redis.get(key('current-generation'))
  @current_generation = fetched
  return fetched if fetched

  raise MasterDied, "No generation available - master may have died"
end

# True when the generation stored under 'current-generation' differs from the
# one cached in @current_generation. Returns false when nothing is cached, and
# a falsy value when the stored key is absent.
def generation_stale?
  cached = @current_generation
  return false unless cached

  stored = redis.get(key('current-generation'))
  stored && stored != cached
end

# Runs a block while sending periodic heartbeats in a background thread.
# This prevents other workers from stealing the test while it's being executed.
def with_heartbeat(test_id)
Expand Down Expand Up @@ -485,16 +473,22 @@ def push(tests)
@total = tests.size

if @master
redis.multi do |transaction|
transaction.lpush(generation_key('queue'), tests) unless tests.empty?
transaction.set(generation_key('total'), @total)
transaction.set(key('master-status'), 'ready')
transaction.set(key('current-generation'), @generation)

transaction.expire(generation_key('queue'), config.redis_ttl)
transaction.expire(generation_key('total'), config.redis_ttl)
transaction.expire(key('master-status'), config.redis_ttl)
transaction.expire(key('current-generation'), config.redis_ttl)
argv = [
"setup:#{@generation}",
@total.to_s,
@generation,
config.redis_ttl.to_s,
] + tests

result = eval_script(
:push,
keys: [key('master-status'), generation_key('queue'), generation_key('total'), key('current-generation')],
argv: argv,
)

if result == 0
@master = false
warn "Lock lost during push — another master took over (generation #{@generation})"
end
end
rescue *CONNECTION_ERRORS
Expand Down Expand Up @@ -543,6 +537,16 @@ def acquire_master_role?
false
end

# Asks the :refresh_master_lock script to extend the master lock, passing
# "setup:<generation>" as the expected lock value and config.master_lock_ttl
# as the new TTL. Returns true when the script replies 1, false otherwise
# (including when one of CONNECTION_ERRORS is raised).
def refresh_master_lock
  lock_key = key('master-status')
  lock_token = "setup:#{@generation}"
  ttl_seconds = config.master_lock_ttl.to_s

  reply = eval_script(:refresh_master_lock, keys: [lock_key], argv: [lock_token, ttl_seconds])
  reply == 1
rescue *CONNECTION_ERRORS
  false
end

def register_worker_presence
register
redis.expire(key('workers'), config.redis_ttl)
Expand Down
152 changes: 152 additions & 0 deletions ruby/test/ci/queue/redis_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,158 @@ def test_moving_average_updates_persist_across_workers
assert_equal 5500.0, chunks.first.estimated_duration
end

# --- Election recovery tests ---

# When the master-status key disappears, the next worker to join the build
# should win the election and be able to drain the queue.
def test_master_death_triggers_re_election
  build_id = 'election-death'
  election_opts = { build_id: build_id, master_lock_ttl: 1, max_election_attempts: 3 }

  original_master = worker(1, **election_opts)
  assert_predicate original_master, :master?

  # Simulate the master dying: removing the key is what lock expiry looks like.
  @redis.del("build:#{build_id}:master-status")

  # The second worker sees no master and takes over the role.
  successor = worker(2, **election_opts)
  assert_predicate successor, :master?

  # The new master can poll the queue to completion.
  poll(successor)
  assert_predicate successor, :exhausted?
end

# A push from a worker whose lock value was replaced must leave the other
# master's status untouched.
def test_fenced_push_rejects_stale_master
  build_id = 'fenced-push'
  status_key = "build:#{build_id}:master-status"

  stale_master = worker(1, build_id: build_id, master_lock_ttl: 30)
  assert_predicate stale_master, :master?

  # Pretend another master won the election by replacing the lock value.
  rival_generation = 'other-generation-uuid'
  rival_lock = "setup:#{rival_generation}"
  @redis.set(status_key, rival_lock, ex: 30)

  # The stale worker still pushes; the script should refuse to write.
  stale_master.send(:push, ['ATest#test_foo'])

  observed_status = @redis.get(status_key)
  assert_equal rival_lock, observed_status, "Fenced push should not overwrite another master's status"
end

# An idle worker whose cached generation no longer matches Redis should
# leave its poll loop without yielding any test.
def test_generation_stale_exits_poll
  build_id = 'gen-stale'

  leader = worker(1, build_id: build_id)
  assert_predicate leader, :master?

  follower = worker(2, build_id: build_id)
  refute_predicate follower, :master?

  # Let the leader drain the queue so the follower finds nothing to reserve.
  poll(leader)

  # Swap the generation underneath the follower before it starts polling.
  @redis.set("build:#{build_id}:current-generation", "new-generation-uuid")

  yielded = []
  follower.poll do |test|
    yielded.push(test)
    follower.acknowledge(test)
  end

  assert_equal 0, yielded.size, "Worker should exit poll immediately when generation is stale"
end

# With the current-generation key removed, a worker that did not populate
# the queue must raise MasterDied when it tries to learn the generation.
def test_learn_generation_raises_when_key_missing
  build_id = 'learn-gen-missing'

  leader = worker(1, build_id: build_id)
  assert_predicate leader, :master?

  # Drop the key the follower would read its generation from.
  @redis.del("build:#{build_id}:current-generation")

  assert_raises(CI::Queue::Redis::MasterDied) do
    worker(2, build_id: build_id, populate: false).send(:learn_generation)
  end
end

# A master that keeps appearing in "setup" and then vanishing should make the
# worker give up with LostMaster once its election attempts are used up.
def test_max_election_attempts_raises_lost_master
  build_id = 'max-attempts'
  lock_key = "build:#{build_id}:master-status"

  # Background thread that keeps re-creating a 50ms setup lock, so the worker
  # under test never wins the election and keeps seeing the lock expire.
  flapping_master = Thread.new do
    loop do
      @redis.set(lock_key, "setup:dead-gen", px: 50)
      sleep 0.06
    end
  end

  assert_raises(CI::Queue::Redis::LostMaster) do
    worker(1, build_id: build_id, max_election_attempts: 1, queue_init_timeout: 0.5)
  end
ensure
  flapping_master&.kill
end

# After a failing run with requeues enabled, the build record should report
# at least one requeued test.
def test_build_record_reads_generation_scoped_requeue_key
  build_id = 'requeue-gen'
  runner = worker(1, build_id: build_id, max_requeues: 1, requeue_tolerance: 1.0)

  # Fail every test; acknowledge only those that can no longer be requeued.
  runner.poll do |test|
    runner.report_failure!
    runner.acknowledge(test) unless runner.requeue(test)
  end

  requeued = runner.build.requeued_tests
  refute_empty requeued, "Should have requeued at least one test"
end

# A supervisor watching a build where no master ever shows up should get a
# falsy result from wait_for_workers instead of an exception.
def test_supervisor_handles_master_died
  build_id = 'supervisor-died'

  config = CI::Queue::Configuration.new(
    build_id: build_id,
    worker_id: 'sup',
    timeout: 0.2,
    queue_init_timeout: 0.3,
    timing_redis_url: @redis_url,
  )
  supervisor = CI::Queue::Redis::Supervisor.new(@redis_url, config)

  outcome = supervisor.wait_for_workers
  refute outcome, "Supervisor should return false when master never appeared"
end

# wait_for_master should raise MasterDied when the setup lock has already
# expired by the time the worker starts waiting.
def test_wait_for_master_detects_immediate_nil_status
  build_id = 'nil-status'

  config = CI::Queue::Configuration.new(
    build_id: build_id,
    worker_id: '2',
    timeout: 0.2,
    queue_init_timeout: 0.5,
    timing_redis_url: @redis_url,
  )
  queue = CI::Queue::Redis.new(@redis_url, config)

  # Create a setup lock with a 1ms lifetime, then wait long enough for it to expire.
  @redis.set("build:#{build_id}:master-status", "setup:some-gen", px: 1)
  sleep 0.01

  assert_raises(CI::Queue::Redis::MasterDied) do
    queue.send(:wait_for_master, timeout: 0.5)
  end
end

private

class MockTest
Expand Down
Loading