
Search & Indexing Service

The Search & Indexing Service provides comprehensive full-text search, data indexing, and information retrieval capabilities across all Sindhan AI platform components. It enables fast, relevant search across structured and unstructured data with advanced features like semantic search, faceted navigation, and real-time indexing.

Overview and Purpose

Search & Indexing is a critical infrastructure service that makes all platform data searchable and discoverable. It provides enterprise-grade search capabilities including full-text search, faceted navigation, semantic search, and advanced analytics that enable users to quickly find relevant information across the entire platform.

Key Benefits

  • Universal Search: Search across all platform data sources and content types
  • Real-Time Indexing: Immediate search availability for new and updated content
  • Semantic Search: AI-powered understanding of search intent and context
  • High Performance: Sub-second search response times with scalable architecture
  • Advanced Analytics: Search analytics and content insights
  • Multi-Language Support: Search capabilities across multiple languages and locales

Implementation Status

Phase     Status           Description
Phase 1   ✅ Implemented    Elasticsearch cluster, basic full-text search, document indexing
Phase 2   🚧 In Progress    Semantic search, advanced analytics, search suggestions, faceted navigation
Phase 3   📋 Planned        AI-powered search optimization, federated search, knowledge graphs

Current Version: v1.7.0
Next Release: v2.0.0 (Q2 2024)

Core Capabilities

1. Full-Text Search and Indexing

  • Real-time document indexing and search
  • Advanced query parsing and analysis
  • Relevance scoring and ranking algorithms
  • Boolean and phrase search capabilities
  • Wildcard and fuzzy search support
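The Boolean, phrase, and fuzzy capabilities above map directly onto Elasticsearch's query DSL. A minimal sketch of how such queries are assembled (`build_query` is a hypothetical helper for illustration, not part of the service API):

```python
def build_query(text: str, phrase: bool = False, fuzzy: bool = True) -> dict:
    """Build a query body combining phrase and fuzzy matching on 'content'."""
    if phrase:
        # Exact-phrase matching: terms must appear adjacent and in order
        return {"query": {"match_phrase": {"content": text}}}
    clause = {"match": {"content": {"query": text}}}
    if fuzzy:
        # AUTO fuzziness tolerates typos proportional to term length
        clause["match"]["content"]["fuzziness"] = "AUTO"
    return {"query": clause}

q = build_query("machine lerning")               # typo tolerated via fuzziness
p = build_query("machine learning", phrase=True) # exact phrase only
```

The returned dict is what gets posted as the `query` section of a search request body.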

2. Semantic Search and AI Integration

  • Vector-based semantic search using embeddings
  • Natural language query processing
  • Intent recognition and query expansion
  • Contextual search recommendations
  • Knowledge graph integration
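At its core, vector-based semantic search ranks documents by similarity between embeddings. A toy sketch using hand-made three-dimensional vectors (a real deployment would obtain high-dimensional vectors from an embedding model; the document names and values here are made up):

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity: 1.0 for parallel vectors, 0.0 for orthogonal."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return dot / norm if norm else 0.0

# Hand-made "embeddings" standing in for model output
docs = {
    "doc_ml":  [0.9, 0.1, 0.0],   # "machine learning overview"
    "doc_sql": [0.1, 0.9, 0.0],   # "SQL tuning guide"
    "doc_nlp": [0.5, 0.4, 0.3],   # "NLP with transformers"
}
query_vec = [0.8, 0.15, 0.05]     # embedding of "deep learning intro"

# Rank documents by similarity to the query vector, best first
ranked = sorted(docs, key=lambda d: cosine(query_vec, docs[d]), reverse=True)
```

The ML document ranks first because its vector points in nearly the same direction as the query's, even though no keywords were matched.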

3. Faceted Search and Navigation

  • Dynamic facet generation based on content
  • Multi-dimensional filtering and navigation
  • Aggregation and statistical analysis
  • Drill-down and breadcrumb navigation
  • Custom facet configuration
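Dynamic facets correspond to Elasticsearch terms aggregations, and selecting a facet value becomes a filter that narrows every other facet's counts. A sketch of building such a request body (`build_facet_request` and the field names are illustrative):

```python
from typing import Optional

def build_facet_request(facets: list, selected: Optional[dict] = None) -> dict:
    """Build a request body returning facet counts for the given fields."""
    body = {
        "size": 0,  # facet counts only, no document hits
        "aggs": {f: {"terms": {"field": f, "size": 10}} for f in facets},
    }
    if selected:
        # Drill-down: each selected facet value becomes a term filter
        body["query"] = {
            "bool": {"filter": [{"term": {f: v}} for f, v in selected.items()]}
        }
    return body

req = build_facet_request(["type", "author"], selected={"type": "article"})
```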

4. Real-Time Data Synchronization

  • Event-driven indexing from data sources
  • Change detection and incremental updates
  • Bulk indexing for large datasets
  • Data pipeline integration
  • Index optimization and maintenance
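Event-driven incremental indexing reduces to applying a stream of change events with upsert/delete semantics. An in-memory sketch of that logic (a real pipeline would issue the same operations against Elasticsearch's bulk API; the event schema here is an assumption):

```python
index: dict = {}  # toy stand-in for the search index

def apply_event(event: dict) -> None:
    """Apply one change event: upsert on create/update, drop on delete."""
    doc_id = event["id"]
    if event["op"] == "delete":
        index.pop(doc_id, None)
    else:  # "create" and "update" both upsert the latest version
        index[doc_id] = event["doc"]

events = [
    {"op": "create", "id": "d1", "doc": {"title": "v1"}},
    {"op": "update", "id": "d1", "doc": {"title": "v2"}},
    {"op": "create", "id": "d2", "doc": {"title": "other"}},
    {"op": "delete", "id": "d2"},
]
for e in events:
    apply_event(e)
# the index now holds only d1, at its latest version
```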

5. Search Analytics and Insights

  • Search query analysis and trending
  • Click-through rate tracking
  • Content performance analytics
  • User behavior analysis
  • Search optimization recommendations
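Click-through rate is one of the simplest analytics signals listed above: clicks over impressions per query. A sketch assuming an illustrative log schema (not the service's actual event format):

```python
from collections import defaultdict

def ctr_by_query(log: list) -> dict:
    """Return clicks / impressions for each query string in the log."""
    shown = defaultdict(int)
    clicked = defaultdict(int)
    for entry in log:
        shown[entry["query"]] += 1
        if entry["clicked"]:
            clicked[entry["query"]] += 1
    return {q: clicked[q] / shown[q] for q in shown}

log = [
    {"query": "ai roadmap", "clicked": True},
    {"query": "ai roadmap", "clicked": False},
    {"query": "pricing", "clicked": True},
]
rates = ctr_by_query(log)
```

Low-CTR queries are natural candidates for the service's search optimization recommendations.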

6. Multi-Source Federation

  • Federated search across multiple data sources
  • Cross-platform content aggregation
  • Unified search experience
  • Source-specific ranking and filtering
  • Distributed search coordination
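Because relevance scores from different backends are not directly comparable, federated merging typically normalizes scores per source before interleaving. A min-max normalization sketch (source names, scores, and the merge policy are illustrative assumptions):

```python
def normalize(results: list) -> list:
    """Min-max normalize (doc, score) pairs so each source spans [0, 1]."""
    scores = [s for _, s in results]
    lo, hi = min(scores), max(scores)
    span = (hi - lo) or 1.0  # avoid division by zero for uniform scores
    return [(doc, (s - lo) / span) for doc, s in results]

def federated_merge(sources: dict) -> list:
    """Merge per-source ranked lists into one list ordered by normalized score."""
    merged = []
    for name, results in sources.items():
        merged.extend((doc, score, name) for doc, score in normalize(results))
    return sorted(merged, key=lambda t: t[1], reverse=True)

ranked = federated_merge({
    "wiki": [("w1", 12.0), ("w2", 3.0)],   # BM25-style scores
    "docs": [("d1", 0.91), ("d2", 0.40)],  # cosine similarities
})
# each source's best hit rises to the top despite incompatible score scales
```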

Architecture

Integration Patterns

Advanced Search API Implementation

The Python example below sketches the search engine's public API. Several private helpers (for example _build_sort_config, _merge_search_results, _enrich_document's classifiers, and the EmbeddingService and vector-client wrappers) are assumed to exist elsewhere and are elided for brevity.

import asyncio
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from elasticsearch import AsyncElasticsearch
 
class SearchType(Enum):
    FULL_TEXT = "full_text"
    SEMANTIC = "semantic"
    HYBRID = "hybrid"
    FACETED = "faceted"
    AUTOCOMPLETE = "autocomplete"
 
class SortOrder(Enum):
    RELEVANCE = "relevance"
    DATE_DESC = "date_desc"
    DATE_ASC = "date_asc"
    ALPHABETICAL = "alphabetical"
    POPULARITY = "popularity"
 
@dataclass
class SearchRequest:
    query: str
    search_type: SearchType = SearchType.FULL_TEXT
    filters: Dict[str, Any] = field(default_factory=dict)
    facets: List[str] = field(default_factory=list)
    sort_order: SortOrder = SortOrder.RELEVANCE
    page: int = 1
    page_size: int = 20
    highlight: bool = True
    include_suggestions: bool = True
    boost_fields: Dict[str, float] = field(default_factory=dict)
    user_context: Dict[str, Any] = field(default_factory=dict)
 
@dataclass
class SearchResult:
    total_hits: int
    documents: List[Dict[str, Any]]
    facets: Dict[str, Any] = field(default_factory=dict)
    suggestions: List[str] = field(default_factory=list)
    query_time_ms: float = 0
    search_id: str = ""
    has_more: bool = False
 
class SearchEngine:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.es_client = AsyncElasticsearch(
            hosts=config['elasticsearch']['hosts'],
            http_auth=(config['elasticsearch']['username'], config['elasticsearch']['password']),
            use_ssl=config['elasticsearch'].get('use_ssl', True),
            verify_certs=config['elasticsearch'].get('verify_certs', True)
        )
        self.vector_client = self._initialize_vector_client(config)
        self.embedding_service = EmbeddingService(config.get('embedding_service'))
        
        # Search configuration
        self.default_indices = config.get('default_indices', ['documents', 'content'])
        self.max_page_size = config.get('max_page_size', 100)
        self.search_timeout = config.get('search_timeout', '30s')
    
    async def search(self, request: SearchRequest) -> SearchResult:
        """Execute search based on search type"""
        start_time = datetime.utcnow()
        
        try:
            if request.search_type == SearchType.SEMANTIC:
                result = await self._semantic_search(request)
            elif request.search_type == SearchType.HYBRID:
                result = await self._hybrid_search(request)
            elif request.search_type == SearchType.FACETED:
                result = await self._faceted_search(request)
            elif request.search_type == SearchType.AUTOCOMPLETE:
                result = await self._autocomplete_search(request)
            else:
                result = await self._full_text_search(request)
            
            # Add query performance metrics
            query_time = (datetime.utcnow() - start_time).total_seconds() * 1000
            result.query_time_ms = query_time
            result.search_id = f"search_{int(start_time.timestamp())}"
            
            # Track search analytics
            await self._track_search_analytics(request, result)
            
            return result
            
        except Exception as e:
            print(f"Search error: {e}")
            return SearchResult(total_hits=0, documents=[])
    
    async def _full_text_search(self, request: SearchRequest) -> SearchResult:
        """Execute full-text search using Elasticsearch"""
        
        # Build Elasticsearch query
        es_query = {
            "query": self._build_full_text_query(request),
            "highlight": self._build_highlight_config(request) if request.highlight else {},
            "sort": self._build_sort_config(request.sort_order),
            "from": (request.page - 1) * request.page_size,
            "size": min(request.page_size, self.max_page_size),
            "timeout": self.search_timeout
        }
        
        # Add aggregations for facets
        if request.facets:
            es_query["aggs"] = self._build_facet_aggregations(request.facets)
        
        # Execute search
        response = await self.es_client.search(
            index=",".join(self.default_indices),
            body=es_query
        )
        
        # Process results
        documents = []
        for hit in response['hits']['hits']:
            doc = hit['_source']
            doc['_score'] = hit['_score']
            doc['_id'] = hit['_id']
            
            # Add highlights
            if 'highlight' in hit:
                doc['_highlights'] = hit['highlight']
            
            documents.append(doc)
        
        # Process facets
        facets = {}
        if 'aggregations' in response:
            facets = self._process_facet_aggregations(response['aggregations'])
        
        # Generate suggestions
        suggestions = []
        if request.include_suggestions:
            suggestions = await self._generate_search_suggestions(request.query)
        
        return SearchResult(
            total_hits=response['hits']['total']['value'],
            documents=documents,
            facets=facets,
            suggestions=suggestions,
            has_more=(request.page * request.page_size) < response['hits']['total']['value']
        )
    
    async def _semantic_search(self, request: SearchRequest) -> SearchResult:
        """Execute semantic search using vector embeddings"""
        
        # Generate query embedding
        query_embedding = await self.embedding_service.generate_embedding(request.query)
        
        # Search for similar vectors
        vector_results = await self.vector_client.similarity_search(
            query_vector=query_embedding,
            limit=request.page_size,
            threshold=0.7
        )
        
        # Get document details from Elasticsearch
        doc_ids = [result['id'] for result in vector_results]
        
        if not doc_ids:
            return SearchResult(total_hits=0, documents=[])
        
        es_query = {
            "query": {
                "terms": {
                    "_id": doc_ids
                }
            },
            "size": len(doc_ids)
        }
        
        response = await self.es_client.search(
            index=",".join(self.default_indices),
            body=es_query
        )
        
        # Merge vector scores with document data
        documents = []
        score_map = {result['id']: result['score'] for result in vector_results}
        
        for hit in response['hits']['hits']:
            doc = hit['_source']
            doc['_id'] = hit['_id']
            doc['_score'] = score_map.get(hit['_id'], 0)
            doc['_semantic_score'] = score_map.get(hit['_id'], 0)
            documents.append(doc)
        
        # Sort by semantic score
        documents.sort(key=lambda x: x['_semantic_score'], reverse=True)
        
        return SearchResult(
            total_hits=len(documents),
            documents=documents,
            has_more=False  # Vector search typically returns all relevant results
        )
    
    async def _hybrid_search(self, request: SearchRequest) -> SearchResult:
        """Execute hybrid search combining full-text and semantic search"""
        
        # Execute both search types
        full_text_request = SearchRequest(**request.__dict__)
        full_text_request.search_type = SearchType.FULL_TEXT
        full_text_request.page_size = request.page_size * 2  # Get more for merging
        
        semantic_request = SearchRequest(**request.__dict__)
        semantic_request.search_type = SearchType.SEMANTIC
        semantic_request.page_size = request.page_size * 2
        
        full_text_result, semantic_result = await asyncio.gather(
            self._full_text_search(full_text_request),
            self._semantic_search(semantic_request)
        )
        
        # Merge and rank results
        merged_documents = await self._merge_search_results(
            full_text_result.documents,
            semantic_result.documents,
            request
        )
        
        # Apply pagination
        start_idx = (request.page - 1) * request.page_size
        end_idx = start_idx + request.page_size
        paginated_documents = merged_documents[start_idx:end_idx]
        
        return SearchResult(
            total_hits=len(merged_documents),
            documents=paginated_documents,
            facets=full_text_result.facets,  # Use facets from full-text search
            suggestions=full_text_result.suggestions,
            has_more=end_idx < len(merged_documents)
        )
    
    def _build_full_text_query(self, request: SearchRequest) -> Dict[str, Any]:
        """Build Elasticsearch query for full-text search"""
        
        # Base multi-match query
        query = {
            "bool": {
                "must": [
                    {
                        "multi_match": {
                            "query": request.query,
                            "fields": self._get_search_fields(request),
                            "type": "best_fields",
                            "fuzziness": "AUTO",
                            "prefix_length": 2
                        }
                    }
                ],
                "filter": []
            }
        }
        
        # Add filters
        for field, value in request.filters.items():
            if isinstance(value, list):
                query["bool"]["filter"].append({
                    "terms": {field: value}
                })
            elif isinstance(value, dict):
                # Range filter
                query["bool"]["filter"].append({
                    "range": {field: value}
                })
            else:
                query["bool"]["filter"].append({
                    "term": {field: value}
                })
        
        return query
    
    def _get_search_fields(self, request: SearchRequest) -> List[str]:
        """Get search fields with boost values"""
        default_fields = [
            "title^3",
            "content^1",
            "tags^2",
            "description^1.5",
            "keywords^2"
        ]
        
        # Apply custom boosts
        if request.boost_fields:
            boosted_fields = []
            for field, boost in request.boost_fields.items():
                boosted_fields.append(f"{field}^{boost}")
            return boosted_fields
        
        return default_fields
    
    async def _generate_search_suggestions(self, query: str) -> List[str]:
        """Generate search suggestions based on query"""
        
        # Use completion suggester
        suggest_query = {
            "query_suggestions": {
                "text": query,
                "completion": {
                    "field": "suggest",
                    "size": 5,
                    "skip_duplicates": True
                }
            }
        }
        
        response = await self.es_client.search(
            index="search_suggestions",
            body={"suggest": suggest_query}
        )
        
        suggestions = []
        if 'suggest' in response:
            for suggestion in response['suggest']['query_suggestions'][0]['options']:
                suggestions.append(suggestion['text'])
        
        return suggestions
 
# Document indexing service
class DocumentIndexer:
    def __init__(self, search_engine: SearchEngine):
        self.search_engine = search_engine
        self.es_client = search_engine.es_client
        self.embedding_service = search_engine.embedding_service
        
        # Indexing configuration
        self.batch_size = 100
        self.index_mappings = self._get_index_mappings()
    
    async def index_document(self, index: str, document: Dict[str, Any],
                           doc_id: Optional[str] = None) -> bool:
        """Index a single document"""
        try:
            # Enrich document with metadata
            enriched_doc = await self._enrich_document(document)
            
            # Generate embeddings for semantic search
            if 'content' in enriched_doc:
                embedding = await self.embedding_service.generate_embedding(
                    enriched_doc['content']
                )
                enriched_doc['content_embedding'] = embedding.tolist()
            
            # Index document
            response = await self.es_client.index(
                index=index,
                id=doc_id,
                body=enriched_doc
            )
            
            # Index in vector database for semantic search
            if 'content_embedding' in enriched_doc:
                await self.search_engine.vector_client.index_vector(
                    doc_id or response['_id'],
                    enriched_doc['content_embedding'],
                    metadata={
                        'title': enriched_doc.get('title', ''),
                        'type': enriched_doc.get('type', 'document'),
                        'index': index
                    }
                )
            
            return True
            
        except Exception as e:
            print(f"Indexing error: {e}")
            return False
    
    async def bulk_index_documents(self, index: str, 
                                  documents: List[Dict[str, Any]]) -> Dict[str, int]:
        """Bulk index multiple documents"""
        success_count = 0
        error_count = 0
        
        # Process documents in batches
        for i in range(0, len(documents), self.batch_size):
            batch = documents[i:i + self.batch_size]
            
            # Prepare bulk request
            bulk_body = []
            
            for j, doc in enumerate(batch):
                # Enrich document
                enriched_doc = await self._enrich_document(doc)
                
                # Generate embeddings
                if 'content' in enriched_doc:
                    embedding = await self.embedding_service.generate_embedding(
                        enriched_doc['content']
                    )
                    enriched_doc['content_embedding'] = embedding.tolist()
                
                # Add to bulk request; fall back to a unique positional id
                # (i + j) so ids never collide across batches
                doc_id = doc.get('id') or f"{index}_{i + j}"
                bulk_body.extend([
                    {"index": {"_index": index, "_id": doc_id}},
                    enriched_doc
                ])
            
            try:
                # Execute bulk request
                response = await self.es_client.bulk(body=bulk_body)
                
                # Process response
                for item in response['items']:
                    if 'index' in item:
                        if item['index'].get('status') in [200, 201]:
                            success_count += 1
                        else:
                            error_count += 1
                            
            except Exception as e:
                print(f"Bulk indexing error: {e}")
                error_count += len(batch)
        
        return {
            'success': success_count,
            'errors': error_count
        }
    
    async def _enrich_document(self, document: Dict[str, Any]) -> Dict[str, Any]:
        """Enrich document with additional metadata"""
        enriched = document.copy()
        
        # Add timestamp
        enriched['indexed_at'] = datetime.utcnow().isoformat()
        
        # Add suggestions field for autocomplete
        if 'title' in enriched:
            enriched['suggest'] = {
                "input": [enriched['title']],
                "weight": 10
            }
        
        # Extract and classify content
        if 'content' in enriched:
            # Language detection
            language = await self._detect_language(enriched['content'])
            enriched['language'] = language
            
            # Content classification
            categories = await self._classify_content(enriched['content'])
            enriched['categories'] = categories
            
            # Entity extraction
            entities = await self._extract_entities(enriched['content'])
            enriched['entities'] = entities
        
        return enriched
 
# Usage example
search_config = {
    'elasticsearch': {
        'hosts': ['elasticsearch1.sindhan.ai:9200', 'elasticsearch2.sindhan.ai:9200'],
        'username': 'search_user',
        'password': 'secure_password',
        'use_ssl': True
    },
    'vector_database': {
        'type': 'pinecone',
        'api_key': 'your-pinecone-api-key',
        'environment': 'production'
    },
    'embedding_service': {
        'model': 'sentence-transformers/all-MiniLM-L6-v2',
        'batch_size': 32
    },
    'default_indices': ['documents', 'content', 'knowledge_base'],
    'max_page_size': 50
}
 
# Initialize the search engine and run searches inside an async context
async def main():
    search_engine = SearchEngine(search_config)

    # Execute different types of searches
    search_request = SearchRequest(
        query="artificial intelligence machine learning",
        search_type=SearchType.HYBRID,
        filters={
            'type': ['article', 'research_paper'],
            'published_date': {
                'gte': '2023-01-01',
                'lte': '2024-01-01'
            }
        },
        facets=['type', 'author', 'category'],
        sort_order=SortOrder.RELEVANCE,
        page=1,
        page_size=20,
        boost_fields={
            'title': 3.0,
            'abstract': 2.0,
            'content': 1.0
        }
    )

    result = await search_engine.search(search_request)
    print(f"Found {result.total_hits} documents")
    for doc in result.documents:
        print(f"- {doc.get('title', 'Untitled')} (Score: {doc['_score']:.2f})")

asyncio.run(main())

Implementation Roadmap

Phase 1: Foundation (Completed)

Status: ✅ Released v1.0.0

  • Elasticsearch cluster setup and configuration
  • Basic full-text search capabilities
  • Document indexing and real-time updates
  • Search API with filtering and pagination
  • Basic analytics and monitoring
  • Multi-index search support

Phase 2: Advanced Features (In Progress)

Status: 🚧 Target v2.0.0 - Q2 2024

  • Semantic search with vector embeddings
  • Faceted search and navigation
  • Advanced autocomplete and suggestions
  • Search analytics and optimization
  • Multi-language search support
  • Content classification and entity extraction

Phase 3: AI-Powered Search (Planned)

Status: 📋 Target v2.5.0 - Q3 2024

  • Neural search and ranking optimization
  • Knowledge graph integration
  • Federated search across multiple sources
  • Conversational search interfaces
  • Personalized search results
  • Advanced search analytics and insights

Benefits and Value

User Experience Benefits

  • Fast Discovery: Sub-second search response times across all content
  • Relevant Results: AI-powered ranking delivers the most relevant content first
  • Intuitive Navigation: Faceted search and filters make content exploration easy
  • Smart Suggestions: Autocomplete and query suggestions improve search efficiency

Content Management Benefits

  • Universal Searchability: All platform content is automatically searchable
  • Real-Time Updates: Content changes are immediately reflected in search results
  • Rich Metadata: Automated content enrichment with categories, entities, and classifications
  • Performance Insights: Analytics show content performance and user engagement

Business Benefits

  • Improved Productivity: Users find information faster, reducing time to insights
  • Enhanced User Engagement: Better search experience increases platform usage
  • Content ROI: Analytics show which content provides the most value
  • Competitive Advantage: Superior search capabilities differentiate the platform

Related Services

Consuming Services

  • All Platform Applications: Universal search across all applications
  • Knowledge Management: Enterprise knowledge base search
  • Content Management: Document and media search capabilities
  • Analytics Services: Search-driven business intelligence and insights

The Search & Indexing Service provides the discovery foundation that makes all platform content searchable, discoverable, and actionable for users across the entire Sindhan AI ecosystem.