PostgreSQL UPSERT з історією: від 90 секунд до 5 через CTE

Привіт, пару років тому писав про вставку та оновлення пачками в PostgreSQL за допомогою UNNEST — варто доповнити цю тему використанням CTE на прикладі реальної задачі.

100к email-ів з аудит-логом вставлялись 90 секунд. Переписав через CTE — стало 5.

Задача: взяти файл з активними email-адресами й оновити їх стан в БД зі збереженням історії змін. Усі адреси з файлу помічаються як активні, всі які були в БД, але відсутні у файлі — деактивуються. Додатково записується історія змін для аудиту — хто і коли оновив стани. Теоретично у файлі може бути від 100 до 100 000 адресів.

Ось так виглядає схема БД для збереження стану та історії змін:

CREATE TABLE rtt_email_files
(
    id         BIGSERIAL NOT NULL PRIMARY KEY,
    -- meta: file name, size, ...
    created_at TIMESTAMP NOT NULL,
    created_by BIGINT    NOT NULL REFERENCES users (id)
);

CREATE TABLE rtt_emails
(
    id         BIGSERIAL NOT NULL PRIMARY KEY,
    email      VARCHAR   NOT NULL UNIQUE, -- lowercase email address
    active     BOOLEAN   NOT NULL,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL
);

CREATE TABLE rtt_email_history
(
    id         BIGSERIAL NOT NULL PRIMARY KEY,
    email_id   BIGINT    NOT NULL REFERENCES rtt_emails (id),
    active     BOOLEAN   NOT NULL,
    file_id    BIGINT    NOT NULL REFERENCES rtt_email_files (id),
    created_at TIMESTAMP NOT NULL
);;

Для взаємодії з PostgreSQL я використовую sqlc, ось такі запити підготував:

-- name: RttEmailFilesNew :one
INSERT INTO rtt_email_files (created_at, created_by)
VALUES (@created_at, @created_by)
RETURNING id;

-- name: RttEmailsActivate :many
INSERT INTO rtt_emails AS t (email, active, created_at, updated_at)
VALUES (UNNEST(@emails::VARCHAR[]), TRUE, @created_at, @created_at)
ON CONFLICT (email) DO UPDATE
    SET active     = EXCLUDED.active,
        updated_at = EXCLUDED.updated_at
WHERE t.active IS DISTINCT FROM EXCLUDED.active
RETURNING id;

-- name: RttEmailsDeactivate :many
UPDATE rtt_emails
SET active     = FALSE,
    updated_at = @updated_at
WHERE (@emails::VARCHAR[] IS NULL OR NOT email = ANY (@emails::VARCHAR[]))
  AND active = TRUE
RETURNING id;

-- name: RttEmailHistoryNew :exec
INSERT INTO rtt_email_history (email_id, active, file_id, created_at)
VALUES (UNNEST(@email_ids::BIGINT[]), @active, @file_id, @created_at);

А ось так виглядає код вставки:

func (r *RttEmailRepository) Upload(
	ctx context.Context,
	file *FileMetadata,
	emails []string,
	createdAt time.Time,
	createdBy int64,
) (activatedCount int64, deactivatedCount int64, err error) {
	txErr := r.db.WithTransaction(ctx, func(queries *dbs.Queries) error {
		fileID, err := queries.RttEmailFilesNew(ctx, dbs.RttEmailFilesNewParams{
			// file metadata
			CreatedAt: createdAt,
			CreatedBy: createdBy,
		})
		if err != nil {
			return fmt.Errorf("failed to create Rtt email file record: %v", err)
		}

		activatedIDs, err := queries.RttEmailsActivate(ctx, dbs.RttEmailsActivateParams{
			Emails:    emails,
			CreatedAt: createdAt,
		})
		if err != nil {
			return fmt.Errorf("failed to activate Rtt emails: %v", err)
		}

		if len(activatedIDs) > 0 {
			err := queries.RttEmailHistoryNew(ctx, dbs.RttEmailHistoryNewParams{
				EmailIds:  activatedIDs,
				Active:    true,
				FileID:    fileID,
				CreatedAt: createdAt,
			})
			if err != nil {
				return fmt.Errorf("failed to create Rtt email history records for activated emails: %v", err)
			}
		}

		deactivatedIDs, err := queries.RttEmailsDeactivate(ctx, dbs.RttEmailsDeactivateParams{
			UpdatedAt: createdAt,
			Emails:    emails,
		})
		if err != nil {
			return fmt.Errorf("failed to deactivate Rtt emails: %v", err)
		}

		if len(deactivatedIDs) > 0 {
			err := queries.RttEmailHistoryNew(ctx, dbs.RttEmailHistoryNewParams{
				EmailIds:  deactivatedIDs,
				Active:    false,
				FileID:    fileID,
				CreatedAt: createdAt,
			})
			if err != nil {
				return fmt.Errorf("failed to create Rtt email history records for deactivated emails: %v", err)
			}
		}

		activatedCount = int64(len(activatedIDs))
		deactivatedCount = int64(len(deactivatedIDs))

		return nil
	})
	if txErr != nil {
		return 0, 0, txErr
	}

	return activatedCount, deactivatedCount, nil
}

