DASK — твой помощник в Data Science. Разбираем преимущества фреймворка на примерах

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті.

Привет! Меня зовут Сергей Пащенко, я Data Scientist в NIX.

В моей практике на продакшене часто возникает потребность в оптимизации определенных процессов с точки зрения быстродействия, памяти и т. п. Я использую самые различные алгоритмы и всегда ориентируюсь на бизнес-идею и желания заказчика. Что касается отработки big data, то здесь большую роль играет вычислительная мощность. Поэтому так важно найти инструмент, который ускорит этот процесс, с чем как раз и поможет фреймворк DASK.

В статье я разберу конфигурацию кластера DASK, отложенные функции и декораторы. Я приведу несколько простых примеров чтения csv-файлов, определения датафреймов, оборачивания исходных функций в декораторы и покажу, чем полезен мониторинг метрик. Также на реальном кейсе я расскажу о применении DASK в продакшен-задаче.

Если вы специалист в области Big Data или Data Science, этот материал поможет вам легко расширять, создавать новые проекты и проводить имплементацию в уже существующие на базе фреймворка DASK. Кроме того, вы сможете применять разработанные модули в облачных сервисах и платформах.

За 10 лет количество данных значительно увеличилось и продолжает расти

Давайте вспомним среднестатистического пользователя девайсов в 2010 году. Зачастую он за компьютером, не выпускает из рук смартфон и сидит в нескольких соцсетях. Сегодня же поток данных идет из куда большего количества каналов. Десятки соцсетей и сервисов, умные часы и очки, телевизионные и игровые приставки и даже смарт-автомобили — все они могут собирать и обрабатывать данные, закачивать прошивки и т. д. Отсюда и стремительный рост big data.

По информации International Data Corporation, к 2025 году ожидается рост до порядка 170 петабайт данных. По личным наблюдениям, могу сказать, что это в 10 раз больше, сравнивая со статистикой десятилетней давности.

Также наблюдается эволюция вычислительных инстансов — центральных процессоров и графических ускорителей. Увеличивать количество транзисторов на единицу площади становится все сложнее. Хотя про Graphics Processing Unit так сказать нельзя. По статистике, за 10 лет рост производительности в его случае ведет себя почти линейно. Однако в последние годы появились более мощные графические ускорители, которые позволяют эффективно и быстро справляться с задачами по Computer Vision, Data Mining, Time Series Prediction и NLP. Еще пять лет назад это все считалось практически нерешаемым.

Почему DASK

Каждый Data Scientist когда-либо строил дата пайплайны (Data Pipeline, ML Pipeline или их комбинации). До тренировки модели или ее использования на инференсе данные необходимо обработать. Если их много, для этого подойдет именно DASK.

