Прийшов час осідлати справжнього Буцефала🏇🏻Приборкай норовливого коня разом з Newxel🏇🏻Умови на сайті
×Закрыть

Строим Serverless BI

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті.

Привет! Меня зовут Антон Мартыненко, я работаю бекенд инженером в компании PFC Technologies AB в Швеции и хочу вам рассказать о том, как использовать AWS Lambda на конкретном примере. Статья будет интересна бекенд инженерам и всем кто работает с облачной инфраструктурой. В ней я приведу пример как можно использовать Serverless в реальном бизнесе для обработки данных. Этот же подход можно применять в других облачных сервисах — Microsoft Azure и Google Cloud Platform.

AWS Lambda — это современная Serverless платформа компании Amazon на которой легко запускать код и на которой вы платите только за использованные ресурсы. Большой плюс такого подхода, что не надо обслуживать сервера — Amazon все это делает за вас. AWS Lambda поддерживает горизонтальное автомасштабирование — в зависимости от нагрузки AWS Lambda увеличивает или уменьшает количество экземпляров вашей функции. Это позволяет с легкостью обрабатывать пиковые нагрузки, а также экономить деньги во время простоя или низких нагрузок. Amazon использует модель оплаты pay-as-you-go — это значит вы не платите за ресурсы (процессоры, память) которые не используются. Подробнее читайте здесь.

Суть проблемы

Практически все компании хотят знать больше о своих пользователях, изучать их поведение и понимать, как улучшить свой продукт. Как правило, этим занимаются специально обученные люди, которые работают с инструментами Business Intelligence. Эти инструменты помогают обрабатывать данные о пользователях, находить зависимости, строить различные графики и делать выводы.

Если коротко — на вход инструментам Business Intelligence подаем данные, затем BI специалист строит определенные запросы — фильтрует, группирует, коррелирует с другими данными. На выходе получаем графики или просто числа, которые помогают менеджменту принимать решения о том, что делать (или не делать) дальше.

Еще важно упомянуть, что в связи с европейской директивой General Data Protection Regulation (далее GDPR) с данными пользователей нужно обращаться определенным образом, например, нужно ограничивать и контролировать доступ к персональным данным. Иметь возможность удалять персональные данные а еще лучше их анонимизировать чтобы данные перестали быть персональными.

В нашей компании моей команде была поставлена задача сделать доступными финансовые транзакции пользователей для анализа в BI инструментах. При этом нужно было сделать GDPR-совместимое решение.

Архитектура решения

Без глубокого погружения в детали скажу, что все финансовые транзакции в конце концов попадают в AWS DynamoDB, откуда мы их можем читать для дальнейшей обработки. Для тех, кто не знаком с DynamoDB — это Serverless NoSQL хранилище в Amazon. В нашей системе транзакции не меняются после того, как сохранены — то есть они Immutable. Как следствие — каждую транзакцию достаточно считать и обработать один раз.

Для решения задачи мы решили считывать транзакции из DynamoDB, анонимизировать их и сохранять в AWS RDS (PostgreSQL). Наш BI инструмент позволяет подключить любую SQL базу, в том числе PostgreSQL.

Код мы пишем на Go, но в этой статье и в этом решении это не имеет значения — все то же самое можно делать на любом другом языке, который поддерживается AWS Lambda. Наша команда выбрала AWS Lambda, где собственно запускается код, который делает всю работу.

Примерная архитектура решения на диаграмме.

У DynamoDB есть отличная функция — DynamoDB stream, который можно напрямую подключить как источник данных в AWS Lambda. Наша Лямбда получает DynamoDB stream event в котором лежит JSON с транзакцией. Ламбда распаковывает JSON и преобразовывает в объект Go — это так называемый unmarshalling данных. Далее мы удаляем или анонимизируем некоторые поля из транзакции, по которым можно определить пользователя. Мы анонимизируем поля транзакции при помощи хеширования с секретом. Например:

anonymizedUserID := sha256(userID + secretString)

Строка с секретом secretString хранится в AWS Secrets Manager и доступ к этому секрету имеет ограниченное количество людей. То есть когда BI специалист работает с транзакциями — он не может при помощи хеша восстановить userID имея доступ только к anonymizedUserID. И с точки зрения GDPR такое решение является достаточным для анонимизации данных пользователей. Лямбда считывает секрет один раз при старте и держит его в памяти.

Далее мы просто сохраняем анонимизированную транзакцию в базу данных PostgreSQL к которой имеют доступ BI инструменты и где специалисты выполняют запросы к анонимизированным данным.

Тестирование

Мы часто практикуем TDD, а значит начинали мы конечно же с тестов. Первым делом мы нашли фреймворк который нам помог с тестированием, запуском Лямбды локально и с деплойментом в облако. Это AWS Serverless Application Model (или коротко SAM).

