Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Pattern that recognises a PBS job-array directive (`#PBS ... -J <value>`).
# Written as a raw string so the regex escapes (\W, \w, \d, \-, \$) are handed
# to the regex engine verbatim instead of being parsed as (invalid) Python
# string escapes, which newer interpreters warn about.
job_array_regexp = CRegExp(r'#PBS\W+-J\W+[\w\d\-\$]+')
# Empty job-array directive text.
job_array_template = Unicode('')
def stop(self):
    """Cancel every batch job recorded in ``self.job_id``.

    ``self.job_id`` is read as a ``;``-separated list of job identifiers and
    ``qdel`` is invoked once per non-empty identifier.

    Raises:
        subprocess.CalledProcessError: if a ``qdel`` call exits non-zero.
    """
    for job in self.job_id.split(";"):
        job = job.strip()
        if not job:
            # An empty slot (e.g. job_id == "") has nothing to cancel; calling
            # a bare ``qdel`` would only fail.
            continue
        # Pass an argument list instead of a shell string so the identifier is
        # handed to qdel verbatim (no globbing or word splitting by a shell).
        subprocess.check_call(["qdel", job])
def notify_start(self, data):
    """Record that the job has started and remember its identifier.

    Emits a debug log entry naming ``self.args[0]`` and ``data``, stores
    ``data`` as both ``start_data`` and ``job_id``, and sets ``state`` to
    ``'running'``.

    Returns:
        ``data``, unchanged.
    """
    program = self.args[0]
    self.log.debug('Process %r started: %r', program, data)
    # Assignment order is kept as-is in case attribute observers depend on it.
    self.start_data = data
    self.state = 'running'
    self.job_id = data
    return data
class BcbioPBSPROEngineSetLauncher(PBSPROLauncher, launcher.BatchClusterAppMixin):
"""Launch Engines using PBSPro"""
batch_file_name = Unicode('pbspro_engines' + str(uuid.uuid4()),
config=True,
help="batch file name for the engine(s) job.")
tag = traitlets.Unicode("", config=True)
cores = traitlets.Integer(1, config=True)
mem = traitlets.Unicode("", config=True)
numengines = traitlets.Integer(1, config=True)
resources = traitlets.Unicode("", config=True)
default_template = Unicode(u"""#!/bin/sh
#PBS -V
#PBS -S /bin/sh
#PBS -N {tag}-e
{resources}
{exports}
self.context["account"] = self.account
self.context["timelimit"] = self.timelimit
self.context["cores"] = self.cores
if self.mem:
self.context["mem"] = "#SBATCH --mem=%s\n" % int(float(self.mem) * 1024.0)
else:
self.context["mem"] = "#SBATCH --mem=%d\n" % (4 * DEFAULT_MEM_PER_CPU)
self.context["tag"] = self.tag if self.tag else "bcbio"
self.context["account"] = ("#SBATCH -A %s\n" % self.account if self.account else "")
self.context["resources"] = "\n".join(["#SBATCH --%s\n" % r.strip()
for r in str(self.resources).split(";")
if r.strip()])
return super(BcbioSLURMControllerLauncher, self).start(1)
class BcbioOLDSLURMEngineSetLauncher(SLURMLauncher, launcher.BatchClusterAppMixin):
    """Engine launcher for SLURM releases older than 2.6."""

    # Configurable values substituted into the job script below.
    machines = traitlets.Integer(1, config=True)
    account = traitlets.Unicode("", config=True)
    timelimit = traitlets.Unicode("", config=True)

    # A UUID is appended to the default name when the class body is evaluated.
    batch_file_name = Unicode(
        "SLURM_engines%s" % uuid.uuid4(),
        config=True,
        help="batch file name for the engine(s) job.",
    )

    # The two %s slots are filled here, at class-definition time, with the
    # shell-quoted engine command and the timeout parameters. The {name}
    # placeholders stay in the text for a later formatting step.
    default_template = Unicode(u"""#!/bin/sh
#SBATCH -A {account}
#SBATCH --job-name ipengine
#SBATCH -N {machines}
#SBATCH -t {timelimit}
export IPYTHONDIR={profile_dir}
srun -N {machines} -n {n} %s %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % (
        ' '.join(pipes.quote(arg) for arg in engine_cmd_argv),
        ' '.join(timeout_params),
    ))
"""Launch a controller using SGE."""
# Configurable name of the batch script written for the controller job.
batch_file_name = Unicode(u'sge_controller', config=True,
                          help="batch file name for the ipcontroller job.")
default_template = Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
def start(self):
    """Start the controller by delegating to the parent launcher's ``start``.

    Returns whatever the parent ``start`` returns.
    """
    # The argument 1 is presumably the number of processes (a single
    # controller) -- confirm against the parent's signature.
    parent = super(SGEControllerLauncher, self)
    return parent.start(1)
