Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_read_write_serde_v0_v1_no_compression(magic, key, value, checksum):
    """Round-trip a single uncompressed record through builder and reader.

    One record is written with ``LegacyRecordBatchBuilder`` and parsed back
    with ``LegacyRecordBatch``; every field of the parsed message must match
    what was written. ``checksum`` is indexed by ``magic`` to obtain the
    expected CRC for that message format version.
    """
    writer = LegacyRecordBatchBuilder(
        magic=magic, compression_type=0, batch_size=1024 * 1024)
    writer.append(0, timestamp=9999999, key=key, value=value)
    raw = writer.build()

    records = list(LegacyRecordBatch(raw, magic))
    assert len(records) == 1
    record = records[0]

    # With a falsy magic (0) the test expects no timestamp information at
    # all; otherwise the written timestamp and timestamp type 0 come back.
    if magic:
        expected_timestamp, expected_timestamp_type = 9999999, 0
    else:
        expected_timestamp, expected_timestamp_type = None, None

    assert record.offset == 0
    assert record.timestamp == expected_timestamp
    assert record.timestamp_type == expected_timestamp_type
    assert record.key == key
    assert record.value == value
    # Mask to 32 bits so a signed expected value compares as unsigned.
    assert record.checksum == checksum[magic] & 0xffffffff
def test_written_bytes_equals_size_in_bytes(magic):
    """Check that ``size_in_bytes`` predicts exactly what ``append`` writes.

    The builder's size is sampled before and after appending one record;
    the growth must equal the value ``size_in_bytes`` reported for the
    same record.
    """
    record_fields = {"timestamp": 9999999, "key": b"test", "value": b"Super"}
    batch_builder = LegacyRecordBatchBuilder(
        magic=magic, compression_type=0, batch_size=1024 * 1024)

    predicted = batch_builder.size_in_bytes(0, **record_fields)

    size_before = batch_builder.size()
    batch_builder.append(0, **record_fields)
    written = batch_builder.size() - size_before

    assert written == predicted
def test_record_overhead():
    """Pin the per-record overhead reported for magic 0 and magic 1."""
    # (magic, expected overhead in bytes) pairs, checked in this order.
    expected_overheads = ((0, 14), (1, 22))
    for magic, overhead in expected_overheads:
        assert LegacyRecordBatchBuilder.record_overhead(magic) == overhead
def test_legacy_correct_metadata_response(magic):
    """Check the metadata object returned by ``append`` for one record.

    Verifies offset, timestamp, CRC and the ``repr`` of the returned
    metadata. With a falsy ``magic`` the expected timestamp is ``-1``.
    """
    batch_builder = LegacyRecordBatchBuilder(
        magic=magic, compression_type=0, batch_size=1024 * 1024)
    metadata = batch_builder.append(
        0, timestamp=9999999, key=b"test", value=b"Super")

    if magic:
        expected_timestamp, signed_crc = 9999999, -2095076219
    else:
        expected_timestamp, signed_crc = -1, 278251978

    assert metadata.offset == 0
    assert metadata.timestamp == expected_timestamp
    # Mask to 32 bits so the signed literal compares as an unsigned CRC.
    assert metadata.crc == signed_crc & 0xffffffff
    assert repr(metadata) == (
        "LegacyRecordMetadata(offset=0, crc={}, size={}, "
        "timestamp={})".format(
            metadata.crc, metadata.size, metadata.timestamp)
    )
def _make_compressed_batch(magic):
    """Build a gzip-compressed legacy batch of ten records.

    The records sit at offsets 0 through 9 and all share the same
    timestamp, key and value. Returns whatever ``builder.build()`` yields.
    """
    gzip_builder = LegacyRecordBatchBuilder(
        magic=magic,
        compression_type=LegacyRecordBatch.CODEC_GZIP,
        batch_size=1024 * 1024,
    )
    record_fields = {"timestamp": 9999999, "key": b"test", "value": b"Super"}
    for record_offset in range(10):
        gzip_builder.append(record_offset, **record_fields)
    return gzip_builder.build()
