# Source code for graphizy.analysis

# src/graphizy/analysis.py

"""
Analysis result objects for Graphizy.

This module provides classes that encapsulate the results of graph analysis,
offering a more intuitive, object-oriented API for accessing and exploring
graph metrics.

.. moduleauthor:: Charles Fosseprez
.. contact:: charles.fosseprez.pro@gmail.com
.. license:: GPL2 or later
.. copyright:: Copyright (C) 2025 Charles Fosseprez
"""

import igraph as ig
import numpy as np
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Tuple
from dataclasses import dataclass

from .exceptions import IgraphMethodError

if TYPE_CHECKING:
    from .main import Graphing  # To avoid circular import, for type hinting only


@dataclass
class PercolationResult:
    """Results from percolation analysis.

    The list fields are parallel: entry ``i`` of each list describes the
    sweep step that used ``interaction_ranges[i]``.
    """

    # Interaction ranges that were tested.
    interaction_ranges: List[float]
    # Size of the largest cluster recorded for each range.
    largest_cluster_sizes: List[int]
    # Number of clusters recorded for each range.
    total_clusters: List[int]
    # Percolation probability recorded for each range.
    percolation_probabilities: List[float]
    # Estimated critical range, or None when no estimate is available.
    critical_range: Optional[float]

    def get_critical_probability(self) -> float:
        """Get percolation probability at critical range.

        Returns:
            The percolation probability recorded for the tested range that is
            closest to ``critical_range``. Returns 0.0 when ``critical_range``
            is None or when no range/probability pair has been recorded.
        """
        if self.critical_range is None:
            return 0.0

        # Only indices present in both lists can be looked up safely; this
        # also covers the "nothing recorded" case, where min() would raise.
        usable = min(len(self.interaction_ranges),
                     len(self.percolation_probabilities))
        if usable == 0:
            return 0.0

        # Find closest range to critical
        closest_idx = min(
            range(usable),
            key=lambda i: abs(self.interaction_ranges[i] - self.critical_range),
        )
        return self.percolation_probabilities[closest_idx]
@dataclass
class SocialRole:
    """Individual social role classification"""

    # Role labels attached to the individual.
    roles: List[str]
    # Named numeric measures stored alongside the role labels.
    stats: Dict[str, float]

    def _has_role(self, label: str) -> bool:
        """Return True when ``label`` is one of the stored roles."""
        return label in self.roles

    def is_bridge(self) -> bool:
        """Return True when the 'bridge' role is present."""
        return self._has_role('bridge')

    def is_hub(self) -> bool:
        """Return True when the 'hub' role is present."""
        return self._has_role('hub')

    def is_peripheral(self) -> bool:
        """Return True when the 'peripheral' role is present."""
        return self._has_role('peripheral')
@dataclass
class AccessibilityResult:
    """Results from accessibility analysis"""

    # Label of the service that was analysed.
    service_type: str
    # Distance limit used when deciding whether a service is reachable.
    service_distance: float
    # Number of population points analysed.
    population_count: int
    # Number of service points analysed.
    service_count: int
    # Named coverage figures; the methods below read 'served_percentage'
    # and 'well_served_percentage'.
    coverage_statistics: Dict[str, float]
    # One dictionary per population point without a reachable service.
    underserved_areas: List[Dict]

    def get_coverage_percentage(self) -> float:
        """Return the 'served_percentage' statistic (0.0 when absent)."""
        stats = self.coverage_statistics
        return stats.get('served_percentage', 0.0)

    def get_equity_score(self) -> float:
        """Calculate spatial equity score (higher = more equitable)"""
        stats = self.coverage_statistics
        basic = stats.get('served_percentage', 0.0)
        quality = stats.get('well_served_percentage', 0.0)

        # Balance between basic coverage and quality of coverage
        weighted = basic * 0.6 + quality * 0.4
        return weighted / 100.0
