Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 75 additions & 57 deletions bluefog/common/topology_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,18 +184,18 @@ def MeshGrid2DGraph(size: int, shape: Optional[Tuple[int, int]] = None) -> nx.Di
i = int(np.sqrt(size))
while size % i != 0:
i -= 1
shape = (i, size//i)
shape = (i, size // i)
nrow, ncol = shape
assert size == nrow*ncol, "The shape doesn't match the size provided."
assert size == nrow * ncol, "The shape doesn't match the size provided."
topo = np.zeros((size, size))
for i in range(size):
topo[i][i] = 1.0
if (i+1) % ncol != 0:
topo[i][i+1] = 1.0
topo[i+1][i] = 1.0
if i+ncol < size:
topo[i][i+ncol] = 1.0
topo[i+ncol][i] = 1.0
if (i + 1) % ncol != 0:
topo[i][i + 1] = 1.0
topo[i + 1][i] = 1.0
if i + ncol < size:
topo[i][i + ncol] = 1.0
topo[i + ncol][i] = 1.0

# According to Hasting rule (Policy 1) in https://arxiv.org/pdf/1702.05122.pdf
# The neighbor definition in the paper is different from our implementation,
Expand All @@ -204,9 +204,10 @@ def MeshGrid2DGraph(size: int, shape: Optional[Tuple[int, int]] = None) -> nx.Di
for i in range(size):
for j in topo_neighbor_with_self[i]:
if i != j:
topo[i][j] = 1.0/max(len(topo_neighbor_with_self[i]),
len(topo_neighbor_with_self[j]))
topo[i][i] = 2.0-topo[i].sum()
topo[i][j] = 1.0 / max(
len(topo_neighbor_with_self[i]), len(topo_neighbor_with_self[j])
)
topo[i][i] = 2.0 - topo[i].sum()
G = nx.from_numpy_array(topo, create_using=nx.DiGraph)
return G

Expand Down Expand Up @@ -253,20 +254,23 @@ def RingGraph(size: int, connect_style: int = 0) -> nx.DiGraph:
>>> nx.draw_circular(G)
"""
assert size > 0
assert connect_style >= 0 and connect_style <= 2, \
"connect_style has to be int between 0 and 2, where 1 " \
assert connect_style >= 0 and connect_style <= 2, (
"connect_style has to be int between 0 and 2, where 1 "
"for bi-connection, 1 for left connection, 2 for right connection."
)
if size == 1:
return nx.from_numpy_array(np.array([[1.0]]), create_using=nx.DiGraph)
if size == 2:
return nx.from_numpy_array(np.array([[0.5, 0.5], [0.5, 0.5]]), create_using=nx.DiGraph)
return nx.from_numpy_array(
np.array([[0.5, 0.5], [0.5, 0.5]]), create_using=nx.DiGraph
)

x = np.zeros(size)
x[0] = 0.5
if connect_style == 0: # bi-connection
x[0] = 1/3.0
x[-1] = 1/3.0
x[1] = 1/3.0
x[0] = 1 / 3.0
x[-1] = 1 / 3.0
x[1] = 1 / 3.0
elif connect_style == 1: # left-connection
x[-1] = 0.5
elif connect_style == 2: # right-connection
Expand Down Expand Up @@ -295,7 +299,7 @@ def FullyConnectedGraph(size: int) -> nx.DiGraph:
>>> nx.draw_spring(G)
"""
assert size > 0
x = np.array([1/size] * size)
x = np.array([1 / size] * size)
topo = np.empty((size, size))
for i in range(size):
topo[i] = np.roll(x, i)
Expand All @@ -313,7 +317,8 @@ def IsRegularGraph(topo: nx.DiGraph) -> bool:


def GetDynamicOnePeerSendRecvRanks(
topo: nx.DiGraph, self_rank: int) -> Iterator[Tuple[List[int], List[int]]]:
topo: nx.DiGraph, self_rank: int
) -> Iterator[Tuple[List[int], List[int]]]:
"""A utility function to generate 1-outoging send rank and corresponding recieving rank(s).

Args:
Expand All @@ -335,8 +340,10 @@ def GetDynamicOnePeerSendRecvRanks(
size = topo.number_of_nodes()
sorted_send_ranks = []
for rank in range(size):
sorted_ranks = sorted(topo.successors(rank),
key=lambda r, rk=rank: r-rk if r >= rk else r-rk+size)
sorted_ranks = sorted(
topo.successors(rank),
key=lambda r, rk=rank: r - rk if r >= rk else r - rk + size,
)
if sorted_ranks[0] == rank:
sorted_ranks = sorted_ranks[1:] # remove the self-loop
sorted_send_ranks.append(sorted_ranks)
Expand All @@ -358,8 +365,8 @@ def GetDynamicOnePeerSendRecvRanks(


def GetExp2DynamicSendRecvMachineRanks(
world_size: int, local_size: int, self_rank: int, local_rank: int
) -> Iterator[Tuple[List[int], List[int]]]:
world_size: int, local_size: int, self_rank: int, local_rank: int
) -> Iterator[Tuple[List[int], List[int]]]:
"""
A utility function to generate 1-outgoing send machine id and corresponding recieving
machine id(s) for Exponentia-2 topology.
Expand All @@ -377,28 +384,31 @@ def GetExp2DynamicSendRecvMachineRanks(
This function should be used under homogeneous enviroment only, i.e. all machines have
the same number of local processes.
"""
assert (self_rank % local_size) == local_rank, \
"It should be used under homogeneous environment only."
assert (world_size % local_size) == 0, \
"It should be used under homogeneous environment only."
assert world_size > local_size, \
"It should be used under at least two machines case."
assert (
self_rank % local_size
) == local_rank, "It should be used under homogeneous environment only."
assert (
world_size % local_size
) == 0, "It should be used under homogeneous environment only."
assert (
world_size > local_size
), "It should be used under at least two machines case."

machine_id = self_rank // local_size
machine_size = world_size // local_size
exp_2_size = int(np.log2(machine_size-1)) if machine_size > 1 else 0
exp_2_size = int(np.log2(machine_size - 1)) if machine_size > 1 else 0
index = 0
while True:
machine_dist = 2**(index % (exp_2_size + 1))
machine_dist = 2 ** (index % (exp_2_size + 1))
send_machine_rank = (machine_id + machine_dist) % machine_size
recv_machine_ranks = (machine_id - machine_dist) % machine_size
yield [send_machine_rank], [recv_machine_ranks]
index += 1


def GetInnerOuterRingDynamicSendRecvRanks(
world_size: int, local_size: int, self_rank: int
) -> Iterator[Tuple[List[int], List[int]]]:
world_size: int, local_size: int, self_rank: int
) -> Iterator[Tuple[List[int], List[int]]]:
"""
A utility function to generate 1-outgoing send rank and corresponding recieving rank(s)
for Inner-Ring-Outer-Ring topology.
Expand All @@ -419,11 +429,15 @@ def GetInnerOuterRingDynamicSendRecvRanks(
>>> for _ in range(10):
>>> print(next(gen))
"""
num_machines = world_size//local_size
num_machines = world_size // local_size
nodes_per_machine = local_size
assert world_size % local_size == 0, "It should be used under homogeneous environment only."
assert local_size > 2, "Do no support the case where nodes_per_machine is equal or " \
assert (
world_size % local_size == 0
), "It should be used under homogeneous environment only."
assert local_size > 2, (
"Do no support the case where nodes_per_machine is equal or "
"less than 2. Consider use hierarchical_neighbor_allreduce or GetDynamicOnePeerSendRecvRanks."
)

index = 0
while True:
Expand All @@ -446,16 +460,14 @@ def GetInnerOuterRingDynamicSendRecvRanks(
# find send_rank
target_local_rank_id = (local_rank_id + 1) % nodes_per_machine
if target_local_rank_id == local_rank_to_go_outside_id:
target_local_rank_id = (
target_local_rank_id + 1) % nodes_per_machine
target_local_rank_id = (target_local_rank_id + 1) % nodes_per_machine
target_rank_id = target_local_rank_id + machine_id * nodes_per_machine
send_rank = target_rank_id

# find recv_rank
source_local_rank_id = (local_rank_id - 1) % nodes_per_machine
if source_local_rank_id == local_rank_to_go_outside_id:
source_local_rank_id = (
source_local_rank_id - 1) % nodes_per_machine
source_local_rank_id = (source_local_rank_id - 1) % nodes_per_machine
source_rank_id = source_local_rank_id + machine_id * nodes_per_machine
recv_rank = source_rank_id

Expand All @@ -464,8 +476,8 @@ def GetInnerOuterRingDynamicSendRecvRanks(


def GetInnerOuterExpo2DynamicSendRecvRanks(
world_size: int, local_size: int, self_rank: int
) -> Iterator[Tuple[List[int], List[int]]]:
world_size: int, local_size: int, self_rank: int
) -> Iterator[Tuple[List[int], List[int]]]:
"""
A utility function to generate 1-outgoing send rank and corresponding recieving rank(s)
for Inner-Exp2-Outer-Exp2 ring topology.
Expand All @@ -486,18 +498,22 @@ def GetInnerOuterExpo2DynamicSendRecvRanks(
>>> for _ in range(10):
>>> print(next(gen))
"""
num_machines = world_size//local_size
num_machines = world_size // local_size
nodes_per_machine = local_size
assert world_size % local_size == 0, "It should be used under homogeneous environment only."
assert local_size > 2, "Do no support the case where nodes_per_machine is equal or " \
assert (
world_size % local_size == 0
), "It should be used under homogeneous environment only."
assert local_size > 2, (
"Do no support the case where nodes_per_machine is equal or "
"less than 2. Consider use hierarchical_neighbor_allreduce or GetDynamicOnePeerSendRecvRanks."
)

exp_2_out_size = int(np.log2(num_machines-1))
exp_2_out_size = int(np.log2(num_machines - 1))
if nodes_per_machine == 2:
exp_2_in_size = 0
else:
# -2 because we need to remove outgoing node
exp_2_in_size = int(np.log2(nodes_per_machine-2))
exp_2_in_size = int(np.log2(nodes_per_machine - 2))

index = 0
while True:
Expand All @@ -513,7 +529,7 @@ def GetInnerOuterExpo2DynamicSendRecvRanks(
# exp_2_out_size=3, and local_rank_id=1. If this branch is reached,
# local_rank_to_go_outside_id=1, and index % (exp_2_out_size+1)=1, resulting in
# next_machine_dist always equal to 2.
next_machine_dist = 2**(index % (exp_2_out_size+1))
next_machine_dist = 2 ** (index % (exp_2_out_size + 1))
# find send_rank
target_machine_id = (machine_id + next_machine_dist) % num_machines
target_rank_id = target_machine_id * nodes_per_machine + local_rank_id
Expand All @@ -526,27 +542,29 @@ def GetInnerOuterExpo2DynamicSendRecvRanks(

else:
# Distance from self to out-rank:
dist_to_out = (local_rank_to_go_outside_id -
local_rank_id) % nodes_per_machine
next_inner_dist = 2**(index % (exp_2_in_size + 1))
dist_to_out = (
local_rank_to_go_outside_id - local_rank_id
) % nodes_per_machine
next_inner_dist = 2 ** (index % (exp_2_in_size + 1))
if next_inner_dist >= dist_to_out:
next_inner_dist += 1

# find send_rank
target_local_rank_id = (local_rank_id +
next_inner_dist) % nodes_per_machine
target_local_rank_id = (local_rank_id + next_inner_dist) % nodes_per_machine
target_rank_id = target_local_rank_id + machine_id * nodes_per_machine
send_rank = target_rank_id

reverse_inner_dist = 2**(index % (exp_2_in_size + 1))
reverse_inner_dist = 2 ** (index % (exp_2_in_size + 1))
reverse_dist_to_out = (
local_rank_id - local_rank_to_go_outside_id) % nodes_per_machine
local_rank_id - local_rank_to_go_outside_id
) % nodes_per_machine
if reverse_inner_dist >= reverse_dist_to_out:
reverse_inner_dist += 1

# find recv_rank
source_local_rank_id = (local_rank_id -
reverse_inner_dist) % nodes_per_machine
source_local_rank_id = (
local_rank_id - reverse_inner_dist
) % nodes_per_machine
source_rank_id = source_local_rank_id + machine_id * nodes_per_machine
recv_rank = source_rank_id

Expand Down
49 changes: 30 additions & 19 deletions bluefog/common/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,26 @@
import os
import sysconfig

EXTENSIONS = ['tensorflow', 'torch']
EXTENSIONS = ["tensorflow", "torch"]


def is_running_from_ipython():
from IPython import get_ipython

return get_ipython() is not None


def get_ext_suffix():
"""Determine library extension for various versions of Python."""
ext_suffix = sysconfig.get_config_var('EXT_SUFFIX')
ext_suffix = sysconfig.get_config_var("EXT_SUFFIX")
if ext_suffix:
return ext_suffix

ext_suffix = sysconfig.get_config_var('SO')
ext_suffix = sysconfig.get_config_var("SO")
if ext_suffix:
return ext_suffix

return '.so'
return ".so"


def get_extension_full_path(pkg_path, *args):
Expand All @@ -48,45 +51,51 @@ def get_extension_full_path(pkg_path, *args):
def check_extension(ext_name, pkg_path, *args):
full_path = get_extension_full_path(pkg_path, *args)
if not os.path.exists(full_path):
raise ImportError(
'Extension {} has not been built. '.format(ext_name))
raise ImportError("Extension {} has not been built. ".format(ext_name))


def _check_extension_lambda(ext_base_name, fn, fn_desc, verbose):
"""
Tries to load the extension in a new process. If successful, puts fn(ext)
to the queue or False otherwise. Mutes all stdout/stderr.
"""

def _target_fn(ext_base_name, fn, fn_desc, queue, verbose):
import importlib
import sys
import traceback

if verbose:
print('Checking whether extension {ext_base_name} was {fn_desc}.'.format(
ext_base_name=ext_base_name, fn_desc=fn_desc))
print(
"Checking whether extension {ext_base_name} was {fn_desc}.".format(
ext_base_name=ext_base_name, fn_desc=fn_desc
)
)
else:
# Suppress output
sys.stdout = open(os.devnull, 'w')
sys.stderr = open(os.devnull, 'w')
sys.stdout = open(os.devnull, "w")
sys.stderr = open(os.devnull, "w")

try:
ext = importlib.import_module('.' + ext_base_name, 'bluefog')
ext = importlib.import_module("." + ext_base_name, "bluefog")
result = fn(ext)
except: # pylint: disable=bare-except
traceback.print_exc()
result = None

if verbose:
print('Extension {ext_base_name} {flag} {fn_desc}.'.format(
ext_base_name=ext_base_name, flag=('was' if result else 'was NOT'),
fn_desc=fn_desc))
print(
"Extension {ext_base_name} {flag} {fn_desc}.".format(
ext_base_name=ext_base_name,
flag=("was" if result else "was NOT"),
fn_desc=fn_desc,
)
)

queue.put(result)

queue = Queue()
p = Process(target=_target_fn,
args=(ext_base_name, fn, fn_desc, queue, verbose))
p = Process(target=_target_fn, args=(ext_base_name, fn, fn_desc, queue, verbose))
p.daemon = True
p.start()
p.join()
Expand All @@ -95,15 +104,17 @@ def _target_fn(ext_base_name, fn, fn_desc, queue, verbose):

def extension_available(ext_base_name, verbose=False):
available_fn = lambda ext: ext is not None
return _check_extension_lambda(
ext_base_name, available_fn, 'built', verbose) or False
return (
_check_extension_lambda(ext_base_name, available_fn, "built", verbose) or False
)


def mpi_built(verbose=False):
for ext_base_name in EXTENSIONS:
built_fn = lambda ext: ext.mpi_built()
result = _check_extension_lambda(
ext_base_name, built_fn, 'built with MPI', verbose)
ext_base_name, built_fn, "built with MPI", verbose
)
if result is not None:
return result
return False
Expand Down