Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, path, check_for_duplicate_uids=True):
    """Create a store backed by the directory at ``path``.

    Any existing configuration file in that directory is loaded, and
    ``self.config`` is set up so that saving it rewrites that file.

    :param path: Directory that holds the items and the config file
    :param check_for_duplicate_uids: Stored on the instance as
        ``_check_for_duplicate_uids``; not otherwise used here
    """
    super(VdirStore, self).__init__(MemoryIndex())
    self.path = path
    self._check_for_duplicate_uids = check_for_duplicate_uids
    # Lookup tables between file names and UIDs; both start out empty.
    # NOTE(review): the exact value layout is decided by the code that
    # fills them, which is not part of this method.
    self._fname_to_uid = {}
    self._uid_to_fname = {}

    # ConfigParser.read() skips files that do not exist, so a missing
    # config file simply yields an empty configuration.  The encoding is
    # pinned to UTF-8 so the result does not depend on the host locale.
    cp = configparser.ConfigParser()
    cp.read([os.path.join(self.path, CONFIG_FILENAME)], encoding='utf-8')

    def save_config(cp, message):
        # ``message`` belongs to the save-callback signature; it is not
        # used when writing a plain file.
        config_path = os.path.join(self.path, CONFIG_FILENAME)
        # Write with the same explicit encoding used for reading above.
        with open(config_path, 'w', encoding='utf-8') as f:
            cp.write(f)

    self.config = FileBasedCollectionMetadata(cp, save=save_config)
# NOTE(review): orphaned fragment - the header of the enclosing function is
# not visible here, so only what these lines themselves do is described.
# Save callback: serialises the parser and hands the result to
# self._import_one under CONFIG_FILENAME.
def save_config(cp, message):
# Render the parser into an in-memory text buffer.
f = StringIO()
cp.write(f)
# Pass the UTF-8 encoded text as a single-chunk list, together with
# ``message``, to self._import_one.
self._import_one(
CONFIG_FILENAME, [f.getvalue().encode('utf-8')],
message)
# Wrap the parser so that saving it goes through save_config above.
return FileBasedCollectionMetadata(cp, save=save_config)
def _iterblobs(self, ctag=None):
    """Iterate over the entries of a tree, leaving out the config file.

    :param ctag: Key of the tree to look up in the repository's object
        store; when None, the tree from ``_get_current_tree()`` is used
    :yield: (name, mode, sha) tuples, with ``name`` decoded to text
    """
    tree = (
        self._get_current_tree() if ctag is None
        else self.repo.object_store[ctag.encode('ascii')])
    for raw_name, mode, sha in tree.iteritems():
        decoded = raw_name.decode(DEFAULT_ENCODING)
        # The configuration file lives in the same tree but is not an item.
        if decoded != CONFIG_FILENAME:
            yield (decoded, mode, sha)
def config(self):
    """Return the metadata object for this collection.

    When ``RepoCollectionMetadata.present`` reports true for the
    repository, repository-based metadata is returned.  Otherwise the
    metadata is file based: it is parsed from the stored
    ``CONFIG_FILENAME`` (if there is one) and saved back through
    ``_import_one``.
    """
    if RepoCollectionMetadata.present(self.repo):
        return RepoCollectionMetadata(self.repo)

    def save_config(cp, message):
        # Serialise to text first, then store the UTF-8 bytes as one chunk.
        buf = StringIO()
        cp.write(buf)
        payload = buf.getvalue().encode('utf-8')
        self._import_one(CONFIG_FILENAME, [payload], message)

    parser = configparser.ConfigParser()
    try:
        raw_chunks = self._get_raw(CONFIG_FILENAME)
    except KeyError:
        # No stored config file: carry on with an empty parser.
        raw_chunks = None
    if raw_chunks is not None:
        parser.read_string(b''.join(raw_chunks).decode('utf-8'))
    return FileBasedCollectionMetadata(parser, save=save_config)
def iter_with_etag(self, ctag=None):
    """Iterate over all items in the store with etag.

    Only ``.ics`` and ``.vcf`` files in the store directory are
    reported; temporary files, the config file and anything else are
    skipped.

    :param ctag: Ctag to iterate for (not consulted by this
        implementation, which always lists the directory as it is now)
    :yield: (name, content_type, etag) tuples
    """
    suffix_to_type = (
        ('.ics', 'text/calendar'),
        ('.vcf', 'text/vcard'),
    )
    for entry in os.listdir(self.path):
        if entry.endswith('.tmp') or entry == CONFIG_FILENAME:
            continue
        content_type = next(
            (ctype for suffix, ctype in suffix_to_type
             if entry.endswith(suffix)),
            None)
        if content_type is None:
            # Not a recognised item type.
            continue
        yield (entry, content_type, self._get_etag(entry))