Skip to content
This repository has been archived by the owner on Sep 11, 2023. It is now read-only.

Commit

Permalink
Csv fixes
Browse files Browse the repository at this point in the history
Fixes #702 and several other issues.
  • Loading branch information
marscher committed Apr 5, 2016
1 parent 3fccfa7 commit 71b7713
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 94 deletions.
264 changes: 183 additions & 81 deletions pyemma/coordinates/data/py_csv_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import csv
import os

import six

from pyemma._base.logging import Loggable
from pyemma.coordinates.data._base.datasource import DataSourceIterator, DataSource
from pyemma.coordinates.data.util.traj_info_cache import TrajInfo
from six.moves import range
Expand Down Expand Up @@ -71,6 +74,7 @@ def _next_chunk(self):
lines.append(row)
if self.chunksize != 0 and len(lines) % self.chunksize == 0:
result = self._convert_to_np_chunk(lines)
del lines[:] # free some space
if self._t >= traj_len:
self._next_traj()
return result
Expand Down Expand Up @@ -102,20 +106,25 @@ def _next_traj(self):
dialect=self._data_source._get_dialect(self._itraj))

def _convert_to_np_chunk(self, list_of_strings):
    """Convert buffered csv rows (lists of strings) to a float ndarray.

    Parameters
    ----------
    list_of_strings : list of list of str
        rows as produced by the csv reader; empty rows are discarded.

    Returns
    -------
    result : ndarray(dtype=float)
        parsed chunk, restricted to self._custom_cols if set.

    Raises
    ------
    ValueError
        if an entry can not be parsed as float; the message names the
        offending file, line and the csv dialect used for parsing.
    """
    # filter empty rows (e.g. produced by trailing newlines)
    list_of_strings = list(filter(bool, list_of_strings))
    stack_of_strings = np.vstack(list_of_strings)
    if self._custom_cols:
        stack_of_strings = stack_of_strings[:, self._custom_cols]
    try:
        result = stack_of_strings.astype(float)
    except ValueError:
        # pinpoint the offending value to give a helpful error message
        fn = self._file_handle.name
        dialect_str = _dialect_to_str(self._reader.dialect)
        for idx, line in enumerate(list_of_strings):
            for value in line:
                try:
                    float(value)
                except ValueError as ve:
                    s = str("Invalid entry in file {fn}, line {line}: {error}."
                            " Used dialect to parse: {dialect}").format(
                                fn=fn, line=self._t + idx, error=repr(ve),
                                dialect=dialect_str)
                    raise ValueError(s)
        # astype failed although every single value parses (e.g. ragged
        # rows): re-raise the original error instead of falling through to
        # an unbound `result` (NameError) below.
        raise
    self._t += len(list_of_strings)
    return result
Expand All @@ -142,7 +151,7 @@ def _open_file(self):
else:
wanted_frames = np.arange(0, nt, self.stride)
skip_rows = np.setdiff1d(
all_frames, wanted_frames, assume_unique=True)
all_frames, wanted_frames, assume_unique=True)

self._skip_rows = skip_rows
try:
Expand All @@ -154,8 +163,27 @@ def _open_file(self):
raise


def _dialect_to_str(dialect):
    """Render a csv.Dialect as a short human readable string.

    Parameters
    ----------
    dialect : csv.Dialect
        dialect instance (or class) whose settings should be shown.

    Returns
    -------
    str
        e.g. "[CSVDialect delimiter=',', lineterminator='\\r\\n', ...]"
    """
    # build the string directly: the former StringIO buffer plus
    # seek/read round-trip was needless indirection.
    fields = ("delimiter='{delimiter}', lineterminator='{lineterminator}',"
              " skipinitialspace={skipinitialspace}, quoting={quoting},"
              " quotechar={quotechar}, doublequote={doublequote}]")
    return "[CSVDialect " + fields.format(delimiter=dialect.delimiter,
                                          lineterminator=dialect.lineterminator,
                                          skipinitialspace=dialect.skipinitialspace,
                                          quoting=dialect.quoting,
                                          quotechar=dialect.quotechar,
                                          doublequote=dialect.doublequote)


