GParsでリモート処理

はじめに

これはG*アドベントカレンダーの第21日目の記事です。

今回は、GPars 1.3.0から追加される(であろう)機能の一つ、リモート処理(remoting)をご紹介します。

この記事では、次の環境で動作をさせています。

OS: OSX 10.9.5
Java: 1.8.0_25
Groovy: 2.4.0-beta-4

GParsとは

GParsとは、JavaやGroovyで並行処理を行うためのフレームワークです。12月21日時点では1.2.1が安定版のバージョンで、1.3.0はスナップショット版となっています。

GPars自体はGroovyに同梱されており、2.3.9、2.4.0-beta-4とも安定版の1.2.1が同梱されています。

GParsのリモート処理

GParsが元々持っている

  • Actor
  • Agent
  • Dataflow

は同じJVM上で動作させることになりますが、GParsのリモート処理とはこれらを別JVM上で動作せることが可能となります。これにより、別JVM上のActorにメッセージを送ったり、応答を受け取ったりすることができるようになります。

同じホスト上の別JVM上だけでなく、別ホスト上のJVM上でも動作させることが可能となっています。

なお、メッセージのやりとりについては、ベースにNettyを使って実現しているようです。

GPars 1.3.0のjarのビルド

GPars 1.3.0はまだ開発中なので、スナップショット版のjarファイルをリポジトリから取得するか、自分でビルドする必要があります。

公式ページからたどると、スナップショット版のjarファイルをリポジトリから取得できる記載がありますが、リポジトリに上がっているjarファイルにはリモート処理用のclassファイルが含まれていないようでしたので、今回は自分でビルドすることにします。

GitHubからソースを落とし、

git clone https://github.com/GPars/GPars.git

ソースディレクトリ内のgradlewコマンドでビルドします。

cd GPars
./gradlew

私の環境ではtestの途中で先に進まなかったので、control+Cで中断しjarファイルだけコピーすることにしました。

cloneしたディレクトリの下に「build/libs/gpars-1.3-SNAPSHOT.jar」ができていますので、${GROOVY_HOME}/lib/gpars-1.2.1.jarを削除して、代わりに「build/libs/gpars-1.3-SNAPSHOT.jar」を${GROOVY_HOME}/libにコピーします。

あと、先程も述べたとおりNettyを使っていますので、Mavenリポジトリからnetty-all-4.0.24.Final.jarを取得し${GROOVY_HOME}/libにコピーします。

リモート処理のサンプルコード

cloneしたディレクトリの下に「src/test/groovy/groovyx/gpars/samples/remote/」というディレクトリがあり、このディレクトリ配下のソースがリモート処理のサンプルコードとなっています。

ここからは、actor/calculatorのサンプルコードを使って、通常のActorとリモート処理のActorがどのように違うのか、を見てみたいと思います。

calculatorのサンプルコードについて

calculatorのサンプルコードですが、次のような内容になっています。

  • 計算を依頼し結果を表示するActor(A)と、実際に計算をするActor(B)の、2つのActorを用意。
  • AからBへは、2つの数値をメッセージとして順に送信する。
  • Bは受け取った2つのメッセージを加算し、計算結果としてAに返す。
  • 計算結果を受け取ったAは、標準出力に計算結果を出力する。

従来のActorでのコード

従来のActorを使って実現すると、こんな感じのコードになります。

// LocalCalculator.groovy

package groovyx.gpars.samples.remote.actor.calculator

import groovyx.gpars.actor.Actors

def answerActor = Actors.actor {
    println "Calculator - Answer"

    react { a->            // (*a)
        react { b->        // (*b)
            reply a + b    // (*c)
        }
    }
}

def queryActor = Actors.actor {
    println "Calculator - Query"

    answerActor << 1    // (*d)
    answerActor << 2    // (*e)

    react { println it }      // (*f)
}

[answerActor, queryActor]*.join()

計算をするanswerActorと、計算を依頼するqueryActorの、2つのActorのオブジェクトを生成し、動作させます。

