forked from Ontraport/Backend-Test
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcompressor.py
More file actions
107 lines (103 loc) · 5.97 KB
/
compressor.py
File metadata and controls
107 lines (103 loc) · 5.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from collections import abc
from typing import Union
class ObjectCompressor:
    """
    Flatten nested containers into a one-level dictionary and expand them back.

    Note: Tested with Python v 3.7.3
    This is a class containing compress and decompress functions as described
    on https://github.com/Ontraport/Backend-Test

    Flattened keys are the "/"-joined path of mapping keys, sequence indices
    and attribute names leading to each leaf value.
    """

    def compress(self, object_to_compress, path_so_far: str = "",
                 dictionary: dict = None) -> Union[dict, list]:
        """
        Flatten a nested container into a single-level dictionary.

        Mappings contribute their keys to the path, other collections
        contribute the position of each element, and any other object
        contributes the names returned by dir() that do not start with "__".

        :param object_to_compress: the container (or custom object) to flatten
        :param path_so_far: path prefix of the object inside the outer
            container; empty for the top-level call
        :param dictionary: accumulator the flattened entries are written to;
            a fresh dictionary is created when omitted
        :return: the accumulator dictionary, or an empty list when the
            top-level object is an empty non-mapping collection
            (a top level empty Array becomes an empty Array)
        """
        if dictionary is None:
            dictionary = {}

        # Every Mapping is also a Collection, so it has to be tested first.
        if isinstance(object_to_compress, abc.Mapping):
            # len() rather than truthiness: the ABC only guarantees __len__.
            if len(object_to_compress) > 0:
                for attribute, value in object_to_compress.items():
                    self.save_or_recurse(
                        path_so_far + "/" + str(attribute), dictionary, value)
            elif path_so_far:
                # Keep nested empty mappings; a top-level one adds nothing.
                self.save_or_recurse(path_so_far, dictionary, {}, True)
        elif isinstance(object_to_compress, abc.Collection):
            if len(object_to_compress) > 0:
                # Iterate instead of indexing so that collections without
                # __getitem__ (e.g. sets) are flattened too; for those the
                # index is simply the iteration position.
                for index, value in enumerate(object_to_compress):
                    self.save_or_recurse(
                        path_so_far + "/" + str(index), dictionary, value)
            elif path_so_far:
                # Keep nested empty collections as an empty list.
                self.save_or_recurse(path_so_far, dictionary, [], True)
            else:
                return []
        else:
            # Custom object: walk every attribute whose name is not "__"-prefixed.
            attribute_names = [
                name for name in dir(object_to_compress)
                if not name.startswith('__')
            ]
            if attribute_names:
                for name in attribute_names:
                    value = getattr(object_to_compress, str(name))
                    self.save_or_recurse(
                        path_so_far + "/" + str(name), dictionary, value)
            elif path_so_far:
                # Nested object without such attributes is stored as {}.
                self.save_or_recurse(path_so_far, dictionary, {}, True)
        return dictionary

    def save_or_recurse(self, path_so_far: str, dictionary: dict, value,
                        force_insert: bool = False) -> Union[dict, list]:
        """
        Store a leaf value under its path, or flatten a nested value further.

        This helper function was introduced to minimize duplicate code in the
        compress(...) method.

        :param path_so_far: path of ``value``; one leading "/" is removed
        :param dictionary: accumulator the flattened entries are written to
        :param value: the value found at ``path_so_far``
        :param force_insert: store ``value`` as-is even when it is not a
            float, int, str, bool or None (used for empty containers)
        :return: the accumulator, or whatever compress(...) returns when the
            value is flattened recursively
        """
        # Paths are built as prefix + "/" + name, so a top-level entry
        # arrives with a leading separator that must not end up in the key.
        if path_so_far.startswith("/"):
            path_so_far = path_so_far[1:]

        is_leaf = isinstance(value, (float, int, str, bool, type(None)))
        if not is_leaf and force_insert is False:
            return self.compress(value, path_so_far, dictionary)

        dictionary[path_so_far] = value
        return dictionary

    def decompress(self, compressed_object: dict) -> Union[dict, list]:
        """
        Expand a flattened container back into nested dictionaries and lists.

        NOTE: If a custom object was flattened, decompress will still return
        a dictionary representation.
        NOTE: A path token made only of decimal digits is treated as a list
        index when its container is created, so mapping keys that look like
        numbers come back as list positions.

        :param compressed_object: mapping of "/"-joined paths to leaf values,
            or an empty collection
        :return: the nested structure; an empty non-mapping collection is
            returned unchanged; None for any other input
        """
        if isinstance(compressed_object, abc.Mapping):
            result = {}
            for path, value in compressed_object.items():
                tokens = path.split("/")
                last = len(tokens) - 1

                # The first key decides whether the top level is a list.
                # Only switch while nothing has been stored yet, otherwise
                # entries that were already expanded would be thrown away.
                # isdecimal() (not isnumeric()) guarantees int() can parse it.
                if (tokens[0].isdecimal()
                        and isinstance(result, abc.Mapping)
                        and not result):
                    result = []

                node = result
                for i, token in enumerate(tokens):
                    if isinstance(node, abc.MutableSequence):
                        # Inside a list only index tokens are meaningful.
                        if token.isdecimal():
                            if int(token) < len(node):
                                # Element already exists: descend into it.
                                node = node[int(token)]
                            else:
                                # New element: its type depends on what
                                # follows in the path.
                                if i < last and tokens[i + 1].isdecimal():
                                    node.append([])
                                elif i == last:
                                    node.append(value)
                                else:
                                    node.append({})
                                node = node[-1]
                    else:
                        if i < last:
                            if token not in node:
                                # An index as next token means an
                                # intermediate list, otherwise a mapping.
                                if tokens[i + 1].isdecimal():
                                    node[token] = []
                                else:
                                    node[token] = {}
                            node = node[token]
                        else:
                            node[token] = value
            return result

        # An empty top-level list was flattened to itself; hand it back.
        if (isinstance(compressed_object, abc.Collection)
                and len(compressed_object) == 0):
            return compressed_object
        return None