Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def initiate_callback(self, state, auth_data, urlopen, expect_success=True, **kargs):
    """Drive the SSO callback request against a stubbed ``urlopen``.

    ``urlopen.return_value`` is set to a ``MockResponse`` carrying
    ``auth_data`` as JSON, then ``self.sso_path`` is requested with a fixed
    ``code`` and the supplied ``state``. Extra keyword arguments are
    forwarded to ``self.client.get``.

    When ``expect_success`` is true, the response must be a 200 and the
    stubbed ``urlopen`` must have been called with the expected ``code`` and
    ``client_secret``.

    :return: the response object from ``self.client.get``.
    """
    urlopen.return_value = MockResponse(
        {"Content-Type": "application/json"}, json.dumps(auth_data)
    )

    params = urlencode({"code": "1234", "state": state})
    callback_url = u"{}?{}".format(self.sso_path, params)
    resp = self.client.get(callback_url, **kargs)

    # Failure-path callers only want the raw response back.
    if not expect_success:
        return resp

    assert resp.status_code == 200
    assert urlopen.called
    # call_args[1] holds the keyword arguments of the recorded call.
    posted = urlopen.call_args[1]["data"]
    assert posted["code"] == "1234"
    assert posted["client_secret"] == "my_client_secret"
    return resp
def _patch_artifact_manifest(path, org, release, project=None):
manifest = json.loads(open(path, "rb").read())
manifest["org"] = org
manifest["release"] = release
if project:
manifest["project"] = project
return json.dumps(manifest)
def test_inf(self):
    """Positive infinity must serialize as JSON ``null``."""
    # NOTE(review): the standard-library encoder emits ``Infinity`` for this
    # input, so ``json`` is presumably a project wrapper that maps non-finite
    # floats to null -- confirm against the module's imports.
    res = float("inf")
    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(json.dumps(res), "null")
def to_string(self, is_public=False, **kwargs):
    """Render the API context as indented JSON under a ``csp-report`` key.

    ``is_public`` and any extra keyword arguments are accepted but unused.

    :return: JSON text with two-space indentation.
    """
    report = {'csp-report': self.get_api_context()}
    return json.dumps(report, indent=2)
)
resp = client.post(
endpoint,
data=data,
auth=god,
)
except client.ApiError as exc:
return HttpResponse(
status=exc.status_code,
content=exc.body,
content_type='application/json',
)
return HttpResponse(
status=resp.status_code,
content=json.dumps(resp.data),
content_type='application/json',
)
def to_json_string(self):
    """
    Serialize this object to a JSON string.

    >>> x = _ConfigBase( a = _ConfigBase(b = _ConfigBase( w=[1,2,3])))
    >>> x.to_json_string()
    '{"a": {"b": {"w": [1, 2, 3]}}}'

    :return: the output of ``utils.json.dumps`` applied to ``to_dict()``
        after it has been passed through ``_to_camel_case_dict``.
    """
    converted = _to_camel_case_dict(self.to_dict())
    return utils.json.dumps(converted)
return
# We want to send only serializable items from request.META
meta = {}
for key, value in request.META.items():
try:
json.dumps([key, value])
meta[key] = value
except (TypeError, ValueError):
pass
meta["SENTRY_API_VIEW_NAME"] = self.__class__.__name__
kafka_publisher.publish(
channel=getattr(settings, "KAFKA_RAW_EVENTS_PUBLISHER_TOPIC", "raw-store-events"),
value=json.dumps([meta, base64.b64encode(data), project_config.to_dict()]),
)
except Exception as e:
logger.debug("Cannot publish event to Kafka: {}".format(e.message))
return HttpResponseForbidden()
try:
group = Group.objects.get(pk=gid)
except Group.DoesNotExist:
return HttpResponseForbidden()
gb, created = GroupBookmark.objects.get_or_create(
project=group.project,
user=request.user,
group=group,
)
if not created:
gb.delete()
response = HttpResponse(json.dumps({'bookmarked': created}))
response['Content-Type'] = 'application/json'
return response
def send(self, **kwargs):
"Sends the message to the server."
if settings.SERVERS:
message = base64.b64encode(json.dumps(kwargs).encode('zlib'))
for url in settings.SERVERS:
timestamp = time.time()
signature = get_signature(message, timestamp)
headers = {
'Authorization': get_auth_header(signature, timestamp, '%s/%s' % (self.__class__.__name__, sentry.VERSION)),
'Content-Type': 'application/octet-stream',
}
try:
return self.send_remote(url=url, data=message, headers=headers)
except urllib2.HTTPError, e:
body = e.read()
logger.error('Unable to reach Sentry log server: %s (url: %%s, body: %%s)' % (e,), url, body,
exc_info=True, extra={'data': {'body': body, 'remote_url': url}})
logger.log(kwargs.pop('level', None) or logging.ERROR, kwargs.pop('message', None))
except urllib2.URLError, e:
def build_payload(self, group, event, triggering_rules):
    """Assemble the alert payload dict for a group/event pair.

    The payload carries the event message, an alias derived from the group
    id, a ``details`` mapping of group and project attributes (with
    ``triggering_rules`` JSON-encoded), the group's culprit as ``entity``,
    and one ``"key:value"`` string per entry in ``event.tags``.

    :return: the payload as a plain ``dict``.
    """

    def _without_commas(value):
        # Both halves of every tag are stringified and stripped of commas.
        return six.text_type(value).replace(",", "")

    message = event.message
    alias = "sentry: %d" % group.id

    # ``group.message`` is evaluated eagerly as the getattr default, even
    # when ``message_short`` exists.
    short_message = getattr(group, "message_short", group.message)
    details = {
        "Sentry ID": six.text_type(group.id),
        "Sentry Group": short_message.encode("utf-8"),
        "Checksum": group.checksum,
        "Project ID": group.project.slug,
        "Project Name": group.project.name,
        "Logger": group.logger,
        "Level": group.get_level_display(),
        "URL": group.get_absolute_url(),
        "Triggering Rules": json.dumps(triggering_rules),
    }

    payload = {
        "message": message,
        "alias": alias,
        "source": "Sentry",
        "details": details,
        "entity": group.culprit,
    }

    tags = []
    for tag_key, tag_value in event.tags:
        tags.append("%s:%s" % (_without_commas(tag_key), _without_commas(tag_value)))
    payload["tags"] = tags
    return payload