forked from ComplianceAsCode/content
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathassert_ansible_schema.py
137 lines (107 loc) · 3.74 KB
/
assert_ansible_schema.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/python3
from __future__ import print_function
import argparse
import re
import sys
import yaml
COMMENT_KEYS = set((
"platform",
"reboot",
"strategy",
"complexity",
"disruption",
))
TAGS = set((
"strategy",
"complexity",
"disruption",
"severity",
))
def make_parser():
parser = argparse.ArgumentParser()
parser.add_argument("input", nargs="+")
parser.add_argument("--verbose", "-v", action="store_true")
parser.add_argument("--cce", action="store_true", help="Require CCEs to be defined as well")
return parser
def validate_comments(fname, args):
caught_comments = set()
regex = re.compile(
r"# ({keys}) = .*"
.format(keys="|".join(COMMENT_KEYS)))
with open(fname, "r") as lines:
for line in lines:
found = re.search(regex, line)
if found:
caught_comments.add(found.group(1))
assert COMMENT_KEYS == caught_comments, (
"Did not found key(s) in comments: {keys}"
.format(keys=", ".join(COMMENT_KEYS.difference(caught_comments))))
if args.verbose:
print("Comment-based metadata OK")
def validate_playbook(playbook, args):
assert "name" in playbook, "playbook doesn't have a name"
assert "hosts" in playbook, "playbook doesn't have the hosts entry"
assert playbook["hosts"] == "@@HOSTS@@", "playbook's hosts is not set to @@HOSTS@@"
assert "become" in playbook, "playbook doesn't have a become key"
assert playbook["become"], "become in the playbook is not set to a true-ish value"
if "vars" in playbook:
assert playbook["vars"], "there are no variables under the 'vars' key of the playbook"
assert "tasks" in playbook, "there are no tasks in the playbook"
first_task = playbook["tasks"][0]
if "block" in first_task:
first_task = first_task["block"][0]
assert "name" in first_task, "The first task doesn't have name"
assert "tags" in playbook, "the playbook doesn't have tags"
if args.verbose:
print("Basic playbook properties OK")
caught_tags = set()
tag_regex = re.compile(
r".*_({keys})"
.format(keys="|".join(TAGS)))
cce_regex = re.compile(r"CCE-[0-9]+-[0-9]+")
for tag in playbook["tags"]:
assert "@" not in tag, \
"A playbook tag {tag} contains @, which is unexpected.".format(tag=tag)
found = re.search(tag_regex, tag)
if found:
caught_tags.add(found.group(1))
if re.search(cce_regex, tag):
caught_tags.add("CCE")
tags_not_caught = TAGS.difference(caught_tags)
assert not tags_not_caught, \
"Missing tags: {stray}".format(stray=", ".join(tags_not_caught))
if args.verbose:
print("Playbook tags OK")
if args.cce:
assert "CCE" in caught_tags, "There is no CCE tag in the playbook"
if args.verbose:
print("Playbook CCE OK")
def validate_yaml(fname, args):
stream = open(fname, "r")
data = yaml.load(stream, Loader=yaml.Loader)
for playbook in data:
validate_playbook(playbook, args)
def validate_input(fname, args):
if args.verbose:
print("Analyzing {fname}".format(fname=fname))
try:
validate_comments(fname, args)
validate_yaml(fname, args)
if args.verbose:
print("Analysis OK")
except AssertionError as err:
msg = (
"Error processing {fname}: {err}"
.format(fname=fname, err=err)
)
print(msg, file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
parser = make_parser()
args = parser.parse_args()
ret = 0
for fname in args.input:
current_ret = validate_input(fname, args)
ret = max(current_ret, ret)
sys.exit(ret)