Skip to content

Commit

Permalink
Use interface_meta instead of inbuilt metaclasses. (#92)
Browse files: browse the repository at this point in the history
  • Loading branch information
matthewwardrop authored Jan 22, 2019
1 parent d0c02ef commit c49b9eb
Show file tree
Hide file tree
Showing 22 changed files with 274 additions and 248 deletions.
1 change: 1 addition & 0 deletions omniduct/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"six", # Python 2/3 support
"enum34", # Python 3.4+ style enums in older versions of python

"interface_meta>=1.0.0,<1.1",  # Metaclass for creating an extensible well-documented architecture (version clauses must be comma-separated)
"pyyaml", # YAML configuration parsing
"decorator", # Decorators used by caching and documentation routines
"progressbar2>=3.30.0", # Support for progressbars in logging routines
Expand Down
2 changes: 1 addition & 1 deletion omniduct/caches/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
import six
import yaml
from decorator import decorator
from interface_meta import quirk_docs

from omniduct.duct import Duct
from omniduct.utils.config import config
from omniduct.utils.debug import logger
from omniduct.utils.decorators import function_args_as_kwargs
from omniduct.utils.docs import quirk_docs

from ._serializers import PickleSerializer

Expand Down
16 changes: 15 additions & 1 deletion omniduct/caches/filesystem.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import six
import yaml
from interface_meta import override

from omniduct.filesystems.base import FileSystemClient
from omniduct.filesystems.local import LocalFsClient
Expand All @@ -14,6 +15,7 @@ class FileSystemCache(Cache):

PROTOCOLS = ['filesystem_cache']

@override
def _init(self, path, fs=None):
"""
path (str): The top-level path of the cache in the filesystem.
Expand All @@ -30,6 +32,7 @@ def _init(self, path, fs=None):
self._config = None
self.connection_fields += ('fs',)

@override
def _prepare(self):
Cache._prepare(self)

Expand Down Expand Up @@ -73,48 +76,59 @@ def _prepare_cache(self):
yaml.safe_dump({'version': 1}, fh, default_flow_style=False)
return {'version': 1}

@override
def _connect(self):
self.fs.connect()

@override
def _is_connected(self):
return self.fs.is_connected()

@override
def _disconnect(self):
return self.fs.disconnect()

# Implementations for abstract methods in Cache

@override
def _namespace(self, namespace):
if namespace is None:
return '__default__'
assert isinstance(namespace, str) and namespace != 'config'
return namespace

@override
def _get_namespaces(self):
return [d for d in self.fs.listdir(self.path) if d != 'config']

@override
def _has_namespace(self, namespace):
return self.fs.exists(self.fs.path_join(self.path, namespace))

@override
def _remove_namespace(self, namespace):
return self.fs.remove(self.fs.path_join(self.path, namespace), recursive=True)

@override
def _get_keys(self, namespace):
return self.fs.listdir(self.fs.path_join(self.path, namespace))

@override
def _has_key(self, namespace, key):
return self.fs.exists(self.fs.path_join(self.path, namespace, key))

@override
def _remove_key(self, namespace, key):
return self.fs.remove(self.fs.path_join(self.path, namespace, key), recursive=True)

@override
def _get_bytecount_for_key(self, namespace, key):
path = self.fs.path_join(self.path, namespace, key)
return sum([
f.bytes
for f in self.fs.dir(path)
])

@override
def _get_stream_for_key(self, namespace, key, stream_name, mode, create):
path = self.fs.path_join(self.path, namespace, key)

Expand Down
3 changes: 2 additions & 1 deletion omniduct/databases/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
import jinja2.meta
import sqlparse
from decorator import decorator
from interface_meta import quirk_docs, override

from omniduct.caches.base import cached_method
from omniduct.duct import Duct
from omniduct.filesystems.local import LocalFsClient
from omniduct.utils.debug import logger, logging_scope
from omniduct.utils.docs import quirk_docs
from omniduct.utils.magics import (MagicsProvider, process_line_arguments,
process_line_cell_arguments)

Expand Down Expand Up @@ -842,6 +842,7 @@ def table_props(self, table, renew=True, **kwargs):
def _table_props(self, table, **kwargs):
pass

@override
def _register_magics(self, base_name):
"""
The following magic functions will be registered (assuming that
Expand Down
13 changes: 13 additions & 0 deletions omniduct/databases/druid.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import absolute_import

from interface_meta import override

from omniduct.utils.debug import logger

from .base import DatabaseClient
Expand All @@ -16,10 +18,12 @@ class DruidClient(DatabaseClient):
NAMESPACE_QUOTECHAR = '"'
NAMESPACE_SEPARATOR = '.'

@override
def _init(self):
self.__druid = None

# Connection
@override
def _connect(self):
from pydruid.db import connect
logger.info('Connecting to Druid database ...')
Expand All @@ -30,9 +34,11 @@ def _connect(self):
'pydruid connection currently does not allow these fields to be passed.'
)

@override
def _is_connected(self):
return self.__druid is not None

@override
def _disconnect(self):
logger.info('Disconnecting from Druid database ...')
try:
Expand All @@ -42,15 +48,18 @@ def _disconnect(self):
self.__druid = None

# Querying
@override
def _execute(self, statement, cursor, wait, session_properties):
cursor = cursor or self.__druid.cursor()
cursor.execute(statement)
return cursor

@override
def _table_list(self, namespace, like=None, **kwargs):
cmd = "SELECT * FROM INFORMATION_SCHEMA.TABLES"
return self.query(cmd, **kwargs)

@override
def _table_exists(self, table, **kwargs):
logger.disabled = True
try:
Expand All @@ -61,9 +70,11 @@ def _table_exists(self, table, **kwargs):
finally:
logger.disabled = False

@override
def _table_drop(self, table, **kwargs):
raise NotImplementedError

@override
def _table_desc(self, table, **kwargs):
query = ("""
SELECT
Expand All @@ -78,8 +89,10 @@ def _table_desc(self, table, **kwargs):
WHERE TABLE_NAME = '{}'""").format(table)
return self.query(query, **kwargs)

@override
def _table_head(self, table, n=10, **kwargs):
return self.query("SELECT * FROM {} LIMIT {}".format(table, n), **kwargs)

@override
def _table_props(self, table, **kwargs):
raise NotImplementedError
16 changes: 16 additions & 0 deletions omniduct/databases/hiveserver2.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import time

import pandas as pd
from interface_meta import override
from jinja2 import Template

from omniduct.utils.debug import logger
Expand Down Expand Up @@ -51,6 +52,7 @@ class HiveServer2Client(DatabaseClient, SchemasMixin):
NAMESPACE_QUOTECHAR = '`'
NAMESPACE_SEPARATOR = '.'

@override
def _init(self, schema=None, driver='pyhive', auth_mechanism='NOSASL',
push_using_hive_cli=False, default_table_props=None, **connection_options):
"""
Expand Down Expand Up @@ -84,6 +86,7 @@ def _init(self, schema=None, driver='pyhive', auth_mechanism='NOSASL',

assert self.driver in ('pyhive', 'impyla'), "Supported drivers are pyhive and impyla."

@override
def _connect(self):
from sqlalchemy import create_engine, MetaData
if self.driver == 'pyhive':
Expand Down Expand Up @@ -132,9 +135,11 @@ def __hive_cursor(self):
self._connect()
return self.__hive.cursor()

@override
def _is_connected(self):
return self.__hive is not None

@override
def _disconnect(self):
logger.info('Disconnecting from Hive coordinator...')
try:
Expand All @@ -146,6 +151,7 @@ def _disconnect(self):
self._sqlalchemy_metadata = None
self._schemas = None

@override
def _statement_prepare(self, statement, session_properties, **kwargs):
return (
"\n".join(
Expand All @@ -154,6 +160,7 @@ def _statement_prepare(self, statement, session_properties, **kwargs):
) + statement
)

@override
def _execute(self, statement, cursor, wait, session_properties, poll_interval=1):
"""
Additional Args:
Expand Down Expand Up @@ -183,6 +190,7 @@ def _execute(self, statement, cursor, wait, session_properties, poll_interval=1)

return cursor

@override
def _cursor_empty(self, cursor):
if self.driver == 'impyla':
return not cursor.has_result_set
Expand Down Expand Up @@ -215,6 +223,7 @@ def _log_status(self, cursor, log_offset=0):

return len(log)

@override
def _query_to_table(self, statement, table, if_exists, **kwargs):
statements = []

Expand All @@ -231,6 +240,7 @@ def _query_to_table(self, statement, table, if_exists, **kwargs):
)
return self.execute(statement, **kwargs)

