Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_different_connections_in_conn_groups(self):
    """Fetch connections to one node through two connection groups.

    A randomly chosen broker is additionally registered as a group
    coordinator; a connection is then requested once without a group
    argument and once with ``ConnectionGroup.COORDINATION``.
    """
    client = AIOKafkaClient(
        loop=self.loop,
        bootstrap_servers=self.hosts,
        metadata_max_age_ms=10000,
    )
    await client.bootstrap()
    self.add_cleanup(client.close)

    # Reuse an ordinary broker's address for the coordinator entry.
    node_id = client.get_random_node()
    broker = client.cluster.broker_metadata(node_id)
    client.cluster.add_coordinator(
        node_id,
        broker.host,
        broker.port,
        rack=None,
        purpose=(CoordinationType.GROUP, ""),
    )

    conn1 = await client._get_conn(node_id)
    conn2 = await client._get_conn(
        node_id,
        group=ConnectionGroup.COORDINATION,
    )
async def test_exclude_internal_topics(self):
    """Pattern subscription sees internal topics when exclusion is off.

    A regular topic is first awaited through a short-lived client, then a
    consumer created with ``exclude_internal_topics=False`` subscribes to
    a catch-all pattern and ``__consumer_offsets`` is expected in its
    subscription.
    """
    # Wait for metadata of a regular (non-internal) topic, then drop the
    # helper client.
    my_topic = "some_noninternal_topic"
    client = AIOKafkaClient(
        loop=self.loop,
        bootstrap_servers=self.hosts,
        client_id="test_autocreate",
    )
    await client.bootstrap()
    await client._wait_on_metadata(my_topic)
    await client.close()

    # Subscribe to everything with internal-topic exclusion turned off.
    pattern = "^.*$"
    consumer = AIOKafkaConsumer(
        loop=self.loop,
        bootstrap_servers=self.hosts,
        metadata_max_age_ms=200,
        group_id="some_group_1",
        auto_offset_reset="earliest",
        exclude_internal_topics=False,
    )
    consumer.subscribe(pattern=pattern)
    await consumer.start()

    self.assertIn("__consumer_offsets", consumer.subscription())
async def test_compacted_topic_consumption(self):
    """Prepare a Fetcher over a stubbed client plus a fetch request.

    The client is created without bootstrap servers and its ``ready``,
    ``force_metadata_update`` and ``send`` methods are replaced by mocks,
    so nothing here talks to a broker.
    """
    # Compacted topics can have offsets skipped
    client = AIOKafkaClient(loop=self.loop, bootstrap_servers=[])

    # Native coroutine stubs: ``asyncio.coroutine`` was removed in
    # Python 3.11, so the mocks are backed by ``async def`` functions.
    async def ready(a):
        return True

    async def force_metadata_update():
        return False

    client.ready = mock.MagicMock()
    client.ready.side_effect = ready
    client.force_metadata_update = mock.MagicMock()
    client.force_metadata_update.side_effect = force_metadata_update
    client.send = mock.MagicMock()

    subscriptions = SubscriptionState(loop=self.loop)
    fetcher = Fetcher(client, subscriptions, loop=self.loop)

    tp = TopicPartition('test', 0)
    # NOTE(review): the trailing numbers are presumably wait time,
    # min bytes and (offset, max bytes) -- confirm against FetchRequest.
    req = FetchRequest(
        -1,  # replica_id
        100, 100, [(tp.topic, [(tp.partition, 155, 100000)])])
async def test_bootstrap(self):
    """Exercise client bootstrap against a bad and a good address.

    Bootstrapping against ``0.42.42.42:444`` must raise
    ``ConnectionError``. Against ``self.hosts`` it must succeed, expose
    ``test_topic`` in the fetched metadata, and tolerate repeated topic
    registration and a second ``bootstrap()`` call.
    """
    client = AIOKafkaClient(loop=self.loop,
                            bootstrap_servers='0.42.42.42:444')
    with self.assertRaises(ConnectionError):
        await client.bootstrap()

    client = AIOKafkaClient(loop=self.loop, bootstrap_servers=self.hosts)
    await client.bootstrap()
    await self.wait_topic(client, 'test_topic')

    metadata = await client.fetch_all_metadata()
    # assertIn reports both operands on failure, unlike assertTrue(x in y)
    self.assertIn('test_topic', metadata.topics())

    client.set_topics(['t2', 't3'])
    client.set_topics(['t2', 't3'])  # should be ignored
    client.add_topic('t2')  # should be ignored
    # bootstrap again -- no error expected
    await client.bootstrap()
    await client.close()
