Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 26 additions & 21 deletions src/ParallelTestRunner.jl
Original file line number Diff line number Diff line change
Expand Up @@ -932,15 +932,15 @@ function runtests(mod::Module, args::ParsedArgs;
filter_tests!(testsuite, args)

# determine test order
tests = Lockable(collect(keys(testsuite)))
@lock tests Random.shuffle!(tests[])
tests = collect(keys(testsuite))
Random.shuffle!(tests)
historical_durations = load_test_history(mod)
@lock tests sort!(tests[], by = x -> -get(historical_durations, x, Inf))
sort!(tests, by = x -> -get(historical_durations, x, Inf))

# determine parallelism
jobs = something(args.jobs, default_njobs())
jobs = @lock tests clamp(jobs, 1, length(tests[]))
@lock tests println(stdout, "Running $(length(tests[])) tests using $jobs parallel jobs. If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.")
jobs = clamp(jobs, 1, length(tests))
println(stdout, "Running $(length(tests)) tests using $jobs parallel jobs. If this is too many concurrent jobs, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.")
!isnothing(args.verbose) && println(stdout, "Available memory: $(Base.format_bytes(available_memory()))")
sem = Base.Semaphore(max(1, jobs))
worker_pool = Channel{Union{Nothing, PTRWorker}}(jobs)
Expand Down Expand Up @@ -974,10 +974,10 @@ function runtests(mod::Module, args::ParsedArgs;
# pretty print information about gc and mem usage
testgroupheader = "Test"
workerheader = "(Worker)"
name_align = @lock tests maximum(
name_align = maximum(
[
textwidth(testgroupheader) + textwidth(" ") + textwidth(workerheader);
map(x -> textwidth(x) + 5, tests[])
map(x -> textwidth(x) + 5, tests)
]
)

Expand All @@ -1003,16 +1003,19 @@ function runtests(mod::Module, args::ParsedArgs;
end

function update_status()
# only draw if we have something to show
@lock(running_tests, isempty(running_tests[])) && return
completed = @lock results length(results[])
total = @lock tests length(tests[])
# take consistent snapshots once, so the rest of this function operates on
# frozen data rather than racing with workers that mutate these collections
running_snapshot = @lock running_tests copy(running_tests[])
isempty(running_snapshot) && return
results_snapshot = @lock results copy(results[])
completed = length(results_snapshot)
total = length(tests)

# line 1: empty line
line1 = ""

# line 2: running tests
test_list = @lock running_tests sort(collect(keys(running_tests[])), by = x -> running_tests[][x])
test_list = sort(collect(keys(running_snapshot)), by = x -> running_snapshot[x])
status_parts = map(test_list) do test
"$test"
end
Expand All @@ -1027,23 +1030,23 @@ function runtests(mod::Module, args::ParsedArgs;
line3 = "Progress: $completed/$total tests completed"
if completed > 0
# estimate per-test time (slightly pessimistic)
durations_done = @lock results [end_time - start_time for (_, _,_, start_time, end_time) in results[]]
durations_done = [end_time - start_time for (_, _,_, start_time, end_time) in results_snapshot]
μ = mean(durations_done)
σ = length(durations_done) > 1 ? std(durations_done) : 0.0
est_per_test = μ + 0.5σ

est_remaining = 0.0
## currently-running
for (test, start_time) in @lock(running_tests, running_tests[])
for (test, start_time) in running_snapshot
elapsed = time() - start_time
duration = get(historical_durations, test, est_per_test)
est_remaining += max(0.0, duration - elapsed)
end
## yet-to-run
for test in @lock(tests, tests[])
@lock(running_tests, haskey(running_tests[], test)) && continue
for test in tests
haskey(running_snapshot, test) && continue
# Test is in any completed test
@lock(results, any(r -> test == r.test, results[])) && continue
any(r -> test == r.test, results_snapshot) && continue
est_remaining += get(historical_durations, test, est_per_test)
end

Expand Down Expand Up @@ -1126,7 +1129,9 @@ function runtests(mod::Module, args::ParsedArgs;
end
isa(ex, InterruptException) || rethrow()
finally
if @lock running_tests @lock results @lock tests isempty(running_tests[]) && length(results[]) >= length(tests[])
n_running = @lock running_tests length(running_tests[])
n_results = @lock results length(results[])
if n_running == 0 && n_results >= length(tests)
# XXX: only erase the status if we completed successfully.
# in other cases we'll have printed "caught interrupt"
clear_status()
Expand All @@ -1138,9 +1143,9 @@ function runtests(mod::Module, args::ParsedArgs;
# execution
#

tests_to_start = @lock tests Threads.Atomic{Int}(length(tests[]))
tests_to_start = Threads.Atomic{Int}(length(tests))
try
@sync for test in @lock(tests, tests[])
@sync for test in tests
push!(worker_tasks, Threads.@spawn begin
local p = nothing
acquired = false
Expand Down Expand Up @@ -1360,7 +1365,7 @@ function runtests(mod::Module, args::ParsedArgs;
end

# mark remaining or running tests as interrupted
for test in @lock(tests, tests[])
for test in tests
(test in completed_tests) && continue
testset = create_testset(test)
Test.record(testset, Test.Error(:test_interrupted, test, nothing, Base.ExceptionStack(NamedTuple[(;exception = "skipped", backtrace = [])]), LineNumberNode(1)))
Expand Down