Разобраться с DASK не составит большого труда. У фреймворка обширная документация с огромным количеством примеров. В цикле преобразования (Extract, Transform, Load) DASK занимает место в точке трансформатора. Фреймворк можно использовать как in-point для трансформации одних данных в другие. Так, Raw data ((сырые данные) мы преобразовываем в enriched data и складываем в хранилище, чтобы в итоге задействовать для обучения модели.

Возможности и преимущества DASK

«Под капотом» у DASK — pandas-фреймворки. Он поддерживает docker-контейнеры, разные форматы данных (.csv, .parquet, .json) и может работать с дампами базы данных. Также DASK поддерживает декораторы, что позволяет перевести функции из разряда обычно исполняемых в ранг отложенных. Этот фреймворк предусматривает параллельный режим выполнения и, что тоже важно, масштабируемость. Мы можем насетапить определенное количество compute-инстансов и раскидать задачи, которые задаются при помощи DASK-графа и инициализируют compute-инстансы. Так можно значительно повысить производительность.

В отличие от обычного мультипроцессинга, позволяющего просто распараллелить задачу, DASK дает возможность смотреть на метрики производительности. А это важно при построении и анализе Data Pipeline по преобразованию больших данных. Ведь когда вы строите пайплайн, вам необходимо четко понимать, какой модуль и какой элемент работает эффективно и где возникает bottleneck. С DASK же можно увидеть эти метрики и определить, что делать дальше.

Кроме того, DASK можно интегрировать в Google Cloud Platform, Azure и AWS, что позволит автоматически масштабировать компьют-инстансы. К тому же многие клиенты сейчас очень часто обращаются к этим облачным платформам. Значит, большое количество бизнес-задач вы сможете решать при помощи DASK и тем самым покрывать необходимые потребности заказчиков.

Три способа конфигурации DASK кластера

Сразу оговорюсь: далее мы рассмотрим пример локального кластера. Если вам интересно, как насетапить клауд-кластер DASK, вы можете пройтись по ссылкам в конце статьи и узнать все детали. Что касается примера, здесь все просто. Вызываем клиента, выбираем опцию локального кластера и указываем количество воркеров, а затем — количество потоков на наших воркеров. Готово — кластер насетаплен.

from dask. distributed import Client, LocalCluster 
local_cluster =  LocalCluster (n_workers = 4, threads_pet_worker=2)
client = Client (local_Cluster) 
print (client) 
✓ 1.3s
<Client: ‘tcp://127.0.0.1:43271’ process=4 threads=8, memory=7.76 GiB>

Теперь предлагаю посмотреть, как DASK может превращать обычные функции в отложенные на примере функции, которая выполняет суммирование элементов списка. В первом случае выполняется обыкновенная функция, во втором случае функция оборачивается в декоратор delayed, а в третьем — идет вызов модуля compute.

import dask 
import dask.dataframe as ddf 
from dask.delayed import delayed
✓ 1.5s

Время выполнения этого блока занимает полторы секунды.

def get_sum (data):
      sum = Ø 
      for current_item in data:
            sum + = current_item 
      return sum 
data = [1, 2, 3, 4]
res = get_sum (data) 
print (res)
✓ 0.5s
10

@delayed 
def get_sum (data):
       sum = Ø 
       for current_item in data:
             sum + = current_item 
       return sum 
data = [1, 2, 3, 4]
res = get_sum (data) 
print (res)
✓ 0.4s
Delayed (‘get_sum-e2a760b3-b2c1-4cec-b4a7-a8584bd33ece’)

@delayed 
def get_sum (data):
       sum = Ø 
for current_item in data:
             sum + = current_item 
       return sum 
data = [1, 2, 3, 4]
res = get_sum (data).compute ()
 print (res)
✓ 0.5s
10

Отмечу несколько важных моментов. Во-первых, результат функции составляет 10 — это по сути сумма всех элементов. Во-вторых, время выполнения достигает 0,4-0,5 секунд на каждом модуле. Но при внимательном изучении второго модуля можно заметить: здесь на самом деле нет никакого результата. Вместо этого есть объект DASK dataframe. Почему так происходит? Когда вы оборачиваете исходную функцию в декоратор, то по факту она становится ленивой и ничего не делает. Чтобы заставить функцию делать что-то полезное, нужно принудительно дернуть ее. Для этого как раз и вызывают метод compute, (как в третьем блоке кода), благодаря чему происходит вычисление.

Как происходит чтение csv-файлов в DASK

DASK по синтаксису почти не отличается от pandas. Если вы знакомы с этим инструментом, то легко освоите DASK для оптимизации своих пайплайнов по обработке данных. Это хорошо видно на следующем примере:

import pandas as pd 
import dask. dataframe as ddf 
✓ 0.2s

%time pd_df = pd/read_csv( “/home/serhii/Projects/Dask/example.csv”) 
✓ 3.2s
CPU times: user 1.81 s, sys: 549 ms, total: 2.36 s 
Wall time: 3.16 s 
%time dk_df = ddf.read_csv (“/home/serhii/Projects/Dask/example.csv”)
✓ 0.7s
CPU times: user 57.6 s, sys: 20.7 ms, total: 78.3. s 
Wall time: 667 s 
%time dk_df = ddf.read_csv (“/home/serhii/Projects/Dask/example.csv”).сompute ()
CPU times: user 1.35 s, sys: 366 ms, total: 1.72 s 
Wall time: 1.24 s 

В этом примере используются полусинтетические данные заказчика, которые помещены в csv-файл. Попробуем их вычитать. В одном случае в первой строке это делается при помощи pandas датафрейма и занимает 2,36 с. Во втором случае идет обращение к DASK датафрейму и запись данных в DASK датафрейм. На это уходит около 78 миллисекунд. Можно подумать, насколько же быстрее DASK справляется с задачей, однако на самом деле это не так. Не стоит забывать про ключевое слово compute. 78 миллисекунд — это создание инструкции графа для отложенной задачи. Чтобы заставить DASK выполнять задачу, нужно вызвать compute, что и происходит в предпоследней строке. Тогда требуемое на считывание данных время составляет уже 1,72 с. Выигрыш, конечно, есть, но уже не такой радикальный.

В данном примере мы читаем обыкновенный single csv-файл, но для DASK это не очень хороший подход. Лучше на практике использовать parquet-файлы, которые позволяют эффективнее распределить нагрузку между compute-инстансами или корами процессора. Если такой возможности нет, то стоит хотя бы сделать партицированный csv. В таком случае DASK будет смотреть на каждую партицию в отдельности и вычитывать ее в параллельных потоках. Но даже при работе с single csv-файлом можно получить небольшой профит.

Объединение датафреймов

Для примера возьмем демонстрационные данные для оценки работы join датафрейма. Создаем два DASK датафрейма: DASK Dataframe 1 и DASK Dataframe 2. После этого создаем результирующий датафрейм, где указываем ключевое слово merge, и то, как мы мержим. В нашем случае — по полю name:

columns_name = [ “NAME” , “VALUE”] 
test_data_1 = [ [ “name 1”, 1], [ “name 2”, 2], [ “name 3”, 3], [ “name 4”, 4], [ “name 5”, 5] ] 
test_data_1 = [ [ “name 1”, 11], [ “name 7”, 7],  [ “name 3”, 33], [ “name 8”, 8], [ “name 9”, 9] ]
dask_df1 = ddf.from_pandas (pd.DataFrame(data = test_data_1, columns = columns_name), npartitions=2)
dask_df2 = ddf.from_pandas (pd.DataFrame(data = test_data_2, columns = columns_name), npartitions=2)
joined_df = dask_df1 = merge (dask_df2, left_on = “NAME”, right_on = “NAME”)
print (joined_df.head())
✓ 0.2 s

DASK поддерживает индексирование. Это означает, что вы можете насетапить на каждый столбец исходного датафрейма бинарный ключ. В итоге DASK пересортирует данные, и объединение датафреймов произойдет очень быстро. Ниже в коде все то же самое, но я добавил еще две строки, которые указывают два параметра объединения set index:

columns_name = [ “NAME” , “VALUE”] 
test_data_1 = [ [ “name 1”, 1], [ “name 2”, 2], [ “name 3”, 3], [ “name 4”, 4], [ “name 5”, 5] ] 
test_data_1 = [ [ “name 1”, 11], [ “name 7”, 7],  [ “name 3”, 33], [ “name 8”, 8], [ “name 9”, 9] ]
dask_df1 = ddf.from_pandas (pd.DataFrame(data = test_data_1, columns = columns_name), npartitions=2)
dask_df2 = ddf.from_pandas (pd.DataFrame(data = test_data_2, columns = columns_name), npartitions=2)
dask_df1 = dask_df1.set_index ( ‘NAME’). persist () 
dask_df2 =  dask_df2.set_index ( ‘NAME’). persist () 
joined_df = dask_df1.merge (dask_df2, right_index = True, left _index = True”)
print (joined_df.head())
✓ 0.2 s

Мы устанавливаем set index на поле name первого датафрейма и аналогично — на поле name второго датафрейма и после этого делаем merge. Только в данном случае указываем не right on и left on, а right index и left index, являющиеся битмапами.

Мониторинг метрик

На мой взгляд, одна из самых классных фич DASK. При работе с big data очень легко потеряться в том, что пошло не так. Вы задействовали многопоточность или раскидали нагрузку на разные compute-инстансы, но производительность увеличилась незначительно, осталась прежней или вообще просела. В такой ситуации стоит обратиться к DASK дашборду и посмотреть на мониторинг метрик. Вы сможете увидеть, что идет не так, какой модуль или функция занимает больше всего времени, где происходит утечка данных или образуются bottleneck. Полученная информация позволит принять подходящее решение по оптимизации данного модуля.

DASK дашборд доступен на локальном хосте и включает несколько вкладок: статус, воркеры, задачи, система, профилировщик и граф состояния. Далее вкратце пробежимся по каждому из них.

Первая вкладка — Статус. Здесь отображается, сколько памяти потребляет каждый воркер, как идет процесс загрузки и выполнение отдельной задачи, а также видно задержки в передачи управления между задачами. На основе этой информации, можно сделать правильные выводы о производительности отдельных модулей и построить гипотезы по их улучшению.

Следующий пункт — воркеры. Это compute-инстансы, которые позволяют вам выполнять определенную задачу. На этой вкладке можно детально посмотреть, как загружен тот или иной воркер и как распределяется нагрузка между вычислительными compute-инстансами.

Затем идет вкладка System. Она позволяет анализировать, сколько потребляется оперативной памяти и какой своп задействован.

Ниже представлен профилировщик. Прямоугольники на картинке — это время выполнения того или иного модуля в виде Python-функции или декоратора, который входит в состав Data Pipeline. То есть вы можете видеть, сколько времени выполняется каждый модуль:

Более того, DASK умеет анализировать вложенную структуру этих функций. Представьте ситуацию: у вас есть обычная функция с вложенной следующей функцией, которая вызывает третью функцию. Даже если при помощи декоратора delayed обернуть функцию на верхнем уровне, DASK может декомпозировать задачи и проанализировать декомпозиционный граф внутрь, а затем понять, как будет выполняться функция до самого мелкого уровня. Так вы сможете понять, как вообще происходит выполнение задач и на какой модуль обратить внимание.

Пример использования DASK на практике

У нашей команды был заказчик — компания из США, которая занималась сбором и предоставлением данных об автомобильных дилерских центрах. Информация поступала как от автосалонов (о покупке машин или покупателях), так и от самих пользователей, приобретающих автомобили. Перед нами стояла задача в сжатые сроки обработать довольно много таких данных. На схеме ниже показана схема взаимодействия и наша роль в обработки данных.

Основных сложностей было две. Во-первых, все нужно было поднять быстро. Во-вторых, требовалось развернуть это в существующей экосистеме заказчика. Мы имели код на Python и установленную заказчиком логику по обработке данных. Данные были в виде URL-адресов, включающих UTM-параметры. Они содержали информацию о том, куда заходил пользователь, с каких сайтов он переходил, какие компании посещал и т.д. Все это скрапится и передается на анализ.

Использовать мультипроцессинг было неудобно, но надо было как-то разделять нагрузку на compute-инстансы. Как в такой ситуации применить DASK? Данные могли предоставляться либо в csv-формате, либо чаще всего в дампе базы данных. Между последней и базой данных, куда пушились результаты, мы как раз и имплементировали DASK. Входные данные назвали Raw Data (сырые данные), а те, которые пушили — Enrich Data. Вот, как это было реализовано в коде:

def batch (input_data):
       sub_results = [ ] 
       for current_sample in input_data: 
             sub_results. append (referParsev3 (*current_sample)) 
       current_result_df = pd. DataFrame (data=sub_results, columns=set_columns_name) 
       current_values = current_result_df  
return current_values  

Обратите внимание на функцию refererParseV3. Именно она отвечает за логику заказчика, и ее надо было оптимизировать и ускорить.

Далее приведу пример цикла, где происходит батч extraction данных по 500 записей, и эти batch-данные сохраняются в переменной current date. Затем следует функция batch, которая и оборачивает логику заказчика. Она транслирует эти батчи в модуль, где обрабатывается логика и после чего возвращается ответ:

batch_size = 500 
Prepare batch to calculation 
number_iteration = int (add_DF,shape [0] / batch_cize )
for index in range (number_iteration); 
     if index ! = number_iteration - 1:
        сurrent_data = values [index * batch_size: index* batch_size + batch_size] 
else: 
      сurrent_data = values  [index * batch_size]
      Make batch function as delayed. It means that we will calculate it later

result_batch_refererV3 = dask. delayed (batch) (current_data) 
result_fillna = dask. delayed (fillna_def) (result_batch_refererV3)
result_batch = dask. delayed (write_db_file) (result_fillna) 
Create stack of delayed functions 
batches_result.append (result_batch)

time start = time. time () 
result = dask. compute (*batches_result) 
time_end = time. time () - time_start 

Чтобы DASK датафрейм мог эффективно обрабатывать эту логику, необходимо сделать функцию отложенной. Для этого создаем result batch refererV3 и оборачиваем функцию butch в delayed декоратор, а затем передаем в качестве input data в current data. То есть получаются данные, разбитые по батчу. В итоге результат помещается в result batch referrer parse v3. Результат обработки отправляем в следующий модуль обработки данных. В этом случае модуль называется fillna_def. Он предназначен для очистки данных от пустых значений:

def wite_db_file (data) : 
       data. to_csv (“Result. csv”, mode= ‘a’ ) 
def fillna_def (data) :  
       data = data. fillna ( “ “)
       return data 

После обработки этих данных мы передаем блок управления в функцию записи в базу данных write DB. Но не стоит забывать: после построения пайплайна ничего не происходит. У нас создается стек отложенных задач и сохраняется в переменную batches results. И на данном этапе работы цикла ничего не выполнилось. Чтобы стартовать выполнение, надо запустить compute. Для этого создаем переменную results и вызываем для данного batches results метод compute.

Как анализировать DASK дашборд и метрики

Ниже вы видите разделенный на две части экран. Левая сторона — DASK дашборд. В правой — выведены системные метрики в виде монитора ресурсов и есть три функции: batch, fillna_def и write_db. В процессе выполнения цикла появляется статус-бар. Он показывает, что выполняется функция batch, итог передается в fillna_def, и результат уже этой функции отправляется в результат записи в базу данных.

Здесь задействовано четыре воркера. Так как насетаплен локальный кластер, то использованы все ядра процессора, поэтому вы видите четыре направления по обработке данных.

Хочу остановиться на таск процессинге. Он показывает, сколько времени и какая задача занимает ресурсов, а также общее Bytes stored (количество данных, которые записываются в память). Для примера я взял небольшую часть для ускорения процесса. В реальности это может занимать часы и даже дни в зависимости от количества данных.

Отмечу, что мониторинг ресурсов показывает, что CPU подгружен под 100%. Данные хорошо распараллеливаются между compute-инстансами, что позволяет значительно ускорить работу.

Давайте проанализируем эффективность выполнения задачи на чуть большем объеме данных. В Single Thread данный процесс занял около 250 секунд, а DASK справился чуть меньше, чем за 100 секунд. В качестве примера был проведен Multiprocessing, но сам DASK в принципе построен на модуле Multiprocessing. Поэтому за счет использования локального кластера результаты плюс-минус похожи. В DASK есть возможность масштабирования на несколько compute-инстансов в кластере (допустим, в Azure или Google Cloud). А это может более существенно повысить производительность.

Best practices

В завершение приведу, на мой взгляд, несколько лучших практик использования DASK. Примеры будут только на первый взгляд простые. На деле же они могут ввести в замешательство, и далее я объясню, почему.

Например, разберем чтение достаточно большого csv-файла. Слева вы видите код с ошибкой. Она заключается в том, что мы сначала вычитываем его в pandas датафрейме, затем пробегаем в цикле и применяем какие-то методы к анализу данных из датафрейма. Но так делать не надо. Ведь DASK может самостоятельно вычитывать датафреймы в параллельном режиме.

Справа приведен блок правильного кода. Здесь мы по сути оборачиваем вычитку датафреймов в dask.delayed декоратор и параллельно читаем.

@dask.delayed 
def process (a,b) : 
       a = “Some function logic”
df = pd. read_csv ( “very_big_file_csv”) 
results = [ ] 
for item in L:
     result  = process (item, df) 
     results. append (result)  

@dask.delayed 
def process (a,b) : 
       a = “Some function logic”
df = dask.delayed (pd.read_csv) ( “very_big_file_csv”) 
results = [ ] 
for item in L:
     result  = process (item, df) 
     results. append (result)  

Следующий пример — чтение множества csv-файлов. Если у вас есть директория с большим количеством csv-файлов, нет смысла перебирать их по одному и вычитывать в pandas. Гораздо проще сразу запустить несколько воркеров, которые справятся с этой задачей в разы быстрее. Обратите внимание на левый блок. В цикле мы точно так же проходимся по нашим файлам и вычитываем каждый по отдельности в pandas датафрейм, а затем конвертируем его в DASK датафрейм. Но так кода больше, да и в целом это неэффективно.

На примере справа мы просто указываем filenames (имя директории с нашими файлами) и используем метод DASK датафрейма read_csv. Таким образом в параллельном потоке мы учитываем все данные.

df = ddf. DataFrame () 
for fn in filenames : 
      df = pd. read_csv (fn) 
      ddf = ddf. append(df) 


df = ddf. read.csv (filenames) 

Последний пример — про избегание мгновенных вычислений. Важный нюанс: даже если вы используете ключевое слово compute, еще на факт, что вы все делаете правильно. В примере слева кажется, что мы вычитываем csv-файл при помощи DASK датафрейма. Однако, когда мы сетапим какие-то операции на выполнение (в данном случае это нахождение минимального и максимального значений по полю х), то во второй строчке мы сразу вызываем ключевое слово compute. Это заставляет DASK перейти к мгновенному вычислению. Получается, поиск запрашиваемых значений выполняется последовательно: сначала минимумы и только потом максимумы. По сути эти Single Thread. Чтобы сделать Multi Threading для DASK, необходимо обернуть все в функцию compute и задать вычисления внутри нее, что и реализовано в коде справа.

df = ddf.read_csv( “some_file.csv”) 
xmin = df.x.min (). compute () 
xmax = df.x. max.compute () 

df = ddf.read_csv( “some_file.csv” ) 
xmin, xmax = dask. compute (df.x.min () , df.x. max.compute () )

DASK — удобный фреймворк, который понравится дата сайентисту, когда-либо работавшему с Рandas. Даже если вы с ним не знакомы, подробная документация DASK позволит вам быстро его освоить. Перечисленные в статье возможности фреймворка делают DASK хорошим дополнительным инструментом практически для любого специалиста в сфере Data Science.

👍ПодобаєтьсяСподобалось3
До обраногоВ обраному4
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Каждый Data Scientist когда-либо строил дата пайплайны (Data Pipeline, ML Pipeline или их комбинации).

Я не строил :). Правда, с пета- и уж тем более зетабайтами у меня еще не было необходимости работать, а терабайты (кусками по паре сотен гигов) неплохо впихуются в RAM, только железА побольше надо. А вот железО я собирал собственноручно — бо в клауде такое б в копеечку вышло.
letyourmoneygrow.com/...​beerwulfs-ebay-adventure

