diff --git a/CHANGELOG.md b/CHANGELOG.md index ccdbce50..8cc6e741 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ Version 1.0 - ![Feature][badge-feature] OpenBLAS: Almost all pinning and querying functions now have `openblas_*` analogs that provide (almost) all of the same features as for regular Julia threads. Example: `openblas_pinthreads(:cores)` now works. You can also visualize the placement of OpenBLAS threads via `threadinfo(; blas=true)`. These functions are now also part of the official API (and SemVer). - ![Feature][badge-feature] Visualizing affinities: Besides `printaffinity` and `printaffinities` there is now is a "pimped" variant `visualize_affinity` which uses the `threadinfo` layout to visualize the affinity. - ![Feature][badge-feature] MPI: we've added dedicated support for pinning the Julia threads of MPI ranks, which are potentially distributed over multiple nodes. +- ![Feature][badge-feature] Distributed: we've added dedicated support for pinning the Julia threads of Julia workers (Distributed.jl), which are potentially spread over multiple nodes. - ![Breaking][badge-breaking] `threadinfo(; blas=true)` now shows the placement of OpenBLAS threads (same visualization as for regular Julia threads). - ![Breaking][badge-breaking] Pinning via environment variable (`JULIA_PIN`) now requires a `pinthreads(...; force=false)` call. This is because we've dropped the `__init__` function entirely. The environment variables `JULIA_LIKWID_PIN` has been dropped for now. (Might be reintroduced later.) - ![Breaking][badge-breaking] Pinning via Julia preferences has been dropped entirely. diff --git a/Project.toml b/Project.toml index f09a7429..2c734a0e 100644 --- a/Project.toml +++ b/Project.toml @@ -16,12 +16,15 @@ ThreadPinningCore = "6f48bc29-05ce-4cc8-baad-4adcba581a18" [weakdeps] MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195" +Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" [extensions] MPIExt = "MPI" +DistributedExt = "Distributed" [compat] DelimitedFiles = "1" +Distributed = "1" Libdl = "1" LinearAlgebra = "1" MKL = "0.4, 0.6" @@ -37,6 +40,7 @@ ThreadPinningCore = "0.4.5" julia = "1.10" [extras] +Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" MKL = "33e6dc65-8f57-5167-99aa-e5a354878fb2" MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195" @@ -47,4 +51,4 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" TestItemRunner = "f8b46487-2199-4994-9208-9a1283c18c0a" [targets] -test = ["Test", "Random", "TestItemRunner", "Statistics", "MKL", "Logging", "MPI", "MPIPreferences"] +test = ["Test", "Random", "TestItemRunner", "Statistics", "MKL", "Logging", "MPI", "MPIPreferences", "Distributed"] diff --git a/docs/make.jl b/docs/make.jl index ab6c8344..9423b043 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -20,6 +20,7 @@ makedocs(; "Pinning Julia Tasks" => "examples/ex_pinning_tasks.md", "Pinning BLAS Threads" => "examples/ex_blas.md", "MPI + Threads" => "examples/ex_mpi.md", + "Distributed.jl + Threads" => "examples/ex_distributed.md", "External Affinity Mask" => "examples/ex_affinity.md", ], "References" => [ diff --git a/docs/src/examples/ex_distributed.md b/docs/src/examples/ex_distributed.md new file mode 100644 index 00000000..83dd0d06 --- /dev/null +++ b/docs/src/examples/ex_distributed.md @@ -0,0 +1,65 @@ +# Distributed.jl + Threads + +ThreadPinning.jl has dedicated support for pinning Julia threads of Julia workers (Distributed.jl) in multi-processing applications, see [Querying - Distributed.jl](@ref 
api_distributed_querying) and [Pinning - Distributed.jl](@ref api_distributed_pinning). Note that you can use these tools irrespective of whether your parallel application is pure (i.e. each Julia worker runs a single Julia thread) or hybrid (i.e. each Julia worker runs multiple Julia threads).
+
+## Basic example
+
+```julia
+julia> using Distributed
+
+julia> withenv("JULIA_NUM_THREADS" => 2) do
+           addprocs(4) # spawn 4 workers with 2 threads each
+       end
+4-element Vector{Int64}:
+ 2
+ 3
+ 4
+ 5
+
+julia> @everywhere using ThreadPinning
+
+julia> distributed_getcpuids()
+Dict{Int64, Vector{Int64}} with 4 entries:
+  5 => [246, 185]
+  4 => [198, 99]
+  2 => [135, 226]
+  3 => [78, 184]
+
+julia> distributed_getispinned() # none pinned yet
+Dict{Int64, Vector{Bool}} with 4 entries:
+  5 => [0]
+  4 => [0]
+  2 => [0]
+  3 => [0]
+
+julia> distributed_pinthreads(:sockets) # pin to sockets (round-robin)
+
+julia> distributed_getispinned() # all pinned
+Dict{Int64, Vector{Bool}} with 4 entries:
+  5 => [1, 1]
+  4 => [1, 1]
+  2 => [1, 1]
+  3 => [1, 1]
+
+julia> distributed_getcpuids()
+Dict{Int64, Vector{Int64}} with 4 entries:
+  5 => [66, 67]
+  4 => [2, 3]
+  2 => [0, 1]
+  3 => [64, 65]
+
+julia> socket(1, 1:4), socket(2, 1:4) # check
+([0, 1, 2, 3], [64, 65, 66, 67])
+
+julia> distributed_pinthreads(:numa) # pin to numa domains (round-robin)
+
+julia> distributed_getcpuids()
+Dict{Int64, Vector{Int64}} with 4 entries:
+  5 => [48, 49]
+  4 => [32, 33]
+  2 => [0, 1]
+  3 => [16, 17]
+
+julia> numa(1, 1:2), numa(2, 1:2), numa(3, 1:2), numa(4, 1:2) # check
+([0, 1], [16, 17], [32, 33], [48, 49])
+```
\ No newline at end of file
diff --git a/docs/src/refs/api_pinning.md b/docs/src/refs/api_pinning.md
index 38aa8950..ba667168 100644
--- a/docs/src/refs/api_pinning.md
+++ b/docs/src/refs/api_pinning.md
@@ -28,6 +28,13 @@ openblas_setaffinity_cpuids
 mpi_pinthreads
 ```
 
+## [Pinning - Distributed.jl](@id api_distributed_pinning)
+
+```@docs
+distributed_pinthreads
+distributed_unpinthreads
+```
+
 ## Pinning - LIKWID
 
 Besides [`pinthreads`](@ref), we offer [`pinthreads_likwidpin`](@ref) which, ideally, should handle all inputs that are supported by the `-c` option of [`likwid-pin`](https://github.com/RRZE-HPC/likwid/wiki/Likwid-Pin) (e.g. `S0:1-3@S1:2,4,5` or `E:N:4:2:4`). If you encounter an input that doesn't work as expected, please file an issue.
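A minimal sketch of how such likwid-pin style strings are used (assuming `pinthreads_likwidpin` takes the pin string as its only positional argument, and that `likwidpin_to_cpuids` merely expands a string to CPU-thread IDs without pinning anything):

```julia
using ThreadPinning

# Pin the Julia threads according to a likwid-pin domain string,
# here: CPU-threads 1-3 of socket 0 (as in `likwid-pin -c S0:1-3`).
pinthreads_likwidpin("S0:1-3")

# Expand a more complex string into the CPU-thread IDs it selects,
# without pinning anything (assumed helper from the exported API).
likwidpin_to_cpuids("S0:1-3@S1:2,4,5")
```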
diff --git a/docs/src/refs/api_querying.md b/docs/src/refs/api_querying.md index 311ed338..ea65fb89 100644 --- a/docs/src/refs/api_querying.md +++ b/docs/src/refs/api_querying.md @@ -12,6 +12,7 @@ threadinfo getcpuids getcpuid ispinned +getispinned getaffinity printaffinity printaffinities @@ -65,3 +66,11 @@ mpi_getcpuids mpi_gethostnames mpi_getlocalrank ``` + +## [Querying - Distributed.jl](@id api_distributed_querying) + +```@docs +distributed_getcpuids +distributed_gethostnames +distributed_getispinned +``` diff --git a/ext/DistributedExt/DistributedExt.jl b/ext/DistributedExt/DistributedExt.jl new file mode 100644 index 00000000..6a5950f5 --- /dev/null +++ b/ext/DistributedExt/DistributedExt.jl @@ -0,0 +1,135 @@ +module DistributedExt + +import ThreadPinning: ThreadPinning +using Distributed: Distributed + +function getworkerpids(; include_master = false) + workers = Distributed.workers() + if include_master && !in(1, workers) + pushfirst!(workers, 1) + end + return workers +end + +# querying +function ThreadPinning.distributed_getcpuids(; include_master = false) + res = Dict{Int, Vector{Int}}() + for w in getworkerpids(; include_master) + res[w] = Distributed.@fetchfrom w ThreadPinning.getcpuids() + end + return res +end + +function ThreadPinning.distributed_gethostnames(; include_master = false) + res = Dict{Int, String}() + for w in getworkerpids(; include_master) + res[w] = Distributed.@fetchfrom w gethostname() + end + return res +end + +function ThreadPinning.distributed_getispinned(; include_master = false) + res = Dict{Int, Vector{Bool}}() + for w in getworkerpids(; include_master) + res[w] = Distributed.@fetchfrom w ThreadPinning.getispinned() + end + return res +end + +function compute_distributed_topology(hostnames_dict) + dist_topo = Vector{@NamedTuple{ + pid::Int64, localid::Int64, node::Int64, nodename::String}}( + undef, length(hostnames_dict)) + sorted_by_pid = sortperm(collect(keys(hostnames_dict))) + nodes = unique(collect(values(hostnames_dict))[sorted_by_pid]) + idx = 1 + for (inode, node) in enumerate(nodes) + workers_onnode = collect(keys(filter(p -> p[2] == node, hostnames_dict))) + sort!(workers_onnode) # on each node we sort by worker pid + for (i, r) in enumerate(workers_onnode) + dist_topo[idx] = (; pid = r, localid = i - 1, node = inode, nodename = node) + idx += 1 + end + end + return dist_topo +end + +function ThreadPinning.distributed_topology(; include_master = false) + hostnames_dict = ThreadPinning.distributed_gethostnames(; include_master) + dist_topo = compute_distributed_topology(hostnames_dict) + return dist_topo +end + +# pinning +function ThreadPinning.distributed_pinthreads(symb::Symbol; + compact = false, + include_master = false, + kwargs...) + domain_symbol2functions(symb) # to check input arg as early as possible + dist_topo = ThreadPinning.distributed_topology(; include_master) + @sync for worker in dist_topo + Distributed.remotecall( + () -> ThreadPinning._distributed_pinyourself( + symb, dist_topo; compact, kwargs...), + worker.pid) + end + return +end + +function ThreadPinning._distributed_pinyourself(symb, dist_topo; compact, kwargs...) 
+ # println("_distributed_pinyourself START") + idx = findfirst(w -> w.pid == Distributed.myid(), dist_topo) + if isnothing(idx) + error("Couldn't find myself (worker pid $(Distributed.myid())) in distributed topology.") + end + localid = dist_topo[idx].localid + domain, ndomain = domain_symbol2functions(symb) + # compute cpuids + cpuids = cpuids_of_localid(localid, domain, ndomain; compact) + # actual pinning + ThreadPinning.pinthreads(cpuids; kwargs...) + # println("_distributed_pinyourself STOP") + return +end + +function domain_symbol2functions(symb) + if symb == :sockets + domain = ThreadPinning.socket + ndomain = ThreadPinning.nsockets + elseif symb == :numa + domain = ThreadPinning.numa + ndomain = ThreadPinning.nnuma + elseif symb == :cores + domain = ThreadPinning.core + ndomain = ThreadPinning.ncores + else + throw(ArgumentError("Invalid symbol. Supported symbols are :sockets, :numa, and :cores.")) + end + return domain, ndomain +end + +function cpuids_of_localid(localrank, domain, ndomain; + nthreads_per_proc = Threads.nthreads(), + compact = false) + i_in_domain, idomain = divrem(localrank, ndomain()) .+ 1 + idcs = ((i_in_domain - 1) * nthreads_per_proc + 1):(i_in_domain * nthreads_per_proc) + if maximum(idcs) > length(domain(idomain)) + @show maximum(idcs), length(domain(idomain)) + error("Too many Julia threads / Julia workers for the selected domain.") + end + if domain == ThreadPinning.core + cpuids = domain(idomain, idcs) + else + cpuids = domain(idomain, idcs; compact) + end + return cpuids +end + +function ThreadPinning.distributed_unpinthreads(; include_master = false, kwargs...) + @sync for w in getworkerpids(; include_master) + Distributed.@spawnat w ThreadPinning.unpinthreads(; kwargs...) + end + return +end + +end # module diff --git a/src/ThreadPinning.jl b/src/ThreadPinning.jl index 9f5ce36b..b4d17c70 100644 --- a/src/ThreadPinning.jl +++ b/src/ThreadPinning.jl @@ -14,6 +14,7 @@ include("mkl.jl") @static if Sys.islinux() include("pinning.jl") include("mpi.jl") + include("distributed.jl") include("likwid-pin.jl") else # make core pinning functions no-ops @@ -26,6 +27,7 @@ else with_pinthreads(f, args...; kwargs...) = f() pinthreads_likwidpin(args...; kwargs...) = nothing mpi_pinthreads(args...; kwargs...) = nothing + distributed_pinthreads(args...; kwargs...) = nothing openblas_pinthreads(args...; kwargs...) = nothing openblas_pinthread(args...; kwargs...) = nothing openblas_unpinthreads(args...; kwargs...) 
= nothing @@ -39,7 +41,7 @@ end export threadinfo ## querying -export getcpuid, getcpuids, getaffinity, getnumanode, getnumanodes +export getcpuid, getcpuids, getaffinity, getnumanode, getnumanodes, getispinned export core, numa, socket, node, cores, numas, sockets export printaffinity, printaffinities, visualize_affinity export ispinned, hyperthreading_is_enabled, ishyperthread, isefficiencycore @@ -53,6 +55,8 @@ export pinthread, pinthreads, with_pinthreads, unpinthread, unpinthreads export setaffinity, setaffinity_cpuids export pinthreads_likwidpin, likwidpin_domains, likwidpin_to_cpuids export mpi_pinthreads, mpi_getcpuids, mpi_gethostnames, mpi_getlocalrank +export distributed_pinthreads, distributed_getcpuids, distributed_gethostnames, + distributed_getispinned, distributed_unpinthreads export openblas_setaffinity, openblas_setaffinity_cpuids, openblas_pinthread, openblas_pinthreads, openblas_unpinthread, openblas_unpinthreads diff --git a/src/distributed.jl b/src/distributed.jl new file mode 100644 index 00000000..6fe0ea8f --- /dev/null +++ b/src/distributed.jl @@ -0,0 +1,77 @@ +""" + distributed_pinthreads(symbol; + include_master = false, + compact = false, + nthreads_per_proc = Threads.nthreads(), + kwargs...) + +Pin the Julia threads of Julia workers in a round-robin fashion to specific domains +(e.g. sockets). Supported domains (`symbol`) are `:sockets`, `:numa`, and `:cores`. + +When calling this function, the Julia threads of all Julia workers will be +distributed in a round-robin fashion among the specified domains and will be pinned to +non-overlapping ranges of CPU-threads within the domains. + +A multi-node setup, where Julia workers are hosted on different nodes, is supported. + +If `include_master=true`, the master process (`Distributed.myid() == 1`) will be +pinned as well. + +If `compact=false` (default), physical cores are occupied before hyperthreads. Otherwise, +CPU-cores - with potentially multiple CPU-threads - are filled up one after another +(compact pinning). + +*Example:* + +``` +using Distributed +addprocs(3) +@everywhere using ThreadPinning +distributed_pinthreads(:sockets) +``` +""" +function distributed_pinthreads end + +""" +Unpin all threads on all Julia workers. + +If `include_master=true`, the master process (`Distributed.myid() == 1`) will be +unpinned as well. +""" +function distributed_unpinthreads end + +""" +Returns a `Dict{Int, Vector{Int}}` where the keys are the pids of the Julia workers and +the values are the CPU IDs of the CPU-threads that are currently +running the Julia threads of the worker. + +If `include_master=true`, the master process (`Distributed.myid() == 1`) will be included. +""" +function distributed_getcpuids end + +""" +Returns a `Dict{Int, String}` where the keys are the pids of the Julia workers and the +values are the hostnames of the nodes that are currently hosting the respective workers. + +If `include_master=true`, the master process (`Distributed.myid() == 1`) will be included. +""" +function distributed_gethostnames end + +""" +Returns a `Dict{Int, Vector{Bool}}` where the keys are the pids of the Julia workers and +the values are the results of `ThreadPinning.ispinned` evaluated for all Julia threads of +a worker. + +If `include_master=true`, the master process (`Distributed.myid() == 1`) will be included. +""" +function distributed_getispinned end + +""" +Returns a vector of named tuples. Each named tuple represents a +Julia worker and has keys `pid`, `localid`, `node`, and `nodename`. 
+ +If `include_master=true`, the master process (`Distributed.myid() == 1`) will be included. +""" +function distributed_topology end + +function _distributed_pinyourself end diff --git a/src/querying.jl b/src/querying.jl index b00663f4..74ebc5f4 100644 --- a/src/querying.jl +++ b/src/querying.jl @@ -53,6 +53,16 @@ Julia threads that belong to a specific thread pool. """ function getnumanodes end +""" + getispinned(; threadpool = :default) + +Returns the results of `ispinned` for all Julia threads. + +The keyword argument `threadpool` (default: `:default`) may be used to specify a specific +thread pool. +""" +function getispinned end + """ Returns the CPU IDs that belong to core `i` (logical index, starts at 1). Set `shuffle=true` to randomize. @@ -259,7 +269,7 @@ function openblas_printaffinities end module Querying -import ThreadPinning: getcpuid, getcpuids, getnumanode, getnumanodes +import ThreadPinning: getcpuid, getcpuids, getnumanode, getnumanodes, getispinned import ThreadPinning: printaffinity, printaffinities, getaffinity, visualize_affinity import ThreadPinning: ispinned, ishyperthread, hyperthreading_is_enabled, isefficiencycore import ThreadPinning: ncputhreads, ncores, nnuma, nsockets, ncorekinds, nsmt @@ -308,6 +318,11 @@ openblas_getcpuids(; kwargs...) = ThreadPinningCore.openblas_getcpuids(; kwargs. openblas_ispinned(; kwargs...) = ThreadPinningCore.openblas_ispinned(; kwargs...) # no (direct) forwarding +function getispinned(; threadpool = :default) + tids = ThreadPinningCore.threadids(; threadpool) + return [ispinned(; threadid = t) for t in tids] +end + function printaffinity(; threadid = Threads.threadid(), io = getstdout(), kwargs...) mask = ThreadPinningCore.getaffinity(; threadid) str = _affinity_to_string(mask; kwargs...) diff --git a/test/runtests.jl b/test/runtests.jl index a4c61120..c16368ce 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -40,3 +40,6 @@ end @testitem "mpi" begin Sys.islinux() && include("tests_mpi.jl") end +@testitem "distributed" begin + Sys.islinux() && include("tests_distributed.jl") +end diff --git a/test/tests_distributed.jl b/test/tests_distributed.jl new file mode 100644 index 00000000..5189ea12 --- /dev/null +++ b/test/tests_distributed.jl @@ -0,0 +1,129 @@ +include("common.jl") +using Test +using ThreadPinning +using Distributed: Distributed + +function check_roundrobin(cpuids_dict, f_cpuids, nf) + idomain = 1 + for pid in sort!(collect(keys(cpuids_dict))) + # @show pid, idomain, cpuids_dict[pid] + cpuids_domain = f_cpuids(idomain) + cpuids_worker = cpuids_dict[pid] + all(c -> c in cpuids_domain, cpuids_worker) || return false + idomain = mod1(idomain + 1, nf()) + end + return true +end + +function dist_querying_tests() + workerpids = Distributed.workers() + + # distributed_getcpuids + cpuids_dict = distributed_getcpuids() + @test cpuids_dict isa Dict{Int, Vector{Int}} + @test length(keys(cpuids_dict)) == nworkers + for (pid, cpuids_worker) in cpuids_dict + @test pid in workerpids + @test length(cpuids_worker) == nthreads_of_workers[pid] + end + cpuids_dict = distributed_getcpuids(; include_master = true) + @test 1 in keys(cpuids_dict) + @test length(keys(cpuids_dict)) == nworkers + 1 + + # distributed_gethostnames + hostnames_dict = distributed_gethostnames() + @test hostnames_dict isa Dict{Int, String} + @test length(keys(hostnames_dict)) == nworkers + for (pid, hostname_worker) in hostnames_dict + @test pid in workerpids + @test hostname_worker == gethostname() # we run this test only on a single node + end + hostnames_dict = 
distributed_gethostnames(; include_master = true) + @test 1 in keys(hostnames_dict) + @test length(keys(hostnames_dict)) == nworkers + 1 + + # distributed_getispinned + ispinned_dict = distributed_getispinned() + @test ispinned_dict isa Dict{Int, Vector{Bool}} + @test length(keys(ispinned_dict)) == nworkers + for (pid, ispinned_worker) in ispinned_dict + @test pid in workerpids + @test length(ispinned_worker) == nthreads_of_workers[pid] + end + ispinned_dict = distributed_getispinned(; include_master = true) + @test 1 in keys(ispinned_dict) + @test length(keys(ispinned_dict)) == nworkers + 1 + return +end + +function dist_pinning_tests() + # unpinning + @test isnothing(distributed_unpinthreads(; include_master = true)) + @test all( + iszero, Iterators.flatten(values(distributed_getispinned(; include_master = true)))) + + # pinning + @test isnothing(distributed_pinthreads(:sockets)) + @test all(isone, Iterators.flatten(values(distributed_getispinned()))) + + # pinning (correct cpuids) + for (symb, f, nf) in ((:sockets, socket, nsockets), (:numa, numa, nnuma)) + @test isnothing(distributed_pinthreads(symb)) + cpuids_dict = distributed_getcpuids() + hostnames_dict = distributed_gethostnames() + nodes = unique(values(hostnames_dict)) + for n in nodes + # on each node we expect round-robin order + workers_onnode = collect(keys(filter(p -> p[2] == n, hostnames_dict))) + cpuids_workers_onnode = filter(p -> p[1] in workers_onnode, cpuids_dict) + @test check_roundrobin(cpuids_workers_onnode, f, nf) + end + end + + # unpinning (after pinning) + @test isnothing(distributed_unpinthreads()) + @test all(iszero, Iterators.flatten(values(distributed_getispinned()))) + return +end + +const nworkers = min(ncputhreads(), 4) +const julia_num_threads_for_workers = 1 + +# start workers +withenv("JULIA_NUM_THREADS" => julia_num_threads_for_workers) do + Distributed.addprocs(nworkers) +end +Distributed.@everywhere using ThreadPinning +const nthreads_of_workers = Dict(pid => (i == 1 ? Threads.nthreads() : + julia_num_threads_for_workers) +for (i, pid) in enumerate(vcat([1], Distributed.workers())) +) + +# run tests +try + @testset "HostSystem" begin + println("") + @warn("\nHostSystem\n") + dist_querying_tests() + dist_pinning_tests() + end + + @testset "TestSystems" begin + # for name in ("PerlmutterComputeNode",) + for name in ThreadPinning.Faking.systems() + println("") + @warn("\nTestSystem: $name\n") + Distributed.@everywhere ThreadPinning.Faking.start($name) + @testset "$name" begin + dist_querying_tests() + dist_pinning_tests() + end + Distributed.@everywhere ThreadPinning.Faking.stop() + end + println() + end +finally + # cleanup + Distributed.rmprocs(Distributed.workers()) + ThreadPinning.Faking.stop() +end
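The round-robin placement that `check_roundrobin` verifies is produced by the `divrem`-based mapping in `cpuids_of_localid` (DistributedExt). A standalone sketch of that arithmetic, with made-up values for the number of domains and the number of threads per worker:

```julia
# Reproduce the worker -> domain mapping used by `cpuids_of_localid`:
# workers are dealt out to domains round-robin, and within a domain each
# worker gets the next non-overlapping block of CPU-thread slots.
ndomains = 2           # e.g. nsockets() on the node
nthreads_per_proc = 2  # Julia threads per worker
for localid in 0:3     # local (per-node) worker ids
    i_in_domain, idomain = divrem(localid, ndomains) .+ 1
    idcs = ((i_in_domain - 1) * nthreads_per_proc + 1):(i_in_domain * nthreads_per_proc)
    println("localid = $localid -> domain $idomain, CPU-thread slots $idcs")
end
# localid = 0 -> domain 1, CPU-thread slots 1:2
# localid = 1 -> domain 2, CPU-thread slots 1:2
# localid = 2 -> domain 1, CPU-thread slots 3:4
# localid = 3 -> domain 2, CPU-thread slots 3:4
```

This is the per-node ordering that `check_roundrobin` checks against: sorted by worker pid, each worker's CPU-thread IDs must fall inside successive domains, wrapping around via `mod1`.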