张春成

V2

2023/04/11阅读:32主题:默认主题

Websocket 通信的方法取舍

Websocket 通信的方法取舍

本文设想了两种 websocket 使用场景,一种是面向低延时的单路串行场景;另一种是面向大吞吐量的多路并行场景。针对两种场景分别设计了 websocket 服务和客户端对,并进行通信实验。

实验结果表明多路并行方法吞吐量更大,但延时稍不可控;而单路串行方法准时性强,但数据阻塞现象严重。两种方法各有所长,使用时应根据具体要求,因地制宜地进行选择。

开源代码可见我的 github 仓库

https://github.com/listenzcc/websocket-speed-test


场景及实验设置

Websocket 是网络通信常用的双工通信协议,是 http 类的“请求-应答”式通信协议的重要补充。

The WebSocketobject provides the API for creating and managing a WebSocket connection to a server, as well as for sending and receiving data on the connection.

WebSocket - Web APIs | MDN

本文设想了两种 websocket 使用场景,一种是面向低延时的单路串行场景;另一种是面向大吞吐量的多路并行场景。

  • 场景一:针对面向低延时的单路串行场景,它可以看成是客户端与服务端建立电话式的长连接,即两者只建立一条连接,建立后再不中断,所有数据的传输均由此路信道完成,它的特点是从一而终;
  • 场景二:针对面向大吞吐量的多路并行场景,它可以看成是客户端根据自己的需要与服务端随机建立若干条短连接,每次连接建立后即进行数据传输,完成传输和应答后此连接随即被关闭,它的特点是阅后即焚。

经过分析不难发现,场景一属于单线程任务(SingleProcess),通信双方在长连接信道上不断地,以串行的方式进行通信和应答。而场景二属于多线程任务(MultiProcess),服务端允许客户端与自己建立大量连接,并且可以为每个连接分配单独处理单元(进程或线程)进行应答。另外,服务端以两种方式进行计算,一种是等待计算结束后返回信息(Slow Response),另一种是直接交由后台计算,不再返回而是记录信息(Fast Response)。

本文根据以上两种场景搭建模拟程序,并以 大小的单通道图像作为通信负载,客户端对图像进行发送,而服务端计算图像全部像素的极佳、均值和中值等信息。以上过程重复 1000 张图片的传输,并对消息传输、处理和返回的时间进行记录。

实验结果及建议

为了避免建议过于靠后,现将它们提前写出来,而支撑它们的材料在之后展开,

  • 首先,并行场景中的总传输耗时较少,代价是反馈不及时,更适合大量不需要即时反馈的信息传输,如数据归档等;
  • 其次,串行场景中的信号反馈十分及时,代价是每个时间只能处理一条消息,更适合发送实时响应的控制信号等。

实验结果分析如下图所示。在并行的场景二中,完成 1000 张图像的传输耗时约 8 秒;在串行的场景一中,完成 1000 张图像的传输耗时约 50 秒。

Process-time.png
Process-time.png

但是,虽然并行方法的总传输时长较低,但这是有代价的,代价是牺牲单张图像的计算响应延时。串行方法的平均处理时长约为 0.05 秒,而并行方法的平均处理时长约为 1.5 秒。当然,我认为这个夸张的时长是有水分的,它可能是由于 Python 孱弱的多线程处理能力。

Response-time.png
Response-time.png

附录:异步 websocket 服务器的代码

import asyncio
import websockets
import multiprocessing

import numpy as np

def deal_msg(msg):
    mat = np.frombuffer(msg, dtype=np.uint8)
    shape = mat.shape
    max = np.max(mat)
    min = np.min(mat)
    mean = np.mean(mat)
    median = np.median(mat)

    response = f"Received: {len(msg)}{shape}{max}{min}{mean}{median}"

    print(f"> {response}")
    return

