Conversation
Reviewer's Guide

Implements batch-level tracking and confirmation between event collection, transformation, and forwarding to avoid losing Skyhigh SWG events, wiring a new batch status queue through the pipeline and updating the tests accordingly.

Sequence diagram for Skyhigh SWG batch tracking and confirmations:

```mermaid
sequenceDiagram
    participant SkyhighSecuritySWGTrigger as SkyhighSecuritySWGTrigger
    participant EventCollector as EventCollector
    participant Transformers as EventsTransformer
    participant EventsForwarder as EventsForwarder
    participant Intake as IntakeService
    participant BatchStatusQueue as batch_status_queue
    SkyhighSecuritySWGTrigger->>EventCollector: start(collect_queue, batch_status_queue)
    SkyhighSecuritySWGTrigger->>Transformers: start(collect_queue, forwarding_queue)
    SkyhighSecuritySWGTrigger->>EventsForwarder: start(forwarding_queue, batch_status_queue)
    loop periodic_collection
        EventCollector->>EventCollector: query_api()
        alt response available
            EventCollector->>EventCollector: generate batch_id
            EventCollector->>EventCollector: pending_batches[batch_id] = end_date
            EventCollector->>Transformers: put((batch_id, response)) on collect_queue
            EventCollector->>BatchStatusQueue: get(timeout=60)
            alt confirmed matching batch_id
                BatchStatusQueue-->>EventCollector: confirmed_batch_id
                EventCollector->>EventCollector: pending_batches.pop(batch_id)
                EventCollector->>EventCollector: _update_time_range()
                EventCollector->>EventCollector: _sleep_until_next_batch()
            else different batch_id
                BatchStatusQueue-->>EventCollector: other_batch_id
                EventCollector->>BatchStatusQueue: put(other_batch_id)
            end
        else no response
            EventCollector->>EventCollector: log No messages to forward
            EventCollector->>EventCollector: _update_time_range()
            EventCollector->>EventCollector: _sleep_until_next_batch()
        end
    end
    loop transform_loop
        Transformers->>Transformers: get((batch_id, response)) from collect_queue
        Transformers->>Transformers: for messages in batched(_transform(response))
        Transformers->>EventsForwarder: put((batch_id, list(messages))) on forwarding_queue
    end
    loop forward_loop
        EventsForwarder->>EventsForwarder: next_batch(max_batch_size)
        EventsForwarder->>EventsForwarder: collect batch_ids, events from forwarding_queue
        alt events not empty
            EventsForwarder->>Intake: push_events_to_intakes(events)
            Intake-->>EventsForwarder: success
            EventsForwarder->>EventsForwarder: processed_batches.add(batch_id)
            EventsForwarder->>BatchStatusQueue: put(batch_id)
        else no events
            EventsForwarder->>EventsForwarder: wait
        end
    end
```
Updated class diagram for Skyhigh SWG batch-aware workers:

```mermaid
classDiagram
    class SkyhighSecuritySWGTrigger {
        +SkyhighSWGConfig configuration
        +run()
    }
    class EventCollector {
        +SkyhighSecuritySWGTrigger connector
        +queue~Queue~ events_queue
        +queue~Queue~ batch_status_queue
        +datetime trigger_activation
        +dict headers
        +str endpoint
        +datetime end_date
        +datetime start_date
        +str url
        +dict pending_batches
        +log(*args, **kwargs)
        +query_api()
        +next_batch()
        +_update_time_range()
        +_sleep_until_next_batch()
        +run()
    }
    class EventsTransformer {
        +SkyhighSecuritySWGTrigger connector
        +SkyhighSWGConfig configuration
        +queue~Queue~ queue
        +queue~Queue~ output_queue
        +int max_batch_size
        +run()
    }
    class EventsForwarder {
        +SkyhighSecuritySWGTrigger connector
        +SkyhighSWGConfig configuration
        +queue~Queue~ queue
        +queue~Queue~ batch_status_queue
        +int max_batch_size
        +set processed_batches
        +tuple~set, list~ next_batch(int max_batch_size)
        +run()
    }
    SkyhighSecuritySWGTrigger --> EventCollector : creates
    SkyhighSecuritySWGTrigger --> EventsTransformer : creates
    SkyhighSecuritySWGTrigger --> EventsForwarder : creates
    EventCollector --> EventsTransformer : puts(batch_id, response) on events_queue
    EventsTransformer --> EventsForwarder : puts(batch_id, messages) on output_queue
    EventsForwarder --> EventCollector : confirms batch_id via batch_status_queue
```
Pull request overview
This PR aims to prevent Skyhigh SWG event loss by introducing a “batch ID + push confirmation” mechanism so the collector only advances its checkpoint after events are forwarded successfully.
Changes:
- Add a `batch_status_queue` and batch IDs to coordinate collector → transformer → forwarder processing.
- Modify the collector to wait for batch push confirmation before updating the time range.
- Update tests to account for the new batch/confirmation flow.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| `SkyhighSecurity/gateway_cloud_services/trigger_skyhigh_security_swg.py` | Introduces batch IDs and a status queue to gate checkpoint updates on successful forwarding. |
| `SkyhighSecurity/tests/test_gateway_cloud_services_trigger.py` | Adjusts fixtures/tests to pass and assert batch/confirmation data through the pipeline. |
Comments suppressed due to low confidence (1)
SkyhighSecurity/gateway_cloud_services/trigger_skyhigh_security_swg.py:404
`EventsForwarder` now requires `batch_status_queue` as the third constructor argument, but `Workers.create(...)` still instantiates it with `(self, forwarding_queue, batch_size)`. That means `batch_size` (an int) will be passed as `batch_status_queue`, leading to runtime failures when the forwarder tries to `put()` confirmations. Update the worker creation call to pass `batch_status_queue` before `batch_size`.
```python
batch_status_queue: queue.Queue = queue.Queue(maxsize=100)
# start the event forwarder
batch_size = int(os.environ.get("BATCH_SIZE", 10000))
forwarders = Workers.create(
```
```python
try:
    confirmed_batch_id = self.batch_status_queue.get(
        block=True, timeout=60
    )  # 60 second timeout
    if confirmed_batch_id == batch_id:
        self.log(
            message=f"Batch {batch_id} confirmed pushed", level="debug"
        )
        # Remove from pending
        self.pending_batches.pop(batch_id, None)
    else:
        self.log(
            message=f"Received confirmation for {confirmed_batch_id} but waiting for {batch_id}",
            level="warning",
        )
        # Put it back for next iteration
        self.batch_status_queue.put(confirmed_batch_id)
except queue.Empty:
    self.log(
        message=f"Timeout waiting for batch {batch_id} confirmation. Batch may still be processing.",
        level="warning",
    )
    # Note: We don't remove from pending, checkpoint won't be saved
    return
```
When a confirmation is received for a different batch ID, the code logs and re-queues that ID, but then continues and updates the time range anyway. This can advance the checkpoint even though the current batch_id hasn't been confirmed. Consider looping until the expected batch_id is confirmed (or a deadline is reached) and only then updating the time range.
Suggested change (replace the confirmation block with a loop that waits for the expected batch ID until a deadline):

```python
# Wait up to 60 seconds for the specific batch_id to be confirmed.
deadline = datetime.now(timezone.utc) + timedelta(seconds=60)
while True:
    remaining = (deadline - datetime.now(timezone.utc)).total_seconds()
    if remaining <= 0:
        self.log(
            message=(
                f"Timeout waiting for batch {batch_id} confirmation. "
                "Batch may still be processing."
            ),
            level="warning",
        )
        # Note: We don't remove from pending, checkpoint won't be saved
        return
    try:
        confirmed_batch_id = self.batch_status_queue.get(
            block=True,
            timeout=remaining,
        )
    except queue.Empty:
        self.log(
            message=(
                f"Timeout waiting for batch {batch_id} confirmation. "
                "Batch may still be processing."
            ),
            level="warning",
        )
        # Note: We don't remove from pending, checkpoint won't be saved
        return
    if confirmed_batch_id == batch_id:
        self.log(
            message=f"Batch {batch_id} confirmed pushed",
            level="debug",
        )
        # Remove from pending
        self.pending_batches.pop(batch_id, None)
        break
    else:
        self.log(
            message=(
                f"Received confirmation for {confirmed_batch_id} "
                f"but waiting for {batch_id}"
            ),
            level="warning",
        )
        # Put it back for next iteration
        self.batch_status_queue.put(confirmed_batch_id)
```
```python
    message=f"Timeout waiting for batch {batch_id} confirmation. Batch may still be processing.",
    level="warning",
)
# Note: We don't remove from pending, checkpoint won't be saved
```
On confirmation timeout, next_batch() returns immediately. In run(), that causes the collector loop to re-query the same time range again without sleeping, potentially hammering the API and growing pending_batches indefinitely if confirmations never arrive. Consider sleeping/backing off on timeout and cleaning up or retrying the pending batch instead of returning to a tight loop.
Suggested change:

```python
# Note: We don't remove from pending, checkpoint won't be saved
# Sleep before returning to avoid tight loop and API hammering
self._sleep_until_next_batch()
```
```python
# Confirm batches after successful push
for batch_id in batch_ids:
    if batch_id not in self.processed_batches:
        try:
            self.batch_status_queue.put(batch_id, block=False)
```
Batch confirmation is currently sent as soon as any events for a batch_id are pushed. However, the Transformer can emit multiple message chunks with the same batch_id (when the CSV response exceeds max_batch_size). Confirming after the first chunk can let the collector advance the checkpoint while later chunks are still queued, causing event loss on restart. Consider adding an explicit end-of-batch marker/count so the forwarder only confirms after the last chunk of a batch has been successfully pushed.
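One way to realize the suggested end-of-batch marker is a chunk count: the transformer tags each chunk with its index and the batch total, and the forwarder confirms only once every chunk has been pushed. A minimal sketch, where the `(batch_id, chunk_index, total_chunks, messages)` tuple shape and the function names are assumptions rather than the PR's actual wire format:

```python
import queue
from collections import defaultdict

forwarding_queue: queue.Queue = queue.Queue()
batch_status_queue: queue.Queue = queue.Queue()


def transform(batch_id: str, chunks: list) -> None:
    # Tag every chunk with its position and the batch total so the
    # forwarder can tell when the whole batch has been handled.
    total = len(chunks)
    for index, messages in enumerate(chunks):
        forwarding_queue.put((batch_id, index, total, messages))


def forward_all() -> None:
    seen = defaultdict(set)  # batch_id -> indexes of chunks already pushed
    while not forwarding_queue.empty():
        batch_id, index, total, messages = forwarding_queue.get()
        # ... push `messages` to the intake here ...
        seen[batch_id].add(index)
        if len(seen[batch_id]) == total:
            # Confirm only after the last chunk of the batch was pushed.
            batch_status_queue.put(batch_id)
            del seen[batch_id]
```

With this scheme the collector's checkpoint can only advance once no chunk of the batch remains queued.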
```python
self.processed_batches: set = (
    set()
)  # Track which batch_ids we've already confirmed
```
self.processed_batches grows by one UUID per collected batch and is never pruned, so the forwarder will leak memory over long runtimes. If you need de-duplication, consider a bounded/TTL structure, or redesign the batching/confirmation so a growing global set isn't required.
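A bounded de-duplication structure of the kind hinted at here can be built on `collections.OrderedDict`, evicting the oldest batch IDs once a size cap is reached (the class and the cap of 1000 are illustrative, not part of the PR):

```python
from collections import OrderedDict


class BoundedSet:
    """A set-like container that forgets its oldest members beyond `maxsize`."""

    def __init__(self, maxsize: int = 1000) -> None:
        self.maxsize = maxsize
        self._items: OrderedDict = OrderedDict()

    def add(self, item) -> None:
        # Insert (or refresh) the item, then evict the oldest entries
        # until the container is back under its size cap.
        self._items[item] = None
        self._items.move_to_end(item)
        while len(self._items) > self.maxsize:
            self._items.popitem(last=False)

    def __contains__(self, item) -> bool:
        return item in self._items
```

Swapping `processed_batches` for such a structure keeps memory bounded on long runtimes at the cost of possibly re-confirming a very old batch ID, which the collector already tolerates.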
```diff
 @pytest.fixture
-def event_collector(trigger, events_queue):
+def event_collector(trigger, events_queue, batch_status_queue):
     return EventCollector(trigger, events_queue)
```
The event_collector fixture now passes an empty batch_status_queue, but EventCollector.next_batch() blocks waiting for a confirmation (up to 60s). Tests like test_next_batch/test_next_batch_error_should_wait that call next_batch() without arranging a confirmation will hang or time out and will no longer observe the expected time range update. Update those tests to pre-populate batch_status_queue (or patch the confirmation wait) so they match the new behavior.
Suggested change:

```python
def event_collector(trigger, events_queue, batch_status_queue):
    # Pre-populate the batch_status_queue so that calls to EventCollector.next_batch()
    # have an immediate confirmation available and do not block waiting on an empty queue.
    batch_status_queue.put(None)
```
```python
batch_status_queue = queue.Queue()
transformer = Transformer(trigger, input_queue, events_queue, batch_status_queue)
```
Transformer.__init__ currently accepts (connector, queue, output_queue, max_batch_size=...), but this test passes an extra batch_status_queue positional argument. This will be interpreted as max_batch_size (a Queue), causing runtime errors when batched(..., self.max_batch_size) runs. Update the test to match the actual Transformer signature (or update Transformer if it is supposed to take a status queue).
Suggested change:

```python
transformer = Transformer(trigger, input_queue, events_queue)
```
```python
batch_ids, events = events_queue.get(block=False)
assert events == ["user_id=-1 username=foo"]
assert "batch-1" in batch_ids
```
Transformer puts (batch_id, list(messages)) into the output queue, but this test unpacks the item as (batch_ids, events) and then asserts membership ("batch-1" in batch_ids). This is currently just checking substring membership on a string and doesn't validate the intended contract. Update the test to unpack (batch_id, events) and assert equality on batch_id.
Suggested change:

```python
batch_id, events = events_queue.get(block=False)
assert events == ["user_id=-1 username=foo"]
assert batch_id == "batch-1"
```
Summary by Sourcery
Synchronize event collection and forwarding using batch identifiers so that ingestion checkpoints are only advanced after successful forwarding.