데이터 변환, 집계 처리가 필요한 이유
PostgreSQL Table 생성 및 데이터 삽입에서는 데이터에 어떠한 처리도 하지 않고 카프카로부터 전달된 데이터를 바로 delivery_information 테이블에 삽입했다.
나머지 테이블은 기존 데이터에 집계 처리를 한 후 테이블에 저장된다.
- pay_per_destination (배달 목적지별 음식비, 배달비 지불 총합)
- charge_per_day (요일별 배달비 총합)
- pay_per_category (음식 카테고리별 음식 가격 총합)
날짜, 배달 목적지를 기준으로 해당 날짜와 배달 목적지에서 발생한 배달의 음식 주문 금액, 배달비 총합 데이터를 저장하는 pay_per_destination 테이블의 예이다.
날짜, 목적지가 같은 데이터끼리 음식 주문 금액, 배달비를 더한 후 테이블에 삽입한다. 실시간 데이터이기 때문에 계속해서 새로운 데이터가 들어오면 값이 누적되어야 한다. 중요한 것은 집계 연산이 적용되는 데이터를 그룹화하는데 필요한 기준이 2개라는 것이다.
delivery_date, delivery_destination 값이 하나라도 다르면 완전히 다른 그룹에 속해야 한다.
(2024-03-15, 서울특별시 광진구), (2024-03-16, 서울특별시 광진구) 값을 가지는 원본 데이터는
delivery_destination 값은 같지만 delivery_date가 다르므로 같은 그룹으로 묶일 수 없다.
이 부분이 꽤나 고민스러웠다. delivery_date는 저장하지 않고 delivery_destination만을 기준으로 집계 연산을 적용한다면 구현은 쉬울테지만 저 테이블이 데이터로써 가치가 있을까 싶었다.
극단적으로 생각해서 데이터 수집이 영원히 계속됐을 때 날짜는 없고 특정 목적지에서 음식 주문비, 배달비로 얼마나 지불하는지만 계속해서 쌓이는 것이지 언제부터 언제까지 수집된 데이터인지, 월별 차이는 없는지 등 알기 어렵고 인사이트를 도출해내기 어렵다는 생각이 들었다. 물론 날짜를 추가한다고 해서 저 테이블이 좋은 인사이트를 도출해낼 수 있나라는 질문에는 명확히 답변은 어렵지만... 특정 기간, 특정 목적지에서 음식 주문 금액, 배달비가 유독 적다면 배달비가 적은 해당 지역 가게를 상단에 노출해 주문을 유도하거나 등등... 그런데 만약 데이터 수집 기간에 따라 db파일을 분리한다고 하면 delivery_date가 필요하지 않을 것 같다. 이 부분을 고민하다가 시계열 데이터베이스에 대해 공부하게 됐는데 이 부분은 따로 포스팅할 예정이다.
Flink Operators
데이터 스트림에 연산을 적용하기 위해 flink operator 공식 문서를 확인한다.
공식 문서에 따르면 operator는 데이터 스트림을 새로운 데이터 스트림으로 변환한다.
원래의 데이터 스트림 deliveryStream에 연산을 적용해서 새로운 데이터 스트림으로 변환하고 그에 sink를 붙여서 테이블에 삽입하면 된다.
Map
데이터 스트림의 한 요소를 하나의 요소로 변환한다. Map은 DataStream을 새로운 DataStream으로 변환한다.
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
Integer로 이루어진 데이터 스트림의 한 요소에 2를 곱한 후 반환하는 예시다.
FlatMap과는 차이가 있다. FlatMap 연산은 한 요소를 여러 개의 요소로도 반환할 수 있다. 문자열 데이터로 이루어진 데이터 스트림의 한 문자열을 단어로 분할하고 반환하는 것이 그 예이다.
Delivery 클래스의 객체에서 필요한 정보를 이용해 PayPerDestination 객체를 생성하고 반환할 것이므로 map 연산을 사용하면 된다.
public class PayPerDestination {
private Date deliveryDate;
private String deliveryDestination;
private BigDecimal totalFoodPrice;
private BigDecimal totalDeliveryCharge;
}
deliveryStream.map(
delivery -> {
Date deliveryDate = Date.valueOf(delivery.getDeliveryDate().toLocalDateTime().toLocalDate());
StringTokenizer tokens = new StringTokenizer(delivery.getDeliveryDestination(), " ");
String deliveryDestination = tokens.nextToken() + " " + tokens.nextToken();
BigDecimal foodPrice = BigDecimal.valueOf(delivery.getFoodPrice());
BigDecimal deliveryCharge = BigDecimal.valueOf(delivery.getDeliveryCharge());
return new PayPerDestination(deliveryDate, deliveryDestination, foodPrice, deliveryCharge);
}
)
deliveryStream에 map 연산을 통해 PayPerDestination 객체를 생성하고 반환한다.
KeyBy
스트림을 서로 분리된 파티션으로 분할한다. 동일한 키를 가진 레코드는 동일한 파티션으로 할당된다. KeyBy는 DataStream을 KeyedStream으로 변환한다.
날짜, 배달 목적지가 같은 데이터끼리 묶기 위해서는 keyBy 연산이 필요하다. SQL 쿼리에서 GROUP BY와 같다. 이 부분에서 조금 헤맸는데 공식 문서를 더 찾아보니 두가지 기준으로 데이터를 그룹화하기 위해서는 tuple을 사용하면 된다고 쓰여져 있다. 구글링을 조금 더 해보고 해결할 수 있었다.
map(
...
).keyBy(new KeySelector<PayPerDestination, Tuple2<Date, String>>() {
@Override
public Tuple2<Date, String> getKey(PayPerDestination payPerDestination) throws Exception {
return Tuple2.of(payPerDestination.getDeliveryDate(), payPerDestination.getDeliveryDestination());
}
})
KeySelector를 사용해 데이터 스트림의 각 요소에서 키를 추출한다.
Date 타입인 배달 날짜 String 타입인 배달 목적지를 기준으로 키를 생성한다.
Reduce
KeyBy 연산으로 변환된 KeyedStream에 reduce 연산을 적용해 같은 그룹의 데이터끼리 음식비, 배달비를 집계한다. Reduce는 KeyedStream을 DataStream으로 변환한다.
keyBy(
...
)
.reduce((t1, payPerDestination) -> {
t1.setTotalFoodPrice(payPerDestination.getTotalFoodPrice().add(t1.getTotalFoodPrice()));
t1.setTotalDeliveryCharge(payPerDestination.getTotalDeliveryCharge().add(t1.getTotalDeliveryCharge()));
return t1;
})
t1, payPerDestination 모두 PayPerDestination 클래스의 인스턴스이며, t1과 payPerDestination의 totalFoodPrice, totalDeliveryCharge 값을 각각 더한다.
pay_per_destination table 데이터 삽입
reduce(
...
).addSink(JdbcSink.sink(
"INSERT INTO pay_per_destination (delivery_date, delivery_destination, total_food_price, total_delivery_charge) " +
"VALUES (?, ?, ?, ?) " +
"ON CONFLICT (delivery_date, delivery_destination) DO UPDATE SET " +
"total_food_price = EXCLUDED.total_food_price, " +
"total_delivery_charge = EXCLUDED.total_delivery_charge",
(JdbcStatementBuilder<PayPerDestination>) (preparedStatement, payPerDestination) -> {
preparedStatement.setDate(1, payPerDestination.getDeliveryDate());
preparedStatement.setString(2, payPerDestination.getDeliveryDestination());
preparedStatement.setBigDecimal(3, payPerDestination.getTotalFoodPrice());
preparedStatement.setBigDecimal(4, payPerDestination.getTotalDeliveryCharge());
},
executionOptions,
connectionOptions
)).name("Insert into pay_per_destination table");
Reduce 연산 후 변환된 DataStream에 jdbc sink를 붙여 데이터를 삽입한다.
Upsert 쿼리를 사용해 pay_per_destination에 데이터를 삽입하는데 delivery_date, delivery_destination이 중복될 경우 새로운 누적 금액으로 업데이트한다.
Flink job을 실행하고 DB 테이블을 조회하면
금액이 누적되어 삽입되는 것을 확인할 수 있다. 계속해서 데이터가 생성되면 누적 값이 실시간으로 업데이트된다.
charge_per_day table 데이터 삽입
month(월), day(요일)를 기준으로 데이터를 그룹화하고 배달비 총합을 charge_per_day 테이블에 삽입한다.
public class ChargePerDay {
private int month;
private String day;
private BigDecimal totalDeliveryCharge;
}
Delivery 클래스의 객체에서 필요한 정보를 이용해 ChargePerDay 클래스 객체를 생성하고 keyBy, reduce 연산을 거친다. 이후 jdbc sink를 붙여 데이터를 테이블에 삽입한다.
addSink(JdbcSink.sink(
"INSERT INTO charge_per_day (month, day, total_delivery_charge) " +
"VALUES (?, ?, ?) " +
"ON CONFLICT (month, day) DO UPDATE SET " +
"total_delivery_charge = EXCLUDED.total_delivery_charge",
(JdbcStatementBuilder<ChargePerDay>) (preparedStatement, chargePerDay) -> {
preparedStatement.setInt(1, chargePerDay.getMonth());
preparedStatement.setString(2, chargePerDay.getDay());
preparedStatement.setBigDecimal(3, chargePerDay.getTotalDeliveryCharge());
},
executionOptions,
connectionOptions
)).name("Insert into charge_per_day table");
pay_per_category table 데이터 삽입
delivery_date, food_category를 기준으로 데이터를 그룹화하고 카테고리별 음식비 총합을 pay_per_category 테이블에 삽입한다.
public class PayPerCategory {
private Date deliveryDate;
private String foodCategory;
private BigDecimal totalFoodPrice;
}
Delivery 클래스의 객체에서 필요한 정보를 이용해 PayPerCategory 클래스 객체를 생성하고 keyBy, reduce 연산을 거친다. 이후 jdbc sink를 붙여 데이터를 테이블에 삽입한다.
addSink(JdbcSink.sink(
"INSERT INTO pay_per_category (delivery_date, food_category, total_food_price) " +
"VALUES(?, ?, ?) " +
"ON CONFLICT (delivery_date, food_category) DO UPDATE SET " +
"total_food_price = EXCLUDED.total_food_price",
(JdbcStatementBuilder<PayPerCategory>) (preparedStatement, payPerCategory) -> {
preparedStatement.setDate(1, payPerCategory.getDeliveryDate());
preparedStatement.setString(2, payPerCategory.getFoodCategory());
preparedStatement.setBigDecimal(3, payPerCategory.getTotalFoodPrice());
},
executionOptions,
connectionOptions
)).name("Insert into pay_per_category");
Github
'Data > Project' 카테고리의 다른 글
Project | 실시간 배달 주문 데이터 처리 - Kafka Broker 추가 (0) | 2024.05.29 |
---|---|
Project | 실시간 배달 주문 데이터 처리 - Elasticsearch, Kibana를 이용한 데이터 시각화 (0) | 2024.05.06 |
Project | 실시간 배달 주문 데이터 처리 - PostgreSQL Table 생성 및 데이터 삽입 (0) | 2024.05.03 |
Project | 실시간 배달 주문 데이터 처리 - Flink Job을 사용한 Kafka 스트리밍 (0) | 2024.03.27 |
Project | 실시간 배달 주문 데이터 처리 - 데이터 수집 (2) | 2024.03.18 |