説明

サーバ装置、データ順序保証プログラム、およびデータ順序保証方法

【課題】クエリの移動に伴うオーバーヘッドを抑制する。
【解決手段】停止部14は複数のノード11に対するデータの送信を所定期間毎に停止させる。取得部15はデータの送信を停止させた場合に複数のノード11からクエリの移動に伴い更新されるルーティングテーブル13のバージョンを取得する。比較部16は取得した複数のノード11のルーティングテーブル13のバージョンを比較する。更新部17は比較の結果、古いバージョンのルーティングテーブル13が記憶されたノード11がある場合、当該ノード11のルーティングテーブル13を更新する。

【発明の詳細な説明】
【技術分野】
【0001】
本発明は、サーバ装置、データ順序保証プログラム、およびデータ順序保証方法に関する。
【背景技術】
【0002】
近年、様々な対象から刻々と収集される多数のデータを並列に処理する技術として複合イベント処理(CEP(Complex Event Processing))が知られている。この複合イベント処理では、受信したデータに対してイベントを検出し、検出したイベントに関する処理が実行される。なお、複合イベント処理は、ESP(Event Stream Processing)と呼ぶ場合もあるが、ここでは、ESPも包含してCEPと総称する。
【0003】
複合イベント処理を行うCEPシステムでは、一時的に大量の受信データを処理する場合があり、処理負荷が高くなり、処理性能が低下する場合がある。また、CEPシステムは、データをリアルタイムで処理するシステムであるために常時稼働が期待され、停止が許容されない場合がある。
【0004】
そこで、CEPシステムでは、クラウド等に用いられる柔軟な資源割り当てが可能なシステム技術を用いて、処理負荷の変動に応じて複数のサーバや仮想マシン(VM:Virtual Machine)に処理の分散が行われる。例えば、処理負荷が高いサーバや仮想マシンに配置されたクエリと呼ばれる処理要求文やクエリに付随するデータを処理要素として他のサーバや仮想マシンに移動させて処理の分散を行う。
【0005】
しかし、CEPシステムでは、処理要素の移動が発生した場合、データの順序が入れ替わる可能性がある。例えば、データの送信元サーバをAとし、処理要素の移動元サーバをBとし、処理要素の移動先サーバをCとする。送信元サーバAがデータを移動元サーバBへ送信し、処理要素が移動元サーバBから移動先サーバCへ移動する場合を想定する。処理要素が移動元サーバBから移動先サーバCへ移動するタイミングで送信元サーバAから移動元サーバBへ転送されたデータは、移動元サーバBに到着する。移動元サーバBは、処理要素が移動先サーバCへ移動したため、到着したデータを移動先サーバCへ転送する。移動先サーバCは、処理要素の移動が完了すると送信元サーバAへ処理要素の移動完了の通知を行う。送信元サーバAは、移動完了の通知以降、データを移動先サーバCへ直接送信する。このような場合、送信元サーバAから移動先サーバCへ直接送信されたデータが、送信元サーバAから移動元サーバBを介して移動先サーバCへ転送されたデータよりも先に移動先サーバCに到着する可能性がある。
【0006】
このように、データの順序が入れ替わると、データの順序により示される意味が変わる場合がある。例えば、家屋の玄関ドアの鍵の解錠を検知するドアセンサと、室内の人を検知する人感センサを設け、ドアセンサと人感センサから送信されるデータを監視する防犯システムをCEPシステムにより実現した場合を考える。この場合、ドアセンサによる鍵の開閉の検知と人感センサによる室内での動作の検知のデータの順序が入れ替わると意味が変わる。例えば、鍵の解錠が検知された後に、室内で物体が検知された場合、家主の帰宅と考えられる。一方、室内で物体が検知された後に、鍵の解錠が検知された場合、窃盗犯の逃走と考えられる。
【0007】
そこで、処理要素の移動が発生した場合でもデータの順序を保つ技術が提案されている。この技術では、例えば、CEPシステムを管理するコントローラが、処理要素の移動元サーバと移動先サーバを決定すると、移動要求を移動元サーバと移動先サーバに送信する。移動元サーバは、移動元サーバにデータを送信する全ての送信元にpause要求を送り、pause要求に対するackを待つことで、送信元でデータの送信をlockさせ、移動対象の処理要素へのデータを送信元でバッファリングさせる。移動元サーバが全ての送信元からackを受け取った場合、転送中のデータがなくなったことになる。移動元サーバは、全ての送信元からackを受信するまでデータの処理を行う。そして、移動元サーバは、全ての送信元からackを受信すると、処理要素の移動を開始する。移動先サーバは、処理要素の移動完了後、全ての送信元へrestart要求を送信する。送信元は、restart要求を受信すると、ルーティングテーブルを変更してデータの送信先を移動先サーバへ変更し、データの送信を再開してバッファリングしたデータを移動先サーバへ送信する。
【先行技術文献】
【特許文献】
【0008】
【特許文献1】特開2011−39820号公報
【非特許文献】
【0009】
【非特許文献1】Mehul A. Shah, Joseph M. Hellerstein, Sirish Chandrasekaran and Michael J. Franklin著、“Flux: An Adaptive Partitioning Operator for Continuous Query Systems”、ICDE、2003
【発明の概要】
【発明が解決しようとする課題】
【0010】
しかし、先行技術は、処理要素を移動させる際、移動元サーバが全ての送信元にpause要求を送信して送信元でのデータの送信を停止させる。このため。先行技術では、処理要素の移動毎に、データの処理が停止するオーバーヘッドが発生する。特に、多数の処理要素が移動する場合、各処理要素を移動させる毎にデータの処理が停止するため、オーバーヘッドが多くなる。
【0011】
1つの側面では、クエリの移動に伴うオーバーヘッドを抑制できるサーバ装置、データ順序保証プログラム、およびデータ順序保証方法を提供することを目的とする。
【課題を解決するための手段】
【0012】
第1の案では、サーバ装置は、停止部と、取得部と、比較部と、更新部と、を有する。複数のノードは、それぞれ設定された条件にデータが合致した場合に処理を実行するための複数のクエリが配置される。また、複数のノードは、各クエリが処理対象とするデータ毎に当該クエリの配置先およびクエリの移動に伴い更新されるバージョンを格納したルーティングテーブルを記憶する。停止部は、複数のノードに対するデータの送信を所定期間毎に停止させる。取得部は、前記停止部によりデータの送信を停止させた場合に前記複数のノードからルーティングテーブルのバージョンを取得する。比較部は、前記取得部により取得された複数のノードのルーティングテーブルのバージョンを比較する。更新部は、前記比較部による比較の結果、古いバージョンのルーティングテーブルが記憶されたノードがある場合、当該ノードのルーティングテーブルを更新する。
【発明の効果】
【0013】
クエリの移動に伴うオーバーヘッドを抑制できる。
【図面の簡単な説明】
【0014】
【図1】図1は、実施例1に係るサーバ装置を含むCEPシステムの全体構成を示す図である。
【図2】図2は、実施例2に係るCEPシステムの構成の一例を示す図である。
【図3】図3は、データの流れに沿ってCEPシステムの構成を模式的に示した図である。
【図4A】図4Aは、クエリの移動の一例を示す図である。
【図4B】図4Bは、クエリの移動の他の一例を示す図である。
【図5】図5は、クエリのグループ分けの一例を示す図である。
【図6】図6は、ルーティングテーブルのデータ構成の一例を示す図である。
【図7】図7は、格納先テーブルのデータ構成の一例を示す図である。
【図8】図8は、管理サーバの構成の一例を示す図である。
【図9】図9は、停止期間テーブルのデータ構成の一例を示す図である。
【図10】図10は、データ順序保証処理の手順を示すフローチャートである。
【図11】図11は、実施例3に係るCEPシステムの構成を模式的に示した図である。
【図12】図12は、ルーティングテーブルのデータ構成の一例を示す図である。
【図13】図13は、管理サーバの構成の一例を示す図である。
【図14】図14は、データ順序保証処理の手順を示すフローチャートである。
【図15】図15は、データ順序保証プログラムを実行するコンピュータを示す図である。
【発明を実施するための形態】
【0015】
以下に、本発明にかかるサーバ装置、データ順序保証プログラム、およびデータ順序保証方法の実施例を図面に基づいて詳細に説明する。なお、この実施例によりこの発明が限定されるものではない。そして、各実施例は、処理内容を矛盾させない範囲で適宜組み合わせることが可能である。
【実施例1】
【0016】
実施例1に係るサーバ装置について説明する。図1は、実施例1に係るサーバ装置を含むCEPシステムの全体構成を示す図である。サーバ装置10は、CEPシステムを管理する物理サーバであり、例えば、データセンサや各企業に設けられた管理用のサーバコンピュータである。サーバ装置10は、複数のノード11と通信可能とする。ノード11は、サーバ装置またはサーバ装置上で動作する仮想マシンである。各ノード11には、それぞれ設定された条件にデータが合致した場合に処理を実行するための複数のクエリ12が分散配置される。各ノード11には、様々な対象から収集されたデータが配信される。ノード11は、受信したデータがクエリ12に設定された条件に合致した場合にクエリ12に設定された処理を実行する。
【0017】
各ノード11に配置されたクエリ12は、ノード11間の移動が可能とされている。各ノード11は、例えば、処理負荷の分散などで、クエリ12を移動させる場合に、データの送信元でのデータの送信を停止させることなく、クエリ12を移動する。各ノード11は、各クエリ12が処理対象とするデータ毎に当該クエリ12の配置先およびクエリ12の移動に伴い更新されるバージョンを格納したルーティングテーブル13を記憶する。各ノード11は、クエリ12を移動した場合、ルーティングテーブル13に格納された移動したクエリ12の配置先およびバージョンを更新する。例えば、クエリ12が移動した場合、移動先のノード11は、ルーティングテーブル13に格納された移動したクエリ12の配置先およびバージョンを更新すると共に、他のノード11に移動したクエリ12および移動先を通知する。各ノード11は、移動したクエリ12および移動先を通知された場合、ルーティングテーブル13に格納された移動したクエリ12の配置先およびバージョンを更新する。
【0018】
図1に示すように、サーバ装置10は、停止部14と、取得部15と、比較部16と、更新部17とを有する。
【0019】
停止部14は、データの配信を一時的に停止させる。例えば、停止部14は、各ノード11に対するデータの送信を所定期間毎に停止させる。停止部14は、所定期間の周期でデータを送信する送信元へデータの送信停止要求を送信してデータの送信を停止させる。また、停止部14は、後述する比較部16の比較の結果、ルーティングテーブル13のバージョンが同じ場合、又は後述する更新部17によるルーティングテーブル13の更新が完了した場合、送信元へデータの送信再開要求を送信してデータの送信を再開させる。
【0020】
取得部15は、各種情報を取得する。例えば、取得部15は、停止部14によりデータの送信を停止させた場合に各ノード11からルーティングテーブル13のバージョンを取得する。取得部15は、例えば、各ノード11に対してルーティングテーブル13のバージョンの送信要求を送信する。そして、取得部15は、各ノード11からバージョンの通知を受信することにより、各ノード11からルーティングテーブル13のバージョンを取得する。
【0021】
比較部16は、各種の比較を行う。例えば、比較部16は、取得部15により取得された各ノード11のルーティングテーブル13のバージョンを比較する。
【0022】
更新部17は、各種の更新を行う。例えば、更新部17は、比較部16による比較の結果、古いバージョンのルーティングテーブル13が記憶されたノード11がある場合、当該ノード11のルーティングテーブル13を更新する。更新部17は、例えば、何れのノード11に古いバージョンのルーティングテーブル13が記憶されている場合、新しいバージョンのルーティングテーブル13を記憶したノード11に対してルーティングテーブル13の情報の送信を要求する。そして、比較部16は、ノード11からルーティングテーブル13を受信した場合、当該ルーティングテーブル13の内容を古いバージョンのルーティングテーブル13を記憶したノード11に送信して当該ノード11のルーティングテーブル13を更新する。なお、サーバ装置10がルーティングテーブル13を記憶しており、当該ルーティングテーブル13が新しいものである場合、記憶したルーティングテーブル13の内容を送信してもよい。
【0023】
なお、図1の例では、機能的な構成を示したため、停止部14と、取得部15と、比較部16と、更新部17を別に分けているが、例えば、1つのデバイスで構成してもよい。デバイスの一例としては、CPU(Central Processing Unit)やMPU(Micro Processing Unit)などの電子回路が挙げられる。なお、デバイスとして、ASIC(Application Specific Integrated Circuit)やFPGA(Field Programmable Gate Array)などの集積回路を採用することもできる。
【0024】
ここで、CEPシステムでは、システム化する処理の性質によっては厳密なデータの処理順序の保証まで要求しておらず、システム化する処理が要求する精度でデータの処理順序が保証されれば十分なケースが存在する。例えば、家屋の玄関ドアの鍵の解錠を検知するドアセンサと、室内の人を検知する人感センサを設け、ドアセンサと人感センサから送信されるデータを監視する防犯システムをCEPシステムにより実現した場合を考える。このような防犯システムでは、人が玄関ドアの鍵の解錠してから室内に入るまでに数秒程度の期間がある。
【0025】
そこで、サーバ装置10では、要求される精度のデータ順序を保証する所定期間毎に各ノード11に対するデータの送信を停止させ、各ノード11のルーティングテーブル13のバージョンを比較し、古いバージョンのルーティングテーブル13を更新する。例えば、上記の防犯システムをCEPシステムにより実現する場合、所定期間を数百ミリから数秒単位に定める。この所定期間は、サーバ装置10に設けられた記憶部に記憶させてもよい。また、所定期間は、外部の記憶装置に記憶させ、外部の記憶装置から読み出すようにしてもよい。また、所定期間は、例えば、各ノード11に対するデータの送信を所定期間毎に停止させるプログラム内に定義されてもよい。このように、サーバ装置10では、システム化した処理が要求する精度のデータ順序を保証する期間毎にデータの処理を停止させて各ノード11のルーティングテーブル13の一貫性を確保する。これにより、サーバ装置10によれば、クエリ12の移動毎にデータの処理を停止させた場合と比較して、オーバーヘッドの発生を抑制できる。特に、ノード11間を多数のクエリ12が移動する場合、クエリ12の移動毎にデータの処理を停止させるとオーバーヘッドが多くなるが、所定期間毎にデータの処理を停止させてルーティングテーブル13の一貫性を確保することで、オーバーヘッドを抑制できる。
【0026】
このように、サーバ装置10は、複数のノード11に対するデータの送信を所定期間毎に停止させる。また、サーバ装置10は、データの送信を停止させた場合に複数のノード11からルーティングテーブル13のバージョンを取得する。そして、サーバ装置10は、取得した複数のノード11のルーティングテーブル13のバージョンを比較する。サーバ装置10は、比較の結果、古いバージョンのルーティングテーブル13が記憶されたノード11がある場合、当該ノード11のルーティングテーブル13を更新する。これにより、サーバ装置10によれば、システム化する処理が要求する精度のデータの処理順序を保証できる。また、サーバ装置10によれば、クエリ12の移動に伴うオーバーヘッドを抑制できる。
【実施例2】
【0027】
実施例2について説明する。実施例2では、複数のサーバ装置30上でそれぞれVM(Virtual Machine:仮想マシン)31が動作し、VM31に複数のクエリ33が分散配置されたCEPシステム20について説明する。図2は、実施例2に係るCEPシステムの構成の一例を示す図である。CEPシステム20は、サーバ装置21と、複数のサーバ装置30と、管理サーバ40とを有する。サーバ装置21と各サーバ装置30と管理サーバ40との間は、ネットワーク22を介して通信可能に接続される。かかるネットワーク22の一態様としては、有線または無線を問わず、LAN(Local Area Network)やVPN(Virtual Private Network)などの任意の通信網が挙げられる。
【0028】
サーバ装置30では、VM31が動作する。各VM31には、複数のクエリ33が分散配置される。サーバ装置21は、外部ネットワーク23を介して様々な対象からデータを受信し、受信したデータを当該データを処理対象とするクエリ33が配置されたVM31に配信する。管理サーバ40は、サーバ装置21および各サーバ装置30を管理しており、各VM31に配置されたクエリ33の移動を制御する。なお、図2の例では、サーバ装置30を3つ図示したが、サーバ装置30の数は何れであってもよい。
【0029】
図3は、データの流れに沿ってCEPシステムの構成を模式的に示した図である。サーバ装置21、VM31および管理サーバ40は、各クエリ33が処理対象とするデータ毎に当該クエリ33の配置先およびクエリ33の移動に伴い更新されるバージョンを格納したルーティングテーブル50を記憶する。
【0030】
サーバ装置21は、外部ネットワーク23を介して様々な対象から多数のデータを受信する。サーバ装置21は、受信したデータを所定のデータ構成に整える前処理を行う。そして、サーバ装置21は、ルーティングテーブル50に基づき、受信したデータを処理対象とするクエリ33の配置先を特定し、特定した配置先のVM31へ前処理後のデータを送信する。なお、本実施例では、サーバ装置21を1つ図示したが、サーバ装置21を複数設け、複数のサーバ装置21により受信したデータを振り分けてもよい。
【0031】
VM31は、それぞれデータストリーム処理エンジン32を動作させる。データストリーム処理エンジン32は、複合イベント処理を実現するソフトウェアである。データストリーム処理エンジン32は、受信したデータと、クエリ33の条件式とを突合させてイベントを検出し、検出したイベントの処理を実行する制御を行う。また、CEPシステム20では、複数のクエリ33により一連の処理を実行させることもできる。例えば、上位のクエリ33による実行結果のデータを下位のクエリ33へ送信し、下位のクエリ33の入力のデータとすることもできる。データストリーム処理エンジン32は、複数のクエリ33により一連の処理を実行する場合、クエリ33の実行結果のデータを下位のクエリ33が動作するVM31へ転送する。また、データストリーム処理エンジン32は、管理サーバ40からのクエリ33の移動の指示に応じて、他のVM31上で動作するデータストリーム処理エンジン32との間でクエリ33を移動させる制御を行う。データストリーム処理エンジン32は、移動したクエリ33が処理対象とするデータを受信した場合、受信したデータをクエリ33の移動先のVM31へ転送する。また、データストリーム処理エンジン32は、管理サーバ40から後述する送信停止要求を受信した場合、VM31間を転送中のデータがあるかを確認する確認処理を行う。例えば、データストリーム処理エンジン32は、他の全てのVM31のデータストリーム処理エンジン32へ制御パケットを送信する。そして、データストリーム処理エンジン32は、他のVM31から制御パケットの受信待ちを行う。
【0032】
ここで、送信停止要求を受信したタイミングでVM31間を転送中のデータがある場合、VM31は、他のVM31へ制御パケットの送信後、他の全てのVM31から制御パケットを受信する前に、他のVM31への転送データが受信される。一方、VM31間を転送中のデータがない場合、VM31は、他のVM31へ制御パケットの送信後、転送データを受信することなく、他の全てのVM31から制御パケットを受信する。
【0033】
データストリーム処理エンジン32は、他の全てのVM31から制御パケットの受信する前に、転送データを受信した場合、当該転送データを転送先のVM31へ転送する。そして、データストリーム処理エンジン32は、確認処理の再実施の要求を送信する。各VM31のデータストリーム処理エンジン32は、確認処理の再実施の要求を受信した場合、それぞれ他の全てのVM31のデータストリーム処理エンジン32へ制御パケットを再度送信する。
【0034】
一方、データストリーム処理エンジン32は、転送データを受信することなく、他の全てのノード11から制御パケットの受信した場合、サーバ装置10へ全てのVM31から制御パケットを受信した通知を行う。なお、この通知は、各VM31のデータストリーム処理エンジン32がサーバ装置10へ通知してもよい。また、いずれかのVM31のデータストリーム処理エンジン32が代表となり、確認処理の再実施の要求を受信することなく、他の全てのVM31から通知を受けた場合にサーバ装置10へ通知してもよい。
【0035】
ここで、クエリ33の移動について説明する。クエリ33は、付随するデータを有する場合がある。例えば、クエリ33が処理対象とするデータの一定期間の平均を求める処理を行う場合、平均を算出するために一定期間に受信したデータを保持する。また、クエリ33は、処理にテーブルなどの所定のデータを用いる場合もある。データストリーム処理エンジン32は、クエリ33を移動させる場合、クエリ33およびクエリ33に付随するデータを処理要素として移動させる。
【0036】
また、クエリ33の移動には、以下のようなケースが考えられる。図4Aは、クエリ33の移動の一例を示す図である。図4Aに示すように、クエリ33毎に処理対象とするデータが異なる場合、クエリ33を移動させ、移動先で処理対象とするデータを処理させる。例えば、処理を分散させる場合、各クエリ33をそれぞれ別なVM31に移動させて、各VM31によりデータを処理させる。一方、例えば、処理負荷が低い場合、クエリ33を1つのVM31に移動させ、1つのVM31によりデータを処理させる。
【0037】
一方、クエリ33は、処理対象とするデータが複数である場合がある。例えば、データとして証券コードおよび株価がVM31に配信され、証券コード毎に株価の一定期間の移動平均を求める処理を1つのクエリ33により行う場合がある。図4Bは、クエリ33の移動の他の一例を示す図である。図4Bの左側に示すように、クエリ33は、データ1〜nを処理対象とする。この場合、処理対象のデータ1〜n毎に、クエリ33を移動させる。例えば、処理の分散を行う場合、クエリ33を複製して別なVM31に移動させ、処理対象のデータ1〜nをそれぞれ別なVM31により処理させる。一方、例えば、処理負荷が低い場合、クエリ33を1つのVM31にまとめ、処理対象のデータ1〜nを1つのVM31により処理させる。
【0038】
ところで、各クエリ33に対して個別に移動を指示して移動させた場合、移動対象のクエリ33の数が多くなるほど移動の指示回数が多くなる。また、クエリ33が処理対象とするデータ毎に、ルーティングテーブル50に配置先を格納する場合、ルーティングテーブル50は、クエリ33が処理対象とするデータが多くなるほどデータ量が大きくなる。そこで、本実施例では、複数のクエリ33を複数のグループに分け、グループ毎に移動する。また、ルーティングテーブル50は、グループ毎に格納先を管理する。図5は、クエリ33のグループ分けの一例を示す図である。例えば、処理対象とする各データに、32ビットの整数データにより、各データを識別するキーが含まれ、それぞれのクエリ33の処理の条件に異なるキーが設定されているものとする。この場合、例えば、処理の条件に設定されたキーを所定の整数値で割った余りが同じクエリ33を同一のグループとしてグループ分けする。一例としては、処理の条件に設定されたキーを8191で割った余りが同じクエリ33を同一のグループとして静的にグループを分ける。これにより、各クエリ33は、8191個のグループに分かれる。図5の例では、クエリ33をグループに分けした各グループをV−Nodeと示している。そして、本実施例では、V−Node毎に、クエリ33をVM31に配置する。
【0039】
ルーティングテーブル50は、V−Node毎に配置先およびバージョンを格納する。図6は、ルーティングテーブルのデータ構成の一例を示す図である。図6に示すように、ルーティングテーブル50は、ID、格納先、版数の各項目を有する。IDの項目は、V−Nodeを識別するIDを記憶する領域である。格納先の項目は、格納先のVM31を示すIDを記憶する領域である。版数の項目は、バージョン示す値を記憶する領域である。図6の例では、IDが「12」のV−Nodeは、格納先のVM31のIDが「1」であり、バージョンが「3」であることを示す。
【0040】
サーバ装置21および各VM31のデータストリーム処理エンジン32は、キーに基づいてクエリ33の格納先を判別する。例えば、サーバ装置21は、受信したデータに含まれるキーが8203であった場合、8203を8191で割った余り、12を求める。そして、サーバ装置21は、ルーティングテーブル50からV−NodeのIDが12の格納先を求める。図6の例では、V−NodeのIDが12の格納先のVM31のIDが1であると求まる。
【0041】
各VM31は、それぞれ通信を行うIPアドレスおよびポート番号が定められ、各VM31のIPアドレスおよびポート番号に対応してファイルディスクリプタが割り当てられる。サーバ装置21、各VM31および管理サーバ40は、それぞれファイルディスクリプタの番号と各VM31を示すIDを格納先テーブルに記憶する。図7は、格納先テーブルのデータ構成の一例を示す図である。図7に示すように、格納先テーブルは、ID、ファイルディスクリプタの各項目を有する。IDの項目は、VM31のIDを記憶する領域である。ファイルディスクリプタの項目は、IDが示すVM31への通信路に割り当てられたファイルディスクリプタの番号を記憶する領域である。図7の例では、IDが「0」のVM31への通信路に割り当てられたファイルディスクリプタの番号が「6」であることを示す。
【0042】
VM31と通信を行う場合、格納先テーブルから通信対象のVM31のIDに対応するファイルディスクリプタの番号を求める。例えば、通信対象のVM31のIDが「1」である場合、格納先テーブルからVM31のIDが1のファイルディスクリプタの番号が「7」と求まる。そして、求めたファイルディスクリプタの番号を用いてTCP/IPのソケットにより通信対象のVM31と通信を行う。
【0043】
また、各VM31は、それぞれサーバ装置30から割り当てられたCPUやメモリの使用率などのリソースの負荷状態を検出し、検出した負荷状態を示す負荷情報を管理サーバ40へ送信する。管理サーバ40は、各VM31から送信された負荷情報に基づき、移動対象のクエリ33を特定する。そして、管理サーバ40は、移動対象のクエリ33の移動指示を送信する。なお、本実施例では、V−Node単位でクエリ33の移動を行う。管理サーバ40は、移動先のVM31のIDおよび移動対象のV−NodeのIDを含んだ移動指示を移動元のVM31に送信する。移動指示を受信したVM31のデータストリーム処理エンジン32は、移動対象のV−Nodeに属するクエリ33や当該クエリ33に付随するデータなどの処理要素をシリアライズして移動先のVM31へ送信する。
【0044】
移動先のVM31のデータストリーム処理エンジン32は、送信されたデータをデシリアライズして復元し、復元されたクエリ33の駆動を開始する。また、データストリーム処理エンジン32は、ルーティングテーブル50に格納された移動対象のV−Nodeに属するクエリ33の格納先を自身が動作するVM31のIDに更新すると共にバージョンを更新する。例えば、バージョンの値を1加算した値に更新する。そして、データストリーム処理エンジン32は、各VM31、サーバ装置21および管理サーバ40に対して移動対象のV−NodeのIDおよび移動先のVM31のIDを通知する。
【0045】
各VM31、サーバ装置21および管理サーバ40は、移動が通知された場合、ルーティングテーブル50の通知された移動対象のV−Nodeに属するクエリ33の格納先を通知された移動先のVM31のIDに更新すると共にバージョンを更新する。例えば、バージョンの値を1加算した値を受け取り更新する。
【0046】
図8は、管理サーバの構成の一例を示す図である。図8に示すように、管理サーバ40は、通信制御I/F部41と、記憶部42と、制御部43とを有する。
【0047】
通信制御I/F部41は、少なくとも1つのポートを有し、各VM31、サーバ装置21との間の通信を制御するインタフェースである。通信制御I/F部41は、各VM31およびサーバ装置21と各種情報を送受信する。例えば、通信制御I/F部41は、VM31から負荷情報を受信する。また、通信制御I/F部41は、VM31へ各種指示を送信する。
【0048】
記憶部42は、各種情報を記憶する。例えば、記憶部42は、上述したルーティングテーブル50と、最大期間情報51と、停止期間テーブル52を記憶する。記憶部42のデバイスの一例としては、フラッシュメモリやNVSRAM(Non Volatile Static Random Access Memory)などのデータを書き換え可能な半導体メモリや、ハードディスク、光ディスクなどの記憶装置が挙げられる。
【0049】
最大期間情報51は、CEPシステム20において複数のVM31の間でのデータが転送された場合に伝送にかかる最大期間を記憶したものである。最大期間情報51は、CEPシステム20の判定部43aにより、管理サーバ40と通信可能とされたクライアントコンピュータなどの端末装置あるいは管理サーバ40に設けられたマウス、キーボードなどの入力装置から登録される。
【0050】
停止期間テーブル52は、複数のクエリ33を複数のグループに分けたグループ毎に、データの送信を停止させる所定期間を記憶するテーブルである。図9は、停止期間テーブルのデータ構成の一例を示す図である。図9に示すように、停止期間テーブル52は、ID、期間の各項目を有する。IDの項目は、V−NodeのIDを記憶する領域である。期間の項目は、処理順序を保証する期間をミリ秒単位で記憶する領域である。図9の例では、IDが「12」のV−Nodeの所定期間が「100」msであることを示す。
【0051】
制御部43は、例えば、内部メモリ等を有するCPUなどの電子回路であり、判定部43aと、移動部43bと、実行部43cと、停止部43dと、取得部43eと、比較部43fと、更新部43gとを有する。
【0052】
判定部43aは、各種の判定を行う。例えば、判定部43aは、各VM31から送信された負荷情報により示される各VM31の負荷状態に許容される負荷以上のものがあるか否か判定する。
【0053】
移動部43bは、クエリ33の移動を行う。例えば、移動部43bは、判定部43aにより許容される負荷以上と判定されたVM31がある場合、当該VM31の配置された何れのV−Nodeの移動を行う。移動部43bは、例えば、各VM31から送信された負荷情報から負荷状態の低いVM31を特定する。そして、移動部43bは、許容される負荷以上と判定されたVM31を移動元とし、負荷状態の低いVM31を移動先とし、許容される負荷以上と判定されたVM31に配置された何れかのV−Nodeを移動対象として、移動指示を送信する。
【0054】
実行部43cは、停止期間テーブル52から移動したV−Nodeに対応する所定期間を読み出す。また、実行部43cは、最大期間情報51を読み出す。そして、実行部43cは、移動したV−Nodeに対応する所定期間が最大期間情報51が示す最大期間よりも短い場合、後述する停止部43d、取得部43e、比較部43f、更新部43gの処理を実行する。一方、実行部43cは、求めた期間が最大期間以上の場合、停止部43d、取得部43e、比較部43f、更新部43gの処理を停止する。
【0055】
ここで、移動したV−Nodeに対応する期間が最大期間よりも長い場合、クエリ33を移動させてからV−Nodeに対応する期間を経過するまでに、移動元のVM31から各VM31を介して中継されたデータが移動先のVM31に到着する。よって、V−Nodeに対応する期間の前後でのデータ順序が保証される。このため、実行部43cは、移動したV−Nodeに対応する期間が最大期間以上の場合に、停止部43d、取得部43e、比較部43f、更新部43gの処理を停止する。これにより、管理サーバ40によれば、不要な処理の実行を抑制できる。
【0056】
停止部43dは、V−Nodeが移動してから当該V−Nodeに対応する所定期間が経過したときにデータの送信を停止させる。例えば、停止部43dは、V−Nodeの移動先から移動対象のV−NodeのIDおよび移動先のVM31のIDが通知されてから、当該V−Nodeに対応する所定期間を経過したときにデータの送信元および各VM31へ送信停止要求を送信する。各VM31のデータストリーム処理エンジン32は、送信停止要求を受信すると、VM31間を転送中のデータがあるかの確認処理を行い、転送中のデータがない場合にサーバ装置10へ全てのVM31から制御パケットを受信した通知を行う。
【0057】
停止部43dは、後述する比較部43fの比較の結果、ルーティングテーブル50のバージョンが同じ場合、又は後述する更新部43gによるルーティングテーブル50の更新が完了した場合、送信元へデータの送信再開要求を送信してデータの送信を再開させる。
【0058】
取得部43eは、全てのVM31から制御パケットを受信した旨が通知されると、各VM31およびサーバ装置21からルーティングテーブル50に格納された移動したV−Nodeに対応するバージョンを取得する。例えば、取得部43eは、各VM31およびサーバ装置21に対して移動したV−Nodeに対応するバージョンの送信要求を送信する。そして、取得部43eは、各VM31およびサーバ装置21からバージョンの通知を受信することにより、各VM31およびサーバ装置21からバージョンを取得する。
【0059】
比較部43fは、取得した各VM31およびサーバ装置21のルーティングテーブル50のバージョンを比較する。
【0060】
更新部43gは、比較の結果、各VM31およびサーバ装置21の何れかに古いバージョンのルーティングテーブル50が記憶されている場合、ルーティングテーブル50を更新する。例えば、記憶部42に記憶されたルーティングテーブル50のバージョンが新しい場合、更新部43gは、古いバージョンのルーティングテーブル50が記憶されたVM31やサーバ装置21に記憶部42に記憶されたルーティングテーブル50の情報を送信する。また、記憶部42に記憶されたルーティングテーブル50のバージョンが古い場合、更新部43gは、新しいバージョンのルーティングテーブル50を記憶したVM31やサーバ装置21に対してルーティングテーブル50の情報の送信を要求する。そして、更新部43gは、新しいルーティングテーブル50の情報を受信した場合、受信した情報を古いバージョンのルーティングテーブル50が記憶されたVM31やサーバ装置21に送信する。また、更新部43gは、受信した情報で記憶部42に記憶されたルーティングテーブル50を更新する。
【0061】
次に、本実施例に係る管理サーバ40がデータ順序の保証を行う処理の流れを説明する。図10は、データ順序保証処理の手順を示すフローチャートである。このデータ順序保証処理は、例えば、V−Nodeの移動先から移動対象のV−NodeのIDおよび移動先のVM31のIDが通知されたタイミングで実行される。
【0062】
図10に示すように、実行部43cは、停止期間テーブル52から移動したV−Nodeに対応する所定期間を読み出す(ステップS10)。また、実行部43cは、最大期間情報51として記憶された最大期間を読み出す(ステップS11)。そして、実行部43cは、移動したV−Nodeに対応する所定期間が最大期間よりも短いか否かを判定する(ステップS12)。所定期間が最大期間以上の場合(ステップS12否定)、実行部43cは、処理を終了する。一方、所定期間が最大期間よりも短い場合(ステップS12肯定)、停止部43dは、V−Nodeの移動が通知されてから、当該V−Nodeに対応する所定期間を経過したか否かを判定する(ステップS13)。所定期間を経過していない場合(ステップS13否定)、再度ステップS13へ移行して所定期間の経過待ちを行う。一方、所定期間を経過した場合(ステップS13肯定)、停止部43dは、データの送信元および各VM31へ送信停止要求を送信する(ステップS14)。停止部43dは、全ての送信元から送信停止要求に対するACKが得られたか判定する(ステップS15)。ACKが得られていない場合(ステップS15否定)、再度ステップS15へ移行してACK待ちを行う。一方、ACKが得られた場合(ステップS15肯定)、取得部43eは、全てのVM31から制御パケットを受信した通知を受信したか否かを判定する(ステップS16)。全てのVM31から通知が得られていない場合(ステップS16否定)、再度ステップS16へ移行して通知待ちを行う。
【0063】
一方、全てのVM31から通知が得られた場合(ステップS16肯定)、取得部43eは、各VM31およびサーバ装置21からルーティングテーブル50に格納された移動したV−Nodeに対応するバージョンを取得する(ステップS17)。比較部43fは、取得した各VM31およびサーバ装置21のルーティングテーブル50のバージョンを比較し、古いバージョンのルーティングテーブル50があるか否か判定する(ステップS18)。古いバージョンのルーティングテーブル50がある場合(ステップS18肯定)、更新部43gは、古いバージョンのルーティングテーブル50を更新する(ステップS19)。一方、古いバージョンのルーティングテーブル50がない場合(ステップS18否定)、後述するステップS20へ移行する。停止部43dは、送信元へデータの送信再開要求を送信してデータの送信し(ステップS20)、処理を終了する。
【0064】
このように、管理サーバ40は、複数のクエリ33を複数のグループに分けたV−Node毎に所定期間を記憶する。また、管理サーバ40は、V−Nodeが移動してから当該V−Nodeに対応する所定期間が経過したときにデータの送信を停止させる。そして、管理サーバ40は、各VM31およびサーバ装置21から、移動したV−Nodeに関するルーティングテーブル50のバージョンを取得する。管理サーバ40は、取得したバージョンを比較し、古いバージョンのルーティングテーブル50を更新してルーティングテーブル50の一貫性を確保する。これにより、管理サーバ40によれば、要求される精度でのデータ順序を保証できる。また、管理サーバ40によれば、クエリ33の移動させる毎にデータの送信を停止させる必要が無くなるため、オーバーヘッドを抑制できる。
【0065】
また、管理サーバ40によれば、V−Nodeが移動から所定期間が経過したときにデータの送信を停止させて、バージョンの確認を行う。これにより、管理サーバ40によれば、要求される精度のデータ順序を効率よく保証できる。
【0066】
また、管理サーバ40によれば、移動したV−Nodeに対応する所定期間が最大期間よりも短い場合、ルーティングテーブル50の一貫性の確認を行い、所定期間が最大期間以上の場合、一貫性の確認を行わないことで、処理負荷を軽減できる。
【実施例3】
【0067】
実施例3について説明する。実施例3に係るCEPシステムの構成は、上述の実施例2とほぼ同様であるため、異なる部分について説明する。本実施例は、上述の実施例1と同様に、管理サーバ40が、要求される精度のデータ順序を保証する所定期間毎に各VM32からルーティングテーブル50のバージョンを取得して比較し、古いバージョンのルーティングテーブル50を更新する。図11は、実施例3に係るCEPシステムの構成を模式的に示した図である。なお、上述の実施例2と同一の部分については、同一の符号を付してその説明を省略する。
【0068】
各VM31、サーバ装置21および管理サーバ40は、ルーティングテーブル50を更新した場合、更新日時をバージョンとして格納する。V−Nodeの各クエリ33を移動させた場合、移動先のVM31のデータストリーム処理エンジン32は、ルーティングテーブル50の移動したV−Nodeの各クエリ33の格納先を更新すると共にバージョンに更新した更新日時を記憶する。そして、データストリーム処理エンジン32は、各VM31、サーバ装置21および管理サーバ40に対して移動したV−NodeのID、移動先のVM31のID、および更新日時を通知する。各VM31、サーバ装置21および管理サーバ40は、移動が通知された場合、ルーティングテーブル50の通知された移動したV−Nodeの各クエリ33の格納先を更新すると共にバージョンを通知された更新日時に更新する。図12は、ルーティングテーブルのデータ構成の一例を示す図である。なお、上述の実施例2と同一の部分については、同一の符号を付してその説明を省略する。図12に示すように、ルーティングテーブル50は、版数の項目に更新日時が記憶される。図12の例では、IDが「12」のV−Nodeは、格納先のVM31のIDが「1」であり、更新日時が「2011年11月1日 10時1分20.12秒」であることを示す。
【0069】
また、各VM31のデータストリーム処理エンジン32は、一定期間毎に、当該一定期間において最初にデータを転送した日時に関する日時情報53を記憶する。この日時情報53は、例えば、サーバ装置21が受信したデータに受信日時を付加などデータに日時が含まれる場合、データに含まれる日時を示す日時情報としてもよい。また、日時情報53は、データストリーム処理エンジン32がデータを転送した日時を示す日時情報としてもよい。そして、データストリーム処理エンジン32は、一定期間毎に、記憶した日時情報53を管理サーバ40へ送信する。なお、データストリーム処理エンジン32は、一定期間の間にデータの転送を行っていない場合、一定期間毎に、現在時刻を日時情報として管理サーバ40へ送信する。この一定期間は、管理サーバ40が、各VM32からルーティングテーブル50のバージョンを取得して比較を行う所定期間以下であればよい。これにより、管理サーバ40には、一定期間毎に各VM31からデータを転送した日時または現在日時が通知される。
【0070】
図13は、管理サーバの構成の一例を示す図である。なお、上述の実施例2と同一の部分については、同一の符号を付してその説明を省略する。
【0071】
制御部43は、特定部43hをさらに有する。特定部43hは、各種の特定を行う。例えば、特定部43hは、各VM31から通知された日時情報に基づき、直前の一定期間で最も過去に他のVM31へデータを転送した転送日時を特定する。特定部43hは、例えば、各VM31から通知された日時情報のうち日時が最も過去の日時情報の日時を直前の一定期間で最も過去に他のVM31へデータを転送した転送日時を特定する。
【0072】
停止部43dは、要求される精度のデータ順序を保証する所定期間毎に、特定部43hにより特定された直前の一定期間での最も過去の転送日時と、ルーティングテーブル50にバージョンとして記憶された更新日時とを比較する。そして、停止部43dは、転送日時が更新日時以降である場合に、データの送信元および各VM31へ送信停止要求を送信してデータの送信を停止させる。一方、停止部43dは、転送日時が更新日時よりも前である場合に、送信停止要求の送信を行わない。
【0073】
取得部43eは、停止部43dが送信停止要求を送信した場合、全てのVM31から制御パケットを受信した旨が通知されると、各VM31およびサーバ装置21からルーティングテーブル50のバージョンを取得する。すなわち、管理サーバ40は、直前の一定期間での最も過去の転送日時がルーティングテーブル50の更新日時以降である場合、データの送信元および各VM31でのデータの送信を停止させてルーティングテーブル50のバージョンを取得する。一方、取得部43eは、停止部43dが送信停止要求の送信を行わない場合、所定期間毎に、各VM31およびサーバ装置21からルーティングテーブル50のバージョンを取得する。すなわち、管理サーバ40は、直前の一定期間での最も過去の転送日時がルーティングテーブル50の更新日時よりも前である場合、データの送信元および各VM31でのデータの送信を停止させずにルーティングテーブル50のバージョンを取得する。
【0074】
比較部43fは、取得した各VM31およびサーバ装置21のルーティングテーブル50のバージョンを比較する。更新部43gは、比較の結果、各VM31およびサーバ装置21の何れかに古いバージョンのルーティングテーブル50が記憶されている場合、ルーティングテーブル50を更新する。
【0075】
図14は、データ順序保証処理の手順を示すフローチャートである。なお、上述の実施例2と同一の部分については、同一の符号を付してその説明を省略する。このデータ順序保証処理は、例えば、要求される精度のデータ順序を保証する所定期間毎に実行される。
【0076】
特定部43hは、各VM31から通知された日時情報に基づき、直前の一定期間での最も過去の転送日時を特定する(ステップS30)。停止部43dは、直前の一定期間での最も過去の転送日時がルーティングテーブル50の更新日時以降であるか否かを判定する(ステップS31)。転送日時が更新日時以降である場合(ステップS31肯定)、ステップS14へ移行し、送信停止要求を送信してデータの送信を停止させる。一方、転送日時が更新日時より新しい場合(ステップS31否定)、ステップS17へ移行し、各VM31およびサーバ装置21からルーティングテーブル50のバージョンを取得する。
【0077】
このように、管理サーバ40は、直前の一定期間での最も過去の転送日時がルーティングテーブル50の更新日時以降である場合、データの送信元および各VM31でのデータの送信を停止させてルーティングテーブル50のバージョンを取得する。これにより、管理サーバ40によれば、転送日時が更新日時以降である場合、データ順序が変る可能性があるため、データの送信を停止させることにより、データ送信を停止させた前後のデータ順序を保証できる。
【0078】
また、管理サーバ40は、直前の一定期間での最も過去の転送日時がルーティングテーブル50の更新日時よりも新しい場合、データの送信元および各VM31でのデータの送信を停止させずにルーティングテーブル50のバージョンを取得する。これにより、管理サーバ40によれば、転送日時が更新日時よりも前である場合、データ順序が保たれるため、データ送信を停止させずにルーティングテーブル50のバージョンを取得することにより、データ送信を停止による遅延を抑制できる。
【実施例4】
【0079】
さて、これまで開示の装置に関する実施例について説明したが、開示の技術は上述した実施例以外にも、種々の異なる形態にて実施されてよいものである。そこで、以下では、本発明に含まれる他の実施例を説明する。
【0080】
例えば、上記の実施例1では、複数のノード11に対するデータの送信を所定期間毎に停止させる場合について説明したが、開示の装置はこれに限定されない。例えば、複数のノード11の間でクエリ33が移動した場合、クエリ33が移動してから所定期間が経過したときにデータの送信を停止させてもよい。これにより、要求される精度のデータ順序を保証できる。
【0081】
また、上記の実施例2では、V−Nodeが移動から所定期間が経過したときにデータの送信を停止させる場合について説明したが、開示の装置はこれに限定されない。例えば、所定期間を1つ定め、V−Nodeが移動から当該所定期間が経過したときに停止させてもよい。
【0082】
また、上記の実施例2では、V−Nodeが移動から所定期間が経過したときにデータの送信を1回停止させる場合について説明したが、開示の装置はこれに限定されない。例えば、所定期間毎に複数回停止させてもよい。すなわち、所定期間毎で少なくとも1回停止させればよい。
【0083】
また、上記の実施例2では、複数のクエリ33を複数のグループに分けたV−Node毎に所定期間を停止期間テーブル52に記憶させた場合について説明したが、開示の装置はこれに限定されない。例えば、ルーティングテーブル50に所定期間を記憶させるフィールドを設けてルーティングテーブル50に記憶させてもよい。
【0084】
また、上記の実施例2では、V−Nodeが移動から所定期間が経過したときにデータの送信を1回停止させる場合について説明したが、開示の装置はこれに限定されない。例えば、実施例3と同様に、ルーティングテーブル50にバージョンとして更新日時を記憶させる。また、各VM31のデータストリーム処理エンジン32は、一定期間毎に、当該一定期間において最初にデータを転送した日時に関する日時情報を管理サーバ40へ送信する。管理サーバ40は、V−Nodeが移動から所定期間が経過したときに、直前の一定期間での最も過去の転送日時と、ルーティングテーブル50にバージョンとして記憶された更新日時とを比較する。管理サーバ40は、転送日時が更新日時以降である場合に、データの送信元および各VM31でのデータの送信を停止させてルーティングテーブル50のバージョンを取得する。一方、管理サーバ40は、転送日時が更新日時よりも前である場合に、データの送信元および各VM31でのデータの送信を停止させずにルーティングテーブル50のバージョンを取得するようにしてもよい。
【0085】
また、上記の実施例2および実施例3では、複数のクエリ33を複数のグループに分けたV−Node単位にクエリ33を移動させる場合について説明したが、開示の装置はこれに限定されない。例えば、クエリ33単位で移動させてもよい。この場合、ルーティングテーブル50は、クエリ33単位に格納先を記憶できるように構成すればよい。また、この場合、例えば、停止期間テーブル52にクエリ33単位に所定期間を記憶させ、クエリ33が移動してから当該クエリ33に対応する所定期間が経過したときにデータの送信を停止させてもよい。
【0086】
また、図示した各装置の各構成要素は機能概念的なものであり、必ずしも物理的に図示の如く構成されていることを要しない。すなわち、各装置の分散・統合の具体的状態は図示のものに限られず、その全部または一部を、各種の負荷や使用状況などに応じて、任意の単位で機能的または物理的に分散・統合して構成することができる。例えば、図1に示す停止部14と、取得部15と、比較部16と、更新部17の各処理部が適宜統合されてもよい。また、例えば、図8、図13に示す判定部43aと、移動部43bと、実行部43cと、停止部43dと、取得部43eと、比較部43fと、更新部43g、特定部43hの各処理部が適宜統合されてもよい。さらに、各処理部にて行なわれる各処理機能は、その全部または任意の一部が、CPUおよび当該CPUにて解析実行されるプログラムにて実現され、あるいは、ワイヤードロジックによるハードウェアとして実現され得る。
【0087】
[データ順序保証プログラム]
また、上記の実施例で説明した各種の処理は、あらかじめ用意されたプログラムをパーソナルコンピュータやワークステーションなどのコンピュータシステムで実行することによって実現することもできる。そこで、以下では、上記の実施例と同様の機能を有するプログラムを実行するコンピュータシステムの一例を説明する。図15は、データ順序保証プログラムを実行するコンピュータを示す図である。
【0088】
図15に示すように、コンピュータ300は、CPU(Central Processing Unit)310、ROM(Read Only Memory)320、HDD(Hard Disk Drive)330、RAM(Random Access Memory)340を有する。これら310〜340の各部は、バス400を介して接続される。
【0089】
ROM320には上記実施例1の停止部14、取得部15、比較部16、更新部17と同様の機能を発揮するデータ順序保証プログラム320aが予め記憶される。あるいは、ROM320には、上記実施例2、実施例3の実行部43c、停止部43d、取得部43e、比較部43f、更新部43g、特定部43と同様の機能を発揮するデータ順序保証プログラム320aが予め記憶される。なお、データ順序保証プログラム320aについては、適宜分離しても良い。
【0090】
そして、CPU310が、データ順序保証プログラム320aをROM320から読み出して実行することで、実施例1〜3の各制御部と同様の動作を実行する。すなわち、データ順序保証プログラム320aは、上記実施例1の停止部14、取得部15、比較部16、更新部17、または上記実施例2、3の実行部43c、停止部43d、取得部43e、比較部43f、更新部43g、特定部43と同様の動作を実行する。
【0091】
なお、上記したデータ順序保証プログラム320aについては、必ずしも最初からROM320に記憶させることを要しない。データ順序保証プログラム320aはHDD330に記憶させてもよい。
【0092】
例えば、コンピュータ300に挿入されるフレキシブルディスク(FD)、CD−ROM、DVDディスク、光磁気ディスク、ICカードなどの「可搬用の物理媒体」にプログラムを記憶させておく。そして、コンピュータ300がこれらからプログラムを読み出して実行するようにしてもよい。
【0093】
さらには、公衆回線、インターネット、LAN、WANなどを介してコンピュータ300に接続される「他のコンピュータ(またはサーバ)」などにプログラムを記憶させておく。そして、コンピュータ300がこれらからプログラムを読み出して実行するようにしてもよい。
【符号の説明】
【0094】
10 サーバ装置
11 ノード
12 クエリ
13 ルーティングテーブル
14 停止部
15 取得部
16 比較部
17 更新部
20 CEPシステム
21 サーバ装置
30 サーバ装置
31 VM
32 データストリーム処理エンジン
33 クエリ
40 管理サーバ
43 制御部
43a 判定部
43b 移動部
43c 実行部
43d 停止部
43e 取得部
43f 比較部
43g 更新部
50 ルーティングテーブル
51 最大期間情報
52 停止期間テーブル

