Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add redis database duct #59

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions omniduct/databases/cursor_formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,10 @@ def init(self):
# Convert null values to '\N'.
def prepare_row(self, row):
return [r'\N' if v is None else str(v).replace('\t', r'\t') for v in row]


class SingletonCursorFormatter(RawCursorFormatter):

def format_dump(self, data):
assert len(data) == 1, 'singleton cursor formatter expects data of length 1'
return data[0]
69 changes: 69 additions & 0 deletions omniduct/databases/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from __future__ import absolute_import

import redis
from omniduct.utils.debug import logger

from .base import DatabaseClient


class RedisCursor(object):

def __init__(self, result):
self.result = result

def close(self):
pass

def fetchall(self):
yield self.result


class RedisClient(DatabaseClient):
"""
This Duct connects to a redis database server using the `redis` python
library.
"""

PROTOCOLS = ['redis']
DEFAULT_PORT = 6379
DEFAULT_CURSOR_FORMATTER = 'singleton'

@classmethod
def statement_cleanup(cls, statement):
return statement # base statement cleanup assumes SQL

def _init(self):
self.__redis_connection = None

# Connection
def _connect(self):
self.__redis_connection = redis.Redis(
self.host,
self.port,
)

def _is_connected(self):
return hasattr(self, '__redis_connection') and self.__redis_connection is not None

def _disconnect(self):
logger.info('Disconnecting from Redis database ...')
self.__redis_connection = None

# Querying
def _execute(self, statement, cursor=None, asynchronous=False):
return RedisCursor(self.__redis_connection.execute_command(statement))

def _table_exists(self, table, schema=None):
raise Exception('tables do not apply to the Redis database')

def _table_desc(self, table, **kwargs):
raise Exception('tables do not apply to the Redis database')

def _table_head(self, table, n=10, **kwargs):
raise Exception('tables do not apply to the Redis database')

def _table_list(self, table, schema=None):
raise Exception('tables do not apply to the Redis database')

def _table_props(self, table, **kwargs):
raise Exception('tables do not apply to the Redis database')
1 change: 1 addition & 0 deletions omniduct/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .databases.neo4j import Neo4jClient
from .databases.presto import PrestoClient
from .databases.pyspark import PySparkClient
from .databases.redis import RedisClient
from .databases.sqlalchemy import SQLAlchemyClient
from .filesystems.local import LocalFsClient
from .filesystems.s3 import S3Client
Expand Down