
docs(kafka): add kafka documentation #6834

Merged · 2 commits · Jun 19, 2025
973 changes: 973 additions & 0 deletions docs/utilities/kafka.md

Large diffs are not rendered by default.

34 changes: 34 additions & 0 deletions examples/kafka/consumer/src/getting_started_with_avro.py
@@ -0,0 +1,34 @@
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

# Define the Avro schema
avro_schema = """
{
    "type": "record",
    "name": "User",
    "namespace": "com.example",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}
"""

# Configure schema
schema_config = SchemaConfig(
    value_schema_type="AVRO",
    value_schema=avro_schema,
)


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    for record in event.records:
        user = record.value  # Dictionary deserialized from the Avro message

        logger.info(f"Processing user: {user['name']}, age {user['age']}")

    return {"statusCode": 200}
18 changes: 18 additions & 0 deletions examples/kafka/consumer/src/getting_started_with_json.py
@@ -0,0 +1,18 @@
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

# Configure schema
schema_config = SchemaConfig(value_schema_type="JSON")


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    for record in event.records:
        user = record.value  # Dictionary deserialized from the JSON message

        logger.info(f"Processing user: {user['name']}, age {user['age']}")

    return {"statusCode": 200}
24 changes: 24 additions & 0 deletions examples/kafka/consumer/src/getting_started_with_protobuf.py
@@ -0,0 +1,24 @@
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

# Import generated protobuf class
from .user_pb2 import User # type: ignore[import-not-found]

logger = Logger()

# Configure schema for protobuf
schema_config = SchemaConfig(
    value_schema_type="PROTOBUF",
    value_schema=User,  # The protobuf message class
)


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    for record in event.records:
        user = record.value  # Dictionary deserialized from the Protobuf message

        logger.info(f"Processing user: {user['name']}, age {user['age']}")

    return {"statusCode": 200}
49 changes: 49 additions & 0 deletions examples/kafka/consumer/src/working_with_key_and_value.py
@@ -0,0 +1,49 @@
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

# Define schemas for both components
key_schema = """
{
    "type": "record",
    "name": "ProductKey",
    "fields": [
        {"name": "product_id", "type": "string"}
    ]
}
"""

value_schema = """
{
    "type": "record",
    "name": "ProductInfo",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "price", "type": "double"},
        {"name": "in_stock", "type": "boolean"}
    ]
}
"""

# Configure both key and value schemas
schema_config = SchemaConfig(
    key_schema_type="AVRO",
    key_schema=key_schema,
    value_schema_type="AVRO",
    value_schema=value_schema,
)


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    for record in event.records:
        # Access both deserialized components
        key = record.key
        value = record.value

        logger.info(f"Processing key: {key['product_id']}")
        logger.info(f"Processing value: {value['name']}")

    return {"statusCode": 200}
23 changes: 23 additions & 0 deletions examples/kafka/consumer/src/working_with_primitive_key.py
@@ -0,0 +1,23 @@
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

# Only configure value schema
schema_config = SchemaConfig(value_schema_type="JSON")


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    for record in event.records:
        # Key is automatically decoded as UTF-8 string
        key = record.key

        # Value is deserialized as JSON
        value = record.value

        logger.info(f"Processing key: {key}")
        logger.info(f"Processing value: {value['name']}")

    return {"statusCode": 200}
@@ -0,0 +1,20 @@
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@kafka_consumer
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    for record in event.records:
        # Key is automatically decoded as UTF-8 string
        key = record.key

        # Value is automatically decoded as UTF-8 string
        value = record.value

        logger.info(f"Processing key: {key}")
        logger.info(f"Processing value: {value}")

    return {"statusCode": 200}
22 changes: 22 additions & 0 deletions examples/kafka/consumer/src/working_with_value_only.py
@@ -0,0 +1,22 @@
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

# Configure only value schema
schema_config = SchemaConfig(value_schema_type="JSON")


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    for record in event.records:
        # Key remains as string (if present)
        if record.key is not None:
            logger.info(f"Message key: {record.key}")

        # Value is deserialized as JSON
        value = record.value
        logger.info(f"Order #{value['order_id']} - Total: ${value['total']}")

    return {"statusCode": 200}
2 changes: 2 additions & 0 deletions mkdocs.yml
@@ -28,6 +28,7 @@ nav:
- core/event_handler/bedrock_agents.md
- utilities/parameters.md
- utilities/batch.md
- utilities/kafka.md
- utilities/typing.md
- utilities/validation.md
- utilities/data_classes.md
@@ -220,6 +221,7 @@ plugins:
- utilities/parameters.md
- utilities/batch.md
- utilities/typing.md
- utilities/kafka.md
- utilities/validation.md
- utilities/data_classes.md
- utilities/parser.md