Skip to content

Commit fd76e08

Browse files
committed
static check changes
Signed-off-by: arunjose696 <[email protected]>
1 parent 5e84a30 commit fd76e08

File tree

3 files changed

+81
-82
lines changed

3 files changed

+81
-82
lines changed

unidist/core/backends/mpi/core/controller/actor.py

+19-14
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@
1111
from unidist.core.backends.mpi.core.controller.garbage_collector import (
1212
garbage_collector,
1313
)
14-
from unidist.core.backends.mpi.core.controller.common import push_data, RoundRobin
14+
from unidist.core.backends.mpi.core.controller.common import RoundRobin
1515
from .api import workQueue
1616

17+
1718
class ActorMethod:
1819
"""
1920
Class responsible to execute method of an actor.
@@ -40,13 +41,10 @@ def __call__(self, *args, num_returns=1, **kwargs):
4041
unwrapped_args = [common.unwrap_data_ids(arg) for arg in args]
4142
unwrapped_kwargs = {k: common.unwrap_data_ids(v) for k, v in kwargs.items()}
4243

43-
#push_data(self._actor._owner_rank, unwrapped_args)
44-
#push_data(self._actor._owner_rank, unwrapped_kwargs)
45-
from unidist.core.backends.common.data_id import DataID
46-
breakpoint()
47-
workQueue.put([1,(self._actor._owner_rank, [DataID("dadada")])])
48-
workQueue.put([1,(self._actor._owner_rank, unwrapped_args)])
49-
workQueue.put([1,(self._actor._owner_rank, unwrapped_kwargs)])
44+
# push_data(self._actor._owner_rank, unwrapped_args)
45+
# push_data(self._actor._owner_rank, unwrapped_kwargs)
46+
workQueue.put([(self._actor._owner_rank, unwrapped_args)])
47+
workQueue.put([(self._actor._owner_rank, unwrapped_kwargs)])
5048
operation_type = common.Operation.ACTOR_EXECUTE
5149
operation_data = {
5250
"task": self._method_name,
@@ -55,12 +53,19 @@ def __call__(self, *args, num_returns=1, **kwargs):
5553
"output": common.master_data_ids_to_base(output_id),
5654
"handler": self._actor._handler_id.base_data_id(),
5755
}
58-
59-
workQueue.put([2,(communication.MPIState.get_instance().comm,
60-
operation_type,
61-
operation_data,
62-
self._actor._owner_rank,)])
63-
56+
57+
workQueue.put(
58+
[
59+
2,
60+
(
61+
communication.MPIState.get_instance().comm,
62+
operation_type,
63+
operation_data,
64+
self._actor._owner_rank,
65+
),
66+
]
67+
)
68+
6469
return output_id
6570

6671

unidist/core/backends/mpi/core/controller/api.py

+54-61
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88
import atexit
99
import signal
1010
import asyncio
11+
import queue
12+
import threading
13+
import time
1114
from collections import defaultdict
12-
from datetime import datetime
15+
1316
try:
1417
import mpi4py
1518
from mpi4py import futures
@@ -51,47 +54,44 @@
5154
# The global variable is responsible for if MPI backend has already been initialized
5255
is_mpi_initialized = False
5356

54-
import queue
55-
import threading
56-
import time
5757

5858
exitFlag = 0
5959

60-
class myThread (threading.Thread):
61-
def __init__(self, threadID, name, q):
62-
threading.Thread.__init__(self)
63-
self.threadID = threadID
64-
self.name = name
65-
self.q = q
66-
def run(self):
67-
print ("Starting " + self.name)
68-
process_data(self.name, self.q)
69-
print ("Exiting " + self.name)
7060

61+
class myThread(threading.Thread):
62+
def __init__(self, threadID, name, q):
63+
threading.Thread.__init__(self)
64+
self.threadID = threadID
65+
self.name = name
66+
self.q = q
7167

68+
def run(self):
69+
print("Starting " + self.name)
70+
process_data(self.name, self.q)
71+
print("Exiting " + self.name)
7272

7373

7474
def process_data(threadName, q):
7575
global getRequests
7676
while not exitFlag:
7777
queueLock.acquire()
78-
#print("queue size is {} , time ={}".format(workQueue.qsize(),datetime.fromtimestamp(time.time())))
78+
# print("queue size is {} , time ={}".format(workQueue.qsize(),datetime.fromtimestamp(time.time())))
7979
if not workQueue.empty():
80-
future, data = q.get()
80+
future, data = q.get()
8181
queueLock.release()
82-
function, args = data
82+
function, args = data
8383
result = function(*args)
8484
if future:
8585
future.set_result(result)
8686
else:
8787
queueLock.release()
88-
#time.sleep(.1)
88+
# time.sleep(.1)
8989
print("exited")
9090

91+
9192
threadList = ["Thread-1"]
9293
queueLock = threading.Lock()
9394
workQueue = queue.Queue(0)
94-
getQueue = queue.Queue(0)
9595
threads = []
9696

9797

@@ -122,12 +122,12 @@ def init():
122122
comm = MPI.COMM_WORLD
123123
rank = comm.Get_rank()
124124
parent_comm = MPI.Comm.Get_parent()
125-
if not threads and rank == 0 and parent_comm == MPI.COMM_NULL:
125+
if not threads and rank == 0 and parent_comm == MPI.COMM_NULL:
126126
for tName in threadList:
127127
thread = myThread(1, tName, workQueue)
128128
thread.start()
129129
threads.append(thread)
130-
130+
131131
# path to dynamically spawn MPI processes
132132
if rank == 0 and parent_comm == MPI.COMM_NULL:
133133
# Create new threads
@@ -231,7 +231,6 @@ def shutdown():
231231
"""
232232
global exitFlag
233233
exitFlag = 1
234-
print("here")
235234
for t in threads:
236235
t.join()
237236
mpi_state = communication.MPIState.get_instance()
@@ -245,7 +244,6 @@ def shutdown():
245244
async_operations.finish()
246245
if not MPI.Is_finalized():
247246
MPI.Finalize()
248-
249247

