Batch UPDATE в PostgreSQL

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті

Привіт, я розробляю анонімний пошук роботи та хочу зробити статистику онлайну публічною, щоб рекрутер міг побачити, скільки кандидатів Senior Go Developer було онлайн цього тижня.

Як це буде працювати: коли користувач щось робить на сайті, у Redis зберігатиметься user_id та timestamp, а кожні десять хвилин запускатиметься команда перенесення онлайну пачками з Redis в PostgreSQL.

У попередніх двох статтях розглянули доступні типи даних для збереження онлайну в Redis, а також вибрали оптимальний тип даних для збереження онлайну в Redis на основі тестів. Розглянули типи Hash, Set та Sorted set, а також бази Redis, KeyDB та DragonflyDB. Результати тестування в репозиторії research-online-redis-go.

А в цій статті протестуємо варіанти збереження онлайну пачками в PostgreSQL, результати тестування в репозиторії research-online-postgres-go.

Від MySQL до PostgreSQL

Коли я працював з MySQL, то знав тільки два варіанти оновлення пачками:
  1. У транзакції, де UPDATE викликається в циклі.
  2. Через UPDATE CASE WHEN.
CREATE TABLE user_online
(
    user_id BIGINT    NOT NULL PRIMARY KEY,
    online  TIMESTAMP NOT NULL
);
START TRANSACTION;

UPDATE user_online
SET online = '2023-08-07 10:01:00'
WHERE user_id = 1;

-- ... WHERE user_id = 2;
-- ... WHERE user_id = 3;
-- ... WHERE user_id = 4;

UPDATE user_online
SET online = '2023-08-07 10:05:00'
WHERE user_id = 5;

COMMIT;
UPDATE user_online
SET online = CASE user_id
    WHEN 1 THEN '2023-08-07 10:01:00'::TIMESTAMP
    WHEN 2 THEN '2023-08-07 10:02:00'::TIMESTAMP
    WHEN 3 THEN '2023-08-07 10:03:00'::TIMESTAMP
    WHEN 4 THEN '2023-08-07 10:04:00'::TIMESTAMP
    WHEN 5 THEN '2023-08-07 10:05:00'::TIMESTAMP
END
WHERE user_id IN (1, 2, 3, 4, 5);

Коли я почав працювати з PostgreSQL, то хотів знайти красивіше рішення: таким рішенням є використання unnest, але це очевидне використання функції unnest постійно пропускав.

Якось в одному з PR-ів Іл’яса побачив конструкцію UPDATE FROM VALUES:

UPDATE user_online AS to_t
SET online = from_t.online
FROM (
    VALUES (1, '2023-08-07 10:01:00'::TIMESTAMP),
           (2, '2023-08-07 10:02:00'::TIMESTAMP),
           (3, '2023-08-07 10:03:00'::TIMESTAMP),
           (4, '2023-08-07 10:04:00'::TIMESTAMP),
           (5, '2023-08-07 10:05:00'::TIMESTAMP)
) AS from_t (user_id, online)
WHERE to_t.user_id = from_t.user_id;

Приклад UPDATE FROM VALUES виглядає краще за попередній приклад UPDATE CASE WHEN, тому я вирішив ще раз пошукати красиві рішення та перевірити їх на швидкодію.

Функція unnest для оновлення пачками

Функція unnest перетворює масиви в рядки. Розглянемо, як працює unnest на простих прикладах:
SELECT unnest(ARRAY[9, 10]) AS user_id;
user_id
9
10
SELECT unnest(ARRAY[9, 10])                                        AS user_id,
       unnest(ARRAY['2023-08-07 15:09:00', '2023-08-07 15:10:00']) AS online;
user_idonline
9 2023-08-07 15:09:00
10 2023-08-07 15:10:00

Тепер, коли ми розуміємо як працює unnest, можемо використати unnest для оновлення пачками:

UPDATE user_online AS to_t
SET online = from_t.online
FROM (
         SELECT unnest(ARRAY[9, 10])                                                            AS user_id,
                unnest(ARRAY[TIMESTAMP '2023-08-07 15:09:00', TIMESTAMP '2023-08-07 15:10:00']) AS online
     ) AS from_t
WHERE to_t.user_id = from_t.user_id;

Варіанти оновлення пачками в PostgreSQL

Розглянемо варіанти UPDATE та UPSERT.

Підготовка таблиці та даних:

CREATE TABLE user_online
(
    user_id BIGINT    NOT NULL PRIMARY KEY,
    online  TIMESTAMP NOT NULL
);
TRUNCATE user_online;
INSERT INTO user_online (user_id, online)
VALUES (1, '2023-08-07 10:01:00'),
       (2, '2023-08-07 10:02:00'),
       (3, '2023-08-07 10:03:00'),
       (4, '2023-08-07 10:04:00'),
       (5, '2023-08-07 10:05:00'),
       (6, '2023-08-07 10:06:00'),
       (7, '2023-08-07 10:07:00'),
       (8, '2023-08-07 10:08:00'),
       (9, '2023-08-07 10:09:00'),
       (10, '2023-08-07 10:10:00'),
       (11, '2023-08-07 10:11:00'),
       (12, '2023-08-07 10:12:00');

Перший варіант — це UPSERT у циклі:

START TRANSACTION;
INSERT INTO user_online (user_id, online)
VALUES (1, '2023-08-07 11:01:00')
ON CONFLICT (user_id) DO UPDATE
    SET online = excluded.online;

INSERT INTO user_online (user_id, online)
VALUES (2, '2023-08-07 11:02:00')
ON CONFLICT (user_id) DO UPDATE
    SET online = excluded.online;
COMMIT;

Другий варіант — це UPDATE у циклі:

START TRANSACTION;
UPDATE user_online
SET online = '2023-08-07 12:03:00'
WHERE user_id = 3;

UPDATE user_online
SET online = '2023-08-07 12:04:00'
WHERE user_id = 4;
COMMIT;

Третій варіант — це UPSERT з unnest:

INSERT INTO user_online (user_id, online)
VALUES (unnest(ARRAY[11, 12]),
        unnest(ARRAY['2023-08-07 16:11:00'::TIMESTAMP, '2023-08-07 16:12:00'::TIMESTAMP]))
ON CONFLICT (user_id) DO UPDATE
    SET online = excluded.online;

Четвертий варіант — це UPDATE з unnest:

UPDATE user_online AS to_t
SET online = from_t.online
FROM (
    SELECT unnest(ARRAY[9, 10])                                                              AS user_id,
           unnest(ARRAY['2023-08-07 15:09:00'::TIMESTAMP, '2023-08-07 15:10:00'::TIMESTAMP]) AS online
) AS from_t
WHERE to_t.user_id = from_t.user_id;

Опис задачі

У нас є чотири варіанти оновлення записів пачками. Для кожного варіанту я напишу сервіс-обгортку на Go. Кожен сервіс на Go покрию тестом, а також напишу бенчмарки, щоб вибрати оптимальне рішення.

Почнемо з налаштування проєкту, яке буде схоже на налаштування проєкту зі статті «Go: ефективна робота з SQL».

Налаштування проєкту

Я створю Go-проєкт, загорну в Docker, підімкну Postgres і перевірю, що контейнери робочі.
go mod init github.com/doutivity/research-online-postgres-go
go: creating new go.mod: module github.com/doutivity/research-online-postgres-go

Тепер підготую docker-compose.yml, який матиме опис контейнеру з Go та опис контейнеру з Postgres:

version: "3.7"

services:
  app:
    container_name: "research-online-postgres-go-app"
    image: golang:1.21.0-alpine
    working_dir: /go/src/github.com/doutivity/research-online-postgres-go
    volumes:
      - .:/go/src/github.com/doutivity/research-online-postgres-go
    command: "sleep infinity"
    depends_on:
      - postgres1

  postgres1:
    container_name: "research-online-postgres-1"
    image: postgres:16.0
    environment:
      POSTGRES_DB: "yaaws"
      POSTGRES_USER: "yaroslav"
      POSTGRES_PASSWORD: "AnySecretPassword!!"
    ports:
      - "5432:5432"
docker-compose up -d
docker exec research-online-postgres-go-app go version
docker exec research-online-postgres-1 psql -U yaroslav -d yaaws -c "SELECT VERSION();"
Go 1.21.0
PostgreSQL 16.0
Як бачимо, команди успішно виконались та вивели очікувані результати.

Налаштування роботи з БД в Go

Для роботи з БД в повсякденній роботі я використовую два інструменти, goose для роботи з міграціями та sqlc для генерування Go-коду на основі SQL-запитів і схеми БД.
go install github.com/pressly/goose/v3/cmd/goose@latest
go install github.com/sqlc-dev/sqlc/cmd/sqlc@latest
goose --version # v3.15.1
sqlc version # v1.21.0

Створимо першу міграцію з таблицею user_online:

# Creates new migration file with the current timestamp
# Example: make create-new-migration-file NAME=<name>
create-new-migration-file:
	$(eval NAME ?= unknown)
	mkdir -p ./migrations/
	goose -dir ./migrations/ create $(NAME) sql
make create-new-migration-file NAME=user_online
tree .
 ├── docker-compose.yml
 ├── go.mod
 ├── Makefile
+└── migrations
+    └── 20230807225910_user_online.sql
cat ./migrations/20230807225910_user_online.sql
-- +goose Up
-- +goose StatementBegin
CREATE TABLE user_online
(
    user_id BIGINT    NOT NULL PRIMARY KEY,
    online  TIMESTAMP NOT NULL
);
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
DROP TABLE user_online;
-- +goose StatementEnd
POSTGRES_URI=postgresql://yaroslav:AnySecretPassword!!@localhost:5432/yaaws?sslmode=disable

migrate-up:
	goose -dir ./migrations/ -table schema_migrations postgres $(POSTGRES_URI) up
make migrate-up
2023/10/28 00:57:22 OK   20230807225910_user_online.sql (6.18ms)
2023/10/28 00:57:22 goose: successfully migrated database to version: 20230807225910

Тепер, коли ми створили міграції, можна підготувати SQL-запити, налаштувати sqlc, вказавши де розташована тека з SQL-запитами та тека зі схемою БД, щоб згенерувати Go-код для взаємодії з БД.

mdkir -p ./internal/storage/postgres/queries/
touch ./internal/storage/postgres/queries/user_online.sql
cat ./internal/storage/postgres/queries/user_online.sql
-- name: UserOnlineUpsert :exec
INSERT INTO user_online (user_id, online)
VALUES (@user_id, @online)
ON CONFLICT (user_id) DO UPDATE
    SET online = excluded.online;

-- name: UserOnlineUpdate :exec
UPDATE user_online
SET online = @online
WHERE user_id = @user_id;

-- name: UserOnlineUnnestUpsert :exec
INSERT INTO user_online (user_id, online)
VALUES (unnest(@user_ids::BIGINT[]),
        unnest(@onlines::TIMESTAMP[]))
ON CONFLICT (user_id) DO UPDATE
    SET online = excluded.online;

-- name: UserOnlineUnnestUpdate :exec
UPDATE user_online AS to_t
SET online = from_t.online
FROM (
         SELECT unnest(@user_ids::BIGINT[])   AS user_id,
                unnest(@onlines::TIMESTAMP[]) AS online
     ) AS from_t
WHERE to_t.user_id = from_t.user_id;
touch sqlc.yaml
cat sqlc.yaml
version: "2"
sql:
  - engine: "postgresql"
    queries: "./internal/storage/postgres/queries/"
    schema: "./migrations/"

    gen:
      go:
        package: "dbs"
        sql_package: "pgx/v5"
        out: "./internal/storage/postgres/dbs/"
        emit_prepared_queries: true
sqlc generate
tree .
 ├── docker-compose.yml
 ├── go.mod
+├── internal
+│   └── storage
+│       └── postgres
+│           ├── dbs
+│           │   ├── db.go
+│           │   └── user_online.sql.go
+│           └── queries
+│               └── user_online.sql
 ├── Makefile
 ├── migrations
 │   └── 20230807225910_user_online.sql
+└── sqlc.yaml

Згенерований Go-код:

cat ./internal/storage/postgres/dbs/db.go
// Code generated by sqlc. DO NOT EDIT.
// versions:
//   sqlc v1.21.0

package dbs

import (
	"context"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
)

type DBTX interface {
	Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
	Query(context.Context, string, ...interface{}) (pgx.Rows, error)
	QueryRow(context.Context, string, ...interface{}) pgx.Row
	SendBatch(context.Context, *pgx.Batch) pgx.BatchResults
}

func New(db DBTX) *Queries {
	return &Queries{db: db}
}

type Queries struct {
	db DBTX
}

func (q *Queries) WithTx(tx pgx.Tx) *Queries {
	return &Queries{
		db: tx,
	}
}
cat ./internal/storage/postgres/dbs/user_online.go
// Code generated by sqlc. DO NOT EDIT.
// versions:
//   sqlc v1.21.0
// source: user_online.sql

package dbs

import (
	"context"

	"github.com/jackc/pgx/v5/pgtype"
)

const userOnlineUnnestUpdate = `-- name: UserOnlineUnnestUpdate :exec
UPDATE user_online AS to_t
SET online = from_t.online
FROM (
         SELECT unnest($1::BIGINT[])   AS user_id,
                unnest($2::TIMESTAMP[]) AS online
     ) AS from_t
WHERE to_t.user_id = from_t.user_id
`

