Стрими в Node.js на реальному прикладі

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті.

Привіт! Мене звати Олександр, я Full-Stack Javascript розробник в NineTwoThree. Хочу поділитися досвідом використання стримів в Node.js. Сподіваюсь, що стаття буде корисною для всіх, хто ще стримів не зачіпав, або зачіпав, але дуже обережно.

Якщо ви загуглите щось на кшталт «nodejs streams», то знайдете 100500 прикладів використання fs.createReadStream і fs.createWriteStream. Це, безумовно, правильні приклади, але тут ми їх розглядати не будемо (бо скільки вже можна).

Нещодавно у мене була задача, яку я вирішив купою стримів. Тому ділюсь прикладом «з реального життя».

Задача, яку треба було виконувати

Завдання було просте: генерувати csv-файл на основі даних з бази. І декілька нюансів:

  • кількість стовпчиків — приблизно 50;
  • кількість рядків — сотні тисяч і ліміту немає (не питайте, як і чим їх відкривають);
  • файл потрібно заливати на AWS S3.

CSV — простий текстовий формат. Існує купа ліб, які без проблем переганяють масиви чи об’єкти в такий формат. Але пройдімось потенційними проблемами.

По-перше, витягуючи з бази даних таку кількість записів, та ще й з такою кількістю полів, ми ризикуємо не втиснутись в доступну пам’ять (хай там як, оперативна пам’ять, доступна процесу, завжди обмежена).

По-друге, генерувати csv — значить записувати +/- такий же об’єм даних (просто інакше структурованих) в пам’ять. А отже, ми вдруге ризикуємо вилізти за ліміт.

По-третє, якщо створювати файл локально, а потім вантажити його на AWS S3, то потрібно ще й думати про файлову систему. Скільки її доступно, як не забути видалити файл після вивантаження в зовнішнє сховище і т. д. Це не те щоб проблема, але хотілося б уникнути цього головного болю.

Щоб вирішити проблеми з пам’яттю, можна обробляти записи частинами. Якщо в один момент часу ми будемо працювати зі шматком даних, а не зі всіма одразу, то пам’яті має вистачити.

Щоб не зберігати файл локально, треба одразу відправляти згенеровані частини в мережу.

Тобто, суть реалізації така: витягуємо з бази даних частину записів, кожну частину переганяємо в формат csv і відправляємо на S3. Тим часом витягуємо з бази наступну частину і т. д.

Який напишемо код

Тепер до коду. Витягувати з бази частину записів можна за допомогою limit і offset. Але, на щастя, вже є рішення, які роблять це ефективніше та відразу обгортають всю логіку джаваскріпт-стримом. Наприклад, для PostgreSQL є ліба pg-query-stream, що вигрібає дані за допомогою курсорів.

У моєму проєкті я використовую Objection.js ОРМ, що своєю чергою використовує Knex.js query builder. І knex пропонує зручний метод .stream(), який під капотом скористається послугами pg-query-stream, якщо конектимось до PostgreSQL СУБД. На жаль, в Objection.js не реалізований такий метод, але є, як мінімум, два воркераунди: будувати запит самим knex (без використання ОРМ), або перегнати Objection.js запит в knex запит за допомогою методу .toKnexQuery().

Я обрав другий варіант і мій запит має такий вигляд:

const recordsStream = FileRecord.query()
  .where({...})
  .orderBy("id")
  .toKnexQuery()
  .stream();

Тепер в константі recordsStream маємо PassThrough стрим. Не знаю, чому саме PassThrough, але нам то неважливо, головне, що ми можемо читати з нього дані (адже PassThrough наслідує Readable).

Stream в Node.js реалізує інтерфейс EventEmitter, що дозволяє зчитувати дані за допомогою лісенерів:

readable.on(“data”, () => {});

readable.on(“end”, () => {});

readable.on(“error”, () => {});

...

Але, на мій погляд, цікавішим є той факт, що Readable стрим також має реалізацію Symbol.asyncIterator. А це означає, що зчитувати дані також можна за допомогою for await...of циклу:

for await (const record of recordsStream) {
  processCurrentRecord(record);
}

По-перше, такий варіант виглядає читабельнішим. По-друге, маємо більший контроль. Адже наступний шматок даних не буде отриманий, поки не завершиться ітерація з попереднім. Якщо необхідно виконувати якісь асинхронні дії на кожній ітерації, то ми можемо додати await і бути впевненими, що поки той проміс не зарезолвиться, обробка наступних даних не почнеться:

for await (const record of recordsStream) {
  await processCurrentRecord(record);
}

Knex метод .stream() видає по одному рядку за раз. Я трішки пошукав, як це можна змінити, але нічого не знайшов. Та в моєму випадку різниці немає, тому довго я на цьому не зупинявся.

Отже, на цьому етапі ми вже маємо обробку рядків один за одним, тому про обсяг доступної пам’яті не переживаємо.

Далі потрібно з тих рядків генерувати csv та відправляти на S3. Погугливши лібу для генерації csv стримом, я знайшов csv-stringify.

