LoggingModule

 日志模块

作用: 方便后续在 ETL 程序中记录日志

目标: 当我们在项目的其他位置使用logging模块进行日志记录时,不需要进行配置或者只需要进行简单的配置即可使用.

  1. 为了更方便的使用logging,我们在日志模块中创建一个日志类Logging,专门管理日志器对象
  • 重点: 在创建日志类时可以同时传入日志级别,方便日志级别控制
  1. 创建init_logger函数快速创建日志器对象,并完成日志处理器和日志格式的绑定.
  • 重点: 返回值是一个日志器对象

首先回顾一下日志logger的食用方式.

  • 创建日志对象logger = logging.getLogger()
  • 创建文件处理器FileHandler(流处理器StreamHandler) handler = logging.FileHandler()
  • 创建日志格式对象fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
  • 给处理器设置格式handler.setFormatter(fat)
  • 将处理器绑定到日志对象上logger.addHandler(handler)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import logging
import EmploymentClass.PythonETL.config.project_config_file as cfg
import os
class LoggingUtil():
def __init__(self, name=None, level=logging.INFO):
self.logger = logging.getLogger(name)
self.logger.setLevel(level)

def init_logger(name=None):
# 创建对象,并获取对象的logger日志对象
logger = LoggingUtil(name).logger
# 创建文件处理器
file_handler = logging.FileHandler(
filename=os.path.join(cfg.log_root_path, cfg.log_name),
mode='a',
encoding='utf8'
)
# 创建处理器格式
fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s %(filename)s %(lineno)d ')
# 设置处理器格式
file_handler.setFormatter(fmt)
# 将日志处理器绑定到日志对象上
logger.addHandler(file_handler)
# 返回日志对象
return logger
if __name__ == '__main__':
logger = init_logger('小明')
logger.info('测试logger工具')

FileModule

文件模块

log日志文件和json订单文件都是以文件的形式保存在计算机的指定目录下, 如果我们想要处理这些数据

就需要将所有的文件信息提取出来

比对已经处理过的文件,筛选出未处理的文件

这两个需求,都需要使用到os模块,所以我们就使用os模块封装一个file_util模块来方便我们调用相关功能.

这里就是两个功能:

  • 递归/非递归获取某个目录下的所有文件

  • (对比已经处理的文件列表[可以从元数据metadata库中获取],全部文件列表[直接递归(非递归)遍历])获取未处理过的文件

这里说一下非递归获取文件列表,需要使用到os模块下的walk函数, 通过for root, dirs, files in os.walk()来多层次遍历, 然后dirs,files都是列表类型的数据, dirs是不需要你处理的(除非你想获取某个路径下的所有文件夹), 通过遍历files将files中的元素append到一个所有文件列表中all_file_list,来实现获取某个目录下的文件列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
"""
(递归/非递归)获取指定目录下的所有文件
"""
import os

# 递归获取某个文件目录下的所有文件
def get_dir_file_list(path='./', recursion=False):
# 定义列表用于存储所有文件
all_file_list = []
# 获取文件目录下的所有文件(文件夹)listdir返回值为列表(而且为文件名)
file_dir_list = os.listdir(path)
for file_name in file_dir_list:
# 用path和file_name拼接
abs_path = os.path.join(path, file_name)
if os.path.isfile(abs_path):
# 是文件的逻辑
all_file_list.append(file_name)
else:
# 是文件夹的逻辑
if recursion:
sub_path_list = get_dir_file_list(abs_path, True)
all_file_list += sub_path_list
return all_file_list

# 非递归遍历文件夹嵌套文件夹中的文件
def get_dir_file_list_without_recursion(path='./'):
all_file_list = []
for root, dirs, files in os.walk(path):
for file in files:
path = os.path.join(root, file)
name = path.replace("\\", '/')
i = os.path.basename(name)
all_file_list.append(i)
return all_file_list

# 为了方便测试就用了列表
def get_new_by_compare_lists(all_list, processed_list):
need_process_list = []
for i in all_list:
if i not in processed_list:
need_process_list.append(i)
return need_process_list

if __name__ == '__main__':
path = r'C:\Users\Stray\PycharmProjects\Project\EmploymentClass\PythonETL\learn'
print(get_dir_file_list(path, True))
print(get_new_by_compare_lists([1, 2, 3, 4, 5], [1, 2]))
print(get_dir_file_list_without_recursion(path))

MysqlModule

