X-Git-Url: https://pwan.org/git/?p=vern.git;a=blobdiff_plain;f=digestauth.py;fp=digestauth.py;h=0000000000000000000000000000000000000000;hp=9c86fb03ad66d128ee954199911147a4dbf93d63;hb=4bb49e3d5b865935e7b16d1d69e59b02c068b95a;hpb=bb462271117f60af0bfbcdebd2809d77f15ba6a1 diff --git a/digestauth.py b/digestauth.py deleted file mode 100644 index 9c86fb0..0000000 --- a/digestauth.py +++ /dev/null @@ -1,79 +0,0 @@ - -from mitmproxy import ctx, exceptions -from requests.auth import HTTPDigestAuth -from requests.utils import parse_dict_header - -import re -import threading -import requests -import typing -import getpass - - -class DigestAuthenticator: - - def __init__(self): - self.lock = threading.Lock() - self.resubmitted_response = None - self._username = None - self._password = None - - def load(self, loader): - ctx.log.info('in load') - loader.add_option( - name = "uname", - typespec = typing.Text, - default = '', - help = "Please enter your user name" - ) - - def configure(self, updates): - ctx.log.info('updates: ' + str(updates)) - ctx.log.info('in configue: ' + str(ctx.options)) - if ctx.options.uname == '': - raise exceptions.OptionsError("Please enter a non-empty value for uname with --set uname=your_username") - else: - self._username = ctx.options.uname - self._password = getpass.getpass("Please enter the password for " + self._username + ": ") - - - def response(self, flow): - - if flow.response.status_code == 401 and 'Authorization' not in flow.request.headers: - - www_authenticate_header = flow.response.headers['WWW-Authenticate'] - - if 'digest' in www_authenticate_header.lower(): - - http_digest_auth = HTTPDigestAuth(self._username, self._password) - http_digest_auth.init_per_thread_state() - pat = re.compile(r'digest ', flags=re.IGNORECASE) - http_digest_auth._thread_local.chal = parse_dict_header(pat.sub('', www_authenticate_header, count=1)) - authorization_header = http_digest_auth.build_digest_header(flow.request.method, flow.request.url) - - resubmitted_headers = {} - resubmitted_headers.update(flow.request.headers) - resubmitted_headers.update({'Authorization': str(authorization_header)}) - if flow.request.method == 'GET': - resubmitted_response = requests.get(flow.request.url, headers=resubmitted_headers, allow_redirects=False) - else: - # TODO: This probably needs a data=payload arg... - resubmitted_response = requests.post(flow.request.url, headers=resubmitted_headers) - - if resubmitted_response.status_code == 301: - ctx.log.info("301 response headers: " + str(resubmitted_response.headers)) - - flow.response.status_code = resubmitted_response.status_code - flow.response.reason = resubmitted_response.reason - flow.response.content = resubmitted_response.text.encode('ascii') - - # TODO: additional response fields (headers and cookies) - - else: - # TODO: switch info to debug eventually - ctx.log("DigestAuthenticator other request: " + str(flow.response.status_code) + "\n\n", "info") - - -addons = [ - DigestAuthenticator() -]