diff --git a/streaming/language_support/python/pymongo_hadoop/__init__.py b/streaming/language_support/python/pymongo_hadoop/__init__.py index 7920a603..24264393 100644 --- a/streaming/language_support/python/pymongo_hadoop/__init__.py +++ b/streaming/language_support/python/pymongo_hadoop/__init__.py @@ -1,10 +1,10 @@ import sys -from input import BSONInput, KeyValueBSONInput -from output import BSONOutput, KeyValueBSONOutput -from reducer import BSONReducer, BSONReducerInput -from reducer import KeyValueBSONReducer, KeyValueBSONReducerInput -from mapper import BSONMapper, KeyValueBSONMapper +from .input import BSONInput, KeyValueBSONInput +from .output import BSONOutput, KeyValueBSONOutput +from .reducer import BSONReducer, BSONReducerInput +from .reducer import KeyValueBSONReducer, KeyValueBSONReducerInput +from .mapper import BSONMapper, KeyValueBSONMapper __all__ = ['BSONInput', 'BSONOutput', 'KeyValueBSONOutput', 'KeyValueBSONInput', @@ -13,5 +13,5 @@ def dump_bits(bits): for bit in bits: - print >> sys.stderr, "\t * Bit: %s Ord: %d" % (hex(ord(bit)), ord(bit)) + print("\t * Bit: %s Ord: %d" % (hex(ord(bit)), ord(bit)), file=sys.stderr) diff --git a/streaming/language_support/python/pymongo_hadoop/input.py b/streaming/language_support/python/pymongo_hadoop/input.py index f8516c7c..a7770afe 100644 --- a/streaming/language_support/python/pymongo_hadoop/input.py +++ b/streaming/language_support/python/pymongo_hadoop/input.py @@ -14,7 +14,7 @@ class BSONInput(object): https://github.com/klbostee/typedbytes """ - def __init__(self, fh=sys.stdin, unicode_errors='strict'): + def __init__(self, fh=sys.stdin.buffer, unicode_errors='strict'): self.fh = fh self.unicode_errors = unicode_errors self.eof = False @@ -26,11 +26,11 @@ def _read(self): data = size_bits + self.fh.read(size) if len(data) != size + 4: raise struct.error("Unable to cleanly read expected BSON Chunk; EOF, underful buffer or invalid object size.") - if data[size + 4 - 1] != "\x00": + if data[size + 4 - 1] != 0: raise InvalidBSON("Bad EOO in BSON Data") doc = BSON(data).decode(codec_options=STREAMING_CODEC_OPTIONS) return doc - except struct.error, e: + except struct.error as e: #print >> sys.stderr, "Parsing Length record failed: %s" % e self.eof = True raise StopIteration(e) @@ -38,8 +38,8 @@ def _read(self): def read(self): try: return self._read() - except StopIteration, e: - print >> sys.stderr, "Iteration Failure: %s" % e + except StopIteration as e: + print("Iteration Failure: %s" % e, file=sys.stderr) return None def _reads(self): @@ -56,8 +56,8 @@ class KeyValueBSONInput(BSONInput): def read(self): try: doc = self._read() - except StopIteration, e: - print >> sys.stderr, "Key/Value Input iteration failed/stopped: %s" % e + except StopIteration as e: + print("Key/Value Input iteration failed/stopped: %s" % e, file=sys.stderr) return None if '_id' in doc: return doc['_id'], doc @@ -66,7 +66,7 @@ def read(self): def reads(self): it = self._reads() - n = it.next + n = it.__next__ while 1: doc = n() if '_id' in doc: diff --git a/streaming/language_support/python/pymongo_hadoop/mapper.py b/streaming/language_support/python/pymongo_hadoop/mapper.py index 90753d2a..5d6e1374 100644 --- a/streaming/language_support/python/pymongo_hadoop/mapper.py +++ b/streaming/language_support/python/pymongo_hadoop/mapper.py @@ -1,5 +1,5 @@ -from input import BSONInput, KeyValueBSONInput -from output import BSONOutput, KeyValueBSONOutput +from .input import BSONInput, KeyValueBSONInput +from .output import BSONOutput, KeyValueBSONOutput class BSONMapper(object): """Wraps BSONInput to allow writing mapper functions diff --git a/streaming/language_support/python/pymongo_hadoop/output.py b/streaming/language_support/python/pymongo_hadoop/output.py index fbb9b0d8..d3c9f86b 100644 --- a/streaming/language_support/python/pymongo_hadoop/output.py +++ b/streaming/language_support/python/pymongo_hadoop/output.py @@ -10,7 +10,7 @@ class BSONOutput(object): https://github.com/klbostee/typedbytes """ - def __init__(self, fh=sys.stdout, unicode_errors='strict'): + def __init__(self, fh=sys.stdout.buffer, unicode_errors='strict'): self.fh = fh self.unicode_errors = unicode_errors diff --git a/streaming/language_support/python/pymongo_hadoop/reducer.py b/streaming/language_support/python/pymongo_hadoop/reducer.py index b11ffb57..be69b6e5 100644 --- a/streaming/language_support/python/pymongo_hadoop/reducer.py +++ b/streaming/language_support/python/pymongo_hadoop/reducer.py @@ -3,8 +3,8 @@ https://github.com/klbostee/dumbo """ from itertools import groupby -from input import BSONInput, KeyValueBSONInput -from output import BSONOutput, KeyValueBSONOutput +from .input import BSONInput, KeyValueBSONInput +from .output import BSONOutput, KeyValueBSONOutput import sys @@ -55,7 +55,7 @@ def __init__(self, factory=None, input_fh=None): def default_reducer(data): - print >> sys.stderr, "*** Invoking default reducer function, this is unoptimized for your data and may be very slow." + print("*** Invoking default reducer function, this is unoptimized for your data and may be very slow.", file=sys.stderr) class BSONReducerInput(BSONInput): """Wrapper to 'roll up' the reduce data down to just