데이터 엔지니어링/AirFlow 9

[Airflow] 기반의 데이터 파이프라인(9) - Executor

Executor는 Task 인스턴스들을 실행하는 메커니즘입니다. 공통 API를 가지고 있으며 요구되는 사항에 맞게 Executor를 변경하여 설치할 수 있습니다. 다만 Airflow는 한 번에 하나의 Executor를 구성할 수 있으며, airflow.cfg에서 옵션을 변경할 수 있습니다. Executor Executor는 크게 두 가지로 분류할 수 있다. Local Executors 스케줄로 프로세스 내부에서 작업을 실행한다. dag.test() Debugging Airflow DAGs on the command line SequentialExecutor : Airflow 초기 설치 시 default값 다중 연결을 지원하지 않기 때문에 sqllite만 유일하게 지원한다. 한 번에 하나의 작업 인스턴스만..

[Airflow] 기반의 데이터 파이프라인(8) - 워크플로우 트리거

워크플로우 트리거 : 미리 정의된 시간에 워크플로우를 시작하지 않고 특정 Task의 수행으로 시작해야 하는 경우 사용 외부 이벤트의 결과 공유 드라이브 등에 파일 업로드 변경된 코드가 repository에 푸시된 경우 Hive 테이블에 파티션이 생성된 경우 등등.. 위와 같은 사례의 일반적인 경우는 새로운 데이터가 도착하는 경우라고 볼 수 있다. 1. 센서(Sensor) 워크플로우 생성 시 특정 Event를 기다려야 하는 경우가 존재한다. 이러한 경우 Sensor를 사용할 수 있습니다. Sensor는 BashOperator, PythonOperator등과 같은 operator로써 해당하는 Event가 발생하는 것을 기다리도록 설계되어 있어 시간 혹은 파일 등의 외부 Event를 기다린 이후 수행하여 다운..

[Airflow] 기반의 데이터 파이프라인(7) - 태스크 간 데이터 공유

안녕하세요 아래 내용을 정리하다 보니 내용이 너무 빈약해 조금 더 공부하고 내용을 추가할 예정입니다. 1. XCom XCom(cross-communications) : 작업이 완전히 격리되어 있고 완전히 다른 서버에서 실행될 수 있기 때문에 작업이 서로 통신할 수 있도록 한다. XCom은 key와 task_id, dag_id에 의해 식별된다. XCom은 xcom_push, xcom_pull 메소드를 사용하여 값을 주고받는다. # push_task의 instance에서 xcom_push를 통해 XCom 값을 게시한다. push_instance.xcom_push(key="xcom unique key", value='보낼값') # pull_task의 instance에서 xcom_pull을 통해 XCom 값을 ..

[Airflow] 기반의 데이터 파이프라인(6) - 태스크 간 의존성 및 브랜치

의존성 1. 선형 의존성 Airflow에서 선형 의존성을 가진 Task는 이전 Task의 결과가 다음 Task의 입력값으로 사용되기 때문에, 다음 태스크로 이동하기 전에 각 태스크가 완료되어야 한다. Airflow에서는 >> 연산지 혹은 second_stream >> third_stream # 유형 2 # 유형 1과 마찬가지로 first_stream ~ third_stream까지 차례로 수행된다. # 후행Task.set_upstream(선행Task) second_stream.set_upstream(first_stream) third_stream.set_upstream(second_stream) # 유형 3 # set_downstream # 유형 1과 마찬가지로 first_stream ~ third_str..

[Airflow] 기반의 데이터 파이프라인(5) - 백필(BackFilling)

