Source code for graphizy.main

"""
Main graphing class for graphizy

This module provides the primary interface for creating, manipulating, and visualizing
various types of graphs including Delaunay triangulations, proximity graphs, k-nearest
neighbor graphs, Gabriel graphs, minimum spanning trees, and memory-based graphs.

.. moduleauthor:: Charles Fosseprez
.. contact:: charles.fosseprez.pro@gmail.com
.. license:: GPL2 or later
.. copyright:: Copyright (C) 2025 Charles Fosseprez

Examples:
    Basic usage::

        from graphizy import Graphing
        import numpy as np

        # Create sample data
        data = np.random.rand(100, 3)  # 100 points with [id, x, y]

        # Initialize graphing object
        grapher = Graphing(dimension=(800, 600), aspect="array")

        # Create different types of graphs
        delaunay_graph = grapher.make_delaunay(data)
        proximity_graph = grapher.make_proximity(data, proximity_thresh=50.0)

        # Visualize
        image = grapher.draw_graph(delaunay_graph)
        grapher.show_graph(image, title="Delaunay Triangulation")
"""

import logging
import time
import timeit
from typing import Union, Dict, Any, List, Tuple, Optional, TYPE_CHECKING
import numpy as np
from networkx.algorithms.clique import make_max_clique_graph

if TYPE_CHECKING:
    from .networkx_bridge import NetworkXAnalyzer
    from .streaming import StreamManager, AsyncStreamManager

from graphizy.config import (
    GraphizyConfig, DrawingConfig, GraphConfig, MemoryConfig, WeightConfig,
    GenerationConfig, LoggingConfig,
)
from graphizy.exceptions import (
    InvalidAspectError, InvalidDimensionError, GraphCreationError,
    IgraphMethodError, DrawingError,
)
from graphizy.algorithms import (
    create_graph_array, create_graph_dict, call_igraph_method,
    create_delaunay_graph, create_proximity_graph,
    create_mst_graph, create_knn_graph, create_gabriel_graph,
)
from graphizy.analysis import GraphAnalysisResult
from graphizy.data_interface import DataInterface
from graphizy.memory import (
    MemoryManager, update_memory_from_custom_function
)
from graphizy.weight import (WeightComputer, setup_realtime_weight_computer)
from graphizy.drawing import Visualizer
from graphizy.plugins_logic import get_graph_registry


class Graphing:
    """
    Main graphing class for creating and visualizing various types of graphs.

    This class provides a unified interface for creating different types of graphs
    from point data, including geometric graphs (Delaunay, Gabriel), proximity-based
    graphs (k-NN, proximity), and spanning trees. It also supports memory-based graphs
    for temporal analysis and comprehensive graph visualization.

    The class supports two data formats:
        - "array": NumPy arrays with columns [id, x, y]
        - "dict": Dictionaries with keys "id", "x", "y"

    Attributes:
        config (GraphizyConfig): Configuration object containing graph and drawing settings
        dimension (Tuple[int, int]): Canvas dimensions, read from ``config.graph.dimension``
        aspect (str): Data format, read from ``config.graph.aspect``
        data_interface (DataInterface): Data interface built from ``config.graph.data_shape``
        registry: Graph plugin registry returned by ``get_graph_registry()``
        memory_manager: Optional memory manager for temporal graphs (None after construction)
        weight_computer: Optional weight computer (None unless weights are auto-computed)
        fast_computer: Optional helper, None after construction
        visualizer (Visualizer): Drawing backend built from the drawing config and dimension

        Drawing settings such as ``line_color`` or ``point_radius`` live on
        ``config.drawing`` (also exposed through the ``drawing_config`` property).

    Examples:
        >>> # Basic initialization
        >>> grapher = Graphing(dimension=(800, 600), aspect="array")

        >>> # With custom configuration
        >>> config = GraphizyConfig()
        >>> config.drawing.line_color = (255, 0, 0)  # Red edges
        >>> grapher = Graphing(config=config)

        >>> # Create and visualize a graph
        >>> data = np.random.rand(50, 3)
        >>> graph = grapher.make_delaunay(data)
        >>> image = grapher.draw_graph(graph)
        >>> grapher.show_graph(image)
    """

    def __init__(self, config: Optional[GraphizyConfig] = None, **kwargs):
        """
        Initialize Graphing object with a flexible configuration system.

        You can provide a pre-made GraphizyConfig object for detailed control,
        or override specific settings directly with keyword arguments for ease of use.

        Args:
            config: A pre-configured GraphizyConfig object. If None, a default
                config is created. The object is copied, so the caller's
                instance is not modified by the keyword overrides.
            **kwargs: Keyword arguments to override default settings. These are
                applied on top of the provided or default config.
                Examples:
                - dimension=(800, 600)
                - line_color=(255, 0, 0)
                - proximity_thresh=75.0
                - data_shape=[('id', int), ('x', int), ('y', int)]

        Raises:
            InvalidDimensionError, InvalidAspectError, ValueError: Configuration
                errors, re-raised unchanged.
            GraphCreationError: If initialization fails for any other reason.

        Examples:
            >>> # Easiest way: Use keyword arguments
            >>> grapher = Graphing(dimension=(800, 600), line_color=(255, 0, 0))

            >>> # Power-user way: Create and pass a config object
            >>> my_config = GraphizyConfig()
            >>> my_config.drawing.point_radius = 10
            >>> grapher = Graphing(config=my_config)

            >>> # Hybrid: Use a base config and override with a keyword
            >>> grapher = Graphing(config=my_config, point_radius=5)  # 5 wins
        """
        try:
            # If a config object is passed, copy it and apply the overrides.
            # Otherwise build a new config directly from the keyword arguments,
            # which ensures the __post_init__ validation in the config classes runs.
            # Identity test (not truthiness): a config object must never be
            # discarded just because it happens to evaluate as falsy.
            if config is not None:
                self.config = config.copy()
                self.config.update(**kwargs)
            else:
                self.config = GraphizyConfig(**kwargs)

            # Set main attributes from the final configuration
            self.dimension = self.config.graph.dimension
            self.aspect = self.config.graph.aspect

            # Initialize data interface for handling different data formats
            self.data_interface = DataInterface(self.config.graph.data_shape)

            # Get the graph registry
            self.registry = get_graph_registry()

            # Initialize optional managers (must exist before init_weight_computer runs)
            self.memory_manager = None
            self.weight_computer = None
            self.fast_computer = None

            if self.config.weight.auto_compute_weights:
                logging.info("Auto-computation of weights is enabled.")
                self.init_weight_computer()

            # Initialize the visualizer
            self.visualizer = Visualizer(self.config.drawing, self.config.graph.dimension)

            logging.info(f"Graphing object initialized: {self.dimension} canvas, '{self.aspect}' aspect")

        except (InvalidDimensionError, InvalidAspectError, ValueError):
            # Re-raise specific, expected configuration errors as-is.
            raise
        except Exception as e:
            # Wrap any other unexpected errors in a generic GraphCreationError.
            raise GraphCreationError(f"Failed to initialize Graphing object: {str(e)}") from e

    # ============================================================================
    # CONFIGURATIONS FUNCTIONS
    # ============================================================================

    @property
    def drawing_config(self) -> DrawingConfig:
        """
        Get current drawing configuration.

        Returns:
            DrawingConfig: The ``drawing`` section of the current configuration.
        """
        return self.config.drawing

    @property
    def graph_config(self) -> GraphConfig:
        """
        Get current graph configuration.

        Returns:
            GraphConfig: The ``graph`` section of the current configuration
                (dimension, aspect, data_shape, ...).
        """
        return self.config.graph
def update_config(self, **kwargs) -> None:
    """
    Update configuration parameters at runtime.

    The keyword arguments are forwarded to ``self.config.update()``; afterwards
    the values cached on this object (``dimension``, ``aspect`` and the
    ``data_interface``) are refreshed when the update may have changed them.

    Args:
        **kwargs: Configuration parameters to update. Can include nested
            parameters using dictionary syntax:
            - drawing={'line_color': (255,0,0), 'point_radius': 8}
            - graph={'proximity_threshold': 100.0}
            - Direct parameters: line_thickness=3, aspect='dict'

    Raises:
        GraphCreationError: If the configuration update fails. The original
            exception is attached as ``__cause__``.

    Examples:
        >>> # Update drawing parameters
        >>> grapher.update_config(
        ...     drawing={'line_color': (0, 255, 0), 'line_thickness': 2}
        ... )

        >>> # Update graph parameters
        >>> grapher.update_config(
        ...     graph={'proximity_threshold': 75.0, 'distance_metric': 'manhattan'}
        ... )

        >>> # Mixed updates
        >>> grapher.update_config(
        ...     line_color=(255, 255, 0),
        ...     graph={'dimension': (1200, 800)}
        ... )
    """
    try:
        self.config.update(**kwargs)

        graph_touched = 'graph' in kwargs
        graph_update = kwargs.get('graph')

        # Refresh cached instance attributes if the graph config may have changed
        if graph_touched or 'dimension' in kwargs:
            self.dimension = self.config.graph.dimension
        if graph_touched or 'aspect' in kwargs:
            self.aspect = self.config.graph.aspect

        # data_shape can arrive nested (graph={'data_shape': ...}), flat
        # (data_shape=...), or implicitly when the whole graph section is
        # replaced by a non-dict object. All three need a fresh DataInterface.
        if isinstance(graph_update, dict):
            data_shape_changed = 'data_shape' in graph_update
        else:
            data_shape_changed = graph_touched
        if data_shape_changed or 'data_shape' in kwargs:
            self.data_interface = DataInterface(self.config.graph.data_shape)

        logging.info("Configuration updated successfully")

    except Exception as e:
        raise GraphCreationError(f"Failed to update configuration: {str(e)}") from e
