Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions LocalPreferences.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# When using system MPI, run once in the environment where you run MPI jobs (with MPI module loaded):
# julia --project=Dagger.jl -e 'using MPIPreferences; MPIPreferences.use_system_binary()'
# That populates abi, libmpi, mpiexec and avoids "Unknown MPI ABI nothing".
[MPIPreferences]
_format = "1.0"
abi = "MPICH"
binary = "system"
libmpi = "libmpi"
mpiexec = "mpiexec"
preloads = []
2 changes: 2 additions & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Graphs = "86223c79-3864-5bf0-83f7-82e725a168b6"
KernelAbstractions = "63c18a36-062a-441e-b654-da1e3ab1ce7c"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195"
MemPool = "f9f48841-c794-520a-933b-121f7ba6ed94"
NextLA = "d37ed344-79c4-486d-9307-6d11355a15a3"
OnlineStats = "a15396b6-48d5-5d58-9928-6d29437db91e"
Expand Down Expand Up @@ -77,6 +78,7 @@ Graphs = "1"
JSON3 = "1"
KernelAbstractions = "0.9"
MacroTools = "0.5"
MPI = "0.20.22"
MemPool = "0.4.12"
Metal = "1.1"
NextLA = "0.2.2"
Expand Down
111 changes: 111 additions & 0 deletions benchmarks/check_comm_asymmetry.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#!/usr/bin/env julia
# Parse MPI+Dagger logs and report communication decision asymmetry per tag.
# Asymmetry: for the same tag, one rank decides to send (local+bcast, sender+communicated, etc.)
# and another rank decides to infer (inferred, uninvolved) and never recv → deadlock.
#
# Usage: julia check_comm_asymmetry.jl < logfile
# Or: mpiexec -n 10 julia ... run_matmul.jl 2>&1 | tee matmul.log; julia check_comm_asymmetry.jl < matmul.log

# Decision-string classes observed in the logs.
# NOTE(review): these module-level sets are not referenced by main(), which
# builds its own local copies (with slightly different membership) — keep the
# two in sync, or have main() use these, when new decision strings are added.
const SEND_DECISIONS = Set([
"local+bcast", "sender+communicated", "sender+inferred", "receiver+bcast",
"aliasing", # when followed by local+bcast we already capture local+bcast
])
const RECV_DECISIONS = Set([
"communicated", "receiver", "sender+communicated", # received data
])
const INFER_DECISIONS = Set([
"inferred", "uninvolved", # did not recv (uses inferred type)
])

"""
    parse_line(line) -> (rank, tag, category, decision)

Extract the MPI rank, tag, log category, and communication decision from a
single log line. Any component not present in the line is returned as
`nothing`.

Decision candidates are taken from `][...]` bracket pairs and matched
most-specific-first: compound decisions such as "sender+communicated",
"sender+inferred" and "receiver+bcast" must be recognized before the generic
"communicated"/"inferred" substring checks. (Previously the generic checks ran
first, so "sender+communicated" collapsed to "communicated" and
"sender+inferred" to "inferred" — the `sender+` branch was dead code and such
ranks were never counted as senders.)
"""
function parse_line(line)
    rank = nothing
    tag = nothing
    decision = nothing
    category = nothing  # aliasing, execute!, remotecall_endpoint
    # Keep the LAST occurrence of each marker, matching the original behavior.
    for m in eachmatch(r"\[rank\s+(\d+)\]", line)
        rank = parse(Int, m.captures[1])
    end
    for m in eachmatch(r"\[tag\s+(\d+)\]", line)
        tag = parse(Int, m.captures[1])
    end
    for m in eachmatch(r"\[(execute!|aliasing|remotecall_endpoint)\]", line)
        category = m.captures[1]
    end
    # Decision is usually in the last [...] block; match specific forms first.
    for m in eachmatch(r"\]\[([^\]]+)\]", line)
        candidate = m.captures[1]
        if candidate == "receiver+bcast"
            decision = "receiver+bcast"
            break
        elseif startswith(candidate, "sender+")
            # sender+inferred: a sender that did not need to recv;
            # sender+communicated: a sender that also received data.
            decision = occursin("inferred", candidate) ? "sender+inferred" : "sender+communicated"
            break
        elseif occursin("local+bcast", candidate)
            decision = "local+bcast"
            break
        elseif occursin("inferred", candidate) && !occursin("communicated", candidate)
            decision = "inferred"
            break
        elseif occursin("communicated", candidate)
            decision = "communicated"
            break
        elseif candidate == "receiver"
            decision = "receiver"
            break
        elseif candidate == "inplace_move"
            decision = "inplace_move"
            break
        end
    end
    return rank, tag, category, decision
