Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test__validate_rejects_error_without_error(self):
iq = stanza.IQ(structs.IQType.ERROR)
iq.autoset_id()
with self.assertRaisesRegex(
ValueError,
r"IQ with type='error' requires error payload"):
iq._validate()
def test_is_iq_payload(self):
self.assertIn(
rfc3921.Session.TAG,
stanza.IQ.CHILD_MAP
)
self.assertIs(
stanza.IQ.CHILD_MAP[rfc3921.Session.TAG],
stanza.IQ.payload.xq_descriptor
)
def test_registered_at_IQ(self):
self.assertIn(
disco_xso.InfoQuery.TAG,
stanza.IQ.CHILD_MAP
)
def test_on_entry_removed(self):
request = roster_xso.Query(
items=[
roster_xso.Item(
jid=self.user1,
subscription="remove",
),
],
ver="foobar"
)
iq = stanza.IQ(type_=structs.IQType.SET)
iq.payload = request
old_item = self.s.items[self.user1]
cb = unittest.mock.Mock()
with self.s.on_entry_removed.context_connect(cb):
run_coroutine(self.s.handle_roster_push(iq))
run_coroutine(self.s.handle_roster_push(iq))
self.assertSequenceEqual(
[
unittest.mock.call(old_item),
],
cb.mock_calls
)
def test_item_objects_do_not_change_during_push(self):
old_item = self.s.items[self.user1]
request = roster_xso.Query(
items=[
roster_xso.Item(
jid=self.user1,
subscription="both"
),
],
ver="foobar"
)
iq = stanza.IQ(type_=structs.IQType.SET)
iq.payload = request
self.assertIsNone(
run_coroutine(self.s.handle_roster_push(iq))
)
self.assertIs(old_item, self.s.items[self.user1])
self.assertEqual("both", old_item.subscription)
post_groups = (existing.groups | add_to_groups) - remove_from_groups
post_name = existing.name
if name is not _Sentinel:
post_name = name
item = roster_xso.Item(
jid=jid,
name=post_name,
groups=[
roster_xso.Group(name=group_name)
for group_name in post_groups
])
yield from self.client.send(
stanza.IQ(
structs.IQType.SET,
payload=roster_xso.Query(items=[
item
])
),
timeout=timeout
)
:param jid: Address of the PubSub service.
:type jid: :class:`aioxmpp.JID`
:param config: Configuration form
:type config: :class:`aioxmpp.forms.Data`
:param node: Name of the PubSub node to query.
:type node: :class:`str`
:raises aioxmpp.errors.XMPPError: as returned by the service
:return: The configuration of the node.
:rtype: :class:`~.forms.Data`
.. seealso::
:class:`aioxmpp.pubsub.NodeConfigForm`
"""
iq = aioxmpp.stanza.IQ(to=jid, type_=aioxmpp.structs.IQType.SET)
iq.payload = pubsub_xso.OwnerRequest(
pubsub_xso.OwnerConfigure(node=node)
)
iq.payload.payload.data = config
yield from self.client.send(iq)
# raise if it is not a stanza
if not isinstance(stanza_obj, stanza.StanzaBase):
raise RuntimeError(
"unexpected stanza class: {}".format(stanza_obj))
# now handle stanzas, these always increment the SM counter
if self._sm_enabled:
self._sm_inbound_ctr += 1
self._sm_inbound_ctr &= 0xffffffff
# check if the stanza has errors
if exc is not None:
self._process_incoming_erroneous_stanza(stanza_obj, exc)
return
if isinstance(stanza_obj, stanza.IQ):
self._process_incoming_iq(stanza_obj)
elif isinstance(stanza_obj, stanza.Message):
self._process_incoming_message(stanza_obj)
elif isinstance(stanza_obj, stanza.Presence):
self._process_incoming_presence(stanza_obj)
Update the affiliations at a node.
:param jid: Address of the PubSub service.
:type jid: :class:`aioxmpp.JID`
:param node: Name of the node to modify
:type node: :class:`str`
:param affiliations_to_set: The affiliations to set at the node.
:type affiliations_to_set: :class:`~collections.abc.Iterable` of tuples
consisting of the JID to affiliate and the affiliation to use.
:raises aioxmpp.errors.XMPPError: as returned by the service
`affiliations_to_set` must be an iterable of pairs (`jid`,
`affiliation`), where the `jid` indicates the JID for which the
`affiliation` is to be set.
"""
iq = aioxmpp.stanza.IQ(
type_=aioxmpp.structs.IQType.SET,
to=jid,
payload=pubsub_xso.OwnerRequest(
pubsub_xso.OwnerAffiliations(
node,
affiliations=[
pubsub_xso.OwnerAffiliation(
jid,
affiliation
)
for jid, affiliation in affiliations_to_set
]
)
)
)