Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'/random/url',
tenant_id,
token)
self.assertIsNone(invalid_cached_data)
# Test: Redis Client tosses exception
def redis_toss_exception(*args, **kwargs):
raise Exception('mock redis exception')
redis_exception_result = auth._retrieve_data_from_cache(
redis_toss_exception, url, tenant_id, token)
self.assertEqual(redis_exception_result, None)
# msgpack error
with mock.patch('eom.auth.__unpacker') as MockMsgPacker:
MockMsgPacker.side_effect = msgpack.exceptions.UnpackException(
'mock')
msgpack_error = auth._retrieve_data_from_cache(redis_client,
url,
tenant_id, token)
self.assertIsNone(msgpack_error)
# Test: Happy case V2 data
happy_v2_result = auth._retrieve_data_from_cache(redis_client,
url,
tenant_id,
token)
self.assertEqual(happy_v2_result, data)
# Collect the URLs to process: "-" means one URL per line on stdin,
# anything else is taken as the single URL argument.
if args["URL"] == '-':
    urls = os.sys.stdin.read().splitlines()
else:
    urls = [args["URL"]]

# Database path: the --file option wins, otherwise $HOME/.bookmarks.
d_file = args["--file"] or os.environ["HOME"] + "/.bookmarks"
try:
    if not os.path.exists(d_file):
        print('The file "' + d_file + '" does not exists: creating it.',
              file=os.sys.stderr)
        open(d_file, "a+").close()
    # Use a context manager so the handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(d_file, "rb") as d_handle:
        # NOTE(review): newer msgpack releases drop the `encoding` keyword
        # in favour of `raw=False` -- confirm against the pinned version.
        database = msgpack.load(d_handle, encoding="utf-8")
except msgpack.exceptions.UnpackValueError:
    # Fall back to an empty database when msgpack rejects the contents.
    database = {}
except PermissionError as e:
    os.sys.exit(e)

tags = args["TAG"]
if not args["--no-path-subs"] and urls != [None]:
    # Replace every entry that names an existing local path with its
    # absolute form; other entries are kept as their string value.
    lst = []
    for url in urls:
        url = str(url)
        if os.path.exists(url):
            lst.append(os.path.abspath(url))
        else:
            lst.append(url)
def pack_load_local_registry(name, registryf=None):
    '''Load the msgpack-encoded local registry called ``name``.

    The file location is resolved by ``get_registry_path`` with
    ``registry_format='pack'``; ``registryf`` is forwarded to it unchanged.

    Returns the value produced by ``mc_utils.msgpack_load`` for the file
    contents, or an empty dict when the file does not exist or cannot be
    decoded. An undecodable file is logged and removed from disk.
    '''
    value = {}
    regpath = get_registry_path(name,
                                registryf=registryf,
                                registry_format='pack')
    try:
        if os.path.exists(regpath):
            # msgpack is a binary format: read raw bytes so the payload is
            # neither newline-translated nor decoded as text.
            with open(regpath, 'rb') as fic:
                rvalue = fic.read()
            value = __salt__['mc_utils.msgpack_load'](
                rvalue)
    except (msgpack.exceptions.UnpackValueError,
            msgpack.exceptions.ExtraData):
        # The file is considered stale: drop it and fall back to the
        # empty default.
        log.error('decoding error, removing stale {0}'.format(regpath))
        os.unlink(regpath)
        value = {}
    return value
Returns:
list: List of tuples containing the item size and the unpacked item.
'''
self.unpk.feed(byts)
retn = []
while True:
try:
item = self.unpk.unpack()
tell = self.unpk.tell()
retn.append((tell - self.size, item))
self.size = tell
except msgpack.exceptions.OutOfData:
break
return retn
def _decode(filep):
'''decode in a file using msgpack backend'''
value = None
try:
if os.path.exists(filep):
with open(filep) as fic:
rvalue = fic.read()
value = msgpack.unpackb(rvalue)['value']
except msgpack.exceptions.UnpackValueError:
log.error('decoding error, removing stale {0}'.format(filep))
os.unlink(filep)
value = None
return value
def pack_load_local_registry(name, registryf=None):
    '''Load the msgpack-encoded local registry called ``name``.

    The file location is resolved by ``get_registry_path`` with
    ``registry_format='pack'``; ``registryf`` is forwarded to it unchanged.

    Returns the value produced by ``mc_utils.msgpack_load`` for the file
    contents, or an empty dict when the file does not exist or cannot be
    decoded. An undecodable file is logged and removed from disk.
    '''
    value = {}
    regpath = get_registry_path(name,
                                registryf=registryf,
                                registry_format='pack')
    try:
        if os.path.exists(regpath):
            # msgpack is a binary format: read raw bytes so the payload is
            # neither newline-translated nor decoded as text.
            with open(regpath, 'rb') as fic:
                rvalue = fic.read()
            value = __salt__['mc_utils.msgpack_load'](
                rvalue)
    except (msgpack.exceptions.UnpackValueError,
            msgpack.exceptions.ExtraData):
        # The file is considered stale: drop it and fall back to the
        # empty default.
        log.error('decoding error, removing stale {0}'.format(regpath))
        os.unlink(regpath)
        value = {}
    return value