Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement an interface for Distributed-like libraries #871

Merged
merged 3 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ jobs:
fail-fast: false
matrix:
version:
- '1.6' # LTS
- '1.9' # parsing errors branch on 1.10, so test the last pre-1.10 version
- '1.10' # LTS
- '1' # current stable
- 'pre' # next release, if available
os:
Expand Down
13 changes: 10 additions & 3 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ version = "3.6.4"

[deps]
CodeTracking = "da1fd8a2-8d9e-5ec2-8556-3022fb5608a2"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
FileWatching = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee"
JuliaInterpreter = "aa1ae85d-cabe-5617-a682-6adf51b2e16a"
LibGit2 = "76f85450-5226-5b5a-8eaa-529ad045b433"
Expand All @@ -15,17 +14,25 @@ Requires = "ae029012-a4dd-5104-9daa-d747884805df"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
Unicode = "4ec0a83e-493e-50e2-b9ac-8f72acf5a8f5"

[weakdeps]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"

[extensions]
DistributedExt = "Distributed"

[compat]
CodeTracking = "1.2"
Distributed = "1"
JuliaInterpreter = "0.9"
LoweredCodeUtils = "3.0.1"
OrderedCollections = "1"
# Exclude Requires-1.1.0 - see https://github.com/JuliaPackaging/Requires.jl/issues/94
Requires = "~1.0, ^1.1.1"
julia = "1.6"
julia = "1.10"

[extras]
CatIndices = "aafaddc9-749c-510e-ac4f-586e18779b91"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
EndpointRanges = "340492b5-2a47-5f55-813d-aca7ddf97656"
EponymTuples = "97e2ac4a-e175-5f49-beb1-4d6866a6cdc3"
Example = "7876af07-990d-54b4-ab0e-23690620f79a"
Expand All @@ -41,4 +48,4 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
UnsafeArrays = "c4a57d5a-5b31-53a6-b365-19f8c011fbd6"

[targets]
test = ["CatIndices", "EndpointRanges", "EponymTuples", "Example", "IndirectArrays", "InteractiveUtils", "MacroTools", "MappedArrays", "Pkg", "Random", "Requires", "RoundingIntegers", "Test", "UnsafeArrays"]
test = ["CatIndices", "Distributed", "EndpointRanges", "EponymTuples", "Example", "IndirectArrays", "InteractiveUtils", "MacroTools", "MappedArrays", "Pkg", "Random", "Requires", "RoundingIntegers", "Test", "UnsafeArrays"]
24 changes: 24 additions & 0 deletions ext/DistributedExt.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
module DistributedExt

import Distributed: myid, workers, remotecall

import Revise
import Revise: DistributedWorker


function get_workers()
map(DistributedWorker, workers())
end

function Revise.remotecall_impl(f, worker::DistributedWorker, args...; kwargs...)
remotecall(f, worker.id, args...; kwargs...)
end

Revise.is_master_worker(::typeof(get_workers)) = myid() == 1
Revise.is_master_worker(worker::DistributedWorker) = worker.id == 1

function __init__()
Revise.register_workers_function(get_workers)
end

end
94 changes: 66 additions & 28 deletions src/packagedef.jl
Original file line number Diff line number Diff line change
@@ -1,13 +1,44 @@
@eval Base.Experimental.@optlevel 1

using FileWatching, REPL, Distributed, UUIDs
using FileWatching, REPL, UUIDs
import LibGit2
using Base: PkgId
using Base.Meta: isexpr
using Core: CodeInfo

export revise, includet, entr, MethodSummary


# Abstract type to represent a single worker
abstract type AbstractWorker end

# Wrapper struct to indicate a worker belonging to the Distributed stdlib. Other
# libraries should make their own type that subtypes AbstractWorker for Revise
# to dispatch on.
struct DistributedWorker <: AbstractWorker
id::Int
end

# This is a list of functions that will retrieve a list of workers
const workers_functions = Base.Callable[]

# A distributed worker library wanting to use Revise should register their
# workers() function with this.
function register_workers_function(f::Base.Callable)
push!(workers_functions, f)
nothing
end

# The library should implement this method such that it behaves like
# Distributed.remotecall().
function remotecall_impl end

# The library should implement two methods for this function:
# - is_master_worker(::typeof(my_workers_function)): check if the current
# process is the master.
# - is_master_worker(w::MyWorkerType): check if `w` is the master.
function is_master_worker end

"""
Revise.watching_files[]

Expand Down Expand Up @@ -178,11 +209,7 @@
normpath(candidate)
end

if VERSION >= v"1.8"
Core.eval(@__MODULE__, :(global juliadir::String))
else
Core.eval(@__MODULE__, :(global juliadir #= ::Any =#; nothing))
end

"""
Revise.juliadir
Expand Down Expand Up @@ -260,11 +287,7 @@
# ex was deleted
sigs === nothing && continue
for sig in sigs
@static if VERSION ≥ v"1.10.0-DEV.873"
ret = Base._methods_by_ftype(sig, -1, Base.get_world_counter())
else
ret = Base._methods_by_ftype(sig, -1, typemax(UInt))
end
ret = Base._methods_by_ftype(sig, -1, Base.get_world_counter())
success = false
if !isempty(ret)
m = get_method_from_match(ret[end]) # the last method returned is the least-specific that matches, and thus most likely to be type-equal
Expand All @@ -286,10 +309,12 @@
end
@debug "DeleteMethod" _group="Action" time=time() deltainfo=(sig, MethodSummary(m))
# Delete the corresponding methods
for p in workers()
try # guard against serialization errors if the type isn't defined on the worker
remotecall(Core.eval, p, Main, :(delete_method_by_sig($sig)))
catch
for get_workers in workers_functions
for p in get_workers()
try # guard against serialization errors if the type isn't defined on the worker
remotecall_impl(Core.eval, p, Main, :(delete_method_by_sig($sig)))
catch

