Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def from_unix(unix_sec):
"""Create a Timestamp from posix timestamp in seconds.
:param unix_float: Posix timestamp in seconds.
:type unix_float: int or float.
"""
seconds = int(unix_sec // 1)
nanoseconds = int((unix_sec % 1) * 10 ** 9)
return Timestamp(seconds, nanoseconds)
def test_slice_with_limit(self):
message = self.request_message('SLICE', ['1', 3])
header, content = self.handler.command(message)
plain_header = msgpack.unpackb(header)
plain_content = msgpack.unpackb(content)
self.assertEqual(plain_header['status'], SUCCESS_STATUS)
self.assertIsInstance(plain_content['datas'], tuple)
self.assertEqual(len(plain_content['datas']), 3)
self.assertEqual(plain_content['datas'][0], ('1', '11'))
self.assertEqual(plain_content['datas'][1], ('2', '12'))
self.assertEqual(plain_content['datas'][2], ('3', '13'))
def test_pack_datetime():
t = Timestamp(42, 14000)
dt = t.to_datetime()
assert dt == datetime.datetime(1970, 1, 1, 0, 0, 42, 14, tzinfo=_utc)
packed = msgpack.packb(dt, datetime=True)
packed2 = msgpack.packb(t)
assert packed == packed2
unpacked = msgpack.unpackb(packed)
print(packed, unpacked)
assert unpacked == t
unpacked = msgpack.unpackb(packed, timestamp=3)
assert unpacked == dt
x = []
packed = msgpack.packb(dt, datetime=False, default=x.append)
assert x
assert x[0] == dt
assert msgpack.unpackb(packed) is None
def _decode(self, payload):
"""
Helper function that decodes data based on the given Encoder.
"""
if isinstance(self.api._encoder, JSONEncoder):
return json.loads(payload)
elif isinstance(self.api._encoder, MsgpackEncoder):
return msgpack.unpackb(payload, encoding='utf-8')
def test_put_of_valid_key(self):
message = self.request_message('PUT', ['a', '1'])
header, content = self.handler.command(message)
plain_header = msgpack.unpackb(header)
plain_content = msgpack.unpackb(content)
self.assertEqual(plain_header['status'], SUCCESS_STATUS)
self.assertEqual(plain_content['datas'], None)
coll.insert(mock_raw_event(1, i))
start_date = 20
end_date = 50
filter = {'sid' : [1]}
args = (coll, filter, start_date, end_date)
cursor = create_pymongo_iterator(*args)
# We filter to only get dt's between 20 and 50
expected = (mock_raw_event(1, i) for i in xrange(20, 51))
# Assert that our iterator returns the expected values.
for cursor_event, expected_event in izip_longest(cursor, expected):
del cursor_event['_id']
# Easiest way to convert unicode to strings.
cursor_event = msgpack.loads(msgpack.dumps(cursor_event))
assert expected_event.keys() == cursor_event.keys()
assert expected_event.values() == cursor_event.values()
def _make_fake_socket(self, family, type):
udp_socket = self.mox.CreateMockAnything()
udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
udp_socket.bind((self.CONF.collector.udp_address,
self.CONF.collector.udp_port))
def stop_udp(anything):
# Make the loop stop
self.srv.stop()
udp_socket.recvfrom(64 * 1024).WithSideEffects(
stop_udp).AndReturn(
(msgpack.dumps(self.counter),
('127.0.0.1', 12345)))
self.mox.ReplayAll()
return udp_socket
def _send_event(self, event):
if not isinstance(event, dict):
event = self._event_to_dict(event)
# event to a msgpack body message
body = msgpack.Packer().pack(event)
# big endian body len
body_len = struct.pack(">I", len(body))
# first write body length
self._unix_stream_server._connection_socket.sendall(body_len)
# then write body content
self._unix_stream_server._connection_socket.sendall(body)
def setUp(self):
factory = EchoServerFactory(True)
self.proto = factory.buildProtocol(("127.0.0.1", 0))
self.transport = proto_helpers.StringTransport()
self.proto.makeConnection(self.transport)
self.packer = msgpack.Packer(encoding="utf-8")
def testPackUnicode():
test_data = ["", "abcd", ["defgh"], "Русский текст"]
for td in test_data:
re = unpackb(packb(td), use_list=1, raw=False)
assert re == td
packer = Packer()
data = packer.pack(td)
re = Unpacker(BytesIO(data), raw=False, use_list=1).unpack()
assert re == td