Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setup(tmp_path):
    """Write a mock EFB config under ``tmp_path``, initialise EFB and check the result.

    The function sets the ``EFB_DATA_PATH`` environment variable to
    ``tmp_path``, dumps a configuration naming the mock master channel, slave
    channel and middleware to the path returned by
    ``ehforwarderbot.utils.get_config_path()``, loads it back with
    ``ehforwarderbot.config.load_config()`` and passes it to
    ``ehforwarderbot.__main__.init``.  It then asserts that the coordinator
    holds the mock master, the mock slave and the mock middleware.

    Args:
        tmp_path: Directory to use as the EFB data path; it is converted with
            ``str()`` before being stored in the environment (presumably
            pytest's ``tmp_path`` fixture -- confirm against the caller).
    """
    # NOTE(review): EFB_DATA_PATH stays set after this function returns; that
    # looks intentional for a setup routine, so it is not restored here.
    os.environ['EFB_DATA_PATH'] = str(tmp_path)
    config_path = ehforwarderbot.utils.get_config_path()
    mock_config = {
        "master_channel": "tests.mocks.master.MockMasterChannel",
        "slave_channels": ["tests.mocks.slave.MockSlaveChannel"],
        "middlewares": ["tests.mocks.middleware.MockMiddleware"]
    }
    with open(config_path, 'w') as f:
        yaml.dump(mock_config, f)

    # Keep the loaded configuration under its own name instead of rebinding
    # the dict that was just written.
    loaded_config = ehforwarderbot.config.load_config()
    ehforwarderbot.__main__.init(loaded_config)

    assert coordinator.master.channel_id == master.MockMasterChannel.channel_id
    assert slave.MockSlaveChannel.channel_id in coordinator.slaves
    assert coordinator.middlewares[0].middleware_id == middleware.MockMiddleware.middleware_id
def dump_and_load_config(config):
    """Write ``config`` to the EFB config file and return the re-loaded result.

    Args:
        config: Object to serialise with ``yaml.dump`` into the file at
            ``ehforwarderbot.utils.get_config_path()``.

    Returns:
        Whatever ``ehforwarderbot.config.load_config()`` returns after the
        file has been written.
    """
    config_path = ehforwarderbot.utils.get_config_path()
    with config_path.open('w') as f:
        yaml.dump(config, f)
    # Load only after the ``with`` block has closed (and flushed) the file.
    return ehforwarderbot.config.load_config()
def test_no_config_file():
    """Check that ``load_config`` raises ``FileNotFoundError`` in an empty data dir.

    ``EFB_DATA_PATH`` is pointed at a freshly created temporary directory for
    the duration of the check.  The previous value (or absence) of the
    variable is restored afterwards, so the environment is not left pointing
    at a directory that has already been deleted.
    """
    previous = os.environ.get('EFB_DATA_PATH')
    try:
        with tempfile.TemporaryDirectory() as data_dir:
            os.environ['EFB_DATA_PATH'] = data_dir
            with pytest.raises(FileNotFoundError):
                ehforwarderbot.config.load_config()
    finally:
        # Undo the environment change whether or not the assertion passed.
        if previous is None:
            os.environ.pop('EFB_DATA_PATH', None)
        else:
            os.environ['EFB_DATA_PATH'] = previous
