Batch UPDATE в PostgreSQL

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті

Привіт, я розробляю анонімний пошук роботи та хочу зробити статистику онлайну публічною, щоб рекрутер міг побачити, скільки кандидатів Senior Go Developer було онлайн цього тижня.

Як це буде працювати: коли користувач щось робить на сайті, у Redis зберігатиметься user_id та timestamp, а кожні десять хвилин запускатиметься команда перенесення онлайну пачками з Redis в PostgreSQL.

У попередніх двох статтях розглянули доступні типи даних для збереження онлайну в Redis, а також вибрали оптимальний тип даних для збереження онлайну в Redis на основі тестів. Розглянули типи Hash, Set та Sorted set, а також бази Redis, KeyDB та DragonflyDB. Результати тестування в репозиторії research-online-redis-go.

А в цій статті протестуємо варіанти збереження онлайну пачками в PostgreSQL, результати тестування в репозиторії research-online-postgres-go.

Від MySQL до PostgreSQL

Коли я працював з MySQL, то знав тільки два варіанти оновлення пачками:
  1. У транзакції, де UPDATE викликається в циклі.
  2. Через UPDATE CASE WHEN.
CREATE TABLE user_online
(
    user_id BIGINT    NOT NULL PRIMARY KEY,
    online  TIMESTAMP NOT NULL
);
START TRANSACTION;

UPDATE user_online
SET online = '2023-08-07 10:01:00'
WHERE user_id = 1;

-- ... WHERE user_id = 2;
-- ... WHERE user_id = 3;
-- ... WHERE user_id = 4;

UPDATE user_online
SET online = '2023-08-07 10:05:00'
WHERE user_id = 5;

COMMIT;
UPDATE user_online
SET online = CASE user_id
    WHEN 1 THEN '2023-08-07 10:01:00'::TIMESTAMP
    WHEN 2 THEN '2023-08-07 10:02:00'::TIMESTAMP
    WHEN 3 THEN '2023-08-07 10:03:00'::TIMESTAMP
    WHEN 4 THEN '2023-08-07 10:04:00'::TIMESTAMP
    WHEN 5 THEN '2023-08-07 10:05:00'::TIMESTAMP
END
WHERE user_id IN (1, 2, 3, 4, 5);

Коли я почав працювати з PostgreSQL, то хотів знайти красивіше рішення: таким рішенням є використання unnest, але це очевидне використання функції unnest постійно пропускав.

Якось в одному з PR-ів Іл’яса побачив конструкцію UPDATE FROM VALUES:

UPDATE user_online AS to_t
SET online = from_t.online
FROM (
    VALUES (1, '2023-08-07 10:01:00'::TIMESTAMP),
           (2, '2023-08-07 10:02:00'::TIMESTAMP),
           (3, '2023-08-07 10:03:00'::TIMESTAMP),
           (4, '2023-08-07 10:04:00'::TIMESTAMP),
           (5, '2023-08-07 10:05:00'::TIMESTAMP)
) AS from_t (user_id, online)
WHERE to_t.user_id = from_t.user_id;

Приклад UPDATE FROM VALUES виглядає краще за попередній приклад UPDATE CASE WHEN, тому я вирішив ще раз пошукати красиві рішення та перевірити їх на швидкодію.

Функція unnest для оновлення пачками

Функція unnest перетворює масиви в рядки. Розглянемо, як працює unnest на простих прикладах:
SELECT unnest(ARRAY[9, 10]) AS user_id;
user_id
9
10
SELECT unnest(ARRAY[9, 10])                                        AS user_id,
       unnest(ARRAY['2023-08-07 15:09:00', '2023-08-07 15:10:00']) AS online;
user_idonline
9 2023-08-07 15:09:00
10 2023-08-07 15:10:00

Тепер, коли ми розуміємо як працює unnest, можемо використати unnest для оновлення пачками:

UPDATE user_online AS to_t
SET online = from_t.online
FROM (
         SELECT unnest(ARRAY[9, 10])                                                            AS user_id,
                unnest(ARRAY[TIMESTAMP '2023-08-07 15:09:00', TIMESTAMP '2023-08-07 15:10:00']) AS online
     ) AS from_t
WHERE to_t.user_id = from_t.user_id;

Варіанти оновлення пачками в PostgreSQL

