Kafka安装配置
kafka需要java环境,自行安装java sdk 1.8+.(新版kafka包内集成了zookeeper,直接启动即可)
http://kafka.apache.org/downloads
单独配置KAFKA JDK,bin/kafka-run-class.sh添加以下内容
#init java_home
export JAVA_HOME=/opt/tmp/jdk1.8.0_191
config路径下配置文件:
- consumer.properites 消费者配置,这个配置文件用于配置开启的消费者,默认即可
- producer.properties 生产者配置,这个配置文件用于配置开启的生产者,默认即可
- server.properties kafka服务器的配置,此配置文件用来配置kafka服务器,核心配置如下:
- broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,默认即可
- listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置。例如:listeners=PLAINTEXT:// 192.168.1.1:9092。并确保服务器的9092端口能够访问
- zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper,默认即可。例如:zookeeper.connect=localhost:2181
启动zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties &
&代表后台运行
启动kafka:
bin/kafka-server-start.sh config/server.properties &
创建一个名为test的topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看已经创建的topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
更改tioic分区:
bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --alter --partitions 4
查看指定topic信息:
bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --describe
删除指定topic
bin/kafka-topics --delete --zookeeper localhost:2181 --topic test
创建一个消息消费者:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
如果弹出WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available,需将上文中localhost改为队列配置的地址
创建一个消息生产者:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
出现 > 符号后输入消息发送,消费者连接可以看到输出。
如果弹出WARN [Consumer clientId=consumer-1, groupId=console-consumer-59948] Connection to node -1 could not be established. Broker may not be available,需将上文中localhost改为队列配置的地址
创建topic
./bin/kafka-topics.sh --create --zookeeper 192.168.131.11:2181 --replication-factor 1 --partitions 1 --topic SVCD-OPERATION-USER-TOPIC
./bin/kafka-topics.sh --create --zookeeper 192.168.131.11:2181 --replication-factor 1 --partitions 1 --topic test6
删除topic
./bin/kafka-topics.sh --delete --zookeeper 192.168.131.11:2181 --topic SVCD-OPERATION-USER-TOPIC
生产者
./bin/kafka-console-producer.sh --broker-list 192.168.131.11:9092 --topic SVCD-OPERATION-USER-TOPIC
消费者
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.131.11:9092 --topic SVCD-OPERATION-USER-TOPIC --from-beginning
打开SASL/PLAIN权限
配置config/server.properties
添加下面的配置 ip和端口改成自己需要
listeners=SASL_PLAINTEXT://192.168.131.11:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
配置config/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafkapswd"
user_kafka="kafkapswd"
user_mooc="moocpswd";
};
配置config/kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="mooc"
password="moocpswd";
};
配置bin/kafka-server-start-saal.sh
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/tmp/kafka_2.11-2.1.0/config/kafka_server_jaas.conf"
fi
配置bin/kafka-console-producer.sh 和 bin/kafka-console-consumer.sh
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/tmp/kafka_2.11-2.1.0/config/kafka_client_jaas.conf"
fi