250248

251249
def cluster_resources():
@@ -283,12 +281,11 @@ def put(data):
283281
unidist.core.backends.mpi.core.common.MasterDataID
284282
An ID of an object in object storage.
285283
"""
286-
#data_id = object_store.generate_data_id(garbage_collector)
287-
data_id = futures.Future()
288-
workQueue.put([data_id, [ object_store.generate_data_id,[garbage_collector]] ])
284+
# data_id = object_store.generate_data_id(garbage_collector)
285+
data_id = futures.Future()
286+
workQueue.put([data_id, [object_store.generate_data_id, [garbage_collector]]])
289287
data_id = data_id.result()
290-
workQueue.put([ None, [object_store.put,[data_id, data]] ])
291-
#object_store.put(data_id, data)
288+
workQueue.put([None, [object_store.put, [data_id, data]]])
292289

293290
logger.debug("PUT {} id".format(data_id._id))
294291

@@ -311,26 +308,16 @@ def get(data_ids):
311308
"""
312309

313310
def get_impl(data_id):
314-
global getRequests,workQueue
311+
global getRequests, workQueue
315312
if object_store.contains(data_id):
316313
value = object_store.get(data_id)
317314
else:
318315
queueLock.acquire()
319-
print("size {} data_id={}".format(workQueue.qsize(), data_id))
320-
future = futures.Future()
321-
workQueue.put([future, [ request_worker_data, [data_id] ] ])
316+
print("size {} data_id={}".format(workQueue.qsize(), data_id))
317+
future = futures.Future()
318+
workQueue.put([future, [request_worker_data, [data_id]]])
322319
queueLock.release()
323-
value = future.result()
324-
325-
326-
print("got {}".format( data_id))
327-
print("value {}".format( value))
328-
329-
330-
331-
332-
333-
320+
value = future.result()
334321

