Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def decorator(func):
    """Register ``func`` under a ``chat_voice_message_states`` long-poll rule.

    Builds a ``UserLongPollEventRule`` from the literal ``64`` and the free
    variable ``rules``, hands it ``func`` together with the event
    description, and appends the rule to ``self.rules``.

    NOTE(review): ``rules`` and ``self`` are not defined in this function;
    presumably they are captured from an enclosing method -- confirm.

    Returns the same ``func`` object that was passed in.
    """
    event_rule = UserLongPollEventRule(64, *rules)
    description = {
        "name": "chat_voice_message_states",
        "data": ["user_ids", "peer_id", "total_count", "ts"],
        "dataclass": events.ChatVoiceMessageStates,
    }
    event_rule.create(func, description)
    self.rules.append(event_rule)
    return func
def decorator(func):
    """Register ``func`` under a ``chat_remove`` long-poll rule.

    Builds a ``UserLongPollEventRule`` from the literal ``13`` and the free
    variable ``rules``, hands it ``func`` together with the event
    description (dataclass ``events.DeleteMessages``), and appends the rule
    to ``self.rules``.

    NOTE(review): ``rules`` and ``self`` are not defined in this function;
    presumably they are captured from an enclosing method -- confirm.

    Returns the same ``func`` object that was passed in.
    """
    event_rule = UserLongPollEventRule(13, *rules)
    description = {
        "name": "chat_remove",
        "data": ["peer_id", "local_id"],
        "dataclass": events.DeleteMessages,
    }
    event_rule.create(func, description)
    self.rules.append(event_rule)
    return func
def add_handled_rule(
    self, rules: typing.List[AbstractRule], func: typing.Callable
) -> UserLongPollEventRule:
    """Create a ``message_new`` rule for ``func`` and store it on the handler.

    Args:
        rules: rule objects unpacked positionally into
            ``UserLongPollEventRule`` after the literal ``4``.
        func: callable passed to the rule's ``create`` method.

    Returns:
        The newly created ``UserLongPollEventRule``.
    """
    handled_rule = UserLongPollEventRule(4, *rules)
    description = {
        "name": "message_new",
        "data": ["message_id", "flags", *ADDITIONAL_FIELDS],
        "dataclass": Message,
    }
    handled_rule.create(func, description)
    # The rule is stored wrapped in a one-element list, not as a bare rule.
    rule_group = [handled_rule]
    self.rules.append(rule_group)
    return handled_rule
def decorator(func):
    """Register ``func`` under a ``chat_flag_remove`` long-poll rule.

    Builds a ``UserLongPollEventRule`` from the literal ``10`` and the free
    variable ``rules``, hands it ``func`` together with the event
    description (dataclass ``events.ResetDialogFlags``), and appends the
    rule to ``self.rules``.

    NOTE(review): ``rules`` and ``self`` are not defined in this function;
    presumably they are captured from an enclosing method -- confirm.

    Returns the same ``func`` object that was passed in.
    """
    event_rule = UserLongPollEventRule(10, *rules)
    description = {
        "name": "chat_flag_remove",
        "data": ["peer_id", "mask"],
        "dataclass": events.ResetDialogFlags,
    }
    event_rule.create(func, description)
    self.rules.append(event_rule)
    return func
def decorator(func):
    """Register ``func`` under a ``message_read_in`` long-poll rule.

    Builds a ``UserLongPollEventRule`` from the literal ``6`` and the free
    variable ``rules``, hands it ``func`` together with the event
    description (dataclass ``events.InRead``), and appends the rule to
    ``self.rules``.

    NOTE(review): ``rules`` and ``self`` are not defined in this function;
    presumably they are captured from an enclosing method -- confirm.

    Returns the same ``func`` object that was passed in.
    """
    event_rule = UserLongPollEventRule(6, *rules)
    description = {
        "name": "message_read_in",
        "data": ["peer_id", "local_id"],
        "dataclass": events.InRead,
    }
    event_rule.create(func, description)
    self.rules.append(event_rule)
    return func
