diff --git a/fastmcp_slim/fastmcp/utilities/json_schema_type.py b/fastmcp_slim/fastmcp/utilities/json_schema_type.py index 140fc6041..2a0dbd386 100644 --- a/fastmcp_slim/fastmcp/utilities/json_schema_type.py +++ b/fastmcp_slim/fastmcp/utilities/json_schema_type.py @@ -403,6 +403,10 @@ def _object_schema_to_type( """ has_properties = bool(schema.get("properties")) additional_props = schema.get("additionalProperties") + # Per JSON Schema, an empty schema {} means "accept anything" — + # equivalent to additionalProperties: true. + if isinstance(additional_props, dict) and not additional_props: + additional_props = True class_name = name if name is not None else schema.get("title") if not has_properties and additional_props: @@ -453,7 +457,13 @@ def _schema_to_type( if not schema: return object - if "type" not in schema and "properties" in schema: + if ( + "type" not in schema + and "properties" in schema + and "allOf" not in schema + and "oneOf" not in schema + and "anyOf" not in schema + ): return _create_dataclass(schema, schema.get("title", ""), schemas) # Handle references first @@ -493,6 +503,109 @@ def _schema_to_type( else: return Union[tuple(types)] # type: ignore # noqa: UP007 + # Handle allOf (schema intersection / composition). + # Flatten all sub-schemas into a single merged object schema. + if "allOf" in schema: + merged: dict[str, Any] = {} + merged_properties: dict[str, Any] = {} + merged_required: list[str] = [] + has_false = False + + def _collect_allof(sub: Any) -> None: + """Recursively collect properties from a sub-schema.""" + nonlocal has_false + if sub is False: + has_false = True + return + if sub is True: + return + if isinstance(sub, bool): + return + if "$ref" in sub: + resolved = _resolve_ref(sub["$ref"], schemas) + if isinstance(resolved, bool): + if resolved is False: + has_false = True + return + sub = dict(resolved) + # Recurse into nested allOf + if "allOf" in sub: + for nested in sub["allOf"]: + _collect_allof(nested) + merged_properties.update(sub.get("properties", {})) + merged_required.extend(sub.get("required", [])) + for key in ("title", "description"): + if key in sub and key not in merged: + merged[key] = sub[key] + # Intersect additionalProperties: false is most restrictive and wins. + if "additionalProperties" in sub: + existing = merged.get("additionalProperties") + if existing is None: + merged["additionalProperties"] = sub["additionalProperties"] + elif sub["additionalProperties"] is False: + merged["additionalProperties"] = False + + # Collect from allOf children first, then overlay sibling + # properties so local definitions take precedence over inherited. + for sub in schema["allOf"]: + _collect_allof(sub) + + merged_properties.update(schema.get("properties", {})) + merged_required.extend(schema.get("required", [])) + + if has_false: + return _UnsatisfiableType # type: ignore[return-value] + + if merged_properties: + merged["type"] = "object" + merged["properties"] = merged_properties + if merged_required: + merged["required"] = list(dict.fromkeys(merged_required)) + # Preserve additionalProperties from the parent schema, not just + # from allOf children, so schemas like + # {"additionalProperties": true, "allOf": [...]} allow extra keys. + if ( + "additionalProperties" not in merged + and "additionalProperties" in schema + ): + merged["additionalProperties"] = schema["additionalProperties"] + return _schema_to_type(merged, schemas) + # allOf with no mergeable properties — fall through to Any + + # Handle oneOf (exactly-one-of union). + # Treat like anyOf for type construction — Pydantic's Union + # does "first match" which is a reasonable approximation. + if "oneOf" in schema: + types: list[type | Any] = [] + for subschema in schema["oneOf"]: + # Same dict special-case as the anyOf handler: detect + # map-like objects so they become dict[str, X] instead + # of empty dataclasses that discard key/value data. + if ( + isinstance(subschema, dict) + and subschema.get("type") == "object" + and not subschema.get("properties") + and "additionalProperties" in subschema + ): + additional_props = subschema["additionalProperties"] + if additional_props is True or additional_props == {}: + types.append(dict[str, Any]) + else: + value_type = _schema_to_type(additional_props, schemas) + types.append(dict[str, value_type]) # type: ignore + else: + types.append(_schema_to_type(subschema, schemas)) + has_null = type(None) in types + types = [t for t in types if t is not type(None)] + if len(types) == 0: + return type(None) + elif len(types) == 1: + return types[0] | None if has_null else types[0] # type: ignore + else: + if has_null: + return Union[(*types, type(None))] # type: ignore + return Union[tuple(types)] # type: ignore # noqa: UP007 + schema_type = schema.get("type") if not schema_type: return Any diff --git a/tests/utilities/json_schema_type/test_json_schema_type.py b/tests/utilities/json_schema_type/test_json_schema_type.py index c125ddc98..7fc2f6640 100644 --- a/tests/utilities/json_schema_type/test_json_schema_type.py +++ b/tests/utilities/json_schema_type/test_json_schema_type.py @@ -404,3 +404,390 @@ def test_fallback_only_triggers_for_regex_errors(self): warnings.simplefilter("error", UserWarning) T = json_schema_to_type(schema) # must not warn TypeAdapter(T).validate_python("hello") + + +class TestAllOfOneOf: + """allOf and oneOf composition, previously returning Any.""" + + def test_allof_merges_properties(self): + """allOf sub-schemas should be merged into a single object type.""" + schema = { + "allOf": [ + { + "type": "object", + "properties": {"name": {"type": "string"}}, + "required": ["name"], + }, + { + "type": "object", + "properties": {"age": {"type": "integer"}}, + }, + ] + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + + result = ta.validate_python({"name": "Alice", "age": 30}) + assert result.name == "Alice" # ty:ignore[unresolved-attribute] + assert result.age == 30 # ty:ignore[unresolved-attribute] + + def test_allof_preserves_required(self): + """Required fields from all allOf sub-schemas should be enforced.""" + schema = { + "allOf": [ + { + "type": "object", + "properties": {"name": {"type": "string"}}, + "required": ["name"], + }, + { + "type": "object", + "properties": {"age": {"type": "integer"}}, + }, + ] + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + + with pytest.raises(ValidationError): + ta.validate_python({"age": 30}) # missing required 'name' + + def test_allof_with_ref(self): + """allOf with $ref should resolve and merge the referenced schema.""" + schema = { + "allOf": [ + {"$ref": "#/$defs/Base"}, + { + "type": "object", + "properties": {"extra": {"type": "string"}}, + }, + ], + "$defs": { + "Base": { + "type": "object", + "properties": {"id": {"type": "integer"}}, + "required": ["id"], + } + }, + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + + result = ta.validate_python({"id": 1, "extra": "hello"}) + assert result.id == 1 # ty:ignore[unresolved-attribute] + assert result.extra == "hello" # ty:ignore[unresolved-attribute] + + def test_oneof_creates_union(self): + """oneOf should create a Union type that accepts any sub-schema.""" + schema = { + "oneOf": [ + { + "type": "object", + "properties": { + "kind": {"const": "dog"}, + "breed": {"type": "string"}, + }, + "required": ["kind", "breed"], + }, + { + "type": "object", + "properties": { + "kind": {"const": "cat"}, + "indoor": {"type": "boolean"}, + }, + "required": ["kind", "indoor"], + }, + ] + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + + dog = ta.validate_python({"kind": "dog", "breed": "lab"}) + assert dog.kind == "dog" # ty:ignore[unresolved-attribute] + + cat = ta.validate_python({"kind": "cat", "indoor": True}) + assert cat.indoor is True # ty:ignore[unresolved-attribute] + + def test_oneof_with_scalars(self): + """oneOf with scalar types should create a Union.""" + schema = { + "oneOf": [ + {"type": "string"}, + {"type": "integer"}, + ] + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + + assert ta.validate_python("hello") == "hello" + assert ta.validate_python(42) == 42 + + def test_nested_allof(self): + """allOf inside allOf should be flattened recursively.""" + schema = { + "allOf": [ + { + "allOf": [ + {"type": "object", "properties": {"a": {"type": "string"}}}, + {"type": "object", "properties": {"b": {"type": "integer"}}}, + ] + }, + {"type": "object", "properties": {"c": {"type": "boolean"}}}, + ] + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + result = ta.validate_python({"a": "x", "b": 1, "c": True}) + assert result.a == "x" # ty:ignore[unresolved-attribute] + assert result.b == 1 # ty:ignore[unresolved-attribute] + assert result.c is True # ty:ignore[unresolved-attribute] + + def test_allof_ref_to_allof(self): + """allOf with $ref pointing to another allOf should resolve fully.""" + schema = { + "allOf": [ + {"$ref": "#/$defs/Combined"}, + {"type": "object", "properties": {"extra": {"type": "string"}}}, + ], + "$defs": { + "Combined": { + "allOf": [ + { + "type": "object", + "properties": {"x": {"type": "integer"}}, + "required": ["x"], + }, + { + "type": "object", + "properties": {"y": {"type": "integer"}}, + "required": ["y"], + }, + ] + } + }, + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + result = ta.validate_python({"x": 1, "y": 2, "extra": "hi"}) + assert result.x == 1 # ty:ignore[unresolved-attribute] + assert result.y == 2 # ty:ignore[unresolved-attribute] + assert result.extra == "hi" # ty:ignore[unresolved-attribute] + + def test_allof_with_sibling_properties(self): + """Sibling properties on the same schema as allOf should be included.""" + schema = { + "properties": {"local": {"type": "string"}}, + "required": ["local"], + "allOf": [ + { + "type": "object", + "properties": {"inherited": {"type": "integer"}}, + } + ], + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + + result = ta.validate_python({"local": "hi", "inherited": 42}) + assert result.local == "hi" # ty:ignore[unresolved-attribute] + assert result.inherited == 42 # ty:ignore[unresolved-attribute] + + with pytest.raises(ValidationError): + ta.validate_python({"inherited": 42}) # missing required 'local' + + def test_oneof_with_dict_branch(self): + """oneOf with a map-like object branch should produce dict[str, X].""" + schema = { + "oneOf": [ + {"type": "string"}, + { + "type": "object", + "additionalProperties": {"type": "integer"}, + }, + ] + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + + assert ta.validate_python("hello") == "hello" + assert ta.validate_python({"x": 1, "y": 2}) == {"x": 1, "y": 2} + + def test_allof_false_sub_schema_is_unsatisfiable(self): + """allOf containing `false` should produce an unsatisfiable type.""" + schema = { + "allOf": [ + {"type": "object", "properties": {"name": {"type": "string"}}}, + False, + ] + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + + with pytest.raises(ValidationError): + ta.validate_python({"name": "Alice"}) + + def test_allof_only_false_is_unsatisfiable(self): + """allOf with only `false` should produce an unsatisfiable type.""" + schema = {"allOf": [False]} + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + + with pytest.raises(ValidationError): + ta.validate_python({"any": "value"}) + + def test_allof_ref_to_boolean_schema(self): + """allOf with a $ref resolving to `false` should be unsatisfiable.""" + schema = { + "allOf": [ + {"$ref": "#/$defs/Never"}, + {"type": "object", "properties": {"name": {"type": "string"}}}, + ], + "$defs": {"Never": False}, + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + + with pytest.raises(ValidationError): + ta.validate_python({"name": "Alice"}) + + def test_additional_properties_empty_schema_is_allow_any(self): + """additionalProperties: {} is equivalent to additionalProperties: true per spec.""" + schema = { + "type": "object", + "additionalProperties": {}, + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + # Should accept any values (empty schema = true = allow everything) + assert ta.validate_python({"x": 1, "y": "two"}) == {"x": 1, "y": "two"} + + def test_additional_properties_empty_schema_with_properties(self): + """properties + additionalProperties: {} should produce Pydantic model (extra=allow).""" + from pydantic import BaseModel + + schema = { + "type": "object", + "properties": {"name": {"type": "string"}}, + "additionalProperties": {}, + } + T = json_schema_to_type(schema) + assert issubclass(T, BaseModel) + ta = TypeAdapter(T) + result = ta.validate_python({"name": "Alice", "extra_key": 42}) + assert result.name == "Alice" # ty: ignore[unresolved-attribute] + + def test_sibling_properties_override_allof(self): + """Sibling properties should take precedence over allOf children.""" + schema = { + "properties": {"name": {"type": "integer"}}, + "allOf": [ + {"type": "object", "properties": {"name": {"type": "string"}}}, + ], + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + # Sibling says name is int, allOf child says str — sibling wins + result = ta.validate_python({"name": 42}) + assert result.name == 42 # ty: ignore[unresolved-attribute] + + def test_allof_additional_properties_false_wins(self): + """When allOf children conflict on additionalProperties, false should win.""" + schema = { + "allOf": [ + { + "type": "object", + "properties": {"a": {"type": "string"}}, + "additionalProperties": True, + }, + { + "type": "object", + "properties": {"b": {"type": "integer"}}, + "additionalProperties": False, + }, + ], + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + result = ta.validate_python({"a": "hello", "b": 1}) + assert result.a == "hello" # ty: ignore[unresolved-attribute] + assert result.b == 1 # ty: ignore[unresolved-attribute] + + def test_oneof_with_null_branch(self): + """oneOf with null branch should produce Optional type.""" + schema = { + "oneOf": [ + {"type": "string"}, + {"type": "null"}, + ] + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + assert ta.validate_python("hello") == "hello" + assert ta.validate_python(None) is None + + def test_properties_with_anyof_not_shortcircuited(self): + """Schema with properties + anyOf should NOT shortcircuit to dataclass.""" + schema = { + "properties": {"base": {"type": "string"}}, + "anyOf": [ + {"properties": {"variant_a": {"type": "integer"}}}, + {"properties": {"variant_b": {"type": "boolean"}}}, + ], + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + result = ta.validate_python({"base": "val"}) + assert result is not None + + def test_allof_with_only_required_no_properties(self): + """allOf child with only required (no properties) merges correctly.""" + schema = { + "allOf": [ + { + "type": "object", + "properties": { + "name": {"type": "string"}, + "age": {"type": "integer"}, + }, + }, + {"required": ["name"]}, + ] + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + result = ta.validate_python({"name": "Alice"}) + assert result.name == "Alice" # ty: ignore[unresolved-attribute] + with pytest.raises(ValidationError): + ta.validate_python({"age": 30}) + + def test_oneof_with_empty_additional_properties_dict(self): + """oneOf branch with additionalProperties: {} should produce dict[str, Any].""" + schema = { + "oneOf": [ + {"type": "string"}, + {"type": "object", "additionalProperties": {}}, + ] + } + T = json_schema_to_type(schema) + ta = TypeAdapter(T) + assert ta.validate_python("hello") == "hello" + assert ta.validate_python({"x": 1}) == {"x": 1} + + def test_allof_with_sibling_additional_properties_true(self): + """properties + additionalProperties: true + allOf should preserve extra keys.""" + from pydantic import BaseModel + + schema = { + "additionalProperties": True, + "allOf": [ + { + "type": "object", + "properties": {"name": {"type": "string"}}, + }, + ], + } + T = json_schema_to_type(schema) + assert issubclass(T, BaseModel) + ta = TypeAdapter(T) + result = ta.validate_python({"name": "Alice", "extra": "kept"}) + assert result.name == "Alice" # ty: ignore[unresolved-attribute]