ある時点でのユーザ接続数を追跡するアプリケーションを実装してきました。現在接続しているユーザー数を追跡する機能を追加してみましょう。
現在接続している全てのユーザ番号のリストを保持するキーを決めて、ユーザが切断した際にそのキーからユーザ番号の要素を削除します。しかし、すぐに課題が生まれます。二人のユーザが正確に同時に開始した場合、どのようにお互いの更新を上書きして無効にしてしまわないように保障すればようでしょうか?やってみましょう。
APIには、append
と呼ばれるメソッドが含まれています。それはチェックアンドセットの略であるCASという特別な値を利用します、この値はどこから取得するのでしょう?getsというメソッドが別にあります。これはappend操作を実行する際に必要になるCAS値を取得することが出来るオブジェクトを返します。appendメソッドについてもう一つ興味深いのは、Future
<Boolean>を返す、非同期メソッドであるということです。操作が正常か失敗したかを確かめるためにこの戻り値を使って待つことが出来ます。非同期のメソッドは結果を待つことなく他の処理を実行できます。後のコードでは、future変数を利用して操作の実行結果を取得することが出来ます。
このチュートリアルではappend
メソッドを利用します。append
が文字列をキャッシュ内の値の末尾に追記するのに対し、prepend
は文字列をキャッシュ内の値の先頭に挿入する点を除いて、prepend
メソッドの機能はappend
と全く同じです。
append
とprepend
メソッドはどちらもアトミックで、キャッシュ内の1つの値に対する操作が完了したら次を実行するように、1つずつ実行します。値の中間にappendあるいはprependすることはできません。もちろん、これらの操作の絶対的な実行順序は保障されません。
registerメソッド内でボールド表記になっているコードを変更する必要があります。
private static String getUserNameToken() { return String.format("<User-%d>", userId); } private static boolean register() { userId = client.incr("UserId", 1, 1); System.out.println("You are user " + userId + "."); CASValue<Object> casValue = client.gets("CurrentUsers"); if (casValue == null) { System.out.println("First user ever!"); try { client.set("CurrentUsers", Integer.MAX_VALUE, getUserNameToken()).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } else { Future<Boolean> appendDone = client.append(casValue.getCas(), "CurrentUsers", getUserNameToken()); try { if (appendDone.get()) { System.out.println("Registration succeeded."); } else { System.out.println("Sorry registration failed."); return false; } } catch (InterruptedException e) { e.printStackTrace(); return false; } catch (ExecutionException e) { e.printStackTrace(); return false; } } userCount = client.incr("UserCount", 1, 1); System.out.println("There are currently " + userCount + " connected."); return true; }
まず、casValue
を取得するために、client.gets("CurrentUsers")
メソッドを実行しています。casValueがnullの場合、まだ誰も接続していないということで、このユーザが最初のユーザになります。よって、単純にCurrentUsers
の値をnew
getUserNameToken()
メソッドを利用してセットします。
null以外の場合、ユーザIDをユーザのリストにappendします。これを行うには、casValue
内のCASをgetCas()
メソッドにより取得し、appendメソッドに指定して実行する必要があります。appendメソッドは非同期であり、Future
<Boolean>を返します。そのfutureに対してget()
メソッドを実行すると、その操作が完了した際にその値が返却されます。例えば、ユーザ名のリストのインスタンスサイズが、キャッシュ内の値の最大サイズを超えた場合など、append操作は失敗することがあります。これらのどちらの場合も処理します。get()メソッドがtrueを返す場合は、登録が成功したことをユーザーに伝えます。falseの場合は登録が失敗します、そしてユーザにその旨を伝え、メインプログラムになにか問題が起きたことを伝えるためにfalseを返します
register
メソッドがfalseを返した場合に対応するため、main
メソッドも変更する必要があります。
try { connect(serverAddress); if (register()) { unregister(); } client.shutdown(1, TimeUnit.MINUTES); } catch (IOException e) { e.printStackTrace(); }
さて、ユーザが去った場合にユーザリストをクリーンアップする実装が必要です。unregister
メソッドを修正して、終了前に注意深く現在のユーザIDをCurrentUsersリストから削除しましょう。これは分散キャッシュにおいて、潜在的に危険な操作です。なぜなら二人以上のユーザが同時にアプリケーションを終了し、ユーザリストへの更新を上書きしてしまう可能性があるためです。分散環境でのクリティカルなセクションを強制的に行うトリックを利用します。
private static void unregister() { try { // Wait for add to succeed. It will only // succeed if the value is not in the cache. while (!client.add("lock:CurrentUsers", 10, "1").get()) { System.out.println("Waiting for the lock..."); Thread.sleep(500); } try { String oldValue = (String)client.get("CurrentUsers"); String userNameToken = getUserNameToken(); String newValue = oldValue.replaceAll(userNameToken, ""); client.set("CurrentUsers", Integer.MAX_VALUE, newValue); } finally { client.delete("lock:CurrentUsers"); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } client.decr("UserCount", 1); System.out.println("Unregistered."); }
一度に1つのアプリケーションでのみCurrentUserを更新できるようにするため、特定のキーに対する値が存在しなかった場合のみ、client.add()
メソッドが成功するという機能を利用します。10秒の有効期限付きで、lock:CurrentUsers
を呼び出します。add出来ない場合、コードは500ミリ秒スリープして、リトライを試みます。
プロトコルで定義されている有効期限は、API用のjavadocで次のように記載されています:
Unix時間(1970年1月1日からの秒数、32ビット値)、あるいは現在の時間からの秒数を指定できます。後者の場合、60*60*24*30(30日間)を超える数値は設定できません。もしクライアントがこれを超える値を送信した場合、サーバはUnix時間として扱います。
追加が成功したら、try/finallyブロックに到達し、実際にCurrentUsersの値を取得し変更を行い、現在のユーザトークンを空文字列に置換します。そして設定し直します。finallyブロックでは、ロックがclient.delete()
メソッドにより削除されていることがわかります。これはCouchbaseからキーを削除し、unregisterを待っている他のクライアントがクリティカルセッションに入ることを許可します。
ユーザが入力したメッセージを取得し、出力するスレッドを記述して、チュートリアルアプリケーションの機能を完成させましょう。
最初のクラスに次のメンバ変数を追加します:
private static Thread messageThread;
次に、再びmain
メソッドを変更し、次の太字の行を追加します:
if (register()) { startMessageThread(); processInput(); unregister(); messageThread.interrupt(); }
続いて、いくつかのヘルパーメソッドを追加しましょう、まずは:
private static void printMessages(long startId, long endId) { for (long i = startId; i <= endId; i++) { String message = (String)client.get("Message:" + i); if (message != null) System.out.println(message); } }
このメソッドはメッセージ番号セットでループ処理し、画面にメッセージを出力します。Couchbaseはキーでのイテレーション処理ができませんが、問題ありません、ここではキーがどういったパターンになるのか完全に分かっているので、このように処理できます。
次はキャッシュ内の、有効期限が切れていない最も古いメッセージを探すためのヘルパーメソッドで、最後のメッセージから開始し最初のメッセージへ戻るように処理します。最終的に最初のメッセージを見つけ、それが最も古いメッセージとしてその番号を返します。正しい番号を返すために最終行で+1しています。
private static long findFirstMessage(long currentId) { CASValue<Object> cas = null; long firstId = currentId; do { firstId -= 1; cas = client.gets("Message:" + firstId); } while (cas != null); return firstId + 1; }
最後に、全てのメッセージを登録される毎に出力するメソッドを記述します。やや複雑なため、後で詳細を解説します。
private static void startMessageThread() { messageThread = new Thread(new Runnable() { public void run() { long messageId = -1; try { while (!Thread.interrupted()) { CASValue<Object> msgCountCas = client.gets("Messages"); if (msgCountCas == null) { Thread.sleep(250); continue; } long current = Long.parseLong((String)msgCountCas.getValue()); if (messageId == -1) { printMessages(findFirstMessage(current), current); } else if (current > messageId) { printMessages(messageId + 1, current); } else { Thread.sleep(250); continue; } messageId = current; } } catch (InterruptedException ex) { } catch (RuntimeException ex) { } System.out.println("Stopped message thread."); } }); messageThread.start(); }
このコードは新しい実行スレッドを生成し、messageThread
変数に代入します。run()
メソッドを実装した匿名Runnable
クラスを作成します。
messageId
変数はwhileループ処理において、いつが初回か分かるような標識値に設定されています。スレッドが中断されるまでwhileループは繰り返されます。
まず、whileループでは、client.gets("Messages")
は値が存在しない場合 null
を返します(この場合ループは少しの間スリープし、ループの先頭に戻り処理を続けます)、値が存在する場合、
gets
メソッドは現在のメッセージIDを取得するために利用できる
CASValue
<Object>
インスタンスを返します。
この時、ループ処理の初回の場合(messageId
==
-1
)、最初から現在までの全てのメッセージを出力する必要があります。
一方、現在のmessageIdが前回の値より大きい場合、いくつかの新規メッセージが最後にチェックしてから追加されたことを意味するので、それらを出力します。
上記以外は、最後にチェックしてから何も変化が無いので単純に少しスリープします。
ループの最後に、現在のメッセージIDを次のループ処理のために記憶しておきます。発生した例外をキャッチし、while ループを抜けるとメッセージスレッドが停止した旨のメッセージを表示します。
実際にはメソッドの最後にスレッドを開始しています。
素晴らしい、これでメッセージが登録されれば、表示することができます。また、このアプリケーションの初回起動時にはキャッシュ内に保存されている全てのメッセージが表示されます。次はユーザがキャッシュを操作するためのメソッドを実装する必要があります。
private static void processInput() { boolean quit = false; System.out.println("Enter text, or /who to see user list, or /quit to exit."); do { String input = System.console().readLine(); if (input.startsWith("/quit")) { quit = true; } else if (input.startsWith("/who")) { System.out.println("Users connected: " + client.get("CurrentUsers")); } else { // Send a new message to the chat long messageId = client.incr("Messages", 1, 1); client.set("Message:" + messageId, 3600, getUserNameToken() + ": " + input); } } while (!quit); }
このメソッドは do/while ループの終了判定に quit 変数を利用し、ユーザに向けていくつかの単純なインストラクションを表示します。
コンソールから一行ずつ読み込まれ、各行をコマンドで始まるかチェックします。ユーザが '/quit' と入力した場合、quit フラグが立ち、ループは終了します。
ユーザが '/who'と入力した場合、キャッシュされている CurrentUsers の値がスクリーンに表示されます。ユーザは今、誰がオンラインなのか、いつでもチェックできます。
それ以外の場合は、行はメッセージとして扱われます。ここでは、Messages
のキーをインクリメントし、メッセージIDとしてその値を使用します。その後、
Message:messageId
をキー、1時間のタイムアウト、ユーザ名とユーザの入力値を引数として、client.set()
メソッドが実行されます。
これらのキャッシュに対する変更をメッセージスレッドが検知し、画面に出力します。もちろん各ユーザが自分で入力したメッセージも画面に表示されます。
コンパイルしたプログラムを複数のターミナルで実行すると、自分自身と会話出来ます。プログラムがちゃんと動作するのを見るだけでも面白いですよね?あなたは非常に優秀なプログラマということです!