Брокеры сообщений. RabbitMQ

Дмитрий Шурмакин — ML Engineer

Брокеры сообщений

Разработка сервиса на базе ML моделей с использованием очередей является одним из решений для создания масштабируемых и эффективных систем.

Описание проблемы

Представим ситуацию, когда у нас есть веб-сервис, разработанный с использованием FastAPI, который предоставляет API для обработки изображений. Пользователи могут загружать изображения на сервер, и наш сервис должен асинхронно обрабатывать их, например, проводить анализ изображений с использованием моделей машинного обучения и возвращать результаты пользователю.

И вот проблема: когда много людей одновременно запрашивают данные, ваш веб-сайт начинает тормозить. Это происходит потому, что он не успевает вовремя обрабатывать запросы и они копятся в очередь на обработку, тратя ресурсы (оперативную память). Соответственно возрастает время обработки запросов и пользовательские клиенты (например браузер) долго висит пока загружает страницу. При увеличении нагрузки на ваш сервис, время ответа начинает возрастать, так как запросы требуют выполнения сложных и время-затратных операций.

Вот где на помощь приходят брокеры сообщений. Таким образом, когда кто-то отправляет запрос, ваш сервис просто добавляет его в очередь задач. Затем backend worker (например Celery) берет эти задачи из очереди и обрабатывает их в фоновом режиме, не ограничивая ваш сервис, выполнением только текущей задачи.

Немного о теории

Брокер сообщений - это промежуточное программное обеспечение, которое обеспечивает передачу сообщений между различными компонентами приложения или системы. Он играет роль посредника между отправителями и получателями сообщений, обеспечивая их доставку и обработку в соответствии с определенными правилами и механизмами.

Основные функции брокера сообщений включают в себя:

  1. Получение сообщений: Брокер сообщений принимает сообщения от отправителей и обрабатывает их для последующей передачи получателям.
  2. Хранение сообщений: Он временно хранит сообщения до момента их доставки получателям. Это позволяет обеспечить надежность доставки сообщений и обработку их в нужном порядке. В Apache Kafka сообщения хранятся в журнале, где данные хранятся до тех пор, пока не истечет указанный период, сообщения не удаляются после получения, их можно перечитывать. В RabbitMQ сообщение хранится до тех пор, пока подписчик не получит его из очереди, как только он отмечает, что сообщение получено, оно удаляется.
  3. Маршрутизация сообщений: Брокер сообщений определяет, какие сообщения должны быть отправлены на какие адреса (или очереди, разделы) на основе определенных правил и условий.
  4. Доставка сообщений: Он передает сообщения от отправителей к получателям, обеспечивая их доставку в соответствии с требованиями приложения и выбранными гарантиями доставки (например, “At most once”, “At least once”, “Exactly once”).
  5. Управление очередями: Брокер сообщений обеспечивает управление очередями сообщений, регулируя их размеры, жизненный цикл и другие параметры, а также обеспечивая механизмы для управления доступом и приоритетами.
  6. Обеспечение надежности: Брокер сообщений гарантирует доставку сообщений даже при сбоях в сети или приложении. Однако есть нюансы. RabbitMQ менее надежен, чем Kafka. Если сообщение подтверждено в RabbitMQ, но потребитель не успел его обработать и отказал, то сообщение теряется. В Kafka в такой ситуации можно восстановить данные, сдвинув офсет.


Примеры популярных брокеров сообщений включают в себя RabbitMQ, *Apache Kafka, Redis и другие.

Основные виды общения и паттерны обмена сообщениями

Паттерн обмена сообщениями (Message Exchange Pattern):

  • Point-to-Point (P2P): В этом паттерне сообщения отправляются от одного отправителя (издателя) к одному или нескольким получателям (потребителям), но каждое сообщение обрабатывается только одним из получателей.
  • Publish-Subscribe (Pub/Sub): В этом паттерне отправители (издатели) публикуют сообщения, а получатели (подписчики) подписываются на определенные темы. Все подписчики, подписанные на определенную тему, получат копию каждого сообщения, отправленного на эту тему.

