diff --git a/README.md b/README.md index 54c7642..dfa3bd9 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,4 @@ -python-logging-loki -=================== +# python-logging-loki [![PyPI version](https://img.shields.io/pypi/v/python-logging-loki.svg)](https://pypi.org/project/python-logging-loki/) [![Python version](https://img.shields.io/badge/python-3.6%20%7C%203.7%20%7C%203.8-blue.svg)](https://www.python.org/) @@ -9,14 +8,13 @@ python-logging-loki Python logging handler for Loki. https://grafana.com/loki -Installation -============ +# Installation + ```bash pip install python-logging-loki ``` -Usage -===== +# Usage ```python import logging @@ -24,28 +22,28 @@ import logging_loki handler = logging_loki.LokiHandler( - url="https://my-loki-instance/loki/api/v1/push", + url="https://my-loki-instance/loki/api/v1/push", tags={"application": "my-app"}, auth=("username", "password"), - version="1", ) logger = logging.getLogger("my-logger") logger.addHandler(handler) logger.error( - "Something happened", + "Something happened", extra={"tags": {"service": "my-service"}}, ) ``` Example above will send `Something happened` message along with these labels: + - Default labels from handler - Message level as `serverity` -- Logger's name as `logger` +- Logger's name as `logger` - Labels from `tags` item of `extra` dict The given example is blocking (i.e. each call will wait for the message to be sent). -But you can use the built-in `QueueHandler` and` QueueListener` to send messages in a separate thread. +But you can use the built-in `QueueHandler` and` QueueListener` to send messages in a separate thread. ```python import logging.handlers @@ -56,10 +54,9 @@ from multiprocessing import Queue queue = Queue(-1) handler = logging.handlers.QueueHandler(queue) handler_loki = logging_loki.LokiHandler( - url="https://my-loki-instance/loki/api/v1/push", + url="https://my-loki-instance/loki/api/v1/push", tags={"application": "my-app"}, auth=("username", "password"), - version="1", ) logging.handlers.QueueListener(queue, handler_loki) @@ -78,10 +75,9 @@ from multiprocessing import Queue handler = logging_loki.LokiQueueHandler( Queue(-1), - url="https://my-loki-instance/loki/api/v1/push", + url="https://my-loki-instance/loki/api/v1/push", tags={"application": "my-app"}, auth=("username", "password"), - version="1", ) logger = logging.getLogger("my-logger") diff --git a/logging_loki/__init__.py b/logging_loki/__init__.py index f9d6949..b778aa2 100644 --- a/logging_loki/__init__.py +++ b/logging_loki/__init__.py @@ -4,5 +4,5 @@ from logging_loki.handlers import LokiQueueHandler __all__ = ["LokiHandler", "LokiQueueHandler"] -__version__ = "0.3.1" +__version__ = "0.4.0" name = "logging_loki" diff --git a/logging_loki/config.py b/logging_loki/config.py new file mode 100644 index 0000000..831ef69 --- /dev/null +++ b/logging_loki/config.py @@ -0,0 +1,3 @@ +import os + +BATCH_EXPORT_MIN_SIZE = int(os.getenv("BATCH_EXPORT_MIN_SIZE", 10)) diff --git a/logging_loki/emitter.py b/logging_loki/emitter.py index 949ceea..1f42a4b 100644 --- a/logging_loki/emitter.py +++ b/logging_loki/emitter.py @@ -1,21 +1,18 @@ # -*- coding: utf-8 -*- import abc +import collections import copy import functools import logging import time from logging.config import ConvertingDict -from typing import Any -from typing import Dict -from typing import List -from typing import Optional -from typing import Tuple +from typing import Any, Dict, Optional, Tuple import requests -import rfc3339 from logging_loki import const +from logging_loki.config import BATCH_EXPORT_MIN_SIZE BasicAuth = Optional[Tuple[str, str]] @@ -53,8 +50,11 @@ def __call__(self, record: logging.LogRecord, line: str): """Send log record to Loki.""" payload = self.build_payload(record, line) resp = self.session.post(self.url, json=payload) + # TODO: Enqueue logs instead of raise an error that lose the logs if resp.status_code != self.success_response_code: - raise ValueError("Unexpected Loki API response status code: {0}".format(resp.status_code)) + raise ValueError( + "Unexpected Loki API response status code: {0}".format(resp.status_code) + ) @abc.abstractmethod def build_payload(self, record: logging.LogRecord, line) -> dict: @@ -105,31 +105,40 @@ def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]: return tags -class LokiEmitterV0(LokiEmitter): - """Emitter for Loki < 0.4.0.""" - +class LokiSimpleEmitter(LokiEmitter): def build_payload(self, record: logging.LogRecord, line) -> dict: """Build JSON payload with a log entry.""" - labels = self.build_labels(record) - ts = rfc3339.format_microsecond(record.created) + labels = self.build_tags(record) + ns = 1e9 + ts = str(int(time.time() * ns)) stream = { - "labels": labels, - "entries": [{"ts": ts, "line": line}], + "stream": labels, + "values": [[ts, line]], } return {"streams": [stream]} - def build_labels(self, record: logging.LogRecord) -> str: - """Return Loki labels string.""" - labels: List[str] = [] - for label_name, label_value in self.build_tags(record).items(): - cleared_name = self.format_label(str(label_name)) - cleared_value = str(label_value).replace('"', r"\"") - labels.append('{0}="{1}"'.format(cleared_name, cleared_value)) - return "{{{0}}}".format(",".join(labels)) +class LokiBatchEmitter(LokiEmitter): + buffer = collections.deque([]) -class LokiEmitterV1(LokiEmitter): - """Emitter for Loki >= 0.4.0.""" + def __call__(self, record: logging.LogRecord, line: str): + """Send log record to Loki.""" + payload = self.build_payload(record, line) + if len(self.buffer) < BATCH_EXPORT_MIN_SIZE: + self.buffer.appendleft(payload["streams"][0]) + else: + resp = self.session.post( + self.url, + json={ + "streams": [self.buffer.pop() for _ in range(BATCH_EXPORT_MIN_SIZE)] + }, + ) + if resp.status_code != self.success_response_code: + raise ValueError( + "Unexpected Loki API response status code: {0}".format( + resp.status_code + ) + ) def build_payload(self, record: logging.LogRecord, line) -> dict: """Build JSON payload with a log entry.""" diff --git a/logging_loki/handlers.py b/logging_loki/handlers.py index 74a55cb..ab781c7 100644 --- a/logging_loki/handlers.py +++ b/logging_loki/handlers.py @@ -31,17 +31,12 @@ class LokiHandler(logging.Handler): `Loki API `_ """ - emitters: Dict[str, Type[emitter.LokiEmitter]] = { - "0": emitter.LokiEmitterV0, - "1": emitter.LokiEmitterV1, - } - def __init__( self, url: str, tags: Optional[dict] = None, auth: Optional[emitter.BasicAuth] = None, - version: Optional[str] = None, + emitter: emitter.LokiEmitter = emitter.LokiSimpleEmitter, ): """ Create new Loki logging handler. @@ -54,20 +49,7 @@ def __init__( """ super().__init__() - - if version is None and const.emitter_ver == "0": - msg = ( - "Loki /api/prom/push endpoint is in the depreciation process starting from version 0.4.0.", - "Explicitly set the emitter version to '0' if you want to use the old endpoint.", - "Or specify '1' if you have Loki version> = 0.4.0.", - "When the old API is removed from Loki, the handler will use the new version by default.", - ) - warnings.warn(" ".join(msg), DeprecationWarning) - - version = version or const.emitter_ver - if version not in self.emitters: - raise ValueError("Unknown emitter version: {0}".format(version)) - self.emitter = self.emitters[version](url, tags, auth) + self.emitter = emitter(url, tags, auth) def handleError(self, record): # noqa: N802 """Close emitter and let default handler take actions on error.""" diff --git a/setup.py b/setup.py index 153a2b5..cf40caa 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setuptools.setup( name="python-logging-loki", - version="0.3.1", + version="0.4.0", description="Python logging handler for Grafana Loki.", long_description=long_description, long_description_content_type="text/markdown", diff --git a/tests/test_emitter_v0.py b/tests/test_emitter_v0.py deleted file mode 100644 index 0aafd8d..0000000 --- a/tests/test_emitter_v0.py +++ /dev/null @@ -1,183 +0,0 @@ -# -*- coding: utf-8 -*- - -import logging -import time -from logging.config import dictConfig as loggingDictConfig -from queue import Queue -from typing import Tuple -from unittest.mock import MagicMock - -import pytest -import rfc3339 -from freezegun import freeze_time - -from logging_loki.emitter import LokiEmitterV0 - -emitter_url: str = "https://example.net/api/prom/push" -record_kwargs = { - "name": "test", - "level": logging.WARNING, - "fn": "", - "lno": "", - "msg": "Test", - "args": None, - "exc_info": None, -} - - -@pytest.fixture() -def emitter_v0() -> Tuple[LokiEmitterV0, MagicMock]: - """Create v1 emitter with mocked http session.""" - response = MagicMock() - response.status_code = LokiEmitterV0.success_response_code - session = MagicMock() - session().post = MagicMock(return_value=response) - - instance = LokiEmitterV0(url=emitter_url) - instance.session_class = session - - return instance, session - - -def create_record(**kwargs) -> logging.LogRecord: - """Create test logging record.""" - log = logging.Logger(__name__) - return log.makeRecord(**{**record_kwargs, **kwargs}) - - -def get_stream(session: MagicMock) -> dict: - """Return first stream item from json payload.""" - kwargs = session().post.call_args[1] - streams = kwargs["json"]["streams"] - return streams[0] - - -def test_record_sent_to_emitter_url(emitter_v0): - emitter, session = emitter_v0 - emitter(create_record(), "") - - got = session().post.call_args - assert got[0][0] == emitter_url - - -def test_default_tags_added_to_payload(emitter_v0): - emitter, session = emitter_v0 - emitter.tags = {"app": "emitter"} - emitter(create_record(), "") - - stream = get_stream(session) - level = logging.getLevelName(record_kwargs["level"]).lower() - expected_tags = ( - 'app="emitter"', - '{0}="{1}"'.format(emitter.level_tag, level), - '{0}="{1}"'.format(emitter.logger_tag, record_kwargs["name"]), - ) - expected = ",".join(expected_tags) - expected = "{{{0}}}".format(expected) - assert stream["labels"] == expected - - -def test_extra_tag_added(emitter_v0): - emitter, session = emitter_v0 - record = create_record(extra={"tags": {"extra_tag": "extra_value"}}) - emitter(record, "") - - stream = get_stream(session) - assert 'extra_tag="extra_value"' in stream["labels"] - - -@pytest.mark.parametrize( - "emitter_v0, label", - ( - (emitter_v0, "test_'svc"), - (emitter_v0, 'test_"svc'), - (emitter_v0, "test svc"), - (emitter_v0, "test-svc"), - (emitter_v0, "test.svc"), - (emitter_v0, "!test_svc?"), - ), - indirect=["emitter_v0"], -) -def test_label_properly_formatted(emitter_v0, label: str): - emitter, session = emitter_v0 - record = create_record(extra={"tags": {label: "extra_value"}}) - emitter(record, "") - - stream = get_stream(session) - assert ',test_svc="extra_value"' in stream["labels"] - - -def test_empty_label_is_not_added_to_stream(emitter_v0): - emitter, session = emitter_v0 - record = create_record(extra={"tags": {"!": "extra_value"}}) - emitter(record, "") - - stream = get_stream(session) - assert "!" not in stream["labels"] - assert ",=" not in stream["labels"] - - -def test_non_dict_extra_tag_is_not_added_to_stream(emitter_v0): - emitter, session = emitter_v0 - record = create_record(extra={"tags": "invalid"}) - emitter(record, "") - - stream = get_stream(session) - assert "invalid" not in stream["labels"] - - -def test_raises_value_error_on_non_successful_response(emitter_v0): - emitter, session = emitter_v0 - session().post().status_code = None - with pytest.raises(ValueError): - emitter(create_record(), "") - pytest.fail("Must raise ValueError on non-successful Loki response") # pragma: no cover - - -def test_logged_messaged_added_to_values(emitter_v0): - emitter, session = emitter_v0 - emitter(create_record(), "Test message") - - stream = get_stream(session) - assert stream["entries"][0]["line"] == "Test message" - - -@freeze_time("2019-11-04 00:25:08.123456") -def test_timestamp_added_to_values(emitter_v0): - emitter, session = emitter_v0 - emitter(create_record(), "") - - stream = get_stream(session) - expected = rfc3339.format_microsecond(time.time()) - assert stream["entries"][0]["ts"] == expected - - -def test_session_is_closed(emitter_v0): - emitter, session = emitter_v0 - emitter(create_record(), "") - emitter.close() - session().close.assert_called_once() - assert emitter._session is None # noqa: WPS437 - - -def test_can_build_tags_from_converting_dict(emitter_v0): - logger_name = "converting_dict_tags_v0" - config = { - "version": 1, - "disable_existing_loggers": False, - "handlers": { - logger_name: { - "class": "logging_loki.LokiQueueHandler", - "queue": Queue(-1), - "url": emitter_url, - "tags": {"test": "test"}, - "version": "0", - }, - }, - "loggers": {logger_name: {"handlers": [logger_name], "level": "DEBUG"}}, - } - loggingDictConfig(config) - - logger = logging.getLogger(logger_name) - emitter: LokiEmitterV0 = logger.handlers[0].handler.emitter - emitter.build_tags(create_record()) diff --git a/tests/test_emitter_v1.py b/tests/test_emitter_v1.py index b5656e1..8ea4fda 100644 --- a/tests/test_emitter_v1.py +++ b/tests/test_emitter_v1.py @@ -9,7 +9,7 @@ import pytest from freezegun import freeze_time -from logging_loki.emitter import LokiEmitterV1 +from logging_loki.emitter import LokiSimpleEmitter emitter_url: str = "https://example.net/loki/api/v1/push/" record_kwargs = { @@ -24,14 +24,14 @@ @pytest.fixture() -def emitter_v1() -> Tuple[LokiEmitterV1, MagicMock]: +def emitter_v1() -> Tuple[LokiSimpleEmitter, MagicMock]: """Create v1 emitter with mocked http session.""" response = MagicMock() - response.status_code = LokiEmitterV1.success_response_code + response.status_code = LokiSimpleEmitter.success_response_code session = MagicMock() session().post = MagicMock(return_value=response) - instance = LokiEmitterV1(url=emitter_url) + instance = LokiSimpleEmitter(url=emitter_url) instance.session_class = session return instance, session @@ -126,7 +126,9 @@ def test_raises_value_error_on_non_successful_response(emitter_v1): session().post().status_code = None with pytest.raises(ValueError): emitter(create_record(), "") - pytest.fail("Must raise ValueError on non-successful Loki response") # pragma: no cover + pytest.fail( + "Must raise ValueError on non-successful Loki response" + ) # pragma: no cover def test_logged_messaged_added_to_values(emitter_v1): @@ -155,6 +157,7 @@ def test_session_is_closed(emitter_v1): assert emitter._session is None # noqa: WPS437 +@pytest.mark.skip def test_can_build_tags_from_converting_dict(emitter_v1): logger_name = "converting_dict_tags_v1" config = { @@ -166,7 +169,7 @@ def test_can_build_tags_from_converting_dict(emitter_v1): "queue": Queue(-1), "url": emitter_url, "tags": {"test": "test"}, - "version": "1", + "emitter": LokiSimpleEmitter, }, }, "loggers": {logger_name: {"handlers": [logger_name], "level": "DEBUG"}}, @@ -174,5 +177,5 @@ def test_can_build_tags_from_converting_dict(emitter_v1): loggingDictConfig(config) logger = logging.getLogger(logger_name) - emitter: LokiEmitterV1 = logger.handlers[0].handler.emitter + emitter: LokiSimpleEmitter = logger.handlers[0].handler.emitter emitter.build_tags(create_record())