Розглянемо варіанти UPDATE та UPSERT.

Підготовка таблиці та даних:

CREATE TABLE user_online
(
    user_id BIGINT    NOT NULL PRIMARY KEY,
    online  TIMESTAMP NOT NULL
);
TRUNCATE user_online;
INSERT INTO user_online (user_id, online)
VALUES (1, '2023-08-07 10:01:00'),
       (2, '2023-08-07 10:02:00'),
       (3, '2023-08-07 10:03:00'),
       (4, '2023-08-07 10:04:00'),
       (5, '2023-08-07 10:05:00'),
       (6, '2023-08-07 10:06:00'),
       (7, '2023-08-07 10:07:00'),
       (8, '2023-08-07 10:08:00'),
       (9, '2023-08-07 10:09:00'),
       (10, '2023-08-07 10:10:00'),
       (11, '2023-08-07 10:11:00'),
       (12, '2023-08-07 10:12:00');

Перший варіант — це UPSERT у циклі:

START TRANSACTION;
INSERT INTO user_online (user_id, online)
VALUES (1, '2023-08-07 11:01:00')
ON CONFLICT (user_id) DO UPDATE
    SET online = excluded.online;

INSERT INTO user_online (user_id, online)
VALUES (2, '2023-08-07 11:02:00')
ON CONFLICT (user_id) DO UPDATE
    SET online = excluded.online;
COMMIT;

Другий варіант — це UPDATE у циклі:

START TRANSACTION;
UPDATE user_online
SET online = '2023-08-07 12:03:00'
WHERE user_id = 3;

UPDATE user_online
SET online = '2023-08-07 12:04:00'
WHERE user_id = 4;
COMMIT;

Третій варіант — це UPSERT з unnest:

INSERT INTO user_online (user_id, online)
VALUES (unnest(ARRAY[11, 12]),
        unnest(ARRAY['2023-08-07 16:11:00'::TIMESTAMP, '2023-08-07 16:12:00'::TIMESTAMP]))
ON CONFLICT (user_id) DO UPDATE
    SET online = excluded.online;

Четвертий варіант — це UPDATE з unnest:

UPDATE user_online AS to_t
SET online = from_t.online
FROM (
    SELECT unnest(ARRAY[9, 10])                                                              AS user_id,
           unnest(ARRAY['2023-08-07 15:09:00'::TIMESTAMP, '2023-08-07 15:10:00'::TIMESTAMP]) AS online
) AS from_t
WHERE to_t.user_id = from_t.user_id;

Опис задачі

У нас є чотири варіанти оновлення записів пачками. Для кожного варіанту я напишу сервіс-обгортку на Go. Кожен сервіс на Go покрию тестом, а також напишу бенчмарки, щоб вибрати оптимальне рішення.

Почнемо з налаштування проєкту, яке буде схоже на налаштування проєкту зі статті «Go: ефективна робота з SQL».

Налаштування проєкту

Я створю Go-проєкт, загорну в Docker, підімкну Postgres і перевірю, що контейнери робочі.
go mod init github.com/doutivity/research-online-postgres-go
go: creating new go.mod: module github.com/doutivity/research-online-postgres-go

Тепер підготую docker-compose.yml, який матиме опис контейнеру з Go та опис контейнеру з Postgres:

version: "3.7"

services:
  app:
    container_name: "research-online-postgres-go-app"
    image: golang:1.21.0-alpine
    working_dir: /go/src/github.com/doutivity/research-online-postgres-go
    volumes:
      - .:/go/src/github.com/doutivity/research-online-postgres-go
    command: "sleep infinity"
    depends_on:
      - postgres1

  postgres1:
    container_name: "research-online-postgres-1"
    image: postgres:16.0
    environment:
      POSTGRES_DB: "yaaws"
      POSTGRES_USER: "yaroslav"
      POSTGRES_PASSWORD: "AnySecretPassword!!"
    ports:
      - "5432:5432"
docker-compose up -d
docker exec research-online-postgres-go-app go version
docker exec research-online-postgres-1 psql -U yaroslav -d yaaws -c "SELECT VERSION();"
Go 1.21.0
PostgreSQL 16.0
Як бачимо, команди успішно виконались та вивели очікувані результати.

Налаштування роботи з БД в Go

