Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 98 additions & 18 deletions SkyhighSecurity/gateway_cloud_services/trigger_skyhigh_security_swg.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import csv
import os
import queue
import uuid
from collections.abc import Generator
from datetime import datetime, timedelta, timezone
from functools import cached_property
Expand Down Expand Up @@ -32,10 +33,18 @@ class SkyhighSWGConfig(DefaultConnectorConfiguration):


class EventCollector(Thread):
def __init__(self, connector: "SkyhighSecuritySWGTrigger", events_queue: queue.Queue):
def __init__(
self,
connector: "SkyhighSecuritySWGTrigger",
events_queue: queue.Queue,
batch_status_queue: queue.Queue,
):
super().__init__()
self.connector = connector
self.events_queue = events_queue
self.batch_status_queue = (
batch_status_queue # Queue to receive batch push confirmation
)
self.trigger_activation: datetime = datetime.now(timezone.utc)
self.headers = {"Accept": "text/csv", "x-mwg-api-version": "8"}
self.endpoint: str = "/mwg/api/reporting/forensic/"
Expand All @@ -44,6 +53,7 @@ def __init__(self, connector: "SkyhighSecuritySWGTrigger", events_queue: queue.Q
self.end_date: datetime
self.start_date: datetime
self.url: str
self.pending_batches: dict = {} # Track batch_id -> end_date

def log(self, *args, **kwargs):
self.connector.log(*args, **kwargs)
Expand Down Expand Up @@ -169,24 +179,57 @@ def next_batch(self):
Fetch the next batch of events and put them in the queue

1. Query the API
2. If we have a response, put it in the queue
3. Update the time range
4. Sleep until the next batch
2. If we have a response, tag it with batch ID and put it in the queue
3. Wait for confirmation that the batch was pushed successfully
4. Update the time range
5. Sleep until the next batch
"""
try:
# 1. Query the API
response = self.query_api()

if response:
# 2. If we have a response, put it in the queue
self.events_queue.put(response)
# 2. Tag with batch ID and queue it
batch_id = str(uuid.uuid4())
self.pending_batches[batch_id] = self.end_date
self.events_queue.put((batch_id, response))

# 3. Wait for confirmation that this batch was pushed
self.log(
message=f"Waiting for batch {batch_id} to be pushed...",
level="debug",
)
try:
confirmed_batch_id = self.batch_status_queue.get(
block=True, timeout=60
) # 60 second timeout
if confirmed_batch_id == batch_id:
self.log(
message=f"Batch {batch_id} confirmed pushed", level="debug"
)
# Remove from pending
self.pending_batches.pop(batch_id, None)
else:
self.log(
message=f"Received confirmation for {confirmed_batch_id} but waiting for {batch_id}",
level="warning",
)
# Put it back for next iteration
self.batch_status_queue.put(confirmed_batch_id)
except queue.Empty:
self.log(
message=f"Timeout waiting for batch {batch_id} confirmation. Batch may still be processing.",
level="warning",
)
# Note: We don't remove from pending, checkpoint won't be saved
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On confirmation timeout, next_batch() returns immediately. In run(), that causes the collector loop to re-query the same time range without sleeping, potentially hammering the API and growing pending_batches indefinitely if confirmations never arrive. Consider sleeping/backing off on timeout and cleaning up or retrying the pending batch instead of returning to a tight loop.

Suggested change
# Note: We don't remove from pending, checkpoint won't be saved
# Note: We don't remove from pending, checkpoint won't be saved
# Sleep before returning to avoid tight loop and API hammering
self._sleep_until_next_batch()

Copilot uses AI. Check for mistakes.
return
Comment on lines +202 to +225
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a confirmation is received for a different batch ID, the code logs and re-queues that ID, but then continues and updates the time range anyway. This can advance the checkpoint even though the current batch_id hasn't been confirmed. Consider looping until the expected batch_id is confirmed (or a deadline is reached) and only then updating the time range.

Suggested change
try:
confirmed_batch_id = self.batch_status_queue.get(
block=True, timeout=60
) # 60 second timeout
if confirmed_batch_id == batch_id:
self.log(
message=f"Batch {batch_id} confirmed pushed", level="debug"
)
# Remove from pending
self.pending_batches.pop(batch_id, None)
else:
self.log(
message=f"Received confirmation for {confirmed_batch_id} but waiting for {batch_id}",
level="warning",
)
# Put it back for next iteration
self.batch_status_queue.put(confirmed_batch_id)
except queue.Empty:
self.log(
message=f"Timeout waiting for batch {batch_id} confirmation. Batch may still be processing.",
level="warning",
)
# Note: We don't remove from pending, checkpoint won't be saved
return
# Wait up to 60 seconds for the specific batch_id to be confirmed.
deadline = datetime.now(timezone.utc) + timedelta(seconds=60)
while True:
remaining = (deadline - datetime.now(timezone.utc)).total_seconds()
if remaining <= 0:
self.log(
message=(
f"Timeout waiting for batch {batch_id} confirmation. "
"Batch may still be processing."
),
level="warning",
)
# Note: We don't remove from pending, checkpoint won't be saved
return
try:
confirmed_batch_id = self.batch_status_queue.get(
block=True,
timeout=remaining,
)
except queue.Empty:
self.log(
message=(
f"Timeout waiting for batch {batch_id} confirmation. "
"Batch may still be processing."
),
level="warning",
)
# Note: We don't remove from pending, checkpoint won't be saved
return
if confirmed_batch_id == batch_id:
self.log(
message=f"Batch {batch_id} confirmed pushed",
level="debug",
)
# Remove from pending
self.pending_batches.pop(batch_id, None)
break
else:
self.log(
message=(
f"Received confirmation for {confirmed_batch_id} "
f"but waiting for {batch_id}"
),
level="warning",
)
# Put it back for next iteration
self.batch_status_queue.put(confirmed_batch_id)

Copilot uses AI. Check for mistakes.
else:
self.log(message="No messages to forward", level="info")

# 3. Update the time range
# 4. Update the time range (safe now, events are pushed)
self._update_time_range()

# 4. Sleep until the next batch
# 5. Sleep until the next batch
self._sleep_until_next_batch()
except Exception as ex:
self.log_exception(ex, message="Failed to fetch events")
Expand Down Expand Up @@ -236,15 +279,17 @@ def run(self):
try:
while self.is_running or self.queue.qsize() > 0:
try:
response = self.queue.get(block=True, timeout=0.5)
# Get batch_id and response
batch_id, response = self.queue.get(block=True, timeout=0.5)

# The transformation is done in batches to avoid filling the memory if we have a lot of events
for messages in batched(self._transform(response), self.max_batch_size):
if len(messages) > 0:
nb_events = len(messages)
INCOMING_EVENTS.labels(intake_key=self.configuration.intake_key).inc(nb_events)
logger.info("Transformed events", nb_events=nb_events)
self.output_queue.put(list(messages))
# Pass batch_id along with messages
self.output_queue.put((batch_id, list(messages)))

except queue.Empty:
pass
Expand All @@ -257,44 +302,77 @@ def run(self):
class EventsForwarder(Worker):
KIND = "forwarder"

def __init__(self, connector: "SkyhighSecuritySWGTrigger", queue: queue.Queue, max_batch_size: int = 20000):
def __init__(
self,
connector: "SkyhighSecuritySWGTrigger",
queue: queue.Queue,
batch_status_queue: queue.Queue,
max_batch_size: int = 20000,
):
super().__init__()
self.connector = connector
self.configuration = connector.configuration
self.queue = queue
self.batch_status_queue = (
batch_status_queue # Queue to send batch completion confirmation
)
self.max_batch_size = max_batch_size
self.processed_batches: set = (
set()
) # Track which batch_ids we've already confirmed
Comment on lines +320 to +322
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.processed_batches grows by one UUID per collected batch and is never pruned, so the forwarder will leak memory over long runtimes. If you need de-duplication, consider a bounded/TTL structure, or redesign the batching/confirmation so a growing global set isn't required.

Copilot uses AI. Check for mistakes.

def next_batch(self, max_batch_size: int) -> list:
def next_batch(self, max_batch_size: int) -> tuple[set, list]:
"""
Returns tuple of (batch_ids, events)
batch_ids: set of batch IDs processed in this batch
events: list of events
"""
events = []
batch_ids = set()
while self.is_running:
try:
messages = self.queue.get(block=True, timeout=0.5)
batch_id, messages = self.queue.get(block=True, timeout=0.5)

if len(messages) > 0:
events.extend(messages)
batch_ids.add(batch_id)

if len(events) >= max_batch_size:
break

except queue.Empty:
break

return events
return batch_ids, events

def run(self):
logger.info("Starting Events Forwarder worker thread.")

try:
while self.is_running or self.queue.qsize() > 0:
events = self.next_batch(self.max_batch_size)
OUTCOMING_EVENTS.labels(intake_key=self.configuration.intake_key).inc(len(events))
batch_ids, events = self.next_batch(self.max_batch_size)

if len(events) > 0:
OUTCOMING_EVENTS.labels(intake_key=self.configuration.intake_key).inc(len(events))
self.connector.log(
message=f"Forward {len(events)} events to the intake",
level="info",
)
self.connector.push_events_to_intakes(events=events)

# Confirm batches after successful push
for batch_id in batch_ids:
if batch_id not in self.processed_batches:
try:
self.batch_status_queue.put(batch_id, block=False)
Comment on lines +363 to +367
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Batch confirmation is currently sent as soon as any events for a batch_id are pushed. However, the Transformer can emit multiple message chunks with the same batch_id (when the CSV response exceeds max_batch_size). Confirming after the first chunk can let the collector advance the checkpoint while later chunks are still queued, causing event loss on restart. Consider adding an explicit end-of-batch marker/count so the forwarder only confirms after the last chunk of a batch has been successfully pushed.

Copilot uses AI. Check for mistakes.
self.processed_batches.add(batch_id)
logger.debug(
f"Confirmed batch {batch_id} pushed successfully"
)
except queue.Full:
logger.warning(
f"Failed to confirm batch {batch_id}, status queue full"
)
except Exception as ex:
self.connector.log_exception(ex, message="Failed to forward events")

Expand All @@ -318,6 +396,8 @@ def run(self): # pragma: no cover
collect_queue: queue.Queue = queue.Queue(maxsize=collect_queue_size)
forwarding_queue_size = int(os.environ.get("FORWARDING_QUEUE_SIZE", 10000))
forwarding_queue: queue.Queue = queue.Queue(maxsize=forwarding_queue_size)
# Queue for batch status confirmation (small size, only needs batch IDs)
batch_status_queue: queue.Queue = queue.Queue(maxsize=100)

# start the event forwarder
batch_size = int(os.environ.get("BATCH_SIZE", 10000))
Expand All @@ -333,7 +413,7 @@ def run(self): # pragma: no cover
transformers.start()

# start the event collector
collector = EventCollector(self, collect_queue)
collector = EventCollector(self, collect_queue, batch_status_queue)
collector.start()

try:
Expand All @@ -349,7 +429,7 @@ def run(self): # pragma: no cover
# if the collector is down, restart it
if not collector.is_alive():
self.log(message="Event collector failed", level="error")
collector = EventCollector(self, collect_queue)
collector = EventCollector(self, collect_queue, batch_status_queue)
collector.start()

finally:
Expand Down
30 changes: 20 additions & 10 deletions SkyhighSecurity/tests/test_gateway_cloud_services_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,19 @@ def trigger(symphony_storage):

yield trigger

@pytest.fixture
def batch_status_queue():
return queue.Queue()


@pytest.fixture
def event_collector(trigger, events_queue):
return EventCollector(trigger, events_queue)
def event_collector(trigger, events_queue, batch_status_queue):
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The event_collector fixture now passes an empty batch_status_queue, but EventCollector.next_batch() blocks waiting for a confirmation (up to 60s). Tests like test_next_batch/test_next_batch_error_should_wait that call next_batch() without arranging a confirmation will hang or time out and will no longer observe the expected time range update. Update those tests to pre-populate batch_status_queue (or patch the confirmation wait) so they match the new behavior.

Suggested change
def event_collector(trigger, events_queue, batch_status_queue):
def event_collector(trigger, events_queue, batch_status_queue):
# Pre-populate the batch_status_queue so that calls to EventCollector.next_batch()
# have an immediate confirmation available and do not block waiting on an empty queue.
batch_status_queue.put(None)

Copilot uses AI. Check for mistakes.
return EventCollector(trigger, events_queue, batch_status_queue)


@pytest.fixture
def forwarder(trigger, events_queue):
return EventsForwarder(trigger, events_queue, 500)
def forwarder(trigger, events_queue, batch_status_queue):
return EventsForwarder(trigger, events_queue, batch_status_queue, 500)


def test_query_api_wrong_creds(trigger, event_collector, requests_mock):
Expand Down Expand Up @@ -166,25 +170,31 @@ def test_next_batch_error_should_wait(event_collector, requests_mock):

def test_tranformer_with_event(trigger, events_queue):
input_queue = queue.Queue()
transformer = Transformer(trigger, input_queue, events_queue)
batch_status_queue = queue.Queue()
transformer = Transformer(trigger, input_queue, events_queue, batch_status_queue)
Comment on lines +173 to +174
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transformer.__init__ currently accepts (connector, queue, output_queue, max_batch_size=...), but this test passes an extra batch_status_queue positional argument. This will be interpreted as max_batch_size (a Queue), causing runtime errors when batched(..., self.max_batch_size) runs. Update the test to match the actual Transformer signature (or update Transformer if it is supposed to take a status queue).

Suggested change
batch_status_queue = queue.Queue()
transformer = Transformer(trigger, input_queue, events_queue, batch_status_queue)
transformer = Transformer(trigger, input_queue, events_queue)

Copilot uses AI. Check for mistakes.

input_queue.put('"user_id","username"\r\n"-1","foo"')
input_queue.put(("batch-1", '"user_id","username"\r\n"-1","foo"'))
transformer.start()
time.sleep(0.5)
transformer.stop()

events = events_queue.get(block=False)
batch_ids, events = events_queue.get(block=False)
assert events == ["user_id=-1 username=foo"]
assert "batch-1" in batch_ids
Comment on lines +181 to +183
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transformer puts (batch_id, list(messages)) into the output queue, but this test unpacks the item as (batch_ids, events) and then asserts membership ("batch-1" in batch_ids). This is currently just checking substring membership on a string and doesn't validate the intended contract. Update the test to unpack (batch_id, events) and assert equality on batch_id.

Suggested change
batch_ids, events = events_queue.get(block=False)
assert events == ["user_id=-1 username=foo"]
assert "batch-1" in batch_ids
batch_id, events = events_queue.get(block=False)
assert events == ["user_id=-1 username=foo"]
assert batch_id == "batch-1"

Copilot uses AI. Check for mistakes.


def test_forwarder(trigger, forwarder, events_queue):
events_queue.put("message")
def test_forwarder(trigger, forwarder, events_queue, batch_status_queue):
batch_id = "batch-test"
events = ["user_id=-1 username=foo"]
events_queue.put((batch_id, events))

forwarder.start()
time.sleep(1)
forwarder.stop()

assert trigger.push_events_to_intakes.called

confirmed_batch = batch_status_queue.get(block=False)
assert confirmed_batch == batch_id

def test_sleep_until_next_batch(event_collector):
end_date = datetime(2023, 3, 22, 11, 50, 46, tzinfo=timezone.utc)
Expand Down
Loading