А статья интересная, возьму на заметку на будущее.

А статья интересная, возьму на заметку на будущее.

„Tesla K40: I am pretty disappointed. (On my models) it is not really much faster than a high-end XEON ... and I still cannot fix the desynchronization problem!”

Таки-да. Кстати, desynchronization problem решилась как-то сама собой (благодаря то ли дополнительным кулерам, то ли новой версии TF). Но мне это уже стало неактуально — задачу я вполне решил и на CPU. Хотя сервачок с Теслой держу — мало ли будут другие задачи, где она сможет проявить свою мощь.

Сергей,
спасибо большое за статью. Сейчас как раз разбираюсь с данным фреймворком и рад что не один такой — по моим наблюдениям он только набирает популярность.

Из нюансов, которые я бы хотел добавить про Dask:
1) Чистый пайтон — это прям ми-ми-ми как по мне, особенно если сравнивать с альтернативами на Скала и Спарке (привет Датабрикс из коробки на AWS и Azure)
2) С другой стороны тот же Спарк в моем конкретном случае показывает себя как более надежный и отказоустойчивый инструмент, который намного лучше работает с памятью. Worker killed out of Memory это про Даск.
3) Вы правильно использовали persist() после индексации датафреймов, но совершенно ничего про это не упомянули — крайне важно в ключевых точках обработки данных записывать их в кластер ( как бы имитируя то, что делает спарк со своими RDD) , иначе каждый раз ваш граф будет вычисляться (и индексироваться например) заново.

