-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_loader.py
More file actions
58 lines (51 loc) · 2.24 KB
/
data_loader.py
File metadata and controls
58 lines (51 loc) · 2.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import os

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import text
def discover_tables(root):
    """Return one descriptor dict per loadable data file found under *root*.

    Only files laid out exactly as ``<root>/<engine>/<schema>/<file>`` are
    returned, because the loader later rebuilds each file's path from those
    three components. The following are skipped:

    * hidden files such as ``.DS_Store``;
    * files without an extension;
    * files whose engine directory is ``teaching_airflow`` or ``myapp``;
    * files whose base name is ``readme`` (any letter case).

    Each descriptor has the keys ``engine``, ``schema``, ``table`` (file name
    without its final extension) and ``ext`` (final extension, no leading dot).
    A missing *root* yields an empty list.
    """
    found = []
    for path, _subdirs, files in os.walk(root):
        for name in files:
            # Skip hidden files (e.g. .DS_Store). The original check used
            # `pass`, which fell through and processed the file anyway.
            if name.startswith('.'):
                continue
            # splitext keeps multi-dot names intact ("a.2020.csv" ->
            # "a.2020", ".csv"), so "<table>.<ext>" reproduces the real file.
            table, dot_ext = os.path.splitext(name)
            if not dot_ext:
                continue
            parts = os.path.relpath(os.path.join(path, name), root).split(os.sep)
            # Anything shallower or deeper than engine/schema/file cannot be
            # re-addressed as example_data/<engine>/<schema>/<table>.<ext>.
            if len(parts) != 3:
                continue
            system, schema = parts[0], parts[1]
            # Exclusion list kept from the original filter; presumably these
            # were directory names that leaked in as "engines" — verify.
            if system in ('teaching_airflow', 'myapp') or table.lower() == 'readme':
                continue
            found.append({
                "engine": system,
                "schema": schema,
                "table": table,
                "ext": dot_ext[1:],
            })
    return found


# create list of files to import
print('Loading list of files to upload in db.')
table_list = discover_tables(os.path.join(os.getcwd(), 'example_data'))
print(f'Load done. Found {len(table_list)} files to upload.')
print('generating db engines')
# Connection URLs for the two target databases. The defaults are the
# original hard-coded URLs (hostnames look like docker-compose container
# names — confirm); PSQL_URL / MYSQL_URL override them without a code edit.
psql = create_engine(os.environ.get(
    'PSQL_URL', 'postgresql://airflow:airflow@teaching_airflow_postgres_1:5432'))
mysql = create_engine(os.environ.get(
    'MYSQL_URL', 'mysql://root:airflow@teaching_airflow_mysql_1:3306/public'))
# Schema names already present in each database, read from
# information_schema; the load step checks and extends these lists so each
# schema is created at most once per run.
mysql_schema = pd.read_sql('select SCHEMA_NAME AS "schema" from information_schema.SCHEMATA;', con=mysql)['schema'].to_list()
psql_schema = pd.read_sql('Select schema_name as schema from information_schema.schemata;', con=psql)['schema'].to_list()
print('Loading data into databases')
data_root = os.path.join(os.getcwd(), 'example_data')
# Engine directory name -> (SQLAlchemy engine, cached list of its schema
# names). A single lookup drives both schema creation and the load target,
# so the two can no longer disagree for an unexpected engine name.
targets = {
    'psql': (psql, psql_schema),
    'mysql': (mysql, mysql_schema),
}
for element in table_list:
    engine_name = element['engine']
    schema = element['schema']
    table = element['table']
    if engine_name not in targets:
        # Previously such a file got its schema created in Postgres but its
        # table loaded into MySQL; skip it explicitly instead.
        print(f'Skipping {schema}.{table}: unknown engine {engine_name!r}')
        continue
    db_engine, known_schemas = targets[engine_name]
    df = pd.read_csv(os.path.join(data_root, engine_name, schema, f'{table}.{element["ext"]}'))
    if schema not in known_schemas:
        # Quote the directory-derived name only when the dialect requires it
        # (reserved words, mixed case, special characters).
        quoted = db_engine.dialect.identifier_preparer.quote(schema)
        # engine.begin() opens a connection and commits on success; unlike
        # Engine.execute() (removed in SQLAlchemy 2.0) it works on 1.x and 2.x.
        with db_engine.begin() as conn:
            conn.execute(text(f"create schema {quoted}; "))
            if engine_name == 'mysql':
                conn.execute(text(f"GRANT ALL PRIVILEGES ON {quoted}.* TO 'airflow'@'%';"))
        known_schemas.append(schema)
    df.to_sql(
        name=table,
        schema=schema,
        con=db_engine,
        if_exists='replace'
    )
    print(f'Loaded table {element["schema"]}.{element["table"]} into {element["engine"]}')
print('Load done.')