Skip to content

Commit

Permalink
Merge pull request #149 from ChevronETC/retrystart
Browse files Browse the repository at this point in the history
Fix retry logic
  • Loading branch information
samtkaplan authored Nov 30, 2023
2 parents 799c046 + d63f347 commit 0e6443b
Showing 1 changed file with 116 additions and 63 deletions.
179 changes: 116 additions & 63 deletions src/AzManagers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1030,10 +1030,17 @@ function logging()
nothing
end

# Compatibility shim: on Julia versions older than 1.7, bind `errormonitor` to
# `identity`, so that `errormonitor(task)` simply returns the task unchanged.
# NOTE(review): presumably needed because `Base.errormonitor` is not available
# before Julia 1.7 -- confirm against the Julia release notes.
if VERSION < v"1.7"
errormonitor = identity
end

function azure_worker_start(out::IO, cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
Distributed.init_multi()

close_stdin && close(stdin) # workers will not use it
if close_stdin # workers will not use it
redirect_stdin(devnull)
close(stdin)
end
stderr_to_stdout && redirect_stderr(stdout)

Distributed.init_worker(cookie)
Expand All @@ -1046,11 +1053,34 @@ function azure_worker_start(out::IO, cookie::AbstractString=readline(stdin); clo
sock = listen(interface, Distributed.LPROC.bind_port)
end

tsk_messages = nothing
@async while isopen(sock)
t = errormonitor(@async while isopen(sock)
client = accept(sock)
tsk_messages = Distributed.process_messages(client, client, true)
end

#=
We observe that a valid machine often receives null bytes (0x00) instead
of the cookie. We do not know the cause of this, so here we throw
an error, which is caught and rethrown below in the 'while true'
loop. This causes this function to throw, which in turn makes the
'azure_worker' method retry joining the cluster.
The error handling is a little complicated here due to how the error
handling in 'Distributed.process_messages' works. In particular, we
read the cookie ourselves and, subsequently, pass 'false' as the
third argument to 'Distributed.process_messages'. This, in turn,
lets 'process_messages' skip its cookie read/check.
=#

cookie_from_master = read(client, Distributed.HDR_COOKIE_LEN)
if cookie_from_master[1] == 0x00
error("received cookie with at least one null character")
end

if String(cookie_from_master) != cookie
error("received invalid cookie.")
end

Distributed.process_messages(client, client, false)
end)
print(out, "julia_worker:") # print header
print(out, "$(string(Distributed.LPROC.bind_port))#") # print port
print(out, Distributed.LPROC.bind_addr)
Expand All @@ -1067,27 +1097,19 @@ function azure_worker_start(out::IO, cookie::AbstractString=readline(stdin); clo
manager = azmanager()
manager.worker_socket = out

while true
if tsk_messages != nothing
try
wait(tsk_messages)

#=
We throw an error regardless of whether the tsk_messages task completes
or throws an error. We throw when it completes because of the complex error
handling in the Distributed.process_messages method. We can be a bit
messy about process clean-up here because, when we remove a worker from the
cluster, we delete the corresponding Azure VM.
=#
error("")
catch e
close(sock)
throw(e)
end
try
while true
Distributed.check_master_connect()
@info "message loop..."
wait(t)
istaskfailed(t) && fetch(t)
sleep(10)
end
sleep(10)
catch e
throw(e)
finally
close(sock)
end
close(sock)
end

function azure_worker(cookie, master_address, master_port, ppi, exeflags)
Expand All @@ -1100,6 +1122,7 @@ function azure_worker(cookie, master_address, master_port, ppi, exeflags)
=#
while true
itry += 1
local c
try
c = azure_worker_init(cookie, master_address, master_port, ppi, exeflags, 0)
azure_worker_start(c, cookie)
Expand All @@ -1109,6 +1132,12 @@ function azure_worker(cookie, master_address, master_port, ppi, exeflags)
if itry > 10
throw(e)
end
if @isdefined c
try
close(c)
catch
end
end
end
sleep(60)
end
Expand Down Expand Up @@ -1156,23 +1185,43 @@ end
# These methods are slightly modified versions of what is in the Julia distributed standard library
#
function azure_worker_mpi(cookie, master_address, master_port, ppi, exeflags)
MPI.Initialized() || MPI.Init()
itry = 0
while true
itry += 1
local c
try
MPI.Initialized() || MPI.Init()

comm = MPI.COMM_WORLD
mpi_size = MPI.Comm_size(comm)
mpi_rank = MPI.Comm_rank(comm)
comm = MPI.COMM_WORLD
mpi_size = MPI.Comm_size(comm)
mpi_rank = MPI.Comm_rank(comm)

local t
if mpi_rank == 0
c = azure_worker_init(cookie, master_address, master_port, ppi, exeflags, mpi_size)
t = @async start_worker_mpi_rank0(c, cookie)
else
t = @async message_handler_loop_mpi_rankN()
end
local t
if mpi_rank == 0
c = azure_worker_init(cookie, master_address, master_port, ppi, exeflags, mpi_size)
t = @async start_worker_mpi_rank0(c, cookie)
else
t = @async message_handler_loop_mpi_rankN()
end

MPI.Barrier(comm)
fetch(t)
MPI.Barrier(comm)
MPI.Barrier(comm)
fetch(t)
MPI.Barrier(comm)
catch e
@error "error starting worker, attempt $itry, cookie=$cookie, master_address=$master_address, master_port=$master_port, ppi=$ppi"
logerror(e, Logging.Error)
if itry > 10
throw(e)
end
if @isdefined c
try
close(c)
catch
end
end
end
sleep(60)
end
end

function process_messages_mpi_rank0(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true)
Expand Down Expand Up @@ -1336,10 +1385,13 @@ start_worker_mpi_rank0(cookie::AbstractString=readline(stdin); kwargs...) = star
function start_worker_mpi_rank0(out::IO, cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
Distributed.init_multi()

close_stdin && close(stdin) # workers will not use it
if close_stdin # workers will not use it
redirect_stdin(devnull)
close(stdin)
end
stderr_to_stdout && redirect_stderr(stdout)

init_worker(cookie)
Distributed.init_worker(cookie)
interface = IPv4(Distributed.LPROC.bind_addr)
if Distributed.LPROC.bind_port == 0
port_hint = 9000 + (getpid() % 1000)
Expand All @@ -1349,11 +1401,20 @@ function start_worker_mpi_rank0(out::IO, cookie::AbstractString=readline(stdin);
sock = listen(interface, Distributed.LPROC.bind_port)
end

tsk_messages = nothing
@async while isopen(sock)
t = errormonitor(@async while isopen(sock)
client = accept(sock)
tsk_messages = process_messages_mpi_rank0(client, client, true)
end

cookie_from_master = read(client, Distributed.HDR_COOKIE_LEN)
if cookie_from_master[1] == 0x00
error("received cookie with at least one null character")
end

if String(cookie_from_master) != cookie
error("received invalid cookie.")
end

process_messages_mpi_rank0(client, client, false)
end)
print(out, "julia_worker:") # print header
print(out, "$(string(Distributed.LPROC.bind_port))#") # print port
print(out, Distributed.LPROC.bind_addr)
Expand All @@ -1370,27 +1431,19 @@ function start_worker_mpi_rank0(out::IO, cookie::AbstractString=readline(stdin);
manager = azmanager()
manager.worker_socket = out

while true
if tsk_messages != nothing
try
wait(tsk_messages)

#=
We throw an error regardless of whether the tsk_messages task completes
or throws an error. We throw when it completes because of the complex error
handling in the Distributed.process_messages method. We can be a bit
messy about process clean-up here because, when we remove a worker from the
cluster, we delete the corresponding Azure VM.
=#
error("")
catch e
close(sock)
throw(e)
end
try
while true
Distributed.check_master_connect()
@info "message loop..."
wait(t)
istaskfailed(t) && fetch(t)
sleep(10)
end
sleep(10)
catch e
throw(e)
finally
close(sock)
end
close(sock)
end

#
Expand Down

0 comments on commit 0e6443b

Please sign in to comment.