Стрими в Node.js на реальному прикладі
Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті.
Привіт! Мене звати Олександр, я Full-Stack Javascript розробник в NineTwoThree. Хочу поділитися досвідом використання стримів в Node.js. Сподіваюсь, що стаття буде корисною для всіх, хто ще стримів не зачіпав, або зачіпав, але дуже обережно.
Якщо ви загуглите щось на кшталт «nodejs streams», то знайдете 100500 прикладів використання fs.createReadStream
і fs.createWriteStream
. Це, безумовно, правильні приклади, але тут ми їх розглядати не будемо (бо скільки вже можна).
Нещодавно у мене була задача, яку я вирішив купою стримів. Тому ділюсь прикладом «з реального життя».
Задача, яку треба було виконувати
Завдання було просте: генерувати csv-файл на основі даних з бази. І декілька нюансів:
- кількість стовпчиків — приблизно 50;
- кількість рядків — сотні тисяч і ліміту немає (не питайте, як і чим їх відкривають);
- файл потрібно заливати на AWS S3.
CSV — простий текстовий формат. Існує купа ліб, які без проблем переганяють масиви чи об’єкти в такий формат. Але пройдімось потенційними проблемами.
По-перше, витягуючи з бази даних таку кількість записів, та ще й з такою кількістю полів, ми ризикуємо не втиснутись в доступну пам’ять (хай там як, оперативна пам’ять, доступна процесу, завжди обмежена).
По-друге, генерувати csv — значить записувати +/- такий же об’єм даних (просто інакше структурованих) в пам’ять. А отже, ми вдруге ризикуємо вилізти за ліміт.
По-третє, якщо створювати файл локально, а потім вантажити його на AWS S3, то потрібно ще й думати про файлову систему. Скільки її доступно, як не забути видалити файл після вивантаження в зовнішнє сховище і т. д. Це не те щоб проблема, але хотілося б уникнути цього головного болю.
Щоб вирішити проблеми з пам’яттю, можна обробляти записи частинами. Якщо в один момент часу ми будемо працювати зі шматком даних, а не зі всіма одразу, то пам’яті має вистачити.
Щоб не зберігати файл локально, треба одразу відправляти згенеровані частини в мережу.
Тобто, суть реалізації така: витягуємо з бази даних частину записів, кожну частину переганяємо в формат csv і відправляємо на S3. Тим часом витягуємо з бази наступну частину і т. д.
Який напишемо код
Тепер до коду. Витягувати з бази частину записів можна за допомогою limit
і offset
. Але, на щастя, вже є рішення, які роблять це ефективніше та відразу обгортають всю логіку джаваскріпт-стримом. Наприклад, для PostgreSQL є ліба pg-query-stream
, що вигрібає дані за допомогою курсорів.
У моєму проєкті я використовую Objection.js
ОРМ, що своєю чергою використовує Knex.js
query builder. І knex пропонує зручний метод .stream()
, який під капотом скористається послугами pg-query-stream
, якщо конектимось до PostgreSQL СУБД. На жаль, в Objection.js не реалізований такий метод, але є, як мінімум, два воркераунди: будувати запит самим knex (без використання ОРМ), або перегнати Objection.js запит в knex запит за допомогою методу .toKnexQuery()
.
Я обрав другий варіант і мій запит має такий вигляд:
const recordsStream = FileRecord.query() .where({...}) .orderBy("id") .toKnexQuery() .stream();
Тепер в константі recordsStream
маємо PassThrough
стрим. Не знаю, чому саме PassThrough
, але нам то неважливо, головне, що ми можемо читати з нього дані (адже PassThrough
наслідує Readable
).
Stream в Node.js реалізує інтерфейс EventEmitter
, що дозволяє зчитувати дані за допомогою лісенерів:
readable.on(“data”, () => {});
readable.on(“end”, () => {});
readable.on(“error”, () => {});
...
Але, на мій погляд, цікавішим є той факт, що Readable
стрим також має реалізацію Symbol.asyncIterator
. А це означає, що зчитувати дані також можна за допомогою for await...of
циклу:
for await (const record of recordsStream) { processCurrentRecord(record); }
По-перше, такий варіант виглядає читабельнішим. По-друге, маємо більший контроль. Адже наступний шматок даних не буде отриманий, поки не завершиться ітерація з попереднім. Якщо необхідно виконувати якісь асинхронні дії на кожній ітерації, то ми можемо додати await
і бути впевненими, що поки той проміс не зарезолвиться, обробка наступних даних не почнеться:
for await (const record of recordsStream) { await processCurrentRecord(record); }
Knex метод .stream()
видає по одному рядку за раз. Я трішки пошукав, як це можна змінити, але нічого не знайшов. Та в моєму випадку різниці немає, тому довго я на цьому не зупинявся.
Отже, на цьому етапі ми вже маємо обробку рядків один за одним, тому про обсяг доступної пам’яті не переживаємо.
Далі потрібно з тих рядків генерувати csv та відправляти на S3. Погугливши лібу для генерації csv стримом, я знайшов csv-stringify.
import { stringify } from "csv-stringify"; … const stringifier = stringify(); // (1) … stringifier.write(data); // (2) … stringifier.end(); // (3)
Тут в рядку (1) ми створюємо об’єкт класу Stringifier
(це клас бібліотеки), що наслідує Transform
стрим. А отже, далі ми можемо використовувати його як звичайний стрим, тобто писати в нього дані (2) та сповіщати, що даних більше нема (3).
З цим теж розібрались. Залишається передача згенерованих даних на S3.
На щастя, AWS SDK дає можливість завантажувати зі стрима. Тобто, можна передати в аплоад метод як шлях до готового файлу, так і Readable-стрим.
Я не буду тут наводити конкретний приклад використання AWS аплоад-методу, бо у різних версіях SDK підхід абсолютно різний. Але ідея та ж: можна спокійно передавати стрим і бібліотека сама розбереться, що з ним робити:
const uploadPromise = uploadToS3("dest/path/fileName.csv", stringifier); … // write data to stringifier await uploadPromise;
Тут ми «запускаємо» аплоад, потім накидуємо даних в стрим і потім чекаємо, доки все буде відправлено.
Залишається один нюанс. Річ у тім, що коли ми пишемо дані в stringifier
, то ніяк не перевіряємо, чи встигають вони звідти зчитуватись та відправлятись. Якщо швидкість запису буде більшою за швидкість зчитування, то дані будуть накопичуватись в стримі, а точніше, в буфері. Але той буфер має певний розмір (який можна налаштовувати). І, якщо він неконтрольовано переповнюється, то ми знову ризикуємо отримати проблеми з пам’яттю. Це явище відоме під назвою backpressure
(не впевнений, як це правильно перекласти українською).
Щоб цього не сталось, ми маємо перевіряти, чи ще можна писати, чи треба почекати. На щастя, це дуже просто визначити. Метод .write()
, яким ми закидуємо дані в стрим, повертає булеве значення: true
, якщо буфер ще не повний, і false
, якщо повний. Тобто, якщо ми викликаємо .write()
, а він повертає false
, то необхідно зупинитись і почекати, доки буфер не очиститься, а вже потім продовжувати писати.
Але як визначити, коли буфер готовий приймати нові дані? Для цього Writable стрим емітить івент drain
. Отже, як тільки отримали false
від .write()
, навішуємо лісенер івента drain
і продовжуємо тільки після того, як цей івент заемітився. Наприклад, так:
const bufferNotFull = writable.write(record); if (!bufferNotFull) { await new Promise((res) => { writable.once("drain", res); }); }); }
Чесно кажучи, я так і не зміг добитися переповнення буфера на реальному прикладі. Чи то AWS ліба все відразу забирає і зберігає десь в себе, чи то всі дані перед відправкою буферизуються на рівні операційної системи/ мережевої карти... Не було часу з цим розбиратись. Але, теоретично, це може трапитись. Тому краще додати такий запобіжник.
Тепер збираємо все до купи.
import { stringify } from "csv-stringify"; ... const processCurrentRecord = async (record, writable) => { const dataToWrite = prepareRecord(record); const bufferNotFull = writable.write(dataToWrite); if (!bufferNotFull) { await new Promise((res) => { writable.once("drain", res); }); } }; const generateFile = async () => { const stringifier = stringify(); const uploadPromise = uploadToS3("dest/path/fileName.csv", stringifier); stringifier.write(["Column 1", "Column2 ", ...]); // add column names const recordsStream = FileRecord.query() .where({ ... }) .orderBy("id") .toKnexQuery() .stream(); for await (const record of recordsStream) { await processCurrentRecord(record, stringifier); } stringifier.end(); await uploadPromise; };
Замість висновку
Це лише один з можливих варіантів реалізації. Писати в стрім можна також за допомогою асинхронного ітератора. Перевіряти, чи повний буфер, мабуть, можна за допомогою властивості writable.writableNeedDrain
. Теоретично, можна було б імплементнути Transform
стрим. Можна додати обробку еррорів...
Головне — не допустити неконтрольованого росту занятої пам’яті і, відповідно, крашу всього додатка.
19 коментарів
Додати коментар Підписатись на коментаріВідписатись від коментарів