Kafka(二)-- 安装配置

一、单机部署

1.下载kafka,可在apache官网上下载,kafka要和JDK版本对应,我使用的是JDK1.7,kafka为0.10

2.进入到 /usr/java 中,解压到 此文件夹中

tar -zxvf kafka_2.12-0.10.2.0.tgz

3.因为kafka是通过Zookeeper来集群的,所以先开启Zookeeper集群(当然开启一个Zookeeper也可以),此处用了3个Zookeeper实现了伪集群,集群的方法见此处

4.修改kafka的 /usr/java/kafka_2.12-0.10.2.0/config/server.properties 配置文件,配置Zookeeper的地址,将zookeeper.connect 的值改为:

1
zookeeper.connect=192.168.80.128:2181,192.168.80.128:2182,192.168.80.128:2183

5.启动kafka,进入到kafka的 bin 目录,需要指定启动的配置文件。

启动(以后台方式启动): ./kafka-server-start.sh -daemon ../config/server.properties

在Zookeeper中查看节点:ls / ,发现多个很多节点

img

由此,kafka 已经启动成功了,启动成功之后 kafka 会暴露一个 9092的端口。

6.创建一个topic

./kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 1 --partition 2 --topic TestTopic

解析:create 为 创建一个topic;

zookeeper 为 指定的zookeeper的地址和端口;

replication-factor 为 topic要在 不同的 broke中保存几分,因为此处没有做集群,则只有一个broker;

partition 为 分区;

opic 为 创建的topic的名字;

7.查看创建的topic

命令行方式:./kafka-topics.sh --list --zookeeper localhost:2181,localhost:2182,localhost:2183

img

在Zookeeper中查看:ls /brokers/topics

img

所有的消息都死通过Zookeeper管理的,不论是发送还是消费。

8.启动消费者,监听6中创建的topic:TestTopic

./kafka-console-consumer.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --topic TestTopic --from-beginning

9.重新打开一个窗口,启动生产者,发送消息

./kafka-console-producer.sh --broker-list 192.168.80.128:9092 --topic TestTopic

发送:hello kafka,在消费端可以看到 收到信息了。然后多发送几条消息,后面会用到,先发送7条消息。

10.查看kafka的日志:

cd /tmp/kafka-logs/ (因为 server.properties中配置的 log.dirs=/tmp/kafka-logs)

会看到 里面有 TestTopic-0 和 TestTopic-1:

img

因为创建topic的时候创建了两个分区,所以有两个partition。短横线后面的表示 分区的号。

进入到 TestTopic-0中: cd TestTopic-0/,有两个文件:

img

如果直接查看log文件的话,里面会是乱码,所以使用下面命令查看日志:

/usr/java/kafka_2.10-0.10.0.0/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log

发现日志中有刚刚发的消息,但是发的日志中消息的数量并没有7条,这是因为消息会平均分配在两个partition中,即 TestTopic-0 和 TestTopic-1 中,在TestTopic-0中有4条,就会在TestTopic-1中有3条,一个partition就对应本地的一个物理文件,每个文件中存储的就是消息的内容,消息是持久化存储在磁盘中的,即使消费了的消息也会依然存储在磁盘中,消息的多少和处理消息的效率没太大影响。

二、集群部署(此处为伪集群,部署两个Kafka)

1.开启Zookeeper集群。

2.在 /usr/java中新建kafkaCluster文件夹,然后在 kafkaCluster中新建 kafka1 和 kakfa2 文件夹,将 kafka_2.12-0.10.2.0.tgz 解压到 kafka1 和 kafka2中。

3.修改kafka1 中的 server.perproties文件(192.168.99.50为服务器的IP),

(1)将zookeeper.connect的值改为:zookeeper.connect=192.168.99.50:2181,192.168.99.50:2182,192.168.99.50:2183,

(2)将advertised.listeners的值改为:advertised.listeners=PLAINTEXT://192.168.99.50:9092。

(3)修改端口:listeners=PLAINTEXT://:9092

4.修改kafka2 中的server.perproties文件,

