Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 124 additions & 6 deletions Atif/Gsoc-HealingStones/batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,77 @@ def __init__(self):
'red': {'min': [100, 0, 0], 'max': [255, 100, 100]}
}

# ------------------------------------------------------------------ #
# HIGH PRIORITY DEFENSIVE CHECKS #
# ------------------------------------------------------------------ #

MAX_FILE_SIZE_MB = 500
MIN_FILE_SIZE_BYTES = 80 # A valid PLY header is at least ~80 bytes

def _validate_path(self, filepath: Path) -> Tuple[List[str], List[str]]:
"""
Check that the path exists, is a regular file, has a .ply extension,
and is readable.

Returns:
(errors, warnings) — errors are fatal; warnings are informational.
"""
errors: List[str] = []
warnings: List[str] = []

if not filepath.exists():
errors.append(f"File does not exist: {filepath}")
return errors, warnings # Nothing more to check

if not filepath.is_file():
errors.append(f"Path is not a regular file: {filepath}")
return errors, warnings

if filepath.suffix.lower() != '.ply':
errors.append(
f"Unexpected file extension '{filepath.suffix}' — expected '.ply'"
)

if not os.access(filepath, os.R_OK):
errors.append(f"File is not readable (permission denied): {filepath}")

return errors, warnings

def _check_file_size(self, filepath: Path) -> Tuple[List[str], List[str]]:
"""
Reject files that are suspiciously small (likely corrupt/empty) and
warn on files so large they may exhaust memory.

Returns:
(errors, warnings)
"""
errors: List[str] = []
warnings: List[str] = []

try:
size_bytes = filepath.stat().st_size
except OSError as exc:
errors.append(f"Could not stat file: {exc}")
return errors, warnings

size_mb = size_bytes / (1024 ** 2)

if size_bytes < self.MIN_FILE_SIZE_BYTES:
errors.append(
f"File is suspiciously small ({size_bytes} bytes) — "
"likely corrupt or empty"
)

if size_mb > self.MAX_FILE_SIZE_MB:
warnings.append(
f"File is very large ({size_mb:.1f} MB) — "
"processing may be slow or exhaust memory"
)

return errors, warnings

# ------------------------------------------------------------------ #

def validate_ply_file(self, filepath: str) -> Dict:
"""
Comprehensive validation of a single PLY file
Expand All @@ -37,7 +108,23 @@ def validate_ply_file(self, filepath: str) -> Dict:
'errors': [],
'statistics': {}
}


# --- HIGH 1: path / extension / permission checks ---
path_errors, path_warnings = self._validate_path(filepath)
results['errors'].extend(path_errors)
results['warnings'].extend(path_warnings)
if path_errors:
results['valid'] = False
return results

# --- HIGH 2: file size checks ---
size_errors, size_warnings = self._check_file_size(filepath)
results['errors'].extend(size_errors)
results['warnings'].extend(size_warnings)
if size_errors:
results['valid'] = False
return results

try:
# Load mesh
mesh = o3d.io.read_triangle_mesh(str(filepath))
Expand Down Expand Up @@ -127,8 +214,9 @@ def _analyze_colors(self, colors: np.ndarray) -> Dict:
'color': center.astype(int).tolist(),
'percentage': float(cluster_size / len(colors) * 100)
})
except:
pass
except Exception as e:
analysis['dominant_colors'] = []
analysis['kmeans_error'] = str(e)

return analysis

Expand Down Expand Up @@ -347,13 +435,23 @@ def clean_mesh(self, mesh: o3d.geometry.TriangleMesh) -> o3d.geometry.TriangleMe

def normalize_mesh_scale(self, mesh: o3d.geometry.TriangleMesh, target_size: float = 1.0) -> o3d.geometry.TriangleMesh:
"""Normalize mesh to a standard scale"""
# --- MEDIUM 2: guard against zero extent and invalid target_size ---
if target_size <= 0:
raise ValueError(
f"target_size must be a positive number, got {target_size}"
)

bbox = mesh.get_axis_aligned_bounding_box()
extent = bbox.get_extent()
max_extent = np.max(extent)

max_extent = float(np.max(extent))

if max_extent == 0.0:
print("Warning: Mesh has zero bounding-box extent — skipping normalization")
return mesh

scale_factor = target_size / max_extent
mesh.scale(scale_factor, center=mesh.get_center())

print(f"Scaled mesh by factor {scale_factor:.3f}")
return mesh

Expand Down Expand Up @@ -410,6 +508,26 @@ def preprocess_file(self, input_path: str, output_path: str,
enhance_colors: bool = True) -> bool:
"""Preprocess a single PLY file"""
try:
input_path_obj = Path(input_path)
output_path_obj = Path(output_path)

# --- MEDIUM 1a: prevent overwriting the source file ---
if input_path_obj.resolve() == output_path_obj.resolve():
print(
f"Error: Output path is the same as input path ({input_path}). "
"Aborting to avoid data loss."
)
return False

# --- MEDIUM 1b: ensure output directory exists and is writable ---
output_dir = output_path_obj.parent
output_dir.mkdir(parents=True, exist_ok=True)
if not os.access(output_dir, os.W_OK):
print(
f"Error: No write permission to output directory: {output_dir}"
)
return False

print(f"Preprocessing {input_path}...")

# Load mesh
Expand Down
63 changes: 45 additions & 18 deletions Atif/Gsoc-HealingStones/feature_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ def compute_surface_curvature(self, point_cloud, radius=0.02):
radius: Radius for local curvature estimation
"""
try:
# Ensure normals are computed
# Work on a copy so we never mutate the caller's point cloud
# in-place (estimate_normals modifies the object permanently).
point_cloud = o3d.geometry.PointCloud(point_cloud)
if not point_cloud.has_normals():
point_cloud.estimate_normals()

