Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_format_bundle():
bundle, stats = format_bundle((message_1[0], message_2[0]), timetag=None)
assert struct.pack('>%iB' % len(message_1[1]), *message_1[1]) in bundle
assert struct.pack('>%iB' % len(message_2[1]), *message_2[1]) in bundle
timetag, messages = read_bundle(bundle)
assert timetag == approx(time())
assert len(messages) == 2
assert messages[0][::2] == message_1[2]
assert messages[1][::2] == message_2[2]
assert stats.calls == 2
assert stats.bytes == 72
assert stats.params == 6
assert stats.types['f'] == 3
assert stats.types['i'] == 2
example:
(
('/create', ['name', 'value']),
('/select', ['name']),
('/update', ['name', 'value2']),
('/delete', ['name']),
)
`timetag` is optional but can be a float of the number of seconds
since 1970 when the events described in the bundle should happen.
See `send_message` documentation for the other parameters.
"""
if not sock:
sock = SOCK
bundle, stats = format_bundle(
messages, timetag=timetag, encoding=encoding,
encoding_errors=encoding_errors
)
sock.sendto(bundle, (ip_address, port))
if safer:
sleep(10e-9)
return stats