-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
309 lines (257 loc) · 14.9 KB
/
main.py
File metadata and controls
309 lines (257 loc) · 14.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
### functions would go in this file. Edit as you see fit. ####
#pandas data analysis tool
import pandas as pd
#system function
import os
# pathlib: resolve the current path and the expected fixed folder layout
from pathlib import Path
#3D interactive plot
import plotly.express as px
#static plot
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN
from sklearn.cluster import AgglomerativeClustering
from scipy.cluster.hierarchy import dendrogram, linkage
def _append_node_record(records, file_name, node_id, values, label):
    """Append one node's row to ``records`` if it has exactly 4 values, else warn.

    ``label`` is the word used in the warning ("Node" or "Last node").
    """
    if len(values) == 4:
        records.append({
            'file_source': file_name,
            'node_id': node_id,
            'x': values[0],
            'y': values[1],
            'z': values[2],
            'avg_intensity': values[3],
        })
    else:
        print(f"Warning in {file_name}: {label} {node_id} has {len(values)} values, expected 4. Skipping this node's data.")


def _parse_exnode_file(file_path_full, file_name):
    """Parse one .exnode file and return its nodes as a list of row dicts.

    A node starts at a line beginning with ``Node:`` and is followed by one
    numeric value per line (x, y, z, avg_intensity). Lines seen before the
    first node header are ignored. Problems are reported with ``print`` and
    the offending value/node/file is skipped rather than raising.
    """
    records = []
    current_node_id = None
    current_node_values = []
    try:
        with open(file_path_full, 'r') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if line.startswith('Node:'):
                    # A new header closes the previous node (if any).
                    if current_node_id is not None:
                        _append_node_record(records, file_name, current_node_id,
                                            current_node_values, "Node")
                    current_node_values = []
                    try:
                        current_node_id = int(line.split(':')[1].strip())
                    except ValueError:
                        # A malformed id must not abort the whole load; drop
                        # this node and ignore its values until the next header.
                        print(f"Warning in {file_name}: Invalid node header '{line}' at line {line_num}. Skipping this node's data.")
                        current_node_id = None
                elif current_node_id is not None and line != '':
                    try:
                        current_node_values.append(float(line))
                    except ValueError:
                        print(f"Warning in {file_name}: Non-numeric or unexpected data '{line}' found for Node {current_node_id} at line {line_num}. Skipping value.")
    except FileNotFoundError:
        print(f"Error: File {file_path_full} not found. Skipping.")
    except OSError as exc:
        # e.g. a directory whose name ends in .exnode, or a permission error.
        print(f"Error: Could not read {file_path_full} ({exc}). Skipping.")

    # The final node has no following header to close it.
    if current_node_id is not None:
        _append_node_record(records, file_name, current_node_id,
                            current_node_values, "Last node")
    return records


def load_exnode_data(folder_path_arg=None):
    """Load every ``.exnode`` file from the ``inputs`` folder into one DataFrame.

    Args:
        folder_path_arg: Optional path. The data folder is the ``inputs``
            directory located next to it, i.e. ``Path(folder_path_arg).parent
            / "inputs"``. When ``None``, the current working directory is used
            in its place.

    Returns:
        pd.DataFrame: One row per complete node with the columns
        ``file_source``, ``node_id``, ``x``, ``y``, ``z`` and
        ``avg_intensity``. An empty DataFrame is returned when the folder does
        not exist, contains no ``.exnode`` files, or no node could be read.
    """
    base_path = Path(os.getcwd()) if folder_path_arg is None else Path(folder_path_arg)
    effective_folder_path = base_path.parent / "inputs"
    print(f"Using data folder path: {effective_folder_path}")

    try:
        # Sorted so the row order does not depend on the file system.
        file_lists = sorted(
            f for f in os.listdir(effective_folder_path) if f.endswith('.exnode')
        )
    except (FileNotFoundError, NotADirectoryError):
        print(f"Error: Data folder {effective_folder_path} not found.")
        return pd.DataFrame()

    if not file_lists:
        print(f"No .exnode files found in {effective_folder_path}")
        return pd.DataFrame()
    print(f"Found {len(file_lists)} .exnode files: {file_lists}")

    all_data_points = []
    for file_name in file_lists:
        file_path_full = os.path.join(effective_folder_path, file_name)
        print(f"\nProcessing file: {file_name}")
        data_points_current_file = _parse_exnode_file(file_path_full, file_name)
        all_data_points.extend(data_points_current_file)
        print(f"Finished processing {file_name}. Added {len(data_points_current_file)} data points.")

    df = pd.DataFrame(all_data_points)

    print("\n--- Combined DataFrame Info ---")
    if not df.empty:
        print("First 5 data points:")
        print(df.head())
        print("last 5 data points:")
        print(df.tail())
        print(f"\nTotal data points read from all files: {len(df)}")
    else:
        print("No data points were successfully read.")
    return df
