Airflow 마스터 클래스 강의 수강, 개인 공부 후 기록합니다.
Macro 변수 필요성
dag이 실행되는 시점에 값이 들어가야 할 때 template 변수를 사용했다.
DB 테이블이 존재하고 daily ETL 처리를 위한 조회 쿼리가 매일 0시에 실행된다고 가정할 때
data_interval_start, data_interval_end template 변수를 사용하면 매일 날짜를 바꿔줄 필요 없이 하루동안 쌓인 데이터를 조회할 수 있었다.
그런데 이와는 달리 복잡한 경우가 있다.
dag이 매월 말일(0 0 L * *)에 도는 스케줄을 가지고 있을 때 전월 말일부터 1일 전까지의 데이터만 조회하고 싶을 때이다.
SELECT NAME, ADDRESS
FROM TBL_REG
WHERE REG_DATE BETWEEN ?? AND
??
배치일이 1월 31일이라면 12월 31일부터 1월 30일까지,
배치일이 2월 28일이라면 1월 31일부터 2월 27일까지의 데이터만 조회하고 싶은 경우이다.
전월 말일(=이전 배치일)은 data_interval_start로 해결이 가능하나 1일 전은 data_interval_end에서 하루를 뺀 값이다. 단순히 template 변수만을 사용해서는 해결할 수 없다.
from : {{ data_interval_start }}
to : {{ data_interval_end }} - 1 day
이때 macro를 사용해 template 변수 기반 다양한 날짜 연산을 수행할 수 있다.
공식 문서 확인
Template reference에서 template 변수 기반 날짜 연산이 가능하도록 하는 연산 모듈을 확인할 수 있다.
시간, 날짜와 관련된 macro 변수는 4가지인데 그 중 datetime, dateutil이 가장 많이 쓰인다.
파이썬의 datetime, dateutil 라이브러리를 기반으로 하기 때문에 macro를 잘 쓰려면 해당 라이브러리에 익숙해져야 한다.
파이썬의 datetime, dateutil 라이브러리
Airflow에서 macro를 사용하기 전에 파이썬의 라이브러리를 사용해본다.
from datetime import datetime
from dateutil import relativedelta
now = datetime(year=2024, month=3, day=31)
print('현재 날짜 : ' + str(now))
print('---------- 월 연산 ----------')
print(now + relativedelta.relativedelta(month=1)) # 1월로 변경
print(now.replace(month=1)) # 1월로 변경
print(now + relativedelta.relativedelta(months=-1)) # 1개월 빼기
print('---------- 일 연산 ----------')
print(now + relativedelta.relativedelta(day=1)) # 1일로 변경
print(now.replace(day=1)) # 1일로 변경
print(now + relativedelta.relativedelta(days=-1)) # 1일 빼기
print('---------- 복합 연산 ----------')
print(now + relativedelta.relativedelta(months=-1) + relativedelta.relativedelta(days=-1)) # 1개월, 1일 빼기
print(now + relativedelta.relativedelta(month=1) + relativedelta.relativedelta(days=-1)) # 1월로 변경, 1일 빼기
datetime 객체에 연산을 적용하면 쉽게 다양한 날짜 연산이 가능하다.
현재 날짜가 3월 31일이고 단순히 1개월을 빼면 2월 31일이지만 존재하지 않으므로 알아서 가장 가까운 말일 29일로 연산이 적용된다.
now + relativedelta.relativedelta(months=-1) # 1개월 빼기
now - relativedelta.relativedelta(months=1) # 1개월 빼기
위 코드의 결과는 같다. "-"의 위치만 다르기 때문에 편한 방식을 사용하면 된다.
Bash Operator에서 Macro 사용
Bash 오퍼레이터에서 env 파라미터에 template 변수 사용이 가능했다.
따라서 env에 macro를 사용한 결과를 저장하고 출력해본다.
1. 매월 말일(10 1 L * *) 수행되는 dag에서 전월 말일, 1일 전으로 env 설정
with DAG(
dag_id="dags_bash_with_macro_eg1",
schedule="10 0 L * *",
start_date=pendulum.datetime(2023, 12, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
# START_DATE: 전월 말일, END_DATE: 1일 전
bash_task_1 = BashOperator(
task_id="bash_task_1",
env={
'START_DATE':'{{ data_interval_start.in_timezone("Asia/Seoul") | ds }}',
'END_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=1)) | ds }}'
},
bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
)
결과를 확인해보면 11월 30일에 dag이 돌았고 START_DATE는 전월 말일(10월 31일), END_DATE는 실행일 1일 전(11월 29일)으로 잘 계산된 것을 확인할 수 있다.
2. 매월 두번째 토요일(10 0 * * 6#2)에 수행되는 dag에서 2주 전 월요일, 2주 전 토요일로 env 설정
with DAG(
dag_id="dags_bash_with_macro_eg2",
schedule="10 0 * * 6#2",
start_date=pendulum.datetime(2023, 12, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
# START_DATE: 2주 전 월요일, END_DATE: 2주 전 토요일
bash_task_2 = BashOperator(
task_id='bash_task_2',
env={
'START_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=19)) | ds }}',
'END_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=14)) | ds }}'
},
bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
)
결과를 확인해보면 1월 13일에 dag이 돌았고 START_DATE는 2주 전 월요일(12월 25일), END_DATE는 2주 전 토요일(12월 30일)로 잘 계산된 것을 확인할 수 있다.
'Data > Airflow' 카테고리의 다른 글
Airflow | Python Operator에서 XCom으로 Task 간 데이터 공유 (0) | 2024.03.30 |
---|---|
Airflow | Python Operator에서 Macro 사용 (0) | 2024.03.30 |
Airflow | Python Operator에서 Template 변수 사용 (0) | 2024.02.06 |
Airflow | 날짜 개념 (0) | 2024.02.06 |
Airflow | Bash Operator에서 Template 변수 사용 (0) | 2024.02.05 |