Kafka Single-Node Messaging Test

I've recently gotten hooked on internet middleware, and I finally have the motivation to dig into something new again.

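On the surface Kafka looks simple: it's just a publish-subscribe messaging middleware system. But given how widely it is used, it must have real advantages, and it involves quite a few concepts: broker, topic, partitions, producer and consumer, each with attributes of its own, such as offset and leader. We'll get a feel for all of these during the test below.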

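I created a CentOS 7 virtual machine for the test. First, download Kafka. For a single node, Kafka already ships with scripts for starting ZooKeeper, so there is no need to install ZooKeeper separately.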

[lihui@2018 ~]$ wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz

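I just took the officially recommended build for Scala 2.11 (the 2.11 in the file name is the Scala version). This is the binary release, not the source code; if you're interested, download the source and build it yourself.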

After extracting the archive, first take a look at config/server.properties, which is probably the most important configuration file.

1:broker

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

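A broker is where messages are stored. Kafka persists messages on disk, relying on the fact that sequential disk reads and writes can outperform random memory access. In a Kafka cluster each node is a broker, and its messages live on that machine's disk. Since I'm running a single node, the default value 0 is fine; in a distributed deployment, give each broker in the cluster its own index (a unique integer).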
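Giving each broker its own broker.id can be scripted. The sketch below is an illustration under assumptions, not part of the Kafka distribution: the function name gen_broker_configs and the output names server-N.properties are hypothetical. It copies a template, drops the existing broker.id line and appends broker.id=N for broker N; each generated file would then be copied to its own machine.

```shell
# Hypothetical helper: gen_broker_configs TEMPLATE OUTDIR COUNT
# Writes OUTDIR/server-0.properties ... server-(COUNT-1).properties, each a copy
# of TEMPLATE whose broker.id is replaced by that broker's index.
gen_broker_configs() {
  local template outdir count i
  template=$1; outdir=$2; count=$3
  mkdir -p "$outdir" || return 1
  i=0
  while [ "$i" -lt "$count" ]; do
    # Keep every line except the old broker.id, then append the new one.
    grep -v '^broker\.id=' "$template" > "$outdir/server-$i.properties"
    echo "broker.id=$i" >> "$outdir/server-$i.properties"
    i=$((i + 1))
  done
}

# Demo on a throwaway template instead of the real config/server.properties.
tmp=$(mktemp -d)
printf 'broker.id=0\nnum.partitions=1\n' > "$tmp/server.properties"
gen_broker_configs "$tmp/server.properties" "$tmp/out" 3
grep '^broker\.id=' "$tmp"/out/server-*.properties
```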

2:listeners

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.0.17:9092

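Uncomment this line. It sets the address the socket server listens on; in a cluster it has to be configured so that other nodes can find this broker. If it is not set, the host name defaults to the value returned by java.net.InetAddress.getCanonicalHostName(). Since this is a single node, 127.0.0.1 would also do, but leave the port at the conventional 9092. I set it directly to the machine's real IP, doing things the proper way.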
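The listener format from the comment above, listener_name://host_name:port, can be taken apart with plain parameter expansion. parse_listener is a hypothetical helper (not a Kafka tool), shown only to make the three parts of the value explicit; it assumes one listener per call and splits on the last colon.

```shell
# Hypothetical helper: parse_listener 'NAME://host:port' -> prints "NAME host port"
parse_listener() {
  local spec name rest
  spec=$1
  name=${spec%%://*}   # listener name, e.g. PLAINTEXT
  rest=${spec#*://}    # "host:port"
  # Split on the LAST colon, so the port is always the final field.
  printf '%s %s %s\n' "$name" "${rest%:*}" "${rest##*:}"
}

parse_listener 'PLAINTEXT://192.168.0.17:9092'
# prints: PLAINTEXT 192.168.0.17 9092
```

With an empty host, as in the commented default PLAINTEXT://:9092, the host field simply comes out empty.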

3:log directory

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

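There's a small trap here: the default is a directory under /tmp, but this is where the actual message records are kept, so it's best to change it; otherwise your messages may be gone after a restart. I set it to /data/kafka-logs.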
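Changing log.dirs by hand on several machines is easy to get wrong, so here is a small sketch of a key=value setter. set_prop is a hypothetical helper, not part of Kafka. It assumes the simple key=value form used in these files (no spaces around =), replaces the first active KEY= line, drops later duplicates, and appends the key if it is missing; commented lines such as #listeners=... are left untouched.

```shell
# Hypothetical helper: set_prop FILE KEY VALUE
set_prop() {
  local file key value tmp
  file=$1; key=$2; value=$3
  tmp=$(mktemp) || return 1
  awk -v k="$key" -v v="$value" '
    index($0, k "=") == 1 { if (!seen) print k "=" v; seen = 1; next }  # active KEY= line
    { print }                                                             # other lines unchanged
    END { if (!seen) print k "=" v }                                      # key absent: append it
  ' "$file" > "$tmp" && cat "$tmp" > "$file"   # cat keeps FILE's owner and permissions
  rm -f "$tmp"
}

# Demo on a scratch file instead of the real config/server.properties.
demo=$(mktemp)
printf 'broker.id=0\nlog.dirs=/tmp/kafka-logs\nnum.partitions=1\n' > "$demo"
set_prop "$demo" log.dirs /data/kafka-logs
cat "$demo"
```

On the real file the call would be set_prop config/server.properties log.dirs /data/kafka-logs, run as a user allowed to write that file.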

4:partitions

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

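This is another important concept: a partition is a subdivision of a topic. It comes with a set of properties of its own, such as the offset. If a partition has multiple replicas, one of them is the leader and the others are followers: the leader handles the partition's messages and the followers replicate from it, and if the leader goes down, a follower becomes the new leader, giving a primary/standby setup. The Kafka website has the following figure: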

[Figure: two consumer groups consuming a four-partition topic, from the Kafka documentation]

Here the Kafka cluster has 2 brokers and 1 topic with 4 partitions. There are 2 consumer groups, A and B: A has 2 consumers, C1 and C2, and B has 4 consumers, C3, C4, C5 and C6.

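With concurrent consumers, at any given moment each partition is consumed by only one consumer within a consumer group (every group subscribed to the topic still receives all of its partitions), which preserves the order in which that partition's messages are consumed. By the same token, a group should not have more consumers than the topic has partitions; otherwise the extra consumers get no partition and sit idle.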
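To see why extra consumers go idle, here is a toy simulation of how one topic's partitions might be split among the consumers of a single group. It imitates a range-style split like the one Kafka's default RangeAssignor uses (contiguous blocks, with the first consumers getting one extra partition when the division is uneven); it is an illustration under that assumption, not Kafka's code, and the labels C1, C2, ... are hypothetical and local to each call. In a real cluster the assignment is computed during a group rebalance.

```shell
# Hypothetical helper: range_assign PARTITIONS CONSUMERS
# Prints one line per consumer in ONE group for ONE topic.
range_assign() {
  local parts consumers per extra i start len p line
  parts=$1; consumers=$2
  [ "$consumers" -gt 0 ] || return 1
  per=$((parts / consumers))     # every consumer gets at least this many
  extra=$((parts % consumers))   # the first $extra consumers get one more
  i=0
  while [ "$i" -lt "$consumers" ]; do
    if [ "$i" -lt "$extra" ]; then
      len=$((per + 1)); start=$((per * i + i))
    else
      len=$per; start=$((per * i + extra))
    fi
    line="C$((i + 1)):"
    p=$start
    while [ "$p" -lt $((start + len)) ]; do
      line="$line P$p"; p=$((p + 1))
    done
    if [ "$len" -eq 0 ]; then line="$line (idle)"; fi
    echo "$line"
    i=$((i + 1))
  done
}

range_assign 4 2   # like group A in the figure: 2 consumers share 4 partitions
range_assign 4 4   # like group B: one partition each
range_assign 1 2   # more consumers than partitions: C2 gets nothing
```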

So I'll leave the number of partitions unchanged at 1 here.

5:zookeeper

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=192.168.0.17:2181

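Almost anything involving message exchange uses ZooKeeper; it's just that good. As the keeper of the zoo (a zoo of animal books? an actual zoo?), it is likewise set to this machine, and the port doesn't need to change.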
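The comment above zookeeper.connect describes comma-separated host:port pairs with an optional chroot path appended at the end. As a sketch of that format only (parse_zk_connect is a hypothetical helper, and it assumes any chroot follows the last host:port), the string can be split like this:

```shell
# Hypothetical helper: parse_zk_connect 'h1:p1,h2:p2[/chroot]'
# Prints "host port" per ZooKeeper server, then "chroot PATH" ("/" if none).
parse_zk_connect() {
  local s hosts chroot
  s=$1
  case $s in
    */*) hosts=${s%%/*}; chroot=/${s#*/} ;;   # text after the first "/" is the chroot
    *)   hosts=$s; chroot=/ ;;
  esac
  # One host:port pair per line, then split each on its last colon.
  printf '%s\n' "$hosts" | tr ',' '\n' | while IFS= read -r hp; do
    printf '%s %s\n' "${hp%:*}" "${hp##*:}"
  done
  printf 'chroot %s\n' "$chroot"
}

parse_zk_connect '192.168.0.17:2181'
parse_zk_connect '127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/kafka'
```

If a chroot such as /kafka were used, tools in this version that talk to ZooKeeper directly (for example kafka-topics.sh --zookeeper) would need the same suffix.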

6:topic

A topic is only a logical concept representing a category (some call it a subject); generally, each kind of message you define is one topic.

With that, the configuration is basically done. Now for the actual testing.

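(1) Start ZooKeeper. Kafka's bin/ directory contains scripts for starting and stopping ZooKeeper. zookeeper-server-start.sh is only thirty-odd lines long, and it really takes just two key arguments: -daemon to run in the background, and the path to zookeeper.properties.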

So, as the script expects, start ZooKeeper first. Before that, you may want to change dataDir in config/zookeeper.properties:

dataDir=/data/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

Start it:

[lihui@2018 kafka_2.11-2.0.0]$ sudo bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

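Check whether it really started; the best way is to connect with a CLI client. Here I again use the zookeeper-shell that ships with Kafka; you could also download a standalone ZooKeeper package, which includes zkCli.sh.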

[lihui@2018 kafka_2.11-2.0.0]$ sh bin/zookeeper-shell.sh 127.0.0.1:2181
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Connecting to 127.0.0.1:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
ls /
[zookeeper]
ls /zookeeper
[quota]

That means ZooKeeper started successfully.

(2) Start Kafka

sudo bin/kafka-server-start.sh -daemon config/server.properties

But it logged a pile of errors:

[2018-08-05 01:46:07,287] INFO Socket error occurred: 192.168.0.17/192.168.0.17:2181: Connection refused (org.apache.zookeeper.ClientCnxn)
[2018-08-05 01:46:08,395] INFO Opening socket connection to server 192.168.0.17/192.168.0.17:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-08-05 01:46:08,396] INFO Socket error occurred: 192.168.0.17/192.168.0.17:2181: Connection refused (org.apache.zookeeper.ClientCnxn)
[2018-08-05 01:46:09,502] INFO Opening socket connection to server 192.168.0.17/192.168.0.17:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-08-05 01:46:09,503] INFO Socket error occurred: 192.168.0.17/192.168.0.17:2181: Connection refused (org.apache.zookeeper.ClientCnxn)
[2018-08-05 01:46:10,608] INFO Opening socket connection to server 192.168.0.17/192.168.0.17:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-08-05 01:46:10,609] INFO Socket error occurred: 192.168.0.17/192.168.0.17:2181: Connection refused (org.apache.zookeeper.ClientCnxn)
[2018-08-05 01:46:13,269] INFO [ZooKeeperClient] Closing. (kafka.zookeeper.ZooKeeperClient)
[2018-08-05 01:46:16,722] INFO Opening socket connection to server 192.168.0.17/192.168.0.17:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)

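Checking ZooKeeper again, what a letdown: this time the shell never reaches the SyncConnected event. The JVM warning also shows the VM had only one processor; luckily the host has cores to spare.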

[lihui@2018 kafka_2.11-2.0.0]$ bin/zookeeper-shell.sh 192.168.0.17:2181
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Connecting to 192.168.0.17:2181
Welcome to ZooKeeper!
JLine support is disabled
^C[lihui@2018 kafka_2.11-2.0.0]$

I gave the virtual machine two cores and restarted Kafka, and sure enough it worked.

(3) Create a topic. Since this is a single node, I create only one partition and one replica:

[lihui@2018 kafka_2.11-2.0.0]$ sudo bin/kafka-topics.sh --create --zookeeper 192.168.0.17:2181 --topic hello --partitions 1 --replication-factor 1
Created topic "hello".
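Why a single node means one replica: each replica of a partition has to live on a different broker, so the replication factor cannot exceed the number of brokers, while the partition count has no such limit (one broker can host many partitions). The sketch below encodes that rule as a pre-check; check_topic_args is a hypothetical helper, and Kafka itself performs the real validation when the topic is created.

```shell
# Hypothetical helper: check_topic_args BROKERS PARTITIONS REPLICATION_FACTOR
check_topic_args() {
  local brokers partitions rf
  brokers=$1; partitions=$2; rf=$3
  if [ "$partitions" -lt 1 ]; then
    echo "partitions must be at least 1" >&2; return 1
  fi
  # Replicas of one partition go to distinct brokers.
  if [ "$rf" -lt 1 ] || [ "$rf" -gt "$brokers" ]; then
    echo "replication factor $rf must be between 1 and $brokers (broker count)" >&2
    return 1
  fi
  echo "ok: $((partitions * rf)) partition replicas over $brokers broker(s)"
}

check_topic_args 1 1 1   # this article's single-node setup
check_topic_args 1 4 1   # several partitions on one broker are fine
check_topic_args 1 1 2 || echo "rejected"
```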

List the topics that have been created:

[lihui@2018 kafka_2.11-2.0.0]$ bin/kafka-topics.sh --list --zookeeper 192.168.0.17:2181
hello

(4) Start the consumer first. Note that --zookeeper is replaced by --bootstrap-server here:

[lihui@2018 kafka_2.11-2.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.17:2181 --topic hello --from-beginning

(5) Start the producer and send a message:

[lihui@2018 kafka_2.11-2.0.0]$ bin/kafka-console-producer.sh --broker-list 192.168.0.17:9092 --topic hello
>Hello, LiHui

But the consumer didn't receive the message. You can check the messages stored for the topic:

[lihui@2018 kafka_2.11-2.0.0]$ cat /data/kafka-logs/hello-0/
00000000000000000000.index      00000000000000000000.log        00000000000000000000.timeindex  leader-epoch-checkpoint
[lihui@2018 kafka_2.11-2.0.0]$ cat /data/kafka-logs/hello-0/00000000000000000000.log
(binary record data; the readable fragments are "Hello, my name is lihui, this is my world: lihuia.com, 3Q~!" and "Hello, LiHui")
[lihui@2018 kafka_2.11-2.0.0]$

So the messages are there.
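The listing above follows Kafka's on-disk layout: under log.dirs there is one directory per partition named <topic>-<partition> (hence hello-0), and inside it segment files named after the segment's base offset zero-padded to 20 digits, with .log holding the records and .index/.timeindex the offset and time indexes. A hypothetical one-liner that builds such a path:

```shell
# Hypothetical helper: segment_log_path LOG_DIR TOPIC PARTITION BASE_OFFSET
segment_log_path() {
  printf '%s/%s-%s/%020d.log\n' "$1" "$2" "$3" "$4"
}

segment_log_path /data/kafka-logs hello 0 0
# prints: /data/kafka-logs/hello-0/00000000000000000000.log
```

Because the .log file is binary, cat shows mostly unreadable bytes. Kafka ships kafka.tools.DumpLogSegments for decoding segments, which should print the records readably, e.g. bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files "$(segment_log_path /data/kafka-logs hello 0 0)" --print-data-log.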

No way around it: stop Kafka and start it in the foreground to see whether the startup output shows any problems.

[lihui@2018 kafka_2.11-2.0.0]$ sudo bin/kafka-server-start.sh config/server.properties

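This part is a bit confusing: the log says it is opening a connection to a "server", yet the port is 2181. That is the ZooKeeper port; here the broker itself is connecting to ZooKeeper as a client.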

[2018-08-05 10:10:36,333] INFO Initiating client connection, connectString=192.168.0.17:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@1623b78d (org.apache.zookeeper.ZooKeeper)
[2018-08-05 10:10:36,346] INFO [ZooKeeperClient] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2018-08-05 10:10:36,348] INFO Opening socket connection to server localhost/192.168.0.17:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-08-05 10:10:36,353] INFO Socket connection established to localhost/192.168.0.17:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-08-05 10:10:36,361] INFO Session establishment complete on server localhost/192.168.0.17:2181, sessionid = 0x10000476d4f0001, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-08-05 10:10:36,364] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
[2018-08-05 10:10:36,634] INFO Cluster ID = vFeD_uAfRmKWhFE72gjh-A (kafka.server.KafkaServer)

Let's look again at what bootstrap-server means for the consumer:

[lihui@2018 kafka_2.11-2.0.0]$ bin/kafka-console-consumer.sh
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
Option                                   Description
------                                   -----------
--bootstrap-server 

After switching the consumer's port to 9092, it can't connect:

[lihui@2018 kafka_2.11-2.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.17:9092 --topic hello --from-beginning
[2018-08-05 10:49:16,278] WARN [Consumer clientId=consumer-1, groupId=console-consumer-98961] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-08-05 10:49:16,332] WARN [Consumer clientId=consumer-1, groupId=console-consumer-98961] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-08-05 10:49:16,489] WARN [Consumer clientId=consumer-1, groupId=console-consumer-98961] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-08-05 10:49:16,748] WARN [Consumer clientId=consumer-1, groupId=console-consumer-98961] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-08-05 10:49:17,109] WARN [Consumer clientId=consumer-1, groupId=console-consumer-98961] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-08-05 10:49:17,991] WARN [Consumer clientId=consumer-1, groupId=console-consumer-98961] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-08-05 10:49:18,926] WARN [Consumer clientId=consumer-1, groupId=console-consumer-98961] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

This is strange, since bootstrap-server should match the listeners address in the configuration.
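The point here is that --bootstrap-server takes the broker's listener address (port 9092 in this setup), not ZooKeeper's 2181. One way to avoid typing the wrong port is to read the address straight from server.properties. listener_addr is a hypothetical helper; it assumes the key=value form without spaces, takes the last active listeners= line (in a Java properties file the last value wins) and the first listener in it, and strips the listener name. It only reads the file, so it says nothing about what a running broker loaded at startup.

```shell
# Hypothetical helper: listener_addr PROPERTIES_FILE -> host:port of the broker
listener_addr() {
  local line first
  line=$(grep '^listeners=' "$1" | tail -n 1)
  [ -n "$line" ] || return 1          # no active listeners= line
  first=${line#listeners=}
  first=${first%%,*}                  # first listener if several are listed
  printf '%s\n' "${first#*://}"       # drop the "PLAINTEXT://" prefix
}

props=$(mktemp)
printf 'listeners=PLAINTEXT://192.168.0.17:9092\nzookeeper.connect=192.168.0.17:2181\n' > "$props"
listener_addr "$props"   # prints: 192.168.0.17:9092
```

Against the real file, the consumer command would become bin/kafka-console-consumer.sh --bootstrap-server "$(listener_addr config/server.properties)" --topic hello --from-beginning.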

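Checking the Kafka process, it turned out I hadn't restarted the Kafka service after changing the configuration. After restarting it and starting the consumer again, all the messages in the topic were received: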

[lihui@2018 kafka_2.11-2.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.17:9092 --topic hello --from-beginning
Hello, my name is lihui, this is my world: lihuia.com, 3Q~!

Hello, LiHui
Hello, LiHui
Nice to


hi

Done.
