1. ホーム

WebSocketクラスタリングソリューション

2022-02-22 18:37:32

1. 問題の原因

最近、私は複数のユーザー間の通信を必要とするプロジェクトに取り組んでいました。WebSocketのハンドシェイク要求と、クラスタでのWebSocketセッションの共有が含まれています。

その間、私は数日間研究を重ね、zuulでの様々な試みから春のクラウドゲートウェイまで、分散型WebSocketクラスタを実装するいくつかの方法を思いつき、この記事が何人かの人の役に立ち、これに関するアイデアや研究を一緒に共有できればと思い、この記事にまとめました。

以下は、私のシナリオの説明です。

  • リソース : サーバー4台。うち1台のみssl認証ドメイン、redis+mysqlサーバ、アプリケーションサーバ2台(クラスタ化)。

  • アプリ公開の制限 : シナリオの必要性から、アプリケーションサイトは公開するためにssl認証されたドメイン名を必要とします。そのため、ssl認証されたネームサーバーがapiゲートウェイとして使用され、wss(secure authenticated ws)と接続するためのhttpsリクエストに責任を負います。一般にhttpsオフロードと呼ばれ、ユーザーはhttpsのネームサーバー(例:https://oiscircle.com/xxx)を要求しますが、実際のアクセスはhttp+IPアドレスという形になります。ゲートウェイの構成が高く、複数のアプリケーションを扱うことができればよい

  • 必要条件 : ユーザーはアプリにログインし、サーバーへのwss接続を確立する必要があります。単一のメッセージとして、または異なるロール間のグループメッセージとして

  • クラスタ内のアプリケーションサービスの種類 : 各クラスタインスタンスは、httpステートレスリクエストサービスとwsロングコネクションサービスを担当します。

2. システム構成図

私の実装では、各アプリケーションサーバーがhttpとwsの両方のリクエストを担当していますが、実はwsリクエストで作成したチャットモデルを別モジュールとして設定することも可能です。分散型という観点からは、この2種類の実装は似ていますが、実装の容易さという点では、http + ws リクエストを提供するアプリケーションの方が便利です。これについては、次のセクションで説明します。

この記事で扱うテクノロジースタック

  • Eurekaサービスの発見と登録

  • Redisセッションの共有

  • Redisメッセージサブスクリプション

  • スプリングブート

  • Zuulゲートウェイ

  • Spring Cloud Gatewayゲートウェイ

  • Spring WebSocketによるロングコネクションの処理

  • リボンのロードバランシング

  • マルチプロトコルNIOネットワーク通信フレームワーク「Netty」。

  • Consistent Hash 一貫したハッシュアルゴリズム

ここまで来た人は、上に挙げた技術スタックを理解していると思いますので、まだの人は、ネットで入門用のチュートリアルを探して、勉強することから始めてください。以下は、上記の技術に関連するもので、デフォルトでは誰もが理解しているテーマですが...。

3. 技術的実現可能性分析

以下では、セッションの特徴を説明し、これらの特徴に基づいて分散アーキテクチャでwsリクエストを処理するためのn個のクラスタリングソリューションをリストアップします。

WebSocketSessionとHttpSessionの比較

Springの統合WebSocketでは、ws接続ごとに対応するセッションが存在します。Springの統合WebSocketでは、ws接続を確立した後、WebSocketSessionというセッションがあり、同様の方法でクライアントと通信することができます。

protected void handleTextMessage(WebSocketSession session, TextMessage message) {
   System.out.println("Message received by server: "+ message );
   //send message to client
   session.sendMessage(new TextMessage("message"));
}

 つまり、wsセッションはredisにシリアライズできないので、クラスタではすべてのWebSocketSessionsをredisにキャッシュしてセッションを共有することはできません。Redisはhttpsセッションの共有はサポートできますが、websocketセッションの共有はソリューションがないので、redisのwebsocketセッション共有というルートは実行不可能です。

セッシンキー情報をredisにキャッシュし、クラスタ内のサーバーがredisからセッションキー情報を取得し、Webソケットセッションを再構築することはできないか...と考えている方もいらっしゃるかもしれません。この方法を試せる人がいたら教えてほしいのですが...

これは、websocketセッションとhttpセッション共有の違いです。一般的に、httpセッション共有には解決策があり、それは、依存関係を導入するのと同じくらい簡単です。 spring-session-data-redis spring-boot-starter-redis ウェブからデモを探し、やり方を確認して遊んでみてください。また、ウェブソケットセッション共有ソリューションでは、基盤となるウェブソケットが実装されているため、真のウェブソケットセッション共有を行うことはできません。

4. ソリューションの進化

4.1、NettyとSpring WebSocket

最初の頃、私はnettyを使ってウェブソケットサーバーを実装しようとしました。nettyでは、websocketセッションという概念はなく、チャンネルと呼ばれる、各クライアント接続がチャンネルを表す。フロントエンドからのwsリクエストはnettyがリスンするポートを経由し、wsハンドシェイク接続のためのwebsocketプロトコルを取り、メッセージ処理のためにいくつかのハンドラのシリーズ(非難チェーンモード)を経由する。) を使ってメッセージ処理を行います。ウェブソケットセッションと同様に、接続が確立された後、サーバー側にはチャンネルがあり、そのチャンネルを通じてクライアントと通信することができます

   /**
    * TODO assigns to different groups based on the id passed in by the server
    */
   private static final ChannelGroup GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
 
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
       //retain increases the reference count to prevent the next call from invalidating the reference
       System.out.println("The server received a message from " + ctx.channel().id() + ": " + msg.text());
       // send the message to all channels inside the group, i.e. send the message to the client
       GROUP.writeAndFlush(msg.contain());
   }

