日志文件处理
这个主题主要工作大体上分为两个部分,第一部分是从元数据库中获取已经处理的文件,然后将文件与全部文件(列表)进行对比,最后获取需要处理的文件列表.第二个部分是将日志文件输出到数据库中,并且对文件进行备份操作(转为csv文件)
Experience
首先由于python解释器以及pycharm是在虚拟机中运行的,导致我的文件路径python一直读取不了,所以日志文件目录也是获取不了的.(后来发现是系统的权限的原因,权限我改不了,直接换路径)
需要有列表嵌套对象的思想,我觉得这是ETL中最重要的,学生管理系统教会了我列表中嵌套字典,这也是一种收获吧!
要通过事务来写入数据到mysql中,遇到报错要回滚,没有报错再提交,还有读取一个文件需要flush一次,以及提交一次到数据库中吧
PART1
从元数据库中获取已经处理的文件
首先创建一个元数据库的连接
调用mysql_util快速获取一个对象
查看元数据库是否存在,不存在则创建(mysql_util中的方法)
查询元数据库中的元数据表(得到元组嵌套元组)
因为file_util工具对比的是两个列表需要将元组转换为列表
可以在主业务逻辑中处理
也可以在file_util文件工具中处理
递归(非递归)遍历需要处理的文件/文件夹
将全部文件和已经处理过的文件进行比对得到需要处理的文件
这里同上,上篇文章文件工具讲了,不同的是比较逻辑,需要将元组转为列表(很好转的
1 2 3 4 5 6 7 8 9 10 def get_new_by_compare_lists (all_list, processed_list_tuple ): processed_list = [] for i in processed_list_tuple: processed_list.append(i[1 ]) need_process_list = [] for i in all_list: if i not in processed_list: need_process_list.append(i) return need_process_list
PART2 将日志文件输出到数据库中,并且对文件进行备份操作(转为csv文件)
创建目标文件(csv_file = open(文件路径,打开方式,编码格式)
向csv文件中写入表头使用类名.方法名方法写入: csv_file.write(BackendLogsModel.generate_csv_header())
创建目标数据连接(使用mysql_util的快速获取一个连接通道)
检查数据表是否存在,不存在则创建(使用mysql_util中的方法)
遍历要处理的日志文件,每次读取一个文件的一行数据(字符串),并且将数据传入数据模型中,使用数据模型创建一个数据对象,根据对象名.generate_insert_sql()方法生成一条数据库插入语句
最后再调用mysql_util工具类中的方法执行刚刚生成的sql语句
配置文件详解 日志文件相关配置
因为日志文件要设置日志处理器(文件处理器)需要配置日志输出路径,而且会根据时间自动生成不同的日志文件名称
1 2 3 4 5 import timelog_root_path = r'C:\Users\Stray\PycharmProjects\Project\EmploymentClass\PythonETL\log' log_name = f"pyetl_{time.strftime('%Y-%m-%d' , time.localtime())} .log"
数据库文件相关配置
因为需要将日志数据写入到数据库中,以及要记录写过的文件(保存为元数据信息)所以需要两个数据库,一个写出数据的库,一个元数据库
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 metadata_host = 'localhost' metadata_port = 3306 metadata_user = 'root' metadata_password = '123456' metadata_db = 'metadata' target_host = '127.0.0.1' target_port = 3306 target_user = 'root' target_password = '123456' target_db = 'logs' logs_monitor_meta_table_name = "backend_logs_monitor" logs_monitor_meta_table_create_cols = \ "id INT PRIMARY KEY AUTO_INCREMENT, " \ "file_name VARCHAR(255) NOT NULL COMMENT '处理文件名称', " \ "process_lines INT NULL COMMENT '文件处理行数', " \ "process_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '文件处理时间'" target_logs_table_name = "backend_logs" target_logs_table_create_cols = \ f"id int PRIMARY KEY AUTO_INCREMENT COMMENT '自增ID', " \ f"log_time TIMESTAMP(6) COMMENT '日志时间,精确到6位毫秒值', " \ f"log_level VARCHAR(10) COMMENT '日志级别', " \ f"log_module VARCHAR(50) COMMENT '输出日志的功能模块名', " \ f"response_time INT COMMENT '接口响应时间毫秒', " \ f"province VARCHAR(30) COMMENT '访问者省份', " \ f"city VARCHAR(30) COMMENT '访问者城市', " \ f"log_text VARCHAR(255) COMMENT '日志正文', " \ f"INDEX(log_time)"
CSV文件相关配置
需要将数据放到csv文件中备份,所以需要写出路径,以及读取文件的存放路径
1 2 3 4 5 6 7 backend_logs_data_root_path = 'C:/Users/Stray/PycharmProjects/Project/EmploymentClass/logs/' logs_output_csv_root_path = "C:/Users/Stray/PycharmProjects/Project/EmploymentClass/csv/" logs_output_csv_file_name = \ f"logs-{time.strftime('%Y-%m-%d-%H_%M' , time.localtime())} .csv"
代码Demo 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 from EmploymentClass.PythonETL.util import file_utilfrom EmploymentClass.PythonETL.util import logging_utilfrom EmploymentClass.PythonETL.config import project_config_file as cfgfrom EmploymentClass.PythonETL.util import mysql_utilfrom EmploymentClass.PythonETL.model.backend_logs_model import BackendLogsModellogger = logging_util.init_logger("backend_logs" ) all_file_list = file_util.get_dir_file_list_without_recursion(path=cfg.backend_logs_data_root_path) logger.info(f"全部日志文件: {all_file_list} " ) meta_db = mysql_util.get_mysql_util( user=cfg.metadata_user, password=cfg.metadata_password, database=cfg.metadata_db ) meta_db.check_table_exists_create( cfg.metadata_db, cfg.logs_monitor_meta_table_name, cfg.logs_monitor_meta_table_create_cols ) data = meta_db.query(f"select * from {cfg.logs_monitor_meta_table_name} ;" ) logger.info(f"已经处理的文件:{data} " ) need_process_list = file_util.get_new_by_compare_lists(all_file_list, data) logger.info(f"需要处理的文件为: {need_process_list} " ) csv_file = open (cfg.logs_output_csv_root_path + cfg.logs_output_csv_file_name, 'a' , encoding='utf8' ) csv_file.write(BackendLogsModel.generate_csv_header()) target_db = mysql_util.get_mysql_util( user=cfg.target_user, password=cfg.target_password, database=cfg.target_db ) target_db.check_table_exists_create( cfg.target_db, cfg.target_logs_table_name, cfg.target_logs_table_create_cols ) """ 遍历要采集的日志文件 遍历每一行数据,生成模型对象 写入到数据库中 写入到csv文件中 """ for file in need_process_list: line_count = 0 try : target_db.begin_transaction() for lines in open (file, 'r' , encoding='utf8' ): line_model = BackendLogsModel(lines) target_db.insert_single_sql_without_commit(line_model.generate_insert_sql()) csv_file.write(line_model.generate_csv_str()) line_count += 1 except Exception as e: target_db.rollback_transaction() raise e else : csv_file.flush() target_db.commit_transaction() meta_db.execute(f"insert into {cfg.logs_monitor_meta_table_name} " f"(file_name,process_lines) values ('{file} ',{line_count} )" ) logger.info(f"文件{file} 写入成功,内容条数是{line_count} " ) csv_file.close() target_db.close() meta_db.close()
日志数据模型
构建日志模型的作用是传递进一个字符串会快速生成一个模型对象,通过对象名.方法名快速获取一个插入的sql以及生成csv文件的表头和内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 """ 构建日志数据模型类 """ import EmploymentClass.PythonETL.config.project_config_file as confclass BackendLogsModel (object ): """创建一个模型类""" def __init__ (self, log_data: str ): """初始化方法""" data = log_data.split('\t' ) self.log_time = data[0 ] self.log_level = data[1 ].strip('[]' ) self.log_module = data[2 ] self.response_time = data[3 ][5 :-2 ] self.province = data[4 ] self.city = data[5 ] self.log_text = data[6 ].replace('\n' ,'' ) def generate_insert_sql (self ): """生成插入数据的sql语句""" return f'insert into {conf.target_logs_table_name} (' \ f'log_time, log_level, log_module, response_time, province, city, log_text)' \ f' values("{self.log_time} ",' \ f'"{self.log_level} ",' \ f'"{self.log_module} ",' \ f'"{self.response_time} ",' \ f'"{self.province} ",' \ f'"{self.city} ",' \ f'"{self.log_text} ");' @staticmethod def generate_csv_header (sep=',' ): """生成csv文件的标头""" return f'log_time{sep} ' \ f'log_level{sep} ' \ f'log_module{sep} ' \ f'response_time{sep} ' \ f'province{sep} ' \ f'city{sep} ' \ f'log_text\n' def generate_csv_str (self, sep=',' ): return f'{self.log_time} {sep} ' \ f'{self.log_level} {sep} ' \ f'{self.log_module} {sep} ' \ f'{self.response_time} {sep} ' \ f'{self.province} {sep} ' \ f'{self.city} {sep} ' \ f'{self.log_text} \n'