Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setup(tmp_path):
    """Boot the framework from a mock-only profile stored under ``tmp_path``.

    ``EFB_DATA_PATH`` is pointed at ``tmp_path``, a config naming the mock
    master channel, slave channel and middleware is dumped as YAML to the
    framework's config path, then loaded back and handed to
    ``ehforwarderbot.__main__.init``. Finally the coordinator is checked to
    hold exactly those mock modules.

    :param tmp_path: directory used as the data path for this run.
    """
    # Redirect the framework's data directory before asking it for any path.
    os.environ['EFB_DATA_PATH'] = str(tmp_path)

    profile = {
        "master_channel": "tests.mocks.master.MockMasterChannel",
        "slave_channels": ["tests.mocks.slave.MockSlaveChannel"],
        "middlewares": ["tests.mocks.middleware.MockMiddleware"]
    }
    with open(ehforwarderbot.utils.get_config_path(), 'w') as config_file:
        yaml.dump(profile, config_file)

    # Round-trip through the real loader so the written file is what gets used.
    ehforwarderbot.__main__.init(ehforwarderbot.config.load_config())

    # The coordinator must now expose the mock master, slave and middleware.
    assert coordinator.master.channel_id == master.MockMasterChannel.channel_id
    assert slave.MockSlaveChannel.channel_id in coordinator.slaves
    first_middleware = coordinator.middlewares[0]
    assert first_middleware.middleware_id == middleware.MockMiddleware.middleware_id
def test_custom_path_module_loading():
    """Check that modules copied into the custom modules path can be loaded.

    A temporary directory is used as ``EFB_DATA_PATH``. The mock ``master``,
    ``slave`` and ``middleware`` modules are copied from the ``mocks``
    directory next to this file into the custom modules path, and a config
    that refers to them by bare module name (including ``#instance``
    variants) is dumped, reloaded and passed to
    ``ehforwarderbot.__main__.init``.
    """
    with tempfile.TemporaryDirectory() as data_dir:
        os.environ['EFB_DATA_PATH'] = data_dir
        modules_path = ehforwarderbot.utils.get_custom_modules_path()

        profile = {
            "master_channel": "master.MockMasterChannel",
            "slave_channels": ["slave.MockSlaveChannel",
                               "slave.MockSlaveChannel#instance"],
            "middlewares": ["middleware.MockMiddleware",
                            "middleware.MockMiddleware#instance"]
        }

        # The mock modules live in the ``mocks`` directory beside this file.
        mocks_dir: Path = Path(inspect.getfile(inspect.currentframe())).parent / 'mocks'
        for module_file in ('master.py', 'slave.py', 'middleware.py'):
            # noinspection PyTypeChecker
            shutil.copy(mocks_dir / module_file, modules_path)

        loaded_profile = dump_and_load_config(profile)
        ehforwarderbot.__main__.init(loaded_profile)
def init():
    """
    Initialize all channels.

    Creates the module-level ``shared_data`` object from ``args.profile``,
    makes sure the custom channel path is at the front of ``sys.path``, loads
    the configuration and registers one instance of every configured slave
    channel on ``shared_data``.
    """
    global shared_data, master_thread, slave_threads

    # Init shared data object
    shared_data = SharedData(args.profile)

    # Include custom channels
    channel_dir = utils.get_custom_channel_path()
    if channel_dir not in sys.path:
        sys.path.insert(0, channel_dir)

    # Initialize all channels
    # (Load libraries and modules and init them with Queue `q`)
    logger = logging.getLogger("ehForwarderBot")
    conf = config.load_config()

    slaves = {}
    for channel_path in conf['slave_channels']:
        banner = "\x1b[0;37;46m Initializing slave %s... \x1b[0m" % str(channel_path)
        logger.critical(banner)
        # Resolve the dotted path to the channel class, then instantiate it.
        channel_cls = pydoc.locate(channel_path)
        shared_data.add_channel(channel_cls(shared_data), is_slave=True)
def __init__(self, channel: 'TelegramChannel'):
    """Open the channel database, start the worker thread, check the schema.

    The database file ``tgdata.db`` is located in the data path of
    ``channel``. After connecting, a task queue and a thread running
    ``self.task_worker`` are created and the thread is started. If the
    ``ChatAssoc`` table is missing, ``self._create()`` is called; otherwise
    the column names of the ``msglog`` and ``slavechatinfo`` tables are
    collected into local sets.

    :param channel: the channel whose ``channel_id`` selects the data path.
    """
    data_dir = utils.get_data_path(channel.channel_id)

    self.logger.debug("Loading database...")
    db_file = str(data_dir / 'tgdata.db')
    database.init(db_file)
    database.connect()
    self.logger.debug("Database loaded.")

    # Queue items are (callable, args, kwargs) tuples, or None.
    self.task_queue: 'Queue[Optional[Tuple[Callable, Sequence[Any], Dict[str, Any]]]]' = Queue()
    worker = Thread(target=self.task_worker, name="ETM database worker thread")
    self.worker_thread = worker
    worker.start()

    self.logger.debug("Checking database migration...")
    if ChatAssoc.table_exists():
        # Existing database: gather the current columns of both tables.
        msg_log_columns = {column.name for column in database.get_columns("msglog")}
        slave_chat_info_columns = {column.name for column in database.get_columns("slavechatinfo")}
    else:
        self._create()
def load_config(self):
    """
    Read this channel's YAML configuration file into ``self.config``.

    The path is obtained from the framework for ``self.channel_id``.
    ``self.config`` is left untouched when the file does not exist or when
    it parses to an empty/falsy value.
    """
    path = efb_utils.get_config_path(self.channel_id)
    if path.exists():
        with path.open() as stream:
            loaded = yaml.full_load(stream)
        # Only overwrite the current config with a non-empty document.
        if loaded:
            self.config: Dict[str, Any] = loaded
