BREAKING: Don't pirate the Distributed.addprocs function; introduce a new AzManagers.addprocs_azure function instead #176

Open · wants to merge 1 commit into base: master
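For readers skimming the diff, a minimal before/after sketch of the calling code this change affects (a sketch only; it assumes a scale-set template named "cbox08" has already been saved to `~/.azmanagers/templates_scaleset.json`):

```julia
using Distributed, AzManagers

# Before this PR: AzManagers pirated Distributed.addprocs, so the call below worked
# with no explicit import.
# addprocs("cbox08", 10)

# After this PR: the method lives under a name AzManagers owns and is imported explicitly.
using AzManagers: addprocs_azure
addprocs_azure("cbox08", 10)

# Worker management is unchanged and still goes through Distributed.
rmprocs(workers())
```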
5 changes: 3 additions & 2 deletions demo/detached.jl
@@ -67,7 +67,8 @@ sessionbundle!(

job4 = @detach vm(;session=sessionbundle(:management),persist=true) begin
using Distributed, AzManagers, AzStorage, AzSessions
addprocs("cbox04", 4; session=sessionbundle(:management))
using AzManagers: addprocs_azure
addprocs_azure("cbox04", 4; session=sessionbundle(:management))

for pid in workers()
hostname = remotecall_fetch(gethostname, pid)
@@ -89,4 +90,4 @@ status(job4)
wait(job4)
read(job4)

rmproc(job4.vm; session=sessionbundle(:management))
rmproc(job4.vm; session=sessionbundle(:management))
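The same rename applies inside detached blocks. A condensed sketch of the demo's flow (a sketch only; it assumes the `sessionbundle!` setup earlier in demo/detached.jl has already run and that a "cbox04" scale-set template exists):

```julia
using Distributed, AzManagers, AzSessions

job = @detach vm(; session=sessionbundle(:management), persist=true) begin
    using Distributed, AzManagers
    using AzManagers: addprocs_azure                  # the new, explicit import
    addprocs_azure("cbox04", 4; session=sessionbundle(:management))
    for pid in workers()
        println(remotecall_fetch(gethostname, pid))   # same check as the demo
    end
end

wait(job)
read(job)                                             # the job's captured output
rmproc(job.vm; session=sessionbundle(:management))    # tear down the detached VM
```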
12 changes: 7 additions & 5 deletions docs/src/examples.md
@@ -5,8 +5,10 @@
```julia
using Distributed, AzManagers

using AzManagers: addprocs_azure

# add 10 instances to an Azure scale set with the "cbox08" scale-set template.
addprocs("cbox08", 10)
addprocs_azure("cbox08", 10)

# monitor the workers as they join the Julia cluster.
while nprocs() == nworkers() || nworkers() < 10
@@ -16,7 +18,7 @@ end
length(workers()) # 10

# add 10 more instances, waiting for the cluster to be stable before returning control to the caller.
addprocs("cbox08", 10; waitfor=true)
addprocs_azure("cbox08", 10; waitfor=true)
length(workers()) # 20

# remove the first 5 instances
@@ -26,14 +28,14 @@ rmprocs(workers()[1:5])
rmprocs(workers())

# list self-doc for AzManagers addprocs method:
?addprocs
?addprocs_azure

# make a scale-set from SPOT VMs
addprocs("cbox08", 10; group="myspotgroup", spot=true)
addprocs_azure("cbox08", 10; group="myspotgroup", spot=true)

# wait for at-least one worker to be available
while nprocs() - nworkers() == 0; yield(); end

# check to see if Azure is shutting down (pre-empt'ing) the worker
remotecall_fetch(preempted, workers()[1])
```
```
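One illustrative follow-on to the spot-VM example above (not part of this PR; a sketch that only uses functions already shown in this file): drop any worker that Azure has flagged for preemption.

```julia
using Distributed, AzManagers
using AzManagers: addprocs_azure

# make a scale-set from SPOT VMs, as above
addprocs_azure("cbox08", 10; group="myspotgroup", spot=true)

# wait for at least one worker, then remove any worker that reports preemption
while nprocs() - nworkers() == 0; yield(); end
for pid in workers()
    if remotecall_fetch(preempted, pid)
        rmprocs(pid)   # release the worker before Azure reclaims it
    end
end
```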
23 changes: 14 additions & 9 deletions docs/src/index.md
@@ -6,16 +6,18 @@ uses a user-defined template. For example, we can create a new julia cluster co
5 VMs, and where the scale-set is described by the template `"myscaleset"` as follows,
```julia
using AzManagers, Distributed
addprocs("myscaleset", 5)
using AzManagers: addprocs_azure
addprocs_azure("myscaleset", 5)
```
Note that `addprocs` will return as soon as the provisioning is initialized. Subsequently, workers
will add themselves to the Julia cluster as they become available. This is similar to the "elastic.jl"
will add themselves to the Julia cluster as they become available. This is similar to the "elastic.jl"
cluster manager in [ClusterManagers.jl](https://github.com/JuliaParallel/ClusterManagers.jl), and allows
AzManagers to behave dynamically. To wait for the cluster to be completely up use the `waitfor` argument.
For example,
```julia
using AzManagers, Distributed
addprocs("myscaleset", 5; waitfor=true)
using AzManagers: addprocs_azure
addprocs_azure("myscaleset", 5; waitfor=true)
```
In this case `addprocs` will return only once the 5 workers have joined the cluster.

@@ -27,7 +29,7 @@ for more information.

AzManagers does not provide scale-set templates since they will depend on your specific Azure
setup. However, we provide a means to create the templates. Please see the section
[Scale-set templates](# Scale-set templates) for more information.
[Scale-set templates](# Scale-set templates) for more information.

AzManagers requires a user provided Azure resource group and subscription, as well as information
about the ssh user for the scale-set VMs. AzManagers uses a manifest file to store this information.
@@ -60,7 +62,7 @@ myscaleset = AzManagers.build_sstemplate("myvm",
AzManagers.save_template_scaleset("myscaleset", myscaleset)
```
The above code will save the template to the json file, `~/.azmanagers/templates_scaleset.json`.
Subsequently, `addprocs("myscaleset", 5)` will query the json file for the VM template. One can
Subsequently, `addprocs_azure("myscaleset", 5)` will query the json file for the VM template. One can
repeat this process, populating `~/.azmanagers/templates_scaleset.json` with a variety of templates
for a variety of machine types.

@@ -82,7 +84,7 @@ AzManagers.write_manifest(;
resourcegroup = "my-resource-group",
subscriptionid = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
ssh_user = "username")
```
```
One can also specify the locations of the public and private ssh keys which AzManagers will use
to establish ssh connections to the cluster machines. This connection is used for the initial
set-up of the cluster, and for sending log messages back to the master process. By default,
@@ -100,7 +102,8 @@ such as azure log analytics might be useful. At this time, AzManagers does not
logger; but, if one had such a logger (e.g. MyAzureLogger), then one would do:
```
using AzManagers, Distributed
addprocs("myscaleset",5)
using AzManagers: addprocs_azure
addprocs_azure("myscaleset",5)
@everywhere using Logging, MyAzureLogger
@everywhere global_logger(MyAzureLogger())
```
@@ -143,7 +146,8 @@ variablebundle!(session = AzSession())
myvm = addproc("myvm")
detached_job = @detachat myvm begin
using Distributed, AzManagers
addprocs("myscaleset", 5; session=variablebundle(:session))
using AzManagers: addprocs_azure
addprocs_azure("myscaleset", 5; session=variablebundle(:session))
for pid in workers()
remotecall_fetch(println, "hello from pid=$(myid())")
end
@@ -169,7 +173,8 @@ using Pkg
Pkg.instantiate(".")
Pkg.add("AzManagers")
Pkg.add("Jets")
addprocs("cbox16",2;customenv=true)
using AzManagers: addprocs_azure
addprocs_azure("cbox16",2;customenv=true)
```
Now, when worker VMs are initialized, they will have the software stack
defined by the current project. Please note that this can add significant
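The index.md hunks above are partially collapsed, but taken together they amount to the following end-to-end sketch (placeholder values; consult the `AzManagers.build_sstemplate` and `AzManagers.write_manifest` docstrings for the exact keyword sets):

```julia
using AzManagers, Distributed
using AzManagers: addprocs_azure

# One-time setup: record account information (keywords as in the manifest snippet above).
AzManagers.write_manifest(;
    resourcegroup  = "my-resource-group",
    subscriptionid = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    ssh_user       = "username")

# One-time setup: build and save a scale-set template. The keyword arguments are elided
# here because the corresponding hunk is collapsed in this diff.
# myscaleset = AzManagers.build_sstemplate("myvm"; #= subscription, image, vm size, ... =#)
# AzManagers.save_template_scaleset("myscaleset", myscaleset)

# Day-to-day use: the only change introduced by this PR is the function name.
addprocs_azure("myscaleset", 5; waitfor=true)
rmprocs(workers())
```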
4 changes: 2 additions & 2 deletions docs/src/reference.md
@@ -2,7 +2,7 @@

## Julia clusters
```@docs
addprocs
addprocs_azure
preempted
```

@@ -27,4 +27,4 @@ AzManagers.write_manifest
AzManagers.save_template_nic
AzManagers.save_template_scaleset
AzManagers.save_template_vm
```
```
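Since the docstring is now attached to `addprocs_azure`, the REPL help query shown in examples.md changes accordingly; a small sketch:

```julia
using AzManagers
using AzManagers: addprocs_azure

# REPL:  ?addprocs_azure
@doc addprocs_azure   # programmatic equivalent of the help-mode query
```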
50 changes: 25 additions & 25 deletions src/AzManagers.jl
@@ -150,11 +150,11 @@ function azrequest(rtype, verbose, url, headers, body=nothing)
else
r = HTTP.request(rtype, url, headers, body; verbose=verbose, options...)
end

if r.status >= 300
throw(HTTP.Exceptions.StatusError(r.status, r.request.method, r.request.target, r))
end

r
end

@@ -512,7 +512,7 @@ function Distributed.addprocs(manager::AzManager; sockets)
Distributed.init_multi()
Distributed.cluster_mgmt_from_master_check()
lock(Distributed.worker_lock)
pids = Distributed.addprocs_locked(manager; sockets)
pids = addprocs_locked(manager; sockets)
catch e
@debug "AzManagers, error processing pending connection"
logerror(e, Logging.Debug)
@@ -526,7 +526,7 @@ function addprocs_with_timeout(manager; sockets)
# Distributed.setup_launched_worker also uses Distributed.worker_timeout, so we add a grace period
# to allow for the Distributed.setup_launched_worker to hit its timeout.
timeout = Distributed.worker_timeout() + 30
tsk_addprocs = @async addprocs(manager; sockets)
tsk_addprocs = @async addprocs_azure(manager; sockets)
tic = time()
pids = []
interrupted = false
@@ -743,7 +743,7 @@ function nthreads_filter(nthreads)
end

"""
addprocs(template, ninstances[; kwargs...])
addprocs_azure(template, ninstances[; kwargs...])

Add Azure scale set instances where template is either a dictionary produced via the `AzManagers.build_sstemplate`
method or a string corresponding to a template stored in `~/.azmanagers/templates_scaleset.json.`
@@ -783,18 +783,18 @@ method or a string corresponding to a template stored in `~/.azmanagers/template
* `use_lvm=false` For SKUs that have 1 or more nvme disks, combines all disks as a single mount point /scratch vs /scratch, /scratch1, /scratch2, etc..

# Notes
[1] If `addprocs` is called from an Azure VM, then the default `imagename`,`imageversion` are the
[1] If `addprocs_azure` is called from an Azure VM, then the default `imagename`,`imageversion` are the
image/version the VM was built with; otherwise, it is the latest version of the image specified in the scale-set template.
[2] Interactive threads are supported beginning in version 1.9 of Julia. For earlier versions, the default for `julia_num_threads` is `Threads.nthreads()`.
[3] `waitfor=false` reflects the fact that the cluster manager is dynamic. After the call to `addprocs` returns, use `workers()`
[3] `waitfor=false` reflects the fact that the cluster manager is dynamic. After the call to `addprocs_azure` returns, use `workers()`
to monitor the size of the cluster.
[4] This is intended for use with Devito. In particular, it allows Devito to gain performance by using
MPI to do domain decomposition within a single VM. If `mpi_ranks_per_worker=0`, then MPI is not
used on the Julia workers. This feature makes use of package extensions, meaning that you need to ensure
that `using MPI` is somewhere in your calling script.
[5] This may result in a re-boot of the VMs
"""
function Distributed.addprocs(template::Dict, n::Int;
function addprocs_azure(template::Dict, n::Int;
subscriptionid = "",
resourcegroup = "",
sigimagename = "",
@@ -863,13 +863,13 @@ function Distributed.addprocs(template::Dict, n::Int;
nothing
end

function Distributed.addprocs(template::AbstractString, n::Int; kwargs...)
function addprocs_azure(template::AbstractString, n::Int; kwargs...)
isfile(templates_filename_scaleset()) || error("scale-set template file does not exist. See `AzManagers.save_template_scaleset`")

templates_scaleset = JSON.parse(read(templates_filename_scaleset(), String))
haskey(templates_scaleset, template) || error("scale-set template file does not contain a template with name: $template. See `AzManagers.save_template_scaleset`")

addprocs(templates_scaleset[template], n; kwargs...)
addprocs_azure(templates_scaleset[template], n; kwargs...)
end

function Distributed.launch(manager::AzManager, params::Dict, launched::Array, c::Condition)
@@ -1524,7 +1524,7 @@ function scaleset_image(manager::AzManager, sigimagename, sigimageversion, image
k_galleries = findfirst(x->x=="galleries", _image)
gallery = k_galleries == nothing ? "" : _image[k_galleries+1]
different_image = true

if sigimagename == "" && imagename == ""
different_image = false
k_images = findfirst(x->x=="images", _image)
@@ -1537,7 +1537,7 @@

(sigimagename != "" && gallery == "") && error("sigimagename provided, but gallery name not found in template")
(sigimagename == "" && imagename == "") && error("Unable to determine 'image gallery name' or 'image name'")

if imagename == "" && sigimageversion == ""
k = findfirst(x->x=="versions", _image)
if k != nothing && !different_image
@@ -1766,7 +1766,7 @@ function build_lvm()
if isfile("/usr/sbin/azure_nvme.sh")
@info "Building scratch.."
run(`sudo bash /usr/sbin/azure_nvme.sh`)
else
else
@warn "No scratch nvme script found!"
end
end
@@ -1789,7 +1789,7 @@ function buildstartupscript(manager::AzManager, exename::String, user::String, d
if isfile(joinpath(homedir(), ".gitconfig"))
gitconfig = read(joinpath(homedir(), ".gitconfig"), String)
cmd *= """

sudo su - $user << EOF
echo '$gitconfig' > ~/.gitconfig
EOF
@@ -1798,7 +1798,7 @@
if isfile(joinpath(homedir(), ".git-credentials"))
gitcredentials = rstrip(read(joinpath(homedir(), ".git-credentials"), String), [' ','\n'])
cmd *= """

sudo su - $user << EOF
echo "$gitcredentials" > ~/.git-credentials
chmod 600 ~/.git-credentials
@@ -1821,7 +1821,7 @@
project_compressed, manifest_compressed, localpreferences_compressed = compress_environment(julia_environment_folder)

cmd *= """

sudo su - $user << 'EOF'
$exename -e 'using AzManagers; AzManagers.decompress_environment("$project_compressed", "$manifest_compressed", "$localpreferences_compressed", "$remote_julia_environment_name")'
$exename -e 'using Pkg; path=joinpath(Pkg.envdir(), "$remote_julia_environment_name"); Pkg.Registry.update(); Pkg.activate(path); (retry(Pkg.instantiate))(); Pkg.precompile()'
@@ -1881,15 +1881,15 @@ function buildstartupscript_cluster(manager::AzManager, spot::Bool, ppi::Int, mp
"""

if use_lvm
if mpi_ranks_per_worker == 0
if mpi_ranks_per_worker == 0
shell_cmds *= """

attempt_number=1
maximum_attempts=5
exit_code=0
while [ \$attempt_number -le \$maximum_attempts ]; do
$exename $_exeflags -e '$(juliaenvstring)try using AzManagers; catch; using Pkg; Pkg.instantiate(); using AzManagers; end; AzManagers.nvidia_gpucheck($nvidia_enable_ecc, $nvidia_enable_mig); AzManagers.mount_datadisks(); AzManagers.build_lvm(); AzManagers.azure_worker("$cookie", "$master_address", $master_port, $ppi, "$_exeflags")'

exit_code=\$?
echo "attempt \$attempt_number is done with exit code \$exit_code..."

@@ -1956,15 +1956,15 @@
--$boundary--
"""
else
if mpi_ranks_per_worker == 0
if mpi_ranks_per_worker == 0
shell_cmds *= """

attempt_number=1
maximum_attempts=5
exit_code=0
while [ \$attempt_number -le \$maximum_attempts ]; do
$exename $_exeflags -e '$(juliaenvstring)try using AzManagers; catch; using Pkg; Pkg.instantiate(); using AzManagers; end; AzManagers.nvidia_gpucheck($nvidia_enable_ecc, $nvidia_enable_mig); AzManagers.mount_datadisks(); AzManagers.azure_worker("$cookie", "$master_address", $master_port, $ppi, "$_exeflags")'

exit_code=\$?
echo "attempt \$attempt_number is done with exit code \$exit_code..."

@@ -2340,7 +2340,7 @@ function scaleset_create_or_update(manager::AzManager, user, subscriptionid, res

key = Dict("path" => "/home/$user/.ssh/authorized_keys", "keyData" => read(ssh_key, String))
push!(_template["properties"]["virtualMachineProfile"]["osProfile"]["linuxConfiguration"]["ssh"]["publicKeys"], key)

cmd = buildstartupscript_cluster(manager, spot, ppi, mpi_ranks_per_worker, mpi_flags, nvidia_enable_ecc, nvidia_enable_mig, julia_num_threads, omp_num_threads, exename, exeflags, env, user, template["tempdisk"], custom_environment, use_lvm)
_cmd = base64encode(cmd)

@@ -2948,7 +2948,7 @@ function addproc(vm_template::Dict, nic_template=nothing;
nic_dic = JSON.parse(String(nic_r.body))
nic_state = nic_dic["properties"]["provisioningState"]
end
catch e
catch e
@error "failed to get $nicname status"
end

@@ -2984,7 +2984,7 @@
else
cmd,_ = buildstartupscript(manager, exename, user, disk, customenv, use_lvm)
end

_cmd = base64encode(cmd)

if length(_cmd) > 64_000
@@ -3243,11 +3243,11 @@ function detached_service_wait(vm, custom_environment)
@error "reached timeout ($timeout seconds) while waiting for $waitfor to start."
throw(DetachedServiceTimeoutException(vm))
end

write(stdout, spin(spincount, elapsed_time)*", waiting for $waitfor on VM, $(vm["name"]):$(vm["port"]), to start.\r")
flush(stdout)
spincount = spincount == 4 ? 1 : spincount + 1

sleep(0.5)
end
write(stdout, spin(5, elapsed_time)*", waiting for $waitfor on VM, $(vm["name"]):$(vm["port"]), to start.\r")
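"Piracy" in the PR title refers to extending a function you do not own (`Distributed.addprocs`) on argument types you also do not own (`Dict`, `AbstractString`), which silently changes behavior for every package in the session. A minimal self-contained illustration of the non-pirating pattern this PR adopts (hypothetical module name, stand-in body, for illustration only):

```julia
module AzExampleSketch
    # Owning the function name avoids piracy; callers must opt in explicitly.
    addprocs_azure(template, n; kwargs...) = (template, n)   # stand-in body
    export addprocs_azure
end

using .AzExampleSketch: addprocs_azure

addprocs_azure("myscaleset", 5)   # explicit opt-in; Distributed.addprocs is untouched
```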
6 changes: 3 additions & 3 deletions src/templates.jl
@@ -29,7 +29,7 @@ end
"""
AzManagers.build_sstemplate(name; kwargs...)

returns a dictionary that is an Azure scaleset template for use in `addprocs` or for saving
returns a dictionary that is an Azure scaleset template for use in `addprocs_azure` or for saving
to the `~/.azmanagers` folder.

# required key-word arguments
@@ -260,7 +260,7 @@ or written to AzManagers.jl configuration files.
* `location` Azure data center location
* `resourcegroup` Azure resource group where the VM will reside
* `imagegallery` Azure shared image gallery name
* `imagename` Azure image name that is in the shared image gallery
* `imagename` Azure image name that is in the shared image gallery
* `vmsize` Azure vm type, e.g. "Standard_D8s_v3"

# Optional keyword arguments
@@ -273,7 +273,7 @@ or written to AzManagers.jl configuration files.
* `tempdisk = "sudo mkdir -m 777 /mnt/scratch\nln -s /mnt/scratch /scratch"` cloud-init commands used to mount or link to temporary disk
* `tags = Dict("azure_tag_name" => "some_tag_value")` Optional tags argument for resource
* `encryption_at_host = false` Optional argument for enabling encryption at host
* `default_nic = ""` Optional argument for inserting "default_nic" as a key
* `default_nic = ""` Optional argument for inserting "default_nic" as a key

# Notes
[1] Each datadisk is a Dictionary. For example,