Skip to content

Commit df8a539

Browse files
TimPansinolrafeeiumaannamalaimichaelgoin
authored
Fix Lost Data in gRPC (Infinite Tracing) (#490)
* Fix for lost data in gRPC. Co-authored-by: Lalleh Rafeei <[email protected]> Co-authored-by: Uma Annamalai <[email protected]> Co-authored-by: Michael Goin <[email protected]> * [Mega-Linter] Apply linters fixes * Bump Tests Co-authored-by: Lalleh Rafeei <[email protected]> Co-authored-by: Uma Annamalai <[email protected]> Co-authored-by: Michael Goin <[email protected]> Co-authored-by: TimPansino <[email protected]>
1 parent c821354 commit df8a539

File tree

2 files changed

+61
-19
lines changed

2 files changed

+61
-19
lines changed

newrelic/common/streaming_utils.py

+44-17
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,18 @@
1313
# limitations under the License.
1414

1515
import collections
16+
import logging
1617
import threading
1718

1819
try:
1920
from newrelic.core.infinite_tracing_pb2 import AttributeValue
2021
except:
2122
AttributeValue = None
2223

24+
_logger = logging.getLogger(__name__)
2325

24-
class StreamBuffer(object):
2526

27+
class StreamBuffer(object):
2628
def __init__(self, maxlen):
2729
self._queue = collections.deque(maxlen=maxlen)
2830
self._notify = self.condition()
@@ -64,18 +66,46 @@ def stats(self):
6466

6567
return seen, dropped
6668

67-
def __next__(self):
68-
while True:
69-
if self._shutdown:
70-
raise StopIteration
69+
def __iter__(self):
70+
return StreamBufferIterator(self)
71+
7172

72-
try:
73-
return self._queue.popleft()
74-
except IndexError:
75-
pass
73+
class StreamBufferIterator(object):
74+
def __init__(self, stream_buffer):
75+
self.stream_buffer = stream_buffer
76+
self._notify = self.stream_buffer._notify
77+
self._shutdown = False
78+
self._stream = None
7679

77-
with self._notify:
78-
if not self._shutdown and not self._queue:
80+
def shutdown(self):
81+
with self._notify:
82+
self._shutdown = True
83+
self._notify.notify_all()
84+
85+
def stream_closed(self):
86+
return self._shutdown or self.stream_buffer._shutdown or (self._stream and self._stream.done())
87+
88+
def __next__(self):
89+
with self._notify:
90+
while True:
91+
# When a gRPC stream receives a server side disconnect (usually in the form of an OK code)
92+
# the item it is waiting to consume from the iterator will not be sent, and will inevitably
93+
# be lost. To prevent this, StopIteration is raised by shutting down the iterator and
94+
# notifying to allow the thread to exit. Iterators cannot be reused or race conditions may
95+
# occur between iterator shutdown and restart, so a new iterator must be created from the
96+
# streaming buffer.
97+
if self.stream_closed():
98+
_logger.debug("gRPC stream is closed. Shutting down and refusing to iterate.")
99+
if not self._shutdown:
100+
self.shutdown()
101+
raise StopIteration
102+
103+
try:
104+
return self.stream_buffer._queue.popleft()
105+
except IndexError:
106+
pass
107+
108+
if not self.stream_closed() and not self.stream_buffer._queue:
79109
self._notify.wait()
80110

81111
next = __next__
@@ -90,10 +120,8 @@ def __init__(self, *args, **kwargs):
90120
if args:
91121
arg = args[0]
92122
if len(args) > 1:
93-
raise TypeError(
94-
"SpanProtoAttrs expected at most 1 argument, got %d",
95-
len(args))
96-
elif hasattr(arg, 'keys'):
123+
raise TypeError("SpanProtoAttrs expected at most 1 argument, got %d", len(args))
124+
elif hasattr(arg, "keys"):
97125
for k in arg:
98126
self[k] = arg[k]
99127
else:
@@ -104,8 +132,7 @@ def __init__(self, *args, **kwargs):
104132
self[k] = kwargs[k]
105133

106134
def __setitem__(self, key, value):
107-
super(SpanProtoAttrs, self).__setitem__(key,
108-
SpanProtoAttrs.get_attribute_value(value))
135+
super(SpanProtoAttrs, self).__setitem__(key, SpanProtoAttrs.get_attribute_value(value))
109136

110137
def copy(self):
111138
copy = SpanProtoAttrs()

newrelic/core/agent_streaming.py

+17-2
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ def __init__(self, endpoint, stream_buffer, metadata, record_metric, ssl=True):
4848
self._endpoint = endpoint
4949
self._ssl = ssl
5050
self.metadata = metadata
51-
self.request_iterator = stream_buffer
51+
self.stream_buffer = stream_buffer
52+
self.request_iterator = iter(stream_buffer)
5253
self.response_processing_thread = threading.Thread(
5354
target=self.process_responses, name="NR-StreamingRpc-process-responses"
5455
)
@@ -68,6 +69,12 @@ def create_channel(self):
6869

6970
self.rpc = self.channel.stream_stream(self.PATH, Span.SerializeToString, RecordStatus.FromString)
7071

72+
def create_response_iterator(self):
73+
with self.stream_buffer._notify:
74+
self.request_iterator = iter(self.stream_buffer)
75+
self.request_iterator._stream = reponse_iterator = self.rpc(self.request_iterator, metadata=self.metadata)
76+
return reponse_iterator
77+
7178
@staticmethod
7279
def condition(*args, **kwargs):
7380
return threading.Condition(*args, **kwargs)
@@ -114,6 +121,12 @@ def process_responses(self):
114121
"response code. The agent will attempt "
115122
"to reestablish the stream immediately."
116123
)
124+
125+
# Reconnect channel for load balancing
126+
self.request_iterator.shutdown()
127+
self.channel.close()
128+
self.create_channel()
129+
117130
else:
118131
self.record_metric(
119132
"Supportability/InfiniteTracing/Span/Response/Error",
@@ -153,6 +166,7 @@ def process_responses(self):
153166
)
154167

155168
# Reconnect channel with backoff
169+
self.request_iterator.shutdown()
156170
self.channel.close()
157171
self.notify.wait(retry_time)
158172
if self.closed:
@@ -164,7 +178,8 @@ def process_responses(self):
164178
if self.closed:
165179
break
166180

167-
response_iterator = self.rpc(self.request_iterator, metadata=self.metadata)
181+
response_iterator = self.create_response_iterator()
182+
168183
_logger.info("Streaming RPC connect completed.")
169184

170185
try:

0 commit comments

Comments
 (0)