Distributed extension #101

Merged
merged 6 commits on Aug 7, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@ Version 1.0
- ![Feature][badge-feature] OpenBLAS: Almost all pinning and querying functions now have `openblas_*` analogs that provide (almost) all of the same features as for regular Julia threads. Example: `openblas_pinthreads(:cores)` now works. You can also visualize the placement of OpenBLAS threads via `threadinfo(; blas=true)`. These functions are now also part of the official API (and SemVer).
- ![Feature][badge-feature] Visualizing affinities: Besides `printaffinity` and `printaffinities` there is now a "pimped" variant `visualize_affinity`, which uses the `threadinfo` layout to visualize the affinity (a short sketch follows this list).
- ![Feature][badge-feature] MPI: we've added dedicated support for pinning the Julia threads of MPI ranks, which are potentially distributed over multiple nodes.
- ![Feature][badge-feature] Distributed: we've added dedicated support for pinning the Julia threads of Julia workers (Distributed.jl), which are potentially spread over multiple nodes.
- ![Breaking][badge-breaking] `threadinfo(; blas=true)` now shows the placement of OpenBLAS threads (same visualization as for regular Julia threads).
- ![Breaking][badge-breaking] Pinning via environment variable (`JULIA_PIN`) now requires a `pinthreads(...; force=false)` call. This is because we've dropped the `__init__` function entirely. The environment variable `JULIA_LIKWID_PIN` has been dropped for now. (It might be reintroduced later.)
- ![Breaking][badge-breaking] Pinning via Julia preferences has been dropped entirely.
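
A short sketch of how the new OpenBLAS pinning and affinity-visualization entry points mentioned above can be combined (the zero-argument form of `visualize_affinity` is an assumption):

```julia
using ThreadPinning

openblas_pinthreads(:cores)  # pin the OpenBLAS threads to CPU-cores
threadinfo(; blas = true)    # show the placement of the OpenBLAS threads
visualize_affinity()         # visualize the affinity using the threadinfo layout
```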
6 changes: 5 additions & 1 deletion Project.toml
@@ -16,12 +16,15 @@ ThreadPinningCore = "6f48bc29-05ce-4cc8-baad-4adcba581a18"

[weakdeps]
MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"

[extensions]
MPIExt = "MPI"
DistributedExt = "Distributed"

[compat]
DelimitedFiles = "1"
Distributed = "1"
Libdl = "1"
LinearAlgebra = "1"
MKL = "0.4, 0.6"
@@ -37,6 +40,7 @@ ThreadPinningCore = "0.4.5"
julia = "1.10"

[extras]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
MKL = "33e6dc65-8f57-5167-99aa-e5a354878fb2"
MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195"
@@ -47,4 +51,4 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
TestItemRunner = "f8b46487-2199-4994-9208-9a1283c18c0a"

[targets]
test = ["Test", "Random", "TestItemRunner", "Statistics", "MKL", "Logging", "MPI", "MPIPreferences"]
test = ["Test", "Random", "TestItemRunner", "Statistics", "MKL", "Logging", "MPI", "MPIPreferences", "Distributed"]
1 change: 1 addition & 0 deletions docs/make.jl
@@ -20,6 +20,7 @@ makedocs(;
"Pinning Julia Tasks" => "examples/ex_pinning_tasks.md",
"Pinning BLAS Threads" => "examples/ex_blas.md",
"MPI + Threads" => "examples/ex_mpi.md",
"Distributed.jl + Threads" => "examples/ex_distributed.md",
"External Affinity Mask" => "examples/ex_affinity.md",
],
"References" => [
65 changes: 65 additions & 0 deletions docs/src/examples/ex_distributed.md
@@ -0,0 +1,65 @@
# Distributed.jl + Threads

ThreadPinning.jl has dedicated support for pinning the Julia threads of Julia workers (Distributed.jl) in multi-processing applications; see [Querying - Distributed.jl](@ref api_distributed_querying) and [Pinning - Distributed.jl](@ref api_distributed_pinning). Note that you can use these tools irrespective of whether your parallel application is pure (i.e. each Julia worker runs a single Julia thread) or hybrid (i.e. each Julia worker runs multiple Julia threads).
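
If you want a hybrid setup, each worker has to be started with more than one Julia thread. Below is a minimal sketch of one way to do this via the `exeflags` keyword of `Distributed.addprocs` (the worker and thread counts are arbitrary); the basic example that follows achieves the same thing via the `JULIA_NUM_THREADS` environment variable.

```julia
using Distributed

# Spawn 4 workers, each running 2 Julia threads, by passing the
# `--threads` flag to the spawned worker processes.
addprocs(4; exeflags = "--threads=2")
```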

## Basic example

```julia
julia> using Distributed

julia> withenv("JULIA_NUM_THREADS" => 2) do
           addprocs(4) # spawn 4 workers with 2 threads each
       end
4-element Vector{Int64}:
 2
 3
 4
 5

julia> @everywhere using ThreadPinning

julia> distributed_getcpuids()
Dict{Int64, Vector{Int64}} with 4 entries:
  5 => [246, 185]
  4 => [198, 99]
  2 => [135, 226]
  3 => [78, 184]

julia> distributed_getispinned() # none pinned yet
Dict{Int64, Vector{Bool}} with 4 entries:
  5 => [0]
  4 => [0]
  2 => [0]
  3 => [0]

julia> distributed_pinthreads(:sockets) # pin to sockets (round-robin)

julia> distributed_getispinned() # all pinned
Dict{Int64, Vector{Bool}} with 4 entries:
  5 => [1, 1]
  4 => [1, 1]
  2 => [1, 1]
  3 => [1, 1]

julia> distributed_getcpuids()
Dict{Int64, Vector{Int64}} with 4 entries:
  5 => [66, 67]
  4 => [2, 3]
  2 => [0, 1]
  3 => [64, 65]

julia> socket(1, 1:4), socket(2, 1:4) # check
([0, 1, 2, 3], [64, 65, 66, 67])

julia> distributed_pinthreads(:numa) # pin to numa domains (round-robin)

julia> distributed_getcpuids()
Dict{Int64, Vector{Int64}} with 4 entries:
  5 => [48, 49]
  4 => [32, 33]
  2 => [0, 1]
  3 => [16, 17]

julia> numa(1, 1:2), numa(2, 1:2), numa(3, 1:2), numa(4, 1:2) # check
([0, 1], [16, 17], [32, 33], [48, 49])
```
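
The CPU IDs above are, of course, specific to the system that this example was recorded on. On a multi-node setup it can be useful to check which node hosts which worker before pinning; the snippet below is a hedged sketch of that workflow (the hostnames in the output are purely illustrative).

```julia
julia> distributed_gethostnames()  # worker pid => hostname (illustrative values)
Dict{Int64, String} with 4 entries:
  5 => "node02"
  4 => "node02"
  2 => "node01"
  3 => "node01"

julia> distributed_pinthreads(:numa)  # workers on each node are pinned independently (round-robin over NUMA domains)
```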
7 changes: 7 additions & 0 deletions docs/src/refs/api_pinning.md
@@ -28,6 +28,13 @@ openblas_setaffinity_cpuids
mpi_pinthreads
```

## [Pinning - Distributed.jl](@id api_distributed_pinning)

```@docs
distributed_pinthreads
distributed_unpinthreads
```

## Pinning - LIKWID

Besides [`pinthreads`](@ref), we offer [`pinthreads_likwidpin`](@ref) which, ideally, should handle all inputs that are supported by the `-c` option of [`likwid-pin`](https://github.com/RRZE-HPC/likwid/wiki/Likwid-Pin) (e.g. `S0:1-3@S1:2,4,5` or `E:N:4:2:4`). If you encounter an input that doesn't work as expected, please file an issue.
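
A minimal usage sketch (the pin expression is one of the examples above; the string-argument form of `pinthreads_likwidpin` is assumed here):

```julia
using ThreadPinning

# Pin the Julia threads using a likwid-pin style expression:
# CPU-threads 1-3 of socket 0 and CPU-threads 2, 4, and 5 of socket 1.
pinthreads_likwidpin("S0:1-3@S1:2,4,5")
```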
9 changes: 9 additions & 0 deletions docs/src/refs/api_querying.md
@@ -12,6 +12,7 @@ threadinfo
getcpuids
getcpuid
ispinned
getispinned
getaffinity
printaffinity
printaffinities
@@ -65,3 +66,11 @@ mpi_getcpuids
mpi_gethostnames
mpi_getlocalrank
```

## [Querying - Distributed.jl](@id api_distributed_querying)

```@docs
distributed_getcpuids
distributed_gethostnames
distributed_getispinned
```
135 changes: 135 additions & 0 deletions ext/DistributedExt/DistributedExt.jl
@@ -0,0 +1,135 @@
module DistributedExt

import ThreadPinning: ThreadPinning
using Distributed: Distributed

function getworkerpids(; include_master = false)
    workers = Distributed.workers()
    if include_master && !in(1, workers)
        pushfirst!(workers, 1)
    end
    return workers
end

# querying
function ThreadPinning.distributed_getcpuids(; include_master = false)
    res = Dict{Int, Vector{Int}}()
    for w in getworkerpids(; include_master)
        res[w] = Distributed.@fetchfrom w ThreadPinning.getcpuids()
    end
    return res
end

function ThreadPinning.distributed_gethostnames(; include_master = false)
    res = Dict{Int, String}()
    for w in getworkerpids(; include_master)
        res[w] = Distributed.@fetchfrom w gethostname()
    end
    return res
end

function ThreadPinning.distributed_getispinned(; include_master = false)
    res = Dict{Int, Vector{Bool}}()
    for w in getworkerpids(; include_master)
        res[w] = Distributed.@fetchfrom w ThreadPinning.getispinned()
    end
    return res
end

function compute_distributed_topology(hostnames_dict)
    dist_topo = Vector{@NamedTuple{
        pid::Int64, localid::Int64, node::Int64, nodename::String}}(
        undef, length(hostnames_dict))
    sorted_by_pid = sortperm(collect(keys(hostnames_dict)))
    nodes = unique(collect(values(hostnames_dict))[sorted_by_pid])
    idx = 1
    for (inode, node) in enumerate(nodes)
        workers_onnode = collect(keys(filter(p -> p[2] == node, hostnames_dict)))
        sort!(workers_onnode) # on each node we sort by worker pid
        for (i, r) in enumerate(workers_onnode)
            dist_topo[idx] = (; pid = r, localid = i - 1, node = inode, nodename = node)
            idx += 1
        end
    end
    return dist_topo
end

function ThreadPinning.distributed_topology(; include_master = false)
    hostnames_dict = ThreadPinning.distributed_gethostnames(; include_master)
    dist_topo = compute_distributed_topology(hostnames_dict)
    return dist_topo
end

# pinning
function ThreadPinning.distributed_pinthreads(symb::Symbol;
        compact = false,
        include_master = false,
        kwargs...)
    domain_symbol2functions(symb) # to check input arg as early as possible
    dist_topo = ThreadPinning.distributed_topology(; include_master)
    @sync for worker in dist_topo
        # use @spawnat (which @sync waits for) so that we only return once
        # every worker has pinned its threads
        Distributed.@spawnat worker.pid ThreadPinning._distributed_pinyourself(
            symb, dist_topo; compact, kwargs...)
    end
    return
end

function ThreadPinning._distributed_pinyourself(symb, dist_topo; compact, kwargs...)
# println("_distributed_pinyourself START")
idx = findfirst(w -> w.pid == Distributed.myid(), dist_topo)
if isnothing(idx)
error("Couldn't find myself (worker pid $(Distributed.myid())) in distributed topology.")
end
localid = dist_topo[idx].localid
domain, ndomain = domain_symbol2functions(symb)
# compute cpuids
cpuids = cpuids_of_localid(localid, domain, ndomain; compact)
# actual pinning
ThreadPinning.pinthreads(cpuids; kwargs...)
# println("_distributed_pinyourself STOP")
return
end

function domain_symbol2functions(symb)
    if symb == :sockets
        domain = ThreadPinning.socket
        ndomain = ThreadPinning.nsockets
    elseif symb == :numa
        domain = ThreadPinning.numa
        ndomain = ThreadPinning.nnuma
    elseif symb == :cores
        domain = ThreadPinning.core
        ndomain = ThreadPinning.ncores
    else
        throw(ArgumentError("Invalid symbol. Supported symbols are :sockets, :numa, and :cores."))
    end
    return domain, ndomain
end

function cpuids_of_localid(localrank, domain, ndomain;
        nthreads_per_proc = Threads.nthreads(),
        compact = false)
    # round-robin: the worker with node-local id `localrank` lands in domain `idomain`
    # and claims the `i_in_domain`-th block of `nthreads_per_proc` CPU-threads within it
    i_in_domain, idomain = divrem(localrank, ndomain()) .+ 1
    idcs = ((i_in_domain - 1) * nthreads_per_proc + 1):(i_in_domain * nthreads_per_proc)
    if maximum(idcs) > length(domain(idomain))
        @show maximum(idcs), length(domain(idomain))
        error("Too many Julia threads / Julia workers for the selected domain.")
    end
    if domain == ThreadPinning.core
        cpuids = domain(idomain, idcs)
    else
        cpuids = domain(idomain, idcs; compact)
    end
    return cpuids
end

function ThreadPinning.distributed_unpinthreads(; include_master = false, kwargs...)
    @sync for w in getworkerpids(; include_master)
        Distributed.@spawnat w ThreadPinning.unpinthreads(; kwargs...)
    end
    return
end

end # module
6 changes: 5 additions & 1 deletion src/ThreadPinning.jl
@@ -14,6 +14,7 @@ include("mkl.jl")
@static if Sys.islinux()
include("pinning.jl")
include("mpi.jl")
include("distributed.jl")
include("likwid-pin.jl")
else
    # make core pinning functions no-ops
@@ -26,6 +27,7 @@ else
    with_pinthreads(f, args...; kwargs...) = f()
    pinthreads_likwidpin(args...; kwargs...) = nothing
    mpi_pinthreads(args...; kwargs...) = nothing
    distributed_pinthreads(args...; kwargs...) = nothing
    openblas_pinthreads(args...; kwargs...) = nothing
    openblas_pinthread(args...; kwargs...) = nothing
    openblas_unpinthreads(args...; kwargs...) = nothing
@@ -39,7 +41,7 @@ end
export threadinfo

## querying
export getcpuid, getcpuids, getaffinity, getnumanode, getnumanodes
export getcpuid, getcpuids, getaffinity, getnumanode, getnumanodes, getispinned
export core, numa, socket, node, cores, numas, sockets
export printaffinity, printaffinities, visualize_affinity
export ispinned, hyperthreading_is_enabled, ishyperthread, isefficiencycore
@@ -53,6 +55,8 @@ export pinthread, pinthreads, with_pinthreads, unpinthread, unpinthreads
export setaffinity, setaffinity_cpuids
export pinthreads_likwidpin, likwidpin_domains, likwidpin_to_cpuids
export mpi_pinthreads, mpi_getcpuids, mpi_gethostnames, mpi_getlocalrank
export distributed_pinthreads, distributed_getcpuids, distributed_gethostnames,
       distributed_getispinned, distributed_unpinthreads
export openblas_setaffinity, openblas_setaffinity_cpuids,
       openblas_pinthread, openblas_pinthreads,
       openblas_unpinthread, openblas_unpinthreads
77 changes: 77 additions & 0 deletions src/distributed.jl
@@ -0,0 +1,77 @@
"""
    distributed_pinthreads(symbol;
        include_master = false,
        compact = false,
        nthreads_per_proc = Threads.nthreads(),
        kwargs...)

Pin the Julia threads of Julia workers in a round-robin fashion to specific domains
(e.g. sockets). Supported domains (`symbol`) are `:sockets`, `:numa`, and `:cores`.

When calling this function, the Julia threads of all Julia workers will be
distributed in a round-robin fashion among the specified domains and will be pinned to
non-overlapping ranges of CPU-threads within the domains.

A multi-node setup, where Julia workers are hosted on different nodes, is supported.

If `include_master=true`, the master process (`Distributed.myid() == 1`) will be
pinned as well.

If `compact=false` (default), physical cores are occupied before hyperthreads. Otherwise,
CPU-cores - with potentially multiple CPU-threads - are filled up one after another
(compact pinning).

*Example:*

```
using Distributed
addprocs(3)
@everywhere using ThreadPinning
distributed_pinthreads(:sockets)
```
"""
function distributed_pinthreads end

"""
Unpin all threads on all Julia workers.

If `include_master=true`, the master process (`Distributed.myid() == 1`) will be
unpinned as well.
"""
function distributed_unpinthreads end

"""
Returns a `Dict{Int, Vector{Int}}` where the keys are the pids of the Julia workers and
the values are the CPU IDs of the CPU-threads that are currently
running the Julia threads of the worker.

If `include_master=true`, the master process (`Distributed.myid() == 1`) will be included.
"""
function distributed_getcpuids end

"""
Returns a `Dict{Int, String}` where the keys are the pids of the Julia workers and the
values are the hostnames of the nodes that are currently hosting the respective workers.

If `include_master=true`, the master process (`Distributed.myid() == 1`) will be included.
"""
function distributed_gethostnames end

"""
Returns a `Dict{Int, Vector{Bool}}` where the keys are the pids of the Julia workers and
the values are the results of `ThreadPinning.ispinned` evaluated for all Julia threads of
a worker.

If `include_master=true`, the master process (`Distributed.myid() == 1`) will be included.
"""
function distributed_getispinned end

"""
Returns a vector of named tuples. Each named tuple represents a
Julia worker and has keys `pid`, `localid`, `node`, and `nodename`.

If `include_master=true`, the master process (`Distributed.myid() == 1`) will be included.
"""
function distributed_topology end

function _distributed_pinyourself end