44 changes: 44 additions & 0 deletions .doc_gen/metadata/serverless_metadata.yaml
@@ -852,3 +852,47 @@ serverless_MSK_Lambda:
  services:
    lambda:
    kafka:
serverless_DynamoDB_Lambda_batch_processor:
  title: Process &DDB; Stream records with &LAM; Powertools Batch Processor
  title_abbrev: Process &DDB; Stream records with &LAM; Powertools Batch Processor
  synopsis: implement a Lambda function that uses the AWS Lambda Powertools Batch Processor utility to process DynamoDB Stream records, handle partial failures gracefully, and prevent Lambda from retrying the entire batch when only some records fail.
  category: Serverless examples
  languages:
    JavaScript:
      versions:
        - sdk_version: 3
          github: https://github.com/aws-samples/serverless-snippets/tree/main/tools-powertools-batch-processor-ddb
          excerpts:
            - description: Processing &DDB; Stream records with Powertools Batch Processor using JavaScript.
              snippet_files:
                - tools-powertools-batch-processor-ddb/example.js
            - description: Processing &DDB; Stream records with Powertools Batch Processor using TypeScript.
              snippet_files:
                - tools-powertools-batch-processor-ddb/example.ts
    Python:
      versions:
        - sdk_version: 3
          github: https://github.com/aws-samples/serverless-snippets/tree/main/tools-powertools-batch-processor-ddb
          excerpts:
            - description: Processing &DDB; Stream records with Powertools Batch Processor using Python.
              snippet_files:
                - tools-powertools-batch-processor-ddb/example.py
    Java:
      versions:
        - sdk_version: 2
          github: https://github.com/aws-samples/serverless-snippets/tree/main/tools-powertools-batch-processor-ddb
          excerpts:
            - description: Processing &DDB; Stream records with Powertools Batch Processor using Java.
              snippet_files:
                - tools-powertools-batch-processor-ddb/example.java
    .NET:
      versions:
        - sdk_version: 3
          github: https://github.com/aws-samples/serverless-snippets/tree/main/tools-powertools-batch-processor-ddb
          excerpts:
            - description: Processing &DDB; Stream records with Powertools Batch Processor using .NET.
              snippet_files:
                - tools-powertools-batch-processor-ddb/Function.cs
  services:
    lambda:
    dynamodb:
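
Note: the partial-failure behavior described in the synopsis also requires the DynamoDB event source mapping to opt in to partial batch responses; without ReportBatchItemFailures, Lambda retries the whole batch regardless of what the handlers below return. The following is a minimal AWS SAM sketch of that wiring — StreamProcessorFunction, OrdersStream, and OrdersTable are hypothetical names, and the runtime is only an example.

# Minimal AWS SAM sketch (hypothetical names) of the event source mapping the
# examples assume. ReportBatchItemFailures lets the batchItemFailures returned
# by the handler control which records Lambda retries.
Resources:
  StreamProcessorFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: example.handler          # e.g. the JavaScript handler in example.js
      Runtime: nodejs20.x
      Events:
        OrdersStream:
          Type: DynamoDB
          Properties:
            Stream: !GetAtt OrdersTable.StreamArn   # table must have streams enabled
            StartingPosition: LATEST
            BatchSize: 100
            FunctionResponseTypes:
              - ReportBatchItemFailures
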
28 changes: 28 additions & 0 deletions tools-powertools-batch-processor-ddb/Function.cs
@@ -0,0 +1,28 @@
// POCO representing the customer item carried in each stream record.
public class Customer
{
    public string? CustomerId { get; set; }
    public string? Name { get; set; }
    public string? Email { get; set; }
    public DateTime CreatedAt { get; set; }
}

// Record handler invoked once per record; throwing marks only that record as a batch item failure.
internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer>
{
    public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
    {
        Logger.LogInformation($"Processing customer {customer.CustomerId} - {customer.Name}");

        if (string.IsNullOrEmpty(customer.Email))
        {
            throw new ArgumentException("Customer email is required");
        }

        return await Task.FromResult(RecordHandlerResult.None);
    }
}

// Lambda handler: the BatchProcessor attribute runs the batch, and the returned
// partial-failure response tells Lambda which records to retry.
[BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
{
    return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
}
26 changes: 26 additions & 0 deletions tools-powertools-batch-processor-ddb/example.java
@@ -0,0 +1,26 @@
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class DynamoDBStreamBatchHandler implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    private final BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler;

    public DynamoDBStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withDynamoDbBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context context) {
        // Processes the batch and returns the partial-failure response for Lambda to retry.
        return handler.processBatch(ddbEvent, context);
    }

    private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) {
        // Process the change record
    }
}
25 changes: 25 additions & 0 deletions tools-powertools-batch-processor-ddb/example.js
@@ -0,0 +1,25 @@
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from "@aws-lambda-powertools/batch";
import { Logger } from "@aws-lambda-powertools/logger";

const processor = new BatchProcessor(EventType.DynamoDBStreams);
const logger = new Logger();

// Called once per stream record; a thrown error marks only that record as failed.
const recordHandler = async (record) => {
  if (record.dynamodb?.NewImage) {
    logger.info("Processing record", { record: record.dynamodb.NewImage });
    const message = record.dynamodb.NewImage.Message.S;
    if (message) {
      const payload = JSON.parse(message);
      logger.info("Processed item", { item: payload });
    }
  }
};

// processPartialResponse returns the batchItemFailures response that Lambda uses for retries.
export const handler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
34 changes: 34 additions & 0 deletions tools-powertools-batch-processor-ddb/example.py
@@ -0,0 +1,34 @@
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    process_partial_response,
)
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
    DynamoDBRecord,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
tracer = Tracer()
logger = Logger()


# Called once per stream record; raising an exception marks only that record as failed.
@tracer.capture_method
def record_handler(record: DynamoDBRecord):
    if record.dynamodb and record.dynamodb.new_image:
        logger.info(record.dynamodb.new_image)
        message = record.dynamodb.new_image.get("Message")
        if message:
            payload: dict = json.loads(message)
            logger.info(payload)


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    # Returns the partial batch response (batchItemFailures) that Lambda uses for retries.
    return process_partial_response(
        event=event, record_handler=record_handler, processor=processor, context=context
    )
26 changes: 26 additions & 0 deletions tools-powertools-batch-processor-ddb/example.ts
@@ -0,0 +1,26 @@
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from "@aws-lambda-powertools/batch";
import { Logger } from "@aws-lambda-powertools/logger";
import type { DynamoDBRecord, DynamoDBStreamHandler } from "aws-lambda";

const processor = new BatchProcessor(EventType.DynamoDBStreams);
const logger = new Logger();

// Called once per stream record; a thrown error marks only that record as failed.
const recordHandler = async (record: DynamoDBRecord): Promise<void> => {
  if (record.dynamodb?.NewImage) {
    logger.info("Processing record", { record: record.dynamodb.NewImage });
    const message = record.dynamodb.NewImage.Message.S;
    if (message) {
      const payload = JSON.parse(message);
      logger.info("Processed item", { item: payload });
    }
  }
};

// processPartialResponse returns the batchItemFailures response that Lambda uses for retries.
export const handler: DynamoDBStreamHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });