Go ClickHouse example. Зберігаємо статистику

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті.

Привіт, мене звати Ярослав, прийшовши в компанію Evrius, отримав завдання: розробити gRPC-мікросервіс на Go для збереження статистики в ClickHouse. Через півроку написав статтю про gRPC, логічно ж через пару років написати і про ClickHouse.

Стаття буде про швидкість збереження в ClickHouse та організацію процесу збереження. Стаття класичної структури від створення проєкту з docker-compose до посилання на код репозиторію.

Ілюстрація Марії Рибак

Опис задачі

Уявімо, що ми почали працювати над ще одним проєктом анонімного пошуку роботи (поки всі домени yeews.com, yaaws.com, yaaws.co, yaaws.io та yaaws.ai доступні для купівлі), такі проєкти зараз популярні (yaaws is yet another anonym work search).

І нам треба зберігати в статистику різноманітні події:

  • Онлайн.
  • Перегляд вакансії.
  • Відгук на вакансію.

Саме цим будемо далі займатись в статті. Ми розглянемо саме збереження подій, але щоб побачити повну картину, треба пофантазувати, яку статистику буде цікаво побачити користувачам.

На основі подій онлайну можна вивести графік активності по годинам: одна крива лінія по онлайну кандидатів та майже пряма лінія по онлайну рекрутерів. Буде загальний графік онлайну для України та для кожного міста, включно з Ялтою.

Налаштування проєкту

Я створю Go проєкт, заверну в docker, підключу ClickHouse та напишу простий тест збереження в БД.

Для створення Go проєкту є команда go mod init:

go mod init gitlab.com/go-yp/go-clickhouse-example
go: creating new go.mod: module gitlab.com/go-yp/go-clickhouse-example
tree .
└── go.mod
cat go.mod
module gitlab.com/go-yp/go-clickhouse-example

go 1.16

Ще додам файл main.go:

package main

import "fmt"

func main() {
	fmt.Println("Привіт, світе!")
}
Тепер підготую docker-compose.yml який буде мати опис контейнеру з Go та опис контейнеру з ClickHouse:
version: "2"

services:
  app:
    container_name: "yaaws_app"
    image: golang:1.16.1-alpine
    volumes:
      - .:/go/src/gitlab.com/go-yp/go-clickhouse-example
      - ./docker/golang/.ash_history:/root/.ash_history:ro
      - go-modules:/go/pkg/mod # Put modules cache into a separate volume
    command: "sleep 365d"
    working_dir: "/go/src/gitlab.com/go-yp/go-clickhouse-example"
    env_file:
      - .env
    links:
      - clickhouse
    depends_on:
      - clickhouse

  clickhouse:
    container_name: "yaaws_clickhouse"
    image: yandex/clickhouse-server:21.7.5.29
#    ports:
#      - "8123:8123"
#      - "9000:9000"

volumes:
  go-modules: # Define the volume
Тепер підніму контейнери та перевірю їх готовність до роботи:
sudo docker-compose up -d
sudo docker exec yaaws_app go run main.go
Привіт, світе!
sudo docker exec yaaws_clickhouse clickhouse-client --query "SELECT VERSION();"
21.7.5.29
sudo docker-compose down

Як бачимо, команди успішно виконались в контейнерах та вивели очікувані результати.

Простий тест для збереження в ClickHouse

Я підключу до проєкту нову офіційну Go-бібліотеку для роботи з ClickHouse github.com/ClickHouse/clickhouse-go, в тесті створю табличку для збереження онлайну. Буду зберігати онлайн пачками по 10_000 записів, перевірю, що в базі правильне число записів.

Коли я тільки починав розробляти свій перший мікросервіс на Go, ще на початку 2019 року, то підключив стару офіційну Go бібліотеку від ClickHouse, яка працювала дуже повільно: з того, що пам’ятаю, то бенчмарки показували швидкість вставки 10_000 записів в секунду (вставляло 5 пачок по 2000 в кожній), що мало. Поспілкувавшись з колегою, дізнався, що в інших мікросервісах використовується бібліотека github.com/kshvakov/clickhouse, яка виявилась значно продуктивнішою.

В ClickHouse прийняли правильне рішення та зробили офіційною github.com/kshvakov/clickhouse, а тому й ми будемо використовувати нову офіційну бібліотеку.

Опис таблиці для збереження онлайну:

CREATE TABLE user_online_source_statistics
(
    date    Date,
    time    DateTime,
    user_id UInt32,
    role    UInt8 -- employee = 1, employer = 2
) ENGINE = MergeTree()
      PARTITION BY date
      ORDER BY user_id;
Рушій MergeTree використовується для збереження сирої статистики.
Очищення таблиці в коді:
package main

import "database/sql"

const (
	// language=SQL
	createTableSQL = `CREATE TABLE user_online_source_statistics
(
    date    Date,
    time    DateTime,
    user_id UInt32,
    role    UInt8
) ENGINE = MergeTree()
      PARTITION BY date
      ORDER BY user_id;`

	// language=SQL
	dropTableSQL = `DROP TABLE IF EXISTS user_online_source_statistics;`
)

func resetTable(connect *sql.DB) error {
	if _, err := connect.Exec(dropTableSQL); err != nil {
		return err
	}

	if _, err := connect.Exec(createTableSQL); err != nil {
		return err
	}

	return nil
}
Створення фікстур:
type UserOnlineEvent struct {
	Time   int32
	UserID uint32
	Role   uint8
}
package main

import "time"

func userOnlineEventFixtures(count int) []UserOnlineEvent {
	var result = make([]UserOnlineEvent, count)

	var now = int32(time.Now().Unix())

	for i := 0; i < count; i++ {
		result[i] = UserOnlineEvent{
			Time:   now,
			UserID: uint32(i),
			Role:   role(i),
		}
	}

	return result
}

func role(index int) uint8 {
	const (
		employee = 1
		employer = 2
	)

	if index&1 == 1 {
		return employee
	}

	return employer
}

Функція для збереження статистики в БД:

package main

import (
	"database/sql"
	"fmt"
	"log"
)

const (
	// language=SQL
	insertSQL = `INSERT INTO user_online_source_statistics (date, time, user_id, role)
VALUES (?, ?, ?, ?);`
)

func StoreUserOnlineStatistics(connect *sql.DB, users []UserOnlineEvent) (err error) {
	return execTx(connect, func(tx *sql.Tx) error {
		stmt, stmtErr := tx.Prepare(insertSQL)
		if stmtErr != nil {
			return stmtErr
		}
		defer stmt.Close()

		for _, user := range users {
			_, insertErr := stmt.Exec(
				user.Time,
				user.Time,
				user.UserID,
				user.Role,
			)

			if insertErr != nil {
				log.Printf("insert err %+v", insertErr)

				continue
			}
		}

		return nil
	})
}

