Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.workflow.register(self.bus)
# Build mercurial worker and queue
self.mercurial = MercurialWorker(
QUEUE_MERCURIAL,
QUEUE_MERCURIAL_APPLIED,
repositories=self.workflow.get_repositories(
taskcluster_config.secrets["repositories"],
cache_root,
default_ssh_key=taskcluster_config.secrets["ssh_key"],
),
)
self.mercurial.register(self.bus)
# Set up monitoring for newly created tasks
self.monitoring = Monitoring(
taskcluster_config,
QUEUE_MONITORING,
taskcluster_config.secrets["admins"],
MONITORING_PERIOD,
)
self.monitoring.register(self.bus)
# Set up monitoring for newly created community tasks
if community_config is not None:
self.community_monitoring = Monitoring(
community_taskcluster_config,
QUEUE_MONITORING_COMMUNITY,
taskcluster_config.secrets["admins"],
MONITORING_PERIOD,
)
self.community_monitoring.register(self.bus)
),
)
self.mercurial.register(self.bus)
# Set up monitoring for newly created tasks
self.monitoring = Monitoring(
taskcluster_config,
QUEUE_MONITORING,
taskcluster_config.secrets["admins"],
MONITORING_PERIOD,
)
self.monitoring.register(self.bus)
# Set up monitoring for newly created community tasks
if community_config is not None:
self.community_monitoring = Monitoring(
community_taskcluster_config,
QUEUE_MONITORING_COMMUNITY,
taskcluster_config.secrets["admins"],
MONITORING_PERIOD,
)
self.community_monitoring.register(self.bus)
else:
self.community_monitoring = None
self.bugbug_utils = BugbugUtils(self.workflow.api)
self.bugbug_utils.register(self.bus)
else:
self.workflow = None
self.mercurial = None
self.monitoring = None
self.community_monitoring = None