Source code for graphizy.streaming

"""
Real-time data streaming managers for Graphizy.

This module provides both thread-based and asyncio-based managers for handling
real-time data streams, processing them into graphs, and invoking callbacks.

.. moduleauthor:: Charles Fosseprez
.. contact:: charles.fosseprez.pro@gmail.com
.. license:: GPL-2.0-or-later
.. copyright:: Copyright (C) 2025 Charles Fosseprez
"""

import asyncio
import inspect
import logging
import threading
import time
from queue import Queue, Empty, Full
from typing import Callable, Any, TYPE_CHECKING

import numpy as np

if TYPE_CHECKING:
    from .main import Graphing


class StreamManager:
    """Manage real-time graph updates on a background worker thread.

    Producers hand frames to :meth:`push_data`. A daemon thread started by
    :meth:`start_streaming` takes the most recent frame from a bounded queue,
    passes it to ``grapher.update_graphs`` and forwards the result to every
    registered callback.

    Args:
        grapher: Object that builds the graphs. It is only used when it has a
            ``graph_types`` attribute; its ``update_graphs`` method is then
            called for every processed frame.
        buffer_size: ``maxsize`` of the internal frame queue.
        update_interval: Seconds the worker waits for a frame before it
            re-checks the stop flag.
        auto_memory: Forwarded to ``update_graphs`` as both ``use_memory``
            and ``update_memory``.
    """

    def __init__(self, grapher: 'Graphing', buffer_size: int = 1000,
                 update_interval: float = 0.1, auto_memory: bool = True):
        self.grapher = grapher
        self.buffer_size = buffer_size
        self.update_interval = update_interval
        self.auto_memory = auto_memory

        self.data_queue = Queue(maxsize=buffer_size)
        self.is_streaming = False
        self.callbacks = []
        # Set by start_streaming(); None until the worker is first started.
        self.stream_thread = None

        # Performance metrics. The counters are updated without locking from
        # both the producer side (push_data) and the worker thread, so treat
        # them as approximate.
        self.stats = {
            'updates_processed': 0,
            'avg_processing_time': 0.0,
            'dropped_frames': 0
        }

    def add_callback(self, callback: Callable[[Any], None]) -> None:
        """Register ``callback`` to be called with the graphs of each update."""
        self.callbacks.append(callback)

    def start_streaming(self) -> None:
        """Start the worker thread; do nothing if streaming is already on."""
        if self.is_streaming:
            return

        self.is_streaming = True
        self.stream_thread = threading.Thread(target=self._process_stream, daemon=True)
        self.stream_thread.start()

    def stop_streaming(self) -> None:
        """Ask the worker to stop and wait up to one second for it to exit.

        Safe to call from inside a callback: in that case the caller *is* the
        worker thread, so the join is skipped (joining the current thread
        raises ``RuntimeError``) and the loop ends after the current update.
        """
        self.is_streaming = False
        worker = self.stream_thread
        if worker is None or worker is threading.current_thread():
            return
        worker.join(timeout=1.0)

    def push_data(self, data_points: np.ndarray) -> bool:
        """Queue a frame for processing without blocking.

        Returns:
            bool: True if data was queued, False if the queue is full (the
            frame is then counted in ``stats['dropped_frames']``).
        """
        try:
            self.data_queue.put_nowait(data_points)
        except Full:
            self.stats['dropped_frames'] += 1
            return False
        return True

    def _process_stream(self) -> None:
        """Worker loop: process the newest queued frame until stopped."""
        while self.is_streaming:
            try:
                # Block for at most update_interval so the stop flag is
                # re-checked regularly even when no data arrives.
                data = self.data_queue.get(timeout=self.update_interval)

                # Keep only the latest frame to reduce latency; every frame
                # skipped this way counts as dropped.
                while not self.data_queue.empty():
                    try:
                        data = self.data_queue.get_nowait()
                        self.stats['dropped_frames'] += 1
                    except Empty:
                        break

                # perf_counter is monotonic, so the measured duration cannot
                # be distorted by system clock adjustments.
                started = time.perf_counter()
                self._process_data_update(data)
                elapsed = time.perf_counter() - started

                # Incremental form of the running mean.
                self.stats['updates_processed'] += 1
                count = self.stats['updates_processed']
                previous = self.stats['avg_processing_time']
                self.stats['avg_processing_time'] = previous + (elapsed - previous) / count

            except Empty:
                # Expected when no new data arrives within the timeout.
                continue
            except Exception as e:
                logging.exception("Stream processing error: %s", e)

    def _process_data_update(self, data_points: np.ndarray) -> None:
        """Build graphs for one frame and pass them to every callback.

        Nothing happens when the grapher has no ``graph_types`` attribute.
        Errors from the grapher or from a callback are logged, not raised;
        a failing callback does not prevent the remaining ones from running.
        """
        if not hasattr(self.grapher, 'graph_types'):
            return

        try:
            graphs = self.grapher.update_graphs(
                data_points,
                use_memory=self.auto_memory,
                update_memory=self.auto_memory
            )
        except Exception as e:
            logging.exception("Data processing error: %s", e)
            return

        for callback in self.callbacks:
            try:
                callback(graphs)
            except Exception as e:
                logging.exception("Callback error: %s", e)
