Як ми автоматизували тестування даних в DWH

Привіт! Мене звати Ангеліна, я Data Engineer з Uklon. Вже понад 7 років я працюю з даними, за цей час здобула значний досвід у розробці, впровадженні та автоматизації дата-інфраструктур. Оскільки бізнесу потрібно забезпечити високу якість та ефективність даних, питання автоматизації тестування займає велику частину моєї роботи.

В Uklon ми давно займаємось автоматизацією роботи з даними та тестуємо різні шляхи й інструменти. У цій статті поділюся кейсом впровадження підходу Slim CI, а також розкажу, як здешевити, спростити та автоматизувати процес тестування моделей даних та залежних від них моделей.

Трохи контексту

Тестування безумовно є важливою частиною створення програмних продуктів. І якщо про підходи до тестування програмного коду вже сказано доволі багато, то підходи до тестування моделей даних і безпосередньо самих даних, зокрема тестування наповнення DWH, висвітлюються не часто.

Якщо коротко, то DWH складається з моделей даних (таблички або view). Більшість моделей пов’язані між собою і часто залежні одна від одної. На основі цих залежностей будується Lineage Graph. За допомогою Lineage Graph можна побачити, що зміна в одній моделі даних може викликати зміни у сотнях залежних від неї моделей.

Загалом задача кожної з моделей — це або підготовлювати дані для наступної моделі, або містити дані, готові для аналітичного споживання. Логічно, що для коректної аналітики потрібні коректні дані. Їх коректність найчастіше тестується такими перевірками: наявність за певний проміжок часу, перевірка на допустимі значення, перевірка на дублікати тощо.

На практиці виходить так, що якщо вносиш зміну в одну модель даних, то тестувати треба і цю модель, і всі залежні від неї (а таких може бути й сотні). Тестувати їх бажано на даних, еквівалентних продакшн-даним. Це звучить дорого і довго, але на практиці процес можна здешевити, спростити й автоматизувати. В Uklon ми зробили це за допомогою імплементації Slim CI на базі технологій DBT, Snowflake і Gitlab.

Короткий екскурс залученими технологіями

В цьому розділі я не буду детально описувати всі функції, переваги або недоліки DBT, Snowflake і Gitlab, а сфокусуюсь на тому, як вони використовуються в Uklon як частина Business intelligence. Тож дуже коротко про залучені технології:

Snowflake — Cloud Data Platform і навіть більше. Хмарна платформа, що дозволяє ефективно виконувати велику кількість аналітичних запитів, гнучко налаштовувати доступи до об’єктів DWH і до самих даних, моніторити використання ресурсів тощо.

DBT (Data Build Tool) — це open-source фреймворк для трансформації даних. Загалом за допомогою DBT можна:

  • розробляти DWH (трансформація даних);
  • тестувати й документувати дані;
  • використовувати систему контролю версій.

В Uklon DBT використовується для розробки DWH.

Тобто у DBT ми пишемо код, який виконується на Snowflake.

Gitlab — система контролю версій, що також надає інструмент CI/CD.

Slim CI — це CI-підхід, який виконує ті завдання і тести, що стосуються модифікованих сутностей або залежних від них. Саме цей підхід допомагає нам уникати надлишкового тестування внесених змін.

Як побудований процес внесення змін в DWH

Загалом процес змін в дата-моделі складається з наступних етапів:

  1. Внесення змін у відповідну дата-модель і локальне тестування.
  2. Створення Merge Request, під час якого відбувається не лише рев’ю кода інженером або аналітиком, а і CI-джобою, яка перевіряє форматування, документацію тощо.
  3. Автоматичний запуск CI-джоби, яка тестує моделі даних (про цю джобу згодом поговоримо детальніше).
  4. Після успішного проходження рев’ю — мердж і застосування змін на продакшн-середовищі.

Схематично це виглядає так:

Імплементація Slim CI

Ми імплементували Slim CI за допомогою СI-джоби на Gitlab. Задачі цієї джоби:

  1. Створення окремої бази даних для тестування.
  2. Клонування продакшн-даних в цю нову базу даних.
  3. Запуск* модифікованих дата-моделей разом із їх дочірніми моделями.

*Під запуском моделей даних я маю на увазі виконання команди dbt-build. Ця команда компілює sql код моделей (DDL і DML), виконує його і запускає тести, пов’язані з цими моделями даних.

Тепер детальніше про кожен пункт.

Створення окремої бази даних для тестування

