Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@moto.mock_sqs
def test_should_send_sms_all_notifications(notify_api, notify_db, notify_db_session, notify_config, mocker):
    """Store an SMS notification with the SMS sender patched out, then read one back.

    ``app.sms_wrapper.send`` is patched to return ``("1234", "twilio")``.
    A ``Notification`` in status ``'created'`` with method ``'sms'`` is added
    to the session and committed before a lookup through
    ``Notification.query``.
    """
    mocker.patch('app.sms_wrapper.send', return_value=("1234", "twilio"))

    create_notification = Notification(
        id=1000,
        to="to",
        message="message",
        created_at=datetime.utcnow(),
        status='created',
        method='sms',
        job_id=1234
    )
    db.session.add(create_notification)
    db.session.commit()

    # NOTE(review): the lookup key 1234 equals job_id above, not id=1000 --
    # confirm which record this is meant to fetch. The visible snippet ends
    # here, so the assertions on the result are not shown.
    read_notification_2 = Notification.query.get(1234)
@mock_sqs
@patch("pyqs.worker.sys")
def test_master_handles_signals(sys):
    """
    Test managing process handles OS signals

    ``sys`` is the mock object that ``@patch("pyqs.worker.sys")`` passes in;
    it shadows the standard-library module name inside this test.
    """
    # Setup SQS Queue
    conn = boto3.client('sqs', region_name='us-east-1')
    conn.create_queue(QueueName="tester")

    # Mock out sys.exit
    sys.exit = Mock()

    # Have our inner method send our signal
    def process_counts():
        # Deliver SIGTERM to the current process.
        os.kill(os.getpid(), signal.SIGTERM)
    # NOTE(review): the visible snippet ends here; `process_counts` is defined
    # but not yet attached to anything in the lines shown.
@moto.mock_sqs
def test_send_sms_messages_to_queue_adds_message_queue(notify_api):
    """Check that ``send_messages_to_queue('sms', ...)`` returns ``True``.

    A queue is obtained from ``set_up_mock_queue('sms')`` and one serialized
    ``Notification`` is sent to it directly with a ``type`` message attribute
    of ``'sms'`` before the function under test is called.
    """
    q = set_up_mock_queue('sms')
    sms = Notification(to='+441234512345',
                       message='hello world',
                       job_id=1234)
    q.send_message(MessageBody=json.dumps(sms.serialize()),
                   MessageAttributes={'type': {'StringValue': 'sms', 'DataType': 'String'}})

    # NOTE(review): `notification` is not defined in this block -- presumably
    # a module-level object; confirm it exists, or whether `sms` was intended.
    messages = [notification]
    b = send_messages_to_queue('sms', messages)
    assert b is True
@mock_sqs
def test_receive_messages_with_message_group_id_on_requeue():
    """Receive from a FIFO queue holding two messages in one message group.

    The queue's ``VisibilityTimeout`` is set to ``"3600"``, two messages are
    sent with ``MessageGroupId="group"``, and the first receive is asserted to
    return exactly one message. A second receive is then issued while that
    message is still undeleted.
    """
    sqs = boto3.resource("sqs", region_name="us-east-1")
    queue = sqs.create_queue(
        QueueName="test-queue.fifo", Attributes={"FifoQueue": "true"}
    )
    queue.set_attributes(Attributes={"VisibilityTimeout": "3600"})
    queue.send_message(MessageBody="message-1", MessageGroupId="group")
    queue.send_message(MessageBody="message-2", MessageGroupId="group")

    messages = queue.receive_messages()
    messages.should.have.length_of(1)
    message = messages[0]

    # received message is not deleted!
    # NOTE(review): the visible snippet ends after this call; the checks on
    # the second receive are not shown.
    messages = queue.receive_messages(WaitTimeSeconds=0)
@mock_sqs
def test_receive_message_for_queue_with_receive_message_wait_time_seconds_set():
    """Call ``receive_messages()`` on a queue created with a receive wait time.

    The queue is created with ``ReceiveMessageWaitTimeSeconds`` set to ``"2"``.
    The test makes no assertion on the result; it only performs the receive
    call.
    """
    sqs = boto3.resource("sqs", region_name="us-east-1")
    queue = sqs.create_queue(
        QueueName="test-queue", Attributes={"ReceiveMessageWaitTimeSeconds": "2"}
    )

    queue.receive_messages()
