Skip to content

Commit 37e1f60

Browse files
saartochner-lumigoCircleCI
andauthored
feat: add kinesis batch trigger (#257)
Co-authored-by: CircleCI <[email protected]>
1 parent 5424838 commit 37e1f60

File tree

3 files changed

+48
-7
lines changed

3 files changed

+48
-7
lines changed

.secrets.baseline

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,14 +168,14 @@
168168
"filename": "src/test/unit/event/test_event_trigger.py",
169169
"hashed_secret": "885bb9903f72e004ff2974807b70e7c970d3e6d5",
170170
"is_verified": false,
171-
"line_number": 451
171+
"line_number": 476
172172
},
173173
{
174174
"type": "Hex High Entropy String",
175175
"filename": "src/test/unit/event/test_event_trigger.py",
176176
"hashed_secret": "3fae06dc55a618caed1d794dcd512bfe7e76c9f1",
177177
"is_verified": false,
178-
"line_number": 485
178+
"line_number": 510
179179
}
180180
],
181181
"src/test/unit/test_lumigo_utils.py": [
@@ -220,5 +220,5 @@
220220
}
221221
]
222222
},
223-
"generated_at": "2022-08-17T15:30:21Z"
223+
"generated_at": "2022-08-22T12:01:52Z"
224224
}

src/lumigo_tracer/event/event_trigger.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -227,10 +227,7 @@ def _parse_streams(event: dict) -> Dict[str, str]:
227227
if triggered_by == "sqs":
228228
result.update(_parse_sqs_event(event))
229229
elif triggered_by == "kinesis":
230-
result[MESSAGE_ID_KEY] = safe_get(event, ["Records", 0, "kinesis", "sequenceNumber"])
231-
event_id = safe_get(event, ["Records", 0, "eventID"])
232-
if isinstance(event_id, str):
233-
result["shardId"] = event_id.split(":", 1)[0]
230+
result.update(_parse_kinesis_event(event))
234231
elif triggered_by == "dynamodb":
235232
result.update(_parse_dynamomdb_event(event))
236233
return result
@@ -258,6 +255,25 @@ def _parse_dynamomdb_event(event) -> Dict[str, Union[int, List[str]]]:
258255
}
259256

260257

258+
def _parse_kinesis_event(event) -> Dict[str, Union[int, str, List[str], List[Dict[str, str]]]]:
259+
result = {}
260+
message_ids = []
261+
records = safe_get(event, ["Records"], default=[])
262+
for record in records:
263+
message_id = safe_get(record, ["kinesis", "sequenceNumber"])
264+
if message_id:
265+
message_ids.append(message_id)
266+
if message_ids:
267+
if len(message_ids) == 1:
268+
result[MESSAGE_ID_KEY] = message_ids[0]
269+
else:
270+
result[MESSAGE_IDS_KEY] = message_ids
271+
event_id = safe_get(event, ["Records", 0, "eventID"])
272+
if isinstance(event_id, str):
273+
result["shardId"] = event_id.split(":", 1)[0]
274+
return result
275+
276+
261277
def _parse_sqs_event(event) -> Dict[str, Union[int, str, List[str], List[Dict[str, str]]]]:
262278
message_ids = []
263279
chained_resources: List[Dict[str, str]] = []

src/test/unit/event/test_event_trigger.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,31 @@
100100
"shardId": "shardId-000000000006",
101101
},
102102
),
103+
( # kinesis example trigger - multiple records
104+
{
105+
"Records": [
106+
{
107+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789:stream/kinesis-stream-name",
108+
"eventSource": "aws:kinesis",
109+
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
110+
"kinesis": {"sequenceNumber": "12"},
111+
},
112+
{
113+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789:stream/kinesis-stream-name",
114+
"eventSource": "aws:kinesis",
115+
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
116+
"kinesis": {"sequenceNumber": "34"},
117+
},
118+
]
119+
},
120+
{
121+
"triggeredBy": "kinesis",
122+
"arn": "arn:aws:kinesis:us-east-1:123456789:stream/kinesis-stream-name",
123+
"messageIds": ["12", "34"],
124+
"recordsNum": 2,
125+
"shardId": "shardId-000000000006",
126+
},
127+
),
103128
( # SQS example trigger
104129
{
105130
"Records": [

0 commit comments

Comments
 (0)