Типы общения:

  • Direct: В этом типе общения сообщения отправляются напрямую на определенную очередь или раздел (topic) и получаются в том порядке, в котором они были отправлены.
  • Fanout: Все сообщения, отправленные в раздел (topic), будут доставлены каждому подписчику, независимо от того, на какую тему они подписаны.
  • Topic: В этом типе общения сообщения отправляются на раздел (topic), и они будут доставлены только тем подписчикам, которые подписаны на соответствующую тему или тег.

Коммуникационные модели:

  • Push-based: В этом модели брокер сообщений активно отправляет сообщения потребителям, которые подписались на определенные разделы или очереди.
  • Pull-based: В этой модели потребители запрашивают сообщения у брокера сообщений, когда они готовы к их обработке.

Гарантии доставки:

  • At most once: Сообщения могут быть доставлены один раз, могут быть потеряны, но не будут доставлены несколько раз.
  • At least once: Сообщения гарантированно будут доставлены по крайней мере один раз, но могут быть доставлены несколько раз в результате повторной доставки.
  • Exactly once: Сообщения гарантированно будут доставлены ровно один раз, и дубликаты будут предотвращены.

Apache Kafka и RabbitMQ

– это два разных инструмента для обмена сообщениями, каждый из которых подходит для определенных задач.

Apache Kafka предназначен для работы с огромными объемами данных, когда десятки тысяч и больше сообщений обрабатываются каждую секунду. Это система, которая легко масштабируется и обеспечивает высокую отказоустойчивость, что особенно важно для крупных проектов.

С другой стороны, RabbitMQ проще в установке и настройке. Он хорошо справляется с асинхронным обменом данными в микросервисной архитектуре. RabbitMQ не требует дополнительных компонентов и больших ресурсов, так как сообщения удаляются из очереди после их чтения. Кроме того, он предлагает большие возможности для настройки маршрутизации сообщений. RabbitMQ - это хороший выбор, если вы не имеете особых требований к отказоустойчивости и пропускной способности.

Важно понимать, что нет абсолютно лучшего инструмента - выбор зависит от конкретных потребностей проекта.

RabbitMQ

– это мощный и гибкий брокер сообщений, который широко используется в распределенных системах для обмена данными между различными компонентами приложений. Он реализует стандартный протокол AMQP (Advanced Message Queuing Protocol), что делает его совместимым с широким спектром языков программирования и платформ. Давайте более подробно рассмотрим основные объекты и понятия в RabbitMQ:

Основные объекты и понятия RabbitMQ:

  • Виртуальный хост (Virtual Host)
  • Канал (Channel)
  • Exchange
  • Очередь (Queue)
  • Маршрутизация
  • Сообщение (Message)
  • Издатель (Publisher) и Потребитель (Consumer)

Виртуальный хост (Virtual Host)

  • Виртуальный хост - это логическое разделение брокера RabbitMQ, которое позволяет изолировать ресурсы, доступные приложениям.
  • Каждый виртуальный хост имеет свое пространство имен для обменов, очередей, пользователей и разрешений. Это некая аналогия эндпоинта по которому клиенты обращаются, к брокеру.

Канал (Channel)

  • Канал - это логическое соединение между клиентом и брокером RabbitMQ.
  • Он используется для отправки и получения сообщений, управления очередями и других операций.

Exchange

  • Обмен - это объект, который принимает сообщения от издателей и направляет их в соответствующие очереди.
  • Существует четыре основных типа обменов, каждый из которых определяет различные правила маршрутизации сообщений. Вот их краткое описание:
  1. Direct Exchange:
    • Прямой обмен направляет сообщение в очередь с определенным маршрутом (routing key).
    • Маршрутизация происходит на основе точного соответствия между routing key сообщения и routing key очереди.
  2. Fanout Exchange :
    • Обмен направляет копию каждого сообщения во все связанные с ним очереди.
    • Этот тип обмена игнорирует routing key сообщений.
  3. Topic Exchange:
    • Обмен по теме позволяет отправлять сообщения в очереди на основе шаблонов в форме “topic”.
    • Routing key сообщения представляется в виде строки, содержащей несколько слов, разделенных точками.
    • Очереди связываются с обменом на основе шаблонов, и сообщение отправляется во все очереди, для которых routing key соответствует шаблону.
  4. Headers Exchange:
    • Обмен по заголовкам позволяет маршрутизировать сообщения на основе их заголовков.
    • Маршрутизация происходит на основе сопоставления заголовков сообщения с заданными критериями в правилах связывания очередей с обменом.