数据库工具类

  • 初始化对象

  • 执行非查询SQL

  • 执行查询SQL

  • 切换数据库conn.select_db()

  • 开启事务conn.begin()

  • 提交事务conn.commit()

  • 回滚事务conn.rollback()

  • 判断数据表是否存在

    • 首先调用自身的非事务方法query讲返回值赋给datadata = self.query('show tables;')
    • 判断数据表是否存在if (tb_name,) in data: return True
  • 检查表是否存在,不存在则创建

    • 上面的逻辑已经实现了判断数据库是否存在,然后没有直接创建就行了
    • tb_cols已经在上面通过配置文件导入过了,sql = f”create table {tb_name} ({tb_cols});”
1
2
3
4
5
file_monitor_meta_table_create_cols = \
"id INT PRIMARY KEY AUTO_INCREMENT, " \
"file_name VARCHAR(255) NOT NULL COMMENT '处理文件名称', " \
"process_lines INT NULL COMMENT '文件处理行数', " \
"process_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '文件处理时间'"
  • 执行一条插入SQL
  • 关闭数据库连接
  • 创建数据库工具对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import pymysql
from EmploymentClass.PythonETL.util.logging_util import *

# 初始化日志工具(下面的init_logger方法会返回一个日志对象
logger = init_logger()


class MysqlUtil():
# todo: 创建连接对象
def __init__(self, host, port, user, password, database=None ,charset='utf8'):
self.conn = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
charset=charset,
database=database
)
logger.info("数据库连接成功......")

# todo: 切换数据库
def switch_db(self, db_name):
# 通过conn的select_db函数切换数据库
self.conn.select_db(db_name)
logger.info(f"数据库{db_name}切换成功!")
# todo: 执行非查询sql
def execute(self, sql):
# 创建游标
cursor = self.conn.cursor()
# 执行sql语句
row_nums = cursor.execute(sql)
if row_nums > 0:
print(f"执行{sql}语句成功")
self.conn.commit()
logger.info(f"执行{sql}语句成功")
else:
print(f"执行{sql}语句失败")
# 关闭游标
cursor.close()
# todo: 执行查询sql
def query(self, sql):
# 获取游标
cursor = self.conn.cursor()
# 执行sql
cursor.execute(sql)
# 返回执行结果
data = cursor.fetchall()
cursor.close()
logger.info(f"执行非事务型sql:{sql}成功")
return data
# todo: 开启事务
def begin_transaction(self):
if self.conn.get_autocommit():
self.conn.autocommit(False)
# 开启事务
self.conn.begin()
# 记录日志
logger.info("事务开启成功!")
# todo: 回滚事务
def rollback_transaction(self):
self.conn.rollback()
logger.info("事务回滚成功!")
# todo: 提交事务
def commit_transaction(self):
self.conn.commit()
logger.info("事务提交成功")
# todo: 检查数据表是否存在
def check_table_exists(self, db_name, tb_name):
# 切换数据库
self.switch_db(db_name)
# 查询数据库中的所有表
result = self.query('show tables;')# 元组套元组的形式
if (tb_name,) in result:
return True
else:
return False
# todo: 检查数据库是否存在, 不存在则创建(这里要给出数据库名,表名, 建表语句)
def check_table_exists_create(self, db_name, tb_name, tb_cols):
sql = f"create table {tb_name} ({tb_cols});"
# 检查表在数据库中是否存在(在的话返回True,不在就False
if not self.check_table_exists(db_name, tb_name):
# 说明不在, 执行非查询sql方法
self.execute(sql)
logger.info(f"在数据库{db_name}中不存在数据表:{tb_name}创建成功!")
else:
logger.info(f"在数据库{db_name}中存在数据表")
# todo: 查询指定表的数据
def query_all(self, db_name, tb_name, limit=None):
# 切换数据库
self.switch_db(db_name)
# 如果直接将limit写到sql中,然后执行可能不太行(因为limit可能为none
sql = f"select * from {tb_name}"
if limit:
sql += f"limit {limit}"
# 执行sql(将结果返回
res = self.query(sql)
return res
# todo: 执行一条插入sql(这个方法看起来不太聪明的样子,明明有了execute方法还要写另一个方法,这是为了记录异常把)
def insert_single_sql(self, sql):
try:
self.execute(sql)
except Exception as e:
logger.error(f"执行一条sql: {sql}发生了{e}异常")
else:
logger.info(f"执行一条sql: {sql}成功!")
# todo: 关闭数据库连接
def close(self):
# 如果对象创建了连接,就关闭
if self.conn:
self.conn.close()
logger.info("数据库连接成功关闭!")
# todo: 从数据库中获取需要处理的文件(1,'D:/etl/json/x00', 1024
def get_processed_files(self, util: MysqlUtil, db_name, tb_name, tb_cols):
# 切换库
util.switch_db(db_name)
# 查看表是否存在
util.check_table_exists_create(db_name, tb_name, tb_cols)
# 查询数据
data = util.query(f"select * from {tb_name}")
# 定义列表存放处理过的文件
processed_list = []
# data 是元组套元组形式, for循环,然后取每个元组中的第二个元素值,下标为1 .1, 'D:/etl/json/x00', 1024,
if data is not None:
for tuple_data in data:
processed_list.append(tuple_data[1])
return processed_list