Для роботи з БД в повсякденній роботі я використовую два інструменти, goose для роботи з міграціями та sqlc для генерування Go-коду на основі SQL-запитів і схеми БД.
go install github.com/pressly/goose/v3/cmd/goose@latest
go install github.com/sqlc-dev/sqlc/cmd/sqlc@latest
goose --version # v3.15.1
sqlc version # v1.21.0

Створимо першу міграцію з таблицею user_online:

# Creates new migration file with the current timestamp
# Example: make create-new-migration-file NAME=<name>
create-new-migration-file:
	$(eval NAME ?= unknown)
	mkdir -p ./migrations/
	goose -dir ./migrations/ create $(NAME) sql
make create-new-migration-file NAME=user_online
tree .
 ├── docker-compose.yml
 ├── go.mod
 ├── Makefile
+└── migrations
+    └── 20230807225910_user_online.sql
cat ./migrations/20230807225910_user_online.sql
-- +goose Up
-- +goose StatementBegin
CREATE TABLE user_online
(
    user_id BIGINT    NOT NULL PRIMARY KEY,
    online  TIMESTAMP NOT NULL
);
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
DROP TABLE user_online;
-- +goose StatementEnd
POSTGRES_URI=postgresql://yaroslav:AnySecretPassword!!@localhost:5432/yaaws?sslmode=disable

migrate-up:
	goose -dir ./migrations/ -table schema_migrations postgres $(POSTGRES_URI) up
make migrate-up
2023/10/28 00:57:22 OK   20230807225910_user_online.sql (6.18ms)
2023/10/28 00:57:22 goose: successfully migrated database to version: 20230807225910

Тепер, коли ми створили міграції, можна підготувати SQL-запити, налаштувати sqlc, вказавши де розташована тека з SQL-запитами та тека зі схемою БД, щоб згенерувати Go-код для взаємодії з БД.

mdkir -p ./internal/storage/postgres/queries/
touch ./internal/storage/postgres/queries/user_online.sql
cat ./internal/storage/postgres/queries/user_online.sql
-- name: UserOnlineUpsert :exec
INSERT INTO user_online (user_id, online)
VALUES (@user_id, @online)
ON CONFLICT (user_id) DO UPDATE
    SET online = excluded.online;

-- name: UserOnlineUpdate :exec
UPDATE user_online
SET online = @online
WHERE user_id = @user_id;

-- name: UserOnlineUnnestUpsert :exec
INSERT INTO user_online (user_id, online)
VALUES (unnest(@user_ids::BIGINT[]),
        unnest(@onlines::TIMESTAMP[]))
ON CONFLICT (user_id) DO UPDATE
    SET online = excluded.online;

-- name: UserOnlineUnnestUpdate :exec
UPDATE user_online AS to_t
SET online = from_t.online
FROM (
         SELECT unnest(@user_ids::BIGINT[])   AS user_id,
                unnest(@onlines::TIMESTAMP[]) AS online
     ) AS from_t
WHERE to_t.user_id = from_t.user_id;
touch sqlc.yaml
cat sqlc.yaml
version: "2"
sql:
  - engine: "postgresql"
    queries: "./internal/storage/postgres/queries/"
    schema: "./migrations/"

    gen:
      go:
        package: "dbs"
        sql_package: "pgx/v5"
        out: "./internal/storage/postgres/dbs/"
        emit_prepared_queries: true
sqlc generate
tree .
 ├── docker-compose.yml
 ├── go.mod
+├── internal
+│   └── storage
+│       └── postgres
+│           ├── dbs
+│           │   ├── db.go
+│           │   └── user_online.sql.go
+│           └── queries
+│               └── user_online.sql
 ├── Makefile
 ├── migrations
 │   └── 20230807225910_user_online.sql
+└── sqlc.yaml

Згенерований Go-код:

cat ./internal/storage/postgres/dbs/db.go
// Code generated by sqlc. DO NOT EDIT.
// versions:
//   sqlc v1.21.0

package dbs

import (
	"context"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
)

type DBTX interface {
	Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
	Query(context.Context, string, ...interface{}) (pgx.Rows, error)
	QueryRow(context.Context, string, ...interface{}) pgx.Row
	SendBatch(context.Context, *pgx.Batch) pgx.BatchResults
}