Из проблемных мест Даска:
1) Партицирование — по умолчанию это либо каждый индивидуальный файл при чтении или одна единственная партиция при вызове compute() на клиенте, что с легкостью может съесть всю память на клиенте
2) Каждая партиция — это объект пандас, и насколько я могу судить, эти объекты тоже нормально так места в памяти занимают, ощущение будто при вычитывании 1 Гб из паркетов — получаем датафрейм размером 2-3 Гб;
3) Группировка больших датафреймов по небольшому количеству значений тоже с легкостью создаёт огромные партиции и съедает память на отдельно взятом воркере — об этом даже отдельная статья/видео есть от Saturn Cloud.
4) Join операции — к сожалению Даск не поддерживает мульти-Индексы и если , например, нужно объединить датафреймы по двум колонкам ( идентификатор и например отметка по времени), то приходиться создавать отдельную колонку с конкатенацией этих значений. Кстати даск может и обычный df.join() делать, но к сожалению только по индексу.

Из полезных материалов стоит упомянуть блоги Coiled.io и Saturn Cloud — обе компании предоставляют услуги по настройке и использованию кластеров с Даском. Метью Роклин из первой компании ведёт свой канал на ютубе — можно много полезного почерпнуть.
Также есть просто невероятно полезная книжка от Jesse C. Daniel по Даску — очень хорошо структурирована, наглядные примеры и интересные кейсы.

