Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setup(tmp_path):
    """Boot the framework from a throwaway config and check the mocks loaded.

    Points ``EFB_DATA_PATH`` at ``tmp_path``, writes a config file naming the
    mock master channel, mock slave channel and mock middleware, initialises
    the framework from that file, and asserts that the coordinator now holds
    each of the three modules.
    """
    # The data path is exported before the config path is looked up;
    # presumably get_config_path() derives its result from it.
    os.environ['EFB_DATA_PATH'] = str(tmp_path)
    target = ehforwarderbot.utils.get_config_path()

    mock_modules = {
        "master_channel": "tests.mocks.master.MockMasterChannel",
        "slave_channels": ["tests.mocks.slave.MockSlaveChannel"],
        "middlewares": ["tests.mocks.middleware.MockMiddleware"]
    }
    with open(target, 'w') as stream:
        yaml.dump(mock_modules, stream)

    # Round-trip through the real loader so init() sees what a user would get.
    loaded = ehforwarderbot.config.load_config()
    ehforwarderbot.__main__.init(loaded)

    expected_master_id = master.MockMasterChannel.channel_id
    assert coordinator.master.channel_id == expected_master_id

    assert slave.MockSlaveChannel.channel_id in coordinator.slaves

    first_middleware = coordinator.middlewares[0]
    assert first_middleware.middleware_id == middleware.MockMiddleware.middleware_id
"tests.mocks.slave.MockSlaveChannel#instance1",
"tests.mocks.slave.MockSlaveChannel#instance2"
]
config = {
"master_channel": master_id,
"slave_channels": slave_ids
}
config = dump_and_load_config(config)
ehforwarderbot.__main__.init(config)
assert coordinator.master.channel_id == master_id
assert isinstance(coordinator.master, master.MockMasterChannel)
for i in slave_ids:
assert i in coordinator.slaves
assert isinstance(coordinator.slaves[i], slave.MockSlaveChannel)
def send_text_msg(self):
    """Send a plain-text "Hello, world." message to chat ``wonderland001``.

    The message is delivered to the first slave channel the coordinator
    yields, with this object's own chat identity as the author.

    Returns:
        Whatever ``coordinator.send_message`` returns for the message.
    """
    target_channel = next(iter(coordinator.slaves.values()))
    destination = target_channel.get_chat('wonderland001')

    outgoing = EFBMsg()
    outgoing.deliver_to = target_channel
    outgoing.chat = destination
    outgoing.author = EFBChat(self).self()
    outgoing.type = MsgType.Text
    outgoing.text = "Hello, world."

    return coordinator.send_message(outgoing)
def send_message_recall_status(self):
    """Send a message-removal status for message ``"1"`` in chat ``alice``.

    A stub message (uid ``"1"``) addressed to the first slave channel the
    coordinator yields is built, wrapped in an ``EFBMessageRemoval`` status
    and handed to the coordinator.

    Returns:
        Whatever ``coordinator.send_status`` returns for the status.
    """
    target_channel = next(iter(coordinator.slaves.values()))
    recipient = target_channel.get_chat('alice')

    # Only the fields set below are filled in on the message being recalled.
    recalled = EFBMsg()
    recalled.deliver_to = target_channel
    recalled.chat = recipient
    recalled.author = EFBChat(self).self()
    recalled.uid = "1"

    removal = EFBMessageRemoval(self, target_channel, recalled)
    return coordinator.send_status(removal)
telemetry_set_metadata({i: cls.__version__})
instance_id = i.split('#', 1)[1:]
instance_id = (instance_id and instance_id[0]) or None
coordinator.add_middleware(cls(instance_id=instance_id))
logger.log(99, "\x1b[0;37;42m %s \x1b[0m",
_("Middleware {name} ({id}) # {instance_id} is initialized.")
.format(name=cls.middleware_name, id=cls.middleware_id,
instance_id=instance_id or _("Default profile")))
logger.log(99, "\x1b[1;37;42m %s \x1b[0m", _("All middlewares are initialized."))
coordinator.master_thread = threading.Thread(target=coordinator.master.poll, name=f"{coordinator.master.channel_id} polling thread")
coordinator.slave_threads = {key: threading.Thread(target=coordinator.slaves[key].poll,
name=f"{key} polling thread")
for key in coordinator.slaves}
Filters.regex(r"^/(?P[0-9]+)_(?P[a-z0-9_-]+)"),
self.extra_call))
self.command_conv = ConversationHandler(
entry_points=[],
states={Flags.COMMAND_PENDING: [CallbackQueryHandler(self.command_exec)]},
fallbacks=[CallbackQueryHandler(self.bot.session_expired)],
per_message=True,
per_chat=True,
per_user=False
)
self.bot.dispatcher.add_handler(self.command_conv)
self.modules_list: List[Any[SlaveChannel, Middleware]] = []
for i in sorted(coordinator.slaves.keys()):
self.modules_list.append(coordinator.slaves[i])
self.modules_list.extend(coordinator.middlewares)
def extra_call(self, update: Update, context: CallbackContext):
"""
Invoke an additional feature from slave channel.
"""
groupdict = context.match.groupdict()
if int(groupdict['id']) >= len(coordinator.slaves):
return self.bot.reply_error(update, self._("Invalid module ID. (XC01)"))
slaves = coordinator.slaves
channel = slaves[sorted(slaves)[int(groupdict['id'])]]
functions = channel.get_extra_functions()
if groupdict['command'] not in functions:
return self.bot.reply_error(update, self._("Command not found in selected module. (XC02)"))
# noinspection PyUnresolvedReferences
header = "{} {}: {}\n-------\n".format(
channel.channel_emoji, channel.channel_name,
functions[groupdict['command']].name # type: ignore
)
msg = self.bot.send_message(update.message.chat.id,
def get_slave_channels_ids() -> List[str]:
    """Return the IDs of the slave channels in the current instance.

    Returns:
        A new list holding the keys of ``coordinator.slaves``, in the
        mapping's iteration order.
    """
    # Iterating the mapping yields its keys, i.e. the channel IDs.
    return list(coordinator.slaves)
def stop_gracefully(*_, **__):
    """Signal every channel to stop polling, then wait for their threads.

    The master channel (if a valid one is set) and each valid slave channel
    are asked to stop polling first; only afterwards are the master and
    slave polling threads joined, so all channels receive the stop signal
    before this function blocks on any of them.

    All positional and keyword arguments are accepted and ignored.
    """
    logger = logging.getLogger(__name__)

    # ``master`` may not be set on the coordinator at all, hence the default.
    master = getattr(coordinator, "master", None)
    if isinstance(master, EFBChannel):
        master.stop_polling()
        # Pass the name as a logging argument so it is only formatted when
        # the DEBUG level is actually enabled.
        logger.debug("Stop signal sent to master: %s", master.channel_name)
    else:
        logger.info("Valid master channel is not found.")

    for channel in coordinator.slaves.values():
        if isinstance(channel, EFBChannel):
            channel.stop_polling()
            logger.debug("Stop signal sent to slave: %s", channel.channel_name)

    # Bind the thread once so the None check and the join() act on the same
    # object even if the coordinator attribute is reassigned meanwhile.
    master_thread = coordinator.master_thread
    if master_thread and master_thread.is_alive():
        master_thread.join()

    for thread in coordinator.slave_threads.values():
        if thread.is_alive():
            thread.join()