import pika

# Подключение к брокеру RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Создание обмена с именем 'logs' и типом 'fanout'
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# Создание временной очереди и связывание ее с обменом
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)

Создается временная очередь без имени (параметр queue='') с флагом exclusive=True, что делает ее приватной для соединения и удаляет ее, когда соединение закрывается. Эта очередь затем связывается с обменником ‘logs’

Очередь (Queue)

  • Очередь - это базовый элемент RabbitMQ, который хранит сообщения, ожидающие обработки.
  • Сообщения помещаются в очередь издателями и отправляются потребителям.
  • Очередь имеет название и может быть долговременной (persistent) или временной (temporary).
import pika

# Подключение к брокеру RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Создание очереди с именем 'hello'
channel.queue_declare(queue='hello')
print("Очередь 'hello' успешно создана")

Маршрутизация (Routing)

  • Маршрутизация - процесс направления сообщений от обменов к очередям в соответствии с правилами, определенными для каждого обмена.
  • В зависимости от типа обмена, маршрутизация может осуществляться по ключу маршрутизации, заголовкам сообщения или другим критериям.

# Пример маршрутизации сообщений в RabbitMQ с помощью библиотеки pika
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.basic_publish(exchange='my_exchange', routing_key='my_key', body='Hello, RabbitMQ!')

Этот пример демонстрирует базовую отправку сообщения в RabbitMQ.

В параметрах метода basic_publish указаны:

  • exchange='my_exchange': Имя обменника, через который отправляется сообщение.
  • routing_key='my_key': Маршрутизационный ключ, который используется для определения, какое сообщение отправляется в какую очередь.
  • body='Hello, RabbitMQ!': Тело сообщения.

Сообщение (Message)

– это данные, которые передаются между издателями и потребителями через RabbitMQ. Оно состоит из заголовка и тела, где заголовок содержит метаданные, а тело - собственно данные.

Концепции, связанные с обработкой сообщений в системе

  1. Durable (Сохранение): Гарантирует сохранение состояния очереди при перезагрузке сервера. Для этого можно использовать различные методы, такие как сохранение на диск или использование хранилища вроде Redis.
  2. TTL & expiration (Время жизни): Позволяет задать время жизни для сообщений в очереди, после которого они будут удалены. Это полезно для автоматического удаления устаревших сообщений.
  3. ACK (Подтверждение): Механизм подтверждения получения сообщения. Существуют два типа сообщений: те, которые следует удалить сразу после получения (ACK), и те, которые следует удерживать до получения подтверждения обработки.
  4. Dead lettering (Обработка ошибок): Позволяет обрабатывать сообщения, которые не удалось обработать с первого раза, например, из-за ошибок или недоступности внешних сервисов. Такие сообщения возвращаются в очередь с возможностью повторной обработки через определенное время.

Издатель (Publisher) и Потребитель (Consumer)

Издатель (Publisher) - это приложение, которое отправляет сообщения в обмены RabbitMQ.

Потребитель (Consumer) - это приложение, которое принимает сообщения из очередей RabbitMQ для последующей обработки.

Отправка сообщения (Publish) в обмен:

import pika

# Подключение к брокеру RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Публикация сообщения в обмен 'logs'
channel.basic_publish(exchange='logs', routing_key='', body='Hello World!')
print("Сообщение успешно опубликовано в обмен 'logs'")

Этот код создает соединение с брокером RabbitMQ и публикует сообщение “Hello World!” в Exchange с именем ‘logs’.

Получение сообщения (Consume) из очереди:

import pika

# Callback-функция, которая будет вызвана при получении сообщения
def callback(ch, method, properties, body):
    print("Получено сообщение:", body)

# Подключение к брокеру RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Создание очереди 'hello', если ее нет
channel.queue_declare(queue='hello')

# Указание, что при получении сообщения вызывается функция callback
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print('Ожидание сообщений. Для выхода нажмите CTRL+C')
channel.start_consuming()

Этот код подключается к брокеру RabbitMQ, создает очередь с именем ‘hello’ (если ее нет) и ожидает сообщения. Когда сообщение поступает в очередь ‘hello’, вызывается функция callback, которая просто выводит сообщение в консоль.