Налаштовуємо кастомну JWT-авторизацію на FastAPI
Усім привіт! Мене звати Владислав Мусаєлян, я Back-end Engineer у Renesandro — платформі для автоматизації створення реклами, її запуску та масштабування за допомогою штучного інтелекту.
Авторизація користувачів відіграє ключову роль у забезпеченні захищеного доступу до ресурсів. Один з найпопулярніших способів керування доступом — це використання JSON Web Token (JWT). FastAPI — досить новий фреймворк для Python, тому він ще не має такої великої кількості бібліотек, як його аналоги — Django чи Flask.
У цій статті я розгляну, як налаштувати власну систему авторизації, використовуючи JWT у поєднанні з потужним і гнучким FastAPI. Зосереджусь на реальних прикладах з нашої платформи та практичних порадах, які допоможуть вам не лише зрозуміти основи, а й впровадити їх у свій проєкт.
Структура проєкта
├── migrations ├── src │ ├── auth │ │ ├── __init__.py │ │ ├── config.py │ │ ├── dals.py │ │ ├── hashing.py │ │ ├── models.py │ │ ├── routers.py │ │ ├── schemas.py │ │ ├── services.py │ │ ├── database.py │ │ ├── exceptions.py │ │ └── main.py ├── tests │ ├── conftest.py │ ├── __init__.py │ └── auth │ ├── __init__.py │ ├── test_routers.py │ └── utils.py ├── pyproject.toml ├── Makefile ├── docker-compose.yml ├── alembic.ini ├── Dockerfile ├── requirements.txt
Почнемо з файлу конфігурації, у ньому ми зберігатимемо налаштування для підключення до бази даних, тестової
бази даних, а також секрети проєкту.
src/config.py
import os from dotenv import load_dotenv from pydantic import PostgresDsn load_dotenv() DEBUG: int = os.getenv("DEBUG", 1) SECRET_KEY: str = os.getenv("SECRET_KEY", "secret_key") POSTGRES_USER: str = os.getenv("POSTGRES_USER", default="postgres") POSTGRES_PASSWORD: str = os.getenv("POSTGRES_PASSWORD", default="postgres") DB_HOST: str = os.getenv("DB_HOST", default="0.0.0.0") DB_PORT: int = os.getenv("DB_PORT", default=5432) DB_NAME: str = os.getenv("POSTGRES_DB", default="db") DATABASE_URL: PostgresDsn = f"postgresql+asyncpg://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" POSTGRES_TEST_USER: str = os.getenv("POSTGRES_TEST_USER", default="postgres_test") POSTGRES_TEST_PASSWORD: str = os.getenv("POSTGRES_TEST_PASSWORD", default="postgres_test") DB_TEST_PORT: int = os.getenv("DB_TEST_PORT", default=5433) DB_TEST_HOST: str = os.getenv("DB_TEST_HOST", default="0.0.0.0") DB_TEST_NAME: str = os.getenv("POSTGRES_DB_TEST", default="db_test") DATABASE_TEST_ASYNC_URL: PostgresDsn = f"postgresql+asyncpg://{POSTGRES_TEST_USER}:{POSTGRES_TEST_PASSWORD}@{DB_TEST_HOST}:{DB_TEST_PORT}/{DB_TEST_NAME}" DATABASE_TEST_SYNC_URL: PostgresDsn = f"postgresql://{POSTGRES_TEST_USER}:{POSTGRES_TEST_PASSWORD}@{DB_TEST_HOST}:{DB_TEST_PORT}/{DB_TEST_NAME}"
Та файл конфігурації для JWT-змінних:
src/auth/config.pyimport os from dotenv import load_dotenv load_dotenv() ALGORITHM: str = os.getenv("ALGORITHM", "HS256") ACCESS_TOKEN_EXPIRE_MINUTES: int = os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", 30) REFRESH_TOKEN_EXPIRE_DAYS: int = os.getenv("REFRESH_TOKEN_EXPIRE_DAYS", 30)
Трохи теорії
Access Token — це короткостроковий токен, який використовується для доступу до захищених ресурсів. Він має обмежений термін дії, зазвичай від кількох хвилин до години, після чого потребує оновлення.
Refresh Token — це довгостроковий токен, який дозволяє отримувати новий access token без повторної авторизації користувача. Він використовується для того, щоб продовжити сесію користувача після того, як access token спливе.
У прикладі ми використовували оптимальні значення, які можуть відрізнятися залежно від проєкту.
Моделі
Далі опишемо нашу модель користувача, а також додамо таблицю для зберігання токенів, які внесені до чорного списку (TokenBlacklist). Вона потрібна для реалізації логіки виходу з системи (logout). Для цього використовуватимемо бібліотеку SQLAlchemy для роботи з базою даних та Alembic для міграцій.
import uuid from sqlalchemy import Boolean, Column, DateTime, String, func, select from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.ext.asyncio import AsyncSession from src.database import Base class User(Base): """User model representing users in the application.""" __tablename__ = "user" user_id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True) user_name = Column(String, unique=True, nullable=False) email = Column(String, unique=True, index=True, nullable=False) hashed_password = Column(String, nullable=False) is_active = Column(Boolean(), default=True) is_superuser = Column(Boolean(), default=False) created_at = Column(DateTime, default=func.now()) updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) class TokenBlacklist(Base): """Token blacklist model representing tokens in the application.""" __tablename__ = "token_blacklist" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True) token = Column(String, primary_key=True, index=True) blacklisted_on = Column(DateTime, default=func.now()) @classmethod async def find_by_id(cls, db: AsyncSession, id: UUID): result = await db.execute(select(cls).where(cls.id == id)) return result.scalars().first()
Тепер виникає питання: як правильно зберігати паролі в базі даних? Для цього створимо сервіс для кодування та перевірки паролів, будемо використовувати бібліотеки passlib.
from passlib.context import CryptContext bcrypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto") class Hasher: @staticmethod def verify_password(plain_password: str, hashed_password: str): return bcrypt_context.verify(plain_password, hashed_password) @staticmethod def get_password_hash(password: str) -> str: return bcrypt_context.hash(password)
Організація доступу до БД та Сервісна логіка
У файлі dals.py (дивіться на GitHub-репозиторії) описано клас із методами для взаємодії з базою даних, а у файлі services.py додано основні методи для генерації токенів. Розглянемо на прикладі функцію, яка генерує наші токени.
src/auth/services.pydef create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta( minutes=ACCESS_TOKEN_EXPIRE_MINUTES ) to_encode.update({EXP: expire}) encoded_jwt = jwt.encode( to_encode, SECRET_KEY, algorithm=ALGORITHM ) return encoded_jwt def create_refresh_token(data: dict) -> str: to_encode = data.copy() expire = datetime.utcnow() + timedelta( days=REFRESH_TOKEN_EXPIRE_DAYS ) to_encode.update({EXP: expire}) encoded_jwt = jwt.encode( to_encode, SECRET_KEY, algorithm=ALGORITHM ) return encoded_jwt
data — дані, які ми хочемо зашифрувати у наш JWT-токен, туди не можна передавати sensetive інформацію, різні
ключі, паролі. Як приклад, можна передавати ID юзера.
Розглянемо, як перевіряти токени, витягувати з них інформацію та оновлювати їх.
src/auth/services.pyasync def decode_access_token(token: str, db: AsyncSession): try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) black_list_token = await TokenBlacklist.find_by_id(db=db, id=payload[JTI]) if black_list_token: raise JWTError("Token is blacklisted") except JWTError: raise AuthFailedException() return payload def refresh_token_state(token: str): try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) except JWTError as ex: raise AuthFailedException() return TokenPair( access=create_access_token(data={**payload}), refresh=create_refresh_token(data={**payload}), )
Через функцію jwt.decode() декодуємо наш токен та отримуємо з нього нашу інформацію.
Views
Тепер можемо створити ендпоїнти для отримання токену та його оновлення.
src/auth/routers.py@auth_router.post("/login", status_code=status.HTTP_200_OK, response_model=TokenPair) async def login_for_access_token( form_data: OAuth2PasswordRequestForm = Depends(), db: AsyncSession = Depends(get_db) ): user = await authenticate_user(db, form_data.username, form_data.password) if not user: raise AuthFailedException() return create_token_pair(user=user) @auth_router.post("/refresh", status_code=status.HTTP_200_OK, response_model=TokenPair) async def refresh_token(refresh: Annotated[str, None] = None): if not refresh: raise BadRequestException(detail="refresh token required") return refresh_token_state(token=refresh)
Наступним кроком необхідно додати авторизацію до наших наступних роутів. Зробимо це таким чином: створимо функцію, яку будемо використовувати в якості Dependency Injection для валідації токена.
src/auth/services.py
async def get_current_user(token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db)) -> User: token_data = await decode_access_token(token, db=db) if token_data is None: raise AuthFailedException() user_dal = UserDAL(db) user = await user_dal.get_user_by_id(uuid.UUID(token_data[SUB])) if user is None: raise AuthFailedException() return user
І далі використовуємо це у нашому view:
@auth_router.post("/example-endpoint", status_code=status.HTTP_200_OK) async def example_endpoint( db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user) ):
Висновки
Ми створили власний сервіс авторизації на FastAPI, який досить лекго можна кастомізувати під різні потреби.Ось посилання на репозиторій, де також ще є логіки logout, змінення пароля, тести.
Якщо є питання, або ідеї, як можна покращити представлене рішення — пишіть, буду радий.
8 коментарів
Додати коментар Підписатись на коментаріВідписатись від коментарів