Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_conf_files(self, host):
    """Retrieve the XML configuration files of a host into a temp directory.

    The ``*.xml`` files found under ``self.conf_dir`` on ``host`` are listed
    with a remote ``ls`` and then fetched with ``Get`` into
    ``/tmp/mliroz_temp_hadoop/``, which is created when it does not exist.

    Args:
      host: the host the configuration files are taken from.

    Returns:
      A list with the path of every entry present in the temporary
      directory once the transfer has run.
    """

    # Ask the host which XML files live in its configuration directory.
    listing = Remote("ls " + self.conf_dir + "/*.xml", [host])
    listing.run()
    listed_names = listing.processes[0].stdout.split()
    remote_conf_files = [os.path.join(self.conf_dir, name)
                         for name in listed_names]

    # The staging directory is reused across calls; only create it if absent.
    tmp_dir = "/tmp/mliroz_temp_hadoop/"
    if not os.path.exists(tmp_dir):
        os.makedirs(tmp_dir)

    transfer = Get([host], remote_conf_files, tmp_dir)
    transfer.run()

    # Everything found in the staging directory is reported back.
    return [os.path.join(tmp_dir, entry) for entry in os.listdir(tmp_dir)]
def _get_conf_files(self, host):
    """Retrieve the ``.conf`` files of a host into a temp directory.

    The ``*.conf`` files found under ``self.conf_dir`` on ``host`` are listed
    with a remote ``ls`` and then fetched with ``Get`` into
    ``/tmp/mliroz_temp_spark/``, which is created when it does not exist.

    Args:
      host: the host the configuration files are taken from.

    Returns:
      A list with the path of every entry present in the temporary
      directory once the transfer has run.
    """

    tmp_dir = "/tmp/mliroz_temp_spark/"

    # Discover the remote configuration files.
    ls_action = Remote("ls " + self.conf_dir + "/*.conf", [host])
    ls_action.run()
    ls_output = ls_action.processes[0].stdout
    remote_conf_files = [os.path.join(self.conf_dir, item)
                         for item in ls_output.split()]

    # Prepare the staging directory (kept as-is when already present).
    if not os.path.exists(tmp_dir):
        os.makedirs(tmp_dir)

    # Fetch the files into the staging directory.
    get_action = Get([host], remote_conf_files, tmp_dir)
    get_action.run()

    staged_paths = []
    for item in os.listdir(tmp_dir):
        staged_paths.append(os.path.join(tmp_dir, item))
    return staged_paths
# NOTE(review): this is an excerpt from inside a function; the enclosing
# `def` and the original indentation are not visible here.
# Purpose: fetch the *.xml configuration files from the first cluster host
# into a temporary directory and apply the `params` replacements to them.
hosts = cluster.get_hosts()
# Copy conf files from first host in the cluster
action = Remote("ls " + self.conf_dir + "/*.xml", [hosts[0]])
action.run()
# The listing is split on whitespace, one entry per listed file.
output = action.processes[0].stdout
remote_conf_files = []
for f in output.split():
remote_conf_files.append(os.path.join(self.conf_dir, f))
# The temporary directory is only created when missing and is never emptied
# here, so entries left by an earlier run stay in it.
tmp_dir = "/tmp/mliroz_temp_hadoop/"
if not os.path.exists(tmp_dir):
os.makedirs(tmp_dir)
action = Get([hosts[0]], remote_conf_files, tmp_dir)
action.run()
# Do replacements in temp file
if conf_file:
# A specific file was requested: every parameter is applied to that file.
# NOTE(review): the meaning of the fourth argument (True) is defined by
# replace_in_xml_file, which is not visible here - confirm.
f = os.path.join(tmp_dir, conf_file)
# dict.iteritems() exists only in Python 2.
for name, value in params.iteritems():
replace_in_xml_file(f, name, value, True)
else:
# No file given: every entry of tmp_dir is a candidate (including any
# leftover from a previous run, see above).
temp_conf_files = [os.path.join(tmp_dir, f) for f in
os.listdir(tmp_dir)]
for name, value in params.iteritems():
# Try the candidates in turn and stop at the first one for which
# replace_in_xml_file returns a true value.
for f in temp_conf_files:
if replace_in_xml_file(f, name, value):
break
else:
"Local conf dir does not exist. Using default configuration")
# NOTE(review): excerpt from inside a function; the enclosing `def`, the
# preceding branch and the original indentation are not visible here.
# Purpose: work out which mandatory configuration files are not among
# base_conf_files and fetch them from the master into self.init_conf_dir.
base_conf_files = []
# NOTE(review): this binds the same object as self.conf_mandatory_files (no
# copy is made), so the remove() calls below also alter that attribute -
# presumably a list; confirm whether this is intended.
missing_conf_files = self.conf_mandatory_files
for f in base_conf_files:
# Files are matched by base name only.
f_base_name = os.path.basename(f)
if f_base_name in missing_conf_files:
missing_conf_files.remove(f_base_name)
logger.info("Copying missing conf files from master: " + str(
missing_conf_files))
# The missing names are resolved against self.conf_dir before the transfer.
remote_missing_files = [os.path.join(self.conf_dir, f)
for f in missing_conf_files]
action = Get([self.master], remote_missing_files, self.init_conf_dir)
action.run()
# NOTE(review): excerpt from inside a function; the enclosing `def` and the
# original indentation are not visible here.
# Purpose: copy remote_path to local_path, staging it under /tmp on
# self.hc.master on the way.
logger.info("Copying output to " + local_path)
tmp_dir = "/tmp"
# Remove file in tmp dir if exists
# (the staged copy is named after the base name of remote_path)
proc = SshProcess("rm -rf " +
os.path.join(tmp_dir, os.path.basename(remote_path)),
self.hc.master)
proc.run()
# Get files in master
# NOTE(review): presumably self.hc.execute runs a "fs -get" command that
# places remote_path under tmp_dir on the master - confirm against the
# definition of execute, which is not visible here.
self.hc.execute("fs -get " + remote_path + " " + tmp_dir,
verbose=False)
# Copy files from master
action = Get([self.hc.master],
[os.path.join(tmp_dir, os.path.basename(remote_path))],
local_path)
action.run()
# NOTE(review): excerpt from inside a function; the enclosing `def` and the
# original indentation are not visible here.
# Purpose: locate job history entries under <self.logs_dir>/history on the
# master with `find` and fetch them into dest.
history_dir = os.path.join(self.logs_dir, "history")
if job_ids:
# One "-name <job id>*" test per requested job, combined with "-o".
# NOTE(review): the patterns are placed in the command string unquoted; if
# the command goes through a shell on the master, the shell may expand
# them before find sees them - confirm.
pattern = " -o ".join("-name " + jid + "*" for jid in job_ids)
list_dirs = SshProcess("find " + history_dir + " " + pattern,
self.master)
list_dirs.run()
else:
# No job ids given: match every entry whose name starts with "job_"
# (same unquoted-pattern caveat as above).
list_dirs = SshProcess("find " + history_dir + " -name job_*",
self.master)
list_dirs.run()
# Each line of the find output is taken as one path to fetch.
remote_files = []
for line in list_dirs.stdout.splitlines():
remote_files.append(line)
action = Get([self.master], remote_files, dest)
action.run()