-
Notifications
You must be signed in to change notification settings - Fork 2k
[DNM] Handle allOf and oneOf in json_schema_to_type #3840
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 6 commits
e4d9c24
a3927d5
08236c4
16b4845
087d4b3
025ed35
f954396
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -403,6 +403,10 @@ def _object_schema_to_type( | |
| """ | ||
| has_properties = bool(schema.get("properties")) | ||
| additional_props = schema.get("additionalProperties") | ||
| # Per JSON Schema, an empty schema {} means "accept anything" — | ||
| # equivalent to additionalProperties: true. | ||
| if isinstance(additional_props, dict) and not additional_props: | ||
| additional_props = True | ||
| class_name = name if name is not None else schema.get("title") | ||
|
|
||
| if not has_properties and additional_props: | ||
|
|
@@ -453,7 +457,13 @@ def _schema_to_type( | |
| if not schema: | ||
| return object | ||
|
|
||
| if "type" not in schema and "properties" in schema: | ||
| if ( | ||
| "type" not in schema | ||
| and "properties" in schema | ||
| and "allOf" not in schema | ||
| and "oneOf" not in schema | ||
| and "anyOf" not in schema | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When an implicit object schema has sibling Useful? React with 👍 / 👎. |
||
| ): | ||
| return _create_dataclass(schema, schema.get("title", "<unknown>"), schemas) | ||
|
|
||
| # Handle references first | ||
|
|
@@ -493,6 +503,109 @@ def _schema_to_type( | |
| else: | ||
| return Union[tuple(types)] # type: ignore # noqa: UP007 | ||
|
|
||
| # Handle allOf (schema intersection / composition). | ||
| # Flatten all sub-schemas into a single merged object schema. | ||
| if "allOf" in schema: | ||
| merged: dict[str, Any] = {} | ||
| merged_properties: dict[str, Any] = {} | ||
| merged_required: list[str] = [] | ||
|
Comment on lines
+509
to
+511
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The Useful? React with 👍 / 👎. |
||
| has_false = False | ||
|
|
||
| def _collect_allof(sub: Any) -> None: | ||
| """Recursively collect properties from a sub-schema.""" | ||
| nonlocal has_false | ||
| if sub is False: | ||
| has_false = True | ||
| return | ||
| if sub is True: | ||
| return | ||
| if isinstance(sub, bool): | ||
| return | ||
|
Comment on lines
+522
to
+523
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Returning early for boolean sub-schemas silently ignores Useful? React with 👍 / 👎. |
||
| if "$ref" in sub: | ||
| resolved = _resolve_ref(sub["$ref"], schemas) | ||
| if isinstance(resolved, bool): | ||
| if resolved is False: | ||
| has_false = True | ||
| return | ||
| sub = dict(resolved) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When an Useful? React with 👍 / 👎. |
||
| # Recurse into nested allOf | ||
| if "allOf" in sub: | ||
| for nested in sub["allOf"]: | ||
| _collect_allof(nested) | ||
| merged_properties.update(sub.get("properties", {})) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When the same property appears in multiple Useful? React with 👍 / 👎. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When an Useful? React with 👍 / 👎. |
||
| merged_required.extend(sub.get("required", [])) | ||
| for key in ("title", "description"): | ||
| if key in sub and key not in merged: | ||
| merged[key] = sub[key] | ||
| # Intersect additionalProperties: false is most restrictive and wins. | ||
| if "additionalProperties" in sub: | ||
| existing = merged.get("additionalProperties") | ||
| if existing is None: | ||
| merged["additionalProperties"] = sub["additionalProperties"] | ||
| elif sub["additionalProperties"] is False: | ||
| merged["additionalProperties"] = False | ||
|
|
||
| # Collect from allOf children first, then overlay sibling | ||
| # properties so local definitions take precedence over inherited. | ||
| for sub in schema["allOf"]: | ||
| _collect_allof(sub) | ||
|
|
||
| merged_properties.update(schema.get("properties", {})) | ||
| merged_required.extend(schema.get("required", [])) | ||
|
Comment on lines
+553
to
+554
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The allOf merge only copies sibling Useful? React with 👍 / 👎. |
||
|
|
||
| if has_false: | ||
| return _UnsatisfiableType # type: ignore[return-value] | ||
|
|
||
| if merged_properties: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When an Useful? React with 👍 / 👎. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This Useful? React with 👍 / 👎. |
||
| merged["type"] = "object" | ||
| merged["properties"] = merged_properties | ||
| if merged_required: | ||
| merged["required"] = list(dict.fromkeys(merged_required)) | ||
| # Preserve additionalProperties from the parent schema, not just | ||
| # from allOf children, so schemas like | ||
| # {"additionalProperties": true, "allOf": [...]} allow extra keys. | ||
| if ( | ||
| "additionalProperties" not in merged | ||
| and "additionalProperties" in schema | ||
| ): | ||
| merged["additionalProperties"] = schema["additionalProperties"] | ||
|
Comment on lines
+567
to
+571
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When an Useful? React with 👍 / 👎. |
||
| return _schema_to_type(merged, schemas) | ||
| # allOf with no mergeable properties — fall through to Any | ||
|
|
||
| # Handle oneOf (exactly-one-of union). | ||
| # Treat like anyOf for type construction — Pydantic's Union | ||
| # does "first match" which is a reasonable approximation. | ||
| if "oneOf" in schema: | ||
| types: list[type | Any] = [] | ||
| for subschema in schema["oneOf"]: | ||
| # Same dict special-case as the anyOf handler: detect | ||
| # map-like objects so they become dict[str, X] instead | ||
| # of empty dataclasses that discard key/value data. | ||
| if ( | ||
| isinstance(subschema, dict) | ||
| and subschema.get("type") == "object" | ||
| and not subschema.get("properties") | ||
| and "additionalProperties" in subschema | ||
| ): | ||
| additional_props = subschema["additionalProperties"] | ||
| if additional_props is True or additional_props == {}: | ||
| types.append(dict[str, Any]) | ||
| else: | ||
| value_type = _schema_to_type(additional_props, schemas) | ||
| types.append(dict[str, value_type]) # type: ignore | ||
| else: | ||
| types.append(_schema_to_type(subschema, schemas)) | ||
|
Comment on lines
+596
to
+597
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When a schema has sibling Useful? React with 👍 / 👎. |
||
| has_null = type(None) in types | ||
| types = [t for t in types if t is not type(None)] | ||
| if len(types) == 0: | ||
| return type(None) | ||
| elif len(types) == 1: | ||
| return types[0] | None if has_null else types[0] # type: ignore | ||
| else: | ||
| if has_null: | ||
| return Union[(*types, type(None))] # type: ignore | ||
| return Union[tuple(types)] # type: ignore # noqa: UP007 | ||
|
Comment on lines
+605
to
+607
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This returns a plain Useful? React with 👍 / 👎. |
||
|
|
||
| schema_type = schema.get("type") | ||
| if not schema_type: | ||
| return Any | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For schemas that include the standard OpenAPI form
{"type": "object", "allOf": [...]}or{"type": "object", "oneOf": [...]}, the top-leveljson_schema_to_type()returns through_object_schema_to_type()before this_schema_to_type()guard and the new composition handlers are never reached. That still accepts invalid payloads asdict[str, Any]or a plain object model instead of enforcing the composed branches, so typed object compositions need to be routed through the allOf/oneOf logic before the object shortcut.Useful? React with 👍 / 👎.