Как реализовать прикладной протокол на основе TCP
Всем привет! Меня зовут Алексей Румянцев, я Node.js разработчик в OBRIO. Компания входит в экосистему бизнесов Genesis и занимается разработкой мобильных приложений и игр.
Наиболее известное наше приложение — Nebula — сейчас самое загружаемое в США в нише астрологии. Его используют более 10 млн пользователей в 50 странах. Мы несколько раз занимали первое место в рейтинге лайфстайл-приложений дня в американском и британском App Store, и даже обогнали Tinder. Сейчас команда, включая меня, работает над новым веб-продуктом, который мы анонсируем совсем скоро.
Я начинал свой профессиональный путь с работы в конструкторском бюро при факультете информатики и вычислительной техники КПИ. Там, в частности, занимался разработкой протокола удаленного вызова процедур (RPC) и его прототипированием на Node.js. Удивительным образом сложилось, что на следующей же работе я уже реализовывал прикладной протокол на базе Transmission Control Protocol (TCP). В статье поделюсь своим опытом в решении этой задачи, а также расскажу о вызовах, которые предстали предо мной.

Постановка задачи
Вкратце о проблеме. Приложение использует внешний сервис для получения потоковых данных. Для передачи данных по сети он использует протокол собственной разработки. В таком подходе определенно есть свои плюсы: это производительнее и гораздо оптимальнее по утилизации памяти. Но также есть и очевидный недостаток — сложность в реализации.
Теперь расскажу о методе взаимодействия с сервисом более детально. Все данные передаются по протоколу TCP в бинарном формате. Сначала клиент подключается к серверу, после чего ему необходимо отправить сообщение авторизации и получить соответствующее сообщение с ее результатом. Также для поддержания соединения необходимо раз в минуту отправлять эхо-сообщение, которое служит аналогом пинг-понг-подхода. Каждый пакет протокола имеет заголовок, который начинается байтом 0xab и заканчивается байтом 0xba, и тело. Общение происходит в формате «запрос-ответ», где ответ на отправленный пакет определяется через указание соответствующего ID.
Отмечу, что если разработчик имеет достаточно компетенции и времени, подобный протокол лучше реализовывать на более низком уровне (C++/Rust) и подключить в Node.js в качестве аддона.
Передо мной стояла задача максимально удобно реализовать работу с этим внешним сервисом и использовать его функционал по максимуму. Я хотел создать программный интерфейс, который будет иметь методы:
- для подключения к удаленному серверу;
- для авторизации;
- для инициализации и остановки пинг-понг-процесса;
- для запрашивания исторических данных.
Краткий экскурс в протокол TCP
Протокол TCP находится на транспортном уровне OSI. Его задачи:
- доставить пакеты из одного приложения на узле А в другое приложение на узле Б;
- переотправка потерянных пакетов;
- соблюдение исходного порядка пакетов.
Протокол определяет приложения по специальным идентификаторам ― портам. TCP использует клиент-серверный подход. Сервер мониторит подключения к конкретному порту, а клиенты инициируют эти подключения.
Также важно отметить, что протокол TCP производит фрагментацию пересылаемых данных. Это ― одна из основных проблем при реализации, так как при постоянном трафике пакеты прикладного протокола оказываются разбитыми между разными пакетами TCP и пересылаются по частям. При чем в пакете TCP может идти окончание одного пакета прикладного протокола и начало следующего.
Список всех возможных вариантов фрагментации:
- n-ное количество пакетов целиком;
- n-ное количество пакетов целиком и начало следующего пакета;
- концовка пакета и n-ное количество пакетов целиком;
- концовка пакета и начало следующего пакета;
- концовка пакета, n-ное количество пакетов целиком и начало следующего пакета;
- середина пакета.

