Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def react_to_message_status(self, status: ReactToMessage):
    """Apply a reaction request to a previously sent message and announce the result.

    A ``status.reaction`` of ``None`` strips every ``SelfChatMember`` from all
    reaction lists of the message; any other value appends the chat's own
    ``self`` member under that reaction.  The updated reactions are then sent
    out through ``coordinator.send_status``.

    Raises:
        EFBMessageReactionNotPossible: ``accept_message_reactions`` is
            ``"reject_one"``.
        EFBOperationNotSupported: ``accept_message_reactions`` is
            ``"reject_all"``, or no sent message matches ``status.msg_id``.
    """
    flag = self.accept_message_reactions
    if flag == "reject_one":
        raise EFBMessageReactionNotPossible("Message reaction is rejected by flag.")
    if flag == "reject_all":
        raise EFBOperationNotSupported("All message reactions are rejected by flag.")

    target = self.messages_sent.get(status.msg_id)
    if target is None:
        raise EFBOperationNotSupported("Message is not found.")

    if status.reaction is not None:
        # Create the reaction bucket on first use, then record ourselves in it.
        target.reactions.setdefault(status.reaction, []).append(target.chat.self)
    else:
        # Withdraw our own reaction(s) while keeping everyone else's.
        for name, reactors in target.reactions.items():
            target.reactions[name] = [
                member for member in reactors
                if not isinstance(member, SelfChatMember)
            ]

    update = MessageReactionsUpdate(
        chat=target.chat,
        msg_id=target.uid,
        reactions=target.reactions
    )
    coordinator.send_status(update)
def compare_substitutions(tg_msg: Message, efb_msg: EFBMessage) -> None:
    """Compare application of substitution in message text.

    For every substitution recorded on ``efb_msg`` an entity of the same
    length must exist in ``tg_msg``: a ``MessageEntityMentionName`` when the
    substituted chat is a ``SelfChatMember``, a ``MessageEntityCode``
    otherwise.  Returns immediately when ``efb_msg`` has no substitutions.

    Raises:
        AssertionError: no entity of the expected kind has the length of a
            substitution; the message lists the entities that were searched.
    """
    if not efb_msg.substitutions:
        return
    self_subs: List[Tuple[MessageEntityMentionName, str]] = tg_msg.get_entities_text(cls=MessageEntityMentionName)
    other_subs: List[Tuple[MessageEntityCode, str]] = tg_msg.get_entities_text(cls=MessageEntityCode)
    for coord, chat in efb_msg.substitutions.items():
        # Substitution keys are (start, end) pairs, so this is the span length.
        size = coord[1] - coord[0]
        if isinstance(chat, SelfChatMember):
            label, candidates = "self_subs", self_subs
        else:
            label, candidates = "other_subs", other_subs
        # Report the list that was actually searched, so a failure in the
        # non-self branch no longer dumps the unrelated self_subs entities.
        assert any(ent.length == size for ent, _ in candidates), (
            f"string of size {size} is not found in {label}: "
            f"{[(x.to_dict(), y) for x, y in candidates]}"
        )
async def test_master_message(helper, client, bot_group, slave, channel, factory: MessageFactory):
    """Send a message through a linked chat, then edit it, checking delivery to the slave.

    Both the original and (when the factory produces one) the edited message
    must reach ``slave`` in the linked chat, be authored by a
    ``SelfChatMember``, and carry the expected ``edit`` / ``edit_media`` flags.
    """
    chat = slave.chat_without_alias

    def receive_and_check(*, edited: bool):
        # Pull the next message off the slave's queue and verify its envelope.
        received = slave.messages.get(timeout=5)
        assert received.chat == chat
        assert isinstance(received.author, SelfChatMember)
        assert received.deliver_to is slave
        if edited:
            assert received.edit
        else:
            assert not received.edit
        assert not received.edit_media
        return received

    with link_chats(channel, (chat, ), bot_group):
        # Send message
        sent = await factory.send_message(client, bot_group)
        delivered = receive_and_check(edited=False)
        factory.compare_message(sent, delivered)
        await factory.finalize_message(sent, delivered)

        # Edit message
        revised = await factory.edit_message(client, sent)
        if revised is not None:
            delivered = receive_and_check(edited=True)
            factory.compare_message(revised, delivered)
def send_status(self, status: 'Status'):
    """Process a status update; only ``MessageRemoval`` (recall) is handled.

    Raises:
        EFBOperationNotSupported: ``status`` is not a ``MessageRemoval``.
        EFBMessageError: the message was not authored by a
            ``SelfChatMember``, or the recall call raised
            ``CoolQAPIFailureException``.
    """
    if not isinstance(status, MessageRemoval):
        raise EFBOperationNotSupported()

    if not isinstance(status.message.author, SelfChatMember):
        raise EFBMessageError(self._('You can only recall your own messages.'))

    try:
        # The second '_'-separated field of the uid is what recall_message takes.
        self.recall_message(status.message.uid.split('_')[1])
    except CoolQAPIFailureException:
        raise EFBMessageError(
            self._('Failed to recall the message. This message may have already expired.'))
    # todo
