rewrite clearing 2
parent
b68467d434
commit
f53ae6bf26
|
@ -9,8 +9,9 @@ class Auth_token_valid(Base):
|
|||
|
||||
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
|
||||
user_id = Column(Integer, ForeignKey("users.id"), index=True)
|
||||
random = Column(String)
|
||||
random = Column(String, index=True, unique=True)
|
||||
expire = Column(DateTime)
|
||||
acces = Column(Boolean)
|
||||
|
||||
user = relationship("User", back_populates="auth_token_valid")
|
||||
|
||||
|
|
|
@ -2,8 +2,6 @@ from fastapi import APIRouter, Depends, status, HTTPException, Response, Request
|
|||
|
||||
from src.auth.schemas import Login_request, Login_reply, verify_2FA
|
||||
|
||||
from src.auth.services import login_user, verify_2FA_login, logout_user
|
||||
|
||||
router = APIRouter(prefix="/auth")
|
||||
|
||||
|
||||
|
@ -15,30 +13,17 @@ router = APIRouter(prefix="/auth")
|
|||
response_model=Login_reply,
|
||||
)
|
||||
def login(login_request: Login_request, request: Request, response: Response):
|
||||
if not login_request.email.__contains__("@"):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST, detail="No valid email address"
|
||||
)
|
||||
|
||||
if login_request.password == "" or input.email == "":
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST, detail="Email or password is empty"
|
||||
)
|
||||
|
||||
return login_user(login_request=login_request, db=request.state.db, request=request, response=response)
|
||||
return "/api/auth/login"
|
||||
|
||||
|
||||
@router.post("/2FA", status_code=status.HTTP_202_ACCEPTED, summary="Verify 2FA endpoint", description="Verify user by 2FA authentication", response_model=Login_reply)
|
||||
def verify_2FA_auth(code: verify_2FA, request: Request, response: Response):
|
||||
if code.auth_code is None or code.auth_code == "":
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Bad input data")
|
||||
|
||||
return verify_2FA_login(code=code, db=request.state.db, request=request, response=response)
|
||||
return "/api/auth/2FA"
|
||||
|
||||
|
||||
@router.get("/logout", status_code=status.HTTP_200_OK, summary="logout user", description="Log out user", response_model=Login_reply)
|
||||
def logout(request: Request, response: Response):
|
||||
return logout_user(db=request.state.db, request=request, response=response)
|
||||
return "/api/auth/logout"
|
||||
|
||||
|
||||
@router.post("/register")
|
||||
|
|
|
@ -1,194 +0,0 @@
|
|||
from src.auth.schemas import Login_request, Login_reply, verify_2FA
|
||||
from src.user.models import User
|
||||
from src.auth.models import Auth_2fa_pending, Auth_token_valid
|
||||
from src.auth.pwd import password_hash, password_verify
|
||||
from src.auth.static import Types_2FA
|
||||
from src.base.jwt import jwt_encode, jwt_decode
|
||||
from src.base.random_string import random_string
|
||||
from src.base.totp import totp_generate_secret, totp_verify
|
||||
from fastapi import Response, status, HTTPException, Request
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import and_
|
||||
import datetime, os
|
||||
|
||||
|
||||
def __generate_2FA_pending_token(
|
||||
user: User, db: Session, response: Response, code: str = ""
|
||||
) -> bool:
|
||||
"""Create 2FA_token cookie and save validate data to DB (2FA pending)"""
|
||||
try:
|
||||
data = {}
|
||||
data["sub"] = user.id
|
||||
data["exp"] = (
|
||||
datetime.datetime.now()
|
||||
+ datetime.timedelta(minutes=int(os.getenv("LOGIN_2FA_PENDING_MINUTES")))
|
||||
).timestamp()
|
||||
|
||||
jwt_token = jwt_encode(data=data)
|
||||
|
||||
response.set_cookie(
|
||||
key="2FA_token_temp",
|
||||
value=jwt_token,
|
||||
httponly=True,
|
||||
expires=(int(os.getenv("LOGIN_2FA_PENDING_MINUTES")) * 60),
|
||||
)
|
||||
|
||||
db.add(
|
||||
Auth_2fa_pending(
|
||||
user=user,
|
||||
token=jwt_token,
|
||||
code=code,
|
||||
expire=datetime.datetime.now()
|
||||
+ datetime.timedelta(minutes=int(os.getenv("LOGIN_2FA_PENDING_MINUTES"))),
|
||||
)
|
||||
)
|
||||
return True
|
||||
except:
|
||||
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error on creating cookie or DB connection")
|
||||
|
||||
def __remove_2FA_pending_token(pending: Auth_2fa_pending, db: Session, response: Response) -> bool:
|
||||
db.query(Auth_2fa_pending).filter(Auth_2fa_pending.id == pending.id).delete()
|
||||
response.delete_cookie("2FA_token_temp")
|
||||
return True
|
||||
|
||||
|
||||
def __generate_acces_token(user: User, db: Session, response: Response) -> bool:
|
||||
"""Create acces_token cookie and save validate data to DB (login)"""
|
||||
try:
|
||||
data = {}
|
||||
data["sub"] = user.id
|
||||
data["exp"] = (
|
||||
datetime.datetime.now()
|
||||
+ datetime.timedelta(minutes=int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES")))
|
||||
).timestamp()
|
||||
data["rand"] = random_string(8)
|
||||
response.set_cookie(
|
||||
key="acces_token",
|
||||
value=jwt_encode(data=data),
|
||||
httponly=True,
|
||||
expires=(int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES")) * 60),
|
||||
)
|
||||
db.add(
|
||||
Auth_token_valid(
|
||||
user=user,
|
||||
random=data["rand"],
|
||||
expire=datetime.datetime.now()
|
||||
+ datetime.timedelta(minutes=int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES"))),
|
||||
)
|
||||
)
|
||||
return True
|
||||
except:
|
||||
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error on creating acces cookie or DB connection")
|
||||
|
||||
|
||||
def __generate_refresh_token(user: User, db: Session, response: Response) -> bool:
|
||||
"""Create refresh_token cookie and save validate data to DB (login)"""
|
||||
try:
|
||||
data = {}
|
||||
data["sub"] = user.id
|
||||
data["exp"] = (
|
||||
datetime.datetime.now()
|
||||
+ datetime.timedelta(minutes=int(os.getenv("REFRESH_TOKEN_EXPIRE_MINUTES")))
|
||||
).timestamp()
|
||||
data["rand"] = random_string(8)
|
||||
response.set_cookie(
|
||||
key="refresh_token",
|
||||
value=jwt_encode(data=data),
|
||||
httponly=True,
|
||||
expires=(int(os.getenv("REFRESH_TOKEN_EXPIRE_MINUTES")) * 60),
|
||||
)
|
||||
db.add(
|
||||
Auth_token_valid(
|
||||
user=user,
|
||||
random=data["rand"],
|
||||
expire=datetime.datetime.now()
|
||||
+ datetime.timedelta(minutes=int(os.getenv("REFRESH_TOKEN_EXPIRE_MINUTES"))),
|
||||
)
|
||||
)
|
||||
return True
|
||||
except:
|
||||
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error on creating refresh cookie or DB connection")
|
||||
|
||||
|
||||
def login_user(
|
||||
form: Login_request, db: Session, response: Response, request: Request
|
||||
) -> Login_reply:
|
||||
user: User = (
|
||||
db.query(User).filter(User.email == form.email).first()
|
||||
) # search user by email
|
||||
|
||||
if user == None or not password_verify(
|
||||
password_plain=form.password, password_hashed=user.password
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN, detail="Bad email or password"
|
||||
) # if user not exist or received bad password return 403 error
|
||||
|
||||
refresh_token = request.cookies.get("refresh_token", None)
|
||||
if refresh_token:
|
||||
refresh_token_decoded = jwt_decode(refresh_token)
|
||||
if (
|
||||
refresh_token_decoded != None
|
||||
and refresh_token_decoded["sub"] == user.id
|
||||
and datetime.datetime.fromtimestamp(refresh_token_decoded["exp"])
|
||||
> datetime.datetime.now()
|
||||
):
|
||||
__generate_acces_token(user=user, db=db, response=response)
|
||||
return Login_reply(
|
||||
logged_in=True, requested_2FA=False, type_2FA=Types_2FA.ANY
|
||||
) #log in without 2FA only if valid refresh_token
|
||||
|
||||
# match types of 2FA and run specified tasks
|
||||
match user.type_2fa:
|
||||
case Types_2FA.TOTP:
|
||||
__generate_2FA_pending_token(user=user, db=db, response=response)
|
||||
return Login_reply(
|
||||
logged_in=False, requested_2FA=True, type_2FA=Types_2FA.TOTP
|
||||
)
|
||||
# here add more supported 2FA methods
|
||||
|
||||
return Login_reply(
|
||||
logged_in=False, requested_2FA=False, type_2FA=Types_2FA.ANY
|
||||
) # not logged in
|
||||
|
||||
|
||||
def verify_2FA_login(code: verify_2FA, db: Session, request: Request, response: Response) -> Login_reply:
|
||||
jwt_cookie = request.cookies.get("2FA_token_temp", None)
|
||||
|
||||
if jwt_cookie is None:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authorized")
|
||||
|
||||
jwt = jwt_decode(jwt_cookie)
|
||||
|
||||
if jwt is None or datetime.datetime.fromtimestamp(jwt["exp"]) < datetime.datetime.now():
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authorized")
|
||||
|
||||
auth_pending = db.query(Auth_2fa_pending).filter(and_(Auth_2fa_pending.token == jwt_cookie, Auth_2fa_pending.user_id == jwt["sub"])).first()
|
||||
|
||||
match auth_pending.user.type_2fa:
|
||||
case Types_2FA.TOTP:
|
||||
if totp_verify(auth_pending.user.totp_secret, code.auth_code):
|
||||
__generate_acces_token(user=auth_pending.user, db=db, response=response)
|
||||
__generate_refresh_token(user=auth_pending.user, db=db, response=response)
|
||||
__remove_2FA_pending_token(pending=auth_pending, db=db, response=response)
|
||||
return Login_reply(logged_in=True, requested_2FA=False, type_2FA=Types_2FA.ANY)
|
||||
#here add another verify 2FA
|
||||
case _:
|
||||
if auth_pending.code == code.auth_code:
|
||||
__generate_acces_token(user=auth_pending.user, db=db, response=response)
|
||||
__generate_refresh_token(user=auth_pending.user, db=db, response=response)
|
||||
__remove_2FA_pending_token(pending=auth_pending, db=db, response=response)
|
||||
return Login_reply(logged_in=True, requested_2FA=False, type_2FA=Types_2FA.ANY)
|
||||
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Bad code and cookie combination")
|
||||
|
||||
|
||||
def logout_user(db: Session, request: Request, response:Response) -> Login_reply:
|
||||
acces_token_random = jwt_decode(request.cookies.get("acces_token"))["rand"]
|
||||
refresh_token_random = jwt_decode(request.cookies.get("refresh_token"))["rand"]
|
||||
user_id = jwt_decode(request.cookies.get("refresh_token"))["sub"]
|
||||
response.delete_cookie("refresh_token")
|
||||
response.delete_cookie("acces_token")
|
||||
db.query(Auth_token_valid).filter(and_(Auth_token_valid.random == acces_token_random, Auth_token_valid.user_id == user_id)).delete()
|
||||
db.query(Auth_token_valid).filter(and_(Auth_token_valid.random == refresh_token_random, Auth_token_valid.user_id == user_id)).delete()
|
||||
return Login_reply(logged_in=False, requested_2FA=False, type_2FA=Types_2FA.ANY)
|
|
@ -2,6 +2,6 @@ from enum import Enum
|
|||
|
||||
|
||||
class Types_2FA(Enum):
|
||||
ANY = "ANY"
|
||||
NONE = "NONE"
|
||||
TOTP = "TOTP"
|
||||
EMAIL = "EMAIL"
|
||||
|
|
|
@ -3,7 +3,7 @@ from src.middleware.baseMiddleware import BaseMiddleware
|
|||
|
||||
from src.base.database import engine, Base
|
||||
from src.user.models import User
|
||||
from src.auth.models import Auth_2fa_pending, Auth_token_valid
|
||||
from src.auth.models import Auth_token_valid
|
||||
|
||||
from src.auth.router import router as authRouter
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@ class BaseMiddleware(BaseHTTPMiddleware):
|
|||
detail="Cannot connect to database",
|
||||
)
|
||||
|
||||
acces_cookie = request.cookies.get("acces_token", None)
|
||||
"""acces_cookie = request.cookies.get("acces_token", None)
|
||||
if acces_cookie != None:
|
||||
acces_cookie_decoded = jwt_decode(acces_cookie)
|
||||
|
||||
|
@ -44,7 +44,7 @@ class BaseMiddleware(BaseHTTPMiddleware):
|
|||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Acces token not valid, please log-in",
|
||||
)
|
||||
)"""
|
||||
|
||||
response = await call_next(request)
|
||||
|
||||
|
|
Loading…
Reference in New Issue