Trait go_zoom_kinesis::processor::RecordProcessor

pub trait RecordProcessor: Send + Sync {
    type Item: Send + Clone + Sync + 'static;

    // Required method
    fn process_record<'a, 'life0, 'async_trait>(
        &'life0 self,
        record: &'a Record,
        metadata: RecordMetadata<'a>,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Self::Item>, ProcessingError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'a: 'async_trait,
             'life0: 'async_trait;

    // Provided method
    fn before_checkpoint<'life0, 'life1, 'async_trait>(
        &'life0 self,
        _processed_items: Vec<Self::Item>,
        _metadata: CheckpointMetadata<'life1>,
    ) -> Pin<Box<dyn Future<Output = Result<(), BeforeCheckpointError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait { ... }
}

Trait for implementing record processing logic

Implementors should handle the business logic for processing individual records and return processed data through the associated Item type. Additional context about the record and its processing state is provided through the metadata parameter.
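process_record receives only &self, and the trait requires Send + Sync. A processor that keeps state (counters, caches) therefore needs interior mutability, and it must be safe to share across threads, presumably because the library may hand one instance to several shard workers. The std-only sketch below models this. CountingProcessor and its process method are hypothetical stand-ins for an implementor and process_record, not part of go_zoom_kinesis.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical processor; `process` stands in for `process_record`,
// which likewise only gets a shared `&self`.
struct CountingProcessor {
    processed: AtomicU64,
}

impl CountingProcessor {
    fn process(&self, data: &[u8]) -> Option<String> {
        // Interior mutability: update state through a shared reference.
        self.processed.fetch_add(1, Ordering::Relaxed);
        if data.is_empty() {
            None // nothing to emit, analogous to Ok(None)
        } else {
            Some(String::from_utf8_lossy(data).into_owned())
        }
    }
}

fn main() {
    let processor = Arc::new(CountingProcessor { processed: AtomicU64::new(0) });
    // One thread per simulated shard, all sharing the same processor.
    let handles: Vec<_> = (0..4)
        .map(|shard| {
            let p = Arc::clone(&processor);
            thread::spawn(move || {
                p.process(format!("record from shard {shard}").as_bytes());
                p.process(b"");
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // prints: processed 8 records
    println!("processed {} records", processor.processed.load(Ordering::Relaxed));
}
```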

Examples

use go_zoom_kinesis::processor::{CheckpointMetadata, RecordMetadata, RecordProcessor};
use go_zoom_kinesis::error::{ProcessingError, BeforeCheckpointError};
use aws_sdk_kinesis::types::Record;
use tracing::warn;

// Example data processing function
async fn process_data(data: &[u8]) -> anyhow::Result<String> {
    // Simulate some data processing
    Ok(String::from_utf8_lossy(data).to_string())
}

#[derive(Clone)]
struct MyProcessor;

#[async_trait::async_trait]
impl RecordProcessor for MyProcessor {
    type Item = String;

    async fn process_record<'a>(
        &self,
        record: &'a Record,
        metadata: RecordMetadata<'a>,
    ) -> std::result::Result<Option<Self::Item>, ProcessingError> {
        // Process record data
        let data = record.data().as_ref();

        match process_data(data).await {
            Ok(processed) => Ok(Some(processed)),
            Err(e) => {
                // Use metadata for detailed error context
                warn!(
                    shard_id = %metadata.shard_id(),
                    sequence = %metadata.sequence_number(),
                    error = %e,
                    "Processing failed"
                );
                Err(ProcessingError::soft(e)) // Will be retried forever
            }
        }
    }

    async fn before_checkpoint(
        &self,
        processed_items: Vec<Self::Item>,
        _metadata: CheckpointMetadata<'_>, // unused here; `_` avoids an unused-variable warning
    ) -> std::result::Result<(), BeforeCheckpointError> {
        // Optional validation before checkpointing
        if processed_items.is_empty() {
            return Err(BeforeCheckpointError::soft(
                anyhow::anyhow!("No items to checkpoint")
            )); // Will retry before_checkpoint
        }
        Ok(())
    }
}

Associated Types

  • Item - The type of data produced by processing records

Record Processing

The process_record method returns:

  • Ok(Some(item)) - Processing succeeded and produced an item
  • Ok(None) - Processing succeeded but produced no item
  • Err(ProcessingError::SoftFailure) - Temporary failure; the record is retried indefinitely
  • Err(ProcessingError::HardFailure) - Permanent failure; the record is skipped
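The four outcomes above can be modelled as a retry loop. This std-only sketch uses hypothetical Outcome and Disposition enums as stand-ins for the crate's Result<Option<Item>, ProcessingError>. It only illustrates the documented contract (soft failures retried with no limit, hard failures skipped) and is not the library's actual runner.

```rust
// Hypothetical stand-ins for the documented results; NOT go_zoom_kinesis types.
#[derive(Debug, PartialEq)]
enum Outcome<T> {
    Item(T),     // Ok(Some(item))
    NoItem,      // Ok(None)
    SoftFailure, // Err(ProcessingError::SoftFailure)
    HardFailure, // Err(ProcessingError::HardFailure)
}

// What happens to the record once processing settles.
#[derive(Debug, PartialEq)]
enum Disposition<T> {
    Emitted(T),
    Empty,
    Skipped,
}

/// Calls `attempt` until it returns something other than a soft failure.
/// Returns the final disposition and the number of attempts made.
fn drive<T, F>(mut attempt: F) -> (Disposition<T>, u32)
where
    F: FnMut(u32) -> Outcome<T>,
{
    let mut attempts = 0;
    loop {
        attempts += 1;
        match attempt(attempts) {
            Outcome::Item(item) => return (Disposition::Emitted(item), attempts),
            Outcome::NoItem => return (Disposition::Empty, attempts),
            Outcome::HardFailure => return (Disposition::Skipped, attempts),
            // No retry limit: a soft failure is simply attempted again.
            Outcome::SoftFailure => continue,
        }
    }
}

fn main() {
    // Fails softly on the first two attempts, then produces an item.
    let (result, attempts) = drive(|n| {
        if n < 3 { Outcome::SoftFailure } else { Outcome::Item("parsed") }
    });
    // prints: Emitted("parsed") after 3 attempts
    println!("{:?} after {} attempts", result, attempts);
}
```

Because soft failures never give up in this model, a soft error for input that can never succeed (for example, a malformed payload) would stall the shard; such cases fit the hard-failure path.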

Checkpoint Validation

The before_checkpoint method allows validation before checkpointing and returns:

  • Ok(()) - Proceed with checkpoint
  • Err(BeforeCheckpointError::SoftError) - Retry before_checkpoint
  • Err(BeforeCheckpointError::HardError) - Stop retrying before_checkpoint and proceed with the checkpoint
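The checkpoint rules above can be sketched as a small std-only model. HookResult is a hypothetical stand-in for Result<(), BeforeCheckpointError>, and a Vec<String> stands in for the checkpoint store. The point it shows is that a soft error calls the hook again, while both success and a hard error end in the checkpoint being written.

```rust
// Hypothetical model of the documented before_checkpoint contract; NOT the crate API.
#[derive(Debug, PartialEq)]
enum HookResult {
    Proceed,   // Ok(())
    SoftError, // Err(BeforeCheckpointError::SoftError)
    HardError, // Err(BeforeCheckpointError::HardError)
}

/// Runs the validation hook until it stops asking for a retry, then checkpoints.
/// Returns how many times the hook ran.
fn validate_then_checkpoint<F>(mut hook: F, store: &mut Vec<String>, sequence_number: &str) -> u32
where
    F: FnMut(u32) -> HookResult,
{
    let mut calls = 0;
    loop {
        calls += 1;
        match hook(calls) {
            // Soft error: call the hook again.
            HookResult::SoftError => continue,
            // Success or hard error: stop calling the hook.
            HookResult::Proceed | HookResult::HardError => break,
        }
    }
    // In both remaining cases the checkpoint is written.
    store.push(sequence_number.to_string());
    calls
}

fn main() {
    let mut store = Vec::new();
    let calls = validate_then_checkpoint(
        |n| if n == 1 { HookResult::SoftError } else { HookResult::HardError },
        &mut store,
        "12345",
    );
    // prints: hook ran 2 times; checkpoints: ["12345"]
    println!("hook ran {} times; checkpoints: {:?}", calls, store);
}
```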

Metadata Access

The RecordMetadata parameter provides:

  • shard_id() - ID of the shard this record came from
  • sequence_number() - Sequence number of the record
  • approximate_arrival_timestamp() - When the record arrived in Kinesis
  • partition_key() - Partition key used for the record
  • explicit_hash_key() - Optional explicit hash key

The CheckpointMetadata parameter provides:

  • shard_id - ID of the shard being checkpointed
  • sequence_number - Sequence number being checkpointed
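One possible use of the sequence numbers in metadata is a guard in before_checkpoint against checkpointing behind a previously stored position. Kinesis returns sequence numbers as decimal strings that can be longer than a u128 can hold, and plain string comparison orders "10" before "9". The sketch below assumes canonical strings (digits only, no leading zeros) and that sequence numbers increase within a shard. cmp_sequence_numbers is a hypothetical helper, not part of go_zoom_kinesis.

```rust
use std::cmp::Ordering;

/// Compares two sequence numbers given as canonical decimal strings.
/// With no leading zeros, a longer string is always the larger number,
/// so compare by length first and lexicographically only on equal length.
/// The values are never parsed, so any length is supported.
fn cmp_sequence_numbers(a: &str, b: &str) -> Ordering {
    a.len().cmp(&b.len()).then_with(|| a.cmp(b))
}

fn main() {
    let last_checkpointed = "9";
    let candidate = "10";
    // Plain string comparison gets this wrong: "10" sorts before "9".
    assert!(candidate < last_checkpointed);
    assert_eq!(cmp_sequence_numbers(candidate, last_checkpointed), Ordering::Greater);
    println!("{} is ahead of {}", candidate, last_checkpointed);
}
```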

Required Associated Types


type Item: Send + Clone + Sync + 'static

The type of data produced by processing records
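The 'static bound means an Item cannot borrow from the record it was built from, and Send + Sync + Clone mean it must be safely shareable and copyable across threads. A zero-sized generic function gives a compile-time check that a candidate type qualifies. In this sketch, assert_item_bounds and ParsedEvent are hypothetical illustrations, not crate items.

```rust
use std::sync::Arc;

/// Compiles only if `T` satisfies the bounds required of `RecordProcessor::Item`.
fn assert_item_bounds<T: Send + Clone + Sync + 'static>() {}

/// Hypothetical item type: owned data only, no borrows from the record.
#[derive(Clone, Debug)]
struct ParsedEvent {
    partition_key: String,
    // Arc makes clones cheap and is Send + Sync when its contents are.
    payload: Arc<Vec<u8>>,
}

fn main() {
    assert_item_bounds::<String>();
    assert_item_bounds::<ParsedEvent>();
    // assert_item_bounds::<std::rc::Rc<String>>(); // rejected: Rc is neither Send nor Sync
    // A type holding a borrowed `&'a [u8]` from the record would fail the 'static bound.

    let event = ParsedEvent { partition_key: "user-42".into(), payload: Arc::new(vec![1, 2, 3]) };
    let copy = event.clone();
    // Cloning shares the payload instead of copying the bytes.
    assert!(Arc::ptr_eq(&event.payload, &copy.payload));
    println!("{} -> {} bytes", copy.partition_key, copy.payload.len());
}
```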

Required Methods


fn process_record<'a, 'life0, 'async_trait>(
    &'life0 self,
    record: &'a Record,
    metadata: RecordMetadata<'a>,
) -> Pin<Box<dyn Future<Output = Result<Option<Self::Item>, ProcessingError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'a: 'async_trait,
    'life0: 'async_trait,

Process a single record from the Kinesis stream

Arguments
  • record - The Kinesis record to process
  • metadata - Additional context about the record and processing attempt
Returns
  • Ok(Some(item)) if processing succeeded and produced an item
  • Ok(None) if processing succeeded but produced no item
  • Err(ProcessingError::SoftFailure) for retriable errors (retries forever)
  • Err(ProcessingError::HardFailure) for permanent failures (skips record)
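These return values also determine what before_checkpoint later receives. Since processed_items is a Vec<Self::Item>, only Ok(Some(item)) results can contribute to it; Ok(None) and skipped hard failures add nothing, and soft failures are retried until they settle. The std-only sketch below assumes the runner collects items in record order. RecordResult and collect_processed_items are hypothetical names, not the library's internals.

```rust
// Hypothetical stand-in for a settled process_record result; NOT a crate type.
enum RecordResult<T> {
    Produced(T), // Ok(Some(item))
    Nothing,     // Ok(None)
    Skipped,     // Err(ProcessingError::HardFailure)
}

/// Keeps only the produced items, in order, as candidates for `processed_items`.
fn collect_processed_items<T>(results: Vec<RecordResult<T>>) -> Vec<T> {
    results
        .into_iter()
        .filter_map(|r| match r {
            RecordResult::Produced(item) => Some(item),
            RecordResult::Nothing | RecordResult::Skipped => None,
        })
        .collect()
}

fn main() {
    let batch = vec![
        RecordResult::Produced("a"),
        RecordResult::Nothing,
        RecordResult::Skipped,
        RecordResult::Produced("b"),
    ];
    // prints: ["a", "b"]
    println!("{:?}", collect_processed_items(batch));
}
```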

Provided Methods


fn before_checkpoint<'life0, 'life1, 'async_trait>(
    &'life0 self,
    _processed_items: Vec<Self::Item>,
    _metadata: CheckpointMetadata<'life1>,
) -> Pin<Box<dyn Future<Output = Result<(), BeforeCheckpointError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,

Validate processed items before checkpointing

Arguments
  • processed_items - Successfully processed items from the batch
  • metadata - Information about the checkpoint operation
Returns
  • Ok(()) to proceed with checkpoint
  • Err(BeforeCheckpointError::SoftError) to retry before_checkpoint
  • Err(BeforeCheckpointError::HardError) to stop retrying before_checkpoint and proceed with the checkpoint

Implementors