Platform Observability Implementation Specification
This technical specification describes the implementation design of the Platform Observability Service (sindhan-observability) Rust crate: an observability framework built around type-safe instrumentation and a high-performance async design.
Design Patterns Applied
The implementation leverages multiple software design patterns optimized for observability:
- Builder Pattern: For configuration builders and instrumentation setup
- Factory Pattern: For exporter creation and format detection
- Observer Pattern: For metric collection and event notifications
- Strategy Pattern: For sampling strategies and export formats
- Decorator Pattern: For instrumentation wrapping and enhancement
- Registry Pattern: For metric and tracer management
- Pipeline Pattern: For export processing and batching
- Context Pattern: For correlation and trace propagation
Core Type System and Traits
Base Observability Traits
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use anyhow::{Result, Context};
use serde::{Deserialize, Serialize};
use async_trait::async_trait;
use opentelemetry::trace::{TraceId, SpanId};
use uuid::Uuid;
/// Core trait defining observability provider behavior
/// **Pattern Applied: Strategy Pattern**
///
/// Aggregates the three observability pillars (metrics, logging, tracing)
/// behind associated types so each backend implementation is chosen at
/// compile time rather than via trait objects.
#[async_trait]
pub trait ObservabilityProvider: Send + Sync + 'static {
/// Concrete metrics backend supplied by the provider.
type MetricsType: MetricsCollector + Send + Sync;
/// Concrete logging backend supplied by the provider.
type LoggingType: StructuredLogger + Send + Sync;
/// Concrete tracing backend supplied by the provider.
type TracingType: DistributedTracer + Send + Sync;
// Accessors return borrows of provider-owned components.
// NOTE(review): these are `async` yet return plain references — confirm
// the async-ness is needed (e.g. for lazy initialization) or drop it.
async fn metrics(&self) -> &Self::MetricsType;
async fn logging(&self) -> &Self::LoggingType;
async fn tracing(&self) -> &Self::TracingType;
/// Flush and release all telemetry resources; call once at shutdown.
async fn shutdown(&self) -> Result<()>;
/// Force-flush buffered telemetry without shutting down.
async fn flush(&self) -> Result<()>;
}
/// Trait for metrics collection with type safety
/// **Pattern Applied: Registry Pattern**
///
/// Factory/registry for the four standard metric instruments. Instruments
/// returned for the same name + labels are expected to share state.
pub trait MetricsCollector: Send + Sync {
fn create_counter(&self, name: &str, description: &str, labels: Labels) -> Counter;
fn create_gauge(&self, name: &str, description: &str, labels: Labels) -> Gauge;
fn create_histogram(&self, name: &str, description: &str, labels: Labels) -> Histogram;
fn create_summary(&self, name: &str, description: &str, labels: Labels) -> Summary;
// NOTE(review): the generic method makes this trait non-object-safe
// (no `dyn MetricsCollector`), and `Metric` is defined later in this
// file as a struct, not a trait — confirm the intended bound.
fn register_metric<T: Metric>(&self, metric: T) -> Result<()>;
/// Snapshot all registered metrics as export-ready families.
fn export_metrics(&self) -> Result<Vec<MetricFamily>>;
}
/// Trait for structured logging with correlation
/// **Pattern Applied: Context Pattern**
///
/// NOTE(review): a concrete struct named `StructuredLogger` is declared
/// later in this file; the two cannot coexist in one crate — one of them
/// must be renamed (e.g. trait `Logger` / struct `StructuredLogger`).
pub trait StructuredLogger: Send + Sync {
/// Emit a message at an explicit level; the level-named helpers below
/// are expected to delegate here.
fn log(&self, level: LogLevel, message: &str, context: LogContext);
fn info(&self, message: &str, context: LogContext);
fn warn(&self, message: &str, context: LogContext);
fn error(&self, message: &str, context: LogContext);
fn debug(&self, message: &str, context: LogContext);
fn trace(&self, message: &str, context: LogContext);
/// Derive a logger that stamps every entry with `correlation_id`.
fn with_correlation_id(&self, correlation_id: CorrelationId) -> Box<dyn StructuredLogger>;
/// Derive a logger bound to an active trace/span pair.
fn with_trace_context(&self, trace_id: TraceId, span_id: SpanId) -> Box<dyn StructuredLogger>;
}
/// Trait for distributed tracing
/// **Pattern Applied: Decorator Pattern**
///
/// NOTE(review): a concrete struct named `DistributedTracer` is declared
/// later in this file; one of the two must be renamed before this crate
/// can compile.
#[async_trait]
pub trait DistributedTracer: Send + Sync {
    /// Start a new root span with the given operation name.
    fn start_span(&self, name: &str) -> Span;
    /// Start a child span under an explicit parent context.
    fn start_span_with_parent(&self, name: &str, parent: &SpanContext) -> Span;
    /// The span currently active on this thread, if any.
    fn current_span(&self) -> Option<Span>;
    /// Run `f` with `span` installed as the current span, restoring the
    /// previous span afterwards.
    fn with_span<F, R>(&self, span: Span, f: F) -> R
    where
        F: FnOnce() -> R;
    /// Flush buffered span data to the configured exporters.
    ///
    /// The trait declares an `async fn`, so it needs `#[async_trait]` like
    /// the file's other async traits (the original omitted the attribute).
    async fn flush(&self) -> Result<()>;
}
Core Configuration System
/// Comprehensive observability configuration
/// **Pattern Applied: Builder Pattern**
///
/// Top-level configuration assembled by `ObservabilityConfigBuilder`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObservabilityConfig {
/// Logical service name attached to every exported signal (required).
pub service_name: String,
/// Service version string; defaults to "unknown" when not supplied.
pub service_version: String,
/// Deployment environment tag; defaults to "development".
pub environment: String,
pub metrics: MetricsConfig,
pub logging: LoggingConfig,
pub tracing: TracingConfig,
pub export: ExportConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsConfig {
/// Master switch for metric collection.
pub enabled: bool,
/// How often metrics are collected/scraped (default 15s).
pub collection_interval: std::time::Duration,
/// Listen address for the Prometheus scrape endpoint, if serving one.
pub prometheus_endpoint: Option<String>,
pub custom_metrics: Vec<CustomMetricConfig>,
/// Fraction of observations recorded, in [0.0, 1.0]; 1.0 = record all.
pub sampling_rate: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoggingConfig {
pub enabled: bool,
/// Minimum level emitted; entries below this are dropped.
pub level: LogLevel,
pub format: LogFormat,
pub structured: bool,
/// When true, correlation ids are propagated into every entry.
pub correlation_tracking: bool,
// NOTE(review): this is a plain `Vec<LogOutput>` while the logger itself
// stores `Vec<Box<dyn LogOutput>>` — confirm whether `LogOutput` is an
// enum (as `LogOutput::Stdout` in the Default impl suggests) or a trait.
pub outputs: Vec<LogOutput>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TracingConfig {
pub enabled: bool,
/// Head-sampling strategy applied when spans start.
pub sampling_strategy: SamplingStrategy,
/// Optional Jaeger collector endpoint; `None` disables Jaeger export.
pub jaeger_endpoint: Option<String>,
/// Optional OTLP endpoint; `None` disables OTLP export.
pub otlp_endpoint: Option<String>,
/// Export spans in batches rather than one at a time.
pub batch_export: bool,
pub max_export_batch_size: usize,
}
/// Per-backend export settings; each section defaults independently.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportConfig {
pub prometheus: PrometheusExportConfig,
pub jaeger: JaegerExportConfig,
pub otlp: OtlpExportConfig,
/// Additional user-defined export targets.
pub custom: Vec<CustomExportConfig>,
}
/// Builder for observability configuration
/// **Pattern Applied: Builder Pattern**
///
/// All fields start as `None`; only `service_name` is mandatory at
/// `build()` time — every other section falls back to its `Default`.
pub struct ObservabilityConfigBuilder {
service_name: Option<String>,
service_version: Option<String>,
environment: Option<String>,
metrics_config: Option<MetricsConfig>,
logging_config: Option<LoggingConfig>,
tracing_config: Option<TracingConfig>,
export_config: Option<ExportConfig>,
}
impl ObservabilityConfigBuilder {
    /// Create an empty builder; only the service name is mandatory.
    pub fn new() -> Self {
        Self {
            service_name: None,
            service_version: None,
            environment: None,
            metrics_config: None,
            logging_config: None,
            tracing_config: None,
            export_config: None,
        }
    }

    /// Required: logical service name reported with every signal.
    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
        self.service_name = Some(name.into());
        self
    }

    /// Optional version string; defaults to "unknown".
    pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
        self.service_version = Some(version.into());
        self
    }

    /// Optional environment tag; defaults to "development".
    pub fn with_environment(mut self, env: impl Into<String>) -> Self {
        self.environment = Some(env.into());
        self
    }

    /// Override the default metrics section.
    pub fn with_metrics(mut self, config: MetricsConfig) -> Self {
        self.metrics_config = Some(config);
        self
    }

    /// Override the default logging section.
    pub fn with_logging(mut self, config: LoggingConfig) -> Self {
        self.logging_config = Some(config);
        self
    }

    /// Override the default tracing section.
    pub fn with_tracing(mut self, config: TracingConfig) -> Self {
        self.tracing_config = Some(config);
        self
    }

    /// Override the default export section.
    pub fn with_export(mut self, config: ExportConfig) -> Self {
        self.export_config = Some(config);
        self
    }

    /// Assemble the final configuration.
    ///
    /// # Errors
    /// Fails if no service name was supplied; every other section falls
    /// back to its `Default` implementation.
    pub fn build(self) -> Result<ObservabilityConfig> {
        Ok(ObservabilityConfig {
            service_name: self.service_name.ok_or_else(|| anyhow::anyhow!("Service name is required"))?,
            service_version: self.service_version.unwrap_or_else(|| "unknown".to_string()),
            environment: self.environment.unwrap_or_else(|| "development".to_string()),
            metrics: self.metrics_config.unwrap_or_default(),
            logging: self.logging_config.unwrap_or_default(),
            tracing: self.tracing_config.unwrap_or_default(),
            export: self.export_config.unwrap_or_default(),
        })
    }
}

