DAGs, Operators, Tasks, Sensors, Executor, Scheduler
- 2014년 에어비앤비(Airbnb)에서 시작되어, 현재 아파치(Apache) 재단 탑레벨 프로젝트
- Airflow는 Python 코드로 워크플로우(workflow)를 작성하고, 스케줄링, 모니터링 하는 플랫폼
- Airflow를 통해서 데이터엔지니어링의 ETL 작업을 자동화하고, DAG(Directed Acyclic Graph) 형태의 워크플로우 작성이 가능
- AWS, GCP 모두 Airflow managed service를 제공할 정도로 널리 사용
Airflow 배경
- 개인, 기업을 넘어 공공기관까지 일상적 비즈니스의 일부로 데이터 파이프라인을 도입하고 있다
- Airflow는 데이터 파이프라인중 배치 태스크에 중심을 둔 Batch-Oriented Framework 이다
Airflow 구조
- Scheduler:
- DAG Directory에 저장된 DAG(=workflow)를 트리거
- DAG에 저장된 태스크를 인스턴스화하여 Executer에게 전달
- Executer:
- 전달받은 태스크를 Worker에게 전달
- Worker:
- 태스크를 실행
- Web Server:
- User Interface를 통해 접근해 쉽게 Airflow를 동작시키기 위한 웹서버
- DAG Directory:
- 한 개 이상의 DAG를 정의해놓은 디렉터리
- Metadata DB:
- DAG 저장
- 태스크 결과 저장
- airflow.cfg:
- Airflow 설정값을 포함하는 파일
Airflow 동작원리
- 유저가 DAG로 워크플로 작성
- 유저가 정의한 DAG는 DAG Directory에 저장
- Scheduler는 DAG Directory의 모든 DAG를 확인하고 파싱, 태스크를 대기열에 추가
- 3-1. 파일로부터 DAG를 확인 (태스크, 의존성, 스케줄 주기)
- 3-2. 예약된 시간이 지난 태스크의 의존성을 확인하고, 이전 태스크가 모두 해결되었으면 대기열에 추가
- Worker는 예약된 태스크를 실행하고 그 결과를 Metadata DB에 저장
- 저장된 태스크 결과를 Web Server가 읽어감
- DAG(Directed Acyclic Graph): 워크플로우를 파이썬 언어로 코드화한 것
- Airflow의 핵심은 DAG를 잘 작성하는 것
- DAG는 크게 DAG 선언, Operator, Sensor, Operator간 의존성 주입
with DAG("my-dag") as dag:
ping = SimpleHttpOperator(endpoint="")
email = EmailOperator(to="", subject="Update complete")
ping >> email
- DAG에 작업(Task)을 템플릿 형태로 미리 정의해둔 것 (Operator, Sensor를 클래스, Task를 객체로 생각할 수 있음)
- 에어플로우 설치시 함께 제공되는 built-in Operator와, 필요할 때 별도로 설치 가능한 Operator가 있음
# built-in
BashOperator: 쉘 커맨드 실행
PythonOperator: 파이썬 함수 호출
EmailOperator: 이메일 전송
# providers
SimpleHttpOperator: HTTP 메세지 전송
MySqlOperator: SQL문 실행
- Operator의 특별한 타입
- 특정 이벤트가 일어나길 기다렸다가, 발생하면 다음 태스크를 진행
- 기다리는 이벤트가 단순히 시간 기반일 수도 있고, 아니면 어떤 파일이 될 수도 있음
DAG 만들어보기
DAG 정의할 때 자주 사용하는 파라미터
- 전체 파라미터는 공식문서 참고
- dag_id: The id of the DAG
- schedule_interval: Defines how often that DAG runs
- start_date: The timestamp from which the scheduler will attempt to backfill
- default_args: A dictionary of default parameters to be used as constructor keyword parameters when initialising operators.
- catchup: Perform scheduler catchup (or only run latest)? Defaults to True
- tags: List of tags to help filtering DAGs in the UI.
DAG를 선언하는 3가지 방법
컨텍스트 매니저with DAG( "my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule_interval="@daily", catchup=False ) as dag: op = EmptyOperator(task_id="task")
- DAG 인스턴스를 Operator에 직접 전달
my_dag = DAG("my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule_interval="@daily", catchup=False) op = EmptyOperator(task_id="task", dag=my_dag)
데코레이터@dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule_interval="@daily", catchup=False) def generate_dag(): op = EmptyOperator(task_id="task") dag = generate_dag()
DAG 작성 예시
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
'start_date': datetime(2022, 7, 19)
with DAG(dag_id='recruitment-airflow',
catchup=False) as dag:
crawling = BashOperator(
bash_command='python /opt/pipeline/script/'
MongoDBToKafka = BashOperator(
bash_command='python /opt/pipeline/script/'
KafkaToS3 = BashOperator(
bash_command='python /opt/pipeline/script/'
S3ToElasticsearch = BashOperator(
bash_command='python /opt/pipeline/script/'
S3ToMySQL = BashOperator(
bash_command='python /opt/pipeline/script/'
crawling >> MongoDBToKafka >> KafkaToS3 >> [S3ToElasticsearch, S3ToMySQL]
The possible states for a Task Instance are:
- none: The Task has not yet been queued for execution (its dependencies are not yet met)
- scheduled: The scheduler has determined the Task’s dependencies are met and it should run
- queued: The task has been assigned to an Executor and is awaiting a worker
- running: The task is running on a worker (or on a local/synchronous executor)
- success: The task finished running without errors
- shutdown: The task was externally requested to shut down when it was running
- restarting: The task was externally requested to restart when it was running
- failed: The task had an error during execution and failed to run
- skipped: The task was skipped due to branching, LatestOnly, or similar.
- upstream_failed: An upstream task failed and the Trigger Rule says we needed it
- up_for_retry: The task failed, but has retry attempts left and will be rescheduled.
- up_for_reschedule: The task is a Sensor that is in reschedule mode
- deferred: The task has been deferred to a trigger
- removed: The task has vanished from the DAG since the run started
- DAGs를 한 눈에 볼 수 있다
- DAG 하나에 대해서는 그래프로 볼 수도 있다
- DAG에서 각각의 태스크가 시간이 얼마나 걸리는지 파악할 수도 있다