From 05038466f6ed0b9c6212da1eb9682ed6ccff1b14 Mon Sep 17 00:00:00 2001 From: xiaowei sun <56322541+xiaowei-zillow@users.noreply.github.com> Date: Wed, 19 Jul 2023 09:53:33 -0700 Subject: [PATCH] Update aws.py (#38) * Update aws.py add function for users to send message to aws sqs * Update aws.py fix the format * Update aws.py reformat --- zdatasets/utils/aws.py | 55 +++++++++++++++++++++++++++++++++++------- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/zdatasets/utils/aws.py b/zdatasets/utils/aws.py index ba0b864..6cb3a20 100644 --- a/zdatasets/utils/aws.py +++ b/zdatasets/utils/aws.py @@ -37,17 +37,14 @@ def get_aws_session(role_arn: str = None, profile_name: str = None) -> Session: role_arn=role_arn, ) - # Retrieve crednetials of the assumed role and auto-refresh - credentials = DeferredRefreshableCredentials( - method="assume-role", refresh_using=fetcher.fetch_credentials + # Create new session with assumed role and auto-refresh + botocore_session = Session() + botocore_session._credentials = DeferredRefreshableCredentials( + method="assume-role", + refresh_using=fetcher.fetch_credentials, ) - return boto3.Session( - aws_access_key_id=credentials.access_key, - aws_secret_access_key=credentials.secret_key, - aws_session_token=credentials.token, - region_name=region_name, - ) + return boto3.Session(botocore_session=botocore_session, region_name=region_name) def get_aws_client(role_arn: str, service: str): @@ -63,6 +60,46 @@ def get_aws_client(role_arn: str, service: str): return session.client(service) +def send_sqs_message( + queue_url: str, + message_body: str, + *, # keyword-only parameters + role_arn: str = None, + profile_name: str = None, +) -> None: + """ + Args: + queue_url: the url of SQS to write messages to, + i.e.'https://sqs..amazonaws.com//' + message_body: i.e. ‘{"date_key": "2023-01-01", "accuracy_threshold": 0.5}' + role_arn: optional + profile_name: optional, mainly for testing/debugging purposes + Returns: None + Exceptions: + botocore.exceptions.ClientError # no permissions to assume role or to SendMessage to SQS + AWS.SimpleQueueService.NonExistentQueue + SQS.Client.exceptions.InvalidMessageContents + SQS.Client.exceptions.UnsupportedOperation + """ + # Create session from given iam role and/or aws profile + session = get_aws_session(role_arn, profile_name) + + # Create an SQS client from the session + sqs = session.client("sqs") + + # Send message to SQS queue + try: + response = sqs.send_message(QueueUrl=queue_url, MessageBody=message_body) + + _logger.debug( + f"Successfully sent the message {message_body} " + f"to sqs {queue_url} with MessageId {response['MessageId']}" + ) + except Exception as err: + _logger.error(f"Failed to send the message {message_body} to sqs {queue_url}") + raise err + + def get_paginated_list_objects_iterator( s3_client: boto3.session.Session.client, search: Optional[str] = "Content", **kwargs ) -> Iterable: