Skip to content

Commit e5a1a66

Browse files
committed
Drop remaining aggregated data stream code
1 parent a31a595 commit e5a1a66

File tree

9 files changed

+11
-112
lines changed

9 files changed

+11
-112
lines changed

ui/public/scenarios.json

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,7 @@
44
"name": "Small",
55
"topology": "topologies/example.yaml",
66
"duration": 120,
7-
"trace": "traces/example-nocpu.jsonl",
8-
"aggregated": false
9-
},
10-
{
11-
"name": "1x",
12-
"topology": "topologies/thousand.yaml",
13-
"duration": 300,
14-
"trace": "traces/1xsummary.jsonl.gz",
15-
"aggregated": true
16-
},
17-
{
18-
"name": "10x",
19-
"topology": "topologies/thousand.yaml",
20-
"duration": 300,
21-
"trace": "traces/10xsummary.jsonl.gz",
22-
"aggregated": true
23-
},
24-
{
25-
"name": "100x",
26-
"topology": "topologies/thousand.yaml",
27-
"duration": 300,
28-
"trace": "traces/100xsummary.jsonl.gz",
29-
"aggregated": true
7+
"trace": "traces/example-nocpu.jsonl"
308
}
319
]
3210
}

ui/public/traces/100xsummary.jsonl.gz

Lines changed: 0 additions & 3 deletions
This file was deleted.

ui/public/traces/10xsummary.jsonl.gz

Lines changed: 0 additions & 3 deletions
This file was deleted.

ui/public/traces/1xsummary.jsonl.gz

Lines changed: 0 additions & 3 deletions
This file was deleted.

ui/src/components/Sim/hooks/useStreamMessagesHandler.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import StreamWorker from "./worker?worker";
55

66
export const useStreamMessagesHandler = () => {
77
const {
8-
state: { tracePath, aggregated, speedMultiplier },
8+
state: { tracePath },
99
dispatch,
1010
} = useSimContext();
1111
const [streaming, setStreaming] = useState(false);
@@ -26,12 +26,10 @@ export const useStreamMessagesHandler = () => {
2626
worker.postMessage({
2727
type: "START",
2828
tracePath,
29-
aggregated,
30-
speedMultiplier,
3129
includeTransactions,
3230
});
3331
},
34-
[worker, tracePath, aggregated, speedMultiplier, dispatch],
32+
[worker, tracePath, dispatch],
3533
);
3634

