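# The snippets below are method excerpts from execo-based Hadoop/Spark/Hive
# cluster managers. They rely on a few module-level names that the excerpts do
# not show; a minimal sketch of the imports they assume (module paths follow
# the usual execo layout and may differ between versions):
import os
import getpass

from execo.action import Remote, TaktukRemote, Put, Get, SequentialActions
from execo.log import logger

# The Spark excerpt additionally assumes a module-level YARN_MODE constant
# identifying the deployment mode; its definition is not shown here.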
def clean_logs(self):
    """Remove all Hive logs."""

    logger.info("Cleaning logs")

    restart = False
    if self.running:
        logger.warn("The cluster needs to be stopped before cleaning.")
        self.stop()
        restart = True

    action = Remote("rm -rf " + self.logs_dir + "/* ",
                    self.hosts)
    action.run()

    if restart:
        self.start()

def clean_logs(self):
    """Remove all Spark logs."""

    logger.info("Cleaning logs")

    restart = False
    if self.running:
        logger.warn("The cluster needs to be stopped before cleaning.")
        self.stop()
        restart = True

    action = Remote("rm -rf " + self.logs_dir + "/* " +
                    self.work_dir + "/*",
                    self.hosts)
    action.run()

    if restart:
        self.start()
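# Both clean_logs() variants above follow the same execo pattern: build a
# Remote action from a shell command and a host list, run it, and optionally
# inspect the per-host processes afterwards. A minimal standalone sketch of
# that pattern, with hypothetical hosts and log directory:
from execo.action import Remote

hosts = ["node-1.example.org", "node-2.example.org"]   # hypothetical hosts
logs_dir = "/tmp/spark/logs"                           # hypothetical path

action = Remote("rm -rf " + logs_dir + "/*", hosts)
action.run()
for p in action.processes:
    # each entry describes the command run on one host; .ok reports success
    print(p.host, p.ok)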
def _get_conf_files(self, host):
    """Fetch the XML conf files of a host into a local temp dir and return
    their local paths."""

    action = Remote("ls " + self.conf_dir + "/*.xml", [host])
    action.run()
    output = action.processes[0].stdout

    remote_conf_files = []
    for f in output.split():
        remote_conf_files.append(os.path.join(self.conf_dir, f))

    tmp_dir = "/tmp/mliroz_temp_hadoop/"
    if not os.path.exists(tmp_dir):
        os.makedirs(tmp_dir)

    action = Get([host], remote_conf_files, tmp_dir)
    action.run()

    temp_conf_files = [os.path.join(tmp_dir, f) for f in
                       os.listdir(tmp_dir)]

    return temp_conf_files
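# Note on _get_conf_files(): when conf_dir is an absolute path, "ls <dir>/*.xml"
# prints absolute file names, and os.path.join() simply returns the second
# argument when it is already absolute, so the join above is a no-op for those
# entries. A quick runnable illustration:
import os

print(os.path.join("/opt/hadoop/conf", "/opt/hadoop/conf/core-site.xml"))
# -> /opt/hadoop/conf/core-site.xml
print(os.path.join("/opt/hadoop/conf", "core-site.xml"))
# -> /opt/hadoop/conf/core-site.xml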
def clean_data(self):
    """Remove all data created by Hadoop (including filesystem)."""

    if self.running:
        logger.warn("The cluster needs to be stopped before cleaning.")
        self.stop()

    logger.info("Cleaning hadoop data")

    restart = False
    if self.running:
        self.stop()
        restart = True

    action = Remote("rm -rf " + self.hadoop_temp_dir + " /tmp/hadoop-" +
                    getpass.getuser() + "-*", self.hosts)
    action.run()

    if restart:
        self.start()
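# clean_data() removes both the configured Hadoop temp dir and the default
# per-user /tmp/hadoop-<user>-* directories. A quick look at how that command
# string is assembled (the temp dir value here is hypothetical):
import getpass

hadoop_temp_dir = "/tmp/hadoop_temp"   # stands in for self.hadoop_temp_dir
print("rm -rf " + hadoop_temp_dir + " /tmp/hadoop-" + getpass.getuser() + "-*")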
# Fragment of the Spark cluster setup (the enclosing method is not shown; the
# numbered steps below continue its configuration sequence).

    # Create the event log directory through the Hadoop client
    self.hc.execute("fs -mkdir -p " + self.evs_log_dir)

    # 3. Specify environment variables
    env_file = self.conf_dir + "/spark-env.sh"

    command = "cat >> " + env_file + " << EOF\n"
    command += "JAVA_HOME=" + self.java_home + "\n"
    command += "SPARK_LOG_DIR=" + self.logs_dir + "\n"
    if self.hc:
        command += "HADOOP_CONF_DIR=" + self.hc.conf_dir + "\n"
    if self.mode == YARN_MODE:
        command += "YARN_CONF_DIR=" + self.hc.conf_dir + "\n"
    command += "EOF\n"
    command += "echo SPARK_PUBLIC_DNS=$(hostname) >> " + env_file
    command += " && chmod +x " + env_file

    action = Remote(command, self.hosts)
    action.run()

    # 4. Generate initial configuration
    self._initialize_conf()
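# The block above composes a single shell command that appends environment
# settings to spark-env.sh through a heredoc and then marks the file
# executable. A standalone sketch of the same string building, with
# hypothetical values standing in for the instance attributes, to show what
# Remote(command, hosts) would actually run:
java_home = "/usr/lib/jvm/java-8-openjdk-amd64"   # hypothetical
logs_dir = "/opt/spark/logs"                      # hypothetical
env_file = "/opt/spark/conf/spark-env.sh"         # hypothetical

command = "cat >> " + env_file + " << EOF\n"
command += "JAVA_HOME=" + java_home + "\n"
command += "SPARK_LOG_DIR=" + logs_dir + "\n"
command += "EOF\n"
command += "echo SPARK_PUBLIC_DNS=$(hostname) >> " + env_file
command += " && chmod +x " + env_file

print(command)   # the exact shell snippet sent to every host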
def bootstrap(self, tar_file):

    # 1. Remove used dirs if existing
    action = Remote("rm -rf " + self.base_dir, self.hc.hosts)
    action.run()
    action = Remote("rm -rf " + self.conf_dir, self.hc.hosts)
    action.run()

    # 2. Copy the Hive tar file and uncompress it
    logger.info("Copy " + tar_file + " to hosts and uncompress")
    action = Put(self.hc.hosts, [tar_file], "/tmp")
    action.run()
    action = Remote(
        "tar xf /tmp/" + os.path.basename(tar_file) + " -C /tmp",
        self.hc.hosts)
    action.run()
    # 3. Move installation to base dir and create other dirs
    logger.info("Create installation directories")
    mv_base_dir = TaktukRemote(
        "mv /tmp/" +
        os.path.basename(tar_file).replace(".tar.gz", "") + " " +
        self.base_dir,
        self.hosts)
    # create the conf and warehouse dirs that chmods below makes group-writable
    mkdirs = TaktukRemote("mkdir -p " + self.conf_dir +
                          " && mkdir -p " + self.warehouse_dir,
                          self.hosts)
    chmods = TaktukRemote("chmod g+w " + self.base_dir +
                          " && chmod g+w " + self.conf_dir +
                          " && chmod g+w " + self.warehouse_dir,
                          self.hosts)
    SequentialActions([mv_base_dir, mkdirs, chmods]).run()

    # 4. Specify environment variables
    command = "cat >> " + self.conf_dir + "/hive-env.sh << EOF\n"
    command += "JAVA_HOME=" + self.java_home + "\n"
    command += "HIVE_HOME=" + self.base_dir + "\n"
    command += "HIVE_CONF_DIR=" + self.conf_dir + "\n"
    command += "HADOOP_HOME=" + self.hc.base_dir + "\n"
    command += "EOF\n"
    command += "chmod +x " + self.conf_dir + "/hive-env.sh"

    action = Remote(command, self.hosts)
    action.run()
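# The bootstrap above derives the unpacked directory name from the tar file
# name before moving it to base_dir; a quick runnable check of that string
# manipulation with a hypothetical archive name:
import os

tar_file = "/home/user/apache-hive-1.2.1-bin.tar.gz"       # hypothetical
print(os.path.basename(tar_file))                          # apache-hive-1.2.1-bin.tar.gz
print(os.path.basename(tar_file).replace(".tar.gz", ""))   # apache-hive-1.2.1-bin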
def clean_history(self):
    """Remove history."""

    logger.info("Cleaning history")

    restart = False
    if self.running:
        logger.warn("The cluster needs to be stopped before cleaning.")
        self.stop()
        restart = True

    action = Remote("rm -rf " + self.logs_dir + "/history",
                    [self.master])
    action.run()

    if restart:
        self.start()