Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Round trip tests #7

Merged
merged 1 commit into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added tests/__init__.py
Empty file.
Binary file added tests/data/vcf/sample.vcf.gz
Binary file not shown.
Binary file added tests/data/vcf/sample.vcf.gz.tbi
Binary file not shown.
23 changes: 23 additions & 0 deletions tests/test_vcf_roundtrip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import pytest

from bio2zarr import vcf2zarr
from vcztools.vcf_writer import write_vcf
from .utils import assert_vcfs_close


@pytest.mark.parametrize(
"vcf_file", ["sample.vcf.gz"]
)
def test_vcf_to_zarr_to_vcf__real_files(shared_datadir, tmp_path, vcf_file):
path = shared_datadir / "vcf" / vcf_file
intermediate_icf = tmp_path.joinpath("intermediate.icf")
intermediate_vcz = tmp_path.joinpath("intermediate.vcz")
output = tmp_path.joinpath("output.vcf")

vcf2zarr.convert(
[path], intermediate_vcz, icf_path=intermediate_icf, worker_processes=0
)

write_vcf(intermediate_vcz, output)

assert_vcfs_close(path, output)
105 changes: 105 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from contextlib import contextmanager
from itertools import zip_longest
from typing import Iterator

import cyvcf2
import numpy as np


@contextmanager
def open_vcf(path) -> Iterator[cyvcf2.VCF]:
"""A context manager for opening a VCF file."""
vcf = cyvcf2.VCF(path)
try:
yield vcf
finally:
vcf.close()


def assert_vcfs_close(f1, f2, *, rtol=1e-05, atol=1e-03):
"""Like :py:func:`numpy.testing.assert_allclose()`, but for VCF files.

Raises an `AssertionError` if two VCF files are not equal to one another.
Float values in QUAL, INFO, or FORMAT fields are compared up to the
desired tolerance. All other values must match exactly.

Parameters
----------
f1
Path to first VCF to compare.
f2
Path to second VCF to compare.
rtol
Relative tolerance.
atol
Absolute tolerance.
"""
with open_vcf(f1) as vcf1, open_vcf(f2) as vcf2:
assert vcf1.raw_header == vcf2.raw_header
assert vcf1.samples == vcf2.samples

for v1, v2 in zip_longest(vcf1, vcf2):
if v1 is None and v2 is not None:
raise AssertionError(f"Right contains extra variant: {v2}")
if v1 is not None and v2 is None:
raise AssertionError(f"Left contains extra variant: {v1}")

assert v1.CHROM == v2.CHROM, f"CHROM not equal for variants\n{v1}{v2}"
assert v1.POS == v2.POS, f"POS not equal for variants\n{v1}{v2}"
assert v1.ID == v2.ID, f"ID not equal for variants\n{v1}{v2}"
assert v1.REF == v2.REF, f"REF not equal for variants\n{v1}{v2}"
assert v1.ALT == v2.ALT, f"ALT not equal for variants\n{v1}{v2}"
np.testing.assert_allclose(
np.array(v1.QUAL, dtype=np.float32),
np.array(v2.QUAL, dtype=np.float32),
rtol=rtol,
atol=atol,
err_msg=f"QUAL not equal for variants\n{v1}{v2}",
)
assert set(v1.FILTERS) == set(
v2.FILTERS
), f"FILTER not equal for variants\n{v1}{v2}"

assert (
dict(v1.INFO).keys() == dict(v2.INFO).keys()
), f"INFO keys not equal for variants\n{v1}{v2}"
for k in dict(v1.INFO).keys():
# values are python objects (not np arrays)
val1 = v1.INFO[k]
val2 = v2.INFO[k]
if isinstance(val1, float) or (
isinstance(val1, tuple) and any(isinstance(v, float) for v in val1)
):
np.testing.assert_allclose(
np.array(val1, dtype=np.float32),
np.array(val2, dtype=np.float32),
rtol=rtol,
atol=atol,
err_msg=f"INFO {k} not equal for variants\n{v1}{v2}",
)
else:
assert val1 == val2, f"INFO {k} not equal for variants\n{v1}{v2}"

assert v1.FORMAT == v2.FORMAT, f"FORMAT not equal for variants\n{v1}{v2}"
for field in v1.FORMAT:
if field == "GT":
assert (
v1.genotypes == v2.genotypes
), f"GT not equal for variants\n{v1}{v2}"
else:
val1 = v1.format(field)
val2 = v2.format(field)
if val1.dtype.kind == "f":
np.testing.assert_allclose(
val1,
val2,
rtol=rtol,
atol=atol,
err_msg=f"FORMAT {field} not equal for variants\n{v1}{v2}",
)
else:
np.testing.assert_array_equal(
val1,
val2,
err_msg=f"FORMAT {field} not equal for variants\n{v1}{v2}",
)
2 changes: 0 additions & 2 deletions vcztools/vcf_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
vcf_values_to_byte_buf,
vcf_values_to_byte_buf_size,
)
from sgkit.model import get_contigs, get_filters
from sgkit.typing import PathType

# references to the VCF spec are for https://samtools.github.io/hts-specs/VCFv4.3.pdf

Expand Down