diff --git a/src/auth/models.py b/src/auth/models.py index 0409581..9d78fa5 100644 --- a/src/auth/models.py +++ b/src/auth/models.py @@ -9,8 +9,9 @@ class Auth_token_valid(Base): id = Column(Integer, primary_key=True, index=True, autoincrement=True) user_id = Column(Integer, ForeignKey("users.id"), index=True) - random = Column(String) + random = Column(String, index=True, unique=True) expire = Column(DateTime) acces = Column(Boolean) user = relationship("User", back_populates="auth_token_valid") + diff --git a/src/auth/router.py b/src/auth/router.py index a07f27a..e52fdb4 100644 --- a/src/auth/router.py +++ b/src/auth/router.py @@ -2,8 +2,6 @@ from fastapi import APIRouter, Depends, status, HTTPException, Response, Request from src.auth.schemas import Login_request, Login_reply, verify_2FA -from src.auth.services import login_user, verify_2FA_login, logout_user - router = APIRouter(prefix="/auth") @@ -15,30 +13,17 @@ router = APIRouter(prefix="/auth") response_model=Login_reply, ) def login(login_request: Login_request, request: Request, response: Response): - if not login_request.email.__contains__("@"): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, detail="No valid email address" - ) - - if login_request.password == "" or input.email == "": - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, detail="Email or password is empty" - ) - - return login_user(login_request=login_request, db=request.state.db, request=request, response=response) + return "/api/auth/login" @router.post("/2FA", status_code=status.HTTP_202_ACCEPTED, summary="Verify 2FA endpoint", description="Verify user by 2FA authentication", response_model=Login_reply) def verify_2FA_auth(code: verify_2FA, request: Request, response: Response): - if code.auth_code is None or code.auth_code == "": - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Bad input data") - - return verify_2FA_login(code=code, db=request.state.db, request=request, response=response) + return "/api/auth/2FA" @router.get("/logout", status_code=status.HTTP_200_OK, summary="logout user", description="Log out user", response_model=Login_reply) def logout(request: Request, response: Response): - return logout_user(db=request.state.db, request=request, response=response) + return "/api/auth/logout" @router.post("/register") diff --git a/src/auth/services.py b/src/auth/services.py index b2030d5..e69de29 100644 --- a/src/auth/services.py +++ b/src/auth/services.py @@ -1,194 +0,0 @@ -from src.auth.schemas import Login_request, Login_reply, verify_2FA -from src.user.models import User -from src.auth.models import Auth_2fa_pending, Auth_token_valid -from src.auth.pwd import password_hash, password_verify -from src.auth.static import Types_2FA -from src.base.jwt import jwt_encode, jwt_decode -from src.base.random_string import random_string -from src.base.totp import totp_generate_secret, totp_verify -from fastapi import Response, status, HTTPException, Request -from sqlalchemy.orm import Session -from sqlalchemy import and_ -import datetime, os - - -def __generate_2FA_pending_token( - user: User, db: Session, response: Response, code: str = "" -) -> bool: - """Create 2FA_token cookie and save validate data to DB (2FA pending)""" - try: - data = {} - data["sub"] = user.id - data["exp"] = ( - datetime.datetime.now() - + datetime.timedelta(minutes=int(os.getenv("LOGIN_2FA_PENDING_MINUTES"))) - ).timestamp() - - jwt_token = jwt_encode(data=data) - - response.set_cookie( - key="2FA_token_temp", - value=jwt_token, - httponly=True, - expires=(int(os.getenv("LOGIN_2FA_PENDING_MINUTES")) * 60), - ) - - db.add( - Auth_2fa_pending( - user=user, - token=jwt_token, - code=code, - expire=datetime.datetime.now() - + datetime.timedelta(minutes=int(os.getenv("LOGIN_2FA_PENDING_MINUTES"))), - ) - ) - return True - except: - raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error on creating cookie or DB connection") - -def __remove_2FA_pending_token(pending: Auth_2fa_pending, db: Session, response: Response) -> bool: - db.query(Auth_2fa_pending).filter(Auth_2fa_pending.id == pending.id).delete() - response.delete_cookie("2FA_token_temp") - return True - - -def __generate_acces_token(user: User, db: Session, response: Response) -> bool: - """Create acces_token cookie and save validate data to DB (login)""" - try: - data = {} - data["sub"] = user.id - data["exp"] = ( - datetime.datetime.now() - + datetime.timedelta(minutes=int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES"))) - ).timestamp() - data["rand"] = random_string(8) - response.set_cookie( - key="acces_token", - value=jwt_encode(data=data), - httponly=True, - expires=(int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES")) * 60), - ) - db.add( - Auth_token_valid( - user=user, - random=data["rand"], - expire=datetime.datetime.now() - + datetime.timedelta(minutes=int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES"))), - ) - ) - return True - except: - raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error on creating acces cookie or DB connection") - - -def __generate_refresh_token(user: User, db: Session, response: Response) -> bool: - """Create refresh_token cookie and save validate data to DB (login)""" - try: - data = {} - data["sub"] = user.id - data["exp"] = ( - datetime.datetime.now() - + datetime.timedelta(minutes=int(os.getenv("REFRESH_TOKEN_EXPIRE_MINUTES"))) - ).timestamp() - data["rand"] = random_string(8) - response.set_cookie( - key="refresh_token", - value=jwt_encode(data=data), - httponly=True, - expires=(int(os.getenv("REFRESH_TOKEN_EXPIRE_MINUTES")) * 60), - ) - db.add( - Auth_token_valid( - user=user, - random=data["rand"], - expire=datetime.datetime.now() - + datetime.timedelta(minutes=int(os.getenv("REFRESH_TOKEN_EXPIRE_MINUTES"))), - ) - ) - return True - except: - raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error on creating refresh cookie or DB connection") - - -def login_user( - form: Login_request, db: Session, response: Response, request: Request -) -> Login_reply: - user: User = ( - db.query(User).filter(User.email == form.email).first() - ) # search user by email - - if user == None or not password_verify( - password_plain=form.password, password_hashed=user.password - ): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, detail="Bad email or password" - ) # if user not exist or received bad password return 403 error - - refresh_token = request.cookies.get("refresh_token", None) - if refresh_token: - refresh_token_decoded = jwt_decode(refresh_token) - if ( - refresh_token_decoded != None - and refresh_token_decoded["sub"] == user.id - and datetime.datetime.fromtimestamp(refresh_token_decoded["exp"]) - > datetime.datetime.now() - ): - __generate_acces_token(user=user, db=db, response=response) - return Login_reply( - logged_in=True, requested_2FA=False, type_2FA=Types_2FA.ANY - ) #log in without 2FA only if valid refresh_token - - # match types of 2FA and run specified tasks - match user.type_2fa: - case Types_2FA.TOTP: - __generate_2FA_pending_token(user=user, db=db, response=response) - return Login_reply( - logged_in=False, requested_2FA=True, type_2FA=Types_2FA.TOTP - ) - # here add more supported 2FA methods - - return Login_reply( - logged_in=False, requested_2FA=False, type_2FA=Types_2FA.ANY - ) # not logged in - - -def verify_2FA_login(code: verify_2FA, db: Session, request: Request, response: Response) -> Login_reply: - jwt_cookie = request.cookies.get("2FA_token_temp", None) - - if jwt_cookie is None: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authorized") - - jwt = jwt_decode(jwt_cookie) - - if jwt is None or datetime.datetime.fromtimestamp(jwt["exp"]) < datetime.datetime.now(): - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authorized") - - auth_pending = db.query(Auth_2fa_pending).filter(and_(Auth_2fa_pending.token == jwt_cookie, Auth_2fa_pending.user_id == jwt["sub"])).first() - - match auth_pending.user.type_2fa: - case Types_2FA.TOTP: - if totp_verify(auth_pending.user.totp_secret, code.auth_code): - __generate_acces_token(user=auth_pending.user, db=db, response=response) - __generate_refresh_token(user=auth_pending.user, db=db, response=response) - __remove_2FA_pending_token(pending=auth_pending, db=db, response=response) - return Login_reply(logged_in=True, requested_2FA=False, type_2FA=Types_2FA.ANY) - #here add another verify 2FA - case _: - if auth_pending.code == code.auth_code: - __generate_acces_token(user=auth_pending.user, db=db, response=response) - __generate_refresh_token(user=auth_pending.user, db=db, response=response) - __remove_2FA_pending_token(pending=auth_pending, db=db, response=response) - return Login_reply(logged_in=True, requested_2FA=False, type_2FA=Types_2FA.ANY) - - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Bad code and cookie combination") - - -def logout_user(db: Session, request: Request, response:Response) -> Login_reply: - acces_token_random = jwt_decode(request.cookies.get("acces_token"))["rand"] - refresh_token_random = jwt_decode(request.cookies.get("refresh_token"))["rand"] - user_id = jwt_decode(request.cookies.get("refresh_token"))["sub"] - response.delete_cookie("refresh_token") - response.delete_cookie("acces_token") - db.query(Auth_token_valid).filter(and_(Auth_token_valid.random == acces_token_random, Auth_token_valid.user_id == user_id)).delete() - db.query(Auth_token_valid).filter(and_(Auth_token_valid.random == refresh_token_random, Auth_token_valid.user_id == user_id)).delete() - return Login_reply(logged_in=False, requested_2FA=False, type_2FA=Types_2FA.ANY) diff --git a/src/auth/static.py b/src/auth/static.py index e797f39..3fc2b3f 100644 --- a/src/auth/static.py +++ b/src/auth/static.py @@ -2,6 +2,6 @@ from enum import Enum class Types_2FA(Enum): - ANY = "ANY" + NONE = "NONE" TOTP = "TOTP" EMAIL = "EMAIL" diff --git a/src/main.py b/src/main.py index d7cdae8..a49e1c0 100644 --- a/src/main.py +++ b/src/main.py @@ -3,7 +3,7 @@ from src.middleware.baseMiddleware import BaseMiddleware from src.base.database import engine, Base from src.user.models import User -from src.auth.models import Auth_2fa_pending, Auth_token_valid +from src.auth.models import Auth_token_valid from src.auth.router import router as authRouter diff --git a/src/middleware/baseMiddleware.py b/src/middleware/baseMiddleware.py index 11c1010..1f01235 100644 --- a/src/middleware/baseMiddleware.py +++ b/src/middleware/baseMiddleware.py @@ -17,7 +17,7 @@ class BaseMiddleware(BaseHTTPMiddleware): detail="Cannot connect to database", ) - acces_cookie = request.cookies.get("acces_token", None) + """acces_cookie = request.cookies.get("acces_token", None) if acces_cookie != None: acces_cookie_decoded = jwt_decode(acces_cookie) @@ -44,7 +44,7 @@ class BaseMiddleware(BaseHTTPMiddleware): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Acces token not valid, please log-in", - ) + )""" response = await call_next(request)