end

"""
    main()

Read log lines from stdin, group each rank's communication decision by tag,
and report every tag where at least one rank decided to send (bcast) while
another decided to infer — an asymmetry that leaves an unmatched send and can
deadlock the job.
"""
function main()
    # tag => Dict(rank => decision)
    decisions_by_tag = Dict{Int, Dict{Int, String}}()
    for line in eachline(stdin)
        rank, tag, _, decision = parse_line(line)
        (isnothing(rank) || isnothing(tag) || isnothing(decision)) && continue
        per_rank = get!(decisions_by_tag, tag) do
            Dict{Int, String}()
        end
        per_rank[rank] = decision
    end

    # Decision classes used for the asymmetry check.
    send_keys = Set(["local+bcast", "sender+communicated", "sender+inferred", "receiver+bcast"])
    infer_keys = Set(["inferred", "sender+inferred"]) # sender+inferred means sender didn't need to recv
    recv_keys = Set(["communicated", "receiver", "sender+communicated"])

    asymmetries = []
    for (tag, per_rank) in sort(collect(decisions_by_tag), by = first)
        senders = [r for (r, d) in per_rank if d in send_keys]
        inferrers = [r for (r, d) in per_rank if d in infer_keys || d == "uninvolved"]
        receivers = [r for (r, d) in per_rank if d in recv_keys]
        # A bcast sender targets every other rank; an inferrer never posts the recv.
        if !(isempty(senders) || isempty(inferrers))
            push!(asymmetries, (tag, senders, inferrers, receivers, per_rank))
        end
    end

    if isempty(asymmetries)
        println("No communication decision asymmetry found (no tag has both sender and inferrer).")
        return
    end

    println("=== Communication decision asymmetry (can cause deadlock) ===\n")
    for (tag, senders, inferrers, receivers, per_rank) in asymmetries
        println("Tag $tag:")
        println(" Senders (will bcast to all others): $senders")
        println(" Inferrers (did not recv): $inferrers")
        println(" Receivers: $receivers")
        println(" All decisions: $per_rank")
        println()
    end
end

main()
97 changes: 97 additions & 0 deletions benchmarks/check_comm_asymmetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#!/usr/bin/env python3
"""
Parse MPI+Dagger logs and report communication decision asymmetry per tag.
Asymmetry: for the same tag, one rank decides to send (local+bcast, etc.)
and another decides to infer (inferred) and never recv → deadlock.

Usage:
# Capture full log (all ranks' Core.println from mpi.jl go to stdout):
mpiexec -n 10 julia --project=/path/to/Dagger.jl benchmarks/run_matmul.jl 2>&1 | tee matmul.log
# Then look for asymmetry (same tag: one rank sends, another infers → deadlock):
python3 check_comm_asymmetry.py < matmul.log
"""

import re
import sys
from collections import defaultdict

# Decision-string classes observed in the logs.
# NOTE(review): these module-level sets are not referenced by main(), which
# defines its own local copies — keep the two in sync (or have main() use
# these) when new decision strings are added.
SEND_DECISIONS = {"local+bcast", "sender+communicated", "sender+inferred", "receiver+bcast"}
RECV_DECISIONS = {"communicated", "receiver", "sender+communicated"}
INFER_DECISIONS = {"inferred", "uninvolved", "sender+inferred"}


