Skip to content

Commit 819b6d7

Browse files
hmstepanekmergify[bot]TimPansino
authored
Add aws entity support for Kinesis (#1283)
* Add support for kinesis * Fixup: comment * Minor tweaks to kinesis * Fixup: missing library name --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: Tim Pansino <[email protected]>
1 parent 95d54a7 commit 819b6d7

File tree

2 files changed

+443
-12
lines changed

2 files changed

+443
-12
lines changed

newrelic/hooks/external_botocore.py

+202-12
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,17 @@ def extract_sqs(*args, **kwargs):
5858
return queue_value.rsplit("/", 1)[-1]
5959

6060

61-
def extract_sqs_agent_attrs(*args, **kwargs):
61+
def extract_kinesis(*args, **kwargs):
62+
# The stream name can be passed as the StreamName or as part of the StreamARN.
63+
stream_value = kwargs.get("StreamName", "Unknown")
64+
if stream_value == "Unknown":
65+
arn = kwargs.get("StreamARN", None)
66+
if arn:
67+
stream_value = arn.split("/", 1)[-1]
68+
return stream_value
69+
70+
71+
def extract_sqs_agent_attrs(instance, *args, **kwargs):
6272
# Try to capture AWS SQS info as agent attributes. Log any exception to debug.
6373
agent_attrs = {}
6474
try:
@@ -75,6 +85,31 @@ def extract_sqs_agent_attrs(*args, **kwargs):
7585
return agent_attrs
7686

7787

88+
def extract_kinesis_agent_attrs(instance, *args, **kwargs):
89+
# Try to capture AWS Kinesis ARN from the StreamARN parameter or by generating the ARN from various discoverable
90+
# info. Log any exception to debug.
91+
agent_attrs = {}
92+
try:
93+
stream_arn = kwargs.get("StreamARN", None)
94+
if stream_arn:
95+
agent_attrs["cloud.platform"] = "aws_kinesis_data_streams"
96+
agent_attrs["cloud.resource_id"] = stream_arn
97+
else:
98+
stream_name = kwargs.get("StreamName", None)
99+
transaction = current_transaction()
100+
settings = transaction.settings if transaction.settings else global_settings()
101+
account_id = settings.cloud.aws.account_id if settings and settings.cloud.aws.account_id else None
102+
region = None
103+
if hasattr(instance, "_client_config") and hasattr(instance._client_config, "region_name"):
104+
region = instance._client_config.region_name
105+
if stream_name and account_id and region:
106+
agent_attrs["cloud.platform"] = "aws_kinesis_data_streams"
107+
agent_attrs["cloud.resource_id"] = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"
108+
except Exception as e:
109+
_logger.debug("Failed to capture AWS Kinesis info.", exc_info=True)
110+
return agent_attrs
111+
112+
78113
def extract(argument_names, default=None):
79114
def extractor_list(*args, **kwargs):
80115
for argument_name in argument_names:
@@ -954,17 +989,61 @@ def _nr_dynamodb_datastore_trace_wrapper_(wrapped, instance, args, kwargs):
954989
return _nr_dynamodb_datastore_trace_wrapper_
955990

956991

957-
def sqs_message_trace(
992+
def aws_function_trace(
993+
operation,
994+
destination_name,
995+
params={},
996+
terminal=False,
997+
async_wrapper=None,
998+
extract_agent_attrs=None,
999+
library=None,
1000+
):
1001+
@function_wrapper
1002+
def _nr_aws_function_trace_wrapper_(wrapped, instance, args, kwargs):
1003+
wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped)
1004+
if not wrapper:
1005+
parent = current_trace()
1006+
if not parent:
1007+
return wrapped(*args, **kwargs)
1008+
else:
1009+
parent = None
1010+
1011+
_destination_name = destination_name(*args, **kwargs)
1012+
1013+
trace = FunctionTrace(
1014+
name=_destination_name,
1015+
group=f"{library}/{operation}",
1016+
params=params,
1017+
terminal=terminal,
1018+
parent=parent,
1019+
source=wrapped,
1020+
)
1021+
1022+
# Attach extracted agent attributes.
1023+
_agent_attrs = extract_agent_attrs(instance, *args, **kwargs)
1024+
trace.agent_attributes.update(_agent_attrs)
1025+
1026+
if wrapper: # pylint: disable=W0125,W0126
1027+
return wrapper(wrapped, trace)(*args, **kwargs)
1028+
1029+
with trace:
1030+
return wrapped(*args, **kwargs)
1031+
1032+
return _nr_aws_function_trace_wrapper_
1033+
1034+
1035+
def aws_message_trace(
9581036
operation,
9591037
destination_type,
9601038
destination_name,
9611039
params={},
9621040
terminal=True,
9631041
async_wrapper=None,
9641042
extract_agent_attrs=None,
1043+
library=None,
9651044
):
9661045
@function_wrapper
967-
def _nr_sqs_message_trace_wrapper_(wrapped, instance, args, kwargs):
1046+
def _nr_aws_message_trace_wrapper_(wrapped, instance, args, kwargs):
9681047
wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped)
9691048
if not wrapper:
9701049
parent = current_trace()
@@ -973,7 +1052,7 @@ def _nr_sqs_message_trace_wrapper_(wrapped, instance, args, kwargs):
9731052
else:
9741053
parent = None
9751054

