I've recently been looking into using a Kafka cluster as a message queue to improve message throughput. For testing, I put together an environment with Docker, which makes it very easy to stand up a Kafka cluster for local use.
The environment is built with docker-compose and consists of a ZooKeeper service, a Kafka broker, and kafka-manager.

docker-compose.yml configuration file

version: '3.1'

services:
  zookeeper:
    image: wurstmeister/zookeeper
    restart: unless-stopped
    hostname: zookeeper
    ports:
      - ${KAFKA_ZOOKEEPER_PORT}
    container_name: zookeeper
  kafka:
    image: wurstmeister/kafka
    ports:
      - ${KAFKA_PORT}
    environment:
      KAFKA_ADVERTISED_HOST_NAME: ${KAFKA_ADVERTISED_HOST_NAME}
      KAFKA_ZOOKEEPER_CONNECT: ${KAFKA_ZOOKEEPER_CONNECT}
      KAFKA_BROKER_ID: ${KAFKA_BROKER_ID}
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: ${KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR}
    depends_on:
      - zookeeper
  kafka-manager:
    image: hlebalbau/kafka-manager
    restart: unless-stopped
    ports:
      - ${KAFKA_MANAGER_PORT}
    external_links:
      - kafka
    environment:
      ZK_HOSTS: ${ZK_HOSTS}
      APPLICATION_SECRET: ${APPLICATION_SECRET}
      KAFKA_MANAGER_AUTH_ENABLED: ${KAFKA_MANAGER_AUTH_ENABLED}
      KAFKA_MANAGER_USERNAME: ${KAFKA_MANAGER_USERNAME}
      KAFKA_MANAGER_PASSWORD: ${KAFKA_MANAGER_PASSWORD}
    depends_on:
      - zookeeper
      - kafka
    container_name: kafka-manager
    command: -Dpidfile.path=/dev/null
  kafka-producer:
    build: kafka/producer
    ports:
      - ${KAFKA_PRODUCE_PORT}
    links:
      - kafka
    environment:
      KAFKA_HOSTS: ${KAFKA_HOSTS}
      KAFKA_CREATE_TOPICS: ${KAFKA_CREATE_TOPICS}
      KAFKA_CONSUMER_GROUP: ${KAFKA_CONSUMER_GROUP}
    depends_on:
      - kafka
    container_name: kafka-producer
  kafka-consumer:
    build: kafka/consumer
    links:
      - kafka
    environment:
      KAFKA_HOSTS: ${KAFKA_HOSTS}
      KAFKA_CREATE_TOPICS: ${KAFKA_CREATE_TOPICS}
      KAFKA_CONSUMER_GROUP: ${KAFKA_CONSUMER_GROUP}
    depends_on:
      - kafka
    container_name: kafka-consumer

.env file configuration

#https://docs.docker.com/compose/env-file
# Host machine IP
KAFKA_ADVERTISED_HOST_NAME=192.168.0.103
# Kafka port mapping
KAFKA_PORT=9092:9092
# ZooKeeper connection string
KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
# ZooKeeper port mapping
KAFKA_ZOOKEEPER_PORT=2181:2181
# Kafka broker ID
KAFKA_BROKER_ID=1
# Replication factor for the internal offsets topic
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
# Kafka topic: name:partitions:replicas
KAFKA_CREATE_TOPICS=stream-in:2:1
# Kafka consumer group
KAFKA_CONSUMER_GROUP=diycoder-consumer
# ZooKeeper connection for kafka-manager
ZK_HOSTS=zookeeper:2181
# Application secret
APPLICATION_SECRET=random-secret
# Kafka broker connection
KAFKA_HOSTS=kafka:9092
KAFKA_MANAGER_AUTH_ENABLED=true
# kafka-manager port mapping
KAFKA_MANAGER_PORT=9000:9000
# kafka-manager username
KAFKA_MANAGER_USERNAME=admin
# kafka-manager password
KAFKA_MANAGER_PASSWORD=diycoder
# Kafka producer port mapping
KAFKA_PRODUCE_PORT=8080:8080
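
KAFKA_CREATE_TOPICS here is only read by the producer and consumer programs below, which take the part before the first colon as the topic name; with Kafka's default auto.create.topics.enable=true the broker creates the topic on first use. If you would rather create it up front with the intended partition count, a minimal sketch with kafka-go could look like this (the broker address and the stream-in:2:1 settings are taken from the .env above; the program itself is not part of the original setup):

package main

import (
	"log"
	"net"
	"strconv"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Dial any broker; localhost:9092 works from the host thanks to the port mapping above.
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Topics must be created on the controller broker; look it up and dial it.
	controller, err := conn.Controller()
	if err != nil {
		log.Fatal(err)
	}
	ctrl, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		log.Fatal(err)
	}
	defer ctrl.Close()

	// stream-in:2:1 from the .env file: 2 partitions, replication factor 1.
	if err := ctrl.CreateTopics(kafka.TopicConfig{
		Topic:             "stream-in",
		NumPartitions:     2,
		ReplicationFactor: 1,
	}); err != nil {
		log.Fatal(err)
	}
}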

build.sh build script

#!/bin/bash

dir=`pwd`
buildprefix="kafka"
build() {
    for d in $(ls ./$1); do
        echo "building $1/$d filename:$buildprefix.$d"
        # pushd pushes the directory onto the stack and switches to it; with no arguments it swaps the top two stack entries
        pushd $dir/$1/$d >/dev/null
        # -s strips the symbol table and debug info, -w strips the DWARF tables; together they shrink the resulting binary
        #CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-w'
        CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ${buildprefix}.$d -a -installsuffix cgo -ldflags '-s -w'
        # popd pops the top entry off the directory stack and switches back to the directory now on top
        popd >/dev/null
    done
}
build kafka

Makefile configuration

.PHONY: build run down

build:
    ./bin/build.sh

run:
    docker-compose build
    docker-compose up

down:
    docker-compose down

Producer: writer.go

package main

import (
    "fmt"
    "github.com/segmentio/kafka-go"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "strings"
)

/**
 * @Author: diycoder
 * @Date: 2019-07-24 20:45
 */

func producerHandler(kafkaWriter *kafka.Writer) func(http.ResponseWriter, *http.Request) {
    return http.HandlerFunc(func(wrt http.ResponseWriter, req *http.Request) {
        body, err := ioutil.ReadAll(req.Body)
        if err != nil {
            log.Fatalln(err)
        }
        msg := kafka.Message{
            Key:   []byte(fmt.Sprintf("address-%s", req.RemoteAddr)),
            Value: body,
        }
        err = kafkaWriter.WriteMessages(req.Context(), msg)

        if err != nil {
            wrt.Write([]byte(err.Error()))
            log.Fatalln(err)
        }
    })
}

func getKafkaWriter(kafkaURL, topic string) *kafka.Writer {
    return kafka.NewWriter(kafka.WriterConfig{
        Brokers:  []string{kafkaURL},
        Topic:    topic,
        Balancer: &kafka.LeastBytes{},
    })
}

func main() {
    // get kafka writer using environment variables.
    kafkaURL := os.Getenv("KAFKA_HOSTS")
    topic := os.Getenv("KAFKA_CREATE_TOPICS")
    topics := make([]string, 0)
    if topic != "" {
        topics = strings.Split(topic, ":")
    }
    kafkaWriter := getKafkaWriter(kafkaURL, topics[0])
    defer kafkaWriter.Close()

    // Add handle func for producer.
    http.HandleFunc("/", producerHandler(kafkaWriter))

    // Run the web server.
    fmt.Println("start producer-api ... !!")
    log.Fatal(http.ListenAndServe(":8080", nil))
}
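
Once the containers are running (see the make run step below), you can push a single test message through the producer with a plain HTTP request; whatever body it receives becomes the Kafka message value. A minimal smoke-test sketch (the URL matches the 8080 mapping above; the payload is just an example, not part of the original code):

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"strings"
)

func main() {
	// POST an arbitrary body to the producer; the handler writes it to the stream-in topic.
	resp, err := http.Post("http://localhost:8080/", "application/json", strings.NewReader(`{"msg":"hello kafka"}`))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// The handler only writes a response body when producing fails, so an empty body means success.
	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Printf("status: %s body: %q\n", resp.Status, string(body))
}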

Dockerfile for writer.go

# Base image
FROM alpine
# Copy the compiled binary into the image (ADD copies the file and would also extract an archive)
ADD kafka.producer /kafka.producer
# Command executed when the container starts
ENTRYPOINT [ "/kafka.producer" ]

Consumer: reader.go

package main

import (
    "context"
    "fmt"
    "github.com/segmentio/kafka-go"
    "log"
    "os"
    "strings"
)

/**
 * @Author: diycoder
 * @Date: 2019-07-24 20:51
 */

func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
    brokers := strings.Split(kafkaURL, ",")
    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:  brokers,
        GroupID:  groupID,
        Topic:    topic,
        MinBytes: 10e3, // 10KB
        MaxBytes: 10e6, // 10MB
    })
}

func main() {
    // get kafka reader using environment variables.
    kafkaURL := os.Getenv("KAFKA_HOSTS")
    topic := os.Getenv("KAFKA_CREATE_TOPICS")
    groupID := os.Getenv("KAFKA_CONSUMER_GROUP")
    topics := make([]string, 0)
    if topic != "" {
        topics = strings.Split(topic, ":")
    }
    reader := getKafkaReader(kafkaURL, topics[0], groupID)

    defer reader.Close()

    fmt.Println("start consuming ... !!")
    for {
        m, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Fatalln(err)
        }
        fmt.Printf("message at topic:%v partition:%v offset:%v    %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
    }
}
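
With a GroupID set, ReadMessage commits offsets automatically as messages are read. kafka-go also exposes explicit commits, which is useful when an offset should only be committed after a message has actually been processed. A small sketch of that variant (not part of the original code), reusing the imports and the reader built by getKafkaReader above:

// consumeWithManualCommit is an alternative loop that commits offsets only
// after a message has been processed.
func consumeWithManualCommit(reader *kafka.Reader) {
	for {
		// FetchMessage returns the next message without committing its offset.
		m, err := reader.FetchMessage(context.Background())
		if err != nil {
			log.Fatalln(err)
		}
		fmt.Printf("processing topic:%v partition:%v offset:%v %s\n", m.Topic, m.Partition, m.Offset, string(m.Value))
		// Commit only once the message has been handled successfully.
		if err := reader.CommitMessages(context.Background(), m); err != nil {
			log.Fatalln(err)
		}
	}
}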

Dockerfile for reader.go

# Base image
FROM alpine
# Copy the compiled binary into the image (ADD copies the file and would also extract an archive)
ADD kafka.consumer /kafka.consumer
# Command executed when the container starts
ENTRYPOINT [ "/kafka.consumer" ]

Switch to the project directory and run the following commands in order:

➜  sample make build
./bin/build.sh
building kafka/consumer filename:kafka.consumer
building kafka/producer filename:kafka.producer
➜  sample make run
docker-compose build
zookeeper uses an image, skipping
kafka uses an image, skipping
kafka-manager uses an image, skipping
Building kafka-producer
Step 1/3 : FROM alpine
 ---> b7b28af77ffe
Step 2/3 : ADD kafka.producer /kafka.producer
 ---> a002044c0fbc
Step 3/3 : ENTRYPOINT [ "/kafka.producer" ]
 ---> Running in 4250e6836c51
Removing intermediate container 4250e6836c51
 ---> aa265c7ffbfa
Successfully built aa265c7ffbfa
Successfully tagged sample_kafka-producer:latest
Building kafka-consumer
Step 1/3 : FROM alpine
 ---> b7b28af77ffe
Step 2/3 : ADD kafka.consumer /kafka.consumer
 ---> 35a0c582ac61
Step 3/3 : ENTRYPOINT [ "/kafka.consumer" ]
 ---> Running in eb2b8d26d1e9
Removing intermediate container eb2b8d26d1e9
 ---> 6011c68f09b8
Successfully built 6011c68f09b8
Successfully tagged sample_kafka-consumer:latest
docker-compose up
Creating network "sample_default" with the default driver
Creating zookeeper ... done
Creating sample_kafka_1 ... done
Creating kafka-consumer ... done
Creating kafka-producer ... done
Creating kafka-manager  ... done

Check the deployment:

➜  ~ docker ps
CONTAINER ID        IMAGE                     COMMAND                  CREATED             STATUS              PORTS                                                NAMES
c811fb5ae98f        sample_kafka-consumer     "/kafka.consumer"        43 seconds ago      Up 41 seconds                                                            kafka-consumer
351fd3470bca        hlebalbau/kafka-manager   "/kafka-manager/bin/…"   43 seconds ago      Up 41 seconds       0.0.0.0:9000->9000/tcp                               kafka-manager
7734a99b64d2        sample_kafka-producer     "/kafka.producer"        43 seconds ago      Up 41 seconds       0.0.0.0:8080->8080/tcp                               kafka-producer
9f45a59bb2d4        wurstmeister/kafka        "start-kafka.sh"         44 seconds ago      Up 43 seconds       0.0.0.0:9092->9092/tcp                               sample_kafka_1
2521ab818981        wurstmeister/zookeeper    "/bin/sh -c '/usr/sb…"   45 seconds ago      Up 44 seconds       22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   zookeeper
➜  ~

Visit http://127.0.0.1:9000 to log in and configure kafka-manager.

Add a new cluster, set the ZooKeeper hosts, and leave the other options at their defaults.

(screenshot: adding a cluster in kafka-manager)

With that set up, let's send some messages to Kafka. This time we'll use webbench; I won't cover how to install it here, a quick search will turn up instructions.

Let's start with 1000 concurrent clients and see what happens:

➜  ~ webbench -t 120 -c 1000 http://localhost:8080/
Webbench - Simple Web Benchmark 1.5
Copyright (c) Radim Kolar 1997-2004, GPL Open Source Software.

Benchmarking: GET http://localhost:8080/
1000 clients, running 120 sec.
problems forking worker no. 483
fork failed.: Operation timed out
git_prompt_info:2: fork failed: resource temporarily unavailable

Let's look at the terminal logs:

(screenshot: consumer output in the terminal)

Now let's check the topic information in kafka-manager:

(screenshot: topic details in kafka-manager)

And the consumer-related information:

(screenshot: consumer details in kafka-manager)

Success: we can see the consumer is consuming the messages.