func execTx(connect *sql.DB, fn func(*sql.Tx) error) error {
	var tx, txErr = connect.Begin()
	if txErr != nil {
		return txErr
	}

	var err = fn(tx)

	if err != nil {
		if rbErr := tx.Rollback(); rbErr != nil {
			return fmt.Errorf("tx err: %v, rb err: %v", err, rbErr)
		}
		return err
	}

	return tx.Commit()
}

А тепер використання цього всього в бенчмарку:

package main

import (
	"database/sql"
	_ "github.com/ClickHouse/clickhouse-go"
	"sync/atomic"
	"testing"
	"time"
)

func BenchmarkUserOnlineStatisticsStore(b *testing.B) {
	const (
		packSize = 10000
	)

	var startTime = time.Now()

	connect, err := sql.Open("clickhouse", "tcp://clickhouse:9000?database=yaaws")
	if err != nil {
		b.Fatal(err)
	}

	if err := connect.Ping(); err != nil {
		b.Fatal(err)
	}

	if err := resetTable(connect); err != nil {
		b.Fatal(err)
	}

	var fixtures = userOnlineEventFixtures(packSize)

	var count uint32 = 0

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			var storeErr = StoreUserOnlineStatistics(connect, fixtures)
			if storeErr != nil {
				b.Error(storeErr)

				continue
			}

			atomic.AddUint32(&count, 1)
		}
	})

	b.Logf("inserted count %10d by %6d %s", count*packSize, count, time.Since(startTime))
}
sudo docker exec yaaws_clickhouse clickhouse-client --query "CREATE DATABASE yaaws;"

Усе готово до запуску бенчмарка, запускаю на 100 секунд:

sudo docker exec yaaws_app go test ./examples/001-clickhouse-user-online-statistics-benchmark/... -v -bench=. -benchmem -benchtime=100s
goos: linux
goarch: amd64
pkg: gitlab.com/go-yp/go-clickhouse-example/examples/001-clickhouse-user-online-statistics-benchmark
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkUserOnlineStatisticsStore
    user_online_statistics_test.go:49: inserted count      10000 by      1 3.756016629s
    user_online_statistics_test.go:49: inserted count    1000000 by    100 250.611035ms
    user_online_statistics_test.go:49: inserted count  100000000 by  10000 22.209494737s
    user_online_statistics_test.go:49: inserted count  540960000 by  54096 2m11.718269866s
BenchmarkUserOnlineStatisticsStore-12    	   54096	   2420397 ns/op	 2850018 B/op	   69824 allocs/op
PASS
ok  	gitlab.com/go-yp/go-clickhouse-example/examples/001-clickhouse-user-online-statistics-benchmark	157.961s
sudo docker exec yaaws_clickhouse clickhouse-client --query "SELECT COUNT() FROM yaaws.user_online_source_statistics;"
540_960_000

Тест показав швидкість вставки 4_000_000 записів в секунду.

Якщо у вас є зауваження до тесту чи побажання, то пишіть в коментарях, оновлю та додам нові результати, так вже робив в попередніх статтях. Або вам цікаво, скільки займає таблиця зараз, або скільки буде займати, якщо усі user_id будуть унікальні, то теж таке питайте в коментарях, поспілкуймось!

Організація процесу збереження

В ClickHouse ефективно зберігати пачками тому перед збереженням події потрібно накопичувати в буфері. Проста реалізація буферу:

package main

type UserOnlineEvent struct {
	Time   int32
	UserID uint32
	Role   uint8
}
package main

import "sync"

const (
	packSize = 10000
)

type UserOnlineEventBuffer struct {
	current       []UserOnlineEvent
	complete      [][]UserOnlineEvent
	currentLength int
	mu            sync.Mutex
}

func NewUserOnlineEventBuffer() *UserOnlineEventBuffer {
	return &UserOnlineEventBuffer{}
}

func (b *UserOnlineEventBuffer) Add(event UserOnlineEvent) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.current = append(b.current, event)
	b.currentLength += 1

	if b.isCurrentPackFull() {
		b.completeCurrentPack()
	}
}

func (b *UserOnlineEventBuffer) GetAndClear() [][]UserOnlineEvent {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.completeCurrentPack()

	var result = b.complete
	b.complete = nil
	return result
}

func (b *UserOnlineEventBuffer) isCurrentPackFull() bool {
	return b.currentLength >= packSize
}

func (b *UserOnlineEventBuffer) completeCurrentPack() {
	if b.currentLength > 0 {
		b.complete = append(b.complete, b.current)

		b.current = make([]UserOnlineEvent, 0, packSize)

		b.currentLength = 0
	}
}
package main

import (
	"github.com/stretchr/testify/require"
	"sync/atomic"
	"testing"
)

func BenchmarkUserOnlineEventBuffer(b *testing.B) {
	var buffer = NewUserOnlineEventBuffer()

	var expectedCount uint32 = 0

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			var i = atomic.AddUint32(&expectedCount, 1)

			buffer.Add(UserOnlineEvent{
				Time:   int32(i),
				UserID: uint32(i),
				Role:   uint8(i % 2),
			})
		}
	})

	var (
		packs       = buffer.GetAndClear()
		actualCount = 0
	)

	for _, pack := range packs {
		actualCount += len(pack)
	}

	require.Equal(b, b.N, actualCount)
}
go test ./examples/002-buffer/... -v -bench=. -benchmem
BenchmarkUserOnlineEventBuffer
BenchmarkUserOnlineEventBuffer-12    	17010046	        68.42 ns/op	      12 B/op	       0 allocs/op
PASS
ok  	gitlab.com/go-yp/go-clickhouse-example/examples/002-buffer	1.248s

Щоб доповнити картину, потрібен компонент Storage, який буде зберігати в БД, а також компонент Worker, який буде щохвилини брати дані з буферу та передавати в Storage.

Storage вже бачили раніше:

package main

import (
	"database/sql"
	"fmt"
	"log"
)

type UserOnlineEventStorage struct {
	db *sql.DB
}

func NewUserOnlineEventStorage(db *sql.DB) *UserOnlineEventStorage {
	return &UserOnlineEventStorage{db: db}
}

func (s *UserOnlineEventStorage) Store(events []UserOnlineEvent) (err error) {
	const (
		// language=SQL
		insertSQL = `INSERT INTO user_online_source_statistics (date, time, user_id, role)
VALUES (?, ?, ?, ?);`
	)

	return execTx(s.db, func(tx *sql.Tx) error {
		stmt, stmtErr := tx.Prepare(insertSQL)
		if stmtErr != nil {
			return stmtErr
		}
		defer stmt.Close()

		for _, user := range events {
			_, insertErr := stmt.Exec(
				user.Time,
				user.Time,
				user.UserID,
				user.Role,
			)

			if insertErr != nil {
				log.Printf("insert err %+v", insertErr)

				continue
			}
		}

		return nil
	})
}

Worker буде складнішим:

package main

import (
	"sync"
	"time"
)

