Налаштовуємо кастомну JWT-авторизацію на FastAPI

Усім привіт! Мене звати Владислав Мусаєлян, я Back-end Engineer у Renesandro — платформі для автоматизації створення реклами, її запуску та масштабування за допомогою штучного інтелекту.

Авторизація користувачів відіграє ключову роль у забезпеченні захищеного доступу до ресурсів. Один з найпопулярніших способів керування доступом — це використання JSON Web Token (JWT). FastAPI — досить новий фреймворк для Python, тому він ще не має такої великої кількості бібліотек, як його аналоги — Django чи Flask.

У цій статті я розгляну, як налаштувати власну систему авторизації, використовуючи JWT у поєднанні з потужним і гнучким FastAPI. Зосереджусь на реальних прикладах з нашої платформи та практичних порадах, які допоможуть вам не лише зрозуміти основи, а й впровадити їх у свій проєкт.

Структура проєкта

├── migrations
├── src
│   ├── auth
│   │   ├── __init__.py
│   │   ├── config.py
│   │   ├── dals.py
│   │   ├── hashing.py
│   │   ├── models.py
│   │   ├── routers.py
│   │   ├── schemas.py
│   │   ├── services.py
│   │   ├── database.py
│   │   ├── exceptions.py
│   │   └── main.py
├── tests
│   ├── conftest.py
│   ├── __init__.py
│   └── auth
│       ├── __init__.py
│       ├── test_routers.py
│       └── utils.py
├── pyproject.toml
├── Makefile
├── docker-compose.yml
├── alembic.ini
├── Dockerfile
├── requirements.txt

Почнемо з файлу конфігурації, у ньому ми зберігатимемо налаштування для підключення до бази даних, тестової
бази даних, а також секрети проєкту.

src/config.py


import os
from dotenv import load_dotenv

from pydantic import PostgresDsn


load_dotenv()

DEBUG: int = os.getenv("DEBUG", 1)
SECRET_KEY: str = os.getenv("SECRET_KEY", "secret_key")

POSTGRES_USER: str = os.getenv("POSTGRES_USER", default="postgres")
POSTGRES_PASSWORD: str = os.getenv("POSTGRES_PASSWORD", default="postgres")
DB_HOST: str = os.getenv("DB_HOST", default="0.0.0.0")
DB_PORT: int = os.getenv("DB_PORT", default=5432)
DB_NAME: str = os.getenv("POSTGRES_DB", default="db")

DATABASE_URL: PostgresDsn = f"postgresql+asyncpg://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

POSTGRES_TEST_USER: str = os.getenv("POSTGRES_TEST_USER", default="postgres_test")
POSTGRES_TEST_PASSWORD: str = os.getenv("POSTGRES_TEST_PASSWORD", default="postgres_test")
DB_TEST_PORT: int = os.getenv("DB_TEST_PORT", default=5433)
DB_TEST_HOST: str = os.getenv("DB_TEST_HOST", default="0.0.0.0")
DB_TEST_NAME: str = os.getenv("POSTGRES_DB_TEST", default="db_test")

DATABASE_TEST_ASYNC_URL: PostgresDsn = f"postgresql+asyncpg://{POSTGRES_TEST_USER}:{POSTGRES_TEST_PASSWORD}@{DB_TEST_HOST}:{DB_TEST_PORT}/{DB_TEST_NAME}"
DATABASE_TEST_SYNC_URL: PostgresDsn = f"postgresql://{POSTGRES_TEST_USER}:{POSTGRES_TEST_PASSWORD}@{DB_TEST_HOST}:{DB_TEST_PORT}/{DB_TEST_NAME}"

Та файл конфігурації для JWT-змінних:

src/auth/config.py

import os

from dotenv import load_dotenv

load_dotenv()

ALGORITHM: str = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES: int = os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", 30)
REFRESH_TOKEN_EXPIRE_DAYS: int = os.getenv("REFRESH_TOKEN_EXPIRE_DAYS", 30)

Трохи теорії

Access Token — це короткостроковий токен, який використовується для доступу до захищених ресурсів. Він має обмежений термін дії, зазвичай від кількох хвилин до години, після чого потребує оновлення.

Refresh Token — це довгостроковий токен, який дозволяє отримувати новий access token без повторної авторизації користувача. Він використовується для того, щоб продовжити сесію користувача після того, як access token спливе.

У прикладі ми використовували оптимальні значення, які можуть відрізнятися залежно від проєкту.

Моделі

Далі опишемо нашу модель користувача, а також додамо таблицю для зберігання токенів, які внесені до чорного списку (TokenBlacklist). Вона потрібна для реалізації логіки виходу з системи (logout). Для цього використовуватимемо бібліотеку SQLAlchemy для роботи з базою даних та Alembic для міграцій.

src/auth/models.py

