Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
hosts.
Args:
conf_dir (str):
The local directory whose files are copied to the remote configuration dir.
hosts (list of Host, optional):
The list of hosts where the configuration is going to be copied. If
not specified, all the hosts of the Spark cluster are used.
"""
if not hosts:
hosts = self.hosts
conf_files = [os.path.join(conf_dir, f) for f in os.listdir(conf_dir)]
action = TaktukPut(hosts, conf_files, self.conf_dir)
action.run()
if not action.finished_ok:
logger.warn("Error while copying configuration")
if not action.ended:
action.kill()
get_java_home = SshProcess('echo $(readlink -f /usr/bin/javac | '
'sed "s:/bin/javac::")', self.master)
get_java_home.run()
self.java_home = get_java_home.stdout.strip()
logger.info("All required packages are present")
# 1. Copy Hive tar file and uncompress
logger.info("Copy " + tar_file + " to hosts and uncompress")
rm_dirs = TaktukRemote("rm -rf " + self.base_dir +
" " + self.conf_dir +
" " + self.warehouse_dir +
" " + self.logs_dir,
self.hosts)
put_tar = TaktukPut(self.hosts, [tar_file], "/tmp")
tar_xf = TaktukRemote("tar xf /tmp/" + os.path.basename(tar_file) +
" -C /tmp", self.hosts)
SequentialActions([rm_dirs, put_tar, tar_xf]).run()
# 2. Move installation to base dir
logger.info("Create installation directories")
mv_base_dir = TaktukRemote(
"mv /tmp/" +
os.path.basename(tar_file).replace(".tar.gz", "") + " " +
self.base_dir,
self.hosts)
mkdirs = TaktukRemote("mkdir -p " + self.conf_dir +
" && mkdir -p " + self.warehouse_dir,
self.hosts)
chmods = TaktukRemote("chmod g+w " + self.base_dir +
" && chmod g+w " + self.conf_dir +
hosts.
Args:
conf_dir (str):
The local directory whose files are copied to the remote configuration dir.
hosts (list of Host, optional):
The list of hosts where the configuration is going to be copied. If
not specified, all the hosts of the Hadoop cluster are used.
"""
if not hosts:
hosts = self.hosts
conf_files = [os.path.join(conf_dir, f) for f in os.listdir(conf_dir)]
action = TaktukPut(hosts, conf_files, self.conf_dir)
action.run()
if not action.finished_ok:
logger.warn("Error while copying configuration")
if not action.ended:
action.kill()
hosts.
Args:
conf_dir (str):
The local directory whose files are copied to the remote configuration dir.
hosts (list of Host, optional):
The list of hosts where the configuration is going to be copied. If
not specified, all the hosts of the Spark cluster are used.
"""
if not hosts:
hosts = self.hosts
conf_files = [os.path.join(conf_dir, f) for f in os.listdir(conf_dir)]
action = TaktukPut(hosts, conf_files, self.conf_dir)
action.run()
if not action.finished_ok:
logger.warn("Error while copying configuration")
if not action.ended:
action.kill()
# 0. Check requirements
java_major_version = 7
if not check_java_version(java_major_version, self.hosts):
msg = "Java 1.%d+ required" % java_major_version
logger.error(msg)
raise SparkException(msg)
self.java_home = get_java_home(self.master)
# 1. Copy hadoop tar file and uncompress
logger.info("Copy " + tar_file + " to hosts and uncompress")
rm_dirs = TaktukRemote("rm -rf " + self.base_dir +
" " + self.conf_dir,
self.hosts)
put_tar = TaktukPut(self.hosts, [tar_file], "/tmp")
tar_xf = TaktukRemote(
"tar xf /tmp/" + os.path.basename(tar_file) + " -C /tmp",
self.hosts)
rm_tar = TaktukRemote(
"rm /tmp/" + os.path.basename(tar_file),
self.hosts)
SequentialActions([rm_dirs, put_tar, tar_xf, rm_tar]).run()
# 2. Move installation to base dir
logger.info("Create installation directories")
mv_base_dir = TaktukRemote(
"mv /tmp/" + os.path.basename(tar_file).replace(".tgz", "") + " " +
self.base_dir,
self.hosts)
mkdirs = TaktukRemote("mkdir -p " + self.conf_dir +
" && mkdir -p " + self.logs_dir,
java_major_version = 7
if not check_java_version(java_major_version, self.hosts):
msg = "Java 1.%d+ required" % java_major_version
logger.error(msg)
raise HadoopException(msg)
self.java_home = get_java_home(self.master)
# 1. Copy hadoop tar file and uncompress
logger.info("Copy " + tar_file + " to hosts and uncompress")
rm_dirs = TaktukRemote("rm -rf " + self.base_dir +
" " + self.conf_dir +
" " + self.logs_dir +
" " + self.hadoop_temp_dir,
self.hosts)
put_tar = TaktukPut(self.hosts, [tar_file], "/tmp")
tar_xf = TaktukRemote(
"tar xf /tmp/" + os.path.basename(tar_file) + " -C /tmp",
self.hosts)
rm_tar = TaktukRemote(
"rm /tmp/" + os.path.basename(tar_file),
self.hosts)
SequentialActions([rm_dirs, put_tar, tar_xf, rm_tar]).run()
# 2. Move installation to base dir and create other dirs
logger.info("Create installation directories")
mv_base_dir = TaktukRemote(
"mv /tmp/" +
os.path.basename(tar_file).replace(".tar.gz", "") + " " +
self.base_dir,
self.hosts)
mkdirs = TaktukRemote("mkdir -p " + self.conf_dir +