Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, cookies, connector):
    """Create a new channel.

    Creates the connection-lifecycle events and puts the channel into its
    initial, disconnected state.

    Args:
        cookies: Request cookies dictionary.
        connector: aiohttp connector for keep-alive.
    """
    # Event fired when channel connects with arguments ():
    self.on_connect = event.Event('Channel.on_connect')
    # Event fired when channel reconnects with arguments ():
    self.on_reconnect = event.Event('Channel.on_reconnect')
    # Event fired when channel disconnects with arguments ():
    self.on_disconnect = event.Event('Channel.on_disconnect')
    # Event fired when an array is received with arguments (array):
    self.on_receive_array = event.Event('Channel.on_receive_array')
    # True if the channel is currently connected:
    self._is_connected = False
    # True if the on_connect event has been called at least once:
    self._on_connect_called = False
    # Request cookies dictionary:
    self._cookies = cookies
    # Parser for assembling messages:
    self._chunk_parser = None
    # aiohttp connector for keep-alive:
    # (Previously this comment had no statement under it, so the
    # ``connector`` argument was silently dropped.)
    self._connector = connector
    # NOTE(review): the excerpt this was restored from is truncated here;
    # confirm whether further attributes are initialised after this point.
"""
:class:`.Event` fired when the client connects for the first time.
"""
self.on_reconnect = event.Event('Client.on_reconnect')
"""
:class:`.Event` fired when the client reconnects after being
disconnected.
"""
self.on_disconnect = event.Event('Client.on_disconnect')
"""
:class:`.Event` fired when the client is disconnected.
"""
self.on_state_update = event.Event('Client.on_state_update')
"""
:class:`.Event` fired when an update arrives from the server.
Args:
state_update: A ``StateUpdate`` message.
"""
# http_utils.Session instance (populated by .connect()):
self._session = None
# Cookies required to initialize Session:
self._cookies = cookies
# channel.Channel instance (populated by .connect()):
self._channel = None
self._conversation = conversation # hangouts_pb2.Conversation
self._events = [] # [hangouts_pb2.Event]
self._events_dict = {} # {event_id: ConversationEvent}
self._send_message_lock = asyncio.Lock()
for event_ in events:
self.add_event(event_)
# Event fired when a user starts or stops typing with arguments
# (typing_message).
self.on_typing = event.Event('Conversation.on_typing')
# Event fired when a new ConversationEvent arrives with arguments
# (ConversationEvent).
self.on_event = event.Event('Conversation.on_event')
# Event fired when a watermark (read timestamp) is updated with
# arguments (WatermarkNotification).
self.on_watermark_notification = event.Event(
'Conversation.on_watermark_notification'
)
self.on_watermark_notification.add_observer(
self._on_watermark_notification
)
def __init__(self, cookies, connector):
    """Set up a new channel in its initial, not-connected state.

    Args:
        cookies: Request cookies dictionary.
        connector: aiohttp connector, kept for keep-alive.

    Attributes created, in order:
        on_connect: Event fired when the channel connects; arguments ().
        on_reconnect: Event fired when the channel reconnects;
            arguments ().
        on_disconnect: Event fired when the channel disconnects;
            arguments ().
        on_message: Event fired when a channel submission is received;
            arguments (submission).
        _is_connected: True while the channel is connected.
        _on_connect_called: True once on_connect has been called at least
            once.
        _cookies: The request cookies dictionary passed in.
        _push_parser: Parser for assembling messages (None at this point).
        _connector: The aiohttp connector passed in.
        _sid_param, _gsessionid_param: Discovered parameters (None at this
            point).
    """
    # Public events.
    self.on_connect = event.Event('Channel.on_connect')
    self.on_reconnect = event.Event('Channel.on_reconnect')
    self.on_disconnect = event.Event('Channel.on_disconnect')
    self.on_message = event.Event('Channel.on_message')

    # Connection flags both start out cleared.
    self._is_connected = self._on_connect_called = False

    # Request inputs and the (not yet created) message parser.
    self._cookies = cookies
    self._push_parser = None
    self._connector = connector

    # Nothing has been discovered yet.
    self._sid_param = self._gsessionid_param = None
