for _ in range(nbfiles):  # loop header reconstructed: the snippet generates nbfiles files per dataset
    fname = generate_didname(metadata=metadata, dsn=dsn, did_type='file')
    lfns.append(fname)
    logging.info('%s Generating file %s in dataset %s', prepend_str, fname, dsn)
    physical_fname = '%s/%s' % (tmpdir, fname.replace('/', ''))
    physical_fnames.append(physical_fname)
    generate_file(physical_fname, filesize)
    fnames.append(fname)
logging.info('%s Upload %s to %s', prepend_str, dsn, site)
# Fully qualified DID ('scope:name'); bind it to a new name so a retry does not re-prefix dsn
did = '%s:%s' % (scope, dsn)
status = upload(files=lfns, scope=scope, metadata=metadata, rse=site, account=account, source_dir=tmpdir, worker_number=worker_number, total_workers=total_workers, dataset_lifetime=dataset_lifetime, did=did, set_metadata=set_metadata)
for physical_fname in physical_fnames:
    remove(physical_fname)
rmdir(tmpdir)
if status:
    monitor.record_counter(counters='automatix.addnewdataset.done', delta=1)
    monitor.record_counter(counters='automatix.addnewfile.done', delta=nbfiles)
    monitor.record_timer('automatix.datasetinjection', (time() - start_time) * 1000)
    break
else:
    logging.info('%s Failed to upload files. Will retry another time (attempt %s/%s)', prepend_str, str(retry + 1), str(totretries))
if once:
    logging.info('%s Run with once mode. Exiting', prepend_str)
    break
tottime = time() - starttime
if status:
    logging.info('%s It took %s seconds to upload one dataset on %s', prepend_str, str(tottime), str(sites))
    if sleep_time > tottime:
        logging.info('%s Will sleep for %s seconds', prepend_str, str(sleep_time - tottime))
        sleep(sleep_time - tottime)
else:
    logging.info('%s Retrying a new upload', prepend_str)
heartbeat.die(executable, hostname, pid, hb_thread)
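
# `generate_file` is not shown above. A minimal sketch of such a helper, assuming it
# simply writes `filesize` bytes of random data to the target path (the real
# implementation may differ):
import os

def generate_file(fname, size, chunk=1024 * 1024):
    """Write `size` bytes of pseudo-random data to `fname`, one chunk at a time."""
    written = 0
    with open(fname, 'wb') as fil:
        while written < size:
            nbytes = min(chunk, size - written)
            fil.write(os.urandom(nbytes))
            written += nbytes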
# Call header reconstructed: the kwargs below configure each stomp broker connection
conns.append(stomp.Connection(host_and_ports=[broker],
                              use_ssl=True,
                              ssl_key_file=config_get('messaging-fts3', 'ssl_key_file'),
                              ssl_cert_file=config_get('messaging-fts3', 'ssl_cert_file'),
                              vhost=config_get('messaging-fts3', 'broker_virtual_host', raise_exception=False),
                              reconnect_attempts_max=999))

logging.info('receiver started')

while not graceful_stop.is_set():
    heartbeat.live(executable, hostname, pid, hb_thread)
    for conn in conns:
        if not conn.is_connected():
            logging.info('connecting to %s' % conn.transport._Transport__host_and_ports[0][0])
            record_counter('daemons.messaging.fts3.reconnect.%s' % conn.transport._Transport__host_and_ports[0][0].split('.')[0])
            conn.set_listener('rucio-messaging-fts3', Receiver(broker=conn.transport._Transport__host_and_ports[0], id=id, total_threads=total_threads, full_mode=full_mode))
            conn.start()
            conn.connect()
            conn.subscribe(destination=config_get('messaging-fts3', 'destination'),
                           id='rucio-messaging-fts3',
                           ack='auto')
    time.sleep(1)

logging.info('receiver graceful stop requested')
for conn in conns:
    try:
        conn.disconnect()
    except Exception:
        logging.warning(traceback.format_exc())
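
# The `Receiver` registered via `set_listener` above is not shown. A minimal sketch of
# such a stomp.py listener, assuming the older two-argument `on_message(headers, message)`
# callback API; the real Rucio Receiver parses and dispatches full FTS3 events:
import json
import logging

import stomp


class Receiver(stomp.ConnectionListener):

    def __init__(self, broker, id, total_threads, full_mode=False):
        self.__broker = broker
        self.__id = id
        self.__total_threads = total_threads
        self.__full_mode = full_mode

    def on_error(self, headers, message):
        logging.error('[%s] broker error: %s', self.__broker, message)

    def on_message(self, headers, message):
        # With ack='auto' the frame is acknowledged by the broker automatically,
        # so the handler only has to decode and process the payload.
        msg = json.loads(message)
        logging.debug('received FTS3 event for job %s', msg.get('job_id'))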
    # First two lines of this branch reconstructed from the identical handler below
    if match('.*QueuePool.*', str(e.args[0])):
        logging.warning(traceback.format_exc())
        record_counter('rule.judge.exceptions.%s' % e.__class__.__name__)
    elif match('.*ORA-03135.*', str(e.args[0])):
        logging.warning(traceback.format_exc())
        record_counter('rule.judge.exceptions.%s' % e.__class__.__name__)
    else:
        logging.error(traceback.format_exc())
        record_counter('rule.judge.exceptions.%s' % e.__class__.__name__)
except (DatabaseException, DatabaseError) as e:
    if match('.*QueuePool.*', str(e.args[0])):
        logging.warning(traceback.format_exc())
        record_counter('rule.judge.exceptions.%s' % e.__class__.__name__)
    elif match('.*ORA-03135.*', str(e.args[0])):
        logging.warning(traceback.format_exc())
        record_counter('rule.judge.exceptions.%s' % e.__class__.__name__)
    else:
        logging.critical(traceback.format_exc())
        record_counter('rule.judge.exceptions.%s' % e.__class__.__name__)
except Exception as e:
    logging.critical(traceback.format_exc())
    record_counter('rule.judge.exceptions.%s' % e.__class__.__name__)
if once:
    break
die(executable=executable, hostname=hostname, pid=pid, thread=current_thread)
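
# The triage above treats SQLAlchemy QueuePool exhaustion and Oracle ORA-03135
# (connection lost contact) as transient, logging them at warning level, while
# anything else escalates. A sketch of how that test could be factored out;
# `is_transient` is a hypothetical helper, not Rucio API:
import re

TRANSIENT_DB_ERRORS = (r'.*QueuePool.*', r'.*ORA-03135.*')

def is_transient(error):
    """Return True if the database error matches a known retryable pattern."""
    msg = str(error.args[0]) if error.args else str(error)
    return any(re.match(pattern, msg) for pattern in TRANSIENT_DB_ERRORS)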
def touch_transfer(external_host, transfer_id, session=None):
    """
    Update the timestamp of the requests in a transfer. Fails silently if the transfer_id does not exist.

    :param external_host: Name of the external host.
    :param transfer_id: External transfer job id as a string.
    :param session: Database session to use.
    """
    record_counter('core.request.touch_transfer')
    try:
        # Don't touch the requests if they were already touched within the last 30 seconds
        session.query(models.Request).with_hint(models.Request, "INDEX(REQUESTS REQUESTS_EXTERNALID_UQ)", 'oracle')\
                                     .filter_by(external_id=transfer_id)\
                                     .filter(models.Request.state == RequestState.SUBMITTED)\
                                     .filter(models.Request.updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=30))\
                                     .update({'updated_at': datetime.datetime.utcnow()}, synchronize_session=False)
    except IntegrityError as error:
        raise RucioException(error.args)
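
# Hedged usage sketch: the `updated_at` filter above acts as a 30-second debounce,
# so repeated polls of the same FTS job write at most once per window. The host, job
# id and `session` object are placeholders:
touch_transfer(external_host='https://fts3.example.org:8446',
               transfer_id='1234abcd-12ab-34cd-56ef-123456abcdef',
               session=session)
# Calling it again within 30 seconds matches no rows and is therefore a no-op.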
def update_priority(self, transfer_id, priority, timeout=None):  # method header reconstructed from the body below
    """
    Update the priority of a transfer in FTS3 via JSON.

    :returns: True if update was successful.
    """
    params_dict = {"params": {"priority": priority}}
    params_str = json.dumps(params_dict, cls=APIEncoder)
    job = requests.post('%s/jobs/%s' % (self.external_host, transfer_id),
                        verify=self.verify,
                        data=params_str,
                        cert=self.cert,
                        headers={'Content-Type': 'application/json'},
                        timeout=timeout)  # TODO: set to 3 in conveyor
    if job and job.status_code == 200:
        record_counter('transfertool.fts3.%s.update_priority.success' % self.__extract_host(self.external_host))
        return job.json()
    record_counter('transfertool.fts3.%s.update_priority.failure' % self.__extract_host(self.external_host))
    raise Exception('Could not update priority of transfer: %s' % job.content)
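
# Hedged usage sketch; the constructor argument is an assumption (in Rucio the
# transfertool is normally built from configuration), host and job id are placeholders:
fts = FTS3Transfertool(external_host='https://fts3.example.org:8446')
result = fts.update_priority(transfer_id='1234abcd-12ab-34cd-56ef-123456abcdef', priority=4, timeout=3)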
    ret = transfer_core.update_transfer_state(external_host=None, transfer_id=task_id, state=resps[task_id])
    record_counter('daemons.conveyor.poller.update_request_state.%s' % ret)
else:
    for transfer_id in resps:
        try:
            transf_resp = resps[transfer_id]
            # transf_resp is None:                      the job is lost
            # transf_resp is an Exception:              failed to get the FTS job status
            # transf_resp == {}:                        no terminated jobs
            # transf_resp == {request_id: file_status}: terminated jobs
            if transf_resp is None:
                transfer_core.update_transfer_state(external_host, transfer_id, RequestState.LOST, logging_prepend_str=prepend_str)
                record_counter('daemons.conveyor.poller.transfer_lost')
            elif isinstance(transf_resp, Exception):
                logging.warning(prepend_str + "Failed to poll FTS(%s) job (%s): %s" % (external_host, transfer_id, transf_resp))
                record_counter('daemons.conveyor.poller.query_transfer_exception')
            else:
                for request_id in transf_resp:
                    if request_id in request_ids:
                        ret = request_core.update_request_state(transf_resp[request_id], logging_prepend_str=prepend_str)
                        # ret is True when the request content was really updated, False when it was only touched
                        if ret:
                            cnt += 1
                        record_counter('daemons.conveyor.poller.update_request_state.%s' % ret)
                # Touch the transfer in any case. Otherwise, when a bulk transfer includes many
                # requests and a single one is not yet terminated, the whole transfer would be polled again.
                transfer_core.touch_transfer(external_host, transfer_id)
        except (DatabaseException, DatabaseError) as error:
            if re.match('.*ORA-00054.*', error.args[0]) or re.match('.*ORA-00060.*', error.args[0]) or 'ERROR 1205 (HY000)' in error.args[0]:
                logging.warning(prepend_str + "Lock detected when handling request %s - skipping" % request_id)
            else:
                logging.error(traceback.format_exc())  # branch body reconstructed; the source snippet ends at the bare `else:`
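
# The four response shapes documented above drive the dispatch. As a compact
# illustration (hypothetical helper, not Rucio API):
def classify_poll_response(transf_resp):
    """Map a poller response to the action taken above."""
    if transf_resp is None:
        return 'lost'
    if isinstance(transf_resp, Exception):
        return 'query-failed'
    if not transf_resp:
        return 'still-running'
    return 'terminated'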
    rse_ids = [rse['id'] for rse in rses]
else:
    rse_ids = None

for activity in activities:
    if activity_next_exe_time[activity] > time.time():
        time.sleep(1)
        continue

    logging.info("%s:%s Starting to submit jobs on activity: %s" % (process, thread, activity))
    logging.info("%s:%s Starting to get transfers" % (process, thread))
    ts = time.time()
    transfers = get_transfers_from_requests(process, total_processes, thread, total_threads, rse_ids, mock, bulk, activity, activity_shares, scheme)
    record_timer('daemons.conveyor.submitter.get_transfers_from_requests.per_transfer', (time.time() - ts) * 1000 / (len(transfers) if len(transfers) else 1))
    record_counter('daemons.conveyor.submitter.get_transfers_from_requests', len(transfers))

    logging.info("%s:%s Starting to submit transfers" % (process, thread))
    for request_id in transfers:
        try:
            transfer = transfers[request_id]
            ts = time.time()
            tmp_metadata = transfer['file_metadata']
            transfer_ids = fts3.submit_transfers([transfer], tmp_metadata)
            record_timer('daemons.conveyor.submitter.submit_transfer', (time.time() - ts) * 1000)
            if 'previous_attempt_id' in transfer['file_metadata']:
                logging.info('COPYING REQUEST %s PREVIOUS %s DID %s:%s selection_strategy %s FROM %s TO %s USING %s TRANSFERID: %s' % (transfer['request_id'],
                                                                                                                                       transfer['file_metadata']['previous_attempt_id'],
                                                                                                                                       transfer['file_metadata']['scope'],
                                                                                                                                       transfer['file_metadata']['name'],
                                                                                                                                       fts_source_strategy,
                                                                                                                                       transfer['sources'],
                                                                                                                                       transfer['dest_urls'],
                                                                                                                                       transfer_ids[transfer['request_id']]['external_host'] if transfer['request_id'] in transfer_ids else None,
                                                                                                                                       transfer_ids[transfer['request_id']]['external_id'] if transfer['request_id'] in transfer_ids else None))
            else:
                logging.info('COPYING REQUEST %s DID %s:%s selection_strategy %s FROM %s TO %s USING %s TRANSFERID: %s' % (transfer['request_id'],
                                                                                                                           transfer['file_metadata']['scope'],
                                                                                                                           transfer['file_metadata']['name'],
                                                                                                                           fts_source_strategy,
                                                                                                                           transfer['sources'],
                                                                                                                           transfer['dest_urls'],
                                                                                                                           transfer_ids[transfer['request_id']]['external_host'] if transfer['request_id'] in transfer_ids else None,
                                                                                                                           transfer_ids[transfer['request_id']]['external_id'] if transfer['request_id'] in transfer_ids else None))
            if transfer['request_id'] not in transfer_ids:
                # Submission did not return an external id: park the request in SUBMITTING
                xfers_ret = {transfer['request_id']: {'state': RequestState.SUBMITTING, 'external_host': transfer['external_host'], 'external_id': None, 'dest_url': transfer['dest_urls'][0]}}
                request.set_request_transfers(xfers_ret)
                record_counter('daemons.conveyor.submitter.lost_request.%s' % urlparse.urlparse(transfer['external_host']).hostname.replace('.', '_'))
                logging.warning("Failed to submit request: %s, set request SUBMITTING" % (transfer['request_id']))
            else:
                xfers_ret = {transfer['request_id']: {'state': RequestState.SUBMITTED,
                                                      'external_host': transfer_ids[transfer['request_id']]['external_host'],
                                                      'external_id': transfer_ids[transfer['request_id']]['external_id'],
                                                      'dest_url': transfer['dest_urls'][0]}}
                request.set_request_transfers(xfers_ret)
        except UnsupportedOperation as e:
            # The replica doesn't exist, so the request has to be cancelled
            logging.warning(e)
            logging.info('Cancelling transfer request %s' % transfer['request_id'])
            try:
                # TODO: for now, there is only ever one destination
                request.cancel_request_did(transfer['file_metadata']['scope'], transfer['file_metadata']['name'], transfer['dest_urls'][0])
            except Exception as e:
                logging.warning('Cannot cancel request: %s' % str(e))

# Receiver-side handling of a completed FTS3 transfer message:
logging.info('RECEIVED DID %s:%s FROM %s TO %s REQUEST %s TRANSFER_ID %s STATE %s' % (response['scope'],
                                                                                      response['name'],
                                                                                      response['src_rse'],
                                                                                      response['dst_rse'],
                                                                                      response['request_id'],
                                                                                      response['transfer_id'],
                                                                                      response['new_state']))
if self.__full_mode:
    ret = request.update_request_state(response)
    record_counter('daemons.conveyor.receiver.update_request_state.%s' % ret)
else:
    try:
        logging.debug("Update request %s update time" % response['request_id'])
        set_transfer_update_time(response['external_host'], response['transfer_id'], datetime.datetime.utcnow() - datetime.timedelta(hours=24))
        record_counter('daemons.conveyor.receiver.set_transfer_update_time')
    except Exception as error:
        logging.debug("Failed to update transfer's update time: %s" % str(error))
except Exception:
    logging.critical(traceback.format_exc())
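
# In non-full mode the receiver does not apply the state change itself: backdating the
# transfer's update time by 24 hours makes the job look overdue, so the poller picks it
# up on its next pass and performs the authoritative database update. A toy illustration
# of that scheduling trick (names hypothetical):
import datetime

def mark_for_immediate_poll(transfer, grace=datetime.timedelta(hours=24)):
    """Backdate updated_at so an age-ordered poller selects this transfer next."""
    transfer['updated_at'] = datetime.datetime.utcnow() - grace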
"""
Query the detailed status of a transfer in FTS3 via JSON.
:param transfer_id: FTS transfer identifier as a string.
:returns: Detailed transfer status information as a dictionary.
"""
files = None
files = requests.get('%s/jobs/%s/files' % (self.external_host, transfer_id),
verify=self.verify,
cert=self.cert,
headers={'Content-Type': 'application/json'},
timeout=5)
if files and (files.status_code == 200 or files.status_code == 207):
record_counter('transfertool.fts3.%s.query_details.success' % self.__extract_host(self.external_host))
return files.json()
record_counter('transfertool.fts3.%s.query_details.failure' % self.__extract_host(self.external_host))
return
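
# Hedged usage sketch: summarising per-file states from the details returned above.
# `file_state` follows the FTS3 REST schema; host and job id are placeholders:
from collections import Counter

fts = FTS3Transfertool(external_host='https://fts3.example.org:8446')
details = fts.query_details(transfer_id='1234abcd-12ab-34cd-56ef-123456abcdef')
if details:
    print(Counter(item.get('file_state') for item in details))  # e.g. Counter({'FINISHED': 9, 'FAILED': 1})
else:
    print('Could not query transfer details')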