async def test_concurrent_send_on_different_connection_groups(self):
# NOTE(review): this snippet has lost its indentation and is cut off in the
# middle of the FetchRequest_v0(...) call; only the visible setup is
# described here.
# Bootstrap a client against the test brokers and make sure it is closed
# during cleanup.
client = AIOKafkaClient(
loop=self.loop,
bootstrap_servers=self.hosts,
metadata_max_age_ms=10000)
await client.bootstrap()
self.add_cleanup(client.close)
# Wait until the fixture's topic is visible to the client.
await self.wait_topic(client, self.topic)
# Pick a random broker and register it as a group coordinator too,
# reusing the broker's own host and port.
node_id = client.get_random_node()
broker = client.cluster.broker_metadata(node_id)
client.cluster.add_coordinator(
node_id, broker.host, broker.port, rack=None,
purpose=(CoordinationType.GROUP, ""))
# Start building a fetch request (remaining arguments are not visible).
wait_request = FetchRequest_v0(
-1, # replica_id
async def test_force_metadata_update_multiple_times(self):
# NOTE(review): this snippet has lost its indentation and is cut off at the
# opening of self.assertEqual(...); only the visible setup is described.
# NOTE(review): asyncio.sleep() no longer accepts ``loop`` on Python 3.10+;
# the two sleep calls below would raise TypeError there -- confirm the
# supported interpreter range.
client = AIOKafkaClient(
loop=self.loop,
bootstrap_servers=self.hosts,
metadata_max_age_ms=10000)
await client.bootstrap()
self.add_cleanup(client.close)
# Keep the real implementation so the patched version can delegate to it.
orig = client._metadata_update
with mock.patch.object(client, '_metadata_update') as mocked:
# Replacement that delays for 0.2s before calling the real update.
async def new(*args, **kw):
await asyncio.sleep(0.2, loop=self.loop)
return (await orig(*args, **kw))
mocked.side_effect = new
# Trigger an update, then yield briefly (0.01s) before asserting.
client.force_metadata_update()
await asyncio.sleep(0.01, loop=self.loop)
self.assertEqual(
def _setup_error_after_data(self):
# NOTE(review): this snippet has lost its indentation and is cut off inside
# the ConsumerRecord(...) literal; only the visible setup is described.
# Build a Fetcher on top of a client that has no bootstrap servers.
subscriptions = SubscriptionState(loop=self.loop)
client = AIOKafkaClient(
loop=self.loop,
bootstrap_servers=[])
fetcher = Fetcher(client, subscriptions, loop=self.loop)
# Subscribe to "some_topic" and assign its partitions 0 and 1.
tp1 = TopicPartition('some_topic', 0)
tp2 = TopicPartition('some_topic', 1)
subscriptions.subscribe(set(["some_topic"]))
subscriptions.assign_from_subscribed({tp1, tp2})
assignment = subscriptions.subscription.assignment
# Position both partitions at offset 0.
subscriptions.seek(tp1, 0)
subscriptions.seek(tp2, 0)
# Add some data
messages = [ConsumerRecord(
topic="some_topic", partition=1, offset=0, timestamp=0,
timestamp_type=0, key=None, value=b"some", checksum=None,
async def test_coordinator_ensure_coordinator_known(self):
    """Prepare a GroupCoordinator whose background task is neutralised.

    The coordinator's own coordination task is cancelled and awaited, then
    replaced with a short sleep task so the test fully controls what the
    coordinator does. ``client.ready`` is swapped for a mock.
    """
    client = AIOKafkaClient(loop=self.loop, bootstrap_servers=self.hosts)
    subscription = SubscriptionState(loop=self.loop)
    subscription.subscribe(topics=set(['topic1']))
    coordinator = GroupCoordinator(
        client, subscription, loop=self.loop,
        heartbeat_interval_ms=20000)

    coordinator._coordination_task.cancel()  # disable for test
    try:
        await coordinator._coordination_task
    except asyncio.CancelledError:
        # Expected: the task was cancelled on the line above.
        pass

    # asyncio.sleep() lost its ``loop`` argument in Python 3.10; the task
    # is already bound to self.loop through create_task().
    coordinator._coordination_task = self.loop.create_task(
        asyncio.sleep(0.1)
    )
    self.add_cleanup(coordinator.close)

    client.ready = mock.Mock()
async def test_fetcher_offsets_for_times(self):
    """Prepare a Fetcher over a stubbed client for offset lookups.

    The client has no bootstrap servers; ``ready`` and
    ``_maybe_wait_metadata`` are mocked with coroutine stubs, every
    partition's leader is reported as node 0 and the API version is pinned
    to ``(0, 10, 1)``.
    """
    client = AIOKafkaClient(loop=self.loop, bootstrap_servers=[])

    # Native coroutine stubs: ``asyncio.coroutine`` was removed in
    # Python 3.11, so the mocks are backed by ``async def`` functions.
    async def ready(a):
        return True

    async def maybe_wait_metadata():
        return False

    client.ready = mock.MagicMock()
    client.ready.side_effect = ready
    client._maybe_wait_metadata = mock.MagicMock()
    client._maybe_wait_metadata.side_effect = maybe_wait_metadata
    client.cluster.leader_for_partition = mock.MagicMock()
    client.cluster.leader_for_partition.return_value = 0
    client._api_version = (0, 10, 1)

    subscriptions = SubscriptionState(loop=self.loop)
    fetcher = Fetcher(client, subscriptions, loop=self.loop)
    tp0 = TopicPartition("topic", 0)
    tp1 = TopicPartition("topic", 1)
# Names exported by ``from <package> import *``, grouped by role.
__all__ = [
    # Clients API
    "AIOKafkaProducer",
    "AIOKafkaConsumer",
    # ABC's
    "ConsumerRebalanceListener",
    # Errors
    "ConsumerStoppedError",
    "IllegalOperation",
    # Structs
    "ConsumerRecord",
    "TopicPartition",
    "OffsetAndTimestamp",
    "OffsetAndMetadata",
]

# Bare reference to imported names; presumably here so linters do not
# report them as unused imports -- confirm before removing.
(PY_35, ensure_future, AIOKafkaClient)