From 02e1fbed77254ecb4fd2a4e68d134228fb47e4ad Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Mon, 23 Jun 2025 12:24:30 +0100 Subject: [PATCH 1/3] chore: clarify parallel/ordering in sqs queues --- docs/features/batch.md | 58 +++---------------- .../snippets/batch/gettingStartedSQSFifo.ts | 10 ++-- .../batch/gettingStartedSQSFifoAsync.ts | 22 ------- .../gettingStartedSQSFifoSkipGroupOnError.ts | 8 +-- 4 files changed, 18 insertions(+), 80 deletions(-) delete mode 100644 examples/snippets/batch/gettingStartedSQSFifoAsync.ts diff --git a/docs/features/batch.md b/docs/features/batch.md index dd019c2bf8..1810f45265 100644 --- a/docs/features/batch.md +++ b/docs/features/batch.md @@ -49,16 +49,15 @@ journey Records expired: 1: Failure ``` -This behavior changes when you enable [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration: +This behavior changes when you enable [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration: - * [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted. * [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). Single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as checkpoint. ???+ warning "Warning: This utility lowers the chance of processing records more than once; it does not guarantee it" - We recommend implementing processing logic in an [idempotent manner](idempotency.md){target="_blank"} wherever possible. + We recommend implementing processing logic in an [idempotent manner](./idempotency.md){target="_blank"} whenever possible. 
You can find more details on how Lambda works with either [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, or [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"} in the AWS Documentation. @@ -72,7 +71,7 @@ Install the library in your project npm i @aws-lambda-powertools/batch ``` -For this feature to work, you need to **(1)** configure your Lambda function event source to use `ReportBatchItemFailures`, and **(2)** return [a specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank" rel="nofollow"} to report which records failed to be processed. +For this feature to work, you need to configure your Lambda function event source to use `ReportBatchItemFailures`, so that the response from the Batch Processing utility can inform the service which records failed to be processed. Use your preferred deployment framework to set the correct configuration while this utility handles the correct response to be returned. @@ -108,8 +107,8 @@ Processing batches from SQS works in three stages: 2. Define your function to handle each batch record, and use the `SQSRecord` type annotation for autocompletion 3. Use **`processPartialResponse`** to kick off processing -???+ info - This code example optionally uses Logger for completion. +!!! note + By default, the batch processor will process messages in parallel, which does not guarantee the order of processing. If you need to process messages in order, set the [`processInParallel` option to `false`](#sequential-async-processing), or use [`SqsFifoPartialProcessor` for SQS FIFO queues](#fifo-queues).
=== "index.ts" @@ -147,30 +146,18 @@ By default, we will stop processing at the first failure and mark unprocessed me Enable the `skipGroupOnError` option for seamless processing of messages from various group IDs. This setup ensures that messages from a failed group ID are sent back to SQS, enabling uninterrupted processing of messages from the subsequent group ID. -=== "Recommended" +=== "index.ts" ```typescript hl_lines="1-4 8 20" --8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts" ``` - - 1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics) - -=== "Async processing" - - ```typescript hl_lines="1-4 8 20" - --8<-- "examples/snippets/batch/gettingStartedSQSFifoAsync.ts" - ``` -=== "Enabling skipGroupOnError flag" +=== "with `skipGroupOnError`" ```typescript hl_lines="1-4 13 30" --8<-- "examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts" ``` -!!! Note - Note that `SqsFifoPartialProcessor` is synchronous using `processPartialResponseSync`. - If you need asynchronous processing while preserving the order of messages in the queue, use `SqsFifoPartialProcessorAsync` with `processPartialResponse`. - ### Processing messages from Kinesis Processing batches from Kinesis works in three stages: @@ -179,9 +166,6 @@ Processing batches from Kinesis works in three stages: 2. Define your function to handle each batch record, and use the `KinesisStreamRecord` type annotation for autocompletion 3. Use **`processPartialResponse`** to kick off processing -???+ info - This code example optionally uses Logger for completion. 
- === "index.ts" ```typescript hl_lines="1-5 9 12 19-21" @@ -407,32 +391,6 @@ sequenceDiagram Kinesis and DynamoDB streams mechanism with multiple batch item failures -### Async or sync processing - -There are two processors you can use with this utility: - -* **`BatchProcessor`** and **`processPartialResponse`** – Processes messages asynchronously -* **`BatchProcessorSync`** and **`processPartialResponseSync`** – Processes messages synchronously - -In most cases your function will be `async` returning a `Promise`. Therefore, the `BatchProcessor` is the default processor handling your batch records asynchronously. -There are use cases where you need to process the batch records synchronously. For example, when you need to process multiple records at the same time without conflicting with one another. -For such cases we recommend to use the `BatchProcessorSync` and `processPartialResponseSync` functions. - -!!! info "Note that you need match your processing function with the right batch processor" - *If your function is `async` returning a `Promise`, use `BatchProcessor` and `processPartialResponse` - * If your function is not `async`, use `BatchProcessorSync` and `processPartialResponseSync` - -The difference between the two processors is in how they handle record processing: - -* **`BatchProcessor`**: By default, it processes records in parallel using `Promise.all()`. However, it also offers an [option](#sequential-async-processing) to process records sequentially, preserving the order. -* **`BatchProcessorSync`**: Always processes records sequentially, ensuring the order is preserved by looping through each record one by one. - -???+ question "When is this useful?" - - For example, imagine you need to process multiple loyalty points and incrementally save in a database. While you await the database to confirm your records are saved, you could start processing another request concurrently. 
- - The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order). - ## Advanced ### Accessing processed messages @@ -492,6 +450,8 @@ By default, the `BatchProcessor` processes records in parallel using `Promise.al !!! important "If the `processInParallel` option is not provided, the `BatchProcessor` will process records in parallel." +When processing records from SQS FIFO queues, we recommend using the [`SqsFifoPartialProcessor`](#fifo-queues) class, which guarantees ordering of records and implements a short-circuit mechanism to skip processing records from a different message group ID. + ```typescript hl_lines="8 17" title="Sequential async processing" --8<-- "examples/snippets/batch/sequentialAsyncProcessing.ts" ``` diff --git a/examples/snippets/batch/gettingStartedSQSFifo.ts b/examples/snippets/batch/gettingStartedSQSFifo.ts index b4b3cc5ddf..5920480cf6 100644 --- a/examples/snippets/batch/gettingStartedSQSFifo.ts +++ b/examples/snippets/batch/gettingStartedSQSFifo.ts @@ -1,14 +1,14 @@ import { - SqsFifoPartialProcessor, - processPartialResponseSync, + SqsFifoPartialProcessorAsync, + processPartialResponse, } from '@aws-lambda-powertools/batch'; import { Logger } from '@aws-lambda-powertools/logger'; import type { SQSHandler, SQSRecord } from 'aws-lambda'; -const processor = new SqsFifoPartialProcessor(); // (1)! 
+const processor = new SqsFifoPartialProcessorAsync(); const logger = new Logger(); -const recordHandler = (record: SQSRecord): void => { +const recordHandler = async (record: SQSRecord): Promise<void> => { const payload = record.body; if (payload) { const item = JSON.parse(payload); @@ -17,6 +17,6 @@ const recordHandler = (record: SQSRecord): void => { }; export const handler: SQSHandler = async (event, context) => - processPartialResponseSync(event, recordHandler, processor, { + processPartialResponse(event, recordHandler, processor, { context, }); diff --git a/examples/snippets/batch/gettingStartedSQSFifoAsync.ts b/examples/snippets/batch/gettingStartedSQSFifoAsync.ts deleted file mode 100644 index 5920480cf6..0000000000 --- a/examples/snippets/batch/gettingStartedSQSFifoAsync.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { - SqsFifoPartialProcessorAsync, - processPartialResponse, -} from '@aws-lambda-powertools/batch'; -import { Logger } from '@aws-lambda-powertools/logger'; -import type { SQSHandler, SQSRecord } from 'aws-lambda'; - -const processor = new SqsFifoPartialProcessorAsync(); -const logger = new Logger(); - -const recordHandler = async (record: SQSRecord): Promise<void> => { - const payload = record.body; - if (payload) { - const item = JSON.parse(payload); - logger.info('Processed item', { item }); - } -}; - -export const handler: SQSHandler = async (event, context) => - processPartialResponse(event, recordHandler, processor, { - context, - }); diff --git a/examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts b/examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts index a4edc0c2d4..d772ef83e0 100644 --- a/examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts +++ b/examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts @@ -1,6 +1,6 @@ import { - SqsFifoPartialProcessor, - processPartialResponseSync, + SqsFifoPartialProcessorAsync, + processPartialResponse, } from '@aws-lambda-powertools/batch'; import { Logger } from
'@aws-lambda-powertools/logger'; import type { @@ -10,7 +10,7 @@ import type { SQSRecord, } from 'aws-lambda'; -const processor = new SqsFifoPartialProcessor(); +const processor = new SqsFifoPartialProcessorAsync(); const logger = new Logger(); const recordHandler = (record: SQSRecord): void => { @@ -25,7 +25,7 @@ export const handler = async ( event: SQSEvent, context: Context ): Promise<SQSBatchResponse> => { - return processPartialResponseSync(event, recordHandler, processor, { + return processPartialResponse(event, recordHandler, processor, { context, skipGroupOnError: true, }); From cbc6a06e6aa1aa74b9388e20f360f4a4b766731e Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Mon, 23 Jun 2025 12:27:08 +0100 Subject: [PATCH 2/3] chore: mark sync methods as deprecated --- packages/batch/src/BatchProcessorSync.ts | 1 + packages/batch/src/processPartialResponseSync.ts | 2 ++ 2 files changed, 3 insertions(+) diff --git a/packages/batch/src/BatchProcessorSync.ts b/packages/batch/src/BatchProcessorSync.ts index 2018355f88..8feb8f14fa 100644 --- a/packages/batch/src/BatchProcessorSync.ts +++ b/packages/batch/src/BatchProcessorSync.ts @@ -80,6 +80,7 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js'; * ``` * * @param eventType The type of event to process (SQS, Kinesis, DynamoDB) + * @deprecated Use {@link BatchProcessor} instead, this class is deprecated and will be removed in the next major version. */ class BatchProcessorSync extends BasePartialBatchProcessor { /** diff --git a/packages/batch/src/processPartialResponseSync.ts b/packages/batch/src/processPartialResponseSync.ts index d3216e57fd..39e1dce7c1 100644 --- a/packages/batch/src/processPartialResponseSync.ts +++ b/packages/batch/src/processPartialResponseSync.ts @@ -7,6 +7,8 @@ import type { } from './types.js'; /** + * @deprecated Use {@link processPartialResponse} instead, this function is deprecated and will be removed in the next major version.
+ * * Higher level function to process a batch of records synchronously * and handle partial failure cases. * From 43522d9b201940c83c875cdfcbb148451df81e07 Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Mon, 23 Jun 2025 15:12:00 +0100 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: Stefano Vozza --- docs/features/batch.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/features/batch.md b/docs/features/batch.md index 1810f45265..f1dd9a5da8 100644 --- a/docs/features/batch.md +++ b/docs/features/batch.md @@ -49,7 +49,7 @@ journey Records expired: 1: Failure ``` -This behavior changes when you enable [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration: +This behavior changes when you enable the [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration: * [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted. * [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). Single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as checkpoint.
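The docs changes in this patch hinge on one behavior: `BatchProcessor` runs record handlers in parallel by default, while FIFO queues need records handled one at a time to preserve order. The sketch below illustrates that trade-off behind the `processInParallel` option; it is illustrative only — `processRecords` and `RecordHandler` are hypothetical names, not the Powertools API.

```typescript
// Conceptual sketch (NOT the Powertools implementation) of parallel vs
// sequential batch processing, mirroring the `processInParallel` option.

type RecordHandler = (record: string) => Promise<string>;

async function processRecords(
  records: string[],
  handler: RecordHandler,
  processInParallel: boolean
): Promise<string[]> {
  if (processInParallel) {
    // Parallel: all handlers start at once; completion order is not
    // guaranteed, although Promise.all keeps results in input positions.
    return Promise.all(records.map((record) => handler(record)));
  }
  // Sequential: each record is fully awaited before the next one starts,
  // preserving processing order — the property FIFO queues require.
  const results: string[] = [];
  for (const record of records) {
    results.push(await handler(record));
  }
  return results;
}

// A trivial handler; with real SQS records this would parse `record.body`.
const handler: RecordHandler = async (record) => `processed:${record}`;

processRecords(['a', 'b', 'c'], handler, false).then((results) => {
  console.log(results); // [ 'processed:a', 'processed:b', 'processed:c' ]
});
```

This is why the patch points FIFO users at `SqsFifoPartialProcessorAsync` rather than plain `BatchProcessor`: the FIFO processor takes the sequential path and additionally short-circuits remaining records (or, with `skipGroupOnError`, only the failing message group).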