Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "ParallelTestRunner"
uuid = "d3525ed8-44d0-4b2c-a655-542cee43accc"
authors = ["Valentin Churavy <v.churavy@gmail.com>"]
version = "2.0.2"
version = "2.1.0"

[deps]
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Expand All @@ -17,7 +17,7 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
[compat]
Dates = "1"
IOCapture = "0.2.5, 1"
Malt = "1.3.0"
Malt = "1.4.0"
Printf = "1"
Random = "1"
Scratch = "1.3.0"
Expand Down
116 changes: 70 additions & 46 deletions src/ParallelTestRunner.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,27 @@ function anynonpass(ts::Test.AbstractTestSet)
end
end

const ID_COUNTER = Threads.Atomic{Int}(0)

# Thin wrapper around Malt.Worker, to handle the stdio loop differently.
struct PTRWorker <: Malt.AbstractWorker
w::Malt.Worker
io::IOBuffer
id::Int
end

function PTRWorker(; exename=Base.julia_cmd()[1], exeflags=String[], env=String[])
io = IOBuffer()
wrkr = Malt.Worker(; exename, exeflags, env, monitor_stdout=false, monitor_stderr=false)
stdio_loop(wrkr, io)
id = ID_COUNTER[] += 1
return PTRWorker(wrkr, io, id)
end

worker_id(wrkr::PTRWorker) = wrkr.id
Malt.isrunning(wrkr::PTRWorker) = Malt.isrunning(wrkr.w)
Malt.stop(wrkr::PTRWorker) = Malt.stop(wrkr.w)

#Always set the max rss so that if tests add large global variables (which they do) we don't make the GC's life too hard
if Sys.WORD_SIZE == 64
const JULIA_TEST_MAXRSS_MB = 3800
Expand Down Expand Up @@ -57,7 +78,6 @@ abstract type AbstractTestRecord end

struct TestRecord <: AbstractTestRecord
value::DefaultTestSet
output::String # captured stdout/stderr

# stats
time::Float64
Expand Down Expand Up @@ -201,6 +221,25 @@ function print_test_crashed(wrkr, test, ctx::TestIOContext)
end
end

# Adapted from `Malt._stdio_loop`
function stdio_loop(worker::Malt.Worker, io)
Threads.@spawn while !eof(worker.stdout) && Malt.isrunning(worker)
try
bytes = readavailable(worker.stdout)
write(io, bytes)
catch
break
end
end
Threads.@spawn while !eof(worker.stderr) && Malt.isrunning(worker)
try
bytes = readavailable(worker.stderr)
write(io, bytes)
catch
break
end
end
end

#
# entry point
Expand Down Expand Up @@ -236,7 +275,7 @@ function Test.finish(ts::WorkerTestSet)
return ts.wrapped_ts
end

function runtest(f, name, init_code, color)
function runtest(f, name, init_code)
function inner()
# generate a temporary module to execute the tests in
mod = @eval(Main, module $(gensym(name)) end)
Expand All @@ -252,28 +291,15 @@ function runtest(f, name, init_code, color)
GC.gc(true)
Random.seed!(1)

pipe = Pipe()
pipe_initialized = Channel{Nothing}(1)
reader = @async begin
take!(pipe_initialized)
read(pipe, String)
end
io = IOContext(pipe, :color=>$(color))
stats = redirect_stdio(; stdout=io, stderr=io) do
put!(pipe_initialized, nothing)

# @testset CustomTestRecord switches the all lower-level testset to our custom testset,
# so we need to have two layers here such that the user-defined testsets are using `DefaultTestSet`.
# This also guarantees our invariant about `WorkerTestSet` containing a single `DefaultTestSet`.
@timed @testset WorkerTestSet "placeholder" begin
@testset DefaultTestSet $name begin
$f
end
# @testset CustomTestRecord switches the all lower-level testset to our custom testset,
# so we need to have two layers here such that the user-defined testsets are using `DefaultTestSet`.
# This also guarantees our invariant about `WorkerTestSet` containing a single `DefaultTestSet`.
stats = @timed @testset WorkerTestSet "placeholder" begin
@testset DefaultTestSet $name begin
$f
end
end
close(pipe.in)
output = fetch(reader)
(; testset=stats.value, output, stats.time, stats.bytes, stats.gctime)
(; testset=stats.value, stats.time, stats.bytes, stats.gctime)
end