def decorator(func):
    """Register ``func`` under a ``chat_edit`` long-poll rule.

    Builds a ``UserLongPollEventRule`` from the literal ``51`` and the free
    variable ``rules``, hands it ``func`` together with the event
    description (dataclass ``events.ChatEdit``), and appends the rule to
    ``self.rules``.

    NOTE(review): ``rules`` and ``self`` are not defined in this function;
    presumably they are captured from an enclosing method -- confirm.

    Returns the same ``func`` object that was passed in.
    """
    event_rule = UserLongPollEventRule(51, *rules)
    description = {
        "name": "chat_edit",
        "data": ["chat_id", "self"],
        "dataclass": events.ChatEdit,
    }
    event_rule.create(func, description)
    self.rules.append(event_rule)
    return func
def decorator(func):
    """Register ``func`` under a ``message_flag_remove`` long-poll rule.

    Builds a ``UserLongPollEventRule`` from the literal ``3`` and the free
    variable ``rules``, hands it ``func`` together with the event
    description (dataclass ``events.ResetMessageFlags``; the data list is
    extended with ``ADDITIONAL_FIELDS``), and appends the rule to
    ``self.rules``.

    NOTE(review): ``rules`` and ``self`` are not defined in this function;
    presumably they are captured from an enclosing method -- confirm.

    Returns the same ``func`` object that was passed in.
    """
    event_rule = UserLongPollEventRule(3, *rules)
    description = {
        "name": "message_flag_remove",
        "data": ["message_id", "mask", *ADDITIONAL_FIELDS],
        "dataclass": events.ResetMessageFlags,
    }
    event_rule.create(func, description)
    self.rules.append(event_rule)
    return func
def decorator(func):
    """Register ``func`` under a ``message_flag_change`` long-poll rule.

    Builds a ``UserLongPollEventRule`` from the literal ``1`` and the free
    variable ``rules``, hands it ``func`` together with the event
    description (dataclass ``events.ReplaceMessageFlags``; the data list is
    extended with ``ADDITIONAL_FIELDS``), and appends the rule to
    ``self.rules``.

    NOTE(review): ``rules`` and ``self`` are not defined in this function;
    presumably they are captured from an enclosing method -- confirm.

    Returns the same ``func`` object that was passed in.
    """
    event_rule = UserLongPollEventRule(1, *rules)
    description = {
        "name": "message_flag_change",
        "data": ["message_id", "flags", *ADDITIONAL_FIELDS],
        "dataclass": events.ReplaceMessageFlags,
    }
    event_rule.create(func, description)
    self.rules.append(event_rule)
    return func
def decorator(func):
    """Register ``func`` under a ``message_edit`` long-poll rule.

    Builds a ``UserLongPollEventRule`` from the literal ``5`` and the free
    variable ``rules``, hands it ``func`` together with the event
    description (dataclass ``events.EditMessage``; the data list is
    extended with ``ADDITIONAL_FIELDS``), and appends the rule to
    ``self.rules``.

    NOTE(review): ``rules`` and ``self`` are not defined in this function;
    presumably they are captured from an enclosing method -- confirm.

    Returns the same ``func`` object that was passed in.
    """
    rule = UserLongPollEventRule(5, *rules)
    rule.create(
        func,
        {
            "name": "message_edit",
            "data": [
                "message_id",
                "mask",
                "peer_id",
                "timestamp",
                "new_text",
                *ADDITIONAL_FIELDS,
            ],
            "dataclass": events.EditMessage,
        },
    )
    self.rules.append(rule)
    # Hand the function back: when this is applied with ``@`` syntax the
    # return value is what the decorated name gets bound to, so falling off
    # the end (returning None) would replace the handler with None.
    return func
def __init__(self):
    """Set up the event registry, the rule list and three message handlers.

    ``event`` is a fresh ``UserEvents``; ``message_rules`` starts empty.
    Each ``MessageHandler`` gets a one-element ``default_rules`` list:
    ``PrivateMessage()`` for ``message``, ``ChatMessage()`` for
    ``chat_message`` and ``Any()`` for ``message_handler``.
    """
    self.event: UserEvents = UserEvents()
    self.message_rules: typing.List[UserLongPollEventRule] = []

    # Main message handling managers
    private_defaults = [PrivateMessage()]
    self.message: MessageHandler = MessageHandler(default_rules=private_defaults)

    chat_defaults = [ChatMessage()]
    self.chat_message: MessageHandler = MessageHandler(default_rules=chat_defaults)

    catch_all_defaults = [Any()]
    self.message_handler: MessageHandler = MessageHandler(
        default_rules=catch_all_defaults
    )