Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions quactography/classical/io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import numpy as np

def save_graph(G, output_base, copies=1):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chaque fonction devrait avoir une documentation qui décrit ce que fait la fonction, les paramètres et les retours

"""
Save the structure of a 2D grid graph into compressed `.npz` files.

Parameters
----------
G : networkx.Graph
A 2D grid graph where each node is represented as a tuple `(x, y)`.
The graph typically connects nodes to their 4-neighbors (up, down, left, right).
Nodes corresponding to obstacles should already be removed from the graph.
output_base : str
Base name for the output files. Each saved file will be named as
"<output_base>_<index>.npz".
copies : int, optional (default=1)
Number of identical copies of the graph to save.

Returns
-------
None
Files are saved to disk in NumPy's compressed `.npz` format, containing:
- 'nodes': array of node coordinates
- 'edges': array of edge pairs (as tuples of coordinates)

Example
-------
>>> save_graph(G, "grid_graph", copies=3)
Saves: grid_graph_0.npz, grid_graph_1.npz, grid_graph_2.npz
"""
for i in range(copies):
output_file = f"{output_base}_{i}.npz"
nodes = list(G.nodes())
edges = list(G.edges())

nodes_array = np.array(nodes)
edges_array = np.array(edges)

np.savez(output_file, nodes=nodes_array, edges=edges_array)
print(f"Copy {i+1}/{copies} saved as '{output_file}'.")
52 changes: 52 additions & 0 deletions quactography/classical/utils/Dijkstra.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
def dijkstra_stepwise(G, start, target, diagonal_mode="nondiagonal"):
start_time = time.time()
distances = {node: float('inf') for node in G.nodes()}
distances[start] = 0
previous_nodes = {node: None for node in G.nodes()}
evaluated_nodes = []
path_to_current = []
priority_queue = [(0, start)]
heapq.heapify(priority_queue)

while priority_queue:
current_distance, current_node = heapq.heappop(priority_queue)
if current_node not in evaluated_nodes:
evaluated_nodes.append(current_node)

# Reconstruction du chemin actuel
temp_path = []
node = current_node
while node is not None:
temp_path.append(node)
if node in previous_nodes: # ✅ Vérification pour éviter KeyError
node = previous_nodes[node]
else:
break # ✅ Stopper si le nœud n'est pas connu
temp_path.reverse()
path_to_current.append(temp_path)

if current_node == target:
break

if diagonal_mode == "diagonal":
neighbors = list(get_neighbors_diagonal(current_node, G))
else:
neighbors = list(G.neighbors(current_node))

for neighbor in neighbors:
if neighbor not in evaluated_nodes:
edge_weight = G[current_node][neighbor].get("weight", 1) # Récupérer le poids réel
new_distance = current_distance + edge_weight
if new_distance < distances[neighbor]:
distances[neighbor] = new_distance
previous_nodes[neighbor] = current_node
heapq.heappush(priority_queue, (new_distance, neighbor))

if distances[target] == float('inf'):
print("⚠️ Aucun chemin trouvé entre le point de départ et l'arrivée.")
return None, None # Retourner None pour indiquer l'absence de chemin

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time of Dijkstra: {execution_time:.4f} secondes")
return evaluated_nodes, path_to_current, current_distance
54 changes: 54 additions & 0 deletions quactography/classical/utils/random_grid_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import numpy as np
import random
import networkx as nx

def generer_grille(size, obstacle_mode="ratio", obstacle_ratio=0.2, obstacle_number=20):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anglais

"""
Generate a random 2D grid and its corresponding NetworkX graph.

Parameters
----------
size : int
Size of the grid (the grid will be of shape `size x size`).
Must be a positive integer.
obstacle_mode : str
Strategy used to place obstacles in the grid. Options are:
- "ratio": place a proportion of obstacles based on `obstacle_ratio`
- "number": place a fixed number of obstacles based on `obstacle_number`
obstacle_ratio : float
Used only if `obstacle_mode` is "ratio". Defines the proportion of cells
to be turned into obstacles. Must be a float between 0 and 1.
obstacle_number : int
Used only if `obstacle_mode` is "number". Defines the exact number of obstacles
to place in the grid. Must be a positive integer.

Returns
-------
grid : np.ndarray
A 2D NumPy array of shape `(size, size)` representing the grid, where `1`
denotes an obstacle and `0` a free cell.
G : networkx.Graph
A 2D grid graph where each node is a tuple `(x, y)`. Edges connect 4-neighboring
nodes (up, down, left, right). Nodes corresponding to obstacles are removed.
"""
n = size
grid = np.zeros((n, n))
G = nx.grid_2d_graph(n, n)

obstacles = set()

if obstacle_mode == "ratio":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tu pourrais détecter automatiquement ton mode en fonction de la valeur de obstacle num. Si 0<=num<1 c'est ratio, si num >= 1 c'est le count.

num_obstacles = int(n * n * obstacle_ratio)
else:
num_obstacles = obstacle_number

while len(obstacles) < num_obstacles:
x, y = random.randint(0, n - 1), random.randint(0, n - 1)
if (x, y) != (0, 0) and (x, y) != (n - 1, n - 1):
obstacles.add((x, y))
grid[x, y] = 1 # Add an obstacle
if (x, y) in G:
G.remove_node((x, y))

