Skip to content

Commit

Permalink
ima: move to new failure architecture
Browse files Browse the repository at this point in the history
Part of enhancement proposal keylime/enhancements#48

Signed-off-by: Thore Sommer <[email protected]>
  • Loading branch information
THS-on committed Sep 13, 2021
1 parent 4430b76 commit e2b9d89
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 50 deletions.
62 changes: 40 additions & 22 deletions keylime/ima.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from keylime import ima_ast
from keylime.agentstates import AgentAttestState
from keylime import ima_file_signatures
from keylime.failure import Failure, Component


logger = keylime_logging.init_logging('ima')
Expand Down Expand Up @@ -77,39 +78,49 @@ def read_unpack(fd, fmt):
return struct.unpack(fmt, fd.read(struct.calcsize(fmt)))


def _validate_ima_ng(exclude_regex, allowlist, digest: ima_ast.Digest, path: ima_ast.Name, hash_types='hashes'):
if allowlist is not None and allowlist.get(hash_types) is not None:

def _validate_ima_ng(exclude_regex, allowlist, digest: ima_ast.Digest, path: ima_ast.Name, hash_types='hashes') -> Failure:
failure = Failure(Component.IMA, ["validation", "ima-ng"])
if allowlist is not None:
if exclude_regex is not None and exclude_regex.match(path.name):
logger.debug("IMA: ignoring excluded path %s" % path)
return True
return failure

accept_list = allowlist[hash_types].get(path.name, None)
if accept_list is None:
logger.warning("Entry not found in allowlist: %s" % (path.name))
return False
logger.warning(f"File not found in allowlist: {path.name}")
failure.add_event("not_in_allowlist", f"File not found in allowlist: {path.name}", True)
return failure

if codecs.encode(digest.hash, 'hex').decode('utf-8') not in accept_list:
logger.warning("Hashes for file %s don't match %s not in %s" %
(path.name,
codecs.encode(digest.hash, 'hex').decode('utf-8'),
accept_list))
return False
failure.add_event(
"allowlist_hash",
{"message": "Hash not in allowlist found",
"got": codecs.encode(digest.hash, 'hex').decode('utf-8'),
"expected": accept_list}, True)
return failure

return True
return failure


def _validate_ima_sig(exclude_regex, ima_keyring, allowlist, digest: ima_ast.Digest, path: ima_ast.Name,
signature: ima_ast.Signature):
signature: ima_ast.Signature) -> Failure:
failure = Failure(Component.IMA, ["validator", "ima-sig"])
valid_signature = False
if ima_keyring and signature:

if exclude_regex is not None and exclude_regex.match(path.name):
logger.debug(f"IMA: ignoring excluded path {path.name}")
return True
return failure

if not ima_keyring.integrity_digsig_verify(signature.data, digest.hash, digest.algorithm):
logger.warning(f"signature for file {path.name} is not valid")
return False
failure.add_event("invalid_signature", f"signature for file {path.name} is not valid", True)
return failure

