-
Notifications
You must be signed in to change notification settings - Fork 7
Use Base.Semaphore to control test execution parallelism #119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
cbe6f9b
62bb315
7ee3779
a0fc2a3
7c39cf5
b28a3be
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -826,12 +826,17 @@ function runtests(mod::Module, args::ParsedArgs; | |
| jobs = clamp(jobs, 1, length(tests)) | ||
| println(stdout, "Running $(length(tests)) tests using $jobs parallel jobs. If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.") | ||
| !isnothing(args.verbose) && println(stdout, "Available memory: $(Base.format_bytes(available_memory()))") | ||
| workers = fill(nothing, jobs) | ||
| sem = Base.Semaphore(max(1, jobs)) | ||
| worker_pool = Channel{Union{Nothing, PTRWorker}}(jobs) | ||
| for _ in 1:jobs | ||
| put!(worker_pool, nothing) | ||
| end | ||
|
|
||
| t0 = time() | ||
| results = [] | ||
| running_tests = Dict{String, Float64}() # test => start_time | ||
| test_lock = ReentrantLock() # to protect crucial access to tests and running_tests | ||
| results_lock = ReentrantLock() # to protect concurrent access to results | ||
|
|
||
| worker_tasks = Task[] | ||
|
|
||
|
|
@@ -887,7 +892,7 @@ function runtests(mod::Module, args::ParsedArgs; | |
| # only draw if we have something to show | ||
| isempty(running_tests) && return | ||
| completed = length(results) | ||
| total = completed + length(tests) + length(running_tests) | ||
| total = length(tests) | ||
|
|
||
| # line 1: empty line | ||
| line1 = "" | ||
|
|
@@ -922,6 +927,9 @@ function runtests(mod::Module, args::ParsedArgs; | |
| end | ||
| ## yet-to-run | ||
| for test in tests | ||
| haskey(running_tests, test) && continue | ||
| # Test is in any completed test | ||
| any(r -> test == r.test, results) && continue | ||
| est_remaining += get(historical_durations, test, est_per_test) | ||
| end | ||
|
|
||
|
|
@@ -1004,31 +1012,34 @@ function runtests(mod::Module, args::ParsedArgs; | |
| end | ||
| isa(ex, InterruptException) || rethrow() | ||
| finally | ||
| if isempty(tests) && isempty(running_tests) | ||
| if isempty(running_tests) && length(results) >= length(tests) | ||
| # XXX: only erase the status if we completed successfully. | ||
| # in other cases we'll have printed "caught interrupt" | ||
| clear_status() | ||
| end | ||
| end | ||
| end | ||
|
|
||
|
|
||
| # | ||
| # execution | ||
| # | ||
|
|
||
| for p in workers | ||
| push!(worker_tasks, @async begin | ||
| while !done | ||
| # get a test to run | ||
| test, test_t0 = Base.@lock test_lock begin | ||
| isempty(tests) && break | ||
| test = popfirst!(tests) | ||
| tests_to_start = Threads.Atomic{Int}(length(tests)) | ||
| @sync for test in tests | ||
| push!(worker_tasks, Threads.@spawn begin | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Doesn't switching from
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I added a lock for
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. And those locks are probably already needed; the bugs just show up less frequently. E.g. |
||
| local p = nothing | ||
| acquired = false | ||
| try | ||
| Base.acquire(sem) | ||
| acquired = true | ||
| p = take!(worker_pool) | ||
| Threads.atomic_sub!(tests_to_start, 1) | ||
|
|
||
| done && return | ||
|
|
||
| test_t0 = Base.@lock test_lock begin | ||
| test_t0 = time() | ||
| running_tests[test] = test_t0 | ||
|
|
||
| test, test_t0 | ||
| end | ||
|
|
||
| # pass in init_worker_code to custom worker function if defined | ||
|
|
@@ -1055,21 +1066,21 @@ function runtests(mod::Module, args::ParsedArgs; | |
| if isa(ex, InterruptException) | ||
| # the worker got interrupted, signal other tasks to stop | ||
| stop_work() | ||
| break | ||
| return | ||
| end | ||
|
|
||
| ex | ||
| end | ||
| test_t1 = time() | ||
| output = String(take!(wrkr.io)) | ||
| push!(results, (test, result, output, test_t0, test_t1)) | ||
| Base.@lock results_lock push!(results, (; test, result, output, test_t0, test_t1)) | ||
|
|
||
| # act on the results | ||
| if result isa AbstractTestRecord | ||
| put!(printer_channel, (:finished, test, worker_id(wrkr), result)) | ||
| if anynonpass(result[]) && args.quickfail !== nothing | ||
| stop_work() | ||
| break | ||
| return | ||
| end | ||
|
|
||
| if memory_usage(result) > max_worker_rss | ||
|
|
@@ -1083,7 +1094,7 @@ function runtests(mod::Module, args::ParsedArgs; | |
| put!(printer_channel, (:crashed, test, worker_id(wrkr))) | ||
| if args.quickfail !== nothing | ||
| stop_work() | ||
| break | ||
| return | ||
| end | ||
|
|
||
| # the worker encountered some serious failure, recycle it | ||
|
|
@@ -1098,14 +1109,22 @@ function runtests(mod::Module, args::ParsedArgs; | |
| Base.@lock test_lock begin | ||
| delete!(running_tests, test) | ||
| end | ||
| end | ||
| if p !== nothing | ||
| Malt.stop(p) | ||
| catch ex | ||
| isa(ex, InterruptException) || rethrow() | ||
| finally | ||
| if acquired | ||
| # stop the worker if no more tests will need one from the pool | ||
| if tests_to_start[] == 0 && p !== nothing && Malt.isrunning(p) | ||
| Malt.stop(p) | ||
| p = nothing | ||
| end | ||
| put!(worker_pool, p) | ||
| Base.release(sem) | ||
| end | ||
| end | ||
| end) | ||
| end | ||
|
|
||
|
|
||
| # | ||
| # finalization | ||
| # | ||
|
|
@@ -1116,7 +1135,7 @@ function runtests(mod::Module, args::ParsedArgs; | |
| if any(istaskfailed, worker_tasks) | ||
| println(io_ctx.stderr, "\nCaught an error, stopping...") | ||
| break | ||
| elseif done || Base.@lock(test_lock, isempty(tests) && isempty(running_tests)) | ||
| elseif done || Base.@lock(test_lock, isempty(running_tests) && length(results) >= length(tests)) | ||
| break | ||
| end | ||
| sleep(1) | ||
|
|
@@ -1146,6 +1165,14 @@ function runtests(mod::Module, args::ParsedArgs; | |
| end | ||
| end | ||
|
|
||
| # clean up remaining workers in the pool | ||
| close(worker_pool) | ||
| for p in worker_pool | ||
| if p !== nothing && Malt.isrunning(p) | ||
| Malt.stop(p) | ||
| end | ||
| end | ||
|
|
||
| # print the output generated by each testset | ||
| for (testname, result, output, _start, _stop) in results | ||
| if !isempty(output) | ||
|
|
@@ -1227,7 +1254,7 @@ function runtests(mod::Module, args::ParsedArgs; | |
| end | ||
|
|
||
| # mark remaining or running tests as interrupted | ||
| for test in [tests; collect(keys(running_tests))] | ||
| for test in tests | ||
| (test in completed_tests) && continue | ||
| testset = create_testset(test) | ||
| Test.record(testset, Test.Error(:test_interrupted, test, nothing, Base.ExceptionStack(NamedTuple[(;exception = "skipped", backtrace = [])]), LineNumberNode(1))) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By making this
@sync, isn't the loop immediately at the start of finalization pretty much dead code? It's also a bit of a regression; we used to handle unknown errors more gracefully (catching worker errors vs having
@sync throw them AFAIU). There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do I understand correctly that wrapping the
@sync for loop with the try/catch block at ParallelTestRunner.jl/src/ParallelTestRunner.jl
Lines 1133 to 1148 in b28a3be
while loop in there) would address your concern? Testing locally, I get the same behaviour as on main when Ctrl+C'ing the tests. I'll try to craft a regression test; the things I tried so far failed to reproduce the catching of the error at
ParallelTestRunner.jl/src/ParallelTestRunner.jl
Line 1136 in b28a3be
main (for example, I tried to run the tests in a subprocess and send it a Base.SIGINT, but that doesn't seem to hit that error handling), so I need to come up with a different idea.