商品数据采集主要分为三个部分: 1, 获取上次记录的最大时间值(从元数据库的表中获取)2, 根据上一次采集商品数据中updateAt的最大值查询数据源库商品表,获取继上一次采集之后,新增和更新的商品数据.3, 针对新增和更新的商品数据,进行数据采集(ETL->mysql->csv)

需要优化的点:
1,使用事务,事务的使用需要结合try, except, else来使用.同时注意执行事务操作不需要调用工具类中的execute函数,因为函数会自动commit, 所以需要自定义without_commit,而且如果设置每执行1000条sql就提交一次事务,而在for,else中使用事务再次提交一次
2,使用日志工具(每执行1000句sql就记录一下,同时提交事务.日志的设置比较随便,仁者见仁智者见智.
3,将读取到的数据记录写入元数据表中,注意元数据表中的字段,一个是记录本次采集的最大时间(updatetime),以及采集到了哪一行,方便后续的采集动作.
4,使用time工具计算执行所需的时间,strat_time = time.time()

模型文件

使用一个元组类型的数据能够快速获得sql插入语句,csv文件插入语句,以及生成csv表头.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""
条码商品信息模型类
"""
from EmploymentClass.PythonETL.util import str_util
from EmploymentClass.PythonETL.config import project_config_file as conf


class BarcodeModel(object):
"""条码商品信息模型类"""
def __init__(self, data_tuple: tuple):
"""条码商品模型对象初始化"""
self.code = data_tuple[0]
self.name = str_util.clear_str(data_tuple[1])
self.spec = str_util.clear_str(data_tuple[2])
self.trademark = str_util.clear_str(data_tuple[3])
self.addr = str_util.clear_str(data_tuple[4])
self.units = str_util.clear_str(data_tuple[5])
self.factory_name = str_util.clear_str(data_tuple[6])
self.trade_price = data_tuple[7]
self.retail_price = data_tuple[8]
self.update_at = str(data_tuple[9])
self.wholeunit = data_tuple[10]
self.wholenum = data_tuple[11]
self.img = data_tuple[12]
self.src = data_tuple[13]

def generate_insert_sql(self):
"""生成SQL的插入语句"""
sql = f"REPLACE INTO {conf.target_barcode_table_name}(" \
f"code,name,spec,trademark,addr,units,factory_name,trade_price," \
f"retail_price,update_at,wholeunit,wholenum,img,src) VALUES(" \
f"'{self.code}', " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.name)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.spec)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.trademark)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.addr)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.units)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.factory_name)}, " \
f"{str_util.check_number_null_and_transform_to_sql_null(self.trade_price)}, " \
f"{str_util.check_number_null_and_transform_to_sql_null(self.retail_price)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.update_at)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.wholeunit)}, " \
f"{str_util.check_number_null_and_transform_to_sql_null(self.wholenum)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.img)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.src)}" \
f")"

return sql

@staticmethod
def get_csv_header(sep=','):
"""
生成 csv 数据的标头内容
"""
return f"code{sep}" \
f"name{sep}" \
f"spec{sep}" \
f"trademark{sep}" \
f"addr{sep}" \
f"units{sep}" \
f"factory_name{sep}" \
f"trade_price{sep}" \
f"retail_price{sep}" \
f"update_at{sep}" \
f"wholeunit{sep}" \
f"wholenum{sep}" \
f"img{sep}" \
f"src\n"

def to_csv(self, sep=','):
"""生成csv数据行"""
csv_line = \
f"{self.code}{sep}" \
f"{self.name}{sep}" \
f"{self.spec}{sep}" \
f"{self.trademark}{sep}" \
f"{self.addr}{sep}" \
f"{self.units}{sep}" \
f"{self.factory_name}{sep}" \
f"{self.trade_price}{sep}" \
f"{self.retail_price}{sep}" \
f"{self.update_at}{sep}" \
f"{self.wholeunit}{sep}" \
f"{self.wholenum}{sep}" \
f"{self.img}{sep}" \
f"{self.src}\n"

return csv_line

条码采集文件(带注释

跟上一个处理日志文件的思路差不多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from EmploymentClass.PythonETL.util import mysql_util
from EmploymentClass.PythonETL.config import project_config_file as cfg
from EmploymentClass.PythonETL.model.barcode_model import BarcodeModel
from EmploymentClass.PythonETL.util import logging_util
import time

"""
需要优化的点:
1,使用事务
2,使用日志工具(每执行1000句sql就记录一下,同时提交事务
3,将读取到的数据记录写入元数据表中
4,使用time工具计算执行所需的时间
"""
# 创建日志对象
logger = logging_util.init_logger()

# todo: Part1: 获取上次记录的最大时间值
# 后台条码商品采集业务.
# 1. 查询元数据库表,获取上批次条码商品数据处理的最大时间
# 创建元数据库连接对象
# 检查元数据表是否存在,不存在则创建
# 查询商品采集元数据表中,上一次采集记录的 updateAt 的最大值
# 判断获取到的数据集是否有值,创建变量保存该值
# 如果有值则保存
# 如果没有值则保存None

# 创建元数据库连接对象
meat_db = mysql_util.get_mysql_util(
user=cfg.metadata_user,
password=cfg.metadata_password,
database=cfg.metadata_db
)

# 检查元数据表是否存在,不存在则创建
meat_db.check_table_exists_create(
cfg.metadata_db,
cfg.metadata_barcode_table_name,
cfg.metadata_barcode_table_create_cols
)

# 查询商品采集元数据表中, 上一次采集记录的时间最大值(time_record)(如果有值则保存,没有则设置为None)
max_tuple_date = meat_db.query(f"select max(time_record) from {cfg.metadata_barcode_table_name};")
# max_tuple_date <class 'tuple'>: ((None,),)
# 判断最大是日期时间是否为None
if max_tuple_date[0][0] is None:
# 没有最大日期,将最大日期设置为None
max_date = None
else:
# 有最大日期,将最大日期读取出来
max_date = max_tuple_date[0][0]
logger.info(f"上次采集的时间是: {max_date}")

# todo Part2根据上一次采集商品数据中updateAt的最大值查询数据源库商品表,获取继上一次采集之后,新增和更新的商品数据
# 与后台数据表建立连接
source_db = mysql_util.get_mysql_util(
user=cfg.source_user,
password=cfg.source_password,
database=cfg.source_data_db
)
# 检查源数据库是否存在(存在为True,不存在为False
if not source_db.check_table_exists(cfg.source_data_db,cfg.target_barcode_table_name):
# 如果不存在直接退出,
exit("后台商品不存在,请找后台开发人员!")
# 从后台数据库中获取数据(首先判断max_date是否为None,如果是None则不能和timestep类型对比
if max_date is None:
# 如果最大的日期值为None,则是全量采集
select_sql = f"select * from {cfg.target_barcode_table_name}"
else:
# 否则将是增量采集(这里max_date要加单引号,才能比较)
select_sql = f"select * from {cfg.target_barcode_table_name} where updateAt > '{max_date}';"
# 从后台数据库中获取数据(元组嵌套元组
result_data = source_db.query(select_sql)
# 测试要采集的数据量
print(f"要采集的数据条数为: {len(result_data)}")
# 判断要处理的数据量(如果是0条,直接退出程序)
if len(result_data) == 0:
exit("没有要处理的数据,退出采集!")

# todo Part3 针对新增和更新的商品数据,进行数据采集(ETL->mysql->csv)
# 创建目标数据数据库连接(这里已经创建了3个连接了分别是元数据,后台数据,目标数据
target_db = mysql_util.get_mysql_util(
user=cfg.target_user,
password=cfg.target_password,
database=cfg.target_retail_db
)
# 判断其中的数据表是否存在,不存在则创建
target_db.check_table_exists_create(
cfg.target_retail_db,
cfg.target_barcode_table_name,
cfg.target_barcode_table_create_cols
)
# 创建文件输出通道
csv_file = open(cfg.barcode_output_csv_root_path+cfg.barcode_orders_output_csv_file_name, "a", encoding="utf8")
# 写入csv表头到文件中(类中的静态方法可以通过类.方法名直接调用,不需要创建对象
csv_file.write(BarcodeModel.get_csv_header())
# 根据数据源采集结果创建数据模型
# 定义变量用于计算sql执行的次数
line_count = 0
# 定义变量用于记录本次采集的开始时间
start_time = time.time()
# 开启事务
target_db.begin_transaction()
# 遍历元组获取单个元组数据 result_data: (('014779000888 ','日本眉机', '2017710165644'),('4000000002604','丁丁孜然酱228G/瓶','2017710165644'))"
for tuple_data in result_data:
try:
line_count += 1
# 创建模型对象
bm_model = BarcodeModel(tuple_data)
# 向目标表中插入数据
target_db.insert_single_sql_without_commit(bm_model.generate_insert_sql())
# 向csv文件中写入数据
csv_file.write(bm_model.to_csv())
except Exception as e:
logger.error(f"插入数据出错:{e},回滚事务!")
# 出现异常就回滚事务
target_db.rollback_transaction()
# 抛出异常,python解释器停止运行
raise e
else:
# 1000次/2000/提交事务
if line_count % 1000 == 0:
# 没有出现异常就提交事务(文件需要flush)
target_db.commit_transaction()
# 注意提交事务之后,事务就会关闭了这时需要为下个插入数据开启事务
target_db.begin_transaction()
# 文件刷新出缓冲区
csv_file.flush()
# 遍历完要再提交一次事务(但是csv文件不必flush,因为close时会自动写入
else:
# 定义变量用于记录本次采集的结束时间
end_time = time.time()
# for循环正常执行结束执行这里(也就是说没出现异常
target_db.commit_transaction()
# 查询目标数据表中的updateAt最大值
query_result = target_db.query(f"select max(update_at) from {cfg.target_barcode_table_name}")
# query_result : <class 'tuple'>: (datetime.datetime(2017, 7, 10, 19, 9, 9),)
# 将得到的updateAt最大值写入元数据库中(id, time_record, gather_line_count
meat_db.insert_single_sql(f"insert into table {cfg.metadata_barcode_table_name}(time_record,gather_line_count)"
f"values ({query_result[0][0]},{line_count});")
# 遍历完并且写完数据之后,释放资源
csv_file.close()
target_db.close()
meat_db.close()

END