Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def verify_swap(storage, constraints, report_error, report_warning):
""" Verify the existence of swap.

When the storage has no swap devices, the installed memory is compared
with the minimum RAM constraint plus isys.NO_SWAP_EXTRA_RAM, and a
shortfall is reported through one of the reporting callbacks.

:param storage: a storage to check
:param constraints: a dictionary of constraints
:param report_error: a function for error reporting
:param report_warning: a function for warning reporting
"""
swaps = storage.fsset.swap_devices
# The checks below only run when no swap device is configured.
if not swaps:
installed = util.total_memory()
# Memory needed to install without swap, built from a "<n> MiB" string.
required = Size("%s MiB" % (constraints[STORAGE_MIN_RAM] + isys.NO_SWAP_EXTRA_RAM))
# STORAGE_SWAP_IS_RECOMMENDED is falsy: a memory shortfall is only a warning.
if not constraints[STORAGE_SWAP_IS_RECOMMENDED]:
if installed < required:
report_warning(_("You have not specified a swap partition. "
"%(requiredMem)s of memory is recommended to continue "
"installation without a swap partition, but you only "
"have %(installedMem)s.")
% {"requiredMem": required, "installedMem": installed})
# STORAGE_SWAP_IS_RECOMMENDED is truthy: a memory shortfall is an error.
else:
if installed < required:
report_error(_("You have not specified a swap partition. "
"%(requiredMem)s of memory is required to continue "
"installation without a swap partition, but you only "
"have %(installedMem)s.")
% {"requiredMem": required, "installedMem": installed})
else:
# NOTE(review): the body of this branch is not present in this excerpt.
def _get_total_space(self):
""" Total disk space requirement for this device and its container. """
space = Size(0)
# A container that already exists adds nothing: the zero size is returned.
if self.container and self.container.exists:
return space
if self.container_size == SIZE_POLICY_AUTO:
# automatic container size management
if self.container:
# combined size of the container's parents, less the container's free space
space += sum([p.size for p in self.container.parents])
space -= self.container.freeSpace
# we need to account for the LVM metadata being placed somewhere
space += self.container.lvm_metadata_space
# we need to account for the LVM metadata being placed on each disk
# (and thus taking up to one extent from each disk)
space += len(self.disks) * lvm.LVM_PE_SIZE
elif self.container_size == SIZE_POLICY_MAX:
# grow the container as large as possible
# NOTE(review): the rest of this branch is not present in this excerpt.
def _get_parents(self):
""" Get selected parents for newly created device """
parents = []
# for encrypted parents add space for LUKS metadata
if self._encryption_chooser.encrypt:
reserved_size = crypto.LUKS_METADATA_SIZE
else:
reserved_size = size.Size(0)
# A parent of type "lvmvg" is described by a single ProxyDataContainer.
if self.selected_parent.type == "lvmvg":
# For "lvmthinpool" the selected free size is scaled by POOL_RESERVED.
if self.selected_type == "lvmthinpool":
free = self.selected_free.size * POOL_RESERVED
else:
free = self.selected_free.size
parent = ProxyDataContainer(device=self.selected_parent,
free_space=self.selected_free,
min_size=self._get_parent_min_size(),
max_size=self._get_parent_max_size(self.selected_parent, free),
reserved_size=reserved_size)
parents.append(parent)
else:
# Any other parent type: walk the rows of parents_store and test row[3].
# NOTE(review): the per-row handling is not present in this excerpt.
for row in self.parents_store:
if row[3]:
:type requests: list of :class:`PartitionRequest`
.. note::
We will limit partition growth based on disklabel limitations
for partition end sector, so a 10TB disk with an msdos disklabel
will be treated like a 2TiB disk.
.. note::
If you plan to allocate aligned partitions you should pass in an
aligned geometry instance.
"""
self.geometry = geometry # parted.Geometry
self.sectorSize = Size(self.geometry.device.sectorSize)
self.path = self.geometry.device.path
super(DiskChunk, self).__init__(self.geometry.length, requests=requests)
def get_file_system_free_space(self, mount_points=("/", "/usr")):
"""Get total file system free space on the given mount points.
Calculates total free space in / and /usr, by default.
:param mount_points: a list of mount points
:return: a total size
"""
free = Size(0)
# btrfs volumes already seen, so each volume is considered only once
btrfs_volumes = []
for mount_point in mount_points:
device = self.mountpoints.get(mount_point)
# skip mount points that have no device in self.mountpoints
if not device:
continue
# don't count the size of btrfs volumes repeatedly when multiple
# subvolumes are present
if isinstance(device, BTRFSSubVolumeDevice):
if device.volume in btrfs_volumes:
continue
else:
btrfs_volumes.append(device.volume)
# NOTE(review): the body of this check is not present in this excerpt.
if device.format.exists:
def get_supported_pe_sizes():
    """Return the PE sizes reported by blockdev.lvm, each wrapped in Size."""
    raw_pe_sizes = blockdev.lvm.get_supported_pe_sizes()
    return list(map(Size, raw_pe_sizes))
# cannot be obtained the actual current size of the device.
# If it was not possible to obtain the current size of the device
# then _min_instance_size is 0, but since _resizable is False
# that information can not be used to shrink the filesystem below
# its unknown actual minimum size.
# * self._get_min_size() is only run if fsck succeeds and a current
# existing size can be obtained.
if not self.exists:
return
self._current_info = None
self._min_instance_size = Size(0)
self._resizable = self.__class__._resizable
# try to gather current size info
self._size = Size(0)
try:
if self._info.available:
self._current_info = self._info.do_task()
except FSError as e:
log.info("Failed to obtain info for device %s: %s", self.device, e)
try:
self._size = self._size_info.do_task()
except (FSError, NotImplementedError) as e:
log.warning("Failed to obtain current size for device %s: %s", self.device, e)
else:
self._min_instance_size = self._size
# We absolutely need a current size to enable resize. To shrink the
# filesystem we need a real minimum size provided by the resize
# tool. Failing that, we can default to the current size,
# effectively disabling shrink.
from parted import PARTITION_PREP
import os
import logging
log = logging.getLogger("blivet")
class PPCPRePBoot(DeviceFormat):
""" PPC PReP Boot device format ("prepboot"). """
# identifier and display name of this format
_type = "prepboot"
_name = N_("PPC PReP Boot")
# PARTITION_PREP constant imported from parted
parted_flag = PARTITION_PREP
_formattable = True # can be formatted
_linux_native = True # for clearpart
# size bounds declared for this format
_max_size = Size("10 MiB")
_min_size = Size("4 MiB")
_supported = True
def __init__(self, **kwargs):
"""
:keyword device: path to block device node
:keyword exists: whether this is an existing format
:type exists: bool
.. note::
The 'device' kwarg is required for existing formats. For non-
existent formats, it is only necessary that the :attr:`device`
attribute be set before the :meth:`create` method runs. Note
that you can specify the device at the last moment by specifying
it via the 'device' kwarg to the :meth:`create` method.
"""
self._update_labels(totalDisks, totalReclaimableSpace, 0)
description = _("You can remove existing file systems you no longer need to free up space "
"for this installation. Removing a file system will permanently delete all "
"of the data it contains.")
if canShrinkSomething:
description += "\n\n"
description += _("There is also free space available in pre-existing file systems. "
"While it's risky and we recommend you back up your data first, you "
"can recover that free disk space and make it available for this "
"installation below.")
self._reclaimDescLabel.set_text(description)
self._update_reclaim_button(Size(0))
def reset(self):
    """Clear the outcome of the previous check so it can be run again.

    Marks the check as not successful, sets the deficit to a zero Size
    and empties the error message.
    """
    self.success = False
    zero_deficit = Size(bytes=0)
    self.deficit = zero_deficit
    self.error_message = ""