def copy_function(host, files_to_copy, collector=None):
    # Copy the files to a temporary directory on the host
    action = Put([host], files_to_copy, tmp_dir)
    action.run()

    local_final_size = 0

    for f in files_to_copy:
        src_file = os.path.join(tmp_dir, os.path.basename(f))

        # Optionally transform the file (e.g., decompress it) before loading
        if self.pre_load_function:
            src_file = self.pre_load_function(src_file, host)

        # Account for the size of the file that is actually loaded
        action = SshProcess("du -b " + src_file + " | cut -f1", host)
        action.run()
        local_final_size += int(action.stdout.strip())

        # Load the file into the Hadoop filesystem
        hc.execute("fs -put " + src_file + " " +
                   os.path.join(dest, os.path.basename(src_file)),
                   host, True, False)
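# A minimal, standalone sketch of the two execo primitives used above: Put to
# copy local files to a remote host, and SshProcess to run a shell command
# there. It assumes execo exposes Put, SshProcess and Host at the package
# level; the hostname and file path are hypothetical placeholders.
from execo import Host, Put, SshProcess

example_host = Host("node-1.example.com")   # hypothetical host
local_files = ["/tmp/sample.dat"]           # hypothetical local file

# Copy the file to the remote /tmp directory
put = Put([example_host], local_files, "/tmp")
put.run()

# Ask the remote side for its size, as the helper above does with "du -b"
size_proc = SshProcess("du -b /tmp/sample.dat | cut -f1", example_host)
size_proc.run()
print(int(size_proc.stdout.strip()))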
"""
self._check_initialization()
if not self.running:
logger.warn("The cluster was stopped. Starting it automatically")
self.start()
if not node:
node = self.master
exec_dir = "/tmp"
# Copy necessary files to cluster
files_to_copy = job.get_files_to_copy()
action = Put([node], files_to_copy, exec_dir)
action.run()
# Get command
command = job.get_command(exec_dir)
# Execute
logger.info("Executing jar job. Command = {" + self.bin_dir +
"/hadoop " + command + "} in " + str(node))
proc = SshProcess(self.bin_dir + "/hadoop " + command, node)
if verbose:
red_color = '\033[01;31m'
proc.stdout_handlers.append(sys.stdout)
proc.stderr_handlers.append(
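# execute_job only relies on two things from the job object: the local files
# that must be shipped to the node and the command-line arguments to append to
# the hadoop binary. A hypothetical stand-in with that interface could look
# like this (the class name and its internals are illustrative, not part of
# the original code):
import os


class ExampleJarJob(object):

    def __init__(self, jar_path, params):
        self.jar_path = jar_path
        self.params = params

    def get_files_to_copy(self):
        # Files that must be present on the execution node
        return [self.jar_path]

    def get_command(self, exec_dir):
        # Arguments appended to "<bin_dir>/hadoop" on the remote node
        remote_jar = os.path.join(exec_dir, os.path.basename(self.jar_path))
        return "jar " + remote_jar + " " + " ".join(self.params)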
# As above, the signature is reconstructed from the body and its parameter
# defaults are assumptions.
def execute_job(self, job, node=None, verbose=True):
    """Execute the given Spark job in the given node.

    Returns (tuple of str):
      A tuple with the standard and error outputs of the process executing
      the job.
    """

    if not self.running:
        logger.warn("The cluster was stopped. Starting it automatically")
        self.start()

    if node is None:
        node = self.master

    exec_dir = "/tmp"

    # Copy necessary files to cluster
    files_to_copy = job.get_files_to_copy()
    action = Put([node], files_to_copy, exec_dir)
    action.run()

    # Get command
    command = job.get_command(exec_dir)

    # Execute
    logger.info("Executing spark job. Command = {" + self.bin_dir +
                "/spark-submit " + command + "} in " + str(node))

    proc = SshProcess(self.bin_dir + "/spark-submit " + command, node)

    if verbose:
        red_color = '\033[01;31m'

        proc.stdout_handlers.append(sys.stdout)
        # ColorDecorator is again assumed, as in the snippet above
        proc.stderr_handlers.append(
            ColorDecorator(sys.stderr, red_color))
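# ColorDecorator is referenced above but not defined in these snippets. A
# minimal sketch of such a stream wrapper, assuming it simply surrounds every
# write with the given ANSI color code and a reset (the class name here is
# hypothetical):
class ExampleColorDecorator(object):

    def __init__(self, stream, color):
        self.stream = stream
        self.color = color

    def write(self, text):
        # Emit the text in the chosen color, then reset the terminal color
        self.stream.write(self.color + text + '\033[0m')

    def flush(self):
        self.stream.flush()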
def bootstrap(self, tar_file):

    # 1. Remove used dirs if existing
    action = Remote("rm -rf " + self.base_dir, self.hc.hosts)
    action.run()
    action = Remote("rm -rf " + self.conf_dir, self.hc.hosts)
    action.run()

    # 2. Copy Mahout tar file and uncompress
    logger.info("Copy " + tar_file + " to hosts and uncompress")
    action = Put(self.hc.hosts, [tar_file], "/tmp")
    action.run()
    action = Remote(
        "tar xf /tmp/" + os.path.basename(tar_file) + " -C /tmp",
        self.hc.hosts)
    action.run()

    # 3. Move installation to base dir
    logger.info("Create installation directories")
    action = Remote(
        "mv /tmp/" +
        os.path.basename(tar_file).replace(".tar.gz", "") + " " +
        self.base_dir,
        self.hc.hosts)
    action.run()

    # 4. Create other dirs
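# bootstrap chains two execo actions: Put to ship the tarball and Remote to
# run a shell command on every host. A standalone sketch of that pattern, with
# hypothetical hostnames and paths that are not taken from the original code:
import os
from execo import Host, Put, Remote

hosts = [Host("node-1.example.com"), Host("node-2.example.com")]
tar_file = "/home/user/mahout.tar.gz"

# Ship the tarball to every host, then unpack it remotely
Put(hosts, [tar_file], "/tmp").run()
Remote("tar xf /tmp/" + os.path.basename(tar_file) + " -C /tmp", hosts).run()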
def copy_function(host, files_to_copy):
    # Copy the files to a temporary directory on the host
    action = Put([host], files_to_copy, tmp_dir)
    action.run()

    for f in files_to_copy:
        src_file = os.path.join(tmp_dir, os.path.basename(f))

        # Load each file into the Hadoop filesystem
        hc.execute("fs -put " + src_file + " " +
                   os.path.join(dest, os.path.basename(src_file)),
                   host, True, False)
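# copy_function handles a single host; a caller would typically fan it out
# over every host of the cluster. The original driver is not shown here, so
# this is just one possible way to do it with a stdlib thread pool (hosts and
# file names are hypothetical placeholders):
from concurrent.futures import ThreadPoolExecutor

from execo import Host

hosts = [Host("node-1.example.com"), Host("node-2.example.com")]
files = ["/home/user/data/part-00000"]

with ThreadPoolExecutor(max_workers=len(hosts)) as pool:
    futures = [pool.submit(copy_function, h, files) for h in hosts]
    for future in futures:
        future.result()  # re-raise any exception from a worker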