From e2b9d89481667a0c9be679a26e5c8c9a35f48bdb Mon Sep 17 00:00:00 2001 From: Thore Sommer Date: Mon, 30 Aug 2021 12:03:16 +0200 Subject: [PATCH] ima: move to new failure architecture Part of enhancement proposal https://github.com/keylime/enhancements/issues/48 Signed-off-by: Thore Sommer --- keylime/ima.py | 62 ++++++++++++++++++++++------------- keylime/ima_ast.py | 23 +++++++++---- test/test_ima_ast.py | 5 +-- test/test_ima_verification.py | 60 ++++++++++++++++++++++----------- 4 files changed, 100 insertions(+), 50 deletions(-) diff --git a/keylime/ima.py b/keylime/ima.py index ad90ae7f6..409eb8d2b 100644 --- a/keylime/ima.py +++ b/keylime/ima.py @@ -20,6 +20,7 @@ from keylime import ima_ast from keylime.agentstates import AgentAttestState from keylime import ima_file_signatures +from keylime.failure import Failure, Component logger = keylime_logging.init_logging('ima') @@ -77,39 +78,49 @@ def read_unpack(fd, fmt): return struct.unpack(fmt, fd.read(struct.calcsize(fmt))) -def _validate_ima_ng(exclude_regex, allowlist, digest: ima_ast.Digest, path: ima_ast.Name, hash_types='hashes'): - if allowlist is not None and allowlist.get(hash_types) is not None: + +def _validate_ima_ng(exclude_regex, allowlist, digest: ima_ast.Digest, path: ima_ast.Name, hash_types='hashes') -> Failure: + failure = Failure(Component.IMA, ["validation", "ima-ng"]) + if allowlist is not None: if exclude_regex is not None and exclude_regex.match(path.name): logger.debug("IMA: ignoring excluded path %s" % path) - return True + return failure accept_list = allowlist[hash_types].get(path.name, None) if accept_list is None: - logger.warning("Entry not found in allowlist: %s" % (path.name)) - return False + logger.warning(f"File not found in allowlist: {path.name}") + failure.add_event("not_in_allowlist", f"File not found in allowlist: {path.name}", True) + return failure if codecs.encode(digest.hash, 'hex').decode('utf-8') not in accept_list: logger.warning("Hashes for file %s don't match %s not in %s" % (path.name, codecs.encode(digest.hash, 'hex').decode('utf-8'), accept_list)) - return False + failure.add_event( + "allowlist_hash", + {"message": "Hash not in allowlist found", + "got": codecs.encode(digest.hash, 'hex').decode('utf-8'), + "expected": accept_list}, True) + return failure - return True + return failure def _validate_ima_sig(exclude_regex, ima_keyring, allowlist, digest: ima_ast.Digest, path: ima_ast.Name, - signature: ima_ast.Signature): + signature: ima_ast.Signature) -> Failure: + failure = Failure(Component.IMA, ["validator", "ima-sig"]) valid_signature = False if ima_keyring and signature: if exclude_regex is not None and exclude_regex.match(path.name): logger.debug(f"IMA: ignoring excluded path {path.name}") - return True + return failure if not ima_keyring.integrity_digsig_verify(signature.data, digest.hash, digest.algorithm): logger.warning(f"signature for file {path.name} is not valid") - return False + failure.add_event("invalid_signature", f"signature for file {path.name} is not valid", True) + return failure valid_signature = True logger.debug("signature for file %s is good" % path) @@ -124,9 +135,11 @@ def _validate_ima_sig(exclude_regex, ima_keyring, allowlist, digest: ima_ast.Dig # If we don't have a allowlist and don't have a keyring we just ignore the validation. if ima_keyring is None: - return True + return failure - return valid_signature + if not valid_signature: + failure.add_event("invalid_signature", f"signature for file {path.name} could not be validated", True) + return failure def _validate_ima_buf(exclude_regex, allowlist, digest: ima_ast.Digest, path: ima_ast.Name, data: ima_ast.Buffer): @@ -136,10 +149,11 @@ def _validate_ima_buf(exclude_regex, allowlist, digest: ima_ast.Digest, path: im return _validate_ima_ng(exclude_regex, allowlist, digest, path, hash_types='keyrings') # Anything else evaluates to true for now - return True + return Failure(Component.IMA) def _process_measurement_list(agentAttestState, lines, lists=None, m2w=None, pcrval=None, ima_keyring=None, boot_aggregates=None): + failure = Failure(Component.IMA) running_hash = agentAttestState.get_pcr_state(config.IMA_PCR) found_pcr = (pcrval is None) errors = {} @@ -190,7 +204,10 @@ def _process_measurement_list(agentAttestState, lines, lists=None, m2w=None, pcr # update hash running_hash = hashlib.sha1(running_hash + entry.template_hash).digest() - if not entry.valid(): + validation_failure = entry.invalid() + + if validation_failure: + failure.merge(validation_failure) errors[type(entry.mode)] = errors.get(type(entry.mode), 0) + 1 if not found_pcr: @@ -208,6 +225,7 @@ def _process_measurement_list(agentAttestState, lines, lists=None, m2w=None, pcr path = entry.mode.path.name m2w.write(f"{hash_value} {path}\n") except ima_ast.ParserError: + failure.add_event("entry", f"Line was not parsable into a valid IMA entry: {line}", True, ["parser"]) logger.error(f"Line was not parsable into a valid IMA entry: {line}") # iterative attestation may send us no log; compare last know PCR 10 state @@ -217,30 +235,30 @@ def _process_measurement_list(agentAttestState, lines, lists=None, m2w=None, pcr # check PCR value has been found if not found_pcr: - logger.error("IMA measurement list does not match TPM PCR %s" % pcrval) - return None + logger.error(f"IMA measurement list does not match TPM PCR {pcrval}") + failure.add_event("pcr_mismatch", f"IMA measurement list does not match TPM PCR {pcrval}", True) # Check if any validators failed if sum(errors.values()) > 0: error_msg = "IMA ERRORS: Some entries couldn't be validated. Number of failures in modes: " error_msg += ", ".join([f'{k.__name__ } {v}' for k, v in errors.items()]) logger.error(error_msg + ".") - return None - return codecs.encode(running_hash, 'hex').decode('utf-8') + return codecs.encode(running_hash, 'hex').decode('utf-8'), failure def process_measurement_list(agentAttestState, lines, lists=None, m2w=None, pcrval=None, ima_keyring=None, boot_aggregates=None): - result = None + failure = Failure(Component.IMA) try: - result = _process_measurement_list(agentAttestState, lines, lists=lists, m2w=m2w, pcrval=pcrval, ima_keyring=ima_keyring, boot_aggregates=boot_aggregates) + running_hash, failure = _process_measurement_list(agentAttestState, lines, lists=lists, m2w=m2w, pcrval=pcrval, ima_keyring=ima_keyring, boot_aggregates=boot_aggregates) except: # pylint: disable=try-except-raise raise finally: - if not result: + if failure: + # TODO currently reset on any failure which might be an issue agentAttestState.reset_ima_attestation() - return result + return running_hash, failure def process_allowlists(allowlist, exclude): diff --git a/keylime/ima_ast.py b/keylime/ima_ast.py index 760fa12be..fcea4bda4 100644 --- a/keylime/ima_ast.py +++ b/keylime/ima_ast.py @@ -17,6 +17,7 @@ from typing import Dict, Callable, Any, Optional from keylime import keylime_logging +from keylime.failure import Failure, Component logger = keylime_logging.init_logging("ima") @@ -39,7 +40,9 @@ def get_validator(self, class_type) -> Callable: validator = self.functions.get(class_type, None) if validator is None: logger.warning(f"No validator was implemented for: {class_type} . Using always false validator!") - return lambda *_: False + failure = Failure(Component.IMA, ["validation"]) + failure.add_event("no_validator", f"No validator was implemented for: {class_type} . Using always false validator!", True) + return lambda *_: failure return validator @@ -305,14 +308,22 @@ def __init__(self, data: str, validator=None): if self.template_hash == START_HASH: self.template_hash = FF_HASH - def valid(self): + def invalid(self): + failure = Failure(Component.IMA, ["validation"]) # Ignore template hash for ToMToU errors if self.template_hash == FF_HASH: logger.warning("Skipped template_hash validation entry with FF_HASH") - return self.mode.is_data_valid(self.validator) + failure.add_event("tomtou", "hash validation was skipped", True) + failure.merge(self.mode.is_data_valid(self.validator)) + return failure if self.template_hash != self.mode.hash(): - return False + failure.add_event("ima_hash", + {"message": "IMA hash does not match the calculated hash.", + "expected": self.template_hash, "got": self.mode.hash()}, True) + return failure if self.validator is None: - return False + failure.add_event("no_validator", "No validator specified", True) + return failure - return self.mode.is_data_valid(self.validator) + failure.merge(self.mode.is_data_valid(self.validator)) + return failure diff --git a/test/test_ima_ast.py b/test/test_ima_ast.py index 445c6dada..0741fbfd5 100644 --- a/test/test_ima_ast.py +++ b/test/test_ima_ast.py @@ -6,6 +6,7 @@ import unittest from keylime import ima_ast +from keylime.failure import Failure, Component # BEGIN TEST DATA @@ -32,7 +33,7 @@ def _true(*_): - return True + return Failure(Component.DEFAULT) AlwaysTrueValidator = ima_ast.Validator({ @@ -54,7 +55,7 @@ def test_valid_entry_construction(self): self.assertTrue(isinstance(entry.mode, expected_mode)) # pylint: disable=isinstance-second-argument-not-valid-type self.assertTrue(entry.template_hash == entry.mode.hash(), f"Constructed hash of {name} does not match template hash.\n Expected: {entry.template_hash}.\n Got: {entry.mode.hash()}") - self.assertTrue(entry.valid(), f"Entry of {name} couldn't be validated.") + self.assertTrue(not entry.invalid(), f"Entry of {name} couldn't be validated.") except ima_ast.ParserError as e: err = e if err: diff --git a/test/test_ima_verification.py b/test/test_ima_verification.py index bcd2c4c38..88545e048 100644 --- a/test/test_ima_verification.py +++ b/test/test_ima_verification.py @@ -86,15 +86,19 @@ def test_measurment_verification(self): lists_map = ima.process_allowlists(ALLOWLIST, '') lists_map_empty = ima.process_allowlists(ALLOWLIST_EMPTY, '') - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), lines) is not None, + _, failure = ima.process_measurement_list(AgentAttestState('1'), lines) + self.assertTrue(not failure, "Validation should always work when no allowlist and no keyring is specified") - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), lines, lists_map) is not None) + _, failure = ima.process_measurement_list(AgentAttestState('1'), lines, lists_map) + self.assertTrue(not failure) # test with list as a string - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), lines, str(lists_map)) is not None) + _, failure = ima.process_measurement_list(AgentAttestState('1'), lines, str(lists_map)) + self.assertTrue(not failure) # No files are in the allowlist -> this should fail - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), lines, lists_map_empty) is None) + _, failure = ima.process_measurement_list(AgentAttestState('1'), lines, lists_map_empty) + self.assertTrue(failure) def test_signature_verification(self): """ Test the signature verification """ @@ -105,20 +109,24 @@ def test_signature_verification(self): # empty keyring keyring = ima_file_signatures.ImaKeyring() - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), lines, ima_keyring=keyring) is None) + _, failure = ima.process_measurement_list(AgentAttestState('1'), lines, ima_keyring=keyring) + self.assertTrue(failure) # add key for 1st entry; 1st entry must be verifiable rsakeyfile = os.path.join(keydir, "rsa2048pub.pem") pubkey, keyidv2 = ima_file_signatures.get_pubkey_from_file(rsakeyfile) keyring.add_pubkey(pubkey, keyidv2) - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), lines[0:1], ima_keyring=keyring) is not None) - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), lines[1:2], ima_keyring=keyring) is None) + _, failure = ima.process_measurement_list(AgentAttestState('1'), lines[0:1], ima_keyring=keyring) + self.assertTrue(not failure) + _, failure = ima.process_measurement_list(AgentAttestState('1'), lines[1:2], ima_keyring=keyring) + self.assertTrue(failure) # add key for 2nd entry; 1st & 2nd entries must be verifiable eckeyfile = os.path.join(keydir, "secp256k1.pem") pubkey, keyidv2 = ima_file_signatures.get_pubkey_from_file(eckeyfile) keyring.add_pubkey(pubkey, keyidv2) - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), lines[0:2], ima_keyring=keyring) is not None) + _, failure = ima.process_measurement_list(AgentAttestState('1'), lines[0:2], ima_keyring=keyring) + self.assertTrue(not failure) def test_ima_buf_verification(self): """ The verification of ima-buf entries supporting keys loaded onto keyrings """ @@ -139,10 +147,12 @@ def test_iterative_attestation(self): template_hash = codecs.decode(parts[1].encode("utf-8"), "hex") running_hash = hashlib.sha1(running_hash + template_hash).digest() pcrval = codecs.encode(running_hash, "hex").decode("utf-8") - self.assertTrue(ima.process_measurement_list(agentAttestState, [line], pcrval=pcrval) == pcrval) + ima_hash, _ = ima.process_measurement_list(agentAttestState, [line], pcrval=pcrval) + self.assertTrue(ima_hash == pcrval) # Feed empty iterative measurement list simulating 'no new measurement list entries' on attested system - self.assertTrue(ima.process_measurement_list(agentAttestState, [''], pcrval=pcrval) == pcrval) + ima_hash, _ = ima.process_measurement_list(agentAttestState, [''], pcrval=pcrval) + self.assertTrue(ima_hash == pcrval) def test_mixed_verfication(self): @@ -156,7 +166,8 @@ def test_mixed_verfication(self): empty_keyring = ima_file_signatures.ImaKeyring() # every entry is covered by the allowlist and there's no keyring -> this should pass - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), COMBINED.splitlines(), str(lists_map)) is not None) + _, failure = ima.process_measurement_list(AgentAttestState('1'), COMBINED.splitlines(), str(lists_map)) + self.assertTrue(not failure) curdir = os.path.dirname(os.path.abspath(__file__)) keydir = os.path.join(curdir, "data", "ima_keys") @@ -171,31 +182,40 @@ def test_mixed_verfication(self): keyring.add_pubkey(pubkey, keyidv2) # entries are not covered by a exclude list -> this should fail - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), COMBINED.splitlines(), ima_keyring=keyring) is None) + _, failure = ima.process_measurement_list(AgentAttestState('1'), COMBINED.splitlines(), ima_keyring=keyring) + self.assertTrue(failure) # all entries are either covered by allow list or by signature verification -> this should pass - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), COMBINED.splitlines(), str(lists_map), ima_keyring=keyring) is not None) + _, failure = ima.process_measurement_list(AgentAttestState('1'), COMBINED.splitlines(), str(lists_map), ima_keyring=keyring) + self.assertTrue(not failure) # the signature is valid but the hash in the allowlist is wrong -> this should fail - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), SIGNATURES.splitlines(), str(lists_map_wrong), ima_keyring=keyring) is None) + _, failure = ima.process_measurement_list(AgentAttestState('1'), SIGNATURES.splitlines(), str(lists_map_wrong), ima_keyring=keyring) + self.assertTrue(failure) # the signature is valid and the file is not in the allowlist -> this should pass - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), SIGNATURES.splitlines(), str(lists_map_empty), ima_keyring=keyring) is not None) + _, failure = ima.process_measurement_list(AgentAttestState('1'), SIGNATURES.splitlines(), str(lists_map_empty), ima_keyring=keyring) + self.assertTrue(not failure) # the signature is invalid but the correct hash is in the allowlist -> this should fail - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), SIGNATURES.splitlines(), str(lists_map), ima_keyring=empty_keyring) is None) + _, failure = ima.process_measurement_list(AgentAttestState('1'), SIGNATURES.splitlines(), str(lists_map), ima_keyring=empty_keyring) + self.assertTrue(failure) # the file has no signature but the hash is correct -> this should pass - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), MEASUREMENTS.splitlines(), str(lists_map))) + _, failure = ima.process_measurement_list(AgentAttestState('1'), MEASUREMENTS.splitlines(), str(lists_map)) + self.assertTrue(not failure) # All files are in the exclude list but hashes are invalid -> this should pass - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), MEASUREMENTS.splitlines(), str(lists_map_exclude_wrong)) is not None) + _, failure = ima.process_measurement_list(AgentAttestState('1'), MEASUREMENTS.splitlines(), str(lists_map_exclude_wrong)) + self.assertTrue(not failure) # All files are in the exclude list and their signatures are invalid -> this should pass - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), SIGNATURES.splitlines(), str(lists_map_exclude), ima_keyring=empty_keyring) is not None) + _, failure = ima.process_measurement_list(AgentAttestState('1'), SIGNATURES.splitlines(), str(lists_map_exclude), ima_keyring=empty_keyring) + self.assertTrue(not failure) # All files are in the exclude list but hashes or signatures are invalid -> this should pass - self.assertTrue(ima.process_measurement_list(AgentAttestState('1'), MEASUREMENTS.splitlines(), str(lists_map_exclude_wrong), ima_keyring=empty_keyring) is not None) + _, failure = ima.process_measurement_list(AgentAttestState('1'), MEASUREMENTS.splitlines(), str(lists_map_exclude_wrong), ima_keyring=empty_keyring) + self.assertTrue(not failure) def test_read_allowlist(self): """ Test reading and processing of the IMA allow-list """