Added verification od 2FA code

main
Jan Beníček 2024-11-04 17:21:17 +01:00
parent b69c123949
commit 70ae68b951
4 changed files with 73 additions and 19 deletions

View File

@ -1,4 +1,4 @@
from sqlalchemy import Column, String, Integer, ForeignKey, Boolean, DateTime from sqlalchemy import Column, String, Integer, ForeignKey, Boolean, DateTime, Float
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from src.base.database import Base from src.base.database import Base
@ -9,8 +9,19 @@ class Auth_token_valid(Base):
user_id = Column(Integer, ForeignKey("users.id"), index=True) user_id = Column(Integer, ForeignKey("users.id"), index=True)
jti = Column(String, index=True, unique=True) jti = Column(String, index=True, unique=True)
expire = Column(Integer) expire = Column(Float)
access = Column(Boolean) access = Column(Boolean)
user = relationship("User", back_populates="auth_token_valid") user = relationship("User", back_populates="auth_token_valid")
class Auth_2fa_code(Base):
__tablename__ = "auth_2fa_code"
user_id = Column(Integer, ForeignKey("users.id"), index=True)
otp_code = Column(String)
jti = Column(String, index=True)
expire = Column(Float)
user = relationship("User", back_populates="auth_2fa_pendings")

View File

@ -1,7 +1,7 @@
from fastapi import APIRouter, Depends, status, HTTPException, Response, Request from fastapi import APIRouter, Depends, status, HTTPException, Response, Request
from src.auth.schemas import Login_request, Login_reply, verify_2FA from src.auth.schemas import Login_request, Login_reply, verify_2FA
from src.auth.services import login_user, request_2fa from src.auth.services import login_user, request_2fa, verify_2fa
router = APIRouter(prefix="/auth") router = APIRouter(prefix="/auth")
@ -18,13 +18,16 @@ def login(login_request: Login_request, request: Request, response: Response):
@router.get("/2fa", status_code=status.HTTP_202_ACCEPTED, summary="Verify if required login", description="Verify if refresh token valid and required full login or 2FA only", response_model=Login_reply) @router.get("/2fa", status_code=status.HTTP_202_ACCEPTED, summary="Verify if required login", description="Verify if refresh token valid and required full login or 2FA only", response_model=Login_reply)
def verify_2FA(request: Request, response: Response): def verify_2FA(request: Request, response: Response):
return request_2fa(db=request.state.db,request=request) # Run verification of refresh_token return request_2fa(db=request.state.db,request=request) # Verification of refresh_token
@router.post("/2fa", status_code=status.HTTP_202_ACCEPTED, summary="Verify 2FA endpoint", description="Verify user by 2FA authentication", response_model=Login_reply) @router.post("/2fa", status_code=status.HTTP_202_ACCEPTED, summary="Verify 2FA endpoint", description="Verify user by 2FA authentication", response_model=Login_reply)
def verify_2FA_code(code: verify_2FA, request: Request, response: Response): def verify_2FA_code(code: verify_2FA, request: Request, response: Response):
return "/api/auth/2FA" if code == None or code.auth_code == "":
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Error input data")
return verify_2fa(otp_code=code.auth_code, db=request.statue.db, request=request, response=response)

View File

