diff --git a/components/lif/datatypes/schema.py b/components/lif/datatypes/schema.py new file mode 100644 index 0000000..26a9994 --- /dev/null +++ b/components/lif/datatypes/schema.py @@ -0,0 +1,11 @@ +from dataclasses import dataclass +from typing import Any, Dict + +@dataclass +class SchemaField: + """Represents a single schema field, including its path and attributes.""" + + json_path: str + description: str + attributes: Dict[str, Any] + py_field_name: str = "" diff --git a/components/lif/dynamic_models/__init__.py b/components/lif/dynamic_models/__init__.py new file mode 100644 index 0000000..18ec27f --- /dev/null +++ b/components/lif/dynamic_models/__init__.py @@ -0,0 +1,3 @@ +from lif.dynamic_models import core + +__all__ = ["core"] diff --git a/components/lif/dynamic_models/core.py b/components/lif/dynamic_models/core.py new file mode 100644 index 0000000..4257832 --- /dev/null +++ b/components/lif/dynamic_models/core.py @@ -0,0 +1,406 @@ +""" +Reusable module to build nested Pydantic models dynamically +from a list of schema fields (endpoints in the schema tree). + +This module supports building Pydantic models at runtime based on a schema definition, +allowing flexible data validation for various use cases (e.g., query filters, mutations, or full models). +All fields in generated models are Optional and default to None. + +Functions: + build_dynamic_model: Creates nested Pydantic models from schema fields. + build_dynamic_models: Loads schema fields and builds all model variants (filters, mutations, full model). +""" + +import logging +import os +import re +from datetime import date, datetime +from enum import Enum +from typing import Annotated, Any, Dict, List, Optional, Tuple, Type, TypeVar, cast + +from pydantic import BaseModel, ConfigDict, Field + +from lif.datatypes.schema import SchemaField +from lif.schema.core import load_schema_nodes +from lif.string_utils.core import to_pascal_case + + +# ===== Environment and Global Config ===== + +ROOT_NODE: str | None = os.getenv("ROOT_NODE") +OPENAPI_SCHEMA_FILE: str | None = os.getenv("OPENAPI_SCHEMA_FILE") + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +T = TypeVar("T") +ModelDict = Dict[str, Type[BaseModel]] + +#: Map XML-Schema datatypes ➜ native Python types +DATATYPE_MAP: dict[str, type[Any]] = { + "xsd:string": str, + "xsd:decimal": float, + "xsd:integer": int, + "xsd:boolean": bool, + "xsd:date": date, + "xsd:dateTime": datetime, + "xsd:datetime": datetime, + "xsd:anyURI": str, +} + +#: Singleton cache to prevent duplicate Enum definitions +_ENUM_CLASS_CACHE: Dict[str, Type[Enum]] = {} + +_TRUTHY = {"yes", "true", "1"} + + +# ===== Helpers ===== + + +def _is_yes(value: Any) -> bool: + """Check if value is a truthy 'yes' string.""" + return str(value).strip().lower() in _TRUTHY + + +def _to_enum_member(label: str) -> str: + """Convert a string label into a valid Enum member name.""" + key = re.sub(r"\W|^(?=\d)", "_", label.upper()) + return key + + +def make_enum(name: str, values: List[Any]) -> Type[Enum]: + """Return a cached Enum type for the given values.""" + cache_key = f"{name}_{'_'.join(map(str, sorted(values)))}" + if cache_key in _ENUM_CLASS_CACHE: # short-circuit hit + return _ENUM_CLASS_CACHE[cache_key] + + # Ensure value type is compatible with typing stubs and tooling + members: Dict[str, object] = {_to_enum_member(v): v for v in values} + # Use functional API with explicit module for better pickling/introspection + enum_cls = cast(Type[Enum], Enum(name, members, module=__name__)) + # enum_cls = type(name, (Enum,), {"__module__": __name__, **members}) # alternative + _ENUM_CLASS_CACHE[cache_key] = enum_cls + return enum_cls + + +# ===== Core builders ===== + + +def build_dynamic_model( + schema_fields: List[SchemaField], + *, + attribute_flag: str | None = "xQueryable", + model_doc: str = "Data model", + allow_extra: bool = False, + model_suffix: str = "", + all_optional: bool = True, +) -> ModelDict: + """Create nested Pydantic models for the given schema fields. + + Args: + schema_fields (List[SchemaField]): Leaf-level schema nodes. + attribute_flag (str | None): Only include fields whose attributes[flag] is truthy. + Use None to include all fields. + model_doc (str): Base docstring used for generated classes. + allow_extra (bool): If True, allows extra properties (additionalProperties: true). + model_suffix (str): Suffix for model class names (e.g., "Type", "Filter", "Mutation"). + all_optional (bool): If True, all fields will be Optional and default to None. + + Returns: + ModelDict: A mapping { model_name: Pydantic class }. + """ + + # Build a *tree* structure and a quick lookup for leaf nodes. + tree: dict[str, Any] = {} + leaf_by_path: Dict[Tuple[str, ...], SchemaField] = {} + + for sf in schema_fields: + if attribute_flag and not sf.attributes.get(attribute_flag, False): + continue + + parts = sf.json_path.split(".") + node = tree + for i, part in enumerate(parts): + node = node.setdefault(part, {}) + if i == len(parts) - 1: + leaf_by_path[tuple(parts)] = sf + + if not tree: + return {} + + if len(tree) != 1: + raise ValueError(f"All {attribute_flag or 'selected'} json_path values must share a common root") + + # ===== Internal utilities ===== + + def _field_type(sf: SchemaField, name: str) -> Any: + """Translate a SchemaField to a typing type. + + Args: + sf (SchemaField): The schema field. + name (str): Name for enum classes. + + Returns: + Any: The Python type or Enum for the field. + """ + if "enum" in sf.attributes: + base: Any = make_enum(name.capitalize(), sf.attributes["enum"]) + else: + base = DATATYPE_MAP.get(sf.attributes.get("dataType", "xsd:string"), str) + + if _is_yes(sf.attributes.get("array", "No")): + base = List[base] + + if all_optional: + return Optional[base] + return base + + def _wrap_root(root_name: str, inner: Type[BaseModel], is_array: bool) -> Type[BaseModel]: + """Create a top-level Pydantic wrapper model with the specified root name. + + Args: + root_name (str): Root field name. + inner (Type[BaseModel]): The inner model class. + is_array (bool): If True, wraps the model in a List[]. + + Returns: + Type[BaseModel]: The generated wrapper model. + """ + field_type: Any = List[inner] if is_array else inner + annotations = {root_name: field_type} + doc = f"Top-level wrapper with `{root_name}` field." if root_name else "Top-level wrapper." + class_name = f"{root_name.capitalize()}{model_suffix}" + namespace = { + "__annotations__": annotations, + "__doc__": doc, + "model_config": ConfigDict(strict=False, extra="allow" if allow_extra else "forbid"), + } + # Only set default if all_optional + if all_optional: + namespace[root_name] = None + return type(class_name, (BaseModel,), namespace) + + # ===== Recursive Model Builder ===== + + models: ModelDict = {} + + def strip_root(parts): + """Remove the root node from the path.""" + if parts and parts[0].lower() == root_name.lower(): + return parts[1:] + return parts + + def _build_model(name: str, subtree: dict[str, Any], path: Tuple[str, ...]) -> Type[BaseModel] | None: + """Recursively build nested Pydantic models. + + Args: + name (str): Model class name. + subtree (dict[str, Any]): Subtree of the schema. + path (Tuple[str, ...]): Current path in the tree. + + Returns: + Type[BaseModel] | None: The constructed model or None if no fields. + """ + sf = leaf_by_path.get(path) + stripped = strip_root(path) + if stripped: + class_name = to_pascal_case("".join(x for x in stripped)) + else: + class_name = to_pascal_case(root_name) + if model_suffix and not class_name.endswith(model_suffix): + class_name = f"{class_name}{model_suffix}" + # Guarantee non-empty unique_name for root + unique_name = (sf.attributes.get("uniqueName") if sf else None) or ".".join(stripped) or class_name + + annotations: Dict[str, Any] = {} + defaults: Dict[str, Any] = {} + + for key, child in subtree.items(): + child_path = path + (key,) + leaf_sf = leaf_by_path.get(child_path) + + if not child: # leaf + if leaf_sf: + if all_optional: + annotations[key] = Annotated[ + Optional[_field_type(leaf_sf, to_pascal_case(key))], Field(description=leaf_sf.description) + ] + defaults[key] = None + else: + annotations[key] = Annotated[ + _field_type(leaf_sf, to_pascal_case(key)), Field(description=leaf_sf.description) + ] + # No default: required + else: # branch ➜ nested model + is_array = _is_yes(leaf_sf.attributes.get("array", "No")) if leaf_sf else False + child_model = _build_model(key.capitalize(), child, child_path) + if child_model: + if all_optional: + annotations[key] = Optional[List[child_model]] if is_array else Optional[child_model] + defaults[key] = None + else: + annotations[key] = List[child_model] if is_array else child_model + # No default: required + + if not annotations: + return None + + desc = f"{model_doc} for `{class_name}`." + namespace = { + "__annotations__": annotations, + "__doc__": desc, + "__module__": __name__, + # Use supported ConfigDict keys; attach metadata via json_schema_extra + "model_config": ConfigDict( + # TODO (from before integration into this repo): Make sure the change from this works: title=class_name, description=desc, strict=False, extra="allow" if allow_extra else "forbid" + title=class_name, + strict=False, + extra="allow" if allow_extra else "forbid", + json_schema_extra={"description": desc}, + ), + } + # Only set defaults for all_optional + if all_optional: + namespace.update(defaults) + cls = type(class_name, (BaseModel,), namespace) + models[unique_name] = cls + return cls + + # ===== Build Root + Wrapper ===== + + # TODO (from before integration into this repo): This forces the wrapper structure. It should use OpenAPI schema + + root_name = next(iter(tree)) + + inner_model = _build_model(root_name.capitalize(), tree[root_name], (root_name,)) + if inner_model is None: # pragma: no cover – safeguard + return {} + + # Just force as array: + wrapper_model = _wrap_root(root_name, inner_model, True) + + models[root_name] = inner_model + models[f"{root_name}_wrapper"] = wrapper_model + return models + + +# ===== External Entrypoints ===== + + +def get_schema_fields() -> List[SchemaField]: + """ + Load and return the list of schema fields from the configured schema source. + + Returns: + List[SchemaField]: The schema fields for the root node. + """ + # Read environment at call time to allow tests/runtime overrides + openapi_file = os.getenv("OPENAPI_SCHEMA_FILE", OPENAPI_SCHEMA_FILE) + root_node = os.getenv("ROOT_NODE", ROOT_NODE) + if openapi_file is None: + raise ValueError("OPENAPI_SCHEMA_FILE environment variable is not set") + return load_schema_nodes(openapi_file, root_node) + + +def build_filter_models(fields: List[SchemaField], *, allow_extra: bool = True, all_optional: bool = True) -> ModelDict: + """ + Build filter models from schema fields. + + Args: + fields (List[SchemaField]): Schema fields. + allow_extra (bool, optional): Allow extra properties in models. Default is True. + all_optional (bool, optional): Make all fields Optional and default to None. Default is True. + + Returns: + ModelDict: A mapping { model_name: Pydantic class } for filter models. + """ + return build_dynamic_model( + fields, + attribute_flag="xQueryable", + model_doc="Filter data model", + model_suffix="Filter", + allow_extra=allow_extra, + all_optional=all_optional, + ) + + +def build_mutation_models( + fields: List[SchemaField], *, allow_extra: bool = False, all_optional: bool = True +) -> ModelDict: + """ + Build mutation models from schema fields. + + Args: + fields (List[SchemaField]): Schema fields. + allow_extra (bool, optional): Allow extra properties in models. Default is False. + all_optional (bool, optional): Make all fields Optional and default to None. Default is True. + + Returns: + ModelDict: A mapping { model_name: Pydantic class } for mutation models. + """ + return build_dynamic_model( + fields, + attribute_flag="xMutable", + model_doc="Mutation data model", + model_suffix="Mutation", + allow_extra=allow_extra, + all_optional=all_optional, + ) + + +def build_full_models(fields: List[SchemaField], *, allow_extra: bool = False, all_optional: bool = False) -> ModelDict: + """ + Build full (strict) models from schema fields. + + Args: + fields (List[SchemaField]): Schema fields. + allow_extra (bool, optional): Allow extra properties in models. Default is False. + all_optional (bool, optional): Make all fields Optional and default to None. Default is False (fields required). + + Returns: + ModelDict: A mapping { model_name: Pydantic class } for full models. + """ + return build_dynamic_model( + fields, + attribute_flag=None, + model_doc="Full data model", + model_suffix="Type", + allow_extra=allow_extra, + all_optional=all_optional, + ) + + +def build_all_models( + *, + filter_allow_extra: bool = True, + filter_all_optional: bool = True, + mutation_allow_extra: bool = False, + mutation_all_optional: bool = True, + full_allow_extra: bool = False, + full_all_optional: bool = False, +) -> tuple[List[SchemaField], ModelDict, ModelDict, ModelDict]: + """ + Build all three model sets (filter, mutation, full) in one go, optionally customizing allow_extra and all_optional for each. + + Keyword Args: + filter_allow_extra (bool): Allow extra properties in filter models. Default True. + filter_all_optional (bool): Make all filter model fields Optional. Default True. + mutation_allow_extra (bool): Allow extra properties in mutation models. Default False. + mutation_all_optional (bool): Make all mutation model fields Optional. Default True. + full_allow_extra (bool): Allow extra properties in full models. Default False. + full_all_optional (bool): Make all full model fields Optional. Default False (fields required). + + Returns: + tuple: + - List[SchemaField]: Schema fields. + - ModelDict: Filter models. + - ModelDict: Mutation models. + - ModelDict: Full models. + """ + fields = get_schema_fields() + return ( + fields, + build_filter_models(fields, allow_extra=filter_allow_extra, all_optional=filter_all_optional), + build_mutation_models(fields, allow_extra=mutation_allow_extra, all_optional=mutation_all_optional), + build_full_models(fields, allow_extra=full_allow_extra, all_optional=full_all_optional), + ) diff --git a/components/lif/schema/__init__.py b/components/lif/schema/__init__.py new file mode 100644 index 0000000..3138d3f --- /dev/null +++ b/components/lif/schema/__init__.py @@ -0,0 +1,3 @@ +from lif.schema import core + +__all__ = ["core"] diff --git a/components/lif/schema/core.py b/components/lif/schema/core.py new file mode 100644 index 0000000..3d80c39 --- /dev/null +++ b/components/lif/schema/core.py @@ -0,0 +1,144 @@ +from pathlib import Path +from typing import Any, List, Optional, Union + +import jsonref + +from lif.datatypes.schema import SchemaField +from lif.logging import get_logger +from lif.string_utils.core import camelcase_path, to_camel_case + + +logger = get_logger(__name__) + + +ATTRIBUTE_KEYS = [ + "x-queryable", + "x-mutable", + "DataType", + "Required", + "Array", + "UniqueName", + "enum", + "type", +] + +# ===== SCHEMA FIELD EXTRACTION ===== + + +def extract_nodes(obj: Any, path_prefix: str = "") -> List[SchemaField]: + """ + Recursively extract schema fields from an OpenAPI/JSON Schema object. + + Returns: + List[SchemaField]: Flat list of SchemaField objects. + """ + nodes = [] + + def is_array(node: dict) -> bool: + """Return True if node is an array.""" + return node.get("type") == "array" or "items" in node + + def get_description(node: dict) -> str: + """Get description from node, prefer lower-case.""" + return node.get("Description", "") or node.get("description", "") + + def extract_attributes(node: dict) -> dict: + """Extract core attributes from node.""" + attributes = { + to_camel_case(k): node.get(k) for k in ATTRIBUTE_KEYS if k in node + } + if "Array" in node: + attributes["array"] = node["Array"] + else: + attributes["array"] = "Yes" if is_array(node) else "No" + attributes["type"] = node.get("type", node.get("DataType", None)) + return attributes + + if isinstance(obj, dict): + key = camelcase_path(path_prefix.rstrip(".")) + + branch = ( + "properties" in obj + and isinstance(obj["properties"], dict) + and obj["properties"] + ) or ("items" in obj and isinstance(obj["items"], dict)) + attributes = extract_attributes(obj) + attributes["branch"] = bool(branch) + attributes["leaf"] = not attributes["branch"] + nodes.append( + SchemaField( + json_path=key, + description=get_description(obj), + attributes=attributes, + ) + ) + + # Recurse children + if "properties" in obj and isinstance(obj["properties"], dict): + for prop, val in obj["properties"].items(): + new_prefix = f"{path_prefix}.{prop}" if path_prefix else prop + nodes.extend(extract_nodes(val, new_prefix)) + + if "items" in obj: + items = obj["items"] + if isinstance(items, dict): + nodes.extend(extract_nodes(items, path_prefix)) + elif isinstance(items, list): # tuple validation + for sub_item in items: + nodes.extend(extract_nodes(sub_item, path_prefix)) + + return nodes + + +# ===== ROOT SCHEMA RESOLUTION ===== + + +def resolve_openapi_root(doc: dict, root: str): + """Return the schema node for a given root in the OpenAPI spec.""" + candidates = [] + if "components" in doc and "schemas" in doc["components"]: + schemas = doc["components"]["schemas"] + if root in schemas: + return schemas[root], root + candidates.extend(schemas.keys()) + if "definitions" in doc: + definitions = doc["definitions"] + if root in definitions: + return definitions[root], root + candidates.extend(definitions.keys()) + raise ValueError(f"Root schema '{root}' not found. Available: {sorted(candidates)}") + + +# ===== FILE LOADING ===== + + +def load_schema_nodes( + openapi: Union[str, Path, dict], + root: Optional[str] = None, +) -> List[SchemaField]: + """ + Load and extract schema fields from an OpenAPI JSON file, pathlib.Path, or dictionary. + + Args: + openapi (str | Path | dict): Either a file path (str or Path) to the OpenAPI JSON file, + or a dictionary representing the OpenAPI schema. + root (str, optional): Root key in the schema to resolve. + + Returns: + List[SchemaField]: Extracted SchemaField objects. + """ + if isinstance(openapi, (str, Path)): + with open(openapi, "r") as f: + doc = jsonref.load(f) + elif isinstance(openapi, dict): + # Replace $ref references in dict input + doc = jsonref.JsonRef.replace_refs(openapi) + else: + raise TypeError("openapi must be a str, Path, or dict") + + node = doc + path_prefix = "" + if root: + node, path_prefix = resolve_openapi_root(doc, root) + + return extract_nodes(node, path_prefix) diff --git a/components/lif/string_utils/core.py b/components/lif/string_utils/core.py index 6884657..b56effb 100644 --- a/components/lif/string_utils/core.py +++ b/components/lif/string_utils/core.py @@ -19,6 +19,7 @@ def safe_identifier(name: str) -> str: s1 = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name) s2 = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s1) safe = re.sub(r"\W|^(?=\d)", "_", s2) + safe = re.sub(r"_+", "_", safe) # Collapse consecutive underscores return safe.lower() @@ -31,7 +32,24 @@ def to_pascal_case(*parts: str) -> str: Returns: str: PascalCase string. """ - return "".join("".join(word.capitalize() for word in part.split("_")) for part in parts if part) + result = [] + for part in parts: + if not part: + continue + # First, insert separators at case boundaries (camelCase -> camel_Case) + s1 = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", part.strip()) + s2 = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s1) + # Now split on all separators (underscores, hyphens, spaces) + words = re.split(r"[_\-\s]+", s2) + for word in words: + if not word: + continue + # If word is all uppercase (acronym), keep it; otherwise capitalize + if word.isupper(): + result.append(word) + else: + result.append(word.capitalize()) + return "".join(result) def to_snake_case(name: str) -> str: @@ -41,12 +59,10 @@ def to_snake_case(name: str) -> str: def to_camel_case(s: str) -> str: - """Converts snake_case to lowerCamelCase.""" + """Convert string to camelCase.""" + s = re.sub(r"([_\-\s]+)([a-zA-Z])", lambda m: m.group(2).upper(), s) if not s: return s - if "_" in s: - parts = s.lower().split("_") - return parts[0] + "".join(word.capitalize() for word in parts[1:]) return s[0].lower() + s[1:] diff --git a/pyproject.toml b/pyproject.toml index bef3a42..45bdf63 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -111,6 +111,8 @@ dev-dependencies = ["polylith-cli"] "components/lif/translator" = "lif/translator" "components/lif/mdr_utils" = "lif/mdr_utils" "components/lif/mdr_dto" = "lif/mdr_dto" +"components/lif/schema" = "lif/schema" +"components/lif/dynamic_models" = "lif/dynamic_models" [tool.ruff] line-length = 120 diff --git a/test/components/lif/dynamic_models/__init__.py b/test/components/lif/dynamic_models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/components/lif/dynamic_models/test_core.py b/test/components/lif/dynamic_models/test_core.py new file mode 100644 index 0000000..bed0144 --- /dev/null +++ b/test/components/lif/dynamic_models/test_core.py @@ -0,0 +1,980 @@ +""" +Comprehensive unit tests for the dynamic_models.core module. + +Tests the dynamic Pydantic model building functionality including: +- Building nested models from schema fields +- Filter, mutation, and full model variants +- Enum handling +- Field type mapping +- Error conditions +""" + +import os +import pytest +from enum import Enum +from pathlib import Path +from unittest.mock import patch + +from pydantic import BaseModel, ValidationError + +from lif.dynamic_models import core +from lif.datatypes.schema import SchemaField + + +PATH_TO_TEST_SCHEMA = Path(__file__).parent.parent.parent.parent / "data" / "test_openapi_schema.json" + + +class TestHelperFunctions: + """Test helper functions in the core module.""" + + def test_is_yes(self): + """Test the _is_yes helper function.""" + assert core._is_yes("yes") is True + assert core._is_yes("YES") is True + assert core._is_yes("true") is True + assert core._is_yes("TRUE") is True + assert core._is_yes("1") is True + assert core._is_yes(" yes ") is True + + assert core._is_yes("no") is False + assert core._is_yes("false") is False + assert core._is_yes("0") is False + assert core._is_yes("") is False + assert core._is_yes("maybe") is False + + def test_to_enum_member(self): + """Test the _to_enum_member helper function.""" + assert core._to_enum_member("Valid Option") == "VALID_OPTION" + assert core._to_enum_member("123invalid") == "_123INVALID" + assert core._to_enum_member("special-chars!@#") == "SPECIAL_CHARS___" + assert core._to_enum_member("") == "" + + def test_make_enum(self): + """Test enum creation and caching.""" + # Create enum + enum_cls = core.make_enum("TestEnum", ["option1", "option2", "option3"]) + assert issubclass(enum_cls, Enum) + + # Test enum values + assert hasattr(enum_cls, "OPTION1") + assert hasattr(enum_cls, "OPTION2") + assert hasattr(enum_cls, "OPTION3") + + # Test enum value content + option1_member = getattr(enum_cls, "OPTION1") + assert option1_member.value == "option1" + + # Test caching - same values should return same class + enum_cls2 = core.make_enum("TestEnum", ["option1", "option2", "option3"]) + assert enum_cls is enum_cls2 + + # Different values should return different class + enum_cls3 = core.make_enum("TestEnum", ["option1", "option2", "option4"]) + assert enum_cls is not enum_cls3 + + +class TestFieldTypeMapping: + """Test field type mapping through model creation and validation.""" + + def test_string_field_in_model(self): + """Test string field type mapping and validation.""" + fields = [ + SchemaField( + json_path="test.field", + description="Test field", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ) + ] + models = core.build_dynamic_model(fields) + assert "test" in models + + test_model = models["test"] + + # Test valid string assignment + instance = test_model(field="hello") + assert getattr(instance, "field") == "hello" + + # Test None assignment (optional) + instance_none = test_model(field=None) + assert getattr(instance_none, "field") is None + + # Test string conversion from valid types + instance_converted = test_model(field="123") + assert getattr(instance_converted, "field") == "123" + + # Test that invalid types raise ValidationError + with pytest.raises(ValidationError): + test_model(field=123) # Integer not allowed for strict string + + def test_integer_field_in_model(self): + """Test integer field type mapping and validation.""" + fields = [ + SchemaField( + json_path="test.field", + description="Test field", + attributes={"xQueryable": True, "dataType": "xsd:integer", "array": "No"}, + ) + ] + models = core.build_dynamic_model(fields) + assert "test" in models + + test_model = models["test"] + + # Test valid integer assignment + instance = test_model(field=42) + assert getattr(instance, "field") == 42 + + # Test string to integer conversion (if supported) + try: + instance_converted = test_model(field="100") + assert getattr(instance_converted, "field") == 100 + except ValidationError: + # If strict mode doesn't allow string conversion, that's also valid + pass + + # Test invalid conversion should raise ValidationError + with pytest.raises(ValidationError): + test_model(field="not_a_number") + + def test_boolean_field_in_model(self): + """Test boolean field type mapping and validation.""" + fields = [ + SchemaField( + json_path="test.field", + description="Test field", + attributes={"xQueryable": True, "dataType": "xsd:boolean", "array": "No"}, + ) + ] + models = core.build_dynamic_model(fields) + assert "test" in models + + test_model = models["test"] + + # Test valid boolean assignment + instance_true = test_model(field=True) + assert getattr(instance_true, "field") is True + + instance_false = test_model(field=False) + assert getattr(instance_false, "field") is False + + # Test truthy/falsy conversion + instance_truthy = test_model(field=1) + assert getattr(instance_truthy, "field") is True + + instance_falsy = test_model(field=0) + assert getattr(instance_falsy, "field") is False + + def test_date_field_in_model(self): + """Test date field type mapping and validation.""" + from datetime import date + + fields = [ + SchemaField( + json_path="test.field", + description="Test field", + attributes={"xQueryable": True, "dataType": "xsd:date", "array": "No"}, + ) + ] + models = core.build_dynamic_model(fields) + assert "test" in models + + test_model = models["test"] + + # Test valid date assignment + test_date = date(2023, 12, 25) + instance = test_model(field=test_date) + assert getattr(instance, "field") == test_date + + # Test string date parsing (if supported by Pydantic) + try: + instance_str = test_model(field="2023-12-25") + parsed_date = getattr(instance_str, "field") + assert isinstance(parsed_date, date) + except ValidationError: + # If strict parsing is not enabled, that's also valid behavior + pass + + def test_datetime_field_in_model(self): + """Test datetime field type mapping and validation.""" + from datetime import datetime + + fields = [ + SchemaField( + json_path="test.field", + description="Test field", + attributes={"xQueryable": True, "dataType": "xsd:dateTime", "array": "No"}, + ) + ] + models = core.build_dynamic_model(fields) + assert "test" in models + + test_model = models["test"] + + # Test valid datetime assignment + test_datetime = datetime(2023, 12, 25, 14, 30, 0) + instance = test_model(field=test_datetime) + assert getattr(instance, "field") == test_datetime + + # Test ISO string parsing (if supported) + try: + instance_str = test_model(field="2023-12-25T14:30:00") + parsed_datetime = getattr(instance_str, "field") + assert isinstance(parsed_datetime, datetime) + except ValidationError: + # If strict parsing is not enabled, that's also valid behavior + pass + + def test_enum_field_in_model(self): + """Test enum field type mapping and validation.""" + fields = [ + SchemaField( + json_path="test.field", + description="Test field", + attributes={"xQueryable": True, "enum": ["option1", "option2", "option3"], "array": "No"}, + ) + ] + models = core.build_dynamic_model(fields) + assert "test" in models + + test_model = models["test"] + + # Test valid enum value assignment + instance = test_model(field="option1") + enum_value = getattr(instance, "field") + assert enum_value.value == "option1" + + # Test all enum options + for option in ["option1", "option2", "option3"]: + instance = test_model(field=option) + assert getattr(instance, "field").value == option + + # Test invalid enum value + with pytest.raises(ValidationError): + test_model(field="invalid_option") + + def test_array_field_in_model(self): + """Test array field type mapping and validation.""" + fields = [ + SchemaField( + json_path="test.field", + description="Test field", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "Yes"}, + ) + ] + models = core.build_dynamic_model(fields) + assert "test" in models + + test_model = models["test"] + + # Test valid list assignment + instance = test_model(field=["item1", "item2", "item3"]) + field_value = getattr(instance, "field") + assert field_value == ["item1", "item2", "item3"] + + # Test empty list + instance_empty = test_model(field=[]) + assert getattr(instance_empty, "field") == [] + + # Test None assignment (optional) + instance_none = test_model(field=None) + assert getattr(instance_none, "field") is None + + def test_field_descriptions_preserved(self): + """Test that field descriptions are preserved in the model.""" + fields = [ + SchemaField( + json_path="test.name", + description="A person's full name", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ), + SchemaField( + json_path="test.age", + description="Age in years", + attributes={"xQueryable": True, "dataType": "xsd:integer", "array": "No"}, + ), + ] + models = core.build_dynamic_model(fields) + assert "test" in models + + test_model = models["test"] + + # Check that the model has the expected fields + instance = test_model() + assert hasattr(instance, "name") + assert hasattr(instance, "age") + + # Verify field information is accessible through model schema + schema = test_model.model_json_schema() + if "properties" in schema: + if "name" in schema["properties"]: + assert "description" in schema["properties"]["name"] + if "age" in schema["properties"]: + assert "description" in schema["properties"]["age"] + + +class TestBuildDynamicModel: + """Test the main build_dynamic_model function.""" + + def test_empty_fields(self): + """Test with empty schema fields.""" + result = core.build_dynamic_model([]) + assert result == {} + + def test_no_matching_fields(self): + """Test with fields that don't match the attribute flag.""" + fields = [ + SchemaField( + json_path="person.name", + description="Person name", + attributes={"xMutable": True}, # No xQueryable + ) + ] + result = core.build_dynamic_model(fields, attribute_flag="xQueryable") + assert result == {} + + def test_multiple_roots_error(self): + """Test error when fields have different root paths.""" + fields = [ + SchemaField(json_path="person.name", description="Person name", attributes={"xQueryable": True}), + SchemaField( + json_path="organization.name", description="Organization name", attributes={"xQueryable": True} + ), + ] + with pytest.raises(ValueError, match="must share a common root"): + core.build_dynamic_model(fields) + + def test_simple_model_creation(self): + """Test creating a simple model with basic fields.""" + fields = [ + SchemaField( + json_path="person.name", + description="Person name", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ), + SchemaField( + json_path="person.age", + description="Person age", + attributes={"xQueryable": True, "dataType": "xsd:integer", "array": "No"}, + ), + ] + + models = core.build_dynamic_model(fields) + + assert "person" in models + assert "person_wrapper" in models + + # Test inner model + person_model = models["person"] + assert issubclass(person_model, BaseModel) + + # Create instance + instance = person_model(name="John", age=30) + assert getattr(instance, "name") == "John" + assert getattr(instance, "age") == 30 + + # Test with None values (optional fields) + instance2 = person_model() + assert getattr(instance2, "name") is None + assert getattr(instance2, "age") is None + + def test_nested_model_creation(self): + """Test creating nested models.""" + fields = [ + SchemaField( + json_path="person.name.firstName", + description="First name", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ), + SchemaField( + json_path="person.name.lastName", + description="Last name", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ), + SchemaField( + json_path="person.age", + description="Person age", + attributes={"xQueryable": True, "dataType": "xsd:integer", "array": "No"}, + ), + ] + + models = core.build_dynamic_model(fields) + + assert "person" in models + # Check that nested models were created (key may vary based on implementation) + assert len(models) > 1 # Should have more than just the root model + + # Test nested structure + person_model = models["person"] + instance = person_model() + + # Should have name and age fields + assert hasattr(instance, "name") + assert hasattr(instance, "age") + + def test_array_fields(self): + """Test handling of array fields.""" + fields = [ + SchemaField( + json_path="person.identifier", + description="Identifiers", + attributes={"xQueryable": True, "array": "Yes", "branch": True}, + ), + SchemaField( + json_path="person.identifier.value", + description="Identifier value", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ), + ] + + models = core.build_dynamic_model(fields) + + # Should create models + assert len(models) > 0 + + def test_enum_fields(self): + """Test handling of enum fields.""" + fields = [ + SchemaField( + json_path="person.gender", + description="Gender", + attributes={"xQueryable": True, "enum": ["Male", "Female", "Other"], "array": "No"}, + ) + ] + + models = core.build_dynamic_model(fields) + + assert "person" in models + person_model = models["person"] + + # Create instance with enum value + instance = person_model(gender="Male") + # Enum fields return enum members, so we need to check the value + gender_value = getattr(instance, "gender") + assert gender_value.value == "Male" + + def test_model_suffix(self): + """Test model suffix application.""" + fields = [ + SchemaField( + json_path="person.name", + description="Person name", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ) + ] + + models = core.build_dynamic_model(fields, model_suffix="Filter") + + # Model names should include suffix + person_model = models["person"] + assert person_model.__name__.endswith("Filter") + + def test_all_optional_false(self): + """Test with all_optional=False.""" + fields = [ + SchemaField( + json_path="person.name", + description="Person name", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ) + ] + + models = core.build_dynamic_model(fields, all_optional=False) + + person_model = models["person"] + + # Should require fields when all_optional=False + with pytest.raises(ValidationError): + person_model() # Missing required field + + def test_allow_extra_true(self): + """Test with allow_extra=True.""" + fields = [ + SchemaField( + json_path="person.name", + description="Person name", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ) + ] + + models = core.build_dynamic_model(fields, allow_extra=True) + + person_model = models["person"] + + # Should allow extra fields + instance = person_model(name="John", extra_field="value") + assert getattr(instance, "extra_field") == "value" + + def test_attribute_flag_none(self): + """Test with attribute_flag=None (include all fields).""" + fields = [ + SchemaField( + json_path="person.name", + description="Person name", + attributes={"dataType": "xsd:string", "array": "No"}, # No xQueryable + ) + ] + + models = core.build_dynamic_model(fields, attribute_flag=None) + + assert "person" in models + person_model = models["person"] + assert hasattr(person_model(), "name") + + +class TestBuilderFunctions: + """Test the convenience builder functions.""" + + @patch("lif.dynamic_models.core.get_schema_fields") + def test_build_filter_models(self, mock_get_schema_fields): + """Test build_filter_models function.""" + mock_fields = [ + # SchemaField( + # json_path="person.name", + # description="Person name", + # attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + # ) + # SchemaField( + # json_path='person', + # description='', + # attributes={'xMutable': False, 'type': 'object', 'array': 'No', 'branch': True, 'leaf': False}, + # py_field_name='' + # ) + SchemaField( + json_path="person.identifier.identifier", + description='A number and/or alphanumeric code used to uniquely identify the entity. Use "missing at will", "ad-hoc" and "not applicable" for missing data to avoid skewed outcomes.', + attributes={ + "xQueryable": True, + "xMutable": False, + "dataType": "xsd:string", + "required": "Yes", + "array": "No", + "uniqueName": "Common.Identifier.identifier", + "type": "xsd:string", + "branch": False, + "leaf": True, + }, + py_field_name="", + ) + ] + mock_get_schema_fields.return_value = mock_fields + + models = core.build_filter_models(mock_fields) + + print(models) + + assert len(models) > 0 + # Should have Filter suffix + for model_cls in models.values(): + if hasattr(model_cls, "__name__"): + assert "Filter" in model_cls.__name__ + + @patch("lif.dynamic_models.core.get_schema_fields") + def test_build_mutation_models(self, mock_get_schema_fields): + """Test build_mutation_models function.""" + mock_fields = [ + SchemaField( + json_path="person.name", + description="Person name", + attributes={"xMutable": True, "dataType": "xsd:string", "array": "No"}, + ) + ] + mock_get_schema_fields.return_value = mock_fields + + models = core.build_mutation_models(mock_fields) + + assert len(models) > 0 + # Should have Mutation suffix + for model_cls in models.values(): + if hasattr(model_cls, "__name__"): + assert "Mutation" in model_cls.__name__ + + @patch("lif.dynamic_models.core.get_schema_fields") + def test_build_full_models(self, mock_get_schema_fields): + """Test build_full_models function.""" + mock_fields = [ + SchemaField( + json_path="person.name", description="Person name", attributes={"dataType": "xsd:string", "array": "No"} + ) + ] + mock_get_schema_fields.return_value = mock_fields + + models = core.build_full_models(mock_fields) + + assert len(models) > 0 + # Should have Type suffix + for model_cls in models.values(): + if hasattr(model_cls, "__name__"): + assert "Type" in model_cls.__name__ + + +class TestGetSchemaFields: + """Test the get_schema_fields function.""" + + @patch.dict(os.environ, {"OPENAPI_SCHEMA_FILE": str(PATH_TO_TEST_SCHEMA), "ROOT_NODE": "Person"}) + @patch("lif.dynamic_models.core.load_schema_nodes") + def test_get_schema_fields_with_env_vars(self, mock_load_schema_nodes): + """Test get_schema_fields with environment variables.""" + mock_fields = [SchemaField("person.name", "Name", {})] + mock_load_schema_nodes.return_value = mock_fields + + result = core.get_schema_fields() + + # Verify the function was called with correct arguments + mock_load_schema_nodes.assert_called_once() + call_args = mock_load_schema_nodes.call_args[0] + assert str(call_args[0]) == str(PATH_TO_TEST_SCHEMA) + assert call_args[1] == "Person" + assert result == mock_fields + + def test_get_schema_fields_no_env_var(self): + """Test get_schema_fields without OPENAPI_SCHEMA_FILE.""" + with patch.dict(os.environ, {}, clear=True): + with pytest.raises(ValueError, match="OPENAPI_SCHEMA_FILE environment variable is not set"): + core.get_schema_fields() + + +class TestBuildAllModels: + """Test the build_all_models function.""" + + @patch("lif.dynamic_models.core.get_schema_fields") + def test_build_all_models(self, mock_get_schema_fields): + """Test build_all_models function.""" + mock_fields = [ + SchemaField( + json_path="person.name", + description="Person name", + attributes={"xQueryable": True, "xMutable": True, "dataType": "xsd:string", "array": "No"}, + ) + ] + mock_get_schema_fields.return_value = mock_fields + + fields, filter_models, mutation_models, full_models = core.build_all_models() + + assert fields == mock_fields + assert len(filter_models) > 0 + assert len(mutation_models) > 0 + assert len(full_models) > 0 + + @patch("lif.dynamic_models.core.get_schema_fields") + def test_build_all_models_custom_options(self, mock_get_schema_fields): + """Test build_all_models with custom options.""" + mock_fields = [ + SchemaField( + json_path="person.name", + description="Person name", + attributes={"xQueryable": True, "xMutable": True, "dataType": "xsd:string", "array": "No"}, + ) + ] + mock_get_schema_fields.return_value = mock_fields + + fields, filter_models, mutation_models, full_models = core.build_all_models( + filter_allow_extra=False, + filter_all_optional=False, + mutation_allow_extra=True, + mutation_all_optional=False, + full_allow_extra=True, + full_all_optional=True, + ) + + assert fields == mock_fields + assert len(filter_models) > 0 + assert len(mutation_models) > 0 + assert len(full_models) > 0 + + +class TestRealWorldIntegration: + """Integration tests with the actual test schema file.""" + + def test_with_test_schema_file(self): + """Test with the actual test_openapi_schema.json file.""" + with patch.dict(os.environ, {"OPENAPI_SCHEMA_FILE": str(PATH_TO_TEST_SCHEMA), "ROOT_NODE": "Person"}): + fields = core.get_schema_fields() + + # Should have loaded fields from the test schema + assert len(fields) > 0 + + # Test building models + filter_models = core.build_filter_models(fields) + assert len(filter_models) > 0 + + # Test that we can create instances + if "person" in filter_models: + person_model = filter_models["person"] + instance = person_model() + assert instance is not None + + def test_end_to_end_model_creation(self): + """End-to-end test of model creation and usage.""" + # Create test fields manually to simulate real usage + fields = [ + SchemaField( + json_path="person.identifier.identifier", + description="A number and/or alphanumeric code used to uniquely identify the entity", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ), + SchemaField( + json_path="person.identifier.identifierType", + description="The types of sources of identifiers used to uniquely identify the entity", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ), + SchemaField( + json_path="person.name.firstName", + description="The first name of a person or individual", + attributes={"xMutable": False, "dataType": "xsd:string", "array": "No"}, + ), + SchemaField( + json_path="person.name.lastName", + description="The last name of a person or individual", + attributes={"xMutable": False, "dataType": "xsd:string", "array": "No"}, + ), + ] + + # Build filter models (only xQueryable fields) + filter_models = core.build_filter_models(fields) + + # Should have models + assert len(filter_models) > 0 + + # Build full models (all fields) - use all_optional=True for this test + full_models = core.build_full_models(fields, all_optional=True) + + # Should have models + assert len(full_models) > 0 + + # Test that models work correctly + if "person" in full_models: + person_model = full_models["person"] + + # Create instance (should work with all_optional=True) + instance = person_model() + assert instance is not None + + +class TestEdgeCases: + """Test edge cases and error conditions.""" + + def test_invalid_enum_values(self): + """Test handling of invalid enum values.""" + fields = [ + SchemaField( + json_path="person.status", + description="Status", + attributes={"xQueryable": True, "enum": ["Active", "Inactive"], "array": "No"}, + ) + ] + + models = core.build_dynamic_model(fields) + person_model = models["person"] + + # Valid enum value should work + instance = person_model(status="Active") + assert getattr(instance, "status").value == "Active" + + # Invalid enum value should raise validation error + with pytest.raises(ValidationError): + person_model(status="Invalid") + + def test_complex_nested_structure(self): + """Test deeply nested field structures.""" + fields = [ + SchemaField( + json_path="person.contact.address.street", + description="Street address", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ), + SchemaField( + json_path="person.contact.email.address", + description="Email address", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ), + ] + + models = core.build_dynamic_model(fields) + + # Should create nested models + assert "person" in models + assert len(models) > 1 + + # Test that nested structure works + person_model = models["person"] + instance = person_model() + + # Should have contact field + assert hasattr(instance, "contact") + + def test_special_characters_in_enum(self): + """Test enum with special characters.""" + fields = [ + SchemaField( + json_path="person.type", + description="Person type", + attributes={"xQueryable": True, "enum": ["Type-A", "Type B", "Type@C"], "array": "No"}, + ) + ] + + models = core.build_dynamic_model(fields) + person_model = models["person"] + + # Should handle special characters in enum values + instance = person_model(type="Type-A") + assert getattr(instance, "type").value == "Type-A" + + # Test all special character variants + for type_val in ["Type-A", "Type B", "Type@C"]: + instance = person_model(type=type_val) + assert getattr(instance, "type").value == type_val + + def test_empty_description(self): + """Test fields with empty descriptions.""" + fields = [ + SchemaField( + json_path="person.field", + description="", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ) + ] + + models = core.build_dynamic_model(fields) + assert len(models) > 0 + + # Should still create working model + person_model = models["person"] + instance = person_model(field="test") + assert getattr(instance, "field") == "test" + + def test_missing_attributes(self): + """Test fields with minimal attributes.""" + fields = [ + SchemaField( + json_path="person.field", + description="Basic field", + attributes={"xQueryable": True}, # Minimal attributes + ) + ] + + models = core.build_dynamic_model(fields) + assert len(models) > 0 + + # Should create model with default string type + person_model = models["person"] + instance = person_model(field="test") + assert getattr(instance, "field") == "test" + + def test_array_with_nested_objects(self): + """Test arrays containing nested objects.""" + fields = [ + SchemaField( + json_path="person.addresses", + description="List of addresses", + attributes={"xQueryable": True, "array": "Yes", "branch": True}, + ), + SchemaField( + json_path="person.addresses.street", + description="Street address", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ), + SchemaField( + json_path="person.addresses.city", + description="City", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ), + ] + + models = core.build_dynamic_model(fields) + assert len(models) > 0 + + # Should create model with nested array structure + person_model = models["person"] + instance = person_model() + assert hasattr(instance, "addresses") + + def test_very_long_field_names(self): + """Test handling of very long field names.""" + long_name = "a" * 100 # 100 character field name + fields = [ + SchemaField( + json_path=f"person.{long_name}", + description="Field with very long name", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ) + ] + + models = core.build_dynamic_model(fields) + person_model = models["person"] + + # Should handle long field names gracefully + instance = person_model(**{long_name: "test_value"}) + assert getattr(instance, long_name) == "test_value" + + def test_unicode_in_descriptions(self): + """Test handling of unicode characters in descriptions.""" + fields = [ + SchemaField( + json_path="person.name", + description="Имя пользователя (User name in Cyrillic) 用户名 (Chinese)", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ) + ] + + models = core.build_dynamic_model(fields) + assert len(models) > 0 + + person_model = models["person"] + instance = person_model(name="Test") + assert getattr(instance, "name") == "Test" + + +class TestPerformance: + """Test performance aspects of model generation.""" + + def test_large_schema_handling(self): + """Test model generation with a large number of fields.""" + import time + + # Generate 100 fields + fields = [] + for i in range(100): + fields.append( + SchemaField( + json_path=f"person.field_{i}", + description=f"Test field number {i}", + attributes={"xQueryable": True, "dataType": "xsd:string", "array": "No"}, + ) + ) + + start_time = time.time() + models = core.build_dynamic_model(fields) + end_time = time.time() + + # Should complete in reasonable time (less than 5 seconds) + assert (end_time - start_time) < 5.0 + assert len(models) > 0 + + # Test that the resulting model works + person_model = models["person"] + test_data = {f"field_{i}": f"value_{i}" for i in range(10)} # Test first 10 fields + instance = person_model(**test_data) + + for i in range(10): + assert getattr(instance, f"field_{i}") == f"value_{i}" + + def test_enum_caching_efficiency(self): + """Test that enum caching works efficiently.""" + # Create the same enum multiple times + enum_values = ["A", "B", "C"] + + enum1 = core.make_enum("TestEnum", enum_values) + enum2 = core.make_enum("TestEnum", enum_values) + enum3 = core.make_enum("TestEnum", enum_values) + + # Should return the same class (cached) + assert enum1 is enum2 is enum3 + + # Different values should create different enums + enum4 = core.make_enum("TestEnum", ["A", "B", "D"]) + assert enum1 is not enum4 + + +def test_sample(): + """Legacy test to maintain compatibility.""" + assert core is not None diff --git a/test/components/lif/schema/__init__.py b/test/components/lif/schema/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/components/lif/schema/test_core.py b/test/components/lif/schema/test_core.py new file mode 100644 index 0000000..aded94a --- /dev/null +++ b/test/components/lif/schema/test_core.py @@ -0,0 +1,424 @@ +import pytest +from pathlib import Path +from unittest.mock import mock_open, patch + +from lif.schema import core + + +class TestExtractNodes: + """Tests for the extract_nodes function.""" + + def test_extract_simple_string_field(self): + """Test extracting a simple string field.""" + obj = {"type": "string", "description": "A simple string field", "x-queryable": True} + + nodes = core.extract_nodes(obj, "test.field") + + assert len(nodes) == 1 + node = nodes[0] + assert node.json_path == "test.field" + assert node.description == "A simple string field" + assert node.attributes["xQueryable"] is True + assert node.attributes["type"] == "string" + assert node.attributes["array"] == "No" + assert node.attributes["leaf"] is True + assert node.attributes["branch"] is False + + def test_extract_array_field(self): + """Test extracting an array field.""" + obj = {"type": "array", "items": {"type": "string"}, "description": "An array of strings"} + + nodes = core.extract_nodes(obj, "test.array") + + assert len(nodes) == 2 + # First node is the array container + array_node = nodes[0] + assert array_node.json_path == "test.array" + assert array_node.description == "An array of strings" + assert array_node.attributes["type"] == "array" + assert array_node.attributes["array"] == "Yes" + assert array_node.attributes["branch"] is True + + # Second node is the items + items_node = nodes[1] + assert items_node.json_path == "test.array" + assert items_node.attributes["type"] == "string" + + def test_extract_object_with_properties(self): + """Test extracting an object with properties.""" + obj = { + "type": "object", + "description": "An object with properties", + "properties": { + "name": {"type": "string", "description": "Name field", "x-mutable": False}, + "age": {"type": "integer", "description": "Age field"}, + }, + } + + nodes = core.extract_nodes(obj, "person") + + assert len(nodes) == 3 + + # First node is the object container + obj_node = nodes[0] + assert obj_node.json_path == "person" + assert obj_node.description == "An object with properties" + assert obj_node.attributes["type"] == "object" + assert obj_node.attributes["branch"] is True + + # Second and third nodes are the properties + name_node = next(n for n in nodes if n.json_path == "person.name") + assert name_node.description == "Name field" + assert name_node.attributes["type"] == "string" + assert name_node.attributes["xMutable"] is False + + age_node = next(n for n in nodes if n.json_path == "person.age") + assert age_node.description == "Age field" + assert age_node.attributes["type"] == "integer" + + def test_extract_with_custom_attributes(self): + """Test extracting fields with custom LIF attributes.""" + obj = { + "type": "string", + "Description": "Custom description", # uppercase D + "DataType": "xsd:string", + "Required": "Yes", + "Array": "No", + "UniqueName": "Person.Name.firstName", + "enum": ["option1", "option2"], + } + + nodes = core.extract_nodes(obj, "name") + + assert len(nodes) == 1 + node = nodes[0] + assert node.description == "Custom description" + assert node.attributes["dataType"] == "xsd:string" + assert node.attributes["required"] == "Yes" + assert node.attributes["array"] == "No" + assert node.attributes["uniqueName"] == "Person.Name.firstName" + assert node.attributes["enum"] == ["option1", "option2"] + + def test_extract_nested_structure(self): + """Test extracting a deeply nested structure.""" + obj = { + "type": "object", + "properties": { + "contact": { + "type": "array", + "items": { + "type": "object", + "properties": { + "email": {"type": "string", "description": "Email address", "x-queryable": True} + }, + }, + } + }, + } + + nodes = core.extract_nodes(obj, "person") + + # Should have: person, person.contact, person.contact (items), person.contact.email + assert len(nodes) == 4 + + email_node = next(n for n in nodes if n.json_path == "person.contact.email") + assert email_node.description == "Email address" + assert email_node.attributes["xQueryable"] is True + + def test_extract_with_empty_path_prefix(self): + """Test extracting with empty path prefix.""" + obj = {"type": "string", "description": "Root field"} + + nodes = core.extract_nodes(obj, "") + + assert len(nodes) == 1 + assert nodes[0].json_path == "" + + def test_extract_non_dict_returns_empty(self): + """Test that non-dict objects return empty list.""" + nodes = core.extract_nodes("not a dict", "path") + assert nodes == [] + + nodes = core.extract_nodes(123, "path") + assert nodes == [] + + nodes = core.extract_nodes(None, "path") + assert nodes == [] + + +class TestResolveOpenApiRoot: + """Tests for the resolve_openapi_root function.""" + + def test_resolve_from_components_schemas(self): + """Test resolving root from components.schemas.""" + doc = {"components": {"schemas": {"Person": {"type": "object"}, "Organization": {"type": "object"}}}} + + schema, root = core.resolve_openapi_root(doc, "Person") + + assert root == "Person" + assert schema == {"type": "object"} + + def test_resolve_from_definitions(self): + """Test resolving root from definitions (older OpenAPI/JSON Schema).""" + doc = {"definitions": {"User": {"type": "object"}, "Product": {"type": "object"}}} + + schema, root = core.resolve_openapi_root(doc, "User") + + assert root == "User" + assert schema == {"type": "object"} + + def test_resolve_components_takes_precedence(self): + """Test that components.schemas takes precedence over definitions.""" + doc = { + "components": {"schemas": {"Person": {"type": "object", "description": "from components"}}}, + "definitions": {"Person": {"type": "object", "description": "from definitions"}}, + } + + schema, root = core.resolve_openapi_root(doc, "Person") + + assert schema["description"] == "from components" + + def test_resolve_nonexistent_root_raises_error(self): + """Test that resolving a non-existent root raises ValueError.""" + doc = {"components": {"schemas": {"Person": {"type": "object"}}}} + + with pytest.raises(ValueError) as exc_info: + core.resolve_openapi_root(doc, "NonExistent") + + assert "Root schema 'NonExistent' not found" in str(exc_info.value) + assert "Person" in str(exc_info.value) + + def test_resolve_empty_doc_raises_error(self): + """Test that resolving from empty doc raises ValueError.""" + doc = {} + + with pytest.raises(ValueError) as exc_info: + core.resolve_openapi_root(doc, "Person") + + assert "Root schema 'Person' not found" in str(exc_info.value) + + +class TestLoadSchemaNodes: + """Tests for the load_schema_nodes function.""" + + def test_load_from_dict(self): + """Test loading schema nodes from a dictionary.""" + schema_dict = {"type": "object", "properties": {"name": {"type": "string", "description": "Person name"}}} + + nodes = core.load_schema_nodes(schema_dict) + + assert len(nodes) == 2 + assert any(n.json_path == "" for n in nodes) + assert any(n.json_path == "name" for n in nodes) + + def test_load_from_dict_with_root(self): + """Test loading schema nodes from dict with specific root.""" + schema_dict = { + "components": {"schemas": {"Person": {"type": "object", "properties": {"name": {"type": "string"}}}}} + } + + nodes = core.load_schema_nodes(schema_dict, root="Person") + + assert len(nodes) == 2 + # The root should be resolved and path should start with the camelCase root name + assert any(n.json_path == "person" for n in nodes) # Root node + assert any(n.json_path == "person.name" for n in nodes) # Property node + + @patch("builtins.open", new_callable=mock_open) + @patch("jsonref.load") + def test_load_from_file_path_string(self, mock_jsonref_load, mock_file_open): + """Test loading schema nodes from file path as string.""" + schema_data = {"type": "object", "properties": {"id": {"type": "string"}}} + mock_jsonref_load.return_value = schema_data + + nodes = core.load_schema_nodes("/path/to/schema.json") + + mock_file_open.assert_called_once_with("/path/to/schema.json", "r") + mock_jsonref_load.assert_called_once() + assert len(nodes) == 2 + + @patch("builtins.open", new_callable=mock_open) + @patch("jsonref.load") + def test_load_from_pathlib_path(self, mock_jsonref_load, mock_file_open): + """Test loading schema nodes from pathlib.Path.""" + schema_data = {"type": "string"} + mock_jsonref_load.return_value = schema_data + + path = Path("/path/to/schema.json") + nodes = core.load_schema_nodes(path) + + mock_file_open.assert_called_once_with(path, "r") + assert len(nodes) == 1 + + def test_load_invalid_type_raises_error(self): + """Test that invalid input type raises TypeError.""" + with pytest.raises(TypeError) as exc_info: + core.load_schema_nodes(123) # type: ignore + + assert "openapi must be a str, Path, or dict" in str(exc_info.value) + + @patch("lif.schema.core.jsonref") + def test_load_dict_calls_replace_refs(self, mock_jsonref): + """Test that loading from dict calls jsonref.replace_refs.""" + schema_dict = {"type": "string"} + mock_jsonref.JsonRef.replace_refs.return_value = schema_dict + + core.load_schema_nodes(schema_dict) + + mock_jsonref.JsonRef.replace_refs.assert_called_once_with(schema_dict) + + +class TestAttributeKeys: + """Test that ATTRIBUTE_KEYS contains expected values.""" + + def test_attribute_keys_content(self): + """Test that ATTRIBUTE_KEYS contains the expected keys.""" + expected_keys = ["x-queryable", "x-mutable", "DataType", "Required", "Array", "UniqueName", "enum", "type"] + + assert core.ATTRIBUTE_KEYS == expected_keys + + +class TestHelperFunctions: + """Test helper functions within extract_nodes.""" + + def test_is_array_detection(self): + """Test array detection logic.""" + # Test with direct array access since is_array is nested + obj_with_type_array = {"type": "array"} + nodes = core.extract_nodes(obj_with_type_array, "test") + assert nodes[0].attributes["array"] == "Yes" + + obj_with_items = {"items": {"type": "string"}} + nodes = core.extract_nodes(obj_with_items, "test") + assert nodes[0].attributes["array"] == "Yes" + + obj_without_array = {"type": "string"} + nodes = core.extract_nodes(obj_without_array, "test") + assert nodes[0].attributes["array"] == "No" + + def test_description_preference(self): + """Test that uppercase 'Description' is preferred over 'description'.""" + obj_with_both = { + "type": "string", + "Description": "Uppercase description", + "description": "Lowercase description", + } + + nodes = core.extract_nodes(obj_with_both, "test") + assert nodes[0].description == "Uppercase description" + + obj_with_lowercase_only = {"type": "string", "description": "Lowercase only"} + + nodes = core.extract_nodes(obj_with_lowercase_only, "test") + assert nodes[0].description == "Lowercase only" + + +class TestIntegrationWithTestSchema: + """Integration tests using the test schema.""" + + def test_extract_from_simple_person_schema(self): + """Test extracting from a simple person schema.""" + person_schema = { + "type": "object", + "properties": { + "Identifier": { + "type": "array", + "properties": { + "identifier": {"type": "string", "description": "A unique identifier", "x-queryable": True}, + "identifierType": {"type": "string", "x-queryable": True}, + }, + }, + "Name": { + "type": "array", + "properties": { + "firstName": {"type": "string", "x-mutable": False}, + "lastName": {"type": "string", "x-mutable": False}, + }, + }, + }, + } + + nodes = core.extract_nodes(person_schema, "Person") + + # Should extract: Person, Identifier, identifier, identifierType, Name, firstName, lastName + assert len(nodes) == 7 + + # Check specific nodes + person_node = next(n for n in nodes if n.json_path == "person") + assert person_node.attributes["type"] == "object" + assert person_node.attributes["branch"] is True + + id_node = next(n for n in nodes if n.json_path == "person.identifier.identifier") + assert id_node.attributes["xQueryable"] is True + assert id_node.description == "A unique identifier" + + first_name_node = next(n for n in nodes if n.json_path == "person.name.firstName") + assert first_name_node.attributes["xMutable"] is False + + +class TestWithRealTestSchema: + """Test with the actual test schema file.""" + + def test_load_test_schema_file(self): + """Test loading the actual test_openapi_schema.json file.""" + test_schema_path = Path(__file__).parent.parent.parent.parent / "data" / "test_openapi_schema.json" + + nodes = core.load_schema_nodes(test_schema_path, root="Person") + + # Should have nodes for Person and its properties + assert len(nodes) > 0 + + # Check that we have the expected main properties + paths = [n.json_path for n in nodes] + assert "person" in paths # Root Person object + assert any("identifier" in path for path in paths) # Identifier array + assert any("name" in path for path in paths) # Name array + assert any("proficiency" in path for path in paths) # Proficiency array + assert any("contact" in path for path in paths) # Contact array + + # Check that x-queryable and x-mutable attributes are preserved + queryable_nodes = [n for n in nodes if n.attributes.get("xQueryable")] + mutable_nodes = [n for n in nodes if "xMutable" in n.attributes] + + assert len(queryable_nodes) > 0 # Should have some queryable fields + assert len(mutable_nodes) > 0 # Should have some mutable fields + + +# Additional edge case tests +class TestEdgeCases: + """Test edge cases and error conditions.""" + + def test_extract_with_tuple_validation_items(self): + """Test extracting with items as list (tuple validation).""" + obj = {"type": "array", "items": [{"type": "string"}, {"type": "number"}]} + + nodes = core.extract_nodes(obj, "tuple_array") + + # Should have the array node plus nodes for each item type + assert len(nodes) >= 1 + assert nodes[0].attributes["array"] == "Yes" + + def test_extract_with_missing_attributes(self): + """Test extraction when some attributes are missing.""" + obj = { + # No type, no description + "x-queryable": True + } + + nodes = core.extract_nodes(obj, "test") + + assert len(nodes) == 1 + node = nodes[0] + assert node.description == "" + assert node.attributes["xQueryable"] is True + assert node.attributes["type"] is None + + def test_camelcase_path_conversion(self): + """Test that paths are properly converted to camelCase.""" + obj = {"type": "string"} + + nodes = core.extract_nodes(obj, "some-complex_path.with-dashes") + + # The camelcase_path function should be called on the path + assert len(nodes) == 1 + # The actual conversion is handled by camelcase_path function in string_utils diff --git a/test/components/lif/string_utils/test_core.py b/test/components/lif/string_utils/test_core.py index da55029..aeb2210 100644 --- a/test/components/lif/string_utils/test_core.py +++ b/test/components/lif/string_utils/test_core.py @@ -1,5 +1,94 @@ -from lif.string_utils import core +from datetime import date, datetime +from lif.string_utils import ( + safe_identifier, + to_pascal_case, + to_snake_case, + to_camel_case, + camelcase_path, + dict_keys_to_snake, + dict_keys_to_camel, + convert_dates_to_strings, + to_value_enum_name, +) -def test_sample(): - assert core is not None + +class TestSafeIdentifier: + def test_basic(self): + assert safe_identifier("First Name") == "first_name" + assert safe_identifier("first-name") == "first_name" + assert safe_identifier("first$name") == "first_name" + + def test_leading_digit(self): + assert safe_identifier("123abc") == "_123abc" + + def test_camel_pascal(self): + assert safe_identifier("CamelCase") == "camel_case" + assert safe_identifier("camelCaseABC") == "camel_case_abc" + + +class TestToPascalCase: + def test_single_part(self): + assert to_pascal_case("hello world") == "HelloWorld" + assert to_pascal_case("hello-world") == "HelloWorld" + assert to_pascal_case("hello_world") == "HelloWorld" + + def test_multiple_parts(self): + assert to_pascal_case("hello", "world") == "HelloWorld" + assert to_pascal_case("HTTP", "status", "200") == "HTTPStatus200" + + def test_mixed_case(self): + assert to_pascal_case("camelCase") == "CamelCase" + assert to_pascal_case("PascalCase") == "PascalCase" + + +class TestToSnakeCase: + def test_basic(self): + assert to_snake_case("CamelCase") == "camel_case" + assert to_snake_case("camelCase") == "camel_case" + + def test_with_acronyms(self): + assert to_snake_case("HTTPServerID") == "http_server_id" + + +class TestToCamelCase: + def test_basic(self): + assert to_camel_case("hello_world") == "helloWorld" + assert to_camel_case("Hello World") == "helloWorld" + assert to_camel_case("hello-world") == "helloWorld" + + def test_empty(self): + assert to_camel_case("") == "" + + +class TestCamelcasePath: + def test_path(self): + assert camelcase_path("a.b_c.d-e f") == "a.bC.dEF" + + +class TestDictKeyTransforms: + def test_to_snake(self): + data = {"FirstName": "Alice", "Address": {"zipCode": 12345}, "items": [{"itemID": 1}]} + out = dict_keys_to_snake(data) + assert out == {"first_name": "Alice", "address": {"zip_code": 12345}, "items": [{"item_id": 1}]} + + def test_to_camel(self): + data = {"first_name": "Bob", "address": {"zip_code": 12345}, "items": [{"item_id": 1}]} + out = dict_keys_to_camel(data) + assert out == {"firstName": "Bob", "address": {"zipCode": 12345}, "items": [{"itemId": 1}]} + + +class TestConvertDatesToStrings: + def test_nested(self): + d = date(2020, 1, 2) + dt = datetime(2020, 1, 2, 3, 4, 5) + obj = {"when": d, "arr": [dt, {"n": 1}]} + out = convert_dates_to_strings(obj) + assert out == {"when": d.isoformat(), "arr": [dt.isoformat(), {"n": 1}]} + + +class TestToValueEnumName: + def test_basic(self): + assert to_value_enum_name("in progress") == "IN_PROGRESS" + assert to_value_enum_name("done!") == "DONE_" + assert to_value_enum_name("123start") == "_123START" diff --git a/test/data/test_openapi_schema.json b/test/data/test_openapi_schema.json new file mode 100644 index 0000000..249c9c8 --- /dev/null +++ b/test/data/test_openapi_schema.json @@ -0,0 +1,71 @@ +{ + "openapi": "3.0.0", + "info": {"title": "Test", "version": "1.0"}, + "paths": {}, + "components": { + "schemas": { + "Person": { + "type": "object", + "properties": { + "Identifier": { + "type": "array", + "properties": { + "identifier": { + "type": "string", + "description": "A number and/or alphanumeric code used to uniquely identify the entity", + "x-queryable": true + }, + "identifierType": { + "type": "string", + "description": "The types of sources of identifiers used to uniquely identify the entity", + "x-queryable": true + } + } + }, + "Name": { + "type": "array", + "properties": { + "firstName": { + "type": "string", + "description": "The first name of a person or individual", + "x-mutable": false + }, + "lastName": { + "type": "string", + "description": "The last name of a person or individual", + "x-mutable": false + } + } + }, + "Proficiency": { + "type": "array", + "properties": { + "name": { + "type": "string", + "description": "Name of the proficiency" + }, + "description": { + "type": "string", + "description": "Description of the proficiency" + } + } + }, + "Contact": { + "type": "array", + "properties": { + "Email": { + "type": "array", + "properties": { + "emailAddress": { + "type": "string", + "description": "The electronic mail address of an individual or person" + } + } + } + } + } + } + } + } + } +}