1 安装flask_sockets
conda好像装不了,使用pip安装
pip install Flask-Sockets
2 创建websocket服务器
2.1 普通方式
使用以下代码创建一个简单的websocket服务器,服务器地址为:ws://localhost:5678/
# -*- coding: utf-8 -*-
from flask import Flask
from flask_sockets import Sockets
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
app = Flask(__name__)
sockets = Sockets(app)
@sockets.route('/')
def run(ws):
while not ws.closed:
# 接收发送过来的消息
message = ws.receive()
response_text = f"Server receive message: {message}"
# 向客户端发送消息
ws.send(response_text)
@app.route('/')
def hello():
return 'Hello World!'
if __name__ == "__main__":
server = pywsgi.WSGIServer(('localhost', 5678), app, handler_class=WebSocketHandler)
server.serve_forever()
2.2 蓝图方式
# -*- coding: utf-8 -*-
from flask import Flask, Blueprint
from flask_sockets import Sockets
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
html = Blueprint(r'html', __name__)
ws = Blueprint(r'ws', __name__)
@html.route('/')
def hello():
return 'Hello World!'
@ws.route('/')
def echo_socket(socket):
while not socket.closed:
message = socket.receive()
response_text = f"Server receive message: {message}"
socket.send(response_text)
app = Flask(__name__)
sockets = Sockets(app)
app.register_blueprint(html, url_prefix=r'/')
sockets.register_blueprint(ws, url_prefix=r'/')
if __name__ == "__main__":
server = pywsgi.WSGIServer(('localhost', 5678), app, handler_class=WebSocketHandler)
server.serve_forever()
3 在线测试
本文作者:StubbornHuang
版权声明:本文为站长原创文章,如果转载请注明原文链接!
原文标题:Python – 使用flask_sockets库构建websocket服务器
原文链接:https://www.stubbornhuang.com/1440/
发布于:2021年07月16日 14:42:29
修改于:2023年06月26日 21:30:01
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。
评论
52