Crate go_zoom_kinesis

source
Expand description

Go Zoom Kinesis - A robust AWS Kinesis stream processor

This library provides a production-ready implementation for processing AWS Kinesis streams with features like:

  • Automatic checkpointing with pluggable storage backends
  • Configurable retry logic with exponential backoff
  • Concurrent shard processing with rate limiting
  • Comprehensive monitoring and metrics
  • Graceful shutdown handling

§Basic Usage

use go_zoom_kinesis::{
    KinesisProcessor, ProcessorConfig, RecordProcessor,
    processor::RecordMetadata, processor::InitialPosition,
    store::InMemoryCheckpointStore,
    monitoring::MonitoringConfig,
    error::{ProcessorError, ProcessingError},
};
use aws_sdk_kinesis::{Client, types::Record};
use std::time::Duration;
use async_trait::async_trait;

#[derive(Clone)]
struct MyProcessor;

#[async_trait]
impl RecordProcessor for MyProcessor {
    type Item = ();

    async fn process_record<'a>(
        &self,
        record: &'a Record,
        metadata: RecordMetadata<'a>,
    ) -> Result<Option<Self::Item>, ProcessingError> {
        println!("Processing record: {:?}", record);
        Ok(None)
    }
}

#[tokio::main]
async fn main() -> Result<(), ProcessorError> {
    let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
    let client = Client::new(&config);

    let config = ProcessorConfig {
        stream_name: "my-stream".to_string(),
        batch_size: 100,
        api_timeout: Duration::from_secs(30),
        processing_timeout: Duration::from_secs(300),
        max_retries: Some(3),
        shard_refresh_interval: Duration::from_secs(60),
        initial_position: InitialPosition::TrimHorizon,
        prefer_stored_checkpoint: true,
        monitoring: MonitoringConfig {
            enabled: true,
            ..Default::default()
        },
        ..Default::default()
    };

    let processor = MyProcessor;
    let store = InMemoryCheckpointStore::new();

    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
    let (processor, _monitoring_rx) = KinesisProcessor::new(
        config,
        processor,
        client,
        store,
    );

    processor.run(shutdown_rx).await
}

§Stream Position Configuration

use go_zoom_kinesis::{RecordProcessor, error::ProcessingError};
use go_zoom_kinesis::processor::RecordMetadata;
use aws_sdk_kinesis::types::Record;
use async_trait::async_trait;
use anyhow::Result;

struct MyProcessor;

#[async_trait]
impl RecordProcessor for MyProcessor {
    // Define the associated type - in this case we don't produce any items
    type Item = ();

    async fn process_record<'a>(
        &self,
        record: &'a Record,
        metadata: RecordMetadata<'a>,
   ) -> Result<Option<Self::Item>, ProcessingError> {
       match process_data(record).await {
            Ok(_) => Ok(None),  // No item produced
            Err(e) => {
                // Custom error handling with metadata context
                tracing::error!(
                    error = %e,
                    shard_id = %metadata.shard_id(),
                    attempt = %metadata.attempt_number(),
                    "Failed to process record"
                );
                Err(ProcessingError::soft(e)) // Will be retried
            }
        }
    }
}

// Example processing function
async fn process_data(_record: &Record) -> Result<()> {
   Ok(())
}

§DynamoDB Checkpoint Store

use go_zoom_kinesis::store::DynamoDbCheckpointStore;


async fn example() -> anyhow::Result<()> {
    let config = aws_config::load_from_env().await;
    let dynamo_client = aws_sdk_dynamodb::Client::new(&config);
    let checkpoint_store = DynamoDbCheckpointStore::new(
        dynamo_client,
        "checkpoints-table".to_string(),
        "my-app-".to_string(),
    );
    Ok(())
}

Re-exports§

Modules§

  • Error types for the Kinesis processor
  • Monitoring system for tracking Kinesis processor performance and health
  • Core processor implementation for handling Kinesis streams
  • Retry and backoff functionality for the Kinesis processor
  • Checkpoint storage implementations for the Kinesis processor