976-
_library = "SQS"
1055+
_library = library
9771056
_operation = operation
9781057
_destination_type = destination_type
9791058
_destination_name = destination_name(*args, **kwargs)
@@ -990,7 +1069,7 @@ def _nr_sqs_message_trace_wrapper_(wrapped, instance, args, kwargs):
9901069
)
9911070

9921071
# Attach extracted agent attributes.
993-
_agent_attrs = extract_agent_attrs(*args, **kwargs)
1072+
_agent_attrs = extract_agent_attrs(instance, *args, **kwargs)
9941073
trace.agent_attributes.update(_agent_attrs)
9951074

9961075
if wrapper: # pylint: disable=W0125,W0126
@@ -999,7 +1078,7 @@ def _nr_sqs_message_trace_wrapper_(wrapped, instance, args, kwargs):
9991078
with trace:
10001079
return wrapped(*args, **kwargs)
10011080

1002-
return _nr_sqs_message_trace_wrapper_
1081+
return _nr_aws_message_trace_wrapper_
10031082

10041083

10051084
def wrap_emit_api_params(wrapped, instance, args, kwargs):
@@ -1059,14 +1138,125 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
10591138
("dynamodb", "delete_table"): dynamodb_datastore_trace("DynamoDB", extract("TableName"), "delete_table"),
10601139
("dynamodb", "query"): dynamodb_datastore_trace("DynamoDB", extract("TableName"), "query"),
10611140
("dynamodb", "scan"): dynamodb_datastore_trace("DynamoDB", extract("TableName"), "scan"),
1062-
("sqs", "send_message"): sqs_message_trace(
1063-
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs
1141+
("kinesis", "add_tags_to_stream"): aws_function_trace(
1142+
"add_tags_to_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1143+
),
1144+
("kinesis", "create_stream"): aws_function_trace(
1145+
"create_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1146+
),
1147+
("kinesis", "decrease_stream_retention_period"): aws_function_trace(
1148+
"decrease_stream_retention_period",
1149+
extract_kinesis,
1150+
extract_agent_attrs=extract_kinesis_agent_attrs,
1151+
library="Kinesis",
1152+
),
1153+
("kinesis", "delete_resource_policy"): aws_function_trace(
1154+
"delete_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1155+
),
1156+
("kinesis", "delete_stream"): aws_function_trace(
1157+
"delete_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1158+
),
1159+
("kinesis", "deregister_stream_consumer"): aws_function_trace(
1160+
"deregister_stream_consumer",
1161+
extract_kinesis,
1162+
extract_agent_attrs=extract_kinesis_agent_attrs,
1163+
library="Kinesis",
1164+
),
1165+
("kinesis", "describe_limits"): aws_function_trace(
1166+
"describe_limits", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1167+
),
1168+
("kinesis", "describe_stream"): aws_function_trace(
1169+
"describe_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1170+
),
1171+
("kinesis", "describe_stream_consumer"): aws_function_trace(
1172+
"describe_stream_consumer", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1173+
),
1174+
("kinesis", "describe_stream_summary"): aws_function_trace(
1175+
"describe_stream_summary", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1176+
),
1177+
("kinesis", "disable_enhanced_monitoring"): aws_function_trace(
1178+
"disable_enhanced_monitoring",
1179+
extract_kinesis,
1180+
extract_agent_attrs=extract_kinesis_agent_attrs,
1181+
library="Kinesis",
1182+
),
1183+
("kinesis", "enable_enhanced_monitoring"): aws_function_trace(
1184+
"enable_enhanced_monitoring",
1185+
extract_kinesis,
1186+
extract_agent_attrs=extract_kinesis_agent_attrs,
1187+
library="Kinesis",
1188+
),
1189+
("kinesis", "get_resource_policy"): aws_function_trace(
1190+
"get_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1191+
),
1192+
("kinesis", "get_shard_iterator"): aws_function_trace(
1193+
"get_shard_iterator", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1194+
),
1195+
("kinesis", "increase_stream_retention_period"): aws_function_trace(
1196+
"increase_stream_retention_period",
1197+
extract_kinesis,
1198+
extract_agent_attrs=extract_kinesis_agent_attrs,
1199+
library="Kinesis",
1200+
),
1201+
("kinesis", "list_shards"): aws_function_trace(
1202+
"list_shards", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1203+
),
1204+
("kinesis", "list_stream_consumers"): aws_function_trace(
1205+
"list_stream_consumers", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1206+
),
1207+
("kinesis", "list_streams"): aws_function_trace(
1208+
"list_streams", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1209+
),
1210+
("kinesis", "list_tags_for_stream"): aws_function_trace(
1211+
"list_tags_for_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1212+
),
1213+
("kinesis", "merge_shards"): aws_function_trace(
1214+
"merge_shards", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1215+
),
1216+
("kinesis", "put_resource_policy"): aws_function_trace(
1217+
"put_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1218+
),
1219+
("kinesis", "register_stream_consumer"): aws_function_trace(
1220+
"register_stream_consumer", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1221+
),
1222+
("kinesis", "remove_tags_from_stream"): aws_function_trace(
1223+
"remove_tags_from_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1224+
),
1225+
("kinesis", "split_shard"): aws_function_trace(
1226+
"split_shard", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1227+
),
1228+
("kinesis", "start_stream_encryption"): aws_function_trace(
1229+
"start_stream_encryption", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1230+
),
1231+
("kinesis", "stop_stream_encryption"): aws_function_trace(
1232+
"stop_stream_encryption", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1233+
),
1234+
("kinesis", "subscribe_to_shard"): aws_function_trace(
1235+
"subscribe_to_shard", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1236+
),
1237+
("kinesis", "update_shard_count"): aws_function_trace(
1238+
"update_shard_count", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1239+
),
1240+
("kinesis", "update_stream_mode"): aws_function_trace(
1241+
"update_stream_mode", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1242+
),
1243+
("kinesis", "put_record"): aws_message_trace(
1244+
"Produce", "Stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1245+
),
1246+
("kinesis", "put_records"): aws_message_trace(
1247+
"Produce", "Stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1248+
),
1249+
("kinesis", "get_records"): aws_message_trace(
1250+
"Consume", "Stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
1251+
),
1252+
("sqs", "send_message"): aws_message_trace(
1253+
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs, library="SQS"
10641254
),
1065-
("sqs", "send_message_batch"): sqs_message_trace(
1066-
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs
1255+
("sqs", "send_message_batch"): aws_message_trace(
1256+
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs, library="SQS"
10671257
),
1068-
("sqs", "receive_message"): sqs_message_trace(
1069-
"Consume", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs
1258+
("sqs", "receive_message"): aws_message_trace(
1259+
"Consume", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs, library="SQS"
10701260
),
10711261
("bedrock-runtime", "invoke_model"): wrap_bedrock_runtime_invoke_model(response_streaming=False),
10721262
("bedrock-runtime", "invoke_model_with_response_stream"): wrap_bedrock_runtime_invoke_model(

0 commit comments

Comments
 (0)