Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, api_key=None, proxy_dict=None, env=None, json_encoder=None, adapter=None):
# Set up the client: resolve the API key, then build a `requests` session with
# retrying adapters, default headers and optional proxies.
# NOTE(review): indentation was lost in this copy and the method is cut off
# after the final `if env == 'app_engine':` line below; `json_encoder` is not
# used anywhere in the visible part.
#
# Key lookup order: the explicit `api_key` argument wins, then the FCM_API_KEY
# environment variable; with neither set, AuthenticationError is raised.
if api_key:
self._FCM_API_KEY = api_key
elif os.getenv('FCM_API_KEY', None):
self._FCM_API_KEY = os.getenv('FCM_API_KEY', None)
else:
raise AuthenticationError("Please provide the api_key in the google-services.json file")
# Stays None unless a usable proxy mapping is supplied further down.
self.FCM_REQ_PROXIES = None
self.requests_session = requests.Session()
# Retry policy: backoff factor 1, retry on 502/503, and POST added to the
# default set of retryable methods.
# NOTE(review): `method_whitelist` / `DEFAULT_METHOD_WHITELIST` were deprecated
# in urllib3 1.26 in favour of `allowed_methods` / `DEFAULT_ALLOWED_METHODS`
# and removed in 2.0 -- confirm the urllib3 version this code runs against.
retries = Retry(backoff_factor=1, status_forcelist=[502, 503],
method_whitelist=(Retry.DEFAULT_METHOD_WHITELIST | frozenset(['POST'])))
# A caller-supplied `adapter` replaces the default HTTPAdapter for both schemes;
# in that case the `retries` policy built above is not applied to it.
self.requests_session.mount('http://', adapter or HTTPAdapter(max_retries=retries))
self.requests_session.mount('https://', adapter or HTTPAdapter(max_retries=retries))
# Install the headers returned by self.request_headers() as session defaults.
self.requests_session.headers.update(self.request_headers())
# The INFO_END_POINT prefix gets its own adapter with INFO_RETRIES as max_retries.
self.requests_session.mount(self.INFO_END_POINT, HTTPAdapter(max_retries=self.INFO_RETRIES))
# Proxies are applied only for a non-empty dict that has an 'http' or 'https' key;
# anything else is ignored without an error.
if proxy_dict and isinstance(proxy_dict, dict) and (('http' in proxy_dict) or ('https' in proxy_dict)):
self.FCM_REQ_PROXIES = proxy_dict
self.requests_session.proxies.update(proxy_dict)
# Starts out as an empty list on every new instance.
self.send_request_responses = []
# NOTE(review): the body of this branch is missing from this copy of the source.
if env == 'app_engine':
failure = parsed_response.get('failure', 0)
canonical_ids = parsed_response.get('canonical_ids', 0)
results = parsed_response.get('results', [])
message_id = parsed_response.get('message_id', None) # for topic messages
if message_id:
success = 1
if multicast_id:
response_dict['multicast_ids'].append(multicast_id)
response_dict['success'] += success
response_dict['failure'] += failure
response_dict['canonical_ids'] += canonical_ids
response_dict['results'].extend(results)
response_dict['topic_message_id'] = message_id
elif response.status_code == 401:
raise AuthenticationError("There was an error authenticating the sender account")
elif response.status_code == 400:
raise InvalidDataError(response.text)
elif response.status_code == 404:
raise FCMNotRegisteredError("Token not registered")
else:
raise FCMServerError("FCM server is temporarily unavailable")
return response_dict
def send_request(self, payloads=None):
    """Post payloads to the FCM endpoint and dispatch on the response status.

    This is a generator: it yields the object returned by
    ``self.http_client.fetch`` and expects the resolved response to be sent
    back in. It is presumably driven by a Tornado-style coroutine decorator
    that is not visible here -- confirm at the definition site.

    Args:
        payloads: Iterable of request bodies. ``None`` (the default) is
            treated as "nothing to send" rather than failing with a
            ``TypeError`` when iterated.

    Raises:
        AuthenticationError: The server answered 401.
        InternalPackageError: The server answered 400.
        FCMServerError: The server answered with any other non-200 status.
    """
    # The declared default is None, which is not iterable; normalise it so a
    # bare send_request() call is a no-op instead of a crash.
    if payloads is None:
        payloads = ()

    for payload in payloads:
        request = HTTPRequest(
            url=self.FCM_END_POINT,
            method='POST',
            headers=self.request_headers(),
            body=payload,
        )
        response = yield self.http_client.fetch(request)
        status = response.code

        if status == 200:
            yield self.parse_response(response)
            # NOTE(review): every branch leaves the loop (return or raise), so
            # only the first payload is ever posted -- confirm this is intended.
            return
        if status == 401:
            raise AuthenticationError('There was an error authenticating the sender account')
        if status == 400:
            raise InternalPackageError('Unknown internal error')
        raise FCMServerError('FCM server is temporarily unavailable')