Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove timezone parameter #63

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions mara_pipelines/commands/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, file_name: str, compression: Compression, target_table: str,
mapper_script_file_name: str = None, make_unique: bool = False,
db_alias: str = None, csv_format: bool = False, skip_header: bool = False,
delimiter_char: str = None, quote_char: str = None,
null_value_string: str = None, timezone: str = None) -> None:
null_value_string: str = None) -> None:
super().__init__()
self.file_name = file_name
self.compression = compression
Expand All @@ -51,7 +51,6 @@ def __init__(self, file_name: str, compression: Compression, target_table: str,
self.delimiter_char = delimiter_char
self.quote_char = quote_char
self.null_value_string = null_value_string
self.timezone = timezone

def db_alias(self):
return self._db_alias or config.default_db_alias()
Expand All @@ -61,7 +60,7 @@ def shell_command(self):
self.db_alias(), csv_format=self.csv_format, target_table=self.target_table,
skip_header=self.skip_header,
delimiter_char=self.delimiter_char, quote_char=self.quote_char,
null_value_string=self.null_value_string, timezone=self.timezone)
null_value_string=self.null_value_string)
if not isinstance(mara_db.dbs.db(self.db_alias()), mara_db.dbs.BigQueryDB):
return \
f'{uncompressor(self.compression)} "{pathlib.Path(config.data_dir()) / self.file_name}" \\\n' \
Expand Down Expand Up @@ -93,20 +92,18 @@ def html_doc_items(self) -> [(str, str)]:
('quote char', _.tt[json.dumps(self.quote_char) if self.quote_char != None else None]),
('null value string',
_.tt[json.dumps(self.null_value_string) if self.null_value_string != None else None]),
('time zone', _.tt[self.timezone]),
(_.i['shell command'], html.highlight_syntax(self.shell_command(), 'bash'))]


class ReadSQLite(sql._SQLCommand):
def __init__(self, sqlite_file_name: str, target_table: str,
sql_statement: str = None, sql_file_name: str = None, replace: {str: str} = None,
db_alias: str = None, timezone: str = None) -> None:
db_alias: str = None) -> None:
sql._SQLCommand.__init__(self, sql_statement, sql_file_name, replace)
self.sqlite_file_name = sqlite_file_name

self.target_table = target_table
self._db_alias = db_alias
self.timezone = timezone

@property
def db_alias(self):
Expand All @@ -116,14 +113,13 @@ def shell_command(self):
return (sql._SQLCommand.shell_command(self)
+ ' | ' + mara_db.shell.copy_command(
mara_db.dbs.SQLiteDB(file_name=config.data_dir().absolute() / self.sqlite_file_name),
self.db_alias, self.target_table, timezone=self.timezone))
self.db_alias, self.target_table))

def html_doc_items(self) -> [(str, str)]:
return [('sqlite file name', _.i[self.sqlite_file_name])] \
+ sql._SQLCommand.html_doc_items(self, None) \
+ [('target_table', _.tt[self.target_table]),
('db alias', _.tt[self.db_alias]),
('time zone', _.tt[self.timezone]),
(_.i['shell command'], html.highlight_syntax(self.shell_command(), 'bash'))]


Expand All @@ -133,7 +129,7 @@ class ReadScriptOutput(pipelines.Command):
def __init__(self, file_name: str, target_table: str, make_unique: bool = False,
db_alias: str = None, csv_format: bool = False, skip_header: bool = False,
delimiter_char: str = None, quote_char: str = None,
null_value_string: str = None, timezone: str = None) -> None:
null_value_string: str = None) -> None:
super().__init__()
self.file_name = file_name
self.make_unique = make_unique
Expand All @@ -145,7 +141,6 @@ def __init__(self, file_name: str, target_table: str, make_unique: bool = False,
self.delimiter_char = delimiter_char
self.quote_char = quote_char
self.null_value_string = null_value_string
self.timezone = timezone

def db_alias(self):
return self._db_alias or config.default_db_alias()
Expand All @@ -156,7 +151,7 @@ def shell_command(self):
+ ' | ' + mara_db.shell.copy_from_stdin_command(
self.db_alias(), csv_format=self.csv_format, target_table=self.target_table, skip_header=self.skip_header,
delimiter_char=self.delimiter_char, quote_char=self.quote_char,
null_value_string=self.null_value_string, timezone=self.timezone)
null_value_string=self.null_value_string)