func New(db DBTX) *Queries {
	return &Queries{db: db}
}

type Queries struct {
	db DBTX
}

func (q *Queries) WithTx(tx pgx.Tx) *Queries {
	return &Queries{
		db: tx,
	}
}
cat ./internal/storage/postgres/dbs/user_online.go
// Code generated by sqlc. DO NOT EDIT.
// versions:
//   sqlc v1.21.0
// source: user_online.sql

package dbs

import (
	"context"

	"github.com/jackc/pgx/v5/pgtype"
)

const userOnlineUnnestUpdate = `-- name: UserOnlineUnnestUpdate :exec
UPDATE user_online AS to_t
SET online = from_t.online
FROM (
         SELECT unnest($1::BIGINT[])   AS user_id,
                unnest($2::TIMESTAMP[]) AS online
     ) AS from_t
WHERE to_t.user_id = from_t.user_id
`

type UserOnlineUnnestUpdateParams struct {
	UserIds []int64
	Onlines []pgtype.Timestamp
}

func (q *Queries) UserOnlineUnnestUpdate(ctx context.Context, arg UserOnlineUnnestUpdateParams) error {
	_, err := q.db.Exec(ctx, userOnlineUnnestUpdate, arg.UserIds, arg.Onlines)
	return err
}

const userOnlineUnnestUpsert = `-- name: UserOnlineUnnestUpsert :exec
INSERT INTO user_online (user_id, online)
VALUES (unnest($1::BIGINT[]),
        unnest($2::TIMESTAMP[]))
ON CONFLICT (user_id) DO UPDATE
    SET online = excluded.online
`

type UserOnlineUnnestUpsertParams struct {
	UserIds []int64
	Onlines []pgtype.Timestamp
}

func (q *Queries) UserOnlineUnnestUpsert(ctx context.Context, arg UserOnlineUnnestUpsertParams) error {
	_, err := q.db.Exec(ctx, userOnlineUnnestUpsert, arg.UserIds, arg.Onlines)
	return err
}

const userOnlineUpdate = `-- name: UserOnlineUpdate :exec
UPDATE user_online
SET online = $1
WHERE user_id = $2
`

type UserOnlineUpdateParams struct {
	Online pgtype.Timestamp
	UserID int64
}

func (q *Queries) UserOnlineUpdate(ctx context.Context, arg UserOnlineUpdateParams) error {
	_, err := q.db.Exec(ctx, userOnlineUpdate, arg.Online, arg.UserID)
	return err
}

const userOnlineUpsert = `-- name: UserOnlineUpsert :exec
INSERT INTO user_online (user_id, online)
VALUES ($1, $2)
ON CONFLICT (user_id) DO UPDATE
    SET online = excluded.online
`

type UserOnlineUpsertParams struct {
	UserID int64
	Online pgtype.Timestamp
}

func (q *Queries) UserOnlineUpsert(ctx context.Context, arg UserOnlineUpsertParams) error {
	_, err := q.db.Exec(ctx, userOnlineUpsert, arg.UserID, arg.Online)
	return err
}

Робота з транзакціями

Під час роботи з транзакціями для зручності я використовуватиму обгортку:
cat ./internal/storage/postgres/database.go
package postgres