class PercolationAnalyzer:
    """
    Analyzer for percolation phenomena and critical behavior in spatial networks.

    This class provides tools for studying phase transitions, cluster formation,
    and critical thresholds in spatial systems.
    """

    def __init__(self, grapher: 'Graphing'):
        """Store the Graphing instance used to build proximity graphs."""
        self.grapher = grapher

    def analyze_percolation_threshold(self, positions: np.ndarray,
                                      interaction_ranges: List[float]) -> PercolationResult:
        """
        Analyze percolation behavior as a function of interaction range.

        Args:
            positions: Array of particle positions in format [id, x, y]
            interaction_ranges: List of interaction ranges to test

        Returns:
            PercolationResult with threshold analysis
        """
        largest_cluster_sizes = []
        total_clusters = []
        percolation_probabilities = []

        for interaction_range in interaction_ranges:
            try:
                largest, cluster_count, probability = self._measure_clusters(
                    positions, interaction_range
                )
            except Exception:
                # Deliberate best-effort: a range whose graph cannot be built
                # or analysed is recorded as "no percolation" so that the
                # result lists stay aligned with interaction_ranges.
                largest, cluster_count, probability = 0, 0, 0.0

            largest_cluster_sizes.append(largest)
            total_clusters.append(cluster_count)
            percolation_probabilities.append(probability)

        # Estimate critical threshold
        critical_range = self._estimate_critical_threshold(
            interaction_ranges, percolation_probabilities
        )

        return PercolationResult(
            interaction_ranges=interaction_ranges,
            largest_cluster_sizes=largest_cluster_sizes,
            total_clusters=total_clusters,
            percolation_probabilities=percolation_probabilities,
            critical_range=critical_range
        )

    def _measure_clusters(self, positions: np.ndarray,
                          interaction_range: float) -> Tuple[int, int, float]:
        """Build one proximity graph and return (largest cluster, cluster count, probability)."""
        graph = self.grapher.make_graph("proximity", positions,
                                        proximity_thresh=interaction_range)
        if graph.vcount() == 0:
            return 0, 0, 0.0

        # Connected components are the clusters.
        cluster_sizes = [len(cluster) for cluster in graph.connected_components()]
        largest_cluster = max(cluster_sizes) if cluster_sizes else 0

        # Percolation probability: share of all positions in the largest cluster.
        return largest_cluster, len(cluster_sizes), largest_cluster / len(positions)

    @staticmethod
    def _steepest_gradient(ranges: List[float],
                           probabilities: List[float]) -> Optional[Tuple[int, float]]:
        """Return (index, value) of the largest d(probability)/d(range), or None with < 2 samples."""
        # np.gradient needs at least two samples and raises otherwise.
        if len(probabilities) < 2:
            return None
        gradients = np.gradient(probabilities, ranges)
        steepest_idx = int(np.argmax(gradients))
        return steepest_idx, float(gradients[steepest_idx])

    def _estimate_critical_threshold(self, ranges: List[float],
                                     probabilities: List[float]) -> Optional[float]:
        """Estimate critical percolation threshold from steepest gradient.

        Returns:
            The last range when no probability reaches 0.1 (None when there
            are no ranges at all), the only range when a single sample is
            available, otherwise the range at the steepest increase.
        """
        if not probabilities or max(probabilities) < 0.1:
            return ranges[-1] if ranges else None

        steepest = self._steepest_gradient(ranges, probabilities)
        if steepest is None:
            # A single sample has no gradient; its range is the only candidate.
            return ranges[0] if ranges else None

        # Find steepest increase in percolation probability
        return ranges[steepest[0]]

    def detect_phase_transition(self, result: PercolationResult,
                                gradient_threshold: float = 0.1) -> Dict[str, Any]:
        """
        Detect phase transition characteristics from percolation results.

        Args:
            result: PercolationResult from analyze_percolation_threshold
            gradient_threshold: Minimum gradient to identify transition

        Returns:
            Dictionary with phase transition analysis. When fewer than two
            samples are available only ``{'has_transition': False}`` is
            returned, because no gradient can be computed.
        """
        probabilities = result.percolation_probabilities
        if not probabilities:
            return {'has_transition': False}

        steepest = self._steepest_gradient(result.interaction_ranges, probabilities)
        if steepest is None:
            return {'has_transition': False}

        max_gradient_idx, max_gradient = steepest
        # Plain bool/float rather than numpy scalars keeps the dictionary
        # friendly to identity checks and serialisation.
        has_transition = bool(max_gradient > gradient_threshold)
        last_idx = len(probabilities) - 1

        return {
            'has_transition': has_transition,
            'transition_range': result.interaction_ranges[max_gradient_idx] if has_transition else None,
            'transition_sharpness': max_gradient,
            # Neighbouring samples; the boundaries fall back to 0 and 1.
            'pre_transition_prob': probabilities[max_gradient_idx - 1] if max_gradient_idx > 0 else 0,
            'post_transition_prob': probabilities[max_gradient_idx + 1] if max_gradient_idx < last_idx else 1
        }
