import os

import pyspark.sql.functions as F
from pyspark.sql import SparkSession, DataFrame

from com.itheima.online.utils import ConfigLoader
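# ConfigLoader ships with the course utilities and is not reproduced here; a
# minimal stand-in might look like the hypothetical sketch below (the real
# class presumably reads these keys from a config file rather than
# hard-coding them, and the host/port/topic values are assumptions):
#
#     class ConfigLoader:
#         _KAFKA = {'bootstrapServerHost': 'up01',
#                   'bootstrapServerPort': '9092',
#                   'event_topic': 'user_event'}
#
#         @staticmethod
#         def getKafkaConfig(key):
#             return ConfigLoader._KAFKA[key]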
SPARK_HOME = "/export/server/spark" PYSPARK_HOME = "/root/anaconda3/envs/pyspark_env/bin/python"
os.environ['SPARK_HOME'] = SPARK_HOME os.environ['PYSPARK_HOME'] = PYSPARK_HOME
if __name__ == '__main__':
    # 1. Build a local SparkSession; 20 shuffle partitions is plenty for a
    #    single-machine demo (the default 200 would create many tiny tasks).
    spark = SparkSession.builder \
        .appName('Structured Streaming processing of nginx logs') \
        .master('local[*]') \
        .config('spark.sql.shuffle.partitions', 20) \
        .getOrCreate()
    # 2. Pull the Kafka connection details from the shared config.
    host = ConfigLoader.getKafkaConfig('bootstrapServerHost')
    port = ConfigLoader.getKafkaConfig('bootstrapServerPort')
    topic = ConfigLoader.getKafkaConfig('event_topic')
    # 3. Subscribe to the event topic from the earliest offset; Kafka
    #    delivers the payload as bytes, so cast value to a string up front.
    kafka_df = spark.readStream.format('kafka') \
        .option('kafka.bootstrap.servers', f"{host}:{port}") \
        .option('subscribe', topic) \
        .option('startingOffsets', 'earliest') \
        .load() \
        .selectExpr('cast(value as string)')
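    # A Kafka record's value is assumed to be a JSON document shaped roughly
    # like this (field names taken from the json_tuple calls below; the
    # sample values are invented for illustration):
    #
    # {"phone_num": "13800000001", "system_id": "web",
    #  "user_name": "alice", "user_id": "u_0001",
    #  "visit_time": "2025-01-01 10:00:00",
    #  "goods_type": "auto_insurance", "minimum_price": "99.0",
    #  "area": {"province": "Guangdong", "city": "Shenzhen", "sp": "CMCC"},
    #  "user_behavior": {"is_browse": "1", "is_order": "0", "is_buy": "0",
    #                    "is_back_order": "0", "is_claim": "0"},
    #  "goods_detail": {"goods_name": "basic_plan", "browse_page": "/detail",
    #                   "browse_time": "2025-01-01 10:00:00",
    #                   "to_page": "/order", "to_time": "2025-01-01 10:00:30",
    #                   "page_keywords": "insurance,quote"}}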
    # 4. Flatten the JSON: first the top-level fields, then the nested
    #    area / user_behavior / goods_detail objects, and finally cast
    #    everything (json_tuple always yields strings) to its real type.
    etl_df = kafka_df \
        .select(F.json_tuple('value',
                             'phone_num', 'system_id', 'area', 'user_name', 'user_id',
                             'visit_time', 'goods_type', 'minimum_price',
                             'user_behavior', 'goods_detail')
                .alias('phone_num', 'system_id', 'area', 'user_name', 'user_id',
                       'visit_time', 'goods_type', 'minimum_price',
                       'user_behavior', 'goods_detail')) \
        .select('*', F.json_tuple('area', 'province', 'city', 'sp')
                .alias('province', 'city', 'service_provider')) \
        .select('*', F.json_tuple('user_behavior', 'is_browse', 'is_order',
                                  'is_buy', 'is_back_order', 'is_claim')
                .alias('is_browse', 'is_order', 'is_buy', 'is_back_order', 'is_claim')) \
        .select('*', F.json_tuple('goods_detail', 'goods_name', 'browse_page',
                                  'browse_time', 'to_page', 'to_time', 'page_keywords')
                .alias('goods_name', 'browse_page', 'browse_time', 'to_page',
                       'to_time', 'page_keywords')) \
        .selectExpr('cast(phone_num as string) phone_num',
                    'cast(system_id as string) system_id',
                    'cast(province as string) province',
                    'cast(city as string) city',
                    'cast(service_provider as string) service_provider',
                    'cast(user_name as string) user_name',
                    'cast(user_id as string) user_id',
                    'cast(visit_time as string) visit_time',
                    'cast(goods_type as string) goods_type',
                    'cast(minimum_price as float) minimum_price',
                    'cast(is_browse as smallint) is_browse',
                    'cast(is_order as smallint) is_order',
                    'cast(is_buy as smallint) is_buy',
                    'cast(is_back_order as smallint) is_back_order',
                    'cast(is_claim as smallint) is_claim',
                    'cast(goods_name as string) goods_name',
                    'cast(browse_page as string) browse_page',
                    'cast(browse_time as string) browse_time',
                    'cast(to_page as string) to_page',
                    'cast(to_time as string) to_time',
                    'cast(page_keywords as string) page_keywords')
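    # Alternative sketch (not used here): define an explicit schema once and
    # parse with F.from_json instead of chained json_tuple calls. from_json
    # preserves nesting in a single pass, while json_tuple emits every field
    # as a string, which is why the selectExpr casts above are required:
    #
    #     from pyspark.sql.types import StructType, StringType
    #     schema = (StructType()
    #               .add('user_id', StringType())
    #               .add('minimum_price', StringType()))  # remaining fields follow the same pattern
    #     parsed = kafka_df.select(F.from_json('value', schema).alias('e')).select('e.*')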
    # 5. Per-user aggregate expressions. The behavior flags were cast to
    #    smallint above, so summing them counts events; min_price keeps the
    #    cheapest product the user looked at.
    area_agg = F.approx_count_distinct('province').alias('province_area_num')
    sp_agg = F.approx_count_distinct('service_provider').alias('service_provider_num')
    stay_duration_agg = F.sum(
        F.unix_timestamp(F.col('to_time')) - F.unix_timestamp(F.col('browse_time'))
    ).alias('stay_duration')
    browse_num_agg = F.count('browse_page').alias('browse_page_num')
    is_browse_agg = F.sum(F.col('is_browse')).alias('browse_num')
    is_order_agg = F.sum(F.col('is_order')).alias('order_num')
    is_pay_agg = F.sum(F.col('is_buy')).alias('pay_num')
    is_back_order_agg = F.sum(F.col('is_back_order')).alias('back_order_num')
    is_claim_agg = F.sum(F.col('is_claim')).alias('claim_num')
    goods_type_agg = F.first('goods_type').alias('insurance_type_name')
    min_price_agg = F.min('minimum_price').alias('min_price')
    page_keyword_agg = F.first('page_keywords').alias('page_keyword')
    visit_time_agg = F.first('visit_time').alias('visit_time')
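    # Worked example for stay_duration (assuming the timestamps use Spark's
    # default 'yyyy-MM-dd HH:mm:ss' format): a row with browse_time
    # '2025-01-01 10:00:00' and to_time '2025-01-01 10:00:30' contributes
    # 30 seconds, and F.sum accumulates those per-page dwell times per user.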
    # 6. Aggregate per user and rank the most active users first; sorting a
    #    streaming aggregate is only legal with the 'complete' output mode
    #    used below.
    result_df = etl_df.groupBy('user_id').agg(
        area_agg, sp_agg, stay_duration_agg, browse_num_agg, is_browse_agg,
        is_order_agg, is_pay_agg, is_back_order_agg, is_claim_agg,
        goods_type_agg, min_price_agg, page_keyword_agg, visit_time_agg
    ).orderBy(
        F.col('province_area_num').desc(), F.col('service_provider_num').desc(),
        F.col('stay_duration').desc(), F.col('browse_page_num').desc(),
        F.col('browse_num').desc(), F.col('order_num').desc(),
        F.col('pay_num').desc(), F.col('back_order_num').desc(),
        F.col('claim_num').desc(), F.col('min_price').asc()
    )
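    # The MySQL target table is assumed to match the aggregate schema above;
    # a DDL along these (hypothetical) lines would fit:
    #
    # CREATE TABLE htv_insurance_stream_db.user_event_result (
    #     user_id              VARCHAR(64) PRIMARY KEY,
    #     province_area_num    BIGINT,
    #     service_provider_num BIGINT,
    #     stay_duration        BIGINT,
    #     browse_page_num      BIGINT,
    #     browse_num           BIGINT,
    #     order_num            BIGINT,
    #     pay_num              BIGINT,
    #     back_order_num       BIGINT,
    #     claim_num            BIGINT,
    #     insurance_type_name  VARCHAR(255),
    #     min_price            FLOAT,
    #     page_keyword         VARCHAR(255),
    #     visit_time           VARCHAR(64)
    # );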
    def save2mysql(df: DataFrame, batch_id):
        """
        Write one micro-batch of the streaming result into MySQL.

        :param df: the data of this micro-batch
        :param batch_id: id of the current batch

        Options:
            truncate=true       -- empty the table (rather than drop it) on each overwrite
            isolationLevel=NONE -- no transaction isolation for the bulk write
            batchsize=10        -- flush to the result table every 10 rows
        """
        df.write \
            .mode('overwrite') \
            .format('jdbc') \
            .option('driver', 'com.mysql.jdbc.Driver') \
            .option('url', 'jdbc:mysql://up01:3306?rewriteBatchedStatements=true') \
            .option('dbtable', 'htv_insurance_stream_db.user_event_result') \
            .option('user', 'root') \
            .option('password', '123456') \
            .option('truncate', 'true') \
            .option('useSSL', 'false') \
            .option('isolationLevel', 'NONE') \
            .option('batchsize', '10') \
            .save()
    # 7. Drive the query: 'complete' mode re-emits the full aggregate each
    #    micro-batch, and foreachBatch hands every batch to save2mysql.
    result_df.writeStream \
        .foreachBatch(save2mysql) \
        .outputMode('complete') \
        .start() \
        .awaitTermination()
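    # NOTE (assumption): no checkpointLocation is configured, so Spark falls
    # back to a temporary checkpoint directory and the query cannot resume
    # its Kafka offsets after a restart. For anything beyond a local demo,
    # add e.g.
    #
    #     .option('checkpointLocation', '/tmp/ckpt/user_event_result')
    #
    # (hypothetical path) to the writeStream chain before start().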