forked from elastic/detection-rules
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathintegrations.py
108 lines (84 loc) · 4.82 KB
/
integrations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""Functions to support and interact with Kibana integrations."""
import gzip
import json
import os
import re
from collections import OrderedDict
from pathlib import Path
import requests
from marshmallow import EXCLUDE, Schema, fields, post_load
from .semver import Version
from .utils import cached, get_etc_path, read_gzip
MANIFEST_FILE_PATH = Path(get_etc_path('integration-manifests.json.gz'))
@cached
def load_integrations_manifests() -> dict:
"""Load the consolidated integrations manifest."""
return json.loads(read_gzip(get_etc_path('integration-manifests.json.gz')))
class IntegrationManifestSchema(Schema):
name = fields.Str(required=True)
version = fields.Str(required=True)
release = fields.Str(required=True)
description = fields.Str(required=True)
conditions = fields.Dict(required=True)
policy_templates = fields.List(fields.Dict, required=True)
owner = fields.Dict(required=False)
@post_load
def transform_policy_template(self, data, **kwargs):
data["policy_templates"] = [policy["name"] for policy in data["policy_templates"]]
return data
def build_integrations_manifest(overwrite: bool, rule_integrations: list) -> None:
"""Builds a new local copy of manifest.yaml from integrations Github."""
if overwrite:
if os.path.exists(MANIFEST_FILE_PATH):
os.remove(MANIFEST_FILE_PATH)
final_integration_manifests = {integration: {} for integration in rule_integrations}
for integration in rule_integrations:
integration_manifests = get_integration_manifests(integration)
for manifest in integration_manifests:
validated_manifest = IntegrationManifestSchema(unknown=EXCLUDE).load(manifest)
package_version = validated_manifest.pop("version")
final_integration_manifests[integration][package_version] = validated_manifest
manifest_file = gzip.open(MANIFEST_FILE_PATH, "w+")
manifest_file_bytes = json.dumps(final_integration_manifests).encode("utf-8")
manifest_file.write(manifest_file_bytes)
print(f"final integrations manifests dumped: {MANIFEST_FILE_PATH}")
def find_least_compatible_version(package: str, integration: str,
current_stack_version: str, packages_manifest: dict) -> str:
"""Finds least compatible version for specified integration based on stack version supplied."""
integration_manifests = {k: v for k, v in sorted(packages_manifest[package].items(),
key=lambda x: Version(str(x[0])))}
# filter integration_manifests to only the latest major entries
major_versions = sorted(list(set([Version(manifest_version)[0] for manifest_version in integration_manifests])),
reverse=True)
for max_major in major_versions:
major_integration_manifests = \
{k: v for k, v in integration_manifests.items() if Version(k)[0] == max_major}
# iterates through ascending integration manifests
# returns latest major version that is least compatible
for version, manifest in OrderedDict(sorted(major_integration_manifests.items(),
key=lambda x: Version(str(x[0])))).items():
compatible_versions = re.sub(r"\>|\<|\=|\^", "", manifest["conditions"]["kibana"]["version"]).split(" || ")
for kibana_ver in compatible_versions:
# check versions have the same major
if int(kibana_ver[0]) == int(current_stack_version[0]):
if Version(kibana_ver) <= Version(current_stack_version + ".0"):
return f"^{version}"
raise ValueError(f"no compatible version for integration {package}:{integration}")
def get_integration_manifests(integration: str) -> list:
"""Iterates over specified integrations from package-storage and combines manifests per version."""
epr_search_url = "https://epr.elastic.co/search"
# link for search parameters - https://github.com/elastic/package-registry
epr_search_parameters = {"package": f"{integration}", "prerelease": "true",
"all": "true", "include_policy_templates": "true"}
epr_search_response = requests.get(epr_search_url, params=epr_search_parameters)
epr_search_response.raise_for_status()
manifests = epr_search_response.json()
if not manifests:
raise ValueError(f"EPR search for {integration} integration package returned empty list")
print(f"loaded {integration} manifests from the following package versions: "
f"{[manifest['version'] for manifest in manifests]}")
return manifests