PlayとRedisでスケーラブルWebSocket(実装編)
前回の続きです。
実際にPlayとRedisでWebSocketアプリケーションを作成したコードサンプルを示します。
★題材
今回の調査の途中でドンピシャのサンプルを見つけています。
http://www.ryantanner.org/2013/03/using-play-iteratees-and-enumerators.html
PlayにバンドルされているサンプルのチャットをRedis対応したサンプルアプリです。
接続情報を環境変数から取ってるふりして実はlocalhost固定だったとか、チャットルームのメンバリストをローカル管理してるから厳密にはスケーラブルになってないとか、手を入れ始めるといろいろ気にはなったんですが(^^;、WebSocket/Redisのコードサンプルとしては非常に有益で必要十分な内容でした。
記してここに感謝します。
今回の作業のゴールはこれを改良して汎用的に使えるWebSocket/Redisのベースクラスを作成することです。
https://github.com/shunjikonishi/websocketchat-redis
ただし現時点ではそれを切り出して単体のjar/pluginにすることは考えていません。現実にはWebSocket/Redisを使って何かを作る予定は今のところないしPlay、Redis共に開発サイクルが早いので、実際に必要になった時にはAPIが変わっている可能性も高いからです。
ここでは実際に必要になった時のプロトタイプとなり、その考え方や注意点を示せれば十分と思っています。
そんなわけでRedisService.scalaとChatRoom.scalaはかなり丁寧に書きましたが、それ以外(JavaScriptとか)は割と適当です。(^^;
JavaScript版のWebSocketラッパーもそのうち作りたいとは思うんですけどね。。。
★PlayでのWebSocketアプリの作り方
http://www.playframework.com/documentation/2.2.x/ScalaWebSockets
Controllerでリクエストハンドラを作る時にActionの代わりにWebSocketを使って受信データをハンドルするIterateeと送信を実行するEnumeratorを返すようにすればOKです。
以上、終わり(^^;
。。。なんですがちょっとだけIterateeとEnumeratorについても触れておきます。
これらを解説するサイトは日本語のものだけでも、かなり多数ありますが抽象度の高い概念なのでなかなか理解するのは難しいです。というか抽象度の高い議論は途中からついていけません。。。(^^;;;
そうした概念的な話はさておき、WebSocketの文脈に限定して話をするなら、
- Iteratee
クライアントからデータを受信した時と接続が切れた時に何をするかを定義するもの
- Enumerator
クライアントに発信するデータを供給するもの
とだけ理解しておけば十分です。
これ、よく考えるとアプリで必要な機能以外は何ひとつ作らなくて良いインターフェースになってるんですよね。
今までPlay2を触ってきた中でこのWebSocketのAPIが一番いけてると思いました。(^^;
★RedisService
https://github.com/shunjikonishi/websocketchat-redis/blob/master/app/models/RedisService.scala
プロトなので関連するクラスを全部1ファイルにまとめています。
RedisServiceクラスの機能は
- コネクションプール
- WebSocketで使用するPubSubChannelの作成
の二つだけです。
- コネクションプール
基本的にRedisを使う場合はコネクションプールを併用した方が良いです。RedisClientは基本的にソケット繋ぎっぱなので、接続のコストをカットできます。
ただし、RedisClientをSubscriberとして使用する場合は注意が必要です。Subscribe時のRedisClientは別スレッドでSocketの入力を監視しているのでプールに戻すタイミングが難しくなります。
今回はやや強引な方法でSubscriberもプールに戻してますが、シビアにスレッドセーフに気を使う必要があるので、素直にSubscriberとして使用する場合は新しいインスタンスを作成して使い捨てた方が良いと思います。(使い捨てる場合もunsubscribe時にdisconnectすることを忘れてはいけません。)
このプロトでは試行錯誤の過程を残す意味でプールに戻すコードを残していますが、もしこれを本当にライブラリ化するのであればSubscriberでは常に新しいインスタンスを使用するようにします。(そうするとborrowClient/returnClientメソッドが不要になるのでAPI的にもすっきりします。)
- PubSubChannel
WebSocketにひもづけるIteratee/Enumeratorをラップするクラスです。
コンストラクタの必須引数は購読するチャネル名のみです。
インスタンスを作成してメンバ変数のinとoutをWebSocketに返せばそれだけで単純なEchoアプリが作れます。
オプションとして
- send: クライアントから受信した文字列をRedisに送信する前に加工する関数
- receive: Redisから受信した文字列をクライアントに送信する前に加工する関数
- disconnect: クライアント切断時にRedisに送信する文字列を返す関数
などを指定できます。
exception/subscribe/unsubscribeなどのSubscriber関連のイベントハンドリング関数も一応指定できるようになってますが多分使うことはないです。
実装に関して言うと
- Publisher: Actorになっていて都度プールから取ってきたクライアントで送信
- Subscriber: ひとつのクライアントでredis-scalaの内部スレッドにおまかせ
となっています。
WebSocketではinとoutは一対ですが、RedisのPub/SubはWebSocketだけで使うものとは限らないので本来的にはは多対多であり、Publish自体はどこから行っても構わない訳です。
こう考えるとPublisherのActorはシングルトンでも構わない気がしますが、Publishのコストは購読者数に比例する(らしい)のでチャネル毎にわけています。
あとこのクラスではsubscribeするチャネルを一つに限定しています。Redisの機能を考えるとマルチチャネル購読を使いたいケースというのは十分に考えられるわけですが、そうすると
- -> in/outにチャネル名も入れる必要がでてくる
- -> 入出力の型をStringからTuple(String, String)に変更
- -> in/outを直接WebSocketに接続できるというメリットが失われる
ので止めました。
まぁマルチチャネル対応版をサブクラスなりTraitに切りだすなりして対応するのはそんなに難しくないので必要に迫られた時に考えたいと思います。
★ChatRoom
https://github.com/shunjikonishi/websocketchat-redis/blob/master/app/models/ChatRoom.scala
ChatRoomに関して具体的なアプリの内容に関する説明はここではしません。
ここではWebSocket/Redisアプリを作る際のポイントとなる部分についてのみ説明します。
- クラスで状態管理をしてはならない
元々のサンプルではチャットルームのメンバ一覧をクラスのvar変数で管理していますが、複数サーバある場合に状態を共有できないのでアウトです。
状態管理の変数は常にRedis上に置く必要があります。
RedisにはListやSetを扱う機能があるのでとても重宝します。(^^;
- WebSocketとPubSubChannelは1対1とは限らない
WebSocket/Redisアプリを作る一番簡単な方法は先に作成したPubSubChannel(のin/out)を直接WebSocketにひもづけることです。
状態管理をすべてクライアント側で行うアプリであれば、サーバ側には一切ロジックを記述する必要がありません。(このサンプルはそうなっていませんが、チャットはそういうやり方でも作成できます。)
ですが、それだと接続毎にRedisのコネクションを消費することになるので無駄が多いわけです。
チャットの場合同じ部屋にいるユーザが購読するRedisチャネルは皆同じなのでホストで一つチャネルを開いてそれを共有できれば大幅にコネクションを節約できます。
実際、PubSubChannelのoutは複数のWebSocketで共有することができます。
ですがinはダメです。何故ならこのIterateeではクライアント切断時にチャネルをクローズしているからです。つまり1人のユーザが退出したら全ユーザのWebSocketが切れます。
しかし、前述の通りPublishはどこから行っても構わないのでPubSubChannelのinをそのまま使う必要はなく、自前のIterateeを使うことができます。
この時問題となるのはRedisの接続管理です。
- いつunsubscribeするか?
今でしょ!なわけないですよ。(^^;
論理的にはWebSocket接続数を数えておいて、0になったらunsubscribeすれば良いのですが、どこでcountUp/Downするかも悩ましかったりするわけです。考慮漏れがあると即リソースリークになるので。。。(--
で。。。結論なんですがこれは変に考えすぎず、クライアントからの切断にのみ反応するようにするべきと思います。具体的にはIteratee#Doneの中。
WebSocketの切断が発生するのは多くの場合ブラウザの画面遷移やクライアントの電源断(タブレット等ののスリープ含む)です。
これらはいつ発生するかはわからないので変に状態管理するよりも確実にそれだけを捕まえるのが良いと思います。
直観ですが、WebSocketアプリではサーバーサイドからの切断もあんまりやらない方が良い気がします。
- どこで接続管理を行うか?
これは絶対にActorを建ててそこでのみ行う必要があります。
IterateeはWebリクエストのワーカースレッド内で動くので、スレッドセーフにするためにはcountUp/Downを伴う操作は同じActorの中からしか実行してはいけません。
なのでIteratee#Done内で実行するDisconnet操作もActorにDisconnectメッセージを投げるだけとなっています。
□□□□
ここまでたどり着くのに相当試行錯誤しており、AkkaのActorにも慣れてないのでイマイチ実装に確信を持ててないんですが、概ね抑えるべきところは抑えたつもりです。
何かおかしなところがあれば是非ご連絡を!
★Playの残念なお知らせ
さて最後に今回気がついたPlayframeworkの衝撃の事実を。。。。(--
Playで修正/リコンパイルを繰り返している時には、修正前に使用されていたインスタンスは回収されません。。。マジでかーーーー!!
前回書いた通り、Redisの最大接続数=10の状態で作業しているとほんの数回修正を行っただけで接続エラー。。。。
その度にPlayの再起動を繰り返すのはなかなか苦痛でした。(最初はPlayのせいとは思わず自分かredis-scalaのリソースリークを疑っていたのでなおさら。。。(--)
Global#onStopでcloseすれば大丈夫かと思ったんですが、ここのロジックが実行されるのはリコンパイルが行われた後っぽい。。。意味ねーーーー!!!
シングルトン(object)とか、どうなってるのかと思うけどこれも単なるクラスのインスタンスなのでポインタ変わって新しいインスタンスが作られるだけなのかなと思ったり。
唯一有効だったのはActorのpostStopにcloseを仕込んで置くことだけど、これも常に有効かどうかは疑わしい。。。(上の推測通りシングルトンのアドレスが変わるのであればアウトだと思う。)
修正前のインスタンスが残るのは仕方ない気がするけど、せめてonStopはリコンパイル前に走って欲しい。。。(--
□□□□
以上、こんだけ書いておけば将来ホントにライブラリ化する日が来たとしても、ちょっとこれ作った奴出てこい!とか思わずに済むでしょう。。。多分。。。(^^;
次回、「HerokuでスケーラブルWebSocket」(かもしれない)
コメント