/// `Default` mirrors `new()` so the builder satisfies clippy's
/// `new_without_default` lint and works with `Default`-based call sites.
impl Default for ObservabilityConfigBuilder {
    fn default() -> Self {
        Self::new()
    }
}
impl Default for MetricsConfig {
fn default() -> Self {
Self {
enabled: true,
collection_interval: std::time::Duration::from_secs(15),
prometheus_endpoint: Some("0.0.0.0:9090".to_string()),
custom_metrics: Vec::new(),
sampling_rate: 1.0,
}
}
}
impl Default for LoggingConfig {
fn default() -> Self {
Self {
enabled: true,
level: LogLevel::Info,
format: LogFormat::Json,
structured: true,
correlation_tracking: true,
outputs: vec![LogOutput::Stdout],
}
}
}
impl Default for TracingConfig {
fn default() -> Self {
Self {
enabled: true,
sampling_strategy: SamplingStrategy::Probabilistic(1.0),
jaeger_endpoint: None,
otlp_endpoint: None,
batch_export: true,
max_export_batch_size: 512,
}
}
}
impl Default for ExportConfig {
    /// Every exporter section defaults independently; no custom exporters.
    fn default() -> Self {
        ExportConfig {
            prometheus: Default::default(),
            jaeger: Default::default(),
            otlp: Default::default(),
            custom: vec![],
        }
    }
}
Metrics System Implementation
/// High-performance metrics registry with type safety
/// **Pattern Applied: Registry Pattern + Factory Pattern**
///
/// Metrics are keyed by "name_labelfingerprint" and deduplicated per kind.
pub struct MetricsRegistry {
counters: Arc<RwLock<HashMap<String, Arc<Counter>>>>,
gauges: Arc<RwLock<HashMap<String, Arc<Gauge>>>>,
histograms: Arc<RwLock<HashMap<String, Arc<Histogram>>>>,
// NOTE(review): no `create_summary` method exists on the impl, so this
// map is currently never populated — confirm whether it is still needed.
summaries: Arc<RwLock<HashMap<String, Arc<Summary>>>>,
/// Export sinks consulted by `flush_to_exporters`, in order.
exporters: Vec<Box<dyn MetricsExporter>>,
config: MetricsConfig,
}
impl MetricsRegistry {
    /// Create a registry with empty metric maps and no exporters.
    pub fn new(config: MetricsConfig) -> Self {
        Self {
            counters: Arc::new(RwLock::new(HashMap::new())),
            gauges: Arc::new(RwLock::new(HashMap::new())),
            histograms: Arc::new(RwLock::new(HashMap::new())),
            summaries: Arc::new(RwLock::new(HashMap::new())),
            exporters: Vec::new(),
            config,
        }
    }

    /// Get or create the counter identified by `name` + label fingerprint.
    ///
    /// Uses a read-lock fast path for the common "already registered" case,
    /// then re-checks under the write lock via the entry API. The original
    /// released the read lock before inserting, so two concurrent callers
    /// could each create a counter for the same key and one would be
    /// overwritten — increments on the losing instance were silently lost.
    pub async fn create_counter(
        &self,
        name: &str,
        description: &str,
        labels: Labels,
    ) -> Counter {
        let key = format!("{}_{}", name, labels.fingerprint());
        {
            let counters = self.counters.read().await;
            if let Some(counter) = counters.get(&key) {
                return counter.as_ref().clone();
            }
        }
        let mut counters = self.counters.write().await;
        counters
            .entry(key)
            .or_insert_with(|| Arc::new(Counter::new(name, description, labels)))
            .as_ref()
            .clone()
    }

    /// Get or create the gauge identified by `name` + label fingerprint.
    /// Same double-checked locking scheme as `create_counter`.
    pub async fn create_gauge(
        &self,
        name: &str,
        description: &str,
        labels: Labels,
    ) -> Gauge {
        let key = format!("{}_{}", name, labels.fingerprint());
        {
            let gauges = self.gauges.read().await;
            if let Some(gauge) = gauges.get(&key) {
                return gauge.as_ref().clone();
            }
        }
        let mut gauges = self.gauges.write().await;
        gauges
            .entry(key)
            .or_insert_with(|| Arc::new(Gauge::new(name, description, labels)))
            .as_ref()
            .clone()
    }

    /// Get or create the histogram identified by `name` + label fingerprint.
    /// Same double-checked locking scheme as `create_counter`.
    pub async fn create_histogram(
        &self,
        name: &str,
        description: &str,
        labels: Labels,
    ) -> Histogram {
        let key = format!("{}_{}", name, labels.fingerprint());
        {
            let histograms = self.histograms.read().await;
            if let Some(histogram) = histograms.get(&key) {
                return histogram.as_ref().clone();
            }
        }
        let mut histograms = self.histograms.write().await;
        histograms
            .entry(key)
            .or_insert_with(|| Arc::new(Histogram::new(name, description, labels)))
            .as_ref()
            .clone()
    }

    /// Snapshot every registered metric as export-ready families.
    ///
    /// Each collection is locked only long enough to snapshot it, so
    /// recording can proceed on other instrument kinds concurrently.
    pub async fn export_metrics(&self) -> Result<Vec<MetricFamily>> {
        let mut families = Vec::new();
        {
            let counters = self.counters.read().await;
            families.extend(counters.values().map(|c| c.to_metric_family()));
        }
        {
            let gauges = self.gauges.read().await;
            families.extend(gauges.values().map(|g| g.to_metric_family()));
        }
        {
            let histograms = self.histograms.read().await;
            families.extend(histograms.values().map(|h| h.to_metric_family()));
        }
        {
            let summaries = self.summaries.read().await;
            families.extend(summaries.values().map(|s| s.to_metric_family()));
        }
        Ok(families)
    }

    /// Push the current snapshot to every configured exporter in order.
    ///
    /// # Errors
    /// Propagates the first exporter failure; later exporters are skipped.
    pub async fn flush_to_exporters(&self) -> Result<()> {
        let metric_families = self.export_metrics().await?;
        for exporter in &self.exporters {
            exporter
                .export(&metric_families)
                .await
                .with_context(|| "Failed to export metrics")?;
        }
        Ok(())
    }
}
/// Thread-safe counter implementation with atomic operations
///
/// Clones share the same underlying atomic via `Arc`, so a cloned counter
/// observes and contributes to the same value.
#[derive(Debug, Clone)]
pub struct Counter {
name: String,
description: String,
/// Label set fixed at construction; per-call label args are ignored.
labels: Labels,
value: Arc<std::sync::atomic::AtomicU64>,
}
impl Counter {
pub fn new(name: &str, description: &str, labels: Labels) -> Self {
Self {
name: name.to_string(),
description: description.to_string(),
labels,
value: Arc::new(std::sync::atomic::AtomicU64::new(0)),
}
}
pub fn increment(&self, labels: Labels) {
self.add(1, labels);
}
pub fn add(&self, value: u64, _labels: Labels) {
self.value.fetch_add(value, std::sync::atomic::Ordering::Relaxed);
}
pub fn get(&self) -> u64 {
self.value.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn to_metric_family(&self) -> MetricFamily {
MetricFamily {
name: self.name.clone(),
description: self.description.clone(),
metric_type: MetricType::Counter,
metrics: vec![Metric {
labels: self.labels.clone(),
value: MetricValue::Counter(self.get()),
}],
}
}
}
/// Thread-safe gauge implementation
///
/// Clones share the same value via `Arc<RwLock<f64>>`.
#[derive(Debug, Clone)]
pub struct Gauge {
name: String,
description: String,
/// Label set fixed at construction; per-call label args are ignored.
labels: Labels,
value: Arc<std::sync::RwLock<f64>>,
}
impl Gauge {
    /// Construct a gauge initialized to 0.0.
    pub fn new(name: &str, description: &str, labels: Labels) -> Self {
        Self {
            name: name.to_string(),
            description: description.to_string(),
            labels,
            value: Arc::new(std::sync::RwLock::new(0.0)),
        }
    }

    /// Acquire the write lock, recovering from poisoning.
    ///
    /// A poisoned lock only means another thread panicked mid-write; an
    /// `f64` store cannot be torn, so the stored value is still usable.
    /// The original silently skipped writes on poison (`if let Ok`),
    /// making the gauge permanently stuck after any panic.
    fn write_guard(&self) -> std::sync::RwLockWriteGuard<'_, f64> {
        self.value.write().unwrap_or_else(|e| e.into_inner())
    }

    /// Set the gauge to an absolute value.
    pub fn set(&self, value: f64, _labels: Labels) {
        *self.write_guard() = value;
    }

    /// Increase the gauge by one.
    pub fn increment(&self, _labels: Labels) {
        self.add(1.0, _labels);
    }

    /// Decrease the gauge by one.
    pub fn decrement(&self, _labels: Labels) {
        self.add(-1.0, _labels);
    }

    /// Add `value` (may be negative) to the gauge.
    pub fn add(&self, value: f64, _labels: Labels) {
        *self.write_guard() += value;
    }

    /// Current gauge value.
    ///
    /// Recovers from a poisoned lock via `PoisonError::into_inner`. (The
    /// original built a throwaway `RwLock` inside `unwrap_or_else`, whose
    /// guard cannot outlive the closure and would not compile.)
    pub fn get(&self) -> f64 {
        *self.value.read().unwrap_or_else(|e| e.into_inner())
    }

    /// Snapshot this gauge as a single-sample metric family for export.
    pub fn to_metric_family(&self) -> MetricFamily {
        MetricFamily {
            name: self.name.clone(),
            description: self.description.clone(),
            metric_type: MetricType::Gauge,
            metrics: vec![Metric {
                labels: self.labels.clone(),
                value: MetricValue::Gauge(self.get()),
            }],
        }
    }
}
/// Lock-free histogram implementation with configurable buckets
///
/// Clones share all state via `Arc`s, so recording on any clone updates
/// the same histogram.
#[derive(Debug, Clone)]
pub struct Histogram {
name: String,
description: String,
/// Label set fixed at construction; per-call label args are ignored.
labels: Labels,
/// Bucket upper bounds, ascending; the last entry is `f64::INFINITY`.
buckets: Vec<f64>,
/// Cumulative per-bucket counts, parallel to `buckets`.
bucket_counts: Vec<Arc<std::sync::atomic::AtomicU64>>,
sum: Arc<std::sync::atomic::AtomicU64>, // Store as integer for atomic operations
count: Arc<std::sync::atomic::AtomicU64>,
}
impl Histogram {
    /// Construct a histogram with the Prometheus-style default buckets
    /// (latency-oriented, 5ms .. 10s, plus +Inf).
    pub fn new(name: &str, description: &str, labels: Labels) -> Self {
        let buckets = vec![
            0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, f64::INFINITY
        ];
        let bucket_counts = buckets.iter()
            .map(|_| Arc::new(std::sync::atomic::AtomicU64::new(0)))
            .collect();
        Self {
            name: name.to_string(),
            description: description.to_string(),
            labels,
            buckets,
            bucket_counts,
            // Bit pattern of 0u64 is 0.0f64, so the sum starts at zero.
            sum: Arc::new(std::sync::atomic::AtomicU64::new(0)),
            count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
        }
    }

