diff --git a/pyproject.toml b/pyproject.toml index c1ca2d06..404cd9bf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,8 @@ exclude = ''' \.git | .tox )/ +| src/webob/multipart.py +| tests/test_multipart/ ''' # This next section only exists for people that have their editors @@ -17,7 +19,7 @@ exclude = ''' profile = "black" multi_line_output = 3 src_paths = ["src", "tests"] -skip_glob = ["docs/*"] +skip_glob = ["docs/*", "tests/test_multipart/*", "src/webob/multipart.py"] include_trailing_comma = true force_grid_wrap = false combine_as_imports = true diff --git a/setup.py b/setup.py index a83f673a..23086943 100644 --- a/setup.py +++ b/setup.py @@ -54,7 +54,7 @@ package_dir={"": "src"}, python_requires=">=3.9.0", install_requires=[ - "legacy-cgi>=2.6; python_version>='3.13'", + "multipart~=1.1", ], zip_safe=True, extras_require={"testing": testing_extras, "docs": docs_extras}, diff --git a/src/webob/compat.py b/src/webob/compat.py deleted file mode 100644 index 55fbef9e..00000000 --- a/src/webob/compat.py +++ /dev/null @@ -1,117 +0,0 @@ -# flake8: noqa - -import cgi -from cgi import FieldStorage as _cgi_FieldStorage, parse_header -from html import escape -from queue import Empty, Queue -import sys -import tempfile -import types - - -# Various different FieldStorage work-arounds required on Python 3.x -class cgi_FieldStorage(_cgi_FieldStorage): # pragma: no cover - def __repr__(self): - """monkey patch for FieldStorage.__repr__ - - Unbelievably, the default __repr__ on FieldStorage reads - the entire file content instead of being sane about it. - This is a simple replacement that doesn't do that - """ - - if self.file: - return f"FieldStorage({self.name!r}, {self.filename!r})" - - return f"FieldStorage({self.name!r}, {self.filename!r}, {self.value!r})" - - # Work around https://bugs.python.org/issue27777 - def make_file(self): - if self._binary_file or self.length >= 0: - return tempfile.TemporaryFile("wb+") - else: - return tempfile.TemporaryFile("w+", encoding=self.encoding, newline="\n") - - # Work around http://bugs.python.org/issue23801 - # This is taken exactly from Python 3.5's cgi.py module - def read_multi(self, environ, keep_blank_values, strict_parsing): - """Internal: read a part that is itself multipart.""" - ib = self.innerboundary - - if not cgi.valid_boundary(ib): - raise ValueError(f"Invalid boundary in multipart form: {ib!r}") - self.list = [] - - if self.qs_on_post: - query = cgi.urllib.parse.parse_qsl( - self.qs_on_post, - self.keep_blank_values, - self.strict_parsing, - encoding=self.encoding, - errors=self.errors, - ) - - for key, value in query: - self.list.append(cgi.MiniFieldStorage(key, value)) - - klass = self.FieldStorageClass or self.__class__ - first_line = self.fp.readline() # bytes - - if not isinstance(first_line, bytes): - raise ValueError( - f"{self.fp} should return bytes, got {type(first_line).__name__}" - ) - self.bytes_read += len(first_line) - - # Ensure that we consume the file until we've hit our innerboundary - - while first_line.strip() != (b"--" + self.innerboundary) and first_line: - first_line = self.fp.readline() - self.bytes_read += len(first_line) - - while True: - parser = cgi.FeedParser() - hdr_text = b"" - - while True: - data = self.fp.readline() - hdr_text += data - - if not data.strip(): - break - - if not hdr_text: - break - # parser takes strings, not bytes - self.bytes_read += len(hdr_text) - parser.feed(hdr_text.decode(self.encoding, self.errors)) - headers = parser.close() - # Some clients add Content-Length for part headers, ignore them - - if "content-length" in headers: - filename = None - - if "content-disposition" in self.headers: - cdisp, pdict = parse_header(self.headers["content-disposition"]) - - if "filename" in pdict: - filename = pdict["filename"] - - if filename is None: - del headers["content-length"] - part = klass( - self.fp, - headers, - ib, - environ, - keep_blank_values, - strict_parsing, - self.limit - self.bytes_read, - self.encoding, - self.errors, - ) - self.bytes_read += part.bytes_read - self.list.append(part) - - if part.done or self.bytes_read >= self.length > 0: - break - self.skip_lines() diff --git a/src/webob/multidict.py b/src/webob/multidict.py index e54ea3b0..539698f2 100644 --- a/src/webob/multidict.py +++ b/src/webob/multidict.py @@ -6,10 +6,12 @@ """ import binascii from collections.abc import MutableMapping -from urllib.parse import urlencode as url_encode +from urllib.parse import parse_qsl, urlencode as url_encode import warnings -__all__ = ["MultiDict", "NestedMultiDict", "NoVars", "GetDict"] +from multipart import parse_options_header + +__all__ = ["MultiDict", "MultiDictFile", "NestedMultiDict", "NoVars", "GetDict"] class MultiDict(MutableMapping): @@ -54,9 +56,18 @@ def view_list(cls, lst): return obj @classmethod - def from_fieldstorage(cls, fs): + def from_fieldstorage(cls, fs): # pragma: no cover """ Create a multidict from a cgi.FieldStorage instance + + .. deprecated:: 2.0 + + This method will not function in Python 3.13 or greater because the + `cgi` module has been removed. Consider using the `multipart`_ + library with :meth:`from_multipart` instead. + + .. _multipart: https://pypi.org/project/multipart/ + """ obj = cls() # fs.list can be None when there's nothing to parse @@ -96,6 +107,31 @@ def decode(b): return obj + @classmethod + def from_multipart(cls, mp): + """ + Create a multidict from a `MultipartParser`_ object. + + .. _MultipartParser: https://multipart.readthedocs.io/en/latest/api.html#multipart.MultipartParser + + """ + obj = cls() + + for part in mp: + if part.filename or not part.is_buffered(): + container = MultiDictFile.from_multipart_part(part) + obj.add(part.name, container) + else: + obj.add(part.name, part.value) + return obj + + @classmethod + def from_qs(cls, data, charset="utf-8"): + data = parse_qsl(data, keep_blank_values=True) + return cls( + (key.decode(charset), value.decode(charset)) for (key, value) in data + ) + def __getitem__(self, key): for k, v in reversed(self._items): if k == key: @@ -286,6 +322,60 @@ def values(self): _dummy = object() +class MultiDictFile: + """ + An object representing a file upload in a ``multipart/form-data`` request. + + This object has the same shape as Python's deprecated ``cgi.FieldStorage`` + object, which was previously used by webob to represent file uploads. + + """ + + def __init__( + self, + name, + filename, + file, + type, + type_options, + disposition, + disposition_options, + headers, + ): + self.name = name + self.filename = filename + self.file = file + self.type = type + self.type_options = type_options + self.disposition = disposition + self.disposition_options = disposition_options + self.headers = headers + + @classmethod + def from_multipart_part(cls, part): + content_type = part.headers.get("Content-Type", "") + content_type, options = parse_options_header(part.content_type) + disposition, disp_options = parse_options_header(part.disposition) + return cls( + name=part.name, + filename=part.filename, + file=part.file, + type=content_type, + type_options=options, + disposition=disposition, + disposition_options=disp_options, + headers=part.headers, + ) + + @property + def value(self): + pos = self.file.tell() + self.file.seek(0) + val = self.file.read() + self.file.seek(pos) + return val + + class GetDict(MultiDict): # def __init__(self, data, tracker, encoding, errors): # d = lambda b: b.decode(encoding, errors) diff --git a/src/webob/request.py b/src/webob/request.py index ee52a7d1..276838dd 100644 --- a/src/webob/request.py +++ b/src/webob/request.py @@ -9,6 +9,8 @@ from urllib.parse import quote as url_quote, quote_plus, urlencode as url_encode import warnings +from multipart import MultipartParser + from webob.acceptparse import ( accept_charset_property, accept_encoding_property, @@ -16,7 +18,6 @@ accept_property, ) from webob.cachecontrol import CacheControl, serialize_cache_control -from webob.compat import cgi_FieldStorage from webob.cookies import RequestCookies from webob.descriptors import ( CHARSET_RE, @@ -168,18 +169,7 @@ def decode(self, charset=None, errors="strict"): elif content_type != "multipart/form-data": return r - fs_environ = self.environ.copy() - fs_environ.setdefault("CONTENT_LENGTH", "0") - fs_environ["QUERY_STRING"] = "" - fs = cgi_FieldStorage( - fp=self.body_file, - environ=fs_environ, - keep_blank_values=True, - encoding=charset, - errors=errors, - ) - - fout = t.transcode_fs(fs, r._content_type_raw) + fout = t.transcode_multipart(self.body_file, r._content_type_raw) # this order is important, because setting body_file # resets content_length @@ -796,27 +786,22 @@ def POST(self): return NoVars( "Not an HTML form submission (Content-Type: %s)" % content_type ) - self._check_charset() - - self.make_body_seekable() - self.body_file_raw.seek(0) - fs_environ = env.copy() - # FieldStorage assumes a missing CONTENT_LENGTH, but a - # default of 0 is better: - fs_environ.setdefault("CONTENT_LENGTH", "0") - fs_environ["QUERY_STRING"] = "" - fs = cgi_FieldStorage( - fp=self.body_file, - environ=fs_environ, - keep_blank_values=True, - encoding="utf8", - ) - - self.body_file_raw.seek(0) - vars = MultiDict.from_fieldstorage(fs) + self._check_charset() + if content_type == "multipart/form-data": + self.make_body_seekable() + self.body_file_raw.seek(0) + boundary = _get_multipart_boundary(self._content_type_raw) + parser = MultipartParser( + self.body_file, + boundary, + charset="utf8", + ) + vars = MultiDict.from_multipart(parser) + self.body_file_raw.seek(0) + else: + vars = MultiDict.from_qs(self.body) env["webob._parsed_post_vars"] = (vars, self.body_file_raw) - return vars @property @@ -1752,23 +1737,14 @@ def transcode_query(self, q): return url_encode(q) - def transcode_fs(self, fs, content_type): - # transcode FieldStorage - def decode(b): - return b - - data = [] - - for field in fs.list or (): - field.name = decode(field.name) - - if field.filename: - field.filename = decode(field.filename) - data.append((field.name, field)) - else: - data.append((field.name, decode(field.value))) - - # TODO: transcode big requests to temp file - content_type, fout = _encode_multipart(data, content_type, fout=io.BytesIO()) - + def transcode_multipart(self, body, content_type): + # Transcode multipart + boundary = _get_multipart_boundary(content_type) + parser = MultipartParser(body, boundary, charset=self.charset) + data = MultiDict.from_multipart(parser) + content_type, fout = _encode_multipart( + data.items(), + content_type, + fout=io.BytesIO(), + ) return fout diff --git a/src/webob/util.py b/src/webob/util.py index d26358e3..d7fb3322 100644 --- a/src/webob/util.py +++ b/src/webob/util.py @@ -1,6 +1,6 @@ +from html import escape import warnings -from webob.compat import escape from webob.headers import _trans_key diff --git a/tests/test_compat.py b/tests/test_compat.py deleted file mode 100644 index 9c9f87ea..00000000 --- a/tests/test_compat.py +++ /dev/null @@ -1,186 +0,0 @@ -from io import BytesIO -import sys - -import pytest - - -class TestText: - def _callFUT(self, *arg, **kw): - from webob.util import text_ - - return text_(*arg, **kw) - - def test_binary(self): - result = self._callFUT(b"123") - assert isinstance(result, str) - assert result == str(b"123", "ascii") - - def test_binary_alternate_decoding(self): - result = self._callFUT(b"La Pe\xc3\xb1a", "utf-8") - assert isinstance(result, str) - assert result == str(b"La Pe\xc3\xb1a", "utf-8") - - def test_binary_decoding_error(self): - pytest.raises(UnicodeDecodeError, self._callFUT, b"\xff", "utf-8") - - def test_text(self): - result = self._callFUT(str(b"123", "ascii")) - assert isinstance(result, str) - assert result == str(b"123", "ascii") - - -class TestBytes: - def _callFUT(self, *arg, **kw): - from webob.util import bytes_ - - return bytes_(*arg, **kw) - - def test_binary(self): - result = self._callFUT(b"123") - assert isinstance(result, bytes) - assert result == b"123" - - def test_text(self): - val = str(b"123", "ascii") - result = self._callFUT(val) - assert isinstance(result, bytes) - assert result == b"123" - - def test_text_alternate_encoding(self): - val = str(b"La Pe\xc3\xb1a", "utf-8") - result = self._callFUT(val, "utf-8") - assert isinstance(result, bytes) - assert result == b"La Pe\xc3\xb1a" - - -class Test_cgi_FieldStorage_Py3_tests: - def test_fieldstorage_not_multipart(self): - from webob.compat import cgi_FieldStorage - - POSTDATA = b'{"name": "Bert"}' - - env = { - "REQUEST_METHOD": "POST", - "CONTENT_TYPE": "text/plain", - "CONTENT_LENGTH": str(len(POSTDATA)), - } - fp = BytesIO(POSTDATA) - fs = cgi_FieldStorage(fp, environ=env) - assert fs.list is None - assert fs.value == b'{"name": "Bert"}' - - @pytest.mark.skipif( - sys.version_info < (3, 0), - reason="FieldStorage on Python 2.7 is broken, see " - "https://github.com/Pylons/webob/issues/293", - ) - def test_fieldstorage_part_content_length(self): - from webob.compat import cgi_FieldStorage - - BOUNDARY = "JfISa01" - POSTDATA = """--JfISa01 -Content-Disposition: form-data; name="submit-name" -Content-Length: 5 - -Larry ---JfISa01""" - env = { - "REQUEST_METHOD": "POST", - "CONTENT_TYPE": f"multipart/form-data; boundary={BOUNDARY}", - "CONTENT_LENGTH": str(len(POSTDATA)), - } - fp = BytesIO(POSTDATA.encode("latin-1")) - fs = cgi_FieldStorage(fp, environ=env) - assert len(fs.list) == 1 - assert fs.list[0].name == "submit-name" - assert fs.list[0].value == "Larry" - - def test_my_fieldstorage_part_content_length(self): - from webob.compat import cgi_FieldStorage - - BOUNDARY = "4ddfd368-cb07-4b9e-b003-876010298a6c" - POSTDATA = """--4ddfd368-cb07-4b9e-b003-876010298a6c -Content-Disposition: form-data; name="object"; filename="file.txt" -Content-Type: text/plain -Content-Length: 5 -Content-Transfer-Encoding: 7bit - -ADMIN ---4ddfd368-cb07-4b9e-b003-876010298a6c -Content-Disposition: form-data; name="sign_date" -Content-Type: application/json; charset=UTF-8 -Content-Length: 22 -Content-Transfer-Encoding: 7bit - -"2016-11-23T12:22:41Z" ---4ddfd368-cb07-4b9e-b003-876010298a6c -Content-Disposition: form-data; name="staffId" -Content-Type: text/plain; charset=UTF-8 -Content-Length: 5 -Content-Transfer-Encoding: 7bit - -ADMIN ---4ddfd368-cb07-4b9e-b003-876010298a6c--""" - env = { - "REQUEST_METHOD": "POST", - "CONTENT_TYPE": f"multipart/form-data; boundary={BOUNDARY}", - "CONTENT_LENGTH": str(len(POSTDATA)), - } - fp = BytesIO(POSTDATA.encode("latin-1")) - fs = cgi_FieldStorage(fp, environ=env) - assert len(fs.list) == 3 - expect = [ - {"name": "object", "filename": "file.txt", "value": b"ADMIN"}, - {"name": "sign_date", "filename": None, "value": '"2016-11-23T12:22:41Z"'}, - {"name": "staffId", "filename": None, "value": "ADMIN"}, - ] - for x in range(len(fs.list)): - for k, exp in expect[x].items(): - got = getattr(fs.list[x], k) - assert got == exp - - def test_fieldstorage_multipart_leading_whitespace(self): - from webob.compat import cgi_FieldStorage - - BOUNDARY = "---------------------------721837373350705526688164684" - POSTDATA = """-----------------------------721837373350705526688164684 -Content-Disposition: form-data; name="id" - -1234 ------------------------------721837373350705526688164684 -Content-Disposition: form-data; name="title" - - ------------------------------721837373350705526688164684 -Content-Disposition: form-data; name="file"; filename="test.txt" -Content-Type: text/plain - -Testing 123. - ------------------------------721837373350705526688164684 -Content-Disposition: form-data; name="submit" - - Add\x20 ------------------------------721837373350705526688164684-- -""" - - env = { - "REQUEST_METHOD": "POST", - "CONTENT_TYPE": f"multipart/form-data; boundary={BOUNDARY}", - "CONTENT_LENGTH": "560", - } - # Add some leading whitespace to our post data that will cause the - # first line to not be the innerboundary. - fp = BytesIO(b"\r\n" + POSTDATA.encode("latin-1")) - fs = cgi_FieldStorage(fp, environ=env) - assert len(fs.list) == 4 - expect = [ - {"name": "id", "filename": None, "value": "1234"}, - {"name": "title", "filename": None, "value": ""}, - {"name": "file", "filename": "test.txt", "value": b"Testing 123.\n"}, - {"name": "submit", "filename": None, "value": " Add "}, - ] - for x in range(len(fs.list)): - for k, exp in expect[x].items(): - got = getattr(fs.list[x], k) - assert got == exp diff --git a/tests/test_in_wsgiref.py b/tests/test_in_wsgiref.py index f8727762..d5d6a6fb 100644 --- a/tests/test_in_wsgiref.py +++ b/tests/test_in_wsgiref.py @@ -1,12 +1,12 @@ -import cgi import logging +from queue import Empty, Queue import socket import sys from urllib.request import urlopen as url_open +import multipart import pytest -from webob.compat import Empty, Queue from webob.request import Request from webob.response import Response from webob.util import bytes_ @@ -88,7 +88,7 @@ def _test_app_req_interrupt(env, sr): def _req_int_cgi(req): assert req.body_file.read(0) == b"" - cgi.FieldStorage(fp=req.body_file, environ=req.environ) + multipart.MultipartParser(req.body_file, "foobar").parts() def _req_int_readline(req): diff --git a/tests/test_multidict.py b/tests/test_multidict.py index b5638204..4cd6a001 100644 --- a/tests/test_multidict.py +++ b/tests/test_multidict.py @@ -1,8 +1,14 @@ +import sys + import pytest from webob import multidict from webob.util import text_ +requires_cgi = pytest.mark.skipif( + sys.version_info >= (3, 13), reason="requires `cgi` module" +) + class BaseDictTests: def setup_method(self, method): @@ -139,6 +145,7 @@ def test_view_list(self): d = MultiDict() assert d.view_list([1, 2])._items == [1, 2] + @requires_cgi def test_from_fieldstorage_with_filename(self): from webob.multidict import MultiDict @@ -146,6 +153,7 @@ def test_from_fieldstorage_with_filename(self): fs = DummyFieldStorage("a", "1", "file") assert d.from_fieldstorage(fs) == MultiDict({"a": fs.list[0]}) + @requires_cgi def test_from_fieldstorage_without_filename(self): from webob.multidict import MultiDict @@ -153,6 +161,7 @@ def test_from_fieldstorage_without_filename(self): fs = DummyFieldStorage("a", "1") assert d.from_fieldstorage(fs) == MultiDict({"a": "1"}) + @requires_cgi def test_from_fieldstorage_with_charset(self): from cgi import FieldStorage @@ -182,6 +191,7 @@ def test_from_fieldstorage_with_charset(self): "utf8" ) + @requires_cgi def test_from_fieldstorage_with_base64_encoding(self): from cgi import FieldStorage @@ -212,6 +222,7 @@ def test_from_fieldstorage_with_base64_encoding(self): "utf8" ) + @requires_cgi def test_from_fieldstorage_with_quoted_printable_encoding(self): from cgi import FieldStorage @@ -264,6 +275,34 @@ def test_repr_with_password(self): d = self._get_instance(password="pwd") assert repr(d) == "MultiDict([('password', '******')])" + def test_from_multipart(self): + from io import BytesIO + + from multipart import MultipartParser + + data = ( + b"--foobar\r\n" + b'Content-Disposition: form-data; name="foo"\r\n' + b"\r\n" + b"bar\r\n" + b"--foobar\r\n" + b'Content-Disposition: form-data; name="fizz"; filename="fizz.txt"\r\n' + b"Content-type: application/octet-stream\r\n" + b"\r\n" + b"buzz\r\n" + b"\r\n" + b"--foobar--\r\n" + ) + body = BytesIO(data) + body.seek(0) + mp = MultipartParser(body, b"foobar") + inst = self.klass.from_multipart(mp) + assert inst["foo"] == "bar" + fizz = inst["fizz"] + assert isinstance(fizz, multidict.MultiDictFile) + assert fizz.filename == "fizz.txt" + assert fizz.value == b"buzz\r\n" + class TestNestedMultiDict(BaseDictTests): klass = multidict.NestedMultiDict diff --git a/tests/test_request.py b/tests/test_request.py index 86fbdfbd..bfecc3c0 100644 --- a/tests/test_request.py +++ b/tests/test_request.py @@ -73,6 +73,7 @@ def test_body_file_getter(self): } req = self._makeOne(environ) assert req.body_file is not INPUT + assert req.body_file.read() == body def test_body_file_getter_seekable(self): body = b"input" @@ -579,17 +580,17 @@ def test_POST_urlencoded(self, method): @pytest.mark.parametrize("method", ["POST", "PUT", "PATCH", "DELETE"]) def test_POST_multipart(self, method): data = ( - b"------------------------------deb95b63e42a\n" - b'Content-Disposition: form-data; name="foo"\n' - b"\n" - b"foo\n" - b"------------------------------deb95b63e42a\n" - b'Content-Disposition: form-data; name="bar"; filename="bar.txt"\n' - b"Content-type: application/octet-stream\n" - b"\n" - b'these are the contents of the file "bar.txt"\n' - b"\n" - b"------------------------------deb95b63e42a--\n" + b"------------------------------deb95b63e42a\r\n" + b'Content-Disposition: form-data; name="foo"\r\n' + b"\r\n" + b"foo\r\n" + b"------------------------------deb95b63e42a\r\n" + b'Content-Disposition: form-data; name="bar"; filename="bar.txt"\r\n' + b"Content-type: application/octet-stream\r\n" + b"\r\n" + b'these are the contents of the file "bar.txt"\r\n' + b"\r\n" + b"------------------------------deb95b63e42a--\r\n" ) wsgi_input = BytesIO(data) environ = { @@ -606,7 +607,7 @@ def test_POST_multipart(self, method): bar = result["bar"] assert bar.name == "bar" assert bar.filename == "bar.txt" - assert bar.file.read() == b'these are the contents of the file "bar.txt"\n' + assert bar.file.read() == b'these are the contents of the file "bar.txt"\r\n' @pytest.mark.parametrize("method", ["POST", "PUT", "PATCH", "DELETE"]) def test_POST_preserves_body_file(self, method): @@ -1060,9 +1061,7 @@ def test_blank__post_multipart(self): assert request.content_length == 139 def test_blank__post_files(self): - import cgi - - from webob.multidict import MultiDict + from webob.multidict import MultiDict, MultiDictFile from webob.request import _get_multipart_boundary POST = MultiDict() @@ -1090,8 +1089,9 @@ def test_blank__post_files(self): ) assert body_norm == expected assert request.content_length == 294 - assert isinstance(request.POST["first"], cgi.FieldStorage) - assert isinstance(request.POST["second"], cgi.FieldStorage) + # TODO: Backwards incompatible changes + assert isinstance(request.POST["first"], MultiDictFile) + assert isinstance(request.POST["second"], MultiDictFile) assert request.POST["first"].value == b"1" assert request.POST["second"].value == b"2" assert request.POST["third"] == "3" @@ -2120,21 +2120,6 @@ def test_already_consumed_stream(self): req2 = req2.decode("latin-1") assert body == req2.body - def test_none_field_name(self): - from webob.request import Request - - body = b"--FOO\r\nContent-Disposition: form-data\r\n\r\n123\r\n--FOO--" - content_type = "multipart/form-data; boundary=FOO" - environ = { - "wsgi.input": BytesIO(body), - "CONTENT_TYPE": content_type, - "CONTENT_LENGTH": len(body), - "REQUEST_METHOD": "POST", - } - req = Request(environ) - req = req.decode("latin-1") - assert body == req.body - def test_broken_seek(self): # copy() should work even when the input has a broken seek method req = self._blankOne( @@ -2440,7 +2425,7 @@ def test_from_bytes(self): # A valid request without a Content-Length header should still read # the full body. # Also test parity between as_string and from_bytes / from_file. - import cgi + from webob.multidict import MultiDictFile cls = self._getTargetClass() req = cls.from_bytes(_test_req) @@ -2455,7 +2440,7 @@ def test_from_bytes(self): assert bar_contents in req.body assert req.params["foo"] == "foo" bar = req.params["bar"] - assert isinstance(bar, cgi.FieldStorage) + assert isinstance(bar, MultiDictFile) assert bar.type == "application/octet-stream" bar.file.seek(0) assert bar.file.read() == bar_contents @@ -2473,7 +2458,7 @@ def test_from_bytes(self): cls.from_bytes(_test_req2 + b"xx") def test_from_text(self): - import cgi + from webob.multidict import MultiDictFile cls = self._getTargetClass() req = cls.from_text(text_(_test_req, "utf-8")) @@ -2488,7 +2473,7 @@ def test_from_text(self): assert bar_contents in req.body assert req.params["foo"] == "foo" bar = req.params["bar"] - assert isinstance(bar, cgi.FieldStorage) + assert isinstance(bar, MultiDictFile) assert bar.type == "application/octet-stream" bar.file.seek(0) assert bar.file.read() == bar_contents @@ -2574,16 +2559,6 @@ def test_body_file_noseek(self): lst = [req.body_file.read(1) for i in range(3)] assert lst == [b"a", b"b", b"c"] - def test_cgi_escaping_fix(self): - req = self._blankOne( - "/", - content_type="multipart/form-data; boundary=boundary", - POST=_cgi_escaping_body, - ) - assert list(req.POST.keys()) == ['%20%22"'] - req.body_file.read() - assert list(req.POST.keys()) == ['%20%22"'] - def test_content_type_none(self): r = self._blankOne("/", content_type="text/html") assert r.content_type == "text/html" @@ -2922,35 +2897,6 @@ def equal_req(self, req, inp): assert req_body == req2_body -class Test_cgi_FieldStorage__repr__patch: - def _callFUT(self, fake): - from webob.compat import cgi_FieldStorage - - return cgi_FieldStorage.__repr__(fake) - - def test_with_file(self): - class Fake: - name = "name" - file = "file" - filename = "filename" - value = "value" - - fake = Fake() - result = self._callFUT(fake) - assert result, "FieldStorage('name' == 'filename')" - - def test_without_file(self): - class Fake: - name = "name" - file = None - filename = "filename" - value = "value" - - fake = Fake() - result = self._callFUT(fake) - assert result, "FieldStorage('name', 'filename' == 'value')" - - class TestLimitedLengthFile: def _makeOne(self, file, maxlen): from webob.request import LimitedLengthFile @@ -3132,11 +3078,13 @@ def simpleapp(environ, start_response): ] -_cgi_escaping_body = """--boundary -Content-Disposition: form-data; name="%20%22"" - - ---boundary--""" +_cgi_escaping_body = ( + b"--boundary\r\n" + b'Content-Disposition: form-data; name="%20%22""\r\n' + b"\r\n" + b"\r\n" + b"--boundary--\r\n" +) def _norm_req(s): diff --git a/tests/test_response.py b/tests/test_response.py index f539b422..87c88e06 100644 --- a/tests/test_response.py +++ b/tests/test_response.py @@ -1052,7 +1052,7 @@ def dummy_wsgi_callable(environ, start_response): environ = {} def dummy_start_response(status, headers, exc_info=None): - assert headers, [("Set-Cookie" == "a=1; Path=/")] + assert headers, ["Set-Cookie" == "a=1; Path=/"] result = wsgiapp(environ, dummy_start_response) assert result == "abc"