docs(kafka): add preview docs page #4072

Merged · 1 commit · Jun 19, 2025
8 changes: 8 additions & 0 deletions docs/features/index.md
@@ -87,4 +87,12 @@ description: Features of Powertools for AWS Lambda

[:octicons-arrow-right-24: Read more](./validation.md)

- __Kafka__

---

Utility to easily handle message deserialization and parsing of Kafka events in AWS Lambda functions.

[:octicons-arrow-right-24: Read more](./kafka.md)

</div>
345 changes: 345 additions & 0 deletions docs/features/kafka.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/index.md
@@ -54,6 +54,7 @@ Powertools for AWS Lambda (TypeScript) is built as a modular toolkit, so you can
| [JMESPath Functions](./features/jmespath.md) | Built-in JMESPath functions to easily deserialize common encoded JSON payloads in Lambda functions. |
| [Parser](./features/parser.md) | Utility to parse and validate AWS Lambda event payloads using Zod, a TypeScript-first schema declaration and validation library. |
| [Validation](./features/validation.md) | JSON Schema validation for events and responses, including JMESPath support to unwrap events before validation. |
| [Kafka](./features/kafka.md) | Utility to easily handle message deserialization and parsing of Kafka events in AWS Lambda functions. |

## Examples

39 changes: 39 additions & 0 deletions examples/snippets/kafka/advancedWorkingWithArkType.ts
@@ -0,0 +1,39 @@
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { type } from 'arktype';

const logger = new Logger({ serviceName: 'kafka-consumer' });

const OrderItemSchema = type({
productId: 'string',
quantity: 'number.integer >= 1',
price: 'number.integer',
});

const OrderSchema = type({
id: 'string',
customerId: 'string',
items: OrderItemSchema.array().moreThanLength(0),
createdAt: 'string.date',
totalAmount: 'number.integer >= 0',
});

const schemaConfig = {
value: {
type: SchemaType.JSON,
parserSchema: OrderSchema,
},
} satisfies SchemaConfig;

export const handler = kafkaConsumer<unknown, typeof OrderSchema.infer>(
async (event, _context) => {
for (const {
value: { id, items },
} of event.records) {
logger.setCorrelationId(id);
logger.debug(`order includes ${items.length} items`);
}
},
schemaConfig
);
53 changes: 53 additions & 0 deletions examples/snippets/kafka/advancedWorkingWithIdempotency.ts
@@ -0,0 +1,53 @@
import {
IdempotencyConfig,
makeIdempotent,
} from '@aws-lambda-powertools/idempotency';
import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dynamodb';
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { type IUser, User } from './samples/user.es6.generated.js'; // protobuf generated class

const logger = new Logger({ serviceName: 'kafka-consumer' });
const persistenceStore = new DynamoDBPersistenceLayer({
tableName: 'IdempotencyTable',
});

const schemaConfig = {
value: {
type: SchemaType.PROTOBUF,
schema: User,
},
} satisfies SchemaConfig;

const processRecord = makeIdempotent(
async (user: IUser, topic: string, partition: number, offset: number) => {
logger.info('processing user', {
userId: user.id,
meta: {
topic,
partition,
offset,
},
});

// ...your business logic here

return {
success: true,
userId: user.id,
};
},
{
persistenceStore,
config: new IdempotencyConfig({
eventKeyJmesPath: `topic & '-' & partition & '-' & offset`,
}),
}
);

export const handler = kafkaConsumer(async (event, _context) => {
for (const { value, topic, partition, offset } of event.records) {
await processRecord(value, topic, partition, offset);
}
}, schemaConfig);
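A note on the idempotency key above: a Kafka record is uniquely and permanently identified by its topic, partition, and offset, so combining the three gives a stable deduplication key across retries and redeliveries. The same idea as a plain function (`buildRecordKey` is a hypothetical helper, shown only to make the key construction concrete):

```ts
// Hypothetical helper mirroring the JMESPath expression above: the
// topic/partition/offset triple uniquely identifies a Kafka record,
// making it a stable deduplication key across retries and redeliveries.
const buildRecordKey = (topic: string, partition: number, offset: number): string =>
  `${topic}-${partition}-${offset}`;

// buildRecordKey('orders', 0, 42) === 'orders-0-42'
```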
44 changes: 44 additions & 0 deletions examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts
@@ -0,0 +1,44 @@
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import { Logger } from '@aws-lambda-powertools/logger';
import { type IUser, User } from './samples/user.es6.generated.js'; // protobuf generated class

const logger = new Logger({ serviceName: 'kafka-consumer' });

export const handler = kafkaConsumer<unknown, IUser>(
async (event, _context) => {
for (const {
value,
topic,
partition,
offset,
timestamp,
headers,
} of event.records) {
logger.info(`processing message from topic ${topic}`, {
partition,
offset,
timestamp,
});

if (headers) {
for (const header of headers) {
logger.debug(`Header: ${header.key}`, {
value: header.value,
});
}
}

// Process the deserialized value
logger.info('User data', {
userId: value.id,
userName: value.name,
});
}
},
{
value: {
type: SchemaType.PROTOBUF,
schema: User,
},
}
);
42 changes: 42 additions & 0 deletions examples/snippets/kafka/advancedWorkingWithValibot.ts
@@ -0,0 +1,42 @@
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import * as v from 'valibot';

const logger = new Logger({ serviceName: 'kafka-consumer' });

const OrderItemSchema = v.object({
productId: v.string(),
quantity: v.pipe(v.number(), v.integer(), v.minValue(1)),
price: v.pipe(v.number(), v.integer()),
});

const OrderSchema = v.object({
id: v.string(),
customerId: v.string(),
items: v.pipe(
v.array(OrderItemSchema),
v.minLength(1, 'Order must have at least one item')
),
createdAt: v.pipe(v.string(), v.isoDateTime()),
totalAmount: v.pipe(v.number(), v.minValue(0)),
});

const schemaConfig = {
value: {
type: SchemaType.JSON,
parserSchema: OrderSchema,
},
} satisfies SchemaConfig;

export const handler = kafkaConsumer<unknown, v.InferInput<typeof OrderSchema>>(
async (event, _context) => {
for (const {
value: { id, items },
} of event.records) {
logger.setCorrelationId(id);
logger.debug(`order includes ${items.length} items`);
}
},
schemaConfig
);
39 changes: 39 additions & 0 deletions examples/snippets/kafka/advancedWorkingWithZod.ts
@@ -0,0 +1,39 @@
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { z } from 'zod/v4';

const logger = new Logger({ serviceName: 'kafka-consumer' });

const OrderItemSchema = z.object({
productId: z.string(),
quantity: z.number().int().positive(),
price: z.number().positive(),
});

const OrderSchema = z.object({
id: z.string(),
customerId: z.string(),
items: z.array(OrderItemSchema).min(1, 'Order must have at least one item'),
createdAt: z.iso.datetime(),
totalAmount: z.number().positive(),
});

const schemaConfig = {
value: {
type: SchemaType.JSON,
parserSchema: OrderSchema,
},
} satisfies SchemaConfig;

export const handler = kafkaConsumer<unknown, z.infer<typeof OrderSchema>>(
async (event, _context) => {
for (const {
value: { id, items },
} of event.records) {
logger.setCorrelationId(id);
logger.debug(`order includes ${items.length} items`);
}
},
schemaConfig
);
11 changes: 11 additions & 0 deletions examples/snippets/kafka/diagrams/intro.mermaid
@@ -0,0 +1,11 @@
flowchart LR
KafkaTopic["Kafka Topic"] --> MSK["Amazon MSK"]
KafkaTopic --> MSKServerless["Amazon MSK Serverless"]
KafkaTopic --> SelfHosted["Self-hosted Kafka"]
MSK --> EventSourceMapping["Event Source Mapping"]
MSKServerless --> EventSourceMapping
SelfHosted --> EventSourceMapping
EventSourceMapping --> Lambda["Lambda Function"]
Lambda --> KafkaConsumer["Kafka Consumer Utility"]
KafkaConsumer --> Deserialization["Deserialization"]
Deserialization --> YourLogic["Your Business Logic"]
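For readers wiring up the flow in this diagram, the Event Source Mapping is ordinary Lambda configuration. A minimal CDK sketch, assuming CDK v2 and an existing MSK cluster (the ARN, topic, and function are placeholders):

```ts
import { StartingPosition } from 'aws-cdk-lib/aws-lambda';
import { ManagedKafkaEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

// `fn` is an existing lambda.Function defined elsewhere in the stack.
declare const fn: import('aws-cdk-lib/aws-lambda').Function;

fn.addEventSource(
  new ManagedKafkaEventSource({
    clusterArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/demo/...',
    topic: 'orders',
    startingPosition: StartingPosition.LATEST,
    batchSize: 100, // records per invocation, tune to your workload
  })
);
```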
@@ -0,0 +1,26 @@
sequenceDiagram
participant Kafka
participant ESM as Event Source Mapping
participant SchemaRegistry as Schema Registry
participant Lambda
participant KafkaConsumer
participant YourCode
Kafka->>+ESM: Send batch of records
ESM->>+SchemaRegistry: Validate and deserialize
SchemaRegistry->>SchemaRegistry: Deserialize records
SchemaRegistry-->>-ESM: Return deserialized data
ESM->>+Lambda: Invoke with pre-deserialized JSON records
Lambda->>+KafkaConsumer: Pass Kafka event
KafkaConsumer->>KafkaConsumer: Parse event structure
loop For each record
KafkaConsumer->>KafkaConsumer: Record is already deserialized
alt Output serializer provided
KafkaConsumer->>KafkaConsumer: Apply output serializer
end
end
KafkaConsumer->>+YourCode: Provide ConsumerRecords
YourCode->>YourCode: Process records
YourCode-->>-KafkaConsumer: Return result
KafkaConsumer-->>-Lambda: Pass result back
Lambda-->>-ESM: Return response
ESM-->>-Kafka: Acknowledge processed batch
@@ -0,0 +1,26 @@
sequenceDiagram
participant Kafka
participant ESM as Event Source Mapping
participant SchemaRegistry as Schema Registry
participant Lambda
participant KafkaConsumer
participant YourCode
Kafka->>+ESM: Send batch of records
ESM->>+SchemaRegistry: Validate schema
SchemaRegistry-->>-ESM: Confirm schema is valid
ESM->>+Lambda: Invoke with validated records (still encoded)
Lambda->>+KafkaConsumer: Pass Kafka event
KafkaConsumer->>KafkaConsumer: Parse event structure
loop For each record
KafkaConsumer->>KafkaConsumer: Decode base64 data
KafkaConsumer->>KafkaConsumer: Deserialize based on schema_type
alt Output serializer provided
KafkaConsumer->>KafkaConsumer: Apply output serializer
end
end
KafkaConsumer->>+YourCode: Provide ConsumerRecords
YourCode->>YourCode: Process records
YourCode-->>-KafkaConsumer: Return result
KafkaConsumer-->>-Lambda: Pass result back
Lambda-->>-ESM: Return response
ESM-->>-Kafka: Acknowledge processed batch
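The schema-type-driven deserialization in this flow applies to record keys as well as values. A sketch of a config that decodes both, assuming `SchemaConfig` accepts a `key` entry that mirrors `value` (verify against the published types):

```ts
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger({ serviceName: 'kafka-consumer' });

// Sketch only: assumes a `key` entry alongside `value`, so both halves
// of each record are deserialized before your code runs.
const schemaConfig = {
  key: {
    type: SchemaType.JSON,
  },
  value: {
    type: SchemaType.JSON,
  },
} satisfies SchemaConfig;

export const handler = kafkaConsumer(async (event, _context) => {
  for (const { key, value } of event.records) {
    logger.info('received record', { key, value });
  }
}, schemaConfig);
```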
@@ -0,0 +1,20 @@
sequenceDiagram
participant Kafka
participant Lambda
participant KafkaConsumer
participant YourCode
Kafka->>+Lambda: Invoke with batch of records (direct integration)
Lambda->>+KafkaConsumer: Pass raw Kafka event
KafkaConsumer->>KafkaConsumer: Parse event structure
loop For each record
KafkaConsumer->>KafkaConsumer: Decode base64 data
KafkaConsumer->>KafkaConsumer: Deserialize based on schema_type
alt Output serializer provided
KafkaConsumer->>KafkaConsumer: Apply output serializer
end
end
KafkaConsumer->>+YourCode: Provide ConsumerRecords
YourCode->>YourCode: Process records
YourCode-->>-KafkaConsumer: Return result
KafkaConsumer-->>-Lambda: Pass result back
Lambda-->>-Kafka: Acknowledge processed batch
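Without the utility, a handler for the direct-integration flow above has to do the base64 decode and deserialization itself. Roughly, for JSON payloads (a sketch using the `SelfManagedKafkaEvent` type from `@types/aws-lambda`):

```ts
import type { SelfManagedKafkaEvent } from 'aws-lambda';
import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger({ serviceName: 'kafka-consumer' });

// Roughly what the utility automates: base64-decode each record value,
// then deserialize it according to the configured schema type (JSON here).
export const handler = async (event: SelfManagedKafkaEvent) => {
  for (const records of Object.values(event.records)) {
    for (const record of records) {
      const value = JSON.parse(
        Buffer.from(record.value, 'base64').toString('utf8')
      );
      logger.info('received value', { value });
    }
  }
};
```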
19 changes: 19 additions & 0 deletions examples/snippets/kafka/gettingStartedAvro.ts
@@ -0,0 +1,19 @@
import { readFileSync } from 'node:fs';
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger({ serviceName: 'kafka-consumer' });

const schemaConfig = {
value: {
type: SchemaType.AVRO,
schema: readFileSync(new URL('./user.avsc', import.meta.url), 'utf8'),
},
} satisfies SchemaConfig;

export const handler = kafkaConsumer(async (event, _context) => {
for (const { value } of event.records) {
logger.info('received value', { value });
}
}, schemaConfig);
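The referenced `user.avsc` is not included in this diff. For illustration only, a schema of roughly this shape would fit the example (the record and field names are assumptions), and the same string can be passed inline instead of read from disk:

```ts
import { SchemaType } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';

// Assumed contents of user.avsc; the actual schema in the PR may differ.
const avroSchema = `{
  "type": "record",
  "name": "User",
  "namespace": "com.example",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "name", "type": "string" }
  ]
}`;

const schemaConfig = {
  value: {
    type: SchemaType.AVRO,
    schema: avroSchema,
  },
} satisfies SchemaConfig;
```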
17 changes: 17 additions & 0 deletions examples/snippets/kafka/gettingStartedJson.ts
@@ -0,0 +1,17 @@
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger({ serviceName: 'kafka-consumer' });

const schemaConfig = {
value: {
type: SchemaType.JSON,
},
} satisfies SchemaConfig;

export const handler = kafkaConsumer(async (event, _context) => {
for (const { value } of event.records) {
logger.info('received value', { value });
}
}, schemaConfig);
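To see the deserialization end to end without deploying, the handler can be fed a synthetic MSK-style event whose value is base64-encoded JSON. A local test sketch (the ARN, topic, and payload are made up, and the loose casts keep the sketch short):

```ts
import type { Context } from 'aws-lambda';
import { handler } from './gettingStartedJson.js';

// Minimal MSK-shaped event: one record on partition 0 of the `orders` topic,
// with a base64-encoded JSON value, as the Event Source Mapping delivers it.
const event = {
  eventSource: 'aws:kafka',
  eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/demo/...',
  bootstrapServers: 'b-1.example.kafka.us-east-1.amazonaws.com:9092',
  records: {
    'orders-0': [
      {
        topic: 'orders',
        partition: 0,
        offset: 15,
        timestamp: 1718000000000,
        timestampType: 'CREATE_TIME',
        key: null,
        value: Buffer.from(JSON.stringify({ orderId: '123' })).toString('base64'),
        headers: [],
      },
    ],
  },
};

// Cast for brevity; a real test would construct a fully typed event.
await handler(event as never, {} as Context);
```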