Приклад роботи з Azure Data Factory: як копіювати дані з бекапу у сховище

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті.

Вітаю всіх. Мене звати Іван, наразі я працюю дата-інженером в компанії Luxoft DXC Technology Company.

На нашому проєкті досить багато процесів, інфраструктури та пайплайнів побудовано на хмарній платформі Azure від компанії Microsoft. Одним з таких пайплайнів хочу поділитися з шановною аудиторією.

Опис завдання

Наш постачальник даних з певною періодичністю додає повний бекап Бази Даних у своє S3 сховище, і там ці бекапи й живуть. Наша мета — знайти найсвіжіший бекап, завантажити його в наше ADLS сховище, ну, і потім розгорнути на відповідному Azure SQL Server.

Темою даної статті буде саме опис автоматизації пошуку цього файлу з допомогою такого інструменту як Azure Data Factory. Чому саме цей інструмент було обрано і які були розглянуті альтернативи?

  • По-перше, вже існувало декілька схожих пайплайнів на проєкті;
  • по-друге, стосовно Azure Databricks пролунала думка, що це трошки задорого;
  • по-третє, мені хотілось спробувати саме цей інструмент :-).

З інших доступних варіантів я б назвав HDInsight, Synapse Analytics або власний кластер spark-on-k8s в Azure Kubernetes Services. Якщо можете запропонувати інші варіанти або ідеї — запрошую у коментарі.

Джерела, з якими будемо працювати в прикладі

Зважаючи на вимоги конфіденційності я постарався знайти якийсь загальний приклад і, на мою думку, тут є досить детальний опис того, як створити свій перший ADF пайплайн. Що важливіше — хлопці досі хостять вихідні дані і навіть надають доступ до них за цією адресою.

Компанія Microsoft надає можливість безкоштовно використовувати ресурси платформи Azure протягом 12 місяців (з певними обмеженнями потужності, звісно), тому якщо вам ще не доводилося працювати з цією платформою — можна почати за цим посиланням.

Не буду детально розписувати процес створення Subscription, Resource Group та Azure Data Factory — все досить детально викладено в цьому гайді, а покрокова інструкція по створенню власного сховища — тут.

Задача, яку будемо розвязувати

Уявімо, що якась 3-тя сторона (вендор або постачальник даних) з певною періодичністю оновлює потрібні вам дані в своєму сховищі, до якого у вас є доступ, у вигляді окремих файлів — це можуть бути логи, CSV, нові партиції для parquet або навіть свіжі бекапи бази даних на кшталт **.bak** файлів.

Нас цікавить саме останній файл, який необхідно знайти і завантажити в наше сховище для подальшої обробки.

Так в нашому прикладі з **mdwresources** я пропоную розглянути теку *nyctaxidata* з досить відомим та поширеним набором даних про деталі роботи однієї зі служб таксі Нью-Йорка. Нижче бачимо, що тека містить 6 файликів за перше півріччя 2019 р.:

Інтуїтивно результатом роботи має бути файл за 06 місяць — тож пересвідчімося в цьому.

Для цього ми створили нашу ADF і готові до складання пайплайну. Чому я кажу складання? Бо сам портал і елементи пайплайну підштовхують до No Code/ Low Code підходу в роботі.

Не скажу, що це набагато простіше, порівняно з написанням якихось spark.read.parquet або SQL-запитів, проте, як на мене, дещо швидше.

Перший елемент GetMetadata з пункту General дозволяє от прям миттєво почати будувати конекти до джерел даних, передавати якісь параметри доступу тощо. Проте я раджу йти дещо у зворотному напрямку і починати створення пайплайну у такій послідовності:

**Linked Service** -> **Data Set** -> down-stream pipelines

Тому, що допоки перші 2 пункти не будуть працювати — ADF не дасть зберегти пайплайн. А так ще можна впевнитись, що є доступ до джерела та створено відповідний датасет.

Тож для **Linked Service** буде достатньо подібного налаштування:

В дата сет я вже додав 2 параметри — кореневу теку та ім’я файлу.




Параметр з іменем файлу нам знадобиться, коли ми будемо перебирати файли в теці в одному з циклів. Взагалі я маю відзначити, що в ADF повно місць та можливостей для параметризації пайплайну — це і параметри пайплайну, і окремі змінні, також параметри просто окремих складових елементів. Ось як виглядає готовий пайплайн:

Так, в цьому пайплайні я задав параметр **`src_folder`** — якщо в якийсь момент ми захочемо шукати файли в іншій теці в межах нашого Дата Сету (в якому також вже завчасно передбачено параметр для кореневої теки). При кожному запуску пайплайну ADF питатиме значення для параметра. Щоб задати якісь константи, magic numbers, патерни тощо — краще використати змінні (variables).

Стосовно змінних, то в цьому випадку я використав 3 змінні: NYC_files_timestamps, latest_file_name та latest_timestamp.

Окремо виділив корисні опції в контекстному меню для навігації

З них перша — це результат якраз пошуку способу пришвидшити пайплайн. Як зазначено тут — пошук найсвіжішого файлу зводиться до ітеративного перебору кожного таймстемпу і порівняння з поточним максимумом.

Примітка: для порівняння двох дат в ADF спочатку необхідно їх привести до unix_time формату за допомогою вбудованої функції @ticks() На практиці це відбувалось ду-у-у-же довго, і тому я прийшов до наступного вирішення:

Перший цикл просто збирає тайстемпи в форматі тікс в нашу змінну-контейнер NYC_files_timestamps, а другий — бере іншу вбудовану функцію @max(NYC_files_timestamps), і порівнює кожен таймстемп з максимумом. За допомогою цього ми можемо виконувати цикл як **non-sequential**, і розігнати кількість потоків для виконання.




На мою особисту думку, от цей Pipeline expression builder — це сама незручна з т.з. UI частина платформи. На моєму ноутбуці зовсім мало елементів вміщується на екрані без скрола. Якщо не знати особливості вбудованих функцій, то досить важко прочитати doc_string.

Якщо ж провалитися в перший цикл ForEach, то він виглядає наступним чином:




Порівнюючи елемент GetMetadata в циклі з тим, що на початку пайплайну, бачимо, що хоч датасет й той самий, тепер ми передаємо другий параметр (ім’я поточного файлу). Для цього також є вбудована функція/ метод @item().

Другий ForEach цикл загалом подібний до вже описаного вище з додатковим If елементом, який я хочу навести тут:




Саме цей елемент і дає відповідь на наше питання — який файл було додано останнім до джерела. В пайплайні ще використано додатково Append елемент в кінці — такий собі костиль для виводу значень, які було пропущено під час дебагінгу.

Результати виконання пайплайну




Як бачимо, виявляється, на сервер додали файл за 04 місяць одним з останніх — трошки неочікувано, але в житті дата-інженера трапляється :-).

Взагалі стосовно виконання пайплайну в період тестування порекомендую запускати його саме за допомогою кнопки Debug, проте майте на увазі — якщо в пайплані є якісь елементи, що копіюють, змінюють чи видаляють дані, то вони все одно будуть виконані, і зміни будуть незворотні.

Вихідний код пайплайну

<details>
 <summary><b>Resulting pipeline in JSON format for VCS commit</b></summary>
