Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def get_trusted_http_payload(request, webhook_secret):
    """Return the raw request body once its signature status is acceptable.

    The request must carry an ``x-hub-signature`` header if and only if a
    webhook secret is configured. When both are present the body is handed
    to ``validate_webhook_payload`` together with the header value and the
    secret before being returned; when neither is present the body is
    returned without any check.

    :param request: object exposing ``headers`` (a mapping looked up with the
        lowercase key ``x-hub-signature``) and an awaitable ``read()``.
    :param webhook_secret: the shared secret, or ``None`` when unset.
    :returns: whatever ``await request.read()`` produced.
    :raises ValidationFailure: when exactly one of the signature header and
        the secret is present. NOTE(review): ``validate_webhook_payload`` is
        defined elsewhere and presumably raises it on a mismatch as well.
    """
    headers = request.headers
    has_secret = webhook_secret is not None
    has_signature = 'x-hub-signature' in headers

    # A signature without a secret, or a secret without a signature, is
    # rejected before the body is read at all.
    if has_signature != has_secret:
        reason = (
            'secret not provided' if has_signature
            else 'signature is missing'
        )
        raise ValidationFailure(reason)

    body = await request.read()

    # From here on the two flags agree, so checking one of them suffices.
    if has_signature:
        validate_webhook_payload(
            payload=body,
            signature=headers['x-hub-signature'],
            secret=webhook_secret,
        )

    return body
def validate_event(payload: bytes, *, signature: str, secret: str) -> None:
    """Validate the signature of a webhook event.

    An HMAC-SHA1 hex digest of *payload*, keyed with the UTF-8 encoded
    *secret*, is computed, prefixed with ``"sha1="`` and compared against
    *signature* in constant time.

    :param payload: raw request body the signature was computed over.
    :param signature: value of the signature header, including the
        ``"sha1="`` prefix.
    :param secret: shared webhook secret.
    :raises ValidationFailure: if *signature* lacks the ``"sha1="`` prefix or
        does not match the digest computed from *payload* and *secret*.
    """
    # https://developer.github.com/webhooks/securing/#validating-payloads-from-github
    signature_prefix = "sha1="
    if not signature.startswith(signature_prefix):
        raise ValidationFailure("signature does not start with "
                                f"{repr(signature_prefix)}")
    hmac_ = hmac.new(secret.encode("UTF-8"), msg=payload, digestmod="sha1")
    calculated_sig = signature_prefix + hmac_.hexdigest()
    # hmac.compare_digest() raises TypeError for str arguments containing
    # non-ASCII characters.  The signature comes straight from a request
    # header, so compare bytes instead: a malformed header is then reported
    # as an ordinary validation failure rather than an unexpected TypeError.
    # "surrogatepass" keeps the encode step itself from failing on lone
    # surrogates; such bytes can never equal the pure-ASCII expected value.
    received = signature.encode("UTF-8", "surrogatepass")
    expected = calculated_sig.encode("UTF-8")
    if not hmac.compare_digest(received, expected):
        raise ValidationFailure("payload's signature does not align "
                                "with the secret")
def validate_event(payload: bytes, *, signature: str, secret: str) -> None:
    """Validate the signature of a webhook event.

    An HMAC-SHA1 hex digest of *payload*, keyed with the UTF-8 encoded
    *secret*, is computed, prefixed with ``"sha1="`` and compared against
    *signature* in constant time.

    :param payload: raw request body the signature was computed over.
    :param signature: value of the signature header, including the
        ``"sha1="`` prefix.
    :param secret: shared webhook secret.
    :raises ValidationFailure: if *signature* lacks the ``"sha1="`` prefix or
        does not match the digest computed from *payload* and *secret*.
    """
    # https://developer.github.com/webhooks/securing/#validating-payloads-from-github
    signature_prefix = "sha1="
    if not signature.startswith(signature_prefix):
        raise ValidationFailure("signature does not start with "
                                f"{repr(signature_prefix)}")
    hmac_ = hmac.new(secret.encode("UTF-8"), msg=payload, digestmod="sha1")
    calculated_sig = signature_prefix + hmac_.hexdigest()
    # hmac.compare_digest() raises TypeError for str arguments containing
    # non-ASCII characters.  The signature comes straight from a request
    # header, so compare bytes instead: a malformed header is then reported
    # as an ordinary validation failure rather than an unexpected TypeError.
    # "surrogatepass" keeps the encode step itself from failing on lone
    # surrogates; such bytes can never equal the pure-ASCII expected value.
    received = signature.encode("UTF-8", "surrogatepass")
    expected = calculated_sig.encode("UTF-8")
    if not hmac.compare_digest(received, expected):
        raise ValidationFailure("payload's signature does not align "
                                "with the secret")
async def dispatch(request):
    """Turn an incoming webhook request into an event and route it.

    The request body is read, parsed into an ``Event`` (with the ``github``
    plugin's ``verify`` attribute passed as the secret) and forwarded to the
    plugin's router.

    :param request: incoming HTTP request; its ``app.plugins["github"]``
        entry supplies ``verify`` and ``router``.
    :returns: a ``Response`` with status 200 on success, 401 when
        ``ValidationFailure`` is raised while building or dispatching the
        event, and 500 for any other exception.
    """
    github = request.app.plugins["github"]
    payload = await request.read()

    status = 200
    try:
        event = Event.from_http(request.headers, payload, secret=github.verify)
        await github.router.dispatch(event, app=request.app)
    except ValidationFailure:
        LOG.debug(
            "Github webhook failed verification: %s, %s", request.headers, payload
        )
        status = 401
    except Exception as e:
        # Boundary handler: record the traceback and report a server error
        # instead of letting the exception escape the handler.
        LOG.exception(e)
        status = 500

    return Response(status=status)
