Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 64 additions & 166 deletions cornice/schemas.py
Original file line number Diff line number Diff line change
@@ -1,180 +1,78 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import webob.multidict
from cornice.util import extract_request_data

from pyramid.path import DottedNameResolver
from cornice.util import to_list, extract_request_data


class SchemaError(Exception):
try:
import colander
except ImportError:
pass


class CorniceSchema(object):
"""Defines a cornice schema"""

def __init__(self, _colander_schema, bind_request=True):
self._colander_schema = _colander_schema
self._colander_schema_runtime = None
self._bind_request = bind_request

@property
def colander_schema(self):
if not self._colander_schema_runtime:
schema = self._colander_schema
schema = DottedNameResolver(__name__).maybe_resolve(schema)
if callable(schema):
schema = schema()
self._colander_schema_runtime = schema
return self._colander_schema_runtime

def bind_attributes(self, request=None):
schema = self.colander_schema
if request and self._bind_request:
schema = schema.bind(request=request)
return schema.children

def get_attributes(self, location=("body", "header", "querystring"),
required=(True, False),
request=None):
"""Return a list of attributes that match the given criteria.

By default, if nothing is specified, it will return all the attributes,
without filtering anything.
else:

class StrictMappingSchema(colander.MappingSchema):
@staticmethod
def schema_type():
return colander.Mapping(unknown='raise')

class CorniceSchema(colander.MappingSchema):
querystring = StrictMappingSchema()
headers = StrictMappingSchema()
body = StrictMappingSchema()
path = StrictMappingSchema()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For retro-compatibility, we could add a from_colander method (?)

    @classmethod
    def from_colander(klass, colander_schema, **kwargs):
        # XXX: ...written on a hungry stomache:
        class BodySchema(CorniceSchema):
            body = colander_schema
        return BodySchema(**kwargs)

Thoughts ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR works totally different then the past system so maybe it'd be best to actually change the name so there's no longer a CorniceSchema. Or, CorniceSchema is some sort of compatibility class that's deprecated that helps people transition to the new system.

Any ideas for a different name? Maybe it should be "ColanderSchema" since we're talking about adding alternate schema libraries and this class is already within the cornice namespace?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is "CorniceColanderSchema" too wordy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another issue with the old system is everything ends up in request.validated so, for example, if you have a "body" element and a "querystring" with the same name they'll conflict. The new one ends up with request.validated['body'] and request.validated['querystring']. I wrote a from_colander() method, but the result in request.validated will be completely different then before.

I don't think there's a way to support a smooth transition from the old... So, I think it's best to fail early and hard so people are forced to face the change directly.


def simple_cstruct_serialize(val):
""" cstruct is colander's internal use format which is a
series of nested dicts, lists, and tuples with only str values
as the "leaf" values. If this method will try to recursively
go through a given value and convert all dict-like objects into
dicts, convert iterables that aren't dict-like into lists, and
convert everything else into a string.
"""
attributes = self.bind_attributes(request)

def _filter(attr):
if not hasattr(attr, "location"):
valid_location = 'body' in location
else:
valid_location = attr.location in to_list(location)
return valid_location and attr.required in to_list(required)

return list(filter(_filter, attributes))

def as_dict(self):
"""returns a dict containing keys for the different attributes, and
for each of them, a dict containing information about them::

>>> schema.as_dict() # NOQA
{'foo': {'type': 'string',
'location': 'body',
'description': 'yeah',
'required': True},
'bar': {'type': 'string',
'location': 'body',
'description': 'yeah',
'required': True}
# ...
}
"""
attributes = self.bind_attributes()
schema = {}
for attr in attributes:
schema[attr.name] = {
'type': getattr(attr, 'type', attr.typ),
'name': attr.name,
'description': getattr(attr, 'description', ''),
'required': getattr(attr, 'required', False),
}

return schema

def unflatten(self, data):
return self.colander_schema.unflatten(data)

def flatten(self, data):
return self.colander_schema.flatten(data)

@classmethod
def from_colander(klass, colander_schema, **kwargs):
return CorniceSchema(colander_schema, **kwargs)
try:
# try dict-like interpretation
result = {}
for k in val.keys():
result[k] = simple_cstruct_serialize(val[k])
return result
except (TypeError, AttributeError):
try:
# try iterable interpretation
result = []
for k in val:
result.append(simple_cstruct_serialize(k))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When looking at this: https://travis-ci.org/mozilla-services/cornice/jobs/76151945#L525 recursivity seems infinite!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep. it seems I didn't properly test simple_cstruct_serialize(). It's because strings are iterable. I'll patch this.

return result
except TypeError:
return str(val)


def validate_colander_schema(schema, request):
"""Validates that the request is conform to the given schema"""
"""Validates that the request conforms to the given schema"""
from colander import Invalid, Sequence, drop, null, Mapping

# CorniceSchema.colander_schema guarantees that we have a colander
# instance and not a class so we should use `typ` and not
# `schema_type()` to determine the type.
schema_type = schema.colander_schema.typ
unknown = getattr(schema_type, 'unknown', None)

if not isinstance(schema_type, Mapping):
raise SchemaError('colander schema type is not a Mapping: %s' %
type(schema_type))

def _validate_fields(location, data):
if location == 'body':
try:
original = data
data = webob.multidict.MultiDict(schema.unflatten(data))
data.update(original)
except KeyError:
pass

if location == 'querystring':
try:
original = data
data = schema.unflatten(original)
except KeyError:
pass

for attr in schema.get_attributes(location=location,
request=request):
if attr.required and attr.name not in data and \
attr.default == null:
# missing
request.errors.add(location, attr.name,
"%s is missing" % attr.name)
else:
try:
if attr.name not in data:
if attr.default != null:
deserialized = attr.deserialize(attr.serialize())
else:
deserialized = attr.deserialize()
else:
if (location == 'querystring' and
isinstance(attr.typ, Sequence)):
serialized = original.getall(attr.name)
else:
serialized = data[attr.name]
deserialized = attr.deserialize(serialized)
except Invalid as e:
# the struct is invalid
try:
request.errors.add(location, attr.name,
e.asdict()[attr.name])
except KeyError:
for k, v in e.asdict().items():
if k.startswith(attr.name):
request.errors.add(location, k, v)
else:
if deserialized is not drop:
request.validated[attr.name] = deserialized

if location == "body" and unknown == 'preserve':
for field, value in data.items():
if field not in request.validated:
request.validated[field] = value
if not isinstance(schema, CorniceSchema):
raise TypeError(
'schema type is not a CorniceSchema'
)

# compile the querystring, headers, body, path into an appstruct
# for consumption by colander for validation
qs, headers, body, path = extract_request_data(request)

_validate_fields('path', path)
_validate_fields('header', headers)
_validate_fields('body', body)
_validate_fields('querystring', qs)

# validate unknown
if unknown == 'raise':
attrs = schema.get_attributes(location=('body', 'querystring'),
request=request)
params = list(qs.keys()) + list(body.keys())
msg = '%s is not allowed'
for param in set(params) - set([attr.name for attr in attrs]):
request.errors.add('body' if param in body else 'querystring',
param, msg % param)
initial_appstruct = {
'querystring': qs,
'headers': headers,
'body': body,
'path': path,
}

try:
cstruct = simple_cstruct_serialize(initial_appstruct)
appstruct = schema.deserialize(cstruct)
except colander.Invalid as e:
for component_path, msg in e.asdict().items():
(location, _, name) = component_path.partition('.')
request.errors.add(
location, name, msg
)
else:
request.validated.update(appstruct)
Loading