From 9d5bfd42e53cfc3a73076c144d5cb8d12e4a63ec Mon Sep 17 00:00:00 2001 From: Yannik Sacherer Date: Sun, 9 Mar 2025 15:01:32 +0100 Subject: [PATCH 1/6] added s3 region for copy into job related to issue #2349 --- dlt/destinations/impl/redshift/redshift.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/dlt/destinations/impl/redshift/redshift.py b/dlt/destinations/impl/redshift/redshift.py index 00e093a516..30efe2651b 100644 --- a/dlt/destinations/impl/redshift/redshift.py +++ b/dlt/destinations/impl/redshift/redshift.py @@ -65,17 +65,19 @@ def __init__( file_path: str, staging_credentials: Optional[CredentialsConfiguration] = None, staging_iam_role: str = None, + s3_region: str = "us-east-1", # Add region as a parameter ) -> None: super().__init__(file_path, staging_credentials) self._staging_iam_role = staging_iam_role + self._s3_region = s3_region # Store region self._job_client: "RedshiftClient" = None def run(self) -> None: self._sql_client = self._job_client.sql_client - # we assume s3 credentials where provided for the staging + # Assume S3 credentials were provided for the staging credentials = "" if self._staging_iam_role: - credentials = f"IAM_ROLE '{self._staging_iam_role}'" + credentials = f"IAM_ROLE '{self._staging_iam_role}' REGION '{self._s3_region}'" elif self._staging_credentials and isinstance( self._staging_credentials, AwsCredentialsWithoutDefaults ): @@ -83,10 +85,10 @@ def run(self) -> None: aws_secret_key = self._staging_credentials.aws_secret_access_key credentials = ( "CREDENTIALS" - f" 'aws_access_key_id={aws_access_key};aws_secret_access_key={aws_secret_key}'" + f" 'aws_access_key_id={aws_access_key};aws_secret_access_key={aws_secret_key};region={self._s3_region}'" ) - # get format + # Get format ext = os.path.splitext(self._bucket_path)[1][1:] file_type = "" dateformat = "" @@ -97,15 +99,12 @@ def run(self) -> None: compression = "" if is_compression_disabled() else "GZIP" elif ext == "parquet": file_type = 
"PARQUET" - # if table contains json types then SUPER field will be used. - # https://docs.aws.amazon.com/redshift/latest/dg/ingest-super.html if table_schema_has_type(self._load_table, "json"): file_type += " SERIALIZETOJSON" else: raise ValueError(f"Unsupported file type {ext} for Redshift.") with self._sql_client.begin_transaction(): - # TODO: if we ever support csv here remember to add column names to COPY self._sql_client.execute_sql(f""" COPY {self._sql_client.make_qualified_table_name(self.load_table_name)} FROM '{self._bucket_path}' From 2164cc3a321dc5b2116630ea76918444f2ef8dec Mon Sep 17 00:00:00 2001 From: Yannik Sacherer Date: Sun, 9 Mar 2025 15:04:25 +0100 Subject: [PATCH 2/6] added accidentally removed comments back --- dlt/destinations/impl/redshift/redshift.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/dlt/destinations/impl/redshift/redshift.py b/dlt/destinations/impl/redshift/redshift.py index 30efe2651b..d69aca2024 100644 --- a/dlt/destinations/impl/redshift/redshift.py +++ b/dlt/destinations/impl/redshift/redshift.py @@ -74,7 +74,7 @@ def __init__( def run(self) -> None: self._sql_client = self._job_client.sql_client - # Assume S3 credentials were provided for the staging + # we ssume S3 credentials were provided for the staging credentials = "" if self._staging_iam_role: credentials = f"IAM_ROLE '{self._staging_iam_role}' REGION '{self._s3_region}'" @@ -88,7 +88,7 @@ def run(self) -> None: f" 'aws_access_key_id={aws_access_key};aws_secret_access_key={aws_secret_key};region={self._s3_region}'" ) - # Get format + # get format ext = os.path.splitext(self._bucket_path)[1][1:] file_type = "" dateformat = "" @@ -99,12 +99,15 @@ def run(self) -> None: compression = "" if is_compression_disabled() else "GZIP" elif ext == "parquet": file_type = "PARQUET" + # if table contains json types then SUPER field will be used.
+ # https://docs.aws.amazon.com/redshift/latest/dg/ingest-super.html if table_schema_has_type(self._load_table, "json"): file_type += " SERIALIZETOJSON" else: raise ValueError(f"Unsupported file type {ext} for Redshift.") with self._sql_client.begin_transaction(): + # TODO: if we ever support csv here remember to add column names to COPY self._sql_client.execute_sql(f""" COPY {self._sql_client.make_qualified_table_name(self.load_table_name)} FROM '{self._bucket_path}' From 41d127cdb2905a9d173adde26ac370c4f70d5f7d Mon Sep 17 00:00:00 2001 From: Yannik Sacherer Date: Sun, 9 Mar 2025 15:13:19 +0100 Subject: [PATCH 3/6] fixed typo in comment --- dlt/destinations/impl/redshift/redshift.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlt/destinations/impl/redshift/redshift.py b/dlt/destinations/impl/redshift/redshift.py index d69aca2024..14952bc88c 100644 --- a/dlt/destinations/impl/redshift/redshift.py +++ b/dlt/destinations/impl/redshift/redshift.py @@ -74,7 +74,7 @@ def __init__( def run(self) -> None: self._sql_client = self._job_client.sql_client - # we ssume S3 credentials were provided for the staging + # we assume S3 credentials were provided for the staging credentials = "" if self._staging_iam_role: credentials = f"IAM_ROLE '{self._staging_iam_role}' REGION '{self._s3_region}'" From 36f176d5acb9251a73855f77fa6d383a20a04a35 Mon Sep 17 00:00:00 2001 From: Yannik Sacherer Date: Tue, 11 Mar 2025 18:04:17 +0100 Subject: [PATCH 4/6] made region optional and use AwsCredentialsWithoutDefaults for region --- dlt/destinations/impl/redshift/redshift.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/dlt/destinations/impl/redshift/redshift.py b/dlt/destinations/impl/redshift/redshift.py index 14952bc88c..f8b3da798c 100644 --- a/dlt/destinations/impl/redshift/redshift.py +++ b/dlt/destinations/impl/redshift/redshift.py @@ -65,28 +65,35 @@ def __init__( file_path: str, staging_credentials: 
Optional[CredentialsConfiguration] = None, staging_iam_role: str = None, - s3_region: str = "us-east-1", # Add region as a parameter + staging_region_name: str = None, ) -> None: super().__init__(file_path, staging_credentials) self._staging_iam_role = staging_iam_role - self._s3_region = s3_region # Store region + self._region_name = staging_region_name self._job_client: "RedshiftClient" = None def run(self) -> None: self._sql_client = self._job_client.sql_client - # we assume S3 credentials were provided for the staging + # we assume s3 credentials where provided for the staging credentials = "" if self._staging_iam_role: - credentials = f"IAM_ROLE '{self._staging_iam_role}' REGION '{self._s3_region}'" + credentials = f"IAM_ROLE '{self._staging_iam_role}'" + if self._region_name: + credentials += f" REGION '{self._region_name}'" elif self._staging_credentials and isinstance( self._staging_credentials, AwsCredentialsWithoutDefaults ): aws_access_key = self._staging_credentials.aws_access_key_id aws_secret_key = self._staging_credentials.aws_secret_access_key + region_name = self._staging_credentials.region_name credentials = ( "CREDENTIALS" - f" 'aws_access_key_id={aws_access_key};aws_secret_access_key={aws_secret_key};region={self._s3_region}'" + f" 'aws_access_key_id={aws_access_key};aws_secret_access_key={aws_secret_key}" ) + if region_name: + credentials += f"region={region_name};" + + credentials += "'" # Closing single quote # get format ext = os.path.splitext(self._bucket_path)[1][1:] From 6d93da3700375b58b073c0d02d646ef11ac90578 Mon Sep 17 00:00:00 2001 From: Yannik Sacherer Date: Tue, 11 Mar 2025 21:28:48 +0100 Subject: [PATCH 5/6] added new query template if the region_name exists --- dlt/destinations/impl/redshift/redshift.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dlt/destinations/impl/redshift/redshift.py b/dlt/destinations/impl/redshift/redshift.py index f8b3da798c..f76a9be63e 100644 --- 
a/dlt/destinations/impl/redshift/redshift.py +++ b/dlt/destinations/impl/redshift/redshift.py @@ -91,9 +91,10 @@ def run(self) -> None: f" 'aws_access_key_id={aws_access_key};aws_secret_access_key={aws_secret_key}" ) if region_name: - credentials += f"region={region_name};" - - credentials += "'" # Closing single quote + credentials = ( + "CREDENTIALS" + f" 'aws_access_key_id={aws_access_key};aws_secret_access_key={aws_secret_key};region={region_name};'" + ) # get format ext = os.path.splitext(self._bucket_path)[1][1:] From 683b91a4fa2ffdc1ed804c0f2bb45c0584a3f1fd Mon Sep 17 00:00:00 2001 From: Yannik Sacherer Date: Tue, 11 Mar 2025 21:29:56 +0100 Subject: [PATCH 6/6] indented query --- dlt/destinations/impl/redshift/redshift.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dlt/destinations/impl/redshift/redshift.py b/dlt/destinations/impl/redshift/redshift.py index f76a9be63e..27cb64b55d 100644 --- a/dlt/destinations/impl/redshift/redshift.py +++ b/dlt/destinations/impl/redshift/redshift.py @@ -92,9 +92,9 @@ def run(self) -> None: ) if region_name: credentials = ( - "CREDENTIALS" - f" 'aws_access_key_id={aws_access_key};aws_secret_access_key={aws_secret_key};region={region_name};'" - ) + "CREDENTIALS" + f" 'aws_access_key_id={aws_access_key};aws_secret_access_key={aws_secret_key};region={region_name};'" + ) # get format ext = os.path.splitext(self._bucket_path)[1][1:]