Как реализовать прикладной протокол на основе TCP

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті.

Всем привет! Меня зовут Алексей Румянцев, я Node.js разработчик в OBRIO. Компания входит в экосистему бизнесов Genesis и занимается разработкой мобильных приложений и игр.

Наиболее известное наше приложение — Nebula — сейчас самое загружаемое в США в нише астрологии. Его используют более 10 млн пользователей в 50 странах. Мы несколько раз занимали первое место в рейтинге лайфстайл-приложений дня в американском и британском App Store, и даже обогнали Tinder. Сейчас команда, включая меня, работает над новым веб-продуктом, который мы анонсируем совсем скоро.

Я начинал свой профессиональный путь с работы в конструкторском бюро при факультете информатики и вычислительной техники КПИ. Там, в частности, занимался разработкой протокола удаленного вызова процедур (RPC) и его прототипированием на Node.js. Удивительным образом сложилось, что на следующей же работе я уже реализовывал прикладной протокол на базе Transmission Control Protocol (TCP). В статье поделюсь своим опытом в решении этой задачи, а также расскажу о вызовах, которые предстали предо мной.

Постановка задачи

Вкратце о проблеме. Приложение использует внешний сервис для получения потоковых данных. Для передачи данных по сети он использует протокол собственной разработки. В таком подходе определенно есть свои плюсы: это производительнее и гораздо оптимальнее по утилизации памяти. Но также есть и очевидный недостаток — сложность в реализации.

Теперь расскажу о методе взаимодействия с сервисом более детально. Все данные передаются по протоколу TCP в бинарном формате. Сначала клиент подключается к серверу, после чего ему необходимо отправить сообщение авторизации и получить соответствующее сообщение с ее результатом. Также для поддержания соединения необходимо раз в минуту отправлять эхо-сообщение, которое служит аналогом пинг-понг-подхода. Каждый пакет протокола имеет заголовок, который начинается байтом 0xab и заканчивается байтом 0xba, и тело. Общение происходит в формате «запрос-ответ», где ответ на отправленный пакет определяется через указание соответствующего ID.

Отмечу, что если разработчик имеет достаточно компетенции и времени, подобный протокол лучше реализовывать на более низком уровне (C++/Rust) и подключить в Node.js в качестве аддона.

Передо мной стояла задача максимально удобно реализовать работу с этим внешним сервисом и использовать его функционал по максимуму. Я хотел создать программный интерфейс, который будет иметь методы:

  • для подключения к удаленному серверу;
  • для авторизации;
  • для инициализации и остановки пинг-понг-процесса;
  • для запрашивания исторических данных.

Краткий экскурс в протокол TCP

Протокол TCP находится на транспортном уровне OSI. Его задачи:

  • доставить пакеты из одного приложения на узле А в другое приложение на узле Б;
  • переотправка потерянных пакетов;
  • соблюдение исходного порядка пакетов.

Протокол определяет приложения по специальным идентификаторам ― портам. TCP использует клиент-серверный подход. Сервер мониторит подключения к конкретному порту, а клиенты инициируют эти подключения.

Также важно отметить, что протокол TCP производит фрагментацию пересылаемых данных. Это ― одна из основных проблем при реализации, так как при постоянном трафике пакеты прикладного протокола оказываются разбитыми между разными пакетами TCP и пересылаются по частям. При чем в пакете TCP может идти окончание одного пакета прикладного протокола и начало следующего.

Список всех возможных вариантов фрагментации:

  • n-ное количество пакетов целиком;
  • n-ное количество пакетов целиком и начало следующего пакета;
  • концовка пакета и n-ное количество пакетов целиком;
  • концовка пакета и начало следующего пакета;
  • концовка пакета, n-ное количество пакетов целиком и начало следующего пакета;
  • середина пакета.

Главная сложность состоит в том, чтобы «научить» программу корректно переходить от одного состояния («читаю пакет») к другому («пакет прочитал, жду следующий») и сохранять состояние читаемого пакета (буфер).