client.ready = mock.MagicMock()
client.ready.side_effect = asyncio.coroutine(lambda a: True)
client.force_metadata_update = mock.MagicMock()
client.force_metadata_update.side_effect = asyncio.coroutine(
lambda: False)
client.send = mock.MagicMock()
subscriptions = SubscriptionState(loop=self.loop)
fetcher = Fetcher(client, subscriptions, loop=self.loop)
tp = TopicPartition('test', 0)
req = FetchRequest(
-1, # replica_id
100, 100, [(tp.topic, [(tp.partition, 155, 100000)])])
builder = LegacyRecordBatchBuilder(
magic=1, compression_type=0, batch_size=99999999)
builder.append(160, value=b"12345", key=b"1", timestamp=None)
builder.append(162, value=b"23456", key=b"2", timestamp=None)
builder.append(167, value=b"34567", key=b"3", timestamp=None)
batch = bytes(builder.build())
resp = FetchResponse(
[('test', [(
0, 0, 3000, # partition, error_code, highwater_offset
batch # Batch raw bytes
)])])
subscriptions.assign_from_user({tp})
assignment = subscriptions.subscription.assignment
tp_state = assignment.state_value(tp)
client.send.side_effect = asyncio.coroutine(lambda n, r: resp)
async def test_send_without_response(self):
    """Check that fire-and-forget requests never linger in the queue.

    Imitates a producer running without acknowledgement: the client
    produces messages, Kafka does not send a response, and the returned
    futures must not get stuck in the connection's request queue forever.
    """
    host, port = self.kafka_host, self.kafka_port
    conn = await create_conn(host, port, loop=self.loop)
    try:
        # prepare message
        builder = LegacyRecordBatchBuilder(
            magic=1, compression_type=0, batch_size=99999999)
        builder.append(offset=0, value=b"foo", key=None, timestamp=None)
        request = ProduceRequest(
            required_acks=0, timeout=10 * 1000,
            topics=[(b'foo', [(0, bytes(builder.build()))])])

        # produce messages without acknowledgement
        pending = [
            conn.send(request, expect_response=False) for _ in range(10)]

        # make sure no future is left waiting in the queue
        self.assertEqual(len(conn._requests), 0)
        for fut in pending:
            await fut
    finally:
        # Close the connection even when an assertion or a send fails, so
        # a failing run does not leak the socket into later tests.
        conn.close()
sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
PLAIN, GSSAPI. Default: PLAIN
sasl_plain_username (str): username for sasl PLAIN authentication.
Default: None
sasl_plain_password (str): password for sasl PLAIN authentication.
Default: None
Note:
Many configuration parameters are taken from the Java client:
https://kafka.apache.org/documentation.html#producerconfigs
"""
# Class-level counter, initialised to 0.
# NOTE(review): presumably used to number auto-generated client ids - confirm.
_PRODUCER_CLIENT_ID_SEQUENCE = 0
# Maps a compression name to a pair: the matching has_* object and the
# LegacyRecordBatchBuilder codec constant for that compression.
# NOTE(review): has_* look like codec-availability checks - confirm.
_COMPRESSORS = {
'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
}
_closed = None # Serves as an uninitialized flag for __del__
# Class-level default of None; instances may override it.
# NOTE(review): presumably holds a creation traceback for debugging - confirm.
_source_traceback = None
def __init__(self, *, loop, bootstrap_servers='localhost',
client_id=None,
metadata_max_age_ms=300000, request_timeout_ms=40000,
api_version='auto', acks=_missing,
key_serializer=None, value_serializer=None,
compression_type=None, max_batch_size=16384,
partitioner=DefaultPartitioner(), max_request_size=1048576,
linger_ms=0, send_backoff_ms=100,
retry_backoff_ms=100, security_protocol="PLAINTEXT",
def _serialize(self, topic, key, value):
    """Serialize ``key`` and ``value`` and enforce the request size limit.

    Each of ``key`` and ``value`` is passed through its configured
    serializer when one is set, otherwise it is used unchanged. The size
    estimate is the record overhead for the producer's magic plus the
    lengths of the serialized parts that are not ``None``. ``topic`` is
    accepted but not used here.

    Returns:
        A ``(serialized_key, serialized_value)`` tuple.

    Raises:
        MessageSizeTooLargeError: if the estimated size exceeds
            ``self._max_request_size``.
    """
    key_bytes = self._key_serializer(key) if self._key_serializer else key
    value_bytes = (
        self._value_serializer(value) if self._value_serializer else value)

    total_size = LegacyRecordBatchBuilder.record_overhead(
        self._producer_magic)
    # A part that is None contributes nothing to the payload size.
    total_size += sum(
        len(part) for part in (key_bytes, value_bytes) if part is not None)

    if total_size > self._max_request_size:
        raise MessageSizeTooLargeError(
            "The message is %d bytes when serialized which is larger than"
            " the maximum request size you have configured with the"
            " max_request_size configuration" % total_size)

    return key_bytes, value_bytes
def prepare(magic: int):
    """Pre-build sample batches and return an endless iterator over them.

    ``BATCH_SAMPLES`` uncompressed batches are built, each filled with
    ``MESSAGES_PER_BATCH`` records of random key and value bytes and no
    timestamp. The built batches are cycled forever in creation order.
    """

    def _build_sample():
        # One finished batch, converted to bytes.
        sample_builder = LegacyRecordBatchBuilder(
            magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0)
        for msg_offset in range(MESSAGES_PER_BATCH):
            appended = sample_builder.append(
                msg_offset,
                None,  # random.randint(*TIMESTAMP_RANGE)
                random_bytes(KEY_SIZE),
                random_bytes(VALUE_SIZE))
            # A falsy result from append fails the run here.
            assert appended
        return bytes(sample_builder.build())

    samples = []
    for _ in range(BATCH_SAMPLES):
        samples.append(_build_sample())
    return iter(itertools.cycle(samples))