Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _copy_or_move(self, environ, start_response, is_move):
    """Validate the source side and the headers of a COPY / MOVE request.

    NOTE(review): only the opening part of this handler is visible in this
    excerpt; destination handling and the actual copy/move are not shown.

    Args:
        environ: WSGI environment. ``PATH_INFO`` names the source resource.
            ``HTTP_OVERWRITE`` is written back as ``"T"`` when it is absent.
        start_response: WSGI ``start_response`` callable (not used in the
            visible part).
        is_move: presumably true for MOVE and false for COPY (not used in
            the visible part) -- confirm against the callers.

    Failures are reported through ``self._fail`` (presumably raises --
    confirm):
        HTTP_NOT_FOUND   -- the source path does not resolve to a resource.
        HTTP_BAD_REQUEST -- the Destination header is missing, or the
                            Overwrite header is neither "T" nor "F".

    @see: http://www.webdav.org/specs/rfc4918.html#METHOD_COPY
    @see: http://www.webdav.org/specs/rfc4918.html#METHOD_MOVE
    """
    srcPath = environ["PATH_INFO"]
    provider = self._davProvider
    srcRes = provider.get_resource_inst(srcPath, environ)
    srcParentRes = provider.get_resource_inst(util.get_uri_parent(srcPath), environ)

    def _debug_exception(e):
        """Log internal exceptions with stacktrace that otherwise would be hidden."""
        if self._verbose >= 5:
            _logger.exception("_debug_exception")

    # --- Check source ----------------------------------------------------

    if srcRes is None:
        self._fail(HTTP_NOT_FOUND, srcPath)
    if "HTTP_DESTINATION" not in environ:
        self._fail(HTTP_BAD_REQUEST, "Missing required Destination header.")
    # Overwrite defaults to 'T'; setdefault() also stores that default in
    # environ, so later readers of HTTP_OVERWRITE always find a value.
    if environ.setdefault("HTTP_OVERWRITE", "T") not in ("T", "F"):
        self._fail(HTTP_BAD_REQUEST, "Invalid Overwrite header.")
