Celery

Дмитрий Шурмакин — ML Engineer

Celery

– это мощный инструмент для асинхронной обработки задач. Он предоставляет возможность выполнять задачи в фоновом режиме и эффективно управлять их выполнением через распределенную очередь. Одним из ключевых преимуществ Celery является его способность использовать различные брокеры сообщений, такие как Redis, RabbitMQ, и даже локальные брокеры, что обеспечивает гибкость и масштабируемость в реализации решений. Этот инструмент идеально подходит для обработки задач в реальном времени, а также для выполнения долгих и ресурсоемких операций в фоновом режиме.

Основные понятия

Задача (Task): Задача в Celery представляет собой отдельную единицу работы, которую нужно выполнить. Она может быть любой функцией или методом Python. Для создания задачи используется декоратор @task.

Пример:

from celery import Celery

app = Celery(
        'tasks',
        backend='redis://localhost',
        broker='pyamqp://guest:guest@localhost//',
    )

@app.task
def add(x, y):
    return x + y

Различные агрегационные функции

Celery предоставляет различные агрегационные функции для управления выполнением задач и их результатами. Они позволяют совершать сложные операции над группами задач, а также управлять порядком их выполнения.

Примеры агрегационных функций:

  • chain: Выполняет задачи последовательно, передавая результат одной задачи в следующую.
from celery import chain

result = chain(add.s(4, 4) | add.s(5)).delay()
print(result.get())  # Результат выполнения цепочки задач
  • group: Группирует задачи и выполняет их параллельно.
from celery import group

result = group(add.s(i, i) for i in range(10))().get()
print(result)  # Результат выполнения группы задач
  • chord: Ожидает завершения набора задач и выполняет дополнительную задачу после их завершения.
from celery import chord

result = chord((add.s(i, i) for i in range(10)), add.s()).delay()
print(result.get())  # Результат выполнения последующей задачи

Shared задачи

Shared задачи в Celery - это задачи, которые могут быть использованы в любом месте вашего приложения. Они могут быть определены в отдельном модуле и импортированы в любом другом месте вашего проекта.

Пример определения shared задачи:

from celery import shared_task

@shared_task
def process_data(data):
    pass

Это позволяет упростить структуру вашего приложения и повторно использовать функциональность задач в разных частях кода.


Частичная сигнатура задачи

Celery также поддерживает частичное определение задач, что означает предварительную фиксацию некоторых аргументов. Вот пример:

partial_add = add.s(4) # Фиксируем первый аргумент

result = partial_add.delay(4) # Теперь передаем только второй аргумент


Задание задачи с определенным приоритетом

@app.task(priority=5)
def high_priority_task():
    pass

Рабочие процессы (Workers)

Рабочие процессы отвечают за выполнение задач из очередей. Они могут быть запущены на разных узлах и обрабатывать задачи параллельно.

# Запуск рабочего процесса Celery
celery -A tasks worker --loglevel=info

tasks здесь это название файла в котором определена переменная-singleton app

Бекенд (Backend)

Бекенд в Celery - это хранилище результатов выполнения задач. Celery поддерживает различные бекенды, такие как Redis, SQLAlchemy. Бекенд используется для хранения результатов задач и для отслеживания их выполнения.

Пример настройки бекенда:

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

Результат задачи (Task Result)

Результат задачи представляет собой объект, который предоставляет информацию о статусе выполнения задачи и ее результате. Результат можно получить с помощью метода get() или ожидать его завершения с помощью метода wait().

Пример:

result = add.delay(4, 4)
print(result.get())  # Получить результат
result.wait()  # Ожидание завершения задачи
  • delay - это метод, предоставляемый объектом задачи (task object). Он является простым способом отправки задачи в очередь без явного указания имени задачи.
  • Этот метод возвращает объект AsyncResult, который можно использовать для получения результата выполнения задачи.

async_result = app.send_task("add", args=[3, 5])
async_result.get()
  • send_task - это более универсальный метод, который позволяет отправлять задачи в очередь, указывая имя задачи явно.
  • Этот метод принимает имя задачи в качестве аргумента, а также может принимать дополнительные аргументы и ключевые слова для передачи в задачу.

Дополнительно

Flower: это веб-интерфейс для мониторинга и управления Celery. Он предоставляет графики и статистику выполнения задач, а также позволяет динамически управлять очередями и работниками через веб-интерфейс.

Метрики и логи: вы можете интегрировать программу с инструментами мониторинга, такими как Prometheus и Grafana, чтобы визуализировать и анализировать метрики. Логирование также играет важную роль в отслеживании действий ваших задач и работников.

Несколько слов о Redis

Redis - это база данных типа key-value, которая широко используется для кеширования данных и обмена сообщениями в системе.

При использовании RabbitMQ в качестве брокера сообщений, Redis может быть также использован для хранения результатов выполнения задач, особенно в случаях, когда нужна быстрая передача сообщений между компонентами приложения или когда необходима стабильность, которую может предоставить Redis.

В контексте приложения с использованием Celery, обычно управление задачами осуществляется через брокер (broker), а результаты выполнения этих задач могут храниться в специальном хранилище (backend). Если нужно просто отправить задачу и не сохранять результат, то можно обойтись только брокером. Если же нужно получить результат выполнения задачи и продолжить работу с ним в приложении, то используется backend.

В рамках этого материала мы рассмотрим проект, где в качестве брокера выступает RabbitMQ, а бекэнда Redis.