Обробка 10к email-адрес займає ~5 секунд, 100к — вже ~90 секунд.

Проблема в зайвих round-trips до БД: bulk-підхід робить окремі запити для UPSERT, деактивації та запису історії. Рішення — перенести всю логіку в один запит через CTE.

Тепер запит виглядає ось так:

-- name: RttEmailsSync :one
WITH
    --
    data (email) AS (
        --
        SELECT UNNEST(@emails::VARCHAR[])
        --
    ),
    deactivated (id) AS (
        --
        UPDATE rtt_emails
        SET active     = FALSE,
            updated_at = @now::TIMESTAMP
        WHERE email NOT IN (SELECT email FROM data)
          AND active = TRUE
        RETURNING id
        --
    ),
    activated (id) AS (
        --
        INSERT INTO rtt_emails (email, active, created_at, updated_at)
        SELECT email, TRUE, @now::TIMESTAMP, @now::TIMESTAMP
        FROM data
        ON CONFLICT (email) DO UPDATE
            SET active     = EXCLUDED.active,
                updated_at = EXCLUDED.updated_at
        WHERE rtt_emails.active IS DISTINCT FROM EXCLUDED.active
        RETURNING id
        --
    ),
    deactivated_history AS (
        --
        INSERT INTO rtt_email_history (email_id, active, file_id, created_at)
        SELECT id, FALSE, @file_id::BIGINT, @now::TIMESTAMP
        FROM deactivated
        --
    ),
    activated_history AS (
        --
        INSERT INTO rtt_email_history (email_id, active, file_id, created_at)
        SELECT id, TRUE, @file_id::BIGINT, @now::TIMESTAMP
        FROM activated
        --
    )
SELECT (SELECT COUNT(*) FROM activated)   AS activated_count,
       (SELECT COUNT(*) FROM deactivated) AS deactivated_count;

А так виглядає код вставки:

func (r *RttEmailRepository) UploadFast(
	ctx context.Context,
	file *FileMetadata,
	emails []string,
	createdAt time.Time,
	createdBy int64,
) (activatedCount int64, deactivatedCount int64, err error) {
	txErr := r.db.WithTransaction(ctx, func(queries *dbs.Queries) error {
		fileID, err := queries.RttEmailFilesNew(ctx, dbs.RttEmailFilesNewParams{
			// file metadata
			CreatedAt: createdAt,
			CreatedBy: createdBy,
		})
		if err != nil {
			return fmt.Errorf("failed to create Rtt email file record: %v", err)
		}

		result, err := queries.RttEmailsSync(ctx, dbs.RttEmailsSyncParams{
			Emails: emails,
			Now:    createdAt,
			FileID: fileID,
		})
		if err != nil {
			return fmt.Errorf("failed to sync Rtt emails: %v", err)
		}

		activatedCount = result.ActivatedCount
		deactivatedCount = result.DeactivatedCount

		return nil
	})
	if txErr != nil {
		return 0, 0, txErr
	}

	return activatedCount, deactivatedCount, nil
}

Обробка 10к email-адрес займає ~3 секунди, 100к — ~5 секунд, 1кк — ~30 секунд.

Для мене таке суттєве пришвидшення за рахунок зменшення передачі даних між БД та застосунком стало відкриттям, тому оформив цю тему.

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті

👍ПодобаєтьсяСподобалось9
До обраногоВ обраному3
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

Підписатись на коментарі