Решение задачи

Часть 1. Фрагментация

Как я уже упомянул выше, каждый пакет протокола имеет заголовок и тело. В заголовке указывается размер пакета и его тип.

Решая проблему сбора частей пакета из-за фрагментации на нижнем уровне, я пришел к абстракции Reader. Это объект, который позволяет собрать пакет из частей, основываясь на его заданной длине. Reader работает таким образом:

  • при инициализации указывается длина пакета и callback-функция, которая вызывается при прочтении указанного количества байтов;
  • через метод take принимается набор байтов и дописывается во внутренний буфер столько байтов, сколько требуется для прочтения заданной длины, или все, если набор байтов меньше необходимой длины. Если же нужное количество байтов прочитано ― вызывается приватный метод end, который инициирует вызов callback-функции.

Ниже представлен код класса Reader:

/* reader.ts */

class Reader {
 private readonly buffer: Buffer;
 private offset = 0;
 private ended = false;

 constructor(
   private size: number,
   private onEnd: (buffer: Buffer) => void
 ) {
   this.buffer = Buffer.alloc(size);
 }

 take(data: Buffer): number {
   if (this.ended) {
     return 0;
   }

   const bytesRead = data.copy(
     this.buffer,
     this.offset,
     0,
     this.size,
   );
   this.offset += bytesRead;

   if (this.ready) {
     this.end();
   }

   return bytesRead;
 }

 get ready(): boolean {
   return this.offset === this.size;
 }

 get isEnded(): boolean {
   return this.ended;
 }

 end(): void {
   this.ended = true;
   this.onEnd(this.buffer);
 }
}

Оказывается, имея такую простую абстракцию, можно без проблем решить проблему фрагментации пакетов. Все, что нам нужно сделать, — это указать количество байтов, которые обработает читатель, и «скармливать» ему данные пакет за пакетом. Но теперь встает вопрос: как узнать, сколько байтов нам необходимо прочитать?

В нашем случае ответ прост — нужно всего лишь прочитать заголовок. Он имеет одинаковый размер для всех типов пакетов, что делает имплементацию протокола еще проще. Я решил это, расширив базовый класс Reader до класса HeaderReader таким образом: передать в конструктор базового класса первым параметром (размером пакета) значение длины заголовка, которое является константой.

/* header-reader.ts */

class HeaderReader extends Reader {
 constructor(onEnd: (buffer: Buffer) => void) {
   super(HEADER_SIZE, onEnd);
 }
}

Последний шаг на этом этапе — реализовать абстракцию, которая будет оборачивать протокол транспортного уровня и читать пакеты из потока прикладного протокола. Для этого я создал класс Connection. Он агрегирует класс Transport, который оборачивает протокол TCP для реализации автоматического переподключения, и подписывается на событие data, знаменующее получение нового TCP-сообщения. Также Connection имеет свойство structs, что является объектом класса rxjs.Subject. Это сделано для удобства обработки пакетов прикладного уровня на следующем уровне абстракции.

/* connection.ts */

class Connection extends EventEmitter {
  private reader: StructReader;
  structs = new Subject<{ structType: StructType; buffer: Buffer }>();

 constructor(private transport: Transport) {
   super();
   this.reader = new HeaderReader(this.headerOnRead.bind(this));
   this.transport.on('data', (data) => this.processData(data));
 }

 private headerOnRead(header: Buffer) {
   const structSize = Connection.headerGetSize(header);
   this.reader = new StructReader(
     structSize,
     this.structOnRead.bind(this, header)
   );
 }

 private structOnRead(header: Buffer, struct: Buffer) {
   const structType = Connection.headerGetType(header);
   const buffer = Buffer.concat([header, struct]);
   this.structs.next({ structType, buffer });
 }

