Source code for graphizy.algorithms

"""
Graph algorithms for graphizy

.. moduleauthor:: Charles Fosseprez
.. contact:: charles.fosseprez.pro@gmail.com
.. license:: GPL2 or later
.. copyright:: Copyright (C) 2025 Charles Fosseprez
"""

import logging
import time
import random
import timeit
from typing import List, Tuple, Dict, Any, Union, Optional
from scipy.spatial.distance import cdist, pdist, squareform
import numpy as np
from collections import defaultdict, deque

from graphizy.exceptions import (
    InvalidPointArrayError, SubdivisionError, TriangulationError,
    GraphCreationError, PositionGenerationError, DependencyError,
    IgraphMethodError
)
from graphizy.data_interface import DataInterface
from .exceptions import handle_subdivision_bounds_error, InvalidDataShapeError

try:
    import cv2
except ImportError:
    raise DependencyError("OpenCV is required but not installed. Install with: pip install opencv-python")

try:
    import igraph as ig
except ImportError:
    raise DependencyError("python-igraph is required but not installed. Install with: pip install python-igraph")

try:
    from scipy.spatial.distance import pdist, squareform
except ImportError:
    raise DependencyError("scipy is required but not installed. Install with: pip install scipy")


[docs] def normalize_distance_metric(metric: str) -> str: """ Normalize distance metric names to scipy-compatible format. Args: metric: Distance metric name Returns: Scipy-compatible metric name """ metric_mapping = { 'manhattan': 'cityblock', 'l1': 'cityblock', 'euclidean': 'euclidean', 'l2': 'euclidean', 'chebyshev': 'chebyshev', 'linf': 'chebyshev' } return metric_mapping.get(metric.lower(), metric.lower())
[docs] def normalize_id(obj_id: Any) -> str: """ Normalize object ID to consistent string format for real-time applications. Optimized for performance: - Handles int, float, str inputs - Converts float IDs like 1.0, 2.0 to "1", "2" - Preserves non-integer floats as-is Args: obj_id: Object ID of any type Returns: Normalized string ID """ if isinstance(obj_id, str): return obj_id elif isinstance(obj_id, (int, np.integer)): return str(int(obj_id)) elif isinstance(obj_id, (float, np.floating)): # Check if it's an integer float (e.g., 1.0, 2.0) if obj_id.is_integer(): return str(int(obj_id)) else: return str(obj_id) else: return str(obj_id)
[docs] def make_subdiv(point_array: np.ndarray, dimensions: Union[List, Tuple], do_print: bool = False) -> Any: """Make the opencv subdivision with enhanced error handling Args: point_array: A numpy array of the points to add dimensions: The dimension of the image (width, height) do_print: Whether to print debug information Returns: An opencv subdivision object Raises: SubdivisionError: If subdivision creation fails """ logger = logging.getLogger('graphizy.algorithms.make_subdiv') try: # Input validation with enhanced error reporting if point_array is None or point_array.size == 0: raise SubdivisionError("Point array cannot be None or empty", point_array, dimensions) if len(dimensions) != 2: raise SubdivisionError("Dimensions must be a tuple/list of 2 values", point_array, dimensions) if dimensions[0] <= 0 or dimensions[1] <= 0: raise SubdivisionError("Dimensions must be positive", point_array, dimensions) width, height = dimensions logger.debug(f"make_subdiv: {len(point_array)} points, dimensions {dimensions}") logger.debug( f"Point ranges: X[{point_array[:, 0].min():.1f}, {point_array[:, 0].max():.1f}], Y[{point_array[:, 1].min():.1f}, {point_array[:, 1].max():.1f}]") # Check type and convert if needed if not isinstance(point_array.flat[0], (np.floating, float)): logger.warning(f"Converting points from {type(point_array.flat[0])} to float32") if isinstance(point_array, np.ndarray): point_array = point_array.astype("float32") else: particle_stack = [[float(x), float(y)] for x, y in point_array] point_array = np.array(particle_stack) # Enhanced bounds checking with detailed error reporting # Validate X coordinates if np.any(point_array[:, 0] < 0): bad_points = point_array[point_array[:, 0] < 0] raise SubdivisionError(f"Found {len(bad_points)} points with X < 0", point_array, dimensions) if np.any(point_array[:, 0] >= width): handle_subdivision_bounds_error(point_array, dimensions, 'x') # Validate Y coordinates if np.any(point_array[:, 1] < 0): bad_points = point_array[point_array[:, 1] < 0] raise SubdivisionError(f"Found {len(bad_points)} points with Y < 0", point_array, dimensions) if np.any(point_array[:, 1] >= height): handle_subdivision_bounds_error(point_array, dimensions, 'y') # Timer timer = time.time() # Create rectangle (normal coordinate system: width, height) rect = (0, 0, width, height) logger.debug(f"Creating OpenCV rectangle: {rect}") if do_print: unique_points = len(np.unique(point_array, axis=0)) print(f"Processing {len(point_array)} positions ({unique_points} unique)") print(f"Rectangle: {rect}") outside_count = (point_array[:, 0] >= width).sum() + (point_array[:, 1] >= height).sum() print(f"Points outside bounds: {outside_count}") # Create subdivision subdiv = cv2.Subdiv2D(rect) # Insert points into subdiv with error tracking logger.debug(f"Inserting {len(point_array)} points into subdivision") points_list = [tuple(point) for point in point_array] failed_insertions = 0 for i, point in enumerate(points_list): try: subdiv.insert(point) except cv2.error as e: failed_insertions += 1 logger.warning(f"Failed to insert point {i} {point}: {e}") continue if failed_insertions > 0: logger.warning(f"Failed to insert {failed_insertions}/{len(points_list)} points") if failed_insertions == len(points_list): raise SubdivisionError("Failed to insert all points", point_array, dimensions) elapsed_time = round((time.time() - timer) * 1000, 3) logger.debug(f"Subdivision creation took {elapsed_time}ms") return subdiv except SubdivisionError: # Re-raise SubdivisionError as-is (they already have context) raise except Exception as e: # Convert other exceptions to SubdivisionError with context error = SubdivisionError(f"Failed to create subdivision: {str(e)}", point_array, dimensions, original_exception=e) error.log_error() raise error
[docs] def make_delaunay(subdiv: Any) -> np.ndarray: """Return a Delaunay triangulation Args: subdiv: An opencv subdivision Returns: A triangle list Raises: TriangulationError: If triangulation fails """ try: if subdiv is None: raise TriangulationError("Subdivision cannot be None") triangle_list = subdiv.getTriangleList() if len(triangle_list) == 0: logging.warning("No triangles found in subdivision") return triangle_list except Exception as e: raise TriangulationError(f"Failed to create Delaunay triangulation: {str(e)}")
[docs] def get_delaunay(point_array: np.ndarray, dim: Union[List, Tuple]) -> np.ndarray: """Make the delaunay triangulation of set of points Args: point_array: Array of points dim: Dimensions Returns: Triangle list Raises: TriangulationError: If triangulation fails """ try: subdiv = make_subdiv(point_array, dim) return make_delaunay(subdiv) except Exception as e: raise TriangulationError(f"Failed to get Delaunay triangulation: {str(e)}")
[docs] def find_vertex(trilist: List, subdiv: Any, g: Any) -> Any: """Find vertices in triangulation and add edges to graph Args: trilist: List of triangles subdiv: OpenCV subdivision g: igraph Graph object Returns: Modified graph Raises: GraphCreationError: If vertex finding fails """ try: if trilist is None or len(trilist) == 0: raise GraphCreationError("Triangle list cannot be empty") if subdiv is None: raise GraphCreationError("Subdivision cannot be None") if g is None: raise GraphCreationError("Graph cannot be None") for tri in trilist: if len(tri) != 6: logging.warning(f"Invalid triangle format: expected 6 values, got {len(tri)}") continue try: vertex1, _ = subdiv.findNearest((tri[0], tri[1])) vertex2, _ = subdiv.findNearest((tri[2], tri[3])) vertex3, _ = subdiv.findNearest((tri[4], tri[5])) # -4 because https://stackoverflow.com/a/52377891/18493005 edges = [ (vertex1 - 4, vertex2 - 4), (vertex2 - 4, vertex3 - 4), (vertex3 - 4, vertex1 - 4), ] # Validate vertex indices max_vertex = g.vcount() valid_edges = [] for edge in edges: if 0 <= edge[0] < max_vertex and 0 <= edge[1] < max_vertex: valid_edges.append(edge) else: logging.warning(f"Invalid edge {edge}, graph has {max_vertex} vertices") if valid_edges: g.add_edges(valid_edges) except Exception as e: logging.warning(f"Failed to process triangle {tri}: {e}") continue return g except Exception as e: raise GraphCreationError(f"Failed to find vertices: {str(e)}")
def _are_points_collinear(points, tolerance=1e-10): """ Check if points are approximately collinear Args: points: numpy array of shape (n, 2) with x, y coordinates tolerance: tolerance for collinearity check Returns: bool: True if points are collinear """ if len(points) < 3: return True # Use cross product to check collinearity # For points A, B, C: they're collinear if (B-A) × (C-A) ≈ 0 A, B, C = points[0], points[1], points[2] # Cross product in 2D: (B-A) × (C-A) = (B_x-A_x)(C_y-A_y) - (B_y-A_y)(C_x-A_x) cross_product = ((B[0] - A[0]) * (C[1] - A[1]) - (B[1] - A[1]) * (C[0] - A[0])) return abs(cross_product) < tolerance
[docs] def graph_delaunay(graph: Any, subdiv: Any, trilist: List) -> Any: """From CV to original ID and igraph Args: graph: igraph object subdiv: OpenCV subdivision trilist: List of triangles Returns: Modified graph Raises: GraphCreationError: If graph creation fails """ try: if graph is None: raise GraphCreationError("Graph cannot be None") if subdiv is None: raise GraphCreationError("Subdivision cannot be None") if trilist is None or len(trilist) == 0: num_vertices = len(graph.vs) if num_vertices < 3: raise GraphCreationError( f"Delaunay triangulation requires at least 3 points, got {num_vertices}. " f"Provide more points for meaningful triangulation." ) elif num_vertices == 3: # Special case: exactly 3 points should form 1 triangle # Check if points are collinear positions = np.array([(v["x"], v["y"]) for v in graph.vs]) if _are_points_collinear(positions): raise GraphCreationError( "Cannot create Delaunay triangulation: the 3 points are collinear. " "Provide points that form a proper triangle." ) else: # Create the single triangle manually logging.warning("Creating manual triangle for 3-point case") graph.add_edge(0, 1) graph.add_edge(1, 2) graph.add_edge(2, 0) return graph elif num_vertices <= 10: # Small dataset: provide more helpful error message raise GraphCreationError( f"No valid triangles found for {num_vertices} points. " f"This can happen with collinear points or points outside the valid range. " f"Try using more well-distributed points (recommended: ≥10 points)." ) else: # Larger dataset with no triangles: likely a serious issue raise GraphCreationError( f"No triangles found in Delaunay triangulation for {num_vertices} points. " f"Check that points are within valid coordinate ranges and not all collinear." ) edges_set = set() # Iterate over the triangle list for tri in trilist: if len(tri) != 6: logging.warning(f"Invalid triangle format: expected 6 values, got {len(tri)}") continue try: vertex1 = subdiv.locate((tri[0], tri[1]))[2] - 4 vertex2 = subdiv.locate((tri[2], tri[3]))[2] - 4 vertex3 = subdiv.locate((tri[4], tri[5]))[2] - 4 # Validate vertex indices max_vertex = graph.vcount() if not (0 <= vertex1 < max_vertex and 0 <= vertex2 < max_vertex and 0 <= vertex3 < max_vertex): logging.warning( f"Invalid vertices: {vertex1}, {vertex2}, {vertex3} for graph with {max_vertex} vertices") continue edges_set.add((vertex1, vertex2)) edges_set.add((vertex2, vertex3)) edges_set.add((vertex3, vertex1)) except Exception as e: logging.warning(f"Failed to process triangle {tri}: {e}") continue # Convert to list and remove duplicates edges_set = list({*map(tuple, map(sorted, edges_set))}) if edges_set: graph.add_edges(edges_set) # Remove redundant edges graph.simplify() return graph except Exception as e: raise GraphCreationError(f"Failed to create Delaunay graph: {str(e)}")
[docs] def get_distance(position_array: np.ndarray, proximity_thresh: float, metric: str = "euclidean") -> List[List[int]]: """Filter points by proximity, return the points within specified distance of each point Args: position_array: Array of position of shape (n, 2) proximity_thresh: Only keep points within this distance metric: Type of distance calculated Returns: List of lists containing indices of nearby points Raises: GraphCreationError: If distance calculation fails """ try: if position_array is None or position_array.size == 0: raise GraphCreationError("Position array cannot be None or empty") if position_array.ndim != 2 or position_array.shape[1] != 2: raise GraphCreationError("Position array must be 2D with shape (n, 2)") if proximity_thresh <= 0: raise GraphCreationError("Proximity threshold must be positive") # Normalize the metric name to scipy-compatible format normalized_metric = normalize_distance_metric(metric) square_dist = squareform(pdist(position_array, metric=normalized_metric)) proxi_list = [] for i, row in enumerate(square_dist): nearby_indices = np.where((row < proximity_thresh) & (row > 0))[0].tolist() proxi_list.append(nearby_indices) return proxi_list except Exception as e: raise GraphCreationError(f"Failed to calculate distances: {str(e)}")
[docs] def graph_distance(graph: Any, position2d: np.ndarray, proximity_thresh: float, metric: str = "euclidean") -> Any: """Construct a distance graph Args: graph: igraph Graph object position2d: 2D position array proximity_thresh: Distance threshold metric: Distance metric Returns: Modified graph Raises: GraphCreationError: If distance graph creation fails """ try: if graph is None: raise GraphCreationError("Graph cannot be None") # Get the list of points within distance of each other proxi_list = get_distance(position2d, proximity_thresh, metric) # Make the edges edges_set = set() for i, point_list in enumerate(proxi_list): if i >= graph.vcount(): logging.warning(f"Point index {i} exceeds graph vertex count {graph.vcount()}") continue valid_points = [x for x in point_list if x < graph.vcount()] if len(valid_points) != len(point_list): logging.warning(f"Some points in proximity list exceed graph vertex count") tlist = [(i, x) for x in valid_points] edges_set.update(tlist) edges_set = list({*map(tuple, map(sorted, edges_set))}) # Add the edges if edges_set: graph.add_edges(edges_set) # Simplify the graph graph.simplify() return graph except Exception as e: raise GraphCreationError(f"Failed to create distance graph: {str(e)}")
[docs] def create_graph_array(point_array: np.ndarray, data_shape: Optional[List[Tuple[str, Any]]] = None) -> Any: """Create a graph from a point array, dynamically adding attributes. Args: point_array: Array of points with columns corresponding to the data_shape. data_shape: List of tuples defining the data structure, e.g., [('id', int), ('x', float), ('velocity', float)]. If None, defaults to the legacy [id, x, y] behavior. Returns: igraph Graph object. Raises: GraphCreationError: If graph creation fails. """ try: if point_array is None or point_array.size == 0: raise GraphCreationError("Point array cannot be None or empty") if point_array.ndim != 2: raise GraphCreationError(f"Point array must be 2D, got {point_array.ndim}D") timer = time.time() n_vertices = len(point_array) graph = ig.Graph(n=n_vertices) # If no data_shape is provided, fall back to the old hardcoded behavior for compatibility. if data_shape is None: logging.warning("No data_shape provided to create_graph_array, defaulting to [id, x, y].") if point_array.shape[1] < 3: raise GraphCreationError("Point array must have at least 3 columns [id, x, y] for default mode.") graph.vs["id"] = point_array[:, 0] graph.vs["x"] = point_array[:, 1] graph.vs["y"] = point_array[:, 2] # Use the ID as the name for a nice summary string. graph.vs["name"] = [normalize_id(val) for val in point_array[:, 0]] logging.debug(f"Graph creation (default) took {round((time.time() - timer) * 1000, 3)}ms") return graph # --- DYNAMIC ATTRIBUTE CREATION --- id_col_index = -1 for i, (attr_name, attr_type) in enumerate(data_shape): if i < point_array.shape[1]: # igraph's 'name' attribute must be a string. if attr_name == "name": graph.vs[attr_name] = [str(val) for val in point_array[:, i]] else: graph.vs[attr_name] = point_array[:, i] if attr_name == "id": id_col_index = i else: logging.debug( f"[!] data_shape specifies attribute '{attr_name}' at column {i}, " f"but data only has {point_array.shape[1]} columns. Skipping." ) # Ensure 'name' attribute exists for compatibility, even if not in data_shape. # This makes graph.summary() look good. if "name" not in [ds[0] for ds in data_shape]: if id_col_index != -1: # Use the 'id' column to create the 'name' attribute. id_values = point_array[:, id_col_index] graph.vs["name"] = [normalize_id(val) for val in id_values] else: # Fallback if no 'id' is present. graph.vs["name"] = [str(i) for i in range(n_vertices)] logging.debug(f"Graph creation (dynamic) took {round((time.time() - timer) * 1000, 3)}ms") return graph except Exception as e: raise GraphCreationError(f"Failed to create graph from array: {str(e)}") from e
[docs] def create_graph_dict(point_dict: Dict[str, Any]) -> Any: """Create a graph from a point dictionary. This function dynamically adds all keys from the dictionary as vertex attributes. Args: point_dict: Dictionary with keys 'id', 'x', 'y', and other optional attributes. All values should be lists or arrays of the same length. Returns: igraph Graph object Raises: GraphCreationError: If graph creation fails """ try: if not point_dict: raise GraphCreationError("Point dictionary cannot be empty") # Core keys are required for basic functionality required_keys = ['id', 'x', 'y'] missing_keys = [key for key in required_keys if key not in point_dict] if missing_keys: raise GraphCreationError(f"Missing required keys: {missing_keys}") # Check that all arrays have the same length lengths = {key: len(val) for key, val in point_dict.items()} if len(set(lengths.values())) > 1: raise GraphCreationError(f"All arrays in dictionary must have the same length. Got: {lengths}") timer = time.time() n_vertices = len(point_dict["id"]) graph = ig.Graph(n=n_vertices) # Dynamically add all provided attributes for attr_name, values in point_dict.items(): if attr_name == "name": graph.vs[attr_name] = [str(val) for val in values] else: graph.vs[attr_name] = values # Ensure 'name' attribute exists for compatibility if not provided if "name" not in point_dict: id_values = point_dict["id"] graph.vs["name"] = [normalize_id(val) for val in id_values] logging.debug(f"Graph creation from dict took {round((time.time() - timer) * 1000, 3)}ms") return graph except Exception as e: raise GraphCreationError(f"Failed to create graph from dictionary: {str(e)}") from e
[docs] def call_igraph_method(graph: Any, method_name: str, *args, **kwargs) -> Any: """Call any igraph method on the graph safely Args: graph: igraph Graph object method_name: Name of the method to call *args: Positional arguments for the method **kwargs: Keyword arguments for the method Returns: Result of the method call Raises: IgraphMethodError: If method call fails """ try: if graph is None: raise IgraphMethodError("Graph cannot be None") if not method_name: raise IgraphMethodError("Method name cannot be empty") if not hasattr(graph, method_name): raise IgraphMethodError(f"Graph does not have method '{method_name}'") method = getattr(graph, method_name) if not callable(method): raise IgraphMethodError(f"'{method_name}' is not a callable method") result = method(*args, **kwargs) logging.debug(f"Successfully called {method_name} on graph") return result except Exception as e: raise IgraphMethodError(f"Failed to call method '{method_name}': {str(e)}")
[docs] def create_delaunay_graph(data_points: Union[np.ndarray, Dict[str, Any]], aspect: str = "array", dimension: Tuple[int, int] = (1200, 1200), data_shape: Optional[List[Tuple[str, Any]]] = None) -> Any: """Create a Delaunay triangulation graph from point data Args: data_points: Point data as array or dictionary aspect: Data format ("array" or "dict") dimension: Image dimensions (width, height) data_shape: shape of the data to pass (if extra column of information) Returns: igraph Graph object with Delaunay triangulation Raises: GraphCreationError: If Delaunay graph creation fails """ try: timer0 = time.time() # Create and populate the graph with points if aspect == "array": if not isinstance(data_points, np.ndarray): raise GraphCreationError("Expected numpy array for 'array' aspect") # Simple type check - reject string/object IDs if data_points.dtype.kind in ['U', 'S', 'O']: raise GraphCreationError("Object IDs must be numeric, not string type") graph = create_graph_array(data_points, data_shape=data_shape) # Make triangulation with appropriate columns (assuming standard format [id, x, y]) pos_array = np.stack(( data_points[:, 1], # x position (column 1) data_points[:, 2] # y position (column 2) ), axis=1) subdiv = make_subdiv(pos_array, dimension) tri_list = subdiv.getTriangleList() elif aspect == "dict": if isinstance(data_points, dict): graph = create_graph_dict(data_points) pos_array = np.stack((data_points["x"], data_points["y"]), axis=1) elif isinstance(data_points, np.ndarray): # Convert array to dict format first data_interface = DataInterface() # Use default data shape data_points = data_interface.to_array(data_points) graph = create_graph_dict(data_points) pos_array = np.stack((data_points["x"], data_points["y"]), axis=1) else: raise GraphCreationError("Invalid data format for 'dict' aspect") subdiv = make_subdiv(pos_array, dimension) tri_list = subdiv.getTriangleList() else: raise GraphCreationError("Graph data interface could not be understood") logging.debug(f"Creation and Triangulation took {round((time.time() - timer0) * 1000, 3)}ms") timer1 = time.time() # Populate edges graph = graph_delaunay(graph, subdiv, tri_list) logging.debug(f"Conversion took {round((time.time() - timer1) * 1000, 3)}ms") return graph except Exception as e: raise GraphCreationError(f"Failed to create Delaunay graph: {str(e)}")
[docs] def create_proximity_graph(data_points: Union[np.ndarray, Dict[str, Any]], proximity_thresh: float, aspect: str = "array", metric: str = "euclidean", data_shape: Optional[List[Tuple[str, Any]]] = None ) -> Any: """Create a proximity graph from point data Args: data_points: Point data as array or dictionary proximity_thresh: Distance threshold for connections aspect: Data format ("array" or "dict") metric: Distance metric to use for the graph construction data_shape: shape of the data to pass (if extra column of information) Returns: igraph Graph object with proximity connections and distance attributes Raises: GraphCreationError: If proximity graph creation fails """ try: timer_prox = timeit.default_timer() if aspect == "array": if not isinstance(data_points, np.ndarray): raise GraphCreationError("Expected numpy array for 'array' aspect") graph = create_graph_array(data_points, data_shape=data_shape) pos_array = np.stack(( data_points[:, 1], # x position (column 1) data_points[:, 2] # y position (column 2) ), axis=1) elif aspect == "dict": if isinstance(data_points, dict): graph = create_graph_dict(data_points) pos_array = np.stack((data_points["x"], data_points["y"]), axis=1) elif isinstance(data_points, np.ndarray): data_interface = DataInterface() data_points = data_interface.to_array(data_points) graph = create_graph_dict(data_points) pos_array = np.stack((data_points["x"], data_points["y"]), axis=1) else: raise GraphCreationError("Invalid data format for 'dict' aspect") else: raise GraphCreationError("Graph data interface could not be understood") # Create proximity connections with optimized vectorized approach graph = graph_distance_optimized(graph, pos_array, proximity_thresh, metric=metric) end_prox = timeit.default_timer() logging.debug(f"Distance calculation took {round((end_prox - timer_prox) * 1000, 3)}ms") return graph except Exception as e: raise GraphCreationError(f"Failed to create proximity graph: {str(e)}")
[docs] def graph_distance_optimized(graph: Any, position2d: np.ndarray, proximity_thresh: float, metric: str = "euclidean", ) -> Any: """Construct a distance graph using optimized vectorized operations Args: graph: igraph Graph object position2d: 2D position array proximity_thresh: Distance threshold metric: Distance metric Returns: Modified graph with distance attributes on edges Raises: GraphCreationError: If distance graph creation fails """ try: if graph is None: raise GraphCreationError("Graph cannot be None") if position2d is None or position2d.size == 0: raise GraphCreationError("Position array cannot be None or empty") if position2d.ndim != 2 or position2d.shape[1] != 2: raise GraphCreationError("Position array must be 2D with shape (n, 2)") if proximity_thresh <= 0: raise GraphCreationError("Proximity threshold must be positive") # Normalize the metric name to scipy-compatible format normalized_metric = normalize_distance_metric(metric) # Calculate full distance matrix using scipy's optimized pdist + squareform square_dist = squareform(pdist(position2d, metric=normalized_metric)) # Use upper triangle indices to avoid duplicate edges (k=1 excludes diagonal) i_idx, j_idx = np.triu_indices_from(square_dist, k=1) # Apply threshold filter: distance < threshold AND distance > 0 distances = square_dist[i_idx, j_idx] mask = (distances < proximity_thresh) & (distances > 0) # Filter valid edges and their weights valid_i = i_idx[mask] valid_j = j_idx[mask] valid_distances = distances[mask] # Validate vertex indices against graph max_vertex = graph.vcount() vertex_mask = (valid_i < max_vertex) & (valid_j < max_vertex) if not np.all(vertex_mask): logging.warning(f"Some vertices exceed graph vertex count {max_vertex}") valid_i = valid_i[vertex_mask] valid_j = valid_j[vertex_mask] valid_distances = valid_distances[vertex_mask] # Create edge list and add all edges at once if len(valid_i) > 0: edges_to_add = list(zip(valid_i, valid_j)) graph.add_edges(edges_to_add) # Add distance attributes graph.es['distance'] = valid_distances.tolist() graph.es['weight'] = valid_distances.tolist() # Many algorithms expect 'weight' # Simplify graph to remove any potential duplicates graph.simplify() return graph except Exception as e: raise GraphCreationError(f"Failed to create optimized distance graph: {str(e)}")
[docs] def create_knn_graph(positions: np.ndarray, k: int = 3, aspect: str = "array", data_shape: Optional[List[Tuple[str, Any]]] = None) -> Any: """Create graph connecting each point to its k nearest neighbors Args: positions: Point data array k: Number of nearest neighbors aspect: Data format data_shape: shape of the data to pass (if extra column of information) """ try: # Validate k parameter if k <= 0: raise GraphCreationError("k must be positive") if k >= len(positions): raise GraphCreationError(f"k ({k}) must be less than number of points ({len(positions)})") if aspect == "array": graph = create_graph_array(positions, data_shape) pos_2d = positions[:, 1:3] else: raise NotImplementedError("Dict aspect not implemented for k-nearest") # Calculate distances distances = cdist(pos_2d, pos_2d) # Find k nearest neighbors for each point edges_to_add = [] for i, row in enumerate(distances): nearest_indices = np.argsort(row)[:k + 1] nearest_indices = nearest_indices[nearest_indices != i][:k] for j in nearest_indices: edge = tuple(sorted([i, j])) edges_to_add.append(edge) # Remove duplicates and add edges unique_edges = list(set(edges_to_add)) if unique_edges: graph.add_edges(unique_edges) return graph except Exception as e: raise GraphCreationError(f"Failed to create k-nearest graph: {str(e)}")
[docs] def create_mst_graph(positions: np.ndarray, aspect: str = "array", metric: str = "euclidean", data_shape: Optional[List[Tuple[str, Any]]] = None ) -> Any: """Create minimum spanning tree graph from a standardized array. Args: positions: Point data array aspect: Data format metric: Distance metric for MST construction data_shape: shape of the data to pass (if extra column of information) """ try: if aspect == "array": graph = create_graph_array(positions, data_shape=data_shape) pos_2d = positions[:, 1:3] else: raise NotImplementedError("Dict aspect not implemented for MST") assert isinstance(metric, str), f"Expected 'metric' to be a string, got {type(metric).__name__}" normalized_metric = normalize_distance_metric(metric) distances = squareform(pdist(pos_2d, metric=normalized_metric)) # Create complete graph for MST algorithm complete_graph = ig.Graph(n=len(positions), directed=False) edges_to_add = [] weights = [] for i in range(len(positions)): for j in range(i + 1, len(positions)): edges_to_add.append((i, j)) weights.append(distances[i, j]) complete_graph.add_edges(edges_to_add) complete_graph.es['weight'] = weights # Get MST mst_graph = complete_graph.spanning_tree(weights="weight") # Transfer edges to original graph graph.add_edges(mst_graph.get_edgelist()) graph.es['weight'] = mst_graph.es['weight'] return graph except Exception as e: raise GraphCreationError(f"Failed to create MST: {str(e)}")
[docs] def create_gabriel_graph(positions: np.ndarray, aspect: str = "array", data_shape: Optional[List[Tuple[str, Any]]] = None) -> Any: """ Create a Gabriel graph from point positions using an optimized, vectorized approach. A Gabriel graph is a subgraph of the Delaunay triangulation where for any edge (p, q), the disk with diameter pq contains no other point r. Args: positions: Point data array with shape (n, >=3) containing [id, x, y, ...]. aspect: Data format (currently only "array" is supported). data_shape: shape of the data to pass (if extra column of information) Returns: igraph.Graph: An igraph Graph object representing the Gabriel graph. Raises: GraphCreationError: If graph creation fails. """ try: if aspect != "array": raise NotImplementedError("Dict aspect is not yet implemented for Gabriel graph") # 1. Initial setup graph = create_graph_array(positions, data_shape=data_shape) # Use float64 for better precision in geometric calculations pos_2d = positions[:, 1:3].astype(np.float64) n_points = len(pos_2d) if n_points < 2: return graph # 2. Start with the Delaunay triangulation. This is a crucial optimization, # as a Gabriel graph is always a subgraph of the Delaunay graph. This # reduces the number of candidate edges from O(N^2) to O(N). max_x, max_y = np.max(pos_2d, axis=0) temp_graph = create_delaunay_graph( positions, aspect="array", dimension=(int(max_x) + 1, int(max_y) + 1), data_shape=data_shape ) if temp_graph.ecount() == 0: return graph # No edges to check # 3. Get Delaunay edges and corresponding point coordinates delaunay_edges = np.array(temp_graph.get_edgelist()) source_indices, target_indices = delaunay_edges[:, 0], delaunay_edges[:, 1] p1s = pos_2d[source_indices] # Coordinates of all source points p2s = pos_2d[target_indices] # Coordinates of all target points # 4. Vectorized calculation of disk centers and radii for ALL edges at once # Center of the disk for each edge (p1, p2) is (p1+p2)/2 centers = (p1s + p2s) / 2.0 # Squared radius of the disk is ||(p1-p2)/2||^2 radii_sq = np.sum(((p1s - p2s) / 2.0)**2, axis=1) # 5. Vectorized check for the Gabriel condition. # We check if any other point `pk` falls inside the disk of an edge `(pi, pj)`. # Condition: ||pk - center_ij||^2 < radius_ij^2 # Expand dimensions for broadcasting to compute the difference between every # disk center and every point in the dataset. # `dist_sq_matrix[e, k]` will be the squared distance from the center of edge `e` to point `k`. dist_sq_matrix = cdist(centers, pos_2d, 'sqeuclidean') # Compare every distance to the corresponding edge's radius. # `is_inside[e, k]` is True if point `k` is inside the disk of edge `e`. is_inside = dist_sq_matrix < radii_sq[:, np.newaxis] - 1e-10 # Tolerance for float precision # 6. Mask out the endpoints for each edge. # A point cannot invalidate an edge it belongs to. We create a boolean # mask to efficiently set `is_inside` to False for these cases. n_edges = len(delaunay_edges) row_indices = np.arange(n_edges) is_inside[row_indices, source_indices] = False is_inside[row_indices, target_indices] = False # 7. Identify Gabriel edges. # An edge is a Gabriel edge if NO other point is inside its disk. # We check if `any` value is True along the points axis (axis=1). has_intruder = np.any(is_inside, axis=1) gabriel_edges = delaunay_edges[~has_intruder] # 8. Add the final Gabriel edges to the graph if gabriel_edges.size > 0: graph.add_edges(gabriel_edges.tolist()) return graph except Exception as e: # Wrap the exception for consistent error handling raise GraphCreationError(f"Failed to create Gabriel graph: {str(e)}") from e
[docs] def create_voronoi_cell_graph(positions: np.ndarray, dimension: Tuple[int, int], aspect: str = "array", ) -> Any: """ Create graph from Voronoi diagram structure: - Nodes are Voronoi vertices (intersections of cell boundaries) - Edges connect adjacent Voronoi vertices """ from scipy.spatial import Voronoi try: if aspect == "array": pos_2d = positions[:, 1:3] else: raise NotImplementedError("Dict aspect not implemented for Voronoi cell graph") # Compute Voronoi diagram vor = Voronoi(pos_2d) # Create graph with Voronoi vertices as nodes n_vertices = len(vor.vertices) graph = ig.Graph(n=n_vertices) # Set vertex attributes (Voronoi vertex coordinates) graph.vs["id"] = list(range(n_vertices)) graph.vs["x"] = vor.vertices[:, 0] graph.vs["y"] = vor.vertices[:, 1] graph.vs["name"] = list(range(n_vertices)) # Add edges between adjacent Voronoi vertices edges_to_add = [] for ridge_vertices in vor.ridge_vertices: if -1 not in ridge_vertices: # Skip infinite ridges edges_to_add.append(tuple(ridge_vertices)) if edges_to_add: graph.add_edges(edges_to_add) # Create position array for distance calculation voronoi_positions = np.column_stack([ graph.vs["id"], graph.vs["x"], graph.vs["y"] ]) return graph except Exception as e: raise GraphCreationError(f"Failed to create Voronoi cell graph: {str(e)}")
[docs] def create_visibility_graph(positions: np.ndarray, obstacles: Optional[List] = None, aspect: str = "array", ) -> Any: """ Create visibility graph where points are connected if they have line-of-sight. Args: positions: Point data array obstacles: List of obstacle polygons (optional) aspect: Data format add_distance: Whether to add distance attributes """ try: if aspect == "array": graph = create_graph_array(positions) pos_2d = positions[:, 1:3] else: raise NotImplementedError("Dict aspect not implemented for visibility graph") n_points = len(pos_2d) edges_to_add = [] # Check visibility between all pairs for i in range(n_points): for j in range(i + 1, n_points): if _has_line_of_sight(pos_2d[i], pos_2d[j], obstacles): edges_to_add.append((i, j)) if edges_to_add: graph.add_edges(edges_to_add) return graph except Exception as e: raise GraphCreationError(f"Failed to create visibility graph: {str(e)}")
def _has_line_of_sight(p1: np.ndarray, p2: np.ndarray, obstacles: Optional[List] = None) -> bool: """Check if two points have unobstructed line of sight""" if obstacles is None: return True # Check if line segment p1-p2 intersects any obstacle for obstacle in obstacles: if _line_intersects_polygon(p1, p2, obstacle): return False return True def _line_intersects_polygon(p1: np.ndarray, p2: np.ndarray, polygon: np.ndarray) -> bool: """Check if line segment intersects with polygon using ray casting""" # Implementation of line-polygon intersection # This is a standard computational geometry algorithm pass
[docs] def create_graph(data_points: Union[np.ndarray, Dict[str, Any]], graph_type: str, aspect: str = "array", dimension: Tuple[int, int] = (1200, 1200), **kwargs) -> Any: """Create any type of graph from point data Args: data_points: Point data as array or dictionary graph_type: Type of graph ("delaunay", "proximity", "knn", "mst", "gabriel") aspect: Data format ("array" or "dict") dimension: Image dimensions (width, height) **kwargs: Graph-specific parameters Returns: igraph Graph object Raises: GraphCreationError: If graph creation fails ValueError: If unknown graph type """ try: graph_type = graph_type.lower() if graph_type == "delaunay": return create_delaunay_graph(data_points, aspect, dimension) elif graph_type == "proximity": proximity_thresh = kwargs.get('proximity_thresh', 100.0) metric = kwargs.get('metric', 'euclidean') return create_proximity_graph(data_points, proximity_thresh, aspect, metric) elif graph_type == "knn" or graph_type == "k_nearest": k = kwargs.get('k', 4) return create_knn_graph(data_points, k, aspect) elif graph_type == "mst" or graph_type == "minimum_spanning_tree": metric = kwargs.get('metric', 'euclidean') return create_mst_graph(data_points, aspect, metric) elif graph_type == "gabriel": return create_gabriel_graph(data_points, aspect) else: raise ValueError(f"Unknown graph type: {graph_type}. " f"Supported types: delaunay, proximity, knn, mst, gabriel") except Exception as e: raise GraphCreationError(f"Failed to create {graph_type} graph: {str(e)}")