import { stringify } from "csv-stringify";
…
const stringifier = stringify(); // (1)
…
stringifier.write(data);         // (2)
…
stringifier.end();               // (3)

Тут в рядку (1) ми створюємо об’єкт класу Stringifier (це клас бібліотеки), що наслідує Transform стрим. А отже, далі ми можемо використовувати його як звичайний стрим, тобто писати в нього дані (2) та сповіщати, що даних більше нема (3).

З цим теж розібрались. Залишається передача згенерованих даних на S3.

На щастя, AWS SDK дає можливість завантажувати зі стрима. Тобто, можна передати в аплоад метод як шлях до готового файлу, так і Readable-стрим.

Я не буду тут наводити конкретний приклад використання AWS аплоад-методу, бо у різних версіях SDK підхід абсолютно різний. Але ідея та ж: можна спокійно передавати стрим і бібліотека сама розбереться, що з ним робити:

const uploadPromise = uploadToS3("dest/path/fileName.csv", stringifier);
… // write data to stringifier
await uploadPromise;

Тут ми «запускаємо» аплоад, потім накидуємо даних в стрим і потім чекаємо, доки все буде відправлено.

Залишається один нюанс. Річ у тім, що коли ми пишемо дані в stringifier, то ніяк не перевіряємо, чи встигають вони звідти зчитуватись та відправлятись. Якщо швидкість запису буде більшою за швидкість зчитування, то дані будуть накопичуватись в стримі, а точніше, в буфері. Але той буфер має певний розмір (який можна налаштовувати). І, якщо він неконтрольовано переповнюється, то ми знову ризикуємо отримати проблеми з пам’яттю. Це явище відоме під назвою backpressure (не впевнений, як це правильно перекласти українською).

Щоб цього не сталось, ми маємо перевіряти, чи ще можна писати, чи треба почекати. На щастя, це дуже просто визначити. Метод .write(), яким ми закидуємо дані в стрим, повертає булеве значення: true, якщо буфер ще не повний, і false, якщо повний. Тобто, якщо ми викликаємо .write(), а він повертає false, то необхідно зупинитись і почекати, доки буфер не очиститься, а вже потім продовжувати писати.

Але як визначити, коли буфер готовий приймати нові дані? Для цього Writable стрим емітить івент drain. Отже, як тільки отримали false від .write(), навішуємо лісенер івента drain і продовжуємо тільки після того, як цей івент заемітився. Наприклад, так:

const bufferNotFull = writable.write(record);
if (!bufferNotFull) {
    await new Promise((res) => {
      writable.once("drain", res);
    });
  });
}

Чесно кажучи, я так і не зміг добитися переповнення буфера на реальному прикладі. Чи то AWS ліба все відразу забирає і зберігає десь в себе, чи то всі дані перед відправкою буферизуються на рівні операційної системи/ мережевої карти... Не було часу з цим розбиратись. Але, теоретично, це може трапитись. Тому краще додати такий запобіжник.

Тепер збираємо все до купи.

import { stringify } from "csv-stringify";
...

const processCurrentRecord = async (record, writable) => {
  const dataToWrite = prepareRecord(record);
  const bufferNotFull = writable.write(dataToWrite);
  if (!bufferNotFull) {
    await new Promise((res) => {
      writable.once("drain", res);
    });
  }
};

const generateFile = async () => {
  const stringifier = stringify();
  const uploadPromise = uploadToS3("dest/path/fileName.csv", stringifier);
  stringifier.write(["Column 1", "Column2 ", ...]); // add column names
  const recordsStream = FileRecord.query()
    .where({ ... })
    .orderBy("id")
    .toKnexQuery()
    .stream();
  for await (const record of recordsStream) {
    await processCurrentRecord(record, stringifier);
  }
  stringifier.end();
  await uploadPromise;
};

Замість висновку

Це лише один з можливих варіантів реалізації. Писати в стрім можна також за допомогою асинхронного ітератора. Перевіряти, чи повний буфер, мабуть, можна за допомогою властивості writable.writableNeedDrain. Теоретично, можна було б імплементнути Transform стрим. Можна додати обробку еррорів...

Головне — не допустити неконтрольованого росту занятої пам’яті і, відповідно, крашу всього додатка.

👍ПодобаєтьсяСподобалось6
До обраногоВ обраному3
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

Цей підхід є трошки небезпечним в плані ціни за S3. Не знаю як рахується стрім операція, але якщо вона чарджиться як звичайний put то може влетіти в копійчину робити запит на кожен рядок з бази, треба в балк накопичувати і за раз записувати

Не знаю як рахується стрім операція, але якщо вона чарджиться як звичайний put

А що таке стрім і звичайний запит (put) в http? Типу детектити chunked encoding?

то може влетіти в копійчину робити запит на кожен рядок з баз

Тут же один запит, та і S3 же немає чогось типу append, то як може бути інакше?

не бачу різниці між аплоадом стріма і цілого файла

Думаю ще небезпека у тому, що коли відваліться конект на S3 то прийдеться робити все заново. Тому дійсно краще спочатку писати у локальний файл -> компресити -> відправляти на S3. Згоден, що бувають випадки коли локальний диск не варіант, але це рідко.

