Apache Kafka 是一种分布式数据存储,专为实时输入和处理流数据而设计。 流数据是由数千个数据源不断生成的信息,所有这些数据源同时传输数据记录。 流媒体平台必须能够应对不断流入的数据并按顺序和渐进地处理它。
什么是多线程,我们为什么需要它?
中央处理单元 (CPU)(或多核处理器中的单核)在操作系统支持下同时提供许多执行线程的能力称为多线程。 多线程可用于提高应用程序速度的情况下,可以将工作分解为更小的单元,这些单元可以并行运行而不会影响数据的一致性。 Kafka 允许您通过使用分区来扩展分布式系统,分区是主题中消息的有序子集。
它最近注意到一种趋势,即开发人员不是确保计算可以有效地处理来自单个分区的数据,而是采用扩展分区/虚拟机的简单方法来获得所需的吞吐量。 这相当于在问题上砸钱。
Kafka 主题将记录划分为称为分区的较小部分,可以单独处理这些部分,而不会影响结果的准确性,为并行处理奠定了基础。 这通常通过扩展来实现,这涉及在同一组中使用许多消费者,每个消费者处理来自主题分区子集的数据并在单个线程中运行。
因为在一个线程中读取和处理消息对于大多数人来说已经足够了 卡夫卡用例阿帕奇 卡夫卡消费者 线程范式被广泛使用。 当处理不需要 I/O 活动时,轮询循环运行顺利。
卡夫卡消费者
购买 Kafka 的消费者通常是作为一个群体的一部分购买的。 当许多消费者订阅一个主题并且是同一消费者组的成员时,每个消费者都会从主题分区的子集接收消息。
将额外的消费者添加到消费者组是从 Kafka 主题扩展数据消费的最常用技术。 Kafka 的消费者经常执行高延迟操作,例如写入数据库或对数据执行耗时的计算。 当单个消费者无法跟上数据流入主题的速度时,我们通过添加更多消费者来扩展,这些消费者通过让每个消费者只拥有分区和消息的子集来分担负载。
多线程的好处
多线程允许程序的多个部分同时运行。 线程是进程中可用的轻量级进程。 多线程允许多任务处理以充分利用 CPU。
以下是多线程编程的一些优点:
共享资源
进程的资源,包括内存、数据和文件,在所有线程之间共享。 使用资源共享,单个程序可以在同一地址空间中拥有多个线程。
响应能力
程序响应性允许程序继续运行,即使它的一部分由于多线程而停止。 如果该过程正在执行冗长的操作,则也可以这样做。
多处理器架构
多线程允许多处理器架构中的每个线程并行运行在不同的处理器上。 这提高了系统的并发性。 在单处理器系统中,一次只能运行一个进程或线程。
每个消费者模型的线程是多少
每个线程都被实例化并连接到每个消费者模型线程中的 Kafka 代理。 消息将发送到这些线程的分区由 kafka 代理分配。
单线程以多线程消费者模式连接到Kafka,可以从多个/单个分区获取数据。 一旦将数据提供给线程,线程就可以将消息分发到其他线程池以进行并行处理。 在此方法中,消费者线程决定哪个子线程将处理哪些类型的消息。 然而,在这种情况下,偏移管理变得极其困难。
Spring 可以很容易地生成多个线程来连接到 Kafka。 让我们看看两者的行为有何不同。 我们有一个包含十个分区的测试主题和一个运行单个并发 Spring 应用程序的 VM。
每个消费者模型的线程
/**
* Consumer configuration for email topics
*
* @return
*/
@Bean
public ConsumerFactory<String, String> consumerFactory()
{
Map<String, Object> props = new HashMap<>();
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, EMAIL_STATUS_CONSUMER_GROUP);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
/**
* 为 kafka 监听器设置并发
*
* @返回
*/
@豆
公共 ConcurrentKafkaListenerContainerFactory
{
ConcurrentKafkaListenerContainerFactory
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
返厂;
}
消费者组 spring-group 正在监听这个划分。 以下是单个并发的行为方式:
组主题分区 消费者 ID 主机 CLIENT-ID
弹簧组测试主题 8 消费者 1-01a5779b-940b-44cf-b8c6-2e414aa38eb1 /172.22.0.1 消费者 1
弹簧组测试主题 2 消费者 1-01a5779b-940b-44cf-b8c6-2e414aa38eb1 /172.22.0.1 消费者 1
弹簧组测试主题 1 消费者 1-01a5779b-940b-44cf-b8c6-2e414aa38eb1 /172.22.0.1 消费者 1
弹簧组测试主题 4 消费者 1-01a5779b-940b-44cf-b8c6-2e414aa38eb1 /172.22.0.1 消费者 1
弹簧组测试主题 5 消费者 1-01a5779b-940b-44cf-b8c6-2e414aa38eb1 /172.22.0.1 消费者 1
弹簧组测试主题 6 消费者 1-01a5779b-940b-44cf-b8c6-2e414aa38eb1 /172.22.0.1 消费者 1
弹簧组测试主题 3 消费者 1-01a5779b-940b-44cf-b8c6-2e414aa38eb1 /172.22.0.1 消费者 1
弹簧组测试主题 7 消费者 1-01a5779b-940b-44cf-b8c6-2e414aa38eb1 /172.22.0.1 消费者 1
弹簧组测试主题 9 消费者 1-01a5779b-940b-44cf-b8c6-2e414aa38eb1 /172.22.0.1 消费者 1
弹簧组测试主题 0 消费者 1-01a5779b-940b-44cf-b8c6-2e414aa38eb1 /172.22.0.1 消费者 1
如果您仔细查看上面的输出,您会注意到应用程序的消费者 ID 对于所有 10 个分区都是相同的,这表明它是连接所有分区的单个线程。
我们来看看并发增加到2时会发生什么,
组主题分区 消费者 ID 主机 CLIENT-ID
弹簧组测试主题 8 消费者 2-8ab0213d-683c-4f92-b3c8-767701905994 /172.22.0.1 消费者 2
弹簧组测试主题 5 消费者 2-8ab0213d-683c-4f92-b3c8-767701905994 /172.22.0.1 消费者 2
弹簧组测试主题 6 消费者 2-8ab0213d-683c-4f92-b3c8-767701905994 /172.22.0.1 消费者 2
弹簧组测试主题 7 消费者 2-8ab0213d-683c-4f92-b3c8-767701905994 /172.22.0.1 消费者 2
弹簧组测试主题 9 消费者 2-8ab0213d-683c-4f92-b3c8-767701905994 /172.22.0.1 消费者 2
弹簧组测试主题 4 消费者 1-886f1a6e-f316-4e17-90d2-599a582682e4 /172.22.0.1 消费者 1
弹簧组测试主题 2 消费者 1-886f1a6e-f316-4e17-90d2-599a582682e4 /172.22.0.1 消费者 1
弹簧组测试主题 3 消费者 1-886f1a6e-f316-4e17-90d2-599a582682e4 /172.22.0.1 消费者 1
弹簧组测试主题 1 消费者 1-886f1a6e-f316-4e17-90d2-599a582682e4 /172.22.0.1 消费者 1
弹簧组测试主题 0 消费者 1-886f1a6e-f316-4e17-90d2-599a582682e4 /172.22.0.1 消费者 1
正如您在上面的屏幕截图中看到的,现在有两个线程,每个线程有五个分区。
Kafka 将尝试在属于同一消费者组的线程之间平均分配分区。 如果我们创建 10 个并发线程,我们将为每个分区创建一个专用线程。
结论
在这篇文章中,我们了解了一些关于多线程的知识,并学习了线程模型。