Skip to content

Commit f25c71d

Browse files
add kinesis -> lambda support
1 parent 0d1f701 commit f25c71d

File tree

4 files changed

+114
-2
lines changed

4 files changed

+114
-2
lines changed

datadog_lambda/dsm.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ def set_dsm_context(event, event_source):
1212
_dsm_set_sqs_context(event)
1313
elif event_source.equals(EventTypes.SNS):
1414
_dsm_set_sns_context(event)
15+
elif event_source.equals(EventTypes.KINESIS):
16+
_dsm_set_kinesis_context(event)
1517

1618

1719
def _dsm_set_sqs_context(event):
@@ -37,6 +39,16 @@ def _dsm_set_sns_context(event):
3739
_set_dsm_context_for_record(sns_data, "sns", arn)
3840

3941

42+
def _dsm_set_kinesis_context(event):
43+
records = event.get("Records")
44+
if records is None:
45+
return
46+
47+
for record in records:
48+
arn = record.get("eventSourceARN", "")
49+
_set_dsm_context_for_record(record, "kinesis", arn)
50+
51+
4052
def _set_dsm_context_for_record(record, type, arn):
4153
from ddtrace.data_streams import set_consume_checkpoint
4254

4.34 MB
Binary file not shown.
4.31 MB
Binary file not shown.

tests/test_dsm.py

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
set_dsm_context,
88
_dsm_set_sqs_context,
99
_dsm_set_sns_context,
10+
_dsm_set_kinesis_context,
1011
_get_dsm_context_from_lambda,
1112
)
1213
from datadog_lambda.trigger import EventTypes, _EventSource
@@ -24,12 +25,14 @@ def setUp(self):
2425

2526
patcher = patch("datadog_lambda.dsm._get_dsm_context_from_lambda")
2627
self.mock_get_dsm_context_from_lambda = patcher.start()
28+
self.addCleanup(patcher.stop)
29+
2730
patcher = patch("datadog_lambda.dsm._dsm_set_sns_context")
2831
self.mock_dsm_set_sns_context = patcher.start()
2932
self.addCleanup(patcher.stop)
3033

31-
patcher = patch("ddtrace.internal.datastreams.data_streams_processor")
32-
self.mock_data_streams_processor = patcher.start()
34+
patcher = patch("datadog_lambda.dsm._dsm_set_kinesis_context")
35+
self.mock_dsm_set_kinesis_context = patcher.start()
3336
self.addCleanup(patcher.stop)
3437

3538
def test_non_sqs_event_source_does_nothing(self):
@@ -263,6 +266,103 @@ def test_sns_multiple_records_process_each_record(self):
263266
pathway_ctx = carrier_get_func("dd-pathway-ctx-base64")
264267
self.assertEqual(pathway_ctx, expected_contexts[i])
265268

269+
def test_kinesis_event_with_no_records_does_nothing(self):
270+
"""Test that events where Records is None don't trigger DSM processing"""
271+
events_with_no_records = [
272+
{},
273+
{"Records": None},
274+
{"someOtherField": "value"},
275+
]
276+
277+
for event in events_with_no_records:
278+
_dsm_set_kinesis_context(event)
279+
self.mock_set_consume_checkpoint.assert_not_called()
280+
281+
def test_kinesis_event_triggers_dsm_kinesis_context(self):
282+
"""Test that Kinesis event sources trigger the Kinesis-specific DSM context function"""
283+
kinesis_event = {
284+
"Records": [
285+
{
286+
"eventSource": "aws:kinesis",
287+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
288+
"kinesis": {
289+
"data": "SGVsbG8gZnJvbSBLaW5lc2lzIQ==",
290+
"partitionKey": "partition-key",
291+
},
292+
}
293+
]
294+
}
295+
296+
event_source = _EventSource(EventTypes.KINESIS)
297+
set_dsm_context(kinesis_event, event_source)
298+
299+
self.mock_dsm_set_kinesis_context.assert_called_once_with(kinesis_event)
300+
301+
def test_kinesis_multiple_records_process_each_record(self):
302+
"""Test that each record in a Kinesis event gets processed individually"""
303+
multi_record_event = {
304+
"Records": [
305+
{
306+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream1",
307+
"kinesis": {
308+
"data": base64.b64encode(
309+
json.dumps({"dd-pathway-ctx-base64": "context1"}).encode("utf-8")
310+
).decode("utf-8"),
311+
"partitionKey": "partition-1",
312+
},
313+
},
314+
{
315+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream2",
316+
"kinesis": {
317+
"data": base64.b64encode(
318+
json.dumps({"dd-pathway-ctx-base64": "context2"}).encode("utf-8")
319+
).decode("utf-8"),
320+
"partitionKey": "partition-2",
321+
},
322+
},
323+
{
324+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream3",
325+
"kinesis": {
326+
"data": base64.b64encode(
327+
json.dumps({"dd-pathway-ctx-base64": "context3"}).encode("utf-8")
328+
).decode("utf-8"),
329+
"partitionKey": "partition-3",
330+
},
331+
},
332+
]
333+
}
334+
335+
self.mock_get_dsm_context_from_lambda.side_effect = [
336+
{"dd-pathway-ctx-base64": "context1"},
337+
{"dd-pathway-ctx-base64": "context2"},
338+
{"dd-pathway-ctx-base64": "context3"},
339+
]
340+
341+
_dsm_set_kinesis_context(multi_record_event)
342+
343+
self.assertEqual(self.mock_set_consume_checkpoint.call_count, 3)
344+
345+
calls = self.mock_set_consume_checkpoint.call_args_list
346+
expected_arns = [
347+
"arn:aws:kinesis:us-east-1:123456789012:stream/stream1",
348+
"arn:aws:kinesis:us-east-1:123456789012:stream/stream2",
349+
"arn:aws:kinesis:us-east-1:123456789012:stream/stream3",
350+
]
351+
expected_contexts = ["context1", "context2", "context3"]
352+
353+
for i, call in enumerate(calls):
354+
args, kwargs = call
355+
service_type = args[0]
356+
arn = args[1]
357+
carrier_get_func = args[2]
358+
359+
self.assertEqual(service_type, "kinesis")
360+
361+
self.assertEqual(arn, expected_arns[i])
362+
363+
pathway_ctx = carrier_get_func("dd-pathway-ctx-base64")
364+
self.assertEqual(pathway_ctx, expected_contexts[i])
365+
266366

267367
class TestGetDSMContext(unittest.TestCase):
268368
def test_sqs_to_lambda_string_value_format(self):

0 commit comments

Comments
 (0)