Skip to content

Commit

Permalink
Implement an interface for Distributed-like libraries
Browse files Browse the repository at this point in the history
This moves support for Distributed into a package extension and adds an
interface for other distributed worker libraries to use to support Revise for
their workers.
  • Loading branch information
JamesWrigley committed Dec 28, 2024
1 parent 67ee77c commit 5955e0e
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 16 deletions.
11 changes: 9 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ version = "3.6.4"

[deps]
CodeTracking = "da1fd8a2-8d9e-5ec2-8556-3022fb5608a2"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
FileWatching = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee"
JuliaInterpreter = "aa1ae85d-cabe-5617-a682-6adf51b2e16a"
LibGit2 = "76f85450-5226-5b5a-8eaa-529ad045b433"
Expand All @@ -15,8 +14,15 @@ Requires = "ae029012-a4dd-5104-9daa-d747884805df"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
Unicode = "4ec0a83e-493e-50e2-b9ac-8f72acf5a8f5"

[weakdeps]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"

[extensions]
DistributedExt = "Distributed"

[compat]
CodeTracking = "1.2"
Distributed = "1"
JuliaInterpreter = "0.9"
LoweredCodeUtils = "3.0.1"
OrderedCollections = "1"
Expand All @@ -26,6 +32,7 @@ julia = "1.10"

[extras]
CatIndices = "aafaddc9-749c-510e-ac4f-586e18779b91"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
EndpointRanges = "340492b5-2a47-5f55-813d-aca7ddf97656"
EponymTuples = "97e2ac4a-e175-5f49-beb1-4d6866a6cdc3"
Example = "7876af07-990d-54b4-ab0e-23690620f79a"
Expand All @@ -41,4 +48,4 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
UnsafeArrays = "c4a57d5a-5b31-53a6-b365-19f8c011fbd6"

[targets]
test = ["CatIndices", "EndpointRanges", "EponymTuples", "Example", "IndirectArrays", "InteractiveUtils", "MacroTools", "MappedArrays", "Pkg", "Random", "Requires", "RoundingIntegers", "Test", "UnsafeArrays"]
test = ["CatIndices", "Distributed", "EndpointRanges", "EponymTuples", "Example", "IndirectArrays", "InteractiveUtils", "MacroTools", "MappedArrays", "Pkg", "Random", "Requires", "RoundingIntegers", "Test", "UnsafeArrays"]
24 changes: 24 additions & 0 deletions ext/DistributedExt.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
module DistributedExt

import Distributed: myid, workers, remotecall

import Revise
import Revise: DistributedWorker


function get_workers()
map(DistributedWorker, workers())
end

function Revise.remotecall_impl(f, worker::DistributedWorker, args...; kwargs...)
remotecall(f, worker.id, args...; kwargs...)
end

Revise.is_master_worker(::typeof(get_workers)) = myid() == 1
Revise.is_master_worker(worker::DistributedWorker) = worker.id == 1

function __init__()
Revise.register_workers_function(get_workers)
end

end
78 changes: 64 additions & 14 deletions src/packagedef.jl
Original file line number Diff line number Diff line change
@@ -1,13 +1,44 @@
@eval Base.Experimental.@optlevel 1

using FileWatching, REPL, Distributed, UUIDs
using FileWatching, REPL, UUIDs
import LibGit2
using Base: PkgId
using Base.Meta: isexpr
using Core: CodeInfo

export revise, includet, entr, MethodSummary


# Abstract type to represent a single worker
abstract type AbstractWorker end

# Wrapper struct to indicate a worker belonging to the Distributed stdlib. Other
# libraries should make their own type that subtypes AbstractWorker for Revise
# to dispatch on.
struct DistributedWorker <: AbstractWorker
id::Int
end

# This is a list of functions that will retrieve a list of workers
const workers_functions = Base.Callable[]

# A distributed worker library wanting to use Revise should register their
# workers() function with this.
function register_workers_function(f::Function)
push!(workers_functions, f)
nothing
end

# The library should implement this method such that it behaves like
# Distributed.remotecall().
function remotecall_impl end

