Разработка сервиса на базе ML моделей с использованием очередей является одним из решений для создания масштабируемых и эффективных систем.
Описание проблемы
Представим ситуацию, когда у нас есть веб-сервис, разработанный с использованием FastAPI, который предоставляет API для обработки изображений. Пользователи могут загружать изображения на сервер, и наш сервис должен асинхронно обрабатывать их, например, проводить анализ изображений с использованием моделей машинного обучения и возвращать результаты пользователю.
И вот проблема: когда много людей одновременно запрашивают данные, ваш веб-сайт начинает тормозить. Это происходит потому, что он не успевает вовремя обрабатывать запросы и они копятся в очередь на обработку, тратя ресурсы (оперативную память). Соответственно возрастает время обработки запросов и пользовательские клиенты (например браузер) долго висит пока загружает страницу. При увеличении нагрузки на ваш сервис, время ответа начинает возрастать, так как запросы требуют выполнения сложных и время-затратных операций.
Вот где на помощь приходят брокеры сообщений. Таким образом, когда кто-то отправляет запрос, ваш сервис просто добавляет его в очередь задач. Затем backend worker (например Celery) берет эти задачи из очереди и обрабатывает их в фоновом режиме, не ограничивая ваш сервис, выполнением только текущей задачи.
Брокер сообщений - это промежуточное программное обеспечение, которое обеспечивает передачу сообщений между различными компонентами приложения или системы. Он играет роль посредника между отправителями и получателями сообщений, обеспечивая их доставку и обработку в соответствии с определенными правилами и механизмами.
Основные функции брокера сообщений включают в себя:
Примеры популярных брокеров сообщений включают в себя RabbitMQ, *Apache Kafka, Redis и другие.
Паттерн обмена сообщениями (Message Exchange Pattern):
Типы общения:
Коммуникационные модели:
Гарантии доставки:
– это два разных инструмента для обмена сообщениями, каждый из которых подходит для определенных задач.
Apache Kafka предназначен для работы с огромными объемами данных, когда десятки тысяч и больше сообщений обрабатываются каждую секунду. Это система, которая легко масштабируется и обеспечивает высокую отказоустойчивость, что особенно важно для крупных проектов.
С другой стороны, RabbitMQ проще в установке и настройке. Он хорошо справляется с асинхронным обменом данными в микросервисной архитектуре. RabbitMQ не требует дополнительных компонентов и больших ресурсов, так как сообщения удаляются из очереди после их чтения. Кроме того, он предлагает большие возможности для настройки маршрутизации сообщений. RabbitMQ - это хороший выбор, если вы не имеете особых требований к отказоустойчивости и пропускной способности.
Важно понимать, что нет абсолютно лучшего инструмента - выбор зависит от конкретных потребностей проекта.
– это мощный и гибкий брокер сообщений, который широко используется в распределенных системах для обмена данными между различными компонентами приложений. Он реализует стандартный протокол AMQP (Advanced Message Queuing Protocol), что делает его совместимым с широким спектром языков программирования и платформ. Давайте более подробно рассмотрим основные объекты и понятия в RabbitMQ:
Основные объекты и понятия RabbitMQ:
import pika
# Подключение к брокеру RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Создание обмена с именем 'logs' и типом 'fanout'
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# Создание временной очереди и связывание ее с обменом
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
Создается временная очередь без имени (параметр queue=''
) с флагом exclusive=True
, что делает ее приватной для соединения и удаляет ее, когда соединение закрывается. Эта очередь затем связывается с обменником ‘logs’
# Пример маршрутизации сообщений в RabbitMQ с помощью библиотеки pika
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.basic_publish(exchange='my_exchange', routing_key='my_key', body='Hello, RabbitMQ!')
Этот пример демонстрирует базовую отправку сообщения в RabbitMQ.
В параметрах метода basic_publish
указаны:
exchange='my_exchange'
: Имя обменника, через который отправляется сообщение.routing_key='my_key'
: Маршрутизационный ключ, который используется для определения, какое сообщение отправляется в какую очередь.body='Hello, RabbitMQ!'
: Тело сообщения.– это данные, которые передаются между издателями и потребителями через RabbitMQ. Оно состоит из заголовка и тела, где заголовок содержит метаданные, а тело - собственно данные.
Концепции, связанные с обработкой сообщений в системе
Издатель (Publisher) - это приложение, которое отправляет сообщения в обмены RabbitMQ.
Потребитель (Consumer) - это приложение, которое принимает сообщения из очередей RabbitMQ для последующей обработки.
Отправка сообщения (Publish) в обмен:
import pika
# Подключение к брокеру RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Публикация сообщения в обмен 'logs'
channel.basic_publish(exchange='logs', routing_key='', body='Hello World!')
print("Сообщение успешно опубликовано в обмен 'logs'")
Этот код создает соединение с брокером RabbitMQ и публикует сообщение “Hello World!” в Exchange с именем ‘logs’.
Получение сообщения (Consume) из очереди:
import pika
# Callback-функция, которая будет вызвана при получении сообщения
def callback(ch, method, properties, body):
print("Получено сообщение:", body)
# Подключение к брокеру RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Создание очереди 'hello', если ее нет
channel.queue_declare(queue='hello')
# Указание, что при получении сообщения вызывается функция callback
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print('Ожидание сообщений. Для выхода нажмите CTRL+C')
channel.start_consuming()
Этот код подключается к брокеру RabbitMQ, создает очередь с именем ‘hello’ (если ее нет) и ожидает сообщения. Когда сообщение поступает в очередь ‘hello’, вызывается функция callback
, которая просто выводит сообщение в консоль.