Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if chat_type == GroupChat:
self.fill_group(chat)
self.chats_by_chat_type[chat_type.__name__].append(chat)
self.chats_by_notification_state[notification].append(chat)
self.chats_by_profile_picture[avatar is not None].append(chat)
self.chats_by_alias[alias is not None].append(chat)
self.chats.append(chat)
name = "Unknown Chat"
self.unknown_chat: PrivateChat = PrivateChat(
channel=self,
name=name,
alias="不知道",
uid=ChatID(self.CHAT_ID_FORMAT.format(hash=hash(name))),
notification=ChatNotificationState.ALL
)
name = "Unknown Chat @ unknown channel"
self.unknown_channel: PrivateChat = PrivateChat(
module_id="__this_is_not_a_channel__",
module_name="Unknown Channel",
channel_emoji="‼️",
name=name,
alias="知らんでぇ",
uid=ChatID(self.CHAT_ID_FORMAT.format(hash=hash(name))),
notification=ChatNotificationState.ALL
)
name = "backup_chat"
self.backup_chat: PrivateChat = PrivateChat(
def chat_id_str_to_id(s: EFBChannelChatIDStr) -> Tuple[ModuleID, ChatID, Optional[ChatID]]:
    """Split a serialized chat identifier back into its components.

    Reverse of chat_id_to_str.  The input is split on single spaces into at
    most three fields, so anything after the second space is kept intact as
    the group ID.

    Args:
        s: String of the form ``"<channel_id> <chat_uid>"`` or
            ``"<channel_id> <chat_uid> <group_id>"``.

    Returns:
        channel_id, chat_uid, group_id -- ``group_id`` is ``None`` when the
        string holds only two fields.

    Raises:
        IndexError: If ``s`` contains no space, i.e. the chat UID is missing.
    """
    fields = s.split(" ", 2)
    channel_id, chat_uid = ModuleID(fields[0]), ChatID(fields[1])
    # The third field is optional.
    group_id = ChatID(fields[2]) if len(fields) > 2 else None
    return channel_id, chat_uid, group_id
def make_system_member(self, name: str = "", alias: Optional[str] = None, id: ChatID = ChatID(""),
                       uid: ChatID = ChatID(""), vendor_specific: Optional[Dict[str, Any]] = None,
                       description: str = "",
                       middleware: Optional[Middleware] = None) -> ETMSystemChatMember:
    """Create an ``ETMSystemChatMember`` from this object.

    The member is constructed with ``self.db`` and ``self`` as its first two
    positional arguments; every other value is forwarded by keyword.

    Args:
        name: Forwarded as ``name``.
        alias: Forwarded as ``alias``.
        id: Deprecated.  Must be left falsy; any truthy value trips the
            assertion below.  It is never forwarded.
        uid: Forwarded as ``uid``.
        vendor_specific: Forwarded as ``vendor_specific``; may be ``None``.
        description: Forwarded as ``description``.
        middleware: Forwarded as ``middleware``.

    Returns:
        The newly constructed ``ETMSystemChatMember``.

    Raises:
        AssertionError: If ``id`` is truthy (only while assertions are
            enabled, i.e. Python is not run with ``-O``).
    """
    # TODO: remove deprecated ID
    assert not id, f"id is {id!r}"
    return ETMSystemChatMember(self.db, self, name=name, alias=alias, uid=uid,
                               vendor_specific=vendor_specific, description=description,
                               middleware=middleware)
def chat_migration(self, update: Update, context: CallbackContext):
    """Handle a chat-migration message update.

    Triggered by any message update with either ``migrate_from_chat_id`` or
    ``migrate_to_chat_id`` or both (which shouldn’t happen).  The old and new
    chat IDs are worked out from the message and handed to
    ``chat_migration_by_id``.

    Args:
        update: Update whose ``effective_message`` carries the migration IDs.
        context: Callback context; not used here.
    """
    msg = update.effective_message
    if msg.migrate_from_chat_id is not None:
        # The message names the old chat; the chat it arrived in is the new one.
        old_id, new_id = msg.migrate_from_chat_id, msg.chat.id
    elif msg.migrate_to_chat_id is not None:
        # The message names the new chat; the chat it arrived in is the old one.
        old_id, new_id = msg.chat.id, msg.migrate_to_chat_id
    else:
        # Per ptb filter specs, this part of code should not be reached.
        return
    self.chat_migration_by_id(ChatID(old_id), ChatID(new_id))
def __init__(self, channel: 'EFBChannel', chat_id: ChatID,
             new_members: Collection[ChatID] = tuple(),
             removed_members: Collection[ChatID] = tuple(),
             modified_members: Collection[ChatID] = tuple()):
    """__init__(channel: EFBChannel, chat_id: str, new_members: Collection[str]=tuple(), removed_members: Collection[str]=tuple(), modified_members: Optional[Collection[str]]=tuple())

    Store the member changes of one chat, then validate them via
    ``self.verify()``.

    Args:
        channel (:obj:`.EFBChannel`): Slave channel that issues the update
        chat_id (str): Unique ID of the chat.
        new_members (Optional[Collection[str]]): Unique ID of new members
        removed_members (Optional[Collection[str]]): Unique ID of removed members
        modified_members (Optional[Collection[str]]): Unique ID of modified members
    """
    # Where the update comes from and where it is going.
    self.channel: 'EFBChannel' = channel
    self.destination_channel: 'EFBChannel' = coordinator.master

    # Which chat changed, and how its member list changed.
    self.chat_id: ChatID = chat_id
    self.new_members: Collection[ChatID] = new_members
    self.removed_members: Collection[ChatID] = removed_members
    self.modified_members: Collection[ChatID] = modified_members

    # Run validation only once every attribute is in place.
    self.verify()
def __init__(self, channel: 'WeChatChannel'):
self.channel: 'WeChatChannel' = channel
self.logger: logging.Logger = logging.getLogger(__name__)
# noinspection PyProtectedMember
self._ = self.channel._
self.MISSING_GROUP: GroupChat = GroupChat(
channel=self.channel,
uid=ChatID("__error_group__"),
name=self._("Group Missing")
)
self.MISSING_CHAT: PrivateChat = PrivateChat(
channel=self.channel,
uid=ChatID("__error_chat__"),
name=self._("Chat Missing")
)
self.efb_chat_objs: Dict[str, Chat] = {}
# Cached Chat objects. Key: tuple(chat PUID, group PUID or None)
# Load system chats
self.system_chats: List[Chat] = []
for i in channel.flag('system_chats_to_include'):
self.system_chats.append(
remark = self.get_friend_remark(qq_uid)
if context['message_type'] == 'private':
context['alias'] = remark
chat: PrivateChat = self.chat_manager.build_efb_chat_as_private(context)
# efb_msg.chat: EFBChat = self.chat_manager.build_efb_chat_as_user(context, True)
else:
chat = self.chat_manager.build_efb_chat_as_group(context)
if 'anonymous' not in context or context['anonymous'] is None:
if context['message_type'] == 'group':
if context['sub_type'] == 'notice':
context['event_description'] = self._("System Notification")
context['uid_prefix'] = 'group_notification'
author = chat.add_system_member(
name=context['event_description'],
uid=ChatID("__{context[uid_prefix]}__".format(context=context))
)
else:
if remark is not None:
context['nickname'] = remark
g_id = context['group_id']
member_info = self.coolq_api_query('get_group_member_info',
group_id=g_id,
user_id=qq_uid)
if member_info is not None:
context['alias'] = member_info['card']
author = self.chat_manager.build_or_get_efb_member(chat, context)
elif context['message_type'] == 'private':
author = chat.other
else:
author = self.chat_manager.build_or_get_efb_member(chat, context)
else: # anonymous user in group
def make_system_member(self, name: str = "", alias: Optional[str] = None, id: ChatID = ChatID(""),
                       uid: ChatID = ChatID(""), vendor_specific: Optional[Dict[str, Any]] = None,
                       description: str = "",
                       middleware: Optional[Middleware] = None) -> ETMSystemChatMember:
    """Create an ``ETMSystemChatMember`` from this object.

    The member is constructed with ``self.db`` and ``self`` as its first two
    positional arguments; every other value is forwarded by keyword.

    Args:
        name: Forwarded as ``name``.
        alias: Forwarded as ``alias``.
        id: Deprecated.  Must be left falsy; any truthy value trips the
            assertion below.  It is never forwarded.
        uid: Forwarded as ``uid``.
        vendor_specific: Forwarded as ``vendor_specific``; may be ``None``.
        description: Forwarded as ``description``.
        middleware: Forwarded as ``middleware``.

    Returns:
        The newly constructed ``ETMSystemChatMember``.

    Raises:
        AssertionError: If ``id`` is truthy (only while assertions are
            enabled, i.e. Python is not run with ``-O``).
    """
    # TODO: remove deprecated ID
    assert not id, f"id is {id!r}"
    return ETMSystemChatMember(self.db, self, name=name, alias=alias, uid=uid,
                               vendor_specific=vendor_specific, description=description,
                               middleware=middleware)
Args:
chat_object (ETMChatType): Chat object for pickling
Returns:
SlaveChatInfo: The inserted or updated row
"""
slave_channel_id = chat_object.module_id
slave_channel_name = chat_object.module_name
slave_channel_emoji = chat_object.channel_emoji
slave_chat_uid = chat_object.uid
slave_chat_name = chat_object.name
slave_chat_alias = chat_object.alias
slave_chat_type = chat_object.chat_type_name
parent_chat: Optional['ETMChatType'] = getattr(chat_object, 'chat', None)
slave_chat_group_id: Optional[ChatID]
if parent_chat:
slave_chat_group_id = parent_chat.uid
else:
slave_chat_group_id = None
chat_info = self.get_slave_chat_info(slave_channel_id=slave_channel_id,
slave_chat_uid=slave_chat_uid,
slave_chat_group_id=slave_chat_group_id)
if chat_info is not None:
chat_info.slave_channel_name = slave_channel_name
chat_info.slave_channel_emoji = slave_channel_emoji
chat_info.slave_chat_name = slave_chat_name
chat_info.slave_chat_alias = slave_chat_alias
chat_info.slave_chat_type = slave_chat_type
chat_info.pickle = chat_object.pickle
chat_info.save()