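Lately I've gotten hooked on internet middleware, and I finally have the drive to dig into something new again.
On the surface Kafka looks simple: it is just a publish-subscribe messaging middleware. But given how widely it is used, it must have real strengths, and it involves quite a few concepts: broker, topic, partition, producer and consumer, each with attributes of its own, such as offset and leader. We'll get a feel for all of these during the tests below.
I created a CentOS 7 virtual machine for testing. First, download Kafka. For a single-node setup, Kafka ships scripts to start ZooKeeper, so there is no need to install ZooKeeper separately.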
[lihui@2018 ~]$ wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
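I went with the build the official site recommends, kafka_2.11 (2.11 is the Scala version the binaries were built with; 2.0.0 is the Kafka version). This is the binary release, not the source code; if you're interested, download the source and compile it yourself.
After extracting the archive, start with config/server.properties, which is arguably the most important configuration file.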
1:broker
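############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

The broker is where messages are stored. Kafka persists messages on disk, relying on the fact that sequential disk reads and writes can outperform random access to memory. In a Kafka cluster each node is a broker, and messages live on that node's disk. Since this is a single node, the default of 0 is fine. In a distributed deployment, give each broker its own unique integer, for example its index within the cluster.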
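To make the uniqueness rule concrete, here is a minimal Python sketch, assuming a hypothetical cluster whose server.properties contents are passed in as strings keyed by host name. `parse_properties` and `check_broker_ids` are illustrative helpers, not part of Kafka, and the parser only understands plain key=value lines. Treating a missing broker.id as an error is also a simplification: Kafka can generate ids itself when broker.id is left unset (broker.id.generation.enable).

```python
from __future__ import annotations


def parse_properties(text: str) -> dict[str, str]:
    """Minimal reader for simple key=value lines; '#' and '!' comment lines are skipped.
    Assumption: no line continuations, escape sequences, or ':' separators."""
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_broker_ids(configs: dict[str, str]) -> dict[int, str]:
    """Map broker.id -> host, raising ValueError if an id is missing, not an integer, or reused."""
    owners: dict[int, str] = {}
    for host, text in configs.items():
        raw_id = parse_properties(text).get("broker.id")
        if raw_id is None:
            raise ValueError(f"{host}: broker.id is not set")
        broker_id = int(raw_id)  # int() itself raises ValueError for non-integers
        if broker_id in owners:
            raise ValueError(f"{host}: broker.id={broker_id} is already used by {owners[broker_id]}")
        owners[broker_id] = host
    return owners


if __name__ == "__main__":
    # Hypothetical three-node cluster: each broker gets its index as its id.
    cluster = {f"node{i}": f"# The id of the broker.\nbroker.id={i}\n" for i in range(3)}
    print(check_broker_ids(cluster))  # {0: 'node0', 1: 'node1', 2: 'node2'}
```

Running the check over every node's file before starting the cluster catches a copy-pasted broker.id early.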
2:listeners
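############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.0.17:9092

The listeners line is commented out by default, so uncomment it. It sets the address the socket server listens on; in a cluster it has to be configured so that others can find this broker. If it is not configured, the value comes from java.net.InetAddress.getCanonicalHostName(). Since this is a single node, 127.0.0.1 would also work. Leave the port at 9092, the conventional default. I set it to the machine's real IP, to do things properly.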
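The FORMAT line in the comment can be read mechanically. The following Python sketch (`parse_listeners` is a hypothetical helper, not something Kafka ships) splits a listeners value into listener name, host and port, which is handy for double-checking what a broker is configured with. It validates nothing beyond the format shown in the comment.

```python
from __future__ import annotations


def parse_listeners(value: str) -> list[tuple[str, str, int]]:
    """Split a listeners value into (listener_name, host_name, port) tuples.
    FORMAT (from the server.properties comment): listener_name://host_name:port,
    with several listeners separated by commas. An empty host_name is kept as ''."""
    listeners = []
    for item in value.split(","):
        item = item.strip()
        name, sep, address = item.partition("://")
        if not sep:
            raise ValueError(f"expected listener_name://host_name:port, got {item!r}")
        host, colon, port = address.rpartition(":")
        if not colon:
            raise ValueError(f"missing port in {item!r}")
        listeners.append((name, host, int(port)))
    return listeners


if __name__ == "__main__":
    print(parse_listeners("PLAINTEXT://192.168.0.17:9092"))
    # [('PLAINTEXT', '192.168.0.17', 9092)]
```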
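3: log directory
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

A small gotcha: the default is under /tmp, yet this is where the actual message records are kept. Better move it, since /tmp is meant for temporary files; otherwise you may find all your messages gone after a reboot. I set it to /data/kafka-logs.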
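Edits like this one, and uncommenting listeners above, can be scripted. Below is a Python sketch of a hypothetical `set_property` helper, assuming simple key=value lines: it replaces an active setting, otherwise uncomments a line written exactly as `#key=...` (assuming the commented-out default has no space after `#`), otherwise appends the setting. Indented example lines inside comments, such as the FORMAT/EXAMPLE lines, are deliberately left alone.

```python
import re


def set_property(text: str, key: str, value: str) -> str:
    """Return `text` with `key` set to `value` (sketch for simple key=value files).

    1. Replace the first active line such as 'log.dirs=/tmp/kafka-logs'.
    2. Otherwise uncomment the first line written exactly as '#key=...'.
       Indented examples such as '#     listeners = ...' do not match.
    3. Otherwise append 'key=value' at the end.
    """
    new_line = f"{key}={value}"
    k = re.escape(key)
    for pattern in (rf"^[ \t]*{k}[ \t]*=.*$", rf"^#{k}=.*$"):
        # A function as replacement keeps any backslashes in `value` literal.
        updated, count = re.subn(pattern, lambda _m: new_line, text, count=1, flags=re.MULTILINE)
        if count:
            return updated
    if text and not text.endswith("\n"):
        text += "\n"
    return text + new_line + "\n"


if __name__ == "__main__":
    conf = "#listeners=PLAINTEXT://:9092\nlog.dirs=/tmp/kafka-logs\n"
    conf = set_property(conf, "listeners", "PLAINTEXT://192.168.0.17:9092")
    conf = set_property(conf, "log.dirs", "/data/kafka-logs")
    print(conf, end="")
    # listeners=PLAINTEXT://192.168.0.17:9092
    # log.dirs=/data/kafka-logs
```

Writing the result back to config/server.properties (and restarting the broker) is left to the caller.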
4:partitions
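# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

Partition is another important concept: a partition is a subdivision of a topic. Partitions come with properties of their own, such as the offset, each message's position within the partition. If a partition has multiple replicas, one of them is the leader and the others are followers. The leader handles the partition's messages while the followers replicate from it; if the leader goes down, a follower becomes the new leader, giving a primary/standby setup. The Kafka website has a diagram showing how partitions are consumed:
In it, a Kafka cluster of 2 brokers holds one topic split into 4 partitions, and there are 2 consumer groups, A and B. Group A has 2 consumers, C1 and C2; group B has 4 consumers, C3, C4, C5 and C6.
Within a consumer group, at any given moment each partition is assigned to exactly one consumer of that group, which preserves the order in which that partition's messages are consumed. Each group independently receives every partition, so in the diagram each consumer in A handles two partitions and each consumer in B handles one. Consequently, a group should not have more consumers than the topic has partitions: the extra consumers would get no partition and sit idle.
So for this single-node test I leave the partition count unchanged at 1.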
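The failover rule can be sketched in a few lines of Python. This is a simplified model, loosely based on Kafka's default ("clean") leader election: pick the first replica in the partition's assignment order that is both alive and in the in-sync replica set (ISR), i.e. a follower that has kept up with the leader. The controller, ZooKeeper bookkeeping and unclean leader election (unclean.leader.election.enable) are all left out, and `elect_leader` is a hypothetical name.

```python
from __future__ import annotations

from typing import Optional


def elect_leader(replicas: list[int], isr: set[int], live: set[int]) -> Optional[int]:
    """Return the new leader for a partition: the first broker in the replica
    assignment order that is still alive and in the in-sync replica set (ISR).
    None means no eligible replica, so the partition stays offline."""
    for broker_id in replicas:
        if broker_id in live and broker_id in isr:
            return broker_id
    return None


if __name__ == "__main__":
    replicas = [0, 1, 2]                                   # broker 0 is the preferred leader
    print(elect_leader(replicas, {0, 1, 2}, {0, 1, 2}))    # 0
    print(elect_leader(replicas, {0, 1, 2}, {1, 2}))       # 1: broker 0 is down
    print(elect_leader(replicas, {0}, {1, 2}))             # None: no live in-sync follower
```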
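The rule within a single group can be simulated. The Python sketch below follows the idea of Kafka's range assignment strategy for a single topic (sort the consumers, hand out contiguous blocks of partitions, the first few get one extra); `range_assign` is a hypothetical helper, and Kafka's real assignors also handle multiple topics, member ids and rebalancing, none of which is modeled here. Each group runs the assignment independently, which is why groups A and B both cover all four partitions.

```python
from __future__ import annotations


def range_assign(num_partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Assign partitions 0..num_partitions-1 of ONE topic to the consumers of ONE group.
    Consumers are sorted; each gets a contiguous block, and the first
    (num_partitions % len(consumers)) consumers get one extra partition."""
    members = sorted(consumers)
    if not members:
        return {}
    per_member, extra = divmod(num_partitions, len(members))
    assignment: dict[str, list[int]] = {}
    start = 0
    for i, member in enumerate(members):
        count = per_member + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


if __name__ == "__main__":
    # The diagram's setup: one topic with 4 partitions, two independent groups.
    print(range_assign(4, ["C1", "C2"]))                    # group A: two partitions each
    print(range_assign(4, ["C3", "C4", "C5", "C6"]))        # group B: one partition each
    print(range_assign(4, ["C1", "C2", "C3", "C4", "C5"]))  # a 5th consumer sits idle
```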
5:zookeeper
############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=192.168.0.17:2181

Plenty of messaging systems rely on ZooKeeper; it's just that good. As the keeper of the zoo (a zoo of animal books? an actual zoo?), it is also pointed at this machine, and the port doesn't need to change.
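The comment in the ZooKeeper section describes the shape of the connection string: comma-separated host:port pairs plus an optional chroot path appended at the end. A small Python parser for that shape follows; `parse_zk_connect` is a hypothetical helper, and defaulting a missing port to 2181 is an assumption for illustration, not something the config comment states.

```python
from __future__ import annotations


def parse_zk_connect(value: str, default_port: int = 2181) -> tuple[list[tuple[str, int]], str]:
    """Split zookeeper.connect into ([(host, port), ...], chroot).
    The optional chroot starts at the first '/' after the host list, e.g.
    '127.0.0.1:3000,127.0.0.1:3001/kafka'; without one, the chroot is '/'."""
    slash = value.find("/")
    hosts_part, chroot = (value, "/") if slash == -1 else (value[:slash], value[slash:])
    servers = []
    for item in hosts_part.split(","):
        item = item.strip()
        host, colon, port = item.rpartition(":")
        if not colon:  # assumption: a missing port falls back to default_port
            host, port = item, str(default_port)
        servers.append((host, int(port)))
    return servers, chroot


if __name__ == "__main__":
    print(parse_zk_connect("192.168.0.17:2181"))
    # ([('192.168.0.17', 2181)], '/')
    print(parse_zk_connect("127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/kafka"))
```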
6:topic
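A topic is purely a logical concept that represents a category (some call it a subject). Generally, each kind of message you define yourself gets its own topic.
With that, the configuration is essentially done. Now for the actual testing.
(1) Start ZooKeeper. Kafka's bin/ directory contains scripts to start and stop ZooKeeper. zookeeper-server-start.sh is only about 30 lines long, and it takes two key arguments: -daemon to run it in the background, and the path to zookeeper.properties.
As required, start ZooKeeper first. Before doing so, you may want to change dataDir in config/zookeeper.properties:
dataDir=/data/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
Start it: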
[lihui@2018 kafka_2.11-2.0.0]$ sudo bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
Check whether it really started, ideally by connecting with a CLI. Here I use the zookeeper-shell that comes with Kafka; alternatively, download a standalone ZooKeeper package, which includes zkCli.sh.
[lihui@2018 kafka_2.11-2.0.0]$ sh bin/zookeeper-shell.sh 127.0.0.1:2181
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Connecting to 127.0.0.1:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
ls /
[zookeeper]
ls /zookeeper
[quota]
ZooKeeper is up and running.
(2) Start Kafka
sudo bin/kafka-server-start.sh -daemon config/server.properties
But it produced a pile of errors:
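Besides zookeeper-shell, a quick scriptable health check is ZooKeeper's `ruok` four-letter command, to which a healthy server answers `imok`. The Python sketch below assumes four-letter commands are enabled on the server (newer ZooKeeper releases restrict them via `4lw.commands.whitelist`); `zk_ruok` is a hypothetical helper.

```python
import socket


def zk_ruok(host: str, port: int = 2181, timeout: float = 3.0) -> bool:
    """Send ZooKeeper's 'ruok' four-letter command; a healthy server replies 'imok'
    and then closes the connection. Assumes four-letter commands are enabled."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(b"ruok")
            reply = b""
            while True:
                chunk = sock.recv(16)
                if not chunk:  # server closed the connection
                    break
                reply += chunk
    except OSError:  # connection refused, unreachable, or timed out
        return False
    return reply == b"imok"


if __name__ == "__main__":
    print(zk_ruok("127.0.0.1"))  # True only if a ZooKeeper with 4lw enabled listens locally
```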
[2018-08-05 01:46:07,287] INFO Socket error occurred: 192.168.0.17/192.168.0.17:2181: Connection refused (org.apache.zookeeper.ClientCnxn)
[2018-08-05 01:46:08,395] INFO Opening socket connection to server 192.168.0.17/192.168.0.17:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-08-05 01:46:08,396] INFO Socket error occurred: 192.168.0.17/192.168.0.17:2181: Connection refused (org.apache.zookeeper.ClientCnxn)
[2018-08-05 01:46:09,502] INFO Opening socket connection to server 192.168.0.17/192.168.0.17:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-08-05 01:46:09,503] INFO Socket error occurred: 192.168.0.17/192.168.0.17:2181: Connection refused (org.apache.zookeeper.ClientCnxn)
[2018-08-05 01:46:10,608] INFO Opening socket connection to server 192.168.0.17/192.168.0.17:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-08-05 01:46:10,609] INFO Socket error occurred: 192.168.0.17/192.168.0.17:2181: Connection refused (org.apache.zookeeper.ClientCnxn)
[2018-08-05 01:46:13,269] INFO [ZooKeeperClient] Closing. (kafka.zookeeper.ZooKeeperClient)
[2018-08-05 01:46:16,722] INFO Opening socket connection to server 192.168.0.17/192.168.0.17:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
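"Connection refused" means the VM answered but nothing was accepting connections at 192.168.0.17:2181 at that moment, whereas a timeout would point to the host being unreachable. A small Python probe (the helper name `probe` is hypothetical) makes that distinction explicit; running it against both 127.0.0.1 and the real IP shows whether a service is only reachable on one of them.

```python
import socket


def probe(host: str, port: int, timeout: float = 2.0) -> str:
    """Classify one TCP connection attempt to host:port.
    'open'    -> something accepted the connection
    'refused' -> the host answered but nothing listens on that address/port
    'timeout' -> no answer at all (host down or traffic filtered)"""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return "open"
    except ConnectionRefusedError:
        return "refused"
    except socket.timeout:
        return "timeout"
    except OSError as exc:  # e.g. no route to host
        return f"error: {exc}"


if __name__ == "__main__":
    # Compare the loopback address that worked with the address Kafka was using.
    for host in ("127.0.0.1", "192.168.0.17"):
        print(host, probe(host, 2181, timeout=1.0))
```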
I checked ZooKeeper again, and annoyingly, connecting to 192.168.0.17:2181 now hangs without ever reporting SyncConnected. The JVM warning also hints that the VM has only one processor; luckily my host has cores to spare:
[lihui@2018 kafka_2.11-2.0.0]$ bin/zookeeper-shell.sh 192.168.0.17:2181
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Connecting to 192.168.0.17:2181
Welcome to ZooKeeper!
JLine support is disabled
^C
I gave the VM two cores and restarted Kafka, and sure enough, that fixed it.
(3) Create a topic. Since this is a single node, create just one partition with one replica.
[lihui@2018 kafka_2.11-2.0.0]$ sudo bin/kafka-topics.sh --create --zookeeper 192.168.0.17:2181 --topic hello --partitions 1 --replication-factor 1
Created topic "hello".
List the created topics:
[lihui@2018 kafka_2.11-2.0.0]$ bin/kafka-topics.sh --list --zookeeper 192.168.0.17:2181
hello
(4) Start the consumer first. Note that --zookeeper is replaced by --bootstrap-server here.
[lihui@2018 kafka_2.11-2.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.17:2181 --topic hello --from-beginning
(5) Start a producer and send messages
[lihui@2018 kafka_2.11-2.0.0]$ bin/kafka-console-producer.sh --broker-list 192.168.0.17:9092 --topic hello
>Hello, LiHui
But the consumer didn't receive anything. Let's check whether the messages made it into the topic:
[lihui@2018 kafka_2.11-2.0.0]$ cat /data/kafka-logs/hello-0/
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint
[lihui@2018 kafka_2.11-2.0.0]$ cat /data/kafka-logs/hello-0/00000000000000000000.log
<binary>Hello, my name is lihui, this is my world: lihuia.com, 3Q~!<binary>Hello, LiHui
So the messages are there.
With no better option, I stopped Kafka and started it in the foreground to see whether the startup output revealed a problem:
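The directory listing shows the on-disk layout: one directory per partition named `<topic>-<partition>` (here hello-0), holding segment files named after the segment's first (base) offset, zero-padded to 20 digits. The Python sketch below reproduces that naming and adds a simplified lookup of which segment would contain a given offset; the helper names are hypothetical, and a real broker also consults the .index file to find the byte position inside a segment.

```python
from __future__ import annotations

from bisect import bisect_right


def partition_dir(topic: str, partition: int) -> str:
    """Directory under log.dirs holding one partition, e.g. 'hello-0'."""
    return f"{topic}-{partition}"


def segment_files(base_offset: int) -> list[str]:
    """Names of one segment's files: its base offset zero-padded to 20 digits."""
    stem = f"{base_offset:020d}"
    return [f"{stem}.index", f"{stem}.log", f"{stem}.timeindex"]


def segment_for(offset: int, base_offsets: list[int]) -> int:
    """Base offset of the segment that holds `offset`, given the sorted base
    offsets of a partition's segments (the last base offset <= offset)."""
    i = bisect_right(base_offsets, offset) - 1
    if i < 0:
        raise ValueError(f"offset {offset} precedes the first segment")
    return base_offsets[i]


if __name__ == "__main__":
    print(partition_dir("hello", 0))        # hello-0
    print(segment_files(0)[1])              # 00000000000000000000.log
    print(segment_for(150, [0, 100, 200]))  # 100
```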
[lihui@2018 kafka_2.11-2.0.0]$ sudo bin/kafka-server-start.sh config/server.properties
One thing looked odd: the log talks about connecting to a server, yet the port is 2181. (That port is ZooKeeper's clientPort: these lines are the broker's own ZooKeeper client connecting, while the broker itself listens on 9092.)
[2018-08-05 10:10:36,333] INFO Initiating client connection, connectString=192.168.0.17:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@1623b78d (org.apache.zookeeper.ZooKeeper)
[2018-08-05 10:10:36,346] INFO [ZooKeeperClient] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2018-08-05 10:10:36,348] INFO Opening socket connection to server localhost/192.168.0.17:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-08-05 10:10:36,353] INFO Socket connection established to localhost/192.168.0.17:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-08-05 10:10:36,361] INFO Session establishment complete on server localhost/192.168.0.17:2181, sessionid = 0x10000476d4f0001, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-08-05 10:10:36,364] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
[2018-08-05 10:10:36,634] INFO Cluster ID = vFeD_uAfRmKWhFE72gjh-A (kafka.server.KafkaServer)
Let's look again at what --bootstrap-server means for the consumer:
[lihui@2018 kafka_2.11-2.0.0]$ bin/kafka-console-consumer.sh
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
Option                                   Description
------                                   -----------
--bootstrap-server
Switching the consumer's port to 9092 didn't connect either:
[lihui@2018 kafka_2.11-2.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.17:9092 --topic hello --from-beginning
[2018-08-05 10:49:16,278] WARN [Consumer clientId=consumer-1, groupId=console-consumer-98961] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-08-05 10:49:16,332] WARN [Consumer clientId=consumer-1, groupId=console-consumer-98961] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-08-05 10:49:16,489] WARN [Consumer clientId=consumer-1, groupId=console-consumer-98961] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-08-05 10:49:16,748] WARN [Consumer clientId=consumer-1, groupId=console-consumer-98961] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-08-05 10:49:17,109] WARN [Consumer clientId=consumer-1, groupId=console-consumer-98961] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-08-05 10:49:17,991] WARN [Consumer clientId=consumer-1, groupId=console-consumer-98961] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-08-05 10:49:18,926] WARN [Consumer clientId=consumer-1, groupId=console-consumer-98961] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
That was strange, since --bootstrap-server should match the listeners address in the config.
Checking the Kafka process revealed the cause: after changing the config I hadn't restarted the Kafka service. After restarting it and launching the consumer again, all the messages in the topic arrived:
[lihui@2018 kafka_2.11-2.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.17:9092 --topic hello --from-beginning
Hello, my name is lihui, this is my world: lihuia.com, 3Q~!
Hello, LiHui
Hello, LiHui
Nice to hi
Done.
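The two settings are easy to compare mechanically. Here is a Python sketch (`check_bootstrap` is a hypothetical helper) that checks each --bootstrap-server address against the broker's listeners value and flags the ZooKeeper port mix-up from step (4). It only compares configuration strings; it cannot tell whether the running broker has actually loaded that configuration.

```python
from __future__ import annotations


def _host_port(address: str) -> tuple[str, int]:
    host, _, port = address.strip().rpartition(":")
    return host, int(port)


def check_bootstrap(bootstrap: str, listeners: str, zk_port: int = 2181) -> list[str]:
    """Compare a --bootstrap-server value with a broker's listeners setting.
    Returns a list of problems; an empty list means every address matches a listener.
    A listener with an empty host or 0.0.0.0 is treated as matching any host."""
    configured = [_host_port(item.partition("://")[2]) for item in listeners.split(",")]
    problems = []
    for server in bootstrap.split(","):
        host, port = _host_port(server)
        if port == zk_port:
            problems.append(f"{server.strip()}: port {zk_port} is ZooKeeper's clientPort, not a broker")
        elif not any(port == lport and lhost in (host, "", "0.0.0.0") for lhost, lport in configured):
            problems.append(f"{server.strip()}: no matching entry in listeners")
    return problems


if __name__ == "__main__":
    listeners = "PLAINTEXT://192.168.0.17:9092"
    print(check_bootstrap("192.168.0.17:2181", listeners))  # flags the ZooKeeper port
    print(check_bootstrap("192.168.0.17:9092", listeners))  # []
```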