-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathWebSocketServer.py
More file actions
295 lines (262 loc) · 9.97 KB
/
WebSocketServer.py
File metadata and controls
295 lines (262 loc) · 9.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# coding=utf-8
import base64
import hashlib
import logging
import struct
import threading
import time
# Root logging is configured at import time: DEBUG level, with timestamp,
# level and logger name on every record.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s [%(levelname)s]- %(name)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Template of the plain HTTP reply sent to clients that did not ask for a
# WebSocket upgrade. Placeholders: code, msg, date, length, content.
HTTP_RESPONSE = (
    "HTTP/1.1 {code} {msg}\r\n"
    "Server:LyricTool\r\n"
    "Date:{date}\r\n"
    "Content-Length:{length}\r\n"
    "\r\n"
    "{content}\r\n"
)

# Reason phrases for the status codes this module emits.
STATUS_CODE = {
    200: 'OK',
    501: 'Not Implemented',
}

# Template of the "101 Switching Protocols" reply; the single positional
# placeholder receives the computed Sec-WebSocket-Accept value.
UPGRADE_WS = (
    "HTTP/1.1 101 Switching Protocols\r\n"
    "Connection: Upgrade\r\n"
    "Upgrade: websocket\r\n"
    "Sec-WebSocket-Accept: {}\r\n"
    "WebSocket-Protocol: chat\r\n\r\n"
)
class WebSocketError(Exception):
    """Raised when a WebSocket connection cannot be established or used.

    Attributes:
        msg: Human-readable description of the failure.
    """

    def __init__(self, msg):
        # Forward the message to Exception so that str(exc), exc.args and
        # log output carry it even when ``msg`` is passed by keyword.
        super().__init__(msg)
        self.msg = msg
class EmptyFrame(Exception):
    """Signals an empty read: the peer dropped the connection unexpectedly."""
class CloseFrame(Exception):
    """Raised to disconnect and leave the enclosing ``with`` block."""
