Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@sub(topic="test-topic", prefix="rele")
def sub_stub(data, **kwargs):
    """Handle a message on ``test-topic`` and hand back its ``id``.

    Args:
        data: Mapping-like message payload; must contain an ``"id"`` key.
        **kwargs: Extra keyword arguments passed alongside the payload;
            not used here.

    Returns:
        The value stored under ``data["id"]``.

    Raises:
        KeyError: If ``data`` has no ``"id"`` entry.
    """
    message_id = data["id"]
    return message_id
@sub(topic="photo-updated", filter_by=landscape_filter)
def sub_process_landscape_photos(data, **kwargs):
    """Build a short description of a received photo message.

    The decorator is given ``landscape_filter`` as ``filter_by``.

    Args:
        data: Message payload; not used here.
        **kwargs: Extra keyword arguments; the optional ``"type"`` entry is
            interpolated into the result (``None`` when absent).

    Returns:
        A string of the form ``"Received a photo of type <type>"``.
    """
    photo_type = kwargs.get("type")
    return f"Received a photo of type {photo_type}"
@sub(topic="some-cool-topic", prefix="rele")
def crashy_sub_stub(data, **kwargs):
    """Always fail with a ``ValueError``.

    Args:
        data: Message payload; not used here.
        **kwargs: Extra keyword arguments; not used here.

    Raises:
        ValueError: Unconditionally, with a fixed message.
    """
    failure = ValueError("I am an exception from a sub")
    raise failure
@sub(topic="published-time-type")
def sub_published_time_type(data, **kwargs):
    """Log the Python type of the ``published_at`` keyword argument.

    Args:
        data: Message payload; not used here.
        **kwargs: Extra keyword arguments; must contain ``"published_at"``.

    Returns:
        None.

    Raises:
        KeyError: If ``"published_at"`` is not among ``kwargs``.
    """
    published_at = kwargs["published_at"]
    logger.info(f"{type(published_at)}")
@sub(topic="photo-updated", filter_by=[landscape_filter, gif_filter])
def sub_process_landscape_gif_photos(data, **kwargs):
    """Build a short description of a received photo, including its format.

    The decorator is given both ``landscape_filter`` and ``gif_filter`` as
    ``filter_by``.

    Args:
        data: Message payload; not used here.
        **kwargs: Extra keyword arguments; the optional ``"format"`` and
            ``"type"`` entries are interpolated into the result (``None``
            when absent).

    Returns:
        A string of the form ``"Received a <format> photo of type <type>"``.
    """
    photo_format = kwargs.get("format")
    photo_type = kwargs.get("type")
    return f"Received a {photo_format} photo of type {photo_type}"
@sub(topic="some-cool-topic", prefix="rele")
def sub_stub(data, **kwargs):
    """Handle a message on ``some-cool-topic`` and hand back its ``id``.

    Args:
        data: Mapping-like message payload; must contain an ``"id"`` key.
        **kwargs: Extra keyword arguments; not used here.

    Returns:
        The value stored under ``data["id"]``.

    Raises:
        KeyError: If ``data`` has no ``"id"`` entry.
    """
    message_id = data["id"]
    return message_id
@sub(topic="some-fancy-topic")
def sub_fancy_stub(data, **kwargs):
    """Log the message ``id`` and ``lang``, then return the ``id``.

    Args:
        data: Mapping-like message payload; must contain an ``"id"`` key.
        **kwargs: Extra keyword arguments; must contain ``"lang"``.

    Returns:
        The value stored under ``data["id"]``.

    Raises:
        KeyError: If ``data["id"]`` or ``kwargs["lang"]`` is missing.
    """
    message_id = data["id"]
    language = kwargs["lang"]
    # NOTE: the two fragments are joined with no separating space, so the
    # rendered text reads "... only <id>id <lang>".
    log_line = (
        f"I used to have a prefix, but not anymore, only {message_id}"
        f"id {language}"
    )
    logger.info(log_line)
    return message_id
@sub(topic="some-cool-topic", prefix="rele")
def sub_stub(data, **kwargs):
    """Print a fixed notice when a message arrives on ``some-cool-topic``.

    Args:
        data: Message payload; not used here.
        **kwargs: Extra keyword arguments; not used here.

    Returns:
        None.
    """
    # Plain string literal: the message has no placeholders, so the former
    # ``f`` prefix was redundant (flagged by linters as F541).
    print("I am a task doing stuff.")
@sub(topic="some-cool-topic", prefix="rele")
def sub_stub(data, **kwargs):
    """Log the message ``id`` and ``lang``, then return the ``id``.

    Args:
        data: Mapping-like message payload; must contain an ``"id"`` key.
        **kwargs: Extra keyword arguments; must contain ``"lang"``.

    Returns:
        The value stored under ``data["id"]``.

    Raises:
        KeyError: If ``data["id"]`` or ``kwargs["lang"]`` is missing.
    """
    message_id = data["id"]
    language = kwargs["lang"]
    logger.info(f"I am a task doing stuff with ID {message_id} ({language})")
    return message_id