def authenticate(self, qr_reload, first_start=False):
    """Create a fresh ``wxpy.Bot`` session and re-attach message handling.

    :param qr_reload: name of the attribute on ``self`` to use as the QR-code
        callback; ``self.master_qr_code`` is used when no such attribute
        exists.
    :param first_start: when true, the bot is created with
        ``start_immediately=False``.

    NOTE(review): the source this block was recovered from had lost its
    indentation, so the extent of the ``with coordinator.mutex`` region below
    is a reconstruction -- confirm against the upstream file.
    """
    # Forget the QR picture tracked for the previous login attempt.
    self.master_qr_picture_id = None
    qr_callback = getattr(self, qr_reload, self.master_qr_code)
    if getattr(self, 'bot', None): # if a bot exists
        # Clean up the previous bot before replacing it.
        self.bot.cleanup()
    with coordinator.mutex:
        # Session cache and puid map are both kept in this channel's data path.
        self.bot: wxpy.Bot = wxpy.Bot(cache_path=str(efb_utils.get_data_path(self.channel_id) / "wxpy.pkl"),
                                      qr_callback=qr_callback,
                                      logout_callback=self.exit_callback,
                                      user_agent=self.flag('user_agent'),
                                      start_immediately=not first_start)
        self.bot.enable_puid(
            efb_utils.get_data_path(self.channel_id) / "wxpy_puid.pkl",
            self.flag('puid_logs')
        )
        # Signal that re-authentication has finished.
        self.done_reauth.set()
        if hasattr(self, "slave_message"):
            # Point the existing message handler at the new bot and register
            # its handlers again.
            self.slave_message.bot = self.bot
            self.slave_message.wechat_msg_register()
def load_config() -> Dict[str, Any]:
    """Load the profile configuration and verify its master channel entry.

    The custom modules path is put at the front of ``sys.path`` so that
    modules referenced by the config can be imported. The YAML file at the
    framework's config path is then read on top of a copy of
    ``OPTIONAL_DEFAULTS``.

    :return: the defaults overlaid with the values from the config file.
    :raises FileNotFoundError: if the config file does not exist.
    :raises ValueError: if no master channel is specified (this includes an
        empty config file).
    """
    _ = coordinator.translator.gettext

    # Include custom channels
    custom_channel_path = str(utils.get_custom_modules_path())
    if custom_channel_path not in sys.path:
        sys.path.insert(0, custom_channel_path)

    conf_path = utils.get_config_path()
    if not conf_path.exists():
        raise FileNotFoundError(_("Config File does not exist. ({})").format(conf_path))

    with conf_path.open() as f:
        # An empty YAML document loads as None; treat it as "no overrides" so
        # the missing-master check below reports the problem instead of
        # ``dict.update(None)`` raising an opaque TypeError.
        loaded = YAML().load(f) or {}

    data: Dict[str, Any] = OPTIONAL_DEFAULTS.copy()
    data.update(loaded)

    # Verify configuration
    # - Master channel
    master_channel_id = data.get("master_channel", None)
    if not master_channel_id:
        raise ValueError(_("Master Channel is not specified in the profile config."))

    return data
def __init__(self, instance_id: str = None):
    """Set up the filter middleware from its YAML configuration.

    Ensures the middleware's storage directory exists, reads the config
    file, and initialises ``config_version``, ``match_mode`` and the
    ``EFBChat`` object this middleware uses.

    :param instance_id: instance identifier forwarded to the base class.
    :raises EFBException: if the middleware's config file does not exist.
    """
    super().__init__(instance_id)

    storage_path = utils.get_data_path(self.middleware_id)
    config_path = utils.get_config_path(self.middleware_id)

    if not os.path.exists(storage_path):
        # exist_ok covers the directory appearing between check and create.
        os.makedirs(storage_path, exist_ok=True)

    if not os.path.exists(config_path):
        raise EFBException("Filter middleware is not configured.")

    # NOTE(security): ``yaml.load`` is called without an explicit loader. If
    # ``yaml`` is PyYAML this can construct arbitrary objects from the file
    # (and newer PyYAML releases require a ``Loader`` argument) -- consider
    # ``yaml.safe_load``. Left unchanged here; confirm which YAML library
    # this module imports.
    with open(config_path, encoding="UTF-8") as config_file:
        # An empty file loads as None; fall back to an empty mapping so the
        # defaults below apply instead of failing on ``None.get``.
        config = yaml.load(config_file) or {}

    self.config_version = 0

    self.match_mode = config.get("match_mode") # fuzz and exact
    if self.match_mode is None:
        self.match_mode = "fuzz"

    self.chat = EFBChat()
    self.chat.channel_name = self.middleware_name
def save_config(self):
    """Write ``self.config`` as YAML to the config path of ``self.profile``.

    ``coordinator.profile`` is set first so that the framework resolves the
    config path for this profile. The parent directory of the config file is
    created when missing.
    """
    coordinator.profile = self.profile
    conf_path = utils.get_config_path()

    # Create the parent directory if needed. ``exist_ok`` matters: on a first
    # save the config file is absent while its directory usually already
    # exists, which made an unconditional ``os.makedirs`` raise
    # FileExistsError.
    conf_dir = os.path.dirname(conf_path)
    if conf_dir:
        os.makedirs(conf_dir, exist_ok=True)

    with open(conf_path, 'w') as f:
        self.yaml.dump(self.config, f)