Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_http_exception():
detail = "Random HTTP error happened."
status_code = status.HTTP_400_BAD_REQUEST
error_code = 999
with pytest.raises(HTTPException) as excinfo:
raise HTTPException(
status_code=status_code,
error_code=error_code,
detail=detail,
fields=[{"field": "because of this."}],
)
exc = excinfo.value
assert exc.error_code == error_code
assert exc.detail == detail
assert exc.status_code == status_code
assert exc.fields == [{"field": "because of this."}]
) -> None:
user_dict = {
"username": "not_taken_username",
"password": "password",
"email": "free_email@email.com",
}
user_dict.update({credentials_part: credentials_value})
async with pool.acquire() as conn:
users_repo = UsersRepository(conn)
await users_repo.create_user(**user_dict)
response = await authorized_client.put(
app.url_path_for("users:update-current-user"),
json={"user": {credentials_part: credentials_value}},
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_bad_request_exception():
detail = "Some really bad request was sent."
error_code = 999
with pytest.raises(BadRequestError) as excinfo:
raise BadRequestError(
error_code=error_code,
detail=detail
)
exc = excinfo.value
assert exc.error_code == error_code
assert exc.detail == detail
assert exc.status_code == status.HTTP_400_BAD_REQUEST
credentials_part: str,
credentials_value: str,
) -> None:
registration_json = {
"user": {
"email": "test@test.com",
"username": "username",
"password": "password",
}
}
registration_json["user"][credentials_part] = credentials_value
response = await client.post(
app.url_path_for("auth:register"), json=registration_json
)
assert response.status_code == HTTP_400_BAD_REQUEST
async def update_current_user(
user_update: UserInUpdate = Body(..., embed=True, alias="user"),
current_user: User = Depends(get_current_user_authorizer()),
users_repo: UsersRepository = Depends(get_repository(UsersRepository)),
) -> UserInResponse:
if user_update.username and user_update.username != current_user.username:
if await check_username_is_taken(users_repo, user_update.username):
raise HTTPException(
status_code=HTTP_400_BAD_REQUEST, detail=strings.USERNAME_TAKEN
)
if user_update.email and user_update.email != current_user.email:
if await check_email_is_taken(users_repo, user_update.email):
raise HTTPException(
status_code=HTTP_400_BAD_REQUEST, detail=strings.EMAIL_TAKEN
)
user = await users_repo.update_user(user=current_user, **user_update.dict())
token = jwt.create_access_token_for_user(user, str(config.SECRET_KEY))
return UserInResponse(
user=UserWithToken(
username=user.username,
email=user.email,
bio=user.bio,
self._debug_log(operation_name, query, variables)
background = BackgroundTasks()
context = {"request": request, "background": background}
result = await self.execute(
query, variables=variables, context=context, operation_name=operation_name
)
error_data = (
[format_graphql_error(err) for err in result.errors]
if result.errors
else None
)
response_data = {"data": result.data, "errors": error_data}
status_code = (
status.HTTP_400_BAD_REQUEST if result.errors else status.HTTP_200_OK
)
return JSONResponse(
response_data, status_code=status_code, background=background
)
async def create_new_article(
article_create: ArticleInCreate = Body(..., embed=True, alias="article"),
user: User = Depends(get_current_user_authorizer()),
articles_repo: ArticlesRepository = Depends(get_repository(ArticlesRepository)),
) -> ArticleInResponse:
slug = get_slug_for_article(article_create.title)
if await check_article_exists(articles_repo, slug):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=strings.ARTICLE_ALREADY_EXISTS,
)
article = await articles_repo.create_article(
slug=slug,
title=article_create.title,
description=article_create.description,
body=article_create.body,
author=user,
tags=article_create.tags,
)
return ArticleInResponse(article=ArticleForResponse.from_orm(article))
async def reset_password(token: str = Body(...), password: str = Body(...)):
try:
data = jwt.decode(
token,
reset_password_token_secret,
audience=reset_password_token_audience,
algorithms=[JWT_ALGORITHM],
)
user_id = data.get("user_id")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorCode.RESET_PASSWORD_BAD_TOKEN,
)
user = await user_db.get(user_id)
if user is None or not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorCode.RESET_PASSWORD_BAD_TOKEN,
)
user.hashed_password = get_password_hash(password)
await user_db.update(user)
except jwt.PyJWTError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorCode.RESET_PASSWORD_BAD_TOKEN,
status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
)
else:
return PlainTextResponse(
"Method Not Allowed", status_code=status.HTTP_405_METHOD_NOT_ALLOWED
)
try:
query = data["query"]
variables = data.get("variables")
operation_name = data.get("operationName")
except KeyError:
return PlainTextResponse(
"No GraphQL query found in the request",
status_code=status.HTTP_400_BAD_REQUEST,
)
background = BackgroundTasks()
context = {"request": request, "background": background}
result = await self.execute(
query, variables=variables, context=context, operation_name=operation_name
)
error_data = (
[format_graphql_error(err) for err in result.errors]
if result.errors
else None
)
response_data = {"data": result.data}
if error_data:
response_data["errors"] = error_data
async def update_current_user(
user_update: UserInUpdate = Body(..., embed=True, alias="user"),
current_user: User = Depends(get_current_user_authorizer()),
users_repo: UsersRepository = Depends(get_repository(UsersRepository)),
) -> UserInResponse:
if user_update.username and user_update.username != current_user.username:
if await check_username_is_taken(users_repo, user_update.username):
raise HTTPException(
status_code=HTTP_400_BAD_REQUEST, detail=strings.USERNAME_TAKEN
)
if user_update.email and user_update.email != current_user.email:
if await check_email_is_taken(users_repo, user_update.email):
raise HTTPException(
status_code=HTTP_400_BAD_REQUEST, detail=strings.EMAIL_TAKEN
)
user = await users_repo.update_user(user=current_user, **user_update.dict())
token = jwt.create_access_token_for_user(user, str(config.SECRET_KEY))
return UserInResponse(
user=UserWithToken(
username=user.username,
email=user.email,
bio=user.bio,
image=user.image,
token=token,
)