Workflow Orchestration Service
The Workflow Orchestration Service provides process automation and workflow management across all Sindhan AI platform components. It coordinates multi-step business processes, long-running operations, and complex distributed transactions through declarative workflow definitions.
Overview and Purpose
Workflow Orchestration is a critical infrastructure service that coordinates complex business processes across multiple services and systems. It provides declarative workflow definition, state management, error handling, and monitoring capabilities that enable the automation of sophisticated business logic and operational processes.
Key Benefits
- Process Automation: Automated execution of complex multi-step business processes
- Declarative Workflows: Visual workflow design and configuration-driven execution
- State Management: Reliable state persistence and recovery for long-running processes
- Error Handling: Comprehensive error handling, retry logic, and compensation patterns
- Scalable Execution: Distributed workflow execution with horizontal scaling
- Monitoring & Analytics: Complete visibility into workflow performance and outcomes
Implementation Status
| Phase | Status | Description |
|---|---|---|
| Phase 1 | 📋 Planned | Basic workflow engine, simple task orchestration, state management |
| Phase 2 | 📋 Planned | Advanced workflow patterns, error handling, compensation, monitoring |
| Phase 3 | 📋 Planned | AI-powered workflow optimization, dynamic routing, predictive analytics |
Current Version: v0.5.0 (Preview)
Next Release: v1.0.0 (Q3 2024)
Core Capabilities
1. Workflow Definition and Management
- YAML/JSON-based workflow definitions
- Visual workflow designer and editor
- Workflow versioning and lifecycle management
- Template-based workflow creation
- Workflow validation and testing frameworks
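As a minimal sketch of what structural validation of a workflow definition might look like (the actual validation framework is richer; the checks below are illustrative assumptions), an engine can verify required top-level keys and that every `depends_on` entry refers to a step that exists:

```python
def validate_workflow(definition: dict) -> list[str]:
    """Return a list of structural problems in a workflow definition (sketch)."""
    errors = []
    # Top-level keys used by the definitions shown later on this page
    for key in ("apiVersion", "kind", "metadata", "spec"):
        if key not in definition:
            errors.append(f"missing top-level key: {key}")
    steps = definition.get("spec", {}).get("steps", [])
    names = {s.get("name") for s in steps}
    for step in steps:
        for dep in step.get("depends_on", []):
            if dep not in names:
                errors.append(
                    f"step {step.get('name')!r} depends on unknown step {dep!r}"
                )
    return errors
```

Running this kind of check at registration time catches broken dependency graphs before any execution starts.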
2. Task Orchestration and Execution
- Sequential and parallel task execution
- Conditional branching and decision points
- Loop and iteration support
- Task dependency management
- Dynamic task generation and execution
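The scheduling decision behind these features reduces to a readiness check over the dependency graph: a step may run once all of its `depends_on` entries have completed. A minimal sketch (the real engine also tracks retries, conditions, and timeouts):

```python
def ready_tasks(steps, completed, running):
    """Return the names of steps whose dependencies are all completed
    and which are not yet running or completed."""
    ready = []
    for step in steps:
        name = step["name"]
        if name in completed or name in running:
            continue  # already scheduled or finished
        if all(dep in completed for dep in step.get("depends_on", [])):
            ready.append(name)
    return ready
```

Steps with no shared dependencies become ready together, which is what makes parallel execution fall out of the dependency declarations rather than explicit fork/join constructs.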
3. State Management and Persistence
- Workflow state persistence and recovery
- Checkpoint and rollback mechanisms
- State sharing between workflow steps
- Long-running workflow support
- State machine pattern implementation
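Checkpointing is what lets a long-running workflow survive an engine restart. The production service persists state to a database; as an illustrative sketch under that assumption, the key property is an atomic write so a crash never leaves a half-written checkpoint:

```python
import json
import os

def checkpoint(path: str, execution_state: dict) -> None:
    """Atomically persist workflow state so a restarted engine can resume.
    Write to a temp file, then rename: os.replace is atomic on POSIX."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(execution_state, f)
    os.replace(tmp, path)

def restore(path: str) -> dict:
    """Load the last successfully written checkpoint."""
    with open(path) as f:
        return json.load(f)
```

On recovery, the engine reloads the checkpoint and re-queues only the steps that had not completed, rather than replaying the whole workflow.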
4. Error Handling and Resilience
- Comprehensive error handling strategies
- Retry policies with backoff algorithms
- Circuit breaker patterns for external services
- Compensation and rollback mechanisms
- Dead letter queue integration
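The retry policy used in the workflow definition later on this page (exponential backoff with an initial delay and a cap) can be sketched as a small delay calculator. The jitter term is an assumption added here to avoid synchronized retry storms; it is not part of the declared policy:

```python
import random

def backoff_delay(attempt: int, initial: float = 5.0,
                  max_delay: float = 300.0, jitter: float = 0.1) -> float:
    """Exponential backoff: initial * 2^attempt, capped at max_delay,
    with optional +/- jitter (as a fraction of the delay)."""
    delay = min(initial * (2 ** attempt), max_delay)
    return delay * (1 + random.uniform(-jitter, jitter))
```

With the defaults matching the sample policy (`initial_delay: 5s`, `max_delay: 5m`), attempts wait roughly 5s, 10s, 20s, ... until the cap is reached.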
5. Integration and Connectivity
- REST API and webhook integrations
- Message queue and event-driven triggers
- Database and file system operations
- Third-party service integrations
- Custom task plugin framework
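One plausible shape for the custom task plugin framework (a sketch; the registration mechanism and handler signature here are assumptions, not the service's actual API) is a decorator that maps a task `type` string to a handler function:

```python
from typing import Any, Callable, Dict

# Registry mapping a task type (as used in workflow YAML) to its handler
TASK_HANDLERS: Dict[str, Callable[[dict], dict]] = {}

def task_handler(task_type: str):
    """Decorator that registers a handler for a custom task type."""
    def register(fn):
        TASK_HANDLERS[task_type] = fn
        return fn
    return register

@task_handler("http_get")
def handle_http_get(config: dict) -> dict:
    # A real handler would issue the HTTP request; this stub just
    # echoes the target so the registration path is visible.
    return {"url": config["url"], "status": "stubbed"}
```

The engine then dispatches each step by looking up `step["type"]` in the registry, so new task types can ship as plugins without touching the core engine.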
6. Monitoring and Analytics
- Real-time workflow execution monitoring
- Performance metrics and analytics
- Workflow success/failure tracking
- Resource utilization monitoring
- Business process intelligence
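Success/failure tracking can be reduced to a small per-workflow aggregate; a minimal sketch (the production service exports these as metrics rather than holding them in memory):

```python
from collections import defaultdict

class WorkflowMetrics:
    """Tracks per-workflow outcome counts and durations (in-memory sketch)."""

    def __init__(self):
        self.counts = defaultdict(lambda: {"completed": 0, "failed": 0})
        self.durations = defaultdict(list)

    def record(self, workflow_id: str, succeeded: bool, duration_s: float):
        outcome = "completed" if succeeded else "failed"
        self.counts[workflow_id][outcome] += 1
        self.durations[workflow_id].append(duration_s)

    def success_rate(self, workflow_id: str) -> float:
        c = self.counts[workflow_id]
        total = c["completed"] + c["failed"]
        return c["completed"] / total if total else 0.0
```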
Architecture
Integration Patterns
Workflow Definition and Execution
```yaml
# User Registration Workflow Definition
apiVersion: workflow/v1
kind: Workflow
metadata:
  name: user-registration-workflow
  version: "1.2.0"
  description: "Complete user registration process with validation and notifications"
  tags: ["user-management", "registration", "onboarding"]
spec:
  # Workflow configuration
  config:
    timeout: "30m"
    retry_policy:
      max_attempts: 3
      backoff: "exponential"
      initial_delay: "5s"
      max_delay: "5m"

  # Input parameters
  input:
    schema:
      type: object
      properties:
        username:
          type: string
          minLength: 3
          maxLength: 50
        email:
          type: string
          format: email
        password:
          type: string
          minLength: 8
        profile_data:
          type: object
      required: ["username", "email", "password"]

  # Workflow steps
  steps:
    # Step 1: Validate user input
    - name: validate_input
      type: validation
      config:
        validation_rules:
          - field: "email"
            rule: "unique_email"
            service: "user-service"
          - field: "username"
            rule: "unique_username"
            service: "user-service"
      on_failure:
        action: "terminate"
        message: "Validation failed"

    # Step 2: Create user account
    - name: create_user_account
      type: service_call
      depends_on: ["validate_input"]
      config:
        service: "user-service"
        endpoint: "/api/users"
        method: "POST"
        payload:
          username: "{{ input.username }}"
          email: "{{ input.email }}"
          password_hash: "{{ hash(input.password) }}"
          status: "pending"
      output:
        user_id: "{{ response.id }}"
      on_failure:
        action: "retry"
        max_attempts: 3

    # Step 3: Create user profile (parallel with Step 4)
    - name: create_user_profile
      type: service_call
      depends_on: ["create_user_account"]
      config:
        service: "profile-service"
        endpoint: "/api/profiles"
        method: "POST"
        payload:
          user_id: "{{ steps.create_user_account.output.user_id }}"
          profile_data: "{{ input.profile_data }}"
      parallel: true

    # Step 4: Send welcome email (parallel with Step 3)
    - name: send_welcome_email
      type: service_call
      depends_on: ["create_user_account"]
      config:
        service: "notification-service"
        endpoint: "/api/emails/send"
        method: "POST"
        payload:
          to: "{{ input.email }}"
          template: "welcome_email"
          variables:
            username: "{{ input.username }}"
            user_id: "{{ steps.create_user_account.output.user_id }}"
      parallel: true
      on_failure:
        action: "continue"  # Don't fail the workflow if the email fails

    # Step 5: Wait for email verification
    - name: wait_for_verification
      type: wait_for_event
      depends_on: ["send_welcome_email"]
      config:
        event_type: "user.email_verified"
        timeout: "24h"
        filter:
          user_id: "{{ steps.create_user_account.output.user_id }}"
      on_timeout:
        action: "compensate"
        compensation_workflow: "user-cleanup-workflow"

    # Step 6: Activate user account
    - name: activate_user_account
      type: service_call
      depends_on: ["wait_for_verification", "create_user_profile"]
      config:
        service: "user-service"
        endpoint: "/api/users/{{ steps.create_user_account.output.user_id }}/activate"
        method: "PUT"

    # Step 7: Send activation notification
    - name: send_activation_notification
      type: service_call
      depends_on: ["activate_user_account"]
      config:
        service: "notification-service"
        endpoint: "/api/notifications/send"
        method: "POST"
        payload:
          user_id: "{{ steps.create_user_account.output.user_id }}"
          type: "account_activated"
          message: "Your account has been successfully activated!"

  # Compensation workflow for failure scenarios
  compensation:
    steps:
      - name: cleanup_user_account
        type: service_call
        config:
          service: "user-service"
          endpoint: "/api/users/{{ steps.create_user_account.output.user_id }}"
          method: "DELETE"
      - name: cleanup_user_profile
        type: service_call
        condition: "{{ steps.create_user_profile.completed }}"
        config:
          service: "profile-service"
          endpoint: "/api/profiles/user/{{ steps.create_user_account.output.user_id }}"
          method: "DELETE"

  # Output specification
  output:
    schema:
      type: object
      properties:
        user_id:
          type: string
        status:
          type: string
        activation_date:
          type: string
          format: date-time
```

Workflow Engine Implementation
```python
import asyncio
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional

import yaml  # used by the usage example below


class WorkflowStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    COMPENSATING = "compensating"


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"
    RETRYING = "retrying"


@dataclass
class WorkflowExecution:
    workflow_id: str
    execution_id: str
    status: WorkflowStatus
    input_data: Dict[str, Any]
    output_data: Dict[str, Any] = field(default_factory=dict)
    context: Dict[str, Any] = field(default_factory=dict)
    started_at: datetime = field(default_factory=datetime.utcnow)
    completed_at: Optional[datetime] = None
    error_message: Optional[str] = None


@dataclass
class TaskExecution:
    task_name: str
    execution_id: str
    status: TaskStatus
    input_data: Dict[str, Any]
    output_data: Dict[str, Any] = field(default_factory=dict)
    started_at: datetime = field(default_factory=datetime.utcnow)
    completed_at: Optional[datetime] = None
    retry_count: int = 0
    error_message: Optional[str] = None


class WorkflowEngine:
    """Core workflow engine.

    StateManager, TaskExecutor, and EventSubscriber are assumed to be
    provided by the service's persistence, execution, and eventing
    layers; their implementations are not shown here.
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.active_workflows = {}
        self.task_handlers = {}
        self.event_handlers = {}
        self.running = False

        # Initialize components
        self.state_manager = StateManager(config.get('state_store'))
        self.task_executor = TaskExecutor(config.get('execution'))
        self.event_subscriber = EventSubscriber(config.get('events'))

        self._register_default_handlers()

    def _register_default_handlers(self):
        """Register default task handlers"""
        self.task_handlers.update({
            'service_call': self._handle_service_call,
            'validation': self._handle_validation,
            'wait_for_event': self._handle_wait_for_event,
            'parallel_group': self._handle_parallel_group,
            'conditional': self._handle_conditional,
            'loop': self._handle_loop,
        })

    async def start_workflow(self, workflow_definition: Dict[str, Any],
                             input_data: Dict[str, Any]) -> str:
        """Start a new workflow execution"""
        execution_id = str(uuid.uuid4())
        workflow_id = workflow_definition['metadata']['name']

        # Create workflow execution
        execution = WorkflowExecution(
            workflow_id=workflow_id,
            execution_id=execution_id,
            status=WorkflowStatus.PENDING,
            input_data=input_data
        )

        # Validate input against schema
        await self._validate_input(workflow_definition, input_data)

        # Store workflow state
        await self.state_manager.save_workflow_state(execution)

        # Start execution
        self.active_workflows[execution_id] = {
            'definition': workflow_definition,
            'execution': execution,
            'task_queue': asyncio.Queue(),
            'completed_tasks': set(),
            'running_tasks': set()
        }

        # Queue initial tasks
        await self._queue_ready_tasks(execution_id)

        # Start workflow execution in background
        asyncio.create_task(self._execute_workflow(execution_id))

        return execution_id

    async def _execute_workflow(self, execution_id: str):
        """Execute workflow tasks"""
        workflow_context = self.active_workflows[execution_id]
        execution = workflow_context['execution']
        definition = workflow_context['definition']
        task_queue = workflow_context['task_queue']

        try:
            execution.status = WorkflowStatus.RUNNING
            await self.state_manager.save_workflow_state(execution)

            # Process tasks until completion
            while not task_queue.empty() or workflow_context['running_tasks']:
                # Get next task
                if not task_queue.empty():
                    task_def = await task_queue.get()
                    task_execution = TaskExecution(
                        task_name=task_def['name'],
                        execution_id=execution_id,
                        status=TaskStatus.PENDING,
                        input_data=self._resolve_task_input(task_def, execution)
                    )

                    # Execute task
                    workflow_context['running_tasks'].add(task_def['name'])
                    asyncio.create_task(
                        self._execute_task(execution_id, task_def, task_execution)
                    )

                # Small delay to prevent busy waiting
                await asyncio.sleep(0.1)

            # Check whether all tasks completed successfully
            if self._all_tasks_completed(workflow_context):
                execution.status = WorkflowStatus.COMPLETED
                execution.completed_at = datetime.utcnow()
                execution.output_data = self._build_workflow_output(
                    definition, workflow_context
                )
            else:
                execution.status = WorkflowStatus.FAILED
                execution.error_message = "Some tasks failed to complete"

        except Exception as e:
            execution.status = WorkflowStatus.FAILED
            execution.error_message = str(e)

            # Execute compensation if defined
            if 'compensation' in definition['spec']:
                await self._execute_compensation(
                    execution_id, definition['spec']['compensation']
                )

        finally:
            execution.completed_at = datetime.utcnow()
            await self.state_manager.save_workflow_state(execution)

            # Cleanup
            if execution_id in self.active_workflows:
                del self.active_workflows[execution_id]

    async def _execute_task(self, execution_id: str, task_def: Dict[str, Any],
                            task_execution: TaskExecution):
        """Execute individual task"""
        workflow_context = self.active_workflows[execution_id]

        try:
            task_execution.status = TaskStatus.RUNNING

            # Get task handler
            task_type = task_def.get('type', 'service_call')
            handler = self.task_handlers.get(task_type)
            if not handler:
                raise ValueError(f"Unknown task type: {task_type}")

            # Execute task with retry logic
            max_attempts = task_def.get('config', {}).get('max_attempts', 1)
            for attempt in range(max_attempts):
                try:
                    task_execution.retry_count = attempt
                    result = await handler(task_def, task_execution)

                    task_execution.output_data = result
                    task_execution.status = TaskStatus.COMPLETED
                    task_execution.completed_at = datetime.utcnow()
                    break

                except Exception as e:
                    if attempt == max_attempts - 1:
                        # Last attempt failed
                        task_execution.status = TaskStatus.FAILED
                        task_execution.error_message = str(e)
                        break
                    else:
                        # Retry with backoff
                        task_execution.status = TaskStatus.RETRYING
                        backoff_delay = self._calculate_backoff(attempt, task_def)
                        await asyncio.sleep(backoff_delay)

        except Exception as e:
            task_execution.status = TaskStatus.FAILED
            task_execution.error_message = str(e)

        finally:
            # Update workflow context
            workflow_context['running_tasks'].discard(task_execution.task_name)
            workflow_context['completed_tasks'].add(task_execution.task_name)

            # Store task state
            await self.state_manager.save_task_state(task_execution)

            # Queue dependent tasks
            await self._queue_dependent_tasks(execution_id, task_execution.task_name)

    async def _handle_service_call(self, task_def: Dict[str, Any],
                                   task_execution: TaskExecution) -> Dict[str, Any]:
        """Handle service call task"""
        config = task_def['config']
        service = config['service']
        endpoint = config['endpoint']
        method = config.get('method', 'GET')
        payload = config.get('payload', {})

        # Resolve template variables in payload
        resolved_payload = self._resolve_templates(payload, task_execution.input_data)

        # Make service call
        response = await self.task_executor.call_service(
            service, endpoint, method, resolved_payload
        )
        return response

    async def _handle_validation(self, task_def: Dict[str, Any],
                                 task_execution: TaskExecution) -> Dict[str, Any]:
        """Handle validation task"""
        config = task_def['config']
        validation_rules = config['validation_rules']

        for rule in validation_rules:
            field_name = rule['field']
            rule_type = rule['rule']
            service = rule.get('service')

            field_value = task_execution.input_data.get(field_name)

            # Execute validation rule
            is_valid = await self.task_executor.validate_field(
                field_value, rule_type, service
            )
            if not is_valid:
                raise ValueError(f"Validation failed for field: {field_name}")

        return {"validation_result": "passed"}

    async def _handle_wait_for_event(self, task_def: Dict[str, Any],
                                     task_execution: TaskExecution) -> Dict[str, Any]:
        """Handle wait for event task"""
        config = task_def['config']
        event_type = config['event_type']
        timeout = config.get('timeout', '1h')
        event_filter = config.get('filter', {})

        # Parse timeout
        timeout_seconds = self._parse_duration(timeout)

        # Wait for event
        event_data = await self.event_subscriber.wait_for_event(
            event_type, event_filter, timeout_seconds
        )
        return {"event_data": event_data}


# Usage example
workflow_config = {
    'state_store': {
        'type': 'postgresql',
        'connection_string': 'postgresql://workflow:password@db.sindhan.ai/workflows'
    },
    'execution': {
        'max_concurrent_tasks': 10,
        'default_timeout': '30m'
    },
    'events': {
        'kafka_servers': ['kafka.sindhan.ai:9092'],
        'group_id': 'workflow-engine'
    }
}


async def main():
    # Initialize workflow engine
    engine = WorkflowEngine(workflow_config)

    # Load workflow definition
    with open('user-registration-workflow.yaml', 'r') as f:
        workflow_def = yaml.safe_load(f)

    # Start workflow execution
    input_data = {
        'username': 'john_doe',
        'email': 'john@example.com',
        'password': 'secure_password123',
        'profile_data': {
            'first_name': 'John',
            'last_name': 'Doe',
            'phone': '+1-555-0123'
        }
    }
    execution_id = await engine.start_workflow(workflow_def, input_data)
    print(f"Started workflow execution: {execution_id}")

    # Monitor workflow progress (get_workflow_status is part of the
    # engine's public API; its implementation is not shown above)
    while True:
        status = await engine.get_workflow_status(execution_id)
        print(f"Workflow status: {status.status}")
        if status.status in (WorkflowStatus.COMPLETED, WorkflowStatus.FAILED):
            break
        await asyncio.sleep(5)

    print(f"Workflow completed with output: {status.output_data}")


asyncio.run(main())
```

Implementation Roadmap
Phase 1: Foundation (Planned)
Status: 📋 Target v1.0.0 - Q3 2024
- Core workflow engine with basic orchestration
- YAML-based workflow definitions
- Sequential and parallel task execution
- Basic state management and persistence
- Simple error handling and retry mechanisms
- REST API for workflow management
Phase 2: Advanced Features (Planned)
Status: 📋 Target v1.5.0 - Q4 2024
- Visual workflow designer and editor
- Advanced error handling and compensation patterns
- Event-driven workflow triggers
- Conditional branching and loops
- Workflow versioning and migration
- Comprehensive monitoring and analytics
Phase 3: Intelligence and Optimization (Planned)
Status: 📋 Target v2.0.0 - Q1 2025
- AI-powered workflow optimization
- Dynamic task routing and load balancing
- Predictive workflow analytics
- Auto-scaling and resource optimization
- Advanced integration patterns
- Machine learning-driven process improvement
Benefits and Value
Process Benefits
- Automation: Eliminate manual coordination of complex business processes
- Consistency: Ensure consistent execution of business logic across all instances
- Reliability: Built-in error handling and recovery mechanisms
- Visibility: Complete transparency into process execution and performance
Operational Benefits
- Scalability: Distributed execution with horizontal scaling capabilities
- Maintainability: Declarative workflow definitions separate from implementation
- Monitoring: Real-time visibility into workflow performance and issues
- Integration: Seamless integration with existing services and systems
Business Benefits
- Efficiency: Faster process execution through automation
- Quality: Reduced errors through consistent process execution
- Agility: Rapid deployment of new business processes
- Compliance: Built-in audit trails and process documentation
Related Services
Direct Dependencies
- Event & Messaging: Event-driven workflow triggers and notifications
- Data Persistence: Workflow state and execution history storage
- Configuration Management: Workflow configuration and secrets
Service Integrations
- Service Discovery: Dynamic service endpoint resolution
- Security & Authentication: Secure workflow execution and access control
- Platform Observability: Workflow monitoring and performance metrics
Consuming Services
- Business Process Services: Complex multi-step business logic automation
- AI Agent Coordination: Orchestration of AI agent interactions and workflows
- Data Processing Pipelines: ETL and data transformation workflows
- Integration Processes: Third-party system integration and data synchronization
The Workflow Orchestration Service provides the coordination layer that enables complex business processes to be automated, monitored, and optimized across the entire Sindhan AI platform.