Тестування навантаження з MQTT

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті.

Привіт, друзі. Нещодавно дали мені цікаву задачу — провести тестування навантаження системи, що використовує протокол MQTT. Я навіть написав невеликий пост в блозі про це.

Задача на перший погляд проста — згенерувати навантаження на брокер близько 1000 повідомлень в секунду і промоніторити, що сервіс встигне всі повідомлення вчасно зчитати.

Єдина складність — я раніше ніколи не працював з MQTT. Більше того — ніколи навіть не чув (чи забув, це вже не так важливо). Тому роботу я почав з дослідження, що ж таке MQTT? Тим паче, що вимагався не просто тест, а обов’язково з умовою QoS = 2.

На щастя, навіть вікіпедія дає необхідний мінімум інформації. Це простий протокол для обміну повідомленнями між пристроями по принципу публікації/підписки, найчастіже через TCP/IP. QoS (Quality of Service) — рівні гарантованості доставки повідомлення:
0 — максимум один клієнт відправляє 1 повідомлення і не хвилюється, чи брокер його отримав. Не гарантує доставку, але забезпечує високу швидкість роботи
1 — мінімум один. клієнт відправляє одне й теж повідомлення до тих пір, поки не отримає підтвердження, що брокер його отримав
2 — тільки один. клієнт та брокер гарантують, що лише одне повідомлення буде надіслано і отримано. Найменша швидкість роботи, але гарантований результат.

З протоколом розібрались, наступне питання — чим створити навантаження? Трохи пошукавши, знайшов плагін для JMeter і написав пробний тест: клієнт підключається, публікує повідомлення і віключається. І наче все працює, але швидкість роботи набагато нижча за очікувану, приблизно 200 повідомлень в секунду. Тут є 2 шляхи — робити розподілене тестування чи оптимізувати тест. Оптимізувати здається простіше. Додав в тест Loop Controller і помістив в нього публікацію повідомлень. Вийшло так, що кожен клієнт підключається та в замкнутому циклі публікує повідомлення, поки тест не зупиниш. В такій конфігурації досяг необхідної швидкості і навіть рекордної в 1200 повідомлень за секунду.

І тепер настала черга перевірити, як же тестований сервіс зправляється з таким навантаженням? Очікувано для мене, і не очікуванно для команди розробки — погано. Виправили баги, провели оптимізацію, провели ще кілька тестів, і я все одно помічаю, що в метриках JMeter пише більше повідомлень, ніж отримує брокер, чого не має бути, в нас QoS=2!

Може хтось стикався з подібною проблемою?

І поки розробники продовжують оптимізовувати сервіс, виконую свій план Б. Знайшов реалізацію MQTT клієнта на python та пишу скрипт, що зможе «спамити» велику кількість повідомлень, використовуючи бібліотеку asyncio.

Додам навіть витяг з коду — впевнений, не самий гарний, але зі своєю задачею зправляється чудово.

import asyncio
import sys
import logging
import atexit
import json
import paho.mqtt.client as mqtt
import datetime as dt

logger = logging.getLogger()
logger.setLevel(logging.INFO)

MESSAGE_COUNT = [0]
start_time = dt.datetime.now()

# read configs from file
QOS = 2
MILESTONE = 500
TOPIC, PAYLOAD, HOST, PORT, CREDENTIALS, DELAY, USERS = 'aaa', 'bbb', 'ccc', 1883, ('user', 'password'), 1/10, 10
# ---

@atexit.register
def exit_handler():
    print('\n\n\n================')
    print(f'total messages sent = {MESSAGE_COUNT[0]}')
    print(f'total test time = {(dt.datetime.now() - start_time).seconds} sec')


