当前位置: 首页 > news >正文

营销网站建设前期准备梦见死去的外公叫我回家

营销网站建设前期准备,梦见死去的外公叫我回家,网站备案怎么查,网站设计与程序方向Pyspark 注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hi…

Pyspark

注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天继续和大家分享一下Pyspark_结构化流4
#博学谷IT学习技术支持


文章目录

  • Pyspark
  • 前言
  • 一、数据模拟器代码
  • 二、需求说明和代码实现
  • 总结


前言

接上次继续Pyspark_结构化流,今天主要是一个结构化流结合kafka的一个小案例。


一、数据模拟器代码

1- 创建一个topic, 放置后续物联网数据: search-log-topic
./kafka-topics.sh --create --zookeeper node1:2181 --topic search-log-topic --partitions 3 --replication-factor 2

import json
import random
import time
import os
from kafka import KafkaProducer# 锁定远端操作环境, 避免存在多个版本环境的问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/bin/python"# 快捷键:  main 回车
if __name__ == '__main__':print("模拟物联网数据")# 1- 构建一个kafka的生产者:producer = KafkaProducer(bootstrap_servers=['node1:9092', 'node2:9092', 'node3:9092'],acks='all',value_serializer=lambda m: json.dumps(m).encode("utf-8"))# 2- 物联网设备类型deviceTypes = ["洗衣机", "油烟机", "空调", "窗帘", "灯", "窗户", "煤气报警器", "水表", "燃气表"]while True:index = random.choice(range(0, len(deviceTypes)))deviceID = f'device_{index}_{random.randrange(1, 20)}'deviceType = deviceTypes[index]deviceSignal = random.choice(range(10, 100))# 组装数据集print({'deviceID': deviceID, 'deviceType': deviceType, 'deviceSignal': deviceSignal,'time': time.strftime('%s')})# 发送数据producer.send(topic='search-log-topic',value={'deviceID': deviceID, 'deviceType': deviceType, 'deviceSignal': deviceSignal,'time': time.strftime('%s')})# 间隔时间 5s内随机time.sleep(random.choice(range(1, 5)))

生成的kafka数据
{‘deviceID’: ‘device_0_14’, ‘deviceType’: ‘洗衣机’, ‘deviceSignal’: 18, ‘time’: ‘1680157073’}
{‘deviceID’: ‘device_2_8’, ‘deviceType’: ‘空调’, ‘deviceSignal’: 30, ‘time’: ‘1680157074’}
{‘deviceID’: ‘device_0_17’, ‘deviceType’: ‘洗衣机’, ‘deviceSignal’: 84, ‘time’: ‘1680157076’}
{‘deviceID’: ‘device_2_15’, ‘deviceType’: ‘空调’, ‘deviceSignal’: 99, ‘time’: ‘1680157078’}
{‘deviceID’: ‘device_1_17’, ‘deviceType’: ‘油烟机’, ‘deviceSignal’: 50, ‘time’: ‘1680157081’}

二、需求说明和代码实现

求: 各种信号强度>30的设备的各个类型的数量和平均信号强度,先过滤再聚合

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import os# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("综合案例: 物联网案例实现")# 1- 创建SparkSession对象spark = SparkSession.builder \.appName('file_source') \.master('local[1]') \.config('spark.sql.shuffle.partitions', 4) \.getOrCreate()# 2- 从Kafka中读取消息数据df = spark.readStream \.format('kafka') \.option('kafka.bootstrap.servers', 'node1:9092,node2:9092,node3:9092') \.option('subscribe', 'search-log-topic') \.option('startingOffsets', 'earliest') \.load()# 3- 处理数据# 求: 各种信号强度>30的设备的各个类型的数量和平均信号强度,先过滤再聚合# 数据: {'deviceID': 'device_4_4', 'deviceType': '灯', 'deviceSignal': 20, 'time': '1677243108'}df = df.selectExpr('CAST(value AS STRING)')# 思考 如何做呢?# 需要将这个Json字符串中各个字段都获取出来, 形成一个多列的数据# 专业名称: JSON拉平# 涉及函数: get_json_object()    json_tuple()# df.createTempView('t1')# SQL# df = spark.sql("""#     select#         get_json_object(value,'$.deviceID')  as deviceID,#         get_json_object(value,'$.deviceType') as deviceType,#         get_json_object(value,'$.deviceSignal') as deviceSignal,#         get_json_object(value,'$.time') as time#     from  t1# """)# df = spark.sql("""#     select#         json_tuple(value,'deviceID','deviceType','deviceSignal','time') as (deviceID,deviceType,deviceSignal,time)#     from  t1# """)# DSL# df = df.select(#     F.get_json_object('value', '$.deviceID').alias('deviceID'),#     F.get_json_object('value','$.deviceType').alias('deviceType'),#     F.get_json_object('value','$.deviceSignal').alias('deviceSignal'),#     F.get_json_object('value','$.time').alias('time')# )df = df.select(F.json_tuple('value', 'deviceID', 'deviceType', 'deviceSignal', 'time').alias('deviceID', 'deviceType','deviceSignal', 'time'))# 求: 各种信号强度>30的设备的各个类型的数量和平均信号强度,先过滤再聚合df = df.where(df['deviceSignal'] > 30).groupBy('deviceType').agg(F.count('deviceID').alias('device_cnt'),F.round(F.avg('deviceSignal'), 2).alias('deviceSignal_avg'))# 4- 打印结果df.writeStream.format('console').outputMode('complete').start().awaitTermination()

总结

今天主要和大家分享了如何用Pyspark_结构化流结合kafka模拟物连网小案例。

http://www.laogonggong.com/news/85768.html

相关文章:

  • 财经门户网站开发沈阳企业自助建站系统
  • 大连网站开发公司力推选仟亿科技wordpress天气代码
  • 诸暨网站制作公司 网页设计师工作室
  • 直播间挂人气自助网站网站建设 焦作
  • 网站做专业团队什么叫优化
  • 雅安工程交易建设网站电影网站模板源代码
  • 工程建设的信息网站网站暂停怎么做
  • 成都网站建设 网络公司西安英文网站建设
  • 学网站建设设计要钱吗什么网站做任务可以赚钱
  • 潍坊企业做网站网站建设案例分析
  • 深圳住房和建设局网站首页pc网站模板
  • 如何查看网站空间大小谷歌搜索引擎免费
  • 中医院网站模板个人开发网站
  • 网站文字不能编辑器德阳住房和城乡建设局网站
  • 自己做网站卖矿山设备wordpress主题换字体
  • 做推送网站游戏钓鱼网站开发
  • 做网站用apache还是nginx做网站详细步骤
  • 青浦华新网站建设保山市住房和建设局网站
  • 营销型网站seo河南艾特网站建设
  • 海南省城乡和建设厅网站首页商城网站建设行业现状
  • 网站建设会议汕头网络营销公司
  • 珠海网站建设熊掌号如何改变wordpress的版面
  • 湖州网站建设哪家好东平房产网
  • 关于做ppt的网站哈尔滨发布信息的网站
  • 建设项目自主验收公示的网站网站建设 发展历程
  • 手把手教网站建设自己做个网站用什么软件好
  • 域名买完后如何做网站北京网站建设分析论文
  • 上海营销平台网站建设平面广告设计经典案例
  • 个人怎样建立网站网站建设大赛
  • 瓷砖 中企动力 网站建设电子商务网站建设课设学生体会