class SocialNetworkAnalyzer:
    """
    Analyzer for social network patterns and role identification.

    This class provides tools for identifying social roles, tracking temporal
    changes in network position, and analyzing social dynamics.
    """

    def __init__(self, grapher: 'Graphing'):
        """Store the Graphing instance associated with this analyzer."""
        self.grapher = grapher

    @staticmethod
    def _node_id(vertex: Any, index: int) -> Any:
        """Return the vertex's "id" attribute, or its position when it has none."""
        return vertex["id"] if "id" in vertex.attributes() else index

    @staticmethod
    def _assign_role_labels(betweenness: List[float], degree: List[int],
                            betweenness_threshold: float, degree_threshold: float,
                            peripheral_threshold: float) -> List[List[str]]:
        """Return the role labels for each vertex position from its centrality values."""
        # All four cutoffs depend only on the full distributions, so they are
        # computed once here instead of once per vertex.
        bridge_cutoff = np.percentile(betweenness, betweenness_threshold * 100)
        hub_cutoff = np.percentile(degree, degree_threshold * 100)
        low_betweenness = np.percentile(betweenness, peripheral_threshold * 100)
        low_degree = np.percentile(degree, peripheral_threshold * 100)

        labels = []
        for vertex_betweenness, vertex_degree in zip(betweenness, degree):
            roles = []
            # Bridge/Broker: High betweenness centrality
            if vertex_betweenness >= bridge_cutoff:
                roles.append('bridge')
            # Hub/Popular: High degree centrality
            if vertex_degree >= hub_cutoff:
                roles.append('hub')
            # Peripheral: Low on all measures
            if vertex_betweenness < low_betweenness and vertex_degree < low_degree:
                roles.append('peripheral')
            labels.append(roles if roles else ['regular'])
        return labels

    def identify_social_roles(self, graph: ig.Graph,
                              betweenness_threshold: float = 0.8,
                              degree_threshold: float = 0.8,
                              peripheral_threshold: float = 0.2) -> Dict[int, SocialRole]:
        """
        Identify social roles based on network centrality measures.

        Args:
            graph: igraph Graph object
            betweenness_threshold: Percentile threshold for bridge identification
            degree_threshold: Percentile threshold for hub identification
            peripheral_threshold: Percentile (as a fraction) below which a node
                must fall on both measures to be labelled peripheral

        Returns:
            Dictionary mapping node IDs to SocialRole objects. Nodes matching
            no role are labelled 'regular'.
        """
        if graph.vcount() == 0:
            return {}

        social_roles = {}
        try:
            # Calculate centrality measures
            betweenness = graph.betweenness()
            degree = graph.degree()
            labels = self._assign_role_labels(
                betweenness, degree,
                betweenness_threshold, degree_threshold, peripheral_threshold
            )

            for i, vertex in enumerate(graph.vs):
                social_roles[self._node_id(vertex, i)] = SocialRole(
                    roles=labels[i],
                    stats={'betweenness': betweenness[i], 'degree': degree[i]}
                )
        except Exception:
            # Deliberate best-effort: when the centrality calculation fails,
            # every node is reported as 'regular' with zeroed statistics.
            for i, vertex in enumerate(graph.vs):
                social_roles[self._node_id(vertex, i)] = SocialRole(
                    roles=['regular'],
                    stats={'betweenness': 0.0, 'degree': 0.0}
                )

        return social_roles

    def track_temporal_roles(self, graph_sequence: List[ig.Graph]) -> Dict[int, Dict[str, List]]:
        """
        Track how social roles evolve over time.

        Args:
            graph_sequence: List of graphs representing temporal sequence

        Returns:
            Dictionary with temporal role tracking for each individual. Each
            entry holds parallel 'timesteps', 'roles', 'betweenness' and
            'degree' lists; a node only gets an entry for the timesteps in
            which it appears.
        """
        temporal_roles = {}

        for timestep, graph in enumerate(graph_sequence):
            for node_id, role in self.identify_social_roles(graph).items():
                history = temporal_roles.setdefault(node_id, {
                    'timesteps': [],
                    'roles': [],
                    'betweenness': [],
                    'degree': []
                })
                history['timesteps'].append(timestep)
                history['roles'].append(role.roles)
                history['betweenness'].append(role.stats['betweenness'])
                history['degree'].append(role.stats['degree'])

        return temporal_roles

    def get_role_stability(self, temporal_roles: Dict[int, Dict[str, List]]) -> Dict[int, float]:
        """
        Calculate role stability for each individual.

        Args:
            temporal_roles: Output from track_temporal_roles

        Returns:
            Dictionary mapping node IDs to stability scores (0-1)
        """
        stability_scores = {}

        for node_id, data in temporal_roles.items():
            role_history = data['roles']
            if len(role_history) < 2:
                # Fewer than two observations cannot contain a transition.
                stability_scores[node_id] = 1.0
                continue

            # Count role transitions; role sets are compared so that the
            # order of labels within one timestep does not matter.
            transitions = sum(
                set(previous) != set(current)
                for previous, current in zip(role_history, role_history[1:])
            )

            # Stability = 1 - (transitions / max_possible_transitions)
            max_transitions = len(role_history) - 1
            stability_scores[node_id] = 1.0 - (transitions / max_transitions)

        return stability_scores
