Jay's Cookbook
Menu
  • Tags
  • Categories
  • Projects
Computer Science
OS
Network
Data Structure
Algorithm
Language
Code Architecture
Python
Javascript
Typescript
Java
Backend
Backend Theory
TypeORM
Node.js
NestJS
FastAPI
Frontend
HTML/CSS
React
Next.js
Data Engineering
DE Theory
MySQL
MongoDB
Elastic
Redis
Kafka
Spark
Airflow
AI
Basic
Pytorch
NLP
Computer Vision
Data Analytics
Statistics
Pandas
Matplotlib
DevOps
Git
Docker
Kubernetes
AWS
Kafka Series [Part9]: Kafka Connector
data_engineering
kafka

Kafka Series [Part9]: Kafka Connector

Jay Kim
Jay Kim 11 Apr 2022
Kafka Series [Part10]: MySQL Connector Kafka Series [Part11]: Python API for Kafka

Table of Contents

  • Why Connector?
  • Kafka Connect
    • Kafka Connect 구성요소
    • Connect? Connector?
    • Standalone과 Distributed Workers
  • Debezium
  • 도커 컴포즈 파일
  • kafka 컨테이너에서 워커 실행 모드 설정
  • kafka 컨테이너에서 커넥터 워커 실행
  • connect 제외한 아무 컨테이너(나의 경우 kafka 컨테이너)에서 REST API를 이용해 커넥터 등록/실행
  • 기타 커넥터 관련 REST API
  • 커넥터와 백엔드(Java Spring)의 관계
  • 참고

Why Connector?

커넥터 없이도 프로듀서 컨슈머 사용 가능

하지만 커넥터를 이용하면 카프카를 사용하면서 발생할 수 있는 장애에 대한 복구를 비롯한 필요한 기능들을 따로 개발할 필요없이 사용가능

Kafka Connect

  • Kafka Connect는 다른 데이터 시스템을 Kafka와 통합하는 과정을 표준화한 프레임워크
  • 통합을 위한 Connector 개발, 배포, 관리를 단순화

Kafka Connect 구성요소

  • Connector: Task를 관리하여 데이터 스트리밍을 조정하는 jar파일
  • Task: 데이터 시스템간의 전송 방법을 구현한 구현체
  • Worker: Connector와 Task를 실행하는 프로세스
  • Converter: 데이터 포맷을 변환하는데 사용하는 구성요소
  • Trasform: 데이터를 변환하는데 사용하는 구성요소

Connect? Connector?

커넥트는 커넥터를 실행시키기 위한 환경(프레임워크)을 제공해줌. 커넥트 위에서 커넥터 설치하고 커넥터(jar파일) 실행하면 됨

커넥트 이미지로 인스턴스 띄우고 거기서 각종 커넥터 다운로드 받아서 커넥터를 몽고db, mysql, s3같은데 RESTapi로 등록

Standalone과 Distributed Workers

Worker 프로세스를 한 개만 띄우는 Standalone 모드와 여러개 실행시키는 Distributed 모드가 있다.

보통 확장성과 내결함성을 이유로 Distributed 모드를 많이 사용한다.

Debezium

Debezium은 변경 데이터 캡처를 위한 오픈 소스 분산 플랫폼이다.

Debezium 에서 변경된 데이터 캡쳐를 위해 mysql의 경우 binlog, postgresql의 경우 replica slot(logical)을 이용하여 데이터베이스에 커밋하는 데이터를 감시하여 Kakfa, DB, ElasticSearch 등 미들웨어에 이벤트를 전달한다

도커 컴포즈 파일

version: '3.2'

services:

  mongodb:
    image: mongo:latest
    hostname: mongodb
    ports:
      - "27017:27017"
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: root
    tty: true
  

  zookeeper:
    image: zookeeper:3.7
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalogco

  kafka:
    image: wurstmeister/kafka
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    tty: true
    volumes:
      - ./data/kafka/data:/tmp/kafka-logs
    depends_on:
      - zookeeper
  
  connect:
    image: confluentinc/cp-kafka-connect:latest.arm64
    hostname: connect1
    depends_on:
      - kafka
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_ADVERTISED_HOST_NAME: connect1
      CONNECT_GROUP_ID: connect-cluster
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_PLUGIN_PATH: /usr/share/java/,/usr/share/confluent-hub-components/mongodb-kafka-connect-mongodb/lib/
      CONNECT_REST_PORT: 8083
    ports:
      - 18083:8083
    volumes:
      - ./connectors/1:/usr/share/confluent-hub-components
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.7.0
        /etc/confluent/docker/run &
        sleep infinity

  producer:
    build:
      context: ./
      dockerfile: Dockerfile_producer
    stdin_open: true
    tty: true

  consumer:
    build:
      context: ./
      dockerfile: Dockerfile_consumer
    stdin_open: true
    tty: true

volumes:
  mongodb:

kafka 컨테이너에서 워커 실행 모드 설정

