
Commit 1d3ab94

8 files changed: +261 -94 lines

warcio/archiveiterator.py (+45 -14)
@@ -80,28 +80,46 @@ def close(self):
         self.reader.close_decompressor()
         self.reader = None
 
+    def _decompressor(self):
+        """Helper method for _iterate_records that returns the reader's decompressor"""
+        return self.reader.decompressor
+
+    def _iterate_records_next_record(self, raise_invalid_gzip):
+        """Helper method for _iterate_records that reads and returns the next record.
+        If raise_invalid_gzip is true, _raise_invalid_gzip_err is called.
+
+        :param bool raise_invalid_gzip: whether the invalid gzip error should be raised
+        :return: The next record
+        :rtype: ArcWarcRecord
+        """
+        self.record = self._next_record(self.next_line)
+        if raise_invalid_gzip:
+            self._raise_invalid_gzip_err()
+        return self.record
+
     def _iterate_records(self):
         """ iterate over each record
         """
         raise_invalid_gzip = False
         empty_record = False
 
+        # perf references in order to avoid the cost of dot property lookup
+        self_read_to_end = self.read_to_end
+        self_decompressor = self._decompressor
+        self_reader_read_next_member = self.reader.read_next_member
+        self_iterate_records_next_record = self._iterate_records_next_record
+
         while True:
             try:
-                self.record = self._next_record(self.next_line)
-                if raise_invalid_gzip:
-                    self._raise_invalid_gzip_err()
-
-                yield self.record
-
+                yield self_iterate_records_next_record(raise_invalid_gzip)
             except EOFError:
                 empty_record = True
 
-            self.read_to_end()
+            self_read_to_end()
 
-            if self.reader.decompressor:
+            if self_decompressor():
                 # if another gzip member, continue
-                if self.reader.read_next_member():
+                if self_reader_read_next_member():
                     continue
 
             # if empty record, then we're done
@@ -149,10 +167,18 @@ def _consume_blanklines(self):
         display a warning
         """
         empty_size = 0
+        error_count = 0
         first_line = True
 
+        # perf references in order to avoid the cost of dot property lookup
+        self_reader_readline = self.reader.readline
+        self_fh_tell = self.fh.tell
+        self_reader_rem_length = self.reader.rem_length
+        self_INC_RECORD_format = self.INC_RECORD.format
+        sys_stderr_write = sys.stderr.write
+
         while True:
-            line = self.reader.readline()
+            line = self_reader_readline()
             if len(line) == 0:
                 return None, empty_size
 
@@ -164,13 +190,15 @@ def _consume_blanklines(self):
                 if len(stripped) != 0:
                     # if first line is not blank,
                     # likely content-length was invalid, display warning
-                    err_offset = self.fh.tell() - self.reader.rem_length() - empty_size
-                    sys.stderr.write(self.INC_RECORD.format(err_offset, line))
-                    self.err_count += 1
+                    err_offset = self_fh_tell() - self_reader_rem_length() - empty_size
+                    sys_stderr_write(self_INC_RECORD_format(err_offset, line))
+                    error_count += 1
 
                 first_line = False
                 continue
 
+            # reached at most once per call, just before the loop exits via return
+            self.err_count += error_count
             return line, empty_size
 
     def read_to_end(self, record=None):
@@ -189,8 +217,11 @@ def read_to_end(self, record=None):
 
         curr_offset = self.offset
 
+        # perf reference in order to avoid the cost of dot property lookup
+        self_record_raw_stream_read = self.record.raw_stream.read
+
         while True:
-            b = self.record.raw_stream.read(BUFF_SIZE)
+            b = self_record_raw_stream_read(BUFF_SIZE)
             if not b:
                 break
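A note on the pattern used throughout this commit: CPython resolves an attribute access such as self.read_to_end through dictionary lookups on every access, while a local name is a fast indexed load, so binding hot-loop callables to locals once saves work per iteration. A minimal sketch of how one might measure the effect with timeit; the Reader class and function names below are illustrative only, not warcio code:

    import timeit

    class Reader:
        def readline(self):
            return b'x'

    def with_attribute_lookup(reader, n=100000):
        for _ in range(n):
            reader.readline()               # resolves .readline on every iteration

    def with_local_binding(reader, n=100000):
        reader_readline = reader.readline   # bind the bound method once
        for _ in range(n):
            reader_readline()               # plain local-variable load

    r = Reader()
    print(timeit.timeit(lambda: with_attribute_lookup(r), number=50))
    print(timeit.timeit(lambda: with_local_binding(r), number=50))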

warcio/bufferedreaders.py (+132 -41)
@@ -94,6 +94,16 @@ def _init_decomp(self, decomp_type):
             self.decomp_type = None
             self.decompressor = None
 
+    def _fillbuff_has_more_data(self):
+        """Returns True/False to indicate whether there is more data
+        to be read by _fillbuff.
+
+        :return: True/False indicating whether there is more data
+                 to be read
+        :rtype: bool
+        """
+        return self.decompressor and not self.decompressor.unused_data and self.empty()
+
     def _fillbuff(self, block_size=None):
         if not self.empty():
             return
@@ -112,12 +122,17 @@ def _fillbuff(self, block_size=None):
 
         self._process_read(data)
 
+        # perf references in order to avoid the cost of dot property lookup
+        self_stream_read = self.stream.read
+        self_process_read = self._process_read
+        self_fillbuff_has_more_data = self._fillbuff_has_more_data
+
         # if raw data is not empty and decompressor set, but
         # decompressed buff is empty, keep reading --
         # decompressor likely needs more data to decompress
-        while data and self.decompressor and not self.decompressor.unused_data and self.empty():
-            data = self.stream.read(block_size)
-            self._process_read(data)
+        while data and self_fillbuff_has_more_data():
+            data = self_stream_read(block_size)
+            self_process_read(data)
 
     def _process_read(self, data):
         # don't process if no raw data read
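For context on the loop condition above: a zlib decompressor object sets unused_data to the bytes that follow the end of the current compressed stream, which is how the reader detects that another gzip member follows. A small standalone illustration of that zlib behavior, independent of warcio's classes:

    import gzip
    import zlib

    # two concatenated gzip members, as in a multi-member WARC file
    raw = gzip.compress(b'first record') + gzip.compress(b'second record')

    # wbits = 16 + MAX_WBITS tells zlib to expect a gzip wrapper
    decomp = zlib.decompressobj(16 + zlib.MAX_WBITS)
    print(decomp.decompress(raw))        # b'first record'
    print(bool(decomp.unused_data))      # True: bytes beyond the first member

    # a fresh decompressor picks up the next member from unused_data
    decomp2 = zlib.decompressobj(16 + zlib.MAX_WBITS)
    print(decomp2.decompress(decomp.unused_data))   # b'second record'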
@@ -149,6 +164,14 @@ def _decompress(self, data):
                     return b''
         return data
 
+    def _buff_read(self, length):
+        """Utility method for read that returns
+        the results of self.buff.read(length).
+
+        :param int length: The amount to be read
+        """
+        return self.buff.read(length)
+
     def read(self, length=None):
         """
         Fill bytes and read some number of bytes
@@ -158,19 +181,32 @@
         specified length is read
         """
         all_buffs = []
+
+        # perf references in order to avoid the cost of dot property lookup
+        all_buffs_append = all_buffs.append
+        self_fillbuff = self._fillbuff
+        self_empty = self.empty
+        self_buff_read = self._buff_read
+
         while length is None or length > 0:
-            self._fillbuff()
-            if self.empty():
+            self_fillbuff()
+            if self_empty():
                 break
 
-            buff = self.buff.read(length)
-            all_buffs.append(buff)
+            buff = self_buff_read(length)
+            all_buffs_append(buff)
             if length:
                 length -= len(buff)
 
         return b''.join(all_buffs)
 
+    def _buff_readline(self, length):
+        """Utility method for readline that returns
+        the results of self.buff.readline(length).
 
+        :param int length: The amount to be read
+        """
+        return self.buff.readline(length)
 
     def readline(self, length=None):
         """
@@ -189,21 +225,35 @@
 
         linebuff = self.buff.readline(length)
 
+        # perf references in order to avoid the cost of dot property lookup
+        self_fillbuff = self._fillbuff
+        self_empty = self.empty
+        self_buff_readline = self._buff_readline
+
+        # string concatenation using += is an expensive operation because each
+        # += copies the entire accumulated bytes; appending the parts to a list
+        # and joining once at the end is the pythonic way
+        # https://wiki.python.org/moin/PythonSpeed/PerformanceTips#String_Concatenation
+        current_full_line_buff_len = len(linebuff)
+        full_line_buff = [linebuff]
+        full_line_buff_append = full_line_buff.append
+
         # we may be at a boundary
         while not linebuff.endswith(b'\n'):
             if length:
-                length -= len(linebuff)
+                length -= current_full_line_buff_len
                 if length <= 0:
                     break
 
-            self._fillbuff()
+            self_fillbuff()
 
-            if self.empty():
+            if self_empty():
                 break
 
-            linebuff += self.buff.readline(length)
+            linebuff = self_buff_readline(length)
+            full_line_buff_append(linebuff)
+            current_full_line_buff_len += len(linebuff)
 
-        return linebuff
+        return b''.join(full_line_buff)
 
     def empty(self):
         if not self.buff or self.buff.tell() >= self.buff_size:
292342
if self.not_chunked:
293343
return super(ChunkedDataReader, self)._fillbuff(block_size)
294344

345+
length_header = None
346+
347+
# perf references in order to avoid the cost of dot property lookup
348+
self_chunked_fillbuff_has_more_data = self._chunked_fillbuff_has_more_data
349+
self_stream_readline = self.stream.readline
350+
self_try_decode = self._try_decode
351+
self_chunked_fillbuff_handle_exception = self._chunked_fillbuff_handle_exception
352+
295353
# Loop over chunks until there is some data (not empty())
296354
# In particular, gzipped data may require multiple chunks to
297355
# return any decompressed result
298-
while (self.empty() and
299-
not self.all_chunks_read and
300-
not self.not_chunked):
301-
356+
while self_chunked_fillbuff_has_more_data():
302357
try:
303-
length_header = self.stream.readline(64)
304-
self._try_decode(length_header)
358+
length_header = self_stream_readline(64)
359+
self_try_decode(length_header)
305360
except ChunkedDataException as e:
306-
if self.raise_chunked_data_exceptions:
307-
raise
361+
self_chunked_fillbuff_handle_exception(e, length_header, block_size)
308362

309-
# Can't parse the data as chunked.
310-
# It's possible that non-chunked data is served
311-
# with a Transfer-Encoding: chunked.
312-
# Treat this as non-chunk encoded from here on.
313-
self._process_read(length_header + e.data)
314-
self.not_chunked = True
363+
def _chunked_fillbuff_has_more_data(self):
364+
"""Determines if there is more data to be had for filling the
365+
_fillbuff method.
315366
316-
# parse as block as non-chunked
317-
return super(ChunkedDataReader, self)._fillbuff(block_size)
367+
:return: T/F indicating if there is more data in the stream
368+
:rtype: bool
369+
"""
370+
return self.empty() and not self.all_chunks_read and not self.not_chunked
371+
372+
def _chunked_fillbuff_handle_exception(self, e, length_header, block_size):
373+
"""Handles the ChunkedDataException raised by _try_decode while attempting
374+
to fill the buffer
375+
376+
:param ChunkedDataException e:
377+
:param bytes length_header:
378+
:param int block_size:
379+
"""
380+
if self.raise_chunked_data_exceptions:
381+
raise e
382+
383+
# Can't parse the data as chunked.
384+
# It's possible that non-chunked data is served
385+
# with a Transfer-Encoding: chunked.
386+
# Treat this as non-chunk encoded from here on.
387+
self._process_read(length_header + e.data)
388+
self.not_chunked = True
389+
390+
# parse as block as non-chunked
391+
return super(ChunkedDataReader, self)._fillbuff(block_size)
318392

319393
def _try_decode(self, length_header):
320394
# decode length header
@@ -336,36 +410,53 @@ def _try_decode(self, length_header):
336410
return
337411

338412
data_len = 0
339-
data = b''
413+
414+
# string concatenation perf
415+
data = []
416+
417+
# perf references in order to avoid the cost of dot property lookup
418+
data_append = data.append
419+
self_stream_read = self.stream.read
420+
self_try_decode_no_new_data = self._try_decode_no_new_data
340421

341422
# read chunk
342423
while data_len < chunk_size:
343-
new_data = self.stream.read(chunk_size - data_len)
424+
new_data = self_stream_read(chunk_size - data_len)
344425

345426
# if we unexpectedly run out of data,
346427
# either raise an exception or just stop reading,
347428
# assuming file was cut off
348429
if not new_data:
349-
if self.raise_chunked_data_exceptions:
350-
msg = 'Ran out of data before end of chunk'
351-
raise ChunkedDataException(msg, data)
352-
else:
353-
chunk_size = data_len
354-
self.all_chunks_read = True
430+
# if self_try_decode_no_new_data does not raise an exception
431+
# set chunk_size to the current data_len in order to stop reading
432+
self_try_decode_no_new_data(data)
433+
chunk_size = data_len
355434

356-
data += new_data
357-
data_len = len(data)
435+
data_append(new_data)
436+
data_len += len(new_data)
358437

359438
# if we successfully read a block without running out,
360439
# it should end in \r\n
361440
if not self.all_chunks_read:
362441
clrf = self.stream.read(2)
363442
if clrf != b'\r\n':
364-
raise ChunkedDataException(b"Chunk terminator not found.",
365-
data)
443+
raise ChunkedDataException(b"Chunk terminator not found.", b''.join(data))
366444

367445
# hand to base class for further processing
368-
self._process_read(data)
446+
self._process_read(b''.join(data))
447+
448+
def _try_decode_no_new_data(self, data_buffer):
449+
"""If we unexpectedly run out of data, either raise an exception or just stop reading,
450+
assuming file was cut off.
451+
452+
:param list[bytes] data_buffer: The list of byte strings being
453+
:return:
454+
"""
455+
if self.raise_chunked_data_exceptions:
456+
msg = 'Ran out of data before end of chunk'
457+
raise ChunkedDataException(msg, b''.join(data_buffer))
458+
else:
459+
self.all_chunks_read = True
369460

370461

371462
#=================================================================
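_try_decode follows standard HTTP chunked transfer encoding: a hexadecimal length line, chunk_size bytes of payload, a \r\n terminator, repeated until a zero-length chunk. A minimal standalone decoder tracing the same steps (error handling reduced to an assert; this is a sketch, not warcio's API):

    import io

    def decode_chunked(stream):
        """Decode an HTTP chunked-encoded stream into the raw payload."""
        parts = []
        while True:
            chunk_size = int(stream.readline().strip(), 16)   # hex length line
            if chunk_size == 0:                               # last-chunk marker
                break
            parts.append(stream.read(chunk_size))
            assert stream.read(2) == b'\r\n', 'chunk terminator not found'
        return b''.join(parts)

    encoded = io.BytesIO(b'5\r\nhello\r\n6\r\n world\r\n0\r\n\r\n')
    print(decode_chunked(encoded))   # b'hello world'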