3735
const stopStream = useCallback(() => {

ui/src/components/Sim/hooks/worker.ts

Lines changed: 8 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
import {
2-
ISimulationAggregatedDataState,
3-
} from "@/contexts/SimContext/types";
41
import * as cbor from "cborg";
52
import type { ReadableStream } from "stream/web";
63
import { IServerMessage, EServerMessageType } from "../types";
@@ -9,13 +6,6 @@ export type TWorkerRequest =
96
| {
107
type: "START";
118
tracePath: string;
12-
aggregated: true;
13-
speedMultiplier: number;
14-
}
15-
| {
16-
type: "START";
17-
tracePath: string;
18-
aggregated?: false;
199
includeTransactions?: boolean;
2010
}
2111
| { type: "STOP" };
@@ -172,35 +162,6 @@ const consumeStream = async (
172162
postMessage({ type: "DONE", tracePath });
173163
};
174164

175-
// TODO: unused
176-
const consumeAggregateStream = async (
177-
stream: ReadableStream<ISimulationAggregatedDataState>,
178-
tracePath: string,
179-
speedMultiplier: number,
180-
) => {
181-
let lastTimestamp = 0;
182-
for await (const aggregatedData of stream) {
183-
const nodes = new Map();
184-
for (const [id, stats] of Object.entries(aggregatedData.nodes)) {
185-
nodes.set(id, stats);
186-
}
187-
aggregatedData.nodes = nodes;
188-
189-
const elapsedMs = (aggregatedData.progress - lastTimestamp) * 1000;
190-
lastTimestamp = aggregatedData.progress;
191-
await new Promise((resolve) =>
192-
setTimeout(resolve, elapsedMs / speedMultiplier),
193-
);
194-
195-
// TODO: re-create when needed
196-
// postMessage({
197-
// type: "EVENT",
198-
// tracePath,
199-
// aggregatedData,
200-
// } as TWorkerResponse);
201-
}
202-
postMessage({ type: "DONE", tracePath });
203-
};
204165

205166
let controller = new AbortController();
206167
onmessage = (e: MessageEvent<TWorkerRequest>) => {
@@ -211,32 +172,13 @@ onmessage = (e: MessageEvent<TWorkerRequest>) => {
211172
}
212173

213174
controller = new AbortController();
214-
if (request.aggregated) {
215-
createEventStream<ISimulationAggregatedDataState>(
216-
request.tracePath,
217-
controller.signal,
175+
createEventStream<IServerMessage>(request.tracePath, controller.signal)
176+
.then((stream) =>
177+
consumeStream(stream, request.tracePath, request.includeTransactions),
218178
)
219-
.then((stream) =>
220-
consumeAggregateStream(
221-
stream,
222-
request.tracePath,
223-
request.speedMultiplier,
224-
),
225-
)
226-
.catch((err) => {
227-
if (err.name !== "AbortError") {
228-
throw err;
229-
}
230-
});
231-
} else {
232-
createEventStream<IServerMessage>(request.tracePath, controller.signal)
233-
.then((stream) =>
234-
consumeStream(stream, request.tracePath, request.includeTransactions),
235-
)
236-
.catch((err) => {
237-
if (err.name !== "AbortError") {
238-
throw err;
239-
}
240-
});
241-
}
179+
.catch((err) => {
180+
if (err.name !== "AbortError") {
181+
throw err;
182+
}
183+
});
242184
};

ui/src/contexts/SimContext/context.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ export const defaultAggregatedData: ISimulationAggregatedDataState = {
2424
export const defaultState: ISimContextState = {
2525
allScenarios: [],
2626
activeScenario: "",
27-
aggregated: true,
2827
graph: {
2928
canvasRef: { current: null },
3029
canvasOffsetX: 0,

ui/src/contexts/SimContext/reducer.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ export const reducer = (
1616
activeScenario: scenario.name,
1717
maxTime: scenario.duration,
1818
tracePath: scenario.trace,
19-
aggregated: scenario.aggregated,
2019
topologyPath: scenario.topology,
2120
};
2221
}
@@ -34,7 +33,6 @@ export const reducer = (
3433
activeScenario: scenario.name,
3534
maxTime: scenario.duration,
3635
tracePath: scenario.trace,
37-
aggregated: scenario.aggregated,
3836
topologyPath: scenario.topology,
3937
topologyLoaded:
4038
state.topologyLoaded && scenario.topology === state.topologyPath,
@@ -60,9 +58,6 @@ export const reducer = (
6058
}
6159

6260

63-
case "SET_AGGREGATED_DATA": {
64-
return { ...state, aggregatedData: action.payload };
65-
}
6661

6762
case "SET_CANVAS_PROPS": {
6863
return {

ui/src/contexts/SimContext/types.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ export interface IScenario {
7575
topology: string;
7676
duration: number;
7777
trace: string;
78-
aggregated: boolean;
7978
}
8079

8180
export interface ISimContextState {
@@ -84,7 +83,6 @@ export interface ISimContextState {
8483
graph: IGraphContextState;
8584
aggregatedData: ISimulationAggregatedDataState;
8685
tracePath: string;
87-
aggregated: boolean;
8886
topography: ITransformedNodeMap;
8987
topologyPath: string;
9088
topologyLoaded: boolean;
@@ -113,8 +111,6 @@ export type TSimContextActions =
113111
canvasOffsetY: ((prev: number) => number) | number;
114112
}>;
115113
}
116-
// TODO: unused
117-
| { type: "SET_AGGREGATED_DATA"; payload: ISimulationAggregatedDataState }
118114
| {
119115
type: "BATCH_UPDATE";
120116
payload: Partial<ISimContextState>;

0 commit comments

Comments
 (0)