商品数据采集主要分为三个部分: 1, 获取上次记录的最大时间值(从元数据库的表中获取) . 2, 根据上一次采集商品数据中updateAt的最大值查询数据源库商品表,获取继上一次采集之后,新增和更新的商品数据 .3, 针对新增和更新的商品数据,进行数据采集(ETL->mysql->csv)
需要优化的点: 1,使用事务,事务的使用需要结合try, except, else来使用.同时注意执行事务操作不需要调用工具类中的execute函数,因为函数会自动commit, 所以需要自定义without_commit,而且如果设置每执行1000条sql就提交一次事务,而在for,else中使用事务再次提交一次 2,使用日志工具(每执行1000句sql就记录一下,同时提交事务.日志的设置比较随便,仁者见仁智者见智. 3,将读取到的数据记录写入元数据表中,注意元数据表中的字段,一个是记录本次采集的最大时间(updatetime),以及采集到了哪一行,方便后续的采集动作. 4,使用time工具计算执行所需的时间,strat_time = time.time()
模型文件
使用一个元组类型的数据能够快速获得sql插入语句,csv文件插入语句,以及生成csv表头.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 """ 条码商品信息模型类 """ from EmploymentClass.PythonETL.util import str_utilfrom EmploymentClass.PythonETL.config import project_config_file as confclass BarcodeModel (object ): """条码商品信息模型类""" def __init__ (self, data_tuple: tuple ): """条码商品模型对象初始化""" self.code = data_tuple[0 ] self.name = str_util.clear_str(data_tuple[1 ]) self.spec = str_util.clear_str(data_tuple[2 ]) self.trademark = str_util.clear_str(data_tuple[3 ]) self.addr = str_util.clear_str(data_tuple[4 ]) self.units = str_util.clear_str(data_tuple[5 ]) self.factory_name = str_util.clear_str(data_tuple[6 ]) self.trade_price = data_tuple[7 ] self.retail_price = data_tuple[8 ] self.update_at = str (data_tuple[9 ]) self.wholeunit = data_tuple[10 ] self.wholenum = data_tuple[11 ] self.img = data_tuple[12 ] self.src = data_tuple[13 ] def generate_insert_sql (self ): """生成SQL的插入语句""" sql = f"REPLACE INTO {conf.target_barcode_table_name} (" \ f"code,name,spec,trademark,addr,units,factory_name,trade_price," \ f"retail_price,update_at,wholeunit,wholenum,img,src) VALUES(" \ f"'{self.code} ', " \ f"{str_util.check_str_null_and_transform_to_sql_null(self.name)} , " \ f"{str_util.check_str_null_and_transform_to_sql_null(self.spec)} , " \ f"{str_util.check_str_null_and_transform_to_sql_null(self.trademark)} , " \ f"{str_util.check_str_null_and_transform_to_sql_null(self.addr)} , " \ f"{str_util.check_str_null_and_transform_to_sql_null(self.units)} , " \ f"{str_util.check_str_null_and_transform_to_sql_null(self.factory_name)} , " \ f"{str_util.check_number_null_and_transform_to_sql_null(self.trade_price)} , " \ f"{str_util.check_number_null_and_transform_to_sql_null(self.retail_price)} , " \ f"{str_util.check_str_null_and_transform_to_sql_null(self.update_at)} , " \ f"{str_util.check_str_null_and_transform_to_sql_null(self.wholeunit)} , " \ f"{str_util.check_number_null_and_transform_to_sql_null(self.wholenum)} , " \ f"{str_util.check_str_null_and_transform_to_sql_null(self.img)} , " \ f"{str_util.check_str_null_and_transform_to_sql_null(self.src)} " \ f")" return sql @staticmethod def get_csv_header (sep=',' ): """ 生成 csv 数据的标头内容 """ return f"code{sep} " \ f"name{sep} " \ f"spec{sep} " \ f"trademark{sep} " \ f"addr{sep} " \ f"units{sep} " \ f"factory_name{sep} " \ f"trade_price{sep} " \ f"retail_price{sep} " \ f"update_at{sep} " \ f"wholeunit{sep} " \ f"wholenum{sep} " \ f"img{sep} " \ f"src\n" def to_csv (self, sep=',' ): """生成csv数据行""" csv_line = \ f"{self.code} {sep} " \ f"{self.name} {sep} " \ f"{self.spec} {sep} " \ f"{self.trademark} {sep} " \ f"{self.addr} {sep} " \ f"{self.units} {sep} " \ f"{self.factory_name} {sep} " \ f"{self.trade_price} {sep} " \ f"{self.retail_price} {sep} " \ f"{self.update_at} {sep} " \ f"{self.wholeunit} {sep} " \ f"{self.wholenum} {sep} " \ f"{self.img} {sep} " \ f"{self.src} \n" return csv_line
条码采集文件(带注释
跟上一个处理日志文件的思路差不多
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 from EmploymentClass.PythonETL.util import mysql_utilfrom EmploymentClass.PythonETL.config import project_config_file as cfgfrom EmploymentClass.PythonETL.model.barcode_model import BarcodeModelfrom EmploymentClass.PythonETL.util import logging_utilimport time""" 需要优化的点: 1,使用事务 2,使用日志工具(每执行1000句sql就记录一下,同时提交事务 3,将读取到的数据记录写入元数据表中 4,使用time工具计算执行所需的时间 """ logger = logging_util.init_logger() meat_db = mysql_util.get_mysql_util( user=cfg.metadata_user, password=cfg.metadata_password, database=cfg.metadata_db ) meat_db.check_table_exists_create( cfg.metadata_db, cfg.metadata_barcode_table_name, cfg.metadata_barcode_table_create_cols ) max_tuple_date = meat_db.query(f"select max(time_record) from {cfg.metadata_barcode_table_name} ;" ) if max_tuple_date[0 ][0 ] is None : max_date = None else : max_date = max_tuple_date[0 ][0 ] logger.info(f"上次采集的时间是: {max_date} " ) source_db = mysql_util.get_mysql_util( user=cfg.source_user, password=cfg.source_password, database=cfg.source_data_db ) if not source_db.check_table_exists(cfg.source_data_db,cfg.target_barcode_table_name): exit("后台商品不存在,请找后台开发人员!" ) if max_date is None : select_sql = f"select * from {cfg.target_barcode_table_name} " else : select_sql = f"select * from {cfg.target_barcode_table_name} where updateAt > '{max_date} ';" result_data = source_db.query(select_sql) print (f"要采集的数据条数为: {len (result_data)} " )if len (result_data) == 0 : exit("没有要处理的数据,退出采集!" ) target_db = mysql_util.get_mysql_util( user=cfg.target_user, password=cfg.target_password, database=cfg.target_retail_db ) target_db.check_table_exists_create( cfg.target_retail_db, cfg.target_barcode_table_name, cfg.target_barcode_table_create_cols ) csv_file = open (cfg.barcode_output_csv_root_path+cfg.barcode_orders_output_csv_file_name, "a" , encoding="utf8" ) csv_file.write(BarcodeModel.get_csv_header()) line_count = 0 start_time = time.time() target_db.begin_transaction() for tuple_data in result_data: try : line_count += 1 bm_model = BarcodeModel(tuple_data) target_db.insert_single_sql_without_commit(bm_model.generate_insert_sql()) csv_file.write(bm_model.to_csv()) except Exception as e: logger.error(f"插入数据出错:{e} ,回滚事务!" ) target_db.rollback_transaction() raise e else : if line_count % 1000 == 0 : target_db.commit_transaction() target_db.begin_transaction() csv_file.flush() else : end_time = time.time() target_db.commit_transaction() query_result = target_db.query(f"select max(update_at) from {cfg.target_barcode_table_name} " ) meat_db.insert_single_sql(f"insert into table {cfg.metadata_barcode_table_name} (time_record,gather_line_count)" f"values ({query_result[0 ][0 ]} ,{line_count} );" ) csv_file.close() target_db.close() meat_db.close()
END