class AccessibilityAnalyzer:
    """
    Analyzer for spatial accessibility and service coverage.

    This class provides tools for analyzing urban accessibility, service coverage,
    and spatial equity in service distribution.
    """

    def __init__(self, grapher: 'Graphing'):
        """Store the Graphing instance used to cluster underserved areas."""
        self.grapher = grapher

    @staticmethod
    def _scan_population(population_points: np.ndarray, service_points: np.ndarray,
                         service_distance: float) -> Tuple[int, int, List[Dict]]:
        """Return (served count, well-served count, underserved area records)."""
        # Service coordinates are extracted once; float conversion avoids
        # wrap-around when subtracting unsigned integer coordinates.
        if len(service_points):
            service_xy = np.asarray(service_points, dtype=float)[:, 1:3]
        else:
            service_xy = np.empty((0, 2), dtype=float)

        served_count = 0
        well_served_count = 0
        underserved_areas = []

        for pop_point in population_points:
            point_xy = np.asarray(pop_point[1:3], dtype=float)
            # Distances from this point to every service in one operation.
            distances = np.linalg.norm(service_xy - point_xy, axis=1)
            accessible_services = int(np.count_nonzero(distances <= service_distance))

            if accessible_services > 0:
                served_count += 1
            if accessible_services >= 2:
                well_served_count += 1
            if accessible_services == 0:
                underserved_areas.append({
                    'position': np.asarray(pop_point[1:3]).tolist(),
                    'accessible_services': accessible_services,
                    'id': pop_point[0]
                })

        return served_count, well_served_count, underserved_areas

    @staticmethod
    def _coverage_statistics(served_count: int, well_served_count: int,
                             underserved_count: int, population_count: int) -> Dict[str, float]:
        """Build the coverage dictionary; percentages are 0.0 for an empty population."""
        if population_count:
            served_percentage = (served_count / population_count) * 100
            well_served_percentage = (well_served_count / population_count) * 100
        else:
            # Nothing to divide by: report zero coverage instead of raising.
            served_percentage = 0.0
            well_served_percentage = 0.0

        return {
            'served_population': served_count,
            'served_percentage': served_percentage,
            'well_served_population': well_served_count,
            'well_served_percentage': well_served_percentage,
            'underserved_count': underserved_count
        }

    def analyze_service_accessibility(self, population_points: np.ndarray,
                                      service_points: np.ndarray,
                                      service_type: str,
                                      service_distance: float) -> AccessibilityResult:
        """
        Analyze accessibility to urban services.

        A population point is "served" when at least one service lies within
        ``service_distance`` and "well served" when at least two do; points
        with none are listed as underserved.

        Args:
            population_points: Array of population locations [id, x, y]
            service_points: Array of service locations [id, x, y]
            service_type: Type of service being analyzed
            service_distance: Maximum acceptable distance to service

        Returns:
            AccessibilityResult with coverage analysis
        """
        served_count, well_served_count, underserved_areas = self._scan_population(
            population_points, service_points, service_distance
        )

        # Coverage statistics
        coverage_statistics = self._coverage_statistics(
            served_count, well_served_count,
            len(underserved_areas), len(population_points)
        )

        return AccessibilityResult(
            service_type=service_type,
            service_distance=service_distance,
            population_count=len(population_points),
            service_count=len(service_points),
            coverage_statistics=coverage_statistics,
            underserved_areas=underserved_areas
        )

    def compare_accessibility(self, results_list: List[AccessibilityResult]) -> Dict[str, Any]:
        """
        Compare accessibility across multiple services or scenarios.

        Args:
            results_list: List of AccessibilityResult objects

        Returns:
            Dictionary with comparative analysis. 'best_service' and
            'worst_service' stay None and 'average_coverage' stays 0.0 when
            ``results_list`` is empty.
        """
        services = [result.service_type for result in results_list]
        coverage_percentages = [result.get_coverage_percentage() for result in results_list]
        equity_scores = [result.get_equity_score() for result in results_list]

        comparison = {
            'services': services,
            'coverage_percentages': coverage_percentages,
            'equity_scores': equity_scores,
            'best_service': None,
            'worst_service': None,
            'average_coverage': 0.0
        }

        if coverage_percentages:
            # Best and worst are ranked by coverage percentage.
            comparison['best_service'] = services[int(np.argmax(coverage_percentages))]
            comparison['worst_service'] = services[int(np.argmin(coverage_percentages))]
            comparison['average_coverage'] = float(np.mean(coverage_percentages))

        return comparison

    def identify_service_gaps(self, accessibility_result: AccessibilityResult,
                              cluster_distance: float = 200.0) -> List[Dict]:
        """
        Identify spatial clusters of underserved areas (service gaps).

        Args:
            accessibility_result: Result from analyze_service_accessibility
            cluster_distance: Distance threshold for clustering underserved areas

        Returns:
            List of service gap clusters with statistics
        """
        underserved = accessibility_result.underserved_areas
        if not underserved:
            return []

        # Rows are [index, x, y]; the index refers back into `underserved`.
        underserved_positions = np.array([
            [i] + area['position'] for i, area in enumerate(underserved)
        ])

        try:
            # Create graph of underserved areas
            gap_graph = self.grapher.make_graph("proximity", underserved_positions,
                                                proximity_thresh=cluster_distance)
            if gap_graph.vcount() == 0:
                return []

            # Find clusters of underserved areas
            service_gaps = []
            for gap_id, cluster in enumerate(gap_graph.connected_components()):
                cluster_positions = [underserved_positions[idx][1:3] for idx in cluster]
                cluster_center = np.mean(cluster_positions, axis=0)
                service_gaps.append({
                    'gap_id': gap_id,
                    'size': len(cluster),
                    'center': cluster_center.tolist(),
                    # Share of all underserved areas that fall in this cluster.
                    'severity': len(cluster) / len(underserved),
                    'affected_population_ids': [underserved[idx]['id'] for idx in cluster]
                })

            # Sort by severity (largest gaps first)
            service_gaps.sort(key=lambda gap: gap['severity'], reverse=True)
            return service_gaps

        except Exception:
            # Fallback: treat each underserved area as individual gap
            return [
                {
                    'gap_id': i,
                    'size': 1,
                    'center': area['position'],
                    'severity': 1 / len(underserved),
                    'affected_population_ids': [area['id']]
                }
                for i, area in enumerate(underserved)
            ]
