Airflow 마스터 클래스 강의 수강, 개인 공부 후 기록합니다.
XCom이란
XCom(Cross Communication)이란 airflow dag 안 task 간 데이터 공유를 위해 사용되는 기술이다.
예로 task1의 수행 중 내용이나 결과를 task2에서 사용하거나 입력으로 주고 싶은 경우에 사용한다.
- 주로 작은 규모의 데이터 공유를 위해 사용
- XCom 내용은 메타 DB의 xcom 테이블에 저장됨
- 1GB 이상의 대용량 데이터 공유는 외부 솔루션(AWS S3, HDFS 등)을 사용
Python Operator에서 XCom 사용
2가지 방법으로 XCom을 사용할 수 있다.
- **kwargs에 존재하는 task_instance 객체 활용
- 파이썬 함수의 return 값 활용
task_instance 객체 활용
Python 오퍼레이터는 **kwargs에 template 변수들을 자동으로 제공해준다.
**kwargs에 존재하는 template 변수에서 task_instance 객체를 얻을 수 있다. task_instance 객체는 xcom_push 메서드를 가지고 있다.
공식 문서를 확인해보면
key, value 형태로 저장하는 것을 알 수 있다.
xcom_push가 어떻게 동작하는지 내부 코드가 궁금해서 확인해봤다.
파라미터 설명에 execution_date는 deprecated라고 적혀있으므로 self_execution_date와 비교해 문제가 없으면 deprecated 경고 메시지만 전달하고 쓰이지 않는 것을 확인할 수 있다.
그 후에는 인자로 전달한 key, value를 XCom.set으로 저장한다.
value를 가져올 때는 xcom_pull 메서드를 사용한다. 인자로 전달한 key에 해당하는 value를 반환하며, task_ids에 task id를 명시하면 해당 task가 push한 value를 가져온다.
xcom_pull 내부 코드는 push보다 훨씬 길고 복잡하다. 주석을 읽어보면 어느 정도는 이해할 수 있으나 조금 어렵다. task_ids를 입력하지 않으면 하나의 task에서만 pull한다. task_ids가 여러개 입력되면 순회하면서 value를 가져온다.
여러 task에서 중복 key로 push 했을 때 해당 key로 pull 한다면
- task_ids 명시 X : 해당 key로 마지막에 넣었던 value 가져옴
- task_ids 명시 : 해당 task id를 가진 task가 push 했던 value 가져옴
@task(task_id='python_xcom_push_task1')
def xcom_push1(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key="result1", value="value_1")
ti.xcom_push(key="result2", value=[1, 2, 3])
@task(task_id='python_xcom_push_task2')
def xcom_push2(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key="result1", value="value_2")
ti.xcom_push(key="result2", value=[1, 2, 3, 4])
@task(task_id='python_xcom_pull_task1')
def xcom_pull1(**kwargs):
ti = kwargs['ti']
value1 = ti.xcom_pull(key="result1")
value2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task1')
print(value1)
print(value2)
@task(task_id='python_xcom_pull_task2')
def xcom_pull2(**kwargs):
ti = kwargs['ti']
value1 = ti.xcom_pull(key="result2", task_ids=['python_xcom_push_task1', 'python_xcom_push_task2'])
print(value1)
xcom_push1() >> xcom_push2() >> [xcom_pull1(), xcom_pull2()]
python_xcom_push_task1 | |
result1 | "value_1" |
result2 | [1, 2, 3] |
python_xcom_push_task2 | |
result1 | "value_2" |
result2 | [1, 2, 3, 4] |
위 두 task는 동일한 key에 대해 다른 value를 저장한다.
python_xcom_pull_task1
- value1 : key(=result1)만 명시, 마지막에 push한 value만 pull하므로 "value_2"
- value2 : key(=result2), task_ids 명시, 'python_xcom_push_task1'이 push한 [1, 2, 3]
python_xcom_pull_task2
- value1 : key(=result2), task_ids 명시, 'python_xcom_push_task1', 'python_xcom_push_task2'가 push한 [1, 2, 3], [1, 2, 3, 4]
예상한 결과가 맞을지 로그를 확인한다.
예상한 value를 가져온다.
return 값 활용 (1)
@task(task_id='python_xcom_push_by_return')
def xcom_push_result(**kwargs):
return 'Success'
@task(task_id='python_xcom_pull_2')
def xcom_pull_2(status, **kwargs):
print('함수 입력값으로 받은 값:' + status)
python_xcom_push_by_return = xcom_push_result() # String 값이 아닌 airflow task 객체
xcom_pull_2(python_xcom_push_by_return)
xcom_pull_2(xcom_push_result()) # 이렇게도 가능
python_xcom_push_by_return은 단순히 String 값이 아니라 airflow task 객체이다.
Task decorator를 사용했다면 함수 입출력 관계만으로 task flow를 정의할 수 있다. Task decorator 사용을 권장하는 이유 중 하나다.
xcom_push_result가 먼저 실행된 후 xcom_pull_2가 실행된다.
return 값 활용 (2)
Airflow는 파이썬 함수의 return 값을 자동으로 XCom에 저장한다. 이것 또한 task decorator 사용을 권장하는 이유 중 하나다.
key는 'return_value'로 고정되어 저장된다. 만약 return한 task가 여러개라면 task_ids에 task id를 명시하면 된다.
사실 return 값을 pull할 때는 key를 꼭 명시하지 않아도 된다. 자동으로 'return_value' key의 value를 찾는다.
@task(task_id='python_xcom_push_by_return')
def xcom_push_result(**kwargs):
return 'Success'
@task(task_id='python_xcom_push_by_return2')
def xcom_push_result2(**kwargs):
return 'Success 2'
@task(task_id='python_xcom_pull_1')
def xcom_pull_1(**kwargs):
ti = kwargs['ti']
value1 = ti.xcom_pull()
value2 = ti.xcom_pull(task_ids='python_xcom_push_by_return')
print('key, task_ids 명시 X : ' + value1)
print('task_ids 명시 : ' + value2)
python_xcom_push_by_return = xcom_push_result() # 단순히 String 값이 아닌 airflow task 객체
python_xcom_push_by_return2 = xcom_push_result2()
python_xcom_push_by_return >> python_xcom_push_by_return2 >> xcom_pull_1()
key와 task_ids 모두 명시하지 않았을 때 return한 task가 여러개라면 마지막에 return한 값을 가져온다.
task_ids에 id를 명시한 경우는 해당 task의 return 값을 가져온다.
값은 잘 가져오지만 task id를 명시하지 않으면 task가 더 많아졌을 때 어떤 task의 return 값을 가져오는지 알기 힘들다. task id는 꼭 지정해주는 것이 좋을 것 같다.
또 봐야하는 것이 task flow이다.
python_xcom_push_by_return = xcom_push_result() # 단순히 String 값이 아닌 airflow task 객체
python_xcom_push_by_return2 = xcom_push_result2()
python_xcom_push_by_return >> python_xcom_push_by_return2 >> xcom_pull_1()
xcom_push_result()의 return 값이 'Success'이니 python_xcom_push_by_return에는 문자열 값이 들어가겠구나라고 생각하면 안 된다.
python_xcom_push_by_return은 단순히 String 값이 아니라 airflow task 객체이기 때문에 task flow에 작성하는 것이 가능하다.
참고로 airflow UI에서 XCom 탭을 확인하면 해당 task에서 push한 key, value를 확인할 수 있다.
'Data > Airflow' 카테고리의 다른 글
Airflow | 전역 변수 Variable (0) | 2024.04.12 |
---|---|
Airflow | Bash Operator에서 XCom으로 Task 간 데이터 공유 (0) | 2024.04.01 |
Airflow | Python Operator에서 Macro 사용 (0) | 2024.03.30 |
Airflow | Bash Operator에서 Macro 사용 (4) | 2024.02.06 |
Airflow | Python Operator에서 Template 변수 사용 (0) | 2024.02.06 |