@ -1,6 +1,6 @@
from src.auth.schemas import Login_request, Login_reply, verify_2FA from src.auth.schemas import Login_request, Login_reply, verify_2FA
from src.user.models import User from src.user.models import User
from src.auth.models import Auth_token_valid from src.auth.models import Auth_token_valid, Auth_2fa_code
from src.auth.pwd import password_hash, password_verify from src.auth.pwd import password_hash, password_verify
from src.auth.static import Types_2FA from src.auth.static import Types_2FA
from src.base.jwt import jwt_encode, jwt_decode, jwt_verify_by_jti from src.base.jwt import jwt_encode, jwt_decode, jwt_verify_by_jti
@ -18,9 +18,22 @@ def _return_2fa_request(db: Session, user: User) -> Login_reply:
case Types_2FA.TOTP: case Types_2FA.TOTP:
return Login_reply(logged_in=True, requested_2FA=True, type_2FA=Types_2FA.TOTP) return Login_reply(logged_in=True, requested_2FA=True, type_2FA=Types_2FA.TOTP)
case _: case _:
return Login_reply(logged_in=True, requested_2FA=False, type_2FA=Types_2FA.NONE) return Login_reply(logged_in=True, requested_2FA=False, type_2FA=Types_2FA.NONE) # Temporary
return Login_reply(logged_in=True, requested_2FA=False, type_2FA=Types_2FA.NONE) return Login_reply(logged_in=False, requested_2FA=False, type_2FA=Types_2FA.NONE)
def _set_acces_cookie(db: Session, user: User, response: Response):
payload = {
"user_id": user.id,
"username": user.username,
"exp": (datetime.datetime.utcnow() + datetime.timedelta(minutes=float(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES")))).timestamp(),
"jti": random_string(length=16),
"access": True
} # generate JWT payload data
db.add(Auth_token_valid(user_id=user.id, jti=payload.get("jti"), expire=payload.get("exp"), access=True)) # save verify JWT token data to DB (if data removed from DB, session is invalidated)
response.set_cookie("acces_token", value=jwt_encode(payload), max_age=(int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES"))*60), httponly=True, domain=os.getenv("SSO_BASE_DOMAIN")) # set JWT token to httponly cookie
@ -33,35 +46,61 @@ def login_user(login_data: Login_request, db: Session, response: Response) -> Lo
payload = { payload = {
"user_id": user.id, "user_id": user.id,
"username": user.username, "username": user.username,
"exp": int((datetime.datetime.utcnow() + datetime.timedelta(minutes=float(os.getenv("REFRESH_TOKEN_EXPIRE_MINUTES")))).timestamp()), "exp": (datetime.datetime.utcnow() + datetime.timedelta(minutes=float(os.getenv("REFRESH_TOKEN_EXPIRE_MINUTES")))).timestamp(),
"jti": random_string(length=16), "jti": random_string(length=16),
"access": False "access": False
} # generate JWT payload data } # generate JWT payload data
db.add(Auth_token_valid(user_id=user.id, jti=payload.get("jti"), expire=payload.get("exp"), access=False)) # save verify JWT token data to DB (if data removed from DB, session is invalidated) db.add(Auth_token_valid(user_id=user.id, jti=payload.get("jti"), expire=payload.get("exp"), access=False)) # save verify JWT token data to DB (if data removed from DB, session is invalidated)
response.set_cookie("refresh_token", value=jwt_encode(payload), max_age=(int(os.getenv("REFRESH_TOKEN_EXPIRE_MINUTES"))*60), httponly=True, path="/api/auth/2fa") # set JWT token to httponly cookie response.set_cookie("refresh_token", value=jwt_encode(payload), max_age=(int(os.getenv("REFRESH_TOKEN_EXPIRE_MINUTES"))*60), httponly=True, path="/api/auth/2fa") # set JWT token to httponly cookie
return _return_2fa_request(db=db, user=user) return _return_2fa_request(db=db, user=user) # return detail of 2FA
def request_2fa(db: Session, request: Request) -> Login_reply: def request_2fa(db: Session, request: Request) -> Login_reply:
try: try:
refresh_token: dict = jwt_decode(token=request.cookies.get("refresh_token", None)) refresh_token: dict = jwt_decode(token=request.cookies.get("refresh_token", None)) # Load refresh_token cookie
refresh_token_db = db.query(Auth_token_valid).filter(Auth_token_valid.jti == refresh_token.get("jti"), Auth_token_valid.user_id == refresh_token.get("user_id")).first() refresh_token_db = db.query(Auth_token_valid).filter(Auth_token_valid.jti == refresh_token.get("jti"), Auth_token_valid.user_id == refresh_token.get("user_id")).first() # Load refresh_token validation data from DB
if refresh_token_db.expire < datetime.datetime.utcnow().timestamp():
db.delete(refresh_token_db)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized") # If token expired remove them from DB and return unauthorized
if refresh_token_db.access != refresh_token.get("access"): if refresh_token_db.access != refresh_token.get("access"):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized") # If token not be a refresh_token return unauthorized
return _return_2fa_request(db=db, user=refresh_token_db.user) # return detail of 2FA
return _return_2fa_request(db=db, user=refresh_token_db.user)
except: except:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Unauthorized") raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Unauthorized") # If error return unauthorized
def verify_2fa(otp_code: str, db: Session, request: Request, response: Response) -> Login_reply:
try:
refresh_token: dict = jwt_decode(token=request.cookies.get("refresh_token", None)) # Load refresh_token cookie
refresh_token_db = db.query(Auth_token_valid).filter(Auth_token_valid.jti == refresh_token.get("jti"), Auth_token_valid.user_id == refresh_token.get("user_id")).first() # Load refresh_token validation data from DB
if refresh_token_db.expire < datetime.datetime.utcnow().timestamp():
db.delete(refresh_token_db)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized") # If token expired remove them from DB and return unauthorized
if refresh_token_db.access != refresh_token.get("access"):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized") # If token not be a refresh_token return unauthorized
match refresh_token_db.user.type_2fa:
case Types_2FA.TOTP:
if totp_verify(totp_secret=refresh_token_db.user.totp_secret, totp_code=otp_code):
_set_acces_cookie(db=db, user=refresh_token_db.user, response=response)
return Login_reply(logged_in=True, requested_2FA=False, type_2FA=Types_2FA.NONE)
case _:
otp_db = db.query(Auth_2fa_code).filter(Auth_2fa_code.otp_code == otp_code, Auth_2fa_code.jti == refresh_token_db.jti, Auth_2fa_code.user_id == refresh_token_db.user_id).first()
if otp_db != None and otp_db.expire > datetime.datetime.utcnow().timestamp():
_set_acces_cookie(db=db, user=refresh_token_db.user, response=response)
return Login_reply(logged_in=True, requested_2FA=False, type_2FA=Types_2FA.NONE)
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="2FA code no match")
except:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Unauthorized") # If error return unauthorized

View File

@ -20,3 +20,4 @@ class User(Base):
totp_secret = Column(String) totp_secret = Column(String)
auth_token_valid = relationship("Auth_token_valid", back_populates="user") auth_token_valid = relationship("Auth_token_valid", back_populates="user")
auth_2fa_pendings = relationship("Auth_2fa_code", back_populates="user")