张春成
2022/09/04阅读:30主题:默认主题
WebSocket 实用记录
WebSocket 实用记录
手头的项目需要用到 WebSocket。
你敢相信吗,这个破玩意的通信功能已经集合了 HTTP,WebSocket 和 Parallel Ports,诚可谓五毒俱全。
本文将记录使用 WebSocket 的一些实用方案,比如如何建立和测试连接,如何判断失联并实现重联等。
WebSocket
首先,WebSocket 可能听上去比较陌生,但它其实与日常上网的 HTTP 协议是“并列”的关系,只是它没有用于超文本消息的传输,因此也没有浏览器给它带来的多种约束。
WebSocket 协议在 2008 年诞生,2011 年成为国际标准。所有浏览器都已经支持了。
它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。
from websocket[1]
典型的 WebSocket 连接如下
ws: ; //example.com:80/some/path
WebSocket 与 TCP 和 HTTP 之间的关系可以用下图来表示

WebSocket, HTTP and TCP
典型连接方式
既然说它连接简便,那么就意味着它能够通过不长且不复杂的代码来实现双端通信。本文为此提供了服务端(附件 1)和客户端(附件 2)代码,实现了简单的双端通信。
在下图的演示过程中,服务端在右,客户端在左。服务端在接到 stop 消息后会主动关闭连接。但值得注意的是,服务端关闭连接时并没有什么机制来“控制”客户端也关闭他们之间的连接,因此,客户端只能在下一次试图发送消息时,“才会”知道服务端已经停止服务了,即显示消息
Connection is already closed
当然,从逻辑上这并不是协议设计不完善,恰恰相反,正因为通信过程千奇百怪,所以才不需要双端之间采取额外的通信方式,来绕过用户的设计,去将自己的状态通过通信告知对方。
那么什么叫“用户的设计”呢?就是类似上面提到的 stop 消息,这个消息在形式上并无特殊之处,只是通信双方事先约定了这些消息的意义,之后统一执行某些操作。它与底层的 ping 或 pong 消息不同,系统不会“代劳”,只能通过用户来自己维护双方的消息和状态。

基本双端通信演示
那么,另一个问题是,通信的客户端如何知道,或者在什么情况下能够知道,它与服务端之间的连接已经中断或异常了呢?这个问题其实很难回答。你可以想象两个人在通电话,你永远不会知道对方没有回话是因为人不在电话旁边,还是单纯的不想说话。也就是说,如果服务端单纯地关闭了一条链接,客户端还是能检测到它与服务端的连接是“通”的,只是接不到任何回复而已。
而客户端能够感知到的情况,是服务端的对应端口已经离线,或者事先约定要“立刻”回复的信息也没有收到任何回复。后者对应之前的 “Connection is already closed” 消息,而前者代表更低一层的连接,即 TCP 连接,出现了问题
Got error: [WinError 10061] 由于目标计算机积极拒绝,无法连接。
这种异常位于相当底层的、基于过于底层的位置,因此我们既没有意愿,也没有必要去时刻盯着它们。本代码提供了一种在异常情况下的自动重连方法思路,其效果如下图所示。处理异常的机制并不复杂,它只是在发送消息时对连接进行检测,在检测到异常时会试图重新建立连接。

接下来,新的问题是,如何在不发送消息的前提下对连接进行检测呢?严格地说,我们没有很好的方法来做到这样的事情,因为如上述所介绍的,网络通信十分复杂,我们不可能面面俱到地捕捉这些异常。
但有一个投机取巧的方法,那就是检测底层“心跳包”的通信内容,通过 last_pong_tm 来获取上一次服务端返回心跳 pong 的时间。由于这种消息是定时发送的,如果经过一段时间,(较短的时间,比如 1 秒),都没有收到这些消息的话,那就说明连接出现了问题。这时就可以告警并记录异常了。
附件 1,服务端代码
# %%
import logging
import threading
from websocket_server import WebsocketServer
# %%
host = '127.0.0.1'
port = 23333
# %%
def new_client(client, server):
print('New client: {}'.format(client))
handler = client['handler']
server.send_message_to_all("Hey all, a new client has joined us")
handler.send_message('Hello from server')
return
def message_received(client, server, message):
print('Got message: {}'.format(message))
if message == 'stop':
client['handler'].send_close()
print('Sent close to client: {}'.format(client))
return
# %%
if __name__ == "__main__":
server = WebsocketServer(host=host, port=port, loglevel=logging.INFO)
server.set_fn_new_client(new_client)
server.set_fn_message_received(message_received)
t = threading.Thread(target=server.run_forever)
t.setDaemon(True)
t.start()
input('Escape server')
# %%
附件 2,客户端代码
# %%
import time
import threading
import websocket
# %%
host = '127.0.0.1'
port = 23333
kwargs = dict(
ping_interval=1,
)
# %%
status = dict(alive=True)
def on_message(ws, message):
print('Got message: {}'.format(message))
def on_pong(ws, message):
# print('Got pong: {}'.format(message))
return
def on_error(ws, message):
print('Got error: {}'.format(message))
# %%
class MyClient(object):
def __init__(self):
pass
def run_forever(self, host=host, port=port, kwargs=kwargs):
url = 'ws://{}:{}'.format(host, port)
app = websocket.WebSocketApp(
url=url, on_message=on_message, on_pong=on_pong, on_error=on_error)
t = threading.Thread(target=app.run_forever, kwargs=kwargs)
t.setDaemon(True)
t.start()
self.app = app
return app
# %%
if __name__ == "__main__":
client = MyClient()
client.run_forever()
while status['alive']:
print(time.time() - client.app.last_pong_tm)
inp = input('>> ')
# if not inp:
# continue
try:
client.app.send(inp)
except Exception as e:
print(e)
client.run_forever()
print('Client closed')
# %%
参考资料
websocket: https://www.rhttps://www.ruanyifeng.com/blog/2017/05/websocket.htmluanyifeng.com/blog/2017/05/websocket.html
作者介绍