# --- judge-evaluator daemon (excerpt) ---
def re_evaluator(once=False):
"""
Main loop to check the re-evaluation of dids.
"""
hostname = socket.gethostname()
pid = os.getpid()
current_thread = threading.current_thread()
paused_dids = {} # {(scope, name): datetime}
# Make an initial heartbeat so that all judge-evaluators have the correct worker number on the next try
live(executable='rucio-judge-evaluator', hostname=hostname, pid=pid, thread=current_thread, older_than=60 * 30)
graceful_stop.wait(1)
while not graceful_stop.is_set():
try:
# heartbeat
heartbeat = live(executable='rucio-judge-evaluator', hostname=hostname, pid=pid, thread=current_thread, older_than=60 * 30)
start = time.time() # NOQA
# Refresh paused dids
paused_dids = dict((k, v) for k, v in iteritems(paused_dids) if datetime.utcnow() < v)
# Select a bunch of dids for re evaluation for this worker
dids = get_updated_dids(total_workers=heartbeat['nr_threads'] - 1,
worker_number=heartbeat['assign_thread'],
                                    limit=100,
                                    blacklisted_dids=[key for key in paused_dids])
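            # A minimal sketch of the evaluation step that typically follows,
            # mirroring the judge-repairer loop further below; re_evaluate and the
            # pause interval are illustrative assumptions, not the exact source.
            for did in dids:
                try:
                    re_evaluate(scope=did.scope, name=did.name, rule_evaluation_action=did.rule_evaluation_action)
                except DatabaseException:
                    # The did is locked by another evaluator: pause it for a while
                    paused_dids[(did.scope, did.name)] = datetime.utcnow() + timedelta(seconds=randint(60, 600))

# --- hermes2 daemon: messaging-service configuration checks (excerpt) ---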
        except Exception as err:  # tail of the identical check for the preceding service
            logging.error(err)
    if 'activemq' in services_list:
try:
activemq_endpoint = config_get('hermes', 'activemq_endpoint', False, None)
if not activemq_endpoint:
                logging.error('ActiveMQ defined in the services list, but no endpoint can be found. Exiting')
sys.exit(1)
except Exception as err:
logging.error(err)
executable = 'hermes2'
hostname = socket.getfqdn()
pid = os.getpid()
hb_thread = threading.current_thread()
heartbeat.sanity_check(executable=executable, hostname=hostname, pid=pid, thread=hb_thread)
heart_beat = heartbeat.live(executable, hostname, pid, hb_thread)
prepend_str = 'Thread [%i/%i] : ' % (heart_beat['assign_thread'], heart_beat['nr_threads'])
# Make an initial heartbeat so that all daemons have the correct worker number on the next try
GRACEFUL_STOP.wait(10)
heart_beat = heartbeat.live(executable, hostname, pid, hb_thread, older_than=3600)
while not GRACEFUL_STOP.is_set():
message_status = deepcopy(services_list)
stime = time.time()
try:
start_time = time.time()
heart_beat = heartbeat.live(executable, hostname, pid, hb_thread, older_than=3600)
prepend_str = 'Thread [%i/%i] : ' % (heart_beat['assign_thread'], heart_beat['nr_threads'])
messages = retrieve_messages(bulk=bulk,
thread=heart_beat['assign_thread'],
total_threads=heart_beat['nr_threads'])
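            # A dispatch sketch: deliver_to_service below is a hypothetical helper
            # standing in for hermes2's per-service submission code.
            if messages:
                for service in message_status:
                    try:
                        deliver_to_service(service, messages)
                    except Exception as error:
                        logging.error('%s Delivery to %s failed: %s', prepend_str, service, str(error))

# --- judge-repairer daemon (excerpt) ---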
hostname = socket.gethostname()
pid = os.getpid()
current_thread = threading.current_thread()
paused_rules = {} # {rule_id: datetime}
# Make an initial heartbeat so that all judge-repairers have the correct worker number on the next try
executable = 'judge-repairer'
live(executable=executable, hostname=hostname, pid=pid, thread=current_thread, older_than=60 * 30)
graceful_stop.wait(1)
while not graceful_stop.is_set():
try:
# heartbeat
heartbeat = live(executable=executable, hostname=hostname, pid=pid, thread=current_thread, older_than=60 * 30)
start = time.time()
# Refresh paused rules
iter_paused_rules = deepcopy(paused_rules)
for key in iter_paused_rules:
if datetime.utcnow() > paused_rules[key]:
del paused_rules[key]
# Select a bunch of rules for this worker to repair
rules = get_stuck_rules(total_workers=heartbeat['nr_threads'],
worker_number=heartbeat['assign_thread'],
delta=-1 if once else 1800,
limit=100,
blacklisted_rules=[key for key in paused_rules])
logging.debug('rule_repairer[%s/%s] index query time %f fetch size is %d' % (heartbeat['assign_thread'], heartbeat['nr_threads'], time.time() - start, len(rules)))
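            # A sketch of the repair step that typically follows, assuming repair_rule
            # (rucio.core.rule) is imported and get_stuck_rules yields 1-tuples of rule
            # ids; the pause interval is an illustrative assumption.
            for rule in rules:
                rule_id = rule[0]
                if graceful_stop.is_set():
                    break
                try:
                    repair_rule(rule_id=rule_id)
                except (DatabaseException, DatabaseError):
                    # The rule is locked by another worker: pause it and move on
                    paused_rules[rule_id] = datetime.utcnow() + timedelta(seconds=randint(600, 2400))

# --- conveyor-finisher daemon (excerpt) ---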
patterns = pattern.split(",")
suspicious_patterns = [re.compile(pat.strip()) for pat in patterns]
logging.debug("Suspicious patterns: %s" % [pat.pattern for pat in suspicious_patterns])
retry_protocol_mismatches = conveyor_config.get('retry_protocol_mismatches', False)
executable = 'conveyor-finisher'
if activities:
activities.sort()
        executable += ' --activities ' + str(activities)
hostname = socket.getfqdn()
pid = os.getpid()
hb_thread = threading.current_thread()
heartbeat.sanity_check(executable=executable, hostname=hostname)
# Make an initial heartbeat so that all finishers have the correct worker number on the next try
heart_beat = heartbeat.live(executable, hostname, pid, hb_thread)
prepend_str = 'Thread [%i/%i] : ' % (heart_beat['assign_thread'], heart_beat['nr_threads'])
logging.info('%s Finisher starting - db_bulk(%i) bulk (%i)', prepend_str, db_bulk, bulk)
graceful_stop.wait(10)
while not graceful_stop.is_set():
start_time = time.time()
try:
heart_beat = heartbeat.live(executable, hostname, pid, hb_thread, older_than=3600)
prepend_str = 'Thread [%i/%i] : ' % (heart_beat['assign_thread'], heart_beat['nr_threads'])
logging.debug('%s Starting new cycle', prepend_str)
if activities is None:
activities = [None]
for activity in activities:
logging.debug('%s Working on activity %s', prepend_str, activity)
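                # Fetch-and-handle sketch: get_finished_requests and handle_requests
                # are hypothetical stand-ins for the bulk fetch and per-chunk handling
                # performed at this point; chunks is rucio's list-splitting helper.
                reqs = get_finished_requests(activity=activity, limit=db_bulk)
                for chunk in chunks(reqs, bulk):
                    handle_requests(chunk, suspicious_patterns, retry_protocol_mismatches)

# --- light injector daemon (excerpt) ---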
:param sleep_time: Days to sleep.
"""
logging.info('Starting Light Injector %s-%s: Will work on RSEs: %s', worker_number, total_workers, str(rses))
pid = os.getpid()
thread = threading.current_thread()
hostname = socket.gethostname()
executable = ' '.join(sys.argv)
    hash_executable = hashlib.sha256((sys.argv[0] + ''.join(rses)).encode()).hexdigest()
sanity_check(executable=None, hostname=hostname)
injecting_time = time.time()
while not GRACEFUL_STOP.is_set():
try:
# heartbeat
heartbeat = live(executable=executable, hostname=hostname, pid=pid, thread=thread, hash_executable=hash_executable)
logging.info('Light Injector({0[worker_number]}/{0[total_workers]}): Live gives {0[heartbeat]}'.format(locals()))
nothing_to_do = True
random.shuffle(rses)
for rse in rses:
inject(rse, older_than)
if once:
break
            injecting_time = time.time()  # refresh so every cycle sleeps the full interval
            next_inject_time = injecting_time + 3600 * 24 * sleep_time
            logging.info('Will sleep %s seconds (about %s days)' % (next_inject_time - time.time(), (next_inject_time - time.time()) / 86400.0))
while not GRACEFUL_STOP.is_set() and time.time() < next_inject_time:
time.sleep(1)
except:
logging.critical(traceback.format_exc())

# --- necromancer daemon (excerpt) ---
    :param once: Run only once.
"""
sleep_time = 60
update_history_threshold = 3600
update_history_time = time.time()
executable = ' '.join(argv)
hostname = socket.getfqdn()
pid = os.getpid()
hb_thread = threading.current_thread()
heartbeat.sanity_check(executable=executable, hostname=hostname)
while not graceful_stop.is_set():
heart_beat = heartbeat.live(executable, hostname, pid, hb_thread)
prepend_str = 'Thread [%i/%i] : ' % (heart_beat['assign_thread'], heart_beat['nr_threads'])
stime = time.time()
replicas = []
try:
replicas = list_bad_replicas(limit=bulk, thread=heart_beat['assign_thread'], total_threads=heart_beat['nr_threads'])
for replica in replicas:
scope, name, rse_id, rse = replica['scope'], replica['name'], replica['rse_id'], replica['rse']
logging.info(prepend_str + 'Working on %s:%s on %s' % (scope, name, rse))
list_replicas = get_replicas_state(scope=scope, name=name)
if ReplicaState.AVAILABLE not in list_replicas and ReplicaState.TEMPORARY_UNAVAILABLE not in list_replicas:
                    logging.info(prepend_str + 'File %s:%s has no other available or temporarily available replicas, it will be marked as lost' % (scope, name))
try:
                        update_rules_for_lost_replica(scope=scope, name=name, rse_id=rse_id, nowait=True)
                    except DatabaseException as error:
                        # The did is locked by another worker: retry it on a later cycle
                        logging.info(prepend_str + str(error))
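                else:
                    # Sketch (assumption): with other replicas still available, declare
                    # this one bad so the rules re-source it from elsewhere;
                    # update_rules_for_bad_replica comes from rucio.core.rule.
                    update_rules_for_bad_replica(scope=scope, name=name, rse_id=rse_id, nowait=True)

# --- minos daemon: main loop (excerpt) ---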
logging.info(prepend_str + 'Minos starting')
time.sleep(10) # To prevent running on the same partition if all the daemons restart at the same time
heart_beat = heartbeat.live(executable, hostname, pid, hb_thread)
prepend_str = 'Thread [%i/%i] : ' % (heart_beat['assign_thread'], heart_beat['nr_threads'])
states_mapping = {BadPFNStatus.BAD: BadFilesStatus.BAD,
BadPFNStatus.SUSPICIOUS: BadFilesStatus.SUSPICIOUS,
BadPFNStatus.TEMPORARY_UNAVAILABLE: BadFilesStatus.TEMPORARY_UNAVAILABLE}
logging.info(prepend_str + 'Minos started')
chunk_size = 10 # The chunk size used for the commits
while not graceful_stop.is_set():
start_time = time.time()
heart_beat = heartbeat.live(executable, hostname, pid, hb_thread)
prepend_str = 'Thread [%i/%i] : ' % (heart_beat['assign_thread'], heart_beat['nr_threads'])
pfns = []
try:
bad_replicas = {}
                temporary_unavailables = {}
pfns = get_bad_pfns(thread=heart_beat['assign_thread'], total_threads=heart_beat['nr_threads'], limit=bulk)
                # Classify the PFNs into bad_replicas and temporary_unavailables
for pfn in pfns:
path = pfn['pfn']
account = pfn['account']
reason = pfn['reason']
expires_at = pfn['expires_at']
state = pfn['state']
if states_mapping[state] in [BadFilesStatus.BAD, BadFilesStatus.SUSPICIOUS]:
                    if (account, reason, state) not in bad_replicas:
                        bad_replicas[(account, reason, state)] = []
                    bad_replicas[(account, reason, state)].append(path)
                else:
                    # TEMPORARY_UNAVAILABLE PFNs are keyed on their expiration time instead
                    if (account, reason, expires_at) not in temporary_unavailables:
                        temporary_unavailables[(account, reason, expires_at)] = []
                    temporary_unavailables[(account, reason, expires_at)].append(path)

# --- minos daemon: signature and setup (excerpt) ---
# (signature below is reconstructed from the parameter list; defaults are illustrative)
def minos(bulk=1000, once=False, sleep_time=60):
    """
    Creates a Minos Worker that gets a list of bad PFNs,
    extracts the scope, name and rse_id, and fills the bad_replicas table.

    :param bulk: The number of bad PFNs to process per cycle.
:param once: Run only once.
:param sleep_time: Time between two cycles.
"""
executable = ' '.join(argv)
hostname = socket.getfqdn()
pid = os.getpid()
hb_thread = threading.current_thread()
heartbeat.sanity_check(executable=executable, hostname=hostname)

# --- minos temporary-unavailable expiration daemon (excerpt) ---
    heartbeat.sanity_check(executable=executable, hostname=hostname)
heart_beat = heartbeat.live(executable, hostname, pid, hb_thread)
prepend_str = 'Thread [%i/%i] : ' % (heart_beat['assign_thread'], heart_beat['nr_threads'])
logging.info(prepend_str + 'Minos Temporary Expiration starting')
time.sleep(10) # To prevent running on the same partition if all the daemons restart at the same time
heart_beat = heartbeat.live(executable, hostname, pid, hb_thread)
prepend_str = 'Thread [%i/%i] : ' % (heart_beat['assign_thread'], heart_beat['nr_threads'])
logging.info(prepend_str + 'Minos Temporary Expiration started')
chunk_size = 10 # The chunk size used for the commits
while not graceful_stop.is_set():
start_time = time.time()
heart_beat = heartbeat.live(executable, hostname, pid, hb_thread)
prepend_str = 'Thread [%i/%i] : ' % (heart_beat['assign_thread'], heart_beat['nr_threads'])
try:
# Get list of expired TU replicas
logging.info(prepend_str + 'Getting list of expired replicas')
expired_replicas = list_expired_temporary_unavailable_replicas(total_workers=heart_beat['nr_threads'],
worker_number=heart_beat['assign_thread'],
limit=1000)
logging.info(prepend_str + '%s expired replicas returned' % len(expired_replicas))
logging.debug(prepend_str + 'List of expired replicas returned %s' % str(expired_replicas))
replicas = []
bad_replicas = []
for replica in expired_replicas:
replicas.append({'scope': replica[0], 'name': replica[1], 'rse_id': replica[2], 'state': ReplicaState.AVAILABLE})
bad_replicas.append({'scope': replica[0], 'name': replica[1], 'rse_id': replica[2], 'state': BadFilesStatus.TEMPORARY_UNAVAILABLE})
session = get_session()
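            # A chunked-commit sketch (assumption): update_replicas_states is
            # rucio.core.replica's bulk state setter, chunks splits a list into
            # chunk_size pieces, and bookkeeping for the bad_replicas rows is elided.
            for chunk in chunks(replicas, chunk_size):
                update_replicas_states(replicas=chunk, nowait=True, session=session)
            session.commit()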