Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_replication_rule(self, rule_id, estimate_ttc=False):
    """
    Fetch a single replication rule from one of the configured hosts.

    :param rule_id: The id of the rule to be retrieved.
    :param estimate_ttc: bool, sent to the server as ``estimate_ttc`` in the
                         request body; whether rule_info should return ttc
                         information.
    :returns: The first item produced by decoding the response body.
    :raises: RuleNotFound
    """
    rule_path = '/'.join([self.RULE_BASEURL, rule_id])
    # A host is picked at random from self.list_hosts for every call.
    endpoint = build_url(choice(self.list_hosts), path=rule_path)
    body = dumps({'estimate_ttc': estimate_ttc})
    response = self._send_request(endpoint, type='GET', data=body)

    if response.status_code != codes.ok:
        # Turn the error response into the matching exception class/message.
        exc_cls, exc_msg = self._get_exception(headers=response.headers,
                                               status_code=response.status_code,
                                               data=response.content)
        raise exc_cls(exc_msg)

    # Only the first decoded item is returned to the caller.
    return next(self._load_json_data(response))
def add_files(self, lfns, ignore_availability=False):
    """
    Register a batch of files with a single POST request.

    As described for this endpoint, for each file:
    - the file and its replica are created;
    - if it doesn't exist, the dataset containing the file is created, as
      well as a rule on the dataset on ANY sites;
    - all the ascendants of the dataset are created if they do not exist.

    :param lfns: List of lfn (dictionary {'lfn': , 'rse': , 'bytes': , 'adler32': , 'guid': , 'pfn': }
    :param ignore_availability: A boolean to ignore blacklisted sites.
    :returns: True when the server replies with the "created" status code.
    :raises: The exception derived from the response by ``_get_exception``
             for any other status code.
    """
    endpoint = build_url(choice(self.list_hosts),
                         path='/'.join((self.DIRAC_BASEURL, 'addfiles')))
    body = dumps({'lfns': lfns, 'ignore_availability': ignore_availability})
    response = self._send_request(endpoint, type='POST', data=body)

    if response.status_code != codes.created:
        exc_cls, exc_msg = self._get_exception(headers=response.headers,
                                               status_code=response.status_code,
                                               data=response.content)
        raise exc_cls(exc_msg)
    return True
def list_dataset_replicas(self, scope, name, deep=False):
    """
    Query the dataset replicas of a did (scope:name).

    :param scope: The scope of the dataset.
    :param name: The name of the dataset.
    :param deep: Lookup at the file level; when truthy, ``deep=True`` is
                 added to the query parameters.
    :returns: A list of dict dataset replicas, as decoded from the response
              body by ``_load_json_data``.
    :raises: The exception derived from the response by ``_get_exception``
             when the status code is not "ok".
    """
    query = {'deep': True} if deep else {}
    # scope and name are URL-quoted before being placed in the path.
    segments = [self.REPLICAS_BASEURL, quote_plus(scope), quote_plus(name), 'datasets']
    endpoint = build_url(self.host, path='/'.join(segments), params=query)
    response = self._send_request(endpoint, type='GET')

    if response.status_code != codes.ok:
        exc_cls, exc_msg = self._get_exception(headers=response.headers,
                                               status_code=response.status_code,
                                               data=response.content)
        raise exc_cls(exc_msg)
    return self._load_json_data(response)
if result.status_code != codes.ok: # pylint: disable-msg=E1101
exc_cls, exc_msg = self._get_exception(headers=result.headers,
status_code=result.status_code,
data=result.content)
raise exc_cls(exc_msg)
self.ssh_challenge_token = result.headers['x-rucio-ssh-challenge-token']
LOG.debug('got new ssh challenge token \'%s\'' % self.ssh_challenge_token)
# sign the challenge token with the private key
with open(private_key_path, 'r') as fd_private_key_path:
private_key = fd_private_key_path.read()
signature = ssh_sign(private_key, self.ssh_challenge_token)
headers['X-Rucio-SSH-Signature'] = signature
url = build_url(self.auth_host, path='auth/ssh')
result = None
for retry in range(self.AUTH_RETRIES + 1):
try:
result = self.session.get(url, headers=headers, verify=self.ca_cert)
break
except ConnectionError as error:
if 'alert certificate expired' in str(error):
raise CannotAuthenticate(str(error))
LOG.warning('ConnectionError: ' + str(error))
self.ca_cert = False
if retry > self.request_retries:
raise
if not result:
LOG.error('cannot get auth_token')
def get_global_account_usage(self, account, rse_expression=None):
    """
    Retrieve the global usage of an account, for one RSE expression or for
    all of them.

    :param account: The account name.
    :param rse_expression: The rse expression. When given (and non-empty) it
                           is URL-quoted and appended to the request path;
                           otherwise the path ends at ``usage/global``.
    :returns: The response body as decoded by ``_load_json_data``.
    :raises: The exception derived from the response by ``_get_exception``
             when the status code is not "ok".
    """
    segments = [self.ACCOUNTS_BASEURL, account, 'usage', 'global']
    if rse_expression:
        segments.append(quote_plus(rse_expression))

    endpoint = build_url(choice(self.list_hosts), path='/'.join(segments))
    response = self._send_request(endpoint, type='GET')

    if response.status_code != codes.ok:
        exc_cls, exc_msg = self._get_exception(headers=response.headers,
                                               status_code=response.status_code,
                                               data=response.content)
        raise exc_cls(exc_msg)
    return self._load_json_data(response)
