Мы обрабатываем данные миллиона украинских компаний за один час. Как мы это сделали

Привет! Я руковожу платформой Опендатабот, которую мы создали, чтобы каждый украинец знал, какие открытые данные о нем и его близких есть у государства.

Мы начали с простого чатбота, который отслеживал рейдерские захваты компаний, а сейчас мы следим за всеми ключевыми реестрами в стране. У нас в чатботах 1 800 000 пользователей и тысяча компаний подключено по API.

Я расскажу об украинских открытых данных и подходе Publisher/Subscriber для обработки больших данных.

Открытые данные

Большая часть государственных реестров в Украине являются публичными. Некоторые из них государство публикует бесплатно в формате, который можно обрабатывать автоматически. Такие данные законодательство называет «открытыми данными».

Ежедневно наша система отслеживает до сотни тысяч изменений в открытых реестрах. Главный ресурс, который мы используем — data.gov.ua.

Часть данных в нашей системе платные. Мы получаем их в режиме реального времени через API Минюста и МВД.

Например, вот как выглядят открытые данные по украинскому программисту в реестре компаний и фоп:

На портале открытых данных уже более 60 000 наборов. Стандартизация открытых государственных данных в Украине еще не произошла, поэтому каждое министерство публикует данные в формате, который каждому из них нравится.

Например:

  • Реестр компаний Минюста — ежедневный XML, 6 млн записей, файлы размером 2 и 10Гб.
  • Судебный — ежедневный CSV с ссылками на 10 млн судебных решений в формате RTF, DOC текущего года, всего 80 миллионов решений.
  • Реестр розыска МВД — ежедневный JSON с фото разыскиваемых на сотни мегабайт
  • Налоговые долги фоп и компаний — ежемесячный CSV на 800 тысяч записей про должников

Эволюция архитектуры

Наш технологический стек состоит из Phalcon (минималистичный и быстрый фреймворк на PHP), кластера ElasticSearch для документов (судебные решения, компании, фоп) и MySQL для хранения табличных данных (пользователи, подписки, простые реестры). Для работы с электронной цифровой подписью и для построения новой версии нашего фронтенда используем Node.js

Монолит

Первая монолитная версия Опендатабота работала в одном потоке:

  1. Исправляем и чистим данные из CSV, JSON, XML (дубли в данных не баг, а особенность реестров)
  2. Размещаем в базах данных, находим изменения
  3. По каждому изменению смотрим кто на него подписан и шлем ему уведомления через очередь сообщений

Кроме дублей в реестрах бывают неожиданное удаления данных и просто ошибки. Удаление компаний из реестров как правило признак рейдерского захвата. И мы активно работали над тем, чтоб такое регистрационное действие запретить полностью. В 2019 нам это удалось и теперь это делать запрещено.

Кроме недостатков монолита, были еще и недостатки в обработке данных:

  1. Ошибка в любой записи новых данных делает непригодной всю базу данных, невозможность откатить неудачный файл с реестром
  2. Высокая связность, код знает про все: входящие данные, поисковое хранилище, хранение изменений, подписки пользователей и то, как их уведомить про изменения

Конвейер

После перехода на микросервисы мы получили конвейер очередей, каждая из которых ожидает предыдущую, Недостатки конвейера — каждая очередь знает про следующую, все идет последовательно, сложно организовать параллельную обработку.

Publisher/Subscriber

Следующим шагом был переход на архитектуру publisher/subscriber, где каждый микросервис никак не связан с другими сервисами, подписан на определенные события в шине и сам может генерировать новые события.

Входящие данные — любые атомарные данные, которые пришли к нам в систему — будь это каждая из 4 миллиона строк CSV файла или один ответ API. Логгирующий сервис Datastream позволяет «переиграть» любую обработку данных, заново начав поставлять в шину события нужного типа. Хранение изменений и уведомление пользователей — две независимые системы, работающие параллельно.

В качестве брокера очередей мы выбрали RabbitMQ, как наиболее простой и понятный нам. Так как это не Kafka, нам нужно следить, чтобы скорость обработки сообщений была ненамного ниже, чем скорость публикации новых сообщений. Проще говоря, закинуть 10 миллионов событий за 10 минут и потом их разгребать пол-дня — это точно не история про RabbitMQ, необходимо контролировать помещение событий в шину.

События на которых упали наши или внешние сервисы (к слову uptime серверов реестра недвижимости 97%) мы отправляем в специальную очередь, которая периодически снова публикует события в шину.

Так как все микросервисы работают отдельно, и связаны только шиной, мы не стали писать интеграционные тесты, которые проверяют, что при появлений в системе события Х, они сформируют изменения Y и уведомят пользователя Z. Тесты пишутся на каждый сервис отдельно.

