From 71b771313e808505c0bb1e5d6206166eb33f9a27 Mon Sep 17 00:00:00 2001
From: "Martin K. Scherer"
Date: Tue, 5 Apr 2016 16:02:39 +0200
Subject: [PATCH] CSV fixes

Fixes #702 and several other issues.
---
 pyemma/coordinates/data/py_csv_reader.py      | 264 ++++++++++++------
 .../coordinates/data/util/traj_info_cache.py  |  14 +-
 pyemma/coordinates/tests/test_csvreader.py    |  63 ++++-
 requirements-build-doc.txt                    |   2 +-
 4 files changed, 249 insertions(+), 94 deletions(-)

diff --git a/pyemma/coordinates/data/py_csv_reader.py b/pyemma/coordinates/data/py_csv_reader.py
index e77924204..7f4da7645 100644
--- a/pyemma/coordinates/data/py_csv_reader.py
+++ b/pyemma/coordinates/data/py_csv_reader.py
@@ -25,6 +25,9 @@
 import csv
 import os
 
+import six
+
+from pyemma._base.logging import Loggable
 from pyemma.coordinates.data._base.datasource import DataSourceIterator, DataSource
 from pyemma.coordinates.data.util.traj_info_cache import TrajInfo
 from six.moves import range
@@ -71,6 +74,7 @@ def _next_chunk(self):
             lines.append(row)
             if self.chunksize != 0 and len(lines) % self.chunksize == 0:
                 result = self._convert_to_np_chunk(lines)
+                del lines[:]  # free some space
                 if self._t >= traj_len:
                     self._next_traj()
                 return result
@@ -102,20 +106,25 @@ def _next_traj(self):
                                  dialect=self._data_source._get_dialect(self._itraj))
 
     def _convert_to_np_chunk(self, list_of_strings):
+        # filter out empty rows
+        list_of_strings = list(filter(bool, list_of_strings))
         stack_of_strings = np.vstack(list_of_strings)
         if self._custom_cols:
             stack_of_strings = stack_of_strings[:, self._custom_cols]
-        del list_of_strings[:]
         try:
             result = stack_of_strings.astype(float)
         except ValueError:
-            fn = self._data_source.filenames[self._itraj]
+            fn = self._file_handle.name
+            dialect_str = _dialect_to_str(self._reader.dialect)
             for idx, line in enumerate(list_of_strings):
                 for value in line:
                     try:
                         float(value)
                     except ValueError as ve:
-                        s = "Invalid entry in file %s, line %s: %s" % (fn, self._t+idx, repr(ve))
+                        s = str("Invalid entry in file {fn}, line {line}: {error}."
+                                " Used dialect to parse: {dialect}").format(fn=fn, line=self._t + idx,
+                                                                            error=repr(ve),
+                                                                            dialect=dialect_str)
                         raise ValueError(s)
         self._t += len(list_of_strings)
         return result
@@ -142,7 +151,7 @@ def _open_file(self):
         else:
             wanted_frames = np.arange(0, nt, self.stride)
             skip_rows = np.setdiff1d(
-                    all_frames, wanted_frames, assume_unique=True)
+                all_frames, wanted_frames, assume_unique=True)
         self._skip_rows = skip_rows
 
         try:
@@ -154,8 +163,27 @@ def _open_file(self):
             raise
 
 
+def _dialect_to_str(dialect):
+    from io import StringIO
+    s = StringIO()
+    s.write("[CSVDialect ")
+    fields = str("delimiter='{delimiter}', lineterminator='{lineterminator}',"
+                 " skipinitialspace={skipinitialspace}, quoting={quoting},"
+                 " quotechar={quotechar}, doublequote={doublequote}]")
+    s.write(fields.format(delimiter=dialect.delimiter,
+                          lineterminator=dialect.lineterminator,
+                          skipinitialspace=dialect.skipinitialspace,
+                          quoting=dialect.quoting,
+                          quotechar=dialect.quotechar,
+                          doublequote=dialect.doublequote))
+    s.seek(0)
+    return str(s.read())
+
+
 class PyCSVReader(DataSource):
-    r""" reads tabulated ASCII data
+    r""" Reader for tabulated ASCII data
+
+    This class uses numpy to convert string data into numeric arrays.
 
     Parameters
     ----------
@@ -174,7 +202,7 @@ class PyCSVReader(DataSource):
     comments: str, list of str or None, default='#'
         Lines starting with this char will be ignored, except for first line
         (header)
-    converters : dict, optional
+    converters : dict, optional (Not yet implemented)
        A dictionary mapping column number to a function that will convert
        that column to a float. E.g., if column 0 is a date string:
        ``converters = {0: datestr2num}``. Converters can also be used to
@@ -185,16 +213,21 @@ class PyCSVReader(DataSource):
     -----
     For reading files with only one column, one needs to specify a delimiter...
     """
-    DEFAULT_OPEN_MODE = 'r'  # read, text, unified-newlines (always \n)
+    DEFAULT_OPEN_MODE = 'r'  # read in text-mode
 
     def __init__(self, filenames, chunksize=1000, delimiters=None, comments='#',
                  converters=None, **kwargs):
         super(PyCSVReader, self).__init__(chunksize=chunksize)
         self._is_reader = True
-        n = len(filenames)
-        self._comments = self.__parse_args(comments, '#', n)
-        self._delimiters = self.__parse_args(delimiters, None, n)
+        if isinstance(filenames, (tuple, list)):
+            n = len(filenames)
+        elif isinstance(filenames, six.string_types):
+            n = 1
+        else:
+            raise TypeError("'filenames' argument has to be list, tuple or string")
+        self._comments = PyCSVReader.__parse_args(comments, '#', n)
+        self._delimiters = PyCSVReader.__parse_args(delimiters, None, n)
         self._converters = converters
 
         # mapping of boolean to indicate if file has a header and csv dialect
@@ -205,13 +238,14 @@
         # invoke filename setter
         self.filenames = filenames
 
-    def __parse_args(self, arg, default, n):
+    @staticmethod
+    def __parse_args(arg, default, n):
         if arg is None:
-            return [default]*n
+            return [default] * n
         if isinstance(arg, (list, tuple)):
             assert len(arg) == n
             return arg
-        return [arg]*n
+        return [arg] * n
 
     def _create_iterator(self, skip=0, chunk=0, stride=1, return_trajindex=True, cols=None):
         return PyCSVIterator(self, skip=skip, chunk=chunk, stride=stride,
@@ -224,84 +258,152 @@ def _get_dialect(self, itraj):
     def describe(self):
         return "[CSVReader files=%s]" % self._filenames
 
-    def _get_traj_info(self, filename):
+    def _determine_dialect(self, fh, length):
+        """
+        Parameters
+        ----------
+        fh : file handle
+            file handle for which the dialect should be determined.
+        length : int
+            the previously obtained file length (from _calc_offsets).
+
+        Returns
+        -------
+        dialect : csv.Dialect
+            a Dialect instance which holds things like the delimiter etc.
+        length : int
+            the input length minus any header lines found (one or multiple lines).
+        skip : int
+            the number of header lines to skip.
+
+        Notes
+        -----
+        As a side effect this method stores the dialect and skip count for the
+        given file handle in self._dialects[idx] and self._skip[idx], where
+        idx = self.filenames.index(fh.name)
+        """
+        filename = fh.name
         idx = self.filenames.index(filename)
+        fh.seek(0)
+        skip = 0  # rows to skip (for an eventually found header)
+        # auto detect delimiter with csv.Sniffer
+        if self._delimiters[idx] is None:
+            # use a sample of three lines
+            sample = ''.join(fh.readline() for _ in range(3))
+            sniffer = csv.Sniffer()
+            try:
+                dialect = sniffer.sniff(sample)
+            except csv.Error as e:
+                s = ('During handling of file "%s" following error occurred:'
+                     ' "%s". Sample was "%s"' % (filename, e, sample))
Sample was "%s"' % (filename, e, sample)) + raise RuntimeError(s) + if sniffer.has_header(sample): + skip += 1 + else: + sample = fh.readline() + fh.seek(0) + + class custom_dialect(csv.Dialect): + delimiter = self._delimiters[idx] + quotechar = '"' + if sample[-2] == '\r' and sample[-1] == '\n': + lineterminator = '\r\n' + else: + lineterminator = '\n' + quoting = csv.QUOTE_MINIMAL + + dialect = custom_dialect() + dialect.delimiter = self._delimiters[idx] + # determine header (multi-line) + hdr = False + for line in fh: + if line.startswith(self._comments[idx]): + hdr += 1 + continue + else: + break + + skip += hdr + length -= skip + + self._dialects[idx] = dialect + self._skip[idx] = skip + + return dialect, length, skip + + @staticmethod + def _calc_offsets(fh): + """ determines byte offsets between all lines + Parameters + ---------- + fh : file handle + file handle to obtain byte offsets from. + + Returns + ------- + lengths : int + number of valid (non-empty) lines + offsets : ndarray(dtype=int64) + byte offsets + """ def new_size(x): return int(ceil(x * 1.2)) - # how to handle mode? - """ - On Windows, tell() can return illegal values (after an fgets()) when - reading files with Unix-style line-endings. Use binary mode ('rb') to - circumvent this problem. - """ - with open(filename, self.DEFAULT_OPEN_MODE) as fh: - # approx by filesize / (first line + 20%) - size = new_size(os.stat(filename).st_size / len(fh.readline())) - assert size > 0 - fh.seek(0) - offsets = np.empty(size, dtype=np.int64) - offsets[0] = 0 - i = 1 + + filename = fh.name + fh.seek(0) + # approx by filesize / (first line + 20%) + fh.readline() # skip first line, because it may contain a much shorter header, which will give a bad estimate + size = new_size(os.stat(filename).st_size / len(fh.readline())) + offsets = np.empty(size, dtype=np.int64) + offsets[0] = 0 + i = 1 + # re-open in binary mode to circumvent a bug in Py3.5 win, where the first offset reported by tell + # overflows int64. + with open(filename, 'rb') as fh: while fh.readline(): offsets[i] = fh.tell() i += 1 if i >= len(offsets): offsets = np.resize(offsets, new_size(len(offsets))) - offsets = offsets[:i] - length = len(offsets) - 1 - fh.seek(0) + offsets = offsets[:i] + + # filter empty lines (offset between two lines is only 1 or 2 chars) + # insert an diff of 2 at the beginning to match the amount of indices + diff = np.diff(offsets) + mask = diff > 2 + mask = np.insert(mask, 0, True) + offsets = offsets[mask] + length = len(offsets) - 1 + + return length, offsets + + @staticmethod + def _get_dimension(fh, dialect, skip): + fh.seek(0) + # if we have a header subtract it from total length + r = csv.reader(fh, dialect=dialect) + for _ in range(skip + 1): + line = next(r) + + # obtain dimension from first valid row + try: + arr = np.array(line).astype(float) + except ValueError as ve: + s = 'could not parse first line of data in file "%s"' % fh.name + raise ValueError(s, ve) + s = arr.squeeze().shape + if len(s) == 1: + ndim = s[0] + else: + ndim = 1 + + return ndim - # auto detect delimiter with csv.Sniffer - if self._delimiters[idx] is None: - # determine delimiter - sample = fh.read(2048) - sniffer = csv.Sniffer() - try: - self._dialects[idx] = sniffer.sniff(sample) - except csv.Error as e: - s = ('During handling of file "%s" follwing error occured:' - ' "%s". 
Sample was "%s"' % (filename, e, sample)) - raise RuntimeError(s) - if sniffer.has_header(sample): - self._skip[idx] += 1 - length -= 1 - else: - class custom_dialect(csv.Dialect): - delimiter = self._delimiters[idx] - quotechar = '"' - # lets enforce \n because we use text mode with 'U' (unified newline) - lineterminator = '\n' - quoting = csv.QUOTE_MINIMAL - d = custom_dialect() - d.delimiter = self._delimiters[idx] - - # determine header - hdr = False - for line in fh: - if line.startswith(self._comments[idx]): - hdr += 1 - continue - - self._skip[idx] += hdr - length -= hdr - - self._dialects[idx] = d - # if we have a header subtract it from total length - fh.seek(0) - r = csv.reader(fh, dialect=self._dialects[idx]) - for _ in range(self._skip[idx]+1): - line = next(r) + def _get_traj_info(self, filename): + # calc byte offsets, csv dialect and dimension (elements in first valid row) - try: - arr = np.array(line).astype(float) - except ValueError as ve: - s = 'could not parse first line of data in file "%s"' % filename - raise ValueError(s, ve) - s = arr.squeeze().shape - if len(s) == 1: - ndim = s[0] - else: - ndim = 1 + with open(filename, self.DEFAULT_OPEN_MODE) as fh: + length, offsets = PyCSVReader._calc_offsets(fh) + dialect, length, skip = self._determine_dialect(fh, length) + ndim = PyCSVReader._get_dimension(fh, dialect, skip) return TrajInfo(ndim, length, offsets) diff --git a/pyemma/coordinates/data/util/traj_info_cache.py b/pyemma/coordinates/data/util/traj_info_cache.py index 5a7d70b83..c3ba6703f 100644 --- a/pyemma/coordinates/data/util/traj_info_cache.py +++ b/pyemma/coordinates/data/util/traj_info_cache.py @@ -187,6 +187,15 @@ def _set_curr_db_version(self, val): self._database['db_version'] = val self._current_db_version = val + def _handle_csv(self, reader, filename, length): + # this is maybe a bit ugly, but so far we do not store the dialect of csv files in + # the database, so we need to re-do this step in case of a cache hit. + from pyemma.coordinates.data import PyCSVReader + if not isinstance(reader, PyCSVReader): + return + with open(filename, PyCSVReader.DEFAULT_OPEN_MODE) as fh: + reader._determine_dialect(fh, length) + def __getitem__(self, filename_reader_tuple): filename, reader = filename_reader_tuple key = self._get_file_hash(filename) @@ -194,7 +203,10 @@ def __getitem__(self, filename_reader_tuple): try: result = str(self._database[key]) info = create_traj_info(result) - # handle cache misses and not interpreteable results by re-computation. + + self._handle_csv(reader, filename, info.length) + + # handle cache misses and not interpretable results by re-computation. # Note: this also handles UnknownDBFormatExceptions! except KeyError: info = reader._get_traj_info(filename) diff --git a/pyemma/coordinates/tests/test_csvreader.py b/pyemma/coordinates/tests/test_csvreader.py index dd573dd3f..a6e1e11dc 100644 --- a/pyemma/coordinates/tests/test_csvreader.py +++ b/pyemma/coordinates/tests/test_csvreader.py @@ -14,8 +14,6 @@ # # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . 
diff --git a/pyemma/coordinates/tests/test_csvreader.py b/pyemma/coordinates/tests/test_csvreader.py
index dd573dd3f..a6e1e11dc 100644
--- a/pyemma/coordinates/tests/test_csvreader.py
+++ b/pyemma/coordinates/tests/test_csvreader.py
@@ -14,8 +14,6 @@
 #
 # You should have received a copy of the GNU Lesser General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
 '''
 Created on 09.04.2015
 
@@ -34,6 +32,7 @@
 
 
 class TestCSVReader(unittest.TestCase):
+
     @classmethod
     def setUpClass(cls):
         cls.dir = tempfile.mkdtemp(prefix='pyemma_filereader')
@@ -44,13 +43,12 @@ def setUpClass(cls):
         cls.filename2 = os.path.join(cls.dir, "data2.dat")
         np.savetxt(cls.filename1, cls.data)
         np.savetxt(cls.filename2, cls.data)
-
+
         cls.file_with_header = tempfile.mktemp(suffix=".dat", dir=cls.dir)
         cls.file_with_header2 = tempfile.mktemp(suffix=".dat", dir=cls.dir)
-
+
         np.savetxt(cls.file_with_header, cls.data, header="x y z")
         np.savetxt(cls.file_with_header2, cls.data, header="x y z")
-        return cls
 
     @classmethod
@@ -107,9 +105,11 @@ def test_read_2file_with_header(self):
     def test_read_with_skipping_first_few_couple_lines(self):
         for skip in [0, 3, 13]:
+            # FIXME: py27 does not like opening the same file twice
             r1 = CSVReader(self.filename1, chunksize=30)
             out_with_skip = r1.get_output(skip=skip)[0]
+            assert len(out_with_skip) == len(self.data[skip:])
             r2 = CSVReader(self.filename1, chunksize=30)
             out = r2.get_output()[0]
             np.testing.assert_almost_equal(out_with_skip, out[skip::],
                                            err_msg="The first %s rows were skipped, but that did not "
@@ -228,8 +230,8 @@ def test_with_stride_and_lag_with_header(self):
 
     def test_compare_readline(self):
         data = np.arange(99*3).reshape(-1, 3)
-        fn = tempfile.mktemp()
-        try:
+        with tempfile.NamedTemporaryFile(delete=False) as f:
+            fn = f.name
             np.savetxt(fn, data)
             # calc offsets
             reader = CSVReader(fn)
@@ -240,16 +242,15 @@ def test_compare_readline(self):
             trajinfo = reader._get_traj_info(fn)
             offset = [0]
             with open(fn, CSVReader.DEFAULT_OPEN_MODE) as fh2:
                 while fh2.readline():
                     offset.append(fh2.tell())
                 fh2.seek(0)
+                np.testing.assert_equal(trajinfo.offsets, offset)
                 for ii, off in enumerate(trajinfo.offsets):
                     fh2.seek(off)
                     line = fh2.readline()
                     fh2.seek(offset[ii])
                     line2 = fh2.readline()
+                    self.assertEqual(line, line2, "differs at offset %i (%s != %s)"
+                                     % (ii, off, offset[ii]))
-        finally:
-            os.unlink(fn)
-
-
+
     def test_use_cols(self):
         reader = CSVReader(self.filename1)
         cols = (0, 2)
         it = reader.iterator(chunk=0, return_trajindex=False, cols=cols)
         for x in it:
             np.testing.assert_equal(x, self.data[:, cols])
 
+    def test_newline_at_eof(self):
+        x = "1 2 3\n4 5 6\n\n"
+        desired = np.fromstring(x, sep=" ", dtype=np.float32).reshape(-1, 3)
+        assert len(desired) == 2
+        with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
+            f.write(x)
+            f.close()
+            reader = CSVReader(f.name)
+            result = reader.get_output()[0]
+            np.testing.assert_allclose(result, desired)
+
+    def test_newline_at_eof_with_header(self):
+        with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
+            f.write("#x y z\n1 2 3\n4 5 6\n\n")
+            f.close()
+            desired = np.genfromtxt(f.name, dtype=np.float32).reshape(-1, 3)
+            reader = CSVReader(f.name)
+            result = reader.get_output()[0]
+            np.testing.assert_allclose(result, desired)
+
+    def test_newline_at_eof_carriage_return(self):
+        x = "1 2 3\r\n4 5 6\r\n"
+        desired = np.fromstring(x, sep=" ", dtype=np.float32).reshape(-1, 3)
+        with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
+            f.write(x)
+            f.close()
+            reader = CSVReader(f.name)
+            result = reader.get_output()[0]
+            np.testing.assert_allclose(result, desired)
+
+    def test_holes_in_file(self):
+        x = "1 2 3\n4 5 6\n7 8 9"
+        desired = np.fromstring(x, sep=" ", dtype=np.float32).reshape(-1, 3)
+        with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
+            f.write(x)
+            f.close()
+            reader = CSVReader(f.name)
+            result = reader.get_output()[0]
+            np.testing.assert_allclose(result, desired)
+
 
 if __name__ == '__main__':
     unittest.main()
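All of the new end-of-file tests exercise the same failure mode: an empty row parses to zero columns and cannot be stacked with the data rows. A few lines of plain numpy reproduce it without pyemma:

    import numpy as np

    lines = ["1 2 3\n", "4 5 6\n", "\n"]   # file with a blank line at EOF
    rows = [l.split() for l in lines]       # the last row becomes []

    # np.vstack(rows) would raise ValueError here, since the rows have
    # 3, 3 and 0 columns. Dropping falsy rows first, as the patched
    # _convert_to_np_chunk does, fixes the conversion:
    rows = list(filter(bool, rows))
    data = np.vstack(rows).astype(float)
    print(data.shape)                       # (2, 3)

The same blank lines are excluded from the offset index by the diff > 2 mask in _calc_offsets, so chunk contents and the reported trajectory length stay consistent.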
diff --git a/requirements-build-doc.txt b/requirements-build-doc.txt
index c565a2319..0a52abd04 100644
--- a/requirements-build-doc.txt
+++ b/requirements-build-doc.txt
@@ -1,6 +1,6 @@
 # additional for building the sphinx doc and ipython notebook rst conversion
 matplotlib>=1.3.1
--e git://github.com/sphinx-doc/sphinx@stable#egg=sphinx
+sphinx>=1.3.5
 ipython>=2.1.0
 # sphinx_rtd_theme from git
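For context, a hedged end-to-end sketch of how the patched reader is driven, mirroring the usage in the tests (the temporary file and sizes are illustrative; the import path for PyCSVReader is the one used by _handle_csv above, and CSVReader in the tests appears to be the same class):

    import os
    import tempfile

    import numpy as np

    from pyemma.coordinates.data import PyCSVReader

    # write a small whitespace-delimited file, as the tests do via np.savetxt
    data = np.arange(30).reshape(-1, 3)
    fn = os.path.join(tempfile.mkdtemp(), 'data.dat')
    np.savetxt(fn, data)

    reader = PyCSVReader(fn, chunksize=5)   # the delimiter is auto-sniffed
    out = reader.get_output()[0]            # whole file as one ndarray
    np.testing.assert_allclose(out, data)

    # skipping initial frames, as test_read_with_skipping_first_few_couple_lines does
    skipped = reader.get_output(skip=2)[0]
    assert len(skipped) == len(data) - 2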