-
The Fronius inverter for photovoltaic systems uses digest authentication for its local REST API. In its initial response it sends the challenge in an `X-WWW-Authenticate` header instead of the standard `WWW-Authenticate` header. Here is an initial curl command with the response header:
httpx 0.27.2 fails to extract the Digest challenge from this header (see httpx/_auth.py, lines 201 to 212 in 87713d2). My script:
import httpx
# Reproduction script: query the inverter twice through one client that uses
# httpx's built-in digest authentication.
url = "http://192.168.1.141/config/emrs"  # adjust IP to that one of your inverter
user = "technician"
pw = "<INSERT PW>"  # placeholder: replace with the technician password
auth = httpx.DigestAuth(user, pw)
with httpx.Client(auth=auth) as client:
    # First request: sent without credentials, so the server can challenge it.
    r = client.get(url)
    print(r.status_code)
    print(r.json()['priorities'])
    # Second request on the same client and auth object.
    r = client.get(url)
    print(r.status_code)
print(r.json()['priorities']) runs only with following quick diff ❯ git diff
diff --git a/httpx/_auth.py b/httpx/_auth.py
index b03971a..e4f1600 100644
--- a/httpx/_auth.py
+++ b/httpx/_auth.py
@@ -198,17 +198,27 @@ class DigestAuth(Auth):
response = yield request
- if response.status_code != 401 or "www-authenticate" not in response.headers:
+ auth_header_key = False
+ for key in [ "www-authenticate", "x-www-authenticate" ]:
+ if key in response.headers:
+ if auth_header_key == False:
+ auth_header_key = key
+ else:
+ message = "Malformed Digest WWW-Authenticate response header"
+ raise ProtocolError(message, request=request)
+
+ if response.status_code != 401 or auth_header_key == False:
# If the response is not a 401 then we don't
# need to build an authenticated request.
return
- for auth_header in response.headers.get_list("www-authenticate"):
+ for auth_header in response.headers.get_list(auth_header_key):
if auth_header.lower().startswith("digest "):
break
else:
# If the response does not include a 'WWW-Authenticate: Digest ...'
- # header, then we don't need to build an authenticated request.
+ # or 'X-WWW-Authenticate: Digest ...' header, then we don't need to
+ # build an authenticated request.
return
self._last_challenge = self._parse_challenge(request, response, auth_header) Note, this diff is only to show that it works. I did not follow any coding guidelines of this project. For reference, here a couple of projects which have written their custom digest authorization to handle the Fronius response header:
Software versions of the inverter:
I am not sure how common it is to use `X-WWW-Authenticate`. However, I still think supporting this deviation might be acceptable. If Fronius screwed this up more heavily than I think, I understand that httpx cannot support all quirks of all digest implementations. |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 4 replies
-
I doubt it. Guidelines for these types of case...
|
Beta Was this translation helpful? Give feedback.
-
thanks for your quick response.
IMHO it is not fun. But I can imagine how you mean it. Apparently the prefix
Other occurrences:
which also handles this. It might be the case that old AngularJS apps might have introduced it and the Fronius backend was developed in AngularJS.
Since the probably well used node.js module urllib supports it, is there any chance that this might be more convincing than the Fronius inverter on its own? |
Beta Was this translation helpful? Give feedback.
-
Here is a custom MWE: $ pipenv install httpx
$ touch auth.py test.py
auth.py
# modified copy of DigestAuth from https://github.com/encode/httpx/blob/master/httpx/_auth.py
from __future__ import annotations
import hashlib
import os
import re
import time
import typing
from base64 import b64encode
from urllib.request import parse_http_list
from httpx._exceptions import ProtocolError
from httpx._models import Cookies, Request, Response
from httpx._utils import to_bytes, to_str, unquote
if typing.TYPE_CHECKING: # pragma: no cover
from hashlib import _Hash
from httpx import Auth
class DigestAuthX(Auth):
    """
    HTTP Digest authentication for servers that send their challenge in an
    `X-WWW-Authenticate` response header.

    The first request is sent unchanged. If the response is a `401` carrying
    an `X-WWW-Authenticate: Digest ...` header, the challenge is parsed, the
    request is re-sent with an `Authorization` header, and the challenge is
    kept so that later requests can be authenticated up front.
    """

    # Hash constructor for each supported algorithm name. The "-SESS" variants
    # use the same hash as their base algorithm.
    _ALGORITHM_TO_HASH_FUNCTION: dict[str, typing.Callable[[bytes], _Hash]] = {
        "MD5": hashlib.md5,
        "MD5-SESS": hashlib.md5,
        "SHA": hashlib.sha1,
        "SHA-SESS": hashlib.sha1,
        "SHA-256": hashlib.sha256,
        "SHA-256-SESS": hashlib.sha256,
        "SHA-512": hashlib.sha512,
        "SHA-512-SESS": hashlib.sha512,
    }

    # Lower-cased name of the response header that carries the challenge.
    _CHALLENGE_HEADER = "x-www-authenticate"

    def __init__(self, username: str | bytes, password: str | bytes) -> None:
        """Store the credentials and start without a remembered challenge."""
        self._username = to_bytes(username)
        self._password = to_bytes(password)
        self._last_challenge: _DigestAuthChallenge | None = None
        self._nonce_count = 1

    def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
        """
        Yield `request`, and yield it once more with credentials if challenged.

        When a challenge from an earlier exchange is remembered, the first
        attempt already carries an `Authorization` header built from it.
        """
        if self._last_challenge:
            request.headers["Authorization"] = self._build_auth_header(
                request, self._last_challenge
            )

        response = yield request

        if (
            response.status_code != 401
            or self._CHALLENGE_HEADER not in response.headers
        ):
            # If the response is not a 401 then we don't
            # need to build an authenticated request.
            return

        for auth_header in response.headers.get_list(self._CHALLENGE_HEADER):
            if auth_header.lower().startswith("digest "):
                break
        else:
            # If the response does not include a 'X-WWW-Authenticate: Digest ...'
            # header, then we don't need to build an authenticated request.
            return

        # A fresh challenge restarts the nonce counter.
        self._last_challenge = self._parse_challenge(request, response, auth_header)
        self._nonce_count = 1

        request.headers["Authorization"] = self._build_auth_header(
            request, self._last_challenge
        )
        if response.cookies:
            # Carry cookies set by the 401 response over to the retried request.
            Cookies(response.cookies).set_cookie_header(request=request)
        yield request

    def _parse_challenge(
        self, request: Request, response: Response, auth_header: str
    ) -> _DigestAuthChallenge:
        """
        Returns a challenge from a Digest X-WWW-Authenticate header.
        These take the form of:
        `Digest realm="realm@host.com",qop="auth,auth-int",nonce="abc",opaque="xyz"`

        Raises `ProtocolError` if a field is not of the form `key=value` or if
        the mandatory `realm` or `nonce` field is missing.
        """
        scheme, _, fields = auth_header.partition(" ")

        # This method should only ever have been called with a Digest auth header.
        assert scheme.lower() == "digest"

        message = "Malformed Digest WWW-Authenticate header"

        header_dict: dict[str, str] = {}
        for field in parse_http_list(fields):
            key, separator, value = field.strip().partition("=")
            if not separator:
                # A bare token without "=" cannot be interpreted as a
                # parameter; report it like any other malformed challenge.
                raise ProtocolError(message, request=request)
            header_dict[key] = unquote(value)

        try:
            realm = header_dict["realm"].encode()
            nonce = header_dict["nonce"].encode()
            algorithm = header_dict.get("algorithm", "MD5")
            opaque = header_dict["opaque"].encode() if "opaque" in header_dict else None
            qop = header_dict["qop"].encode() if "qop" in header_dict else None
            return _DigestAuthChallenge(
                realm=realm, nonce=nonce, algorithm=algorithm, opaque=opaque, qop=qop
            )
        except KeyError as exc:
            raise ProtocolError(message, request=request) from exc

    def _build_auth_header(
        self, request: Request, challenge: _DigestAuthChallenge
    ) -> str:
        """
        Build the `Authorization` header value that answers `challenge`.

        Each call uses the current nonce count and then increments it.

        Raises `ProtocolError` if the challenge names an algorithm missing from
        `_ALGORITHM_TO_HASH_FUNCTION` or an unexpected qop value, and
        `NotImplementedError` if the server only offers `auth-int`.
        """
        try:
            hash_func = self._ALGORITHM_TO_HASH_FUNCTION[challenge.algorithm.upper()]
        except KeyError as exc:
            # The algorithm name comes from the server; report an unknown one
            # as a protocol problem instead of leaking a bare KeyError.
            message = f'Unsupported digest algorithm "{challenge.algorithm}"'
            raise ProtocolError(message, request=request) from exc

        def digest(data: bytes) -> bytes:
            return hash_func(data).hexdigest().encode()

        A1 = b":".join((self._username, challenge.realm, self._password))

        path = request.url.raw_path
        A2 = b":".join((request.method.encode(), path))
        # TODO: implement auth-int
        HA2 = digest(A2)

        nc_value = b"%08x" % self._nonce_count
        cnonce = self._get_client_nonce(self._nonce_count, challenge.nonce)
        self._nonce_count += 1

        HA1 = digest(A1)
        if challenge.algorithm.lower().endswith("-sess"):
            HA1 = digest(b":".join((HA1, challenge.nonce, cnonce)))

        qop = self._resolve_qop(challenge.qop, request=request)
        if qop is None:
            # Following RFC 2069
            digest_data = [HA1, challenge.nonce, HA2]
        else:
            # Following RFC 2617/7616
            digest_data = [HA1, challenge.nonce, nc_value, cnonce, qop, HA2]

        format_args: dict[str, bytes] = {
            "username": self._username,
            "realm": challenge.realm,
            "nonce": challenge.nonce,
            "uri": path,
            "response": digest(b":".join(digest_data)),
            "algorithm": challenge.algorithm.encode(),
        }
        if challenge.opaque:
            format_args["opaque"] = challenge.opaque
        if qop:
            format_args["qop"] = b"auth"
            format_args["nc"] = nc_value
            format_args["cnonce"] = cnonce

        return "Digest " + self._get_header_value(format_args)

    def _get_client_nonce(self, nonce_count: int, nonce: bytes) -> bytes:
        """
        Return a 16-character hex client nonce.

        It is the truncated SHA-1 of the nonce count, the server nonce, the
        current time string and eight random bytes.
        """
        s = str(nonce_count).encode()
        s += nonce
        s += time.ctime().encode()
        s += os.urandom(8)

        return hashlib.sha1(s).hexdigest()[:16].encode()

    def _get_header_value(self, header_fields: dict[str, bytes]) -> str:
        """
        Serialise `header_fields` as comma-separated `key=value` pairs.

        Values are wrapped in double quotes, except for the `algorithm`,
        `qop` and `nc` fields, which are emitted bare.
        """
        NON_QUOTED_FIELDS = ("algorithm", "qop", "nc")
        QUOTED_TEMPLATE = '{}="{}"'
        NON_QUOTED_TEMPLATE = "{}={}"

        return ", ".join(
            (
                NON_QUOTED_TEMPLATE if field in NON_QUOTED_FIELDS else QUOTED_TEMPLATE
            ).format(field, to_str(value))
            for field, value in header_fields.items()
        )

    def _resolve_qop(self, qop: bytes | None, request: Request) -> bytes | None:
        """
        Choose the quality-of-protection value to use for the response.

        Returns `None` when the challenge carried no qop and `b"auth"` when
        the server offers it. Raises `NotImplementedError` if only `auth-int`
        is offered and `ProtocolError` for any other value.
        """
        if qop is None:
            return None
        qops = re.split(b", ?", qop)
        if b"auth" in qops:
            return b"auth"

        if qops == [b"auth-int"]:
            raise NotImplementedError("Digest auth-int support is not yet implemented")

        message = f'Unexpected qop value "{qop!r}" in digest auth'
        raise ProtocolError(message, request=request)
