import time
import os
import sys

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col

os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
    print("WordCount")

    """
    .appName: the name of the application.
    .config("spark.sql.shuffle.partitions", 2): the number of partitions
        Spark repartitions to after a shuffle; the default is 200.
    .getOrCreate(): return the SparkSession created earlier in the code
        if one exists, otherwise create a new one.
    """
    spark = SparkSession \
        .builder \
        .master('local[2]') \
        .appName("wordcount_1") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()

    # Read a stream of text lines from a TCP socket at node1:9999;
    # each line arrives as one row in the 'value' column.
    input_df = spark \
        .readStream \
        .format('socket') \
        .option('host', 'node1') \
        .option('port', 9999) \
        .load()

    # Split each line on whitespace into one word per row, then drop
    # tokens that are bare punctuation.
    filter_element = ['?', ',', '.', '!', ' ', '-']
    rs_df = input_df \
        .select(explode(split('value', '\\s+')).alias('word')) \
        .filter(~col('word').isin(filter_element))

    # Append each micro-batch as CSV files; the checkpoint location
    # lets the query recover its progress after a restart.
    rs_df \
        .writeStream \
        .outputMode('append') \
        .format('csv') \
        .option('path', 'file:///root/file_sink') \
        .option('checkpointLocation', 'file:///root/file_sink_check_point') \
        .start() \
        .awaitTermination()
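As written, the query only tokenizes and filters; despite the name, no counting happens. Below is a minimal sketch of the missing aggregation step, assuming the rs_df defined above. Since the file sink only supports outputMode('append'), which rejects streaming aggregations without a watermark, the sketch writes running counts to the console sink instead:

    # Hypothetical continuation: count word frequencies per word.
    count_df = rs_df.groupBy('word').count()

    # 'complete' mode re-emits the full updated result table on every
    # micro-batch, which is what the console sink expects here.
    count_df \
        .writeStream \
        .outputMode('complete') \
        .format('console') \
        .start() \
        .awaitTermination()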
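To drive the socket source while testing, start a netcat listener on node1 with nc -lk 9999 (the same host and port the query reads from) and type lines into that terminal; each line becomes one row in input_df's value column.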