Небольшой удобный подход к планированию задач ETL с помощью python.

Введение

Python стал универсальным языком. Он особенно широко используется в аналитике и решении алгоритмических задач в науке о данных, но также популярен в веб-разработке. Эта комбинация делает его разумным выбором для различных задач извлечения-преобразования-загрузки (ETL).

Однако многие из этих задач довольно малы и не требуют больших фреймворков, таких как Airflow или Luigi. При опросе одной или нескольких веб-страниц на наличие данных более чем достаточно простого скрипта на Python плюс crontab. Тем не менее, когда проект становится немного больше, управление несколькими заданиями с помощью cron может стать громоздким. В то же время для голой установки Airflow для мелких работ требуется как минимум 4 ГБ ОЗУ и 2 процессора (здесь). Если подумать о расходах на AWS, то это как минимум постоянно работающий инстанс t2.small.

Есть ли что-то между ними? Достаточно маленький, чтобы его можно было использовать, скажем, t2.nano (очень дешевый) и достаточно «обслуживаемый» и «расширяемый»?

В этом посте я хотел бы поделиться с вами простым подходом, который использует пакет schedule Python с несколькими модификациями.

Планировщик Python

Библиотека расписание Python предлагает простое планирование задач. Его можно установить с помощью pip, и он довольно прост в использовании. К сожалению, в документации нет примеров его использования в более крупном проекте:

import schedule
import time

def job():
    print("I'm working...")

    # Run job every 3 second/minute/hour/day/week,
    # Starting 3 second/minute/hour/day/week from now
    schedule.every(3).seconds.do(job)
    schedule.every(3).minutes.do(job)
    schedule.every(3).hours.do(job)
    schedule.every(3).days.do(job)
    schedule.every(3).weeks.do(job)

    # Run job every minute at the 23rd second
    schedule.every().minute.at(":23").do(job)

    # Run job every hour at the 42rd minute
    schedule.every().hour.at(":42").do(job)

    # Run jobs every 5th hour, 20 minutes and 30 seconds in.
    # If current time is 02:00, first execution is at 06:20:30
    schedule.every(5).hours.at("20:30").do(job)

    # Run job every day at specific HH:MM and next HH:MM:SS
    schedule.every().day.at("10:30").do(job)
    schedule.every().day.at("10:30:42").do(job)

    # Run job on a specific day of the week
    schedule.every().monday.do(job)
    schedule.every().wednesday.at("13:15").do(job)
    schedule.every().minute.at(":17").do(job)

    while True:
        schedule.run_pending()
            time.sleep(1)

Как видите, все функции вызываются на уровне модуля, что вполне допустимо для размещения его в скрипте. Однако если у вас есть несколько разных заданий, код быстро загромождается, особенно если для разных вызываемых объектов требуются разные параметры.

Другими словами, может быть предпочтительнее воспользоваться преимуществами объектно-ориентированного подхода и определить вокруг него некоторую «архитектуру».

Использование в проекте

Предположим, в качестве аргумента, что у нас есть набор выделенных задач ETL, смоделированных с использованием следующего абстрактного класса:

from abc import ABC, abstractmethod
from typing import Any, Dict, TypeVar


E = TypeVar("ETL")


class BaseETL(ABC):
    def __init__(self, **kwargs: Dict) -> None:
        self.raw_data = None
        self.transformed_data = None

    @abstractmethod
    def extract(self, **kwargs: Dict) -> E:
        ...

    @abstractmethod
    def transform(self, **kwargs: Dict) -> E:
        ...

    @abstractmethod
    def load(self, **kwargs: Dict) -> Any:
        ...

    def run(self, **kwargs: Dict) -> None:
        self.extract(**kwargs).transform(**kwargs).load(**kwargs)

Любой класс, реализующий процесс ETL, будет наследоваться от этого базового класса. Метод extract может, например, получить веб-сайт. Затем transform преобразует необработанный HTML в формат, приемлемый для базы данных. Наконец, load сохранит данные в базе данных. Все методы, выполняемые в этом порядке, могут быть обернуты методом run.

Теперь, после того, как классы ETL определены, мы хотели бы красиво запланировать каждый из них через модуль schedule.

Два примера ETL-задач

Для краткости в следующих примерах давайте пропустим наследование и сосредоточимся только на методе run. Предположим, что их методы extract, transform и load реализованы в другом месте.

etl.py

class DummyETL:  # normally DummyETL(BaseETL)
    def __init__(self, init_param: int) -> None:
        # super().__init__()  # - not needed here
        self.init_param = init_param

    def run(self, p1: int, p2: int) -> None:
        name = self.__class__.__name__
        print(f"{name}({self.init_param}, p1={p1}, p2={p1})")


class EvenDummierETL:  # same...
    def __init__(self, init_param: int) -> None:
        # super().__init__()  # - same
        self.init_param = init_param

    def run(self, p1: int) -> None:
        name = self.__class__.__name__
        print(f"{name}({self.init_param}, p1={p1})")

