Checkpoint commit (pylint support)
[vern.git] / vern / digestauth.py
1 """
2 mitmproxy addon
3 """
4 import getpass
5 import re
6 import typing
7 import requests
8 from requests.auth import HTTPDigestAuth
9 from requests.utils import parse_dict_header
10 from mitmproxy import ctx
11 from mitmproxy.net.http import headers as nheaders
12
13
class DigestAuthenticator:
    """mitmproxy addon that answers HTTP Digest challenges for the client.

    The user name comes from the ``uname`` option and the password is read
    interactively.  When an upstream server replies ``401`` with a Digest
    ``WWW-Authenticate`` challenge to a request that carried no
    ``Authorization`` header, the request is replayed with a computed
    ``Authorization`` header and the replayed reply is substituted into the
    flow.
    """

    # Strips the auth-scheme token from a challenge so that only the
    # key=value parameters are handed to ``parse_dict_header``.
    _DIGEST_PREFIX = re.compile(r'digest ', flags=re.IGNORECASE)

    def __init__(self):
        self._username = None
        self._password = None

    def load(self, loader):  # pylint: disable=no-self-use
        """Register the ``uname`` option with the option loader."""
        loader.add_option(
            name="uname",
            typespec=typing.Text,
            default='',
            help="Please enter your user name",
        )

    def configure(self, updates):
        """Read the user name and prompt for the password.

        ``updates`` holds the keys of the options that changed.  Nothing is
        done unless ``uname`` is among them, so changing an unrelated option
        does not trigger another password prompt.  An empty ``uname`` is
        reported as an error and shuts the proxy down.
        """
        if "uname" not in updates:
            return

        if ctx.options.uname == '':
            ctx.log.error("Please enter a non-empty value for uname with --set uname=your_username")
            ctx.master.shutdown()
            return

        self._username = ctx.options.uname
        self._password = getpass.getpass("Please enter the password for " + self._username + ": ")

    def response(self, flow):
        """Replay a Digest-challenged request with credentials attached.

        Responses that are not an unauthenticated ``401`` are only logged.
        A ``401`` without a usable Digest challenge is passed through
        unchanged.
        """
        if flow.response.status_code != 401 or 'Authorization' in flow.request.headers:
            # TODO: switch info to debug eventually
            ctx.log.info("DigestAuthenticator other request: " + str(flow.response.status_code) + "\n\n")
            return

        # A 401 is not guaranteed to carry a challenge; treat a missing
        # header the same as a non-Digest one instead of raising KeyError.
        challenge = flow.response.headers.get('WWW-Authenticate', '')
        if 'digest' not in challenge.lower():
            return

        authorization_header = self._build_authorization(
            flow.request.method, flow.request.url, challenge
        )
        if authorization_header is None:
            # No header could be computed for this challenge; leave the
            # original 401 for the client rather than sending a bogus
            # "Authorization: None".
            return

        resubmitted_headers = dict(flow.request.headers)
        resubmitted_headers['Authorization'] = authorization_header

        # Replay with the original method and body: the digest covers the
        # method, so replaying e.g. a PUT as a POST could never authenticate.
        # The raw body is sent so it still matches the copied headers.
        resubmitted_response = requests.request(
            flow.request.method,
            flow.request.url,
            headers=resubmitted_headers,
            cookies=flow.request.cookies,
            data=flow.request.raw_content,
            allow_redirects=False,  # redirects are handed back to the client
        )

        if resubmitted_response.status_code == 301:
            ctx.log.info("301 response headers: " + str(resubmitted_response.headers))

        flow.response.status_code = resubmitted_response.status_code
        flow.response.reason = resubmitted_response.reason

        # Install the headers before the body and pass the body as bytes:
        # ``.text.encode('ascii')`` fails on any non-ASCII payload.
        # NOTE(review): this assumes the content setter re-encodes according
        # to the Content-Encoding header now present - confirm for the
        # mitmproxy version in use.
        flow.response.headers = nheaders.Headers(**dict(resubmitted_response.headers))
        flow.response.content = resubmitted_response.content

        # NOTE(review): Set-Cookie headers were already copied above; verify
        # that the cookies setter accepts a plain name -> value dict.
        flow.response.cookies = dict(resubmitted_response.cookies)

    def _build_authorization(self, method, url, challenge):
        """Return the Digest ``Authorization`` value, or ``None`` if none is produced."""
        digest_auth = HTTPDigestAuth(self._username, self._password)
        digest_auth.init_per_thread_state()
        parameters = self._DIGEST_PREFIX.sub('', challenge, count=1)
        digest_auth._thread_local.chal = parse_dict_header(parameters)  # pylint: disable=protected-access
        header = digest_auth.build_digest_header(method, url)
        return None if header is None else str(header)

# Addon instances exported by this script under the module-level name
# ``addons``; the lowercase name is why the naming check is silenced.
addons = [DigestAuthenticator()]  # pylint: disable=C0103