def file_path(self):
return self.parent.parent.base_path() / self.file_name
Expand All @@ -174,5 +169,4 @@ def html_doc_items(self) -> [(str, str)]:
('quote char', _.tt[json.dumps(self.quote_char) if self.quote_char != None else None]),
('null value string',
_.tt[json.dumps(self.null_value_string) if self.null_value_string != None else None]),
('time zone', _.tt[self.timezone]),
(_.i['shell command'], html.highlight_syntax(self.shell_command(), 'bash'))]
21 changes: 7 additions & 14 deletions mara_pipelines/commands/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def html_doc_items(self, db_alias: str):
class ExecuteSQL(_SQLCommand):
def __init__(self, sql_statement: str = None, sql_file_name: Union[str, Callable] = None,
replace: {str: str} = None, file_dependencies=None, db_alias: str = None,
echo_queries: bool = None, timezone: str = None) -> None:
echo_queries: bool = None) -> None:
"""
Runs an sql file or statement in a database

Expand All @@ -94,7 +94,6 @@ def __init__(self, sql_statement: str = None, sql_file_name: Union[str, Callable
_SQLCommand.__init__(self, sql_statement, sql_file_name, replace)

self._db_alias = db_alias
self.timezone = timezone
self.file_dependencies = file_dependencies or []
self.echo_queries = echo_queries

Expand Down Expand Up @@ -133,14 +132,13 @@ def run(self) -> bool:

def shell_command(self):
return _SQLCommand.shell_command(self) \
+ ' | ' + mara_db.shell.query_command(self.db_alias, self.timezone, self.echo_queries)
+ ' | ' + mara_db.shell.query_command(self.db_alias, echo_queries=self.echo_queries)

def html_doc_items(self):
return [('db', _.tt[self.db_alias]),
('file dependencies', [_.i[dependency, _.br] for dependency in self.file_dependencies])] \
+ _SQLCommand.html_doc_items(self, self.db_alias) \
+ [('echo queries', _.tt[str(self.echo_queries)]),
('timezone', _.tt[self.timezone or '']),
(_.i['shell command'], html.highlight_syntax(self.shell_command(), 'bash'))]


Expand All @@ -149,13 +147,12 @@ class Copy(_SQLCommand):

def __init__(self, source_db_alias: str, target_table: str, target_db_alias: str = None,
sql_statement: str = None, sql_file_name: Union[Callable, str] = None, replace: {str: str} = None,
timezone: str = None, csv_format: bool = None, delimiter_char: str = None,
csv_format: bool = None, delimiter_char: str = None,
file_dependencies=None) -> None:
_SQLCommand.__init__(self, sql_statement, sql_file_name, replace)
self.source_db_alias = source_db_alias
self.target_table = target_table
self._target_db_alias = target_db_alias
self.timezone = timezone
self.csv_format = csv_format
self.delimiter_char = delimiter_char
self.file_dependencies = file_dependencies or []
Expand Down Expand Up @@ -197,14 +194,13 @@ def run(self) -> bool:
def shell_command(self):
return _SQLCommand.shell_command(self) \
+ ' | ' + mara_db.shell.copy_command(self.source_db_alias, self.target_db_alias, self.target_table,
self.timezone, self.csv_format, self.delimiter_char)
csv_format=self.csv_format, delimiter_char=self.delimiter_char)

def html_doc_items(self) -> [(str, str)]:
return [('source db', _.tt[self.source_db_alias])] \
+ _SQLCommand.html_doc_items(self, self.source_db_alias) \
+ [('target db', _.tt[self.target_db_alias]),
('target table', _.tt[self.target_table]),
('timezone', _.tt[self.timezone or '']),
('csv format', _.tt[self.csv_format or '']),
('delimiter char', _.tt[self.delimiter_char or '']),
(_.i['shell command'], html.highlight_syntax(self.shell_command(), 'bash'))]
Expand All @@ -215,7 +211,7 @@ def __init__(self, source_db_alias: str, source_table: str,
modification_comparison: str, comparison_value_placeholder: str,
target_table: str, primary_keys: [str],
sql_file_name: Union[str, Callable] = None, sql_statement: Union[str, Callable] = None,
target_db_alias: str = None, timezone: str = None, replace: {str: str} = None,
target_db_alias: str = None, replace: {str: str} = None,
use_explicit_upsert: bool = False,
csv_format: bool = None, delimiter_char: str = None,
modification_comparison_type: str = None) -> None:
Expand All @@ -241,7 +237,6 @@ def __init__(self, source_db_alias: str, source_table: str,
target_db_alias: The database to write to
target_table: The table for loading data into
primary_keys: A combination of primary key columns that are used for upserting into the target table
timezone: How to interpret timestamps in the target db
use_explicit_upsert: When True, uses an Update + Insert query combination. Otherwise ON CONFLICT DO UPDATE.
"""
_SQLCommand.__init__(self, sql_statement, sql_file_name, replace)
Expand All @@ -254,7 +249,6 @@ def __init__(self, source_db_alias: str, source_table: str,
self._target_db_alias = target_db_alias
self.target_table = target_table
self.primary_keys = primary_keys
self.timezone = timezone
self.use_explicit_upsert = use_explicit_upsert
self.csv_format = csv_format
self.delimiter_char = delimiter_char
Expand Down Expand Up @@ -397,8 +391,8 @@ def _copy_command(self, target_table, replace):
return (_SQLCommand.shell_command(self)
+ ' | ' + shell.sed_command(replace)
+ ' | ' + mara_db.shell.copy_command(self.source_db_alias, self.target_db_alias,
target_table, timezone=self.timezone,
csv_format=self.csv_format, delimiter_char=self.delimiter_char))
target_table, csv_format=self.csv_format,
delimiter_char=self.delimiter_char))