 private static headerGetSize(header: Buffer): number {
   return header.readUInt32LE(2) - HEADER_SIZE;
 }

 private static headerGetType(header: Buffer): StructType {
   return header.readUInt16LE(6);
 }

 private processData(data: Buffer) {
   if (!this.reader || this.reader.isEnded) {
     this.reader = new HeaderReader(this.headerOnRead.bind(this));
   }
   if (this.reader) {
     const offset = this.reader.take(data);
     if (data.length > offset) this.processData(data.slice(offset));
   }
 }

 async connect(): Promise<void> {
   await this.transport.connect();
 }

 async close(): Promise<void> {
   await this.transport.close();
 }

 async send(struct: Struct) {
   const data = struct.toBuffer();
   await this.transport.write(data, 'hex');
 }
}

Часть 2. Сериализация и десериализация

Одна из задач разрабатываемой библиотеки — преобразование объектов языка программирования (то есть обычных объектов JavaScript) в байтовый формат протокола (Buffer). Здесь замечу, что я хотел представлять структуру пакетов в декларативном виде, поэтому получил следующий формат:

/* authorize-response.schema.ts */

const authorizeResponse = new Schema<AuthorizeResponse>({
 version: UINT_32,
 status: UINT_32,
 rows: UINT_32,
 config: new ArrayType<AuthorizeConfig>(
   'rows',
   new Schema<AuthorizeConfig>({
     delay: UINT_32,
     securityType: UINT_32,
     exchangeId: UINT_32,
   })
 ),
});

Каждый пакет описывается в виде так называемой схемы — обертки, которая, имея набор полей и тип каждого поля, умеет преобразовывать объекты в набор байтов, и наоборот. Опускаясь на уровень ниже, вы видите, что каждое поле значением имеет какой-то объект типа. Все объекты типов имеют такой интерфейс:

  • метод write — для записи значения в буфер;
  • метод parse — для чтения значения из буфера;
  • метод size — для определения, сколько байтов займет значение в буфере.
/* data-type.interface.ts */

interface DataType<T> {
 parse(buffer: Buffer, offset: number, instance: unknown): T;

 write(buffer: Buffer, value: T, offset: number): number;

 size(value: T): number;
}

Ниже описаны примеры реализации скалярных и составных типов данных:

/* int8.type.ts  */

class Int8Type implements DataType<number> {
 size() {
   return 1;
 }

 parse(buffer: Buffer, offset: number): number {
   return buffer.readInt8(offset);
 }

 write(buffer: Buffer, value: number, offset: number): number {
   return buffer.writeInt8(value, offset);
 }
}

const INT_8 = new Int8Type();
/* string.type.ts */

class StringType implements DataType<string> {
 parse(buffer: Buffer, offset: number, instance: unknown): string {
   return buffer.slice(offset, buffer.indexOf(DELIMITER, offset)).toString();
 }

 write(buffer: Buffer, value: string, offset: number): number {
   return buffer.write(value, offset) + DELIMITER_OFFSET;
 }

 size(value: string): number {
   return value.length + DELIMITER_OFFSET;
 }
}

const STRING = new StringType();
/* array.type.ts */

class ArrayType<T> implements DataType<T[]> {
 constructor(private countProp: string, private schema: Schema<T>) {}

 parse(
   buffer: Buffer,
   offset: number,
   instance: Record<string, unknown>
 ): T[] {
   const array: T[] = [];
   const count = instance[this.countProp];
   if (typeof count !== 'number') {
     throw new Error('`countProp` references to non-number value');
   }
   for (let i = 0; i < count; ++i) {
     const value = this.schema.parse(buffer, offset);
     offset += this.schema.size(value);
     array.push(value);
   }
   return array;
 }

 write(buffer: Buffer, value: T[], offset: number): number {
   return value.reduce(
     (bytesWritten: number, item) =>
       bytesWritten + this.schema.write(buffer, item, offset + bytesWritten),
     0
   );
 }