class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch engines through SGE."""

    # Configurable name of the batch script written for the engine job.
    batch_file_name = Unicode(
        u'sge_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )

    # The %s slot receives the shell-quoted ipengine command when the class is
    # defined; {profile_dir} and {cluster_id} remain as placeholders.
    default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipengine_cmd_argv))
# LSF launchers
class LSFLauncher(BatchSystemLauncher):
"""A BatchSystemLauncher subclass for LSF."""
submit_command = List(['bsub'], config=True,
"""Inserts a job array if required into the batch script.
"""
if not self.job_array_regexp.search(self.batch_template):
self.log.debug("adding job array settings to batch script")
#HTCondor requires that the job array goes at the bottom of the script
self.batch_template = '\n'.join([self.batch_template,
self.job_array_template])
def _insert_options_in_script(self):
"""AFAIK, HTCondor doesn't have a concept of multiple queues that can be
specified in the script.
"""
pass
class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
    """Launch a controller through HTCondor."""

    # Configurable name of the submit file written for the controller job.
    batch_file_name = Unicode(
        u'htcondor_controller',
        config=True,
        help="batch file name for the controller job.",
    )

    # Submit description; {profile_dir} and {cluster_id} are placeholders that
    # remain in the text for a later formatting step.
    default_template = Unicode(r"""
universe = vanilla
executable = ipcontroller
# by default we expect a shared file system
transfer_executable = False
arguments = '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
""")

    def start(self):
        """Start the controller by delegating to the parent ``start``."""
        # The argument 1 is presumably the process count (one controller).
        parent = super(HTCondorControllerLauncher, self)
        return parent.start(1)
bsub < script
"""
# Here we save profile_dir in the context so they
# can be used in the batch script template as {profile_dir}
self.write_batch_script(n)
piped_cmd = self.args[0] + '<\"' + self.args[1] + '\"'
self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
p = Popen(piped_cmd, shell=True, env=os.environ, stdout=PIPE)
output, err = p.communicate()
output = output.decode(DEFAULT_ENCODING, 'replace')
job_id = self.parse_job_id(output)
self.notify_start(job_id)
return job_id
class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch a controller through LSF."""

    # Configurable name of the batch script written for the controller job.
    batch_file_name = Unicode(
        u'lsf_controller',
        config=True,
        help="batch file name for the controller job.",
    )

    # The %s slot receives the shell-quoted ipcontroller command when the
    # class is defined; each %%J collapses to a literal %J in that same
    # formatting step. {profile_dir} and {cluster_id} remain as placeholders.
    default_template = Unicode("""#!/bin/sh
#BSUB -J ipcontroller
#BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipcontroller_cmd_argv))

    def start(self):
        """Start the controller by delegating to the parent ``start``."""
        # The argument 1 is presumably the process count (one controller).
        parent = super(LSFControllerLauncher, self)
        return parent.start(1)
self.context["exports"] = _local_environment_exports()
self.write_batch_script(n)
job_ids = []
for i in range(n):
output = subprocess.check_output("qsub < %s" % self.batch_file_name,
shell=True, universal_newlines=True)
if six.PY3:
if not isinstance(output, str):
output = output.decode('ascii', 'ignore')
job_ids.append(output.strip())
job_id = ";".join(job_ids)
self.notify_start(job_id)
return job_id
class BcbioPBSPROControllerLauncher(PBSPROLauncher, launcher.BatchClusterAppMixin):
"""Launch a controller using PBSPro."""
batch_file_name = Unicode("pbspro_controller" + str(uuid.uuid4()),
config=True,
help="batch file name for the controller job.")
tag = traitlets.Unicode("", config=True)
cores = traitlets.Integer(1, config=True)
mem = traitlets.Unicode("", config=True)
resources = traitlets.Unicode("", config=True)
default_template = Unicode("""#!/bin/sh
#PBS -V
#PBS -S /bin/sh
#PBS -N {tag}-c
{resources}
{exports}
cd $PBS_O_WORKDIR
class SlurmControllerLauncher(SlurmLauncher, BatchClusterAppMixin):
    """Launch a controller through Slurm."""

    # Configurable name of the batch script written for the controller job.
    batch_file_name = Unicode(
        u'slurm_controller.sbatch',
        config=True,
        help="batch file name for the controller job.",
    )

    # The %s slot receives the shell-quoted ipcontroller command when the
    # class is defined; the {name} placeholders remain in the text.
    default_template = Unicode("""#!/bin/sh
#SBATCH --job-name=ipy-controller-{cluster_id}
#SBATCH --ntasks=1
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipcontroller_cmd_argv))

    def start(self):
        """Start the controller by delegating to the parent ``start``."""
        # The argument 1 is presumably the process count (one controller).
        parent = super(SlurmControllerLauncher, self)
        return parent.start(1)
