×Закрыть

Реактивные библиотеки RX

За последние несколько лет термин «реактивное программирование» стал популярен во всех языках программирования. Даже был опубликован Reactive Manifesto, который, впрочем, дает очень общую формулировку реактивных систем. Да, приложения должны быстро отвечать (Responsive), не падать в случае ошибок (Resilient), быть гибкими в плане увеличения/уменьшения мощностей (Elastic) и базироваться на асинхронных событиях (Message Driven).

Декларация, конечно, верная, а что конкретно делать программисту? Давайте поговорим о реактивных (Rx) библиотеках, существующих во многих языках программирования.

Лет семь назад Эрик Майер (Erik Meijer) из Майкрософта предложил модель программирования Reactive Extensions (Rx) и имплементировал ее, как набор библиотек (Rx.NET) для соединения и обработки асинхронных потоков данных. Потоки эти основаны на событиях, типа кто-то твитанул, а ты получил нотификацию. Оно, конечно, можно и зайти на страничку Твиттера и каждые 10 секунд ее перегружать, в надежде, что кто-то из тех, кого вы фоловите выдал новый твит, но тогда это не реактивно, а просто polling. Нагрузка на сервер значительно вырастет, если каждый пользователь будет его дергать раз в 10 секунд. Гораздо экономичнее просто подписаться на поток твитов и получать их только тогда, когда они есть.

Или возьмем онлайновый аукцион, когда другие покупатели перебивают вашу цену на фотокамеру. Это явный поток, на который можно и нужно подписаться, вместо того, чтобы постоянно проверять, не побили ли ваше предложение.

Еще пример — поток цен на акции во время работы биржи. Или поток сигналов от сенсора, например, акселерометра в телефоне. Даже процесс, когда пользователь тащит мышку по экрану, можно рассматривать, как поток координат мышиного указателя.

Пять лет назад Microsoft выложил Rx.NET в опенсорс. Народу понравилось, и библиотеку портировали на многие языки программирования: RxCpp, RxJS, RxPHP, Rx.rb, Rx.py, RxJava, RxSwift, RxScala, RxKotlin.

Давайте познакомимся с основными Rx понятиями. Сразу скажу, что эта статья не учебник по реактивному программированию, а просто описание основных игроков любой Rx библиотеки. Я работаю и с RxJS, и с RxJava, но в этой статье буду использовать JavaScript в примерах кода. Для начала, посмотрим на нереактивный код:

let a1 = 2;

let b1 = 4;



let c1 = a1 + b1;  // c1 = 6



a1 = 55;           // c1 = 6, but should be 59
b1 = 20;           // c1 = 6, but should be 75

После выполнения этого кода, c1 остается равным шести. Оно, конечно, можно добавить строчки после изменения a1 и b1 и пересчитывать c1, но правильнее было бы, чтобы c1 мгновенно пересчитывалась при изменении a1 или b1, как в Excel spreadsheet. Иными словами, мы хотим перейти к push-модели, когда новые и асинхронно изменяющиеся значения are pushed к потребителю. Мы хотим уйти от pull-модели, когда потребитель периодически спрашивает поставщика: «У тебя есть что-то новенькое для меня?... А сейчас?... А может сейчас есть? ».

Observable, Observer, Subscriber

Давайте посмотрим на основных игроков реактивных библиотек: Observable, Observer и Subscriber.

Observable — это объект или функция, которая выдает последовательности данных во времени (a.k.a. The Producer).

Observer — это объект или функция, которая знает, как обрабатывать последовательности данных (a.k.a. The Consumer).

Subscriber — это объект или функция, которая связывает Observable и Observer.

