Go ClickHouse example. Зберігаємо статистику
Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті
Привіт, мене звати Ярослав, прийшовши в компанію Evrius, отримав завдання: розробити gRPC-мікросервіс на Go для збереження статистики в ClickHouse. Через півроку написав статтю про gRPC, логічно ж через пару років написати і про ClickHouse.
Стаття буде про швидкість збереження в ClickHouse та організацію процесу збереження. Стаття класичної структури від створення проєкту з docker-compose до посилання на код репозиторію.
Ілюстрація Марії Рибак
Опис задачі
Уявімо, що ми почали працювати над ще одним проєктом анонімного пошуку роботи (поки всі домени yeews.com, yaaws.com, yaaws.co, yaaws.io та yaaws.ai доступні для купівлі), такі проєкти зараз популярні (yaaws is yet another anonym work search).
І нам треба зберігати в статистику різноманітні події:
- Онлайн.
- Перегляд вакансії.
- Відгук на вакансію.
Саме цим будемо далі займатись в статті. Ми розглянемо саме збереження подій, але щоб побачити повну картину, треба пофантазувати, яку статистику буде цікаво побачити користувачам.
На основі подій онлайну можна вивести графік активності по годинам: одна крива лінія по онлайну кандидатів та майже пряма лінія по онлайну рекрутерів. Буде загальний графік онлайну для України та для кожного міста, включно з Ялтою.
Налаштування проєкту
Я створю Go проєкт, заверну в docker, підключу ClickHouse та напишу простий тест збереження в БД.
Для створення Go проєкту є команда go mod init:
go mod init gitlab.com/go-yp/go-clickhouse-example
go: creating new go.mod: module gitlab.com/go-yp/go-clickhouse-example
tree .
└── go.mod
cat go.mod
module gitlab.com/go-yp/go-clickhouse-example go 1.16
Ще додам файл main.go:
package main import "fmt" func main() { fmt.Println("Привіт, світе!") }
version: "2" services: app: container_name: "yaaws_app" image: golang:1.16.1-alpine volumes: - .:/go/src/gitlab.com/go-yp/go-clickhouse-example - ./docker/golang/.ash_history:/root/.ash_history:ro - go-modules:/go/pkg/mod # Put modules cache into a separate volume command: "sleep 365d" working_dir: "/go/src/gitlab.com/go-yp/go-clickhouse-example" env_file: - .env links: - clickhouse depends_on: - clickhouse clickhouse: container_name: "yaaws_clickhouse" image: yandex/clickhouse-server:21.7.5.29 # ports: # - "8123:8123" # - "9000:9000" volumes: go-modules: # Define the volume
sudo docker-compose up -d
sudo docker exec yaaws_app go run main.go
Привіт, світе!
sudo docker exec yaaws_clickhouse clickhouse-client --query "SELECT VERSION();"
21.7.5.29
sudo docker-compose down
Як бачимо, команди успішно виконались в контейнерах та вивели очікувані результати.
Простий тест для збереження в ClickHouse
Я підключу до проєкту нову офіційну Go-бібліотеку для роботи з ClickHouse github.com/ClickHouse/clickhouse-go, в тесті створю табличку для збереження онлайну. Буду зберігати онлайн пачками по 10_000 записів, перевірю, що в базі правильне число записів.
Коли я тільки починав розробляти свій перший мікросервіс на Go, ще на початку 2019 року, то підключив стару офіційну Go бібліотеку від ClickHouse, яка працювала дуже повільно: з того, що пам’ятаю, то бенчмарки показували швидкість вставки 10_000 записів в секунду (вставляло 5 пачок по 2000 в кожній), що мало. Поспілкувавшись з колегою, дізнався, що в інших мікросервісах використовується бібліотека github.com/kshvakov/clickhouse, яка виявилась значно продуктивнішою.
В ClickHouse прийняли правильне рішення та зробили офіційною github.com/kshvakov/clickhouse, а тому й ми будемо використовувати нову офіційну бібліотеку.
Опис таблиці для збереження онлайну:
CREATE TABLE user_online_source_statistics ( date Date, time DateTime, user_id UInt32, role UInt8 -- employee = 1, employer = 2 ) ENGINE = MergeTree() PARTITION BY date ORDER BY user_id;
Очищення таблиці в коді:
package main import "database/sql" const ( // language=SQL createTableSQL = `CREATE TABLE user_online_source_statistics ( date Date, time DateTime, user_id UInt32, role UInt8 ) ENGINE = MergeTree() PARTITION BY date ORDER BY user_id;` // language=SQL dropTableSQL = `DROP TABLE IF EXISTS user_online_source_statistics;` ) func resetTable(connect *sql.DB) error { if _, err := connect.Exec(dropTableSQL); err != nil { return err } if _, err := connect.Exec(createTableSQL); err != nil { return err } return nil }
type UserOnlineEvent struct { Time int32 UserID uint32 Role uint8 }
package main import "time" func userOnlineEventFixtures(count int) []UserOnlineEvent { var result = make([]UserOnlineEvent, count) var now = int32(time.Now().Unix()) for i := 0; i < count; i++ { result[i] = UserOnlineEvent{ Time: now, UserID: uint32(i), Role: role(i), } } return result } func role(index int) uint8 { const ( employee = 1 employer = 2 ) if index&1 == 1 { return employee } return employer }
Функція для збереження статистики в БД:
package main import ( "database/sql" "fmt" "log" ) const ( // language=SQL insertSQL = `INSERT INTO user_online_source_statistics (date, time, user_id, role) VALUES (?, ?, ?, ?);` ) func StoreUserOnlineStatistics(connect *sql.DB, users []UserOnlineEvent) (err error) { return execTx(connect, func(tx *sql.Tx) error { stmt, stmtErr := tx.Prepare(insertSQL) if stmtErr != nil { return stmtErr } defer stmt.Close() for _, user := range users { _, insertErr := stmt.Exec( user.Time, user.Time, user.UserID, user.Role, ) if insertErr != nil { log.Printf("insert err %+v", insertErr) continue } } return nil }) } func execTx(connect *sql.DB, fn func(*sql.Tx) error) error { var tx, txErr = connect.Begin() if txErr != nil { return txErr } var err = fn(tx) if err != nil { if rbErr := tx.Rollback(); rbErr != nil { return fmt.Errorf("tx err: %v, rb err: %v", err, rbErr) } return err } return tx.Commit() }
А тепер використання цього всього в бенчмарку:
package main import ( "database/sql" "sync/atomic" "testing" "time" _ "github.com/ClickHouse/clickhouse-go" ) func BenchmarkUserOnlineStatisticsStore(b *testing.B) { const ( packSize = 10000 ) var startTime = time.Now() connect, err := sql.Open("clickhouse", "tcp://clickhouse:9000?database=yaaws") if err != nil { b.Fatal(err) } if err := connect.Ping(); err != nil { b.Fatal(err) } if err := resetTable(connect); err != nil { b.Fatal(err) } var fixtures = userOnlineEventFixtures(packSize) var count uint32 = 0 b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { var storeErr = StoreUserOnlineStatistics(connect, fixtures) if storeErr != nil { b.Error(storeErr) continue } atomic.AddUint32(&count, 1) } }) b.Logf("inserted count %10d by %6d %s", count*packSize, count, time.Since(startTime)) }
sudo docker exec yaaws_clickhouse clickhouse-client --query "CREATE DATABASE yaaws;"
Усе готово до запуску бенчмарка, запускаю на 100 секунд:
sudo docker exec yaaws_app go test ./examples/001-clickhouse-user-online-statistics-benchmark/... -v -bench=. -benchmem -benchtime=100s
goos: linux goarch: amd64 pkg: gitlab.com/go-yp/go-clickhouse-example/examples/001-clickhouse-user-online-statistics-benchmark cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz BenchmarkUserOnlineStatisticsStore user_online_statistics_test.go:49: inserted count 10000 by 1 3.756016629s user_online_statistics_test.go:49: inserted count 1000000 by 100 250.611035ms user_online_statistics_test.go:49: inserted count 100000000 by 10000 22.209494737s user_online_statistics_test.go:49: inserted count 540960000 by 54096 2m11.718269866s BenchmarkUserOnlineStatisticsStore-12 54096 2420397 ns/op 2850018 B/op 69824 allocs/op PASS ok gitlab.com/go-yp/go-clickhouse-example/examples/001-clickhouse-user-online-statistics-benchmark 157.961s
sudo docker exec yaaws_clickhouse clickhouse-client --query "SELECT COUNT() FROM yaaws.user_online_source_statistics;"
540_960_000
Тест показав швидкість вставки 4_000_000 записів в секунду.
Якщо у вас є зауваження до тесту чи побажання, то пишіть в коментарях, оновлю та додам нові результати, так вже робив в попередніх статтях. Або вам цікаво, скільки займає таблиця зараз, або скільки буде займати, якщо усі user_id будуть унікальні, то теж таке питайте в коментарях, поспілкуймось!
Організація процесу збереження
В ClickHouse ефективно зберігати пачками тому перед збереженням події потрібно накопичувати в буфері. Проста реалізація буферу:
package main type UserOnlineEvent struct { Time int32 UserID uint32 Role uint8 }
package main import "sync" const ( packSize = 10000 ) type UserOnlineEventBuffer struct { current []UserOnlineEvent complete [][]UserOnlineEvent currentLength int mu sync.Mutex } func NewUserOnlineEventBuffer() *UserOnlineEventBuffer { return &UserOnlineEventBuffer{} } func (b *UserOnlineEventBuffer) Add(event UserOnlineEvent) { b.mu.Lock() defer b.mu.Unlock() b.current = append(b.current, event) b.currentLength += 1 if b.isCurrentPackFull() { b.completeCurrentPack() } } func (b *UserOnlineEventBuffer) GetAndClear() [][]UserOnlineEvent { b.mu.Lock() defer b.mu.Unlock() b.completeCurrentPack() var result = b.complete b.complete = nil return result } func (b *UserOnlineEventBuffer) isCurrentPackFull() bool { return b.currentLength >= packSize } func (b *UserOnlineEventBuffer) completeCurrentPack() { if b.currentLength > 0 { b.complete = append(b.complete, b.current) b.current = make([]UserOnlineEvent, 0, packSize) b.currentLength = 0 } }
package main import ( "sync/atomic" "testing" "github.com/stretchr/testify/require" ) func BenchmarkUserOnlineEventBuffer(b *testing.B) { var buffer = NewUserOnlineEventBuffer() var expectedCount uint32 = 0 b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { var i = atomic.AddUint32(&expectedCount, 1) buffer.Add(UserOnlineEvent{ Time: int32(i), UserID: uint32(i), Role: uint8(i % 2), }) } }) var ( packs = buffer.GetAndClear() actualCount = 0 ) for _, pack := range packs { actualCount += len(pack) } require.Equal(b, b.N, actualCount) }
go test ./examples/002-buffer/... -v -bench=. -benchmem
BenchmarkUserOnlineEventBuffer BenchmarkUserOnlineEventBuffer-12 17010046 68.42 ns/op 12 B/op 0 allocs/op PASS ok gitlab.com/go-yp/go-clickhouse-example/examples/002-buffer 1.248s
Щоб доповнити картину, потрібен компонент Storage, який буде зберігати в БД, а також компонент Worker, який буде щохвилини брати дані з буферу та передавати в Storage.
Storage вже бачили раніше:
package main import ( "database/sql" "fmt" "log" ) type UserOnlineEventStorage struct { db *sql.DB } func NewUserOnlineEventStorage(db *sql.DB) *UserOnlineEventStorage { return &UserOnlineEventStorage{db: db} } func (s *UserOnlineEventStorage) Store(events []UserOnlineEvent) (err error) { const ( // language=SQL insertSQL = `INSERT INTO user_online_source_statistics (date, time, user_id, role) VALUES (?, ?, ?, ?);` ) return execTx(s.db, func(tx *sql.Tx) error { stmt, stmtErr := tx.Prepare(insertSQL) if stmtErr != nil { return stmtErr } defer stmt.Close() for _, user := range events { _, insertErr := stmt.Exec( user.Time, user.Time, user.UserID, user.Role, ) if insertErr != nil { log.Printf("insert err %+v", insertErr) continue } } return nil }) }
Worker буде складнішим:
package main import ( "sync" "time" ) type UserOnlineEventWorker struct { buffer *UserOnlineEventBuffer storage *UserOnlineEventStorage flushInterval time.Duration mu sync.Mutex shutdownSignal chan struct{} shutdown bool } func NewUserOnlineEventWorker(buffer *UserOnlineEventBuffer, storage *UserOnlineEventStorage, flushInterval time.Duration) *UserOnlineEventWorker { var worker = &UserOnlineEventWorker{ buffer: buffer, storage: storage, flushInterval: flushInterval, shutdownSignal: make(chan struct{}), } go worker.run() return worker } // FlushAndClose func (w *UserOnlineEventWorker) Shutdown() { w.mu.Lock() defer w.mu.Unlock() if w.shutdown { w.store() return } w.shutdown = true w.shutdownSignal <- struct{}{} <-w.shutdownSignal close(w.shutdownSignal) w.store() } func (w *UserOnlineEventWorker) run() { ticker := time.NewTicker(w.flushInterval) for { select { case <-ticker.C: w.store() case <-w.shutdownSignal: ticker.Stop() w.shutdownSignal <- struct{}{} return } } } func (w *UserOnlineEventWorker) store() { packs := w.buffer.GetAndClear() if len(packs) == 0 { return } for _, pack := range packs { err := w.storage.Store(pack) if err != nil { // log error } } }
А тепер об’єднаємо всі компоненти разом:
package main import ( "database/sql" "sync/atomic" "testing" "time" _ "github.com/ClickHouse/clickhouse-go" ) func BenchmarkUserOnlineEvent(b *testing.B) { var startTime = time.Now() connect, err := sql.Open("clickhouse", "tcp://clickhouse:9000?database=yaaws") if err != nil { b.Fatal(err) } // ... var ( buffer = NewUserOnlineEventBuffer() storage = NewUserOnlineEventStorage(connect) worker = NewUserOnlineEventWorker(buffer, storage, time.Second) ) var counter = uint32(0) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { var i = atomic.AddUint32(&counter, 1) buffer.Add(UserOnlineEvent{ Time: int32(i), UserID: uint32(i), Role: uint8(i % 2), }) } }) // for graceful shutdown worker.Shutdown() b.Logf("inserted count %10d %s", b.N, time.Since(startTime)) }
go test ./examples/002-buffer/... -v -bench=BenchmarkUserOnlineEvent$ -benchmem -benchtime=50s
BenchmarkUserOnlineEvent user_online_event_test.go:50: inserted count 1 302.102019ms user_online_event_test.go:50: inserted count 100 54.727407ms user_online_event_test.go:50: inserted count 10000 63.394769ms user_online_event_test.go:50: inserted count 1000000 1.55245884s user_online_event_test.go:50: inserted count 39230679 36.666568388s user_online_event_test.go:50: inserted count 64229607 58.031452997s BenchmarkUserOnlineEvent-12 64229607 900.9 ns/op 296 B/op 7 allocs/op PASS ok gitlab.com/go-yp/go-clickhouse-example/examples/002-buffer 161.734s
Організація процесу збереження, масштабування
В попередньому прикладі ми працювали з конкретним типом, але для кожного типу події потрібно буде написати свій Buffer, Storage та Worker, що буде дублюванням коду, бо поки в Go відсутні дженерики.
Щоб повторно використовувати написану логіку (без дублювання коду), можна винести відмінності в інтерфейс. Для цього розглянемо метод збереження в БД:
func (s *UserOnlineEventStorage) Store(events []UserOnlineEvent) (err error) { const ( // language=SQL insertSQL = `INSERT INTO user_online_source_statistics (date, time, user_id, role) VALUES (?, ?, ?, ?);` ) return execTx(s.db, func(tx *sql.Tx) error { stmt, stmtErr := tx.Prepare(insertSQL) if stmtErr != nil { return stmtErr } defer stmt.Close() for _, user := range events { _, insertErr := stmt.Exec( user.Time, user.Time, user.UserID, user.Role, ) if insertErr != nil { log.Printf("insert err %+v", insertErr) continue } } return nil }) }
Щоб зробити метод універсальним, потрібно винести SQL та поля в інтерфейс:
type Entity interface { Query() string Values() []interface{} }
type UserOnlineEventEntity struct { UserOnlineEvent } func (e UserOnlineEventEntity) Query() string { const ( // language=SQL insertSQL = `INSERT INTO user_online_source_statistics (date, time, user_id, role) VALUES (?, ?, ?, ?);` ) return insertSQL } func (e *UserOnlineEventEntity) Values() []interface{} { return []interface{}{ e.Time, e.Time, e.UserID, e.Role, } }
type Storage struct { db *sql.DB } func NewStorage(db *sql.DB) *Storage { return &Storage{db: db} } func (s *Storage) Store(entities []Entity) (err error) { if len(entities) == 0 { return nil } var ( entity = entities[0] insertSQL = entity.Query() ) return execTx(s.db, func(tx *sql.Tx) error { stmt, stmtErr := tx.Prepare(insertSQL) if stmtErr != nil { return stmtErr } defer stmt.Close() for _, entity := range entities { _, insertErr := stmt.Exec(entity.Values()...) if insertErr != nil { log.Printf("insert err %+v", insertErr) continue } } return nil }) }
В Buffer-і та Worker-і зміню конкретний тип на інтерфейс Entity та запущу тест:
func BenchmarkUserOnlineEvent(b *testing.B) { var startTime = time.Now() connect, err := sql.Open("clickhouse", "tcp://clickhouse:9000?database=yaaws") if err != nil { b.Fatal(err) } // ... var ( buffer = NewBuffer() storage = NewStorage(connect) worker = NewWorker(buffer, storage, time.Second) ) var counter = uint32(0) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { var i = atomic.AddUint32(&counter, 1) var event = UserOnlineEvent{ Time: int32(i), UserID: uint32(i), Role: uint8(i % 2), } buffer.Add(&UserOnlineEventEntity{event}) } }) // for graceful shutdown worker.Shutdown() b.Logf("inserted count %10d %s", b.N, time.Since(startTime)) }
go test ./examples/003-entity-interface/... -v -bench=BenchmarkUserOnlineEvent$ -benchmem -benchtime=50s
BenchmarkUserOnlineEvent user_online_event_test.go:53: inserted count 1 24.790717ms user_online_event_test.go:53: inserted count 100 16.887622ms user_online_event_test.go:53: inserted count 10000 49.692264ms user_online_event_test.go:53: inserted count 1000000 1.001504459s user_online_event_test.go:53: inserted count 60448701 1m0.404778045s BenchmarkUserOnlineEvent-12 60448701 998.9 ns/op 380 B/op 9 allocs/op PASS ok gitlab.com/go-yp/go-clickhouse-example/examples/003-entity-interface 61.667s
Якщо у вас є повний доступ до коду, то методи Query та Values інтерфейсу Entity краще додати до структури UserOnlineEvent напряму, без обгорток.
Більше подій
Розглянемо події перегляд вакансії та відгук на вакансію як приклад повторного використання коду:
CREATE TABLE user_vacancy_view_source_stats ( date Date, time DateTime, user_id UInt32, vacancy_id UInt32 ) ENGINE = MergeTree() PARTITION BY date ORDER BY tuple();
CREATE TABLE user_vacancy_apply_source_stats ( date Date, time DateTime, user_id UInt32, vacancy_id UInt32 ) ENGINE = MergeTree() PARTITION BY date ORDER BY tuple();
З ORDER BY tuple() вставка в таблицю буде без додаткового сортування, що пришвидшить вставку в порівнянні з ORDER BY user_id чи ORDER BY (user_id, vacancy_id).
package events type VacancyViewEvent struct { Time int32 UserID uint32 VacancyID uint32 }
package entities import "gitlab.com/go-yp/go-clickhouse-example/examples/004-more-events/events" type VacancyViewEventEntity struct { events.VacancyViewEvent } func (e VacancyViewEventEntity) Query() string { const ( // language=SQL insertSQL = `INSERT INTO user_vacancy_view_source_stats (date, time, user_id, vacancy_id) VALUES (?, ?, ?, ?);` ) return insertSQL } func (e *VacancyViewEventEntity) Values() []interface{} { return []interface{}{ e.Time, e.Time, e.UserID, e.VacancyID, } }
func BenchmarkEvents(b *testing.B) { var startTime = time.Now() connect, err := sql.Open("clickhouse", "tcp://clickhouse:9000?database=yaaws") if err != nil { b.Fatal(err) } // ... if err := userOnlineResetTable(connect); err != nil { b.Fatal(err) } if err := vacancyViewResetTable(connect); err != nil { b.Fatal(err) } if err := vacancyApplyResetTable(connect); err != nil { b.Fatal(err) } var ( storage = NewStorage(connect) userOnlineBuffer = NewBuffer() userOnlineWorker = NewWorker(userOnlineBuffer, storage, time.Second) vacancyViewBuffer = NewBuffer() vacancyViewWorker = NewWorker(vacancyViewBuffer, storage, time.Second) vacancyApplyBuffer = NewBuffer() vacancyApplyWorker = NewWorker(vacancyApplyBuffer, storage, time.Second) ) var counter = uint32(0) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { var i = atomic.AddUint32(&counter, 1) // user online { var event = events.UserOnlineEvent{ Time: int32(i), UserID: uint32(i), Role: uint8(i % 2), } userOnlineBuffer.Add(&entities.UserOnlineEventEntity{event}) } // vacancy view { var event = events.VacancyViewEvent{ Time: int32(i), UserID: uint32(i), VacancyID: uint32(i % 1024), } vacancyViewBuffer.Add(&entities.VacancyViewEventEntity{event}) } // vacancy apply { var event = events.VacancyApplyEvent{ Time: int32(i), UserID: uint32(i), VacancyID: uint32(i % 1024), } vacancyApplyBuffer.Add(&entities.VacancyApplyEventEntity{event}) } } }) // for graceful shutdown userOnlineWorker.Shutdown() vacancyViewWorker.Shutdown() vacancyApplyWorker.Shutdown() b.Logf("inserted count %10d %s", 3*b.N, time.Since(startTime)) }
go test ./examples/004-more-events/... -v -bench=. -benchmem -benchtime=50s
BenchmarkEvents events_test.go:91: inserted count 3 56.929436ms events_test.go:91: inserted count 300 48.357479ms events_test.go:91: inserted count 30000 83.768898ms events_test.go:91: inserted count 3000000 1.263198844s events_test.go:91: inserted count 147202194 59.571394682s BenchmarkEvents-12 49067398 1214 ns/op 1292 B/op 11 allocs/op
Для кожного Worker-а можна налаштувати свій інтервал збереження.
Важливо зберігати кожну подію у свій буфер.
Збереження агрегованої статистики
В попередніх прикладах ми зберігали сирі дані, ось приклади агрегуючих запитів по сирим даним:
SELECT date, toStartOfHour(time), role, COUNT(DISTINCT (user_id)) AS unique_user_count FROM user_online_source_statistics WHERE date > today() - 7 GROUP BY date, toStartOfHour(time), role ORDER BY date, toStartOfHour(time);
date | toStartOfHour(time) | role | unique_user_count |
---|---|---|---|
2 | 1800 | ||
1 | 1800 | ||
1 | 1800 | ||
2 | 1800 | ||
1 | 1800 |
SELECT date, toStartOfHour(time), COUNT(DISTINCT user_id, vacancy_id) unique_views FROM user_vacancy_view_source_stats WHERE date > today() - 7 GROUP BY date, toStartOfHour(time) ORDER BY date, toStartOfHour(time);
date | toStartOfHour(time) | unique_views |
---|---|---|
3600 | ||
3600 | ||
3600 | ||
3600 | ||
3600 |
Коли сирих даних багато, то агрегуючі запити виконуються повільно та навантажують ClickHouse.
У ClickHouse є спеціальний рушій SummingMergeTree, який при вставці агрегує дані за заданомим ключем, відповідно розмір такої таблиці менший, а запити виконуються швидше.
Розглянемо SummingMergeTree на прикладі:
CREATE TABLE user_vacancy_hour_stats ( date Date, time DateTime, user_id UInt32, vacancy_id UInt32, view_count UInt64, apply_count UInt64 ) ENGINE = SummingMergeTree() PARTITION BY date ORDER BY (date, time, user_id, vacancy_id);
INSERT INTO user_vacancy_hour_stats (date, time, user_id, vacancy_id, view_count, apply_count) VALUES ('2021-08-02', '2021-08-02 21:00:00', 3, 5, 1, 1), ('2021-08-02', '2021-08-02 21:00:00', 3, 5, 1, 1), ('2021-08-02', '2021-08-02 21:00:00', 7, 8, 1, 1); INSERT INTO user_vacancy_hour_stats (date, time, user_id, vacancy_id, view_count, apply_count) VALUES (1627940000, 1627940000, 3, 5, 1, 1), (1627940000, 1627940000, 3, 5, 1, 1), (1627940000, 1627940000, 7, 8, 1, 1);
Після вставки в таблицю user_vacancy_hour_stats, рядки з однаковим ключем, в цьому випадку ORDER BY (date, time, user_id, vacancy_id), об’єднуються в один, а їх значення сумуються.
Виглядає це так:
SELECT date, time, user_id, vacancy_id, view_count, apply_count FROM user_vacancy_hour_stats;
date | time | user_id | vacancy_id | view_count | apply_count |
---|---|---|---|---|---|
3 | 5 | 2 | 2 | ||
7 | 8 | 1 | 1 | ||
3 | 5 | 2 | 2 | ||
7 | 8 | 1 | 1 |
Об’єднання відбувається з затримкою, а тому краще явно робити запити:
SELECT date, time, user_id, vacancy_id, SUM(view_count), SUM(apply_count) FROM user_vacancy_hour_stats GROUP BY date, time, user_id, vacancy_id;
date | time | user_id | vacancy_id | sum(view_count) | sum(apply_count) |
---|---|---|---|---|---|
7 | 8 | 2 | 2 | ||
3 | 5 | 4 | 4 |
Зверніть увагу, що в поле time вставляємо вже округлене значення.
Варіанти збереження агрегованої статистики
Перший: ми можемо на стороні сервера, в даному випадку Go, зберігати подію перегляд вакансії одночасно в сиру статистику user_vacancy_view_source_stats та агреговану user_vacancy_hour_stats.
Другий: на стороні серверу зберігати тільки в сиру статистику, а на стороні ClickHouse створити materialized view, який при вставці сирих даних в таблицю user_vacancy_view_source_stats буде додатково зберігати в агрегуючу таблицю user_vacancy_hour_stats.
Збереження агрегуючої статистики на стороні серверу
На стороні серверу можна робити попередню обробку даних: фільтрацію та агрегацію. Так, події перегляд вакансії та відгук на вакансію, можна додатково зберігати в спільний буфер, щоб потім зберегти в user_vacancy_hour_stats.
CREATE TABLE user_vacancy_hour_stats ( date Date, time DateTime, user_id UInt32, vacancy_id UInt32, view_count UInt64, apply_count UInt64 ) ENGINE = SummingMergeTree() PARTITION BY date ORDER BY (date, time, user_id, vacancy_id);
Для того, щоб дві різні події зберігати в один буфер, їх треба привести до однієї:
package entities type ( // ORDER BY (date, time, user_id, vacancy_id) VacancyHourKey struct { Time int32 UserID uint32 VacancyID uint32 } VacancyHourValue struct { ViewCount uint64 ApplyCount uint64 } VacancyHour struct { VacancyHourKey VacancyHourValue } ) func (v VacancyHour) Query() string { const ( // language=SQL insertSQL = `INSERT INTO user_vacancy_hour_stats (date, time, user_id, vacancy_id, view_count, apply_count) VALUES (?, ?, ?, ?, ?, ?);` ) return insertSQL } func (v *VacancyHour) Values() []interface{} { return []interface{}{ v.Time, v.Time, v.UserID, v.VacancyID, v.ViewCount, v.ApplyCount, } } func (sum VacancyHourValue) Merge(value VacancyHourValue) VacancyHourValue { return VacancyHourValue{ ViewCount: sum.ViewCount + value.ViewCount, ApplyCount: sum.ApplyCount + value.ApplyCount, } }
package summing import ( "gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/entities" "gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/events" ) func VacancyView(source events.VacancyViewEvent) (entities.VacancyHourKey, entities.VacancyHourValue) { return entities.VacancyHourKey{ Time: roundHour(source.Time), UserID: source.UserID, VacancyID: source.VacancyID, }, entities.VacancyHourValue{ ViewCount: 1, ApplyCount: 0, } } func VacancyApply(source events.VacancyApplyEvent) (entities.VacancyHourKey, entities.VacancyHourValue) { return entities.VacancyHourKey{ Time: roundHour(source.Time), UserID: source.UserID, VacancyID: source.VacancyID, }, entities.VacancyHourValue{ ViewCount: 0, ApplyCount: 1, } } func roundHour(s int32) int32 { return s / 3600 * 3600 }
Як бачимо, VacancyHour реалізує інтерфейс Entity.
Для VacancyHour потрібен новий буфер, який реалізує серверну агрегацію, яка за логікою схожа на SummingMergeTree:
package buffer import ( "sync" "gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/entities" "gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/events" "gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/summing" ) type VacancyHourBuffer struct { current map[entities.VacancyHourKey]entities.VacancyHourValue complete []map[entities.VacancyHourKey]entities.VacancyHourValue currentLength int mu sync.Mutex } func NewVacancyHourBuffer() *VacancyHourBuffer { return &VacancyHourBuffer{ current: make(map[entities.VacancyHourKey]entities.VacancyHourValue), } } func (b *VacancyHourBuffer) AddVacancyView(event events.VacancyViewEvent) { b.add(summing.VacancyView(event)) } func (b *VacancyHourBuffer) AddVacancyApply(event events.VacancyApplyEvent) { b.add(summing.VacancyApply(event)) } func (b *VacancyHourBuffer) add(key entities.VacancyHourKey, value entities.VacancyHourValue) { b.mu.Lock() defer b.mu.Unlock() if sum, exists := b.current[key]; exists { b.current[key] = sum.Merge(value) } else { b.current[key] = value b.currentLength += 1 } if b.isCurrentPackFull() { b.completeCurrentPack() } } func (b *VacancyHourBuffer) GetAndClear() [][]entities.Entity { var complete = b.getAndClear() var result = make([][]entities.Entity, 0, len(complete)) for _, keyValueMap := range complete { var pack = make([]entities.Entity, 0, len(keyValueMap)) for key, value := range keyValueMap { pack = append(pack, &entities.VacancyHour{key, value}) } result = append(result, pack) } return result } func (b *VacancyHourBuffer) getAndClear() []map[entities.VacancyHourKey]entities.VacancyHourValue { b.mu.Lock() defer b.mu.Unlock() b.completeCurrentPack() var complete = b.complete b.complete = nil return complete } func (b *VacancyHourBuffer) isCurrentPackFull() bool { return b.currentLength >= packSize } func (b *VacancyHourBuffer) completeCurrentPack() { if b.currentLength > 0 { b.complete = append(b.complete, b.current) b.current = make(map[entities.VacancyHourKey]entities.VacancyHourValue, len(b.current)) b.currentLength = 0 } }
А тепер, як це буде виглядати разом:
package buffer import ( "gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/entities" "gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/events" ) type VacancyProxyBuffer struct { vacancyViewBuffer *Buffer vacancyApplyBuffer *Buffer vacancyHourBuffer *VacancyHourBuffer } func NewVacancyProxyBuffer(vacancyViewBuffer *Buffer, vacancyApplyBuffer *Buffer, vacancyHourBuffer *VacancyHourBuffer) *VacancyProxyBuffer { return &VacancyProxyBuffer{ vacancyViewBuffer: vacancyViewBuffer, vacancyApplyBuffer: vacancyApplyBuffer, vacancyHourBuffer: vacancyHourBuffer, } } func (b *VacancyProxyBuffer) AddVacancyView(event events.VacancyViewEvent) { b.vacancyViewBuffer.Add(&entities.VacancyViewEventEntity{event}) b.vacancyHourBuffer.AddVacancyView(event) } func (b *VacancyProxyBuffer) AddVacancyApply(event events.VacancyApplyEvent) { b.vacancyApplyBuffer.Add(&entities.VacancyApplyEventEntity{event}) b.vacancyHourBuffer.AddVacancyApply(event) }
package main import ( "database/sql" "sync/atomic" "testing" "time" "gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/buffer" "gitlab.com/go-yp/go-clickhouse-example/examples/005-summing/events" "github.com/stretchr/testify/require" _ "github.com/ClickHouse/clickhouse-go" ) func BenchmarkEvents(b *testing.B) { var startTime = time.Now() connect, err := sql.Open("clickhouse", "tcp://clickhouse:9000?database=yaaws") require.NoError(b, err) require.NoError(b, connect.Ping()) require.NoError(b, vacancyViewResetTable(connect)) require.NoError(b, vacancyApplyResetTable(connect)) require.NoError(b, vacancyHourResetTable(connect)) var ( storage = NewStorage(connect) vacancyViewBuffer = buffer.NewBuffer() vacancyViewWorker = NewWorker(vacancyViewBuffer, storage, time.Second) vacancyApplyBuffer = buffer.NewBuffer() vacancyApplyWorker = NewWorker(vacancyApplyBuffer, storage, time.Second) vacancyHourBuffer = buffer.NewVacancyHourBuffer() vacancyHourWorker = NewWorker(vacancyHourBuffer, storage, time.Second) vacancyProxyBuffer = buffer.NewVacancyProxyBuffer( vacancyViewBuffer, vacancyApplyBuffer, vacancyHourBuffer, ) ) var counter = uint32(0) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { var i = atomic.AddUint32(&counter, 1) { var event = events.VacancyViewEvent{ Time: int32(i), UserID: uint32(i % 32), VacancyID: uint32(i % 64), } vacancyProxyBuffer.AddVacancyView(event) } { var event = events.VacancyApplyEvent{ Time: int32(i), UserID: uint32(i % 32), VacancyID: uint32(i % 64), } vacancyProxyBuffer.AddVacancyApply(event) } } }) // for graceful shutdown vacancyViewWorker.Shutdown() vacancyApplyWorker.Shutdown() vacancyHourWorker.Shutdown() b.Logf("inserted count ~ %10d %s", 3*b.N, time.Since(startTime)) }
Тепер кожна подія записується в два буфери, один для сирої статистики, а другий для агрегованої.
Агрегування на стороні Go, до збереження в БД, може зменшити навантаження на БД. За потреби можна писати сиру статистику в одну БД yaaws, а агреговану в другу yaaws_aggs. В залежності від навантаження можна взагалі перестати писати сиру статистику і писати лише агреговану.
MATERIALIZED VIEW
Щоб показати, як працює MATERIALIZED VIEW, ми будемо зберігати сирі дані у три різні таблиці, а агрегована статистика буде зберігатись в четверту таблиці за допомогою MATERIALIZED VIEW.
Опишу три таблиці для сирої статистики:
CREATE TABLE user_vacancy_view_source_stats ( date Date, time DateTime, user_id UInt32, vacancy_id UInt32 ) ENGINE = Null(); -- /dev/null
CREATE TABLE user_vacancy_apply_source_stats ( date Date, time DateTime, user_id UInt32, vacancy_id UInt32 ) ENGINE = MergeTree() PARTITION BY date ORDER BY tuple();
CREATE TABLE user_vacancy_hired_source_stats ( date Date, time DateTime, user_id UInt32, vacancy_id UInt32 ) ENGINE = MergeTree() PARTITION BY date ORDER BY tuple();
Агрегуюча таблиця:
CREATE TABLE user_vacancy_hour_stats ( date Date, time DateTime, user_id UInt32, vacancy_id UInt32, view_count UInt64, apply_count UInt64, hired_count UInt64 ) ENGINE = SummingMergeTree() PARTITION BY date ORDER BY (date, time, user_id, vacancy_id);
Тепер 3 MATERIALIZED VIEW для кожної таблиці з сирою статистикою:
CREATE MATERIALIZED VIEW _mv_user_vacancy_view_source_stats TO user_vacancy_hour_stats AS SELECT date, toStartOfHour(time) AS time, -- naming matters user_id, vacancy_id, toUInt64(COUNT(*)) AS view_count, -- naming matters toUInt64(0) AS apply_count -- naming matters FROM user_vacancy_view_source_stats GROUP BY date, time, user_id, vacancy_id; -- with GROUP BY
CREATE MATERIALIZED VIEW _mv_user_vacancy_apply_source_stats TO user_vacancy_hour_stats AS SELECT date, toStartOfHour(time) AS time, -- naming matters user_id, vacancy_id, toUInt64(0) AS view_count, -- naming matters toUInt64(COUNT(*)) AS apply_count -- naming matters FROM user_vacancy_apply_source_stats GROUP BY date, time, user_id, vacancy_id; -- with GROUP BY
CREATE MATERIALIZED VIEW _mv_user_vacancy_hired_source_stats TO user_vacancy_hour_stats AS SELECT date, toStartOfHour(time) AS time, -- naming matters user_id, vacancy_id, toUInt64(1) AS hired_count -- naming matters FROM user_vacancy_hired_source_stats; -- without GROUP BY
Зробимо вставку в кожну сиру таблицю:
INSERT INTO user_vacancy_view_source_stats (date, time, user_id, vacancy_id) SELECT 1627940000 + number, 1627940000 + number, number % 1000, number % 2000 FROM system.numbers LIMIT 8000000; -- completed in 4 s 731 ms
INSERT INTO user_vacancy_apply_source_stats (date, time, user_id, vacancy_id) SELECT 1627940000 + number * 16, 1627940000 + number * 16, number % 1000, number % 2000 FROM system.numbers LIMIT 500000; -- completed in 327 ms
INSERT INTO user_vacancy_hired_source_stats (date, time, user_id, vacancy_id) SELECT 1627940000 + number * 400, 1627940000 + number * 400, number % 1000, number % 2000 FROM system.numbers LIMIT 20000; -- completed in 15 ms
А тепер перевіримо результат:
SELECT COUNT(*), COUNT(DISTINCT (date)), COUNT(DISTINCT (time)), SUM(view_count), SUM(apply_count), SUM(hired_count) FROM user_vacancy_hour_stats;
count() | uniqExact(date) | uniqExact(time) | sum(view_count) | sum(apply_count) | sum(hired_count) |
---|---|---|---|---|---|
4445600 | 94 | 2223 | 8000000 | 500000 | 20000 |
SELECT time, COUNT(*), SUM(view_count), SUM(apply_count), SUM(hired_count) FROM user_vacancy_hour_stats GROUP BY time ORDER BY time DESC LIMIT 5;
time | count() | sum(view_count) | sum(apply_count) | sum(hired_count) |
---|---|---|---|---|
2000 | 2800 | 175 | 7 | |
2000 | 3600 | 225 | 9 | |
2000 | 3600 | 225 | 9 | |
2000 | 3600 | 225 | 9 | |
2000 | 3600 | 225 | 9 |
Як бачимо, таблиця user_vacancy_hour_stats містить правильні значення.
Епілог
У цій статті ми розглянули лише три рушії MergeTree, SummingMergeTree та Null. ClickHouse має значно більше рушіїв: ReplacingMergeTree, CollapsingMergeTree, AggregatingMergeTree та інші. Також ClickHouse має інтеграції з MySQL, PostgresSQL та MongoDB.
Якщо вам було мало тексту, то можете почитати враження Володимира Рожкова про роботу з ClickHouse.
І звісно, репозиторій на GitLab gitlab.com/go-yp/go-clickhouse-example.
Про рекламу вакансій
Зазвичай при написанні статті, коли я вже знаю, що буду публікувати, то шукаю компанію, яка готова прорекламувати в статті свої вакансії з Go, якщо вакансія виглядає привабливою (LGTM), то заходжу на сторінку компанії на DOU, шукаю контакти та пишу пропозицію.
У компанії Softwarium є три вакансії на DOU:
- Senior Golang Developer/Team Lead (sign-on bonus 2000) $6000–7500
- Senior Golang Developer $5500–6800
- Middle Golang Developer $4000–5500
Мені подобається, що у вакансіях вказана винагорода, етапи співбесіди, опис проєкту а також склад команди, з опису видно, що вони дійсно шукають. Стосовно вакансій пишіть Юлії.
28 коментарів
Додати коментар Підписатись на коментаріВідписатись від коментарів