type UserOnlineUnnestUpdateParams struct {
	UserIds []int64
	Onlines []pgtype.Timestamp
}

func (q *Queries) UserOnlineUnnestUpdate(ctx context.Context, arg UserOnlineUnnestUpdateParams) error {
	_, err := q.db.Exec(ctx, userOnlineUnnestUpdate, arg.UserIds, arg.Onlines)
	return err
}

const userOnlineUnnestUpsert = `-- name: UserOnlineUnnestUpsert :exec
INSERT INTO user_online (user_id, online)
VALUES (unnest($1::BIGINT[]),
        unnest($2::TIMESTAMP[]))
ON CONFLICT (user_id) DO UPDATE
    SET online = excluded.online
`

type UserOnlineUnnestUpsertParams struct {
	UserIds []int64
	Onlines []pgtype.Timestamp
}

func (q *Queries) UserOnlineUnnestUpsert(ctx context.Context, arg UserOnlineUnnestUpsertParams) error {
	_, err := q.db.Exec(ctx, userOnlineUnnestUpsert, arg.UserIds, arg.Onlines)
	return err
}

const userOnlineUpdate = `-- name: UserOnlineUpdate :exec
UPDATE user_online
SET online = $1
WHERE user_id = $2
`

type UserOnlineUpdateParams struct {
	Online pgtype.Timestamp
	UserID int64
}

func (q *Queries) UserOnlineUpdate(ctx context.Context, arg UserOnlineUpdateParams) error {
	_, err := q.db.Exec(ctx, userOnlineUpdate, arg.Online, arg.UserID)
	return err
}

const userOnlineUpsert = `-- name: UserOnlineUpsert :exec
INSERT INTO user_online (user_id, online)
VALUES ($1, $2)
ON CONFLICT (user_id) DO UPDATE
    SET online = excluded.online
`

type UserOnlineUpsertParams struct {
	UserID int64
	Online pgtype.Timestamp
}

func (q *Queries) UserOnlineUpsert(ctx context.Context, arg UserOnlineUpsertParams) error {
	_, err := q.db.Exec(ctx, userOnlineUpsert, arg.UserID, arg.Online)
	return err
}

Робота з транзакціями

Під час роботи з транзакціями для зручності я використовуватиму обгортку:
cat ./internal/storage/postgres/database.go
package postgres