[docs] @staticmethod def identify_graph(graph: Any) -> Any: """ Replace graph vertex names with proper particle IDs for consistency. This method ensures that graph vertices have consistent naming by setting the "name" attribute to match the "id" attribute. This is useful for maintaining data consistency across different graph operations. Args: graph: igraph Graph object to modify. Returns: Any: The modified graph object with updated vertex names. Raises: GraphCreationError: If graph is None or modification fails. Note: This method modifies the graph in-place and also returns it for method chaining convenience. Examples: >>> graph = grapher.make_delaunay(data) >>> identified_graph = Graphing.identify_graph(graph) >>> # Now graph.vs["name"] == graph.vs["id"] for all vertices """ try: if graph is None: raise GraphCreationError("Graph cannot be None") graph.vs["name"] = graph.vs["id"] return graph except Exception as e: raise GraphCreationError(f"Failed to identify graph: {str(e)}")
def set_graph_type(self, graph_type: Union[str, List[str], Tuple[str]], **default_kwargs):
    """
    Set the type(s) of graph to generate automatically during updates.

    This method configures the Graphing object to automatically create
    specific graph types when update_graphs() is called with new data.
    The request is validated in full before any state is changed, so a
    failed call leaves the previous configuration untouched.

    Args:
        graph_type: Graph type(s) to generate automatically. Can be:
            - str: Single graph type (e.g., 'delaunay')
            - List[str]: Multiple graph types (e.g., ['delaunay', 'proximity'])
            - Tuple[str]: Multiple graph types as tuple
        **default_kwargs: Default parameters for graph creation. An independent
            copy is stored for EVERY listed type; use update_graph_params() to
            set parameters for one type only.

    Raises:
        GraphCreationError: If graph_type has the wrong Python type or names a
            graph type that is not returned by list_graph_types(). The
            underlying ValueError is attached as ``__cause__``.

    Examples:
        >>> # Set single graph type
        >>> grapher.set_graph_type('delaunay')

        >>> # Set multiple graph types
        >>> grapher.set_graph_type(['delaunay', 'proximity', 'knn'])

        >>> # Set with default parameters
        >>> grapher.set_graph_type('proximity', proximity_thresh=50.0, metric='euclidean')
    """
    try:
        # Normalize input to list
        if isinstance(graph_type, str):
            requested = [graph_type]
        elif isinstance(graph_type, (list, tuple)):
            requested = list(graph_type)
        else:
            raise ValueError(f"graph_type must be str, list, or tuple, got {type(graph_type)}")

        # Validate all graph types are recognized before mutating self
        available_types = set(self.list_graph_types().keys())
        for gtype in requested:
            if gtype not in available_types:
                raise ValueError(f"Unknown graph type '{gtype}'. Available: {sorted(available_types)}")

        # Commit: types, one parameter dict per type, and an empty result cache
        # (the cache is populated by update_graphs)
        self.graph_types = requested
        self.graph_type_params = {gtype: default_kwargs.copy() for gtype in requested}
        self.current_graphs = {}

        logging.info(f"Graph types set to: {self.graph_types}")
        if default_kwargs:
            logging.info(f"Default parameters: {default_kwargs}")

    except Exception as e:
        raise GraphCreationError(f"Failed to set graph type: {str(e)}") from e
def clear_graph_types(self):
    """
    Reset the automatic graph generation state.

    Replaces the configured graph types with an empty list and both the
    per-type parameters and the cached graphs with empty dictionaries.
    """
    self.graph_types, self.graph_type_params, self.current_graphs = [], {}, {}
    logging.info("Cleared all graph types")
def get_graph_type_info(self) -> Dict[str, Any]:
    """
    Describe the current automatic graph type configuration.

    Returns:
        Dict[str, Any]: If no graph types were ever set on this object, a
            dictionary with ``configured=False`` and a message. Otherwise a
            dictionary with ``configured=True``, a copy of the configured
            type list, a shallow copy of the per-type parameters, and for
            every cached graph a flag telling whether it is not None.
    """
    if not hasattr(self, 'graph_types'):
        return {'configured': False, 'message': 'No graph types configured'}

    availability = {}
    for gtype, graph in getattr(self, 'current_graphs', {}).items():
        availability[gtype] = graph is not None

    summary = {'configured': True}
    summary['graph_types'] = self.graph_types.copy()
    summary['parameters'] = self.graph_type_params.copy()
    summary['current_graphs_available'] = availability
    return summary
def update_graph_params(self, graph_type: str, **kwargs):
    """
    Merge new parameters into the stored parameters of one graph type.

    Args:
        graph_type: The graph type whose parameters are updated. It must be
            one of the currently configured types.
        **kwargs: Parameters to set; existing keys are overwritten.

    Raises:
        ValueError: If no graph types are configured or graph_type is not
            among them.

    Examples:
        >>> grapher.set_graph_type(['proximity', 'knn'])
        >>> grapher.update_graph_params('proximity', proximity_thresh=75.0, metric='manhattan')
        >>> grapher.update_graph_params('knn', k=8)
    """
    configured_types = getattr(self, 'graph_types', [])
    if graph_type not in configured_types:
        raise ValueError(f"Graph type '{graph_type}' not in current types: {configured_types}")

    params = self.graph_type_params[graph_type]
    params.update(kwargs)
    logging.info(f"Updated parameters for '{graph_type}': {kwargs}")
def update(self, **kwargs):
    """
    Update configuration values at runtime from keyword arguments.

    Thin alias for update_config(): the keyword arguments are forwarded
    unchanged, so the configuration object does the routing of flat keys
    (e.g. 'line_color') and nested dictionaries (e.g. drawing={...}), and
    the cached attributes of this object are refreshed afterwards.

    The previous body addressed ``self.drawing``, ``self.graph`` and similar
    sub-configs directly. Those attributes belong to the configuration
    object, not to Graphing (which only holds ``self.config``), so every
    non-empty call failed with AttributeError.

    Args:
        **kwargs: Configuration parameters, as accepted by update_config().

    Raises:
        GraphCreationError: If update_config() rejects the parameters.
    """
    self.update_config(**kwargs)
# ============================================================================ # CONVENIENT CONVERSION # ============================================================================ def _get_data_as_array(self, data_points: Union[np.ndarray, Dict[str, Any]]) -> np.ndarray: """ Internal helper that delegates to DataInterface for all conversions. """ try: return self.data_interface.to_array(data_points) except Exception as e: raise GraphCreationError(f"Failed to convert data to array format: {str(e)}") # ============================================================================ # CORE UPDATES FUNCTIONS # ============================================================================
def update_graphs(self, data_points: Union[np.ndarray, Dict[str, Any]],
                  update_memory: Optional[bool] = None,
                  use_memory: Optional[bool] = None,
                  compute_weights: Optional[bool] = None,
                  **override_kwargs) -> Dict[str, Any]:
    """
    Update all configured graph types with new data using smart memory and weight defaults.

    For every type configured through set_graph_type(), make_graph() is called
    with the stored parameters of that type, the resolved memory/weight flags
    and the per-call overrides. The results are cached on ``self.current_graphs``
    and returned.

    Processing Flow (inside make_graph): graph → memory → weights

    Args:
        data_points: New point data in the format specified by self.aspect.
        update_memory: Whether to update the memory manager with the new graphs.
            See "Smart Defaults" for the None case.
        use_memory: Whether to create memory-enhanced graphs from existing
            connections. See "Smart Defaults" for the None case.
        compute_weights: Whether to compute edge weights for the final graphs.
            If None, ``self.config.weight.auto_compute_weights`` is used.
        **override_kwargs: Parameters for this update only. They are passed to
            make_graph() as keyword arguments for EVERY configured type and
            take precedence over the stored parameters.

    Returns:
        Dict[str, Any]: Dictionary mapping graph type names to generated graph
            objects. A type whose creation failed maps to None.

    Raises:
        GraphCreationError: If no graph types were set, or if anything outside
            the per-type creation fails.

    Smart Defaults:
        Memory:
        - If memory_manager exists and use_memory=None → use_memory=True
        - If use_memory=True and update_memory=None → update_memory=True
        - If no memory_manager → both default to False
        Weights:
        - If compute_weights=None → use self.config.weight.auto_compute_weights

    Examples:
        >>> grapher.set_graph_type(['delaunay', 'proximity', 'knn'])
        >>> grapher.update_graph_params('proximity', proximity_thresh=60.0)
        >>> grapher.update_graph_params('knn', k=5)

        >>> # Basic update - uses all smart defaults
        >>> new_data = np.random.rand(100, 3) * 100
        >>> graphs = grapher.update_graphs(new_data)

        >>> # Explicit control over all processing steps
        >>> graphs = grapher.update_graphs(
        ...     new_data,
        ...     use_memory=False,      # Skip memory
        ...     update_memory=True,    # But learn from current
        ...     compute_weights=True   # Force weights
        ... )

        >>> # Per-call parameter override
        >>> graphs = grapher.update_graphs(new_data, proximity_thresh=75.0)

    Note:
        - Failed graphs are set to None but don't stop other graph generation
    """
    # Precondition checked outside the try block so its message is not re-wrapped.
    if not hasattr(self, 'graph_types'):
        raise GraphCreationError("No graph types set. Call set_graph_type() first.")

    try:
        # Apply smart defaults based on memory manager state
        if self.memory_manager is not None:
            # Memory manager exists - default to using memory
            if use_memory is None:
                use_memory = True
            # If using memory, default to updating it too (continuous learning)
            if use_memory and update_memory is None:
                update_memory = True
        else:
            # No memory manager - default to no memory operations
            if use_memory is None:
                use_memory = False
            if update_memory is None:
                update_memory = False

        # Apply smart defaults for weight computation
        if compute_weights is None:
            compute_weights = self.config.weight.auto_compute_weights

        # perf_counter is monotonic, so the elapsed time cannot be distorted
        # by system clock adjustments.
        timer_start = time.perf_counter()
        updated_graphs = {}

        # Generate each configured graph type with full pipeline
        for graph_type in self.graph_types:
            try:
                # Copy so make_graph can never mutate the stored defaults
                graph_params = self.graph_type_params[graph_type].copy()

                updated_graphs[graph_type] = self.make_graph(
                    graph_type=graph_type,
                    data_points=data_points,
                    graph_params=graph_params,
                    update_memory=update_memory,
                    use_memory=use_memory,
                    compute_weights=compute_weights,
                    **override_kwargs  # These override graph_params
                )
                logging.debug(f"Updated {graph_type} graph successfully")

            except Exception as e:
                # Deliberate best-effort: one failing type must not block the others
                logging.error(f"Failed to update {graph_type} graph: {e}")
                updated_graphs[graph_type] = None

        # Store current graphs
        self.current_graphs = updated_graphs

        elapsed_ms = round((time.perf_counter() - timer_start) * 1000, 3)
        successful_updates = sum(1 for g in updated_graphs.values() if g is not None)

        # Enhanced logging with memory and weight info
        processing_status = []
        if self.memory_manager is not None:
            processing_status.append(f"memory: use={use_memory}, update={update_memory}")
        if self.weight_computer is not None:
            processing_status.append(f"weights: compute={compute_weights}")

        status_str = f" ({', '.join(processing_status)})" if processing_status else ""
        logging.info(
            f"Updated {successful_updates}/{len(self.graph_types)} graphs in {elapsed_ms}ms{status_str}")

        return updated_graphs

    except Exception as e:
        raise GraphCreationError(f"Failed to update graphs: {str(e)}") from e