# Initialize the list of conversations from Client's list of
# hangouts_pb2.ConversationState.
for conv_state in conv_states:
self.add_conversation(conv_state.conversation, conv_state.event)
self._client.on_state_update.add_observer(self._on_state_update)
self._client.on_connect.add_observer(self._sync)
self._client.on_reconnect.add_observer(self._sync)
# Event fired when a new ConversationEvent arrives with arguments
# (ConversationEvent).
self.on_event = event.Event('ConversationList.on_event')
# Event fired when a user starts or stops typing with arguments
# (typing_message).
self.on_typing = event.Event('ConversationList.on_typing')
# Event fired when a watermark (read timestamp) is updated with
# arguments (WatermarkNotification).
self.on_watermark_notification = event.Event(
'ConversationList.on_watermark_notification'
)
self._event_cont_token = event_cont_token
for event_ in events:
# Workaround to ignore observed events returned from
# syncrecentconversations.
if event_.event_type != hangouts_pb2.EVENT_TYPE_OBSERVED_EVENT:
self.add_event(event_)
self.on_event = event.Event('Conversation.on_event')
"""
:class:`.Event` fired when an event occurs in this conversation.
Args:
conv_event: :class:`.ConversationEvent` that occurred.
"""
self.on_typing = event.Event('Conversation.on_typing')
"""
:class:`.Event` fired when a user starts or stops typing in this
conversation.
Args:
typing_message: :class:`~hangups.parsers.TypingStatusMessage` that
occurred.
"""
self.on_watermark_notification = event.Event(
'Conversation.on_watermark_notification'
)
"""
:class:`.Event` fired when a watermark (read timestamp) is updated for
this conversation.
for conv_state in conv_states:
self.add_conversation(conv_state.conversation, conv_state.event)
self._client.on_state_update.add_observer(self._on_state_update)
self._client.on_connect.add_observer(self._sync)
self._client.on_reconnect.add_observer(self._sync)
# Event fired when a new ConversationEvent arrives with arguments
# (ConversationEvent).
self.on_event = event.Event('ConversationList.on_event')
# Event fired when a user starts or stops typing with arguments
# (typing_message).
self.on_typing = event.Event('ConversationList.on_typing')
# Event fired when a watermark (read timestamp) is updated with
# arguments (WatermarkNotification).
self.on_watermark_notification = event.Event(
'ConversationList.on_watermark_notification'
)
self._add_conversation(conv_state.conversation, conv_state.event,
conv_state.event_continuation_token)
self._client.on_state_update.add_observer(self._on_state_update)
self._client.on_connect.add_observer(self._sync)
self._client.on_reconnect.add_observer(self._sync)
self.on_event = event.Event('ConversationList.on_event')
"""
:class:`.Event` fired when an event occurs in any conversation.
Args:
conv_event: :class:`ConversationEvent` that occurred.
"""
self.on_typing = event.Event('ConversationList.on_typing')
"""
:class:`.Event` fired when a user starts or stops typing in any
conversation.
Args:
typing_message: :class:`~hangups.parsers.TypingStatusMessage` that
occurred.
"""
self.on_watermark_notification = event.Event(
'ConversationList.on_watermark_notification'
)
"""
:class:`.Event` fired when a watermark (read timestamp) is updated for
def __init__(self, client, user_list, conversation, events=()):
    """Initialize a new Conversation.

    Args:
        client: Client instance.
        user_list: UserList instance.
        conversation: hangouts_pb2.Conversation message.
        events: Iterable of initial events; each one is passed to
            ``add_event``. Defaults to an empty tuple, an immutable
            default that cannot be shared and mutated between calls.
    """
    self._client = client  # Client
    self._user_list = user_list  # UserList
    self._conversation = conversation  # hangouts_pb2.Conversation
    self._events = []  # [hangouts_pb2.Event]
    self._events_dict = {}  # {event_id: ConversationEvent}
    self._send_message_lock = asyncio.Lock()
    # NOTE(review): the initial events are added before on_typing /
    # on_event / on_watermark_notification exist; presumably add_event
    # does not fire them -- confirm before reordering.
    for event_ in events:
        self.add_event(event_)
    # Event fired when a user starts or stops typing with arguments
    # (typing_message).
    self.on_typing = event.Event('Conversation.on_typing')
    # Event fired when a new ConversationEvent arrives with arguments
    # (ConversationEvent).
    self.on_event = event.Event('Conversation.on_event')
    # Event fired when a watermark (read timestamp) is updated with
    # arguments (WatermarkNotification).
    self.on_watermark_notification = event.Event(
        'Conversation.on_watermark_notification'
    )
    # The conversation observes its own watermark event.
    self.on_watermark_notification.add_observer(
        self._on_watermark_notification
    )
self._conv_dict = {} # {conv_id: Conversation}
self._sync_timestamp = sync_timestamp # datetime
self._user_list = user_list # UserList
# Initialize the list of conversations from Client's list of
# hangouts_pb2.ConversationState.
for conv_state in conv_states:
self.add_conversation(conv_state.conversation, conv_state.event)
self._client.on_state_update.add_observer(self._on_state_update)
self._client.on_connect.add_observer(self._sync)
self._client.on_reconnect.add_observer(self._sync)
# Event fired when a new ConversationEvent arrives with arguments
# (ConversationEvent).
self.on_event = event.Event('ConversationList.on_event')
# Event fired when a user starts or stops typing with arguments
# (typing_message).
self.on_typing = event.Event('ConversationList.on_typing')
# Event fired when a watermark (read timestamp) is updated with
# arguments (WatermarkNotification).
self.on_watermark_notification = event.Event(
'ConversationList.on_watermark_notification'
)