自定义Spark函数
函数的分类
UDF:一对一的函数【User Defined Functions】
UDAF:多对一的函数【User Defined Aggregation Functions】
UDTF:一对多的函数【User Defined Tabular Functions】
函数的定义方式
需求
原始数据:datas/udf/music.tsv
1 2 3
| 01 周杰伦 150/175 02 周杰 130/185 03 周华健 148/178
|
目标结果
1 2 3
| 01 周杰伦 150斤/175cm 02 周杰 130斤/185cm 03 周华健 148斤/178cm
|
udf注册方式定义UDF函数
1 2 3 4
| import pyspark.sql.functions as F
UDF变量名 = F.udf(函数的处理逻辑, 返回值类型)
|
register方式
1 2 3 4 5
| UDF函数名2 = spark.udf.register(UDF函数名1, 函数的处理逻辑)
定义:spark.udf.register() UDF函数名2:DSL中调用UDF使用的 UDF函数名1:SQL中调用UDF使用
|
CodeDemo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| import time
from pyspark import SparkContext, SparkConf import os import sys from pyspark.sql import SparkSession
import pyspark.sql.functions as F from pyspark.sql.types import StringType
os.environ['JAVA_HOME'] = '/export/server/jdk' os.environ['SPARK_HOME'] = '/export/server/spark' os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3" os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"
if __name__ == '__main__': print("WordCount")
""" .appName:应用的名字 config("spark.sql.shuffle.partitions", 2) spark经过shuffle之后重新分配的分区数,模式是200个 getOrCreate() :如果代码前文已经创建了该对象,则获取,否则创建 """ spark = SparkSession \ .builder \ .master('local[2]') \ .appName("wordcount_1") \ .config("spark.sql.shuffle.partitions", 2) \ .getOrCreate() music_df = spark\ .read\ .schema('id int, name string, info string')\ .option('sep', '\t')\ .csv('/datas/spark/input/test5') music_df.show() music_df.createOrReplaceTempView('music')
my_process2 = spark.udf.register('my_process', lambda x: (x.split('/')[0] +'斤/'+x.split('/')[1] + 'cm')) def to_new_info(x): return x.split('/')[0] +'斤/' + x.split('/')[1] + 'cm' music_udf = F.udf(f=lambda old_info: to_new_info(old_info), returnType=StringType())
spark.sql( """ select id, name, my_process(info) from music; """ ).show()
music_df.select('id', 'name', music_udf('info').alias('new_info')).show()
|