record_batch_compose.py
#!/usr/bin/env python3
from __future__ import print_function
import hashlib
import itertools
import os
import random

import perf

from kafka.record.memory_records import MemoryRecordsBuilder

DEFAULT_BATCH_SIZE = 1600 * 1024
KEY_SIZE = 6
VALUE_SIZE = 60
TIMESTAMP_RANGE = [1505824130000, 1505824140000]

# With the values above a v1 record is 100 bytes, so 100 messages come to
# roughly 10 000 bytes per batch.
MESSAGES_PER_BATCH = 100


def random_bytes(length):
    buffer = bytearray(length)
    for i in range(length):
        buffer[i] = random.randint(0, 255)
    return bytes(buffer)


def prepare():
    # Precompute a pool of (key, value, timestamp) samples and cycle over it,
    # so the timed loop gets varied input without paying for random generation.
    return iter(itertools.cycle([
        (random_bytes(KEY_SIZE),
         random_bytes(VALUE_SIZE),
         random.randint(*TIMESTAMP_RANGE)
         )
        for _ in range(int(MESSAGES_PER_BATCH * 1.94))
    ]))


def finalize(results):
    # Consume the results (hash them and write to /dev/null) so that PyPy
    # actually executes the main code instead of optimizing it away.
    hash_val = hashlib.md5()
    for buf in results:
        hash_val.update(buf)
    print(hash_val, file=open(os.devnull, "w"))


def func(loops, magic):
    # The JIT can optimize the whole function away if the result is the same
    # every time, so feed it randomized (but precomputed) input data.
    precomputed_samples = prepare()
    results = []

    # Main benchmark code: time only the append/close path.
    t0 = perf.perf_counter()
    for _ in range(loops):
        batch = MemoryRecordsBuilder(
            magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0)
        for _ in range(MESSAGES_PER_BATCH):
            key, value, timestamp = next(precomputed_samples)
            size = batch.append(
                timestamp=timestamp, key=key, value=value)
            assert size
        batch.close()
        results.append(batch.buffer())
    res = perf.perf_counter() - t0

    finalize(results)
    return res


runner = perf.Runner()
runner.bench_time_func('batch_append_v0', func, 0)
runner.bench_time_func('batch_append_v1', func, 1)
runner.bench_time_func('batch_append_v2', func, 2)
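
Running the script directly hands control to perf.Runner, which parses its own command-line options, spawns worker processes, calibrates the loop count passed to func as loops, and reports timings for the v0, v1 and v2 benchmarks (the perf package has since been renamed pyperf; the available options depend on the installed version).

For a quick sanity check of what the builder produces, the buffer can be parsed back with kafka-python's MemoryRecords reader. This is an illustrative sketch, not part of the benchmark, and it assumes the MemoryRecords.has_next() / next_batch() interface of recent kafka-python versions:

from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder

# Build one small v1 batch of 10 records.
builder = MemoryRecordsBuilder(magic=1, compression_type=0, batch_size=16 * 1024)
for i in range(10):
    builder.append(timestamp=1505824130000 + i, key=b"key", value=b"value")
builder.close()

# Parse the serialized buffer back and count the records we just appended.
records = MemoryRecords(builder.buffer())
count = 0
while records.has_next():
    for record in records.next_batch():
        assert record.value == b"value"
        count += 1
assert count == 10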