# Command-line entry point.
# Parses arguments with `parser.parse_args()`. When `args.version` is set it
# builds a multi-line version report (EFB itself, the master channel, each
# slave channel, middlewares) and prints it; otherwise it loads the config,
# sets up logging and telemetry, registers `stop_gracefully` for exit and for
# SIGTERM/SIGINT, then calls `init(conf)` and `poll()`.
# NOTE(review): this copy has lost its indentation, so block nesting below is
# inferred from the keywords only -- confirm against the real file.
def main():
args = parser.parse_args()
# `getattr` with a default: a missing `version` attribute counts as "not set".
if getattr(args, "version", None):
versions = _("EH Forwarder Bot\n"
"Version: {version}\n"
"Python version:\n"
"{py_version}\n"
"Running on profile \"{profile}\"."
).format(version=__version__, py_version=sys.version, profile=args.profile)
# Presumably the `finally` further down prints whatever was collected in
# `versions` even if loading the config or locating a module raises.
try:
if args.profile:
coordinator.profile = str(args.profile)
conf = config.load_config()
# Master channel
master_channel: EFBChannel = utils.locate_module(conf['master_channel'], 'master')
# The instance ID is the text after the first '#' in the configured name;
# `[1:]` yields an empty list when there is no '#', and the next line then
# falls back to the translated "Default instance" (also for an empty suffix).
instance_id = conf['master_channel'].split('#', 1)[1:]
instance_id = (instance_id and instance_id[0]) or _("Default instance")
versions += "\n\n" + _("Master channel:") + "\n " + _("{name} ({id}) {version} # {instance_id}") \
.format(name=master_channel.channel_name,
id=master_channel.channel_id,
version=master_channel.__version__,
instance_id=instance_id)
# Heading is pluralised according to the number of configured slave channels.
versions += "\n\n" + ngettext("Slave channel:", "Slave channels:", len(conf['slave_channels']))
for i in conf['slave_channels']:
# Same '#'-suffix instance ID extraction as for the master channel.
instance_id = i.split('#', 1)[1:]
instance_id = (instance_id and instance_id[0]) or _("Default instance")
slave_channel: EFBChannel = utils.locate_module(i, 'slave')
versions += "\n " + _("{name} ({id}) {version} # {instance_id}") \
.format(name=slave_channel.channel_name,
# NOTE(review): lines appear to be missing here. The `.format(` call above is
# never closed, and the statements that start the middleware section (the
# construct the `else:` below pairs with) are not visible in this copy.
instance_id = (instance_id and instance_id[0]) or _("Default instance")
middleware: EFBMiddleware = utils.locate_module(i, 'middleware')
versions += "\n " + _("{name} ({id}) {version} # {instance_id}") \
.format(name=middleware.middleware_name,
id=middleware.middleware_id,
version=middleware.__version__,
instance_id=instance_id)
else:
versions += "\n " + _("No middleware is enabled.")
finally:
print(versions)
else:
# Normal start-up path (no version flag).
if args.profile:
coordinator.profile = str(args.profile)
conf = config.load_config()
setup_logging(args, conf)
setup_telemetry(conf['telemetry'])
# Register the same shutdown handler for interpreter exit and for the
# SIGTERM / SIGINT signals.
atexit.register(stop_gracefully)
signal.signal(signal.SIGTERM, stop_gracefully)
signal.signal(signal.SIGINT, stop_gracefully)
init(conf)
poll()
"""
global shared_data, master_thread, slave_threads
# Init shared data object
shared_data = SharedData(args.profile)
# Include custom channels
custom_channel_path = utils.get_custom_channel_path()
if custom_channel_path not in sys.path:
sys.path.insert(0, custom_channel_path)
# Initialize all channels
# (Load libraries and modules and init them with Queue `q`)
l = logging.getLogger("ehForwarderBot")
conf = config.load_config()
slaves = {}
for i in conf['slave_channels']:
l.critical("\x1b[0;37;46m Initializing slave %s... \x1b[0m" % str(i))
obj = pydoc.locate(i)
shared_data.add_channel(obj(shared_data), is_slave=True)
l.critical("\x1b[0;37;42m Slave channel %s (%s) initialized. \x1b[0m" % (obj.channel_name, obj.channel_id))
l.critical("\x1b[0;37;46m Initializing master %s... \x1b[0m" % str(conf['master_channel']))
shared_data.add_channel(pydoc.locate(conf['master_channel'])(shared_data), is_slave=False)
l.critical("\x1b[0;37;42m Master channel %s (%s) initialized. \x1b[0m" %
(shared_data.master.channel_name, shared_data.master.channel_id))
l.critical("\x1b[1;37;42m All channels initialized. \x1b[0m")