type UserOnlineEventWorker struct {
	buffer         *UserOnlineEventBuffer
	storage        *UserOnlineEventStorage
	flushInterval  time.Duration
	mu             sync.Mutex
	shutdownSignal chan struct{}
	shutdown       bool
}

func NewUserOnlineEventWorker(buffer *UserOnlineEventBuffer, storage *UserOnlineEventStorage, flushInterval time.Duration) *UserOnlineEventWorker {
	var worker = &UserOnlineEventWorker{
		buffer:         buffer,
		storage:        storage,
		flushInterval:  flushInterval,
		shutdownSignal: make(chan struct{}),
	}

	go worker.run()

	return worker
}

// FlushAndClose
func (w *UserOnlineEventWorker) Shutdown() {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.shutdown {
		w.store()

		return
	}

	w.shutdown = true

	w.shutdownSignal <- struct{}{}
	<-w.shutdownSignal

	close(w.shutdownSignal)

	w.store()
}

func (w *UserOnlineEventWorker) run() {
	ticker := time.NewTicker(w.flushInterval)

	for {
		select {
		case <-ticker.C:
			w.store()

		case <-w.shutdownSignal:
			ticker.Stop()

			w.shutdownSignal <- struct{}{}

			return
		}
	}
}

func (w *UserOnlineEventWorker) store() {
	packs := w.buffer.GetAndClear()

	if len(packs) == 0 {
		return
	}

	for _, pack := range packs {
		err := w.storage.Store(pack)

		if err != nil {
			// log error
		}
	}
}

А тепер об’єднаємо всі компоненти разом:

package main

import (
	"database/sql"
	_ "github.com/ClickHouse/clickhouse-go"
	"sync/atomic"
	"testing"
	"time"
)

func BenchmarkUserOnlineEvent(b *testing.B) {
	var startTime = time.Now()

	connect, err := sql.Open("clickhouse", "tcp://clickhouse:9000?database=yaaws")
	if err != nil {
		b.Fatal(err)
	}

	// ...

	var (
		buffer  = NewUserOnlineEventBuffer()
		storage = NewUserOnlineEventStorage(connect)
		worker  = NewUserOnlineEventWorker(buffer, storage, time.Second)
	)

	var counter = uint32(0)

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			var i = atomic.AddUint32(&counter, 1)

			buffer.Add(UserOnlineEvent{
				Time:   int32(i),
				UserID: uint32(i),
				Role:   uint8(i % 2),
			})
		}
	})

	// for graceful shutdown
	worker.Shutdown()

	b.Logf("inserted count %10d %s", b.N, time.Since(startTime))
}
go test ./examples/002-buffer/... -v -bench=BenchmarkUserOnlineEvent$ -benchmem -benchtime=50s
BenchmarkUserOnlineEvent
    user_online_event_test.go:50: inserted count          1 302.102019ms
    user_online_event_test.go:50: inserted count        100 54.727407ms
    user_online_event_test.go:50: inserted count      10000 63.394769ms
    user_online_event_test.go:50: inserted count    1000000 1.55245884s
    user_online_event_test.go:50: inserted count   39230679 36.666568388s
    user_online_event_test.go:50: inserted count   64229607 58.031452997s
BenchmarkUserOnlineEvent-12          	64229607	       900.9 ns/op	     296 B/op	       7 allocs/op
PASS
ok  	gitlab.com/go-yp/go-clickhouse-example/examples/002-buffer	161.734s

Організація процесу збереження, масштабування

В попередньому прикладі ми працювали з конкретним типом, але для кожного типу події потрібно буде написати свій Buffer, Storage та Worker, що буде дублюванням коду, бо поки в Go відсутні дженерики.

Щоб повторно використовувати написану логіку (без дублювання коду), можна винести відмінності в інтерфейс. Для цього розглянемо метод збереження в БД:

func (s *UserOnlineEventStorage) Store(events []UserOnlineEvent) (err error) {
	const (
		// language=SQL
		insertSQL = `INSERT INTO user_online_source_statistics (date, time, user_id, role)
VALUES (?, ?, ?, ?);`
	)

	return execTx(s.db, func(tx *sql.Tx) error {
		stmt, stmtErr := tx.Prepare(insertSQL)
		if stmtErr != nil {
			return stmtErr
		}
		defer stmt.Close()

		for _, user := range events {
			_, insertErr := stmt.Exec(
				user.Time,
				user.Time,
				user.UserID,
				user.Role,
			)

			if insertErr != nil {
				log.Printf("insert err %+v", insertErr)

				continue
			}
		}

		return nil
	})
}

Щоб зробити метод універсальним, потрібно винести SQL та поля в інтерфейс:

type Entity interface {
	Query() string
	Values() []interface{}
}
type UserOnlineEventEntity struct {
	UserOnlineEvent
}

func (e UserOnlineEventEntity) Query() string {
	const (
		// language=SQL
		insertSQL = `INSERT INTO user_online_source_statistics (date, time, user_id, role)
VALUES (?, ?, ?, ?);`
	)

	return insertSQL
}

func (e *UserOnlineEventEntity) Values() []interface{} {
	return []interface{}{
		e.Time,
		e.Time,
		e.UserID,
		e.Role,
	}
}
type Storage struct {
	db *sql.DB
}

func NewStorage(db *sql.DB) *Storage {
	return &Storage{db: db}
}

func (s *Storage) Store(entities []Entity) (err error) {
	if len(entities) == 0 {
		return nil
	}

	var (
		entity    = entities[0]
		insertSQL = entity.Query()
	)

	return execTx(s.db, func(tx *sql.Tx) error {
		stmt, stmtErr := tx.Prepare(insertSQL)
		if stmtErr != nil {
			return stmtErr
		}
		defer stmt.Close()

		for _, entity := range entities {
			_, insertErr := stmt.Exec(entity.Values()...)

			if insertErr != nil {
				log.Printf("insert err %+v", insertErr)

				continue
			}
		}

		return nil
	})
}

В Buffer-і та Worker-і зміню конкретний тип на інтерфейс Entity та запущу тест:

func BenchmarkUserOnlineEvent(b *testing.B) {
	var startTime = time.Now()

	connect, err := sql.Open("clickhouse", "tcp://clickhouse:9000?database=yaaws")
	if err != nil {
		b.Fatal(err)
	}

	// ...

	var (
		buffer  = NewBuffer()
		storage = NewStorage(connect)
		worker  = NewWorker(buffer, storage, time.Second)
	)

	var counter = uint32(0)

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			var i = atomic.AddUint32(&counter, 1)

			var event = UserOnlineEvent{
				Time:   int32(i),
				UserID: uint32(i),
				Role:   uint8(i % 2),
			}

			buffer.Add(&UserOnlineEventEntity{event})
		}
	})

	// for graceful shutdown
	worker.Shutdown()

	b.Logf("inserted count %10d %s", b.N, time.Since(startTime))
}
go test ./examples/003-entity-interface/... -v -bench=BenchmarkUserOnlineEvent$ -benchmem -benchtime=50s
BenchmarkUserOnlineEvent
    user_online_event_test.go:53: inserted count          1 24.790717ms
    user_online_event_test.go:53: inserted count        100 16.887622ms
    user_online_event_test.go:53: inserted count      10000 49.692264ms
    user_online_event_test.go:53: inserted count    1000000 1.001504459s
    user_online_event_test.go:53: inserted count   60448701 1m0.404778045s
