Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def perm_del_protocol(issuer, kwargs):
"""
Checks if an account can delete protocols from an RSE.
:param issuer: Account identifier which issues the command.
:param kwargs: List of arguments for the action.
:returns: True if account is allowed, otherwise False
"""
return _is_root(issuer) or has_account_attribute(account=issuer, key='admin')
def perm_add_dids(issuer, kwargs):
"""
Checks if an account can bulk add data identifiers.
:param issuer: Account identifier which issues the command.
:param kwargs: List of arguments for the action.
:returns: True if account is allowed, otherwise False
"""
# Check the accounts of the issued rules
if issuer != 'root' and not has_account_attribute(account=issuer, key='admin'):
for did in kwargs['dids']:
for rule in did.get('rules', []):
if rule['account'] != issuer:
return False
return _is_root(issuer) or has_account_attribute(account=issuer, key='admin')
def perm_set_status(issuer, kwargs):
"""
Checks if an account can set status on an data identifier.
:param issuer: Account identifier which issues the command.
:param kwargs: List of arguments for the action.
:returns: True if account is allowed, otherwise False
"""
if kwargs.get('open', False):
if issuer != 'root' and not has_account_attribute(account=issuer, key='admin'):
return False
return (_is_root(issuer) or has_account_attribute(account=issuer, key='admin')
or rucio.core.scope.is_scope_owner(scope=kwargs['scope'], account=issuer)) # NOQA: W503
def perm_config(issuer, kwargs):
"""
Checks if an account can read/write the configuration.
:param issuer: Account identifier which issues the command.
:param kwargs: List of arguments for the action.
:returns: True if account is allowed to call the API call, otherwise False
"""
return _is_root(issuer) or has_account_attribute(account=issuer, key='admin')
def perm_add_did(issuer, kwargs):
"""
Checks if an account can add an data identifier to a scope.
:param issuer: Account identifier which issues the command.
:param kwargs: List of arguments for the action.
:returns: True if account is allowed, otherwise False
"""
# Check the accounts of the issued rules
if issuer != 'root' and not has_account_attribute(account=issuer, key='admin'):
for rule in kwargs.get('rules', []):
if rule['account'] != issuer:
return False
return _is_root(issuer)\
or has_account_attribute(account=issuer, key='admin')\
or rucio.core.scope.is_scope_owner(scope=kwargs['scope'], account=issuer)\
or kwargs['scope'] == u'mock'
def perm_attach_dids_to_dids(issuer, kwargs):
"""
Checks if an account can append an data identifier to the other data identifier.
:param issuer: Account identifier which issues the command.
:param kwargs: List of arguments for the action.
:returns: True if account is allowed, otherwise False
"""
if _is_root(issuer) or has_account_attribute(account=issuer, key='admin'):
return True
else:
attachments = kwargs['attachments']
scopes = [did['scope'] for did in attachments]
scopes = list(set(scopes))
for scope in scopes:
if not rucio.core.scope.is_scope_owner(scope, issuer):
return False
return True
def perm_add_replicas(issuer, kwargs):
"""
Checks if an account can add replicas.
:param issuer: Account identifier which issues the command.
:param kwargs: List of arguments for the action.
:returns: True if account is allowed, otherwise False
"""
is_root = _is_root(issuer)
is_temp = str(kwargs.get('rse', '')).endswith('_Temp')
is_admin = has_account_attribute(account=issuer, key='admin')
return is_root or is_temp or is_admin
def perm_attach_dids(issuer, kwargs):
"""
Checks if an account can append an data identifier to the other data identifier.
:param issuer: Account identifier which issues the command.
:param kwargs: List of arguments for the action.
:returns: True if account is allowed, otherwise False
"""
return (_is_root(issuer)
or has_account_attribute(account=issuer, key='admin') # NOQA: W503
or rucio.core.scope.is_scope_owner(scope=kwargs['scope'], account=issuer) # NOQA: W503
or kwargs['scope'] == 'mock') # NOQA: W503
def perm_skip_availability_check(issuer, kwargs):
"""
Checks if an account can skip the availabity check to add/delete file replicas.
:param issuer: Account identifier which issues the command.
:param kwargs: List of arguments for the action.
:returns: True if account is allowed, otherwise False
"""
return _is_root(issuer) or has_account_attribute(account=issuer, key='admin')
def perm_del_rse_attribute(issuer, kwargs):
"""
Checks if an account can delete a RSE attribute.
:param issuer: Account identifier which issues the command.
:param kwargs: List of arguments for the action.
:returns: True if account is allowed, otherwise False
"""
if _is_root(issuer) or has_account_attribute(account=issuer, key='admin'):
return True
return False