Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import threading
from typing import Set, Optional, IO, List
from logging import getLogger
from ehforwarderbot import EFBChannel, EFBMsg, EFBStatus, ChannelType, MsgType, coordinator, EFBChat
from ehforwarderbot.message import EFBMsgLinkAttribute, EFBMsgLocationAttribute
from ehforwarderbot.status import EFBMessageRemoval
from ehforwarderbot.types import ModuleID, MessageID
from ehforwarderbot.types import ChatID
class MockMasterChannel(EFBChannel):
channel_name: str = "Mock Master"
channel_emoji: str = "➕"
channel_id: ModuleID = ModuleID("tests.mocks.master.MockMasterChannel")
channel_type: ChannelType = ChannelType.Master
supported_message_types: Set[MsgType] = {MsgType.Text, MsgType.Link}
__version__: str = '0.0.1'
logger = getLogger(channel_id)
polling = threading.Event()
def poll(self):
self.polling.wait()
def send_status(self, status: EFBStatus):
self.logger.debug("Received status: %r", status)
def send_message(self, msg: EFBMsg) -> EFBMsg:
self.logger.debug("Received message: %r", msg)
from ehforwarderbot.message import MessageCommands, MessageCommand, StatusAttribute, LinkAttribute, \
Substitutions, LocationAttribute
from ehforwarderbot.status import MessageRemoval, ReactToMessage, MessageReactionsUpdate, ChatUpdates, \
MemberUpdates
from ehforwarderbot.types import ModuleID, ChatID, MessageID, ReactionName, Reactions
from ehforwarderbot.utils import extra
_T = TypeVar("_T")
ChatTypeName = Literal['PrivateChat', 'GroupChat', 'SystemChat']
class MockSlaveChannel(SlaveChannel):
channel_name: str = "Mock Slave"
channel_emoji: str = "➖"
channel_id: ModuleID = ModuleID("tests.mocks.slave")
supported_message_types: Set[MsgType] = {
MsgType.Text, MsgType.Image, MsgType.Voice, MsgType.Animation,
MsgType.Video, MsgType.File, MsgType.Location, MsgType.Link,
MsgType.Sticker, MsgType.Status, MsgType.Unsupported
}
__version__: str = '0.0.2'
logger = getLogger(channel_id)
CHAT_ID_FORMAT = "__chat_{hash}__"
polling = threading.Event()
__picture_dict: Dict[str, str] = {}
suggested_reactions: List[ReactionName] = [
def locate_module(module_id: ModuleID, module_type: str = None):
"""
Locate module by module ID
Args:
module_id: Module ID
module_type: Type of module, one of ``'master'``, ``'slave'`` and ``'middleware'``
"""
entry_point = None
if module_type:
entry_point = 'ehforwarderbot.%s' % module_type
module_id = ModuleID(module_id.split('#', 1)[0])
if entry_point:
for i in pkg_resources.iter_entry_points(entry_point):
if i.name == module_id:
return i.load()
return pydoc.locate(module_id)
def chat_id_str_to_id(s: EFBChannelChatIDStr) -> Tuple[ModuleID, ChatID, Optional[ChatID]]:
"""
Reverse of chat_id_to_str.
Returns:
channel_id, chat_uid, group_id
"""
ids = s.split(" ", 2)
channel_id = ModuleID(ids[0])
chat_uid = ChatID(ids[1])
if len(ids) < 3:
group_id = None
else:
group_id = ChatID(ids[2])
return channel_id, chat_uid, group_id
class EFBMiddleware(ABC):
"""
Middleware class.
Attributes:
middleware_id (str):
Unique ID of the middleware.
Convention of IDs is specified in :doc:`/guide/packaging`.
This ID will be appended with its instance ID when available.
middleware_name (str): Human-readable name of the middleware.
instance_id (str):
The instance ID if available.
"""
middleware_id: ModuleID = ModuleID("efb.empty_middleware")
middleware_name: str = "Empty Middleware"
instance_id: Optional[InstanceID] = None
__version__: str = 'undefined version'
def __init__(self, instance_id: Optional[InstanceID] = None):
"""
Initialize the middleware.
Inherited initializer must call the "super init" method
at the beginning.
Args:
instance_id: Instance ID of the middleware.
"""
if instance_id:
self.instance_id = InstanceID(instance_id)
self.middleware_id = ModuleID(self.middleware_id + "#" + instance_id)
def __init__(self, instance_id: Optional[InstanceID] = None):
"""
Initialize the middleware.
Inherited initializer must call the "super init" method
at the beginning.
Args:
instance_id: Instance ID of the middleware.
"""
if instance_id:
self.instance_id = InstanceID(instance_id)
self.middleware_id = ModuleID(self.middleware_id + "#" + instance_id)
import logging
from contextlib import suppress
from typing import TYPE_CHECKING, Optional, Dict, Tuple, Iterator, overload, cast, MutableSequence, Collection
from typing_extensions import Literal
from ehforwarderbot import coordinator
from ehforwarderbot.chat import Chat, ChatMember, BaseChat, SystemChatMember, SelfChatMember
from ehforwarderbot.exceptions import EFBChatNotFound
from ehforwarderbot.types import ModuleID, ChatID
from .chat import convert_chat, ETMChatType, ETMChatMember, unpickle, ETMSystemChat
if TYPE_CHECKING:
from . import TelegramChannel
CacheKey = Tuple[ModuleID, ChatID]
"""Cache storage key: module_id, chat_id"""
class ChatObjectCacheManager:
"""Maintain and update chat objects from all slave channels and
middlewares.
"""
def __init__(self, channel: 'TelegramChannel'):
self.channel = channel
self.db = channel.db
self.logger = logging.getLogger(__name__)
self.cache: Dict[CacheKey, ETMChatType] = dict()
self.logger.debug("Loading chats from slave channels...")
``None``.
Reactions should be ordered in a meaningful way, e.g., the order
used by the IM platform, or frequency of usage. Note that it is
not necessary to list all suggested reactions if that is too long,
or not feasible.
Set to ``None`` when it is known that no reaction is supported to
ANY message in the channel. Set to empty list when it is not feasible
to provide a list of suggested reactions, for example, the list of
reactions is different for each chat or message.
"""
channel_name: str = "Empty channel"
channel_emoji: str = "�"
channel_id: ModuleID = ModuleID("efb.empty_channel")
channel_type: ChannelType
instance_id: Optional[InstanceID] = None
supported_message_types: Set[MsgType] = set()
suggested_reactions: Optional[List[ReactionName]] = None
__version__: str = 'undefined version'
def __init__(self, instance_id: InstanceID = None):
"""
Initialize the channel.
Inherited initializer must call the "super init" method
at the beginning.
Args:
instance_id: Instance ID of the channel.
"""
if instance_id:
def __init__(self, db: 'DatabaseManager', *, channel: Optional[SlaveChannel] = None,
middleware: Optional[Middleware] = None,
module_name: str = "", channel_emoji: str = "", module_id: ModuleID = ModuleID(""), name: str = "",
alias: Optional[str] = None, uid: ChatID = ChatID(""), vendor_specific: Dict[str, Any] = None,
description: str = "", notification: ChatNotificationState = ChatNotificationState.ALL,
with_self: bool = True):
super().__init__(db, channel=channel, middleware=middleware, module_name=module_name,
channel_emoji=channel_emoji,
module_id=module_id, name=name, alias=alias, uid=uid, vendor_specific=vendor_specific,
description=description, notification=notification, with_self=with_self)
mutex: threading.Lock = threading.Lock()
"""Mutual exclusive lock for user interaction through CLI interface"""
master: EFBChannel # late init
"""The instance of the master channel."""
slaves: Dict[ModuleID, EFBChannel] = dict()
"""Instances of slave channels. Keys are the channel IDs."""
middlewares: List[EFBMiddleware] = list()
"""Instances of middlewares. Sorted in the order of execution."""
master_thread: Optional[threading.Thread] = None
"""The thread running poll() of the master channel."""
slave_threads: Dict[ModuleID, threading.Thread] = dict()
"""Threads running poll() from slave channels. Keys are the channel IDs."""
translator: NullTranslations = NullTranslations()
"""Internal GNU gettext translator."""
def add_channel(channel: EFBChannel):
"""
Register the channel with the coordinator.
Args:
channel (EFBChannel): Channel to register
"""
global master, slaves
if isinstance(channel, EFBChannel):
if channel.channel_type == ChannelType.Slave: