From 590ad052261057e4ccf839ca0c4a384b8055b069 Mon Sep 17 00:00:00 2001 From: tanushreekurup_crest Date: Tue, 29 Oct 2024 20:06:09 +0530 Subject: [PATCH 1/3] CTE Netskope: Fixed an issue while sharing URLs. --- netskope/main.py | 1 + netskope/manifest.json | 164 ++++++++++++++++++++--------------------- 2 files changed, 83 insertions(+), 82 deletions(-) diff --git a/netskope/main.py b/netskope/main.py index 0c92e11e..6f313746 100644 --- a/netskope/main.py +++ b/netskope/main.py @@ -952,6 +952,7 @@ def _push_malsites( message="Could not share indicators.", ) invalid_indicators = [] + ipv6_iocs = [] if append_urllist_netskope.status_code == 400: response_json = append_urllist_netskope.json() invalid_indicators, ipv6_iocs = self._extract_invalid_indicators( diff --git a/netskope/manifest.json b/netskope/manifest.json index 76dc37a5..2e29f18c 100644 --- a/netskope/manifest.json +++ b/netskope/manifest.json @@ -1,92 +1,92 @@ { - "name": "Netskope Threat Exchange", - "id": "netskope", - "netskope": true, - "version": "2.1.0", - "module": "CTE", - "minimum_version": "5.1.0", - "minimum_provider_version": "1.0.0", - "description": "This plugin is used to fetch the File hashes (MD5 and SHA256) and URLs (URL, IPv4, hostname, domain, and FQDN) from the Malware and Malsite alerts available on the Netskope Tenant from Skope IT > Alerts. This plugin also supports sharing File Hashes (MD5 and SHA256) and URLs (URL, IPv4, hostname, domain, and FQDN) indicators to File Hash List (Policies > File), URL List (Policies > Web > URL LISTS) and Private App (Settings > Security Cloud Platform > App Defination > Private App) within Netskope. Consider the maximum size of data that Netskope File Hash List and URL List can hold (8 MB) while configuring the Business Rule.", - "provider_id": "netskope_provider", - "delete_supported": true, - "patch_supported": false, - "supported_subtypes": { - "alerts": [ - "malsite", - "Malware" - ], - "events": [] - }, - "configuration": [ - { - "label": "Enable Polling", - "key": "is_pull_required", - "type": "choice", - "choices": [ - { - "key": "Yes", - "value": "Yes" - }, - { - "key": "No", - "value": "No" - } - ], - "default": "Yes", - "mandatory": true, - "description": "Enable/Disable polling data from Netskope." + "name": "Netskope Threat Exchange", + "id": "netskope", + "netskope": true, + "version": "2.1.0", + "module": "CTE", + "minimum_version": "5.1.0", + "minimum_provider_version": "1.0.0", + "description": "This plugin is used to fetch the File hashes (MD5 and SHA256) and URLs (URL, IPv4, hostname, domain, and FQDN) from the Malware and Malsite alerts available on the Netskope Tenant from Skope IT > Alerts. This plugin also supports sharing File Hashes (MD5 and SHA256) and URLs (URL, IPv4, hostname, domain, and FQDN) indicators to File Hash List (Policies > File), URL List (Policies > Web > URL LISTS) and Private App (Settings > Security Cloud Platform > App Defination > Private App) within Netskope. Consider the maximum size of data that Netskope File Hash List and URL List can hold (8 MB) while configuring the Business Rule.", + "provider_id": "netskope_provider", + "delete_supported": true, + "patch_supported": false, + "supported_subtypes": { + "alerts": [ + "malsite", + "Malware" + ], + "events": [] }, - { - "label": "Types of Threat Data to Pull", - "key": "threat_data_type", - "type": "multichoice", - "choices": [ + "configuration": [ { - "key": "MD5", - "value": "MD5" + "label": "Enable Polling", + "key": "is_pull_required", + "type": "choice", + "choices": [ + { + "key": "Yes", + "value": "Yes" + }, + { + "key": "No", + "value": "No" + } + ], + "default": "Yes", + "mandatory": true, + "description": "Enable/Disable polling data from Netskope." }, { - "key": "SHA256", - "value": "SHA256" + "label": "Types of Threat Data to Pull", + "key": "threat_data_type", + "type": "multichoice", + "choices": [ + { + "key": "MD5", + "value": "MD5" + }, + { + "key": "SHA256", + "value": "SHA256" + }, + { + "key": "URL", + "value": "URL" + } + ], + "mandatory": true, + "default": [ + "MD5", + "SHA256", + "URL" + ], + "description": "Indicators of selected types will be extracted from Netskope malware alerts and stored on CTE." }, { - "key": "URL", - "value": "URL" - } - ], - "mandatory": true, - "default": [ - "MD5", - "SHA256", - "URL" - ], - "description": "Indicators of selected types will be extracted from Netskope malware alerts and stored on CTE." - }, - { - "label": "Initial Range (in days)", - "key": "days", - "type": "number", - "mandatory": true, - "default": 7, - "description": "Number of days to pull the data for the initial run." - }, - { - "label": "Enable Tagging", - "key": "enable_tagging", - "type": "choice", - "choices": [ - { - "key": "Yes", - "value": "yes" + "label": "Initial Range (in days)", + "key": "days", + "type": "number", + "mandatory": true, + "default": 7, + "description": "Number of days to pull the data for the initial run." }, { - "key": "No", - "value": "no" + "label": "Enable Tagging", + "key": "enable_tagging", + "type": "choice", + "choices": [ + { + "key": "Yes", + "value": "yes" + }, + { + "key": "No", + "value": "no" + } + ], + "default": "yes", + "mandatory": true, + "description": "Enable/disable tagging of unshared indicators." } - ], - "default": "yes", - "mandatory": true, - "description": "Enable/disable tagging of unshared indicators." - } - ] -} + ] +} \ No newline at end of file From 60d5e5dcaeb2880c7c7b6aaae36c89e48956fad5 Mon Sep 17 00:00:00 2001 From: tanushreekurup_crest Date: Wed, 30 Oct 2024 17:08:58 +0530 Subject: [PATCH 2/3] CTE MISP 1.5.0 - Added support for IOC Retarction and IOC Source Labelling. --- misp/CHANGELOG.md | 10 + misp/main.py | 888 ++++++++++++++++++++++++++++++++++------ misp/manifest.json | 514 ++++++++++++----------- misp/utils/constants.py | 30 +- misp/utils/helper.py | 9 +- 5 files changed, 1059 insertions(+), 392 deletions(-) diff --git a/misp/CHANGELOG.md b/misp/CHANGELOG.md index c023d7eb..a3af28ff 100644 --- a/misp/CHANGELOG.md +++ b/misp/CHANGELOG.md @@ -1,3 +1,13 @@ +# 1.5.0 +## Added +- Added support for IoC(s) Retraction. +- Added support for IoC Source Labelling. +- Added support for pulling Hostname, Destination IP (ip-dst), Destination IP|Port (ip-dst|port), Domain|IP (domain|ip), Source IP|Port (ip-src|port), and Hostname|Port (hostname|port) from MISP. + +# 1.4.1 +## Added +- Added support for accepting multiple events in the "Exclude IoCs from Event" parameter. + # 1.4.0 ## Added - Added support for pulling indicators on the basis of Decaying Score Threshold, Decaying Model IDs, Published Events, IDS flag and Enforce Warning List IoCs flag. diff --git a/misp/main.py b/misp/main.py index 7a729d24..8a7f5ea0 100644 --- a/misp/main.py +++ b/misp/main.py @@ -64,6 +64,9 @@ PLUGIN_NAME, PLUGIN_VERSION, PULL_PAGE_SIZE, + RETRACTION, + RETRACTION_BATCH, + SHARING_TAG_CONSTANT, ) from .utils.helper import MISPPluginException, MISPPluginHelper @@ -72,6 +75,10 @@ "sha256": IndicatorType.SHA256, "url": IndicatorType.URL, "domain": getattr(IndicatorType, "DOMAIN", IndicatorType.URL), + "ip-src|port": IndicatorType.URL, + "ip-dst|port": IndicatorType.URL, + "hostname": getattr(IndicatorType, "HOSTNAME", IndicatorType.URL), + "hostname|port": IndicatorType.URL, } @@ -96,9 +103,10 @@ def __init__( ) self.plugin_name, self.plugin_version = self._get_plugin_info() self.log_prefix = f"{MODULE_NAME} {self.plugin_name}" + self.config_name = name if name: self.log_prefix = f"{self.log_prefix} [{name}]" - + self.retraction_batch = RETRACTION_BATCH self.misp_helper = MISPPluginHelper( logger=self.logger, plugin_name=self.plugin_name, @@ -127,6 +135,466 @@ def _get_plugin_info(self) -> tuple: ) return (PLATFORM_NAME, PLUGIN_VERSION) + def _retract_attributes(self, attribute_ids, base_url, api_key): + """Make an API call to delete one batch from misp.""" + for event_id, attributes in attribute_ids.items(): + if not attributes: + continue + retracted_count = 0 + try: + event_log = f"event with Event ID '{event_id}'" + resp_json = self.misp_helper.api_helper( + method="POST", + url=f"{base_url}/attributes/deleteSelected/{event_id}", + headers=self.misp_helper.get_header(api_key), + json={"id": attributes, "event_id": event_id}, + logger_msg=( + f"retracting {len(attributes)} indicator(s) " + f"from {event_log} from {PLATFORM_NAME}" + ), + verify=self.ssl_validation, + proxies=self.proxy, + ) + if resp_json.get("success"): + retracted_count += len(attributes) + self.logger.info( + f"{self.log_prefix}: Successfully retracted " + f"{retracted_count} indicator(s) from" + f" {event_log}." + ) + else: + err_msg = resp_json.get("errors") + if err_msg and isinstance(err_msg, str): + log_msg = ( + f"Unable to retract all indicators " + f"from {event_log}. API Error: {err_msg}" + ) + success_pattern = re.compile( + r"(\d+) attributes deleted" + ) + match = success_pattern.search(err_msg) + if match: + retracted_count += int(match.group(1)) + + self.logger.error( + message=log_msg, + details=f"API response: {resp_json}", + ) + else: + self.logger.error( + message=( + f"{self.log_prefix}: Unable to retract " + f"{len(attributes)} indicator(s) " + f"from {event_log}." + ), + details=f"API response: {resp_json}", + ) + except MISPPluginException: + continue + except Exception as exp: + err_mg = ( + "Unexpected error occurred while retracting" + f" {len(attributes)} indicator(s) from " + f"{event_log}. Error: {exp}" + ) + self.logger.error( + message=f"{self.log_prefix}: {err_mg}", + details=traceback.format_exc(), + ) + self.logger.info( + f"Successfully retracted {retracted_count} indicator(s) " + f"for {event_log}." + ) + + def retract_indicators( + self, + retracted_indicators_lists: List[List[Indicator]], + list_action_dict: List[Action], + ): + """Retract indicators from misp.""" + if RETRACTION not in self.log_prefix: + self.log_prefix = self.log_prefix + f" [{RETRACTION}]" + end_time = datetime.now() + retraction_interval = self.configuration.get("retraction_interval") + if not (retraction_interval and isinstance(retraction_interval, int)): + log_msg = ( + "Retraction Interval is not available for the configuration" + f' "{self.config_name}". Skipping retraction of IoC(s)' + f" from {PLATFORM_NAME}." + ) + self.logger.info(f"{self.log_prefix}: {log_msg}") + yield ValidationResult( + success=False, disabled=True, message=log_msg + ) + retraction_interval = int(retraction_interval) + start_time = end_time - timedelta(days=int(retraction_interval)) + start_time = int(start_time.timestamp()) + end_time = int(end_time.timestamp()) + self.logger.info( + f"{self.log_prefix}: Start time for this retract" + f" indicators cycle: {start_time}" + ) + event_ids = [] + base_url, api_key = self.misp_helper.get_credentials( + self.configuration + ) + for inc_event in list_action_dict: + event_id = self._event_exists( + inc_event.parameters.get("event_name"), + base_url, + api_key, + is_retraction=True, + )[1] + event_ids.append(event_id) + if len(event_ids) == 0: + err_msg = ( + "Error occurred while getting event ids for events which" + " are provided in sharing configurations." + ) + self.logger.error(f"{self.log_prefix}: {err_msg}") + raise MISPPluginException(err_msg) + + log_event_ids = ", ".join(event_ids) + for retraction_batch in retracted_indicators_lists: + iocs = [ioc.value for ioc in retraction_batch] + available_attributes_id = {} + body = { + "returnFormat": "json", + "limit": PULL_PAGE_SIZE, + "page": 1, + "attribute_timestamp": [str(start_time), str(end_time)], + "eventid": event_ids, + } + last_page = False + while True: + page_ioc_count = 0 + resp_json = self.misp_helper.api_helper( + method="POST", + url=f"{base_url}/attributes/restSearch", + headers=self.misp_helper.get_header(api_key), + json=body, + logger_msg=( + f"pulling indicators for page {body['page']}" + f" from {PLATFORM_NAME}" + ), + verify=self.ssl_validation, + proxies=self.proxy, + is_retraction=True, + ) + + for attr in resp_json.get("response", {}).get( + "Attribute", [] + ): + event_id = attr.get("event_id") + # if attr.get("value") and attr.get("value") not in iocs: + # continue + if attr.get("value") in iocs: + if event_id in available_attributes_id: + available_attributes_id.get(event_id).append( + attr.get("id") + ) + page_ioc_count += 1 + else: + available_attributes_id[event_id] = [ + attr.get("id") + ] + page_ioc_count += 1 + + if ( + len(resp_json.get("response", {}).get("Attribute", [])) + < body["limit"] + ): + last_page = True + + total_ioc_count = sum( + len(attributes) + for attributes in available_attributes_id.values() + ) + self.logger.info( + f"{self.log_prefix}: Successfully pulled {page_ioc_count}" + f" IoC(s) from MISP for page {body['page']} from Event " + f"ID(s) '{log_event_ids}'. Total IoCs: {total_ioc_count}" + ) + if last_page: + break + body["page"] += 1 + + # Run API to remove attributes from misp + self._retract_attributes( + available_attributes_id, base_url, api_key + ) + + # yield indicators, None if last_page else body + yield ValidationResult( + success=True, message="Completed execution for one batch." + ) + + def get_modified_indicators(self, source_indicators): + """Get all modified indicators status. + + Args: + source_indicators (List[List[Dict]]): Source Indicators. + + Yields: + List of retracted indicators, Status (List, bool): List of + retracted indicators values. Status of execution. + """ + self.log_prefix = f"{self.log_prefix} [{RETRACTION}]" + retraction_interval = self.configuration.get("retraction_interval") + if not (retraction_interval and isinstance(retraction_interval, int)): + log_msg = ( + "Retraction Interval is not available for the configuration" + f' "{self.config_name}". Skipping retraction of IoC(s)' + f" from {PLATFORM_NAME}." + ) + self.logger.info(f"{self.log_prefix}: {log_msg}") + yield [], True + + retraction_interval = int(retraction_interval) + pulling_mechanism = self.configuration.get( + "pulling_mechanism", "incremental" + ) + end_time = datetime.now() + for source_ioc_list in source_indicators: + source_unique_iocs = set() + for ioc in source_ioc_list: + source_unique_iocs.add(ioc.value) + self.logger.info( + f"{self.log_prefix}: Getting modified indicators status" + f" for {len(source_unique_iocs)} indicator(s) from" + f" {PLATFORM_NAME}." + ) + + if pulling_mechanism == "look_back": + look_back = self.configuration.get("look_back", 24) + if look_back is None: + err_msg = ( + "Look Back is a required configuration " + 'parameter when "Look Back" is selected as ' + "Pulling Mechanism." + ) + self.logger.error(f"{self.log_prefix}: {err_msg}") + raise MISPPluginException(err_msg) + elif ( + not isinstance(look_back, int) + or look_back <= 0 + or look_back > MAX_LOOK_BACK + ): + err_msg = ( + "Invalid value for Look Back provided in" + " configuration parameters. Valid value should be " + "an integer in range 1-8760 i.e. 1 year." + ) + self.logger.error(f"{self.log_prefix}: {err_msg}") + raise MISPPluginException(err_msg) + else: + start_time = end_time - timedelta(hours=int(look_back)) + else: + start_time = end_time - timedelta(days=retraction_interval) + + # create set of excluded events for + event_ids = [] + include_event_name = self.configuration.get("include_event_name") + exclude_event = self.configuration.get("event_name", "") + exclude_events = [] + if exclude_event: + exclude_events = [ + event + for event in exclude_event.strip().split(",") + if event + ] + base_url, api_key = self.misp_helper.get_credentials( + self.configuration + ) + + if include_event_name: + for inc_event in include_event_name.strip().split(","): + event_id = self._event_exists( + inc_event, base_url, api_key, is_retraction=True + )[1] + event_ids.append(event_id) + + misp_tags = [f"!{DEFAULT_IOC_TAG}"] + tags = self.configuration.get("tags", "").strip() + if tags: + misp_tags.extend(tags.split(",")) + + body = { + "returnFormat": "json", + "limit": PULL_PAGE_SIZE, + "page": 1, + "attribute_timestamp": [ + int(start_time.timestamp()), + int(end_time.timestamp()), + ], + # Filter attributes based on type, category and tags + "category": self.configuration.get("attr_category"), + "type": self.configuration.get("attr_type"), + "tags": misp_tags, + "includeDecayScore": 1, + } + published = self.configuration.get("published", []) + if published == ["published"]: + body["published"] = 1 + elif published == ["unpublished"]: + body["published"] = 0 + + to_ids = self.configuration.get("to_ids", []) + if to_ids == ["enabled"]: + body["to_ids"] = 1 + elif to_ids == ["disabled"]: + body["to_ids"] = 0 + + enforce_warning_list = self.configuration.get( + "enforce_warning_list", "no" + ).strip() + if enforce_warning_list == "yes": + body["enforceWarninglist"] = 1 + elif enforce_warning_list == "no": + body["enforceWarninglist"] = 0 + + if event_ids: + body["eventid"] = event_ids + score_threshold = self.configuration.get("score_threshold") + decaying_models = ( + self.configuration.get("decaying_models", "") + .strip() + .split(",") + ) + if score_threshold is not None: + model_ids = [ + int(model_id) for model_id in decaying_models if model_id + ] + score_params = { + "excludeDecayed": 1, + "decayingModel": model_ids, + "modelOverrides": {"threshold": score_threshold}, + } + body.update(score_params) + + last_page = False + while True: + indicators = set() + try: + resp_json = self.misp_helper.api_helper( + method="POST", + url=f"{base_url}/attributes/restSearch", + headers=self.misp_helper.get_header(api_key), + json=body, + logger_msg=( + f"pulling indicators for page {body['page']}" + f" to check their existence on {PLATFORM_NAME}" + ), + verify=self.ssl_validation, + proxies=self.proxy, + is_retraction=True, + ) + + for attr in resp_json.get("response", {}).get( + "Attribute", [] + ): + if ( + attr.get("Event", {}).get("info", "") + in exclude_events + or attr.get("Event", {}).get("id") + in exclude_events + ): + + continue + + if attr.get("type") == "domain|ip": + iocs = attr.get("value", "").split("|") + for ioc in iocs: + if ioc: + indicators.add(ioc) + elif attr.get("type") in ATTRIBUTE_TYPES: + # Filter already pushed attributes/indicators + indicators.add(attr.get("value")) + + if ( + len( + resp_json.get("response", {}).get("Attribute", []) + ) + < body["limit"] + ): + last_page = True + # remove existing indicators. + source_unique_iocs = source_unique_iocs - indicators + self.logger.info( + f"{self.log_prefix}: Successfully fetched " + f"{len(indicators)} indicator(s) in " + f"page {body['page']}." + ) + body["page"] += 1 + # yield indicators, None if last_page else body + if last_page or len(source_unique_iocs) == 0: + break + except MISPPluginException: + raise + except Exception as exp: + err_msg = ( + f"Unexpected error occurred while pulling " + f"indicators for page {body['page']} " + f"from {PLATFORM_NAME}. Error: {exp}" + ) + self.logger.error( + message=f"{self.log_prefix}: {err_msg}", + details=str(traceback.format_exc()), + ) + raise MISPPluginException(err_msg) + + yield list(source_unique_iocs), False + + def _get_ioc_type_from_attribute(self, attribute_value): + """Get IoC type from attribute.""" + if self._is_valid_ipv4(attribute_value): + return getattr( + IndicatorType, + "IPV4", + IndicatorType.URL, + ) + elif self._is_valid_ipv6(attribute_value): + return getattr( + IndicatorType, + "IPV6", + IndicatorType.URL, + ) + elif self._is_valid_domain(attribute_value): + return getattr( + IndicatorType, + "DOMAIN", + IndicatorType.URL, + ) + else: + return getattr( + IndicatorType, + "URL", + IndicatorType.URL, + ) + + def _get_decaying_comment(self, decay_score, comment) -> str: + score_comment = [] + if decay_score: + for decay in decay_score: + model = decay.get("DecayingModel", {}) + if decay.get("score"): + cmt = ( + "Decaying Score: " + + str(round(decay.get("score"), 2)) + + ", Decaying Model ID: " + + str(model.get("id", "Unknown")) + + ", Decaying Model Name: " + + str(model.get("name", "Unknown")) + ) + score_comment.append(cmt) + + if score_comment: + if comment: + comment += str(f" | {' | '.join(score_comment)}") # noqa + else: + comment += f"{' | '.join(score_comment)}" # noqa + return comment + def _pull(self): """Pull indicators from MISP.""" existing_tag = f"{self.name} Latest" @@ -140,7 +608,7 @@ def _pull(self): start_time = None if pulling_mechanism == "look_back": look_back = self.configuration.get("look_back", 24) - if not look_back: + if look_back is None: err_msg = ( "Look Back is a required configuration " 'parameter when "Look Back" is selected as ' @@ -148,10 +616,25 @@ def _pull(self): ) self.logger.error(f"{self.log_prefix}: {err_msg}") raise MISPPluginException(err_msg) + elif ( + not isinstance(look_back, int) + or look_back <= 0 + or look_back > MAX_LOOK_BACK + ): + err_msg = ( + "Invalid value for Look Back provided in" + " configuration parameters. Valid value should be " + "an integer in range 1-8760 i.e. 1 year." + ) + self.logger.error(f"{self.log_prefix}: {err_msg}") + raise MISPPluginException(err_msg) + else: # Removing the Latest tag from the existing # indicators - query = {"sources": {"$elemMatch": {"source": f"{self.name}"}}} + query = { + "sources": {"$elemMatch": {"source": f"{self.name}"}} + } TagUtils().on_indicators(query).remove(existing_tag) start_time = end_time - timedelta(hours=int(look_back)) if self.last_run_at and self.last_run_at < start_time: @@ -179,7 +662,7 @@ def _pull(self): # create set of excluded events for event_ids = [] include_event_name = self.configuration.get("include_event_name") - exclude_event = self.configuration.get("event_name") + exclude_event = self.configuration.get("event_name", "") base_url, api_key = self.misp_helper.get_credentials( self.configuration ) @@ -188,6 +671,11 @@ def _pull(self): for inc_event in include_event_name.strip().split(","): event_id = self._event_exists(inc_event, base_url, api_key)[1] event_ids.append(event_id) + exclude_events = [] + if exclude_event: + exclude_events = [ + event for event in exclude_event.strip().split(",") if event + ] # Convert to epoch if start_time: @@ -205,7 +693,7 @@ def _pull(self): "returnFormat": "json", "limit": PULL_PAGE_SIZE, "page": 1, - "timestamp": [str(start_time), str(end_time)], + "attribute_timestamp": [str(start_time), str(end_time)], # Filter attributes based on type, category and tags "category": self.configuration.get("attr_category"), "type": self.configuration.get("attr_type"), @@ -268,6 +756,7 @@ def _pull(self): "ipv4": 0, "ipv6": 0, "url": 0, + "hostname": 0, } page_skip_count = 0 indicators, skipped_tags = [], [] @@ -285,15 +774,22 @@ def _pull(self): proxies=self.proxy, ) - for attr in resp_json.get("response", {}).get("Attribute", []): + for attr in resp_json.get("response", {}).get( + "Attribute", [] + ): + + if ( + attr.get("Event", {}).get("info", "") + in exclude_events + or attr.get("Event", {}).get("id") in exclude_events + ): + + continue if ( - attr.get("type") in ATTRIBUTE_TYPES + attr.get("type") + in ATTRIBUTE_TYPES # Filter already pushed attributes/indicators - and ( - attr.get("Event", {}).get("info", "") - != exclude_event - ) ): # Deep link of event corresponding to the attribute @@ -315,65 +811,78 @@ def _pull(self): skipped_tags.extend(skipped) if pulling_mechanism == "look_back" and look_back: tags.append(new_indicator_tag) - ioc_type = ioc_type = MISP_TO_INTERNAL_TYPE.get( - attr.get("type") - ) - if attr.get( - "type" - ) == "ip-src" and self._is_valid_ipv4( - attr.get("value") - ): - ioc_type = getattr( - IndicatorType, "IPV4", IndicatorType.URL + + if not attr.get("value"): + page_skip_count += 1 + continue + + if attr.get("type") in [ + "ip-src", + "ip-dst", + ]: + ioc_type = self._get_ioc_type_from_attribute( + attr.get("value") ) - elif attr.get( - "type" - ) == "ip-src" and self._is_valid_ipv6( - attr.get("value") - ): - ioc_type = getattr( - IndicatorType, "IPV6", IndicatorType.URL + else: + ioc_type = MISP_TO_INTERNAL_TYPE.get( + attr.get("type") ) + try: - score_comment = [] + # Get decaying score decay_score = attr.get("decay_score", []) - if decay_score: - for decay in decay_score: - model = decay.get("DecayingModel", {}) - if decay.get("score"): - cmt = ( - "Decaying Score: " - + str(round(decay.get("score"), 2)) - + ", Decaying Model ID: " - + str(model.get("id", "Unknown")) - + ", Decaying Model Name: " - + str(model.get("name", "Unknown")) - ) - score_comment.append(cmt) - comment = attr.get("comment", "") - if score_comment: - if comment: - comment += str( - f" | {' | '.join(score_comment)}" # noqa + # Get comments + comment = self._get_decaying_comment( + decay_score, attr.get("comment", "") + ) + + first_seen = attr.get("first_seen", None) + if first_seen: + first_seen = datetime.fromisoformat( + first_seen + ) + last_seen = attr.get("last_seen", None) + if last_seen: + last_seen = datetime.fromisoformat(last_seen) + if attr.get("type") == "domain|ip": + iocs = attr.get("value", "").split("|") + for ioc in iocs: + if not ioc: + # Skip IoC creation if IoC value + # is none or empty string + page_skip_count += 1 + continue + ioc_type = ( + self._get_ioc_type_from_attribute(ioc) ) - else: - comment += ( - f"{' | '.join(score_comment)}" # noqa + indicators.append( + Indicator( + value=ioc, + type=ioc_type, + firstSeen=first_seen, + lastSeen=last_seen, + comments=comment, + tags=tags, + extendedInformation=deep_link, + ) + ) + ioc_counts[ioc_type] += 1 + total_ioc_count += 1 + else: + indicators.append( + Indicator( + value=attr.get("value"), + type=ioc_type, + firstSeen=first_seen, + lastSeen=last_seen, + comments=comment, + tags=tags, + extendedInformation=deep_link, ) - - indicators.append( - Indicator( - value=attr.get("value"), - type=ioc_type, - firstSeen=attr.get("first_seen", None), - comments=comment, - tags=tags, - extendedInformation=deep_link, ) - ) - ioc_counts[ioc_type] += 1 - total_ioc_count += 1 + ioc_counts[ioc_type] += 1 + total_ioc_count += 1 except (ValidationError, Exception) as error: page_skip_count += 1 error_message = ( @@ -381,12 +890,14 @@ def _pull(self): if isinstance(error, ValidationError) else "Unexpected error occurred" ) + attr_id = attr.get("id") self.logger.error( message=( f"{self.log_prefix}: {error_message} while" - " creating indicator for page " - f"{body['page']}. This record will be" - f" skipped. Error: {error}." + f" creating indicator from attribute " + f"having ID {attr_id} for page " + f"{body['page']}. This record will be " + f"skipped. Error: {error}." ), details=str(traceback.format_exc()), ) @@ -425,15 +936,17 @@ def _pull(self): break except MISPPluginException: raise - except Exception: + except Exception as exp: err_msg = ( f"Unexpected error occurred while pulling " - f"indicators for page {body['page']} from {PLATFORM_NAME}." + f"indicators for page {body['page']} " + f"from {PLATFORM_NAME}. Error: {exp}" ) self.logger.error( message=f"{self.log_prefix}: {err_msg}", details=str(traceback.format_exc()), ) + raise MISPPluginException(err_msg) def pull(self) -> List[Indicator]: if hasattr(self, "sub_checkpoint"): @@ -605,6 +1118,7 @@ def _event_exists( base_url: str, api_key: str, is_validation: bool = False, + is_retraction: bool = False, ) -> tuple: """Check if event exists on MISP instance. @@ -612,6 +1126,10 @@ def _event_exists( event_name (str): MISP event name. base_url (str): Base URL. api_key (str): Authentication Key + is_validation (bool, optional): Is validation. + Defaults to False. + is_retraction (bool, optional): Is retraction. + Defaults to False. Returns: tuple: True if exists else False, event_id @@ -634,11 +1152,12 @@ def _event_exists( logger_msg=logger_msg, verify=self.ssl_validation, proxies=self.proxy, + is_retraction=is_retraction, ) if resp_json.get("response", []): - return True, resp_json.get("response")[0].get("Event", {}).get( - "id", None - ) + return True, resp_json.get("response")[0].get( + "Event", {} + ).get("id", None) return False, None except MISPPluginException: if is_validation: @@ -738,7 +1257,9 @@ def _update_event( ) return False - def _is_tag_exists(self, base_url: str, api_key: str) -> bool: + def _is_tag_exists( + self, base_url: str, api_key: str, tag_name: str + ) -> bool: """Is netskope-ce tag exists on MISP. Args: @@ -748,7 +1269,7 @@ def _is_tag_exists(self, base_url: str, api_key: str) -> bool: Returns: bool: True if tag exists else False. """ - endpoint = f"{base_url}/tags/search/{DEFAULT_IOC_TAG}" + endpoint = f"{base_url}/tags/search/{tag_name}" headers = self.misp_helper.get_header(api_key) resp_json = self.misp_helper.api_helper( method="POST", @@ -758,16 +1279,13 @@ def _is_tag_exists(self, base_url: str, api_key: str) -> bool: verify=self.ssl_validation, proxies=self.proxy, logger_msg=( - f"checking existence of '{DEFAULT_IOC_TAG}' tag " + f"checking existence of '{tag_name}' tag " f"on {PLATFORM_NAME}" ), ) - if ( - resp_json - and resp_json[0].get("Tag", {}).get("name") == DEFAULT_IOC_TAG - ): + if resp_json and resp_json[0].get("Tag", {}).get("name") == tag_name: self.logger.debug( - f"{self.log_prefix}: '{DEFAULT_IOC_TAG}' tag " + f"{self.log_prefix}: '{tag_name}' tag " f"exists on {PLATFORM_NAME}." ) return True @@ -775,7 +1293,12 @@ def _is_tag_exists(self, base_url: str, api_key: str) -> bool: return False def push( - self, indicators: List[Indicator], action_dict: Dict + self, + indicators: List[Indicator], + action_dict: Dict, + source: str = None, + business_rule: str = None, + plugin_name: str = None, ) -> PushResult: """Push given indicators to MISP Event. @@ -800,46 +1323,64 @@ def push( ) self.logger.error(f"{self.log_prefix}: {err_msg}") raise MISPPluginException(err_msg) - - event_name = action_dict.get("parameters", {}).get("event_name") - # Check if event already exists base_url, api_key = self.misp_helper.get_credentials( self.configuration ) - - result = self._is_tag_exists(base_url, api_key) - if not result: - # Create it - endpoint = f"{base_url}/tags/add" - body = {"name": DEFAULT_IOC_TAG, "colour": "#ff0000"} - headers = self.misp_helper.get_header(api_key) - resp_json = self.misp_helper.api_helper( - method="POST", - url=endpoint, - headers=headers, - verify=self.ssl_validation, - proxies=self.proxy, - logger_msg=( - f"creating '{DEFAULT_IOC_TAG}' tag on {PLATFORM_NAME}" - ), - json=body, + source_label_tag = ( + f"{SHARING_TAG_CONSTANT} | {plugin_name}" if plugin_name else None + ) + default_tags_to_send = [DEFAULT_IOC_TAG] + if source_label_tag and len(source_label_tag) <= 255: + default_tags_to_send.append(source_label_tag) + else: + self.logger.info( + f"{self.log_prefix}: Skipped adding source label tag" + f" {source_label_tag} to IoCs as it exceeds MISP's 255 " + "character tag limit." ) - if ( - resp_json - and resp_json.get("Tag", {}).get("name") == DEFAULT_IOC_TAG - ): - self.logger.info( - f"{self.log_prefix}: Successfully created " - f"'{DEFAULT_IOC_TAG}' tag on {PLATFORM_NAME}." - ) - else: - err_msg = ( - f"Unable to create '{DEFAULT_IOC_TAG}' " - f"tag on {PLATFORM_NAME}." + + for tag_name in default_tags_to_send: + result = self._is_tag_exists(base_url, api_key, tag_name) + if not result: + # Create it + endpoint = f"{base_url}/tags/add" + body = {"name": tag_name, "colour": "#ff0000"} + headers = self.misp_helper.get_header(api_key) + resp_json = self.misp_helper.api_helper( + method="POST", + url=endpoint, + headers=headers, + verify=self.ssl_validation, + proxies=self.proxy, + logger_msg=( + f"creating '{tag_name}' tag on {PLATFORM_NAME}" + ), + json=body, ) - self.logger.error(f"{self.log_prefix}: {err_msg}") - raise MISPPluginException(err_msg) + if ( + resp_json + and resp_json.get("Tag", {}).get("name") == tag_name + ): + self.logger.info( + f"{self.log_prefix}: Successfully created " + f"'{tag_name}' tag on {PLATFORM_NAME}." + ) + else: + err_msg = ( + f"Unable to create '{tag_name}' " + f"tag on {PLATFORM_NAME}." + ) + self.logger.error(f"{self.log_prefix}: {err_msg}") + raise MISPPluginException(err_msg) + tags_payload = [ + {"name": tag_name} for tag_name in default_tags_to_send + ] + event_name = action_dict.get("parameters", {}).get("event_name") + ip_ioc_type = action_dict.get("parameters", {}).get( + "ip_ioc_type", "ip-src" + ) + # Check if event already exists exists, event_id = self._event_exists( event_name=event_name, base_url=base_url, @@ -851,17 +1392,15 @@ def push( action_dict = action_dict.get("parameters") for indicator in indicators: ioc_type = indicator.type.value - if ioc_type in BIFURCATE_INDICATOR_TYPES: - if ( - self._is_valid_domain(indicator.value) - or self._is_valid_fqdn(indicator.value) - or self.is_valid_hostname(indicator.value) - ): - ioc_type = "domain" - elif self._is_valid_ipv4( + if ioc_type == "hostname": + ioc_type = "hostname" + elif ioc_type in ["domain", "fqdn"]: + ioc_type = "domain" + elif ioc_type in BIFURCATE_INDICATOR_TYPES: + if self._is_valid_ipv4( indicator.value ) or self._is_valid_ipv6(indicator.value): - ioc_type = "ip-src" + ioc_type = ip_ioc_type else: ioc_type = "url" @@ -880,7 +1419,7 @@ def push( if indicator.lastSeen else None ), - "Tag": [{"name": DEFAULT_IOC_TAG}], + "Tag": tags_payload, } ) @@ -1057,6 +1596,13 @@ def validate(self, configuration: dict) -> ValidationResult: events_to_include = include_event_name.split(",") event_to_exclude = configuration.get("event_name", "").strip() + exclude_events = [] + if event_to_exclude: + exclude_events = [ + event.strip() + for event in event_to_exclude.strip().split(",") + if event.strip() + ] for event in events_to_include: event = event.strip() @@ -1071,7 +1617,7 @@ def validate(self, configuration: dict) -> ValidationResult: ) return ValidationResult(success=False, message=err_msg) - if event == event_to_exclude: + if event in exclude_events: err_msg = ( f"{event} is present in Event Names and " "Exclude IoCs from Event. Event Names and Exclude" @@ -1086,9 +1632,24 @@ def validate(self, configuration: dict) -> ValidationResult: message=err_msg, ) try: - exist = self._event_exists( + exist, event_id = self._event_exists( event, base_url, api_key, is_validation=True - )[0] + ) + if event_id in exclude_events: + err_msg = ( + f"{event} is present in Event Names and " + "Exclude IoCs from Event. Event Names and Exclude" + " IoCs from Event can't contain same value " + "of event." + ) + self.logger.error( + f"{self.log_prefix}: {validation_err_msg}." + f" {err_msg}." + ) + return ValidationResult( + success=False, + message=err_msg, + ) except Exception as exp: err_msg = ( f"Unable to check the existence of {event}" @@ -1167,7 +1728,9 @@ def validate(self, configuration: dict) -> ValidationResult: "by commas." ) self.logger.error(f"{self.log_prefix}: {err_msg}") - return ValidationResult(success=False, message=err_msg) + return ValidationResult( + success=False, message=err_msg + ) except Exception as exp: err_msg = ( @@ -1207,6 +1770,25 @@ def validate(self, configuration: dict) -> ValidationResult: ) return ValidationResult(success=False, message=err_msg) + retraction_days = configuration.get("retraction_interval") + if retraction_days: + if ( + not isinstance(retraction_days, int) + or int(retraction_days) <= 0 + or int(retraction_days) > INTEGER_THRESHOLD + ): + err_msg = ( + "Invalid Retraction Interval provided in configuration" + " parameters. Valid value should be in range 1 to 2^62." + ) + self.logger.error( + f"{self.log_prefix}: {validation_err_msg}. {err_msg}" + ) + return ValidationResult( + success=False, + message=err_msg, + ) + enable_tagging = configuration.get("enable_tagging", "").strip() if not enable_tagging: err_msg = "Enable Tagging is a required configuration parameter." @@ -1251,7 +1833,7 @@ def validate(self, configuration: dict) -> ValidationResult: f"{self.log_prefix}: {validation_err_msg}. {err_msg}" ) return ValidationResult(success=False, message=err_msg) - elif pulling_mechanism == "look_back" and not look_back: + elif pulling_mechanism == "look_back" and look_back is None: err_msg = ( "Look Back is a required configuration " 'parameter when "Look Back" is selected as ' @@ -1294,8 +1876,12 @@ def validate(self, configuration: dict) -> ValidationResult: return ValidationResult(success=False, message=err_msg) days = configuration.get("days") - if days is None: - err_msg = "Initial Range is a required configuration parameter." + if pulling_mechanism == "incremental" and days is None: + err_msg = ( + "Initial Range is a required configuration parameter." + ' When "Incremental" is selected as ' + "Pulling Mechanism." + ) self.logger.error(f"{self.log_prefix}: {err_msg}") return ValidationResult( success=False, @@ -1321,11 +1907,9 @@ def validate(self, configuration: dict) -> ValidationResult: message=err_msg, ) - self.logger.debug( - f"{self.log_prefix}: Successfully validated credentials " - f"for {PLATFORM_NAME} plugin." + return ValidationResult( + success=True, message="Validation successful." ) - return ValidationResult(success=True, message="Validation successful.") def _validate_auth( self, @@ -1391,7 +1975,24 @@ def validate_action(self, action: Action): err_msg = "Invalid Event Name provided in action parameters." self.logger.error(f"{self.log_prefix}: {err_msg}") return ValidationResult(success=False, message=err_msg) - self.logger.info( + + ip_ioc_type = action.parameters.get("ip_ioc_type", "ip-src") + if not ip_ioc_type: + err_msg = "Invalid Type of IoC provided in action parameters." + self.logger.error(f"{self.log_prefix}: {err_msg}") + return ValidationResult(success=False, message=err_msg) + elif not isinstance(ip_ioc_type, str) or ip_ioc_type not in [ + "ip-src", + "ip-dst", + ]: + err_msg = ( + "Invalid Type of IoC provided in action parameters. Valid" + " values are Source IP (ip-src) and Destination IP (ip-dst)." + ) + self.logger.error(f"{self.log_prefix}: {err_msg}") + return ValidationResult(success=False, message=err_msg) + + self.logger.debug( f"{self.log_prefix}: Successfully saved Action configuration." ) return ValidationResult( @@ -1412,5 +2013,20 @@ def get_action_fields(self, action: Action): "Name of the MISP Event in which the " "attributes/indicators are to be pushed." ), - } + }, + { + "label": "Type of IPv4 or IPv6 IoC to be shared", + "key": "ip_ioc_type", + "type": "choice", + "mandatory": True, + "choices": [ + {"key": "Source IP (ip-src)", "value": "ip-src"}, + {"key": "Destination IP (ip-dst)", "value": "ip-dst"}, + ], + "default": "ip-src", + "description": ( + "Select the IoC type to which IPv4 or IPv6" + " addresses should be shared." + ), + }, ] diff --git a/misp/manifest.json b/misp/manifest.json index d446f11c..f5e65b72 100644 --- a/misp/manifest.json +++ b/misp/manifest.json @@ -1,275 +1,309 @@ { - "name": "MISP", - "id": "misp", - "version": "1.4.0", - "description": "This plugin is used to fetch event attributes from MISP (Malware Information Sharing Platform) and extract indicators of type SHA256, MD5, URL, Domain, IP (IPv4 and IPv6) from them. It can also share the indicators of type SHA256, MD5, URL, Domain (Domain, FQDN and Hostname), IP (IPv4 and IPv6) as attributes to MISP Custom Events. To get required details for creating a new configuration, navigate to https:///events/automation.", - "patch_supported": true, - "push_supported": true, - "configuration": [ - { - "label": "MISP Base URL", - "key": "base_url", - "type": "text", - "mandatory": true, - "default": "", - "description": "Base URL for MISP instance." - }, - { - "label": "Authentication Key", - "key": "api_key", - "type": "password", - "mandatory": true, - "default": "", - "description": "Authentication Key for MISP instance. Authentication Key can be generated from 'Administration > List Auth Keys'." - }, - { - "label": "MISP Attribute Type", - "key": "attr_type", - "type": "multichoice", - "choices": [ + "name": "MISP", + "id": "misp", + "version": "1.5.0", + "description": "This plugin is used to fetch event attributes from MISP (Malware Information Sharing Platform) and extract indicators of type SHA256, MD5, URL, Domain, IP (IPv4 and IPv6) and Hostname from them. It can also share the indicators of type SHA256, MD5, URL, Domain (Domain and FQDN), Hostname, IP (IPv4 and IPv6) as attributes to MISP Custom Events. To get required details for creating a new configuration, navigate to https:///events/automation.\n\nNote: The Source IP (ip-src) and Destination IP (ip-dst) will be stored as either IPv4 or IPv6 in Cloud Exchange. Source IP|Port (ip-src|port), Destination IP|Port (ip-dst|port), and Hostname|Port (hostname|port) will be stored as URLs in Cloud Exchange. For Domain|IP, the domain and IP (either IPv4 or IPv6) will be split and stored as separate IoCs in Cloud Exchange.", + "patch_supported": true, + "push_supported": true, + "delete_supported": true, + "fetch_retraction_info": true, + "module": "CTE", + "configuration": [ { - "key": "MD5", - "value": "md5" + "label": "MISP Base URL", + "key": "base_url", + "type": "text", + "mandatory": true, + "default": "", + "description": "Base URL for MISP instance." }, { - "key": "SHA256", - "value": "sha256" + "label": "Authentication Key", + "key": "api_key", + "type": "password", + "mandatory": true, + "default": "", + "description": "Authentication Key for MISP instance. Authentication Key can be generated from 'Administration > List Auth Keys'." }, { - "key": "IP (IPv4 and IPv6)", - "value": "ip-src" + "label": "MISP Attribute Type", + "key": "attr_type", + "type": "multichoice", + "choices": [ + { + "key": "MD5", + "value": "md5" + }, + { + "key": "SHA256", + "value": "sha256" + }, + { + "key": "Source IP", + "value": "ip-src" + }, + { + "key": "Source IP|Port", + "value": "ip-src|port" + }, + { + "key": "Destination IP", + "value": "ip-dst" + }, + { + "key": "Destination IP|Port", + "value": "ip-dst|port" + }, + { + "key": "URL", + "value": "url" + }, + { + "key": "Domain", + "value": "domain" + }, + { + "key": "Domain|IP", + "value": "domain|ip" + }, + { + "key": "Hostname", + "value": "hostname" + }, + { + "key": "Hostname|Port", + "value": "hostname|port" + } + ], + "mandatory": false, + "default": [], + "description": "Indicators from only specified Attribute types will be fetched. Keep empty to fetch indicators of all Types. Multiple Types are accepted." }, { - "key": "URL", - "value": "url" + "label": "MISP Attribute Category", + "key": "attr_category", + "type": "multichoice", + "choices": [ + { + "key": "Internal reference", + "value": "Internal reference" + }, + { + "key": "Targeting data", + "value": "Targeting data" + }, + { + "key": "Antivirus detection", + "value": "Antivirus detection" + }, + { + "key": "Payload delivery", + "value": "Payload delivery" + }, + { + "key": "Artifacts dropped", + "value": "Artifacts dropped" + }, + { + "key": "Payload installation", + "value": "Payload installation" + }, + { + "key": "Persistence mechanism", + "value": "Persistence mechanism" + }, + { + "key": "Network activity", + "value": "Network activity" + }, + { + "key": "Payload type", + "value": "Payload type" + }, + { + "key": "Attribution", + "value": "Attribution" + }, + { + "key": "External analysis", + "value": "External analysis" + }, + { + "key": "Financial fraud", + "value": "Financial fraud" + }, + { + "key": "Support Tool", + "value": "Support Tool" + }, + { + "key": "Social network", + "value": "Social network" + }, + { + "key": "Person", + "value": "Person" + }, + { + "key": "Other", + "value": "Other" + } + ], + "mandatory": false, + "default": [], + "description": "Indicators from only specified Attribute Categories will be fetched. Keep empty to fetch indicators of all Categories. Multiple Categories are accepted." }, { - "key": "DOMAIN", - "value": "domain" - } - ], - "mandatory": false, - "default": [], - "description": "Indicators from only specified Attribute types will be fetched. Keep empty to fetch indicators of all Types. Multiple Types are accepted." - }, - { - "label": "MISP Attribute Category", - "key": "attr_category", - "type": "multichoice", - "choices": [ - { - "key": "Internal reference", - "value": "Internal reference" - }, - { - "key": "Targeting data", - "value": "Targeting data" - }, - { - "key": "Antivirus detection", - "value": "Antivirus detection" - }, - { - "key": "Payload delivery", - "value": "Payload delivery" - }, - { - "key": "Artifacts dropped", - "value": "Artifacts dropped" - }, - { - "key": "Payload installation", - "value": "Payload installation" - }, - { - "key": "Persistence mechanism", - "value": "Persistence mechanism" + "label": "MISP Attribute Tags", + "key": "tags", + "type": "text", + "mandatory": false, + "default": "", + "description": "Indicators from only specified comma separated Tags will be fetched. Keep empty to fetch indicators of all Tags. Dynamic values are accepted." }, { - "key": "Network activity", - "value": "Network activity" + "label": "Event Names", + "key": "include_event_name", + "type": "text", + "mandatory": false, + "default": "", + "description": "Indicators from only the specified comma separated event names will be fetched. Keep empty to pull indicators from all the events." }, { - "key": "Payload type", - "value": "Payload type" + "label": "Exclude IoCs from Events", + "key": "event_name", + "type": "text", + "mandatory": true, + "default": "", + "description": "Indicators attached to the provided comma-separated events will be ignored while pulling data from MISP. Expected value is comma-separated event names or event IDs." }, { - "key": "Attribution", - "value": "Attribution" + "label": "IoC Event Type", + "key": "published", + "type": "multichoice", + "choices": [ + { + "key": "Published", + "value": "published" + }, + { + "key": "Unpublished", + "value": "unpublished" + } + ], + "default": [], + "mandatory": false, + "description": "Indicators will be pulled based on the selected event type. Keep empty to fetch all IoCs." }, { - "key": "External analysis", - "value": "External analysis" + "label": "Decaying Score Threshold", + "key": "score_threshold", + "type": "number", + "mandatory": false, + "description": "Only indicators having Decaying Score greater then provided value will be pulled. Value should be in range of 0 to 100." }, { - "key": "Financial fraud", - "value": "Financial fraud" + "label": "Decaying Model IDs", + "key": "decaying_models", + "type": "text", + "mandatory": false, + "default": "", + "description": "Decaying scores of only specified comma separated decaying models will be tracked. Keep blank to fetch scores of all Decaying Models. Decaying Model IDs can be found from 'Global Actions > List Decaying Models'." }, { - "key": "Support Tool", - "value": "Support Tool" + "label": "Filter on IDS flag", + "key": "to_ids", + "type": "multichoice", + "choices": [ + { + "key": "Enabled", + "value": "enabled" + }, + { + "key": "Disabled", + "value": "disabled" + } + ], + "default": [], + "mandatory": false, + "description": "Only pull IoCs having IDS flag enabled. Keep blank to fetch all IoCs." }, { - "key": "Social network", - "value": "Social network" + "label": "Enforce Warning List IoCs", + "key": "enforce_warning_list", + "type": "choice", + "choices": [ + { + "key": "Yes", + "value": "yes" + }, + { + "key": "No", + "value": "no" + } + ], + "default": "no", + "mandatory": false, + "description": "Remove any IoC from the events that would cause a hit on a warning list entry." }, { - "key": "Person", - "value": "Person" + "label": "Pulling Mechanism", + "key": "pulling_mechanism", + "type": "choice", + "choices": [ + { + "key": "Incremental", + "value": "incremental" + }, + { + "key": "Look Back", + "value": "look_back" + } + ], + "default": "incremental", + "mandatory": true, + "description": "Pulling mechanism used to fetch the indicators from MISP." }, { - "key": "Other", - "value": "Other" - } - ], - "mandatory": false, - "default": [], - "description": "Indicators from only specified Attribute Categories will be fetched. Keep empty to fetch indicators of all Categories. Multiple Categories are accepted." - }, - { - "label": "MISP Attribute Tags", - "key": "tags", - "type": "text", - "mandatory": false, - "default": "", - "description": "Indicators from only specified comma separated Tags will be fetched. Keep empty to fetch indicators of all Tags. Dynamic values are accepted." - }, - { - "label": "Event Names", - "key": "include_event_name", - "type": "text", - "mandatory": false, - "default": "", - "description": "Indicators from only the specified comma separated event names will be fetched. Keep empty to pull indicators from all the events." - }, - { - "label": "Exclude IoCs from Event", - "key": "event_name", - "type": "text", - "mandatory": true, - "default": "", - "description": "Indicators attached to this event will be ignored while pulling data from MISP." - }, - { - "label": "IoC Event Type", - "key": "published", - "type": "multichoice", - "choices": [ - { - "key": "Published", - "value": "published" - }, - { - "key": "Unpublished", - "value": "unpublished" - } - ], - "default": [], - "mandatory": false, - "description": "Indicators will be pulled based on the selected event type. Keep empty to fetch all IoCs." - }, - { - "label": "Decaying Score Threshold", - "key": "score_threshold", - "type": "number", - "mandatory": false, - "description": "Only indicators having Decaying Score greater then provided value will be pulled. Value should be in range of 0 to 100." - }, - { - "label": "Decaying Model IDs", - "key": "decaying_models", - "type": "text", - "mandatory": false, - "default": "", - "description": "Decaying scores of only specified comma separated decaying models will be tracked. Keep blank to fetch scores of all Decaying Models. Decaying Model IDs can be found from 'Global Actions > List Decaying Models'." - }, - { - "label": "Filter on IDS flag", - "key": "to_ids", - "type": "multichoice", - "choices": [ - { - "key": "Enabled", - "value": "enabled" - }, - { - "key": "Disabled", - "value": "disabled" - } - ], - "default": [], - "mandatory": false, - "description": "Only pull IoCs having IDS flag enabled. Keep blank to fetch all IoCs." - }, - { - "label": "Enforce Warning List IoCs", - "key": "enforce_warning_list", - "type": "choice", - "choices": [ - { - "key": "Yes", - "value": "yes" + "label": "Look Back (in hours)", + "key": "look_back", + "type": "number", + "mandatory": false, + "default": 24, + "description": "Look Back hours for fetching the indicators from MISP. Note: This parameter will only be considered if \"Pulling Mechanism\" is set to \"Look Back\"." }, { - "key": "No", - "value": "no" - } - ], - "default": "no", - "mandatory": false, - "description": "Remove any IoC from the events that would cause a hit on a warning list entry." - }, - { - "label": "Pulling Mechanism", - "key": "pulling_mechanism", - "type": "choice", - "choices": [ - { - "key": "Incremental", - "value": "incremental" + "label": "Retraction Interval (in days)", + "key": "retraction_interval", + "type": "number", + "mandatory": false, + "description": "Retraction Interval days to run IoC(s) retraction for MISP indicators. Note: This parameter will only be considered if \"IoC(s) Retraction\" is enabled in Threat Exchange Settings." }, { - "key": "Look Back", - "value": "look_back" - } - ], - "default": "incremental", - "mandatory": true, - "description": "Pulling mechanism used to fetch the indicators from MISP." - }, - { - "label": "Look Back (in hours)", - "key": "look_back", - "type": "number", - "mandatory": false, - "default": 24, - "description": "Look Back hours for fetching the indicators from MISP. Note: This parameter will only be considered if \"Pulling Mechanism\" is set to \"Look Back\"." - }, - { - "label": "Enable Tagging", - "key": "enable_tagging", - "type": "choice", - "choices": [ - { - "key": "Yes", - "value": "yes" + "label": "Enable Tagging", + "key": "enable_tagging", + "type": "choice", + "choices": [ + { + "key": "Yes", + "value": "yes" + }, + { + "key": "No", + "value": "no" + } + ], + "default": "yes", + "mandatory": true, + "description": "Enable/Disable tagging functionality." }, { - "key": "No", - "value": "no" + "label": "Initial Range (in days)", + "key": "days", + "type": "number", + "mandatory": true, + "default": 7, + "description": "Number of days to pull the data for the initial run. Note: This parameter will only be considered if \"Pulling Mechanism\" is set to \"Incremental\"." } - ], - "default": "yes", - "mandatory": true, - "description": "Enable/Disable tagging functionality." - }, - { - "label": "Initial Range (in days)", - "key": "days", - "type": "number", - "mandatory": true, - "default": 7, - "description": "Number of days to pull the data for the initial run. Note: This parameter will only be considered if \"Pulling Mechanism\" is set to \"Incremental\"." - } - ] -} + ] +} \ No newline at end of file diff --git a/misp/utils/constants.py b/misp/utils/constants.py index 51e80239..b3d7804e 100644 --- a/misp/utils/constants.py +++ b/misp/utils/constants.py @@ -32,15 +32,19 @@ CTE MISP Constants module. """ -TYPES = { - "md5": "md5", - "sha256": "sha256", - "ip-src": "url", - "url": "url", - "domain": "url", -} - -ATTRIBUTE_TYPES = ["md5", "sha256", "ip-src", "url", "domain"] +ATTRIBUTE_TYPES = [ + "md5", + "sha256", + "ip-src", + "ip-src|port", + "ip-dst", + "ip-dst|port", + "url", + "domain", + "domain|ip", + "hostname", + "hostname|port", +] ATTRIBUTE_CATEGORIES = [ @@ -64,7 +68,7 @@ PLATFORM_NAME = "MISP" PLUGIN_NAME = "MISP" MODULE_NAME = "CTE" -PLUGIN_VERSION = "1.4.0" +PLUGIN_VERSION = "1.5.0" BATCH_SIZE = 2500 MAX_API_CALLS = 4 DEFAULT_WAIT_TIME = 60 @@ -72,11 +76,11 @@ MAX_LOOK_BACK = 8760 BIFURCATE_INDICATOR_TYPES = { "url", - "domain", - "hostname", "ipv4", "ipv6", - "fqdn", } +RETRACTION = "Retraction" DEFAULT_IOC_TAG = "netskope-ce" +SHARING_TAG_CONSTANT = "Netskope CE" PULL_PAGE_SIZE = 1000 +RETRACTION_BATCH = 10000 diff --git a/misp/utils/helper.py b/misp/utils/helper.py index 2aed3639..e587730c 100644 --- a/misp/utils/helper.py +++ b/misp/utils/helper.py @@ -45,6 +45,7 @@ MAX_API_CALLS, MODULE_NAME, PLATFORM_NAME, + RETRACTION, ) @@ -119,6 +120,7 @@ def api_helper( verify: bool = True, proxies: Dict = {}, show_payload: bool = True, + is_retraction: bool = False, ): """API Helper perform API request to ThirdParty platform and captures all the possible errors for requests. @@ -146,10 +148,11 @@ def api_helper( is_handle_error_required is True otherwise returns Response object. """ try: + if is_retraction and RETRACTION not in self.log_prefix: + self.log_prefix = self.log_prefix + f" [{RETRACTION}]" headers = self._add_user_agent(headers) - debug_log_msg = ( - f"{self.log_prefix} : API Request for {logger_msg}." + f"{self.log_prefix}: API Request for {logger_msg}." f" Endpoint: {method} {url}" ) if params: @@ -172,7 +175,7 @@ def api_helper( ) status_code = response.status_code self.logger.debug( - f"{self.log_prefix} : Received API Response for " + f"{self.log_prefix}: Received API Response for " f"{logger_msg}. Status Code={status_code}." ) From 8d3c378c774dae4c9cb5a17d1f19770326c1f985 Mon Sep 17 00:00:00 2001 From: Tanushree Kurup Date: Wed, 11 Mar 2026 18:24:45 +0530 Subject: [PATCH 3/3] Added CTE Netskope CRE v1.6.0 plugin. --- netskope_ztre/CHANGELOG.md | 8 + netskope_ztre/main.py | 1336 ++++++++++++++++++++++++++++-- netskope_ztre/manifest.json | 4 +- netskope_ztre/utils/constants.py | 45 +- netskope_ztre/utils/helper.py | 34 +- 5 files changed, 1364 insertions(+), 63 deletions(-) diff --git a/netskope_ztre/CHANGELOG.md b/netskope_ztre/CHANGELOG.md index c5294f6c..949ef639 100644 --- a/netskope_ztre/CHANGELOG.md +++ b/netskope_ztre/CHANGELOG.md @@ -1,3 +1,11 @@ +# 1.6.0 +## Added +- Added support for 'Tag/Untag Device' action on Netskope. +- Added handling to split and store comma separated UBA user records. +- Added support to pull Netskope Device tags in CE. +## Fixed +- Fixed an issue where the plugin sends empty dict if the expected fields are not found in API response. + # 1.5.1 ## Added - Added support for partial failure in bulk action. diff --git a/netskope_ztre/main.py b/netskope_ztre/main.py index 112d355e..f47391ff 100644 --- a/netskope_ztre/main.py +++ b/netskope_ztre/main.py @@ -1,4 +1,35 @@ -"""Netskope CRE plugin.""" +""" +BSD 3-Clause License + +Copyright (c) 2021, Netskope OSS +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +Netskope CRE plugin.""" import json import re @@ -31,6 +62,7 @@ MAX_HOSTS_PER_PRIVATE_APP, REGEX_HOST, REGEX_EMAIL, + REGEX_TAG, MODULE_NAME, PLUGIN, PLUGIN_VERSION, @@ -45,8 +77,12 @@ USERS_BATCH_SIZE, APPLICATIONS_BATCH_SIZE, TAG_APP_TAG_LENGTH, + TAG_DEVICE_TAG_LENGTH, + TAG_DEVICE_BATCH_SIZE, TAG_EXISTS, DEVICE_FIELD_MAPPING, + MAX_TAGS_PER_DEVICE, + TAG_CACHE_PAGE_SIZE, ) from .utils.helper import NetskopePluginHelper, NetskopeException @@ -197,7 +233,8 @@ def get_entities(self) -> List[Entity]: ), EntityField( name="Netskope Device UID", - type=EntityFieldType.STRING + type=EntityFieldType.STRING, + required=True, ), EntityField( name="Mac Addresses", @@ -213,7 +250,8 @@ def get_entities(self) -> List[Entity]: ), EntityField( name="Device Serial Number", - type=EntityFieldType.STRING + type=EntityFieldType.STRING, + required=True, ), EntityField( name="Operating System", @@ -253,12 +291,17 @@ def get_entities(self) -> List[Entity]: ), EntityField( name="User Key", - type=EntityFieldType.STRING + type=EntityFieldType.LIST, + required=True, ), EntityField( name="Device Classification Status", type=EntityFieldType.STRING ), + EntityField( + name="Tags", + type=EntityFieldType.LIST + ), ], ) ] @@ -279,13 +322,19 @@ def _convert_string_to_list(self, data_object: Dict, key: str) -> Dict: return data_object def _add_field(self, fields_dict: dict, field_name: str, value): - """Add field to the extracted_fields dictionary. + """ + Add field to the extracted_fields dictionary. Args: fields_dict (dict): Field dictionary to update. field_name (str): Field name to add. value: Field to add. """ + # Skip empty dicts to prevent MongoDB errors + if (isinstance(value, dict) or isinstance(value, list)) and not value: + fields_dict[field_name] = None + return + if isinstance(value, int) or isinstance(value, float): fields_dict[field_name] = value return @@ -316,7 +365,11 @@ def _extract_field_from_event( k = keys.pop(0) if k not in event and default is not None: return default - event = event.get(k, {}) + if k not in event: + return default + if not isinstance(event, dict): + return default + event = event.get(k) if transformation and transformation == "string": return str(event) return event @@ -326,7 +379,7 @@ def _extract_entity_fields( event: dict, entity_field_mapping: Dict[str, Dict[str, str]], entity: str, - ) -> dict: + ) -> list[dict]: """ Extracts the required entity fields from the event payload as per the mapping provided. @@ -339,7 +392,9 @@ def _extract_entity_fields( entity (str): Entity name. Returns: - dict: Dictionary containing the extracted entity fields. + list[dict]: List of dictionaries containing the extracted + entity fields. For Users entity with comma-separated + emails, returns multiple records. """ extracted_fields = {} for field_name, field_value in entity_field_mapping.items(): @@ -358,6 +413,24 @@ def _extract_entity_fields( transformation=transformation, ), ) + + if entity == "Users": + email_value = extracted_fields.get("email", "") + if email_value and "," in email_value: + emails = [ + email.strip() + for email in email_value.split(",") + if email.strip() + ] + records = [] + for email in emails: + record = extracted_fields.copy() + record["email"] = email + records.append(record) + return records + else: + return [extracted_fields] if extracted_fields else [] + # If the CSV response has multiple values for mac address field they # are separated by '|' and the tenant plugin parses it into a list # of strings but for single value in mac address field it is parsed @@ -367,6 +440,13 @@ def _extract_entity_fields( data_object=extracted_fields, key="Mac Addresses", ) + # The userkey earlier was going as str, but + # in the netity mapping we have made it as list now + # so it should also be returned as list + extracted_fields = self._convert_string_to_list( + data_object=extracted_fields, + key="User Key", + ) # Converting Unix timestamp to datetime object last_updated_timestamp = extracted_fields.get( "Last Updated Timestamp" @@ -380,7 +460,7 @@ def _extract_entity_fields( except Exception: converted_datetime = None extracted_fields["Last Updated Timestamp"] = converted_datetime - return extracted_fields + return [extracted_fields] if extracted_fields else [] def fetch_records(self, entity: str) -> list[dict]: """Fetch user and application records from Netskope alerts. @@ -803,7 +883,7 @@ def _fetch_users(self) -> list[dict]: entity="Users", ) if extracted_data: - users.append(extracted_data) + users.extend(extracted_data) else: skipped_count += 1 except Exception as err: @@ -921,6 +1001,170 @@ def _update_users(self, records: list[dict]): ) return updated_users + def _update_devices(self, records: list[dict]) -> list[dict]: + """Update device tags by fetching them from the Netskope tenant. + + Args: + records (list): List of device records to update. + + Returns: + list: List of updated device records with a new 'Tags' field. + """ + tenant_name = self.tenant.parameters.get("tenantName", "").strip() + url = f"{tenant_name}{URLS.get('V2_DEVICE_GET_TAGS')}" + token = resolve_secret(self.tenant.parameters.get("v2token", "")) + headers = { + "Netskope-API-Token": token, + "Content-Type": "application/json", + } + total_updated_record_counter = 0 + updated_records = [] + for record in records: + # Make a copy to avoid modifying the original record in case of failure + updated_record = {} + + device_uid = record.get("Netskope Device UID") + user_key = record.get("User Key") + updated_record["Device Serial Number"] = record.get( + "Device Serial Number" + ) + updated_record["Device ID"] = record.get("Device ID") + updated_record["Netskope Device UID"] = device_uid + updated_record["User Key"] = user_key + + # Handle User Key as list (multi-user scenario) + user_keys = user_key + if isinstance(user_key, str): + user_keys = [user_key] if user_key else [] + elif not isinstance(user_key, list): + user_keys = [] + + unique_user_keys = set() + for uk in user_keys: + unique_user_keys.add(uk) + + if not device_uid or not unique_user_keys: + self.logger.info( + f"{self.log_prefix}: Missing Netskope Device UID or " + f"User Key for device '{record.get('Device ID')}'. " + "Skipping update." + ) + continue + + user_keys_list = list(unique_user_keys) + total_batches = ( + (len(user_keys_list) + TAG_DEVICE_BATCH_SIZE - 1) // + TAG_DEVICE_BATCH_SIZE + ) + + if total_batches > 1: + self.logger.debug( + f"{self.log_prefix}: Device '{device_uid}' has " + f"{len(user_keys_list)} user key(s). Processing " + f"{total_batches} batch(es) of {TAG_DEVICE_BATCH_SIZE}." + ) + + all_tags = set() + successful_batches = 0 + failed_batches = 0 + + for batch_num in range(total_batches): + start_idx = batch_num * TAG_DEVICE_BATCH_SIZE + end_idx = min( + start_idx + TAG_DEVICE_BATCH_SIZE, + len(user_keys_list) + ) + user_keys_batch = user_keys_list[start_idx:end_idx] + + devices_payload = [ + {"nsdeviceuid": device_uid, "userkey": user_key} + for user_key in user_keys_batch + ] + payload = {"devices": devices_payload} + logger_msg = ( + f"fetching tags for device with UID '{device_uid}' " + f"(batch {batch_num + 1}/{total_batches} with " + f"{len(user_keys_batch)} user(s))" + ) + + try: + response = self.netskope_helper._api_call_helper( + url=url, + method="post", + headers=headers, + json=payload, + proxies=self.proxy, + message=f"Error occurred while {logger_msg}", + logger_msg=logger_msg, + error_codes=["CRE_1045", "CRE_1049"], + ) + + if response.get("success"): + tags = [ + tag.get("name") + for tag in response.get("data", {}).get("data", []) + if tag.get("name") + ] + all_tags.update(tags) + successful_batches += 1 + else: + self.logger.info( + message=( + f"{self.log_prefix}: Batch " + f"{batch_num + 1}/{total_batches} failed for device " + f"with UID '{device_uid}'." + ), + details=( + f"{str(response.get('error'))}" + ) + ) + failed_batches += 1 + continue + + except NetskopeException as e: + self.logger.error( + f"{self.log_prefix}: {logger_msg} failed due to an " + f"exception. Error: {e}" + ) + failed_batches += 1 + continue + except Exception as e: + self.logger.error( + message=( + f"{self.log_prefix}: Unexpected error while " + f"{logger_msg}. Error: {e}" + ), + details=str(traceback.format_exc()), + ) + failed_batches += 1 + continue + + log_msg = "" + if failed_batches > 0: + log_msg = ( + f" Failed to fetch tags for {failed_batches} " + f"out of {total_batches} batch(es) for device " + f"with UID '{device_uid}'." + ) + + self.logger.debug( + f"{self.log_prefix}: Successfully fetched tags for " + f"{successful_batches} batch(es).{log_msg}" + ) + + updated_record["Tags"] = list(all_tags) + updated_records.append(updated_record) + + if list(all_tags): + total_updated_record_counter += 1 + + self.logger.info( + f"{self.log_prefix}: Successfully updated {len(updated_records)}" + f" record(s). Fetched tag(s) for {total_updated_record_counter} out " + f"of {len(records)} record(s)." + ) + return updated_records + def _update_applications(self, records: list[dict]): """Update application scores. @@ -962,7 +1206,7 @@ def _fetch_devices(self): entity="Devices", ) if extracted_data: - devices.append(extracted_data) + devices.extend(extracted_data) else: skipped_count += 1 except Exception as e: @@ -1005,9 +1249,9 @@ def update_records(self, entity: str, records: List[dict]): self.logger.info(update_logger_msg) return self._update_applications(records) elif entity == "Devices": - # There is no score related field in Devices entity - # hence returning empty list. - return [] + # Add tags to devices + self.logger.info(update_logger_msg) + return self._update_devices(records) else: raise ValueError(f"Unsupported entity '{entity}'") @@ -1029,6 +1273,9 @@ def get_actions(self): ActionWithoutParams( label="Tag/Untag Application", value="tag_app" ), + ActionWithoutParams( + label="Tag/Untag Device", value="tag_device" + ), ActionWithoutParams(label="No actions", value="generate"), ] @@ -1832,6 +2079,16 @@ def validate_action(self, action: Action): return ValidationResult( success=True, message="Validation successful." ) + elif action.value == "tag_device": + try: + self._process_params_for_tag_device_action( + action.parameters, is_validation=True + ) + except NetskopeException as ex: + return ValidationResult(success=False, message=str(ex)) + return ValidationResult( + success=True, message="Validation successful." + ) elif action.value == "app_instance": try: self._process_params_for_app_instance_action(action.parameters) @@ -2608,6 +2865,77 @@ def get_action_params(self, action: Action) -> list: ), }, ] + elif action.value == "tag_device": + return [ + { + "label": "Tag Action", + "key": "tag_device_action", + "type": "choice", + "choices": [ + { + "key": "Add", + "value": "append" + }, + { + "key": "Remove", + "value": "remove" + }, + { + "key": "Replace", + "value": "replace" + } + ], + "default": "append", + "placeholder": "Add", + "mandatory": True, + "description": ( + "Select whether to add, remove, or replace tags on the " + "devices. Select Tag Action from the static dropdown only. Note: " + "at max 5 tags are allowed per user-device pair on Netskope." + ), + }, + { + "label": "Tags", + "key": "tags", + "type": "text", + "default": "", + "placeholder": "e.g. tag-1, tag-2", + "mandatory": False, + "description": ( + "Select a source field for the tags or provide static " + "comma-separated tag values. Note: For Replace " + "action, if tags are not provided or empty, all tags " + "will be removed from the device." + ), + }, + { + "label": "Netskope Device UID", + "key": "device_id", + "type": "text", + "default": "", + "placeholder": "e.g. uid-1", + "mandatory": True, + "description": ( + "The Device UID of the device to which the tag action should be " + "performed. Select a source field or " + "provide a static value." + ), + }, + { + "label": "User Key", + "key": "device_user_key", + "type": "text", + "default": "", + "placeholder": "e.g. user-1", + "mandatory": True, + "description": ( + "The User Key of the user associated with the" + " device on which the tag action should be " + "performed. Select a source field or " + "provide a static value." + ), + }, + ] elif action.value == "app_instance": return [ { @@ -3121,59 +3449,439 @@ def _tag_application( f"{self.log_prefix}: {log_msg}" ) - def _create_app_instance( + def _fetch_all_tags(self): + """Fetch all tags from Netskope tenant with pagination. + + Returns: + dict: Dictionary mapping tag_name to tag_id, or None if fetch fails. + """ + tenant_name = self.tenant.parameters.get('tenantName', '').strip().strip('/') + headers = { + "Netskope-API-Token": resolve_secret( + self.tenant.parameters.get("v2token", "") + ) + } + + tag_cache = {} + offset = 0 + total_fetched = 0 + + try: + self.logger.info( + f"{self.log_prefix}: Fetching all tags from Netskope." + ) + + while True: + log_message = f"fetching tags (offset: {offset}, limit: {TAG_CACHE_PAGE_SIZE})" + self.logger.debug(f"{self.log_prefix}: {log_message.capitalize()}.") + + url = f"{tenant_name}{URLS.get('V2_DEVICE_GET_TAGS')}" + payload = { + "devices": [], + "offset": offset, + "limit": TAG_CACHE_PAGE_SIZE + } + + response = self.netskope_helper._api_call_helper( + url=url, + method="post", + error_codes=["CRE_1045", "CRE_1049"], + headers=headers, + json=payload, + proxies=self.proxy, + message=f"Error occurred while {log_message}", + logger_msg=log_message, + ) + + if not response.get("success"): + err_msg = ( + "Error occurred while fetching all tags " + "from Netskope." + ) + self.logger.error( + message=( + f"{self.log_prefix}: {err_msg}." + ), + details=str(response.get("error", {})) + ) + raise NetskopeException(err_msg) + + data = response.get("data", {}) + tags = data.get("data", []) if data and isinstance(data, dict) else [] + + for tag_info in tags: + if isinstance(tag_info, dict): + # as the Netskope API is case insensitive , + # we are normalizing the tags and then storing it + tag_name = tag_info.get("name").lower() + tag_id = tag_info.get("id") + if tag_name and tag_id: + tag_cache[tag_name] = tag_id + + total_fetched += len(tags) + + total_count = data.get("total_count", 0) + self.logger.debug( + f"{self.log_prefix}: Fetched {len(tags)} tag(s) in current page. " + f"Total fetched: {total_fetched}/{total_count}." + ) + + if total_fetched >= total_count or len(tags) < TAG_CACHE_PAGE_SIZE: + break + + offset += TAG_CACHE_PAGE_SIZE + + self.logger.info( + f"{self.log_prefix}: Successfully fetched {len(tag_cache)} tag(s)" + " from Netskope tenant" + ) + return tag_cache + + except Exception as e: + err_msg = ( + "An Unexpected error occurred while " + "fetching all tags from Netskope." + ) + self.logger.error( + message=( + f"{self.log_prefix}: {err_msg} Error: {e}" + ), + details=str(traceback.format_exc()) + ) + raise NetskopeException(err_msg) + + def _query_and_create_tags_with_cache( self, - instance_id: str, - instance_name: str, - app: str, tags: list[str], - auth_token: str, + action: str, + tag_cache: dict = None ): - """Create an app instance. + """Query for tags using cache and create them if they don't exist. Args: - instance_id (str): Instance ID. - instance_name (str): Instance name. - app (str): App name. - tags (str): Tag. - auth_token (str): Auth token. + tags (list[str]): List of tags to query and create. + action (str): Action type - 'append', 'remove', or 'replace'. + tag_cache (dict): Cache mapping tag_name to tag_id. + + Returns: + tuple: (tag_ids list[str], updated_tag_cache dict) + + Raises: + NetskopeException: If any tag creation/validation fails. """ - tenant_name = ( - self.tenant.parameters.get('tenantName', '').strip() - ) - url = ( - f"{tenant_name}{URLS.get('V1_APP_INSTANCE')}" - ) - params = { - "op": "list", - "app": app, - "instance_id": instance_id, - "instance_name": instance_name, - } - logger_msg = f"listing app instances for application: {app}" - try: - list_instances_response = self.netskope_helper._api_call_helper( - url=url, - method="post", - error_codes=["CRE_1045", "CRE_1049"], - params=params, - data={"token": auth_token}, - proxies=self.proxy, - message=f"Error occurred while {logger_msg}", - logger_msg=logger_msg, + tenant_name = self.tenant.parameters.get('tenantName', '').strip().strip('/') + headers = { + "Netskope-API-Token": resolve_secret( + self.tenant.parameters.get("v2token", "") ) - if ( - list_instances_response.get("status", "").lower() != - SUCCESS.lower() - ): + } + + tag_ids = [] + updated_cache = tag_cache.copy() + failed_tags = [] + non_empty_tags_list = [] + for tag in tags: + if not tag: + continue + else: + non_empty_tags_list.append(tag) + + if not re.match(REGEX_TAG, tag): err_msg = ( - f"Error occurred while {logger_msg}" + f"Invalid tag '{tag}' provided. Tag name must contain " + "only alphanumeric characters, hyphens, and spaces. This" + f"tag will be skipped for {str(action)} action" ) - self.logger.error( - message=( - f"{self.log_prefix}: {err_msg}." - ), - details=json.dumps(list_instances_response), + self.logger.info(f"{self.log_prefix}: {err_msg}") + failed_tags.append(tag) + continue + + if len(tag) > TAG_DEVICE_TAG_LENGTH: + err_msg = ( + f"Invalid tag '{tag}' provided. Tag length cannot " + f"exceed {TAG_DEVICE_TAG_LENGTH} characters. This" + f"tag will be skipped for {str(action)} action" + ) + self.logger.info(f"{self.log_prefix}: {err_msg}") + failed_tags.append(tag) + continue + + normalized_tag = tag.lower() + if normalized_tag in updated_cache: + tag_ids.append(updated_cache[normalized_tag]) + self.logger.debug( + f"{self.log_prefix}: Tag '{tag}' found in tag lookup" + f" list with ID '{updated_cache[normalized_tag]}'." + ) + elif action in ["append", "replace"]: + log_message = f"creating tag '{tag}'" + self.logger.debug(f"{self.log_prefix}: {log_message.capitalize()}.") + + try: + url = f"{tenant_name}{URLS.get('V2_DEVICE_TAG')}" + create_response = self.netskope_helper._api_call_helper( + url=url, + method="post", + error_codes=["CRE_1045", "CRE_1049"], + headers=headers, + json={"name": tag, "description": "Tag created by Cloud Exchange"}, + proxies=self.proxy, + message=f"Error occurred while {log_message}", + logger_msg=log_message, + ) + + if not create_response.get("success"): + err_msg = ( + f"Error occurred while {log_message}. This" + f"tag will be skipped for {str(action)} action" + ) + self.logger.error( + message=f"{self.log_prefix}: {err_msg}", + details=json.dumps(create_response.get("error", {})) + ) + failed_tags.append(tag) + elif tag_info := create_response.get("data", {}): + tag_id = tag_info.get("id") + if tag_id: + tag_ids.append(tag_id) + updated_cache[normalized_tag] = tag_id + self.logger.debug( + f"{self.log_prefix}: Successfully created " + f"tag '{tag}' with ID '{tag_id}'." + ) + else: + failed_tags.append(tag) + else: + failed_tags.append(tag) + except Exception: + failed_tags.append(tag) + elif action == "remove": + err_msg = ( + f"Provided tag '{tag}' not found on Netskope " + "when trying to untag device. " + "Please provide valid tags." + ) + self.logger.error( + message=f"{self.log_prefix}: {err_msg}" + ) + failed_tags.append(tag) + + if failed_tags: + msg = ( + f"Failed to process {len(failed_tags)} tag(s) either " + "due to being invalid or of unsupported type for " + f"{action} action. Following tag(s) {failed_tags} will be " + "skipped." + ) + self.logger.info( + f"{self.log_prefix}: {msg}" + ) + + if not tag_ids: + if action == "replace" and not non_empty_tags_list: + self.logger.debug( + f"{self.log_prefix}: No tags provided for " + "replace action. All tags will be removed from " + "the device(s)." + ) + return tag_ids, updated_cache + else: + err_msg = ( + f"No tag ids found for {str(tags)} tag(s) to" + f" perform {action} tag(s) action on Device(s)." + ) + raise NetskopeException(err_msg) + + return tag_ids, updated_cache + + def _get_tag_id( + self, + tag_name: str, + response: dict + ): + """Check Tag Query Response for given tag + + Args: + tag_name str: tag name to search for in response + response dict: Response received from Netskope + + Returns: + tag_id str: tag id if it exists, else None + """ + if not response.get("success"): + return None + data = response.get("data", {}).get("data", []) + try: + for tag_info in data: + if tag_info.get("name").lower() == tag_name.lower(): + return tag_info.get("id") + except Exception: + pass + return None + + def _tag_devices( + self, + tag_ids: list[str], + devices: list[dict], + cci_tag_action: str + ): + """Tag the device(s) on Netskope. + + Args: + tags (list[str]): List of tags to be attached to \ + the device(s). + device_ids (list[str]): List of device IDs to be tagged. + device_user_keys (list[str]): List of device user keys to be tagged. + cci_tag_action (str): Action to be performed on the tags. + """ + tenant_name = ( + self.tenant.parameters.get('tenantName', '').strip() + ) + headers = { + "Netskope-API-Token": resolve_secret( + self.tenant.parameters.get("v2token", "") + ) + } + + if cci_tag_action == "append": + log_message = "adding tags to devices" + url = f"{tenant_name}{URLS.get('V2_DEVICE_BULK_ADD_TAGS')}" + else: # remove + log_message = "removing tags from devices" + url = f"{tenant_name}{URLS.get('V2_DEVICE_BULK_REMOVE_TAGS')}" + + data = {"tags": tag_ids, "devices": devices} + + self.logger.debug(f"{self.log_prefix}: {log_message.capitalize()}.") + response = self.netskope_helper._api_call_helper( + url=url, + method="post", + error_codes=["CRE_1045", "CRE_1049"], + headers=headers, + json=data, + proxies=self.proxy, + message=f"Error occurred while {log_message}", + logger_msg=log_message, + ) + if not response.get("success"): + err_msg = f"Error occurred while {log_message}." + self.logger.error( + message=f"{self.log_prefix}: {err_msg}", + details=json.dumps(response), + ) + raise NetskopeException(err_msg) + + log_msg = ( + f"Successfully {'added' if cci_tag_action == 'append' else 'removed'} " + f"tags for {len(devices)} device record(s) on the Netskope Tenant." + ) + self.logger.debug( + f"{self.log_prefix}: {log_msg}" + ) + + def _replace_device_tags( + self, + tag_ids: list[int], + devices: list[dict], + ): + """Replace all tags on devices with the provided tags. + + Args: + tag_ids (list[int]): List of tag IDs to replace on the devices. + devices (list[dict]): List of device dicts with nsdeviceuid and + userkey. + """ + tenant_name = ( + self.tenant.parameters.get('tenantName', '').strip().strip('/') + ) + headers = { + "Netskope-API-Token": resolve_secret( + self.tenant.parameters.get("v2token", "") + ) + } + + log_message = f"replacing tags on {len(devices)} device record(s)" + url = f"{tenant_name}{URLS.get('V2_DEVICE_BULK_REPLACE_TAGS')}" + + data = {"tags": tag_ids, "devices": devices} + + self.logger.debug(f"{self.log_prefix}: {log_message.capitalize()}.") + response = self.netskope_helper._api_call_helper( + url=url, + method="post", + error_codes=["CRE_1045", "CRE_1049"], + headers=headers, + json=data, + proxies=self.proxy, + message=f"Error occurred while {log_message}", + logger_msg=log_message, + ) + if not response.get("success"): + err_msg = f"Error occurred while {log_message}." + self.logger.error( + message=f"{self.log_prefix}: {err_msg}", + details=json.dumps(response), + ) + raise NetskopeException(err_msg) + + self.logger.debug( + f"{self.log_prefix}: Successfully replaced tags for " + f"{len(devices)} device record(s) on the Netskope Tenant." + ) + + def _create_app_instance( + self, + instance_id: str, + instance_name: str, + app: str, + tags: list[str], + auth_token: str, + ): + """Create an app instance. + + Args: + instance_id (str): Instance ID. + instance_name (str): Instance name. + app (str): App name. + tags (str): Tag. + auth_token (str): Auth token. + """ + tenant_name = ( + self.tenant.parameters.get('tenantName', '').strip() + ) + url = ( + f"{tenant_name}{URLS.get('V1_APP_INSTANCE')}" + ) + params = { + "op": "list", + "app": app, + "instance_id": instance_id, + "instance_name": instance_name, + } + logger_msg = f"listing app instances for application: {app}" + try: + list_instances_response = self.netskope_helper._api_call_helper( + url=url, + method="post", + error_codes=["CRE_1045", "CRE_1049"], + params=params, + data={"token": auth_token}, + proxies=self.proxy, + message=f"Error occurred while {logger_msg}", + logger_msg=logger_msg, + ) + if ( + list_instances_response.get("status", "").lower() != + SUCCESS.lower() + ): + err_msg = ( + f"Error occurred while {logger_msg}" + ) + self.logger.error( + message=( + f"{self.log_prefix}: {err_msg}." + ), + details=json.dumps(list_instances_response), ) raise NetskopeException(err_msg) if len(list_instances_response.get("data", [])): @@ -3630,6 +4338,144 @@ def convert_to_list(value: Union[str, list[str]]) -> list[str]: return tags, apps, ids, tag_action + def _process_params_for_tag_device_action( + self, + params: dict, + is_execute: bool = False, + is_validation: bool = False + ) -> tuple: + """Process parameters for tag device action. + + Args: + params (dict): Params dictionary. + + Returns: + tuple: Processed params. + """ + + def convert_to_list( + value: Union[str, list[str]], + static_input_len_validation: bool = False, + field_name: str = "", + is_validation: bool = False, + action_name: str = "" + ) -> list[str]: + """Convert to list. + + Args: + value (Union[str, list[str]]): Value to be converted. + + Returns: + list[str]: List of values. + """ + list_values = [] + if isinstance(value, list): + list_values = value + if isinstance(value, str): + list_values = [v.strip() for v in value.split(",")] + if ( + is_validation and any(not v for v in list_values) + ): + log_and_raise = False + if action_name == "replace" and len(list_values) > 1: + log_and_raise = True + elif action_name != "replace": + log_and_raise = True + + if log_and_raise: + err_msg = ( + f"Static field '{field_name}' " + "can not have empty comma separated values." + ) + raise NetskopeException(err_msg) + if is_validation and static_input_len_validation and len(list_values) > 1: + err_msg = ( + f"Static field '{field_name}'" + "can not have multiple comma separated values." + ) + raise NetskopeException(err_msg) + return list_values + + tags = params.get("tags") or "" + tag_action = params.get("tag_device_action") or "append" + + skip_tag_validation = isinstance(tags, str) and tags.startswith("$") + tags = convert_to_list( + action_name=tag_action, + value=tags, + field_name="Tags", + is_validation=is_validation + ) + + device_id = params.get("device_id") or "" + skip_device_id_validation = isinstance(device_id, str) and device_id.startswith("$") + convert_to_list( + value=device_id, + static_input_len_validation=True, + field_name="Netskope Device UID", + is_validation=is_validation + ) + + device_user_key = params.get("device_user_key") or "" + skip_device_user_key_validation = isinstance(device_user_key, str) and device_user_key.startswith("$") + device_user_key = convert_to_list( + value=device_user_key, + field_name="User Key", + is_validation=is_validation + ) + + if isinstance(tag_action, str) and tag_action.startswith("$"): + err_msg = ( + "Select Tag Action " + "from Static field dropdown only." + ) + self.logger.error(f"{self.log_prefix}: {err_msg}") + raise NetskopeException(err_msg) + if tag_action not in ["append", "remove", "replace"]: + err_msg = ( + "Invalid value for Tag Action provided. " + "It must be either 'Add', 'Remove', or 'Replace'." + ) + self.logger.error(f"{self.log_prefix}: {err_msg}") + raise NetskopeException(err_msg) + if not tags and not skip_tag_validation and tag_action in ["append", "remove"]: + err_msg = ( + f"Invalid value for tags provided. Tags cannot be empty for " + f"'{tag_action}' action." + ) + self.logger.error(f"{self.log_prefix}: {err_msg}") + raise NetskopeException(err_msg) + + if not skip_tag_validation and not is_execute: + for tag in tags: + if not re.match(REGEX_TAG, tag): + err_msg = ( + "Invalid value for Tags provided. " + "Tag name must contain only alphanumeric characters, hyphens, and spaces." + ) + self.logger.error(f"{self.log_prefix}: {err_msg}") + raise NetskopeException(err_msg) + if len(tag) > TAG_DEVICE_TAG_LENGTH: + err_msg = ( + "Invalid value for Tags provided. " + "Each tag length can not exceed " + f"{TAG_DEVICE_TAG_LENGTH} characters." + ) + self.logger.error(f"{self.log_prefix}: {err_msg}") + raise NetskopeException(err_msg) + + if (not device_id and not device_user_key) and not ( + skip_device_id_validation or skip_device_user_key_validation + ): + err_msg = ( + "Invalid value for Netskope Device UID/User Key provided. " + "Provide either Netskope Device UID or User Key." + ) + self.logger.error(f"{self.log_prefix}: {err_msg}") + raise NetskopeException(err_msg) + + return tags, device_id, device_user_key, tag_action + def revert_action(self, action: Action): """Revert the action. @@ -3872,6 +4718,353 @@ def revert_uci_update_impact(self, anomaly_id: str): ) raise NetskopeException(error_message) + def _bulk_add_remove_tag_device(self, actions: List[Action], action_label: str) -> List[str]: + """Tag devices in bulk. + + Args: + actions (List[Action]): List of actions. + action_label (str): Action label. + + Returns: + List[str]: List of failed action IDs. + """ + + try: + failed_action_ids = [] + tag_groups = {} + tag_cache = self._fetch_all_tags() + action_value = None + for action_dict in actions: + action_id, action = action_dict.get("id"), action_dict.get("params") + try: + ( + tags, + device_id, + device_user_key, + cci_tag_action, + ) = self._process_params_for_tag_device_action( + action.parameters, True + ) + + user_keys = device_user_key + action_value = "Add" if cci_tag_action == "append" else "Remove" + if isinstance(device_user_key, str): + user_keys = [device_user_key] if device_user_key else [] + elif not isinstance(device_user_key, list): + user_keys = [] + + unique_user_keys = set() + for uk in user_keys: + if uk: + unique_user_keys.add(uk) + + if not unique_user_keys or not device_id: + failed_action_ids.append(action_id) + continue + + for user_key in unique_user_keys: + device = {"nsdeviceuid": device_id, "userkey": user_key} + + for tag in tags: + tag_key = (tag, cci_tag_action) + if tag_key not in tag_groups: + tag_groups[tag_key] = { + 'devices': [], + 'action_id_to_devices': {} + } + + tag_groups[tag_key]['devices'].append(device) + if action_id not in tag_groups[tag_key][ + 'action_id_to_devices' + ]: + tag_groups[tag_key][ + 'action_id_to_devices' + ][action_id] = [] + tag_groups[tag_key][ + 'action_id_to_devices' + ][action_id].append(device) + + except Exception as e: + self.logger.error( + f"{self.log_prefix}: Error occurred while processing " + f"action '{action_label}' for record with ID '{action_id}'. " + f"Error: {e}." + ) + failed_action_ids.append(action_id) + log_msg = "" + skipped_records = len(failed_action_ids) + if skipped_records > 0: + log_msg = ( + f" {skipped_records} record(s) will be skipped " + "either due to being invalid or missing " + "'Netskope Device UID' or 'User Key' field values." + ) + self.logger.info( + f"{self.log_prefix}: Performing '{action_value}' Tag(s)" + f" action on {len(actions)-skipped_records} " + f"record(s).{log_msg if log_msg else ''}" + ) + for (tag, cci_tag_action), group_data in tag_groups.items(): + devices = group_data['devices'] + action_id_to_devices = group_data['action_id_to_devices'] + + try: + tag_ids, tag_cache = self._query_and_create_tags_with_cache( + [tag], action=cci_tag_action, tag_cache=tag_cache + ) + + unique_devices = [] + seen_devices = set() + for device in devices: + device_key = (device.get("nsdeviceuid"), device.get("userkey")) + if device_key not in seen_devices: + unique_devices.append(device) + seen_devices.add(device_key) + + total_batches = (len(unique_devices) + TAG_DEVICE_BATCH_SIZE - 1) // TAG_DEVICE_BATCH_SIZE + for batch_num in range(total_batches): + start_idx = batch_num * TAG_DEVICE_BATCH_SIZE + end_idx = min(start_idx + TAG_DEVICE_BATCH_SIZE, len(unique_devices)) + device_batch = unique_devices[start_idx:end_idx] + + batch_action_ids = [] + batch_device_keys = set( + (d.get("nsdeviceuid"), d.get("userkey")) + for d in device_batch + ) + for action_id, action_devices in action_id_to_devices.items(): + for action_device in action_devices: + action_device_key = ( + action_device.get("nsdeviceuid"), + action_device.get("userkey") + ) + if action_device_key in batch_device_keys: + batch_action_ids.append(action_id) + break + + try: + self.logger.info( + f"{self.log_prefix}: Processing batch {batch_num + 1}/{total_batches} " + f"of {len(device_batch)} device record(s) for tag '{tag}'." + ) + self._tag_devices(tag_ids, device_batch, cci_tag_action) + + self.logger.info( + f"{self.log_prefix}: Successfully {'tagged' if cci_tag_action == 'append' else 'untagged'} " + f"batch {batch_num + 1}/{total_batches} ({len(device_batch)} device record(s)) with tag '{tag}'." + ) + + except Exception as batch_e: + self.logger.error( + f"{self.log_prefix}: Failed to {'tag' if cci_tag_action == 'append' else 'untag'} " + f"batch {batch_num + 1}/{total_batches} ({len(device_batch)} device record(s)) with tag '{tag}'. " + f"Error: {batch_e}" + ) + failed_action_ids.extend(batch_action_ids) + + except Exception as group_e: + self.logger.error( + f"{self.log_prefix}: Failed to execute action '{action_label}' for tag '{tag}'. " + f"Error: {group_e}" + ) + failed_action_ids.extend(list(action_id_to_devices.keys())) + except Exception as group_e: + failed_action_ids = [] + self.logger.error( + f"{self.log_prefix}: Failed to execute action '{action_label}'. " + f"Error: {group_e}" + ) + for action_dict in actions: + action_id = action_dict.get("id") + failed_action_ids.append(action_id) + return failed_action_ids + + return failed_action_ids + + def _bulk_replace_device_tags( + self, + actions: List[Action], + action_label: str + ) -> List[str]: + """Replace tags on devices in bulk using optimized tag-set grouping. + + This method optimizes API calls by: + 1. Grouping devices with identical tag sets together + 2. Batching devices (max 100 per API call) + 3. Enforcing 5-tag limit per device + + Args: + actions (List[Action]): List of actions. + action_label (str): Action label. + + Returns: + List[str]: List of failed action IDs. + """ + failed_action_ids = [] + device_to_tags = {} + tag_cache = self._fetch_all_tags() + for action_dict in actions: + action_id = action_dict.get("id") + action = action_dict.get("params") + try: + ( + tags, + device_id, + device_user_key, + _, + ) = self._process_params_for_tag_device_action( + action.parameters, True + ) + + user_keys = device_user_key + if isinstance(device_user_key, str): + user_keys = [device_user_key] if device_user_key else [] + elif not isinstance(device_user_key, list): + user_keys = [] + + unique_user_keys = set() + for uk in user_keys: + if uk: + unique_user_keys.add(uk) + + if not unique_user_keys or not device_id: + failed_action_ids.append(action_id) + continue + + for user_key in unique_user_keys: + device_key = (device_id, user_key) + if device_key not in device_to_tags: + device_to_tags[device_key] = { + 'tags': set(), + 'action_ids': set() + } + device_to_tags[device_key]['tags'].update(tags) + device_to_tags[device_key]['action_ids'].add(action_id) + + except Exception as e: + self.logger.error( + f"{self.log_prefix}: Error occurred while processing " + f"action '{action_label}' for record with ID '{action_id}'. " + f"Error: {e}." + ) + failed_action_ids.append(action_id) + log_msg = "" + skipped_records = len(failed_action_ids) + if skipped_records > 0: + log_msg = ( + f" {skipped_records} record(s) will be skipped " + "either due to being invalid or missing " + "'Netskope Device UID' or 'User Key' field values." + ) + self.logger.info( + f"{self.log_prefix}: Performing 'Replace' Tag(s) " + f"action on {len(actions)-skipped_records} " + f"record(s).{log_msg if log_msg else ''}" + ) + + tag_set_groups = {} + + for (device_id, user_key), group_data in device_to_tags.items(): + tags = sorted(list(group_data['tags'])) # Sort for consistent grouping + action_ids = group_data['action_ids'] + + # 5-tag limit + if len(tags) > MAX_TAGS_PER_DEVICE: + skipped_tags = tags[MAX_TAGS_PER_DEVICE:] + tags = tags[:MAX_TAGS_PER_DEVICE] + self.logger.info( + f"{self.log_prefix}: Device '{device_id}' (userkey: " + f"'{user_key}') has more than {MAX_TAGS_PER_DEVICE} tags. " + f"Only the first {MAX_TAGS_PER_DEVICE} sorted tags will be " + f"applied: {tags}. Skipped tags: {skipped_tags}" + ) + + tag_set_key = tuple(tags) + if tag_set_key not in tag_set_groups: + tag_set_groups[tag_set_key] = { + 'devices': [], + 'action_ids': set() + } + + device = {"nsdeviceuid": device_id, "userkey": user_key} + tag_set_groups[tag_set_key]['devices'].append(device) + tag_set_groups[tag_set_key]['action_ids'].update(action_ids) + + self.logger.debug( + f"{self.log_prefix}: Grouped {len(device_to_tags)} device record(s) into " + f"{len(tag_set_groups)} tag set group(s)." + ) + + for tags_tuple, group_data in tag_set_groups.items(): + tags = list(tags_tuple) + devices = group_data['devices'] + action_ids = list(group_data['action_ids']) + + try: + tag_ids, tag_cache = self._query_and_create_tags_with_cache( + tags, action="replace", tag_cache=tag_cache + ) + + # Batch TAG_DEVICE_BATCH_SIZE per API call) + total_batches = ( + (len(devices) + TAG_DEVICE_BATCH_SIZE - 1) // TAG_DEVICE_BATCH_SIZE + ) + + self.logger.info( + f"{self.log_prefix}: Replacing tags {tags} on " + f"{len(devices)} device record(s) in {total_batches} batch(es)." + ) + + for batch_num in range(total_batches): + start_idx = batch_num * TAG_DEVICE_BATCH_SIZE + end_idx = min( + start_idx + TAG_DEVICE_BATCH_SIZE, len(devices) + ) + device_batch = devices[start_idx:end_idx] + + try: + self.logger.info( + f"{self.log_prefix}: Processing batch " + f"{batch_num + 1}/{total_batches} with " + f"{len(device_batch)} device records(s) for tags {tags}." + ) + + self._replace_device_tags(tag_ids, device_batch) + + self.logger.info( + f"{self.log_prefix}: Successfully replaced tags " + f"on batch {batch_num + 1}/{total_batches} " + f"({len(device_batch)} device records) with tags {tags}." + ) + + except Exception as batch_e: + self.logger.error( + message=( + f"{self.log_prefix}: Failed to replace tags on " + f"batch {batch_num + 1}/{total_batches} " + f"({len(device_batch)} records). Error: {batch_e}" + ), + details=str(traceback.format_exc()), + ) + batch_device_keys = set( + (d.get("nsdeviceuid"), d.get("userkey")) + for d in device_batch + ) + for device_key, dev_data in device_to_tags.items(): + if device_key in batch_device_keys: + failed_action_ids.extend( + list(dev_data['action_ids']) + ) + + except Exception as group_e: + self.logger.error( + f"{self.log_prefix}: Failed to process tag set group " + f"with tags {tags}. Error: {group_e}" + ) + failed_action_ids.extend(action_ids) + + return failed_action_ids + def execute_action(self, action: Action): """Execute action on the user. @@ -4351,6 +5544,33 @@ def execute_actions(self, actions): message=f"Successfully {log_msg_add_remove} applications.", failed_action_ids=failed_action_ids ) + elif first_action.value == "tag_device": + cci_tag_action = first_action.parameters.get( + "tag_device_action", "append" + ) + if cci_tag_action == "replace": + # Use device-to-tags mapping for replace action + failed_action_ids = self._bulk_replace_device_tags( + actions, action_label + ) + return ActionResult( + success=True, + message="Successfully replaced tags on devices.", + failed_action_ids=list(set(failed_action_ids)), + ) + else: + # Use tag-to-devices mapping for add/remove actions + log_msg_add_remove = ( + "tagged" if cci_tag_action == "append" else "untagged" + ) + failed_action_ids = self._bulk_add_remove_tag_device( + actions, action_label + ) + return ActionResult( + success=True, + message=f"Successfully {log_msg_add_remove} devices.", + failed_action_ids=list(set(failed_action_ids)), + ) elif first_action.value == "app_instance": bulk_app_instance_payload = { "update": [], diff --git a/netskope_ztre/manifest.json b/netskope_ztre/manifest.json index f3433726..fbaa5d80 100644 --- a/netskope_ztre/manifest.json +++ b/netskope_ztre/manifest.json @@ -1,12 +1,12 @@ { "name": "Netskope Risk Exchange", - "description": "This plugin is used to fetch Users from UBA Alerts, Applications from Applications Events, and Devices from Client Status Events of the Netskope Tenant. This plugin also supports 'Add user to group', 'Remove user from group', 'Update UCI score', 'UCI Reset', 'Add host to Private App', 'Create or Update App Instance', 'Tag/Untag Application', 'No Action' and 'Mark Anomaly as allowed' is also supported for 'Update UCI score' action.", + "description": "This plugin is used to fetch Users from UBA Alerts, Applications from Applications Events, and Devices from Client Status Events of the Netskope Tenant. This plugin also supports 'Add user to group', 'Remove user from group', 'Update UCI score', 'UCI Reset', 'Add host to Private App', 'Create or Update App Instance', 'Tag/Untag Application', 'Tag/Untag Device', 'No Action' and 'Mark Anomaly as allowed' is also supported for 'Update UCI score' action.", "id": "netskope_ztre", "minimum_version": "6.0.0", "minimum_provider_version": "1.4.0", "provider_id": "netskope_provider", "netskope": true, - "version": "1.5.1", + "version": "1.6.0", "module": "CRE", "supported_subtypes": { "alerts": [ diff --git a/netskope_ztre/utils/constants.py b/netskope_ztre/utils/constants.py index 51dfec50..787e4de7 100644 --- a/netskope_ztre/utils/constants.py +++ b/netskope_ztre/utils/constants.py @@ -1,4 +1,35 @@ -"""Netskope CRE plugin Constants.""" +""" +BSD 3-Clause License + +Copyright (c) 2021, Netskope OSS +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +Netskope CRE plugin Constants.""" MAX_RETRY_COUNT = 4 DEFAULT_WAIT_TIME = 60 @@ -11,15 +42,21 @@ r"^(?:(?:25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.){3}(?:25[0-5]|(?:2[0-4]|1\d|[1-9]|)\d)$" ) REGEX_EMAIL = r"[^@]+@[^@]+\.[^@]+" +REGEX_TAG = r"^[a-zA-Z0-9- ]*$" MODULE_NAME = "CRE" PLUGIN = "Netskope Risk Exchange" -PLUGIN_VERSION = "1.5.1" +PLUGIN_VERSION = "1.6.0" URLS = { "V2_PRIVATE_APP": "/api/v2/steering/apps/private", "V2_PRIVATE_APP_PATCH": "/api/v2/steering/apps/private/{}", "V2_PUBLISHER": "/api/v2/infrastructure/publishers", "V2_CCI_TAG_CREATE": "/api/v2/services/cci/tags", "V2_CCI_TAG_UPDATE": "/api/v2/services/cci/tags/{}", + "V2_DEVICE_GET_TAGS": "/api/v2/devices/device/tags/gettags", + "V2_DEVICE_TAG": "/api/v2/devices/device/tags", + "V2_DEVICE_BULK_ADD_TAGS": "/api/v2/devices/device/tags/bulkadd", + "V2_DEVICE_BULK_REMOVE_TAGS": "/api/v2/devices/device/tags/bulkremove", + "V2_DEVICE_BULK_REPLACE_TAGS": "/api/v2/devices/device/tags/bulkreplace", "V1_APP_INSTANCE": "/api/v1/app_instances", "V2_SCIM_GROUPS": "/api/v2/scim/Groups", "V2_SCIM_USERS": "/api/v2/scim/Users", @@ -44,6 +81,10 @@ APP_INSTANCE_BATCH_SIZE = 500 TAG_APP_BATCH_SIZE = 100 TAG_APP_TAG_LENGTH = 75 +TAG_DEVICE_TAG_LENGTH = 80 +TAG_DEVICE_BATCH_SIZE = 100 +MAX_TAGS_PER_DEVICE = 5 +TAG_CACHE_PAGE_SIZE = 10 DEVICE_FIELD_MAPPING = { "Device ID": {"key": "device_id"}, "Hostname": {"key": "host_info.hostname"}, diff --git a/netskope_ztre/utils/helper.py b/netskope_ztre/utils/helper.py index f1989fd8..fe7354d7 100644 --- a/netskope_ztre/utils/helper.py +++ b/netskope_ztre/utils/helper.py @@ -1,4 +1,36 @@ -"""Netskope CRE plugin helper module.""" +""" +BSD 3-Clause License + +Copyright (c) 2021, Netskope OSS +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +Netskope CRE plugin helper module.""" + import requests import time import traceback