Мы все были там. Когда выполняются десятки или сотни заданий cron, и вы (или ваш босс) рветесь за волосы, пытаясь понять, почему критически важное задание не было выполнено прошлой ночью. Вы озадачены и сбиты с толку. Разве не запустилось? Это не удалось? Почему это могло потерпеть неудачу?
В AMPATH использование заданий cron для планирования заданий ETL становилось все более невозможным. Наши процессы ETL включают сначала денормализацию и сглаживание данных, а затем использование денормализованных данных для построения вычисляемых таблиц. Рассчитанные таблицы затем создают отчеты в реальном времени и системы поддержки принятия решений для Министерства здравоохранения и врачей соответственно. Вначале использование заданий cron было простым и эффективным способом их выполнения.
Рабочий процесс cron был следующим:
- Запускайте денормализационные задания каждую вторую минуту часа
- Выполняйте задания вычисляемых таблиц каждые пятую минуту с надеждой, что выполнение предыдущих заданий займет всего три минуты.
Со временем, когда объем данных и потребность в дополнительных отчетах увеличились, стало трудно оценить время, которое займет конкретное задание, и мы более чем часто сталкиваемся с тайм-аутами ожидания блокировок и взаимоблокировками. Крон достиг своего предела. Нам нужен был способ:
- Управляйте сложными отношениями между должностями.
- Управляйте всеми заданиями централизованно с четко определенным пользовательским интерфейсом.
- Отчеты об ошибках и предупреждения.
- Просмотр и анализ времени выполнения задания.
- Безопасность (защита учетных данных баз данных).
Войдите в Apache Airflow
В поисках решений для улучшения или замены нашего рабочего процесса ETL я наткнулся на инструмент с открытым исходным кодом Apache Airflow. Концепция Airflow Direct Acryclic Graph (DAG) предлагала способ построения и планирования сложных и динамических конвейеров данных в чрезвычайно простых, тестируемых и масштабируемый способ. И я испугался! Это именно то, что мы искали.
Мы быстро настраиваем воздушный поток с помощью docker-compose, например this. И начал преобразовывать некоторые из основных заданий ETL, которые у нас были, в код Python для Airflow. Вот гитхаб репо для скриптов.
Концепции воздушного потока
Мы собираемся рассмотреть некоторые из основных концепций, чтобы вы начали работу с Airflow. Для получения более подробной документации перейдите на этот официальный сайт документации здесь.
Группы DAG и операторы
В Airflow все рабочие процессы считаются DAG. Вы можете думать о DAG как о наборе задач, находящихся в некотором роде. Вот как DAG выглядит в представлении Airflow Graph View:
DAG обычно имеет расписание, время начала и уникальный идентификатор. Задачи внутри DAG состоят из операторов. Операторы определяют, что на самом деле выполняется для конкретной задачи. Примеры операторов в Airflow:
BashOperator
: для выполнения команд / сценариев оболочки
PythonOperator
: для выполнения кода Python.
Вы можете определить простой DAG, который будет просто распечатывать «Hello World!» Каждые 10 минут следующим образом:
from airflow.models import DAG from airflow.operators.bash_operator import BashOperator default_args = { 'owner': 'airflow', 'email': ['[email protected]'], 'email_on_failure': True, 'email_on_retry': True, 'email_on_success': False, 'start_date': datetime(2019, 5, 31) } dag = DAG( dag_id='hello_world_dag', default_args=default_args, schedule_interval= '*/10 * * * *') task1 = BashOperator( task_id="echo_hello_world" bash_command='echo Hello World!', dag=dag)
Расширяющие операторы
Что еще более интересно, вы можете расширять / создавать собственные операторы, если ни один из встроенных операторов не соответствует вашим потребностям. Вот пример того, как я расширил встроенный MySQL Operator, чтобы он возвращал результаты оператора после выполнения:
from airflow.hooks.mysql_hook import MySqlHook class CustomMySqlOperator(MySqlOperator): def execute(self, context): self.log.info('Executing: %s', self.sql) hook = MySqlHook( mysql_conn_id=self.mysql_conn_id, schema=self.database) return hook.get_records(self.sql, parameters=self.parameters); .... # Use the custom operator in one of your tasks task = CustomMySqlOperator( task_id='custom_mysql_task', sql='select * from person;', mysql_conn_id='mysql_conn', database='etl', dag=dag)
Определение отношений между задачами
Часто вы обнаруживаете, что некоторые из ваших задач необходимо выполнять одна за другой. В воздушном потоке вы можете определить такие отношения:
task1 = BashOperator( task_id="echo_hello_world" bash_command='echo I will execute first!', dag=dag) task2 = BashOperator( task_id="echo_hello_world" bash_command='echo I will execute second!', dag=dag) # There are multiple ways you can define this relationship # Using the bitwise operators or methods # Option 1 task1 >> task2 # Option 2 task2 << task1 # Option 3 task1.set_downstream(task2) # Option 4 task2.set_upstream(task1)
Разветвление
Иногда хочется выполнять задачи в зависимости от определенных условий. Это вполне возможно при использовании воздушного потока, вот пример:
... run_task = BashOperator( task_id="echo_hello_world" bash_command='echo Hello World!', dag=dag) sleep = BashOperator( task_id="echo_hello_world" bash_command='sleep 1m', dag=dag) ### function decides which task to run depending on the time by returning the task_id def decide_path(): now = datetime.now(timezone('Africa/Nairobi')) if now.hour >= 19: return "run_task" else: return "sleep" branch = BranchPythonOperator( task_id='check_time', python_callable=decide_path, dag=dag) ...
Подключения
Одна из лучших вещей в Airflow - это безопасность. Airflow обрабатывает и шифрует ваши учетные данные за вас, поэтому вам никогда не придется делать это самостоятельно или включать их в свой код.
После добавления и сохранения учетных данных в подключениях вы можете получить к ним доступ, просто добавив идентификатор подключения в свой оператор при определении вашей задачи.
Заключение
Airflow меняет правила игры, когда дело доходит до планирования и мониторинга рабочих процессов. В этой статье я рассмотрел только основные концепции, которые помогли мне начать работу, но есть еще много других, которые я не рассмотрел. Не стесняйтесь копать глубже и протягивать руку.