def html_doc_items(self) -> [(str, str)]:
return [('source db', _.tt[self.source_db_alias]),
Expand All @@ -410,7 +404,6 @@ def html_doc_items(self) -> [(str, str)]:
('target db', _.tt[self.target_db_alias]),
('target table', _.tt[self.target_table]),
('primary_keys', _.tt[repr(self.primary_keys)]),
('timezone', _.tt[self.timezone or '']),
('csv format', _.tt[self.csv_format or '']),
('delimiter char', _.tt[self.delimiter_char or '']),
('use explicit upsert', _.tt[repr(self.use_explicit_upsert)])]
Expand Down
22 changes: 9 additions & 13 deletions mara_pipelines/parallel_tasks/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(self, id: str, description: str, file_pattern: str, read_mode: Read
max_number_of_parallel_tasks: int = None, file_dependencies: [str] = None, date_regex: str = None,
partition_target_table_by_day_id: bool = False, truncate_partitions: bool = False,
commands_before: [pipelines.Command] = None, commands_after: [pipelines.Command] = None,
db_alias: str = None, timezone: str = None) -> None:
db_alias: str = None) -> None:
pipelines.ParallelTask.__init__(self, id=id, description=description,
max_number_of_parallel_tasks=max_number_of_parallel_tasks,
commands_before=commands_before, commands_after=commands_after)
Expand All @@ -52,7 +52,6 @@ def __init__(self, id: str, description: str, file_pattern: str, read_mode: Read

self.target_table = target_table
self._db_alias = db_alias
self.timezone = timezone

@property
def db_alias(self):
Expand Down Expand Up @@ -180,14 +179,13 @@ def __init__(self, id: str, description: str, file_pattern: str, read_mode: Read
mapper_script_file_name: str = None, make_unique: bool = False, db_alias: str = None,
delimiter_char: str = None, quote_char: str = None, null_value_string: str = None,
skip_header: bool = None, csv_format: bool = False,
timezone: str = None, max_number_of_parallel_tasks: int = None) -> None:
max_number_of_parallel_tasks: int = None) -> None:
_ParallelRead.__init__(self, id=id, description=description, file_pattern=file_pattern,
read_mode=read_mode, target_table=target_table, file_dependencies=file_dependencies,
date_regex=date_regex, partition_target_table_by_day_id=partition_target_table_by_day_id,
truncate_partitions=truncate_partitions,
commands_before=commands_before, commands_after=commands_after,
db_alias=db_alias, timezone=timezone,
max_number_of_parallel_tasks=max_number_of_parallel_tasks)
db_alias=db_alias, max_number_of_parallel_tasks=max_number_of_parallel_tasks)
self.compression = compression
self.mapper_script_file_name = mapper_script_file_name or ''
self.make_unique = make_unique
Expand All @@ -202,7 +200,7 @@ def read_command(self, file_name: str) -> pipelines.Command:
mapper_script_file_name=self.mapper_script_file_name, make_unique=self.make_unique,
db_alias=self.db_alias, delimiter_char=self.delimiter_char, skip_header=self.skip_header,
quote_char=self.quote_char, null_value_string=self.null_value_string,
csv_format=self.csv_format, timezone=self.timezone)
csv_format=self.csv_format)

