forked from crowdsecurity/hub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mkindex
executable file
·246 lines (198 loc) · 7.25 KB
/
mkindex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
#!/usr/bin/env python3
import base64
import decimal
from dataclasses import dataclass
import hashlib
import itertools
import json
from pathlib import Path
from typing import Iterable
import yaml
class HubType(str):
    """Name of a hub item category, e.g. ``"parsers"`` or ``"scenarios"``.

    A plain ``str`` subclass that adds no behaviour; it exists only to make
    type annotations more descriptive.
    """
# Item categories this script knows about, in alphabetical order.
hubtypes: list[HubType] = [
    HubType(type_name)
    for type_name in (
        "appsec-configs",
        "appsec-rules",
        "collections",
        "contexts",
        "parsers",
        "postoverflows",
        "scenarios",
    )
]
class AuthorName(str):
    """Name of an item's author.

    A plain ``str`` subclass that adds no behaviour; it exists only to make
    type annotations more descriptive.
    """
class ItemName(str):
    """Short name of an item (without author prefix or file extension).

    A plain ``str`` subclass that adds no behaviour; it exists only to make
    type annotations more descriptive.
    """
class Content(str):
    """Text of an item file, stored base64-encoded.

    A plain ``str`` subclass that adds no behaviour; it exists only to make
    type annotations more descriptive.
    """
@dataclass
class VersionDetail:
    """What the index records about a single version of an item."""

    # True when this version has been flagged as deprecated.
    deprecated: bool
    # Content digest identifying this version (Item.set_versions stores a
    # SHA-256 hex digest here and compares against it).
    digest: str
@dataclass
class Item:
    """One hub item (a single YAML file) plus the metadata stored in the index.

    ``content`` holds the base64-encoded bytes of the YAML file and
    ``long_description`` the base64-encoded bytes of the accompanying ``.md``
    file (``None`` when there is none).  ``version`` starts out as ``""`` and
    is filled in by :meth:`set_versions`; the optional fields below it are
    filled in by :meth:`set_meta_from_content`.
    """

    path: str
    author: AuthorName
    content: Content
    long_description: str | None
    version: str
    versions: dict[str, VersionDetail]
    labels: dict[str, str] | None = None
    stage: str | None = None
    references: list[str] | None = None
    appsec_configs: list[str] | None = None
    appsec_rules: list[str] | None = None
    collections: list[str] | None = None
    contexts: list[str] | None = None
    parsers: list[str] | None = None
    postoverflows: list[str] | None = None
    scenarios: list[str] | None = None
    # Set by set_meta_from_content().  Declared as a real field (placed last so
    # positional construction is unaffected) instead of being attached to the
    # instance ad hoc.
    description: str | None = None

    def set_versions(self, prev_versions: dict):
        """Work out this item's version from its previously indexed versions.

        Every entry of *prev_versions* (version string -> mapping with a
        ``"digest"`` key and an optional ``"deprecated"`` key) is copied into
        ``self.versions``.  If the SHA-256 of the decoded content equals the
        digest of a previous version, that version string is reused.  When
        ``self.version`` is still ``""`` afterwards, the highest previous
        version is bumped by ``0.1`` and registered as a new, non-deprecated
        version.

        Raises:
            decimal.InvalidOperation: a previous version string is not a
                valid decimal number.
            KeyError: a previous version entry has no ``"digest"``.
        """
        content_hash = hashlib.sha256(base64.b64decode(self.content)).hexdigest()
        last_version = decimal.Decimal("0.0")

        for version_number, detail in prev_versions.items():
            last_version = max(last_version, decimal.Decimal(version_number))
            self.versions[version_number] = VersionDetail(
                deprecated=detail.get("deprecated", False), digest=detail["digest"]
            )
            if content_hash == detail["digest"]:
                # unchanged content: keep the version it was published under
                self.version = version_number

        if self.version == "":
            # Decimal arithmetic keeps "0.9" + "0.1" exactly "1.0"
            last_version += decimal.Decimal("0.1")
            self.version = str(last_version)
            self.versions[self.version] = VersionDetail(
                deprecated=False, digest=content_hash
            )

    def content_as_dicts(self):
        """Return an iterator over the YAML documents in the decoded content."""
        return yaml.safe_load_all(base64.b64decode(self.content))

    def set_meta_from_content(self):
        """Copy metadata keys from the first YAML document onto this item.

        Raises:
            ValueError: the content holds no YAML document, or the first
                document is empty.
        """
        documents = list(self.content_as_dicts())
        if not documents or documents[0] is None:
            # fail with the file name instead of a bare IndexError/TypeError
            raise ValueError(f"{self.path}: no YAML document to read metadata from")
        # XXX: ignore multiple documents after the first one
        content = documents[0]

        if "labels" in content:
            self.labels = content["labels"]
            # for sigma scenarios
            if (
                self.labels
                and "classification" in self.labels
                and self.labels["classification"] is None
            ):
                del self.labels["classification"]

        # YAML keys use dashes; the matching attributes use underscores.
        for key in (
            "description",
            "references",
            "appsec-configs",
            "appsec-rules",
            "collections",
            "contexts",
            "parsers",
            "postoverflows",
            "scenarios",
        ):
            if key in content:
                setattr(self, key.replace("-", "_"), content[key])
