-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathfile_data.py
116 lines (91 loc) · 3.75 KB
/
file_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import json
from pathlib import Path
from typing import Any, Optional
from uuid import NAMESPACE_DNS, uuid5
from pydantic import BaseModel, Field, ValidationError, field_validator, model_validator
from unstructured_ingest.v2.logger import logger
class SourceIdentifiers(BaseModel):
    """Identifies a source file by name, full path, and optional relative path."""

    filename: str
    fullpath: str
    rel_path: Optional[str] = None

    @property
    def filename_stem(self) -> str:
        """The filename with its final suffix removed."""
        stem = Path(self.filename).stem
        return stem

    @property
    def relative_path(self) -> str:
        """The recorded relative path, falling back to the full path."""
        if self.rel_path:
            return self.rel_path
        return self.fullpath
class FileDataSourceMetadata(BaseModel):
    # Provenance metadata captured from the source connector.
    # Every field is optional; connectors populate whatever they can determine.
    url: Optional[str] = None  # source URL, when the connector exposes one
    version: Optional[str] = None  # source-side version/revision marker
    record_locator: Optional[dict[str, Any]] = None  # connector-specific lookup data
    date_created: Optional[str] = None  # dates kept as strings; format set by the connector
    date_modified: Optional[str] = None
    date_processed: Optional[str] = None
    permissions_data: Optional[list[dict[str, Any]]] = None  # presumably ACL-style entries — shape set by connector
    filesize_bytes: Optional[int] = None
class FileData(BaseModel):
    """Describes a single file moving through the ingest pipeline.

    Carries the file's stable identifier, the connector that produced it,
    where it came from, and any source metadata, and supports JSON
    round-tripping via ``from_file``/``to_file``.
    """

    identifier: str
    connector_type: str
    source_identifiers: SourceIdentifiers
    # Pass the class itself as the factory; the lambda wrapper was redundant.
    metadata: FileDataSourceMetadata = Field(default_factory=FileDataSourceMetadata)
    additional_metadata: dict[str, Any] = Field(default_factory=dict)
    reprocess: bool = False
    local_download_path: Optional[str] = None
    display_name: Optional[str] = None

    @classmethod
    def from_file(cls, path: str) -> "FileData":
        """Load and validate an instance from a JSON file on disk.

        Raises:
            ValueError: if the path does not exist or is not a regular file.
            pydantic.ValidationError: if the JSON does not match the model.
        """
        file_path = Path(path).resolve()
        # is_file() is False for nonexistent paths, so it covers exists() too.
        if not file_path.is_file():
            raise ValueError(f"file path not valid: {file_path}")
        with open(file_path, "rb") as f:
            file_data_dict = json.load(f)
        return cls.model_validate(file_data_dict)

    @classmethod
    def cast(cls, file_data: "FileData", **kwargs) -> "FileData":
        """Re-validate another FileData (or subclass) instance as this class."""
        return cls.model_validate(file_data.model_dump(), **kwargs)

    def to_file(self, path: str) -> None:
        """Serialize this model as indented JSON, creating parent dirs as needed."""
        file_path = Path(path).resolve()
        file_path.parent.mkdir(parents=True, exist_ok=True)
        with open(file_path, "w") as f:
            json.dump(self.model_dump(), f, indent=2)
class BatchItem(BaseModel):
    # A single entry inside a batch of files processed together.
    identifier: str  # unique within its batch
    version: Optional[str] = None  # optional source-side version marker
class BatchFileData(FileData):
    """FileData variant describing a batch of items processed as one unit.

    The identifier is derived deterministically from the batch items, so the
    same batch always yields the same identifier.
    """

    # Derived by populate_identifier; not meant to be supplied by callers.
    identifier: str = Field(init=False)
    batch_items: list[BatchItem]
    # A batch has no single source file, so this becomes optional here.
    source_identifiers: Optional[SourceIdentifiers] = None

    @field_validator("batch_items")
    @classmethod
    def check_batch_items(cls, v: list[BatchItem]) -> list[BatchItem]:
        """Reject empty or duplicate-bearing batches; return items sorted by identifier."""
        if not v:
            raise ValueError("batch items cannot be empty")
        all_identifiers = [item.identifier for item in v]
        if len(all_identifiers) != len(set(all_identifiers)):
            raise ValueError(f"duplicate identifiers: {all_identifiers}")
        return sorted(v, key=lambda item: item.identifier)

    @model_validator(mode="before")
    @classmethod
    def populate_identifier(cls, data: Any) -> Any:
        """Derive a stable UUID5 identifier from the batch items when absent.

        Runs with mode="before", so batch items may still be raw dicts (e.g.
        data loaded from JSON or produced by model_dump) rather than BatchItem
        instances; both forms are handled. A missing "batch_items" key is left
        for field validation to report as a proper ValidationError instead of
        raising KeyError here.
        """
        if isinstance(data, dict) and "identifier" not in data:
            batch_items = data.get("batch_items")
            if batch_items:
                item_versions: dict[str, Any] = {}
                for item in batch_items:
                    if isinstance(item, dict):
                        item_versions[item.get("identifier")] = item.get("version")
                    else:
                        item_versions[item.identifier] = item.version
                identifier_data = json.dumps(item_versions, sort_keys=True)
                data["identifier"] = str(uuid5(NAMESPACE_DNS, identifier_data))
        return data
def file_data_from_file(path: str) -> FileData:
    """Load file data from a JSON file, preferring the batch variant.

    Attempts to parse the file as BatchFileData first; if that fails model
    validation, falls back to plain FileData.
    """
    try:
        batch = BatchFileData.from_file(path=path)
    except ValidationError:
        logger.debug(f"{path} not detected as batch file data")
    else:
        return batch
    return FileData.from_file(path=path)
def file_data_from_dict(data: dict) -> FileData:
    """Validate a raw dict as file data, preferring the batch variant.

    Attempts BatchFileData validation first; if that fails, falls back to
    plain FileData validation.
    """
    try:
        batch = BatchFileData.model_validate(data)
    except ValidationError:
        logger.debug(f"{data} not valid for batch file data")
        return FileData.model_validate(data)
    return batch