diff --git a/gcp_variant_transforms/beam_io/vcf_file_size_io.py b/gcp_variant_transforms/beam_io/vcf_file_size_io.py
new file mode 100644
index 000000000..ef093dcd2
--- /dev/null
+++ b/gcp_variant_transforms/beam_io/vcf_file_size_io.py
@@ -0,0 +1,217 @@
+# Copyright 2018 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A source for estimating the size of VCF files when processed by vcf_to_bq."""
+
+from __future__ import absolute_import
+
+from typing import Iterable, List, Tuple  # pylint: disable=unused-import
+import logging
+import itertools
+
+import apache_beam as beam
+from apache_beam import coders
+from apache_beam import transforms
+from apache_beam.io import filebasedsource
+from apache_beam.io import filesystem
+from apache_beam.io import filesystems
+from apache_beam.io import iobase
+from apache_beam.io import range_trackers  # pylint: disable=unused-import
+
+from gcp_variant_transforms.beam_io import vcfio
+
+
+# Number of lines from each VCF that should be read when estimating disk usage.
+SNIPPET_READ_SIZE = 50
+
+def _get_file_size(file_name):
+  # type: (str) -> int
+  matched_files = filesystems.FileSystems.match([file_name])[0].metadata_list
+  if len(matched_files) != 1:
+    raise IOError("File name {} did not correspond to exactly 1 result. "
+                  "Instead, got {} matches.".format(file_name,
+                                                    len(matched_files)))
+  file_metadata = matched_files[0]
+
+  compression_type = filesystem.CompressionTypes.detect_compression_type(
+      file_metadata.path)
+  if compression_type != filesystem.CompressionTypes.UNCOMPRESSED:
+    logging.error("VCF file %s is compressed; disk requirement estimator "
+                  "will not be accurate.", file_metadata.path)
+  return file_metadata.size_in_bytes
+
+
+class FileSizeInfo(object):
+  def __init__(self, raw_size, encoded_size=None, name="[no filename]"):
+    # type: (int, int, str) -> None
+    self.raw_size = raw_size
+    self.encoded_size = encoded_size  # Allows direct initialization.
+    self.name = name
+
+  def estimate_encoded_file_size(self, raw_sample_size, encoded_sample_size):
+    # type: (int, int) -> None
+    """Estimates the encoded file size from the raw file size and the raw
+    and encoded sizes of the sampled lines.
+    """
+    if raw_sample_size == 0:
+      # Propagate in-band error state to avoid divide-by-zero.
+      logging.warning("File %s appears to have no valid Variant lines. File "
+                      "will be ignored for size estimation.", self.name)
+      self.encoded_size = 0
+      self.raw_size = 0
+    else:
+      self.encoded_size = (self.raw_size * encoded_sample_size /
+                           raw_sample_size)
+
+
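The scaling above is a simple proportion: the known raw file size is multiplied by the encoded-to-raw ratio observed in the sampled lines. A hypothetical worked example (all numbers invented for illustration):

  info = FileSizeInfo(raw_size=10 * 1024**3, name='sample.vcf')  # 10 GiB file
  # Suppose the sampled lines totaled 5,000 raw bytes and 12,500 bytes once
  # encoded as `Variant` objects, a 2.5x expansion:
  info.estimate_encoded_file_size(raw_sample_size=5000,
                                  encoded_sample_size=12500)
  # info.encoded_size is now raw_size * 12500 / 5000, i.e. roughly 25 GiB.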
+ """ + def create_accumulator(self): + # type: (None) -> Tuple[int, int] + return (0, 0) # (raw, encoded) sums + + def add_input(self, (raw, encoded), file_size_info): + # type: (Tuple[int, int], FileSizeInfo) -> Tuple[int, int] + return raw + file_size_info.raw_size, encoded + file_size_info.encoded_size + + def merge_accumulators(self, accumulators): + # type: (Iterable[Tuple[int, int]]) -> Tuple[int, int] + raw, encoded = zip(*accumulators) + return sum(raw), sum(encoded) + + def extract_output(self, (raw, encoded)): + # type: (Tuple[int, int]) -> FileSizeInfo + return FileSizeInfo(raw, encoded) + + +class _EstimateVcfSizeSource(filebasedsource.FileBasedSource): + """A source for estimating the encoded size of a VCF file in `vcf_to_bq`. + + This source first obtains the raw file sizes of a set of VCF files. Then, + the source reads a limited number of variants from a set of VCF files, + both as raw strings and encoded `Variant` objects. Finally, the reader + returns a single `FileSizeInfo` object with an estimate of the input size + if all sizes had been encoded as `Variant` objects. + + Lines that are malformed are skipped. + """ + + DEFAULT_VCF_READ_BUFFER_SIZE = 65536 # 64kB + + def __init__(self, + file_pattern, + sample_size, + compression_type=filesystem.CompressionTypes.AUTO, + validate=True, + vcf_parser_type=vcfio.VcfParserType.PYVCF): + # type: (str, int, str, bool, vcfio.VcfParserType) -> None + super(_EstimateVcfSizeSource, self).__init__( + file_pattern, + compression_type=compression_type, + validate=validate, + splittable=False) + self._compression_type = compression_type + self._sample_size = sample_size + self._vcf_parser_type = vcf_parser_type + + def read_records( + self, + file_name, # type: str + range_tracker # type: range_trackers.UnsplittableRangeTracker + ): + # type: (...) -> Iterable[FileSizeInfo] + """This "generator" only emits a single FileSizeInfo object per file.""" + vcf_parser_class = vcfio.get_vcf_parser(self._vcf_parser_type) + record_iterator = vcf_parser_class( + file_name, + range_tracker, + self._compression_type, + allow_malformed_records=True, + file_pattern=self._pattern, + representative_header_lines=None, + buffer_size=self.DEFAULT_VCF_READ_BUFFER_SIZE, + skip_header_lines=0) + + raw_file_size = _get_file_size(file_name) + + # Open distinct channel to read lines as raw bytestrings. + with filesystems.FileSystems.open(file_name, + self._compression_type) as raw_iterator: + count, raw_size, encoded_size = 0, 0, 0 + for encoded_record, raw_record in itertools.izip(record_iterator, + raw_iterator): + while raw_record and raw_record.startswith('#'): + # Skip headers. Assume that header size is negligible. + raw_record = raw_iterator.next() + logging.debug( + "Reading record for disk usage estimation. Encoded variant: %s\n" + "Raw variant: %s", encoded_record, raw_record) + if count >= self._sample_size: + break + if not isinstance(encoded_record, vcfio.Variant): + logging.error( + "Skipping VCF line that could not be decoded as a " + "`vcfio.Variant` in file %s: %s", file_name, raw_record) + continue + # Encoding in `utf-8` should represent the string as one byte per char, + # even for non-ASCII chars. Python adds significant overhead to the + # bytesize of the full str object. 
+class _EstimateVcfSizeSource(filebasedsource.FileBasedSource):
+  """A source for estimating the encoded size of a VCF file in `vcf_to_bq`.
+
+  This source first obtains the raw size of each matched VCF file. It then
+  reads a limited number of variants from the file, both as raw strings and
+  as encoded `Variant` objects. Finally, it returns a single `FileSizeInfo`
+  object per file, estimating what the file's size would be if all of its
+  records were encoded as `Variant` objects.
+
+  Malformed lines are skipped.
+  """
+
+  DEFAULT_VCF_READ_BUFFER_SIZE = 65536  # 64kB
+
+  def __init__(self,
+               file_pattern,
+               sample_size,
+               compression_type=filesystem.CompressionTypes.AUTO,
+               validate=True,
+               vcf_parser_type=vcfio.VcfParserType.PYVCF):
+    # type: (str, int, str, bool, vcfio.VcfParserType) -> None
+    super(_EstimateVcfSizeSource, self).__init__(
+        file_pattern,
+        compression_type=compression_type,
+        validate=validate,
+        splittable=False)
+    self._compression_type = compression_type
+    self._sample_size = sample_size
+    self._vcf_parser_type = vcf_parser_type
+
+  def read_records(
+      self,
+      file_name,  # type: str
+      range_tracker  # type: range_trackers.UnsplittableRangeTracker
+  ):
+    # type: (...) -> Iterable[FileSizeInfo]
+    """A generator that emits a single `FileSizeInfo` object per file."""
+    vcf_parser_class = vcfio.get_vcf_parser(self._vcf_parser_type)
+    record_iterator = vcf_parser_class(
+        file_name,
+        range_tracker,
+        self._compression_type,
+        allow_malformed_records=True,
+        file_pattern=self._pattern,
+        representative_header_lines=None,
+        buffer_size=self.DEFAULT_VCF_READ_BUFFER_SIZE,
+        skip_header_lines=0)
+
+    raw_file_size = _get_file_size(file_name)
+
+    # Open a distinct channel to read lines as raw bytestrings.
+    with filesystems.FileSystems.open(file_name,
+                                      self._compression_type) as raw_iterator:
+      count, raw_size, encoded_size = 0, 0, 0
+      for encoded_record, raw_record in itertools.izip(record_iterator,
+                                                       raw_iterator):
+        while raw_record and raw_record.startswith('#'):
+          # Skip headers; assume that their size is negligible.
+          raw_record = raw_iterator.next()
+        logging.debug(
+            "Reading record for disk usage estimation. Encoded variant: %s\n"
+            "Raw variant: %s", encoded_record, raw_record)
+        if count >= self._sample_size:
+          break
+        if not isinstance(encoded_record, vcfio.Variant):
+          logging.error(
+              "Skipping VCF line that could not be decoded as a "
+              "`vcfio.Variant` in file %s: %s", file_name, raw_record)
+          continue
+        # Measure the line's size as its UTF-8 encoded byte length; this
+        # avoids the significant overhead Python adds to the in-memory size
+        # of the full str object.
+        raw_size += len(raw_record.encode('utf-8'))
+        encoded_size += coders.registry.get_coder(vcfio.Variant).estimate_size(
+            encoded_record)
+        count += 1
+
+    file_size_info = FileSizeInfo(raw_file_size, name=file_name)
+    file_size_info.estimate_encoded_file_size(raw_size, encoded_size)
+    yield file_size_info
+
+
+class EstimateVcfSize(transforms.PTransform):
+  """Estimates the encoded size of VCF files without reading them in full.
+
+  The output is a PCollection with a single `FileSizeInfo` object
+  representing the aggregate encoded size estimate.
+  """
+
+  def __init__(
+      self,
+      file_pattern,  # type: str
+      sample_size,  # type: int
+      compression_type=filesystem.CompressionTypes.AUTO,  # type: str
+      validate=True,  # type: bool
+      **kwargs  # type: **str
+  ):
+    # type: (...) -> None
+    """Initialize the :class:`EstimateVcfSize` transform.
+
+    Args:
+      file_pattern: The file path to read from either as a single file or a
+        glob pattern.
+      sample_size: The number of variant lines to sample from each file.
+      compression_type: Used to handle compressed input files.
+        Typical value is :attr:`CompressionTypes.AUTO
+        <apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case
+        the underlying file_path's extension will be used to detect the
+        compression.
+      validate: Flag to verify that the files exist during the pipeline
+        creation time.
+    """
+    super(EstimateVcfSize, self).__init__(**kwargs)
+    self._source = _EstimateVcfSizeSource(
+        file_pattern, sample_size, compression_type, validate=validate)
+
+  def expand(self, pvalue):
+    return (pvalue.pipeline
+            | iobase.Read(self._source)
+            | beam.CombineGlobally(FileSizeInfoSumFn()))
diff --git a/gcp_variant_transforms/beam_io/vcf_file_size_io_test.py b/gcp_variant_transforms/beam_io/vcf_file_size_io_test.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/gcp_variant_transforms/beam_io/vcfio.py b/gcp_variant_transforms/beam_io/vcfio.py
index 21a87e923..ccd4b027f 100644
--- a/gcp_variant_transforms/beam_io/vcfio.py
+++ b/gcp_variant_transforms/beam_io/vcfio.py
@@ -19,19 +19,19 @@
 
 from __future__ import absolute_import
 
-from typing import Any, Iterable, List, Tuple  # pylint: disable=unused-import
+from typing import Any, Iterable, List, Tuple, Type  # pylint: disable=unused-import
 from functools import partial
 import enum
 
 import apache_beam as beam
+from apache_beam import transforms
 from apache_beam.coders import coders
 from apache_beam.io import filebasedsource
+from apache_beam.io import filesystem
 from apache_beam.io import filesystems
+from apache_beam.io import iobase
 from apache_beam.io import range_trackers  # pylint: disable=unused-import
 from apache_beam.io import textio
-from apache_beam.io.filesystem import CompressionTypes
-from apache_beam.io.iobase import Read
-from apache_beam.transforms import PTransform
 
 from gcp_variant_transforms.beam_io import bgzf_io
 from gcp_variant_transforms.beam_io import vcf_parser
@@ -56,6 +56,16 @@ class VcfParserType(enum.Enum):
   PYVCF = 0
   NUCLEUS = 1
 
+def get_vcf_parser(vcf_parser_type):
+  # type: (VcfParserType) -> Type[vcf_parser.VcfParser]
+  if vcf_parser_type == VcfParserType.PYVCF:
+    return vcf_parser.PyVcfParser
+  elif vcf_parser_type == VcfParserType.NUCLEUS:
+    return vcf_parser.NucleusParser
+  else:
+    raise ValueError(
+        'Unrecognized vcf_parser_type: %s.' % str(vcf_parser_type))
+
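The factory extracted here is what lets `_EstimateVcfSizeSource.read_records` above and `_VcfSource.read_records` below share parser selection; a trivial usage sketch:

  parser_class = get_vcf_parser(VcfParserType.NUCLEUS)
  assert parser_class is vcf_parser.NucleusParser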
 
 class _ToVcfRecordCoder(coders.Coder):
   """Coder for encoding :class:`Variant` objects as VCF text lines."""
@@ -193,7 +203,7 @@ class _VcfSource(filebasedsource.FileBasedSource):
   def __init__(self,
                file_pattern,  # type: str
                representative_header_lines=None,  # type: List[str]
-               compression_type=CompressionTypes.AUTO,  # type: str
+               compression_type=filesystem.CompressionTypes.AUTO,  # type: str
               buffer_size=DEFAULT_VCF_READ_BUFFER_SIZE,  # type: int
                validate=True,  # type: bool
                allow_malformed_records=False,  # type: bool
@@ -214,14 +224,7 @@ def read_records(self,
                    range_tracker  # type: range_trackers.OffsetRangeTracker
                   ):
     # type: (...) -> Iterable[MalformedVcfRecord]
-    vcf_parser_class = None
-    if self._vcf_parser_type == VcfParserType.PYVCF:
-      vcf_parser_class = vcf_parser.PyVcfParser
-    elif self._vcf_parser_type == VcfParserType.NUCLEUS:
-      vcf_parser_class = vcf_parser.NucleusParser
-    else:
-      raise ValueError(
-          'Unrecognized _vcf_parser_type: %s.' % str(self._vcf_parser_type))
+    vcf_parser_class = get_vcf_parser(self._vcf_parser_type)
     record_iterator = vcf_parser_class(
         file_name,
         range_tracker,
@@ -280,7 +283,7 @@ def expand(self, pcoll):
             | 'ReadBlock' >> beam.ParDo(self._read_records))
 
 
-class ReadFromVcf(PTransform):
+class ReadFromVcf(transforms.PTransform):
   """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading VCF
   files.
 
@@ -294,7 +297,7 @@ def __init__(
       self,
       file_pattern=None,  # type: str
       representative_header_lines=None,  # type: List[str]
-      compression_type=CompressionTypes.AUTO,  # type: str
+      compression_type=filesystem.CompressionTypes.AUTO,  # type: str
       validate=True,  # type: bool
       allow_malformed_records=False,  # type: bool
       vcf_parser_type=VcfParserType.PYVCF,  # type: int
@@ -325,7 +328,7 @@ def __init__(
       vcf_parser_type=vcf_parser_type)
 
   def expand(self, pvalue):
-    return pvalue.pipeline | Read(self._source)
+    return pvalue.pipeline | iobase.Read(self._source)
 
 
 def _create_vcf_source(
@@ -337,7 +340,7 @@ def _create_vcf_source(
       allow_malformed_records=allow_malformed_records)
 
 
-class ReadAllFromVcf(PTransform):
+class ReadAllFromVcf(transforms.PTransform):
   """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading a
   :class:`~apache_beam.pvalue.PCollection` of VCF files.
 
@@ -355,7 +358,7 @@ def __init__(
       self,
       representative_header_lines=None,  # type: List[str]
       desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE,  # type: int
-      compression_type=CompressionTypes.AUTO,  # type: str
+      compression_type=filesystem.CompressionTypes.AUTO,  # type: str
       allow_malformed_records=False,  # type: bool
       **kwargs  # type: **str
   ):
@@ -384,7 +387,7 @@ def __init__(
         allow_malformed_records=allow_malformed_records)
     self._read_all_files = filebasedsource.ReadAllFiles(
         True,  # splittable
-        CompressionTypes.AUTO, desired_bundle_size,
+        filesystem.CompressionTypes.AUTO, desired_bundle_size,
         0,  # min_bundle_size
         source_from_file)
 
@@ -392,13 +395,13 @@ def expand(self, pvalue):
     return pvalue | 'ReadAllFiles' >> self._read_all_files
 
 
-class WriteToVcf(PTransform):
+class WriteToVcf(transforms.PTransform):
   """A PTransform for writing to VCF files."""
 
   def __init__(self,
                file_path,
                num_shards=1,
-               compression_type=CompressionTypes.AUTO,
+               compression_type=filesystem.CompressionTypes.AUTO,
                headers=None):
     # type: (str, int, str, List[str]) -> None
     """Initialize a WriteToVcf PTransform.
@@ -449,7 +452,7 @@ def process(self, (file_path, variants), *args, **kwargs):
       file_to_write.write(self._coder.encode(variant))
 
 
-class WriteVcfDataLines(PTransform):
+class WriteVcfDataLines(transforms.PTransform):
   """A PTransform for writing VCF data lines.
 
   This PTransform takes PCollection<`file_path`, `variants`> as input, and
diff --git a/gcp_variant_transforms/libs/preprocess_reporter.py b/gcp_variant_transforms/libs/preprocess_reporter.py
index 7870647d4..94bf2036d 100644
--- a/gcp_variant_transforms/libs/preprocess_reporter.py
+++ b/gcp_variant_transforms/libs/preprocess_reporter.py
@@ -26,6 +26,8 @@
 TODO(allieychen): Eventually, it also contains the resource estimation.
 
 Output example (assuming opening in spreedsheet):
+Estimated disk usage by Dataflow: 4846 GB
+Total raw file sizes: 1231 GB
 Header Conflicts
 ID    Category    Conflicts    File Paths    Proposed Resolution
 NS    INFO    num=1 type=Float    file1    num=1 type=Float
@@ -43,13 +45,16 @@
 File Path    Variant Record    Error Message
 file 1    rs6 G A 29 PASS NS=3;    invalid literal for int() with base 10.
 """
+import logging
+import math
 from typing import Dict, List, Optional, Union  # pylint: disable=unused-import
 
 from apache_beam.io import filesystems
 
 from gcp_variant_transforms.beam_io import vcfio  # pylint: disable=unused-import
-from gcp_variant_transforms.beam_io import vcf_header_io
+from gcp_variant_transforms.beam_io import vcf_header_io  # pylint: disable=unused-import
+from gcp_variant_transforms.beam_io import vcf_file_size_io  # pylint: disable=unused-import
 from gcp_variant_transforms.libs import vcf_header_definitions_merger  # pylint: disable=unused-import
 
 # An alias for the header key constants to make referencing easier.
@@ -78,6 +83,7 @@ class _HeaderLine(object):
 def generate_report(
     header_definitions,  # type: _VcfHeaderDefinitions
     file_path,  # type: str
+    disk_usage_estimate,  # type: vcf_file_size_io.FileSizeInfo
     resolved_headers=None,  # type: vcf_header_io.VcfHeader
     inferred_headers=None,  # type: vcf_header_io.VcfHeader
     malformed_records=None  # type: List[vcfio.MalformedVcfRecord]
 ):
@@ -89,6 +95,9 @@ def generate_report(
     header_definitions: The container which contains all header definitions
       and the corresponding file names.
     file_path: The location where the report is saved.
+    disk_usage_estimate: `FileSizeInfo` with metadata about the input files'
+      sizes in both raw and Beam-encoded formats. Can be set to `None` if no
+      estimate was made.
     resolved_headers: The `VcfHeader` that provides the resolutions for the
       fields that have conflicting definitions.
     inferred_headers: The `VcfHeader` that contains the inferred header
@@ -98,6 +107,7 @@ def generate_report(
   """
   resolved_headers = resolved_headers or vcf_header_io.VcfHeader()
   with filesystems.FileSystems.create(file_path) as file_to_write:
+    _append_disk_usage_estimate_to_report(file_to_write, disk_usage_estimate)
     _append_conflicting_headers_to_report(file_to_write, header_definitions,
                                           resolved_headers)
     _append_inferred_headers_to_report(file_to_write, inferred_headers)
@@ -276,6 +286,19 @@ def _format_definition(num_value, type_value):
   return ' '.join(formatted_definition)
 
 
+def _append_disk_usage_estimate_to_report(file_to_write, disk_usage_estimate):
+  # type: (file, vcf_file_size_io.FileSizeInfo) -> None
+  if disk_usage_estimate is None:
+    return
+  logging.info("Final estimate of encoded size: %d GB",
+               disk_usage_estimate.encoded_size / 1e9)
+  file_to_write.write(
+      'Estimated disk usage by Dataflow: {} GB\n'
+      'Total raw file sizes: {} GB\n'.format(
+          int(math.ceil(disk_usage_estimate.encoded_size / 1e9)),
+          int(math.ceil(disk_usage_estimate.raw_size / 1e9))))
+
+
 def _append_to_report(file_to_write, error_type, header, contents):
   # type: (file, str, str, List[str]) -> None
   """Appends the contents to `file_to_write`.
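Note that the report rounds each byte count up to whole decimal gigabytes (1e9 bytes, not GiB); for example:

  import math
  encoded_size = int(2.05e10)         # a 20.5 GB estimate
  int(math.ceil(encoded_size / 1e9))  # -> 21, reported as '21 GB'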
diff --git a/gcp_variant_transforms/libs/preprocess_reporter_test.py b/gcp_variant_transforms/libs/preprocess_reporter_test.py
index f113bf52d..7ba52d6c0 100644
--- a/gcp_variant_transforms/libs/preprocess_reporter_test.py
+++ b/gcp_variant_transforms/libs/preprocess_reporter_test.py
@@ -18,10 +18,11 @@
 from typing import List  # pylint: disable=unused-import
 import unittest
 
-from apache_beam.io.filesystems import FileSystems
+from apache_beam.io import filesystems
 from vcf.parser import _Format as Format
 from vcf.parser import _Info as Info
 
+from gcp_variant_transforms.beam_io import vcf_file_size_io
 from gcp_variant_transforms.beam_io import vcfio
 from gcp_variant_transforms.beam_io.vcf_header_io import VcfHeader
 from gcp_variant_transforms.libs import preprocess_reporter
@@ -39,18 +40,21 @@ def _generate_report_and_assert_contents_equal(
       header_definitions,  # type: VcfHeaderDefinitions
       resolved_headers=None,  # type: VcfHeader
       inferred_headers=None,  # type: VcfHeader
-      malformed_records=None  # type: List[vcfio.MalformedVcfRecord]
+      malformed_records=None,  # type: List[vcfio.MalformedVcfRecord]
+      disk_usage_estimate=None,  # type: vcf_file_size_io.FileSizeInfo
   ):
     # type: (...) -> None
     with temp_dir.TempDir() as tempdir:
-      file_path = FileSystems.join(tempdir.get_path(),
-                                   PreprocessReporterTest._REPORT_NAME)
+      file_path = filesystems.FileSystems.join(
+          tempdir.get_path(),
+          PreprocessReporterTest._REPORT_NAME)
       preprocess_reporter.generate_report(header_definitions,
                                           file_path,
+                                          disk_usage_estimate,
                                           resolved_headers,
                                           inferred_headers,
                                           malformed_records)
-      with FileSystems.open(file_path) as f:
+      with filesystems.FileSystems.open(file_path) as f:
         reader = f.readlines()
       self.assertEqual(reader, expected_content)
 
@@ -69,6 +73,29 @@ def test_report_no_conflicts(self):
         header_definitions,
         resolved_headers)
 
+  def test_report_with_disk_estimate(self):
+    header_definitions = VcfHeaderDefinitions()
+    header_definitions._infos = {'NS': {Definition(1, 'Float'): ['file1']}}
+    header_definitions._formats = {'NS': {Definition(1, 'Float'): ['file2']}}
+
+    infos = OrderedDict([
+        ('NS', Info('NS', 1, 'Integer', 'Number samples', None, None))])
+    formats = OrderedDict([('NS', Format('NS', 1, 'Float', 'Number samples'))])
+    resolved_headers = VcfHeader(infos=infos, formats=formats)
+    file_size_info = vcf_file_size_io.FileSizeInfo(
+        raw_size=int(1e10),
+        encoded_size=int(2e10))
+
+    expected = ['Estimated disk usage by Dataflow: 20 GB\n',
+                'Total raw file sizes: 10 GB\n',
+                'No Header Conflicts Found.\n',
+                '\n']
+    self._generate_report_and_assert_contents_equal(
+        expected,
+        header_definitions,
+        resolved_headers,
+        disk_usage_estimate=file_size_info)
+
   def test_report_conflicts(self):
     header_definitions = VcfHeaderDefinitions()
     header_definitions._infos = {'NS': {Definition(1, 'Integer'): ['file1'],
diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py
index dd6314244..f7499730b 100644
--- a/gcp_variant_transforms/options/variant_transform_options.py
+++ b/gcp_variant_transforms/options/variant_transform_options.py
@@ -478,6 +478,13 @@ def add_arguments(self, parser):
         help=('The full path of the resolved headers. The file will not be'
               'generated if unspecified. Otherwise, please provide a local '
               'path if run locally, or a cloud path if run on Dataflow.'))
+    parser.add_argument(
+        '--estimate_disk_usage',
+        type='bool', default=False, nargs='?', const=True,
+        help=('By default, disk resource usage will not be estimated. If '
+              'true, the preprocessor estimates the maximum disk usage at '
+              'any pipeline step, since high usage at a shuffle step, e.g. '
+              'MergeVariants, can cause out-of-disk errors.'))
 
   def validate(self, parsed_args):
     _validate_inputs(parsed_args)
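With the new flag, a preprocessor run that includes the disk estimate might be invoked as follows (the bucket paths are placeholders):

  python -m gcp_variant_transforms.vcf_to_bq_preprocess \
    --input_pattern "gs://my-bucket/*.vcf" \
    --report_path "gs://my-bucket/report.tsv" \
    --estimate_disk_usage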
diff --git a/gcp_variant_transforms/vcf_to_bq_preprocess.py b/gcp_variant_transforms/vcf_to_bq_preprocess.py
index 478fac13e..39332fd76 100644
--- a/gcp_variant_transforms/vcf_to_bq_preprocess.py
+++ b/gcp_variant_transforms/vcf_to_bq_preprocess.py
@@ -56,6 +56,7 @@
 from apache_beam.options import pipeline_options
 
 from gcp_variant_transforms import pipeline_common
+from gcp_variant_transforms.beam_io import vcf_file_size_io
 from gcp_variant_transforms.libs import preprocess_reporter
 from gcp_variant_transforms.options import variant_transform_options
 from gcp_variant_transforms.transforms import filter_variants
@@ -65,14 +66,13 @@
 
 _COMMAND_LINE_OPTIONS = [variant_transform_options.PreprocessOptions]
 
-
 def _get_inferred_headers(variants,  # type: pvalue.PCollection
                           merged_header  # type: pvalue.PCollection
                          ):
   # type: (...) -> (pvalue.PCollection, pvalue.PCollection)
   inferred_headers = (variants
                       | 'FilterVariants' >> filter_variants.FilterVariants()
-                      | ' InferHeaderFields' >>
+                      | 'InferHeaderFields' >>
                       infer_headers.InferHeaderFields(
                           pvalue.AsSingleton(merged_header),
                           allow_incompatible_records=True,
@@ -102,6 +102,15 @@ def run(argv=None):
   merged_definitions = (headers | 'MergeDefinitions' >>
                         merge_header_definitions.MergeDefinitions())
+
+  disk_usage_estimate = None
+  if known_args.estimate_disk_usage:
+    # TODO(hanjohn): Add an e2e test.
+    # TODO(hanjohn): Add support for `ReadAll` pattern for inputs with very
+    # large numbers of files.
+    disk_usage_estimate = beam.pvalue.AsSingleton(
+        p | 'SampleAndEstimateFileSize' >> vcf_file_size_io.EstimateVcfSize(
+            known_args.input_pattern, vcf_file_size_io.SNIPPET_READ_SIZE))
 
   if known_args.report_all_conflicts:
     variants = pipeline_common.read_variants(p,
                                              all_patterns,
@@ -114,6 +123,7 @@ def run(argv=None):
              | 'GenerateConflictsReport' >>
              beam.ParDo(preprocess_reporter.generate_report,
                         known_args.report_path,
+                        disk_usage_estimate,
                         beam.pvalue.AsSingleton(merged_headers),
                         beam.pvalue.AsSingleton(inferred_headers),
                         beam.pvalue.AsIter(malformed_records)))
@@ -122,6 +132,7 @@ def run(argv=None):
              | 'GenerateConflictsReport' >>
              beam.ParDo(preprocess_reporter.generate_report,
                         known_args.report_path,
+                        disk_usage_estimate,
                         beam.pvalue.AsSingleton(merged_headers)))
 
   if known_args.resolved_headers_path:
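Taken together, a minimal sketch of how the new pieces compose in a standalone pipeline (not part of this patch; 'sample.vcf' is a placeholder path):

  import logging

  import apache_beam as beam

  from gcp_variant_transforms.beam_io import vcf_file_size_io

  def _log_estimate(info):
    # type: (vcf_file_size_io.FileSizeInfo) -> None
    logging.info('raw=%d bytes, estimated encoded=%d bytes',
                 info.raw_size, info.encoded_size)

  with beam.Pipeline() as p:
    # EstimateVcfSize reads the raw file size plus a SNIPPET_READ_SIZE-line
    # sample and emits one aggregated FileSizeInfo.
    _ = (p
         | vcf_file_size_io.EstimateVcfSize(
             'sample.vcf', vcf_file_size_io.SNIPPET_READ_SIZE)
         | beam.Map(_log_estimate))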