Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_multi(self):
stream = StringIO()
gramex.debug.print(a=True, b=1, lst=[1, 2], string='abc', stream=stream) # noqa
line = line_no() - 1
val = stream.getvalue().replace('.pyc', '.py')
ok_(val.startswith('\n'))
ok_(val.endswith('\n'))
lines = {line for line in stream.getvalue().splitlines()}
self.assertIn('{}({:d}).test_multi:'.format(testfile, line), lines)
self.assertIn(" .. a = True", lines)
self.assertIn(" .. b = 1", lines)
self.assertIn(" .. lst = [1, 2]", lines)
if six.PY2:
self.assertIn(" .. string = u'abc'", lines)
else:
self.assertIn(" .. string = 'abc'", lines)
def test_parse_error(self):
self.assertRaisesMessage(
TypeError, "data must be dict, not %s." % ('unicode' if six.PY2 else 'str'),
Item.parse, {'a': ['b', {}]}, Meta(), Loader('.', '.')
)
def move_to(self, file_obj=None, msg=None):
"""
Finds the first line with msg and moves read file pointer to that line.
:param file_obj: file object from which we read
:param msg: keyword to find
"""
if not six.PY2:
msg = bytes(msg, "utf8")
while not self.file_end(file_obj=file_obj):
pos = file_obj.tell()
line = file_obj.readline()
if line.strip() and msg in line:
file_obj.seek(pos)
break
def _read_data(self, spider, request):
key = request_util.fingerprint(request)
data = yield self._coll.find_one({'fingerprint': key})
if not data:
if six.PY2:
returnValue()
else:
return
if 0 < self.expiration_secs < (datetime.datetime.now() - data['update_time']).seconds:
yield self._coll.delete_one({'fingerprint': key})
if six.PY2:
returnValue()
else:
return
if six.PY2:
returnValue(data)
else:
return data
self._logger.debug('Keystone Reply: Status: %d, Output: %s',
resp.status_code, resp.content)
try:
identity_info = resp.json()
token_id = str(identity_info['access']['token']['id'])
tenant = identity_info['access']['token']['tenant']
except (ValueError, KeyError):
error = 'Error on keystone reply: %d %s'
self._logger.debug(error, resp.status_code, resp.content)
return self._deny_request('InvalidURI')(environ, start_response)
req.headers['X-Auth-Token'] = token_id
tenant_to_connect = force_tenant or tenant['id']
if six.PY2 and isinstance(tenant_to_connect, six.text_type):
tenant_to_connect = tenant_to_connect.encode('utf-8')
self._logger.debug('Connecting with tenant: %s', tenant_to_connect)
new_tenant_name = '%s%s' % (self._reseller_prefix, tenant_to_connect)
environ['PATH_INFO'] = environ['PATH_INFO'].replace(account,
new_tenant_name)
return self._app(environ, start_response)
print("Got event, publication ID {}, publisher {}: {}".format(details.publication, details.publisher, i))
self.received += 1
if self.received > 5:
self.leave()
await self.subscribe(on_event, u'com.myapp.topic1',
options=SubscribeOptions(details_arg='details'))
def onDisconnect(self):
asyncio.get_event_loop().stop()
if __name__ == '__main__':
import six
url = environ.get("AUTOBAHN_DEMO_ROUTER", u"ws://127.0.0.1:8080/ws")
if six.PY2 and type(url) == six.binary_type:
url = url.decode('utf8')
realm = u"crossbardemo"
runner = ApplicationRunner(url, realm)
runner.run(Component)
- require the Py3 subprocess backport `subprocess32` on Python2,
- in-toto namespace subprocess constants (DEVNULL, PIPE) and
- provide a custom `subprocess.run` wrapper
- provide a special `run_duplicate_streams` function
"""
import os
import sys
import io
import tempfile
import logging
import time
import shlex
import six
if six.PY2:
import subprocess32 as subprocess # pragma: no cover pylint: disable=import-error
else: # pragma: no cover
import subprocess
import in_toto.formats as formats
# Constants.
from in_toto.settings import SUBPROCESS_TIMEOUT
DEVNULL = subprocess.DEVNULL
PIPE = subprocess.PIPE
# Inherits from in_toto base logger (c.f. in_toto.log)
LOG = logging.getLogger(__name__)
if isinstance(infile, (list, tuple)):
infile[0] = newline
else:
infile = newline
# UTF-8
if isinstance(infile, six.text_type):
return infile.splitlines(True)
elif isinstance(infile, six.binary_type):
return infile.decode('utf-8').splitlines(True)
else:
return self._decode(infile, 'utf-8')
# UTF16 - have to decode
return self._decode(infile, encoding)
if six.PY2 and isinstance(line, str):
# don't actually do any decoding, since we're on python 2 and
# returning a bytestring is fine
return self._decode(infile, None)
# No BOM discovered and no encoding specified, default to UTF-8
if isinstance(infile, six.binary_type):
return infile.decode('utf-8').splitlines(True)
else:
return self._decode(infile, 'utf-8')
def _get_true_argv():
"""Retrieves all real arguments of the python interpreter
...even those not listed in ``sys.argv``
:seealso: http://stackoverflow.com/a/28338254/595220
:seealso: http://stackoverflow.com/a/6683222/595220
:seealso: http://stackoverflow.com/a/28414807/595220
"""
try:
char_p = ctypes.c_char_p if six.PY2 else ctypes.c_wchar_p
argv = ctypes.POINTER(char_p)()
argc = ctypes.c_int()
ctypes.pythonapi.Py_GetArgcArgv(ctypes.byref(argc), ctypes.byref(argv))
except AttributeError:
"""It looks Py_GetArgcArgv is completely absent in MS Windows
:seealso: https://github.com/cherrypy/cherrypy/issues/1506
:ref: https://chromium.googlesource.com/infra/infra/+/69eb0279c12bcede5937ce9298020dd4581e38dd%5E!/
"""
raise NotImplementedError
else:
return argv[:argc.value]
def set_text(self, value, base64encode=False):
def _wrong_type_value(xsd, value):
msg = _str('Type and value do not match: {xsd}:{type}:{value}')
msg = msg.format(xsd=xsd, type=type(value), value=value)
raise ValueError(msg)
# only work with six.string_types
_str = unicode if six.PY2 else str
if isinstance(value, six.binary_type):
value = value.decode('utf-8')
type_to_xsd = {
_str: 'string',
int: 'integer',
float: 'float',
bool: 'boolean',
type(None): '',
}
# entries of xsd-types each declaring:
# - a corresponding python type
# - a function to turn a string into that type
# - a function to turn that type into a text-value
xsd_types_props = {