Kafka单机环境配置及基本使用详解(3)

创建一个生产数据的Controller

package com.example.kane.Controller; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @RestController @RequestMapping("/kafka") public class CollectController { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private KafkaTemplate kafkaTemplate; @RequestMapping(value = "/send", method = RequestMethod.GET) public void sendKafka(HttpServletRequest request, HttpServletResponse response) { try { String message = request.getParameter("message"); logger.info("kafka的消息={}", message); kafkaTemplate.send("test", "key", message); logger.info("发送kafka成功."); } catch (Exception e) { logger.error("发送kafka失败", e); } } }

启动项目后,在浏览器访问 http://localhost:8080/kafka/send?message=url_producer

# 查看结果 2019-03-05 13:57:16.438 INFO 10208 --- [nio-8080-exec-1] c.e.kane.Controller.CollectController : 发送kafka成功. 2019-03-05 13:57:45.871 INFO 10208 --- [nio-8080-exec-5] c.e.kane.Controller.CollectController : kafka的消息=url_producer 2019-03-05 13:57:45.872 INFO 10208 --- [nio-8080-exec-5] c.e.kane.Controller.CollectController : 发送kafka成功. # 查看虚拟机 Consumer结果 [root@localhost ~]# /home/kafka_2.11-2.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.138:9092 --topic test --from-beginning physics python message 123 null url_producer

增加消费者的配置

package com.example.kane.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

import com.example.kane.service.kafka_listener;

/**
 * Kafka consumer configuration: wires the listener container factory,
 * the consumer factory, and the {@link kafka_listener} bean.
 */
@Configuration
@EnableKafka
public class kafka_consumer_config {

    // Externalized instead of hard-coded; defaults match the original values,
    // so behavior is unchanged when no property is set. (@Value was already
    // imported in this file but previously unused.)
    @Value("${kafka.bootstrap-servers:192.168.233.138:9092}")
    private String bootstrapServers;

    @Value("${kafka.consumer.group-id:test}")
    private String groupId;

    /** Container factory used by @KafkaListener-annotated methods. */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /** Builds the String/String consumer factory from {@link #consumerConfigs()}. */
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Raw consumer properties: broker address, auto-commit enabled,
     * String deserializers for key and value, and the consumer group id.
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return propsMap;
    }

    /** The message listener bean that receives records from topic "test". */
    @Bean
    public kafka_listener listener() {
        return new kafka_listener();
    }
}

增加listener类

package com.example.kane.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * Consumes records from the Kafka topic {@code test} and logs key and value.
 * Instantiated as a bean by the consumer configuration class.
 */
public class kafka_listener {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Logs each received record.
     *
     * <p>Bug fix: the original called {@code record.value().toString()}, which throws
     * a NullPointerException for null-valued records — and the topic demonstrably
     * contains one (the console-consumer output in this setup shows a {@code null}
     * message). Parameterized SLF4J logging handles null safely.
     *
     * @param record the consumed record; key and value may each be null
     */
    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("{}", record);
        logger.info("kafka的key: {}", record.key());
        logger.info("kafka的value: {}", record.value());
    }
}

同样地,在浏览器访问 http://localhost:8080/kafka/send?message=url_producer1 重新发一条消息

# 结果 2019-03-05 14:31:04.787 INFO 10208 --- [nio-8080-exec-1] c.e.kane.Controller.CollectController : 发送kafka成功. 2019-03-05 14:31:04.848 INFO 10208 --- [ntainer#0-0-C-1] com.example.kane.service.kafka_listener : ConsumerRecord(topic = test, partition = 0, offset = 10, CreateTime = 1551767464787, serialized key size = 3, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = key, value = url_producer1) 2019-03-05 14:31:04.848 INFO 10208 --- [ntainer#0-0-C-1] com.example.kane.service.kafka_listener : kafka的key: key 2019-03-05 14:31:04.848 INFO 10208 --- [ntainer#0-0-C-1] com.example.kane.service.kafka_listener : kafka的value: url_producer1 # 查看虚拟机 消费者信息 physics python message 123 null url_producer url_producer1 url_producer1 一些需要注意的问题

现在kafka官方提供自带zookeeper版本,不建议使用自带的,还是建议自己安装zookeeper

物理机没法访问的时候,看文中的注意事项,依次更改一定能访问

Linux公社的RSS地址https://www.linuxidc.com/rssFeed.aspx

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/be0f666235fabc9db508fbe3e181edbf.html