diff --git a/pydantic_schemas/generators/generate_excel_files.py b/pydantic_schemas/generators/generate_excel_files.py
index 3a6ba8d..03fdfec 100644
--- a/pydantic_schemas/generators/generate_excel_files.py
+++ b/pydantic_schemas/generators/generate_excel_files.py
@@ -4,7 +4,7 @@
 
 ei = SchemaInterface()
 
-for metadata_type in ei.get_metadata_types():
+for metadata_type in ei.list_metadata_types():
     filename = f"excel_sheets/{metadata_type.capitalize()}_metadata.xlsx"
     print(f"Writing {metadata_type} outline to {filename}")
     if os.path.exists(filename):
diff --git a/pydantic_schemas/generators/generate_pydantic_schemas.py b/pydantic_schemas/generators/generate_pydantic_schemas.py
index 981124c..516a1b0 100644
--- a/pydantic_schemas/generators/generate_pydantic_schemas.py
+++ b/pydantic_schemas/generators/generate_pydantic_schemas.py
@@ -11,7 +11,6 @@
     "image-schema.json",
     "microdata-schema.json",
     "script-schema.json",
-    "series-schema.json",
     "table-schema.json",
     "timeseries-db-schema.json",
     "timeseries-schema.json",
diff --git a/pydantic_schemas/schema_interface.py b/pydantic_schemas/schema_interface.py
index 5486788..5bf3a13 100644
--- a/pydantic_schemas/schema_interface.py
+++ b/pydantic_schemas/schema_interface.py
@@ -1,4 +1,4 @@
-from typing import Dict, Optional
+from typing import Dict, Optional, Type
 
 from openpyxl import load_workbook
 from pydantic import BaseModel
@@ -8,7 +8,6 @@
     geospatial_schema,
     microdata_schema,
     script_schema,
-    series_schema,
     table_schema,
     timeseries_db_schema,
     timeseries_schema,
@@ -24,7 +23,7 @@ class SchemaInterface:
     """
     Interface with Excel for creating, saving and updating metadata for various types:
-    documents, scripts, series, survey, table, timeseries, timeseries_db, video
+    documents, scripts, survey, table, timeseries, timeseries_db, video
 
     Retrieve pydantic model definitions for each metadata type
     """
 
@@ -34,7 +33,6 @@ class SchemaInterface:
         "geospatial": geospatial_schema.GeospatialSchema,
         # "image":image_schema.ImageDataTypeSchema,
         "script": script_schema.ResearchProjectSchemaDraft,
-        "series": series_schema.Series,
         "survey": microdata_schema.MicrodataSchema,
         "table": table_schema.Model,
         "timeseries": timeseries_schema.TimeseriesSchema,
@@ -47,7 +45,6 @@
         # "geospatial":,
         # "image":,
         "script": write_across_many_sheets,
-        "series": write_to_single_sheet,  # one sheet
         "survey": write_across_many_sheets,
         "table": write_across_many_sheets,
         "timeseries": write_across_many_sheets,
@@ -60,7 +57,6 @@
         # "geospatial":,
         # "image":,
         "script": excel_doc_to_pydantic,
-        "series": excel_single_sheet_to_pydantic,  # one sheet
         "survey": excel_doc_to_pydantic,
         "table": excel_doc_to_pydantic,
         "timeseries": excel_doc_to_pydantic,
@@ -69,21 +65,19 @@
     }
 
     def get_metadata_class(self, metadata_type: str):
-        metadata_type = self._process_metadata_type(metadata_type)
-        if metadata_type not in self._TYPE_TO_SCHEMA:
-            raise NameError(f"{metadata_type} not known, must be one of {list(self._TYPE_TO_SCHEMA.keys())}.")
+        metadata_type = self.standardize_metadata_type_name(metadata_type)
         schema = self._TYPE_TO_SCHEMA[metadata_type]
         return schema
 
-    def template_to_pydantic(self, template: Dict, parent_schema_type: str, name: Optional[str] = None) -> BaseModel:
-        # metadata_type = self._process_metadata_type(parent_schema_type)
-        # schema = self._TYPE_TO_SCHEMA[metadata_type]
+    def template_to_pydantic(
+        self, template: Dict, parent_schema_type: str, name: Optional[str] = None
+    ) -> Type[BaseModel]:
         schema = self.get_metadata_class(parent_schema_type)
         return pydantic_from_template(template, schema, name)
 
-    def get_metadata_types(self):
-        return list(self._TYPE_TO_READER.keys())
+    def list_metadata_types(self):
+        return list(self._TYPE_TO_SCHEMA.keys())
 
     @staticmethod
     def _merge_dicts(base, update):
@@ -123,11 +117,12 @@ def _merge_dicts(base, update):
                 new_dict[key] = base_value
         return new_dict
 
-    @staticmethod
-    def _process_metadata_type(metadata_type: str) -> str:
+    def standardize_metadata_type_name(self, metadata_type: str) -> str:
         metadata_type = metadata_type.lower()
+        metadata_type = metadata_type.replace("-", "_")
         if metadata_type == "microdata" or metadata_type == "survey_microdata":
             metadata_type = "survey"
+        self._raise_if_unsupported_metadata_type(metadata_type=metadata_type)
         return metadata_type
 
     def type_to_outline(self, metadata_type: str, debug: bool = False) -> BaseModel:
@@ -155,8 +150,10 @@ def write_outline_metadata_to_excel(
         Outputs:
             An Excel file into which metadata can be entered
         """
-        metadata_type = self._process_metadata_type(metadata_type)
-        self._raise_if_unsupported_metadata_type(metadata_type=metadata_type)
+        metadata_type = self.standardize_metadata_type_name(metadata_type)
+        if metadata_type == "geospatial":
+            raise NotImplementedError("Geospatial schema contains an infinite loop so cannot be written to excel")
+
         if filename is None:
             filename = f"{metadata_type}_metadata.xlsx"
         if not str(filename).endswith(".xlsx"):
@@ -189,8 +186,9 @@ def save_metadata_to_excel(
         Outputs:
             An Excel file containing the metadata from the pydantic object. This file can be updated as needed.
         """
-        metadata_type = self._process_metadata_type(metadata_type)
-        self._raise_if_unsupported_metadata_type(metadata_type=metadata_type)
+        metadata_type = self.standardize_metadata_type_name(metadata_type)
+        if metadata_type == "geospatial":
+            raise NotImplementedError("Geospatial schema contains an infinite loop so cannot be written to excel")
 
         if filename is None:
             filename = f"{metadata_type}_metadata.xlsx"
@@ -240,7 +238,7 @@ def _get_metadata_type_from_excel_file(filename: str) -> str:
 
         return cell_values[0]
 
-    def read_metadata_excel(self, filename: str) -> BaseModel:
+    def read_metadata_from_excel(self, filename: str) -> BaseModel:
         """
         Read in metadata_type metadata from an appropriately formatted Excel file as a pydantic object.
 
@@ -251,25 +249,13 @@ def read_metadata_excel(self, filename: str) -> BaseModel:
            """
            BaseModel: a pydantic object containing the metadata from the file
        """
         metadata_type = self._get_metadata_type_from_excel_file(filename)
-        metadata_type = self._process_metadata_type(metadata_type)
-        self._raise_if_unsupported_metadata_type(metadata_type=metadata_type)
+        metadata_type = self.standardize_metadata_type_name(metadata_type)
         schema = self._TYPE_TO_SCHEMA[metadata_type]
         reader = self._TYPE_TO_READER[metadata_type]
         read_object = reader(filename, schema)
-        new_ob = self.inflate_read_data_to_schema(metadata_type, read_object)
-        return new_ob
-
-    def inflate_read_data_to_schema(self, metadata_type, read_object):
-        metadata_type = self._process_metadata_type(metadata_type)
-        self._raise_if_unsupported_metadata_type(metadata_type=metadata_type)
         skeleton_object = self.type_to_outline(metadata_type=metadata_type, debug=False)
-        if isinstance(read_object, dict):
-            read_object_dict = read_object
-        elif isinstance(read_object, BaseModel):
-            read_object_dict = read_object.model_dump(exclude_none=True, exclude_unset=True, exclude_defaults=True)
-        else:
-            raise ValueError(f"Expected dict or pydantic BaseModel but got {type(read_object)}")
+        read_object_dict = read_object.model_dump(exclude_none=True, exclude_unset=True, exclude_defaults=True)
         combined_dict = self._merge_dicts(
             skeleton_object.model_dump(),
             read_object_dict,
@@ -284,9 +270,6 @@ def _raise_if_unsupported_metadata_type(self, metadata_type: str):
         If the type is specifically unsupported - geospatial or image - a NotImplementedError is raised
         If the type is simply unknown then a ValueError is raised.
         """
-        metadata_type = self._process_metadata_type(metadata_type)
-        if metadata_type == "geospatial":
-            raise NotImplementedError("Geospatial schema contains an infinite loop so cannot be written to excel")
         if metadata_type == "image":
             raise NotImplementedError("Due to an issue with image metadata schema definition causing __root__ errors")
         if metadata_type not in self._TYPE_TO_SCHEMA.keys():
diff --git a/pydantic_schemas/series_schema.py b/pydantic_schemas/series_schema.py
deleted file mode 100644
index 3c371ae..0000000
--- a/pydantic_schemas/series_schema.py
+++ /dev/null
@@ -1,167 +0,0 @@
-# generated by datamodel-codegen:
-#   filename:  series-schema.json
-#   timestamp: 2024-07-24T21:06:28+00:00
-
-from __future__ import annotations
-
-from typing import List, Optional
-
-from pydantic import Field
-
-from .schema_base_model import SchemaBaseModel
-
-
-class Model(SchemaBaseModel):
-    pass
-
-
-class Alias(SchemaBaseModel):
-    alias: Optional[str] = Field(None, title="Alias")
-
-
-class DefinitionReference(SchemaBaseModel):
-    source: Optional[str] = Field(None, title="Source")
-    uri: str = Field(..., description="URI", title="URI")
-    note: Optional[str] = Field(None, description="Note", title="Note")
-
-
-class RelatedConcept(SchemaBaseModel):
-    name: str = Field(..., title="Name")
-    definition: Optional[str] = Field(None, description="Definition", title="Definition")
-
-
-class Topic(SchemaBaseModel):
-    topic: str = Field(..., title="Topic")
-    vocabulary: Optional[str] = Field(
-        None, description="Name of the controlled vocabulary, if the topic is from a taxonomy.", title="Vocabulary"
-    )
-    uri: Optional[str] = Field(
-        None,
-        description="Link to the controlled vocabulary web page, if the topic is from a taxonomy.",
-        title="Vocabulary URI",
-    )
-
-
-class SeriesDate(SchemaBaseModel):
-    start: Optional[str] = Field(None, title="Start")
-    end: Optional[str] = Field(None, title="End")
-
-
-class GeographicUnit(SchemaBaseModel):
-    name: str = Field(
-        ..., description="Name of the geographic unit e.g. 'World', 'Africa', 'Afghanistan'", title="Location name"
-    )
-    code: Optional[str] = Field(
-        None, description="Code of the geographic unit (for countries, preferred = ISO3 code)", title="Location code"
-    )
-    type: Optional[str] = Field(
-        None, description="Type of geographic unit e.g. country, state, region, province etc", title="Type"
-    )
-
-
-class SerAccessLicense(SchemaBaseModel):
-    type: Optional[str] = Field(None, title="License type")
-    uri: Optional[str] = Field(None, title="URI")
-
-
-class SeriesLink(SchemaBaseModel):
-    type: Optional[str] = Field(None, description="Link types - API, website, etc.", title="Link type")
-    description: Optional[str] = Field(None, title="Description")
-    uri: Optional[str] = Field(None, title="URI")
-
-
-class ApiDocumentation(SchemaBaseModel):
-    """
-    API Documentation
-    """
-
-    description: Optional[str] = Field(None, title="Description")
-    uri: Optional[str] = Field(None, title="URI")
-
-
-class Keyword(SchemaBaseModel):
-    name: Optional[str] = Field(None, title="Keyword")
-    vocabulary: Optional[str] = Field(None, title="Vocabulary")
-    uri: Optional[str] = Field(None, title="URI")
-
-
-class Note(SchemaBaseModel):
-    note: Optional[str] = Field(None, title="Note")
-
-
-class RelatedIndicator(SchemaBaseModel):
-    code: Optional[str] = Field(None, title="Indicator code")
-    label: Optional[str] = Field(None, title="Indicator name")
-    uri: Optional[str] = Field(None, title="URI")
-
-
-class ComplianceItem(SchemaBaseModel):
-    standard: Optional[str] = Field(None, title="Standard name")
-    organization: Optional[str] = Field(None, title="Organization name")
-    uri: Optional[str] = Field(None, title="URI")
-
-
-class SeriesGroup(SchemaBaseModel):
-    name: Optional[str] = Field(None, title="Name")
-    version: Optional[str] = Field(None, title="Version")
-    uri: Optional[str] = Field(None, title="URI")
-
-
-class Series(SchemaBaseModel):
-    idno: str = Field(..., description="Unique series ID", title="Series unique ID")
-    name: str = Field(..., title="Series Name")
-    db_idno: Optional[str] = Field(None, description="Series database ID", title="Database ID")
-    aliases: Optional[List[Alias]] = Field(None, title="Series other names")
-    measurement_unit: Optional[str] = Field(None, title="Series unit of measure")
-    periodicity: Optional[str] = Field(None, title="Periodicity of data")
-    base_period: Optional[str] = Field(None, title="Base period")
-    definition_short: Optional[str] = Field(None, title="Definition short")
-    definition_long: Optional[str] = Field(None, title="Definition long")
-    definition_references: Optional[List[DefinitionReference]] = Field(
-        None,
-        description="URL to standard definition of the indicator (international or national standard)",
-        title="Definition references",
-    )
-    related_concepts: Optional[List[RelatedConcept]] = Field(
-        None, description="Related concepts", title="Related concepts"
-    )
-    methodology: Optional[str] = Field(None, title="Methodology")
-    imputation: Optional[str] = Field(None, title="Imputations")
-    quality_checks: Optional[str] = Field(None, title="Quality control methods")
-    quality_note: Optional[str] = Field(None, title="Note on data quality")
-    series_break: Optional[str] = Field(None, title="Breaks in series")
-    statistical_concept: Optional[str] = Field(None, title="Statistical concept")
-    limitation: Optional[str] = Field(None, title="Limitations and exceptions")
-    topics: Optional[List[Topic]] = Field(None, description="Topics covered by the indicator", title="Topics")
-    relevance: Optional[str] = Field(None, title="Relavance")
-    series_dates: Optional[List[SeriesDate]] = Field(None, title="Series dates")
-    geographic_units: Optional[List[GeographicUnit]] = Field(
-        None,
-        description=(
-            "List of geographic units (regions, countries, states, provinces, etc.) for which data are available in the"
-            " database."
-        ),
-        title="Geographic locations",
-    )
-    aggregation_method: Optional[str] = Field(None, title="Aggregation method")
-    ser_access_license: Optional[SerAccessLicense] = Field(None, title="Access licence")
-    confidentiality: Optional[str] = Field(
-        None, description="Confidentiality statement", title="Confidentiality statement"
-    )
-    confidentiality_status: Optional[str] = Field(None, title="Confidentiality status")
-    confidentiality_note: Optional[str] = Field(None, title="Confidentiality note")
-    series_links: Optional[List[SeriesLink]] = Field(
-        None, description="Links to API calls, websites, etc.", title="Series links"
-    )
-    api_documentation: Optional[ApiDocumentation] = Field(None, description="API Documentation")
-    source: Optional[str] = Field(None, title="Original source")
-    source_note: Optional[str] = Field(None, title="Notes form original source")
-    keywords: Optional[List[Keyword]] = Field(None, description="Keywords")
-    notes: Optional[List[Note]] = Field(None, description="Notes", title="Notes")
-    related_indicators: Optional[List[RelatedIndicator]] = Field(None, description="Related indicators")
-    compliance: Optional[List[ComplianceItem]] = Field(
-        None, description="Compliance with international resolution", title="Compliance with international resolution"
-    )
-    series_groups: Optional[List[SeriesGroup]] = Field(
-        None, description="Series included in groups", title="Series groups"
-    )
diff --git a/pydantic_schemas/tests/test_excel_interface.py b/pydantic_schemas/tests/test_excel_interface.py
index 5d3ef69..4d6203e 100644
--- a/pydantic_schemas/tests/test_excel_interface.py
+++ b/pydantic_schemas/tests/test_excel_interface.py
@@ -15,7 +15,7 @@ def test_metadata(tmpdir, metadata_type):
     )
 
     # Read the metadata back
-    tmp = ei.read_metadata_excel(filename=filename)
+    tmp = ei.read_metadata_from_excel(filename=filename)
 
     # Save the read metadata to a new file
     filename2 = tmpdir.join(f"test_{metadata_type}_2.xlsx")
diff --git a/pydantic_schemas/utils/template_to_pydantic.py b/pydantic_schemas/utils/template_to_pydantic.py
index 5ff28b3..f44d311 100644
--- a/pydantic_schemas/utils/template_to_pydantic.py
+++ b/pydantic_schemas/utils/template_to_pydantic.py
@@ -7,22 +7,28 @@
 def get_child_field_info_from_dot_annotated_name(name, parent_schema):
+    assert isinstance(parent_schema, type(BaseModel)), "get_child_field_info_from_dot_annotated_name"
     name_split = name.split(".")
     for key in name_split[:-1]:
         parent_schema = parent_schema.model_fields[key].annotation
         if is_optional_annotation(parent_schema) or is_list_annotation(parent_schema):
             parent_schema = get_subtype_of_optional_or_list(parent_schema)
+    if not isinstance(parent_schema, type(BaseModel)):
+        raise KeyError(name)
     try:
         child_field_info = parent_schema.model_fields[name_split[-1]]
     except KeyError as e:
-        raise KeyError(name)
+        raise KeyError(name) from e
+    except Exception as e:
+        raise ValueError(f"name={name}, parent_schema={parent_schema}") from e
     return child_field_info
 
 
-def define_simple_element(item, parent_schema, type=str):
+def define_simple_element(item, parent_schema, element_type=str):
+    assert isinstance(parent_schema, type(BaseModel)), "define_simple_element"
     assert (
-        isinstance(item, dict) and "type" in item and item["type"] in ["string", "integer"]
-    ), f"expected string item, got {item}"
+        isinstance(item, dict) and "type" in item and item["type"] in ["string", "text", "integer", "number", "boolean"]
+    ), f"expected string, integer or boolean item, got {item}"
     try:
         child_field_info = get_child_field_info_from_dot_annotated_name(item["key"], parent_schema)
         if "title" in item:
@@ -31,52 +37,58 @@ def define_simple_element(item, parent_schema, type=str):
             child_field_info.description = item["description"]
     except KeyError as e:
         warnings.warn(f"KeyError: {e}. Proceeding since {item['key']} is a string type.", UserWarning)
-        child_field_info = Field(..., title=item["title"], description=item["help_text"])
+        child_field_info = Field(..., title=item["title"])
+        if "help_text" in item:
+            child_field_info.description = item["help_text"]
     if "required" in item and item["required"]:
-        field_type = type, child_field_info
+        field_type = element_type, child_field_info
     else:
         child_field_info.default = None
-        field_type = Optional[type], child_field_info
+        field_type = Optional[element_type], child_field_info
     return {item["key"]: field_type}
 
 
 def get_children_of_props(props, parent_schema) -> Dict[str, Tuple["type_annotation", "field_info"]]:
+    assert isinstance(parent_schema, type(BaseModel)), "get_children_of_props"
     children = {}
     for prop in props:
-        name = prop["prop_key"]
-        try:
-            child_field_info = get_child_field_info_from_dot_annotated_name(name, parent_schema)
-            if "title" in prop:
-                child_field_info.title = prop["title"]
-            if "help_text" in prop:
-                child_field_info.description = prop["help_text"]
-            child_field = child_field_info.annotation, child_field_info
-            children[prop["key"]] = child_field
-        except KeyError as e:
-            if prop["type"] == "string":
-                warnings.warn(f"KeyError: {e}. Proceeding since {name} is a string type.", UserWarning)
-                children.update(define_simple_element(prop, parent_schema=parent_schema))
-            elif prop["type"] == "integer":
-                warnings.warn(f"KeyError: {e}. Proceeding since {name} is an int type.", UserWarning)
-                children.update(define_simple_element(prop, parent_schema=parent_schema, type=int))
-            else:
-                raise KeyError(e) from e
-    children = standardize_keys_in_dict(children, snake_to_pascal=True)
+        if "prop_key" not in prop:
+            children.update(template_type_handler(prop, parent_schema))
+        else:
+            name = prop["prop_key"]
+            try:
+                child_field_info = get_child_field_info_from_dot_annotated_name(name, parent_schema)
+                if "title" in prop:
+                    child_field_info.title = prop["title"]
+                if "help_text" in prop:
+                    child_field_info.description = prop["help_text"]
+                child_field = child_field_info.annotation, child_field_info
+                children[prop["key"]] = child_field
+            except KeyError as e:
+                children.update(template_type_handler(prop, parent_schema))
     return children
 
 
 def define_array_element(item, parent_schema):
+    assert isinstance(parent_schema, type(BaseModel)), "define_array_element"
     assert "type" in item and (
         item["type"] == "array" or item["type"] == "nested_array"
     ), f"expected array item but got {item}"
-    assert "props" in item, f"expected props in item but got {item.keys()}"
     assert "key" in item, f"expected key in item but got {item.keys()}"
-    children = get_children_of_props(item["props"], parent_schema)
-    item_element = create_model(f"{item['key']}_item", **children)
-    return {item["key"]: (List[item_element], item_element)}
+    if "props" not in item:
+        warnings.warn(f"array without props found, assuming array of str: {item}")
+        field_info = Field(..., title=item["title"])
+        if "help_text" in item:
+            field_info.description = item["help_text"]
+        return {item["key"]: (List[str], field_info)}
+    else:
+        children = get_children_of_props(item["props"], parent_schema)
+        item_element = create_model(f"{item['key']}_item", **children)
+        return {item["key"]: (List[item_element], item_element)}
 
 
 def define_simple_array_element(item, parent_schema):
+    assert isinstance(parent_schema, type(BaseModel)), "define_simple_array_element"
     assert (
         isinstance(item, dict) and "type" in item and item["type"] == "simple_array"
     ), f"expected simple_array item, got {item}"
@@ -88,7 +100,9 @@ def define_simple_array_element(item, parent_schema):
             child_field_info.description = item["description"]
     except KeyError as e:
         warnings.warn(f"KeyError: {e}. Proceeding since {item['key']} is a simple_array type.", UserWarning)
-        child_field_info = Field(..., title=item["title"], description=item["help_text"])
+        child_field_info = Field(..., title=item["title"])
+        if "help_text" in item:
+            child_field_info.description = item["help_text"]
     if "required" in item and item["required"]:
         field_type = List[str], child_field_info
     else:
@@ -98,6 +112,7 @@ def define_simple_array_element(item, parent_schema):
 
 
 def define_from_section_container(item, parent_schema):
+    assert isinstance(parent_schema, type(BaseModel)), "define_from_section_container"
     assert (
         isinstance(item, dict) and "type" in item and item["type"] == "section_container"
     ), f"expected section_container got {item}"
@@ -112,32 +127,59 @@ def define_from_section_container(item, parent_schema):
 
 
 def define_group_of_elements(items, parent_schema):
+    assert isinstance(parent_schema, type(BaseModel)), "define_group_of_elements"
     elements = {}
     for i, item in enumerate(items):
-        if item["type"] == "section_container":
-            elements.update(define_from_section_container(item, parent_schema=parent_schema))
-        elif item["type"] == "string":
-            elements.update(define_simple_element(item, parent_schema, str))
-        elif item["type"] == "integer":
-            elements.update(define_simple_element(item, parent_schema, int))
-        elif item["type"] in ["array", "nested_array"]:
-            elements.update(define_array_element(item, parent_schema))
-        elif item["type"] == "simple_array":
-            elements.update(define_simple_array_element(item, parent_schema))
-        elif item["type"] == "section":
-            print(f"encountered section {item['key']}, {item['title']}, ignoring this heirarchy and appending")
-            assert "items" in item, f"section does not contain items, found only {item}"
-            elements.update(define_group_of_elements(item["items"], parent_schema))
+        if "is_custom" in item and item["is_custom"] == True:
+            if "additional" not in elements:
+                elements["additional"] = {}
+            elements["additional"].update(template_type_handler(item, parent_schema))
+            elements["additional"] = standardize_keys_in_dict(elements["additional"], pascal_to_snake=True)
         else:
-            raise NotImplementedError(f"item {i} has type {item['type']}, {item}")
-    elements = standardize_keys_in_dict(elements, snake_to_pascal=True)
+            elements.update(template_type_handler(item, parent_schema))
+    elements = standardize_keys_in_dict(elements, pascal_to_snake=True)
+    if "additional" in elements:
+        additional = elements.pop("additional")
+        additional = create_model("additional", **additional)
+        sub_field = Field(...)
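+        # Field(...) keeps the aggregated custom-field model required; it is
+        # attached below as a single "additional" field on the generated model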
+        sub_field.title = "additional"
+        elements["additional"] = additional, sub_field
     return elements
 
 
-def pydantic_from_template(template: Dict, parent_schema: Type[BaseModel], name: Optional[str] = None) -> BaseModel:
+def template_type_handler(item, parent_schema):
+    assert isinstance(parent_schema, type(BaseModel)), "template_type_handler"
+    if item["type"] == "section_container":
+        return define_from_section_container(item, parent_schema)
+    elif item["type"] in ["string", "text"]:
+        return define_simple_element(item, parent_schema, str)
+    elif item["type"] in ["integer", "number"]:
+        return define_simple_element(item, parent_schema, int)
+    elif item["type"] == "boolean":
+        return define_simple_element(item, parent_schema, bool)
+    elif item["type"] in ["array", "nested_array"]:
+        return define_array_element(item, parent_schema)
+    elif item["type"] == "simple_array":
+        return define_simple_array_element(item, parent_schema)
+    elif item["type"] == "section":
+        warnings.warn(f"encountered section {item['key']}, {item['title']}, ignoring this hierarchy and appending")
+        if "items" in item:
+            return define_group_of_elements(item["items"], parent_schema)
+        elif "props" in item:
+            return define_group_of_elements(item["props"], parent_schema)
+        else:
+            raise ValueError(f"section does not contain items or props, found only {item}")
+    else:
+        raise NotImplementedError(f"type {item['type']}, {item}")
+
+
+def pydantic_from_template(
+    template: Dict, parent_schema: Type[BaseModel], name: Optional[str] = None
+) -> Type[BaseModel]:
+    assert isinstance(parent_schema, type(BaseModel)), "pydantic_from_template"
     assert "items" in template, f"expected 'items' in template but got {list(template.keys())}"
     m = define_group_of_elements(template["items"], parent_schema)
-    m = standardize_keys_in_dict(m, snake_to_pascal=True)
+    m = standardize_keys_in_dict(m, pascal_to_snake=True)
     if name is None:
         if "title" in template:
             name = template["title"]
diff --git a/pydantic_schemas/utils/utils.py b/pydantic_schemas/utils/utils.py
index c96cc8c..1e4a923 100644
--- a/pydantic_schemas/utils/utils.py
+++ b/pydantic_schemas/utils/utils.py
@@ -1,3 +1,4 @@
+import re
 import typing
 from typing import Any, Callable, Dict, List, Optional, Type, Union
 
@@ -109,13 +110,19 @@ def seperate_simple_from_pydantic(ob: BaseModel) -> Dict[str, Dict]:
     return {"simple": simple_children, "pydantic": pydantic_children}
 
 
-def _standardize_keys_in_list_of_possible_dicts(lst: List[any], snake_to_pascal: bool) -> List[Any]:
+def _standardize_keys_in_list_of_possible_dicts(lst: List[any], snake_to_pascal, pascal_to_snake) -> List[Any]:
     new_value = []
     for item in lst:
         if isinstance(item, dict):
-            new_value.append(standardize_keys_in_dict(item, snake_to_pascal))
+            new_value.append(
+                standardize_keys_in_dict(item, snake_to_pascal=snake_to_pascal, pascal_to_snake=pascal_to_snake)
+            )
         elif isinstance(item, list):
-            new_value.append(_standardize_keys_in_list_of_possible_dicts(item, snake_to_pascal))
+            new_value.append(
+                _standardize_keys_in_list_of_possible_dicts(
+                    item, snake_to_pascal=snake_to_pascal, pascal_to_snake=pascal_to_snake
+                )
+            )
         else:
             new_value.append(item)
     return new_value
@@ -127,7 +134,14 @@ def capitalize_first_letter(s):
     return s
 
 
-def standardize_keys_in_dict(d: Dict[str, Any], snake_to_pascal: bool = False) -> Dict[str, Any]:
+def split_on_capitals(s):
+    # Use regular expression to split on capitalized letters
+    return re.findall(r"[a-z]+|[A-Z][a-z]*", s)
+
+
+def standardize_keys_in_dict(
+    d: Dict[str, Any], snake_to_pascal: bool = False, pascal_to_snake: bool = False
+) -> Dict[str, Any]:
     """
     sometimes when field names are also python protected names like 'from' and 'import'
     then we append an underscore to the field name to avoide clashes.
@@ -139,13 +153,17 @@ def standardize_keys_in_dict(d: Dict[str, Any], snake_to_pascal: bool = False) -> Dict[str, Any]:
         new_key = key.replace(" ", "_").rstrip("_")
         new_key = new_key.split(".")[-1]
         if snake_to_pascal:
-            print(f"snake_to_pascal from {new_key}")
             new_key = "".join([capitalize_first_letter(x) for x in new_key.split("_")])
-            print(f"to {new_key}\n")
+        elif pascal_to_snake:
+            new_key = "_".join([x.lower() for x in split_on_capitals(new_key)])
         if isinstance(value, dict):
-            new_value = standardize_keys_in_dict(value, snake_to_pascal=snake_to_pascal)
+            new_value = standardize_keys_in_dict(
+                value, snake_to_pascal=snake_to_pascal, pascal_to_snake=pascal_to_snake
+            )
         elif isinstance(value, list):
-            new_value = _standardize_keys_in_list_of_possible_dicts(value, snake_to_pascal)
+            new_value = _standardize_keys_in_list_of_possible_dicts(
+                value, snake_to_pascal, pascal_to_snake=pascal_to_snake
+            )
         else:
             new_value = value
         new_dict[new_key] = new_value
diff --git a/pyproject.toml b/pyproject.toml
index db66af0..bb6a5d6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,13 +1,13 @@
 [tool.poetry]
 name = "metadataschemas"
-version = "0.1.0"
+version = "0.1.5"
 description = ""
 authors = ["Mehmood Asghar ", "Gordon Blackadder "]
 readme = "README.md"
 packages = [
     { include = "*_schema.py", from = "pydantic_schemas", to = "metadataschemas"},
     { include = "schema_base_model.py", from = "pydantic_schemas", to = "metadataschemas"},
-    { include = "excel_interface.py", from = "pydantic_schemas", to = "metadataschemas"},
+    { include = "schema_interface.py", from = "pydantic_schemas", to = "metadataschemas"},
     { include = "utils", from = "pydantic_schemas", to = "metadataschemas"},
 ]
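
For reference, a minimal sketch of the renamed public surface. Every method called below appears in this diff; the workbook path is illustrative and assumes a previously written file, and the in-repo import path is used (the installed package exposes the same module under metadataschemas per the pyproject packages list):

    from pydantic_schemas.schema_interface import SchemaInterface

    ei = SchemaInterface()

    # The advertised types now come from _TYPE_TO_SCHEMA rather than
    # _TYPE_TO_READER, so "geospatial" is listed even though writing it
    # to Excel still raises NotImplementedError.
    for metadata_type in ei.list_metadata_types():
        print(metadata_type, ei.get_metadata_class(metadata_type).__name__)

    # standardize_metadata_type_name lower-cases, maps hyphens to underscores,
    # collapses the "microdata"/"survey_microdata" aliases to "survey", and now
    # raises on unknown or unsupported types itself.
    assert ei.standardize_metadata_type_name("Survey-Microdata") == "survey"

    # An empty outline as a pydantic object, and the Excel round trip back in.
    outline = ei.type_to_outline(metadata_type="timeseries")
    ts = ei.read_metadata_from_excel(filename="timeseries_metadata.xlsx")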
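The new pascal_to_snake path in standardize_keys_in_dict undoes the PascalCase conversion that snake_to_pascal applies before model creation. A small illustration of the behaviour implied by split_on_capitals (note that its regex silently drops digits from the pieces):

    from pydantic_schemas.utils.utils import standardize_keys_in_dict

    d = {"SeriesDate": {"StartDate": "2020"}}
    print(standardize_keys_in_dict(d, pascal_to_snake=True))
    # {'series_date': {'start_date': '2020'}}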