Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): this excerpt starts inside a method (presumably
# set_virt_file_size, which it calls recursively); the def line and the
# condition that selects the comma-separated branch are outside this view.
# Comma-separated input: validate every token on its own, then keep the
# original unsplit string as the stored value.
tokens = num.split(",")
for t in tokens:
# hack to run validation on each
self.set_virt_file_size(t)
# if no exceptions raised, good enough
self.virt_file_size = num
return
# Single value: accepted only when it is a non-negative whole number.
try:
inum = int(num)
# int and float conversions disagree when the value has a fractional part
if inum != float(num):
raise CX(_("invalid virt file size (%s)" % num))
if inum >= 0:
self.virt_file_size = inum
return
# negative values end up here
raise CX(_("invalid virt file size (%s)" % num))
# NOTE(review): the bare except also catches the CX raised above and
# replaces it with a new CX carrying the same message.
except:
raise CX(_("invalid virt file size (%s)" % num))
def copyfile(src, dst, api=None, logger=None):
    """
    Copy a file, or a whole directory tree, from src to dst.

    @param: str src (path of the file or directory to copy)
    @param: str dst (destination path)
    @param: api (not used by this function)
    @param: logger (optional; its info() method is called with a
            "copying: src -> dst" message before the copy starts)
    @returns: None
    @raises: CX when the copy fails and src is not readable; the original
             error when the copy fails and src and dst are the same file
    """
    try:
        if logger is not None:
            logger.info("copying: %s -> %s" % (src, dst))
        if os.path.isdir(src):
            shutil.copytree(src, dst)
        else:
            shutil.copyfile(src, dst)
    except Exception:
        # Exception instead of a bare except: KeyboardInterrupt and SystemExit
        # must never be absorbed by the best-effort handling below.
        if not os.access(src, os.R_OK):
            raise CX(_("Cannot read: %s") % src)
        # os.path.samefile stats both paths, so a missing dst surfaces here
        # as an OSError of its own.
        if os.path.samefile(src, dst):
            # accommodate for the possibility that we already copied
            # the file as a symlink/hardlink
            raise
        # NOTE(review): every other failure is deliberately left unreported,
        # as before. The comment above suggests the samefile test may have
        # been meant the other way round - confirm before changing it.
# NOTE(review): excerpt starts partway through an if/elif chain; the leading
# `if` branch and the enclosing def are outside this view.
# Choose the API finder that corresponds to ref's type and look up an
# existing object with the same name.
elif isinstance(ref, profile.Profile):
match = self.api.find_profile(ref.name)
elif isinstance(ref, distro.Distro):
match = self.api.find_distro(ref.name)
elif isinstance(ref, repo.Repo):
match = self.api.find_repo(ref.name)
elif isinstance(ref, image.Image):
match = self.api.find_image(ref.name)
elif isinstance(ref, mgmtclass.Mgmtclass):
match = self.api.find_mgmtclass(ref.name)
elif isinstance(ref, package.Package):
match = self.api.find_package(ref.name)
elif isinstance(ref, file.File):
match = self.api.find_file(ref.name)
else:
raise CX("internal error, unknown object type")
# a truthy lookup result means the name is already taken
if match:
raise CX(_("An object already exists with that name. Try 'edit'?"))
# the duplicate mac/ip checks can be disabled.
if not check_for_duplicate_netinfo:
return
# For systems, collect each interface's MAC, IP and DNS name; presumably these
# feed duplicate checks that lie past the end of this excerpt.
if isinstance(ref, system.System):
for (name, intf) in list(ref.interfaces.items()):
match_ip = []
match_mac = []
match_hosts = []
input_mac = intf["mac_address"]
input_ip = intf["ip_address"]
input_dns = intf["dns_name"]
def check_if_valid(self):
    """
    Confirm that every mandatory attribute of this object is populated.

    name, path, owner, group and mode must all be truthy; they are checked
    in that order and the first one found missing is reported. A template is
    demanded only when is_dir is falsy.

    @raises: CX describing the first requirement that is not met
    """
    mandatory = (
        ("name", "name is required"),
        ("path", "path is required"),
        ("owner", "owner is required"),
        ("group", "group is required"),
        ("mode", "mode is required"),
    )
    for attribute, complaint in mandatory:
        if getattr(self, attribute):
            continue
        raise CX(complaint)
    # directories need no template, so nothing further to verify for them
    if self.is_dir:
        return
    if self.template == "":
        raise CX("Template is required when not a directory")
def set_ipv6_address(self, address, interface):
    """
    Store an IPv6 address on one of this object's interfaces.

    The address is first passed through validate.ipv6_address. When the
    result is non-empty and the allow_duplicate_ips setting evaluates to
    False, the address must not be in use by a system with a different name.

    @param: str address (ip address)
    @param: str interface (interface name)
    @returns: None
    @raises: CX when a differently named system already holds the address
    """
    address = validate.ipv6_address(address)
    if address != "":
        duplicates_allowed = utils.input_boolean(self.collection_mgr._settings.allow_duplicate_ips)
        if duplicates_allowed is False:
            holders = self.collection_mgr.api.find_items("system", {"ipv6_address": address})
            # a hit on this very object is not a conflict
            if any(holder.name != self.name for holder in holders):
                raise CX("IP address duplicated: %s" % address)
    self.__get_interface(interface)["ipv6_address"] = address
# NOTE(review): excerpt from the middle of a function; ilen, info, interfaces,
# hostname and profile are bound outside this view.
if ilen >= 64:
raise CX("too many interfaces submitted")
# validate things first
name = info.get("name", "")
inames = list(interfaces.keys())
if self.api.find_system(name=name):
raise CX("system name conflicts")
if hostname != "" and self.api.find_system(hostname=hostname):
raise CX("hostname conflicts")
# Per-interface checks: no CIDR notation in the IP, a MAC address is
# mandatory, and neither the MAC nor the IP may match an existing system.
for iname in inames:
mac = info["interfaces"][iname].get("mac_address", "")
ip = info["interfaces"][iname].get("ip_address", "")
if ip.find("/") != -1:
raise CX("no CIDR ips are allowed")
if mac == "":
raise CX("missing MAC address for interface %s" % iname)
# NOTE(review): given the raise just above, this test looks always true.
if mac != "":
system = self.api.find_system(mac_address=mac)
if system is not None:
raise CX("mac conflict: %s" % mac)
if ip != "":
system = self.api.find_system(ip_address=ip)
if system is not None:
raise CX("ip conflict: %s" % ip)
# looks like we can go ahead and create a system now
obj = self.api.new_system()
obj.set_profile(profile)
obj.set_name(name)
# NOTE(review): the body of this branch is cut off; what follows is a
# separate excerpt whose def line is outside this view.
if hostname != "":
"""
Add an object to the collection
with_copy is a bit of a misnomer, but lots of internal add operations
can run with "with_copy" as False. True means a real final commit, as if
entered from the command line (or basically, by a user).
With with_copy as False, the particular add call might just be being run
during deserialization, in which case extra semantics around the add don't really apply.
So, in that case, don't run any triggers and don't deal with any actual files.
"""
# NOTE(review): ref is handed to remove_from_cache before the None check below.
item_base.Item.remove_from_cache(ref)
if ref is None:
raise CX("Unable to add a None object")
if ref.name is None:
raise CX("Unable to add an object without a name")
ref.check_if_valid()
# give the object a uid if it does not have one yet
if ref.uid == '':
ref.uid = self.collection_mgr.generate_uid()
# when saving, mtime is set to the current time; ctime only if it is still 0
if save is True:
now = time.time()
if ref.ctime == 0:
ref.ctime = now
ref.mtime = now
# create the CobblerLiteSync helper the first time it is needed
if self.lite_sync is None:
self.lite_sync = litesync.CobblerLiteSync(self.collection_mgr, logger=logger)
# migration path for old API parameter that I've renamed.
def write_dhcp_file(self):
"""
DHCP files are written when manage_dhcp is set in
/etc/cobbler/settings.
"""
# output file and the template read below
settings_file = "/etc/dnsmasq.conf"
template_file = "/etc/cobbler/dnsmasq.template"
# NOTE(review): the template is opened for reading, yet the message raised
# on failure says "writing".
try:
f2 = open(template_file, "r")
except:
raise CX(_("error writing template to file: %s") % template_file)
# NOTE(review): the "" assignment is overwritten by the read on the next line.
template_data = ""
template_data = f2.read()
f2.close()
system_definitions = {}
counter = 0
# we used to just loop through each system, but now we must loop
# through each network interface of each system.
for system in self.systems:
# skip systems for which is_management_supported(cidr_ok=False) is falsy
if not system.is_management_supported(cidr_ok=False):
continue
profile = system.get_conceptual_parent()
# NOTE(review): the remainder of this loop and method is not visible here.
def write_genders_file(config, profiles_genders, distros_genders, mgmtcls_genders):
    """
    genders file is over-written when manage_genders is set in
    /var/lib/cobbler/settings.

    Reads the template named by template_file and renders it to
    settings_file (both names are resolved outside this function), passing
    the current UTC time and the three genders arguments as template
    metadata.

    @param: config (handed to cobbler.templar.Templar)
    @param: profiles_genders (exposed to the template as "profiles_genders")
    @param: distros_genders (exposed to the template as "distros_genders")
    @param: mgmtcls_genders (exposed to the template as "mgmtcls_genders")
    @raises: CX when the template cannot be opened
    """
    templar_inst = cobbler.templar.Templar(config)
    try:
        template_fd = open(template_file, "r")
    except Exception as err:
        # Exception rather than a bare except, so interpreter-exit signals
        # are not converted into a CX.
        raise CX(_("error reading template: %s") % template_file) from err
    # the with-block closes the handle even when read() fails
    with template_fd:
        template_data = template_fd.read()
    metadata = {
        "date": time.asctime(time.gmtime()),
        "profiles_genders": profiles_genders,
        "distros_genders": distros_genders,
        "mgmtcls_genders": mgmtcls_genders
    }
    templar_inst.render(template_data, metadata, settings_file, None)
# NOTE(review): excerpt from inside a method; raw_data, search_table and
# fix_cheetah_class are bound outside this view.
# now do full templating scan, where we will also templatify the snippet insertions
t = Template(source=raw_data, searchList=[search_table], compilerSettings={'useStackFrame': False})
# when fix_cheetah_class is set, rebind SNIPPET and read_snippet so that the
# template instance is supplied as their first argument
if fix_cheetah_class:
t.SNIPPET = functools.partial(t.SNIPPET, t)
t.read_snippet = functools.partial(t.read_snippet, t)
# Render the template. Errors reported by the template's error catcher are
# kept in self.last_errors and logged as warnings; an exception raised while
# rendering is logged and replaced by a CX.
try:
data_out = t.respond()
self.last_errors = t.errorCatcher().listErrors()
if self.last_errors:
self.logger.warning("errors were encountered rendering the template")
self.logger.warning("\n" + pprint.pformat(self.last_errors))
except Exception as e:
self.logger.error(utils.cheetah_exc(e))
raise CX("Error templating file, check cobbler.log for more details")
return data_out