Elasticsearch Connector
Elasticsearch connector는 elasticsearch index에 문서 작업을 요청할 수 있는 sink를 제공한다.
Elasticsearch 7 사용을 위해 dependency를 추가한다.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-elasticsearch7</artifactId>
<version>3.0.1-1.17</version>
</dependency>
공식 문서에 data stream에 sink를 붙여 데이터를 elasticsearch에 저장하는 방법이 자세히 쓰여있다.
DataStream<String> input = ...;
input.sinkTo(
new Elasticsearch7SinkBuilder<String>()
.setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
.setHosts(new HttpHost("127.0.0.1", 9200, "http"))
.setEmitter(
(element, context, indexer) ->
indexer.add(createIndexRequest(element)))
.build());
private static IndexRequest createIndexRequest(String element) {
Map<String, Object> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.id(element)
.source(json);
}
setHosts를 통해 Elasticsearch 클러스터의 hostname, port를 설정한다.
Emitter는 데이터 요소를 elasticsearch에 저장하기 위한 action으로 변환하는 역할을 담당한다.
element는 데이터 요소이므로 Delivery 클래스 객체가 된다. Context는 데이터 처리에 대한 추가 컨텍스트를 제공하는데 사용되고 타임스탬프, 워터마크 등을 설정할 수 있다.
Index request를 생성할 때 예시에서는 HashMap에 데이터 요소를 put하고 source에 설정했다. 프로젝트에서는 지도 시각화를 사용하기 위해 Delivery 클래스 객체 구조를 변경하고 json으로 변환하는 클래스를 따로 생성할 것이다.
Kibana 지도 사용을 위한 설정
필드의 타입이 geo_point이면 kibana에서 읽어와 지도 위에 데이터를 시각화할 수 있다. 공식 문서에서는 6가지의 방법을 소개하고 있다.
PUT my-index-000001
{
"mappings": {
"properties": {
"location": {
"type": "geo_point"
}
}
}
}
PUT my-index-000001/_doc/3
{
"text": "Geopoint as an object with 'lat' and 'lon' keys",
"location": {
"lat": 41.12,
"lon": -71.34
}
}
그중 location 필드의 타입을 geo_point로 설정하고, location 안에 lat, lon 필드가 중첩되는 형태를 선택했다.
만약 location 필드 타입을 매핑하지 않고 아래처럼 해버리면
PUT test
PUT test/_doc/1
{
"location" : {
"lat": 41.12,
"lon": -71.34
}
}
지도에 layer를 아예 추가할 수 없으니 location 타입이 꼭 geo_point로 설정되어 있어야 한다.
배달 주문 정보에는 destinationLat, destinationLon 데이터(배달 목적지 위도, 경도)가 존재한다. 이 값을 사용해 어떤 지역에서 배달 주문이 발생했는지 지도로 한눈에 확인할 수 있도록 하는 것이 목적이다.
만약 지도를 사용하지 않았다면 단순히 ObjectMapper로 자바 객체를 json 문자열로 변환해 source에 설정하면 된다.
그런데 기존 배달 주문 정보는 destinationLat, destinationLon이 location 안으로 중첩되어 있지 않기 때문에 노드를 구성해 구조를 바꿔주어야 한다.
// public static String convertDeliveryDataToJson(Delivery delivery)
ObjectNode rootNode = OBJECT_MAPPER.createObjectNode();
rootNode.put("deliveryId", delivery.getDeliveryId());
rootNode.put("deliveryDate", delivery.getDeliveryDate().toString());
rootNode.put("userId", delivery.getUserId());
rootNode.put("foodCategory", delivery.getFoodCategory());
rootNode.put("foodPrice", delivery.getFoodPrice());
rootNode.put("paymentMethod", delivery.getPaymentMethod());
rootNode.put("deliveryDistance", delivery.getDeliveryDistance());
rootNode.put("deliveryDestination", delivery.getDeliveryDestination());
rootNode.put("deliveryCharge", delivery.getDeliveryCharge());
ObjectNode locationNode = OBJECT_MAPPER.createObjectNode();
locationNode.put("lat", delivery.getDestinationLat());
locationNode.put("lon", delivery.getDestinationLon());
rootNode.set("location", locationNode);
return OBJECT_MAPPER.writeValueAsString(rootNode);
Elasticsearch Sink
이제 data stream에 sink를 붙여 데이터를 elasticsearch에 저장할 수 있다.
deliveryStream.sinkTo(
new Elasticsearch7SinkBuilder<Delivery>()
.setHosts(new HttpHost("localhost", 9200, "http"))
.setEmitter((delivery, context, indexer) -> {
String json = convertDeliveryDataToJson(delivery);
IndexRequest indexRequest = Requests.indexRequest()
.index("delivery")
.id(delivery.getDeliveryId())
.source(json, XContentType.JSON);
indexer.add(indexRequest);
}).build()
).name("Elasticsearch sink");
인덱스명을 delivery로 설정하고 여러 가지 설정 후 빌드하면 된다.
Flink job을 실행해보면 elasticsearch sink가 제대로 붙어있는 것을 확인할 수 있다.
Console을 사용해 데이터가 잘 저장, 조회되는지 확인한다.
바꿔준 구조대로 저장되는 것을 확인할 수 있다.
데이터 시각화
Map
Kibana 메뉴에서 Maps를 선택해 지도 위에 데이터를 시각화할 수 있다.
필드 타입이 geo_point면 data view를 추가했을 때 해당 필드를 선택할 수 있다.
단순히 위도, 경도 값을 가진 데이터를 점으로 찍어줄 수도 있고,
Heat map으로 배달 주문이 많은 지역을 한눈에 확인할 수 있다. 실시간으로 계속해서 업데이트된다.
이외에도 여러 가지 기법으로 지도 위에 데이터를 찍어줄 수 있다.
Dashboard
지도에서는 위도, 경도 정보만 사용했고, 다른 값들을 사용해 dashboard에서 시각화할 수 있다.
여러 가지 정보들을 조합하면서 실시간 데이터를 시각화해봤다. 데이터가 계속해서 들어오면 추가적인 동작을 하지 않아도 실시간으로 시각화 정보도 알아서 업데이트된다.
Github
'Data > Project' 카테고리의 다른 글
Project | 실시간 배달 주문 데이터 처리 - Kafka Broker 추가 (0) | 2024.05.29 |
---|---|
Project | 실시간 배달 주문 데이터 처리 - Flink Map, KeyBy, Reduce를 이용한 데이터 변환, 집계 처리 (2) | 2024.05.03 |
Project | 실시간 배달 주문 데이터 처리 - PostgreSQL Table 생성 및 데이터 삽입 (0) | 2024.05.03 |
Project | 실시간 배달 주문 데이터 처리 - Flink Job을 사용한 Kafka 스트리밍 (0) | 2024.03.27 |
Project | 실시간 배달 주문 데이터 처리 - 데이터 수집 (2) | 2024.03.18 |