From 8aa854cc7433ae54419914459a73753c34a06fe0 Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Thu, 19 Jun 2025 20:51:50 +0200 Subject: [PATCH] docs(kafka): add preview docs page --- docs/features/index.md | 8 + docs/features/kafka.md | 345 ++++++++++++++++++ docs/index.md | 1 + .../kafka/advancedWorkingWithArkType.ts | 39 ++ .../kafka/advancedWorkingWithIdempotency.ts | 53 +++ .../advancedWorkingWithRecordMetadata.ts | 44 +++ .../kafka/advancedWorkingWithValibot.ts | 42 +++ .../snippets/kafka/advancedWorkingWithZod.ts | 39 ++ .../snippets/kafka/diagrams/intro.mermaid | 11 + .../usingESMWithJsonSchemaRegistry.mermaid | 26 ++ .../usingESMWithSchemaRegistry.mermaid | 26 ++ .../usingESMWithoutSchemaRegistry.mermaid | 20 + examples/snippets/kafka/gettingStartedAvro.ts | 19 + examples/snippets/kafka/gettingStartedJson.ts | 17 + .../snippets/kafka/gettingStartedKeyValue.ts | 70 ++++ .../kafka/gettingStartedPrimitiveValues.ts | 23 ++ .../snippets/kafka/gettingStartedProtobuf.ts | 19 + .../snippets/kafka/gettingStartedValueOnly.ts | 0 .../kafka/samples/user.es6.generated.d.ts | 120 ++++++ .../kafka/samples/user.es6.generated.js | 262 +++++++++++++ .../templates/gettingStartedWithMsk.yaml | 20 + examples/snippets/package.json | 4 + lerna.json | 1 + mkdocs.yml | 4 + package-lock.json | 63 +++- packages/kafka/README.md | 0 packages/kafka/package.json | 13 +- packages/kafka/typedoc.json | 11 + 28 files changed, 1293 insertions(+), 7 deletions(-) create mode 100644 docs/features/kafka.md create mode 100644 examples/snippets/kafka/advancedWorkingWithArkType.ts create mode 100644 examples/snippets/kafka/advancedWorkingWithIdempotency.ts create mode 100644 examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts create mode 100644 examples/snippets/kafka/advancedWorkingWithValibot.ts create mode 100644 examples/snippets/kafka/advancedWorkingWithZod.ts create mode 100644 examples/snippets/kafka/diagrams/intro.mermaid create mode 100644 examples/snippets/kafka/diagrams/usingESMWithJsonSchemaRegistry.mermaid create mode 100644 examples/snippets/kafka/diagrams/usingESMWithSchemaRegistry.mermaid create mode 100644 examples/snippets/kafka/diagrams/usingESMWithoutSchemaRegistry.mermaid create mode 100644 examples/snippets/kafka/gettingStartedAvro.ts create mode 100644 examples/snippets/kafka/gettingStartedJson.ts create mode 100644 examples/snippets/kafka/gettingStartedKeyValue.ts create mode 100644 examples/snippets/kafka/gettingStartedPrimitiveValues.ts create mode 100644 examples/snippets/kafka/gettingStartedProtobuf.ts create mode 100644 examples/snippets/kafka/gettingStartedValueOnly.ts create mode 100644 examples/snippets/kafka/samples/user.es6.generated.d.ts create mode 100644 examples/snippets/kafka/samples/user.es6.generated.js create mode 100644 examples/snippets/kafka/templates/gettingStartedWithMsk.yaml create mode 100644 packages/kafka/README.md create mode 100644 packages/kafka/typedoc.json diff --git a/docs/features/index.md b/docs/features/index.md index f06ae4701d..43653c06dc 100644 --- a/docs/features/index.md +++ b/docs/features/index.md @@ -87,4 +87,12 @@ description: Features of Powertools for AWS Lambda [:octicons-arrow-right-24: Read more](./validation.md) +- __Kafka__ + + --- + + Utility to easily handle message deserialization and parsing of Kafka events in AWS Lambda functions. 
+ + [:octicons-arrow-right-24: Read more](./kafka.md) + diff --git a/docs/features/kafka.md b/docs/features/kafka.md new file mode 100644 index 0000000000..0ccd15078b --- /dev/null +++ b/docs/features/kafka.md @@ -0,0 +1,345 @@ +--- +title: Kafka Consumer +description: Utility to easily handle message deserialization and parsing of Kafka events in AWS Lambda functions +status: new +--- + +???+ info "Work in progress" + This documentation page is a work in progress for an upcoming feature in Powertools for AWS Lambda. If you're seeing this page, it means the release process is underway, but the feature is not yet available on npm. Please check back soon for the final version. + +The Kafka Consumer utility transparently handles message deserialization, provides an intuitive developer experience, and integrates seamlessly with the rest of the Powertools for AWS Lambda ecosystem. + +```mermaid +--8<-- "examples/snippets/kafka/diagrams/intro.mermaid" +``` + +## Key features + +* Automatic deserialization of Kafka messages (JSON, Avro, and Protocol Buffers) +* Simplified event record handling with an intuitive interface +* Support for key and value deserialization +* Support for [Standard Schema](https://github.com/standard-schema/standard-schema) output parsing (e.g., Zod, Valibot, ArkType) +* Support for Event Source Mapping (ESM) with and without Schema Registry integration +* Out-of-the-box error handling for deserialization issues + +## Terminology + +**Event Source Mapping (ESM)** A Lambda feature that reads from streaming sources (like Kafka) and invokes your Lambda function. It manages polling, batching, and error handling automatically, eliminating the need for consumer management code. + +**Record Key and Value** A Kafka message contains two important parts: an optional key that determines the partition and a value containing the actual message data. Both are base64-encoded in Lambda events and can be independently deserialized. + +**Deserialization** The process of converting binary data (base64-encoded in Lambda events) into usable JavaScript objects according to a specific format like JSON, Avro, or Protocol Buffers. Powertools handles this conversion automatically. + +**SchemaConfig** Contains parameters that tell Powertools how to interpret message data, including the format type (JSON, Avro, Protocol Buffers) and optional schema definitions needed for binary formats. + +**Output parsing** A [Standard Schema](https://github.com/standard-schema/standard-schema) used to parse your data at runtime, allowing you to define how the deserialized data should be structured and validated. + +**Schema Registry** A centralized service that stores and validates schemas, ensuring producers and consumers maintain compatibility when message formats evolve over time. + +## Moving from traditional Kafka consumers + +Lambda processes Kafka messages as discrete events rather than continuous streams, requiring a different approach to consumer development that Powertools for AWS helps standardize.
| Aspect | Traditional Kafka Consumers | Lambda Kafka Consumer | +|---|---|---| +| **Model** | Pull-based (you poll for messages) | Push-based (Lambda invoked with messages) | +| **Scaling** | Manual scaling configuration | Automatic scaling to partition count | +| **State** | Long-running application with state | Stateless, ephemeral executions | +| **Offsets** | Manual offset management | Automatic offset commitment | +| **Schema Validation** | Client-side schema validation | Optional Schema Registry integration with Event Source Mapping | +| **Error Handling** | Per-message retry control | Batch-level retry policies | + +## Getting started + +### Installation + +Depending on the schema types you want to use, install the utility along with the corresponding serialization library: + +=== "JSON" + ```bash + npm install @aws-lambda-powertools/kafka + ``` + +=== "Avro" + ```bash + npm install @aws-lambda-powertools/kafka avro-js + ``` + +=== "Protobuf" + ```bash + npm install @aws-lambda-powertools/kafka protobufjs + ``` + +Additionally, if you want to use output parsing with [Standard Schema](https://github.com/standard-schema/standard-schema), you can install [any of the supported libraries](https://standardschema.dev/#what-schema-libraries-implement-the-spec), for example: Zod, Valibot, or ArkType. + +### Required resources + +To use the Kafka consumer utility, you need an AWS Lambda function configured with a Kafka event source. This can be Amazon MSK, MSK Serverless, or a self-hosted Kafka cluster. + +=== "gettingStartedWithMsk.yaml" + + ```yaml + --8<-- "examples/snippets/kafka/templates/gettingStartedWithMsk.yaml" + ``` + +### Using ESM with Schema Registry + +The Event Source Mapping configuration determines which mode is used. With `JSON` mode, Lambda converts all messages to JSON before invoking your function. With `SOURCE` mode, Lambda preserves the original format, requiring your function to handle the appropriate deserialization. + +Powertools for AWS supports both Schema Registry integration modes in your Event Source Mapping configuration. + +### Processing Kafka events + +The Kafka consumer utility transforms raw Kafka events into an intuitive format for processing. To handle messages effectively, you'll need to configure a schema that matches your data format. + +???+ tip "Using Avro is recommended" + We recommend Avro for production Kafka implementations due to its schema evolution capabilities, compact binary format, and integration with Schema Registry. This offers better type safety and forward/backward compatibility compared to JSON. + +=== "Avro Messages" + + ```typescript + --8<-- "examples/snippets/kafka/gettingStartedAvro.ts" + ``` + +=== "Protocol Buffers" + + ```typescript + --8<-- "examples/snippets/kafka/gettingStartedProtobuf.ts" + ``` + +=== "JSON Messages" + + ```typescript + --8<-- "examples/snippets/kafka/gettingStartedJson.ts" + ``` + +### Deserializing keys and values + +The `kafkaConsumer` function can deserialize both keys and values independently based on your schema configuration. This flexibility allows you to work with different data formats in the same message.
+ +=== "index.ts" + + ```typescript + --8<-- "examples/snippets/kafka/gettingStartedKeyValue.ts:func" + ``` + +=== "types.ts" + + ```typescript + --8<-- "examples/snippets/kafka/gettingStartedKeyValue.ts:types" + ``` + +=== "ProductKey.avsc" + + ```json + --8<-- "examples/snippets/kafka/gettingStartedKeyValue.ts:2:8" + ``` + +=== "ProductInfo.avsc" + + ```json + --8<-- "examples/snippets/kafka/gettingStartedKeyValue.ts:12:20" + ``` + +You can configure the `kafkaConsumer` to handle only the value. This allows you to optimize your Lambda function for the specific data structure of your Kafka messages. + +### Handling primitive types + +When working with primitive data types (strings, integers, etc.) rather than structured objects, you can simplify your configuration by omitting the schema specification for that component. Powertools for AWS will deserialize the value always as a string. + +???+ tip "Common pattern: Keys with primitive values" + Using primitive types (strings, integers) as Kafka message keys is a common pattern for partitioning and identifying messages. The Kafka consumer automatically handles these primitive keys without requiring special configuration, making it easy to implement this popular design pattern. + +=== "Primitive key" + + ```typescript + --8<-- "examples/snippets/kafka/gettingStartedPrimitiveValues.ts" + ``` + +### Message format support and comparison + +The Kafka consumer utility supports multiple serialization formats to match your existing Kafka implementation. Choose the format that best suits your needs based on performance, schema evolution requirements, and ecosystem compatibility. + +???+ tip "Selecting the right format" + For new applications, consider Avro or Protocol Buffers over JSON. Both provide schema validation, evolution support, and significantly better performance with smaller message sizes. Avro is particularly well-suited for Kafka due to its built-in schema evolution capabilities. + +=== "Supported Formats" + + | Format | Schema Type | Description | Required Parameters | + |----------------------|--------------|-----------------------------------|--------------------------------------| + | **JSON** | `"JSON"` | Human-readable text format | None | + | **Avro** | `"AVRO"` | Compact binary format with schema | `value.schema` (Avro schema string) | + | **Protocol Buffers** | `"PROTOBUF"` | Efficient binary format | `value.schema` (Proto message class) | + +=== "Format Comparison" + + | Feature | JSON | Avro | Protocol Buffers | + |-------------------------------|----------|----------------------|-------------------------| + | **Schema Definition** | Optional | Required JSON schema | Required Protobuf class | + | **Schema Evolution** | None | Strong support | Strong support | + | **Size Efficiency** | Low | High | Highest | + | **Processing Speed** | Slower | Fast | Fastest | + | **Human Readability** | High | Low | Low | + | **Implementation Complexity** | Low | Medium | Medium | + | **Additional Dependencies** | None | `avro-js` module | `protobufjs` module | + +Choose the serialization format that best fits your needs: + +* **JSON**: Best for simplicity and when schema flexibility is important +* **Avro**: Best for systems with evolving schemas and when compatibility is critical +* **Protocol Buffers**: Best for performance-critical systems with structured data + +## Advanced + +### Accessing record metadata + +Each Kafka record contains important metadata that you can access alongside the deserialized message content. 
This metadata helps with message processing, troubleshooting, and implementing advanced patterns like exactly-once processing. + +=== "Working with Record Metadata" + + ```typescript + --8<-- "examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts" + ``` + +For debugging purposes, you can also access the original key, value, and headers in their base64-encoded form; these are available in the `originalValue`, `originalKey`, and `originalHeaders` properties of the `record`. + +#### Available metadata properties + +| Property | Description | Example Use Case | +|---|---|---| +| `topic` | Topic name the record was published to | Routing logic in multi-topic consumers | +| `partition` | Kafka partition number | Tracking message distribution | +| `offset` | Position in the partition | De-duplication, exactly-once processing | +| `timestamp` | Unix timestamp when record was created | Event timing analysis | +| `timestampType` | Timestamp type (`CREATE_TIME` or `LOG_APPEND_TIME`) | Data lineage verification | +| `headers` | Key-value pairs attached to the message | Cross-cutting concerns like correlation IDs | +| `key` | Deserialized message key | Customer ID or entity identifier | +| `value` | Deserialized message content | The actual business data | +| `originalValue` | Base64-encoded original message value | Debugging or custom deserialization | +| `originalKey` | Base64-encoded original message key | Debugging or custom deserialization | +| `originalHeaders` | Base64-encoded original message headers | Debugging or custom deserialization | + +### Custom output serializers + +You can parse deserialized data using your preferred parsing library. This can help you integrate Kafka data with your domain schemas and application architecture, providing type safety, runtime parsing and validation, and advanced data transformations. + +=== "Zod" + + ```typescript + --8<-- "examples/snippets/kafka/advancedWorkingWithZod.ts" + ``` + +=== "Valibot" + + ```typescript + --8<-- "examples/snippets/kafka/advancedWorkingWithValibot.ts" + ``` + +=== "ArkType" + + ```typescript + --8<-- "examples/snippets/kafka/advancedWorkingWithArkType.ts" + ``` + +#### Exception types + +| Exception | Description | Common Causes | +|---|---|---| +| `KafkaConsumerDeserializationError` | Thrown when message deserialization fails | Corrupted message data, schema mismatch, or wrong schema type configuration | +| `KafkaConsumerAvroSchemaParserError` | Thrown when parsing Avro schema definition fails | Syntax errors in schema JSON, invalid field types, or malformed schema | +| `KafkaConsumerMissingSchemaError` | Thrown when a required schema is not provided | Missing schema for AVRO or PROTOBUF formats (required parameter) | +| `KafkaConsumerOutputSerializerError` | Thrown when output parsing fails | Error in the output parser, incompatible data, or validation failures in the configured parser schema | + +### Integrating with Idempotency + +When processing Kafka messages in Lambda, failed batches can result in message reprocessing. The idempotency utility prevents duplicate processing by tracking which messages have already been handled, ensuring each message is processed exactly once.
The Idempotency utility automatically stores the result of each successful operation, returning the cached result if the same message is processed again, which prevents potentially harmful duplicate operations like double-charging customers or double-counting metrics. + +=== "Idempotent Kafka Processing" + + ```typescript + --8<-- "examples/snippets/kafka/advancedWorkingWithIdempotency.ts" + ``` + +???+ tip "Exactly-once processing" + By using the Kafka record's unique coordinates (topic, partition, offset) as the idempotency key, you ensure that even if a batch fails and Lambda retries the messages, each message will be processed exactly once. + +### Troubleshooting + +#### Deserialization failures + +When encountering deserialization errors with your Kafka messages, follow this systematic troubleshooting approach to identify and resolve the root cause. + +First, check that your schema definition exactly matches the message format. Even minor discrepancies can cause deserialization failures, especially with binary formats like Avro and Protocol Buffers. + +For binary messages that fail to deserialize, examine the raw encoded data: + +```typescript +// DO NOT include this code in production handlers +// For troubleshooting purposes only +const rawBytes = Buffer.from(record.originalValue, 'base64'); +console.log(`Message size: ${rawBytes.length} bytes`); +console.log(`First 50 bytes (hex): ${rawBytes.subarray(0, 50).toString('hex')}`); +``` + +#### Schema compatibility issues + +Schema compatibility issues often manifest as successful connections but failed deserialization. Common causes include: + +* **Schema evolution without backward compatibility**: New producer schema is incompatible with consumer schema +* **Field type mismatches**: For example, a field changed from string to integer across systems +* **Missing required fields**: Fields required by the consumer schema but absent in the message +* **Default value discrepancies**: Different handling of default values between languages + +When using Schema Registry, verify schema compatibility rules are properly configured for your topics and that all applications use the same registry. + +#### Memory and timeout optimization + +Lambda functions processing Kafka messages may encounter resource constraints, particularly with large batches or complex processing logic. + +For memory errors: + +* Increase Lambda memory allocation, which also provides more CPU resources +* Process fewer records per batch by adjusting the `BatchSize` parameter in your event source mapping +* Consider optimizing your message format to reduce memory footprint + +For timeout issues: + +* Extend your Lambda function timeout setting to accommodate processing time +* Implement chunked or asynchronous processing patterns for time-consuming operations +* Monitor and optimize database operations, external API calls, or other I/O operations in your handler + +???+ tip "Monitoring memory usage" + Use CloudWatch metrics to track your function's memory utilization. If it consistently exceeds 80% of allocated memory, consider increasing the memory allocation or optimizing your code. + +## Kafka consumer workflow + +### Using ESM with Schema Registry validation (SOURCE) +
+```mermaid +--8<-- "examples/snippets/kafka/diagrams/usingESMWithSchemaRegistry.mermaid" +``` +
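+With `SOURCE` mode, records reach your function still encoded, so the handler is configured exactly like the Getting started examples: you provide a `SchemaConfig` and the utility performs the deserialization inside your function. Below is a minimal sketch mirroring the Avro example above; the `user.avsc` path is illustrative.
+
+```typescript
+import { readFileSync } from 'node:fs';
+import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
+import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
+import { Logger } from '@aws-lambda-powertools/logger';
+
+const logger = new Logger({ serviceName: 'kafka-consumer' });
+
+// Same schema configuration as in Getting started: with SOURCE mode the ESM
+// only validates records against the Schema Registry, so deserialization
+// still happens here.
+const schemaConfig = {
+  value: {
+    type: SchemaType.AVRO,
+    schema: readFileSync(new URL('./user.avsc', import.meta.url), 'utf8'),
+  },
+} satisfies SchemaConfig;
+
+export const handler = kafkaConsumer(async (event, _context) => {
+  for (const { value } of event.records) {
+    // value is already deserialized according to schemaConfig
+    logger.info('received value', { value });
+  }
+}, schemaConfig);
+```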
+ +### Using ESM with Schema Registry deserialization (JSON) + +
+```mermaid +--8<-- "examples/snippets/kafka/diagrams/usingESMWithJsonSchemaRegistry.mermaid" +``` +
+ +### Using ESM without Schema Registry integration + +
+```mermaid +--8<-- "examples/snippets/kafka/diagrams/usingESMWithoutSchemaRegistry.mermaid" +``` +
+ +## Testing your code + +TBD diff --git a/docs/index.md b/docs/index.md index d633659c76..1595509d5d 100644 --- a/docs/index.md +++ b/docs/index.md @@ -54,6 +54,7 @@ Powertools for AWS Lambda (TypeScript) is built as a modular toolkit, so you can | [JMESPath Functions](./features/jmespath.md) | Built-in JMESPath functions to easily deserialize common encoded JSON payloads in Lambda functions. | | [Parser](./features/parser.md) | Utility to parse and validate AWS Lambda event payloads using Zod, a TypeScript-first schema declaration and validation library. | | [Validation](./features/validation.md) | JSON Schema validation for events and responses, including JMESPath support to unwrap events before validation. | +| [Kafka](./features/kafka.md) | Utility to easily handle message deserialization and parsing of Kafka events in AWS Lambda functions. | ## Examples diff --git a/examples/snippets/kafka/advancedWorkingWithArkType.ts b/examples/snippets/kafka/advancedWorkingWithArkType.ts new file mode 100644 index 0000000000..3e7cc5b37e --- /dev/null +++ b/examples/snippets/kafka/advancedWorkingWithArkType.ts @@ -0,0 +1,39 @@ +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import { Logger } from '@aws-lambda-powertools/logger'; +import { type } from 'arktype'; + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +const OrderItemSchema = type({ + productId: 'string', + quantity: 'number.integer >= 1', + price: 'number.integer', +}); + +const OrderSchema = type({ + id: 'string', + customerId: 'string', + items: OrderItemSchema.array().moreThanLength(0), + createdAt: 'string.date', + totalAmount: 'number.integer >= 0', +}); + +const schemaConfig = { + value: { + type: SchemaType.JSON, + parserSchema: OrderSchema, + }, +} satisfies SchemaConfig; + +export const handler = kafkaConsumer( + async (event, _context) => { + for (const { + value: { id, items }, + } of event.records) { + logger.setCorrelationId(id); + logger.debug(`order includes ${items.length} items`); + } + }, + schemaConfig +); diff --git a/examples/snippets/kafka/advancedWorkingWithIdempotency.ts b/examples/snippets/kafka/advancedWorkingWithIdempotency.ts new file mode 100644 index 0000000000..1c4a58c132 --- /dev/null +++ b/examples/snippets/kafka/advancedWorkingWithIdempotency.ts @@ -0,0 +1,53 @@ +import { + IdempotencyConfig, + makeIdempotent, +} from '@aws-lambda-powertools/idempotency'; +import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dynamodb'; +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import { Logger } from '@aws-lambda-powertools/logger'; +import { User } from './samples/user.es6.generated.js'; // protobuf generated class + +const logger = new Logger({ serviceName: 'kafka-consumer' }); +const persistenceStore = new DynamoDBPersistenceLayer({ + tableName: 'IdempotencyTable', +}); + +const schemaConfig = { + value: { + type: SchemaType.PROTOBUF, + schema: User, + }, +} satisfies SchemaConfig; + +const processRecord = makeIdempotent( + async (user, topic, partition, offset) => { + logger.info('processing user', { + userId: user.id, + meta: { + topic, + partition, + offset, + }, + }); + + // ...your business logic here + + return { + success: true, + userId: user.id, + }; + }, + { + persistenceStore, + config: new IdempotencyConfig({ + eventKeyJmesPath: `topic & '-' & partition & '-' & 
offset`, + }), + } +); + +export const handler = kafkaConsumer(async (event, _context) => { + for (const { value, topic, partition, offset } of event.records) { + await processRecord(value, topic, partition, offset); + } +}, schemaConfig); diff --git a/examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts b/examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts new file mode 100644 index 0000000000..23fdc79ad7 --- /dev/null +++ b/examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts @@ -0,0 +1,44 @@ +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import { Logger } from '@aws-lambda-powertools/logger'; +import { type IUser, User } from './samples/user.es6.generated.js'; // protobuf generated class + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +export const handler = kafkaConsumer( + async (event, _context) => { + for (const { + value, + topic, + partition, + offset, + timestamp, + headers, + } of event.records) { + logger.info(`processing message from topic ${topic}`, { + partition, + offset, + timestamp, + }); + + if (headers) { + for (const header of headers) { + logger.debug(`Header: ${header.key}`, { + value: header.value, + }); + } + } + + // Process the deserialized value + logger.info('User data', { + userId: value.id, + userName: value.name, + }); + } + }, + { + value: { + type: SchemaType.PROTOBUF, + schema: User, + }, + } +); diff --git a/examples/snippets/kafka/advancedWorkingWithValibot.ts b/examples/snippets/kafka/advancedWorkingWithValibot.ts new file mode 100644 index 0000000000..ff33c818b4 --- /dev/null +++ b/examples/snippets/kafka/advancedWorkingWithValibot.ts @@ -0,0 +1,42 @@ +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import { Logger } from '@aws-lambda-powertools/logger'; +import * as v from 'valibot'; + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +const OrderItemSchema = v.object({ + productId: v.string(), + quantity: v.pipe(v.number(), v.integer(), v.toMinValue(1)), + price: v.pipe(v.number(), v.integer()), +}); + +const OrderSchema = v.object({ + id: v.string(), + customerId: v.string(), + items: v.pipe( + v.array(OrderItemSchema), + v.minLength(1, 'Order must have at least one item') + ), + createdAt: v.pipe(v.string(), v.isoDateTime()), + totalAmount: v.pipe(v.number(), v.toMinValue(0)), +}); + +const schemaConfig = { + value: { + type: SchemaType.JSON, + parserSchema: OrderSchema, + }, +} satisfies SchemaConfig; + +export const handler = kafkaConsumer>( + async (event, _context) => { + for (const { + value: { id, items }, + } of event.records) { + logger.setCorrelationId(id); + logger.debug(`order includes ${items.length} items`); + } + }, + schemaConfig +); diff --git a/examples/snippets/kafka/advancedWorkingWithZod.ts b/examples/snippets/kafka/advancedWorkingWithZod.ts new file mode 100644 index 0000000000..3ebe835484 --- /dev/null +++ b/examples/snippets/kafka/advancedWorkingWithZod.ts @@ -0,0 +1,39 @@ +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import { Logger } from '@aws-lambda-powertools/logger'; +import { z } from 'zod/v4'; + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +const OrderItemSchema = z.object({ + productId: z.string(), + quantity: z.number().int().positive(), + price: z.number().positive(), +}); + +const OrderSchema = z.object({ + 
id: z.string(), + customerId: z.string(), + items: z.array(OrderItemSchema).min(1, 'Order must have at least one item'), + createdAt: z.iso.datetime(), + totalAmount: z.number().positive(), +}); + +const schemaConfig = { + value: { + type: SchemaType.JSON, + parserSchema: OrderSchema, + }, +} satisfies SchemaConfig; + +export const handler = kafkaConsumer>( + async (event, _context) => { + for (const { + value: { id, items }, + } of event.records) { + logger.setCorrelationId(id); + logger.debug(`order includes ${items.length} items`); + } + }, + schemaConfig +); diff --git a/examples/snippets/kafka/diagrams/intro.mermaid b/examples/snippets/kafka/diagrams/intro.mermaid new file mode 100644 index 0000000000..a86d656efd --- /dev/null +++ b/examples/snippets/kafka/diagrams/intro.mermaid @@ -0,0 +1,11 @@ +flowchart LR + KafkaTopic["Kafka Topic"] --> MSK["Amazon MSK"] + KafkaTopic --> MSKServerless["Amazon MSK Serverless"] + KafkaTopic --> SelfHosted["Self-hosted Kafka"] + MSK --> EventSourceMapping["Event Source Mapping"] + MSKServerless --> EventSourceMapping + SelfHosted --> EventSourceMapping + EventSourceMapping --> Lambda["Lambda Function"] + Lambda --> KafkaConsumer["Kafka Consumer Utility"] + KafkaConsumer --> Deserialization["Deserialization"] + Deserialization --> YourLogic["Your Business Logic"] \ No newline at end of file diff --git a/examples/snippets/kafka/diagrams/usingESMWithJsonSchemaRegistry.mermaid b/examples/snippets/kafka/diagrams/usingESMWithJsonSchemaRegistry.mermaid new file mode 100644 index 0000000000..6b1e1f91c5 --- /dev/null +++ b/examples/snippets/kafka/diagrams/usingESMWithJsonSchemaRegistry.mermaid @@ -0,0 +1,26 @@ +sequenceDiagram + participant Kafka + participant ESM as Event Source Mapping + participant SchemaRegistry as Schema Registry + participant Lambda + participant KafkaConsumer + participant YourCode + Kafka->>+ESM: Send batch of records + ESM->>+SchemaRegistry: Validate and deserialize + SchemaRegistry->>SchemaRegistry: Deserialize records + SchemaRegistry-->>-ESM: Return deserialized data + ESM->>+Lambda: Invoke with pre-deserialized JSON records + Lambda->>+KafkaConsumer: Pass Kafka event + KafkaConsumer->>KafkaConsumer: Parse event structure + loop For each record + KafkaConsumer->>KafkaConsumer: Record is already deserialized + alt Output serializer provided + KafkaConsumer->>KafkaConsumer: Apply output serializer + end + end + KafkaConsumer->>+YourCode: Provide ConsumerRecords + YourCode->>YourCode: Process records + YourCode-->>-KafkaConsumer: Return result + KafkaConsumer-->>-Lambda: Pass result back + Lambda-->>-ESM: Return response + ESM-->>-Kafka: Acknowledge processed batch \ No newline at end of file diff --git a/examples/snippets/kafka/diagrams/usingESMWithSchemaRegistry.mermaid b/examples/snippets/kafka/diagrams/usingESMWithSchemaRegistry.mermaid new file mode 100644 index 0000000000..5a29d771c8 --- /dev/null +++ b/examples/snippets/kafka/diagrams/usingESMWithSchemaRegistry.mermaid @@ -0,0 +1,26 @@ +sequenceDiagram + participant Kafka + participant ESM as Event Source Mapping + participant SchemaRegistry as Schema Registry + participant Lambda + participant KafkaConsumer + participant YourCode + Kafka->>+ESM: Send batch of records + ESM->>+SchemaRegistry: Validate schema + SchemaRegistry-->>-ESM: Confirm schema is valid + ESM->>+Lambda: Invoke with validated records (still encoded) + Lambda->>+KafkaConsumer: Pass Kafka event + KafkaConsumer->>KafkaConsumer: Parse event structure + loop For each record + KafkaConsumer->>KafkaConsumer: 
Decode base64 data + KafkaConsumer->>KafkaConsumer: Deserialize based on schema_type + alt Output serializer provided + KafkaConsumer->>KafkaConsumer: Apply output serializer + end + end + KafkaConsumer->>+YourCode: Provide ConsumerRecords + YourCode->>YourCode: Process records + YourCode-->>-KafkaConsumer: Return result + KafkaConsumer-->>-Lambda: Pass result back + Lambda-->>-ESM: Return response + ESM-->>-Kafka: Acknowledge processed batch \ No newline at end of file diff --git a/examples/snippets/kafka/diagrams/usingESMWithoutSchemaRegistry.mermaid b/examples/snippets/kafka/diagrams/usingESMWithoutSchemaRegistry.mermaid new file mode 100644 index 0000000000..f68ee58074 --- /dev/null +++ b/examples/snippets/kafka/diagrams/usingESMWithoutSchemaRegistry.mermaid @@ -0,0 +1,20 @@ +sequenceDiagram + participant Kafka + participant Lambda + participant KafkaConsumer + participant YourCode + Kafka->>+Lambda: Invoke with batch of records (direct integration) + Lambda->>+KafkaConsumer: Pass raw Kafka event + KafkaConsumer->>KafkaConsumer: Parse event structure + loop For each record + KafkaConsumer->>KafkaConsumer: Decode base64 data + KafkaConsumer->>KafkaConsumer: Deserialize based on schema_type + alt Output serializer provided + KafkaConsumer->>KafkaConsumer: Apply output serializer + end + end + KafkaConsumer->>+YourCode: Provide ConsumerRecords + YourCode->>YourCode: Process records + YourCode-->>-KafkaConsumer: Return result + KafkaConsumer-->>-Lambda: Pass result back + Lambda-->>-Kafka: Acknowledge processed batch \ No newline at end of file diff --git a/examples/snippets/kafka/gettingStartedAvro.ts b/examples/snippets/kafka/gettingStartedAvro.ts new file mode 100644 index 0000000000..2b2ce2066e --- /dev/null +++ b/examples/snippets/kafka/gettingStartedAvro.ts @@ -0,0 +1,19 @@ +import { readFileSync } from 'node:fs'; +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import { Logger } from '@aws-lambda-powertools/logger'; + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +const schemaConfig = { + value: { + type: SchemaType.AVRO, + schema: readFileSync(new URL('./user.avsc', import.meta.url), 'utf8'), + }, +} satisfies SchemaConfig; + +export const handler = kafkaConsumer(async (event, _context) => { + for (const { value } of event.records) { + logger.info('received value', { value }); + } +}, schemaConfig); diff --git a/examples/snippets/kafka/gettingStartedJson.ts b/examples/snippets/kafka/gettingStartedJson.ts new file mode 100644 index 0000000000..7030fb7512 --- /dev/null +++ b/examples/snippets/kafka/gettingStartedJson.ts @@ -0,0 +1,17 @@ +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import { Logger } from '@aws-lambda-powertools/logger'; + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +const schemaConfig = { + value: { + type: SchemaType.JSON, + }, +} satisfies SchemaConfig; + +export const handler = kafkaConsumer(async (event, _context) => { + for (const { value } of event.records) { + logger.info('received value', { value }); + } +}, schemaConfig); diff --git a/examples/snippets/kafka/gettingStartedKeyValue.ts b/examples/snippets/kafka/gettingStartedKeyValue.ts new file mode 100644 index 0000000000..3ba56caa78 --- /dev/null +++ b/examples/snippets/kafka/gettingStartedKeyValue.ts @@ -0,0 +1,70 @@ +const keySchema = ` +{ + "type": "record", + 
"name": "ProductKey", + "fields": [ + {"name": "product_id", "type": "string"} + ] +} +`; + +const valueSchema = ` +{ + "type": "record", + "name": "ProductInfo", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "price", "type": "double"}, + {"name": "in_stock", "type": "boolean"} + ] +} +`; + +// --8<-- [start:types] + +type ProductKey = { + productId: string; +}; + +type ProductInfo = { + name: string; + price: number; + inStock: boolean; +}; + +// --8<-- [end:types] + +// --8<-- [start:func] + +import { readFileSync } from 'node:fs'; +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import { Logger } from '@aws-lambda-powertools/logger'; + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +const schemaConfig = { + key: { + type: SchemaType.AVRO, + schema: readFileSync(new URL('./ProductKey.avsc', import.meta.url), 'utf8'), + }, + value: { + type: SchemaType.AVRO, + schema: readFileSync( + new URL('./productInfo.avsc', import.meta.url), + 'utf8' + ), + }, +} satisfies SchemaConfig; + +export const handler = kafkaConsumer( + async (event, _context) => { + for (const { key, value } of event.records) { + logger.info('processing product ID', { productId: key.productId }); + logger.info('product', { name: value.name, price: value.price }); + } + }, + schemaConfig +); + +// --8<-- [end:func] diff --git a/examples/snippets/kafka/gettingStartedPrimitiveValues.ts b/examples/snippets/kafka/gettingStartedPrimitiveValues.ts new file mode 100644 index 0000000000..dcf30e939d --- /dev/null +++ b/examples/snippets/kafka/gettingStartedPrimitiveValues.ts @@ -0,0 +1,23 @@ +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import { Logger } from '@aws-lambda-powertools/logger'; + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +export const handler = kafkaConsumer( + async (event, _context) => { + for (const record of event.records) { + // Key is automatically decoded as UTF-8 string + const { key } = record; + // Value is parsed as JSON object + const { value } = record; + + logger.info('received value', { + key, + product: { + id: value.id, + name: value.name, + }, + }); + } + } +); diff --git a/examples/snippets/kafka/gettingStartedProtobuf.ts b/examples/snippets/kafka/gettingStartedProtobuf.ts new file mode 100644 index 0000000000..34c9368f28 --- /dev/null +++ b/examples/snippets/kafka/gettingStartedProtobuf.ts @@ -0,0 +1,19 @@ +import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka'; +import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types'; +import { Logger } from '@aws-lambda-powertools/logger'; +import { User } from './samples/user.es6.generated.js'; // protobuf generated class + +const logger = new Logger({ serviceName: 'kafka-consumer' }); + +const schemaConfig = { + value: { + type: SchemaType.PROTOBUF, + schema: User, + }, +} satisfies SchemaConfig; + +export const handler = kafkaConsumer(async (event, _context) => { + for (const { value } of event.records) { + logger.info('received value', { value }); + } +}, schemaConfig); diff --git a/examples/snippets/kafka/gettingStartedValueOnly.ts b/examples/snippets/kafka/gettingStartedValueOnly.ts new file mode 100644 index 0000000000..e69de29bb2 diff --git a/examples/snippets/kafka/samples/user.es6.generated.d.ts b/examples/snippets/kafka/samples/user.es6.generated.d.ts new file mode 100644 index 0000000000..277a592edc --- /dev/null +++ 
b/examples/snippets/kafka/samples/user.es6.generated.d.ts @@ -0,0 +1,120 @@ +import * as $protobuf from "protobufjs"; +import Long = require("long"); +/** Properties of a User. */ +export interface IUser { + /** User id */ + id?: number | null; + + /** User name */ + name?: string | null; + + /** User price */ + price?: number | null; +} + +/** Represents a User. */ +export class User implements IUser { + /** + * Constructs a new User. + * @param [properties] Properties to set + */ + constructor(properties?: IUser); + + /** User id. */ + public id: number; + + /** User name. */ + public name: string; + + /** User price. */ + public price: number; + + /** + * Creates a new User instance using the specified properties. + * @param [properties] Properties to set + * @returns User instance + */ + public static create(properties?: IUser): User; + + /** + * Encodes the specified User message. Does not implicitly {@link User.verify|verify} messages. + * @param message User message or plain object to encode + * @param [writer] Writer to encode to + * @returns Writer + */ + public static encode( + message: IUser, + writer?: $protobuf.Writer, + ): $protobuf.Writer; + + /** + * Encodes the specified User message, length delimited. Does not implicitly {@link User.verify|verify} messages. + * @param message User message or plain object to encode + * @param [writer] Writer to encode to + * @returns Writer + */ + public static encodeDelimited( + message: IUser, + writer?: $protobuf.Writer, + ): $protobuf.Writer; + + /** + * Decodes a User message from the specified reader or buffer. + * @param reader Reader or buffer to decode from + * @param [length] Message length if known beforehand + * @returns User + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decode( + reader: $protobuf.Reader | Uint8Array, + length?: number, + ): User; + + /** + * Decodes a User message from the specified reader or buffer, length delimited. + * @param reader Reader or buffer to decode from + * @returns User + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decodeDelimited(reader: $protobuf.Reader | Uint8Array): User; + + /** + * Verifies a User message. + * @param message Plain object to verify + * @returns `null` if valid, otherwise the reason why it is not + */ + public static verify(message: { [k: string]: any }): string | null; + + /** + * Creates a User message from a plain object. Also converts values to their respective internal types. + * @param object Plain object + * @returns User + */ + public static fromObject(object: { [k: string]: any }): User; + + /** + * Creates a plain object from a User message. Also converts values to other types if specified. + * @param message User + * @param [options] Conversion options + * @returns Plain object + */ + public static toObject( + message: User, + options?: $protobuf.IConversionOptions, + ): { [k: string]: any }; + + /** + * Converts this User to JSON. 
+ * @returns JSON object + */ + public toJSON(): { [k: string]: any }; + + /** + * Gets the default type url for User + * @param [typeUrlPrefix] your custom typeUrlPrefix(default "type.googleapis.com") + * @returns The default type url + */ + public static getTypeUrl(typeUrlPrefix?: string): string; +} diff --git a/examples/snippets/kafka/samples/user.es6.generated.js b/examples/snippets/kafka/samples/user.es6.generated.js new file mode 100644 index 0000000000..2b21fddd88 --- /dev/null +++ b/examples/snippets/kafka/samples/user.es6.generated.js @@ -0,0 +1,262 @@ +/*eslint-disable block-scoped-var, id-length, no-control-regex, no-magic-numbers, no-prototype-builtins, no-redeclare, no-shadow, no-var, sort-vars*/ +import * as $protobuf from "protobufjs/minimal"; + +// Common aliases +const $Reader = $protobuf.Reader, $Writer = $protobuf.Writer, $util = $protobuf.util; + +// Exported root namespace +const $root = $protobuf.roots["default"] || ($protobuf.roots["default"] = {}); + +export const User = $root.User = (() => { + + /** + * Properties of a User. + * @exports IUser + * @interface IUser + * @property {number|null} [id] User id + * @property {string|null} [name] User name + * @property {number|null} [price] User price + */ + + /** + * Constructs a new User. + * @exports User + * @classdesc Represents a User. + * @implements IUser + * @constructor + * @param {IUser=} [properties] Properties to set + */ + function User(properties) { + if (properties) + for (let keys = Object.keys(properties), i = 0; i < keys.length; ++i) + if (properties[keys[i]] != null) + this[keys[i]] = properties[keys[i]]; + } + + /** + * User id. + * @member {number} id + * @memberof User + * @instance + */ + User.prototype.id = 0; + + /** + * User name. + * @member {string} name + * @memberof User + * @instance + */ + User.prototype.name = ""; + + /** + * User price. + * @member {number} price + * @memberof User + * @instance + */ + User.prototype.price = 0; + + /** + * Creates a new User instance using the specified properties. + * @function create + * @memberof User + * @static + * @param {IUser=} [properties] Properties to set + * @returns {User} User instance + */ + User.create = function create(properties) { + return new User(properties); + }; + + /** + * Encodes the specified User message. Does not implicitly {@link User.verify|verify} messages. + * @function encode + * @memberof User + * @static + * @param {IUser} message User message or plain object to encode + * @param {$protobuf.Writer} [writer] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + User.encode = function encode(message, writer) { + if (!writer) + writer = $Writer.create(); + if (message.id != null && Object.hasOwnProperty.call(message, "id")) + writer.uint32(/* id 1, wireType 0 =*/8).int32(message.id); + if (message.name != null && Object.hasOwnProperty.call(message, "name")) + writer.uint32(/* id 2, wireType 2 =*/18).string(message.name); + if (message.price != null && Object.hasOwnProperty.call(message, "price")) + writer.uint32(/* id 3, wireType 1 =*/25).double(message.price); + return writer; + }; + + /** + * Encodes the specified User message, length delimited. Does not implicitly {@link User.verify|verify} messages. 
+ * @function encodeDelimited + * @memberof User + * @static + * @param {IUser} message User message or plain object to encode + * @param {$protobuf.Writer} [writer] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + User.encodeDelimited = function encodeDelimited(message, writer) { + return this.encode(message, writer).ldelim(); + }; + + /** + * Decodes a User message from the specified reader or buffer. + * @function decode + * @memberof User + * @static + * @param {$protobuf.Reader|Uint8Array} reader Reader or buffer to decode from + * @param {number} [length] Message length if known beforehand + * @returns {User} User + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + User.decode = function decode(reader, length, error) { + if (!(reader instanceof $Reader)) + reader = $Reader.create(reader); + let end = length === undefined ? reader.len : reader.pos + length, message = new $root.User(); + while (reader.pos < end) { + let tag = reader.uint32(); + if (tag === error) + break; + switch (tag >>> 3) { + case 1: { + message.id = reader.int32(); + break; + } + case 2: { + message.name = reader.string(); + break; + } + case 3: { + message.price = reader.double(); + break; + } + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }; + + /** + * Decodes a User message from the specified reader or buffer, length delimited. + * @function decodeDelimited + * @memberof User + * @static + * @param {$protobuf.Reader|Uint8Array} reader Reader or buffer to decode from + * @returns {User} User + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + User.decodeDelimited = function decodeDelimited(reader) { + if (!(reader instanceof $Reader)) + reader = new $Reader(reader); + return this.decode(reader, reader.uint32()); + }; + + /** + * Verifies a User message. + * @function verify + * @memberof User + * @static + * @param {Object.} message Plain object to verify + * @returns {string|null} `null` if valid, otherwise the reason why it is not + */ + User.verify = function verify(message) { + if (typeof message !== "object" || message === null) + return "object expected"; + if (message.id != null && message.hasOwnProperty("id")) + if (!$util.isInteger(message.id)) + return "id: integer expected"; + if (message.name != null && message.hasOwnProperty("name")) + if (!$util.isString(message.name)) + return "name: string expected"; + if (message.price != null && message.hasOwnProperty("price")) + if (typeof message.price !== "number") + return "price: number expected"; + return null; + }; + + /** + * Creates a User message from a plain object. Also converts values to their respective internal types. + * @function fromObject + * @memberof User + * @static + * @param {Object.} object Plain object + * @returns {User} User + */ + User.fromObject = function fromObject(object) { + if (object instanceof $root.User) + return object; + let message = new $root.User(); + if (object.id != null) + message.id = object.id | 0; + if (object.name != null) + message.name = String(object.name); + if (object.price != null) + message.price = Number(object.price); + return message; + }; + + /** + * Creates a plain object from a User message. Also converts values to other types if specified. 
+ * @function toObject + * @memberof User + * @static + * @param {User} message User + * @param {$protobuf.IConversionOptions} [options] Conversion options + * @returns {Object.} Plain object + */ + User.toObject = function toObject(message, options) { + if (!options) + options = {}; + let object = {}; + if (options.defaults) { + object.id = 0; + object.name = ""; + object.price = 0; + } + if (message.id != null && message.hasOwnProperty("id")) + object.id = message.id; + if (message.name != null && message.hasOwnProperty("name")) + object.name = message.name; + if (message.price != null && message.hasOwnProperty("price")) + object.price = options.json && !isFinite(message.price) ? String(message.price) : message.price; + return object; + }; + + /** + * Converts this User to JSON. + * @function toJSON + * @memberof User + * @instance + * @returns {Object.} JSON object + */ + User.prototype.toJSON = function toJSON() { + return this.constructor.toObject(this, $protobuf.util.toJSONOptions); + }; + + /** + * Gets the default type url for User + * @function getTypeUrl + * @memberof User + * @static + * @param {string} [typeUrlPrefix] your custom typeUrlPrefix(default "type.googleapis.com") + * @returns {string} The default type url + */ + User.getTypeUrl = function getTypeUrl(typeUrlPrefix) { + if (typeUrlPrefix === undefined) { + typeUrlPrefix = "type.googleapis.com"; + } + return typeUrlPrefix + "/User"; + }; + + return User; +})(); + +export { $root as default }; diff --git a/examples/snippets/kafka/templates/gettingStartedWithMsk.yaml b/examples/snippets/kafka/templates/gettingStartedWithMsk.yaml new file mode 100644 index 0000000000..ab14d728ff --- /dev/null +++ b/examples/snippets/kafka/templates/gettingStartedWithMsk.yaml @@ -0,0 +1,20 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Resources: + KafkaConsumerFunction: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + Runtime: python3.13 + Timeout: 30 + Events: + MSKEvent: + Type: MSK + Properties: + StartingPosition: LATEST + Stream: !GetAtt MyMSKCluster.Arn + Topics: + - my-topic-1 + - my-topic-2 + Policies: + - AWSLambdaMSKExecutionRole \ No newline at end of file diff --git a/examples/snippets/package.json b/examples/snippets/package.json index 29bb77d313..51c7510968 100644 --- a/examples/snippets/package.json +++ b/examples/snippets/package.json @@ -44,5 +44,9 @@ "aws-sdk": "^2.1692.0", "aws-sdk-client-mock": "^4.1.0", "zod": "^3.25.67" + }, + "dependencies": { + "arktype": "^2.1.20", + "valibot": "^1.1.0" } } diff --git a/lerna.json b/lerna.json index d1a5ab193a..5fccf3baaf 100644 --- a/lerna.json +++ b/lerna.json @@ -12,6 +12,7 @@ "packages/parser", "packages/event-handler", "packages/validation", + "packages/kafka", "examples/app", "layers", "examples/snippets" diff --git a/mkdocs.yml b/mkdocs.yml index 75fb29e2c9..bd1f584dfb 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -27,6 +27,8 @@ watch: [ examples/snippets/tracer, packages/validation/src, examples/snippets/validation, + packages/kafka/src, + examples/snippets/kafka, ] nav: @@ -52,6 +54,7 @@ nav: - features/jmespath.md - features/parser.md - features/validation.md + - features/kafka.md - Environment variables: environment-variables.md - Upgrade guide: upgrade.md - Community Content: we_made_this.md @@ -179,6 +182,7 @@ plugins: - features/jmespath.md - features/parser.md - features/validation.md + - features/kafka.md Environment variables: - environment-variables.md Upgrade guide: diff --git a/package-lock.json 
b/package-lock.json index a85f8fe6f5..eeac4553fe 100644 --- a/package-lock.json +++ b/package-lock.json @@ -83,6 +83,10 @@ "name": "code-snippets", "version": "2.21.0", "license": "MIT-0", + "dependencies": { + "arktype": "^2.1.20", + "valibot": "^1.1.0" + }, "devDependencies": { "@aws-lambda-powertools/batch": "^2.21.0", "@aws-lambda-powertools/event-handler": "^2.21.0", @@ -136,6 +140,21 @@ "node": ">=6.0.0" } }, + "node_modules/@ark/schema": { + "version": "0.46.0", + "resolved": "https://registry.npmjs.org/@ark/schema/-/schema-0.46.0.tgz", + "integrity": "sha512-c2UQdKgP2eqqDArfBqQIJppxJHvNNXuQPeuSPlDML4rjw+f1cu0qAlzOG4b8ujgm9ctIDWwhpyw6gjG5ledIVQ==", + "license": "MIT", + "dependencies": { + "@ark/util": "0.46.0" + } + }, + "node_modules/@ark/util": { + "version": "0.46.0", + "resolved": "https://registry.npmjs.org/@ark/util/-/util-0.46.0.tgz", + "integrity": "sha512-JPy/NGWn/lvf1WmGCPw2VGpBg5utZraE84I7wli18EDF3p3zc/e9WolT35tINeZO3l7C77SjqRJeAUoT0CvMRg==", + "license": "MIT" + }, "node_modules/@aws-cdk/asset-awscli-v1": { "version": "2.2.237", "resolved": "https://registry.npmjs.org/@aws-cdk/asset-awscli-v1/-/asset-awscli-v1-2.2.237.tgz", @@ -14784,6 +14803,16 @@ "integrity": "sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q==", "dev": true }, + "node_modules/arktype": { + "version": "2.1.20", + "resolved": "https://registry.npmjs.org/arktype/-/arktype-2.1.20.tgz", + "integrity": "sha512-IZCEEXaJ8g+Ijd59WtSYwtjnqXiwM8sWQ5EjGamcto7+HVN9eK0C4p0zDlCuAwWhpqr6fIBkxPuYDl4/Mcj/+Q==", + "license": "MIT", + "dependencies": { + "@ark/schema": "0.46.0", + "@ark/util": "0.46.0" + } + }, "node_modules/array-differ": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/array-differ/-/array-differ-3.0.0.tgz", @@ -24618,7 +24647,7 @@ "version": "5.8.3", "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.8.3.tgz", "integrity": "sha512-p1diW6TqL9L07nNxvRMM7hMMw4c5XOo/1ibL4aAIGmSAt9slTE1Xgw5KWuof2uTOvCg9BY7ZRi+GaF+7sfgPeQ==", - "dev": true, + "devOptional": true, "license": "Apache-2.0", "bin": { "tsc": "bin/tsc", @@ -24761,6 +24790,20 @@ "uuid": "dist/bin/uuid" } }, + "node_modules/valibot": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/valibot/-/valibot-1.1.0.tgz", + "integrity": "sha512-Nk8lX30Qhu+9txPYTwM0cFlWLdPFsFr6LblzqIySfbZph9+BFsAHsNvHOymEviUepeIW6KFHzpX8TKhbptBXXw==", + "license": "MIT", + "peerDependencies": { + "typescript": ">=5" + }, + "peerDependenciesMeta": { + "typescript": { + "optional": true + } + } + }, "node_modules/validate-npm-package-license": { "version": "3.0.4", "resolved": "https://registry.npmjs.org/validate-npm-package-license/-/validate-npm-package-license-3.0.4.tgz", @@ -25498,19 +25541,27 @@ "protobufjs": "^7.5.3", "zod": "^3.25.67" }, - "peerDependencies": { - "zod": ">=3.24.0", + "peerDependencies": { + "arktype": ">=2.0.0", + "avro-js": ">=1.12.0", + "protobufjs": ">=7.5.3", "valibot": ">=1.0.0", - "arktype": ">=2.0.0" + "zod": ">=3.24.0" }, "peerDependenciesMeta": { - "zod": { + "arktype": { + "optional": true + }, + "avro-js": { + "optional": true + }, + "protobufjs": { "optional": true }, "valibot": { "optional": true }, - "arktype": { + "zod": { "optional": true } } diff --git a/packages/kafka/README.md b/packages/kafka/README.md new file mode 100644 index 0000000000..e69de29bb2 diff --git a/packages/kafka/package.json b/packages/kafka/package.json index 6660df63be..7d4052f074 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -1,5 +1,6 @@ { 
"name": "@aws-lambda-powertools/kafka", + "description": "Utility to easily handle message deserialization and parsing of Kafka events in AWS Lambda functions", "version": "2.21.0", "author": { "name": "Amazon Web Services", @@ -34,7 +35,17 @@ "bugs": { "url": "https://github.com/aws-powertools/powertools-lambda-typescript/issues" }, - "keywords": [], + "keywords": [ + "aws", + "lambda", + "powertools", + "kafka", + "event", + "schema", + "validation", + "typescript", + "nodejs" + ], "dependencies": { "@aws-lambda-powertools/commons": "2.21.0", "@standard-schema/spec": "^1.0.0" diff --git a/packages/kafka/typedoc.json b/packages/kafka/typedoc.json new file mode 100644 index 0000000000..e56c23f196 --- /dev/null +++ b/packages/kafka/typedoc.json @@ -0,0 +1,11 @@ +{ + "extends": [ + "../../typedoc.base.json" + ], + "entryPoints": [ + "./src/index.ts", + "./src/types/types.ts", + "./src/errors.ts" + ], + "readme": "README.md" +} \ No newline at end of file