async def publisher(client_id: int):
    logging.info(f'publisher {client_id} started')
    client = mqtt.Client()
    client.username_pw_set(*CREDENTIALS)
    client.connect(host=HOST, port=PORT)
    client.loop_start()
    logging.info(f'connection status id {client_id} - {client.is_connected()}')

    while True:
        MESSAGE_COUNT[0] += 1
        result = client.publish(topic=TOPIC, payload=PAYLOAD, qos=QOS)
        logging.debug(f'pub result - {result.rc}')
        if MESSAGE_COUNT[0] % MILESTONE == 0:
            delta = dt.datetime.now() - start_time
            logging.warning(f'{MESSAGE_COUNT[0]} messages sent. Rate = {MESSAGE_COUNT[0] / delta.seconds}')

        if MESSAGE_COUNT[0] == 100 * 1000 * 1000:
            exit()
        await asyncio.sleep(DELAY)


async def main():
    logging.info(f'test started at {start_time.strftime("%H:%M:%S")}')

    tasks = list()
    for i in range(USERS):
        tasks.append(publisher(i))
    await asyncio.gather(*tasks)


asyncio.run(main())

Власне, все просто — читаю конфіг з json файлу, створюю метод publisher, що містить MQTT клієнт, потім у функції main збираю необхідну кількість клієнтів та запускаю тест. Скрипт публікує поточне навантаження та при завершенні по Ctrl+C ще й пише статистику завдяки модулю atexit.

Для перевірки, що дійсно використувується QoS=2, написав ще один скрипт, що підписується на топік та читає всі повідомлення. Дуже зручно, що при кожному запуску тесту, і JMeter, і мій скрипт генерують для всіх відправлених повідомлень message id, що завжди починається з 1, тому додав перевірку на id і диво, в обох випадках все ідеально зходиться (хоч JMeter і продовжує в звіті писати, що надіслав більше повідомлень). Витяг з коду:

def message_handler(client, userdata, message: MQTTMessage):
    MESSAGE_COUNT[0] += 1
    logging.debug(
        f'message id = {message.mid} message #{MESSAGE_COUNT[0]:05} qos={message.qos}, payload={message.payload}')
    if message.mid != MESSAGE_COUNT[0]:
        logging.error(f'message id = {message.mid} message #{MESSAGE_COUNT[0]:05}')
        exit()

client = mqtt.Client()
client.username_pw_set('server', 'Pa$$w0rd')
client.on_message = message_handler
client.connect(host='172.27.172.38', port=1883)
client.subscribe(topic=TOPIC, qos=QOS)
client.loop_start()
logging.info(f'connection status - {client.is_connected()}')

Такі справи. Чи тестували ви коли небудь додатки з MQTT? Поділіться своїм досвідом.

👍НравитсяПонравилось4
В избранноеВ избранном5
LinkedIn
Допустимые теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Допустимые теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

А зачем писать если можно взять готовый бенч? Типа github.com/emqx/emqtt-bench

Не побачив
max_inflight_messages_set
max_queued_messages_set
Ними не користувались?

На скількох message per sec все впало?

ні, такого не бачив. Дякую, почитав доку — корисно!

На скількох message per sec все впало?

в перших ітераціях тестовий сервіс не встигав обробляти й пів тисячі за секунду. Який максимум може видати скрипт на пайтон, я не перевіряв ¯\_(ツ)_/¯

Не зовсім ясно, що тестилося?
Клієнт, брокер?

MQTT тре тестувати або на С або Rust
а не python,

я так догадуюся, тестилася пропускна здатність удава з MQTT?

І власне, є кілька брокерів mosquitto, Kafka, etc.
І ще більше клієнтів: mosquitto, paho, mqttс etc,

досяг необхідної швидкості і навіть рекордної в 1200 повідомлень за секунду.

1000 повідомлень в секунду, це десь для Распбері 0.
Для ПК може бути і 100К в секунду і більше.

Загалом вигляда тестування сферичного коня у вакуумі з пудовими гирями на копитах.

Не зовсім ясно, що тестилося?
Клієнт, брокер?

гг, під кожною постом можна знайти людину, що не читала, але засуджує. Вгорі навіть схема є, що тестується ;)

MQTT тре тестувати або на С або Rust
а не python,
MQTT тре тестувати або на С або Rust
а не python,

.. користуватись лінукс, а не віндовс, їсти полуничне морозиво, а не шоколадне. Звідки така категоричність? Інструменти обираються згідно задач і часу, що на них виділено