Тесты, или если быть точнее интеграционные тесты, мы запускаем в докере. Есть как минимум два контейнера — контейнер с кодом и контейнер с тестами. Все вместе мы запускаем при помощи docker compose и контейнер с тестами взаимодействует с первым контейнером (например делает вызовы REST API или отправляет сообщения в очередь). Также мы запускаем контейнеры с зависимостями, например мы запускаем localstack для эмуляции AWS Secret Manager, базу данных PostgreSQL и другие зависимости нашего Serverless приложения

Мы столкнулись с тем, что SAM тоже использует докер для запуска Lambda, то есть нам пришлось разобраться как запускать Docker контейнер внутри нашего Docker контейнера и как сделать так чтобы наш код и наши тесты могли общаться с зависимостями. Важно запустить все контейнеры в одной сети чтобы они могли общаться друг с другом.

Наш контейнер с кодом (Dockerfile) выглядел так:

FROM lambci/lambda:build-go1.x

#some more stuff here

RUN mkdir -p /go/src/github.com/company-name/lambda-name

WORKDIR /go/src/github.com/company-name/lambda-name

COPY . .

go install -race ./vendor/github.com/company-name/migration-tool/migrate-db

ENV DATABASE_URL='postgres://user:pass@bi-db:5432/bi-db?sslmode=disable'

Внутри lambci/lambda:build-go1.x есть SAM и SAM CLI, а это значит что можно внутри нашего контейнера запускать SAM при помощи команды sam local start-lambda. Так как sam запускается внутри докер контейнера — надо не забыть про важные параметры: sam local start-lambda --host 0.0.0.0 --docker-network <network_name> --skip-pull-image --docker-volume-basedir <lambda_base_dir>

К сожалению, нам не получилось локально подключить DynamoDB stream к SAM напрямую, пришлось изобретать небольшой велосипед с конвертацией типов из github.com/aws/aws-sdk-go/service/dynamodb в github.com/aws/aws-lambda-go/events Надеюсь в других AWS SDK (например на джаве или c#) этой проблемы нет.

Для запуска теста наш тестовый код из docker-compose.tests.yml присоединялся к SAM и отправлял вызов Lambda кода при помощи AWS SAM SDK. После вызова функции тест подключался в базе и проверял что тестовая транзакция анонимизирована и сохранена в базе данных.

Примерный код теста выглядит так:

func testFoo(t *testing.T) {
  // Create transaction with data
  transaction := Transaction{...}

  // Marshall transaction into dynamoDb type
  dynamoDbMap, err := dynamodbattribute.MarshalMap(transaction)
  require.NoError(t, err)

  // Convert dynamoDb type into the type that AWS Lambda can understand
  convertedDynamoDbMap := convertMap(dynamoDbMap)

  e := events.DynamoDBEvent{
     Records: []events.DynamoDBEventRecord{
        {
           Change: events.DynamoDBStreamRecord{
              NewImage: convertedDynamoDbMap,
           },
        },
     },
  }

  bytes, err := json.Marshal(e)
  require.NoError(t, err)

  //Act
  invokeLambda(t, bytes)

  //Assert
  txs, err := getTransactions(biDatabase)
  require.NoError(t, err)

  //Assertions here ...
}

Запуск Лямбды

func invokeLambda(t *testing.T, payload []byte) {
  // Create aws session
  ...

  c := lambda.New(session)
  invokeInput := lambda.InvokeInput{
     FunctionName: aws.String("Anonymize"),
     Payload:      payload,
  }

  invokeOutput, err := c.Invoke(&invokeInput)
  require.NoError(t, err)
  require.Nil(t, invokeOutput.FunctionError)
}

Деплоймент в клауд

AWS Serverless Application Model (или просто SAM) — достаточно сложный инструмент, который решает несколько задач. Я не смогу здесь детально рассказать о SAM, но если коротко о деплойменте то:

  • Serverless application состоит из Lambda functions (собственно ваш код), Event sources (источники событий, которые запускают ваш код) и Resources (например хранилище секретов или база данных куда сохраняется результаты выполнения кода)
  • Сначала вы описываете шаблон (template.yaml) в которым вы определяете Lambda functions, Event sources и Resources
  • Далее при помощи SAM command line interface (SAM CLI) на основе шаблона вы создаете package — пакет с приложением и конфигурацией к нему готовые для деплоймента. Пакет загружается в AWS S3 — файловое хранилище в облаке
  • После этого можно задеплоить приложение опять же при помощи SAM CLI. SAM создает (или обновляет) Application stack — по сути это Lambda+Triggers+Resources все вместе как одна сущность — ваше Serverless приложение

Самое сложное во всем этом — создание правильного шаблона (template.yaml) и подготовка ресурсов, которые будут использоваться вашим приложением. Формат описания шаблона очень похож на AWS CloudFormation — по сути это просто декларативный скрипт с параметрами. То, что сегодня называют Infrastructure as Code. Если у вас есть опыт написания скриптов, то проблем с template.yaml быть не должно. В интернете можно найти много примеров как сделать SAM шаблон.

Я советую использовать AWS Systems Manager Parameter Store (сокращенно Parameter Store) для хранения конфигурации вашего приложения. Это упростит работу с несколькими окружениями (например dev, staging, prod) и упростит изменение конфигурации вашего приложения, которое уже запущено и работает.

Для того, чтобы все это вместе задеплоить и запустить мы написали 3 скрипта: one-time-setup.sh, package.sh, deploy.sh

Немножко подробнее остановлюсь на one-time-setup.sh. Этот скрипт нужен, чтобы создать пользователя, от которого будет запускаться ваш код и дать ему права на доступ к ресурсам. Например на доступ к секрету в AWS Secrets Manager. Содержимое скрипта выглядит примерно так:

aws iam create-role --profile $PROFILE --role-name $ROLE_NAME …
aws iam attach-role-policy --profile $PROFILE --role-name $ROLE_NAME --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
aws iam attach-role-policy --profile $PROFILE --role-name $ROLE_NAME --policy-arn ...

Так как этот скрипт создает пользователя и назначает ему права доступа — для запуска этого скрипта требуются высокий уровень доступа, а точнее доступ к AWS IAM. Другими словами, деплоить наше Serverless приложение может любой программист, а создать пользователя и назначить права доступа может только администратор.

В скрипте package.sh — вызов sam package

В скрипте deploy.sh — вызов sam deploy, а также запуск миграций базы данных

По умолчанию AWS Lambda пишет логи в CloudWatch. Для мониторинга и алертов мы используем Loggly — для этого нам понадобилась еще одна Лямбда, которая пересылает логи из CloudWatch в Loggly.

Импорт исторических данных при первом запуске

У AWS DynamoDb stream есть ограничение — в нем хранятся данные только за последние 24 часа. Данные старше 24 часов удаляются автоматически и недоступны для чтения. Для импорта исторических данных мы нашли простое решение:

1) Создаем новую AWS DynamoDb таблицу с такой же схемой как и оригинал

