Service Discovery Service
The Service Discovery Service provides dynamic service registration, discovery, and health monitoring capabilities across all Sindhan AI platform components. It enables services to find and communicate with each other without hardcoded dependencies, supporting a scalable microservices architecture.
Overview and Purpose
Service Discovery is a critical infrastructure service that enables dynamic service-to-service communication in a distributed microservices environment. It provides automatic service registration, health monitoring, load balancing, and failover capabilities that are essential for building resilient and scalable systems.
Key Benefits
- Dynamic Service Location: Automatic discovery of service instances
- Health Monitoring: Continuous health checking and failover
- Load Balancing: Intelligent traffic distribution across instances
- Zero Configuration: Automatic service registration and deregistration
- Service Mesh Integration: Integration with Istio and other service meshes (planned for Phase 2)
- Multi-Environment Support: Consistent service discovery across environments
Implementation Status
| Phase | Status | Description |
|---|---|---|
| Phase 1 | ✅ Implemented | Basic service registration, discovery, health checks |
| Phase 2 | 📋 Planned | Advanced load balancing, service mesh integration, multi-region support |
| Phase 3 | 📋 Planned | AI-powered routing, predictive scaling, advanced analytics |
Current Version: v1.2.0
Next Release: v1.5.0 (Q2 2024)
Core Capabilities
1. Service Registration and Deregistration
- Automatic service registration on startup
- Graceful deregistration on shutdown
- Service metadata and tags support
- Version-aware service registration
- Environment-specific service namespacing
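As an illustration of version-aware registration and environment-specific namespacing, the sketch below builds a Consul agent registration payload. The naming convention (an environment prefix on the service name, a `version-1-2-3` style tag) and the `build_registration` helper are hypothetical, not the platform's documented scheme.

```python
from typing import Dict, List, Optional


def build_registration(name: str, instance_id: str, address: str, port: int,
                       version: str, environment: str,
                       tags: Optional[List[str]] = None) -> Dict:
    """Build a Consul agent registration payload (hypothetical conventions).

    - The environment namespaces the service name, e.g. 'production-user-service'.
    - The version goes into Meta verbatim and into a DNS-safe tag, because
      the dots in '1.2.3' would split a DNS label in tag-based lookups.
    """
    namespaced = f"{environment}-{name}"
    version_tag = "version-" + version.replace(".", "-")
    return {
        "ID": f"{namespaced}-{instance_id}",
        "Name": namespaced,
        "Address": address,
        "Port": port,
        "Tags": list(tags or []) + [version_tag],
        "Meta": {"version": version, "environment": environment},
    }
```

The resulting dictionary is in the shape Consul's `PUT /v1/agent/service/register` endpoint accepts, so it could be sent the same way the registration client later in this section sends its payload.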
2. Service Discovery and Resolution
- DNS-based service discovery
- HTTP API for service lookup
- Client-side and server-side load balancing
- Service topology mapping
- Cross-datacenter service discovery
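For DNS-based discovery, assuming the registry is Consul (as in Phase 1) and its DNS interface uses the default `consul` domain, instances can be looked up by a name of the form `[tag.]<service>.service[.datacenter].consul`. This is a minimal sketch: `consul_dns_name` and `resolve_addresses` are illustrative helpers, and the resolver setup described in the docstring is an assumption.

```python
import socket
from typing import List, Optional


def consul_dns_name(service: str, tag: Optional[str] = None,
                    datacenter: Optional[str] = None, domain: str = "consul") -> str:
    """Build a Consul DNS name: [tag.]<service>.service[.datacenter].<domain>."""
    labels = ([tag] if tag else []) + [service, "service"]
    if datacenter:
        labels.append(datacenter)  # cross-datacenter lookup
    labels.append(domain)
    return ".".join(labels)


def resolve_addresses(name: str, port: int) -> List[str]:
    """Return the unique IP addresses the system resolver finds for `name`.

    Assumes the host resolver forwards *.consul queries to Consul's DNS
    interface (port 8600 by default). A/AAAA answers carry no port; SRV
    lookups, which do, need a DNS library such as dnspython.
    """
    infos = socket.getaddrinfo(name, port, type=socket.SOCK_STREAM)
    return sorted({info[4][0] for info in infos})
```

For example, `resolve_addresses(consul_dns_name("payment-service", datacenter="dc1"), 8080)` would query `payment-service.service.dc1.consul`, where `dc1` stands in for whatever datacenter names the deployment uses.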
3. Health Monitoring and Management
- HTTP, TCP, and custom health checks
- Configurable health check intervals
- Automatic unhealthy service removal
- Health status aggregation and reporting
- Dependency health propagation
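Health status aggregation can be sketched as a worst-of rule over check states, since Consul reports each check as `passing`, `warning`, or `critical`. The dependency-propagation policy below (a failing dependency degrades a passing service to `warning`) and both helper names are assumptions for illustration, not a documented platform rule.

```python
from typing import Dict, Iterable

# The three check states Consul reports, ordered from best to worst
SEVERITY = {"passing": 0, "warning": 1, "critical": 2}


def aggregate_status(statuses: Iterable[str]) -> str:
    """Worst-of aggregation: a single critical check makes the result critical."""
    worst = "passing"
    for status in statuses:
        if SEVERITY[status] > SEVERITY[worst]:
            worst = status
    return worst


def effective_status(own_checks: Iterable[str], dependencies: Dict[str, str]) -> str:
    """Combine a service's own checks with its dependencies' statuses.

    Assumed policy: unhealthy dependencies can degrade a passing service to
    'warning' but never make it 'critical', so a dependency outage is
    reported without being treated as a failure of this service itself.
    """
    own = aggregate_status(own_checks)
    if own == "passing" and aggregate_status(dependencies.values()) != "passing":
        return "warning"
    return own
```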
4. Load Balancing and Traffic Management
- Multiple load balancing algorithms (round-robin, least connections, weighted)
- Traffic shaping and rate limiting
- Circuit breaker pattern implementation
- Canary deployments and A/B testing support
- Geographic routing and affinity
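Round-robin selection appears in the client examples later in this section. The other two algorithms named above could look roughly like this, assuming instance dictionaries shaped like those returned by the Python client's `discover_service`, plus a hypothetical `weight` key (for example, copied from service metadata):

```python
import random
import threading
from typing import Dict, List


def weighted_choice(instances: List[Dict], rng: random.Random) -> Dict:
    """Pick an instance with probability proportional to its 'weight' (default 1)."""
    weights = [inst.get("weight", 1) for inst in instances]
    return rng.choices(instances, weights=weights, k=1)[0]


class LeastConnections:
    """Send each request to the instance with the fewest requests in flight."""

    def __init__(self) -> None:
        self.active: Dict[str, int] = {}
        self._lock = threading.Lock()

    def acquire(self, instances: List[Dict]) -> Dict:
        with self._lock:
            # min() returns the first instance on ties, so list order breaks ties
            chosen = min(instances, key=lambda inst: self.active.get(inst["id"], 0))
            self.active[chosen["id"]] = self.active.get(chosen["id"], 0) + 1
        return chosen

    def release(self, instance: Dict) -> None:
        """Call when the request to `instance` finishes."""
        with self._lock:
            self.active[instance["id"]] -= 1
```

Least-connections only works if the caller calls `release` when each request completes; weighted selection keeps no state, so it suits a client that rediscovers instances on every call.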
5. Service Mesh Integration
- Istio service mesh compatibility
- Envoy proxy configuration
- mTLS certificate distribution
- Traffic policy enforcement
- Observability integration
6. Configuration and Policy Management
- Service-specific routing policies
- Traffic splitting configurations
- Retry and timeout policies
- Security policies and access control
- Configuration hot reloading
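Traffic splitting is often configured as percentages per subset, as in the 90/10 `v1`/`canary` split of the Istio example below. A client-side equivalent, sketched here with a hypothetical `pick_subset` helper, hashes a request key into 100 buckets and walks the cumulative weights:

```python
import hashlib
from typing import List, Tuple


def pick_subset(request_key: str, split: List[Tuple[str, int]]) -> str:
    """Deterministically map a request key (e.g. a user ID) onto a weighted split.

    `split` is a list of (subset, percent) pairs that must sum to 100,
    such as [("v1", 90), ("canary", 10)].
    """
    if sum(weight for _, weight in split) != 100:
        raise ValueError("split weights must sum to 100")
    # Hash the key into one of 100 buckets
    digest = hashlib.sha256(request_key.encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") % 100
    cumulative = 0
    for subset, weight in split:
        cumulative += weight
        if bucket < cumulative:
            return subset
    raise AssertionError("unreachable when weights sum to 100")
```

Istio's weighted routing picks a destination per request, whereas hashing a stable key such as a user ID keeps each user on the same subset across requests. That stickiness is a design choice of this sketch, not a property of the Istio configuration.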
Architecture
Integration Patterns
Automatic Service Registration
```python
import atexit
import itertools
import socket
import threading
from typing import Dict, List, Optional

import requests


class ServiceDiscoveryClient:
    def __init__(self, discovery_url: str, service_name: str,
                 service_port: int, health_check_path: str = '/health'):
        self.discovery_url = discovery_url
        self.service_name = service_name
        self.service_port = service_port
        self.health_check_path = health_check_path
        self.service_id = f"{service_name}-{socket.gethostname()}-{service_port}"
        self.heartbeat_check_id = f"{self.service_id}-heartbeat"
        self.registration_data = None
        self.heartbeat_thread = None
        self._stop = threading.Event()
        self._rr_counter = itertools.count()  # round-robin position

    def register(self, tags: Optional[List[str]] = None,
                 metadata: Optional[Dict[str, str]] = None):
        """Register service with discovery service (Consul agent API)"""
        address = self._get_local_ip()
        # Consul's registration payload uses PascalCase keys
        self.registration_data = {
            'ID': self.service_id,
            'Name': self.service_name,
            'Address': address,
            'Port': self.service_port,
            'Tags': tags or [],
            'Meta': metadata or {},
            'Checks': [
                {   # HTTP check: Consul polls the service's health endpoint
                    'CheckID': f"{self.service_id}-http",
                    'HTTP': f"http://{address}:{self.service_port}{self.health_check_path}",
                    'Interval': '10s',
                    'Timeout': '3s',
                    # Consul enforces a minimum of 1 minute for this setting
                    'DeregisterCriticalServiceAfter': '1m',
                },
                {   # TTL check: turns critical unless the heartbeat thread renews it
                    'CheckID': self.heartbeat_check_id,
                    'TTL': '30s',
                },
            ],
        }
        response = requests.put(
            f"{self.discovery_url}/v1/agent/service/register",
            json=self.registration_data,
            timeout=5,
        )
        if response.status_code == 200:
            print(f"Service {self.service_name} registered successfully")
            self._start_heartbeat()
        else:
            raise RuntimeError(f"Service registration failed: {response.text}")

    def deregister(self):
        """Stop heartbeats and deregister service from discovery service"""
        self._stop.set()
        if self.heartbeat_thread:
            self.heartbeat_thread.join()
        response = requests.put(
            f"{self.discovery_url}/v1/agent/service/deregister/{self.service_id}",
            timeout=5,
        )
        if response.status_code == 200:
            print(f"Service {self.service_name} deregistered successfully")
        else:
            print(f"Service deregistration failed: {response.text}")

    def discover_service(self, service_name: str, healthy_only: bool = True) -> List[Dict]:
        """Discover instances of a service"""
        params = {'passing': 'true'} if healthy_only else {}
        response = requests.get(
            f"{self.discovery_url}/v1/health/service/{service_name}",
            params=params,
            timeout=5,
        )
        if response.status_code != 200:
            return []
        return [
            {
                'id': entry['Service']['ID'],
                # An empty service address means the instance uses its node's address
                'address': entry['Service']['Address'] or entry['Node']['Address'],
                'port': entry['Service']['Port'],
                'tags': entry['Service']['Tags'],
                'metadata': entry['Service']['Meta'],
            }
            for entry in response.json()
        ]

    def get_service_url(self, service_name: str, load_balance: bool = True) -> Optional[str]:
        """Get URL for a service instance"""
        instances = self.discover_service(service_name)
        if not instances:
            return None
        if load_balance:
            # Round-robin: advance a shared counter on every call
            instance = instances[next(self._rr_counter) % len(instances)]
        else:
            instance = instances[0]
        return f"http://{instance['address']}:{instance['port']}"

    def _get_local_ip(self):
        """Get local IP address"""
        try:
            # Connecting a UDP socket sends no packets; it only selects the
            # local interface that would route to the (non-routable) address
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(('10.254.254.254', 80))
                return s.getsockname()[0]
        except OSError:
            return '127.0.0.1'

    def _start_heartbeat(self):
        """Start heartbeat thread that keeps the TTL check passing"""
        self._stop.clear()
        self.heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True)
        self.heartbeat_thread.start()

    def _heartbeat_loop(self):
        """Mark the TTL check as passing until deregister() is called"""
        while not self._stop.is_set():
            try:
                response = requests.put(
                    f"{self.discovery_url}/v1/agent/check/pass/{self.heartbeat_check_id}",
                    timeout=5,
                )
                if response.status_code != 200:
                    print(f"Heartbeat failed: {response.text}")
            except requests.RequestException as e:
                print(f"Heartbeat error: {e}")
            # Renew well inside the 30s TTL; wait() returns early on deregister()
            self._stop.wait(10)


# Usage example
discovery_client = ServiceDiscoveryClient(
    discovery_url='http://consul.sindhan.ai:8500',
    service_name='user-service',
    service_port=8080,
    health_check_path='/health',
)

# Register service on startup and deregister gracefully on interpreter exit
discovery_client.register(
    tags=['api', 'user-management'],
    metadata={'version': '1.2.3', 'environment': 'production'},
)
atexit.register(discovery_client.deregister)

# Discover and call another service
payment_data = {'amount': 1000, 'currency': 'USD'}  # hypothetical example payload
payment_service_url = discovery_client.get_service_url('payment-service')
if payment_service_url:
    response = requests.post(f"{payment_service_url}/api/charge",
                             json=payment_data, timeout=10)
```
Service Mesh Integration with Istio
```yaml
# Service registration via Kubernetes service
apiVersion: v1
kind: Service
metadata:
  name: user-service
  namespace: sindhan-platform
  labels:
    app: user-service
    version: v1.2.3
spec:
  # Replaces the deprecated service.alpha.kubernetes.io/tolerate-unready-endpoints annotation
  publishNotReadyAddresses: true
  selector:
    app: user-service
  ports:
  - name: http
    port: 8080
    targetPort: 8080
  - name: grpc
    port: 9090
    targetPort: 9090
---
# Istio DestinationRule for load balancing
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: user-service
  namespace: sindhan-platform
spec:
  host: user-service
  trafficPolicy:
    loadBalancer:
      simple: LEAST_CONN
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 10
        maxRequestsPerConnection: 2
    outlierDetection:
      consecutive5xxErrors: 3  # replaces the deprecated consecutiveErrors field
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
  subsets:
  - name: v1
    labels:
      version: v1.2.3
  - name: canary
    labels:
      version: v1.3.0-canary
---
# VirtualService for traffic routing
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: user-service
  namespace: sindhan-platform
spec:
  hosts:
  - user-service
  http:
  # Requests with the header "canary: true" always go to the canary subset
  - match:
    - headers:
        canary:
          exact: "true"
    route:
    - destination:
        host: user-service
        subset: canary
      weight: 100
  # All other traffic: 90/10 split between the stable and canary subsets
  - route:
    - destination:
        host: user-service
        subset: v1
      weight: 90
    - destination:
        host: user-service
        subset: canary
      weight: 10
    retries:
      attempts: 3
      perTryTimeout: 10s
    # Istio does not apply retries or timeouts on a route with client-side
    # fault injection enabled, so enable this delay only in test environments:
    # fault:
    #   delay:
    #     percentage:
    #       value: 0.1
    #     fixedDelay: 5s
```
Client-Side Load Balancing
```go
package servicediscovery

import (
	"bytes"
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/hashicorp/consul/api"
)

type ServiceInstance struct {
	ID       string
	Address  string
	Port     int
	Tags     []string
	Metadata map[string]string
	Healthy  bool
}

type LoadBalancer interface {
	Choose(instances []ServiceInstance) *ServiceInstance
}

type RoundRobinLB struct {
	counter uint64
	mutex   sync.Mutex
}

func (lb *RoundRobinLB) Choose(instances []ServiceInstance) *ServiceInstance {
	if len(instances) == 0 {
		return nil
	}
	lb.mutex.Lock()
	defer lb.mutex.Unlock()
	lb.counter++
	index := lb.counter % uint64(len(instances))
	return &instances[index]
}

// cacheEntry holds discovered instances together with their expiry time.
type cacheEntry struct {
	instances []ServiceInstance
	expires   time.Time
}

type ServiceClient struct {
	consulClient *api.Client
	loadBalancer LoadBalancer
	httpClient   *http.Client
	cache        map[string]cacheEntry
	cacheMutex   sync.RWMutex
	cacheTTL     time.Duration
}

func NewServiceClient(consulAddr string) (*ServiceClient, error) {
	config := api.DefaultConfig()
	config.Address = consulAddr
	consulClient, err := api.NewClient(config)
	if err != nil {
		return nil, err
	}
	return &ServiceClient{
		consulClient: consulClient,
		loadBalancer: &RoundRobinLB{},
		httpClient:   &http.Client{Timeout: 30 * time.Second},
		cache:        make(map[string]cacheEntry),
		cacheTTL:     30 * time.Second,
	}, nil
}

func (sc *ServiceClient) DiscoverService(serviceName string) ([]ServiceInstance, error) {
	// Check cache first and serve the entry while it is still fresh
	sc.cacheMutex.RLock()
	entry, ok := sc.cache[serviceName]
	sc.cacheMutex.RUnlock()
	if ok && time.Now().Before(entry.expires) {
		return entry.instances, nil
	}

	// Query Consul for healthy (passing) service instances
	services, _, err := sc.consulClient.Health().Service(serviceName, "", true, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to discover service %s: %w", serviceName, err)
	}
	instances := make([]ServiceInstance, 0, len(services))
	for _, service := range services {
		address := service.Service.Address
		if address == "" {
			// An empty service address means the instance uses its node's address
			address = service.Node.Address
		}
		instances = append(instances, ServiceInstance{
			ID:       service.Service.ID,
			Address:  address,
			Port:     service.Service.Port,
			Tags:     service.Service.Tags,
			Metadata: service.Service.Meta,
			Healthy:  true,
		})
	}

	// Update cache with an expiry time instead of a per-miss cleanup goroutine,
	// which could delete a freshly refreshed entry early
	sc.cacheMutex.Lock()
	sc.cache[serviceName] = cacheEntry{instances: instances, expires: time.Now().Add(sc.cacheTTL)}
	sc.cacheMutex.Unlock()
	return instances, nil
}

func (sc *ServiceClient) CallService(serviceName, path, method string, body []byte) (*http.Response, error) {
	instances, err := sc.DiscoverService(serviceName)
	if err != nil {
		return nil, err
	}
	if len(instances) == 0 {
		return nil, fmt.Errorf("no healthy instances found for service %s", serviceName)
	}

	// Make request with retry logic, moving to a different instance on failure
	const maxRetries = 3
	var lastErr error
	for attempt := 0; attempt < maxRetries && len(instances) > 0; attempt++ {
		if attempt > 0 {
			time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
		}
		// Choose instance using load balancer
		instance := sc.loadBalancer.Choose(instances)
		if instance == nil {
			lastErr = fmt.Errorf("load balancer returned no instance")
			break
		}
		url := fmt.Sprintf("http://%s:%d%s", instance.Address, instance.Port, path)
		// Build a fresh request per attempt: a request body can only be read once
		req, err := http.NewRequest(method, url, bytes.NewReader(body))
		if err != nil {
			return nil, err
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("X-Service-Discovery", "consul")

		resp, err := sc.httpClient.Do(req)
		if err == nil && resp.StatusCode < 500 {
			return resp, nil
		}
		if err != nil {
			lastErr = err
		} else {
			lastErr = fmt.Errorf("instance %s returned %s", instance.ID, resp.Status)
			resp.Body.Close() // release the connection before retrying
		}
		instances = sc.removeInstance(instances, instance.ID)
	}
	return nil, fmt.Errorf("all retry attempts failed for service %s: %w", serviceName, lastErr)
}

func (sc *ServiceClient) removeInstance(instances []ServiceInstance, instanceID string) []ServiceInstance {
	var result []ServiceInstance
	for _, instance := range instances {
		if instance.ID != instanceID {
			result = append(result, instance)
		}
	}
	return result
}

// Usage example (paymentData is the caller's JSON-encoded request body)
func exampleUsage(paymentData []byte) {
	client, err := NewServiceClient("consul.sindhan.ai:8500")
	if err != nil {
		log.Fatal(err)
	}
	// Call payment service
	response, err := client.CallService("payment-service", "/api/charge", "POST", paymentData)
	if err != nil {
		log.Printf("Failed to call payment service: %v", err)
		return
	}
	defer response.Body.Close()
}
```
Implementation Roadmap
Phase 1: Foundation (Completed)
Status: ✅ Released v1.0.0
- Basic service registration and discovery
- HTTP and TCP health checks
- DNS-based service resolution
- Integration with Consul service registry
- Basic load balancing algorithms
- Kubernetes service integration
Phase 2: Advanced Features (Planned)
Status: 📋 Target v1.5.0 - Q2 2024
- Advanced load balancing strategies (weighted, geo-aware)
- Service mesh integration with Istio
- Circuit breaker and retry mechanisms
- Canary deployment support
- Multi-region service discovery
- Advanced health check types
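Circuit breaking appears both as a core capability and as a Phase 2 item. A minimal, single-threaded sketch of the classic closed/open/half-open breaker follows; the thresholds, the `CircuitBreaker` and `CircuitOpenError` names, and the injectable `clock` are illustrative assumptions, not the planned implementation.

```python
import time
from typing import Callable, Optional


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Opens after `failure_threshold` consecutive failures, rejects calls for
    `reset_timeout` seconds, then lets the next call through as a trial."""

    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout:
            return "half-open"
        return "open"

    def call(self, fn: Callable, *args, **kwargs):
        if self.state == "open":
            raise CircuitOpenError("circuit open: call rejected")
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            # A failed trial call, or too many consecutive failures, (re)opens it
            if self.opened_at is not None or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and resets the failure count
        self.failures = 0
        self.opened_at = None
        return result
```

Keeping one breaker per instance would let a client stop calling an instance that keeps failing even before health checks mark it critical. The class is not thread-safe; concurrent callers would need a lock around `call`.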
Phase 3: Intelligence and Automation (Planned)
Status: 📋 Target v2.0.0 - Q3 2024
- AI-powered traffic routing and optimization
- Predictive scaling based on service discovery patterns
- Automated service dependency mapping
- Performance-based routing decisions
- Service topology visualization
- Advanced analytics and reporting
Benefits and Value
Operational Benefits
- Dynamic Scaling: Automatic service instance discovery enables horizontal scaling
- High Availability: Health checking and failover ensure service reliability
- Zero Downtime: Rolling updates without service interruption
- Load Distribution: Intelligent traffic distribution across service instances
Developer Benefits
- Simplified Configuration: No hardcoded service endpoints
- Service Abstraction: Services communicate by name, not IP addresses
- Development Flexibility: Easy testing with different service configurations
- Dependency Management: Clear service dependency visualization
Business Benefits
- Cost Optimization: Efficient resource utilization through load balancing
- Improved Performance: Reduced latency through intelligent routing
- Enhanced Reliability: Automatic failover and health monitoring
- Faster Development: Reduced complexity in service integration
Related Services
Direct Dependencies
- Configuration Management: Service discovery configuration and policies
- Platform Observability: Service health monitoring and metrics
- Security & Authentication: Secure service registration and discovery
Service Integrations
- Event & Messaging: Service lifecycle event notifications
- Data Persistence: Service registry data storage
- Resource Management: Integration with container orchestration
Consuming Services
- All Microservices: Every service uses discovery for inter-service communication
- Load Balancers: Dynamic backend configuration based on service discovery
- API Gateway: Dynamic routing to backend services
- Monitoring Systems: Service topology and health monitoring
The Service Discovery Service is essential for building scalable, resilient microservices architectures that can adapt dynamically to changing infrastructure and traffic patterns.