深圳市网站建设,运城哪家做网站的公司好,一般网站空间多大,网站维护细则目录 一、独立消费者消费某一个主题数据案例1.1、案例需求1.2、案例代码1.3、测试 一、独立消费者消费某一个主题数据案例
1.1、案例需求
创建一个独立消费者#xff0c;消费firstTopic主题中数据#xff0c;所下图所示#xff1a; 注意#xff1a;在消费者 API 代码中必… 目录 一、独立消费者消费某一个主题数据案例1.1、案例需求1.2、案例代码1.3、测试 一、独立消费者消费某一个主题数据案例
1.1、案例需求
创建一个独立消费者消费firstTopic主题中数据所下图所示 注意在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。
1.2、案例代码 代码 package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;/**** 独立消费者消费某一个主题中的数据*/
public class CustomConsumer {public static void main(String[] args) {// 配置Properties properties new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092);// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,test5);// 设置分区分配策略properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,org.apache.kafka.clients.consumer.StickyAssignor);// 1 创建一个消费者 , helloKafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);// 2 订阅主题 firstArrayListString topics new ArrayList();topics.add(firstTopic);kafkaConsumer.subscribe(topics);// 3 消费数据while (true){//每一秒拉取一次数据ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));//输出数据for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}kafkaConsumer.commitAsync();}}
}1.3、测试 在 Kafka 集群控制台创建firstTopic主题 bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 3 --replication-factor 1 --topic firstTopic在 IDEA中启动案例代码 在 Kafka 集群控制台创建 Kafka生产者并输入数据。 bin/kafka-console-producer.sh --bootstrap-server 192.168.136.27:9092 --topic firstTopic在 IDEA 控制台观察接收到的数据。 ConsumerRecord(topic firstTopic, partition 0, leaderEpoch 0, offset 0, CreateT
ime 1694097579736, serialized key size -1, serialized value size 10, headers
RecordHeaders(headers [], isReadOnly false), key null, value helo kafka)