@override
def _dataframe_to_table(
self, df, table, if_exists='fail', use_hive_cli=None,
partition=None, sep=chr(1), table_props=None, dtype_overrides=None, **kwargs
Expand Down Expand Up @@ -389,11 +399,13 @@ def _dataframe_to_table(
partition="into {} of ".format(partition_clause) if partition_clause else ""
))

@override
def _table_list(self, namespace, like='*', **kwargs):
schema = namespace.name or self.schema
return self.query("SHOW TABLES IN {0} '{1}'".format(schema, like),
**kwargs)

@override
def _table_exists(self, table, **kwargs):
logger.disabled = True
try:
Expand All @@ -404,9 +416,11 @@ def _table_exists(self, table, **kwargs):
finally:
logger.disabled = False

@override
def _table_drop(self, table, **kwargs):
return self.execute("DROP TABLE {table}".format(table=table))

@override
def _table_desc(self, table, **kwargs):
records = self.query("DESCRIBE {0}".format(table), **kwargs)

Expand All @@ -424,9 +438,11 @@ def _table_desc(self, table, **kwargs):

return pd.concat((fields_df, partitions_df))

@override
def _table_head(self, table, n=10, **kwargs):
return self.query("SELECT * FROM {} LIMIT {}".format(table, n), **kwargs)

