데이터 엔지니어링/AirFlow

[Airflow] 기반의 데이터 파이프라인(7) - 태스크 간 데이터 공유

안용감한호랑이 2023. 10. 20. 20:02

안녕하세요 아래 내용을 정리하다 보니 내용이 너무 빈약해 조금 더 공부하고 내용을 추가할 예정입니다.


1. XCom

  • XCom(cross-communications) : 작업이 완전히 격리되어 있고 완전히 다른 서버에서 실행될 수 있기 때문에 작업이 서로 통신할 수 있도록 한다.
  • XCom은 key와 task_id, dag_id에 의해 식별된다.
  • XCom은 xcom_push, xcom_pull 메소드를 사용하여 값을 주고받는다.
# push_task의 instance에서 xcom_push를 통해 XCom 값을 게시한다.
push_instance.xcom_push(key="xcom unique key", value='보낼값')


# pull_task의 instance에서 xcom_pull을 통해 XCom 값을 가져온다.
pull_instance.xcom_pull(key="xcom unique key", task_ids="push_task")

 

주의사항

  1. XCom은 명시적 의존성 Task와 달리 묵시적인 의존성으로 인해 Task 스케줄시 고려되지 않는다.
  2. 오퍼레이터의 원자성을 무너뜨리는 패턴이 될 가능성이 있다.
  3. 저장하는 모든 값은 직렬화를 지원해야 한다.

 

2. Taskflow API

Airflow 2부터 Taskflow API를 통해 파이썬 태스크 및 의존성을 정의하기 위한 데커레이터 기반 API를 추가로 제공한다.

Taskflow API로부터 추가된 @task 데커레이터를 통해 파이썬 함수를 Task로 쉽게 변환하고 DAG 정의에서 Task 간에 데이터 공유를 명확하게 한다.

from airflow.decorators import task

with DAG(
	# ...생략...
) as dag:
	
    # ...생략...
    
    @task
    def push_XCom_taskflow():
    	xcom_data="push data"
        return xcom_data
    
    @task
    def pull_XCom_taskflow(xcom_data:str):
    	new_message = xcom_data + "pull data"
        
        return new_message
        
    xcom_data   = push_XCom_taskflow()
    new_message = pull_XCom_taskflow(xcom_data)
    
    PythonOperator(
    	task_id = "print_message",
        python_callable=print,
        op_kwargs=[new_message]
    )