Я думаю многие из работающих программистов, увидев это диаграмму, скажут, что мы это и так знаем — обычный messaging и pub-sub. Это и так, и не так:

  1. Rx библиотеки заточены на асинхронную обработку без блокировки обработки данных.
  2. Rx предлагает простой API с выделенными каналами для передачи данных, ошибок и сигнала об окончании потока данных.
  3. В Rx библиотеках есть больше сотни операторов, которыми можно обрабатывать потоки идущие к подписчику. Операторы можно собирать в цепочки, т.е операторы composable.
  4. В некоторых реализациях RX, например, RxJava2, хорошо поддерживается backpressure, т.е. ситуация, когда продюсер выдает данные быстрее, чем подписчик может обработать.
  5. Для Rx messaging не нужно поднимать специальные серверы. Все включено в код вашего приложения.
  6. В языках, которые поддерживают multi-threading, работа со threads упрощается, как и переключения с одних threads на другие. Разработчики на Андроиде — это оценят, ибо там вывод на экран всегда должен выполняться main thread, а вычисления — другими.

Как же все-таки Observable передает данные в Observer? Observer может имплементировать три метода (названия могут слегка отличаться в зависимости от языка):

  • next() - вот тебе новое значение из потока;
  • error() - вот тебе ошибка, произошедшая в потоке;
  • complete() - поток завершен.

В следующем примере функция getData() превращает массив с пивом в Observable и возвращает его. Кому? Подписчику, когда он появится. А подписчик — getData().subscribe(...​) - передает Observer, как аргумент функции subscribe(). Соответственно, Observer состоит из трех функций:

  • что делать, когда придет следующий элемент потока;
  • что делать, если придет ошибка;
  • что делать, если придет сигнал об окончании потока.
// Defining the function with observable
function getData(){

    var beers = [
        {name: "Stella", country: "Belgium", price: 9.50},
        {name: "Sam Adams", country: "USA", price: 8.50},
        {name: "Bud Light", country: "USA", price: 6.50},
        {name: "Brooklyn Lager", country: "USA", price: 8.00},
        {name: "Sapporo", country: "Japan", price: 7.50}
    ];

// The observer will be provided at the time of subscription
    return Rx.Observable.create( observer => {

              beers.forEach( beer => observer.next(beer));
              observer.complete();
           }
    );
}

// Calling the function that subscribe to the observable
// The function subscribe() receives the Observer, represented by three functions
getData()
     .subscribe(
         beer  => console.log("Subscriber got " + beer),   // handling the arrived data
         error => console.err(error),   // an error arrived
            () => console.log("The stream is over")   // the signal that the stream completed arrived
);

Наш Observer состоит из трех fat arrow functions, которые появились в спецификации языка ECMAScript 6. Функции next() и complete() выполнятся только тогда, когда мы вызовем subscribe(). Посмотреть этот пример в действии можно здесь: bit.ly/2jm69aM (откройте консоль броузера и нажмите Run).

Операторы

Операторы — это функции, которыми можно преобразовывать данные между моментом, когда Observable их отправил, и моментом, когда подписчик их получил. Т.е. преобразовываем данные во время движения. В Rx библиотеках операторов много. Больше сотни.

Каждый оператор — это функция, которая принимает Observable на вход, трансформирует полученное значение и выдает новый Observable на выходе. Так как вход и выход любого оператора одного типа (Observable), операторы можно связывать. Вот, например, как можно отфильтровать пиво, которое дешевле, чем 8 у.е. и преобразовать пивные объекты в строки.

Изучение Rx операторов требует времени, и если у читателей ДОУ будет интерес, то я продолжу писать об Rx. Документация Rx библиотек часто включает marble diagrams, которые могут помочь в понимании, что делает конкретный оператор. Вот, например, как иллюстрируется оператор filter:

Входной поток (Observable), представленный разными геометрическими фигурами, фильтруется, чтобы на выходе получить другой поток (Observable), в котором будут только круги.

А как же все-таки сделать c1=a1+b1 реактивным?

Сначала нужно превратить a1 и b1 в потоки, например, так:

const a1 = Rx.Observable.from([2, 55]);