Workflow를 개발, 운영해본 입장이라면 과거데이터 재소급 등의 이유로 과거 시점의 데이터 처리를 진행할 필요가 있다는 것을 알고 있을 것이다. Airflow의 BackFilling은 이를 위해 존재한다. dag=DAG( dag_id="backfill_examples", schedule_interval="@daily", # 매일 자정에 실행되도록 DAG를 스케줄하는 매크로 start_date=dt.datetime(2023.1.1), # 2023.1.1 부터 DAG 실행 스케줄을 시작할 날짜/시간 end_date=dt.datetime(2023.12.31), # 2023.12.31 까지 DAG가 실행된다. catchup=True, # ...생략... ) catchup=True 인 경우 현재 시점보다 과거..

[Airflow] 기반의 데이터 파이프라인(4) - Airflow의 실행 날짜 이해

Airflow는 시작날짜, 스케줄 간격, 종료날짜의 세 가지 매개변수를 사용하여 DAG를 실행하는 시점을 제어할 수 있다. Airflow는 스케줄의 간격이 지난 시점에 DAG를 트리거 한다. 따라서 start_date 기준 schedule_interval이 도래한 가장 첫 시점에 DAG가 실행된다고 보면 된다. (실무에서 사용해보지 못했지만 오픈 카톡방인 '한국 데이터 엔지니어 모임' 에서 종종 물어보는 질문이다.) 책의 예시에서는 2019-01-03일의 데이터를 2019-01-04의 정각에 수집하는 예를 보여준다. 일반적인 스케줄러라면 시작일을 2019-01-04로 해야 하겠지만 Airflow에서의 start_date는 예약 간격의 실행 날짜 이기 때문에 start_date는 2019-01-03으로 주..

[Airflow] 기반의 데이터 파이프라인(3) - 스케줄링

Airflow는 DAG에 적절한 값을 통해 스케줄 간격을 정의하여 정기적으로 실행할 수 있다. DAG를 초기화할 때 schedule_interval을 설정하여 스케줄 간격을 정의할 수 있다. 해당 값의 default는 None이며 DAG가 예약 실행되지 않고, UI 또는 API를 통해서 수동으로 트리거 된다. 1. 스케줄 간격 및 시작 날짜 정의 import datetime as dt # ...생략... dag=DAG( dag_id="schedule_examples", schedule_interval="@daily", # 매일 자정에 실행되도록 DAG를 스케줄하는 매크로 start_date=dt.datetime(2023.1.1), # 2023.1.1 부터 DAG 실행 스케줄을 시작할 날짜/시간 end_d..

[Airflow] 기반의 데이터 파이프라인(2) - Airflow 설치 with Docker

Airflow를 처음 접하는 저는 Window 환경에서 설치가 안된다는 사실이 책에 없어 혹시 누군가가 설치를 진행하다 포기하는 일을 방지하기 위해 작성합니다. Airflow 내부에 pwd 라이브러리등을 사용하기에 Window 환경에서 설치는 불가능 합니다. 또한 Docker가 PC에 설치되어 있어야 합니다. 1. docker-compose.yaml 다운로드 # 윈도우 wget -Uri https://airflow.apache.org/docs/apache-airflow/2.3.2/docker-compose.yaml -OutFile ./docker-compose.yaml # 맥, 리눅스 curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.2/doc..

[Airflow] 기반의 데이터 파이프라인(1)

Apache Airflow 기반의 데이터 파이프라인을 읽고 정리하였습니다. Airflow 1. 파이썬 코드로 유연한 파이프라인 정의 Airflow를 사용하면 파이프라인이나 워크플로 태스크를 방향성 비순환 그래프(DAG)로 정의할 수 있다. Airflow는 파이썬 스크립트로 DAG의 구조를 설명하고 구성하기 때문에 각 DAG 파일은 주어진 DAG에 대한 태스크 집합과 태스크 간의 의존성을 기술하고 실행 방법, 시간 등을 정의한다. 2. 파이프라인 스케줄링 및 실행 DAG로 파이프라인 구조를 정의하고 나면 Airflow가 파이프라인을 언제 실행할 것인지 각각의 DAG 실행 주기를 정의할 수 있다. 이를 통해 매시간, 매일, 매주 등의 DAG를 실행하거나 더 복잡한 스케줄을 사용할 수 있다. 3. 구성 Air..