// go_zoom_kinesis — crate root (lib.rs)
//! Go Zoom Kinesis - A robust AWS Kinesis stream processor
//!
//! This library provides a production-ready implementation for processing AWS Kinesis streams
//! with features like:
//!
//! - Automatic checkpointing with pluggable storage backends
//! - Configurable retry logic with exponential backoff
//! - Concurrent shard processing with rate limiting
//! - Comprehensive monitoring and metrics
//! - Graceful shutdown handling
//!
//! # Basic Usage
//!
//! ```rust,no_run
//! use go_zoom_kinesis::{
//!     KinesisProcessor, ProcessorConfig, RecordProcessor,
//!     processor::RecordMetadata, processor::InitialPosition,
//!     store::InMemoryCheckpointStore,
//!     monitoring::MonitoringConfig,
//!     error::{ProcessorError, ProcessingError},
//! };
//! use aws_sdk_kinesis::{Client, types::Record};
//! use std::time::Duration;
//! use async_trait::async_trait;
//!
//! #[derive(Clone)]
//! struct MyProcessor;
//!
//! #[async_trait]
//! impl RecordProcessor for MyProcessor {
//!     type Item = ();
//!
//!     async fn process_record<'a>(
//!         &self,
//!         record: &'a Record,
//!         metadata: RecordMetadata<'a>,
//!     ) -> Result<Option<Self::Item>, ProcessingError> {
//!         println!("Processing record: {:?}", record);
//!         Ok(None)
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<(), ProcessorError> {
//!     let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
//!     let client = Client::new(&config);
//!
//!     let config = ProcessorConfig {
//!         stream_name: "my-stream".to_string(),
//!         batch_size: 100,
//!         api_timeout: Duration::from_secs(30),
//!         processing_timeout: Duration::from_secs(300),
//!         max_retries: Some(3),
//!         shard_refresh_interval: Duration::from_secs(60),
//!         initial_position: InitialPosition::TrimHorizon,
//!         prefer_stored_checkpoint: true,
//!         monitoring: MonitoringConfig {
//!             enabled: true,
//!             ..Default::default()
//!         },
//!         ..Default::default()
//!     };
//!
//!     let processor = MyProcessor;
//!     let store = InMemoryCheckpointStore::new();
//!
//!     let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
//!     let (processor, _monitoring_rx) = KinesisProcessor::new(
//!         config,
//!         processor,
//!         client,
//!         store,
//!     );
//!
//!     processor.run(shutdown_rx).await
//! }
//! ```
//!
//! # Stream Position Configuration
//!
//! ```rust,no_run
//! use go_zoom_kinesis::{RecordProcessor, error::ProcessingError};
//! use go_zoom_kinesis::processor::RecordMetadata;
//! use aws_sdk_kinesis::types::Record;
//! use async_trait::async_trait;
//! use anyhow::Result;
//!
//! struct MyProcessor;
//!
//! #[async_trait]
//! impl RecordProcessor for MyProcessor {
//!     // Define the associated type - in this case we don't produce any items
//!     type Item = ();
//!
//!     async fn process_record<'a>(
//!         &self,
//!         record: &'a Record,
//!         metadata: RecordMetadata<'a>,
//!     ) -> Result<Option<Self::Item>, ProcessingError> {
//!         match process_data(record).await {
//!             Ok(_) => Ok(None),  // No item produced
//!             Err(e) => {
//!                 // Custom error handling with metadata context
//!                 tracing::error!(
//!                     error = %e,
//!                     shard_id = %metadata.shard_id(),
//!                     attempt = %metadata.attempt_number(),
//!                     "Failed to process record"
//!                 );
//!                 Err(ProcessingError::soft(e)) // Will be retried
//!             }
//!         }
//!     }
//! }
//!
//! // Example processing function
//! async fn process_data(_record: &Record) -> Result<()> {
//!     Ok(())
//! }
//! ```
//!
//! # DynamoDB Checkpoint Store
//!
//! ```rust,no_run
//! use go_zoom_kinesis::store::DynamoDbCheckpointStore;
//!
//!
//! async fn example() -> anyhow::Result<()> {
//!     let config = aws_config::load_from_env().await;
//!     let dynamo_client = aws_sdk_dynamodb::Client::new(&config);
//!     let checkpoint_store = DynamoDbCheckpointStore::new(
//!         dynamo_client,
//!         "checkpoints-table".to_string(),
//!         "my-app-".to_string(),
//!     );
//!     Ok(())
//! }
//! ```
/// Error types: [`ProcessorError`], `ProcessingError`, and the crate [`Result`] alias.
pub mod error;
/// Core processing: [`KinesisProcessor`], [`ProcessorConfig`], and the [`RecordProcessor`] trait.
pub mod processor;
/// Retry/backoff strategies: the [`Backoff`] trait and [`ExponentialBackoff`].
pub mod retry;
/// Checkpoint persistence: the [`CheckpointStore`] trait plus in-memory and DynamoDB backends.
pub mod store;

/// Kinesis client abstractions.
pub mod client;

/// Monitoring and metrics configuration (see `MonitoringConfig` in the crate docs example).
pub mod monitoring;
// NOTE(review): gated behind `cfg(test)`, so these are compiled only for this
// crate's own unit tests — they are NOT visible to integration tests or
// downstream crates, despite the `pub` visibility.
#[cfg(test)]
pub mod test;
#[cfg(test)]
mod tests;

// Convenience re-exports so common items are reachable from the crate root.
pub use error::{ProcessorError, Result};
pub use processor::{KinesisProcessor, ProcessorConfig};
pub use retry::{Backoff, ExponentialBackoff};

// Re-export main traits
pub use crate::processor::RecordProcessor;
pub use crate::store::CheckpointStore;

// Re-export implementations

pub use crate::store::memory::InMemoryCheckpointStore;

pub use crate::store::dynamodb::DynamoDbCheckpointStore;