import uuid

from sqlalchemy import Boolean, Column, DateTime, String, func, select
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.asyncio import AsyncSession

from src.database import Base


class User(Base):
    """User model representing users in the application."""
    __tablename__ = "user"

    user_id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
    user_name = Column(String, unique=True, nullable=False)
    email = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean(), default=True)
    is_superuser = Column(Boolean(), default=False)
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())


class TokenBlacklist(Base):
    """Token blacklist model representing tokens in the application."""
    __tablename__ = "token_blacklist"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
    token = Column(String, primary_key=True, index=True)
    blacklisted_on = Column(DateTime, default=func.now())

    @classmethod
    async def find_by_id(cls, db: AsyncSession, id: UUID):
        result = await db.execute(select(cls).where(cls.id == id))
        return result.scalars().first()


Тепер виникає питання: як правильно зберігати паролі в базі даних? Для цього створимо сервіс для кодування та перевірки паролів, будемо використовувати бібліотеки passlib.

src/auth/hashing.py

from passlib.context import CryptContext

bcrypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


class Hasher:
    @staticmethod
    def verify_password(plain_password: str, hashed_password: str):
        return bcrypt_context.verify(plain_password, hashed_password)

    @staticmethod
    def get_password_hash(password: str) -> str:
        return bcrypt_context.hash(password)

Організація доступу до БД та Сервісна логіка

У файлі dals.py (дивіться на GitHub-репозиторії) описано клас із методами для взаємодії з базою даних, а у файлі services.py додано основні методи для генерації токенів. Розглянемо на прикладі функцію, яка генерує наші токени.

src/auth/services.py

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(
            minutes=ACCESS_TOKEN_EXPIRE_MINUTES
        )
    to_encode.update({EXP: expire})
    encoded_jwt = jwt.encode(
        to_encode, SECRET_KEY, algorithm=ALGORITHM
    )
    return encoded_jwt


def create_refresh_token(data: dict) -> str:
    to_encode = data.copy()
    expire = datetime.utcnow() + timedelta(
        days=REFRESH_TOKEN_EXPIRE_DAYS
    )

    to_encode.update({EXP: expire})
    encoded_jwt = jwt.encode(
        to_encode, SECRET_KEY, algorithm=ALGORITHM
    )
    return encoded_jwt

data — дані, які ми хочемо зашифрувати у наш JWT-токен, туди не можна передавати sensetive інформацію, різні
ключі, паролі. Як приклад, можна передавати ID юзера.

Розглянемо, як перевіряти токени, витягувати з них інформацію та оновлювати їх.

src/auth/services.py

async def decode_access_token(token: str, db: AsyncSession):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        black_list_token = await TokenBlacklist.find_by_id(db=db, id=payload[JTI])
        if black_list_token:
            raise JWTError("Token is blacklisted")
    except JWTError:
        raise AuthFailedException()

    return payload


def refresh_token_state(token: str):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as ex:
        raise AuthFailedException()

    return TokenPair(
        access=create_access_token(data={**payload}),
        refresh=create_refresh_token(data={**payload}),
    )

Через функцію jwt.decode() декодуємо наш токен та отримуємо з нього нашу інформацію.

Views

Тепер можемо створити ендпоїнти для отримання токену та його оновлення.

src/auth/routers.py

@auth_router.post("/login", status_code=status.HTTP_200_OK, response_model=TokenPair)
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db)
):
    user = await authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise AuthFailedException()
    return create_token_pair(user=user)


@auth_router.post("/refresh", status_code=status.HTTP_200_OK, response_model=TokenPair)
async def refresh_token(refresh: Annotated[str, None] = None):
    if not refresh:
        raise BadRequestException(detail="refresh token required")
    return refresh_token_state(token=refresh)

Наступним кроком необхідно додати авторизацію до наших наступних роутів. Зробимо це таким чином: створимо функцію, яку будемо використовувати в якості Dependency Injection для валідації токена.

src/auth/services.py


async def get_current_user(token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db)) -> User:
    token_data = await decode_access_token(token, db=db)
    if token_data is None:
        raise AuthFailedException()
    user_dal = UserDAL(db)
    user = await user_dal.get_user_by_id(uuid.UUID(token_data[SUB]))
    if user is None:
        raise AuthFailedException()
    return user

І далі використовуємо це у нашому view:


@auth_router.post("/example-endpoint", status_code=status.HTTP_200_OK)
async def example_endpoint(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):

Висновки

Ми створили власний сервіс авторизації на FastAPI, який досить лекго можна кастомізувати під різні потреби.
Ось посилання на репозиторій, де також ще є логіки logout, змінення пароля, тести.

Якщо є питання, або ідеї, як можна покращити представлене рішення — пишіть, буду радий.

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті

👍ПодобаєтьсяСподобалось14
До обраногоВ обраному7
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

