.NET Core下使用Kafka的方法步骤

ZooLeeper : https://zookeeper.apache.org/releases.html

下载并解压

# 下载,并解压 $ wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz $ tar -zxvf kafka_2.12-2.1.1.tgz $ mv kafka_2.12-2.1.1.tgz /data/kafka # 下载 zookeeper,解压 $ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz $ tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz $ mv apache-zookeeper-3.5.8-bin /data/zookeeper

启动 ZooKeeper

# 复制配置模版 $ cd /data/kafka/conf $ cp zoo_sample.cfg zoo.cfg # 看看配置需不需要改 $ vim zoo.cfg # 命令 $ ./bin/zkServer.sh start # 启动 $ ./bin/zkServer.sh status # 状态 $ ./bin/zkServer.sh stop # 停止 $ ./bin/zkServer.sh restart # 重启 # 使用客户端测试 $ ./bin/zkCli.sh -server localhost:2181 $ quit

启动 Kafka

# 备份配置 $ cd /data/kafka $ cp config/server.properties config/server.properties_copy # 修改配置 $ vim /data/kafka/config/server.properties # 集群配置下,每个 broker 的 id 是必须不同的 # broker.id=0 # 监听地址设置(内网) # listeners=PLAINTEXT://ip:9092 # 对外提供服务的IP、端口 # advertised.listeners=PLAINTEXT://106.75.84.97:9092 # 修改每个topic的默认分区参数num.partitions,默认是1,具体合适的取值需要根据服务器配置进程确定,UCloud.ukafka = 3 # num.partitions=3 # zookeeper 配置 # zookeeper.connect=localhost:2181 # 通过配置启动 kafka $ ./bin/kafka-server-start.sh config/server.properties& # 状态查看 $ ps -ef|grep kafka $ jps

docker下安装Kafka

docker pull wurstmeister/zookeeper docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper

docker pull wurstmeister/kafka docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.111 --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka

介绍

Broker:消息中间件处理节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。

Topic:一类消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。

Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。

Segment:partition物理上由多个segment组成,下面2.2和2.3有详细说明。

offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息。

.NET Core下使用Kafka的方法步骤

kafka partition 和 consumer 数目关系

如果consumer比partition多是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 。

如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 。最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目 。

如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同

增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化 快速开始

.NET Core 项目中安装组件

Install-Package Confluent.Kafka

开源地址: https://github.com/confluentinc/confluent-kafka-dotnet

添加 IKafkaService 服务接口

public interface IKafkaService { /// <summary> /// 发送消息至指定主题 /// </summary> /// <typeparam></typeparam> /// <param></param> /// <param></param> /// <returns></returns> Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class; /// <summary> /// 从指定主题订阅消息 /// </summary> /// <typeparam></typeparam> /// <param></param> /// <param></param> /// <param></param> /// <returns></returns> Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class; }

实现 IKafkaService

public class KafkaService : IKafkaService { public async Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class { var config = new ProducerConfig { BootstrapServers = "127.0.0.1:9092" }; using var producer = new ProducerBuilder<string, string>(config).Build(); await producer.ProduceAsync(topicName, new Message<string, string> { Key = Guid.NewGuid().ToString(), Value = message.SerializeToJson() }); } public async Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class { var config = new ConsumerConfig { BootstrapServers = "127.0.0.1:9092", GroupId = "crow-consumer", EnableAutoCommit = false, StatisticsIntervalMs = 5000, SessionTimeoutMs = 6000, AutoOffsetReset = AutoOffsetReset.Earliest, EnablePartitionEof = true }; //const int commitPeriod = 5; using var consumer = new ConsumerBuilder<Ignore, string>(config) .SetErrorHandler((_, e) => { Console.WriteLine($"Error: {e.Reason}"); }) .SetStatisticsHandler((_, json) => { Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中.."); }) .SetPartitionsAssignedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); Console.WriteLine($" - 分配的 kafka 分区: {partitionsStr}"); }) .SetPartitionsRevokedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}"); }) .Build(); consumer.Subscribe(topics); try { while (true) { try { var consumeResult = consumer.Consume(cancellationToken); Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'."); if (consumeResult.IsPartitionEOF) { Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}."); continue; } TMessage messageResult = null; try { messageResult = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message.Value); } catch (Exception ex) { var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}"; Console.WriteLine(errorMessage); messageResult = null; } if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/) { messageFunc(messageResult); try { consumer.Commit(consumeResult); } catch (KafkaException e) { Console.WriteLine(e.Message); } } } catch (ConsumeException e) { Console.WriteLine($"Consume error: {e.Error.Reason}"); } } } catch (OperationCanceledException) { Console.WriteLine("Closing consumer."); consumer.Close(); } await Task.CompletedTask; } }

注入 IKafkaService ,在需要使用的地方直接调用即可。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/0098c51b75669a47038fa37d0848837b.html