自定义Spark函数

函数的分类

UDF:一对一的函数【User Defined Functions】

  • substr、length

UDAF:多对一的函数【User Defined Aggregation Functions】

  • count、sum、max、min、avg

UDTF:一对多的函数【User Defined Tabular Functions】

  • explode

函数的定义方式

  • 1、register方式定义的函数既可以用于SQL风格,也可以用于DSL风格

  • 2、udf和pandas_df定义的函数只能用于DSL风格

需求

  • 原始数据:datas/udf/music.tsv

    1
    2
    3
    01	周杰伦	  150/175
    02 周杰 130/185
    03 周华健 148/178
  • 目标结果

    1
    2
    3
    01	周杰伦	 150斤/175cm
    02 周杰 130斤/185cm
    03 周华健 148斤/178cm

udf注册方式定义UDF函数

1
2
3
4
# 导包:DSL函数库
import pyspark.sql.functions as F
# 定义
UDF变量名 = F.udf(函数的处理逻辑, 返回值类型)

register方式

1
2
3
4
5
UDF函数名2 = spark.udf.register(UDF函数名1, 函数的处理逻辑)

定义:spark.udf.register()
UDF函数名2:DSL中调用UDF使用的
UDF函数名1:SQL中调用UDF使用

CodeDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import time

from pyspark import SparkContext, SparkConf
import os
import sys
from pyspark.sql import SparkSession

# 1、设置环境变量
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark' # Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
print("WordCount")

# 1、在Python代码中入口类对象:SparkSession
"""
.appName:应用的名字
config("spark.sql.shuffle.partitions", 2) spark经过shuffle之后重新分配的分区数,模式是200个
getOrCreate() :如果代码前文已经创建了该对象,则获取,否则创建
"""
spark = SparkSession \
.builder \
.master('local[2]') \
.appName("wordcount_1") \
.config("spark.sql.shuffle.partitions", 2) \
.getOrCreate()
# register方式创建自定义函数
music_df = spark\
.read\
.schema('id int, name string, info string')\
.option('sep', '\t')\
.csv('/datas/spark/input/test5')
music_df.show()
# 创建视图
music_df.createOrReplaceTempView('music')

# 需求: 将DataFrame中的表格第三列转为150斤/175cm(先写sql, 自定义函数将某一列转换

# register方式来自定义udf函数 DSL风格函数名 = spark.udf.register(SQL风格函数名, 函数的处理逻辑)
my_process2 = spark.udf.register('my_process', lambda x: (x.split('/')[0] +'斤/'+x.split('/')[1] + 'cm'))
# udf注册方式定义UDF函数(这种方式只是适用于DSL风格的编程) UDF变量名 = F.udf(函数的处理逻辑, 返回值类型)
def to_new_info(x):
return x.split('/')[0] +'斤/' + x.split('/')[1] + 'cm'
music_udf = F.udf(f=lambda old_info: to_new_info(old_info), returnType=StringType())

spark.sql(
"""
select
id,
name,
my_process(info)
from music;
"""
).show()

# DSL 风格
music_df.select('id', 'name', music_udf('info').alias('new_info')).show()