Но этот поток выстрелит 2 и 55 мгновенно, а мы хотим добавить временную составляющую. Для имитации задержки можно использовать отдельный поток, который просто выстреливает числа через определенные интервалы времени, а чтобы связать его с нашим потоком, который выдает 2 и 55, мы используем оператор zip:

const a1 = Rx.Observable.from([2, 55])
  .zip(Rx.Observable.interval(1200), x => x);

Когда появится подписчик, наш поток выдаст 2, а через 1.2 секунды — 55. Похожим способом сделаем поток для b1, только с задержкой в полторы секунды. И снова, используя композицию потоков и оператор combineLatest, мы скажем: «Скомбинируй потоки a1 и b1, суммируя их последние значения». Весь код будет выглядеть вот так:

const a1 = Rx.Observable.from([2, 55])
  .zip(Rx.Observable.interval(1200), x => x);

const b1 = Rx.Observable.from([4, 20])
  .zip(Rx.Observable.interval(1500), x => x);

a1.combineLatest(b1, (x, y) => x + y)
  .subscribe(val => console.log("c1=" + val));

Чтобы увидеть этот код в действии, посетите Plunker по адресу bit.ly/2nphn0k, откройте консоль броузера и нажмите кнопку Run. Вы увидите, как c1 будет пересчитываться, как только потоки a1 и b1 будут выдавать новые значения.

В мае я буду проводить трехдневный воркшоп в Киеве по разработке веб приложений с Angular 4, который использует RxJS во многих местах. Зарегистрироваться на воркшоп можно здесь: bit.ly/2n6CoKy. Для читателей ДОУ существует скидка 10%. При регистрации введите промо код dou.

Если вы еще не работали с реактивными библиотеками, советую посмотреть на Rx библиотеку для вашего языка программирования и начать использовать ее в реальных проектах. Rx библиотеки не требуют изменения стиля программирования всего проекта. Используйте их там, где можно сделать так, что асинхронные данные проходят через последовательность алгоритмов (операторов).

38 комментариев

Подписаться на комментарииОтписаться от комментариев Комментарии могут оставлять только пользователи с подтвержденными аккаунтами.

Отличная статья! Только сейчас на пальцах понял как это все работает. Много сайтов перечитал, в результате такая каша была в голове! У вас прям талант доходчиво объяснять!

В июле/августе я буду проводить очередной онлайн воркшоп по Ангуляру (на английском), в котором используется RxJS. Детали здесь: bit.ly/2s1f9ad

А как это на Responsiveness влияет?

Техническая статья хорошего качества на ДОУ! Ура!

Яков, спасибо за, как всегда, интересную информацию!
Когда приедете в Киев? Будете на JEEConf в этом году?

Да, у меня будет 2 доклада на JEEConf jeeconf.com/speakers

функция getData() превращает массив с пивом
Потетсил.. НИ ХРЕ НА!!!

Чтобы получилось, надо было дать на вход реальный массив пива! Вот у меня, например, после нескольких бутылок пива, на выходе всегда получается строка :)

Чувак, ты крут! ))))

пони бегают по кругу ... главное новое название погромче придумать.

преобразовать пивные объекты в строки.
з одного боку, це кощунство !
з іншого, в результаті все саме так і відбувається.

Самая коварная вещь в Rx — горячие/холодные сигналы. С ними, как с ленивыми вычислениями: очень сложно явно отследить, где что. Существуют ли «хорошие практики» на эту тему (напр., в контроллерах использовать только холодные, а во вьюхах — только горячие. Пример с потолка)?

Если судить только по этой статье, то вместо того чтобы написать
var с1=function(){return a1 + b1}
, надо зачем-то трахаться с абстракциями.

Для написания больших библиотек может оно чем-то и полезно. Но чаще всего такие поделки обречены на медленную и мучительную смерть. Медленную, потому что найдутся те, кто повёлся и вогнал в боевой код. А добавить в сырой проект явно проще, чем потом убрать из готового.