【特許請求の範囲】
【請求項1】
それぞれ設定された条件にデータが合致した場合に処理を実行するための複数のクエリが配置され、各クエリが処理対象とするデータ毎に当該クエリの配置先およびクエリの移動に伴い更新されるバージョンを格納したルーティングテーブルを記憶する複数のノードに対するデータの送信を所定期間毎に停止させる停止部と、
前記停止部によりデータの送信を停止させた場合に前記複数のノードからルーティングテーブルのバージョンを取得する取得部と、
前記取得部により取得された複数のノードのルーティングテーブルのバージョンを比較する比較部と、
前記比較部による比較の結果、古いバージョンのルーティングテーブルが記憶されたノードがある場合、当該ノードのルーティングテーブルを更新する更新部と、
を有することを特徴とするサーバ装置。
【請求項2】
前記停止部は、前記複数のノードの間でクエリが移動した場合、クエリが移動してから前記所定期間が経過したときにデータの送信を停止させる
ことを特徴とする請求項1に記載のサーバ装置。
【請求項3】
クエリ毎に前記所定期間を記憶する記憶部をさらに有し、
前記停止部は、クエリが移動してから当該クエリに対応する所定期間が経過したときにデータの送信を停止させ、
前記取得部は、前記複数のノードから、移動したクエリに関するルーティングテーブルのバージョンを取得する
ことを特徴とする請求項2に記載のサーバ装置。
【請求項4】
前記複数のクエリを複数のグループに分けたグループ毎に前記所定期間を記憶する記憶部をさらに有し、
前記停止部は、クエリが移動してから当該クエリが属するグループに対応する所定期間が経過したときにデータの送信を停止させ、
前記取得部は、前記複数のノードから、移動したクエリが属するグループの各クエリに関するルーティングテーブルのバージョンを取得する
ことを特徴とする請求項2に記載のサーバ装置。
【請求項5】
前記所定期間が、前記複数のノードの間でのデータが転送される最大期間よりも短い場合、前記停止部、前記取得部、前記比較部、前記更新部の処理を実行する実行部をさらに有することを特徴とする請求項1〜4の何れか1つに記載のサーバ装置。
【請求項6】
前記複数のノードから一定期間毎に受信する、当該一定期間の間に最初に他のノードへデータを転送した日時を示す日時情報に基づき、当該一定期間で最も過去に他のノードへデータを転送した日時を特定する特定部をさらに有し、
前記ルーティングテーブルは、前記バージョンとして、更新日時を記憶し、
前記停止部は、前記特定部により特定された日時が前記ルーティングテーブルにバージョンとして記憶された更新日時以降の日時である場合に、複数のノードに対するデータの送信を所定期間毎に停止させる
ことを特徴とする請求項1〜5の何れか1つに記載のサーバ装置。
【請求項7】
コンピュータに、
それぞれ設定された条件にデータが合致した場合に処理を実行するための複数のクエリが配置され、各クエリが処理対象とするデータ毎に当該クエリの配置先およびクエリの移動に伴い更新されるバージョンを格納したルーティングテーブルを記憶する複数のノードに対するデータの送信を所定期間毎に停止させ、
データの送信を停止させた場合に前記複数のノードからルーティングテーブルのバージョンを取得し、
取得された複数のノードのルーティングテーブルのバージョンを比較し、
比較の結果、古いバージョンのルーティングテーブルが記憶されたノードがある場合、当該ノードのルーティングテーブルを更新する
処理を実行させることを特徴とするデータ順序保証プログラム。
【請求項8】
コンピュータが、
それぞれ設定された条件にデータが合致した場合に処理を実行するための複数のクエリが配置され、各クエリが処理対象とするデータ毎に当該クエリの配置先およびクエリの移動に伴い更新されるバージョンを格納したルーティングテーブルを記憶する複数のノードに対するデータの送信を所定期間毎に停止させ、
データの送信を停止させた場合に前記複数のノードからルーティングテーブルのバージョンを取得し、
取得された複数のノードのルーティングテーブルのバージョンを比較し、
比較の結果、古いバージョンのルーティングテーブルが記憶されたノードがある場合、当該ノードのルーティングテーブルを更新する
処理を実行することを特徴とするデータ順序保証方法。
【請求項9】
処理を実行する複数の処理要求文が配置され、当該処理要求文が処理対象とするデータ毎に当該処理要求文の配置先および処理要求文の移動に伴って更新されるバージョンを格納したルーティングテーブルを記憶する複数のノードに対してデータの送信を停止させる停止部と、
前記停止部によりデータの送信が停止した場合に、前記複数のノードから前記ルーティングテーブルのバージョンを取得する取得部と、
前記取得部により取得した複数のノードの前記ルーティングテーブルのバージョンを比較する比較部と、
前記比較部による比較の結果、古いバージョンのルーティングテーブルが記憶されたノードがある場合、当該ノードのルーティングテーブルを更新する更新部と、
を有することを特徴とするサーバ装置。

【図1】
image rotate

【図2】
image rotate

【図3】
image rotate

【図4A】
image rotate

【図4B】
image rotate

【図5】
image rotate

【図6】
image rotate

【図7】
image rotate

【図8】
image rotate

【図9】
image rotate

【図10】
image rotate

【図11】
image rotate

【図12】
image rotate

【図13】
image rotate

【図14】
image rotate

【図15】
image rotate


【公開番号】特開2013−115781(P2013−115781A)
【公開日】平成25年6月10日(2013.6.10)
【国際特許分類】
【出願番号】特願2011−263049(P2011−263049)
【出願日】平成23年11月30日(2011.11.30)
【国等の委託研究の成果に係る記載事項】(出願人による申告)平成23年度、経済産業省、「次世代高信頼・省エネ型IT基盤技術開発・実証事業(大規模データストリーム処理基盤の研究開発)」委託研究、産業技術力強化法第19条の適用を受ける特許出願
【出願人】(000005223)富士通株式会社 (25,993)
【Fターム(参考)】