def parse_line(line: str):
    """Extract (rank, tag, category, decision) from one log line.

    Components absent from the line are returned as None.

    Decision candidates come from "][...]" bracket pairs and are matched
    most-specific-first: compound decisions such as "sender+communicated",
    "sender+inferred" and "receiver+bcast" must be recognized before the
    generic "communicated"/"inferred" substring checks. (Previously the
    generic checks ran first, so "sender+communicated" collapsed to
    "communicated" and "sender+inferred" to "inferred" — the "sender+" branch
    was dead code and such ranks were never counted as senders.)
    """
    rank = tag = category = decision = None
    m = re.search(r"\[rank\s+(\d+)\]", line)
    if m:
        rank = int(m.group(1))
    m = re.search(r"\[tag\s+(\d+)\]", line)
    if m:
        tag = int(m.group(1))
    m = re.search(r"\[(execute!|aliasing|remotecall_endpoint)\]", line)
    if m:
        category = m.group(1)
    # Capture the decision from [...] blocks, most specific pattern first.
    for m in re.finditer(r"\]\[([^\]]+)\]", line):
        candidate = m.group(1)
        if candidate == "receiver+bcast":
            decision = "receiver+bcast"
            break
        if candidate.startswith("sender+"):
            # sender+inferred: a sender that did not need to recv;
            # sender+communicated: a sender that also received data.
            decision = "sender+inferred" if "inferred" in candidate else "sender+communicated"
            break
        if "local+bcast" in candidate:
            decision = "local+bcast"
            break
        if "inferred" in candidate and "communicated" not in candidate:
            decision = "inferred"
            break
        if "communicated" in candidate:
            decision = "communicated"
            break
        if candidate == "receiver":
            decision = "receiver"
            break
        if candidate == "inplace_move":
            decision = "inplace_move"
            break
    return rank, tag, category, decision


def main():
    """Group per-rank decisions by tag from stdin and report any tag where
    one rank decided to send while another decided to infer (deadlock risk)."""
    by_tag = defaultdict(dict)  # tag -> {rank: decision}
    for line in sys.stdin:
        rank, tag, _category, decision = parse_line(line)
        if rank is None or tag is None or decision is None:
            continue
        by_tag[tag][rank] = decision

    # Decision classes used for the asymmetry check.
    send_keys = {"local+bcast", "sender+communicated", "sender+inferred", "receiver+bcast"}
    infer_keys = {"inferred", "sender+inferred", "uninvolved"}
    recv_keys = {"communicated", "receiver", "sender+communicated"}

    asymmetries = []
    for tag in sorted(by_tag):
        decisions = by_tag[tag]
        senders = [r for r, dec in decisions.items() if dec in send_keys]
        inferrers = [r for r, dec in decisions.items() if dec in infer_keys]
        receivers = [r for r, dec in decisions.items() if dec in recv_keys]
        # A bcast sender targets every other rank; an inferrer never posts the recv.
        if senders and inferrers:
            asymmetries.append((tag, senders, inferrers, receivers, decisions))

    if not asymmetries:
        print("No communication decision asymmetry found (no tag has both sender and inferrer).")
        return

    print("=== Communication decision asymmetry (can cause deadlock) ===\n")
    for tag, senders, inferrers, receivers, decisions in asymmetries:
        print(f"Tag {tag}:")
        print(f" Senders (will bcast to all others): {senders}")
        print(f" Inferrers (did not recv): {inferrers}")
        print(f" Receivers: {receivers}")
        print(f" All decisions: {dict(decisions)}")
        print()


if __name__ == "__main__":
main()
42 changes: 42 additions & 0 deletions benchmarks/run_distribute_fetch.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env julia
# Create a matrix with a fixed reproducible pattern, distribute it with an
# MPI procgrid, then on each rank fetch and println the chunk(s) it owns.
# Usage (from repo root, use full path to Dagger.jl):
# mpiexec -n 4 julia --project=/path/to/Dagger.jl benchmarks/run_distribute_fetch.jl
#
# NOTE(review): every rank executes this script top-to-bottom; presumably
# distribute/fetch are collective under the MPI accelerator, so all ranks must
# reach them in the same order — confirm before reordering any statement.

