説明

分散データ処理システムおよび中央管理装置

【課題】複数のノードの性能等を把握することなく適切なスケジューリングを行うことが可能な分散データ処理システムおよび中央管理装置を提供することにある。
【解決手段】中央管理装置に含まれる入力手段は、データ処理の対象となるデータおよび目標終了時刻情報を入力する。判定手段は、データ処理が振り分けられていないノードが存在するかを判定する。抽出手段は、ノード毎に最も遅い目標終了時刻を抽出する。特定手段は、ノード毎に抽出された目標終了時刻の中で最も早い目標終了時刻が抽出されたノードを特定する。振り分け処理手段は、入力されたデータに対するデータ処理を特定されたノードに振り分ける。格納処理手段は、データ処理識別情報、目標終了時刻情報および振り分け先ノード情報を対応づけて振り分け情報格納手段に格納する。入力されたデータに対するデータ処理が振り分けられたノードは、当該データ処理を実行する。

【発明の詳細な説明】
【技術分野】
【0001】
本発明の実施形態は、複数のノードで分散してデータ処理を実行する分散データ処理システムおよび中央管理装置に関する。
【背景技術】
【0002】
近年、複数のノード(計算機)で分散してデータ処理を実行する分散データ処理システムが知られている。なお、データ処理とは、例えば対応するプログラムが実行されることによって処理される1グループのデータ群で構成される処理単位である。
【0003】
分散データ処理システムは、例えばデータ処理を実行する複数のノードと当該複数のノードを統括する中央管理装置とから構成される。
【0004】
ところで、このような分散データ処理システムにおいて分散して実行されるデータ処理(分散データ処理)を目標終了時刻までに完了させるためには、当該分散データ処理システムにおいてデータ処理を適切なノードに振り分ける(割り振る)必要がある。
【0005】
この場合、分散データ処理システムを構成する中央管理装置は、各ノードの性能および負荷状況を逐次把握し、当該各ノードの性能、負荷状況およびデータ処理の目標終了時刻に基づいて、当該データ処理をノードに振り分ける処理(スケジューリング)を行う。
【先行技術文献】
【特許文献】
【0006】
【特許文献1】特開2008−123205号公報
【発明の概要】
【発明が解決しようとする課題】
【0007】
しかしながら、例えば分散データ処理システムを構成する複数のノードに既にデータ処理が振り分けられている状態で新たなデータ処理をノードに振り分けるような場合、中央管理装置は、当該複数のノードの全ての性能をチェックし、全体の最新状況を把握して全体のスケジューリングをし直さなければならない。この場合、複数のノード全体のスケジューリングに伴う通信処理のコストがかかる。
【0008】
そこで、本発明が解決しようとする課題は、複数のノードの性能等を把握することなく、適切なスケジューリングを行うことが可能な分散データ処理システムおよび中央管理装置を提供することにある。
【課題を解決するための手段】
【0009】
実施形態によれば、データ処理を実行する複数のノードと、前記複数のノードと接続された中央管理装置であって当該データ処理を当該ノードの各々に振り分ける中央管理装置とから構成される分散データ処理システムが提供される。
【0010】
実施形態に係る分散データ処理システムを構成する中央管理装置は、振り分け情報格納手段と、入力手段と、判定手段と、抽出手段と、特定手段と、振り分け処理手段と、格納処理手段とを含む。
【0011】
振り分け情報格納手段は、前記ノードの各々に振り分けられたデータ処理を識別するためのデータ処理識別情報、当該データ処理の目標終了時刻を示す目標終了時刻情報および当該データ処理が振り分けられたノードを示す振り分け先ノード情報を対応づけて格納する。
【0012】
入力手段は、前記ノードに振り分けられていないデータ処理の対象となるデータおよび当該データ処理の目標終了時刻を示す目標終了時刻情報を入力する。
【0013】
判定手段は、前記振り分け情報格納手段に格納されている振り分け先ノード情報に基づいて、前記複数のノードの中にデータ処理が振り分けられていないノードが存在するかを判定する。
【0014】
抽出手段は、前記データ処理が振り分けられていないノードが存在しないと判定された場合、前記ノード毎に、当該ノードを示す振り分け先ノード情報に対応づけて前記振り分け情報格納手段に格納されている目標終了時刻情報によって示される目標終了時刻のうち最も遅い目標終了時刻を抽出する。
【0015】
特定手段は、前記ノード毎に抽出された目標終了時刻の中で最も早い目標終了時刻が抽出されたノードを特定する。
【0016】
振り分け処理手段は、前記入力されたデータおよび目標終了時刻情報を前記特定されたノードに送信し、当該入力されたデータに対するデータ処理を前記特定されたノードに振り分ける。
【0017】
格納処理手段は、前記入力されたデータに対するデータ処理を識別するためのデータ処理識別情報、前記入力された目標終了時刻情報および当該データ処理が振り分けられたノードを示す振り分け先ノード情報を対応づけて前記振り分け情報格納手段に格納する。
【0018】
実施形態に係る分散データ処理システムを構成する前記入力されたデータに対するデータ処理が振り分けられたノードは、前記送信された目標終了時刻情報によって示される目標終了時刻に基づいて、前記送信されたデータに対するデータ処理を実行する。
【図面の簡単な説明】
【0019】
【図1】実施形態に係る分散データ処理システムの主として機能構成を示すブロック図。
【図2】図1に示す中央管理装置10に含まれる振り分け情報格納部12のデータ構造の一例を示す図。
【図3】図1に示す中央管理装置10に含まれる処理結果格納部16のデータ構造の一例を示す図。
【図4】図1に示すノード20に含まれる実行情報格納部26のデータ構造の一例を示す図。
【図5】データ処理を実行している複数のノード20の状態を説明するための図。
【図6】図5に示す状態においてノード1に対してデータ処理4が新たに振り分けられた場合の分散データ処理システムの動作について説明するための図。
【図7】本実施形態に係る分散データ処理システムにおいて実行される第1の処理の処理手順を示すフローチャート。
【図8】分散データ処理システムにおいて第1の処理が実行される前の中央管理装置10に含まれる振り分け情報格納部12のデータ構造の一例を示す図。
【図9】分散データ処理システムにおいて第1の処理が実行された後の振り分け情報格納部12のデータ構造の一例を示す図。
【図10】本実施形態に係る分散データ処理システムにおいて実行される第2の処理の処理手順を示すフローチャート。
【図11】本実施形態に係る分散データ処理システムにおいて実行される第2の処理の処理手順を示すフローチャート。
【図12】本実施形態に係る分散データ処理システムにおいて実行される第2の処理の処理手順を示すフローチャート。
【図13】本実施形態に係る分散データ処理システムにおいて実行される第2の処理の処理手順を示すフローチャート。
【図14】分散データ処理システムにおいて第2の処理が実行される前の対象ノード20に含まれる実行情報格納部26のデータ構造の一例を示す図。
【図15】第2の処理において更新された実行情報格納部26のデータ構造の一例を示す図。
【図16】本実施形態に係る分散データ処理システムにおいて実行される第3の処理の処理手順を示すフローチャート。
【図17】本実施形態に係る分散データ処理システムにおいて実行される第4の処理の処理手順を示すフローチャート。
【図18】本実施形態に係る分散データ処理システムにおいて実行される第5の処理の処理手順を示すフローチャート。
【図19】分散データ処理システムにおいて第5の処理が実行される際の中央管理装置10に含まれる処理結果格納部16のデータ構造の一例を示す図。
【図20】第5の処理においてデータ処理IDおよびデータ処理結果が格納された後の処理結果格納部16のデータ構造を示す図。
【図21】対象データ処理および親データ処理の処理状況が更新された後の振り分け情報格納部12のデータ構造の一例を示す図。
【発明を実施するための形態】
【0020】
以下、図面を参照して、実施形態について説明する。
【0021】
まず、図1を参照して、本実施形態に係る分散データ処理システムについて説明する。本実施形態に係る分散データ処理システムは、例えばクラウドコンピューティングのような計算機環境においてデータ処理を分散させる際に適用されるものであり、複数の計算機資源で並列にデータ処理を実行させる場合を想定している。なお、データ処理は、例えば対応するプログラムが実行されることによって処理される処理単位である。
【0022】
図1は、本実施形態に係る分散データ処理システムの主として機能構成を示すブロック図である。分散データ処理システムは、中央管理装置10および複数のノード20から構成される。中央管理装置10は、例えばネットワークを介して複数のノード20と接続されている。なお、図1においては、便宜的に、1つのノード20のみが示されている。
【0023】
中央管理装置10は、データ処理を各ノード20に振り分ける(割り振る)機能を有する。中央管理装置10は、ヘッダ情報付与部11、振り分け情報格納部12、振り分け判定部13、振り分け処理部14、中央結果処理部15および処理結果格納部16を含む。
【0024】
ヘッダ情報付与部11は、ノード20に振り分けられていないデータ処理(振り分けの対象となるデータ処理)の対象となるデータ(当該データ処理によって処理されるデータ)を入力する。
【0025】
ヘッダ情報付与部11は、入力されたデータに対するデータ処理(当該データに対して実行されるデータ処理)の目標終了時刻を示す目標終了時刻情報および当該データ処理(当該データの処理作業)の優先レベル(優先度)を示す優先レベル情報を入力する。
【0026】
ヘッダ情報付与部11は、入力されたデータに対して、当該データに対するデータ処理を識別するためのデータ処理ID(データ処理識別情報)、入力された目標終了時刻情報および優先レベル情報から構成されたヘッダ情報を付与する。なお、ヘッダ情報を構成するデータ処理IDは、データが入力された際にヘッダ情報付与部11によって発行される。
【0027】
振り分け情報格納部12には、データ処理の振り分けに関する情報が格納される。振り分け情報格納部12には、例えばデータ処理ID、優先レベル情報、目標終了時刻情報および振り分け先ノード情報等が対応づけて格納される。
【0028】
データ処理IDは、ノード20に振り分けられたデータ処理を識別するための識別子である。優先レベル情報は、データ処理IDによって識別されるデータ処理の優先レベルを示す。目標終了時刻情報は、データ処理IDによって識別されるデータ処理の目標終了時刻を示す。振り分け先ノード情報は、データ処理IDによって識別されるデータ処理が振り分けられたノード20(つまり、当該データ処理の振り分け先となるノード20)を示す。なお、振り分け情報格納部12のデータ構造の詳細については後述する。
【0029】
振り分け判定部13は、振り分け情報格納部12を参照して、ヘッダ情報付与部11によってヘッダ情報が付与されたデータ(ヘッダ情報付与部11によって入力されたデータ)に対するデータ処理が振り分けられるノード20を特定(振り分け判定)する。
【0030】
また、振り分け判定部13は、後述するようにノード20からデータが返却された場合には、振り分け情報格納部12を参照して、当該データに対するデータ処理が振り分けられるノード20を特定する。
【0031】
振り分け処理部14は、振り分け判定部13によって特定されたノード20に対してデータ処理を振り分ける。この場合、振り分け処理部14は、ヘッダ情報付与部11によってヘッダ情報が付与されたデータを、振り分け判定部13によって特定されたノード20に対して送信する。
【0032】
中央結果処理部15は、各ノード20において実行されたデータ処理の結果を受信する(受け取る)。中央結果処理部15は、受信されたデータ処理の結果に基づいて、振り分け情報格納部12(の内容)を更新する。中央結果処理部15は、受信されたデータ処理の結果を処理結果格納部16に格納する。また、中央結果処理部15は、振り分け情報格納部12および処理結果格納部16を参照して、データ処理の最終結果を例えば中央管理装置10の外部に出力する。
【0033】
処理結果格納部16には、データ処理の結果が格納される。処理結果格納部16には、例えばデータ処理を識別するためのデータ処理IDおよび当該データ処理の結果が対応づけて格納される。
【0034】
ノード20は、中央管理装置10によって当該ノード20に対して振り分けられたデータ処理を実行する機能を有する。上記したように、分散データ処理システムにおいては、複数のノード20が存在する。これらの各ノード20は、ノード管理部21、ノードデータ格納部22、データ処理実行部23、データ処理推定部24、データ処理分割部25および実行情報格納部26を含む。なお、これらの各部21〜26の説明において、当該各部21〜26を含むノード20を、便宜的に、自ノード20と称する。
【0035】
ノード管理部21は、中央管理装置10(に含まれる振り分け処理部14)によって送信されたデータを受信する。ノード管理部21は、受信されたデータに対するデータ処理(自ノード20に振り分けられたデータ処理)の実行を制御および管理する。具体的には、ノード管理部21は、受信されたデータに付与されているヘッダ情報を構成する目標終了時刻情報によって示される目標終了時刻を満たすようにした上で、当該ヘッダ情報を構成する優先レベル情報によって示される優先レベルの高い順にデータ処理が実行されるように制御および管理する。
【0036】
なお、ノード管理部21は、データ処理実行部23、データ処理推定部24およびデータ処理分割部25に対して各種指示を行うことができる。
【0037】
ノードデータ格納部22には、ノード管理部21によって受信されたデータに対するデータ処理(つまり、自ノード20に振り分けられたデータ処理)を識別するデータ処理ID(当該データに付与されているヘッダ情報を構成するデータ処理ID)および当該データが対応づけて格納される。
【0038】
データ処理実行部23は、ノードデータ格納部22を参照して、中央管理装置10によって自ノード20に振り分けられたデータ処理を実行する。換言すれば、データ処理実行部23は、ノードデータ格納部22に格納されたデータに対するデータ処理を実行する。
【0039】
また、データ処理実行部23は、ノード管理部21からの指示に応じて、自ノード20に振り分けられたデータ処理の一部を実行する。
【0040】
データ処理推定部24は、データ処理実行部23によってデータ処理の一部が実行された結果に基づいて、当該データ処理における単位処理量(例えば、2KByte/分等)を算出する。データ処理推定部24は、算出されたデータ処理における単位処理量を用いて、当該データ処理の推定終了時刻を算出する。なお、データ処理推定部24によって算出されたデータ処理における単位処理量および当該データ処理の推定終了時刻は、ノード管理部21に渡される。
【0041】
データ処理分割部25は、自ノード20に振り分けられたデータ処理の対象となるデータ(当該データ処理を識別するためのデータ処理IDに対応づけてノードデータ格納部22に格納されているデータ)に付与されているヘッダ情報を構成する目標終了時刻情報によって示される目標終了時刻より当該データ処理の推定終了時刻が遅い(つまり、当該データ処理を当該目標終了時刻以前に完了することができない)場合に、当該データを分割する。この場合、データ処理分割部25は、自ノード20に振り分けられたデータ処理の対象となるデータを、目標終了時刻までに処理できるサイズのデータと当該目標終了時刻までに処理できないデータ(当該目標終了時刻から溢れるデータ)とに分割する。なお、データ処理分割部25によって分割された2つのデータのうち、目標終了時刻までに処理できるサイズのデータ(つまり、分割後の前方のデータ)をfrontデータと称し、目標終了時刻までに処理できないサイズのデータ(つまり、分割後の後方のデータ)をrearデータと称する。なお、データ処理分割部25は、ノード管理部21からの指示(分割指示)に応じて、分割処理を実行する。
【0042】
データ処理分割部25によって分割された2つのデータのうちのfrontデータは、当該frontデータに対するデータ処理を識別するためのデータ処理IDに対応づけてノードデータ格納部22に格納される。なお、frontデータに対するデータ処理を識別するためのデータ処理IDには、当該frontデータに分割される前のデータ(親データ)に対するデータ処理を識別するためのデータ処理IDおよび当該frontデータであることを示す情報が含まれる。
【0043】
一方、データ処理分割部25によって分割された2つのデータのうちのrearデータは、データ処理ID、優先レベル情報および目標終了時刻情報から構成されるヘッダ情報が付与されて、中央管理装置10に返却される。この場合、rearデータに付与されるヘッダ情報を構成するデータ処理IDには、当該rearデータに分割される前のデータ(親データ)に対するデータ処理を識別するためのデータ処理IDおよび当該rearデータであることを示す情報が含まれる。
【0044】
実行情報格納部26には、自ノード20で実行されるデータ処理に関する情報(実行情報)が格納される。実行情報格納部26には、データ処理に関する実行情報として、処理順、データ処理ID、優先レベル情報、目標終了時刻情報、推定終了時刻情報および単位処理量等が対応づけて格納される。
【0045】
実行情報格納部26に格納されるデータ処理ID、優先レベル情報および目標終了時刻情報は、自ノード20に振り分けられたデータ処理の対象となるデータ(つまり、当該データ処理を識別するデータ処理IDに対応づけてノードデータ格納部22に格納されているアドレスに格納されているデータ)に付与されているヘッダ情報を構成するデータ処理ID、優先レベル情報および目標終了時刻情報である。
【0046】
処理順は、データ処理IDによって識別されるデータ処理が実行される順番を示す。なお、処理順は、優先レベル情報によって示される優先レベルに基づいてノード管理部21によって決定(変更)される。推定終了時刻情報は、データ処理IDによって識別されるデータ処理の推定終了時刻を示す。単位処理量は、データ処理IDによって識別されるデータ処理における単位処理量を示す。なお、データ処理の推定終了時刻および単位処理量は、上記したようにデータ処理推定部24によって算出される。なお、実行情報格納部26のデータ構造の詳細については後述する。
【0047】
図2は、図1に示す中央管理装置10に含まれる振り分け情報格納部12のデータ構造の一例を示す。図2に示すように、振り分け情報格納部12には、ノード20に振り分けられたデータ処理毎に、データ処理ID、優先レベル情報、目標終了時刻情報、振り分け時刻情報、振り分け先ノード情報、新規/返却状況情報および処理状況情報が対応づけて格納される。
【0048】
データ処理IDは、データ処理を識別するための識別子である。データ処理IDにおいては、例えば識別名の後に「_」で連結して使用プログラムの識別名が示される。
【0049】
優先レベル情報は、対応づけられているデータ処理IDによって識別されるデータ処理の優先レベル(優先度)を示す。なお、優先レベル情報によって示される優先レベルは例えば1〜5までの5段階であり、最高優先レベルは5であるものとする。
【0050】
目標終了時刻情報は、対応づけられているデータ処理IDによって識別されるデータ処理の目標終了時刻を示す。振り分け時刻情報は、対応づけられているデータ処理IDによって識別されるデータ処理がノード20に振り分けられた時刻(振り分け時刻)を示す。振り分け先ノード情報は、対応づけられているデータ処理IDによって識別されるデータ処理が振り分けられたノード20を示す。
【0051】
新規/返却状況情報は、対応づけられているデータ処理IDによって識別されるデータ処理の対象となるデータが外部から入力されたものであるか、または、ノード20から溢れて返却されたものであるかを示す。新規/返却状況情報には、データ処理の対象となるデータが外部から入力されたものであることを示す「新規」およびノード20から溢れて返却されたものであることを示す「返却」が含まれる。
【0052】
処理状況情報は、対応づけられているデータ処理IDによって識別されるデータ処理が完了したか否かを示す。処理状況情報には、データ処理IDによって識別されるデータ処理が完了したことを示す「完了」および当該データ処理が完了していないことを示す「未」が含まれる。
【0053】
図2に示す例では、振り分け情報格納部12には、データ処理ID「data0_hokenryoukeisan」、優先レベル情報「3」、目標終了時刻情報「2010/1/21 15:00」、振り分け時刻情報「2010/1/21 10:00」、振り分け先ノード「ノード2」、新規/返却状況情報「新規」および処理状況情報「完了」が対応づけて格納されている。これによれば、データ処理ID「data0_hokenryoukeisan」によって識別されるデータ処理の優先レベルが「3」であり、当該データ処理の目標終了時刻が「2010/1/21 15:00」であり、当該データ処理がノード2に振り分けられた時刻が「2010/1/21 10:00」であることが示されている。また、データ処理ID「data0_hokenryoukeisan」によって識別されるデータ処理の対象となるデータが外部から入力されたものであり、当該データ処理は完了していることが示されている。
【0054】
また、振り分け情報格納部12には、データ処理ID「data1_pai」、優先レベル情報「3」、目標終了時刻情報「2010/1/21 15:00」、振り分け時刻情報「2010/1/21 10:00」、振り分け先ノード「ノード1」、新規/返却状況情報「新規」および処理状況情報「未」が対応づけて格納されている。これによれば、データ処理ID「data1_pai」によって識別されるデータ処理の優先レベルが「3」であり、当該データ処理の目標終了時刻が「2010/1/21 15:00」であり、当該データ処理がノード1に振り分けられた時刻が「2010/1/21 10:00」であることが示されている。また、データ処理ID「data1_pai」によって識別されるデータ処理の対象となるデータが外部から入力されたものであり、当該データ処理は完了していないことが示されている。
【0055】
また、振り分け情報格納部12には、データ処理ID「data2_tax」、優先レベル情報「5」、目標終了時刻情報「2010/1/21 5:00」、振り分け時刻情報「2010/1/21 3:00」、振り分け先ノード「ノード2」、新規/返却状況情報「新規」および処理状況情報「未」が対応づけて格納されている。これによれば、データ処理ID「data2_tax」によって識別されるデータ処理の優先レベルが「5」であり、当該データ処理の目標終了時刻が「2010/1/21 5:00」であり、当該データ処理がノード2に振り分けられた時刻が「2010/1/21 3:00」であることが示されている。また、データ処理ID「data2_tax」によって識別されるデータ処理の対象となるデータが外部から入力されたものであり、当該データ処理は完了していないことが示されている。
【0056】
また、振り分け情報格納部12には、データ処理ID「data1_pai/rear」、優先レベル情報「3」、目標終了時刻情報「2010/1/21 15:00」、振り分け時刻情報「2010/1/21 13:00」、振り分け先ノード「ノード3」、新規/返却状況情報「返却」および処理状況情報「未」が対応づけて格納されている。これによれば、データ処理ID「data1_pai/rear」によって識別されるデータ処理の優先レベルが「3」であり、当該データ処理の目標終了時刻が「2010/1/21 15:00」であり、当該データ処理がノード3に振り分けられた時刻が「2010/1/21 13:00」であることが示されている。また、データ処理ID「data1_pai/rear」によって識別されるデータ処理の対象となるデータがノード20から返却されたものであり、当該データ処理は完了していないことが示されている。
【0057】
ここで、データ処理ID「data1_pai/rear」のようにデータ処理IDの後方に「/rear」が含まれている場合、当該データ処理IDによって識別されるデータ処理の対象となるデータは、上述したようにノード20に含まれるデータ処理分割部25によって分割された後方のデータ(つまり、rearデータ)である。つまり、データ処理ID「data1_pai/rear」によって識別されるデータ処理は、データ処理ID「data1_pai」によって識別されるデータ処理の対象となるデータが分割された後方のデータに対するデータ処理である。一方、振り分け情報格納部12には格納されないが、データ処理IDの後方に「/front」が含まれている場合、当該データデータ処理IDによって識別されるデータ処理の対象となるデータは、上述したノード20に含まれるデータ処理分割部25によって分割された前方のデータ(つまり、frontデータ)である。
【0058】
なお、例えばデータ処理ID「data1_pai/rear」によって識別されるデータ処理の対象となるデータが更に分割された場合、当該分割された後方のデータに対するデータ処理を識別するためのデータ処理IDは、データ処理ID「data1_pai/rear/rear」となる。一方、分割された前方のデータに対するデータ処理を識別するためのデータ処理IDは、データ処理ID「data1_pai/rear/front」となる。更にデータが分割された場合についても同様である。
【0059】
また、振り分け情報格納部12には、データ処理ID「data3_housedesign」、優先レベル情報「4」、目標終了時刻情報「2010/1/21 20:00」、振り分け時刻情報「2010/1/21 13:30」、振り分け先ノード「ノード4」、新規/返却状況情報「新規」および処理状況情報「未」が対応づけて格納されている。これによれば、データ処理ID「data3_housedesign」によって識別されるデータ処理の優先レベルが「4」であり、当該データ処理の目標終了時刻が「2010/1/21 20:00」であり、当該データ処理がノード4に振り分けられた時刻が「2010/1/21 13:30」であることが示されている。また、データ処理ID「data3_housedesign」によって識別されるデータ処理の対象となるデータが外部から入力されたものであり、当該データ処理は完了していないことが示されている。
【0060】
図3は、図1に示す中央管理装置10に含まれる処理結果格納部16のデータ構造の一例を示す。図3に示すように、処理結果格納部16には、データ処理IDおよびデータが対応づけて格納される。
【0061】
データ処理IDは、データ処理を識別するための識別子である。データ処理結果は、対応づけられているデータ処理IDによって識別されるデータ処理の結果である。
【0062】
図3に示す例では、処理結果格納部16には、データ処理ID「data0_hokenryoukeisan」およびデータ処理結果「データ処理結果0」が対応づけて格納されている。これによれば、データ処理ID「data0_hokenryoukeisan」によって識別されるデータ処理の結果が「データ処理結果0」であることが示される。
【0063】
ここでは、図3が処理結果格納部16のデータ構造の一例を示すものとして説明したが、ノード20に含まれるノードデータ格納部22についても処理結果格納部16と同様のデータ構造を有する。図示しないが、ノードデータ格納部22においては、図3に示す処理結果格納部16に格納されているデータ処理結果に代えて、自ノード20(当該ノードデータ格納部22を含むノード20)に振り分けられたデータ処理の対象となるデータが格納される。つまり、ノードデータ格納部22には、自ノード20に振り分けられたデータ処理を識別するためのデータ処理IDおよび当該データ処理の対象となるデータが対応づけて格納される。
【0064】
図4は、図1に示すノード20に含まれる実行情報格納部26のデータ構造の一例を示す。実行情報格納部26には、当該実行情報格納部26を含むノード20(自ノード20)に振り分けられたデータ処理に関する実行情報が格納される。図4に示すように、実行情報格納部26には、データ処理毎に、処理順、データ処理ID、優先レベル情報、目標終了時刻情報、開始時刻情報、推定終了時刻情報、単位処理量および処理済情報が対応づけて格納される。
【0065】
処理順は、自ノード20に振り分けられたデータ処理(対応づけられているデータ処理IDによって識別されるデータ処理)が実行される順番を示す。データ処理IDは、自ノード20に振り分けられたデータ処理を識別するための識別子である。優先レベル情報は、対応づけられているデータ処理IDによって識別されるデータ処理の優先レベルを示す。目標終了時刻情報は、対応づけられているデータ処理IDによって識別されるデータ処理の目標終了時刻を示す。なお、データ処理ID、優先レベル情報および目標終了時刻情報については、上述した図2において説明したデータ処理ID、優先レベル情報及び目標終了時刻情報と同様である。
【0066】
開始時刻情報は、対応づけられているデータ処理IDによって識別されるデータ処理が実際に開始された時刻(開始時刻)を示す。推定終了時刻情報は、対応づけられているデータ処理IDによって識別されるデータ処理の推定終了時刻(つまり、当該データ処理が終了すると推定される時刻)を示す。単位処理量は、対応づけられているデータ処理IDによって識別されるデータ処理における単位処理量を示す。具体的には、単位処理量は、単位時間あたり(例えば、分単位)に処理されるデータサイズ(処理サイズ)を示す。
【0067】
また、処理済情報は、対応づけられているデータ処理IDによって識別されるデータ処理の進捗状況(当該データ処理の対象となるデータに対して既に処理が完了した部分)を示す。具体的には、処理済情報は、対応づけられているデータ処理IDによって識別されるデータ処理の対象となるデータのデータ量(例えば、ファイル数)およびサイズに対する当該処理が完了したデータ量およびサイズを示す。換言すれば、処理済情報には、データ処理の対象となるデータのデータ量およびサイズと、当該処理が完了したデータ量およびサイズが含まれる。
【0068】
図4に示す例では、実行情報格納部26には、処理順「2」、データ処理ID「data1_pai/front」、優先レベル情報「3」、目標終了時刻情報「2010/1/21 15:00」、推定終了時刻情報「2010/1/21 15:00」、単位処理量「2Kbyte/分」および処理済情報「60/180file:56/168MByte」が対応づけて格納されている。これによれば、データ処理ID「data1_pai/front」によって識別されるデータ処理の処理順が「2」であり、当該データ処理の優先レベルが「3」であり、当該データ処理の目標終了時刻が「2010/1/21 15:00」であり、当該データ処理の推定終了時刻が「2010/1/21 15:00」であることが示されている。また、データ処理ID「data1_pai/front」によって識別されるデータ処理における1分あたりの処理量が2Kbyteであることが示されている。更に、データ処理ID「data1_pai/front」によって識別されるデータ処理の対象となるデータのデータ量「180file」のうちの「60file」の処理が完了しており、また、当該データのサイズ「168MByte」のうちの「56MByte」の処理が完了していることが示されている。
【0069】
なお、処理順が「2」であるため、データ処理ID「data1_pai/front」によって識別されるデータ処理は実行されていない(例えば、停止中である)。このため、データ処理ID「data1_pai/front」に対応づけられている開始時刻情報は空欄となっている。
【0070】
また、データ処理ID「data1_pai/front」には「/front」が含まれているため、当該データ処理ID「data1_pai/front」によって識別されるデータ処理は、データ処理ID「data1_pai」によって識別されるデータ処理の対象となるデータが分割された前方のデータに対するデータ処理である。
【0071】
実行情報格納部26には、処理順「1」、データ処理ID「data2_tax」、優先レベル情報「5」、目標終了時刻情報「2010/1/21 5:00」、開始時刻情報「2010/1/21 0:00」、推定終了時刻情報「2010/1/21 5:00」、単位処理量「4Kbyte/分」および処理済情報「20/40file:30/60MByte」が対応づけて格納されている。これによれば、データ処理ID「data2_tax」によって識別されるデータ処理の処理順が「1」であり、当該データ処理の優先レベルが「5」であり、当該データ処理の目標終了時刻が「2010/1/21 5:00」であり、当該データ処理の開始時刻が「2010/1/21 0:00」であり、当該データ処理の推定終了時刻が「2010/1/21 5:00」であることが示されている。また、データ処理ID「data2_tax」によって識別されるデータ処理における1分あたりの処理量が4Kbyteであることが示されている。更に、データ処理ID「data2_tax」によって識別されるデータ処理の対象となるデータのデータ量「40file」のうちの「20file」の処理が完了しており、また、当該データのサイズ「60MByte」のうちの「30MByte」の処理が完了していることが示されている。
【0072】
なお、データ処理ID「data1_pai/front」およびデータ処理ID「data2_tax」に対応づけられている処理順は、当該データ処理ID「data1_pai/front」およびデータ処理ID「data2_tax」に対応づけられている優先レベル情報によって示される優先レベルに基づいて決定される。具体的には、データ処理ID「data1_pai/front」に対応づけられている優先レベル情報によって示される優先レベルは「3」であり、データ処理ID「data2_tax」に対応づけられている優先レベル情報によって示される優先レベルは「5」であり、当該データ処理ID「data1_pai/front」によって識別されるデータ処理の優先レベルの方が高い。このため、データ処理ID「data1_pai/front」に対応づけられている処理順が「2」となり、データ処理ID「data2_tax」に対応づけられている処理順が「1」となる。
【0073】
以下、本実施形態に係る分散データ処理システムの動作について説明する。まず、分散データ処理システムを構成する中央管理装置10および複数のノード20の動作の概要について簡単に説明する。
【0074】
分散データ処理システムを構成する中央管理装置10は、例えば外部からデータ処理の対象となるデータ、当該データ処理の目標終了時刻を示す目標終了時刻情報および当該データ処理の優先レベルを示す優先レベル情報を入力し、当該データ処理をノード20に振り分ける。この場合、中央管理装置10は、各ノード20の例えばリソース性能等を確認することなくデータ処理をノードに対して振り分ける。また、中央管理装置10は、ノードから返されたデータ処理の結果を外部に対して出力する。
【0075】
一方、中央管理装置20によってデータ処理が振り分けられたノード20は、当該データ処理の優先レベルを考慮した上で、目標終了時刻を満たすようにデータ処理を実行する。この場合、ノード20は、目標終了時刻までにデータ処理を完了できない場合であって当該データ処理の対象となるデータを分割できる場合には、当該データを分割する(当該データ処理を分割する)。ノード20は、目標終了時刻までに処理を完了できないデータ(データ処理の対象となるデータ)を中央管理装置に返却する。
【0076】
なお、ノード20からデータが返却された場合、当該データに対するデータ処理は、中央管理装置10によって再び他のノード20に振り分けられる。
【0077】
ここで、図5および図6を参照して、本実施形態に係る分散データ処理システムの動作の概要について具体的に説明する。ここでは、例えばデータ処理を実行しているノード20に対して、当該データ処理よりも優先レベルが高いデータ処理が中央管理装置10によって振り分けられた場合の動作について説明する。
【0078】
図5は、データ処理を実行している複数のノード20の状態を説明するための図である。ここでは、分散データ処理システムを構成する複数のノード20として、ノード1〜3が含まれるものとする。また、図5に示すように、ノード1ではデータ処理1、ノード2ではデータ処理2、ノード3データはデータ処理3が実行されているものとする。なお、データ処理1の優先レベルは「3」であるものとする。また、データ処理2およびデータ処理3の優先レベルは「4」であるものとする。なお、ノード1〜3においては、データ処理1〜3をそれぞれの目標終了時刻以前に完了(終了)できる状態であるものとする。
【0079】
ここで、図6を参照して、ノード1〜3が図5に示すような状態である場合において、ノード1に対してデータ処理4が新たに振り分けられた場合の分散データ処理システムの動作について説明する。なお、ノード1に振り分けられたデータ処理4の優先レベルは「5」であるものとする。
【0080】
まず、ノード1は、データ処理4の対象となるデータを受信する(ステップS1)。ここで、ノード1に対して振り分けられたデータ処理4の優先レベルは、当該ノード1において実行されているデータ処理1の優先レベルよりも高い。この場合、ノード1においては、データ処理1よりもデータ処理4が優先的に実行される。つまり、ノード1においては、データ処理1の実行が停止され、データ処理4が実行される。これにより、データ処理4については、目標終了時刻以前に完了できる。
【0081】
ここで、ノード1においてはデータ処理4が優先的に実行されるため、ノード1では、データ処理1を当該データ処理4の後ろにずらさなければならない。つまり、データ処理1は、データ処理4の終了後に実行されるように制御される。しかしながら、データ処理1がデータ処理4の終了後に実行された場合には、図6に示すように、当該データ処理1を当該データ処理1の目標終了時刻までに完了することができなくなる。
【0082】
この場合、ノード1において、データ処理1の対象となるデータは、当該データ処理1の目標終了時刻までに処理できる部分51(以下、データ処理1のfrontデータ51と表記)と当該目標終了時刻までに処理できない(つまり、当該目標終了時刻から溢れる)部分52(以下、データ処理1のrearデータ52と表記)とに分割される(ステップS2)。このようにデータ処理1の対象となるデータが分割されると、当該データ処理1のrearデータ52は、中央管理装置10に返却される。なお、データ処理1のfrontデータ51に対するデータ処理は、ノード1においてデータ処理4の終了後に実行される。
【0083】
中央管理装置10は、ノード1から返却されたデータ処理1のrearデータ52に対するデータ処理の再振り分けを行う。なお、データ処理は、例えば既に実行されているデータ処理の終了時刻(推定終了時刻)が目標終了時刻より前であるノードに振り分けられる。ここでは、データ処理1のrearデータ52に対するデータ処理は、ノード2に振り分けられたものとする。
【0084】
この場合、ノード2は、データ処理1のrearデータ52を受信する(ステップS3)。ここで、データ処理1の優先レベルは、ノード2で実行されているデータ処理2の優先レベルよりも低い。この場合、ノード2においては、データ処理1よりもデータ処理2が優先的に実行される。つまり、ノード2においては、処理順は変更されず、データ処理2の終了後にデータ処理1のrearデータ52に対するデータ処理が実行される。しかしながら、データ処理1のrearデータ52に対するデータ処理がデータ処理2の終了後に実行された場合には、図6に示すように、データ処理1のrearデータに対するデータ処理を当該データ処理1の目標終了時刻までに完了することはできない。
【0085】
この場合、ノード2において、データ処理1のrearデータ52は、当該データ処理1の目標終了時刻までに処理できる部分521(以下、データ処理1のrearデータのfrontデータ521と表記)と当該目標終了時刻までに処理できない部分522(以下、データ処理1のrearデータのrearデータ522と表記)とに分割(再分割)される(ステップS4)。このように、データ処理1のrearデータ52が分割されると、当該データ処理1のrearデータのrearデータ522は、中央管理装置10に返却される。なお、データ処理1のrearデータのfrontデータ521に対するデータ処理は、データ処理2の終了後に実行される。
【0086】
中央管理装置10は、ノード2から返却されたデータ処理1のrearデータのrearデータ522に対するデータ処理の再振り分けを行う。ここでは、データ処理1のrearデータのrearデータ522に対するデータ処理は、ノード3に振り分けられたものとする。
【0087】
この場合、ノード3は、データ処理1のrearデータのrearデータ522を受信する(ステップS5)。ここで、データ処理1の優先レベルは、ノード3において実行されているデータ処理3の優先レベルよりも低い。この場合、ノード3においては、データ処理1よりもデータ処理3が優先的に実行される。つまり、ノード3においては、処理順は変更されず、データ処理3の終了後にデータ処理1のrearデータのrearデータ522に対するデータ処理が実行される。
【0088】
ここで、データ処理1のrearデータのrearデータ522に対するデータ処理は、データ処理1の目標終了時刻までに完了することができる。したがって、ノード3において、データ処理1のrearデータのrearデータ522は分割されず、上記したようにデータ処理3の終了後に当該データ処理1のrearデータのrearデータ522に対するデータ処理が実行される。これによって、データ処理1は、ノード1〜3の各々によって分散して実行され、完了することができる。
【0089】
次に、本実施形態に係る分散データ処理システムにおいて実行される処理について詳細に説明する。本実施形態に係る分散データ処理システムにおいては、第1〜第5の処理が実行される。なお、第1〜第5の処理は、例えば独立して実行される。
【0090】
第1の処理は、外部から入力されたデータに対するデータ処理を振り分ける際の処理である。第2の処理は、中央管理装置10によって振り分けられたデータ処理を実行する際の処理である。第3の処理は、ノード20から溢れて返却されたデータに対するデータ処理を振り分ける際の処理である。第4の処理は、データ処理の結果を中央管理装置10に送信する際の処理である。第5の処理は、ノード20によって実行されたデータ処理の結果を出力する際の処理である。
【0091】
なお、第1の処理、第3の処理および第5の処理は、中央管理装置10が実行する処理である。また、第2の処理および第4の処理は、データ処理を振り分けられた各ノード20が実行する処理である。
【0092】
以下、本実施形態に係る分散データ処理システムにおいて実行される第1〜第5の処理の各々について説明する。
【0093】
まず、図7のフローチャートを参照して、本実施形態に係る分散データ処理システムにおいて実行される第1の処理の処理手順について説明する。第1の処理は、外部から入力されたデータに対するデータ処理を振り分ける際の処理である。この第1の処理は、中央管理装置10によって実行される。
【0094】
中央管理装置10に含まれるヘッダ情報付与部11は、ノード20に振り分けられていないデータ処理の対象となるデータを入力する(ステップS11)。以下、第1の処理において、ヘッダ情報付与部11によって入力されたデータに対するデータ処理(つまり、振り分けの対象となるデータ処理)を振り分け対象データ処理と称する。
【0095】
また、ヘッダ情報付与部11は、振り分け対象データ処理の目標終了時刻を示す目標終了時刻情報および当該振り分け対象データ処理の優先レベルを示す優先レベル情報を入力する(ステップS12)。なお、ヘッダ情報付与部11によって入力された目標終了時刻情報によって示される目標終了時刻情報および優先レベル情報によって示される優先レベルは、例えば分散データ処理システムを利用するユーザによって指定される。
【0096】
次に、ヘッダ情報付与部11は、振り分け対象データ処理を識別するためのデータ処理ID、入力された目標終了時刻情報および優先レベル情報から構成されるヘッダ情報を、入力されたデータに対して付与する(ステップS13)。なお、ヘッダ情報付与部11によってデータに付与されたヘッダ情報を構成するデータ処理IDは、当該データが入力された際に、振り分け対象データ処理に対して当該ヘッダ情報付与部11によって自動的に発行される。
【0097】
振り分け判定部13は、振り分け情報格納部12に格納されている振り分け先ノード情報に基づいて、データ処理を実行していないノード20があるか否かを判定する(ステップS14)。具体的には、振り分け判定部13は、複数のノード20の中に、データ処理を実行しているノード20以外のノード20が存在するか否かを判定する。なお、データ処理を実行しているノード20とは、処理状況情報「未」に対応づけて振り分け情報格納部12に格納されている振り分け先ノード情報によって示されるノード20である。
【0098】
データ処理を実行していないノード20がないと判定された場合(ステップS14のNO)、振り分け判定部13は、振り分け情報格納部12に格納されている目標終了時刻情報および振り分け先ノード情報に基づいて、ノード20毎に最も遅い目標終了時刻を抽出する(ステップS15)。この場合、振り分け判定部13は、ノード20毎に、当該ノード20を示す振り分け先ノード情報に対応づけて振り分け情報格納部12に格納されている目標終了時刻情報を取得し、当該取得された目標終了時刻情報によって示される目標終了時刻の中から最も遅い目標終了時刻を抽出する。つまり、振り分け判定部13は、ノード20毎に1つの目標終了時刻を抽出する。
【0099】
次に、振り分け判定部13は、ノード20毎に抽出された目標終了時刻の中で最も早い目標終了時刻が抽出されたノード20(最も早い目標終了時刻に関するノード)を特定する(ステップS16)。
【0100】
振り分け判定部13は、特定されたノード20が複数存在するか否かを判定する(ステップS17)。
【0101】
ここで、振り分け判定部13によって特定されたノード20が複数存在すると判定された場合(ステップS17のYES)、振り分け判定部13は、当該特定された複数のノード(以下、候補ノードと表記)20の中から、目標終了時刻内にデータ処理を完了することができずに当該目標終了時刻から溢れたデータを返却したノード(つまり、返却元のノード)20を特定する。振り分け判定部13は、候補ノード20のうち、溢れたデータを返却したノード20以外のノード20を、振り分け対象データ処理が振り分けられるノード(振り分け先ノード)として決定(特定)する(ステップS18)。
【0102】
なお、溢れたデータを返却したノード20とは、振り分け情報格納部12において新規/返却状況情報「返却」に対応づけられているデータ処理IDに「/rear」が含まれていない場合には当該データ処理IDと同一のデータ処理IDおよび処理状況情報「未」に対応づけて振り分け情報格納部12に格納されている振り分け先ノード情報によって示されるノード20、または、振り分け情報格納部12において新規/返却状況情報「返却」に対応づけられているデータ処理IDに「/rear」が含まれている場合には当該「/rear」の前部分と同一のデータ処理IDおよび処理状況情報「未」に対応づけて振り分け情報格納部12に格納されている振り分け先ノード情報によって示されるノード20である。
【0103】
振り分け処理部14は、振り分け判定部13によって決定されたノード20(つまり、振り分け先ノード)に対して振り分け対象データ処理を振り分ける(ステップS19)。この場合、振り分け処理部14は、ヘッダ情報付与部11によってヘッダ情報が付与されたデータを、振り分け判定部13によって決定されたノード20に対して送信する。
【0104】
また、振り分け処理部14は、振り分け対象データ処理の振り分けに関する情報(振り分け情報)を振り分け情報格納部12に格納する(ステップS20)。
【0105】
具体的には、振り分け処理部14は、ヘッダ情報付与部11によってデータに付与されたヘッダ情報を構成するデータ処理ID(振り分け対象データ処理を識別するためのデータ処理ID)、目標終了時刻情報および優先レベル情報を対応づけて振り分け情報格納部12に格納する。更に、振り分け処理部14は、振り分け対象データ処理を識別するためのデータ処理IDに対応づけて、当該振り分け対象データ処理が振り分け判定部13によって決定されたノード20に振り分けられた時刻を示す振り分け時刻情報、当該ノード20(振り分け先ノード)を示す振り分け先ノード情報、当該振り分け対象データ処理の対象となるデータが外部から入力されたものであること(つまり、ヘッダ情報付与部11によって入力されたこと)を示す新規/返却状況情報「新規」および振り分け対象データ処理が完了していないことを示す処理状況情報「未」を振り分け情報格納部12に格納する。なお、ステップS20の処理が実行されると分散データ処理システムにおいて実行される第1の処理は終了される。
【0106】
一方、ステップS14においてデータ処理を実行していないノード20があると判定された場合、当該ノード20を振り分け先ノードとして、上記したステップS19およびS20の処理が実行される。
【0107】
また、ステップS17において、振り分け判定部13によって特定されたノード20が複数存在しないと判定された(つまり、1つのノード20が特定された)場合、当該振り分け判定部13によって特定されたノード20を振り分け先ノードとして、上記したステップS19およびS20の処理が実行される。
【0108】
ここで、図8および図9を参照して、分散データ処理システムにおいて実行される第1の処理の具体例について説明する。
【0109】
図8は、分散データ処理システムにおいて第1の処理が実行される前の中央管理装置10に含まれる振り分け情報格納部12のデータ構造の一例を示す。なお、振り分け情報格納部12に格納されている情報については、上述した図2において説明した通りであるため、その詳しい説明を省略する。
【0110】
また、ここでは分散データ処理システムを構成する複数のノード20としてノード1〜3の3つのノードが存在するものとする。
【0111】
まず、分散データ処理システムにおいて実行される第1の処理では、中央管理装置10に含まれるヘッダ情報付与部11は、振り分け対象データ処理の対象となるデータを入力する。また、ヘッダ情報付与部11は、振り分け対象データ処理の目標終了時刻を示す目標終了時刻情報および当該振り分け対象データ処理の優先レベルを示す優先レベル情報を入力する。ここでは、ヘッダ情報付与部11は、ユーザによって指定された目標終了時刻情報「2010/1/21 8:00」および優先レベル情報「4」を入力したものとする。なお、振り分け対象データ処理を識別するためのデータ処理IDとして「data2_housedesign」がヘッダ情報付与部11によって自動的に発行されたものとする。
【0112】
ヘッダ情報付与部11は、データ処理ID「data2_housedesign」、目標終了時刻「2010/1/21 8:00」および優先レベル情報「4」から構成されるヘッダ情報をデータに付与する。
【0113】
次に、振り分け判定部13は、ノード1〜3の中から振り分け対象データ処理が振り分けられるノードを決定する処理(データ処理ID「data2_housedesign」に関する振り分け判定)を行う。
【0114】
まず、振り分け判定部13は、振り分け情報格納部12に格納されている振り分け先ノード情報に基づいて、ノード1〜3の中にデータ処理を実行していないノードがあるか否かを判定する。この場合、振り分け判定部13は、振り分け情報格納部12において、処理状況情報「未」に対応づけられているノード1〜3の各々を示す振り分け先ノード情報が存在するか否かを判定する。
【0115】
図8に示す振り分け情報格納部12には、処理状況情報「未」に対応づけられているノード1〜3を示す振り分け先ノード情報が存在する。したがって、振り分け判定部13は、ノード1〜3の中にデータ処理を実行していないノードはないと判定する。
【0116】
次に、振り分け判定部13は、振り分け情報格納部12に格納されている目標終了時刻情報および振り分け先ノード情報に基づいて、ノード毎(ここでは、ノード1〜3)に最も遅い目標終了時刻を抽出する。
【0117】
図8に示す振り分け情報格納部12によれば、ノード1を示す振り分け先ノード情報(および処理済状況情報「未」)に対応づけて当該振り分け情報格納部12に格納されている目標終了時刻情報によって示される目標終了時刻のうち最も遅い目標終了時刻は「2010/1/21 15:00」である。したがって、ノード1については、最も遅い目標終了時刻として「2010/1/21 15:00」が抽出される。
【0118】
また、ノード2を示す振り分け先ノード情報(および処理済状況情報「未」)に対応づけて振り分け情報格納部12に格納されている目標終了時刻情報によって示される目標終了時刻のうち最も遅い目標終了時刻は「2010/1/21 17:00」である。したがって、ノード2については、最も遅い目標終了時刻として「2010/1/21 17:00」が抽出される。
【0119】
更に、ノード3を示す振り分け先ノード情報(および処理済状況情報「未」)に対応づけて振り分け情報格納部12に格納されている目標終了時刻情報によって示される目標終了時刻のうち最も遅い目標終了時刻は「2010/1/21 15:00」である。したがって、ノード3については、最も遅い目標終了時刻として「2010/1/21 15:00」が抽出される。
【0120】
次に、振り分け判定部13は、ノード毎に抽出された目標終了時刻の中で最も早い目標終了時刻が抽出されたノードを特定する。ここでは、振り分け判定部13は、ノード1およびノード3を特定する。
【0121】
ここで、振り分け判定部13によって特定されたノードはノード1およびノード3の2つである(つまり、振り分け判定部13によって特定されたノードが複数存在する)。このため、振り分け判定部13は、ノード1およびノード3(候補ノード)の中から、目標終了時刻から溢れたデータを返却したノードを特定する。
【0122】
この場合、図8に示す振り分け情報格納部12において新規/返却状況情報「返却」に対応づけられているデータ処理IDは、「data1_pai/rear」である。ここで、このデータ処理ID「data1_pai/rear」には「/rear」が含まれているため、当該「/rear」の前部分である「data1_pai」と同一のデータ処理IDに対応づけて振り分け情報格納部12に格納されている振り分け先ノード情報によって示されるノード1が目標終了時刻から溢れたデータを返却したノードである。つまり、ノード1は、データ処理ID「data1_pai」によって識別されるデータ処理の対象となるデータを分割し、当該分割された後方のデータ(つまり、rearデータ)を中央管理装置10に返却している。よって、振り分け判定部13は、目標終了時刻から溢れたデータを返却したノードとしてノード1を特定する。
【0123】
この場合、振り分け判定部13は、候補ノードであるノード1およびノード3のうち、目標終了時刻から溢れたデータを返却したノード1以外のノードであるノード3を、振り分け先ノードとして決定する。
【0124】
振り分け処理部14は、振り分け対象データ処理を振り分け先ノードとして決定されたノード3に振り分ける。この場合、振り分け処理部14は、ヘッダ情報付与部11によって入力されたデータ(振り分け対象データ処理の対象となるデータ)をノード3に送信する。
【0125】
また、振り分け処理部14は、振り分け対象データ処理の振り分けに関する情報を振り分け情報格納部12に格納(登録)する。
【0126】
ここで、図9は、分散データ処理システムにおいて第1の処理が実行された(つまり、振り分け対象データ処理が振り分けられた)後の振り分け情報格納部12のデータ構造の一例を示す。
【0127】
図9に示すように、振り分け情報格納部12には、図8に示す振り分け情報格納部12と比較して、ヘッダ情報付与部11によってデータに付与されたヘッダ情報を構成するデータ処理ID「data2_housedesign」、目標終了時刻情報「2010/1/21 8:00」および優先レベル情報「4」が対応づけて格納されている。更に、振り分け情報格納部12には、データ処理ID「data2_housedesign」に対応づけて、振り分け対象データ処理がノード3に振り分けられた時刻を示す振り分け時刻情報「2010/1/21 7:00」、振り分け対象データ処理が振り分けられたノード3を示す振り分け先ノード情報「ノード3」、振り分け対象データ処理の対象となるデータが外部から入力されたものであることを示す新規/返却状況情報「新規」および振り分け対象データ処理が完了していないことを示す処理状況情報「未」が格納されている。
【0128】
上記したように分散データ処理システムにおいて第1の処理が実行されると、図8に示す振り分け情報格納部12は、図9に示す振り分け情報格納部12に更新される。
【0129】
次に、図10〜図13のフローチャートを参照して、本実施形態に係る分散データ処理システムにおいて実行される第2の処理の処理手順について説明する。第2の処理は、中央管理装置10によって振り分けられたデータ処理を実行する際の処理である。この第2の処理は、データ処理を振り分けられたノード20によって実行される。
【0130】
ここでは、第2の処理を実行するノード(データ処理を振り分けられたノード)20を対象ノード20と称する。また、対象ノード20に振り分けられたデータ処理を対象データ処理と称する。
【0131】
まず、対象ノード20に含まれるノード管理部21は、中央管理装置10によって送信されたデータを受信する(ステップS31)。ノード管理部21によって受信されたデータは、対象データ処理の対象となるデータである。なお、ノード管理部21によって受信されたデータには、ヘッダ情報が付与されている。このヘッダ情報は、対象データ処理を識別するためのデータ処理ID、当該対象データ処理の目標終了時刻を示す目標終了時刻情報および当該対象データ処理の優先レベルを示す優先レベル情報から構成される。
【0132】
ノード管理部21は、受信されたデータに付与されているヘッダ情報を構成するデータ処理IDおよび当該データを対応づけてノードデータ格納部22に格納する(ステップS32)。
【0133】
次に、ノード管理部21は、実行情報格納部26を参照して、対象データ処理の一部を実行可能か否か、つまり、当該対象データ処理の一部が割り込み可能か否かを判定する(ステップS33)。
【0134】
ノード管理部21は、例えば実行情報格納部26に対象データ処理以外の他のデータ処理(以下、既存データ処理と表記)に関する実行情報が格納されていない場合、つまり、対象ノード20において既存データ処理が存在しない場合には、対象データ処理の一部が割り込み可能であると判定する。また、ノード管理部21は、例えば実行情報格納部26に既存データ処理に関する実行情報が格納されている場合、つまり、対象ノード20において既存データ処理が存在する場合であっても、当該既存データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納されている目標終了時刻情報によって示される目標終了時刻と推定終了時刻情報によって示される推定終了時刻との間に予め定められた時間以上の間隔がある場合には、対象データ処理の一部が割り込み可能であると判定する。つまり、既存データ処理の目標終了時刻と推定終了時刻との間に所定の間隔があれば、対象データ処理の一部が割り込み可能であると判定される。
【0135】
これに対して、対象ノード20において既存データ処理が存在する場合であって、当該既存データ処理の目標終了時刻と推定終了時刻との間に所定の間隔がない場合、対象データ処理の一部が割り込み可能でないと判定される。
【0136】
なお、対象データ処理の一部が割り込み可能でないと判定された場合(ステップS33のNO)、ノード管理部21は、当該割り込み可能となるまで、つまり、既存データ処理が完了(終了)するまで待機する(ステップS34)。なお、ステップS34の処理が実行されると、ステップS33に戻って処理が繰り返される。これにより、既存データ処理が完了した場合には、ステップS33において対象データ処理の一部が割り込み可能であると判定される。
【0137】
対象データ処理の一部が割り込み可能であると判定された場合(ステップS33のYES)、ノード管理部21は、対象データ処理の一部の実行をデータ処理実行部23に指示する。
【0138】
データ処理実行部23は、ノード管理部21からの指示に応じて、対象データ処理の一部を実行する(ステップS35)。データ処理実行部23は、実行結果をデータ処理推定部24に渡す。この実行結果には、例えばデータ処理実行部23によって実行された対象データ処理の一部の実行の対象となったデータのサイズ(データサイズ)および当該対象データ処理の一部の実行に費やした時間(処理時間)等が含まれる。
【0139】
データ処理推定部24は、データ処理実行部23から渡された実行結果(に含まれるデータサイズおよび処理時間)に基づいて、対象データ処理における単位処理量を算出する(ステップS36)。この場合、データ処理推定部24は、例えばデータサイズを処理時間で除算することによって単位時間あたりの処理量(単位処理量)を算出する。
【0140】
また、データ処理推定部24は、算出された対象データ処理における単位処理量を用いて、対象データ処理の推定終了時刻を算出する(ステップS37)。
【0141】
この場合、データ処理推定部24は、例えば対象データ処理の対象となるデータを当該対象データ処理における単位処理量で除算することによって当該対象データ処理の完了に必要な時間(以下、推定処理時間と表記)を算出する。
【0142】
ここで、対象ノード20において既存データ処理が存在しない場合には、データ処理推定部24は、例えばステップS31においてデータが受信された時刻を基準として、当該時刻に推定処理時間を加算することによって推定終了時刻を算出する。
【0143】
一方、対象ノード20において既存データ処理が存在する場合には、データ処理推定部24は、当該既存データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納されている推定終了時刻情報によって示される時刻(つまり、既存データ処理の推定終了時刻)を基準として、当該時刻に推定処理時間を加算することによって推定終了時刻を算出する。
【0144】
ステップS37の処理が実行されると、ノード管理部21は、対象データ処理に関する実行情報として、受信されたデータに付与されているヘッダ情報を構成するデータ処理ID、目標終了時刻情報および優先レベル情報と、データ処理推定部24によって算出された対象データ処理の推定終了時刻を示す推定終了時刻情報および対象データ処理における単位処理量とを対応づけて実行情報格納部26に格納する。
【0145】
また、既存データ処理が存在する場合には、対象データ処理の処理順として当該既存データ処理の処理順の次の値が実行情報格納部26に格納される。一方、既存データ処理が存在しない場合には、対象データ処理の処理順として「1」が実行情報格納部26に格納される。
【0146】
また、例えば対象データ処理の対象となるデータのデータ量が20fileであり、サイズが50MByteである場合には、処理済情報としては、「0/20file:0/50MByte」が実行情報格納部26に格納される。
【0147】
次に、ノード管理部21は、対象データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納された目標終了時刻情報によって示される対象データ処理の目標終了時刻および推定終了時刻情報によって示される対象データ処理の推定終了時刻に基づいて、対象データ処理が当該目標終了時刻以前に終了するか否かを判定する(ステップS38)。この場合、対象データ処理の目標終了時刻が推定終了時刻以前であれば、対象データ処理が目標終了時刻以前に終了しないと判定される。一方、対象データ処理の目標終了時刻が推定終了時刻より後であれば、対象データ処理が目標終了時刻以前に終了すると判定される。
【0148】
対象データ処理が目標終了時刻以前に終了しないと判定された場合(ステップS38のNO)、ノード管理部21は、対象ノード20において既存データ処理(つまり、他のデータ処理)が存在するか(既存データ処理が実行されているか)否かを判定する(ステップS39)。ここでは、上記したように実行情報格納部26に既存データ処理に関する実行情報が格納されている場合には、対象ノード20において既存データ処理が存在すると判定される。
【0149】
対象ノード20において既存データ処理が存在すると判定された場合(ステップS39のYES)、ノード管理部21は、対象データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納されている優先レベル情報によって示される対象データ処理の優先レベルと既存データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納されている優先レベル情報によって示される既存データ処理の優先レベルとを比較する。これによって、ノード管理部21は、対象データ処理の優先レベルが既存データ処理の優先レベルより高いか否かを判定する(ステップS40)。
【0150】
対象データ処理の優先レベルが既存データ処理の優先レベルより高いと判定された場合(ステップS40のYES)、ノード管理部21は、当該対象データ処理および既存データ処理の処理順を入れ替える(ステップS41)。具体的には、ノード管理部21は、対象データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納されている処理順と既存データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納されている処理順とを入れ替える。これにより、対象データ処理の処理順が前(例えば、処理順「1」)となり、既存データ処理の処理順が後(例えば、処理順「2」)となる。
【0151】
次に、ノード管理部21は、データ処理推定部24に対して、実行情報格納部26に格納されている処理順が前のデータ処理の推定終了時刻の再推定を依頼する。ここでは、上記したように処理順が入れ替えられているため、対象データ処理の推定終了時刻の再推定が依頼される。
【0152】
ノード管理部21によって対象データ処理の推定終了時刻の再推定が依頼されると、既存データ処理の実行が停止される。
【0153】
データ処理推定部24は、既存データ処理の実行が停止された時刻および対象データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納されている対象データ処理における単位処理量に基づいて対象データ処理(つまり、処理順が前のデータ処理)の推定終了時刻を算出する(ステップS42)。
【0154】
具体的には、データ処理推定部24は、対象データ処理の対象となるデータ(のサイズ)を当該対象データ処理における単位処理量で除算することによって当該対象データ処理の完了に必要な時間(推定処理時間)を算出し、当該推定処理時間を既存データ処理の実行が停止された時刻に加算することによって当該対象データ処理の推定終了時刻を算出する。なお、データ処理推定部24によって算出された対象データ処理の推定終了時刻は、当該対象データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納される(上書きされる)。
【0155】
次に、ノード管理部21は、処理順が前のデータ処理である対象データ処理の対象となるデータが当該対象データ処理の目標終了時刻から溢れるか否かを判定する(ステップS43)。この場合、ノード管理部21は、対象データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納された目標終了時刻情報によって示される対象データ処理の目標終了時刻および推定終了時刻情報によって示される対象データ処理の推定終了時刻に基づいて、対象データ処理が当該目標終了時刻以前に終了するか否かを判定する。つまり、ノード管理部21は、対象データ処理が目標終了時刻以前に終了する場合には対象データ処理の対象となるデータが当該対象データ処理の目標終了時刻から溢れないと判定し、対象データ処理が目標終了時刻以前に終了しない場合には対象データ処理の対象となるデータが当該対象データ処理の目標終了時刻から溢れると判定する。
【0156】
対象データ処理(処理順が前のデータ処理)の対象となるデータが溢れると判定された場合(ステップS43のYES)、ノード管理部21は、当該対象データ処理の対象となるデータの分割が必要であるか否かを判定する(ステップS44)。
【0157】
具体的には、対象データ処理の目標終了時刻が上記した既存データ処理を停止した時刻(つまり、対象データ処理の推定終了時刻を算出する際に基準とした時刻)以前であるような場合、または、対象データ処理の目標終了時刻は上記した既存データ処理を停止した時刻より後であるが、当該既存データ処理を停止した時刻から当該対象データ処理の目標終了時刻までの間で当該対象データ処理における1単位(例えば、1ファイル)も処理できないような場合には、ノード管理部21は、当該対象データ処理の対象となるデータの分割は必要でないと判定する。なお、これら以外の場合については、ノード管理部21は、対象データ処理の対象となるデータの分割が必要であると判定する。
【0158】
対象データ処理の対象となるデータの分割が必要であると判定された場合(ステップS44のYES)、ノード管理部21は、データ処理分割部25に対して当該データの分割を指示する(ステップS45)。ここで、ノード管理部21からの分割指示には、対象データ処理の対象となるデータのうちの当該対象データ処理において目標終了時刻までに処理できるデータのサイズが含まれる。
【0159】
この対象データ処理において目標終了時刻までに処理できるデータのサイズは、対象データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納されている目標終了時刻情報によって示される対象データ処理の目標終了時刻、推定終了時刻情報によって示される対象データ処理の推定終了時刻および対象データ処理における単位処理量に基づいて算出される。
【0160】
具体的には、ノード管理部21は、対象データ処理の目標終了時刻から推定終了時刻までの間に処理されるデータのサイズを対象データ処理における単位処理量を用いて算出し、当該算出されたサイズを対象データ処理の対象となるデータ全体のサイズから減算することによって、対象データ処理において目標終了時刻までに処理できるデータのサイズを算出できる。なお、対象データ処理の対象となるデータ全体のサイズは、当該対象データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納されている処理済情報から取得される。
【0161】
対象データ処理の対象となるデータの分割が指示されると、データ処理分割部25は、当該分割指示に含まれる対象データ処理において目標終了時刻までに処理できるデータのサイズに基づいて、対象データ処理の対象となるデータ(対象データ処理を識別するためのデータ処理IDに対応づけてノードデータ格納部22に格納されているデータ)を分割する(ステップS46)。つまり、データ処理分割部25は、対象データ処理の対象となるデータを、当該対象データ処理の目標終了時刻までに処理できるデータと当該目標終了時刻までに処理できないデータとに分割する。
【0162】
次に、ノード管理部21は、データ処理分割部25による分割後の後方のデータである対象データ処理の目標終了時刻までに処理できないデータ(rearデータ)を、中央管理装置10に返却(送信)する(ステップS47)。
【0163】
ここで、中央管理装置10に返却されるデータには、当該データに対するデータ処理を識別するためのデータ処理ID、当該データ処理の目標終了時刻を示す目標終了時刻情報および当該データ処理の優先レベルを示す優先レベル情報から構成されるヘッダ情報が付与される。なお、中央管理装置10に返却されるデータに付与されるヘッダ情報を構成するデータ処理IDは、対象データ処理を識別するためのデータ処理IDおよび当該データがrearデータであることを示す情報である「/rear」を含む。また、中央管理装置10に返却されるデータに付与されるヘッダ情報を構成する目標終了時刻情報および優先レベル情報は、対象データ処理の対象となるデータ(つまり、対象データ処理を識別するためのデータ処理IDに対応づけてノードデータ格納部22に格納されているデータ)に付与されているヘッダ情報を構成する目標終了時刻情報および優先レベル情報と同一である。換言すれば、中央管理装置10に返却されるデータに付与されるヘッダ情報を構成する目標終了時刻情報および優先レベル情報は、対象データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納されている目標終了時刻および優先レベル情報と同一である。
【0164】
一方、データ処理分割部25による分割後の前方のデータである対象データ処理の目標終了時刻までに処理できるデータ(frontデータ)は、当該データに対するデータ処理を識別するためのデータ処理IDに対応づけてノードデータ格納部22に格納される。なお、対象データ処理の目標終了時刻までに処理できるデータに対するデータ処理を識別するためのデータ処理IDは、対象データ処理を識別するためのデータ処理IDおよび当該データがfrontデータであることを示す情報である「/front」を含む。また、実行情報格納部26に格納されている対象データ処理を識別するためのデータ処理IDは、対象データ処理の目標終了時刻までに処理できるデータに対するデータ処理を識別するためのデータ処理IDに更新され、当該データ処理IDに対応づけて実行情報格納部26に格納されている推定終了時刻情報によって示される当該データ処理の推定終了時刻は、当該対象データ処理の目標終了時刻と同時刻に更新される。
【0165】
なお、上記したステップS44において対象データ処理の対象となるデータの分割が必要でないと判定された場合には、ステップS45およびS46の処理は実行されず、ステップS47の処理が実行される。この場合、対象データ処理の対象となるデータがそのまま返却される。対象データ処理の対象となるデータが返却されると、実行情報格納部26に格納されている対象データ処理に関する実行情報は、当該実行情報格納部26から削除される。
【0166】
次に、データ処理推定部24は、実行情報格納部26に格納されている処理順が後のデータ処理の推定終了時刻を算出する(ステップS48)。ここでは、既存データ処理の推定終了時刻が算出される。この場合、データ処理推定部24は、対象データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納されている推定終了時刻情報によって示される対象データ処理の推定終了時刻(ステップS42において算出された対象データ処理の推定終了時刻)および既存データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納されている既存データ処理における単位処理量に基づいて、既存データ処理の推定終了時刻を算出する。
【0167】
具体的には、データ処理推定部24は、既存データ処理の対象となるデータのうちの未処理のデータ(のサイズ)を当該既存データ処理における単位処理量で除算することによって当該既存データ処理の完了に必要な時間(推定処理時間)を算出し、当該推定処理時間を対象データ処理の推定終了時刻に加算することによって当該既存データ処理の推定終了時刻を算出する。
【0168】
なお、既存データ処理の対象となるデータのうちの未処理のデータのサイズは、既存データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納されている処理済情報に含まれる当該既存データ処理の対象となるデータのサイズと当該処理が完了したデータのサイズとの差により算出される。なお、処理済情報は、上記したように既存データ処理の実行が停止されるまで逐次更新されているものとする。
【0169】
上記したように既存データ処理の推定終了時刻が算出された場合、当該推定終了時刻を示す推定終了時刻情報は、既存データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納される(上書きされる)。
【0170】
次に、ノード管理部21は、処理順が後のデータ処理である既存データ処理の対象となるデータが当該既存データ処理の目標終了時刻から溢れるか否かを判定する(ステップS49)。この場合、ノード管理部21は、既存データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納された目標終了時刻情報によって示される既存データ処理の目標終了時刻および推定終了時刻情報によって示される既存データ処理の推定終了時刻に基づいて、既存データ処理が当該目標終了時刻以前に終了するか否かを判定する。つまり、ノード管理部21は、既存データ処理が目標終了時刻以前に終了する場合には既存データ処理の対象となるデータが当該既存データ処理の目標終了時刻から溢れないと判定し、既存データ処理が目標終了時刻以前に終了しない場合には既存データ処理の対象となるデータが当該既存データ処理の目標終了時刻から溢れると判定する。
【0171】
既存データ処理(処理順が後のデータ処理)の対象となるデータが溢れると判定された場合(ステップS49のYES)、ノード管理部21は、当該既存データ処理の対象となるデータの分割が必要であるか否かを判定する(ステップS50)。
【0172】
具体的には、既存データ処理の目標終了時刻が上記した対象データ処理の推定終了時刻(つまり、既存データ処理の推定終了時刻を算出する際に基準とした時刻)以前であるような場合、または、既存データ処理の目標終了時刻は対象データ処理の推定終了時刻より後であるが、当該対象データ処理の推定終了時刻から当該既存データ処理の目標終了時刻までの間で当該既存データ処理における1単位(例えば、1ファイル)も処理できないような場合には、ノード管理部21は、当該既存データ処理の対象となるデータの分割は必要でないと判定する。なお、これら以外の場合については、ノード管理部21は、既存データ処理の対象となるデータの分割が必要であると判定する。
【0173】
既存データ処理の対象となるデータの分割が必要であると判定された場合(ステップS50のYES)、ノード管理部21は、データ処理分割部25に対して当該データの分割を指示する(ステップS51)。ここで、ノード管理部21からの分割指示には、既存データ処理の対象となるデータのうちの当該既存データ処理において目標終了時刻までに処理できるデータのサイズが含まれる。
【0174】
この既存データ処理において目標終了時刻までに処理できるデータのサイズは、既存データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納されている目標終了時刻情報によって示される既存データ処理の目標終了時刻、推定終了時刻情報によって示される既存データ処理の推定終了時刻および既存データ処理における単位処理量に基づいて算出される。
【0175】
具体的には、ノード管理部21は、既存データ処理の目標終了時刻から推定終了時刻までの間に処理されるデータのサイズを対象データ処理における単位処理量を用いて算出し、当該算出されたサイズを既存データ処理の対象となるデータ全体のサイズから減算することによって、既存データ処理において目標終了時刻までに処理できるデータのサイズを算出できる。なお、既存データ処理の対象となるデータ全体のサイズは、当該既存データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納されている処理済情報から取得される。
【0176】
既存データ処理の対象となるデータの分割が指示されると、データ処理分割部25は、当該分割指示に含まれる対象データ処理において目標終了時刻までに処理できるデータのサイズに基づいて、既存データ処理の対象となるデータ(既存データ処理を識別するためのデータ処理IDに対応づけてノードデータ格納部22に格納されているデータ)を分割する(ステップS52)。つまり、データ処理分割部25は、既存データ処理の対象となるデータを、当該既存データ処理の目標終了時刻までに処理できるデータと当該目標終了時刻までに処理できないデータとに分割する。
【0177】
次に、ノード管理部21は、データ処理分割部25による分割後の後方のデータである既存データ処理の目標終了時刻までに処理できないデータ(rearデータ)を、中央管理装置10に返却(送信)する(ステップS53)。
【0178】
ここで、中央管理装置10に返却されるデータには、当該データに対するデータ処理を識別するためのデータ処理ID、当該データ処理の目標終了時刻を示す目標終了時刻情報および当該データ処理の優先レベルを示す優先レベル情報から構成されるヘッダ情報が付与される。なお、中央管理装置10に返却されるデータに付与されるヘッダ情報を構成するデータ処理IDは、既存データ処理を識別するためのデータ処理IDおよび当該データがrearデータであることを示す情報である「/rear」を含む。また、中央管理装置10に返却されるデータに付与されるヘッダ情報を構成する目標終了時刻情報および優先レベル情報は、既存データ処理の対象となるデータ(つまり、既存データ処理を識別するためのデータ処理IDに対応づけてノードデータ格納部22に格納されているデータ)に付与されているヘッダ情報を構成する目標終了時刻情報および優先レベル情報と同一である。換言すれば、中央管理装置10に返却されるデータに付与されるヘッダ情報を構成する目標終了時刻情報および優先レベル情報は、既存データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納されている目標終了時刻および優先レベル情報と同一である。
【0179】
一方、データ処理分割部25による分割後の前方のデータである既存データ処理の目標終了時刻までに処理できるデータ(frontデータ)は、当該データに対するデータ処理を識別するためのデータ処理IDに対応づけてノードデータ格納部22に格納される。なお、既存データ処理の目標終了時刻までに処理できるデータに対するデータ処理を識別するためのデータ処理IDは、既存データ処理を識別するためのデータ処理IDおよび当該データがfrontデータであることを示す情報である「/front」を含む。また、実行情報格納部26に格納されている対象データ処理を識別するためのデータ処理IDは、対象データ処理の目標終了時刻までに処理できるデータに対するデータ処理を識別するためのデータ処理IDに更新され、当該データ処理IDに対応づけて実行情報格納部26に格納されている推定終了時刻情報によって示される当該データ処理の推定終了時刻は、当該既存データ処理の目標終了時刻と同時刻に更新される。
【0180】
なお、上記したステップS50において既存データ処理の対象となるデータの分割が必要でないと判定された場合には、ステップS51およびS52の処理は実行されず、ステップS53の処理が実行される。この場合、既存データ処理の対象となるデータがそのまま返却される。既存データ処理の対象となるデータが返却されると、実行情報格納部26に格納されている既存データ処理に関する実行情報は、当該実行情報格納部26から削除される。
【0181】
ここで、上記したように既存データ処理の実行は停止されたため、現在、対象ノード20(に含まれるデータ処理実行部23)においてはデータ処理が実行されていない。したがって、ノード管理部21は、データ処理実行部23に対してデータ処理の実行を指示する。
【0182】
データ処理実行部23は、ノード管理部21からの指示に応じて、データ処理を実行する(ステップS54)。この場合、データ処理実行部23は、実行情報格納部26に格納されている処理順(上記したステップS41において入れ替えられた処理順)に応じて、当該処理順に対応づけられているデータ処理IDによって識別されるデータ処理を実行する。ここでは、データ処理実行部23は、対象データ処理の目標終了時刻までに処理できるデータに対するデータ処理、既存データ処理の目標終了時刻までに処理できるデータに対するデータ処理の順番で、当該各データ処理を実行する。なお、対象データ処理の目標終了時刻までに処理できるデータに対するデータ処理および既存データ処理の目標終了時刻までに処理できるデータに対するデータ処理の実行に応じて、当該データ処理の各々の実行が開始された時刻(開始時刻)を示す開始時刻情報が当該データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納される。このようにステップS54の処理が実行されると、第2の処理は終了される。
【0183】
一方、上記したステップS38において対象データ処理が目標終了時刻以前に終了すると判定された場合、第2の処理は終了される。なお、この場合において既存データ処理が存在しない場合には、第2の処理の終了の前に、対象データ処理の実行がデータ処理実行部23によって開始される。このように対象データ処理が実行されると、当該対象データ処理の実行が開始された時刻(開始時刻)を示す開始時刻情報が当該対象データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納される。
【0184】
また、ステップS39において対象ノード20において既存データ処理が存在しないと判定された場合、ノード管理部21は、対象データ処理の対象となるデータの分割が必要であるか否かを判定する(ステップS55)。
【0185】
具体的には、対象データ処理の目標終了時刻が上記したステップS31においてデータが受信された時刻(つまり、対象データ処理の推定終了時刻を算出する際に基準とした時刻)以前であるような場合、または、対象データ処理の目標終了時刻は上記したステップS31においてデータが受信された時刻より後であるが、当該データが受信された時刻か当該対象データ処理の目標終了時刻までの間で当該対象データ処理における1単位(例えば、1ファイル)も処理できないような場合には、ノード管理部21は、当該対象データ処理の対象となるデータの分割は必要でないと判定する。なお、これら以外の場合については、ノード管理部21は、対象データ処理の対象となるデータの分割が必要であると判定する。
【0186】
対象データ処理の対象となるデータの分割が必要であると判定された場合(ステップS55のYES)、ノード管理部21は、上記したステップS45〜S47の処理と同様のステップS56〜S58の処理を実行する。なお、対象データ処理の対象となるデータの分割が必要でないと判定された場合(ステップS55のNO)、ステップS56およびS57の処理は実行されず、ステップS58の処理が実行される。
【0187】
ここでは、上記したように対象ノード20において既存データ処理が存在しないため、ノード管理部21は、データ処理実行部23に対して対象データ処理の実行を指示する。データ処理実行部23は、ノード管理部21からの指示に応じて、対象データ処理を実行する(ステップS59)。なお、対象データ処理が実行されると、当該対象データ処理の実行が開始された時刻(開始時刻)を示す開始時刻情報が当該対象データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納される。このようにステップS59の処理が実行されると、第2の処理は終了される。
【0188】
一方、上記したステップS43において対象データ処理の対象となるデータが溢れないと判定された場合、上記したステップS48以降の処理が実行される。なお、この場合のステップS49においては、対象データ処理を処理順が後のデータ処理として処理が実行される。
【0189】
ここで、図14および図15を参照して、分散データ処理システムにおいて実行される第2の処理の具体例について説明する。
【0190】
図14は、分散データ処理システムにおいて第2の処理が実行される前の対象ノード20に含まれる実行情報格納部26のデータ構造の一例を示す。なお、実行情報格納部26に格納されている情報については、上述した図4において説明した通りであるため、その詳しい説明を省略する。以下、図14に示す実行情報格納部26に格納されているデータ処理ID「data1_pai/rear」によって識別されるデータ処理を既存データ処理とする。なお、対象ノード20においては、実行情報格納部26に格納されているデータ処理ID「data1_pai/rear」によって識別されるデータ処理(つまり、既存データ処理)が実行されているものとする。
【0191】
ここでは、データ処理ID「data2_housedesign」によって識別されるデータ処理が対象ノード20に振り分けられたものとして説明する。以下、データ処理ID「data2_housedesign」によって識別されるデータ処理を対象データ処理とする。
【0192】
この場合、ノード管理部21は、中央管理装置10によって送信された対象データ処理の対象となるデータを受信する。なお、ノード管理部21によって受信されたデータには、データ処理ID「data2_housedesign」、目標終了時刻情報「2010/1/21 8:00」および優先レベル情報「3」から構成されるヘッダ情報が付与されているものとする。
【0193】
ノード管理部21によって受信されたデータは、当該データに付与されているヘッダ情報を構成するデータ処理ID「data2_housedesign」に対応づけてノードデータ格納部22に格納される。
【0194】
次に、ノード管理部21は、対象データ処理の一部が割り込み可能であるか否かを判定する。図14に示す実行情報格納部26によれば既存データ処理が存在するが、当該データ処理の目標終了時刻と推定終了時刻との間には所定の間隔があるため、ノード管理部21は、対象データ処理の一部が割り込み可能であると判定する。
【0195】
この場合、ノード管理部21は、対象データ処理の一部の実行をデータ処理実行部23に対して指示する。データ処理実行部23は、ノード管理部21からの指示に応じて、対象データ処理の一部を実行する。これにより、データ処理実行部23は、対象データ処理の一部の実行の対象となったデータのサイズおよび当該対象データ処理の一部の実行に費やした時間(処理時間)を含む実行結果を取得する。データ処理実行部23は、取得された実行結果をノード管理部21に渡す。また、ノード管理部21は、データ処理実行部23から渡された実行結果をデータ処理推定部24に渡す。
【0196】
データ処理推定部24は、ノード管理部21から渡された実行結果に含まれるデータサイズおよび処理時間に基づいて、対象データ処理における単位処理量を算出する。ここでは、対象データ処理における単位処理量として、例えば4KByte/分が算出されたものとする。
【0197】
また、データ処理推定部24は、算出された対象データ処理における単位処理量を用いて、対象データ処理の推定終了時刻を算出する。なお、対象データ処理の推定終了時刻は、図14に示す実行情報格納部26に格納されている推定終了時刻情報によって示される既存データ処理の推定終了時刻を基準として算出される。ここでは、対象データ処理の推定終了時刻として例えば「2010/1/21 19:00」が算出されたものとする。
【0198】
ここで、対象データ処理の処理順、対象データ処理を識別するためのデータ処理ID「data2_housedesign」、対象データ処理の目標終了時刻を示す目標終了時刻情報「2010/1/21 8:00」、対象データ処理の優先レベルを示す優先レベル情報「3」、データ処理推定部24によって算出された対象データ処理の推定終了時刻を示す推定終了時刻情報「2010/1/21 19:00」、対象データ処理における単位処理量「4KByte/分」および処理済情報が対応づけて実行情報格納部26に格納される。なお、図14に示すように既存データ処理の処理順が「1」であるため、対象データ処理の処理順としては「2」が格納される。また、対象データ処理は実行されていないため、例えば対象データ処理の対象となるデータのデータ量が20file、サイズが50MByteである場合には、処理済情報としては例えば「0/20file:0/50MByte」が格納される。
【0199】
次に、ノード管理部21は、対象データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納された目標終了時刻情報によって示される対象データ処理の目標終了時刻および推定終了時刻情報によって示される対象データ処理の推定終了時刻に基づいて、対象データ処理が目標終了時刻以前に終了するか否かを判定する。ここでは、対象データ処理の目標終了時刻「2010/1/21 8:00」が対象データ処理の推定終了時刻「2010/1/21 19:00」より前であるため、対象データ処理は目標終了時刻以前に終了しないと判定される。
【0200】
また、上記したように図14に示す実行情報格納部26によれば既存データ処理に関する実行情報が格納されているため、対象ノード20において既存データ処理が存在すると判定される。更に、上記したように対象データ処理の優先レベルは「4」であり、既存データ処理の優先レベルは「3」であるため、対象データ処理の優先レベルが既存データ処理の優先レベルより高いと判定される。
【0201】
この場合、ノード管理部21は、対象データ処理の処理順と既存データ処理の処理順とを入れ替える。具体的には、実行情報格納部26において、対象データ処理の処理順が「1」とされ、既存データ処理の処理順が「2」とされる。
【0202】
次に、データ処理推定部24は、処理順が前のデータ処理(処理順が「1」のデータ処理)である対象データ処理の推定終了時刻を再推定する。この場合、既存データ処理の実行が停止される。データ処理推定部24は、既存データ処理の実行が停止された時刻を基準として、実行情報格納部26に格納されている対象データ処理における単位処理量を用いて当該対象データ処理の推定終了時刻を算出する。ここでは、対象データ処理の推定終了時刻として例えば「2010/1/21 8:00」が算出されたものとする。データ処理推定部24によって算出された対象データ処理の推定終了時刻を示す推定終了時刻情報は、対象データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納される(上書きされる)。
【0203】
ノード管理部21は、対象データ処理(処理順が前のデータ処理)の対象となるデータが当該対象データ処理の目標終了時刻から溢れるか否かを判定する。ここでは、対象データ処理の目標終了時刻は「2010/1/21 8:00」であり、対象データ処理の推定終了時刻は「2010/1/21 8:00」である。したがって、対象データ処理の対象となるデータが対象データ処理の目標終了時刻から溢れない(つまり、対象データ処理は目標終了時刻までに完了できる)と判定される。
【0204】
次に、データ処理推定部24は、処理順が後のデータ処理(処理順が「2」のデータ処理)である既存データ処理の推定終了時刻を再推定する。この場合、データ処理推定部24は、実行情報格納部26を参照して、既存データ処理の推定終了時刻を算出する。この場合、データ処理推定部24は、対象データ処理の推定終了時刻を基準として、既存データ処理における単位処理量を用いて当該既存データ処理の推定終了時刻を算出する。ここでは、既存データ処理の推定終了時刻として例えば「2010/1/21 18:00」が算出されたものとする。データ処理推定部24によって算出された既存データ処理の推定終了時刻を示す推定終了時刻情報は、既存データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納される(上書きされる)。
【0205】
ノード管理部21は、既存データ処理(処理順が後のデータ処理)の対象となるデータが当該既存データ処理の目標終了時刻から溢れるか否かを判定する。ここでは、既存データ処理の目標終了時刻は「2010/1/21 15:00」であり、既存データ処理の推定終了時刻は「2010/1/21 18:00」である。したがって、既存データ処理の対象となるデータが既存データ処理の目標終了時刻から溢れる(つまり、既存データ処理は目標終了時刻までに完了できない)と判定される。
【0206】
この場合、ノード管理部21は、既存データ処理の対象となるデータの分割が必要であるか否かを判定する。ここでは、上記したように既存データ処理の目標終了時刻と当該既存データ処理の推定終了時刻を算出する際に基準とした時刻(ここでは、対象データ処理の推定終了時刻)との関係から、既存データ処理の対象となるデータの分割が必要であると判定される。
【0207】
次に、ノード管理部21は、データ処理分割部25に対して既存データ処理の対象となるデータの分割を指示する。このデータ処理分割部25に対する分割指示には、既存データ処理の対象となるデータのうちの当該既存データ処理において目標終了時刻までに処理できるデータのサイズが含まれる。
【0208】
データ処理分割部25は、ノード管理部21からの分割指示に含まれる既存データ処理において目標終了時刻までに処理できるデータのサイズに基づいて、既存データ処理の対象となるデータを分割する。つまり、既存データ処理の対象となるデータは、既存データ処理の目標終了時刻までに処理できるデータと当該目標終了時刻までに処理できないデータとに分割される。
【0209】
ノード管理部21は、データ処理分割部25による分割後の後方のデータである既存データ処理の目標終了時刻までに処理できないデータ(rearデータ)を中央管理装置10に返却する。中央管理装置10に返却されるデータには、当該データに対するデータ処理を識別するためのデータ処理ID「data1_pai/rear/rear」、当該データ処理の目標終了時刻を示す目標終了時刻情報「2010/1/21 15:00」および当該データ処理の優先レベルを示す優先レベル情報「3」から構成されるヘッダ情報付与される。なお、このデータ処理ID「data1_pai/rear/rear」は、既存データ処理を識別するためのデータ処理ID「data1_pai/rear」およびrearデータであることを示す情報である「/rear」から構成されている。また、目標終了時刻情報「2010/1/21 15:00」および優先レベル情報「3」は、既存データ処理の目標終了時刻を示す目標終了時刻情報および当該既存データ処理の優先レベルを示す優先レベル情報と同一である。
【0210】
また、ノード管理部21は、データ処理分割部25による分割後の前方のデータである既存データ処理の目標終了時刻までに処理できるデータ(frontデータ)を当該データに対するデータ処理を識別するためのデータ処理ID「data1_pai/rear/front」に対応づけてノードデータ格納部22に格納する。なお、このデータ処理ID「data1_pai/rear/front」は、既存データ処理を識別するためのデータ処理ID「data1_pai/rear」およびfrontデータであることを示す情報である「/front」から構成されている。また、ノード管理部21は、実行情報格納部26に格納されている既存データ処理を識別するためのデータ処理ID「data1_pai/rear」を既存データ処理の目標終了時刻までに処理できるデータに対するデータ処理を識別するためのデータ処理ID「data1_pai/rear/front」に更新する。更に、ノード管理部21は、このデータ処理ID「data1_pai/rear/front」に対応づけて実行情報格納部26に格納されている推定終了時刻情報によって示される推定終了時刻を既存データ処理の目標終了時刻と同時刻に更新する。なお、図15は、上記したような処理が実行されることによって更新された実行情報格納部26のデータ構造の一例を示す。
【0211】
ここで、上記したように既存データ処理の実行は停止されているため、現在、対象ノード20においては、データ処理が実行されていない。したがって、ノード管理部21は、実行情報格納部26を参照して、データ処理実行部23に対してデータ処理の実行を指示する。
【0212】
この場合、データ処理実行部23は、ノード管理部21からの指示に応じて、処理順「1」に対応づけて図15に示す実行情報格納部26に格納されているデータ処理ID「data2_housedesign」によって識別されるデータ処理(対象データ処理)を実行し、当該データ処理が完了した後に処理順「2」に対応づけて実行情報格納部26に格納されている「data1_pai/rear/front」によって識別されるデータ処理(既存データ処理の対象となるデータのうちの目標終了時刻までに処理できるデータに対するデータ処理)を実行する。
【0213】
このように分散データ処理システム(を構成する対象ノード20)において第2の処理が実行されることによって、データ処理の優先レベルを考慮した上で、各データ処理の目標終了時刻を満たすように適切にデータ処理を実行することができる。
【0214】
次に、図16のフローチャートを参照して、本実施形態に係る分散データ処理システムにおいて実行される第3の処理の処理手順について説明する。第3の処理は、ノード20から溢れて返却されたデータに対するデータ処理を振り分ける際の処理である。この第2の処理は、中央管理装置10によって実行される。
【0215】
中央管理装置に含まれる振り分け判定部13は、ノード20から返却されたデータを受信する(ステップS61)。ノード20から返却されたデータには、当該データに対するデータ処理を識別するためのデータ処理ID、当該データ処理の目標終了時刻を示す目標終了時刻情報および当該データ処理の優先レベルを示す優先レベル情報から構成されるヘッダ情報が付与されている。
【0216】
なお、ノード20から返却されたデータとは、当該ノード20において目標終了時刻までにデータ処理を完了することができずに当該目標終了時刻から溢れたデータである。
【0217】
以下、第3の処理において、振り分け判定部13によって受信されたデータ(ノード20から返却されたデータ)に対するデータ処理を振り分け対象データ処理と称する。
【0218】
分散データ処理システムにおいて実行される第3の処理においては、ステップS61の処理が実行されると、上述した図7に示すステップS14〜S20の処理に相当するステップS62〜ステップS68の処理が実行される。ステップS68の処理が実行されると、分散データ処理システムにおいて実行される第3の処理は終了される。
【0219】
なお、ステップS67において振り分け先ノードに対して振り分け対象データ処理が振り分けられる場合、振り分け処理部14は、振り分け判定部13によって受信されたデータ(ステップS61において受信されたデータ)を当該振り分け先ノードに対して送信する。
【0220】
また、ステップS68においては、振り分け判定部13によって受信されたデータに付与されているヘッダ情報を構成するデータ処理ID(振り分け対象データ処理を識別するためのデータ処理ID)、目標終了時刻情報、優先レベル情報が対応づけて振り分け情報格納部12に格納される。更に、振り分け対象データ処理を識別するためのデータ処理IDに対応づけて、振り分け対象データ処理が振り分け判定部13によって決定されたノード20に振り分けられた時刻を示す振り分け時刻情報、当該ノード20(振り分け先ノード)を示す振り分け先ノード情報、当該振り分け対象データ処理の対象となるデータがノード20から溢れて返却されたものであること(つまり、振り分け判定部13によって受信されたこと)を示す新規/返却状況情報「返却」および振り分け対象データ処理が完了していないことを示す処理状況情報「未」が振り分け情報格納部12に格納される。
【0221】
なお、ステップS62においてデータ処理を実行していないノード20があると判定された場合、当該ノード20を振り分け先ノードとして、ステップS67およびS68の処理が実行される。
【0222】
また、ステップS65において、振り分け判定部13によって特定されたノード20が複数存在しないと判定された(つまり、1つのノード20が特定された)場合、当該振り分け判定部13によって特定されたノード20を振り分け先ノードとして、ステップS67およびS68の処理が実行される。
【0223】
次に、図17のフローチャートを参照して、本実施形態に係る分散データ処理システムにおいて実行される第4の処理の処理手順について説明する。第4の処理は、データ処理の結果を中央管理装置10に送信する際の処理である。この第4の処理は、データ処理を振り分けられたノード20によって実行される。
【0224】
ノード20に含まれるデータ処理実行部23は、データ処理の実行が完了した場合、その旨をノード管理部21に通知する(ステップS71)。この際、データ処理実行部23は、データ処理の結果(以下、データ処理結果と表記)をノード管理部21に渡す。
【0225】
ノード管理部21は、データ処理実行部23からの通知を受けると、データ処理実行部23によるデータ処理の実行の完了(終了)に伴って実行情報格納部26(の内容)を更新する(ステップS72)。
【0226】
この場合、ノード管理部21は、データ処理実行部23によって実行が完了されたデータ処理に関する実行情報を実行情報格納部26から削除する。つまり、ノード管理部21は、データ処理実行部23によって実行が完了されたデータ処理を識別するためのデータ処理IDおよび当該データ処理IDに対応づけられている情報を実行情報格納部26から削除する。また、ノード管理部21は、データ処理実行部23によって実行が完了されたデータ処理の実行情報の削除に応じて、他のデータ処理の処理順を繰り上げる。更に、処理順が繰り上げられたデータ処理の実行がデータ処理実行部23によって開始された場合には、当該データ処理の実行の開始時刻を示す開始時刻情報が当該データ処理を識別するためのデータ処理IDに対応づけて実行情報格納部26に格納される。
【0227】
例えば上述した図15に示す実行情報格納部26に格納されているデータ処理ID「data2_housedesign」によって識別されるデータ処理の実行がデータ処理実行部23によって完了された場合には、当該データ処理に関する実行情報が実行情報格納部26から削除され、データ処理ID「data1_pai/rear/front」に対応づけられている処理順が「2」から「1」に繰り上げられる。また、データ処理ID「data1_pai/rear/front」によって識別されるデータ処理の実行が開始された場合には、当該開始時刻を示す開始時刻情報が当該データ処理ID「data1_pai/rear/front」に対応づけて実行情報格納部26に格納される。
【0228】
次に、ノード管理部21は、データ処理実行部23から渡されたデータ処理結果および当該データ処理を識別するためのデータ処理IDを中央管理装置10に対して送信する(ステップS73)。
【0229】
次に、図18のフローチャートを参照して、本実施形態に係る分散データ処理システムにおいて実行される第5の処理の処理手順について説明する。第5の処理は、ノード20によって実行されたデータ処理の結果を出力する処理である。この第5の処理は、中央管理装置10によって実行される。
【0230】
中央管理装置10に含まれる中央結果処理部15は、ノード20によって実行されたデータ処理の結果(データ処理結果)および当該データ処理を識別するためのデータ処理IDを、当該ノード20から受信する(ステップS81)。ここでは、中央結果処理部15によって受信されたデータ処理IDによって識別されるデータ処理を対象データ処理と称する。
【0231】
中央結果処理部15は、受信されたデータ処理IDおよびデータ処理結果を対応づけて処理結果格納部16に格納する(ステップS82)。
【0232】
次に、中央結果処理部15は、対象データ処理の対象となるデータが分割されている(つまり、分割後のデータである)か否かを判定する(ステップS83)。ここでは、中央結果処理部15は、受信されたデータ処理IDの後方に「/front」または「/rear」が含まれている場合に、対象データ処理の対象となるデータが分割されていると判定する。
【0233】
対象データ処理の対象となるデータが分割されていると判定された場合(ステップS83のYES)、中央結果処理部15は、当該分割後の他方のデータに対するデータ処理(以下、単に他方のデータ処理と表記)が完了しているか否かを判定する(ステップS84)。
【0234】
この場合、中央結果処理部15によって受信されたデータ処理IDに対応するデータ処理ID(以下、対応データ処理IDと表記)が処理結果格納部16に格納されているか否かによって判定処理を実行する。なお、対応データ処理IDとは、中央結果処理部15によって受信されたデータ処理IDの後方に例えば「/front」が含まれている場合には、当該「/front」に代えて当該データ処理IDの後方に「/rear」が含まれているデータ処理IDである。一方、中央結果処理部15によって受信されたデータ処理IDの後方に例えば「/rear」が含まれている場合には、当該「/rear」に代えて当該データ処理IDの後方に「/front」が含まれているデータ処理IDである。
【0235】
つまり、ステップS84においては、中央結果処理部15によって受信されたデータ処理IDの対応データ処理IDが処理結果格納部16に格納されている場合には、他方のデータ処理が完了していると判定される。一方、中央結果処理部15によって受信されたデータ処理IDの対応データ処理IDが処理結果格納部16に格納されている場合には、他方のデータ処理が完了していると判定される。
【0236】
他方のデータ処理が完了していると判定された場合(ステップS84のYES)、中央結果処理部15は、振り分け情報格納部12を参照して、対象データ処理および親データ処理の処理状況(情報)を更新する(ステップS85)。ここで、親データ処理とは、対象データ処理を識別するためのデータ処理ID(つまり、中央結果処理部15によって受信されたデータ処理ID)の後方に含まれる「/front」または「/rear」を除いたデータ処理IDによって識別されるデータ処理である。換言すれば、親データ処理とは、対象データ処理の対象となるデータに分割される前のデータ(親データ)に対するデータ処理である。
【0237】
この場合、中央結果処理部15は、対象データ処理を識別するためのデータ処理IDに対応づけて振り分け情報格納部12に格納されている処理状況情報を「完了」に更新する。また、中央結果処理部15は、親データ処理を識別するためのデータ処理IDに対応づけて振り分け情報格納部12に格納されている処理状況情報を「完了」に更新する。
【0238】
なお、処理状況情報が「完了」に更新された親データ処理の対象となるデータが例えば分割後のデータである(つまり、分割されている)場合には、当該親データ処理に対して上記したステップS84の処理に相当する処理が実行されるものとする。この処理によって、親データ処理に対する他方のデータ処理(分割後の他方のデータに対するデータ処理)が完了していると判定された場合には、当該親データ処理の親データ処理の処理状況情報についても「完了」に更新される。一方、親データ処理に対する他方のデータ処理が完了していないと判定された場合には、ステップS86の処理が実行される。
【0239】
次に、中央結果処理部15は、振り分け情報格納部12を参照して、外部から入力されたデータに対するデータ処理(以下、外部入力データ処理と表記)が完了しているか否かを判定する(ステップS86)。具体的には、中央結果処理部15は、外部入力データ処理を識別するためのデータ処理IDに対応づけて振り分け情報格納部12に格納されている処理状況情報が「完了」であるか否かを判定する。
【0240】
ここで、外部入力データ処理とは、「/front」および「/rear」が含まれていないデータ処理IDであって、新規/返却状況情報「新規」に対応づけて振り分け情報格納部12に格納されているデータ処理IDによって識別されるデータ処理である。
【0241】
外部入力データ処理が完了していると判定された場合(ステップS86のYES)、中央結果処理部15は、当該外部入力データ処理に関するデータ処理結果を処理結果格納部16から取得する(ステップS87)。なお、外部入力データ処理を識別するためのデータ処理IDが処理結果格納部16に格納されている場合には、当該データ処理IDに対応づけて処理結果格納部16に格納されているデータ処理結果(つまり、外部入力データ処理のデータ処理結果)が取得される。一方、外部入力データ処理を識別するためのデータ処理IDが処理結果格納部16に格納されていない場合には、当該外部入力データ処理に関連するデータ処理(以下、外部入力データ処理の関連データ処理と表記)を識別するためのデータ処理IDに対応づけて処理結果格納部16に格納されているデータ処理結果(つまり、外部入力データ処理の関連データ処理のデータ処理結果)の全てが取得される。
【0242】
なお、外部入力データ処理の関連データ処理とは、外部入力データ処理の目標終了時刻から溢れてノード20から返却されたデータ(つまり、外部入力データ処理の対象となるデータの分割後のデータ)に対するデータ処理である。つまり、外部乳旅行データ処理の関連データ処理とは、当該外部入力データ処理を識別するためのデータ処理IDの後方に「/front」または「/rear」が含まれているデータ処理IDによって識別されるデータ処理である。
【0243】
次に、中央結果処理部15は、外部入力データ処理の最終結果を外部に出力する(ステップS88)。上記したステップS87において外部入力データ処理のデータ処理結果が取得された場合には、中央結果処理部15は、当該外部入力データ処理のデータ処理結果を最終結果として出力する。一方、ステップS87において外部入力データ処理の関連データ処理のデータ処理結果の全てが取得された場合には、中央結果処理部15は、当該データ処理結果の全てを集約した結果(最終結果)を出力する。
【0244】
なお、ステップS88の処理が実行されると、外部入力データ処理を識別するためのデータ処理IDおよび当該データ処理IDに対応づけられている情報は、振り分け情報格納部12および処理結果格納部16から削除される。同様に、外部入力データ処理の関連データ処理を識別するためのデータ処理IDおよび当該データ処理IDに対応づけられている情報についても、振り分け情報格納部12および処理結果格納部16から削除される。
【0245】
一方、外部入力データ処理が完了していないと判定された場合(ステップS86のNO)、ステップS87およびS88の処理は実行されない。
【0246】
また、ステップS83において対象データ処理の対象となるデータが分割されていないと判定された場合、およびステップS84において他方のデータ処理が完了していないと判定された場合、中央結果処理部15は、振り分け情報格納12を参照して、対象データ処理の処理状況を更新する(ステップS89)。
【0247】
この場合、中央結果処理部15は、対象データ処理を識別するためのデータ処理IDに対応づけて振り分け情報格納部12に格納されている処理状況情報を「完了」に更新する。なお、ステップS83において対象データ処理の対象となるデータが分割されていないと判定された場合であって、対象データ処理を識別するためのデータ処理IDが新規/返却状況情報「返却」に対応づけられている場合、つまり、対象データ処理の対象となるデータが分割されずにノード20から返却されたデータである場合には、当該対象データ処理を識別するためのデータ処理IDおよび新規/返却状況情報「新規」に対応づけて振り分け情報格納部12に格納されている処理状況情報についても「完了」に更新されるものとする。
【0248】
上記したステップS89の処理が実行されると、上記したステップS86以降の処理が実行される。
【0249】
ここで、図19および図20を参照して、分散データ処理システムにおいて実行される第5の処理の具体例について説明する。
【0250】
図19は、分散データ処理システムにおいて第5の処理が実行される前の中央管理装置10に含まれる処理結果格納部16のデータ構造の一例を示す。図19に示す例では、処理結果格納部16には、データ処理ID「data1_pai/front」およびデータ処理結果「データ処理結果1f」が対応づけて格納されている。
【0251】
なお、中央管理装置10に含まれる振り分け情報格納部12は、上述した図8に示すデータ構造を有するものとする。また、分散データ処理システムを構成する複数のノード20としてノード1〜3の3つのノードが存在するものとする。
【0252】
以下、データ処理ID「data1_pai/rear」および当該データ処理ID「data1_pai/rear」によって識別されるデータ処理のデータ処理結果「データ処理結果1r」がノード3から送信された場合について具体的に説明する。
【0253】
まず、分散データ処理システムにおいて実行される第5の処理では、中央管理装置10に含まれる中央結果処理部15は、ノード3によって送信されたデータ処理ID「data1_pai/rear」およびデータ処理結果「データ処理結果1r」を受信する。
【0254】
中央結果処理部15は、受信されたデータ処理ID「data1_pai/rear」およびデータ処理結果「データ処理結果1r」を対応づけて処理結果格納部16に格納する。なお、図20は、データ処理ID「data1_pai/rear」およびデータ処理結果「データ処理結果1r」が格納された後の処理結果格納部16のデータ構造を示す。
【0255】
次に、中央結果処理部15は、受信されたデータ処理ID「data1_pai/rear」によって識別されるデータ処理(以下、対象データ処理と表記)の対象となるデータが分割されている(分割後のデータである)か否かを判定する。ここで、中央結果処理部15によって受信されたデータ処理ID「data1_pai/rear」には後方に「/rear」が含まれているため、対象データ処理の対象となるデータはrearデータ(つまり、分割後の後方のデータ)である。したがって、中央結果処理部15は、対象データ処理の対象となるデータが分割されていると判定する。
【0256】
対象データ処理の対象となるデータが分割されていると判定された場合、中央結果処理部15は、他方のデータ処理(ここでは、データ処理ID「data1_pai/front」によって識別されるデータ処理)が完了しているか否かを判定する。
【0257】
この場合、図20に示す処理結果格納部16にはデータ処理ID「data1_pai/front」が格納されているため、中央結果処理部15は、他方のデータ処理が完了していると判定する。
【0258】
他方のデータ処理が完了していると判定された場合、中央結果処理部15は、振り分け情報格納部12において、対象データ処理および親データ処理の処理状況を更新する。なお、対象データ処理を識別するためのデータ処理IDが「data1_pai/rear」である場合における親データ処理は、データ処理ID「data1_pai」によって識別されるデータ処理である。
【0259】
ここでは、中央結果処理部15は、対象データ処理を識別するためのデータ処理ID「data1_pai/rear」に対応づけて振り分け情報格納部12に格納されている処理状況情報を「完了」に更新する。また、中央結果処理部15は、親データ処理を識別するためのデータ処理ID「data1_pai」に対応づけて振り分け情報格納部812に格納されている処理状況情報を「完了」に更新する。なお、図21は、対象データ処理および親データ処理の処理状況が更新された後の振り分け情報格納部12のデータ構造の一例を示す。
【0260】
次に、中央結果処理部15は、振り分け情報格納部12を参照して、外部入力データ処理が完了しているか否かを判定する。ここで、図21に示す振り分け情報格納部12に格納されている例えばデータ処理ID「data1_pai」によって識別されるデータ処理は、「/front」および「/rear」が含まれていないデータ処理IDであって新規/返却状況情報「新規」に対応づけられているデータ処理IDによって識別されるデータ処理であるため、外部入力データ処理である。このデータ処理ID「data1_pai」に対応づけて振り分け情報格納部12に格納されている処理状況情報は「完了」であるため、当該データ処理ID「data1_pai」によって識別される外部入力データ処理は完了していると判定される。
【0261】
この場合、中央結果処理部15は、外部入力データ処理に冠するデータ処理結果を処理結果格納部16から取得する。ここで、図20に示す処理結果格納部16には外部入力データ処理を識別するためのデータ処理ID「data1_pai」は格納されていないため、当該外部入力データ処理の関連データ処理を識別するためのデータ処理ID「data1_pai/front」および「data1_pai/rear」に対応づけて処理結果格納部16に格納されているデータ処理結果「データ処理結果1f」および「データ処理結果1r」を取得する。
【0262】
次に、中央結果処理部15は、データ処理ID「data1_pai」によって識別される外部入力データ処理の最終結果として、取得されたデータ処理結果「データ処理結果1f」および「データ処理結果1r」を集約した結果を外部に出力する。
【0263】
上記したように本実施形態において、分散データ処理システムを構成する中央管理装置10は、振り分け情報格納部12に格納されている振り分け先ノード情報に基づいてデータ処理が振り分けられていないノードが存在するか否かを判定し、データ処理が振り分けられていないノードが存在しないと判定された場合に、ノード毎に最も遅い目標終了時刻を抽出し、当該ノード毎に抽出された目標終了時刻の中で最も早い目標終了時刻が抽出されたノードを特定し、ノードに振り分けられていないデータ処理を当該特定されたノードに振り分ける。
【0264】
また、本実施形態において、分散データ処理システム構成する複数のノード20のうちのデータ処理が振り分けられたノード20は、当該ノード20に振り分けられたデータ処理および当該ノード20において実行されている他のデータ処理の処理順を優先レベルに基づいて入れ替え、当該データ処理の対象となるデータを当該データ処理の目標終了時刻までに処理できるデータおよび当該データ処理の目標終了時刻までに処理できないデータに分割し、当該分割された後の後方のデータ(つまり、目標終了時刻までに処理できないデータ)を中央管理装置10に返却する。
【0265】
また、本実施形態において、中央管理装置10は、ノード20から返却されたデータを再び当該データを返却したノード20以外のノード20に振り分ける。
【0266】
また、本実施形態において、中央管理装置10は、例えば外部入力データ処理の対象となるデータが分割されている場合には、当該分割されたデータに対するデータ処理の全てが各ノード20で完了した場合に、当該データ処理結果を集約した結果を当該外部入力データ処理の最終結果として出力する。
【0267】
したがって、本実施形態においては、複数のノード20の性能等を把握することなく、適切なスケジューリング(データ処理の振り分け)を行うことが可能となる。これにより、本実施形態においては、データ処理を複数のノード20に振り分ける際の通信処理コストを低減することができる。更に、本実施形態においては、各データ処理の優先レベルを考慮した上で、目標終了時刻を満たした再スケジューリングを行うことが可能となる。
【0268】
なお、本実施形態においては、上記した第2の処理でデータ処理の目標終了時刻からデータが溢れる場合に当該データ処理の対象となるデータが分割されるものとして説明したが、例えば目標終了時刻の所定時間前にデータ処理を完了できないような場合には、当該データ処理の対象となるデータを強制的に分割するような構成であっても構わない。
【0269】
また、本願発明は、上記実施形態そのままに限定されるものではなく、実施段階ではその要旨を逸脱しない範囲で構成要素を変形して具体化できる。また、上記実施形態に開示されている複数の構成要素の適宜な組合せにより種々の発明を形成できる。例えば、実施形態に示される全構成要素から幾つかの構成要素を削除してもよい。更に、異なる実施形態に亘る構成要素を適宜組合せてもよい。
【符号の説明】
【0270】
10…中央管理装置、11…ヘッダ情報付与部、12…振り分け情報格納部、13…振り分け判定部、14…振り分け処理部、15…中央結果処理部、16…処理結果格納部、20…ノード、21…ノード管理部、22…ノードデータ格納部、23…データ処理実行部、24…データ処理推定部、25…データ処理分割部、26…実行情報格納部。

