Apache Kafka 소개

Apache Kafka 관련 사항 소개글

Posted by Start Bootstrap on April 16, 2021

Message Queue란

서로 다른 프로세스는 데이터를 주고 받을 때 Message를 사용합니다. Message를 전송해서 통신하는 방식으로 데이터를 주고 받을 수 있습니다. Queue는 일련의 시작 부분에서 시작하여 순차적으로 그것을 처리할 수 있도록 기다리게 하는 선형 자료구죠 입니다. 따라서 Message Queue는 Message를 프로세스 간에 전송되는 Queue 입니다.

일반적으로 Message Queue는 일반적으로 Queue에 넣고 응답을 계속 기다리거나 바로 처리하도록 하지 않는 특징이 있습니다. 이러한 점 때문에 대기열에서 요청이 올때까지 하염없이 기다리지 않아도 됩니다. 또한 특정 프로세스가 처리할 동안 다른 프로세스는 그 처리를 기다리지 않아도 됩니다. 그래서 일반적으로 생성자와 소비자를 두어서 이러한 특징을 유지할 수 있도록 합니다. 좀더 자세히 이야기하면 생산자가 메시지를 대기열에 게시합니다. 지정된 시간에 소비자는 대기열의 메시지를 시작하고 처리합니다. 대기중인 메시지는 저장 및 전달이 가능하며 처리 될 때까지 메시지를 다시 전달할 수 있습니다. 이러한 방식을 통해 각각의 프로세스는 본인의 역할만 충실하게 되고 다른 프로세스의 상태나 역할을 신경쓰지 않아도 됩니다.

Event Streaming란

이벤트 스트리밍은 각종 소프트웨어 애플리케이션 등에서 이벤트 소스에서 이벤트 스트림 형태로 실시간으로 데이터를 캡처하는 방식입니다. 일반적으로 배치 처리와 비교되는 것으로 보면 쉽게 알 수 있습니다. 이러한 일괄 처리 방식과 달리 이벤트 스트림을 실시간 및 소급 적으로 처리 및 반응합니다. 이벤트 스트림의 가장 큰 장점은 지속적인 데이터 처리를 통해 비교적 적은 대기 시간으로 데이터를 적절한 시간에 처리할 수 있습니다.

Apache Kafka란

Kafka는 Event Stream의 기능을 제공하는 Message Queue 입니다. 일반적으로 Topic이라고 부르는 해당 채널을 통해서 게시(Write)하고 구독(Read)하는 기능을 제공합니다. 이 모든 기능은 분산되고 확장성을 가진 채로 제공됩니다. 이 기능을 제공하기 위해 Kafka는 고성능 TCP 네트워크 프로토콜을 통해 통신하는 서버와 클라이언트로 구성되어 있습니다. Kafka는 여러 데이터 센터 또는 클라우드 지역에 걸쳐있을 수있는 하나 이상의 서버 클러스터로 실행됩니다. 이러한 서버 중 일부는 브로커라고하는 스토리지 계층을 형성합니다. 이러한 기능을 통해 서버 중 하나에 장애가 발생하면 다른 서버가 작업을 대신하여 데이터 손실없이 지속적인 운영을 보장합니다. 일반적인 분산 아키텍쳐와 같이 서버가 장애가 있을 거라 믿고 대비하는 방식입니다.

Kafka에서 전송하는 Message는 기본적으로 Key, Value, Timestamp 형식으로 되어 있습니다. 일반적으로 Topic에 해당하는 Queue에 Event로 발행이 됩니다. Topic에서 데이터가 분산 배치되어 있는데 확장성을 고려한 듯한 느낌입니다.

Kafka 실행

Kafka의 설치는 Windows를 기준으로 Download를 통해서 쉽게 설치할 수 있습니다. 다만 실행은 신경 써야하는 부분이 있습니다. kafka 실행 이전에 반드시 Apache Zookeeper를 실행해야 합니다. Zookeeper는 Service Discovery 프로그램 중 하나로 서버의 상태를 기록하고 정보를 전달하는 역할을 담당합니다.

Demo Image Start Zookeeper

Windows에서는 bin\windows\zookeeper-server-start.bat config\zookeeper.properties 명령어를 통해서 실행할 수 있습니다. Zookeeper가 실행된 것이 확인되면 그 다음에 Kafka 서버를 실행할 수 있습니다.

Demo Image Start Kafka

Windows에서는 bin\windows\kafka-server-start.bat config\server.properties 명령어를 통해서 실행할 수 있습니다. 일반적으로 이 부분까지 실행이 완료가 되었다면 Kafka를 사용할 준비가 되어 있다고 할 수 있습니다.

golang을 통해서 간단한 consumer와 producer를 만들려고 합니다. segmentio/kafka-go 패키지를 통해 kafka에 접근할 수 있도록 할 것입니다. 일단 Topic을 생성해서 해당 Topic을 접근하는 producer와 consumer를 만들 것입니다.

        
            package main

            import (
                "context"
                "fmt"
                "log"
                "os"
                "strconv"
                "time"

                "github.com/segmentio/kafka-go"
            )

            const (
                topic         = "message-log"
                brokerAddress = "localhost:9092"
            )

            func main() {
                context := context.Background()

                createTopic(context)
                go produce(context)
                consume(context)
            }

            func createTopic(context context.Context) {
                conn, err := kafka.DialLeader(context, "tcp", "localhost:9092", topic, 0)
                if err != nil {
                    panic(err.Error())
                }
                defer conn.Close()
            }

            func produce(ctx context.Context) {

                i := 0
                logger := log.New(os.Stdout, "kafka producer: ", 0)
                writer := kafka.NewWriter(kafka.WriterConfig{
                    Brokers: []string{brokerAddress},
                    Topic:   topic,
                    Logger:  logger,
                })

                for {
                    err := writer.WriteMessages(ctx, kafka.Message{
                        Key:   []byte(strconv.Itoa(i)),
                        Value: []byte("this is message" + strconv.Itoa(i)),
                    })

                    if err != nil {
                        panic("could not write message " + err.Error())
                    }

                    fmt.Println("writes:", i)
                    i++

                    time.Sleep(time.Second)
                }
            }

            func consume(ctx context.Context) {
                logger := log.New(os.Stdout, "kafka consumer: ", 0)
                reader := kafka.NewReader(kafka.ReaderConfig{
                    Brokers: []string{brokerAddress},
                    Topic:   topic,
                    GroupID: "my-group",
                    Logger:  logger,
                })

                for {
                    msg, err := reader.ReadMessage(ctx)
                    if err != nil {
                        panic("could not read message " + err.Error())
                    }

                    fmt.Println("received: ", string(msg.Value))
                }
            }

        
    

마무리

Kafka를 간단하게 소개하고 간단한 실행까지 해봤습니다. 아직 Application이나 Platform에 적용하기에는 이정도로는 부족하지만 Kafka에 대한 감을 가지고 소개하는 데는 괜찮지 않을까 하는 생각이 들었습니다. 다음에는 좀 더 실제 서비스에 사용할 수 있는 정도로 포스팅을 해보도록 하겠습니다 감사합니다.