ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • BranchPythonOperator 로 조건에 따라 task 분기하기
    airflow 2021. 7. 31. 17:51

    문제

    조건에 따라 다음 task 를 선택해 실행되도록 dag 을 구성하고 싶다.

     

    Code

    import random
    from datetime import datetime
    
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import BranchPythonOperator
    
    default_args = {
        'start_date': datetime(2021, 7, 31),
        'schedule_interval': '@daily'
    }
    
    
    def choose_branch(**kwargs):
        branches = ['b1', 'b2', 'b3']
        chosen = random.choice(branches)
        print(f'chosen: {chosen}')
        return chosen
    
    
    with DAG(dag_id='branch_test', default_args=default_args, schedule_interval=None) as dag:
        start_dag = BashOperator(task_id='start', bash_command='echo start')
    
        branching = BranchPythonOperator(task_id='choose_branch', python_callable=choose_branch)
        b1 = BashOperator(task_id='b1', bash_command='echo b1')
        b2 = BashOperator(task_id='b2', bash_command='echo b2')
        b3 = BashOperator(task_id='b3', bash_command='echo b3')
        c1 = BashOperator(task_id='c1', bash_command='echo c1')
    
        start_dag >> branching >> [b1, b2, b3]
        b1 >> c1

     

    Airflow 실행

    def choose_branch(**kwargs) 에서 chosen 값으로 b1 이 선택되었다면,

    start >> choose_branch >> b1 >> c1 으로 실행된다. (b2, b3 는 SKIP 됨)

     

     

     

     

     

     

     

     

     

     

     

     

     

    ShortCircuitOperator

    조건(condition) 에 맞지 않을경우 downstream task 를 SKIP 하는 ShortCircuitOperator 도 있다.

    댓글

Designed by Tistory.