
Commit

improve synch between Julia cluster and scalesets
samtkaplan committed Sep 18, 2024
1 parent b4dc6f3 commit e1e184e
Showing 1 changed file with 25 additions and 12 deletions.
37 changes: 25 additions & 12 deletions src/AzManagers.jl
@@ -244,8 +244,10 @@ end
function scaleset_pruning()
interval = parse(Int, get(ENV, "JULIA_AZMANAGERS_PRUNE_POLL_INTERVAL", "600"))

manager = azmanager()
while true
try
lock(manager.lock)
#=
The following seems required for an over-provisioned scaleset. It
is not clear why this is needed.
@@ -260,6 +262,7 @@ function scaleset_pruning()
@error "scaleset pruning error"
logerror(e, Logging.Error)
finally
unlock(manager.lock)
sleep(interval)
end
end
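
For context, the pruning loop above follows a standard periodic-task pattern: take the manager lock before touching shared scale-set state, and release it in the finally branch so an error can never leave the lock held. Below is a minimal, self-contained sketch of that pattern; the work callback and the plain ReentrantLock are placeholders, not AzManagers' actual API.

# Sketch of the lock-guarded periodic loop used by scaleset_pruning.
function periodic_task(work::Function, lk::ReentrantLock; interval::Real=600)
    while true
        try
            lock(lk)
            work()                      # e.g. prune orphaned scale-set VMs
        catch e
            @error "periodic task error" exception=(e, catch_backtrace())
        finally
            unlock(lk)                  # always release, even after an error
            sleep(interval)             # wait before the next pass
        end
    end
end

# usage: periodic_task(() -> @info("pruning"), ReentrantLock(); interval=5)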
@@ -348,12 +351,16 @@ function scaleset_sync()
for scaleset in keys(_scalesets)
_scalesets[scaleset] = scaleset_capacity(manager, scaleset.subscriptionid, scaleset.resourcegroup, scaleset.scalesetname, manager.nretry, manager.verbose)
end
@debug "since client/server are out of sync, calling prune_cluster and prune_scalesets methods"
prune_cluster()
prune_scalesets()
end
catch e
@error "scaleset syncing error"
logerror(e)
finally
unlock(manager.lock)
end
unlock(manager.lock)
end
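
The fix in scaleset_sync applies the same discipline: unlock(manager.lock) moves into the finally branch so the lock is released even when refreshing capacities or pruning throws. Julia's Base.lock also offers a do-block form that gives the same guarantee with less ceremony; a hedged sketch follows, where refresh_and_prune is a hypothetical stand-in for the body of scaleset_sync.

# The do-block form of lock releases the lock on any exit path,
# whether the body returns normally or throws.
function sync_once(lk::ReentrantLock, refresh_and_prune::Function)
    lock(lk) do
        refresh_and_prune()   # e.g. refresh scale-set capacities, then prune
    end
end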

function prune_cluster()
@@ -425,23 +432,29 @@ function prune_scalesets()

_scalesets = scalesets(manager)

# list of workers registered with Distributed.jl, organized by scale-set
# list of workers registered with Distributed.jl and discoverable via the 'workers()' method, organized by scale-set
instanceids = Dict{ScaleSet,Array{String}}()
for wrkr in values(Distributed.map_pid_wrkr)
if isdefined(wrkr, :id) && isdefined(wrkr, :config) && isa(wrkr, Distributed.Worker)
if isdefined(wrkr.config, :userdata) && isa(wrkr.config.userdata, Dict)
userdata = wrkr.config.userdata
if haskey(userdata, "instanceid") && haskey(userdata, "scalesetname") && haskey(userdata, "resourcegroup") && haskey(userdata, "subscriptionid")
ss = ScaleSet(userdata["subscriptionid"], userdata["resourcegroup"], userdata["scalesetname"])
if haskey(instanceids, ss)
push!(instanceids[ss], userdata["instanceid"])
else
instanceids[ss] = [userdata["instanceid"]]
for worker_id in workers()
if haskey(Distributed.map_pid_wrkr, worker_id)
wrkr = Distributed.map_pid_wrkr[worker_id]
if isdefined(wrkr, :id) && isdefined(wrkr, :config) && isa(wrkr, Distributed.Worker)
if isdefined(wrkr.config, :userdata) && isa(wrkr.config.userdata, Dict)
userdata = wrkr.config.userdata
if haskey(userdata, "instanceid") && haskey(userdata, "scalesetname") && haskey(userdata, "resourcegroup") && haskey(userdata, "subscriptionid")
ss = ScaleSet(userdata["subscriptionid"], userdata["resourcegroup"], userdata["scalesetname"])
if haskey(instanceids, ss)
push!(instanceids[ss], userdata["instanceid"])
else
instanceids[ss] = [userdata["instanceid"]]
end
end
end
end
else
@warn "worker with id '$worker_id' is not in Distributed.map_pid_wrkr"
end
end
@debug "deciding if there are scaleset vms to be pruned, scaleset instance ids found in Julia cluster: $instanceids, scalesets: $_scalesets"

for scaleset in keys(_scalesets)
# update scale-set instances
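The restructured loop above walks workers(), the set of pids the cluster actually reports, and warns when a pid has no entry in Distributed.map_pid_wrkr rather than iterating map_pid_wrkr alone. The per-scale-set grouping it builds can be expressed compactly with get!; here is a minimal sketch under the assumption that userdata_for (a hypothetical helper standing in for reading wrkr.config.userdata) returns either nothing or a Dict containing the four keys the real code checks.

using Distributed

# Sketch: collect instance ids per (subscription, resource group, scale set).
function group_instanceids(userdata_for::Function)
    instanceids = Dict{NTuple{3,String},Vector{String}}()
    for pid in workers()
        ud = userdata_for(pid)
        ud === nothing && continue
        all(k -> haskey(ud, k),
            ("instanceid", "subscriptionid", "resourcegroup", "scalesetname")) || continue
        key = (ud["subscriptionid"], ud["resourcegroup"], ud["scalesetname"])
        push!(get!(instanceids, key, String[]), ud["instanceid"])
    end
    return instanceids
end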
