-
Notifications
You must be signed in to change notification settings - Fork 2
/
etl_scripts.py
72 lines (55 loc) · 2.25 KB
/
etl_scripts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
'''
etl_scripts contains the helper functions used by the etl_jobs.py scripts.
'''
# importing required modules
# Standard library imports
from urllib.parse import quote
from zipfile import ZipFile

# Third party imports
import pandas as pd
from sqlalchemy import inspect, create_engine
def extract_zip(zip_name, extract_path):
    """Extract every member of a ZIP archive into a directory.

    Args:
        zip_name: Path (or file-like object) of the archive to open.
        extract_path: Directory to extract into; ``None`` means the
            current working directory.

    Prints a progress message and then the number of entries in the
    archive (``infolist()`` counts directory entries as well as files).
    Returns ``None``.
    """
    # ``archive`` rather than ``zip`` so the builtin ``zip`` is not shadowed.
    with ZipFile(zip_name, 'r') as archive:
        print('Extracting all the files now...')
        archive.extractall(extract_path)
        count_zip = len(archive.infolist())
        print('total zip extracted', count_zip)
def establish_connection(user, password, host, database):
    """Build a SQLAlchemy engine for a MySQL database using the PyMySQL driver.

    The user name and password are percent-encoded before being embedded in
    the URL. Credentials containing reserved characters such as ``@``,
    ``:``, ``/`` or ``%`` therefore cannot corrupt the connection string.

    Args:
        user: MySQL user name.
        password: MySQL password.
        host: Database server host.
        database: Name of the database to use.

    Returns:
        The engine created by ``sqlalchemy.create_engine``.
    """
    # safe='' makes quote() encode '/' too (its default leaves '/' alone).
    path = (
        'mysql+pymysql://'
        + quote(user, safe='')
        + ':'
        + quote(password, safe='')
        + '@'
        + host
        + '/'
        + database
    )
    # create_engine() is lazy: no database connection is opened until the
    # engine is first used, so this message only confirms the engine exists.
    engine = create_engine(path)
    print('Connection sucessfully established with engine', engine)
    return engine
def sql_table_column(table, engine_name):
    """Return the column names of a database table.

    Args:
        table: Name of the table to introspect.
        engine_name: SQLAlchemy engine handed to ``sqlalchemy.inspect``.

    Returns:
        List of column-name strings, in the order the inspector reports them.
    """
    inspector = inspect(engine_name)
    columns = inspector.get_columns(table)
    names = [column["name"] for column in columns]
    print('column names are %s for table %s' % (names, table))
    return names
def transform_table(table_name, file_path, engine_name):
    """Load ``<file_path><table_name>.csv`` into a DataFrame for that table.

    The column names come from the database table (via
    ``sql_table_column``), not from the file. Columns that are empty in
    every row are dropped before the frame is returned.

    Args:
        table_name: Name of the target table; also the CSV file's base name.
        file_path: Prefix for the CSV path. It is concatenated as-is, so a
            directory must end with a path separator (e.g. ``"data/"``).
        engine_name: SQLAlchemy engine used to look up the table's columns.

    Returns:
        pandas.DataFrame labelled with the table's column names, minus any
        all-empty columns.
    """
    columns = sql_table_column(table_name, engine_name)
    csv_path = "".join([file_path, table_name, ".csv"])
    print("file path is ", csv_path)
    # Because names= is supplied, pandas reads the first line as data rather
    # than as a header. Presumably the CSV files are header-less; verify
    # against the data source.
    frame = pd.read_csv(csv_path, sep=',', names=columns, index_col=False)
    frame = frame.dropna(how='all', axis='columns')
    print(frame.head())
    print('Table %s is tranformed' % (table_name))
    return frame
def insert_data_sql(data, sql_tablename, engine):
    """Append the rows of a DataFrame to a database table in one transaction.

    Args:
        data: pandas.DataFrame to write (``DataFrame.to_sql`` is called on it).
        sql_tablename: Name of the table to append to.
        engine: SQLAlchemy engine for the target database.

    Raises:
        Exception: Whatever acquiring the connection or ``to_sql`` raised.
            The error is printed first and then re-raised unchanged, so the
            caller sees the real cause.
    """
    print('Data is trying to insert for table ' + sql_tablename)
    try:
        # engine.begin() yields a connection inside a transaction. The
        # transaction commits when the block succeeds and rolls back if it raises.
        with engine.begin() as conn:
            # index=False: the DataFrame index is not written as a column.
            data.to_sql(sql_tablename, conn, if_exists='append', index=False)
    except Exception as e:
        # Format the exception rather than concatenating it: str + Exception
        # would raise TypeError here and hide the original error.
        print('Data could not be inserted for table %s: %s' % (sql_tablename, e))
        raise
    # Only reached when the insert succeeded.
    print('Data is inserted for table ', sql_tablename)