ML-сервис с FastAPI + RabbitMQ + Celery

Дмитрий Шурмакин — ML Engineer

ML-сервис с FastAPI + RabbitMQ + Celery

Описание проекта

Давайте рассмотрим проект классификации текста для определения эмоциональной тональности. Мы будем использовать FastAPI, Celery, RabbitMQ и Redis для создания системы, способной асинхронно анализировать эмоциональный окрас текстов.

Логика проекта:

  1. Клиент отправляет HTTP запрос с данными текста на FastAPI.
  2. FastAPI получает запрос, проверяет его и отправляет задачу на обработку в Celery через RabbitMQ.
  3. Celery получает задачу из очереди, загружает ML модель для анализа текста, обрабатывает запрос и возвращает результат.
  4. При обработке запроса, Celery может использовать Redis для кэширования промежуточных результатов, чтобы ускорить повторные запросы с теми же данными.
  5. Результат возвращается обратно через RabbitMQ в FastAPI, который отправляет его обратно клиенту в виде HTTP ответа.

Объяснение функционала

  1. FastAPI приложение (app.py): Это веб-приложение, которое принимает запросы на анализ текста через API. Когда приходит запрос, он отправляется в RabbitMQ для асинхронной обработки.
  2. Celery воркер (predict_worker.py): Это отдельный компонент, который слушает очередь RabbitMQ на предмет запросов на анализ текста. Когда приходит запрос, он обрабатывается асинхронно с использованием машинной модели.
  3. Машинная модель (model.py): Это модель машинного обучения, которая используется для анализа тональности текста на основе входных данных.

Таким образом, наш проект использует FastAPI для создания веб-сервиса, Celery для асинхронной обработки задач, RabbitMQ для обмена сообщениями, Redis для кэширования данных и машинное обучение для определения эмоциональной тональности текста.

Структура проекта


markdownCopy code
model_service_api
├─ .docker
│  ├─ api.Dockerfile
│  └─ worker.Dockerfile
├─ .gitignore
├─ README.md
├─ docker-compose.yml
├─ poetry.lock
├─ pyproject.toml
└─ src
   ├─ api
   │  ├─ __init__.py
   │  └─ routes
   │     ├─ __init__.py
   │     ├─ healthcheck.py
   │     ├─ predict.py
   │     └─ router.py
   ├─ app.py
   ├─ connections
   │  └─ broker.py
   ├─ models
   │  ├─ __init__.py
   │  ├─ healthcheck.py
   │  └─ requests.py
   ├─ services
   │  ├─ __init__.py
   │  ├─ model.py
   │  └─ utils.py
   └─ worker
      └─ predict_worker.py

Пример кода:

FastAPI (app.py):

from fastapi import FastAPI, HTTPException
from celery import Celery
from pydantic import BaseModel

app = FastAPI()

celery_app = Celery('tasks',
    backend='redis://localhost',
    broker='pyamqp://guest:guest@localhost//'б
)

class TextRequest(BaseModel):
    text: str

@app.post("/predict/")
async def predict_emotion(text_request: TextRequest, background_tasks: BackgroundTasks):
    async_result = celery_app.send_task("analyze_sentiment", args=[text_request.text])
    result = async_result.get()
    return result

Celery (tasks.py):

from celery import Celery
from model import EmotionClassifier

celery_app = Celery('tasks', backend='redis://my_container_redis', broker='pyamqp://guest:guest@my_container_rabbitmq//')

@celery_app.task(name='analyze_sentiment')
def analyze_sentiment(text):
    sentiment = EmotionClassifier.predict_emotion(text)
    return sentiment

Mодель( model.py):

from transformers import pipeline

import torch

torch.set_num_threads(1)
#устанавливаем количество потоков

class EmotionClassifier:
    """
    Классификатор эмоций на основе предобученной модели Transformers.
    """
    model = pipeline(model="seara/rubert-tiny2-ru-go-emotions")

    @classmethod
    def predict_emotion(cls, text: str):
        """
        Предсказание эмоций по тексту.
        Args:
            text (str): Текст для анализа эмоций.
        """
        result = cls.model(text)
        return result

Заключение:

Этот проект позволяет эффективно анализировать эмоциональную тональность текстов, используя асинхронную обработку в Celery, обмен сообщениями через RabbitMQ и кэширование результатов в Redis. Такая архитектура обеспечивает высокую производительность и масштабируемость сервиса.