|
| 1 | +import gzip |
1 | 2 | import os |
2 | 3 | import time |
3 | 4 |
|
4 | 5 | import mock |
| 6 | +import msgpack |
| 7 | +import pytest |
5 | 8 |
|
6 | 9 | from ddtrace.internal.datastreams.processor import PROPAGATION_KEY |
7 | 10 | from ddtrace.internal.datastreams.processor import PROPAGATION_KEY_BASE_64 |
|
15 | 18 | mocked_time = 1642544540 |
16 | 19 |
|
17 | 20 |
|
| 21 | +def _decode_datastreams_payload(payload): |
| 22 | + decompressed = gzip.decompress(payload) |
| 23 | + decoded = msgpack.unpackb(decompressed, raw=False, strict_map_key=False) |
| 24 | + |
| 25 | + return decoded |
| 26 | + |
| 27 | + |
| 28 | +def test_periodic_payload_tags(): |
| 29 | + processor = DataStreamsProcessor("http://localhost:8126") |
| 30 | + try: |
| 31 | + captured_payloads = [] |
| 32 | + with mock.patch.object(processor, "_flush_stats_with_backoff", side_effect=captured_payloads.append): |
| 33 | + processor.on_checkpoint_creation(1, 2, ["direction:out", "topic:topicA", "type:kafka"], mocked_time, 1, 1) |
| 34 | + processor.periodic() |
| 35 | + |
| 36 | + assert captured_payloads, "expected periodic to send a payload" |
| 37 | + decoded = _decode_datastreams_payload(captured_payloads[0]) |
| 38 | + assert decoded["Service"] == processor._service |
| 39 | + assert decoded["TracerVersion"] == processor._version |
| 40 | + assert decoded["Lang"] == "python" |
| 41 | + assert decoded["Hostname"] == processor._hostname |
| 42 | + assert "ProcessTags" not in decoded |
| 43 | + finally: |
| 44 | + processor.stop() |
| 45 | + processor.join() |
| 46 | + |
| 47 | + |
| 48 | +@pytest.mark.subprocess( |
| 49 | + env=dict( |
| 50 | + DD_EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED="true", |
| 51 | + ) |
| 52 | +) |
| 53 | +def test_periodic_payload_process_tags(): |
| 54 | + import mock |
| 55 | + |
| 56 | + from ddtrace.internal.datastreams.processor import DataStreamsProcessor |
| 57 | + from tests.datastreams.test_processor import _decode_datastreams_payload |
| 58 | + |
| 59 | + processor = DataStreamsProcessor("http://localhost:8126") |
| 60 | + try: |
| 61 | + captured_payloads = [] |
| 62 | + with mock.patch.object(processor, "_flush_stats_with_backoff", side_effect=captured_payloads.append): |
| 63 | + processor.on_checkpoint_creation(1, 2, ["direction:out", "topic:topicA", "type:kafka"], 1642544540, 1, 1) |
| 64 | + processor.periodic() |
| 65 | + |
| 66 | + assert captured_payloads, "expected periodic to send a payload" |
| 67 | + decoded = _decode_datastreams_payload(captured_payloads[0]) |
| 68 | + assert decoded["Service"] == processor._service |
| 69 | + assert decoded["TracerVersion"] == processor._version |
| 70 | + assert decoded["Lang"] == "python" |
| 71 | + assert decoded["Hostname"] == processor._hostname |
| 72 | + assert "ProcessTags" in decoded |
| 73 | + finally: |
| 74 | + processor.stop() |
| 75 | + processor.join() |
| 76 | + |
| 77 | + |
18 | 78 | def test_data_streams_processor(): |
19 | 79 | now = time.time() |
20 | 80 | processor.on_checkpoint_creation(1, 2, ["direction:out", "topic:topicA", "type:kafka"], now, 1, 1) |
|
0 commit comments