File tree Expand file tree Collapse file tree 1 file changed +4
-5
lines changed
quickwit/quickwit-indexing/src/source Expand file tree Collapse file tree 1 file changed +4
-5
lines changed Original file line number Diff line number Diff line change @@ -191,6 +191,7 @@ impl ObjectUriBatchReader {
191191 }
192192 let limit_num_bytes = self . current_offset + BATCH_NUM_BYTES_LIMIT as usize ;
193193 let mut new_offset = self . current_offset ;
194+ let mut eof_position: Option < Position > = None ;
194195 while new_offset < limit_num_bytes {
195196 if let Some ( record) = source_progress
196197 . protect_future ( self . reader . next_record ( ) )
@@ -200,18 +201,16 @@ impl ObjectUriBatchReader {
200201 batch_builder. add_doc ( record. doc ) ;
201202 if record. is_last {
202203 self . is_eof = true ;
204+ eof_position = Some ( Position :: eof ( new_offset) ) ;
203205 break ;
204206 }
205207 } else {
206208 self . is_eof = true ;
209+ eof_position = Some ( Position :: eof ( new_offset) ) ;
207210 break ;
208211 }
209212 }
210- let to_position = if self . is_eof {
211- Position :: eof ( new_offset)
212- } else {
213- Position :: offset ( new_offset)
214- } ;
213+ let to_position = eof_position. unwrap_or ( Position :: offset ( new_offset) ) ;
215214 batch_builder. checkpoint_delta . record_partition_delta (
216215 self . partition_id . clone ( ) ,
217216 Position :: offset ( self . current_offset ) ,
You can’t perform that action at this time.
0 commit comments