전체 글

· Data/Project
이전 포스팅까지의 구현으로 목표로 했던 구현은 완료했으나 계속해서 개선할 점이 보여 수정한 부분이 있다.Kafka Broker 추가만약 단일 브로커 환경에서 장애가 발생하면 데이터 손실이 발생한다. 실시간 데이터를 다루는 프로젝트에서는 매우 치명적인 결함이다. 따라서 브로커를 추가했다. 브로커는 총 3대이다. Replication factor를 추가해 partition을 복제해두고 리더 장애 발생 시 다시 리더를 선출할 수 있도록 했다. 즉, 한 대의 브로커에서 장애가 발생해도 다른 브로커가 있기 때문에 고가용성을 유지할 수 있다.Topic 정보를 확인해보면 partition은 1개, replication factor는 3으로 설정되어 있다. 현재 해당 파티션에 리더 ID는 2이고 복제본이 1, 2, 3..
· Data/Project
Elasticsearch ConnectorElasticsearch connector는 elasticsearch index에 문서 작업을 요청할 수 있는 sink를 제공한다.Elasticsearch 7 사용을 위해 dependency를 추가한다. org.apache.flink flink-sql-connector-elasticsearch7 3.0.1-1.17공식 문서에 data stream에 sink를 붙여 데이터를 elasticsearch에 저장하는 방법이 자세히 쓰여있다.DataStream input = ...;input.sinkTo( new Elasticsearch7SinkBuilder() .setBulkFlushMaxActions(1) // Instructs the ..
· Data/Project
데이터 변환, 집계 처리가 필요한 이유PostgreSQL Table 생성 및 데이터 삽입에서는 데이터에 어떠한 처리도 하지 않고 카프카로부터 전달된 데이터를 바로 delivery_information 테이블에 삽입했다. 나머지 테이블은 기존 데이터에 집계 처리를 한 후 테이블에 저장된다.- pay_per_destination (배달 목적지별 음식비, 배달비 지불 총합)- charge_per_day (요일별 배달비 총합)- pay_per_category (음식 카테고리별 음식 가격 총합) 날짜, 배달 목적지를 기준으로 해당 날짜와 배달 목적지에서 발생한 배달의 음식 주문 금액, 배달비 총합 데이터를 저장하는 pay_per_destination 테이블의 예이다.날짜, 목적지가 같은 데이터끼리 음식 주문 금액..
· Data/Project
PostgreSQL Table 생성 및 데이터 삽입Flink Job을 사용한 Kafka 스트리밍에서 kafka 메시지를 스트리밍하고 DataStream 타입인 deliveryStream 객체를 얻을 수 있었다.  이제 데이터들을 postgreSQL DB에 저장해야 한다. Java로 코드를 작성하기 때문에 JDBC를 사용해 postgreSQL 테이블에 데이터를 삽입하려고 한다. Flink 공식 문서에 JDBC connector를 사용해 어떻게 스트리밍 데이터를 DB에 저장할 수 있는지 자세히 쓰여있다. 먼저 버전을 맞추어 dependency를 추가한다. org.apache.flink flink-connector-jdbc 3.1.2-1.18버전 호환이 제대로 되지 않아 오류를 겪은 적이 많아..
· Data/Airflow
XCom과 Variable의 차이 XCom으로는 특정 DAG, 특정 schedule에 수행되는 task 간에만 데이터를 공유할 수 있었다. Variable을 사용하면 모든 DAG에서 공통적으로 사용할 전역 변수를 등록하고 사용할 수 있다. 즉, 모든 DAG이 공유할 수 있는 전역 변수이다. Variable 등록 Airflow UI에서 쉽게 등록할 수 있다. 실제 등록한 Variable의 key, value 값은 메타 DB variable 테이블에 저장된다. Variable 사용 2가지 방법으로 Variable 사용이 가능하다. Variable 라이브러리 이용 Jinja Template 이용 (권고) Variable 라이브러리 이용 라이브러리를 이용해 operator 외부에서 변수를 가져오는 방법이다. i..
· Data/Airflow
Airflow 마스터 클래스 강의 수강, 개인 공부 후 기록합니다. XCom이란 XCom(Cross Communication)이란 airflow dag 안 task 간 데이터 공유를 위해 사용되는 기술이다. 예로 task1의 수행 중 내용이나 결과를 task2에서 사용하거나 입력으로 주고 싶은 경우에 사용한다. - 주로 작은 규모의 데이터 공유를 위해 사용 - XCom 내용은 메타 DB의 xcom 테이블에 저장됨 - 1GB 이상의 대용량 데이터 공유는 외부 솔루션(AWS S3, HDFS 등)을 사용 Bash Operator에서 XCom 사용 Template 변수인 ti(Task Instance)를 사용해 push, pull이 가능하다. Bash operator에서는 env, bash_command 파라미..
· Data/Airflow
Airflow 마스터 클래스 강의 수강, 개인 공부 후 기록합니다.XCom이란XCom(Cross Communication)이란 airflow dag 안 task 간 데이터 공유를 위해 사용되는 기술이다.예로 task1의 수행 중 내용이나 결과를 task2에서 사용하거나 입력으로 주고 싶은 경우에 사용한다. - 주로 작은 규모의 데이터 공유를 위해 사용- XCom 내용은 메타 DB의 xcom 테이블에 저장됨- 1GB 이상의 대용량 데이터 공유는 외부 솔루션(AWS S3, HDFS 등)을 사용Python Operator에서 XCom 사용2가지 방법으로 XCom을 사용할 수 있다.**kwargs에 존재하는 task_instance 객체 활용파이썬 함수의 return 값 활용task_instance 객체 활용P..
· Data/Airflow
Airflow 마스터 클래스 강의 수강, 개인 공부 후 기록합니다. 공식 문서 확인 Python Operator에서 Template 변수 사용에서 어떤 파라미터가 template 변수를 지원하는지 확인했었다. templates_dict, op_args, op_kwargs 파라미터에 template 변수를 사용할 수 있다. Python Operator에서 Macro 사용 Bash Operator에서 Macro를 사용했던 것과 똑같다. @task(task_id='task_using_macro', templates_dict={'start_date':'{{ (data_interval_end.in_timezone("Asia/Seoul") + macros.dateutil.relativedelta.relativede..
u_hajin
꽉자바