def plot_plotly_3d_scatter(df, x_col='x', y_col='y', z_col='z', color_col='avg_intensity', title='Interactive 3D Visualization of Data Points by Intensity (Plotly)'):
    """
    Show an interactive 3D scatter plot of ``df`` built with plotly.express.

    Nothing is plotted for an empty DataFrame; a message is printed instead.

    Args:
        df (pd.DataFrame): Data to plot.
        x_col (str): Column mapped to the x-axis. Defaults to 'x'.
        y_col (str): Column mapped to the y-axis. Defaults to 'y'.
        z_col (str): Column mapped to the z-axis. Defaults to 'z'.
        color_col (str): Column that drives the point colour. Defaults to 'avg_intensity'.
        title (str): Figure title.
    """
    if not df.empty:
        # Collect the column-to-axis mapping once, then hand it to Plotly.
        scatter_options = {
            'x': x_col,
            'y': y_col,
            'z': z_col,
            'color': color_col,
            'title': title,
        }
        figure = px.scatter_3d(df, **scatter_options)
        figure.show()
    else:
        print("DataFrame is empty. No data to visualize interactively.")
def plot_mpl(df, x_col='x', y_col='y', z_col='z', color_col='avg_intensity', title='3D Visualization of Data Points by Intensity (Matplotlib)'):
    """
    Show a static 3D scatter plot of ``df`` drawn with matplotlib.

    Nothing is plotted for an empty DataFrame; a message is printed instead.
    The axis labels and the colour-bar label are fixed strings and do not
    follow the column names passed in.

    Args:
        df (pd.DataFrame): Data to plot.
        x_col (str): Column used for the x coordinates. Defaults to 'x'.
        y_col (str): Column used for the y coordinates. Defaults to 'y'.
        z_col (str): Column used for the z coordinates. Defaults to 'z'.
        color_col (str): Column that drives the point colour. Defaults to 'avg_intensity'.
        title (str): Axes title.
    """
    # Guard clause: bail out early when there is nothing to draw.
    if df.empty:
        print("DataFrame is empty. No data to visualize.")
        return

    figure = plt.figure(figsize=(10, 8))
    axes_3d = figure.add_subplot(111, projection='3d')

    points = axes_3d.scatter(
        df[x_col],
        df[y_col],
        df[z_col],
        c=df[color_col],
        cmap='viridis',
        s=5,
    )

    axes_3d.set_title(title)
    axes_3d.set_xlabel('X Coordinate')
    axes_3d.set_ylabel('Y Coordinate')
    axes_3d.set_zlabel('Z Coordinate')

    # Colour bar explaining the viridis scale of the points.
    color_bar = figure.colorbar(points, ax=axes_3d, pad=0.1)
    color_bar.set_label('Average Intensity')

    plt.show()