class GraphAnalysisResult:
    """
    A lazy-loading object holding the results of a graph analysis.

    This object behaves like both a standard object (e.g., `results.density`)
    and a dictionary (e.g., `results['density']`), providing maximum flexibility.
    Metrics are computed on-demand the first time they are accessed and then
    cached for subsequent calls.
    """

    def __init__(self, graph: ig.Graph, grapher: 'Graphing'):
        """
        Initialize the result object. This is a lightweight operation.

        Args:
            graph: The igraph.Graph object to be analyzed.
            grapher: The Graphing instance used for the analysis.
        """
        self._graph = graph
        self._grapher = grapher
        self._metric_cache: Dict[str, Any] = {}

        # Advanced analyzers are created on first access (see properties below).
        self._percolation_analyzer = None
        self._social_analyzer = None
        self._accessibility_analyzer = None

    # --- Advanced Analysis Properties ---

    @property
    def percolation_analyzer(self) -> PercolationAnalyzer:
        """Get percolation analyzer instance (lazy-loaded)"""
        if self._percolation_analyzer is None:
            self._percolation_analyzer = PercolationAnalyzer(self._grapher)
        return self._percolation_analyzer

    @property
    def social_analyzer(self) -> SocialNetworkAnalyzer:
        """Get social network analyzer instance (lazy-loaded)"""
        if self._social_analyzer is None:
            self._social_analyzer = SocialNetworkAnalyzer(self._grapher)
        return self._social_analyzer

    @property
    def accessibility_analyzer(self) -> AccessibilityAnalyzer:
        """Get accessibility analyzer instance (lazy-loaded)"""
        if self._accessibility_analyzer is None:
            self._accessibility_analyzer = AccessibilityAnalyzer(self._grapher)
        return self._accessibility_analyzer

    # --- Properties for common metrics (computed lazily) ---

    @property
    def vertex_count(self) -> int:
        """Returns the number of vertices. (Cached on first access)"""
        return self.get_metric('vcount', default_value=0)

    @property
    def edge_count(self) -> int:
        """Returns the number of edges. (Cached on first access)"""
        return self.get_metric('ecount', default_value=0)

    @property
    def density(self) -> float:
        """Returns the graph density. (Cached on first access)"""
        # The value comes from the grapher's own density() rather than from
        # get_metric, so it is cached under a key get_metric cannot produce.
        cache_key = "__grapher_density__"
        if cache_key not in self._metric_cache:
            self._metric_cache[cache_key] = self._grapher.density(self._graph)
        return self._metric_cache[cache_key]

    @property
    def is_connected(self) -> bool:
        """Returns True if the graph is fully connected. (Cached on first access)"""
        return self.get_metric('is_connected', default_value=False)

    @property
    def num_components(self) -> int:
        """Returns the number of disconnected components. (Cached on first access)"""
        components = self.get_metric('connected_components', return_format='raw')
        return len(components) if components else 0

    @property
    def average_path_length(self) -> Optional[float]:
        """Returns the average shortest path length of the largest component. (Cached)"""
        return self.get_metric('average_path_length', component_mode="largest", default_value=None)

    @property
    def diameter(self) -> Optional[int]:
        """Returns the diameter of the largest component. (Cached)"""
        return self.get_metric('diameter', component_mode="largest", default_value=None)

    @property
    def transitivity(self) -> Optional[float]:
        """Returns the global clustering coefficient (transitivity). (Cached)"""
        return self.get_metric('transitivity_undirected', default_value=None)

    # --- Core On-the-fly Metric Computation ---

    def get_metric(self, metric_name: str, **kwargs) -> Any:
        """
        Computes any igraph metric on the fly using the robust `call_method_safe`.

        Results are cached to avoid re-computation; the cache key combines the
        metric name with the sorted keyword arguments. For dictionary results,
        entries with `None` values are filtered out.

        Args:
            metric_name: Name of the metric passed to `call_method_safe`.
            **kwargs: Extra keyword arguments forwarded to `call_method_safe`.

        Returns:
            The (possibly cached) metric value.
        """
        cache_key = f"{metric_name}_{sorted(kwargs.items())}"
        if cache_key in self._metric_cache:
            return self._metric_cache[cache_key]

        result = self._grapher.call_method_safe(self._graph, metric_name, **kwargs)

        # For per-vertex metrics that can fail on some nodes (e.g., pagerank
        # on disconnected graphs), filter out None values for robustness.
        if kwargs.get('return_format') == 'dict' and isinstance(result, dict):
            result = {k: v for k, v in result.items() if v is not None}

        self._metric_cache[cache_key] = result
        return result

    # --- Helper Methods for Common Statistical Tasks ---

    def get_top_n_by(self, metric_name: str, n: int = 5, **kwargs) -> List[tuple]:
        """
        Returns the top N nodes sorted by a given per-vertex metric.

        Non-numeric values (including None) are ignored.

        Args:
            metric_name: A vertex attribute name or a computable metric name.
            n: Maximum number of entries to return.
            **kwargs: Extra arguments forwarded to `get_metric` for computed metrics.

        Returns:
            List of (node, value) tuples, largest value first.

        Raises:
            TypeError: If a computed metric does not come back as a dictionary.
        """
        vertex_attributes = self._graph.vs.attributes()

        # First, check if the metric_name is a direct vertex attribute.
        if metric_name in vertex_attributes:
            values = self._graph.vs[metric_name]
            # Prefer the 'name' attribute as the node label; fall back to the
            # vertex position when the graph carries no 'name' attribute.
            if "name" in vertex_attributes:
                ids = self._graph.vs["name"]
            else:
                ids = range(len(values))
            metric_dict = dict(zip(ids, values))
        else:
            # If not, assume it's a computable metric (igraph method).
            kwargs['return_format'] = 'dict'
            metric_dict = self.get_metric(metric_name, **kwargs)

        if not isinstance(metric_dict, dict):
            raise TypeError(f"Metric '{metric_name}' did not return a dictionary.")

        # Filter out non-numeric values before sorting for robustness
        valid_items = [
            item for item in metric_dict.items()
            if isinstance(item[1], (int, float, np.number))
        ]

        sorted_items = sorted(valid_items, key=lambda item: item[1], reverse=True)
        return sorted_items[:n]

    def get_metric_stats(self, metric_name: str, **kwargs) -> Dict[str, float]:
        """
        Computes descriptive statistics for a numeric metric.

        Handles None and other non-numeric values by ignoring them.

        Args:
            metric_name: A vertex attribute name or a computable metric name.
            **kwargs: Extra arguments forwarded to `get_metric` for computed metrics.

        Returns:
            Dictionary with 'mean', 'std', 'min', 'max', 'median' and 'count';
            all zero when there is no numeric value.

        Raises:
            TypeError: If a computed metric does not come back as a list.
        """
        # First, check if the metric_name is a direct vertex attribute.
        if metric_name in self._graph.vs.attributes():
            values = self._graph.vs[metric_name]
        else:
            # If not, assume it's a computable metric (igraph method).
            kwargs['return_format'] = 'list'
            values = self.get_metric(metric_name, **kwargs)
            if not isinstance(values, list):
                raise TypeError(f"Metric '{metric_name}' did not return a list of values.")

        # Ensure we only process numeric types, ignoring strings or other objects.
        numeric_values = [
            v for v in values
            if v is not None and isinstance(v, (int, float, np.number))
        ]

        if not numeric_values:
            return {'mean': 0.0, 'std': 0.0, 'min': 0.0, 'max': 0.0, 'median': 0.0, 'count': 0}

        values_arr = np.array(numeric_values)
        return {
            'mean': float(np.mean(values_arr)),
            'std': float(np.std(values_arr)),
            'min': float(np.min(values_arr)),
            'max': float(np.max(values_arr)),
            'median': float(np.median(values_arr)),
            'count': len(values_arr)
        }

    # --- Dictionary-like Access & Representation ---

    def summary(self) -> str:
        """Provides a clean, readable summary of the key metrics."""
        connected = self.is_connected
        lines = [
            "Graph Analysis Summary:",
            f" - Vertices: {self.vertex_count}",
            f" - Edges: {self.edge_count}",
            f" - Density: {self.density:.4f}",
            f" - Connected: {connected}",
        ]
        if not connected:
            lines.append(f" - Components: {self.num_components}")

        # Optional metrics are listed only when a value is available.
        average_path_length = self.average_path_length
        if average_path_length is not None:
            lines.append(f" - Avg. Path Length (largest comp): {average_path_length:.2f}")

        diameter = self.diameter
        if diameter is not None:
            lines.append(f" - Diameter (largest comp): {diameter}")

        transitivity = self.transitivity
        if transitivity is not None:
            lines.append(f" - Clustering (Transitivity): {transitivity:.4f}")

        return "\n".join(lines)

    def __str__(self) -> str:
        """Return the multi-line summary."""
        return self.summary()

    def __repr__(self) -> str:
        """Return a short one-line description with vertex and edge counts."""
        return f"<GraphAnalysisResult: {self.vertex_count} vertices, {self.edge_count} edges>"

    def __getitem__(self, key: str) -> Any:
        """Allows dictionary-style access, e.g., `results['density']`.

        Attributes and properties of this object take precedence; any other
        key is computed (or fetched from the cache) through `get_metric`.

        Raises:
            KeyError: If the key is neither an attribute nor a computable metric.
        """
        # A single getattr both tests for and fetches the attribute, so a
        # property is evaluated once rather than twice.
        try:
            return getattr(self, key)
        except AttributeError:
            pass

        try:
            # get_metric consults the cache before computing.
            return self.get_metric(key)
        except IgraphMethodError as e:
            raise KeyError(f"Metric or property '{key}' not found.") from e

    def __contains__(self, key: str) -> bool:
        """
        Allows using the 'in' operator, e.g., `'density' in results`.
        This is primarily for backward compatibility with tests.
        """
        # Look the name up on the class and in the instance dictionary so
        # that a membership test never triggers a metric computation.
        return hasattr(type(self), key) or key in vars(self)