diff --git a/src/power_grid_model_ds/_core/visualizer/app.py b/src/power_grid_model_ds/_core/visualizer/app.py index 169aacbc..d1003139 100644 --- a/src/power_grid_model_ds/_core/visualizer/app.py +++ b/src/power_grid_model_ds/_core/visualizer/app.py @@ -5,6 +5,7 @@ import dash_bootstrap_components as dbc from dash import Dash, dcc, html from dash_bootstrap_components.icons import FONT_AWESOME +from power_grid_model import ComponentType from power_grid_model_ds._core.model.grids.base import Grid from power_grid_model_ds._core.visualizer.callbacks import ( # noqa: F401 # pylint: disable=unused-import @@ -17,7 +18,8 @@ from power_grid_model_ds._core.visualizer.layout.cytoscape_styling import DEFAULT_STYLESHEET from power_grid_model_ds._core.visualizer.layout.header import HEADER_HTML from power_grid_model_ds._core.visualizer.layout.selection_output import SELECTION_OUTPUT_HTML -from power_grid_model_ds._core.visualizer.parsers import parse_branches, parse_node_array +from power_grid_model_ds._core.visualizer.parsers import parse_element_data +from power_grid_model_ds._core.visualizer.typing import VizToComponentData from power_grid_model_ds.arrays import NodeArray GOOGLE_FONTS = "https://fonts.googleapis.com/css?family=Roboto:300,400,500,700&display=swap" @@ -57,26 +59,33 @@ def _get_columns_store(grid: Grid) -> dcc.Store: return dcc.Store( id="columns-store", data={ - "node": grid.node.columns, - "line": grid.line.columns, - "link": grid.link.columns, - "transformer": grid.transformer.columns, - "three_winding_transformer": grid.three_winding_transformer.columns, + ComponentType.node: grid.node.columns, + ComponentType.line: grid.line.columns, + ComponentType.link: grid.link.columns, + ComponentType.transformer: grid.transformer.columns, + ComponentType.three_winding_transformer: grid.three_winding_transformer.columns, "branch": grid.branches.columns, }, ) +def _get_viz_to_comp_store(viz_to_comp_data: VizToComponentData) -> dcc.Store: + """Create a store for all data mapped by visualization element to component data.""" + return dcc.Store(id="viz-to-comp-store", data=viz_to_comp_data) + + def get_app_layout(grid: Grid) -> html.Div: """Get the app layout.""" columns_store = _get_columns_store(grid) graph_layout = _get_graph_layout(grid.node) - elements = parse_node_array(grid.node) + parse_branches(grid) + elements, viz_to_comp_data = parse_element_data(grid) + viz_to_comp_store = _get_viz_to_comp_store(viz_to_comp_data) cytoscape_html = get_cytoscape_html(graph_layout, elements) return html.Div( [ columns_store, + viz_to_comp_store, dcc.Store(id="stylesheet-store", data=DEFAULT_STYLESHEET), HEADER_HTML, html.Hr(style={"border-color": "white", "margin": "0"}), diff --git a/src/power_grid_model_ds/_core/visualizer/callbacks/element_selection.py b/src/power_grid_model_ds/_core/visualizer/callbacks/element_selection.py index 81449983..0f5c6345 100644 --- a/src/power_grid_model_ds/_core/visualizer/callbacks/element_selection.py +++ b/src/power_grid_model_ds/_core/visualizer/callbacks/element_selection.py @@ -4,29 +4,33 @@ from typing import Any -from dash import Input, Output, callback, dash_table +from dash import Input, Output, State, callback, dash_table from power_grid_model_ds._core.visualizer.layout.selection_output import ( SELECTION_OUTPUT_HTML, ) +from power_grid_model_ds._core.visualizer.typing import VizToComponentData @callback( Output("selection-output", "children"), Input("cytoscape-graph", "selectedNodeData"), Input("cytoscape-graph", "selectedEdgeData"), + State("viz-to-comp-store", "data"), ) -def display_selected_element(node_data: list[dict[str, Any]], edge_data: list[dict[str, Any]]): +def display_selected_element( + node_data: list[dict[str, Any]], edge_data: list[dict[str, Any]], viz_to_comp: VizToComponentData +): """Display the tapped edge data.""" if node_data: - return _to_data_table(node_data.pop()) - if edge_data: - edge_data_dict = edge_data.pop() - del edge_data_dict["source"] # duplicated by from_node - del edge_data_dict["target"] # duplicated by to_node - del edge_data_dict["group"] # unnecessary information - return _to_data_table(edge_data_dict) - return SELECTION_OUTPUT_HTML + elm_id_str = node_data.pop()["id"] + elif edge_data: + elm_id_str = edge_data.pop()["id"] + else: + return SELECTION_OUTPUT_HTML.children + + first_value = next(iter(viz_to_comp[elm_id_str].values())) + return _to_data_table(first_value[0]) def _to_data_table(data: dict[str, Any]): diff --git a/src/power_grid_model_ds/_core/visualizer/parsers.py b/src/power_grid_model_ds/_core/visualizer/parsers.py index 50b048fd..4578974e 100644 --- a/src/power_grid_model_ds/_core/visualizer/parsers.py +++ b/src/power_grid_model_ds/_core/visualizer/parsers.py @@ -4,72 +4,119 @@ from typing import Any, Literal +from power_grid_model import ComponentType + from power_grid_model_ds._core.model.arrays.base.array import FancyArray from power_grid_model_ds._core.model.grids.base import Grid +from power_grid_model_ds._core.visualizer.typing import VizToComponentData from power_grid_model_ds.arrays import Branch3Array, BranchArray, NodeArray -def parse_node_array(nodes: NodeArray) -> list[dict[str, Any]]: - """Parse the nodes.""" +def parse_element_data(grid: Grid) -> tuple[list[dict[str, Any]], VizToComponentData]: + """ + Parse grid element data and organize by node ID as string. + + Args: + grid (Grid): The power grid model. + Returns: + tuple[list[dict[str, Any]], VizToComponentData]: A tuple containing + a list of elements for visualization + A mapping from node or edge IDs used in visualization to their associated component data. + """ + viz_to_comp: VizToComponentData = {} + + elements = [] + elements += parse_node_array(grid.node, viz_to_comp) + elements += parse_branches(grid, viz_to_comp) + + return elements, viz_to_comp + + +def parse_node_array(nodes: NodeArray, viz_to_comp: VizToComponentData) -> list[dict[str, Any]]: + """Parse the nodes. Fills node data to viz_to_comp.""" parsed_nodes = [] with_coords = "x" in nodes.columns and "y" in nodes.columns - columns = nodes.columns for node in nodes: - cyto_elements = {"data": _array_to_dict(node, columns)} - cyto_elements["data"]["id"] = str(node.id.item()) - cyto_elements["data"]["group"] = "node" + node_id_str = str(node.id.item()) + + _ensure_component_list(viz_to_comp, node_id_str, ComponentType.node) + viz_to_comp[node_id_str][ComponentType.node].append(_array_to_dict(node, nodes.columns)) + + cyto_elements = {"data": {"id": node_id_str, "group": ComponentType.node.value}} if with_coords: cyto_elements["position"] = {"x": node.x.item(), "y": -node.y.item()} # invert y-axis for visualization parsed_nodes.append(cyto_elements) return parsed_nodes -def parse_branches(grid: Grid) -> list[dict[str, Any]]: - """Parse the branches.""" +def parse_branches(grid: Grid, viz_to_comp: VizToComponentData) -> list[dict[str, Any]]: + """Parse the branches. Fills branch data to viz_to_comp.""" parsed_branches = [] - parsed_branches.extend(parse_branch_array(grid.line, "line")) - parsed_branches.extend(parse_branch_array(grid.link, "link")) - parsed_branches.extend(parse_branch_array(grid.transformer, "transformer")) - parsed_branches.extend(parse_branch3_array(grid.three_winding_transformer, "transformer")) + parsed_branches.extend(parse_branch_array(grid.line, ComponentType.line, viz_to_comp)) + parsed_branches.extend(parse_branch_array(grid.link, ComponentType.link, viz_to_comp)) + parsed_branches.extend(parse_branch_array(grid.transformer, ComponentType.transformer, viz_to_comp)) + + parsed_branches.extend( + parse_branch3_array( + grid.three_winding_transformer, + component_type=ComponentType.three_winding_transformer, + group=ComponentType.transformer.value, + viz_to_comp=viz_to_comp, + ) + ) return parsed_branches -def parse_branch3_array(branches: Branch3Array, group: Literal["transformer"]) -> list[dict[str, Any]]: - """Parse the three-winding transformer array.""" +def parse_branch3_array( + branches: Branch3Array, + component_type: Literal[ComponentType.three_winding_transformer], + group: Literal["transformer"], + viz_to_comp: VizToComponentData, +) -> list[dict[str, Any]]: + """Parse the three-winding transformer array. Fills branch3 data to viz_to_comp.""" parsed_branches = [] - columns = branches.columns for branch3 in branches: - for branch1 in branch3.as_branches(): - cyto_elements = {"data": _array_to_dict(branch1, columns)} - cyto_elements["data"].update( - { + branch3_component_data = _array_to_dict(branch3, branches.columns) # Same for all three branches + for count, branch1 in enumerate(branch3.as_branches()): + branch3_id_str = f"{str(branch3.id.item())}_{count}" + + _ensure_component_list(viz_to_comp, branch3_id_str, component_type) + viz_to_comp[branch3_id_str][component_type].append(branch3_component_data) + + cyto_elements = { + "data": { # IDs need to be unique, so we combine the branch ID with the from and to nodes - "id": str(branch3.id.item()) + f"_{branch1.from_node.item()}_{branch1.to_node.item()}", + "id": branch3_id_str, "source": str(branch1.from_node.item()), "target": str(branch1.to_node.item()), "group": group, } - ) + } parsed_branches.append(cyto_elements) return parsed_branches -def parse_branch_array(branches: BranchArray, group: Literal["line", "link", "transformer"]) -> list[dict[str, Any]]: - """Parse the branch array.""" +def parse_branch_array( + branches: BranchArray, + group: Literal[ComponentType.line, ComponentType.link, ComponentType.transformer], + viz_to_comp: VizToComponentData, +) -> list[dict[str, Any]]: + """Parse the branch array. Fills branch data to viz_to_comp.""" parsed_branches = [] - columns = branches.columns for branch in branches: - cyto_elements = {"data": _array_to_dict(branch, columns)} - cyto_elements["data"].update( - { + _ensure_component_list(viz_to_comp, str(branch.id.item()), group) + viz_to_comp[str(branch.id.item())][group].append(_array_to_dict(branch, branches.columns)) + + cyto_elements = { + "data": { "id": str(branch.id.item()), "source": str(branch.from_node.item()), "target": str(branch.to_node.item()), - "group": group, + "group": group.value, } - ) + } parsed_branches.append(cyto_elements) return parsed_branches @@ -77,3 +124,11 @@ def parse_branch_array(branches: BranchArray, group: Literal["line", "link", "tr def _array_to_dict(array_record: FancyArray, columns: list[str]) -> dict[str, Any]: """Stringify the record (required by Dash).""" return dict(zip(columns, array_record.tolist().pop())) + + +def _ensure_component_list(viz_to_comp: VizToComponentData, node_id: str, component_type: ComponentType) -> None: + """Ensure that the component list exists for a given node and component type.""" + if node_id not in viz_to_comp: + viz_to_comp[node_id] = {} + if component_type not in viz_to_comp[node_id]: + viz_to_comp[node_id][component_type] = [] diff --git a/src/power_grid_model_ds/_core/visualizer/typing.py b/src/power_grid_model_ds/_core/visualizer/typing.py index 083f7080..64aa14c5 100644 --- a/src/power_grid_model_ds/_core/visualizer/typing.py +++ b/src/power_grid_model_ds/_core/visualizer/typing.py @@ -4,4 +4,35 @@ from typing import Any +from power_grid_model import ComponentType + STYLESHEET = list[dict[str, Any]] +ListArrayData = list[dict[str, Any]] + +""" +Mapping from visualization element ID to component type to list of array data. + +For example: + { + "node_id_1": { + "node": [ {"id": 0, "u_rated": 100}, {...} ], + "source": [ {..source data..}, {...}], + ... + }, + "edge_id_1": { + "line": [ {..line data..}, {...} ], + "sym_power_sensor": [ {..sensor data..}, {...}], + ... + }, + "branch3_id_0": { + "three_winding_transformer": [ {..three_winding_transformer data..}, {...} ], + ... + }, + "branch3_id_1": { + "three_winding_transformer": [ {..same three_winding_transformer data..}, {...} ], + ... + }, + ... + } +""" +VizToComponentData = dict[str, dict[ComponentType, ListArrayData]] diff --git a/tests/unit/visualizer/test_callbacks.py b/tests/unit/visualizer/test_callbacks.py index 267b5545..16c114ff 100644 --- a/tests/unit/visualizer/test_callbacks.py +++ b/tests/unit/visualizer/test_callbacks.py @@ -2,11 +2,15 @@ # # SPDX-License-Identifier: MPL-2.0 import pytest +from dash import dash_table from dash.exceptions import PreventUpdate +from power_grid_model import ComponentType from power_grid_model_ds._core.visualizer.callbacks.config import scale_elements, update_arrows +from power_grid_model_ds._core.visualizer.callbacks.element_selection import display_selected_element from power_grid_model_ds._core.visualizer.callbacks.search_form import search_element from power_grid_model_ds._core.visualizer.layout.cytoscape_styling import DEFAULT_STYLESHEET +from power_grid_model_ds._core.visualizer.layout.selection_output import SELECTION_OUTPUT_HTML _EDGE_INDEX = 3 @@ -45,3 +49,31 @@ def test_show_arrows(): def test_hide_arrows(): stylesheet = update_arrows(False, DEFAULT_STYLESHEET) assert stylesheet[_EDGE_INDEX]["style"]["target-arrow-shape"] == "none" + + +def test_display_selected_node(): + viz_to_comp = { + "1": {ComponentType.node: [{"id": 1, "u_rated": 10.0}, {"id": 2, "u_rated": 20.0}]}, + "3": {ComponentType.line: [{"id": 3, "from_node": 1, "to_node": 2, "r1": 10}]}, + "4_0": {ComponentType.three_winding_transformer: [{"id": 4, "node_1": 2, "node_2": 1, "node_3": 1, "sn": 10}]}, + "4_1": {ComponentType.three_winding_transformer: [{"id": 4, "node_1": 2, "node_2": 1, "node_3": 1, "sn": 10}]}, + } + + # node + output = display_selected_element(node_data=[{"id": "1"}], edge_data=[], viz_to_comp=viz_to_comp) + assert isinstance(output, dash_table.DataTable) + assert output.data == [viz_to_comp["1"][ComponentType.node][0]] + + # edge + output = display_selected_element(node_data=[], edge_data=[{"id": "3"}], viz_to_comp=viz_to_comp) + assert isinstance(output, dash_table.DataTable) + assert output.data == [viz_to_comp["3"][ComponentType.line][0]] + + # branch3 + output = display_selected_element(node_data=[], edge_data=[{"id": "4_0"}], viz_to_comp=viz_to_comp) + assert isinstance(output, dash_table.DataTable) + assert output.data == [viz_to_comp["4_0"][ComponentType.three_winding_transformer][0]] + + # nothing selected + output = display_selected_element(node_data=[], edge_data=[], viz_to_comp=viz_to_comp) + assert output == SELECTION_OUTPUT_HTML.children diff --git a/tests/unit/visualizer/test_parsers.py b/tests/unit/visualizer/test_parsers.py index dff42c93..2d6b2431 100644 --- a/tests/unit/visualizer/test_parsers.py +++ b/tests/unit/visualizer/test_parsers.py @@ -4,6 +4,7 @@ import numpy as np from numpy.typing import NDArray +from power_grid_model import ComponentType from power_grid_model_ds._core.model.arrays import LineArray, NodeArray from power_grid_model_ds._core.model.arrays.pgm_arrays import Branch3Array @@ -21,7 +22,8 @@ def test_parse_node_array(self): nodes["id"] = [1, 2, 3] nodes["u_rated"] = [10, 20.4, 30.99] - parsed = parse_node_array(nodes) + viz_to_comp = {} + parsed = parse_node_array(nodes, viz_to_comp) assert len(parsed) == 3 node_1_data = parsed[0]["data"] @@ -35,9 +37,9 @@ def test_parse_node_array(self): assert node_2_data["id"] == "2" assert node_3_data["id"] == "3" - assert node_1_data["u_rated"] == 10 - assert node_2_data["u_rated"] == 20.4 - assert node_3_data["u_rated"] == 30.99 + assert viz_to_comp["1"][ComponentType.node] == [dict(zip(nodes.columns, nodes[0].tolist().pop()))] + assert viz_to_comp["2"][ComponentType.node] == [dict(zip(nodes.columns, nodes[1].tolist().pop()))] + assert viz_to_comp["3"][ComponentType.node] == [dict(zip(nodes.columns, nodes[2].tolist().pop()))] def test_parse_coordinated_node_array(self): nodes = CoordinatedNodeArray.zeros(3) @@ -45,7 +47,7 @@ def test_parse_coordinated_node_array(self): nodes["x"] = [10, 20, 30] nodes["y"] = [99, 88, 77] - parsed = parse_node_array(nodes) + parsed = parse_node_array(nodes, {}) position = parsed[0].get("position") assert position is not None assert position["x"] == 10 @@ -58,7 +60,8 @@ def test_parse_line_array(self): lines["id"] = [100, 101, 102] lines["from_node"] = [1, 2, 3] lines["to_node"] = [4, 5, 6] - parsed = parse_branch_array(lines, "line") + viz_to_comp = {} + parsed = parse_branch_array(lines, ComponentType.line, viz_to_comp) assert len(parsed) == 3 assert parsed[0]["data"]["id"] == "100" @@ -66,6 +69,10 @@ def test_parse_line_array(self): assert parsed[0]["data"]["target"] == "4" assert parsed[0]["data"]["group"] == "line" + assert viz_to_comp["100"][ComponentType.line] == [dict(zip(lines.columns, lines[0].tolist().pop()))] + assert viz_to_comp["101"][ComponentType.line] == [dict(zip(lines.columns, lines[1].tolist().pop()))] + assert viz_to_comp["102"][ComponentType.line] == [dict(zip(lines.columns, lines[2].tolist().pop()))] + def test_parse_branch3_array(self): branch3 = Branch3Array.zeros(1) branch3["id"] = [200] @@ -76,17 +83,29 @@ def test_parse_branch3_array(self): branch3["status_2"] = [1] branch3["status_3"] = [1] - parsed = parse_branch3_array(branch3, "transformer") + viz_to_comp = {} + parsed = parse_branch3_array(branch3, ComponentType.three_winding_transformer, "transformer", viz_to_comp) + assert len(parsed) == 3 - assert parsed[0]["data"]["id"] == "200_1_2" + assert parsed[0]["data"]["id"] == "200_0" assert parsed[0]["data"]["source"] == "1" assert parsed[0]["data"]["target"] == "2" assert parsed[0]["data"]["group"] == "transformer" - assert parsed[1]["data"]["id"] == "200_1_3" + assert parsed[1]["data"]["id"] == "200_1" assert parsed[1]["data"]["source"] == "1" assert parsed[1]["data"]["target"] == "3" assert parsed[1]["data"]["group"] == "transformer" - assert parsed[2]["data"]["id"] == "200_2_3" + assert parsed[2]["data"]["id"] == "200_2" assert parsed[2]["data"]["source"] == "2" assert parsed[2]["data"]["target"] == "3" assert parsed[2]["data"]["group"] == "transformer" + + assert viz_to_comp["200_0"][ComponentType.three_winding_transformer] == [ + dict(zip(branch3.columns, branch3[0].tolist()[0])) + ] + assert viz_to_comp["200_1"][ComponentType.three_winding_transformer] == [ + dict(zip(branch3.columns, branch3[0].tolist()[0])) + ] + assert viz_to_comp["200_2"][ComponentType.three_winding_transformer] == [ + dict(zip(branch3.columns, branch3[0].tolist()[0])) + ]