def sec_key_gen(msg):
    """Compute the ``Sec-WebSocket-Accept`` value for a client key.

    The client's ``Sec-WebSocket-Key`` is concatenated with the fixed
    WebSocket GUID, hashed with SHA-1, and the raw digest is returned
    base64-encoded as text.
    """
    hasher = hashlib.sha1()
    hasher.update((msg + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11').encode('utf-8'))
    accept = base64.b64encode(hasher.digest())
    return accept.decode()
class WebSocketServer:
    """Server side of one WebSocket connection on an accepted socket.

    Usable as a context manager: entering performs the HTTP upgrade
    handshake, leaving closes the connection::

        with WebSocketServer(conn) as ws:
            ws.handle(on_message)

    ``state`` values: 0 ready, 1 WebSocket established, 2 handshake parsed,
    3 failed, 4 plain (non-WebSocket) HTTP request, -1 closed.
    """

    # WebSocket opcodes
    FRAME_CONTINUATION = 0x00
    FRAME_TEXT = 0x01
    FRAME_BINARY = 0x02
    FRAME_CLOSE = 0x08
    FRAME_PING = 0x09
    FRAME_PONG = 0x0A

    # Supported WebSocket close status codes
    CLOSE_NORMAL = 1000
    CLOSE_GOING_AWAY = 1001
    CLOSE_PROTOCOL_ERROR = 1002
    CLOSE_UNSUPPORTED = 1003
    CLOSE_NO_STATUS = 1005
    CLOSE_ABNORMAL = 1006
    UNSUPPORTED_DATA = 1007
    POLICY_VIOLATION = 1008
    CLOSE_TOO_LARGE = 1009
    INTERNAL_ERROR = 1011
    SERVICE_RESTART = 1012
    TRY_AGAIN_LATER = 1013

    # Upper bound on bytes buffered while reading the HTTP handshake request.
    MAX_HTTP_REQUEST = 64 * 1024
    # Largest single recv() request, so a huge declared frame length never
    # turns into one huge buffer allocation.
    RECV_CHUNK = 64 * 1024

    def __init__(self, conn):
        """Wrap an accepted socket-like object; no I/O is performed here."""
        self.conn = conn
        self._state = 0
        self._tmp_data = b''  # bytes received but not yet consumed as frames
        self.close_event = []  # (callable, args) pairs invoked by close()
        self.raddr = None  # peer address, filled in by the handshake
        # Serialises writes so heartbeat pings cannot interleave with the
        # bytes of another frame.
        self._send_lock = threading.Lock()

    def open(self):
        """Run the handshake and return ``self``."""
        self._handshake()
        return self

    def __enter__(self):
        return self.open()

    def getstate(self):
        """Return ``(state_code, state_name)`` for the current state."""
        state_map = {0: 'READY', 1: 'CONNECTION ESTABLISHED', 2: 'HANDSHAKE FINISHED', 3: 'FAILED', -1: 'CLOSED',
                     4: 'HTTP REQUEST IS'}
        return self._state, state_map[self._state]

    @property
    def state(self):
        """get connection state"""
        return self._state

    def _handshake(self):
        """Read the HTTP request and upgrade it to WebSocket when possible.

        Returns:
            True when the upgrade response was sent (state becomes 1).
            False for a request without ``Sec-WebSocket-Key``: a 501 reply
            is sent, state becomes 4 and ``self.data`` holds
            ``(header_lines, body_text)``.

        Raises:
            WebSocketError: the peer closed the connection, or sent more
                than ``MAX_HTTP_REQUEST`` bytes, before the header ended.
        """
        terminator = b'\r\n\r\n'
        raw = bytearray()
        # Read until the blank line ending the header block. The size of one
        # recv() says nothing about message boundaries on a stream socket.
        while terminator not in raw:
            if len(raw) > self.MAX_HTTP_REQUEST:
                self._state = 3
                raise WebSocketError('HTTP request header is too large')
            chunk = self.conn.recv(1024)
            if not chunk:
                self._state = 3
                raise WebSocketError('Connection closed before the HTTP request header was complete')
            raw += chunk
        head, _, rest = bytes(raw).partition(terminator)
        header = head.decode('utf-8', errors='replace').split('\r\n')

        # Header names are case-insensitive and the space after ':' is
        # optional, so normalise instead of splitting on ': '.
        options_dict = {}
        for line in header[1:]:
            name, sep, value = line.partition(':')
            if sep:
                options_dict[name.strip().lower()] = value.strip()

        try:
            # Real client address as forwarded by a reverse proxy.
            self.raddr = (options_dict['x-real-ip'], options_dict['x-real-port'])
        except KeyError:
            self.raddr = self.conn.getpeername()

        sec_key = options_dict.get('sec-websocket-key')
        if sec_key is None:
            # Plain HTTP request: collect the declared body (bounded), then
            # answer 501.
            try:
                expected = int(options_dict.get('content-length', 0))
            except ValueError:
                expected = 0
            expected = min(expected, self.MAX_HTTP_REQUEST)
            body = bytearray(rest)
            while len(body) < expected:
                chunk = self.conn.recv(min(1024, expected - len(body)))
                if not chunk:
                    break
                body += chunk
            content = body.decode('utf-8', errors='replace')
            # HTTP-date format (always GMT).
            date = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
            reply = HTTP_RESPONSE.format(code=501, msg=STATUS_CODE[501], date=date, length=len(date), content=date)
            self.conn.sendall(reply.encode('utf-8'))
            self._state = 4
            self.data = (header, content)
            return False

        # Anything already received after the header belongs to the frame
        # stream; keep it for the frame reader.
        self._tmp_data = rest
        self._state = 2
        self._build(sec_key)
        return True

    def _build(self, sec_key):
        """Send the 101 upgrade response and mark the connection established."""
        response = UPGRADE_WS.format(sec_key_gen(sec_key))
        self.conn.sendall(response.encode('utf-8'))
        self._state = 1
        return True

    @staticmethod
    def _decode(info):
        """Decode the first WebSocket frame contained in ``info``.

        Honours the declared payload length (7-bit, 16-bit or 64-bit form)
        and the MASK bit; bytes after the first frame are ignored.

        Returns:
            ``(fin, opcode, payload)`` where ``fin`` is 0 or 1 and
            ``payload`` is an unmasked ``bytearray``.

        Raises:
            EmptyFrame: ``info`` is empty (peer disconnected).
            WebSocketError: ``info`` is shorter than the 2-byte header.
            struct.error: the extended length field is truncated.
        """
        if info == b'':
            # An empty read means the peer dropped the connection.
            logger.error("Received an Empty data frame")
            raise EmptyFrame()
        if len(info) < 2:
            raise WebSocketError('Truncated WebSocket frame header')
        fin = info[0] >> 7
        opcode = info[0] & 0x0F
        masked = bool(info[1] & 0x80)
        payload_len = info[1] & 0x7F
        offset = 2
        if payload_len == 126:
            payload_len = struct.unpack_from('!H', info, offset)[0]
            offset += 2
        elif payload_len == 127:
            payload_len = struct.unpack_from('!Q', info, offset)[0]
            offset += 8
        if masked:
            mask = info[offset:offset + 4]
            offset += 4
        raw = info[offset:offset + payload_len]
        if masked:
            payload = bytearray(byte ^ mask[i % 4] for i, byte in enumerate(raw))
        else:
            payload = bytearray(raw)
        return fin, opcode, payload

    def _recv_exact(self, size):
        """Return exactly ``size`` bytes, draining buffered data first.

        Raises:
            EmptyFrame: the peer closed the connection before ``size``
                bytes arrived.
        """
        buf = bytearray(self._tmp_data[:size])
        self._tmp_data = self._tmp_data[size:]
        while len(buf) < size:
            chunk = self.conn.recv(min(size - len(buf), self.RECV_CHUNK))
            if not chunk:
                logger.error("Received an Empty data frame")
                raise EmptyFrame()
            buf += chunk
        return bytes(buf)

    def _read_frame(self):
        """Read one complete frame from the socket and decode it."""
        head = self._recv_exact(2)
        payload_len = head[1] & 0x7F
        extended = b''
        if payload_len == 126:
            extended = self._recv_exact(2)
            payload_len = struct.unpack('!H', extended)[0]
        elif payload_len == 127:
            extended = self._recv_exact(8)
            payload_len = struct.unpack('!Q', extended)[0]
        mask_len = 4 if head[1] & 0x80 else 0
        rest = self._recv_exact(mask_len + payload_len)
        return self._decode(head + extended + rest)

    def _recv(self):
        """Receive one whole message, reassembling fragments.

        Control frames arriving between fragments are handled in place:
        PING is answered with PONG, PONG is ignored, and CLOSE is returned
        immediately (the partial message is dropped).

        Returns:
            ``(payload, opcode)``; ``opcode`` is that of the first frame.
        """
        fin, opcode, payload = self._read_frame()
        while not fin:
            next_fin, next_opcode, part = self._read_frame()
            if next_opcode == self.FRAME_CLOSE:
                return part, next_opcode
            if next_opcode == self.FRAME_PING:
                self._send(fin=1, opcode=self.FRAME_PONG, msg=bytes(part))
                continue
            if next_opcode == self.FRAME_PONG:
                continue
            payload += part
            fin = next_fin
        return payload, opcode

    def handle(self, func=None, args=(), heartbeat: bool=False, timeout: int=-1, cycle: int=-1):
        """Receive messages until the client closes the connection.

        Args:
            func: Optional callback invoked as ``func(text, *args)`` for
                every data message, with the payload decoded as UTF-8.
            args: Extra positional arguments for ``func``.
            heartbeat: When true, send periodic PINGs from a background
                daemon thread.
            timeout: Currently unused (not implemented).
            cycle: Seconds between PINGs; non-positive values mean 5.

        Returns:
            ``(code, reason)`` taken from the client's close frame; the
            code is 1005 when the frame carries none.
        """
        # todo:Timeout
        if heartbeat:
            # A thread shares this object's state, so the ping loop stops as
            # soon as the connection is closed.
            interval = cycle if cycle > 0 else 5
            threading.Thread(target=self.ping, args=(interval,), daemon=True).start()
        while True:
            data, opcode = self._recv()
            if opcode == self.FRAME_CLOSE:
                # The client asked to close; parse status code and reason.
                try:
                    code = struct.unpack('!H', data[0:2])[0]
                    reason = data[2:]
                except (IndexError, struct.error):
                    code = 1005
                    reason = b''
                logger.info("Client ask for closing websocket with {}:{}.{}".format(code, reason, self.raddr))
                self.close()
                return code, reason
            elif opcode == self.FRAME_PING:
                # A PING must be answered with a PONG echoing its payload.
                self._send(fin=1, opcode=self.FRAME_PONG, msg=bytes(data))
            elif opcode == self.FRAME_PONG:
                logger.info('Pong from {}'.format(self.raddr))
            elif func is not None:
                func(str(data, encoding='utf-8'), *args)

    def send(self, msg, fin=True, isbytes=False):
        """Send ``msg`` as one data frame.

        Args:
            msg: Payload as bytes; a ``str`` is encoded as UTF-8.
            fin: Whether this frame is the final fragment.
            isbytes: Send as a binary frame instead of a text frame.
        """
        if isinstance(msg, str):
            msg = msg.encode('utf-8')
        opcode = self.FRAME_BINARY if isbytes else self.FRAME_TEXT
        self._send(fin=1 if fin else 0, opcode=opcode, msg=msg)

    def _send(self, fin, opcode, msg):
        """Build an unmasked frame around ``msg`` and write all of it."""
        header = struct.pack('B', (0x80 if fin else 0) | opcode)
        msg_len = len(msg)
        if msg_len <= 125:
            header += struct.pack('B', msg_len)
        elif msg_len <= 0xFFFF:
            header += struct.pack('!BH', 126, msg_len)
        else:
            # The 64-bit length form covers any payload that fits in memory.
            header += struct.pack('!BQ', 127, msg_len)
        with self._send_lock:
            # sendall() retries until every byte is written; send() may stop
            # after a partial write.
            self.conn.sendall(b''.join((header, msg)))

    def send_str(self, msg, encoding='utf-8'):
        """Encode ``msg`` with ``encoding`` and send it as a text frame."""
        msg = bytes(msg, encoding=encoding)
        self.send(msg)

    def ping(self, cycle=5):
        """Send a PING every ``cycle`` seconds while the connection is up.

        Non-positive ``cycle`` values fall back to 5 seconds. The loop ends
        when the state leaves 1 or a send fails.
        """
        if cycle is None or cycle <= 0:
            cycle = 5
        while self.state == 1:
            try:
                self._send(fin=1, opcode=self.FRAME_PING, msg=b'')
            except OSError:
                # The socket is gone; nothing left to keep alive.
                break
            logger.info('ping {}'.format(self.raddr))
            time.sleep(cycle)

    def sending_coroutine(self):
        """Generator-based sink: every string sent into it goes to the peer."""
        while True:
            data = yield
            self.send(bytes(data, encoding='utf-8'))

    def close(self, code=1000, reason=b'Normally closed.'):
        """Run close events, send a close frame and release the socket.

        Calling it again on a closed connection does nothing. The close
        frame is only sent on an established WebSocket (state 1).

        Args:
            code: Close status code.
            reason: Close reason as bytes or str; shortened when needed so
                the control-frame payload stays within 125 bytes.
        """
        if self._state == -1:
            return
        # Run the user-registered close events.
        for event, args in self.close_event:
            event(*args)
        try:
            if self._state == 1:
                if isinstance(reason, str):
                    reason = reason.encode('utf-8')
                reason = bytes(reason)
                if len(reason) > 123:
                    # 2 bytes of status code + at most 123 bytes of reason;
                    # drop a UTF-8 sequence cut by the truncation.
                    reason = reason[:123].decode('utf-8', 'ignore').encode('utf-8')
                try:
                    self._send(fin=1, opcode=self.FRAME_CLOSE, msg=struct.pack('!H', code) + reason)
                except OSError:
                    logger.info('Connection has been closed by Client {}.'.format(self.raddr))
            # socket.close() alone does not release the connection at once,
            # while WebSocket expects the side answering a close frame to
            # drop the connection right away (otherwise the peer reports
            # abnormal closure, 1006). shutdown(2) closes both the receive
            # (0) and the send (1) direction.
            try:
                self.conn.shutdown(2)
            except OSError:
                logger.info('Connection has been closed by Client {}.'.format(self.raddr))
            self.conn.close()
        finally:
            self._state = -1

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection; suppress connection-level failures.

        ``EmptyFrame``, ``WebSocketError`` and any ``OSError`` (including
        subclasses such as ``ConnectionResetError``) are logged and
        suppressed; every other exception propagates after closing.
        """
        suppress = False
        if exc_type is not None:
            if issubclass(exc_type, EmptyFrame):
                logger.info("Client {} closed connection unexpectedly.".format(self.raddr))
                suppress = True
            elif issubclass(exc_type, (OSError, WebSocketError)):
                logger.info(exc_val)
                suppress = True
        if self.state != -1:
            self.close()
        return suppress