is performed for a content-type of "application/json" (GitHub does
support other content-types). If the content-type does not match,
BadRequest is raised.
If the appropriate headers are provided for event validation, then it
will be performed unconditionally. Any failure in validation
(including not providing a secret) will lead to ValidationFailure being
raised.
"""
if "x-hub-signature" in headers:
if secret is None:
raise ValidationFailure("secret not provided")
validate_event(body, signature=headers["x-hub-signature"],
secret=secret)
elif secret is not None:
raise ValidationFailure("signature is missing")
try:
data = _decode_body(headers["content-type"], body, strict=True)
except (KeyError, ValueError) as exc:
raise BadRequest(http.HTTPStatus(415),
"expected a content-type of "
"'application/json' or "
"'application/x-www-form-urlencoded'") from exc
return cls(data, event=headers["x-github-event"],
delivery_id=headers["x-github-delivery"])
The mapping providing the headers is expected to support lowercase keys.
Since this method assumes the body of the HTTP request is JSON, a check
is performed for a content-type of "application/json" (GitHub does
support other content-types). If the content-type does not match,
BadRequest is raised.
If the appropriate headers are provided for event validation, then it
will be performed unconditionally. Any failure in validation
(including not providing a secret) will lead to ValidationFailure being
raised.
"""
if "x-hub-signature" in headers:
if secret is None:
raise ValidationFailure("secret not provided")
validate_event(body, signature=headers["x-hub-signature"],
secret=secret)
elif secret is not None:
raise ValidationFailure("signature is missing")
try:
data = _decode_body(headers["content-type"], body, strict=True)
except (KeyError, ValueError) as exc:
raise BadRequest(http.HTTPStatus(415),
"expected a content-type of "
"'application/json' or "
"'application/x-www-form-urlencoded'") from exc
return cls(data, event=headers["x-github-event"],
delivery_id=headers["x-github-delivery"])
async def get_event_from_request(request, webhook_secret):
    """Retrieve Event out of HTTP request if it's valid.

    The body is obtained through ``get_trusted_http_payload``; on success it
    is wrapped, together with the request headers, into a
    ``GidgetHubWebhookEvent``.

    :param request: incoming HTTP request exposing ``headers``.
    :param webhook_secret: shared secret forwarded to
        ``get_trusted_http_payload`` (may be ``None``).
    :returns: the event built by ``GidgetHubWebhookEvent.from_http_request``.
    :raises web.HTTPForbidden: when ``get_trusted_http_payload`` raises
        ``ValidationFailure``; the original error is chained as the cause.
    """
    webhook_event_signature = request.headers.get(
        'X-Hub-Signature', '',
    )
    try:
        http_req_body = await get_trusted_http_payload(
            request, webhook_secret,
        )
    except ValidationFailure as no_signature_exc:
        logger.error(
            EVENT_LOG_INVALID_MSG,
            request.headers.get('X-GitHub-Event'),
            request.headers.get('X-GitHub-Delivery'),
            webhook_event_signature,
        )
        logger.debug(
            'Webhook HTTP query signature validation failed because: %s',
            no_signature_exc,
        )
        raise web.HTTPForbidden from no_signature_exc
    else:
        event = GidgetHubWebhookEvent.from_http_request(
            http_req_headers=request.headers,
            http_req_body=http_req_body,
        )
        # The event used to be built and then discarded, so callers always
        # received None; hand it back as the docstring promises.
        return event
async def get_trusted_http_payload(request, webhook_secret):
    """Return the raw request body once its signature status is acceptable.

    The request must carry an ``x-hub-signature`` header if and only if a
    webhook secret is configured. When both are present the body is handed
    to ``validate_webhook_payload`` together with the header value and the
    secret before being returned; when neither is present the body is
    returned without any check.

    :param request: object exposing ``headers`` (a mapping looked up with the
        lowercase key ``x-hub-signature``) and an awaitable ``read()``.
    :param webhook_secret: the shared secret, or ``None`` when unset.
    :returns: whatever ``await request.read()`` produced.
    :raises ValidationFailure: when exactly one of the signature header and
        the secret is present. NOTE(review): ``validate_webhook_payload`` is
        defined elsewhere and presumably raises it on a mismatch as well.
    """
    headers = request.headers
    has_secret = webhook_secret is not None
    has_signature = 'x-hub-signature' in headers

    # A signature without a secret, or a secret without a signature, is
    # rejected before the body is read at all.
    if has_signature != has_secret:
        reason = (
            'secret not provided' if has_signature
            else 'signature is missing'
        )
        raise ValidationFailure(reason)

    body = await request.read()

    # From here on the two flags agree, so checking one of them suffices.
    if has_signature:
        validate_webhook_payload(
            payload=body,
            signature=headers['x-hub-signature'],
            secret=webhook_secret,
        )

    return body