更新时间:2023年10月20日10时32分 来源:传智教育 浏览次数:
在Apache Kafka中,消费者(Consumers)和消费者组(Consumer Groups)是核心概念,用于处理消息的订阅和处理。接下来笔者将详细解释它们之间的关系,并提供一个简单的代码示例来演示它们的用法。
消费者是Kafka中的客户端应用程序,它负责订阅主题并处理从主题中生产的消息。消费者可以独立订阅一个或多个主题,并且可以以不同的速度处理消息。它们可以在不同的分区中并行地处理消息。
消费者组是消费者的逻辑集合,它们一起协作处理主题中的消息。每个消费者组可以包含一个或多个消费者。消费者组的关键特性是它可以协调多个消费者来消费主题中的消息,确保每个分区的消息只被组内的一个消费者处理。这有助于实现负载均衡和提高容错性。
接下来我们看一个使用Java语言的Kafka消费者和消费者组示例:
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Properties; public class MyKafkaConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-broker-list"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName); Consumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅一个主题 consumer.subscribe(Collections.singletonList("my-topic")); while (true) { // 拉取消息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { // 处理消息 System.out.printf("消费者: key=%s, value=%s%n", record.key(), record.value()); } } } }
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Collections; import java.util.Properties; public class MyKafkaConsumerGroup { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-broker-list"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // 消费者组名称 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName); Consumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅一个主题 consumer.subscribe(Collections.singletonList("my-topic")); while (true) { // 拉取消息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { // 处理消息 System.out.printf("消费者组成员: key=%s, value=%s%n", record.key(), record.value()); } } } }
在上述示例中,两个消费者(可以是同一消费者组的成员)订阅了同一个主题,但消费者组确保每个分区的消息只被一个消费者处理,实现了负载均衡和高可用性。这是Kafka中消费者和消费者组的基本关系和用法。