class CustomEncoder(json.JSONEncoder):
    """JSON encoder that can serialise ``Item`` and ``VersionDetail`` objects."""

    def default(self, o):
        """Return a JSON-serialisable dict for *o*.

        For an ``Item``: empty/``None`` ``long_description`` and
        ``description`` are dropped, every other ``None`` attribute except
        ``labels`` is dropped, and ``appsec_configs`` / ``appsec_rules`` are
        emitted under their dashed names.  Anything that is neither an
        ``Item`` nor a ``VersionDetail`` is passed to the base class, which
        raises ``TypeError``.
        """
        if isinstance(o, Item):
            # Work on a shallow copy: editing o.__dict__ directly would strip
            # and rename attributes on the item being encoded.
            d = dict(o.__dict__)

            # remove None or ''
            for key in ("long_description", "description"):
                if key in d and not d[key]:
                    del d[key]

            for key in list(d):
                if key == "labels":
                    # retain None for legacy
                    continue
                # remove None from dependency lists
                if d[key] is None:
                    del d[key]

            for attr, json_key in (
                ("appsec_configs", "appsec-configs"),
                ("appsec_rules", "appsec-rules"),
            ):
                if attr in d:
                    d[json_key] = d.pop(attr)
            return d

        if isinstance(o, VersionDetail):
            return dict(o.__dict__)

        return super().default(o)
# Shape of a freshly parsed index: hub type -> "author/name" -> Item.
type Index = dict[HubType, dict[str, Item]]
class IndexUpdater:
    """Build a new index from a directory tree, reusing a previous index's versions."""

    def __init__(self, index: Index):
        """Remember *index* as the previous index; the new index starts empty."""
        self.prev_index: dict = index
        self.new_index = {}

    def parse_dir(self, root: Path):
        """Scan *root* for items and store the result in ``self.new_index``.

        Each item found gets its version history from the previous index (an
        empty history when the item is not in it) and its metadata from its
        own content.
        """
        scanned: Index = {}
        for hubtype, _stage, author, name, item in iter_types(root):
            scanned.setdefault(hubtype, {})[f"{author}/{name}"] = item

        # copy previous versions from previous index
        for hubtype, entries in scanned.items():
            for full_name, item in entries.items():
                try:
                    known_versions = self.prev_index[hubtype][full_name]["versions"]
                except KeyError:
                    # item (or its whole type) is new: no history yet
                    known_versions = {}
                item.set_versions(known_versions)
                item.set_meta_from_content()

        self.new_index = scanned

    def index_json(self) -> str:
        """Return the new index as indented JSON with sorted keys."""
        return json.dumps(
            self.new_index,
            cls=CustomEncoder,
            indent=2,
            sort_keys=True,
        )
def iter_items(
    authordir: Path, stage_name: str | None
) -> Iterable[tuple[AuthorName, ItemName, Item]]:
    """Yield ``(author, name, item)`` for every item file below *authordir*.

    Item files are those matching ``*/*.yaml`` or ``*/*.yml``.  The author is
    the name of the file's parent directory and the item name is the file
    name without its extension.  A ``<name>.md`` file next to the item, when
    present, becomes the base64-encoded long description.  *stage_name* is
    stored on each item unchanged.
    """
    yaml_files = itertools.chain(
        authordir.glob("*/*.yaml"),
        authordir.glob("*/*.yml"),
    )
    for item_path in yaml_files:
        content = Content(base64.b64encode(item_path.read_bytes()).decode())
        author = AuthorName(item_path.parent.name)

        filename = item_path.name
        if filename.endswith(".yaml"):
            name = ItemName(filename.removesuffix(".yaml"))
        elif filename.endswith(".yml"):
            name = ItemName(filename.removesuffix(".yml"))
        else:
            name = ItemName(filename)

        readme = item_path.parent / (name + ".md")
        try:
            long_description = base64.b64encode(readme.read_bytes()).decode()
        except FileNotFoundError:
            long_description = None

        item = Item(
            path=item_path.as_posix(),
            author=author,
            content=content,
            version="",
            versions={},
            long_description=long_description,
            stage=stage_name,
        )
        yield author, name, item
def iter_stages(
    typedir: Path,
) -> Iterable[tuple[str | None, AuthorName, ItemName, Item]]:
    """Yield ``(stage, author, name, item)`` for every item of one hub type.

    Only ``parsers`` and ``postoverflows`` have an extra directory level for
    the stage; for every other type the stage is ``None`` and items are
    looked up directly under *typedir*.
    """
    if typedir.name not in ("parsers", "postoverflows"):
        for author, name, item in iter_items(typedir, None):
            yield None, author, name, item
        return

    for stagedir in typedir.iterdir():
        stage_name = stagedir.name
        for author, name, item in iter_items(stagedir, stage_name):
            yield stage_name, author, name, item
def iter_types(
    root: Path,
) -> Iterable[tuple[HubType, str | None, AuthorName, ItemName, Item]]:
    """Yield ``(hubtype, stage, author, name, item)`` for every item under *root*.

    Only entries of *root* that are directories and whose name is listed in
    ``hubtypes`` are visited; everything else is skipped.
    """
    for typedir in root.iterdir():
        # the name is checked first, so is_dir() only runs for known names
        if typedir.name not in hubtypes or not typedir.is_dir():
            continue
        hubtype = HubType(typedir.name)
        for stage_name, author, name, item in iter_stages(typedir):
            yield hubtype, stage_name, author, name, item
def main():
    """Read ``.index.json``, rescan the current directory, print the new index."""
    index_file = Path(".index.json")
    updater = IndexUpdater(json.loads(index_file.read_text()))
    updater.parse_dir(Path("."))
    print(updater.index_json())


if __name__ == "__main__":
    main()