LoggingModule
日志模块
作用: 方便后续在 ETL 程序中记录日志
目标: 当我们在项目的其他位置使用logging模块进行日志记录时,不需要进行配置或者只需要进行简单的配置即可使用.
为了更方便的使用logging,我们在日志模块中创建一个日志类Logging,专门管理日志器对象
重点: 在创建日志类时可以同时传入日志级别,方便日志级别控制
创建init_logger函数快速创建日志器对象,并完成日志处理器和日志格式的绑定.
首先回顾一下日志logger的食用方式.
创建日志对象logger = logging.getLogger()
创建文件处理器FileHandler(流处理器StreamHandler) handler = logging.FileHandler()
创建日志格式对象fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
给处理器设置格式handler.setFormatter(fat)
将处理器绑定到日志对象上logger.addHandler(handler)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import loggingimport EmploymentClass.PythonETL.config.project_config_file as cfgimport osclass LoggingUtil (): def __init__ (self, name=None , level=logging.INFO ): self.logger = logging.getLogger(name) self.logger.setLevel(level) def init_logger (name=None ): logger = LoggingUtil(name).logger file_handler = logging.FileHandler( filename=os.path.join(cfg.log_root_path, cfg.log_name), mode='a' , encoding='utf8' ) fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s %(filename)s %(lineno)d ' ) file_handler.setFormatter(fmt) logger.addHandler(file_handler) return logger if __name__ == '__main__' : logger = init_logger('小明' ) logger.info('测试logger工具' )
FileModule
文件模块
log日志文件和json订单文件都是以文件的形式保存在计算机的指定目录下, 如果我们想要处理这些数据
就需要将所有的文件信息提取出来
比对已经处理过的文件,筛选出未处理的文件
这两个需求,都需要使用到os模块,所以我们就使用os模块封装一个file_util模块来方便我们调用相关功能.
这里就是两个功能:
这里说一下非递归获取文件列表,需要使用到os模块下的walk函数, 通过for root, dirs, files in os.walk()
来多层次遍历, 然后dirs,files都是列表类型的数据, dirs是不需要你处理的(除非你想获取某个路径下的所有文件夹), 通过遍历files将files中的元素append到一个所有文件列表中all_file_list,来实现获取某个目录下的文件列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 """ (递归/非递归)获取指定目录下的所有文件 """ import osdef get_dir_file_list (path='./' , recursion=False ): all_file_list = [] file_dir_list = os.listdir(path) for file_name in file_dir_list: abs_path = os.path.join(path, file_name) if os.path.isfile(abs_path): all_file_list.append(file_name) else : if recursion: sub_path_list = get_dir_file_list(abs_path, True ) all_file_list += sub_path_list return all_file_list def get_dir_file_list_without_recursion (path='./' ): all_file_list = [] for root, dirs, files in os.walk(path): for file in files: path = os.path.join(root, file) name = path.replace("\\" , '/' ) i = os.path.basename(name) all_file_list.append(i) return all_file_list def get_new_by_compare_lists (all_list, processed_list ): need_process_list = [] for i in all_list: if i not in processed_list: need_process_list.append(i) return need_process_list if __name__ == '__main__' : path = r'C:\Users\Stray\PycharmProjects\Project\EmploymentClass\PythonETL\learn' print (get_dir_file_list(path, True )) print (get_new_by_compare_lists([1 , 2 , 3 , 4 , 5 ], [1 , 2 ])) print (get_dir_file_list_without_recursion(path))
MysqlModule
数据库工具类
初始化对象
执行非查询SQL
执行查询SQL
切换数据库conn.select_db()
开启事务conn.begin()
提交事务conn.commit()
回滚事务conn.rollback()
判断数据表是否存在
首先调用自身的非事务方法query讲返回值赋给datadata = self.query('show tables;')
判断数据表是否存在if (tb_name,) in data: return True
检查表是否存在,不存在则创建
上面的逻辑已经实现了判断数据库是否存在,然后没有直接创建就行了
tb_cols已经在上面通过配置文件导入过了,sql = f”create table {tb_name} ({tb_cols});”
1 2 3 4 5 file_monitor_meta_table_create_cols = \ "id INT PRIMARY KEY AUTO_INCREMENT, " \ "file_name VARCHAR(255) NOT NULL COMMENT '处理文件名称', " \ "process_lines INT NULL COMMENT '文件处理行数', " \ "process_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '文件处理时间'"
执行一条插入SQL
关闭数据库连接
创建数据库工具对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 import pymysqlfrom EmploymentClass.PythonETL.util.logging_util import *logger = init_logger() class MysqlUtil (): def __init__ (self, host, port, user, password, database=None ,charset='utf8' ): self.conn = pymysql.connect( host=host, port=port, user=user, password=password, charset=charset, database=database ) logger.info("数据库连接成功......" ) def switch_db (self, db_name ): self.conn.select_db(db_name) logger.info(f"数据库{db_name} 切换成功!" ) def execute (self, sql ): cursor = self.conn.cursor() row_nums = cursor.execute(sql) if row_nums > 0 : print (f"执行{sql} 语句成功" ) self.conn.commit() logger.info(f"执行{sql} 语句成功" ) else : print (f"执行{sql} 语句失败" ) cursor.close() def query (self, sql ): cursor = self.conn.cursor() cursor.execute(sql) data = cursor.fetchall() cursor.close() logger.info(f"执行非事务型sql:{sql} 成功" ) return data def begin_transaction (self ): if self.conn.get_autocommit(): self.conn.autocommit(False ) self.conn.begin() logger.info("事务开启成功!" ) def rollback_transaction (self ): self.conn.rollback() logger.info("事务回滚成功!" ) def commit_transaction (self ): self.conn.commit() logger.info("事务提交成功" ) def check_table_exists (self, db_name, tb_name ): self.switch_db(db_name) result = self.query('show tables;' ) if (tb_name,) in result: return True else : return False def check_table_exists_create (self, db_name, tb_name, tb_cols ): sql = f"create table {tb_name} ({tb_cols} );" if not self.check_table_exists(db_name, tb_name): self.execute(sql) logger.info(f"在数据库{db_name} 中不存在数据表:{tb_name} 创建成功!" ) else : logger.info(f"在数据库{db_name} 中存在数据表" ) def query_all (self, db_name, tb_name, limit=None ): self.switch_db(db_name) sql = f"select * from {tb_name} " if limit: sql += f"limit {limit} " res = self.query(sql) return res def insert_single_sql (self, sql ): try : self.execute(sql) except Exception as e: logger.error(f"执行一条sql: {sql} 发生了{e} 异常" ) else : logger.info(f"执行一条sql: {sql} 成功!" ) def close (self ): if self.conn: self.conn.close() logger.info("数据库连接成功关闭!" ) def get_processed_files (self, util: MysqlUtil, db_name, tb_name, tb_cols ): util.switch_db(db_name) util.check_table_exists_create(db_name, tb_name, tb_cols) data = util.query(f"select * from {tb_name} " ) processed_list = [] if data is not None : for tuple_data in data: processed_list.append(tuple_data[1 ]) return processed_list def get_mysql_util (user, password, host='127.0.0.1' , port=3306 ,database=None , charset='utf8' ): return MysqlUtil(host, port, user, password, database ,charset) if __name__ == '__main__' : mysql_util = MysqlUtil('localhost' , 3306 , 'root' , '123456' , 'test' , 'utf8' )
StringModule 文件工具
主要实现了字符串/数字判空,以及数据转换的操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 """ 字符串工具方法 """ def check_null (data: str ): if not data: return True data = data.lower() if data in ('null' , 'none' , 'undefined' ): return True else : return False def check_null_and_trans (data ): if check_null(str (data)): return "" elif isinstance (data, str ): return data.strip() else : return data def check_null_trans_to_sql_null (data ): if check_null(data): return 'null' else : return f"'{data} '" def check_num_null_trans_to_sql_null (data ): if check_null(data): return 'null' else : return data def clear_str (data: str ): if check_null(data): return data data = data.replace("'" , "" ) data = data.replace("\"" , "" ) data = data.replace(";" , "" ) data = data.replace("," , "" ) data = data.replace("@" , "" ) data = data.replace("\\" , "" ) return data
TimeModule 时间工具
主要实现了将13位日期(毫秒为单位)转换为10位日期,以及将时间戳转换指定日期格式的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import timedef ts13_to_ts10 (ts ): return ts // 1000 def ts10_to_str (ts, format = '%Y-%m-%d %H:%M:%S' ): time_array = time.localtime(ts) return time.strftime(format , time_array) def ts13_to_str (ts, format = '%Y-%m-%d %H:%M:%S' ): ts = ts13_to_ts10(ts) return ts10_to_str(ts)
Extension 数据库事务的四大特性 ACID: 原子性Atomicity, 一致性Consistency, 隔离性Isolation, 持久性Durability
Atomicity:原子性,把要执行的SQL语句当做一个整体,要么全部成功,要么全部失败。不能在划分
Consistency:一致性,数据一旦写入,全局查询到的结果都是一致的
银行(分行) => 银行卡有10000,取2000(任何一个分行),所有分行都是完全一致。
Isolation:隔离性,事务处理之间都是相互独立的,不会互相影响
Durability:持久性,事务一旦提交,就会永久生效。
MySQL中,事务处理如何实现的 第一步:开启事务
1 mysql> start transaction;
第二步:事务操作(增删改)
1 2 mysql> update mysql> update
第三步:做一个简单的判断,判断以上语句是否执行成功与否
二选一:
1 2 3 4 5 6 成功,则提交事务到磁盘,永久生效 mysql> commit; 失败,则回滚事务,不能把SQL执行写入到磁盘,滚回到没有更新之前状态 mysql> rollback;
注意:如果使用事务处理,则创建数据表时,必须明确指定数据表的引擎格式为InnoDB格式。因为MyISAM这个格式不支持事务处理。