class SlurmEngineSetLauncher(SlurmLauncher, BatchClusterAppMixin):
    """Launch engines through Slurm."""

    # Configurable name of the batch script written for the engine job.
    batch_file_name = Unicode(
        u'slurm_engine.sbatch',
        config=True,
        help="batch file name for the engine(s) job.",
    )

    # The %s slot receives the shell-quoted ipengine command when the class is
    # defined; the {name} placeholders remain in the text.
    default_template = Unicode(u"""#!/bin/sh
#SBATCH --job-name=ipy-engine-{cluster_id}
srun %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipengine_cmd_argv))
#SGE is very similar to PBS
class SGELauncher(PBSLauncher):
    """PBS-derived launcher using Sun GridEngine's ``#$`` directive syntax."""

    # Job arrays: a pattern matching a ``#$ ... -t`` directive, and the
    # directive text to use, with an {n} placeholder.
    job_array_regexp = CRegExp(r"#\$\W+\-t")
    job_array_template = Unicode("#$ -t 1-{n}")

    # Queue selection: a pattern matching a ``#$ ... -q <name>`` directive,
    # and the directive text to use, with a {queue} placeholder.
    queue_regexp = CRegExp(r"#\$\W+-q\W+\$?\w+")
    queue_template = Unicode("#$ -q {queue}")
batch_file_name = Unicode(u'htcondor_controller', config=True,
help="batch file name for the controller job.")
default_template = Unicode(r"""
universe = vanilla
executable = ipcontroller
# by default we expect a shared file system
transfer_executable = False
arguments = '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
""")
def start(self):
    """Start the controller by delegating to the parent launcher's ``start``.

    Returns whatever the parent ``start`` returns.
    """
    # The argument 1 is presumably the number of processes (a single
    # controller) -- confirm against the parent's signature.
    parent = super(HTCondorControllerLauncher, self)
    return parent.start(1)
class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
    """Launch engines through HTCondor."""

    # Configurable name of the submit file written for the engine job.
    batch_file_name = Unicode(
        u'htcondor_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )

    # Submit description; {profile_dir} and {cluster_id} are placeholders that
    # remain in the text for a later formatting step.
    default_template = Unicode("""
universe = vanilla
executable = ipengine
# by default we expect a shared file system
transfer_executable = False
arguments = " '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
""")
#-----------------------------------------------------------------------------
# A launcher for ipcluster itself!
#-----------------------------------------------------------------------------
"""Launch a controller using PBS."""
batch_file_name = Unicode(u'pbs_controller', config=True,
help="batch file name for the controller job.")
default_template = Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % (' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
def start(self):
    """Start the controller by delegating to the parent launcher's ``start``.

    Returns whatever the parent ``start`` returns.
    """
    # The argument 1 is presumably the number of processes (a single
    # controller) -- confirm against the parent's signature.
    parent = super(PBSControllerLauncher, self)
    return parent.start(1)
class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch engines through PBS."""

    # Configurable name of the batch script written for the engine job.
    batch_file_name = Unicode(
        u'pbs_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )

    # The %s slot receives the shell-quoted ipengine command when the class is
    # defined; {profile_dir} and {cluster_id} remain as placeholders.
    default_template = Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipengine_cmd_argv))
#Slurm is very similar to PBS
class SlurmLauncher(BatchSystemLauncher):
    """BatchSystemLauncher specialisation for Slurm."""

    # Configurable command used to submit the batch script.
    submit_command = List(
        ['sbatch'],
        config=True,
        help="The slurm submit command ['sbatch']",
    )

    # The %s slot receives the shell-quoted ipengine command when the class is
    # defined; the {name} placeholders remain in the text.
    default_template = Unicode(u"""#!/bin/sh
#SBATCH --job-name=ipy-engine-{cluster_id}
srun %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipengine_cmd_argv))
#SGE is very similar to PBS
class SGELauncher(PBSLauncher):
    """PBS-derived launcher using Sun GridEngine's ``#$`` directive syntax."""

    # Job arrays: a pattern matching a ``#$ ... -t`` directive, and the
    # directive text to use, with an {n} placeholder.
    job_array_regexp = CRegExp(r"#\$\W+\-t")
    job_array_template = Unicode("#$ -t 1-{n}")

    # Queue selection: a pattern matching a ``#$ ... -q <name>`` directive,
    # and the directive text to use, with a {queue} placeholder.
    queue_regexp = CRegExp(r"#\$\W+-q\W+\$?\w+")
    queue_template = Unicode("#$ -q {queue}")
class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch a controller using SGE."""

    # Configurable name of the batch script written for the controller job.
    batch_file_name = Unicode(u'sge_controller', config=True,
                              help="batch file name for the ipcontroller job.")

    # The %s slot receives the shell-quoted ipcontroller command when the
    # class is defined; {profile_dir} and {cluster_id} remain as placeholders
    # for a later formatting step.
    default_template = Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % (' '.join(map(pipes.quote, ipcontroller_cmd_argv))))

    def start(self):
        """Start the controller by delegating to the parent ``start``.

        Returns whatever the parent ``start`` returns.
        """
        # The argument 1 is presumably the process count (one controller).
        return super(SGEControllerLauncher, self).start(1)
class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):