Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.channel_layer.send("first_channel", {"pay": "load"})
for _ in range(4):
with self.assertRaises(RedisSentinelChannelLayer.ChannelFull):
self.channel_layer.send("first_channel", {"pay": "load"})
# check that channel-full exceptions are counted as such, not towards messages
self.assertEqual(self.channel_layer.global_statistics()["channel_full_count"], 4)
self.assertEqual(
self.channel_layer.channel_statistics("first_channel")["channel_full_count"], 4)
# Encrypted variant of conformance tests
@unittest.skipUnless(sentinel_exists(), "Redis sentinel not running")
class EncryptedRedisLayerTests(ConformanceTestCase):
    """Conformance tests against a Sentinel layer built with encryption keys.

    The whole class is skipped when ``sentinel_exists()`` returns False.
    """

    expiry_delay = 1.1
    receive_tries = len(SERVICE_NAMES)

    @classmethod
    def setUpClass(cls):
        """Run the parent setup, then attach the encrypted channel layer to the class."""
        super(EncryptedRedisLayerTests, cls).setUpClass()
        cls.channel_layer = RedisSentinelChannelLayer(
            hosts=SENTINEL_HOSTS,
            expiry=1,
            group_expiry=2,
            capacity=5,
            # NOTE(review): two keys are passed; presumably the extra one
            # exercises key rotation -- confirm against the asgi_redis docs.
            symmetric_encryption_keys=["test", "old"],
            services=SERVICE_NAMES,
        )
# Test that the backend can auto-discover masters from Sentinel
@unittest.skipUnless(sentinel_exists(), "Redis sentinel not running")
class AutoDiscoverRedisLayerTests(ConformanceTestCase):
    """Conformance tests against a Sentinel layer created without ``services``.

    The whole class is skipped when ``sentinel_exists()`` returns False.
    """

    receive_tries = len(SERVICE_NAMES)
    expiry_delay = 1.1

    @classmethod
    def setUpClass(cls):
        """Run the parent setup, then attach a layer built without explicit services."""
        super(AutoDiscoverRedisLayerTests, cls).setUpClass()
        # No ``services`` entry here: the layer is left to find masters itself.
        layer_options = {
            "hosts": SENTINEL_HOSTS,
            "expiry": 1,
            "group_expiry": 2,
            "capacity": 5,
        }
        cls.channel_layer = RedisSentinelChannelLayer(**layer_options)
# Test that the backend can cache Sentinel master connections if given a refresh interval
def sentinel_exists():
    """Report whether Sentinel can find a master for the first service name.

    Returns:
        False when ``SENTINEL_HOSTS`` is empty or when ``discover_master``
        raises ``MasterNotFoundError``; True otherwise.
    """
    if SENTINEL_HOSTS:
        sentinel = redis.sentinel.Sentinel(SENTINEL_HOSTS)
        try:
            sentinel.discover_master(SERVICE_NAMES[0])
        except MasterNotFoundError:
            # Fall through to the shared "not available" result below.
            pass
        else:
            return True
    return False
# Default conformance tests
@unittest.skipUnless(sentinel_exists(), "Redis sentinel not running")
class RedisLayerTests(ConformanceTestCase):
    """Default conformance tests against a Sentinel layer with explicit services.

    The whole class is skipped when ``sentinel_exists()`` returns False.
    """

    receive_tries = len(SERVICE_NAMES)
    expiry_delay = 1.1

    @classmethod
    def setUpClass(cls):
        """Run the parent setup, then attach the default Sentinel channel layer."""
        super(RedisLayerTests, cls).setUpClass()
        layer = RedisSentinelChannelLayer(
            hosts=SENTINEL_HOSTS,
            services=SERVICE_NAMES,
            capacity=5,
            expiry=1,
            group_expiry=2,
        )
        cls.channel_layer = layer
# The functionality this test is for is not yet present (it's not required,
receive_tries = len(SERVICE_NAMES)
@classmethod
def setUpClass(cls):
    """Run the parent setup, then attach a Sentinel layer built without ``services``."""
    super(AutoDiscoverRedisLayerTests, cls).setUpClass()
    layer_options = dict(
        hosts=SENTINEL_HOSTS,
        expiry=1,
        group_expiry=2,
        capacity=5,
    )
    cls.channel_layer = RedisSentinelChannelLayer(**layer_options)
# Test that the backend can cache Sentinel master connections if given a refresh interval
@unittest.skipUnless(sentinel_exists(), "Redis sentinel not running")
class CachingRedisLayerTests(ConformanceTestCase):
    """Conformance tests against a Sentinel layer given a refresh interval.

    The whole class is skipped when ``sentinel_exists()`` returns False.
    """

    receive_tries = len(SERVICE_NAMES)
    expiry_delay = 1.1

    @classmethod
    def setUpClass(cls):
        """Run the parent setup, then attach a layer with ``sentinel_refresh_interval`` set."""
        super(CachingRedisLayerTests, cls).setUpClass()
        layer_options = {
            "hosts": SENTINEL_HOSTS,
            "expiry": 1,
            "group_expiry": 2,
            "capacity": 5,
            "services": SERVICE_NAMES,
            # The one option that distinguishes this class from the default tests.
            "sentinel_refresh_interval": 60,
        }
        cls.channel_layer = RedisSentinelChannelLayer(**layer_options)
from __future__ import unicode_literals
from asgi_redis import RedisLocalChannelLayer
from asgiref.conformance import ConformanceTestCase
from .constants import REDIS_HOSTS
# Local layer conformance tests
class RedisLocalLayerTests(ConformanceTestCase):
    """Conformance tests run against ``RedisLocalChannelLayer``."""

    capacity_limit = 5
    expiry_delay = 1.1

    @classmethod
    def setUpClass(cls):
        """Run the parent setup, then attach the local channel layer to the class."""
        super(RedisLocalLayerTests, cls).setUpClass()
        local_layer = RedisLocalChannelLayer(
            hosts=REDIS_HOSTS,
            capacity=5,
            expiry=1,
            group_expiry=2,
        )
        cls.channel_layer = local_layer
from __future__ import unicode_literals
from asgi_ipc import IPCChannelLayer
from asgiref.conformance import ConformanceTestCase
# Default conformance tests
class IPCLayerTests(ConformanceTestCase):
    """Conformance tests run against ``IPCChannelLayer``."""

    capacity_limit = 5
    expiry_delay = 1.1
    # Built once at class-definition time and shared as a class attribute.
    channel_layer = IPCChannelLayer(
        expiry=1,
        group_expiry=2,
        capacity=5,
    )
def setUpClass(cls):
    """Raise ``unittest.SkipTest`` when ``cls`` is ``ConformanceTestCase`` itself."""
    # NOTE(review): no @classmethod decorator is visible above this def in
    # this excerpt -- confirm it is present in the full file.
    invoked_on_base = cls is ConformanceTestCase
    if invoked_on_base:
        # Don't let the base class itself run; it's abstract.
        raise unittest.SkipTest("Skipping base class tests")