return grid, G

101 changes: 101 additions & 0 deletions scripts/generate_random_grids.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Generate random grids/graphs with obstacles.
This script can optionally suppress the output
display of the grid and graph nodes.
The generated graphs are saved as .npz files.
"""

import argparse
import sys

from quactography.classical.utils.random_grid_generator import generate_grid
from quactography.classical.io import save_graph

def _build_arg_parser():
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawTextHelpFormatter
)

parser.add_argument(
'--size', type=int, default=10,
help="Size of the grid (the grid will be of shape size x size)."
)

group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
'--ratio', type=float,
help="Ratio of obstacles (e.g., 0.2 = 20%)."
)
group.add_argument(
'--number', type=int,
help="Exact number of obstacles."
)

parser.add_argument(
'--output', required=True,
help="Output format: 'filename.npz;<number>'. "
"Generates <number> files like 'filename_0.npz', etc."
)

parser.add_argument(
'--save_only', action='store_true',
help="If set, suppresses grid and node outputs."
)

return parser


def parse_output_arg(output_str):
try:
file, number_str = output_str.split(';')
number = int(number_str)
if number <= 0:
raise ValueError("The number of files must be greater than 0.")
return file, number
except ValueError:
raise ValueError(
f"Invalid output format: '{output_str}'. "
"Expected format is 'filename.npz;<number>'."
)


def main():
parser = _build_arg_parser()
args = parser.parse_args()

try:
file, number = parse_output_arg(args.output)
except ValueError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)

mode = 'ratio' if args.ratio is not None else 'number'
value = args.ratio if args.ratio is not None else args.number

if mode == 'ratio' and not (0 <= value <= 1):
print("Error: Ratio must be between 0 and 1.", file=sys.stderr)
sys.exit(1)

for i in range(number):
grid, G = (
generate_grid(args.size, 'ratio', value)
if mode == 'ratio'
else generate_grid(args.size, 'number', value, value)
)

save_graph(G, f"{file}_{i}.npz")

if not args.save_only:
print(f"Graph {i + 1}/{number} saved as '{file}_{i}.npz'")
if not args.save_only:
print("Generated grid:")
print(grid)
print("Graph nodes:", list(G.nodes))


if __name__ == "__main__":
main()
111 changes: 111 additions & 0 deletions scripts/shortest_pathfinder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Find the shortest path between 2 points
in a graph using Dijkstra or A* algorithm.
Supports graphs loaded from JSON or NPZ files,
and optionally allows diagonal movement.
"""

import argparse
import os
import sys
from quactography.classical.utils.random_Dijkstra import dijkstra_stepwise
from quactography.classical.utils.random_Astar import astar_stepwise, heuristic
from quactography.classical.io import load_the_graph


def _build_arg_parser():
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument(
"--in_graph", type=str, required=True,
help="Path to the input graph file (.json or .npz)"
)
parser.add_argument(
"--shortestpath", choices=['Dijkstra', 'A*'], default='Dijkstra',
help="Shortest path algorithm to use: 'Dijkstra' or 'A*'"
)
parser.add_argument(
"--start", type=str, required=True,
help="Start node, e.g. '3,4'"
)
parser.add_argument(
"--target", type=str, required=True,
help="Target node, e.g. '7,8'"
)
parser.add_argument(
"--diagonal_mode", choices=['diagonal', 'nondiagonal'],
default='nondiagonal',
help="Allow diagonal movement or not"
)
return parser


def parse_node(node_str):
try:
parts = node_str.strip().split(',')
return tuple(int(p) for p in parts if p.strip() != '')
except ValueError as e:
raise ValueError(
f"Invalid node format: '{node_str}' (expected format: x,y)"
) from e


def main():
parser = _build_arg_parser()
args = parser.parse_args()

if not os.path.isfile(args.in_graph):
print(f"Error: File '{args.in_graph}' not found.")
sys.exit(1)

try:
start = parse_node(args.start)
target = parse_node(args.target)
except ValueError as e:
print(f"Error parsing node: {e}")
sys.exit(1)

G = load_the_graph(args.in_graph)

if start not in G.nodes():
print(f"Start node {start} not in graph.")
print(f"Available nodes: {list(G.nodes())[:5]}...")
sys.exit(1)

if target not in G.nodes():
print(f"Target node {target} not in graph.")
print(f"Available nodes: {list(G.nodes())[:5]}...")
sys.exit(1)

print(
f"Finding shortest path from {start} to {target} using {args.shortestpath}..."
)

if args.shortestpath == "Dijkstra":
evaluated_nodes, path_history, path_cost = dijkstra_stepwise(
G, start, target, args.diagonal_mode
)
else:
evaluated_nodes, path_history, path_cost = astar_stepwise(
G, start, target, args.diagonal_mode
)

if path_history is None:
print("⚠️ Aucun chemin trouvé.")
sys.exit(0)

shortest_path = [tuple(int(x) for x in n) for n in path_history[-1]]

print("\nShortest path:")
print(" → ".join(map(str, shortest_path)))
print(f"Path cost: {path_cost:.2f}")
print(f"Nodes evaluated: {len(evaluated_nodes)}")


if __name__ == "__main__":
main()