using MPI
using Dagger

# Guard against running against a released Dagger that lacks the MPI accelerator.
if !isdefined(Dagger, :accelerate!)
error("Dagger.accelerate! not found. Run with the local Dagger project: julia --project=/path/to/Dagger.jl ...")
end
Dagger.accelerate!(:mpi)

const comm = MPI.COMM_WORLD
const rank = MPI.Comm_rank(comm)
# nranks is currently unused below; kept for symmetry with the other scripts.
const nranks = MPI.Comm_size(comm)

# Fixed reproducible pattern: 6×6 matrix, M[i,j] = 10*i + j (same on all ranks)
const N = 6
const BLOCK = 2
A = [10 * i + j for i in 1:N, j in 1:N]

# Procgrid: use Dagger's compatible processors so the procgrid passes validation
# Blocks are assigned round-robin over the available processors, so with more
# blocks than processors several blocks share a processor.
availprocs = collect(Dagger.compatible_processors())
nblocks = (cld(N, BLOCK), cld(N, BLOCK))
procgrid = reshape(
[availprocs[mod(i - 1, length(availprocs)) + 1] for i in 1:prod(nblocks)],
nblocks,
)

# Distribute so chunk (i,j) is computed on procgrid[i,j]
D = distribute(A, Blocks(BLOCK, BLOCK), procgrid)
D_fetched = fetch(D)

# On each rank: fetch and print only the chunk(s) this rank owns.
# Ownership test: the chunk's handle is an MPIRef whose rank matches ours.
for (idx, ch) in enumerate(D_fetched.chunks)
if ch isa Dagger.Chunk && ch.handle isa Dagger.MPIRef && ch.handle.rank == rank
data = fetch(ch)
println("rank $rank chunk $idx: ", data)
end
end
105 changes: 105 additions & 0 deletions benchmarks/run_matmul.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#!/usr/bin/env julia
# N×N matmul benchmark (Float32); block size scales with number of ranks.
# Usage (use the full path to Dagger.jl, not "..."):
# mpiexec -n 10 julia --project=/path/to/Dagger.jl benchmarks/run_matmul.jl
# Set CHECK_CORRECTNESS=true to collect and compare against GPU baseline:
# CHECK_CORRECTNESS=true mpiexec -n 10 julia --project=/path/to/Dagger.jl benchmarks/run_matmul.jl
#
# NOTE(review): every rank executes this script top-to-bottom; the
# collect_datadeps calls are executed by all ranks, so statement order must
# stay identical across ranks.

using MPI
using Dagger
using LinearAlgebra

# Guard against running against a released Dagger that lacks the MPI accelerator.
if !isdefined(Dagger, :accelerate!)
    error("Dagger.accelerate! not found. Run with the local Dagger project: julia --project=/path/to/Dagger.jl ...")
end
Dagger.accelerate!(:mpi)

const N = 2_000
const comm = MPI.COMM_WORLD
const rank = MPI.Comm_rank(comm)
const nranks = MPI.Comm_size(comm)
# Block size proportional to ranks: ~nranks blocks in 2D => side blocks ≈ √nranks
const BLOCK = max(1, ceil(Int, N / ceil(Int, sqrt(nranks))))

# Componentwise relative-error tolerance factor for the correctness check.
# Fix: the code previously used 50×eps while the comments and every printed
# message claimed 100×eps; the factor is now defined once and interpolated
# into the messages so they cannot drift apart again.
const RTOL_FACTOR = 100.0f0

const CHECK_CORRECTNESS = parse(Bool, get(ENV, "CHECK_CORRECTNESS", "false"))

if rank == 0
    println("Benchmark: ", nranks, " ranks, N=", N, ", block size ", BLOCK, "×", BLOCK, " (matmul)")
