use crate::monitoring::metrics::BatchMetrics;
#[allow(unused_imports)]
use std::collections::HashSet;
#[allow(unused_imports)]
use std::sync::Arc;
use std::time::{Duration, SystemTime};
#[allow(unused_imports)]
use tokio::sync::{mpsc, Mutex};
#[allow(unused_imports)]
use tracing::debug;
/// Configuration for the event-monitoring channel.
///
/// NOTE(review): consumers of these fields are outside this file; the
/// semantics below are inferred from field names and `Default` values —
/// confirm against the monitoring pipeline.
#[derive(Debug, Clone)]
pub struct MonitoringConfig {
    /// Master switch: when `false` (the default) no events are emitted.
    pub enabled: bool,
    /// Capacity of the bounded mpsc channel carrying `ProcessingEvent`s.
    pub channel_size: usize,
    /// How often aggregated metrics are flushed/reported.
    pub metrics_interval: Duration,
    /// When `true`, per-attempt retry details are included in events.
    pub include_retry_details: bool,
    /// Optional cap on events per interval; `None` disables rate limiting.
    pub rate_limit: Option<u32>,
}
impl Default for MonitoringConfig {
fn default() -> Self {
Self {
enabled: false,
channel_size: 1000,
metrics_interval: Duration::from_secs(60),
include_retry_details: false,
rate_limit: Some(1000),
}
}
}
/// A single monitoring event emitted while processing a shard.
#[derive(Debug, Clone)]
pub struct ProcessingEvent {
    /// Wall-clock time at which the event was created (set by the constructors).
    pub timestamp: SystemTime,
    /// Identifier of the shard this event pertains to.
    pub shard_id: String,
    /// The specific kind of event and its payload.
    pub event_type: ProcessingEventType,
}
/// The payload variants a `ProcessingEvent` can carry.
#[derive(Debug, Clone)]
pub enum ProcessingEventType {
    /// One attempt (initial or retry) to process a single record.
    RecordAttempt {
        sequence_number: String,
        /// Whether this particular attempt succeeded.
        success: bool,
        /// 1-based (presumably) attempt counter — TODO confirm with emitter.
        attempt_number: u32,
        /// Time spent on this attempt.
        duration: Duration,
        /// Error text when the attempt failed.
        error: Option<String>,
        /// `true` when no further retries will follow.
        is_final_attempt: bool,
    },
    /// A batch began; carries its own start timestamp in the payload.
    BatchStart {
        timestamp: SystemTime,
    },
    /// Aggregated metrics for a completed batch.
    BatchMetrics {
        metrics: BatchMetrics,
    },
    /// A batch failed as a whole.
    BatchError {
        error: String,
        duration: Duration,
    },
    /// A batch finished, with per-record success/failure tallies.
    BatchComplete {
        successful_count: usize,
        failed_count: usize,
        duration: Duration,
    },
    /// A record was processed successfully (checkpoint may still have failed).
    RecordSuccess {
        sequence_number: String,
        checkpoint_success: bool,
    },
    /// A record failed permanently.
    RecordFailure {
        sequence_number: String,
        error: String,
    },
    /// Checkpointing a record failed.
    CheckpointFailure {
        sequence_number: String,
        error: String,
    },
    /// Shard lifecycle transition (started/completed/error/interrupted).
    ShardEvent {
        event_type: ShardEventType,
        details: Option<String>,
    },
    /// Shard-iterator lifecycle event (expired/renewed/failed).
    Iterator {
        event_type: IteratorEventType,
        error: Option<String>,
    },
    /// Outcome of a checkpoint operation.
    Checkpoint {
        sequence_number: String,
        success: bool,
        error: Option<String>,
    },
}
/// Lifecycle states reported for a shard.
#[derive(Debug, Clone)]
pub enum ShardEventType {
    /// Shard processing began.
    Started,
    /// Shard processing finished normally.
    Completed,
    /// Shard processing hit an error.
    Error,
    /// Shard processing was interrupted (e.g. shutdown).
    Interrupted,
}
/// Lifecycle states reported for a shard iterator.
#[derive(Debug, Clone)]
pub enum IteratorEventType {
    /// The iterator expired and is no longer usable.
    Expired,
    /// A fresh iterator was obtained.
    Renewed,
    /// Obtaining or using the iterator failed.
    Failed,
}
impl ProcessingEvent {
pub fn batch_complete(
shard_id: String,
successful_count: usize,
failed_count: usize,
duration: Duration,
) -> Self {
Self {
timestamp: SystemTime::now(),
shard_id,
event_type: ProcessingEventType::BatchComplete {
successful_count,
failed_count,
duration,
},
}
}
pub fn batch_start(shard_id: String) -> Self {
Self {
timestamp: SystemTime::now(),
shard_id,
event_type: ProcessingEventType::BatchStart {
timestamp: SystemTime::now(),
},
}
}
pub fn batch_metrics(shard_id: String, metrics: BatchMetrics) -> Self {
Self {
timestamp: SystemTime::now(),
shard_id,
event_type: ProcessingEventType::BatchMetrics { metrics },
}
}
pub fn batch_error(shard_id: String, error: String, duration: Duration) -> Self {
Self {
timestamp: SystemTime::now(),
shard_id,
event_type: ProcessingEventType::BatchError { error, duration },
}
}
pub fn record_success(
shard_id: String,
sequence_number: String,
checkpoint_success: bool,
) -> Self {
Self {
timestamp: SystemTime::now(),
shard_id,
event_type: ProcessingEventType::RecordSuccess {
sequence_number,
checkpoint_success,
},
}
}
pub fn record_failure(shard_id: String, sequence_number: String, error: String) -> Self {
Self {
timestamp: SystemTime::now(),
shard_id,
event_type: ProcessingEventType::RecordFailure {
sequence_number,
error,
},
}
}
pub fn checkpoint_failure(shard_id: String, sequence_number: String, error: String) -> Self {
Self {
timestamp: SystemTime::now(),
shard_id,
event_type: ProcessingEventType::CheckpointFailure {
sequence_number,
error,
},
}
}
pub fn record_attempt(
shard_id: String,
sequence_number: String,
success: bool,
attempt_number: u32,
duration: Duration,
error: Option<String>,
is_final_attempt: bool,
) -> Self {
Self {
timestamp: SystemTime::now(),
shard_id,
event_type: ProcessingEventType::RecordAttempt {
sequence_number,
success,
attempt_number,
duration,
error,
is_final_attempt,
},
}
}
pub fn shard_event(
shard_id: String,
event_type: ShardEventType,
details: Option<String>,
) -> Self {
Self {
timestamp: SystemTime::now(),
shard_id,
event_type: ProcessingEventType::ShardEvent {
event_type,
details,
},
}
}
pub fn checkpoint(
shard_id: String,
sequence_number: String,
success: bool,
error: Option<String>,
) -> Self {
Self {
timestamp: SystemTime::now(),
shard_id,
event_type: ProcessingEventType::Checkpoint {
sequence_number,
success,
error,
},
}
}
pub fn iterator(
shard_id: String,
event_type: IteratorEventType,
error: Option<String>,
) -> Self {
Self {
timestamp: SystemTime::now(),
shard_id,
event_type: ProcessingEventType::Iterator { event_type, error },
}
}
}
/// Test helper that drains a monitoring channel and records which named
/// events have been observed, for assertion in integration tests.
#[derive(Debug)]
pub struct TestMonitoringHarness {
    /// Receiving end of the monitoring event channel under test.
    pub monitoring_rx: mpsc::Receiver<ProcessingEvent>,
    // Set of normalized event-name strings seen so far (e.g. "shard_started").
    events_seen: Arc<Mutex<HashSet<String>>>,
    // Every raw event received, in arrival order, for debugging dumps.
    event_history: Arc<Mutex<Vec<ProcessingEvent>>>,
}
impl TestMonitoringHarness {
pub fn new(monitoring_rx: mpsc::Receiver<ProcessingEvent>) -> Self {
Self {
monitoring_rx,
events_seen: Arc::new(Mutex::new(HashSet::new())),
event_history: Arc::new(Mutex::new(Vec::new())),
}
}
pub async fn wait_for_events(&mut self, expected_events: &[&str]) -> anyhow::Result<()> {
let timeout = Duration::from_secs(5);
let start = std::time::Instant::now();
while let Some(event) = self.monitoring_rx.recv().await {
if start.elapsed() > timeout {
let events = self.events_seen.lock().await;
return Err(anyhow::anyhow!(
"Timeout waiting for events. Expected: {:?}, Seen: {:?}",
expected_events,
events
));
}
self.process_event(&event).await?;
let events = self.events_seen.lock().await;
if expected_events.iter().all(|e| events.contains(*e)) {
debug!("All expected events seen: {:?}", expected_events);
return Ok(());
}
}
let events = self.events_seen.lock().await;
Err(anyhow::anyhow!(
"Channel closed before seeing all events. Expected: {:?}, Seen: {:?}",
expected_events,
events
))
}
async fn process_event(&self, event: &ProcessingEvent) -> anyhow::Result<()> {
let mut events = self.events_seen.lock().await;
let mut history = self.event_history.lock().await;
history.push(event.clone());
match &event.event_type {
ProcessingEventType::Iterator { event_type, error } => match event_type {
IteratorEventType::Expired => {
events.insert("iterator_expired".to_string());
if let Some(err) = error {
events.insert(format!("iterator_error_{}", err));
}
}
IteratorEventType::Renewed => {
events.insert("iterator_renewed".to_string());
}
IteratorEventType::Failed => {
events.insert("iterator_failed".to_string());
if let Some(err) = error {
events.insert(format!("iterator_failure_{}", err));
}
}
},
ProcessingEventType::RecordAttempt {
sequence_number,
success,
attempt_number,
error,
is_final_attempt,
..
} => {
let status = if *success { "success" } else { "failure" };
events.insert(format!("record_attempt_{}_{}", sequence_number, status));
events.insert(format!(
"record_attempt_{}_try_{}",
sequence_number, attempt_number
));
if *is_final_attempt {
events.insert(format!("record_final_attempt_{}", sequence_number));
}
if let Some(err) = error {
events.insert(format!("record_error_{}_{}", sequence_number, err));
}
}
ProcessingEventType::RecordSuccess {
sequence_number,
checkpoint_success,
} => {
events.insert(format!("record_success_{}", sequence_number));
if *checkpoint_success {
events.insert(format!("checkpoint_success_{}", sequence_number));
}
}
ProcessingEventType::RecordFailure {
sequence_number,
error,
} => {
events.insert(format!("record_failure_{}", sequence_number));
events.insert(format!(
"record_failure_{}_error_{}",
sequence_number, error
));
}
ProcessingEventType::BatchComplete {
successful_count,
failed_count,
..
} => {
events.insert(format!(
"batch_complete_{}_{}",
successful_count, failed_count
));
}
ProcessingEventType::ShardEvent {
event_type,
details,
} => match event_type {
ShardEventType::Started => {
events.insert("shard_started".to_string());
}
ShardEventType::Completed => {
events.insert("shard_completed".to_string());
}
ShardEventType::Error => {
events.insert("shard_error".to_string());
if let Some(detail) = details {
events.insert(format!("shard_error_{}", detail));
}
}
ShardEventType::Interrupted => {
events.insert("shard_interrupted".to_string());
if let Some(detail) = details {
events.insert(format!("shard_interrupted_{}", detail));
}
}
},
ProcessingEventType::Checkpoint {
sequence_number,
success,
error,
} => {
let status = if *success { "success" } else { "failure" };
events.insert(format!("checkpoint_{}_{}", sequence_number, status));
if let Some(err) = error {
events.insert(format!("checkpoint_error_{}_{}", sequence_number, err));
}
}
ProcessingEventType::CheckpointFailure {
sequence_number,
error,
} => {
events.insert(format!("checkpoint_failure_{}", sequence_number));
events.insert(format!(
"checkpoint_failure_{}_error_{}",
sequence_number, error
));
}
ProcessingEventType::BatchStart { timestamp } => {
events.insert(format!("batch_start_{:?}", timestamp));
}
ProcessingEventType::BatchMetrics { metrics } => {
events.insert(format!(
"batch_metrics_success_{}",
metrics.successful_count
));
events.insert(format!("batch_metrics_failed_{}", metrics.failed_count));
}
ProcessingEventType::BatchError { error, duration } => {
events.insert(format!("batch_error_{}", error));
events.insert(format!("batch_error_duration_{:?}", duration));
}
}
Ok(())
}
pub async fn get_events_seen(&self) -> HashSet<String> {
self.events_seen.lock().await.clone()
}
pub async fn get_event_history(&self) -> Vec<ProcessingEvent> {
self.event_history.lock().await.clone()
}
pub async fn has_seen_event(&self, event: &str) -> bool {
self.events_seen.lock().await.contains(event)
}
pub async fn get_event_count(&self, event_prefix: &str) -> usize {
self.events_seen
.lock()
.await
.iter()
.filter(|e| e.starts_with(event_prefix))
.count()
}
pub async fn clear(&self) {
self.events_seen.lock().await.clear();
self.event_history.lock().await.clear();
}
pub async fn dump_history(&self) {
let history = self.event_history.lock().await;
debug!("=== Event History ===");
for (i, event) in history.iter().enumerate() {
debug!("[{}] {:?}", i, event);
}
debug!("=== End History ===");
}
}