【Spark笔耕不辍(三)】Kafka生产者消费者API及核心原理
RetrospectKafka常用命令
创建主题:指定分区数和副本数
topic bigdata01 主题的名字
partitions 3 分区的个数
replication-factor 2 副本个数
bootstrap-server kafka内部服务器通信地址,端口默认是9092
创建主题:不指定分区数和副本数,默认是1个分区,1个副本
查看所有主题
查看某一个主题的详情(关键字describe)
12345678910111213141516171819202122# 创建主题:指定分区数和副本数kafka-topics.sh --create --topic bigdata01 --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092# 创建主题:不指定分区数和副本数,默认是1个分区,1个副本kafka-topics.sh --create --topic test1 --bootstrap-server node1:9092, ...
【SparkSQL笔耕不辍(一)】UDF及新零售案例
Spark连接Hive
Hive底层默认是MR引擎,计算性能特别差,一般用Hive作为数据仓库,使用SparkSQL对Hive中的数据进行计算
本质上:SparkSQL访问了Metastore服务获取了Hive元数据,基于元数据提供的地址进行计算
命令行集成
step1:第一台机器启动HDFS和Hive的Metastore服务
1234567# 不管你其他服务什么模式运行 只要你是以hive 是使用hdfs 就需要把3台虚拟机都启动# 启动HDFS服务:NameNode和DataNodesstart-dfs.sh # 启动HiveMetaStore 服务start-metastore.sh
step2:在Spark中构建配置文件指定metastore地址【集群模式所有节点都必须配】
==spark local模式==:只需要在node1配置hive metastore服务地址即可
spark集群模式:standalone、yarn 、mesos 需要在集群中每个节点上都添加hive metastore服务地址
123456789 ...
【SparkSQL】Spark自定义函数
自定义Spark函数函数的分类UDF:一对一的函数【User Defined Functions】
substr、length
UDAF:多对一的函数【User Defined Aggregation Functions】
count、sum、max、min、avg
UDTF:一对多的函数【User Defined Tabular Functions】
explode
函数的定义方式
1、register方式定义的函数既可以用于SQL风格,也可以用于DSL风格
2、udf和pandas_df定义的函数只能用于DSL风格
需求
原始数据:datas/udf/music.tsv
12301 周杰伦 150/17502 周杰 130/18503 周华健 148/178
目标结果
12301 周杰伦 150斤/175cm02 周杰 130斤/185cm03 周华健 148斤/178cm
udf注册方式定义UDF函数1234# 导包:DSL函数库import pyspark.sql.functions as F# 定义UDF变量 ...
【SparkSQL】Spark读取外部文件及写出数据(附开窗函数)
这里写一些SQL中的难点
炸裂函数
spark.sql(
"""
with t2 as(
select explode(split(value,' ')) as word from t1 where length (value) > 0
)
select word, count(*) cnt from t2 group by word order by cnt
"""
).show() # Explode函数里面的值必须为map类型或者array类型, 也就是列表
12345678910111213141516171819202122232425- <font size=4 color=red face='华文楷体'>开窗函数</font> - SQL风格- ```python # SQL风格 - 创建临时视图, 写SQL emp_df.createO ...
【SparkSQL】SparkSQL词频统计Demo
SparkSQL的概念
1、SparkSQL是Spark发展后期产生的,是为了使用SQL风格来替换之前SparkCore的RDD风格
2、SparkSQL既可以做离线,也可以做实时
3、SparkSQL的编程有两种风格:SQL风格、DSL分格
SparkSQL和SparkCore区别
1、SparkCore的核心数据类型是RDD,SparkSQL核心数据类型是DataFrame
2、SparkCore的核心入口类是SparkContext、SparkSQL的核心入口类是:SparkSession
3、SparkSQL是基于SparkCore,SparkSQL代码底层就是rdd
4、SparkCore只侧重数据本身,没有表概念,SparkSQL要侧重:数据+表结构
SparkSQL的SQL风格词频统计
SparkSQL的入口类为SparkSession, 导包方式: from pyspark.sql import SparkSession
创建SparkSQL的入口类对象
spark = SparkSession \
.builder \
.app ...
【SparkCore】SparkTheory理论
Spark容错机制
两个问题引入Spark容错机制
问题1:计算机在存储数据的过程中如何保证数据的安全?
a. 内存快照:将内存中所有数据拍摄一个快照,存储在文件中,读取快照文件恢复内存中数据
b. 操作日志:将内存变化操作日志追加记录在一个文件中,下一次读取文件对内存重新操作
HDFS:edits文件,保证HDFS内存元数据的安全的
c. 副本机制:将数据构建多份冗余副本
HDFS:副本机制,保证HDFS数据的安全
d. 依赖关系:每份数据保留与其他数据之间的一个转换关系
问题2:Spark中RDD的数据如何保证数据的安全?
每个RDD在构建数据时,会根据自己来源一步步倒推到数据来源,然后再一步步开始构建RDD数据
当RDD的数据被触发调用时,就会根据RDD的血缘关系层层构建RDD的数据
如果在计算过程中,RDD的数据丢失,就会通过依赖关系重新构建,彻底保证了RDD的数据安全
但是: 如果一个RDD被触发多次,这个RDD就会按照依赖关系被构建多次,性能相对较差,怎么解决?
Persist缓存机制
问题:RDD依赖血缘机制保证数据安全,那每调用一次RDD都要 ...
【SparkCore】搜狗&百度日志分析
搜狗日志分析数据格式1搜索时间 用户ID [查询词] 该URL在返回结果中的排名 用户点击的顺序号 用户点击的URL
需求分析
首先要做的是数据的清洗和转换
过滤掉无效的数据
无效的数据是读取的数据的长度小于6就排除(通过filter算子实现)
筛选出有用的字段
筛选出的字段方便我们后续的分析
需求1:
统计热门搜索词Top10【出现次数最多前10个==搜索词==】
预期结果:(搜索词,出现次数)
实现思路:
1, 读取数据, 数据清洗
2, 将搜索内容通过分词器分解(每个词作为一个独立的单元)
3, 对每一个词作标记,出现一次
4, 通过reduceByKey算子计算每个词出现的次数
需求2
统计所有用户所有搜索中最大搜索次数、最小搜索次数、平均搜索次数
预期结果:最大搜索次数、最小搜索次数、平均搜索次数
实现思路:
1, 读取数据, 数据清洗
2, 构建(userid, 查询词设为1)kv键值对格式的数据
3, 进行reduceByKey聚合操作,得到每个用户的总的搜索次数
4, 比较所有用户, ...
【SparkCore】RDD算子
Retrospect转换算子和行为算子的区别
1,会不会触发job任务的执行: 转换算子不会触发job的执行,行为算子会
2, 生成新的RDD: 转换算子会从原有的RDD中产生新的RDD
网络日志案例
我一直保持只有做题才能真正检验你自己的水平, 哪怕你平时不听课,但老师布置的作业,以及上课的题目你都会.那就OK.
下面是一个案例, 能独立做出来也是对自己的一个检验罢.
PV,UV概念
pv:网页访问量,每访问一个页面,则就算一个pv
uv:独立访客数,每来一个不同的用户,就算一个uv
举例1234567891011121001 2023-07-15 08:15 a.html1001 2023-07-15 09:15 b.html1001 2023-07-15 10:15 c.html1001 2023-07-15 11:15 d.html1002 2023-07-15 08:15 a.html1002 2023-07-15 09:15 b.html1002 2023-07-15 10:15 c.html1002 2023-07-15 11:15 ...
【Spark】PySpark刷题本(二)
数据准备12345678910111213研发部,1,乔峰,男,20,5000研发部,2,段誉,男,21,2800研发部,3,虚竹,男,23,8000研发部,4,阿紫,女,18,4000销售部,5,扫地僧,男,85,9000销售部,6,李秋水,女,33,4500销售部,7,鸠摩智,男,50,3900销售部,8,天山童姥,女,60,8900销售部,9,慕容博,男,58,3400人事部,10,丁春秋,男,90,7000人事部,11,王语嫣,女,50,7700人事部,12,阿朱,女,43,5500人事部,13,无崖子,男,51,8800
题目123456789练习题目:(1) 读入emp.txt文档,生成RDD(2) 获得年龄大于50的员工(3) 获得人事部性别为男的员工(4) 获取比无崖子薪资高的员工信息(5) 获得整个公司薪资最高的员工名字(6) 获得每个部门的员工人数,并降序排序(7) 获得每个部门的平均薪资(8) 获得每个部门的最高薪水
答案1234567891011121314151617181920212223242526272829303132333435363738394 ...
【SparkCore】SparkOnYarn及RDD理论
Retrospect
书接上回,这次说说StandAlone的核心原理, 以及SparkOnYarn的执行流程(原理的说). 还有RDD算子的特性以及创建及分类
StandAlone原理
集群角色, 不管有没有任务运行,集群永远都有以下两个角色
Master:管理,分配资源,监控worker的健康状况(相当于包工头)
Worker: 分配资源,执行任务(相当于组长),向Master定时发送心跳包
Task: 执行任务的(进程)相当于工人
Driver
1、每执行一个任务,Spark会自动启动一个Driver进程,类似于Yarn中AppMaster,负责整个任务执行的监控.当你启动10个Spark任务,系统就会启动10个Driver
2、Driver进程本身由于需要对整个任务进行管理,它本身也需要一定的资源,比如内存和CPU
Executor
1、每当你执行一个Spark任务,Spark需要启动若干个Executor进程,Executor用来管理执行执行任务的Task线程
2、Executor创建完成之后需要向Dirver进行注册
3、当执行任务时,需要多少个Executo ...