自定义消息协议实现基本客户端服务器通信。

产品需求说明:

1,实现消息的转发(服务器为客户转发消息)
2,处理登录(服务器能够监听客户请求登陆的action)
3,处理退出 (退出动作能够被服务器接收到)
4,维护历史消息,维护在线用户,维护在线用户的连接

详细细节:

1, 多线程去处理每个用户连接,防止主线程阻塞

2,自定义了消息协议,并且自己完成了消息协议的解析

代码

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import socket
import json
import threading

client = socket.socket()
client.connect(("10.211.55.31", 8000))

user = "Penguin1"

# 1,登录
login_template = {
"action": "login",
"user": user
}
client.send(json.dumps(login_template).encode("utf8"))
res = client.recv(1024)
print(res.decode("utf8"))

# 2,获取在线用户
get_user_template = {
"action": "list_user",
}
client.send(json.dumps(get_user_template).encode("utf8"))
res = client.recv(1024)
print("当前在线用户:{}".format(res.decode("utf8")))

# 3,获取历史消息
offonline_msg_template = {
"action": "history_msg",
"user": user
}
client.send(json.dumps(offonline_msg_template).encode("utf8"))
res = client.recv(1024)
print("历史消息:{}".format(res.decode("utf8")))

exit = False

def handle_receive():
# 处理接受请求
while True:
if not exit:
try:
res = client.recv(1024)
except:
break
res = res.decode("utf8")
try:
res_json = json.loads(res)
msg = res_json["data"]
from_user = res_json["from"]
print("收到来自({})用户的消息:{}".format(from_user, msg))
except:
print(res)
else:
break


def handle_send():
while True:
# 1,随时发送消息
# 2,有消息能够随时接收到
op_type = input("请输入你要进行的操作:1,发送消息 2,退出 3,获取当前在线用户")
if op_type not in ["1", "2", "3"]:
print("不支持该操作")
op_type = input("请输入你要进行的操作:1,发送消息 2,退出 3,获取当前在线用户")
elif op_type == "1":
to_user = input("请输入你要发送到用户")
msg = input("请输入你要发送到消息")
send_data_template = {
"action": "send_msg",
"to": to_user,
"from": user,
"date": msg,
}
client.send(json.dumps(send_data_template).encode("utf8"))
elif op_type == "2":
exit_template = {
"actino": "exit",
"user": user
}
client.send(json.dumps(exit_template).encode("utf8"))
exit = True
client.close()
break
elif op_type == "3":
get_user_template = {
"action": "list_user"
}
client.send(json.dumps(get_user_template).encode("utf8"))


if __name__ == "__main__":
send_thread = threading.Thread(target=handle_send)
send_thread.start()
receive_thread = threading.Thread(target=handle_receive())
receive_thread.start()

服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import socket
from collections import defaultdict
import threading
import json


#1,维护用户连接
online_users = defaultdict(dict)
#注意一定是传的方法的名称

# 2,维护用户的历史消息
user_msgs = defaultdict(list)

server = socket.socket()
server.bind(("0.0.0.0", 8000))
server.listen()

def handle_sock(sock, addr):
while True:
data = sock.recv(1024)
json_data = json.loads(data.decode("utf8"))
action = json_data.get("action", "")
if action == "login":
online_users[json_data["user"]] = sock
sock.send("登录成功".encode("utf8"))
elif action == "list_user":
#获取当前在线用户
all_users = [user for user, sock in online_users.items()]
sock.send(json.dumps(all_users).encode("utf8"))
elif action == "history_msg":
sock.send(json.dumps(user_msgs.get(json_data["user"], [])).encode("utf8"))
elif action == "send_msg":
if json_data["to"] in online_users:
online_users[json_data["to"]].send(json.dumps(json_data).encode("utf8"))
user_msgs[json_data["to"]].append(json_data)
elif action == "exit":
del online_users[json_data["user"]]
sock.send("退出成功!".encode("utf8"))

while True:
#阻塞等待连接
sock,addr = server.accept()
#启动一个线程去处理新的用户连接
client_thread = threading.Thread(target = handle_sock,args = (sock,addr))
client_thread.start()

总结:

Python能够将很多复杂的东西简化,但是真正的功能并没有QQ初代完善,规模也是极其小的。本程序只是提供一种socket编程思路,对理解消息的转发,协议的具体规则有很好的帮助。