Ну и как сказал Руслан: вставки кода стоить перепроверить — очень много «очепяток» в тексте. Например  <blockquote>batch_cize != batch_size</blockquote>, pd / read_csv и

add_DF,shape [0]

Кстати для доступа к shape [0] — разве не нужно вызвать компьют?

П.С.
Изобилие англицизмов не добавляет читабельности статье, но это мое скромное имхо .

А вообще пишите ещё — тема интересная и я считаю важная.

Если не секрет — чем вас не устроил Databricks?

Датабрикс — замечательный инструмент, пусть и крепко завязанный на свою экосистему. Умолчим о финансовой стороне вопроса- говорят подписка стоит немало у ребят. Датабрикс нельзя развернуть у себя локально, только кластер. Когда кластер поднят — начинаются вопросы конфигурирации типа default parallelism, shuffle repartition, dbs secret scope и тучи других флажков, которые могут производительность как вверх так и вниз увести.

Ну джавовские стек трейсы читать при ошибках- все ещё тяжеловато.

В моём случае это больше проверка нового инструментария плюс создание песочницы, где новые сотрудники смогут потрогать исторические данные в привычном пандас. Как ни крути но синтасически пандас и пайспарк отличаются достаточно чтобы простая копипаста ячеек ноутбука перестала работать.

Ну и дашборд состояния кластера в Даске просто огонь.

Если у вас техническое решение построено на локальных вычислительных мощностях — Databricks вам конечно не подходит.
Default parallelism скорее свойство обычного spark кластера чем особенность Databricks,
Shuffle repartition вам может и не понадобится трогать если вы будете использовать delta tables,
Secret scope управляется Databricks эксклюзивно, да и скорее всего вам придется использовать инфраструктуру cloud для хранения секретов
Трейсы — это наследие spark кластера, их невозможно избежать, если вы его используете
Если вам привычнее pandas — во первых вы можете продолжить его использовать, но можете глянуть на koalas, он аналогичен по синтаксису, но адаптирован под spark.
Дашбоарды — есть технические решения, но это довольно обширный вопрос. Для мультиклауд я бы посоветовал глянуть на комбинацию mlflow (если есть ml) и graphana.
В целом же это вопрос привычки скорее.

Змінна test_data_2 узагалі відсутня.
Тут описка — joined_df = dask_df1 = merge (dask_df2, left_on = “NAME”, right_on = “NAME”), після dask_df1 має бути крапка.

Підписатись на коментарі