async def handle(websocket, path):
    '''
    Handle message from the client

    Args:
        :param:websocket: The websocket connection
        :param:path: The path of the client connection
    '''

    print('Client connects', websocket, path)

    msg = await websocket.recv()
    print(f"< {len(msg)}{msg[:8]}")

    p = multiprocessing.Process(target=deal_msg, args=(msg,))
    p.start()

    response = f'Received: {len(msg)}'

    # mat = np.frombuffer(msg, dtype=np.uint8)
    # shape = mat.shape
    # max = np.max(mat)
    # min = np.min(mat)
    # mean = np.mean(mat)
    # median = np.median(mat)

    # response = f"Received: {len(msg)}, {shape}, {max}, {min}, {mean}, {median}"

    await websocket.send(response)
    print(f"> {response}")

if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--host", help="host of the websocket connection", default='localhost')
    parser.add_argument(
        "--port", help="port of the websocket connection", default=7788)
    args = parser.parse_args()

    host = args.host
    port = args.port

    start_server = websockets.serve(handle, host, port)
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()

附录:异步 websocket 客户端的代码

其中,logger 模块用于多进程时记录实验结果。

'''
Websocket client example of short connection
'''


# %%
import time
import logging
import asyncio
import websockets
import multiprocessing

import numpy as np
import pandas as pd

# %%

DEFAULT_LOGGING_KWARGS = dict(
    name='websocket',
    filepath='websocket.log',
    level_file=logging.DEBUG,
    level_console=logging.DEBUG,
    format_file='%(asctime)s %(name)s %(levelname)-8s %(message)-40s {{%(filename)s:%(lineno)s:%(module)s:%(funcName)s}}',
    format_console='%(asctime)s %(name)s %(levelname)-8s %(message)-40s {{%(filename)s:%(lineno)s}}'
)

def GENERATE_LOGGER(name, filepath, level_file, level_console, format_file, format_console):
    '''
    Generate logger from inputs,
    the logger prints message both on the console and into the logging file.
    The DEFAULT_LOGGING_KWARGS is provided to automatically startup

    Args:
        :param:name: The name of the logger
        :param:filepath: The logging filepath
        :param:level_file: The level of logging into the file
        :param:level_console: The level of logging on the console
        :param:format_file: The format when logging on the console
        :param:format_console: The format when logging into the file
    '''


    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)

    file_handler = logging.FileHandler(filepath)
    file_handler.setFormatter(logging.Formatter(format_file))
    file_handler.setLevel(level_file)

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(format_console))
    console_handler.setLevel(level_console)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    return logger

logger = GENERATE_LOGGER(**DEFAULT_LOGGING_KWARGS)

# %%

async def short_connect(url, msg='', idx=0):
    '''
    Send msg to url and close the connection

    Args:
        :param: url: The url of the websocket connection
        :param: msg: The message to be sent, if it is empty, it waits for console input, default is ''
    '''


    async with websockets.connect(url) as websocket:

        if not msg:
            msg = input("Send message to {}\n>> ".format(url))

        print(f"< {len(msg)}{msg[:8]}")

        t = time.time()
        await websocket.send(msg)
        t1 = time.time()
        response = await websocket.recv()
        t2 = time.time()

        print(f"< {response}")

        print(f'Web socket costs {t2 - t} seconds')
        time_costs = dict(
            time=t,
            send=t1-t,
            recv=t2-t1,
            total=t2-t,
            idx=idx
        )

        logger.debug(time_costs)

# %%
if __name__ == '__main__':
    host = 'localhost'
    port = 7788
    path = ''
    
    # !!! Random & Large & Meaningful bytes sequence
    msg = np.random.randint(0255, (800600)).astype(np.uint8).tobytes()

    url = 'ws://{}:{}/{}'.format(host, port, path)

    for idx in range(1000):
        def target(url, msg, idx):
            asyncio.get_event_loop().run_until_complete(short_connect(url, msg, idx))

        # !!! Used for single mode experiment
        target(url, msg, idx)

        # !!! Used for multi model experiment
        # p = multiprocessing.Process(
        #     target=target, args=(url, msg, idx), daemon=True)
        # p.start()

    input('Enter to leave')

# %%

分类:

后端

标签:

后端

作者介绍

张春成
V2