335322
if isinstance(value, Exception):
336323
raise value
@@ -449,8 +436,16 @@ def submit(task, *args, num_returns=1, **kwargs):
449436
# dest_rank, garbage_collector, num_returns
450437
# )
451438
global workQueue
452-
output_ids = futures.Future()
453-
workQueue.put([output_ids, [ object_store.generate_output_data_id,[dest_rank, garbage_collector, num_returns]] ])
439+
output_ids = futures.Future()
440+
workQueue.put(
441+
[
442+
output_ids,
443+
[
444+
object_store.generate_output_data_id,
445+
[dest_rank, garbage_collector, num_returns],
446+
],
447+
]
448+
)
454449
output_ids = output_ids.result()
455450
logger.debug("REMOTE OPERATION")
456451
logger.debug(
@@ -466,16 +461,13 @@ def submit(task, *args, num_returns=1, **kwargs):
466461

467462
unwrapped_args = [common.unwrap_data_ids(arg) for arg in args]
468463
unwrapped_kwargs = {k: common.unwrap_data_ids(v) for k, v in kwargs.items()}
469-
470-
464+
471465
queueLock.acquire()
472466
print(workQueue.qsize())
473-
workQueue.put([ None, [ push_data, [dest_rank, unwrapped_args]]])
474-
475-
print(unwrapped_args, time.time())
476-
workQueue.put([ None, [ push_data, [dest_rank, unwrapped_kwargs]]])
467+
workQueue.put([None, [push_data, [dest_rank, unwrapped_args]]])
477468

478-
469+
print(unwrapped_args, time.time())
470+
workQueue.put([None, [push_data, [dest_rank, unwrapped_kwargs]]])
479471

480472
operation_type = common.Operation.EXECUTE
481473
operation_data = {
@@ -484,19 +476,22 @@ def submit(task, *args, num_returns=1, **kwargs):
484476
"kwargs": unwrapped_kwargs,
485477
"output": common.master_data_ids_to_base(output_ids),
486478
}
479+
487480
def send_operation_impl(comm, operation_type, operation_data, dest_rank):
488481
async_operations = AsyncOperations.get_instance()
489482
h_list, _ = communication.isend_complex_operation(
490-
comm,
491-
operation_type,
492-
operation_data,
493-
dest_rank,
483+
comm,
484+
operation_type,
485+
operation_data,
486+
dest_rank,
494487
)
495488
async_operations.extend(h_list)
489+
496490
comm = communication.MPIState.get_instance().comm
497-
workQueue.put([ None, [ send_operation_impl, [comm, operation_type, operation_data, dest_rank]]])
498-
499-
491+
workQueue.put(
492+
[None, [send_operation_impl, [comm, operation_type, operation_data, dest_rank]]]
493+
)
494+
500495
queueLock.release()
501496
# Track the task execution
502497
garbage_collector.increment_task_counter()
@@ -507,8 +502,6 @@ def send_operation_impl(comm, operation_type, operation_data, dest_rank):
507502
# ---------------------------- #
508503
# unidist termination handling #
509504
# ---------------------------- #
510-
#!/usr/bin/python
511-
512505

513506

514507
def _termination_handler():

unidist/core/backends/mpi/core/controller/garbage_collector.py

+8-7
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,14 @@ def _send_cleanup_request(self, cleanup_list):
6464
s_cleanup_list = SimpleDataSerializer().serialize_pickle(cleanup_list)
6565
async_operations = AsyncOperations.get_instance()
6666
for rank_id in range(initial_worker_number, mpi_state.world_size):
67-
h_list = communication.isend_serialized_operation(
68-
mpi_state.comm,
69-
common.Operation.CLEANUP,
70-
s_cleanup_list,
71-
rank_id,
72-
)
73-
async_operations.extend(h_list)
67+
if rank_id != mpi_state.rank:
68+
h_list = communication.isend_serialized_operation(
69+
mpi_state.comm,
70+
common.Operation.CLEANUP,
71+
s_cleanup_list,
72+
rank_id,
73+
)
74+
async_operations.extend(h_list)
7475

7576
def increment_task_counter(self):
7677
"""

0 commit comments

Comments
 (0)