上記のコードでは、queryActorから1, 2という数値をメッセージとして送信し(*d, *e)、2つのメッセージ受け取った(*a, *b)answerActorではそれらのメッセージを数値として加算し、その結果を返します(*c)。結果を受け取ったqueryActorは、その結果を標準出力に出力します(*f)。

これら2つのActorは、同じJVM上で動作することになります。

従来のActorの実行結果

上記のコードを実行してみます。

groovy LocalCalculator.groovy

実行すると、次の値が標準出力に出力されます。

Calculator - Query
Calculator - Answer
3

リモート処理のActorのコード : 計算をする側

リモート処理のActorのコードですが、計算をする側と計算を依頼する側にコードがわかれます。

計算をする側はこんな感じのコードになります。

// RemoteCalculatorAnswer.groovy

package groovyx.gpars.samples.remote.actor.calculator

import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.remote.RemoteActors

def HOST = "localhost"
def PORT = 9000

def remoteActors = RemoteActors.create()    // (*a)
remoteActors.startServer HOST, PORT           // (*b)

def answerActor = Actors.actor {
    println "Remote Calculator - Answer"

    remoteActors.publish delegate, "remote-calculator"    // (*c)

    react { a->
        react { b->
            reply a + b
        }
    }
}

answerActor.join()

remoteActors.stopServer()    // (*d)

まず、リモート処理用のコンテキストを作成します(*a)。次に、localhostの9000ポートでメッセージ受信を開始します(*b)。

実際にメッセージを受信し処理をするコードは従来のActorと基本的には同じですが、"remote-calculator"という名前でメッセージを受信できるようにパブリッシュ処理を別途行います(*c)。

処理の最後に、コンテキストを終了させます(*d)。

リモート処理のActorのコード : 計算を依頼する側

計算を依頼する側はこんな感じのコードになります。

// RemoteCalculatorQuery.groovy

package groovyx.gpars.samples.remote.actor.calculator

import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.remote.RemoteActors

def HOST = "localhost"
def PORT = 9000

def remoteActors = RemoteActors.create()    // (*a)

def queryActor = Actors.actor {
    println "Remote Calculator - Query"

    def remoteCalculator = remoteActors.get HOST, PORT, "remote-calculator" get()    // (*b)

    remoteCalculator << 1    // (*c)
    remoteCalculator << 2    // (*d)

    react { println it }    // (*e)
}
queryActor.join()

まず、リモート処理用のコンテキストを作成します(*a)。次に、localhostの9000ポート、"remote-calculator"という名前のActorに対してメッセージを送信できるよう、Proxyオブジェクトを取得します(*b)。

取得したProxyオブジェクトに対しメッセージを送信することで、Proxyオブジェクトを介して別JVM上のActorにメッセージを送信することができます(*c, *d)。

最後に、別JVM上のActorからメッセージを受信し標準出力に出力します(*e)。

リモート処理のActorの実行結果

上記のコードを実行してみます。まず、計算する側のコードを実行します。

groovy RemoteCalculatorQuery.groovy

実行すると、次の値が標準出力に出力されます。

Remote Calculator - Answer

次に、計算を依頼する側のコードを先程とは別のターミナルで実行します。

groovy RemoteCalculatorQuery.groovy

実行すると、次の値が標準出力に出力されます。一見すると同じJVM上で動作しているようにも見えますが、別のJVM上で計算した結果を受け取って表示しています。

Remote Calculator - Query
3

なお、私の環境では、次の警告メッセージとスタックトレース標準エラー出力に出力されました。

12 22, 2014 1:00:11 午前 io.netty.channel.DefaultChannelPipeline$TailContext exceptionCaught
警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
        at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:446)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)

最後に

GParsのリモート処理を使うことで、他サーバのActorと割とお手軽に連携する可能になります。また、今回は紹介できませんでしたが、AgentもDataflowについても、Actorと基本的に同じように使うことができると思います。

他サーバと処理を連携させるコードを書く場合などに使ってみるのはいかがでしょうか。

早く1.3.0がリリースされることを希望します。