(1)将zookeeper.connect的值改为:zookeeper.connect=192.168.99.50:2181,192.168.99.50:2182,192.168.99.50:2183,

(2)将broker.id 的值 改为 :broker.id = 1 ,用于区分是哪一个broker,相当于 ActiveMQ中的myid。

(3)将advertised.listeners的值改为:advertised.listeners=PLAINTEXT://192.168.99.50:9093。

(4)修改端口:listeners=PLAINTEXT://:9093

(5)修改日志文件保存的位置:log.dirs=/tmp/kafka-logs2(伪集群的时候需要修改,否则会报 文件锁定的错,kafka.common.KafkaException: Failed to acquire lock on file .lock in /tmp/kafka-logs. A Kafka instance in another process or thread is using this directory.)

5.分别启动 kafkka1 和 kafka2 ,进入 bin中,

./kafka-server-start.sh -daemon ../config/server.properties

6.查看 Zookeeper节点,

ls /brokers/ids,发现有两个节点,为 [0,1],然后我们 取出 0 和 1 中的值,从 endpoints的值就可以看出我们修改advertised.listeners值的原因,如果不修改的话,默认的是localhost,集群中根本不知道是哪一个。

get /brokers/ids/0

img

7.验证集群是否成功,在kafka1 中发消息,在 kafka2 中收消息。

1) 首先创建一个topic,如果不创建,发消息的时候会自动帮我们创建topic,但是就不会指定partition了。

./kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 1 --partition 4 --topic jiqun

2) 查看Zookeeper上节点:ls /brokers/topics,存在 刚刚创建的jiqun 的topic

img

3) 在kafka2中 启动消费者,

./kafka-console-consumer.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --topic jiqun --from-beginning

4) 在kafka中 启动生产者,发送消息,

kafka-console-producer.sh --broker-list 192.168.99.50:9092,192.168.99.50:9093 --topic jiqun

发送 :How are you! ,发现 消费者中 接收到 How are you!

说明集群部署成功!

5) 查看kafka1 的日志目录,cd /tmp/kafka-logs/,发现有 jiqun-1 和 jiqun-3

img

6) 查看kafka2 的日志目录,cd /tmp/kafka-logs2/,发现有jiqun-0 和 jiqun-2

img

这就说明kafka是一个分布式的消息中间件。在不同的 物理机上 存在着真实的文件,又是以顺序存储的。我们可以进入到 partitsion中 找出 刚刚 发送的消息 在哪一个 partision中,发现在jiqun-0上,

img

kafka消息是顺序写入的,kafka之所以性能高,是因为零拷贝 和 顺序写入磁盘,顺序写入磁盘的效率比随机写入磁盘的效率高几百倍甚至上千倍。

kafka读取消息 是 consumer 主动pull 消息的,并不是 broker push 到consumer的,这也是和 ActiveMQ的区别,kafka的消费能力是由 consumer来决定的,pull的时候 是根据offset 来拿到消息。

kafka的message是以topic为基本单位,不同的topic之间是相互独立的,可以理解为不同的topic就是不同的业务,每一个topic中又可以分成几个不同的partision(创建的时候指定),每个partision存储一部分message。Zookeeper上不会保存 message。

查看日志文件中得到的数据分析:

  • offset: 0 逻辑上值,递增,主键
  • position: 0 物理偏移量
  • CreateTime: 1491920255486 创建时间
  • isvalid: true 是否有效
  • payloadsize: 3 消息大小
  • magic: 1 协议版本号
  • compresscodec: NONE 是否压缩
  • crc: 3544424991 算法
  • payload: How are you! 消息内容

8.配置master-slave模式

创建topic的时候, ./kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 2 --partition 4 --topic ceshiMasterSlave

这样在消息既在master上保存,又在slave中保存。

9.查看哪一个leader,哪一个是follow

1
./kafka-topics.sh --describe --zookeeper localhost:2181,localhost:2182,localhost:2183 --topic ceshiMasterSlave
坚持原创技术分享,您的支持将鼓励我继续创作!
-------------本文结束感谢您的阅读-------------
分享到:
0%