BenchmarkUserOnlineEvent-12    	60448701	       998.9 ns/op	     380 B/op	       9 allocs/op
PASS
ok  	gitlab.com/go-yp/go-clickhouse-example/examples/003-entity-interface	61.667s

Якщо у вас є повний доступ до коду, то методи Query та Values інтерфейсу Entity краще додати до структури UserOnlineEvent напряму, без обгорток.

Більше подій

Розглянемо події перегляд вакансії та відгук на вакансію як приклад повторного використання коду:

CREATE TABLE user_vacancy_view_source_stats
(
    date       Date,
    time       DateTime,
    user_id    UInt32,
    vacancy_id UInt32
) ENGINE = MergeTree()
      PARTITION BY date
      ORDER BY tuple();
CREATE TABLE user_vacancy_apply_source_stats
(
    date       Date,
    time       DateTime,
    user_id    UInt32,
    vacancy_id UInt32
) ENGINE = MergeTree()
      PARTITION BY date
      ORDER BY tuple();

З ORDER BY tuple() вставка в таблицю буде без додаткового сортування, що пришвидшить вставку в порівнянні з ORDER BY user_id чи ORDER BY (user_id, vacancy_id).

package events

type VacancyViewEvent struct {
	Time      int32
	UserID    uint32
	VacancyID uint32
}
package entities

import "gitlab.com/go-yp/go-clickhouse-example/examples/004-more-events/events"

type VacancyViewEventEntity struct {
	events.VacancyViewEvent
}

func (e VacancyViewEventEntity) Query() string {
	const (
		// language=SQL
		insertSQL = `INSERT INTO user_vacancy_view_source_stats (date, time, user_id, vacancy_id)
VALUES (?, ?, ?, ?);`
	)

	return insertSQL
}

func (e *VacancyViewEventEntity) Values() []interface{} {
	return []interface{}{
		e.Time,
		e.Time,
		e.UserID,
		e.VacancyID,
	}
}
func BenchmarkEvents(b *testing.B) {
	var startTime = time.Now()

	connect, err := sql.Open("clickhouse", "tcp://clickhouse:9000?database=yaaws")
	if err != nil {
		b.Fatal(err)
	}

	// ...

	if err := userOnlineResetTable(connect); err != nil {
		b.Fatal(err)
	}

	if err := vacancyViewResetTable(connect); err != nil {
		b.Fatal(err)
	}

	if err := vacancyApplyResetTable(connect); err != nil {
		b.Fatal(err)
	}

	var (
		storage            = NewStorage(connect)
		userOnlineBuffer   = NewBuffer()
		userOnlineWorker   = NewWorker(userOnlineBuffer, storage, time.Second)
		vacancyViewBuffer  = NewBuffer()
		vacancyViewWorker  = NewWorker(vacancyViewBuffer, storage, time.Second)
		vacancyApplyBuffer = NewBuffer()
		vacancyApplyWorker = NewWorker(vacancyApplyBuffer, storage, time.Second)
	)

	var counter = uint32(0)

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			var i = atomic.AddUint32(&counter, 1)

			// user online
			{
				var event = events.UserOnlineEvent{
					Time:   int32(i),
					UserID: uint32(i),
					Role:   uint8(i % 2),
				}

				userOnlineBuffer.Add(&entities.UserOnlineEventEntity{event})
			}

			// vacancy view
			{
				var event = events.VacancyViewEvent{
					Time:      int32(i),
					UserID:    uint32(i),
					VacancyID: uint32(i % 1024),
				}

				vacancyViewBuffer.Add(&entities.VacancyViewEventEntity{event})
			}

			// vacancy apply
			{
				var event = events.VacancyApplyEvent{
					Time:      int32(i),
					UserID:    uint32(i),
					VacancyID: uint32(i % 1024),
				}

				vacancyApplyBuffer.Add(&entities.VacancyApplyEventEntity{event})
			}
		}
	})

	// for graceful shutdown
	userOnlineWorker.Shutdown()
	vacancyViewWorker.Shutdown()
	vacancyApplyWorker.Shutdown()

	b.Logf("inserted count %10d %s", 3*b.N, time.Since(startTime))
}
go test ./examples/004-more-events/... -v -bench=. -benchmem -benchtime=50s
BenchmarkEvents
    events_test.go:91: inserted count          3 56.929436ms
    events_test.go:91: inserted count        300 48.357479ms
    events_test.go:91: inserted count      30000 83.768898ms
    events_test.go:91: inserted count    3000000 1.263198844s
    events_test.go:91: inserted count  147202194 59.571394682s
BenchmarkEvents-12    	49067398	      1214 ns/op	    1292 B/op	      11 allocs/op

Для кожного Worker-а можна налаштувати свій інтервал збереження.

Важливо зберігати кожну подію у свій буфер.

Збереження агрегованої статистики

В попередніх прикладах ми зберігали сирі дані, ось приклади агрегуючих запитів по сирим даним:

SELECT date, toStartOfHour(time), role, COUNT(DISTINCT (user_id)) AS unique_user_count
FROM user_online_source_statistics
WHERE date > today() - 7
GROUP BY date, toStartOfHour(time), role
ORDER BY date, toStartOfHour(time);
date toStartOfHour(time) role unique_user_count
2021-07-29 2021-07-29 00:00:00 2 1800
2021-07-29 2021-07-29 00:00:00 1 1800
2021-07-29 2021-07-29 01:00:00 1 1800
2021-07-29 2021-07-29 01:00:00 2 1800
2021-07-29 2021-07-29 02:00:00 1 1800
SELECT date, toStartOfHour(time), COUNT(DISTINCT user_id, vacancy_id) unique_views
FROM user_vacancy_view_source_stats
WHERE date > today() - 7
GROUP BY date, toStartOfHour(time)
ORDER BY date, toStartOfHour(time);
date toStartOfHour(time) unique_views
2021-07-29 2021-07-29 00:00:00 3600
2021-07-29 2021-07-29 01:00:00 3600
2021-07-29 2021-07-29 02:00:00 3600
2021-07-29 2021-07-29 03:00:00 3600
2021-07-29 2021-07-29 04:00:00 3600

Коли сирих даних багато, то агрегуючі запити виконуються повільно та навантажують ClickHouse.

У ClickHouse є спеціальний рушій SummingMergeTree, який при вставці агрегує дані за заданомим ключем, відповідно розмір такої таблиці менший, а запити виконуються швидше.

Розглянемо SummingMergeTree на прикладі:

CREATE TABLE user_vacancy_hour_stats
(
    date        Date,
    time        DateTime,
    user_id     UInt32,
    vacancy_id  UInt32,
    view_count  UInt64,
    apply_count UInt64
) ENGINE = SummingMergeTree()
      PARTITION BY date
      ORDER BY (date, time, user_id, vacancy_id);
INSERT INTO user_vacancy_hour_stats (date, time, user_id, vacancy_id, view_count, apply_count)
VALUES ('2021-08-02', '2021-08-02 21:00:00', 3, 5, 1, 1),
       ('2021-08-02', '2021-08-02 21:00:00', 3, 5, 1, 1),
       ('2021-08-02', '2021-08-02 21:00:00', 7, 8, 1, 1);

INSERT INTO user_vacancy_hour_stats (date, time, user_id, vacancy_id, view_count, apply_count)
VALUES (1627940000, 1627940000, 3, 5, 1, 1),
       (1627940000, 1627940000, 3, 5, 1, 1),
       (1627940000, 1627940000, 7, 8, 1, 1);

Після вставки в таблицю user_vacancy_hour_stats, рядки з однаковим ключем, в цьому випадку ORDER BY (date, time, user_id, vacancy_id), об’єднуються в один, а їх значення сумуються.

Виглядає це так:

SELECT date, time, user_id, vacancy_id, view_count, apply_count
FROM user_vacancy_hour_stats;
date time user_id vacancy_id view_count apply_count
2021-08-02 2021-08-02 21:00:00 3 5 2 2
2021-08-02 2021-08-02 21:00:00 7 8 1 1
2021-08-02 2021-08-02 21:00:00 3 5 2 2
2021-08-02 2021-08-02 21:00:00 7 8 1 1

Об’єднання відбувається з затримкою, а тому краще явно робити запити:

SELECT date, time, user_id, vacancy_id, SUM(view_count), SUM(apply_count)
FROM user_vacancy_hour_stats
GROUP BY date, time, user_id, vacancy_id;
date time user_id vacancy_id sum(view_count) sum(apply_count)
2021-08-02 2021-08-02 21:00:00 7 8 2 2
2021-08-02 2021-08-02 21:00:00 3 5 4 4

Зверніть увагу, що в поле time вставляємо вже округлене значення.

Варіанти збереження агрегованої статистики

Перший: ми можемо на стороні сервера, в даному випадку Go, зберігати подію перегляд вакансії одночасно в сиру статистику user_vacancy_view_source_stats та агреговану user_vacancy_hour_stats.

Другий: на стороні серверу зберігати тільки в сиру статистику, а на стороні ClickHouse створити materialized view, який при вставці сирих даних в таблицю user_vacancy_view_source_stats буде додатково зберігати в агрегуючу таблицю user_vacancy_hour_stats.

Збереження агрегуючої статистики на стороні серверу

На стороні серверу можна робити попередню обробку даних: фільтрацію та агрегацію. Так, події перегляд вакансії та відгук на вакансію, можна додатково зберігати в спільний буфер, щоб потім зберегти в user_vacancy_hour_stats.

CREATE TABLE user_vacancy_hour_stats
(
    date        Date,
    time        DateTime,
    user_id     UInt32,
    vacancy_id  UInt32,
    view_count  UInt64,
    apply_count UInt64
) ENGINE = SummingMergeTree()
      PARTITION BY date
      ORDER BY (date, time, user_id, vacancy_id);

Для того, щоб дві різні події зберігати в один буфер, їх треба привести до однієї:

package entities

type (
	// ORDER BY (date, time, user_id, vacancy_id)
	VacancyHourKey struct {
		Time      int32
		UserID    uint32
		VacancyID uint32
	}

	VacancyHourValue struct {
		ViewCount  uint64
		ApplyCount uint64
	}

	VacancyHour struct {
		VacancyHourKey
		VacancyHourValue
	}
)

func (v VacancyHour) Query() string {
	const (
		// language=SQL
		insertSQL = `INSERT INTO user_vacancy_hour_stats (date, time, user_id, vacancy_id, view_count, apply_count)
VALUES (?, ?, ?, ?, ?, ?);`
	)

	return insertSQL
}

func (v *VacancyHour) Values() []interface{} {
	return []interface{}{
		v.Time,
		v.Time,
		v.UserID,
		v.VacancyID,
		v.ViewCount,
		v.ApplyCount,
	}
}

func (sum VacancyHourValue) Merge(value VacancyHourValue) VacancyHourValue {
	return VacancyHourValue{
		ViewCount:  sum.ViewCount + value.ViewCount,
		ApplyCount: sum.ApplyCount + value.ApplyCount,
	}
}
package summing

import (
	"gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/entities"
	"gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/events"
)

func VacancyView(source events.VacancyViewEvent) (entities.VacancyHourKey, entities.VacancyHourValue) {
	return entities.VacancyHourKey{
			Time:      roundHour(source.Time),
			UserID:    source.UserID,
			VacancyID: source.VacancyID,
		}, entities.VacancyHourValue{
			ViewCount:  1,
			ApplyCount: 0,
		}
}

func VacancyApply(source events.VacancyApplyEvent) (entities.VacancyHourKey, entities.VacancyHourValue) {
	return entities.VacancyHourKey{
			Time:      roundHour(source.Time),
			UserID:    source.UserID,
			VacancyID: source.VacancyID,
		}, entities.VacancyHourValue{
			ViewCount:  0,
			ApplyCount: 1,
		}
}

func roundHour(s int32) int32 {
	return s / 3600 * 3600
}

Як бачимо, VacancyHour реалізує інтерфейс Entity.

Для VacancyHour потрібен новий буфер, який реалізує серверну агрегацію, яка за логікою схожа на SummingMergeTree:

package buffer

import (
	"gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/entities"
	"gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/events"
	"gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/summing"
	"sync"
)

type VacancyHourBuffer struct {
	current       map[entities.VacancyHourKey]entities.VacancyHourValue
	complete      []map[entities.VacancyHourKey]entities.VacancyHourValue
	currentLength int
	mu            sync.Mutex
}

func NewVacancyHourBuffer() *VacancyHourBuffer {
	return &VacancyHourBuffer{
		current: make(map[entities.VacancyHourKey]entities.VacancyHourValue),
	}
}

func (b *VacancyHourBuffer) AddVacancyView(event events.VacancyViewEvent) {
	b.add(summing.VacancyView(event))
}

func (b *VacancyHourBuffer) AddVacancyApply(event events.VacancyApplyEvent) {
	b.add(summing.VacancyApply(event))
}

func (b *VacancyHourBuffer) add(key entities.VacancyHourKey, value entities.VacancyHourValue) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if sum, exists := b.current[key]; exists {
		b.current[key] = sum.Merge(value)
	} else {
		b.current[key] = value

		b.currentLength += 1
	}

	if b.isCurrentPackFull() {
		b.completeCurrentPack()
	}
}

