diff --git a/demo/detached.jl b/demo/detached.jl index abcf4930..2573672a 100644 --- a/demo/detached.jl +++ b/demo/detached.jl @@ -67,7 +67,8 @@ sessionbundle!( job4 = @detach vm(;session=sessionbundle(:management),persist=true) begin using Distributed, AzManagers, AzStorage, AzSessions - addprocs("cbox04", 4; session=sessionbundle(:management)) + using AzManagers: addprocs_azure + addprocs_azure("cbox04", 4; session=sessionbundle(:management)) for pid in workers() hostname = remotecall_fetch(gethostname, pid) @@ -89,4 +90,4 @@ status(job4) wait(job4) read(job4) -rmproc(job4.vm; session=sessionbundle(:management)) \ No newline at end of file +rmproc(job4.vm; session=sessionbundle(:management)) diff --git a/docs/src/examples.md b/docs/src/examples.md index 21d7fbdb..a09e58aa 100644 --- a/docs/src/examples.md +++ b/docs/src/examples.md @@ -5,8 +5,10 @@ ```julia using Distributed, AzManagers +using AzManagers: addprocs_azure + # add 10 instances to an Azure scale set with the "cbox08" scale-set template. -addprocs("cbox08", 10) +addprocs_azure("cbox08", 10) # monitor the workers as they join the Julia cluster. while nprocs() == nworkers() || nworkers() < 10 @@ -16,7 +18,7 @@ end length(workers()) # 10 # add 10 more instances, waiting for the cluster to be stable before returning control to the caller. -addprocs("cbox08", 10; waitfor=true) +addprocs_azure("cbox08", 10; waitfor=true) length(workers()) # 20 # remove the first 5 instances @@ -26,14 +28,14 @@ rmprocs(workers()[1:5]) rmprocs(workers()) # list self-doc for AzManagers addprocs method: -?addprocs +?addprocs_azure # make a scale-set from SPOT VMs -addprocs("cbox08", 10; group="myspotgroup", spot=true) +addprocs_azure("cbox08", 10; group="myspotgroup", spot=true) # wait for at-least one worker to be available while nprocs() - nworkers() == 0; yield(); end # check to see if Azure is shutting down (pre-empt'ing) the worker remotecall_fetch(preempted, workers()[1]) -``` \ No newline at end of file +``` diff --git a/docs/src/index.md b/docs/src/index.md index 8c5899b9..8a65bee5 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -6,16 +6,18 @@ uses a user-defined template. For example, we can create a new julia cluster co 5 VMs, and where the scale-set is described by the template `"myscaleset"` as follows, ```julia using AzManagers, Distributed -addprocs("myscaleset", 5) +using AzManagers: addprocs_azure +addprocs_azure("myscaleset", 5) ``` Note that `addprocs` will return as soon as the provisioning is initialized. Subsequently, workers -will add themselves to the Julia cluster as they become available. This is similar to the "elastic.jl" +will add themselves to the Julia cluster as they become available. This is similar to the "elastic.jl" cluster manager in [ClusterManagers.jl](https://github.com/JuliaParallel/ClusterManagers.jl), and allows AzManagers to behave dynamically. To wait for the cluster to be completely up use the `waitfor` argument. For example, ```julia using AzManagers, Distributed -addprocs("myscaleset", 5; waitfor=true) +using AzManagers: addprocs_azure +addprocs_azure("myscaleset", 5; waitfor=true) ``` In this case `addprocs` will return only once the 5 workers have joined the cluster. @@ -27,7 +29,7 @@ for more information. AzManagers does not provide scale-set templates since they will depend on your specific Azure setup. However, we provide a means to create the templates. Please see the section -[Scale-set templates](# Scale-set templates) for more information. 
+[Scale-set templates](# Scale-set templates) for more information. AzManagers requires a user provided Azure resource group and subscription, as well as information about the ssh user for the scale-set VMs. AzManagers uses a manifest file to store this information. @@ -60,7 +62,7 @@ myscaleset = AzManagers.build_sstemplate("myvm", AzManagers.save_template_scaleset("myscaleset", myscaleset) ``` The above code will save the template to the json file, `~/.azmanagers/templates_scaleset.json`. -Subsequently, `addprocs("myscaleset", 5)` will query the json file for the VM template. One can +Subsequently, `addprocs_azure("myscaleset", 5)` will query the json file for the VM template. One can repeat this process, populating `~/.azmanagers/templates_scaleset.json` with a variety of templates for a variety of machine types. @@ -82,7 +84,7 @@ AzManagers.write_manifest(; resourcegroup = "my-resource-group", subscriptionid = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", ssh_user = "username") -``` +``` One can also specify the locations of the public and private ssh keys which AzManagers will use to establish ssh connections to the cluster machines. This connection is used for the initial set-up of the cluster, and for sending log messages back to the master process. By default, @@ -100,7 +102,8 @@ such as azure log analytics might be useful. At this time, AzManagers does not logger; but, if one had such a logger (e.g. MyAzureLogger), then one would do: ``` using AzManagers, Distributed -addprocs("myscaleset",5) +using AzManagers: addprocs_azure +addprocs_azure("myscaleset",5) @everywhere using Logging, MyAzureLogger @everywhere global_logger(MyAzureLogger()) ``` @@ -143,7 +146,8 @@ variablebundle!(session = AzSession()) myvm = addproc("myvm") detached_job = @detachat myvm begin using Distributed, AzManagers - addprocs("myscaleset", 5; session=variablebundle(:session)) + using AzManagers: addprocs_azure + addprocs_azure("myscaleset", 5; session=variablebundle(:session)) for pid in workers() remotecall_fetch(println, "hello from pid=$(myid())") end @@ -169,7 +173,8 @@ using Pkg Pkg.instantiate(".") Pkg.add("AzManagers") Pkg.add("Jets") -addprocs("cbox16",2;customenv=true) +using AzManagers: addprocs_azure +addprocs_azure("cbox16",2;customenv=true) ``` Now, when worker VMs are initialized, they will have the software stack defined by the current project. Please note that this can add significant diff --git a/docs/src/reference.md b/docs/src/reference.md index 926ebf45..1094dfe2 100644 --- a/docs/src/reference.md +++ b/docs/src/reference.md @@ -2,7 +2,7 @@ ## Julia clusters ```@docs -addprocs +addprocs_azure preempted ``` @@ -27,4 +27,4 @@ AzManagers.write_manifest AzManagers.save_template_nic AzManagers.save_template_scaleset AzManagers.save_template_vm -``` \ No newline at end of file +``` diff --git a/src/AzManagers.jl b/src/AzManagers.jl index c33786f4..42fba9ce 100644 --- a/src/AzManagers.jl +++ b/src/AzManagers.jl @@ -150,11 +150,11 @@ function azrequest(rtype, verbose, url, headers, body=nothing) else r = HTTP.request(rtype, url, headers, body; verbose=verbose, options...) 
end - + if r.status >= 300 throw(HTTP.Exceptions.StatusError(r.status, r.request.method, r.request.target, r)) end - + r end @@ -512,7 +512,7 @@ function Distributed.addprocs(manager::AzManager; sockets) Distributed.init_multi() Distributed.cluster_mgmt_from_master_check() lock(Distributed.worker_lock) - pids = Distributed.addprocs_locked(manager; sockets) + pids = addprocs_locked(manager; sockets) catch e @debug "AzManagers, error processing pending connection" logerror(e, Logging.Debug) @@ -526,7 +526,7 @@ function addprocs_with_timeout(manager; sockets) # Distributed.setup_launched_worker also uses Distributed.worker_timeout, so we add a grace period # to allow for the Distributed.setup_launched_worker to hit its timeout. timeout = Distributed.worker_timeout() + 30 - tsk_addprocs = @async addprocs(manager; sockets) + tsk_addprocs = @async addprocs_azure(manager; sockets) tic = time() pids = [] interrupted = false @@ -743,7 +743,7 @@ function nthreads_filter(nthreads) end """ - addprocs(template, ninstances[; kwargs...]) + addprocs_azure(template, ninstances[; kwargs...]) Add Azure scale set instances where template is either a dictionary produced via the `AzManagers.build_sstemplate` method or a string corresponding to a template stored in `~/.azmanagers/templates_scaleset.json.` @@ -783,10 +783,10 @@ method or a string corresponding to a template stored in `~/.azmanagers/template * `use_lvm=false` For SKUs that have 1 or more nvme disks, combines all disks as a single mount point /scratch vs /scratch, /scratch1, /scratch2, etc.. # Notes -[1] If `addprocs` is called from an Azure VM, then the default `imagename`,`imageversion` are the +[1] If `addprocs_azure` is called from an Azure VM, then the default `imagename`,`imageversion` are the image/version the VM was built with; otherwise, it is the latest version of the image specified in the scale-set template. [2] Interactive threads are supported beginning in version 1.9 of Julia. For earlier versions, the default for `julia_num_threads` is `Threads.nthreads()`. -[3] `waitfor=false` reflects the fact that the cluster manager is dynamic. After the call to `addprocs` returns, use `workers()` +[3] `waitfor=false` reflects the fact that the cluster manager is dynamic. After the call to `addprocs_azure` returns, use `workers()` to monitor the size of the cluster. [4] This is inteneded for use with Devito. In particular, it allows Devito to gain performance by using MPI to do domain decomposition using MPI within a single VM. If `mpi_ranks_per_worker=0`, then MPI is not @@ -794,7 +794,7 @@ used on the Julia workers. This feature makes use of package extensions, meanin that `using MPI` is somewhere in your calling script. [5] This may result in a re-boot of the VMs """ -function Distributed.addprocs(template::Dict, n::Int; +function addprocs_azure(template::Dict, n::Int; subscriptionid = "", resourcegroup = "", sigimagename = "", @@ -863,13 +863,13 @@ function Distributed.addprocs(template::Dict, n::Int; nothing end -function Distributed.addprocs(template::AbstractString, n::Int; kwargs...) +function addprocs_azure(template::AbstractString, n::Int; kwargs...) isfile(templates_filename_scaleset()) || error("scale-set template file does not exist. See `AzManagers.save_template_scaleset`") templates_scaleset = JSON.parse(read(templates_filename_scaleset(), String)) haskey(templates_scaleset, template) || error("scale-set template file does not contain a template with name: $template. 
See `AzManagers.save_template_scaleset`") - addprocs(templates_scaleset[template], n; kwargs...) + addprocs_azure(templates_scaleset[template], n; kwargs...) end function Distributed.launch(manager::AzManager, params::Dict, launched::Array, c::Condition) @@ -1524,7 +1524,7 @@ function scaleset_image(manager::AzManager, sigimagename, sigimageversion, image k_galleries = findfirst(x->x=="galleries", _image) gallery = k_galleries == nothing ? "" : _image[k_galleries+1] different_image = true - + if sigimagename == "" && imagename == "" different_image = false k_images = findfirst(x->x=="images", _image) @@ -1537,7 +1537,7 @@ function scaleset_image(manager::AzManager, sigimagename, sigimageversion, image (sigimagename != "" && gallery == "") && error("sigimagename provided, but gallery name not found in template") (sigimagename == "" && imagename == "") && error("Unable to determine 'image gallery name' or 'image name'") - + if imagename == "" && sigimageversion == "" k = findfirst(x->x=="versions", _image) if k != nothing && !different_image @@ -1766,7 +1766,7 @@ function build_lvm() if isfile("/usr/sbin/azure_nvme.sh") @info "Building scratch.." run(`sudo bash /usr/sbin/azure_nvme.sh`) - else + else @warn "No scratch nvme script found!" end end @@ -1789,7 +1789,7 @@ function buildstartupscript(manager::AzManager, exename::String, user::String, d if isfile(joinpath(homedir(), ".gitconfig")) gitconfig = read(joinpath(homedir(), ".gitconfig"), String) cmd *= """ - + sudo su - $user << EOF echo '$gitconfig' > ~/.gitconfig EOF @@ -1798,7 +1798,7 @@ function buildstartupscript(manager::AzManager, exename::String, user::String, d if isfile(joinpath(homedir(), ".git-credentials")) gitcredentials = rstrip(read(joinpath(homedir(), ".git-credentials"), String), [' ','\n']) cmd *= """ - + sudo su - $user << EOF echo "$gitcredentials" > ~/.git-credentials chmod 600 ~/.git-credentials @@ -1821,7 +1821,7 @@ function buildstartupscript(manager::AzManager, exename::String, user::String, d project_compressed, manifest_compressed, localpreferences_compressed = compress_environment(julia_environment_folder) cmd *= """ - + sudo su - $user << 'EOF' $exename -e 'using AzManagers; AzManagers.decompress_environment("$project_compressed", "$manifest_compressed", "$localpreferences_compressed", "$remote_julia_environment_name")' $exename -e 'using Pkg; path=joinpath(Pkg.envdir(), "$remote_julia_environment_name"); Pkg.Registry.update(); Pkg.activate(path); (retry(Pkg.instantiate))(); Pkg.precompile()' @@ -1881,7 +1881,7 @@ function buildstartupscript_cluster(manager::AzManager, spot::Bool, ppi::Int, mp """ if use_lvm - if mpi_ranks_per_worker == 0 + if mpi_ranks_per_worker == 0 shell_cmds *= """ attempt_number=1 @@ -1889,7 +1889,7 @@ function buildstartupscript_cluster(manager::AzManager, spot::Bool, ppi::Int, mp exit_code=0 while [ \$attempt_number -le \$maximum_attempts ]; do $exename $_exeflags -e '$(juliaenvstring)try using AzManagers; catch; using Pkg; Pkg.instantiate(); using AzManagers; end; AzManagers.nvidia_gpucheck($nvidia_enable_ecc, $nvidia_enable_mig); AzManagers.mount_datadisks(); AzManagers.build_lvm(); AzManagers.azure_worker("$cookie", "$master_address", $master_port, $ppi, "$_exeflags")' - + exit_code=\$? echo "attempt \$attempt_number is done with exit code \$exit_code..." 
@@ -1956,7 +1956,7 @@ function buildstartupscript_cluster(manager::AzManager, spot::Bool, ppi::Int, mp --$boundary-- """ else - if mpi_ranks_per_worker == 0 + if mpi_ranks_per_worker == 0 shell_cmds *= """ attempt_number=1 @@ -1964,7 +1964,7 @@ function buildstartupscript_cluster(manager::AzManager, spot::Bool, ppi::Int, mp exit_code=0 while [ \$attempt_number -le \$maximum_attempts ]; do $exename $_exeflags -e '$(juliaenvstring)try using AzManagers; catch; using Pkg; Pkg.instantiate(); using AzManagers; end; AzManagers.nvidia_gpucheck($nvidia_enable_ecc, $nvidia_enable_mig); AzManagers.mount_datadisks(); AzManagers.azure_worker("$cookie", "$master_address", $master_port, $ppi, "$_exeflags")' - + exit_code=\$? echo "attempt \$attempt_number is done with exit code \$exit_code..." @@ -2340,7 +2340,7 @@ function scaleset_create_or_update(manager::AzManager, user, subscriptionid, res key = Dict("path" => "/home/$user/.ssh/authorized_keys", "keyData" => read(ssh_key, String)) push!(_template["properties"]["virtualMachineProfile"]["osProfile"]["linuxConfiguration"]["ssh"]["publicKeys"], key) - + cmd = buildstartupscript_cluster(manager, spot, ppi, mpi_ranks_per_worker, mpi_flags, nvidia_enable_ecc, nvidia_enable_mig, julia_num_threads, omp_num_threads, exename, exeflags, env, user, template["tempdisk"], custom_environment, use_lvm) _cmd = base64encode(cmd) @@ -2948,7 +2948,7 @@ function addproc(vm_template::Dict, nic_template=nothing; nic_dic = JSON.parse(String(nic_r.body)) nic_state = nic_dic["properties"]["provisioningState"] end - catch e + catch e @error "failed to get $nicname status" end @@ -2984,7 +2984,7 @@ function addproc(vm_template::Dict, nic_template=nothing; else cmd,_ = buildstartupscript(manager, exename, user, disk, customenv, use_lvm) end - + _cmd = base64encode(cmd) if length(_cmd) > 64_000 @@ -3243,11 +3243,11 @@ function detached_service_wait(vm, custom_environment) @error "reached timeout ($timeout seconds) while waiting for $waitfor to start." throw(DetachedServiceTimeoutException(vm)) end - + write(stdout, spin(spincount, elapsed_time)*", waiting for $waitfor on VM, $(vm["name"]):$(vm["port"]), to start.\r") flush(stdout) spincount = spincount == 4 ? 1 : spincount + 1 - + sleep(0.5) end write(stdout, spin(5, elapsed_time)*", waiting for $waitfor on VM, $(vm["name"]):$(vm["port"]), to start.\r") diff --git a/src/templates.jl b/src/templates.jl index 80b4b778..0fb27eaa 100644 --- a/src/templates.jl +++ b/src/templates.jl @@ -29,7 +29,7 @@ end """ AzManagers.build_sstemplate(name; kwargs...) -returns a dictionary that is an Azure scaleset template for use in `addprocs` or for saving +returns a dictionary that is an Azure scaleset template for use in `addprocs_azure` or for saving to the `~/.azmanagers` folder. # required key-word arguments @@ -260,7 +260,7 @@ or written to AzManagers.jl configuration files. * `location` Azure data center location * `resourcegroup` Azure resource group where the VM will reside * `imagegallery` Azure shared image gallery name -* `imagename` Azure image name that is in the shared image gallery +* `imagename` Azure image name that is in the shared image gallery * `vmsize` Azure vm type, e.g. "Standard_D8s_v3" # Optional keyword arguments @@ -273,7 +273,7 @@ or written to AzManagers.jl configuration files. 
* `tempdisk = "sudo mkdir -m 777 /mnt/scratch\nln -s /mnt/scratch /scratch"` cloud-init commands used to mount or link to temporary disk * `tags = Dict("azure_tag_name" => "some_tag_value")` Optional tags argument for resource * `encryption_at_host = false` Optional argument for enabling encryption at host -* `default_nic = ""` Optional argument for inserting "default_nic" as a key +* `default_nic = ""` Optional argument for inserting "default_nic" as a key # Notes [1] Each datadisk is a Dictionary. For example, diff --git a/test/runtests.jl b/test/runtests.jl index 004d96d2..25511067 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,5 +1,6 @@ using Distributed, AzManagers, Random, TOML, Test, HTTP, AzSessions, JSON, Pkg using MPI +using AzManagers: addprocs_azure session = AzSession(;protocal=AzClientCredentials) @@ -19,10 +20,10 @@ error: No outbound connectivity configured for virtual machine .... Please attach standard load balancer or public IP address to VM, create NAT gateway or configure user-defined routes (UDR) in the subnet. Learn more at aka.ms/defaultoutboundaccess. =# -@testset "AzManagers, addprocs, ppi=$ppi, flexible=$flexible" for ppi in (1,), flexible in (false,#=true=#) +@testset "AzManagers, addprocs_azure, ppi=$ppi, flexible=$flexible" for ppi in (1,), flexible in (false,#=true=#) ninstances = 4 group = "test$(randstring('a':'z',4))" - + # Set up iteration vars url = "https://management.azure.com/subscriptions/$subscriptionid/resourceGroups/$resourcegroup/providers/Microsoft.Compute/virtualMachineScaleSets/$group?api-version=2019-12-01" tppi = ppi*ninstances # Total number of Julia processes in the entire scale set @@ -31,7 +32,7 @@ or configure user-defined routes (UDR) in the subnet. Learn more at aka.ms/defau # Unit Test 1 - Create scale set and start Julia processes # if flexible - addprocs(templatename, ninstances; + addprocs_azure(templatename, ninstances; waitfor = true, ppi, group, @@ -39,13 +40,13 @@ or configure user-defined routes (UDR) in the subnet. Learn more at aka.ms/defau spot = true, spot_base_regular_priority_count = 2) else - addprocs(templatename, ninstances; + addprocs_azure(templatename, ninstances; waitfor = true, ppi, group, session) end - + # Verify that the scale set is present _r = HTTP.request("GET", url, Dict("Authorization"=>"Bearer $(token(session))"); verbose=0) @test _r.status == 200 @@ -67,7 +68,7 @@ or configure user-defined routes (UDR) in the subnet. Learn more at aka.ms/defau unique_workers = unique(myworkers) @test length(unique_workers) == ninstances - for worker in myworkers + for worker in myworkers @test master != worker end @@ -109,10 +110,10 @@ or configure user-defined routes (UDR) in the subnet. Learn more at aka.ms/defau end end -@testset "addprocs, spot" begin +@testset "addprocs_azure, spot" begin group = "test$(randstring('a':'z',4))" julia_num_threads = VERSION >= v"1.9" ? "2,0" : "2" - addprocs(templatename, 1; waitfor = true, group, session, julia_num_threads) + addprocs_azure(templatename, 1; waitfor = true, group, session, julia_num_threads) @test remotecall_fetch(Threads.nthreads, workers()[1]) == 2 @@ -123,7 +124,7 @@ end group = "test$(randstring('a':'z',4))" julia_num_threads = VERSION >= v"1.9" ? 
"2,0" : "2" - addprocs(templatename, 1; waitfor = true, group, session, julia_num_threads, spot = true) + addprocs_azure(templatename, 1; waitfor = true, group, session, julia_num_threads, spot = true) @test remotecall_fetch(Threads.nthreads, workers()[1]) == 2 @@ -136,7 +137,7 @@ end group = "test$(randstring('a':'z',4))" julia_num_threads = VERSION >= v"1.9" ? "3,2" : "3" - addprocs(templatename, 1; waitfor = true, group, session, julia_num_threads, spot=true) + addprocs_azure(templatename, 1; waitfor = true, group, session, julia_num_threads, spot=true) @test remotecall_fetch(Threads.nthreads, workers()[1]) == 3 @@ -150,7 +151,7 @@ if VERSION >= v"1.9" @testset "spot eviction" begin group = "test$(randstring('a':'z',4))" julia_num_threads = "2,1" - addprocs(templatename, 2; waitfor = true, group, session, julia_num_threads, spot = true) + addprocs_azure(templatename, 2; waitfor = true, group, session, julia_num_threads, spot = true) AzManagers.simulate_spot_eviction(workers()[1]) @@ -202,7 +203,7 @@ end rmproc(testvm; session=session) end -@testset "environment, addprocs" begin +@testset "environment, addprocs_azure" begin mkpath("myproject") cd("myproject") Pkg.activate(".") @@ -217,7 +218,7 @@ end group = "test$(randstring('a':'z',4))" - addprocs(templatename, 1; waitfor=true, group=group, session=session, customenv=true) + addprocs_azure(templatename, 1; waitfor=true, group=group, session=session, customenv=true) @everywhere using Pkg pinfo = remotecall_fetch(Pkg.project, workers()[1]) @test contains(pinfo.path, "myproject") @@ -232,7 +233,7 @@ end end -@testset "tags, addprocs" begin +@testset "tags, addprocs_azure" begin mkpath("myproject") cd("myproject") Pkg.activate(".") @@ -255,7 +256,7 @@ end _template["tags"] = Dict("foo"=>"bar") end - addprocs(template, 1; waitfor=true, group=group, session=session) + addprocs_azure(template, 1; waitfor=true, group=group, session=session) _r = HTTP.request( "GET", @@ -367,12 +368,12 @@ end templates_scaleset = JSON.parse(read(AzManagers.templates_filename_scaleset(), String)) template = templates_scaleset[templatename] - - addprocs(template, 2; waitfor=true, group=group, session=session) + + addprocs_azure(template, 2; waitfor=true, group=group, session=session) wrkers = Distributed.map_pid_wrkr for i in workers() - userdata = wrkers[i].config.userdata + userdata = wrkers[i].config.userdata @info userdata name = get(userdata, "physical_hostname", "unknown") @@ -399,4 +400,4 @@ end @test name !== "unknown" && match(r"[A-Z0-9]", name) !== nothing rmproc(testvm; session=session) -end \ No newline at end of file +end