def html_doc_items(self) -> [(str, str)]:
path = self.parent.base_path() / self.mapper_script_file_name if self.mapper_script_file_name else ''
Expand All @@ -225,27 +223,26 @@ def html_doc_items(self) -> [(str, str)]:
_.tt[json.dumps(self.delimiter_char) if self.delimiter_char != None else None]),
('quote char', _.tt[json.dumps(self.quote_char) if self.quote_char != None else None]),
('null value string',
_.tt[json.dumps(self.null_value_string) if self.null_value_string != None else None]),
('time zone', _.tt[self.timezone])]
_.tt[json.dumps(self.null_value_string) if self.null_value_string != None else None])]


class ParallelReadSqlite(_ParallelRead):
def __init__(self, id: str, description: str, file_pattern: str, read_mode: ReadMode, sql_file_name: str,
target_table: str, file_dependencies: [str] = None, date_regex: str = None,
partition_target_table_by_day_id: bool = False, truncate_partitions: bool = False,
commands_before: [pipelines.Command] = None, commands_after: [pipelines.Command] = None,
db_alias: str = None, timezone=None, max_number_of_parallel_tasks: int = None) -> None:
db_alias: str = None, max_number_of_parallel_tasks: int = None) -> None:
_ParallelRead.__init__(self, id=id, description=description, file_pattern=file_pattern,
read_mode=read_mode, target_table=target_table, file_dependencies=file_dependencies,
date_regex=date_regex, partition_target_table_by_day_id=partition_target_table_by_day_id,
truncate_partitions=truncate_partitions,
commands_before=commands_before, commands_after=commands_after, db_alias=db_alias,
timezone=timezone, max_number_of_parallel_tasks=max_number_of_parallel_tasks)
max_number_of_parallel_tasks=max_number_of_parallel_tasks)
self.sql_file_name = sql_file_name

def read_command(self, file_name: str) -> [pipelines.Command]:
return files.ReadSQLite(sqlite_file_name=file_name, sql_file_name=self.sql_file_name,
target_table=self.target_table, db_alias=self.db_alias, timezone=self.timezone)
target_table=self.target_table, db_alias=self.db_alias)

def sql_file_path(self):
return self.parent.base_path() / self.sql_file_name
Expand All @@ -263,5 +260,4 @@ def html_doc_items(self) -> [(str, str)]:
('target_table', _.tt[self.target_table]),
('db alias', _.tt[self.db_alias]),
('partion target table by day_id', _.tt[self.partition_target_table_by_day_id]),
('truncate partitions', _.tt[self.truncate_partitions]),
('time zone', _.tt[self.timezone])]
('truncate partitions', _.tt[self.truncate_partitions])]
Loading