def update_graphs_memory_only(self, data_points: Union[np.ndarray, Dict[str, Any]],
                              compute_weights: Optional[bool] = None,
                              **override_kwargs) -> Dict[str, Any]:
    """
    Update graphs from existing memory without learning from the current data.

    Shortcut for update_graphs() with ``use_memory=True`` and
    ``update_memory=False``.

    Args:
        data_points: Current point data, forwarded to update_graphs().
        compute_weights: Forwarded to update_graphs(); None lets it pick
            its own default.
        **override_kwargs: Parameter overrides, forwarded to update_graphs().

    Returns:
        Dict[str, Any]: The dictionary returned by update_graphs().

    Examples:
        >>> # Build up memory over time
        >>> grapher.update_graphs(data1)
        >>> grapher.update_graphs(data2)

        >>> # Inspect the accumulated memory
        >>> memory_graphs = grapher.update_graphs_memory_only(current_data, compute_weights=True)
    """
    # Read memory, but do not learn from the current frame
    memory_flags = {'use_memory': True, 'update_memory': False}
    return self.update_graphs(
        data_points=data_points,
        compute_weights=compute_weights,
        **memory_flags,
        **override_kwargs
    )
def update_graphs_learning_only(self, data_points: Union[np.ndarray, Dict[str, Any]],
                                compute_weights: Optional[bool] = None,
                                **override_kwargs) -> Dict[str, Any]:
    """
    Update graphs from the current data and feed memory, without reading memory.

    Shortcut for update_graphs() with ``use_memory=False`` and
    ``update_memory=True``.

    Args:
        data_points: Current point data, forwarded to update_graphs().
        compute_weights: Forwarded to update_graphs(); None lets it pick
            its own default.
        **override_kwargs: Parameter overrides, forwarded to update_graphs().

    Returns:
        Dict[str, Any]: The dictionary returned by update_graphs().

    Examples:
        >>> # Build up memory without using it yet
        >>> grapher.update_graphs_learning_only(data1, compute_weights=True)
        >>> grapher.update_graphs_learning_only(data2, compute_weights=True)

        >>> # Now use accumulated memory
        >>> memory_graphs = grapher.update_graphs_memory_only(current_data)
    """
    # Learn from the current frame, but ignore what is already remembered
    learning_flags = {'use_memory': False, 'update_memory': True}
    return self.update_graphs(
        data_points=data_points,
        compute_weights=compute_weights,
        **learning_flags,
        **override_kwargs
    )
def get_current_graphs(self) -> Dict[str, Any]:
    """
    Return the cached graphs of this object.

    Returns:
        Dict[str, Any]: ``self.current_graphs`` if that attribute exists,
            otherwise a new empty dictionary.
    """
    try:
        return self.current_graphs
    except AttributeError:
        # Nothing has been configured or generated on this object yet
        return {}
def get_current_graph(self, graph_type: str) -> Any:
    """
    Look up one graph in the result of get_current_graphs().

    Args:
        graph_type: The type of graph to retrieve.

    Returns:
        Any: The cached object stored under graph_type, or None if there
            is no entry for it.
    """
    return self.get_current_graphs().get(graph_type)
# ============================================================================ # PLUGIN SYSTEM METHODS # ============================================================================
def make_graph(self, graph_type: str,
               data_points: Union[np.ndarray, Dict[str, Any]],
               graph_params: Optional[Dict] = None,
               update_memory: Optional[bool] = None,
               use_memory: Optional[bool] = None,
               compute_weights: Optional[bool] = None,
               do_timing: bool = False,
               validate_data: bool = False,
               **kwargs) -> Any:
    """
    Create a graph using the extensible plugin system with intelligent memory defaults.

    The data is converted with ``self.data_interface.to_array()``, the graph is
    built by ``self.registry.create_graph()``, and the result is then passed
    through ``_maybe_apply_memory()`` and ``_maybe_compute_weights()``.

    Args:
        graph_type: Name of the graph type to create, as known to the registry
            (e.g. 'delaunay', 'proximity', 'knn', 'gabriel', 'mst').
        data_points: Point data in the format specified by self.aspect:
            - For "array": NumPy array with shape (n, 3) containing [id, x, y]
            - For "dict": Dictionary with keys "id", "x", "y" as lists/arrays
        graph_params: Dictionary of parameters specific to the graph type,
            e.g. {'proximity_thresh': 50.0, 'metric': 'euclidean'} or {'k': 5}.
            None means an empty dict. The dictionary is copied, never mutated.
        update_memory: Whether to update the memory manager with the created
            graph. See "Smart Defaults" for the None case.
        use_memory: Whether to create a memory-enhanced graph from existing
            connections. See "Smart Defaults" for the None case.
        compute_weights: Whether to compute edge weights. If None,
            ``self.config.weight.auto_compute_weights`` is used.
        do_timing: Whether to print the duration of the three pipeline steps.
        validate_data: Forwarded to ``to_array()``; validating on every call
            degrades performance.
        **kwargs: Additional graph-type specific parameters. They are merged
            into graph_params, with kwargs taking precedence.

    Returns:
        Any: The graph object produced by the pipeline.

    Raises:
        GraphCreationError: If conversion, creation, memory processing or
            weight computation fails (this includes an unknown graph_type
            reported by the registry). The original exception is attached
            as ``__cause__``.

    Smart Defaults:
        - If memory_manager exists and use_memory=None → use_memory=True
        - If use_memory=True and update_memory=None → update_memory=True
        - If no memory_manager → both default to False

    Examples:
        >>> # Simple direct usage (most common)
        >>> graph = grapher.make_graph('delaunay', data)
        >>> connections = grapher.make_graph('proximity', data, proximity_thresh=80.0)
        >>> knn_graph = grapher.make_graph('knn', data, k=5)

        >>> # Using graph_params dictionary (for complex configs)
        >>> prox_params = {'proximity_thresh': 75.0, 'metric': 'manhattan'}
        >>> graph = grapher.make_graph('proximity', data, graph_params=prox_params)

        >>> # Mixed usage - kwargs override graph_params
        >>> graph = grapher.make_graph('proximity', data,
        ...                            graph_params={'proximity_thresh': 50.0},
        ...                            proximity_thresh=100.0)  # This wins

        >>> # Memory control with direct parameters
        >>> graph = grapher.make_graph('knn', data, k=8, use_memory=False, update_memory=True)

    Note:
        - use_memory=True: Uses EXISTING memory connections from previous calls
        - update_memory=True: Adds current graph connections to memory for future use
        - Explicit parameters always override defaults
    """
    # Handle parameters: kwargs win over graph_params
    final_params = {} if graph_params is None else graph_params.copy()
    final_params.update(kwargs)

    # Smart defaults for memory
    if self.memory_manager is not None:
        if use_memory is None:
            use_memory = True
        if use_memory and update_memory is None:
            update_memory = True
    else:
        if use_memory is None:
            use_memory = False
        if update_memory is None:
            update_memory = False

    # Smart defaults for weights
    if compute_weights is None:
        compute_weights = self.config.weight.auto_compute_weights

    try:
        data_array = self.data_interface.to_array(data_points, validate_data=validate_data)

        # STEP 1: Create base graph
        step_start = time.perf_counter()
        graph = self.registry.create_graph(
            graph_type=graph_type,
            data_points=data_array,
            dimension=self.dimension,
            data_shape=self.data_interface.data_shape,
            **final_params
        )
        graph_done = time.perf_counter()

        # STEP 2: Apply memory processing (modifies graph structure)
        graph = self._maybe_apply_memory(graph, use_memory, update_memory)
        memory_done = time.perf_counter()

        # STEP 3: Compute weights (adds attributes to existing edges)
        graph = self._maybe_compute_weights(graph, compute_weights)

        if do_timing:
            weights_done = time.perf_counter()
            graph_ms = (graph_done - step_start) * 1000
            memory_ms = (memory_done - graph_done) * 1000
            weights_ms = (weights_done - memory_done) * 1000
            # One consistent "<step> <value> ms" layout for all three steps
            print(
                f"Graph '{graph_type}' with data shape: {data_array.shape} >"
                f" graph {graph_ms:.1f} ms /"
                f" memory {memory_ms:.1f} ms /"
                f" weights {weights_ms:.1f} ms")

        return graph

    except Exception as e:
        raise GraphCreationError(f"Failed to create {graph_type} graph: {str(e)}") from e