valid_signature = True
logger.debug("signature for file %s is good" % path)
Expand All @@ -124,9 +135,11 @@ def _validate_ima_sig(exclude_regex, ima_keyring, allowlist, digest: ima_ast.Dig

# If we don't have a allowlist and don't have a keyring we just ignore the validation.
if ima_keyring is None:
return True
return failure

return valid_signature
if not valid_signature:
failure.add_event("invalid_signature", f"signature for file {path.name} could not be validated", True)
return failure


def _validate_ima_buf(exclude_regex, allowlist, digest: ima_ast.Digest, path: ima_ast.Name, data: ima_ast.Buffer):
Expand All @@ -136,10 +149,11 @@ def _validate_ima_buf(exclude_regex, allowlist, digest: ima_ast.Digest, path: im
return _validate_ima_ng(exclude_regex, allowlist, digest, path, hash_types='keyrings')

# Anything else evaluates to true for now
return True
return Failure(Component.IMA)


def _process_measurement_list(agentAttestState, lines, lists=None, m2w=None, pcrval=None, ima_keyring=None, boot_aggregates=None):
failure = Failure(Component.IMA)
running_hash = agentAttestState.get_pcr_state(config.IMA_PCR)
found_pcr = (pcrval is None)
errors = {}
Expand Down Expand Up @@ -190,7 +204,10 @@ def _process_measurement_list(agentAttestState, lines, lists=None, m2w=None, pcr
# update hash
running_hash = hashlib.sha1(running_hash + entry.template_hash).digest()

if not entry.valid():
validation_failure = entry.invalid()

if validation_failure:
failure.merge(validation_failure)
errors[type(entry.mode)] = errors.get(type(entry.mode), 0) + 1

if not found_pcr:
Expand All @@ -208,6 +225,7 @@ def _process_measurement_list(agentAttestState, lines, lists=None, m2w=None, pcr
path = entry.mode.path.name
m2w.write(f"{hash_value} {path}\n")
except ima_ast.ParserError:
failure.add_event("entry", f"Line was not parsable into a valid IMA entry: {line}", True, ["parser"])
logger.error(f"Line was not parsable into a valid IMA entry: {line}")

# iterative attestation may send us no log; compare last know PCR 10 state
Expand All @@ -217,30 +235,30 @@ def _process_measurement_list(agentAttestState, lines, lists=None, m2w=None, pcr

# check PCR value has been found
if not found_pcr:
logger.error("IMA measurement list does not match TPM PCR %s" % pcrval)
return None
logger.error(f"IMA measurement list does not match TPM PCR {pcrval}")
failure.add_event("pcr_mismatch", f"IMA measurement list does not match TPM PCR {pcrval}", True)

# Check if any validators failed
if sum(errors.values()) > 0:
error_msg = "IMA ERRORS: Some entries couldn't be validated. Number of failures in modes: "
error_msg += ", ".join([f'{k.__name__ } {v}' for k, v in errors.items()])
logger.error(error_msg + ".")
return None

return codecs.encode(running_hash, 'hex').decode('utf-8')
return codecs.encode(running_hash, 'hex').decode('utf-8'), failure


def process_measurement_list(agentAttestState, lines, lists=None, m2w=None, pcrval=None, ima_keyring=None, boot_aggregates=None):
result = None
failure = Failure(Component.IMA)
try:
result = _process_measurement_list(agentAttestState, lines, lists=lists, m2w=m2w, pcrval=pcrval, ima_keyring=ima_keyring, boot_aggregates=boot_aggregates)
running_hash, failure = _process_measurement_list(agentAttestState, lines, lists=lists, m2w=m2w, pcrval=pcrval, ima_keyring=ima_keyring, boot_aggregates=boot_aggregates)
except: # pylint: disable=try-except-raise
raise
finally:
if not result:
if failure:
# TODO currently reset on any failure which might be an issue
agentAttestState.reset_ima_attestation()

return result
return running_hash, failure


def process_allowlists(allowlist, exclude):
Expand Down
23 changes: 17 additions & 6 deletions keylime/ima_ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from typing import Dict, Callable, Any, Optional
from keylime import keylime_logging
from keylime.failure import Failure, Component

logger = keylime_logging.init_logging("ima")

Expand All @@ -39,7 +40,9 @@ def get_validator(self, class_type) -> Callable:
validator = self.functions.get(class_type, None)
if validator is None:
logger.warning(f"No validator was implemented for: {class_type} . Using always false validator!")
return lambda *_: False
failure = Failure(Component.IMA, ["validation"])
failure.add_event("no_validator", f"No validator was implemented for: {class_type} . Using always false validator!", True)
return lambda *_: failure
return validator


Expand Down Expand Up @@ -305,14 +308,22 @@ def __init__(self, data: str, validator=None):
if self.template_hash == START_HASH:
self.template_hash = FF_HASH

def valid(self):
def invalid(self):
failure = Failure(Component.IMA, ["validation"])
# Ignore template hash for ToMToU errors
if self.template_hash == FF_HASH:
logger.warning("Skipped template_hash validation entry with FF_HASH")
return self.mode.is_data_valid(self.validator)
failure.add_event("tomtou", "hash validation was skipped", True)
failure.merge(self.mode.is_data_valid(self.validator))
return failure
if self.template_hash != self.mode.hash():
return False
failure.add_event("ima_hash",
{"message": "IMA hash does not match the calculated hash.",
"expected": self.template_hash, "got": self.mode.hash()}, True)
return failure
if self.validator is None:
return False
failure.add_event("no_validator", "No validator specified", True)
return failure

return self.mode.is_data_valid(self.validator)
failure.merge(self.mode.is_data_valid(self.validator))
return failure
5 changes: 3 additions & 2 deletions test/test_ima_ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import unittest

from keylime import ima_ast
from keylime.failure import Failure, Component

# BEGIN TEST DATA

Expand All @@ -32,7 +33,7 @@


def _true(*_):
return True
return Failure(Component.DEFAULT)


AlwaysTrueValidator = ima_ast.Validator({
Expand All @@ -54,7 +55,7 @@ def test_valid_entry_construction(self):
self.assertTrue(isinstance(entry.mode, expected_mode)) # pylint: disable=isinstance-second-argument-not-valid-type
self.assertTrue(entry.template_hash == entry.mode.hash(),
f"Constructed hash of {name} does not match template hash.\n Expected: {entry.template_hash}.\n Got: {entry.mode.hash()}")
self.assertTrue(entry.valid(), f"Entry of {name} couldn't be validated.")
self.assertTrue(not entry.invalid(), f"Entry of {name} couldn't be validated.")
except ima_ast.ParserError as e:
err = e
if err:
Expand Down
60 changes: 40 additions & 20 deletions test/test_ima_verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,19 @@ def test_measurment_verification(self):
lists_map = ima.process_allowlists(ALLOWLIST, '')
lists_map_empty = ima.process_allowlists(ALLOWLIST_EMPTY, '')

self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), lines) is not None,
_, failure = ima.process_measurement_list(AgentAttestState('1'), lines)
self.assertTrue(not failure,
"Validation should always work when no allowlist and no keyring is specified")

self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), lines, lists_map) is not None)
_, failure = ima.process_measurement_list(AgentAttestState('1'), lines, lists_map)
self.assertTrue(not failure)
# test with list as a string
self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), lines, str(lists_map)) is not None)
_, failure = ima.process_measurement_list(AgentAttestState('1'), lines, str(lists_map))
self.assertTrue(not failure)

