Skip to content

Commit

Permalink
Merge pull request #324 from DataDog/quentin.pierre/aws-lambda-decorator
Browse files Browse the repository at this point in the history
Add decorator for AWS lambda
  • Loading branch information
16pierre authored Nov 27, 2018
2 parents c7b519e + 212744b commit e813a88
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 3 deletions.
2 changes: 1 addition & 1 deletion datadog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# datadog
from datadog import api
from datadog.dogstatsd import DogStatsd, statsd # noqa
from datadog.threadstats import ThreadStats # noqa
from datadog.threadstats import ThreadStats, datadog_lambda_wrapper, lambda_metric # noqa
from datadog.util.compat import iteritems, NullHandler
from datadog.util.config import get_version
from datadog.util.hostname import get_hostname
Expand Down
1 change: 1 addition & 0 deletions datadog/threadstats/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from datadog.threadstats.base import ThreadStats # noqa
from datadog.threadstats.aws_lambda import lambda_metric, datadog_lambda_wrapper # noqa
72 changes: 72 additions & 0 deletions datadog/threadstats/aws_lambda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from datadog.threadstats import ThreadStats
from threading import Lock
from datadog import api
import os


"""
Usage:
from datadog import datadog_lambda_wrapper, lambda_metric
@datadog_lambda_wrapper
def my_lambda_handle(event, context):
lambda_metric("some_metric", 10)
"""


class _LambdaDecorator(object):
""" Decorator to automatically init & flush metrics, created for Lambda functions"""

# Number of opened wrappers, flush when 0
_counter = 0
_counter_lock = Lock()
_flush_lock = Lock()
_was_initialized = False

def __init__(self, func):
self.func = func

@classmethod
def _enter(cls):
with cls._counter_lock:
if not cls._was_initialized:
cls._was_initialized = True
api._api_key = os.environ.get('DATADOG_API_KEY')
api._api_host = os.environ.get('DATADOG_HOST', 'https://api.datadoghq.com')
cls._counter = cls._counter + 1

@classmethod
def _close(cls):
should_flush = False
with cls._counter_lock:
cls._counter = cls._counter - 1

# Flush only when all wrappers are closed
if cls._counter <= 0:
should_flush = True

if should_flush:
with cls._flush_lock:
# Don't flush if other wrappers were opened while _flush_lock was locked
with cls._counter_lock:
if cls._counter > 0:
should_flush = False
if should_flush:
_lambda_stats.flush(float("inf"))

def __call__(self, *args, **kw):
_LambdaDecorator._enter()
result = self.func(*args, **kw)
_LambdaDecorator._close()
return result


_lambda_stats = ThreadStats()
_lambda_stats.start(flush_in_greenlet=False, flush_in_thread=False)
datadog_lambda_wrapper = _LambdaDecorator


def lambda_metric(*args, **kw):
""" Alias to expose only distributions for lambda functions"""
_lambda_stats.distribution(*args, **kw)
47 changes: 47 additions & 0 deletions tests/performance/test_lambda_wrapper_thread_safety.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import time
# import unittest
import threading

from datadog import lambda_metric, datadog_lambda_wrapper
from datadog.threadstats.aws_lambda import _lambda_stats


TOTAL_NUMBER_OF_THREADS = 1000


class MemoryReporter(object):
""" A reporting class that reports to memory for testing. """

def __init__(self):
self.distributions = []
self.dist_flush_counter = 0

def flush_distributions(self, dists):
self.distributions += dists
self.dist_flush_counter = self.dist_flush_counter + 1


@datadog_lambda_wrapper
def wrapped_function(id):
lambda_metric("dist_" + str(id), 42)
# sleep makes the os continue another thread
time.sleep(0.001)

lambda_metric("common_dist", 42)


class TestWrapperThreadSafety(object):

def test_wrapper_thread_safety(self):
_lambda_stats.reporter = MemoryReporter()

for i in range(TOTAL_NUMBER_OF_THREADS):
threading.Thread(target=wrapped_function, args=[i]).start()
# Wait all threads to finish
time.sleep(10)

# Check that at least one flush happened
self.assertGreater(_lambda_stats.reporter.dist_flush_counter, 0)

dists = _lambda_stats.reporter.distributions
self.assertEqual(len(dists), TOTAL_NUMBER_OF_THREADS + 1)
43 changes: 41 additions & 2 deletions tests/unit/threadstats/test_threadstats.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
import nose.tools as nt

# datadog
from datadog import ThreadStats
from datadog import ThreadStats, lambda_metric, datadog_lambda_wrapper
from datadog.threadstats.aws_lambda import _lambda_stats
from tests.util.contextmanagers import preserve_environment_variable


# Silence the logger.
logger = logging.getLogger('dd.datadogpy')
logger.setLevel(logging.ERROR)
Expand All @@ -32,9 +32,11 @@ def __init__(self):
self.distributions = []
self.metrics = []
self.events = []
self.dist_flush_counter = 0

def flush_distributions(self, distributions):
self.distributions += distributions
self.dist_flush_counter = self.dist_flush_counter + 1

def flush_metrics(self, metrics):
self.metrics += metrics
Expand Down Expand Up @@ -740,3 +742,40 @@ def test_metric_type(self):
nt.assert_equal(cnt['type'], 'rate')
nt.assert_equal(max_['type'], 'gauge')
nt.assert_equal(min_['type'], 'gauge')


# Test lambda_wrapper (uses ThreadStats under the hood)

def test_basic_lambda_decorator(self):

@datadog_lambda_wrapper
def basic_wrapped_function():
lambda_metric("lambda.somemetric", 100)

_lambda_stats.reporter = self.reporter
basic_wrapped_function()

nt.assert_equal(_lambda_stats.reporter.dist_flush_counter, 1)
dists = self.sort_metrics(_lambda_stats.reporter.distributions)
nt.assert_equal(len(dists), 1)

def test_embedded_lambda_decorator(self):
"""
Test that the lambda decorator flushes metrics correctly and only once
"""

@datadog_lambda_wrapper
def wrapped_function_1():
lambda_metric("lambda.dist.1", 10)

@datadog_lambda_wrapper
def wrapped_function_2():
wrapped_function_1()
lambda_metric("lambda.dist.2", 30)

_lambda_stats.reporter = self.reporter
wrapped_function_2()
nt.assert_equal(_lambda_stats.reporter.dist_flush_counter, 1)

dists = self.sort_metrics(_lambda_stats.reporter.distributions)
nt.assert_equal(len(dists), 2)

0 comments on commit e813a88

Please sign in to comment.