@staticmethod
def list_graph_types(category: Optional[str] = None) -> Dict[str, Any]:
    """
    List the graph types known to the plugin registry.

    Args:
        category: Optional category filter, forwarded unchanged to the
            registry's ``list_plugins()``. None (the default) applies no
            filter on this side.

    Returns:
        Dict[str, Any]: Whatever ``list_plugins(category)`` returns; callers
            in this class use its keys as the available graph type names.

    Examples:
        >>> # List all graph types
        >>> all_types = Graphing.list_graph_types()
        >>> for name, info in all_types.items():
        ...     print(f"{name}: {info['description']}")

        >>> # List only built-in types
        >>> builtin_types = Graphing.list_graph_types(category='built-in')

        >>> # Check if specific type is available
        >>> if 'delaunay' in Graphing.list_graph_types():
        ...     print("Delaunay triangulation is available")
    """
    from .plugins_logic import get_graph_registry

    return get_graph_registry().list_plugins(category)
@staticmethod
def get_plugin_info(graph_type: str) -> Dict[str, Any]:
    """
    Get detailed information about a specific graph type.

    Args:
        graph_type: Name of the graph type to query in the plugin registry.

    Returns:
        Dict[str, Any]: A dictionary with exactly two keys:
            - "info": the ``__dict__`` of the plugin's ``info`` object
            - "parameters": the ``parameters`` attribute of that ``info`` object

    Raises:
        Whatever the registry's ``get_plugin()`` raises for an unknown
        graph_type (presumably ValueError; confirm against the registry).

    Examples:
        >>> # Get info about proximity graphs
        >>> prox_info = Graphing.get_plugin_info('proximity')
        >>> print(prox_info['info']['description'])
        >>> print("Parameters:", prox_info['parameters'])

        >>> # Check parameter details before calling
        >>> knn_info = Graphing.get_plugin_info('knn')
        >>> k_param = knn_info['parameters']['k']
        >>> print(f"k parameter: {k_param['description']}")
    """
    from .plugins_logic import get_graph_registry

    plugin = get_graph_registry().get_plugin(graph_type)
    details = {}
    details["info"] = plugin.info.__dict__
    details["parameters"] = plugin.info.parameters
    return details
# ============================================================================ # VISUALIZATION METHODS (Delegated to Visualizer) # ============================================================================
def draw_graph(self, graph: Any, **kwargs) -> np.ndarray:
    """
    Render a graph by delegating to the internal Visualizer.

    Args:
        graph: Graph object to draw; passed straight to the visualizer.
        **kwargs: Extra drawing options forwarded to
            ``self.visualizer.draw_graph`` (e.g. 'radius', 'thickness').

    Returns:
        np.ndarray: The image returned by the visualizer.

    Raises:
        DrawingError: If the visualizer raises any exception.
    """
    try:
        rendered = self.visualizer.draw_graph(graph, **kwargs)
    except Exception as e:
        raise DrawingError(f"Failed to draw graph: {e}") from e
    return rendered
def draw_all_graphs(self, **kwargs) -> Dict[str, np.ndarray]:
    """
    Render every graph reported by ``get_current_graphs()``.

    Args:
        **kwargs: Drawing parameters forwarded to ``draw_graph()``.

    Returns:
        Dict[str, np.ndarray]: Graph type name mapped to its image.
        Graphs that are ``None`` are left out entirely; graphs whose
        drawing raised are kept with the value ``None`` after the error
        has been logged.
    """
    rendered = {}
    for name, current in self.get_current_graphs().items():
        if current is None:
            # Nothing to draw for this type: no entry at all.
            continue
        try:
            rendered[name] = self.draw_graph(current, **kwargs)
        except Exception as e:
            # Best effort: one failing graph must not abort the others.
            logging.error(f"Failed to draw {name} graph: {e}")
            rendered[name] = None
    return rendered
def draw_memory_graph(self, graph: Any, **kwargs) -> np.ndarray:
    """
    Render a memory graph by delegating to the internal Visualizer.

    Args:
        graph: Graph object to draw; passed straight to the visualizer.
        **kwargs: Extra options forwarded to
            ``self.visualizer.draw_memory_graph``
            (e.g. 'use_age_colors', 'alpha_range').

    Returns:
        np.ndarray: The image returned by the visualizer.

    Raises:
        DrawingError: If the visualizer raises any exception.
    """
    try:
        rendered = self.visualizer.draw_memory_graph(graph, **kwargs)
    except Exception as e:
        raise DrawingError(f"Failed to draw memory graph: {e}") from e
    return rendered
def overlay_graph(self, image_graph: np.ndarray, graph: Any) -> np.ndarray:
    """
    Draw an additional graph on top of an existing image.

    Thin wrapper around ``self.visualizer.overlay_graph``.

    Args:
        image_graph: Base image to draw on.
        graph: Graph object to overlay.

    Returns:
        np.ndarray: The image returned by the visualizer.

    Raises:
        DrawingError: If the visualizer raises any exception.
    """
    try:
        combined = self.visualizer.overlay_graph(image_graph, graph)
    except Exception as e:
        raise DrawingError(f"Failed to overlay graph: {e}") from e
    return combined
def overlay_collision(self, image_graph: np.ndarray, graph: Any) -> np.ndarray:
    """
    Overlay collision information onto an existing image.

    Thin wrapper around ``self.visualizer.overlay_collision`` (not
    ``overlay_graph``); what exactly is drawn is decided by the visualizer.

    Args:
        image_graph: Base image to draw on.
        graph: Graph object handed to the visualizer.

    Returns:
        np.ndarray: The image returned by the visualizer.

    Raises:
        DrawingError: If the visualizer raises any exception.
    """
    try:
        return self.visualizer.overlay_collision(image_graph, graph)
    except Exception as e:
        # Name the failing operation so it is not confused with overlay_graph.
        raise DrawingError(f"Failed to overlay collision: {e}") from e
def show_graph(self, image_graph: np.ndarray, title: str = "Graphizy", **kwargs) -> None:
    """
    Display a graph image through the internal Visualizer.

    Args:
        image_graph: Image array to display.
        title: Window title, passed positionally to the visualizer.
        **kwargs: Extra options forwarded to ``self.visualizer.show_graph``
            (e.g. 'block').

    Raises:
        DrawingError: If the visualizer raises any exception.
    """
    visualizer_options = kwargs
    try:
        self.visualizer.show_graph(image_graph, title, **visualizer_options)
    except Exception as e:
        raise DrawingError(f"Failed to show graph: {e}") from e
def show_all_graphs(self, **kwargs):
    """
    Display every current graph, one ``show_graph()`` call per graph.

    Images come from ``draw_all_graphs()`` (called without arguments);
    entries whose image is ``None`` are skipped.

    Args:
        **kwargs: Parameters passed to ``show_graph()``. ``title`` overrides
            the default per-graph title ("Graphizy - <Graph Type>") for
            every window; all other keyword arguments (e.g. 'block') are
            forwarded unchanged.

    Raises:
        DrawingError: Propagated from ``show_graph()`` if display fails.
    """
    # 'title' is resolved per graph below; everything else is forwarded as-is
    # (previously these extra options were silently dropped).
    forwarded = {key: value for key, value in kwargs.items() if key != 'title'}

    for graph_type, image in self.draw_all_graphs().items():
        if image is None:
            continue
        title = kwargs.get('title', f"Graphizy - {graph_type.title()}")
        self.show_graph(image, title=title, **forwarded)
def save_graph(self, image_graph: np.ndarray, filename: str) -> None:
    """
    Write a graph image to disk through the internal Visualizer.

    Args:
        image_graph: Image array to save.
        filename: Destination path, passed to ``self.visualizer.save_graph``.

    Raises:
        DrawingError: If the visualizer raises any exception.
    """
    destination = filename
    try:
        self.visualizer.save_graph(image_graph, destination)
    except Exception as e:
        raise DrawingError(f"Failed to save graph: {e}") from e
# ============================================================================ # MEMORY MANAGEMENT METHODS # ============================================================================
def init_memory_manager(self, max_memory_size: int = 100, max_iterations: Optional[int] = None,
                        track_edge_ages: bool = True) -> 'MemoryManager':
    """
    Create the memory manager used for temporal graph analysis.

    A new ``MemoryManager`` is built from the three arguments, stored on
    ``self.memory_manager`` (replacing any previous one) and also attached
    to ``self.visualizer.memory_manager`` so the visualizer sees the same
    instance.

    Args:
        max_memory_size: Passed to ``MemoryManager`` as its first argument
            (documented as the maximum number of connections to remember).
        max_iterations: Passed as the second argument; ``None`` is accepted
            (documented as "track indefinitely").
        track_edge_ages: Passed as the third argument (documented as
            enabling per-edge age tracking).

    Returns:
        MemoryManager: The newly created memory manager.

    Raises:
        GraphCreationError: If construction or attachment fails; the
            original exception is chained as the cause.

    Examples:
        >>> memory_mgr = grapher.init_memory_manager()
        >>> memory_mgr = grapher.init_memory_manager(
        ...     max_memory_size=1000, max_iterations=100, track_edge_ages=True
        ... )

    Note:
        Must be called before the memory-based methods of this class,
        which otherwise report that no memory manager is initialized.
    """
    try:
        self.memory_manager = MemoryManager(max_memory_size, max_iterations, track_edge_ages)
        logging.info(f"Memory manager initialized: max_size={max_memory_size}, "
                     f"max_iterations={max_iterations}, track_ages={track_edge_ages}")
        # Keep the visualizer pointing at the same manager instance.
        self.visualizer.memory_manager = self.memory_manager
        return self.memory_manager
    except Exception as e:
        raise GraphCreationError(f"Failed to initialize memory manager: {str(e)}") from e
