Вступ до Project Loom. Частина 2: Continuations

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті

У минулій статті циклу я хотів вас познайомити із Project Loom — реалізацією віртуальних потоків (Virtual Threads) у JDK, починаючи із 19-ї версії (у режимі Preview Feature). Якщо ви не читали попередню статтю, то дуже рекомендую це зробити, проте ось ключові моменти, що стосуються віртуальних потоків:

  • Модель реалізації мережевих застосунків типу thread-per-request має недоліки, бо кількість системних потоків вичерпується швидше за всі інші ресурси (процесорний час, оперативна пам’ять, диск).
  • Вводиться ще один підтип потоків — Virtual Thread.
  • Тепер існує два види потоків: Virtual Thread, Platform Thread.
  • Virtual Thread є реалізацією Thread.
  • Віртуальні потоки — Java-об’єкти.
  • Віртуальні потоки більше не співвідносяться до системних потоків як один до одного.
  • Віртуальні потоки виконуються у середині потоків-носіїв (carrier thread) типу Platform Thread.
  • Потоки-носії об’єднані у пул типу work-stealing ForkJoinPool, конфігурація якого можлива через спеціальні властивості JVM.
  • У той час як кількість платформенних потоків обмежена доступними для процесу системними потоками, кількість віртуальних потоків обмежена лише наявною пам’яттю.
  • Віртуальні потоки ідеально підходять для задач які часто блокуються (типу I/O), платформенні потоки — для задач із високим навантаженням на CPU.

Щоб надалі не плутатися у термінології, давайте зафіксуємо наступні поняття:

  • Системний потік — потоки операційної системи (POSIX thread, Win32 thread).
  • Платформенний потік — класичні потоки Java (Thread).

Виконання віртуальних потоків

Як ви, мабуть, знаєте, то модель реалізації серверних застосунків thread-per-request, що базується на платформенних потоках, має один великий недолік — поток, в якому виконується задача, повністю резервується під ії виконання (від моменту отримання запиту до фактичного надання відповіді). Тобто, якщо навіть потік очікує відповіді від сторонніх підсистем (запит в БД, наприклад) або інших потоків, то це призведе до простоювання в очікуванні (idling), що не є ефективним у той час, коли інші задачі могли б виконуватися на цьому потоці, утилізуючи системні можливості на максимум.

Ситуація із віртуальними потоками концептуально інша. Механізм роботи із віртуальними потоками розроблений таким чином, щоб дозволити «паркувати» (park, unpark) віртуальні потоки у той самий момент, коли вони очікують на завершення операцій певного характеру, таким чином інші віртуальні потоки мають змогу зайняти оперативний час потоку-носія, тому, із дуже великою вірогідністю, потоки-носії не будуть простоювати. За це відповідає планувальник (scheduler) типу work-stealing ForkJoinPool. Такий тип планувальників особливо добре підходить для планування потоків, які, як правило, часто блокуються та виконують I/O-операції або комунікують із іншими потоками. Очікується, що ForkJoinPool в асинхронному режимі може служити чудовим планувальником потоків за замовчуванням для більшості випадків використання віртуальних потоків на потоках-носіях. Давайте розглянемо простий приклад використання віртуальних потоків на прикладі потоків, що пишуть у консоль ідентифікатор актуального потоку:

public class IOswitch extends Util {
    public static void main(String[] args) throws InterruptedException {
        var threads = IntStream.range(0, 10).mapToObj(
            index -> Thread.ofVirtual().unstarted(() -> {
            try {
                updatePlatformOrVirtualThreadMap();
                System.out.println(Thread.currentThread());
                updatePlatformOrVirtualThreadMap();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        })).toList();
        threads.parallelStream().forEach(Thread::start);
        for (Thread thread : threads) {
            thread.join();
        }
        printVirtualThreadMap();
    }
}

Результат:

VirtualThread[#22]/runnable@ForkJoinPool-1-worker-4
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#30]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#28]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#26]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#21]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#23]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#27]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#25]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#24]/runnable@ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-1: [
    VirtualThread[#29] was mounted 2 times
    VirtualThread[#26] was mounted 2 times
    VirtualThread[#21] was mounted 2 times
    VirtualThread[#30] was mounted 2 times
]
ForkJoinPool-1-worker-4: [
    VirtualThread[#27] was mounted 1 times
    VirtualThread[#25] was mounted 1 times
    VirtualThread[#24] was mounted 1 times
    VirtualThread[#23] was mounted 1 times
    VirtualThread[#22] was mounted 2 times
]
ForkJoinPool-1-worker-2: [
    VirtualThread[#27] was mounted 1 times
]
ForkJoinPool-1-worker-3: [
    VirtualThread[#28] was mounted 2 times
    VirtualThread[#25] was mounted 1 times
    VirtualThread[#24] was mounted 1 times
    VirtualThread[#23] was mounted 1 times
]

У моєму випадку, у JVM за замовчанням є лише 4 потоки-носії (ForkJoinPool-1-worker-<1, 2, 3, 4>), тому що в мене є всього 4 CPU. Але, як зазначено раніше, є можливість маніпулювати розмірністю пулу потоків-носіїв через властивості JVM (jdk.virtualThreadScheduler.parallelism). Згідно документації, існує ще дві властивості:

  • jdk.virtualThreadScheduler.maxPoolSize — максимальний розмір пулу потоків-носіїв. Якщо встановлено, то вибирається мінімальне значення у порівнянні до jdk.virtualThreadScheduler.parallelism, якщо ні, то максимальне між jdk.virtualThreadScheduler.parallelism та 256.
  • jdk.virtualThreadScheduler.minRunnable — мінімально допустима кількість основних потоків, які не одночасно не заблоковані.

Важливе! Слід розуміти, дефолтні конфігурації ForkJoinPool’у є оптимальними для всіх потенційних сценарієв застовування, тому змінюйте ці значенні дуже обережно і проводьте тестування!

Як можете бачити, PrintWriter::println провокує JVM на те, щоб віртуальний поток був припаркований до тих пір, коли println завершить своє виконання. Результат роботи цього коду є дуже показовим, бо доводить тезу, що немає жодних гарантій, що віртуальний поток опиниться на тому ж самому потоку-носії, окрім ситуацій, які явно цього вимагають (мова йде про певні примітиви синхронізації). У цьому нема жодних проблем, бо віртуальні потоки майже, як-то кажуть, self-contained (не знаю відповідного терміну українською, вибачте). Логічно було би запитати, а що ж відбувається із віртуальним потоком у той час, коли він припаркований? А нічого, ба більше того, JVM копіює увесь стек віртуального потоку у heap. Таким чином стан потоку залишиться незмінним, окрім фактичного потоку-носія. Отже, підтверджується факт, що кількість віртуальних потоків обмежена лише наявною пам’яттю для JVM.

Для більш поглибленого розуміння давайте порівняємо, як себе поводять платформенні та віртуальні потоки в ідентичній ситуації:

public class VirtualThreadPinning extends Util {
    static void _runTask(Thread.Builder builder) throws Exception {
        var threads = IntStream.range(0, 10).mapToObj(index -> builder.unstarted(() -> {
            try {
                updatePlatformOrVirtualThreadMap();
                System.out.println(Thread.currentThread());
                updatePlatformOrVirtualThreadMap();
                Thread.sleep(Duration.ofSeconds(1));
                updatePlatformOrVirtualThreadMap();
                System.out.println(Thread.currentThread());
                updatePlatformOrVirtualThreadMap();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        })).toList();
        // mounts a virtual thread to a carrier thread and runs in
        threads.parallelStream().forEach(Thread::start);
        for (Thread thread : threads) {
            thread.join();
        }
    }
    static void runWith_virtual() {
        try {
            _runTask(Thread.ofVirtual());
            printVirtualThreadMap();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    static void runWith_platform() {
        try {
            _runTask(Thread.ofPlatform());
            printPlatformThreadMap();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Optional.of(System.getProperty("threadType")).ifPresentOrElse(
                threadType -> runMethod(VirtualThreadPinning.class, "runWith_" + threadType),
                () -> runMethod(VirtualThreadPinning.class, "runWith_platform"));
    }
}

Платформенні потоки

Thread[#24,Thread-3,5,main]
Thread[#26,Thread-5,5,main]
Thread[#23,Thread-2,5,main]
Thread[#27,Thread-6,5,main]
Thread[#28,Thread-7,5,main]
Thread[#29,Thread-8,5,main]
Thread[#22,Thread-1,5,main]
Thread[#21,Thread-0,5,main]
Thread[#30,Thread-9,5,main]
Thread[#25,Thread-4,5,main]
Thread[#23,Thread-2,5,main]
Thread[#30,Thread-9,5,main]
Thread[#28,Thread-7,5,main]
Thread[#24,Thread-3,5,main]
Thread[#27,Thread-6,5,main]
Thread[#29,Thread-8,5,main]
Thread[#21,Thread-0,5,main]
Thread[#26,Thread-5,5,main]
Thread[#22,Thread-1,5,main]
Thread[#25,Thread-4,5,main]
Thread-3: [
    PlatformThread[24] was mounted 4 times
]
Thread-4: [
    PlatformThread[25] was mounted 4 times
]
Thread-5: [
    PlatformThread[26] was mounted 4 times
]
Thread-6: [
    PlatformThread[27] was mounted 4 times
]
Thread-7: [
    PlatformThread[28] was mounted 4 times
]
Thread-8: [
    PlatformThread[29] was mounted 4 times
]
Thread-9: [
    PlatformThread[30] was mounted 4 times
]
Thread-0: [
    PlatformThread[21] was mounted 4 times
]
Thread-1: [
    PlatformThread[22] was mounted 4 times
]
Thread-2: [
    PlatformThread[23] was mounted 4 times
]

Як можна побачити, увесь код потоку виконується в середині одного й того ж потоку. Тобто, навіть про засипанні потік не перемикається на виконання іншої задачі, а просто чекає. З віртуальними потоками знову все інакше, а саме:

VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#23]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#27]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#26]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#21]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#22]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#30]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#25]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#28]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#24]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#27]/runnable@ForkJoinPool-1-worker-4
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-4
VirtualThread[#23]/runnable@ForkJoinPool-1-worker-4
VirtualThread[#26]/runnable@ForkJoinPool-1-worker-4
VirtualThread[#21]/runnable@ForkJoinPool-1-worker-4
VirtualThread[#22]/runnable@ForkJoinPool-1-worker-4
VirtualThread[#30]/runnable@ForkJoinPool-1-worker-4
VirtualThread[#25]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#28]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#24]/runnable@ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-1: [
    VirtualThread[#29] was mounted 2 times
    VirtualThread[#22] was mounted 1 times
    VirtualThread[#21] was mounted 1 times
    VirtualThread[#30] was mounted 1 times
]
ForkJoinPool-1-worker-4: [
    VirtualThread[#29] was mounted 2 times
    VirtualThread[#27] was mounted 2 times
    VirtualThread[#26] was mounted 2 times
    VirtualThread[#23] was mounted 3 times
    VirtualThread[#22] was mounted 2 times
    VirtualThread[#21] was mounted 2 times
    VirtualThread[#30] was mounted 2 times
]
ForkJoinPool-1-worker-2: [
    VirtualThread[#28] was mounted 1 times
    VirtualThread[#27] was mounted 1 times
    VirtualThread[#21] was mounted 1 times
]
ForkJoinPool-1-worker-3: [
    VirtualThread[#28] was mounted 3 times
    VirtualThread[#27] was mounted 1 times
    VirtualThread[#26] was mounted 2 times
    VirtualThread[#25] was mounted 4 times
    VirtualThread[#24] was mounted 4 times
    VirtualThread[#23] was mounted 1 times
    VirtualThread[#22] was mounted 1 times
    VirtualThread[#30] was mounted 1 times
]

Та ж сама операція засипання на рівні з I/O-операцією призведе до «паркування» потоку і вивільнить поток-носій для виконання інших віртуальних потоків. Слід зазначити, що усі блокуючі методи були перероблені таким чином, щоб мати різний вплив на різні типи потоків, як, наприклад Thread::sleep:

public static void sleep(Duration duration) throws InterruptedException {
    long nanos = NANOSECONDS.convert(duration);  // MAX_VALUE if > 292 years
    if (nanos < 0)
        return;
    if (currentThread() instanceof VirtualThread vthread) {
        vthread.sleepNanos(nanos);
        return;
    }
    // convert to milliseconds
    long millis = MILLISECONDS.convert(nanos, NANOSECONDS);
    if (nanos > NANOSECONDS.convert(millis, MILLISECONDS)) {
        millis += 1L;
    }
    sleep(millis);
}

Поновлення (continuation) віртуальних потоків

Отже, віртуальні потоки були не єдиною зміною внесеною у JDK 19, починаючи з build 25. Наряду із відповідними класами для віртуальних потоків, сама JDK зазнала багато змін, які торкнулися майже усіх модулів. Однією важливою зміною було відгалудження логіки блокування потоків: для платформенних нічого не змінилося, блокуючі методи все ще блокують потік повністю, а от блокування віртуального потоку призводить до того, що поток-носій, який його виконував, маркується як вільний. Але от як саме це відбувається — дуже цікаве питання.

Одною із другорядних цілей було впровадження додаткових публічних API — Continuations API. Термін «continuation» означає обмежене продовження (також називається корутиною). З точки зору виконання коду, «continuation» — послідовний код, який може призупинити (сам себе) і відновити (бути відновленим ззовні). Отже, призупинене «continuation» — це об’єкт, який після відновлення або «виклику» виконує решту обчислень, починаючи з відповідної точки зупинки (я не знаю кращого терміну українською мовою, ніж «поновлення», але для цілістності тексту я буду використовувати англіцизм — контінуація).

Фактично, розмежоване продовження — це послідовна підпрограма з точкою входу (як потік), яку ми називатимемо просто точкою входу, яка може призупинити або припинити виконання в певний момент, який ми називатимемо точкою виходу. Коли обмежене продовження призупиняється, керування передається за межі продовження, а коли воно відновлюється, керування повертається до останньої точки виходу, а контекст виконання до точки входу не змінюється.

Фактично, віртуальний потік — це об’єкт типу Runnable, виконання якого складається із послідовних контінуацій:

public class Continuation {
    …
    private final Runnable target;
    …
    private Continuation parent;
    private Continuation child;
    …
    private void mount() {…}
    …
    private void unmount() {…}
    …
    public final void run() {…}
    …
    public static boolean yield(ContinuationScope scope) {…}
    …
    private void enter0() {
       target.run();
    }
}

Враховуючи те, що всі методи блокування та синхронізації потоків мають свій власний вплив на віртуальні потоки, то при запуску Runnable, коли трапляється метод блокування потоку, віртуальний потік «паркується» через метод yield, таким чином наступає оперативна пауза для коду у Runnable, стек виконання переміщується у heap, до тих пір, поки не прийде сигнал відновлення run ззовні. У цей момент стек виконання (той самий віртуальний поток за хешем) буде вивантажений з heap та розміщений у новому потоку-носії.

Якщо розглянути контінуацію поза контекстом віртуальних тредів, то за функціоналом контінуація дуже схожа на Python’ський генератор (окрім необхідності запуску генератору):

public class Continuations {
    public static void main(String[] args) {
        ContinuationScope scope = new ContinuationScope("scope");
        Continuation c = new Continuation(scope, () -> {
            System.out.println("stage one");
            // unmounts a continuation
            Continuation.yield(scope);
            System.out.println("stage two");
        });
        System.out.println("start");
        // mounts a continuation to a carrier thread
        c.run();
        System.out.println("after start");
        c.run();
        System.out.println("stop");
    }
}

Результат:

start
stage one
after start
stage two
stop

Тобто за допомогую Continuation::yeild можна поставити на паузу виконання певного коду та віддати керування контінуацією за межі Runnable. Як бачите, сфера застосування контінуацій дуже різноманітна, але наразі повноцінно-довершена реалізація існує лише у вигляді віртуальних потоків, що дозволяє призупиняти та відновлювати виконання підчас та після блокуючих викликів.

Слід зазначити, що контінуації наразі не доступні для публічного використання, доступ можливий із відповідними параметрами JVM:

--add-opens=java.base/jdk.internal.vm=ALL-UNNAMED

адже з самого початку такої першочергової цілі у Project Loom не стояло. Можливо, у майбутньому ситуація зміниться.

Підведемо підсумки

Прослідковується певна ієрархія об’єктів: VirtualThread —> Continuation —> PlatformThread —> ForkJoinPool. Контінуації, як такі, не є новим концептом у «Computer Science», частіше за все зустрічається альтернативне визначення — корутини (горутини). У Java-контінуації дозволяють призупиняти та продовжувати виконання байткоду з моменту блокуючого виклику. Виходячи з усього сказаного, віртуальний потік — лише байткод, виконання якого розподілено на декілька сегментів — контінуацій, за виконання котрих відповідають платформенні потоки, керовані планувальником типу work-stealing ForkJoinPool.

Бонус

Отже, віртуальний потік — це лише Java-об’єкт, тому їхня кількість обмежена лише наявною пам’яттю. Логічним питанням буде, а скільки і як швидко можна запустити віртуальних потоків. Мій первинний експеримент дав можливість запустити десь приблизно 2 мільйони потоків, але якщо скоротити обсяг задач для потоку до наповнення мапи (де ключ — це потік-носій, а значення — список віртуальних потоків, які були виконані цим потоком), то можна досягти ще більших результатів:

public class HowManyVirtualThreads extends Util {
    public static void main(String[] args) {
        var threads = IntStream.range(0, 10_000_000).mapToObj(
                i -> Thread.ofVirtual().unstarted(Util::updatePlatformOrVirtualThreadMap)).toList();
        var start = Instant.now();
        threads.parallelStream().forEach(Thread::start);
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        var stop = Instant.now();
        System.out.println("execution time: " + Duration.between(start, stop).toMillis() + "ms");
        System.out.println("platform threads: " + getVirtualThreadsMap().keySet().size());
    }
}

Результати:

execution time: 12153ms
platform threads: 4

10 мільйонів потоків за 12 секунд! Це майже 850 тисяч потоків за секунду! Враховуйте те, що мова йде про досить застарілий Macbook Pro 13″ 2018! Фактично, результати будуть інші на більш потужному залізі, скажемо, де є 32 CPU та як мінімум 64Гб RAM — cитуація буде кардинально інша. Слід врахувати, що зі зростанням стеку викликів кількість потоків буде якісно змінюватися, але не слід очікувати драматичних показників як у випадку платформенних потоків.

Код із цієї статті...

... доступний на GitHub.

P.S.

Пам’ятайте про те, що Virtual Threads, Continuations доступно у режимі Preview для JDK 19+25. Для того, щоб працювати з цим API:

  • Скомпілюйте програму за допомогою javac --release 19 --enable-preview Main.java та запустіть її за допомогою java --enable-preview Main.
  • Використовуючи програму запуску вихідного коду, запустіть програму з java --source 19 --enable-preview Main.java.
  • Використовуючи jshell, почніть його з jshell --enable-preview.

dev.java | inside.java | Java on YouTube | Java on Twitter

👍ПодобаєтьсяСподобалось5
До обраногоВ обраному1
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

Чи правильно я розумію, що якщо я використовую віртуальний потік, то після припинення та продовження роботи мій код може виконуватися не в тому потоці, який був спочатку?

І так і ні. Якщо ваш код не використовує synchoronized, то не очікуйте, що віртуальний поток повернеться на той самий поток-носій після того як його відпаркують.

Як тоді бути з технологіями, які зберігали свій стан у ThreadLocal? Де їм тепер зберігати його?

ThreadLocals нікуди не ділися, віртуальні потоки теж мають їх. Дивіться на віртуальні потоки як на звичайні потоки, з точки зору програмування нема жодної різниці між платформеним та віртуальним потоком, а то як вони виконуються це відповідальність JVM.

А нічого, ба більше того, JVM копіює увесь стек віртуального потоку у heap. Таким чином стан потоку залишиться незмінним, окрім фактичного потоку-носія

Дякую за статтю. Коли ми використовуємо звичайні потоки, стек зберігається в пам’яті потоку. Тепер, JVM повинна постійно копіювати стек в heap і назад. Як це позначиться на performance?

У порівнянні із розмірами платформених потоків (>512Kb) проблеми із постійним переносом стеку в і з heap нема, бо стек віртуальних потоків на багато менший за платформений. Слід зазначити, але нема якогось навіть базового розміру стеку віртуального потоку, бо його розмір цілковито залежить від того який код в ньому запускається.

Коментар порушує правила спільноти і видалений модераторами.

стек зберігається в пам’яті потоку

І той стек при переключенні контексту порцессора буде вивантажено в cache L1 або інші (L2, L3), тобто ОС при переключенні процесів сама буде то робити. В данному випадку роботу замість ОС робить ВМ, але ж така і була наче ціль — віддати управління потоками повністю ВМ а не ОС.

Тобто вплив на продуктивність теоретично буде мінімальний

Підписатись на коментарі