func (b *VacancyHourBuffer) GetAndClear() [][]entities.Entity {
	var complete = b.getAndClear()

	var result = make([][]entities.Entity, 0, len(complete))

	for _, keyValueMap := range complete {
		var pack = make([]entities.Entity, 0, len(keyValueMap))

		for key, value := range keyValueMap {
			pack = append(pack, &entities.VacancyHour{key, value})
		}

		result = append(result, pack)
	}

	return result
}

func (b *VacancyHourBuffer) getAndClear() []map[entities.VacancyHourKey]entities.VacancyHourValue {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.completeCurrentPack()

	var complete = b.complete
	b.complete = nil

	return complete
}

func (b *VacancyHourBuffer) isCurrentPackFull() bool {
	return b.currentLength >= packSize
}

func (b *VacancyHourBuffer) completeCurrentPack() {
	if b.currentLength > 0 {
		b.complete = append(b.complete, b.current)

		b.current = make(map[entities.VacancyHourKey]entities.VacancyHourValue, len(b.current))

		b.currentLength = 0
	}
}

А тепер, як це буде виглядати разом:

package buffer

import (
	"gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/entities"
	"gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/events"
)

type VacancyProxyBuffer struct {
	vacancyViewBuffer  *Buffer
	vacancyApplyBuffer *Buffer
	vacancyHourBuffer  *VacancyHourBuffer
}

func NewVacancyProxyBuffer(vacancyViewBuffer *Buffer, vacancyApplyBuffer *Buffer, vacancyHourBuffer *VacancyHourBuffer) *VacancyProxyBuffer {
	return &VacancyProxyBuffer{
		vacancyViewBuffer:  vacancyViewBuffer,
		vacancyApplyBuffer: vacancyApplyBuffer,
		vacancyHourBuffer:  vacancyHourBuffer,
	}
}

func (b *VacancyProxyBuffer) AddVacancyView(event events.VacancyViewEvent) {
	b.vacancyViewBuffer.Add(&entities.VacancyViewEventEntity{event})

	b.vacancyHourBuffer.AddVacancyView(event)
}

func (b *VacancyProxyBuffer) AddVacancyApply(event events.VacancyApplyEvent) {
	b.vacancyApplyBuffer.Add(&entities.VacancyApplyEventEntity{event})

	b.vacancyHourBuffer.AddVacancyApply(event)
}
package main

import (
	"database/sql"
	_ "github.com/ClickHouse/clickhouse-go"
	"github.com/stretchr/testify/require"
	"gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/buffer"
	"gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/events"
	"sync/atomic"
	"testing"
	"time"
)

func BenchmarkEvents(b *testing.B) {
	var startTime = time.Now()

	connect, err := sql.Open("clickhouse", "tcp://clickhouse:9000?database=yaaws")
	require.NoError(b, err)

	require.NoError(b, connect.Ping())

	require.NoError(b, vacancyViewResetTable(connect))
	require.NoError(b, vacancyApplyResetTable(connect))
	require.NoError(b, vacancyHourResetTable(connect))

	var (
		storage = NewStorage(connect)

		vacancyViewBuffer = buffer.NewBuffer()
		vacancyViewWorker = NewWorker(vacancyViewBuffer, storage, time.Second)

		vacancyApplyBuffer = buffer.NewBuffer()
		vacancyApplyWorker = NewWorker(vacancyApplyBuffer, storage, time.Second)

		vacancyHourBuffer = buffer.NewVacancyHourBuffer()
		vacancyHourWorker = NewWorker(vacancyHourBuffer, storage, time.Second)

		vacancyProxyBuffer = buffer.NewVacancyProxyBuffer(
			vacancyViewBuffer,
			vacancyApplyBuffer,
			vacancyHourBuffer,
		)
	)

	var counter = uint32(0)

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			var i = atomic.AddUint32(&counter, 1)

			{
				var event = events.VacancyViewEvent{
					Time:      int32(i),
					UserID:    uint32(i % 32),
					VacancyID: uint32(i % 64),
				}

				vacancyProxyBuffer.AddVacancyView(event)
			}

			{
				var event = events.VacancyApplyEvent{
					Time:      int32(i),
					UserID:    uint32(i % 32),
					VacancyID: uint32(i % 64),
				}

				vacancyProxyBuffer.AddVacancyApply(event)
			}
		}
	})

	// for graceful shutdown
	vacancyViewWorker.Shutdown()
	vacancyApplyWorker.Shutdown()
	vacancyHourWorker.Shutdown()

	b.Logf("inserted count ~ %10d %s", 3*b.N, time.Since(startTime))
}

Тепер кожна подія записується в два буфери, один для сирої статистики, а другий для агрегованої.

Агрегування на стороні Go, до збереження в БД, може зменшити навантаження на БД. За потреби можна писати сиру статистику в одну БД yaaws, а агреговану в другу yaaws_aggs. В залежності від навантаження можна взагалі перестати писати сиру статистику і писати лише агреговану.

MATERIALIZED VIEW

Щоб показати, як працює MATERIALIZED VIEW, ми будемо зберігати сирі дані у три різні таблиці, а агрегована статистика буде зберігатись в четверту таблиці за допомогою MATERIALIZED VIEW.

Опишу три таблиці для сирої статистики:

CREATE TABLE user_vacancy_view_source_stats
(
    date       Date,
    time       DateTime,
    user_id    UInt32,
    vacancy_id UInt32
) ENGINE = Null(); -- /dev/null
CREATE TABLE user_vacancy_apply_source_stats
(
    date       Date,
    time       DateTime,
    user_id    UInt32,
    vacancy_id UInt32
) ENGINE = MergeTree()
      PARTITION BY date
      ORDER BY tuple();
CREATE TABLE user_vacancy_hired_source_stats
(
    date       Date,
    time       DateTime,
    user_id    UInt32,
    vacancy_id UInt32
) ENGINE = MergeTree()
      PARTITION BY date
      ORDER BY tuple();

Агрегуюча таблиця:

CREATE TABLE user_vacancy_hour_stats
(
    date        Date,
    time        DateTime,
    user_id     UInt32,
    vacancy_id  UInt32,
    view_count  UInt64,
    apply_count UInt64,
    hired_count UInt64
) ENGINE = SummingMergeTree()
      PARTITION BY date
      ORDER BY (date, time, user_id, vacancy_id);

Тепер 3 MATERIALIZED VIEW для кожної таблиці з сирою статистикою:

CREATE MATERIALIZED VIEW _mv_user_vacancy_view_source_stats
    TO user_vacancy_hour_stats
AS
SELECT date,
       toStartOfHour(time) AS time,       -- naming matters
       user_id,
       vacancy_id,
       toUInt64(COUNT(*))  AS view_count, -- naming matters
       toUInt64(0)         AS apply_count -- naming matters
FROM user_vacancy_view_source_stats
GROUP BY date, time, user_id, vacancy_id; -- with GROUP BY
CREATE MATERIALIZED VIEW _mv_user_vacancy_apply_source_stats
    TO user_vacancy_hour_stats
