Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_load_subscriptions_in_a_module(self, subscriptions):
assert len(subscriptions) == 1
func_sub = subscriptions[0]
assert isinstance(func_sub, Subscription)
assert func_sub.name == "rele-test-topic"
assert func_sub({"id": 4}, lang="en") == 4
from rele import Subscription, sub
@sub(topic="some-cool-topic", prefix="rele")
def sub_stub(data, **kwargs):
return data["id"]
class ClassBasedSub(Subscription):
topic = "alternative-cool-topic"
def __init__(self):
self._func = self.callback_func
super().__init__(self._func, self.topic)
def callback_func(self, data, **kwargs):
return data["id"]
class CustomException(Exception):
pass
def some_other_type():
print("Im a function")
def test_loads_subscriptions_when_they_are_class_based(self):
subscriptions = load_subscriptions_from_paths(
["tests.subs"],
sub_prefix="test",
filter_by=[lambda attrs: attrs.get("lang") == "en"],
)
assert len(subscriptions) == 2
klass_sub = subscriptions[0]
assert isinstance(klass_sub, Subscription)
assert klass_sub.name == "test-alternative-cool-topic"
assert klass_sub({"id": 4}, lang="en") == 4
def test_subs_without_prefix_return_subscription_objects(self):
assert isinstance(sub_fancy_stub, Subscription)
assert sub_fancy_stub.topic == "some-fancy-topic"
assert sub_fancy_stub.name == "some-fancy-topic"
def test_subs_return_subscription_objects(self):
assert isinstance(sub_stub, Subscription)
assert sub_stub.topic == "some-cool-topic"
assert sub_stub.name == "rele-some-cool-topic"