diff --git a/cornice/schemas.py b/cornice/schemas.py deleted file mode 100644 index 9e606b56..00000000 --- a/cornice/schemas.py +++ /dev/null @@ -1,180 +0,0 @@ -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this file, -# You can obtain one at http://mozilla.org/MPL/2.0/. -import webob.multidict - -from pyramid.path import DottedNameResolver -from cornice.util import to_list, extract_request_data - - -class SchemaError(Exception): - pass - - -class CorniceSchema(object): - """Defines a cornice schema""" - - def __init__(self, _colander_schema, bind_request=True): - self._colander_schema = _colander_schema - self._colander_schema_runtime = None - self._bind_request = bind_request - - @property - def colander_schema(self): - if not self._colander_schema_runtime: - schema = self._colander_schema - schema = DottedNameResolver(__name__).maybe_resolve(schema) - if callable(schema): - schema = schema() - self._colander_schema_runtime = schema - return self._colander_schema_runtime - - def bind_attributes(self, request=None): - schema = self.colander_schema - if request and self._bind_request: - schema = schema.bind(request=request) - return schema.children - - def get_attributes(self, location=("body", "header", "querystring"), - required=(True, False), - request=None): - """Return a list of attributes that match the given criteria. - - By default, if nothing is specified, it will return all the attributes, - without filtering anything. - """ - attributes = self.bind_attributes(request) - - def _filter(attr): - if not hasattr(attr, "location"): - valid_location = 'body' in location - else: - valid_location = attr.location in to_list(location) - return valid_location and attr.required in to_list(required) - - return list(filter(_filter, attributes)) - - def as_dict(self): - """returns a dict containing keys for the different attributes, and - for each of them, a dict containing information about them:: - - >>> schema.as_dict() # NOQA - {'foo': {'type': 'string', - 'location': 'body', - 'description': 'yeah', - 'required': True}, - 'bar': {'type': 'string', - 'location': 'body', - 'description': 'yeah', - 'required': True} - # ... - } - """ - attributes = self.bind_attributes() - schema = {} - for attr in attributes: - schema[attr.name] = { - 'type': getattr(attr, 'type', attr.typ), - 'name': attr.name, - 'description': getattr(attr, 'description', ''), - 'required': getattr(attr, 'required', False), - } - - return schema - - def unflatten(self, data): - return self.colander_schema.unflatten(data) - - def flatten(self, data): - return self.colander_schema.flatten(data) - - @classmethod - def from_colander(klass, colander_schema, **kwargs): - return CorniceSchema(colander_schema, **kwargs) - - -def validate_colander_schema(schema, request): - """Validates that the request is conform to the given schema""" - from colander import Invalid, Sequence, drop, null, Mapping - - # CorniceSchema.colander_schema guarantees that we have a colander - # instance and not a class so we should use `typ` and not - # `schema_type()` to determine the type. - schema_type = schema.colander_schema.typ - unknown = getattr(schema_type, 'unknown', None) - - if not isinstance(schema_type, Mapping): - raise SchemaError('colander schema type is not a Mapping: %s' % - type(schema_type)) - - def _validate_fields(location, data): - if location == 'body': - try: - original = data - data = webob.multidict.MultiDict(schema.unflatten(data)) - data.update(original) - except KeyError: - pass - - if location == 'querystring': - try: - original = data - data = schema.unflatten(original) - except KeyError: - pass - - for attr in schema.get_attributes(location=location, - request=request): - if attr.required and attr.name not in data and \ - attr.default == null: - # missing - request.errors.add(location, attr.name, - "%s is missing" % attr.name) - else: - try: - if attr.name not in data: - if attr.default != null: - deserialized = attr.deserialize(attr.serialize()) - else: - deserialized = attr.deserialize() - else: - if (location == 'querystring' and - isinstance(attr.typ, Sequence)): - serialized = original.getall(attr.name) - else: - serialized = data[attr.name] - deserialized = attr.deserialize(serialized) - except Invalid as e: - # the struct is invalid - try: - request.errors.add(location, attr.name, - e.asdict()[attr.name]) - except KeyError: - for k, v in e.asdict().items(): - if k.startswith(attr.name): - request.errors.add(location, k, v) - else: - if deserialized is not drop: - request.validated[attr.name] = deserialized - - if location == "body" and unknown == 'preserve': - for field, value in data.items(): - if field not in request.validated: - request.validated[field] = value - - qs, headers, body, path = extract_request_data(request) - - _validate_fields('path', path) - _validate_fields('header', headers) - _validate_fields('body', body) - _validate_fields('querystring', qs) - - # validate unknown - if unknown == 'raise': - attrs = schema.get_attributes(location=('body', 'querystring'), - request=request) - params = list(qs.keys()) + list(body.keys()) - msg = '%s is not allowed' - for param in set(params) - set([attr.name for attr in attrs]): - request.errors.add('body' if param in body else 'querystring', - param, msg % param) diff --git a/cornice/schemas/__init__.py b/cornice/schemas/__init__.py new file mode 100644 index 00000000..8aacf02e --- /dev/null +++ b/cornice/schemas/__init__.py @@ -0,0 +1,91 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. + +from __future__ import absolute_import + +import importlib + +from pyramid import path + +from cornice.schemas import generic +from cornice import util + + +def get_adapter(name): + try: + a = adapters[name] + except KeyError: + raise AdapterNotFoundError(name) + return a + + +def use(schema, request): + schema = _apply_compat_if_required(schema) + schema = _python_path_resolver.maybe_resolve(schema) + + for bind in _adapters: + try: + adapter = bind(schema) + except generic.UnsuitableSchemaCtrl: + continue + + break + else: + raise generic.InvalidSchemaError( + 'No schema adapter found for: {!r}'.format(schema)) + + payload, errors = adapter(request) + + for err in errors: + request.errors.add(err.location, err.field, err.desc) + request.validated.update(payload) + + +def _apply_compat_if_required(schema): + # only dotted name should be forced to use backward compatible adapter. + # Because it points to colander schema class or instance in previous + # releases. Without this hack, such objects will be handled by new colander + # adapter now. + if isinstance(schema, util.string_types): + schema = CorniceSchema.from_colander(schema) + return schema + + +class _PredefinedAdapter(generic.GenericAdapter): + def __init__(self, schema): + super(_PredefinedAdapter, self).__init__(schema) + if not isinstance(self.schema, generic.GenericAdapter): + raise generic.UnsuitableSchemaCtrl + + def __call__(self, request): + return self.schema(request) + + +_python_path_resolver = path.DottedNameResolver(__name__) + + +adapters = {} +_adapters = [ + _PredefinedAdapter] +for name in ('.compat', '.colander', '.generic'): + try: + # TODO: rewrite using stevedore + mod = importlib.import_module(name, __name__) + except ImportError: + continue + + payload = mod.init() + if isinstance(payload, generic.AdapterDescriptor): + _adapters.append(payload.adapter) + adapters[payload.name] = payload.adapter + else: + try: + _adapters.extend(payload) + except TypeError: + _adapters.append(payload) + + +AdapterNotFoundError = generic.AdapterNotFoundError +InvalidSchemaError = generic.InvalidSchemaError +CorniceSchema = generic.CorniceSchema diff --git a/cornice/schemas/colander.py b/cornice/schemas/colander.py new file mode 100644 index 00000000..2f430642 --- /dev/null +++ b/cornice/schemas/colander.py @@ -0,0 +1,136 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. + +from __future__ import absolute_import + +import inspect + +import colander +from webob import multidict + +from cornice.schemas import generic +from cornice import util + + +class ColanderAdapter(generic.GenericAdapter): + def __init__(self, schema, bind_request=True, flattening=False): + if inspect.isclass(schema) and issubclass(schema, colander.Schema): + schema = schema() + super(ColanderAdapter, self).__init__(schema) + + if not isinstance(self.schema, colander.Schema): + raise generic.UnsuitableSchemaCtrl + if not self._is_mapping_field(self.schema): + raise generic.UnsuitableSchemaCtrl + + self.bind_request = bind_request + self.need_flattening = flattening + + def __call__(self, request): + schema = self.schema.clone() + if self.bind_request: + schema = schema.bind(request=request) + + data, fields_to_location = self._assemble_request_data(schema, request) + data = self._flattening_data(schema, data) + try: + validated = schema.deserialize(data) + errors = [] + except colander.Invalid as e: + validated = {} + errors = self._unpack_errors(e, fields_to_location) + return validated, errors + + @classmethod + def _assemble_request_data(cls, schema, request): + sources = { + 'path': request.matchdict, + 'header': request.headers, + 'body': util.extract_request_body(request), + 'querystring': request.GET} + + data = dict() + fields_to_location = dict() + invalid_locations = list() + + for field in schema: + source_name = cls._get_field_location(field) + try: + location = sources[source_name] + except KeyError: + invalid_locations.append(field) + continue + fields_to_location[field.name] = source_name + + is_multi = isinstance(location, multidict.MultiDict) + + try: + value = location[field.name] + if is_multi and cls._is_sequence_field(field): + value = location.getall(field.name) + except KeyError: + continue + + data[field.name] = value + + if invalid_locations: + raise generic.InvalidSchemaError( + 'Schema contain fields with unsupported "location" markers', + invalid_locations) + + # copy unknown fields + for location in ('body', 'querystring'): + for name, value in sources[location].iteritems(): + data.setdefault(name, value) + fields_to_location.setdefault(name, location) + + return data, fields_to_location + + def _flattening_data(self, schema, data): + if not self.need_flattening: + return data + return schema.unflatten(data) + + @classmethod + def _unpack_errors(cls, err, fields_to_locations): + # TODO(surabujin): move at least part of this into colander code + errors = [] + err_factory = generic.ErrorFactory(fields_to_locations) + for path in err.paths(): + prefix = [] + for e in path: + prefix.append(e._keyname()) + if isinstance(e, colander.ExtraItemsError): + for item in e.extras: + errors.append(err_factory( + prefix + [item], 'Unrecognized key')) + elif e.msg is None: + pass + else: + msg = e.msg + try: + substitute = getattr(msg, 'interpolate') + except AttributeError: + pass + else: + msg = substitute() + errors.append(err_factory(prefix, msg)) + return errors + + @staticmethod + def _is_mapping_field(field): + return isinstance(field.typ, colander.Mapping) + + @staticmethod + def _is_sequence_field(field): + return isinstance( + field.typ, (colander.Positional, colander.List, colander.Set)) + + @staticmethod + def _get_field_location(field): + return getattr(field, 'location', 'body') + + +def init(): + return generic.AdapterDescriptor('colander', ColanderAdapter) diff --git a/cornice/schemas/compat.py b/cornice/schemas/compat.py new file mode 100644 index 00000000..5b482aa2 --- /dev/null +++ b/cornice/schemas/compat.py @@ -0,0 +1,47 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. + +from __future__ import print_function, absolute_import + +import colander +from pyramid import path + +from cornice.schemas import generic +from cornice.schemas import colander as cornice_colander + + +class ColanderSchema(colander.MappingSchema): + pass + + +class BackwardCompatibilityAdapter(cornice_colander.ColanderAdapter): + def __init__(self, schema): + if isinstance(schema, generic.CorniceSchema): + bind_request = schema.bind_request + schema = _python_path_resolver.maybe_resolve(schema.schema) + elif isinstance(schema, ColanderSchema): + bind_request = True + else: + raise generic.UnsuitableSchemaCtrl + + super(BackwardCompatibilityAdapter, self).__init__( + schema, bind_request=bind_request, flattening=True) + + def _flattening_data(self, schema, data): + if not self.need_flattening: + return data + try: + flatted = schema.unflatten(data) + flatted.update(data) + except KeyError: + flatted = data + + return flatted + + +_python_path_resolver = path.DottedNameResolver(__name__) + + +def init(): + return BackwardCompatibilityAdapter diff --git a/cornice/schemas/generic.py b/cornice/schemas/generic.py new file mode 100644 index 00000000..2b041851 --- /dev/null +++ b/cornice/schemas/generic.py @@ -0,0 +1,82 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. + +from __future__ import absolute_import + +import abc +import collections + +from cornice import util + + +AdapterDescriptor = collections.namedtuple( + 'AdapterDescriptor', 'name, adapter') +InvalidField = collections.namedtuple('InvalidField', 'location, field, desc') + + +class AdapterNotFoundError(Exception): + def __init__(self, name): + super(AdapterNotFoundError, self).__init__(name) + + @property + def name(self): + return self.args[0] + + +class InvalidSchemaError(Exception): + pass + + +class UnsuitableSchemaCtrl(Exception): + pass + + +class GenericAdapter(object): + __metaclass__ = abc.ABCMeta + + def __init__(self, schema): + self.schema = schema + + @abc.abstractmethod + def __call__(self, request): + pass + + +class CallableAdapter(GenericAdapter): + def __init__(self, schema): + if not isinstance(schema, collections.Callable): + raise UnsuitableSchemaCtrl + super(CallableAdapter, self).__init__(schema) + + def __call__(self, request): + payload = util.extract_request_body(request) + return self.schema(payload), tuple() + + +# Backward compatibility +class CorniceSchema(object): + def __init__(self, schema, bind_request=True): + self.schema = schema + self.bind_request = bind_request + + @classmethod + def from_colander(cls, schema, **kwargs): + return cls(schema, **kwargs) + + +class ErrorFactory(object): + def __init__(self, fields_to_locations): + self.fields_to_locations = fields_to_locations + + def __call__(self, chunks, desc): + chunks = filter(bool, chunks) + field = '.'.join(chunks) + location = None + if chunks: + location = self.fields_to_locations.get(chunks[0]) + return InvalidField(location, field, desc) + + +def init(): + return CallableAdapter diff --git a/cornice/service.py b/cornice/service.py index 925e47c5..86d94e31 100644 --- a/cornice/service.py +++ b/cornice/service.py @@ -10,7 +10,7 @@ DEFAULT_VALIDATORS, DEFAULT_FILTERS, ) -from cornice.schemas import CorniceSchema, validate_colander_schema +from cornice import schemas from cornice.util import is_string, to_list, json_error, func_name try: @@ -245,11 +245,6 @@ def get_arguments(self, conf=None): value.extend(to_list(conf.pop(arg))) arguments[arg] = value - # schema validation handling - if 'schema' in conf: - arguments['schema'] = ( - CorniceSchema.from_colander(conf.pop('schema'))) - # Allow custom error handler arguments['error_handler'] = conf.pop('error_handler', getattr(self, 'error_handler', @@ -545,10 +540,11 @@ def wrapper(request): request.deserializer = args['deserializer'] # do schema validation - if 'schema' in args: - validate_colander_schema(args['schema'], request) - elif hasattr(ob, 'schema'): - validate_colander_schema(ob.schema, request) + schema = args.get('schema') + if schema is None: + schema = getattr(ob, 'schema', None) + if schema is not None: + schemas.use(schema, request) # the validators can either be a list of callables or contain some # non-callable values. In which case we want to resolve them using the diff --git a/cornice/tests/test_resource.py b/cornice/tests/test_resource.py index 7776612a..a96107f4 100644 --- a/cornice/tests/test_resource.py +++ b/cornice/tests/test_resource.py @@ -162,12 +162,15 @@ def test_schema_on_resource(self): User.schema = CorniceSchema.from_colander( validationapp.FooBarSchema) result = self.patch("/users/1", status=400).json - self.assertEquals( - [(e['name'], e['description']) for e in result['errors']], [ - ('foo', 'foo is missing'), - ('bar', 'bar is missing'), - ('yeah', 'yeah is missing'), - ]) + + actual = set( + (e['name'], e['description']) for e in result['errors']) + expect = set(( + ('foo', 'Required'), + ('bar', 'Required'), + ('yeah', 'Required'), + )) + self.assertEquals(actual, expect) class NonAutocommittingConfigurationTestResource(TestCase): diff --git a/cornice/tests/test_schemas.py b/cornice/tests/test_schemas.py index d57c3d10..f94e120c 100644 --- a/cornice/tests/test_schemas.py +++ b/cornice/tests/test_schemas.py @@ -3,9 +3,7 @@ # You can obtain one at http://mozilla.org/MPL/2.0/. from cornice.errors import Errors from cornice.tests.support import TestCase -from cornice.schemas import ( - CorniceSchema, validate_colander_schema, SchemaError -) +from cornice import schemas from cornice.util import extract_json_data import json @@ -28,37 +26,8 @@ if COLANDER: - @deferred - def deferred_validator(node, kw): - """ - This is a deferred validator that changes its own behavior based on - request object being passed, thus allowing for validation of fields - depending on other field values. - - This example shows how to validate a body field based on a dummy - header value, using OneOf validator with different choices - """ - request = kw['request'] - if request['x-foo'] == 'version_a': - return OneOf(['a', 'b']) - else: - return OneOf(['c', 'd']) - - class TestingSchema(MappingSchema): - foo = SchemaNode(String(), type='str') - bar = SchemaNode(String(), type='str', location="body") - baz = SchemaNode(String(), type='str', location="querystring") - class WrongSchema(SequenceSchema): - items = TestingSchema() - - class InheritedSchema(TestingSchema): - foo = SchemaNode(Int(), missing=1) - - class ToBoundSchema(TestingSchema): - foo = SchemaNode(Int(), missing=1) - bazinga = SchemaNode(String(), type='str', location="body", - validator=deferred_validator) + pass class DropSchema(MappingSchema): foo = SchemaNode(String(), type='str', missing=drop) @@ -79,13 +48,12 @@ class NestedSchema(MappingSchema): class DefaultSchema(MappingSchema): foo = SchemaNode(String(), type='str', location="querystring", - missing=drop, default='foo') + missing='foo') bar = SchemaNode(String(), type='str', location="querystring", - default='bar') + missing='bar') - class DefaultValueSchema(MappingSchema): - foo = SchemaNode(Int(), type="int") - bar = SchemaNode(Int(), type="int", default=10) + class DefaultValueConvertSchema(MappingSchema): + foo = SchemaNode(Int(), type="int", missing=10) class QsSchema(MappingSchema): foo = SchemaNode(String(), type='str', location="querystring", @@ -95,19 +63,6 @@ class StrictQsSchema(StrictMappingSchema): foo = SchemaNode(String(), type='str', location="querystring", missing=drop) - imperative_schema = SchemaNode(Mapping()) - imperative_schema.add(SchemaNode(String(), name='foo', type='str')) - imperative_schema.add(SchemaNode(String(), name='bar', type='str', - location="body")) - imperative_schema.add(SchemaNode(String(), name='baz', type='str', - location="querystring")) - - class TestingSchemaWithHeader(MappingSchema): - foo = SchemaNode(String(), type='str') - bar = SchemaNode(String(), type='str', location="body") - baz = SchemaNode(String(), type='str', location="querystring") - qux = SchemaNode(String(), type='str', location="header") - class PreserveUnkownSchema(MappingSchema): bar = SchemaNode(String(), type='str') @@ -140,156 +95,58 @@ def __init__(self, body, get): class TestSchemas(TestCase): - def test_colander_integration(self): - # not specifying body should act the same way as specifying it - schema = CorniceSchema.from_colander(TestingSchema) - body_fields = schema.get_attributes(location="body") - qs_fields = schema.get_attributes(location="querystring") - - self.assertEqual(len(body_fields), 2) - self.assertEqual(len(qs_fields), 1) - - def test_colander_integration_with_header(self): - schema = CorniceSchema.from_colander(TestingSchemaWithHeader) - all_fields = schema.get_attributes() - body_fields = schema.get_attributes(location="body") - qs_fields = schema.get_attributes(location="querystring") - header_fields = schema.get_attributes(location="header") - - self.assertEqual(len(all_fields), 4) - self.assertEqual(len(body_fields), 2) - self.assertEqual(len(qs_fields), 1) - self.assertEqual(len(header_fields), 1) - - def test_colander_inheritance(self): - """ - support inheritance of colander.Schema - introduced in colander 0.9.9 - - attributes of base-classes with the same name than - subclass-attributes get overwritten. - """ - base_schema = CorniceSchema.from_colander(TestingSchema) - inherited_schema = CorniceSchema.from_colander(InheritedSchema) - - self.assertEqual(len(base_schema.get_attributes()), - len(inherited_schema.get_attributes())) - - def foo_filter(obj): - return obj.name == "foo" - - base_foo = list(filter(foo_filter, - base_schema.get_attributes()))[0] - inherited_foo = list(filter(foo_filter, - inherited_schema.get_attributes()))[0] - self.assertTrue(base_foo.required) - self.assertFalse(inherited_foo.required) - - def test_colander_bound_schemas(self): - dummy_request = {'x-foo': 'version_a'} - a_schema = CorniceSchema.from_colander(ToBoundSchema) - field = a_schema.get_attributes(request=dummy_request)[3] - self.assertEqual(field.validator.choices, ['a', 'b']) - - other_dummy_request = {'x-foo': 'bazinga!'} - b_schema = CorniceSchema.from_colander(ToBoundSchema) - field = b_schema.get_attributes(request=other_dummy_request)[3] - self.assertEqual(field.validator.choices, ['c', 'd']) - - def test_colander_bound_schema_rebinds_to_new_request(self): - dummy_request = {'x-foo': 'version_a'} - the_schema = CorniceSchema.from_colander(ToBoundSchema) - field = the_schema.get_attributes(request=dummy_request)[3] - self.assertEqual(field.validator.choices, ['a', 'b']) - - other_dummy_request = {'x-foo': 'bazinga!'} - field = the_schema.get_attributes(request=other_dummy_request)[3] - self.assertEqual(field.validator.choices, ['c', 'd']) - - def test_colander_request_is_bound_by_default(self): - the_schema = CorniceSchema.from_colander(ToBoundSchema) - dummy_request = {'x-foo': 'version_a'} - field = the_schema.get_attributes(request=dummy_request)[3] - # Deferred are resolved - self.assertNotEqual(type(field.validator), deferred) - - def test_colander_request_is_not_bound_if_disabled(self): - the_schema = CorniceSchema.from_colander(ToBoundSchema, - bind_request=False) - dummy_request = {'x-foo': 'version_a'} - field = the_schema.get_attributes(request=dummy_request)[3] - # Deferred are not resolved - self.assertEqual(type(field.validator), deferred) - - def test_imperative_colander_schema(self): - # not specifying body should act the same way as specifying it - schema = CorniceSchema.from_colander(imperative_schema) - body_fields = schema.get_attributes(location="body") - qs_fields = schema.get_attributes(location="querystring") - - self.assertEqual(len(body_fields), 2) - self.assertEqual(len(qs_fields), 1) - - dummy_request = get_mock_request('{"bar": "some data"}') - validate_colander_schema(schema, dummy_request) - def test_colander_schema_using_drop(self): """ remove fields from validated data if they deserialize to colander's `drop` object. """ - schema = CorniceSchema.from_colander(DropSchema) + schema = schemas.CorniceSchema.from_colander(DropSchema) dummy_request = get_mock_request('{"bar": "required_data"}') - validate_colander_schema(schema, dummy_request) + schemas.use(schema, dummy_request) self.assertNotIn('foo', dummy_request.validated) self.assertIn('bar', dummy_request.validated) self.assertEqual(len(dummy_request.errors), 0) def test_colander_strict_schema(self): - schema = CorniceSchema.from_colander(StrictSchema) + schema = schemas.CorniceSchema.from_colander(StrictSchema) dummy_request = get_mock_request( ''' {"bar": "required_data", "foo": "optional_data", "other": "not_wanted_data"} ''') - validate_colander_schema(schema, dummy_request) + schemas.use(schema, dummy_request) errors = dummy_request.errors self.assertEqual(len(errors), 1) - self.assertEqual(errors[0], {'description': 'other is not allowed', + self.assertEqual(errors[0], {'description': 'Unrecognized key', 'location': 'body', 'name': 'other'}) - self.assertIn('foo', dummy_request.validated) - self.assertIn('bar', dummy_request.validated) def test_colander_schema_using_dotted_names(self): """ Schema could be passed as string in view """ - schema = CorniceSchema.from_colander( - 'cornice.tests.schema.AccountSchema') + schema = 'cornice.tests.schema.AccountSchema' - dummy_request = get_mock_request('{"nickname": "john"}') - validate_colander_schema(schema, dummy_request) + dummy_request = get_mock_request( + '{"nickname": "john", "city": "Moscow"}') + schemas.use(schema, dummy_request) self.assertIn('nickname', dummy_request.validated) - self.assertNotIn('city', dummy_request.validated) + self.assertIn('city', dummy_request.validated) def test_colander_nested_schema(self): - schema = CorniceSchema.from_colander(NestedSchema) + schema = schemas.CorniceSchema.from_colander(NestedSchema) dummy_request = get_mock_request('{"ham": {"bar": "POST"}}', {'egg.bar': 'GET'}) - validate_colander_schema(schema, dummy_request) - - qs_fields = schema.get_attributes(location="querystring") + schemas.use(schema, dummy_request) errors = dummy_request.errors self.assertEqual(len(errors), 0, errors) - self.assertEqual(len(qs_fields), 1) expected = {'egg': {'bar': 'GET'}, 'ham': {'bar': 'POST'}, @@ -301,58 +158,48 @@ def test_colander_schema_using_defaults(self): """ Schema could contains default values """ - schema = CorniceSchema.from_colander(DefaultSchema) + schema = schemas.CorniceSchema.from_colander(DefaultSchema) dummy_request = get_mock_request('', {'bar': 'test'}) - validate_colander_schema(schema, dummy_request) - - qs_fields = schema.get_attributes(location="querystring") - - errors = dummy_request.errors - self.assertEqual(len(errors), 0) - self.assertEqual(len(qs_fields), 2) + schemas.use(schema, dummy_request) expected = {'foo': 'foo', 'bar': 'test'} - self.assertEqual(expected, dummy_request.validated) dummy_request = get_mock_request('', {'bar': 'test', 'foo': 'test'}) - validate_colander_schema(schema, dummy_request) - - qs_fields = schema.get_attributes(location="querystring") - - errors = dummy_request.errors - self.assertEqual(len(errors), 0) - self.assertEqual(len(qs_fields), 2) + schemas.use(schema, dummy_request) expected = {'foo': 'test', 'bar': 'test'} - self.assertEqual(expected, dummy_request.validated) - def test_colander_schema_default_value(self): + def test_colander_schema_defaults_convert(self): + """ + Test schema behaviour regarding conversion missing(default) values + """ # apply default value to field if the input for them is # missing - schema = CorniceSchema.from_colander(DefaultValueSchema) - dummy_request = get_mock_request('{"foo": 5}') - validate_colander_schema(schema, dummy_request) + schema = schemas.CorniceSchema.from_colander( + DefaultValueConvertSchema) - self.assertIn('bar', dummy_request.validated) - self.assertEqual(len(dummy_request.errors), 0) - self.assertEqual(dummy_request.validated['foo'], 5) - # default value should be available - self.assertEqual(dummy_request.validated['bar'], 10) + dummy_request = get_mock_request('') + schemas.use(schema, dummy_request) + self.assertEqual({'foo': 10}, dummy_request.validated) + + dummy_request = get_mock_request('{"foo": 5}') + schemas.use(schema, dummy_request) + self.assertEqual({'foo': 5}, dummy_request.validated) def test_only_mapping_is_accepted(self): - schema = CorniceSchema.from_colander(WrongSchema) + schema = schemas.CorniceSchema.from_colander(WrongSchema) dummy_request = get_mock_request('', {'foo': 'test', 'bar': 'test'}) - self.assertRaises(SchemaError, - validate_colander_schema, schema, dummy_request) + self.assertRaises(schemas.InvalidSchemaError, + schemas.use, schema, dummy_request) # We shouldn't accept a MappingSchema if the `typ` has # been set to something else: - schema = CorniceSchema.from_colander( + schema = schemas.CorniceSchema.from_colander( MappingSchema( Sequence, SchemaNode(String(), name='foo'), @@ -360,14 +207,14 @@ def test_only_mapping_is_accepted(self): SchemaNode(String(), name='baz') ) ) - self.assertRaises(SchemaError, - validate_colander_schema, schema, dummy_request) + self.assertRaises(schemas.InvalidSchemaError, + schemas.use, schema, dummy_request) def test_extra_params_qs(self): - schema = CorniceSchema.from_colander(QsSchema) + schema = schemas.CorniceSchema.from_colander(QsSchema) dummy_request = get_mock_request('', {'foo': 'test', 'bar': 'test'}) - validate_colander_schema(schema, dummy_request) + schemas.use(schema, dummy_request) errors = dummy_request.errors self.assertEqual(len(errors), 0) @@ -376,26 +223,23 @@ def test_extra_params_qs(self): self.assertEqual(expected, dummy_request.validated) def test_extra_params_qs_strict(self): - schema = CorniceSchema.from_colander(StrictQsSchema) + schema = schemas.CorniceSchema.from_colander(StrictQsSchema) dummy_request = get_mock_request('', {'foo': 'test', 'bar': 'test'}) - validate_colander_schema(schema, dummy_request) + schemas.use(schema, dummy_request) errors = dummy_request.errors self.assertEqual(len(errors), 1) - self.assertEqual(errors[0], {'description': 'bar is not allowed', + self.assertEqual(errors[0], {'description': 'Unrecognized key', 'location': 'querystring', 'name': 'bar'}) - expected = {'foo': 'test'} - self.assertEqual(expected, dummy_request.validated) - def test_validate_colander_schema_can_preserve_unknown_fields(self): - schema = CorniceSchema.from_colander(PreserveUnkownSchema) + schema = schemas.CorniceSchema.from_colander(PreserveUnkownSchema) data = json.dumps({"bar": "required_data", "optional": "true"}) dummy_request = get_mock_request(data) - validate_colander_schema(schema, dummy_request) + schemas.use(schema, dummy_request) self.assertDictEqual(dummy_request.validated, { "bar": "required_data", diff --git a/cornice/tests/test_service_description.py b/cornice/tests/test_service_description.py index d87851e6..d0a4a8f7 100644 --- a/cornice/tests/test_service_description.py +++ b/cornice/tests/test_service_description.py @@ -7,11 +7,12 @@ from pyramid import testing from webtest import TestApp -from cornice.schemas import CorniceSchema +from cornice import schemas from cornice.tests.validationapp import COLANDER from cornice.tests.support import TestCase, CatchErrors from cornice.service import Service + if COLANDER: from cornice.tests.validationapp import FooBarSchema from colander import (MappingSchema, SchemaNode, String, SequenceSchema, @@ -66,11 +67,6 @@ def setUp(self): def tearDown(self): testing.tearDown() - def test_get_from_colander(self): - schema = CorniceSchema.from_colander(FooBarSchema) - attrs = schema.as_dict() - self.assertEqual(len(attrs), 6) - def test_description_attached(self): # foobar should contain a schema argument containing the cornice # schema object, so it can be introspected if needed @@ -134,7 +130,7 @@ def test_foo_required(self): status=400) self.assertEqual(resp.json, { - u'errors': [{u'description': u'foo is missing', + u'errors': [{u'description': u'Required', u'location': u'body', u'name': u'foo'}], u'status': u'error'}) @@ -206,7 +202,7 @@ def test_qux_header(self): resp = self.app.delete('/foobar', status=400) self.assertEqual(resp.json, { u'errors': [ - {u'description': u'X-Qux is missing', + {u'description': u'Required', u'location': u'header', u'name': u'X-Qux'}], u'status': u'error'}) diff --git a/cornice/tests/validationapp.py b/cornice/tests/validationapp.py index 732e1e2a..d4ebe1b2 100644 --- a/cornice/tests/validationapp.py +++ b/cornice/tests/validationapp.py @@ -141,9 +141,9 @@ def post7(request): try: + from cornice.schemas.compat import ColanderSchema as MappingSchema from colander import ( Invalid, - MappingSchema, SequenceSchema, SchemaNode, String, diff --git a/cornice/util.py b/cornice/util.py index d10ca17c..07cfa8c6 100644 --- a/cornice/util.py +++ b/cornice/util.py @@ -12,7 +12,7 @@ __all__ = ['json_renderer', 'to_list', 'json_error', 'match_accept_header', - 'extract_request_data'] + 'extract_request_body'] PY3 = sys.version_info[0] == 3 @@ -148,7 +148,7 @@ def extract_form_urlencoded_data(request): return request.POST -def extract_request_data(request): +def extract_request_body(request): """extract the different parts of the data from the request, and return them as a tuple of (querystring, headers, body, path) """ @@ -163,8 +163,7 @@ def extract_request_data(request): body = deserializer(request) # otherwise, don't block but it will be an empty body, decode # on your own - - return request.GET, request.headers, body, request.matchdict + return body def content_type_matches(request, content_types):