Вступ до Project Loom. Частина 3. Structured concurrency

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті.

Робота з платформними потоками завжди була пов’язана із виконанням важких операцій блокування. Починаючи з JDK 5, пряме використаття потоків є чимось накшталт анти-патерну.

Для уникнення ситуацій блокування, розробники повинні використовувати Executor та ExecutorService, які приймають завдання на виконання у форматі Runnable, для відстеження роботи завдання розробники отримували об’єкт типу Future.

Такий підхід до программування укладається у рамки концепції асинхронного програмування із застосуванням колбеків. Проблема такого підходу полягає у:

  • Неможливості якісно писати юніт-тести на логіку виконання, яка базується на асинхронних викликах.
  • Профіліювання такого коду є майже неможливим.

Структурний паралелізм (structured concurrency) як альтернатива асинхронній моделі програмування

Структурований паралелізм — це парадигма програмування, спрямована на підвищення чіткості, якості та часу розробки програм за допомогою структурованого підходу до паралельного програмування. Основна концепція полягає в інкапсуляції паралельних потоків виконання за допомогою конструкцій потоку керування, які мають чіткі точки входу та виходу та гарантують, що всі породжені потоки були завершені перед виходом.

Така інкапсуляція дозволяє пропагувати помилки в одночасних потоках до батьківської області керуючої структурного блоку та керувати ними за допомогою нативних механізмів обробки помилок. Щоб бути ефективною, ця модель повинна застосовуватися послідовно на всіх рівнях програми — інакше паралельні потоки можуть витікати, стати обособленими, або помилки виконання не пропагуватимуться належним чином.

Структурований паралелізм (що базується на моделі типу fork-join) аналогічний структурованому програмуванню, яке представило конструкції потоку керування інкапсульовані у послідовні оператори та со-програми.

У Java структурований паралелізм представлений класом StructuredTaskScope. Цей клас реалізує модель типу fork-join:

static Callable<Integer> task = () -> {
    System.out.println(Thread.currentThread());
    Thread.sleep(Duration.ofSeconds(1));
    return 42;
};
static void simpleScope() throws InterruptedException {
    try(var scope = new StructuredTaskScope<>()) {
        var fut = scope.fork(task);
        scope.join();
        System.out.println(fut.resultNow());
    }
}

Що треба знати:

  • Кожна задача виконується у віртуальному потоці за замовчанням (про це згодом).
  • Кожен виклик StructuredTaskScope::fork повертає Future.
  • StructuredTaskScope::join заблокує поток, у якому був створений цей скоуп, до моменту виконання усіх завдань.
  • StructuredTaskScope належить тому потоку який його створив, делегування неможливе.

Чим StructuredTaskScope відрізняється від ExecutorService:

  • StructuredTaskScope грає роль точки запуску потоків (що дуже схоже на модель Executors::newVirtualThreadPerTask);
  • StructuredTaskScope побудований на основі work-stealing ForkJoinPool де кожен поток пулу має власну чергу задач на виконання, що дозволяє перехоплювати задачі на виконання з інших черг очікування, у свою ж чергу ExecutorService подубований навколо єдиної черги виконання, де потоки беруть завдання з одного потоку.

Різновиди StructuredTaskScope

Існує два різновиди скоупів:

Основна ідея полягає в тому, що обидві реалізації побудовані на факті, що скоуп повинен завершитися за певних умов: або за першим успішним результатом, або за першою помилкою. Розглянемо приклад:

try(var scope = new StructuredTaskScope.ShutdownOnSuccess<Integer>()) {
    var a = scope.fork(() -> {
        Thread.sleep(Duration.ofSeconds(4));
        return 1;
    });
    var b = scope.fork(() -> {
        Thread.sleep(Duration.ofSeconds(5));
        return 2;
    });
    var c = scope.fork(() -> {
        Thread.sleep(Duration.ofSeconds(8));
        return 3;
    });
    scope.join();
    System.out.println(scope.result());
    System.out.println("a.state = " + a.state());
    System.out.println("b.state = " + b.state());
    System.out.println("c.state = " + c.state());
}

Скоуп відпрацює таким чином, що лише одна задача буде виконана, а всі інші — відмінені (FAILED). Якщо в одному із завдань виникне помилка, то скоуп буде працювати до тих пір, поки не з’явиться успішний результат. Якщо ж такого не станеться, то винике помилка виконання контенту, ця помилка буде містити лише першу помилку, що виникла.

Таким же чином працює й реалізація StructuredTaskScope.ShutdownOnFailure, що дозволить завершити скоуп за наявності першої ж помилки.

Блокуючі виклики: join & joinUntil

Окрім StructuredTaskScope::join, що заблокує поток-власника цього скоупу до кінця виконання, ще є StructuredTaskScope::joinUntil, що дозволить завершити скоуп за певний проміжок часу:

static void joinUntil() throws InterruptedException, TimeoutException {
    try(var scope = new StructuredTaskScope<Void>()) {
        var fut = scope.fork(() -> {
            Thread.sleep(Duration.ofSeconds(10));
            return null;
        });
        scope.joinUntil(Instant.now().plusSeconds(4));
    }
}

Кастомізація

Як було зазначено раніше, за замовчанням, завдання, що створюються у межах StructuredTaskScope , виконуються безпосередньо у віртуальних потоках. Це не означає, що StructuredTaskScope працюють лише з віртуальними потоками, є можливість визначати StructuredTaskScope таким чином, щоб кожна задача виконувалася у потоці певної конфігурації:

static void customFactory() throws InterruptedException, TimeoutException {
    try(var scope = new StructuredTaskScope<Void>("custom", Thread.ofPlatform().factory())) {
        var fut = scope.fork(() -> {
            System.out.println(Thread.currentThread());
            return null;
        });
        scope.join();
    }
}

Отже, як і ExecutorService, StructuredTaskScope може бути створена з відповідною фабрикою потоків: або з платформеними, або з віртуальними.

Операції з StructuredTaskScope

Один різновид завдань, пов’язаних з StructuredTaskScope — це виконання великої кількості типових завдань:

static void boundStream() throws InterruptedException {
    try(var scope = new StructuredTaskScope<Integer>()) {
        for (Callable<Integer> integerCallable : Collections.nCopies(100, task)) {
            scope.fork(integerCallable);
        }
        scope.join();
    }
}

Приналежність скоупу до батьківського потоку

Виникає питання, як пришвидшити опрацювання колекції, коли завдань не 100, а, скажімо, 1М? Відповідь очевидна — застосування параллельних стрімів:

static void unboundForkInScope() {
    try(var scopeJoinUntil = new StructuredTaskScope<>()) {
        Collections.nCopies(1_000_000, task)
                .parallelStream().forEach(scopeJoinUntil::fork);
        scopeJoinUntil.join();
    } catch (WrongThreadException | TimeoutException | InterruptedException e) {
        System.err.println(e.getMessage());
    }
}

Виникає певна проблема — параллельні стріми виконуються у межах окремого ForkJoinPool (ForkJoinPool.commonPool, за замовчанням), який розповсюджує задачі поміж платформених потоків. Суть проблеми полягає у тому, що скоуп належить до того потоку, який його запустив, і керування ним не може бути делеговано. Отже, таке виконання такого скоупу призведе до наступної помилки:

java.lang.WrongThreadException: Current thread not owner or thread in flock

Задля того, щоб зробити запуск типових задач більш швидким, із використанням параллельних стрімів, треба запускати сам стрім таким чином, щоб керування скоупом залишилося у межах батьківського потоку:

static void parallelStreamInScope() {
    try(var scope = new StructuredTaskScope<>()) {
        scopeJoinUntil.fork(() -> {
            return Collections.nCopies(taskNo, task)
.parallelStream().map(scope::fork).toList();
        });
        scope.join();
    } catch (WrongThreadException | InterruptedException e) {
        System.err.println(e.getMessage());
    }
}

Таким чином усі запуски задач через StructuredTaskScope::fork відбуваться у межах одно батьківського потоку, бо нема різниці, що саме виконує задача, єдина вимога — уникати спроб запуску задач у межах скоупу за межами потоку, до якого нележить сам скоуп.

Трохи висновків

Структурний параллелізм, як модель програмування, є повноцінною альтернативою асинхронному прогрумуванню. Основа структурного параллелізму — модель fork-join. Реалізація цієї моделі зазвичай впроваджується на основі «легких» потоків (у Java — на базі віртуальних потоків), а не на основі системних, себто, важких потоків або процесів.

Як «бекенд», згідно з теорією, повинен використовуватися пул потоків, але розуміючи специфіку віртуальних потоків, то їх взагалі не треба збирати у пули, враховуючи їхню природу. Одночасно із цим щось таки повинно виконувати ці віртуальні потоки, тому в якості справжнього бекенду використовується work-stealing ForkJoinPool, що ефективно дозволяє виконувати задачі у межах декількох черг очікування.

У Java cтруктурний параллелізм представлений добіркою классів на основі StructuredTaskScope. Цей клас дозволяє виконувати завдання, огорнуті у Callable. Метод StructuredTaskScope::fork повертає об’єкт типу Future. Наразі існує три види скоупів:

  • StructuredTaskScope — блокує поток до виконання усіх завдань;
  • StructuredTaskScope.ShutdownOnSuccess — виконується допоки не виконається одна із задач успішно;
  • StructuredTaskScope.ShutdownOnFailure — виконується до першої помилки в одному із завдань.

З точки зору програмування, нема жодних обмежень на вкладеність скоупів один в одний. Загалом нема жодних обмежень на модель програмування, окрім тих, які вимагають створення задач у скоупі у межах одного потоку: або у межах батьківського потоку, або у межах форку. Загалом цінність реалізації структурного параллелізму у Java є дуже великою враховуючи ті проблеми, які у собі несе асинхронна модель програмування.

Код

Код для цієї статті доступний тут.

P.S.

Пам’ятайте про те, що Project Loom, віртуальні треди та структурний параллелізм доступно у режимі Preview. Для того, щоб працювати з цим API:

  • Скомпілюйте програму за допомогою javac --release 19 --enable-preview Main.java та запустіть її за допомогою java --enable-preview Main.
  • Використовуючи програму запуску вихідного коду, запустіть програму з java --source 19 --enable-preview Main.java.
  • Використовуючи jshell, почніть його з jshell --enable-preview.

----

dev.java | inside.java | Java on YouTube | Java on Twitter | Me on Twitter

Сподобалась стаття? Натискай «Подобається» внизу. Це допоможе автору виграти подарунок у програмі #ПишуНаDOU

👍ПодобаєтьсяСподобалось0
До обраногоВ обраному0
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

Підписатись на коментарі