class _DigestAuthChallenge(typing.NamedTuple):
    """Parameters of one Digest challenge, as parsed from the response header."""

    # Mandatory "realm" parameter of the challenge.
    realm: bytes
    # Mandatory server "nonce" parameter of the challenge.
    nonce: bytes
    # Algorithm name exactly as sent by the server (the parser defaults to "MD5").
    algorithm: str
    # "opaque" parameter, or None when the challenge did not include one.
    opaque: bytes | None
    # Raw "qop" parameter, or None when the challenge did not include one.
    qop: bytes | None


# file: test.py
import httpx
import pprint
from auth import DigestAuthX
# Fetch the inverter's configuration once, authenticating with DigestAuthX.
url = "http://192.168.1.141/config/emrs"
user = "technician"
pw = "your pw"
auth = DigestAuthX(user, pw)
with httpx.Client(auth=auth) as client:
    r = client.get(url)
# depth=1 makes the printer abbreviate nested containers as {...}.
pp = pprint.PrettyPrinter(depth=1)
pp.pprint(r.json()['priorities']) ❯ pipenv run python test.py
{'_batteries_meta': {...},
'_ios_meta': {...},
'_ohmpilots_meta': {...},
'_supply_ohmpilots_from_battery_meta': {...},
'batteries': 2,
'ios': 3,
'ohmpilots': 1,
'supply_ohmpilots_from_battery': False} |
Beta Was this translation helpful? Give feedback.
Here is a custom
DigestAuthX
for httpx. It is a copy and paste of the original DigestAuth
contained in httpx, modified only in its handling of the authenticate response header by adding the prefix X-
to the authentication header key. MWE:
auth.py