
Data Persistence Service

The Data Persistence Service provides comprehensive multi-model data storage and management capabilities across all Sindhan AI platform components. It offers unified data access patterns, ACID transactions, data modeling, and advanced data management features for relational, document, graph, and time-series data.

Overview and Purpose

Data Persistence is a foundational infrastructure service that abstracts and manages all data storage needs across the platform. It provides a unified data access layer that supports multiple data models while ensuring consistency, reliability, and performance at scale.

Key Benefits

  • Multi-Model Support: Relational, document, graph, and time-series data in one platform
  • ACID Transactions: Full transactional integrity across different data models
  • Unified API: Single interface for all data operations regardless of storage type
  • Automatic Scaling: Horizontal and vertical scaling based on demand
  • Data Governance: Comprehensive data lineage, privacy, and compliance controls
  • High Availability: Built-in replication, backup, and disaster recovery

Implementation Status

Phase | Status | Description
Phase 1 | Implemented | PostgreSQL, MongoDB, basic CRUD operations, transactions
Phase 2 | Implemented | Redis caching, backup/restore, data modeling, performance optimization
Phase 3 | Implemented | Neo4j graph database, InfluxDB time-series, advanced query optimization

Current Version: v3.1.0
Next Release: v3.2.0 (Q2 2024)

Core Capabilities

1. Multi-Model Data Storage

  • Relational databases (PostgreSQL, MySQL) for structured data
  • Document databases (MongoDB, CouchDB) for semi-structured data
  • Graph databases (Neo4j) for relationship-heavy data
  • Time-series databases (InfluxDB, TimescaleDB) for temporal data
  • Key-value stores (Redis, etcd) for caching and session storage

2. Unified Data Access Layer

  • Single API for all data operations (CRUD, queries, transactions)
  • GraphQL and REST interfaces for flexible data access
  • ORM/ODM abstraction layer for different data models
  • Query optimization and execution planning
  • Connection pooling and resource management

3. Transaction Management

  • ACID transactions within single databases
  • Distributed transactions across multiple databases
  • Saga pattern for long-running business transactions
  • Compensation mechanisms for transaction rollback
  • Eventual consistency patterns for distributed data

4. Data Modeling and Schema Management

  • Dynamic schema evolution and versioning
  • Data validation and constraint enforcement
  • Relationship modeling across different data stores
  • Index management and query optimization
  • Data migration and transformation tools
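
As a sketch of how lazy schema evolution can work, records carry the schema version they were written under, and every newer migration is applied in order on read. The `Migration` and `SchemaRegistry` names below are illustrative, not part of the service API:

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

@dataclass
class Migration:
    version: int
    description: str
    apply: Callable[[Dict[str, Any]], Dict[str, Any]]  # record transformer

class SchemaRegistry:
    """Upgrades stored records lazily as newer schema versions are registered."""

    def __init__(self) -> None:
        self._migrations: List[Migration] = []

    def register(self, migration: Migration) -> None:
        self._migrations.append(migration)
        self._migrations.sort(key=lambda m: m.version)

    def upgrade(self, record: Dict[str, Any], from_version: int) -> Dict[str, Any]:
        # Apply every migration newer than the record's stored version, in order.
        for migration in self._migrations:
            if migration.version > from_version:
                record = migration.apply(record)
        return record

registry = SchemaRegistry()
registry.register(Migration(2, "rename 'name' to 'username'",
                            lambda r: {**{k: v for k, v in r.items() if k != "name"},
                                       "username": r["name"]}))
registry.register(Migration(3, "add default 'active' flag",
                            lambda r: {"active": True, **r}))

legacy = {"name": "john_doe", "email": "john@example.com"}
current = registry.upgrade(legacy, from_version=1)
```

Storing the version with each record keeps migrations cheap: old records are rewritten only when touched, not in a blocking bulk pass.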

5. Caching and Performance Optimization

  • Multi-level caching (L1: in-memory, L2: Redis, L3: database)
  • Query result caching and invalidation
  • Database connection pooling and optimization
  • Read replicas and load balancing
  • Performance monitoring and tuning
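
The L1/L2/L3 lookup order can be shown in miniature. Plain dictionaries stand in for the in-process and Redis tiers, and the `db_fetch` callback for the database round trip; all three are illustrative assumptions, not the service's cache API:

```python
from typing import Any, Callable, Dict

class MultiLevelCache:
    """Check L1 (in-process) first, then L2 (shared, e.g. Redis), then the
    database. Hits at a lower level are promoted to the levels above it."""

    def __init__(self, db_fetch: Callable[[str], Any]) -> None:
        self.l1: Dict[str, Any] = {}   # per-process memory
        self.l2: Dict[str, Any] = {}   # stands in for a shared Redis instance
        self.db_fetch = db_fetch       # L3: the authoritative database
        self.db_hits = 0

    def get(self, key: str) -> Any:
        if key in self.l1:
            return self.l1[key]
        if key in self.l2:
            self.l1[key] = self.l2[key]    # promote to L1
            return self.l2[key]
        value = self.db_fetch(key)         # fall through to the database
        self.db_hits += 1
        self.l2[key] = value
        self.l1[key] = value
        return value

    def invalidate(self, key: str) -> None:
        # Writes must clear both cache tiers, as the CRUD methods below do.
        self.l1.pop(key, None)
        self.l2.pop(key, None)

cache = MultiLevelCache(db_fetch=lambda key: {"id": key, "source": "db"})
first = cache.get("users:42")    # falls through to the database
second = cache.get("users:42")   # served from L1, no database hit
```

Promotion on read keeps hot keys in the cheapest tier, while write-path invalidation (as in the `create`/`update`/`delete` examples below) prevents stale reads.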

6. Backup, Recovery, and Data Management

  • Automated backup scheduling and retention policies
  • Point-in-time recovery and disaster recovery
  • Data archival and lifecycle management
  • Data encryption at rest and in transit
  • Compliance and audit trail management
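
A retention policy of the kind described above reduces to a small pruning rule: protect the newest backups, then expire everything outside the retention window. `prune_backups` is a hypothetical sketch, not the service's scheduler API:

```python
from datetime import datetime, timedelta
from typing import List

def prune_backups(backup_times: List[datetime], now: datetime,
                  keep_latest: int = 3,
                  retention: timedelta = timedelta(days=30)) -> List[datetime]:
    """Return backups eligible for deletion: anything older than the retention
    window, except that the newest `keep_latest` backups are always kept."""
    ordered = sorted(backup_times, reverse=True)   # newest first
    protected = set(ordered[:keep_latest])         # always keep the most recent
    return [t for t in ordered if t not in protected and now - t > retention]

now = datetime(2024, 3, 1)
backups = [now - timedelta(days=d) for d in (1, 10, 45, 90)]
stale = prune_backups(backups, now, keep_latest=2)  # the 45- and 90-day-old backups
```

Keeping a minimum number of backups regardless of age guards against the edge case where backups stop running and the retention window would otherwise delete every remaining copy.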

Architecture

Integration Patterns

Unified Data Access API

from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass
from enum import Enum
import asyncio
 
class DataStoreType(Enum):
    RELATIONAL = "relational"
    DOCUMENT = "document"
    GRAPH = "graph"
    TIMESERIES = "timeseries"
    CACHE = "cache"
 
@dataclass
class QueryResult:
    data: List[Dict[str, Any]]
    total_count: int
    has_more: bool
    cursor: Optional[str] = None
 
@dataclass
class TransactionContext:
    transaction_id: str
    isolation_level: str
    timeout: int
    participants: List[str]
 
class DataPersistenceClient:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.adapters = {}
        self.transaction_manager = TransactionManager()
        self.cache_manager = CacheManager()
        self._initialize_adapters()
    
    def _initialize_adapters(self):
        """Initialize data store adapters"""
        if 'postgresql' in self.config:
            self.adapters['postgresql'] = PostgreSQLAdapter(self.config['postgresql'])
        if 'mongodb' in self.config:
            self.adapters['mongodb'] = MongoDBAdapter(self.config['mongodb'])
        if 'neo4j' in self.config:
            self.adapters['neo4j'] = Neo4jAdapter(self.config['neo4j'])
        if 'influxdb' in self.config:
            self.adapters['influxdb'] = InfluxDBAdapter(self.config['influxdb'])
        if 'redis' in self.config:
            self.adapters['redis'] = RedisAdapter(self.config['redis'])
    
    async def create(self, store_type: DataStoreType, collection: str,
                    data: Union[Dict, List[Dict]],
                    transaction_id: Optional[str] = None) -> Dict[str, Any]:
        """Create new documents/records"""
        adapter = self._get_adapter(store_type)
        
        try:
            if transaction_id:
                result = await adapter.create_with_transaction(collection, data, transaction_id)
            else:
                result = await adapter.create(collection, data)
            
            # Invalidate related cache
            await self.cache_manager.invalidate_pattern(f"{store_type.value}:{collection}:*")
            
            return result
        except Exception as e:
            raise DataPersistenceException(f"Create operation failed: {str(e)}")
    
    async def read(self, store_type: DataStoreType, collection: str,
                  filters: Optional[Dict[str, Any]] = None,
                  options: Optional[Dict[str, Any]] = None) -> QueryResult:
        """Read documents/records with filtering and pagination"""
        adapter = self._get_adapter(store_type)
        
        # Check cache first
        cache_key = self._generate_cache_key(store_type, collection, filters, options)
        cached_result = await self.cache_manager.get(cache_key)
        
        if cached_result:
            return QueryResult(**cached_result)
        
        try:
            result = await adapter.read(collection, filters or {}, options or {})
            
            # Cache the result
            await self.cache_manager.set(cache_key, result.__dict__, ttl=300)
            
            return result
        except Exception as e:
            raise DataPersistenceException(f"Read operation failed: {str(e)}")
    
    async def update(self, store_type: DataStoreType, collection: str,
                    filters: Dict[str, Any], data: Dict[str, Any],
                    transaction_id: Optional[str] = None) -> Dict[str, Any]:
        """Update documents/records"""
        adapter = self._get_adapter(store_type)
        
        try:
            if transaction_id:
                result = await adapter.update_with_transaction(collection, filters, data, transaction_id)
            else:
                result = await adapter.update(collection, filters, data)
            
            # Invalidate related cache
            await self.cache_manager.invalidate_pattern(f"{store_type.value}:{collection}:*")
            
            return result
        except Exception as e:
            raise DataPersistenceException(f"Update operation failed: {str(e)}")
    
    async def delete(self, store_type: DataStoreType, collection: str,
                    filters: Dict[str, Any],
                    transaction_id: Optional[str] = None) -> Dict[str, Any]:
        """Delete documents/records"""
        adapter = self._get_adapter(store_type)
        
        try:
            if transaction_id:
                result = await adapter.delete_with_transaction(collection, filters, transaction_id)
            else:
                result = await adapter.delete(collection, filters)
            
            # Invalidate related cache
            await self.cache_manager.invalidate_pattern(f"{store_type.value}:{collection}:*")
            
            return result
        except Exception as e:
            raise DataPersistenceException(f"Delete operation failed: {str(e)}")
    
    async def query(self, store_type: DataStoreType, query: str,
                   parameters: Optional[Dict[str, Any]] = None) -> QueryResult:
        """Execute custom queries"""
        adapter = self._get_adapter(store_type)
        
        # Serve cacheable (read-only) queries from cache when possible
        cache_key = f"{store_type.value}:query:{hash(query + str(parameters))}"
        if self._is_query_cacheable(query):
            cached_result = await self.cache_manager.get(cache_key)
            if cached_result:
                return QueryResult(**cached_result)
        
        try:
            result = await adapter.execute_query(query, parameters or {})
            
            # Cache read-only queries
            if self._is_query_cacheable(query):
                await self.cache_manager.set(cache_key, result.__dict__, ttl=180)
            
            return result
        except Exception as e:
            raise DataPersistenceException(f"Query execution failed: {str(e)}")
    
    async def begin_transaction(self, stores: List[DataStoreType], 
                               isolation_level: str = "READ_COMMITTED",
                               timeout: int = 30) -> str:
        """Begin distributed transaction"""
        return await self.transaction_manager.begin_transaction(
            [self._get_adapter(store) for store in stores],
            isolation_level,
            timeout
        )
    
    async def commit_transaction(self, transaction_id: str) -> bool:
        """Commit distributed transaction"""
        return await self.transaction_manager.commit_transaction(transaction_id)
    
    async def rollback_transaction(self, transaction_id: str) -> bool:
        """Rollback distributed transaction"""
        return await self.transaction_manager.rollback_transaction(transaction_id)
    
    def _get_adapter(self, store_type: DataStoreType):
        """Get appropriate adapter for store type"""
        if store_type == DataStoreType.RELATIONAL:
            return self.adapters.get('postgresql') or self.adapters.get('mysql')
        elif store_type == DataStoreType.DOCUMENT:
            return self.adapters.get('mongodb')
        elif store_type == DataStoreType.GRAPH:
            return self.adapters.get('neo4j')
        elif store_type == DataStoreType.TIMESERIES:
            return self.adapters.get('influxdb')
        elif store_type == DataStoreType.CACHE:
            return self.adapters.get('redis')
        else:
            raise ValueError(f"Unsupported store type: {store_type}")
 
# Usage example
config = {
    'postgresql': {
        'host': 'postgres.sindhan.ai',
        'port': 5432,
        'database': 'sindhan_platform',
        'username': 'app_user',
        'password': 'secure_password',
        'pool_size': 20
    },
    'mongodb': {
        'uri': 'mongodb://mongo.sindhan.ai:27017/sindhan_platform',
        'options': {'maxPoolSize': 50}
    },
    'redis': {
        'host': 'redis.sindhan.ai',
        'port': 6379,
        'db': 0
    }
}
 
# Initialize client
data_client = DataPersistenceClient(config)
 
# Create user in relational database
user_data = {
    'username': 'john_doe',
    'email': 'john@example.com',
    'created_at': '2024-01-15T10:30:00Z'
}
 
user_result = await data_client.create(
    DataStoreType.RELATIONAL,
    'users',
    user_data
)
 
# Store user preferences in document database
preferences = {
    'user_id': user_result['id'],
    'theme': 'dark',
    'notifications': {
        'email': True,
        'push': False
    },
    'language': 'en'
}
 
await data_client.create(
    DataStoreType.DOCUMENT,
    'user_preferences',
    preferences
)
 
# Query users with caching
users = await data_client.read(
    DataStoreType.RELATIONAL,
    'users',
    filters={'active': True},
    options={'limit': 10, 'offset': 0, 'sort': 'created_at DESC'}
)

Transaction Management Across Multiple Stores

import uuid
import logging
 
logger = logging.getLogger(__name__)
 
class DistributedTransactionManager:
    def __init__(self):
        self.active_transactions = {}
        self.saga_coordinator = SagaCoordinator()
        # Reuse the DataPersistenceClient initialized in the example above
        self.data_client = data_client
    
    async def execute_saga(self, saga_definition: Dict[str, Any]) -> bool:
        """Execute a saga pattern for distributed transactions"""
        saga_id = str(uuid.uuid4())
        completed_steps = []  # tracked outside the try so compensation can see it
        
        try:
            # Execute steps in sequence
            for step in saga_definition['steps']:
                step_result = await self._execute_saga_step(step)
                completed_steps.append({
                    'step': step,
                    'result': step_result,
                    'compensation': step.get('compensation')
                })
            
            return True
            
        except Exception as e:
            # Execute compensation in reverse order
            await self._execute_compensation(completed_steps)
            raise SagaExecutionException(f"Saga execution failed: {str(e)}")
    
    async def _execute_saga_step(self, step: Dict[str, Any]) -> Any:
        """Execute individual saga step"""
        store_type = DataStoreType(step['store_type'])
        operation = step['operation']
        
        if operation == 'create':
            return await self.data_client.create(
                store_type,
                step['collection'],
                step['data']
            )
        elif operation == 'update':
            return await self.data_client.update(
                store_type,
                step['collection'],
                step['filters'],
                step['data']
            )
        elif operation == 'delete':
            return await self.data_client.delete(
                store_type,
                step['collection'],
                step['filters']
            )
        else:
            raise ValueError(f"Unsupported saga operation: {operation}")
    
    async def _execute_compensation(self, completed_steps: List[Dict[str, Any]]):
        """Execute compensation logic for failed saga"""
        for step_info in reversed(completed_steps):
            compensation = step_info.get('compensation')
            if compensation:
                try:
                    await self._execute_saga_step(compensation)
                except Exception as e:
                    # Log compensation failure but continue
                    logger.error(f"Compensation failed: {e}")
 
# Example: User registration saga
user_registration_saga = {
    'name': 'user_registration',
    'description': 'Register new user across multiple services',
    'steps': [
        {
            'name': 'create_user_account',
            'store_type': 'relational',
            'operation': 'create',
            'collection': 'users',
            'data': {
                'username': 'john_doe',
                'email': 'john@example.com',
                'status': 'pending'
            },
            'compensation': {
                'name': 'delete_user_account',
                'store_type': 'relational',
                'operation': 'delete',
                'collection': 'users',
                'filters': {'username': 'john_doe'}
            }
        },
        {
            'name': 'create_user_profile',
            'store_type': 'document',
            'operation': 'create',
            'collection': 'user_profiles',
            'data': {
                'username': 'john_doe',
                'profile': {'first_name': 'John', 'last_name': 'Doe'}
            },
            'compensation': {
                'name': 'delete_user_profile',
                'store_type': 'document',
                'operation': 'delete',
                'collection': 'user_profiles',
                'filters': {'username': 'john_doe'}
            }
        },
        {
            'name': 'send_welcome_email',
            'store_type': 'relational',
            'operation': 'create',
            'collection': 'email_queue',
            'data': {
                'to': 'john@example.com',
                'template': 'welcome',
                'status': 'pending'
            }
        },
        {
            'name': 'activate_user_account',
            'store_type': 'relational',
            'operation': 'update',
            'collection': 'users',
            'filters': {'username': 'john_doe'},
            'data': {'status': 'active'}
        }
    ]
}
 
# Execute saga
saga_manager = DistributedTransactionManager()
success = await saga_manager.execute_saga(user_registration_saga)

GraphQL Data Access Interface

import graphene
from graphene import ObjectType, String, Int, List, Field, Mutation, Schema
from typing import Dict, Any
 
class UserType(graphene.ObjectType):
    id = graphene.ID()
    username = graphene.String()
    email = graphene.String()
    created_at = graphene.DateTime()
    preferences = graphene.Field(lambda: UserPreferencesType)
    
    async def resolve_preferences(self, info):
        # Fetch from document database; unwrap the QueryResult
        result = await data_client.read(
            DataStoreType.DOCUMENT,
            'user_preferences',
            filters={'user_id': self.id}
        )
        return result.data[0] if result.data else None
 
class UserPreferencesType(graphene.ObjectType):
    user_id = graphene.ID()
    theme = graphene.String()
    language = graphene.String()
    notifications = graphene.JSONString()
 
class Query(ObjectType):
    users = List(UserType, limit=Int(default_value=10), offset=Int(default_value=0))
    user = Field(UserType, id=graphene.ID(required=True))
    
    async def resolve_users(self, info, limit, offset):
        result = await data_client.read(
            DataStoreType.RELATIONAL,
            'users',
            options={'limit': limit, 'offset': offset}
        )
        return result.data
    
    async def resolve_user(self, info, id):
        result = await data_client.read(
            DataStoreType.RELATIONAL,
            'users',
            filters={'id': id}
        )
        return result.data[0] if result.data else None
 
class CreateUserMutation(Mutation):
    class Arguments:
        username = String(required=True)
        email = String(required=True)
        preferences = graphene.JSONString()
    
    user = Field(UserType)
    
    async def mutate(self, info, username, email, preferences=None):
        # Begin distributed transaction
        transaction_id = await data_client.begin_transaction([
            DataStoreType.RELATIONAL,
            DataStoreType.DOCUMENT
        ])
        
        try:
            # Create user
            user_result = await data_client.create(
                DataStoreType.RELATIONAL,
                'users',
                {'username': username, 'email': email},
                transaction_id=transaction_id
            )
            
            # Create preferences if provided
            if preferences:
                await data_client.create(
                    DataStoreType.DOCUMENT,
                    'user_preferences',
                    {'user_id': user_result['id'], **preferences},
                    transaction_id=transaction_id
                )
            
            # Commit transaction
            await data_client.commit_transaction(transaction_id)
            
            return CreateUserMutation(user=user_result)
            
        except Exception as e:
            await data_client.rollback_transaction(transaction_id)
            raise Exception(f"User creation failed: {str(e)}")
 
class Mutation(ObjectType):
    create_user = CreateUserMutation.Field()
 
# Create GraphQL schema
schema = Schema(query=Query, mutation=Mutation)
 
# Usage with GraphQL query
query = """
    query GetUsers($limit: Int, $offset: Int) {
        users(limit: $limit, offset: $offset) {
            id
            username
            email
            preferences {
                theme
                language
                notifications
            }
        }
    }
"""
 
variables = {"limit": 10, "offset": 0}
result = await schema.execute_async(query, variable_values=variables)

Implementation Roadmap

Phase 1: Foundation (Completed)

Status: ✅ Released v1.0.0

  • PostgreSQL and MySQL relational database support
  • MongoDB document database integration
  • Basic CRUD operations and query interface
  • Connection pooling and resource management
  • Database migrations and schema management
  • Basic backup and restore functionality

Phase 2: Advanced Features (Completed)

Status: ✅ Released v2.0.0

  • Redis caching layer integration
  • Transaction management and ACID compliance
  • Advanced query optimization and indexing
  • Data validation and constraint enforcement
  • Performance monitoring and tuning
  • Automated backup scheduling and retention

Phase 3: Multi-Model and Analytics (Completed)

Status: ✅ Released v3.0.0

  • Neo4j graph database integration
  • InfluxDB time-series database support
  • GraphQL API for flexible data access
  • Distributed transaction management
  • Advanced caching strategies
  • Data lifecycle management and archival

Phase 4: Intelligence and Automation (Planned)

Status: 📋 Target v4.0.0 - Q3 2024

  • AI-powered query optimization
  • Automated data partitioning and sharding
  • Predictive caching and pre-loading
  • Advanced data governance and compliance
  • Real-time data synchronization
  • Blockchain integration for immutable records

Benefits and Value

Performance Benefits

  • Multi-Level Caching: Significant reduction in database load and response times
  • Query Optimization: AI-powered query planning and execution optimization
  • Connection Pooling: Efficient resource utilization and improved throughput
  • Read Replicas: Horizontal scaling for read-heavy workloads

Developer Benefits

  • Unified API: Single interface for all data operations across different stores
  • Type Safety: Strong typing and schema validation
  • Transaction Support: ACID compliance across distributed systems
  • Flexible Querying: GraphQL and SQL support for complex data retrieval

Business Benefits

  • Data Consistency: Reliable data integrity across all platform operations
  • Scalability: Support for growing data volumes and user base
  • Cost Optimization: Efficient resource utilization and automated scaling
  • Compliance: Built-in data governance and audit capabilities

Related Services

Consuming Services

  • All Platform Services: Every service uses data persistence for storage needs
  • AI Agents: Training data, model parameters, and inference results storage
  • Analytics Services: Business data analysis and reporting
  • User Management: User profiles, preferences, and authentication data

The Data Persistence Service provides the reliable, scalable, and flexible data foundation that enables all other platform services to store, retrieve, and manage data efficiently and securely.