def _ensure_memory_integration(self, operation_name: str):
    """
    Report whether a memory manager is available for an operation.

    Args:
        operation_name: Name used in the warning message.

    Returns:
        bool: True if ``self.memory_manager`` is set, otherwise False
        (after logging a warning).
    """
    if self.memory_manager is None:
        logging.warning(f"{operation_name} called but no memory manager initialized")
        return False
    return True


def _maybe_apply_memory(self, graph: Any, use_memory: bool, update_memory: bool) -> Any:
    """
    Optionally feed a graph to the memory manager and/or return the memory graph.

    Behavior by flag combination (when a memory manager exists):
        - use_memory=True (any update_memory): calls
          ``add_graph_vectorized(graph, return_memory_graph=True)`` and
          returns its result. The current graph is therefore always
          passed to the memory manager in this mode; ``update_memory`` is
          not consulted.
        - use_memory=False, update_memory=True: calls
          ``add_graph_vectorized(graph, return_memory_graph=False)`` and
          returns the original graph.
        - use_memory=False, update_memory=False: returns the original
          graph without touching the memory manager.

    If no memory manager has been initialized the graph is returned
    unchanged regardless of the flags.

    Args:
        graph: Graph object handed to the memory manager.
        use_memory: Whether to return the memory manager's graph.
        update_memory: Whether to record the graph when ``use_memory`` is False.

    Returns:
        Any: The original graph or the graph returned by the memory manager.

    Raises:
        Whatever ``add_graph_vectorized`` raises; nothing is wrapped here.
    """
    # Identity check, consistent with the other memory methods: an
    # initialized manager must be used even if it evaluates as falsy
    # (e.g. a container-like object that is currently empty).
    if self.memory_manager is None:
        return graph

    if use_memory:
        memory_graph = self.memory_manager.add_graph_vectorized(graph, return_memory_graph=True)
        logging.debug("Created memory-enhanced graph from historical connections")
        return memory_graph

    if update_memory:
        # Record the graph but hand back the caller's graph untouched.
        self.memory_manager.add_graph_vectorized(graph, return_memory_graph=False)
        logging.debug("Updated memory with current graph connections")

    return graph
def make_memory_graph(self, data_points: Union[np.ndarray, Dict[str, Any]],
                      memory_connections: Optional[Dict] = None) -> Any:
    """
    Create a graph from the memory manager's accumulated connections.

    The graph is built by ``self.memory_manager.create_memory_graph(data_points)``.

    Args:
        data_points: Current point data, passed unchanged to the memory
            manager (documented formats: array of [id, x, y] rows for the
            "array" aspect, or a dict with "id", "x", "y" for "dict").
        memory_connections: Accepted for backward compatibility but
            currently NOT used: the graph is always built from the memory
            manager's own state. A warning is logged when a value is
            supplied so the caller is not misled.

    Returns:
        Any: The graph object returned by the memory manager.

    Raises:
        GraphCreationError: If no memory manager is initialized (even when
            ``memory_connections`` is given) or if graph creation fails.

    Examples:
        >>> grapher.init_memory_manager(max_memory_size=200)
        >>> grapher.update_memory_with_graph(grapher.make_delaunay(data))
        >>> memory_graph = grapher.make_memory_graph(data)
    """
    try:
        if memory_connections is not None:
            # Explicit connections are not wired through yet; say so rather
            # than silently discarding the argument.
            logging.warning("make_memory_graph: 'memory_connections' is currently ignored; "
                            "using the memory manager's accumulated connections")
        if self.memory_manager is None:
            raise GraphCreationError("No memory manager initialized and no connections provided")
        return self.memory_manager.create_memory_graph(data_points)
    except Exception as e:
        raise GraphCreationError(f"Failed to create memory graph: {str(e)}") from e
def update_memory_with_graph(self, graph: Any) -> Dict[str, List[str]]:
    """
    Feed an existing graph object to the memory manager.

    Args:
        graph: Graph object handed to
            ``self.memory_manager.add_graph_vectorized`` (documented as an
            igraph Graph whose vertices carry an "id" attribute).

    Returns:
        Whatever ``add_graph_vectorized(graph)`` returns.

    Raises:
        GraphCreationError: If the memory manager is not initialized or the
            update fails. The "not initialized" error is raised inside the
            same ``try`` and is therefore re-wrapped with the
            "Failed to update memory with graph" prefix.

    Examples:
        >>> grapher.init_memory_manager()
        >>> grapher.update_memory_with_graph(grapher.make_delaunay(data))
        >>> grapher.update_memory_with_graph(grapher.make_knn(data, k=5))
    """
    try:
        manager = self.memory_manager
        if manager is None:
            raise GraphCreationError("Memory manager not initialized")
        return manager.add_graph_vectorized(graph)
    except Exception as e:
        raise GraphCreationError(f"Failed to update memory with graph: {str(e)}")
def update_memory_with_custom(self, data_points: Union[np.ndarray, Dict[str, Any]],
                              connection_function: callable, **kwargs) -> Dict[str, List[str]]:
    """
    Update the memory manager using a caller-supplied connection function.

    All the work is delegated to ``update_memory_from_custom_function``,
    which receives the data, the memory manager, the function, this
    object's ``aspect`` and any extra keyword arguments.

    Args:
        data_points: Point data in the format given by ``self.aspect``.
        connection_function: Callable deciding which points are connected
            (documented as returning an iterable of (id1, id2) pairs).
        **kwargs: Forwarded to ``update_memory_from_custom_function``
            (documented as arguments for the connection function).

    Returns:
        Whatever ``update_memory_from_custom_function`` returns.

    Raises:
        GraphCreationError: If the memory manager is not initialized or the
            update fails. The "not initialized" error is re-wrapped with the
            "Failed to update memory with custom function" prefix.

    Examples:
        >>> grapher.init_memory_manager()
        >>> grapher.update_memory_with_custom(
        ...     data,
        ...     lambda pts: [(pts[i, 0], pts[j, 0])
        ...                  for i in range(len(pts))
        ...                  for j in range(i + 1, len(pts))
        ...                  if abs(pts[i, 1] - pts[j, 1]) < 10],
        ... )
    """
    try:
        manager = self.memory_manager
        if manager is None:
            raise GraphCreationError("Memory manager not initialized")
        return update_memory_from_custom_function(
            data_points, manager, connection_function, self.aspect, **kwargs
        )
    except Exception as e:
        raise GraphCreationError(f"Failed to update memory with custom function: {str(e)}")
def get_memory_analysis(self) -> Dict[str, Any]:
    """
    Return the memory manager's statistics, never raising.

    Returns:
        Dict[str, Any]: The result of ``self.memory_manager.get_memory_stats()``.
        On any problem a dictionary with a single "error" key is returned
        instead: "Memory manager not initialized" when no manager exists,
        or "Failed to get memory analysis: <reason>" when the call raises.

    Examples:
        >>> analysis = grapher.get_memory_analysis()
        >>> if "error" in analysis:
        ...     print(analysis["error"])
    """
    try:
        manager = self.memory_manager
        if manager is not None:
            return manager.get_memory_stats()
        return {"error": "Memory manager not initialized"}
    except Exception as e:
        return {"error": f"Failed to get memory analysis: {str(e)}"}
# ============================================================================ # WEIGHT COMPUTATION METHODS # ============================================================================
def init_weight_computer(self, **kwargs):
    """
    Create the weight computer and switch on automatic weight computation.

    Missing options are filled from ``self.config.weight`` before the
    ``WeightComputer`` is built:
        - 'method' defaults to ``weight_method``
        - 'target_attribute' defaults to ``weight_attribute``
        - 'formula' defaults to ``weight_formula``, but only when the
          resulting method is "formula"

    Args:
        **kwargs: Options passed to ``WeightComputer``.

    Side effects:
        Sets ``self.weight_computer`` and sets
        ``self.config.weight.auto_compute_weights`` to True.
    """
    weight_config = self.config.weight

    # (keyword, config attribute) pairs that fall back to the configuration.
    config_fallbacks = (
        ('method', 'weight_method'),
        ('target_attribute', 'weight_attribute'),
    )
    for option, config_field in config_fallbacks:
        if option not in kwargs:
            kwargs[option] = getattr(weight_config, config_field)

    # The formula is only relevant for the "formula" method.
    if 'formula' not in kwargs and kwargs.get('method') == "formula":
        kwargs['formula'] = weight_config.weight_formula

    self.weight_computer = WeightComputer(**kwargs)
    weight_config.auto_compute_weights = True
def compute_edge_attribute(self, graph, attribute_name, method="formula", **kwargs):
    """
    Compute an edge attribute with the weight computer.

    If no weight computer is available yet — the attribute is missing or
    is ``None`` — a default ``WeightComputer()`` is created and stored on
    ``self.weight_computer`` first.

    Args:
        graph: Graph object passed to ``compute_attribute``.
        attribute_name: Name of the edge attribute to compute.
        method: Computation method, forwarded as ``method=``.
        **kwargs: Extra options forwarded to ``compute_attribute``.

    Returns:
        Whatever ``self.weight_computer.compute_attribute`` returns.
    """
    # hasattr() alone is not enough: other code in this class treats
    # ``weight_computer is None`` as "not initialized", so the attribute
    # may exist while still being None.
    if getattr(self, 'weight_computer', None) is None:
        self.weight_computer = WeightComputer()

    return self.weight_computer.compute_attribute(
        graph, attribute_name, method=method, **kwargs
    )