Check warning on line 316 in src/packagedef.jl

View check run for this annotation

Codecov / codecov/patch

src/packagedef.jl#L316

Added line #L316 was not covered by tests
end
end
end
Base.delete_method(m)
Expand Down Expand Up @@ -338,12 +363,14 @@
if !isexpr(thunk, :thunk)
thunk = ex
end
if myid() == 1
for p in workers()
p == myid() && continue
try # don't error if `mod` isn't defined on the worker
remotecall(Core.eval, p, mod, thunk)
catch
for get_workers in workers_functions
if is_master_worker(get_workers)
for p in get_workers()
is_master_worker(p) && continue
try # don't error if `mod` isn't defined on the worker
remotecall_impl(Core.eval, p, mod, thunk)
catch

Check warning on line 372 in src/packagedef.jl

View check run for this annotation

Codecov / codecov/patch

src/packagedef.jl#L372

Added line #L372 was not covered by tests
end
end
end
end
Expand Down Expand Up @@ -1283,14 +1310,10 @@
Define methods on worker `p` that Revise needs in order to perform revisions on `p`.
Revise itself does not need to be running on `p`.
"""
function init_worker(p)
remotecall(Core.eval, p, Main, quote
function init_worker(p::AbstractWorker)
remotecall_impl(Core.eval, p, Main, quote
function whichtt(@nospecialize sig)
@static if VERSION ≥ v"1.10.0-DEV.873"
ret = Base._methods_by_ftype(sig, -1, Base.get_world_counter())
else
ret = Base._methods_by_ftype(sig, -1, typemax(UInt))
end
ret = Base._methods_by_ftype(sig, -1, Base.get_world_counter())
isempty(ret) && return nothing
m = ret[end][3]::Method # the last method returned is the least-specific that matches, and thus most likely to be type-equal
methsig = m.sig
Expand All @@ -1304,14 +1327,29 @@
end)
end

init_worker(p::Int) = init_worker(DistributedWorker(p))

active_repl_backend_available() = isdefined(Base, :active_repl_backend) && Base.active_repl_backend !== nothing

function __init__()
ccall(:jl_generating_output, Cint, ()) == 1 && return nothing
run_on_worker = get(ENV, "JULIA_REVISE_WORKER_ONLY", "0")
if !(myid() == 1 || run_on_worker == "1")

# Find the Distributed module if it's been loaded
distributed_pkgid = Base.PkgId(Base.UUID("8ba89e20-285c-5b6f-9357-94700520ee1b"), "Distributed")
distributed_module = get(Base.loaded_modules, distributed_pkgid, nothing)

# We do a little hack to figure out if this is the master worker without
# loading Distributed. When a worker is added with Distributed.addprocs() it
# calls julia with the `--worker` flag. This is processed very early during
# startup before any user code (e.g. through `-E`) is executed, so if
# Distributed is *not* loaded already then we can be sure that this is the
# master worker. And if it is loaded then we can just check
# Distributed.myid() directly.
if !(isnothing(distributed_module) || distributed_module.myid() == 1 || run_on_worker == "1")
return nothing
end

# Check Julia paths (issue #601)
if !isdir(juliadir)
major, minor = Base.VERSION.major, Base.VERSION.minor
Expand Down
20 changes: 1 addition & 19 deletions src/parsing.jl
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,7 @@ function parse_source!(mod_exprs_sigs::ModuleExprsSigs, src::AbstractString, fil
ex = Base.parse_input_line(src; filename=filename)
ex === nothing && return mod_exprs_sigs
if isexpr(ex, :error) || isexpr(ex, :incomplete)
if Base.VERSION >= v"1.10"
eval(ex) # this will throw, so the statements below will not execute
end
prevex, pos = first_bad_position(src)
ln = count(isequal('\n'), SubString(src, 1, min(pos, length(src)))) + 1
throw(LoadError(filename, ln, ex.args[1]))
eval(ex)
end
return process_source!(mod_exprs_sigs, ex, filename, mod; kwargs...)
end
Expand Down Expand Up @@ -81,16 +76,3 @@ function process_source!(mod_exprs_sigs::ModuleExprsSigs, ex, filename, mod::Mod
end
return mod_exprs_sigs
end

if Base.VERSION < v"1.10"
function first_bad_position(str)
ex, pos, n = nothing, 1, length(str)
while pos < n
ex, pos = Meta.parse(str, pos; greedy=true, raise=false)
if isexpr(ex, :error) || isexpr(ex, :incomplete)
return ex, pos
end
end
error("expected an error, finished without one")
end
end
Loading