Event & Messaging Service
The Event & Messaging Service provides comprehensive asynchronous communication and event streaming capabilities across all Sindhan AI platform components. It enables loose coupling between services through event-driven architecture, supporting pub/sub messaging, event sourcing, and real-time data streaming.
Overview and Purpose
Event & Messaging is a critical infrastructure service that enables asynchronous communication patterns across the distributed platform. It provides reliable message delivery, event streaming, and real-time data processing capabilities that are essential for building resilient, scalable microservices architectures.
Key Benefits
- Loose Coupling: Services communicate through events without direct dependencies
- Scalable Architecture: Horizontal scaling through distributed messaging
- Reliable Delivery: At-least-once and exactly-once delivery guarantees
- Event Sourcing: Complete audit trail of all system events
- Real-Time Processing: Stream processing and real-time analytics
- Fault Tolerance: Built-in resilience and automatic recovery mechanisms
Implementation Status
| Phase | Status | Description |
|---|---|---|
| Phase 1 | ✅ Implemented | Apache Kafka, basic pub/sub, message queues, event routing |
| Phase 2 | 🚧 In Progress | Event sourcing, stream processing, dead letter queues, schema registry |
| Phase 3 | 📋 Planned | Advanced stream analytics, ML-powered event correlation, multi-region replication |
Current Version: v2.0.0 | Next Release: v2.2.0 (Q2 2024)
Core Capabilities
1. Publish/Subscribe Messaging
- Topic-based message publishing and subscription
- Dynamic topic creation and management
- Message filtering and routing
- Broadcast and multicast messaging patterns
- Message ordering and partitioning
2. Message Queues and Routing
- Point-to-point message queues
- Priority queues and message scheduling
- Message routing based on content and metadata
- Dead letter queues for failed message processing
- Message transformation and enrichment
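As a sketch of how content-based routing might work, the snippet below routes messages to destination queues by evaluating content predicates in order. The rule predicates and queue names are illustrative, not the service's actual API:

```python
from typing import Any, Callable, Dict, List, Tuple

class ContentRouter:
    """Route messages to destination queues based on content predicates."""

    def __init__(self, default_queue: str):
        # Each rule is a (predicate, destination-queue) pair
        self.rules: List[Tuple[Callable[[Dict[str, Any]], bool], str]] = []
        self.default_queue = default_queue

    def add_rule(self, predicate: Callable[[Dict[str, Any]], bool], queue: str):
        # Rules are evaluated in registration order; first match wins
        self.rules.append((predicate, queue))

    def route(self, message: Dict[str, Any]) -> str:
        for predicate, queue in self.rules:
            if predicate(message):
                return queue
        return self.default_queue

router = ContentRouter(default_queue='events.general')
router.add_rule(lambda m: m.get('priority') == 'high', 'events.priority')
router.add_rule(lambda m: m.get('region') == 'eu', 'events.eu')

print(router.route({'priority': 'high', 'region': 'eu'}))  # events.priority
print(router.route({'region': 'eu'}))                      # events.eu
print(router.route({'type': 'audit'}))                     # events.general
```

First-match-wins ordering keeps routing deterministic when multiple predicates overlap, as in the first example above.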
3. Event Sourcing and Streaming
- Complete event history storage and replay
- Event store with snapshotting capabilities
- Stream processing with windowing and aggregation
- Real-time event correlation and pattern matching
- Event-driven state management
4. Reliable Message Delivery
- At-least-once and exactly-once delivery semantics
- Message acknowledgment and retry mechanisms
- Duplicate detection and deduplication
- Message persistence and durability guarantees
- Transactional message publishing
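Consumer-side deduplication is commonly implemented by tracking recently seen event IDs, turning at-least-once delivery into effectively-once processing. A minimal in-memory sketch (the bounded-cache size and eviction policy here are illustrative; a production service would typically use a shared store such as Redis):

```python
from collections import OrderedDict

class Deduplicator:
    """Drop redelivered events whose IDs were seen recently."""

    def __init__(self, max_entries: int = 10_000):
        # OrderedDict preserves insertion order, enabling FIFO eviction
        self.seen: "OrderedDict[str, None]" = OrderedDict()
        self.max_entries = max_entries

    def is_duplicate(self, event_id: str) -> bool:
        if event_id in self.seen:
            return True
        self.seen[event_id] = None
        # Evict the oldest entry once the cache is full
        if len(self.seen) > self.max_entries:
            self.seen.popitem(last=False)
        return False

dedup = Deduplicator(max_entries=3)
print(dedup.is_duplicate("evt-1"))  # False (first delivery, processed)
print(dedup.is_duplicate("evt-1"))  # True  (redelivery, dropped)
```

The cache is bounded because unbounded ID tracking would grow forever; the window only needs to cover the broker's realistic redelivery horizon.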
5. Schema Management and Evolution
- Schema registry for message structure validation
- Schema versioning and backward compatibility
- Automatic schema evolution and migration
- Type-safe message serialization/deserialization
- Cross-language schema compatibility
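Conceptually, a schema registry maps an (event type, version) pair to a message schema and validates payloads against it before publishing. The toy in-process registry below illustrates the idea only; real deployments would use a Confluent-style registry with Avro or JSON Schema support:

```python
from typing import Any, Dict, Tuple

class InMemorySchemaRegistry:
    """Toy registry: maps (event_type, version) to required field types."""

    def __init__(self):
        self.schemas: Dict[Tuple[str, str], Dict[str, type]] = {}

    def register(self, event_type: str, version: str, schema: Dict[str, type]):
        self.schemas[(event_type, version)] = schema

    def validate(self, event_type: str, version: str, data: Dict[str, Any]) -> bool:
        schema = self.schemas.get((event_type, version))
        if schema is None:
            raise KeyError(f"No schema for {event_type} v{version}")
        # Every declared field must be present with the declared type
        return all(isinstance(data.get(field), ftype)
                   for field, ftype in schema.items())

registry = InMemorySchemaRegistry()
registry.register('user.registered', '1.0', {'user_id': str, 'email': str})

print(registry.validate('user.registered', '1.0',
                        {'user_id': '12345', 'email': 'john@example.com'}))  # True
print(registry.validate('user.registered', '1.0', {'user_id': 12345}))       # False
```

Versioning the schema key is what allows backward-compatible evolution: a `'1.1'` schema can be registered alongside `'1.0'` while old consumers keep validating against the version they understand.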
6. Stream Processing and Analytics
- Real-time stream processing with Apache Kafka Streams
- Complex event processing (CEP) patterns
- Windowed aggregations and temporal analytics
- Stream joins and enrichment
- Event-driven machine learning pipelines
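Kafka Streams itself is a JVM library, but the core windowing idea can be sketched in Python: events are grouped into fixed-size, non-overlapping (tumbling) time windows and aggregated per key. The timestamps and window size below are illustrative:

```python
from collections import defaultdict
from typing import Dict, List, Tuple

def tumbling_window_counts(events: List[Tuple[float, str]],
                           window_seconds: float) -> Dict[Tuple[int, str], int]:
    """Count events per (window, key) over fixed, non-overlapping windows."""
    counts: Dict[Tuple[int, str], int] = defaultdict(int)
    for timestamp, key in events:
        # Integer division assigns each event to exactly one window
        window = int(timestamp // window_seconds)
        counts[(window, key)] += 1
    return dict(counts)

# (event-time seconds, event key)
events = [(0.5, 'login'), (3.0, 'login'), (4.2, 'logout'), (11.0, 'login')]
print(tumbling_window_counts(events, window_seconds=10.0))
# {(0, 'login'): 2, (0, 'logout'): 1, (1, 'login'): 1}
```

Because tumbling windows partition the time axis, each event contributes to exactly one aggregate; sliding or hopping windows relax that property at the cost of events counting toward multiple windows.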
Architecture
Integration Patterns
Event Publishing and Subscription
```python
import asyncio
import json
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List


class SchemaRegistry:
    """Minimal stand-in for a schema-registry client (see Schema Management above)."""

    def __init__(self, url: str):
        self.url = url

    async def validate_event(self, event: "Event") -> None:
        # A real implementation would validate event.data against the schema
        # registered for (event.event_type, event.event_version).
        pass


@dataclass
class Event:
    event_type: str
    event_version: str
    source: str
    data: Dict[str, Any]
    timestamp: datetime = None
    event_id: str = None
    correlation_id: str = None
    trace_id: str = None

    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.utcnow()
        if not self.event_id:
            self.event_id = str(uuid.uuid4())


class EventPublisher:
    def __init__(self, broker_config: Dict[str, Any]):
        self.broker_config = broker_config
        self.kafka_producer = None
        self.schema_registry = SchemaRegistry(broker_config.get('schema_registry_url'))
        self._initialize_producer()

    def _initialize_producer(self):
        """Initialize the Kafka producer with delivery-guarantee settings."""
        from kafka import KafkaProducer

        self.kafka_producer = KafkaProducer(
            bootstrap_servers=self.broker_config['kafka_servers'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            key_serializer=lambda x: x.encode('utf-8') if x else None,
            acks='all',  # Wait for all in-sync replicas to acknowledge
            retries=3,
            max_in_flight_requests_per_connection=1,  # Preserve ordering on retry
            compression_type='gzip'
            # enable_idempotence=True would give exactly-once semantics, on
            # client/broker versions that support the idempotent producer.
        )

    async def publish_event(self, topic: str, event: Event,
                            partition_key: str = None) -> bool:
        """Publish an event to the specified topic."""
        try:
            # Validate the event against its registered schema
            await self.schema_registry.validate_event(event)

            # Serialize the event envelope
            event_data = {
                'event_id': event.event_id,
                'event_type': event.event_type,
                'event_version': event.event_version,
                'source': event.source,
                'timestamp': event.timestamp.isoformat(),
                'correlation_id': event.correlation_id,
                'trace_id': event.trace_id,
                'data': event.data
            }

            # Publish to Kafka, keying by partition_key for per-key ordering
            future = self.kafka_producer.send(
                topic,
                value=event_data,
                key=partition_key or event.event_id
            )

            # Block until the broker acknowledges the write
            record_metadata = future.get(timeout=10)
            print(f"Event published to {record_metadata.topic} "
                  f"partition {record_metadata.partition} "
                  f"offset {record_metadata.offset}")
            return True

        except Exception as e:
            print(f"Failed to publish event: {e}")
            return False

    async def publish_batch(self, topic: str, events: List[Event]) -> Dict[str, int]:
        """Publish multiple events, returning success/failure counts."""
        results = {'success': 0, 'failed': 0}
        for event in events:
            success = await self.publish_event(topic, event)
            if success:
                results['success'] += 1
            else:
                results['failed'] += 1
        return results


class EventSubscriber:
    def __init__(self, broker_config: Dict[str, Any], group_id: str):
        self.broker_config = broker_config
        self.group_id = group_id
        self.kafka_consumer = None
        self.handlers = {}
        self.running = False
        self._initialize_consumer()

    def _initialize_consumer(self):
        """Initialize the Kafka consumer with manual offset management."""
        from kafka import KafkaConsumer

        self.kafka_consumer = KafkaConsumer(
            bootstrap_servers=self.broker_config['kafka_servers'],
            group_id=self.group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None,
            auto_offset_reset='earliest',
            enable_auto_commit=False,  # Manual commit for better control
            max_poll_records=10,
            session_timeout_ms=30000,
            heartbeat_interval_ms=10000
        )

    def subscribe(self, topics: List[str]):
        """Subscribe to the specified topics."""
        self.kafka_consumer.subscribe(topics)

    def register_handler(self, event_type: str, handler: Callable):
        """Register an event handler for a specific event type."""
        if event_type not in self.handlers:
            self.handlers[event_type] = []
        self.handlers[event_type].append(handler)

    async def start_consuming(self):
        """Poll for messages and dispatch them to registered handlers."""
        self.running = True
        try:
            while self.running:
                message_batch = self.kafka_consumer.poll(timeout_ms=1000)
                for topic_partition, messages in message_batch.items():
                    for message in messages:
                        await self._process_message(message)
                # Commit offsets only after the batch has been processed
                self.kafka_consumer.commit()
        except Exception as e:
            print(f"Error in message consumption: {e}")
        finally:
            self.kafka_consumer.close()

    async def _process_message(self, message):
        """Deserialize one message and dispatch it to its handlers."""
        try:
            event_data = message.value
            event_type = event_data.get('event_type')

            if event_type in self.handlers:
                # Rebuild the Event object from the wire format
                event = Event(
                    event_type=event_data['event_type'],
                    event_version=event_data['event_version'],
                    source=event_data['source'],
                    data=event_data['data'],
                    timestamp=datetime.fromisoformat(event_data['timestamp']),
                    event_id=event_data['event_id'],
                    correlation_id=event_data.get('correlation_id'),
                    trace_id=event_data.get('trace_id')
                )

                # Call every handler registered for this event type
                for handler in self.handlers[event_type]:
                    await self._call_handler_safely(handler, event)

        except Exception as e:
            print(f"Error processing message: {e}")
            # Route the poison message to the dead letter queue
            await self._send_to_dlq(message, str(e))

    async def _call_handler_safely(self, handler: Callable, event: Event):
        """Call an event handler, isolating handler failures."""
        try:
            if asyncio.iscoroutinefunction(handler):
                await handler(event)
            else:
                handler(event)
        except Exception as e:
            print(f"Handler error for {event.event_type}: {e}")
            # Retry logic could be implemented here

    async def _send_to_dlq(self, message, error_reason: str):
        """Send a failed message to the dead letter queue."""
        dlq_topic = f"{message.topic}.dlq"
        dlq_message = {
            'original_message': message.value,
            'error_reason': error_reason,
            'failed_at': datetime.utcnow().isoformat(),
            'topic': message.topic,
            'partition': message.partition,
            'offset': message.offset
        }
        # Publish to the DLQ (implementation depends on the DLQ setup)
        # self.dlq_publisher.publish(dlq_topic, dlq_message)


# Usage example (run inside an async context, e.g. via asyncio.run)
broker_config = {
    'kafka_servers': ['kafka1.sindhan.ai:9092', 'kafka2.sindhan.ai:9092'],
    'schema_registry_url': 'http://schema-registry.sindhan.ai:8081'
}

# Publisher
publisher = EventPublisher(broker_config)

# Create and publish a user registration event
user_registered_event = Event(
    event_type='user.registered',
    event_version='1.0',
    source='user-service',
    data={
        'user_id': '12345',
        'username': 'john_doe',
        'email': 'john@example.com',
        'registration_method': 'web'
    },
    correlation_id='req-abc-123'
)
await publisher.publish_event('user-events', user_registered_event)

# Subscriber
subscriber = EventSubscriber(broker_config, 'notification-service-group')

async def handle_user_registered(event: Event):
    """Handle a user registration event."""
    user_data = event.data
    print(f"Sending welcome email to {user_data['email']}")

    # Send the welcome email (send_welcome_email is assumed to be
    # defined elsewhere in the notification service)
    await send_welcome_email(user_data['email'], user_data['username'])

    # Publish a follow-up event, propagating the correlation ID
    email_sent_event = Event(
        event_type='email.sent',
        event_version='1.0',
        source='notification-service',
        data={
            'user_id': user_data['user_id'],
            'email_type': 'welcome',
            'recipient': user_data['email']
        },
        correlation_id=event.correlation_id
    )
    await publisher.publish_event('notification-events', email_sent_event)

# Register the handler and start consuming
subscriber.register_handler('user.registered', handle_user_registered)
subscriber.subscribe(['user-events'])
await subscriber.start_consuming()
```
Event Sourcing Implementation
```python
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, List, Optional


class EventStore:
    def __init__(self, kafka_config: Dict[str, Any]):
        self.kafka_config = kafka_config
        self.producer = EventPublisher(kafka_config)
        self.consumer = EventSubscriber(kafka_config, 'event-store-group')
        self.snapshots = {}  # In-memory cache of events per aggregate

    async def append_event(self, aggregate_id: str, event: Event,
                           expected_version: int = None) -> bool:
        """Append an event to the aggregate's stream."""
        stream_name = f"aggregate-{aggregate_id}"

        # Add aggregate metadata; keep the per-event version if the
        # aggregate already stamped one, otherwise use expected_version
        event.data['aggregate_id'] = aggregate_id
        event.data.setdefault('aggregate_version', expected_version)

        try:
            success = await self.producer.publish_event(stream_name, event, aggregate_id)
            if success:
                # Update the local cache
                if aggregate_id not in self.snapshots:
                    self.snapshots[aggregate_id] = []
                self.snapshots[aggregate_id].append(event)
            return success
        except Exception as e:
            print(f"Failed to append event: {e}")
            return False

    async def get_events(self, aggregate_id: str,
                         from_version: int = 0) -> List[Event]:
        """Get events for an aggregate from the specified version onward."""
        # Check the local cache first
        if aggregate_id in self.snapshots:
            cached_events = self.snapshots[aggregate_id]
            return [e for e in cached_events
                    if e.data.get('aggregate_version', 0) >= from_version]

        # Fall back to replaying the Kafka stream
        stream_name = f"aggregate-{aggregate_id}"
        return await self._query_kafka_stream(stream_name, from_version)

    async def _query_kafka_stream(self, stream_name: str,
                                  from_version: int) -> List[Event]:
        # Placeholder: a real implementation would replay the topic from
        # the beginning via the consumer API and filter by aggregate_version.
        return []

    async def create_snapshot(self, aggregate_id: str,
                              aggregate_state: Dict[str, Any],
                              version: int):
        """Create a snapshot of the aggregate state."""
        snapshot_event = Event(
            event_type='aggregate.snapshot',
            event_version='1.0',
            source='event-store',
            data={
                'aggregate_id': aggregate_id,
                'aggregate_type': aggregate_state.get('type', 'unknown'),
                'aggregate_version': version,
                'state': aggregate_state
            }
        )
        snapshot_stream = f"snapshots-{aggregate_id}"
        await self.producer.publish_event(snapshot_stream, snapshot_event, aggregate_id)


class AggregateRoot(ABC):
    def __init__(self, aggregate_id: str):
        self.aggregate_id = aggregate_id
        self.version = 0
        self.uncommitted_events = []

    @abstractmethod
    def apply_event(self, event: Event):
        """Apply an event to the aggregate state."""

    def raise_event(self, event: Event):
        """Raise a new event and apply it to the aggregate."""
        event.data['aggregate_id'] = self.aggregate_id
        event.data['aggregate_version'] = self.version + 1
        self.apply_event(event)
        self.uncommitted_events.append(event)
        self.version += 1

    def mark_events_as_committed(self):
        """Mark all uncommitted events as committed."""
        self.uncommitted_events.clear()

    def load_from_history(self, events: List[Event]):
        """Reconstruct the aggregate from its event history."""
        for event in events:
            self.apply_event(event)
            self.version = event.data.get('aggregate_version', self.version + 1)


# Example aggregate
class UserAggregate(AggregateRoot):
    def __init__(self, user_id: str):
        super().__init__(user_id)
        self.email = None
        self.username = None
        self.status = 'inactive'
        self.created_at = None

    def register(self, username: str, email: str):
        """Register a new user."""
        if self.status != 'inactive':
            raise ValueError("User already registered")
        event = Event(
            event_type='user.registered',
            event_version='1.0',
            source='user-aggregate',
            data={
                'username': username,
                'email': email,
                'registered_at': datetime.utcnow().isoformat()
            }
        )
        self.raise_event(event)

    def activate(self):
        """Activate the user account."""
        # Registration moves the user to 'pending', so only pending
        # users can be activated
        if self.status != 'pending':
            raise ValueError("User is not pending activation")
        event = Event(
            event_type='user.activated',
            event_version='1.0',
            source='user-aggregate',
            data={
                'activated_at': datetime.utcnow().isoformat()
            }
        )
        self.raise_event(event)

    def apply_event(self, event: Event):
        """Apply an event to the aggregate state."""
        event_type = event.event_type
        if event_type == 'user.registered':
            self.username = event.data['username']
            self.email = event.data['email']
            self.created_at = event.data['registered_at']
            self.status = 'pending'
        elif event_type == 'user.activated':
            self.status = 'active'
        elif event_type == 'user.deactivated':
            self.status = 'inactive'


# Repository pattern for event sourcing
class EventSourcedRepository:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store

    async def get_by_id(self, aggregate_id: str,
                        aggregate_class: type) -> Optional[AggregateRoot]:
        """Load an aggregate by replaying its events."""
        events = await self.event_store.get_events(aggregate_id)
        if not events:
            return None
        aggregate = aggregate_class(aggregate_id)
        aggregate.load_from_history(events)
        return aggregate

    async def save(self, aggregate: AggregateRoot):
        """Persist the aggregate's uncommitted events."""
        for event in aggregate.uncommitted_events:
            success = await self.event_store.append_event(
                aggregate.aggregate_id,
                event,
                aggregate.version
            )
            if not success:
                raise Exception(f"Failed to save event: {event.event_type}")
        aggregate.mark_events_as_committed()


# Usage example (run inside an async context)
event_store = EventStore(broker_config)
user_repository = EventSourcedRepository(event_store)

# Create and register a new user
user = UserAggregate('user-12345')
user.register('john_doe', 'john@example.com')
user.activate()

# Save the user's events
await user_repository.save(user)

# Load the user from its events
loaded_user = await user_repository.get_by_id('user-12345', UserAggregate)
print(f"User status: {loaded_user.status}")  # Should be 'active'
```
Implementation Roadmap
Phase 1: Foundation (Completed)
Status: ✅ Released v1.0.0
- Apache Kafka cluster setup and management
- Basic pub/sub messaging patterns
- Message queues with RabbitMQ integration
- Event routing and filtering
- Basic monitoring and alerting
- REST API for message publishing
Phase 2: Advanced Features (In Progress)
Status: 🚧 Target v2.2.0 - Q2 2024
- Event sourcing implementation
- Schema registry integration
- Stream processing with Kafka Streams
- Dead letter queue handling
- Message transformation and enrichment
- Advanced consumer group management
Phase 3: Intelligence and Analytics (Planned)
Status: 📋 Target v3.0.0 - Q3 2024
- Complex event processing (CEP)
- ML-powered event correlation and anomaly detection
- Real-time stream analytics and dashboards
- Multi-region event replication
- Event-driven machine learning pipelines
- Advanced message routing and orchestration
Benefits and Value
Architecture Benefits
- Loose Coupling: Services communicate through events without direct dependencies
- Scalability: Horizontal scaling through distributed messaging and partitioning
- Resilience: Fault tolerance through message persistence and retry mechanisms
- Flexibility: Easy to add new consumers without changing producers
Operational Benefits
- Audit Trail: Complete event history for debugging and compliance
- Real-Time Processing: Immediate response to business events
- Load Balancing: Distributed processing across multiple consumer instances
- Monitoring: Comprehensive visibility into message flows and processing
Business Benefits
- Event-Driven Insights: Real-time business intelligence and analytics
- Process Automation: Automated workflows triggered by business events
- Customer Experience: Real-time notifications and personalized experiences
- Data Integration: Seamless data flow between different business systems
Related Services
Direct Dependencies
- Configuration Management: Messaging service configuration and topic management
- Security & Authentication: Secure message publishing and consumption
- Platform Observability: Message flow monitoring and alerting
Service Integrations
- Service Discovery: Dynamic consumer registration and load balancing
- Data Persistence: Event store implementation and message persistence
- Workflow Orchestration: Event-driven workflow triggers
Consuming Services
- All Platform Services: Every service can publish and consume events
- AI Agents: Event-driven agent coordination and communication
- Analytics Services: Real-time data processing and business intelligence
- Notification Services: Event-triggered notifications and alerts
The Event & Messaging Service provides the communication backbone that enables all platform services to interact asynchronously, supporting scalable, resilient, and loosely-coupled architectures.