|
| 1 | +#!/usr/bin/env python |
| 2 | +""" |
| 3 | +Test Procedure Specification (TPS) Report Manifest Shim |
| 4 | +======================================================= |
| 5 | +
|
| 6 | +Validate and parse a Test Procedure Specification (TPS) Report manifest. Execute |
| 7 | +something for the next stage of parsing. |
| 8 | +
|
| 9 | +This file is used as a shim to bridge the gap between the parsing for the |
| 10 | +TPS manifest format and the next action to be taken after parsing. This file allows |
| 11 | +for registration of next phase parsers via environment variables. |
| 12 | +
|
| 13 | +The purpose of this script is to perform the initial validation and parsing of |
| 14 | +the TPS manifest. Its responsibility is to then call the appropriate next phase |
| 15 | +manifest parser. It will pass the manifest's data in a format the next phase |
| 16 | +understands, and execute the next phase using capabilities defined within this |
| 17 | +file. |
| 18 | +
|
| 19 | +Updates |
| 20 | +------- |
| 21 | +
|
| 22 | +This file has been vendored into multiple locations. Please be sure to track |
| 23 | +progress as the format evolves upstream. Upstream URL: |
| 24 | +https://github.com/intel/dffml/blob/manifest/dffml/util/testing/manifest/shim.py |
| 25 | +
|
| 26 | +Pull Request for discussion, questions, comments, concerns, review: |
| 27 | +https://github.com/intel/dffml/pull/1273/files |
| 28 | +
|
| 29 | +Contributing |
| 30 | +------------ |
| 31 | +
|
| 32 | +This section is documentation for contributing to the TPS Report (manifest) |
| 33 | +shim. |
| 34 | +
|
| 35 | +We want this shim to be usable on a default format which we'll work to define as |
| 36 | +a community upstream. |
| 37 | +
|
| 38 | +Design Goals |
| 39 | +```````````` |
| 40 | +
|
| 41 | +This shim MUST |
| 42 | +
|
| 43 | +- Work with arbitrary manifest formats |
| 44 | +
|
| 45 | +- Discover verification mechanisms |
| 46 | +
|
| 47 | +- Verify the manifest (think secure boot) |
| 48 | +
|
| 49 | +- Parse the manifest |
| 50 | +
|
| 51 | +- Discover next phase parsers |
| 52 | +
|
| 53 | +- Output the manifest in a format the next phase parser can understand |
| 54 | +
|
| 55 | +- Execute the next phase parser |
| 56 | +
|
| 57 | +Format |
| 58 | +`````` |
| 59 | +
|
| 60 | +We need to come up with a format that allows us to evolve it as we move |
| 61 | +forward. |
| 62 | +
|
| 63 | +To make sure we have forwards / backwards compatibility we should |
| 64 | +include information which allows us to identify what format the document |
| 65 | +is in, and what version of that format it is. This will likely also feed |
| 66 | +into our input dataflow requirements as we'll need to have the ability |
| 67 | +to check an arbitrary input to see if we might have an applicable |
| 68 | +converter. |
| 69 | +
|
| 70 | +Let's learn from JSON Schema and include a URL where we might be able |
| 71 | +to find the schema for the document. We can double up on our previous |
| 72 | +needs by asking that the filename of the URL can help us identify our |
| 73 | +document format (we'll provide fallback for if we don't have control |
| 74 | +over the filename via the ``$document_format`` and ``$document_version`` |
| 75 | +keys). We'll parse the URL for the filename component. When we parse it |
| 76 | +we'll split on ``.``. If the first part is eff (Extensible Format |
| 77 | +Format) we'll treat the rest up until the semantic version as the format |
| 78 | +name. Then the semantic version is the version of the format. Then the |
| 79 | +rest should be the extension which is associated with the format which |
| 80 | +we can use to validate the contents of the document, such as JSON |
| 81 | +schema. |
| 82 | +
|
| 83 | +``$schema: "https://example.com/eff.my.document.format.0.0.0.schema.json"`` |
| 84 | +
|
| 85 | +TODO |
| 86 | +---- |
| 87 | +
|
| 88 | +- Verification of the manifest. Idea: Developer generates manifest. |
| 89 | + Signs manifest with private asymmetric key. Prepends base64 encoded |
| 90 | + signature as a valid key, ``$signature``. This means you have to |
| 91 | + parse the YAML before you have verified the signature, which is not |
| 92 | + ideal. However, it's one method available to us and a simple parse |
| 93 | + without the use of a full YAML parser could be done. Or we could |
| 94 | + distribute out of band and verify the document before the conversion |
| 95 | + stage, in the loading stage. |
| 96 | +
|
| 97 | +- Verification of references within manifest. Do we support public |
| 98 | + portion of CA key embedded in the document various places? We |
| 99 | + could then use it for things like verification of git repos where |
| 100 | + the CA must sign all developer keys which are in the repo history. |
| 101 | + This will apply to anything that is an external reference in the |
| 102 | + document. There should be a way for the document to include an HMAC or |
| 103 | + something like that or something more dynamic like a CA. |
| 104 | +
|
| 105 | +Notes |
| 106 | +----- |
| 107 | +
|
| 108 | +- https://github.com/mjg59/ssh_pki |
| 109 | +
|
| 110 | + - Should we use this? No. Are we going to? Yes. |
| 111 | +""" |
| 112 | +import os |
| 113 | +import sys |
| 114 | +import pathlib |
| 115 | +import importlib |
| 116 | +import contextlib |
| 117 | +import dataclasses |
| 118 | +from typing import Dict |
| 119 | + |
| 120 | +with contextlib.suppress((ImportError, ModuleNotFoundError)): |
| 121 | + import yaml |
| 122 | + |
| 123 | + |
def parse(contents: str):
    r'''
    Given the contents of the manifest file as a string, parse the contents into
    a dictionary object.

    JSON is attempted first. If that fails and the ``yaml`` module was
    successfully imported by this file, the contents are parsed as YAML.

    :param str contents: string containing the manifest file's contents
    :return: a dictionary representing the manifest
    :rtype: dict
    :raises Exception: the JSON parse error when ``yaml`` is unavailable,
        otherwise the YAML parse error chained from the JSON parse error

    >>> import textwrap
    >>> from dffml.util.testing.manifest.shim import parse
    >>>
    >>> parse(
    ...     textwrap.dedent(
    ...         """\
    ...         $document_format: tps.manifest
    ...         $document_version: 0.0.1
    ...         testplan:
    ...         - git:
    ...             repo: https://example.com/my-repo.git
    ...             branch: main
    ...             file: my_test.py
    ...         """
    ...     )
    ... )
    {'$document_format': 'tps.manifest', '$document_version': '0.0.1', 'testplan': [{'git': {'repo': 'https://example.com/my-repo.git', 'branch': 'main', 'file': 'my_test.py'}}]}
    '''
    # Imported here because the file's top-level imports do not include json;
    # without it the JSON attempt always failed with NameError.
    import json

    try:
        return json.loads(contents)
    except Exception as json_parse_error:
        # yaml is an optional import at the top of the file. When it is not
        # available there is no fallback, so surface the JSON error as is.
        if "yaml" not in globals():
            raise
        try:
            return yaml.safe_load(contents)
        except Exception as yaml_parse_error:
            # Keep the JSON failure attached so both attempts are visible
            raise yaml_parse_error from json_parse_error
    # TODO: dispatch the parsed document to a next phase parser selected by
    # its ($document_format, $document_version, output mode) triple. The
    # earlier sketch of that dispatch lived after this point, was unreachable,
    # and referenced undefined names, so it has been dropped.
