ETL: Airflow

Михаил Шавкунов — NLP Data Scientist

Airflow

  • Open-source инструмент для ETL процессов на Python
  • Подходит для запуска как Python скриптов, так и sql, bash
  • Использует концепцию DAG (directed acyclic graph)
  • Аналогично luigi, позволяет интегрировать тесты, git для командной разработки
  • Подходит для задач, которые нужны запускать через регулярные промежутки времени

Основные компоненты Airflow

  • Scheduler: планировщик
  • Workers: для выполнения задач
  • Webserver: веб-интерфейс для управления DAGs, просмотра логов
  • Database: хранилище метаданных
  • Executors: компонент для запуска задач

Способы задания DAG в Airflow

DAG - концепция аналогичная в Luigi, ориентированный ациклический граф

  1. С использованием контекстного менеджера
 import datetime

 from airflow import DAG
 from airflow.operators.empty import EmptyOperator

 with DAG(
     dag_id="my_dag_name",
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@daily",
 ):
     EmptyOperator(task_id="task")

Способы задания DAG в Airflow

  1. С использованием стандартного конструктора.
 import datetime

 from airflow import DAG
 from airflow.operators.empty import EmptyOperator

 my_dag = DAG(
     dag_id="my_dag_name",
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@daily",
 )
 EmptyOperator(task_id="task", dag=my_dag)

Способы задания DAG в Airflow

  1. Через декоратор
import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
    EmptyOperator(task_id="task") # Operator that does literally nothing.

generate_dag()

Задачи в Airflow

  • Operators - выполняют такие задачи как чтение/запись данных из/в базы данных, выполнение скриптов Python, запуск командной строки
  • Hooks - предоставляют методы для взаимодействия с различными внешними системами
  • Sensors - проверяют, выполняется ли определенный критерий, прежде чем разрешить выполнение последующих задач

Список основных операторов и хуков в Airflow

Operators

Самые популярные операторы:

  • BashOperator
  • PythonOperator
  • MySqlOperator
  • PostgresOperator

Hooks

  • Позволяют легко взаимодействовать с сторонними платформами
  • В web интерфейсе можно задавать параметры всех соединений (connections)
  • Хуки можно использовать внутри PythonOperator Примеры:
  • S3Hook
  • PostgresHook

Sensors

  • Используется для ожидания события (появления файла, выполнения критерия по sql запросу, определенного времени) Примеры:
  • FileSensor
  • SqlSensor
  • DateTimeSensor

Зависимости в DAG

first_task >> [second_task, third_task]
third_task << fourth_task

Можно задать зависимости еще одним способом.

first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)

Установка Airflow

  • Airflow поддерживает установку только через pip (не поддерживаются poetry, pip-tools)
  • Можно развернуть airflow локально
  • Можно использовать docker compose с официальным образом airflow
  • Различные компании предлагают облачные решения для работы с Airflow

Демо

Покажем как мог бы выглядеть тот же Tf-IDF пайплайн в Airflow

Итоги Airflow

Плюсы

  • Позволяет оформить в пайплайн и использовать в командной работе
  • Можно переиспользовать в проде и в других DAGs
  • Реализован запуск пайплайна по расписанию
  • Поддерживает триггеры, автоматический перезапуск задач, мониторинг
  • Airflow подготавливает окружение для запуска задач
  • Распределяет ресурсы в зависимости от требований задачи

Минусы

  • Требуется запуск нескольких сервисов для работы airflow
  • Много кода для разового запуска пайплайна