Docker compose
Flink를 제외한 서비스들은 전부 docker 컨테이너로 수행된다. 로컬 환경이고, 작은 규모의 데이터를 처리하기 때문에 flink는 standalone 모드로 실행할 예정이나 대규모 데이터 처리, 고가용성이 요구된다면 적합하지 않은 방법이다.
docker compose version은 3.8로 지정했다.
Zookeeper & Kafka & Control Center
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
healthcheck:
test: [ "CMD", "bash", "-c", "echo 'ruok' | nc localhost 2181" ]
interval: 10s
timeout: 5s
retries: 5
ZOOKEEPER_TICK_TIME : 클러스터를 구성할 때 동기화를 위한 기본 tick time이며, millisecond 단위이므로 2초이다.
Kafka 단일 브로커이기 때문에 멀티 브로커에 유효한 환경 변수를 설정하지 않았다.
broker:
image: confluentinc/cp-kafka:7.4.0
hostname: broker
container_name: broker
depends_on:
zookeeper:
condition: service_healthy
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
healthcheck:
test: [ "CMD", "bash", "-c", 'nc -z localhost 9092' ]
interval: 10s
timeout: 5s
retries: 5
서비스 우선 순위 지정을 위해 depends_on에 zookeeper를 지정한다.
control-center:
image: confluentinc/cp-enterprise-control-center:7.5.3
hostname: control-center
container_name: control-center
depends_on:
- broker
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
Control center는 Confluent가 제공하는 kafka 클러스터 관리 및 모니터링 도구이다.
Schema Registry를 사용해 전송 메시지 구조, 형식을 지정해 더 안전하게 메시지를 처리할 수 있지만 해당 프로젝트는 데이터 구조가 단순하기 때문에 사용하지 않았다.
PostgreSQL
postgres:
image: postgres:latest
container_name: postgres
ports:
- "5432:5432"
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
volumes:
- ./pg_data:/var/lib/postgresql/data
healthcheck:
test: [ "CMD", "pg_isready", "-U", "postgres" ]
interval: 10s
timeout: 5s
retries: 5
Elasticsearch & Kibana
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.1
container_name: es-container
environment:
- xpack.security.enabled=false
- discovery.type=single-node
ports:
- "9200:9200"
volumes:
- ./elasticsearch/data:/usr/share/elasticsearch/data
서비스 우선 순위 지정을 위해 kibana의 depends_on에 elasticsearch를 지정한다.
Flink
Flink 1.18.0 버전을 사용한다. 공식 홈페이지에서 다운로드 받을 수 있다.
프로젝트를 진행할 때 공식 문서도 1.18 버전에 맞추어 읽어야 한다.
Python
파이썬 3.9 버전을 사용한다.
Java
openjdk 11 버전을 사용한다. Flink의 공식 문서를 살펴보면 java 버전에 대한 설명이 있어 참고했다. Java 8은 사용이 불가하다.
Maven으로 프로젝트를 빌드하며, Archetype에 flink-quickstart-java를 지정하면 flink data stream을 위한 스켈레톤 코드가 알아서 추가되어 있다.
Github
'Data > Project' 카테고리의 다른 글
Project | 실시간 배달 주문 데이터 처리 - Flink Map, KeyBy, Reduce를 이용한 데이터 변환, 집계 처리 (2) | 2024.05.03 |
---|---|
Project | 실시간 배달 주문 데이터 처리 - PostgreSQL Table 생성 및 데이터 삽입 (0) | 2024.05.03 |
Project | 실시간 배달 주문 데이터 처리 - Flink Job을 사용한 Kafka 스트리밍 (0) | 2024.03.27 |
Project | 실시간 배달 주문 데이터 처리 - 데이터 수집 (2) | 2024.03.18 |
Project | 실시간 배달 주문 데이터 처리 - 시스템 구조 및 설명 (0) | 2024.03.18 |