Как понять что происходит в шине событий?

Задача, которую необходимо решать при такой архитектуре — реализовать сквозное логгирование, чтобы отследить прохождения события через все микросервисы и все подписчики.

При создании события мы генерируем ему ключ activityId, который потом передается через все очереди и обработки. Для сбора и объединения логов со всех серверов мы используем Fluentd, для хранения ElasticSearch и для визуализации — Kibana.

Для того, чтобы избежать рассинхрона логов по мере сбора с разных серверов и сервисов — ведем вместе с activityId также и время генерации лога, по которому уже сортируем Kibana.

Event sourcing

Опендатабот — платформа, которая помогает людям чувствовать себя безопасно, бизнесу работать эффективнее и движет Украину к открытому государству.

Впереди большие и сложные задачи — новые данные (полтора миллиона имущественных деклараций), новые задачи (путешествия во времени в реестрах), новые клиенты (мы работаем со всеми ведущими банками страны).

Наш следующий шаг в технологиях — реализация шаблона event sourcing и перевод всех баз данных на хранение изменений, а не состояний. Такой подход усложняет поиск и агрегацию данных, но упростит «путешествия по времени», и нам будет проще сказать, кто был собственником или директором компании в определенный момент времени.

В этом случае мы сможем отказаться от Datastream, любое переигрывание обработки данных будет состоять только в исключении неверно созданных изменений.

Наша цель — отправка уведомлений пользователям через час после публикации файла любого реестра. Для этого, чтобы одновременная обработка нескольких реестров не тормозила существующие сервера и могла подключать по необходимости новые, мы сделали систему балансировки, которая управляет количеством воркеров в supervisor на основании важности задачи, объема очереди и текущей загрузки каждого нашего сервера.

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті

👍ПодобаєтьсяСподобалось2
До обраногоВ обраному9
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Datastream позволяет «переиграть» любую обработку данных, заново начав поставлять в шину события нужного типа.

У вас там по описанию судя сплошной fanout, как решали пересылку на отдельный выбранный сервис как в pull-based брокерах, а не всем сервисам слушателям копий сообщений после повторной отправки?

В этом случае мы сможем отказаться от Datastream, любое переигрывание обработки данных будет состоять только в исключении неверно созданных изменений

Звучит странновато , в event sourcing во время записи решаются конфликты, а не при чтении и агрегат там всегда в одном состоянии должен быть, если вы делаете разные состояния из event stream — это не event sourcing , а просто хендлиг persistent лога событий брокера для разного рода целей.

так вот кто был агрегатором моих данных. Не успел я зарегистрировать СПД, как начался ддос моих аккаунтов в вайбер, телеграмм, etc. Одни особо отличились, звонили 2 раза и приглашали в офис для мега уникального делового предложения. За статью однозначно спасибо, было очень интересно. А вот за нарушение моего покоя, спасибо не скажу.

Інформація про ФОП у відкритому доступі тому маркетологи можуть напряму з офіційних реєстрів отримувати.
Після реєстрації ФОП також отримую спам від банків (Таскомбанк, Ощадбанк) і від СпортЛайф, а ще безлічі нонеймів та гроші частіше в смс почав вигравати

да, информация в открытом доступе, но я не думаю, что заинтересованные лица вручную перебирают весь реестр. Это очень утомительное и затратное дело. Скорее всего используют аппы, один из которых описан в статье(я так думаю, что есть возможность приобрести премиум аккаунт, где возможности аппы будут совершенно иные). Либо кто-то из налоговой за небольшую плату инфу сливает, кто недавно зарегистрировал СПД. Учитывая то, что мы живем в Украине, второй вариант вполне вероятен.

Написати прототип який буде парсити гігабайтний JSON і оновлювати в БД це задачка на пару годин яку вже зустрічав в тестових завданнях
Тому в довгій перспективі краще-правильно замовити розробку у фрілансера ніж давати хабаря
З 1 січня 2021 набуде чинності закон зі штрафами за спам, можливо вирішить проблему

Благодаря OpenDataBot, жизнь всех юристов в Украине стала намного проще ) Thanks!

Дякую! Дуже інформативно!

Откуда в Украине миллион компаний? Хорошая страна наверное, где у каждой десятой семьи своя компания

Более 1 млн 200 тысяч компаний открытых, 500К закрытых.

В стране 2М открытых и 2М закрытых ФОП.

У нас большая страна, и мы любим схемы.

Якщо у тебе немає компанії, то у когось їх дві.

Скоріше двадцять дві тисячі. Гарем обналу.

Підписатись на коментарі