Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def send_text_msg(self):
slave = next(iter(coordinator.slaves.values()))
wonderland = slave.get_chat('wonderland001')
msg = EFBMsg()
msg.deliver_to = slave
msg.chat = wonderland
msg.author = EFBChat(self).self()
msg.type = MsgType.Text
msg.text = "Hello, world."
return coordinator.send_message(msg)
def __init__(self, instance_id=None):
super().__init__(instance_id)
alice = EFBChat(self)
alice.chat_name = "Alice"
alice.chat_uid = "alice"
alice.chat_type = ChatType.User
self.alice = alice
bob = EFBChat(self)
bob.chat_name = "Bob"
bob.chat_alias = "Little bobby"
bob.chat_uid = "bob"
bob.chat_type = ChatType.User
self.bob = bob
carol = EFBChat(self)
carol.chat_name = "Carol"
carol.chat_uid = "carol"
carol.chat_type = ChatType.User
self.carol = carol
dave = EFBChat(self)
dave.chat_name = "デブ" # Nah, that's a joke
dave.chat_uid = "dave"
dave.chat_type = ChatType.User
self.dave = dave
wonderland = EFBChat(self)
def test_pickle(slave_channel):
chat = EFBChat(slave_channel)
chat.chat_uid = "00001"
chat.chat_name = "Chat"
chat.chat_alias = "chaT"
chat.chat_type = ChatType.User
chat_dup = pickle.loads(pickle.dumps(chat))
for attr in ("module_name", "module_id", "channel_emoji", "chat_name",
"chat_type", "chat_alias", "chat_uid", "is_chat"):
assert getattr(chat, attr) == getattr(chat_dup, attr)
alice.chat_name = "Alice"
alice.chat_uid = "alice"
alice.chat_type = ChatType.User
self.alice = alice
bob = EFBChat(self)
bob.chat_name = "Bob"
bob.chat_alias = "Little bobby"
bob.chat_uid = "bob"
bob.chat_type = ChatType.User
self.bob = bob
carol = EFBChat(self)
carol.chat_name = "Carol"
carol.chat_uid = "carol"
carol.chat_type = ChatType.User
self.carol = carol
dave = EFBChat(self)
dave.chat_name = "デブ" # Nah, that's a joke
dave.chat_uid = "dave"
dave.chat_type = ChatType.User
self.dave = dave
wonderland = EFBChat(self)
wonderland.chat_name = "Wonderland"
wonderland.chat_uid = "wonderland001"
wonderland.chat_type = ChatType.Group
wonderland.members = [bob.copy(), carol.copy(), dave.copy()]
for i in wonderland.members:
i.group = wonderland
self.wonderland = wonderland
self.chats: List[EFBChat] = [alice, bob, wonderland]
def test_is_self():
chat = EFBChat().self()
assert chat.is_self
def send_message_recall_status(self):
slave = next(iter(coordinator.slaves.values()))
alice = slave.get_chat('alice')
msg = EFBMsg()
msg.deliver_to = slave
msg.chat = alice
msg.author = EFBChat(self).self()
msg.uid = "1"
status = EFBMessageRemoval(self, slave, msg)
return coordinator.send_status(status)
def test_verify_wrong_chat_type(slave_channel):
chat = EFBChat(slave_channel)
chat.chat_uid = "00001"
chat.chat_name = "Chat"
chat.chat_type = "user"
with pytest.raises(ValueError):
chat.verify()
storage_path = utils.get_data_path(self.middleware_id)
config_path = utils.get_config_path(self.middleware_id)
if not os.path.exists(storage_path):
os.makedirs(storage_path)
if not os.path.exists(config_path):
raise EFBException("Filter middleware is not configured.")
else:
config = yaml.load(open(config_path, encoding ="UTF-8"))
self.config_version = 0
self.match_mode = config.get("match_mode") # fuzz and exact
if self.match_mode is None:
self.match_mode = "fuzz"
self.chat = EFBChat()
self.chat.channel_name = self.middleware_name
self.chat.channel_id = self.middleware_id
self.chat.channel_emoji = "📜"
self.chat.chat_uid = "__zhangzhishan.filter__"
self.chat.chat_name = self.middleware_name
self.chat.chat_type = ChatType.System
self.logger = logging.getLogger("zhangzhishan.filter")
hdlr = logging.FileHandler('./zhangzhishan.filter.log', encoding ="UTF-8")
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
self.logger.addHandler(hdlr)
self.logger.setLevel(logging.ERROR)
def build_efb_chat_as_user(self, context, is_chat):
efb_chat = EFBChat(self.channel)
uid = context['user_id']
efb_chat.chat_uid = 'private' + '_' + str(uid)
chat_name = ''
if 'nickname' not in context:
i: dict = self.channel.QQClient.get_stranger_info(uid)
chat_name = ""
if i:
chat_name = i['nickname']
else:
chat_name = context['nickname']
efb_chat.chat_name = chat_name
efb_chat.chat_alias = None if 'alias' not in context else context['alias']
efb_chat.chat_type = ChatType.User
efb_chat.is_chat = is_chat
efb_chat.vendor_specific = {'is_anonymous': False}
if not is_chat and context['message_type'] != 'private':
def __init__(self, channel: 'QQMessengerChannel'):
self.channel: 'QQMessengerChannel' = channel
self.logger: logging.Logger = logging.getLogger(__name__)
self.MISSING_GROUP: EFBChat = EFBChat(self.channel)
self.MISSING_GROUP.chat_uid = "__error__"
self.MISSING_GROUP.chat_type = ChatType.Group
self.MISSING_GROUP.chat_name = self.MISSING_GROUP.chat_alias = "Chat Missing"
self.MISSING_USER: EFBChat = EFBChat(self.channel)
self.MISSING_USER.chat_uid = "__error__"
self.MISSING_USER.chat_type = ChatType.User
self.MISSING_USER.chat_name = self.MISSING_USER.chat_alias = "Chat Missing"