AS
SELECT date,
       toStartOfHour(time) AS time,       -- naming matters
       user_id,
       vacancy_id,
       toUInt64(0)         AS view_count, -- naming matters
       toUInt64(COUNT(*))  AS apply_count -- naming matters
FROM user_vacancy_apply_source_stats
GROUP BY date, time, user_id, vacancy_id; -- with GROUP BY
CREATE MATERIALIZED VIEW _mv_user_vacancy_hired_source_stats
    TO user_vacancy_hour_stats
AS
SELECT date,
       toStartOfHour(time) AS time,       -- naming matters
       user_id,
       vacancy_id,
       toUInt64(1)         AS hired_count -- naming matters
FROM user_vacancy_hired_source_stats; -- without GROUP BY

Зробимо вставку в кожну сиру таблицю:

INSERT INTO user_vacancy_view_source_stats (date, time, user_id, vacancy_id)
SELECT 1627940000 + number, 1627940000 + number, number % 1000, number % 2000
FROM system.numbers
LIMIT 8000000;
-- completed in 4 s 731 ms
INSERT INTO user_vacancy_apply_source_stats (date, time, user_id, vacancy_id)
SELECT 1627940000 + number * 16, 1627940000 + number * 16, number % 1000, number % 2000
FROM system.numbers
LIMIT 500000;
-- completed in 327 ms
INSERT INTO user_vacancy_hired_source_stats (date, time, user_id, vacancy_id)
SELECT 1627940000 + number * 400, 1627940000 + number * 400, number % 1000, number % 2000
FROM system.numbers
LIMIT 20000;
-- completed in 15 ms

А тепер перевіримо результат:

SELECT COUNT(*),
       COUNT(DISTINCT (date)),
       COUNT(DISTINCT (time)),
       SUM(view_count),
       SUM(apply_count),
       SUM(hired_count)
FROM user_vacancy_hour_stats;
count() uniqExact(date) uniqExact(time) sum(view_count) sum(apply_count) sum(hired_count)
4445600 94 2223 8000000 500000 20000
SELECT time, COUNT(*), SUM(view_count), SUM(apply_count), SUM(hired_count)
FROM user_vacancy_hour_stats
GROUP BY time
ORDER BY time DESC
LIMIT 5;
time count() sum(view_count) sum(apply_count) sum(hired_count)
2021-11-03 11:00:00 2000 2800 175 7
2021-11-03 10:00:00 2000 3600 225 9
2021-11-03 09:00:00 2000 3600 225 9
2021-11-03 08:00:00 2000 3600 225 9
2021-11-03 07:00:00 2000 3600 225 9

Як бачимо, таблиця user_vacancy_hour_stats містить правильні значення.

Епілог

У цій статті ми розглянули лише три рушії MergeTree, SummingMergeTree та Null. ClickHouse має значно більше рушіїв: ReplacingMergeTree, CollapsingMergeTree, AggregatingMergeTree та інші. Також ClickHouse має інтеграції з MySQL, PostgresSQL та MongoDB.

Якщо вам було мало тексту, то можете почитати враження Володимира Рожкова про роботу з ClickHouse.

І звісно, репозиторій на GitLab gitlab.com/go-yp/go-clickhouse-example.

Про рекламу вакансій

Зазвичай при написанні статті, коли я вже знаю, що буду публікувати, то шукаю компанію, яка готова прорекламувати в статті свої вакансії з Go, якщо вакансія виглядає привабливою (LGTM), то заходжу на сторінку компанії на DOU, шукаю контакти та пишу пропозицію.

У компанії Softwarium є три вакансії на DOU:

Мені подобається, що у вакансіях вказана винагорода, етапи співбесіди, опис проєкту а також склад команди, з опису видно, що вони дійсно шукають. Стосовно вакансій пишіть Юлії.

👍НравитсяПонравилось5
В избранноеВ избранном7
LinkedIn
Допустимые теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Допустимые теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

Где сожно посмотреть структурные диаграммы реальных проектов на Го?
Чтобы было видно, какие есть рутины, через какие каналы они коммуницируют, и чем эта сеть вся занимается. Типа такого www.chromium.org/...​ulti-process-architecture
Акторы/каналы хорошо на словах, а в реальности там начинаются вьюшки и оркестрация, как только логика зависит от состояния большей части системы. Пытаюсь узнать, для каких типов задач оно хорошо подходит.

Краще створи окрему тему і можливо там отримаєш відповідь, бо я частіше бачу діаграми БД ніж проєктів

Через півроку написав статтю про gRPC

Жаль что не заметил сразу ту статью, но в любом случае мне там невозможно коментить. Ну так вот, в статье ещё не хватает примера стримингового метода, со стримингов входящим или исходящим. Или чата. Ещё считаю, что многим пригодится рассмотреть пример балансировки нагрузки, это несложно реализуется. Ну и докер-файл по клиенту и серверу, и docker-compose, чтоб всё сразу запустить.

Как-то недавно сделал тестовое задание для одной конторы, там как раз пример с gRPC. Пацаны сказали, что тестовое очень понравилось, они в первый раз такое проработанное видят, но насчёт сотрудничества тем не менее не договорились. Вот оно:
github.com/schwarzlichtbezirk/dfs
Общий смысл в том, чтобы загружать через сервис какие-либо файлы, которые будут разделяться на равные куски, и эти куски храниться на нодах. Думаю, что код будет всем интересен, кто учит gRPC под Go. Там и стриминг есть.

Я почав писати статтю Приклад gRPC-мікросервісу на Go для збереження статистики в ClickHouse ще в середині червня, там хочу описати: gRPC streaming, graceful shutdown та тестування.

Але в процесі написання статті зрозумів, що стаття буде великою, а тому частину з ClickHouse виділив в меншу статтю Go ClickHouse example. Зберігаємо статистику.

Хочу написати про github.com/kyleconroy/sqlc, Aerospike та продовження статті про JSON.

Якщо є бажання та час допомогти з написанням статей то приєднуйся.

gRPC streaming, graceful shutdown та тестування.

Кстати, насчёт graceful shutdown — это довольно важный вопрос, который почему-то большинство начисто игнорируют. Корректное завершение нужно прежде всего для сохранения целостности данных в БД, и записи файлов.

То есть, приложение должно ловить SIGTERM и SIGINT, и корректно завершать слушающие потоки. Кроме того, транзакции, где есть последовательность каких-то запросов надо лочить через waitgroup, и после завершения потоков дожидаться конца отработки транзакций. Дальше записывать несохранённые данные. И всё это должно укладываться в интервал времени, отведённый для завершения работы.

Ещё есть нюанс в том, что под вендой SIGTERM не посылается для приложений, скомпиленных под Go 1.13 и ранних версий, потом это пофиксили. Ну и в докере приложение нужно запускать без шелла, чтоб приходил сигнал.

graceful shutdown

На дворе шел 21й век.

Это классика, вроде как исключения в деструкторах.