Expand Down Expand Up @@ -106,33 +108,39 @@ def compute_boundary_features(self, points):
return {
'boundary_length': boundary_length,
'compactness': compactness,
'boundary_points': boundary_points,
# Convert to plain Python list so this dict is JSON-serialisable
# and consistent with every other field returned by this class.
'boundary_points': boundary_points.tolist(),
'num_boundary_points': len(boundary_points)
}
except:
except Exception as e:
return {
'boundary_length': 0,
'compactness': 0,
'boundary_points': np.array([]),
'num_boundary_points': 0
'boundary_points': [],
'num_boundary_points': 0,
'boundary_error': str(e)
}

def compute_geometric_moments(self, points):
"""Compute geometric moments for shape description"""
if len(points) == 0:
return {}

# Center the points
centroid = np.mean(points, axis=0)
centered_points = points - centroid


# Compute moments
moments = {}

# Second moments (covariance)
cov_matrix = np.cov(centered_points.T)
eigenvals, eigenvecs = np.linalg.eigh(cov_matrix)


# Second moments via PCA — consistent with compute_surface_normal
# and avoids manually building the covariance matrix + eigh.
pca = PCA(n_components=min(3, points.shape[1]))
pca.fit(points)
# explained_variance_ == eigenvalues of the covariance matrix
eigenvals = pca.explained_variance_
eigenvecs = pca.components_ # shape (n_components, n_features)

moments['eigenvalues'] = eigenvals.tolist()
moments['principal_axes'] = eigenvecs.tolist()
moments['shape_ratio'] = eigenvals[1] / eigenvals[0] if eigenvals[0] > 1e-10 else 0
Expand Down Expand Up @@ -226,8 +234,27 @@ def extract_all_features(self, fragments):

def visualize_surface_features(self, fragment, color, surface_idx):
"""Visualize features of a specific break surface"""
if color not in fragment['break_surfaces'] or surface_idx >= len(fragment['break_surfaces'][color]):
print("Invalid surface specified")
# Guard 1: break_surfaces key / colour presence
if color not in fragment.get('break_surfaces', {}):
print(f"No break surfaces found for colour '{color}'")
return
if surface_idx >= len(fragment['break_surfaces'][color]):
print(
f"surface_idx {surface_idx} out of range — "
f"only {len(fragment['break_surfaces'][color])} surface(s) for '{color}'"
)
return
# Guard 2: features must have been extracted before visualising
if color not in fragment.get('features', {}):
print(
f"Features have not been extracted for colour '{color}'. "
"Run extract_all_features() first."
)
return
if surface_idx >= len(fragment['features'][color]):
print(
f"Feature entry missing for surface_idx {surface_idx} of '{color}'"
)
return

surface = fragment['break_surfaces'][color][surface_idx]
Expand All @@ -253,10 +280,10 @@ def visualize_surface_features(self, fragment, color, surface_idx):
normal[0], normal[1], normal[2],
length=0.05, color='black', arrow_length_ratio=0.1)

# Boundary points if available
# Boundary points if available (stored as plain list — convert for numpy ops)
if len(features['boundary_points']) > 0:
boundary = features['boundary_points']
ax1.scatter(boundary[:, 0], boundary[:, 1], boundary[:, 2],
boundary = np.array(features['boundary_points'])
ax1.scatter(boundary[:, 0], boundary[:, 1], boundary[:, 2],
c='red', s=50, alpha=0.8)

# Feature summary
Expand Down