import (
	"context"

	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres/dbs"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// Database - repository
type Database struct {
	pool    *pgxpool.Pool
	queries *dbs.Queries
}

// NewDatabase - constructor
func NewDatabase(pool *pgxpool.Pool) *Database {
	var queries = dbs.New(pool)

	return &Database{
		pool:    pool,
		queries: queries,
	}
}

// Connection - getter
func (r *Database) Connection() *pgxpool.Pool {
	return r.pool
}

// Queries - getter
func (r *Database) Queries() *dbs.Queries {
	return r.queries
}

// WithTransaction - start transaction
func (r *Database) WithTransaction(ctx context.Context, fn func(queries *dbs.Queries) error) error {
	return withTransaction(ctx, r.pool, r.queries, fn)
}

func withTransaction(
	ctx context.Context,
	db *pgxpool.Pool,
	queries *dbs.Queries,
	fn func(queries *dbs.Queries) error,
) (err error) {
	tx, err := db.BeginTx(ctx, pgx.TxOptions{})
	if err != nil {
		return
	}

	defer func() {
		if p := recover(); p != nil {
			// a panic occurred, rollback and repanic
			tx.Rollback(ctx)

			panic(p)
		} else if err != nil {
			// something went wrong, rollback
			tx.Rollback(ctx)
		} else {
			// all good, commit
			err = tx.Commit(ctx)
		}
	}()

	err = fn(queries.WithTx(tx))

	return err
}
tree .
 ├── docker-compose.yml
 ├── go.mod
 ├── internal
 │   └── storage
 │       └── postgres
+│           ├── database.go
 │           ├── dbs
 │           │   ├── db.go
 │           │   └── user_online.sql.go
 │           └── queries
 │               └── user_online.sql
 ├── Makefile
 ├── migrations
 │   └── 20230807225910_user_online.sql
 └── sqlc.yaml

Тестування

Отже, ми маємо чотири функції у згенерованому файлі ./internal/storage/postgres/dbs/user_online.go:
  • UserOnlineUpsert
  • UserOnlineUpdate
  • UserOnlineUnnestUpsert
  • UserOnlineUnnestUpdate
package dbs

import (
	"context"

	"github.com/jackc/pgx/v5/pgtype"
)

// ...

type UserOnlineUpsertParams struct {
	UserID int64
	Online pgtype.Timestamp
}

func (q *Queries) UserOnlineUpsert(ctx context.Context, arg UserOnlineUpsertParams) error {
	_, err := q.db.Exec(ctx, userOnlineUpsert, arg.UserID, arg.Online)
	return err
}

// ...

type UserOnlineUpdateParams struct {
	Online pgtype.Timestamp
	UserID int64
}

func (q *Queries) UserOnlineUpdate(ctx context.Context, arg UserOnlineUpdateParams) error {
	_, err := q.db.Exec(ctx, userOnlineUpdate, arg.Online, arg.UserID)
	return err
}

// ...

type UserOnlineUnnestUpsertParams struct {
	UserIds []int64
	Onlines []pgtype.Timestamp
}

func (q *Queries) UserOnlineUnnestUpsert(ctx context.Context, arg UserOnlineUnnestUpsertParams) error {
	_, err := q.db.Exec(ctx, userOnlineUnnestUpsert, arg.UserIds, arg.Onlines)
	return err
}

// ...

type UserOnlineUnnestUpdateParams struct {
	UserIds []int64
	Onlines []pgtype.Timestamp
}

func (q *Queries) UserOnlineUnnestUpdate(ctx context.Context, arg UserOnlineUnnestUpdateParams) error {
	_, err := q.db.Exec(ctx, userOnlineUnnestUpdate, arg.UserIds, arg.Onlines)
	return err
}

Перед тестуванням на швидкодію треба перевірити, що функції працюють коректно.
Функції мають різні сигнатури, тому адаптуємо їх до одного інтерфейсу OnlineStorage, який й покриємо тестом:

package main

import (
	"context"
)

type OnlineStorage interface {
	BatchStore(ctx context.Context, pairs []UserOnlinePair) error
}
package main

type UserOnlinePair struct {
	UserID    int64
	Timestamp int64
}
package main

import (
	"context"
	"testing"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/stretchr/testify/require"
)

func testOnlineStorage(
	t *testing.T,
	storage OnlineStorage,
) {
	t.Helper()

	ctx := context.Background()

	pool, err := pgxpool.New(ctx, dataSourceName)
	require.NoError(t, err)
	defer pool.Close()

	var (
		pair1v1 = UserOnlinePair{
			UserID:    1,
			Timestamp: 2e3,
		}
		pair2v1 = UserOnlinePair{
			UserID:    2,
			Timestamp: 3e4,
		}
		pair3v1 = UserOnlinePair{
			UserID:    3,
			Timestamp: 4e5,
		}
		pair4v1 = UserOnlinePair{
			UserID:    4,
			Timestamp: 5e6,
		}
		pair5v1 = UserOnlinePair{
			UserID:    5,
			Timestamp: 6e7,
		}

		pair2v2 = UserOnlinePair{
			UserID:    pair2v1.UserID,
			Timestamp: pair2v1.Timestamp + 10001,
		}
		pair3v2 = UserOnlinePair{
			UserID:    pair3v1.UserID,
			Timestamp: pair3v1.Timestamp + 10002,
		}
	)

	truncateOnline(t, ctx, pool)
	insertOnline(t, ctx, pool, []UserOnlinePair{
		pair1v1,
		pair2v1,
		pair3v1,
		pair4v1,
		pair5v1,
	})

	err = storage.BatchStore(ctx, []UserOnlinePair{
		pair2v2,
		pair3v2,
	})
	require.NoError(t, err)

	expectedOnline(t, ctx, pool, []UserOnlinePair{
		pair1v1,
		pair2v2,
		pair3v2,
		pair4v1,
		pair5v1,
	})
}

Тепер інтерфейс OnlineStorage потрібно реалізувати, зробимо це для UserOnlineUpsert та UserOnlineUnnestUpsert.
Для UserOnlineUpsert:

package main

import (
	"context"
	"time"

	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres"
	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres/dbs"

	"github.com/jackc/pgx/v5/pgtype"
)

type TxLoopUpsertOnlineStorage struct {
	db *postgres.Database
}

func NewTxLoopUpsertOnlineStorage(db *postgres.Database) *TxLoopUpsertOnlineStorage {
	return &TxLoopUpsertOnlineStorage{db: db}
}

func (s *TxLoopUpsertOnlineStorage) BatchStore(ctx context.Context, pairs []UserOnlinePair) error {
	return s.db.WithTransaction(ctx, func(queries *dbs.Queries) error {
		for _, pair := range pairs {
			err := queries.UserOnlineUpsert(ctx, dbs.UserOnlineUpsertParams{
				UserID: pair.UserID,
				Online: pgtype.Timestamp{
					Time:  time.Unix(pair.Timestamp, 0).UTC(),
					Valid: true,
				},
			})
			if err != nil {
				return err
			}
		}

		return nil
	})
}
package main

import (
	"context"
	"testing"

	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/stretchr/testify/require"
)

func TestTxLoopUpsertOnlineStorage(t *testing.T) {
	ctx := context.Background()

	pool, err := pgxpool.New(ctx, dataSourceName)
	require.NoError(t, err)
	defer pool.Close()

	storage := NewTxLoopUpsertOnlineStorage(postgres.NewDatabase(pool))

	testOnlineStorage(t, storage)
}

Для UserOnlineUnnestUpsert:

package main

import (
	"context"

	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres"
	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres/dbs"
)

type UnnestUpsertOnlineStorage struct {
	db *postgres.Database
}

func NewUnnestUpsertOnlineStorage(db *postgres.Database) *UnnestUpsertOnlineStorage {
	return &UnnestUpsertOnlineStorage{db: db}
}

func (s *UnnestUpsertOnlineStorage) BatchStore(ctx context.Context, pairs []UserOnlinePair) error {
	userIDs, timestamps := userOnlinePairsToPgxSlices(pairs)

	return s.db.Queries().UserOnlineUnnestUpsert(ctx, dbs.UserOnlineUnnestUpsertParams{
		UserIds: userIDs,
		Onlines: timestamps,
	})
}
package main

import (
	"time"

	"github.com/jackc/pgx/v5/pgtype"
)

func userOnlinePairsToPgxSlices(pairs []UserOnlinePair) ([]int64, []pgtype.Timestamp) {
	var (
		userIDs    = make([]int64, len(pairs))
		timestamps = make([]pgtype.Timestamp, len(pairs))
	)

	for i, pair := range pairs {
		userIDs[i] = pair.UserID
		timestamps[i] = pgtype.Timestamp{
			Time:  time.Unix(pair.Timestamp, 0).UTC(),
			Valid: true,
		}
	}

	return userIDs, timestamps
}
package main

import (
	"context"
	"testing"

	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/stretchr/testify/require"
)

func TestUnnestUpsertOnlineStorage(t *testing.T) {
	ctx := context.Background()

	pool, err := pgxpool.New(ctx, dataSourceName)
	require.NoError(t, err)
	defer pool.Close()

	storage := NewUnnestUpsertOnlineStorage(postgres.NewDatabase(pool))

	testOnlineStorage(t, storage)
}

Для UserOnlineUpdate та UserOnlineUnnestUpdate схожа реалізація інтерфейсу OnlineStorage.

Тепер можемо запустити тести:

go-test:
	docker exec research-online-postgres-go-app go test ./... -v -count=1
make go-test
=== RUN   TestTxLoopUpdateOnlineStorage
--- PASS: TestTxLoopUpdateOnlineStorage (0.01s)
=== RUN   TestTxLoopUpsertOnlineStorage
--- PASS: TestTxLoopUpsertOnlineStorage (0.01s)
=== RUN   TestUnnestUpdateOnlineStorage
--- PASS: TestUnnestUpdateOnlineStorage (0.01s)
=== RUN   TestUnnestUpsertOnlineStorage
--- PASS: TestUnnestUpsertOnlineStorage (0.01s)
PASS
ok  	github.com/doutivity/research-online-postgres-go	0.081s
tree .
 ├── docker-compose.yml
 ├── go.mod
 ├── internal
 │   └── storage
 │       └── postgres
 │           ├── database.go
 │           ├── dbs
 │           │   ├── db.go
 │           │   └── user_online.sql.go
 │           └── queries
 │               └── user_online.sql
 ├── Makefile
 ├── migrations
 │   └── 20230807225910_user_online.sql
+├── online_storage.go
+├── online_storage_test.go
 ├── sqlc.yaml
+├── tx_loop_update_online_storage.go
+├── tx_loop_update_online_storage_test.go
+├── tx_loop_upsert_online_storage.go
+├── tx_loop_upsert_online_storage_test.go
+├── unnest_update_online_storage.go
+├── unnest_update_online_storage_test.go
+├── unnest_upsert_online_storage.go
+├── unnest_upsert_online_storage_test.go
+└── user_online_pair.go

Тестування варіантів на швидкодію

Тепер, коли ми впевнені, що варіанти працюють коректно, можемо протестувати швидкодію. Для цього будемо вставляти пачками по 1000 записів:
package main

import (
	"context"
	"sync/atomic"
	"testing"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/stretchr/testify/require"
)

func benchmarkOnlineStorage(
	b *testing.B,
	storage OnlineStorage,
) {
	b.Helper()

	const (
		batch  = 1000
		online = 1679800725
	)

	ctx := context.Background()

	pool, err := pgxpool.New(ctx, dataSourceName)
	require.NoError(b, err)
	defer pool.Close()

	truncateOnline(b, ctx, pool)
	generateOnline(b, ctx, pool, int64(b.N*batch), online)

	var (
		startTimestamp = time.Now().Unix()
		counter        = int64(0)
	)

	pairs := make([]UserOnlinePair, batch)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		index := atomic.AddInt64(&counter, batch)

		for j := 0; j < batch; j++ {
			pairs[j] = UserOnlinePair{
				UserID:    int64(i*batch + j + 1), // 0 .. 99, 100 .. 199
				Timestamp: startTimestamp + index,
			}
		}

		err := storage.BatchStore(ctx, pairs)

		require.NoError(b, err)
	}
	b.StopTimer()

	expectedOnlineChangedCount(b, ctx, pool, int64(b.N*batch), online)
}
package main

