Мы все были там. Когда выполняются десятки или сотни заданий cron, и вы (или ваш босс) рветесь за волосы, пытаясь понять, почему критически важное задание не было выполнено прошлой ночью. Вы озадачены и сбиты с толку. Разве не запустилось? Это не удалось? Почему это могло потерпеть неудачу?

В AMPATH использование заданий cron для планирования заданий ETL становилось все более невозможным. Наши процессы ETL включают сначала денормализацию и сглаживание данных, а затем использование денормализованных данных для построения вычисляемых таблиц. Рассчитанные таблицы затем создают отчеты в реальном времени и системы поддержки принятия решений для Министерства здравоохранения и врачей соответственно. Вначале использование заданий cron было простым и эффективным способом их выполнения.

Рабочий процесс cron был следующим:

  1. Запускайте денормализационные задания каждую вторую минуту часа
  2. Выполняйте задания вычисляемых таблиц каждые пятую минуту с надеждой, что выполнение предыдущих заданий займет всего три минуты.

Со временем, когда объем данных и потребность в дополнительных отчетах увеличились, стало трудно оценить время, которое займет конкретное задание, и мы более чем часто сталкиваемся с тайм-аутами ожидания блокировок и взаимоблокировками. Крон достиг своего предела. Нам нужен был способ:

  1. Управляйте сложными отношениями между должностями.
  2. Управляйте всеми заданиями централизованно с четко определенным пользовательским интерфейсом.
  3. Отчеты об ошибках и предупреждения.
  4. Просмотр и анализ времени выполнения задания.
  5. Безопасность (защита учетных данных баз данных).

Войдите в Apache Airflow

В поисках решений для улучшения или замены нашего рабочего процесса ETL я наткнулся на инструмент с открытым исходным кодом Apache Airflow. Концепция Airflow Direct Acryclic Graph (DAG) предлагала способ построения и планирования сложных и динамических конвейеров данных в чрезвычайно простых, тестируемых и масштабируемый способ. И я испугался! Это именно то, что мы искали.

Мы быстро настраиваем воздушный поток с помощью docker-compose, например this. И начал преобразовывать некоторые из основных заданий ETL, которые у нас были, в код Python для Airflow. Вот гитхаб репо для скриптов.

Концепции воздушного потока

Мы собираемся рассмотреть некоторые из основных концепций, чтобы вы начали работу с Airflow. Для получения более подробной документации перейдите на этот официальный сайт документации здесь.

Группы DAG и операторы

В Airflow все рабочие процессы считаются DAG. Вы можете думать о DAG как о наборе задач, находящихся в некотором роде. Вот как DAG выглядит в представлении Airflow Graph View:

DAG обычно имеет расписание, время начала и уникальный идентификатор. Задачи внутри DAG состоят из операторов. Операторы определяют, что на самом деле выполняется для конкретной задачи. Примеры операторов в Airflow:

BashOperator: для выполнения команд / сценариев оболочки

PythonOperator: для выполнения кода Python.

Вы можете определить простой DAG, который будет просто распечатывать «Hello World!» Каждые 10 минут следующим образом:

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': True,
'email_on_success': False,
'start_date': datetime(2019, 5, 31)
}
dag = DAG(
    dag_id='hello_world_dag',
    default_args=default_args,
    schedule_interval= '*/10 * * * *')
task1 = BashOperator(
           task_id="echo_hello_world"
           bash_command='echo Hello World!',
           dag=dag)

Расширяющие операторы

Что еще более интересно, вы можете расширять / создавать собственные операторы, если ни один из встроенных операторов не соответствует вашим потребностям. Вот пример того, как я расширил встроенный MySQL Operator, чтобы он возвращал результаты оператора после выполнения:

from airflow.hooks.mysql_hook import MySqlHook
class CustomMySqlOperator(MySqlOperator):
    def execute(self, context):
      self.log.info('Executing: %s', self.sql)
      hook = MySqlHook(
             mysql_conn_id=self.mysql_conn_id,
             schema=self.database)
      return hook.get_records(self.sql, parameters=self.parameters);
....

# Use the custom operator in one of your tasks
task = CustomMySqlOperator(
          task_id='custom_mysql_task',
          sql='select * from person;',
          mysql_conn_id='mysql_conn',
          database='etl',
          dag=dag)

Определение отношений между задачами

Часто вы обнаруживаете, что некоторые из ваших задач необходимо выполнять одна за другой. В воздушном потоке вы можете определить такие отношения:

task1 = BashOperator(
           task_id="echo_hello_world"
           bash_command='echo I will execute first!',
           dag=dag)
task2 = BashOperator(
           task_id="echo_hello_world"
           bash_command='echo I will execute second!',
           dag=dag)
# There are multiple ways you can define this relationship
# Using the bitwise operators or methods
# Option 1
task1 >> task2
# Option 2
task2 << task1
# Option 3
task1.set_downstream(task2)
# Option 4
task2.set_upstream(task1)

Разветвление

Иногда хочется выполнять задачи в зависимости от определенных условий. Это вполне возможно при использовании воздушного потока, вот пример:

...
run_task = BashOperator(
           task_id="echo_hello_world"
           bash_command='echo Hello World!',
           dag=dag)
sleep = BashOperator(
           task_id="echo_hello_world"
           bash_command='sleep 1m',
           dag=dag)
### function decides which task to run depending on the time by returning the task_id
def decide_path():
    now = datetime.now(timezone('Africa/Nairobi'))
    if now.hour >= 19:
       return "run_task"
    else:
       return "sleep"
branch = BranchPythonOperator(
                 task_id='check_time',
                 python_callable=decide_path,
                 dag=dag)
...

Подключения

Одна из лучших вещей в Airflow - это безопасность. Airflow обрабатывает и шифрует ваши учетные данные за вас, поэтому вам никогда не придется делать это самостоятельно или включать их в свой код.

После добавления и сохранения учетных данных в подключениях вы можете получить к ним доступ, просто добавив идентификатор подключения в свой оператор при определении вашей задачи.

Заключение

Airflow меняет правила игры, когда дело доходит до планирования и мониторинга рабочих процессов. В этой статье я рассмотрел только основные концепции, которые помогли мне начать работу, но есть еще много других, которые я не рассмотрел. Не стесняйтесь копать глубже и протягивать руку.