-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathnoc_graph.py
253 lines (205 loc) · 8.9 KB
/
noc_graph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
"""The NoC graph class."""
__copyright__ = """
Copyright (c) 2024 RapidStream Design Automation, Inc. and contributors.
All rights reserved. The contributor(s) of this file has/have agreed to the
RapidStream Contributor License Agreement.
"""
from typing import Any
from pydantic import BaseModel
class Node(BaseModel):
"""Represents a node in the NoC graph."""
name: str
class Edge(BaseModel):
"""Represents an edge in the NoC graph."""
src: Node
dest: Node
bandwidth: float
class NocGraph(BaseModel):
"""Represents a NoC graph.
num_slr: number of slr
num_col: number of vertical NoC
rows_per_slr: number of NMU <-> NSU rows per SLR
nmu_nodes: 2d array of all NMU nodes. Indexing follows Vivado.
nsu_nodes: 2d array of all NSU nodes. Indexing follows Vivado.
nps_vnoc_nodes: 2d array of all NPS nodes connected to NMUs and NSUs.
Indexing follows Vivado.
nps_hnoc_nodes: 2d array of all interconnect NPS nodes bridging SLRs.
nps_slr0_nodes: one row of the NPS nodes connecting the bottom SLR0 to DDR and CIPS.
ncrb_nodes: 2d array of all NCRB nodes for east-west communication.
Note: all 2d arrays are indexed as in the Cartesian plane.
"""
num_slr: int
num_col: int
rows_per_slr: list[int]
nmu_nodes: list[list[Node]]
nsu_nodes: list[list[Node]]
nps_vnoc_nodes: list[list[Node]]
nps_hnoc_nodes: list[list[Node]]
nps_slr0_nodes: list[list[Node]]
ncrb_nodes: list[list[Node]]
nps_hbm_nodes: list[list[Node]]
ncrb_hbm_nodes: list[list[Node]]
hbm_mc_nodes: list[list[list[Node]]]
nmu_hbm_nodes: list[Node]
nps4_hbm_mc_nodes: list[Node]
nps6_hbm_mc_nodes: list[list[Node]]
edges: list[Edge]
def __init__(self, **data: Any) -> None:
"""Initialize class."""
super().__init__(**data)
assert self.num_slr == len(self.rows_per_slr), "Invalid class attributes!"
def add_edge(self, edge: Edge) -> None:
"""Add an edge."""
self.edges.append(edge)
def add_edges(self, edges: list[Edge]) -> None:
"""Add a list of edges."""
for e in edges:
assert e.src != e.dest, f"Invalid edge! {e.src.name} -> {e.dest.name}"
self.add_edge(e)
# get node helper functions
def get_all_nodes(self) -> list[str]:
"""Get a list of all nodes' names.
Returns a list of strings.
"""
all_nodes: list[str] = []
all_nodes += [n.name for row in self.nmu_nodes for n in row]
all_nodes += [n.name for row in self.nsu_nodes for n in row]
all_nodes += [n.name for row in self.nps_vnoc_nodes for n in row]
all_nodes += [n.name for row in self.nps_hnoc_nodes for n in row]
all_nodes += [n.name for row in self.nps_slr0_nodes for n in row]
all_nodes += [n.name for row in self.ncrb_nodes for n in row]
all_nodes += [n.name for row in self.nps_hbm_nodes for n in row]
all_nodes += [n.name for row in self.ncrb_hbm_nodes for n in row]
all_nodes += [p.name for x in self.hbm_mc_nodes for n in x for p in n]
all_nodes += [row.name for row in self.nmu_hbm_nodes]
all_nodes += [row.name for row in self.nps4_hbm_mc_nodes]
all_nodes += [n.name for row in self.nps6_hbm_mc_nodes for n in row]
return all_nodes
def get_all_nmu_nodes(self) -> list[str]:
"""Get a list of all NMU nodes' names.
Returns a list of strings.
"""
return [n.name for row in self.nmu_nodes for n in row]
def get_all_nsu_nodes(self) -> list[str]:
"""Get a list of all NSU nodes' names.
Returns a list of strings.
"""
return [n.name for row in self.nsu_nodes for n in row]
def get_column_nmu_nodes(self, col: int, slr: int) -> list[str]:
"""Get a list of NMU nodes' names in the given column and slr.
Returns a list of strings.
"""
row_start = sum(self.rows_per_slr[:slr])
row_end = row_start + self.rows_per_slr[slr]
return [self.nmu_nodes[col][r].name for r in range(row_start, row_end)]
def get_column_nsu_nodes(self, col: int, slr: int) -> list[str]:
"""Get a list of NSU nodes' names in the given column and slr.
Returns a list of strings.
"""
row_start = sum(self.rows_per_slr[:slr])
row_end = row_start + self.rows_per_slr[slr]
return [self.nsu_nodes[col][r].name for r in range(row_start, row_end)]
def get_all_hbm_mc_nodes(self) -> list[str]:
"""Get all HBM bank's port 0 and port 1 node names.
Returns a list of strings.
"""
return [n.name for x in self.hbm_mc_nodes for pc in x for n in pc]
def get_hbm_mc_nodes(self, bank: int) -> list[str]:
"""Get an HBM bank's port 0 and port 1 node names.
Returns a list of strings.
"""
return [self.hbm_mc_nodes[bank // 2][bank % 2][p].name for p in range(2)]
def get_all_nmu_hbm_nodes(self) -> list[str]:
"""Get a list of all NMU HBM nodes' names.
Returns a list of strings.
"""
return [n.name for n in self.nmu_hbm_nodes]
def get_range_nmu_hbm_nodes(self, r: range) -> list[str]:
"""Get a list of a range of NMU HBM nodes' names.
Returns a list of strings.
"""
return [self.nmu_hbm_nodes[x].name for x in r]
# get edge helper functions
def get_all_edges(self) -> list[tuple[str, str]]:
"""Get a list of all edges without attributes.
Returns a list of tuples[str, str].
"""
return [(edge.src.name, edge.dest.name) for edge in self.edges]
def get_edge_tuple(self, src: Node, dest: Node) -> list[tuple[str, str]]:
"""Get one edge tuple from src Node to dest Node.
Returns a list of tuples[str, str].
"""
return [(src.name, dest.name)]
def get_bidir_edge_tuple(self, n1: Node, n2: Node) -> list[tuple[str, str]]:
"""Get two edge tuples between two Nodes.
Returns a list of tuples[str, str].
"""
return [(n1.name, n2.name), (n2.name, n1.name)]
def get_column_cross_slr_edges(self, col: int) -> list[tuple[str, str]]:
"""Get a list of edges crossing the SLR in a column.
Returns a list of tuples[str, str].
"""
edges = []
for r in range(self.num_slr - 1):
edges += self.get_bidir_edge_tuple(
self.nps_hnoc_nodes[col][r * 4], self.nps_hnoc_nodes[col][r * 4 + 2]
)
edges += self.get_bidir_edge_tuple(
self.nps_hnoc_nodes[col][r * 4 + 1], self.nps_hnoc_nodes[col][r * 4 + 3]
)
return edges
def get_column_entrance_edges(self, col: int) -> list[tuple[str, str]]:
"""Get a list of incoming/outgoing edges of each NMU/NSU group in a column.
Returns a list of tuples[str, str].
"""
edges = []
# add the edges of bottom vnoc NPS <-> SLR0 NPS
edges += self.get_bidir_edge_tuple(
self.nps_slr0_nodes[col][0], self.nps_vnoc_nodes[col][0]
)
edges += self.get_bidir_edge_tuple(
self.nps_slr0_nodes[col][1], self.nps_vnoc_nodes[col][1]
)
# if there is HBM,
# add the edges of top vnoc NPS <-> HBM NCRB
if self.ncrb_hbm_nodes:
nps_vnoc_top_y = sum(self.rows_per_slr) * 2 - 2
# incoming
edges += self.get_edge_tuple(
src=self.ncrb_hbm_nodes[col][0],
dest=self.nps_vnoc_nodes[col][nps_vnoc_top_y],
)
edges += self.get_edge_tuple(
src=self.ncrb_hbm_nodes[col][0],
dest=self.nps_vnoc_nodes[col][nps_vnoc_top_y + 1],
)
# outgoing
edges += self.get_edge_tuple(
src=self.nps_vnoc_nodes[col][nps_vnoc_top_y],
dest=self.ncrb_hbm_nodes[col][1],
)
edges += self.get_edge_tuple(
src=self.nps_vnoc_nodes[col][nps_vnoc_top_y + 1],
dest=self.ncrb_hbm_nodes[col][1],
)
# add the edges of hnoc NPS <-> vnoc NPS for each SLR
for slr in range(self.num_slr - 1):
lower_y_idx = self.rows_per_slr[slr] * 2 - 2
upper_y_idx = self.rows_per_slr[slr] * 2
edges += self.get_bidir_edge_tuple(
self.nps_hnoc_nodes[col][slr * 4],
self.nps_vnoc_nodes[col][lower_y_idx + 1],
)
edges += self.get_bidir_edge_tuple(
self.nps_hnoc_nodes[col][slr * 4 + 1],
self.nps_vnoc_nodes[col][lower_y_idx],
)
edges += self.get_bidir_edge_tuple(
self.nps_hnoc_nodes[col][slr * 4 + 2],
self.nps_vnoc_nodes[col][upper_y_idx + 1],
)
edges += self.get_bidir_edge_tuple(
self.nps_hnoc_nodes[col][slr * 4 + 3],
self.nps_vnoc_nodes[col][upper_y_idx],
)
return edges