Команду створення окремої бази даних CREATE DATABASE <DATABASE_NAME> ми огорнули у dbt macro:

{#
-- This macro creates a database.
#}
{% macro create_database(database_name, retention=1) %}

 {% if database_name %}

   {{ log("Creating database " ~ database_name ~ "...", info=True) }}

   {% call statement('create_database', fetch_result=True, auto_begin=False) -%}
       CREATE DATABASE {{ database_name }}
       DATA_RETENTION_TIME_IN_DAYS = {{retention}}
   {%- endcall %}

   {%- set result = load_result('create_database') -%}
   {{ log(result['data'][0][0], info=True)}}

 {% else %}

   {{ exceptions.raise_compiler_error("Invalid arguments. Missing database name") }}

 {% endif %}

{% endmacro %}

Вхідні параметри:

  • database_name — ім’я новоствореної бази даних.
  • retention — значення параметра DATA_RETENTION_TIME_IN_DAYS, що визначає кількість днів зберігання даних.

Тут важливо зрозуміти, що Snowflake може зберігати базу даних навіть після її явного видалення. Така поведінка зумовлена механізмом Time Travel.

За замовчуванням DATA_RETENTION_TIME_IN_DAYS дорівнює одному дню. Тому коли викликаємо macro create_database, ми явно передаємо значення 0, оскільки не хочемо платити за зберігання даних з бази, яка існує тільки протягом відпрацювання CI-джоби.

Клонування продакшн-даних в нову базу даних

Після створення бази даних ми маємо клонувати туди продакшн-дані. Зрозуміло, що фактичне клонування десятків терабайт даних — це довге і дороге задоволення, але його можна пришвидшити й здешевити за допомогою zero copy-cloning. Суть цього механізму в тому, що безпосередньо дані фізично не копіюються.

В новій базі даних ми отримуємо посилання на вже існуючі дані й платимо лише за ті дані, які змінили чи додали вже після клонування. Тобто лише за різницю між даними на оригінальній базі даних і на клонованій.

Загалом є два способи клонування.

  • Командою Snowflake:

CREATE DATABASE <object_name> CLONE <source_object_name>

В такому випадку очевидно не треба використовувати macro create_database.

  • За допомогою dbt:

dbt clone

Спочатку ми спробували перший варіант, але він виявився занадто довгим і займав близько семи хвилин. Тож для пришвидшення клонування ми спробували використати команду dbt clone і це дало результат — процес став в середньому тривати хвилину.

Таке пришвидшення в основному можливе завдяки тому, що dbt clone можна запускати у декілька потоків. Також можна обмежити кількість об’єктів, що копіюємо. В нашому випадку це лише пов’язані зі зміненим об’єкти. Ось такий вигляд має dbt clone, що ми застосовуємо:

dbt clone —select @state:modified —state=target-base —threads=36

Запуск модифікованих дата-моделей з дочірніми моделями

Запуск модифікованих дата-моделей разом із їх дочірніми моделями виконується командою dbt build. Ми застосовуємо dbt build в такому вигляді:

dbt build —select state:modified+ —state=target-base —threads=36 ${BUILD_MODE}

Нарешті повний код CI-джоби:

dbt-test:
 image: ${DBT_IMAGE}
 stage: test
 when: manual
 interruptible: true
 variables:
   DBT_PROFILES_DIR: ${DBT_PROJECT_DIR}/profile
   RUN_ELEMENTARY: "false"
   CI_DATABASE: db_${CI_COMMIT_SHORT_SHA}
   CI_WAREHOUSE: CI_WH
 script:
   - git branch
   - echo $CI_DATABASE
   - git fetch
   - cd ${DBT_PROJECT_DIR}
   - pip install -q -r requirements.txt
   - dbt deps
   # set evn variables
   - SNOWFLAKE_DBT_DATABASE=ANALYTICS
   - SNOWFLAKE_DBT_WAREHOUSE=${CI_WAREHOUSE}
   - echo $SNOWFLAKE_DBT_WAREHOUSE
   # generate mainfest.json for main branch
   - git checkout origin/main
   - dbt compile --target-path=target-base
   - dbt docs generate --target-path=target-base
   # create new db
   - |
      dbt run-operation create_database --args "{'database_name': $CI_DATABASE, 'retention': 0}"
   # generate manifest.json for feature branch
   - git checkout $CI_COMMIT_REF_NAME
   - dbt compile
   # clone db
   - SNOWFLAKE_DBT_DATABASE=$CI_DATABASE
   - dbt clone --select @state:modified --state=target-base  --threads=36
   # compare manifests
   - dbt ls --select state:modified+ --state=target-base
   - echo ${BUILD_MODE}
   - dbt build --select state:modified+ --state=target-base --threads=36 ${BUILD_MODE}
   - dbt docs generate --models state:modified+ --state=target-base
   - recce run -o recce.json
 artifacts:
   paths:
     - snowflake-dbt/recce.json
   expire_in: 1 hour
 after_script:
   - |
      dbt run-operation drop_database --args "{'database_name': $CI_DATABASE}"

Найпоширеніші кейси виявлення помилки

Припустимо ми видалили колонку CALL_STARTED_AT в моделі our_model, оскільки вважали її зайвою. Оскільки моделей і залежностей між ними багато, а локально ми тестували неуважно. Видалення поля може потенційно призвести до помилок у дочірніх моделях. Це і сталося, але CI джоб виявив цю помилку до мерджа змін і впав з помилкою:

Completed with 1 error and 0 warnings:
Database Error in model another_our_model (models/marts/another_our_model.sql)
000904 (42000): SQL compilation error: error line 19 at position 58
  invalid identifier 'CALL_STARTED_AT'

Окей, цей кейс можна покрити уважним локальним тестуванням, але автоматизація зручніша, надійніша і швидша.

Розглянемо наступний варіант. Ми вирішили, що ця умова в нашій моделі our_model зайва:

qualify ROW_NUMBER() over (partition by call_id order by __created_ts_millis desc) = 1

Відповідно ми прибрали її та прогнали змінену модель і її дочірні моделі локально. Тестовий датасет був неповний, тому локальне тестування не виявило проблем. Але наш CI-джоб при тестуванні на всіх необхідних продакшн-даних видає наступну помилку:

Completed with 1 error and 0 warnings:
Database Error in model another_our_model (models/marts/another_our_model.sql)
  100090 (42P18): Duplicate row detected during DML action
  Row Values: [redacted]

Бонус

Для красивої візуалізації потенційних змін ми вирішили використати recce. Recce — це інструмент перевірки зміни даних для проєктів dbt. CI-джоба створює артефакт recce.json, за допомогою якого можна побудувати Lineage graph для наших змін:

Висновки

Якісне тестування допомагає виявити проблеми вчасно і запобігти не лише помилкам «компіляції» моделей даних, а і проблемам неузгодженості даних.

Використання підходу Slim CI й механізму zero copy-cloning дозволяє тестувати зміни на продакшн-даних без надлишкового використання ресурсів.

Усі статті, обговорення, новини про тестування — в одному місці. Підписуйтеся на DOU | QA!

👍ПодобаєтьсяСподобалось10
До обраногоВ обраному8
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

Дякую, цікавва стаття! Занотуав собі для POC)