 size(value: T[]): number {
   return value.reduce((acc: number, item) => acc + this.schema.size(item), 0);
 }
}

Зная, как устроены типы, теперь можно перейти к рассмотрению класса Schema — ключевого элемента в решении проблемы сериализации и десериализации. Интерфейс Schema подобен интерфейсу DataType. Но он также имеет метод create для создания объекта класса Buffer из заданного объекта соответствующего типа.

/* schema.ts */

class Schema<T> {
 constructor(private definition: SchemaDefinition<T>) {}

 write(buffer: Buffer, instance: T, offset: number = 0) {
   let totalBytesWritten = 0;

   for (const key in this.definition) {
     if (!this.definition.hasOwnProperty(key)) {
       continue;
     }
     const definition = this.definition[key];
     const value = instance[key];
     definition.write(buffer, value, offset);
     const bytesWritten = definition.size(value);
     offset += bytesWritten;
     totalBytesWritten += bytesWritten;
   }

   return totalBytesWritten;
 }

 parse(buffer: Buffer, offset: number = 0): T {
   const instance: T = Object.create(null);

   for (const key in this.definition) {
     if (!this.definition.hasOwnProperty(key)) {
       continue;
     }
     const definition = this.definition[key];
     const value = definition.parse(buffer, offset, instance);
     offset += definition.size(value);
     Object.defineProperty(instance, key, {
       value,
       enumerable: true,
     });
   }

   return instance;
 }

 size(instance: T): number {
   let size = 0;

   for (const key in instance) {
     const definition = this.definition[key];
     const value = instance[key];
     size += definition.size(value);
   }

   return size;
 }

 create(instance: T): Buffer {
   const size = this.size(instance);
   const buffer = Buffer.alloc(size);
   this.write(buffer, instance);
   return buffer;
 }
}

Завершающим «штрихом» композиции является класс Struct, который призван обернуть сериализованные объекты сообщений в пакеты протокола. То есть просто добавить заголовок с соответствующими параметрами размера и типа пакета:

/* struct.ts */

class Struct {
 constructor(private type: StructType, private payload: Buffer) {}

 getType() {
   return this.type;
 }

 toBuffer(): Buffer {
   const size = HEADER_SIZE + this.payload.length;
   const buffer = Buffer.alloc(size);
   buffer.writeUInt16LE(0xab);
   buffer.writeUInt32LE(size, 2);
   buffer.writeUInt16LE(this.type, 6);
   buffer.writeUInt16LE(0xba, 8);
   this.payload.copy(buffer, 10);
   return buffer;
 }
}

Пример описания ping-пакета:

/* echo.struct.ts */

class EchoStruct extends Struct {
 constructor(instance: EchoMessage) {
   super(StructType.STRUCT_ECHO, echoMessage.create(instance));
 }
}

Часть 3. Клиент

Итак, мы решили проблему фрагментации пакетов, сериализации и десериализации. Теперь нужно собрать все в единый механизм. Для этого я реализовал класс Client — высокоуровневый интерфейс, позволяющий использовать прикладной протокол. Основные функции этого класса:

  • чтение и преобразование пакетов;
  • отправка сообщений;
  • получение ответов на сообщения.

В конструкторе класса Client происходит инициализация объектов Transport и Connection, а также подписка на connection.structs.

/* client.ts#constructor + handleRawStruct */

class Client extends EventEmitter {
 private readonly transport: Transport;
 private readonly connection: Connection;
 private readonly structBus = new StructBus();

 private requestId = 1;
 private heartbeatInterval: NodeJS.Timeout | null = null;

 constructor(tcpOptions: SocketConnectOpts) {
   super();

   this.transport = new Transport(tcpOptions);
   this.connection = new Connection(this.transport);
  this.connection.structs.subscribe(this.handleRawStruct.bind(this));
 }