で、サーバー側のnettyかspring websocketですが、両実装の長所と短所をいくつか挙げてみますと

4.2. nettyを使用したwebsocketの実装

nettyだけでwebsocketのサーバーサイドを開発すると、確かにスピードは気になりますが、以下のような問題が発生する可能性があります。

  1. システム内の他のアプリケーションとの統合が不便、rpcコールになるとspringcloudのfeignサービスコールの利便性を享受できない。

  2. ビジネスロジックを繰り返し実装しなければならない場合がある

  3. nettyを使用すると、車輪の組み立てを二重に行う必要がある場合がある

  4. サービスレジストリへの接続方法もめんどくさい

  5. restful serviceとws serviceは別々に実装する必要があり、restful serviceをnettyで実装するとなると、どれだけ面倒か想像がつくと思いますが、spring one-stop restful developmentを使って、慣れている方も多いと思います。

4.3. Spring WebSocketを使用したwsサービスの実装

Spring WebSocketはspringbootによく統合されているので、springboot上でwsサービスを開発するのは非常に簡単で、アプローチも非常に簡単です。

4.3.1. ステップ1:依存関係の追加

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

4.3.2. ステップ2:コンフィギュレーションクラスの追加

@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
    registry.addHandler(myHandler(), "/")
        .setAllowedOrigins("*");
}
 
@Bean
 public WebSocketHandler myHandler() {
     return new MessageHandler();
 }
}


4.3.3. ステップ3: メッセージリスナー・クラスの実装

@Component
@SuppressWarnings("unchecked")
public class MessageHandler extends TextWebSocketHandler {
   private List<WebSocketSession> clients = new ArrayList<>();
 
   @Override
   public void afterConnectionEstablished(WebSocketSession session) {
       clients.add(session);
       System.out.println("uri :" + session.getUri());
       System.out.println("Connection established: " + session.getId());
       System.out.println("current seesion: " + clients.size());
   }
 
   @Override
   public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
       clients.remove(session);
       System.out.println("Disconnected: " + session.getId());
   }
 
   @Override
   protected void handleTextMessage(WebSocketSession session, TextMessage message) {
       String payload = message.getPayload();
       Map<String, String> map = JSONObject.parseObject(payload, HashMap.class);
       System.out.println("Received data" + map);
       clients.forEach(s -> {
           try {
               System.out.println("Sent message to: " + session.getId());
               s.sendMessage(new TextMessage("Server returns received message," + payload));
           } catch (Exception e) {
               e.printStackTrace();
           }
       });
   }
}

このデモから、spring websocketを使ってwsサービスを実装することの利便性を想像していただけると思います。Spring Cloudファミリーとよりよく連携するために、私は最終的にspring websocketを使ってwsサービスを実装することにしました。

