Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_meta(self, key):
    """Return the normalized metadata value stored under ``key``.

    The value is fetched with a ``GET`` request on ``self.session`` using
    ``key`` as the request path, and the response text is passed through
    ``normalize_meta_value``.

    :param key: Name of the metadata entry; used verbatim as request path.
    :returns: The normalized value, or an empty string when the request
        raises ``exceptions.NotFoundError``.
    """
    try:
        response = self.session.request('GET', key)
        meta = normalize_meta_value(response.text)
    except exceptions.NotFoundError:
        # A missing entry is reported as "no value" rather than an error.
        meta = u''
    return meta
def test_metadata_normalization(self, requires_metadata, s, value):
    """Check that metadata read from storage ``s`` is always normalized.

    First asserts that the current ``displayname`` is already a fixed point
    of ``normalize_meta_value``.  Then, unless the test class defines a
    truthy ``dav_server`` attribute, writes ``value`` and asserts that
    reading it back yields ``normalize_meta_value(value)``.

    ``requires_metadata`` is not used in the body (presumably a fixture
    requested for its side effects -- confirm against the test setup).
    """
    current = s.get_meta('displayname')
    assert current == normalize_meta_value(current)

    if getattr(self, 'dav_server', None):
        # ownCloud replaces "" with "unnamed", so the round-trip check
        # below is skipped whenever a DAV server is configured.
        return

    s.set_meta('displayname', value)
    stored = s.get_meta('displayname')
    assert stored == normalize_meta_value(value)
# Text values restricted to strings that ``normalize_meta_value`` leaves
# unchanged, i.e. values that are already in normalized form.
# (``st`` is presumably ``hypothesis.strategies`` -- confirm against the
# file's imports.)
values = st.text().filter(
    lambda text: normalize_meta_value(text) == text
)
# Dictionaries whose keys come from ``keys`` (defined elsewhere) and whose
# values come from the already-normalized ``values`` above.
metadata = st.dictionaries(keys, values)
def set_meta(self, key, value):
    """Store ``value`` as the metadata entry ``key`` via a ``PUT`` request.

    The value is normalized with ``normalize_meta_value`` and sent UTF-8
    encoded as a ``text/plain`` body, using ``key`` as the request path.

    :param key: Name of the metadata entry; used verbatim as request path.
    :param value: The value to store; normalized before sending.
    """
    payload = normalize_meta_value(value).encode('utf-8')
    content_type = {'Content-Type': 'text/plain; charset=utf-8'}
    self.session.request('PUT', key, data=payload, headers=content_type)
'''.format(
etree.tostring(etree.Element(xpath), encoding='unicode')
).encode('utf-8')
headers = self.session.get_default_headers()
headers['Depth'] = '0'
response = self.session.request(
'PROPFIND', '',
data=data, headers=headers
)
root = _parse_xml(response.content)
for prop in root.findall('.//' + xpath):
text = normalize_meta_value(getattr(prop, 'text', None))
if text:
return text
return ''
def set_meta(self, key, value):
    """Set the metadata property ``key`` to ``value`` with ``PROPPATCH``.

    ``key`` is looked up in ``self._property_table`` to obtain the XML tag
    name and namespace of the property.  An element with that qualified
    name is created, its text is set to the normalized ``value``, and the
    serialized element is sent as the UTF-8 encoded request body.

    :param key: Metadata name; must be a key of ``self._property_table``.
    :param value: The value to store; passed through
        ``normalize_meta_value`` first.
    :raises exceptions.UnsupportedMetadataError: if ``key`` has no entry
        in ``self._property_table``.  The original ``KeyError`` is attached
        as ``__cause__``.
    """
    try:
        tagname, namespace = self._property_table[key]
    except KeyError as err:
        # Chain explicitly so the traceback shows the lookup failure as the
        # direct cause instead of an "exception during handling" context.
        raise exceptions.UnsupportedMetadataError() from err

    # etree expects namespaced tags in the ``{namespace}tagname`` form.
    lxml_selector = '{%s}%s' % (namespace, tagname)
    element = etree.Element(lxml_selector)
    element.text = normalize_meta_value(value)

    serialized = etree.tostring(element, encoding='unicode')
    # NOTE(review): the body template consists only of the serialized
    # property element surrounded by newlines.  A WebDAV PROPPATCH body is
    # normally a ``propertyupdate`` document -- confirm that the wrapper
    # markup was not lost from this template.
    data = '\n{}\n'.format(serialized).encode('utf-8')

    self.session.request(
        'PROPPATCH', '',
        data=data, headers=self.session.get_default_headers()
    )
'''.format(
etree.tostring(etree.Element(xpath), encoding='unicode')
).encode('utf-8')
headers = self.session.get_default_headers()
headers['Depth'] = '0'
response = self.session.request(
'PROPFIND', '',
data=data, headers=headers
)
root = _parse_xml(response.content)
for prop in root.findall('.//' + xpath):
text = normalize_meta_value(getattr(prop, 'text', None))
if text:
return text
return u''
_a_to_b()
elif conflict_resolution == 'b wins':
_b_to_a()
else:
if callable(conflict_resolution):
logger.warning('Custom commands don\'t work on metasync.')
elif conflict_resolution is not None:
raise exceptions.UserError(
'Invalid conflict resolution setting.'
)
raise MetaSyncConflict(key)
# Reconcile every metadata key between the two storages, using the value
# recorded in ``status`` as the last known common state.
#
# NOTE(review): ``_resolve_conflict``, ``_a_to_b`` and ``_b_to_a`` are called
# without arguments, so they presumably read ``key``, ``a`` and ``b`` from
# this scope -- keep these names unchanged.
for key in keys:
    a = storage_a.get_meta(key)
    b = storage_b.get_meta(key)
    s = normalize_meta_value(status.get(key))
    logger.debug(f'Key: {key}')
    logger.debug(f'A: {a}')
    logger.debug(f'B: {b}')
    logger.debug(f'S: {s}')

    a_changed = a != s
    b_changed = b != s

    if a_changed and b_changed:
        # Both sides differ from the recorded state.
        _resolve_conflict()
    elif a_changed:
        # Only A differs from the recorded state.
        _a_to_b()
    elif b_changed:
        # Only B differs from the recorded state.
        _b_to_a()
    else:
        # Neither side differs from the recorded state, so they must agree.
        assert a == b

# Drop status entries for keys that are not part of ``keys``.
stale_keys = set(status) - set(keys)
for key in stale_keys:
    del status[key]
def get_meta(self, key):
    """Return the normalized metadata value stored under ``key``.

    The raw value is read from ``self.metadata`` with ``.get``, so a
    missing key yields ``None`` before normalization.

    :param key: Name of the metadata entry.
    :returns: Whatever ``normalize_meta_value`` returns for the raw value.
    """
    raw = self.metadata.get(key)
    return normalize_meta_value(raw)
def get_meta(self, key):
    """Return the normalized metadata value stored in the file ``key``.

    The file is looked up inside ``self.path``, read in binary mode and
    decoded with ``self.encoding`` before being normalized.

    :param key: Name of the metadata entry; used as the file name.
    :returns: The normalized file content, or an empty string if the file
        does not exist.
    :raises OSError: for any I/O failure other than a missing file.
    """
    meta_path = os.path.join(self.path, key)
    try:
        with open(meta_path, 'rb') as handle:
            raw = handle.read()
            return normalize_meta_value(raw.decode(self.encoding))
    except OSError as err:
        # Only "no such file" means "no value"; everything else propagates.
        if err.errno != errno.ENOENT:
            raise
        return ''