Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
job.tag()
# running jobs are not errors either.
else:
is_run = getattr(extract, 'is_running', False)
if is_run:
job.tag()
qstuff = self.qstat(name)
is_inqueue = len(qstuff) > 0
if is_inqueue:
job.tag()
# what's left is an error.
else:
job.untag()
logger.debug('ipython/explore errors: dir: %s is_run: %s' % (directory, is_run))
# Look only for jobs which are successful.
if args.type == "results":
if interactive.jobfolder_path is None:
print("No known path/file for current job-folder.\n"\
"Please save to file first.")
return
directory = dirname(interactive.jobfolder_path)
for name, job in interactive.jobfolder.items():
if not job.functional.Extract(join(directory, name)).success:
job.tag()
else:
job.untag()
# Look only for jobs which are running (and can be determined as such).
elif args.type == "running":
# ['checkjob', str(job.jobNumber)],
# shell=False,
# cwd=wkDir,
# stdin=subprocess.PIPE,
# stdout=subprocess.PIPE,
# stderr=subprocess.PIPE,
# bufsize=10*1000*1000)
# (stdout, stderr) = proc.communicate()
# parse stdout to get status. May be 'not found'.
# if idle or active: job.untag()
# else: job.tag()
job.untag()
else:
job.tag()
logger.debug('ipython/explore running: dir: %s is_run: %s' % (directory, is_run))
# All jobs without restriction.
elif args.type == "all":
if interactive.jobfolder_path is None:
return
for job in interactive.jobfolder.values():
job.untag()