
Event & Messaging Service

The Event & Messaging Service provides comprehensive asynchronous communication and event streaming capabilities across all Sindhan AI platform components. It enables loose coupling between services through event-driven architecture, supporting pub/sub messaging, event sourcing, and real-time data streaming.

Overview and Purpose

Event & Messaging is a critical infrastructure service that enables asynchronous communication patterns across the distributed platform. It provides reliable message delivery, event streaming, and real-time data processing capabilities that are essential for building resilient, scalable microservices architectures.

Key Benefits

  • Loose Coupling: Services communicate through events without direct dependencies
  • Scalable Architecture: Horizontal scaling through distributed messaging
  • Reliable Delivery: At-least-once and exactly-once delivery guarantees
  • Event Sourcing: Complete audit trail of all system events
  • Real-Time Processing: Stream processing and real-time analytics
  • Fault Tolerance: Built-in resilience and automatic recovery mechanisms

Implementation Status

Phase | Status | Description
------- | ------ | -----------
Phase 1 | ✅ Implemented | Apache Kafka, basic pub/sub, message queues, event routing
Phase 2 | 🚧 In Progress | Event sourcing, stream processing, dead letter queues, schema registry
Phase 3 | 📋 Planned | Advanced stream analytics, ML-powered event correlation, multi-region replication

Current Version: v2.0.0
Next Release: v2.2.0 (Q2 2024)

Core Capabilities

1. Publish/Subscribe Messaging

  • Topic-based message publishing and subscription
  • Dynamic topic creation and management
  • Message filtering and routing
  • Broadcast and multicast messaging patterns
  • Message ordering and partitioning
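
Before reaching for a broker, the topic-based pattern in the bullets above can be sketched in a few lines. `InMemoryBus` below is a hypothetical in-process stand-in for broker topics, not part of the platform API:

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

class InMemoryBus:
    """Minimal topic-based pub/sub: an in-process stand-in for a broker."""

    def __init__(self):
        self._subscribers: Dict[str, List[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[Any], None]) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message: Any) -> int:
        """Deliver message to every subscriber of topic; return delivery count."""
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(message)
        return len(handlers)

bus = InMemoryBus()
received = []
bus.subscribe("user-events", received.append)
bus.publish("user-events", {"event_type": "user.registered"})
```

The same subscribe/publish shape reappears below with Kafka doing the delivery.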

2. Message Queues and Routing

  • Point-to-point message queues
  • Priority queues and message scheduling
  • Message routing based on content and metadata
  • Dead letter queues for failed message processing
  • Message transformation and enrichment
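
Content-and-metadata routing with a dead-letter fallback can be sketched as below; the rule predicates, queue names, and message shape are illustrative assumptions, not the service's actual routing API:

```python
from typing import Any, Callable, Dict, List, Tuple

class ContentRouter:
    """Route messages to queues by predicate; unmatched ones fall through to a DLQ."""

    def __init__(self, dlq_name: str = "dead-letter"):
        self.rules: List[Tuple[Callable[[Dict[str, Any]], bool], str]] = []
        self.queues: Dict[str, List[Dict[str, Any]]] = {dlq_name: []}
        self.dlq_name = dlq_name

    def add_rule(self, predicate: Callable[[Dict[str, Any]], bool],
                 queue_name: str) -> None:
        self.rules.append((predicate, queue_name))
        self.queues.setdefault(queue_name, [])

    def route(self, message: Dict[str, Any]) -> str:
        """Enqueue message on the first matching rule's queue; return the queue name."""
        for predicate, queue_name in self.rules:
            if predicate(message):
                self.queues[queue_name].append(message)
                return queue_name
        self.queues[self.dlq_name].append(message)  # no rule matched
        return self.dlq_name

router = ContentRouter()
router.add_rule(lambda m: m.get("priority") == "high", "priority-queue")
router.add_rule(lambda m: m.get("event_type", "").startswith("user."), "user-queue")
```

Rules are checked in registration order, so more specific rules (like the priority check) should be added first.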

3. Event Sourcing and Streaming

  • Complete event history storage and replay
  • Event store with snapshotting capabilities
  • Stream processing with windowing and aggregation
  • Real-time event correlation and pattern matching
  • Event-driven state management

4. Reliable Message Delivery

  • At-least-once and exactly-once delivery semantics
  • Message acknowledgment and retry mechanisms
  • Duplicate detection and deduplication
  • Message persistence and durability guarantees
  • Transactional message publishing
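
At-least-once delivery means consumers must tolerate redelivery, and the usual answer is deduplication keyed by `event_id`. A minimal sketch (a production version would bound the seen-ID set with a TTL or a persistent store):

```python
from typing import Any, Callable, Dict, Set

class DeduplicatingConsumer:
    """Turns at-least-once delivery into effectively-once processing by
    remembering event_ids already handled. The unbounded in-memory set
    is a simplification for illustration."""

    def __init__(self, handler: Callable[[Dict[str, Any]], None]):
        self.handler = handler
        self.seen_ids: Set[str] = set()

    def consume(self, event: Dict[str, Any]) -> bool:
        """Process event; return False if it was a duplicate redelivery."""
        event_id = event["event_id"]
        if event_id in self.seen_ids:
            return False
        self.handler(event)          # process first ...
        self.seen_ids.add(event_id)  # ... then record, so a crash causes redelivery
        return True

processed = []
consumer = DeduplicatingConsumer(processed.append)
consumer.consume({"event_id": "e-1", "data": 1})
consumer.consume({"event_id": "e-1", "data": 1})  # redelivery, skipped
```

Recording the ID only after the handler succeeds keeps the failure mode on the at-least-once side: a crash between the two steps causes a reprocess, never a silent drop.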

5. Schema Management and Evolution

  • Schema registry for message structure validation
  • Schema versioning and backward compatibility
  • Automatic schema evolution and migration
  • Type-safe message serialization/deserialization
  • Cross-language schema compatibility
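
The `SchemaRegistry` client referenced in the integration code below is not defined in this document. As a minimal sketch of the validation side, assume schemas are required-field lists keyed by (event_type, version); a real deployment would use Confluent Schema Registry or similar:

```python
from typing import Dict, List, Tuple

class SimpleSchemaRegistry:
    """Validates event payloads against registered required-field schemas,
    keyed by (event_type, version). Older versions stay registered so
    consumers can still decode historical events."""

    def __init__(self):
        self._schemas: Dict[Tuple[str, str], List[str]] = {}

    def register(self, event_type: str, version: str,
                 required_fields: List[str]) -> None:
        self._schemas[(event_type, version)] = required_fields

    def validate(self, event_type: str, version: str,
                 data: Dict) -> List[str]:
        """Return the list of missing required fields (empty list = valid)."""
        required = self._schemas.get((event_type, version))
        if required is None:
            raise KeyError(f"No schema for {event_type} v{version}")
        return [field for field in required if field not in data]

registry = SimpleSchemaRegistry()
registry.register("user.registered", "1.0", ["user_id", "email"])
missing = registry.validate("user.registered", "1.0", {"user_id": "12345"})
```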

6. Stream Processing and Analytics

  • Real-time stream processing with Apache Kafka Streams
  • Complex event processing (CEP) patterns
  • Windowed aggregations and temporal analytics
  • Stream joins and enrichment
  • Event-driven machine learning pipelines
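
Kafka Streams itself is a JVM library, so the windowing bullet can only be sketched here in plain Python: a tumbling (non-overlapping) count window keyed by event type, with the window size as an assumed parameter:

```python
from collections import defaultdict
from typing import Dict, List, Tuple

def tumbling_window_counts(events: List[Tuple[float, str]],
                           window_seconds: float) -> Dict[Tuple[int, str], int]:
    """Count events per (window index, event_type) over non-overlapping
    (tumbling) time windows; timestamps are epoch seconds."""
    counts: Dict[Tuple[int, str], int] = defaultdict(int)
    for timestamp, event_type in events:
        window = int(timestamp // window_seconds)  # which tumbling window this falls in
        counts[(window, event_type)] += 1
    return dict(counts)

events = [
    (0.5, "user.registered"),
    (3.2, "user.registered"),
    (4.1, "email.sent"),
    (12.0, "user.registered"),  # lands in the next 10-second window
]
counts = tumbling_window_counts(events, window_seconds=10.0)
```

Hopping and sliding windows differ only in how timestamps map to (possibly multiple) window indices; the aggregation step is the same.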

Architecture

Integration Patterns

Event Publishing and Subscription

import asyncio
import json
from typing import Dict, Any, List, Callable, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
import uuid
 
@dataclass
class Event:
    event_type: str
    event_version: str
    source: str
    data: Dict[str, Any]
    timestamp: Optional[datetime] = None
    event_id: Optional[str] = None
    correlation_id: Optional[str] = None
    trace_id: Optional[str] = None
    
    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.utcnow()
        if not self.event_id:
            self.event_id = str(uuid.uuid4())
 
class EventPublisher:
    def __init__(self, broker_config: Dict[str, Any]):
        self.broker_config = broker_config
        self.kafka_producer = None
        # NOTE: SchemaRegistry is an external client, not defined in this document
        self.schema_registry = SchemaRegistry(broker_config.get('schema_registry_url'))
        self._initialize_producer()
    
    def _initialize_producer(self):
        """Initialize Kafka producer with configuration"""
        from kafka import KafkaProducer
        
        self.kafka_producer = KafkaProducer(
            bootstrap_servers=self.broker_config['kafka_servers'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            key_serializer=lambda x: x.encode('utf-8') if x else None,
            acks='all',  # Wait for all in-sync replicas to acknowledge
            retries=3,
            max_in_flight_requests_per_connection=1,  # Preserve ordering across retries
            enable_idempotence=True,  # Idempotent producer: no duplicate writes on retry
            compression_type='gzip'
        )
    
    async def publish_event(self, topic: str, event: Event, 
                           partition_key: str = None) -> bool:
        """Publish event to specified topic"""
        try:
            # Validate event against schema
            await self.schema_registry.validate_event(event)
            
            # Serialize event
            event_data = {
                'event_id': event.event_id,
                'event_type': event.event_type,
                'event_version': event.event_version,
                'source': event.source,
                'timestamp': event.timestamp.isoformat(),
                'correlation_id': event.correlation_id,
                'trace_id': event.trace_id,
                'data': event.data
            }
            
            # Publish to Kafka
            future = self.kafka_producer.send(
                topic,
                value=event_data,
                key=partition_key or event.event_id
            )
            
            # Wait for acknowledgment
            record_metadata = future.get(timeout=10)
            
            print(f"Event published to {record_metadata.topic} "
                  f"partition {record_metadata.partition} "
                  f"offset {record_metadata.offset}")
            
            return True
            
        except Exception as e:
            print(f"Failed to publish event: {e}")
            return False
    
    async def publish_batch(self, topic: str, events: List[Event]) -> Dict[str, int]:
        """Publish multiple events in batch"""
        results = {'success': 0, 'failed': 0}
        
        for event in events:
            success = await self.publish_event(topic, event)
            if success:
                results['success'] += 1
            else:
                results['failed'] += 1
        
        return results
 
class EventSubscriber:
    def __init__(self, broker_config: Dict[str, Any], group_id: str):
        self.broker_config = broker_config
        self.group_id = group_id
        self.kafka_consumer = None
        self.handlers = {}
        self.running = False
        self._initialize_consumer()
    
    def _initialize_consumer(self):
        """Initialize Kafka consumer with configuration"""
        from kafka import KafkaConsumer
        
        self.kafka_consumer = KafkaConsumer(
            bootstrap_servers=self.broker_config['kafka_servers'],
            group_id=self.group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None,
            auto_offset_reset='earliest',
            enable_auto_commit=False,  # Manual commit for better control
            max_poll_records=10,
            session_timeout_ms=30000,
            heartbeat_interval_ms=10000
        )
    
    def subscribe(self, topics: List[str]):
        """Subscribe to specified topics"""
        self.kafka_consumer.subscribe(topics)
    
    def register_handler(self, event_type: str, handler: Callable):
        """Register event handler for specific event type"""
        if event_type not in self.handlers:
            self.handlers[event_type] = []
        self.handlers[event_type].append(handler)
    
    async def start_consuming(self):
        """Start consuming messages"""
        self.running = True
        
        try:
            while self.running:
                message_batch = self.kafka_consumer.poll(timeout_ms=1000)
                
                for topic_partition, messages in message_batch.items():
                    for message in messages:
                        await self._process_message(message)
                
                # Commit offsets after processing batch
                self.kafka_consumer.commit()
                
        except Exception as e:
            print(f"Error in message consumption: {e}")
        finally:
            self.kafka_consumer.close()
    
    async def _process_message(self, message):
        """Process individual message"""
        try:
            event_data = message.value
            event_type = event_data.get('event_type')
            
            if event_type in self.handlers:
                # Create event object
                event = Event(
                    event_type=event_data['event_type'],
                    event_version=event_data['event_version'],
                    source=event_data['source'],
                    data=event_data['data'],
                    timestamp=datetime.fromisoformat(event_data['timestamp']),
                    event_id=event_data['event_id'],
                    correlation_id=event_data.get('correlation_id'),
                    trace_id=event_data.get('trace_id')
                )
                
                # Call all registered handlers
                for handler in self.handlers[event_type]:
                    await self._call_handler_safely(handler, event)
            
        except Exception as e:
            print(f"Error processing message: {e}")
            # Send to dead letter queue
            await self._send_to_dlq(message, str(e))
    
    async def _call_handler_safely(self, handler: Callable, event: Event):
        """Call event handler with error handling"""
        try:
            if asyncio.iscoroutinefunction(handler):
                await handler(event)
            else:
                handler(event)
        except Exception as e:
            print(f"Handler error for {event.event_type}: {e}")
            # Could implement retry logic here
    
    async def _send_to_dlq(self, message, error_reason: str):
        """Send failed message to dead letter queue"""
        dlq_topic = f"{message.topic}.dlq"
        
        dlq_message = {
            'original_message': message.value,
            'error_reason': error_reason,
            'failed_at': datetime.utcnow().isoformat(),
            'topic': message.topic,
            'partition': message.partition,
            'offset': message.offset
        }
        
        # Publish to DLQ (implementation depends on DLQ setup)
        # self.dlq_publisher.publish(dlq_topic, dlq_message)
 
# Usage example
broker_config = {
    'kafka_servers': ['kafka1.sindhan.ai:9092', 'kafka2.sindhan.ai:9092'],
    'schema_registry_url': 'http://schema-registry.sindhan.ai:8081'
}
 
# Publisher
publisher = EventPublisher(broker_config)
 
# Create and publish user registration event
user_registered_event = Event(
    event_type='user.registered',
    event_version='1.0',
    source='user-service',
    data={
        'user_id': '12345',
        'username': 'john_doe',
        'email': 'john@example.com',
        'registration_method': 'web'
    },
    correlation_id='req-abc-123'
)
 
await publisher.publish_event('user-events', user_registered_event)
 
# Subscriber
subscriber = EventSubscriber(broker_config, 'notification-service-group')
 
async def handle_user_registered(event: Event):
    """Handle user registration event"""
    user_data = event.data
    print(f"Sending welcome email to {user_data['email']}")
    
    # Send welcome email
    await send_welcome_email(user_data['email'], user_data['username'])
    
    # Publish email sent event
    email_sent_event = Event(
        event_type='email.sent',
        event_version='1.0',
        source='notification-service',
        data={
            'user_id': user_data['user_id'],
            'email_type': 'welcome',
            'recipient': user_data['email']
        },
        correlation_id=event.correlation_id
    )
    
    await publisher.publish_event('notification-events', email_sent_event)
 
# Register handler and start consuming
subscriber.register_handler('user.registered', handle_user_registered)
subscriber.subscribe(['user-events'])
await subscriber.start_consuming()
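
`_send_to_dlq` above leaves the actual publish commented out because it depends on the DLQ setup. One way to wire it, using an in-memory stand-in for the producer and mirroring the `dlq_message` structure and the `<topic>.dlq` naming convention from the code above:

```python
from datetime import datetime, timezone
from typing import Any, Dict, List

class InMemoryDLQPublisher:
    """Stand-in DLQ sink; a real implementation would publish via a KafkaProducer."""

    def __init__(self):
        self.topics: Dict[str, List[Dict[str, Any]]] = {}

    def publish(self, topic: str, message: Dict[str, Any]) -> None:
        self.topics.setdefault(topic, []).append(message)

def build_dlq_message(original_value: Any, topic: str, partition: int,
                      offset: int, error_reason: str) -> Dict[str, Any]:
    """Mirror the dlq_message structure built in _send_to_dlq above."""
    return {
        "original_message": original_value,
        "error_reason": error_reason,
        "failed_at": datetime.now(timezone.utc).isoformat(),
        "topic": topic,
        "partition": partition,
        "offset": offset,
    }

dlq = InMemoryDLQPublisher()
msg = build_dlq_message({"event_type": "user.registered"}, "user-events", 0, 42,
                        "KeyError: 'event_version'")
dlq.publish("user-events.dlq", msg)
```

Keeping the original topic, partition, and offset in the DLQ payload makes it possible to replay or investigate the failed message later.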

Event Sourcing Implementation

from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
import json
from datetime import datetime
 
class EventStore:
    def __init__(self, kafka_config: Dict[str, Any]):
        self.kafka_config = kafka_config
        self.producer = EventPublisher(kafka_config)
        self.consumer = EventSubscriber(kafka_config, 'event-store-group')
        self.snapshots = {}
    
    async def append_event(self, aggregate_id: str, event: Event, 
                          expected_version: int = None) -> bool:
        """Append event to aggregate stream"""
        stream_name = f"aggregate-{aggregate_id}"
        
        # Add aggregate metadata
        event.data['aggregate_id'] = aggregate_id
        event.data['aggregate_version'] = expected_version
        
        try:
            success = await self.producer.publish_event(stream_name, event, aggregate_id)
            
            if success:
                # Update local cache
                if aggregate_id not in self.snapshots:
                    self.snapshots[aggregate_id] = []
                self.snapshots[aggregate_id].append(event)
            
            return success
            
        except Exception as e:
            print(f"Failed to append event: {e}")
            return False
    
    async def get_events(self, aggregate_id: str, 
                        from_version: int = 0) -> List[Event]:
        """Get events for aggregate from specified version"""
        # Check local cache first
        if aggregate_id in self.snapshots:
            cached_events = self.snapshots[aggregate_id]
            return [e for e in cached_events if e.data.get('aggregate_version', 0) >= from_version]
        
        # Query from Kafka (implementation would depend on Kafka consumer API)
        # This is a simplified example
        stream_name = f"aggregate-{aggregate_id}"
        events = await self._query_kafka_stream(stream_name, from_version)
        
        return events
    
    async def create_snapshot(self, aggregate_id: str, 
                             aggregate_state: Dict[str, Any], 
                             version: int):
        """Create snapshot of aggregate state"""
        snapshot_event = Event(
            event_type='aggregate.snapshot',
            event_version='1.0',
            source='event-store',
            data={
                'aggregate_id': aggregate_id,
                'aggregate_type': aggregate_state.get('type', 'unknown'),
                'aggregate_version': version,
                'state': aggregate_state
            }
        )
        
        snapshot_stream = f"snapshots-{aggregate_id}"
        await self.producer.publish_event(snapshot_stream, snapshot_event, aggregate_id)
 
class AggregateRoot(ABC):
    def __init__(self, aggregate_id: str):
        self.aggregate_id = aggregate_id
        self.version = 0
        self.uncommitted_events = []
    
    @abstractmethod
    def apply_event(self, event: Event):
        """Apply event to aggregate state"""
        pass
    
    def raise_event(self, event: Event):
        """Raise new event"""
        event.data['aggregate_id'] = self.aggregate_id
        event.data['aggregate_version'] = self.version + 1
        
        self.apply_event(event)
        self.uncommitted_events.append(event)
        self.version += 1
    
    def mark_events_as_committed(self):
        """Mark all uncommitted events as committed"""
        self.uncommitted_events.clear()
    
    def load_from_history(self, events: List[Event]):
        """Reconstruct aggregate from event history"""
        for event in events:
            self.apply_event(event)
            self.version = event.data.get('aggregate_version', self.version + 1)
 
# Example aggregate
class UserAggregate(AggregateRoot):
    def __init__(self, user_id: str):
        super().__init__(user_id)
        self.email = None
        self.username = None
        self.status = 'inactive'
        self.created_at = None
    
    def register(self, username: str, email: str):
        """Register new user"""
        if self.status != 'inactive':
            raise ValueError("User already registered")
        
        event = Event(
            event_type='user.registered',
            event_version='1.0',
            source='user-aggregate',
            data={
                'username': username,
                'email': email,
                'registered_at': datetime.utcnow().isoformat()
            }
        )
        
        self.raise_event(event)
    
    def activate(self):
        """Activate user account"""
        if self.status != 'pending':
            raise ValueError("User is not in pending state")
        
        event = Event(
            event_type='user.activated',
            event_version='1.0',
            source='user-aggregate',
            data={
                'activated_at': datetime.utcnow().isoformat()
            }
        )
        
        self.raise_event(event)
    
    def apply_event(self, event: Event):
        """Apply event to aggregate state"""
        event_type = event.event_type
        
        if event_type == 'user.registered':
            self.username = event.data['username']
            self.email = event.data['email']
            self.created_at = event.data['registered_at']
            self.status = 'pending'
        
        elif event_type == 'user.activated':
            self.status = 'active'
        
        elif event_type == 'user.deactivated':
            self.status = 'inactive'
 
# Repository pattern for event sourcing
class EventSourcedRepository:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
    
    async def get_by_id(self, aggregate_id: str, 
                       aggregate_class: type) -> Optional[AggregateRoot]:
        """Get aggregate by ID"""
        events = await self.event_store.get_events(aggregate_id)
        
        if not events:
            return None
        
        aggregate = aggregate_class(aggregate_id)
        aggregate.load_from_history(events)
        
        return aggregate
    
    async def save(self, aggregate: AggregateRoot):
        """Save aggregate events"""
        for event in aggregate.uncommitted_events:
            success = await self.event_store.append_event(
                aggregate.aggregate_id,
                event,
                event.data.get('aggregate_version')  # per-event version set by raise_event
            )
            
            if not success:
                raise RuntimeError(f"Failed to save event: {event.event_type}")
        
        aggregate.mark_events_as_committed()
 
# Usage example
event_store = EventStore(broker_config)
user_repository = EventSourcedRepository(event_store)
 
# Create and register new user
user = UserAggregate('user-12345')
user.register('john_doe', 'john@example.com')
user.activate()
 
# Save user events
await user_repository.save(user)
 
# Load user from events
loaded_user = await user_repository.get_by_id('user-12345', UserAggregate)
print(f"User status: {loaded_user.status}")  # Should be 'active'
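
`EventStore.create_snapshot` is defined above but never exercised. The restore side might look like the sketch below, which starts from a snapshot's state and folds in only newer events; the `changes` delta field is an illustrative assumption, not part of the event format used elsewhere in this document:

```python
from typing import Any, Dict, List

def rebuild_from_snapshot(snapshot: Dict[str, Any],
                          newer_events: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Start from a snapshot's state, then fold in only events whose
    aggregate_version is newer than the snapshot's."""
    state = dict(snapshot["state"])
    version = snapshot["aggregate_version"]
    for event in newer_events:
        if event["aggregate_version"] <= version:
            continue  # already folded into the snapshot
        state.update(event["changes"])  # assumed: events carry a state delta
        version = event["aggregate_version"]
    state["version"] = version
    return state

snapshot = {
    "aggregate_version": 2,
    "state": {"status": "pending", "email": "john@example.com"},
}
events = [
    {"aggregate_version": 2, "changes": {"status": "pending"}},  # skipped
    {"aggregate_version": 3, "changes": {"status": "active"}},
]
state = rebuild_from_snapshot(snapshot, events)
```

This is what makes snapshots pay off: replay cost becomes proportional to events since the last snapshot rather than the aggregate's full history.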

Implementation Roadmap

Phase 1: Foundation (Completed)

Status: ✅ Released v1.0.0

  • Apache Kafka cluster setup and management
  • Basic pub/sub messaging patterns
  • Message queues with RabbitMQ integration
  • Event routing and filtering
  • Basic monitoring and alerting
  • REST API for message publishing

Phase 2: Advanced Features (In Progress)

Status: 🚧 Target v2.2.0 - Q2 2024

  • Event sourcing implementation
  • Schema registry integration
  • Stream processing with Kafka Streams
  • Dead letter queue handling
  • Message transformation and enrichment
  • Advanced consumer group management

Phase 3: Intelligence and Analytics (Planned)

Status: 📋 Target v3.0.0 - Q3 2024

  • Complex event processing (CEP)
  • ML-powered event correlation and anomaly detection
  • Real-time stream analytics and dashboards
  • Multi-region event replication
  • Event-driven machine learning pipelines
  • Advanced message routing and orchestration

Benefits and Value

Architecture Benefits

  • Loose Coupling: Services communicate through events without direct dependencies
  • Scalability: Horizontal scaling through distributed messaging and partitioning
  • Resilience: Fault tolerance through message persistence and retry mechanisms
  • Flexibility: Easy to add new consumers without changing producers

Operational Benefits

  • Audit Trail: Complete event history for debugging and compliance
  • Real-Time Processing: Immediate response to business events
  • Load Balancing: Distributed processing across multiple consumer instances
  • Monitoring: Comprehensive visibility into message flows and processing

Business Benefits

  • Event-Driven Insights: Real-time business intelligence and analytics
  • Process Automation: Automated workflows triggered by business events
  • Customer Experience: Real-time notifications and personalized experiences
  • Data Integration: Seamless data flow between different business systems

Related Services

Consuming Services

  • All Platform Services: Every service can publish and consume events
  • AI Agents: Event-driven agent coordination and communication
  • Analytics Services: Real-time data processing and business intelligence
  • Notification Services: Event-triggered notifications and alerts

The Event & Messaging Service provides the communication backbone that enables all platform services to interact asynchronously, supporting scalable, resilient, and loosely-coupled architectures.