Тестування навантаження з MQTT
Привіт, друзі. Нещодавно дали мені цікаву задачу — провести тестування навантаження системи, що використовує протокол MQTT. Я навіть написав невеликий пост в блозі про це.
Задача на перший погляд проста — згенерувати навантаження на брокер близько 1000 повідомлень в секунду і промоніторити, що сервіс встигне всі повідомлення вчасно зчитати.
Єдина складність — я раніше ніколи не працював з MQTT. Більше того — ніколи навіть не чув (чи забув, це вже не так важливо). Тому роботу я почав з дослідження, що ж таке MQTT? Тим паче, що вимагався не просто тест, а обов’язково з умовою QoS = 2.
На щастя, навіть вікіпедія дає необхідний мінімум інформації. Це простий протокол для обміну повідомленнями між пристроями по принципу публікації/підписки, найчастіже через TCP/IP. QoS (Quality of Service) — рівні гарантованості доставки повідомлення:
0 — максимум один клієнт відправляє 1 повідомлення і не хвилюється, чи брокер його отримав. Не гарантує доставку, але забезпечує високу швидкість роботи
1 — мінімум один. клієнт відправляє одне й теж повідомлення до тих пір, поки не отримає підтвердження, що брокер його отримав
2 — тільки один. клієнт та брокер гарантують, що лише одне повідомлення буде надіслано і отримано. Найменша швидкість роботи, але гарантований результат.
З протоколом розібрались, наступне питання — чим створити навантаження? Трохи пошукавши, знайшов плагін для JMeter і написав пробний тест: клієнт підключається, публікує повідомлення і віключається. І наче все працює, але швидкість роботи набагато нижча за очікувану, приблизно 200 повідомлень в секунду. Тут є 2 шляхи — робити розподілене тестування чи оптимізувати тест. Оптимізувати здається простіше. Додав в тест Loop Controller і помістив в нього публікацію повідомлень. Вийшло так, що кожен клієнт підключається та в замкнутому циклі публікує повідомлення, поки тест не зупиниш. В такій конфігурації досяг необхідної швидкості і навіть рекордної в 1200 повідомлень за секунду.
І тепер настала черга перевірити, як же тестований сервіс зправляється з таким навантаженням? Очікувано для мене, і не очікуванно для команди розробки — погано. Виправили баги, провели оптимізацію, провели ще кілька тестів, і я все одно помічаю, що в метриках JMeter пише більше повідомлень, ніж отримує брокер, чого не має бути, в нас QoS=2!
Може хтось стикався з подібною проблемою?
І поки розробники продовжують оптимізовувати сервіс, виконую свій план Б. Знайшов реалізацію MQTT клієнта на python та пишу скрипт, що зможе «спамити» велику кількість повідомлень, використовуючи бібліотеку asyncio.
Додам навіть витяг з коду — впевнений, не самий гарний, але зі своєю задачею зправляється чудово.
import asyncio import sys import logging import atexit import json import paho.mqtt.client as mqtt import datetime as dt logger = logging.getLogger() logger.setLevel(logging.INFO) MESSAGE_COUNT = [0] start_time = dt.datetime.now() # read configs from file QOS = 2 MILESTONE = 500 TOPIC, PAYLOAD, HOST, PORT, CREDENTIALS, DELAY, USERS = 'aaa', 'bbb', 'ccc', 1883, ('user', 'password'), 1/10, 10 # --- @atexit.register def exit_handler(): print('\n\n\n================') print(f'total messages sent = {MESSAGE_COUNT[0]}') print(f'total test time = {(dt.datetime.now() - start_time).seconds} sec') async def publisher(client_id: int): logging.info(f'publisher {client_id} started') client = mqtt.Client() client.username_pw_set(*CREDENTIALS) client.connect(host=HOST, port=PORT) client.loop_start() logging.info(f'connection status id {client_id} - {client.is_connected()}') while True: MESSAGE_COUNT[0] += 1 result = client.publish(topic=TOPIC, payload=PAYLOAD, qos=QOS) logging.debug(f'pub result - {result.rc}') if MESSAGE_COUNT[0] % MILESTONE == 0: delta = dt.datetime.now() - start_time logging.warning(f'{MESSAGE_COUNT[0]} messages sent. Rate = {MESSAGE_COUNT[0] / delta.seconds}') if MESSAGE_COUNT[0] == 100 * 1000 * 1000: exit() await asyncio.sleep(DELAY) async def main(): logging.info(f'test started at {start_time.strftime("%H:%M:%S")}') tasks = list() for i in range(USERS): tasks.append(publisher(i)) await asyncio.gather(*tasks) asyncio.run(main())
Власне, все просто — читаю конфіг з json файлу, створюю метод publisher, що містить MQTT клієнт, потім у функції main збираю необхідну кількість клієнтів та запускаю тест. Скрипт публікує поточне навантаження та при завершенні по Ctrl+C ще й пише статистику завдяки модулю atexit.
Для перевірки, що дійсно використувується QoS=2, написав ще один скрипт, що підписується на топік та читає всі повідомлення. Дуже зручно, що при кожному запуску тесту, і JMeter, і мій скрипт генерують для всіх відправлених повідомлень message id, що завжди починається з 1, тому додав перевірку на id і диво, в обох випадках все ідеально зходиться (хоч JMeter і продовжує в звіті писати, що надіслав більше повідомлень). Витяг з коду:
def message_handler(client, userdata, message: MQTTMessage): MESSAGE_COUNT[0] += 1 logging.debug( f'message id = {message.mid} message #{MESSAGE_COUNT[0]:05} qos={message.qos}, payload={message.payload}') if message.mid != MESSAGE_COUNT[0]: logging.error(f'message id = {message.mid} message #{MESSAGE_COUNT[0]:05}') exit() client = mqtt.Client() client.username_pw_set('server', 'Pa$$w0rd') client.on_message = message_handler client.connect(host='172.27.172.38', port=1883) client.subscribe(topic=TOPIC, qos=QOS) client.loop_start() logging.info(f'connection status - {client.is_connected()}')
Такі справи. Чи тестували ви коли небудь додатки з MQTT? Поділіться своїм досвідом.
17 коментарів
Додати коментар Підписатись на коментаріВідписатись від коментарів