Airflow 마스터 클래스 강의 수강, 개인 공부 후 기록합니다.
XCom이란
XCom(Cross Communication)이란 airflow dag 안 task 간 데이터 공유를 위해 사용되는 기술이다.
예로 task1의 수행 중 내용이나 결과를 task2에서 사용하거나 입력으로 주고 싶은 경우에 사용한다.
- 주로 작은 규모의 데이터 공유를 위해 사용
- XCom 내용은 메타 DB의 xcom 테이블에 저장됨
- 1GB 이상의 대용량 데이터 공유는 외부 솔루션(AWS S3, HDFS 등)을 사용
Bash Operator에서 XCom 사용
Template 변수인 ti(Task Instance)를 사용해 push, pull이 가능하다. Bash operator에서는 env, bash_command 파라미터에 template 변수 사용이 가능했다.
또 Airflow가 파이썬 함수의 return 값을 자동으로 XCom에 저장했던 것과 같이 bash_command에서 echo를 통해 출력하는 값은 return 값으로 간주된다. 따라서 echo를 통해 출력하는 값은 'return_value' key를 가지고 XCom에 저장된다.
하지만 echo로 많은 값들을 출력해도 그 값들이 전부 저장되는 것이 아닌 마지막 출력 값만 저장되므로 주의해야 한다.
xcom_pull 메서드를 사용할 때 task_ids만 지정하면 key는 지정하지 않아도 'return_value'가 된다.
bash_push = BashOperator(
task_id='bash_push',
bash_command="echo START && "
"echo XCOM_PUSHED "
"{{ ti.xcom_push(key='bash_pushed', value='first_bash_message') }} && "
"echo COMPLETE"
)
bash_pull = BashOperator(
task_id='bash_pull',
env={'PUSHED_VALUE':"{{ ti.xcom_pull(key='bash_pushed') }}",
'RETURN_VALUE':"{{ ti.xcom_pull(task_ids='bash_push') }}"},
bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE ",
do_xcom_push=False
)
bash_push >> bash_pull
bash_push task에 의해 XCom에 push되는 값:
- ti.xcom_push: bash_pushed(=key), first_bash_message(=value)
- return 값: return_value(=key), COMPLETE(=value) (마지막으로 출력한 값이 COMPLETE이므로)
bash_pull task에 의해 XCom에서 pull되는 값:
- key(=bash_pushed)만 지정, 해당 key로 push한 task는 하나이므로 first_bash_message를 꺼내옴
- task_ids(=bash_push)만 지정, return_value인 COMPLETE를 꺼내옴
do_xcom_push가 False이면 echo로 출력하는 값을 XCom에 push하지 않는다.
'Data > Airflow' 카테고리의 다른 글
Airflow | 전역 변수 Variable (0) | 2024.04.12 |
---|---|
Airflow | Python Operator에서 XCom으로 Task 간 데이터 공유 (0) | 2024.03.30 |
Airflow | Python Operator에서 Macro 사용 (0) | 2024.03.30 |
Airflow | Bash Operator에서 Macro 사용 (4) | 2024.02.06 |
Airflow | Python Operator에서 Template 변수 사용 (0) | 2024.02.06 |