# No files are in the allowlist -> this should fail
self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), lines, lists_map_empty) is None)
_, failure = ima.process_measurement_list(AgentAttestState('1'), lines, lists_map_empty)
self.assertTrue(failure)

def test_signature_verification(self):
""" Test the signature verification """
Expand All @@ -105,20 +109,24 @@ def test_signature_verification(self):

# empty keyring
keyring = ima_file_signatures.ImaKeyring()
self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), lines, ima_keyring=keyring) is None)
_, failure = ima.process_measurement_list(AgentAttestState('1'), lines, ima_keyring=keyring)
self.assertTrue(failure)

# add key for 1st entry; 1st entry must be verifiable
rsakeyfile = os.path.join(keydir, "rsa2048pub.pem")
pubkey, keyidv2 = ima_file_signatures.get_pubkey_from_file(rsakeyfile)
keyring.add_pubkey(pubkey, keyidv2)
self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), lines[0:1], ima_keyring=keyring) is not None)
self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), lines[1:2], ima_keyring=keyring) is None)
_, failure = ima.process_measurement_list(AgentAttestState('1'), lines[0:1], ima_keyring=keyring)
self.assertTrue(not failure)
_, failure = ima.process_measurement_list(AgentAttestState('1'), lines[1:2], ima_keyring=keyring)
self.assertTrue(failure)

# add key for 2nd entry; 1st & 2nd entries must be verifiable
eckeyfile = os.path.join(keydir, "secp256k1.pem")
pubkey, keyidv2 = ima_file_signatures.get_pubkey_from_file(eckeyfile)
keyring.add_pubkey(pubkey, keyidv2)
self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), lines[0:2], ima_keyring=keyring) is not None)
_, failure = ima.process_measurement_list(AgentAttestState('1'), lines[0:2], ima_keyring=keyring)
self.assertTrue(not failure)

def test_ima_buf_verification(self):
""" The verification of ima-buf entries supporting keys loaded onto keyrings """
Expand All @@ -139,10 +147,12 @@ def test_iterative_attestation(self):
template_hash = codecs.decode(parts[1].encode("utf-8"), "hex")
running_hash = hashlib.sha1(running_hash + template_hash).digest()
pcrval = codecs.encode(running_hash, "hex").decode("utf-8")
self.assertTrue(ima.process_measurement_list(agentAttestState, [line], pcrval=pcrval) == pcrval)
ima_hash, _ = ima.process_measurement_list(agentAttestState, [line], pcrval=pcrval)
self.assertTrue(ima_hash == pcrval)

