Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_logging_failing_explore_sql(tmpdir, sql_error):
    """A SQL error whose dimension is None is logged at the explore level.

    The log file is expected under <tmpdir>/queries, named
    <model>__<explore>.sql, and must contain the failing SQL verbatim.
    """
    sql_error.metadata["dimension"] = None
    expected_directory = Path(tmpdir) / "queries"
    expected_directory.mkdir(exist_ok=True)
    log_sql_error(
        sql_error.model,
        sql_error.explore,
        sql_error.test,
        tmpdir,
        sql_error.metadata["dimension"],
    )
    expected_path = expected_directory / "eye_exam__users.sql"
    # Instance methods read more naturally than the unbound-method form
    # `Path.exists(expected_path)`; read_text() replaces open()/read().
    assert expected_path.exists()
    assert expected_path.read_text() == "SELECT age FROM users WHERE 1=2 LIMIT 1"
# NOTE(review): this fragment starts mid-function — the enclosing `def` line
# and the names it relies on (mock_resp, agent, tmpdir, user, json3_path,
# mock, json, Path) are not visible in this chunk, and the assert_has_calls
# list below is never closed here. Confirm against the complete file.
mock_resp.return_value.ok = True
# Patch out logging, certificate access, HTTP and the permission/ownership
# syscalls so fetch_credentials can run without touching the real system.
with mock.patch('agent.logger'), \
mock.patch('agent.can_read_cert') as cr, \
mock.patch('requests.request') as req, \
mock.patch('os.chmod') as chm, \
mock.patch('os.chown') as chw:
cr.return_value = True
req.return_value = mock_resp
mock_resp.return_value.ok = True
agent.fetch_credentials(False)
# Expected layout: two per-user files plus one root-level file...
assert Path.exists(tmpdir / user / 'name1.json')
assert Path.exists(tmpdir / user / 'name2.json')
assert Path.exists(tmpdir / 'name2.json')
# ...and json3_path must NOT have been written.
assert Path.exists(json3_path) is False
pi_dir_path = str(tmpdir / user)
pi_name1_path = str(tmpdir / user / 'name1.json')
pi_name2_path = str(tmpdir / user / 'name2.json')
rt_name2_path = str(tmpdir / 'name2.json')
# Each written credential file must hold the expected JSON payload.
with open(pi_name1_path) as f:
assert json.load(f) == {"key1": "v1"}
with open(pi_name2_path) as f:
assert json.load(f) == {"key1": "v21", "key2": "v22"}
with open(rt_name2_path) as f:
assert json.load(f) == {"key3": "v23"}
# All credential files should have been chmod'ed owner-read-only (0o400).
chm.assert_has_calls([
mock.call(pi_name1_path, 0o400),
mock.call(pi_name2_path, 0o400),
mock.call(rt_name2_path, 0o400),
# NOTE(review): truncated fragment — the enclosing `def` line and the names
# it relies on (mock_resp, agent, tmpdir, user, json3_path, mock, json, Path)
# are not visible in this chunk, and the assert_has_calls list below is never
# closed here. Confirm against the complete file.
mock_resp.return_value.ok = True
# Patch out logging, certificate access, HTTP and the permission/ownership
# syscalls so fetch_credentials can run without touching the real system.
with mock.patch('agent.logger'), \
mock.patch('agent.can_read_cert') as cr, \
mock.patch('requests.request') as req, \
mock.patch('os.chmod') as chm, \
mock.patch('os.chown') as chw:
cr.return_value = True
req.return_value = mock_resp
mock_resp.return_value.ok = True
agent.fetch_credentials(False)
# Expected layout: two per-user files plus one root-level file...
assert Path.exists(tmpdir / user / 'name1.json')
assert Path.exists(tmpdir / user / 'name2.json')
assert Path.exists(tmpdir / 'name2.json')
# ...and json3_path must NOT have been written.
assert Path.exists(json3_path) is False
pi_dir_path = str(tmpdir / user)
pi_name1_path = str(tmpdir / user / 'name1.json')
pi_name2_path = str(tmpdir / user / 'name2.json')
rt_name2_path = str(tmpdir / 'name2.json')
# Each written credential file must hold the expected JSON payload.
with open(pi_name1_path) as f:
assert json.load(f) == {"key1": "v1"}
with open(pi_name2_path) as f:
assert json.load(f) == {"key1": "v21", "key2": "v22"}
with open(rt_name2_path) as f:
assert json.load(f) == {"key3": "v23"}
# All credential files should have been chmod'ed owner-read-only (0o400).
chm.assert_has_calls([
mock.call(pi_name1_path, 0o400),
mock.call(pi_name2_path, 0o400),
mock.call(rt_name2_path, 0o400),
def test_save_image():
    """save_image() must write the given bytes to the given path."""
    fake_path = "fake_images/Fakeimage.jpg"
    fake_image = b"FakeFakeFake"
    bot.save_image(path=fake_path, image=fake_image)
    # Instance-method calls replace the unbound forms `Path.exists(Path(p))`,
    # `Path.unlink(Path(p))` and `Path.rmdir(Path(p).parent)`.
    image_file = Path(fake_path)
    assert image_file.exists(), "Fake image wasn't created"
    # Removing test path & image
    image_file.unlink()
    image_file.parent.rmdir()
def test_create_image_folder():
    """create_image_folder() must create the requested directory."""
    fake_path = "fake_images/"
    bot.create_image_folder(path=fake_path)
    # Instance-method calls replace the unbound forms `Path.exists(Path(p))`
    # and `Path.rmdir(Path(p))`.
    image_dir = Path(fake_path)
    assert image_dir.exists(), "Couldn't find image directory"
    # Removing test path
    image_dir.rmdir()
def validate_file_list_values(fileList, no_of_epochs):
    """Validate the contents of an input file list.

    Args:
        fileList: path object (supports read_text()) to a text file holding
            one file path per line.
        no_of_epochs: minimum number of digit runs (8 consecutive digits,
            falling back to 6 — presumably dates) each listed file name
            must contain.

    Returns:
        None if every listed file passes validation.

    Raises:
        ValueError: if fileList is None, a listed file does not exist, or a
            listed file name contains fewer than no_of_epochs digit runs.
    """
    if fileList is None:
        raise ValueError("No value supplied for input file list: " + str(fileList))
    for file_path_str in fileList.read_text().split("\n"):
        # ignore empty lines in file
        if len(file_path_str) <= 1:
            continue
        if not pathlib.Path(file_path_str).exists():
            raise ValueError("Given file name: " + str(file_path_str) + " does not exist.")
        # Prefer 8-digit runs; fall back to 6-digit runs if too few found.
        # Raw strings avoid the invalid "\d" escape-sequence warning.
        matches = re.findall(r"(\d{8})", file_path_str)
        if len(matches) < no_of_epochs:
            matches = re.findall(r"(\d{6})", file_path_str)
            if len(matches) < no_of_epochs:
                raise ValueError(
                    "For the given file name: "
                    + str(file_path_str)
                    + " the number of epochs in file names is less than the required number: "
                    + str(no_of_epochs)
                )
- Example:
.. code-block:: python
# This will use the information in the login file
ws = WS()
# This will use the token passed
token = {"host": "opencga_host", "user": "username", "sid": "XXXXXXXXXXXXXX"}
ws = WS(token=token, version="v2", instance="other_instance")
"""
# NOTE(review): truncated fragment — the opening of this docstring, the
# enclosing `def` line, and the body of the final `if` are outside this
# chunk; confirm against the complete file.
# NOTE(review): os.getenv("HOME") may return None, which would make Path()
# raise — presumably HOME is always set in this environment; confirm.
home = Path(os.getenv("HOME"))
if token is None:
# No token supplied: fall back to the stored session file
# ~/.opencga/openCGA.json; a missing file means the user never logged in.
opencga_dir = home.joinpath(".opencga", "openCGA.json")
if not Path.exists(opencga_dir):
raise LoginException()
# NOTE(review): `fd` is never closed in the visible code — a `with` block
# would be safer; left unchanged in this doc-only pass.
fd = open(opencga_dir.as_posix())
session = json.load(fd)
else:
# A token dict was passed in; use it directly as the session.
session = token
# Cache session details on the instance.
self.session_id = session["sid"]
self.host = session["host"]
self.debug_path = home.joinpath(".opencga", "pyCGA.log").as_posix()
if "debug" in session:
self.debug = session["debug"]
else:
self.debug = False
if "instance" in session:
>>> from gen_phy_metrics import gen_metrics
>>> e_spks.ks2_to_alf(ks_dir_full_path, alf_dir_full_path)
>>> gen_metrics(alf_dir, ks_dir)
2) Generate metrics from an alf directory and metrics that require an ephys_file_path. For phy,
the ephys file should be in `ks_dir`.
>>> from gen_phy_metrics import gen_metrics
>>> gen_metrics(alf_dir, ks_dir, ephys_file_path=ks_dir)
"""
# NOTE(review): truncated fragment — the opening of this docstring, the
# enclosing `def` line, and the rest of the body fall outside this chunk;
# names aio, bb and np are presumably imported at file level. Confirm
# against the complete file.
# Setup #
# ----- #
# Extract alf objects from `alf_dir` and get units info
alf_dir = Path(alf_dir)
if not (Path.exists(alf_dir)):
raise FileNotFoundError('The given alf directory {} does not exist!'.format(alf_dir))
spks_b = aio.load_object(alf_dir, 'spikes')
clstrs_b = aio.load_object(alf_dir, 'clusters')
units_b = bb.processing.get_units_bunch(spks_b)
units = list(units_b.amps.keys())
# One slot per cluster id, 0..max(spks_b.clusters) inclusive.
n_units = np.max(spks_b.clusters) + 1
# Initialize metrics
# Each metric is a per-unit vector, NaN until computed.
cum_amp_drift = np.full((n_units,), np.nan)
cum_depth_drift = np.full((n_units,), np.nan)
cv_amp = np.full((n_units,), np.nan)
cv_fr = np.full((n_units,), np.nan)
frac_isi_viol = np.full((n_units,), np.nan)
fn_est = np.full((n_units,), np.nan)
fp_est = np.full((n_units,), np.nan)
def ensure_dir_exists_for_file(file_name):
    """Create the parent directory of *file_name* (a pathlib.Path) if missing.

    mkdir(parents=True, exist_ok=True) creates intermediate directories too
    and tolerates a concurrent creation — the original exists()-then-mkdir
    pair was racy and failed when the grandparent directory was missing.
    """
    file_name.parent.mkdir(parents=True, exist_ok=True)
def endFile(self):
    """Flush and close the output streams, move the generated file to its
    requested destination (if any), and drop the generator options.

    Reads self.errFile / self.warnFile / self.diagFile / self.outFile /
    self.genOpts; sets self.genOpts to None when done.
    """
    # Flush the optional diagnostic streams if they were opened.
    if self.errFile:
        self.errFile.flush()
    if self.warnFile:
        self.warnFile.flush()
    if self.diagFile:
        self.diagFile.flush()
    self.outFile.flush()
    # Never close the process-wide stdout/stderr streams.
    if self.outFile != sys.stdout and self.outFile != sys.stderr:
        self.outFile.close()
    if self.genOpts.filename is not None:
        # NOTE(review): the directory is pre-created only on win32 in the
        # original — behavior preserved; exist_ok replaces the racy
        # exists()-then-makedirs pair.
        if sys.platform == 'win32':
            os.makedirs(Path(self.genOpts.directory), exist_ok=True)
        shutil.move(self.outFile.name,
                    os.path.join(self.genOpts.directory, self.genOpts.filename))
    self.genOpts = None