Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _upload_impl(self, item, href):
fpath = self._get_filepath(href)
try:
with atomic_write(fpath, mode='wb', overwrite=False) as f:
f.write(item.raw.encode(self.encoding))
return fpath, get_etag_from_file(f)
except OSError as e:
if e.errno == errno.EEXIST:
raise exceptions.AlreadyExistingError(existing_href=href)
else:
raise
def upload(self, item):
href = self._get_href(item)
if href in self.items:
raise exceptions.AlreadyExistingError(existing_href=href)
etag = _random_string()
self.items[href] = (etag, item)
return href, etag
def upload(self, item):
href = item.ident
if href in self._items:
raise exceptions.AlreadyExistingError(existing_href=href)
self._items[href] = item, item.hash
return href, item.hash
def check_error(e):
try:
errors.check_exception(e[0])
except errors.Error.ItemNotFound as e:
raise exceptions.NotFoundError(e) from e
except errors.Error.ItemAlreadyExisting as e:
raise exceptions.AlreadyExistingError(e) from e
except errors.Error.WrongEtag as e:
raise exceptions.WrongEtagError(e) from e
except errors.Error.ReadOnly as e:
raise exceptions.ReadOnlyError(e) from e
except errors.Error.UnsupportedVobject as e:
raise exceptions.UnsupportedVobjectError(e) from e
except (errors.Error.BadDiscoveryConfig,
errors.Error.BadCollectionConfig) as e:
raise TypeError(e) from e
except errors.Error.MetadataValueUnsupported as e:
raise exceptions.UnsupportedMetadataError(e) from e
def upload(self, item):
try:
entry = self._item_type.create(self._journal.collection, item.raw)
entry.save()
except etesync.exceptions.DoesNotExist as e:
raise exceptions.NotFoundError(e)
except etesync.exceptions.AlreadyExists as e:
raise exceptions.AlreadyExistingError(e)
return item.uid, item.hash