Skip to content

Commit 5a0e971

Browse files
Mephiusdpkp
authored andcommitted
Fixed couple of "leaks" when gc is disabled (#979)
1 parent 82d50f4 commit 5a0e971

File tree

4 files changed

+27
-16
lines changed

4 files changed

+27
-16
lines changed

kafka/protocol/legacy.py

+17-12
Original file line numberDiff line numberDiff line change
@@ -133,21 +133,26 @@ def encode_produce_request(cls, payloads=(), acks=1, timeout=1000):
133133
if acks not in (1, 0, -1):
134134
raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)
135135

136+
topics = []
137+
for topic, topic_payloads in group_by_topic_and_partition(payloads).items():
138+
topic_msgs = []
139+
for partition, payload in topic_payloads.items():
140+
partition_msgs = []
141+
for msg in payload.messages:
142+
m = kafka.protocol.message.Message(
143+
msg.value, key=msg.key,
144+
magic=msg.magic, attributes=msg.attributes
145+
)
146+
partition_msgs.append((0, m.encode()))
147+
topic_msgs.append((partition, partition_msgs))
148+
topics.append((topic, topic_msgs))
149+
150+
136151
return kafka.protocol.produce.ProduceRequest[0](
137152
required_acks=acks,
138153
timeout=timeout,
139-
topics=[(
140-
topic,
141-
[(
142-
partition,
143-
[(0,
144-
kafka.protocol.message.Message(
145-
msg.value, key=msg.key,
146-
magic=msg.magic, attributes=msg.attributes
147-
).encode())
148-
for msg in payload.messages])
149-
for partition, payload in topic_payloads.items()])
150-
for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
154+
topics=topics
155+
)
151156

152157
@classmethod
153158
def decode_produce_response(cls, response):

kafka/protocol/message.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from .types import (
1111
Int8, Int32, Int64, Bytes, Schema, AbstractType
1212
)
13-
from ..util import crc32
13+
from ..util import crc32, WeakMethod
1414

1515

1616
class Message(Struct):
@@ -52,7 +52,7 @@ def __init__(self, value, key=None, magic=0, attributes=0, crc=0,
5252
self.attributes = attributes
5353
self.key = key
5454
self.value = value
55-
self.encode = self._encode_self
55+
self.encode = WeakMethod(self._encode_self)
5656

5757
@property
5858
def timestamp_type(self):

kafka/protocol/struct.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from .abstract import AbstractType
66
from .types import Schema
77

8+
from ..util import WeakMethod
9+
810

911
class Struct(AbstractType):
1012
SCHEMA = Schema()
@@ -19,7 +21,9 @@ def __init__(self, *args, **kwargs):
1921
self.__dict__.update(kwargs)
2022

2123
# overloading encode() to support both class and instance
22-
self.encode = self._encode_self
24+
# Without WeakMethod() this creates circular ref, which
25+
# causes instances to "leak" to garbage
26+
self.encode = WeakMethod(self._encode_self)
2327

2428
@classmethod
2529
def encode(cls, item): # pylint: disable=E0202

kafka/vendor/six.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ def __len__(self):
7070
else:
7171
# 64-bit
7272
MAXSIZE = int((1 << 63) - 1)
73-
del X
73+
74+
# Don't del it here, cause with gc disabled this "leaks" to garbage
75+
# del X
7476

7577

7678
def _add_doc(func, doc):

0 commit comments

Comments
 (0)