// NOTE(review): this is a rustdoc rendering of the trait — the provided method's
// body is elided as `{ ... }`, so the text is not compilable source as shown.
/// Trait for implementing record processing logic.
///
/// Implementors handle the business logic for processing individual records and
/// return processed data through the associated `Item` type. Additional context
/// about each record and its processing state is supplied via a metadata parameter.
///
/// The method signatures are the boxed-future form produced by the `async_trait`
/// macro; implementors write them as `async fn` inside an
/// `#[async_trait::async_trait]` impl block. Implementors must be `Send + Sync`.
pub trait RecordProcessor: Send + Sync {
/// The value produced for each successfully processed record. Successfully
/// processed items from a batch are later handed to `before_checkpoint` as
/// `Vec<Self::Item>`.
type Item: Send + Clone + Sync + 'static;
// Required method
/// Process a single record from the Kinesis stream.
///
/// * `record` - the Kinesis record to process.
/// * `metadata` - additional context about the record and processing attempt.
///
/// Returns:
/// * `Ok(Some(item))` - processing succeeded and produced an item.
/// * `Ok(None)` - processing succeeded but produced no item.
/// * `Err(ProcessingError::SoftFailure)` - retriable failure; retried forever.
/// * `Err(ProcessingError::HardFailure)` - permanent failure; the record is skipped.
///
/// `record` and `metadata` share the lifetime `'a`; the returned future may
/// borrow from them and from `self` (all outlive `'async_trait`).
fn process_record<'a, 'life0, 'async_trait>(
&'life0 self,
record: &'a Record,
metadata: RecordMetadata<'a>,
) -> Pin<Box<dyn Future<Output = Result<Option<Self::Item>, ProcessingError>> + Send + 'async_trait>>
where Self: 'async_trait,
'a: 'async_trait,
'life0: 'async_trait;
// Provided method
// NOTE(review): the default body is elided here; the underscore-prefixed
// parameter names suggest it ignores its inputs (presumably returning `Ok(())`)
// — confirm against the crate source. A soft error presumably re-runs this hook
// with the same items, so a deterministic failure would never clear — verify.
/// Validate processed items before checkpointing.
///
/// * `_processed_items` - successfully processed items from the batch (owned).
/// * `_metadata` - information about the checkpoint operation.
///
/// Returns:
/// * `Ok(())` - proceed with the checkpoint.
/// * `Err(BeforeCheckpointError::SoftError)` - retry `before_checkpoint`.
/// * `Err(BeforeCheckpointError::HardError)` - stop retrying `before_checkpoint`
///   but proceed with the checkpoint.
fn before_checkpoint<'life0, 'life1, 'async_trait>(
&'life0 self,
_processed_items: Vec<Self::Item>,
_metadata: CheckpointMetadata<'life1>,
) -> Pin<Box<dyn Future<Output = Result<(), BeforeCheckpointError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
}
Trait for implementing record processing logic.
Implementors should handle the business logic for processing individual records and return processed data through the associated `Item` type. Additional context about the record and its processing state is provided through the `metadata` parameter.
§Examples
use go_zoom_kinesis::{RecordProcessor};
use go_zoom_kinesis::processor::{RecordMetadata, CheckpointMetadata};
use go_zoom_kinesis::error::{ProcessingError, BeforeCheckpointError};
use aws_sdk_kinesis::types::Record;
use tracing::warn;
// Example data processing function: decodes the payload as UTF-8 text,
// substituting U+FFFD for any invalid byte sequences rather than failing.
async fn process_data(data: &[u8]) -> anyhow::Result<String> {
    let text = String::from_utf8_lossy(data);
    Ok(text.into_owned())
}
/// Example processor that turns each record's payload into (lossily decoded) text.
#[derive(Clone)]
struct MyProcessor;

#[async_trait::async_trait]
impl RecordProcessor for MyProcessor {
    /// Each successfully processed record yields its payload as a `String`.
    type Item = String;

