Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def make_client_pool(self, hostname, mock_socket_values, serializer=None, **kwargs):
    """Build a socket-mocked base client and register it as a pool's factory.

    Args:
        hostname: Server address handed to both the base client and the
            pooled client constructors.
        mock_socket_values: Values passed to ``MockSocket`` to build the
            fake socket installed on the base client.
        serializer: Optional serializer forwarded to both clients.
        **kwargs: Extra keyword arguments forwarded to the base client only.

    Returns:
        The base ``Client`` instance (the one carrying the mock socket),
        not the ``PooledClient``.
    """
    base_client = pymemcache.client.base.Client(
        hostname,
        serializer=serializer,
        **kwargs
    )
    # Attach a dummy tracer to the base client through its Pin.
    Pin.override(base_client, tracer=get_dummy_tracer())
    # Swap in the fake socket so no real network connection is used.
    base_client.sock = MockSocket(mock_socket_values)

    # The pool's factory always hands back the same pre-built base client.
    pooled = pymemcache.client.base.PooledClient(hostname, serializer=serializer)
    pooled.client_pool = pymemcache.pool.ObjectPool(lambda: base_client)

    # NOTE(review): the pooled client built above is discarded and the base
    # client is returned instead -- confirm this is what callers expect.
    return base_client
encoding='ascii'):
self.server = server
self.serde = serde or LegacyWrappingSerde(serializer, deserializer)
self.connect_timeout = connect_timeout
self.timeout = timeout
self.no_delay = no_delay
self.ignore_exc = ignore_exc
self.socket_module = socket_module
self.default_noreply = default_noreply
self.allow_unicode_keys = allow_unicode_keys
if isinstance(key_prefix, six.text_type):
key_prefix = key_prefix.encode('ascii')
if not isinstance(key_prefix, bytes):
raise TypeError("key_prefix should be bytes.")
self.key_prefix = key_prefix
self.client_pool = pool.ObjectPool(
self._create_client,
after_remove=lambda client: client.close(),
max_size=max_pool_size,
lock_generator=lock_generator)
self.encoding = encoding