Flaskフレームワークでサーバープッシュを実装する方法とは?
質問
Flaskのマイクロウェブフレームワークでサーバープッシュ機能を使った小さなサイトを構築しようとしているのですが、直接連携するフレームワークがあるのか知りませんでした。
私が使用したのは Juggernaut とは相性が悪いようで redis-py とは連動しないようですし、Juggernautは最近非推奨になっています。
どなたか、私のケースについてご提案いただけないでしょうか?
どのように解決するのですか?
以下のサイトをご覧ください。 サーバーから送信されるイベント . Server-Sent Events は ブラウザの API で、サーバへのソケットを開いたまま、更新のストリームを購読することができます。 ストリームを受信することができます。詳細については、Alex MacCaw (著) を参照してください。 Juggernaut)の投稿をご覧ください。 なぜ彼はジャガーノートを殺すのか と、なぜよりシンプルな Server-Sent Events は、多くの場合、この仕事に適したツールであり、Websocket よりも優れている。 ウェブソケットよりも優れています。
プロトコルはとても簡単です。単に mimetype
text/event-stream
を追加するだけです。
を追加するだけです。ブラウザは接続を維持し、更新を待ちます。サーバーから送られるイベント
は、サーバから送信された
data:
で始まり、次の改行がある行です。
data: this is a simple message
<blank line>
構造化されたデータを交換したい場合は、データをjsonとしてダンプし、jsonをワイヤで送信すればよい。
利点はFlaskでSSEを使えることです。 サーバーを追加することなく、FlaskでSSEを使えることです。単純な チャットアプリケーションの例 があります。 は、パブ/サブバックエンドとしてredisを使用しています。
def event_stream():
pubsub = red.pubsub()
pubsub.subscribe('chat')
for message in pubsub.listen():
print message
yield 'data: %s\n\n' % message['data']
@app.route('/post', methods=['POST'])
def post():
message = flask.request.form['message']
user = flask.session.get('user', 'anonymous')
now = datetime.datetime.now().replace(microsecond=0).time()
red.publish('chat', u'[%s] %s: %s' % (now.isoformat(), user, message))
@app.route('/stream')
def stream():
return flask.Response(event_stream(),
mimetype="text/event-stream")
サンプルアプリを実行するためにgunicronを使用する必要はありません。 を使用する必要はありません。アプリを実行するときにスレッドを使用することを確認してください。 そうでなければ、SSE 接続が開発サーバーをブロックしてしまうからです。
if __name__ == '__main__':
app.debug = True
app.run(threaded=True)
クライアント側では、新しいメッセージがサーバーからプッシュされたときに呼び出される、Javascriptのハンドラ関数が必要です。 がプッシュされたときに呼び出されます。
var source = new EventSource('/stream');
source.onmessage = function (event) {
alert(event.data);
};
サーバー送信型イベントは 対応 をサポートしています。 Internet Explorer はまだ Server-Sent Events をサポートしていませんが、Version 10 でサポートされる予定です。 バージョン 10 でサポートされる予定です。古いブラウザをサポートするために、2つの推奨される Polyfill があります。
関連
-
[解決済み] プログラムの実行やシステムコマンドの呼び出しはどのように行うのですか?
-
[解決済み] リストのリストからフラットなリストを作るには?
-
[解決済み] Pythonで現在時刻を取得する方法
-
[解決済み] リストが空かどうかを確認するにはどうすればよいですか?
-
[解決済み] Flaskのリクエストで受信したデータを取得する
-
[解決済み] 2次元アレイにおけるピーク検出
-
[解決済み】ネストされたディレクトリを安全に作成するには?
-
[解決済み】2つの辞書を1つの式でマージする(辞書の和をとる)には?)
-
[解決済み] CSVデータを処理する際、1行目のデータを無視する方法を教えてください。
-
[解決済み] Pythonの文字列の前にあるbという接頭辞は何を意味するのですか?
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み] 前月の日時オブジェクトを返す
-
[解決済み] SQLAlchemy: 日付フィールドをフィルタリングする方法は?
-
[解決済み] Django Rest Framework ファイルアップロード
-
[解決済み] 異なる順序で同じ要素を持つ2つのJSONオブジェクトを等しく比較するには?
-
[解決済み] Flask でグローバル変数はスレッドセーフか?リクエスト間でデータを共有するには?
-
[解決済み] Pythonによる一対のクロスプロダクト [重複] (英語)
-
[解決済み] Celeryタスクのユニットテストはどのように行うのですか?
-
[解決済み] Flaskで非同期タスクを作る
-
[解決済み] あるメソッドが複数の引数のうち1つの引数で呼び出されたことを保証する
-
[解決済み] djangoのQueryDictをPythonのDictに変更するには?