GParsでリモート処理
はじめに
これはG*アドベントカレンダーの第21日目の記事です。
今回は、GPars 1.3.0から追加される(であろう)機能の一つ、リモート処理(remoting)をご紹介します。
この記事では、次の環境で動作をさせています。
OS: OSX 10.9.5 Java: 1.8.0_25 Groovy: 2.4.0-beta-4
GParsとは
GParsとは、JavaやGroovyで並行処理を行うためのフレームワークです。12月21日時点では1.2.1が安定版のバージョンで、1.3.0はスナップショット版となっています。
GPars自体はGroovyに同梱されており、2.3.9、2.4.0-beta-4とも安定版の1.2.1が同梱されています。
GParsのリモート処理
GParsが元々持っている
- Actor
- Agent
- Dataflow
は同じJVM上で動作させることになりますが、GParsのリモート処理とはこれらを別JVM上で動作せることが可能となります。これにより、別JVM上のActorにメッセージを送ったり、応答を受け取ったりすることができるようになります。
同じホスト上の別JVM上だけでなく、別ホスト上のJVM上でも動作させることが可能となっています。
なお、メッセージのやりとりについては、ベースにNettyを使って実現しているようです。
GPars 1.3.0のjarのビルド
GPars 1.3.0はまだ開発中なので、スナップショット版のjarファイルをリポジトリから取得するか、自分でビルドする必要があります。
公式ページからたどると、スナップショット版のjarファイルをリポジトリから取得できる記載がありますが、リポジトリに上がっているjarファイルにはリモート処理用のclassファイルが含まれていないようでしたので、今回は自分でビルドすることにします。
GitHubからソースを落とし、
git clone https://github.com/GPars/GPars.git
ソースディレクトリ内のgradlewコマンドでビルドします。
cd GPars ./gradlew
私の環境ではtestの途中で先に進まなかったので、control+Cで中断しjarファイルだけコピーすることにしました。
cloneしたディレクトリの下に「build/libs/gpars-1.3-SNAPSHOT.jar」ができていますので、${GROOVY_HOME}/lib/gpars-1.2.1.jarを削除して、代わりに「build/libs/gpars-1.3-SNAPSHOT.jar」を${GROOVY_HOME}/libにコピーします。
あと、先程も述べたとおりNettyを使っていますので、Mavenリポジトリからnetty-all-4.0.24.Final.jarを取得し${GROOVY_HOME}/libにコピーします。
リモート処理のサンプルコード
cloneしたディレクトリの下に「src/test/groovy/groovyx/gpars/samples/remote/」というディレクトリがあり、このディレクトリ配下のソースがリモート処理のサンプルコードとなっています。
ここからは、actor/calculatorのサンプルコードを使って、通常のActorとリモート処理のActorがどのように違うのか、を見てみたいと思います。
calculatorのサンプルコードについて
calculatorのサンプルコードですが、次のような内容になっています。
- 計算を依頼し結果を表示するActor(A)と、実際に計算をするActor(B)の、2つのActorを用意。
- AからBへは、2つの数値をメッセージとして順に送信する。
- Bは受け取った2つのメッセージを加算し、計算結果としてAに返す。
- 計算結果を受け取ったAは、標準出力に計算結果を出力する。
従来のActorでのコード
従来のActorを使って実現すると、こんな感じのコードになります。
// LocalCalculator.groovy package groovyx.gpars.samples.remote.actor.calculator import groovyx.gpars.actor.Actors def answerActor = Actors.actor { println "Calculator - Answer" react { a-> // (*a) react { b-> // (*b) reply a + b // (*c) } } } def queryActor = Actors.actor { println "Calculator - Query" answerActor << 1 // (*d) answerActor << 2 // (*e) react { println it } // (*f) } [answerActor, queryActor]*.join()
計算をするanswerActorと、計算を依頼するqueryActorの、2つのActorのオブジェクトを生成し、動作させます。
上記のコードでは、queryActorから1, 2という数値をメッセージとして送信し(*d, *e)、2つのメッセージ受け取った(*a, *b)answerActorではそれらのメッセージを数値として加算し、その結果を返します(*c)。結果を受け取ったqueryActorは、その結果を標準出力に出力します(*f)。
これら2つのActorは、同じJVM上で動作することになります。
従来のActorの実行結果
上記のコードを実行してみます。
groovy LocalCalculator.groovy
実行すると、次の値が標準出力に出力されます。
Calculator - Query Calculator - Answer 3
リモート処理のActorのコード : 計算をする側
リモート処理のActorのコードですが、計算をする側と計算を依頼する側にコードがわかれます。
計算をする側はこんな感じのコードになります。
// RemoteCalculatorAnswer.groovy package groovyx.gpars.samples.remote.actor.calculator import groovyx.gpars.actor.Actors import groovyx.gpars.actor.remote.RemoteActors def HOST = "localhost" def PORT = 9000 def remoteActors = RemoteActors.create() // (*a) remoteActors.startServer HOST, PORT // (*b) def answerActor = Actors.actor { println "Remote Calculator - Answer" remoteActors.publish delegate, "remote-calculator" // (*c) react { a-> react { b-> reply a + b } } } answerActor.join() remoteActors.stopServer() // (*d)
まず、リモート処理用のコンテキストを作成します(*a)。次に、localhostの9000ポートでメッセージ受信を開始します(*b)。
実際にメッセージを受信し処理をするコードは従来のActorと基本的には同じですが、"remote-calculator"という名前でメッセージを受信できるようにパブリッシュ処理を別途行います(*c)。
処理の最後に、コンテキストを終了させます(*d)。
リモート処理のActorのコード : 計算を依頼する側
計算を依頼する側はこんな感じのコードになります。
// RemoteCalculatorQuery.groovy package groovyx.gpars.samples.remote.actor.calculator import groovyx.gpars.actor.Actors import groovyx.gpars.actor.remote.RemoteActors def HOST = "localhost" def PORT = 9000 def remoteActors = RemoteActors.create() // (*a) def queryActor = Actors.actor { println "Remote Calculator - Query" def remoteCalculator = remoteActors.get HOST, PORT, "remote-calculator" get() // (*b) remoteCalculator << 1 // (*c) remoteCalculator << 2 // (*d) react { println it } // (*e) } queryActor.join()
まず、リモート処理用のコンテキストを作成します(*a)。次に、localhostの9000ポート、"remote-calculator"という名前のActorに対してメッセージを送信できるよう、Proxyオブジェクトを取得します(*b)。
取得したProxyオブジェクトに対しメッセージを送信することで、Proxyオブジェクトを介して別JVM上のActorにメッセージを送信することができます(*c, *d)。
最後に、別JVM上のActorからメッセージを受信し標準出力に出力します(*e)。
リモート処理のActorの実行結果
上記のコードを実行してみます。まず、計算する側のコードを実行します。
groovy RemoteCalculatorQuery.groovy
実行すると、次の値が標準出力に出力されます。
Remote Calculator - Answer
次に、計算を依頼する側のコードを先程とは別のターミナルで実行します。
groovy RemoteCalculatorQuery.groovy
実行すると、次の値が標準出力に出力されます。一見すると同じJVM上で動作しているようにも見えますが、別のJVM上で計算した結果を受け取って表示しています。
Remote Calculator - Query 3
なお、私の環境では、次の警告メッセージとスタックトレースが標準エラー出力に出力されました。
12 22, 2014 1:00:11 午前 io.netty.channel.DefaultChannelPipeline$TailContext exceptionCaught 警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception. java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:192) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:446) at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881) at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) at java.lang.Thread.run(Thread.java:745)
最後に
GParsのリモート処理を使うことで、他サーバのActorと割とお手軽に連携する可能になります。また、今回は紹介できませんでしたが、AgentもDataflowについても、Actorと基本的に同じように使うことができると思います。
他サーバと処理を連携させるコードを書く場合などに使ってみるのはいかがでしょうか。
早く1.3.0がリリースされることを希望します。