 private handleRawStruct({
   structType,
   buffer,
 }: {
   structType: StructType;
   buffer: Buffer;
 }) {
   const struct = buffer.slice(HEADER_SIZE);
   this.structBus.push(structType, struct);
 }
}

Обработчик поступающих пакетов передает их во вспомогательный класс StructBus, который по типу пакета выбирает, какую схему использовать для парсинга, и отправляет его в нужный rxjs.Subject, транслирующий только пакеты этого типа.

/* struct-bus.ts */

class StructBus {
 echoMessages = new Subject<EchoMessage>();
 requestErrors = new Subject<RequestError>();
 authorizeResponses = new Subject<AuthorizeResponse>();
 historyResponses = new Subject<HistoryResponse>();

 push(structType: StructType, buffer: Buffer) {
   switch (structType) {
     case StructType.STRUCT_ECHO:
       this.echoMessages.next(echoMessage.parse(buffer));
       break;
     case StructType.STRUCT_REQUEST_ERROR:
       this.requestErrors.next(requestError.parse(buffer));
       break;
     case StructType.STRUCT_AUTHORIZE_RESPONSE:
       this.authorizeResponses.next(authorizeResponse.parse(buffer));
       break;
     case StructType.STRUCT_HISTORY_RESPONSE:
       this.historyResponses.next(historyResponse.parse(buffer));
       break;
   }
 }
}

StructBus позволяет реализовать весь оставшийся функционал клиента.

/* client.ts#echo */

async echo(): Promise<EchoMessage> {
 await this.connection.send(new EchoStruct({ time: 0 }));
 return this.structBus.echoMessages.pipe(first()).toPromise();
}
/* client.ts#auth */

async auth(credentials: AuthorizeRequest): Promise<AuthorizeResponse> {
 await this.connection.send(new AuthStruct(credentials));
 const response = await this.structBus.authorizeResponses
   .pipe(first())
   .toPromise();
 if (response.status === AuthorizeStatus.REJECT) {
   throw new Error('Invalid credentials');
 } else {
   return response;
 }
}
/* client.ts#fetchResponse + getHistory*/

private fetchResponse<T extends { id: number }>(
 requestId: number,
 subject: Subject<T>
) {
 return race(
   subject.pipe(first((response) => response.id === requestId)),
   this.structBus.requestErrors.pipe(
     first((response) => response.requestId === requestId)
   )
 ).toPromise();
}

async getHistory(args: GetHistoryArgs) {
 const requestId = this.getNewRequestId();
 await this.connection.send(
   new HistoryRequestStruct({ ...args, id: requestId })
 );
 return this.fetchResponse(requestId, this.structBus.historyResponses);
}

Свой опыт реализации прикладного протокола на базе TCP я попытался передать максимально сжато, но в то же время цельно. Репозиторий с полным примером вы можете найти на Гитхабе. Также я открыт к обсуждениям и с радостью отвечу на ваши вопросы.

👍НравитсяПонравилось11
В избранноеВ избранном4
LinkedIn
Допустимые теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Допустимые теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Протокол определяет приложения по специальным идентификаторам ― портам. TCP использует клиент-серверный подход. Сервер мониторит подключения к конкретному порту, а клиенты инициируют эти подключения.

Вот тут много всяких программистов сидит, подскажите, а то я ничего не понял, этот дяденька всерьёз всё пишет?

Данное объяснение не претендует на абсолютную точность и предназначено для того, чтобы в общих чертах донести, что из себя представляет протокол TCP.

простите что это было?

Министатья

Как реализовать прикладной протокол на основе TCP

к.м.к. на доу тоже пора заводить уровень «мини песочница» ))

Протокол TCP находится на транспортном уровне OSI. Его задачи:

доставить пакеты;
переотправка потерянных пакетов;
соблюдение исходного порядка пакетов.