import (
	"context"

	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres/dbs"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// Database - repository
type Database struct {
	pool    *pgxpool.Pool
	queries *dbs.Queries
}

// NewDatabase - constructor
func NewDatabase(pool *pgxpool.Pool) *Database {
	var queries = dbs.New(pool)

	return &Database{
		pool:    pool,
		queries: queries,
	}
}

// Connection - getter
func (r *Database) Connection() *pgxpool.Pool {
	return r.pool
}

// Queries - getter
func (r *Database) Queries() *dbs.Queries {
	return r.queries
}

// WithTransaction - start transaction
func (r *Database) WithTransaction(ctx context.Context, fn func(queries *dbs.Queries) error) error {
	return withTransaction(ctx, r.pool, r.queries, fn)
}

func withTransaction(
	ctx context.Context,
	db *pgxpool.Pool,
	queries *dbs.Queries,
	fn func(queries *dbs.Queries) error,
) (err error) {
	tx, err := db.BeginTx(ctx, pgx.TxOptions{})
	if err != nil {
		return
	}

	defer func() {
		if p := recover(); p != nil {
			// a panic occurred, rollback and repanic
			tx.Rollback(ctx)

			panic(p)
		} else if err != nil {
			// something went wrong, rollback
			tx.Rollback(ctx)
		} else {
			// all good, commit
			err = tx.Commit(ctx)
		}
	}()

	err = fn(queries.WithTx(tx))

	return err
}
tree .
 ├── docker-compose.yml
 ├── go.mod
 ├── internal
 │   └── storage
 │       └── postgres
+│           ├── database.go
 │           ├── dbs
 │           │   ├── db.go
 │           │   └── user_online.sql.go
 │           └── queries
 │               └── user_online.sql
 ├── Makefile
 ├── migrations
 │   └── 20230807225910_user_online.sql
 └── sqlc.yaml

Тестування

Отже, ми маємо чотири функції у згенерованому файлі ./internal/storage/postgres/dbs/user_online.go:
  • UserOnlineUpsert
  • UserOnlineUpdate
  • UserOnlineUnnestUpsert
  • UserOnlineUnnestUpdate
package dbs

import (
	"context"

	"github.com/jackc/pgx/v5/pgtype"
)

// ...

type UserOnlineUpsertParams struct {
	UserID int64
	Online pgtype.Timestamp
}

func (q *Queries) UserOnlineUpsert(ctx context.Context, arg UserOnlineUpsertParams) error {
	_, err := q.db.Exec(ctx, userOnlineUpsert, arg.UserID, arg.Online)
	return err
}

// ...

type UserOnlineUpdateParams struct {
	Online pgtype.Timestamp
	UserID int64
}

func (q *Queries) UserOnlineUpdate(ctx context.Context, arg UserOnlineUpdateParams) error {
	_, err := q.db.Exec(ctx, userOnlineUpdate, arg.Online, arg.UserID)
	return err
}

// ...

type UserOnlineUnnestUpsertParams struct {
	UserIds []int64
	Onlines []pgtype.Timestamp
}

func (q *Queries) UserOnlineUnnestUpsert(ctx context.Context, arg UserOnlineUnnestUpsertParams) error {
	_, err := q.db.Exec(ctx, userOnlineUnnestUpsert, arg.UserIds, arg.Onlines)
	return err
}

// ...

type UserOnlineUnnestUpdateParams struct {
	UserIds []int64
	Onlines []pgtype.Timestamp
}

func (q *Queries) UserOnlineUnnestUpdate(ctx context.Context, arg UserOnlineUnnestUpdateParams) error {
	_, err := q.db.Exec(ctx, userOnlineUnnestUpdate, arg.UserIds, arg.Onlines)
	return err
}

Перед тестуванням на швидкодію треба перевірити, що функції працюють коректно.
Функції мають різні сигнатури, тому адаптуємо їх до одного інтерфейсу OnlineStorage, який й покриємо тестом:

package main

import (
	"context"
)

type OnlineStorage interface {
	BatchStore(ctx context.Context, pairs []UserOnlinePair) error
}
package main

type UserOnlinePair struct {
	UserID    int64
	Timestamp int64
}
package main

import (
	"context"
	"testing"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/stretchr/testify/require"
)

func testOnlineStorage(
	t *testing.T,
	storage OnlineStorage,
) {
	t.Helper()

	ctx := context.Background()

	pool, err := pgxpool.New(ctx, dataSourceName)
	require.NoError(t, err)
	defer pool.Close()

	var (
		pair1v1 = UserOnlinePair{
			UserID:    1,
			Timestamp: 2e3,
		}
		pair2v1 = UserOnlinePair{
			UserID:    2,
			Timestamp: 3e4,
		}
		pair3v1 = UserOnlinePair{
			UserID:    3,
			Timestamp: 4e5,
		}
		pair4v1 = UserOnlinePair{
			UserID:    4,
			Timestamp: 5e6,
		}
		pair5v1 = UserOnlinePair{
			UserID:    5,
			Timestamp: 6e7,
		}

		pair2v2 = UserOnlinePair{
			UserID:    pair2v1.UserID,
			Timestamp: pair2v1.Timestamp + 10001,
		}
		pair3v2 = UserOnlinePair{
			UserID:    pair3v1.UserID,
			Timestamp: pair3v1.Timestamp + 10002,
		}
	)

	truncateOnline(t, ctx, pool)
	insertOnline(t, ctx, pool, []UserOnlinePair{
		pair1v1,
		pair2v1,
		pair3v1,
		pair4v1,
		pair5v1,
	})

	err = storage.BatchStore(ctx, []UserOnlinePair{
		pair2v2,
		pair3v2,
	})
	require.NoError(t, err)

	expectedOnline(t, ctx, pool, []UserOnlinePair{
		pair1v1,
		pair2v2,
		pair3v2,
		pair4v1,
		pair5v1,
	})
}

Тепер інтерфейс OnlineStorage потрібно реалізувати, зробимо це для UserOnlineUpsert та UserOnlineUnnestUpsert.
Для UserOnlineUpsert:

package main

import (
	"context"
	"time"

	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres"
	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres/dbs"

	"github.com/jackc/pgx/v5/pgtype"
)

type TxLoopUpsertOnlineStorage struct {
	db *postgres.Database
}

func NewTxLoopUpsertOnlineStorage(db *postgres.Database) *TxLoopUpsertOnlineStorage {
	return &TxLoopUpsertOnlineStorage{db: db}
}

func (s *TxLoopUpsertOnlineStorage) BatchStore(ctx context.Context, pairs []UserOnlinePair) error {
	return s.db.WithTransaction(ctx, func(queries *dbs.Queries) error {
		for _, pair := range pairs {
			err := queries.UserOnlineUpsert(ctx, dbs.UserOnlineUpsertParams{
				UserID: pair.UserID,
				Online: pgtype.Timestamp{
					Time:  time.Unix(pair.Timestamp, 0).UTC(),
					Valid: true,
				},
			})
			if err != nil {
				return err
			}
		}

		return nil
	})
}
package main

import (
	"context"
	"testing"

	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/stretchr/testify/require"
)

func TestTxLoopUpsertOnlineStorage(t *testing.T) {
	ctx := context.Background()

	pool, err := pgxpool.New(ctx, dataSourceName)
	require.NoError(t, err)
	defer pool.Close()

	storage := NewTxLoopUpsertOnlineStorage(postgres.NewDatabase(pool))

	testOnlineStorage(t, storage)
}

Для UserOnlineUnnestUpsert:

package main

import (
	"context"

	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres"
	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres/dbs"
)

type UnnestUpsertOnlineStorage struct {
	db *postgres.Database
}

func NewUnnestUpsertOnlineStorage(db *postgres.Database) *UnnestUpsertOnlineStorage {
	return &UnnestUpsertOnlineStorage{db: db}
}

func (s *UnnestUpsertOnlineStorage) BatchStore(ctx context.Context, pairs []UserOnlinePair) error {
	userIDs, timestamps := userOnlinePairsToPgxSlices(pairs)

	return s.db.Queries().UserOnlineUnnestUpsert(ctx, dbs.UserOnlineUnnestUpsertParams{
		UserIds: userIDs,
		Onlines: timestamps,
	})
}
package main

import (
	"time"

	"github.com/jackc/pgx/v5/pgtype"
)

func userOnlinePairsToPgxSlices(pairs []UserOnlinePair) ([]int64, []pgtype.Timestamp) {
	var (
		userIDs    = make([]int64, len(pairs))
		timestamps = make([]pgtype.Timestamp, len(pairs))
	)

	for i, pair := range pairs {
		userIDs[i] = pair.UserID
		timestamps[i] = pgtype.Timestamp{
			Time:  time.Unix(pair.Timestamp, 0).UTC(),
			Valid: true,
		}
	}

	return userIDs, timestamps
}
package main

import (
	"context"
	"testing"

	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/stretchr/testify/require"
)

func TestUnnestUpsertOnlineStorage(t *testing.T) {
	ctx := context.Background()

	pool, err := pgxpool.New(ctx, dataSourceName)
	require.NoError(t, err)
	defer pool.Close()

	storage := NewUnnestUpsertOnlineStorage(postgres.NewDatabase(pool))

	testOnlineStorage(t, storage)
}

Для UserOnlineUpdate та UserOnlineUnnestUpdate схожа реалізація інтерфейсу OnlineStorage.

Тепер можемо запустити тести:

go-test:
	docker exec research-online-postgres-go-app go test ./... -v -count=1
make go-test
=== RUN   TestTxLoopUpdateOnlineStorage
--- PASS: TestTxLoopUpdateOnlineStorage (0.01s)
=== RUN   TestTxLoopUpsertOnlineStorage
--- PASS: TestTxLoopUpsertOnlineStorage (0.01s)
=== RUN   TestUnnestUpdateOnlineStorage
--- PASS: TestUnnestUpdateOnlineStorage (0.01s)
=== RUN   TestUnnestUpsertOnlineStorage
--- PASS: TestUnnestUpsertOnlineStorage (0.01s)
PASS
ok  	github.com/doutivity/research-online-postgres-go	0.081s
tree .
 ├── docker-compose.yml
 ├── go.mod
 ├── internal
 │   └── storage
 │       └── postgres
 │           ├── database.go
 │           ├── dbs
 │           │   ├── db.go
 │           │   └── user_online.sql.go
 │           └── queries
 │               └── user_online.sql
 ├── Makefile
 ├── migrations
 │   └── 20230807225910_user_online.sql
+├── online_storage.go
+├── online_storage_test.go
 ├── sqlc.yaml
+├── tx_loop_update_online_storage.go
+├── tx_loop_update_online_storage_test.go
+├── tx_loop_upsert_online_storage.go
+├── tx_loop_upsert_online_storage_test.go
+├── unnest_update_online_storage.go
+├── unnest_update_online_storage_test.go
+├── unnest_upsert_online_storage.go
+├── unnest_upsert_online_storage_test.go
+└── user_online_pair.go

Тестування варіантів на швидкодію

Тепер, коли ми впевнені, що варіанти працюють коректно, можемо протестувати швидкодію. Для цього будемо вставляти пачками по 1000 записів:
package main

import (
	"context"
	"sync/atomic"
	"testing"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/stretchr/testify/require"
)

func benchmarkOnlineStorage(
	b *testing.B,
	storage OnlineStorage,
) {
	b.Helper()

	const (
		batch  = 1000
		online = 1679800725
	)

	ctx := context.Background()

	pool, err := pgxpool.New(ctx, dataSourceName)
	require.NoError(b, err)
	defer pool.Close()

	truncateOnline(b, ctx, pool)
	generateOnline(b, ctx, pool, int64(b.N*batch), online)

	var (
		startTimestamp = time.Now().Unix()
		counter        = int64(0)
	)

	pairs := make([]UserOnlinePair, batch)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		index := atomic.AddInt64(&counter, batch)

		for j := 0; j < batch; j++ {
			pairs[j] = UserOnlinePair{
				UserID:    int64(i*batch + j + 1), // 0 .. 99, 100 .. 199
				Timestamp: startTimestamp + index,
			}
		}

		err := storage.BatchStore(ctx, pairs)

		require.NoError(b, err)
	}
	b.StopTimer()

	expectedOnlineChangedCount(b, ctx, pool, int64(b.N*batch), online)
}
package main

