Skip to content

Commit

Permalink
Minimum viable product
Browse files Browse the repository at this point in the history
  • Loading branch information
horkhe committed Jul 11, 2018
1 parent a1bb4e9 commit 6d2be4d
Show file tree
Hide file tree
Showing 27 changed files with 7,476 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,6 @@ venv.bak/

# mypy
.mypy_cache/

.idea
tests/.workspace/
24 changes: 24 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
language: python
python:
- 2.7
- 3.6
env:
global:
ETCD3_VERSION=v3.2.23

before_install:
- ./scripts/etcd_start.sh
- ./vagrant/etcd_init.sh

install:
- pip install -e .
- pip install nose coverage coveralls

script:
- nosetests --with-coverage --cover-package=etcd3

after_success:
- coveralls

after_script:
- ./script/etcd_stop.sh
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
# etcd3-slim

[![Build Status](https://travis-ci.org/mailgun/etcd3-slim.svg?branch=master)](https://travis-ci.org/mailgun/etcd3-slim)
[![Coverage Status](https://coveralls.io/repos/mailgun/etcd3-slim/badge.svg?branch=master&service=github)](https://coveralls.io/github/mailgun/etcd3-slim?branch=master)

Thin wrapper around Etcd3 gRPC stubs
30 changes: 30 additions & 0 deletions etcd3/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from __future__ import absolute_import

import os

from etcd3._client import Client

ENV_ETCD3_CA = 'ETCD3_CA'
ENV_ETCD3_ENDPOINT = 'ETCD3_ENDPOINT'
ENV_ETCD3_PASSWORD = 'ETCD3_PASSWORD'
ENV_ETCD3_USER = 'ETCD3_USER'


__all__ = [
'Client'
]

_clt = None


def client():
global _clt
if _clt:
return _clt

endpoint = os.getenv(ENV_ETCD3_ENDPOINT, '127.0.0.1:2379')
user = os.getenv(ENV_ETCD3_USER)
password = os.getenv(ENV_ETCD3_PASSWORD)
cert_ca = os.getenv(ENV_ETCD3_CA)
_clt = Client(endpoint, user, password, cert_ca=cert_ca)
return _clt
167 changes: 167 additions & 0 deletions etcd3/_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
from __future__ import absolute_import

import logging
from threading import Lock

import grpc

from etcd3 import _utils
from etcd3._grpc_stubs.rpc_pb2 import (AuthStub, AuthenticateRequest,
DeleteRangeRequest, KVStub,
LeaseGrantRequest,
LeaseRevokeRequest, LeaseStub,
PutRequest, RangeRequest,
WatchStub)
from etcd3._keep_aliver import KeepAliver
from etcd3._watcher import Watcher

_DEFAULT_ETCD_ENDPOINT = '127.0.0.1:23790'
_DEFAULT_REQUEST_TIMEOUT = 1 # Seconds


_log = logging.getLogger(__name__)


def _reconnect(f):

def wrapper(*args, **kwargs):
etcd3_clt = args[0]
assert isinstance(etcd3_clt, Client)
etcd3_clt._ensure_grpc_channel()
try:
return f(*args, **kwargs)

except Exception:
etcd3_clt._close_grpc_channel()
raise

return wrapper


class Client(object):

def __init__(self, endpoint, user, password, cert_ca=None, timeout=None):
self._endpoint = endpoint
self._user = user
self._password = password
self._ssl_creds = grpc.ssl_channel_credentials(_read_file(cert_ca))
self._timeout = timeout or _DEFAULT_REQUEST_TIMEOUT

self._grpc_channel_mu = Lock()
self._grpc_channel = None
self._kv_stub = None
self._watch_stub = None
self._lease_stub = None

@_reconnect
def get(self, key, is_prefix=False):
rq = RangeRequest(key=_utils.to_bytes(key))
if is_prefix:
rq.range_end = _utils.range_end(rq.key)

return self._kv_stub.Range(rq, timeout=self._timeout)

def get_value(self, key):
"""
Convenience wrapper around `get`. It returns value only, or None if the
key does not exist.
"""
rs = self.get(key)
if rs.count == 0:
return None

return rs.kvs[0].value

@_reconnect
def put(self, key, val, lease_id=None):
rq = PutRequest(key=_utils.to_bytes(key),
value=_utils.to_bytes(val),
lease=lease_id)
return self._kv_stub.Put(rq, timeout=self._timeout)

@_reconnect
def delete(self, key, is_prefix=False):
rq = DeleteRangeRequest(key=_utils.to_bytes(key))
if is_prefix:
rq.range_end = _utils.range_end(rq.key)

return self._kv_stub.DeleteRange(rq, timeout=self._timeout)

@_reconnect
def lease_grant(self, ttl):
rq = LeaseGrantRequest(TTL=ttl)
return self._lease_stub.LeaseGrant(rq, timeout=self._timeout)

@_reconnect
def lease_revoke(self, lease_id):
rq = LeaseRevokeRequest(ID=lease_id)
return self._lease_stub.LeaseRevoke(rq, timeout=self._timeout)

def new_watcher(self, key, event_handler, is_prefix=False,
start_revision=0, spin_pause=None):
return Watcher(
self, key, event_handler, is_prefix, start_revision, spin_pause)

def new_keep_aliver(self, key, value, ttl, spin_pause=None):
return KeepAliver(self, key, value, ttl, spin_pause)

def _ensure_grpc_channel(self):
with self._grpc_channel_mu:
if self._grpc_channel:
return

self._grpc_channel = self._dial()
self._kv_stub = KVStub(self._grpc_channel)
self._watch_stub = WatchStub(self._grpc_channel)
self._lease_stub = LeaseStub(self._grpc_channel)

def _close_grpc_channel(self):
with self._grpc_channel_mu:
if not self._grpc_channel:
return
try:
self._grpc_channel.close()
except Exception:
_log.exception('Failed to close Etcd client gRPC channel')

self._grpc_channel = None

def _dial(self):
token = self._authenticate()
token_plugin = _TokenAuthMetadataPlugin(token)
token_creds = grpc.metadata_call_credentials(token_plugin)
creds = grpc.composite_channel_credentials(self._ssl_creds,
token_creds)
return grpc.secure_channel(self._endpoint, creds)

def _authenticate(self):
grpc_channel = grpc.secure_channel(self._endpoint, self._ssl_creds)
try:
auth_stub = AuthStub(grpc_channel)
rq = AuthenticateRequest(
name=self._user,
password=self._password
)
rs = auth_stub.Authenticate(rq, timeout=self._timeout)
return rs.token

finally:
grpc_channel.close()


class _TokenAuthMetadataPlugin(grpc.AuthMetadataPlugin):

def __init__(self, token):
self._token = token

def __call__(self, context, callback):
metadata = (('token', self._token),)
callback(metadata, None)


def _read_file(filename):
if not filename:
return None

with open(filename, 'rb') as f:
return f.read()
101 changes: 101 additions & 0 deletions etcd3/_grpc_bd_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from __future__ import absolute_import

import logging
import threading

from six.moves import queue

_log = logging.getLogger(__name__)


class GrpcBDStream(object):

def __init__(self, name, make_stream, buf_size=2):
self._name = name
self._requests = queue.Queue(maxsize=buf_size)
self._responses = queue.Queue(maxsize=buf_size)
self._response_iter = make_stream(self._request_iter())
self._thread = threading.Thread(name=name, target=self._run)
self._thread.daemon = True
self._thread.start()
self._closed_mu = threading.Lock()
self._closed = False

def send(self, rq, timeout=None):
try:
self._requests.put(rq, timeout=timeout)

except queue.Full:
raise RuntimeError('%s request submit timeout' % (self._name,))

def recv(self, timeout=None):
try:
rs = self._responses.get(timeout=timeout)
if rs is None:
with self._closed_mu:
self._closed = True

raise RuntimeError('%s closed by server' % (self._name,))

if isinstance(rs, Exception):
with self._closed_mu:
self._closed = True

raise rs

return rs

except queue.Empty:
return None

def close(self, timeout):
with self._closed_mu:
if self._closed:
return
try:
self._requests.put(None, timeout=timeout)
except queue.Full:
_log.error('[%s] timed out on close request', self._name)
return

try:
# Drain unhandled responses
while True:
rs = self._responses.get(timeout=timeout)
if rs is None:
break

if isinstance(rs, Exception):
_log.info('[%s] error discarded: %s', self._name, rs)
break

_log.info('[%s] response discarded: %s', self._name, rs)

except queue.Empty:
_log.error('[%s] timed out draining responses', self._name)

with self._closed_mu:
self._closed = True

def _request_iter(self):
while True:
rq = self._requests.get()
if rq is None:
# FIXME: Raising a RuntimeException here is the only sure way
# FIXME: to make gRCP terminate the response iterator.
raise RuntimeError('%s terminating' % (self._name,))

yield rq

def _run(self):
_log.info('[%s] thread started', self._name)
try:
for rs in self._response_iter:
self._responses.put(rs)

self._responses.put(None)

except Exception as err:
self._responses.put(err)

_log.info('[%s] thread stopped', self._name)
Empty file added etcd3/_grpc_stubs/__init__.py
Empty file.
Loading

0 comments on commit 6d2be4d

Please sign in to comment.