Еще одна задача протокола tcp — организовать Flow Control, чтобы принимающая сторона случайно не захлебнулась. В двух словах: если трансмиттер отправляет данные быстрее, чем бизнес логика ресивера успевает их обработать, то передача данных останавливается (пока в буфере ресивера снова не появится достаточно свободного места).

Ваш клиент ломает flow control на прикладном уровне — вы всегда готовы обрабатывать событие data, как сильно бы вас не дудосили. Это частая боль программ написанных на событиях и коллбеках. Когда работаешь с сокетами синхронно (вызвал блокирующий socket.read(), обработал кусок данных, опять вызвал read()), то флоу контроль становится прозрачным для юзера и идеально работает. Нода же у себя под капотом дергает этот read() каждый раз, как только в сокете появляются свежие данные, и ей без разницы, как вы с этим справляетесь.

Если такая проблема у вас произойдет, то, скорее всего, Connection.structs сьест всю память и вы поймаете OOM kill. Чтобы это пофиксить, придется реализовать лапшу из коллбеков хитроумный алгоритм, который будет вызывать socket.pause() и socket.resume()

Также важно отметить, что протокол TCP производит фрагментацию пересылаемых данных.

В общем все так, но если уж быть совсем занудой, то tcp ничего не фрагментирует, и нет понятия tcp пакета. Он просто скидывает все в единый поток из байтиков. Этот поток фрагментируется на куски нодой, когда ивент луп доходит до обработки события «в сокете шото появилось» и выгребает все содержимое tcp буффера в новый обьект.

В общем все так, но если уж быть совсем занудой, то tcp ничего не фрагментирует,

Если быть совсем занудой, то фрагментирует на куски не более чем мaximum segment size (MSS). Размер которого в свою очередь приколочен гвоздями к MTU минус размеры заголовков IP и TCP.

да, я неправильно сформулировал мысль. Имел в виду то, что после транспортировки по сети пейлоады снова сливаются в один стрим в кернел буфере, и пользователь с этими фрагментами уже не работает

Не совсем так. Представим что у нас пользовательское сообщение занимает условные 800 байт и передающая сторона отправляет два таких сообщения. В MSS пакета поместится первое сообщение и часть второго. Которое немедленно отправится к приемнику. Вторая часть второго сообщения будет висеть в стеке до наступления одного из следующих событий:
— получение ACK от приемной стороны
— отправка после дозаписи новых данных в сокет (Nagle’s algorithm)
— таймаут
Не стоит забывать что этой второй части нужно еще добраться до приемника и не быть дропнутым или доставленным гораздо позже сетевой аппаратурой между передатчиком и приемником. В это время первый пакет (первое сообщение и часть второго) вполне может добраться до приемника и попасть в буфер сокета точно в таком виде, в котором оно было в MSS.

Если быть совсем занудой, то есть такая штука как www.kernel.org/...​egmentation-offloads.html

Если она поддерживается сетевым устройством, то оно будет само пересобирать отдельные TCP пакеты в большой пакет и отдавать в таком виде операционной системе. Приложение может получить данные блоками, которые больше, чем MSS (при условии, что размеры буфера сокета установлены соответствующие).

Если она поддерживается сетевым устройством, то оно будет само пересобирать отдельные TCP пакеты в большой пакет и отдавать в таком виде операционной системе. Приложение может получить данные блоками, которые больше, чем MSS (при условии, что размеры буфера сокета установлены соответствующие).

это не совсем так здесь (Segmentation Offload) речь идёт скорее об части общего TCP Offload Engine основная идея которого в том что центральный процессор в формировании tcp/ip вообще не участвует полностью передая (offload) это всё процессору сетевой карты а со стороны cpu это выглядит просто как посыл нужного куска данных фактически любого размера и просто как приём входящего tcp потока без участия cpu

конкретно то об чём ты говоришь

то оно будет само пересобирать отдельные TCP пакеты в большой пакет и отдавать в таком виде операционной системе. Приложение может получить данные блоками, которые больше, чем MSS