raise EFBOperationNotSupported(self._("Failed to recall the message!\n"
"This message may have already expired."))
if msg.type in [MsgType.Text, MsgType.Link]:
if msg.text == "kick`":
group_id = chat_type[1]
user_id = msg.target.author.uid
self.coolq_api_query("set_group_kick",
group_id=group_id,
user_id=user_id)
else:
if isinstance(msg.target, Message):
max_length = 50
tgt_text = coolq_text_encode(process_quote_text(msg.target.text, max_length))
tgt_alias = ""
if chat_type[0] != 'private' and not isinstance(msg.target.author, SelfChatMember):
tgt_alias += m.coolq_code_at_wrapper(msg.target.author.uid)
else:
tgt_alias = ""
msg.text = "%s%s\n\n%s" % (tgt_alias, tgt_text, coolq_text_encode(msg.text))
msg.uid = self.coolq_send_message(chat_type[0], chat_type[1], msg.text)
self.logger.debug('[%s] Sent as a text message. %s', msg.uid, msg.text)
elif msg.type in (MsgType.Image, MsgType.Sticker, MsgType.Animation):
self.logger.info("[%s] Image/Sticker/Animation %s", msg.uid, msg.type)
text = ''
if not self.client_config['is_pro']: # CoolQ Air
if self.client_config['air_option']['upload_to_smms']:
text = '[Image] {}'
smms_data = None
try:
smms_data = upload_image_smms(msg.file, msg.path)
except CoolQUnknownException as e:
def get_or_enrol_member(cached: ETMChatType, member: ChatMember) -> ETMChatMember:
    """Return the member of ``cached`` with ``member.uid``, adding it when absent.

    When ``cached.get_member`` raises ``KeyError``, a new member is created on
    ``cached`` according to the type of ``member`` (system, self, or ordinary),
    and the module id, module name and channel emoji are copied from
    ``member`` onto it.
    """
    # TODO: Add test case for this
    try:
        return cached.get_member(member.uid)
    except KeyError:
        enrolled: ETMChatMember
        is_system = isinstance(member, SystemChatMember)
        if not is_system and isinstance(member, SelfChatMember):
            enrolled = cached.add_self()
        else:
            # System and ordinary members take the same profile fields and
            # differ only in which factory method creates them.
            enrol = cached.add_system_member if is_system else cached.add_member
            enrolled = enrol(name=member.name, alias=member.alias, uid=member.uid,
                             vendor_specific=member.vendor_specific.copy(),
                             description=member.description)
        for attribute in ("module_id", "module_name", "channel_emoji"):
            setattr(enrolled, attribute, getattr(member, attribute))
        return enrolled
def html_substitutions(self, msg: Message) -> str:
    """Build a Telegram-flavored HTML string for message text substitutions.

    Every substituted span of ``msg.text`` is HTML-escaped and wrapped in
    ``<a>`` when it refers to a ``SelfChatMember`` (or a ``Chat`` whose
    ``has_self`` is truthy) and in ``<code>`` otherwise; the text between
    spans is escaped as-is.  Without substitutions the escaped text is
    returned, or the text itself when it is falsy.
    """
    raw = msg.text
    if not msg.substitutions:
        return html.escape(raw) if raw else raw

    pieces = []
    cursor = 0
    # Walk the spans left to right so the untouched text in between is kept.
    for span in sorted(msg.substitutions.keys()):
        begin, end = span[0], span[1]
        pieces.append(html.escape(raw[cursor:begin]))
        subject = msg.substitutions[span]
        refers_to_self = isinstance(subject, SelfChatMember) or (
            isinstance(subject, Chat) and subject.has_self
        )
        if refers_to_self:
            opening, closing = '<a>', "</a>"
        else:
            opening, closing = '<code>', '</code>'
        pieces.append(opening)
        pieces.append(html.escape(raw[begin:end]))
        pieces.append(closing)
        cursor = end
    pieces.append(html.escape(raw[cursor:]))
    return "".join(pieces)
def add_self(self) -> ETMSelfChatMember:
    """Return this chat's ``ETMSelfChatMember``, creating and appending one if needed.

    When the ``self`` attribute already holds a truthy ``ETMSelfChatMember``
    it is returned unchanged.  Otherwise a new one is built from ``self.db``
    and this chat, appended to ``self.members`` and returned.

    Raises:
        AssertionError: ``self.members`` already contains a ``SelfChatMember``
            although no usable ``self`` attribute was found.
    """
    current = getattr(self, 'self', None)
    if current and isinstance(current, ETMSelfChatMember):
        return current

    # A chat must never end up with two "self" members.
    assert all(not isinstance(member, SelfChatMember) for member in self.members)

    own_member = ETMSelfChatMember(self.db, self)
    self.members.append(own_member)
    return own_member
return cached_obj
# if chat name or alias changes, update cache
efb_chat: Chat
chat_id = ChatID(chat.puid or f"__invalid_{uuid4()}__")
if cached_obj:
efb_chat = cached_obj
efb_chat.uid = chat_id
efb_chat.name = chat_name
efb_chat.alias = chat_alias
efb_chat.vendor_specific = {'is_mp': isinstance(chat, wxpy.MP)}
if isinstance(chat, wxpy.Group):
# Update members if necessary
remote_puids = {i.puid for i in chat.members}
local_ids = {i.uid for i in efb_chat.members if not isinstance(i, SelfChatMember)}
# Add missing members
missing_puids = remote_puids - local_ids
for member in chat.members:
if member.puid in missing_puids:
member_name, member_alias = self.get_name_alias(member)
efb_chat.add_member(name=member_name, alias=member_alias, uid=member.puid,
vendor_specific={'is_mp': False})
elif chat == chat.bot.self:
efb_chat = PrivateChat(channel=self.channel, uid=chat_id, name=chat_name,
alias=chat_alias, vendor_specific={'is_mp': True}, other_is_self=True)
elif isinstance(chat, wxpy.Group):
efb_chat = GroupChat(channel=self.channel, uid=chat_id, name=chat_name,
alias=chat_alias, vendor_specific={'is_mp': False})
for i in chat.members:
if i.user_name == self.bot.self.user_name:
continue