Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
put_tar = TaktukPut(self.hosts, [tar_file], "/tmp")
tar_xf = TaktukRemote(
"tar xf /tmp/" + os.path.basename(tar_file) + " -C /tmp",
self.hosts)
rm_tar = TaktukRemote(
"rm /tmp/" + os.path.basename(tar_file),
self.hosts)
SequentialActions([rm_dirs, put_tar, tar_xf, rm_tar]).run()
# 2. Move installation to base dir
logger.info("Create installation directories")
mv_base_dir = TaktukRemote(
"mv /tmp/" + os.path.basename(tar_file).replace(".tgz", "") + " " +
self.base_dir,
self.hosts)
mkdirs = TaktukRemote("mkdir -p " + self.conf_dir +
" && mkdir -p " + self.logs_dir,
self.hosts)
chmods = TaktukRemote("chmod g+w " + self.base_dir +
" && chmod g+w " + self.conf_dir +
" && chmod g+w " + self.logs_dir,
self.hosts)
SequentialActions([mv_base_dir, mkdirs, chmods]).run()
# 2.1. Create spark-events dir
if self.evs_log_dir:
if self.evs_log_dir.startswith("file://") or \
"://" not in self.evs_log_dir:
mk_evs_dir = TaktukRemote("mkdir -p " + self.evs_log_dir +
" && chmod g+w " + self.evs_log_dir,
self.hosts)
mk_evs_dir.run()
logger.info("Copy " + tar_file + " to hosts and uncompress")
rm_dirs = TaktukRemote("rm -rf " + self.base_dir +
" " + self.conf_dir,
self.hosts)
put_tar = TaktukPut(self.hosts, [tar_file], "/tmp")
tar_xf = TaktukRemote(
"tar xf /tmp/" + os.path.basename(tar_file) + " -C /tmp",
self.hosts)
rm_tar = TaktukRemote(
"rm /tmp/" + os.path.basename(tar_file),
self.hosts)
SequentialActions([rm_dirs, put_tar, tar_xf, rm_tar]).run()
# 2. Move installation to base dir
logger.info("Create installation directories")
mv_base_dir = TaktukRemote(
"mv /tmp/" + os.path.basename(tar_file).replace(".tgz", "") + " " +
self.base_dir,
self.hosts)
mkdirs = TaktukRemote("mkdir -p " + self.conf_dir +
" && mkdir -p " + self.logs_dir,
self.hosts)
chmods = TaktukRemote("chmod g+w " + self.base_dir +
" && chmod g+w " + self.conf_dir +
" && chmod g+w " + self.logs_dir,
self.hosts)
SequentialActions([mv_base_dir, mkdirs, chmods]).run()
# 2.1. Create spark-events dir
if self.evs_log_dir:
if self.evs_log_dir.startswith("file://") or \
"://" not in self.evs_log_dir:
"rm /tmp/" + os.path.basename(tar_file),
self.hosts)
SequentialActions([rm_dirs, put_tar, tar_xf, rm_tar]).run()
# 2. Move installation to base dir and create other dirs
logger.info("Create installation directories")
mv_base_dir = TaktukRemote(
"mv /tmp/" +
os.path.basename(tar_file).replace(".tar.gz", "") + " " +
self.base_dir,
self.hosts)
mkdirs = TaktukRemote("mkdir -p " + self.conf_dir +
" && mkdir -p " + self.logs_dir +
" && mkdir -p " + self.hadoop_temp_dir,
self.hosts)
chmods = TaktukRemote("chmod g+w " + self.base_dir +
" && chmod g+w " + self.conf_dir +
" && chmod g+w " + self.logs_dir +
" && chmod g+w " + self.hadoop_temp_dir,
self.hosts)
SequentialActions([mv_base_dir, mkdirs, chmods]).run()
# 4. Specify environment variables
command = "cat >> " + self.conf_dir + "/hadoop-env.sh << EOF\n"
command += "export JAVA_HOME=" + self.java_home + "\n"
command += "export HADOOP_LOG_DIR=" + self.logs_dir + "\n"
command += "HADOOP_HOME_WARN_SUPPRESS=\"TRUE\"\n"
command += "EOF"
action = Remote(command, self.hosts)
action.run()
# 5. Check version (cannot do it before)
"export DEBIAN_MASTER=noninteractive ; " +
"apt-get update && apt-get install -y --force-yes " +
required_packages, self.hosts).run()
if not install_packages.ok:
logger.error("Unable to install the packages")
get_java_home = SshProcess('echo $(readlink -f /usr/bin/javac | '
'sed "s:/bin/javac::")', self.master)
get_java_home.run()
self.java_home = get_java_home.stdout.strip()
logger.info("All required packages are present")
# 1. Copy Hive tar file and uncompress
logger.info("Copy " + tar_file + " to hosts and uncompress")
rm_dirs = TaktukRemote("rm -rf " + self.base_dir +
" " + self.conf_dir +
" " + self.warehouse_dir +
" " + self.logs_dir,
self.hosts)
put_tar = TaktukPut(self.hosts, [tar_file], "/tmp")
tar_xf = TaktukRemote("tar xf /tmp/" + os.path.basename(tar_file) +
" -C /tmp", self.hosts)
SequentialActions([rm_dirs, put_tar, tar_xf]).run()
# 2. Move installation to base dir
logger.info("Create installation directories")
mv_base_dir = TaktukRemote(
"mv /tmp/" +
os.path.basename(tar_file).replace(".tar.gz", "") + " " +
self.base_dir,
self.hosts)
tar_file (str):
The file containing Hadoop binaries.
"""
# 0. Check requirements
java_major_version = 7
if not check_java_version(java_major_version, self.hosts):
msg = "Java 1.%d+ required" % java_major_version
logger.error(msg)
raise HadoopException(msg)
self.java_home = get_java_home(self.master)
# 1. Copy hadoop tar file and uncompress
logger.info("Copy " + tar_file + " to hosts and uncompress")
rm_dirs = TaktukRemote("rm -rf " + self.base_dir +
" " + self.conf_dir +
" " + self.logs_dir +
" " + self.hadoop_temp_dir,
self.hosts)
put_tar = TaktukPut(self.hosts, [tar_file], "/tmp")
tar_xf = TaktukRemote(
"tar xf /tmp/" + os.path.basename(tar_file) + " -C /tmp",
self.hosts)
rm_tar = TaktukRemote(
"rm /tmp/" + os.path.basename(tar_file),
self.hosts)
SequentialActions([rm_dirs, put_tar, tar_xf, rm_tar]).run()
# 2. Move installation to base dir and create other dirs
logger.info("Create installation directories")
mv_base_dir = TaktukRemote(
def check_java_version(java_major_version, hosts):
tr = TaktukRemote("java -version 2>&1 | grep version", hosts)
tr.run()
for p in tr.processes:
match = re.match('.*[^.0-9]1\.([0-9]+).[0-9].*', p.stdout)
version = int(match.group(1))
if java_major_version > version:
msg = "Java 1.%d+ required" % java_major_version
return False
return True
" " + self.conf_dir +
" " + self.logs_dir +
" " + self.hadoop_temp_dir,
self.hosts)
put_tar = TaktukPut(self.hosts, [tar_file], "/tmp")
tar_xf = TaktukRemote(
"tar xf /tmp/" + os.path.basename(tar_file) + " -C /tmp",
self.hosts)
rm_tar = TaktukRemote(
"rm /tmp/" + os.path.basename(tar_file),
self.hosts)
SequentialActions([rm_dirs, put_tar, tar_xf, rm_tar]).run()
# 2. Move installation to base dir and create other dirs
logger.info("Create installation directories")
mv_base_dir = TaktukRemote(
"mv /tmp/" +
os.path.basename(tar_file).replace(".tar.gz", "") + " " +
self.base_dir,
self.hosts)
mkdirs = TaktukRemote("mkdir -p " + self.conf_dir +
" && mkdir -p " + self.logs_dir +
" && mkdir -p " + self.hadoop_temp_dir,
self.hosts)
chmods = TaktukRemote("chmod g+w " + self.base_dir +
" && chmod g+w " + self.conf_dir +
" && chmod g+w " + self.logs_dir +
" && chmod g+w " + self.hadoop_temp_dir,
self.hosts)
SequentialActions([mv_base_dir, mkdirs, chmods]).run()
# 4. Specify environment variables
put_tar = TaktukPut(self.hosts, [tar_file], "/tmp")
tar_xf = TaktukRemote("tar xf /tmp/" + os.path.basename(tar_file) +
" -C /tmp", self.hosts)
SequentialActions([rm_dirs, put_tar, tar_xf]).run()
# 2. Move installation to base dir
logger.info("Create installation directories")
mv_base_dir = TaktukRemote(
"mv /tmp/" +
os.path.basename(tar_file).replace(".tar.gz", "") + " " +
self.base_dir,
self.hosts)
mkdirs = TaktukRemote("mkdir -p " + self.conf_dir +
" && mkdir -p " + self.warehouse_dir,
self.hosts)
chmods = TaktukRemote("chmod g+w " + self.base_dir +
" && chmod g+w " + self.conf_dir +
" && chmod g+w " + self.warehouse_dir,
self.hosts)
SequentialActions([mv_base_dir, mkdirs, chmods]).run()
# 3. Specify environment variables
command = "cat >> " + self.conf_dir + "/hive-env.sh << EOF\n"
command += "JAVA_HOME=" + self.java_home + "\n"
command += "HIVE_HOME=" + self.base_dir + "\n"
command += "HIVE_CONF_DIR=" + self.conf_dir + "\n"
command += "HADOOP_HOME=" + self.hc.base_dir + "\n"
command += "EOF\n"
command += "chmod +x " + self.conf_dir + "/hive-env.sh"
action = Remote(command, self.hosts)
action.run()
get_java_home = SshProcess('echo $(readlink -f /usr/bin/javac | '
'sed "s:/bin/javac::")', self.master)
get_java_home.run()
self.java_home = get_java_home.stdout.strip()
logger.info("All required packages are present")
# 1. Copy Hive tar file and uncompress
logger.info("Copy " + tar_file + " to hosts and uncompress")
rm_dirs = TaktukRemote("rm -rf " + self.base_dir +
" " + self.conf_dir +
" " + self.warehouse_dir +
" " + self.logs_dir,
self.hosts)
put_tar = TaktukPut(self.hosts, [tar_file], "/tmp")
tar_xf = TaktukRemote("tar xf /tmp/" + os.path.basename(tar_file) +
" -C /tmp", self.hosts)
SequentialActions([rm_dirs, put_tar, tar_xf]).run()
# 2. Move installation to base dir
logger.info("Create installation directories")
mv_base_dir = TaktukRemote(
"mv /tmp/" +
os.path.basename(tar_file).replace(".tar.gz", "") + " " +
self.base_dir,
self.hosts)
mkdirs = TaktukRemote("mkdir -p " + self.conf_dir +
" && mkdir -p " + self.warehouse_dir,
self.hosts)
chmods = TaktukRemote("chmod g+w " + self.base_dir +
" && chmod g+w " + self.conf_dir +
" && chmod g+w " + self.warehouse_dir,
def bootstrap(self, tar_file):
# 0. Check requirements
java_major_version = 7
if not check_java_version(java_major_version, self.hosts):
msg = "Java 1.%d+ required" % java_major_version
logger.error(msg)
raise SparkException(msg)
self.java_home = get_java_home(self.master)
# 1. Copy hadoop tar file and uncompress
logger.info("Copy " + tar_file + " to hosts and uncompress")
rm_dirs = TaktukRemote("rm -rf " + self.base_dir +
" " + self.conf_dir,
self.hosts)
put_tar = TaktukPut(self.hosts, [tar_file], "/tmp")
tar_xf = TaktukRemote(
"tar xf /tmp/" + os.path.basename(tar_file) + " -C /tmp",
self.hosts)
rm_tar = TaktukRemote(
"rm /tmp/" + os.path.basename(tar_file),
self.hosts)
SequentialActions([rm_dirs, put_tar, tar_xf, rm_tar]).run()
# 2. Move installation to base dir
logger.info("Create installation directories")
mv_base_dir = TaktukRemote(
"mv /tmp/" + os.path.basename(tar_file).replace(".tgz", "") + " " +
self.base_dir,