Параметры конструкторов могут, например, указывать URL-адреса страниц для парсинга. Для разнообразия параметры методов run можно использовать для передачи секретов.

Теперь, когда мы определили классы ETL, давайте создадим отдельный реестр, чтобы связать процессы с каким-то расписанием.

реестр.py

import schedule

from etl import DummyETL, EvenDummierETL


def get_registry():
    dummy_etl = DummyETL(init_param=13)
    dummier_etl = EvenDummierETL(init_param=15)

    return [
        (dummy_etl, schedule.every(1).seconds),
        (dummier_etl, schedule.every(1).minutes.at(":05")),
    ]

Функция get_registry — это место для определения расписания. Хотя значения параметров жестко закодированы, вы можете представить себе ситуацию, когда функция загружает их из файла конфигурации. В любом случае он возвращает список кортежей, соответствующих объектам ETL с Jobs (от schedule). Обратите внимание, что это наше соглашение. Вакансии еще не связаны с каким-либо конкретным Scheduler (опять же, из schedule). Однако соглашение позволяет нам сделать это в любой другой части проекта. Нам не нужно связывать их с объектом уровня модуля, как показано в примере документации.

Наш планировщик на основе планировщика

Наконец, давайте создадим новый класс, который будет активировать весь механизм.

планировщик.py

import time
from typing import Dict, List, Tuple, TypeVar

from schedule import Job, Scheduler

from etl import DummyETL, EvenDummierETL
from etl import E  # we could do so from e.g. etl.base


S = TypeVar("Scheduler")


class TaskScheduler:
    def __init__(self, registry: List[Tuple[E, Job]]) -> None:
        self.scheduler = Scheduler()
        self.registry = []

        for task, job in registry:
            self.registry.append(task)
            self.scheduler.jobs.append(job)

    def register(self, run_params: Dict) -> S:
        jobs = self.scheduler.get_jobs()
        for task, job in zip(self.registry, jobs):
            params = run_params.get(task.__class__.__name__)
            job.do(task.run, **params)

        return self

    def run(self, polling_seconds: int) -> None:
        while True:
            time.sleep(polling_seconds)
            self.scheduler.run_pending()

Наш TaskScheduler использует композицию для создания одного экземпляра Scheduler и добавления к нему ранее зарегистрированных заданий. Хотя это и не обязательно, мы используем typing, чтобы дать четкую подсказку о том, что должно быть предоставлено конструктору для правильной регистрации заданий. Затем метод register является отдельным методом, обеспечивающим привязку. И последнее, но не менее важное: run активирует механизмы.

Сценарий, использующий эту реализацию, будет выглядеть так:

запустить.py

from registry import get_registry
from scheduler import TaskScheduler


if __name__ == "__main__":
    run_params = {
        "DummyETL": dict(p1=1, p2=2),  # e.g. from env vars
        "EvenDummierETL": dict(p1=3),
    }
    
    registry = get_registry()  # e.g. from script's args or config
    task_scheduler = TaskScheduler(registry).register(run_params)
    task_scheduler.run()

Вероятно, самым слабым местом этого решения является соглашение об использовании __class__.__name__ в качестве ключей в словаре run_params. Однако, учитывая простоту подхода, это может быть нормально, особенно если эти параметры будут определены во время выполнения. Существует много альтернатив, одной из которых может быть создание дополнительного уровня абстракции, например. такие объекты, как DummyTask, которые будут служить мостом между объектами ETL и реестром.

Другой подход к TaskScheduler

Возвращаясь к TaskScheduler, мы также можем определить его через наследование, а не через композицию (как раньше). Это означало бы расширение функциональности собственного класса Scheduler schedule. В этом случае TaskScheduler будет следующим:

class TaskScheduler(Scheduler):  # <- here
    def __init__(self, registry: List[Tuple[E, Job]]) -> None:
        super().__init__()  # <- here
        self.registry = []

        for task, job in registry:
            self.registry.append(task)
            self.jobs.append(job)  # <- here

    def register(self, run_params: Dict) -> S:
        jobs = self.get_jobs()  # <- here
        for task, job in zip(self.registry, jobs):
            params = run_params.get(task.__class__.__name__)
            job.do(task.run, **params)

        return self

    def run(self, polling_seconds: int) -> None:
        while True:
            time.sleep(polling_seconds)
            self.run_pending()  # <- and here

Вы сами решаете, какой способ лучше, если таковой имеется;).

Заключение

В этой краткой статье мы показали, как можно расширить простой модуль schedule для создания небольшой рабочей машины ETL. Самое главное, такой подход позволяет лучше организовать код в рамках небольшого проекта, не прибегая к помощи больших пушек.

Первоначально опубликовано на https://zerowithdot.com.