В чём проблема: код лишается читаемость. А читаемость — самая главная задача языков программирования начиная с ассемблера. Можно придумать 100500 способов написать короче. Но если это НЕ ЧИТАЕТСЯ, значит оно генерит мусор для мозга при попытке читать. Как результат — колоссальные нагрузки по времени и деньгам, чтобы писать без ошибок. А потом ещё большие затраты на отказ править «несрочные» ошибки, просто потому что правка не-авторами это очень дорого.

Выгода от такого подхода есть: крупноблочные абстракции и API. Реактивное API — это хорошо, его тупо взял и пользуешь. Но ключевая проблема API так и не решается: его очень сложно менять. И самыми живучими оказываются те интерфейсы, которые работают по низкоуровневым протоколам — их очень легко расширить или дополнить, не потеряв совместимость. Так что выгода не особенная, просто ещё один стандарт. Ах, как мы любим 100500 стандартов!

С коментарием про ухудшение читаемости кода согласен. Человек, который читает код, должен знать Rx.

Иначе говоря, более 100 операторов во всей их причудливости — делают фактически ещё одним языком. Однако при этом стандартом не являются, что имеет следствием некислую вероятность им пересечься что по ключевым словам, что по примитивам, что по ЗНАНИЮ этих ключевых слов и примитивов программистом — он вполне может знать другую библиотеку или язык.

Как результат: ДОРОЖЕ нанять спецов, ДОРОЖЕ поддерживать, ВДВОЕ ДОРОЖЕ поддерживать когда эта библиотека устареет (а это проблема всех «бурно развивающихся» одной фирмой), ВПЯТЕРО ДОЛЬШЕ развивать проект после устаревания библиотеки. И всё это — ради мифического удобства написания кода?

Я согласен, что Rx на JavaScript фактически занял нишу функционального программирования, но [падшая женщина] какого [достоинства] со своими словами и правилами, что делает его ещё более мерзким когда пишешь сразу на нескольких языках. А смею заметить, JS — язык скриптовый, и потому пользуется не только лишь всеми. А дошло до того, что ради пустяковых задач приходится нанимать неимоверно дорогих обезьян которые по итогу всё равно пишут говнокод, и ещё говнокодистей чем раньше — с ещё большими шансами, что бросят его недоделанным, побежав в новый проект где дедлайн и ответственность ещё не наступили.

Вангую: следующими будут иероглифы. Это ведь сокращение кода!

есть вероятность, что будет стандартом
tc39.github.io/proposal-observable

насчет сокращения кода: www.reddit.com/…​ok_it_changes_everything :)

Если знать что такое Rx, то нормально читается. Ясно, что нужно создать поток, а не просто вернуть значение (т.е. асинхронное потенциально ошибочное вычисление, по факту монаду).
Не знаю как в этом вашем JS, а в императивные языки Rx затаскивает хорошие плюшки из функциональщины и возможность писать сильно компактнее (и надёжнее, т.к. проще отследить, что все кейсы обрабатываются).

Скажу вещь, не очевидную бюрократам-теоретикам: легко читаемый код не нуждается в 100% покрытии тестами. И даже в 10%-м. Потому что прочитать код становится проще, чем прочитать тесты.

Теорию качества знать? А зачем! Даёшь больше контроля!

Читаемый код не гарантирует что что-то может перестать работать, например, в случае проблем с инкапсуляцией и модульностью. Реактивный код позволяет довольно легко инкапсулировать состояния и нарезать код на модули. А вот о «теории качества» действительно не знаю: буду благодарен, если просветите.

Ой, да ладно: JS і «читаемость»... Rx в даному випадку тільки на користь. По факту, потихеньку включать в стандарт (чого тільки не придумаєш, коли багатопоточності немає).
P.S.
І не ієрогліфи, а картинки:
rxmarbles.com

О, да, сразу всё понятно: дорога дальняя, дом казённый. Объяснять — а нахера!

Начал работать над проектом, в котором Angular 2 и RX используются. А начиная с версии 2.1 TS поддерживает компиляцию async/await в ES5, что очень радует. Так вот с RX приходится писать код такого плана:

const response = await this.http.get('api/resource').first().toPromise();
const json = response.json();

Хвост в виде .first().toPromise() как-то не очень удобен и ничего толкового по этому поводу не нашел. После C# отказываться от async/await совсем не хочется. Поделитесь опытом, как еще удобно можно изпользовать эти API?

In Angular the Http object methods return an observable, e.g.

this.http.get('api/resource').map(res => res.json()).subscribe(...);

I didn’t try to combine it via promises with async/await though. See if this helps:

labs.encoded.io/…​/asyncawait-with-angular

Thank you Yakov, my example is from Angular, I just find async/await syntax more clean as it helps to avoid callbacks and makes it possible to write asynchronous code in synchronous manner without callbacks.

It would make very good sense to use Observable and subsribe() method if http.get produced a stream of values, but it returns at most one value and I don’t see a simple straightforward way to consume it with await keyword.

I don’t see issues with consuming observables that consist of just one value. For example:

github.com/…​stclient/app.component.ts

There is no issue in doing that. Maybe I just got used to async/await in C# and trying to find out I can apply same approaches in Angular. Maybe it doesn’t make sense to. I still find it nice to be able to consume asynchronous APIs in same function, i.g.:

try {
    var resp = await http.get('api');
    var model = resp.json();
    // Do some logic with model.
    await http.put('api', model);
    service.notifySuccess();
} catch (e) {
    loger.log(e);
    return;
}

All in the same method, straightforward and easy to read and understand.

this.http.get('api/resource').map(x => x.json())
this.http.get('api/resource').concatMap(x => this.giveMeAnotherObservable(x.json()))

Я о том, что с async/await код намного читабельнее без разных subscribe, map, flatMap, concatMap, arrow functions, functions и тому подобного для такого простого кейса.

Согласен, но есть одно но. В мире .NET есть негласное правило — возвращать Task в активном (горячем) состоянии, чтобы его можно было await-тить.
А в мире Angular & RxJS наоборот, большинство API возвращает cold Observables.
С API можно бороться, а можно его использовать.

Не уверен к чему тут упомянуты горячие таски и холодные observable. Таска представляет асинхнонную операцию (значение), observable — поток таких асинхнонных значений. Использовать да, можно, у меня скорее вопрос зачем возвращать observable где ожидется только одно значение и как его легко консьюмить с await, чтобы писать понятную логику вместо чейнов then, catch, map, subscribe.

Можно, конечно и с промисами, но давай слегка усложним задачу. Когда твой клиент сделал HTTP request сервер лежал и надо сделать несколько попыток приконнектиться каждые две секунды. Теперь напиши это на промисах. А с Observable несколько строчек:
github.com/…​pp/home/home.component.ts

Да, это немного другая программная модель, и, может, async/await сюда лепить и не стоит. А задача довольно простая:

for(let i = 0; i < 3; i++)  {
    try {
        let resp = await http.get('api');
        ...
        break;
    } catch {
        await Promise.delay(2000);
    }
}

И легко выносится в сервисную фунуцию.

А вот еще один сценарий. Пользователь находится в зоне медленного интернета и выдал HTTP запрос. Через 5 секунд пользователю надоело ждать и он выдал новый HTTP запрос. Как убить первый незавершенный запрос?
С RxJS — это одна строчка кода.

Да, это, пожалуй единственный убедительный аргумент который мне встечался, так как предложение внедрить cancellation token было отменено в ES6 и я не помню планируют ли его добавить в следующей версии. Но когда добавят эта задача тоже будет решаться в одну строчку :).

Я ведь не против Rx, я за то, чтобы их можно было удобно await’ить.

Единственная проблема с ркс надо хорошо знать апи чтобы эффективно его использовать, а оно не самое тртвиальное если начинаешь

Для Clojure/Script есть хорошая книга по этой теме www.packtpub.com/…​jure-reactive-programming

Подписаться на комментарии