Вітаю!

Я бачу що цей стек SF + dbt набирає обертів.

З цього приводу маю декілька питань:

1) SF наче не має жорстких constraints ніж not null, як Ви покриваєте цю прогалину — виключно dbt?
1.1) з цих особливостей СФ випливає наступні два умовні типи тестування БД — «технічне» і «бізнес» тестування. Наприклад еволюція схеми і дублікати — це ближче до технічного тестування, бо мало стосується значень в самих табличках . А перевірка що вік замовника більше 21 року, що

CALL_STARTED_AT

— не може бути більше поточного timestamp тощо — це виключно забаганки бізнесу.
Чи вирізняєте Ви ці 2 види тестів і як їх реалізували з Дбт? Чи використовуєте пакети dbt tests та інші
2) З прикладу в статті СІ впав тільки тому що дублікати були зазначені на рівні ERROR, замість WARN наприклад- чи всі тести на цьому максимальному рівні чи є нижчого і що робити якщо моделька пройшла але купа вранішніх?
3) при інтенсивній розробці кожен білд/ тест відбувається на повному наборі даних — це трошки нагоняє витрати. Який розмір / витрати асоційовані з

CI_WH

, можна у відносних порівняннях з основними WH

4) самі фахівці дбт в своїх блогах радять не сильно нажимати на тести саме в дбт, а виносити окремо в інші інструменти — як Ви ставитеся до цього і як тоді забезпечити достатній рівень QA?

5) Я так розумію дбт Ви самостійно підтримуєте безкоштовний пакет, не розглядали комерційний дбт клауд?

