안녕하세요 아래 내용을 정리하다 보니 내용이 너무 빈약해 조금 더 공부하고 내용을 추가할 예정입니다.
1. XCom
- XCom(cross-communications) : 작업이 완전히 격리되어 있고 완전히 다른 서버에서 실행될 수 있기 때문에 작업이 서로 통신할 수 있도록 한다.
- XCom은 key와 task_id, dag_id에 의해 식별된다.
- XCom은 xcom_push, xcom_pull 메소드를 사용하여 값을 주고받는다.
# push_task의 instance에서 xcom_push를 통해 XCom 값을 게시한다.
push_instance.xcom_push(key="xcom unique key", value='보낼값')
# pull_task의 instance에서 xcom_pull을 통해 XCom 값을 가져온다.
pull_instance.xcom_pull(key="xcom unique key", task_ids="push_task")
주의사항
- XCom은 명시적 의존성 Task와 달리 묵시적인 의존성으로 인해 Task 스케줄시 고려되지 않는다.
- 오퍼레이터의 원자성을 무너뜨리는 패턴이 될 가능성이 있다.
- 저장하는 모든 값은 직렬화를 지원해야 한다.
2. Taskflow API
Airflow 2부터 Taskflow API를 통해 파이썬 태스크 및 의존성을 정의하기 위한 데커레이터 기반 API를 추가로 제공한다.
Taskflow API로부터 추가된 @task 데커레이터를 통해 파이썬 함수를 Task로 쉽게 변환하고 DAG 정의에서 Task 간에 데이터 공유를 명확하게 한다.
from airflow.decorators import task
with DAG(
# ...생략...
) as dag:
# ...생략...
@task
def push_XCom_taskflow():
xcom_data="push data"
return xcom_data
@task
def pull_XCom_taskflow(xcom_data:str):
new_message = xcom_data + "pull data"
return new_message
xcom_data = push_XCom_taskflow()
new_message = pull_XCom_taskflow(xcom_data)
PythonOperator(
task_id = "print_message",
python_callable=print,
op_kwargs=[new_message]
)
'데이터 엔지니어링 > AirFlow' 카테고리의 다른 글
[Airflow] 기반의 데이터 파이프라인(9) - Executor (0) | 2023.10.26 |
---|---|
[Airflow] 기반의 데이터 파이프라인(8) - 워크플로우 트리거 (0) | 2023.10.21 |
[Airflow] 기반의 데이터 파이프라인(6) - 태스크 간 의존성 및 브랜치 (1) | 2023.10.19 |
[Airflow] 기반의 데이터 파이프라인(5) - 백필(BackFilling) (1) | 2023.10.14 |
[Airflow] 기반의 데이터 파이프라인(4) - Airflow의 실행 날짜 이해 (0) | 2023.10.13 |
[Airflow] 기반의 데이터 파이프라인(3) - 스케줄링 (1) | 2023.10.12 |