-
Notifications
You must be signed in to change notification settings - Fork 4
kafka相关
参考文章:
1.http://www.cnblogs.com/haoxinyue/p/5723986.html
2.http://www.cnblogs.com/luotianshuai/p/5206662.html
3.http://blog.csdn.net/ouyang111222/article/details/51094912
自带位置:/usr/jdk64
设置JAVA的环境变量
/etc/profile
·在profile文件末尾加入:
export JAVA_HOME=/usr/share/jdk1.6.0_14
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
我下载的是官方:scala-2.11.8.tgz版本。
下载解压后配置环境变量
export SCALA_HOME=/data/apps/scala-2.11.8
export PATH=$SCALA_HOME/bin:$PATH
source /etc/profile立刻生效
scala -version
https://services.gradle.org/distributions/gradle-2.13-all.zip
配置环境变量。配置GRADLE_HOME到你的gradle根目录当中,然后把%GRADLE_HOME%/bin(linux或mac的是$GRADLE_HOME/bin)加到PATH的环境变量。
export GRADLE_HOME=/data/apps/gradle-2.13
export PATH=$GRADLE_HOME/bin:$PATH
source /etc/profile立刻生效
gradle -version
先执行1->3
download source code:我下载的是官方源码:kafka-0.10.0.0-src.tgz版本。
官方doc参考:http://kafka.apache.org/documentation.html#quickstart
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
进入到config目录:cd /opt/kafka/kafka_2.11-0.9.0.1/config/
主要关注:server.properties 这个文件即可,我们可以发现在目录下:
有很多文件,这里可以发现有Zookeeper文件,我们可以根据Kafka内带的zk集群来启动,但是建议使用独立的zk集群
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口
#broker.id=0 每台服务器的broker.id都不能相同
#hostname
host.name=192.168.7.100
#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
#设置zookeeper的连接端口
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:12181
当一个kafka broker启动后,会向zookeeper注册自己的节点信息,该节点为一个临时节点,当当broker断开和zookeeper的连接时,其临时节点将会被删除。
其路径为:
/broker/ids/[0...N]
其中[0..N]表示broker id(broker id唯一,不可以重复),znode的值为对应broker的相关信息,如下:
{ "jmx_port": -1, //JMX的端口号 "timestamp": "1460082147315",//broker启动的时间戳 "host": "xx.xxx.xxx.xxx",//host "version": 1,//默认的版本 "port": 9092 //broker进程的对外监听的端口号 }
当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息。 其路径为:
/broker/topics/[topic]/partitions/[0...N]
其中[0..N]表示partition索引号。其zonode下的信息如下:
{ "controller_epoch": 17,//中央控制器的总的选举次数 "leader": 0, //此partition的broker leader的id "version": 1, //默认版本号 "leader_epoch": 1,//此partition的leader选举的次数 "isr": [ 0 ] //同步副本组brokerId顺序列表 }
在kafka consumer的配置参数中有:
#消费者的ID,若是没有设置的话,会自增
consumer.id
当然,consumer id也可以手动设置。
在zookeeper查看相关consumer id的注册信息,其路径如下:
/consumers/[group_id]/ids/[consumer_id]
这仍然是一个临时的znode,此节点的值为格式如下:
{ "version": 1, "subscription": { "user11": 1 }, "pattern": "static", "timestamp": "1460083658252" }
{“topic_name”:#streams…},即表示此consumer目前所消费的topic + partitions列表。
用来跟踪每个consumer group目前所消费的partition中最大的offset。
其路径为:
/consumers/[group_id]/offsets/[topic]/[partition_id]
此znode为永久节点,可以看出offset跟group_id有关,以表明当group中一个消费者失效,其他consumer可以继续消费。
##5 Partition Owner 注册
用来标记partition被哪个consumer消费,为临时节点。
/consumers/[group_id]/owners/[topic]/[partition_id]
Kafka是一个开源的,分布式的,高吞吐量的消息系统。随着Kafka的版本迭代,日趋成熟。大家对它的使用也逐步从日志系统衍生到其他关键业务领域。特别是其超高吞吐量的特性,在互联网领域,使用越来越广泛,生态系统也越来的完善。同时,其设计思路也是其他消息中间件重要的设计参考。
Kafka原先的开发初衷是构建一个处理海量日志的框架,基于高吞吐量为第一原则,所以它对消息的可靠性以及消息的持久化机制考虑的并不是特别的完善。0.8版本后,陆续加入了一些复制、应答和故障转移等相关机制以后,才可以让我们在其他关键性业务中使用。
Kafka的运行架构如下图,各组件之间通过TCP协议通信:
主题,或者说是一类消息。类似于RabbitMQ中的queue。可以理解为一个队列。
一个Kafka服务称之为Broker。Kafka可以集群部署,每一个Kafka部署就是一个Broker。
生产者和消费者。一般消息系统都有生产者和消费者的概念。生产者产生消息,即把消息放入Topic中,而消费者则从Topic中获取消息处理。一个生产者可以向多个Topic发送消息,一个消费者也可以同时从几个Topic接收消息。同样的,一个Topic也可以被多个消费者来接收消息。
分区,或者说分组。分组是Kafka提升吞吐量的一个关键设计。这样可以让消费者多线程并行接收消息。创建Topic时可指定Parition数量。一个Topic可以分为多个Partition,也可以只有一个Partition。每一个Partition是一个有序的,不可变的消息序列。每一个消息在各自的Partition中有唯一的ID。这些ID是有序的。称之为offset,offset在不同的Partition中是可以重复的,但是在一个Partition中是不可能重复的。越大的offset的消息是最新的。Kafka只保证在每个Partition中的消息是有序的,就会带来一个问题,即如果一个Consumer在不同的Partition中获取消息,那么消息的顺序也许是和Producer发送到Kafka中的消息的顺序是不一致的。
如果是多Partition,生产者在把消息放到Topic中时,可以决定放到哪一个Patition。这个可以使用简单的轮训方法,也可以使用一些Hash算法。
一个Topic的多个Partition可以分布式部署在不同的Server上,一个Kafka集群。配置项为:num.partitions,默认是1。每一个Partition也可以在Broker上复制多分,用来做容错。
(5).kafka常用命令:
5.1.创建topic:
./kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --topic nick-name-modify --partitions 6 --replication-factor 1
zookeeper指定用IP,kafka命令行使用时也要用IP,两者要保持一致,否则提示broker不存在.
xxx是kafka的server.properties中指定的路径。
5.2.删除kafka:
./kafka-topics.sh --delete --zookeeper zk1:2181,zk2:2181,zk3:2181 --topic syncFriend
5.3.list all topics
./kafka-topics.sh --list --zookeeper localhost:2181
5.4.list all topics 详细信息
./kafka-topics.sh --describe --zookeeper localhost:2181
5.5.控制台接收消息:
./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic topic
5.6.查看消息:
./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic topic
5.7.查看kafka指定topic的指定brokerlist的offset
./kafka-simple-consumer-shell.sh --print-offsets --broker-list 10.102.0.28:9092 --topic dataFerryMysqlTopic
5.8.查看kafka消费进度
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
5.9.控制台发送消息: bin/kafka-console-producer.sh --broker-list 192.168.197.170:9092,192.168.197.171: 9092 --topic test < logfile