def perform_and_plot_all_kmeans_clusters(dataframe, n_clusters=5):
    """
    Run K-means on three feature sets and plot each result in 3D.

    The three runs are, in order: average intensity only, x/y/z coordinates
    only, and x/y/z plus average intensity (standardized with StandardScaler
    before clustering). Each run writes its labels into a new column of
    ``dataframe`` (``intensity_cluster_label``, ``xyz_cluster_label``,
    ``xyzi_cluster_label``), so the input DataFrame is modified in place.

    Args:
        dataframe (pd.DataFrame): Data with the columns 'x', 'y', 'z' and
            'avg_intensity'.
        n_clusters (int): Number of clusters for every K-means run.

    Returns:
        None. For an empty DataFrame a message is printed and nothing is
        clustered or plotted.
    """
    # An empty frame (e.g. no input files were found) has nothing to cluster.
    if dataframe.empty:
        print("DataFrame is empty. No data to cluster.")
        return

    print(f"Performing K-means clustering with {n_clusters} clusters...")

    # (label column, feature columns, standardize?, progress message, title suffix)
    runs = (
        ('intensity_cluster_label', ['avg_intensity'], False,
         "Intensity K-means clustering completed.",
         "Avg Intensity Only"),
        ('xyz_cluster_label', ['x', 'y', 'z'], False,
         "Location (X,Y,Z) K-means clustering completed.",
         "X, Y, Z Coordinates Only"),
        ('xyzi_cluster_label', ['x', 'y', 'z', 'avg_intensity'], True,
         "Combined (X,Y,Z,Intensity) K-means clustering completed.",
         "X, Y, Z, and Avg Intensity"),
    )

    for label_column, feature_columns, standardize, done_message, description in runs:
        model = KMeans(n_clusters=n_clusters, random_state=42, n_init='auto')
        features = dataframe[feature_columns]
        if standardize:
            # Coordinates and intensity are on different scales; standardize
            # them so no single feature dominates the distance.
            features = StandardScaler().fit_transform(features)
        dataframe[label_column] = model.fit_predict(features)
        print(done_message)
        plot_plotly_3d_scatter(dataframe, x_col='x', y_col='y', z_col='z',
                               color_col=label_column,
                               title=f'K-means Clustering ({n_clusters} Clusters - {description})')

    print("All K-means clustering and plotting processes finished.")
def perform_and_plot_all_dbscan_clusters(dataframe, eps_val=0.5, min_samples_val=8):
    """
    Run DBSCAN on the standardized x/y/z/avg_intensity features and plot the result.

    The labels are written to a new ``dbscan_xyzi_cluster_label`` column, so
    the input DataFrame is modified in place. The label -1 marks noise points
    and is not counted as a cluster in the printed summary.

    Args:
        dataframe (pd.DataFrame): Data with the columns 'x', 'y', 'z' and
            'avg_intensity'.
        eps_val (float): The maximum distance between two samples for one to be
            considered as in the neighborhood of the other (applied to the
            standardized features).
        min_samples_val (int): The number of samples (or total weight) in a
            neighborhood for a point to be considered as a core point.

    Returns:
        None. For an empty DataFrame a message is printed and nothing is
        clustered or plotted.
    """
    # An empty frame (e.g. no input files were found) has nothing to cluster.
    if dataframe.empty:
        print("DataFrame is empty. No data to cluster.")
        return

    print(f"Performing DBSCAN clustering with eps={eps_val} and min_samples={min_samples_val}...")

    label_column = 'dbscan_xyzi_cluster_label'
    features_xyzi = dataframe[['x', 'y', 'z', 'avg_intensity']]
    scaled_features_xyzi = StandardScaler().fit_transform(features_xyzi)

    dbscan_xyzi = DBSCAN(eps=eps_val, min_samples=min_samples_val)
    dataframe[label_column] = dbscan_xyzi.fit_predict(scaled_features_xyzi)
    print("DBSCAN (X,Y,Z,Intensity) clustering completed.")

    labels = dataframe[label_column]
    # -1 is the noise label, not a cluster.
    num_clusters_xyzi = len(set(labels) - {-1})
    num_noise_points_xyzi = (labels == -1).sum()
    print(f"Number of clusters (Combined): {num_clusters_xyzi}, Noise points: {num_noise_points_xyzi}")

    plot_plotly_3d_scatter(dataframe, x_col='x', y_col='y', z_col='z',
                           color_col=label_column,
                           title=f'DBSCAN Clustering (eps={eps_val}, min_samples={min_samples_val} - X, Y, Z, and Avg Intensity Combined)')
    print("All DBSCAN clustering and plotting processes finished.")
