Commit: [SOAR-18656] Mimecast V2 (#3066)
* Initial task code

* Format api.py

* Refactor task

* Validators

* Update state

* Add todo. Update state handling.

* Update version
ablakley-r7 authored Jan 29, 2025
1 parent ee91f6f commit dcecd4e
Showing 7 changed files with 334 additions and 21 deletions.
2 changes: 1 addition & 1 deletion plugins/mimecast_v2/.CHECKSUM
@@ -1,5 +1,5 @@
 {
-  "spec": "4f06521e5c8a2a4506cf3cda093ee4ad",
+  "spec": "09e7a8073da54693bcc990b37ec6e2a4",
   "manifest": "e0e42959bee1c96589545b1afb0b1f61",
   "setup": "ea867af34e3163ba06ef9660ec9023fc",
   "schemas": [
2 changes: 1 addition & 1 deletion plugins/mimecast_v2/Dockerfile
@@ -1,4 +1,4 @@
-FROM --platform=linux/amd64 rapid7/insightconnect-python-3-slim-plugin:6.2.3
+FROM --platform=linux/amd64 rapid7/insightconnect-python-3-slim-plugin:6.2.4

 LABEL organization=rapid7
 LABEL sdk=python
19 changes: 12 additions & 7 deletions plugins/mimecast_v2/icon_mimecast_v2/connection/connection.py
@@ -1,20 +1,25 @@
 import insightconnect_plugin_runtime
+from insightconnect_plugin_runtime.exceptions import PluginException, ConnectionTestException
 from .schema import ConnectionSchema, Input
+from icon_mimecast_v2.util.api import API

 # Custom imports below


 class Connection(insightconnect_plugin_runtime.Connection):

     def __init__(self):
         super(self.__class__, self).__init__(input=ConnectionSchema())

     def connect(self, params):
         self.logger.info("Connect: Connecting...")
-        # START INPUT BINDING - DO NOT REMOVE - ANY INPUTS BELOW WILL UPDATE WITH YOUR PLUGIN SPEC AFTER REGENERATION
-        self.cleint_secret = params.get(Input.CLEINT_SECRET)
-        self.client_id = params.get(Input.CLIENT_ID)
-        # END INPUT BINDING - DO NOT REMOVE
+        self.client_secret = params.get(Input.CLIENT_SECRET, {}).get("secretKey", "").strip()
+        self.client_id = params.get(Input.CLIENT_ID, {}).get("secretKey", "").strip()
+        self.api = API(client_id=self.client_id, client_secret=self.client_secret, logger=self.logger)
+        self.api.authenticate()

     def test(self):
-        # TODO: Implement connection test
-        pass
+        try:
+            self.api.health_check()
+            return {"success": True}
+        except PluginException as error:
+            raise ConnectionTestException(cause=error.cause, assistance=error.assistance, data=error.data)
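Editor's note: the `.get(..., {}).get("secretKey", "").strip()` chain in connect() implies each credential input arrives as an object wrapping the secret rather than a bare string. A minimal standalone sketch of that unwrapping, with hypothetical key names and values (the "client_id"/"client_secret" keys are assumptions inferred from the Input constants above):

# Hypothetical input shape; real values come from the InsightConnect connection inputs
params = {
    "client_id": {"secretKey": " example-client-id "},
    "client_secret": {"secretKey": "example-client-secret"},
}
# Mirrors the unwrapping above: default to an empty dict/string, then strip whitespace
client_id = params.get("client_id", {}).get("secretKey", "").strip()
client_secret = params.get("client_secret", {}).get("secretKey", "").strip()
print(client_id)  # example-client-id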
196 changes: 185 additions & 11 deletions plugins/mimecast_v2/icon_mimecast_v2/tasks/monitor_siem_logs/task.py
@@ -1,18 +1,192 @@
 import insightconnect_plugin_runtime
+from insightconnect_plugin_runtime.exceptions import APIException, PluginException
 from .schema import MonitorSiemLogsInput, MonitorSiemLogsOutput, MonitorSiemLogsState, Input, Output, Component, State
 # Custom imports below
+from typing import Dict, List, Tuple
+from datetime import datetime, timezone, timedelta
+import copy
+
+LOG_TYPES = ["receipt", "url protect", "attachment protect"]
+MAX_LOOKBACK_DAYS = 7
+INITIAL_MAX_LOOKBACK_DAYS = 1
+INITIAL_RUN = "initial_run"
+SUBSEQUENT_RUN = "subsequent_run"
+PAGINATION_RUN = "pagination_run"

-class MonitorSiemLogs(insightconnect_plugin_runtime.Task):
-
+
+class MonitorSiemLogs(insightconnect_plugin_runtime.Task):
     def __init__(self):
         super(self.__class__, self).__init__(
-            name="monitor_siem_logs",
-            description=Component.DESCRIPTION,
-            input=MonitorSiemLogsInput(),
-            output=MonitorSiemLogsOutput(),
-            state=MonitorSiemLogsState())
-
-    def run(self, params={}, state={}):
-        # TODO: Implement run function
-        return {}, {}
+            name="monitor_siem_logs",
+            description=Component.DESCRIPTION,
+            input=MonitorSiemLogsInput(),
+            output=MonitorSiemLogsOutput(),
+            state=MonitorSiemLogsState(),
+        )
+
+    def run(self, params={}, state={}, custom_config={}):  # pylint: disable=unused-argument
+        self.logger.info(f"TASK: Received State: {state}")
+        existing_state = state.copy()
+        try:
+            # TODO: Additional error handling
+            run_condition = self.detect_run_condition(state.get("query_config", {}))
+            self.logger.info(f"TASK: Current run state is {run_condition}")
+            state = self.update_state(state, custom_config)
+            self.logger.info(f"NEW STATE: {state}")
+            now_date = datetime.now(tz=timezone.utc).date()
+            max_run_lookback_date = self.get_max_lookback_date(now_date, run_condition, bool(custom_config))
+            query_config = self.prepare_query_params(state.get("query_config", {}), max_run_lookback_date, now_date)
+            logs, query_config = self.get_all_logs(run_condition, query_config)
+            # TODO: Dedupe
+            self.logger.info(f"TASK: Total logs collected this run {len(logs)}")
+            exit_state, has_more_pages = self.prepare_exit_state(state, query_config, now_date)
+            return logs, exit_state, has_more_pages, 200, None
+        except APIException as error:
+            self.logger.info(
+                f"Error: An API exception has occurred. Status code: {error.status_code} returned. Cause: {error.cause}. Error data: {error.data}."
+            )
+            return [], existing_state, False, error.status_code, error
+        except PluginException as error:
+            self.logger.info(f"Error: A Plugin exception has occurred. Cause: {error.cause} Error data: {error.data}.")
+            return [], existing_state, False, error.status_code, error
+        except Exception as error:
+            self.logger.info(f"Error: Unknown exception has occurred. No results returned. Error Data: {error}")
+            return [], existing_state, False, 500, PluginException(preset=PluginException.Preset.UNKNOWN, data=error)
+
+    def detect_run_condition(self, query_config: Dict) -> str:
+        """
+        Return runtype based on query configuration
+        :param query_config:
+        :return: runtype string
+        """
+        if not query_config:
+            return INITIAL_RUN
+        for log_type_config in query_config.values():
+            if not log_type_config.get("caught_up"):
+                return PAGINATION_RUN
+        return SUBSEQUENT_RUN
+
+    def update_state(self, state: Dict, custom_config: Dict) -> Dict:
+        """
+        Initialise state, validate state, apply custom config
+        :param state:
+        :param custom_config:
+        :return:
+        """
+        initial_log_type_config = {"caught_up": False}
+        if not state:
+            state = {"query_config": {log_type: copy.deepcopy(initial_log_type_config) for log_type in LOG_TYPES}}
+            self.apply_custom_config(state, custom_config)
+        else:
+            for log_type in LOG_TYPES:
+                if log_type not in state.get("query_config", {}).keys():
+                    state["query_config"][log_type] = copy.deepcopy(initial_log_type_config)
+        return state
+
+    def get_max_lookback_date(self, now_date: datetime, run_condition: str, custom_config: bool) -> datetime:
+        """
+        Get max lookback date for run condition
+        :param now_date:
+        :param run_condition:
+        :param custom_config:
+        :return: max_run_lookback_date
+        """
+        max_run_lookback_days = MAX_LOOKBACK_DAYS
+        if run_condition in [INITIAL_RUN] and not custom_config:
+            max_run_lookback_days = INITIAL_MAX_LOOKBACK_DAYS
+
+        max_run_lookback_date = now_date - timedelta(days=max_run_lookback_days)
+        return max_run_lookback_date
+
+    def apply_custom_config(self, state: Dict, custom_config: Dict) -> None:
+        """
+        Apply custom configuration for lookback, query date applies to start and end time of query
+        :param current_query_config:
+        :param custom_config:
+        :return: N/A
+        """
+        # TODO: Additional custom config for page size, thread size, limit
+        current_query_config = state.get("query_config")
+        for log_type, lookback_date_string in custom_config.items():
+            self.logger.info(f"TASK: Supplied lookback date of {lookback_date_string} for {log_type} log type")
+            current_query_config[log_type] = {"query_date": lookback_date_string}
+
+    def prepare_query_params(self, query_config: Dict, max_lookback_date: Dict, now_date: datetime) -> Dict:
+        """
+        Prepare query for request. Validate query dates, move forward when caught up
+        :param query_config:
+        :param max_lookback_date:
+        :param now_date:
+        :return:
+        """
+        for log_type, log_type_config in query_config.items():
+            query_date_str = log_type_config.get("query_date")
+            self.logger.info(f"PREPPING {log_type_config}")
+            self.logger.info(f"{log_type}, {query_date_str}")
+            if query_date_str:
+                query_date = datetime.strptime(query_date_str, "%Y-%m-%d").date()
+            if not query_date_str:
+                log_type_config["query_date"] = max_lookback_date
+            elif query_date < now_date and log_type_config.get("caught_up") is True:
+                self.logger.info(f"TASK: Log type {log_type} has caught up for {query_date}")
+                log_type_config["query_date"] = query_date + timedelta(days=1)
+                log_type_config["caught_up"] = False
+                log_type_config.pop("next_page")
+            query_config[log_type] = self.validate_config_lookback(log_type_config, max_lookback_date, now_date)
+        return query_config
+
+    def validate_config_lookback(self, log_type_config: Dict, max_lookback_date: datetime, now_date: datetime) -> Dict:
+        """
+        Ensures provided query date in scope of request time window
+        :param log_type_config:
+        :param max_lookback_date:
+        :param now_date:
+        :return: log_type_config
+        """
+        query_date = log_type_config.get("query_date")
+        if isinstance(query_date, str):
+            query_date = datetime.strptime(query_date, "%Y-%m-%d").date()
+        if query_date < max_lookback_date:
+            return {"query_date": max_lookback_date}
+        if query_date > now_date:
+            log_type_config["query_date"] = now_date
+        return log_type_config
+
+    def get_all_logs(self, run_condition: str, query_config: Dict) -> Tuple[List, Dict]:
+        """
+        Gets all logs of provided log type. First retrieves batch URLs. Then downloads and reads batches, pooling logs.
+        :param run_condition:
+        :param query_config:
+        :return: Logs, updated query configuration (state)
+        """
+        complete_logs = []
+        for log_type, log_type_config in query_config.items():
+            if (not log_type_config.get("caught_up")) or (run_condition != PAGINATION_RUN):
+                logs, results_next_page, caught_up = self.connection.api.get_siem_logs(
+                    log_type=log_type,
+                    query_date=log_type_config.get("query_date"),
+                    next_page=log_type_config.get("next_page"),
+                )
+                complete_logs.extend(logs)
+                log_type_config.update({"next_page": results_next_page, "caught_up": caught_up})
+            else:
+                self.logger.info(f"TASK: Query for {log_type} is caught up. Skipping as we are currently paginating")
+        return complete_logs, query_config
+
+    def prepare_exit_state(self, state: dict, query_config: dict, now_date: datetime) -> Tuple[Dict, bool]:
+        """
+        Prepare state and pagination for task completion. Format date time.
+        :param state:
+        :param query_config:
+        :param now_date:
+        :return: state, has_more_pages
+        """
+        has_more_pages = False
+        for log_type_config in query_config.values():
+            query_date = log_type_config.get("query_date")
+            if isinstance(query_date, str):
+                query_date = datetime.strptime(query_date, "%Y-%m-%d").date()
+            if (not log_type_config.get("caught_up")) or query_date < now_date:
+                has_more_pages = True
+            log_type_config["query_date"] = query_date.strftime("%Y-%m-%d")
+        state["query_config"] = query_config
+        return state, has_more_pages
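Editor's note: the task's scheduling reduces to a small state machine over the per-log-type query_config: no stored state means an initial run, any log type not caught up means a pagination run, and everything caught up means a subsequent run. A standalone sketch mirroring detect_run_condition above, with made-up state values:

# Standalone mirror of detect_run_condition, using made-up query_config values
INITIAL_RUN, SUBSEQUENT_RUN, PAGINATION_RUN = "initial_run", "subsequent_run", "pagination_run"

def detect_run_condition(query_config: dict) -> str:
    if not query_config:
        return INITIAL_RUN  # no stored state yet: first ever run
    for log_type_config in query_config.values():
        if not log_type_config.get("caught_up"):
            return PAGINATION_RUN  # at least one log type still has pages to fetch
    return SUBSEQUENT_RUN  # every log type is caught up

print(detect_run_condition({}))                                  # initial_run
print(detect_run_condition({"receipt": {"caught_up": False}}))   # pagination_run
print(detect_run_condition({"receipt": {"caught_up": True}}))    # subsequent_run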
128 changes: 128 additions & 0 deletions plugins/mimecast_v2/icon_mimecast_v2/util/api.py
@@ -0,0 +1,128 @@
+import requests
+from insightconnect_plugin_runtime.exceptions import (
+    APIException,
+    PluginException,
+    HTTPStatusCodes,
+    ResponseExceptionData,
+)
+from insightconnect_plugin_runtime.helper import make_request, extract_json
+from logging import Logger
+from requests import Response, Request
+from io import BytesIO
+from icon_mimecast_v2.util.endpoints import Endpoints
+from typing import Dict, List, Tuple
+import gzip
+import json
+
+GET = "GET"
+POST = "POST"
+
+
+class API:
+    def __init__(self, client_id: str, client_secret: str, logger: Logger) -> None:
+        self.client_id = client_id
+        self.client_secret = client_secret
+        self.logger = logger
+        self.access_token = None
+
+    def authenticate(self) -> None:
+        self.logger.info("API: Authenticating...")
+        data = {"client_id": self.client_id, "client_secret": self.client_secret, "grant_type": "client_credentials"}
+        response = self.make_api_request(
+            url=Endpoints.AUTH,
+            method=POST,
+            headers={"Content-Type": "application/x-www-form-urlencoded"},
+            data=data,
+            auth=False,
+        )
+        self.access_token = response.get("access_token")
+        self.logger.info("API: Authenticated")
+
+    def get_siem_logs(
+        self, log_type: str, query_date: str, next_page: str, page_size: int = 100
+    ) -> Tuple[List[str], str, bool]:
+        batch_download_urls, result_next_page, caught_up = self.get_siem_batches(
+            log_type, query_date, next_page, page_size
+        )
+        logs = []
+        self.logger.info(f"API: Getting SIEM logs from batches for log type {log_type}...")
+        for url in batch_download_urls:
+            batch_logs = self.get_siem_logs_from_batch(url=url)
+            if isinstance(batch_logs, (List, Dict)):
+                logs.extend(batch_logs)
+        self.logger.info(f"API: Discovered {len(logs)} logs for log type {log_type}")
+        return logs, result_next_page, caught_up
+
+    def get_siem_batches(
+        self, log_type: str, query_date: str, next_page: str, page_size: int = 100
+    ) -> Tuple[List[str], str, bool]:
+        self.logger.info(
+            f"API: Getting SIEM batches for log type {log_type} for {query_date} with page token {next_page}..."
+        )
+        params = {
+            "type": log_type,
+            "dateRangeStartsAt": query_date,
+            "dateRangeEndsAt": query_date,
+            "pageSize": page_size,
+        }
+        if next_page:
+            params.update({"nextPage": next_page})
+        batch_response = self.make_api_request(url=Endpoints.GET_SIEM_LOGS_BATCH, method=GET, params=params)
+        batch_list = batch_response.get("value", [])
+        caught_up = batch_response.get("isCaughtUp")
+        self.logger.info(
+            f"API: Discovered {len(batch_list)} batches for log type {log_type}. Response reporting {caught_up} that logs have caught up to query window"
+        )
+        urls = [batch.get("url") for batch in batch_list]
+        return urls, batch_response.get("@nextPage"), caught_up
+
+    def get_siem_logs_from_batch(self, url: str):
+        # TODO: Threading
+        response = requests.request(method=GET, url=url, stream=False)
+        with gzip.GzipFile(fileobj=BytesIO(response.content), mode="rb") as file_:
+            logs = []
+            # Iterate over lines in the decompressed file, decode and load the JSON
+            for line in file_:
+                decoded_line = line.decode("utf-8").strip()
+                logs.append(json.loads(decoded_line))
+        return logs
+
+    def make_api_request(
+        self,
+        url: str,
+        method: str,
+        headers: Dict = {},
+        json: Dict = None,
+        data: Dict = None,
+        params: Dict = None,
+        return_json: bool = True,
+        auth=True,
+    ) -> Response:
+        if auth:
+            headers["Authorization"] = f"Bearer {self.access_token}"
+        request = Request(url=url, method=method, headers=headers, params=params, data=data, json=json)
+        # TODO: Handle rate limit, handle retry backoff
+        try:
+            response = make_request(
+                _request=request,
+                allowed_status_codes=[HTTPStatusCodes.UNAUTHORIZED],
+                exception_data_location=ResponseExceptionData.RESPONSE,
+            )
+        except PluginException as exception:
+            if isinstance(exception.data, Response):
+                raise APIException(
+                    cause=exception.cause,
+                    assistance=exception.assistance,
+                    data=exception.data,
+                    status_code=exception.data.status_code,
+                )
+            raise exception
+        if (
+            response.status_code == HTTPStatusCodes.UNAUTHORIZED
+            and extract_json(response).get("fail", [{}])[0].get("code") == "token_expired"
+        ):
+            self.authenticate()
+        if return_json:
+            json_data = extract_json(response)
+            return json_data
+        return response
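Editor's note: get_siem_logs_from_batch treats each batch URL as a gzip-compressed file of newline-delimited JSON. A self-contained sketch of just that decoding step, with synthetic bytes standing in for a downloaded batch (the field names are made up):

import gzip
import json
from io import BytesIO

def parse_batch(content: bytes) -> list:
    # Decompress, then decode and parse one JSON object per line, as above
    logs = []
    with gzip.GzipFile(fileobj=BytesIO(content), mode="rb") as file_:
        for line in file_:
            logs.append(json.loads(line.decode("utf-8").strip()))
    return logs

sample = gzip.compress(b'{"processingId": "abc"}\n{"processingId": "def"}\n')  # synthetic batch content
print(parse_batch(sample))  # [{'processingId': 'abc'}, {'processingId': 'def'}]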
6 changes: 6 additions & 0 deletions plugins/mimecast_v2/icon_mimecast_v2/util/endpoints.py
@@ -0,0 +1,6 @@
+BASE_URL = "https://api.services.mimecast.com/"
+
+
+class Endpoints:
+    AUTH = f"{BASE_URL}oauth/token"
+    GET_SIEM_LOGS_BATCH = f"{BASE_URL}siem/v1/batch/events/cg"
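Editor's note: Endpoints.AUTH is the token URL that API.authenticate() posts to using the OAuth 2.0 client-credentials grant. A minimal sketch of that exchange with the requests library, using placeholder credentials:

import requests

# Placeholder credentials; real values come from a Mimecast API 2.0 application
response = requests.post(
    "https://api.services.mimecast.com/oauth/token",
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    data={"client_id": "<CLIENT_ID>", "client_secret": "<CLIENT_SECRET>", "grant_type": "client_credentials"},
)
access_token = response.json().get("access_token")  # bearer token for subsequent API calls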
2 changes: 1 addition & 1 deletion plugins/mimecast_v2/plugin.spec.yaml
@@ -23,7 +23,7 @@ support: rapid7
 cloud_ready: true
 sdk:
   type: slim
-  version: 6.2.3
+  version: 6.2.4
   user: nobody
 status: []
 resources:
