Skip to content

Commit 4430b76

Browse files
committed
failure: add infrastructure to tag and collect revocation events in Keylime
Currently Keylime operates in a binary state when it comes failures that cause revocations and does not collect information where and why that revocation happened. This commit introduces the tagging and collection infrastructure and configuration for revocation events and adding context to them. SeverityLabel Has a name that can be set in the keylime.conf and a severity that is dynamically calculated based on the order in the configuration. Components Is a enumeration that contains the main components of Keylime that can cause events. Must be specified when creating a Failure object. Event Holds a revocation event. An event can be identified by their event_id which has the format: "component.[sub_component].event" The severity is automatically assigned based on the event_id. The event contains a context which is string encoded JSON object. An event must be marked irrecoverable if other validation steps are skipped by early returning. Failure Holds a collection of events and provides add_event(...) for adding new events to it and merge(...) to merge it with another Failure object. Is False if no events are currently in the Failure object and is otherwise True. Future changes that cause a revocation must extend and return a Failure object instead of returning a boolean value. Part of enhancement proposal keylime/enhancements#48 Signed-off-by: Thore Sommer <[email protected]>
1 parent f3d5823 commit 4430b76

File tree

2 files changed

+212
-0
lines changed

2 files changed

+212
-0
lines changed

keylime.conf

+11
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,17 @@ measured_boot_policy_name = accept-all
278278
# The default value for this config item is the empty string.
279279
# measured_boot_imports = keylime.elchecking
280280

281+
# Severity labels for revocation events strictly ordered from least severity to
282+
# highest severtiy.
283+
severity_labels = ["info", "notice", "warning", "error", "critical", "alert", "emergency"]
284+
285+
# Severity policy that matches different event_ids to the severity label.
286+
# The rules are evaluated from the beginning of the list and the first match is
287+
# used. The event_id can also be a regex. Default policy assigns the highest
288+
# severity to all events.
289+
severity_policy = [{"event_id": ".*", "severity_label": "emergency"}]
290+
291+
281292
#=============================================================================
282293
[tenant]
283294
#=============================================================================

keylime/failure.py

+201
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
'''
2+
SPDX-License-Identifier: Apache-2.0
3+
Copyright 2021 Thore Sommer
4+
5+
Tagging of failure events that might cause revocation in Keylime.
6+
'''
7+
import ast
8+
import dataclasses
9+
import enum
10+
import functools
11+
import json
12+
import re
13+
from typing import List, Optional, Tuple, Callable, Union
14+
15+
from keylime import config
16+
from keylime import keylime_logging
17+
18+
logger = keylime_logging.init_logging("failure")
19+
20+
21+
@functools.total_ordering
22+
@dataclasses.dataclass(frozen=True)
23+
class SeverityLabel:
24+
"""
25+
Severity label that can be attached to an event.
26+
27+
The severity level is assigned dynamically based on the configuration,
28+
so only use the name for use outside use of the tagging module.
29+
"""
30+
name: str
31+
severity: int
32+
33+
def __lt__(self, other):
34+
return self.severity < other.severity
35+
36+
def __eq__(self, other):
37+
return self.severity == other.severity
38+
39+
40+
class Component(enum.Enum):
41+
"""
42+
Main components of Keylime that can generate revocations.
43+
"""
44+
QUOTE_VALIDATION = "qoute_validation"
45+
PCR_VALIDATION = "pcr_validation"
46+
MEASURED_BOOT = "measured_boot"
47+
IMA = "ima"
48+
INTERNAL = "internal"
49+
DEFAULT = "default"
50+
51+
52+
@dataclasses.dataclass
53+
class Event:
54+
"""
55+
Event that might be the reason for revocation.
56+
57+
The context is string
58+
"""
59+
event_id: str
60+
severity_label: SeverityLabel
61+
context: str
62+
recoverable: bool
63+
64+
def __init__(self, component: Component,
65+
sub_components: Optional[List[str]],
66+
event_id: str,
67+
context: Union[str, dict],
68+
recoverable: bool):
69+
70+
# Build full event id with the format "component.[sub_component].event_id"
71+
self.event_id = component.value
72+
if sub_components is not None:
73+
self.event_id += "." + ".".join(sub_components)
74+
self.event_id += f".{event_id}"
75+
76+
# Convert message
77+
if isinstance(context, str):
78+
context = {"message": context}
79+
self.context = json.dumps(context)
80+
81+
self.severity_label = _severity_match(self.event_id)
82+
self.recoverable = recoverable
83+
84+
85+
class Failure:
86+
"""
87+
Failure Object that collects all events that might cause a revocation.
88+
89+
If recoverable is set to False the validation process returned early and might skipped other validation steps.
90+
"""
91+
events: List[Event]
92+
recoverable: bool
93+
highest_severity: Optional[SeverityLabel]
94+
_component: Optional[Component]
95+
_sub_components: Optional[List[str]]
96+
97+
def __init__(self, component, sub_components=None):
98+
self._component = component
99+
self._sub_components = sub_components
100+
self.events = []
101+
self.recoverable = True
102+
self.highest_severity_event: Optional[Event] = None # This only holds the first event that has the highest severity
103+
self.highest_severity: Optional[SeverityLabel] = None
104+
105+
def _add(self, event: Event):
106+
if not event.recoverable:
107+
self.recoverable = False
108+
if event.severity_label != MAX_SEVERITY_LABEL:
109+
logger.warning(
110+
f"Irrecoverable Event with id: {event.event_id} has not the highest severity level.\n "
111+
f"Setting it the the highest severity level.")
112+
event.severity_label = MAX_SEVERITY_LABEL
113+
114+
if self.highest_severity is None or event.severity_label > self.highest_severity:
115+
self.highest_severity = event.severity_label
116+
self.highest_severity_event = event
117+
118+
self.events.append(event)
119+
120+
def add_event(self, event_id: str, context: Union[str, dict], recoverable: bool, sub_components=None):
121+
"""
122+
Add event to Failure object. Uses the component and subcomponents specified in the Failure object.
123+
124+
As context specify either a string that contains a message or a dict that contains useful information about that
125+
event.
126+
Set recoverable to False if the code skips other not directly related checks. Those events should always have
127+
the highest severity label assigned and if not we manually do that.
128+
129+
Example usage:
130+
failure.add_event("ima_hash",
131+
{"message": "IMA hash does not match the calculated hash.",
132+
"expected": self.template_hash, "got": self.mode.hash()}, True)
133+
"""
134+
if sub_components is not None and self._sub_components is not None:
135+
sub_components = self._sub_components + sub_components
136+
elif self._sub_components is not None:
137+
sub_components = self._sub_components
138+
event = Event(self._component, sub_components, event_id, context, recoverable)
139+
self._add(event)
140+
141+
def merge(self, other):
142+
if self.recoverable:
143+
self.recoverable = other.recoverable
144+
if self.highest_severity is None:
145+
self.highest_severity = other.highest_severity
146+
self.highest_severity_event = other.highest_severity_event
147+
elif other.highest_severity is not None and self.highest_severity < other.highest_severity:
148+
self.highest_severity = other.highest_severity
149+
self.highest_severity_event = other.highest_severity_event
150+
151+
self.events.extend(other.events)
152+
153+
def __bool__(self):
154+
return not self.recoverable or len(self.events) > 0
155+
156+
157+
def _eval_severity_config() -> Tuple[List[Callable[[str], Optional[SeverityLabel]]], SeverityLabel]:
158+
"""
159+
Generates the list of rules to match a event_id against.
160+
"""
161+
162+
labels_list = ast.literal_eval(config.get("cloud_verifier", "severity_labels"))
163+
labels = {}
164+
for label, level in zip(labels_list, range(0, len(labels_list))):
165+
labels[label] = SeverityLabel(label, level)
166+
167+
label_max = labels[labels_list[-1]]
168+
169+
policies = ast.literal_eval(config.get("cloud_verifier", "severity_policy"))
170+
rules = []
171+
for policy in policies:
172+
# TODO validate regex
173+
regex = re.compile(policy["event_id"])
174+
175+
def rule(policy_regex, label_str: str, event_id: str) -> Optional[SeverityLabel]:
176+
if policy_regex.fullmatch(event_id):
177+
policy_label = labels.get(label_str)
178+
if policy_label is None:
179+
logger.error(f"Label {label_str} is not a valid label. Defaulting to maximal severity label!")
180+
return label_max
181+
return policy_label
182+
return None
183+
184+
rules.append(functools.partial(rule, regex, policy["severity_label"]))
185+
return rules, label_max
186+
187+
188+
# Only evaluate the policy once on module load
189+
SEVERITY_RULES, MAX_SEVERITY_LABEL = _eval_severity_config()
190+
191+
192+
def _severity_match(event_id: str) -> SeverityLabel:
193+
"""
194+
Match the event_id to a severity label.
195+
"""
196+
for rule in SEVERITY_RULES:
197+
match = rule(event_id)
198+
if match is not None:
199+
return match
200+
logger.warning(f"No rule matched for event_id: {event_id}. Defaulting to max severity label")
201+
return MAX_SEVERITY_LABEL

0 commit comments

Comments
 (0)