# Fragment of a Hive __force_clean: `hive_processes` and `force_kill` are
# defined earlier in the (truncated) method, mirroring the Spark variant
# below. jps reports one "<pid> <class>" pair per line.
proc = SshProcess("jps", self.master)
proc.run()

ids_to_kill = []
for line in proc.stdout.splitlines():
    field = line.split()
    # Skip lines without a class name before indexing field[1]
    if len(field) >= 2 and field[1] in hive_processes:
        ids_to_kill.append(field[0])

if ids_to_kill:
    force_kill = True
    ids_to_kill_str = ""
    for pid in ids_to_kill:
        ids_to_kill_str += " " + pid
    proc = SshProcess("kill -9" + ids_to_kill_str, self.master)
    proc.run()

if force_kill:
    logger.info(
        "Processes from previous hadoop deployments had to be killed")

self.clean_logs()
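
# Illustrative sketch, not part of the original module: the jps-parsing
# loop above recurs in every __force_clean variant, so it could be
# factored into a helper. Only SshProcess is taken from the code above;
# the helper name is hypothetical.
def pids_of(host, process_names):
    """Return the PIDs on `host` whose jps class name is in `process_names`."""
    proc = SshProcess("jps", host)
    proc.run()
    pids = []
    for line in proc.stdout.splitlines():
        fields = line.split()
        # jps prints "<pid> <class>"; skip malformed or class-less lines
        if len(fields) >= 2 and fields[1] in process_names:
            pids.append(fields[0])
    return pids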
def __force_clean(self):
    """Stop previous Spark processes (if any) and remove all remote files
    created by it."""

    spark_processes = [
        "Master",
        "Worker"
    ]

    force_kill = False
    for h in self.hosts:
        proc = SshProcess("jps", h)
        proc.run()

        ids_to_kill = []
        for line in proc.stdout.splitlines():
            field = line.split()
            # Skip lines without a class name before indexing field[1]
            if len(field) >= 2 and field[1] in spark_processes:
                ids_to_kill.append(field[0])

        if ids_to_kill:
            force_kill = True
            ids_to_kill_str = ""
            for pid in ids_to_kill:
                ids_to_kill_str += " " + pid

            logger.warn(
                "Killing running Spark processes in host %s" %
                style.host(h.address.split('.')[0]))

            proc = SshProcess("kill -9" + ids_to_kill_str, h)
            proc.run()

    if force_kill:
        logger.info(
            "Processes from previous Spark deployments had to be killed")

    self.clean_logs()
def start_dfs_and_wait(self):
    """Start the NameNode and DataNodes and wait for exiting safemode."""

    self._check_initialization()

    self.start_dfs()

    logger.info("Waiting for safe mode to be off")
    proc = SshProcess(self.bin_dir + "/hadoop dfsadmin -safemode wait",
                      self.master)
    proc.run()

    if not proc.finished_ok:
        logger.warn("Error while starting HDFS")
    else:
        self.running_dfs = True
        if self.running_map_reduce:
            self.running = True
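
# Sketch of a bounded alternative to "-safemode wait", which blocks
# indefinitely until the NameNode leaves safe mode. This polls
# "dfsadmin -safemode get" (whose output reports ON or OFF) a fixed
# number of times instead; the helper name and defaults are assumptions.
import time

def wait_safemode_off(bin_dir, master, retries=30, delay=10):
    """Return True once safe mode is reported OFF, False after `retries`."""
    for _ in range(retries):
        proc = SshProcess(bin_dir + "/hadoop dfsadmin -safemode get", master)
        proc.run()
        if "OFF" in proc.stdout:
            return True
        time.sleep(delay)
    return False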
def format_dfs(self):
    """Format the distributed filesystem."""

    logger.info("Formatting HDFS")

    proc = SshProcess(self.bin_dir + "/hadoop namenode -format",
                      self.master)
    proc.run()

    if proc.finished_ok:
        logger.info("HDFS formatted successfully")
    else:
        logger.warn("Error while formatting HDFS")
def stop_spark(self):
    """Stop Spark processes."""

    logger.info("Stopping Spark")

    if self.mode == STANDALONE_MODE:
        proc = SshProcess(self.sbin_dir + "/stop-slaves.sh;" +
                          self.sbin_dir + "/stop-master.sh;",
                          self.master)
        proc.run()
        if not proc.finished_ok:
            logger.warn("Error while stopping Spark")
            return

    self.running = False
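
# Design note: stop-slaves.sh and stop-master.sh belong to Spark's
# standalone deployment scripts, hence the STANDALONE_MODE guard; when
# Spark runs on YARN there are no long-lived Spark daemons to stop, so
# the method only clears the running flag.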
def get_version(self):
    """Return the Hadoop version.

    Returns (str):
        The version used by the Hadoop cluster.
    """

    proc = SshProcess("export JAVA_HOME=" + self.java_home + ";" +
                      self.bin_dir + "/hadoop version",
                      self.master)
    proc.run()
    version = proc.stdout.splitlines()[0]
    return version
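
# Usage sketch under an assumed output format: the first stdout line of
# "hadoop version" looks like "Hadoop 1.2.1", so the bare version number
# can be split out of the string returned above.
def version_number(version_line):
    """Extract e.g. '1.2.1' from a line like 'Hadoop 1.2.1'."""
    return version_line.split()[1]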
def stop_yarn(self):
    """Stop the YARN ResourceManager and NodeManagers."""

    self._check_initialization()

    logger.info("Stopping YARN")

    proc = SshProcess(self.sbin_dir + "/stop-yarn.sh", self.master)
    proc.run()

    if not proc.finished_ok:
        logger.warn("Error while stopping YARN")
    else:
        self.running_yarn = False
def start_map_reduce(self):
    """Start the JobTracker and TaskTrackers."""

    self._check_initialization()

    logger.info("Starting MapReduce")

    if self.running_map_reduce:
        logger.warn("MapReduce has already been started")
        return

    proc = SshProcess(self.sbin_dir + "/start-mapred.sh", self.master)
    proc.run()

    if not proc.finished_ok:
        logger.warn("Error while starting MapReduce")
    else:
        logger.info("MapReduce started successfully")
        self.running_map_reduce = True
        if self.running_dfs:
            self.running = True
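
# End-to-end lifecycle sketch (cluster object and call order assumed, for
# illustration only): the methods above compose into the usual deploy
# sequence.
#
#   cluster.format_dfs()            # wipe and initialize HDFS
#   cluster.start_dfs_and_wait()    # NameNode/DataNodes, leave safe mode
#   cluster.start_map_reduce()      # JobTracker/TaskTrackers (Hadoop 1)
#   ...                             # run jobs
#   cluster.stop_yarn()             # or the matching stop methods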