| 191 | + |
| 192 | + |
@dataclasses.dataclass
class ManifestFormatParser:
    """
    Configuration record used to determine what the next phase of parsing is.

    All fields are strings. ``args`` holds the arguments passed to ``target``
    and is the only field with a default (the empty string).
    """

    # Required fields, in positional order
    format_name: str
    version: str
    output: str
    action: str
    target: str
    # Optional: arguments passed to target
    args: str = ""
| 207 | + |
| 208 | + |
ENV_PREFIX = "TPS_MANIFEST_"


def environ_discover_dataclass(
    dataclass,
    environ: Dict[str, str] = None,
    *,
    prefix: str = ENV_PREFIX,
    dataclass_key: str = None,
):
    r"""
    Build instances of ``dataclass`` from variables in ``environ``.

    Only keys starting with ``prefix`` are considered. The remainder of the
    key is lowercased and split at its first underscore into
    ``<field>_<name>``: ``<field>`` is the keyword argument name and
    ``<name>`` groups the values that belong to one instance. Keys which have
    no underscore after the prefix cannot be split this way and are ignored.

    :param dataclass: callable invoked with the collected keyword arguments
    :param environ: mapping to search, defaults to ``os.environ``
    :param str prefix: prefix identifying relevant keys
    :param str dataclass_key: when given, the keyword argument of this name is
        set to ``<name>`` for each instance, overriding any discovered value
    :return: dictionary mapping each ``<name>`` to its instance
    :rtype: dict

    >>> import dataclasses
    >>> from dffml.util.testing.manifest.shim import environ_discover_dataclass
    >>>
    >>> @dataclasses.dataclass
    ... class MyDataclass:
    ...     name: str
    ...     version: str
    >>>
    >>> environ_discover_dataclass(
    ...     MyDataclass,
    ...     {
    ...         "MYPREFIX_NAME_EXAMPLE_FORMAT": "Example Format",
    ...         "MYPREFIX_VERSION_EXAMPLE_FORMAT": "0.0.1",
    ...     },
    ...     prefix="MYPREFIX_",
    ... )
    {'example_format': MyDataclass(name='Example Format', version='0.0.1')}
    >>>
    >>> environ_discover_dataclass(
    ...     MyDataclass,
    ...     {
    ...         "MYPREFIX_VERSION_EXAMPLE_FORMAT": "0.0.1",
    ...     },
    ...     prefix="MYPREFIX_",
    ...     dataclass_key="name",
    ... )
    {'example_format': MyDataclass(name='example_format', version='0.0.1')}
    """
    if environ is None:
        environ = os.environ
    # name -> {field: value}, collected before any instance is constructed
    discovered_metadata = {}
    for key, value in environ.items():
        if not key.startswith(prefix):
            continue
        metadata_key, separator, parser_name = (
            key[len(prefix) :].lower().partition("_")
        )
        if not separator:
            # No "<field>_<name>" structure. Previously this raised an opaque
            # unpacking ValueError and aborted discovery entirely.
            continue
        discovered_metadata.setdefault(parser_name, {})[metadata_key] = value
    # Ensure they are loaded into the correct class. A separate result dict
    # avoids rewriting the dict that is being iterated.
    discovered_parsers = {}
    for parser_name, metadata in discovered_metadata.items():
        if dataclass_key is not None:
            metadata[dataclass_key] = parser_name
        discovered_parsers[parser_name] = dataclass(**metadata)
    return discovered_parsers
