데이터 엔지니어링/AirFlow

[Airflow] 기반의 데이터 파이프라인(8) - 워크플로우 트리거

안용감한호랑이 2023. 10. 21. 14:51

워크플로우 트리거 : 미리 정의된 시간에 워크플로우를 시작하지 않고 특정 Task의 수행으로 시작해야 하는 경우 사용

  • 외부 이벤트의 결과
  • 공유 드라이브 등에 파일 업로드
  • 변경된 코드가 repository에 푸시된 경우
  • Hive 테이블에 파티션이 생성된 경우
  • 등등..

위와 같은 사례의 일반적인 경우는 새로운 데이터가 도착하는 경우라고 볼 수 있다.

 

 

 

1. 센서(Sensor)

워크플로우 생성 시 특정 Event를 기다려야 하는 경우가 존재한다.

이러한 경우 Sensor를 사용할 수 있습니다. Sensor는 BashOperator, PythonOperator등과 같은 operator로써 해당하는 Event가 발생하는 것을 기다리도록 설계되어 있어 시간 혹은 파일 등의 외부 Event를 기다린 이후 수행하여 다운스트림을 실행할 수 있도록 한다.

 

 

Sensor는 주로 대기상태 이기 때문에 poke와 reschedule 실행모드로 사용할 수 있다.

  • poke
    • default
    • Sensor는 실행시간 동안 지속적으로 worker slot을 점유하고있다.
    • 짧은 시간(초 단위) 동안 대기해야 하는 경우 사용한다.

 

  • reschedule
    • Seonsor는 Event의 발생 여부를 체크할 때만 worker slot을 점유한다.
    • 긴 시간(분 단위) 동안 대기해야 하는 경우 사용한다.
    • poke와 다르게 worker slot을 지속적으로 잡고 있지 않기 때문에 자원을 덜 사용한다.

 

다음 두 경로는 Airflow의 Document상에서 poke와 reschedule에 대한 설명입니다.

다만 경로1, 경로2 두 경로에서의 설명 중 경로1 에서는 대기 시간은 한쪽은 초단위와 분단위로 구분하고,  경로2 에서는 스케줄러의 부하 방지를 위해 poke 간격이 1분 이상으로 권장하고 있습니다.

따라서 poke는 짧은 대기시간을 reschedule은 긴 대기시간을 사용하는 것으로 구분하는 것으로 개념을 잡아야 할 것 같습니다.

 

Sensor의 파라미터

  • soft_fail(bool) : True로 설정한다면 Task 실패 시 실패가 아닌 skipped로 표현된다.
  • poke_interval(float) : Event 여부를 체크하는 시간의 간격(단위 : 초)
  • timeout(float) : Sensor가 Event를 체크하는 최대 시간
  • mode(str) : {poke or reschedule}
  • exponential_backoff : 지수 백오프 알고리즘을 이용하여 poke 사이에 점진적으로 더 많은 대기 시간을 허용한다.

 

많은 Sensor들은 공식 Document에서 설명하고 있습니다.

다만 센서 모듈 리스트에서 제공하던 Sensor들이 Airflow가 1.x 버전에서 2.x 버전으로 올라오며 provider로 다수 변경되었습니다.

 

공식 Document상의 센서 모듈 리스트

  1. FileSensor : 파일 혹은 폴더 내부를 확인하고 True, False를 return 한다.
  2. PythonSensor : python_callable에 정의한 함수의 return 값이 True일 때까지 대기한다. 복잡한 로직 적용 가능
  3. ExternalTaskSensor : Airflow의 다른 DAG의 수행완료를 기다린다.
  4. SqlSensor : 기준이 충족될 때까지 SQL문을 수행한다.

 

2. 트리거

워크플로우를 생성하다 보면 하나의 워크플로우를 분리하여 여러 개의 워크플로우로 쪼개거나 여러 별개의 워크플로우가 하나의 다음 워크플로우를 개별 실행해야 하는 경우가 있다.

이러한 경우 TriggerDagRunOperator를 사용할 수 있다.

 

TriggerDagRunOperator의 파라미터

  • trigger_dag_id(str) : 트리거할 DAG의 id
  • trigger_run_id(str|Nome) : 트리거 된 DAG 실행에 사용할 실행 ID, None인 경우 실행 ID가 자동으로 생성된다.
  • conf(dict|None) : DAG 실행을 위한 configuration
  • execution_date(str|datetime.datetime|Nome) : DAG의 실행 날짜
  • reset_dag_run(bool) : 이미 수행 중인 DAG가 존재하는 경우 해당 DAG를 reset 하고 실행한다. 기존 DAG실행을 backfill 하거나 재 수행할 때 유용하다.
  • wait_for_completion(bool) : DAG 실행 완료를 기다리는지 여부
  • poke_interval(int) : wait_for_completion=True인 경우 DAG 실행 상태를 확인하기 위한 poke의 간격
  • allowed_states(list[str]|None) : 허용되는 상태 목록
  • faile_states(list[str]|None) : 실패 또는 허용되지 않는 상태 목록
  • deferrable(bool) : 완료를 기다리는 경우 완료될 때까지 작업을 연기할지 여부

 

3. REST/CLI

REST API 및 CLI를 통하여 트리거를 할 수 있다.

예를 들어 CI/CD 파이프라인의 일부로 Airflow 외부에서 워크플로를 시작하려는 경우, 타 서버의 Airflow DAG를 수행하는 경우 등의 상황이 발생할 수 있다.

 

해당 사항의 경우 하나의 포스트에 담기 어려워 Airflow의 REST API 문서로 대체합니다.

Airflow REST API Document