Jay's Cookbook
Menu
  • Tags
  • Categories
  • Projects
Computer Science
OS
Network
Data Structure
Algorithm
Language
Code Architecture
Python
Javascript
Typescript
Java
Backend
Backend Theory
TypeORM
Node.js
NestJS
FastAPI
Frontend
HTML/CSS
React
Next.js
Data Engineering
DE Theory
MySQL
MongoDB
Elastic
Redis
Kafka
Spark
Airflow
AI
Basic
Pytorch
NLP
Computer Vision
Data Analytics
Statistics
Pandas
Matplotlib
DevOps
Git
Docker
Kubernetes
AWS
Airflow Series [Part7]: 워크플로 컨트롤
data_engineering
airflow

Airflow Series [Part7]: 워크플로 컨트롤

Jay Kim
Jay Kim 05 Feb 2022
Airflow Series [Part6]: 템플릿 변수 끝

Table of Contents

  • 의존성 정의하기
  • 브랜치 하기
  • 태스크 트리거
  • 워크플로 트리거
  • 참고

의존성 정의하기

# in general we recommend you use the bitshift operators, as they are easier to read in most cases

bash_start >> [bash_a, bash_b]
bash_a >> bash_c
bash_b >> bash_d
[bash_c, bash_d] >> bash_e
bash_e >> bash_f >> bash_g

브랜치 하기

import random
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator

def _pick_erp_system():
    x = random.randint(1, 2)
    if x % 2 == 0:
        return "bash_a1" # 리턴할 태스크 ID. 리스트를 넘기면 여러 개의 태스크를 실행할 수도 있다
    else:
        return "bash_b1"

with DAG(
            dag_id="test_airflow",
            schedule_interval="* * * * *", # 매분마다 실행
            start_date=datetime(2023, 1, 15),
            catchup=False
        ) as dag:
    bash_pick = BranchPythonOperator( # BranchPythonOperator 사용ㄷ
        task_id="bash_pick", 
        python_callable=_pick_erp_system
    )
    bash_a1 = BashOperator(
        task_id="bash_a1", 
        bash_command="echo A1"
    )
    bash_b1 = BashOperator(
        task_id="bash_b1", 
        bash_command="echo B1"
    )
    bash_a2 = BashOperator(
        task_id="bash_a2", 
        bash_command="echo A2"
    )
    bash_b2 = BashOperator(
        task_id="bash_b2", 
        bash_command="echo B2"
    )
    bash_a3 = BashOperator(
        task_id="bash_a3", 
        bash_command="echo A3"
    )
    bash_b3 = BashOperator(
        task_id="bash_b3", 
        bash_command="echo B3"
    )


    bash_pick >> [bash_a1, bash_b1]
    bash_a1 >> bash_a2 >> bash_a3
    bash_b1 >> bash_b2 >> bash_b3

태스크 트리거

  • 업스트림 태스크가 어떤 상태일 때 태스크가 실행되도록 할지 결정한다
  • 디폴트는 all_success로, 업스트림 태스크가 모두 성공했을 때 실행된다
트리거 규칙 동작 사용 사례
all_success 모든 상위 태스크가 성공하면 트리거 된다 일반적인 워크플로에 대한 기본 트리거 규칙
all_failed 모든 상위 태스크가 실패했거나, 오류가 발생한 경우 트리거 된다 오류를 처리하는 태스크의 트리거 규칙
all_done 결과 상태에 관계없이 모든 부모가 실행을 완료하면 트리거 된다 시스템 종료 또는 클러스터 중지하는 태스크의 트리거 규칙
one_success 한 부모가 성공하자마자 트리거 된다  
one_failed 하나 이상의 상위 태스크가 실패하자마자 트리거 된다 알림 또는 롤백과 같은 일부 오류 처리 태스크의 트리거 규칙
none_failed 실패한 태스크 없이, 모든 상위 태스크가 성공 또는 건너뛴 경우 트리거 된다 스킵된 태스크와 성공한 태스크를 결합한 태스크의 트리거 규칙
none_skipped 스킵한 태스크 없이, 모든 상위 태스크가 성공 또는 실패한 경우 트리거 된다  
dummy 업스트림 태스크의 상태와 관계없이 트리거 된다 테스트 용도

    bash_join = BashOperator(
        task_id="bash_join", 
        bash_command="echo JOIN",
        trigger_rule="none_failed"
    )

    bash_join = BashOperator(
        task_id="bash_join", 
        bash_command="echo JOIN",
        trigger_rule="none_skipped"
    )

워크플로 트리거

  • 센서를 이용하면, 고정된 스케줄링 방식이 아닌 특정 이벤트로 파이프라인을 트리거 할 수도 있다
  • ex. 새로운 데이터가 도착, 공유 드라이브에 파일이 업로드, 코드를 리포지터리로 푸시, Hive 테이블에 파티션이 있을 때와 같은 경우
  • 센서는 오퍼레이터의 특수 타입(서브클래스)
  • 센서는 특정 조건이 True인지 지속적으로 확인하고, True이면 태스크가 성공한 것으로 간주된다. False이면 True 또는 타임아웃이 될 때까지 확인
from airflow.sensors.filesystem import FileSensor

wait_for_data = FileSensor(
    task_id="wait_for_data",
    filepath="/data/data.csv",
)
from pathlib import Path
from airflow.sensors.python import PythonSensor

def _wait_for_data():
    data_path = Path("/data/data.csv")
    return data_path

wait_for_data = PythonSensor(
    task_id="wait_for_data",
    python_callable=_wait_for_data
)

참고

Airflow Series [Part6]: 템플릿 변수 끝

You may also like

See all airflow
05 Feb 2022 Airflow Series [Part6]: 템플릿 변수
data_engineering
airflow

Airflow Series [Part6]: 템플릿 변수

05 Feb 2022 Airflow Series [Part5]: 스케줄링
data_engineering
airflow

Airflow Series [Part5]: 스케줄링

05 Feb 2022 Airflow Series [Part4]: 에어플로우 도입 사례
data_engineering
airflow

Airflow Series [Part4]: 에어플로우 도입 사례

Jay Kim

Jay Kim

Web development, data engineering for human for the Earth. I share posts, free resources and inspiration.

Rest
Lifestyle
Hobby
Hobby
Hobby
Hobby
2025 © Jay's Cookbook. Crafted & Designed by Artem Sheludko.