Checkpoint commit (pylint support)
[vern.git] / vern / digestauth.py
diff --git a/vern/digestauth.py b/vern/digestauth.py
new file mode 100644 (file)
index 0000000..1717579
--- /dev/null
@@ -0,0 +1,74 @@
+"""
+mitmproxy addon
+"""
+import getpass
+import re
+import typing
+import requests
+from requests.auth import HTTPDigestAuth
+from requests.utils import parse_dict_header
+from mitmproxy import ctx
+from mitmproxy.net.http import headers as nheaders
+
+
+class DigestAuthenticator:
+
+    def __init__(self):
+        self._username = None
+        self._password = None
+
+    def load(self, loader): # pylint: disable=no-self-use
+        loader.add_option(
+            name="uname",
+            typespec=typing.Text,
+            default='',
+            help="Please enter your user name"
+        )
+
+    def configure(self, updates):  # pylint: disable=unused-argument
+        if ctx.options.uname == '':
+            ctx.log.error("Please enter a non-empty value for uname with --set uname=your_username")
+            ctx.master.shutdown()
+        else:
+            self._username = ctx.options.uname
+            self._password = getpass.getpass("Please enter the password for " + self._username + ": ")
+
+    def response(self, flow):
+
+        if flow.response.status_code == 401 and 'Authorization' not in flow.request.headers:
+
+            www_authenticate_header = flow.response.headers['WWW-Authenticate']
+
+            if 'digest' in www_authenticate_header.lower():
+
+                http_digest_auth = HTTPDigestAuth(self._username, self._password)
+                http_digest_auth.init_per_thread_state()
+                pat = re.compile(r'digest ', flags=re.IGNORECASE)
+                http_digest_auth._thread_local.chal = parse_dict_header(pat.sub('', www_authenticate_header, count=1))  # pylint: disable=protected-access
+                authorization_header = http_digest_auth.build_digest_header(flow.request.method, flow.request.url)
+
+                resubmitted_headers = dict(flow.request.headers)
+                resubmitted_headers.update({'Authorization': str(authorization_header)})
+                if flow.request.method == 'GET':
+                    resubmitted_response = requests.get(flow.request.url, cookies=flow.request.cookies, headers=resubmitted_headers, allow_redirects=False)
+                else:
+                   # TODO:  This probably needs a data=payload arg...
+                    resubmitted_response = requests.post(flow.request.url, headers=resubmitted_headers)
+
+                if resubmitted_response.status_code == 301:
+                    ctx.log.info("301 response headers: " + str(resubmitted_response.headers))
+
+                flow.response.status_code = resubmitted_response.status_code
+                flow.response.reason = resubmitted_response.reason
+                flow.response.content = resubmitted_response.text.encode('ascii')
+
+                flow.response.headers = nheaders.Headers(**dict(resubmitted_response.headers))
+                flow.response.cookies = dict(resubmitted_response.cookies)
+
+        else:
+            # TODO: switch info to debug eventually
+            ctx.log.info("DigestAuthenticator other request: " + str(flow.response.status_code) + "\n\n", "info")
+
+addons = [  # pylint: disable=C0103
+    DigestAuthenticator()
+]