Kafka单机环境配置及基本使用详解(2)

重新开另个一Xshell窗口CD到Kafka目录/bin下,我们先介绍这一节会使用到的 kafka-console-consumer.sh

# 键入如下命令 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning # 最近本的指定,bootstrap-server与topic/whitelist是必须的参数 # 由于有 from-beginning 参数 会从头load所有消息 # 消费后返回如下 today message #在生产端键入消息后,消费端会同步消息出现

kafka-console-consumer.sh参数说明运行./kafka-console-consumer.sh --help可查看

使用Python作为生产者、消费者

在物理机上写一个Python生产者的脚本

from kafka.producer import KafkaProducer import time def send_data(data): producer = KafkaProducer(bootstrap_servers='192.168.233.138:9092') producer.send("test",b''+str(data)+'') producer.flush() print ("end") if __name__=="__main__": send_data("physics python message");

查看Xshell上消费的命令行

[root@localhost ~]# /home/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.138:9092 --topic test --from-beginning 111 333 1 12 physics python message

在物理机上写一个消费者的脚本

from kafka import KafkaConsumer import time def get_data(data): consumer = KafkaConsumer('test',bootstrap_servers='192.168.233.138:9092', group_id='my_favorite_group') print ("end") for msg in consumer: print(msg) if __name__=="__main__": get_data();

物理机消费者的结果

# 我这边是先运行的消费者的脚本,所以实时接收到了物理机产生的消息 ConsumerRecord(topic=u'test', partition=0, offset=5, timestamp=1551762485911L, timestamp_type=0, key=None, value='physics python message', checksum=1520092583, serialized_key_size=-1, serialized_value_size=22)

测试使用虚拟机sh端的生产者发送123 消息,查看物理机消费者结果

ConsumerRecord(topic=u'test', partition=0, offset=6, timestamp=1551762784609L, timestamp_type=0, key=None, value='123', checksum=1760815061, serialized_key_size=-1, serialized_value_size=3)

几点注意

# 物理机连接时可能出现【kafka.errors.NoBrokersAvailable: NoBrokersAvailable】这个错误按照如下顺序依次更改 1. 查看虚拟机防火墙是否关闭 systemctl status firewalld systemctl stop firewalld 2. 更改kafka服务端的server.properties: 增加 [ listeners=PLAINTEXT://192.168.233.138:9092 ]这一行 3. 修改物理机的hosts文件 C:\Windows\System32\drivers\etc\hosts 增加 【虚拟机ip 虚拟机主机名】 Eg:[192.168.233.138 localhost] 使用Springboot 作为生产者、消费者

注:我直接在我的一个寄存的Spring Boot Demo项目上更改

在pom.xml中添加kafka依赖

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- 提示一件事情此处别指定version了,直接用最新的就可以,老的版本一些包找不到 -->

写一个kafka 生产者配置类

package com.example.kane.config; import java.util.HashMap; import java.util.Map; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; @Configuration @EnableKafka public class kafka_config { public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.138:9092"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/be0f666235fabc9db508fbe3e181edbf.html