end

# Allocate and fill matrices in blocks (Float32)
A = rand(Blocks(BLOCK, BLOCK), Float32, N, N)
B = rand(Blocks(BLOCK, BLOCK), Float32, N, N)

# Matrix multiply C = A * B
t_matmul = @elapsed begin
    C = A * B
end

if rank == 0
    println("Matmul time: ", round(t_matmul; digits=4), " s")
end

# Optional: collect via datadeps (root=0). All ranks participate in the datadeps region.
if CHECK_CORRECTNESS
    t_collect = @elapsed begin
        A_full = Dagger.collect_datadeps(A; root=0)
        B_full = Dagger.collect_datadeps(B; root=0)
        C_dagger = Dagger.collect_datadeps(C; root=0)
    end
    if rank == 0
        println("Collecting result and computing baseline for correctness check (GPU)...")
        # NOTE(review): CUDA is loaded lazily and only on rank 0; `using` inside
        # a top-level `if` is legal Julia but requires CUDA in the environment.
        using CUDA
        CUDA.functional() || error("CUDA not functional; cannot compute GPU baseline. Check CUDA driver and device.")
        t_upload = @elapsed begin
            A_g = CUDA.cu(A_full)
            B_g = CUDA.cu(B_full)
        end
        println("Collect + upload time: ", round(t_collect + t_upload; digits=4), " s")

        t_baseline = @elapsed begin
            C_ref_g = A_g * B_g
        end
        println("Baseline (GPU/CUDA) time: ", round(t_baseline; digits=4), " s")

        # Require all elements within RTOL_FACTOR× machine epsilon relative
        # error (componentwise).
        C_dagger_cpu = C_dagger
        C_ref_cpu = Array(C_ref_g)
        eps_f = eps(Float32)
        rtol = RTOL_FACTOR * eps_f
        mult = Int(RTOL_FACTOR)  # printed multiplier, always matches rtol
        diff = C_dagger_cpu .- C_ref_cpu
        # rel_ij = |diff|/|C_ref|, denominator at least eps to avoid div by zero
        denom = max.(abs.(C_ref_cpu), eps_f)
        rel_err = abs.(diff) ./ denom
        max_rel_err = Float32(maximum(rel_err))
        if max_rel_err <= rtol
            println("Correctness: OK (max rel_err = ", max_rel_err, " <= ", mult, "×eps = ", rtol, ")")
        else
            println("Correctness: FAIL (max rel_err = ", max_rel_err, " > ", mult, "×eps = ", rtol, ")")
        end

        # Per-block: which blocks have any element with rel_err > RTOL_FACTOR×eps
        n_bi = ceil(Int, N / BLOCK)
        n_bj = ceil(Int, N / BLOCK)
        bad_blocks = Tuple{Int,Int,Float32}[]
        for bi in 1:n_bi, bj in 1:n_bj
            ri = (bi - 1) * BLOCK + 1 : min(bi * BLOCK, N)
            rj = (bj - 1) * BLOCK + 1 : min(bj * BLOCK, N)
            block_rel = Float32(maximum(@view(rel_err[ri, rj])))
            if block_rel > rtol
                push!(bad_blocks, (bi, bj, block_rel))
            end
        end
        if isempty(bad_blocks)
            println("Per-block: all ", n_bi * n_bj, " blocks within ", mult, "×eps rel_err.")
        else
            println("Per-block: ", length(bad_blocks), " block(s) exceed ", mult, "×eps rel_err (block size ", BLOCK, "×", BLOCK, "):")
            # Worst offenders first.
            sort!(bad_blocks; by = x -> -x[3])
            for (bi, bj, block_rel) in bad_blocks
                println(" block [", bi, ",", bj, "] rows ", (bi - 1) * BLOCK + 1, ":", min(bi * BLOCK, N),
                    ", cols ", (bj - 1) * BLOCK + 1, ":", min(bj * BLOCK, N), " max rel_err = ", block_rel)
            end
        end
    end
end
Loading
Loading