class AsyncStreamManager:
    """Asyncio counterpart of ``StreamManager`` for high-throughput use.

    Frames pushed with :meth:`push_data` are consumed by a task created in
    :meth:`start_streaming`. The task keeps only the most recent frame, passes
    it to ``grapher.update_graphs`` and forwards the result to every
    registered callback (plain callables and awaitable-returning ones alike).

    Args:
        grapher: Object that builds the graphs. It is only used when it has a
            ``graph_types`` attribute; its ``update_graphs`` method is then
            called for every processed frame.
        buffer_size: ``maxsize`` of the internal ``asyncio.Queue``.
        update_interval: Seconds the task waits for a frame before it
            re-checks the stop flag.
    """

    def __init__(self, grapher: 'Graphing', buffer_size: int = 1000,
                 update_interval: float = 0.1):
        self.grapher = grapher
        self.update_interval = update_interval
        self.data_queue = asyncio.Queue(maxsize=buffer_size)
        self.callbacks = []
        self.processing_task = None
        self.is_streaming = False

        self.stats = {
            'updates_processed': 0,
            'dropped_frames': 0
        }

    def add_callback(self, callback: Callable[[Any], None]) -> None:
        """Register ``callback`` to be called with the graphs of each update."""
        self.callbacks.append(callback)

    async def start_streaming(self) -> None:
        """Create the processing task; do nothing if streaming is already on."""
        if self.is_streaming:
            return

        self.is_streaming = True
        self.processing_task = asyncio.create_task(self._process_stream())

    async def stop_streaming(self) -> None:
        """Cancel the processing task and wait until it has finished."""
        self.is_streaming = False
        task = self.processing_task
        if not task:
            return

        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            logging.info("Async streaming task stopped.")
        finally:
            self.processing_task = None

    async def push_data(self, data_points: np.ndarray) -> bool:
        """Queue a frame without blocking.

        Returns:
            bool: True if data was queued, False if the queue is full (the
            frame is then counted in ``stats['dropped_frames']``).
        """
        try:
            self.data_queue.put_nowait(data_points)
        except asyncio.QueueFull:
            self.stats['dropped_frames'] += 1
            return False
        return True

    async def _process_stream(self) -> None:
        """Processing loop: handle the newest queued frame until stopped."""
        while self.is_streaming:
            try:
                # Bounded wait so the stop flag is re-checked regularly.
                data = await asyncio.wait_for(
                    self.data_queue.get(), timeout=self.update_interval
                )

                # Keep only the most recent frame; every frame skipped this
                # way counts as dropped.
                while not self.data_queue.empty():
                    try:
                        data = self.data_queue.get_nowait()
                        self.stats['dropped_frames'] += 1
                    except asyncio.QueueEmpty:
                        break

                await self._process_data_async(data)
                self.stats['updates_processed'] += 1

            except asyncio.TimeoutError:
                continue  # No data, just loop
            except asyncio.CancelledError:
                # Let cancellation propagate so the task really ends up in
                # the cancelled state and stop_streaming() observes it.
                raise
            except Exception as e:
                logging.exception("Async stream error: %s", e)

    async def _process_data_async(self, data_points: np.ndarray) -> None:
        """Build graphs for one frame and pass them to every callback.

        Nothing happens when the grapher has no ``graph_types`` attribute.
        ``update_graphs`` is called directly on the event loop; for CPU-bound
        work it could be moved to an executor. A callback that returns an
        awaitable is awaited; exceptions from a callback are logged and do not
        stop the remaining callbacks.
        """
        if not hasattr(self.grapher, 'graph_types'):
            return

        graphs = self.grapher.update_graphs(data_points, use_memory=True, update_memory=True)

        for callback in self.callbacks:
            try:
                # Inspect the call result rather than the callable: this also
                # covers objects with an async __call__ and plain functions
                # that return a coroutine.
                outcome = callback(graphs)
                if inspect.isawaitable(outcome):
                    await outcome
            except Exception as e:
                logging.exception("Async callback error: %s", e)