import (
	"context"
	"testing"

	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/stretchr/testify/require"
)

func BenchmarkUnnestUpdateOnlineStorage(b *testing.B) {
	if testing.Short() {
		b.Skip()
	}

	ctx := context.Background()

	pool, err := pgxpool.New(ctx, dataSourceName)
	require.NoError(b, err)
	defer pool.Close()

	storage := NewUnnestUpdateOnlineStorage(postgres.NewDatabase(pool))

	benchmarkOnlineStorage(b, storage)
}
go-bench:
	mkdir -p ./output/

	# ... -bench='TxLoopUpsert'
	# ... -bench='TxLoopUpdate'
	# ... -bench='UnnestUpsert'
	# ... -bench='UnnestUpdate'

	docker exec research-online-postgres-go-app \
		go test ./... -v -run=$$^ -bench='UnnestUpdate' -benchmem -benchtime=1000x -count=10 \
		| tee ./output/bench-go-1000x-unnest-update.txt
make go-bench

Результати:

Namens/opB/opallocs/op
TxLoopUpdate20.45ms ± 2%160 1355 005
TxLoopUpsert26.59ms ± 24%168 1355 005
UnnestUpdate3.785ms ± 4%234 9852 028
UnnestUpsert4.235ms ± 6%234 9852 028

