|
32 | 32 | from model_signing.hashing import hashing
|
33 | 33 | from model_signing.manifest import manifest as manifest_module
|
34 | 34 | from model_signing.serialization import serialize_by_file
|
| 35 | +from model_signing.serialization import serialize_by_file_shard |
35 | 36 | from model_signing.signing import in_toto
|
36 | 37 |
|
37 | 38 |
|
@@ -143,3 +144,67 @@ def test_only_runs_on_expected_manifest_types(self):
|
143 | 144 | match="Only FileLevelManifest is supported",
|
144 | 145 | ):
|
145 | 146 | in_toto.DigestOfDigestsIntotoPayload.from_manifest(manifest)
|
| 147 | + |
| 148 | + |
| 149 | +class TestDigestOfShardDigestsIntotoPayload: |
| 150 | + |
| 151 | + def _hasher_factory( |
| 152 | + self, path: pathlib.Path, start: int, end: int |
| 153 | + ) -> file.ShardedFileHasher: |
| 154 | + return file.ShardedFileHasher( |
| 155 | + path, memory.SHA256(), start=start, end=end |
| 156 | + ) |
| 157 | + |
| 158 | + @pytest.mark.parametrize("model_fixture_name", test_support.all_test_models) |
| 159 | + def test_known_models(self, request, model_fixture_name): |
| 160 | + # Set up variables (arrange) |
| 161 | + testdata_path = request.path.parent / "testdata" |
| 162 | + test_path = testdata_path / "in_toto" |
| 163 | + test_class_path = test_path / "TestDigestOfShardDigestsIntotoPayload" |
| 164 | + golden_path = test_class_path / model_fixture_name |
| 165 | + should_update = request.config.getoption("update_goldens") |
| 166 | + model = request.getfixturevalue(model_fixture_name) |
| 167 | + |
| 168 | + # Compute payload (act) |
| 169 | + serializer = serialize_by_file_shard.ManifestSerializer( |
| 170 | + self._hasher_factory, allow_symlinks=True |
| 171 | + ) |
| 172 | + manifest = serializer.serialize(model) |
| 173 | + payload = in_toto.DigestOfShardDigestsIntotoPayload.from_manifest( |
| 174 | + manifest |
| 175 | + ) |
| 176 | + |
| 177 | + # Compare with golden, or write to golden (approximately "assert") |
| 178 | + if should_update: |
| 179 | + with open(golden_path, "w", encoding="utf-8") as f: |
| 180 | + f.write(f"{json_format.MessageToJson(payload.statement.pb)}\n") |
| 181 | + else: |
| 182 | + with open(golden_path, "r", encoding="utf-8") as f: |
| 183 | + json_contents = f.read() |
| 184 | + expected_proto = json_format.Parse( |
| 185 | + json_contents, statement_pb2.Statement() |
| 186 | + ) |
| 187 | + |
| 188 | + assert payload.statement.pb == expected_proto |
| 189 | + |
| 190 | + def test_produces_valid_statements(self, sample_model_folder): |
| 191 | + serializer = serialize_by_file_shard.ManifestSerializer( |
| 192 | + self._hasher_factory, allow_symlinks=True |
| 193 | + ) |
| 194 | + manifest = serializer.serialize(sample_model_folder) |
| 195 | + |
| 196 | + payload = in_toto.DigestOfShardDigestsIntotoPayload.from_manifest( |
| 197 | + manifest |
| 198 | + ) |
| 199 | + |
| 200 | + payload.statement.validate() |
| 201 | + |
| 202 | + def test_only_runs_on_expected_manifest_types(self): |
| 203 | + digest = hashing.Digest("test", b"test_digest") |
| 204 | + manifest = manifest_module.DigestManifest(digest) |
| 205 | + |
| 206 | + with pytest.raises( |
| 207 | + TypeError, |
| 208 | + match="Only ShardLevelManifest is supported", |
| 209 | + ): |
| 210 | + in_toto.DigestOfShardDigestsIntotoPayload.from_manifest(manifest) |
0 commit comments