# 快速获取MySqlUtil对象
def get_mysql_util(user, password, host='127.0.0.1', port=3306,database=None, charset='utf8'):
return MysqlUtil(host, port, user, password, database ,charset)

if __name__ == '__main__':
# 创建数据库连接对象
mysql_util = MysqlUtil('localhost', 3306, 'root', '123456', 'test', 'utf8')

StringModule

文件工具

主要实现了字符串/数字判空,以及数据转换的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
"""
字符串工具方法
"""
# 判断传入的字符串是否为空,True表示空, False表示非空
def check_null(data: str):
if not data:
return True
# 转小写
data = data.lower()
if data in ('null', 'none', 'undefined'):
return True
else:
return False

# 检查字符串, 如果是空字符串,返回空字符串,同时去除字符串两端的空格
def check_null_and_trans(data):
if check_null(str(data)):
return ""
elif isinstance(data, str):
return data.strip()
else:
return data
# 检查字符串是否为空, 如果为空则将其转换为sql语句中的null,None在sql中是不能用的
def check_null_trans_to_sql_null(data):
# check_null 函数True表示空
if check_null(data):
return 'null'
else:
# 这里因为是字符串, insert into table values('str')插入字符串必须用单引号
return f"'{data}'"
# 检查传入的数字是否为空,为空返回null,非空返回本身(因为sql中的数字是不用加引号的
def check_num_null_trans_to_sql_null(data):
if check_null(data):
return 'null'
else:
return data
# 数据转换(处理字符串中异常字符,比如单引号,双引号,逗号,分号)但是具有破坏数据本身的内容
def clear_str(data: str):
if check_null(data):
# 为空,则是没有意义的内容,直接不处理
return data
data = data.replace("'", "")
data = data.replace("\"", "")
data = data.replace(";", "")
data = data.replace(",", "")
data = data.replace("@", "")
data = data.replace("\\", "")
return data

TimeModule

时间工具

主要实现了将13位日期(毫秒为单位)转换为10位日期,以及将时间戳转换指定日期格式的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import time

# 将13位的时间戳规范成10位的时间戳(因为从Json中读取的数据是毫秒为单位的,python中以秒为单位
def ts13_to_ts10(ts):
# 将ts(时间) 除以1000转为以秒为单位的时间格式 //是整除符号
return ts // 1000
# 将10位的时间戳类型转换为指定日期格式的时间
def ts10_to_str(ts, format = '%Y-%m-%d %H:%M:%S'):
# 将指定时间戳日期转换为时间对象
time_array = time.localtime(ts)
return time.strftime(format, time_array)
# 将13位的时间戳转换为指定日期格式的时间
def ts13_to_str(ts, format = '%Y-%m-%d %H:%M:%S'):
ts = ts13_to_ts10(ts)
return ts10_to_str(ts)

Extension

数据库事务的四大特性

ACID: 原子性Atomicity, 一致性Consistency, 隔离性Isolation, 持久性Durability

Atomicity:原子性,把要执行的SQL语句当做一个整体,要么全部成功,要么全部失败。不能在划分

Consistency:一致性,数据一旦写入,全局查询到的结果都是一致的

  • 银行(分行) => 银行卡有10000,取2000(任何一个分行),所有分行都是完全一致。

Isolation:隔离性,事务处理之间都是相互独立的,不会互相影响

Durability:持久性,事务一旦提交,就会永久生效。

MySQL中,事务处理如何实现的

第一步:开启事务

1
mysql> start transaction;

第二步:事务操作(增删改)

1
2
mysql> update
mysql> update

第三步:做一个简单的判断,判断以上语句是否执行成功与否

二选一:

1
2
3
4
5
6
成功,则提交事务到磁盘,永久生效
mysql> commit;


失败,则回滚事务,不能把SQL执行写入到磁盘,滚回到没有更新之前状态
mysql> rollback;

注意:如果使用事务处理,则创建数据表时,必须明确指定数据表的引擎格式为InnoDB格式。因为MyISAM这个格式不支持事务处理。