Ще варіанти

Postgres-драйвер github.com/jackc/pgx має додаткову можливість вставки пачками :batchexec, тому протестуємо ще пару варіантів на швидкодію:
-- name: UserOnlineBatchExecUpsert :batchexec
INSERT INTO user_online (user_id, online)
VALUES (@user_id, @online)
ON CONFLICT (user_id) DO UPDATE
    SET online = excluded.online;

-- name: UserOnlineBatchExecUpdate :batchexec
UPDATE user_online
SET online = @online
WHERE user_id = @user_id;
cat batch_exec_update_online_storage.go
package main

import (
	"context"
	"time"

	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres"
	"github.com/doutivity/research-online-postgres-go/internal/storage/postgres/dbs"

	"github.com/hashicorp/go-multierror"
	"github.com/jackc/pgx/v5/pgtype"
)

type BatchExecUpdateOnlineStorage struct {
	db *postgres.Database
}

func NewBatchExecUpdateOnlineStorage(db *postgres.Database) *BatchExecUpdateOnlineStorage {
	return &BatchExecUpdateOnlineStorage{db: db}
}

func (s *BatchExecUpdateOnlineStorage) BatchStore(ctx context.Context, pairs []UserOnlinePair) error {
	args := make([]dbs.UserOnlineBatchExecUpdateParams, len(pairs))
	for i, pair := range pairs {
		args[i] = dbs.UserOnlineBatchExecUpdateParams{
			Online: pgtype.Timestamp{
				Time:  time.Unix(pair.Timestamp, 0).UTC(),
				Valid: true,
			},
			UserID: pair.UserID,
		}
	}

	var batchErr error

	s.db.Queries().UserOnlineBatchExecUpdate(ctx, args).Exec(func(i int, err error) {
		if err != nil {
			batchErr = multierror.Append(batchErr, err)
		}
	})

	return batchErr
}