# Feed empty iterative measurement list simulating 'no new measurement list entries' on attested system
self.assertTrue(ima.process_measurement_list(agentAttestState, [''], pcrval=pcrval) == pcrval)
ima_hash, _ = ima.process_measurement_list(agentAttestState, [''], pcrval=pcrval)
self.assertTrue(ima_hash == pcrval)


def test_mixed_verfication(self):
Expand All @@ -156,7 +166,8 @@ def test_mixed_verfication(self):
empty_keyring = ima_file_signatures.ImaKeyring()

# every entry is covered by the allowlist and there's no keyring -> this should pass
self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), COMBINED.splitlines(), str(lists_map)) is not None)
_, failure = ima.process_measurement_list(AgentAttestState('1'), COMBINED.splitlines(), str(lists_map))
self.assertTrue(not failure)

curdir = os.path.dirname(os.path.abspath(__file__))
keydir = os.path.join(curdir, "data", "ima_keys")
Expand All @@ -171,31 +182,40 @@ def test_mixed_verfication(self):
keyring.add_pubkey(pubkey, keyidv2)

# entries are not covered by a exclude list -> this should fail
self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), COMBINED.splitlines(), ima_keyring=keyring) is None)
_, failure = ima.process_measurement_list(AgentAttestState('1'), COMBINED.splitlines(), ima_keyring=keyring)
self.assertTrue(failure)

# all entries are either covered by allow list or by signature verification -> this should pass
self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), COMBINED.splitlines(), str(lists_map), ima_keyring=keyring) is not None)
_, failure = ima.process_measurement_list(AgentAttestState('1'), COMBINED.splitlines(), str(lists_map), ima_keyring=keyring)
self.assertTrue(not failure)

# the signature is valid but the hash in the allowlist is wrong -> this should fail
self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), SIGNATURES.splitlines(), str(lists_map_wrong), ima_keyring=keyring) is None)
_, failure = ima.process_measurement_list(AgentAttestState('1'), SIGNATURES.splitlines(), str(lists_map_wrong), ima_keyring=keyring)
self.assertTrue(failure)

# the signature is valid and the file is not in the allowlist -> this should pass
self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), SIGNATURES.splitlines(), str(lists_map_empty), ima_keyring=keyring) is not None)
_, failure = ima.process_measurement_list(AgentAttestState('1'), SIGNATURES.splitlines(), str(lists_map_empty), ima_keyring=keyring)
self.assertTrue(not failure)

# the signature is invalid but the correct hash is in the allowlist -> this should fail
self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), SIGNATURES.splitlines(), str(lists_map), ima_keyring=empty_keyring) is None)
_, failure = ima.process_measurement_list(AgentAttestState('1'), SIGNATURES.splitlines(), str(lists_map), ima_keyring=empty_keyring)
self.assertTrue(failure)

# the file has no signature but the hash is correct -> this should pass
self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), MEASUREMENTS.splitlines(), str(lists_map)))
_, failure = ima.process_measurement_list(AgentAttestState('1'), MEASUREMENTS.splitlines(), str(lists_map))
self.assertTrue(not failure)

# All files are in the exclude list but hashes are invalid -> this should pass
self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), MEASUREMENTS.splitlines(), str(lists_map_exclude_wrong)) is not None)
_, failure = ima.process_measurement_list(AgentAttestState('1'), MEASUREMENTS.splitlines(), str(lists_map_exclude_wrong))
self.assertTrue(not failure)

# All files are in the exclude list and their signatures are invalid -> this should pass
self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), SIGNATURES.splitlines(), str(lists_map_exclude), ima_keyring=empty_keyring) is not None)
_, failure = ima.process_measurement_list(AgentAttestState('1'), SIGNATURES.splitlines(), str(lists_map_exclude), ima_keyring=empty_keyring)
self.assertTrue(not failure)

# All files are in the exclude list but hashes or signatures are invalid -> this should pass
self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), MEASUREMENTS.splitlines(), str(lists_map_exclude_wrong), ima_keyring=empty_keyring) is not None)
_, failure = ima.process_measurement_list(AgentAttestState('1'), MEASUREMENTS.splitlines(), str(lists_map_exclude_wrong), ima_keyring=empty_keyring)
self.assertTrue(not failure)

def test_read_allowlist(self):
""" Test reading and processing of the IMA allow-list """
Expand Down

0 comments on commit e2b9d89

Please sign in to comment.