def setup_fast_attributes(self, **config):
    """
    Build the fast attribute computer and store it on ``self.fast_computer``.

    Args:
        **config: Options forwarded to ``setup_realtime_weight_computer``.
    """
    realtime_computer = setup_realtime_weight_computer(**config)
    self.fast_computer = realtime_computer
def compute_all_attributes_fast(self, graph):
    """
    Compute all pre-configured attributes with the fast computer.

    Args:
        graph: Graph object passed to the fast computer.

    Returns:
        The result of ``fast_computer.compute_multiple_attributes_fast``
        called with the computer's ``_default_config``; or ``graph``
        unchanged when no fast computer has been set up (attribute missing
        or ``None``).
    """
    # The attribute may exist but still be None (it is truth-tested
    # elsewhere in this class), so hasattr() is not a sufficient guard.
    fast_computer = getattr(self, 'fast_computer', None)
    if fast_computer is None:
        return graph

    return fast_computer.compute_multiple_attributes_fast(
        graph, fast_computer._default_config
    )
def _maybe_compute_weights(self, graph: Any, compute_weights: bool = None) -> Any:
    """
    Apply the configured attribute/weight computations to a graph.

    Args:
        graph: Graph object to process.
        compute_weights: Explicit switch for weight computation. ``None``
            means "use ``self.config.weight.auto_compute_weights``".

    Returns:
        Any: ``self.weight_computer.compute_weights(graph)`` when weight
        computation is enabled and a weight computer exists; otherwise the
        input graph.

    Note:
        When ``self.fast_computer`` is truthy, ``compute_all_attributes_fast``
        is called first and its return value is discarded.
    """
    if compute_weights is None:
        should_compute = self.config.weight.auto_compute_weights
    else:
        should_compute = compute_weights

    if self.fast_computer:
        self.compute_all_attributes_fast(graph)

    if not should_compute or self.weight_computer is None:
        return graph
    return self.weight_computer.compute_weights(graph)
def get_weight_analysis(self) -> Dict[str, Any]:
    """
    Placeholder for weight statistics.

    TODO: implement the weight statistics. Until then this method does
    nothing and returns ``None``.
    """
    return None
# ============================================================================ # NETWORKX BRIDGE # ============================================================================
def get_networkx_analyzer(self) -> 'NetworkXAnalyzer':
    """
    Build a NetworkX analyzer bound to this Graphing object.

    Returns:
        NetworkXAnalyzer: A new analyzer constructed with ``self``.

    Examples:
        >>> nx_analyzer = grapher.get_networkx_analyzer()
        >>> analysis = nx_analyzer.analyze('delaunay')
        >>> nx_graph = nx_analyzer.get_networkx('proximity')
    """
    # Function-scope import of the bridge module.
    from .networkx_bridge import NetworkXAnalyzer

    analyzer = NetworkXAnalyzer(self)
    return analyzer
def to_networkx(self, graph_type: str = None, igraph_graph: Any = None) -> Any:
    """
    Convert a graph to NetworkX format.

    Args:
        graph_type: Key into ``get_current_graphs()`` selecting the graph
            to convert. Only used when ``igraph_graph`` is None.
        igraph_graph: Graph object to convert directly; takes precedence
            over ``graph_type``.

    Returns:
        The object produced by ``networkx_bridge.to_networkx``.

    Raises:
        ValueError: If neither argument is given, or ``graph_type`` is not
            among the current graphs.
    """
    from .networkx_bridge import to_networkx as convert

    if igraph_graph is None:
        if graph_type is None:
            raise ValueError("Must provide either graph_type or igraph_graph")

        available = self.get_current_graphs()
        if graph_type not in available:
            raise ValueError(f"Graph type '{graph_type}' not found")
        igraph_graph = available[graph_type]

    return convert(igraph_graph)
# ============================================================================ # ASYNC STREAM METHODS # ============================================================================
def create_stream_manager(self, buffer_size: int = 1000,
                          update_interval: float = 0.1,
                          auto_memory: bool = True) -> 'StreamManager':
    """
    Create a stream manager for real-time data processing.

    Args:
        buffer_size: Forwarded to ``StreamManager`` (second argument).
        update_interval: Forwarded to ``StreamManager`` (third argument).
        auto_memory: Forwarded to ``StreamManager`` (fourth argument).

    Returns:
        StreamManager: A new manager constructed with ``self`` as its
        first argument.
    """
    from .streaming import StreamManager

    manager = StreamManager(self, buffer_size, update_interval, auto_memory)
    return manager
def create_async_stream_manager(self, buffer_size: int = 1000) -> 'AsyncStreamManager':
    """
    Create an async stream manager for high-performance streaming.

    Args:
        buffer_size: Forwarded to ``AsyncStreamManager`` (second argument).

    Returns:
        AsyncStreamManager: A new manager constructed with ``self`` as its
        first argument.
    """
    from .streaming import AsyncStreamManager

    manager = AsyncStreamManager(self, buffer_size)
    return manager
# ============================================================================ # GRAPH ANALYSIS AND METRICS METHODS # ============================================================================
[docs] @staticmethod def get_connections_per_object(graph: Any) -> Dict[Any, int]: """ Calculate the degree (number of connections) for each vertex in the graph. This method provides a user-friendly mapping from original object IDs to their connectivity counts, which is essential for analyzing graph structure and identifying hubs or isolated nodes. Args: graph: igraph Graph object with vertices having "id" attributes. Returns: Dict[Any, int]: Dictionary mapping each object's original ID to its degree. Empty dict if graph is None or has no vertices. Raises: IgraphMethodError: If degree calculation fails. Examples: >>> connections = Graphing.get_connections_per_object(graph) >>> print(f"Object 101 has {connections[101]} connections") >>> >>> # Find most connected objects >>> sorted_objects = sorted(connections.items(), key=lambda x: x[1], reverse=True) >>> print(f"Most connected: {sorted_objects[:5]}") >>> >>> # Find isolated objects >>> isolated = [obj_id for obj_id, degree in connections.items() if degree == 0] >>> print(f"Isolated objects: {isolated}") >>> # Degree distribution analysis >>> from collections import Counter >>> degree_dist = Counter(connections.values()) >>> print(f"Degree distribution: {dict(degree_dist)}") Note: - Returns degree in graph-theoretic sense (number of incident edges) - For undirected graphs, each edge contributes 1 to each endpoint's degree - For directed graphs, returns total degree (in-degree + out-degree) - Empty graphs return empty dictionary - Object IDs must be stored in vertex "id" attribute """ try: if graph is None or graph.vcount() == 0: return {} # Get degrees and map to original IDs degrees = graph.degree() object_ids = graph.vs["id"] return {obj_id: degree for obj_id, degree in zip(object_ids, degrees)} except Exception as e: raise IgraphMethodError(f"Failed to get connections per object: {str(e)}")
@staticmethod
def average_path_length(graph: Any) -> float:
    """
    Return the graph's average shortest-path length.

    The value is obtained by calling the graph's "average_path_length"
    method through ``call_igraph_method``.

    Args:
        graph: Graph object to measure.

    Returns:
        float: The value reported by the underlying method.

    Raises:
        IgraphMethodError: If the underlying call fails.

    Examples:
        >>> avg_path = Graphing.average_path_length(graph)
        >>> print(f"Average path length: {avg_path:.2f}")
    """
    try:
        mean_distance = call_igraph_method(graph, "average_path_length")
    except Exception as e:
        raise IgraphMethodError(f"Failed to calculate average path length: {str(e)}")
    return mean_distance
@staticmethod
def density(graph: Any) -> float:
    """
    Return the density of the graph.

    The value comes from the graph's "density" method via
    ``call_igraph_method``; a NaN result is replaced by 0.0.

    Args:
        graph: Graph object to measure.

    Returns:
        float: The reported density, or 0.0 when the underlying method
        yields NaN.

    Raises:
        IgraphMethodError: If the underlying call (or the NaN check) fails.

    Examples:
        >>> value = Graphing.density(graph)
        >>> print(f"Graph density: {value:.3f}")
    """
    try:
        value = call_igraph_method(graph, "density")
        # NaN is normalised to 0.0 so callers always get a usable number.
        return 0.0 if np.isnan(value) else value
    except Exception as e:
        raise IgraphMethodError(f"Failed to calculate density: {str(e)}")
