Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
def request(self, method, request_uri, headers, content):
"""Modify the request headers to add the appropriate
Authorization header."""
headers['authorization'] = 'WSSE profile="UsernameToken"'
iso_now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
cnonce = _cnonce()
password_digest = _wsse_username_token(cnonce, iso_now, self.credentials[1])
headers['X-WSSE'] = 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (
self.credentials[0],
password_digest,
cnonce,
iso_now)
class GoogleLoginAuthentication(Authentication):
def __init__(self, credentials, host, request_uri, headers, response, content, http):
from urllib import urlencode
Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
challenge = _parse_www_authenticate(response, 'www-authenticate')
service = challenge['googlelogin'].get('service', 'xapi')
# Bloggger actually returns the service in the challenge
# For the rest we guess based on the URI
if service == 'xapi' and request_uri.find("calendar") > 0:
service = "cl"
# No point in guessing Base or Spreadsheet
#elif request_uri.find("spreadsheets") > 0:
# service = "wise"
auth = dict(Email=credentials[0], Passwd=credentials[1], service=service, source=headers['user-agent'])
resp, content = self.http.request("https://www.google.com/accounts/ClientLogin", method="POST", body=urlencode(auth), headers={'Content-Type': 'application/x-www-form-urlencoded'})
lines = content.split('\n')
"""
return False
class BasicAuthentication(Authentication):
def __init__(self, credentials, host, request_uri, headers, response, content, http):
Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
def request(self, method, request_uri, headers, content):
"""Modify the request headers to add the appropriate
Authorization header."""
headers['authorization'] = 'Basic ' + base64.b64encode("%s:%s" % self.credentials).strip()
class DigestAuthentication(Authentication):
"""Only do qop='auth' and MD5, since that
is all Apache currently implements"""
def __init__(self, credentials, host, request_uri, headers, response, content, http):
Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
challenge = _parse_www_authenticate(response, 'www-authenticate')
self.challenge = challenge['digest']
qop = self.challenge.get('qop', 'auth')
self.challenge['qop'] = ('auth' in [x.strip() for x in qop.split()]) and 'auth' or None
if self.challenge['qop'] is None:
raise UnimplementedDigestAuthOptionError( _("Unsupported value for qop: %s." % qop))
self.challenge['algorithm'] = self.challenge.get('algorithm', 'MD5').upper()
if self.challenge['algorithm'] != 'MD5':
raise UnimplementedDigestAuthOptionError( _("Unsupported value for algorithm: %s." % self.challenge['algorithm']))
self.A1 = "".join([self.credentials[0], ":", self.challenge['realm'], ":", self.credentials[1]])
self.challenge['nc'] = 1
if 'authentication-info' not in response:
challenge = _parse_www_authenticate(response, 'www-authenticate').get('digest', {})
if 'true' == challenge.get('stale'):
self.challenge['nonce'] = challenge['nonce']
self.challenge['nc'] = 1
return True
else:
updated_challenge = _parse_www_authenticate(response, 'authentication-info').get('digest', {})
if 'nextnonce' in updated_challenge:
self.challenge['nonce'] = updated_challenge['nextnonce']
self.challenge['nc'] = 1
return False
class HmacDigestAuthentication(Authentication):
"""Adapted from Robert Sayre's code and DigestAuthentication above."""
__author__ = "Thomas Broyer (t.broyer@ltgt.net)"
def __init__(self, credentials, host, request_uri, headers, response, content, http):
Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
challenge = _parse_www_authenticate(response, 'www-authenticate')
self.challenge = challenge['hmacdigest']
# TODO: self.challenge['domain']
self.challenge['reason'] = self.challenge.get('reason', 'unauthorized')
if self.challenge['reason'] not in ['unauthorized', 'integrity']:
self.challenge['reason'] = 'unauthorized'
self.challenge['salt'] = self.challenge.get('salt', '')
if not self.challenge.get('snonce'):
raise UnimplementedHmacDigestAuthOptionError( _("The challenge doesn't contain a server nonce, or this one is empty."))
self.challenge['algorithm'] = self.challenge.get('algorithm', 'HMAC-SHA-1')
if self.challenge['algorithm'] not in ['HMAC-SHA-1', 'HMAC-MD5']:
def request(self, method, request_uri, headers, content):
"""Modify the request headers to add the appropriate
Authorization header."""
headers['Authorization'] = 'WSSE profile="UsernameToken"'
iso_now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
cnonce = _cnonce()
password_digest = _wsse_username_token(cnonce, iso_now, self.credentials[1])
headers['X-WSSE'] = 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (
self.credentials[0],
password_digest,
cnonce,
iso_now)
class GoogleLoginAuthentication(Authentication):
def __init__(self, credentials, host, request_uri, headers, response, content, http):
from urllib import urlencode
Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
challenge = _parse_www_authenticate(response)
service = challenge['googlelogin'].get('service', 'xapi')
# Bloggger actually returns the service in the challenge
# For the rest we guess based on the URI
if service == 'xapi' and request_uri.find("calendar") > 0:
service = "cl"
# No point in guessing Base or Spreadsheet
#elif request_uri.find("spreadsheets") > 0:
# service = "wise"
auth = dict(Email=credentials[0], Passwd=credentials[1], service=service, source=headers['user-agent'])
resp, content = self.http.request("https://www.google.com/accounts/ClientLogin", method="POST",
if not response.has_key('authentication-info'):
challenge = _parse_www_authenticate(response, 'www-authenticate').get('digest', {})
if 'true' == challenge.get('stale'):
self.challenge['nonce'] = challenge['nonce']
self.challenge['nc'] = 1
return True
else:
updated_challenge = _parse_www_authenticate(response, 'authentication-info').get('digest', {})
if updated_challenge.has_key('nextnonce'):
self.challenge['nonce'] = updated_challenge['nextnonce']
self.challenge['nc'] = 1
return False
class HmacDigestAuthentication(Authentication):
"""Adapted from Robert Sayre's code and DigestAuthentication above."""
__author__ = "Thomas Broyer (t.broyer@ltgt.net)"
def __init__(self, credentials, host, request_uri, headers, response, content, http):
Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
challenge = _parse_www_authenticate(response, 'www-authenticate')
self.challenge = challenge['hmacdigest']
# TODO: self.challenge['domain']
self.challenge['reason'] = self.challenge.get('reason', 'unauthorized')
if self.challenge['reason'] not in ['unauthorized', 'integrity']:
self.challenge['reason'] = 'unauthorized'
self.challenge['salt'] = self.challenge.get('salt', '')
if not self.challenge.get('snonce'):
raise UnimplementedHmacDigestAuthOptionError( _("The challenge doesn't contain a server nonce, or this one is empty."))
self.challenge['algorithm'] = self.challenge.get('algorithm', 'HMAC-SHA-1')
if self.challenge['algorithm'] not in ['HMAC-SHA-1', 'HMAC-MD5']:
Authorization header. Over-rise this in sub-classes."""
pass
def response(self, response, content):
"""Gives us a chance to update with new nonces
or such returned from the last authorized response.
Over-rise this in sub-classes if necessary.
Return TRUE is the request is to be retried, for
example Digest may return stale=true.
"""
return False
class BasicAuthentication(Authentication):
def __init__(self, credentials, host, request_uri, headers, response, content, http):
Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
def request(self, method, request_uri, headers, content):
"""Modify the request headers to add the appropriate
Authorization header."""
headers['authorization'] = 'Basic ' + base64.b64encode("%s:%s" % self.credentials).strip()
class DigestAuthentication(Authentication):
"""Only do qop='auth' and MD5, since that
is all Apache currently implements"""
def __init__(self, credentials, host, request_uri, headers, response, content, http):
Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
challenge = _parse_www_authenticate(response, 'www-authenticate')
self.challenge = challenge['digest']
"""Modify the request headers to add the appropriate
Authorization header. Over-rise this in sub-classes."""
pass
def response(self, response, content):
"""Gives us a chance to update with new nonces
or such returned from the last authorized response.
Over-rise this in sub-classes if necessary.
Return TRUE is the request is to be retried, for
example Digest may return stale=true.
"""
return False
class BasicAuthentication(Authentication):
def __init__(self, credentials, host, request_uri, headers, response, content, http):
Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
def request(self, method, request_uri, headers, content):
"""Modify the request headers to add the appropriate
Authorization header."""
headers['authorization'] = 'Basic ' + base64.encodestring("%s:%s" % self.credentials).strip()
class DigestAuthentication(Authentication):
"""Only do qop='auth' and MD5, since that
is all Apache currently implements"""
def __init__(self, credentials, host, request_uri, headers, response, content, http):
Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
challenge = _parse_www_authenticate(response)
if not response.has_key('authentication-info'):
challenge = _parse_www_authenticate(response, 'www-authenticate').get('digest', {})
if 'true' == challenge.get('stale'):
self.challenge['nonce'] = challenge['nonce']
self.challenge['nc'] = 1
return True
else:
updated_challenge = _parse_www_authenticate(response, 'authentication-info').get('digest', {})
if updated_challenge.has_key('nextnonce'):
self.challenge['nonce'] = updated_challenge['nextnonce']
self.challenge['nc'] = 1
return False
class HmacDigestAuthentication(Authentication):
"""Adapted from Robert Sayre's code and DigestAuthentication above."""
__author__ = "Thomas Broyer (t.broyer@ltgt.net)"
def __init__(self, credentials, host, request_uri, headers, response, content, http):
Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
challenge = _parse_www_authenticate(response, 'www-authenticate')
self.challenge = challenge['hmacdigest']
# TODO: self.challenge['domain']
self.challenge['reason'] = self.challenge.get('reason', 'unauthorized')
if self.challenge['reason'] not in ['unauthorized', 'integrity']:
self.challenge['reason'] = 'unauthorized'
self.challenge['salt'] = self.challenge.get('salt', '')
if not self.challenge.get('snonce'):
raise UnimplementedHmacDigestAuthOptionError( _("The challenge doesn't contain a server nonce, or this one is empty."))
self.challenge['algorithm'] = self.challenge.get('algorithm', 'HMAC-SHA-1')
if self.challenge['algorithm'] not in ['HMAC-SHA-1', 'HMAC-MD5']:
self.challenge['snonce'],
cnonce,
request_uri,
created,
request_digest,
keylist,
)
def response(self, response, content):
challenge = _parse_www_authenticate(response, 'www-authenticate').get('hmacdigest', {})
if challenge.get('reason') in ['integrity', 'stale']:
return True
return False
class WsseAuthentication(Authentication):
"""This is thinly tested and should not be relied upon.
At this time there isn't any third party server to test against.
Blogger and TypePad implemented this algorithm at one point
but Blogger has since switched to Basic over HTTPS and
TypePad has implemented it wrong, by never issuing a 401
challenge but instead requiring your client to telepathically know that
their endpoint is expecting WSSE profile="UsernameToken"."""
def __init__(self, credentials, host, request_uri, headers, response, content, http):
Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
def request(self, method, request_uri, headers, content):
"""Modify the request headers to add the appropriate
Authorization header."""
headers['Authorization'] = 'WSSE profile="UsernameToken"'
iso_now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
cnonce = _cnonce()