Я думаю, що коли логіка виконується на aws ec2 і стрімить на aws s3, то це прямо дуууже нереальний кейс, що відвалиться конекшн.

Якщо код розрахований виключно на роботу в такому середовищі, то, напевно, це не проблема.

Стрими

Потоки або стріми.

еррорів

Помилок, хиб.

«потоки» зазвичай використовують для позначення тредів (threads)

най буде «струмені»

стримів

Чому це stream англіцизм став раптом «стрИмом»?

Теоретично, можна було б імплементнути Transform стрим.

Що є найкращим варіантом, щоб не накручувати різних await/write/end/drain замість готового pipe/pipeline з Transform з готовим backpressure та мати reusable код. Тим більш тут навіть трансформація в prepareRecord синхронна, та якби і ні можна написати простий хелпер типу

const transform = (reducer) => new Transform({
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    Promise.resolve(reducer.call(this, encoding)).then((_chunk) => callback(null, _chunk), callback);
  },
});

// ...

await pipeline(
  Readable.from(data), // Knex
  transform(async (record) => {
    await setTimeout(200); // якби була тут потрібна якась асинхронщина на промісах
    return 'foo';
  }),
  process.stdout // S3
);
await new Promise((res) => {
writable.once("drain", res);
});

в EE є статичний метод once.

а от редактори ДОУ мені сказали, що стрІм — це не правильно. І виправили на стрИм)))

а от редактори ДОУ мені сказали, що стрІм — це не правильно

Тоді ясно =) Але в українській мові нема такого запозиченого слова, тобто це прямий англіцизм, що має писатись так, щоб воно мало найближче звучання до оригіналу. Але редакторам видніше, просто треба якось притримуватись єдиних правил, бо буде каша в українському IT. Чомусь в російській фонетичне звучання правильне, а у нас своя п’янка))

о, так точно краще. я теж, коли бачу

write/end/drain

— чи точно задача вимагає такого низького рівня?

Хороший приклад.

Але я його робив би по іншому, якраз щоб не морочитись з буферами. З недавнього робив — видати «всі» файли з S3 bucket одним json’ом. і на гора responsу я видаю стрім.

Коротенько
замість

потім накидуємо даних в стрим і потім чекаємо, доки все буде відправлено.

push використовуємо pull стратегію

Куди треба записати? у файл на S3
1. ото нехай там і буде — чи отримання стріма, чи асінхронний генератор
2 .а в тому стріму/генератору — stringify витягує з стріма/асінх генератора 3
3. який є стрімом knex або ще якоюсь обробкою стріма knex

таким чином всі стріми/генератори чекають коли їх смикне запис у файл S3

Іншими словами — кому «більше всіх» треба — ото він нехай і буде ініциатором — «дай мені ще»

Недолік правда — stream.PassThrough() теоретично може забезпечити паралельність роботи — поки з нього хтось вичитав і асінхронно пише(тобто віддав керування нашому единому треду V8) — інший вже вичитав і пише в цей stream.

ну і ще приклад коду нагуглив, з PassThrough і knex:
Programmatically Stream (Upload) Large Files to Amazon S3

Теоретично, можна було б імплементнути Transform стрим

The stream.PassThrough class is a trivial implementation of a Transform stream that simply passes the input bytes across to the output. Its purpose is primarily for examples and testing, but there are some use cases where stream.PassThrough is useful as a building block for novel sorts of streams.

Не зовсім зрозумів. Все одно ж хтось має писати в стрім, щоб потім хтось інший міг з нього витягувати.
Той стрім, який stringify робить. З нього витягує дані авс аплоадер. Але ж в нього треба й писати дані.
Чи щось я не так зрозумів?

з прикладу по лінку
function upload(S3)
let pass = new stream.PassThrough();

let params = {
...
Body: pass
};

S3.upload(params,
...
return pass;
}
readStream.pipe(upload(S3));

readStream читає і пайпить у стрім наданий йому фунцкією upload
питання — а що буде коли readStream читає і пише у стрім швидше аніж S3.upload з нього забирає? приклад з using Knex.js & streaming data from a table там аналогічний.

З нього витягує дані авс аплоадер

ні. ви pushите.
якби він вигягував, то код був би в такому порядку

const recordsStream =
вказуємо його як вхідний для prepareRecord і робимо/огортаємо стрімом — асіхнронним генератором
який вказуємо вхідним для stringifier і робимо стрімом або асіхнронним генератором
і фінал: запускаємо uploadToS3(сюди, звідси)

а у вас у фіналі:

for await (const record of recordsStream)

тобто — при push ініциатором виступає джерело данних (як у вас)
при pull ініціатор підготовленного pipeline — той хто забирає данні

...о, ChatGPT підказав, треба почитати
Backpressuring in Streams
pipe() здається розрулить ситуацію і для push — тобто коли readStream працює швидше за writeStream

pipe() здається розрулить ситуацію і для push

Без цього в них не було б користі, крім як валити сервери по пам’яті) Не розрулить тільки обробку помилок без додаткового коду, на відміну від pipeline. Тому він не рекомендується для звичайних кейсів.

Підписатись на коментарі