6) Чи розглядали альтернативні інструменти до дбт? Умовно Flyway і stored procedures в SF?

7) Чи є streaming sources у Вашого dwh? Чи використовуєте інструменти СФ як Dynamic tables, materialized views та SnowPipes? Як вони інтегруються з дбт кодом?

Уявімо що навіть всі існуючі дбт тести не спрацювали і в наші таблички попролізало всіляке сміттячко — як Ви це все будете відкочувати назад і чи є відповідні дбт моделі/ механізми?

П С
Чого CI DATABASE просто не створити як Transient і не гратися з ретеншн тайм🤓?

Вітаю!
Рада, що вас зацікавила моя стаття 😊.

1) Так, dbt допомагає компенсувати відсутність constraints в Snowflake. Наприклад, ми часто використовуємо unique_key.

1.1) Щодо пакетів, які використовуємо для тестування:
— dbt_utils
— elementary
— dbt_project_evaluator
— dbt_expectations

2) Питання стосовно того, як реагувати на варнінги, є дещо філософським. Загалом, у нас налаштовані алерти на падіння тестів. Якщо виникли дублікати в моделі чи відсутні «свіжі» дані в сьорсі, то намагаємось якомога швидше з’ясувати причину і пофіксити.

3) Частка витрат на CI_WH незначна, оскільки моделі набагато більше кверяються, ніж створюються. Тут, на всяк випадок, уточню, що тести відпрацьовують не лише, коли ми впроваджуємо зміни, а і під час періодичного білду моделей (а це вже інший WH).

4) Поки не стикалися з проблемою надлишкової експлуатації тестів dbt.

5) Так, нам достатньо dbt Core. Це не забирає багато часу і поки є оптимальним для нас варіантом.

6) В нашому випадку dbt набагато краще закриває потреби щодо трансформації даних, ніж Flyway і SPs. Додатково, ми використовуємо schemachange для «ручних» активностей.
Підкажіть, будь ласка, більш конкретний кейс використання Flyway і SPs. Так я зможу краще пояснити наш вибір dbt як інструмента.

7.1) Якщо загалом про streaming, то є Kafka Connect. Чи це питання стосується streaming лише всередині Snowflake?
Dynamic tables поки «не прижились». Щодо materialized views — ще не використовуємо.

7.2) Якщо попролізало сміттячко, то фіксимо root cause проблему в сирих даних і робимо partial full-refresh залежних моделей.

З приводу CI_DATABASE. Тут абсолютно погоджуюсь, можна і так зробити👍.

Якщо залишились ще питання, то буду рада відповісти🙂.

Kafka connect він же поки що тільки на інпут працює? Якщо щось з дата марту треба output стрімити то інший інструмент треба? Чи у вас немає таких випадків?

Наскільки я знаю в СФ тільки sprocs підтримують tracing, dbt ще не вміє ? Як Ви роздаєте ролі/ привілегії в СФ , через дбт? І стейдж таблички ви їх теж через дбт створюєте?

Ось до цього питання ще хотілось би повернутися:

Чи вирізняєте Ви ці 2 види тестів і як їх реалізували з Дбт?

Чи тюнили Ви якісь СФ обʼєкти ( налаштування патрицій тощо) і чи не перетирає це дбт білд?

Що по referential integrity- теж через дбт перевіряєте? З цікавості — яка приблизна кількість рядків в найбільшій/ найменшій табличці і скіко то часу займає прогнати тести?

Так, Kafka Connect в нашому кейсі тільки стрімить дані в DWH. Нам не часто доводиться переливати дані з март-рівня, але якщо й доводиться, то використовуємо Airbyte.

DBT логує результат команди dbt build для моделей. Наразі не потребуємо більш детального логування.
RBAC реалізовано за допомогою permifrost.
Stage-моделі ми створюємо також через dbt. Найчастіше це не таблички, а view.

Загалом ми не вирізняємо ці 2 типи тестів.

Якщо я правильно зрозуміла питання про налаштування партицій, то це зазначаємо за потреби у config-блоці моделі, в параметрі cluster_by. Якщо раніше був визначений інший ключ кластеризації, то dbt build його змінить.

Зазвичай із constraints ми використовуємо лише unique_key, але в dbt є можливість налаштування primary key і foreign key.

Щодо обсягів даних, то в деяких таблицях їх терабайти. Тестування і run таких моделей можна суттєво оптимізувати завдяки інкрементальній матеріалізації, умові для обмеження даних у самих тестах тощо.

Підписатись на коментарі