PostgreSQL Table 생성 및 데이터 삽입
Flink Job을 사용한 Kafka 스트리밍에서 kafka 메시지를 스트리밍하고 DataStream 타입인 deliveryStream 객체를 얻을 수 있었다. 이제 데이터들을 postgreSQL DB에 저장해야 한다. Java로 코드를 작성하기 때문에 JDBC를 사용해 postgreSQL 테이블에 데이터를 삽입하려고 한다.
Flink 공식 문서에 JDBC connector를 사용해 어떻게 스트리밍 데이터를 DB에 저장할 수 있는지 자세히 쓰여있다.
먼저 버전을 맞추어 dependency를 추가한다.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.2-1.18</version>
</dependency>
버전 호환이 제대로 되지 않아 오류를 겪은 적이 많아서 새로운 라이브러리를 추가할 때 너무 무섭다.. 다행히 Flink 공식 문서에 버전이 명시되어 있다.
총 4개의 테이블을 생성할 것이다.
- delivery_information (Delivery 전체 정보)
- pay_per_destination (서울특별시 xx구별 음식 가격, 배달비 결제 금액 총합)
- charge_per_day (요일별 배달비 총합)
- pay_per_category (음식 카테고리별 음식 가격 총합)
사실 delivery_information 테이블만 있으면 쿼리 실행을 통해 나머지 3개 테이블과 동일한 결과를 얻을 수 있다. 그럼에도 나머지 3개의 테이블을 생성하는 이유는 데이터 스트림에 map, reduce 등 여러가지 처리 및 집계 연산을 적용해보기 위해서다.
JDBC 설정
JDBC sink를 사용한다. 공식 문서에 따르면 JDBC sink는 at least once를 보장하나 upsert를 사용하면 exactly once 처리 수준과 동일해진다고 한다. at least once는 중복이 발생할 가능성이 있는데 upsert를 사용하면 동일한 데이터를 DB에 여러번 삽입, 업데이트하더라도 일관된 결과를 얻을 수 있기 때문인 것 같다.
JDBC Sink의 기본적인 사용 방법이다.
JdbcSink.sink(
sqlDmlStatement, // mandatory
jdbcStatementBuilder, // mandatory
jdbcExecutionOptions, // optional
jdbcConnectionOptions // mandatory
);
sqlDmlStatement는 실행하려고 하는 쿼리이다.
jdbcStatementBuilder는 prepared statement를 빌드하는 역할을 한다. 아래처럼 사용한다.
delivery_information 테이블에 데이터를 삽입할 때는 Delivery 객체를 사용하면 된다. 집계 연산을 거친 데이터들이 저장되는 나머지 테이블들은 새로 클래스를 정의하고 그에 해당하는 객체를 사용할 예정이다.
"insert into books (id, title, authors, year) values (?, ?, ?, ?)",
(statement, book) -> {
statement.setLong(1, book.id);
statement.setString(2, book.title);
statement.setString(3, book.authors);
statement.setInt(4, book.year);
},
jdbcExecutionOptions, jdbcConnectionOptions를 먼저 설정한다.
private static final String JDBC_URL = "jdbc:postgresql://localhost:5432/postgres";
private static final String USER_NAME = "postgres";
private static final String PASSWORD = "postgres";
docker compose에 작성한 port, user, password를 가지고 postgreSQL DB와 연결한다.
JdbcExecutionOptions executionOptions = new JdbcExecutionOptions.Builder()
.withBatchSize(1000) // default = 5000
.withBatchIntervalMs(200) // default = 0
.withMaxRetries(5) // default = 3
.build();
JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(JDBC_URL)
.withDriverName("org.postgresql.Driver")
.withUsername(USER_NAME)
.withPassword(PASSWORD)
.build();
Table 생성
4개의 테이블 생성 로직은 쿼리만 다르고 나머지 설정은 전부 동일하므로 for문을 통해 sink를 붙여준다.
for (int i = 0; i < createTableStatements.length; i++) {
deliveryStream.addSink(JdbcSink.sink(
createTableStatements[i],
(JdbcStatementBuilder<Delivery>) (preparedStatement, delivery) -> {
},
executionOptions,
connectionOptions
)).name(sinkName[i]);
}
createTableStatements 배열에 테이블 생성 쿼리가 저장되어 있다.
delivery_information 테이블은 Delivery 클래스 멤버 변수명을 컬럼명으로 가지며 배달 주문의 전체 정보를 가지고 있다.
Flink job을 실행하고 DB에 접속해서 조회해보면
코드에 명시한 대로 테이블이 잘 생성된다.
delivery_information table 데이터 삽입
INSERT INTO delivery_information (delivery_id, delivery_date, user_id, food_category, food_price,
payment_method, delivery_distance, delivery_destination, destination_lat, destination_lon, delivery_charge)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (delivery_id) DO UPDATE SET
delivery_date = EXCLUDED.delivery_date,
user_id = EXCLUDED.user_id,
food_category = EXCLUDED.food_category,
food_price = EXCLUDED.food_price,
payment_method = EXCLUDED.payment_method,
delivery_distance = EXCLUDED.delivery_distance,
delivery_destination = EXCLUDED.delivery_destination,
destination_lon = EXCLUDED.destination_lon,
delivery_charge = EXCLUDED.delivery_charge
Upsert 쿼리를 사용해 데이터를 삽입한다.
delivery_id는 기본키이며 파이썬 코드에서 faker 라이브러리의 uuid4()를 통해 생성되는데 중복될 가능성이 매우 희박하지만 혹시 모를 오류를 피하기 위해 delivery_id가 중복될 경우 update, 아니라면 insert 하도록 했다.
(JdbcStatementBuilder<Delivery>) (preparedStatement, delivery) -> {
preparedStatement.setString(1, delivery.getDeliveryId());
preparedStatement.setTimestamp(2, delivery.getDeliveryDate());
preparedStatement.setString(3, delivery.getUserId());
preparedStatement.setString(4, delivery.getFoodCategory());
preparedStatement.setDouble(5, delivery.getFoodPrice());
preparedStatement.setString(6, delivery.getPaymentMethod());
preparedStatement.setDouble(7, delivery.getDeliveryDistance());
preparedStatement.setString(8, delivery.getDeliveryDestination());
preparedStatement.setDouble(9, delivery.getDestinationLat());
preparedStatement.setDouble(10, delivery.getDestinationLon());
preparedStatement.setInt(11, delivery.getDeliveryCharge());
}
preparedStatement를 delivery 객체로 업데이트한다.
Flink job을 실행하고 DB 테이블을 조회하면
데이터가 잘 삽입되는 것을 확인할 수 있다. 파이썬 코드가 계속 실행되고 데이터가 생성되면 계속해서 테이블에 새로운 값이 삽입된다.
Github
'Data > Project' 카테고리의 다른 글
Project | 실시간 배달 주문 데이터 처리 - Elasticsearch, Kibana를 이용한 데이터 시각화 (0) | 2024.05.06 |
---|---|
Project | 실시간 배달 주문 데이터 처리 - Flink Map, KeyBy, Reduce를 이용한 데이터 변환, 집계 처리 (2) | 2024.05.03 |
Project | 실시간 배달 주문 데이터 처리 - Flink Job을 사용한 Kafka 스트리밍 (0) | 2024.03.27 |
Project | 실시간 배달 주문 데이터 처리 - 데이터 수집 (2) | 2024.03.18 |
Project | 실시간 배달 주문 데이터 처리 - 환경 구축 (2) | 2024.03.18 |