|
1 | 1 | """The ExpenseDocument class is the object representation of an AnalyzeID response. It is similar to a dictionary. Despite its name it does not inherit from Document as the AnalyzeID response does not contain position information.""" |
2 | 2 |
|
3 | | -import os |
4 | | -from typing import List, Dict, Union |
5 | | -from textractor.entities.bbox import SpatialObject |
6 | | -from textractor.entities.expense_field import ExpenseField |
| 3 | +from collections import defaultdict |
| 4 | +from typing import List |
7 | 5 |
|
8 | | -from textractor.exceptions import InputError |
| 6 | +from textractor.data.constants import AnalyzeExpenseFieldsGroup as AEFieldsGroup, AnalyzeExpenseFields as AEFields |
| 7 | +from textractor.entities.expense_field import ExpenseField, LineItemGroup, BoundingBox, DocumentEntity |
9 | 8 |
|
10 | 9 |
|
11 | | -class ExpenseDocument(SpatialObject): |
class Fields(dict):
    """
    Dictionary to hold Summary Fields.

    Keys are field names and each value is an iterable of the fields stored
    under that name. Every member of ``AEFields`` is additionally exposed as
    a read-only property named after the member, to ease discovery; the
    property returns ``self.get(<member name>)``, i.e. ``None`` when no
    field of that name has been stored.
    """

    def __init__(self):
        super().__init__()
        # Properties only work when defined on the class, so they are attached
        # to the class (not the instance) whenever an instance is created.
        owner = type(self)
        for field in AEFields:
            # ``field=field`` freezes the current member; a plain closure
            # would be late-bound and every property would read the last one.
            setattr(
                owner,
                field.name,
                property(lambda self, field=field: self.get(field.name)),
            )

    def __repr__(self):
        """
        Render one ``key:`` line per entry followed by its fields.

        Each field is written on its own line, indented by four spaces, with
        embedded newlines escaped as ``\\n`` so a field never spans lines.
        A key with no fields still gets its own line, so consecutive keys
        are never glued together.

        :return: The multi-line text representation ("" when empty)
        """
        lines = []
        for key, value in self.items():
            lines.append(f"{key}:")
            lines.extend(
                "    " + str(field).replace("\n", "\\n") for field in value
            )
        return "".join(line + "\n" for line in lines)
| 33 | + |
class FieldsGroups(dict):
    """
    Summary Fields Group dictionary
    {GROUP_KEY_NAME: {GROUP_ID_1: [SUMMARY_FIELD1, SUMMARY_FIELD2]}}

    Every member of ``AEFieldsGroup`` is also exposed as a read-only property
    named after the member, returning ``self.get(<member name>)``.
    """

    def __init__(self):
        super().__init__()
        # Properties must be installed on the class to take effect.
        owner = type(self)
        for member in AEFieldsGroup:
            # Default argument binds the current member to this property.
            setattr(
                owner,
                member.name,
                property(lambda self, member=member: self.get(member.name)),
            )

    def __repr__(self):
        """
        Render every group key followed by the fields of each of its groups.

        Fields are written one per line with embedded newlines escaped as
        ``\\n``; a blank line closes each group and another closes each key.
        """
        chunks = []
        for name, groups_by_id in self.items():
            chunks.append(f"{name}: \n")
            for members in groups_by_id.values():
                chunks.extend(
                    " " + str(member).replace("\n", "\\n") + "\n"
                    for member in members
                )
                chunks.append("\n")
            chunks.append("\n")
        return "".join(chunks)

    def get_group_bboxes(self, key: str):
        """
        Return the enclosing bboxes for each group for a given group key

        :param key: Group key e.g VENDOR
        :return: One ``BoundingBox.enclosing_bbox`` result per group stored
                 under ``key``; an empty list when the key is absent
        """
        return [
            BoundingBox.enclosing_bbox([member.bbox for member in members])
            for members in self.get(key, {}).values()
        ]
| 66 | + |
| 67 | + |
class ExpenseDocument(DocumentEntity):
    """
    Represents the description of a single expense document.

    Summary fields are indexed three ways at construction time:
    ``summary_fields`` (by ``field.type.text``), ``_unnormalized_fields``
    (by ``field.key.text``, or "" when the field has no key) and
    ``summary_groups`` (by group type, then group id).
    """

    def __init__(
        self,
        summary_fields: List[ExpenseField],
        line_items_groups: List[LineItemGroup],
        bounding_box: BoundingBox,
        page: int,
    ):
        """
        :param summary_fields: List of ExpenseFields, not including line item ones
        :param line_items_groups: Groups of Line Item tables
        :param bounding_box: The bounding box for that ExpenseDocument
        :param page: The page where that document is
        """
        super().__init__('', bbox=bounding_box)
        self._summary_fields_list = summary_fields
        self._line_items_groups = line_items_groups
        self.summary_fields = Fields()
        self.summary_groups = FieldsGroups()
        self._unnormalized_fields = defaultdict(list)
        self._assign_summary_fields()
        self._page = page

    @property
    def page(self):
        """
        :return: The page value given at construction
        """
        return self._page

    @property
    def bbox(self):
        """
        Bounding box enclosing every summary field and line item group.

        Recomputed on each access from the current fields and groups.
        NOTE(review): relies on ``self._bbox`` having been set by
        ``DocumentEntity.__init__`` from ``bounding_box`` - confirm.

        :return: Result of ``BoundingBox.enclosing_bbox`` over all parts
        """
        parts = [field.bbox for field in self._summary_fields_list]
        parts += [group.bbox for group in self._line_items_groups]
        return BoundingBox.enclosing_bbox(
            parts, spatial_object=self._bbox.spatial_object
        )

    def _assign_summary_fields(self):
        """
        Populate ``summary_fields``, ``_unnormalized_fields`` and
        ``summary_groups`` from the list of summary fields.
        """
        for field in self._summary_fields_list:
            # Normalized index: several fields may share the same type.
            self.summary_fields.setdefault(field.type.text, []).append(field)

            # Unnormalized index, using the key as provided ("" when absent).
            key = field.key.text if field.key else ""
            self._unnormalized_fields[key].append(field)

            # If the field is part of a group, register it under every type
            # of that group, keyed by the group id.
            for group_properties in field.group_properties:
                for property_type in group_properties.types:
                    groups_by_id = self.summary_groups.setdefault(property_type, {})
                    groups_by_id.setdefault(group_properties.id, []).append(field)

    @property
    def summary_fields_list(self):
        """
        :return: The summary fields list given at construction
        """
        return self._summary_fields_list

    @property
    def line_items_groups(self) -> List[LineItemGroup]:
        """
        :return: The line item groups given at construction
        """
        return self._line_items_groups

    def __repr__(self) -> str:
        """
        Summarize the document: number of distinct summary field names,
        then one ``index N: M row(s)`` entry per line item group.

        With at most one group the entry stays on the ``Line Item Groups:``
        line; with several groups each entry is written on its own line.
        """
        groups = self.line_items_groups
        entries = [
            f"index {group.index}: {len(group.rows)} row{'s' if len(group.rows) > 1 else ''}"
            for group in groups
        ]
        separator = "\n" if len(groups) > 1 else " "
        # Entries are newline-joined so multiple groups never run together.
        return (
            f"Summary fields: {len(self.summary_fields)}\n"
            "Line Item Groups:" + separator + "\n".join(entries)
        )
| 138 | + |
0 commit comments