Skip to content

Fix merging responses in nest-server-mpi #3492

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 78 additions & 80 deletions pynest/nest/server/hl_api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def do_call(call_name, args=[], kwargs={}):
response = mpi_comm.gather(response[0], root=0)
log(call_name, f"received response gather, data={response}")

return combine(call_name, response)
return combine(response)


@app.route("/exec", methods=["GET", "POST"])
Expand Down Expand Up @@ -543,7 +543,7 @@ def run_mpi_app(host="127.0.0.1", port=52425):
app.run(host=host, port=port, threaded=False)


def combine(call_name, response):
def combine(response: list) -> dict | list | None:
"""Combine responses from different MPI processes.

In a distributed scenario, each MPI process creates its own share
Expand All @@ -560,118 +560,116 @@ def combine(call_name, response):
The combination of results is based on a cascade of heuristics
based on the call that was issued and individual repsonse data:
* if all responses are None, the combined response will also just
be None
* for some specific calls, the responses are known to be the
same from the master and all workers. In this case, the
combined response is just the master response
be None.
* the responses are known to be the same from the master and all
workers. In this case, the combined response is just the master
response.
* if the response list contains only a single actual response and
None otherwise, the combined response will be that one actual
response
* for calls to GetStatus on recording devices, the combined
response will be a merged dictionary in the sense that all
fields that contain a single value in the individual responsed
are kept as a single values, while lists will be appended in
order of appearance; dictionaries in the response are
recursively treated in the same way
* for calls to GetStatus on neurons, the combined response is just
the single dictionary returned by the process on which the
neuron is actually allocated
* if the response contains one list per process, the combined
response will be those lists concatenated and flattened.
response.
* if the response contains one list per process, the combined
response will be those first list.

"""

if mpi_comm is None:
if type(response) is not list or len(response) == 0:
return response

# return first dictionary if the response contains only one element.
elif mpi_comm is None or len(response) == 1:
return response[0]

if all(v is None for v in response):
elif all(v is None for v in response):
return None

# return the master response if all responses are known to be the same
if call_name in ("exec", "Create", "GetDefaults", "GetKernelStatus", "SetKernelStatus", "SetStatus"):
return response[0]

# return a single response if there is only one which is not None
filtered_response = list(filter(lambda x: x is not None, response))
if len(filtered_response) == 1:
return filtered_response[0]
# return first list/tuple if the response only consists of lists or tuples.
elif all(type(v) is (list) or type(v) is tuple for v in response):
return response[0] # TODO consider alternative: _flatten(response)

# return a single merged dictionary if there are many of them
if all(type(v[0]) is dict for v in response):
elif all(type(v) is dict for v in response):
return merge_dicts(response)

# return a flattened list if the response only consists of lists
if all(type(v) is list for v in response):
return [item for lst in response for item in lst]

log("combine()", f"ERROR: cannot combine response={response}")
msg = "Cannot combine data because of unknown reason"
raise Exception(msg) # pylint: disable=W0719


def merge_dicts(response):
"""Merge status dictionaries of recorders
def merge_dicts(response: list[dict]) -> dict:
"""Merge dictionaries of the response.

This function runs through a zipped list and performs the
following steps:
* sum up all n_events fields
* if recording to memory: merge the event dictionaries by joining
all contained arrays
* if recording to ascii: join filenames arrays
* take all other values directly from the device on the first
process
Parameters
----------
response: list
list of response

"""

result = []
# return first dictionary if the response contains only one element.
if len(response) == 1:
return response[0]

for device_dicts in zip(*response):
# TODO: either stip fields like thread, vp, thread_local_id,
# and local or make them lists that contain the values from
# all dicts.
# if the response comes from recorders in exec call.
elif "data" in response[0]:
data = response[0]["data"]
if len(data) == 1:
data_key = list(data.keys())[0]
response_data = [res["data"][data_key] for res in response]
merged = [_merge_dict([d[idx] for d in response_data]) for idx in range(len(response_data[0]))]
return dict([["data", dict([[data_key, merged]])]])

element_type = device_dicts[0]["element_type"]
# if the response contains duplicates because of the MPI.
else:
keys = [list(r.keys()) for r in response]
if len(keys[0]) == len(set(sum(keys, []))):
return response[0]

if element_type not in ("neuron", "recorder", "stimulator"):
msg = f'Cannot combine data of element with type "{element_type}".'
raise Exception(msg) # pylint: disable=W0719
log("merge_dict()", f"ERROR: cannot merge dict={response}")
msg = "Cannot merge dict because of unknown reason"
raise Exception(msg) # pylint: disable=W0719

if element_type == "neuron":
tmp = list(filter(lambda status: status["local"], device_dicts))
assert len(tmp) == 1
result.append(tmp[0])

if element_type == "recorder":
tmp = deepcopy(device_dicts[0])
tmp["n_events"] = 0
def _flatten(data: list[dict | list]) -> list:
"""Flatten nested list.

for device_dict in device_dicts:
tmp["n_events"] += device_dict["n_events"]
Parameters
----------
data: list
Nested list of data

record_to = tmp["record_to"]
if record_to not in ("ascii", "memory"):
msg = f'Cannot combine data when recording to "{record_to}".'
raise Exception(msg) # pylint: disable=W0719
"""
return [elem for dl in data for elem in dl]

if record_to == "memory":
event_keys = tmp["events"].keys()
for key in event_keys:
tmp["events"][key] = []
for device_dict in device_dicts:
for key in event_keys:
tmp["events"][key].extend(device_dict["events"][key])

if record_to == "ascii":
tmp["filenames"] = []
for device_dict in device_dicts:
tmp["filenames"].extend(device_dict["filenames"])
def _merge_dict(data: list[dict]) -> dict:
"""Merge dictionaries of the list.

Parameters
----------
data: list
List of dictionary data

"""
data_keys = list(set(_flatten(data)))
return dict([(data_key, _flatten([d[data_key] for d in data])) for data_key in data_keys])


def merge_response(response: list):
if "events" in response[0]["data"]:
events = [res["data"]["events"] for res in response]
merged = [_merge_event([e[idx] for e in events]) for idx in range(len(events[0]))]
return [{"data": {"events": merged}}]
else:
return response


result.append(tmp)
def _flatten(xss):
return [x for xs in xss for x in xs]

if element_type == "stimulator":
result.append(device_dicts[0])

return result
def _merge_event(event: list):
eventKeys = list(set(_flatten([e for e in event])))
return dict([(eKey, _flatten([e[eKey] for e in event])) for eKey in eventKeys])


if __name__ == "__main__":
Expand Down
Loading