Результати:

Namens/opB/opallocs/op
TxLoopUpdate20.45ms ± 2%160 1355 005
TxLoopUpsert26.59ms ± 24%168 1355 005
UnnestUpdate3.785ms ± 4%234 9852 028
UnnestUpsert4.235ms ± 6%234 9852 028
BatchExecUpdate7.044ms ± 1%495 3155 032
BatchExecUpsert7.004ms ± 8%503 3165 032

ReadyToTouch

Описане збереження онлайну вже використовується на сайті readytotouch.com.

Заради цікавості протестував на різних VPS з Intel та AMD, всі результати в репозиторії.

👍ПодобаєтьсяСподобалось16
До обраногоВ обраному1
LinkedIn

17 коментарів

Підписатись на коментаріВідписатись від коментарів Коментарі можуть залишати тільки користувачі з підтвердженими акаунтами.
UPDATE user_online AS to_t
SET online = from_t.online
FROM (
SELECT unnest(ARRAY[9, 10]) AS user_id,
unnest(ARRAY[TIMESTAMP ’2023-08-07 15:09:00′, TIMESTAMP ’2023-08-07 15:10:00′]) AS online
) AS from_t
WHERE to_t.user_id = from_t.user_id;

На моє розуміння більш природньо записати це у такому вигляді

UPDATE user_online AS to_t
SET online = from_t.online
FROM (
            SELECT *
               FROM unnest(ARRAY[9, 10],  ARRAY[TIMESTAMP '2023-08-07 15:09:00', TIMESTAMP '2023-08-07 15:10:00']) AS t(user_id, online)
    ) AS from_t
WHERE to_t.user_id = from_t.user_id;

Такий варіант справді виглядає простіше

UPDATE user_online AS to_t
SET online = from_t.online
FROM unnest(ARRAY [9, 10],
            ARRAY [TIMESTAMP '2023-08-07 15:09:00', TIMESTAMP '2023-08-07 15:10:00']) AS from_t (user_id, online)
WHERE to_t.user_id = from_t.user_id;
-- name: UserOnlineUnnestUpdate :exec
UPDATE user_online AS to_t
SET online = from_t.online
FROM unnest(@user_ids::BIGINT[],
            @onlines::TIMESTAMP[]) AS from_t (user_id, online)
WHERE to_t.user_id = from_t.user_id;

Але при спробі згенерувати код отримую помилку:

function unnest(unknown, unknown) does not exist

В github.com/sqlc-dev/sqlc поки відсутня повна підтримка unnest тому доводиться трохи ускладнювати

Ваш запит відрізняється від мого, зверніть увагу. Фактично у вашому Update виконується підзапит

SELECT unnest(ARRAY[...], ARRAY[...])

При такому використанні unnest може мати тільки 1 аргумент. В моєму кейсі підзапит виглядає так

SELECT *
FROM unnest(ARRAY[...], ARRAY[...]) AS t(user_id, online)

При такому використанні unnest може мати більше 1 аргумента

UNNEST працює як треба у всіх прикладах, це sqlc відстає

Цікава стаття. Ще було б цікаво глянути якісь тести на оптимальну кількість рядочків в одному батчі

Це ж ще залежить від розміру самого рядка тому подібні тести варто проводити вже з реальними даними

Саме цікаве у цих перформанс тестах — що буде у хайлі конкарент середі, коли ті самі строки оновлюються іншими сессіями.

Наприклад, у тій ж Postgre/mssql/oracle є merge statement, який теж дуже швидко робить балк інсерт чи апдейт. Але швидко якщо нема конкаренсі, а якщо є, то ой як не швидко :)

Очікую, що для TxLoop, Unnest та BatchExec буде схоже співвідношення при паралельному виконанні

Якщо вас цікавлять конкретні результати то є репозиторій github.com/doutivity/research-online-postgres-go для експериментів

топчик дякую ❤️ це прям раз в 5 можна прискорити апдейти багато де

на Кложе щоб це глобально на проект в 200к зробити потрібно раз в 10 меньше коду, лол 😊

Зробіть технічну частину й я оформлю у статтю

Теоретично можна заробити за написання технічних статей англійською мовою на Vultr Introduction to Vultr Creator Program, але щось вони ще налаштовують

Ніколи не чув про unnest, дякс, бенчкарки з деталями імплементації — топчик! поділився з колегами :) Thanks

Ех, premature optimization detected

Стаття кльова

Підписатись на коментарі