| 265 | + |
| 266 | + |
def shim(manifest: str, lockdown: bool, strict: bool):
    """
    Discover the next phase parsers configured via the process environment and
    print them.

    ``manifest``, ``lockdown`` and ``strict`` are accepted but not used yet.
    """
    discovered = environ_discover_dataclass(
        ManifestFormatParser, environ=os.environ, dataclass_key="format_name"
    )
    print(discovered)
| 272 | + |
| 273 | + |
def make_parser():
    """
    Build the command line argument parser for the shim.

    :return: parser defining ``--lockdown``, ``--strict``, ``--input``,
        ``--output`` and ``--name``
    :rtype: argparse.ArgumentParser
    """
    # Imported here because the file's top-level imports do not include
    # argparse.
    import argparse

    parser = argparse.ArgumentParser(
        prog="shim.py",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )

    # store_true takes no type= argument; passing one made add_argument()
    # raise TypeError before any parsing could happen.
    parser.add_argument(
        "-l", "--lockdown", action="store_true", default=False,
    )
    # shim() takes strict as a bool, so this is a flag like --lockdown. It
    # was previously declared as a readable file defaulting to stdin.
    parser.add_argument(
        "-s", "--strict", action="store_true", default=False,
    )
    parser.add_argument(
        "-i", "--input", type=argparse.FileType("r"), default=sys.stdin
    )
    parser.add_argument(
        "-o", "--output", type=argparse.FileType("w"), default=sys.stdout
    )
    # NOTE(review): this help text reads like a leftover from another tool;
    # confirm whether --name is still needed.
    parser.add_argument("-n", "--name", help="Name of function to replace")
    return parser
| 295 | + |
| 296 | + |
def main():
    """
    Command line entry point: parse the arguments, read the manifest from
    ``--input`` and pass it to :func:`shim` with the ``--lockdown`` and
    ``--strict`` flags.
    """
    parser = make_parser()
    args = parser.parse_args()
    # NOTE(review): the previous body called replace_function() and read
    # args.func, neither of which is defined in the visible part of this
    # file, so it could never run. Wiring the parsed options into shim() is
    # the presumed intent -- confirm. shim() currently prints its result, so
    # args.output is not written to here.
    shim(args.input.read(), args.lockdown, args.strict)
0 commit comments