class PyCSVReader(DataSource):
r""" reads tabulated ASCII data
r""" Reader for tabulated ASCII data
This class uses numpy to interpret string data to array data.
Parameters
----------
Expand All @@ -174,7 +202,7 @@ class PyCSVReader(DataSource):
comments: str, list of str or None, default='#'
Lines starting with this char will be ignored, except for first line (header)
converters : dict, optional
converters : dict, optional (Not yet implemented)
A dictionary mapping column number to a function that will convert
that column to a float. E.g., if column 0 is a date string:
``converters = {0: datestr2num}``. Converters can also be used to
Expand All @@ -185,16 +213,21 @@ class PyCSVReader(DataSource):
-----
For reading files with only one column, one needs to specify a delimiter...
"""
DEFAULT_OPEN_MODE = 'r' # read, text, unified-newlines (always \n)
DEFAULT_OPEN_MODE = 'r' # read in text-mode

def __init__(self, filenames, chunksize=1000, delimiters=None, comments='#',
converters=None, **kwargs):
super(PyCSVReader, self).__init__(chunksize=chunksize)
self._is_reader = True

n = len(filenames)
self._comments = self.__parse_args(comments, '#', n)
self._delimiters = self.__parse_args(delimiters, None, n)
if isinstance(filenames, (tuple, list)):
n = len(filenames)
elif isinstance(filenames, six.string_types):
n = 1
else:
raise TypeError("'filenames' argument has to be list, tuple or string")
self._comments = PyCSVReader.__parse_args(comments, '#', n)
self._delimiters = PyCSVReader.__parse_args(delimiters, None, n)
self._converters = converters

# mapping of boolean to indicate if file has an header and csv dialect
Expand All @@ -205,13 +238,14 @@ def __init__(self, filenames, chunksize=1000, delimiters=None, comments='#',
# invoke filename setter
self.filenames = filenames

def __parse_args(self, arg, default, n):
@staticmethod
def __parse_args(arg, default, n):
if arg is None:
return [default]*n
return [default] * n
if isinstance(arg, (list, tuple)):
assert len(arg) == n
return arg
return [arg]*n
return [arg] * n

def _create_iterator(self, skip=0, chunk=0, stride=1, return_trajindex=True, cols=None):
return PyCSVIterator(self, skip=skip, chunk=chunk, stride=stride,
Expand All @@ -224,84 +258,152 @@ def _get_dialect(self, itraj):
def describe(self):
    """One-line summary naming the files this reader reads."""
    return "[CSVReader files={files}]".format(files=self._filenames)

def _determine_dialect(self, fh, length):
    """Determine the csv dialect and header size for a file handle.

    Parameters
    ----------
    fh : file handle
        file handle for which the dialect should be determined.
    length : int
        the previously obtained file length (from _calc_offsets).

    Returns
    -------
    dialect : csv.Dialect
        a Dialect instance which holds things like delimiter etc.
    length : int
        length corrected for an eventually found header (one or
        multiple lines).
    skip : int
        number of header lines to skip.

    Notes
    -----
    As a side effect this method stores the dialect and header size for the
    given file handle in self._dialects[idx] and self._skip[idx], where
    idx = self.filenames.index(fh.name)
    """
    filename = fh.name
    idx = self.filenames.index(filename)
    fh.seek(0)
    skip = 0  # rows to skip (for an eventually found header)
    hdr = 0   # comment lines found at the top of the file; initialized here
              # so `skip += hdr` below is safe in the auto-detect branch too
    if self._delimiters[idx] is None:
        # auto detect delimiter with csv.Sniffer on a sample of three lines
        sample = ''.join(fh.readline() for _ in range(3))
        sniffer = csv.Sniffer()
        try:
            dialect = sniffer.sniff(sample)
        except csv.Error as e:
            s = ('During handling of file "%s" following error occurred:'
                 ' "%s". Sample was "%s"' % (filename, e, sample))
            raise RuntimeError(s)
        if sniffer.has_header(sample):
            skip += 1
    else:
        # delimiter is user given; only guess the line terminator
        sample = fh.readline()
        fh.seek(0)

        class custom_dialect(csv.Dialect):
            delimiter = self._delimiters[idx]
            quotechar = '"'
            # endswith guards against samples shorter than two characters,
            # where sample[-2] raised IndexError.
            # NOTE(review): in text mode universal newlines translate
            # '\r\n' to '\n', so this branch likely only triggers for
            # handles opened in binary mode -- confirm.
            if sample.endswith('\r\n'):
                lineterminator = '\r\n'
            else:
                lineterminator = '\n'
            quoting = csv.QUOTE_MINIMAL

        dialect = custom_dialect()
        dialect.delimiter = self._delimiters[idx]
        # determine header: count leading comment-prefixed lines
        for line in fh:
            if line.startswith(self._comments[idx]):
                hdr += 1
                continue
            else:
                break

    skip += hdr
    length -= skip

    self._dialects[idx] = dialect
    self._skip[idx] = skip

    return dialect, length, skip

@staticmethod
def _calc_offsets(fh):
""" determines byte offsets between all lines
Parameters
----------
fh : file handle
file handle to obtain byte offsets from.
Returns
-------
lengths : int
number of valid (non-empty) lines
offsets : ndarray(dtype=int64)
byte offsets
"""

def new_size(x):
return int(ceil(x * 1.2))
# how to handle mode?
"""
On Windows, tell() can return illegal values (after an fgets()) when
reading files with Unix-style line-endings. Use binary mode ('rb') to
circumvent this problem.
"""
with open(filename, self.DEFAULT_OPEN_MODE) as fh:
# approx by filesize / (first line + 20%)
size = new_size(os.stat(filename).st_size / len(fh.readline()))
assert size > 0
fh.seek(0)
offsets = np.empty(size, dtype=np.int64)
offsets[0] = 0
i = 1

filename = fh.name
fh.seek(0)
# approx by filesize / (first line + 20%)
fh.readline() # skip first line, because it may contain a much shorter header, which will give a bad estimate
size = new_size(os.stat(filename).st_size / len(fh.readline()))
offsets = np.empty(size, dtype=np.int64)
offsets[0] = 0
i = 1
# re-open in binary mode to circumvent a bug in Py3.5 win, where the first offset reported by tell
# overflows int64.
with open(filename, 'rb') as fh:
while fh.readline():
offsets[i] = fh.tell()
i += 1
if i >= len(offsets):
offsets = np.resize(offsets, new_size(len(offsets)))
offsets = offsets[:i]
length = len(offsets) - 1
fh.seek(0)
offsets = offsets[:i]

# filter empty lines (offset between two lines is only 1 or 2 chars)
# insert an diff of 2 at the beginning to match the amount of indices
diff = np.diff(offsets)
mask = diff > 2
mask = np.insert(mask, 0, True)
offsets = offsets[mask]
length = len(offsets) - 1

return length, offsets

@staticmethod
def _get_dimension(fh, dialect, skip):
fh.seek(0)
# if we have a header subtract it from total length
r = csv.reader(fh, dialect=dialect)
for _ in range(skip + 1):
line = next(r)

# obtain dimension from first valid row
try:
arr = np.array(line).astype(float)
except ValueError as ve:
s = 'could not parse first line of data in file "%s"' % fh.name
raise ValueError(s, ve)
s = arr.squeeze().shape
if len(s) == 1:
ndim = s[0]
else:
ndim = 1

return ndim

def _get_traj_info(self, filename):
    """Gather trajectory info (dimension, length, byte offsets) for a csv file.

    Parameters
    ----------
    filename : str
        path of the csv file to inspect.

    Returns
    -------
    TrajInfo
        holding the dimension (columns of the first valid row), the number
        of valid data rows and the byte offsets of all lines.
    """
    # calc byte offsets, csv dialect and dimension (elements in first valid row)
    with open(filename, self.DEFAULT_OPEN_MODE) as fh:
        length, offsets = PyCSVReader._calc_offsets(fh)
        # corrects length for header lines and caches dialect/skip as a
        # side effect in self._dialects / self._skip
        dialect, length, skip = self._determine_dialect(fh, length)
        ndim = PyCSVReader._get_dimension(fh, dialect, skip)

    return TrajInfo(ndim, length, offsets)
14 changes: 13 additions & 1 deletion pyemma/coordinates/data/util/traj_info_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,26 @@ def _set_curr_db_version(self, val):
self._database['db_version'] = val
self._current_db_version = val

def _handle_csv(self, reader, filename, length):
    """Re-determine the csv dialect for a cached file.

    The dialect of csv files is not stored in the database (somewhat ugly,
    but so far unavoidable), so this step has to be re-done whenever a
    cache hit occurs.
    """
    from pyemma.coordinates.data import PyCSVReader
    if isinstance(reader, PyCSVReader):
        with open(filename, PyCSVReader.DEFAULT_OPEN_MODE) as fh:
            reader._determine_dialect(fh, length)

def __getitem__(self, filename_reader_tuple):
filename, reader = filename_reader_tuple
key = self._get_file_hash(filename)
result = None
try:
result = str(self._database[key])
info = create_traj_info(result)
# handle cache misses and not interpreteable results by re-computation.

self._handle_csv(reader, filename, info.length)

# handle cache misses and not interpretable results by re-computation.
# Note: this also handles UnknownDBFormatExceptions!
except KeyError:
info = reader._get_traj_info(filename)
Expand Down
Loading

0 comments on commit 71b7713

Please sign in to comment.