я так догадуюся, тестилася пропускна здатність удава з MQTT?

ніт. Знову ж таки, читайте уважно

І власне, є кілька брокерів mosquitto, Kafka, etc.

і що? до чого цей комент? ми використовуємо mosquitto, але для посту це взагалі не важливо

І ще більше клієнтів: mosquitto, paho, mqttс etc,

і що? я знаю кунгфу, карате та багато інших страшних слів

1000 повідомлень в секунду, це десь для Распбері 0.
Для ПК може бути і 100К в секунду і більше.

замовник каже — треба, щоб сервіс читав з брокера 1000 в секунду. А ми йому — ні! це рівень расбері, ми проведемо тест на 100К! Ну і що, що вам не треба. І тести на С напишемо. або Rust, бо інакше це тестування «сферичного коня у вакуумі з пудовими гирями на копитах»

я написав що ти можеш досягнути на різному типу HW, так сказати практична межа.

Чи тестували ви коли небудь додатки з MQTT? Поділіться своїм досвідом.

навіщо питати, якщо потім ти посилаєш з тим досвідом в жопу?

Ладно, не буду заважати тестувати чи Пайтон може обслужити 1К запитів чи ні.
Удачі.

Ахаха. У мене є досвід, але я вам нічого конкретного не скажу :р. Друже, не ображайся, але ти не написав нічого конкретного:
* Чим тест на С кращий за пайтон чи жметер? В скільки разів? В яких умовах?
* Ти перелічив клієнти і брокери, але не вказав, наскільки вони ефективні, в яких умовах та сценаріях. Може маєш десь посилання на порівняльний тест?

Маєш конкретний досвід, метрики, посилання — будь-ласка пиши, а як ні — ображайся на здоров’я ;)

я тобі дав дві метрики, тобі не сподобалися, про що далі мова?

мда, ты странный
тебе сказали, что питон может не успевать выгребать сообщения. на сколько сообщений у тебя буфер на сервере, как быстро он переполняется
надеюсь для тебя не есть открытие, что приложение на С на порядок, а то и больше может быть быстрее питона?

а я десь писав, на чому працює сервер?
Чи що мені потрібно було саме вигрібати повідомлення для тесту, а не публікувати?
Так, програма на С буде працювати швидше за аналогічну на будь-якій інтерпретованій мові (на стековерфлоу навіть є тред з вимірами де приводять цифри в100-400 разів), ал повторюсь:

Інструменти обираються згідно задач і часу, що на них виділено

Якщо пайтон чудово зправляється з генерацією 1000 повідомлень, чи треба обов’язково і категорично використовувати С? Як у вас прості робочі задачі виріуюються?
— «Влад, напиши будь-ласка шел скрипт для копіювання логів з однієї директорії в іншу»
— «НІ! напишу програму на С, бо вона буде швидше працювати. На порядок, чи навіть більше»

нащо взагалі напридумували стільки мов і готові інструменти на кшталт JMeter, якщо якщо приходять експерти й кажуть, то все фігня, а от на С працює швидше

— «Влад, напиши будь-ласка шел скрипт для копіювання логів з однієї директорії в іншу»
— «НІ! напишу програму на С, бо вона буде швидше працювати. На порядок, чи навіть більше»

— алексей, напиши нагрузочный тест для эмкутити
— хорошо, сейчас только возьму язык потормознее, чтобы хорошо натрахаться!

— Олексій, поміріяй відстань між центрами отворів для установки унітазу і накерни.
— Зараз, тільки знайду кравецький сантиметр.

Ми розробили таку штуку для внутнішнього використання і заопенсорсили її: github.com/...​atter-mqtt-load-simulator
Якщо треба купу клієнтів підняти — є скрипти для Google Cloud, які піднімають багато машин і створюють дашборд у Grafana, щоб відслідковувати кількість клієнтів і повідомлень.

Сам не тестував, але як варіант ще можна спробувати інструмент для тестування навантаження — k6, він нативно MQTT ще не підтримує, але є робочий екстеншн — github.com/pmalhaire/xk6-mqtt

Подписаться на комментарии