Replies: 9 comments 1 reply
-
아직 db와 Kafka Connect 플러그인(Debezium) 연결이 안됨 |
Beta Was this translation helpful? Give feedback.
-
다음 진행 단계
|
Beta Was this translation helpful? Give feedback.
-
|
Beta Was this translation helpful? Give feedback.
-
Beta Was this translation helpful? Give feedback.
-
topic : dbserver1.public.folder_log |
Beta Was this translation helpful? Give feedback.
-
spark가 kafka에 있는 데이터 consumingkafka_df = spark value 컬럼을 문자열로 변환decoded_df = kafka_df.selectExpr("CAST(value AS STRING)").select("value").collect() decoded_df를 print 했을 떄[ Row(value='{"before":null,"after":{"id":2,"path":"/path/to/folder1","created_at":1726144496000000},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"dbserver1","ts_ms":1727077140520,"snapshot":"false","db":"inventory","sequence":"[null,\"26837280\"]","schema":"public","table":"folder_log","txId":742,"lsn":26837280,"xmin":null},"op":"c","ts_ms":1727077140906,"transaction":null}'), Row(value='{"before":null,"after":{"id":3,"path":"/version3","created_at":1726144496000000},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"dbserver1","ts_ms":1727077772859,"snapshot":"false","db":"inventory","sequence":"[\"26837704\",\"26838096\"]","schema":"public","table":"folder_log","txId":743,"lsn":26838096,"xmin":null},"op":"c","ts_ms":1727077773153,"transaction":null}'), Row(value='{"before":null,"after":{"id":4,"path":"/version4","created_at":1726922096000000},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"dbserver1","ts_ms":1727142393377,"snapshot":"false","db":"inventory","sequence":"[\"26838584\",\"26839152\"]","schema":"public","table":"folder_log","txId":744,"lsn":26839152,"xmin":null},"op":"c","ts_ms":1727142393583,"transaction":null}') |
Beta Was this translation helpful? Give feedback.
-
|
Beta Was this translation helpful? Give feedback.
-
배치 작업에서 CDC를 사용하는 이유배치 작업에서 Kafka DB CDC(Change Data Capture)를 사용하는 경우는 주로 대량의 데이터 변경 사항을 실시간으로 처리하고, 데이터 일관성을 유지하며, 시스템 간의 데이터 동기화를 효율적으로 수행하기 위해서입니다. 다음과 같은 주요 이유와 사례가 있습니다: Kafka DB CDC의 사용 사례
Kafka DB CDC의 장점
배치 작업에서 Kafka DB CDC를 활용하면 실시간 데이터 처리와 시스템 간 데이터 일관성 유지가 가능해져, 다양한 비즈니스 요구사항에 신속하게 대응할 수 있는 유연한 데이터 아키텍처를 구축할 수 있습니다. Citations: |
Beta Was this translation helpful? Give feedback.
-
Debezium이 db상태를 확인하기 위한 설정값변경( 수정 및 추가 설명)# 변경사항
# postgres.conf
listen_addresses = '*'
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4 추가설명
max_wal_senders:
max_replication_slots:
|
Beta Was this translation helpful? Give feedback.
-
db의 변동사항을 감지하여 카프카 프로듀싱
DB환경: postgres - HA Proxy로 환경구성
postgres - HA Proxy 설정값
기존 kafka-cluster(kraft mode)
docker-compose.yaml
각 인스턴스의
docker-compose.yaml
파일에 Kafka Connect 서비스를 추가하고, Debezium PostgreSQL 커넥터를 설정1. Kafka 클러스터 상태 확인
브로커 상태 확인
각 인스턴스에서 Kafka 브로커가 제대로 실행 중인지 확인하려면, 각 서버에서 아래 명령어로 Kafka 주제(topic) 목록을 조회할 수 있습니다:
docker exec -it kafka-1 kafka-topics --bootstrap-server 10.178.0.5:9092 --list
각 인스턴스에서 실행하며, Kafka 브로커가 제대로 클러스터에 연결되어 있으면 동일한 주제 목록이 반환되어야 합니다.
Kafka 클러스터 메타데이터 확인
클러스터에 연결된 모든 브로커들을 확인하려면, 다음 명령을 실행합니다:
docker exec -it kafka-1 kafka-broker-api-versions --bootstrap-server 10.178.0.5:9092
이 명령을 통해 클러스터에 연결된 각 브로커의 ID와 메타데이터를 확인할 수 있습니다.
2. Kafka Connect 상태 확인
Kafka Connect REST API 확인
각 서버의 Kafka Connect가 제대로 작동하는지 확인하려면,
curl
명령어를 사용하여 REST API를 호출할 수 있습니다:curl http://10.178.0.5:8083/ | jq
이 명령을 실행했을 때, Kafka Connect의 상태 정보(JSON 형식)가 반환되면 정상적으로 작동하는 것입니다.
Kafka Connect 플러그인 설치 확인
Kafka Connect에 설치된 Debezium 플러그인 같은 커넥터가 제대로 설치되었는지 확인하려면, 다음 명령어를 실행해 Kafka Connect에 등록된 커넥터를 확인할 수 있습니다:
curl http://10.178.0.5:8083/connector-plugins | jq
여기서 PostgreSQL 커넥터와 같은 Debezium 커넥터가 표시되면 설치가 잘 된 것입니다.
Beta Was this translation helpful? Give feedback.
All reactions