Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def o_encrypt(self, data):
a = base64.b64decode(data)
i = 16
s = max((len(a) - 2 * i) // 3, 0)
u = a[s: s + i]
a = a[0: s] + a[s + i:]
sec_key = xxhash.xxh64_hexdigest(u, 41405)
print(sec_key)
text = rc4(a, sec_key)
data = plistlib.loads(text, fmt=FMT_BINARY)
return data
def get_volname_hash(volname):
"""XXHash based on Volume name"""
return xxhash.xxh64_hexdigest(volname)
try:
destArr = self.Fp[hashVal.uid][dsetCol][srcSlc]
except TypeError:
self.Fp[hashVal.uid] = self.Fp[hashVal.uid]()
destArr = self.Fp[hashVal.uid][dsetCol][srcSlc]
except KeyError:
process_dir = self.STAGEDIR if self.mode == 'a' else self.STOREDIR
if Path(process_dir, f'{hashVal.uid}.hdf5').is_file():
file_pth = self.DATADIR.joinpath(f'{hashVal.uid}.hdf5')
self.rFp[hashVal.uid] = h5py.File(file_pth, 'r', swmr=True, libver='latest')
destArr = self.Fp[hashVal.uid][dsetCol][srcSlc]
else:
raise
out = destArr.reshape(hashVal.shape)
if xxh64_hexdigest(out) != hashVal.checksum:
# try casting to check if dtype does not match for all zeros case
out = out.astype(np.typeDict[self.Fp[hashVal.uid]['/'].attrs['schema_dtype_num']])
if xxh64_hexdigest(out) != hashVal.checksum:
raise RuntimeError(
f'DATA CORRUPTION Checksum {xxh64_hexdigest(out)} != recorded {hashVal}')
return out
except TypeError:
self.Fp[hashVal.uid] = self.Fp[hashVal.uid]()
destArr = self.Fp[hashVal.uid][dsetCol][srcSlc]
except KeyError:
process_dir = self.STAGEDIR if self.mode == 'a' else self.STOREDIR
if Path(process_dir, f'{hashVal.uid}.hdf5').is_file():
file_pth = self.DATADIR.joinpath(f'{hashVal.uid}.hdf5')
self.rFp[hashVal.uid] = h5py.File(file_pth, 'r', swmr=True, libver='latest')
destArr = self.Fp[hashVal.uid][dsetCol][srcSlc]
else:
raise
if xxh64_hexdigest(destArr) != hashVal.checksum:
# try casting to check if dtype does not match for all zeros case
destArr = destArr.astype(np.typeDict[self.Fp[hashVal.uid]['/'].attrs['schema_dtype_num']])
if xxh64_hexdigest(destArr) != hashVal.checksum:
raise RuntimeError(
f'DATA CORRUPTION Checksum {xxh64_hexdigest(destArr)} != recorded {hashVal}')
return destArr
----------
array : np.ndarray
tensor to write to group.
remote_operation : optional, kwarg only, bool
If this is a remote process which is adding data, any necessary
hdf5 dataset files will be created in the remote data dir instead
of the stage directory. (default is False, which is for a regular
access process)
Returns
-------
bytes
string identifying the collection dataset and collection dim-0 index
which the array can be accessed at.
"""
checksum = xxh64_hexdigest(array)
if self.w_uid in self.wFp:
self.hIdx += 1
if self.hIdx >= self.hMaxSize:
self.hIdx = 0
self.hNextPath += 1
self.hColsRemain -= 1
if self.hColsRemain <= 1:
self.wFp[self.w_uid]['/'].attrs.modify('next_location', (self.hNextPath, self.hIdx))
self.wFp[self.w_uid]['/'].attrs.modify('collections_remaining', self.hColsRemain)
self.wFp[self.w_uid].flush()
self._create_schema(remote_operation=remote_operation)
else:
self._create_schema(remote_operation=remote_operation)
srcSlc = None
destSlc = (self.slcExpr[self.hIdx], self.slcExpr[0:array.size])
except KeyError:
process_dir = self.STAGEDIR if self.mode == 'a' else self.STOREDIR
if Path(process_dir, f'{hashVal.uid}.hdf5').is_file():
file_pth = self.DATADIR.joinpath(f'{hashVal.uid}.hdf5')
self.rFp[hashVal.uid] = h5py.File(file_pth, 'r', swmr=True, libver='latest')
destArr = self.Fp[hashVal.uid][dsetCol][srcSlc]
else:
raise
out = destArr.reshape(hashVal.shape)
if xxh64_hexdigest(out) != hashVal.checksum:
# try casting to check if dtype does not match for all zeros case
out = out.astype(np.typeDict[self.Fp[hashVal.uid]['/'].attrs['schema_dtype_num']])
if xxh64_hexdigest(out) != hashVal.checksum:
raise RuntimeError(
f'DATA CORRUPTION Checksum {xxh64_hexdigest(out)} != recorded {hashVal}')
return out
"""writes array data to disk in the numpy_00 fmtBackend
Parameters
----------
array : np.ndarray
tensor to write to disk
remote_operation : bool, optional, kwarg only
True if writing in a remote operation, otherwise False. Default is
False
Returns
-------
bytes
db hash record value specifying location information
"""
checksum = xxh64_hexdigest(array)
if self.w_uid in self.wFp:
self.hIdx += 1
if self.hIdx >= COLLECTION_SIZE:
self.wFp[self.w_uid].flush()
self._create_schema(remote_operation=remote_operation)
else:
self._create_schema(remote_operation=remote_operation)
destSlc = (self.slcExpr[self.hIdx], *(self.slcExpr[0:x] for x in array.shape))
self.wFp[self.w_uid][destSlc] = array
hashVal = numpy_10_encode(uid=self.w_uid,
cksum=checksum,
collection_idx=self.hIdx,
shape=array.shape)
return hashVal