# process results
Expand Down Expand Up @@ -392,7 +418,7 @@ function save_test_history(mod::Module, history::Dict{String, Float64})
end
end

function test_exe()
function test_exe(color::Bool=false)
test_exeflags = Base.julia_cmd()
filter!(test_exeflags.exec) do c
!(startswith(c, "--depwarn") || startswith(c, "--check-bounds"))
Expand All @@ -401,16 +427,12 @@ function test_exe()
push!(test_exeflags.exec, "--startup-file=no")
push!(test_exeflags.exec, "--depwarn=yes")
push!(test_exeflags.exec, "--project=$(Base.active_project())")
push!(test_exeflags.exec, "--color=$(color ? "yes" : "no")")
return test_exeflags
end

# Map PIDs to logical worker IDs
# Malt doesn't have a global worker ID, and PID make printing ugly
const WORKER_IDS = Dict{Int32, Int32}()
worker_id(wrkr) = WORKER_IDS[wrkr.proc_pid]

"""
addworkers(; env=Vector{Pair{String, String}}(), exename=nothing, exeflags=nothing)
addworkers(; env=Vector{Pair{String, String}}(), exename=nothing, exeflags=nothing, color::Bool=false)

Add `X` worker processes.
To add a single worker, use [`addworker`](@ref).
Expand All @@ -419,11 +441,12 @@ To add a single worker, use [`addworker`](@ref).
- `env`: Vector of environment variable pairs to set for the worker process.
- `exename`: Custom executable to use for the worker process.
- `exeflags`: Custom flags to pass to the worker process.
- `color`: Boolean flag to decide whether to start `julia` with `--color=yes` (if `true`) or `--color=no` (if `false`).
"""
addworkers(X; kwargs...) = [addworker(; kwargs...) for _ in 1:X]

