
feat(kafka): add support for Confluent Producers #6833

Merged (3 commits) on Jun 19, 2025
aws_lambda_powertools/utilities/kafka/deserializer/protobuf.py (53 additions, 1 deletion)
@@ -2,6 +2,7 @@

from typing import Any

from google.protobuf.internal.decoder import _DecodeVarint # type: ignore[attr-defined]
from google.protobuf.json_format import MessageToDict

from aws_lambda_powertools.utilities.kafka.deserializer.base import DeserializerBase
@@ -43,6 +44,12 @@ def deserialize(self, data: bytes | str) -> dict:
When the data cannot be deserialized according to the message class,
typically due to data format incompatibility or incorrect message class.

Notes
-----
This deserializer handles both standard Protocol Buffer format and the Confluent
Schema Registry format which includes message index information. It will first try
standard deserialization and fall back to message index handling if needed.

Example
--------
>>> # Assuming proper protobuf setup
@@ -54,11 +61,56 @@ def deserialize(self, data: bytes | str) -> dict:
... except KafkaConsumerDeserializationError as e:
... print(f"Failed to deserialize: {e}")
"""
value = self._decode_input(data)
try:
value = self._decode_input(data)
message = self.message_class()
message.ParseFromString(value)
return MessageToDict(message, preserving_proto_field_name=True)
except Exception:
return self._deserialize_with_message_index(value, self.message_class())

def _deserialize_with_message_index(self, data: bytes, parser: Any) -> dict:
"""
Deserialize protobuf message with Confluent message index handling.

Parameters
----------
data : bytes
Serialized protobuf payload, which may be prefixed with Confluent Schema Registry message indexes
parser : google.protobuf.message.Message
Protobuf message instance to parse the data into

Returns
-------
dict
Dictionary representation of the parsed protobuf message with original field names

Raises
------
KafkaConsumerDeserializationError
If deserialization fails

Notes
-----
This method handles the special case of Confluent Schema Registry's message index
format, where the message is prefixed with either a single 0 (for the first schema)
or a list of schema indexes. The actual protobuf message follows these indexes.
"""

buffer = memoryview(data)
pos = 0

try:
first_value, new_pos = _DecodeVarint(buffer, pos)
pos = new_pos

if first_value != 0:
for _ in range(first_value):
_, new_pos = _DecodeVarint(buffer, pos)
pos = new_pos

parser.ParseFromString(data[pos:])
return MessageToDict(parser, preserving_proto_field_name=True)
except Exception as e:
raise KafkaConsumerDeserializationError(
f"Error trying to deserialize protobuf data - {type(e).__name__}: {str(e)}",
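The Notes in _deserialize_with_message_index above describe the Confluent Schema Registry framing this fallback strips: the payload starts either with a single varint 0 (first schema in the subject) or with a varint count followed by that many varint message indexes, and the plain protobuf message follows. As a rough standalone sketch of that prefix handling (the helper name is ours, not part of the PR), using the same internal varint decoder the PR imports:

from google.protobuf.internal.decoder import _DecodeVarint  # type: ignore[attr-defined]


def strip_confluent_message_index(data: bytes) -> bytes:
    # Illustration only: drop a Confluent message-index prefix and return the
    # bytes of the protobuf message itself.
    buffer = memoryview(data)
    first_value, pos = _DecodeVarint(buffer, 0)
    if first_value != 0:
        # A non-zero first varint is treated as the number of message indexes to skip.
        for _ in range(first_value):
            _, pos = _DecodeVarint(buffer, pos)
    return bytes(buffer[pos:])


# Framings exercised by the test event further down:
#   b"\x00" + message          -> single 0, first schema in the subject
#   b"\x02\x01\x00" + message  -> count 2, then indexes 1 and 0

Note that, as in the PR, this only makes sense as a fallback: an unprefixed payload would be misread here, which is why deserialize() attempts a plain ParseFromString first.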
@@ -0,0 +1,13 @@
syntax = "proto3";

package org.demo.kafka.protobuf;

option java_package = "org.demo.kafka.protobuf";
option java_outer_classname = "ProtobufProductOuterClass";
option java_multiple_files = true;

message ProtobufProduct {
int32 id = 1;
string name = 2;
double price = 3;
}

Some generated files (the *_pb2.py modules imported by the tests) are not rendered by default.
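For context, the confluent_protobuf_pb2 module imported by the tests below is one of those generated files, produced by protoc from the ProtobufProduct schema above. A small usage sketch of the generated class (illustration only; the .proto file's path is not shown in this diff, so the file name in the comment is an assumption inferred from the module name):

# Assumed generation step:
#   protoc --python_out=. confluent_protobuf.proto
import base64

from confluent_protobuf_pb2 import ProtobufProduct  # same generated class the tests import

product = ProtobufProduct(id=1001, name="Laptop", price=999.99)
payload = product.SerializeToString()

# Base64-encoded, this should match the first (unprefixed) record value in the test event below.
print(base64.b64encode(payload).decode())  # expected: COkHEgZMYXB0b3AZUrgehes/j0A=

roundtrip = ProtobufProduct()
roundtrip.ParseFromString(payload)
assert roundtrip.id == 1001 and roundtrip.name == "Laptop"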

@@ -12,6 +12,9 @@
from aws_lambda_powertools.utilities.kafka.kafka_consumer import kafka_consumer
from aws_lambda_powertools.utilities.kafka.schema_config import SchemaConfig

# Import the Confluent complex schema
from .confluent_protobuf_pb2 import ProtobufProduct

# Import the generated protobuf classes
from .user_pb2 import Key, User

@@ -335,3 +338,87 @@ def test_kafka_consumer_without_protobuf_key_schema():
# Verify the error message mentions the missing key schema
assert "key_schema" in str(excinfo.value)
assert "PROTOBUF" in str(excinfo.value)


def test_confluent_complex_schema(lambda_context):
# GIVEN
# A scenario where a complex schema is used with the PROTOBUF schema type
complex_event = {
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234",
"bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
"records": {
"mytopic-0": [
{
"topic": "mytopic",
"partition": 0,
"offset": 15,
"timestamp": 1545084650987,
"timestampType": "CREATE_TIME",
"key": "NDI=",
"value": "COkHEgZMYXB0b3AZUrgehes/j0A=",
"headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}],
},
{
"topic": "mytopic",
"partition": 0,
"offset": 16,
"timestamp": 1545084650988,
"timestampType": "CREATE_TIME",
"key": "NDI=",
"value": "AAjpBxIGTGFwdG9wGVK4HoXrP49A",
"headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}],
},
{
"topic": "mytopic",
"partition": 0,
"offset": 17,
"timestamp": 1545084650989,
"timestampType": "CREATE_TIME",
"key": "NDI=",
"value": "AgEACOkHEgZMYXB0b3AZUrgehes/j0A=",
"headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}],
},
],
},
}

# GIVEN A Kafka consumer configured to deserialize Protobuf data
# using the ProtobufProduct message type as the schema
schema_config = SchemaConfig(
value_schema_type="PROTOBUF",
value_schema=ProtobufProduct,
)

processed_records = []

@kafka_consumer(schema_config=schema_config)
def handler(event: ConsumerRecords, context):
for record in event.records:
processed_records.append(
{"id": record.value["id"], "name": record.value["name"], "price": record.value["price"]},
)
return {"processed": len(processed_records)}

# WHEN The handler processes a Kafka event containing Protobuf-encoded data
result = handler(complex_event, lambda_context)

# THEN
# The handler should successfully process all three records
# and return the correct count
assert result == {"processed": 3}

# All records should be correctly deserialized with proper values
assert len(processed_records) == 3

# First record should contain decoded values
assert processed_records[0]["id"] == 1001
assert processed_records[0]["name"] == "Laptop"

# Second record should contain decoded values
assert processed_records[1]["id"] == 1001
assert processed_records[1]["name"] == "Laptop"

# Third record should contain decoded values
assert processed_records[2]["id"] == 1001
assert processed_records[2]["name"] == "Laptop"
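The three record values above exercise the three framings the new fallback handles: a plain protobuf payload, a payload prefixed with a single 0, and a payload prefixed with a message-index list. As an illustration only (not part of the PR), base64-decoding the test values shows the prefixes directly:

import base64

plain = base64.b64decode("COkHEgZMYXB0b3AZUrgehes/j0A=")           # record 1: starts with field tag 0x08
single_zero = base64.b64decode("AAjpBxIGTGFwdG9wGVK4HoXrP49A")     # record 2: leading 0x00
index_list = base64.b64decode("AgEACOkHEgZMYXB0b3AZUrgehes/j0A=")  # record 3: leading 0x02 0x01 0x00

print(plain[0:1].hex(), single_zero[0:1].hex(), index_list[0:3].hex())
# expected output: 08 00 020100
# In every case the protobuf message itself (08 e9 07 ...) starts right after the prefix,
# which is why all three records decode to id=1001, name="Laptop", price=999.99.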