@mock_sqs
def test_set_queue_attribute():
    """Check that ``set_attributes`` changes a queue's ``VisibilityTimeout``.

    The queue is created through the client with a timeout of ``"3"``; after
    setting it to ``"45"`` a fresh ``sqs.Queue`` handle must report the new
    value.
    """
    sqs = boto3.resource("sqs", region_name="us-east-1")
    conn = boto3.client("sqs", region_name="us-east-1")
    conn.create_queue(QueueName="test-queue", Attributes={"VisibilityTimeout": "3"})

    queue = sqs.Queue("test-queue")
    queue.attributes["VisibilityTimeout"].should.equal("3")

    queue.set_attributes(Attributes={"VisibilityTimeout": "45"})

    # Build a new handle before checking the updated attribute.
    queue = sqs.Queue("test-queue")
    queue.attributes["VisibilityTimeout"].should.equal("45")
@moto.mock_sqs
def test_should_allow_correctly_formed_sms_request(notify_api, notify_db, notify_db_session, notify_config):
    """POST a well-formed SMS notification to ``/sms/notification``.

    The JSON body carries a ``notification`` object with ``to`` and
    ``message``; the request is sent with a ``Bearer 1234`` Authorization
    header and an ``application/json`` content type.
    """
    data_for_post = json.dumps({
        "notification": {
            "to": "+441234512345",
            "message": "hello world"
        }
    })
    set_up_mock_queue(data_for_post, 'sms')

    response = notify_api.test_client().post(
        '/sms/notification',
        headers={
            'Authorization': 'Bearer 1234'
        },
        data=data_for_post,
        content_type='application/json'
    )
    # NOTE(review): the visible snippet was cut off inside the call above; the
    # closing parenthesis is restored, but the original assertions on
    # `response` are not shown and have not been reconstructed.
@mock_sqs
def test_receive_messages_with_message_group_id():
    """Receive from a FIFO queue holding two messages that share a group id.

    With ``VisibilityTimeout`` set to ``"3600"`` and both messages sent under
    ``MessageGroupId="group"``, the first receive is asserted to return
    exactly one message. A second receive follows while that message is still
    undeleted.
    """
    sqs = boto3.resource("sqs", region_name="us-east-1")
    queue = sqs.create_queue(
        QueueName="test-queue.fifo", Attributes={"FifoQueue": "true"}
    )
    queue.set_attributes(Attributes={"VisibilityTimeout": "3600"})
    queue.send_message(MessageBody="message-1", MessageGroupId="group")
    queue.send_message(MessageBody="message-2", MessageGroupId="group")

    messages = queue.receive_messages()
    messages.should.have.length_of(1)
    message = messages[0]

    # received message is not deleted!
    # NOTE(review): the visible snippet ends after this call; the checks on
    # the second receive are not shown.
    messages = queue.receive_messages(WaitTimeSeconds=0)
@mock_sqs
@mock_sns
def test_publish_subject():
    """Set up an SNS topic with an SQS subscription ahead of a publish.

    Creates the topic ``some-topic``, reads its ARN back from ``list_topics``,
    creates the queue ``test-queue`` and subscribes that queue's ARN to the
    topic over the ``sqs`` protocol.
    """
    conn = boto3.client("sns", region_name="us-east-1")
    conn.create_topic(Name="some-topic")
    response = conn.list_topics()
    topic_arn = response["Topics"][0]["TopicArn"]

    sqs_conn = boto3.resource("sqs", region_name="us-east-1")
    sqs_conn.create_queue(QueueName="test-queue")

    conn.subscribe(
        TopicArn=topic_arn,
        Protocol="sqs",
        Endpoint="arn:aws:sqs:us-east-1:123456789012:test-queue",
    )

    # NOTE(review): the visible snippet ends here; the publish call and its
    # assertions are not shown.
    message = "my message"
@mock_sqs
@mock_sns
@mock_ses
def test_no_sns_feedback():
    """Run the shared ``__test_sns_feedback__`` helper with ``"test"`` and ``None``.

    The helper is defined outside this block; what the two arguments mean is
    not visible here.
    """
    __test_sns_feedback__("test", None)