У цілому погоджуюся із попереднім оратором.
Для ознайомлення з технологією JWT все це ок. Але використовувати саморобні велосипеди у царині безпеки наполегливо не раджу. Готових рішень достатньо: вбудовані у хмари, монстроїдальні комерційні, open source. Тоді бекенду треба буде сфокусуватися на валідації токенів і авторизації дій, а не на генерації токенів.

1) паролі без salt
2) обидва токени взагалі не обов’язково мають бути JWT, з таким же успіхом можна генерити випадкові рядки, ну але раз вже JWT то payload refresh token завжди мінімальна, payload access token depends
3) з п.2 випливає що дональд трамп може взяти рефреш токен і віддати його майору фсб, щоб або прямо використати його як AT, або наштампувати cобі скільки хоче AT і рефрешити похідні токени. ЗИ. В статті не видно, але create_token_pair використовує один uuid для jti обох токенів, це частково знімає проблему і вішає секуріті на волосинку
4) собсно не ведеться трекінг похідних токенів, нема поняття сесії => неможливо відслідкувати токени в сесії крім як поки по jti і собсно їх інвалідувати
5) замість блекліста краще вести вайтліст і просто видаляти ті токени з таблиці, незрозуміло нащо той блекліст
6) нема ротації ключів, зміна ключа фактично знищить всі токени
7) витік ключа дозволить будь-кому штампувати валідні токени, валідні токени мають трекатись теж (відповідно з 3 + 5 рекомендую замість блекліста вести вайтліст)
8) бачу створення експайрі, але не побачив перевірку експайрі токенів. PS особіность декодингу pyjwt, питання знято
9) реалізовано блекліст uuid через jti, а інвалідувати треба сесію, але сесії тут нема, замість сесії jti. В принципі це принаймні означає що jti грає якусь роль акі сесії бо копіюється при рефреші і хоча б у поточній версії однаковй у AT та RT. Але це означає що необережна зміна відкриє діру з п 3. З іншого боку навіть зара все що треба щоб отримати доступ з перехопленим токеном це почекати поки хтось одного дня почистить зростаючу безікнечно таблицю

фаціт: за старання плюс :) але в продакшені краще все таки щось використати готове

data — дані, які ми хочемо зашифрувати у наш JWT-токен, туди не можна передавати sensetive інформацію, різні
ключі, паролі

JWT це публічні дані, це фактично цифровий підпис якихось фактів які перевірені IdP та собсно тому складають пейлоад, але це не шифрування. Думай про нього як про наприклад сертифікат TLS чи який цифровий підпис на електронній пошті, в жодному JWT токені не можна передавати «різні ключі, паролі»

JWT можуть бути зашифрованими, відповідно, якісь чутливі дані передавати можна. Але все це значно ускладнює процес і частіше за все не треба.

Нууу так є стандарт але ж в самописному idp цього ж нема

Плюс навіть зашифрований тре передавати не через паблік. це ж як викласти свою базу кейпас в інтернет — зашифровану. Як китайській компартії треба буде витратять рік але розшифрують. Ну такоє звісно, як то кажуть один раз не... і нізя але як хочеться то можно...

1)TokenBlacklist — дивна назва для таблиці нективних токенів...
2) при декодуванні є перевірка з таблиці неактивних токенів, чому немає перевірки на колізію при створенні токена з таблиці токенів та неактивних токенів?
3) чи потрібна вам історія всіх токенів, може є сенс тримати 1 таблицю із статусом токену isActive і id юзера, тобто 1 юзер 1 токен в валідному чи невалідному статусі, а при логіні чи рефрешу перезаписуєте токен, а при розлогіненні переводити в isActive = false

2) У JWT-токенах використовується унікальна інформація, зокрема iat (issued at), тому кожен новий токен відрізняється від попередніх. Це виключає можливість колізій під час генерації. Крім того, токени не зберігаються в базі як валідні — ми перевіряємо їх під час запиту, і якщо токен в чорному списку або його строк дії сплив, він не буде прийнятий.
Таким чином, колізії не можуть виникнути, і немає потреби робити додаткові перевірки при створенні токенів.

3) Зберігання кількох токенів забезпечує кращий користувацький досвід, оскільки дозволяє підтримувати паралельні сесії на різних пристроях. Наприклад, користувач може залишатися залогіненим на телефоні, навіть якщо увійшов на комп’ютері.

Також це дає можливість вибірково інвалідовувати токени або вийти з усіх сесій одразу, якщо потрібно. Якщо ж зберігати лише один токен із статусом isActive та перезаписувати його при кожному вході, це може створити незручності — користувача постійно викидатиме з одного пристрою після входу на іншому. Тому використання окремих токенів і чорного списку є більш гнучким рішенням.

Підписатись на коментарі