Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_nofails_classes(self):
boto_session = boto3.Session(region_name=os.environ['AWS_DEFAULT_REGION'])
a = AsyncException()
l = LambdaAsyncResponse(boto_session=boto_session)
# s = SnsAsyncResponse()
s = SnsAsyncResponse(arn="arn:abc:def", boto_session=boto_session)
payload = json.dumps(message).encode('utf-8')
if len(payload) > LAMBDA_ASYNC_PAYLOAD_LIMIT: # pragma: no cover
raise AsyncException("Payload too large for SNS")
self.response = self.client.publish(
TargetArn=self.arn,
Message=payload
)
self.sent = self.response.get('MessageId')
##
# Aync Routers
##
ASYNC_CLASSES = {
'lambda': LambdaAsyncResponse,
'sns': SnsAsyncResponse,
}
def route_lambda_task(event, context):
"""
Deserialises the message from event passed to zappa.handler.run_function
imports the function, calls the function with args
"""
message = event
return run_message(message)
def route_sns_task(event, context):
"""
Gets SNS Message, deserialises the message,
imports the function, calls the function with args