つまり、1つのアプリケーションがrestfulサービスとwsサービスの両方を担当するという、私のアプリケーション・サービス・アーキテクチャはこのようになります。wsサービスモジュールを分割しなかったのは、サービスコールをするためにfeignを使わなければならなかったからです。第一に、私は怠け者です。第二に、分割するのとしないのとでは、サービス間のio呼び出しのレイヤーが増えるので、私はそれを行いませんでした。

5.ズール技術からスプリングクラウドゲートウェイへの移行

ウェブソケットクラスタリングを実装するために、以下の理由でどうしてもzuulからspring cloud gatewayに移行する必要がありました。

zuul 2.0 は数ヶ月前にオープンソース化されましたが、バージョン 2.0 は spring boot と統合されておらず、ドキュメントも貧弱でした。そのため、移行は必須であり、移行は簡単に実施できます。

gatewayでは、ssl認証とダイナミックルーティングのロードバランシングを実装するため、ここであらかじめピットを選ばないようにymlファイルでの以下のいくつかの設定が必要です。

server:
  port: 443
  ssl:
    enabled: true
    key-store: classpath:xxx.jks
    key-store-password: xxxx
    key-store-type: JKS
    key-alias: alias
spring:
  application:
    name: api-gateway
  cloud:
    gateway:
      httpclient:
        ssl:
          handshake-timeout-millis: 10000
          close-notify-flush-timeout-millis: 3000
          close-notify-read-timeout-millis: 0
          useInsecureTrustManager: true
      discovery:
        locator:
          enabled: true
          lower-case-service-id: true
      routes:
      - id: dc
        uri: lb://dc
        predicates:
        - Path=/dc/**
      - id: wecheck
        uri: lb://wecheck
        predicates:
        - Path=/wecheck/**

httpsオフロードで楽しく遊びたいなら、フィルタも設定する必要があります。そうしないと、ゲートウェイをリクエストしたときに、SSL/TLSレコードでないエラーが発生します。

@Component
public class HttpsToHttpFilter implements GlobalFilter, Ordered {
  private static final int HTTPS_TO_HTTP_FILTER_ORDER = 10099;
  @Override
  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
      URI originalUri = exchange.getRequest().getURI();
      ServerHttpRequest request = exchange.getRequest();
      ServerHttpRequest.Builder mutate = request.mutate();
      String forwardedUri = request.getURI().toString();
      if (forwardedUri ! = null && forwardedUri.startsWith("https")) {
          try {
              URI mutatedUri = new URI("http",
                      originalUri.getUserInfo(),
                      originalUri.getHost(),
                      originalUri.getPort(),
                      originalUri.getPath(),
                      originalUri.getQuery(),
                      originalUri.getFragment());
              mutate.uri(mutatedUri);
          } catch (Exception e) {
              throw new IllegalStateException(e.getMessage(), e);
          }
      }
      ServerHttpRequest build = mutate.build();
      ServerWebExchange webExchange = exchange.mutate().request(build).build();
      return chain.filter(webExchange);
  }
 
  @Override
  public int getOrder() {
      return HTTPS_TO_HTTP_FILTER_ORDER;
  }
}

このようにして、ゲートウェイを使用してhttpsのリクエストをオフロードすることができます。ここまでで、基本的なフレームワークが構築され、ゲートウェイは https と wss の両方のリクエストを転送できるようになりました。次のステップは、多対多のユーザー間でセッションを相互運用するための通信ソリューションです。次に、エレガンスに従って、最もエレガントでない解決策から始めることにします。

6.セッションブロードキャスト

これは、websocketクラスタ通信のための最もシンプルなソリューションです。シナリオは以下の通りです。

教師Aが生徒にグループメッセージを送りたい

  • 先生のメッセージリクエストはゲートウェイに送られ、{I am Teacher A and I want to send xxx message to my students}という内容が含まれています。

  • ゲートウェイはメッセージを受信し、クラスタのすべてのIPアドレスを取得し、教師のリクエストを1つずつ呼び出します。

  • クラスタの各サーバーはリクエストを取得し、教師Aの情報に基づいてその生徒に関連するローカルセッションがあるかどうかを調べ、あればsendMessageメソッドを呼び出し、無ければリクエストを無視します。

セッションのブロードキャストは、実装が簡単ですが、致命的な欠陥があります:無駄な計算力の現象は、サーバーは、計算力のループトラバースを無駄にすることと等しいメッセージ受信者のセッションを持っていない場合、ソリューションは、低連続性の要件の場合には、実装が簡単です優先することができます。

スプリングクラウドのサービスクラスタ内の各サーバの情報を取得する方法は以下の通りです。

@Resource
private EurekaClient eurekaClient;
 
Application app = eurekaClient.getApplication("service-name");
//instanceInfo includes a server ip, port and other messages
InstanceInfo instanceInfo = app.getInstances().get(0);
System.out.println("ip address: " + instanceInfo.getIPAddr());

サーバーは、ユーザーの ID とセッションを対応付ける関係マッピングテーブルを維持し、セッションが作成されたときにマッピング関係をマッピングテーブルに追加し、セッションが切断されたときにマッピングテーブルの関係を削除する必要があります。

7. 一貫したハッシュアルゴリズムの実装(本記事の要点)

この方法は、私が最もエレガントな実装だと考えているものです。この方式を理解するには少し時間がかかりますが、根気よく見ていけば、きっと何かを得ることができると思います。繰り返しになりますが、一貫性のあるハッシュアルゴリズムを理解していない方のために、今度はハッシュリングが時計回りのルックアップであると仮定して、まずこちらを読んでみてください。

まず、一貫したハッシュアルゴリズムの考え方をwebsocketクラスタに適用するために、次のような新しい問題を解く必要があります。

  • クラスターノードがDOWNしていると、状態がDOWNしているノードへのハッシュリングマッピングに影響を与える。

  • クラスターノードUP、古いキーが対応するノードにマッピングされていないノードに影響します。

  • ハッシュリングの読み書きの共有。

クラスタでは、常にサービスのUP/DOWN問題が発生します。

ノードDOWNの問題の解析は以下の通りです。

サーバーがDOWNすると、それが所有するウェブソケットセッションは自動的に接続を閉じ、フロントエンドに通知されます。これは、ハッシュリングのマッピングエラーに影響します。サーバーのDOWNをリッスンする際に、ハッシュリング上の実ノードと仮想ノードを削除して、ゲートウェイがDOWN状態のサーバーに転送されるのを回避すればよいのです。

実装する。eureka governance centerでクラスタサービスのDOWNイベントをリッスンし、時間差でハッシュリングを更新する。

ノードUPの問題の解析は以下の通りである。

<ブロッククオート

クラスタ内のサービスであるCacheBがオンラインであり、そのサーバーのIPアドレスがkey1とcacheAの間にマッピングされているとする。すると、key1に対応するユーザはメッセージを送信するために毎回CacheBに走りますが、CacheBにはkey1に対応するセッションがないため、当然ながらメッセージは送信されません。

この時点で、2つの解決策があります。 

解決策Aはシンプルで、アクションが大きい。

eurekaはノードUPイベントをリッスンした後、既存のクラスタ情報に基づいてハッシュリングを更新します。そして、すべてのセッション接続を切断し、クライアントに再接続させる。このとき、メッセージの不達を回避する方法として、クライアントは更新されたハッシュセッションポイントに接続する。

シナリオBは、小さなアクションを伴う複雑なものです。

まず、仮想ノードがない場合を見てみると、CacheCとCacheAの間でCacheBがオンラインになっていることが想定されます。つまり、CacheB がオンラインになると、CacheC と CacheB の間でメッセージを送信するユーザーに影響を与えることになります。そこで、CacheA から CacheC と CacheB を切断し、クライアントにセッションを再接続させればよい。

次に、薄い色のノードを仮想ノードと仮定した場合です。ある領域マッピングの結果がある Cache に属することを表すために、長い括弧を使っている。まず、C ノードがオンラインでない場合である。この図を理解する必要がある。B のすべての仮想ノードは実際の B ノードを指しているので、B ノードはすべて反時計回りの部分で B にマッピングされる(ハッシュリングは時計回りに見えると指定しているため)。

 次にCノードがオンラインになった様子ですが、ある領域がCに占拠されていることがわかります。

以上のことから、あるノードがオンラインになると、対応する多くの仮想ノードも同時にオンラインになるため、マルチセグメントの範囲キー(上図の赤い部分)に対応するセッションを切断する必要があることが分かります。正確なアルゴリズムは少し複雑で、実装も人によって異なるので、自分で実装してみるとよいでしょう。

ハッシュリングはどこに置けばいいのか?

  • ゲートウェイはローカルでハッシュリングを作成し、維持します。wsリクエストが来ると、ローカルはハッシュリングを取得し、マップされたサーバー情報を取得し、wsリクエストを転送する。この方法は良さそうに見えるが、実はあまり好ましくない。上記のサーバーのDOWNはeurekaだけが聞くことができることを思い出してください。eurekaがDOWNイベントを聞いた後、io経由でゲートウェイに該当ノードを削除するように通知する必要があるでしょうか。明らかにeurekaの責務をゲートウェイに分散させるのは面倒ですし、推奨されません。

  • eurekaを作成し、redis共有のread/writeに入れる。このソリューションは動作します。eurekaはサービスのDOWNをリッスンすると、ハッシュリングを修正してredisにプッシュします。リクエストの応答時間をできるだけ短くするために、wsリクエストを転送するたびにgatewayがredisからハッシュリングをフェッチするようなことはできません。ハッシュリングが変更される確率は非常に低く、ゲートウェイはredisのメッセージ購読パターンを適用し、ハッシュリング変更イベントを購読するだけでこの問題を解決することができます。

 この時点で、私たちのSpring WebSocketクラスタはほぼ構築され、最も重要な部分は一貫性のあるハッシュアルゴリズムです。さて、最後に技術的なボトルネックが一つあります。ws リクエストに基づいて指定されたクラスタサーバにゲートウェイをどのように転送するのでしょうか?

spring cloud gatewayやzuulはデフォルトでribbonをロードバランシングとして統合しているので、wsリクエストを作成する際にクライアントから送られたユーザーIDに基づいてribbonのロードバランシングアルゴリズムを書き換え、ユーザーIDに基づいてハッシュ化し、ハッシュリング上のipを探して、そのIPにwsリクエストを転送すれば完了します。フローは以下の図のようになります。

次にあるユーザーが通信を行うとき、IDを元にハッシュを行い、ハッシュリングで対応するipを取得すれば、そのユーザーとのws接続が確立されたときにどのサーバーにセッションが存在したかがわかります!

8、春の雲 Finchley.RELEASE 版のリボンは完璧ではない 

実際にやってみると、被写体はリボンに2つの不完全な点を発見した・・・・・・。

  • Webで見つけた方法によると、AbstractLoadBalancerRuleを継承して負荷分散ポリシーを書き換えた後、複数の異なるアプリケーションからの要求が混乱するそうです。eurekaにA、Bの2つのサービスがあった場合、ロードバランサーのポリシーを書き換えると、A、Bどちらかのサービスにしかリクエストがマッピングされなくなります。とても不思議ですね。Spring Cloud Gatewayのサイトでは、ロードバランシングポリシーを書き換えるデモをきちんと行う必要があるのかもしれませんね。

  • 一貫したハッシュアルゴリズムでは、ユーザーIDのようなキーが必要で、そのキーに従ってハッシュ化し、ハッシュリングで検索してipに返します。しかし、ribbonはchoose関数のkey引数を洗練するのではなく、dead defaultを書き込むだけなのです!

これはどうしようもないことなのだろうか。実行可能で一時的な代替案があります!

下図のように、クライアントは通常のhttpリクエスト(idパラメータ付き)をゲートウェイに送り、ゲートウェイはidに基づいてハッシュを行い、ハッシュリングからipアドレスを探してクライアントに返し、クライアントはそのipアドレスに基づいてwsリクエストを行います。

ribbonは鍵の扱いが完璧ではないので、今のところribbonにconsistent hashing algorithmを実装することはできません。クライアントからの2つのリクエスト(1つはhttp、もう1つはws)により、間接的に一貫したハッシュを実装することができるのみです。ribbonがすぐにこのバグをアップデートしてくれることを願っています。WebSocketクラスタの実装をもう少しエレガントにしましょう。

9. ポストスクリプト 

 上記は、ここ数日の手探りの結果です。多くの問題に遭遇し、一つずつ解決していき、2つのwebsocketクラスタリングの解決策をリストアップしました。1つ目はセッションブロードキャスト、2つ目は一貫したハッシュ化です。

この記事では、ActiveMQやKarfaなどのメッセージキューを利用してメッセージプッシュを実装するのではなく、メッセージキューに頼らずに複数ユーザー間のロングコネクション通信を簡単な方法で実装したいだけです。異なる考え方を提供できれば幸いです。