это есть LRO large receive offload который кстати не обязательно аппаратный но может быть и чисто программный реализуемый на уровне операционка ядра которое просто уменьшает таким путём количество системных вызовов обратно на уровень приложения

т.е. в принципе воддержка железом сетевой карты даже не обязательна

вообще это всё основывается в т.ч. на том что сам tcp уже достаточно не прост и стандарт и допускает отсылку только одного ack/nak на несколько входящих пакетов ок даже не совсем так а точнее позволяет отправлять все ack на целый пакован входящих но с задержкой и соотв. единым пакетом ethernet да ещё и вешать на него ответ уже data пакет ))

но на самом деле это всё очень интересно но только это просто пустая теория на уровне приложения уровня node.js которое вряд ли способно переваривать потоки 10 гигабит и с соотв. уровнями задержки тоже для этого уровня достаточно лишь того что

— tcp это просто поток двунаправленый
— этот поток может фрагментироваться и это нормально

а это азы

т.е. весь «вопрос фрагментации» состоит лишь в том что исходящий пакет из точки А в точке Б может быть принят в виде нескольких разрознённых кусков именно на уровне сокета а всё что под ним к делу совершенно не относится от слова совсем

ЗЫ: так на пример лично я сомневаюсь что ситуации с «началом следующего пакета» кто-то реально реализовывал бы б на сетевом уровне самой операционки

s.dou.ua/...​-files/image1_wrWLPuF.jpg

т.е. в этом месте можно попытаться применить TCP_NODELAY при этом крайне желательно понимая что именно и для чего именно это делается и ещё и имея конкретные метрики по конкретному приложению но боюсь конкретно сабж описан именно по вопросу пакетов как

Главная сложность состоит в том, чтобы «научить» программу корректно переходить от одного состояния («читаю пакет») к другому («пакет прочитал, жду следующий») и сохранять состояние читаемого пакета (буфер).

т.е. вся речь идёт всего лишь об том что входящий поток просто непрерывный «сырой» поток и что нам надо из него просто выцеплять наши пакеты просто последовательно просто разбираясь самим где начало пакета и где конец

как простой примитивный пример мы можем пересылать пакет уровня приложения в виде сколько там байт заголовка обозначающего длину пакета т.е. сперва читаем скажем 4 байт и это будет длина х и дальше просто единым вызовом читаем х байт на уровне сокета и всё дальше сама операционка сделает всю магию положив нам в буфер только заданное количество байт раз за разом

в более продвинутом случае мы просто делаем то же ж самое на уровне самого приложения разбивая работу на 2 этапа № 1 прочесть из сокета всё что есть и сложить в свой буфер и оповестить дальше № 2 посмотреть в буфер и продолжить разбор не полного пакета по ходу если таковое возможно либо просто подождать следующего блока пока данных будет уже достаточно

к.м.к. вариант с много ходовкой для приложения уровня сервера js как бекенда для мобильного приложения вряд ли нужен к тому же ж я не знаю не реализован ли он уже внутри самого js что вполне возможно учитывая его асинхронную природу на примере сабжевого кода

Благодарю, очень хорошее замечание касательно flow control. Это действительно стоит учитывать в подобных решениях

И почему нельзя было использовать protobuf ?

Приложение использует внешний сервис для получения потоковых данных. Для передачи данных по сети он использует протокол собственной разработки.

Отличное изобретение. Предлагаю назвать его Velosiped

Также я открыт к обсуждениям и с радостью отвечу на ваши вопросы

Упаковку структуры в big-endian лучше сделать согласно RFC1700.

The convention in the documentation of Internet Protocols is to
express numbers in decimal and to picture data in «big-endian» order

P.S.: ну и волшебные константы признаков начала/конца заголовка вынести в какие-то переменные с читаемым именем. Что-то типа
#define PROTO_SOH 0xAB
#define PROTO_EOH 0xBA

Подписаться на комментарии