이전 포스팅까지의 구현으로 목표로 했던 구현은 완료했으나 계속해서 개선할 점이 보여 수정한 부분이 있다.Kafka Broker 추가만약 단일 브로커 환경에서 장애가 발생하면 데이터 손실이 발생한다. 실시간 데이터를 다루는 프로젝트에서는 매우 치명적인 결함이다. 따라서 브로커를 추가했다. 브로커는 총 3대이다. Replication factor를 추가해 partition을 복제해두고 리더 장애 발생 시 다시 리더를 선출할 수 있도록 했다. 즉, 한 대의 브로커에서 장애가 발생해도 다른 브로커가 있기 때문에 고가용성을 유지할 수 있다.Topic 정보를 확인해보면 partition은 1개, replication factor는 3으로 설정되어 있다. 현재 해당 파티션에 리더 ID는 2이고 복제본이 1, 2, 3..
Data/Project
Elasticsearch ConnectorElasticsearch connector는 elasticsearch index에 문서 작업을 요청할 수 있는 sink를 제공한다.Elasticsearch 7 사용을 위해 dependency를 추가한다. org.apache.flink flink-sql-connector-elasticsearch7 3.0.1-1.17공식 문서에 data stream에 sink를 붙여 데이터를 elasticsearch에 저장하는 방법이 자세히 쓰여있다.DataStream input = ...;input.sinkTo( new Elasticsearch7SinkBuilder() .setBulkFlushMaxActions(1) // Instructs the ..
데이터 변환, 집계 처리가 필요한 이유PostgreSQL Table 생성 및 데이터 삽입에서는 데이터에 어떠한 처리도 하지 않고 카프카로부터 전달된 데이터를 바로 delivery_information 테이블에 삽입했다. 나머지 테이블은 기존 데이터에 집계 처리를 한 후 테이블에 저장된다.- pay_per_destination (배달 목적지별 음식비, 배달비 지불 총합)- charge_per_day (요일별 배달비 총합)- pay_per_category (음식 카테고리별 음식 가격 총합) 날짜, 배달 목적지를 기준으로 해당 날짜와 배달 목적지에서 발생한 배달의 음식 주문 금액, 배달비 총합 데이터를 저장하는 pay_per_destination 테이블의 예이다.날짜, 목적지가 같은 데이터끼리 음식 주문 금액..
PostgreSQL Table 생성 및 데이터 삽입Flink Job을 사용한 Kafka 스트리밍에서 kafka 메시지를 스트리밍하고 DataStream 타입인 deliveryStream 객체를 얻을 수 있었다. 이제 데이터들을 postgreSQL DB에 저장해야 한다. Java로 코드를 작성하기 때문에 JDBC를 사용해 postgreSQL 테이블에 데이터를 삽입하려고 한다. Flink 공식 문서에 JDBC connector를 사용해 어떻게 스트리밍 데이터를 DB에 저장할 수 있는지 자세히 쓰여있다. 먼저 버전을 맞추어 dependency를 추가한다. org.apache.flink flink-connector-jdbc 3.1.2-1.18버전 호환이 제대로 되지 않아 오류를 겪은 적이 많아..
Flink Job을 사용한 Kafka 메시지 스트리밍데이터 수집에서 파이썬으로 데이터를 생성하고 kafka에 메시지를 publish 했다. 이제 flink로 kafka에서 메시지를 소비하고 변환 및 집계 연산을 수행해야 한다.일단 kafka에서 파이썬으로 생성한 데이터를 그대로 잘 가져오는지 확인해본다. 환경 구축에서 archetype을 flink-quickstart-java로 지정했다면 프로젝트 파일에 아래처럼 스켈레톤 코드가 작성되어 있다.Flink 공식 문서에 kafka connector를 사용해 데이터를 읽는 방법이 자세하게 쓰여있다. At-least-once, At-most-once, Exactly-once 3가지의 이벤트 처리 보장 중 가장 신뢰할 수 있는 exactly-once 방법으로 ka..
데이터 구조Json 형식으로 kafka에 메시지를 publish한다.keyvaluedeliveryIduuiddeliveryDate배달 주문 날짜userId배달 주문 고객 idfoodCategory음식 카테고리foodPrice음식 총 가격paymentMethod결제 방식deliveryDistance배달 거리deliveryDestination배달 목적지destinationLat배달 목적지의 위도destinationLon배달 목적지의 경도deliveryCharge배달 요금 배달 목적지의 위도, 경도를 데이터에 포함하는 이유는 elasticsearch의 지도를 활용하기 위해서다. 배달 목적지에 따른 주문 빈도수를 지도로 한번에 나타내면 좋을 것 같아 위도, 경도를 저장하기로 했다.데이터 생성faker = Fak..
Docker composeFlink를 제외한 서비스들은 전부 docker 컨테이너로 수행된다. 로컬 환경이고, 작은 규모의 데이터를 처리하기 때문에 flink는 standalone 모드로 실행할 예정이나 대규모 데이터 처리, 고가용성이 요구된다면 적합하지 않은 방법이다. docker compose version은 3.8로 지정했다.Zookeeper & Kafka & Control Centerzookeeper: image: confluentinc/cp-zookeeper:7.4.0 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEE..
시스템 구조 Kafka, flink를 사용해 실시간으로 발생하는 배달 주문 데이터를 처리하고 저장한다. 데이터는 postgres, elasticsearch에 저장되며 저장된 데이터를 kibana에서 시각화한다. 환경 구축 - Docker compose 사용해 컨테이너 구성 - Zookeeper, Kafka broker, Control center, Postgres, Elasticsearch, Kibana 컨테이너 실행 및 관리 데이터 수집 - Python 코드에서 배달 주문 데이터 생성 - Kafka 사용해 메시지 publish - Confluent control center에서 kafka 모니터링 데이터 처리 - Flink job 사용해 배달 주문 내역이 저장된 토픽에서 메시지 소비, 실시간 처리 - ..