@@ -71,8 +71,6 @@ def get_jdbc_url(self) -> str:
7171 return (
7272 f"jdbc:{ self ._DRIVER } ://{ self ._get_secret ('host' )} :{ self ._get_secret ('port' )} ;"
7373 f"databaseName={ self ._get_secret ('database' )} ;"
74- f"user={ self ._get_secret ('user' )} ;"
75- f"password={ self ._get_secret ('password' )} ;"
7674 f"encrypt={ self ._get_secret ('encrypt' )} ;"
7775 f"trustServerCertificate={ self ._get_secret ('trustServerCertificate' )} ;"
7876 )
@@ -96,10 +94,10 @@ def read_data(
9694 prepare_query_string = ""
9795 try :
9896 if options is None :
99- df = self .reader (query , prepare_query_string ).load ()
97+ df = self .reader (query , { "prepareQuery" : prepare_query_string } ).load ()
10098 else :
101- options = self ._get_jdbc_reader_options (options )
102- df = self ._get_jdbc_reader (table_query , self . get_jdbc_url , self . _DRIVER ). options ( ** options ).load ()
99+ spark_options = self ._get_jdbc_reader_options (options )
100+ df = self .reader (table_query , spark_options ).load ()
103101 return df .select ([col (column ).alias (column .lower ()) for column in df .columns ])
104102 except (RuntimeError , PySparkException ) as e :
105103 return self .log_and_throw_exception (e , "data" , table_query )
@@ -126,15 +124,22 @@ def get_schema(
126124 try :
127125 logger .debug (f"Fetching schema using query: \n `{ schema_query } `" )
128126 logger .info (f"Fetching Schema: Started at: { datetime .now ()} " )
129- df = self .reader (schema_query ).load ()
127+ df = self .reader (schema_query , {} ).load ()
130128 schema_metadata = df .select ([col (c ).alias (c .lower ()) for c in df .columns ]).collect ()
131129 logger .info (f"Schema fetched successfully. Completed at: { datetime .now ()} " )
132130 return [self ._map_meta_column (field , normalize ) for field in schema_metadata ]
133131 except (RuntimeError , PySparkException ) as e :
134132 return self .log_and_throw_exception (e , "schema" , schema_query )
135133
136- def reader (self , query : str , prepare_query_str = "" ) -> DataFrameReader :
137- return self ._get_jdbc_reader (query , self .get_jdbc_url , self ._DRIVER , {"prepareQuery" : prepare_query_str })
134+ def reader (self , query : str , options : dict ) -> DataFrameReader :
135+ creds = self ._get_user_password ()
136+ return self ._get_jdbc_reader (query , self .get_jdbc_url , self ._DRIVER , {** options , ** creds })
137+
138+ def _get_user_password (self ) -> dict :
139+ return {
140+ "user" : self ._get_secret ("user" ),
141+ "password" : self ._get_secret ("password" ),
142+ }
138143
139144 def normalize_identifier (self , identifier : str ) -> NormalizedIdentifier :
140145 return DialectUtils .normalize_identifier (
0 commit comments