```json
{
    "name": "Yellow_Cab",
    "properties": {
        "activities": [
            {
                "name": "Get Metadata From source Root Folder",
                "type": "GetMetadata",
                "dependsOn": [],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataset": {
                        "referenceName": "Yellow_Cab",
                        "type": "DatasetReference",
                        "parameters": {
                            "default_src_folder": {
                                "value": "@pipeline().parameters.src_folder",
                                "type": "Expression"
                            },
                            "filename": " "
                        }
                    },
                    "fieldList": [
                        "childItems",
                        "itemName",
                        "lastModified"
                    ],
                    "storeSettings": {
                        "type": "AzureBlobStorageReadSettings",
                        "recursive": true,
                        "enablePartitionDiscovery": false
                    },
                    "formatSettings": {
                        "type": "BinaryReadSettings"
                    }
                }
            },
            {
                "name": "Loop through separate files in the folder",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Get Metadata From source Root Folder",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Get Metadata From source Root Folder').output.childItems",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "Get Metadata for a particular file",
                            "type": "GetMetadata",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "dataset": {
                                    "referenceName": "Yellow_Cab",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "default_src_folder": {
                                            "value": "@pipeline().parameters.src_folder",
                                            "type": "Expression"
                                        },
                                        "filename": {
                                            "value": "@item().name",
                                            "type": "Expression"
                                        }
                                    }
                                },
                                "fieldList": [
                                    "itemName",
                                    "lastModified"
                                ],
                                "storeSettings": {
                                    "type": "AzureBlobStorageReadSettings",
                                    "recursive": true,
                                    "enablePartitionDiscovery": false
                                },
                                "formatSettings": {
                                    "type": "BinaryReadSettings"
                                }
                            }
                        },
                        {
                            "name": "Append File timestamp to a list",
                            "type": "AppendVariable",
                            "dependsOn": [
                                {
                                    "activity": "Get Metadata for a particular file",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "NYC_files_timestamps",
                                "value": {
                                    "value": "@ticks(activity('Get Metadata for a particular file').output.lastModified)",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            },
            {
                "name": "Find_the_latest_file_name",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Loop through separate files in the folder",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Get Metadata From source Root Folder').output.childItems",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "Find_timestamp",
                            "type": "GetMetadata",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "dataset": {
                                    "referenceName": "Yellow_Cab",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "default_src_folder": {
                                            "value": "@pipeline().parameters.src_folder",
                                            "type": "Expression"
                                        },
                                        "filename": {
                                            "value": "@item().name",
                                            "type": "Expression"
                                        }
                                    }
                                },
                                "fieldList": [
                                    "itemName",
                                    "lastModified"
                                ],
                                "storeSettings": {
                                    "type": "AzureBlobStorageReadSettings",
                                    "recursive": true,
                                    "enablePartitionDiscovery": false
                                },
                                "formatSettings": {
                                    "type": "BinaryReadSettings"
                                }
                            }
                        },
                        {
                            "name": "If Condition1",
                            "type": "IfCondition",
                            "dependsOn": [
                                {
                                    "activity": "Find_timestamp",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "expression": {
                                    "value": "@equals( ticks(activity('Find_timestamp').output.lastModified)\n        , max(variables('NYC_files_timestamps'))\n        )",
                                    "type": "Expression"
                                },
                                "ifTrueActivities": [
                                    {
                                        "name": "Set variable1",
                                        "type": "SetVariable",
                                        "dependsOn": [],
                                        "userProperties": [],
                                        "typeProperties": {
                                            "variableName": "latest_file_name",
                                            "value": {
                                                "value": "@activity('Find_timestamp').output.itemName",
                                                "type": "Expression"
                                            }
                                        }
                                    }
                                ]
                            }
                        },
                        {
                            "name": "Set variable2",
                            "type": "SetVariable",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "latest_timestamp",
                                "value": {
                                    "value": "@string(max(variables('NYC_files_timestamps')))",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            },
            {
                "name": "Append variable1",
                "type": "AppendVariable",
                "dependsOn": [
                    {
                        "activity": "Find_the_latest_file_name",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "NYC_files_timestamps",
                    "value": {
                        "value": "@variables('latest_file_name')",
                        "type": "Expression"
                    }
                }
            }
        ],
        "parameters": {
            "src_folder": {
                "type": "string",
                "defaultValue": "nyctaxidata"
            }
        },
        "variables": {
            "NYC_files_timestamps": {
                "type": "Array"
            },
            "latest_file_name": {
                "type": "String"
            },
            "latest_timestamp": {
                "type": "String"
            }
        },
        "folder": {
            "name": "TESTS"
        },
        "annotations": []
    }
}
```
</details>

Замість висновків

ADF — досить простий та потужний інструмент, за допомогою якого можна швидко зібрати low-hanging fruits і автоматизувати цей збір на майбутнє. Йому точно не вистачає можливостей повноцінних рішень поширених мов програмування, але його і не позиціонують як заміну конкурентів. Навпаки, ADF може запускати датабрікс кластер, окремі jar файли тощо.

Дякую всім за приділені час та увагу. Сподіваюсь стаття стане комусь в пригоді в процесі роботи чи підготовки до сертифікації по платформі Azure.

Інші корисні ресурси

  1. ADF: copy last modified blob here («Microsoft Docs»).
  2. How to loop through each file after we get a list from S3 here.
  3. Tip used to create logging branch of the pipeline (also helps to debug) here.
👍ПодобаєтьсяСподобалось2
До обраногоВ обраному0
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

Підписатись на коментарі