1 安装flask_sockets

通过 conda 似乎无法安装该库,因此这里使用 pip 进行安装:

pip install Flask-Sockets

2 创建websocket服务器

2.1 普通方式

使用以下代码创建一个简单的websocket服务器,服务器地址为:ws://localhost:5678/

# -*- coding: utf-8 -*-

from flask import Flask
from flask_sockets import Sockets
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler

# Create the Flask application and wrap it with the Flask-Sockets extension;
# `sockets` is the object used below to register websocket routes.
app = Flask(__name__)
sockets = Sockets(app)


@sockets.route('/')
def run(ws):
    """Echo each message received on the websocket back to the client.

    Loops until the connection is closed. Every incoming message is wrapped
    in a short acknowledgement string and sent back on the same socket.

    Args:
        ws: The websocket connection object; it must provide ``closed``,
            ``receive()`` and ``send()``.
    """
    while not ws.closed:
        # Receive the message sent by the client
        message = ws.receive()

        # receive() returns None when the peer has disconnected (or the
        # stream errored). Replying at that point would send a bogus
        # "None" message on an already-closed socket and raise, so stop.
        if message is None:
            break

        response_text = f"Server receive message: {message}"

        # Send the reply to the client
        ws.send(response_text)

@app.route('/')
def hello():
    """Return the plain-text greeting served by the HTTP index route."""
    greeting = 'Hello World!'
    return greeting


if __name__ == "__main__":
    # Serve the Flask app with gevent's WSGI server, passing WebSocketHandler
    # as the handler class, and block serving requests.
    listen_address = ('localhost', 5678)
    server = pywsgi.WSGIServer(
        listen_address,
        app,
        handler_class=WebSocketHandler,
    )
    server.serve_forever()

2.2 蓝图方式

# -*- coding: utf-8 -*-

from flask import Flask, Blueprint
from flask_sockets import Sockets
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler

# Two blueprints: `html` holds the regular HTTP view and `ws` holds the
# websocket handler; both are registered on the application further below.
html = Blueprint(r'html', __name__)
ws = Blueprint(r'ws', __name__)

@html.route('/')
def hello():
    """Return the plain-text greeting for the `html` blueprint's index route."""
    body = 'Hello World!'
    return body

@ws.route('/')
def echo_socket(socket):
    """Echo each message received on the websocket back to the client.

    Loops until the connection is closed. Every incoming message is wrapped
    in a short acknowledgement string and sent back on the same socket.

    Args:
        socket: The websocket connection object; it must provide ``closed``,
            ``receive()`` and ``send()``.
    """
    while not socket.closed:
        message = socket.receive()

        # receive() returns None when the peer has disconnected (or the
        # stream errored). Replying at that point would send a bogus
        # "None" message on an already-closed socket and raise, so stop.
        if message is None:
            break

        response_text = f"Server receive message: {message}"

        socket.send(response_text)


# Create the Flask application and wrap it with the Flask-Sockets extension.
app = Flask(__name__)
sockets = Sockets(app)

# The HTTP blueprint is registered on the Flask app itself, while the
# websocket blueprint is registered through the Sockets object.
app.register_blueprint(html, url_prefix=r'/')
sockets.register_blueprint(ws, url_prefix=r'/')


if __name__ == "__main__":
    # Serve the Flask app with gevent's WSGI server, passing WebSocketHandler
    # as the handler class, and block serving requests.
    bind_to = ('localhost', 5678)
    server = pywsgi.WSGIServer(
        bind_to,
        app,
        handler_class=WebSocketHandler,
    )
    server.serve_forever()

3 在线测试

启动服务器后,可以使用在线 websocket 测试工具连接 ws://localhost:5678/,发送消息并查看服务器返回的内容(参考原文:Python – 使用flask_sockets库构建websocket服务器-StubbornHuang Blog)。