Search & Indexing Service
The Search & Indexing Service provides comprehensive full-text search, data indexing, and information retrieval capabilities across all Sindhan AI platform components. It enables fast, relevant search across structured and unstructured data with advanced features like semantic search, faceted navigation, and real-time indexing.
Overview and Purpose
Search & Indexing is a critical infrastructure service that makes all platform data searchable and discoverable. Its enterprise-grade capabilities, including full-text search, faceted navigation, semantic search, and search analytics, enable users to quickly find relevant information across the entire platform.
Key Benefits
- Universal Search: Search across all platform data sources and content types
- Real-Time Indexing: Immediate search availability for new and updated content
- Semantic Search: AI-powered understanding of search intent and context
- High Performance: Sub-second search response times with scalable architecture
- Advanced Analytics: Search analytics and content insights
- Multi-Language Support: Search capabilities across multiple languages and locales
Implementation Status
| Phase | Status | Description |
|---|---|---|
| Phase 1 | ✅ Implemented | Elasticsearch cluster, basic full-text search, document indexing |
| Phase 2 | 🚧 In Progress | Semantic search, advanced analytics, search suggestions, faceted navigation |
| Phase 3 | 📋 Planned | AI-powered search optimization, federated search, knowledge graphs |
Current Version: v1.7.0 | Next Release: v2.0.0 (Q2 2024)
Core Capabilities
1. Full-Text Search and Indexing
- Real-time document indexing and search
- Advanced query parsing and analysis
- Relevance scoring and ranking algorithms
- Boolean and phrase search capabilities
- Wildcard and fuzzy search support
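The query features above typically map onto Elasticsearch's `bool`/`multi_match` DSL. A minimal sketch of how a phrase or fuzzy query body could be assembled (field names, boosts, and the `build_query` helper are illustrative, not the service's actual API):

```python
from typing import Any, Dict

def build_query(text: str, phrase: bool = False, fuzzy: bool = True) -> Dict[str, Any]:
    """Assemble a minimal Elasticsearch bool query for full-text search."""
    match: Dict[str, Any] = {
        "query": text,
        "fields": ["title^3", "content"],          # illustrative fields/boosts
        "type": "phrase" if phrase else "best_fields",
    }
    if fuzzy and not phrase:
        match["fuzziness"] = "AUTO"                # tolerate typos in term queries
    return {"bool": {"must": [{"multi_match": match}]}}

fuzzy_q = build_query("machine lerning")           # fuzzy full-text query
phrase_q = build_query("machine learning", phrase=True)  # exact phrase query
```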
2. Semantic Search and AI Integration
- Vector-based semantic search using embeddings
- Natural language query processing
- Intent recognition and query expansion
- Contextual search recommendations
- Knowledge graph integration
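At its core, vector-based semantic search ranks documents by the similarity of their embeddings to the query embedding. A dependency-free sketch of that ranking step (the `rank_by_similarity` helper and the 0.7 cutoff are illustrative; production systems use an approximate nearest-neighbour index rather than a linear scan):

```python
import math
from typing import List, Sequence, Tuple

def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity between two embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def rank_by_similarity(
    query_vec: Sequence[float],
    doc_vecs: List[Tuple[str, Sequence[float]]],
    threshold: float = 0.7,
) -> List[Tuple[str, float]]:
    """Score every document, keep those above the threshold, best first."""
    scored = [(doc_id, cosine(query_vec, vec)) for doc_id, vec in doc_vecs]
    return sorted((s for s in scored if s[1] >= threshold), key=lambda s: -s[1])

ranked = rank_by_similarity(
    [1.0, 0.0],
    [("a", [1.0, 0.0]), ("b", [0.0, 1.0]), ("c", [0.8, 0.6])],
)
```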
3. Faceted Search and Navigation
- Dynamic facet generation based on content
- Multi-dimensional filtering and navigation
- Aggregation and statistical analysis
- Drill-down and breadcrumb navigation
- Custom facet configuration
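Dynamic facet generation amounts to counting field values across the result set, which Elasticsearch does server-side with a `terms` aggregation. A small sketch of the equivalent computation (the `compute_facets` helper is illustrative):

```python
from collections import Counter
from typing import Any, Dict, List

def compute_facets(docs: List[Dict[str, Any]],
                   fields: List[str]) -> Dict[str, Dict[str, int]]:
    """Count facet values per field, mirroring a terms aggregation."""
    counters: Dict[str, Counter] = {f: Counter() for f in fields}
    for doc in docs:
        for f in fields:
            value = doc.get(f)
            if value is not None:
                counters[f][value] += 1
    return {f: dict(c) for f, c in counters.items()}

docs = [
    {"type": "article", "author": "kim"},
    {"type": "article"},
    {"type": "report", "author": "kim"},
]
facets = compute_facets(docs, ["type", "author"])
```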
4. Real-Time Data Synchronization
- Event-driven indexing from data sources
- Change detection and incremental updates
- Bulk indexing for large datasets
- Data pipeline integration
- Index optimization and maintenance
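Change detection for incremental updates is often as simple as comparing each record's last-modified timestamp against the indexer's last checkpoint. A minimal sketch, assuming ISO-8601 `updated_at` timestamps (the field name and `changed_since` helper are illustrative):

```python
from datetime import datetime
from typing import Any, Dict, List

def changed_since(docs: List[Dict[str, Any]], checkpoint: str) -> List[Dict[str, Any]]:
    """Select documents modified after the last indexing checkpoint."""
    cutoff = datetime.fromisoformat(checkpoint)
    return [d for d in docs if datetime.fromisoformat(d["updated_at"]) > cutoff]

docs = [
    {"id": 1, "updated_at": "2024-01-02T00:00:00"},
    {"id": 2, "updated_at": "2023-12-01T00:00:00"},
]
pending = changed_since(docs, "2024-01-01T00:00:00")  # only doc 1 needs re-indexing
```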
5. Search Analytics and Insights
- Search query analysis and trending
- Click-through rate tracking
- Content performance analytics
- User behavior analysis
- Search optimization recommendations
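Click-through rate tracking reduces to aggregating impression and click events per query. A sketch of that aggregation (the event tuple shape and `query_ctr` helper are illustrative):

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

def query_ctr(events: Iterable[Tuple[str, str]]) -> Dict[str, float]:
    """Compute click-through rate per query from (query, event_type) pairs."""
    stats = defaultdict(lambda: [0, 0])  # query -> [impressions, clicks]
    for query, kind in events:
        if kind == "impression":
            stats[query][0] += 1
        elif kind == "click":
            stats[query][1] += 1
    return {q: (clicks / imps if imps else 0.0)
            for q, (imps, clicks) in stats.items()}

events = [("ai", "impression"), ("ai", "impression"),
          ("ai", "click"), ("ml", "impression")]
ctr = query_ctr(events)  # "ai" was clicked on half of its impressions
```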
6. Multi-Source Federation
- Federated search across multiple data sources
- Cross-platform content aggregation
- Unified search experience
- Source-specific ranking and filtering
- Distributed search coordination
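One common way to unify results from several sources is to apply a per-source weight to each hit's score and re-rank the merged list. A sketch of that coordination step (the `federate` helper and its weighting scheme are illustrative; real federation also needs score normalization across heterogeneous sources):

```python
from typing import Any, Dict, List

def federate(results_by_source: Dict[str, List[Dict[str, Any]]],
             weights: Dict[str, float]) -> List[Dict[str, Any]]:
    """Merge per-source result lists into one ranking using source weights."""
    merged: List[Dict[str, Any]] = []
    for source, hits in results_by_source.items():
        weight = weights.get(source, 1.0)
        for hit in hits:
            merged.append({**hit, "source": source, "score": hit["score"] * weight})
    return sorted(merged, key=lambda h: -h["score"])

merged = federate(
    {"docs": [{"id": "d1", "score": 0.9}], "wiki": [{"id": "w1", "score": 0.8}]},
    {"docs": 1.0, "wiki": 2.0},
)
```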
Architecture
Integration Patterns
Advanced Search API Implementation
The example below is condensed from the service implementation. Helper methods (sort and highlight configuration, facet processing, result merging, analytics tracking, language detection, classification, entity extraction) and the `EmbeddingService` and vector-client classes are omitted for brevity.

```python
import asyncio
import logging
from dataclasses import dataclass, field, replace
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional

from elasticsearch import AsyncElasticsearch

logger = logging.getLogger(__name__)


class SearchType(Enum):
    FULL_TEXT = "full_text"
    SEMANTIC = "semantic"
    HYBRID = "hybrid"
    FACETED = "faceted"
    AUTOCOMPLETE = "autocomplete"


class SortOrder(Enum):
    RELEVANCE = "relevance"
    DATE_DESC = "date_desc"
    DATE_ASC = "date_asc"
    ALPHABETICAL = "alphabetical"
    POPULARITY = "popularity"


@dataclass
class SearchRequest:
    query: str
    search_type: SearchType = SearchType.FULL_TEXT
    filters: Dict[str, Any] = field(default_factory=dict)
    facets: List[str] = field(default_factory=list)
    sort_order: SortOrder = SortOrder.RELEVANCE
    page: int = 1
    page_size: int = 20
    highlight: bool = True
    include_suggestions: bool = True
    boost_fields: Dict[str, float] = field(default_factory=dict)
    user_context: Dict[str, Any] = field(default_factory=dict)


@dataclass
class SearchResult:
    total_hits: int
    documents: List[Dict[str, Any]]
    facets: Dict[str, Any] = field(default_factory=dict)
    suggestions: List[str] = field(default_factory=list)
    query_time_ms: float = 0.0
    search_id: str = ""
    has_more: bool = False


class SearchEngine:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.es_client = AsyncElasticsearch(
            hosts=config['elasticsearch']['hosts'],
            http_auth=(
                config['elasticsearch']['username'],
                config['elasticsearch']['password'],
            ),
            use_ssl=config['elasticsearch'].get('use_ssl', True),
            verify_certs=config['elasticsearch'].get('verify_certs', True),
        )
        # EmbeddingService and the vector client are platform components;
        # their construction is elided in this example.
        self.vector_client = self._initialize_vector_client(config)
        self.embedding_service = EmbeddingService(config.get('embedding_service'))

        # Search configuration
        self.default_indices = config.get('default_indices', ['documents', 'content'])
        self.max_page_size = config.get('max_page_size', 100)
        self.search_timeout = config.get('search_timeout', '30s')

    async def search(self, request: SearchRequest) -> SearchResult:
        """Dispatch to the appropriate search strategy."""
        start_time = datetime.utcnow()
        try:
            if request.search_type == SearchType.SEMANTIC:
                result = await self._semantic_search(request)
            elif request.search_type == SearchType.HYBRID:
                result = await self._hybrid_search(request)
            elif request.search_type == SearchType.FACETED:
                result = await self._faceted_search(request)
            elif request.search_type == SearchType.AUTOCOMPLETE:
                result = await self._autocomplete_search(request)
            else:
                result = await self._full_text_search(request)

            # Add query performance metrics
            result.query_time_ms = (
                (datetime.utcnow() - start_time).total_seconds() * 1000
            )
            result.search_id = f"search_{int(start_time.timestamp())}"

            # Track search analytics
            await self._track_search_analytics(request, result)
            return result
        except Exception:
            logger.exception("Search failed for query %r", request.query)
            return SearchResult(total_hits=0, documents=[])

    async def _full_text_search(self, request: SearchRequest) -> SearchResult:
        """Execute full-text search using Elasticsearch."""
        es_query: Dict[str, Any] = {
            "query": self._build_full_text_query(request),
            "sort": self._build_sort_config(request.sort_order),
            "from": (request.page - 1) * request.page_size,
            "size": min(request.page_size, self.max_page_size),
            "timeout": self.search_timeout,
        }
        if request.highlight:
            es_query["highlight"] = self._build_highlight_config(request)

        # Add aggregations for facets
        if request.facets:
            es_query["aggs"] = self._build_facet_aggregations(request.facets)

        response = await self.es_client.search(
            index=",".join(self.default_indices),
            body=es_query,
        )

        # Process hits, attaching scores and highlights
        documents = []
        for hit in response['hits']['hits']:
            doc = hit['_source']
            doc['_score'] = hit['_score']
            doc['_id'] = hit['_id']
            if 'highlight' in hit:
                doc['_highlights'] = hit['highlight']
            documents.append(doc)

        # Process facets
        facets = {}
        if 'aggregations' in response:
            facets = self._process_facet_aggregations(response['aggregations'])

        # Generate suggestions
        suggestions = []
        if request.include_suggestions:
            suggestions = await self._generate_search_suggestions(request.query)

        total = response['hits']['total']['value']
        return SearchResult(
            total_hits=total,
            documents=documents,
            facets=facets,
            suggestions=suggestions,
            has_more=(request.page * request.page_size) < total,
        )

    async def _semantic_search(self, request: SearchRequest) -> SearchResult:
        """Execute semantic search using vector embeddings."""
        # Generate the query embedding and retrieve nearest neighbours
        query_embedding = await self.embedding_service.generate_embedding(request.query)
        vector_results = await self.vector_client.similarity_search(
            query_vector=query_embedding,
            limit=request.page_size,
            threshold=0.7,
        )

        # Fetch the matching documents from Elasticsearch
        doc_ids = [result['id'] for result in vector_results]
        if not doc_ids:
            return SearchResult(total_hits=0, documents=[])

        response = await self.es_client.search(
            index=",".join(self.default_indices),
            body={"query": {"terms": {"_id": doc_ids}}, "size": len(doc_ids)},
        )

        # Merge vector scores with document data
        score_map = {result['id']: result['score'] for result in vector_results}
        documents = []
        for hit in response['hits']['hits']:
            doc = hit['_source']
            doc['_id'] = hit['_id']
            doc['_score'] = score_map.get(hit['_id'], 0)
            doc['_semantic_score'] = score_map.get(hit['_id'], 0)
            documents.append(doc)

        # Rank by semantic similarity
        documents.sort(key=lambda x: x['_semantic_score'], reverse=True)
        return SearchResult(
            total_hits=len(documents),
            documents=documents,
            has_more=False,  # vector search returns all hits above the threshold
        )

    async def _hybrid_search(self, request: SearchRequest) -> SearchResult:
        """Combine full-text and semantic search results."""
        # Run both strategies concurrently, over-fetching for the merge step
        full_text_request = replace(
            request, search_type=SearchType.FULL_TEXT, page_size=request.page_size * 2
        )
        semantic_request = replace(
            request, search_type=SearchType.SEMANTIC, page_size=request.page_size * 2
        )
        full_text_result, semantic_result = await asyncio.gather(
            self._full_text_search(full_text_request),
            self._semantic_search(semantic_request),
        )

        # Merge and rank results
        merged_documents = await self._merge_search_results(
            full_text_result.documents,
            semantic_result.documents,
            request,
        )

        # Apply pagination
        start_idx = (request.page - 1) * request.page_size
        end_idx = start_idx + request.page_size
        return SearchResult(
            total_hits=len(merged_documents),
            documents=merged_documents[start_idx:end_idx],
            facets=full_text_result.facets,  # facets come from the full-text pass
            suggestions=full_text_result.suggestions,
            has_more=end_idx < len(merged_documents),
        )

    def _build_full_text_query(self, request: SearchRequest) -> Dict[str, Any]:
        """Build the Elasticsearch bool query for full-text search."""
        query: Dict[str, Any] = {
            "bool": {
                "must": [
                    {
                        "multi_match": {
                            "query": request.query,
                            "fields": self._get_search_fields(request),
                            "type": "best_fields",
                            "fuzziness": "AUTO",
                            "prefix_length": 2,
                        }
                    }
                ],
                "filter": [],
            }
        }

        # Translate request filters into term/terms/range clauses
        for field_name, value in request.filters.items():
            if isinstance(value, list):
                query["bool"]["filter"].append({"terms": {field_name: value}})
            elif isinstance(value, dict):
                # Range filter, e.g. {"gte": ..., "lte": ...}
                query["bool"]["filter"].append({"range": {field_name: value}})
            else:
                query["bool"]["filter"].append({"term": {field_name: value}})
        return query

    def _get_search_fields(self, request: SearchRequest) -> List[str]:
        """Return search fields with boost values (field^boost)."""
        if request.boost_fields:
            return [f"{field_name}^{boost}"
                    for field_name, boost in request.boost_fields.items()]
        return [
            "title^3",
            "content^1",
            "tags^2",
            "description^1.5",
            "keywords^2",
        ]

    async def _generate_search_suggestions(self, query: str) -> List[str]:
        """Generate query suggestions via the completion suggester."""
        suggest_query = {
            "query_suggestions": {
                "text": query,
                "completion": {
                    "field": "suggest",
                    "size": 5,
                    "skip_duplicates": True,
                },
            }
        }
        response = await self.es_client.search(
            index="search_suggestions",
            body={"suggest": suggest_query},
        )
        suggestions = []
        if 'suggest' in response:
            for option in response['suggest']['query_suggestions'][0]['options']:
                suggestions.append(option['text'])
        return suggestions


# Document indexing service
class DocumentIndexer:
    def __init__(self, search_engine: SearchEngine):
        self.search_engine = search_engine
        self.es_client = search_engine.es_client
        self.embedding_service = search_engine.embedding_service

        # Indexing configuration
        self.batch_size = 100
        self.index_mappings = self._get_index_mappings()

    async def index_document(self, index: str, document: Dict[str, Any],
                             doc_id: Optional[str] = None) -> bool:
        """Index a single document."""
        try:
            # Enrich document with metadata
            enriched_doc = await self._enrich_document(document)

            # Generate embeddings for semantic search
            if 'content' in enriched_doc:
                embedding = await self.embedding_service.generate_embedding(
                    enriched_doc['content']
                )
                enriched_doc['content_embedding'] = embedding.tolist()

            # Index the document in Elasticsearch
            response = await self.es_client.index(
                index=index,
                id=doc_id,
                body=enriched_doc,
            )

            # Mirror the embedding into the vector database for semantic search
            if 'content_embedding' in enriched_doc:
                await self.search_engine.vector_client.index_vector(
                    doc_id or response['_id'],
                    enriched_doc['content_embedding'],
                    metadata={
                        'title': enriched_doc.get('title', ''),
                        'type': enriched_doc.get('type', 'document'),
                        'index': index,
                    },
                )
            return True
        except Exception:
            logger.exception("Failed to index document in %s", index)
            return False

    async def bulk_index_documents(self, index: str,
                                   documents: List[Dict[str, Any]]) -> Dict[str, int]:
        """Bulk index multiple documents in batches."""
        success_count = 0
        error_count = 0

        for i in range(0, len(documents), self.batch_size):
            batch = documents[i:i + self.batch_size]

            # Prepare the bulk request body
            bulk_body: List[Dict[str, Any]] = []
            for offset, doc in enumerate(batch):
                enriched_doc = await self._enrich_document(doc)
                if 'content' in enriched_doc:
                    embedding = await self.embedding_service.generate_embedding(
                        enriched_doc['content']
                    )
                    enriched_doc['content_embedding'] = embedding.tolist()

                doc_id = doc.get('id') or f"{index}_{i + offset}"
                bulk_body.extend([
                    {"index": {"_index": index, "_id": doc_id}},
                    enriched_doc,
                ])

            try:
                response = await self.es_client.bulk(body=bulk_body)
                for item in response['items']:
                    if 'index' in item:
                        if item['index'].get('status') in (200, 201):
                            success_count += 1
                        else:
                            error_count += 1
            except Exception:
                logger.exception("Bulk indexing failed for batch starting at %d", i)
                error_count += len(batch)

        return {'success': success_count, 'errors': error_count}

    async def _enrich_document(self, document: Dict[str, Any]) -> Dict[str, Any]:
        """Enrich a document with indexing metadata."""
        enriched = document.copy()
        enriched['indexed_at'] = datetime.utcnow().isoformat()

        # Suggestion field for autocomplete
        if 'title' in enriched:
            enriched['suggest'] = {"input": [enriched['title']], "weight": 10}

        # Language detection, content classification, and entity extraction
        if 'content' in enriched:
            enriched['language'] = await self._detect_language(enriched['content'])
            enriched['categories'] = await self._classify_content(enriched['content'])
            enriched['entities'] = await self._extract_entities(enriched['content'])
        return enriched


# Usage example
search_config = {
    'elasticsearch': {
        'hosts': ['elasticsearch1.sindhan.ai:9200', 'elasticsearch2.sindhan.ai:9200'],
        'username': 'search_user',
        'password': 'secure_password',
        'use_ssl': True,
    },
    'vector_database': {
        'type': 'pinecone',
        'api_key': 'your-pinecone-api-key',
        'environment': 'production',
    },
    'embedding_service': {
        'model': 'sentence-transformers/all-MiniLM-L6-v2',
        'batch_size': 32,
    },
    'default_indices': ['documents', 'content', 'knowledge_base'],
    'max_page_size': 50,
}


async def main() -> None:
    search_engine = SearchEngine(search_config)

    # Execute a hybrid search with filters, facets, and field boosts
    search_request = SearchRequest(
        query="artificial intelligence machine learning",
        search_type=SearchType.HYBRID,
        filters={
            'type': ['article', 'research_paper'],
            'published_date': {'gte': '2023-01-01', 'lte': '2024-01-01'},
        },
        facets=['type', 'author', 'category'],
        sort_order=SortOrder.RELEVANCE,
        page=1,
        page_size=20,
        boost_fields={'title': 3.0, 'abstract': 2.0, 'content': 1.0},
    )

    result = await search_engine.search(search_request)
    print(f"Found {result.total_hits} documents")
    for doc in result.documents:
        print(f"- {doc.get('title', 'Untitled')} (Score: {doc['_score']:.2f})")


asyncio.run(main())
```
Implementation Roadmap
Phase 1: Foundation (Completed)
Status: ✅ Released v1.0.0
- Elasticsearch cluster setup and configuration
- Basic full-text search capabilities
- Document indexing and real-time updates
- Search API with filtering and pagination
- Basic analytics and monitoring
- Multi-index search support
Phase 2: Advanced Features (In Progress)
Status: 🚧 Target v2.0.0 - Q2 2024
- Semantic search with vector embeddings
- Faceted search and navigation
- Advanced autocomplete and suggestions
- Search analytics and optimization
- Multi-language search support
- Content classification and entity extraction
Phase 3: AI-Powered Search (Planned)
Status: 📋 Target v2.5.0 - Q3 2024
- Neural search and ranking optimization
- Knowledge graph integration
- Federated search across multiple sources
- Conversational search interfaces
- Personalized search results
- Advanced search analytics and insights
Benefits and Value
User Experience Benefits
- Fast Discovery: Sub-second search response times across all content
- Relevant Results: AI-powered ranking delivers the most relevant content first
- Intuitive Navigation: Faceted search and filters make content exploration easy
- Smart Suggestions: Autocomplete and query suggestions improve search efficiency
Content Management Benefits
- Universal Searchability: All platform content is automatically searchable
- Real-Time Updates: Content changes are immediately reflected in search results
- Rich Metadata: Automated content enrichment with categories, entities, and classifications
- Performance Insights: Analytics show content performance and user engagement
Business Benefits
- Improved Productivity: Users find information faster, reducing time to insights
- Enhanced User Engagement: Better search experience increases platform usage
- Content ROI: Analytics show which content provides the most value
- Competitive Advantage: Superior search capabilities differentiate the platform
Related Services
Direct Dependencies
- Data Persistence: Source data for indexing and search
- Configuration Management: Search configuration and index settings
- Platform Observability: Search performance monitoring
Service Integrations
- Event & Messaging: Real-time content update notifications
- Security & Authentication: Secure search with access control
- AI Agents: AI-powered search and content discovery
Consuming Services
- All Platform Applications: Universal search across all applications
- Knowledge Management: Enterprise knowledge base search
- Content Management: Document and media search capabilities
- Analytics Services: Search-driven business intelligence and insights
The Search & Indexing Service provides the discovery foundation that makes all platform content searchable, discoverable, and actionable for users across the entire Sindhan AI ecosystem.