@override
def _table_props(self, table, **kwargs):
return self.query('SHOW TBLPROPERTIES {0}'.format(table), **kwargs)

Expand Down
13 changes: 13 additions & 0 deletions omniduct/databases/neo4j.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import absolute_import

from interface_meta import override

from omniduct.utils.debug import logger

from .base import DatabaseClient
Expand All @@ -19,19 +21,23 @@ class Neo4jClient(DatabaseClient):
def statement_cleanup(cls, statement):
    """Return ``statement`` untouched, because the base statement cleanup assumes SQL."""
    return statement

@override
def _init(self):
self.__driver = None

# Connection
@override
def _connect(self):
    """
    Create a Neo4J driver over the bolt protocol and store it on the client.

    The target is ``bolt://<host>:<port>``. Username/password authentication
    is supplied only when ``self.username`` is truthy; otherwise no auth is
    passed.
    """
    from neo4j.v1 import GraphDatabase
    logger.info('Connecting to Neo4J graph database ...')
    if self.username:
        auth = (self.username, self.password)
    else:
        auth = None
    uri = "bolt://{}:{}".format(self.host, self.port)
    # TODO: Add kerberos support
    self.__driver = GraphDatabase.driver(uri, auth=auth)

@override
def _is_connected(self):
return hasattr(self, '__driver') and self.__driver is not None

@override
def _disconnect(self):
logger.info('Disconnecting from Neo4J graph database ...')
try:
Expand All @@ -41,6 +47,7 @@ def _disconnect(self):
self.__driver = None

# Querying
@override
def _execute(self, statement, cursor, wait, session_properties):
with self.__driver.session() as session:
result = session.run(statement)
Expand All @@ -50,20 +57,26 @@ def _execute(self, statement, cursor, wait, session_properties):
result.fetchall = result.records
return result

@override
def _table_exists(self, table, **kwargs):
raise Exception('tables do not apply to the Neo4J graph database')

@override
def _table_drop(self, table, **kwargs):
raise Exception('tables do not apply to the Neo4J graph database')

@override
def _table_desc(self, table, **kwargs):
raise Exception('tables do not apply to the Neo4J graph database')

@override
def _table_head(self, table, n=10, **kwargs):
raise Exception('tables do not apply to the Neo4J graph database')

@override
def _table_list(self, namespace, **kwargs):
raise Exception('tables do not apply to the Neo4J graph database')

@override
def _table_props(self, table, **kwargs):
raise Exception('tables do not apply to the Neo4J graph database')
Loading

0 comments on commit c49b9eb

Please sign in to comment.