cd opt/kafka/config
vi connect-distributed.properties
# connect 컨테이너에서 커넥터(jar파일)가 설치되어 있는 경로 설정
plugin.path=/usr/share/java/,/usr/share/confluent-hub-components/mongodb-kafka-connect-mongodb/lib/

# 컨버터 설정
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

kafka 컨테이너에서 커넥터 워커 실행

./bin/connect-distributed.sh ./config/connect-distributed.properties

connect 제외한 아무 컨테이너(나의 경우 kafka 컨테이너)에서 REST API를 이용해 커넥터 등록/실행

curl -X POST -H'Accept:application/json' -H'Content-Type:application/json' http://connect1:8083/connectors 
  -w "\n"
  -d '{"name": "mongo-sink",
      "config": {
         "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
         "connection.user": "root",
         "connection.password": "root",
         "connection.uri":"mongodb://root:root@mongodb:27017",
         "database":"quickstart",
         "collection":"topicData",
         "topics":"taxi",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false"
         }
     }'

기타 커넥터 관련 REST API

# 커넥터 상태 확인(커넥터 등록과 태스크 실행이 RUNNING이면 성공)
curl -X GET http://connect1:8083/connectors/mongo-sink/status

# 커넥터 삭제
curl -X DELETE http://connect1:8083/connectors/mongo-sink

커넥터와 백엔드(Java Spring)의 관계

커넥터가 있으면 알아서 커넥터가 토픽에서 데이터를 가져와 DB로 잘 반영을하는 것 같다.

이런거보면 딱히 스프링부트 같은 걸 이용해서 백엔드 프로그램을 개발하지 않아도 되는 것 같아보인다.

하지만 만약 내가 스프링부트 같은 거를 엄청 잘 알아서 직접 개발하는데 불편함이 없다면 왠만한 것들은 스프링 부트를 이용하고 부분적으로 특정 프로듀서/컨슈머는 커넥터를 사용하는 것이 아마 가장 좋은 방법이 아닐까 라는 생각이 든다.

나는 지금 스프링부트를 모른다. 심지어 자바 언어도 써본 적이 없다. 커넥터는 아예 러닝 커브가 없는 것은 아니지만 훨씬 쉽다.

하지만 백엔드의 중요한 철학들을 공부하는 것은 굉장히 중요해보인다.

결론은 지금 당장 구현이 필요한 부분들은 커넥터로 구현을 하고, 백엔드 공부는 스프링 부트를 통해서 계속 하자.

백엔드 공부를 스프링 부트로 하기로 한 이유는, 내가 사용하고 있는 언어는 파이썬이지만 데이터 엔지니어링 공부에서 자바 언어는 필요해보인다. (데이터 엔지니어링 분야의 관련 오픈 소스들이 자바로 많이 개발됨)

파이썬으로 백엔드를 구현하도록 해주는 장고나 플라스크도 있지만, 아직은 스프링 부트를 사용하는 비중이 더 커보이고 뭔가 공부하는 관점에서는 스프링 부트가 더 도움이 많이 될 것 같다.

참고

  • Confluent 공식문서: Kafka Connect Tutorial on Docker
  • Connect 도커 이미지
  • Confluent 공식문서: MongoDB 커넥터
  • MongoDB 공식문서: MongoDB 커넥터를 위한 Configuration
  • kudl: CDC - debezium 설정
  • Confluent 공식문서: 커넥터 관련 강의
  • Confluent 공식문서: Connect 관련 configuration
  • sup2is: Kafka Connect로 데이터 허브 구축하기
  • 깃허브: mongodb-university/kafka-edu
  • Kafka Connect Deep Dive – Converters and Serialization Explained
  • 정몽실이: 카프카 커넥트 실행
  • Stackoverflow: Connector and Spring Kafka
Kafka Series [Part10]: MySQL Connector Kafka Series [Part11]: Python API for Kafka

You may also like

See all kafka
11 May 2022 Kafka Series [Part11]: Python API for Kafka
data_engineering
kafka

Kafka Series [Part11]: Python API for Kafka

11 Apr 2022 Kafka Series [Part10]: MySQL Connector
data_engineering
kafka

Kafka Series [Part10]: MySQL Connector

31 Jan 2022 Kafka Series [Part8]: Client Won’t Connect to Apache Kafka Cluster in Docker/AWS/My Laptop.[번역]
data_engineering
kafka

Kafka Series [Part8]: Client Won’t Connect to Apache Kafka Cluster in Docker/AWS/My Laptop.[번역]

클라이언트와 카프카간의 메세지를 주고받기 위해서는 두 가지의 연결이 반드시 선행되어야 한다.

Jay Kim

Jay Kim

Web development, data engineering for human for the Earth. I share posts, free resources and inspiration.

Rest
Lifestyle
Hobby
Hobby
Hobby
Hobby
2025 © Jay's Cookbook. Crafted & Designed by Artem Sheludko.