def test_field_dump_decorator(self):
    test_field = Field('test_field')
    dump_function = str
    test_field = test_field.dump(dump_function)
    assert test_field.to_str == dump_function

def test_field_load_decorator(self):
    test_field = Field('test_field')
    load_function = int
    test_field = test_field.load(load_function)
    assert test_field.from_str == load_function
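The two tests above exercise the dump() and load() decorators of Field. Purely as an illustration, here is a minimal, hypothetical sketch of a class that would satisfy them; it is an assumption about the interface, not the project's actual implementation:

class _FieldSketch(object):
    # Hypothetical stand-in for Field, for illustration only.
    def __init__(self, name, dump=None, load=None, default=None, doc=None):
        self.name = name
        self.to_str = dump      # callable used to serialise the field
        self.from_str = load    # callable used to parse the field
        self.default = default
        self.__doc__ = doc

    def dump(self, to_str):
        # decorator usage: field.dump(fn) registers fn and returns the field
        self.to_str = to_str
        return self

    def load(self, from_str):
        # decorator usage: field.load(fn) registers fn and returns the field
        self.from_str = from_str
        return self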
class DummyFieldListFile(FieldListFile):
    dummy = Field('dummy', dump=str, load=int, default=12, doc='dummy_field')
# noinspection PyMethodMayBeStatic
class TestFieldListFile(object):
    def test_field_list_file_creation(self):
        with pytest.raises(AttributeError):
            FieldListFile(test_argument=11)

        field = FieldListFile()
        assert field

    def test_subclass_creation(self):
        with pytest.raises(AttributeError):
            DummyFieldListFile(test_argument=11)

        field = DummyFieldListFile()
def test_field_creation(self):
    field = Field('test_field')
    assert field
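As a short, hypothetical usage sketch of the DummyFieldListFile subclass defined above (assuming, as the tests suggest, that known keyword arguments populate the matching fields and that a field's default is returned until a value is assigned):

dummy_file = DummyFieldListFile()
print(dummy_file.dummy)      # expected: 12, the declared default

dummy_file = DummyFieldListFile(dummy=13)
print(dummy_file.dummy)      # expected: 13, the keyword argument wins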
            # tail of FieldListFile.items(), which yields each populated
            # field as a (name, value) pair
            yield (name, value)

    def __repr__(self):
        return "%s(%s)" % (
            self.__class__.__name__,
            ', '.join(['%s=%r' % x for x in self.items()]))
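For illustration only, and assuming items() yields each populated field as a (name, value) pair as the fragment above suggests, the resulting repr of the dummy subclass might look like this:

dummy_file = DummyFieldListFile(dummy=13)
print(repr(dummy_file))      # e.g. DummyFieldListFile(dummy=13)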
class WalFileInfo(FieldListFile):
    """
    Metadata of a WAL file.
    """

    __slots__ = ('orig_filename',)

    name = Field('name', doc='base name of WAL file')
    size = Field('size', load=int, doc='WAL file size after compression')
    time = Field('time', load=float, doc='WAL file modification time '
                                         '(seconds since epoch)')
    compression = Field('compression', doc='compression type')

    @classmethod
    def from_file(cls, filename, unidentified_compression=None, **kwargs):
        """
        Factory method to generate a WalFileInfo from a WAL file.

        Every keyword argument will override any attribute from the provided
        file. If a keyword argument doesn't have a corresponding attribute,
        an AttributeError exception is raised.

        :param str filename: the file to inspect
        :param str unidentified_compression: the compression to set if
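A hypothetical usage sketch of the factory method above; the WAL path is invented for the example, and only the filename argument and the keyword-override behaviour described in the docstring are relied upon:

wal_info = WalFileInfo.from_file(
    '/var/lib/barman/main/wals/0000000100000000/000000010000000000000001',
    compression='gzip',   # keyword arguments override the inspected values
)
print(wal_info.name, wal_info.size, wal_info.time, wal_info.compression)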
class BackupInfo(FieldListFile):
    #: Backup completion statuses; EMPTY, STARTED and FAILED are referenced
    #: by the STATUS_* tuples and the ``status`` field below
    EMPTY = 'EMPTY'
    STARTED = 'STARTED'
    FAILED = 'FAILED'
    WAITING_FOR_WALS = 'WAITING_FOR_WALS'
    DONE = 'DONE'
    SYNCING = 'SYNCING'
    STATUS_COPY_DONE = (WAITING_FOR_WALS, DONE)
    STATUS_ALL = (EMPTY, STARTED, WAITING_FOR_WALS, DONE, SYNCING, FAILED)
    STATUS_NOT_EMPTY = (STARTED, WAITING_FOR_WALS, DONE, SYNCING, FAILED)
    STATUS_ARCHIVING = (STARTED, WAITING_FOR_WALS, DONE, SYNCING)

    #: Status according to retention policies
    OBSOLETE = 'OBSOLETE'
    VALID = 'VALID'
    POTENTIALLY_OBSOLETE = 'OBSOLETE*'
    NONE = '-'
    RETENTION_STATUS = (OBSOLETE, VALID, POTENTIALLY_OBSOLETE, NONE)

    version = Field('version', load=int)
    pgdata = Field('pgdata')
    # Parse the tablespaces as a literal Python list of namedtuple
    # Output the tablespaces as a literal Python list of tuple
    tablespaces = Field('tablespaces', load=load_tablespace_list,
                        dump=output_tablespace_list)
    # Timeline is an integer
    timeline = Field('timeline', load=int)
    begin_time = Field('begin_time', load=load_datetime_tz)
    begin_xlog = Field('begin_xlog')
    begin_wal = Field('begin_wal')
    begin_offset = Field('begin_offset', load=int)
    size = Field('size', load=int)
    deduplicated_size = Field('deduplicated_size', load=int)
    end_time = Field('end_time', load=load_datetime_tz)
    end_xlog = Field('end_xlog')
    end_wal = Field('end_wal')
    end_offset = Field('end_offset', load=int)
    status = Field('status', default=EMPTY)
    server_name = Field('server_name')
    error = Field('error')
    mode = Field('mode')
    config_file = Field('config_file')
    hba_file = Field('hba_file')
    ident_file = Field('ident_file')
    included_files = Field('included_files',
                           load=ast.literal_eval, dump=null_repr)
    backup_label = Field('backup_label', load=ast.literal_eval, dump=null_repr)
    copy_stats = Field('copy_stats', load=ast.literal_eval, dump=null_repr)
    xlog_segment_size = Field('xlog_segment_size', load=int,
                              default=xlog.DEFAULT_XLOG_SEG_SIZE)
    systemid = Field('systemid')

    __slots__ = 'backup_id', 'backup_version'
    def __init__(self, backup_id, **kwargs):
        """
        Stores meta information about a single backup

        :param str,None backup_id:
        """
        self.backup_version = 2
        self.backup_id = backup_id
        super(BackupInfo, self).__init__(**kwargs)

    def get_required_wal_segments(self):
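Finally, a hedged sketch of constructing a BackupInfo instance directly; the backup id and field values are invented for the example, and only names visible above are used:

info = BackupInfo('20240101T000000',
                  server_name='main',
                  status=BackupInfo.STARTED,
                  timeline=1)
assert info.backup_id == '20240101T000000'
assert info.backup_version == 2
print(repr(info))    # rendered by FieldListFile.__repr__ via items()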