import (
	"context"
	"testing"

	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/stretchr/testify/require"
)

func BenchmarkUnnestUpdateOnlineStorage(b *testing.B) {
	if testing.Short() {
		b.Skip()
	}

	ctx := context.Background()

	pool, err := pgxpool.New(ctx, dataSourceName)
	require.NoError(b, err)
	defer pool.Close()

	storage := NewUnnestUpdateOnlineStorage(postgres.NewDatabase(pool))

	benchmarkOnlineStorage(b, storage)
}
go-bench:
	mkdir -p ./output/

	# ... -bench='TxLoopUpsert'
	# ... -bench='TxLoopUpdate'
	# ... -bench='UnnestUpsert'
	# ... -bench='UnnestUpdate'

	docker exec research-online-postgres-go-app \
		go test ./... -v -run=$$^ -bench='UnnestUpdate' -benchmem -benchtime=1000x -count=10 \
		| tee ./output/bench-go-1000x-unnest-update.txt
make go-bench

Результати:

Namens/opB/opallocs/op
TxLoopUpdate20.45ms ± 2%160 1355 005
TxLoopUpsert26.59ms ± 24%168 1355 005
UnnestUpdate3.785ms ± 4%234 9852 028
UnnestUpsert4.235ms ± 6%234 9852 028

