-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase_utils.py
64 lines (47 loc) · 1.98 KB
/
database_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from urllib.parse import quote

import psycopg2
import sqlalchemy
import yaml
class DatabaseConnector():
    """Create SQLAlchemy engines for the source and local PostgreSQL databases.

    The source database is described by ``db_creds.yaml``; the upload target
    is a local ``sales_data`` database whose settings are fixed in
    ``upload_to_db``.
    """

    @staticmethod
    def _build_url(driver, username, password, host, port, database):
        """Return a database URL with the username and password percent-encoded.

        Characters such as ``@``, ``:`` and ``/`` are delimiters inside a URL,
        so a raw credential containing one of them would be split in the wrong
        place. Credentials must therefore be supplied unencoded; they are
        escaped here.

        Args:
            driver: Dialect, optionally with a DBAPI (e.g. ``postgresql`` or
                ``postgresql+psycopg2``).
            username: Login name, unencoded.
            password: Login password, unencoded.
            host: Server host name or address.
            port: Server port.
            database: Name of the database to connect to.

        Returns:
            The connection URL as a string.
        """
        # str() first: a credential that YAML parsed as a number must still be
        # accepted, exactly as plain f-string interpolation would accept it.
        # safe='' so that '/' is escaped too.
        user = quote(str(username), safe='')
        secret = quote(str(password), safe='')
        return f"{driver}://{user}:{secret}@{host}:{port}/{database}"

    def read_db_creds(self):
        """Parse ``db_creds.yaml`` from the current working directory.

        Returns:
            The object produced by ``yaml.safe_load`` for that file.

        Raises:
            FileNotFoundError: If ``db_creds.yaml`` does not exist.
        """
        # Explicit encoding so the result does not depend on the platform's
        # default locale.
        with open('db_creds.yaml', 'r', encoding='utf-8') as file:
            return yaml.safe_load(file)

    def init_db_engine(self):
        """Create an engine for the database described by ``db_creds.yaml``.

        Reads the ``RDS_HOST``, ``RDS_USER``, ``RDS_PASSWORD``,
        ``RDS_DATABASE`` and ``RDS_PORT`` entries of the credentials.

        Returns:
            The engine returned by ``sqlalchemy.create_engine``.

        Raises:
            KeyError: If one of the entries above is missing.
        """
        creds = self.read_db_creds()
        db_conn_url = self._build_url(
            'postgresql',
            creds['RDS_USER'],
            creds['RDS_PASSWORD'],
            creds['RDS_HOST'],
            creds['RDS_PORT'],
            creds['RDS_DATABASE'],
        )
        return sqlalchemy.create_engine(db_conn_url)

    def list_db_tables(self):
        """Return the names of the tables found by reflecting the source database.

        Returns:
            The ``keys()`` view of the reflected metadata's ``tables`` mapping.
        """
        engine = self.init_db_engine()
        try:
            with engine.connect() as connection:
                metadata = sqlalchemy.MetaData()
                metadata.reflect(bind=connection)
        finally:
            # The engine is created for this call only; release its pooled
            # connections instead of leaving them open.
            engine.dispose()
        return metadata.tables.keys()

    def upload_to_db(self, df, table_name):
        """Write ``df`` to ``table_name`` in the local ``sales_data`` database.

        An existing table with the same name is replaced
        (``if_exists='replace'``).

        Args:
            df: Object exposing a pandas-style ``to_sql`` method.
            table_name: Name of the destination table.
        """
        DATABASE_TYPE = 'postgresql'
        DBAPI = 'psycopg2'
        HOST = 'localhost'
        USER = 'postgres'
        # NOTE(review): looks like a redacted placeholder - confirm the real
        # value is supplied before running.
        PASSWORD = '....'
        DATABASE = 'sales_data'
        PORT = 5432
        local_url = self._build_url(
            f"{DATABASE_TYPE}+{DBAPI}", USER, PASSWORD, HOST, PORT, DATABASE
        )
        local_engine = sqlalchemy.create_engine(local_url)
        try:
            df.to_sql(table_name, local_engine, if_exists='replace')
        finally:
            # One-off engine: close its connections once the upload finishes
            # or fails.
            local_engine.dispose()
if __name__ == '__main__':
    # Script entry point: report the source tables, then load the cleaned
    # user data into the 'dim_users' table.
    source_tables = DatabaseConnector().list_db_tables()
    print('Tables in Database:', source_tables)

    # NOTE(review): these imports run only after the listing above;
    # presumably deferred to avoid a circular import - confirm.
    from data_cleaning import DataCleaning
    from data_extraction import DataExtractor

    uploader = DatabaseConnector()
    cleaned_users = DataCleaning().clean_user_data()
    uploader.upload_to_db(cleaned_users, 'dim_users')