Commit 197b092

modified project structure
1 parent 5a15a56 commit 197b092

21 files changed: +468 −44 lines changed

pyproject.toml

+3

@@ -11,6 +11,9 @@ python = "^3.11"
 pydantic = "^1.10.6"
 sqlalchemy = "^2.0.7"
 pandas = "^1.5.3"
+loguru = "^0.6.0"
+openpyxl = "^3.1.2"
+pypika = "^0.48.9"


 [build-system]

src/app/api/end.py

Whitespace-only changes.

src/app/etl_exceptions.py

+4

class AutoETLException(Exception):
    """Base exception raised for Auto ETL errors."""

src/app/logger.py

+52

import logging
import logging.config
from datetime import datetime


def setup_logging(log_dir: str):
    """Load the logging configuration."""

    log_file_name = f"{log_dir}/minimal-app-{datetime.now():%Y-%m-%d}.log"

    logging_config = {
        'version': 1,
        'disable_existing_loggers': False,
        'loggers': {
            'root': {
                'level': 'INFO',
                'handlers': ['debug_console_handler', 'info_rotating_file_handler'],
            },
            'src': {
                'level': 'DEBUG',
                'propagate': False,
                'handlers': ['info_rotating_file_handler', 'debug_console_handler'],
            },
        },
        'handlers': {
            'debug_console_handler': {
                'level': 'DEBUG',
                'formatter': 'console',
                'class': 'logging.StreamHandler',
                'stream': 'ext://sys.stdout',
            },
            # Note: despite the name, this handler only records WARNING and above.
            'info_rotating_file_handler': {
                'level': 'WARN',
                'formatter': 'file',
                'class': 'logging.handlers.RotatingFileHandler',
                'filename': log_file_name,
                'mode': 'a',
                'maxBytes': 1048576,  # rotate at 1 MiB
                'backupCount': 10,
            },
        },
        'formatters': {
            'console': {
                'format': '%(levelname)s: %(name)s - %(message)s'
            },
            'file': {
                'format': '%(asctime)s-%(levelname)s-%(name)s-%(process)d::%(module)s|%(lineno)s:: %(message)s'
            },
        },
    }

    logging.config.dictConfig(logging_config)
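
For context, a minimal sketch of wiring this up at application start-up. The "logs" directory name is a hypothetical example, and the directory must exist before dictConfig instantiates the RotatingFileHandler:

import logging
import os

from app.logger import setup_logging

log_dir = "logs"  # hypothetical directory; RotatingFileHandler needs it to exist
os.makedirs(log_dir, exist_ok=True)
setup_logging(log_dir)

logging.getLogger("src.demo").debug("console only (the file handler is WARN+)")
logging.getLogger("src.demo").warning("console and the rotating log file")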
src/app/services/config_parser/__init__.py

+1

from .parser import ConfigParser, MetadataParser

src/app/services/config_parser/metadata_parser.py

Whitespace-only changes.
src/app/services/config_parser/parser.py

+98

import json
import logging
from typing import Any

import pandas as pd
from pydantic.dataclasses import dataclass

from app.etl_exceptions import AutoETLException
from app.utils import is_valid_file

logger = logging.getLogger(__name__)

# Sheets every metadata workbook must provide.
sheet_names = ["target_table", "joins_and_filters"]


@dataclass
class MetadataParser:
    metadata_file_path: str

    def validate_file(self) -> pd.ExcelFile:
        """Validate the metadata Excel file.

        Raises:
            AutoETLException: if the required sheet names are not found in the file

        Returns:
            ExcelFile
        """
        logger.info("Validating the metadata file")
        if not is_valid_file(self.metadata_file_path):
            logger.error("File path not correct - %s", self.metadata_file_path)
            raise AutoETLException(
                f"File path not correct - {self.metadata_file_path}")
        meta_xls = pd.ExcelFile(self.metadata_file_path)
        current_sheets = meta_xls.sheet_names

        if not all(sheet in current_sheets for sheet in sheet_names):
            logger.error(
                "Sheet names not set properly in metadata excel file")
            raise AutoETLException(
                "Sheet names not set properly in metadata excel file")
        return meta_xls

    @classmethod
    def get_metadata_parser(cls, file_path: str) -> 'MetadataParser':
        """Build a metadata parser object.

        Args:
            file_path (str): path of the metadata file

        Returns:
            MetadataParser
        """
        return cls(file_path)


@dataclass
class ConfigParser:
    config_file_path: str

    def validate_file(self) -> Any:
        """Validate the config JSON file.

        Raises:
            AutoETLException: if required configs are missing

        Returns:
            Dictionary
        """
        logger.info("Validating the config file")
        if not is_valid_file(self.config_file_path):
            logger.error("File path not correct - %s", self.config_file_path)
            raise AutoETLException(
                f"File path not correct - {self.config_file_path}")

        with open(self.config_file_path) as fp:
            config_json = json.load(fp)

        if config_json.get('target') is None:
            logger.error(
                "Target system not set properly in config file")
            raise AutoETLException(
                "Target system not set properly in config file")
        return config_json

    @classmethod
    def get_config_parser(cls, file_path: str) -> 'ConfigParser':
        """Build a config parser object.

        Args:
            file_path (str): path of the config file

        Returns:
            ConfigParser
        """
        return cls(file_path)
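
A short usage sketch of the two parsers; both paths are hypothetical, and the workbook must contain the target_table and joins_and_filters sheets for validation to pass:

from app.services.config_parser import ConfigParser, MetadataParser

meta = MetadataParser.get_metadata_parser("metadata/mappings.xlsx")  # hypothetical path
conf = ConfigParser.get_config_parser("config/etl_config.json")      # hypothetical path

meta_xls = meta.validate_file()  # AutoETLException on a bad path or missing sheets
config = conf.validate_file()    # AutoETLException if the 'target' key is absent
print(meta_xls.sheet_names, config["target"])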

src/app/services/io/__init__.py

Whitespace-only changes.

src/app/services/io/databases/__init__.py

Whitespace-only changes.

src/app/services/io/files/__init__.py

Whitespace-only changes.

src/app/services/query_builder.py

-17
This file was deleted.
src/app/services/query_builder/__init__.py

+2

from .dialact import RedshiftDialact
from .query_builder import QueryBuilder
src/app/services/query_builder/dialact.py

+83

import logging
from typing import Any, Dict, List

from pydantic.dataclasses import dataclass
from pypika import Query, Schema, Table
from pypika.enums import Dialects, JoinType
from pypika.queries import QueryBuilder
from pypika.utils import builder

from app.services.query_builder.join_queries import JoinQuery

logger = logging.getLogger(__name__)


class RedshiftQuery(Query):
    """Query class for AWS Redshift."""

    @classmethod
    def _builder(cls, **kwargs: Any) -> "RedshiftQueryBuilder":
        return RedshiftQueryBuilder(**kwargs)


class RedshiftQueryBuilder(QueryBuilder):
    QUERY_CLS = RedshiftQuery

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(dialect=Dialects.REDSHIFT, **kwargs)

    @builder
    def join(self, item: Table, how: JoinType = JoinType.inner) -> "JoinQuery":
        if isinstance(item, Table):
            return JoinQuery(self, item, how, label="table")

        raise ValueError(f"Cannot join on type {type(item)}")

    def inner_join(self, item: Table) -> "JoinQuery":
        return self.join(item, JoinType.inner)

    def left_join(self, item: Table) -> "JoinQuery":
        return self.join(item, JoinType.left)


@dataclass()
class RedshiftDialact:
    target_table_conf: List[Dict]
    joins_and_filters_conf: Dict[str, Dict]

    def build(self) -> None:
        """Trigger the Redshift query builder."""
        logger.info('building Redshift query from the mappings file')

        _query = RedshiftQuery()
        for _index in self.joins_and_filters_conf:

            _map = self.joins_and_filters_conf[_index]

            if int(_index) == 0:
                # First mapping: inner-join the driving table to its reference table.
                schema1 = Schema(_map['driving_table'].split('.')[0])
                schema2 = Schema(_map['reference_table'].split('.')[0])

                table1 = Table(_map['driving_table'].split('.')[1],
                               schema=schema1, alias=_map['driving_table_alias'])
                table2 = Table(_map['reference_table'].split('.')[1],
                               schema=schema2, alias=_map['reference_table_alias'])

                _query = _query.from_(table1).inner_join(  # type: ignore
                    table2).on(_map['join_condition'])
            else:
                # Subsequent mappings: left-join each further reference table.
                schema2 = Schema(_map['reference_table'].split('.')[0])

                table2 = Table(_map['reference_table'].split('.')[1],
                               schema=schema2,
                               alias=_map['reference_table_alias'])

                _query = _query.left_join(table2).on(
                    _map['join_condition'])

        print(_query.select('*'))
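
A hedged sketch of the mapping shape build() iterates over, using invented table names and aliases; the index "0" entry drives the inner join, and any later indexes add left joins:

from app.services.query_builder import RedshiftDialact

mapping = {
    "0": {
        "driving_table": "sales.orders",       # hypothetical schema.table
        "driving_table_alias": "o",
        "reference_table": "sales.customers",  # hypothetical schema.table
        "reference_table_alias": "c",
        "join_condition": "o.customer_id = c.id",
    },
}

RedshiftDialact([], mapping).build()
# Prints something like:
# SELECT * FROM "sales"."orders" "o" JOIN "sales"."customers" "c" ON o.customer_id = c.id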
src/app/services/query_builder/join_queries.py

+53

import logging
from typing import Any, Optional, Union

from pydantic.dataclasses import dataclass
from pypika import Criterion, Table
from pypika.enums import JoinType
from pypika.queries import Join, Joiner, QueryBuilder
from pypika.terms import Term

from app.etl_exceptions import AutoETLException

logger = logging.getLogger(__name__)


class Config:
    arbitrary_types_allowed = True


@dataclass(config=Config)
class JoinQuery(Joiner):
    query: QueryBuilder
    item: Table
    how: JoinType
    label: str

    def __post_init__(self):
        super().__init__(self.query, self.item, self.how, self.label)

    def on(self, criterion: Criterion, collate: Optional[str] = None) -> QueryBuilder:
        if criterion is None:
            logger.error(
                "Parameter 'criterion' is required for a %s JOIN but was not supplied.", self.label)
            raise AutoETLException(
                f"Parameter 'criterion' is required for a {self.label} JOIN but was not supplied.")

        self.query.do_join(JoinQueryOn(
            self.item, self.how, criterion, collate))  # type: ignore
        return self.query


@dataclass(config=Config)
class JoinQueryOn(Join):
    def __init__(self, item: Term, how: JoinType, criteria: Union[QueryBuilder, str],
                 collate: Optional[str] = None) -> None:
        super().__init__(item, how)
        self.criterion = criteria
        self.collate = collate

    def get_sql(self, **kwargs: Any) -> str:
        join_sql = super().get_sql(**kwargs)
        criteria = self.criterion if isinstance(
            self.criterion, str) else self.criterion.get_sql(subquery=True, **kwargs)
        return f"{join_sql} ON {criteria}{f' COLLATE {self.collate}' if self.collate else ''}"
src/app/services/query_builder/query_builder.py

+61

import dataclasses
import logging
import os

from pydantic.dataclasses import dataclass

from app.etl_exceptions import AutoETLException
from app.services.config_parser import ConfigParser, MetadataParser
from app.services.query_builder import RedshiftDialact
from app.utils import excel_to_json, validate_joins_mapping

logger = logging.getLogger(__name__)


class Config:
    arbitrary_types_allowed = True


@dataclass(config=Config)
class QueryBuilder:
    metadata_file_path: str
    config_file_path: str
    metadata_parser: MetadataParser = dataclasses.field(init=False)
    config_parser: ConfigParser = dataclasses.field(init=False)

    def __post_init__(self):
        self.metadata_parser: MetadataParser = MetadataParser.get_metadata_parser(
            self.metadata_file_path)
        self.config_parser: ConfigParser = ConfigParser.get_config_parser(
            self.config_file_path)

    def run(self) -> None:
        """Trigger the build."""
        logger.info("Initialising Auto ETL")
        logger.info("Found metadata file - %s",
                    os.path.basename(self.metadata_file_path))
        logger.info("Found config file - %s",
                    os.path.basename(self.config_file_path))

        meta_xls = self.metadata_parser.validate_file()
        _config = self.config_parser.validate_file()

        logger.info("building query from meta_file -> %s",
                    self.metadata_file_path)

        target_table_json = excel_to_json(meta_xls, 'target_table', 'records')
        joins_and_filters = excel_to_json(
            meta_xls, 'joins_and_filters', 'index')

        validate_joins_mapping(joins_and_filters)

        match _config['target']:
            case 'redshift':
                RedshiftDialact(target_table_json, joins_and_filters).build()

            case _:
                logger.error(
                    "Target system - %s not supported yet.", _config.get('target', 'None'))
                raise AutoETLException(
                    f"Target system - {_config['target']} not supported yet.")
