Skip to content

Commit

Permalink
Fix reprocessing logic
Browse files Browse the repository at this point in the history
  • Loading branch information
doruirimescu committed Nov 11, 2024
1 parent 3c59f98 commit 6a780f9
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions src/stateful_data_processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,23 @@ def _iterate_items(self, items: Collection[Any], *args, **kwargs):
self.logger.info(f"Item {item} already processed, skipping...")
continue

elif item in self.data and self.should_reprocess:
if self.skip_list and item in self.skip_list:
self.logger.info(f"Item {item} in skip list, skipping...")
continue
self.logger.info(f"Reprocessing item {item}...")
self.reprocess_item(item, iteration_index, *args, **kwargs)

if self.skip_list and item in self.skip_list:
self.logger.info(f"Item {item} in skip list, skipping...")
continue

self.process_item(item, iteration_index, *args, **kwargs)
if item in self.data and self.should_reprocess:
self.logger.info(f"Reprocessing item {item}...")
self.reprocess_item(item, iteration_index, *args, **kwargs)
else:
self.process_item(item, iteration_index, *args, **kwargs)

if (iteration_index) % self.print_interval == 0:
self.logger.info(f"Processed item {item} {iteration_index + 1} / {items_len}")
self.logger.info(f"Finished processing all items. {len(self.data)} / {items_len} items processed.")
self.logger.info(
f"Processed item {item} {iteration_index + 1} / {items_len}"
)
self.logger.info(
f"Finished processing all items. {len(self.data)} / {items_len} items processed."
)

@abstractmethod
def process_item(
Expand Down

0 comments on commit 6a780f9

Please sign in to comment.