Классика для 80х разве что.
Все РДБМС и половину NoSql поддерживают durability из acid. Это не твоя головная боль, это боль базы данных привести в порядок все свои файлы и стартануть без сбоев даже если электричество пропало.

Ты как говнокодер не должен писать такой код. Во-первых ты его нормально не напишешь, он не простой. Во-вторых проблемы сбоев электричества, сбоев оси, хардвар и тд редко но случаются. И до твоего грейсфул кода дело просто не дойдет.

Вы не совсем уловили суть. Допустим, выполняется транзакция. По ходу выполнения осуществляется ряд последовательных апдейтов в базу. Предположим, один апдейт списывает деньги с одного аккаунта, а второй — добавляет на другой аккаунт. И для примера, это делается за 2 запроса с каким-то расчётом между ними. Если приложение пропускает SIGTERM, потом оно просто дропнется, 1-ый апдейт в базу выполнится, а 2-ой — нет. В результате нарушается логическая целостность данных. То что сама база будет в порядке — ещё ничего не значит. Ну а остановка приложения случается куда чаще, чем сбои электричества, железа и т.п. Наша задача как говнокодеров обеспечить корректную работу приложения.

Вообще-то смысл транзакции в атомарности операции. Если списывают и зачисляют в одной и той же транзакции, то сбой оборудования в середине приведет к откату всех операций в транзакции автоматом. Вам не нужно вмешиваться в работу СУБД.

Абсолютно с вами согласен. И таких атомарных операций, о выполнении которых на стороне СУБД не стоит беспокоиться — может быть несколько на стороне приложения втечении 1 транзакции. А может и с несколькими СУБД, и с другими сервисами, от которых зависит приложение. Задача в том, чтоб обеспечить атомарность транзакции на стороне этого приложения.

Это называется «распределенная транзакция» и требует очереди сообщений с гарантированной доставкой и всякую хрень вроде оркестрации.

Есть один минус — страна производства ClickHouse. И нет, это не ради холивара

Російсько-українська війна (з 2014).

ClickHouse має відкритий код.

Можна спробувати LocustDB (Rust) як альтернативу ClickHouse, але для LocustDB потрібно буде писати клієнт на Go.

От того что он «має відкритий код» никому не лучше. Основные коммиты туда идут от россиян. Если не нужен sql, то уж лучше Elasticsearch

Я в статті описав свій досвід роботи з ClickHouse для популяризації Go в Україні, додатково поширив в групі GolangUA.

Так, Московія веде війну проти України, щоб повернути тимчасово окуповані території потрібна сильна економіка, я вважаю що популяризація Go сприяє збільшенню винагород а отже позитивно впливає на економіку України.

Я поважаю вашу думку і вважаю ваш коментар корисним а також вважаю що українська мова допомагає Україні захищатись від Московської агресії.

ClickHouse написаний московитами, але ми можемо його використовувати для розвитку проєктів.

Українською будь ласка!

Не только этот недостаток.
Это вообще достаточно спартанская база, без транзакций, без джоинов, без возможности удаления данных и с возможностью создать и использовать на таблице всего один (!) кластерный индекс.
Она подходит только если данных действительно много и на все остальные параметры уже какбэ наплевать.

без джоинов, без возможности удаления данных

С

с возможностью создать и использовать на таблице всего один (!) кластерный индекс.

Скипиндексы год назад были. В BigQuery например и того нет

Она подходит только если данных действительно много и на все остальные параметры уже какбэ наплевать.

На то собственно и затачивался

Ну и много ручных операций, что как бы серьёзный минус.

Есть джоины
Есть удаление и апдейты (ALTER DELETE/UPDATE/MODIFY)

без транзакций

А зачем вам транзакции в аналитической БД?

Есть джоины

Можно пример джоинов ? (Может уже добавили конечно, но я об этом не знаю)

Есть удаление и апдейты (ALTER DELETE/UPDATE/MODIFY)

Для этого нужно понимать как колоночные хранилища работают.
Удаление и апдейт крайне тяжёлая операция. Фактически ребилд колонки или ее большого сегмента
Кроме того в случае кликхауса и к инсертам есть вопросы. Они там работают на приемлемой скорости только пачками, единичные инсерты работают медленно.

А зачем вам транзакции в аналитической БД?

Чтобы держать базу в консистентном состоянии. Кэп.

Можно пример джоинов

clickhouse.tech/...​t/join/#select-join-types

Для этого нужно понимать как колоночные хранилища работают.
Удаление модификация и апдейт крайне тяжёлая операция.

Это отменяет тот факт что эту операцию можно провести?

Кроме того в случае кликхауса и к инсертам есть вопросы. Они там работают на приемлемой скорости только пачками, единичные инсерты работают медленно.

Есть выход — не делайте вставку построчно.

Нужно вообще понимать, нужна ли под ваши задачи колоночная БД со всеми её недостатками? Если не нужна, то зачем её использовать?

clickhouse.tech/...​t/join/#select-join-types

Теперь осталось подтвердить, что на джоинах и внешних индексах «не тормозит».
Но вот что-то мне подсказывает что теперь тормозит. Да тормозит еще похлеще чем в MS SQL/Oracle построчных движках.

Это отменяет тот факт что эту операцию можно провести?

Говорят на роликах можно провести операцию доезжания до хабаровска.
Стоит ли теперь ролики квалифицировать как транспорт международного сообщения ?

Есть выход — не делайте вставку построчно.

А причем здесь построчно если у вас колоночное хранилище ? Вы еще не поняли с чем вы имеете дело ?

Нужно вообще понимать, нужна ли под ваши задачи колоночная БД со всеми её недостатками? Если не нужна, то зачем её использовать?

Нужно вообще понимать, что в той функциональности которую предоставляет кликхаус, его ближайший конкурент это запись и чтение данных в файлик ридером. Если не ленится будет еще +15%-20% фрагов к лозунгу «не тормозит».
Собственно это то как Яндекс метрики и начинались. Зачем писать а базу, будем писать в файл

Хорошая статья. Очень понятно описаны сложные вещи.
Я бы хотел прокомментировать момент с сортировкой по `tuple()`. С такой сортировкой возможно выиграете со скоростью вставки, но проиграете со сжатием и эффективностью выборок. Я бы делал `ORDER BY (date, time)` и добавил бы партиционирование по `toYYYYMM(date)`. Партиции по дням, как у вас, будут генерить очень много файлов и дальнейшие вставки могут быть более медленными (Оптимально не более 100 партиций на таблицу). Туда же и семплирование можно докрутить по дате.

Я бы делал `ORDER BY (date, time)` и добавил бы партиционирование по `toYYYYMM(date)`. Партиции по дням, как у вас, будут генерить очень много файлов и дальнейшие вставки могут быть более медленными (Оптимально не более 100 партиций на таблицу).

или отдельные таблицы для горячих/архивных данных и для каждой свой шаг партиционирования — daily/monthly
вариантов много

Очень грамотный материал, спасибо огромное !

Подписаться на комментарии