# The library should implement two methods for this function:
# - is_master_worker(::typeof(my_workers_function)): check if the current
# process is the master.
# - is_master_worker(w::MyWorkerType): check if `w` is the master.
function is_master_worker end

"""
Revise.watching_files[]
Expand Down Expand Up @@ -278,10 +309,12 @@ function delete_missing!(exs_sigs_old::ExprsSigs, exs_sigs_new)
end
@debug "DeleteMethod" _group="Action" time=time() deltainfo=(sig, MethodSummary(m))
# Delete the corresponding methods
for p in workers()
try # guard against serialization errors if the type isn't defined on the worker
remotecall(Core.eval, p, Main, :(delete_method_by_sig($sig)))
catch
for get_workers in workers_functions
for p in get_workers()
try # guard against serialization errors if the type isn't defined on the worker
remotecall_impl(Core.eval, p, Main, :(delete_method_by_sig($sig)))
catch

Check warning on line 316 in src/packagedef.jl

View check run for this annotation

Codecov / codecov/patch

src/packagedef.jl#L316

Added line #L316 was not covered by tests
end
end
end
Base.delete_method(m)
Expand Down Expand Up @@ -330,12 +363,14 @@ function eval_rex(rex::RelocatableExpr, exs_sigs_old::ExprsSigs, mod::Module; mo
if !isexpr(thunk, :thunk)
thunk = ex
end
if myid() == 1
for p in workers()
p == myid() && continue
try # don't error if `mod` isn't defined on the worker
remotecall(Core.eval, p, mod, thunk)
catch
for get_workers in workers_functions
if is_master_worker(get_workers)
for p in get_workers()
is_master_worker(p) && continue
try # don't error if `mod` isn't defined on the worker
remotecall_impl(Core.eval, p, mod, thunk)
catch

Check warning on line 372 in src/packagedef.jl

View check run for this annotation

Codecov / codecov/patch

src/packagedef.jl#L372

Added line #L372 was not covered by tests
end
end
end
end
Expand Down Expand Up @@ -1275,8 +1310,8 @@ async_steal_repl_backend() = steal_repl_backend()
Define methods on worker `p` that Revise needs in order to perform revisions on `p`.
Revise itself does not need to be running on `p`.
"""
function init_worker(p)
remotecall(Core.eval, p, Main, quote
function init_worker(p::AbstractWorker)
remotecall_impl(Core.eval, p, Main, quote
function whichtt(@nospecialize sig)
ret = Base._methods_by_ftype(sig, -1, Base.get_world_counter())
isempty(ret) && return nothing
Expand All @@ -1292,14 +1327,29 @@ function init_worker(p)
end)
end

init_worker(p::Int) = init_worker(DistributedWorker(p))

active_repl_backend_available() = isdefined(Base, :active_repl_backend) && Base.active_repl_backend !== nothing

function __init__()
ccall(:jl_generating_output, Cint, ()) == 1 && return nothing
run_on_worker = get(ENV, "JULIA_REVISE_WORKER_ONLY", "0")
if !(myid() == 1 || run_on_worker == "1")

# Find the Distributed module if it's been loaded
distributed_pkgid = Base.PkgId(Base.UUID("8ba89e20-285c-5b6f-9357-94700520ee1b"), "Distributed")
distributed_module = get(Base.loaded_modules, distributed_pkgid, nothing)

# We do a little hack to figure out if this is the master worker without
# loading Distributed. When a worker is added with Distributed.addprocs() it
# calls julia with the `--worker` flag. This is processed very early during
# startup before any user code (e.g. through `-E`) is executed, so if
# Distributed is *not* loaded already then we can be sure that this is the
# master worker. And if it is loaded then we can just check
# Distributed.myid() directly.
if !(isnothing(distributed_module) || distributed_module.myid() == 1 || run_on_worker == "1")
return nothing
end

# Check Julia paths (issue #601)
if !isdir(juliadir)
major, minor = Base.VERSION.major, Base.VERSION.minor
Expand Down

0 comments on commit 5955e0e

Please sign in to comment.