def call_method_brutal(self, graph: Any, method_name: str, return_format: str = "auto",
                       *args, **kwargs) -> Any:
    """
    Call a graph method by name and format its result.

    The method is invoked through ``call_igraph_method(graph, method_name,
    *args, **kwargs)``; the result is then shaped according to
    ``return_format``.

    Args:
        graph: Graph object to operate on.
        method_name: Name of the method to call (e.g. "betweenness").
        return_format: One of:
            - "raw": return the result untouched.
            - "list": lists are returned as-is; other iterables (except
              str and dict) are converted with ``list()``; anything else
              is returned untouched.
            - "dict": a list/iterable whose length equals
              ``graph.vcount()`` becomes ``{vertex_id: value}`` using
              ``graph.vs["id"]``; a list of another length is returned
              as-is with a warning; other iterables of another length are
              returned as a list; anything else untouched.
            - "auto": lists of vertex-count length become a dict as above,
              other lists are returned as-is; scalars and None are
              returned as-is; other iterables are converted like "dict"
              (falling back to the raw result if conversion fails).
        *args: Positional arguments for the method. Because they follow
            ``return_format`` in the signature, ``return_format`` must be
            given positionally whenever positional arguments are used.
        **kwargs: Keyword arguments for the method.

    Returns:
        Any: The formatted result, as described above.

    Raises:
        ValueError: If ``return_format`` is invalid. ValueErrors raised by
            the underlying call are also propagated unwrapped.
        IgraphMethodError: For any other failure; the original exception
            is chained as the cause.

    Examples:
        >>> degrees = grapher.call_method_brutal(graph, "degree", "dict")
        >>> betweenness = grapher.call_method_brutal(graph, "betweenness")
        >>> raw_closeness = grapher.call_method_brutal(graph, "closeness", "raw")

    Note:
        Length is the only signal used to recognise per-vertex results, so
        a per-edge list on a graph with as many edges as vertices is also
        mapped to vertex ids in "auto"/"dict" mode.
    """

    def _by_vertex_id(values):
        # Pair each value with the "id" attribute of the vertex at the same position.
        return {obj_id: value for obj_id, value in zip(graph.vs["id"], values)}

    def _is_generic_iterable(obj):
        # Iterable, but not a type that should be treated as a single value.
        return hasattr(obj, '__iter__') and not isinstance(obj, (str, dict))

    def _iterable_to_vertex_dict_or_list(iterable):
        # Materialise once, then map to vertex ids only if the length matches.
        as_list = list(iterable)
        if len(as_list) == graph.vcount():
            return _by_vertex_id(as_list)
        return as_list

    try:
        valid_formats = ["auto", "dict", "list", "raw"]
        if return_format not in valid_formats:
            raise ValueError(f"return_format must be one of {valid_formats}, got: {return_format}")

        result = call_igraph_method(graph, method_name, *args, **kwargs)

        if return_format == "raw":
            return result

        if return_format == "list":
            if isinstance(result, list):
                return result
            return list(result) if _is_generic_iterable(result) else result

        if return_format == "dict":
            if isinstance(result, list):
                if len(result) == graph.vcount():
                    return _by_vertex_id(result)
                # Cannot be a per-vertex result: hand it back untouched.
                logging.warning(f"Method {method_name} returned list of length {len(result)} "
                                f"but graph has {graph.vcount()} vertices. Returning raw list.")
                return result
            if _is_generic_iterable(result):
                return _iterable_to_vertex_dict_or_list(result)
            return result

        # return_format == "auto"
        if isinstance(result, list):
            # Per-vertex lists become a dict; per-edge and other lists stay lists.
            return _by_vertex_id(result) if len(result) == graph.vcount() else result
        if isinstance(result, (int, float, bool, str, type(None))):
            return result
        if _is_generic_iterable(result):
            try:
                return _iterable_to_vertex_dict_or_list(result)
            except Exception:
                # Conversion is best effort; never mask KeyboardInterrupt/SystemExit.
                return result
        return result

    except ValueError:
        raise
    except Exception as e:
        raise IgraphMethodError(f"Failed to call method '{method_name}': {str(e)}") from e
def get_connectivity_info(self, graph: Any) -> Dict[str, Any]:
    """
    Summarise the connected components of a graph.

    Components are obtained with
    ``self.call_method_brutal(graph, 'connected_components', "raw")``; a
    non-list iterable result is converted to a list of lists.

    Args:
        graph: Graph object to analyze (must expose ``vcount()``).

    Returns:
        Dict[str, Any]: A dictionary with the keys
            - is_connected: True when there is exactly one component
            - num_components: number of components
            - components: the components themselves
            - component_sizes: length of each component
            - largest_component_size: maximum size (0 if no components)
            - largest_component_index: ``np.argmax`` of the sizes
              (None if no components)
            - connectivity_ratio: largest size / vertex count
              (0 for an empty graph or no components)
            - isolation_ratio: number of size-1 components / vertex count
              (0 for an empty graph)

    Raises:
        IgraphMethodError: If any step fails.

    Examples:
        >>> info = grapher.get_connectivity_info(graph)
        >>> if not info['is_connected']:
        ...     largest = info['components'][info['largest_component_index']]
        ...     subgraph = graph.subgraph(largest)
    """
    try:
        raw_components = self.call_method_brutal(graph, 'connected_components', "raw")

        # Plain lists (and non-iterables) are used as-is; other iterables
        # are materialised into a list of lists.
        if isinstance(raw_components, list) or not hasattr(raw_components, '__iter__'):
            components = raw_components
        else:
            components = [list(members) for members in raw_components]

        sizes = [len(members) for members in components]
        vertex_total = graph.vcount()
        has_vertices = vertex_total > 0

        if sizes:
            largest_size = max(sizes)
            largest_index = np.argmax(sizes)
        else:
            largest_size = 0
            largest_index = None

        if has_vertices and sizes:
            connectivity_ratio = max(sizes) / vertex_total
        else:
            connectivity_ratio = 0

        if has_vertices:
            isolation_ratio = sum(1 for size in sizes if size == 1) / vertex_total
        else:
            isolation_ratio = 0

        return {
            'is_connected': len(components) == 1,
            'num_components': len(components),
            'components': components,
            'component_sizes': sizes,
            'largest_component_size': largest_size,
            'largest_component_index': largest_index,
            'connectivity_ratio': connectivity_ratio,
            'isolation_ratio': isolation_ratio,
        }
    except Exception as e:
        raise IgraphMethodError(f"Failed to get connectivity info: {str(e)}")
def is_connected(self, graph: Any) -> bool:
    """
    Report whether the graph consists of a single connected component.

    The question is forwarded to ``call_method_safe`` using the igraph
    method name ``'is_connected'``; whatever that call returns is handed
    back unchanged.

    Args:
        graph: igraph Graph object to test.

    Returns:
        bool: True if graph is connected, False otherwise.
    """
    connected = self.call_method_safe(graph, 'is_connected')
    return connected
def call_method_safe(self, graph: Any, method_name: str, return_format: str = "auto",
                     component_mode: str = "connected_only", handle_disconnected: bool = True,
                     default_value: Any = None, *args, **kwargs) -> Any:
    """
    Call an igraph method with fallbacks for disconnected graphs.

    Methods listed as connectivity-safe (or any method when
    ``handle_disconnected`` is False) are called directly through
    ``call_method_brutal``. Every other method is treated as
    connectivity-sensitive: the graph's components are inspected first and,
    if the graph is disconnected, the computation is delegated to the
    per-component helper selected by ``component_mode``.

    Args:
        graph: igraph Graph object to analyze.
        method_name: Name of the igraph method to call.
        return_format: Output format ("auto", "dict", "list", "raw").
        component_mode: Strategy for disconnected graphs:
            - "all": delegate to ``_compute_on_all_components``
            - "largest": delegate to ``_compute_on_largest_component``
            - "connected_only": delegate to ``_compute_on_connected_components``
            Only consulted when the graph is actually disconnected.
        handle_disconnected: Whether to apply the disconnected-graph handling.
        default_value: Replacement for NaN/inf values, and the value returned
            when the call fails. With ``None`` (the default) a failure raises
            instead of returning.
        *args: Positional arguments for the igraph method. Because they follow
            the defaulted parameters, all of those must be passed positionally
            first.
        **kwargs: Keyword arguments for the igraph method.

    Returns:
        Any: The (NaN-cleaned) method result, a per-component result, or
        ``default_value`` when the call failed and a default was supplied.

    Raises:
        IgraphMethodError: If anything fails (unknown method, invalid
            ``component_mode`` on a disconnected graph, igraph error) and
            ``default_value`` is None. The original error is attached as
            ``__cause__``.

    Examples:
        >>> diameter = grapher.call_method_safe(graph, "diameter", default_value=float('inf'))
        >>> betweenness = grapher.call_method_safe(
        ...     graph, "betweenness", "dict",
        ...     component_mode="all", default_value=0.0
        ... )
    """
    # Methods that work regardless of connectivity. Anything not listed here
    # (e.g. diameter, average_path_length, betweenness, closeness) goes
    # through the component-aware path below.
    CONNECTIVITY_SAFE_METHODS = {
        'degree', 'density', 'vcount', 'ecount', 'connected_components',
        'transitivity_undirected', 'transitivity_local_undirected', 'is_connected'
    }

    try:
        if not hasattr(graph, method_name):
            raise IgraphMethodError(f"Graph does not have method '{method_name}'")

        # Direct call; failures fall through to the shared handler below so
        # they are logged the same way as every other failure.
        if method_name in CONNECTIVITY_SAFE_METHODS or not handle_disconnected:
            result = self.call_method_brutal(graph, method_name, return_format, *args, **kwargs)
            return self._clean_nan_values(result, default_value)

        connectivity_info = self.get_connectivity_info(graph)

        if connectivity_info['is_connected']:
            # Graph is connected - safe to compute normally
            result = self.call_method_brutal(graph, method_name, return_format, *args, **kwargs)
            return self._clean_nan_values(result, default_value)

        # Graph is disconnected - handle based on component_mode
        if component_mode == "largest":
            return self._compute_on_largest_component(graph, connectivity_info, method_name,
                                                      return_format, default_value, *args, **kwargs)
        if component_mode == "all":
            return self._compute_on_all_components(graph, connectivity_info, method_name,
                                                   return_format, default_value, *args, **kwargs)
        if component_mode == "connected_only":
            return self._compute_on_connected_components(graph, connectivity_info, method_name,
                                                         return_format, default_value, *args, **kwargs)
        raise ValueError(
            f"Invalid component_mode: {component_mode}. Must be 'largest', 'all', or 'connected_only'")

    except Exception as e:
        if default_value is not None:
            logging.warning(f"Method '{method_name}' failed: {e}. Returning default value: {default_value}")
            return default_value
        raise IgraphMethodError(f"Failed to call resilient method '{method_name}': {str(e)}") from e
def _clean_nan_values(self, result, default_value=0.0):
    """
    Replace NaN and infinite floats in ``result`` with ``default_value``.

    Lists and numpy arrays are returned as a new list, dicts as a new dict
    (values cleaned, keys untouched), a single non-finite float as
    ``default_value``. Anything else is returned unchanged.
    """
    def _is_non_finite(value):
        # Only floats are inspected; ints, strings, None, ... pass through.
        return isinstance(value, float) and not np.isfinite(value)

    if isinstance(result, (list, np.ndarray)):
        return [default_value if _is_non_finite(x) else x for x in result]
    if isinstance(result, dict):
        return {k: (default_value if _is_non_finite(v) else v) for k, v in result.items()}
    if _is_non_finite(result):
        return default_value
    return result