# Handle an HTTP PUT request.
# NOTE(review): only the opening checks of this handler are visible in this
# excerpt; the locals set up here are presumably used by the unseen remainder.
def do_PUT(self, environ, start_response):
"""
@see: http://www.webdav.org/specs/rfc4918.html#METHOD_PUT
"""
# Resolve the target path and its parent through the DAV provider.
path = environ["PATH_INFO"]
provider = self._davProvider
res = provider.get_resource_inst(path, environ)
parentRes = provider.get_resource_inst(util.get_uri_parent(path), environ)
# A target that does not resolve (None) is treated as a new file.
isnewfile = res is None
# Test for unsupported stuff
if "HTTP_CONTENT_ENCODING" in environ:
util.fail(HTTP_NOT_IMPLEMENTED, "Content-encoding header is not supported.")
# An origin server that allows PUT on a given target resource MUST send
# a 400 (Bad Request) response to a PUT request that contains a
# Content-Range header field
# (http://tools.ietf.org/html/rfc7231#section-4.3.4)
if "HTTP_CONTENT_RANGE" in environ:
util.fail(
HTTP_BAD_REQUEST, "Content-range header is not allowed on PUT requests."
)
# NOTE(review): excerpt from the interior of a lock-conflict check. The
# enclosing `def`, the loop that makes `u` walk upward from `url`, and the
# `try:` that belongs to the `finally:` below are outside this view.
# Locks reported for the URL currently being examined.
ll = self.get_url_lock_list(u)
for l in ll:
_logger.debug(" check parent {}, {}".format(u, lock_string(l)))
if u != url and l["depth"] != "infinity":
# We only consider parents with Depth: infinity
continue
elif l["scope"] == "shared" and lock_scope == "shared":
# Only compatible with shared locks (even by same
# principal)
continue
# Lock conflict
_logger.debug(
" -> DENIED due to locked parent {}".format(lock_string(l))
)
# Record the root of the conflicting lock on the error condition.
errcond.add_href(l["root"])
# Continue with the parent URL.
u = util.get_uri_parent(u)
if lock_depth == "infinity":
# Check child URLs for conflicting locks
childLocks = self.storage.get_lock_list(
url, include_root=False, include_children=True, token_only=False
)
# Every lock returned here is recorded as a conflict; the assert only
# checks that it is rooted below `url`.
for l in childLocks:
assert util.is_child_uri(url, l["root"])
# if util.is_child_uri(url, l["root"]):
_logger.debug(
" -> DENIED due to locked child {}".format(lock_string(l))
)
errcond.add_href(l["root"])
# Release self._lock whether or not the checks above raised.
finally:
self._lock.release()
# NOTE(review): excerpt from the interior of a request handler (the error
# text below names MKCOL). Its `def` line and the condition guarding this
# first _fail() call are outside this view.
self._fail(
HTTP_MEDIATYPE_NOT_SUPPORTED,
"The server does not handle any body content.",
)
# Only accept Depth: 0 (but assume this, if omitted)
if environ.setdefault("HTTP_DEPTH", "0") != "0":
self._fail(HTTP_BAD_REQUEST, "Depth must be '0'.")
# The target URL must not be mapped to a resource yet.
if provider.exists(path, environ):
self._fail(
HTTP_METHOD_NOT_ALLOWED,
"MKCOL can only be executed on an unmapped URL.",
)
# The direct parent must already exist and be a collection.
parentRes = provider.get_resource_inst(util.get_uri_parent(path), environ)
if not parentRes or not parentRes.is_collection:
self._fail(HTTP_CONFLICT, "Parent must be an existing collection.")
# TODO: should we check If headers here?
# self._evaluate_if_headers(res, environ)
# Check for write permissions on the PARENT
self._check_write_permission(parentRes, "0", environ)
# Create the collection as a member of its parent, named after the last
# segment of `path`, then answer with HTTP_CREATED.
parentRes.create_collection(util.get_uri_name(path))
return util.send_status_response(environ, start_response, HTTP_CREATED)
# NOTE(review): tail of a `try` statement whose body is outside this view.
# Any exception is passed through as_DAVError(), paired with the resource's
# href, and handed to _send_response() together with HTTP_NO_CONTENT.
except Exception as e:
error_list = [(res.get_href(), as_DAVError(e))]
return self._send_response(
environ, start_response, res, HTTP_NO_CONTENT, error_list
)
# --- Implement file-by-file processing -------------------------------
# Hidden paths (ancestors of failed deletes) {
# NOTE(review): the comment above is cut off after "{" in this excerpt, and
# the code it introduces is not visible.
def _get_context(self, environ, dav_res):
    """Build the ``context`` dict describing a collection resource.

    The dict carries the asset base path, an (initially empty) ``rows``
    list, the package version, display path / URL / parent URL of
    ``dav_res``, the directory-browser config and the read-only state of the
    provider found in ``environ["wsgidav.provider"]``.

    NOTE(review): excerpt only -- the listing ends inside the trailer
    handling; whatever follows (including the return statement) is not
    visible here and has deliberately not been reconstructed.

    Args:
        environ: WSGI environment; must contain ``"wsgidav.provider"``.
        dav_res: the resource to describe; must be a collection.

    Raises:
        AssertionError: if ``dav_res`` is not a collection.

    @see: http://www.webdav.org/specs/rfc4918.html#rfc.section.9.4
    """
    assert dav_res.is_collection

    is_readonly = environ["wsgidav.provider"].is_readonly()

    context = {
        "htdocs": (self.config.get("mount_path") or "") + ASSET_SHARE,
        "rows": [],
        "version": __version__,
        "display_path": compat.unquote(dav_res.get_href()),
        "url": dav_res.get_href(),  # util.make_complete_url(environ),
        "parent_url": util.get_uri_parent(dav_res.get_href()),
        "config": self.dir_config,
        "is_readonly": is_readonly,
        "access": "read-only" if is_readonly else "read-write",
        "is_authenticated": False,
    }

    trailer = self.dir_config.get("response_trailer")
    if trailer is True:
        # A literal True selects the built-in default trailer.
        trailer = "${version} - ${time}"

    if trailer:
        # The markup contains double quotes, so the literal is delimited
        # with single quotes; nesting it in double quotes is a syntax error.
        trailer = trailer.replace(
            "${version}",
            '<a href="https://github.com/mar10/wsgidav/">WsgiDAV/{}</a>'.format(
                __version__
            ),
        )
# NOTE(review): excerpt from the interior of a copy/move handler. The `if`
# that this `elif` belongs to and the code that produced `destPath` are
# outside this view, as is everything after the last comment below.
elif not destPath.startswith(provider.mount_path + provider.share_path):
# Inter-realm copying is not supported, since it is not possible
# authentication-wise
self._fail(HTTP_BAD_GATEWAY, "Inter-realm copy/move is not supported.")
destPath = destPath[len(provider.mount_path + provider.share_path) :]
assert destPath.startswith("/")
# destPath is now relative to current mount/share starting with '/'
destRes = provider.get_resource_inst(destPath, environ)
destExists = destRes is not None
# The destination's parent must exist and be a collection.
destParentRes = provider.get_resource_inst(
util.get_uri_parent(destPath), environ
)
if not destParentRes or not destParentRes.is_collection:
self._fail(HTTP_CONFLICT, "Destination parent must be a collection.")
# Evaluate the If headers for both ends; destRes may be None here when the
# destination does not exist yet.
self._evaluate_if_headers(srcRes, environ)
self._evaluate_if_headers(destRes, environ)
# Check permissions
# http://www.webdav.org/specs/rfc4918.html#rfc.section.7.4
if is_move:
self._check_write_permission(srcRes, "infinity", environ)
# Cannot remove members from locked-0 collections
if srcParentRes:
self._check_write_permission(srcParentRes, "0", environ)
# Cannot create new members in locked-0 collections
# NOTE(review): the code guarded by the comment above is not visible here.