    /// Decodes the record payload with `process_data`.
    ///
    /// Returns `Ok(Some(text))` on success. On failure the error is logged with
    /// the shard ID and sequence number taken from `metadata`, then returned as a
    /// soft error, which is retried forever.
    async fn process_record<'a>(
        &self,
        record: &'a Record,
        metadata: RecordMetadata<'a>,
    ) -> std::result::Result<Option<Self::Item>, ProcessingError> {
        // Process record data
        let data = record.data().as_ref();
        match process_data(data).await {
            Ok(processed) => Ok(Some(processed)),
            Err(e) => {
                // Use metadata for detailed error context
                warn!(
                    shard_id = %metadata.shard_id(),
                    sequence = %metadata.sequence_number(),
                    error = %e,
                    "Processing failed"
                );
                Err(ProcessingError::soft(e)) // Will be retried forever
            }
        }
    }

    /// Optional validation hook run before a checkpoint is written.
    ///
    /// An empty batch is legitimate: `process_record` may return `Ok(None)` for
    /// every record. Rejecting it with a *soft* error would only re-run this hook
    /// on the same (still empty) batch, so the checkpoint could never proceed.
    async fn before_checkpoint(
        &self,
        processed_items: Vec<Self::Item>,
        _metadata: CheckpointMetadata<'_>,
    ) -> std::result::Result<(), BeforeCheckpointError> {
        if processed_items.is_empty() {
            // Nothing to validate; let the checkpoint go ahead.
            return Ok(());
        }
        // Batch-level validation (e.g. flushing `processed_items` to a downstream
        // sink) goes here. Return `BeforeCheckpointError::soft(..)` only for
        // failures that can succeed on a later attempt; a deterministic failure
        // retried that way blocks the checkpoint indefinitely.
        Ok(())
    }
}
§Associated Types
- `Item` - The type of data produced by processing records
§Record Processing
The `process_record` method returns:
- `Ok(Some(item))` - Processing succeeded and produced an item
- `Ok(None)` - Processing succeeded but produced no item
- `Err(ProcessingError::SoftFailure)` - Temporary failure, will retry forever
- `Err(ProcessingError::HardFailure)` - Permanent failure, skip record
§Checkpoint Validation
The `before_checkpoint` method allows validation before checkpointing and returns:
- `Ok(())` - Proceed with checkpoint
- `Err(BeforeCheckpointError::SoftError)` - Retry `before_checkpoint`
- `Err(BeforeCheckpointError::HardError)` - Stop trying `before_checkpoint` but proceed with checkpoint
§Metadata Access
The `RecordMetadata` parameter provides:
- `shard_id()` - ID of the shard this record came from
- `sequence_number()` - Sequence number of the record
- `approximate_arrival_timestamp()` - When the record arrived in Kinesis
- `partition_key()` - Partition key used for the record
- `explicit_hash_key()` - Optional explicit hash key

The `CheckpointMetadata` parameter provides:
- `shard_id` - ID of the shard being checkpointed
- `sequence_number` - Sequence number being checkpointed
Required Associated Types§
Required Methods§
fn process_record<'a, 'life0, 'async_trait>(
    &'life0 self,
    record: &'a Record,
    metadata: RecordMetadata<'a>,
) -> Pin<Box<dyn Future<Output = Result<Option<Self::Item>, ProcessingError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'a: 'async_trait,
    'life0: 'async_trait,
Process a single record from the Kinesis stream
§Arguments
- `record` - The Kinesis record to process
- `metadata` - Additional context about the record and processing attempt
§Returns
- `Ok(Some(item))` if processing succeeded and produced an item
- `Ok(None)` if processing succeeded but produced no item
- `Err(ProcessingError::SoftFailure)` for retriable errors (retries forever)
- `Err(ProcessingError::HardFailure)` for permanent failures (skips record)
Provided Methods§
fn before_checkpoint<'life0, 'life1, 'async_trait>(
    &'life0 self,
    _processed_items: Vec<Self::Item>,
    _metadata: CheckpointMetadata<'life1>,
) -> Pin<Box<dyn Future<Output = Result<(), BeforeCheckpointError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Validate processed items before checkpointing
§Arguments
- `processed_items` - Successfully processed items from the batch
- `metadata` - Information about the checkpoint operation
§Returns
- `Ok(())` to proceed with the checkpoint
- `Err(BeforeCheckpointError::SoftError)` to retry `before_checkpoint`
- `Err(BeforeCheckpointError::HardError)` to stop trying `before_checkpoint` but proceed with the checkpoint