2) Включаем AWS DynamoDb stream в новой таблице

3) Подключаем ее к нашей Лямбде, которая уже запущена и обрабатывает данные с таблицы-оригинала

4) Копируем данные из таблицы-оригинала в новую таблицу

Во время копирования все записи появляются в стриме новой таблицы. Таким образом мы просто добавляем исторические данные в новый AWS DynamoDb stream, который подключен к той же самой Лямбде. Во время копирования данных наша Лямбда автоматически масштабировалась горизонтально — при помощи логов мы видели что увеличилось количество shards в AWS DynamoDb stream и параллельно с этим увеличилось количество экземпляров Лямбды. Если AWS DynamoDb stream подключен к Лямбде, то на каждый AWS DynamoDb stream shard создается отдельный экземпляр Лямбды. Если вы планируете использовать такой способ импорта исторических данных — убедитесь, что ваша Лямбда сможет обработать данные быстрее чем за 24 часа.

Выводы

Современные облачные сервисы позволяют построить Serverless инфраструктуру которая будет простой в поддержке, стоить адекватных денег, а также будет автомасштабироваться в соответствии с вашими нагрузками. Стоит отметить, что были сложности в процессе реализации и нам пришлось потратить некоторое время, чтобы во всем разобраться, мы даже столкнулись с некоторыми багами в SAM. Думаю, справедливо сказать, что еще пройдет некоторое время, пока Serverless станет массовым. Но как преимущества я отмечу надежную работу решения в целом, низкая цена поддержки и обновления, автомасштабирование и infrastructure-as-code с которой может справиться любой бекенд девелопер.

Почитать/посмотреть по теме:

LinkedIn

Похожие статьи

Допустимые теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Допустимые теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

BI то у вас как раз самый стандарный :-), то что вы делаете это ETL. Для одного двух источников, такой подход в принципе ок (правда сертификацию пройти будет сложно). Но с увеличением количества типов данных, будет очень сложно поддерживать. Советую посмотреть в сторону immuta.com — она даже сама умеет определять PII, HIPAA колонки, можно делать k anonymity и прочее.

А зачем в принципе генерировать этот

anonymizedUserID

если id можно сделать просто произвольным и не иметь больше никогда возможности деанонить эти records?

Я не совсем понял что имеется ввиду. Нам надо данные джойнить с другими данными пользователей из других источников.

Спасибо, отличная статья!
Я так понимаю, лямбда только распаковывает и анонимизирует данные для записи в PostgreSQL? А там они как хранятся? В виде Star Schema или как-то еще подготовленными для BI?

Мне просто интересно, на каком этапе происходит трансформация данных в нужную модель. В лямбде или данные уже приходят как-то трансформированными в DynamoDB?

В DynamoDB у нас лежат транзакции, в которых много данных в виде key-value. Мы не усложняли — когда мы пишем в Postgres, мы просто пишем в одну табличку с кучей полей. Например данные о пользователях (возраст, пол, дата регистрации) хранится в другой табличке, они приходят из другого источника и также анонимизированы. Табличку с транзакциями можно заджойнить по anonymizedUserID с другими данными. Но я не могу это назвать Star Schema =)

Компания у нас маленькая и схема данных достаточно простая в сравнении с большими банками.

Подписаться на комментарии