Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def wechat_location_msg(self, msg: wxpy.Message) -> Message:
efb_msg = Message()
efb_msg.text = msg.text.split('\n')[0][:-1]
efb_msg.attributes = LocationAttribute(latitude=float(msg.location['x']),
longitude=float(msg.location['y']))
efb_msg.type = MsgType.Location
return efb_msg
def wechat_video_msg(self, msg: wxpy.Message) -> Message:
efb_msg = Message(type=MsgType.Video)
try:
if msg.file_size == 0:
raise EOFError
efb_msg.path, efb_msg.mime, efb_msg.file = self.save_file(msg)
efb_msg.filename = msg.file_name
efb_msg.text = ""
except EOFError:
efb_msg.type = MsgType.Text
efb_msg.text += self._("[Failed to download the video message, please check your phone.]")
return efb_msg
def handle_msg(context):
self.logger.debug(repr(context))
msg_element = context['message']
main_text: str = ''
messages: List[Message] = []
qq_uid = context['user_id']
at_list = {}
chat: Chat
author: ChatMember
remark = self.get_friend_remark(qq_uid)
if context['message_type'] == 'private':
context['alias'] = remark
chat: PrivateChat = self.chat_manager.build_efb_chat_as_private(context)
# efb_msg.chat: EFBChat = self.chat_manager.build_efb_chat_as_user(context, True)
else:
chat = self.chat_manager.build_efb_chat_as_group(context)
if 'anonymous' not in context or context['anonymous'] is None:
if context['message_type'] == 'group':
if context['sub_type'] == 'notice':
def qq_share_wrapper(self, data, chat: Chat = None):
efb_msg = Message(
type=MsgType.Link,
text='',
attributes=LinkAttribute(
title='' if 'title' not in data else data['title'],
description='' if 'content' not in data else data['content'],
image='' if 'image' not in data else data['image'],
url=data['url']
)
)
return [efb_msg]
def exit_callback(self):
# Don't send prompt if there's nowhere to send.
if not getattr(coordinator, 'master', None):
raise Exception(
self._("Web WeChat logged your account out before master channel is ready."))
self.logger.debug('Calling exit callback...')
if self._stop_polling_event.is_set():
return
msg = Message(
chat=self.user_auth_chat,
author=self.user_auth_chat.other,
deliver_to=coordinator.master,
text=self._(
"WeChat server has logged you out. Please log in again when you are ready."),
uid=f"__reauth__.{uuid4()}",
type=MsgType.Text,
)
on_log_out = self.flag("on_log_out")
on_log_out = on_log_out if on_log_out in (
"command", "idle", "reauth") else "command"
if on_log_out == "command":
msg.type = MsgType.Text
msg.commands = MessageCommands(
[MessageCommand(name=self._("Log in again"), callable_name="reauth", kwargs={"command": True})])
elif on_log_out == "reauth":
def wechat_voice_msg(self, msg: wxpy.Message) -> Message:
efb_msg = Message(type=MsgType.Voice)
try:
efb_msg.path, efb_msg.mime, efb_msg.file = self.save_file(msg)
efb_msg.text = ""
except EOFError:
efb_msg.type = MsgType.Text
efb_msg.text += self._("[Failed to download the voice message, please check your phone.]")
return efb_msg
def send_msg_to_master(self, context):
self.logger.debug(repr(context))
if not getattr(coordinator, 'master', None): # Master Channel not initialized
raise Exception(context['message'])
chat = self.chat_manager.build_efb_chat_as_system_user(context)
try:
author = chat.get_member(SystemChatMember.SYSTEM_ID)
except KeyError:
author = chat.add_system_member()
msg = Message(
uid="__{context[uid_prefix]}__.{uni_id}".format(context=context,
uni_id=str(int(time.time()))),
type=MsgType.Text,
chat=chat,
author=author,
deliver_to=coordinator.master
)
if 'message' in context:
msg.text = context['message']
if 'commands' in context:
msg.commands = MessageCommands(context['commands'])
coordinator.send_message(msg)
try:
author = chat.get_member(SystemChatMember.SYSTEM_ID)
except KeyError:
author = chat.add_system_member()
if any(i in msg.text for i in self.NEW_CHAT_PATTERNS):
coordinator.send_status(ChatUpdates(
channel=self.channel,
new_chats=(chat.uid,)
))
elif any(i in msg.text for i in self.CHAT_AND_MEMBER_UPDATE_PATTERNS):
# TODO: detect actual member changes from message text
coordinator.send_status(ChatUpdates(
channel=self.channel,
modified_chats=(chat.uid,)
))
return Message(
text=msg.text,
type=MsgType.Text,
chat=chat,
author=author,
)
def wechat_shared_image_msg(self, msg: wxpy.Message, source: str, text: str = "", mode: str = "image") -> Message:
efb_msg = Message()
efb_msg.type = MsgType.Image
efb_msg.text = self._("Via {source}").format(source=source)
if text:
efb_msg.text = "%s\n%s" % (text, efb_msg.text)
efb_msg.path, efb_msg.mime, efb_msg.file = self.save_file(msg, app_message=mode)
return efb_msg