X-Git-Url: https://pwan.org/git/?p=vern.git;a=blobdiff_plain;f=vern%2Fdigestauth.py;fp=vern%2Fdigestauth.py;h=1717579a95910f0a310ed8b064e57e51d480bf82;hp=0000000000000000000000000000000000000000;hb=4bb49e3d5b865935e7b16d1d69e59b02c068b95a;hpb=bb462271117f60af0bfbcdebd2809d77f15ba6a1 diff --git a/vern/digestauth.py b/vern/digestauth.py new file mode 100644 index 0000000..1717579 --- /dev/null +++ b/vern/digestauth.py @@ -0,0 +1,74 @@ +""" +mitmproxy addon +""" +import getpass +import re +import typing +import requests +from requests.auth import HTTPDigestAuth +from requests.utils import parse_dict_header +from mitmproxy import ctx +from mitmproxy.net.http import headers as nheaders + + +class DigestAuthenticator: + + def __init__(self): + self._username = None + self._password = None + + def load(self, loader): # pylint: disable=no-self-use + loader.add_option( + name="uname", + typespec=typing.Text, + default='', + help="Please enter your user name" + ) + + def configure(self, updates): # pylint: disable=unused-argument + if ctx.options.uname == '': + ctx.log.error("Please enter a non-empty value for uname with --set uname=your_username") + ctx.master.shutdown() + else: + self._username = ctx.options.uname + self._password = getpass.getpass("Please enter the password for " + self._username + ": ") + + def response(self, flow): + + if flow.response.status_code == 401 and 'Authorization' not in flow.request.headers: + + www_authenticate_header = flow.response.headers['WWW-Authenticate'] + + if 'digest' in www_authenticate_header.lower(): + + http_digest_auth = HTTPDigestAuth(self._username, self._password) + http_digest_auth.init_per_thread_state() + pat = re.compile(r'digest ', flags=re.IGNORECASE) + http_digest_auth._thread_local.chal = parse_dict_header(pat.sub('', www_authenticate_header, count=1)) # pylint: disable=protected-access + authorization_header = http_digest_auth.build_digest_header(flow.request.method, flow.request.url) + + resubmitted_headers = dict(flow.request.headers) + resubmitted_headers.update({'Authorization': str(authorization_header)}) + if flow.request.method == 'GET': + resubmitted_response = requests.get(flow.request.url, cookies=flow.request.cookies, headers=resubmitted_headers, allow_redirects=False) + else: + # TODO: This probably needs a data=payload arg... + resubmitted_response = requests.post(flow.request.url, headers=resubmitted_headers) + + if resubmitted_response.status_code == 301: + ctx.log.info("301 response headers: " + str(resubmitted_response.headers)) + + flow.response.status_code = resubmitted_response.status_code + flow.response.reason = resubmitted_response.reason + flow.response.content = resubmitted_response.text.encode('ascii') + + flow.response.headers = nheaders.Headers(**dict(resubmitted_response.headers)) + flow.response.cookies = dict(resubmitted_response.cookies) + + else: + # TODO: switch info to debug eventually + ctx.log.info("DigestAuthenticator other request: " + str(flow.response.status_code) + "\n\n", "info") + +addons = [ # pylint: disable=C0103 + DigestAuthenticator() +]