diff --git a/src/requests/sessions.py b/src/requests/sessions.py index 731550de88..de2583c4c6 100644 --- a/src/requests/sessions.py +++ b/src/requests/sessions.py @@ -471,10 +471,20 @@ def prepare_request(self, request): cookies = cookiejar_from_dict(cookies) # Merge with session cookies - merged_cookies = merge_cookies( - merge_cookies(RequestsCookieJar(), self.cookies), cookies - ) - + # Preserve the cookie policy from the session's cookie jar + # Fixes issue where custom policies were lost during request preparation + if isinstance(self.cookies, RequestsCookieJar): + # Create a new jar with the same policy as the session's jar + jar = RequestsCookieJar() + jar.set_policy(self.cookies.get_policy()) + merged_cookies = merge_cookies( + merge_cookies(jar, self.cookies), cookies + ) + else: + # Fall back to original behavior for non-RequestsCookieJar instances + merged_cookies = merge_cookies( + merge_cookies(RequestsCookieJar(), self.cookies), cookies + ) # Set environment's basic authentication if not explicitly set. auth = request.auth if self.trust_env and not auth and not self.auth: @@ -496,7 +506,6 @@ def prepare_request(self, request): hooks=merge_hooks(request.hooks, self.hooks), ) return p - def request( self, method, diff --git a/tests/test_requests.py b/tests/test_requests.py index 75d2deff2e..dab3062885 100644 --- a/tests/test_requests.py +++ b/tests/test_requests.py @@ -3038,3 +3038,4 @@ def test_json_decode_errors_are_serializable_deserializable(): ) deserialized_error = pickle.loads(pickle.dumps(json_decode_error)) assert repr(json_decode_error) == repr(deserialized_error) + diff --git a/tests/test_requests_backup.py b/tests/test_requests_backup.py new file mode 100644 index 0000000000..0cc2c81b23 --- /dev/null +++ b/tests/test_requests_backup.py @@ -0,0 +1,3064 @@ +"""Tests for Requests.""" + +import collections +import contextlib +import io +import json +import os +import pickle +import re +import tempfile +import threading +import warnings +from unittest import mock + +import pytest +import urllib3 +from urllib3.util import Timeout as Urllib3Timeout + +import requests +from requests.adapters import HTTPAdapter +from requests.auth import HTTPDigestAuth, _basic_auth_str +from requests.compat import ( + JSONDecodeError, + Morsel, + MutableMapping, + builtin_str, + cookielib, + getproxies, + is_urllib3_1, + urlparse, +) +from requests.cookies import cookiejar_from_dict, morsel_to_cookie +from requests.exceptions import ( + ChunkedEncodingError, + ConnectionError, + ConnectTimeout, + ContentDecodingError, + InvalidHeader, + InvalidProxyURL, + InvalidSchema, + InvalidURL, + MissingSchema, + ProxyError, + ReadTimeout, + RequestException, + RetryError, +) +from requests.exceptions import SSLError as RequestsSSLError +from requests.exceptions import Timeout, TooManyRedirects, UnrewindableBodyError +from requests.hooks import default_hooks +from requests.models import PreparedRequest, urlencode +from requests.sessions import SessionRedirectMixin +from requests.structures import CaseInsensitiveDict + +from . import SNIMissingWarning +from .compat import StringIO +from .testserver.server import TLSServer, consume_socket_content +from .utils import override_environ + +# Requests to this URL should always fail with a connection timeout (nothing +# listening on that port) +TARPIT = "http://10.255.255.1" + +# This is to avoid waiting the timeout of using TARPIT +INVALID_PROXY = "http://localhost:1" + +try: + from ssl import SSLContext + + del SSLContext + HAS_MODERN_SSL = True +except ImportError: + HAS_MODERN_SSL = False + +try: + requests.pyopenssl + HAS_PYOPENSSL = True +except AttributeError: + HAS_PYOPENSSL = False + + +class TestRequests: + digest_auth_algo = ("MD5", "SHA-256", "SHA-512") + + def test_entry_points(self): + requests.session + requests.session().get + requests.session().head + requests.get + requests.head + requests.put + requests.patch + requests.post + # Not really an entry point, but people rely on it. + from requests.packages.urllib3.poolmanager import PoolManager # noqa:F401 + + @pytest.mark.parametrize( + "exception, url", + ( + (MissingSchema, "hiwpefhipowhefopw"), + (InvalidSchema, "localhost:3128"), + (InvalidSchema, "localhost.localdomain:3128/"), + (InvalidSchema, "10.122.1.1:3128/"), + (InvalidURL, "http://"), + (InvalidURL, "http://*example.com"), + (InvalidURL, "http://.example.com"), + ), + ) + def test_invalid_url(self, exception, url): + with pytest.raises(exception): + requests.get(url) + + def test_basic_building(self): + req = requests.Request() + req.url = "http://kennethreitz.org/" + req.data = {"life": "42"} + + pr = req.prepare() + assert pr.url == req.url + assert pr.body == "life=42" + + @pytest.mark.parametrize("method", ("GET", "HEAD")) + def test_no_content_length(self, httpbin, method): + req = requests.Request(method, httpbin(method.lower())).prepare() + assert "Content-Length" not in req.headers + + @pytest.mark.parametrize("method", ("POST", "PUT", "PATCH", "OPTIONS")) + def test_no_body_content_length(self, httpbin, method): + req = requests.Request(method, httpbin(method.lower())).prepare() + assert req.headers["Content-Length"] == "0" + + @pytest.mark.parametrize("method", ("POST", "PUT", "PATCH", "OPTIONS")) + def test_empty_content_length(self, httpbin, method): + req = requests.Request(method, httpbin(method.lower()), data="").prepare() + assert req.headers["Content-Length"] == "0" + + def test_override_content_length(self, httpbin): + headers = {"Content-Length": "not zero"} + r = requests.Request("POST", httpbin("post"), headers=headers).prepare() + assert "Content-Length" in r.headers + assert r.headers["Content-Length"] == "not zero" + + def test_path_is_not_double_encoded(self): + request = requests.Request("GET", "http://0.0.0.0/get/test case").prepare() + + assert request.path_url == "/get/test%20case" + + @pytest.mark.parametrize( + "url, expected", + ( + ( + "http://example.com/path#fragment", + "http://example.com/path?a=b#fragment", + ), + ( + "http://example.com/path?key=value#fragment", + "http://example.com/path?key=value&a=b#fragment", + ), + ), + ) + def test_params_are_added_before_fragment(self, url, expected): + request = requests.Request("GET", url, params={"a": "b"}).prepare() + assert request.url == expected + + def test_params_original_order_is_preserved_by_default(self): + param_ordered_dict = collections.OrderedDict( + (("z", 1), ("a", 1), ("k", 1), ("d", 1)) + ) + session = requests.Session() + request = requests.Request( + "GET", "http://example.com/", params=param_ordered_dict + ) + prep = session.prepare_request(request) + assert prep.url == "http://example.com/?z=1&a=1&k=1&d=1" + + def test_params_bytes_are_encoded(self): + request = requests.Request( + "GET", "http://example.com", params=b"test=foo" + ).prepare() + assert request.url == "http://example.com/?test=foo" + + def test_binary_put(self): + request = requests.Request( + "PUT", "http://example.com", data="ööö".encode() + ).prepare() + assert isinstance(request.body, bytes) + + def test_whitespaces_are_removed_from_url(self): + # Test for issue #3696 + request = requests.Request("GET", " http://example.com").prepare() + assert request.url == "http://example.com/" + + @pytest.mark.parametrize("scheme", ("http://", "HTTP://", "hTTp://", "HttP://")) + def test_mixed_case_scheme_acceptable(self, httpbin, scheme): + s = requests.Session() + s.proxies = getproxies() + parts = urlparse(httpbin("get")) + url = scheme + parts.netloc + parts.path + r = requests.Request("GET", url) + r = s.send(r.prepare()) + assert r.status_code == 200, f"failed for scheme {scheme}" + + def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin): + r = requests.Request("GET", httpbin("get")) + s = requests.Session() + s.proxies = getproxies() + + r = s.send(r.prepare()) + + assert r.status_code == 200 + + def test_HTTP_302_ALLOW_REDIRECT_GET(self, httpbin): + r = requests.get(httpbin("redirect", "1")) + assert r.status_code == 200 + assert r.history[0].status_code == 302 + assert r.history[0].is_redirect + + def test_HTTP_307_ALLOW_REDIRECT_POST(self, httpbin): + r = requests.post( + httpbin("redirect-to"), + data="test", + params={"url": "post", "status_code": 307}, + ) + assert r.status_code == 200 + assert r.history[0].status_code == 307 + assert r.history[0].is_redirect + assert r.json()["data"] == "test" + + def test_HTTP_307_ALLOW_REDIRECT_POST_WITH_SEEKABLE(self, httpbin): + byte_str = b"test" + r = requests.post( + httpbin("redirect-to"), + data=io.BytesIO(byte_str), + params={"url": "post", "status_code": 307}, + ) + assert r.status_code == 200 + assert r.history[0].status_code == 307 + assert r.history[0].is_redirect + assert r.json()["data"] == byte_str.decode("utf-8") + + def test_HTTP_302_TOO_MANY_REDIRECTS(self, httpbin): + try: + requests.get(httpbin("relative-redirect", "50")) + except TooManyRedirects as e: + url = httpbin("relative-redirect", "20") + assert e.request.url == url + assert e.response.url == url + assert len(e.response.history) == 30 + else: + pytest.fail("Expected redirect to raise TooManyRedirects but it did not") + + def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, httpbin): + s = requests.session() + s.max_redirects = 5 + try: + s.get(httpbin("relative-redirect", "50")) + except TooManyRedirects as e: + url = httpbin("relative-redirect", "45") + assert e.request.url == url + assert e.response.url == url + assert len(e.response.history) == 5 + else: + pytest.fail( + "Expected custom max number of redirects to be respected but was not" + ) + + def test_http_301_changes_post_to_get(self, httpbin): + r = requests.post(httpbin("status", "301")) + assert r.status_code == 200 + assert r.request.method == "GET" + assert r.history[0].status_code == 301 + assert r.history[0].is_redirect + + def test_http_301_doesnt_change_head_to_get(self, httpbin): + r = requests.head(httpbin("status", "301"), allow_redirects=True) + print(r.content) + assert r.status_code == 200 + assert r.request.method == "HEAD" + assert r.history[0].status_code == 301 + assert r.history[0].is_redirect + + def test_http_302_changes_post_to_get(self, httpbin): + r = requests.post(httpbin("status", "302")) + assert r.status_code == 200 + assert r.request.method == "GET" + assert r.history[0].status_code == 302 + assert r.history[0].is_redirect + + def test_http_302_doesnt_change_head_to_get(self, httpbin): + r = requests.head(httpbin("status", "302"), allow_redirects=True) + assert r.status_code == 200 + assert r.request.method == "HEAD" + assert r.history[0].status_code == 302 + assert r.history[0].is_redirect + + def test_http_303_changes_post_to_get(self, httpbin): + r = requests.post(httpbin("status", "303")) + assert r.status_code == 200 + assert r.request.method == "GET" + assert r.history[0].status_code == 303 + assert r.history[0].is_redirect + + def test_http_303_doesnt_change_head_to_get(self, httpbin): + r = requests.head(httpbin("status", "303"), allow_redirects=True) + assert r.status_code == 200 + assert r.request.method == "HEAD" + assert r.history[0].status_code == 303 + assert r.history[0].is_redirect + + def test_header_and_body_removal_on_redirect(self, httpbin): + purged_headers = ("Content-Length", "Content-Type") + ses = requests.Session() + req = requests.Request("POST", httpbin("post"), data={"test": "data"}) + prep = ses.prepare_request(req) + resp = ses.send(prep) + + # Mimic a redirect response + resp.status_code = 302 + resp.headers["location"] = "get" + + # Run request through resolve_redirects + next_resp = next(ses.resolve_redirects(resp, prep)) + assert next_resp.request.body is None + for header in purged_headers: + assert header not in next_resp.request.headers + + def test_transfer_enc_removal_on_redirect(self, httpbin): + purged_headers = ("Transfer-Encoding", "Content-Type") + ses = requests.Session() + req = requests.Request("POST", httpbin("post"), data=(b"x" for x in range(1))) + prep = ses.prepare_request(req) + assert "Transfer-Encoding" in prep.headers + + # Create Response to avoid https://github.com/kevin1024/pytest-httpbin/issues/33 + resp = requests.Response() + resp.raw = io.BytesIO(b"the content") + resp.request = prep + setattr(resp.raw, "release_conn", lambda *args: args) + + # Mimic a redirect response + resp.status_code = 302 + resp.headers["location"] = httpbin("get") + + # Run request through resolve_redirect + next_resp = next(ses.resolve_redirects(resp, prep)) + assert next_resp.request.body is None + for header in purged_headers: + assert header not in next_resp.request.headers + + def test_fragment_maintained_on_redirect(self, httpbin): + fragment = "#view=edit&token=hunter2" + r = requests.get(httpbin("redirect-to?url=get") + fragment) + + assert len(r.history) > 0 + assert r.history[0].request.url == httpbin("redirect-to?url=get") + fragment + assert r.url == httpbin("get") + fragment + + def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin): + heads = {"User-agent": "Mozilla/5.0"} + + r = requests.get(httpbin("user-agent"), headers=heads) + + assert heads["User-agent"] in r.text + assert r.status_code == 200 + + def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self, httpbin): + heads = {"User-agent": "Mozilla/5.0"} + + r = requests.get( + httpbin("get") + "?test=true", params={"q": "test"}, headers=heads + ) + assert r.status_code == 200 + + def test_set_cookie_on_301(self, httpbin): + s = requests.session() + url = httpbin("cookies/set?foo=bar") + s.get(url) + assert s.cookies["foo"] == "bar" + + def test_cookie_sent_on_redirect(self, httpbin): + s = requests.session() + s.get(httpbin("cookies/set?foo=bar")) + r = s.get(httpbin("redirect/1")) # redirects to httpbin('get') + assert "Cookie" in r.json()["headers"] + + def test_cookie_removed_on_expire(self, httpbin): + s = requests.session() + s.get(httpbin("cookies/set?foo=bar")) + assert s.cookies["foo"] == "bar" + s.get( + httpbin("response-headers"), + params={"Set-Cookie": "foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT"}, + ) + assert "foo" not in s.cookies + + def test_cookie_quote_wrapped(self, httpbin): + s = requests.session() + s.get(httpbin('cookies/set?foo="bar:baz"')) + assert s.cookies["foo"] == '"bar:baz"' + + def test_cookie_persists_via_api(self, httpbin): + s = requests.session() + r = s.get(httpbin("redirect/1"), cookies={"foo": "bar"}) + assert "foo" in r.request.headers["Cookie"] + assert "foo" in r.history[0].request.headers["Cookie"] + + def test_request_cookie_overrides_session_cookie(self, httpbin): + s = requests.session() + s.cookies["foo"] = "bar" + r = s.get(httpbin("cookies"), cookies={"foo": "baz"}) + assert r.json()["cookies"]["foo"] == "baz" + # Session cookie should not be modified + assert s.cookies["foo"] == "bar" + + def test_request_cookies_not_persisted(self, httpbin): + s = requests.session() + s.get(httpbin("cookies"), cookies={"foo": "baz"}) + # Sending a request with cookies should not add cookies to the session + assert not s.cookies + + def test_generic_cookiejar_works(self, httpbin): + cj = cookielib.CookieJar() + cookiejar_from_dict({"foo": "bar"}, cj) + s = requests.session() + s.cookies = cj + r = s.get(httpbin("cookies")) + # Make sure the cookie was sent + assert r.json()["cookies"]["foo"] == "bar" + # Make sure the session cj is still the custom one + assert s.cookies is cj + + def test_param_cookiejar_works(self, httpbin): + cj = cookielib.CookieJar() + cookiejar_from_dict({"foo": "bar"}, cj) + s = requests.session() + r = s.get(httpbin("cookies"), cookies=cj) + # Make sure the cookie was sent + assert r.json()["cookies"]["foo"] == "bar" + + def test_cookielib_cookiejar_on_redirect(self, httpbin): + """Tests resolve_redirect doesn't fail when merging cookies + with non-RequestsCookieJar cookiejar. + + See GH #3579 + """ + cj = cookiejar_from_dict({"foo": "bar"}, cookielib.CookieJar()) + s = requests.Session() + s.cookies = cookiejar_from_dict({"cookie": "tasty"}) + + # Prepare request without using Session + req = requests.Request("GET", httpbin("headers"), cookies=cj) + prep_req = req.prepare() + + # Send request and simulate redirect + resp = s.send(prep_req) + resp.status_code = 302 + resp.headers["location"] = httpbin("get") + redirects = s.resolve_redirects(resp, prep_req) + resp = next(redirects) + + # Verify CookieJar isn't being converted to RequestsCookieJar + assert isinstance(prep_req._cookies, cookielib.CookieJar) + assert isinstance(resp.request._cookies, cookielib.CookieJar) + assert not isinstance(resp.request._cookies, requests.cookies.RequestsCookieJar) + + cookies = {} + for c in resp.request._cookies: + cookies[c.name] = c.value + assert cookies["foo"] == "bar" + assert cookies["cookie"] == "tasty" + + def test_requests_in_history_are_not_overridden(self, httpbin): + resp = requests.get(httpbin("redirect/3")) + urls = [r.url for r in resp.history] + req_urls = [r.request.url for r in resp.history] + assert urls == req_urls + + def test_history_is_always_a_list(self, httpbin): + """Show that even with redirects, Response.history is always a list.""" + resp = requests.get(httpbin("get")) + assert isinstance(resp.history, list) + resp = requests.get(httpbin("redirect/1")) + assert isinstance(resp.history, list) + assert not isinstance(resp.history, tuple) + + def test_headers_on_session_with_None_are_not_sent(self, httpbin): + """Do not send headers in Session.headers with None values.""" + ses = requests.Session() + ses.headers["Accept-Encoding"] = None + req = requests.Request("GET", httpbin("get")) + prep = ses.prepare_request(req) + assert "Accept-Encoding" not in prep.headers + + def test_headers_preserve_order(self, httpbin): + """Preserve order when headers provided as OrderedDict.""" + ses = requests.Session() + ses.headers = collections.OrderedDict() + ses.headers["Accept-Encoding"] = "identity" + ses.headers["First"] = "1" + ses.headers["Second"] = "2" + headers = collections.OrderedDict([("Third", "3"), ("Fourth", "4")]) + headers["Fifth"] = "5" + headers["Second"] = "222" + req = requests.Request("GET", httpbin("get"), headers=headers) + prep = ses.prepare_request(req) + items = list(prep.headers.items()) + assert items[0] == ("Accept-Encoding", "identity") + assert items[1] == ("First", "1") + assert items[2] == ("Second", "222") + assert items[3] == ("Third", "3") + assert items[4] == ("Fourth", "4") + assert items[5] == ("Fifth", "5") + + @pytest.mark.parametrize("key", ("User-agent", "user-agent")) + def test_user_agent_transfers(self, httpbin, key): + heads = {key: "Mozilla/5.0 (github.com/psf/requests)"} + + r = requests.get(httpbin("user-agent"), headers=heads) + assert heads[key] in r.text + + def test_HTTP_200_OK_HEAD(self, httpbin): + r = requests.head(httpbin("get")) + assert r.status_code == 200 + + def test_HTTP_200_OK_PUT(self, httpbin): + r = requests.put(httpbin("put")) + assert r.status_code == 200 + + def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin): + auth = ("user", "pass") + url = httpbin("basic-auth", "user", "pass") + + r = requests.get(url, auth=auth) + assert r.status_code == 200 + + r = requests.get(url) + assert r.status_code == 401 + + s = requests.session() + s.auth = auth + r = s.get(url) + assert r.status_code == 200 + + @pytest.mark.parametrize( + "username, password", + ( + ("user", "pass"), + ("имя".encode(), "пароль".encode()), + (42, 42), + (None, None), + ), + ) + def test_set_basicauth(self, httpbin, username, password): + auth = (username, password) + url = httpbin("get") + + r = requests.Request("GET", url, auth=auth) + p = r.prepare() + + assert p.headers["Authorization"] == _basic_auth_str(username, password) + + def test_basicauth_encodes_byte_strings(self): + """Ensure b'test' formats as the byte string "test" rather + than the unicode string "b'test'" in Python 3. + """ + auth = (b"\xc5\xafsername", b"test\xc6\xb6") + r = requests.Request("GET", "http://localhost", auth=auth) + p = r.prepare() + + assert p.headers["Authorization"] == "Basic xa9zZXJuYW1lOnRlc3TGtg==" + + @pytest.mark.parametrize( + "url, exception", + ( + # Connecting to an unknown domain should raise a ConnectionError + ("http://doesnotexist.google.com", ConnectionError), + # Connecting to an invalid port should raise a ConnectionError + ("http://localhost:1", ConnectionError), + # Inputing a URL that cannot be parsed should raise an InvalidURL error + ("http://fe80::5054:ff:fe5a:fc0", InvalidURL), + ), + ) + def test_errors(self, url, exception): + with pytest.raises(exception): + requests.get(url, timeout=1) + + def test_proxy_error(self): + # any proxy related error (address resolution, no route to host, etc) should result in a ProxyError + with pytest.raises(ProxyError): + requests.get( + "http://localhost:1", proxies={"http": "non-resolvable-address"} + ) + + def test_proxy_error_on_bad_url(self, httpbin, httpbin_secure): + with pytest.raises(InvalidProxyURL): + requests.get(httpbin_secure(), proxies={"https": "http:/badproxyurl:3128"}) + + with pytest.raises(InvalidProxyURL): + requests.get(httpbin(), proxies={"http": "http://:8080"}) + + with pytest.raises(InvalidProxyURL): + requests.get(httpbin_secure(), proxies={"https": "https://"}) + + with pytest.raises(InvalidProxyURL): + requests.get(httpbin(), proxies={"http": "http:///example.com:8080"}) + + def test_respect_proxy_env_on_send_self_prepared_request(self, httpbin): + with override_environ(http_proxy=INVALID_PROXY): + with pytest.raises(ProxyError): + session = requests.Session() + request = requests.Request("GET", httpbin()) + session.send(request.prepare()) + + def test_respect_proxy_env_on_send_session_prepared_request(self, httpbin): + with override_environ(http_proxy=INVALID_PROXY): + with pytest.raises(ProxyError): + session = requests.Session() + request = requests.Request("GET", httpbin()) + prepared = session.prepare_request(request) + session.send(prepared) + + def test_respect_proxy_env_on_send_with_redirects(self, httpbin): + with override_environ(http_proxy=INVALID_PROXY): + with pytest.raises(ProxyError): + session = requests.Session() + url = httpbin("redirect/1") + print(url) + request = requests.Request("GET", url) + session.send(request.prepare()) + + def test_respect_proxy_env_on_get(self, httpbin): + with override_environ(http_proxy=INVALID_PROXY): + with pytest.raises(ProxyError): + session = requests.Session() + session.get(httpbin()) + + def test_respect_proxy_env_on_request(self, httpbin): + with override_environ(http_proxy=INVALID_PROXY): + with pytest.raises(ProxyError): + session = requests.Session() + session.request(method="GET", url=httpbin()) + + def test_proxy_authorization_preserved_on_request(self, httpbin): + proxy_auth_value = "Bearer XXX" + session = requests.Session() + session.headers.update({"Proxy-Authorization": proxy_auth_value}) + resp = session.request(method="GET", url=httpbin("get")) + sent_headers = resp.json().get("headers", {}) + + assert sent_headers.get("Proxy-Authorization") == proxy_auth_value + + @pytest.mark.parametrize( + "url,has_proxy_auth", + ( + ("http://example.com", True), + ("https://example.com", False), + ), + ) + def test_proxy_authorization_not_appended_to_https_request( + self, url, has_proxy_auth + ): + session = requests.Session() + proxies = { + "http": "http://test:pass@localhost:8080", + "https": "http://test:pass@localhost:8090", + } + req = requests.Request("GET", url) + prep = req.prepare() + session.rebuild_proxies(prep, proxies) + + assert ("Proxy-Authorization" in prep.headers) is has_proxy_auth + + def test_basicauth_with_netrc(self, httpbin): + auth = ("user", "pass") + wrong_auth = ("wronguser", "wrongpass") + url = httpbin("basic-auth", "user", "pass") + + old_auth = requests.sessions.get_netrc_auth + + try: + + def get_netrc_auth_mock(url): + return auth + + requests.sessions.get_netrc_auth = get_netrc_auth_mock + + # Should use netrc and work. + r = requests.get(url) + assert r.status_code == 200 + + # Given auth should override and fail. + r = requests.get(url, auth=wrong_auth) + assert r.status_code == 401 + + s = requests.session() + + # Should use netrc and work. + r = s.get(url) + assert r.status_code == 200 + + # Given auth should override and fail. + s.auth = wrong_auth + r = s.get(url) + assert r.status_code == 401 + finally: + requests.sessions.get_netrc_auth = old_auth + + def test_basicauth_with_netrc_leak(self, httpbin): + url1 = httpbin("basic-auth", "user", "pass") + url = url1[len("http://") :] + domain = url.split(":")[0] + url = f"http://example.com:@{url}" + + netrc_file = "" + with tempfile.NamedTemporaryFile(mode="w", delete=False) as fp: + fp.write("machine example.com\n") + fp.write("login wronguser\n") + fp.write("password wrongpass\n") + fp.write(f"machine {domain}\n") + fp.write("login user\n") + fp.write("password pass\n") + fp.close() + netrc_file = fp.name + + old_netrc = os.environ.get("NETRC", "") + os.environ["NETRC"] = netrc_file + + try: + # Should use netrc + # Make sure that we don't use the example.com credentails + # for the request + r = requests.get(url) + assert r.status_code == 200 + finally: + os.environ["NETRC"] = old_netrc + os.unlink(netrc_file) + + def test_DIGEST_HTTP_200_OK_GET(self, httpbin): + for authtype in self.digest_auth_algo: + auth = HTTPDigestAuth("user", "pass") + url = httpbin("digest-auth", "auth", "user", "pass", authtype, "never") + + r = requests.get(url, auth=auth) + assert r.status_code == 200 + + r = requests.get(url) + assert r.status_code == 401 + print(r.headers["WWW-Authenticate"]) + + s = requests.session() + s.auth = HTTPDigestAuth("user", "pass") + r = s.get(url) + assert r.status_code == 200 + + def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin): + for authtype in self.digest_auth_algo: + url = httpbin("digest-auth", "auth", "user", "pass", authtype) + auth = HTTPDigestAuth("user", "pass") + r = requests.get(url) + assert r.cookies["fake"] == "fake_value" + + r = requests.get(url, auth=auth) + assert r.status_code == 200 + + def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin): + for authtype in self.digest_auth_algo: + url = httpbin("digest-auth", "auth", "user", "pass", authtype) + auth = HTTPDigestAuth("user", "pass") + s = requests.Session() + s.get(url, auth=auth) + assert s.cookies["fake"] == "fake_value" + + def test_DIGEST_STREAM(self, httpbin): + for authtype in self.digest_auth_algo: + auth = HTTPDigestAuth("user", "pass") + url = httpbin("digest-auth", "auth", "user", "pass", authtype) + + r = requests.get(url, auth=auth, stream=True) + assert r.raw.read() != b"" + + r = requests.get(url, auth=auth, stream=False) + assert r.raw.read() == b"" + + def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin): + for authtype in self.digest_auth_algo: + auth = HTTPDigestAuth("user", "wrongpass") + url = httpbin("digest-auth", "auth", "user", "pass", authtype) + + r = requests.get(url, auth=auth) + assert r.status_code == 401 + + r = requests.get(url) + assert r.status_code == 401 + + s = requests.session() + s.auth = auth + r = s.get(url) + assert r.status_code == 401 + + def test_DIGESTAUTH_QUOTES_QOP_VALUE(self, httpbin): + for authtype in self.digest_auth_algo: + auth = HTTPDigestAuth("user", "pass") + url = httpbin("digest-auth", "auth", "user", "pass", authtype) + + r = requests.get(url, auth=auth) + assert '"auth"' in r.request.headers["Authorization"] + + def test_POSTBIN_GET_POST_FILES(self, httpbin): + url = httpbin("post") + requests.post(url).raise_for_status() + + post1 = requests.post(url, data={"some": "data"}) + assert post1.status_code == 200 + + with open("requirements-dev.txt") as f: + post2 = requests.post(url, files={"some": f}) + assert post2.status_code == 200 + + post4 = requests.post(url, data='[{"some": "json"}]') + assert post4.status_code == 200 + + with pytest.raises(ValueError): + requests.post(url, files=["bad file data"]) + + def test_invalid_files_input(self, httpbin): + url = httpbin("post") + post = requests.post(url, files={"random-file-1": None, "random-file-2": 1}) + assert b'name="random-file-1"' not in post.request.body + assert b'name="random-file-2"' in post.request.body + + def test_POSTBIN_SEEKED_OBJECT_WITH_NO_ITER(self, httpbin): + class TestStream: + def __init__(self, data): + self.data = data.encode() + self.length = len(self.data) + self.index = 0 + + def __len__(self): + return self.length + + def read(self, size=None): + if size: + ret = self.data[self.index : self.index + size] + self.index += size + else: + ret = self.data[self.index :] + self.index = self.length + return ret + + def tell(self): + return self.index + + def seek(self, offset, where=0): + if where == 0: + self.index = offset + elif where == 1: + self.index += offset + elif where == 2: + self.index = self.length + offset + + test = TestStream("test") + post1 = requests.post(httpbin("post"), data=test) + assert post1.status_code == 200 + assert post1.json()["data"] == "test" + + test = TestStream("test") + test.seek(2) + post2 = requests.post(httpbin("post"), data=test) + assert post2.status_code == 200 + assert post2.json()["data"] == "st" + + def test_POSTBIN_GET_POST_FILES_WITH_DATA(self, httpbin): + url = httpbin("post") + requests.post(url).raise_for_status() + + post1 = requests.post(url, data={"some": "data"}) + assert post1.status_code == 200 + + with open("requirements-dev.txt") as f: + post2 = requests.post(url, data={"some": "data"}, files={"some": f}) + assert post2.status_code == 200 + + post4 = requests.post(url, data='[{"some": "json"}]') + assert post4.status_code == 200 + + with pytest.raises(ValueError): + requests.post(url, files=["bad file data"]) + + def test_post_with_custom_mapping(self, httpbin): + class CustomMapping(MutableMapping): + def __init__(self, *args, **kwargs): + self.data = dict(*args, **kwargs) + + def __delitem__(self, key): + del self.data[key] + + def __getitem__(self, key): + return self.data[key] + + def __setitem__(self, key, value): + self.data[key] = value + + def __iter__(self): + return iter(self.data) + + def __len__(self): + return len(self.data) + + data = CustomMapping({"some": "data"}) + url = httpbin("post") + found_json = requests.post(url, data=data).json().get("form") + assert found_json == {"some": "data"} + + def test_conflicting_post_params(self, httpbin): + url = httpbin("post") + with open("requirements-dev.txt") as f: + with pytest.raises(ValueError): + requests.post(url, data='[{"some": "data"}]', files={"some": f}) + + def test_request_ok_set(self, httpbin): + r = requests.get(httpbin("status", "404")) + assert not r.ok + + def test_status_raising(self, httpbin): + r = requests.get(httpbin("status", "404")) + with pytest.raises(requests.exceptions.HTTPError): + r.raise_for_status() + + r = requests.get(httpbin("status", "500")) + assert not r.ok + + def test_decompress_gzip(self, httpbin): + r = requests.get(httpbin("gzip")) + r.content.decode("ascii") + + @pytest.mark.parametrize( + "url, params", + ( + ("/get", {"foo": "føø"}), + ("/get", {"føø": "føø"}), + ("/get", {"føø": "føø"}), + ("/get", {"foo": "foo"}), + ("ø", {"foo": "foo"}), + ), + ) + def test_unicode_get(self, httpbin, url, params): + requests.get(httpbin(url), params=params) + + def test_unicode_header_name(self, httpbin): + requests.put( + httpbin("put"), + headers={"Content-Type": "application/octet-stream"}, + data="\xff", + ) # compat.str is unicode. + + def test_pyopenssl_redirect(self, httpbin_secure, httpbin_ca_bundle): + requests.get(httpbin_secure("status", "301"), verify=httpbin_ca_bundle) + + def test_invalid_ca_certificate_path(self, httpbin_secure): + INVALID_PATH = "/garbage" + with pytest.raises(IOError) as e: + requests.get(httpbin_secure(), verify=INVALID_PATH) + assert str( + e.value + ) == "Could not find a suitable TLS CA certificate bundle, invalid path: {}".format( + INVALID_PATH + ) + + def test_invalid_ssl_certificate_files(self, httpbin_secure): + INVALID_PATH = "/garbage" + with pytest.raises(IOError) as e: + requests.get(httpbin_secure(), cert=INVALID_PATH) + assert str( + e.value + ) == "Could not find the TLS certificate file, invalid path: {}".format( + INVALID_PATH + ) + + with pytest.raises(IOError) as e: + requests.get(httpbin_secure(), cert=(".", INVALID_PATH)) + assert str(e.value) == ( + f"Could not find the TLS key file, invalid path: {INVALID_PATH}" + ) + + @pytest.mark.parametrize( + "env, expected", + ( + ({}, True), + ({"REQUESTS_CA_BUNDLE": "/some/path"}, "/some/path"), + ({"REQUESTS_CA_BUNDLE": ""}, True), + ({"CURL_CA_BUNDLE": "/some/path"}, "/some/path"), + ({"CURL_CA_BUNDLE": ""}, True), + ({"REQUESTS_CA_BUNDLE": "", "CURL_CA_BUNDLE": ""}, True), + ( + { + "REQUESTS_CA_BUNDLE": "/some/path", + "CURL_CA_BUNDLE": "/curl/path", + }, + "/some/path", + ), + ( + { + "REQUESTS_CA_BUNDLE": "", + "CURL_CA_BUNDLE": "/curl/path", + }, + "/curl/path", + ), + ), + ) + def test_env_cert_bundles(self, httpbin, env, expected): + s = requests.Session() + with mock.patch("os.environ", env): + settings = s.merge_environment_settings( + url=httpbin("get"), proxies={}, stream=False, verify=True, cert=None + ) + assert settings["verify"] == expected + + def test_http_with_certificate(self, httpbin): + r = requests.get(httpbin(), cert=".") + assert r.status_code == 200 + + @pytest.mark.skipif( + SNIMissingWarning is None, + reason="urllib3 2.0 removed that warning and errors out instead", + ) + def test_https_warnings(self, nosan_server): + """warnings are emitted with requests.get""" + host, port, ca_bundle = nosan_server + if HAS_MODERN_SSL or HAS_PYOPENSSL: + warnings_expected = ("SubjectAltNameWarning",) + else: + warnings_expected = ( + "SNIMissingWarning", + "InsecurePlatformWarning", + "SubjectAltNameWarning", + ) + + with pytest.warns() as warning_records: + warnings.simplefilter("always") + requests.get(f"https://localhost:{port}/", verify=ca_bundle) + + warning_records = [ + item + for item in warning_records + if item.category.__name__ != "ResourceWarning" + ] + + warnings_category = tuple(item.category.__name__ for item in warning_records) + assert warnings_category == warnings_expected + + def test_certificate_failure(self, httpbin_secure): + """ + When underlying SSL problems occur, an SSLError is raised. + """ + with pytest.raises(RequestsSSLError): + # Our local httpbin does not have a trusted CA, so this call will + # fail if we use our default trust bundle. + requests.get(httpbin_secure("status", "200")) + + def test_urlencoded_get_query_multivalued_param(self, httpbin): + r = requests.get(httpbin("get"), params={"test": ["foo", "baz"]}) + assert r.status_code == 200 + assert r.url == httpbin("get?test=foo&test=baz") + + def test_form_encoded_post_query_multivalued_element(self, httpbin): + r = requests.Request( + method="POST", url=httpbin("post"), data=dict(test=["foo", "baz"]) + ) + prep = r.prepare() + assert prep.body == "test=foo&test=baz" + + def test_different_encodings_dont_break_post(self, httpbin): + with open(__file__, "rb") as f: + r = requests.post( + httpbin("post"), + data={"stuff": json.dumps({"a": 123})}, + params={"blah": "asdf1234"}, + files={"file": ("test_requests.py", f)}, + ) + assert r.status_code == 200 + + @pytest.mark.parametrize( + "data", + ( + {"stuff": "ëlïxr"}, + {"stuff": "ëlïxr".encode()}, + {"stuff": "elixr"}, + {"stuff": b"elixr"}, + ), + ) + def test_unicode_multipart_post(self, httpbin, data): + with open(__file__, "rb") as f: + r = requests.post( + httpbin("post"), + data=data, + files={"file": ("test_requests.py", f)}, + ) + assert r.status_code == 200 + + def test_unicode_multipart_post_fieldnames(self, httpbin): + filename = os.path.splitext(__file__)[0] + ".py" + with open(filename, "rb") as f: + r = requests.Request( + method="POST", + url=httpbin("post"), + data={b"stuff": "elixr"}, + files={"file": ("test_requests.py", f)}, + ) + prep = r.prepare() + + assert b'name="stuff"' in prep.body + assert b"name=\"b'stuff'\"" not in prep.body + + def test_unicode_method_name(self, httpbin): + with open(__file__, "rb") as f: + files = {"file": f} + r = requests.request( + method="POST", + url=httpbin("post"), + files=files, + ) + assert r.status_code == 200 + + def test_unicode_method_name_with_request_object(self, httpbin): + s = requests.Session() + with open(__file__, "rb") as f: + files = {"file": f} + req = requests.Request("POST", httpbin("post"), files=files) + prep = s.prepare_request(req) + assert isinstance(prep.method, builtin_str) + assert prep.method == "POST" + + resp = s.send(prep) + assert resp.status_code == 200 + + def test_non_prepared_request_error(self): + s = requests.Session() + req = requests.Request("POST", "/") + + with pytest.raises(ValueError) as e: + s.send(req) + assert str(e.value) == "You can only send PreparedRequests." + + def test_custom_content_type(self, httpbin): + with open(__file__, "rb") as f1: + with open(__file__, "rb") as f2: + data = {"stuff": json.dumps({"a": 123})} + files = { + "file1": ("test_requests.py", f1), + "file2": ("test_requests", f2, "text/py-content-type"), + } + r = requests.post(httpbin("post"), data=data, files=files) + assert r.status_code == 200 + assert b"text/py-content-type" in r.request.body + + def test_hook_receives_request_arguments(self, httpbin): + def hook(resp, **kwargs): + assert resp is not None + assert kwargs != {} + + s = requests.Session() + r = requests.Request("GET", httpbin(), hooks={"response": hook}) + prep = s.prepare_request(r) + s.send(prep) + + def test_session_hooks_are_used_with_no_request_hooks(self, httpbin): + def hook(*args, **kwargs): + pass + + s = requests.Session() + s.hooks["response"].append(hook) + r = requests.Request("GET", httpbin()) + prep = s.prepare_request(r) + assert prep.hooks["response"] != [] + assert prep.hooks["response"] == [hook] + + def test_session_hooks_are_overridden_by_request_hooks(self, httpbin): + def hook1(*args, **kwargs): + pass + + def hook2(*args, **kwargs): + pass + + assert hook1 is not hook2 + s = requests.Session() + s.hooks["response"].append(hook2) + r = requests.Request("GET", httpbin(), hooks={"response": [hook1]}) + prep = s.prepare_request(r) + assert prep.hooks["response"] == [hook1] + + def test_prepared_request_hook(self, httpbin): + def hook(resp, **kwargs): + resp.hook_working = True + return resp + + req = requests.Request("GET", httpbin(), hooks={"response": hook}) + prep = req.prepare() + + s = requests.Session() + s.proxies = getproxies() + resp = s.send(prep) + + assert hasattr(resp, "hook_working") + + def test_prepared_from_session(self, httpbin): + class DummyAuth(requests.auth.AuthBase): + def __call__(self, r): + r.headers["Dummy-Auth-Test"] = "dummy-auth-test-ok" + return r + + req = requests.Request("GET", httpbin("headers")) + assert not req.auth + + s = requests.Session() + s.auth = DummyAuth() + + prep = s.prepare_request(req) + resp = s.send(prep) + + assert resp.json()["headers"]["Dummy-Auth-Test"] == "dummy-auth-test-ok" + + def test_prepare_request_with_bytestring_url(self): + req = requests.Request("GET", b"https://httpbin.org/") + s = requests.Session() + prep = s.prepare_request(req) + assert prep.url == "https://httpbin.org/" + + def test_request_with_bytestring_host(self, httpbin): + s = requests.Session() + resp = s.request( + "GET", + httpbin("cookies/set?cookie=value"), + allow_redirects=False, + headers={"Host": b"httpbin.org"}, + ) + assert resp.cookies.get("cookie") == "value" + + def test_links(self): + r = requests.Response() + r.headers = { + "cache-control": "public, max-age=60, s-maxage=60", + "connection": "keep-alive", + "content-encoding": "gzip", + "content-type": "application/json; charset=utf-8", + "date": "Sat, 26 Jan 2013 16:47:56 GMT", + "etag": '"6ff6a73c0e446c1f61614769e3ceb778"', + "last-modified": "Sat, 26 Jan 2013 16:22:39 GMT", + "link": ( + "; rel="next", ; " + ' rel="last"' + ), + "server": "GitHub.com", + "status": "200 OK", + "vary": "Accept", + "x-content-type-options": "nosniff", + "x-github-media-type": "github.beta", + "x-ratelimit-limit": "60", + "x-ratelimit-remaining": "57", + } + assert r.links["next"]["rel"] == "next" + + def test_cookie_parameters(self): + key = "some_cookie" + value = "some_value" + secure = True + domain = "test.com" + rest = {"HttpOnly": True} + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value, secure=secure, domain=domain, rest=rest) + + assert len(jar) == 1 + assert "some_cookie" in jar + + cookie = list(jar)[0] + assert cookie.secure == secure + assert cookie.domain == domain + assert cookie._rest["HttpOnly"] == rest["HttpOnly"] + + def test_cookie_as_dict_keeps_len(self): + key = "some_cookie" + value = "some_value" + + key1 = "some_cookie1" + value1 = "some_value1" + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value) + jar.set(key1, value1) + + d1 = dict(jar) + d2 = dict(jar.iteritems()) + d3 = dict(jar.items()) + + assert len(jar) == 2 + assert len(d1) == 2 + assert len(d2) == 2 + assert len(d3) == 2 + + def test_cookie_as_dict_keeps_items(self): + key = "some_cookie" + value = "some_value" + + key1 = "some_cookie1" + value1 = "some_value1" + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value) + jar.set(key1, value1) + + d1 = dict(jar) + d2 = dict(jar.iteritems()) + d3 = dict(jar.items()) + + assert d1["some_cookie"] == "some_value" + assert d2["some_cookie"] == "some_value" + assert d3["some_cookie1"] == "some_value1" + + def test_cookie_as_dict_keys(self): + key = "some_cookie" + value = "some_value" + + key1 = "some_cookie1" + value1 = "some_value1" + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value) + jar.set(key1, value1) + + keys = jar.keys() + assert keys == list(keys) + # make sure one can use keys multiple times + assert list(keys) == list(keys) + + def test_cookie_as_dict_values(self): + key = "some_cookie" + value = "some_value" + + key1 = "some_cookie1" + value1 = "some_value1" + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value) + jar.set(key1, value1) + + values = jar.values() + assert values == list(values) + # make sure one can use values multiple times + assert list(values) == list(values) + + def test_cookie_as_dict_items(self): + key = "some_cookie" + value = "some_value" + + key1 = "some_cookie1" + value1 = "some_value1" + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value) + jar.set(key1, value1) + + items = jar.items() + assert items == list(items) + # make sure one can use items multiple times + assert list(items) == list(items) + + def test_cookie_duplicate_names_different_domains(self): + key = "some_cookie" + value = "some_value" + domain1 = "test1.com" + domain2 = "test2.com" + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value, domain=domain1) + jar.set(key, value, domain=domain2) + assert key in jar + items = jar.items() + assert len(items) == 2 + + # Verify that CookieConflictError is raised if domain is not specified + with pytest.raises(requests.cookies.CookieConflictError): + jar.get(key) + + # Verify that CookieConflictError is not raised if domain is specified + cookie = jar.get(key, domain=domain1) + assert cookie == value + + def test_cookie_duplicate_names_raises_cookie_conflict_error(self): + key = "some_cookie" + value = "some_value" + path = "some_path" + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value, path=path) + jar.set(key, value) + with pytest.raises(requests.cookies.CookieConflictError): + jar.get(key) + + def test_cookie_policy_copy(self): + class MyCookiePolicy(cookielib.DefaultCookiePolicy): + pass + + jar = requests.cookies.RequestsCookieJar() + jar.set_policy(MyCookiePolicy()) + assert isinstance(jar.copy().get_policy(), MyCookiePolicy) + + def test_cookie_policy_preserved_in_prepare_request(self): + """Test that custom cookie policies are preserved and consulted during prepare_request.""" + class RejectingCookiePolicy(cookielib.DefaultCookiePolicy): + def return_ok(self, cookie, request): + # Reject all cookies to verify policy is consulted + return False + + # Create session with custom cookie policy + s = requests.Session() + s.cookies = requests.cookies.RequestsCookieJar(policy=RejectingCookiePolicy()) + + # Add a cookie to the session + s.cookies.set('test_cookie', 'test_value', domain='example.com') + + # Create and prepare a request + req = requests.Request('GET', 'https://example.com') + prep_req = s.prepare_request(req) + + # Verify that the cookie header is None because our policy rejected it + assert prep_req.headers.get('Cookie') is None + + # Verify that the session's cookie jar still has the cookie + assert len(s.cookies) == 1 + assert s.cookies.get('test_cookie') == 'test_value' + + def test_time_elapsed_blank(self, httpbin): + r = requests.get(httpbin("get")) td = r.elapsed + total_seconds = ( + td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6 + ) / 10**6 + assert total_seconds > 0.0 + + def test_empty_response_has_content_none(self): + r = requests.Response() + assert r.content is None + + def test_response_is_iterable(self): + r = requests.Response() + io = StringIO.StringIO("abc") + read_ = io.read + + def read_mock(amt, decode_content=None): + return read_(amt) + + setattr(io, "read", read_mock) + r.raw = io + assert next(iter(r)) + io.close() + + def test_response_decode_unicode(self): + """When called with decode_unicode, Response.iter_content should always + return unicode. + """ + r = requests.Response() + r._content_consumed = True + r._content = b"the content" + r.encoding = "ascii" + + chunks = r.iter_content(decode_unicode=True) + assert all(isinstance(chunk, str) for chunk in chunks) + + # also for streaming + r = requests.Response() + r.raw = io.BytesIO(b"the content") + r.encoding = "ascii" + chunks = r.iter_content(decode_unicode=True) + assert all(isinstance(chunk, str) for chunk in chunks) + + def test_response_reason_unicode(self): + # check for unicode HTTP status + r = requests.Response() + r.url = "unicode URL" + r.reason = "Komponenttia ei löydy".encode() + r.status_code = 404 + r.encoding = None + assert not r.ok # old behaviour - crashes here + + def test_response_reason_unicode_fallback(self): + # check raise_status falls back to ISO-8859-1 + r = requests.Response() + r.url = "some url" + reason = "Komponenttia ei löydy" + r.reason = reason.encode("latin-1") + r.status_code = 500 + r.encoding = None + with pytest.raises(requests.exceptions.HTTPError) as e: + r.raise_for_status() + assert reason in e.value.args[0] + + def test_response_chunk_size_type(self): + """Ensure that chunk_size is passed as None or an integer, otherwise + raise a TypeError. + """ + r = requests.Response() + r.raw = io.BytesIO(b"the content") + chunks = r.iter_content(1) + assert all(len(chunk) == 1 for chunk in chunks) + + r = requests.Response() + r.raw = io.BytesIO(b"the content") + chunks = r.iter_content(None) + assert list(chunks) == [b"the content"] + + r = requests.Response() + r.raw = io.BytesIO(b"the content") + with pytest.raises(TypeError): + chunks = r.iter_content("1024") + + @pytest.mark.parametrize( + "exception, args, expected", + ( + (urllib3.exceptions.ProtocolError, tuple(), ChunkedEncodingError), + (urllib3.exceptions.DecodeError, tuple(), ContentDecodingError), + (urllib3.exceptions.ReadTimeoutError, (None, "", ""), ConnectionError), + (urllib3.exceptions.SSLError, tuple(), RequestsSSLError), + ), + ) + def test_iter_content_wraps_exceptions(self, httpbin, exception, args, expected): + r = requests.Response() + r.raw = mock.Mock() + # ReadTimeoutError can't be initialized by mock + # so we'll manually create the instance with args + r.raw.stream.side_effect = exception(*args) + + with pytest.raises(expected): + next(r.iter_content(1024)) + + def test_request_and_response_are_pickleable(self, httpbin): + r = requests.get(httpbin("get")) + + # verify we can pickle the original request + assert pickle.loads(pickle.dumps(r.request)) + + # verify we can pickle the response and that we have access to + # the original request. + pr = pickle.loads(pickle.dumps(r)) + assert r.request.url == pr.request.url + assert r.request.headers == pr.request.headers + + def test_prepared_request_is_pickleable(self, httpbin): + p = requests.Request("GET", httpbin("get")).prepare() + + # Verify PreparedRequest can be pickled and unpickled + r = pickle.loads(pickle.dumps(p)) + assert r.url == p.url + assert r.headers == p.headers + assert r.body == p.body + + # Verify unpickled PreparedRequest sends properly + s = requests.Session() + resp = s.send(r) + assert resp.status_code == 200 + + def test_prepared_request_with_file_is_pickleable(self, httpbin): + with open(__file__, "rb") as f: + r = requests.Request("POST", httpbin("post"), files={"file": f}) + p = r.prepare() + + # Verify PreparedRequest can be pickled and unpickled + r = pickle.loads(pickle.dumps(p)) + assert r.url == p.url + assert r.headers == p.headers + assert r.body == p.body + + # Verify unpickled PreparedRequest sends properly + s = requests.Session() + resp = s.send(r) + assert resp.status_code == 200 + + def test_prepared_request_with_hook_is_pickleable(self, httpbin): + r = requests.Request("GET", httpbin("get"), hooks=default_hooks()) + p = r.prepare() + + # Verify PreparedRequest can be pickled + r = pickle.loads(pickle.dumps(p)) + assert r.url == p.url + assert r.headers == p.headers + assert r.body == p.body + assert r.hooks == p.hooks + + # Verify unpickled PreparedRequest sends properly + s = requests.Session() + resp = s.send(r) + assert resp.status_code == 200 + + def test_cannot_send_unprepared_requests(self, httpbin): + r = requests.Request(url=httpbin()) + with pytest.raises(ValueError): + requests.Session().send(r) + + def test_http_error(self): + error = requests.exceptions.HTTPError() + assert not error.response + response = requests.Response() + error = requests.exceptions.HTTPError(response=response) + assert error.response == response + error = requests.exceptions.HTTPError("message", response=response) + assert str(error) == "message" + assert error.response == response + + def test_session_pickling(self, httpbin): + r = requests.Request("GET", httpbin("get")) + s = requests.Session() + + s = pickle.loads(pickle.dumps(s)) + s.proxies = getproxies() + + r = s.send(r.prepare()) + assert r.status_code == 200 + + def test_fixes_1329(self, httpbin): + """Ensure that header updates are done case-insensitively.""" + s = requests.Session() + s.headers.update({"ACCEPT": "BOGUS"}) + s.headers.update({"accept": "application/json"}) + r = s.get(httpbin("get")) + headers = r.request.headers + assert headers["accept"] == "application/json" + assert headers["Accept"] == "application/json" + assert headers["ACCEPT"] == "application/json" + + def test_uppercase_scheme_redirect(self, httpbin): + parts = urlparse(httpbin("html")) + url = "HTTP://" + parts.netloc + parts.path + r = requests.get(httpbin("redirect-to"), params={"url": url}) + assert r.status_code == 200 + assert r.url.lower() == url.lower() + + def test_transport_adapter_ordering(self): + s = requests.Session() + order = ["https://", "http://"] + assert order == list(s.adapters) + s.mount("http://git", HTTPAdapter()) + s.mount("http://github", HTTPAdapter()) + s.mount("http://github.com", HTTPAdapter()) + s.mount("http://github.com/about/", HTTPAdapter()) + order = [ + "http://github.com/about/", + "http://github.com", + "http://github", + "http://git", + "https://", + "http://", + ] + assert order == list(s.adapters) + s.mount("http://gittip", HTTPAdapter()) + s.mount("http://gittip.com", HTTPAdapter()) + s.mount("http://gittip.com/about/", HTTPAdapter()) + order = [ + "http://github.com/about/", + "http://gittip.com/about/", + "http://github.com", + "http://gittip.com", + "http://github", + "http://gittip", + "http://git", + "https://", + "http://", + ] + assert order == list(s.adapters) + s2 = requests.Session() + s2.adapters = {"http://": HTTPAdapter()} + s2.mount("https://", HTTPAdapter()) + assert "http://" in s2.adapters + assert "https://" in s2.adapters + + def test_session_get_adapter_prefix_matching(self): + prefix = "https://example.com" + more_specific_prefix = prefix + "/some/path" + + url_matching_only_prefix = prefix + "/another/path" + url_matching_more_specific_prefix = more_specific_prefix + "/longer/path" + url_not_matching_prefix = "https://another.example.com/" + + s = requests.Session() + prefix_adapter = HTTPAdapter() + more_specific_prefix_adapter = HTTPAdapter() + s.mount(prefix, prefix_adapter) + s.mount(more_specific_prefix, more_specific_prefix_adapter) + + assert s.get_adapter(url_matching_only_prefix) is prefix_adapter + assert ( + s.get_adapter(url_matching_more_specific_prefix) + is more_specific_prefix_adapter + ) + assert s.get_adapter(url_not_matching_prefix) not in ( + prefix_adapter, + more_specific_prefix_adapter, + ) + + def test_session_get_adapter_prefix_matching_mixed_case(self): + mixed_case_prefix = "hTtPs://eXamPle.CoM/MixEd_CAse_PREfix" + url_matching_prefix = mixed_case_prefix + "/full_url" + + s = requests.Session() + my_adapter = HTTPAdapter() + s.mount(mixed_case_prefix, my_adapter) + + assert s.get_adapter(url_matching_prefix) is my_adapter + + def test_session_get_adapter_prefix_matching_is_case_insensitive(self): + mixed_case_prefix = "hTtPs://eXamPle.CoM/MixEd_CAse_PREfix" + url_matching_prefix_with_different_case = ( + "HtTpS://exaMPLe.cOm/MiXeD_caSE_preFIX/another_url" + ) + + s = requests.Session() + my_adapter = HTTPAdapter() + s.mount(mixed_case_prefix, my_adapter) + + assert s.get_adapter(url_matching_prefix_with_different_case) is my_adapter + + def test_session_get_adapter_prefix_with_trailing_slash(self): + # from issue #6935 + prefix = "https://example.com/" # trailing slash + url_matching_prefix = "https://example.com/some/path" + url_not_matching_prefix = "https://example.com.other.com/some/path" + + s = requests.Session() + adapter = HTTPAdapter() + s.mount(prefix, adapter) + + assert s.get_adapter(url_matching_prefix) is adapter + assert s.get_adapter(url_not_matching_prefix) is not adapter + + def test_session_get_adapter_prefix_without_trailing_slash(self): + # from issue #6935 + prefix = "https://example.com" # no trailing slash + url_matching_prefix = "https://example.com/some/path" + url_extended_hostname = "https://example.com.other.com/some/path" + + s = requests.Session() + adapter = HTTPAdapter() + s.mount(prefix, adapter) + + assert s.get_adapter(url_matching_prefix) is adapter + assert s.get_adapter(url_extended_hostname) is adapter + + def test_header_remove_is_case_insensitive(self, httpbin): + # From issue #1321 + s = requests.Session() + s.headers["foo"] = "bar" + r = s.get(httpbin("get"), headers={"FOO": None}) + assert "foo" not in r.request.headers + + def test_params_are_merged_case_sensitive(self, httpbin): + s = requests.Session() + s.params["foo"] = "bar" + r = s.get(httpbin("get"), params={"FOO": "bar"}) + assert r.json()["args"] == {"foo": "bar", "FOO": "bar"} + + def test_long_authinfo_in_url(self): + url = "http://{}:{}@{}:9000/path?query#frag".format( + "E8A3BE87-9E3F-4620-8858-95478E385B5B", + "EA770032-DA4D-4D84-8CE9-29C6D910BF1E", + "exactly-------------sixty-----------three------------characters", + ) + r = requests.Request("GET", url).prepare() + assert r.url == url + + def test_header_keys_are_native(self, httpbin): + headers = {"unicode": "blah", b"byte": "blah"} + r = requests.Request("GET", httpbin("get"), headers=headers) + p = r.prepare() + + # This is testing that they are builtin strings. A bit weird, but there + # we go. + assert "unicode" in p.headers.keys() + assert "byte" in p.headers.keys() + + def test_header_validation(self, httpbin): + """Ensure prepare_headers regex isn't flagging valid header contents.""" + valid_headers = { + "foo": "bar baz qux", + "bar": b"fbbq", + "baz": "", + "qux": "1", + } + r = requests.get(httpbin("get"), headers=valid_headers) + for key in valid_headers.keys(): + assert valid_headers[key] == r.request.headers[key] + + @pytest.mark.parametrize( + "invalid_header, key", + ( + ({"foo": 3}, "foo"), + ({"bar": {"foo": "bar"}}, "bar"), + ({"baz": ["foo", "bar"]}, "baz"), + ), + ) + def test_header_value_not_str(self, httpbin, invalid_header, key): + """Ensure the header value is of type string or bytes as + per discussion in GH issue #3386 + """ + with pytest.raises(InvalidHeader) as excinfo: + requests.get(httpbin("get"), headers=invalid_header) + assert key in str(excinfo.value) + + @pytest.mark.parametrize( + "invalid_header", + ( + {"foo": "bar\r\nbaz: qux"}, + {"foo": "bar\n\rbaz: qux"}, + {"foo": "bar\nbaz: qux"}, + {"foo": "bar\rbaz: qux"}, + {"fo\ro": "bar"}, + {"fo\r\no": "bar"}, + {"fo\n\ro": "bar"}, + {"fo\no": "bar"}, + ), + ) + def test_header_no_return_chars(self, httpbin, invalid_header): + """Ensure that a header containing return character sequences raise an + exception. Otherwise, multiple headers are created from single string. + """ + with pytest.raises(InvalidHeader): + requests.get(httpbin("get"), headers=invalid_header) + + @pytest.mark.parametrize( + "invalid_header", + ( + {" foo": "bar"}, + {"\tfoo": "bar"}, + {" foo": "bar"}, + {"foo": " bar"}, + {"foo": " bar"}, + {"foo": "\tbar"}, + {" ": "bar"}, + ), + ) + def test_header_no_leading_space(self, httpbin, invalid_header): + """Ensure headers containing leading whitespace raise + InvalidHeader Error before sending. + """ + with pytest.raises(InvalidHeader): + requests.get(httpbin("get"), headers=invalid_header) + + def test_header_with_subclass_types(self, httpbin): + """If the subclasses does not behave *exactly* like + the base bytes/str classes, this is not supported. + This test is for backwards compatibility. + """ + + class MyString(str): + pass + + class MyBytes(bytes): + pass + + r_str = requests.get(httpbin("get"), headers={MyString("x-custom"): "myheader"}) + assert r_str.request.headers["x-custom"] == "myheader" + + r_bytes = requests.get( + httpbin("get"), headers={MyBytes(b"x-custom"): b"myheader"} + ) + assert r_bytes.request.headers["x-custom"] == b"myheader" + + r_mixed = requests.get( + httpbin("get"), headers={MyString("x-custom"): MyBytes(b"myheader")} + ) + assert r_mixed.request.headers["x-custom"] == b"myheader" + + @pytest.mark.parametrize("files", ("foo", b"foo", bytearray(b"foo"))) + def test_can_send_objects_with_files(self, httpbin, files): + data = {"a": "this is a string"} + files = {"b": files} + r = requests.Request("POST", httpbin("post"), data=data, files=files) + p = r.prepare() + assert "multipart/form-data" in p.headers["Content-Type"] + + def test_can_send_file_object_with_non_string_filename(self, httpbin): + f = io.BytesIO() + f.name = 2 + r = requests.Request("POST", httpbin("post"), files={"f": f}) + p = r.prepare() + + assert "multipart/form-data" in p.headers["Content-Type"] + + def test_autoset_header_values_are_native(self, httpbin): + data = "this is a string" + length = "16" + req = requests.Request("POST", httpbin("post"), data=data) + p = req.prepare() + + assert p.headers["Content-Length"] == length + + def test_nonhttp_schemes_dont_check_URLs(self): + test_urls = ( + "", + "file:///etc/passwd", + "magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431", + ) + for test_url in test_urls: + req = requests.Request("GET", test_url) + preq = req.prepare() + assert test_url == preq.url + + def test_auth_is_stripped_on_http_downgrade( + self, httpbin, httpbin_secure, httpbin_ca_bundle + ): + r = requests.get( + httpbin_secure("redirect-to"), + params={"url": httpbin("get")}, + auth=("user", "pass"), + verify=httpbin_ca_bundle, + ) + assert r.history[0].request.headers["Authorization"] + assert "Authorization" not in r.request.headers + + def test_auth_is_retained_for_redirect_on_host(self, httpbin): + r = requests.get(httpbin("redirect/1"), auth=("user", "pass")) + h1 = r.history[0].request.headers["Authorization"] + h2 = r.request.headers["Authorization"] + + assert h1 == h2 + + def test_should_strip_auth_host_change(self): + s = requests.Session() + assert s.should_strip_auth( + "http://example.com/foo", "http://another.example.com/" + ) + + def test_should_strip_auth_http_downgrade(self): + s = requests.Session() + assert s.should_strip_auth("https://example.com/foo", "http://example.com/bar") + + def test_should_strip_auth_https_upgrade(self): + s = requests.Session() + assert not s.should_strip_auth( + "http://example.com/foo", "https://example.com/bar" + ) + assert not s.should_strip_auth( + "http://example.com:80/foo", "https://example.com/bar" + ) + assert not s.should_strip_auth( + "http://example.com/foo", "https://example.com:443/bar" + ) + # Non-standard ports should trigger stripping + assert s.should_strip_auth( + "http://example.com:8080/foo", "https://example.com/bar" + ) + assert s.should_strip_auth( + "http://example.com/foo", "https://example.com:8443/bar" + ) + + def test_should_strip_auth_port_change(self): + s = requests.Session() + assert s.should_strip_auth( + "http://example.com:1234/foo", "https://example.com:4321/bar" + ) + + @pytest.mark.parametrize( + "old_uri, new_uri", + ( + ("https://example.com:443/foo", "https://example.com/bar"), + ("http://example.com:80/foo", "http://example.com/bar"), + ("https://example.com/foo", "https://example.com:443/bar"), + ("http://example.com/foo", "http://example.com:80/bar"), + ), + ) + def test_should_strip_auth_default_port(self, old_uri, new_uri): + s = requests.Session() + assert not s.should_strip_auth(old_uri, new_uri) + + def test_manual_redirect_with_partial_body_read(self, httpbin): + s = requests.Session() + r1 = s.get(httpbin("redirect/2"), allow_redirects=False, stream=True) + assert r1.is_redirect + rg = s.resolve_redirects(r1, r1.request, stream=True) + + # read only the first eight bytes of the response body, + # then follow the redirect + r1.iter_content(8) + r2 = next(rg) + assert r2.is_redirect + + # read all of the response via iter_content, + # then follow the redirect + for _ in r2.iter_content(): + pass + r3 = next(rg) + assert not r3.is_redirect + + def test_prepare_body_position_non_stream(self): + data = b"the data" + prep = requests.Request("GET", "http://example.com", data=data).prepare() + assert prep._body_position is None + + def test_rewind_body(self): + data = io.BytesIO(b"the data") + prep = requests.Request("GET", "http://example.com", data=data).prepare() + assert prep._body_position == 0 + assert prep.body.read() == b"the data" + + # the data has all been read + assert prep.body.read() == b"" + + # rewind it back + requests.utils.rewind_body(prep) + assert prep.body.read() == b"the data" + + def test_rewind_partially_read_body(self): + data = io.BytesIO(b"the data") + data.read(4) # read some data + prep = requests.Request("GET", "http://example.com", data=data).prepare() + assert prep._body_position == 4 + assert prep.body.read() == b"data" + + # the data has all been read + assert prep.body.read() == b"" + + # rewind it back + requests.utils.rewind_body(prep) + assert prep.body.read() == b"data" + + def test_rewind_body_no_seek(self): + class BadFileObj: + def __init__(self, data): + self.data = data + + def tell(self): + return 0 + + def __iter__(self): + return + + data = BadFileObj("the data") + prep = requests.Request("GET", "http://example.com", data=data).prepare() + assert prep._body_position == 0 + + with pytest.raises(UnrewindableBodyError) as e: + requests.utils.rewind_body(prep) + + assert "Unable to rewind request body" in str(e) + + def test_rewind_body_failed_seek(self): + class BadFileObj: + def __init__(self, data): + self.data = data + + def tell(self): + return 0 + + def seek(self, pos, whence=0): + raise OSError() + + def __iter__(self): + return + + data = BadFileObj("the data") + prep = requests.Request("GET", "http://example.com", data=data).prepare() + assert prep._body_position == 0 + + with pytest.raises(UnrewindableBodyError) as e: + requests.utils.rewind_body(prep) + + assert "error occurred when rewinding request body" in str(e) + + def test_rewind_body_failed_tell(self): + class BadFileObj: + def __init__(self, data): + self.data = data + + def tell(self): + raise OSError() + + def __iter__(self): + return + + data = BadFileObj("the data") + prep = requests.Request("GET", "http://example.com", data=data).prepare() + assert prep._body_position is not None + + with pytest.raises(UnrewindableBodyError) as e: + requests.utils.rewind_body(prep) + + assert "Unable to rewind request body" in str(e) + + def _patch_adapter_gzipped_redirect(self, session, url): + adapter = session.get_adapter(url=url) + org_build_response = adapter.build_response + self._patched_response = False + + def build_response(*args, **kwargs): + resp = org_build_response(*args, **kwargs) + if not self._patched_response: + resp.raw.headers["content-encoding"] = "gzip" + self._patched_response = True + return resp + + adapter.build_response = build_response + + def test_redirect_with_wrong_gzipped_header(self, httpbin): + s = requests.Session() + url = httpbin("redirect/1") + self._patch_adapter_gzipped_redirect(s, url) + s.get(url) + + @pytest.mark.parametrize( + "username, password, auth_str", + ( + ("test", "test", "Basic dGVzdDp0ZXN0"), + ( + "имя".encode(), + "пароль".encode(), + "Basic 0LjQvNGPOtC/0LDRgNC+0LvRjA==", + ), + ), + ) + def test_basic_auth_str_is_always_native(self, username, password, auth_str): + s = _basic_auth_str(username, password) + assert isinstance(s, builtin_str) + assert s == auth_str + + def test_requests_history_is_saved(self, httpbin): + r = requests.get(httpbin("redirect/5")) + total = r.history[-1].history + i = 0 + for item in r.history: + assert item.history == total[0:i] + i += 1 + + def test_json_param_post_content_type_works(self, httpbin): + r = requests.post(httpbin("post"), json={"life": 42}) + assert r.status_code == 200 + assert "application/json" in r.request.headers["Content-Type"] + assert {"life": 42} == r.json()["json"] + + def test_json_param_post_should_not_override_data_param(self, httpbin): + r = requests.Request( + method="POST", + url=httpbin("post"), + data={"stuff": "elixr"}, + json={"music": "flute"}, + ) + prep = r.prepare() + assert "stuff=elixr" == prep.body + + def test_response_iter_lines(self, httpbin): + r = requests.get(httpbin("stream/4"), stream=True) + assert r.status_code == 200 + + it = r.iter_lines() + next(it) + assert len(list(it)) == 3 + + def test_response_context_manager(self, httpbin): + with requests.get(httpbin("stream/4"), stream=True) as response: + assert isinstance(response, requests.Response) + + assert response.raw.closed + + def test_unconsumed_session_response_closes_connection(self, httpbin): + s = requests.session() + + with contextlib.closing(s.get(httpbin("stream/4"), stream=True)) as response: + pass + + assert response._content_consumed is False + assert response.raw.closed + + @pytest.mark.xfail + def test_response_iter_lines_reentrant(self, httpbin): + """Response.iter_lines() is not reentrant safe""" + r = requests.get(httpbin("stream/4"), stream=True) + assert r.status_code == 200 + + next(r.iter_lines()) + assert len(list(r.iter_lines())) == 3 + + def test_session_close_proxy_clear(self): + proxies = { + "one": mock.Mock(), + "two": mock.Mock(), + } + session = requests.Session() + with mock.patch.dict(session.adapters["http://"].proxy_manager, proxies): + session.close() + proxies["one"].clear.assert_called_once_with() + proxies["two"].clear.assert_called_once_with() + + def test_proxy_auth(self): + adapter = HTTPAdapter() + headers = adapter.proxy_headers("http://user:pass@httpbin.org") + assert headers == {"Proxy-Authorization": "Basic dXNlcjpwYXNz"} + + def test_proxy_auth_empty_pass(self): + adapter = HTTPAdapter() + headers = adapter.proxy_headers("http://user:@httpbin.org") + assert headers == {"Proxy-Authorization": "Basic dXNlcjo="} + + def test_response_json_when_content_is_None(self, httpbin): + r = requests.get(httpbin("/status/204")) + # Make sure r.content is None + r.status_code = 0 + r._content = False + r._content_consumed = False + + assert r.content is None + with pytest.raises(ValueError): + r.json() + + def test_response_without_release_conn(self): + """Test `close` call for non-urllib3-like raw objects. + Should work when `release_conn` attr doesn't exist on `response.raw`. + """ + resp = requests.Response() + resp.raw = StringIO.StringIO("test") + assert not resp.raw.closed + resp.close() + assert resp.raw.closed + + def test_empty_stream_with_auth_does_not_set_content_length_header(self, httpbin): + """Ensure that a byte stream with size 0 will not set both a Content-Length + and Transfer-Encoding header. + """ + auth = ("user", "pass") + url = httpbin("post") + file_obj = io.BytesIO(b"") + r = requests.Request("POST", url, auth=auth, data=file_obj) + prepared_request = r.prepare() + assert "Transfer-Encoding" in prepared_request.headers + assert "Content-Length" not in prepared_request.headers + + def test_stream_with_auth_does_not_set_transfer_encoding_header(self, httpbin): + """Ensure that a byte stream with size > 0 will not set both a Content-Length + and Transfer-Encoding header. + """ + auth = ("user", "pass") + url = httpbin("post") + file_obj = io.BytesIO(b"test data") + r = requests.Request("POST", url, auth=auth, data=file_obj) + prepared_request = r.prepare() + assert "Transfer-Encoding" not in prepared_request.headers + assert "Content-Length" in prepared_request.headers + + def test_chunked_upload_does_not_set_content_length_header(self, httpbin): + """Ensure that requests with a generator body stream using + Transfer-Encoding: chunked, not a Content-Length header. + """ + data = (i for i in [b"a", b"b", b"c"]) + url = httpbin("post") + r = requests.Request("POST", url, data=data) + prepared_request = r.prepare() + assert "Transfer-Encoding" in prepared_request.headers + assert "Content-Length" not in prepared_request.headers + + def test_custom_redirect_mixin(self, httpbin): + """Tests a custom mixin to overwrite ``get_redirect_target``. + + Ensures a subclassed ``requests.Session`` can handle a certain type of + malformed redirect responses. + + 1. original request receives a proper response: 302 redirect + 2. following the redirect, a malformed response is given: + status code = HTTP 200 + location = alternate url + 3. the custom session catches the edge case and follows the redirect + """ + url_final = httpbin("html") + querystring_malformed = urlencode({"location": url_final}) + url_redirect_malformed = httpbin("response-headers?%s" % querystring_malformed) + querystring_redirect = urlencode({"url": url_redirect_malformed}) + url_redirect = httpbin("redirect-to?%s" % querystring_redirect) + urls_test = [ + url_redirect, + url_redirect_malformed, + url_final, + ] + + class CustomRedirectSession(requests.Session): + def get_redirect_target(self, resp): + # default behavior + if resp.is_redirect: + return resp.headers["location"] + # edge case - check to see if 'location' is in headers anyways + location = resp.headers.get("location") + if location and (location != resp.url): + return location + return None + + session = CustomRedirectSession() + r = session.get(urls_test[0]) + assert len(r.history) == 2 + assert r.status_code == 200 + assert r.history[0].status_code == 302 + assert r.history[0].is_redirect + assert r.history[1].status_code == 200 + assert not r.history[1].is_redirect + assert r.url == urls_test[2] + + +class TestCaseInsensitiveDict: + @pytest.mark.parametrize( + "cid", + ( + CaseInsensitiveDict({"Foo": "foo", "BAr": "bar"}), + CaseInsensitiveDict([("Foo", "foo"), ("BAr", "bar")]), + CaseInsensitiveDict(FOO="foo", BAr="bar"), + ), + ) + def test_init(self, cid): + assert len(cid) == 2 + assert "foo" in cid + assert "bar" in cid + + def test_docstring_example(self): + cid = CaseInsensitiveDict() + cid["Accept"] = "application/json" + assert cid["aCCEPT"] == "application/json" + assert list(cid) == ["Accept"] + + def test_len(self): + cid = CaseInsensitiveDict({"a": "a", "b": "b"}) + cid["A"] = "a" + assert len(cid) == 2 + + def test_getitem(self): + cid = CaseInsensitiveDict({"Spam": "blueval"}) + assert cid["spam"] == "blueval" + assert cid["SPAM"] == "blueval" + + def test_fixes_649(self): + """__setitem__ should behave case-insensitively.""" + cid = CaseInsensitiveDict() + cid["spam"] = "oneval" + cid["Spam"] = "twoval" + cid["sPAM"] = "redval" + cid["SPAM"] = "blueval" + assert cid["spam"] == "blueval" + assert cid["SPAM"] == "blueval" + assert list(cid.keys()) == ["SPAM"] + + def test_delitem(self): + cid = CaseInsensitiveDict() + cid["Spam"] = "someval" + del cid["sPam"] + assert "spam" not in cid + assert len(cid) == 0 + + def test_contains(self): + cid = CaseInsensitiveDict() + cid["Spam"] = "someval" + assert "Spam" in cid + assert "spam" in cid + assert "SPAM" in cid + assert "sPam" in cid + assert "notspam" not in cid + + def test_get(self): + cid = CaseInsensitiveDict() + cid["spam"] = "oneval" + cid["SPAM"] = "blueval" + assert cid.get("spam") == "blueval" + assert cid.get("SPAM") == "blueval" + assert cid.get("sPam") == "blueval" + assert cid.get("notspam", "default") == "default" + + def test_update(self): + cid = CaseInsensitiveDict() + cid["spam"] = "blueval" + cid.update({"sPam": "notblueval"}) + assert cid["spam"] == "notblueval" + cid = CaseInsensitiveDict({"Foo": "foo", "BAr": "bar"}) + cid.update({"fOO": "anotherfoo", "bAR": "anotherbar"}) + assert len(cid) == 2 + assert cid["foo"] == "anotherfoo" + assert cid["bar"] == "anotherbar" + + def test_update_retains_unchanged(self): + cid = CaseInsensitiveDict({"foo": "foo", "bar": "bar"}) + cid.update({"foo": "newfoo"}) + assert cid["bar"] == "bar" + + def test_iter(self): + cid = CaseInsensitiveDict({"Spam": "spam", "Eggs": "eggs"}) + keys = frozenset(["Spam", "Eggs"]) + assert frozenset(iter(cid)) == keys + + def test_equality(self): + cid = CaseInsensitiveDict({"SPAM": "blueval", "Eggs": "redval"}) + othercid = CaseInsensitiveDict({"spam": "blueval", "eggs": "redval"}) + assert cid == othercid + del othercid["spam"] + assert cid != othercid + assert cid == {"spam": "blueval", "eggs": "redval"} + assert cid != object() + + def test_setdefault(self): + cid = CaseInsensitiveDict({"Spam": "blueval"}) + assert cid.setdefault("spam", "notblueval") == "blueval" + assert cid.setdefault("notspam", "notblueval") == "notblueval" + + def test_lower_items(self): + cid = CaseInsensitiveDict( + { + "Accept": "application/json", + "user-Agent": "requests", + } + ) + keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items()) + lowerkeyset = frozenset(["accept", "user-agent"]) + assert keyset == lowerkeyset + + def test_preserve_key_case(self): + cid = CaseInsensitiveDict( + { + "Accept": "application/json", + "user-Agent": "requests", + } + ) + keyset = frozenset(["Accept", "user-Agent"]) + assert frozenset(i[0] for i in cid.items()) == keyset + assert frozenset(cid.keys()) == keyset + assert frozenset(cid) == keyset + + def test_preserve_last_key_case(self): + cid = CaseInsensitiveDict( + { + "Accept": "application/json", + "user-Agent": "requests", + } + ) + cid.update({"ACCEPT": "application/json"}) + cid["USER-AGENT"] = "requests" + keyset = frozenset(["ACCEPT", "USER-AGENT"]) + assert frozenset(i[0] for i in cid.items()) == keyset + assert frozenset(cid.keys()) == keyset + assert frozenset(cid) == keyset + + def test_copy(self): + cid = CaseInsensitiveDict( + { + "Accept": "application/json", + "user-Agent": "requests", + } + ) + cid_copy = cid.copy() + assert cid == cid_copy + cid["changed"] = True + assert cid != cid_copy + + +class TestMorselToCookieExpires: + """Tests for morsel_to_cookie when morsel contains expires.""" + + def test_expires_valid_str(self): + """Test case where we convert expires from string time.""" + + morsel = Morsel() + morsel["expires"] = "Thu, 01-Jan-1970 00:00:01 GMT" + cookie = morsel_to_cookie(morsel) + assert cookie.expires == 1 + + @pytest.mark.parametrize( + "value, exception", + ( + (100, TypeError), + ("woops", ValueError), + ), + ) + def test_expires_invalid_int(self, value, exception): + """Test case where an invalid type is passed for expires.""" + morsel = Morsel() + morsel["expires"] = value + with pytest.raises(exception): + morsel_to_cookie(morsel) + + def test_expires_none(self): + """Test case where expires is None.""" + + morsel = Morsel() + morsel["expires"] = None + cookie = morsel_to_cookie(morsel) + assert cookie.expires is None + + +class TestMorselToCookieMaxAge: + + """Tests for morsel_to_cookie when morsel contains max-age.""" + + def test_max_age_valid_int(self): + """Test case where a valid max age in seconds is passed.""" + + morsel = Morsel() + morsel["max-age"] = 60 + cookie = morsel_to_cookie(morsel) + assert isinstance(cookie.expires, int) + + def test_max_age_invalid_str(self): + """Test case where a invalid max age is passed.""" + + morsel = Morsel() + morsel["max-age"] = "woops" + with pytest.raises(TypeError): + morsel_to_cookie(morsel) + + +class TestTimeout: + def test_stream_timeout(self, httpbin): + try: + requests.get(httpbin("delay/10"), timeout=2.0) + except requests.exceptions.Timeout as e: + assert "Read timed out" in e.args[0].args[0] + + @pytest.mark.parametrize( + "timeout, error_text", + ( + ((3, 4, 5), "(connect, read)"), + ("foo", "must be an int, float or None"), + ), + ) + def test_invalid_timeout(self, httpbin, timeout, error_text): + with pytest.raises(ValueError) as e: + requests.get(httpbin("get"), timeout=timeout) + assert error_text in str(e) + + @pytest.mark.parametrize("timeout", (None, Urllib3Timeout(connect=None, read=None))) + def test_none_timeout(self, httpbin, timeout): + """Check that you can set None as a valid timeout value. + + To actually test this behavior, we'd want to check that setting the + timeout to None actually lets the request block past the system default + timeout. However, this would make the test suite unbearably slow. + Instead we verify that setting the timeout to None does not prevent the + request from succeeding. + """ + r = requests.get(httpbin("get"), timeout=timeout) + assert r.status_code == 200 + + @pytest.mark.parametrize( + "timeout", ((None, 0.1), Urllib3Timeout(connect=None, read=0.1)) + ) + def test_read_timeout(self, httpbin, timeout): + try: + requests.get(httpbin("delay/10"), timeout=timeout) + pytest.fail("The recv() request should time out.") + except ReadTimeout: + pass + + @pytest.mark.parametrize( + "timeout", ((0.1, None), Urllib3Timeout(connect=0.1, read=None)) + ) + def test_connect_timeout(self, timeout): + try: + requests.get(TARPIT, timeout=timeout) + pytest.fail("The connect() request should time out.") + except ConnectTimeout as e: + assert isinstance(e, ConnectionError) + assert isinstance(e, Timeout) + + @pytest.mark.parametrize( + "timeout", ((0.1, 0.1), Urllib3Timeout(connect=0.1, read=0.1)) + ) + def test_total_timeout_connect(self, timeout): + try: + requests.get(TARPIT, timeout=timeout) + pytest.fail("The connect() request should time out.") + except ConnectTimeout: + pass + + def test_encoded_methods(self, httpbin): + """See: https://github.com/psf/requests/issues/2316""" + r = requests.request(b"GET", httpbin("get")) + assert r.ok + + +SendCall = collections.namedtuple("SendCall", ("args", "kwargs")) + + +class RedirectSession(SessionRedirectMixin): + def __init__(self, order_of_redirects): + self.redirects = order_of_redirects + self.calls = [] + self.max_redirects = 30 + self.cookies = {} + self.trust_env = False + + def send(self, *args, **kwargs): + self.calls.append(SendCall(args, kwargs)) + return self.build_response() + + def build_response(self): + request = self.calls[-1].args[0] + r = requests.Response() + + try: + r.status_code = int(self.redirects.pop(0)) + except IndexError: + r.status_code = 200 + + r.headers = CaseInsensitiveDict({"Location": "/"}) + r.raw = self._build_raw() + r.request = request + return r + + def _build_raw(self): + string = StringIO.StringIO("") + setattr(string, "release_conn", lambda *args: args) + return string + + +def test_json_encodes_as_bytes(): + # urllib3 expects bodies as bytes-like objects + body = {"key": "value"} + p = PreparedRequest() + p.prepare(method="GET", url="https://www.example.com/", json=body) + assert isinstance(p.body, bytes) + + +def test_requests_are_updated_each_time(httpbin): + session = RedirectSession([303, 307]) + prep = requests.Request("POST", httpbin("post")).prepare() + r0 = session.send(prep) + assert r0.request.method == "POST" + assert session.calls[-1] == SendCall((r0.request,), {}) + redirect_generator = session.resolve_redirects(r0, prep) + default_keyword_args = { + "stream": False, + "verify": True, + "cert": None, + "timeout": None, + "allow_redirects": False, + "proxies": {}, + } + for response in redirect_generator: + assert response.request.method == "GET" + send_call = SendCall((response.request,), default_keyword_args) + assert session.calls[-1] == send_call + + +@pytest.mark.parametrize( + "var,url,proxy", + [ + ("http_proxy", "http://example.com", "socks5://proxy.com:9876"), + ("https_proxy", "https://example.com", "socks5://proxy.com:9876"), + ("all_proxy", "http://example.com", "socks5://proxy.com:9876"), + ("all_proxy", "https://example.com", "socks5://proxy.com:9876"), + ], +) +def test_proxy_env_vars_override_default(var, url, proxy): + session = requests.Session() + prep = PreparedRequest() + prep.prepare(method="GET", url=url) + + kwargs = {var: proxy} + scheme = urlparse(url).scheme + with override_environ(**kwargs): + proxies = session.rebuild_proxies(prep, {}) + assert scheme in proxies + assert proxies[scheme] == proxy + + +@pytest.mark.parametrize( + "data", + ( + (("a", "b"), ("c", "d")), + (("c", "d"), ("a", "b")), + (("a", "b"), ("c", "d"), ("e", "f")), + ), +) +def test_data_argument_accepts_tuples(data): + """Ensure that the data argument will accept tuples of strings + and properly encode them. + """ + p = PreparedRequest() + p.prepare( + method="GET", url="http://www.example.com", data=data, hooks=default_hooks() + ) + assert p.body == urlencode(data) + + +@pytest.mark.parametrize( + "kwargs", + ( + None, + { + "method": "GET", + "url": "http://www.example.com", + "data": "foo=bar", + "hooks": default_hooks(), + }, + { + "method": "GET", + "url": "http://www.example.com", + "data": "foo=bar", + "hooks": default_hooks(), + "cookies": {"foo": "bar"}, + }, + {"method": "GET", "url": "http://www.example.com/üniçø∂é"}, + ), +) +def test_prepared_copy(kwargs): + p = PreparedRequest() + if kwargs: + p.prepare(**kwargs) + copy = p.copy() + for attr in ("method", "url", "headers", "_cookies", "body", "hooks"): + assert getattr(p, attr) == getattr(copy, attr) + + +def test_urllib3_retries(httpbin): + from urllib3.util import Retry + + s = requests.Session() + s.mount("http://", HTTPAdapter(max_retries=Retry(total=2, status_forcelist=[500]))) + + with pytest.raises(RetryError): + s.get(httpbin("status/500")) + + +def test_urllib3_pool_connection_closed(httpbin): + s = requests.Session() + s.mount("http://", HTTPAdapter(pool_connections=0, pool_maxsize=0)) + + try: + s.get(httpbin("status/200")) + except ConnectionError as e: + assert "Pool is closed." in str(e) + + +class TestPreparingURLs: + @pytest.mark.parametrize( + "url,expected", + ( + ("http://google.com", "http://google.com/"), + ("http://ジェーピーニック.jp", "http://xn--hckqz9bzb1cyrb.jp/"), + ("http://xn--n3h.net/", "http://xn--n3h.net/"), + ("http://ジェーピーニック.jp".encode(), "http://xn--hckqz9bzb1cyrb.jp/"), + ("http://straße.de/straße", "http://xn--strae-oqa.de/stra%C3%9Fe"), + ( + "http://straße.de/straße".encode(), + "http://xn--strae-oqa.de/stra%C3%9Fe", + ), + ( + "http://Königsgäßchen.de/straße", + "http://xn--knigsgchen-b4a3dun.de/stra%C3%9Fe", + ), + ( + "http://Königsgäßchen.de/straße".encode(), + "http://xn--knigsgchen-b4a3dun.de/stra%C3%9Fe", + ), + (b"http://xn--n3h.net/", "http://xn--n3h.net/"), + ( + b"http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/", + "http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/", + ), + ( + "http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/", + "http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/", + ), + ), + ) + def test_preparing_url(self, url, expected): + def normalize_percent_encode(x): + # Helper function that normalizes equivalent + # percent-encoded bytes before comparisons + for c in re.findall(r"%[a-fA-F0-9]{2}", x): + x = x.replace(c, c.upper()) + return x + + r = requests.Request("GET", url=url) + p = r.prepare() + assert normalize_percent_encode(p.url) == expected + + @pytest.mark.parametrize( + "url", + ( + b"http://*.google.com", + b"http://*", + "http://*.google.com", + "http://*", + "http://☃.net/", + ), + ) + def test_preparing_bad_url(self, url): + r = requests.Request("GET", url=url) + with pytest.raises(requests.exceptions.InvalidURL): + r.prepare() + + @pytest.mark.parametrize("url, exception", (("http://:1", InvalidURL),)) + def test_redirecting_to_bad_url(self, httpbin, url, exception): + with pytest.raises(exception): + requests.get(httpbin("redirect-to"), params={"url": url}) + + @pytest.mark.parametrize( + "input, expected", + ( + ( + b"http+unix://%2Fvar%2Frun%2Fsocket/path%7E", + "http+unix://%2Fvar%2Frun%2Fsocket/path~", + ), + ( + "http+unix://%2Fvar%2Frun%2Fsocket/path%7E", + "http+unix://%2Fvar%2Frun%2Fsocket/path~", + ), + ( + b"mailto:user@example.org", + "mailto:user@example.org", + ), + ( + "mailto:user@example.org", + "mailto:user@example.org", + ), + ( + b"data:SSDimaUgUHl0aG9uIQ==", + "data:SSDimaUgUHl0aG9uIQ==", + ), + ), + ) + def test_url_mutation(self, input, expected): + """ + This test validates that we correctly exclude some URLs from + preparation, and that we handle others. Specifically, it tests that + any URL whose scheme doesn't begin with "http" is left alone, and + those whose scheme *does* begin with "http" are mutated. + """ + r = requests.Request("GET", url=input) + p = r.prepare() + assert p.url == expected + + @pytest.mark.parametrize( + "input, params, expected", + ( + ( + b"http+unix://%2Fvar%2Frun%2Fsocket/path", + {"key": "value"}, + "http+unix://%2Fvar%2Frun%2Fsocket/path?key=value", + ), + ( + "http+unix://%2Fvar%2Frun%2Fsocket/path", + {"key": "value"}, + "http+unix://%2Fvar%2Frun%2Fsocket/path?key=value", + ), + ( + b"mailto:user@example.org", + {"key": "value"}, + "mailto:user@example.org", + ), + ( + "mailto:user@example.org", + {"key": "value"}, + "mailto:user@example.org", + ), + ), + ) + def test_parameters_for_nonstandard_schemes(self, input, params, expected): + """ + Setting parameters for nonstandard schemes is allowed if those schemes + begin with "http", and is forbidden otherwise. + """ + r = requests.Request("GET", url=input, params=params) + p = r.prepare() + assert p.url == expected + + def test_post_json_nan(self, httpbin): + data = {"foo": float("nan")} + with pytest.raises(requests.exceptions.InvalidJSONError): + requests.post(httpbin("post"), json=data) + + def test_json_decode_compatibility(self, httpbin): + r = requests.get(httpbin("bytes/20")) + with pytest.raises(requests.exceptions.JSONDecodeError) as excinfo: + r.json() + assert isinstance(excinfo.value, RequestException) + assert isinstance(excinfo.value, JSONDecodeError) + assert r.text not in str(excinfo.value) + + def test_json_decode_persists_doc_attr(self, httpbin): + r = requests.get(httpbin("bytes/20")) + with pytest.raises(requests.exceptions.JSONDecodeError) as excinfo: + r.json() + assert excinfo.value.doc == r.text + + def test_status_code_425(self): + r1 = requests.codes.get("TOO_EARLY") + r2 = requests.codes.get("too_early") + r3 = requests.codes.get("UNORDERED") + r4 = requests.codes.get("unordered") + r5 = requests.codes.get("UNORDERED_COLLECTION") + r6 = requests.codes.get("unordered_collection") + + assert r1 == 425 + assert r2 == 425 + assert r3 == 425 + assert r4 == 425 + assert r5 == 425 + assert r6 == 425 + + def test_different_connection_pool_for_tls_settings_verify_True(self): + def response_handler(sock): + consume_socket_content(sock, timeout=0.5) + sock.send( + b"HTTP/1.1 200 OK\r\n" + b"Content-Length: 18\r\n\r\n" + b'\xff\xfe{\x00"\x00K0"\x00=\x00"\x00\xab0"\x00\r\n' + ) + + s = requests.Session() + close_server = threading.Event() + server = TLSServer( + handler=response_handler, + wait_to_close_event=close_server, + requests_to_handle=3, + cert_chain="tests/certs/expired/server/server.pem", + keyfile="tests/certs/expired/server/server.key", + ) + + with server as (host, port): + url = f"https://{host}:{port}" + r1 = s.get(url, verify=False) + assert r1.status_code == 200 + + # Cannot verify self-signed certificate + with pytest.raises(requests.exceptions.SSLError): + s.get(url) + + close_server.set() + assert 2 == len(s.adapters["https://"].poolmanager.pools) + + def test_different_connection_pool_for_tls_settings_verify_bundle_expired_cert( + self, + ): + def response_handler(sock): + consume_socket_content(sock, timeout=0.5) + sock.send( + b"HTTP/1.1 200 OK\r\n" + b"Content-Length: 18\r\n\r\n" + b'\xff\xfe{\x00"\x00K0"\x00=\x00"\x00\xab0"\x00\r\n' + ) + + s = requests.Session() + close_server = threading.Event() + server = TLSServer( + handler=response_handler, + wait_to_close_event=close_server, + requests_to_handle=3, + cert_chain="tests/certs/expired/server/server.pem", + keyfile="tests/certs/expired/server/server.key", + ) + + with server as (host, port): + url = f"https://{host}:{port}" + r1 = s.get(url, verify=False) + assert r1.status_code == 200 + + # Has right trust bundle, but certificate expired + with pytest.raises(requests.exceptions.SSLError): + s.get(url, verify="tests/certs/expired/ca/ca.crt") + + close_server.set() + assert 2 == len(s.adapters["https://"].poolmanager.pools) + + def test_different_connection_pool_for_tls_settings_verify_bundle_unexpired_cert( + self, + ): + def response_handler(sock): + consume_socket_content(sock, timeout=0.5) + sock.send( + b"HTTP/1.1 200 OK\r\n" + b"Content-Length: 18\r\n\r\n" + b'\xff\xfe{\x00"\x00K0"\x00=\x00"\x00\xab0"\x00\r\n' + ) + + s = requests.Session() + close_server = threading.Event() + server = TLSServer( + handler=response_handler, + wait_to_close_event=close_server, + requests_to_handle=3, + cert_chain="tests/certs/valid/server/server.pem", + keyfile="tests/certs/valid/server/server.key", + ) + + with server as (host, port): + url = f"https://{host}:{port}" + r1 = s.get(url, verify=False) + assert r1.status_code == 200 + + r2 = s.get(url, verify="tests/certs/valid/ca/ca.crt") + assert r2.status_code == 200 + + close_server.set() + assert 2 == len(s.adapters["https://"].poolmanager.pools) + + def test_different_connection_pool_for_mtls_settings(self): + client_cert = None + + def response_handler(sock): + nonlocal client_cert + client_cert = sock.getpeercert() + consume_socket_content(sock, timeout=0.5) + sock.send( + b"HTTP/1.1 200 OK\r\n" + b"Content-Length: 18\r\n\r\n" + b'\xff\xfe{\x00"\x00K0"\x00=\x00"\x00\xab0"\x00\r\n' + ) + + s = requests.Session() + close_server = threading.Event() + server = TLSServer( + handler=response_handler, + wait_to_close_event=close_server, + requests_to_handle=2, + cert_chain="tests/certs/expired/server/server.pem", + keyfile="tests/certs/expired/server/server.key", + mutual_tls=True, + cacert="tests/certs/expired/ca/ca.crt", + ) + + cert = ( + "tests/certs/mtls/client/client.pem", + "tests/certs/mtls/client/client.key", + ) + with server as (host, port): + url = f"https://{host}:{port}" + r1 = s.get(url, verify=False, cert=cert) + assert r1.status_code == 200 + with pytest.raises(requests.exceptions.SSLError): + s.get(url, cert=cert) + close_server.set() + + assert client_cert is not None + + +def test_content_length_for_bytes_data(httpbin): + data = "This is a string containing multi-byte UTF-8 ☃️" + encoded_data = data.encode("utf-8") + length = str(len(encoded_data)) + req = requests.Request("POST", httpbin("post"), data=encoded_data) + p = req.prepare() + + assert p.headers["Content-Length"] == length + + +@pytest.mark.skipif( + is_urllib3_1, + reason="urllib3 2.x encodes all strings to utf-8, urllib3 1.x uses latin-1", +) +def test_content_length_for_string_data_counts_bytes(httpbin): + data = "This is a string containing multi-byte UTF-8 ☃️" + length = str(len(data.encode("utf-8"))) + req = requests.Request("POST", httpbin("post"), data=data) + p = req.prepare() + + assert p.headers["Content-Length"] == length + + +def test_json_decode_errors_are_serializable_deserializable(): + json_decode_error = requests.exceptions.JSONDecodeError( + "Extra data", + '{"responseCode":["706"],"data":null}{"responseCode":["706"],"data":null}', + 36, + ) + deserialized_error = pickle.loads(pickle.dumps(json_decode_error)) + assert repr(json_decode_error) == repr(deserialized_error)