Airflow 마스터 클래스 강의 수강, 개인 공부 후 기록합니다.
공식 문서 확인
Python Operator에서 Template 변수 사용에서 어떤 파라미터가 template 변수를 지원하는지 확인했었다.
templates_dict, op_args, op_kwargs 파라미터에 template 변수를 사용할 수 있다.
Python Operator에서 Macro 사용
Bash Operator에서 Macro를 사용했던 것과 똑같다.
@task(task_id='task_using_macro',
templates_dict={'start_date':'{{ (data_interval_end.in_timezone("Asia/Seoul") + macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}',
'end_date':'{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}'
}
)
def get_datetime_macro(**kwargs):
templates_dict = kwargs.get('templates_dict') or {}
if templates_dict:
start_date = templates_dict.get('start_date') or 'start date 없음'
end_date = templates_dict.get('end_date') or 'end_date 없음'
print(start_date)
print(end_date)
templates_dict는 get_datetime_macro의 파라미터인 kwargs로 전달된다.
start_date : 1개월 빼기, 1일로 변경이므로 전월 1일이다.
end_date : 1일로 변경 후 하루 빼기이므로 전월 말일이다.
Macro를 사용하지 않고 직접 연산
위 예시에서는 operator를 선언할 때 template 변수에 macro를 사용해 연산을 진행했다. 사실 python operator에서 macro를 굳이 사용할 필요가 없다.
macros.dateutil 등 macro 변수는 파이썬의 datetime, dateutil 라이브러리를 기반으로 하기 때문에 날짜 연산을 dag 안에서 직접 할 수도 있다. 파이썬 라이브러리를 직접 import해서 연산을 하면 된다.
@task(task_id='task_direct_calc')
def get_datetime_calc(**kwargs):
from dateutil.relativedelta import relativedelta
data_interval_end = kwargs['data_interval_end']
prev_month_day_first = data_interval_end.in_timezone('Asia/Seoul') + relativedelta(months=-1, day=1)
prev_month_day_last = data_interval_end.in_timezone('Asia/Seoul').replace(day=1) + relativedelta(days=-1)
print(prev_month_day_first.strftime('%Y-%m-%d'))
print(prev_month_day_last.strftime('%Y-%m-%d'))
Macro를 사용한 것과 실행 결과가 똑같은 것을 확인할 수 있다.
여기서 한가지 알아야 하는 것이 있다.
import pendulum
from airflow import DAG
from airflow.decorators import task
from dateutil.relativedelta import relativedelta # 라이브러리 import
with DAG(
dag_id="dags_python_with_macro",
schedule="10 0 * * *",
start_date=pendulum.datetime(2023, 12, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
위 코드처럼 라이브러리 import 부분을 맨 위에 선언할 수 있다. 그런데 스케줄러 부하 경감을 위해서는 task decorator 안에서 라이브러리를 불러와야 한다.
스케줄러는 주기적으로 dag을 파싱한다. DAG에 문법적인 오류가 있지는 않은지 주기적으로 검사하는 것이다. DAG이 실행되지 않아도 주기적으로 검사한다.
그런데 DAG 안 모든 부분을 파싱하는 것은 아니다. 파싱하는 부분은 아래와 같다.
- with DAG 선언 직전 부분(코드 최상단 부분)
- Operator 선언 직전 부분
해당 부분에 내용이 많으면 많아질수록 스케줄러에 큰 부하가 발생한다. Operator, task decorator 내부는 검사하지 않기 때문에 이 부분에서 라이브러리를 불러오는 것은 스케줄러 부하를 줄이는 방법 중 하나이다.
'Data > Airflow' 카테고리의 다른 글
Airflow | Bash Operator에서 XCom으로 Task 간 데이터 공유 (0) | 2024.04.01 |
---|---|
Airflow | Python Operator에서 XCom으로 Task 간 데이터 공유 (0) | 2024.03.30 |
Airflow | Bash Operator에서 Macro 사용 (4) | 2024.02.06 |
Airflow | Python Operator에서 Template 변수 사용 (0) | 2024.02.06 |
Airflow | 날짜 개념 (0) | 2024.02.06 |