
Commit

Merge pull request #104 from dlt-hub/rfix/usability-improvements-1
usability improvements
rudolfix authored Dec 2, 2022
2 parents 397fceb + 39fde94 commit 92515b9
Showing 30 changed files with 248 additions and 166 deletions.
8 changes: 4 additions & 4 deletions dlt/cli/_dlt.py
@@ -77,9 +77,9 @@ def main() -> None:
subparsers = parser.add_subparsers(dest="command")

init_cmd = subparsers.add_parser("init", help="Creates a new pipeline script from a selected template.")
init_cmd.add_argument("pipeline_name", help="Pipeline name. If pipeline with given name already exists it will be used as a template. Otherwise new template will be created.")
init_cmd.add_argument("destination_name", help="Name of a destination ie. bigquery or redshift")
init_cmd.add_argument("--generic", default=False, action="store_true", help="When present uses a generic template that must be completed in order to be run. Otherwise a simpler, runnable template is used that produces the debug output.")
init_cmd.add_argument("source", help="Data source name. If pipeline for given data source already exists it will be used as a template. Otherwise new template will be created.")
init_cmd.add_argument("destination", help="Name of a destination ie. bigquery or redshift")
init_cmd.add_argument("--generic", default=False, action="store_true", help="When present uses a generic template with all the dlt loading code present will be used. Otherwise a debug template is used that can be immediately run to get familiar with the dlt sources.")
init_cmd.add_argument("--branch", default=None, help="Advanced. Uses specific branch of the init repository to fetch the template.")

deploy_cmd = subparsers.add_parser("deploy", help="Creates a deployment package for a selected pipeline script")
@@ -132,7 +132,7 @@ def main() -> None:

exit(0)
elif args.command == "init":
init_command_wrapper(args.pipeline_name, args.destination_name, args.generic, args.branch)
init_command_wrapper(args.source, args.destination, args.generic, args.branch)
exit(0)
elif args.command == "deploy":
deploy_command_wrapper(args.pipeline_script_path, args.deployment_method, args.schedule, args.run_on_push, args.run_manually, args.branch)
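
With this rename, init is invoked as dlt init <source> <destination>. A minimal hedged sketch of the equivalent programmatic call; "chess" is an illustrative source name, not part of this commit:

from dlt.cli._dlt import init_command_wrapper

# arguments mirror the CLI: source, destination, --generic flag, --branch
init_command_wrapper("chess", "bigquery", False, None)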
22 changes: 11 additions & 11 deletions dlt/common/exceptions.py
@@ -11,17 +11,17 @@ def __init__(self, signal_code: int) -> None:
super().__init__(f"Signal {signal_code} received")


class PoolException(DltException):
"""
Thrown by worker pool to pass information when thrown during processing an item
"""
def __init__(self, pool_name: str = None, item: str = None, internal_exception: Exception = None) -> None:
# we need it to make it pickle compatible
if pool_name:
self.pool_name = pool_name
self.item = item
self.internal_exception = internal_exception
super().__init__(f"Pool {pool_name} raised on item {item} with {str(internal_exception)}")
# class PoolException(DltException):
# """
# Thrown by worker pool to pass information when thrown during processing an item
# """
# def __init__(self, pool_name: str = None, item: str = None, internal_exception: Exception = None) -> None:
# # we need it to make it pickle compatible
# if pool_name:
# self.pool_name = pool_name
# self.item = item
# self.internal_exception = internal_exception
# super().__init__(f"Pool {pool_name} raised on item {item} with {str(internal_exception)}")


class UnsupportedProcessStartMethodException(DltException):
10 changes: 10 additions & 0 deletions dlt/common/json.py
@@ -94,6 +94,16 @@ def custom_pua_decode(obj: Any) -> Any:
return obj


def custom_pua_remove(obj: Any) -> Any:
"""Removes the PUA data type marker and leaves the correctly serialized type representation. Unmarked values are returned as-is."""
if isinstance(obj, str) and len(obj) > 1:
c = ord(obj[0]) - 0xF026
# strip the marker only for the PUA space defined in DECODERS
if 0 <= c <= 6:
return obj[1:]
return obj


simplejson.loads = partial(simplejson.loads, use_decimal=False)
simplejson.load = partial(simplejson.load, use_decimal=False)
# prevent default decimal serializer (use_decimal=False) and binary serializer (encoding=None)
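
A short sketch of the marker convention this helper assumes: specially encoded JSON strings carry a one-character prefix from the 0xF026 Private Use Area range; which slot maps to which type is defined in DECODERS and is assumed here.

from dlt.common.json import custom_pua_remove

marked = chr(0xF026) + "322.91"    # hypothetically marked value (slot 0)
print(custom_pua_remove(marked))   # "322.91" - marker stripped
print(custom_pua_remove("plain"))  # "plain" - unmarked strings pass through
print(custom_pua_remove(42))       # 42 - non-strings are returned as-is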
8 changes: 8 additions & 0 deletions dlt/common/logger.py
@@ -142,6 +142,10 @@ def _extract_pod_info() -> StrStr:
return filter_env_vars(["KUBE_NODE_NAME", "KUBE_POD_NAME", "KUBE_POD_NAMESPACE"])


def _extract_github_info() -> StrStr:
return filter_env_vars(["GITHUB_USER", "GITHUB_REPOSITORY"])


class _SentryHttpTransport(HttpTransport):

timeout: int = 0
@@ -180,6 +184,10 @@ def _init_sentry(C: RunConfiguration, version: StrStr) -> None:
pod_tags = _extract_pod_info()
for k, v in pod_tags.items():
sentry_sdk.set_tag(k, v)
# add github info
github_tags = _extract_github_info()
for k, v in github_tags.items():
sentry_sdk.set_tag(k, v)


def init_telemetry(config: RunConfiguration) -> None:
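
A hedged sketch of the effect: with these variables set, as in a GitHub Actions job (values illustrative), every Sentry event gets tagged with them alongside the existing pod tags.

import os

os.environ["GITHUB_USER"] = "rudolfix"           # example value
os.environ["GITHUB_REPOSITORY"] = "dlt-hub/dlt"  # example value
# _extract_github_info() returns both entries and _init_sentry()
# calls sentry_sdk.set_tag(k, v) for each of them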
Empty file.
12 changes: 12 additions & 0 deletions dlt/common/normalizers/exceptions.py
@@ -0,0 +1,12 @@
from dlt.common.exceptions import DltException


class NormalizerException(DltException):
pass


class InvalidJsonNormalizer(NormalizerException):
def __init__(self, required_normalizer: str, present_normalizer: str) -> None:
self.required_normalizer = required_normalizer
self.present_normalizer = present_normalizer
super().__init__(f"Operation requires {required_normalizer} normalizer while {present_normalizer} normalizer is present")
30 changes: 24 additions & 6 deletions dlt/common/normalizers/json/relational.py
@@ -1,4 +1,5 @@
from typing import Dict, Mapping, Optional, Sequence, Tuple, cast, TypedDict, Any
from dlt.common.normalizers.exceptions import InvalidJsonNormalizer

from dlt.common.typing import DictStrAny, DictStrStr, TDataItem, StrAny
from dlt.common.schema import Schema
@@ -26,15 +27,15 @@ class TDataItemRowChild(TDataItemRow, total=False):
value: Any # for lists of simple types


class JSONNormalizerConfigPropagation(TypedDict, total=True):
class RelationalNormalizerConfigPropagation(TypedDict, total=True):
root: Optional[Mapping[str, TColumnName]]
tables: Optional[Mapping[str, Mapping[str, TColumnName]]]


class JSONNormalizerConfig(TypedDict, total=True):
class RelationalNormalizerConfig(TypedDict, total=False):
generate_dlt_id: Optional[bool]
max_nesting: Optional[int]
propagation: Optional[JSONNormalizerConfigPropagation]
propagation: Optional[RelationalNormalizerConfigPropagation]


# for those paths the complex nested objects should be left in place
@@ -107,7 +108,7 @@ def _get_content_hash(schema: Schema, table: str, row: StrAny) -> str:


def _get_propagated_values(schema: Schema, table: str, row: TDataItemRow, is_top_level: bool) -> StrAny:
config: JSONNormalizerConfigPropagation = (schema._normalizers_config["json"].get("config") or {}).get("propagation", None)
config: RelationalNormalizerConfigPropagation = (schema._normalizers_config["json"].get("config") or {}).get("propagation", None)
extend: DictStrAny = {}
if config:
# mapping(k:v): propagate property with name "k" as property with name "v" in child table
@@ -199,10 +200,27 @@ def _normalize_row(
yield from _normalize_list(schema, list_content, extend, schema.normalize_make_path(table, k), table, row_id, _r_lvl + 1)


def _validate_normalizer_config(schema: Schema, config: RelationalNormalizerConfig) -> None:
validate_dict(RelationalNormalizerConfig, config, "./normalizers/json/config", validator_f=column_name_validator(schema.normalize_column_name))


def update_normalizer_config(schema: Schema, config: RelationalNormalizerConfig) -> None:
_validate_normalizer_config(schema, config)
# make sure schema has right normalizer
norm_config = schema._normalizers_config["json"]
present_normalizer = norm_config["module"]
if present_normalizer != __name__:
raise InvalidJsonNormalizer(__name__, present_normalizer)
if "config" in norm_config:
norm_config["config"].update(config) # type: ignore
else:
norm_config["config"] = config


def extend_schema(schema: Schema) -> None:
# validate config
config = schema._normalizers_config["json"].get("config") or {}
validate_dict(JSONNormalizerConfig, config, "./normalizers/json/config", validator_f=column_name_validator(schema.normalize_column_name))
config = cast(RelationalNormalizerConfig, schema._normalizers_config["json"].get("config") or {})
_validate_normalizer_config(schema, config)

# quick check to see if hints are applied
default_hints = schema.settings.get("default_hints") or {}
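
A hedged usage sketch for the new update_normalizer_config helper, assuming a schema whose json normalizer is this relational module (the dlt default); the propagation mapping is illustrative:

from dlt.common.schema import Schema
from dlt.common.normalizers.json.relational import update_normalizer_config

schema = Schema("example")
update_normalizer_config(schema, {
    "max_nesting": 2,  # flatten at most two levels of nesting
    "propagation": {
        # copy the root table's "timestamp" into child tables as "_timestamp"
        "root": {"timestamp": "_timestamp"},
        "tables": {},
    },
})
# on a schema configured with a different json normalizer module this call
# raises InvalidJsonNormalizer instead of silently merging the config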
16 changes: 10 additions & 6 deletions dlt/common/runners/pool_runner.py
@@ -40,6 +40,10 @@ def update_gauges() -> TRunHealth:


def run_pool(C: PoolRunnerConfiguration, run_f: Union[Runnable[TPool], Callable[[TPool], TRunMetrics]]) -> int:
# validate run function
if not isinstance(run_f, Runnable) and not callable(run_f):
raise ValueError(run_f, "Pool runner entry point must be a function f(pool: TPool) or Runnable")

# create health gauges
if not HEALTH_PROPS_GAUGES:
create_gauges(REGISTRY)
@@ -76,17 +80,17 @@ def run_pool(C: PoolRunnerConfiguration, run_f: Union[Runnable[TPool], Callable[
else:
raise SignalReceivedException(-1)
except Exception as exc:
# the run failed
run_metrics = TRunMetrics(True, True, -1)
# preserve exception
# TODO: convert it to callback
global LAST_RUN_EXCEPTION
LAST_RUN_EXCEPTION = exc
if (type(exc) is SignalReceivedException) or (type(exc) is TimeRangeExhaustedException):
# always exit
raise
else:
logger.exception("run")
# the run failed
run_metrics = TRunMetrics(True, True, -1)
# preserve exception
# TODO: convert it to callback
global LAST_RUN_EXCEPTION
LAST_RUN_EXCEPTION = exc
# re-raise if EXIT_ON_EXCEPTION is requested
if C.exit_on_exception:
raise
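
A minimal sketch of an entry point that passes the new up-front validation; the TRunMetrics import path is an assumption:

from dlt.common.runners import TRunMetrics

def run(pool) -> TRunMetrics:
    # do one unit of work; the three fields mirror TRunMetrics(True, True, -1)
    # used above to mark a failed run
    return TRunMetrics(False, False, 0)

# run_pool(C, run) accepts any callable taking the pool (or a Runnable);
# anything else now raises ValueError before the pool is started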
12 changes: 8 additions & 4 deletions dlt/common/schema/utils.py
@@ -13,12 +13,12 @@
from dlt.common import pendulum, json, Decimal, Wei
from pendulum.parsing import parse_iso8601, _parse_common as parse_datetime_common
from pendulum.tz import UTC
from dlt.common.json import custom_encode as json_custom_encode
from dlt.common.json import custom_encode as json_custom_encode, custom_pua_remove
from dlt.common.arithmetics import InvalidOperation
from dlt.common.exceptions import DictValidationException
from dlt.common.normalizers.names import TNormalizeNameFunc
from dlt.common.typing import DictStrAny, REPattern
from dlt.common.utils import str2bool
from dlt.common.utils import map_nested_in_place, str2bool
from dlt.common.validation import TCustomValidator, validate_dict
from dlt.common.schema import detections
from dlt.common.schema.typing import LOADS_TABLE_NAME, SIMPLE_REGEX_PREFIX, VERSION_TABLE_NAME, TColumnName, TNormalizersConfig, TPartialTableSchema, TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp, TDataType, TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition
@@ -314,16 +314,20 @@ def py_type_to_sc_type(t: Type[Any]) -> TDataType:
raise TypeError(t)


def complex_to_str(value: Any) -> str:
return json.dumps(map_nested_in_place(custom_pua_remove, value))


def coerce_value(to_type: TDataType, from_type: TDataType, value: Any) -> Any:
if to_type == from_type:
if to_type == "complex":
# complex types will be always represented as strings
return json.dumps(value)
return complex_to_str(value)
return value

if to_type == "text":
if from_type == "complex":
return json.dumps(value)
return complex_to_str(value)
else:
# use the same string encoding as in json
try:
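
A sketch of why complex_to_str maps custom_pua_remove over the value first: nested fields may still carry PUA type markers from JSON decoding, and a complex column must serialize to clean JSON text. The marked value is illustrative.

from dlt.common.schema.utils import coerce_value

row = {"amount": chr(0xF026) + "322.91", "tags": ["a", "b"]}  # marked decimal
print(coerce_value("complex", "complex", row))
# '{"amount": "322.91", "tags": ["a", "b"]}' - markers stripped before dumping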
1 change: 1 addition & 0 deletions dlt/common/time.py
@@ -22,6 +22,7 @@ def timestamp_before(timestamp: float, max_inclusive: Optional[float]) -> bool:


def sleep(sleep_seconds: float) -> None:
"""A signal-aware version of sleep function. Will raise SignalReceivedException if signal was received during sleep period."""
# do not allow sleeping if signal was received
signals.raise_if_signalled()
# sleep or wait for signal
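
A hedged usage sketch: the function behaves like time.sleep until a termination signal arrives, at which point it raises instead of returning.

from dlt.common.exceptions import SignalReceivedException
from dlt.common.time import sleep

try:
    sleep(5.0)  # interruptible wait
except SignalReceivedException:
    pass  # a signal arrived before or during the sleep - shut down cleanly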
22 changes: 21 additions & 1 deletion dlt/common/utils.py
@@ -10,7 +10,7 @@

from typing import Any, Dict, Iterable, Iterator, Optional, Sequence, TypeVar, Mapping, List, TypedDict, Union

from dlt.common.typing import AnyFun, StrAny, DictStrAny, StrStr, TDataItem, TDataItems, TFun
from dlt.common.typing import AnyFun, StrAny, DictStrAny, StrStr, TAny, TDataItem, TDataItems, TFun

T = TypeVar("T")

@@ -129,13 +129,33 @@ def filter_env_vars(envs: List[str]) -> StrStr:


def update_dict_with_prune(dest: DictStrAny, update: StrAny) -> None:
"""Updates values that are both in `dest` and `update` and deletes `dest` values that are None in `update`"""
for k, v in update.items():
if v is not None:
dest[k] = v
elif k in dest:
del dest[k]


def map_nested_in_place(func: AnyFun, _complex: TAny) -> TAny:
"""Applies `func` to all elements in `_dict` recursively, replacing elements in nested dictionaries and lists in place."""
if isinstance(_complex, dict):
for k, v in _complex.items():
if isinstance(v, (dict, list)):
map_nested_in_place(func, v)
else:
_complex[k] = func(v)
elif isinstance(_complex, list):
for idx, _l in enumerate(_complex):
if isinstance(_l, (dict, list)):
map_nested_in_place(func, _l)
else:
_complex[idx] = func(_l)
else:
raise ValueError(_complex, "Not a complex type")
return _complex


def is_interactive() -> bool:
import __main__ as main
return not hasattr(main, '__file__')
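
A short demonstration of the new map_nested_in_place helper: it mutates nested dicts and lists, returns the same object, and raises ValueError for scalar input by design.

from dlt.common.utils import map_nested_in_place

data = {"a": 1, "b": [2, {"c": 3}]}
result = map_nested_in_place(lambda v: v * 10, data)
print(result)          # {'a': 10, 'b': [20, {'c': 30}]}
print(result is data)  # True - modified in place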