Platform Observability Test Plan
This comprehensive test plan follows Test-Driven Development (TDD) principles to ensure systematic testing of all Platform Observability functionalities. The plan covers unit tests, integration tests, performance benchmarks, and end-to-end observability scenarios with a focus on type safety and async behavior.
Test-Driven Development Strategy
TDD Cycle Implementation
Red-Green-Refactor Cycle:
- Red: Write failing tests first for observability components
- Green: Write minimal Rust code to make tests pass
- Refactor: Optimize for performance while maintaining observability guarantees
Test Pyramid Structure
Unit Test Specifications
1. Metrics System Tests
1.1 Counter Tests
// Unit tests for the Counter metric type: construction, increment/add
// semantics, thread safety under concurrent tasks, per-label series, and
// Prometheus-style metric-family export.
// NOTE(review): the API under test (MetricsRegistry, MetricsConfig, Labels,
// MetricType, MetricValue) is expected to be provided by `super::*`.
#[cfg(test)]
mod counter_tests {
use super::*;
use std::sync::Arc;
use tokio::time::Duration;
// A new counter starts at zero; increment() adds 1, add(n) adds n.
#[tokio::test]
async fn test_counter_creation_and_increment() {
// Arrange
let registry = MetricsRegistry::new(MetricsConfig::default());
let labels = Labels::new().with("service", "test-service");
// Act
let counter = registry.create_counter(
"test_counter",
"Test counter description",
labels.clone()
).await;
counter.increment(Labels::new());
counter.add(5, Labels::new());
// Assert
assert_eq!(counter.get(), 6); // 1 (increment) + 5 (add)
let metric_family = counter.to_metric_family();
assert_eq!(metric_family.name, "test_counter");
assert_eq!(metric_family.description, "Test counter description");
assert_eq!(metric_family.metric_type, MetricType::Counter);
}
// 100 tasks x 10 increments each must total exactly 1000 — verifies the
// counter's internal synchronization loses no updates under contention.
#[tokio::test]
async fn test_counter_thread_safety() {
// Arrange
let registry = MetricsRegistry::new(MetricsConfig::default());
let counter = Arc::new(
registry.create_counter(
"concurrent_counter",
"Concurrent test counter",
Labels::new().with("test", "concurrency")
).await
);
// Act - Spawn multiple tasks incrementing the counter
let mut handles = Vec::new();
for i in 0..100 {
let counter_clone = counter.clone();
let handle = tokio::spawn(async move {
for _ in 0..10 {
counter_clone.increment(Labels::new().with("thread", &i.to_string()));
tokio::time::sleep(Duration::from_millis(1)).await;
}
});
handles.push(handle);
}
// Wait for all tasks to complete
for handle in handles {
handle.await.unwrap();
}
// Assert
assert_eq!(counter.get(), 1000); // 100 tasks * 10 increments each
}
// Counters that share a name but differ in labels are independent series.
#[tokio::test]
async fn test_counter_with_different_labels() {
// Arrange
let registry = MetricsRegistry::new(MetricsConfig::default());
// Act
let counter1 = registry.create_counter(
"http_requests",
"HTTP requests",
Labels::new().with("method", "GET").with("status", "200")
).await;
let counter2 = registry.create_counter(
"http_requests",
"HTTP requests",
Labels::new().with("method", "POST").with("status", "201")
).await;
counter1.increment(Labels::new());
counter1.increment(Labels::new());
counter2.add(3, Labels::new());
// Assert
assert_eq!(counter1.get(), 2);
assert_eq!(counter2.get(), 3);
let metrics = registry.export_metrics().await.unwrap();
// NOTE(review): this assumes each label combination exports as its own
// top-level entry; Prometheus convention merges same-named metrics into
// ONE family containing two series — confirm the intended export model.
assert_eq!(metrics.len(), 2); // Two different label combinations
}
// The exported family must carry the description and the exact counter value.
#[tokio::test]
async fn test_counter_metric_family_export() {
// Arrange
let registry = MetricsRegistry::new(MetricsConfig::default());
let counter = registry.create_counter(
"export_test_counter",
"Counter for export testing",
Labels::new().with("env", "test")
).await;
counter.add(42, Labels::new());
// Act
let metric_families = registry.export_metrics().await.unwrap();
// Assert
assert!(!metric_families.is_empty());
let counter_family = metric_families.iter()
.find(|family| family.name == "export_test_counter")
.expect("Counter metric family should exist");
assert_eq!(counter_family.description, "Counter for export testing");
assert_eq!(counter_family.metrics.len(), 1);
if let MetricValue::Counter(value) = &counter_family.metrics[0].value {
assert_eq!(*value, 42);
} else {
panic!("Expected counter metric value");
}
}
}
1.2 Gauge Tests
// Unit tests for the Gauge metric type: set/increment/decrement/add
// semantics, concurrent modification safety, and export.
// FIX(review): the original asserted exact f64 equality on values produced
// by floating-point arithmetic (3.14 + 1.0 - 1.0, 3.14 - 1.14), which can
// fail due to rounding. Comparisons on computed values now use a small
// tolerance; exact comparisons are kept only where the value is stored
// verbatim by set(), or where the arithmetic is exact (sums of ±1.0).
#[cfg(test)]
mod gauge_tests {
use super::*;
// Tolerance for comparing gauge values produced by f64 arithmetic.
const EPS: f64 = 1e-9;
#[tokio::test]
async fn test_gauge_set_and_get() {
// Arrange
let registry = MetricsRegistry::new(MetricsConfig::default());
let gauge = registry.create_gauge(
"test_gauge",
"Test gauge description",
Labels::new().with("type", "test")
).await;
// Act & Assert
gauge.set(3.14, Labels::new());
// set() stores the value verbatim, so exact comparison is safe here.
assert_eq!(gauge.get(), 3.14);
gauge.increment(Labels::new());
assert!((gauge.get() - 4.14).abs() < EPS); // 3.14 + 1.0
gauge.decrement(Labels::new());
assert!((gauge.get() - 3.14).abs() < EPS); // 4.14 - 1.0
gauge.add(-1.14, Labels::new());
assert!((gauge.get() - 2.0).abs() < EPS); // 3.14 - 1.14
}
// 50 concurrent increments minus 30 concurrent decrements must net +20.
#[tokio::test]
async fn test_gauge_concurrent_modifications() {
// Arrange
let registry = MetricsRegistry::new(MetricsConfig::default());
let gauge = Arc::new(
registry.create_gauge(
"concurrent_gauge",
"Concurrent test gauge",
Labels::new()
).await
);
// Act - Concurrent increments and decrements
let mut handles = Vec::new();
// 50 tasks incrementing
for _ in 0..50 {
let gauge_clone = gauge.clone();
handles.push(tokio::spawn(async move {
gauge_clone.increment(Labels::new());
}));
}
// 30 tasks decrementing
for _ in 0..30 {
let gauge_clone = gauge.clone();
handles.push(tokio::spawn(async move {
gauge_clone.decrement(Labels::new());
}));
}
for handle in handles {
handle.await.unwrap();
}
// Assert
// Sums of ±1.0 are exactly representable in f64, so equality is reliable.
assert_eq!(gauge.get(), 20.0); // 50 increments - 30 decrements
}
// The exported gauge family must carry exactly the value that was set.
#[tokio::test]
async fn test_gauge_metric_export() {
// Arrange
let registry = MetricsRegistry::new(MetricsConfig::default());
let gauge = registry.create_gauge(
"memory_usage",
"Memory usage in bytes",
Labels::new().with("component", "cache")
).await;
gauge.set(1024.0 * 1024.0 * 512.0, Labels::new()); // 512 MB
// Act
let metric_families = registry.export_metrics().await.unwrap();
// Assert
let gauge_family = metric_families.iter()
.find(|family| family.name == "memory_usage")
.expect("Gauge metric family should exist");
if let MetricValue::Gauge(value) = &gauge_family.metrics[0].value {
// Exact comparison is safe: the stored value is a power-of-two product
// and set() performs no arithmetic on it.
assert_eq!(*value, 1024.0 * 1024.0 * 512.0);
} else {
panic!("Expected gauge metric value");
}
}
}
1.3 Histogram Tests
// Unit tests for the Histogram metric type: observation recording, bucket
// accounting, performance under load, concurrent recording, and export.
// NOTE(review): several asserts index `bucket_counts` directly — they assume
// the registry's DEFAULT bucket boundaries (Prometheus-style: 0.005, 0.01,
// 0.025, …, +Inf, 12 buckets total) and that counts are CUMULATIVE. Confirm
// both against the implementation before relying on these tests.
#[cfg(test)]
mod histogram_tests {
use super::*;
#[tokio::test]
async fn test_histogram_record_and_buckets() {
// Arrange
let registry = MetricsRegistry::new(MetricsConfig::default());
let histogram = registry.create_histogram(
"request_duration",
"Request duration in seconds",
Labels::new().with("endpoint", "/api/users")
).await;
// Act - Record various values
histogram.record(0.001, Labels::new()); // 1ms
histogram.record(0.050, Labels::new()); // 50ms
histogram.record(0.100, Labels::new()); // 100ms
histogram.record(1.500, Labels::new()); // 1.5s
histogram.record(10.000, Labels::new()); // 10s
// Assert
assert_eq!(histogram.get_count(), 5);
// Sum compared with a tolerance because it is accumulated f64 arithmetic.
assert!((histogram.get_sum() - 11.651).abs() < 0.001);
let bucket_counts = histogram.get_bucket_counts();
// NOTE(review): indices 0/2/11 assume the default boundary layout above.
assert!(bucket_counts[0] >= 1); // 0.005 bucket should have at least 1 (0.001)
assert!(bucket_counts[2] >= 2); // 0.025 bucket should have at least 2 (0.001, 0.050)
assert!(bucket_counts[11] == 5); // Infinity bucket should have all 5
}
#[tokio::test]
async fn test_histogram_performance_with_many_observations() {
// Arrange
let registry = MetricsRegistry::new(MetricsConfig::default());
let histogram = registry.create_histogram(
"performance_test",
"Performance test histogram",
Labels::new()
).await;
let start_time = std::time::Instant::now();
// Act - Record 100,000 observations
for i in 0..100_000 {
let value = (i as f64) / 100_000.0; // Values from 0 to 1
histogram.record(value, Labels::new());
}
let duration = start_time.elapsed();
// Assert
assert_eq!(histogram.get_count(), 100_000);
// NOTE(review): wall-clock thresholds are flaky on loaded CI machines.
assert!(duration.as_millis() < 100); // Should complete in under 100ms
let bucket_counts = histogram.get_bucket_counts();
// NOTE(review): with values in [0, 1), buckets above 1.0 are non-empty
// only if counts are cumulative — confirm cumulative semantics.
assert!(bucket_counts.iter().all(|&count| count > 0)); // All buckets should have some data
}
// 10 tasks x 1000 recordings each must total exactly 10,000 — verifies
// the histogram's internal synchronization under concurrent recording.
#[tokio::test]
async fn test_histogram_concurrent_recording() {
// Arrange
let registry = MetricsRegistry::new(MetricsConfig::default());
let histogram = Arc::new(
registry.create_histogram(
"concurrent_histogram",
"Concurrent recording test",
Labels::new()
).await
);
// Act - Concurrent recording from multiple tasks
let mut handles = Vec::new();
for task_id in 0..10 {
let histogram_clone = histogram.clone();
handles.push(tokio::spawn(async move {
for i in 0..1000 {
let value = (task_id as f64 + i as f64) / 1000.0;
histogram_clone.record(value, Labels::new());
}
}));
}
for handle in handles {
handle.await.unwrap();
}
// Assert
assert_eq!(histogram.get_count(), 10_000); // 10 tasks * 1000 recordings each
assert!(histogram.get_sum() > 0.0);
let bucket_counts = histogram.get_bucket_counts();
// The +Inf bucket (last) equals the total count when counts are cumulative.
assert_eq!(bucket_counts[bucket_counts.len() - 1], 10_000); // All values in infinity bucket
}
// Exported histogram must expose count, sum, and monotone cumulative buckets.
#[tokio::test]
async fn test_histogram_metric_export() {
// Arrange
let registry = MetricsRegistry::new(MetricsConfig::default());
let histogram = registry.create_histogram(
"response_time",
"HTTP response time",
Labels::new().with("method", "GET")
).await;
// Record some test values
histogram.record(0.001, Labels::new());
histogram.record(0.005, Labels::new());
histogram.record(0.010, Labels::new());
histogram.record(0.100, Labels::new());
// Act
let metric_families = registry.export_metrics().await.unwrap();
// Assert
let histogram_family = metric_families.iter()
.find(|family| family.name == "response_time")
.expect("Histogram metric family should exist");
if let MetricValue::Histogram(hist_value) = &histogram_family.metrics[0].value {
assert_eq!(hist_value.sample_count, 4);
assert!((hist_value.sample_sum - 0.116).abs() < 0.001);
assert!(!hist_value.buckets.is_empty());
// Check that bucket counts are cumulative
let mut prev_count = 0;
for bucket in &hist_value.buckets {
assert!(bucket.cumulative_count >= prev_count);
prev_count = bucket.cumulative_count;
}
} else {
panic!("Expected histogram metric value");
}
}
}
2. Structured Logging Tests
2.1 Logger Creation and Configuration Tests
// Unit tests for the structured logger: builder configuration, contextual
// fields, level filtering, correlation-id propagation, error capture,
// arbitrary serde payloads, and concurrent logging.
// NOTE(review): LogOutput::write is assumed to receive one fully-serialized
// JSON line per log call — every test parses the captured strings as JSON.
#[cfg(test)]
mod structured_logging_tests {
use super::*;
use std::sync::{Arc, Mutex};
// Test double that captures emitted log lines in memory. The Arc is cloned
// by each test before the output is moved into the logger, so the test
// retains a handle to the captured lines.
struct TestLogOutput {
logs: Arc<Mutex<Vec<String>>>,
}
impl TestLogOutput {
fn new() -> Self {
Self {
logs: Arc::new(Mutex::new(Vec::new())),
}
}
// NOTE(review): currently unused — tests clone the `logs` Arc directly.
// Keep or delete once the access pattern is settled.
fn get_logs(&self) -> Vec<String> {
self.logs.lock().unwrap().clone()
}
}
impl LogOutput for TestLogOutput {
fn write(&self, message: &str) {
self.logs.lock().unwrap().push(message.to_string());
}
}
// Builder must surface the configured identity fields on the built logger.
#[tokio::test]
async fn test_structured_logger_creation() {
// Arrange & Act
let logger = StructuredLogger::builder()
.with_service_name("test-service")
.with_service_version("1.0.0")
.with_environment("test")
.with_level(LogLevel::Debug)
.build();
// Assert
assert!(logger.is_ok());
let logger = logger.unwrap();
assert_eq!(logger.service_name, "test-service");
assert_eq!(logger.service_version, "1.0.0");
assert_eq!(logger.environment, "test");
assert_eq!(logger.level, LogLevel::Debug);
}
// Context fields must be flattened into the emitted JSON object.
#[tokio::test]
async fn test_structured_logging_with_context() {
// Arrange
let test_output = TestLogOutput::new();
let logs_ref = test_output.logs.clone();
let logger = StructuredLogger::builder()
.with_service_name("context-test-service")
.with_service_version("2.0.0")
.with_environment("test")
.add_output(Box::new(test_output))
.build()
.unwrap();
let context = LogContext::new()
.with_field("user_id", "user-123")
.with_field("request_id", "req-456")
.with_field("duration_ms", 42);
// Act
logger.info("User action completed", context);
// Assert
let logs = logs_ref.lock().unwrap();
assert_eq!(logs.len(), 1);
let log_entry: serde_json::Value = serde_json::from_str(&logs[0]).unwrap();
assert_eq!(log_entry["level"], "INFO");
assert_eq!(log_entry["message"], "User action completed");
assert_eq!(log_entry["service"], "context-test-service");
assert_eq!(log_entry["user_id"], "user-123");
assert_eq!(log_entry["request_id"], "req-456");
assert_eq!(log_entry["duration_ms"], 42);
}
// Messages below the configured level must be dropped, not written.
#[tokio::test]
async fn test_log_level_filtering() {
// Arrange
let test_output = TestLogOutput::new();
let logs_ref = test_output.logs.clone();
let logger = StructuredLogger::builder()
.with_service_name("level-test-service")
.with_level(LogLevel::Warn) // Only warn and above
.add_output(Box::new(test_output))
.build()
.unwrap();
// Act
logger.debug("Debug message", LogContext::new());
logger.info("Info message", LogContext::new());
logger.warn("Warning message", LogContext::new());
logger.error("Error message", LogContext::new());
// Assert
let logs = logs_ref.lock().unwrap();
assert_eq!(logs.len(), 2); // Only warn and error should be logged
let log1: serde_json::Value = serde_json::from_str(&logs[0]).unwrap();
let log2: serde_json::Value = serde_json::from_str(&logs[1]).unwrap();
assert_eq!(log1["level"], "WARN");
assert_eq!(log1["message"], "Warning message");
assert_eq!(log2["level"], "ERROR");
assert_eq!(log2["message"], "Error message");
}
// Two log calls sharing a context (cloned) must emit the same correlation id.
#[tokio::test]
async fn test_correlation_id_propagation() {
// Arrange
let test_output = TestLogOutput::new();
let logs_ref = test_output.logs.clone();
let logger = StructuredLogger::builder()
.with_service_name("correlation-test")
.add_output(Box::new(test_output))
.build()
.unwrap();
let correlation_id = CorrelationId::generate();
let context = LogContext::new().with_correlation_id(correlation_id);
// Act
logger.info("First message", context.clone());
logger.error("Second message", context);
// Assert
let logs = logs_ref.lock().unwrap();
assert_eq!(logs.len(), 2);
let log1: serde_json::Value = serde_json::from_str(&logs[0]).unwrap();
let log2: serde_json::Value = serde_json::from_str(&logs[1]).unwrap();
assert_eq!(log1["correlation_id"], log2["correlation_id"]);
assert!(!log1["correlation_id"].as_str().unwrap().is_empty());
}
// with_error must serialize the error as a structured object with a message.
#[tokio::test]
async fn test_error_logging_with_context() {
// Arrange
let test_output = TestLogOutput::new();
let logs_ref = test_output.logs.clone();
let logger = StructuredLogger::builder()
.with_service_name("error-test")
.add_output(Box::new(test_output))
.build()
.unwrap();
let error = anyhow::anyhow!("Database connection failed");
// NOTE(review): `&*error` derefs anyhow::Error to &dyn std::error::Error —
// with_error is assumed to accept a &dyn Error; confirm the signature.
let context = LogContext::new()
.with_field("operation", "user_lookup")
.with_error(&*error);
// Act
logger.error("Database operation failed", context);
// Assert
let logs = logs_ref.lock().unwrap();
assert_eq!(logs.len(), 1);
let log_entry: serde_json::Value = serde_json::from_str(&logs[0]).unwrap();
assert_eq!(log_entry["level"], "ERROR");
assert_eq!(log_entry["operation"], "user_lookup");
assert!(log_entry["error"].is_object());
assert_eq!(log_entry["error"]["message"], "Database connection failed");
}
// with_structured_data must flatten a serde-serializable payload's top-level
// fields into the emitted JSON object.
#[tokio::test]
async fn test_structured_data_logging() {
// Arrange
#[derive(serde::Serialize)]
struct UserAction {
action_type: String,
user_id: String,
resource_id: String,
metadata: serde_json::Value,
}
let test_output = TestLogOutput::new();
let logs_ref = test_output.logs.clone();
let logger = StructuredLogger::builder()
.with_service_name("structured-test")
.add_output(Box::new(test_output))
.build()
.unwrap();
let user_action = UserAction {
action_type: "file_upload".to_string(),
user_id: "user-789".to_string(),
resource_id: "file-456".to_string(),
metadata: serde_json::json!({
"file_size": 1024,
"file_type": "image/png"
}),
};
let context = LogContext::new().with_structured_data(&user_action);
// Act
logger.info("User action logged", context);
// Assert
let logs = logs_ref.lock().unwrap();
assert_eq!(logs.len(), 1);
let log_entry: serde_json::Value = serde_json::from_str(&logs[0]).unwrap();
assert_eq!(log_entry["action_type"], "file_upload");
assert_eq!(log_entry["user_id"], "user-789");
assert_eq!(log_entry["resource_id"], "file-456");
assert_eq!(log_entry["metadata"]["file_size"], 1024);
assert_eq!(log_entry["metadata"]["file_type"], "image/png");
}
// 10 tasks x 100 log calls must yield exactly 1000 intact JSON lines —
// verifies the logger serializes whole lines atomically under concurrency.
#[tokio::test]
async fn test_concurrent_logging() {
// Arrange
let test_output = TestLogOutput::new();
let logs_ref = test_output.logs.clone();
let logger = Arc::new(
StructuredLogger::builder()
.with_service_name("concurrent-test")
.add_output(Box::new(test_output))
.build()
.unwrap()
);
// Act - Concurrent logging from multiple tasks
let mut handles = Vec::new();
for task_id in 0..10 {
let logger_clone = logger.clone();
handles.push(tokio::spawn(async move {
for i in 0..100 {
let context = LogContext::new()
.with_field("task_id", task_id)
.with_field("iteration", i);
logger_clone.info(&format!("Task {} iteration {}", task_id, i), context);
}
}));
}
for handle in handles {
handle.await.unwrap();
}
// Assert
let logs = logs_ref.lock().unwrap();
assert_eq!(logs.len(), 1000); // 10 tasks * 100 iterations each
// Verify that all logs are valid JSON
for log in logs.iter() {
let _: serde_json::Value = serde_json::from_str(log).unwrap();
}
}
}
3. Distributed Tracing Tests
3.1 Span Creation and Lifecycle Tests
// Unit tests for the distributed tracer: span creation and attributes,
// parent/child context propagation (including across tokio tasks), events
// and exception recording, sampling, and span-creation overhead.
// NOTE(review): HashMap (used below) and AttributeValue/SpanStatus are
// assumed to be re-exported via `super::*` — confirm.
#[cfg(test)]
mod distributed_tracing_tests {
use super::*;
// IDs are asserted by hex-string length: 32 chars = 128-bit trace id,
// 16 chars = 64-bit span id (W3C Trace Context sizes).
#[tokio::test]
async fn test_span_creation_and_attributes() {
// Arrange
let config = ObservabilityConfig::builder()
.with_service_name("trace-test")
.build()
.unwrap();
let tracer = DistributedTracer::builder()
.with_config(config)
.build()
.await
.unwrap();
// Act
let mut span = tracer.start_span("test_operation");
span.set_attribute("user.id", "user-123");
span.set_attribute("operation.type", "database_query");
span.set_attribute("rows.affected", 42i64);
// Assert
assert!(span.is_recording());
assert_eq!(span.context().trace_id().to_string().len(), 32); // 128-bit trace ID as hex
assert_eq!(span.context().span_id().to_string().len(), 16); // 64-bit span ID as hex
let attributes = &span.attributes;
assert_eq!(attributes.get("user.id").unwrap().as_str(), Some("user-123"));
assert_eq!(attributes.get("operation.type").unwrap().as_str(), Some("database_query"));
assert_eq!(attributes.get("rows.affected").unwrap().as_i64(), Some(42));
}
// A child span shares its parent's trace id but gets a fresh span id.
#[tokio::test]
async fn test_span_hierarchy_and_context_propagation() {
// Arrange
let config = ObservabilityConfig::builder()
.with_service_name("hierarchy-test")
.build()
.unwrap();
let tracer = DistributedTracer::builder()
.with_config(config)
.build()
.await
.unwrap();
// Act
let parent_span = tracer.start_span("parent_operation");
let parent_context = parent_span.context().clone();
let child_span = tracer.start_span_with_parent("child_operation", &parent_context);
let child_context = child_span.context();
// Assert
assert_eq!(child_context.trace_id(), parent_context.trace_id());
assert_ne!(child_context.span_id(), parent_context.span_id());
// In a real implementation, we'd check parent_span_id relationship
}
// record_exception is expected to append an event named "exception";
// together with the explicit add_event that makes two events total.
#[tokio::test]
async fn test_span_events_and_exceptions() {
// Arrange
let config = ObservabilityConfig::builder()
.with_service_name("events-test")
.build()
.unwrap();
let tracer = DistributedTracer::builder()
.with_config(config)
.build()
.await
.unwrap();
// Act
let mut span = tracer.start_span("operation_with_events");
// Add a custom event
let mut event_attributes = HashMap::new();
event_attributes.insert("cache.hit".to_string(), AttributeValue::Bool(true));
event_attributes.insert("cache.key".to_string(), AttributeValue::String("user:123".to_string()));
span.add_event("cache_lookup", event_attributes);
// Record an exception
let error = anyhow::anyhow!("Connection timeout");
span.record_exception(&*error);
span.set_status(SpanStatus::error("Operation failed due to timeout"));
// Assert
assert_eq!(span.events.len(), 2); // cache_lookup + exception
assert_eq!(span.events[0].name, "cache_lookup");
assert_eq!(span.events[1].name, "exception");
if let SpanStatus::Error { description } = &span.status {
assert_eq!(description, "Operation failed due to timeout");
} else {
panic!("Expected error status");
}
}
// Trace id must survive handoff to a spawned task via the parent context.
#[tokio::test]
async fn test_span_context_propagation_across_tasks() {
// Arrange
let config = ObservabilityConfig::builder()
.with_service_name("async-test")
.build()
.unwrap();
let tracer = Arc::new(
DistributedTracer::builder()
.with_config(config)
.build()
.await
.unwrap()
);
// Act
let parent_span = tracer.start_span("async_parent");
let trace_id = parent_span.context().trace_id();
let tracer_clone = tracer.clone();
let child_task = tokio::spawn(async move {
// Simulate span context propagation
let child_span = tracer_clone.start_span_with_parent(
"async_child",
parent_span.context()
);
child_span.context().trace_id()
});
let child_trace_id = child_task.await.unwrap();
// Assert
assert_eq!(trace_id, child_trace_id);
}
// Probability 0.0 must sample nothing: spans are created but non-recording.
#[tokio::test]
async fn test_span_sampling() {
// Arrange
let config = ObservabilityConfig::builder()
.with_service_name("sampling-test")
.with_tracing(TracingConfig {
enabled: true,
sampling_strategy: SamplingStrategy::Probabilistic(0.0), // Never sample
..Default::default()
})
.build()
.unwrap();
let tracer = DistributedTracer::builder()
.with_config(config)
.build()
.await
.unwrap();
// Act
let span = tracer.start_span("sampled_out_span");
// Assert
assert!(!span.is_recording()); // Should not be recording due to sampling
}
// NOTE(review): wall-clock thresholds are flaky on loaded CI machines —
// consider criterion benches instead of hard asserts for overhead budgets.
#[tokio::test]
async fn test_span_performance_overhead() {
// Arrange
let config = ObservabilityConfig::builder()
.with_service_name("performance-test")
.build()
.unwrap();
let tracer = DistributedTracer::builder()
.with_config(config)
.build()
.await
.unwrap();
let start_time = std::time::Instant::now();
// Act - Create many spans rapidly
for i in 0..10_000 {
let mut span = tracer.start_span(&format!("operation_{}", i));
span.set_attribute("iteration", i as i64);
span.end();
}
let duration = start_time.elapsed();
// Assert
assert!(duration.as_millis() < 100); // Should complete in under 100ms
}
}
4. Integration Tests
4.1 End-to-End Observability Tests
// End-to-end integration tests: metrics, logging, and tracing wired through
// a single ObservabilityProvider across a simulated request flow, an error
// scenario, and a high-throughput run with batched export.
#[cfg(test)]
mod integration_tests {
use super::*;
// Simulates one HTTP request: counter + histogram + parent/child spans +
// correlated logs, then verifies all three signals and flushes exporters.
#[tokio::test]
async fn test_complete_observability_workflow() {
// Arrange
let config = ObservabilityConfig::builder()
.with_service_name("integration-test")
.with_service_version("1.0.0")
.with_environment("test")
.build()
.unwrap();
let observability = ObservabilityProvider::from_config(config).await.unwrap();
let metrics = observability.metrics().await;
let logger = observability.logging().await;
let tracer = observability.tracing().await;
// Act - Simulate a complete request flow
let request_counter = metrics.create_counter(
"http_requests_total",
"Total HTTP requests",
Labels::new().with("service", "integration-test")
).await;
let response_time = metrics.create_histogram(
"http_request_duration_seconds",
"HTTP request duration",
Labels::new().with("service", "integration-test")
).await;
let start_time = std::time::Instant::now();
let mut span = tracer.start_span("http_request");
span.set_attribute("http.method", "GET");
span.set_attribute("http.url", "/api/users/123");
// Simulate business logic
let user_id = "user-123";
// NOTE(review): correlation_id is passed by value twice below — this
// assumes CorrelationId: Copy (or Clone via implicit copy); confirm.
let correlation_id = CorrelationId::generate();
logger.info(
"Processing user request",
LogContext::new()
.with_correlation_id(correlation_id)
.with_field("user_id", user_id)
.with_field("endpoint", "/api/users/123")
);
// Simulate database operation
let mut db_span = tracer.start_span_with_parent("database_query", span.context());
db_span.set_attribute("db.statement", "SELECT * FROM users WHERE id = $1");
db_span.set_attribute("db.name", "user_db");
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; // Simulate DB latency
db_span.end();
// Complete request
let duration = start_time.elapsed();
response_time.record(duration.as_secs_f64(), Labels::new().with("status", "200"));
request_counter.increment(Labels::new().with("method", "GET").with("status", "200"));
span.set_attribute("http.status_code", 200i64);
span.set_status(SpanStatus::Ok);
span.end();
logger.info(
"Request completed successfully",
LogContext::new()
.with_correlation_id(correlation_id)
.with_field("user_id", user_id)
// NOTE(review): as_millis() returns u128 — assumes with_field
// accepts it (or add a cast); confirm the field-value types.
.with_field("duration_ms", duration.as_millis())
.with_field("status", 200)
);
// Assert - Verify observability data
assert_eq!(request_counter.get(), 1);
assert_eq!(response_time.get_count(), 1);
assert!(response_time.get_sum() > 0.0);
let metric_families = metrics.export_metrics().await.unwrap();
assert!(metric_families.len() >= 2); // At least counter and histogram
// Flush all exporters
observability.flush().await.unwrap();
}
// Error path: the same failure must show up in logs, span status/events,
// and an error counter.
#[tokio::test]
async fn test_error_handling_and_observability() {
// Arrange
let config = ObservabilityConfig::builder()
.with_service_name("error-test")
.build()
.unwrap();
let observability = ObservabilityProvider::from_config(config).await.unwrap();
let metrics = observability.metrics().await;
let logger = observability.logging().await;
let tracer = observability.tracing().await;
let error_counter = metrics.create_counter(
"errors_total",
"Total errors",
Labels::new().with("service", "error-test")
).await;
// Act - Simulate an error scenario
let mut span = tracer.start_span("failing_operation");
span.set_attribute("operation.type", "data_processing");
let error = anyhow::anyhow!("Database connection lost");
let correlation_id = CorrelationId::generate();
// Log the error
logger.error(
"Operation failed due to database error",
LogContext::new()
.with_correlation_id(correlation_id)
.with_field("operation", "data_processing")
.with_error(&*error)
);
// Record error in span
span.record_exception(&*error);
span.set_status(SpanStatus::error("Database connection lost"));
span.end();
// Increment error metrics
error_counter.increment(
Labels::new()
.with("error_type", "database_connection")
.with("operation", "data_processing")
);
// Assert
assert_eq!(error_counter.get(), 1);
// NOTE(review): reading span.status after end() assumes the span retains
// state post-completion — confirm the Span API allows this.
if let SpanStatus::Error { description } = &span.status {
assert_eq!(description, "Database connection lost");
} else {
panic!("Expected error status");
}
}
// 10 tasks x 1000 operations with batched span export; asserts both the
// metric total and coarse latency budgets for the run and the flush.
// NOTE(review): wall-clock thresholds are flaky on loaded CI machines.
#[tokio::test]
async fn test_high_throughput_observability() {
// Arrange
let config = ObservabilityConfig::builder()
.with_service_name("throughput-test")
.with_tracing(TracingConfig {
batch_export: true,
max_export_batch_size: 1000,
..Default::default()
})
.build()
.unwrap();
let observability = ObservabilityProvider::from_config(config).await.unwrap();
let metrics = observability.metrics().await;
let tracer = observability.tracing().await;
let throughput_counter = metrics.create_counter(
"operations_total",
"Total operations",
Labels::new()
).await;
let start_time = std::time::Instant::now();
// Act - High throughput simulation
let mut tasks = Vec::new();
for task_id in 0..10 {
let counter = throughput_counter.clone();
let tracer_clone = tracer.clone();
tasks.push(tokio::spawn(async move {
for i in 0..1000 {
let mut span = tracer_clone.start_span(&format!("operation_{}_{}", task_id, i));
span.set_attribute("task_id", task_id as i64);
span.set_attribute("iteration", i as i64);
counter.increment(Labels::new().with("task", &task_id.to_string()));
span.end();
}
}));
}
for task in tasks {
task.await.unwrap();
}
let duration = start_time.elapsed();
// Assert
assert_eq!(throughput_counter.get(), 10_000);
assert!(duration.as_millis() < 1000); // Should complete in under 1 second
// Verify export performance
let export_start = std::time::Instant::now();
observability.flush().await.unwrap();
let export_duration = export_start.elapsed();
assert!(export_duration.as_millis() < 500); // Export should be fast
}
}
5. Performance Benchmark Tests
5.1 Metrics Performance Tests
// Throughput budget tests for counters, histograms, logging, and spans.
// NOTE(review): `use criterion::{black_box, Criterion};` is unused — these
// are plain #[tokio::test] time-threshold asserts, not criterion benches.
// Either remove the import or convert these to real criterion benchmarks;
// wall-clock asserts like these are flaky on loaded CI machines.
#[cfg(test)]
mod performance_benchmarks {
use super::*;
use criterion::{black_box, Criterion};
// Budget: < 100ns per counter increment over 1M operations.
#[tokio::test]
async fn benchmark_counter_operations() {
let registry = MetricsRegistry::new(MetricsConfig::default());
let counter = registry.create_counter(
"benchmark_counter",
"Counter for benchmarking",
Labels::new()
).await;
let start = std::time::Instant::now();
// Benchmark counter increments
for _ in 0..1_000_000 {
counter.increment(Labels::new());
}
let duration = start.elapsed();
// Assert performance requirements
assert!(duration.as_nanos() / 1_000_000 < 100); // Under 100ns per operation
assert_eq!(counter.get(), 1_000_000);
}
// Budget: < 1µs per histogram observation over 100k recordings.
#[tokio::test]
async fn benchmark_histogram_recording() {
let registry = MetricsRegistry::new(MetricsConfig::default());
let histogram = registry.create_histogram(
"benchmark_histogram",
"Histogram for benchmarking",
Labels::new()
).await;
let start = std::time::Instant::now();
// Benchmark histogram recordings
for i in 0..100_000 {
let value = (i as f64) / 100_000.0;
histogram.record(value, Labels::new());
}
let duration = start.elapsed();
// Assert performance requirements
assert!(duration.as_nanos() / 100_000 < 1000); // Under 1μs per operation
assert_eq!(histogram.get_count(), 100_000);
}
// Budget: < 2µs per log call with output discarded (NoopLogOutput).
#[tokio::test]
async fn benchmark_structured_logging() {
let logger = StructuredLogger::builder()
.with_service_name("benchmark-service")
.add_output(Box::new(NoopLogOutput::new())) // Discard output for benchmarking
.build()
.unwrap();
let start = std::time::Instant::now();
// Benchmark log operations
for i in 0..50_000 {
let context = LogContext::new()
.with_field("iteration", i)
.with_field("benchmark", true);
logger.info("Benchmark log message", context);
}
let duration = start.elapsed();
// Assert performance requirements
assert!(duration.as_nanos() / 50_000 < 2000); // Under 2μs per log operation
}
// Budget: < 4µs per span create/attribute/end cycle over 25k spans.
#[tokio::test]
async fn benchmark_span_creation_and_completion() {
let config = ObservabilityConfig::builder()
.with_service_name("benchmark-tracer")
.build()
.unwrap();
let tracer = DistributedTracer::builder()
.with_config(config)
.build()
.await
.unwrap();
let start = std::time::Instant::now();
// Benchmark span operations
for i in 0..25_000 {
let mut span = tracer.start_span(&format!("benchmark_span_{}", i));
span.set_attribute("iteration", i as i64);
span.end();
}
let duration = start.elapsed();
// Assert performance requirements
assert!(duration.as_nanos() / 25_000 < 4000); // Under 4μs per span operation
}
// Log sink that drops every message, isolating logger overhead from I/O.
struct NoopLogOutput;
impl NoopLogOutput {
fn new() -> Self {
Self
}
}
impl LogOutput for NoopLogOutput {
fn write(&self, _message: &str) {
// Discard the message for benchmarking
}
}
}
6. Error Handling and Edge Case Tests
6.1 Export Failure Handling Tests
#[cfg(test)]
mod error_handling_tests {
use super::*;
#[tokio::test]
async fn test_metrics_export_failure_recovery() {
// Arrange
struct FailingMetricsExporter {
failure_count: Arc<std::sync::atomic::AtomicUsize>,
max_failures: usize,
}
impl FailingMetricsExporter {
fn new(max_failures: usize) -> Self {
Self {
failure_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
max_failures,
}
}
}
#[async_trait]
impl MetricsExporter for FailingMetricsExporter {
async fn export(&self, _metrics: &[MetricFamily]) -> Result<()> {
let current_failures = self.failure_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if current_failures < self.max_failures {
return Err(anyhow::anyhow!("Simulated export failure"));
}
Ok(())
}
}
let registry = MetricsRegistry::new(MetricsConfig::default());
let failing_exporter = FailingMetricsExporter::new(3); // Fail 3 times then succeed
// Act & Assert
// First 3 exports should fail
for i in 0..3 {
let result = failing_exporter.export(&[]).await;
assert!(result.is_err(), "Export {} should fail", i);
}
// Fourth export should succeed
let result = failing_exporter.export(&[]).await;
assert!(result.is_ok(), "Export should succeed after retries");
}
#[tokio::test]
async fn test_large_metric_payload_handling() {
// Arrange
let registry = MetricsRegistry::new(MetricsConfig::default());
// Create a large number of metrics
let mut counters = Vec::new();
for i in 0..10_000 {
let counter = registry.create_counter(
&format!("large_payload_counter_{}", i),
"Counter for large payload test",
Labels::new().with("id", &i.to_string())
).await;
counter.increment(Labels::new());
counters.push(counter);
}
// Act
let start_time = std::time::Instant::now();
let metric_families = registry.export_metrics().await;
let export_duration = start_time.elapsed();
// Assert
assert!(metric_families.is_ok());
let families = metric_families.unwrap();
assert_eq!(families.len(), 10_000);
assert!(export_duration.as_millis() < 1000); // Should handle large payloads efficiently
}
#[tokio::test]
async fn test_concurrent_modification_safety() {
// Arrange
let registry = Arc::new(MetricsRegistry::new(MetricsConfig::default()));
let counter = Arc::new(
registry.create_counter(
"concurrent_safety_counter",
"Counter for concurrency safety test",
Labels::new()
).await
);
// Act - Concurrent modifications while exporting
let mut tasks = Vec::new();
// Task continuously incrementing counter
let counter_clone = counter.clone();
tasks.push(tokio::spawn(async move {
for _ in 0..1000 {
counter_clone.increment(Labels::new());
tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
}
}));
// Task continuously exporting metrics
let registry_clone = registry.clone();
tasks.push(tokio::spawn(async move {
for _ in 0..100 {
let _ = registry_clone.export_metrics().await;
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
}
}));
for task in tasks {
task.await.unwrap();
}
// Assert - No panics or data corruption
let final_value = counter.get();
assert_eq!(final_value, 1000);
let final_export = registry.export_metrics().await.unwrap();
assert!(!final_export.is_empty());
}
#[tokio::test]
async fn test_memory_pressure_handling() {
    // Arrange: a provider configured only with a service name.
    let config = ObservabilityConfig::builder()
        .with_service_name("memory-pressure-test")
        .build()
        .unwrap();
    let observability = ObservabilityProvider::from_config(config).await.unwrap();
    let tracer = observability.tracing().await;

    // Act: create many spans carrying ~1KB of attribute data each, ending
    // batches periodically to simulate real churn under memory pressure.
    const TOTAL_SPANS: usize = 100_000;
    let mut spans = Vec::new();
    let mut ended_early = 0usize;
    for i in 0..TOTAL_SPANS {
        let mut span = tracer.start_span(&format!("memory_test_span_{}", i));
        span.set_attribute("large_data", "x".repeat(1000)); // 1KB per span
        spans.push(span);
        // Periodically end spans to simulate real usage.
        if i % 1000 == 0 {
            for _ in 0..100 {
                if let Some(mut span) = spans.pop() {
                    span.end();
                    ended_early += 1;
                }
            }
        }
    }
    // End remaining spans.
    let remaining = spans.len();
    for mut span in spans {
        span.end();
    }

    // Assert: every created span was ended exactly once. Completing the run
    // without OOM is the primary signal; this accounting check replaces the
    // former `assert!(true)`, which verified nothing (clippy:
    // assertions_on_constants).
    assert_eq!(ended_early + remaining, TOTAL_SPANS);
}
}