Ще варіанти

Postgres-драйвер github.com/jackc/pgx має додаткову можливість вставки пачками :batchexec, тому протестуємо ще пару варіантів на швидкодію:
-- name: UserOnlineBatchExecUpsert :batchexec
INSERT INTO user_online (user_id, online)
VALUES (@user_id, @online)
ON CONFLICT (user_id) DO UPDATE
    SET online = excluded.online;

-- name: UserOnlineBatchExecUpdate :batchexec
UPDATE user_online
SET online = @online
WHERE user_id = @user_id;
cat batch_exec_update_online_storage.go
package main

import (
	"context"
	"time"

	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres"
	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres/dbs"

	"github.com/hashicorp/go-multierror"
	"github.com/jackc/pgx/v5/pgtype"
)

type BatchExecUpdateOnlineStorage struct {
	db *postgres.Database
}

func NewBatchExecUpdateOnlineStorage(db *postgres.Database) *BatchExecUpdateOnlineStorage {
	return &BatchExecUpdateOnlineStorage{db: db}
}

func (s *BatchExecUpdateOnlineStorage) BatchStore(ctx context.Context, pairs []UserOnlinePair) error {
	args := make([]dbs.UserOnlineBatchExecUpdateParams, len(pairs))
	for i, pair := range pairs {
		args[i] = dbs.UserOnlineBatchExecUpdateParams{
			Online: pgtype.Timestamp{
				Time:  time.Unix(pair.Timestamp, 0).UTC(),
				Valid: true,
			},
			UserID: pair.UserID,
		}
	}

	var batchErr error

	s.db.Queries().UserOnlineBatchExecUpdate(ctx, args).Exec(func(i int, err error) {
		if err != nil {
			batchErr = multierror.Append(batchErr, err)
		}
	})

	return batchErr
}