Главная сложность состоит в том, чтобы «научить» программу корректно переходить от одного состояния («читаю пакет») к другому («пакет прочитал, жду следующий») и сохранять состояние читаемого пакета (буфер).
Решение задачи
Часть 1. Фрагментация
Как я уже упомянул выше, каждый пакет протокола имеет заголовок и тело. В заголовке указывается размер пакета и его тип.
Решая проблему сбора частей пакета из-за фрагментации на нижнем уровне, я пришел к абстракции Reader. Это объект, который позволяет собрать пакет из частей, основываясь на его заданной длине. Reader работает таким образом:
- при инициализации указывается длина пакета и callback-функция, которая вызывается при прочтении указанного количества байтов;
- через метод take принимается набор байтов и дописывается во внутренний буфер столько байтов, сколько требуется для прочтения заданной длины, или все, если набор байтов меньше необходимой длины. Если же нужное количество байтов прочитано ― вызывается приватный метод end, который инициирует вызов callback-функции.
Ниже представлен код класса Reader:
/* reader.ts */
class Reader {
private readonly buffer: Buffer;
private offset = 0;
private ended = false;
constructor(
private size: number,
private onEnd: (buffer: Buffer) => void
) {
this.buffer = Buffer.alloc(size);
}
take(data: Buffer): number {
if (this.ended) {
return 0;
}
const bytesRead = data.copy(
this.buffer,
this.offset,
0,
this.size,
);
this.offset += bytesRead;
if (this.ready) {
this.end();
}
return bytesRead;
}
get ready(): boolean {
return this.offset === this.size;
}
get isEnded(): boolean {
return this.ended;
}
end(): void {
this.ended = true;
this.onEnd(this.buffer);
}
}
Оказывается, имея такую простую абстракцию, можно без проблем решить проблему фрагментации пакетов. Все, что нам нужно сделать, — это указать количество байтов, которые обработает читатель, и «скармливать» ему данные пакет за пакетом. Но теперь встает вопрос: как узнать, сколько байтов нам необходимо прочитать?
В нашем случае ответ прост — нужно всего лишь прочитать заголовок. Он имеет одинаковый размер для всех типов пакетов, что делает имплементацию протокола еще проще. Я решил это, расширив базовый класс Reader до класса HeaderReader таким образом: передать в конструктор базового класса первым параметром (размером пакета) значение длины заголовка, которое является константой.
/* header-reader.ts */
class HeaderReader extends Reader {
constructor(onEnd: (buffer: Buffer) => void) {
super(HEADER_SIZE, onEnd);
}
}
Последний шаг на этом этапе — реализовать абстракцию, которая будет оборачивать протокол транспортного уровня и читать пакеты из потока прикладного протокола. Для этого я создал класс Connection. Он агрегирует класс Transport, который оборачивает протокол TCP для реализации автоматического переподключения, и подписывается на событие data, знаменующее получение нового TCP-сообщения. Также Connection имеет свойство structs, что является объектом класса rxjs.Subject. Это сделано для удобства обработки пакетов прикладного уровня на следующем уровне абстракции.
/* connection.ts */
class Connection extends EventEmitter {
private reader: StructReader;
structs = new Subject<{ structType: StructType; buffer: Buffer }>();
constructor(private transport: Transport) {
super();
this.reader = new HeaderReader(this.headerOnRead.bind(this));
this.transport.on('data', (data) => this.processData(data));
}
private headerOnRead(header: Buffer) {
const structSize = Connection.headerGetSize(header);
this.reader = new StructReader(
structSize,
this.structOnRead.bind(this, header)
);
}
private structOnRead(header: Buffer, struct: Buffer) {
const structType = Connection.headerGetType(header);
const buffer = Buffer.concat([header, struct]);
this.structs.next({ structType, buffer });
}
private static headerGetSize(header: Buffer): number {
return header.readUInt32LE(2) - HEADER_SIZE;
}
private static headerGetType(header: Buffer): StructType {
return header.readUInt16LE(6);
}
private processData(data: Buffer) {
if (!this.reader || this.reader.isEnded) {
this.reader = new HeaderReader(this.headerOnRead.bind(this));
}
if (this.reader) {
const offset = this.reader.take(data);
if (data.length > offset) this.processData(data.slice(offset));
}
}
async connect(): Promise<void> {
await this.transport.connect();
}
async close(): Promise<void> {
await this.transport.close();
}
async send(struct: Struct) {
const data = struct.toBuffer();
await this.transport.write(data, 'hex');
}
}
Часть 2. Сериализация и десериализация
Одна из задач разрабатываемой библиотеки — преобразование объектов языка программирования (то есть обычных объектов JavaScript) в байтовый формат протокола (Buffer). Здесь замечу, что я хотел представлять структуру пакетов в декларативном виде, поэтому получил следующий формат:
/* authorize-response.schema.ts */
const authorizeResponse = new Schema<AuthorizeResponse>({
version: UINT_32,
status: UINT_32,
rows: UINT_32,
config: new ArrayType<AuthorizeConfig>(
'rows',
new Schema<AuthorizeConfig>({
delay: UINT_32,
securityType: UINT_32,
exchangeId: UINT_32,
})
),
});
Каждый пакет описывается в виде так называемой схемы — обертки, которая, имея набор полей и тип каждого поля, умеет преобразовывать объекты в набор байтов, и наоборот. Опускаясь на уровень ниже, вы видите, что каждое поле значением имеет какой-то объект типа. Все объекты типов имеют такой интерфейс:
- метод write — для записи значения в буфер;
- метод parse — для чтения значения из буфера;
- метод size — для определения, сколько байтов займет значение в буфере.
/* data-type.interface.ts */
interface DataType<T> {
parse(buffer: Buffer, offset: number, instance: unknown): T;
write(buffer: Buffer, value: T, offset: number): number;
size(value: T): number;
}
Ниже описаны примеры реализации скалярных и составных типов данных:
/* int8.type.ts */
class Int8Type implements DataType<number> {
size() {
return 1;
}
parse(buffer: Buffer, offset: number): number {
return buffer.readInt8(offset);
}
write(buffer: Buffer, value: number, offset: number): number {
return buffer.writeInt8(value, offset);
}
}
const INT_8 = new Int8Type();
/* string.type.ts */
class StringType implements DataType<string> {
parse(buffer: Buffer, offset: number, instance: unknown): string {
return buffer.slice(offset, buffer.indexOf(DELIMITER, offset)).toString();
}
write(buffer: Buffer, value: string, offset: number): number {
return buffer.write(value, offset) + DELIMITER_OFFSET;
}
size(value: string): number {
return value.length + DELIMITER_OFFSET;
}
}
const STRING = new StringType();
/* array.type.ts */
class ArrayType<T> implements DataType<T[]> {
constructor(private countProp: string, private schema: Schema<T>) {}
parse(
buffer: Buffer,
offset: number,
instance: Record<string, unknown>
): T[] {
const array: T[] = [];
const count = instance[this.countProp];
if (typeof count !== 'number') {
throw new Error('`countProp` references to non-number value');
}
for (let i = 0; i < count; ++i) {
const value = this.schema.parse(buffer, offset);
offset += this.schema.size(value);
array.push(value);
}
return array;
}
write(buffer: Buffer, value: T[], offset: number): number {
return value.reduce(
(bytesWritten: number, item) =>
bytesWritten + this.schema.write(buffer, item, offset + bytesWritten),
0
);
}
size(value: T[]): number {
return value.reduce((acc: number, item) => acc + this.schema.size(item), 0);
}
}
Зная, как устроены типы, теперь можно перейти к рассмотрению класса Schema — ключевого элемента в решении проблемы сериализации и десериализации. Интерфейс Schema подобен интерфейсу DataType. Но он также имеет метод create для создания объекта класса Buffer из заданного объекта соответствующего типа.
/* schema.ts */
class Schema<T> {
constructor(private definition: SchemaDefinition<T>) {}
write(buffer: Buffer, instance: T, offset: number = 0) {
let totalBytesWritten = 0;
for (const key in this.definition) {
if (!this.definition.hasOwnProperty(key)) {
continue;
}
const definition = this.definition[key];
const value = instance[key];
definition.write(buffer, value, offset);
const bytesWritten = definition.size(value);
offset += bytesWritten;
totalBytesWritten += bytesWritten;
}
return totalBytesWritten;
}
parse(buffer: Buffer, offset: number = 0): T {
const instance: T = Object.create(null);
for (const key in this.definition) {
if (!this.definition.hasOwnProperty(key)) {
continue;
}
const definition = this.definition[key];
const value = definition.parse(buffer, offset, instance);
offset += definition.size(value);
Object.defineProperty(instance, key, {
value,
enumerable: true,
});
}
return instance;
}
size(instance: T): number {
let size = 0;
for (const key in instance) {
const definition = this.definition[key];
const value = instance[key];
size += definition.size(value);
}
return size;
}
create(instance: T): Buffer {
const size = this.size(instance);
const buffer = Buffer.alloc(size);
this.write(buffer, instance);
return buffer;
}
}
Завершающим «штрихом» композиции является класс Struct, который призван обернуть сериализованные объекты сообщений в пакеты протокола. То есть просто добавить заголовок с соответствующими параметрами размера и типа пакета:
/* struct.ts */
class Struct {
constructor(private type: StructType, private payload: Buffer) {}
getType() {
return this.type;
}
toBuffer(): Buffer {
const size = HEADER_SIZE + this.payload.length;
const buffer = Buffer.alloc(size);
buffer.writeUInt16LE(0xab);
buffer.writeUInt32LE(size, 2);
buffer.writeUInt16LE(this.type, 6);
buffer.writeUInt16LE(0xba, 8);
this.payload.copy(buffer, 10);
return buffer;
}
}
Пример описания ping-пакета:
/* echo.struct.ts */
class EchoStruct extends Struct {
constructor(instance: EchoMessage) {
super(StructType.STRUCT_ECHO, echoMessage.create(instance));
}
}
Часть 3. Клиент
Итак, мы решили проблему фрагментации пакетов, сериализации и десериализации. Теперь нужно собрать все в единый механизм. Для этого я реализовал класс Client — высокоуровневый интерфейс, позволяющий использовать прикладной протокол. Основные функции этого класса:
- чтение и преобразование пакетов;
- отправка сообщений;
- получение ответов на сообщения.
В конструкторе класса Client происходит инициализация объектов Transport и Connection, а также подписка на connection.structs.
/* client.ts#constructor + handleRawStruct */
class Client extends EventEmitter {
private readonly transport: Transport;
private readonly connection: Connection;
private readonly structBus = new StructBus();
private requestId = 1;
private heartbeatInterval: NodeJS.Timeout | null = null;
constructor(tcpOptions: SocketConnectOpts) {
super();
this.transport = new Transport(tcpOptions);
this.connection = new Connection(this.transport);
this.connection.structs.subscribe(this.handleRawStruct.bind(this));
}
private handleRawStruct({
structType,
buffer,
}: {
structType: StructType;
buffer: Buffer;
}) {
const struct = buffer.slice(HEADER_SIZE);
this.structBus.push(structType, struct);
}
}
Обработчик поступающих пакетов передает их во вспомогательный класс StructBus, который по типу пакета выбирает, какую схему использовать для парсинга, и отправляет его в нужный rxjs.Subject, транслирующий только пакеты этого типа.
/* struct-bus.ts */
class StructBus {
echoMessages = new Subject<EchoMessage>();
requestErrors = new Subject<RequestError>();
authorizeResponses = new Subject<AuthorizeResponse>();
historyResponses = new Subject<HistoryResponse>();
push(structType: StructType, buffer: Buffer) {
switch (structType) {
case StructType.STRUCT_ECHO:
this.echoMessages.next(echoMessage.parse(buffer));
break;
case StructType.STRUCT_REQUEST_ERROR:
this.requestErrors.next(requestError.parse(buffer));
break;
case StructType.STRUCT_AUTHORIZE_RESPONSE:
this.authorizeResponses.next(authorizeResponse.parse(buffer));
break;
case StructType.STRUCT_HISTORY_RESPONSE:
this.historyResponses.next(historyResponse.parse(buffer));
break;
}
}
}
StructBus позволяет реализовать весь оставшийся функционал клиента.
/* client.ts#echo */
async echo(): Promise<EchoMessage> {
await this.connection.send(new EchoStruct({ time: 0 }));
return this.structBus.echoMessages.pipe(first()).toPromise();
}
/* client.ts#auth */
async auth(credentials: AuthorizeRequest): Promise<AuthorizeResponse> {
await this.connection.send(new AuthStruct(credentials));
const response = await this.structBus.authorizeResponses
.pipe(first())
.toPromise();
if (response.status === AuthorizeStatus.REJECT) {
throw new Error('Invalid credentials');
} else {
return response;
}
}
/* client.ts#fetchResponse + getHistory*/
private fetchResponse<T extends { id: number }>(
requestId: number,
subject: Subject<T>
) {
return race(
subject.pipe(first((response) => response.id === requestId)),
this.structBus.requestErrors.pipe(
first((response) => response.requestId === requestId)
)
).toPromise();
}
async getHistory(args: GetHistoryArgs) {
const requestId = this.getNewRequestId();
await this.connection.send(
new HistoryRequestStruct({ ...args, id: requestId })
);
return this.fetchResponse(requestId, this.structBus.historyResponses);
}
Свой опыт реализации прикладного протокола на базе TCP я попытался передать максимально сжато, но в то же время цельно. Репозиторий с полным примером вы можете найти на Гитхабе. Также я открыт к обсуждениям и с радостью отвечу на ваши вопросы.
Сподобалась стаття? Підписуйтесь на автора, щоб отримувати сповіщення про нові публікації на пошту.
19 коментарів
Додати коментар Підписатись на коментаріВідписатись від коментарів