"""
addworker(; env=Vector{Pair{String, String}}(), exename=nothing, exeflags=nothing)
addworker(; env=Vector{Pair{String, String}}(), exename=nothing, exeflags=nothing; color::Bool=false)

Add a single worker process.
To add multiple workers, use [`addworkers`](@ref).
Expand All @@ -432,12 +455,15 @@ To add multiple workers, use [`addworkers`](@ref).
- `env`: Vector of environment variable pairs to set for the worker process.
- `exename`: Custom executable to use for the worker process.
- `exeflags`: Custom flags to pass to the worker process.
- `color`: Boolean flag to decide whether to start `julia` with `--color=yes` (if `true`) or `--color=no` (if `false`).
"""
function addworker(;
env = Vector{Pair{String, String}}(),
exename = nothing, exeflags = nothing
exename = nothing,
exeflags = nothing,
color::Bool = false,
)
exe = test_exe()
exe = test_exe(color)
if exename === nothing
exename = exe[1]
end
Expand All @@ -450,10 +476,7 @@ function addworker(;
push!(env, "JULIA_NUM_THREADS" => "1")
# Malt already sets OPENBLAS_NUM_THREADS to 1
push!(env, "OPENBLAS_NUM_THREADS" => "1")

wrkr = Malt.Worker(; exename, exeflags, env)
WORKER_IDS[wrkr.proc_pid] = length(WORKER_IDS) + 1
return wrkr
return PTRWorker(; exename, exeflags, env)
end

"""
Expand Down Expand Up @@ -840,7 +863,7 @@ function runtests(mod::Module, args::ParsedArgs;
line3 = "Progress: $completed/$total tests completed"
if completed > 0
# estimate per-test time (slightly pessimistic)
durations_done = [end_time - start_time for (_, _, start_time, end_time) in results]
durations_done = [end_time - start_time for (_, _,_, start_time, end_time) in results]
μ = mean(durations_done)
σ = length(durations_done) > 1 ? std(durations_done) : 0.0
est_per_test = μ + 0.5σ
Expand Down Expand Up @@ -970,15 +993,15 @@ function runtests(mod::Module, args::ParsedArgs;
wrkr = p
end
if wrkr === nothing || !Malt.isrunning(wrkr)
wrkr = p = addworker()
wrkr = p = addworker(; io_ctx.color)
end

# run the test
put!(printer_channel, (:started, test, worker_id(wrkr)))
result = try
Malt.remote_eval_wait(Main, wrkr, :(import ParallelTestRunner))
Malt.remote_call_fetch(invokelatest, wrkr, runtest,
testsuite[test], test, init_code, io_ctx.color)
Malt.remote_eval_wait(Main, wrkr.w, :(import ParallelTestRunner))
Malt.remote_call_fetch(invokelatest, wrkr.w, runtest,
testsuite[test], test, init_code)
catch ex
if isa(ex, InterruptException)
# the worker got interrupted, signal other tasks to stop
Expand All @@ -989,7 +1012,8 @@ function runtests(mod::Module, args::ParsedArgs;
ex
end
test_t1 = time()
push!(results, (test, result, test_t0, test_t1))
output = String(take!(wrkr.io))
push!(results, (test, result, output, test_t0, test_t1))

# act on the results
if result isa AbstractTestRecord
Expand Down Expand Up @@ -1070,10 +1094,10 @@ function runtests(mod::Module, args::ParsedArgs;
@async rmprocs(; waitfor=0)

# print the output generated by each testset
for (testname, result, start, stop) in results
if isa(result, AbstractTestRecord) && !isempty(result.output)
for (testname, result, output, start, stop) in results
if !isempty(output)
println(io_ctx.stdout, "\nOutput generated during execution of '$testname':")
lines = collect(eachline(IOBuffer(result.output)))
lines = collect(eachline(IOBuffer(output)))

for (i,line) in enumerate(lines)
prefix = if length(lines) == 1
Expand Down Expand Up @@ -1122,7 +1146,7 @@ function runtests(mod::Module, args::ParsedArgs;
function collect_results()
with_testset(o_ts) do
completed_tests = Set{String}()
for (testname, result, start, stop) in results
for (testname, result, output, start, stop) in results
push!(completed_tests, testname)

if result isa AbstractTestRecord
Expand Down
40 changes: 38 additions & 2 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,10 @@ end
end

@testset "crashing test" begin
msg = "This test will crash"
testsuite = Dict(
"abort" => quote
println($(msg))
abort() = ccall(:abort, Nothing, ())
abort()
end
Expand All @@ -192,6 +194,13 @@ end
end

str = String(take!(io))
# Make sure we can capture the output generated by the crashed process, see
# issue <https://github.com/JuliaTesting/ParallelTestRunner.jl/issues/83>.
@test contains(str, msg)
# "in expression starting at" comes from the abort trap, make sure we
# captured that as well.
@test contains(str, "in expression starting at")
# Following are messages printed by ParallelTestRunner.
@test contains(str, r"abort .+ started at")
@test contains(str, r"abort .+ crashed at")
@test contains(str, "FAILURE")
Expand All @@ -200,9 +209,10 @@ end
end

@testset "test output" begin
msg = "This is some output from the test"
testsuite = Dict(
"output" => quote
println("This is some output from the test")
println($(msg))
end
)

Expand All @@ -211,7 +221,33 @@ end

str = String(take!(io))
@test contains(str, r"output .+ started at")
@test contains(str, r"This is some output from the test")
@test contains(str, msg)
@test contains(str, "SUCCESS")

msg2 = "More output"
testsuite = Dict(
"verbose-1" => quote
print($(msg))
end,
"verbose-2" => quote
println($(msg2))
end,
"silent" => quote
@test true
end,
)
io = IOBuffer()
# Run all tests on the same worker, makre sure all the output is captured
# and attributed to the correct test set.
runtests(ParallelTestRunner, ["--verbose", "--jobs=1"]; testsuite, stdout=io, stderr=io)

str = String(take!(io))
@test contains(str, r"verbose-1 .+ started at")
@test contains(str, r"verbose-2 .+ started at")
@test contains(str, r"silent .+ started at")
@test contains(str, "Output generated during execution of 'verbose-1':\n[ $(msg)")
@test contains(str, "Output generated during execution of 'verbose-2':\n[ $(msg2)")
@test !contains(str, "Output generated during execution of 'silent':")
@test contains(str, "SUCCESS")
end

Expand Down