Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def encode(cls, text):
    """Append PKCS#7-style padding to ``text``.

    The number of padding bytes is chosen so that the padded length is a
    multiple of ``cls.block_size``, and every padding byte holds that
    count. Input that is already block-aligned receives one full extra
    block of padding.

    :param text: byte string to pad
    :return: ``text`` followed by the padding bytes
    """
    # ``len(text) % block_size`` is always smaller than ``block_size``, so
    # the count lies in ``1..block_size`` and needs no zero special case.
    padding_count = cls.block_size - len(text) % cls.block_size
    # Build the padding byte from its integer value directly. Going through
    # ``chr`` and a text encoding yields more than one byte for counts of
    # 128 and above on Python 3. ``bytes(bytearray(...))`` works on both
    # Python 2 and Python 3.
    padding = bytes(bytearray([padding_count]))
    return text + padding * padding_count
def __base64_decode(self, text):
    """Base64-decode ``text`` and return the result passed through ``to_text``.

    :param text: base64-encoded value; converted with ``to_binary`` before
        decoding
    :return: the decoded data as converted by ``to_text``
    """
    encoded = to_binary(text)
    decoded = base64.b64decode(encoded)
    return to_text(decoded)
def __repr__(self):
    """Return ``ClassName(<repr of self._data>)``.

    The formatted string is passed through ``to_binary`` when ``six.PY2``
    is true and through ``to_text`` otherwise.
    """
    class_name = self.__class__.__name__
    data_repr = repr(self._data)
    rendered = "{klass}({msg})".format(klass=class_name, msg=data_repr)
    # Pick the converter for the running interpreter, then apply it once.
    convert = to_binary if six.PY2 else to_text
    return convert(rendered)
def _decrypt(self, text, _id, exception=None):
    """Decrypt a base64-encoded message and return its payload as text.

    The decrypted buffer is read as: 16 leading bytes that are skipped, a
    4-byte length converted from network byte order, that many bytes of
    payload, then a trailing id. The last byte of the buffer gives the
    number of padding bytes to strip from the end.

    :param text: base64-encoded ciphertext
    :param _id: id that the trailing part of the message must equal
    :param exception: exception class raised when the ids differ;
        ``Exception`` is used when this is not given
    :return: the payload converted with ``to_text``
    :raises: ``exception`` (or ``Exception``) when the embedded id does not
        match ``_id``
    """
    text = to_binary(text)
    plain_text = self.cipher.decrypt(base64.b64decode(text))
    # Take a one-byte slice instead of indexing. On Python 3, indexing
    # bytes yields an int, and ``byte2int`` (assumed to follow the
    # ``six.byte2int`` contract of taking a byte string) cannot subscript
    # it. A slice is a byte string on both Python 2 and Python 3.
    padding = byte2int(plain_text[-1:])
    content = plain_text[16:-padding]
    xml_length = socket.ntohl(struct.unpack(b'I', content[:4])[0])
    xml_content = to_text(content[4:xml_length + 4])
    from_id = to_text(content[xml_length + 4:])
    if from_id != _id:
        # Fall back to a plain Exception when no class was supplied.
        exception = exception or Exception
        raise exception()
    return xml_content
def encode(cls, text):
    """Pad ``text`` so its length is a multiple of ``cls.block_size``.

    Each appended byte carries the number of bytes that were appended
    (PKCS#7-style). When ``text`` is already a whole number of blocks, a
    complete block of padding is added.

    :param text: byte string to pad
    :return: the padded byte string
    """
    # The remainder is strictly less than ``block_size``, so the count is
    # always between 1 and ``block_size``; a zero count cannot occur.
    padding_count = cls.block_size - len(text) % cls.block_size
    # Produce exactly one byte holding the count. Encoding ``chr(count)``
    # as text would give a multi-byte sequence for counts >= 128 on
    # Python 3. This form behaves the same on Python 2 and Python 3.
    padding = bytes(bytearray([padding_count]))
    return text + padding * padding_count
def __str__(self):
    """Return a one-line summary built from ``self.errcode`` and ``self.errmsg``.

    The formatted string is passed through ``to_binary`` when ``six.PY2``
    is true and through ``to_text`` otherwise.
    """
    summary = 'Error code: {code}, message: {msg}'.format(
        code=self.errcode, msg=self.errmsg
    )
    if not six.PY2:
        return to_text(summary)
    return to_binary(summary)
def _encrypt(self, text, _id):
    """Encrypt ``text`` together with ``_id`` and return base64 output.

    The buffer handed to the cipher is, in order: the value of
    ``self.get_random_string()``, the payload length packed as 4 bytes
    after ``socket.htonl``, the payload itself and ``_id``. The joined
    buffer is padded with ``PKCS7Encoder.encode`` before encryption.

    :param text: payload to encrypt; converted with ``to_binary``
    :param _id: id appended after the payload
    :return: base64-encoded ciphertext
    """
    payload = to_binary(text)
    parts = [
        to_binary(self.get_random_string()),
        struct.pack(b'I', socket.htonl(len(payload))),
        payload,
        to_binary(_id),
    ]
    padded = PKCS7Encoder.encode(b''.join(parts))
    encrypted = to_binary(self.cipher.encrypt(padded))
    return base64.b64encode(encrypted)
def __repr__(self):
    """Return the class name followed by ``repr(self._data)`` in parentheses.

    On Python 2 (``six.PY2``) the result goes through ``to_binary``; on
    other versions it goes through ``to_text``.
    """
    rendered = "{klass}({msg})".format(
        klass=self.__class__.__name__, msg=repr(self._data)
    )
    if not six.PY2:
        return to_text(rendered)
    return to_binary(rendered)
http_client = AsyncHTTPClient()
if not url_or_endpoint.startswith(('http://', 'https://')):
api_base_url = kwargs.pop('api_base_url', self.API_BASE_URL)
url = '{base}{endpoint}'.format(
base=api_base_url,
endpoint=url_or_endpoint
)
else:
url = url_or_endpoint
headers = {}
params = kwargs.pop('params', {})
if 'access_token' not in params:
params['access_token'] = self.access_token
params = urlencode(dict((k, to_binary(v)) for k, v in params.items()))
url = '{0}?{1}'.format(url, params)
data = kwargs.get('data', {})
files = kwargs.get('files')
if files:
from requests.models import RequestEncodingMixin
from requests.utils import super_len
body, content_type = RequestEncodingMixin._encode_files(
files,
data
)
headers['Content-Type'] = content_type
headers['Content-Length'] = super_len(body)
else:
if isinstance(data, dict):
def _decrypt(self, text, _id, exception=None):
    """Decrypt a base64-encoded message and return its payload as text.

    After decryption the buffer's last byte gives the number of padding
    bytes to drop and the first 16 bytes are skipped. What remains holds a
    4-byte length (converted with ``socket.ntohl``), that many bytes of
    payload, and a trailing id.

    :param text: base64-encoded ciphertext
    :param _id: id that the trailing part of the message must equal
    :param exception: exception class raised when the ids differ;
        ``Exception`` is used when this is not given
    :return: the payload converted with ``to_text``
    """
    decrypted = self.cipher.decrypt(base64.b64decode(to_binary(text)))
    pad_len = byte2int(decrypted, -1)
    body = decrypted[16:-pad_len]
    (packed_len,) = struct.unpack(b'I', body[:4])
    payload_end = socket.ntohl(packed_len) + 4
    xml_content = to_text(body[4:payload_end])
    from_id = to_text(body[payload_end:])
    if from_id == _id:
        return xml_content
    # Ids differ: raise the caller-supplied class, or a plain Exception.
    raise (exception or Exception)()