【特許請求の範囲】
【請求項1】
データ処理を実行する複数のノードと、前記複数のノードと接続された中央管理装置であって当該データ処理を当該ノードの各々に振り分ける中央管理装置とから構成される分散データ処理システムにおいて、
前記中央管理装置は、
前記ノードの各々に振り分けられたデータ処理を識別するためのデータ処理識別情報、当該データ処理の目標終了時刻を示す目標終了時刻情報および当該データ処理が振り分けられたノードを示す振り分け先ノード情報を対応づけて格納する振り分け情報格納手段と、
前記ノードに振り分けられていないデータ処理の対象となるデータおよび当該データ処理の目標終了時刻を示す目標終了時刻情報を入力する入力手段と、
前記振り分け情報格納手段に格納されている振り分け先ノード情報に基づいて、前記複数のノードの中にデータ処理が振り分けられていないノードが存在するかを判定する判定手段と、
前記データ処理が振り分けられていないノードが存在しないと判定された場合、前記ノード毎に、当該ノードを示す振り分け先ノード情報に対応づけて前記振り分け情報格納手段に格納されている目標終了時刻情報によって示される目標終了時刻のうち最も遅い目標終了時刻を抽出する抽出手段と、
前記ノード毎に抽出された目標終了時刻の中で最も早い目標終了時刻が抽出されたノードを特定する特定手段と、
前記入力されたデータおよび目標終了時刻情報を前記特定されたノードに送信し、当該入力されたデータに対するデータ処理を前記特定されたノードに振り分ける振り分け処理手段と、
前記入力されたデータに対するデータ処理を識別するためのデータ処理識別情報、前記入力された目標終了時刻情報および当該データ処理が振り分けられたノードを示す振り分け先ノード情報を対応づけて前記振り分け情報格納手段に格納する格納処理手段と
を含み、
前記入力されたデータに対するデータ処理が振り分けられたノードは、前記送信された目標終了時刻情報によって示される目標終了時刻に基づいて、前記送信されたデータに対するデータ処理を実行する
ことを特徴とする分散データ処理システム。
【請求項2】
前記入力手段は、前記入力されたデータに対するデータ処理の優先レベルを示す優先レベル情報を更に入力し、
前記振り分け処理手段は、前記入力された優先レベル情報を前記特定されたノードに更に送信し、
前記入力されたデータに対するデータ処理が振り分けられたノードは、
当該ノードで実行されているデータ処理を識別するためのデータ処理識別情報、当該データ処理の優先レベルを示す優先レベル情報、当該データ処理の目標終了時刻を示す目標終了時刻情報、当該データ処理の推定終了時刻を示す推定終了時刻情報および当該データ処理における単位処理量を対応づけて格納する実行情報格納手段と、
前記送信されたデータに対するデータ処理の一部を実行するデータ処理実行手段と、
前記データ処理実行手段によるデータ処理の一部の実行結果に基づいて前記送信されたデータに対するデータ処理における単位処理量を算出し、前記送信されたデータ、当該算出された単位処理量および前記実行情報格納手段に格納されている推定終了時刻情報によって示される推定終了時刻に基づいて前記送信されたデータに対するデータ処理の推定終了時刻を算出する第1の算出手段と、
前記送信された目標終了時刻情報によって示される目標終了時刻および前記第1の算出手段によって算出された推定終了時刻に基づいて、前記送信されたデータに対するデータ処理が当該目標終了時刻以前に終了するかを判定する第1の判定手段と、
前記送信されたデータに対するデータ処理が目標終了時刻以前に終了しないと前記第1の判定手段によって判定された場合、前記送信された優先レベル情報によって示される優先レベルおよび前記実行情報格納手段に格納されている優先レベル情報によって示される優先レベルに基づいて、前記送信されたデータに対するデータ処理および前記実行情報格納手段に格納されているデータ処理識別情報によって識別されるデータ処理が実行される順番を入れ替える入れ替え手段と、
前記順番が入れ替えられた場合、前記実行情報格納手段に格納されているデータ処理識別情報によって識別されるデータ処理を停止し、前記送信されたデータ、前記第1の算出手段によって算出された単位処理量および当該データ処理が停止された時刻に基づいて前記送信されたデータに対するデータ処理の推定終了時刻を算出する第2の算出手段と、
前記送信された目標終了時刻情報によって示される目標終了時刻および前記第2の算出手段によって算出された推定終了時刻に基づいて、前記送信されたデータに対するデータ処理が当該目標終了時刻以前に終了するかを判定する第2の判定手段と、
前記送信されたデータに対するデータ処理が目標終了時刻以前に終了しないと前記第2の判定手段によって判定された場合、前記送信された目標終了時刻情報によって示される目標終了時刻、前記第1の算出手段によって算出された単位処理量および前記第2の算出手段によって算出された推定終了時刻に基づいて、前記送信されたデータを、当該データに対するデータ処理において当該目標終了時刻までに処理できる前方のデータと当該目標終了時刻までに処理できない後方のデータとに分割する第1の分割手段と、
前記第1の分割手段によって分割された後方のデータを前記中央管理装置に返却する第1の返却手段と、
前記実行情報格納手段に格納されているデータ処理識別情報によって識別されるデータ処理の対象となるデータ、当該データ処理識別情報に対応づけて前記実行情報格納手段に格納されている単位処理量および前記第2の算出手段によって算出された推定終了時刻に基づいて前記実行情報格納手段に格納されているデータ処理識別情報によって識別されるデータ処理の推定終了時刻を算出する第3の算出手段と、
前記実行情報格納されている目標終了時刻情報によって示される目標終了時刻および前記第3の算出手段によって算出された推定終了時刻に基づいて、前記実行情報格納手段に格納されているデータ処理識別情報によって識別されるデータ処理が当該目標終了時刻以前に終了するかを判定する第3の判定手段と、
前記実行情報格納手段に格納されているデータ処理識別情報によって識別されるデータ処理が目標終了時刻以前に終了しないと判定された場合、当該データ処理識別情報に対応づけて前記実行情報格納手段に格納されている目標終了時刻情報によって示される目標終了時刻、単位処理量および前記第3の算出手段によって算出された推定終了時刻に基づいて、当該データ処理識別情報によって識別されるデータ処理の対象となるデータを、当該データ処理において当該目標終了時刻までに処理できる前方のデータと当該目標終了時刻までに処理できない後方のデータとに分割する第2の分割手段と、
前記第2の分割手段によって分割された後方のデータを前記中央管理装置に返却する第2の返却手段と
を含み、
前記データ処理実行手段は、前記第1の分割手段によって分割された前方のデータに対するデータ処理、前記第2の分割手段によって分割された前方のデータに対するデータ処理の順番で、当該各データ処理を実行する
ことを特徴とする請求項1記載の分散データ処理システム。
【請求項3】
前記振り分け処理手段は、前記第1および第2の返却手段によってデータが返却された場合、当該第1および第2の返却手段によって返却されたデータに対するデータ処理を前記特定されたノードに更に振り分けることを特徴とする請求項2記載の分散データ処理システム。
【請求項4】
前記中央管理装置は、前記入力されたデータに対するデータ処理が振り分けられたノードに含まれるデータ処理実行手段によって実行された前記第1の分割手段によって分割された前方のデータに対するデータ処理の結果および前記第1の返却手段によって返却されたデータに対するデータ処理が振り分けられたノードに含まれるデータ処理実行手段によって実行された当該データ処理の結果を集約して出力し、前記入力されたデータに対するデータ処理が振り分けられたノードに含まれるデータ処理実行手段によって実行された前記第2の分割手段によって分割された前方のデータに対するデータ処理の結果および前記第2の返却手段によって返却されたデータに対するデータ処理が振り分けられたノードに含まれるデータ処理実行手段によって実行された当該データ処理の結果を集約して出力する出力手段を更に含むことを特徴とする請求項3記載の分散データ処理システム。
【請求項5】
データ処理を実行する複数のノードと接続され、当該データ処理を当該ノードの各々に振り分ける中央管理装置において、
前記ノードの各々に振り分けられたデータ処理を識別するためのデータ処理識別情報、当該データ処理の目標終了時刻を示す目標終了時刻情報および当該データ処理が振り分けられたノードを示す振り分け先ノード情報を対応づけて格納する振り分け情報格納手段と、
前記ノードに振り分けられていないデータ処理の対象となるデータおよび当該データ処理の目標終了時刻を示す目標終了時刻情報を入力する入力手段と、
前記振り分け情報格納手段に格納されている振り分け先ノード情報に基づいて、前記複数のノードの中にデータ処理が振り分けられていないノードが存在するかを判定する判定手段と、
前記データ処理が振り分けられていないノードが存在しないと判定された場合、前記ノード毎に、当該ノードを示す振り分け先ノード情報に対応づけて前期振り分け情報格納手段に格納されている目標終了時刻情報によって示される目標終了時刻のうち最も遅い目標終了時刻を抽出する抽出手段と、
前記ノード毎に抽出された目標終了時刻の中で最も早い目標終了時刻が抽出されたノードを特定する特定手段と、
前記入力されたデータおよび目標終了時刻情報を前記特定されたノードに送信し、当該入力されたデータに対するデータ処理を前記特定されたノードに振り分ける振り分け処理手段と、
前記入力されたデータに対するデータ処理を識別するためのデータ処理識別情報、前記入力された目標終了時刻情報および当該データ処理が振り分けられたノードを示す振り分け先ノード情報を対応づけて前記振り分け情報格納手段に格納する格納処理手段と
を具備することを特徴とする中央管理装置。

【図1】
image rotate

【図2】
image rotate

【図3】
image rotate

【図4】
image rotate

【図5】
image rotate

【図6】
image rotate

【図7】
image rotate

【図8】
image rotate

【図9】
image rotate

【図10】
image rotate

【図11】
image rotate

【図12】
image rotate

【図13】
image rotate

【図14】
image rotate

【図15】
image rotate

【図16】
image rotate

【図17】
image rotate

【図18】
image rotate

【図19】
image rotate

【図20】
image rotate

【図21】
image rotate


【公開番号】特開2012−78921(P2012−78921A)
【公開日】平成24年4月19日(2012.4.19)
【国際特許分類】
【出願番号】特願2010−221345(P2010−221345)
【出願日】平成22年9月30日(2010.9.30)
【出願人】(000003078)株式会社東芝 (54,554)
【出願人】(301063496)東芝ソリューション株式会社 (1,478)