Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix chaotic: support relative paths in ref: #884

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 32 additions & 13 deletions chaotic/chaotic/front/ref_resolver.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import os
import collections
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Any, Dict, List, Optional, Set

from chaotic import error
from chaotic.front import types
Expand Down Expand Up @@ -38,6 +35,26 @@ def do_node(node: str):
return sorted_nodes


def normalize_ref(ref: str, base: str = None) -> str:
"""
Normalizes a link, converting relative paths to canonical form.
If the link contains an anchor ("#/"):
- If the part before '#' is empty and base is specified, then base is used.
- Otherwise, the part of the path is normalized via os.path.normpath.
Examples:
"../role.yaml#/definitions/role" -> "role.yaml#/definitions/role"
"#/definitions/type" (with base="vfull") -> "vfull#/definitions/type"
"""
if '#/' in ref:
file_path, fragment = ref.split('#', 1)
if not file_path and base:
file_path = base
else:
file_path = os.path.normpath(file_path)
return file_path + '#' + fragment
return os.path.normpath(ref)


class RefResolver:
def sort_schemas(
self,
Expand Down Expand Up @@ -68,19 +85,20 @@ def visitor(
if cur_node.indirect:
indirect = True

if cur_node.ref not in schemas.schemas:
ref = external_schemas.schemas.get(cur_node.ref)
norm_ref = normalize_ref(cur_node.ref, base=name.split('#')[0] if cur_node.ref.startswith('#') else None)
if norm_ref not in schemas.schemas:
ref = external_schemas.schemas.get(norm_ref)
if ref:
cur_node = ref
is_external = True
else:
known = '\n'.join([f'- {v}' for v in schemas.schemas.keys()])
known += '\n'.join([f'- {v}' for v in external_schemas.schemas.keys()])
raise Exception(
f'$ref to unknown type "{cur_node.ref}", known refs:\n{known}',
f'$ref to unknown type "{norm_ref}", known refs:\n{known}',
)
else:
cur_node = schemas.schemas[cur_node.ref]
cur_node = schemas.schemas[norm_ref]
if cur_node in seen:
# cycle is detected
# an exception will be raised later in sort_dfs()
Expand All @@ -91,7 +109,7 @@ def visitor(
local_schema.indirect = indirect

if isinstance(parent, types.Array):
if name == local_schema.ref:
if name == normalize_ref(local_schema.ref, base=name.split('#')[0] if local_schema.ref.startswith('#') else None):
if indirect:
raise error.BaseError(
full_filepath=local_schema.source_location().filepath,
Expand All @@ -111,13 +129,14 @@ def visitor(
)
if not indirect:
if not is_external:
# print(f'add {name} -> {local_schema.ref}')
edges[name].append(local_schema.ref)
if local_schema.ref.startswith('#'):
edges[name].append(normalize_ref(local_schema.ref, base=name.split('#')[0]))
else:
edges[name].append(normalize_ref(local_schema.ref))
else:
# skip indirect link
pass

# TODO: forbid non-schemas/ $refs
for name, schema_item in schemas.schemas.items():
visitor(schema_item, None)
schema_item.visit_children(visitor)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
definitions:
ObjectWithExternalRefRelativePath:
type: object
properties:
empty:
$ref: '../object_empty.yaml#/definitions/ObjectEmpty'
additionalProperties: false
44 changes: 44 additions & 0 deletions chaotic/tests/front/test_ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,47 @@ def test_cycle():
assert str(exc) == '$ref cycle: vfull#/definitions/type1, vfull#/definitions/type2'
else:
assert False

def test_relative_path():
config = ParserConfig(erase_prefix='')
schemas = []

parser = SchemaParser(
config=config,
full_filepath='folder/full',
full_vfilepath='folder/vfull',
)
parser.parse_schema('/definitions/vfull', {'type': 'integer'})
schemas.append(parser.parsed_schemas())

parser = SchemaParser(
config=config,
full_filepath='folder/obj/full2',
full_vfilepath='folder/obj/vfull2',
)
parser.parse_schema('/definitions/vfull2', {
'type': 'object',
'properties': {
'test': {'$ref': '../vfull#/definitions/vfull'}
},
'additionalProperties': False,
})
schemas.append(parser.parsed_schemas())

rr = ref_resolver.RefResolver()
parsed_schemas = rr.sort_schemas(types.ParsedSchemas.merge(schemas))

assert parsed_schemas.schemas == {
'folder/vfull#/definitions/vfull': Integer(),
'folder/obj/vfull2#/definitions/vfull2': SchemaObject(
properties={
'test': Ref(
ref='folder/obj/../vfull#/definitions/vfull',
schema=Integer(),
indirect=False,
self_ref=False,
)
},
additionalProperties=False,
),
}