def attach_dids(self, scope, name, dids, rse=None):
    """
    Attach data identifiers to the data identifier scope:name.

    :param scope: The scope name.
    :param name: The data identifier name.
    :param dids: The content.
    :param rse: The RSE name when registering replicas. Only included in the
                request body when truthy.
    :returns: True when the server replies with the "created" status code.
    :raises: The exception derived from the response by ``_get_exception``
             for any other status code.
    """
    # scope and name are URL-quoted before being placed in the path.
    did_path = '/'.join([self.DIDS_BASEURL, quote_plus(scope), quote_plus(name), 'dids'])
    endpoint = build_url(choice(self.list_hosts), path=did_path)

    payload = {'dids': dids, 'rse': rse} if rse else {'dids': dids}
    response = self._send_request(endpoint, type='POST', data=render_json(**payload))

    if response.status_code != codes.created:
        exc_cls, exc_msg = self._get_exception(headers=response.headers,
                                               status_code=response.status_code,
                                               data=response.content)
        raise exc_cls(exc_msg)
    return True
client_secret = next((secrets[i] for i in secrets if 'issuer' in secrets[i] and # NOQA: W504
secrets[i]['issuer'] == kwargs.get('issuer')), None)
redirect_url = kwargs.get('redirect_uri', None)
if not redirect_url:
redirect_to = kwargs.get("redirect_to", "auth/oidc_token")
redirect_urls = [u for u in client_secret["redirect_uris"] if redirect_to in u]
redirect_url = random.choice(redirect_urls)
if not redirect_url:
raise CannotAuthenticate("Could not pick any redirect URL(s) from the ones defined "
+ "in Rucio OIDC Client configuration file.") # NOQA: W503
auth_args["redirect_uri"] = redirect_url
oidc_client = OIDC_CLIENTS[client_secret["issuer"]]
auth_args["client_id"] = oidc_client.client_id
if kwargs.get('first_init', False):
auth_url = build_url(oidc_client.authorization_endpoint, params=auth_args)
return {'redirect': redirect_url, 'auth_url': auth_url}
oidc_client.construct_AuthorizationRequest(request_args=auth_args)
# parsing the authorization query string by the Rucio OIDC Client (creates a Grant)
oidc_client.parse_response(AuthorizationResponse,
info='code=' + kwargs.get('code', rndstr()) + '&state=' + auth_args['state'],
sformat="urlencoded")
return {'client': oidc_client, 'state': auth_args['state']}
except Exception:
raise CannotAuthenticate(traceback.format_exc())
write: integer representing the priority of this protocol for write operations (default = -1)
delete: integer representing the priority of this protocol for delete operations (default = -1)
extended_attributes: miscellaneous protocol specific information e.g. spacetoken for SRM (default = None)
:return: True if protocol was created successfully else False.
:raises Duplicate: if protocol with same hostname, port and protocol identifier
already exists for the given RSE.
:raises RSENotFound: if the RSE doesn't exist.
:raises KeyNotFound: if params is missing mandatory attributes to create the
protocol.
:raises AccessDenied: if not authorized.
"""
scheme = params['scheme']
path = '/'.join([self.RSE_BASEURL, rse, 'protocols', scheme])
url = build_url(choice(self.list_hosts), path=path)
r = self._send_request(url, type='POST', data=dumps(params))
if r.status_code == codes.created:
return True
else:
exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
raise exc_cls(exc_msg)
def list_identities(self, account):
    """
    Retrieve every identity registered on an account.

    :param account: The account name.
    :returns: The response body as decoded by ``_load_json_data``.
    :raises: The exception derived from the response by ``_get_exception``
             when the status code is not "ok".
    """
    identities_path = '/'.join((self.ACCOUNTS_BASEURL, account, 'identities'))
    endpoint = build_url(choice(self.list_hosts), path=identities_path)
    # No request type is passed: the default of _send_request is used.
    response = self._send_request(endpoint)

    if response.status_code != codes.ok:
        exc_cls, exc_msg = self._get_exception(headers=response.headers,
                                               status_code=response.status_code,
                                               data=response.content)
        raise exc_cls(exc_msg)
    return self._load_json_data(response)