Concurrent Flow Demo

This example demonstrates Routilux’s concurrent execution capabilities, showing how multiple routines can execute in parallel to improve performance.

Overview

The demo simulates a real-world scenario where data needs to be fetched from multiple sources, processed, and aggregated. With concurrent execution, the fetch operations run in parallel, so total execution time approaches that of the slowest fetch instead of the sum of all fetches.

Key Features Demonstrated

  • Unified Event Queue: Both sequential and concurrent modes use the same queue mechanism

  • Automatic Flow Detection: No need to pass a flow parameter to emit() calls

  • Non-blocking emit(): Event emission returns immediately after enqueuing tasks

  • Concurrent execution strategy

  • Multiple parallel routines

  • Performance comparison (sequential vs concurrent)

  • Thread-safe state management

  • Error handling in concurrent execution

  • Serialization of concurrent flows

  • Dynamic strategy switching

  • wait_for_completion(): Proper waiting for async tasks

Example Code

#!/usr/bin/env python
"""
Concurrent Flow Execution Demo

This demo demonstrates Routilux's concurrent execution capabilities.
It shows how multiple routines can execute in parallel using thread pools,
significantly improving performance for I/O-bound operations.

Features demonstrated:
- Concurrent execution strategy
- Multiple parallel routines
- Dependency handling
- Performance comparison (sequential vs concurrent)
- Thread-safe state management
- Error handling in concurrent execution
- Serialization of concurrent flows
"""

import json
import time
from typing import Any, Dict

from routilux import ErrorHandler, ErrorStrategy, Flow, Routine

# ============================================================================
# Routine Definitions
# ============================================================================


class DataFetcher(Routine):
    """Fetch data from multiple sources concurrently"""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("trigger", handler=self.fetch_data)
        self.output_event = self.define_event("data_fetched", ["data", "source", "timestamp"])

    def configure(self, source_name: str = None, delay: float = 0.2):
        """Configure the fetcher"""
        self.set_config(source_name=source_name or "unknown", delay=delay)

    @property
    def source_name(self):
        return self.get_config("source_name", "unknown")

    @property
    def delay(self):
        return self.get_config("delay", 0.2)

    def fetch_data(self, **kwargs):
        """Simulate fetching data from a source (I/O operation)"""
        # Execution state should be stored in JobState, not routine._stats

        # Simulate network delay
        time.sleep(self.delay)

        # Simulate fetching data
        data = {
            "source": self.source_name,
            "content": f"Data from {self.source_name}",
            "size": len(self.source_name) * 10,
            "fetched_at": time.time(),
        }

        # Flow is automatically detected from routine context
        self.emit(
            "data_fetched",
            data=data,
            source=self.source_name,
            timestamp=time.time(),
        )


class DataProcessor(Routine):
    """Process fetched data"""

    def __init__(self):
        super().__init__()
        self.input_slot = self.define_slot("data_input", handler=self.process_data)
        self.output_event = self.define_event(
            "data_processed", ["result", "processor_id", "processing_time"]
        )

    def configure(self, processor_id: str = None):
        """Configure the processor"""
        self.set_config(processor_id=processor_id or "unknown")

    @property
    def processor_id(self):
        return self.get_config("processor_id", "unknown")

    def process_data(self, data: Dict[str, Any], source: str, timestamp: float):
        """Process the fetched data"""
        # Execution state should be stored in JobState, not routine._stats

        start_time = time.time()

        # Simulate processing (CPU-bound operation)
        processed = {
            "original": data,

Running the Example

python examples/concurrent_flow_demo.py

Expected Output

The demo will show:

  1. Concurrent Execution Test: Demonstrates parallel execution of multiple data fetchers

  2. Performance Comparison: Shows execution time difference between sequential and concurrent modes

  3. Error Handling: Demonstrates error handling in concurrent scenarios

  4. Serialization: Shows that concurrent flows can be serialized and deserialized

  5. Strategy Switching: Demonstrates dynamic strategy changes

Performance Results

Typical performance improvements:

  • Sequential Execution: ~0.65-0.75 seconds for 3 fetch tasks run one after another

  • Concurrent Execution: ~0.25-0.30 seconds for the same tasks

  • Speedup: 2-3x faster with concurrent execution

The actual speedup depends on:

  • Number of parallel routines

  • I/O wait time

  • System resources

  • Thread pool size
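
The timing gap above can be reproduced without Routilux. The following is a minimal sketch using only the standard library's ThreadPoolExecutor as a stand-in for a concurrent flow; the fetch(), run_sequential(), and run_concurrent() helpers are hypothetical names for this illustration, and the delay is shortened to 0.05 seconds so it runs quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fetch(source: str, delay: float = 0.05) -> str:
    """Stand-in for an I/O-bound fetch: sleep, then return a payload."""
    time.sleep(delay)
    return f"Data from {source}"


def run_sequential(sources):
    """Run the fetches one after another; total time is the sum of delays."""
    start = time.perf_counter()
    results = [fetch(s) for s in sources]
    return results, time.perf_counter() - start


def run_concurrent(sources, max_workers=3):
    """Run the fetches in a thread pool; total time is close to one delay."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order in its results
        results = list(pool.map(fetch, sources))
    return results, time.perf_counter() - start


sources = ["api", "database", "cache"]
seq_results, seq_time = run_sequential(sources)
con_results, con_time = run_concurrent(sources)
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

Because the tasks spend their time sleeping (waiting on I/O), the threads overlap almost perfectly. CPU-bound work in Python threads would not show the same gain.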

Key Concepts

Concurrent Execution Strategy

When a flow is created with execution_strategy="concurrent", tasks are processed concurrently using a thread pool:

flow = Flow(
    execution_strategy="concurrent",
    max_workers=5
)

Unified Queue Mechanism:

  • Both sequential and concurrent modes use the same event queue

  • Sequential mode: max_workers=1 (one task at a time)

  • Concurrent mode: max_workers>1 (multiple tasks in parallel)

  • Tasks are processed fairly in queue order
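
The idea of one queue serving both modes can be illustrated with a small standard-library sketch. This is not Routilux's internal implementation; run_queue() is a hypothetical helper that drains a single shared queue with a configurable number of worker threads, so that one worker behaves sequentially and several workers behave concurrently.

```python
import queue
import threading


def run_queue(tasks, max_workers):
    """Drain one shared queue with max_workers threads; return results in completion order."""
    q = queue.Queue()
    for task in tasks:
        q.put(task)

    done = []
    lock = threading.Lock()

    def worker():
        # Simplification: all tasks are enqueued up front, so an empty queue
        # means there is no more work. A real event queue would also have to
        # handle tasks that enqueue further tasks.
        while True:
            try:
                task = q.get_nowait()
            except queue.Empty:
                return
            result = task()
            with lock:
                done.append(result)

    threads = [threading.Thread(target=worker) for _ in range(max_workers)]
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return done


tasks = [lambda i=i: i * i for i in range(5)]
sequential = run_queue(tasks, max_workers=1)  # strict queue order
concurrent = run_queue(tasks, max_workers=3)  # same tasks, completion order may vary
```

With a single worker the results come back in exactly the order the tasks were enqueued; with several workers the same tasks are all processed, but completion order is no longer guaranteed.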

Automatic Flow Detection:

  • emit() automatically detects the flow from the routine context

  • No need to pass a flow parameter in most cases

  • Flow context is set automatically by Flow.execute() and Flow.resume()

def process_data(self, data=None, **kwargs):
    # Flow is automatically detected - no need to pass it!
    self.emit("output", result=f"Processed: {data}")

Thread Pool Management

The max_workers parameter controls the maximum number of concurrent threads:

flow = Flow(execution_strategy="concurrent", max_workers=10)
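
To see what an upper bound on worker threads means in practice, the following sketch measures peak concurrency with the standard library's ThreadPoolExecutor. It is an analogy for the max_workers parameter, not Routilux code; measure_peak_concurrency() is a hypothetical helper.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def measure_peak_concurrency(num_tasks, max_workers):
    """Run num_tasks short tasks and report how many were ever active at once."""
    active = 0
    peak = 0
    lock = threading.Lock()

    def task():
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.02)  # simulated I/O wait
        with lock:
            active -= 1

    # Leaving the with-block waits for all submitted tasks to finish.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for _ in range(num_tasks):
            pool.submit(task)
    return peak


peak_small = measure_peak_concurrency(num_tasks=8, max_workers=2)
peak_large = measure_peak_concurrency(num_tasks=8, max_workers=4)
```

However many tasks are submitted, the number running at the same time never exceeds max_workers; the remaining tasks wait in the queue.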

Thread Safety

All state updates are thread-safe:

  • Routine stats are protected

  • JobState updates are synchronized

  • Execution tracking is safe
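
The library handles its own state, but any state you share between routines yourself needs the same care. A minimal sketch of lock-protected shared state follows, assuming a hypothetical SafeStats class that is not part of Routilux.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SafeStats:
    """Counter dictionary whose updates are serialized by a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._counts = {}

    def increment(self, key):
        # The read-modify-write below must not interleave across threads.
        with self._lock:
            self._counts[key] = self._counts.get(key, 0) + 1

    def snapshot(self):
        # Return a copy so callers cannot mutate internal state.
        with self._lock:
            return dict(self._counts)


stats = SafeStats()
with ThreadPoolExecutor(max_workers=8) as pool:
    for _ in range(1000):
        pool.submit(stats.increment, "processed")
```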

Error Handling

Errors in concurrent execution are handled the same way as sequential execution:

flow.set_error_handler(ErrorHandler(strategy=ErrorStrategy.CONTINUE))
# Errors in one routine don't block others
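
The effect of a continue-style strategy can be sketched with plain futures: one failing task is recorded while the others still complete. This is a standard-library illustration of the behavior, not the ErrorHandler implementation; fetch() and run_all_continue() are hypothetical helpers.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch(source):
    """Stand-in fetch that fails for one particular source."""
    if source == "broken":
        raise ConnectionError(f"cannot reach {source}")
    return f"Data from {source}"


def run_all_continue(sources, max_workers=3):
    """Run every fetch; collect failures instead of aborting on the first one."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(fetch, s): s for s in sources}
        for future in as_completed(futures):
            source = futures[future]
            try:
                results[source] = future.result()
            except Exception as exc:
                errors[source] = exc
    return results, errors


results, errors = run_all_continue(["api", "broken", "cache"])
```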

Serialization

Concurrent flows can be serialized and deserialized:

# Serialize
data = flow.serialize()

# Deserialize
new_flow = Flow()
new_flow.deserialize(data)
# Execution strategy and max_workers are preserved
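
What "preserved" means here can be shown with a small round trip. The sketch below assumes a hypothetical FlowSettings dataclass and a JSON payload; the real Flow.serialize() format is not shown in this document and may differ. The point is that configuration such as the strategy and worker count is plain data that survives a round trip.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class FlowSettings:
    """Hypothetical container for the settings a concurrent flow needs to restore."""

    execution_strategy: str = "sequential"
    max_workers: int = 1

    def serialize(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def deserialize(cls, payload: str) -> "FlowSettings":
        return cls(**json.loads(payload))


original = FlowSettings(execution_strategy="concurrent", max_workers=5)
restored = FlowSettings.deserialize(original.serialize())
```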

Waiting for Completion

In concurrent execution, tasks run asynchronously. Use JobState.wait_for_completion() to wait for all tasks:

from routilux.job_state import JobState

flow = Flow(execution_strategy="concurrent")
job_state = flow.execute("entry_routine")

# Wait for all concurrent tasks to complete
JobState.wait_for_completion(flow, job_state, timeout=10.0)

# All tasks have finished, provided the 10-second timeout did not expire first
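
The same wait-with-timeout pattern exists in the standard library, which makes the semantics easy to experiment with. The sketch below uses concurrent.futures.wait as an analogy; wait_for_all() is a hypothetical helper, and the return value of JobState.wait_for_completion() is not specified in this document.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def wait_for_all(futures, timeout):
    """Block until every future is done or the timeout expires; True if all finished."""
    done, not_done = wait(futures, timeout=timeout)
    return len(not_done) == 0


pool = ThreadPoolExecutor(max_workers=3)
try:
    # Three short tasks with a generous timeout: all finish in time.
    futures = [pool.submit(time.sleep, 0.05) for _ in range(3)]
    finished = wait_for_all(futures, timeout=5.0)

    # One slow task with a very short timeout: the wait gives up first.
    slow = [pool.submit(time.sleep, 0.2)]
    timed_out = not wait_for_all(slow, timeout=0.01)
finally:
    pool.shutdown(wait=True)
```

A timeout keeps a stuck task from blocking the caller forever, at the cost of having to handle the case where work is still pending.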

Resource Cleanup

Always properly shut down concurrent flows to clean up resources:

from routilux.job_state import JobState

flow = Flow(execution_strategy="concurrent")
try:
    job_state = flow.execute("entry_routine")
    JobState.wait_for_completion(flow, job_state, timeout=10.0)
finally:
    flow.shutdown(wait=True)  # Clean up thread pool
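
The try/finally shape shown above mirrors how a plain thread pool is cleaned up. As a standard-library analogy (not Routilux code, and with run_with_cleanup() as a hypothetical helper), the sketch below shuts the pool down even if a task fails and then shows that a shut-down pool rejects new work.

```python
from concurrent.futures import ThreadPoolExecutor


def run_with_cleanup(values):
    """Square each value in a pool, always shutting the pool down afterwards."""
    pool = ThreadPoolExecutor(max_workers=2)
    try:
        futures = [pool.submit(pow, v, 2) for v in values]
        return [f.result(timeout=5.0) for f in futures], pool
    finally:
        # Runs on success and on error; wait=True joins the worker threads.
        pool.shutdown(wait=True)


squares, pool = run_with_cleanup([1, 2, 3])

# A pool that has been shut down no longer accepts tasks.
try:
    pool.submit(pow, 2, 2)
    rejected = False
except RuntimeError:
    rejected = True
```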

Use Cases

Concurrent execution is ideal for:

  • Multiple API Calls: Fetching data from multiple APIs simultaneously

  • Database Queries: Running multiple independent queries in parallel

  • File Processing: Processing multiple files concurrently

  • Network Operations: Any I/O-bound operations that can run in parallel

  • Data Aggregation: Collecting data from multiple sources simultaneously

Best Practices

  1. Choose Appropriate max_workers: Too many threads add context-switching and memory overhead, while too few leave I/O waits that could have overlapped

  2. Use for I/O-bound Operations: Concurrent execution is most beneficial for I/O-bound tasks

  3. Handle Errors Properly: Use appropriate error handling strategies

  4. Monitor Performance: Use ExecutionTracker to monitor concurrent execution performance

  5. Test Both Strategies: Compare sequential and concurrent performance for your use case

  6. Wait for Completion: Always call wait_for_completion() after execution to ensure all tasks finish

  7. Clean Up Resources: Always call shutdown() when done with a concurrent flow, preferably in a try/finally block

See Also