diff --git a/quactography/classical/io.py b/quactography/classical/io.py new file mode 100644 index 0000000..81e58c9 --- /dev/null +++ b/quactography/classical/io.py @@ -0,0 +1,40 @@ +import numpy as np + +def save_graph(G, output_base, copies=1): + """ + Save the structure of a 2D grid graph into compressed `.npz` files. + + Parameters + ---------- + G : networkx.Graph + A 2D grid graph where each node is represented as a tuple `(x, y)`. + The graph typically connects nodes to their 4-neighbors (up, down, left, right). + Nodes corresponding to obstacles should already be removed from the graph. + output_base : str + Base name for the output files. Each saved file will be named as + "_.npz". + copies : int, optional (default=1) + Number of identical copies of the graph to save. + + Returns + ------- + None + Files are saved to disk in NumPy's compressed `.npz` format, containing: + - 'nodes': array of node coordinates + - 'edges': array of edge pairs (as tuples of coordinates) + + Example + ------- + >>> save_graph(G, "grid_graph", copies=3) + Saves: grid_graph_0.npz, grid_graph_1.npz, grid_graph_2.npz + """ + for i in range(copies): + output_file = f"{output_base}_{i}.npz" + nodes = list(G.nodes()) + edges = list(G.edges()) + + nodes_array = np.array(nodes) + edges_array = np.array(edges) + + np.savez(output_file, nodes=nodes_array, edges=edges_array) + print(f"Copy {i+1}/{copies} saved as '{output_file}'.") diff --git a/quactography/classical/utils/Dijkstra.py b/quactography/classical/utils/Dijkstra.py new file mode 100644 index 0000000..0119621 --- /dev/null +++ b/quactography/classical/utils/Dijkstra.py @@ -0,0 +1,52 @@ +def dijkstra_stepwise(G, start, target, diagonal_mode="nondiagonal"): + start_time = time.time() + distances = {node: float('inf') for node in G.nodes()} + distances[start] = 0 + previous_nodes = {node: None for node in G.nodes()} + evaluated_nodes = [] + path_to_current = [] + priority_queue = [(0, start)] + heapq.heapify(priority_queue) + + while priority_queue: + current_distance, current_node = heapq.heappop(priority_queue) + if current_node not in evaluated_nodes: + evaluated_nodes.append(current_node) + + # Reconstruction du chemin actuel + temp_path = [] + node = current_node + while node is not None: + temp_path.append(node) + if node in previous_nodes: # ✅ Vérification pour éviter KeyError + node = previous_nodes[node] + else: + break # ✅ Stopper si le nœud n'est pas connu + temp_path.reverse() + path_to_current.append(temp_path) + + if current_node == target: + break + + if diagonal_mode == "diagonal": + neighbors = list(get_neighbors_diagonal(current_node, G)) + else: + neighbors = list(G.neighbors(current_node)) + + for neighbor in neighbors: + if neighbor not in evaluated_nodes: + edge_weight = G[current_node][neighbor].get("weight", 1) # Récupérer le poids réel + new_distance = current_distance + edge_weight + if new_distance < distances[neighbor]: + distances[neighbor] = new_distance + previous_nodes[neighbor] = current_node + heapq.heappush(priority_queue, (new_distance, neighbor)) + + if distances[target] == float('inf'): + print("⚠️ Aucun chemin trouvé entre le point de départ et l'arrivée.") + return None, None # Retourner None pour indiquer l'absence de chemin + + end_time = time.time() + execution_time = end_time - start_time + print(f"Execution time of Dijkstra: {execution_time:.4f} secondes") + return evaluated_nodes, path_to_current, current_distance diff --git a/quactography/classical/utils/random_grid_generator.py b/quactography/classical/utils/random_grid_generator.py new file mode 100644 index 0000000..d905570 --- /dev/null +++ b/quactography/classical/utils/random_grid_generator.py @@ -0,0 +1,54 @@ +import numpy as np +import random +import networkx as nx + +def generer_grille(size, obstacle_mode="ratio", obstacle_ratio=0.2, obstacle_number=20): + """ + Generate a random 2D grid and its corresponding NetworkX graph. + + Parameters + ---------- + size : int + Size of the grid (the grid will be of shape `size x size`). + Must be a positive integer. + obstacle_mode : str + Strategy used to place obstacles in the grid. Options are: + - "ratio": place a proportion of obstacles based on `obstacle_ratio` + - "number": place a fixed number of obstacles based on `obstacle_number` + obstacle_ratio : float + Used only if `obstacle_mode` is "ratio". Defines the proportion of cells + to be turned into obstacles. Must be a float between 0 and 1. + obstacle_number : int + Used only if `obstacle_mode` is "number". Defines the exact number of obstacles + to place in the grid. Must be a positive integer. + + Returns + ------- + grid : np.ndarray + A 2D NumPy array of shape `(size, size)` representing the grid, where `1` + denotes an obstacle and `0` a free cell. + G : networkx.Graph + A 2D grid graph where each node is a tuple `(x, y)`. Edges connect 4-neighboring + nodes (up, down, left, right). Nodes corresponding to obstacles are removed. + """ + n = size + grid = np.zeros((n, n)) + G = nx.grid_2d_graph(n, n) + + obstacles = set() + + if obstacle_mode == "ratio": + num_obstacles = int(n * n * obstacle_ratio) + else: + num_obstacles = obstacle_number + + while len(obstacles) < num_obstacles: + x, y = random.randint(0, n - 1), random.randint(0, n - 1) + if (x, y) != (0, 0) and (x, y) != (n - 1, n - 1): + obstacles.add((x, y)) + grid[x, y] = 1 # Add an obstacle + if (x, y) in G: + G.remove_node((x, y)) + + return grid, G + diff --git a/scripts/generate_random_grids.py b/scripts/generate_random_grids.py new file mode 100644 index 0000000..3fa4808 --- /dev/null +++ b/scripts/generate_random_grids.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Generate random grids/graphs with obstacles. +This script can optionally suppress the output +display of the grid and graph nodes. +The generated graphs are saved as .npz files. +""" + +import argparse +import sys + +from quactography.classical.utils.random_grid_generator import generate_grid +from quactography.classical.io import save_graph + +def _build_arg_parser(): + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawTextHelpFormatter + ) + + parser.add_argument( + '--size', type=int, default=10, + help="Size of the grid (the grid will be of shape size x size)." + ) + + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + '--ratio', type=float, + help="Ratio of obstacles (e.g., 0.2 = 20%)." + ) + group.add_argument( + '--number', type=int, + help="Exact number of obstacles." + ) + + parser.add_argument( + '--output', required=True, + help="Output format: 'filename.npz;'. " + "Generates files like 'filename_0.npz', etc." + ) + + parser.add_argument( + '--save_only', action='store_true', + help="If set, suppresses grid and node outputs." + ) + + return parser + + +def parse_output_arg(output_str): + try: + file, number_str = output_str.split(';') + number = int(number_str) + if number <= 0: + raise ValueError("The number of files must be greater than 0.") + return file, number + except ValueError: + raise ValueError( + f"Invalid output format: '{output_str}'. " + "Expected format is 'filename.npz;'." + ) + + +def main(): + parser = _build_arg_parser() + args = parser.parse_args() + + try: + file, number = parse_output_arg(args.output) + except ValueError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + mode = 'ratio' if args.ratio is not None else 'number' + value = args.ratio if args.ratio is not None else args.number + + if mode == 'ratio' and not (0 <= value <= 1): + print("Error: Ratio must be between 0 and 1.", file=sys.stderr) + sys.exit(1) + + for i in range(number): + grid, G = ( + generate_grid(args.size, 'ratio', value) + if mode == 'ratio' + else generate_grid(args.size, 'number', value, value) + ) + + save_graph(G, f"{file}_{i}.npz") + + if not args.save_only: + print(f"Graph {i + 1}/{number} saved as '{file}_{i}.npz'") + if not args.save_only: + print("Generated grid:") + print(grid) + print("Graph nodes:", list(G.nodes)) + + +if __name__ == "__main__": + main() diff --git a/scripts/shortest_pathfinder.py b/scripts/shortest_pathfinder.py new file mode 100644 index 0000000..6b0cc0c --- /dev/null +++ b/scripts/shortest_pathfinder.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Find the shortest path between 2 points +in a graph using Dijkstra or A* algorithm. +Supports graphs loaded from JSON or NPZ files, +and optionally allows diagonal movement. +""" + +import argparse +import os +import sys +from quactography.classical.utils.random_Dijkstra import dijkstra_stepwise +from quactography.classical.utils.random_Astar import astar_stepwise, heuristic +from quactography.classical.io import load_the_graph + + +def _build_arg_parser(): + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawTextHelpFormatter + ) + parser.add_argument( + "--in_graph", type=str, required=True, + help="Path to the input graph file (.json or .npz)" + ) + parser.add_argument( + "--shortestpath", choices=['Dijkstra', 'A*'], default='Dijkstra', + help="Shortest path algorithm to use: 'Dijkstra' or 'A*'" + ) + parser.add_argument( + "--start", type=str, required=True, + help="Start node, e.g. '3,4'" + ) + parser.add_argument( + "--target", type=str, required=True, + help="Target node, e.g. '7,8'" + ) + parser.add_argument( + "--diagonal_mode", choices=['diagonal', 'nondiagonal'], + default='nondiagonal', + help="Allow diagonal movement or not" + ) + return parser + + +def parse_node(node_str): + try: + parts = node_str.strip().split(',') + return tuple(int(p) for p in parts if p.strip() != '') + except ValueError as e: + raise ValueError( + f"Invalid node format: '{node_str}' (expected format: x,y)" + ) from e + + +def main(): + parser = _build_arg_parser() + args = parser.parse_args() + + if not os.path.isfile(args.in_graph): + print(f"Error: File '{args.in_graph}' not found.") + sys.exit(1) + + try: + start = parse_node(args.start) + target = parse_node(args.target) + except ValueError as e: + print(f"Error parsing node: {e}") + sys.exit(1) + + G = load_the_graph(args.in_graph) + + if start not in G.nodes(): + print(f"Start node {start} not in graph.") + print(f"Available nodes: {list(G.nodes())[:5]}...") + sys.exit(1) + + if target not in G.nodes(): + print(f"Target node {target} not in graph.") + print(f"Available nodes: {list(G.nodes())[:5]}...") + sys.exit(1) + + print( + f"Finding shortest path from {start} to {target} using {args.shortestpath}..." + ) + + if args.shortestpath == "Dijkstra": + evaluated_nodes, path_history, path_cost = dijkstra_stepwise( + G, start, target, args.diagonal_mode + ) + else: + evaluated_nodes, path_history, path_cost = astar_stepwise( + G, start, target, args.diagonal_mode + ) + + if path_history is None: + print("⚠️ Aucun chemin trouvé.") + sys.exit(0) + + shortest_path = [tuple(int(x) for x in n) for n in path_history[-1]] + + print("\nShortest path:") + print(" → ".join(map(str, shortest_path))) + print(f"Path cost: {path_cost:.2f}") + print(f"Nodes evaluated: {len(evaluated_nodes)}") + + +if __name__ == "__main__": + main()