That wraps up the whole ETL process. Nothing fancy here, just the essentials, so let's finish with a short summary!

Order data collection

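Business requirements:

1) Save the collected order JSON data to the target database

2) Write the collected order JSON data out to CSV files

3) A JSON file must never be collected twice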

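Implementation approach:

  1. List the order JSON files in the order folder
  2. Query the metadata table for the order JSON files that have already been collected, and compare the two to find the new files that still need collecting
  3. Collect the data from those new order JSON files (ETL: write to MySQL, then to CSV)
  4. Record the order JSON files collected in this run in the metadata table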
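Steps 1 and 2 amount to a set difference: every file in the order directory minus the files already recorded in the metadata table. The sketch below illustrates that idea under stated assumptions; it is not the project's `file_util`, whose implementation is not shown in this post. `list_files_without_recursion` and `new_files_to_process` are hypothetical names, and `name_col=1` assumes metadata rows shaped like `(id, file_name, process_lines, time)`.

```python
import os


def list_files_without_recursion(dir_path):
    """Return the full paths of regular files directly under dir_path, sorted.

    Subdirectories are skipped rather than walked.
    """
    paths = (os.path.join(dir_path, name) for name in os.listdir(dir_path))
    return sorted(p for p in paths if os.path.isfile(p))


def new_files_to_process(all_files, processed_rows, name_col=1):
    """Keep only the files whose path is not recorded in the metadata rows.

    processed_rows mimics the tuples returned by a metadata query; name_col is
    an assumed index of the file-name column.
    """
    processed = {row[name_col] for row in processed_rows}
    # Keep the on-disk order so files are processed deterministically.
    return [f for f in all_files if f not in processed]


if __name__ == "__main__":
    all_files = ["/data/x00", "/data/x01", "/data/x02"]
    rows = [(1, "/data/x00", 120, "2023-01-01 00:00:00")]
    print(new_files_to_process(all_files, rows))  # ['/data/x01', '/data/x02']
```

Step 4 then appends the newly processed file names to the metadata table, so the next run's difference no longer contains them.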

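Using the json module

json.loads converts a JSON string into the corresponding Python data types

json.dumps converts Python data into a JSON string

The main difference between json.loads and json.load is that json.load takes a file object (anything with a .read() method) as its argument, while json.loads takes a string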
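The order files hold one JSON object per line, which is why the models call `json.loads` on each line rather than `json.load` on the whole file. A short round trip through the four functions; the sample order dict is made up, and `io.StringIO` stands in for a real file:

```python
import io
import json

order = {"orderID": "o-1", "payedTotal": 12.5, "product": [{"name": "Cola", "count": 2}]}

# json.dumps: Python object -> JSON string
# (ensure_ascii=False keeps non-ASCII text such as Chinese readable in the output)
text = json.dumps(order, ensure_ascii=False)

# json.loads: JSON string -> Python object (a dict here)
parsed = json.loads(text)
print(type(parsed).__name__, parsed["product"][0]["name"])  # dict Cola

# json.dump / json.load work on file-like objects instead of strings;
# io.StringIO stands in for a file opened with open(..., encoding="utf8").
buffer = io.StringIO()
json.dump(order, buffer, ensure_ascii=False)
buffer.seek(0)
from_file = json.load(buffer)
print(from_file == parsed)  # True
```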

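Transactions: pay attention to when a transaction is opened and when it is committed. The commit happens once a whole file has been written without errors, which tells you the transaction must be opened right before that file is processed.
Metadata: mind the columns of the metadata table. The auto-increment id and the automatically generated timestamp don't need to be supplied; only the processed file name and the number of processed lines are written.
Logging: logs can be used to measure the total time spent processing the data.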

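Question 1: How do we turn the JSON data above into MySQL tables?

Split the data, put each part into its own table, and link the tables with a foreign key.

Question 2: How many tables does the JSON data become, and how are they related?

Two tables are enough: an orders table and an order detail table.

They are linked by a foreign key: create a foreign key in the order detail table that references the primary key of the orders table.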
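A sketch of the two-table layout, again using `sqlite3` as a stand-in for MySQL so it runs anywhere. Only a few columns are shown; the column names are borrowed from the model classes in this post, while the project's real DDL (the `*_create_cols` config values) is not shown.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# SQLite checks foreign keys only when this pragma is on;
# InnoDB tables in MySQL enforce them by default.
conn.execute("PRAGMA foreign_keys = ON")

# Parent table: one row per order, keyed by order_id.
conn.execute(
    "CREATE TABLE orders ("
    " order_id TEXT PRIMARY KEY,"
    " store_id INTEGER,"
    " payed_total REAL)"
)
# Child table: one row per product sold; order_id points back to orders.
conn.execute(
    "CREATE TABLE orders_detail ("
    " order_id TEXT NOT NULL REFERENCES orders (order_id),"
    " barcode TEXT,"
    " name TEXT,"
    " count INTEGER)"
)

conn.execute("INSERT INTO orders VALUES ('o1', 1, 12.5)")
conn.executemany(
    "INSERT INTO orders_detail VALUES (?, ?, ?, ?)",
    [("o1", "690001", "Cola", 2), ("o1", "690002", "Chips", 1)],
)

try:
    # Rejected: no order 'o404' exists for this detail row to reference.
    conn.execute("INSERT INTO orders_detail VALUES ('o404', '690003', 'Tea', 1)")
except sqlite3.IntegrityError as exc:
    print("rejected:", exc)

# Join the two tables back together: one order, two detail rows.
print(conn.execute(
    "SELECT o.order_id, COUNT(*) FROM orders o"
    " JOIN orders_detail d ON d.order_id = o.order_id GROUP BY o.order_id"
).fetchall())  # [('o1', 2)]
```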

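Question 3: How do we turn the JSON data above into models?

We turn it into two models; the data is split between them the same way it is split between the tables.

Question 4: How many model classes does the JSON data become, and how are they related?

After the split there are two models: an order model and an order detail model.

The order side holds the order detail data as an attribute, so one order can hold multiple order detail records (one per product sold).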
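A compact way to express this one-to-many relationship in Python is a dataclass that owns a list of product objects. This is a simplified alternative to the model classes in the model file, not the project's code; only a few fields are kept, and the JSON keys (`orderID`, `payedTotal`, `product`, `name`, `count`, `pricePer`) are the ones the original models read.

```python
import json
from dataclasses import dataclass, field
from typing import List


@dataclass
class ProductSold:
    order_id: str
    name: str
    count: int
    price_per: float


@dataclass
class Order:
    order_id: str
    payed_total: float
    # One order owns many product rows; this mirrors the foreign key
    # from the detail table back to the orders table.
    products: List[ProductSold] = field(default_factory=list)

    @classmethod
    def from_json_line(cls, line):
        """Build an Order (with its products) from one line of order JSON."""
        data = json.loads(line)
        order = cls(order_id=data["orderID"], payed_total=data["payedTotal"])
        for item in data["product"]:
            order.products.append(
                ProductSold(order.order_id, item["name"], item["count"], item["pricePer"])
            )
        return order


line = (
    '{"orderID": "o1", "payedTotal": 7.5, "product": ['
    '{"name": "Cola", "count": 2, "pricePer": 3.0}, '
    '{"name": "Gum", "count": 1, "pricePer": 1.5}]}'
)
order = Order.from_json_line(line)
print(len(order.products), order.products[0].order_id)  # 2 o1
```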

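MySQL insert forms

  1. INSERT INTO: inserts the row; if the insert fails (a unique constraint is violated, e.g. a duplicate primary key), it raises an error
  2. INSERT IGNORE INTO: inserts the row; if the insert fails (a unique constraint is violated, e.g. a duplicate primary key), the row is silently skipped
  3. REPLACE INTO: inserts the row; if the insert fails (a unique constraint is violated, e.g. a duplicate primary key), the existing row is replaced

For example, suppose the row being inserted has id 3 but the table already contains a record with that id.

Then form 1 raises an error, form 2 inserts nothing, and form 3 overwrites the existing record.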
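The three behaviors can be tried out without a MySQL server. SQLite spells the second form `INSERT OR IGNORE` and accepts `REPLACE INTO` as written, so the sketch below uses `sqlite3` as an approximation of MySQL's semantics. Two details worth knowing: in both databases REPLACE removes the conflicting row before inserting, so columns you omit revert to their defaults rather than keeping the old values; and MySQL's `INSERT IGNORE` also turns some other errors (such as out-of-range values) into warnings.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
conn.execute("INSERT INTO t VALUES (3, 'old')")

# 1. Plain INSERT: a duplicate primary key raises an error.
try:
    conn.execute("INSERT INTO t VALUES (3, 'new')")
except sqlite3.IntegrityError:
    print("INSERT: error")

# 2. INSERT OR IGNORE (SQLite's spelling of MySQL's INSERT IGNORE): the row is skipped.
conn.execute("INSERT OR IGNORE INTO t VALUES (3, 'ignored')")
print(conn.execute("SELECT val FROM t WHERE id = 3").fetchone()[0])  # old

# 3. REPLACE INTO: the old row is deleted and the new one inserted.
conn.execute("REPLACE INTO t VALUES (3, 'replaced')")
print(conn.execute("SELECT val FROM t WHERE id = 3").fetchone()[0])  # replaced
```

A plausible reason the models in this project use `INSERT IGNORE` is that re-running a file whose orders are already in the table then skips those rows instead of failing.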

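Model file OrderModel

Order business data models:
1) Order data model: OrdersModel, which stores the order data from the JSON
2) Single sold product data model: SingleProductSoldModel, which stores one product of the order
3) Order detail data model: OrdersDetailModel, which stores all products of the order (essentially a list of SingleProductSoldModel objects)
4) Raw data model (order data + order detail data combined): RetailOriginJsonModel, which is essentially the order plus its product details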
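The model classes build their INSERT statements by formatting values into f-strings, relying on helpers such as `str_util.check_str_null_and_transform_to_sql_null` (not shown in this post) to turn empty values into NULL. A common alternative is to let the database driver bind parameters, which handles quoting and `None` for you. The sketch uses `sqlite3` (placeholder `?`); with PyMySQL the placeholder would be `%s`. `detail_rows` and `DETAIL_COLUMNS` are hypothetical names.

```python
import sqlite3

DETAIL_COLUMNS = ("order_id", "barcode", "name", "count", "price_per")


def detail_rows(order_id, products):
    """Turn one order's product dicts into tuples; missing keys become None (stored as NULL)."""
    return [
        (order_id, p.get("barcode"), p.get("name"), p.get("count"), p.get("pricePer"))
        for p in products
    ]


conn = sqlite3.connect(":memory:")
conn.execute(f"CREATE TABLE orders_detail ({', '.join(DETAIL_COLUMNS)})")
placeholders = ", ".join("?" for _ in DETAIL_COLUMNS)
sql = f"INSERT INTO orders_detail ({', '.join(DETAIL_COLUMNS)}) VALUES ({placeholders})"

products = [
    {"barcode": "690001", "name": "Tom's Cola", "count": 2, "pricePer": 3.0},  # quote needs no escaping
    {"barcode": None, "name": "Loose candy", "count": 5, "pricePer": 0.5},  # None is stored as NULL
]
# executemany binds every row against the same parameterized statement.
conn.executemany(sql, detail_rows("o1", products))
print(conn.execute("SELECT name, barcode FROM orders_detail ORDER BY rowid").fetchall())
# [("Tom's Cola", '690001'), ('Loose candy', None)]
```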

"""
订单业务数据模型:
1)订单数据模型:OrdersModel
2)单个售卖商品数据模型:SingleProductSoldModel
3)订单详情数据模型:OrdersDetailModel
4)原始数据模型(订单数据+订单详情数据组合模型):RetailOriginJsonModel
"""
import json
from EmploymentClass.PythonETL.config import project_config_file as conf
from EmploymentClass.PythonETL.util import str_util, time_util

class OrdersModel(object):
"""订单数据模型"""
def __init__(self, data):
"""
利用传入的订单json数据构建订单数据模型对象
"""
# 将 json 数据转换为字典
data = json.loads(data)
# 初始化订单数据模型对象
self.discount_rate = data['discountRate'] # 折扣率
self.store_shop_no = data['storeShopNo'] # 店铺店号(无用列)
self.day_order_seq = data['dayOrderSeq'] # 本单为当日第几单
self.store_district = data['storeDistrict'] # 店铺所在行政区
self.is_signed = data['isSigned'] # 是否签约店铺(签约第三方支付体系)
self.store_province = data['storeProvince'] # 店铺所在省份
self.origin = data['origin'] # 原始信息(无用)
self.store_gps_longitude = data['storeGPSLongitude'] # 店铺GPS经度
self.discount = data['discount'] # 折扣金额
self.store_id = data['storeID'] # 店铺ID
self.product_count = data['productCount'] # 本单售卖商品数量
self.operator_name = data['operatorName'] # 操作员姓名
self.operator = data['operator'] # 操作员ID
self.store_status = data['storeStatus'] # 店铺状态
self.store_own_user_tel = data['storeOwnUserTel'] # 店铺店主电话
self.pay_type = data['payType'] # 支付类型
self.discount_type = data['discountType'] # 折扣类型
self.store_name = data['storeName'] # 店铺名称
self.store_own_user_name = data['storeOwnUserName'] # 店铺店主名称
self.date_ts = data['dateTS'] # 订单时间
self.small_change = data['smallChange'] # 找零金额
self.store_gps_name = data['storeGPSName'] # 店铺GPS名称
self.erase = data['erase'] # 是否抹零
self.store_gps_address = data['storeGPSAddress'] # 店铺GPS地址
self.order_id = data['orderID'] # 订单ID
self.money_before_whole_discount = data['moneyBeforeWholeDiscount'] # 折扣前金额
self.store_category = data['storeCategory'] # 店铺类别
self.receivable = data['receivable'] # 应收金额
self.face_id = data['faceID'] # 面部识别ID
self.store_own_user_id = data['storeOwnUserId'] # 店铺店主ID
self.payment_channel = data['paymentChannel'] # 付款通道
self.payment_scenarios = data['paymentScenarios'] # 付款情况(无用)
self.store_address = data['storeAddress'] # 店铺地址
self.total_no_discount = data['totalNoDiscount'] # 整体价格(无折扣)
self.payed_total = data['payedTotal'] # 已付款金额
self.store_gps_latitude = data['storeGPSLatitude'] # 店铺GPS纬度
self.store_create_date_ts = data['storeCreateDateTS'] # 店铺创建时间
self.store_city = data['storeCity'] # 店铺所在城市
self.member_id = data['memberID'] # 会员ID

def generate_insert_sql(self):
"""
生成添加表数据的SQL语句
"""
sql = f"INSERT IGNORE INTO {conf.target_orders_table_name}(" \
f"order_id,store_id,store_name,store_status,store_own_user_id," \
f"store_own_user_name,store_own_user_tel,store_category," \
f"store_address,store_shop_no,store_province,store_city," \
f"store_district,store_gps_name,store_gps_address," \
f"store_gps_longitude,store_gps_latitude,is_signed," \
f"operator,operator_name,face_id,member_id,store_create_date_ts," \
f"origin,day_order_seq,discount_rate,discount_type,discount," \
f"money_before_whole_discount,receivable,erase,small_change," \
f"total_no_discount,pay_total,pay_type,payment_channel," \
f"payment_scenarios,product_count,date_ts" \
f") VALUES(" \
f"'{self.order_id}', " \
f"{self.store_id}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_name)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_status)}, " \
f"{self.store_own_user_id}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_own_user_name)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_own_user_tel)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_category)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_address)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_shop_no)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_province)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_city)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_district)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_gps_name)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_gps_address)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_gps_longitude)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.store_gps_latitude)}, " \
f"{self.is_signed}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.operator)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.operator_name)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.face_id)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.member_id)}, " \
f"'{time_util.ts13_to_date_str(self.store_create_date_ts)}', " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.origin)}, " \
f"{self.day_order_seq}, " \
f"{self.discount_rate}, " \
f"{self.discount_type}, " \
f"{self.discount}, " \
f"{self.money_before_whole_discount}, " \
f"{self.receivable}, " \
f"{self.erase}, " \
f"{self.small_change}, " \
f"{self.total_no_discount}, " \
f"{self.payed_total}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.pay_type)}, " \
f"{self.payment_channel}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.payment_scenarios)}, " \
f"{self.product_count}, " \
f"'{time_util.ts13_to_date_str(self.date_ts)}')"

return sql

@staticmethod
def get_csv_header(sep=','):
"""
生成 csv 数据的标头内容
"""
header = f"order_id{sep}" \
f"store_id{sep}" \
f"store_name{sep}" \
f"store_status{sep}" \
f"store_own_user_id{sep}" \
f"store_own_user_name{sep}" \
f"store_own_user_tel{sep}" \
f"store_category{sep}" \
f"store_address{sep}" \
f"store_shop_no{sep}" \
f"store_province{sep}" \
f"store_city{sep}" \
f"store_district{sep}" \
f"store_gps_name{sep}" \
f"store_gps_address{sep}" \
f"store_gps_longitude{sep}" \
f"store_gps_latitude{sep}" \
f"is_signed{sep}" \
f"operator{sep}" \
f"operator_name{sep}" \
f"face_id{sep}" \
f"member_id{sep}" \
f"store_create_date_ts{sep}" \
f"origin{sep}" \
f"day_order_seq{sep}" \
f"discount_rate{sep}" \
f"discount_type{sep}" \
f"discount{sep}" \
f"money_before_whole_discount{sep}" \
f"receivable{sep}" \
f"erase{sep}" \
f"small_change{sep}" \
f"total_no_discount{sep}" \
f"pay_total{sep}" \
f"pay_type{sep}" \
f"payment_channel{sep}" \
f"payment_scenarios{sep}" \
f"product_count{sep}" \
f"date_ts\n"

return header

def check_and_transform_area(self):
"""
检查省市区内容,为空就转换为未知
"""
if str_util.check_null(self.store_province):
self.store_province = '未知省份'
if str_util.check_null(self.store_city):
self.store_city = '未知城市'
if str_util.check_null(self.store_district):
self.store_district = '未知行政区'

def check_and_transform_all_column(self):
"""
转换全部的列,如果是空内容,就将其设置为空字符串
"""
self.discount_rate = str_util.check_null_and_transform(self.discount_rate)
self.store_shop_no = str_util.check_null_and_transform(self.store_shop_no)
self.day_order_seq = str_util.check_null_and_transform(self.day_order_seq)
self.store_district = str_util.check_null_and_transform(self.store_district)
self.is_signed = str_util.check_null_and_transform(self.is_signed)
self.store_province = str_util.check_null_and_transform(self.store_province)
self.origin = str_util.check_null_and_transform(self.origin)
self.store_gps_longitude = str_util.check_null_and_transform(self.store_gps_longitude)
self.discount = str_util.check_null_and_transform(self.discount)
self.store_id = str_util.check_null_and_transform(self.store_id)
self.product_count = str_util.check_null_and_transform(self.product_count)
self.operator_name = str_util.check_null_and_transform(self.operator_name)
self.operator = str_util.check_null_and_transform(self.operator)
self.store_status = str_util.check_null_and_transform(self.store_status)
self.store_own_user_tel = str_util.check_null_and_transform(self.store_own_user_tel)
self.pay_type = str_util.check_null_and_transform(self.pay_type)
self.discount_type = str_util.check_null_and_transform(self.discount_type)
self.store_name = str_util.check_null_and_transform(self.store_name)
self.store_own_user_name = str_util.check_null_and_transform(self.store_own_user_name)
self.date_ts = str_util.check_null_and_transform(self.date_ts)
self.small_change = str_util.check_null_and_transform(self.small_change)
self.store_gps_name = str_util.check_null_and_transform(self.store_gps_name)
self.erase = str_util.check_null_and_transform(self.erase)
self.store_gps_address = str_util.check_null_and_transform(self.store_gps_address)
self.order_id = str_util.check_null_and_transform(self.order_id)
self.money_before_whole_discount = str_util.check_null_and_transform(self.money_before_whole_discount)
self.store_category = str_util.check_null_and_transform(self.store_category)
self.receivable = str_util.check_null_and_transform(self.receivable)
self.face_id = str_util.check_null_and_transform(self.face_id)
self.store_own_user_id = str_util.check_null_and_transform(self.store_own_user_id)
self.payment_channel = str_util.check_null_and_transform(self.payment_channel)
self.payment_scenarios = str_util.check_null_and_transform(self.payment_scenarios)
self.store_address = str_util.check_null_and_transform(self.store_address)
self.total_no_discount = str_util.check_null_and_transform(self.total_no_discount)
self.payed_total = str_util.check_null_and_transform(self.payed_total)
self.store_gps_latitude = str_util.check_null_and_transform(self.store_gps_latitude)
self.store_create_date_ts = str_util.check_null_and_transform(self.store_create_date_ts)
self.store_city = str_util.check_null_and_transform(self.store_city)
self.member_id = str_util.check_null_and_transform(self.member_id)

def to_csv(self, sep=','):
"""
生成 csv 数据,分割符默认为逗号。
Note: 生成的数据顺序和header是一一对应的,不要混乱了。
"""
self.check_and_transform_area()
self.check_and_transform_all_column()

csv_line = \
f"{self.order_id}{sep}" \
f"{self.store_id}{sep}" \
f"{self.store_name}{sep}" \
f"{self.store_status}{sep}" \
f"{self.store_own_user_id}{sep}" \
f"{self.store_own_user_name}{sep}" \
f"{self.store_own_user_tel}{sep}" \
f"{self.store_category}{sep}" \
f"{self.store_address}{sep}" \
f"{self.store_shop_no}{sep}" \
f"{self.store_province}{sep}" \
f"{self.store_city}{sep}" \
f"{self.store_district}{sep}" \
f"{self.store_gps_name}{sep}" \
f"{self.store_gps_address}{sep}" \
f"{self.store_gps_longitude}{sep}" \
f"{self.store_gps_latitude}{sep}" \
f"{self.is_signed}{sep}" \
f"{self.operator}{sep}" \
f"{self.operator_name}{sep}" \
f"{self.face_id}{sep}" \
f"{self.member_id}{sep}" \
f"{time_util.ts13_to_date_str(self.store_create_date_ts)}{sep}" \
f"{self.origin}{sep}" \
f"{self.day_order_seq}{sep}" \
f"{self.discount_rate}{sep}" \
f"{self.discount_type}{sep}" \
f"{self.discount}{sep}" \
f"{self.money_before_whole_discount}{sep}" \
f"{self.receivable}{sep}" \
f"{self.erase}{sep}" \
f"{self.small_change}{sep}" \
f"{self.total_no_discount}{sep}" \
f"{self.payed_total}{sep}" \
f"{self.pay_type}{sep}" \
f"{self.payment_channel}{sep}" \
f"{self.payment_scenarios}{sep}" \
f"{self.product_count}{sep}" \
f"{time_util.ts13_to_date_str(self.date_ts)}\n"

return csv_line

class SingleProductSoldModel(object):
"""订单售卖商品数据模型"""
def __init__(self, order_id, product_detail):
self.order_id = order_id
self.count = product_detail['count']
self.name = product_detail['name']
self.unit_id = product_detail['unitID']
self.barcode = product_detail['barcode']
self.price_per = product_detail['pricePer']
self.retail_price = product_detail['retailPrice']
self.trade_price = product_detail['tradePrice']
self.category_id = product_detail['categoryID']

def generate_value_segment_for_sql_insert(self):
"""
生成添加表数据SQL语句的VALUE语句段
"""
segment = f"(" \
f"'{self.order_id}', " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.barcode)}, " \
f"{str_util.check_str_null_and_transform_to_sql_null(self.name)}, " \
f"{self.count}, " \
f"{self.price_per}, " \
f"{self.retail_price}, " \
f"{self.trade_price}, " \
f"{self.category_id}, " \
f"{self.unit_id}" \
f")"

return segment

def to_csv(self, sep=","):
"""
生成一条csv数据,分隔符默认逗号
"""
csv_line = \
f"{self.order_id}{sep}" \
f"{self.barcode}{sep}" \
f"{self.name}{sep}" \
f"{self.count}{sep}" \
f"{self.price_per}{sep}" \
f"{self.retail_price}{sep}" \
f"{self.trade_price}{sep}" \
f"{self.category_id}{sep}" \
f"{self.unit_id}\n"

return csv_line

class OrdersDetailModel(object):
"""订单详情数据模型"""
def __init__(self, data):
"""
利用传入的订单json数据构建订单详情数据模型对象
"""
data = json.loads(data)
order_products_list = data['product']
self.order_id = data['orderID']
self.products_detail = [] # 记录当前订单卖出的商品

for sing_product in order_products_list:
product = SingleProductSoldModel(self.order_id, sing_product)
self.products_detail.append(product)

def generate_insert_sql(self):
"""
生成添加表数据的SQL语句
"""
sql = f"INSERT IGNORE INTO {conf.target_orders_detail_table_name}(" \
f"order_id,barcode,name,count,price_per,retail_price,trade_price,category_id,unit_id" \
f") VALUES"

for single_product_sold_model in self.products_detail:
sql += single_product_sold_model.generate_value_segment_for_sql_insert() + ", "

# 去除最后的逗号
sql = sql[:-2]
return sql

@staticmethod
def get_csv_header(sep=','):
"""
生成 csv 数据的标头内容
"""
return f"order_id{sep}" \
f"barcode{sep}" \
f"name{sep}" \
f"count{sep}" \
f"price_per{sep}" \
f"retail_price{sep}" \
f"trade_price{sep}" \
f"category_id{sep}" \
f"unit_id\n"

def to_csv(self):
"""生成添加csv的数据行"""
csv_lines = ''

for single_product_sold_model in self.products_detail:
csv_lines += single_product_sold_model.to_csv()

return csv_lines

class RetailOriginJsonModel(object):
"""
原始订单JSON数据模型
"""
# data 是一行数据{order_id: xxx,products: [{商品1详情}, {商品2详情}]}
def __init__(self, data):
self.order_model = OrdersModel(data)
self.order_detail_model = OrdersDetailModel(data)

def get_order_model(self):
return self.order_model

def get_order_detail_model(self):
return self.order_detail_model

def get_order_id(self):
return self.order_model.order_id

def get_products_list(self):
return self.order_detail_model.products_detail

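Business flow: OrderService

The business logic is basically the same as in the previous two jobs, so I won't go over it again. If something is unclear, just type the code out once more yourself; it's all commented!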

```python
import sys

from EmploymentClass.PythonETL.util import file_util, mysql_util
from EmploymentClass.PythonETL.config import project_config_file as cfg
from EmploymentClass.PythonETL.model import order_model

# TODO step 1: determine which files to collect (3 sub-steps):
#   1) list all JSON files waiting in the directory
#   2) read the metadata database to get the files already collected
#   3) compare all files against the collected ones
# 1) List all files
json_file_list = file_util.get_dir_file_list_without_recursion(cfg.json_file_path)

# 2) Read the metadata database (create the connection, create the metadata table
#    if it does not exist, run the query)
meta_db = mysql_util.get_mysql_util(
    user=cfg.metadata_user,
    password=cfg.metadata_password,
    database=cfg.metadata_db
)
meta_db.check_table_exists_create(
    cfg.metadata_db,
    cfg.json_monitor_meta_table_name,
    cfg.json_monitor_meta_table_create_cols
)
tuple_data = meta_db.query(f'select * from {cfg.json_monitor_meta_table_name};')
# 3) Compare the collected files with all files to get the files that still need collecting
need_process_list = file_util.get_new_by_compare_lists(json_file_list, tuple_data)

# Extra: if there is nothing to collect, exit right away to save resources
if len(need_process_list) == 0:
    meta_db.close()
    sys.exit("No files need to be collected, exiting!")

# TODO step 2: write the data to the database and to CSV files (two tables, two CSV files)
# Create the database connection (the orders table and the order detail table live in
# the same database, so one connection is enough)
target_db = mysql_util.get_mysql_util(
    user=cfg.target_user,
    password=cfg.target_password,
    database=cfg.target_retail_db
)
# Make sure the orders table exists
target_db.check_table_exists_create(
    cfg.target_retail_db,
    cfg.target_orders_table_name,
    cfg.target_orders_table_create_cols
)
# Make sure the order detail table exists
target_db.check_table_exists_create(
    cfg.target_retail_db,
    cfg.target_orders_detail_table_name,
    cfg.target_orders_detail_table_create_cols
)
# Open the CSV output files (orders and order details)
order_csv_file = open(cfg.retail_output_csv_root_path + cfg.retail_orders_output_csv_file_name, 'a', encoding='utf8')
order_detail_csv_file = open(cfg.retail_output_csv_root_path + cfg.retail_orders_detail_output_csv_file_name, 'a', encoding='utf8')
# Write the header lines to the CSV files
order_csv_file.write(order_model.OrdersModel.get_csv_header())
order_detail_csv_file.write(order_model.OrdersDetailModel.get_csv_header())

# TODO improvements: 1) transactions combined with exception handling
#   2) record processed files / line counts in the metadata database 3) logging
# Transactions: commit once a whole file has been written without errors, so the
#   transaction is opened right before each file is processed
# Metadata: only file_name and process_lines are written; the auto-increment id and
#   the timestamp are generated by the database
# Logging: logs can be used to measure the total processing time
# Write the JSON data to the database and the CSV files
for file_name in need_process_list:
    # Number of valid lines written from this file (reset per file, since the
    # metadata table records one line count per file)
    line_count = 0
    try:
        # Open a transaction for this file
        target_db.begin_transaction()
        with open(file_name, 'r', encoding='utf8') as json_file:
            for line in json_file:
                # line is one JSON string; the model classes parse it
                original_model = order_model.RetailOriginJsonModel(line)
                if original_model.get_order_model().receivable <= 10000:
                    # Only valid records count toward the processed line total
                    line_count += 1
                    # Insert into the orders table and the order detail table
                    # (generate the SQL before to_csv(), which normalizes fields in place)
                    target_db.insert_single_sql(original_model.get_order_model().generate_insert_sql())
                    target_db.insert_single_sql(original_model.get_order_detail_model().generate_insert_sql())
                    # Write to the CSV files
                    order_csv_file.write(original_model.get_order_model().to_csv())
                    order_detail_csv_file.write(original_model.get_order_detail_model().to_csv())
    except Exception as e:
        # Roll back this file's transaction; note that CSV lines already written
        # for this file are not undone by the rollback
        target_db.rollback_transaction()
        print(f"Failed to process {file_name}, transaction rolled back: {e}")
    else:
        # Commit only after the whole file was written without errors
        target_db.commit_transaction()
        order_csv_file.flush()
        order_detail_csv_file.flush()
        # Record the processed file in the metadata table
        meta_db.insert_single_sql(f"insert into {cfg.json_monitor_meta_table_name} (file_name, process_lines) "
                                  f"values ('{file_name}', {line_count})")

# Release resources
target_db.close()
meta_db.close()
order_csv_file.close()
order_detail_csv_file.close()
```
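When the output CSV files are opened in append mode (`'a'`) and a header is written on every run, each incremental run leaves an extra header line in the middle of the file. One way to avoid that, sketched with a hypothetical helper `open_csv_for_append`, is to write the header only when the file is new or empty:

```python
import os


def open_csv_for_append(path, header_line):
    """Open path for appending; write header_line only if the file is new or empty.

    header_line is expected to end with a newline, like the get_csv_header() methods return.
    """
    needs_header = not os.path.exists(path) or os.path.getsize(path) == 0
    f = open(path, "a", encoding="utf8")
    if needs_header:
        f.write(header_line)
    return f
```

With such a helper, a call like `open_csv_for_append(path, order_model.OrdersModel.get_csv_header())` would replace the `open(...)` plus `write(header)` pair; the check happens before opening because `open(..., "a")` creates an empty file.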