Результати:

Namens/opB/opallocs/op
TxLoopUpdate20.45ms ± 2%160 1355 005
TxLoopUpsert26.59ms ± 24%168 1355 005
UnnestUpdate3.785ms ± 4%234 9852 028
UnnestUpsert4.235ms ± 6%234 9852 028
BatchExecUpdate7.044ms ± 1%495 3155 032
BatchExecUpsert7.004ms ± 8%503 3165 032

ReadyToTouch

Описане збереження онлайну вже використовується на сайті readytotouch.com.

Заради цікавості протестував на різних VPS з Intel та AMD, всі результати в репозиторії.

👍ПодобаєтьсяСподобалось16
До обраногоВ обраному1
LinkedIn

13 коментарів

Підписатись на коментаріВідписатись від коментарів Коментарі можуть залишати тільки користувачі з підтвердженими акаунтами.

Цікава стаття. Ще було б цікаво глянути якісь тести на оптимальну кількість рядочків в одному батчі

Це ж ще залежить від розміру самого рядка тому подібні тести варто проводити вже з реальними даними

Саме цікаве у цих перформанс тестах — що буде у хайлі конкарент середі, коли ті самі строки оновлюються іншими сессіями.

Наприклад, у тій ж Postgre/mssql/oracle є merge statement, який теж дуже швидко робить балк інсерт чи апдейт. Але швидко якщо нема конкаренсі, а якщо є, то ой як не швидко :)

Очікую, що для TxLoop, Unnest та BatchExec буде схоже співвідношення при паралельному виконанні

Якщо вас цікавлять конкретні результати то є репозиторій github.com/doutivity/research-online-postgres-go для експериментів

топчик дякую ❤️ це прям раз в 5 можна прискорити апдейти багато де

на Кложе щоб це глобально на проект в 200к зробити потрібно раз в 10 меньше коду, лол 😊

Зробіть технічну частину й я оформлю у статтю

Дякую, крутяк, побільше б таких статей, на них багато грошиків на рекламі не заробиш, але дуже корисні для саморозвитку

Теоретично можна заробити за написання технічних статей англійською мовою на Vultr Introduction to Vultr Creator Program, але щось вони ще налаштовують

Ніколи не чув про unnest, дякс, бенчкарки з деталями імплементації — топчик! поділився з колегами :) Thanks

Ех, premature optimization detected

Стаття кльова

Підписатись на коментарі