Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'min_recent_requests': min_recent_requests,
'min_popularity': min_popularity
}
instance_id = str(uuid4()).split('-')[0]
elastic_url = config_get('c3po', 'elastic_url')
elastic_index = config_get('c3po', 'elastic_index')
ca_cert = False
if 'ca_cert' in c3po_options:
ca_cert = config_get('c3po', 'ca_cert')
auth = False
if ('elastic_user' in c3po_options) and ('elastic_pass' in c3po_options):
auth = HTTPBasicAuth(config_get('c3po', 'elastic_user'), config_get('c3po', 'elastic_pass'))
w = waiting_time
while not GRACEFUL_STOP.is_set():
if w < waiting_time:
w += 10
sleep(10)
continue
len_dids = did_queue.qsize()
if len_dids > 0:
logging.debug('(%s) %d did(s) in queue' % (instance_id, len_dids))
else:
logging.debug('(%s) no dids in queue' % (instance_id))
for _ in range(0, len_dids):
did = did_queue.get()
from sqlalchemy.exc import DatabaseError
from rucio.common.config import config_get
from rucio.common.exception import DatabaseException, UnsupportedOperation, RuleNotFound
from rucio.common.types import InternalAccount
from rucio.common.utils import chunks
from rucio.core.heartbeat import live, die, sanity_check
from rucio.core.monitor import record_counter
from rucio.core.did import list_expired_dids, delete_dids
logging.getLogger("requests").setLevel(logging.CRITICAL)
logging.basicConfig(stream=sys.stdout,
level=getattr(logging,
config_get('common', 'loglevel',
raise_exception=False,
default='DEBUG').upper()),
format='%(asctime)s\t%(process)d\t%(levelname)s\t%(message)s')
GRACEFUL_STOP = threading.Event()
def undertaker(worker_number=1, total_workers=1, chunk_size=5, once=False):
"""
Main loop to select and delete dids.
"""
logging.info('Undertaker(%s): starting', worker_number)
logging.info('Undertaker(%s): started', worker_number)
executable = 'undertaker'
hostname = socket.gethostname()
pid = os.getpid()
import logging
import traceback
from dogpile.cache import make_region
from dogpile.cache.api import NoValue
from rucio.common.config import config_get
from rucio.core import config as config_core
from rucio.core.rse import get_rse_id, get_rse_transfer_limits
queue_mode = config_get('conveyor', 'queue_mode', False, 'default')
if queue_mode.upper() == 'STRICT':
queue_mode = 'strict'
config_memcache = config_get('conveyor', 'using_memcache', False, 'False')
if config_memcache.upper() == 'TRUE':
using_memcache = True
else:
using_memcache = False
cache_time = int(config_get('conveyor', 'cache_time', False, 600))
REGION_SHORT = make_region().configure('dogpile.cache.memory',
expiration_time=cache_time)
def get_transfer_limits(activity, rse_id):
"""
Get RSE transfer limits.
:param activity: The activity.
# Authors:
# - Thomas Beermann, , 2016-2017
# - Cedric Serfon, , 2017
# - Andrew Lister, , 2019
#
# PY3K COMPATIBLE
import logging
from json import dumps, loads
from requests import post
from requests.auth import HTTPBasicAuth
from rucio.common.config import config_get, config_get_options
ELASTIC_URL = config_get('es-atlas', 'url')
ELASTIC_OPTIONS = config_get_options('es-atlas')
AUTH = None
if ('username' in ELASTIC_OPTIONS) and ('password' in ELASTIC_OPTIONS):
AUTH = HTTPBasicAuth(config_get('es-atlas', 'username'), config_get('es-atlas', 'password'))
if 'ca_cert' in ELASTIC_OPTIONS:
ELASTIC_CA_CERT = config_get('es-atlas', 'ca_cert')
else:
ELASTIC_CA_CERT = False
URL = ELASTIC_URL + '/atlas_rucio-popularity-*/_search'
def get_popularity(did):
elif self.auth_type == 'oidc':
self.creds['oidc_auto'] = config_get('client', 'oidc_auto')
self.creds['oidc_scope'] = config_get('client', 'oidc_scope')
self.creds['oidc_audience'] = config_get('client', 'oidc_audience')
self.creds['oidc_polling'] = config_get('client', 'oidc_polling')
self.creds['oidc_refresh_lifetime'] = config_get('client', 'oidc_refresh_lifetime')
self.creds['oidc_issuer'] = config_get('client', 'oidc_issuer')
if self.creds['oidc_auto']:
self.creds['oidc_username'] = config_get('client', 'oidc_username')
self.creds['oidc_password'] = config_get('client', 'oidc_password')
elif self.auth_type == 'x509':
self.creds['client_cert'] = path.abspath(path.expanduser(path.expandvars(config_get('client', 'client_cert'))))
self.creds['client_key'] = path.abspath(path.expanduser(path.expandvars(config_get('client', 'client_key'))))
elif self.auth_type == 'x509_proxy':
try:
self.creds['client_proxy'] = path.abspath(path.expanduser(path.expandvars(config_get('client', 'client_x509_proxy'))))
except NoOptionError as error:
# Recreate the classic GSI logic for locating the proxy:
# - $X509_USER_PROXY, if it is set.
# - /tmp/x509up_u`id -u` otherwise.
# If neither exists (at this point, we don't care if it exists but is invalid), then rethrow
if 'X509_USER_PROXY' in environ:
self.creds['client_proxy'] = environ['X509_USER_PROXY']
else:
fname = '/tmp/x509up_u%d' % geteuid()
if path.exists(fname):
self.creds['client_proxy'] = fname
else:
raise MissingClientParameter('Cannot find a valid X509 proxy; not in %s, $X509_USER_PROXY not set, and '
'\'x509_proxy\' not set in the configuration file.' % fname)
elif self.auth_type == 'ssh':
self.creds['ssh_private_key'] = path.abspath(path.expanduser(path.expandvars(config_get('client', 'ssh_private_key'))))
def __init__(self, external_host):
"""
Initializes the transfertool
:param external_host: The external host where the transfertool API is running
"""
super(FTS3MyProxyTransfertool, self).__init__(external_host)
self.hostcert = config_get('conveyor', 'hostcert', False, None)
self.hostkey = config_get('conveyor', 'hostkey', False, None)
self.deterministic = config_get_bool('conveyor', 'use_deterministic_id', False, False)
self.cmsweb_endpoint = config_get('conveyor', 'cmsweb_endpoint', False, 'cmsweb.cern.ch')
self.myproxy_endpoint = config_get('conveyor', 'myproxy_endpoint', False, 'px502.cern.ch')
import time
from prometheus_client import start_http_server
from statsd import StatsClient
from rucio.common.config import config_get
SERVER = config_get('monitor', 'carbon_server', raise_exception=False, default='localhost')
PORT = config_get('monitor', 'carbon_port', raise_exception=False, default=8125)
SCOPE = config_get('monitor', 'user_scope', raise_exception=False, default='rucio')
CLIENT = StatsClient(host=SERVER, port=PORT, prefix=SCOPE)
ENABLE_METRICS = config_get('monitor', 'enable_metrics', raise_exception=False, default=False)
if ENABLE_METRICS:
METRICS_PORT = config_get('monitor', 'metrics_port', raise_exception=False, default=8080)
start_http_server(METRICS_PORT)
def record_counter(counters, delta=1):
"""
Log one or more counters by arbitrary amounts
:param counters: The counter or a list of counters to be updated.
:param delta: The increment for the counter, by default increment by 1.
"""
if isinstance(counters, list):
for counter in counters:
if delta > 0:
CLIENT.incr(counter, delta)
else:
CLIENT.decr(counter, delta)
- James Perry, , 2019
PY3K COMPATIBLE
"""
try:
from ConfigParser import NoOptionError, NoSectionError
except ImportError:
from configparser import NoOptionError, NoSectionError
from rucio.common import config, exception
import importlib
if config.config_has_section('permission'):
try:
FALLBACK_POLICY = config.config_get('permission', 'policy')
except (NoOptionError, NoSectionError) as error:
FALLBACK_POLICY = 'generic'
elif config.config_has_section('policy'):
try:
FALLBACK_POLICY = config.config_get('policy', 'permission')
except (NoOptionError, NoSectionError) as error:
FALLBACK_POLICY = 'generic'
else:
FALLBACK_POLICY = 'generic'
if config.config_has_section('policy'):
try:
POLICY = config.config_get('policy', 'package') + ".permission"
except (NoOptionError, NoSectionError) as error:
# fall back to old system for now
POLICY = 'rucio.core.permission.' + FALLBACK_POLICY.lower()
from rucio.db.sqla.constants import BadFilesStatus, ReplicaState
from rucio.db.sqla.session import get_session
from rucio.common.config import config_get
from rucio.common.utils import chunks
from rucio.common.exception import DataIdentifierNotFound, ReplicaNotFound
from rucio.core.did import get_metadata
from rucio.core.replica import (update_replicas_states,
bulk_delete_bad_replicas, list_expired_temporary_unavailable_replicas)
from rucio.core import heartbeat
logging.basicConfig(stream=stdout,
level=getattr(logging,
config_get('common', 'loglevel',
raise_exception=False,
default='DEBUG').upper()),
format='%(asctime)s\t%(process)d\t%(levelname)s\t%(message)s')
graceful_stop = threading.Event()
def minos_tu_expiration(bulk=1000, once=False, sleep_time=60):
"""
Creates a Minos Temporary Unavailable Replicas Expiration Worker that
gets the list of expired TU replicas and sets them back to AVAILABLE.
:param bulk: The number of requests to process.
:param once: Run only once.
:param sleep_time: Time between two cycles.
if not brokers_resolved:
logging.fatal('No brokers resolved.')
return
if not broker_timeout: # Allow zero in config
broker_timeout = None
logging.info('[broker] checking authentication method')
use_ssl = True
try:
use_ssl = config_get_bool('messaging-hermes', 'use_ssl')
except:
logging.info('[broker] could not find use_ssl in configuration -- please update your rucio.cfg')
port = config_get_int('messaging-hermes', 'port')
vhost = config_get('messaging-hermes', 'broker_virtual_host', raise_exception=False)
if not use_ssl:
username = config_get('messaging-hermes', 'username')
password = config_get('messaging-hermes', 'password')
port = config_get_int('messaging-hermes', 'nonssl_port')
conns = []
for broker in brokers_resolved:
if not use_ssl:
logging.info('[broker] setting up username/password authentication: %s' % broker)
con = stomp.Connection12(host_and_ports=[(broker, port)],
vhost=vhost,
keepalive=True,
timeout=broker_timeout)
else:
logging.info('[broker] setting up ssl cert/key authentication: %s' % broker)
con = stomp.Connection12(host_and_ports=[(broker, port)],