日志文件处理

这个主题主要工作大体上分为两个部分,第一部分是从元数据库中获取已经处理的文件,然后将文件与全部文件(列表)进行对比,最后获取需要处理的文件列表.第二个部分是将日志文件输出到数据库中,并且对文件进行备份操作(转为csv文件)

Experience

  • 首先由于python解释器以及pycharm是在虚拟机中运行的,导致我的文件路径python一直读取不了,所以日志文件目录也是获取不了的.(后来发现是系统的权限的原因,权限我改不了,直接换路径)
  • 需要有列表嵌套对象的思想,我觉得这是ETL中最重要的,学生管理系统教会了我列表中嵌套字典,这也是一种收获吧!
  • 要通过事务来写入数据到mysql中,遇到报错要回滚,没有报错再提交,还有读取一个文件需要flush一次,以及提交一次到数据库中吧

PART1

  • 从元数据库中获取已经处理的文件
    • 首先创建一个元数据库的连接
    • 调用mysql_util快速获取一个对象
    • 查看元数据库是否存在,不存在则创建(mysql_util中的方法)
    • 查询元数据库中的元数据表(得到元组嵌套元组)
      • 因为file_util工具对比的是两个列表需要将元组转换为列表
      • 可以在主业务逻辑中处理
      • 也可以在file_util文件工具中处理
  • 递归(非递归)遍历需要处理的文件/文件夹
    • 遍历得到所有文件的逻辑在上篇的文件工具中说了
  • 将全部文件和已经处理过的文件进行比对得到需要处理的文件
    • 这里同上,上篇文章文件工具讲了,不同的是比较逻辑,需要将元组转为列表(很好转的
1
2
3
4
5
6
7
8
9
10
def get_new_by_compare_lists(all_list, processed_list_tuple):
processed_list = []
for i in processed_list_tuple:
processed_list.append(i[1])

need_process_list = []
for i in all_list:
if i not in processed_list:
need_process_list.append(i)
return need_process_list

PART2

将日志文件输出到数据库中,并且对文件进行备份操作(转为csv文件)

  • 创建目标文件(csv_file = open(文件路径,打开方式,编码格式)
  • 向csv文件中写入表头使用类名.方法名方法写入: csv_file.write(BackendLogsModel.generate_csv_header())
  • 创建目标数据连接(使用mysql_util的快速获取一个连接通道)
  • 检查数据表是否存在,不存在则创建(使用mysql_util中的方法)
  • 遍历要处理的日志文件,每次读取一个文件的一行数据(字符串),并且将数据传入数据模型中,使用数据模型创建一个数据对象,根据对象名.generate_insert_sql()方法生成一条数据库插入语句
  • 最后再调用mysql_util工具类中的方法执行刚刚生成的sql语句

配置文件详解

日志文件相关配置

因为日志文件要设置日志处理器(文件处理器)需要配置日志输出路径,而且会根据时间自动生成不同的日志文件名称

1
2
3
4
5
import time
# 获取日志文件路径
log_root_path = r'C:\Users\Stray\PycharmProjects\Project\EmploymentClass\PythonETL\log'
# 生成随机文件名称(以天/小时为单位
log_name = f"pyetl_{time.strftime('%Y-%m-%d', time.localtime())}.log"

数据库文件相关配置

因为需要将日志数据写入到数据库中,以及要记录写过的文件(保存为元数据信息)所以需要两个数据库,一个写出数据的库,一个元数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# ################## --元数据库metadata连接信息- ###################
# 元数据库配置
metadata_host = 'localhost'
metadata_port = 3306
metadata_user = 'root'
metadata_password = '123456'
metadata_db = 'metadata'

############ . 数仓(MySQL)连接信息 ###########
target_host = '127.0.0.1'
target_port = 3306
target_user = 'root'
target_password = '123456'
target_db = 'logs' # 必须要提前在数据库中创建一个logs数据库

# todo ################## --后台日志数据采集配置项-- ###################
# 采集后台日志数据,元数据表配置项
logs_monitor_meta_table_name = "backend_logs_monitor"
logs_monitor_meta_table_create_cols = \
"id INT PRIMARY KEY AUTO_INCREMENT, " \
"file_name VARCHAR(255) NOT NULL COMMENT '处理文件名称', " \
"process_lines INT NULL COMMENT '文件处理行数', " \
"process_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '文件处理时间'"

# ################## --日志业务数据采集配置项-- ###################
# 后台日志表名称
target_logs_table_name = "backend_logs"
target_logs_table_create_cols = \
f"id int PRIMARY KEY AUTO_INCREMENT COMMENT '自增ID', " \
f"log_time TIMESTAMP(6) COMMENT '日志时间,精确到6位毫秒值', " \
f"log_level VARCHAR(10) COMMENT '日志级别', " \
f"log_module VARCHAR(50) COMMENT '输出日志的功能模块名', " \
f"response_time INT COMMENT '接口响应时间毫秒', " \
f"province VARCHAR(30) COMMENT '访问者省份', " \
f"city VARCHAR(30) COMMENT '访问者城市', " \
f"log_text VARCHAR(255) COMMENT '日志正文', " \
f"INDEX(log_time)"

CSV文件相关配置

需要将数据放到csv文件中备份,所以需要写出路径,以及读取文件的存放路径

1
2
3
4
5
6
7
# 待采集 日志 文件所在的目录
backend_logs_data_root_path = 'C:/Users/Stray/PycharmProjects/Project/EmploymentClass/logs/'
# 后台日志数据写出 csv 的根路径
logs_output_csv_root_path = "C:/Users/Stray/PycharmProjects/Project/EmploymentClass/csv/"
# 每一次运行,后台日志文件写出路径
logs_output_csv_file_name = \
f"logs-{time.strftime('%Y-%m-%d-%H_%M', time.localtime())}.csv"

代码Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from EmploymentClass.PythonETL.util import file_util
from EmploymentClass.PythonETL.util import logging_util
from EmploymentClass.PythonETL.config import project_config_file as cfg
from EmploymentClass.PythonETL.util import mysql_util
from EmploymentClass.PythonETL.model.backend_logs_model import BackendLogsModel

# 创建日志对象
logger = logging_util.init_logger("backend_logs")

# todo: 获取日志目录中一共有哪些文件(总共的文件-给的是日志文件的路径
all_file_list = file_util.get_dir_file_list_without_recursion(path=cfg.backend_logs_data_root_path)
logger.info(f"全部日志文件: {all_file_list}")

# 连接元数据库(这里通过快速创建对象的方法创建
meta_db = mysql_util.get_mysql_util(
user=cfg.metadata_user,
password=cfg.metadata_password,
database=cfg.metadata_db
)

# 测试数据证明数据库连接成功
# print(meta_db.query('show databases;'))
# todo: 查询元数据库中日志监控记录表(看看有哪些文件是被采集过的)
# 首先查看数据库中的元数据表是否存在,不存在则创建
meta_db.check_table_exists_create(
cfg.metadata_db,
cfg.logs_monitor_meta_table_name,
cfg.logs_monitor_meta_table_create_cols
)
# 查询元数据库中的信息(data是元组嵌套元组的形式)(获取已经被处理的文件
data = meta_db.query(f"select * from {cfg.logs_monitor_meta_table_name};")
logger.info(f"已经处理的文件:{data}")

# todo: 对比要采集的日志文件(处理过的文件是元组形式
need_process_list = file_util.get_new_by_compare_lists(all_file_list, data)
logger.info(f"需要处理的文件为: {need_process_list}")

# todo: 针对需要处理的文件,进行数据采集(将数据放到csv文件中, 放到数据库中)
# 创建目标csv文件
csv_file = open(cfg.logs_output_csv_root_path + cfg.logs_output_csv_file_name, 'a', encoding='utf8')

# 向csv文件中写入表头
csv_file.write(BackendLogsModel.generate_csv_header())

# 创建目标数据连接
target_db = mysql_util.get_mysql_util(
user=cfg.target_user,
password=cfg.target_password,
database=cfg.target_db
)

# 检查目标数据库是否存在, 不存在则创建
target_db.check_table_exists_create(
cfg.target_db,
cfg.target_logs_table_name,
cfg.target_logs_table_create_cols
)
"""
遍历要采集的日志文件
遍历每一行数据,生成模型对象
写入到数据库中
写入到csv文件中
"""
# 遍历要处理的文件列表(i为某个需要处理的文件
for file in need_process_list:
line_count = 0
try:
# 打开事务
target_db.begin_transaction()
# lines为文件的一行数据
for lines in open(file, 'r', encoding='utf8'):
# 创建模型对象
line_model = BackendLogsModel(lines)
# print(line_model.generate_insert_sql())
# print(line_model.gender_csv_str())
target_db.insert_single_sql_without_commit(line_model.generate_insert_sql())
csv_file.write(line_model.generate_csv_str())
line_count += 1
# 发生了异常
except Exception as e:
# 回滚事务
target_db.rollback_transaction()
raise e
# 没有异常发生
else:
csv_file.flush()
target_db.commit_transaction()
# 记录元数据(已经处理的文件名,每处理一个文件就要记录一个
meta_db.execute(f"insert into {cfg.logs_monitor_meta_table_name}"
f"(file_name,process_lines) values ('{file}',{line_count})")
logger.info(f"文件{file}写入成功,内容条数是{line_count}")

# 释放资源
csv_file.close()
target_db.close()
meta_db.close()

日志数据模型

构建日志模型的作用是传递进一个字符串会快速生成一个模型对象,通过对象名.方法名快速获取一个插入的sql以及生成csv文件的表头和内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
"""
构建日志数据模型类
"""
import EmploymentClass.PythonETL.config.project_config_file as conf


class BackendLogsModel(object):
"""创建一个模型类"""

def __init__(self, log_data: str):
"""初始化方法"""
# 1. 将log_data数据拆分为列表,分隔符是\t
data = log_data.split('\t')
# 2. 将列表中的每一个元素赋值给相应的属性
self.log_time = data[0]
self.log_level = data[1].strip('[]')
self.log_module = data[2]
self.response_time = data[3][5:-2]
self.province = data[4]
self.city = data[5]
self.log_text = data[6].replace('\n','')

def generate_insert_sql(self):
"""生成插入数据的sql语句"""
return f'insert into {conf.target_logs_table_name}(' \
f'log_time, log_level, log_module, response_time, province, city, log_text)' \
f' values("{self.log_time}",' \
f'"{self.log_level}",' \
f'"{self.log_module}",' \
f'"{self.response_time}",' \
f'"{self.province}",' \
f'"{self.city}",' \
f'"{self.log_text}");'


@staticmethod
def generate_csv_header(sep=','):
"""生成csv文件的标头"""
return f'log_time{sep}' \
f'log_level{sep}' \
f'log_module{sep}' \
f'response_time{sep}' \
f'province{sep}' \
f'city{sep}' \
f'log_text\n'

# 生成csv文件的行内容
def generate_csv_str(self, sep=','):
return f'{self.log_time}{sep}' \
f'{self.log_level}{sep}' \
f'{self.log_module}{sep}' \
f'{self.response_time}{sep}' \
f'{self.province}{sep}' \
f'{self.city}{sep}' \
f'{self.log_text}\n'