docs(batch): clarify ordering/async processing #4081

Merged 4 commits on Jun 23, 2025

58 changes: 9 additions & 49 deletions docs/features/batch.md
@@ -49,16 +49,15 @@ journey
Records expired: 1: Failure
```

-This behavior changes when you enable [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration:
+This behavior changes when you enable the [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration:

<!-- markdownlint-disable MD013 -->
* [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted.
* [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). Single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as checkpoint.

<!-- HTML tags are required in admonition content thus increasing line length beyond our limits -->
<!-- markdownlint-disable MD013 -->
???+ warning "Warning: This utility lowers the chance of processing records more than once; it does not guarantee it"
-    We recommend implementing processing logic in an [idempotent manner](idempotency.md){target="_blank"} wherever possible.
+    We recommend implementing processing logic in an [idempotent manner](./idempotency.md){target="_blank"} whenever possible.

You can find more details on how Lambda works with either [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, or [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"} in the AWS Documentation.

@@ -72,7 +71,7 @@ Install the library in your project
npm i @aws-lambda-powertools/batch
```

-For this feature to work, you need to **(1)** configure your Lambda function event source to use `ReportBatchItemFailures`, and **(2)** return [a specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank" rel="nofollow"} to report which records failed to be processed.
+For this feature to work, you need to **(1)** configure your Lambda function event source to use `ReportBatchItemFailures`, so that the response from the Batch Processing utility can inform the service which records failed to be processed.

Use your preferred deployment framework to set the correct configuration while this utility handles the correct response to be returned.
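
For reference, a minimal sketch of the partial-failure response shape that `ReportBatchItemFailures` expects; the utility builds this object for you, and the `itemIdentifier` value below is only a sample messageId:

```typescript
import type { SQSBatchResponse } from 'aws-lambda';

// Lambda retries only the records listed under batchItemFailures;
// every other record in the batch is treated as successfully processed.
const response: SQSBatchResponse = {
  batchItemFailures: [
    { itemIdentifier: '059f36b4-87a3-44ab-83d2-661975830a7d' }, // sample messageId
  ],
};
```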

@@ -108,8 +107,8 @@ Processing batches from SQS works in three stages:
2. Define your function to handle each batch record, and use the `SQSRecord` type annotation for autocompletion
3. Use **`processPartialResponse`** to kick off processing

-???+ info
-    This code example optionally uses Logger for completion.
+!!! note
+    By default, the batch processor will process messages in parallel, which does not guarantee the order of processing. If you need to process messages in order, set the [`processInParallel` option to `false`](#sequential-async-processing), or use [`SqsFifoPartialProcessor` for SQS FIFO queues](#fifo-queues).
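
The bundled snippet is collapsed in this view; as a rough sketch of the three stages, using the public `@aws-lambda-powertools/batch` API (details of the actual example may differ):

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS); // Step 1: SQS processor

const recordHandler = async (record: SQSRecord): Promise<void> => {
  // Step 2: throwing here marks this record as a batch item failure
  if (record.body) {
    const item = JSON.parse(record.body);
    // ... process item
  }
};

export const handler: SQSHandler = async (event, context) =>
  // Step 3: builds and returns the batchItemFailures response
  processPartialResponse(event, recordHandler, processor, { context });
```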

=== "index.ts"

@@ -147,30 +146,18 @@ By default, we will stop processing at the first failure and mark unprocessed messages

Enable the `skipGroupOnError` option for seamless processing of messages from various group IDs. This setup ensures that messages from a failed group ID are sent back to SQS, enabling uninterrupted processing of messages from the subsequent group ID.

=== "Recommended"
=== "index.ts"

```typescript hl_lines="1-4 8 20"
--8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts"
```

1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics)

=== "Async processing"

```typescript hl_lines="1-4 8 20"
--8<-- "examples/snippets/batch/gettingStartedSQSFifoAsync.ts"
```

=== "Enabling skipGroupOnError flag"
=== "with `skipGroupOnError`"

```typescript hl_lines="1-4 13 30"
--8<-- "examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts"
```

!!! note
    Note that `SqsFifoPartialProcessor` is synchronous using `processPartialResponseSync`.
    If you need asynchronous processing while preserving the order of messages in the queue, use `SqsFifoPartialProcessorAsync` with `processPartialResponse`.

### Processing messages from Kinesis

Processing batches from Kinesis works in three stages:
@@ -179,9 +166,6 @@
2. Define your function to handle each batch record, and use the `KinesisStreamRecord` type annotation for autocompletion
3. Use **`processPartialResponse`** to kick off processing
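
The stages mirror the SQS example; a hedged sketch, noting that Kinesis record data arrives base64-encoded (the bundled snippet below is collapsed in this view):

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { KinesisStreamHandler, KinesisStreamRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.KinesisDataStreams); // Step 1

const recordHandler = async (record: KinesisStreamRecord): Promise<void> => {
  // Step 2: Kinesis payloads are base64-encoded strings
  const payload = Buffer.from(record.kinesis.data, 'base64').toString();
  const item = JSON.parse(payload);
  // ... process item
};

export const handler: KinesisStreamHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { context }); // Step 3
```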

-???+ info
-    This code example optionally uses Logger for completion.

=== "index.ts"

```typescript hl_lines="1-5 9 12 19-21"
@@ -407,32 +391,6 @@ sequenceDiagram
<i>Kinesis and DynamoDB streams mechanism with multiple batch item failures</i>
</center>

-### Async or sync processing
-
-There are two processors you can use with this utility:
-
-* **`BatchProcessor`** and **`processPartialResponse`** – Processes messages asynchronously
-* **`BatchProcessorSync`** and **`processPartialResponseSync`** – Processes messages synchronously
-
-In most cases your function will be `async` returning a `Promise`. Therefore, the `BatchProcessor` is the default processor handling your batch records asynchronously.
-There are use cases where you need to process the batch records synchronously. For example, when you need to process multiple records at the same time without conflicting with one another.
-For such cases we recommend to use the `BatchProcessorSync` and `processPartialResponseSync` functions.
-
-!!! info "Note that you need match your processing function with the right batch processor"
-    *If your function is `async` returning a `Promise`, use `BatchProcessor` and `processPartialResponse`
-    * If your function is not `async`, use `BatchProcessorSync` and `processPartialResponseSync`
-
-The difference between the two processors is in how they handle record processing:
-
-* **`BatchProcessor`**: By default, it processes records in parallel using `Promise.all()`. However, it also offers an [option](#sequential-async-processing) to process records sequentially, preserving the order.
-* **`BatchProcessorSync`**: Always processes records sequentially, ensuring the order is preserved by looping through each record one by one.
-
-???+ question "When is this useful?"
-
-    For example, imagine you need to process multiple loyalty points and incrementally save in a database. While you await the database to confirm your records are saved, you could start processing another request concurrently.
-
-    The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order).

## Advanced

### Accessing processed messages
Expand Down Expand Up @@ -492,6 +450,8 @@ By default, the `BatchProcessor` processes records in parallel using `Promise.al

!!! important "If the `processInParallel` option is not provided, the `BatchProcessor` will process records in parallel."

+When processing records from SQS FIFO queues, we recommend using the [`SqsFifoPartialProcessor`](#fifo-queues) class, which guarantees ordering of records and implements a short-circuit mechanism to skip processing records from a different message group ID.

```typescript hl_lines="8 17" title="Sequential async processing"
--8<-- "examples/snippets/batch/sequentialAsyncProcessing.ts"
```
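
A minimal sketch of what the sequential option looks like in a handler, assuming the `processInParallel` flag documented above:

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (record: SQSRecord): Promise<void> => {
  // ... await per-record work here
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
    processInParallel: false, // records are awaited one at a time, in order
  });
```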
10 changes: 5 additions & 5 deletions examples/snippets/batch/gettingStartedSQSFifo.ts
@@ -1,14 +1,14 @@
import {
-SqsFifoPartialProcessor,
-processPartialResponseSync,
+SqsFifoPartialProcessorAsync,
+processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

-const processor = new SqsFifoPartialProcessor(); // (1)!
+const processor = new SqsFifoPartialProcessorAsync();
const logger = new Logger();

-const recordHandler = (record: SQSRecord): void => {
+const recordHandler = async (record: SQSRecord): Promise<void> => {
const payload = record.body;
if (payload) {
const item = JSON.parse(payload);
@@ -17,6 +17,6 @@ const recordHandler = (record: SQSRecord): void => {
};

export const handler: SQSHandler = async (event, context) =>
-processPartialResponseSync(event, recordHandler, processor, {
+processPartialResponse(event, recordHandler, processor, {
context,
});
22 changes: 0 additions & 22 deletions examples/snippets/batch/gettingStartedSQSFifoAsync.ts

This file was deleted.

8 changes: 4 additions & 4 deletions examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts
@@ -1,6 +1,6 @@
import {
-SqsFifoPartialProcessor,
-processPartialResponseSync,
+SqsFifoPartialProcessorAsync,
+processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
@@ -10,7 +10,7 @@ import type {
SQSRecord,
} from 'aws-lambda';

-const processor = new SqsFifoPartialProcessor();
+const processor = new SqsFifoPartialProcessorAsync();
const logger = new Logger();

const recordHandler = (record: SQSRecord): void => {
@@ -25,7 +25,7 @@ export const handler = async (
event: SQSEvent,
context: Context
): Promise<SQSBatchResponse> => {
-return processPartialResponseSync(event, recordHandler, processor, {
+return processPartialResponse(event, recordHandler, processor, {
context,
skipGroupOnError: true,
});
1 change: 1 addition & 0 deletions packages/batch/src/BatchProcessorSync.ts
@@ -80,6 +80,7 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
* ```
*
* @param eventType The type of event to process (SQS, Kinesis, DynamoDB)
+ * @deprecated Use {@link BasePartialBatchProcessor} instead, this class is deprecated and will be removed in the next major version.
*/
class BatchProcessorSync extends BasePartialBatchProcessor {
/**
2 changes: 2 additions & 0 deletions packages/batch/src/processPartialResponseSync.ts
@@ -7,6 +7,8 @@ import type {
} from './types.js';

/**
+ * @deprecated Use {@link processPartialResponse} instead, this function is deprecated and will be removed in the next major version.
+ *
* Higher level function to process a batch of records synchronously
* and handle partial failure cases.
*
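
Taken together, the two deprecations steer users to the async path. A hedged before/after sketch of the migration that the snippet changes above already apply:

```typescript
// Before (deprecated, synchronous):
//   const processor = new BatchProcessorSync(EventType.SQS);
//   export const handler: SQSHandler = (event, context) =>
//     processPartialResponseSync(event, recordHandler, processor, { context });

// After (asynchronous default):
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);

// Record handlers become async and return a Promise
const recordHandler = async (record: SQSRecord): Promise<void> => {
  // ... process record.body
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { context });
```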