From f7a128e1504ef480eaee00d0bcd4b9d8592781d2 Mon Sep 17 00:00:00 2001 From: Jan Benicek Date: Mon, 14 Oct 2024 10:38:15 +0200 Subject: [PATCH] 2FA and Logout added --- pdm.lock | 13 ++- pyproject.toml | 1 + src/auth/router.py | 27 +++-- src/auth/schemas.py | 4 + src/auth/services.py | 177 +++++++++++++++++++++++-------- src/base/totp.py | 16 +++ src/middleware/baseMiddleware.py | 6 +- 7 files changed, 189 insertions(+), 55 deletions(-) create mode 100644 src/base/totp.py diff --git a/pdm.lock b/pdm.lock index c9e9589..72dfdb7 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default"] strategy = ["cross_platform", "inherit_metadata"] lock_version = "4.4.1" -content_hash = "sha256:b57525e7baee5eff69181af4f35e3137f3f5fea752042dc53c35886d639f35d6" +content_hash = "sha256:44570334cf53d86490f0cf7ecef005ebd02a7d89e42b9b865d1fc882b5b113b3" [[package]] name = "annotated-types" @@ -353,6 +353,17 @@ files = [ {file = "pyjwt-2.9.0.tar.gz", hash = "sha256:7e1e5b56cc735432a7369cbfa0efe50fa113ebecdc04ae6922deba8b84582d0c"}, ] +[[package]] +name = "pyotp" +version = "2.9.0" +requires_python = ">=3.7" +summary = "Python One Time Password Library" +groups = ["default"] +files = [ + {file = "pyotp-2.9.0-py3-none-any.whl", hash = "sha256:81c2e5865b8ac55e825b0358e496e1d9387c811e85bb40e71a3b29b288963612"}, + {file = "pyotp-2.9.0.tar.gz", hash = "sha256:346b6642e0dbdde3b4ff5a930b664ca82abfa116356ed48cc42c7d6590d36f63"}, +] + [[package]] name = "python-dotenv" version = "1.0.1" diff --git a/pyproject.toml b/pyproject.toml index 6fdfe27..72c4357 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "psycopg2>=2.9.9", "passlib>=1.7.4", "argon2-cffi>=23.1.0", + "pyotp>=2.9.0", ] requires-python = ">=3.12" readme = "README.md" diff --git a/src/auth/router.py b/src/auth/router.py index 5b488f9..a07f27a 100644 --- a/src/auth/router.py +++ b/src/auth/router.py @@ -1,6 +1,8 @@ from fastapi import APIRouter, Depends, status, HTTPException, Response, Request -from src.auth.schemas import Login_request +from src.auth.schemas import Login_request, Login_reply, verify_2FA + +from src.auth.services import login_user, verify_2FA_login, logout_user router = APIRouter(prefix="/auth") @@ -10,24 +12,33 @@ router = APIRouter(prefix="/auth") status_code=status.HTTP_202_ACCEPTED, summary="Login user", description="Login user with email and password", + response_model=Login_reply, ) -def login(login: Login_request, request: Request, response: Response): - if not login.email.__contains__("@"): +def login(login_request: Login_request, request: Request, response: Response): + if not login_request.email.__contains__("@"): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="No valid email address" ) - if login.password == "" or input.email == "": + if login_request.password == "" or input.email == "": raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email or password is empty" ) - return "/api/auth/login" + return login_user(login_request=login_request, db=request.state.db, request=request, response=response) -@router.post("/logout") -def logout(): - return "/api/auth/logout" +@router.post("/2FA", status_code=status.HTTP_202_ACCEPTED, summary="Verify 2FA endpoint", description="Verify user by 2FA authentication", response_model=Login_reply) +def verify_2FA_auth(code: verify_2FA, request: Request, response: Response): + if code.auth_code is None or code.auth_code == "": + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Bad input data") + + return verify_2FA_login(code=code, db=request.state.db, request=request, response=response) + + +@router.get("/logout", status_code=status.HTTP_200_OK, summary="logout user", description="Log out user", response_model=Login_reply) +def logout(request: Request, response: Response): + return logout_user(db=request.state.db, request=request, response=response) @router.post("/register") diff --git a/src/auth/schemas.py b/src/auth/schemas.py index c943932..0c47123 100644 --- a/src/auth/schemas.py +++ b/src/auth/schemas.py @@ -10,3 +10,7 @@ class Login_reply(BaseModel): logged_in: bool requested_2FA: bool type_2FA: str + + +class verify_2FA(BaseModel): + auth_code: str \ No newline at end of file diff --git a/src/auth/services.py b/src/auth/services.py index a9f3d36..b2030d5 100644 --- a/src/auth/services.py +++ b/src/auth/services.py @@ -1,12 +1,14 @@ -from src.auth.schemas import Login_request, Login_reply +from src.auth.schemas import Login_request, Login_reply, verify_2FA from src.user.models import User from src.auth.models import Auth_2fa_pending, Auth_token_valid from src.auth.pwd import password_hash, password_verify from src.auth.static import Types_2FA from src.base.jwt import jwt_encode, jwt_decode from src.base.random_string import random_string +from src.base.totp import totp_generate_secret, totp_verify from fastapi import Response, status, HTTPException, Request from sqlalchemy.orm import Session +from sqlalchemy import and_ import datetime, os @@ -14,60 +16,103 @@ def __generate_2FA_pending_token( user: User, db: Session, response: Response, code: str = "" ) -> bool: """Create 2FA_token cookie and save validate data to DB (2FA pending)""" - data = {} - data["sub"] = user.id - data["exp"] = ( - datetime.datetime.now() - + datetime.timedelta(minutes=int(os.getenv("LOGIN_2FA_PENDING_MINUTES"))) - ).timestamp() + try: + data = {} + data["sub"] = user.id + data["exp"] = ( + datetime.datetime.now() + + datetime.timedelta(minutes=int(os.getenv("LOGIN_2FA_PENDING_MINUTES"))) + ).timestamp() - jwt_token = jwt_encode(data=data) + jwt_token = jwt_encode(data=data) - response.set_cookie( - key="2FA_token_temp", - value=jwt_token, - httponly=True, - expires=(int(os.getenv("LOGIN_2FA_PENDING_MINUTES")) * 60), - ) - db.add( - Auth_2fa_pending( - user=user, - token=jwt_token, - code=code, - expire=datetime.datetime.now() - + datetime.timedelta(minutes=int(os.getenv("LOGIN_2FA_PENDING_MINUTES"))), + response.set_cookie( + key="2FA_token_temp", + value=jwt_token, + httponly=True, + expires=(int(os.getenv("LOGIN_2FA_PENDING_MINUTES")) * 60), ) - ) + + db.add( + Auth_2fa_pending( + user=user, + token=jwt_token, + code=code, + expire=datetime.datetime.now() + + datetime.timedelta(minutes=int(os.getenv("LOGIN_2FA_PENDING_MINUTES"))), + ) + ) + return True + except: + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error on creating cookie or DB connection") + +def __remove_2FA_pending_token(pending: Auth_2fa_pending, db: Session, response: Response) -> bool: + db.query(Auth_2fa_pending).filter(Auth_2fa_pending.id == pending.id).delete() + response.delete_cookie("2FA_token_temp") + return True def __generate_acces_token(user: User, db: Session, response: Response) -> bool: """Create acces_token cookie and save validate data to DB (login)""" - data = {} - data["sub"] = user.id - data["exp"] = ( - datetime.datetime.now() - + datetime.timedelta(minutes=int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES"))) - ).timestamp() - data["rand"] = random_string(8) - response.set_cookie( - key="acces_token", - value=jwt_encode(data=data), - httponly=True, - expires=(int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES")) * 60), - ) - db.add( - Auth_token_valid( - user=user, - random=data["rand"], - expire=datetime.datetime.now() - + datetime.timedelta(minutes=int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES"))), + try: + data = {} + data["sub"] = user.id + data["exp"] = ( + datetime.datetime.now() + + datetime.timedelta(minutes=int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES"))) + ).timestamp() + data["rand"] = random_string(8) + response.set_cookie( + key="acces_token", + value=jwt_encode(data=data), + httponly=True, + expires=(int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES")) * 60), ) - ) + db.add( + Auth_token_valid( + user=user, + random=data["rand"], + expire=datetime.datetime.now() + + datetime.timedelta(minutes=int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES"))), + ) + ) + return True + except: + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error on creating acces cookie or DB connection") -def login( +def __generate_refresh_token(user: User, db: Session, response: Response) -> bool: + """Create refresh_token cookie and save validate data to DB (login)""" + try: + data = {} + data["sub"] = user.id + data["exp"] = ( + datetime.datetime.now() + + datetime.timedelta(minutes=int(os.getenv("REFRESH_TOKEN_EXPIRE_MINUTES"))) + ).timestamp() + data["rand"] = random_string(8) + response.set_cookie( + key="refresh_token", + value=jwt_encode(data=data), + httponly=True, + expires=(int(os.getenv("REFRESH_TOKEN_EXPIRE_MINUTES")) * 60), + ) + db.add( + Auth_token_valid( + user=user, + random=data["rand"], + expire=datetime.datetime.now() + + datetime.timedelta(minutes=int(os.getenv("REFRESH_TOKEN_EXPIRE_MINUTES"))), + ) + ) + return True + except: + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error on creating refresh cookie or DB connection") + + +def login_user( form: Login_request, db: Session, response: Response, request: Request -) -> Response: +) -> Login_reply: user: User = ( db.query(User).filter(User.email == form.email).first() ) # search user by email @@ -91,7 +136,7 @@ def login( __generate_acces_token(user=user, db=db, response=response) return Login_reply( logged_in=True, requested_2FA=False, type_2FA=Types_2FA.ANY - ) + ) #log in without 2FA only if valid refresh_token # match types of 2FA and run specified tasks match user.type_2fa: @@ -105,3 +150,45 @@ def login( return Login_reply( logged_in=False, requested_2FA=False, type_2FA=Types_2FA.ANY ) # not logged in + + +def verify_2FA_login(code: verify_2FA, db: Session, request: Request, response: Response) -> Login_reply: + jwt_cookie = request.cookies.get("2FA_token_temp", None) + + if jwt_cookie is None: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authorized") + + jwt = jwt_decode(jwt_cookie) + + if jwt is None or datetime.datetime.fromtimestamp(jwt["exp"]) < datetime.datetime.now(): + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authorized") + + auth_pending = db.query(Auth_2fa_pending).filter(and_(Auth_2fa_pending.token == jwt_cookie, Auth_2fa_pending.user_id == jwt["sub"])).first() + + match auth_pending.user.type_2fa: + case Types_2FA.TOTP: + if totp_verify(auth_pending.user.totp_secret, code.auth_code): + __generate_acces_token(user=auth_pending.user, db=db, response=response) + __generate_refresh_token(user=auth_pending.user, db=db, response=response) + __remove_2FA_pending_token(pending=auth_pending, db=db, response=response) + return Login_reply(logged_in=True, requested_2FA=False, type_2FA=Types_2FA.ANY) + #here add another verify 2FA + case _: + if auth_pending.code == code.auth_code: + __generate_acces_token(user=auth_pending.user, db=db, response=response) + __generate_refresh_token(user=auth_pending.user, db=db, response=response) + __remove_2FA_pending_token(pending=auth_pending, db=db, response=response) + return Login_reply(logged_in=True, requested_2FA=False, type_2FA=Types_2FA.ANY) + + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Bad code and cookie combination") + + +def logout_user(db: Session, request: Request, response:Response) -> Login_reply: + acces_token_random = jwt_decode(request.cookies.get("acces_token"))["rand"] + refresh_token_random = jwt_decode(request.cookies.get("refresh_token"))["rand"] + user_id = jwt_decode(request.cookies.get("refresh_token"))["sub"] + response.delete_cookie("refresh_token") + response.delete_cookie("acces_token") + db.query(Auth_token_valid).filter(and_(Auth_token_valid.random == acces_token_random, Auth_token_valid.user_id == user_id)).delete() + db.query(Auth_token_valid).filter(and_(Auth_token_valid.random == refresh_token_random, Auth_token_valid.user_id == user_id)).delete() + return Login_reply(logged_in=False, requested_2FA=False, type_2FA=Types_2FA.ANY) diff --git a/src/base/totp.py b/src/base/totp.py new file mode 100644 index 0000000..8dd3e97 --- /dev/null +++ b/src/base/totp.py @@ -0,0 +1,16 @@ +import pyotp +from fastapi import HTTPException, status + + + +def totp_verify(totp_secret: str, totp_code: str) -> bool: + totp = pyotp.TOTP(totp_secret) + + if totp.verify(totp_code): + return True + else: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Bad OTP Code") + + +def totp_generate_secret(): + return pyotp.random_base32(length=64) \ No newline at end of file diff --git a/src/middleware/baseMiddleware.py b/src/middleware/baseMiddleware.py index 740d9d9..11c1010 100644 --- a/src/middleware/baseMiddleware.py +++ b/src/middleware/baseMiddleware.py @@ -46,4 +46,8 @@ class BaseMiddleware(BaseHTTPMiddleware): detail="Acces token not valid, please log-in", ) - return await call_next(request) + response = await call_next(request) + + request.state.db.commit() + + return response