Чому вам НЕ потрібен ReactiveX
Багато «нових ідей» сьогодні в програмуванні (та й не тільки) висловлені далеко не вперше. Але той факт, що ідея згадується в статті десь років
ReactiveX — це бібліотека (набір бібліотек), що винесла ідею «реактивного» підходу у програмуванні на новий рівень і зробила його популярним (RxJava має 47К зірок на Github,
У мене була нагода використовувати цю бібліотеку ще з перших версій RxJava в різних варіантах: в Android застосунках, desktop проєктах та в серверних Java-демонах. Дещо працює з ReactiveX і до сьогодні, щось встигли переписати без використання цієї бібліотеки, а часто все від початку було написано без неї. З огляду на такий досвід і написана ця стаття.
Звідки взявся ReactiveX
Вікіпедія пише, що ReactiveX виник як побічний продукт якогось проєкту в Microsoft, про який ніхто не знає. Але це насправді нікому не цікаво. 🙂
Корисніше буде зрозуміти, що спонукало шукати новий підхід. У тому вигляді, у якому ця бібліотека є зараз, вона покриває цілий спектр задач, про які є сенс згадати.
Підписка на події. Стара як світ задача: на екрані є кнопка, потрібно щось зробити, коли хтось натиснув на неї. Як це вирішують:
- Найпростіше: зробити поле onClick, що містить посилання на callback.
- Інколи цього не достатньо, і ду-у-уже бажано, аби була змога підписатися на подію декілька разів. Тоді роблять обгортки на кшталт EventListener — класу, що інкапсулює посилання на список callback-ів. Зазвичай у UI-фреймворках на цьому й зупиняються, оскільки щось складніше починає бити по performance.
- Іноді до попереднього хочеться додати, наприклад, debounce, обробку в іншому потоці, чи викликати ще якусь тривалу операцію, після якої вже зробити щось своє. Тут RX і допоможе.
Застосування як Promise. Існує багато історій про те, як поява Promise в JS полегшила роботу та допомогла уникнути callback hell. В цілому, Promise реалізує частину того, що доступно в ReactiveX. Тому, з появою останнього, promise-based-підхід став доступний в більшості мов програмування. Цікаво, що promise-и з нами орієнтовно лише з 2014. А в RxJava перші релізи вийшли у 2013. Можна сказати (з натяжкою), що ці інструменти — однолітки.
Застосування як Stream. Тут я маю на увазі як Java Stream, хоч і подібні ідеї присутні багато де: зробити ітератор, перетворити його, згорнути результат куди треба. У певному сенсі ReactiveX теж так може.
Побудова стабільних pipeline-ів. Наприклад, у вас є потік подій (запити, логи, транзакції), і ви хочете зробити демон, який їх приймає, обробляє (паралельно в різних потоках) і складає в базу (батчами, щоб якось ефективно було). Це все можна реалізувати на чергах, але в термінах ReactiveX, цей демон може мати вигляд одного pipeline.
Зрештою якщо об’єднати ці всі ідеї та додати до них купу цікавих операторів, вийде ReactiveX.
Як працює ReactiveX
Тут і далі приклади будуть на RxJava — оскільки саме із цією версією ReactiveX мені доводилось працювати найбільше.
Розглянемо код
Observable<Integer> A = Observable.range(0, 30) .skip(10) .take(5) .map((v) -> v+100); A.subscribe((v) -> System.out.println("A"+v)); A.subscribe((v) -> System.out.println("B"+v));
Тут ми можемо спостерігати одразу декілька цікавих аспектів:
- Доки ми не зробимо subscribe, як такого pipeline-у обробки подій чи даних не існуватиме. До цього моменту — то лише декларації. Тому тут два subscribe-и виведуть дані двічі (більш зрозуміло стає, якщо розібратись як працює Observable.create).
- Також бачимо, як ми можемо перетворювати «потік даних» від
Observable
доsubscribe
, застосовуючи декілька з численних готових операторів (skip, take, map). - Є сенс додатково зауважити, що оскільки кожен
subscribe
запускає окремий потік обробки, всі ці оператори роблять свою роботу теж двічі.
В цілому, якщо не вдаватись у деталі, ReactiveX будує передачу даних на push-based принципі (подивіться інтерфейс Observer.java): тобто дані передаються між ланками шляхом виклику onNext
.
Тобто:
- ви будуєте pipeline;
- у нього викликаєте
subscribe
; - цей
subscribe
по всьому ланцюжку доходить до початковогоObservable
(їх може бути декілька); - в
Observable
викликаєтьсяsubscribe
, який починає onNext-ити події; - ці події проходять операторами;
- й осідають в Consumer-і subscribe-а.
Кожен потік даних ініційований subscribe-ом складається з послідовності onNext-ів та в кінці одним onError або onComplete. Тут важливо розуміти, що якщо десь виникає необроблена помилка і створюється onError
, обробка даних зупиняється і залишок даних ігнорується. Ця поведінка повністю збігається з логікою роботи promise-ів. Але не очевидна, якщо потрібен, наприклад, EventHandler (зупинимось на цьому пізніше).
Якщо ви раніше не користувались RxJava, для повноцінного розуміння потрібно також розібратись з Observable
vs Flowable
, різними способами їх створення, поняттям backpressure, з операторами subscribeOn
/ observeOn
та подивитись бібліотеку операторів (там багато гарних діаграм 🙂). Але опис цих речей виходить за рамки статті, краще глянути документацію.
Типові способи використання
Використання в ролі EventHandler. Типовим способом опису джерела подій є PublishSubject наступним способом:
// створюємо “реєстр” обробників PublishSubject<String> onEvent= PublishSubject.create(); // підписуємось на події onEvent.subscribe((v) -> System.out.println("A: "+v)); // ініціюємо подію onEvent.onNext("1"); onEvent.onNext("2"); //підписуємось на подію, але ігноруємо більше однієї події за секунду onEvent.debounce(1, TimeUnit.SECONDS).subscribe((v) -> System.out.println("B: "+v)); onEvent.onNext("3"); onEvent.onNext("4");
На цей onEvent
можна підписатись багато разів, з різними операторами у дорозі.
Застосування як Promise. Типовий спосіб використання виглядає як:
service.apiCall() .flatMap(value -> service.anotherApiCall(value)) .flatMap(next -> service.finalCall(next)) .subscribe()
Тут ми експлуатуємо flatMap
так, щоб Observable
, які повертаються з service.*
, оброблялись почергово. Тобто три виклики будуть оброблятися послідовно, хоч вони й мають асинхронну природу.
Застосування як Stream. Для такого сценарію в документації є приклад:
Flowable.range(1, 10) .parallel() .runOn(Schedulers.computation()) .map(v -> v * v) .sequential() .blockingSubscribe(System.out::println);
Тут ми паралельно обробляємо 10 задач на окремому Scheduler
(інкапсулює Thread Pool), а після обробки складаємо їх знову в одну послідовність.
Типові проблеми
Неочікуваний onError. Якщо використовувати сценарій з EventHandler-ом і підписуватись на Subject, і з якоїсь причини обробник події викидає помилку, підписка скасовується. Ну а зловити NPE десь посеред onClick — то звичайна справа. Погодьтесь, що відписати обробника події як реакцію на помилку — то занадто. Тут можна запропонувати використати retry-оператор, але очевидності до роботи це не додає.
Лeгко зловити OutOfMemory. Найпростіший приклад може виглядати так:
Observable.range(1, 1000000000) .toFlowable(BackpressureStrategy.BUFFER) .blockingSubscribe(x -> { Thread.sleep(1000); System.out.println(x); });
Цей код виїсть усю доступну пам’ять і вивалиться в OutOfMemory. Чому? А все тому, що отой toFlowable
створює буфер необмеженого розміру. І це є стандартна поведінка для багатьох операторів. За своєю природою буфери в реактивному підході — це перша необхідність. І їх використовується багато. Але ось чому вони тут за замовчуванням unbounded — це загадка. Якщо вам здається, що у вас достатньо досвіду, щоб такого уникати — вас точно десь очікує «сплячий» OutOfMemory.
Автори бібліотеки могли б легко нівелювати подібну проблему, якби всі без винятку буфери за замовчуванням були б поставлені, наприклад, розміром в 100 елементів, але з певних причин вибір був зроблений на користь того, аби залишити їх необмеженими. Для цієї дискусії навіть тікет є.
Відсутність інструментів відлагоджування та моніторингу. Якщо під час робити десь вилітає exception, то stack trace, скоріш за все, буде містити велику кількість сміття, водночас корисних викликів може й не виявитись (залежить від операторів, які використовуються). Це можна вважати проблемою, але варто зазначити, що цей нюанс є практично у всіх подібних інструментах. Навіть з корутинами не завжди все в порядку: почитайте за Kotlin.
З приводу ж моніторингу, питання може стати руба, якщо порядок обробки даних є одним з центральних процесів вашої програми. Між етапами обробки будуть черги, і ви захочете знати, де ж у вас вузьке місце: чи файлова система, чи мережа, чи cpu, чи десь щось погано написано. Й інформація про те, в якому місці пайплайну у вас застряють дані, дуже допомагає для пошуку проблеми. І ви захочете це експортувати кудись в Prometheus і потім в Grafana. У RxJava налагодити моніторинг — місія майже неможлива.
Неочікувана поведінка операторів. Тут, звісно, твердження суб’єктивне, адже в документації написано, що і як працює. Але все-таки розгляньмо наступний випадок.
Часто виникає задача, коли є потік подій, які треба масово записати в базу. Щоб це робити ефективно, їх треба писати в базу batch-ами. Для такого групування є оператор buffer. Наче добре, але ... цей буфер може або віддавати групи по count елементів (не підходить, оскільки незаповнений буфер буде вічно сидіти в пам’яті), або кожний фіксований інтервал часу (не підходить, адже потенційно недоутилізує всі можливості бази). А треба, щоб він віддавав пачки не більше ніж count, але віддавав батч одразу, коли підписник готовий писати наступну пачку в базу (без жодної затримки, якщо в черзі є задачі).
Можна було б ще наводити приклади операторів, що «чогось не вміють». Але, з іншої сторони, неможливо реалізувати всі-всі-всі варіанти в бібліотеці.
Надзвичайна складність реалізації своїх операторів. Як продовження попереднього. Наче ж просто: якщо в бібліотеці чогось не вистачає — просто візьми та допиши. Насправді свого часу я так і зробив. Зрештою в мене є декілька операторів під свої потреби, правда, ще під старі версії RxJava.
Процес написання свого оператора виглядає десь так: спочатку шукаєш найбільш схожий за структурою готовий оператор. Потім довгий час намагаєшся розібратись, як він працює. Опісля копіюєш і правиш під себе. У кінці приходить розуміння, що впевненості в тому, що нічого не провтикав, немає зовсім. І справа навіть не в тому, заплутаний код чи ні, а в тому, чи вдалося повністю дотриматись досить неочевидних контрактів. Унаслідок цього пишеться купа тестів і приймається на віру, що воно не зламається.
Якщо хочете, можете взяти два-три цікавих вам оператори й спробувати розібратись, як вони працюють.
Альтернативи
Шукати єдину альтернативу не варто, оскільки однією з причин проблем з ReactiveX є те, що автори спробували зробити все й одразу.
Але для специфічних задач є не гірші, а то й кращі рішення.
Для EventHandler використовуйте те, що є у вашому UI-фреймворку. У більшості випадків цього достатньо. А замість debounce / throttle ставте кнопці disable, поки ви не хочете повторних натискань. Якщо ж у вас «свій фреймворк», і там треба події — не полініться і напишіть свій EventHandler клас на 40 стрічок. Усі ми їздимо на своїх велосипедах — це нормально.
Для Promise ... краще взагалі не використовуйте Promise — це напівміра і, як на мене, антипаттерн. Замість нього подивіться на корутини. Вони вже зʼявляються майже всюди. Це зробить ваш код чистішим та простішим. На Java, якщо дуже треба, напишіть частину на Kotlin — пограєтесь з компіляцією, але буде краще, ніж з RX чи CompletableFuture.
Для Stream-like задач в Java використовуйте Stream, а в інших мовах шукайте схожі механізми. Функціональне програмування зараз люблять і такі можливості з’являються.
Для багатопотокових pipeline-ів у мене зараз немає гарної рекомендації. Розумію, що рішення на чергах буде громіздким. Але ви будете знати, що воно не зламається через причину, яку треба шукати глибоко в надрах бібліотеки. Існують, також, проекти як Apache Beam, але це тема для окремої дискусії.
Для Android писати код складно. Але Rx простіше не зробить — ви просто поміняєте одні граблі на інші. Хочете простіше — пишіть на Flutter/Dart — кращого підходу для UI я ще не бачив (якщо не розглядати веб — то інше трохи).
Однаково, ReactiveX!
Якщо ж ви все-таки вірите в те, що цей інструмент для вас, не піддавайтесь на зовнішню простоту бібліотеки та кожен раз намагайтесь переконатись, що ви точно розумієте, що відбувається:
- Що буде, якщо підписатись двічі? Чи subscribe взагалі присутній? (наче смішна помилка, але трапляється часто)
- А якщо десь помилка? Навіть якщо її «не може бути» (OOM, NPE чи ділення на нуль може бути всюди).
- Скільки та які потоки запускаються, скільки їх запускається, коли та де вони запускаються (в яких pool-ах)?
- Як налаштувати моніторинг процесів (якщо такий має сенс)?
- Чи ви впевнені, що якийсь Map/FlatMap, що виконується в якомусь pool-і, не зависне і не призведе до того, що pool більше не зможе приймати задачі? Така проблема запросто може зʼявитись у зовсім неочікуваному місці.
- З іншої сторони: чи впевнені ви, що у вас немає безконтрольного створення потоків, що кінець-кінцем призведе до виснаження ресурсів, якраз коли навантаження трохи підросте?
- Як «тихо» зупинити виконання, якщо прийшов, наприклад, KeyboardInterrupt, і водночас не втратити дані, що перебувають у процесі обробки?
Звісно, схожі проблеми можуть виникнути й без використання ReactiveX, але бібліотека точно не спростить роботу з ними.
Висновки
Однією з проблем бібліотеки є те, що автори намагались реалізувати інструмент як швейцарський ніж, який можна було б застосувати для багатьох принципово різних задач (як приклад, promise, stream та event listener), що диктувало певні архітектурні рішення (як логіка обробки помилок).
З іншої сторони, реалізація вийшла такою, що використання бібліотеки змушує враховувати багато факторів, деякі з яких не те щоб очевидні. А як ви розумієте, програмувати складно, коли не завжди очевидним є розуміння того, що може піти не так.
В цілому, ReactiveX несе дуже гарну ідею, але реалізація в тому вигляді, в якому вона є зараз, навіть якщо у всьому розібратись, спричиняє більше проблем, ніж їх вирішує.
23 коментарі
Додати коментар Підписатись на коментаріВідписатись від коментарів