diff --git a/.gitignore b/.gitignore
index a799fe3ee0..d0b5c91519 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,6 +14,7 @@ pypirc
 # Distribution / packaging
 .Python
 env/
+.venv/
 build/
 develop-eggs/
 dist/
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7207297d9f..3b53e49117 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,9 +10,13 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 ### Added
 - **GFQL / Oracle**: Introduced `graphistry.gfql.ref.enumerator`, a pandas-only reference implementation that enumerates fixed-length chains, enforces local + same-path predicates, applies strict null semantics, enforces safety caps, and emits alias tags/optional path bindings for use as a correctness oracle.
+- **GFQL / cuDF same-path**: Added execution-mode gate `GRAPHISTRY_CUDF_SAME_PATH_MODE` (auto/oracle/strict) for the GFQL cuDF same-path executor. Auto falls back to the oracle when no GPU is available; strict requires cuDF and raises otherwise. The oracle path retains safety caps and alias-tag propagation.
+- **GFQL / cuDF executor**: Implemented the same-path pruning path (wavefront backward filtering, min/max summaries for inequalities, value-aware equality filters) with oracle fallback. cuDF chains with WHERE now dispatch through the same-path executor.
 
 ### Tests
 - **GFQL**: Added deterministic + property-based oracle tests (triangles, alias reuse, cuDF conversions, Hypothesis) plus parity checks ensuring pandas GFQL chains match the oracle outputs.
+- **GFQL / cuDF same-path**: Added strict/auto mode coverage for cuDF executor fallback behavior to keep CI stable while GPU kernels are wired up.
+- **GFQL / cuDF same-path**: Added GPU-path parity tests (equality/inequality) over CPU data to guard semantics while GPU CI remains unavailable.
 - **Layouts**: Added comprehensive test coverage for `circle_layout()` and `group_in_a_box_layout()` with partition support (CPU/GPU)
 
 ## [0.45.9 - 2025-11-10]
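Reviewer note: a minimal, self-contained sketch of the documented gate semantics, assuming only the env var named above. `resolve_mode` is a hypothetical helper for illustration; the shipped logic lives in `_should_attempt_gpu` in `cudf_executor.py` below.

```python
import os

_CUDF_MODE_ENV = "GRAPHISTRY_CUDF_SAME_PATH_MODE"

def resolve_mode(cudf_available: bool) -> str:
    """Hypothetical helper mirroring the documented auto/oracle/strict gate."""
    mode = os.environ.get(_CUDF_MODE_ENV, "auto").lower()
    if mode not in {"auto", "oracle", "strict"}:
        mode = "auto"  # unknown values degrade to auto
    if mode == "oracle":
        return "oracle"
    if mode == "strict" and not cudf_available:
        raise RuntimeError("strict mode requires cudf")
    return "gpu" if cudf_available else "oracle"

os.environ.pop(_CUDF_MODE_ENV, None)
assert resolve_mode(cudf_available=False) == "oracle"  # auto falls back
```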
diff --git a/graphistry/compute/chain.py b/graphistry/compute/chain.py
index 87cb8101c6..8c4bdf1f51 100644
--- a/graphistry/compute/chain.py
+++ b/graphistry/compute/chain.py
@@ -1,4 +1,5 @@
 import logging
+# ruff: noqa: E501
 from typing import Dict, Union, cast, List, Tuple, Sequence, Optional, TYPE_CHECKING
 
 from graphistry.Engine import Engine, EngineAbstract, df_concat, df_to_engine, resolve_engine
@@ -11,6 +12,11 @@ from .typing import DataFrameT
 from .util import generate_safe_column_name
 
 from graphistry.compute.validate.validate_schema import validate_chain_schema
+from graphistry.gfql.same_path_types import (
+    WhereComparison,
+    parse_where_json,
+    where_to_json,
+)
 
 if TYPE_CHECKING:
     from graphistry.compute.exceptions import GFQLSchemaError, GFQLValidationError
@@ -23,8 +29,13 @@ class Chain(ASTSerializable):
 
-    def __init__(self, chain: List[ASTObject]) -> None:
+    def __init__(
+        self,
+        chain: List[ASTObject],
+        where: Optional[Sequence[WhereComparison]] = None,
+    ) -> None:
         self.chain = chain
+        self.where = list(where or [])
 
     def validate(self, collect_all: bool = False) -> Optional[List['GFQLValidationError']]:
         """Override to collect all chain validation errors."""
@@ -114,7 +125,14 @@ def from_json(cls, d: Dict[str, JSONVal], validate: bool = True) -> 'Chain':
                 f"Chain field must be a list, got {type(d['chain']).__name__}"
             )
 
-        out = cls([ASTObject_from_json(op, validate=validate) for op in d['chain']])
+        where_raw = d.get('where')
+        where = parse_where_json(
+            cast(Optional[Sequence[Dict[str, Dict[str, str]]]], where_raw)
+        )
+        out = cls(
+            [ASTObject_from_json(op, validate=validate) for op in d['chain']],
+            where=where,
+        )
         if validate:
             out.validate()
         return out
@@ -125,10 +143,13 @@ def to_json(self, validate=True) -> Dict[str, JSONVal]:
         """
         if validate:
             self.validate()
-        return {
+        data: Dict[str, JSONVal] = {
             'type': self.__class__.__name__,
             'chain': [op.to_json() for op in self.chain]
         }
+        if self.where:
+            data['where'] = where_to_json(self.where)
+        return data
 
     def validate_schema(self, g: Plottable, collect_all: bool = False) -> Optional[List['GFQLSchemaError']]:
         """Validate this chain against a graph's schema without executing.
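For reviewers, the round-trip the new `where` field enables, using the `col`/`compare` constructors this patch ships in `same_path_types` (a sketch mirroring the new tests):

```python
from graphistry.compute import n, e_forward
from graphistry.compute.chain import Chain
from graphistry.gfql.same_path_types import col, compare

chain = Chain(
    [n({"type": "account"}, name="a"), e_forward(), n(name="c")],
    where=[compare(col("a", "owner_id"), "==", col("c", "owner_id"))],
)
payload = chain.to_json()   # {'type': 'Chain', 'chain': [...], 'where': [{'eq': ...}]}
restored = Chain.from_json(payload)
assert len(restored.where) == 1
```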
+ """ + self._forward() + if self._should_attempt_gpu(): + return self._run_gpu() + return self._run_oracle() + + def _forward(self) -> None: + graph = self.inputs.graph + ops = self.inputs.chain + self.forward_steps = [] + + for idx, op in enumerate(ops): + if isinstance(op, ASTCall): + current_g = self.forward_steps[-1] if self.forward_steps else graph + prev_nodes = None + else: + current_g = graph + prev_nodes = ( + None if not self.forward_steps else self.forward_steps[-1]._nodes + ) + g_step = op( + g=current_g, + prev_node_wavefront=prev_nodes, + target_wave_front=None, + engine=self.inputs.engine, + ) + self.forward_steps.append(g_step) + self._capture_alias_frame(op, g_step, idx) + + def _backward(self) -> None: + raise NotImplementedError + + def _finalize(self) -> Plottable: + raise NotImplementedError + + def _capture_alias_frame( + self, op: ASTObject, step_result: Plottable, step_index: int + ) -> None: + alias = getattr(op, "_name", None) + if not alias or alias not in self.inputs.alias_bindings: + return + binding = self.inputs.alias_bindings[alias] + frame = ( + step_result._nodes + if binding.kind == "node" + else step_result._edges + ) + if frame is None: + kind = "node" if binding.kind == "node" else "edge" + raise ValueError( + f"Alias '{alias}' did not produce a {kind} frame" + ) + required = set(self.inputs.column_requirements.get(alias, set())) + id_col = self._node_column if binding.kind == "node" else self._edge_column + if id_col: + required.add(id_col) + missing = [col for col in required if col not in frame.columns] + if missing: + cols = ", ".join(missing) + raise ValueError( + f"Alias '{alias}' missing required columns: {cols}" + ) + subset_cols = [col for col in required] + alias_frame = frame[subset_cols].copy() + self.alias_frames[alias] = alias_frame + self._capture_minmax(alias, alias_frame, id_col) + self._capture_equality_values(alias, alias_frame) + self._apply_ready_clauses() + + # --- Execution selection helpers ------------------------------------------------- + + def _should_attempt_gpu(self) -> bool: + """Decide whether to try GPU kernels for same-path execution.""" + + mode = os.environ.get(_CUDF_MODE_ENV, "auto").lower() + if mode not in {"auto", "oracle", "strict"}: + mode = "auto" + + # force oracle path + if mode == "oracle": + return False + + # only CUDF engine supports GPU fastpath + if self.inputs.engine != Engine.CUDF: + return False + + try: # check cudf presence + import cudf # type: ignore # noqa: F401 + except Exception: + if mode == "strict": + raise RuntimeError( + "cuDF engine requested with strict mode but cudf is unavailable" + ) + return False + return True + + # --- Oracle (CPU) fallback ------------------------------------------------------- + + def _run_oracle(self) -> Plottable: + oracle = enumerate_chain( + self.inputs.graph, + self.inputs.chain, + where=self.inputs.where, + include_paths=self.inputs.include_paths, + caps=OracleCaps( + max_nodes=1000, max_edges=5000, max_length=20, max_partial_rows=1_000_000 + ), + ) + self._update_alias_frames_from_oracle(oracle.tags) + return self._materialize_from_oracle(oracle.nodes, oracle.edges) + + # --- GPU path placeholder -------------------------------------------------------- + + def _run_gpu(self) -> Plottable: + """GPU-style path using captured wavefronts and same-path pruning.""" + + allowed_tags = self._compute_allowed_tags() + path_state = self._backward_prune(allowed_tags) + return self._materialize_filtered(path_state) + + def _update_alias_frames_from_oracle( + self, 
+
+    def _update_alias_frames_from_oracle(
+        self, tags: Dict[str, Set[Any]]
+    ) -> None:
+        """Filter captured frames using oracle tags to ensure path coherence."""
+
+        for alias, binding in self.inputs.alias_bindings.items():
+            if alias not in tags:
+                # if the oracle didn't emit the alias, leave any existing capture intact
+                continue
+            ids = tags.get(alias, set())
+            frame = self._lookup_binding_frame(binding)
+            if frame is None:
+                continue
+            id_col = self._node_column if binding.kind == "node" else self._edge_column
+            if id_col is None:
+                continue
+            filtered = frame[frame[id_col].isin(ids)].copy()
+            self.alias_frames[alias] = filtered
+
+    def _lookup_binding_frame(self, binding: AliasBinding) -> Optional[DataFrameT]:
+        if binding.step_index >= len(self.forward_steps):
+            return None
+        step_result = self.forward_steps[binding.step_index]
+        return (
+            step_result._nodes
+            if binding.kind == "node"
+            else step_result._edges
+        )
+
+    def _materialize_from_oracle(
+        self, nodes_df: DataFrameT, edges_df: DataFrameT
+    ) -> Plottable:
+        """Build a Plottable from oracle node/edge outputs, preserving bindings."""
+
+        g = self.inputs.graph
+        edge_id = g._edge
+        src = g._source
+        dst = g._destination
+        node_id = g._node
+
+        if node_id and node_id not in nodes_df.columns:
+            raise ValueError(f"Oracle nodes missing id column '{node_id}'")
+        if src and src not in edges_df.columns:
+            raise ValueError(f"Oracle edges missing source column '{src}'")
+        if dst and dst not in edges_df.columns:
+            raise ValueError(f"Oracle edges missing destination column '{dst}'")
+        if edge_id and edge_id not in edges_df.columns:
+            # Enumerators may synthesize an edge id column when the original graph lacked one
+            if "__enumerator_edge_id__" in edges_df.columns:
+                edges_df = edges_df.rename(columns={"__enumerator_edge_id__": edge_id})
+            else:
+                raise ValueError(f"Oracle edges missing id column '{edge_id}'")
+
+        g_out = g.nodes(nodes_df, node=node_id)
+        g_out = g_out.edges(edges_df, source=src, destination=dst, edge=edge_id)
+        return g_out
+
+    # --- GPU helpers ---------------------------------------------------------------
+
+    def _compute_allowed_tags(self) -> Dict[str, Set[Any]]:
+        """Seed allowed ids from alias frames (post-forward pruning)."""
+
+        out: Dict[str, Set[Any]] = {}
+        for alias, binding in self.inputs.alias_bindings.items():
+            frame = self.alias_frames.get(alias)
+            if frame is None:
+                continue
+            id_col = self._node_column if binding.kind == "node" else self._edge_column
+            if id_col is None or id_col not in frame.columns:
+                continue
+            out[alias] = self._series_values(frame[id_col])
+        return out
+
+    def _capture_minmax(
+        self, alias: str, frame: DataFrameT, id_col: Optional[str]
+    ) -> None:
+        if not id_col or not self.inputs.plan.requires_minmax(alias):
+            return
+        cols = self.inputs.column_requirements.get(alias, set())
+        target_cols = [col for col in cols if col in frame.columns]
+        if not target_cols:
+            return
+        grouped = frame.groupby(id_col)
+        for col in target_cols:
+            summary = grouped[col].agg(["min", "max"]).reset_index()
+            self._minmax_summaries[alias][col] = summary
+
+    def _capture_equality_values(
+        self, alias: str, frame: DataFrameT
+    ) -> None:
+        cols = self.inputs.column_requirements.get(alias, set())
+        participates = any(
+            alias in bitset.aliases for bitset in self.inputs.plan.bitsets.values()
+        )
+        if not participates:
+            return
+        for col in cols:
+            if col in frame.columns:
+                self._equality_values[alias][col] = self._series_values(frame[col])
+
+    @dataclass
+    class _PathState:
+        allowed_nodes: Dict[int, Set[Any]]
+        allowed_edges: Dict[int, Set[Any]]
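Before `_backward_prune` below, a toy pandas sketch of one backward step: prune edges against the allowed destination set from the step to the right, then propagate the surviving sources leftward (illustrative column names):

```python
import pandas as pd

edges = pd.DataFrame({
    "edge_id": [1, 2, 3],
    "src": ["a1", "a1", "a2"],
    "dst": ["u1", "u2", "u2"],
})
allowed_dst = {"u1"}                                 # seeded from the right node step
kept = edges[edges["dst"].isin(list(allowed_dst))]   # prune edges
allowed_edges = set(kept["edge_id"])                 # surviving edge ids
allowed_src = set(kept["src"])                       # propagate left
assert allowed_edges == {1} and allowed_src == {"a1"}
```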
+
+    def _backward_prune(self, allowed_tags: Dict[str, Set[Any]]) -> "_PathState":
+        """Propagate allowed ids backward across edges to enforce path coherence."""
+
+        node_indices: List[int] = []
+        edge_indices: List[int] = []
+        for idx, op in enumerate(self.inputs.chain):
+            if isinstance(op, ASTNode):
+                node_indices.append(idx)
+            elif isinstance(op, ASTEdge):
+                edge_indices.append(idx)
+        if not node_indices:
+            raise ValueError("Same-path executor requires at least one node step")
+        if len(node_indices) != len(edge_indices) + 1:
+            raise ValueError("Chain must alternate node/edge steps for same-path execution")
+
+        allowed_nodes: Dict[int, Set[Any]] = {}
+        allowed_edges: Dict[int, Set[Any]] = {}
+
+        # Seed node allowances from tags or full frames
+        for idx in node_indices:
+            node_alias = self._alias_for_step(idx)
+            frame = self.forward_steps[idx]._nodes
+            if frame is None or self._node_column is None:
+                continue
+            if node_alias and node_alias in allowed_tags:
+                allowed_nodes[idx] = set(allowed_tags[node_alias])
+            else:
+                allowed_nodes[idx] = self._series_values(frame[self._node_column])
+
+        # Walk edges backward
+        for edge_idx, right_node_idx in reversed(list(zip(edge_indices, node_indices[1:]))):
+            edge_alias = self._alias_for_step(edge_idx)
+            left_node_idx = node_indices[node_indices.index(right_node_idx) - 1]
+            edges_df = self.forward_steps[edge_idx]._edges
+            if edges_df is None:
+                continue
+
+            filtered = edges_df
+            if self._destination_column and self._destination_column in filtered.columns:
+                allowed_dst = allowed_nodes.get(right_node_idx)
+                if allowed_dst is not None:
+                    filtered = filtered[
+                        filtered[self._destination_column].isin(list(allowed_dst))
+                    ]
+
+            # Apply value-based clauses between adjacent aliases
+            left_alias = self._alias_for_step(left_node_idx)
+            right_alias = self._alias_for_step(right_node_idx)
+            if left_alias and right_alias:
+                filtered = self._filter_edges_by_clauses(
+                    filtered, left_alias, right_alias, allowed_nodes
+                )
+
+            if edge_alias and edge_alias in allowed_tags:
+                allowed_edge_ids = allowed_tags[edge_alias]
+                if self._edge_column and self._edge_column in filtered.columns:
+                    filtered = filtered[
+                        filtered[self._edge_column].isin(list(allowed_edge_ids))
+                    ]
+
+            if self._destination_column and self._destination_column in filtered.columns:
+                allowed_dst_actual = self._series_values(filtered[self._destination_column])
+                current_dst = allowed_nodes.get(right_node_idx, set())
+                allowed_nodes[right_node_idx] = (
+                    current_dst & allowed_dst_actual if current_dst else allowed_dst_actual
+                )
+
+            if self._edge_column and self._edge_column in filtered.columns:
+                allowed_edges[edge_idx] = self._series_values(filtered[self._edge_column])
+
+            if self._source_column and self._source_column in filtered.columns:
+                allowed_src = self._series_values(filtered[self._source_column])
+                current = allowed_nodes.get(left_node_idx, set())
+                allowed_nodes[left_node_idx] = current & allowed_src if current else allowed_src
+
+        return self._PathState(allowed_nodes=allowed_nodes, allowed_edges=allowed_edges)
+
+    def _filter_edges_by_clauses(
+        self,
+        edges_df: DataFrameT,
+        left_alias: str,
+        right_alias: str,
+        allowed_nodes: Dict[int, Set[Any]],
+    ) -> DataFrameT:
+        """Filter edges using WHERE clauses that connect adjacent aliases."""
+
+        relevant = [
+            clause
+            for clause in self.inputs.where
+            if {clause.left.alias, clause.right.alias} == {left_alias, right_alias}
+        ]
+        if not relevant or not self._source_column or not self._destination_column:
+            return edges_df
+
+        left_frame = self.alias_frames.get(left_alias)
+        right_frame = self.alias_frames.get(right_alias)
+        if left_frame is None or right_frame is None or self._node_column is None:
+            return edges_df
+
+        out_df = edges_df
+        left_allowed = allowed_nodes.get(self.inputs.alias_bindings[left_alias].step_index)
+        right_allowed = allowed_nodes.get(self.inputs.alias_bindings[right_alias].step_index)
+
+        lf = left_frame
+        rf = right_frame
+        if left_allowed is not None:
+            lf = lf[lf[self._node_column].isin(list(left_allowed))]
+        if right_allowed is not None:
+            rf = rf[rf[self._node_column].isin(list(right_allowed))]
+
+        left_cols = list(self.inputs.column_requirements.get(left_alias, []))
+        right_cols = list(self.inputs.column_requirements.get(right_alias, []))
+        if self._node_column in left_cols:
+            left_cols.remove(self._node_column)
+        if self._node_column in right_cols:
+            right_cols.remove(self._node_column)
+
+        lf = lf[[self._node_column] + left_cols].rename(columns={self._node_column: "__left_id__"})
+        rf = rf[[self._node_column] + right_cols].rename(columns={self._node_column: "__right_id__"})
+
+        out_df = out_df.merge(
+            lf,
+            left_on=self._source_column,
+            right_on="__left_id__",
+            how="inner",
+        )
+        out_df = out_df.merge(
+            rf,
+            left_on=self._destination_column,
+            right_on="__right_id__",
+            how="inner",
+            suffixes=("", "__r"),
+        )
+
+        for clause in relevant:
+            left_col = clause.left.column if clause.left.alias == left_alias else clause.right.column
+            right_col = clause.right.column if clause.right.alias == right_alias else clause.left.column
+            if clause.op in {">", ">=", "<", "<="}:
+                out_df = self._apply_inequality_clause(
+                    out_df, clause, left_alias, right_alias, left_col, right_col
+                )
+            else:
+                col_left_name = f"__val_left_{left_col}"
+                col_right_name = f"__val_right_{right_col}"
+                # Left values come from the lf merge (unsuffixed); prefer the
+                # plain column, falling back to the suffixed one. Renaming both
+                # to the same target would create duplicate columns.
+                rename_left = {}
+                if left_col in out_df.columns:
+                    rename_left[left_col] = col_left_name
+                elif f"{left_col}__r" in out_df.columns:
+                    rename_left[f"{left_col}__r"] = col_left_name
+                if rename_left:
+                    out_df = out_df.rename(columns=rename_left)
+                # Right values come from the rf merge, so prefer the "__r" suffix.
+                rename_right = {}
+                if f"{right_col}__r" in out_df.columns:
+                    rename_right[f"{right_col}__r"] = col_right_name
+                elif right_col in out_df.columns:
+                    rename_right[right_col] = col_right_name
+                if rename_right:
+                    out_df = out_df.rename(columns=rename_right)
+                if col_left_name in out_df.columns and col_right_name in out_df.columns:
+                    mask = self._evaluate_clause(out_df[col_left_name], clause.op, out_df[col_right_name])
+                    out_df = out_df[mask]
+
+        return out_df
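A toy pandas sketch of the join-then-mask pattern used above for value-aware equality filters; the `__left_id__`/`__right_id__` staging mirrors the method, while the data and column names are illustrative:

```python
import pandas as pd

edges = pd.DataFrame({"src": ["acct1", "acct2"], "dst": ["user1", "user2"]})
left = pd.DataFrame({"__left_id__": ["acct1", "acct2"],
                     "owner_id": ["user1", "userX"]})
right = pd.DataFrame({"__right_id__": ["user1", "user2"],
                      "uid": ["user1", "user2"]})
joined = (edges
          .merge(left, left_on="src", right_on="__left_id__", how="inner")
          .merge(right, left_on="dst", right_on="__right_id__", how="inner"))
kept = joined[joined["owner_id"] == joined["uid"]]   # a.owner_id == c.uid
assert list(kept["src"]) == ["acct1"]
```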
+
+    def _apply_inequality_clause(
+        self,
+        out_df: DataFrameT,
+        clause: WhereComparison,
+        left_alias: str,
+        right_alias: str,
+        left_col: str,
+        right_col: str,
+    ) -> DataFrameT:
+        left_summary = self._minmax_summaries.get(left_alias, {}).get(left_col)
+        right_summary = self._minmax_summaries.get(right_alias, {}).get(right_col)
+
+        # Fall back to raw values if summaries are missing
+        lsum = None
+        rsum = None
+        if left_summary is not None:
+            lsum = left_summary.rename(
+                columns={
+                    left_summary.columns[0]: "__left_id__",
+                    "min": f"{left_col}__min",
+                    "max": f"{left_col}__max",
+                }
+            )
+        if right_summary is not None:
+            rsum = right_summary.rename(
+                columns={
+                    right_summary.columns[0]: "__right_id__",
+                    "min": f"{right_col}__min_r",
+                    "max": f"{right_col}__max_r",
+                }
+            )
+        merged = out_df
+        if lsum is not None:
+            merged = merged.merge(lsum, on="__left_id__", how="inner")
+        if rsum is not None:
+            merged = merged.merge(rsum, on="__right_id__", how="inner")
+
+        if lsum is None or rsum is None:
+            col_left = left_col
+            col_right = (
+                f"{right_col}__r" if f"{right_col}__r" in merged.columns else right_col
+            )
+            if col_left in merged.columns and col_right in merged.columns:
+                mask = self._evaluate_clause(merged[col_left], clause.op, merged[col_right])
+                return merged[mask]
+            return merged
+
+        l_min = merged.get(f"{left_col}__min")
+        l_max = merged.get(f"{left_col}__max")
+        r_min = merged.get(f"{right_col}__min_r")
+        r_max = merged.get(f"{right_col}__max_r")
+
+        if l_min is None or l_max is None or r_min is None or r_max is None:
+            return merged
+
+        if clause.op == ">":
+            return merged[merged[f"{left_col}__min"] > merged[f"{right_col}__max_r"]]
+        if clause.op == ">=":
+            return merged[merged[f"{left_col}__min"] >= merged[f"{right_col}__max_r"]]
+        if clause.op == "<":
+            return merged[merged[f"{left_col}__max"] < merged[f"{right_col}__min_r"]]
+        # <=
+        return merged[merged[f"{left_col}__max"] <= merged[f"{right_col}__min_r"]]
+
+    @staticmethod
+    def _evaluate_clause(series_left: Any, op: str, series_right: Any) -> Any:
+        if op == "==":
+            return series_left == series_right
+        if op == "!=":
+            return series_left != series_right
+        if op == ">":
+            return series_left > series_right
+        if op == ">=":
+            return series_left >= series_right
+        if op == "<":
+            return series_left < series_right
+        if op == "<=":
+            return series_left <= series_right
+        return False
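A toy pandas sketch (pandas >= 1.2 for `how="cross"`) of the min/max summary trick behind `_apply_inequality_clause`: per-id summaries are joined, and a pair survives only when every left value beats every right value, matching the strict interval filter above:

```python
import pandas as pd

vals = pd.DataFrame({"id": ["a", "a", "b"], "score": [1, 5, 9]})
summary = vals.groupby("id")["score"].agg(["min", "max"]).reset_index()
pairs = summary.merge(summary, how="cross", suffixes=("_l", "_r"))
# Keep only pairs guaranteed to satisfy `left.score > right.score`.
kept = pairs[pairs["min_l"] > pairs["max_r"]]
assert set(zip(kept["id_l"], kept["id_r"])) == {("b", "a")}
```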
+
+    def _materialize_filtered(self, path_state: "_PathState") -> Plottable:
+        """Build the result graph from allowed node/edge ids and refresh alias frames."""
+
+        nodes_df = self.inputs.graph._nodes
+        node_id = self._node_column
+        edge_id = self._edge_column
+        src = self._source_column
+        dst = self._destination_column
+
+        edge_frames = [
+            self.forward_steps[idx]._edges
+            for idx, op in enumerate(self.inputs.chain)
+            if isinstance(op, ASTEdge) and self.forward_steps[idx]._edges is not None
+        ]
+        concatenated_edges = self._concat_frames(edge_frames)
+        edges_df = concatenated_edges if concatenated_edges is not None else self.inputs.graph._edges
+
+        if nodes_df is None or edges_df is None or node_id is None or src is None or dst is None:
+            raise ValueError("Graph bindings are incomplete for same-path execution")
+
+        allowed_node_ids: Set[Any] = (
+            set().union(*path_state.allowed_nodes.values()) if path_state.allowed_nodes else set()
+        )
+        allowed_edge_ids: Set[Any] = (
+            set().union(*path_state.allowed_edges.values()) if path_state.allowed_edges else set()
+        )
+
+        filtered_nodes = (
+            nodes_df[nodes_df[node_id].isin(list(allowed_node_ids))]
+            if allowed_node_ids
+            else nodes_df.iloc[0:0]
+        )
+        filtered_edges = (
+            edges_df[edges_df[dst].isin(list(allowed_node_ids))]
+            if allowed_node_ids
+            else edges_df.iloc[0:0]
+        )
+        if allowed_edge_ids and edge_id and edge_id in filtered_edges.columns:
+            filtered_edges = filtered_edges[filtered_edges[edge_id].isin(list(allowed_edge_ids))]
+
+        for alias, binding in self.inputs.alias_bindings.items():
+            frame = filtered_nodes if binding.kind == "node" else filtered_edges
+            id_col = self._node_column if binding.kind == "node" else self._edge_column
+            if id_col is None or id_col not in frame.columns:
+                continue
+            required = set(self.inputs.column_requirements.get(alias, set()))
+            required.add(id_col)
+            subset = frame[[c for c in frame.columns if c in required]].copy()
+            self.alias_frames[alias] = subset
+
+        return self._materialize_from_oracle(filtered_nodes, filtered_edges)
+
+    def _alias_for_step(self, step_index: int) -> Optional[str]:
+        for alias, binding in self.inputs.alias_bindings.items():
+            if binding.step_index == step_index:
+                return alias
+        return None
+
+    @staticmethod
+    def _concat_frames(frames: Sequence[DataFrameT]) -> Optional[DataFrameT]:
+        if not frames:
+            return None
+        first = frames[0]
+        if first.__class__.__module__.startswith("cudf"):
+            import cudf  # type: ignore
+
+            return cudf.concat(frames, ignore_index=True)
+        return pd.concat(frames, ignore_index=True)
+
+    def _apply_ready_clauses(self) -> None:
+        if not self.inputs.where:
+            return
+        ready = [
+            clause
+            for clause in self.inputs.where
+            if clause.left.alias in self.alias_frames
+            and clause.right.alias in self.alias_frames
+        ]
+        for clause in ready:
+            self._prune_clause(clause)
+
+    def _prune_clause(self, clause: WhereComparison) -> None:
+        if clause.op == "!=":
+            return  # no global prune for '!=' yet
+        lhs = self.alias_frames[clause.left.alias]
+        rhs = self.alias_frames[clause.right.alias]
+        left_col = clause.left.column
+        right_col = clause.right.column
+
+        if clause.op == "==":
+            allowed = self._common_values(lhs[left_col], rhs[right_col])
+            self.alias_frames[clause.left.alias] = self._filter_by_values(
+                lhs, left_col, allowed
+            )
+            self.alias_frames[clause.right.alias] = self._filter_by_values(
+                rhs, right_col, allowed
+            )
+        elif clause.op == ">":
+            right_min = self._safe_min(rhs[right_col])
+            left_max = self._safe_max(lhs[left_col])
+            if right_min is not None:
+                self.alias_frames[clause.left.alias] = lhs[lhs[left_col] > right_min]
+            if left_max is not None:
+                self.alias_frames[clause.right.alias] = rhs[rhs[right_col] < left_max]
+        elif clause.op == ">=":
+            right_min = self._safe_min(rhs[right_col])
+            left_max = self._safe_max(lhs[left_col])
+            if right_min is not None:
+                self.alias_frames[clause.left.alias] = lhs[lhs[left_col] >= right_min]
+            if left_max is not None:
+                self.alias_frames[clause.right.alias] = rhs[rhs[right_col] <= left_max]
+        elif clause.op == "<":
+            right_max = self._safe_max(rhs[right_col])
+            left_min = self._safe_min(lhs[left_col])
+            if right_max is not None:
+                self.alias_frames[clause.left.alias] = lhs[lhs[left_col] < right_max]
+            if left_min is not None:
+                self.alias_frames[clause.right.alias] = rhs[rhs[right_col] > left_min]
+        elif clause.op == "<=":
+            right_max = self._safe_max(rhs[right_col])
+            left_min = self._safe_min(lhs[left_col])
+            if right_max is not None:
+                self.alias_frames[clause.left.alias] = lhs[lhs[left_col] <= right_max]
+            if left_min is not None:
+                self.alias_frames[clause.right.alias] = rhs[rhs[right_col] >= left_min]
+
+    @staticmethod
+    def _filter_by_values(
+        frame: DataFrameT, column: str, values: Set[Any]
+    ) -> DataFrameT:
+        if not values:
+            return frame.iloc[0:0]
+        allowed = list(values)
+        mask = frame[column].isin(allowed)
+        return frame[mask]
+
+    @staticmethod
+    def _common_values(series_a: Any, series_b: Any) -> Set[Any]:
+        vals_a = CuDFSamePathExecutor._series_values(series_a)
+        vals_b = CuDFSamePathExecutor._series_values(series_b)
+        return vals_a & vals_b
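A toy pandas sketch of the global bound pruning `_prune_clause` applies for a ready `>` clause: rows that cannot beat the other side's extremum are dropped from both alias frames (illustrative data):

```python
import pandas as pd

lhs = pd.DataFrame({"id": ["a1", "a2"], "score": [2, 7]})
rhs = pd.DataFrame({"id": ["u1", "u2"], "score": [5, 9]})
# For `lhs.score > rhs.score`: lhs rows <= min(rhs) and rhs rows >= max(lhs)
# can never participate in a satisfying pair.
lhs_kept = lhs[lhs["score"] > rhs["score"].min()]    # keeps a2 (7 > 5)
rhs_kept = rhs[rhs["score"] < lhs["score"].max()]    # keeps u1 (5 < 7)
assert list(lhs_kept["id"]) == ["a2"] and list(rhs_kept["id"]) == ["u1"]
```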
+
+    @staticmethod
+    def _series_values(series: Any) -> Set[Any]:
+        pandas_series = CuDFSamePathExecutor._to_pandas_series(series)
+        return set(pandas_series.dropna().unique().tolist())
+
+    @staticmethod
+    def _safe_min(series: Any) -> Optional[Any]:
+        pandas_series = CuDFSamePathExecutor._to_pandas_series(series).dropna()
+        if pandas_series.empty:
+            return None
+        value = pandas_series.min()
+        if pd.isna(value):
+            return None
+        return value
+
+    @staticmethod
+    def _safe_max(series: Any) -> Optional[Any]:
+        pandas_series = CuDFSamePathExecutor._to_pandas_series(series).dropna()
+        if pandas_series.empty:
+            return None
+        value = pandas_series.max()
+        if pd.isna(value):
+            return None
+        return value
+
+    @staticmethod
+    def _to_pandas_series(series: Any) -> pd.Series:
+        if hasattr(series, "to_pandas"):
+            return series.to_pandas()
+        if isinstance(series, pd.Series):
+            return series
+        return pd.Series(series)
+
+
+def build_same_path_inputs(
+    g: Plottable,
+    chain: Sequence[ASTObject],
+    where: Sequence[WhereComparison],
+    engine: Engine,
+    include_paths: bool = False,
+) -> SamePathExecutorInputs:
+    """Construct executor inputs, deriving planner metadata and validations."""
+
+    bindings = _collect_alias_bindings(chain)
+    _validate_where_aliases(bindings, where)
+    required_columns = _collect_required_columns(where)
+    plan = plan_same_path(where)
+
+    return SamePathExecutorInputs(
+        graph=g,
+        chain=list(chain),
+        where=list(where),
+        plan=plan,
+        engine=engine,
+        alias_bindings=bindings,
+        column_requirements=required_columns,
+        include_paths=include_paths,
+    )
+
+
+def execute_same_path_chain(
+    g: Plottable,
+    chain: Sequence[ASTObject],
+    where: Sequence[WhereComparison],
+    engine: Engine,
+    include_paths: bool = False,
+) -> Plottable:
+    """Convenience wrapper used by Chain execution once hooked up."""
+
+    inputs = build_same_path_inputs(g, chain, where, engine, include_paths)
+    executor = CuDFSamePathExecutor(inputs)
+    return executor.run()
+
+
+def _collect_alias_bindings(chain: Sequence[ASTObject]) -> Dict[str, AliasBinding]:
+    bindings: Dict[str, AliasBinding] = {}
+    for idx, step in enumerate(chain):
+        alias = getattr(step, "_name", None)
+        if not alias or not isinstance(alias, str):
+            continue
+        if isinstance(step, ASTNode):
+            kind: AliasKind = "node"
+        elif isinstance(step, ASTEdge):
+            kind = "edge"
+        else:
+            continue
+
+        if alias in bindings:
+            raise ValueError(f"Duplicate alias '{alias}' detected in chain")
+        bindings[alias] = AliasBinding(alias, idx, kind, step)
+    return bindings
+
+
+def _collect_required_columns(
+    where: Sequence[WhereComparison],
+) -> Dict[str, Set[str]]:
+    requirements: Dict[str, Set[str]] = defaultdict(set)
+    for clause in where:
+        requirements[clause.left.alias].add(clause.left.column)
+        requirements[clause.right.alias].add(clause.right.column)
+    return {alias: set(cols) for alias, cols in requirements.items()}
+
+
+def _validate_where_aliases(
+    bindings: Dict[str, AliasBinding],
+    where: Sequence[WhereComparison],
+) -> None:
+    if not where:
+        return
+    referenced = {clause.left.alias for clause in where} | {
+        clause.right.alias for clause in where
+    }
+    missing = sorted(alias for alias in referenced if alias not in bindings)
+    if missing:
+        missing_str = ", ".join(missing)
+        raise ValueError(
+            f"WHERE references aliases with no node/edge bindings: {missing_str}"
+        )
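End-to-end usage sketch mirroring the new tests; `CGFull` is the test suite's Plottable helper, and with `Engine.PANDAS` this routes through the oracle fallback rather than GPU kernels:

```python
import pandas as pd
from graphistry.Engine import Engine
from graphistry.compute import n, e_forward
from graphistry.compute.gfql.cudf_executor import execute_same_path_chain
from graphistry.gfql.same_path_types import col, compare
from graphistry.tests.test_compute import CGFull  # test-only Plottable

nodes = pd.DataFrame([
    {"id": "acct1", "type": "account", "owner_id": "user1"},
    {"id": "user1", "type": "user"},
])
edges = pd.DataFrame([{"src": "acct1", "dst": "user1"}])
g = CGFull().nodes(nodes, "id").edges(edges, "src", "dst")

result = execute_same_path_chain(
    g,
    [n({"type": "account"}, name="a"), e_forward(name="r"), n({"type": "user"}, name="c")],
    where=[compare(col("a", "owner_id"), "==", col("c", "id"))],
    engine=Engine.PANDAS,
)
assert set(result._nodes["id"]) == {"acct1", "user1"}
```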
diff --git a/graphistry/compute/gfql_unified.py b/graphistry/compute/gfql_unified.py
index 0cbb22a469..8c77788428 100644
--- a/graphistry/compute/gfql_unified.py
+++ b/graphistry/compute/gfql_unified.py
@@ -1,8 +1,9 @@
 """GFQL unified entrypoint for chains and DAGs"""
+# ruff: noqa: E501
 
-from typing import List, Union, Optional, Dict, Any
+from typing import List, Union, Optional, Dict, Any, cast
 from graphistry.Plottable import Plottable
-from graphistry.Engine import EngineAbstract
+from graphistry.Engine import Engine, EngineAbstract
 from graphistry.util import setup_logger
 from .ast import ASTObject, ASTLet, ASTNode, ASTEdge
 from .chain import Chain, chain as chain_impl
@@ -16,6 +17,11 @@
     QueryType,
     expand_policy
 )
+from graphistry.gfql.same_path_types import parse_where_json
+from graphistry.compute.gfql.cudf_executor import (
+    build_same_path_inputs,
+    execute_same_path_chain,
+)
 
 logger = setup_logger(__name__)
 
@@ -227,8 +233,22 @@ def policy(context: PolicyContext) -> None:
                 e.query_type = policy_context.get('query_type')
             raise
 
-    # Handle dict convenience first (convert to ASTLet)
-    if isinstance(query, dict):
+    # Handle dict convenience first
+    if isinstance(query, dict) and "chain" in query:
+        chain_items: List[ASTObject] = []
+        for item in query["chain"]:
+            if isinstance(item, dict):
+                from .ast import from_json
+                chain_items.append(from_json(item))
+            elif isinstance(item, ASTObject):
+                chain_items.append(item)
+            else:
+                raise TypeError(f"Unsupported chain entry type: {type(item)}")
+        where_meta = parse_where_json(
+            cast(Optional[List[Dict[str, Dict[str, str]]]], query.get("where"))
+        )
+        query = Chain(chain_items, where=where_meta)
+    elif isinstance(query, dict):
         # Auto-wrap ASTNode and ASTEdge values in Chain for GraphOperation compatibility
         wrapped_dict = {}
         for key, value in query.items():
@@ -256,13 +276,13 @@ def policy(context: PolicyContext) -> None:
         logger.debug('GFQL executing as Chain')
         if output is not None:
             logger.warning('output parameter ignored for chain queries')
-        return chain_impl(self, query.chain, engine, policy=expanded_policy, context=context)
+        return _chain_dispatch(self, query, engine, expanded_policy, context)
     elif isinstance(query, ASTObject):
         # Single ASTObject -> execute as single-item chain
         logger.debug('GFQL executing single ASTObject as chain')
         if output is not None:
             logger.warning('output parameter ignored for chain queries')
-        return chain_impl(self, [query], engine, policy=expanded_policy, context=context)
+        return _chain_dispatch(self, Chain([query]), engine, expanded_policy, context)
     elif isinstance(query, list):
         logger.debug('GFQL executing list as chain')
         if output is not None:
@@ -277,7 +297,7 @@ def policy(context: PolicyContext) -> None:
             else:
                 converted_query.append(item)
 
-        return chain_impl(self, converted_query, engine, policy=expanded_policy, context=context)
+        return _chain_dispatch(self, Chain(converted_query), engine, expanded_policy, context)
     else:
         raise TypeError(
            f"Query must be ASTObject, List[ASTObject], Chain, ASTLet, or dict. "
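The dict convenience form this branch accepts, as exercised by the new tests; note the dispatcher below only routes `where` to the same-path executor on the cuDF engine, so other engines currently fall through to plain chain execution:

```python
import pandas as pd
from graphistry.compute import n, e_forward
from graphistry.tests.test_compute import CGFull  # test-only Plottable

nodes_df = pd.DataFrame([
    {"id": "acct1", "type": "account", "owner_id": "user1"},
    {"id": "user1", "type": "user"},
])
edges_df = pd.DataFrame([{"src": "acct1", "dst": "user1"}])
g = CGFull().nodes(nodes_df, "id").edges(edges_df, "src", "dst")

query = {
    "chain": [
        n({"type": "account"}, name="a").to_json(),
        e_forward().to_json(),
        n({"type": "user"}, name="c").to_json(),
    ],
    "where": [{"eq": {"left": "a.owner_id", "right": "c.owner_id"}}],
}
res = g.gfql(query)  # dicts with 'chain' now build Chain(..., where=...)
assert res._nodes is not None
```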
" @@ -291,3 +311,32 @@ def policy(context: PolicyContext) -> None: # Reset policy depth if policy: context.policy_depth = policy_depth + + +def _chain_dispatch( + g: Plottable, + chain_obj: Chain, + engine: Union[EngineAbstract, str], + policy: Optional[PolicyDict], + context: ExecutionContext, +) -> Plottable: + """Dispatch chain execution, including cuDF same-path executor when applicable.""" + + is_cudf = engine == EngineAbstract.CUDF or engine == "cudf" + if is_cudf and chain_obj.where: + engine_enum = Engine.CUDF + inputs = build_same_path_inputs( + g, + chain_obj.chain, + chain_obj.where, + engine=engine_enum, + include_paths=False, + ) + return execute_same_path_chain( + inputs.graph, + inputs.chain, + inputs.where, + inputs.engine, + inputs.include_paths, + ) + return chain_impl(g, chain_obj.chain, engine, policy=policy, context=context) diff --git a/graphistry/gfql/ref/enumerator.py b/graphistry/gfql/ref/enumerator.py index a06f930cbb..1b94633423 100644 --- a/graphistry/gfql/ref/enumerator.py +++ b/graphistry/gfql/ref/enumerator.py @@ -1,9 +1,10 @@ """Minimal GFQL reference enumerator used as the correctness oracle.""" +# ruff: noqa: E501 from __future__ import annotations from dataclasses import dataclass -from typing import Any, Dict, List, Literal, Optional, Sequence, Set, Tuple +from typing import Any, Dict, List, Optional, Sequence, Set, Tuple import pandas as pd @@ -16,21 +17,7 @@ from graphistry.compute.ast import ASTEdge, ASTNode, ASTObject from graphistry.compute.chain import Chain from graphistry.compute.filter_by_dict import filter_by_dict -ComparisonOp = Literal["==", "!=", "<", "<=", ">", ">="] - - - -@dataclass(frozen=True) -class StepColumnRef: - alias: str - column: str - - -@dataclass(frozen=True) -class WhereComparison: - left: StepColumnRef - op: ComparisonOp - right: StepColumnRef +from graphistry.gfql.same_path_types import ComparisonOp, WhereComparison @dataclass(frozen=True) @@ -49,14 +36,6 @@ class OracleResult: paths: Optional[List[Dict[str, Any]]] = None -def col(alias: str, column: str) -> StepColumnRef: - return StepColumnRef(alias, column) - - -def compare(left: StepColumnRef, op: ComparisonOp, right: StepColumnRef) -> WhereComparison: - return WhereComparison(left, op, right) - - def enumerate_chain( g: Plottable, ops: Sequence[ASTObject], diff --git a/graphistry/gfql/same_path_plan.py b/graphistry/gfql/same_path_plan.py new file mode 100644 index 0000000000..8ea0b5d08e --- /dev/null +++ b/graphistry/gfql/same_path_plan.py @@ -0,0 +1,62 @@ +"""Planner toggles for same-path WHERE comparisons.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Dict, Optional, Sequence, Set + +from graphistry.gfql.same_path_types import WhereComparison + + +@dataclass +class BitsetPlan: + aliases: Set[str] + lane_count: int = 64 + + +@dataclass +class StateTablePlan: + aliases: Set[str] + cap: int = 128 + + +@dataclass +class SamePathPlan: + minmax_aliases: Dict[str, Set[str]] = field(default_factory=dict) + bitsets: Dict[str, BitsetPlan] = field(default_factory=dict) + state_tables: Dict[str, StateTablePlan] = field(default_factory=dict) + + def requires_minmax(self, alias: str) -> bool: + return alias in self.minmax_aliases + + +def plan_same_path( + where: Optional[Sequence[WhereComparison]], + max_bitset_domain: int = 64, + state_cap: int = 128, +) -> SamePathPlan: + plan = SamePathPlan() + if not where: + return plan + + for clause in where: + if clause.op in {"<", "<=", ">", ">="}: + for ref in (clause.left, 
diff --git a/graphistry/gfql/same_path_types.py b/graphistry/gfql/same_path_types.py
new file mode 100644
index 0000000000..467b7058e9
--- /dev/null
+++ b/graphistry/gfql/same_path_types.py
@@ -0,0 +1,99 @@
+"""Shared data structures for same-path WHERE comparisons."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Dict, List, Literal, Optional, Sequence
+
+
+ComparisonOp = Literal[
+    "==",
+    "!=",
+    "<",
+    "<=",
+    ">",
+    ">=",
+]
+
+
+@dataclass(frozen=True)
+class StepColumnRef:
+    alias: str
+    column: str
+
+
+@dataclass(frozen=True)
+class WhereComparison:
+    left: StepColumnRef
+    op: ComparisonOp
+    right: StepColumnRef
+
+
+def col(alias: str, column: str) -> StepColumnRef:
+    return StepColumnRef(alias, column)
+
+
+def compare(
+    left: StepColumnRef, op: ComparisonOp, right: StepColumnRef
+) -> WhereComparison:
+    return WhereComparison(left, op, right)
+
+
+def parse_column_ref(ref: str) -> StepColumnRef:
+    if "." not in ref:
+        raise ValueError(f"Column reference '{ref}' must be alias.column")
+    alias, column = ref.split(".", 1)
+    if not alias or not column:
+        raise ValueError(f"Invalid column reference '{ref}'")
+    return StepColumnRef(alias, column)
+
+
+def parse_where_json(
+    where_json: Optional[Sequence[Dict[str, Dict[str, str]]]]
+) -> List[WhereComparison]:
+    if not where_json:
+        return []
+    op_map: Dict[str, ComparisonOp] = {
+        "eq": "==",
+        "neq": "!=",
+        "gt": ">",
+        "lt": "<",
+        "ge": ">=",
+        "le": "<=",
+    }
+    clauses: List[WhereComparison] = []
+    for entry in where_json:
+        if not isinstance(entry, dict) or len(entry) != 1:
+            raise ValueError(f"Invalid WHERE clause: {entry}")
+        op_name, payload = next(iter(entry.items()))
+        if op_name not in op_map:
+            raise ValueError(f"Unsupported WHERE operator '{op_name}'")
+        left = parse_column_ref(payload["left"])
+        right = parse_column_ref(payload["right"])
+        clauses.append(WhereComparison(left, op_map[op_name], right))
+    return clauses
+
+
+def where_to_json(where: Sequence[WhereComparison]) -> List[Dict[str, Dict[str, str]]]:
+    result: List[Dict[str, Dict[str, str]]] = []
+    op_map: Dict[str, str] = {
+        "==": "eq",
+        "!=": "neq",
+        ">": "gt",
+        "<": "lt",
+        ">=": "ge",
+        "<=": "le",
+    }
+    for clause in where:
+        op_name = op_map.get(clause.op)
+        if not op_name:
+            continue
+        result.append(
+            {
+                op_name: {
+                    "left": f"{clause.left.alias}.{clause.left.column}",
+                    "right": f"{clause.right.alias}.{clause.right.column}",
+                }
+            }
+        )
+    return result
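A round-trip sketch of the JSON wire format (`eq`/`neq`/`gt`/`lt`/`ge`/`le`, each over `alias.column` refs):

```python
from graphistry.gfql.same_path_types import (
    col, compare, parse_where_json, where_to_json,
)

clauses = parse_where_json([{"gt": {"left": "a.score", "right": "c.score"}}])
assert clauses[0].op == ">" and clauses[0].left.column == "score"
assert where_to_json(clauses) == [{"gt": {"left": "a.score", "right": "c.score"}}]
assert where_to_json([compare(col("a", "x"), "==", col("b", "x"))]) == [
    {"eq": {"left": "a.x", "right": "b.x"}}
]
```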
diff --git a/graphistry/tests/compute/test_chain_where.py b/graphistry/tests/compute/test_chain_where.py
new file mode 100644
index 0000000000..8c8c77eb46
--- /dev/null
+++ b/graphistry/tests/compute/test_chain_where.py
@@ -0,0 +1,49 @@
+import pandas as pd
+
+from graphistry.compute import n, e_forward
+from graphistry.compute.chain import Chain
+from graphistry.gfql.same_path_types import col, compare
+from graphistry.tests.test_compute import CGFull
+
+
+def test_chain_where_roundtrip():
+    chain = Chain(
+        [n({'type': 'account'}, name='a'), e_forward(), n(name='c')],
+        where=[compare(col('a', 'owner_id'), '==', col('c', 'owner_id'))],
+    )
+    json_data = chain.to_json()
+    assert 'where' in json_data
+    restored = Chain.from_json(json_data)
+    assert len(restored.where) == 1
+
+
+def test_chain_from_json_literal():
+    json_chain = {
+        'chain': [
+            n({'type': 'account'}, name='a').to_json(),
+            e_forward().to_json(),
+            n({'type': 'user'}, name='c').to_json(),
+        ],
+        'where': [
+            {'eq': {'left': 'a.owner_id', 'right': 'c.owner_id'}}
+        ],
+    }
+    chain = Chain.from_json(json_chain)
+    assert len(chain.where) == 1
+
+
+def test_gfql_chain_dict_with_where_executes():
+    account_json = n({'type': 'account'}, name='a').to_json()
+    edge_json = e_forward().to_json()
+    user_json = n({'type': 'user'}, name='c').to_json()
+    json_chain = {
+        'chain': [account_json, edge_json, user_json],
+        'where': [{'eq': {'left': 'a.owner_id', 'right': 'c.owner_id'}}],
+    }
+    nodes_df = pd.DataFrame([
+        {'id': 'acct1', 'type': 'account', 'owner_id': 'user1'},
+        {'id': 'user1', 'type': 'user'},
+    ])
+    edges_df = pd.DataFrame([{'src': 'acct1', 'dst': 'user1'}])
+    g = CGFull().nodes(nodes_df, 'id').edges(edges_df, 'src', 'dst')
+    res = g.gfql(json_chain)
+    assert res._nodes is not None
diff --git a/mypy.ini b/mypy.ini
index 30489d63c4..d1c7f6211a 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,5 +1,5 @@
 [mypy]
-python_version = 3.8
+python_version = 3.11
 
 # TODO check tests
 exclude = graph_vector_pb2|versioneer|_version|graphistry/tests
@@ -19,6 +19,9 @@ ignore_missing_imports = True
 [mypy-cupy.*]
 ignore_missing_imports = True
 
+[mypy-tqdm.*]
+ignore_missing_imports = True
+
 [mypy-dask.*]
 ignore_missing_imports = True
e_forward(name="r"), n(name="c")] + where = [compare(col("missing", "x"), "==", col("c", "owner_id"))] + graph = _make_graph() + + with pytest.raises(ValueError): + build_same_path_inputs(graph, chain, where, Engine.PANDAS) + + +def test_forward_captures_alias_frames_and_prunes(): + graph = _make_graph() + chain = [ + n({"type": "account"}, name="a"), + e_forward(name="r"), + n({"type": "user", "id": "user1"}, name="c"), + ] + where = [compare(col("a", "owner_id"), "==", col("c", "id"))] + inputs = build_same_path_inputs(graph, chain, where, Engine.PANDAS) + executor = CuDFSamePathExecutor(inputs) + executor._forward() + + assert "a" in executor.alias_frames + a_nodes = executor.alias_frames["a"] + assert set(a_nodes.columns) == {"id", "owner_id"} + assert list(a_nodes["id"]) == ["acct1"] + + +def test_forward_matches_oracle_tags_on_equality(): + graph = _make_graph() + chain = [ + n({"type": "account"}, name="a"), + e_forward(name="r"), + n({"type": "user"}, name="c"), + ] + where = [compare(col("a", "owner_id"), "==", col("c", "id"))] + inputs = build_same_path_inputs(graph, chain, where, Engine.PANDAS) + executor = CuDFSamePathExecutor(inputs) + executor._forward() + + oracle = enumerate_chain( + graph, + chain, + where=where, + include_paths=False, + caps=OracleCaps(max_nodes=20, max_edges=20), + ) + assert oracle.tags is not None + assert set(executor.alias_frames["a"]["id"]) == oracle.tags["a"] + assert set(executor.alias_frames["c"]["id"]) == oracle.tags["c"] + + +def test_run_materializes_oracle_sets(): + graph = _make_graph() + chain = [ + n({"type": "account"}, name="a"), + e_forward(name="r"), + n({"type": "user"}, name="c"), + ] + where = [compare(col("a", "owner_id"), "==", col("c", "id"))] + + result = execute_same_path_chain(graph, chain, where, Engine.PANDAS) + oracle = enumerate_chain( + graph, + chain, + where=where, + include_paths=False, + caps=OracleCaps(max_nodes=20, max_edges=20), + ) + + assert result._nodes is not None + assert result._edges is not None + assert set(result._nodes["id"]) == set(oracle.nodes["id"]) + assert set(result._edges["src"]) == set(oracle.edges["src"]) + assert set(result._edges["dst"]) == set(oracle.edges["dst"]) + + +def test_forward_minmax_prune_matches_oracle(): + graph = _make_graph() + chain = [ + n({"type": "account"}, name="a"), + e_forward(name="r"), + n({"type": "user"}, name="c"), + ] + where = [compare(col("a", "score"), "<", col("c", "score"))] + inputs = build_same_path_inputs(graph, chain, where, Engine.PANDAS) + executor = CuDFSamePathExecutor(inputs) + executor._forward() + oracle = enumerate_chain( + graph, + chain, + where=where, + include_paths=False, + caps=OracleCaps(max_nodes=20, max_edges=20), + ) + assert oracle.tags is not None + assert set(executor.alias_frames["a"]["id"]) == oracle.tags["a"] + assert set(executor.alias_frames["c"]["id"]) == oracle.tags["c"] + + +def test_strict_mode_without_cudf_raises(monkeypatch): + graph = _make_graph() + chain = [ + n({"type": "account"}, name="a"), + e_forward(name="r"), + n({"type": "user"}, name="c"), + ] + where = [compare(col("a", "owner_id"), "==", col("c", "id"))] + monkeypatch.setenv(_CUDF_MODE_ENV, "strict") + inputs = build_same_path_inputs(graph, chain, where, Engine.CUDF) + executor = CuDFSamePathExecutor(inputs) + + cudf_available = True + try: + import cudf # type: ignore # noqa: F401 + except Exception: + cudf_available = False + + if cudf_available: + # If cudf exists, strict mode should proceed to GPU path (currently routes to oracle) + executor.run() + else: + 
+
+
+def test_auto_mode_without_cudf_falls_back(monkeypatch):
+    graph = _make_graph()
+    chain = [
+        n({"type": "account"}, name="a"),
+        e_forward(name="r"),
+        n({"type": "user"}, name="c"),
+    ]
+    where = [compare(col("a", "owner_id"), "==", col("c", "id"))]
+    monkeypatch.setenv(_CUDF_MODE_ENV, "auto")
+    inputs = build_same_path_inputs(graph, chain, where, Engine.CUDF)
+    executor = CuDFSamePathExecutor(inputs)
+    result = executor.run()
+    oracle = enumerate_chain(
+        graph,
+        chain,
+        where=where,
+        include_paths=False,
+        caps=OracleCaps(max_nodes=20, max_edges=20),
+    )
+
+    assert set(result._nodes["id"]) == set(oracle.nodes["id"])
+
+
+def test_gpu_path_parity_equality():
+    graph = _make_graph()
+    chain = [
+        n({"type": "account"}, name="a"),
+        e_forward(name="r"),
+        n({"type": "user"}, name="c"),
+    ]
+    where = [compare(col("a", "owner_id"), "==", col("c", "id"))]
+    inputs = build_same_path_inputs(graph, chain, where, Engine.PANDAS)
+    executor = CuDFSamePathExecutor(inputs)
+    executor._forward()
+    result = executor._run_gpu()
+
+    oracle = enumerate_chain(
+        graph,
+        chain,
+        where=where,
+        include_paths=False,
+        caps=OracleCaps(max_nodes=20, max_edges=20),
+    )
+    assert result._nodes is not None and result._edges is not None
+    assert set(result._nodes["id"]) == set(oracle.nodes["id"])
+    assert set(result._edges["src"]) == set(oracle.edges["src"])
+    assert set(result._edges["dst"]) == set(oracle.edges["dst"])
+
+
+def test_gpu_path_parity_inequality():
+    graph = _make_graph()
+    chain = [
+        n({"type": "account"}, name="a"),
+        e_forward(name="r"),
+        n({"type": "user"}, name="c"),
+    ]
+    where = [compare(col("a", "score"), ">", col("c", "score"))]
+    inputs = build_same_path_inputs(graph, chain, where, Engine.PANDAS)
+    executor = CuDFSamePathExecutor(inputs)
+    executor._forward()
+    result = executor._run_gpu()
+
+    oracle = enumerate_chain(
+        graph,
+        chain,
+        where=where,
+        include_paths=False,
+        caps=OracleCaps(max_nodes=20, max_edges=20),
+    )
+    assert result._nodes is not None and result._edges is not None
+    assert set(result._nodes["id"]) == set(oracle.nodes["id"])
+    assert set(result._edges["src"]) == set(oracle.edges["src"])
+    assert set(result._edges["dst"]) == set(oracle.edges["dst"])
+
+
+def _assert_parity(graph, chain, where):
+    inputs = build_same_path_inputs(graph, chain, where, Engine.PANDAS)
+    executor = CuDFSamePathExecutor(inputs)
+    executor._forward()
+    result = executor._run_gpu()
+    oracle = enumerate_chain(
+        graph,
+        chain,
+        where=where,
+        include_paths=False,
+        caps=OracleCaps(max_nodes=50, max_edges=50),
+    )
+    assert result._nodes is not None and result._edges is not None
+    assert set(result._nodes["id"]) == set(oracle.nodes["id"])
+    assert set(result._edges["src"]) == set(oracle.edges["src"])
+    assert set(result._edges["dst"]) == set(oracle.edges["dst"])
+
+
+def test_topology_parity_scenarios():
+    scenarios = []
+
+    nodes_cycle = pd.DataFrame(
+        [
+            {"id": "a1", "type": "account", "value": 1},
+            {"id": "a2", "type": "account", "value": 3},
+            {"id": "b1", "type": "user", "value": 5},
+            {"id": "b2", "type": "user", "value": 2},
+        ]
+    )
+    edges_cycle = pd.DataFrame(
+        [
+            {"src": "a1", "dst": "b1"},
+            {"src": "a1", "dst": "b2"},  # branch
+            {"src": "b1", "dst": "a2"},  # cycle back
+        ]
+    )
+    chain_cycle = [
+        n({"type": "account"}, name="a"),
+        e_forward(name="r1"),
+        n({"type": "user"}, name="b"),
+        e_forward(name="r2"),
+        n({"type": "account"}, name="c"),
+    ]
+    where_cycle = [compare(col("a", "value"), "<", col("c", "value"))]
+    scenarios.append((nodes_cycle, edges_cycle, chain_cycle, where_cycle, None))
+
+    nodes_mixed = pd.DataFrame(
+        [
+            {"id": "a1", "type": "account", "owner_id": "u1", "score": 2},
+            {"id": "a2", "type": "account", "owner_id": "u2", "score": 7},
+            {"id": "u1", "type": "user", "score": 9},
+            {"id": "u2", "type": "user", "score": 1},
+            {"id": "u3", "type": "user", "score": 5},
+        ]
+    )
+    edges_mixed = pd.DataFrame(
+        [
+            {"src": "a1", "dst": "u1"},
+            {"src": "a2", "dst": "u2"},
+            {"src": "a2", "dst": "u3"},
+        ]
+    )
+    chain_mixed = [
+        n({"type": "account"}, name="a"),
+        e_forward(name="r1"),
+        n({"type": "user"}, name="b"),
+        e_forward(name="r2"),
+        n({"type": "account"}, name="c"),
+    ]
+    where_mixed = [
+        compare(col("a", "owner_id"), "==", col("b", "id")),
+        compare(col("b", "score"), ">", col("c", "score")),
+    ]
+    scenarios.append((nodes_mixed, edges_mixed, chain_mixed, where_mixed, None))
+
+    nodes_edge_filter = pd.DataFrame(
+        [
+            {"id": "acct1", "type": "account", "owner_id": "user1"},
+            {"id": "acct2", "type": "account", "owner_id": "user2"},
+            {"id": "user1", "type": "user"},
+            {"id": "user2", "type": "user"},
+            {"id": "user3", "type": "user"},
+        ]
+    )
+    edges_edge_filter = pd.DataFrame(
+        [
+            {"src": "acct1", "dst": "user1", "etype": "owns"},
+            {"src": "acct2", "dst": "user2", "etype": "owns"},
+            {"src": "acct1", "dst": "user3", "etype": "follows"},
+        ]
+    )
+    chain_edge_filter = [
+        n({"type": "account"}, name="a"),
+        e_forward({"etype": "owns"}, name="r"),
+        n({"type": "user"}, name="c"),
+    ]
+    where_edge_filter = [compare(col("a", "owner_id"), "==", col("c", "id"))]
+    scenarios.append(
+        (
+            nodes_edge_filter,
+            edges_edge_filter,
+            chain_edge_filter,
+            where_edge_filter,
+            {"dst": {"user1", "user2"}},
+        )
+    )
+
+    for nodes_df, edges_df, chain, where, edge_expect in scenarios:
+        graph = CGFull().nodes(nodes_df, "id").edges(edges_df, "src", "dst")
+        _assert_parity(graph, chain, where)
+        if edge_expect:
+            assert graph._edge is None or "etype" in edges_df.columns  # guard unused expectation
+            result = execute_same_path_chain(graph, chain, where, Engine.PANDAS)
+            assert result._edges is not None
+            if "dst" in edge_expect:
+                assert set(result._edges["dst"]) == edge_expect["dst"]
+
+
+def test_cudf_gpu_path_if_available():
+    cudf = pytest.importorskip("cudf")
+    nodes = cudf.DataFrame(
+        [
+            {"id": "acct1", "type": "account", "owner_id": "user1", "score": 5},
+            {"id": "acct2", "type": "account", "owner_id": "user2", "score": 9},
+            {"id": "user1", "type": "user", "score": 7},
+            {"id": "user2", "type": "user", "score": 3},
+        ]
+    )
+    edges = cudf.DataFrame(
+        [
+            {"src": "acct1", "dst": "user1"},
+            {"src": "acct2", "dst": "user2"},
+        ]
+    )
+    graph = CGFull().nodes(nodes, "id").edges(edges, "src", "dst")
+    chain = [
+        n({"type": "account"}, name="a"),
+        e_forward(name="r"),
+        n({"type": "user"}, name="c"),
+    ]
+    where = [compare(col("a", "owner_id"), "==", col("c", "id"))]
+    inputs = build_same_path_inputs(graph, chain, where, Engine.CUDF)
+    executor = CuDFSamePathExecutor(inputs)
+    result = executor.run()
+
+    assert result._nodes is not None and result._edges is not None
+    assert set(result._nodes["id"].to_pandas()) == {"acct1", "acct2"}
+    assert set(result._edges["src"].to_pandas()) == {"acct1", "acct2"}
"filter_dict": {"type": "user"}}, + ], + "where": [{"eq": {"left": "a.owner_id", "right": "c.id"}}], + } + result = gfql(graph, query, engine=Engine.CUDF) + oracle = enumerate_chain( + graph, [n({"type": "account"}, name="a"), e_forward(name="r"), n({"type": "user"}, name="c")], + where=[compare(col("a", "owner_id"), "==", col("c", "id"))], + include_paths=False, + caps=OracleCaps(max_nodes=20, max_edges=20), + ) + assert result._nodes is not None and result._edges is not None + assert set(result._nodes["id"]) == set(oracle.nodes["id"]) + assert set(result._edges["src"]) == set(oracle.edges["src"]) + assert set(result._edges["dst"]) == set(oracle.edges["dst"]) + + +def test_dispatch_chain_list_and_single_ast(): + graph = _make_graph() + chain_ops = [ + n({"type": "account"}, name="a"), + e_forward(name="r"), + n({"type": "user"}, name="c"), + ] + where = [compare(col("a", "owner_id"), "==", col("c", "id"))] + + for query in [Chain(chain_ops, where=where), chain_ops]: + result = gfql(graph, query, engine=Engine.PANDAS) + oracle = enumerate_chain( + graph, + chain_ops if isinstance(query, list) else list(chain_ops), + where=where, + include_paths=False, + caps=OracleCaps(max_nodes=20, max_edges=20), + ) + assert result._nodes is not None and result._edges is not None + assert set(result._nodes["id"]) == set(oracle.nodes["id"]) + assert set(result._edges["src"]) == set(oracle.edges["src"]) + assert set(result._edges["dst"]) == set(oracle.edges["dst"]) diff --git a/tests/gfql/ref/test_enumerator_parity.py b/tests/gfql/ref/test_enumerator_parity.py index e5bee253a5..c28ea97408 100644 --- a/tests/gfql/ref/test_enumerator_parity.py +++ b/tests/gfql/ref/test_enumerator_parity.py @@ -44,9 +44,13 @@ def _run_parity_case(nodes, edges, ops): if not alias: continue if isinstance(op, ASTNode): - assert oracle.tags.get(alias, set()) == _alias_bindings(gfql_nodes, g._node, alias) + assert oracle.tags.get(alias, set()) == _alias_bindings( + gfql_nodes, g._node, alias + ) elif isinstance(op, ASTEdge): - assert oracle.tags.get(alias, set()) == _alias_bindings(gfql_edges, g._edge, alias) + assert oracle.tags.get(alias, set()) == _alias_bindings( + gfql_edges, g._edge, alias + ) CASES = [ @@ -62,7 +66,8 @@ def _run_parity_case(nodes, edges, ops): {"edge_id": "e2", "src": "acct2", "dst": "acct3", "type": "txn"}, {"edge_id": "e3", "src": "acct3", "dst": "acct1", "type": "txn"}, ], - [n({"type": "account"}, name="start"), e_forward({"type": "txn"}, name="hop"), n({"type": "account"}, name="end")], + [n({"type": "account"}, name="start"), e_forward({"type": "txn"}, name="hop"), +n({"type": "account"}, name="end")], ), ( "reverse", @@ -75,7 +80,8 @@ def _run_parity_case(nodes, edges, ops): {"edge_id": "owns1", "src": "acct1", "dst": "user1", "type": "owns"}, {"edge_id": "owns2", "src": "acct2", "dst": "user1", "type": "owns"}, ], - [n({"type": "user"}, name="u"), e_reverse({"type": "owns"}, name="owns_rev"), n({"type": "account"}, name="acct")], + [n({"type": "user"}, name="u"), e_reverse({"type": "owns"}, name="owns_rev"), +n({"type": "account"}, name="acct")], ), ( "two_hop", @@ -109,7 +115,11 @@ def _run_parity_case(nodes, edges, ops): {"edge_id": "e12", "src": "n1", "dst": "n2", "type": "path"}, {"edge_id": "e23", "src": "n2", "dst": "n3", "type": "path"}, ], - [n({"type": "node"}, name="start"), e_undirected({"type": "path"}, name="hop"), n({"type": "node"}, name="end")], + [ + n({"type": "node"}, name="start"), + e_undirected({"type": "path"}, name="hop"), + n({"type": "node"}, name="end"), + ], ), ( "empty", 
@@ -118,7 +128,8 @@ def _run_parity_case(nodes, edges, ops):
             {"id": "acct2", "type": "account"},
         ],
         [{"edge_id": "e1", "src": "acct1", "dst": "acct2", "type": "txn"}],
-        [n({"type": "user"}, name="start"), e_forward({"type": "txn"}, name="hop"), n({"type": "user"}, name="end")],
+        [n({"type": "user"}, name="start"), e_forward({"type": "txn"}, name="hop"),
+         n({"type": "user"}, name="end")],
     ),
     (
         "cycle",
@@ -151,7 +162,9 @@ def _run_parity_case(nodes, edges, ops):
             {"edge_id": "e2", "src": "acct1", "dst": "acct3", "type": "txn"},
             {"edge_id": "e3", "src": "acct3", "dst": "acct4", "type": "txn"},
         ],
-        [n({"type": "account"}, name="root"), e_forward({"type": "txn"}, name="first_hop"), n({"type": "account"}, name="child")],
+        [n({"type": "account"}, name="root"),
+         e_forward({"type": "txn"}, name="first_hop"),
+         n({"type": "account"}, name="child")],
     ),
 ]
 
diff --git a/tests/gfql/ref/test_ref_enumerator.py b/tests/gfql/ref/test_ref_enumerator.py
index 81af62ef78..7b0adba23e 100644
--- a/tests/gfql/ref/test_ref_enumerator.py
+++ b/tests/gfql/ref/test_ref_enumerator.py
@@ -5,7 +5,8 @@
 from types import SimpleNamespace
 
 from graphistry.compute import n, e_forward, e_undirected
-from graphistry.gfql.ref.enumerator import OracleCaps, col, compare, enumerate_chain
+from graphistry.gfql.ref.enumerator import OracleCaps, enumerate_chain
+from graphistry.gfql.same_path_types import col, compare
 
 
 def _plottable(nodes, edges):
@@ -35,7 +36,8 @@ def _col_set(df: pd.DataFrame, column: str) -> Set[str]:
             {"edge_id": "e1", "src": "acct1", "dst": "acct2", "type": "txn"},
             {"edge_id": "e2", "src": "acct2", "dst": "user1", "type": "owns"},
         ],
-        "ops": [n({"type": "account"}, name="a"), e_forward({"type": "txn"}), n(name="b")],
+        "ops": [n({"type": "account"}, name="a"), e_forward({"type": "txn"}),
+                n(name="b")],
         "expect": {"nodes": {"acct1", "acct2"}, "edges": {"e1"}},
     },
     {
@@ -48,8 +50,10 @@ def _col_set(df: pd.DataFrame, column: str) -> Set[str]:
         ],
         "edges": [
             {"edge_id": "e_good", "src": "acct_good", "dst": "user1", "type": "owns"},
-            {"edge_id": "e_bad_match", "src": "acct_bad", "dst": "user2", "type": "owns"},
-            {"edge_id": "e_bad_wrong", "src": "acct_bad", "dst": "user1", "type": "owns"},
+            {"edge_id": "e_bad_match", "src": "acct_bad", "dst": "user2",
+             "type": "owns"},
+            {"edge_id": "e_bad_wrong", "src": "acct_bad", "dst": "user1",
+             "type": "owns"},
         ],
         "ops": [
             n({"type": "account"}, name="a"),
@@ -61,7 +65,8 @@ def _col_set(df: pd.DataFrame, column: str) -> Set[str]:
         "expect": {
             "nodes": {"acct_good", "acct_bad", "user1", "user2"},
             "edges": {"e_good", "e_bad_match"},
-            "tags": {"a": {"acct_good", "acct_bad"}, "r": {"e_good", "e_bad_match"}, "c": {"user1", "user2"}},
+            "tags": {"a": {"acct_good", "acct_bad"}, "r": {"e_good", "e_bad_match"},
+                     "c": {"user1", "user2"}},
             "paths": [
                 {"a": "acct_good", "c": "user1", "r": "e_good"},
                 {"a": "acct_bad", "c": "user2", "r": "e_bad_match"},
@@ -152,8 +157,10 @@ def __init__(self, df):
         def to_pandas(self):
             return self._df.copy()
 
-    g = _plottable(Dummy(pd.DataFrame([{"id": "n1"}])), Dummy(pd.DataFrame([{"edge_id": "e1", "src": "n1", "dst": "n1"}])))
-    result = enumerate_chain(g, [n(name="a")], caps=OracleCaps(max_nodes=20, max_edges=20))
+    g = _plottable(Dummy(pd.DataFrame([{"id": "n1"}])),
+                   Dummy(pd.DataFrame([{"edge_id": "e1", "src": "n1", "dst": "n1"}])))
+    result = enumerate_chain(g, [n(name="a")],
+                             caps=OracleCaps(max_nodes=20, max_edges=20))
 
     assert _col_set(result.nodes, "id") == {"n1"}
 
@@ -209,9 +216,11 @@ def test_paths_are_deterministically_sorted():
 
 @st.composite
 def small_graph_cases(draw):
-    nodes = draw(st.lists(st.sampled_from(NODE_POOL), min_size=2, max_size=4, unique=True))
+    nodes = draw(st.lists(st.sampled_from(NODE_POOL), min_size=2, max_size=4,
+                          unique=True))
     node_rows = [{"id": node, "value": draw(st.integers(0, 3))} for node in nodes]
-    edges = draw(st.lists(st.tuples(st.sampled_from(nodes), st.sampled_from(nodes)), min_size=1, max_size=5))
+    edges = draw(st.lists(st.tuples(st.sampled_from(nodes), st.sampled_from(nodes)),
+                          min_size=1, max_size=5))
     edge_rows = [
         {"edge_id": EDGE_POOL[i % len(EDGE_POOL)], "src": src, "dst": dst}
         for i, (src, dst) in enumerate(edges)
@@ -241,7 +250,8 @@ def test_enumerator_paths_cover_outputs(case):
         [n(name="a"), e_forward(name="rel"), n(name="c")],
         where=case["where"],
         include_paths=True,
-        caps=OracleCaps(max_nodes=10, max_edges=10, max_length=4, max_partial_rows=10_000),
+        caps=OracleCaps(max_nodes=10, max_edges=10, max_length=4,
+                        max_partial_rows=10_000),
     )
 
     path_nodes = {
diff --git a/tests/gfql/ref/test_same_path_plan.py b/tests/gfql/ref/test_same_path_plan.py
new file mode 100644
index 0000000000..120ce656da
--- /dev/null
+++ b/tests/gfql/ref/test_same_path_plan.py
@@ -0,0 +1,18 @@
+from graphistry.gfql.same_path_plan import plan_same_path
+from graphistry.gfql.same_path_types import col, compare
+
+
+def test_plan_minmax_and_bitset():
+    where = [
+        compare(col("a", "balance"), ">", col("c", "credit")),
+        compare(col("a", "owner"), "==", col("c", "owner")),
+    ]
+    plan = plan_same_path(where)
+    assert plan.minmax_aliases == {"a": {"balance"}, "c": {"credit"}}
+    assert any("owner" in key for key in plan.bitsets)
+
+
+def test_plan_empty_when_no_where():
+    plan = plan_same_path(None)
+    assert plan.minmax_aliases == {}
+    assert plan.bitsets == {}
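
Reviewer note: a minimal sketch of the planner contract that the new test_same_path_plan.py pins down, for readers skimming the diff. It assumes only the names exercised above (plan_same_path, col, compare, and the plan's minmax_aliases / bitsets fields); the bitset key naming scheme is an internal detail, which is why the test substring-matches keys rather than comparing them exactly.

    from graphistry.gfql.same_path_plan import plan_same_path
    from graphistry.gfql.same_path_types import col, compare

    # One inequality and one equality comparison across path aliases "a" and "c".
    where = [
        compare(col("a", "balance"), ">", col("c", "credit")),
        compare(col("a", "owner"), "==", col("c", "owner")),
    ]
    plan = plan_same_path(where)

    # Inequalities record the referenced column per alias so the executor can
    # prune candidate rows using per-alias min/max summaries.
    assert plan.minmax_aliases == {"a": {"balance"}, "c": {"credit"}}

    # Equalities surface as bitset entries for value-aware equality filtering;
    # only the presence of the column name in some key is guaranteed.
    assert any("owner" in key for key in plan.bitsets)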