当前位置: 首页 > news >正文

装修建材网站模板云网站功能

装修建材网站模板,云网站功能,网站备案 接入商名称,wordpress所有人提问注#xff1a;此内容是本人在另一个技术平台发布的历史文章#xff0c;转载发布到CSDN#xff1b; Apache Kafka是一个开源分布式事件流平台#xff0c;也是当前系统开发中流行的高性能消息队列服务#xff0c;数千家公司使用它来实现高性能数据管道、流分析、数据集成和关… 注此内容是本人在另一个技术平台发布的历史文章转载发布到CSDN Apache Kafka是一个开源分布式事件流平台也是当前系统开发中流行的高性能消息队列服务数千家公司使用它来实现高性能数据管道、流分析、数据集成和关键任务应用程序。 Kafka 可以很好地替代更传统的消息代理。消息代理的使用原因多种多样将处理与数据生产者分离开来、缓冲未处理的消息等。与大多数消息系统相比Kafka 具有更好的吞吐量、内置分区、复制和容错能力这使其成为大规模消息处理应用程序的良好解决方案。 Java工具类 此基于kafka客户端的工具类提供基础的消息发送与监听功能。 pom.xml !-- 集成kafka --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-streams/artifactIdversion2.2.2/version/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.2.2/version/dependency KafkaUtils.java import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Collections; import java.util.Properties; import java.util.concurrent.Future;/*** Description kafka工具类提供消息发送与监听*/ public class KafkaUtils {/*** 获取实始化KafkaStreamServer对象* return*/public static KafkaStreamServer bulidServer(){return new KafkaStreamServer();}/*** 获取实始化KafkaStreamClient对象* return*/public static KafkaStreamClient bulidClient(){return new KafkaStreamClient();}public static class KafkaStreamServer{KafkaProducerString, String kafkaProducer null;private KafkaStreamServer(){}/*** 创建配置属性* param host* param port* return*/public KafkaStreamServer createKafkaStreamServer(String host, int port){String bootstrapServers String.format(%s:%d, host, port);if (kafkaProducer ! null){return this;}Properties properties new Properties();//kafka地址多个地址用逗号分割properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);kafkaProducer new KafkaProducer(properties);return this;}/*** 向kafka服务发送生产者消息* param topic* param msg* return*/public FutureRecordMetadata sendMsg(String topic, String msg){ProducerRecordString, String record new ProducerRecordString, String(topic, msg);FutureRecordMetadata future kafkaProducer.send(record);System.out.println(消息发送成功: msg);return future;}/*** 关闭kafka连接*/public void close(){if (kafkaProducer ! null){kafkaProducer.flush();kafkaProducer.close();kafkaProducer null;}}}public static class KafkaStreamClient {KafkaConsumerString, String kafkaConsumer null;private KafkaStreamClient(){}/*** 配置属性,创建消费者* param host* param port* return*/public KafkaStreamClient createKafkaStreamClient(String host, int port, String groupId){String bootstrapServers String.format(%s:%d, host, port);if (kafkaConsumer ! null){return this;}Properties properties new Properties();//kafka地址多个地址用逗号分割properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);kafkaConsumer new KafkaConsumerString, String(properties);return this;}/*** 客户端消费者拉取消息并通过回调HeaderInterface实现类传递消息* param topic* param headerInterface*/public void pollMsg(String topic, HeaderInterface headerInterface) {kafkaConsumer.subscribe(Collections.singletonList(topic));while (true) {ConsumerRecordsString, String records kafkaConsumer.poll(100);for (ConsumerRecordString, String record : records) {try{headerInterface.execute(record);}catch(Exception e){e.printStackTrace();}}}}/*** 关闭kafka连接*/public void close(){if (kafkaConsumer ! null){kafkaConsumer.close();kafkaConsumer null;}}}FunctionalInterfaceinterface HeaderInterface{void execute(ConsumerRecordString, String record);}/*** 测试示例* param args* throws InterruptedException*/public static void main(String[] args) throws InterruptedException {//生产者发送消息 // KafkaStreamServer kafkaStreamServer KafkaUtils.bulidServer().createKafkaStreamServer(127.0.0.1, 9092); // int i0; // while (i10) { // String msg Hello, new Random().nextInt(100); // kafkaStreamServer.sendMsg(test, msg); // i; // Thread.sleep(100); // } // kafkaStreamServer.close(); // System.out.println(发送结束);System.out.println(接收消息);KafkaStreamClient kafkaStreamClient KafkaUtils.bulidClient().createKafkaStreamClient(127.0.0.1, 9092, consumer-45);kafkaStreamClient.pollMsg(test, new HeaderInterface() {Overridepublic void execute(ConsumerRecordString, String record) {System.out.println(String.format(topic:%s,offset:%d,消息:%s, record.topic(), record.offset(), record.value()));}});} }
http://www.laogonggong.com/news/136213.html

相关文章:

  • 合肥网站设计公seo关键词是什么意思
  • 网站建设的整体流程有哪些如何做ppt 制作过程视频教程
  • 横琴建设局网站高端网站建设公司排名
  • 申请制作网站网页与网站之间的关系
  • 长沙网站排名报价WordPress页码总数
  • 建设信用卡在网站挂失块吗成功英语网站
  • 四川成都网站建设公司网站排名优化怎么弄
  • 网站中怎么做网站统计做淘客网站的
  • 翻译做网站网站引导页案例
  • 黄页引流推广网站入口织梦网站后台管理系统
  • 装修网站实景图vr怎么做的白云百度seo公司
  • 广州 网站建设wordpress修改背景
  • 企业所得税怎么征收2022政策关键词优化建议
  • 常州房地产网站建设制作网站软件网站
  • 做机械设计图纸找什么网站?怎么做购物网站
  • 市工商联官方网站建设方案房产律师咨询免费24小时在线
  • 做非法网站有哪些网站开发公司 苏州
  • 微信手机网站案例地产公司网站建设方案
  • 衡水wap网站建设费用网站排名优化方法
  • 虎丘苏州网站建设wordpress单主题
  • 自己做网站接广告济南工程造价信息网
  • 百度注册网站怎么弄wordpress调用置顶文章
  • 2017商会网站建设方案长尾关键词挖掘精灵官网
  • 公司网站怎么弄苏州相城区网站建设
  • 国内无版权图片网站免费定制logo网站
  • 景区加强网站建设推广产品的软文怎么写
  • 句容市网站seo优化排名网站推广策划书包括哪些点
  • 网站制作都包括什么江苏建设人才网电子证书查询官网
  • 做网站有一个火箭回顶部做ppt免费模板软件
  • 如何办好公司网站个人可以做商城网站吗