Test Execution Strategy
Test Organization
// tests/lib.rs
//
// Top-level test harness layout. Each category below is a parent module with
// one child module per test file, so categories can be run selectively,
// e.g. `cargo test unit_tests`.

// Fast, isolated tests of individual components (no external services).
mod unit_tests {
mod metrics_tests;
mod logging_tests;
mod tracing_tests;
mod configuration_tests;
}

// Cross-component scenarios exercising full observability pipelines.
mod integration_tests {
mod end_to_end_tests;
mod export_pipeline_tests;
mod instrumentation_tests;
}

// Benchmarks and resource-usage checks (intended for release builds).
mod performance_tests {
mod metrics_benchmarks;
mod logging_benchmarks;
mod tracing_benchmarks;
mod memory_usage_tests;
}
mod compatibility_tests {
mod opentelemetry_compliance;
mod prometheus_format_tests;
mod cross_platform_tests;
}

Test Configuration
# Cargo.toml test configuration
[package]
name = "sindhan-observability"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
anyhow = "1.0"
async-trait = "0.1"
opentelemetry = "0.20"
uuid = { version = "1.0", features = ["v4"] }
chrono = { version = "0.4", features = ["serde"] }
[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports"] }
tokio-test = "0.4"
proptest = "1.0"
tempfile = "3.0"
[[bench]]
name = "observability_benchmarks"
harness = false
[features]
default = []
test-utils = []

Continuous Integration Configuration
# .github/workflows/observability-test.yml
name: Observability Test Suite
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
rust: [stable, beta]
steps:
- uses: actions/checkout@v4
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
override: true
components: rustfmt, clippy
- name: Run unit tests
uses: actions-rs/cargo@v1
with:
command: test
args: --lib --all-features --verbose
- name: Run integration tests
uses: actions-rs/cargo@v1
with:
command: test
args: --test '*' --all-features
- name: Run benchmarks
uses: actions-rs/cargo@v1
with:
command: bench
args: --all-features
- name: Memory leak detection
run: |
cargo test --release --features test-utils -- --nocapture
# Additional memory profiling would go here
performance:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
- name: Run performance benchmarks
run: |
cargo bench --all-features -- --output-format json | tee benchmark_results.json
- name: Performance regression detection
run: |
# Compare with baseline performance metrics
# Fail if performance degrades beyond threshold

Test Coverage Requirements
Coverage Targets
- Unit Tests: 95%+ line coverage for core observability components
- Integration Tests: 90%+ feature coverage for end-to-end scenarios
- Performance Tests: All critical paths benchmarked with SLA requirements
- Compatibility Tests: 100% OpenTelemetry and Prometheus compliance
Test Quality Metrics
- Test Reliability: 99.9% pass rate across all environments
- Test Performance: Full test suite under 10 minutes
- Test Maintainability: Automated test generation where possible
- Test Documentation: All test purposes and scenarios documented
Test Execution Commands
# Run all tests
cargo test
# Run specific test categories
cargo test unit_tests
cargo test integration_tests
cargo test performance_tests
# Run tests with coverage
cargo tarpaulin --all-features --timeout 300
# Run benchmarks
cargo bench
# Run tests in release mode for performance testing
cargo test --release --features test-utils
# Run memory leak detection tests
cargo test --release -- --nocapture --test-threads=1
# Run compatibility tests
cargo test compatibility_tests --features opentelemetry-compat
# Performance regression testing
cargo bench --save-baseline main
cargo bench --baseline main

This comprehensive test plan ensures systematic validation of all Platform Observability functionalities with a focus on performance, correctness, and production readiness. The TDD approach guarantees high code quality while the performance benchmarks ensure the observability system meets stringent performance requirements for high-throughput scenarios.