    /// Record one observation, updating sum, count, and every bucket whose
    /// upper bound is >= `value` (cumulative bucket semantics).
    pub fn record(&self, value: f64, _labels: Labels) {
        use std::sync::atomic::Ordering;
        // Accumulate the sum with a CAS loop over the f64 bit pattern.
        // The original `fetch_add`ed raw bit patterns, which is not
        // arithmetic on floats — IEEE-754 bits are not additive, so the
        // exported sum was garbage.
        let mut current = self.sum.load(Ordering::Relaxed);
        loop {
            let updated = (f64::from_bits(current) + value).to_bits();
            match self
                .sum
                .compare_exchange_weak(current, updated, Ordering::Relaxed, Ordering::Relaxed)
            {
                Ok(_) => break,
                Err(observed) => current = observed,
            }
        }
        self.count.fetch_add(1, Ordering::Relaxed);
        for (i, &bucket) in self.buckets.iter().enumerate() {
            if value <= bucket {
                self.bucket_counts[i].fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Snapshot of all bucket counts, parallel to the bucket bounds.
    pub fn get_bucket_counts(&self) -> Vec<u64> {
        self.bucket_counts.iter()
            .map(|count| count.load(std::sync::atomic::Ordering::Relaxed))
            .collect()
    }

    /// Sum of all recorded observations.
    pub fn get_sum(&self) -> f64 {
        let sum_bits = self.sum.load(std::sync::atomic::Ordering::Relaxed);
        f64::from_bits(sum_bits)
    }

    /// Number of recorded observations.
    pub fn get_count(&self) -> u64 {
        self.count.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Snapshot this histogram as a metric family for export. The +Inf
    /// bucket is included in `buckets` like any other.
    pub fn to_metric_family(&self) -> MetricFamily {
        let bucket_counts = self.get_bucket_counts();
        let buckets: Vec<Bucket> = self.buckets.iter()
            .zip(bucket_counts.iter())
            .map(|(&upper_bound, &count)| Bucket {
                upper_bound,
                cumulative_count: count,
            })
            .collect();
        MetricFamily {
            name: self.name.clone(),
            description: self.description.clone(),
            metric_type: MetricType::Histogram,
            metrics: vec![Metric {
                labels: self.labels.clone(),
                value: MetricValue::Histogram(HistogramValue {
                    sample_count: self.get_count(),
                    sample_sum: self.get_sum(),
                    buckets,
                }),
            }],
        }
    }
}
Structured Logging Implementation
/// High-performance structured logger with correlation tracking
/// **Pattern Applied: Context Pattern + Observer Pattern**
///
/// NOTE(review): this struct shares its name with the `StructuredLogger`
/// trait declared earlier in this file — one must be renamed. Also, the
/// `with_*` methods below call `self.clone()`, but `Box<dyn LogOutput>`
/// is not `Clone`, so this struct cannot derive/implement `Clone` as-is.
pub struct StructuredLogger {
service_name: String,
service_version: String,
environment: String,
/// Minimum level emitted; entries below this are dropped in `log`.
level: LogLevel,
format: LogFormat,
/// Sinks each formatted entry is written to, in order.
outputs: Vec<Box<dyn LogOutput>>,
/// Correlation id stamped on every entry, when set via `with_correlation_id`.
correlation_id: Option<CorrelationId>,
/// Trace/span pair stamped on every entry, when set via `with_trace_context`.
trace_context: Option<TraceContext>,
}
impl StructuredLogger {
/// Entry point for fluent logger construction.
pub fn builder() -> StructuredLoggerBuilder {
StructuredLoggerBuilder::new()
}
/// Format and write one entry to every configured output.
///
/// Entries below the configured minimum level are dropped.
/// NOTE(review): `level < self.level` assumes `LogLevel`'s `Ord` ranks
/// more-severe levels higher (Trace < ... < Error) — confirm the enum's
/// variant order.
pub fn log(&self, level: LogLevel, message: &str, context: LogContext) {
if level < self.level {
return;
}
// Assemble the full entry: static service identity + per-call context
// + any correlation/trace ids bound to this logger instance.
let log_entry = LogEntry::builder()
.with_timestamp(chrono::Utc::now())
.with_level(level)
.with_service_name(&self.service_name)
.with_service_version(&self.service_version)
.with_environment(&self.environment)
.with_message(message)
.with_context(context)
.with_correlation_id(self.correlation_id)
.with_trace_context(self.trace_context)
.build();
let formatted = match self.format {
LogFormat::Json => self.format_json(&log_entry),
LogFormat::Text => self.format_text(&log_entry),
LogFormat::Logfmt => self.format_logfmt(&log_entry),
};
// Fan the formatted line out to every sink in order.
for output in &self.outputs {
output.write(&formatted);
}
}
/// Convenience wrappers — each delegates to `log` at a fixed level.
pub fn info(&self, message: &str, context: LogContext) {
self.log(LogLevel::Info, message, context);
}
pub fn warn(&self, message: &str, context: LogContext) {
self.log(LogLevel::Warn, message, context);
}
pub fn error(&self, message: &str, context: LogContext) {
self.log(LogLevel::Error, message, context);
}
pub fn debug(&self, message: &str, context: LogContext) {
self.log(LogLevel::Debug, message, context);
}
pub fn trace(&self, message: &str, context: LogContext) {
self.log(LogLevel::Trace, message, context);
}
/// Serialize the entry as JSON; on serialization failure, emit a small
/// hand-built JSON error object instead of panicking.
/// NOTE(review): the fallback interpolates `entry.message` unescaped,
/// so a message containing `"` produces invalid JSON — confirm intent.
fn format_json(&self, entry: &LogEntry) -> String {
serde_json::to_string(entry).unwrap_or_else(|_| {
format!(r#"{{"error": "Failed to serialize log entry", "message": "{}"}}"#, entry.message)
})
}
/// Human-readable single-line format: `ts [LEVEL] service - msg ctx`.
fn format_text(&self, entry: &LogEntry) -> String {
format!(
"{} [{}] {} - {} {}\n",
entry.timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ"),
entry.level,
entry.service_name,
entry.message,
entry.context.to_string()
)
}
/// logfmt key=value format; context fields are appended via `to_logfmt`.
fn format_logfmt(&self, entry: &LogEntry) -> String {
format!(
"ts={} level={} service={} msg=\"{}\" {}\n",
entry.timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ"),
entry.level,
entry.service_name,
entry.message,
entry.context.to_logfmt()
)
}
}
impl StructuredLogger {
/// Derive a logger that stamps every entry with `correlation_id`.
///
/// NOTE(review): `StructuredLogger` here is a struct, so the return type
/// `Box<dyn StructuredLogger>` can only refer to the same-named trait —
/// which this struct never `impl`s — and `self.clone()` requires a
/// `Clone` impl that `Box<dyn LogOutput>` prevents deriving. This pair
/// of methods cannot compile as written; the struct/trait naming clash
/// must be resolved first.
pub fn with_correlation_id(&self, correlation_id: CorrelationId) -> Box<dyn StructuredLogger> {
let mut logger = self.clone();
logger.correlation_id = Some(correlation_id);
Box::new(logger)
}
/// Derive a logger bound to an active trace/span pair.
pub fn with_trace_context(&self, trace_id: TraceId, span_id: SpanId) -> Box<dyn StructuredLogger> {
let mut logger = self.clone();
logger.trace_context = Some(TraceContext { trace_id, span_id });
Box::new(logger)
}
}
/// Builder for structured logger configuration
/// **Pattern Applied: Builder Pattern**
///
/// Starts with JSON format at `Info` level and a stdout sink; only the
/// service name is mandatory at `build()` time.
pub struct StructuredLoggerBuilder {
service_name: Option<String>,
service_version: Option<String>,
environment: Option<String>,
level: LogLevel,
format: LogFormat,
outputs: Vec<Box<dyn LogOutput>>,
}
impl StructuredLoggerBuilder {
    /// Create a builder with defaults: `Info` level, JSON format, stdout.
    pub fn new() -> Self {
        Self {
            service_name: None,
            service_version: None,
            environment: None,
            level: LogLevel::Info,
            format: LogFormat::Json,
            outputs: vec![Box::new(StdoutOutput::new())],
        }
    }

    /// Required: logical service name stamped on every entry.
    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
        self.service_name = Some(name.into());
        self
    }

    /// Optional version string; defaults to "unknown".
    pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
        self.service_version = Some(version.into());
        self
    }

    /// Optional environment tag; defaults to "development".
    pub fn with_environment(mut self, env: impl Into<String>) -> Self {
        self.environment = Some(env.into());
        self
    }

    /// Minimum level emitted; entries below it are dropped.
    pub fn with_level(mut self, level: LogLevel) -> Self {
        self.level = level;
        self
    }

    /// Emit entries as JSON (the default).
    pub fn with_json_format(mut self) -> Self {
        self.format = LogFormat::Json;
        self
    }

    /// Emit entries as human-readable text.
    pub fn with_text_format(mut self) -> Self {
        self.format = LogFormat::Text;
        self
    }

    /// Append an additional sink; the default stdout sink is kept.
    pub fn add_output(mut self, output: Box<dyn LogOutput>) -> Self {
        self.outputs.push(output);
        self
    }

    /// Build the logger.
    ///
    /// # Errors
    /// Fails if no service name was supplied.
    pub fn build(self) -> Result<StructuredLogger> {
        Ok(StructuredLogger {
            service_name: self.service_name.ok_or_else(|| anyhow::anyhow!("Service name is required"))?,
            service_version: self.service_version.unwrap_or_else(|| "unknown".to_string()),
            environment: self.environment.unwrap_or_else(|| "development".to_string()),
            level: self.level,
            format: self.format,
            outputs: self.outputs,
            correlation_id: None,
            trace_context: None,
        })
    }
}

/// `Default` mirrors `new()` so the builder satisfies clippy's
/// `new_without_default` lint and works with `Default`-based call sites.
impl Default for StructuredLoggerBuilder {
    fn default() -> Self {
        Self::new()
    }
}
/// Log context for structured fields
///
/// Carries ad-hoc key/value fields plus well-known identifiers that are
/// rendered specially by `to_logfmt`.
/// NOTE(review): `Serialize` is derived, which requires `CorrelationId`,
/// `TraceId`, and `SpanId` to implement `Serialize` — confirm for the
/// opentelemetry id types.
#[derive(Debug, Clone, Serialize)]
pub struct LogContext {
/// Arbitrary structured fields, JSON-encoded.
fields: HashMap<String, serde_json::Value>,
correlation_id: Option<CorrelationId>,
trace_id: Option<TraceId>,
span_id: Option<SpanId>,
user_id: Option<String>,
request_id: Option<String>,
}
impl LogContext {
    /// Create an empty context with no fields or identifiers.
    pub fn new() -> Self {
        Self {
            fields: HashMap::new(),
            correlation_id: None,
            trace_id: None,
            span_id: None,
            user_id: None,
            request_id: None,
        }
    }

    /// Attach one structured field; unserializable values become JSON null.
    pub fn with_field<T: Serialize>(mut self, key: &str, value: T) -> Self {
        self.fields.insert(
            key.to_string(),
            serde_json::to_value(value).unwrap_or(serde_json::Value::Null),
        );
        self
    }

    /// Attach a correlation id.
    pub fn with_correlation_id(mut self, correlation_id: CorrelationId) -> Self {
        self.correlation_id = Some(correlation_id);
        self
    }

    /// Attach a trace id.
    pub fn with_trace_id(mut self, trace_id: TraceId) -> Self {
        self.trace_id = Some(trace_id);
        self
    }

    /// Attach a span id.
    pub fn with_span_id(mut self, span_id: SpanId) -> Self {
        self.span_id = Some(span_id);
        self
    }

    /// Attach the acting user's id.
    pub fn with_user_id(mut self, user_id: impl Into<String>) -> Self {
        self.user_id = Some(user_id.into());
        self
    }

    /// Attach the current request id.
    pub fn with_request_id(mut self, request_id: impl Into<String>) -> Self {
        self.request_id = Some(request_id.into());
        self
    }

    /// Attach an error and its full source chain under the `"error"` field.
    ///
    /// The chain is walked manually via `source()`: `Error::chain()` is an
    /// unstable API, and the `error.type_name()` the original called does
    /// not exist on `std::error::Error`, so neither compiled on stable.
    /// The chain starts with the error's own message, matching
    /// `Error::chain` semantics.
    pub fn with_error(mut self, error: &dyn std::error::Error) -> Self {
        let mut chain = vec![error.to_string()];
        let mut source = error.source();
        while let Some(cause) = source {
            chain.push(cause.to_string());
            source = cause.source();
        }
        self.fields.insert(
            "error".to_string(),
            serde_json::json!({
                "message": error.to_string(),
                "chain": chain,
            }),
        );
        self
    }

    /// Merge the top-level keys of a serializable struct into the fields.
    /// Non-object values (and serialization failures) are silently ignored.
    pub fn with_structured_data<T: Serialize>(mut self, data: &T) -> Self {
        if let Ok(serde_json::Value::Object(map)) = serde_json::to_value(data) {
            for (key, value) in map {
                self.fields.insert(key, value);
            }
        }
        self
    }

    /// Render the context as logfmt `key=value` pairs.
    ///
    /// NOTE(review): string values are not escaped, so an embedded `"`
    /// breaks the logfmt framing — confirm whether escaping is needed.
    pub fn to_logfmt(&self) -> String {
        let mut parts = Vec::new();
        if let Some(correlation_id) = &self.correlation_id {
            parts.push(format!("correlation_id={}", correlation_id));
        }
        if let Some(trace_id) = &self.trace_id {
            parts.push(format!("trace_id={}", trace_id));
        }
        if let Some(span_id) = &self.span_id {
            parts.push(format!("span_id={}", span_id));
        }
        if let Some(user_id) = &self.user_id {
            parts.push(format!("user_id=\"{}\"", user_id));
        }
        if let Some(request_id) = &self.request_id {
            parts.push(format!("request_id=\"{}\"", request_id));
        }
        for (key, value) in &self.fields {
            match value {
                serde_json::Value::String(s) => parts.push(format!("{}=\"{}\"", key, s)),
                serde_json::Value::Number(n) => parts.push(format!("{}={}", key, n)),
                serde_json::Value::Bool(b) => parts.push(format!("{}={}", key, b)),
                _ => parts.push(format!("{}=\"{}\"", key, value)),
            }
        }
        parts.join(" ")
    }
}
impl Default for LogContext {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for LogContext {
    /// Renders the context in logfmt form — identical to `to_logfmt()`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.to_logfmt())
    }
}
Distributed Tracing Implementation
/// High-performance distributed tracer with OpenTelemetry compatibility
/// **Pattern Applied: Decorator Pattern + Context Pattern**
///
/// NOTE(review): shares its name with the `DistributedTracer` trait
/// declared earlier in this file — one of the two must be renamed.
pub struct DistributedTracer {
service_name: String,
service_version: String,
tracer_provider: Arc<TracerProvider>,
/// Receives finished spans (`on_end`) and handles flushing/batching.
span_processor: Arc<dyn SpanProcessor>,
/// Head-sampling decision maker consulted when spans start.
sampler: Box<dyn Sampler>,
resource: Arc<Resource>,
}
impl DistributedTracer {
    /// Entry point for fluent tracer construction.
    pub fn builder() -> DistributedTracerBuilder {
        DistributedTracerBuilder::new()
    }

    /// Start a root span with default options.
    pub fn start_span(&self, name: &str) -> Span {
        self.start_span_with_builder(SpanBuilder::new(name))
    }

    /// Start a span explicitly parented to `parent`.
    pub fn start_span_with_parent(&self, name: &str, parent: &SpanContext) -> Span {
        self.start_span_with_builder(
            SpanBuilder::new(name).with_parent_context(parent.clone()),
        )
    }

    /// Materialize a span from a builder, applying the sampling decision.
    ///
    /// A child span reuses its parent's trace id; the original generated a
    /// fresh `TraceId` unconditionally, which detached children from their
    /// parent trace and broke trace continuity end to end.
    /// NOTE(review): `builder.span_kind` is still unused here — confirm
    /// whether `Span` should carry it.
    pub fn start_span_with_builder(&self, builder: SpanBuilder) -> Span {
        let trace_id = builder
            .parent_context
            .as_ref()
            .map(|parent| parent.trace_id())
            .unwrap_or_else(TraceId::generate);
        let span_context = SpanContext::new(
            trace_id,
            SpanId::generate(),
            TraceFlags::default(),
            false,
            TraceState::default(),
        );
        let span = Span::new(
            span_context,
            builder.name,
            builder.attributes,
            builder.links,
            builder.start_time.unwrap_or_else(SystemTime::now),
            self.span_processor.clone(),
        );
        // Sampled-out spans become cheap non-recording stubs.
        if let SamplingResult::NotSampled = self.sampler.should_sample(&span) {
            return Span::non_recording();
        }
        span
    }

    /// The span currently installed in this thread's context, if any.
    pub fn current_span(&self) -> Option<Span> {
        CURRENT_SPAN.with(|span| span.borrow().clone())
    }

    /// Run `f` with `span` installed as the current span; the previous
    /// span is restored when the guard drops, even on unwind.
    pub fn with_span<F, R>(&self, span: Span, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        CURRENT_SPAN.with(|current| {
            let _guard = SpanGuard::new(current, Some(span));
            f()
        })
    }

    /// Force-flush the span processor's buffered spans.
    pub async fn flush(&self) -> Result<()> {
        self.span_processor.force_flush().await
    }
}
/// Span implementation with automatic lifecycle management
///
/// NOTE(review): `Clone` copies `end_time` by value while `end()` is also
/// called from `Drop` — each clone that is dropped un-ended fires its own
/// `on_end`, so a span cloned into the thread-local context can be
/// reported multiple times. Consider an `Arc<Mutex<..>>` inner state.
#[derive(Debug, Clone)]
pub struct Span {
context: SpanContext,
name: String,
attributes: HashMap<String, AttributeValue>,
events: Vec<Event>,
status: SpanStatus,
start_time: SystemTime,
/// Set exactly once by `end()`; guards against double-ending one value.
end_time: Option<SystemTime>,
/// Notified with a snapshot of the span when it ends.
span_processor: Arc<dyn SpanProcessor>,
/// False for sampled-out stubs; all mutators are no-ops when false.
recording: bool,
}
impl Span {
/// Construct a recording span; `_links` are currently accepted but
/// discarded.
/// NOTE(review): confirm whether links should be stored on the span.
pub fn new(
context: SpanContext,
name: String,
attributes: HashMap<String, AttributeValue>,
_links: Vec<Link>,
start_time: SystemTime,
span_processor: Arc<dyn SpanProcessor>,
) -> Self {
Self {
context,
name,
attributes,
events: Vec::new(),
status: SpanStatus::Unset,
start_time,
end_time: None,
span_processor,
recording: true,
}
}
/// A sampled-out stub: carries an empty context, ignores all mutators,
/// and reports nothing on end (no-op processor).
pub fn non_recording() -> Self {
Self {
context: SpanContext::empty_context(),
name: String::new(),
attributes: HashMap::new(),
events: Vec::new(),
status: SpanStatus::Unset,
start_time: SystemTime::now(),
end_time: None,
span_processor: Arc::new(NoopSpanProcessor),
recording: false,
}
}
/// Set or overwrite one attribute (no-op when not recording).
pub fn set_attribute<T: Into<AttributeValue>>(&mut self, key: &str, value: T) {
if self.recording {
self.attributes.insert(key.to_string(), value.into());
}
}
/// Append a timestamped event (no-op when not recording).
pub fn add_event(&mut self, name: &str, attributes: HashMap<String, AttributeValue>) {
if self.recording {
self.events.push(Event {
name: name.to_string(),
timestamp: SystemTime::now(),
attributes,
});
}
}
/// Record an error as a semantic-convention "exception" event.
/// NOTE(review): `exception.type_name()` does not exist on
/// `std::error::Error` — this line cannot compile as written.
pub fn record_exception(&mut self, exception: &dyn std::error::Error) {
if self.recording {
let mut attributes = HashMap::new();
attributes.insert("exception.type".to_string(), exception.type_name().into());
attributes.insert("exception.message".to_string(), exception.to_string().into());
if let Some(source) = exception.source() {
attributes.insert("exception.cause".to_string(), source.to_string().into());
}
self.add_event("exception", attributes);
}
}
/// Set the span's final status (no-op when not recording).
pub fn set_status(&mut self, status: SpanStatus) {
if self.recording {
self.status = status;
}
}
/// End the span exactly once per value: stamps `end_time` and hands a
/// snapshot to the processor. Idempotent on the same value, but each
/// un-ended clone ends independently (see the struct-level note).
pub fn end(&mut self) {
if self.recording && self.end_time.is_none() {
self.end_time = Some(SystemTime::now());
self.span_processor.on_end(self.clone());
}
}
/// Install a clone of this span as the thread's current span; restored
/// when the returned guard drops.
/// NOTE(review): the guard captures a raw pointer inside
/// `LocalKey::with` and is returned past the closure — confirm this is
/// sound for the guard's actual usage.
pub fn enter(&self) -> SpanGuard {
CURRENT_SPAN.with(|current| {
SpanGuard::new(current, Some(self.clone()))
})
}
/// This span's immutable context (trace id, span id, flags).
pub fn context(&self) -> &SpanContext {
&self.context
}
/// Whether mutators and `end()` have any effect.
pub fn is_recording(&self) -> bool {
self.recording
}
}
impl Drop for Span {
/// Auto-end on drop so un-ended spans are still reported.
/// NOTE(review): because `Span` is `Clone` and each clone has its own
/// `end_time`, dropping multiple un-ended clones reports the span to
/// the processor once per clone — confirm deduplication downstream.
fn drop(&mut self) {
self.end();
}
}
/// Automatic span lifecycle management
///
/// RAII-style guard that swaps a span into the thread-local current-span
/// slot and restores the previous occupant on drop. The raw pointer field
/// makes the guard `!Send`, pinning it to the thread whose slot it holds.
pub struct SpanGuard {
/// The span that was current before this guard was created.
previous: Option<Span>,
/// Raw pointer into this thread's `CURRENT_SPAN` `RefCell`.
current_slot: *const std::cell::RefCell<Option<Span>>,
}
impl SpanGuard {
/// Install `span` as the thread's current span, remembering the
/// previous occupant so `Drop` can restore it.
///
/// NOTE(review): the stored raw pointer escapes the `LocalKey::with`
/// borrow when a guard is returned from a closure (as `Span::enter`
/// does); thread-local storage lives for the thread's lifetime, but
/// this relies on the guard never outliving the thread — confirm.
fn new(
current_slot: &std::cell::RefCell<Option<Span>>,
span: Option<Span>,
) -> Self {
let previous = current_slot.borrow().clone();
*current_slot.borrow_mut() = span;
Self {
previous,
current_slot: current_slot as *const _,
}
}
}
impl Drop for SpanGuard {
/// Restore the span that was current before this guard was created.
fn drop(&mut self) {
// SAFETY: `current_slot` points at this thread's `CURRENT_SPAN`
// thread-local, which lives for the thread's lifetime, and the raw
// pointer field makes `SpanGuard` `!Send`, so the pointer is only
// ever dereferenced on the owning thread.
unsafe {
*(*self.current_slot).borrow_mut() = self.previous.take();
}
}
}
// Thread-local slot holding the currently active span for this thread.
// `SpanGuard` swaps spans in and out; `current_span()` reads a clone.
thread_local! {
static CURRENT_SPAN: std::cell::RefCell<Option<Span>> = std::cell::RefCell::new(None);
}
/// Span builder for flexible span creation
/// **Pattern Applied: Builder Pattern**
#[derive(Debug, Clone)]
pub struct SpanBuilder {
name: String,
/// Parent context for child spans; `None` starts a new trace.
parent_context: Option<SpanContext>,
/// Defaults to `SpanKind::Internal`.
span_kind: SpanKind,
attributes: HashMap<String, AttributeValue>,
links: Vec<Link>,
/// Explicit start time; `None` means "now" at start.
start_time: Option<SystemTime>,
}
impl SpanBuilder {
    /// Begin a builder for an internal span named `name`.
    pub fn new(name: &str) -> Self {
        SpanBuilder {
            name: name.to_owned(),
            parent_context: None,
            span_kind: SpanKind::Internal,
            attributes: HashMap::new(),
            links: Vec::new(),
            start_time: None,
        }
    }

    /// Parent this span under an existing span context.
    pub fn with_parent_context(mut self, context: SpanContext) -> Self {
        self.parent_context = Some(context);
        self
    }

    /// Override the default `Internal` span kind.
    pub fn with_kind(mut self, kind: SpanKind) -> Self {
        self.span_kind = kind;
        self
    }

    /// Attach a single attribute; a later value for the same key wins.
    pub fn with_attribute<T: Into<AttributeValue>>(mut self, key: &str, value: T) -> Self {
        self.attributes.insert(key.to_owned(), value.into());
        self
    }

    /// Use an explicit start time instead of "now".
    pub fn with_start_time(mut self, start_time: SystemTime) -> Self {
        self.start_time = Some(start_time);
        self
    }

    /// Finish the builder by starting the span on `tracer`.
    pub fn start(self, tracer: &DistributedTracer) -> Span {
        tracer.start_span_with_builder(self)
    }
}
Export Pipeline Implementation
Multi-Format Exporters
/// Export pipeline with multiple format support
/// **Pattern Applied: Strategy Pattern + Pipeline Pattern**
use serde::{Serialize, Deserialize};
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::mpsc;
/// Sink for metric snapshots; one implementation per export format.
#[async_trait]
pub trait MetricsExporter: Send + Sync {
/// Export one snapshot of metric families to the backing system.
async fn export(&self, metrics: &[MetricFamily]) -> Result<()>;
/// Stable identifier for logging/diagnostics (e.g. "prometheus").
fn name(&self) -> &'static str;
}
/// Sink for finished spans; one implementation per tracing backend.
#[async_trait]
pub trait TracingExporter: Send + Sync {
/// Export a batch of finished spans to the backing system.
async fn export(&self, spans: &[SpanData]) -> Result<()>;
/// Stable identifier for logging/diagnostics (e.g. "jaeger").
fn name(&self) -> &'static str;
}
/// Prometheus metrics exporter
///
/// Operates in push mode when `push_gateway` is set; otherwise metrics are
/// expected to be scraped from `endpoint` by a separately-wired server.
pub struct PrometheusExporter {
/// Scrape endpoint address (pull mode).
endpoint: String,
/// Push Gateway base URL; `Some` switches the exporter to push mode.
push_gateway: Option<String>,
/// Job label used in the Push Gateway URL path.
job_name: String,
/// Shared HTTP client reused across pushes.
client: reqwest::Client,
}
impl PrometheusExporter {
pub fn new(config: &PrometheusExportConfig) -> Self {
Self {
endpoint: config.endpoint.clone(),
push_gateway: config.push_gateway.clone(),
job_name: config.job_name.clone(),
client: reqwest::Client::new(),
}
}
}
#[async_trait]
impl MetricsExporter for PrometheusExporter {
    /// Export a snapshot in Prometheus text exposition format.
    ///
    /// # Errors
    /// Fails if formatting fails, the push-gateway request cannot be sent,
    /// or the gateway answers with a non-success HTTP status. (The
    /// original checked only transport errors, so 4xx/5xx responses
    /// silently dropped the metrics.)
    async fn export(&self, metrics: &[MetricFamily]) -> Result<()> {
        let formatted = self.format_prometheus_metrics(metrics)?;
        if let Some(push_gateway) = &self.push_gateway {
            // Push to Prometheus Push Gateway
            let url = format!("{}/metrics/job/{}", push_gateway, self.job_name);
            self.client
                .post(&url)
                .header("Content-Type", "text/plain")
                .body(formatted)
                .send()
                .await?
                .error_for_status()?;
        } else {
            // Pull mode: metrics are served from `self.endpoint` by an
            // HTTP server wired up elsewhere; nothing to push here.
        }
        Ok(())
    }

    fn name(&self) -> &'static str {
        "prometheus"
    }
}
impl PrometheusExporter {
    /// Render metric families in the Prometheus text exposition format.
    ///
    /// NOTE(review): label values are interpolated without escaping; a
    /// value containing `"` or `\` produces invalid exposition output.
    fn format_prometheus_metrics(&self, metrics: &[MetricFamily]) -> Result<String> {
        let mut output = String::new();
        for family in metrics {
            // HELP line
            output.push_str(&format!("# HELP {} {}\n", family.name, family.description));
            // TYPE line
            let type_str = match family.metric_type {
                MetricType::Counter => "counter",
                MetricType::Gauge => "gauge",
                MetricType::Histogram => "histogram",
                MetricType::Summary => "summary",
            };
            output.push_str(&format!("# TYPE {} {}\n", family.name, type_str));
            // Metrics
            for metric in &family.metrics {
                match &metric.value {
                    MetricValue::Counter(value) => {
                        output.push_str(&format!(
                            "{}{} {}\n",
                            family.name,
                            self.format_labels(&metric.labels),
                            value
                        ));
                    }
                    MetricValue::Gauge(value) => {
                        output.push_str(&format!(
                            "{}{} {}\n",
                            family.name,
                            self.format_labels(&metric.labels),
                            value
                        ));
                    }
                    MetricValue::Histogram(hist) => {
                        // Emit finite buckets only: the histogram seeds a
                        // f64::INFINITY bucket whose Display form is "inf",
                        // which is not valid `le` syntax and would duplicate
                        // the explicit "+Inf" line written below.
                        for bucket in hist.buckets.iter().filter(|b| b.upper_bound.is_finite()) {
                            output.push_str(&format!(
                                "{}_bucket{{le=\"{}\"{} {}\n",
                                family.name,
                                bucket.upper_bound,
                                self.format_labels_suffix(&metric.labels),
                                bucket.cumulative_count
                            ));
                        }
                        // +Inf bucket (always equals the total sample count)
                        output.push_str(&format!(
                            "{}_bucket{{le=\"+Inf\"{} {}\n",
                            family.name,
                            self.format_labels_suffix(&metric.labels),
                            hist.sample_count
                        ));
                        // Count and sum
                        output.push_str(&format!(
                            "{}_count{} {}\n",
                            family.name,
                            self.format_labels(&metric.labels),
                            hist.sample_count
                        ));
                        output.push_str(&format!(
                            "{}_sum{} {}\n",
                            family.name,
                            self.format_labels(&metric.labels),
                            hist.sample_sum
                        ));
                    }
                    MetricValue::Summary(_) => {
                        // Summary implementation
                        // Similar to histogram but with quantiles
                    }
                }
            }
        }
        Ok(output)
    }

    /// Render a full `{k="v",...}` label block, or nothing when empty.
    fn format_labels(&self, labels: &Labels) -> String {
        if labels.is_empty() {
            String::new()
        } else {
            format!("{{{}}}",
                labels.iter()
                    .map(|(k, v)| format!("{}=\"{}\"", k, v))
                    .collect::<Vec<_>>()
                    .join(",")
            )
        }
    }

    /// Render the tail of a label block whose opening `{` and first pair
    /// (e.g. `le="..."`) were already written: either a bare `}` or
    /// `,k="v",...}`.
    fn format_labels_suffix(&self, labels: &Labels) -> String {
        if labels.is_empty() {
            "}".to_string()
        } else {
            format!(",{}}}",
                labels.iter()
                    .map(|(k, v)| format!("{}=\"{}\"", k, v))
                    .collect::<Vec<_>>()
                    .join(",")
            )
        }
    }
}
/// Jaeger tracing exporter
///
/// Posts span batches as JSON to the collector's `/api/traces` endpoint.
pub struct JaegerExporter {
collector_endpoint: String,
// NOTE(review): the agent endpoint is stored but never used by the
// export path below — confirm whether agent (UDP) export is planned.
agent_endpoint: String,
/// Shared HTTP client reused across exports.
client: reqwest::Client,
batch_processor: Arc<BatchSpanProcessor>,
}
impl JaegerExporter {
    /// Build a Jaeger exporter with a one-second, 512-span batch processor.
    ///
    /// `Duration` is referenced through its full `std::time` path: the
    /// original used a bare `Duration` that is never imported anywhere in
    /// this file, and every other use site spells the path out.
    pub fn new(config: &JaegerExportConfig) -> Self {
        Self {
            collector_endpoint: config.collector_endpoint.clone(),
            agent_endpoint: config.agent_endpoint.clone(),
            client: reqwest::Client::new(),
            batch_processor: Arc::new(BatchSpanProcessor::new(
                512,
                std::time::Duration::from_secs(1),
            )),
        }
    }
}
#[async_trait]
impl TracingExporter for JaegerExporter {
    /// Convert spans to Jaeger's JSON model and POST them to the collector.
    ///
    /// # Errors
    /// Fails on span conversion, serialization, transport errors, or a
    /// non-success HTTP status from the collector. (The original ignored
    /// 4xx/5xx responses, silently dropping the batch.)
    async fn export(&self, spans: &[SpanData]) -> Result<()> {
        let jaeger_spans = self.convert_to_jaeger_spans(spans)?;
        let batch = JaegerBatch {
            process: JaegerProcess {
                // NOTE(review): the service name is hard-coded; it should
                // come from configuration like the endpoints do.
                service_name: "sindhan-service".to_string(),
                tags: vec![],
            },
            spans: jaeger_spans,
        };
        let json_payload = serde_json::to_string(&batch)?;
        self.client
            .post(&format!("{}/api/traces", self.collector_endpoint))
            .header("Content-Type", "application/json")
            .body(json_payload)
            .send()
            .await?
            .error_for_status()?;
        Ok(())
    }

    fn name(&self) -> &'static str {
        "jaeger"
    }
}
impl JaegerExporter {
    /// Maps internal spans onto the Jaeger JSON model. Timestamps and
    /// durations are in microseconds. Spans with no end time yet are
    /// measured up to "now".
    ///
    /// # Errors
    /// Fails when a span's start time precedes the UNIX epoch or its end
    /// time precedes its start time (`duration_since` errors).
    fn convert_to_jaeger_spans(&self, spans: &[SpanData]) -> Result<Vec<JaegerSpan>> {
        spans.iter()
            .map(|span| {
                Ok(JaegerSpan {
                    trace_id: span.context.trace_id().to_string(),
                    span_id: span.context.span_id().to_string(),
                    parent_span_id: span.parent_span_id.map(|id| id.to_string()),
                    operation_name: span.name.clone(),
                    start_time: span.start_time.duration_since(UNIX_EPOCH)?.as_micros() as u64,
                    duration: span.end_time
                        // clippy redundant_closure: pass the fn directly.
                        .unwrap_or_else(SystemTime::now)
                        .duration_since(span.start_time)?
                        .as_micros() as u64,
                    tags: self.convert_attributes_to_tags(&span.attributes)?,
                    logs: self.convert_events_to_logs(&span.events)?,
                    // Per-span process info is omitted; the batch-level
                    // `JaegerProcess` carries the service identity.
                    process: None,
                })
            })
            .collect()
    }
    /// Converts span attributes into Jaeger key/value tags.
    fn convert_attributes_to_tags(&self, attributes: &HashMap<String, AttributeValue>) -> Result<Vec<JaegerTag>> {
        attributes.iter()
            .map(|(key, value)| {
                Ok(JaegerTag {
                    key: key.clone(),
                    value: self.attribute_value_to_string(value),
                    tag_type: self.get_jaeger_tag_type(value),
                })
            })
            .collect()
    }
    /// Converts span events into Jaeger logs. Only the event name is kept
    /// (as a single `event` field); event attributes are dropped.
    fn convert_events_to_logs(&self, events: &[Event]) -> Result<Vec<JaegerLog>> {
        events.iter()
            .map(|event| {
                Ok(JaegerLog {
                    timestamp: event.timestamp.duration_since(UNIX_EPOCH)?.as_micros() as u64,
                    fields: vec![
                        JaegerTag {
                            key: "event".to_string(),
                            value: event.name.clone(),
                            tag_type: "string".to_string(),
                        },
                    ],
                })
            })
            .collect()
    }
    /// Stringifies an attribute value; arrays fall back to `Debug` output.
    fn attribute_value_to_string(&self, value: &AttributeValue) -> String {
        match value {
            AttributeValue::String(s) => s.clone(),
            AttributeValue::Bool(b) => b.to_string(),
            AttributeValue::I64(i) => i.to_string(),
            AttributeValue::F64(f) => f.to_string(),
            AttributeValue::Array(arr) => format!("{:?}", arr),
        }
    }
    /// Chooses the Jaeger tag type label for an attribute value.
    // NOTE(review): Jaeger's JSON model distinguishes int64/float64 tag
    // types; both are collapsed to "number" here — confirm the collector
    // accepts this before relying on typed tag queries.
    fn get_jaeger_tag_type(&self, value: &AttributeValue) -> String {
        match value {
            AttributeValue::String(_) => "string",
            AttributeValue::Bool(_) => "bool",
            AttributeValue::I64(_) => "number",
            AttributeValue::F64(_) => "number",
            AttributeValue::Array(_) => "string",
        }
        .to_string()
    }
}
/// OTLP (OpenTelemetry Protocol) exporter
/// Ships spans as protobuf over HTTP to an OTLP-compatible collector.
pub struct OtlpExporter {
    // Base URL; spans are POSTed to `{endpoint}/v1/traces`.
    endpoint: String,
    // Extra HTTP headers (e.g. auth tokens) attached to every request.
    headers: HashMap<String, String>,
    // Reused HTTP client (connection pooling).
    client: reqwest::Client,
    // Per-request timeout applied to each export call.
    timeout: Duration,
}
impl OtlpExporter {
pub fn new(config: &OtlpExportConfig) -> Self {
Self {
endpoint: config.endpoint.clone(),
headers: config.headers.clone(),
client: reqwest::Client::new(),
timeout: config.timeout,
}
}
}
#[async_trait]
impl TracingExporter for OtlpExporter {
    /// Serializes the spans to OTLP protobuf and POSTs them to the
    /// collector's `/v1/traces` endpoint with the configured headers and
    /// per-request timeout.
    ///
    /// # Errors
    /// Fails on conversion/serialization errors, transport failures
    /// (including timeout), or a non-2xx collector response.
    async fn export(&self, spans: &[SpanData]) -> Result<()> {
        let otlp_request = self.create_otlp_request(spans)?;
        let protobuf_payload = self.serialize_to_protobuf(&otlp_request)?;
        let mut request = self.client
            .post(&format!("{}/v1/traces", self.endpoint))
            .header("Content-Type", "application/x-protobuf")
            .timeout(self.timeout)
            .body(protobuf_payload);
        for (key, value) in &self.headers {
            request = request.header(key, value);
        }
        // Surface collector-side failures (4xx/5xx) as errors instead of
        // treating any delivered request as a successful export.
        request.send().await?.error_for_status()?;
        Ok(())
    }
    /// Stable identifier used by the exporter registry.
    fn name(&self) -> &'static str {
        "otlp"
    }
}
impl OtlpExporter {
/// Builds the OTLP trace-export request structures from the internal
/// span representation.
///
/// # Panics
/// Currently unimplemented: always panics via `todo!`.
fn create_otlp_request(&self, spans: &[SpanData]) -> Result<OtlpTraceRequest> {
    // Implementation would create OTLP protobuf structures
    todo!("Implement OTLP protobuf conversion")
}
/// Serializes an OTLP trace request to its protobuf wire format.
///
/// # Panics
/// Currently unimplemented: always panics via `todo!`.
fn serialize_to_protobuf(&self, request: &OtlpTraceRequest) -> Result<Vec<u8>> {
    // Implementation would serialize to protobuf
    todo!("Implement protobuf serialization")
}
}
Complete Configuration System
Environment-based Configuration
/// Environment-driven configuration with validation
use std::env;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnvironmentConfig {
    // Logical service name (required; from SINDHAN_SERVICE_NAME).
    pub service_name: String,
    // Service version string; "unknown" when not provided.
    pub service_version: String,
    // Deployment environment, e.g. "development" or "production".
    pub environment: String,
    // Per-subsystem configuration sections, each loadable from env vars.
    pub metrics: MetricsConfig,
    pub logging: LoggingConfig,
    pub tracing: TracingConfig,
    pub export: ExportConfig,
}
impl EnvironmentConfig {
    /// Assembles the full configuration from `SINDHAN_*` environment
    /// variables; only the service name is mandatory.
    ///
    /// # Errors
    /// Fails when `SINDHAN_SERVICE_NAME` is unset or any subsystem
    /// configuration fails to load.
    pub fn from_env() -> Result<Self> {
        let service_name = env::var("SINDHAN_SERVICE_NAME")
            .context("SINDHAN_SERVICE_NAME environment variable is required")?;
        let service_version =
            env::var("SINDHAN_SERVICE_VERSION").unwrap_or_else(|_| "unknown".to_string());
        let environment =
            env::var("SINDHAN_ENVIRONMENT").unwrap_or_else(|_| "development".to_string());
        Ok(Self {
            service_name,
            service_version,
            environment,
            metrics: MetricsConfig::from_env()?,
            logging: LoggingConfig::from_env()?,
            tracing: TracingConfig::from_env()?,
            export: ExportConfig::from_env()?,
        })
    }
}
impl MetricsConfig {
pub fn from_env() -> Result<Self> {
Ok(Self {
enabled: env::var("SINDHAN_METRICS_ENABLED")
.unwrap_or_else(|_| "true".to_string())
.parse()
.unwrap_or(true),
collection_interval: env::var("SINDHAN_METRICS_COLLECTION_INTERVAL")
.unwrap_or_else(|_| "15s".to_string())
.parse()
.map_err(|_| anyhow::anyhow!("Invalid duration format"))?,
prometheus_endpoint: env::var("SINDHAN_METRICS_PROMETHEUS_ENDPOINT").ok(),
custom_metrics: Vec::new(), // Would be loaded from config file
sampling_rate: env::var("SINDHAN_METRICS_SAMPLING_RATE")
.unwrap_or_else(|_| "1.0".to_string())
.parse()
.unwrap_or(1.0),
})
}
}
impl LoggingConfig {
    /// Loads logging configuration from `SINDHAN_LOGGING_*` environment
    /// variables. Unrecognized level/format strings silently fall back
    /// to `Info` / `Json`.
    pub fn from_env() -> Result<Self> {
        // Read a boolean env var, defaulting to `true` when the variable
        // is missing or unparseable.
        let bool_var = |name: &str| -> bool {
            env::var(name)
                .unwrap_or_else(|_| "true".to_string())
                .parse()
                .unwrap_or(true)
        };
        let level = match env::var("SINDHAN_LOGGING_LEVEL")
            .unwrap_or_else(|_| "info".to_string())
            .to_lowercase()
            .as_str()
        {
            "trace" => LogLevel::Trace,
            "debug" => LogLevel::Debug,
            "warn" => LogLevel::Warn,
            "error" => LogLevel::Error,
            // "info" and anything unrecognized map to the default level.
            _ => LogLevel::Info,
        };
        let format = match env::var("SINDHAN_LOGGING_FORMAT")
            .unwrap_or_else(|_| "json".to_string())
            .to_lowercase()
            .as_str()
        {
            "text" => LogFormat::Text,
            "logfmt" => LogFormat::Logfmt,
            // "json" and anything unrecognized map to the default format.
            _ => LogFormat::Json,
        };
        Ok(Self {
            enabled: bool_var("SINDHAN_LOGGING_ENABLED"),
            level,
            format,
            structured: bool_var("SINDHAN_LOGGING_STRUCTURED"),
            correlation_tracking: bool_var("SINDHAN_LOGGING_CORRELATION_TRACKING"),
            outputs: vec![LogOutput::Stdout], // Default to stdout
        })
    }
}
impl TracingConfig {
/// Loads tracing configuration from `SINDHAN_TRACING_*` environment
/// variables; the sampling strategy defaults to `probabilistic:1.0`.
///
/// # Errors
/// Fails when `SINDHAN_TRACING_SAMPLING_STRATEGY` holds an
/// unrecognized strategy string.
pub fn from_env() -> Result<Self> {
    let sampling_strategy = Self::parse_sampling_strategy(
        &env::var("SINDHAN_TRACING_SAMPLING_STRATEGY")
            .unwrap_or_else(|_| "probabilistic:1.0".to_string()),
    )?;
    let enabled = env::var("SINDHAN_TRACING_ENABLED")
        .unwrap_or_else(|_| "true".to_string())
        .parse()
        .unwrap_or(true);
    let batch_export = env::var("SINDHAN_TRACING_BATCH_EXPORT")
        .unwrap_or_else(|_| "true".to_string())
        .parse()
        .unwrap_or(true);
    let max_export_batch_size = env::var("SINDHAN_TRACING_MAX_EXPORT_BATCH_SIZE")
        .unwrap_or_else(|_| "512".to_string())
        .parse()
        .unwrap_or(512);
    Ok(Self {
        enabled,
        sampling_strategy,
        jaeger_endpoint: env::var("SINDHAN_TRACING_JAEGER_ENDPOINT").ok(),
        otlp_endpoint: env::var("SINDHAN_TRACING_OTLP_ENDPOINT").ok(),
        batch_export,
        max_export_batch_size,
    })
}
/// Parses a sampling strategy spec: `always_on`, `always_off`,
/// `probabilistic:<rate>`, or `rate_limiting:<traces-per-second>`.
///
/// # Errors
/// Fails on unknown strategies, unparseable numbers, or a probabilistic
/// rate outside `[0.0, 1.0]`.
fn parse_sampling_strategy(strategy_str: &str) -> Result<SamplingStrategy> {
    match strategy_str {
        "always_on" => Ok(SamplingStrategy::AlwaysOn),
        "always_off" => Ok(SamplingStrategy::AlwaysOff),
        s if s.starts_with("probabilistic:") => {
            let rate_str = s.strip_prefix("probabilistic:").unwrap();
            let rate: f64 = rate_str.parse()
                .context("Invalid probabilistic sampling rate")?;
            // A probability outside [0, 1] (or NaN) is always a config
            // mistake; reject it instead of silently misbehaving later.
            if !(0.0..=1.0).contains(&rate) {
                return Err(anyhow::anyhow!(
                    "Probabilistic sampling rate must be in [0.0, 1.0], got {}",
                    rate
                ));
            }
            Ok(SamplingStrategy::Probabilistic(rate))
        }
        s if s.starts_with("rate_limiting:") => {
            let limit_str = s.strip_prefix("rate_limiting:").unwrap();
            let limit: u32 = limit_str.parse()
                .context("Invalid rate limiting value")?;
            Ok(SamplingStrategy::RateLimiting { max_traces_per_second: limit })
        }
        _ => Err(anyhow::anyhow!("Unknown sampling strategy: {}", strategy_str)),
    }
}
}
Complete Application Example
Full Working Example with All Components
// Complete example showing all observability features
use sindhan_observability::{
ObservabilityProvider, ObservabilityConfig,
metrics::{Counter, Histogram, Gauge, MetricsRegistry},
logging::{StructuredLogger, LogLevel, LogContext},
tracing::{Tracer, SpanBuilder},
instrumentation::{instrument, instrument_async},
exporters::{PrometheusExporter, JaegerExporter, OtlpExporter},
config::{MetricsConfig, LoggingConfig, TracingConfig},
};
use anyhow::Result;
use std::time::Duration;
use tokio::time::sleep;
use serde::{Deserialize, Serialize};
/// A stored user record as returned by the user service.
#[derive(Debug, Serialize, Deserialize)]
pub struct UserProfile {
    // UUID v4 assigned at creation time.
    pub id: String,
    pub name: String,
    pub email: String,
    // Creation timestamp in UTC.
    pub created_at: chrono::DateTime<chrono::Utc>,
}
/// Payload for user creation; validated by
/// `UserService::validate_create_request` (non-empty name, email
/// containing `@`).
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateUserRequest {
    pub name: String,
    pub email: String,
}
// Comprehensive service implementation
/// User CRUD/auth service with full observability wiring: every
/// operation is logged, traced, and recorded into the metric handles
/// registered at construction time.
pub struct UserService {
    observability: ObservabilityProvider,
    database: DatabaseConnection,
    cache: CacheConnection,
    // Service-specific metrics
    request_count: Counter,        // total requests, labeled by operation
    request_duration: Histogram,   // latency in seconds, labeled by status
    active_connections: Gauge,     // in-flight DB connections
    error_count: Counter,          // failures, labeled by error type
    // Business metrics
    user_registrations: Counter,
    user_logins: Counter,
    active_users: Gauge,
}
impl UserService {
/// Constructs the service and eagerly registers all service-level and
/// business metrics with the provider's metrics registry.
///
/// # Errors
/// Currently always succeeds; the `Result` leaves room for fallible
/// metric registration.
pub async fn new(
    observability: ObservabilityProvider,
    database: DatabaseConnection,
    cache: CacheConnection,
) -> Result<Self> {
    // NOTE(review): the provider trait declares `metrics()` as async but
    // it is called here without `.await`, while the `create_*` calls are
    // awaited — confirm which side has the intended signature.
    let metrics = observability.metrics();
    // Create service metrics
    let request_count = metrics.create_counter(
        "user_service_requests_total",
        "Total number of requests to user service",
        Labels::new().with("service", "user-service"),
    ).await;
    let request_duration = metrics.create_histogram(
        "user_service_request_duration_seconds",
        "Request duration in seconds",
        Labels::new().with("service", "user-service"),
    ).await;
    let active_connections = metrics.create_gauge(
        "user_service_active_connections",
        "Number of active database connections",
        Labels::new().with("service", "user-service"),
    ).await;
    let error_count = metrics.create_counter(
        "user_service_errors_total",
        "Total number of errors",
        Labels::new().with("service", "user-service"),
    ).await;
    // Business metrics
    let user_registrations = metrics.create_counter(
        "user_registrations_total",
        "Total number of user registrations",
        Labels::new(),
    ).await;
    let user_logins = metrics.create_counter(
        "user_logins_total",
        "Total number of user logins",
        Labels::new(),
    ).await;
    let active_users = metrics.create_gauge(
        "active_users_current",
        "Currently active users",
        Labels::new(),
    ).await;
    Ok(Self {
        observability,
        database,
        cache,
        request_count,
        request_duration,
        active_connections,
        error_count,
        user_registrations,
        user_logins,
        active_users,
    })
}
/// Creates a user: validates the request, rejects duplicate emails,
/// persists to the database, caches the new record, and updates
/// registration metrics. Fully instrumented — duration and outcome are
/// recorded regardless of success or failure.
///
/// # Errors
/// Fails on validation errors, duplicate email, or database/cache
/// failures.
#[instrument_async(
    name = "create_user",
    skip(self, request),
    fields(
        user.email = %request.email,
        user.name = %request.name
    )
)]
pub async fn create_user(&self, request: CreateUserRequest) -> Result<UserProfile> {
    let start_time = std::time::Instant::now();
    let logger = self.observability.logging();
    let tracer = self.observability.tracing();
    // Log the request start
    logger.info(
        "Creating new user",
        LogContext::new()
            .with_field("email", &request.email)
            .with_field("name", &request.name),
    );
    // Increment request counter
    self.request_count.increment(
        Labels::new()
            .with("operation", "create_user")
            .with("method", "POST")
    );
    // Create operation span
    let span = tracer.start_span("user_creation")
        .with_attribute("user.email", &request.email)
        .with_attribute("operation", "create");
    let _guard = span.enter();
    // Run the fallible work in an inner async block so metrics/logging
    // below always execute, whichever way it ends.
    let result = async {
        // Validate request
        self.validate_create_request(&request).await?;
        // Check if user already exists
        if self.user_exists_by_email(&request.email).await? {
            return Err(anyhow::anyhow!("User with email {} already exists", request.email));
        }
        // Create user in database
        let user = self.create_user_in_database(&request).await?;
        // Cache the user
        self.cache_user(&user).await?;
        // Update business metrics
        self.user_registrations.increment(Labels::new());
        self.active_users.increment(Labels::new());
        Ok(user)
    }.await;
    // Record metrics and logs
    let duration = start_time.elapsed();
    self.request_duration.record(
        duration.as_secs_f64(),
        Labels::new()
            .with("operation", "create_user")
            .with("status", if result.is_ok() { "success" } else { "error" })
    );
    match &result {
        Ok(user) => {
            logger.info(
                "User created successfully",
                LogContext::new()
                    .with_field("user_id", &user.id)
                    .with_field("email", &user.email)
                    .with_field("duration_ms", duration.as_millis()),
            );
            span.set_attribute("user.id", &user.id);
            span.set_status(SpanStatus::Ok);
        }
        Err(error) => {
            self.error_count.increment(
                Labels::new()
                    .with("operation", "create_user")
                    .with("error_type", error.type_name())
            );
            logger.error(
                "Failed to create user",
                LogContext::new()
                    .with_field("email", &request.email)
                    .with_field("duration_ms", duration.as_millis())
                    .with_error(error),
            );
            span.record_exception(error);
            span.set_status(SpanStatus::error(error.to_string()));
        }
    }
    result
}
/// Fetches a user by id, cache-first with database fallback; on a cache
/// miss the database result is written back to the cache. Duration and
/// outcome are recorded either way.
///
/// # Errors
/// Fails on cache or database errors; a missing user is `Ok(None)`.
#[instrument_async(
    name = "get_user",
    skip(self),
    fields(user.id = %user_id)
)]
pub async fn get_user(&self, user_id: &str) -> Result<Option<UserProfile>> {
    let start_time = std::time::Instant::now();
    let logger = self.observability.logging();
    let tracer = self.observability.tracing();
    logger.debug(
        "Fetching user",
        LogContext::new().with_field("user_id", user_id),
    );
    self.request_count.increment(
        Labels::new()
            .with("operation", "get_user")
            .with("method", "GET")
    );
    // Try cache first
    let result = match self.get_user_from_cache(user_id).await? {
        Some(user) => {
            logger.debug(
                "User found in cache",
                LogContext::new().with_field("user_id", user_id),
            );
            Ok(Some(user))
        }
        None => {
            // Fallback to database
            let user = self.get_user_from_database(user_id).await?;
            if let Some(ref user) = user {
                // Cache for future requests
                self.cache_user(user).await?;
            }
            Ok(user)
        }
    };
    let duration = start_time.elapsed();
    // NOTE: a cache/DB error returns before this point via `?`, so the
    // "error" status branch here is currently unreachable.
    self.request_duration.record(
        duration.as_secs_f64(),
        Labels::new()
            .with("operation", "get_user")
            .with("status", if result.is_ok() { "success" } else { "error" })
    );
    result
}
/// Verifies a user's password, recording login metrics labeled by
/// outcome (success / failed / error) and logging each case. The
/// password is skipped from trace fields.
///
/// # Errors
/// Fails only on verification errors; wrong credentials are `Ok(false)`.
#[instrument_async(
    name = "authenticate_user",
    skip(self, password),
    fields(user.email = %email)
)]
pub async fn authenticate_user(&self, email: &str, password: &str) -> Result<bool> {
    let start_time = std::time::Instant::now();
    let logger = self.observability.logging();
    logger.info(
        "Authenticating user",
        LogContext::new().with_field("email", email),
    );
    self.request_count.increment(
        Labels::new()
            .with("operation", "authenticate")
            .with("method", "POST")
    );
    let result = self.verify_password(email, password).await;
    match &result {
        Ok(true) => {
            self.user_logins.increment(
                Labels::new().with("status", "success")
            );
            logger.info(
                "User authenticated successfully",
                LogContext::new().with_field("email", email),
            );
        }
        Ok(false) => {
            self.user_logins.increment(
                Labels::new().with("status", "failed")
            );
            logger.warn(
                "Authentication failed - invalid credentials",
                LogContext::new().with_field("email", email),
            );
        }
        Err(error) => {
            self.error_count.increment(
                Labels::new()
                    .with("operation", "authenticate")
                    .with("error_type", error.type_name())
            );
            logger.error(
                "Authentication error",
                LogContext::new()
                    .with_field("email", email)
                    .with_error(error),
            );
        }
    }
    let duration = start_time.elapsed();
    self.request_duration.record(
        duration.as_secs_f64(),
        Labels::new()
            .with("operation", "authenticate")
            .with("status", match &result {
                Ok(true) => "success",
                Ok(false) => "failed",
                Err(_) => "error",
            })
    );
    result
}
// Helper methods with instrumentation
/// Rejects requests with an empty name or a syntactically invalid
/// (missing `@`) email address.
#[instrument_async(skip(self))]
async fn validate_create_request(&self, request: &CreateUserRequest) -> Result<()> {
    let name_ok = !request.name.is_empty();
    let email_ok = !request.email.is_empty() && request.email.contains('@');
    if !name_ok {
        Err(anyhow::anyhow!("Name cannot be empty"))
    } else if !email_ok {
        Err(anyhow::anyhow!("Invalid email format"))
    } else {
        Ok(())
    }
}
/// Checks whether a user with the given email already exists.
// NOTE(review): stub — the `email` argument is not actually queried and
// the result is always `false`; confirm before relying on duplicate
// detection.
#[instrument_async(skip(self))]
async fn user_exists_by_email(&self, email: &str) -> Result<bool> {
    // Database query with automatic instrumentation
    let span = self.observability.tracing()
        .start_span("db_query")
        .with_attribute("db.operation", "SELECT")
        .with_attribute("db.table", "users");
    let _guard = span.enter();
    // Simulate database call
    sleep(Duration::from_millis(10)).await;
    Ok(false) // Simplified
}
/// Inserts a new user row (simulated), assigning a fresh UUID v4 id and
/// a UTC creation timestamp. Tracks the in-flight connection count via
/// the `active_connections` gauge around the call.
#[instrument_async(skip(self, request))]
async fn create_user_in_database(&self, request: &CreateUserRequest) -> Result<UserProfile> {
    let span = self.observability.tracing()
        .start_span("db_insert")
        .with_attribute("db.operation", "INSERT")
        .with_attribute("db.table", "users");
    let _guard = span.enter();
    self.active_connections.increment(Labels::new());
    // Simulate database call
    sleep(Duration::from_millis(25)).await;
    let user = UserProfile {
        id: uuid::Uuid::new_v4().to_string(),
        name: request.name.clone(),
        email: request.email.clone(),
        created_at: chrono::Utc::now(),
    };
    // NOTE: if an early return were added between increment and here,
    // the gauge would leak; fine today since nothing in between fails.
    self.active_connections.decrement(Labels::new());
    Ok(user)
}
/// Writes the user into the cache under the key `user:{id}` (simulated).
#[instrument_async(skip(self, user))]
async fn cache_user(&self, user: &UserProfile) -> Result<()> {
    let cache_key = format!("user:{}", user.id);
    let span = self.observability.tracing()
        .start_span("cache_set")
        .with_attribute("cache.operation", "SET")
        .with_attribute("cache.key", cache_key);
    let _guard = span.enter();
    // Simulated cache write latency.
    sleep(Duration::from_millis(5)).await;
    Ok(())
}
/// Looks the user up in the cache (simulated — always a miss).
#[instrument_async(skip(self))]
async fn get_user_from_cache(&self, user_id: &str) -> Result<Option<UserProfile>> {
    let cache_key = format!("user:{}", user_id);
    let span = self.observability.tracing()
        .start_span("cache_get")
        .with_attribute("cache.operation", "GET")
        .with_attribute("cache.key", cache_key);
    let _guard = span.enter();
    // Simulated cache lookup latency.
    sleep(Duration::from_millis(2)).await;
    // Simplified - cache miss
    Ok(None)
}
/// Loads a user from the database (simulated — always returns a stub
/// record echoing the requested id). Tracks the in-flight connection
/// count around the query.
#[instrument_async(skip(self))]
async fn get_user_from_database(&self, user_id: &str) -> Result<Option<UserProfile>> {
    let span = self.observability.tracing()
        .start_span("db_query")
        .with_attribute("db.operation", "SELECT")
        .with_attribute("db.table", "users")
        .with_attribute("user.id", user_id);
    let _guard = span.enter();
    self.active_connections.increment(Labels::new());
    // Simulate database query
    sleep(Duration::from_millis(15)).await;
    self.active_connections.decrement(Labels::new());
    // Simulate found user
    Ok(Some(UserProfile {
        id: user_id.to_string(),
        name: "Test User".to_string(),
        email: "test@example.com".to_string(),
        created_at: chrono::Utc::now(),
    }))
}
/// Verifies the user's password (simulated).
// SECURITY NOTE(review): plain `==` string comparison is not
// constant-time and the expected password is hard-coded; a real
// implementation must use a password hash with a constant-time verify
// (e.g. bcrypt/argon2), which the 100ms sleep only simulates.
#[instrument_async(skip(self, password))]
async fn verify_password(&self, email: &str, password: &str) -> Result<bool> {
    let span = self.observability.tracing()
        .start_span("password_verify")
        .with_attribute("user.email", email);
    let _guard = span.enter();
    // Simulate password verification
    sleep(Duration::from_millis(100)).await; // Intentionally slow for bcrypt simulation
    Ok(password == "correct_password") // Simplified
}
}
// Application initialization and shutdown
/// Example entry point: boots observability, constructs the service,
/// spawns the background metric/health tasks, runs a simulated
/// workload, then shuts down gracefully (flush before shutdown so no
/// telemetry is dropped).
#[tokio::main]
async fn main() -> Result<()> {
    // Initialize observability
    let observability = initialize_observability().await?;
    let logger = observability.logging();
    logger.info(
        "Starting user service",
        LogContext::new()
            .with_field("version", env!("CARGO_PKG_VERSION"))
            .with_field("environment", std::env::var("ENVIRONMENT").unwrap_or_else(|_| "development".to_string())),
    );
    // Initialize service
    let database = DatabaseConnection::new().await?;
    let cache = CacheConnection::new().await?;
    let user_service = UserService::new(observability.clone(), database, cache).await?;
    // Start background tasks
    let metrics_task = tokio::spawn(metrics_collection_task(observability.clone()));
    let health_check_task = tokio::spawn(health_check_task(observability.clone()));
    // Simulate some operations
    simulate_user_operations(&user_service).await?;
    // Graceful shutdown
    logger.info("Shutting down user service", LogContext::new());
    // Cancel background tasks (abort is fine: the loops hold no state
    // that needs draining).
    metrics_task.abort();
    health_check_task.abort();
    // Flush all observability data
    observability.flush().await?;
    observability.shutdown().await?;
    Ok(())
}
/// Builds the observability provider, preferring a TOML config file
/// (path from `OBSERVABILITY_CONFIG`) and falling back to environment
/// variables when the variable is unset.
async fn initialize_observability() -> Result<ObservabilityProvider> {
    let config = match std::env::var("OBSERVABILITY_CONFIG") {
        Ok(config_path) => {
            // Load from file
            let content = tokio::fs::read_to_string(config_path).await?;
            toml::from_str(&content)?
        }
        // Use environment variables
        Err(_) => ObservabilityConfig::from_env()?,
    };
    ObservabilityProvider::from_config(config).await
}
/// Drives a small demo workload: creates five users, then performs ten
/// lookup+authenticate rounds, printing each outcome.
// NOTE(review): created users get random UUID ids while the lookup loop
// uses synthetic "user-{n}" ids — this works only because the simulated
// database returns a stub user for any id.
async fn simulate_user_operations(service: &UserService) -> Result<()> {
    // Create a few users
    for i in 0..5 {
        let request = CreateUserRequest {
            name: format!("User {}", i),
            email: format!("user{}@example.com", i),
        };
        match service.create_user(request).await {
            Ok(user) => println!("Created user: {}", user.id),
            Err(e) => println!("Failed to create user: {}", e),
        }
        // Small delay
        sleep(Duration::from_millis(100)).await;
    }
    // Simulate some gets and authentications
    for i in 0..10 {
        let user_id = format!("user-{}", i % 3);
        // Get user
        match service.get_user(&user_id).await {
            Ok(Some(user)) => {
                // Try authentication
                let auth_result = service.authenticate_user(&user.email, "correct_password").await;
                println!("Auth result for {}: {:?}", user.email, auth_result);
            }
            Ok(None) => println!("User {} not found", user_id),
            Err(e) => println!("Error getting user {}: {}", user_id, e),
        }
        sleep(Duration::from_millis(50)).await;
    }
    Ok(())
}
// Background task for system metrics
/// Samples simulated system CPU/memory usage every 30 seconds and
/// publishes them as gauges; runs until the owning task is aborted.
async fn metrics_collection_task(observability: ObservabilityProvider) {
    // NOTE(review): `metrics()`/`logging()` are declared async on the
    // provider trait but called here without `.await` — confirm the
    // intended signatures.
    let metrics = observability.metrics();
    let logger = observability.logging();
    let system_cpu = metrics.create_gauge(
        "system_cpu_usage_percent",
        "System CPU usage percentage",
        Labels::new(),
    ).await;
    let system_memory = metrics.create_gauge(
        "system_memory_usage_bytes",
        "System memory usage in bytes",
        Labels::new(),
    ).await;
    let mut interval = tokio::time::interval(Duration::from_secs(30));
    loop {
        interval.tick().await;
        // Simulate system metrics collection
        let cpu_usage = (rand::random::<f64>() * 100.0).min(100.0);
        let memory_usage = (rand::random::<f64>() * 8_000_000_000.0); // Up to 8GB
        system_cpu.set(cpu_usage, Labels::new());
        system_memory.set(memory_usage, Labels::new());
        logger.trace(
            "System metrics collected",
            LogContext::new()
                .with_field("cpu_usage", cpu_usage)
                .with_field("memory_usage", memory_usage),
        );
    }
}
// Health check task
/// Runs the health probes every 60 seconds and publishes the result as
/// a 1.0/0.0 gauge, logging failures at warn level; runs until aborted.
async fn health_check_task(observability: ObservabilityProvider) {
    let metrics = observability.metrics();
    let logger = observability.logging();
    let health_status = metrics.create_gauge(
        "service_health_status",
        "Service health status (1=healthy, 0=unhealthy)",
        Labels::new(),
    ).await;
    let mut ticker = tokio::time::interval(Duration::from_secs(60));
    loop {
        ticker.tick().await;
        // Perform health checks and publish the outcome.
        let healthy = perform_health_checks().await;
        let status_value = if healthy { 1.0 } else { 0.0 };
        health_status.set(status_value, Labels::new());
        if healthy {
            logger.debug("Health check passed", LogContext::new());
        } else {
            logger.warn("Health check failed", LogContext::new());
        }
    }
}
/// Runs the dependency probes; currently a stub that always reports
/// healthy after a short simulated delay.
async fn perform_health_checks() -> bool {
    // Simulated probe latency.
    sleep(Duration::from_millis(50)).await;
    // Always healthy in this example.
    true
}
// Mock types for the example
/// Stand-in database handle used by the example; carries no state.
struct DatabaseConnection;
impl DatabaseConnection {
    // Always succeeds; a real implementation would open a pool here.
    async fn new() -> Result<Self> { Ok(Self) }
}
/// Stand-in cache handle used by the example; carries no state.
struct CacheConnection;
impl CacheConnection {
    // Always succeeds; a real implementation would connect here.
    async fn new() -> Result<Self> { Ok(Self) }
}
This implementation provides a comprehensive, type-safe, and high-performance observability framework with async-first design principles and zero-cost abstractions. It includes complete configuration management, export pipelines, instrumentation patterns, and real-world usage examples.