Service Discovery Service

The Service Discovery Service provides dynamic service registration, discovery, and health monitoring across all Sindhan AI platform components. Services find and communicate with each other by name rather than through hardcoded endpoints, which supports a scalable microservices architecture.

Overview and Purpose

Service Discovery is a critical infrastructure service that enables dynamic service-to-service communication in a distributed microservices environment. It provides automatic service registration, health monitoring, load balancing, and failover capabilities that are essential for building resilient and scalable systems.

Key Benefits

  • Dynamic Service Location: Automatic discovery of service instances
  • Health Monitoring: Continuous health checking and failover
  • Load Balancing: Intelligent traffic distribution across instances
  • Zero Configuration: Automatic service registration and deregistration
  • Service Mesh Integration: Seamless integration with Istio and other service meshes
  • Multi-Environment Support: Consistent service discovery across environments

Implementation Status

Phase | Status | Description
Phase 1 | Implemented | Basic service registration, discovery, health checks
Phase 2 | 📋 Planned | Advanced load balancing, service mesh integration, multi-region support
Phase 3 | 📋 Planned | AI-powered routing, predictive scaling, advanced analytics

Current Version: v1.2.0
Next Release: v1.5.0 (Q2 2024)

Core Capabilities

1. Service Registration and Deregistration

  • Automatic service registration on startup
  • Graceful deregistration on shutdown
  • Service metadata and tags support
  • Version-aware service registration
  • Environment-specific service namespacing

2. Service Discovery and Resolution

  • DNS-based service discovery
  • HTTP API for service lookup
  • Client-side and server-side load balancing
  • Service topology mapping
  • Cross-datacenter service discovery
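
DNS-based lookup can be illustrated with a short sketch. It assumes the registry exposes Consul's standard DNS naming scheme, `[tag.]<service>.service[.<datacenter>].<domain>`, and that the local resolver forwards that domain to the registry. The helper names `consul_dns_name` and `resolve_service` are hypothetical and are not part of the platform API.

```python
import socket
from typing import List, Optional


def consul_dns_name(service: str, tag: Optional[str] = None,
                    datacenter: Optional[str] = None,
                    domain: str = "consul") -> str:
    """Build a name of the form [tag.]<service>.service[.<datacenter>].<domain>."""
    parts = [tag] if tag else []
    parts += [service, "service"]
    if datacenter:
        parts.append(datacenter)
    parts.append(domain)
    return ".".join(parts)


def resolve_service(name: str, port: int) -> List[str]:
    """Resolve a DNS name to a sorted list of unique IPv4 addresses."""
    infos = socket.getaddrinfo(name, port, family=socket.AF_INET,
                               type=socket.SOCK_STREAM)
    # Each entry is (family, type, proto, canonname, sockaddr); sockaddr[0] is the IP.
    return sorted({info[4][0] for info in infos})
```

For example, `consul_dns_name("payment-service", tag="api", datacenter="dc1")` yields `api.payment-service.service.dc1.consul`, which `resolve_service` could then look up wherever the DNS interface is reachable.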

3. Health Monitoring and Management

  • HTTP, TCP, and custom health checks
  • Configurable health check intervals
  • Automatic unhealthy service removal
  • Health status aggregation and reporting
  • Dependency health propagation
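
Status aggregation and dependency propagation can be sketched as a "worst status wins" rule. The status names below follow Consul's check states (passing, warning, critical). The propagation policy, where an unhealthy dependency degrades a service to warning at most, is an assumption made for illustration and is not documented platform behavior.

```python
from typing import Iterable

# Severity order from best to worst. The ranking is an assumption of this sketch.
SEVERITY = {"passing": 0, "warning": 1, "critical": 2}


def aggregate_status(check_statuses: Iterable[str]) -> str:
    """Return the worst status among the checks; no checks counts as passing."""
    worst = "passing"
    for status in check_statuses:
        if status not in SEVERITY:
            raise ValueError(f"unknown status: {status}")
        if SEVERITY[status] > SEVERITY[worst]:
            worst = status
    return worst


def effective_status(own_checks: Iterable[str],
                     dependency_statuses: Iterable[str]) -> str:
    """Combine a service's own checks with the health of its dependencies.

    Hypothetical policy: any non-passing dependency degrades the service
    to "warning", but only the service's own checks can make it "critical".
    """
    own = aggregate_status(own_checks)
    dependencies = aggregate_status(dependency_statuses)
    degraded = "passing" if dependencies == "passing" else "warning"
    return aggregate_status([own, degraded])
```

Under this policy a service whose own checks pass but whose database is critical reports warning, so it stays in rotation while still surfacing the problem.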

4. Load Balancing and Traffic Management

  • Multiple load balancing algorithms (round-robin, least connections, weighted)
  • Traffic shaping and rate limiting
  • Circuit breaker pattern implementation
  • Canary deployments and A/B testing support
  • Geographic routing and affinity
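
The circuit breaker pattern listed above can be sketched in a few lines. This is a minimal, single-threaded illustration rather than the platform's implementation. The class name, the threshold and timeout defaults, and the injectable `clock` parameter are all assumptions.

```python
import time
from typing import Callable, Optional


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout:
            return "half-open"  # allow one trial call through
        return "open"

    def call(self, func: Callable, *args, **kwargs):
        if self.state == "open":
            raise CircuitOpenError("circuit is open; call rejected")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            # Trip the breaker, or re-trip it after a failed half-open trial.
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and resets the failure count.
        self.failures = 0
        self.opened_at = None
        return result
```

A caller would wrap each outbound request, for example `breaker.call(requests.get, url, timeout=5)`, and treat `CircuitOpenError` as a signal to fail fast or fall back.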

5. Service Mesh Integration

  • Istio service mesh compatibility
  • Envoy proxy configuration
  • mTLS certificate distribution
  • Traffic policy enforcement
  • Observability integration

6. Configuration and Policy Management

  • Service-specific routing policies
  • Traffic splitting configurations
  • Retry and timeout policies
  • Security policies and access control
  • Configuration hot reloading
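
A retry policy is easiest to reason about when written down as data. The sketch below assumes exponential backoff with a cap. The `RetryPolicy` fields and the `call_with_retry` helper are hypothetical names chosen for this example, not platform configuration keys.

```python
import time
from dataclasses import dataclass
from typing import Callable, List, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class RetryPolicy:
    attempts: int = 3        # total tries, including the first
    base_delay: float = 0.5  # seconds to wait before the first retry
    max_delay: float = 5.0   # upper bound for any single wait

    def delays(self) -> List[float]:
        """Waits between attempts: base_delay doubled each time, capped at max_delay."""
        return [min(self.base_delay * 2 ** i, self.max_delay)
                for i in range(self.attempts - 1)]


def call_with_retry(func: Callable[[], T], policy: RetryPolicy,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Call func until it succeeds or the policy's attempts are used up."""
    delays = policy.delays()
    for attempt in range(policy.attempts):
        try:
            return func()
        except Exception:
            if attempt == policy.attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(delays[attempt])
    raise ValueError("policy.attempts must be at least 1")
```

Passing `sleep` as a parameter keeps the helper testable without real delays. Retries of this kind are only safe for idempotent requests.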

Integration Patterns

Automatic Service Registration

import itertools
import socket
import threading
from typing import Dict, List, Optional

import requests

class ServiceDiscoveryClient:
    def __init__(self, discovery_url: str, service_name: str, 
                 service_port: int, health_check_path: str = '/health'):
        self.discovery_url = discovery_url
        self.service_name = service_name
        self.service_port = service_port
        self.health_check_path = health_check_path
        self.service_id = f"{service_name}-{socket.gethostname()}-{service_port}"
        self.registration_data = None
        self.heartbeat_thread = None
        self.timeout = 5  # seconds allowed for each call to the discovery service
        self._stop_event = threading.Event()   # tells the heartbeat thread to exit
        self._round_robin = itertools.count()  # counter used for round-robin selection
    
    def register(self, tags: Optional[List[str]] = None,
                 metadata: Optional[Dict[str, str]] = None):
        """Register service with discovery service"""
        local_ip = self._get_local_ip()
        # Field names follow Consul's agent API, which the endpoints below belong to
        self.registration_data = {
            'ID': self.service_id,
            'Name': self.service_name,
            'Address': local_ip,
            'Port': self.service_port,
            'Tags': tags or [],
            'Meta': metadata or {},
            'Check': {
                'HTTP': f"http://{local_ip}:{self.service_port}{self.health_check_path}",
                'Interval': '10s',
                'Timeout': '3s',
                'DeregisterCriticalServiceAfter': '30s'
            }
        }
        
        # Consul's agent API expects PUT for service registration
        response = requests.put(
            f"{self.discovery_url}/v1/agent/service/register",
            json=self.registration_data,
            timeout=self.timeout
        )
        
        if response.status_code == 200:
            print(f"Service {self.service_name} registered successfully")
            self._start_heartbeat()
        else:
            raise RuntimeError(f"Service registration failed: {response.text}")
    
    def deregister(self):
        """Deregister service from discovery service"""
        # Wake the heartbeat thread so join() does not wait out a full interval
        self._stop_event.set()
        if self.heartbeat_thread:
            self.heartbeat_thread.join()
        
        response = requests.put(
            f"{self.discovery_url}/v1/agent/service/deregister/{self.service_id}",
            timeout=self.timeout
        )
        
        if response.status_code == 200:
            print(f"Service {self.service_name} deregistered successfully")
        else:
            print(f"Service deregistration failed: {response.text}")
    
    def discover_service(self, service_name: str, healthy_only: bool = True) -> List[Dict]:
        """Discover instances of a service"""
        params = {'passing': 'true'} if healthy_only else {}
        
        response = requests.get(
            f"{self.discovery_url}/v1/health/service/{service_name}",
            params=params,
            timeout=self.timeout
        )
        
        if response.status_code == 200:
            services = response.json()
            return [
                {
                    'id': service['Service']['ID'],
                    'address': service['Service']['Address'],
                    'port': service['Service']['Port'],
                    'tags': service['Service']['Tags'],
                    'metadata': service['Service']['Meta']
                }
                for service in services
            ]
        else:
            return []
    
    def get_service_url(self, service_name: str, load_balance: bool = True) -> Optional[str]:
        """Get URL for a service instance"""
        instances = self.discover_service(service_name)
        
        if not instances:
            return None
        
        if load_balance:
            # Simple round-robin: advance a counter on every call and wrap around
            instance = instances[next(self._round_robin) % len(instances)]
        else:
            instance = instances[0]
        
        return f"http://{instance['address']}:{instance['port']}"
    
    def _get_local_ip(self):
        """Get local IP address"""
        try:
            # Connecting a UDP socket sends no packets; it only selects the
            # outbound interface, whose address is then read back
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(('10.254.254.254', 80))
                return s.getsockname()[0]
        except Exception:
            return '127.0.0.1'
    
    def _start_heartbeat(self):
        """Start heartbeat thread for service health"""
        self._stop_event.clear()
        self.heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True)
        self.heartbeat_thread.start()
    
    def _heartbeat_loop(self):
        """Heartbeat loop to maintain service registration.

        Note: Consul accepts check/pass updates only for TTL checks. With the
        HTTP check registered above, Consul polls the health endpoint itself,
        so this loop only has an effect if the check is switched to a TTL check.
        """
        while not self._stop_event.is_set():
            try:
                # Send heartbeat
                response = requests.put(
                    f"{self.discovery_url}/v1/agent/check/pass/service:{self.service_id}",
                    timeout=self.timeout
                )
                
                if response.status_code != 200:
                    print(f"Heartbeat failed: {response.text}")
                
            except requests.RequestException as e:
                print(f"Heartbeat error: {e}")
            
            # Wait up to 30 seconds, returning early when deregister() is called
            self._stop_event.wait(30)
 
# Usage example
discovery_client = ServiceDiscoveryClient(
    discovery_url='http://consul.sindhan.ai:8500',
    service_name='user-service',
    service_port=8080,
    health_check_path='/health'
)
 
# Register service on startup
discovery_client.register(
    tags=['api', 'user-management'],
    metadata={'version': '1.2.3', 'environment': 'production'}
)
 
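# Discover and call another service
# payment_data is a hypothetical payload, shown only to make the example complete
payment_data = {'amount': 1000, 'currency': 'USD'}
payment_service_url = discovery_client.get_service_url('payment-service')
if payment_service_url:
    response = requests.post(f"{payment_service_url}/api/charge", json=payment_data, timeout=10)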
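
Graceful deregistration on shutdown needs the client's `deregister` method to run exactly once, whether the process exits normally or receives a termination signal. The sketch below shows one way to wire that up with the standard library. The helper names `once` and `install_shutdown_hooks` are hypothetical, and signal handlers can only be installed from the main thread.

```python
import atexit
import signal
import sys
import threading
from typing import Callable


def once(func: Callable[[], None]) -> Callable[[], None]:
    """Wrap func so that only the first call has any effect."""
    lock = threading.Lock()
    called = False

    def wrapper() -> None:
        nonlocal called
        with lock:
            if called:
                return
            called = True
        func()

    return wrapper


def install_shutdown_hooks(deregister: Callable[[], None]) -> Callable[[], None]:
    """Run deregister on normal interpreter exit and on SIGTERM or SIGINT."""
    hook = once(deregister)

    def on_signal(signum, frame) -> None:
        hook()
        sys.exit(0)

    atexit.register(hook)
    signal.signal(signal.SIGTERM, on_signal)
    signal.signal(signal.SIGINT, on_signal)
    return hook
```

With the client from the example above, `install_shutdown_hooks(discovery_client.deregister)` would be called once, right after `register`.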

Service Mesh Integration with Istio

# Service registration via Kubernetes service
apiVersion: v1
kind: Service
metadata:
  name: user-service
  namespace: sindhan-platform
  labels:
    app: user-service
    version: v1.2.3
spec:
  # Replaces the deprecated service.alpha.kubernetes.io/tolerate-unready-endpoints annotation
  publishNotReadyAddresses: true
  selector:
    app: user-service
  ports:
  - name: http
    port: 8080
    targetPort: 8080
  - name: grpc
    port: 9090
    targetPort: 9090
 
---
# Istio DestinationRule for load balancing
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: user-service
  namespace: sindhan-platform
spec:
  host: user-service
  trafficPolicy:
    loadBalancer:
      simple: LEAST_REQUEST  # LEAST_CONN is deprecated in favor of LEAST_REQUEST
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 10
        maxRequestsPerConnection: 2
    outlierDetection:
      consecutive5xxErrors: 3  # replaces the deprecated consecutiveErrors field
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
  subsets:
  - name: v1
    labels:
      version: v1.2.3
  - name: canary
    labels:
      version: v1.3.0-canary
 
---
# VirtualService for traffic routing
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: user-service
  namespace: sindhan-platform
spec:
  hosts:
  - user-service
  http:
  - match:
    - headers:
        canary:
          exact: "true"
    route:
    - destination:
        host: user-service
        subset: canary
      weight: 100
  - route:
    - destination:
        host: user-service
        subset: v1
      weight: 90
    - destination:
        host: user-service
        subset: canary
      weight: 10
    fault:
      delay:
        percentage:
          value: 0.1
        fixedDelay: 5s
    retries:
      attempts: 3
      perTryTimeout: 10s
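
The routing rules in the VirtualService above are evaluated in order: a request carrying the header `canary: true` always goes to the canary subset, and all other traffic is split 90/10 by weight. The simulation below is an illustrative sketch of that decision logic only. It does not call Istio, and `choose_subset` is a hypothetical name.

```python
import random
from typing import Dict, List, Tuple

# Weighted destinations of the default route, copied from the manifest above.
DEFAULT_ROUTE: List[Tuple[str, int]] = [("v1", 90), ("canary", 10)]


def choose_subset(headers: Dict[str, str], rng: random.Random) -> str:
    """Pick the subset that the rules above would select for a request."""
    # Rule 1: an exact header match sends the request to the canary subset.
    if headers.get("canary") == "true":
        return "canary"
    # Rule 2: every other request is distributed according to the weights.
    subsets = [name for name, _ in DEFAULT_ROUTE]
    weights = [weight for _, weight in DEFAULT_ROUTE]
    return rng.choices(subsets, weights=weights, k=1)[0]


if __name__ == "__main__":
    rng = random.Random(42)
    picks = [choose_subset({}, rng) for _ in range(10_000)]
    print(f"canary share: {picks.count('canary') / len(picks):.1%}")
```

Over many requests the canary share should settle near 10%, while testers who set the header reach the canary deterministically.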

Client-Side Load Balancing

package servicediscovery
 
import (
    "fmt"
    "log"
    "net/http"
    "strings"
    "sync"
    "time"
    
    "github.com/hashicorp/consul/api"
)
 
type ServiceInstance struct {
    ID       string
    Address  string
    Port     int
    Tags     []string
    Metadata map[string]string
    Healthy  bool
}
 
type LoadBalancer interface {
    Choose(instances []ServiceInstance) *ServiceInstance
}
 
type RoundRobinLB struct {
    counter uint64
    mutex   sync.Mutex
}
 
func (lb *RoundRobinLB) Choose(instances []ServiceInstance) *ServiceInstance {
    if len(instances) == 0 {
        return nil
    }
    
    lb.mutex.Lock()
    defer lb.mutex.Unlock()
    
    lb.counter++
    index := lb.counter % uint64(len(instances))
    return &instances[index]
}
 
type ServiceClient struct {
    consulClient *api.Client
    loadBalancer LoadBalancer
    httpClient   *http.Client
    cache        map[string][]ServiceInstance
    cacheMutex   sync.RWMutex
    cacheTTL     time.Duration
}
 
func NewServiceClient(consulAddr string) (*ServiceClient, error) {
    config := api.DefaultConfig()
    config.Address = consulAddr
    
    consulClient, err := api.NewClient(config)
    if err != nil {
        return nil, err
    }
    
    return &ServiceClient{
        consulClient: consulClient,
        loadBalancer: &RoundRobinLB{},
        httpClient: &http.Client{
            Timeout: 30 * time.Second,
        },
        cache:    make(map[string][]ServiceInstance),
        cacheTTL: 30 * time.Second,
    }, nil
}
 
func (sc *ServiceClient) DiscoverService(serviceName string) ([]ServiceInstance, error) {
    // Check cache first
    sc.cacheMutex.RLock()
    if instances, exists := sc.cache[serviceName]; exists {
        sc.cacheMutex.RUnlock()
        return instances, nil
    }
    sc.cacheMutex.RUnlock()
    
    // Query Consul for healthy service instances
    services, _, err := sc.consulClient.Health().Service(serviceName, "", true, nil)
    if err != nil {
        return nil, fmt.Errorf("failed to discover service %s: %w", serviceName, err)
    }
    
    var instances []ServiceInstance
    for _, service := range services {
        instance := ServiceInstance{
            ID:       service.Service.ID,
            Address:  service.Service.Address,
            Port:     service.Service.Port,
            Tags:     service.Service.Tags,
            Metadata: service.Service.Meta,
            Healthy:  true,
        }
        instances = append(instances, instance)
    }
    
    // Update cache
    sc.cacheMutex.Lock()
    sc.cache[serviceName] = instances
    sc.cacheMutex.Unlock()
    
    // Set cache expiration
    go func() {
        time.Sleep(sc.cacheTTL)
        sc.cacheMutex.Lock()
        delete(sc.cache, serviceName)
        sc.cacheMutex.Unlock()
    }()
    
    return instances, nil
}
 
func (sc *ServiceClient) CallService(serviceName, path string, method string, body []byte) (*http.Response, error) {
    instances, err := sc.DiscoverService(serviceName)
    if err != nil {
        return nil, err
    }
    
    if len(instances) == 0 {
        return nil, fmt.Errorf("no healthy instances found for service %s", serviceName)
    }
    
    // Choose instance using load balancer
    instance := sc.loadBalancer.Choose(instances)
    if instance == nil {
        return nil, fmt.Errorf("load balancer returned no instance")
    }
    
    // Make request with retry logic. The request is rebuilt on every attempt
    // because a request body can only be read once.
    maxRetries := 3
    var lastErr error
    for i := 0; i < maxRetries; i++ {
        // Construct URL for the currently selected instance
        url := fmt.Sprintf("http://%s:%d%s", instance.Address, instance.Port, path)
        
        // Create request
        req, err := http.NewRequest(method, url, strings.NewReader(string(body)))
        if err != nil {
            return nil, err
        }
        
        req.Header.Set("Content-Type", "application/json")
        req.Header.Set("X-Service-Discovery", "consul")
        
        resp, err := sc.httpClient.Do(req)
        if err == nil && resp.StatusCode < 500 {
            return resp, nil
        }
        
        if err != nil {
            lastErr = err
        } else {
            lastErr = fmt.Errorf("instance %s returned status %d", instance.ID, resp.StatusCode)
            resp.Body.Close() // release the connection before retrying
        }
        
        if i < maxRetries-1 {
            // Try a different instance on failure, if one is left
            if remaining := sc.removeInstance(instances, instance.ID); len(remaining) > 0 {
                instances = remaining
                instance = sc.loadBalancer.Choose(instances)
            }
            
            time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
        }
    }
    
    return nil, fmt.Errorf("all retry attempts failed for service %s: %w", serviceName, lastErr)
}
 
func (sc *ServiceClient) removeInstance(instances []ServiceInstance, instanceID string) []ServiceInstance {
    var result []ServiceInstance
    for _, instance := range instances {
        if instance.ID != instanceID {
            result = append(result, instance)
        }
    }
    return result
}
 
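// Usage example
func chargeExample() {
    client, err := NewServiceClient("consul.sindhan.ai:8500")
    if err != nil {
        log.Fatal(err)
    }
    
    // paymentData is a hypothetical payload, shown only to make the example complete
    paymentData := []byte(`{"amount": 1000, "currency": "USD"}`)
    
    // Call payment service
    response, err := client.CallService("payment-service", "/api/charge", "POST", paymentData)
    if err != nil {
        log.Printf("Failed to call payment service: %v", err)
        return
    }
    defer response.Body.Close()
}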
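
The `LoadBalancer` interface above has a single `Choose` method, so other strategies can be swapped in. As an illustration of the weighted strategy named under Core Capabilities, here is a sketch of smooth weighted round-robin, written in Python to match the first example. The class name and the weight mapping are assumptions, and the algorithm is a well-known general technique rather than something this document specifies.

```python
from typing import Dict


class SmoothWeightedRoundRobin:
    """Spread picks in proportion to weights without long runs of one instance."""

    def __init__(self, weights: Dict[str, int]):
        if not weights or any(w <= 0 for w in weights.values()):
            raise ValueError("weights must be a non-empty mapping of positive integers")
        self.weights = dict(weights)
        self.current = {instance_id: 0 for instance_id in weights}
        self.total = sum(weights.values())

    def choose(self) -> str:
        # Every instance earns its weight each round.
        for instance_id, weight in self.weights.items():
            self.current[instance_id] += weight
        # The instance with the highest running score is picked
        # (ties go to the instance listed first).
        chosen = max(self.current, key=self.current.get)
        # The winner pays back the total so that others catch up.
        self.current[chosen] -= self.total
        return chosen
```

With weights `{"a": 5, "b": 1, "c": 1}`, every seven consecutive picks contain five of `a` and one each of `b` and `c`, with the lighter instances interleaved rather than served at the end.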

Implementation Roadmap

Phase 1: Foundation (Completed)

Status: ✅ Released v1.0.0

  • Basic service registration and discovery
  • HTTP and TCP health checks
  • DNS-based service resolution
  • Integration with Consul service registry
  • Basic load balancing algorithms
  • Kubernetes service integration

Phase 2: Advanced Features (Planned)

Status: 📋 Target v1.5.0 - Q2 2024

  • Advanced load balancing strategies (weighted, geo-aware)
  • Service mesh integration with Istio
  • Circuit breaker and retry mechanisms
  • Canary deployment support
  • Multi-region service discovery
  • Advanced health check types

Phase 3: Intelligence and Automation (Planned)

Status: 📋 Target v2.0.0 - Q3 2024

  • AI-powered traffic routing and optimization
  • Predictive scaling based on service discovery patterns
  • Automated service dependency mapping
  • Performance-based routing decisions
  • Service topology visualization
  • Advanced analytics and reporting

Benefits and Value

Operational Benefits

  • Dynamic Scaling: Automatic service instance discovery enables horizontal scaling
  • High Availability: Health checking and failover ensure service reliability
  • Zero Downtime: Rolling updates without service interruption
  • Load Distribution: Intelligent traffic distribution across service instances

Developer Benefits

  • Simplified Configuration: No hardcoded service endpoints
  • Service Abstraction: Services communicate by name, not IP addresses
  • Development Flexibility: Easy testing with different service configurations
  • Dependency Management: Clear service dependency visualization

Business Benefits

  • Cost Optimization: Efficient resource utilization through load balancing
  • Improved Performance: Reduced latency through intelligent routing
  • Enhanced Reliability: Automatic failover and health monitoring
  • Faster Development: Reduced complexity in service integration

Related Services

Consuming Services

  • All Microservices: Every service uses discovery for inter-service communication
  • Load Balancers: Dynamic backend configuration based on service discovery
  • API Gateway: Dynamic routing to backend services
  • Monitoring Systems: Service topology and health monitoring

The Service Discovery Service is essential for building scalable, resilient microservices architectures that can adapt dynamically to changing infrastructure and traffic patterns.