def perform_and_plot_all_agglomerative_clusters(dataframe, n_clusters=5, linkage='ward'):
    """
    Run Agglomerative Clustering on the standardized x/y/z/avg_intensity features and plot it.

    The labels are written to a new column named
    ``agglomerative_cluster_label_<n_clusters>``, so the input DataFrame is
    modified in place.

    Args:
        dataframe (pd.DataFrame): Data with the columns 'x', 'y', 'z' and
            'avg_intensity'.
        n_clusters (int): The number of clusters to form.
        linkage (str): Which linkage criterion to use. E.g., 'ward', 'complete',
            'average', 'single'.

    Returns:
        None. For an empty DataFrame a message is printed and nothing is
        clustered or plotted.
    """
    # NOTE: inside this function the `linkage` parameter shadows the
    # module-level `linkage` imported from scipy; only the string is used here.

    # An empty frame (e.g. no input files were found) has nothing to cluster.
    if dataframe.empty:
        print("DataFrame is empty. No data to cluster.")
        return

    print(f"Performing Agglomerative Clustering with {n_clusters} clusters and linkage='{linkage}'...")

    label_column = f'agglomerative_cluster_label_{n_clusters}'
    features_agglomerative = dataframe[['x', 'y', 'z', 'avg_intensity']]

    # Standardize so coordinates and intensity contribute on the same scale.
    scaled_features_agglomerative = StandardScaler().fit_transform(features_agglomerative)

    agg_clustering = AgglomerativeClustering(n_clusters=n_clusters, linkage=linkage)
    dataframe[label_column] = agg_clustering.fit_predict(scaled_features_agglomerative)
    print("Agglomerative Clustering applied using 'x', 'y', 'z', and 'avg_intensity' features.")

    num_clusters_agglomerative = len(set(dataframe[label_column]))
    print("Number of clusters found:", num_clusters_agglomerative)

    plot_plotly_3d_scatter(dataframe, x_col='x', y_col='y', z_col='z',
                           color_col=label_column,
                           title=f'Agglomerative Clustering ({n_clusters} Clusters, Linkage: {linkage} - X, Y, Z, and Avg Intensity)')
    print("Agglomerative clustering and plotting process finished.")
def plot_agglomerative_dendrogram(dataframe, linkage_method='ward', p_val=30):
    """
    Plot a truncated hierarchical-clustering dendrogram of the combined features.

    The 'x', 'y', 'z' and 'avg_intensity' columns are standardized with
    StandardScaler, linked with SciPy's ``linkage`` and drawn with
    ``dendrogram`` in 'lastp' truncation mode.

    Args:
        dataframe (pd.DataFrame): Data with the columns 'x', 'y', 'z' and
            'avg_intensity'.
        linkage_method (str): Linkage criterion passed to SciPy's ``linkage``
            as ``method``. Defaults to 'ward'.
        p_val (int): The number of last merged clusters to show when truncating
            the dendrogram.

    Returns:
        None. For an empty DataFrame a message is printed and nothing is
        plotted.
    """
    # An empty frame (e.g. no input files were found) has nothing to link.
    if dataframe.empty:
        print("DataFrame is empty. No data to build a dendrogram from.")
        return

    # Report the requested method; `linkage` alone is the imported SciPy function.
    print(f"\nGenerating dendrogram for Combined (X,Y,Z,Intensity) Agglomerative Clustering with linkage='{linkage_method}'...")

    features_xyzi = dataframe[['x', 'y', 'z', 'avg_intensity']]
    scaler_xyzi = StandardScaler()
    scaled_features_xyzi = scaler_xyzi.fit_transform(features_xyzi)
    linkage_matrix = linkage(scaled_features_xyzi, method=linkage_method)

    plt.figure(figsize=(15, 7))
    plt.title(f'Hierarchical Clustering Dendrogram (Linkage: {linkage_method} - X, Y, Z, Avg Intensity)')
    plt.xlabel('Sample Index or Cluster Size')
    plt.ylabel('Distance')
    dendrogram(
        linkage_matrix,
        leaf_rotation=90.,      # rotates the x axis labels
        leaf_font_size=8.,      # font size for the x axis labels
        truncate_mode='lastp',  # show only the last p merged clusters
        p=p_val,
        show_leaf_counts=True,
    )
    plt.show()
    print("Dendrogram generated.")