def _compute_on_largest_component(self, graph, connectivity_info, method_name, return_format, default_value, *args,
                                  **kwargs):
    """
    Compute a metric on the largest component only.

    For "list"/"dict" formats a per-vertex list result is mapped back onto
    the full graph, with ``default_value`` for vertices outside the largest
    component. Other results go through ``_format_result``.

    When the largest component has fewer than two vertices, or the
    computation raises, the fallback is: ``default_value`` for scalar
    formats; for "list"/"dict" a list/dict filled with ``default_value``
    (empty when ``default_value`` is None). The fallback honours
    ``return_format``, so "dict" always yields a dict keyed by vertex id.
    """
    components = connectivity_info['components']
    largest_component = max(components, key=len) if components else []
    per_vertex = return_format in ("list", "dict")

    def _fallback():
        if not per_vertex:
            return default_value
        if default_value is None:
            return {} if return_format == "dict" else []
        filled = [default_value] * graph.vcount()
        if return_format == "dict":
            return dict(zip(graph.vs["id"], filled))
        return filled

    if len(largest_component) < 2:
        # Component too small for meaningful computation
        return _fallback()

    try:
        subgraph = graph.subgraph(largest_component)
        result = self.call_method_brutal(subgraph, method_name, "raw", *args, **kwargs)

        if per_vertex and isinstance(result, list):
            # Position i of the subgraph result belongs to largest_component[i];
            # zip stops at the shorter sequence, so a short result leaves the
            # remaining vertices at default_value.
            full_result = [default_value] * graph.vcount()
            for vertex_idx, value in zip(largest_component, result):
                full_result[vertex_idx] = value
            if return_format == "dict":
                return dict(zip(graph.vs["id"], full_result))
            return full_result

        return self._format_result(result, return_format, graph)

    except Exception as e:
        logging.warning(f"Computation on largest component failed: {e}")
        return _fallback()


def _compute_on_all_components(self, graph, connectivity_info, method_name, return_format, default_value, *args,
                               **kwargs):
    """
    Compute a metric on every component separately.

    For "list"/"dict" formats the outcome is one value per vertex of the
    full graph: list results are mapped position-by-position onto the
    component's vertices, scalar results are copied to every vertex of the
    component. For other formats the outcome is a list with one entry per
    component. Components with fewer than two vertices, and components whose
    computation raises, contribute ``default_value``.
    """
    components = connectivity_info['components']
    per_vertex = return_format in ("list", "dict")
    full_result = [default_value] * graph.vcount() if per_vertex else None
    component_results = []

    for component in components:
        if len(component) < 2:
            # Too small to compute on; per-vertex slots already hold the default.
            if not per_vertex:
                component_results.append(default_value)
            continue

        try:
            subgraph = graph.subgraph(component)
            result = self.call_method_brutal(subgraph, method_name, "raw", *args, **kwargs)

            if not per_vertex:
                component_results.append(result)
            elif isinstance(result, list):
                for vertex_idx, value in zip(component, result):
                    full_result[vertex_idx] = value
            else:
                # Scalar result - apply to all nodes in component
                for vertex_idx in component:
                    full_result[vertex_idx] = result

        except Exception as e:
            logging.warning(f"Computation on component failed: {e}")
            if per_vertex:
                # Undo any partial writes for this component
                for vertex_idx in component:
                    full_result[vertex_idx] = default_value
            else:
                component_results.append(default_value)

    if not per_vertex:
        return component_results
    if return_format == "dict":
        return dict(zip(graph.vs["id"], full_result))
    return full_result


def _compute_on_connected_components(self, graph, connectivity_info, method_name, return_format, default_value,
                                     *args, **kwargs):
    """
    Compute a metric only on components with more than one vertex.

    Without any such component the result is ``default_value`` (scalar
    formats) or a per-vertex list/dict filled with it. Otherwise the work is
    delegated to ``_compute_on_all_components`` with the single-vertex
    components removed from the component list.
    """
    components = connectivity_info['components']
    connected_components = [comp for comp in components if len(comp) > 1]

    if not connected_components:
        if return_format in ("list", "dict"):
            filled = [default_value] * graph.vcount()
            if return_format == "dict":
                return dict(zip(graph.vs["id"], filled))
            return filled
        return default_value

    # Shallow copy so the caller's connectivity_info is not modified
    modified_connectivity = dict(connectivity_info)
    modified_connectivity['components'] = connected_components

    return self._compute_on_all_components(graph, modified_connectivity, method_name,
                                           return_format, default_value, *args, **kwargs)


def _format_result(self, result, return_format, graph):
    """
    Format ``result`` according to ``return_format``.

    - "raw" and any unrecognised format (including "auto"): unchanged
    - "list": ``list(result)`` for non-string iterables, else ``[result]``
    - "dict": vertex id -> value when ``result`` is a list with one entry
      per vertex, else ``{"global": result}``
    """
    if return_format == "list":
        if hasattr(result, '__iter__') and not isinstance(result, str):
            return list(result)
        return [result]
    if return_format == "dict":
        if isinstance(result, list) and len(result) == graph.vcount():
            return dict(zip(graph.vs["id"], result))
        return {"global": result}
    # "raw", "auto" and anything else
    return result
def compute_component_metrics(self, graph: Any, metrics_list: List[str],
                              component_mode: str = "largest") -> Dict[str, Any]:
    """
    Compute several graph metrics with one consistent component strategy.

    The connectivity summary from ``get_connectivity_info`` is stored first,
    then each requested metric is computed through ``call_method_safe`` with
    ``return_format="auto"``, ``handle_disconnected=True`` and
    ``default_value=0.0``.

    Args:
        graph: igraph Graph object to analyze.
        metrics_list: Names of igraph methods to compute, e.g.
            ['degree', 'betweenness', 'closeness', 'diameter'].
        component_mode: Strategy for disconnected graphs
            ("all", "largest", "connected_only").

    Returns:
        Dict[str, Any]: Dictionary with computed metrics:
            - connectivity_info: Result of ``get_connectivity_info``
            - [metric_name]: Result for each requested metric
            - Metrics the graph object does not provide, or whose computation
              raises, are set to None with a warning logged

    Raises:
        IgraphMethodError: If the connectivity analysis itself fails.

    Examples:
        >>> metrics = grapher.compute_component_metrics(
        ...     graph,
        ...     ['degree', 'betweenness', 'diameter'],
        ...     component_mode="all"
        ... )
        >>> print(f"Graph diameter: {metrics['diameter']}")
        >>> if not metrics['connectivity_info']['is_connected']:
        ...     print(f"Graph has {metrics['connectivity_info']['num_components']} components")

    Note:
        Because a non-None default (0.0) is handed to ``call_method_safe``,
        that method returns 0.0 instead of raising when a computation fails.
        Unknown metric names are therefore rejected here, before delegating,
        so that a typo is reported as None rather than as a value of 0.0.
    """
    try:
        results = {'connectivity_info': self.get_connectivity_info(graph)}

        for metric_name in metrics_list:
            if not hasattr(graph, metric_name):
                logging.warning(f"Failed to compute {metric_name}: graph has no such method")
                results[metric_name] = None
                continue
            try:
                results[metric_name] = self.call_method_safe(
                    graph, metric_name, "auto",
                    component_mode=component_mode,
                    handle_disconnected=True,
                    default_value=0.0
                )
            except Exception as e:
                logging.warning(f"Failed to compute {metric_name}: {e}")
                results[metric_name] = None

        return results

    except Exception as e:
        raise IgraphMethodError(f"Failed to compute component metrics: {str(e)}") from e
@staticmethod
def call_method_raw(graph: Any, method_name: str, *args, **kwargs) -> Any:
    """
    Invoke an igraph method by name and hand back its output untouched.

    This is a thin pass-through to ``call_igraph_method``: the graph, the
    method name and all extra arguments are forwarded as given, and the
    helper's return value is returned without any formatting step here.

    Args:
        graph: igraph Graph object to operate on.
        method_name: Name of the igraph method to call.
        *args: Positional arguments forwarded to the method.
        **kwargs: Keyword arguments forwarded to the method.

    Returns:
        Any: Whatever ``call_igraph_method`` returns for this call.

    Examples:
        >>> raw_degrees = Graphing.call_method_raw(graph, "degree")
        >>> raw_components = Graphing.call_method_raw(graph, "connected_components")

    Note:
        - Static method - can be called without a Graphing instance
        - Error behaviour is entirely that of ``call_igraph_method``; this
          wrapper adds no handling of its own
    """
    raw_result = call_igraph_method(graph, method_name, *args, **kwargs)
    return raw_result
def get_graph_info(self, graph: Any) -> GraphAnalysisResult:
    """
    Build the analysis object for a graph.

    This method acts purely as a factory: it validates that a graph was
    supplied and wraps it, together with this Graphing instance, in a
    ``GraphAnalysisResult``. No metric is computed here.

    Args:
        graph: igraph Graph object to analyze.

    Returns:
        GraphAnalysisResult: Result object created from ``graph`` and ``self``.

    Raises:
        GraphCreationError: If ``graph`` is None.

    Examples:
        >>> results = grapher.get_graph_info(graph)
        >>> print(f"Density: {results.density}")
        >>> top_hubs = results.get_top_n_by('degree', n=3)
    """
    if graph is not None:
        # Hand over both the graph and the grapher; the result object does the rest.
        return GraphAnalysisResult(graph, self)

    raise GraphCreationError("Cannot get info for a None graph.")