Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 50 additions & 23 deletions src/ParallelTestRunner.jl
Original file line number Diff line number Diff line change
Expand Up @@ -826,12 +826,17 @@ function runtests(mod::Module, args::ParsedArgs;
jobs = clamp(jobs, 1, length(tests))
println(stdout, "Running $(length(tests)) tests using $jobs parallel jobs. If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.")
!isnothing(args.verbose) && println(stdout, "Available memory: $(Base.format_bytes(available_memory()))")
workers = fill(nothing, jobs)
sem = Base.Semaphore(max(1, jobs))
worker_pool = Channel{Union{Nothing, PTRWorker}}(jobs)
for _ in 1:jobs
put!(worker_pool, nothing)
end

t0 = time()
results = []
running_tests = Dict{String, Float64}() # test => start_time
test_lock = ReentrantLock() # to protect crucial access to tests and running_tests
results_lock = ReentrantLock() # to protect concurrent access to results

worker_tasks = Task[]

Expand Down Expand Up @@ -887,7 +892,7 @@ function runtests(mod::Module, args::ParsedArgs;
# only draw if we have something to show
isempty(running_tests) && return
completed = length(results)
total = completed + length(tests) + length(running_tests)
total = length(tests)

# line 1: empty line
line1 = ""
Expand Down Expand Up @@ -922,6 +927,9 @@ function runtests(mod::Module, args::ParsedArgs;
end
## yet-to-run
for test in tests
haskey(running_tests, test) && continue
# Test is in any completed test
any(r -> test == r.test, results) && continue
est_remaining += get(historical_durations, test, est_per_test)
end

Expand Down Expand Up @@ -1004,31 +1012,34 @@ function runtests(mod::Module, args::ParsedArgs;
end
isa(ex, InterruptException) || rethrow()
finally
if isempty(tests) && isempty(running_tests)
if isempty(running_tests) && length(results) >= length(tests)
# XXX: only erase the status if we completed successfully.
# in other cases we'll have printed "caught interrupt"
clear_status()
end
end
end


#
# execution
#

for p in workers
push!(worker_tasks, @async begin
while !done
# get a test to run
test, test_t0 = Base.@lock test_lock begin
isempty(tests) && break
test = popfirst!(tests)
tests_to_start = Threads.Atomic{Int}(length(tests))
@sync for test in tests
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By making this @sync, isn't the loop immediately at the start of finalization pretty much dead code?
It's also a bit of a regression; we used to handle unknown errors more gracefully (catching worker errors vs having @sync throw them AFAIU).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand correctly that wrapping the @sync for loop with the try/catch block at

try
while true
if any(istaskfailed, worker_tasks)
println(io_ctx.stderr, "\nCaught an error, stopping...")
break
elseif done || Base.@lock(test_lock, isempty(running_tests) && length(results) >= length(tests))
break
end
sleep(1)
end
catch err
# in case the sleep got interrupted
isa(err, InterruptException) || rethrow()
finally
stop_work()
end
(without the while loop in there) would address your concern? Testing locally, I get the same behaviour as on main when Ctrl+C'ing the tests.

I'll try to craft a regression test; the things I tried so far failed to reproduce the catching of the error at

println(io_ctx.stderr, "\nCaught an error, stopping...")
even on main (for example, I tried to run the tests in a subprocess and to send it a Base.SIGINT, but that doesn't seem to hit that error handling), so I need to come up with a different idea.

push!(worker_tasks, Threads.@spawn begin
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't switching from @async to potential multithreaded execution require locks for thread-safety, e.g., for the concurrent access to results?

Copy link
Collaborator Author

@giordano giordano Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a lock for results. Searching for ! within this for loop, push!(results,...) seemed to me the only potentially risky. put!-ting and take!-ing Channels should be safe, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And those locks are probably already needed; the bugs just show up more infrequently. E.g. @async and @spawn need the same locks.

local p = nothing
acquired = false
try
Base.acquire(sem)
acquired = true
p = take!(worker_pool)
Threads.atomic_sub!(tests_to_start, 1)

done && return

test_t0 = Base.@lock test_lock begin
test_t0 = time()
running_tests[test] = test_t0

test, test_t0
end

# pass in init_worker_code to custom worker function if defined
Expand All @@ -1055,21 +1066,21 @@ function runtests(mod::Module, args::ParsedArgs;
if isa(ex, InterruptException)
# the worker got interrupted, signal other tasks to stop
stop_work()
break
return
end

ex
end
test_t1 = time()
output = String(take!(wrkr.io))
push!(results, (test, result, output, test_t0, test_t1))
Base.@lock results_lock push!(results, (; test, result, output, test_t0, test_t1))

# act on the results
if result isa AbstractTestRecord
put!(printer_channel, (:finished, test, worker_id(wrkr), result))
if anynonpass(result[]) && args.quickfail !== nothing
stop_work()
break
return
end

if memory_usage(result) > max_worker_rss
Expand All @@ -1083,7 +1094,7 @@ function runtests(mod::Module, args::ParsedArgs;
put!(printer_channel, (:crashed, test, worker_id(wrkr)))
if args.quickfail !== nothing
stop_work()
break
return
end

# the worker encountered some serious failure, recycle it
Expand All @@ -1098,14 +1109,22 @@ function runtests(mod::Module, args::ParsedArgs;
Base.@lock test_lock begin
delete!(running_tests, test)
end
end
if p !== nothing
Malt.stop(p)
catch ex
isa(ex, InterruptException) || rethrow()
finally
if acquired
# stop the worker if no more tests will need one from the pool
if tests_to_start[] == 0 && p !== nothing && Malt.isrunning(p)
Malt.stop(p)
p = nothing
end
put!(worker_pool, p)
Base.release(sem)
end
end
end)
end


#
# finalization
#
Expand All @@ -1116,7 +1135,7 @@ function runtests(mod::Module, args::ParsedArgs;
if any(istaskfailed, worker_tasks)
println(io_ctx.stderr, "\nCaught an error, stopping...")
break
elseif done || Base.@lock(test_lock, isempty(tests) && isempty(running_tests))
elseif done || Base.@lock(test_lock, isempty(running_tests) && length(results) >= length(tests))
break
end
sleep(1)
Expand Down Expand Up @@ -1146,6 +1165,14 @@ function runtests(mod::Module, args::ParsedArgs;
end
end

# clean up remaining workers in the pool
close(worker_pool)
for p in worker_pool
if p !== nothing && Malt.isrunning(p)
Malt.stop(p)
end
end

# print the output generated by each testset
for (testname, result, output, _start, _stop) in results
if !isempty(output)
Expand Down Expand Up @@ -1227,7 +1254,7 @@ function runtests(mod::Module, args::ParsedArgs;
end

# mark remaining or running tests as interrupted
for test in [tests; collect(keys(running_tests))]
for test in tests
(test in completed_tests) && continue
testset = create_testset(test)
Test.record(testset, Test.Error(:test_interrupted, test, nothing, Base.ExceptionStack(NamedTuple[(;exception = "skipped", backtrace = [])]), LineNumberNode(1)))
Expand Down
Loading