2FA and Logout added
parent
6b74fc5f9f
commit
f7a128e150
13
pdm.lock
13
pdm.lock
|
@ -5,7 +5,7 @@
|
||||||
groups = ["default"]
|
groups = ["default"]
|
||||||
strategy = ["cross_platform", "inherit_metadata"]
|
strategy = ["cross_platform", "inherit_metadata"]
|
||||||
lock_version = "4.4.1"
|
lock_version = "4.4.1"
|
||||||
content_hash = "sha256:b57525e7baee5eff69181af4f35e3137f3f5fea752042dc53c35886d639f35d6"
|
content_hash = "sha256:44570334cf53d86490f0cf7ecef005ebd02a7d89e42b9b865d1fc882b5b113b3"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "annotated-types"
|
name = "annotated-types"
|
||||||
|
@ -353,6 +353,17 @@ files = [
|
||||||
{file = "pyjwt-2.9.0.tar.gz", hash = "sha256:7e1e5b56cc735432a7369cbfa0efe50fa113ebecdc04ae6922deba8b84582d0c"},
|
{file = "pyjwt-2.9.0.tar.gz", hash = "sha256:7e1e5b56cc735432a7369cbfa0efe50fa113ebecdc04ae6922deba8b84582d0c"},
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pyotp"
|
||||||
|
version = "2.9.0"
|
||||||
|
requires_python = ">=3.7"
|
||||||
|
summary = "Python One Time Password Library"
|
||||||
|
groups = ["default"]
|
||||||
|
files = [
|
||||||
|
{file = "pyotp-2.9.0-py3-none-any.whl", hash = "sha256:81c2e5865b8ac55e825b0358e496e1d9387c811e85bb40e71a3b29b288963612"},
|
||||||
|
{file = "pyotp-2.9.0.tar.gz", hash = "sha256:346b6642e0dbdde3b4ff5a930b664ca82abfa116356ed48cc42c7d6590d36f63"},
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "python-dotenv"
|
name = "python-dotenv"
|
||||||
version = "1.0.1"
|
version = "1.0.1"
|
||||||
|
|
|
@ -16,6 +16,7 @@ dependencies = [
|
||||||
"psycopg2>=2.9.9",
|
"psycopg2>=2.9.9",
|
||||||
"passlib>=1.7.4",
|
"passlib>=1.7.4",
|
||||||
"argon2-cffi>=23.1.0",
|
"argon2-cffi>=23.1.0",
|
||||||
|
"pyotp>=2.9.0",
|
||||||
]
|
]
|
||||||
requires-python = ">=3.12"
|
requires-python = ">=3.12"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
from fastapi import APIRouter, Depends, status, HTTPException, Response, Request
|
from fastapi import APIRouter, Depends, status, HTTPException, Response, Request
|
||||||
|
|
||||||
from src.auth.schemas import Login_request
|
from src.auth.schemas import Login_request, Login_reply, verify_2FA
|
||||||
|
|
||||||
|
from src.auth.services import login_user, verify_2FA_login, logout_user
|
||||||
|
|
||||||
router = APIRouter(prefix="/auth")
|
router = APIRouter(prefix="/auth")
|
||||||
|
|
||||||
|
@ -10,24 +12,33 @@ router = APIRouter(prefix="/auth")
|
||||||
status_code=status.HTTP_202_ACCEPTED,
|
status_code=status.HTTP_202_ACCEPTED,
|
||||||
summary="Login user",
|
summary="Login user",
|
||||||
description="Login user with email and password",
|
description="Login user with email and password",
|
||||||
|
response_model=Login_reply,
|
||||||
)
|
)
|
||||||
def login(login: Login_request, request: Request, response: Response):
|
def login(login_request: Login_request, request: Request, response: Response):
|
||||||
if not login.email.__contains__("@"):
|
if not login_request.email.__contains__("@"):
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_400_BAD_REQUEST, detail="No valid email address"
|
status_code=status.HTTP_400_BAD_REQUEST, detail="No valid email address"
|
||||||
)
|
)
|
||||||
|
|
||||||
if login.password == "" or input.email == "":
|
if login_request.password == "" or input.email == "":
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_400_BAD_REQUEST, detail="Email or password is empty"
|
status_code=status.HTTP_400_BAD_REQUEST, detail="Email or password is empty"
|
||||||
)
|
)
|
||||||
|
|
||||||
return "/api/auth/login"
|
return login_user(login_request=login_request, db=request.state.db, request=request, response=response)
|
||||||
|
|
||||||
|
|
||||||
@router.post("/logout")
|
@router.post("/2FA", status_code=status.HTTP_202_ACCEPTED, summary="Verify 2FA endpoint", description="Verify user by 2FA authentication", response_model=Login_reply)
|
||||||
def logout():
|
def verify_2FA_auth(code: verify_2FA, request: Request, response: Response):
|
||||||
return "/api/auth/logout"
|
if code.auth_code is None or code.auth_code == "":
|
||||||
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Bad input data")
|
||||||
|
|
||||||
|
return verify_2FA_login(code=code, db=request.state.db, request=request, response=response)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/logout", status_code=status.HTTP_200_OK, summary="logout user", description="Log out user", response_model=Login_reply)
|
||||||
|
def logout(request: Request, response: Response):
|
||||||
|
return logout_user(db=request.state.db, request=request, response=response)
|
||||||
|
|
||||||
|
|
||||||
@router.post("/register")
|
@router.post("/register")
|
||||||
|
|
|
@ -10,3 +10,7 @@ class Login_reply(BaseModel):
|
||||||
logged_in: bool
|
logged_in: bool
|
||||||
requested_2FA: bool
|
requested_2FA: bool
|
||||||
type_2FA: str
|
type_2FA: str
|
||||||
|
|
||||||
|
|
||||||
|
class verify_2FA(BaseModel):
|
||||||
|
auth_code: str
|
|
@ -1,12 +1,14 @@
|
||||||
from src.auth.schemas import Login_request, Login_reply
|
from src.auth.schemas import Login_request, Login_reply, verify_2FA
|
||||||
from src.user.models import User
|
from src.user.models import User
|
||||||
from src.auth.models import Auth_2fa_pending, Auth_token_valid
|
from src.auth.models import Auth_2fa_pending, Auth_token_valid
|
||||||
from src.auth.pwd import password_hash, password_verify
|
from src.auth.pwd import password_hash, password_verify
|
||||||
from src.auth.static import Types_2FA
|
from src.auth.static import Types_2FA
|
||||||
from src.base.jwt import jwt_encode, jwt_decode
|
from src.base.jwt import jwt_encode, jwt_decode
|
||||||
from src.base.random_string import random_string
|
from src.base.random_string import random_string
|
||||||
|
from src.base.totp import totp_generate_secret, totp_verify
|
||||||
from fastapi import Response, status, HTTPException, Request
|
from fastapi import Response, status, HTTPException, Request
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
from sqlalchemy import and_
|
||||||
import datetime, os
|
import datetime, os
|
||||||
|
|
||||||
|
|
||||||
|
@ -14,60 +16,103 @@ def __generate_2FA_pending_token(
|
||||||
user: User, db: Session, response: Response, code: str = ""
|
user: User, db: Session, response: Response, code: str = ""
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""Create 2FA_token cookie and save validate data to DB (2FA pending)"""
|
"""Create 2FA_token cookie and save validate data to DB (2FA pending)"""
|
||||||
data = {}
|
try:
|
||||||
data["sub"] = user.id
|
data = {}
|
||||||
data["exp"] = (
|
data["sub"] = user.id
|
||||||
datetime.datetime.now()
|
data["exp"] = (
|
||||||
+ datetime.timedelta(minutes=int(os.getenv("LOGIN_2FA_PENDING_MINUTES")))
|
datetime.datetime.now()
|
||||||
).timestamp()
|
+ datetime.timedelta(minutes=int(os.getenv("LOGIN_2FA_PENDING_MINUTES")))
|
||||||
|
).timestamp()
|
||||||
|
|
||||||
jwt_token = jwt_encode(data=data)
|
jwt_token = jwt_encode(data=data)
|
||||||
|
|
||||||
response.set_cookie(
|
response.set_cookie(
|
||||||
key="2FA_token_temp",
|
key="2FA_token_temp",
|
||||||
value=jwt_token,
|
value=jwt_token,
|
||||||
httponly=True,
|
httponly=True,
|
||||||
expires=(int(os.getenv("LOGIN_2FA_PENDING_MINUTES")) * 60),
|
expires=(int(os.getenv("LOGIN_2FA_PENDING_MINUTES")) * 60),
|
||||||
)
|
|
||||||
db.add(
|
|
||||||
Auth_2fa_pending(
|
|
||||||
user=user,
|
|
||||||
token=jwt_token,
|
|
||||||
code=code,
|
|
||||||
expire=datetime.datetime.now()
|
|
||||||
+ datetime.timedelta(minutes=int(os.getenv("LOGIN_2FA_PENDING_MINUTES"))),
|
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
db.add(
|
||||||
|
Auth_2fa_pending(
|
||||||
|
user=user,
|
||||||
|
token=jwt_token,
|
||||||
|
code=code,
|
||||||
|
expire=datetime.datetime.now()
|
||||||
|
+ datetime.timedelta(minutes=int(os.getenv("LOGIN_2FA_PENDING_MINUTES"))),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
except:
|
||||||
|
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error on creating cookie or DB connection")
|
||||||
|
|
||||||
|
def __remove_2FA_pending_token(pending: Auth_2fa_pending, db: Session, response: Response) -> bool:
|
||||||
|
db.query(Auth_2fa_pending).filter(Auth_2fa_pending.id == pending.id).delete()
|
||||||
|
response.delete_cookie("2FA_token_temp")
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
def __generate_acces_token(user: User, db: Session, response: Response) -> bool:
|
def __generate_acces_token(user: User, db: Session, response: Response) -> bool:
|
||||||
"""Create acces_token cookie and save validate data to DB (login)"""
|
"""Create acces_token cookie and save validate data to DB (login)"""
|
||||||
data = {}
|
try:
|
||||||
data["sub"] = user.id
|
data = {}
|
||||||
data["exp"] = (
|
data["sub"] = user.id
|
||||||
datetime.datetime.now()
|
data["exp"] = (
|
||||||
+ datetime.timedelta(minutes=int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES")))
|
datetime.datetime.now()
|
||||||
).timestamp()
|
+ datetime.timedelta(minutes=int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES")))
|
||||||
data["rand"] = random_string(8)
|
).timestamp()
|
||||||
response.set_cookie(
|
data["rand"] = random_string(8)
|
||||||
key="acces_token",
|
response.set_cookie(
|
||||||
value=jwt_encode(data=data),
|
key="acces_token",
|
||||||
httponly=True,
|
value=jwt_encode(data=data),
|
||||||
expires=(int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES")) * 60),
|
httponly=True,
|
||||||
)
|
expires=(int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES")) * 60),
|
||||||
db.add(
|
|
||||||
Auth_token_valid(
|
|
||||||
user=user,
|
|
||||||
random=data["rand"],
|
|
||||||
expire=datetime.datetime.now()
|
|
||||||
+ datetime.timedelta(minutes=int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES"))),
|
|
||||||
)
|
)
|
||||||
)
|
db.add(
|
||||||
|
Auth_token_valid(
|
||||||
|
user=user,
|
||||||
|
random=data["rand"],
|
||||||
|
expire=datetime.datetime.now()
|
||||||
|
+ datetime.timedelta(minutes=int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES"))),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
except:
|
||||||
|
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error on creating acces cookie or DB connection")
|
||||||
|
|
||||||
|
|
||||||
def login(
|
def __generate_refresh_token(user: User, db: Session, response: Response) -> bool:
|
||||||
|
"""Create refresh_token cookie and save validate data to DB (login)"""
|
||||||
|
try:
|
||||||
|
data = {}
|
||||||
|
data["sub"] = user.id
|
||||||
|
data["exp"] = (
|
||||||
|
datetime.datetime.now()
|
||||||
|
+ datetime.timedelta(minutes=int(os.getenv("REFRESH_TOKEN_EXPIRE_MINUTES")))
|
||||||
|
).timestamp()
|
||||||
|
data["rand"] = random_string(8)
|
||||||
|
response.set_cookie(
|
||||||
|
key="refresh_token",
|
||||||
|
value=jwt_encode(data=data),
|
||||||
|
httponly=True,
|
||||||
|
expires=(int(os.getenv("REFRESH_TOKEN_EXPIRE_MINUTES")) * 60),
|
||||||
|
)
|
||||||
|
db.add(
|
||||||
|
Auth_token_valid(
|
||||||
|
user=user,
|
||||||
|
random=data["rand"],
|
||||||
|
expire=datetime.datetime.now()
|
||||||
|
+ datetime.timedelta(minutes=int(os.getenv("REFRESH_TOKEN_EXPIRE_MINUTES"))),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
except:
|
||||||
|
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error on creating refresh cookie or DB connection")
|
||||||
|
|
||||||
|
|
||||||
|
def login_user(
|
||||||
form: Login_request, db: Session, response: Response, request: Request
|
form: Login_request, db: Session, response: Response, request: Request
|
||||||
) -> Response:
|
) -> Login_reply:
|
||||||
user: User = (
|
user: User = (
|
||||||
db.query(User).filter(User.email == form.email).first()
|
db.query(User).filter(User.email == form.email).first()
|
||||||
) # search user by email
|
) # search user by email
|
||||||
|
@ -91,7 +136,7 @@ def login(
|
||||||
__generate_acces_token(user=user, db=db, response=response)
|
__generate_acces_token(user=user, db=db, response=response)
|
||||||
return Login_reply(
|
return Login_reply(
|
||||||
logged_in=True, requested_2FA=False, type_2FA=Types_2FA.ANY
|
logged_in=True, requested_2FA=False, type_2FA=Types_2FA.ANY
|
||||||
)
|
) #log in without 2FA only if valid refresh_token
|
||||||
|
|
||||||
# match types of 2FA and run specified tasks
|
# match types of 2FA and run specified tasks
|
||||||
match user.type_2fa:
|
match user.type_2fa:
|
||||||
|
@ -105,3 +150,45 @@ def login(
|
||||||
return Login_reply(
|
return Login_reply(
|
||||||
logged_in=False, requested_2FA=False, type_2FA=Types_2FA.ANY
|
logged_in=False, requested_2FA=False, type_2FA=Types_2FA.ANY
|
||||||
) # not logged in
|
) # not logged in
|
||||||
|
|
||||||
|
|
||||||
|
def verify_2FA_login(code: verify_2FA, db: Session, request: Request, response: Response) -> Login_reply:
|
||||||
|
jwt_cookie = request.cookies.get("2FA_token_temp", None)
|
||||||
|
|
||||||
|
if jwt_cookie is None:
|
||||||
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authorized")
|
||||||
|
|
||||||
|
jwt = jwt_decode(jwt_cookie)
|
||||||
|
|
||||||
|
if jwt is None or datetime.datetime.fromtimestamp(jwt["exp"]) < datetime.datetime.now():
|
||||||
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authorized")
|
||||||
|
|
||||||
|
auth_pending = db.query(Auth_2fa_pending).filter(and_(Auth_2fa_pending.token == jwt_cookie, Auth_2fa_pending.user_id == jwt["sub"])).first()
|
||||||
|
|
||||||
|
match auth_pending.user.type_2fa:
|
||||||
|
case Types_2FA.TOTP:
|
||||||
|
if totp_verify(auth_pending.user.totp_secret, code.auth_code):
|
||||||
|
__generate_acces_token(user=auth_pending.user, db=db, response=response)
|
||||||
|
__generate_refresh_token(user=auth_pending.user, db=db, response=response)
|
||||||
|
__remove_2FA_pending_token(pending=auth_pending, db=db, response=response)
|
||||||
|
return Login_reply(logged_in=True, requested_2FA=False, type_2FA=Types_2FA.ANY)
|
||||||
|
#here add another verify 2FA
|
||||||
|
case _:
|
||||||
|
if auth_pending.code == code.auth_code:
|
||||||
|
__generate_acces_token(user=auth_pending.user, db=db, response=response)
|
||||||
|
__generate_refresh_token(user=auth_pending.user, db=db, response=response)
|
||||||
|
__remove_2FA_pending_token(pending=auth_pending, db=db, response=response)
|
||||||
|
return Login_reply(logged_in=True, requested_2FA=False, type_2FA=Types_2FA.ANY)
|
||||||
|
|
||||||
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Bad code and cookie combination")
|
||||||
|
|
||||||
|
|
||||||
|
def logout_user(db: Session, request: Request, response:Response) -> Login_reply:
|
||||||
|
acces_token_random = jwt_decode(request.cookies.get("acces_token"))["rand"]
|
||||||
|
refresh_token_random = jwt_decode(request.cookies.get("refresh_token"))["rand"]
|
||||||
|
user_id = jwt_decode(request.cookies.get("refresh_token"))["sub"]
|
||||||
|
response.delete_cookie("refresh_token")
|
||||||
|
response.delete_cookie("acces_token")
|
||||||
|
db.query(Auth_token_valid).filter(and_(Auth_token_valid.random == acces_token_random, Auth_token_valid.user_id == user_id)).delete()
|
||||||
|
db.query(Auth_token_valid).filter(and_(Auth_token_valid.random == refresh_token_random, Auth_token_valid.user_id == user_id)).delete()
|
||||||
|
return Login_reply(logged_in=False, requested_2FA=False, type_2FA=Types_2FA.ANY)
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
import pyotp
|
||||||
|
from fastapi import HTTPException, status
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def totp_verify(totp_secret: str, totp_code: str) -> bool:
|
||||||
|
totp = pyotp.TOTP(totp_secret)
|
||||||
|
|
||||||
|
if totp.verify(totp_code):
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Bad OTP Code")
|
||||||
|
|
||||||
|
|
||||||
|
def totp_generate_secret():
|
||||||
|
return pyotp.random_base32(length=64)
|
|
@ -46,4 +46,8 @@ class BaseMiddleware(BaseHTTPMiddleware):
|
||||||
detail="Acces token not valid, please log-in",
|
detail="Acces token not valid, please log-in",
|
||||||
)
|
)
|
||||||
|
|
||||||
return await call_next(request)
|
response = await call_next(request)
|
||||||
|
|
||||||
|
request.state.db.commit()
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
Loading…
Reference in New Issue