Как реализовать прикладной протокол на основе TCP
Всем привет! Меня зовут Алексей Румянцев, я Node.js разработчик в OBRIO. Компания входит в экосистему бизнесов Genesis и занимается разработкой мобильных приложений и игр.
Наиболее известное наше приложение — Nebula — сейчас самое загружаемое в США в нише астрологии. Его используют более 10 млн пользователей в 50 странах. Мы несколько раз занимали первое место в рейтинге лайфстайл-приложений дня в американском и британском App Store, и даже обогнали Tinder. Сейчас команда, включая меня, работает над новым веб-продуктом, который мы анонсируем совсем скоро.
Я начинал свой профессиональный путь с работы в конструкторском бюро при факультете информатики и вычислительной техники КПИ. Там, в частности, занимался разработкой протокола удаленного вызова процедур (RPC) и его прототипированием на Node.js. Удивительным образом сложилось, что на следующей же работе я уже реализовывал прикладной протокол на базе Transmission Control Protocol (TCP). В статье поделюсь своим опытом в решении этой задачи, а также расскажу о вызовах, которые предстали предо мной.
Постановка задачи
Вкратце о проблеме. Приложение использует внешний сервис для получения потоковых данных. Для передачи данных по сети он использует протокол собственной разработки. В таком подходе определенно есть свои плюсы: это производительнее и гораздо оптимальнее по утилизации памяти. Но также есть и очевидный недостаток — сложность в реализации.
Теперь расскажу о методе взаимодействия с сервисом более детально. Все данные передаются по протоколу TCP в бинарном формате. Сначала клиент подключается к серверу, после чего ему необходимо отправить сообщение авторизации и получить соответствующее сообщение с ее результатом. Также для поддержания соединения необходимо раз в минуту отправлять эхо-сообщение, которое служит аналогом пинг-понг-подхода. Каждый пакет протокола имеет заголовок, который начинается байтом 0xab и заканчивается байтом 0xba, и тело. Общение происходит в формате «запрос-ответ», где ответ на отправленный пакет определяется через указание соответствующего ID.
Отмечу, что если разработчик имеет достаточно компетенции и времени, подобный протокол лучше реализовывать на более низком уровне (C++/Rust) и подключить в Node.js в качестве аддона.
Передо мной стояла задача максимально удобно реализовать работу с этим внешним сервисом и использовать его функционал по максимуму. Я хотел создать программный интерфейс, который будет иметь методы:
- для подключения к удаленному серверу;
- для авторизации;
- для инициализации и остановки пинг-понг-процесса;
- для запрашивания исторических данных.
Краткий экскурс в протокол TCP
Протокол TCP находится на транспортном уровне OSI. Его задачи:
- доставить пакеты из одного приложения на узле А в другое приложение на узле Б;
- переотправка потерянных пакетов;
- соблюдение исходного порядка пакетов.
Протокол определяет приложения по специальным идентификаторам ― портам. TCP использует клиент-серверный подход. Сервер мониторит подключения к конкретному порту, а клиенты инициируют эти подключения.
Также важно отметить, что протокол TCP производит фрагментацию пересылаемых данных. Это ― одна из основных проблем при реализации, так как при постоянном трафике пакеты прикладного протокола оказываются разбитыми между разными пакетами TCP и пересылаются по частям. При чем в пакете TCP может идти окончание одного пакета прикладного протокола и начало следующего.
Список всех возможных вариантов фрагментации:
- n-ное количество пакетов целиком;
- n-ное количество пакетов целиком и начало следующего пакета;
- концовка пакета и n-ное количество пакетов целиком;
- концовка пакета и начало следующего пакета;
- концовка пакета, n-ное количество пакетов целиком и начало следующего пакета;
- середина пакета.
Главная сложность состоит в том, чтобы «научить» программу корректно переходить от одного состояния («читаю пакет») к другому («пакет прочитал, жду следующий») и сохранять состояние читаемого пакета (буфер).
Решение задачи
Часть 1. Фрагментация
Как я уже упомянул выше, каждый пакет протокола имеет заголовок и тело. В заголовке указывается размер пакета и его тип.
Решая проблему сбора частей пакета из-за фрагментации на нижнем уровне, я пришел к абстракции Reader. Это объект, который позволяет собрать пакет из частей, основываясь на его заданной длине. Reader работает таким образом:
- при инициализации указывается длина пакета и callback-функция, которая вызывается при прочтении указанного количества байтов;
- через метод take принимается набор байтов и дописывается во внутренний буфер столько байтов, сколько требуется для прочтения заданной длины, или все, если набор байтов меньше необходимой длины. Если же нужное количество байтов прочитано ― вызывается приватный метод end, который инициирует вызов callback-функции.
Ниже представлен код класса Reader:
/* reader.ts */ class Reader { private readonly buffer: Buffer; private offset = 0; private ended = false; constructor( private size: number, private onEnd: (buffer: Buffer) => void ) { this.buffer = Buffer.alloc(size); } take(data: Buffer): number { if (this.ended) { return 0; } const bytesRead = data.copy( this.buffer, this.offset, 0, this.size, ); this.offset += bytesRead; if (this.ready) { this.end(); } return bytesRead; } get ready(): boolean { return this.offset === this.size; } get isEnded(): boolean { return this.ended; } end(): void { this.ended = true; this.onEnd(this.buffer); } }
Оказывается, имея такую простую абстракцию, можно без проблем решить проблему фрагментации пакетов. Все, что нам нужно сделать, — это указать количество байтов, которые обработает читатель, и «скармливать» ему данные пакет за пакетом. Но теперь встает вопрос: как узнать, сколько байтов нам необходимо прочитать?
В нашем случае ответ прост — нужно всего лишь прочитать заголовок. Он имеет одинаковый размер для всех типов пакетов, что делает имплементацию протокола еще проще. Я решил это, расширив базовый класс Reader до класса HeaderReader таким образом: передать в конструктор базового класса первым параметром (размером пакета) значение длины заголовка, которое является константой.
/* header-reader.ts */ class HeaderReader extends Reader { constructor(onEnd: (buffer: Buffer) => void) { super(HEADER_SIZE, onEnd); } }
Последний шаг на этом этапе — реализовать абстракцию, которая будет оборачивать протокол транспортного уровня и читать пакеты из потока прикладного протокола. Для этого я создал класс Connection. Он агрегирует класс Transport, который оборачивает протокол TCP для реализации автоматического переподключения, и подписывается на событие data, знаменующее получение нового TCP-сообщения. Также Connection имеет свойство structs, что является объектом класса rxjs.Subject. Это сделано для удобства обработки пакетов прикладного уровня на следующем уровне абстракции.
/* connection.ts */ class Connection extends EventEmitter { private reader: StructReader; structs = new Subject<{ structType: StructType; buffer: Buffer }>(); constructor(private transport: Transport) { super(); this.reader = new HeaderReader(this.headerOnRead.bind(this)); this.transport.on('data', (data) => this.processData(data)); } private headerOnRead(header: Buffer) { const structSize = Connection.headerGetSize(header); this.reader = new StructReader( structSize, this.structOnRead.bind(this, header) ); } private structOnRead(header: Buffer, struct: Buffer) { const structType = Connection.headerGetType(header); const buffer = Buffer.concat([header, struct]); this.structs.next({ structType, buffer }); } private static headerGetSize(header: Buffer): number { return header.readUInt32LE(2) - HEADER_SIZE; } private static headerGetType(header: Buffer): StructType { return header.readUInt16LE(6); } private processData(data: Buffer) { if (!this.reader || this.reader.isEnded) { this.reader = new HeaderReader(this.headerOnRead.bind(this)); } if (this.reader) { const offset = this.reader.take(data); if (data.length > offset) this.processData(data.slice(offset)); } } async connect(): Promise<void> { await this.transport.connect(); } async close(): Promise<void> { await this.transport.close(); } async send(struct: Struct) { const data = struct.toBuffer(); await this.transport.write(data, 'hex'); } }
Часть 2. Сериализация и десериализация
Одна из задач разрабатываемой библиотеки — преобразование объектов языка программирования (то есть обычных объектов JavaScript) в байтовый формат протокола (Buffer). Здесь замечу, что я хотел представлять структуру пакетов в декларативном виде, поэтому получил следующий формат:
/* authorize-response.schema.ts */ const authorizeResponse = new Schema<AuthorizeResponse>({ version: UINT_32, status: UINT_32, rows: UINT_32, config: new ArrayType<AuthorizeConfig>( 'rows', new Schema<AuthorizeConfig>({ delay: UINT_32, securityType: UINT_32, exchangeId: UINT_32, }) ), });
Каждый пакет описывается в виде так называемой схемы — обертки, которая, имея набор полей и тип каждого поля, умеет преобразовывать объекты в набор байтов, и наоборот. Опускаясь на уровень ниже, вы видите, что каждое поле значением имеет какой-то объект типа. Все объекты типов имеют такой интерфейс:
- метод write — для записи значения в буфер;
- метод parse — для чтения значения из буфера;
- метод size — для определения, сколько байтов займет значение в буфере.
/* data-type.interface.ts */ interface DataType<T> { parse(buffer: Buffer, offset: number, instance: unknown): T; write(buffer: Buffer, value: T, offset: number): number; size(value: T): number; }
Ниже описаны примеры реализации скалярных и составных типов данных:
/* int8.type.ts */ class Int8Type implements DataType<number> { size() { return 1; } parse(buffer: Buffer, offset: number): number { return buffer.readInt8(offset); } write(buffer: Buffer, value: number, offset: number): number { return buffer.writeInt8(value, offset); } } const INT_8 = new Int8Type();
/* string.type.ts */ class StringType implements DataType<string> { parse(buffer: Buffer, offset: number, instance: unknown): string { return buffer.slice(offset, buffer.indexOf(DELIMITER, offset)).toString(); } write(buffer: Buffer, value: string, offset: number): number { return buffer.write(value, offset) + DELIMITER_OFFSET; } size(value: string): number { return value.length + DELIMITER_OFFSET; } } const STRING = new StringType();
/* array.type.ts */ class ArrayType<T> implements DataType<T[]> { constructor(private countProp: string, private schema: Schema<T>) {} parse( buffer: Buffer, offset: number, instance: Record<string, unknown> ): T[] { const array: T[] = []; const count = instance[this.countProp]; if (typeof count !== 'number') { throw new Error('`countProp` references to non-number value'); } for (let i = 0; i < count; ++i) { const value = this.schema.parse(buffer, offset); offset += this.schema.size(value); array.push(value); } return array; } write(buffer: Buffer, value: T[], offset: number): number { return value.reduce( (bytesWritten: number, item) => bytesWritten + this.schema.write(buffer, item, offset + bytesWritten), 0 ); } size(value: T[]): number { return value.reduce((acc: number, item) => acc + this.schema.size(item), 0); } }
Зная, как устроены типы, теперь можно перейти к рассмотрению класса Schema — ключевого элемента в решении проблемы сериализации и десериализации. Интерфейс Schema подобен интерфейсу DataType. Но он также имеет метод create для создания объекта класса Buffer из заданного объекта соответствующего типа.
/* schema.ts */ class Schema<T> { constructor(private definition: SchemaDefinition<T>) {} write(buffer: Buffer, instance: T, offset: number = 0) { let totalBytesWritten = 0; for (const key in this.definition) { if (!this.definition.hasOwnProperty(key)) { continue; } const definition = this.definition[key]; const value = instance[key]; definition.write(buffer, value, offset); const bytesWritten = definition.size(value); offset += bytesWritten; totalBytesWritten += bytesWritten; } return totalBytesWritten; } parse(buffer: Buffer, offset: number = 0): T { const instance: T = Object.create(null); for (const key in this.definition) { if (!this.definition.hasOwnProperty(key)) { continue; } const definition = this.definition[key]; const value = definition.parse(buffer, offset, instance); offset += definition.size(value); Object.defineProperty(instance, key, { value, enumerable: true, }); } return instance; } size(instance: T): number { let size = 0; for (const key in instance) { const definition = this.definition[key]; const value = instance[key]; size += definition.size(value); } return size; } create(instance: T): Buffer { const size = this.size(instance); const buffer = Buffer.alloc(size); this.write(buffer, instance); return buffer; } }
Завершающим «штрихом» композиции является класс Struct, который призван обернуть сериализованные объекты сообщений в пакеты протокола. То есть просто добавить заголовок с соответствующими параметрами размера и типа пакета:
/* struct.ts */ class Struct { constructor(private type: StructType, private payload: Buffer) {} getType() { return this.type; } toBuffer(): Buffer { const size = HEADER_SIZE + this.payload.length; const buffer = Buffer.alloc(size); buffer.writeUInt16LE(0xab); buffer.writeUInt32LE(size, 2); buffer.writeUInt16LE(this.type, 6); buffer.writeUInt16LE(0xba, 8); this.payload.copy(buffer, 10); return buffer; } }
Пример описания ping-пакета:
/* echo.struct.ts */ class EchoStruct extends Struct { constructor(instance: EchoMessage) { super(StructType.STRUCT_ECHO, echoMessage.create(instance)); } }
Часть 3. Клиент
Итак, мы решили проблему фрагментации пакетов, сериализации и десериализации. Теперь нужно собрать все в единый механизм. Для этого я реализовал класс Client — высокоуровневый интерфейс, позволяющий использовать прикладной протокол. Основные функции этого класса:
- чтение и преобразование пакетов;
- отправка сообщений;
- получение ответов на сообщения.
В конструкторе класса Client происходит инициализация объектов Transport и Connection, а также подписка на connection.structs.
/* client.ts#constructor + handleRawStruct */ class Client extends EventEmitter { private readonly transport: Transport; private readonly connection: Connection; private readonly structBus = new StructBus(); private requestId = 1; private heartbeatInterval: NodeJS.Timeout | null = null; constructor(tcpOptions: SocketConnectOpts) { super(); this.transport = new Transport(tcpOptions); this.connection = new Connection(this.transport); this.connection.structs.subscribe(this.handleRawStruct.bind(this)); } private handleRawStruct({ structType, buffer, }: { structType: StructType; buffer: Buffer; }) { const struct = buffer.slice(HEADER_SIZE); this.structBus.push(structType, struct); } }
Обработчик поступающих пакетов передает их во вспомогательный класс StructBus, который по типу пакета выбирает, какую схему использовать для парсинга, и отправляет его в нужный rxjs.Subject, транслирующий только пакеты этого типа.
/* struct-bus.ts */ class StructBus { echoMessages = new Subject<EchoMessage>(); requestErrors = new Subject<RequestError>(); authorizeResponses = new Subject<AuthorizeResponse>(); historyResponses = new Subject<HistoryResponse>(); push(structType: StructType, buffer: Buffer) { switch (structType) { case StructType.STRUCT_ECHO: this.echoMessages.next(echoMessage.parse(buffer)); break; case StructType.STRUCT_REQUEST_ERROR: this.requestErrors.next(requestError.parse(buffer)); break; case StructType.STRUCT_AUTHORIZE_RESPONSE: this.authorizeResponses.next(authorizeResponse.parse(buffer)); break; case StructType.STRUCT_HISTORY_RESPONSE: this.historyResponses.next(historyResponse.parse(buffer)); break; } } }
StructBus позволяет реализовать весь оставшийся функционал клиента.
/* client.ts#echo */ async echo(): Promise<EchoMessage> { await this.connection.send(new EchoStruct({ time: 0 })); return this.structBus.echoMessages.pipe(first()).toPromise(); }
/* client.ts#auth */ async auth(credentials: AuthorizeRequest): Promise<AuthorizeResponse> { await this.connection.send(new AuthStruct(credentials)); const response = await this.structBus.authorizeResponses .pipe(first()) .toPromise(); if (response.status === AuthorizeStatus.REJECT) { throw new Error('Invalid credentials'); } else { return response; } }
/* client.ts#fetchResponse + getHistory*/ private fetchResponse<T extends { id: number }>( requestId: number, subject: Subject<T> ) { return race( subject.pipe(first((response) => response.id === requestId)), this.structBus.requestErrors.pipe( first((response) => response.requestId === requestId) ) ).toPromise(); } async getHistory(args: GetHistoryArgs) { const requestId = this.getNewRequestId(); await this.connection.send( new HistoryRequestStruct({ ...args, id: requestId }) ); return this.fetchResponse(requestId, this.structBus.historyResponses); }
Свой опыт реализации прикладного протокола на базе TCP я попытался передать максимально сжато, но в то же время цельно. Репозиторий с полным примером вы можете найти на Гитхабе. Также я открыт к обсуждениям и с радостью отвечу на ваши вопросы.
19 коментарів
Додати коментар Підписатись на коментаріВідписатись від коментарів