ETL: luigi

Михаил Шавкунов — NLP Data Scientist

Цель ETL в Data Science

Основные задачи перед ETL(extract/transform/load):

  • Воспроизводимость экспериментов
  • Переиспользование блоков предобработки
  • Предсказуемый объем работы для прода
  • Командное использование наработок

Как выстроить ETL

  • Пайплайн предобработки - граф исполнения
  • Выстраивание зависимостей между вершинами графа
  • Запуск разных пайплайнов одной командой
  • Хранение конфига каждого запуска

luigi

  • Строит DAG(directed acyclic graph) на основе классов
  • Каждый класс в явном виде указывает зависимость от другого
  • Финальный билд DAG можно сделать как в Python, так и в CLI
  • Поддерживает интеграции с различными база данных на уровне питоновского кода

Интерфейс Task ноды luigi

Расписание пайплайнов

В luigi запуск пайплайна по расписанию поддерживается только с использованием cron.

my_tasks.py:

class DataDump(luigi.ExternalTask):
    date = luigi.DateParameter()
    def output(self): return luigi.contrib.hdfs.HdfsTarget(self.date.strftime('/var/log/dump/%Y-%m-%d.txt'))

class AggregationTask(luigi.Task):
    date = luigi.DateParameter()
    window = luigi.IntParameter()
    def requires(self): return [DataDump(self.date - datetime.timedelta(i)) for i in range(self.window)]
    def run(self): run_some_cool_stuff(self.input())
    def output(self): return luigi.contrib.hdfs.HdfsTarget('/aggregated-%s-%d' % (self.date, self.window))

class RunAll(luigi.Task):
    ''' Dummy task that triggers execution of a other tasks'''
    def requires(self):
        for window in [3, 7, 14]:
            for d in range(10): # guarantee that aggregations were run for the past 10 days
               yield AggregationTask(datetime.date.today() - datetime.timedelta(d), window)

Дальше нужно запускать код через cron. В настройках cron можно указать:

30 0 * * * my-user luigi RunAll --module my_tasks

Best practices

  • Результат одной задачи может быть входными данными для другой.
  • Если не отработала одна из задач, то можно перезапустить пайплайн, начиная с нее с уже готовыми файлами от других задач.
  • Рекомендуется промежуточные файлы от задач собирать в отдельную папку и очищать ее периодически.
  • Можно использовать класс luigi.WrapperTask, чтобы запускать несколько пайплайнов.
class AllReports(luigi.WrapperTask):
    date = luigi.DateParameter(default=datetime.date.today())
    def requires(self):
        yield SomeReport(self.date)
        yield SomeOtherReport(self.date)
        yield CropReport(self.date)
        yield TPSReport(self.date)
        yield FooBarBazReport(self.date)

Best practices

  • Если изменился код паплайна и нужно пересчитать данные за прошлые дни, можно использовать следующий инструмент. Временной промежуток от start (включительно) до stop.
luigi --module all_reports RangeDaily --of AllReportsV2 --start 2014-10-31 --stop 2014-12-25
  • Возможно добавление ограничения ресурсов для части задачи. Может быть полезно после тяжелых вычислений.
class A(luigi.Task):

    # set maximum resources a priori
    resources = {"some_resource": 3}

    def run(self):
        # do something
        ...

        # decrease consumption of "some_resource" by one
        self.decrease_running_resources({"some_resource": 1})

        # continue with reduced resources
        ...
  • Доступен мониторинг через сообщения об ошибка на почту благодаря luigi.notifications

Демо luigi

  • Посмотрим на оформление пайплайна Tf-idf
  • Пример запуска всего графа

Итоги luigi - плюсы

  • Позволяет оформить код препроцессинга в пайплайн
  • Используется в командной работе
  • Простая интеграция с командной строкой
  • Зависимости децентрализованы. Нет больше файла с конфигурацией. Каждая задача определяет, какие входные данные ей нужны
  • Веб-сервер отображает граф задач
  • Можно переиспользовать в проде
  • Можно переиспользовать логику классов
  • Может составлять граф с учетом динамических зависимостей задач
  • Можно паралелльно запускать задачи через запуск нескольких воркеров в скедулере

Итоги luigi - минусы

  • boilerplate code:
    • Нужно пробрасывать один и тот же конфиг каждый раз
    • Неудобство в использовании множественных output
  • Чтение/запись в каждой ноде как часть избыточности
  • Новый пайплайн - создание дубликатов нод
  • Больше подходит для обработки данных батчами, чем для стриминга данных
  • Нельзя задать из коробки расписание, только через cron
  • Предполагается, что каждая задача представляет собой значительный объем работы
  • Потенциально можно запланировать порядка нескольких тысяч задач, но масштабироваться > 1000 задач не предполагается
  • Luigi не поддерживает распределенное выполнение
  • Когда воркеры начинают выполнять тысячи задач ежедневно, начинается перегруз воркеров