-
BranchPythonOperator 로 조건에 따라 task 분기하기airflow 2021. 7. 31. 17:51
문제
조건에 따라 다음 task 를 선택해 실행되도록 dag 을 구성하고 싶다.
Code
import random from datetime import datetime from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import BranchPythonOperator default_args = { 'start_date': datetime(2021, 7, 31), 'schedule_interval': '@daily' } def choose_branch(**kwargs): branches = ['b1', 'b2', 'b3'] chosen = random.choice(branches) print(f'chosen: {chosen}') return chosen with DAG(dag_id='branch_test', default_args=default_args, schedule_interval=None) as dag: start_dag = BashOperator(task_id='start', bash_command='echo start') branching = BranchPythonOperator(task_id='choose_branch', python_callable=choose_branch) b1 = BashOperator(task_id='b1', bash_command='echo b1') b2 = BashOperator(task_id='b2', bash_command='echo b2') b3 = BashOperator(task_id='b3', bash_command='echo b3') c1 = BashOperator(task_id='c1', bash_command='echo c1') start_dag >> branching >> [b1, b2, b3] b1 >> c1
Airflow 실행
def choose_branch(**kwargs) 에서 chosen 값으로 b1 이 선택되었다면,
start >> choose_branch >> b1 >> c1 으로 실행된다. (b2, b3 는 SKIP 됨)
ShortCircuitOperator
조건(condition) 에 맞지 않을경우 downstream task 를 SKIP 하는 ShortCircuitOperator 도 있다.