説明

メッセージ配信システム及びメッセージ配信方法

【課題】 メッセージ配信システムにおけるサーバのリソースを効率的に利用可能とし、配信先への安定したメッセージ配信を実現する。
【解決手段】 メッセージ配信システムは、第1及び第2の計算機100、200、及び記憶装置105を備える。第1の計算機100は、配信元計算機400から送られてくるメッセージを受信し、配信先計算機500の少なくとも一部にメッセージを配信してその配信状況を取得する。配信状況に基づいて、メッセージの配信が遅延している低速配信先計算機が存在するか否かが判別される。低速配信先計算機が存在する場合、第1の計算機100から第2の計算機200に低速配信先計算機へのメッセージの配信を担当する計算機の切り替えを要求し、第2の計算機200により低速配信先計算機へのメッセージの配信を引き継いで、低速配信先計算機へのメッセージ配信を行う。

【発明の詳細な説明】
【技術分野】
【0001】
本発明は、ネットワークを介したメッセージの配信方法、およびそのためのシステムに係り、特に、配信元の計算機からネットワークを介して複数の配信先計算機にメッセージを配信するための方法及び計算機システムに関する。
【背景技術】
【0002】
近年、ユーザにより求められる情報量やユーザ数が増大するにつれ、大量のデータを複数の宛先に配信する情報システムへの需要が高まっている。特に、金融分野など、社会インフラを担う情報システムにおいては、高速かつ高信頼(データ損失無し)なデータ配信を実現することが求められる。
【0003】
このような要求を満たす情報システムを実現するためには、配信先サーバ(コンシューマサーバ)へのデータ(メッセージ)の配信に、複数台の配信サーバ(ブローカサーバ)を用いたシステム構成が一般的に用いられる。こうした情報システムでは、複数台のコンシューマサーバへのメッセージの配信を複数台のブローカサーバで行うことで、メッセージの配信に伴う負荷を分散して高速性を実現している。また、ブローカサーバにおいて配信するメッセージの複製を作成して多重化したり、ストレージ装置等の不揮発性の記憶装置に保存して永続化したりすることによって高信頼性が実現されている。
【0004】
高速性については安定的な性能が求められるため、ブローカサーバは、処理負荷やコンシューマサーバ数に応じて台数を柔軟に増減させることが求められる。加えて、台数の変更を柔軟に行うためにも、ブローカサーバ1台あたりの構築コストを抑えることも求められる。このように、安定的な性能とコスト省力化を実現するためには、ブローカサーバの効率的なリソース活用が必要である。
【0005】
高速化及び高信頼化を効率的なリソース活用によって実現する従来技術として、特許文献1と特許文献2に開示された技術がある。特許文献1には、メッセージの永続化を専用に行うストレージ装置を用いることで、ブローカサーバでのメッセージ配信処理とメッセージの永続化処理を別サーバで実施し、ストレージ装置を用いた永続化処理によるメッセージ配信への性能影響を抑制する技術が開示される。また、複数台のブローカサーバにおけるストレージ装置を集約することでコスト抑制を実現する特長も併せ持つ。特許文献2では、ブローカサーバを多階層に接続し、メッセージの永続化を上位階層のブローカサーバに集約することで、下位階層のブローカサーバでのメッセージ永続化処理を削減し、処理コストの低減とストレージ容量削減を図ることが開示される。
【先行技術文献】
【特許文献】
【0006】
【特許文献1】特表2008−527538号公報
【特許文献2】米国公開 2006/0248219号公報
【発明の概要】
【発明が解決しようとする課題】
【0007】
複数台のコンシューマサーバへの配信を行うシステムにおいては、コンシューマサーバ側での処理の状況によりメッセージを受け取れず、ブローカサーバからのメッセージの配信が遅延するコンシューマサーバが発生する(このように配信遅延が生じているコンシューマサーバを以下では便宜上低速コンシューマサーバと呼ぶ)。低速コンシューマサーバが発生すると、ブローカサーバのリソース利用量が増えるため、他の正常なコンシューマサーバへの配信に対する遅延が発生する。また、正常なコンシューマサーバへの配信処理について安定的な高速性能を実現するためには、低速コンシューマサーバの発生によるリソース利用量の増大を想定した十分なリソースを備えておく必要があり、ブローカサーバの構成コストが増えてしまう。
【0008】
例えば、低速コンシューマサーバが発生すると、そのコンシューマサーバにメッセージを配信するブローカサーバ上でメッセージを保持するためにリソースを余剰に消費することになる。通常、ブローカサーバは、メッセージをメモリ上のバッファに格納する。高信頼性のために、メッセージは、未配信のコンシューマサーバが1台でもある限りバッファに保持される。このため、1台のコンシューマサーバで配信遅延が生じると、バッファはそのコンシューマサーバへの未配信メッセージの保持のために消費される。バッファ容量には限界があるため、未配信メッセージ数が増えるとバッファ溢れが発生してしまう。
【0009】
一般に、バッファを溢れたメッセージについては、それをストレージ装置(あるいはそれに準ずる記憶装置)に一時的に保持することで対処する方法が用いられる。このようにバッファを溢れたメッセージをストレージ装置に保持するにあたっては、メッセージ格納のための書き込み処理と、メッセージ利用のための読み出し処理を行う必要が生じる。このためバッファ溢れ時には、これらの処理のためにリソースが消費され、正常配信時に比べ、メッセージの配信処理に利用可能なリソース量が減少し、配信性能に影響が生じる恐れがある。
【0010】
複数台のブローカサーバそれぞれにおいて低速コンシューマサーバが発生した場合には、各ブローカサーバにおいて低速コンシューマサーバに対処するための処理負荷がかかることになる。さらに、各ブローカサーバがバッファ上に未配信メッセージを保持することになるため、複数のブローカサーバでメッセージを重複して保持する可能性が生じる。メッセージを重複して保持することによるリソースの浪費は、情報システム全体としての処理性能の低下要因となり、複数台のブローカサーバによる負荷の分散が有効に機能しなくなる。
【0011】
さらに、複数台のブローカサーバでバッファ溢れに対処できるようにするため、各ブローカサーバにメッセージ永続化のためのストレージ装置を設け、それぞれのブローカサーバがメッセージを保持すると、メッセージの再送処理を効率的に行うことができる。しかし、大量で多様なメッセージを処理するためには、大容量を備えたストレージ装置が必要となる。これらのストレージ装置はバッファが溢れない限り利用されないため、すべてのブローカサーバにこのような大容量のストレージ装置を設けることはコスト抑制の観点から望ましくない。複数台のブローカサーバで1台のストレージ装置を共有し、メッセージの重複保存を避けることでコストを抑制することが可能である。しかし、ストレージ装置を共有する場合には、ストレージ装置への書き込みの排他処理等が発生してメッセージ永続化処理負荷がさらに増大してしまう。
【0012】
上述した従来技術を用いることで、低速コンシューマサーバの発生時におけるメッセージの永続化箇所を集約することができるため、ストレージ装置のコスト抑制が実現される。しかしながら、これら従来技術では、低速コンシューマサーバの発生による配信処理の負荷増大の影響が、低速コンシューマサーバにメッセージを配信するすべてのブローカサーバに及ぶため、低速コンシューマサーバの発生に伴い他の正常コンシューマサーバへの配信処理が遅延するという問題には対処することができない。
【0013】
上述したような従来技術における問題点に鑑み、本発明の目的は、低速コンシューマサーバが発生した場合であっても、正常なコンシューマサーバへの安定的な配信性能を実現することにある。
【課題を解決するための手段】
【0014】
上記目的を達成するために、本発明の一つの観点から、第1及び第2の計算機と、第1及び第2の計算機からアクセス可能な記憶装置とを備え、配信元から送られるメッセージを受信して、複数の配信先計算機にメッセージを配信するメッセージ配信システムにおけるメッセージの配信方法が提供される。当該メッセージの配信方法は、好ましくは、第1の計算機により配信元から送られてくるメッセージを受信して、複数の配信先計算機の少なくとも一部の配信先計算機にメッセージを配信するとともに、メッセージの配信状況を取得し、配信状況に基づいて、配信先計算機の中に配信が遅延している低速配信先計算機が存在するか否かを判別する。低速配信先計算機が存在する場合、第1の計算機から第2の計算機に低速配信先計算機へのメッセージの配信を担当する計算機の切り替えを要求し、第2の計算機により低速配信先計算機へのメッセージの配信を引き継ぎ、低速配信先計算機にメッセージを配信する。
【0015】
より好ましくは、第2の計算機においてメッセージを保持するバッファの使用率が所定の値を超えた場合、メッセージの少なくとも一部が第2の計算機から記憶装置に格納される。
【0016】
さらに好ましくは、第2の計算機により、第2の計算機がメッセージを配信している配信先計算機の中でメッセージの配信に遅延の生じていない正常配信先計算機を判別する。正常配信先計算機が存在する場合、第2の計算機から第1の計算機に正常配信先計算機へのメッセージの配信を担当する計算機の切り替えを要求し、第1の計算機により正常配信先計算機へのメッセージの配信を引き継ぎ、正常配信先計算機にメッセージを配信する。
【0017】
本発明の別の観点によれば、複数の配信先計算機にネットワークを介してメッセージを配信するメッセージ配信システムが提供される。好ましい一つの態様において、メッセージ配信システムは、第1の計算機と第2の計算機とを含む。第1の計算機は、受信したメッセージを保持する第1のバッファと、複数の配信先計算機の各々へのメッセージの配信を担当する計算機を指定する第1の担当情報と、第1の担当情報で自計算機が担当として割り当てられている第1の担当配信先計算機へのメッセージの配信状況を示す第1の配信状況情報とを保持するメモリ、配信元から送られてくるメッセージを受信して第1のバッファに保持する第1のメッセージ管理手段、第1の担当配信先計算機にメッセージを配信して第1の配信状況情報を更新する第1の配信手段、及び、第1の配信状況情報に基づいて、第1の担当配信先計算機の中に配信が遅延している低速配信先計算機が存在するか否かを判別し、低速配信先計算機が存在する場合、低速配信先計算機へのメッセージの配信を担当する計算機の切り替えを要求する第1の切替手段を有する。また、第2の計算機は、受信したメッセージを保持する第2のバッファ、複数の配信先計算機の各々へのメッセージの配信を担当する計算機を指定する第2の担当情報、及び、第2の担当情報で自計算機が担当として割り当てられている第2の担当配信先計算機へのメッセージの配信状況を示す第2の配信状況情報を保持するメモリ、配信元から送られてくるメッセージを受信して第2のバッファに保持する第2のメッセージ管理手段、及び第1の計算機からの要求に応じて低速配信先計算機を第2の担当配信先計算機に含まれるよう第2の担当情報を更新し、第2の担当配信先計算機にメッセージを配信して第2の配信状況情報を更新する第2の配信手段を有する。
【0018】
好ましくは、第2のメッセージ管理部は、第2のバッファの使用状況を監視し、バッファの使用率が所定の使用率を超えた場合、少なくとも一部のメッセージをストレージ装置に移動する。
【0019】
より好ましくは、第2の計算機はさらに、第2の配信状況情報に基づいて、メッセージの配信が正常に行われている正常配信先計算機の有無を判別し、正常配信先計算機が存在する場合、正常配信先計算機へのメッセージの配信を担当する計算機の切り替えを要求する第2の切替手段を有し、第1の配信手段は、第2の計算機からの要求に応じて正常配信先計算機を第1の担当配信先計算機に含まれるよう第1の担当情報を更新する。
【0020】
本発明のさらに別の観点によれば、上述したようなメッセージの配信システムにおいて用いられるメッセージ配信計算機が提供される。当該計算機は、好ましくは、メッセージを保持するバッファ、複数の配信先計算機の中でメッセージの配信が遅延している低速配信先計算機へのメッセージの配信を担当する計算機であるか否かを示す計算機情報、複数の配信先計算機の各々へメッセージの配信を担当する計算機を指定する担当情報、及び、担当情報で自計算機が担当として割り当てられている配信先計算機へのメッセージの配信状況情報を保持するメモリと、送られてくるメッセージを受信してバッファに格納するメッセージ管理手段と、担当情報に基づいてメッセージ管理手段が受信したメッセージを自計算機が担当する配信先計算機に配信して配信状況情報を更新する配信処理手段と、自計算機が低速配信先計算機へのメッセージ配信を担当しない場合、配信状況情報に基づいて、担当する配信先計算機の中に存在する低速配信先計算機を検知して、検知された低速配信先計算機へのメッセージの配信を担当する計算機の切り替えを要求し、自計算機が低速配信先計算機へのメッセージ配信を担当する場合、配信状況情報に基づいて、担当する配信先計算機の中に存在する配信が遅延していない正常配信先サーバを検知して、当該正常配信先計算機へのメッセージの配信を担当する計算機の切り替えを要求する切替手段とを有する。
【発明の効果】
【0021】
低速コンシューマサーバの発生時においても、ブローカサーバのリソースを効率的に利用することができ、コンシューマサーバへの安定したメッセージの配信を実現することができる。
【図面の簡単な説明】
【0022】
【図1】本発明の第1の実施形態における計算機システムのハードウェア構成を示す簡略ブロック図である。
【図2】ソフトウェアモジュールを主としたブローカサーバの構成を示す簡略ブロック図である。
【図3】ブローカサーバ表のテーブル構成を示す概念図である。
【図4】配信担当表のテーブル構成を示す概念図である。
【図5】配信状況表のテーブル構成を示す概念図である。
【図6】判定しきい値表のテーブル構成を示す概念図である。
【図7】正常時におけるメッセージ配信処理を示すシーケンス図である。
【図8】メッセージ管理部によるメッセージ登録処理の手順を示すフローチャートである。
【図9】配信処理部によるメッセージ配信処理の流れを示すフローチャートである。
【図10】メッセージ受取確認応答時に、配信処理部により行われる処理の手順を示すフローチャートである。
【図11】正常配信用ブローカサーバにおけるメッセージ削除処理の手順を示すフローチャートである。
【図12】メッセージ管理部によるメッセージ取得処理の手順を示すフローチャートである。
【図13】メッセージ配信担当ブローカサーバの切替処理を示すシーケンス図である。
【図14】正常配信用ブローカサーバにおける低速コンシューマサーバ検知処理、及び切替処理の手順を示すフローチャートである。
【図15】正常配信用ブローカサーバの配信処理部により実施される配信担当解除処理の手順を示すフローチャートである。
【図16】低速コンシューマサーバ専用ブローカサーバの配信処理部により実施される配信担当登録処理の手順を示すフローチャートである。
【図17】正常コンシューマサーバの配信担当ブローカサーバの切替処理を示すシーケンス図である。
【図18】低速コンシューマ専用ブローカサーバの切替処理部による正常コンシューマサーバ検知処理と配信担当の切替処理の手順を示すフローチャートである。
【図19】フェールオーバ処理の手順を示したフローチャートである。
【図20】第2の実施形態における判定しきい値表を示す概念図である。
【図21】正常配信用ブローカサーバの切替処理部において、低速コンシューマサーバへのメッセージ配信を担当するブローカサーバの切替処理を示すシーケンス図である。
【図22】正常配信用ブローカサーバの切替処理部により実施される低速コンシューマサーバの検知処理と配信担当の切替処理の手順を示すフローチャートである。
【図23】正常コンシューマサーバの配信担当ブローカサーバの切替処理を示すシーケンス図である。
【図24】切替処理部による正常コンシューマサーバ検知処理と配信担当の切替処理の手順を示すフローチャートである。
【図25】配信処理部によるメッセージ配信処理を示すフローチャートである。
【図26】第3の実施形態における計算機システムのハードウェア構成を示す簡略化されたブロック図である。
【図27】判定しきい値表の概念図である。
【図28】低速コンシューマサーバの配信担当切替処理、低速コンシューマサーバへのメッセージ配信処理、及びメッセージ削除処理を示すシーケンス図である。
【図29】切替対象コンシューマサーバの検知処理と配信担当の切替処理の手順を示すフローチャートである。
【図30】ブローカサーバ間での配信担当の切戻処理を示すシーケンス図である。
【図31】切替処理部による切戻コンシューマサーバの検知処理と配信担当の切替処理の手順を示すフローチャートである。
【発明を実施するための形態】
【0023】
<第1の実施形態>
図1は、本発明が適用された計算機システムの一実施形態における簡略なハードウェア構成を示すブロック図である。
【0024】
図1において、メッセージ配信システム10は、2台のブローカサーバ100とブローカサーバ200、ブローカサーバ100、200によって利用されるストレージ装置105を含んで構成される。メッセージ配信システム10(ブローカサーバ100、200)は、ネットワーク106を介してメッセージの配信元であるプロデューササーバ400、およびメッセージの配信先となる複数のコンシューマサーバ500と接続されている。プロデューササーバ400、コンシューマサーバ500は、任意のブローカサーバ100、200と通信することができる。またブローカサーバ100とブローカサーバ200との間においてもネットワーク106を介して通信することが可能である。ここでは、プロデューササーバ400として2台のプロデューササーバが示されているが、プロデューササーバ400の数はこれに限るものではなく、1台、あるいは3台以上であってよい。
【0025】
ブローカサーバ100は、演算処理を行うCPU101、CPU101により実行されるプログラムや各種処理に使われるデータを格納するためのメモリ102、ネットワーク106を介して他のサーバと通信を行うための通信インタフェース103、及びストレージ装置105との間でデータの入出力を行うI/Oインタフェース104を備える。ブローカサーバ200もブローカサーバ100と同様に、CPU201、メモリ202、通信インタフェース203、及びI/Oインタフェース204を有して構成される。プロデューササーバ400から受信し、コンシューマサーバ500に配信されるメッセージは、メモリ102、202に確保されたバッファに格納される。
【0026】
本実施形態において、ブローカサーバ100、200は、それぞれに割り当てられたコンシューマサーバ500に対して、プロデューササーバ400から送られてくるメッセージを配信する。ブローカサーバ100に割り当てられたコンシューマサーバ500の中で、メッセージの配信が遅延する低速コンシューマサーバが発生した場合、そのようなサーバへのメッセージの配信は、ブローカサーバ200に切り替えられる。つまり、ブローカサーバ100は、正常なコンシューマサーバへの配信を担当するブローカサーバ(正常配信用ブローカサーバ)として、また、ブローカサーバ200は、低速コンシューマサーバへの配信を担当する専用ブローカサーバ(低速コンシューマサーバ専用ブローカサーバ)として機能する。
【0027】
通常、プロデューササーバ400により配信されるメッセージは、ブローカサーバ100により受信される。ブローカサーバ100は、プロデューササーバ400よりメッセージを受信すると、ブローカサーバ200に対して受信したメッセージの複製を送りメッセージの二重化を図る。ブローカサーバ100とブローカサーバ200は、上述したように、それぞれに割り当てられたメッセージ配信先となる複数台のコンシューマサーバ500に対してプロデューササーバ400により配信されたメッセージを送付する。このように、各ブローカサーバ100、200は、通常、あらかじめそれぞれに割り当てられたコンシューマサーバ500に対してメッセージを配信する両現用系として動作する。
【0028】
ブローカサーバ100とブローカサーバ200は、互いにネットワークを介して定期的にそれぞれの稼動状態を通知しあってお互いを監視(ハートビート監視)し、他のブローカサーバの障害を検知する機能を持つ。一方のブローカサーバに障害が生じた時には、他方のブローカサーバで障害が発生したブローカサーバの配信処理を引き継ぐことで、障害時においても継続的なメッセージ配信を行う(このような構成をハイアベイラビリティ(HA)構成と呼ぶ)。また、上述のとおり両ブローカサーバ間でメッセージを二重化し、配信が完了するまで保持することで高信頼性が実現される。加えて、メッセージを二重化できる容量には限界があるため、ブローカサーバ100とブローカサーバ200は、メッセージをストレージ装置105上に永続化する機能を備える。
【0029】
正常配信用ブローカサーバ100は、自身が配信を担当しているコンシューマサーバ500へのメッセージの配信遅延を検知する機能(低速コンシューマサーバ検知機能)を持つ。ブローカサーバ100において低速コンシューマサーバが発生した場合、正常配信用ブローカサーバ100は、低速コンシューマサーバへのメッセージの配信元を低速コンシューマサーバ専用ブローカサーバ200に切り替える。低速コンシューマサーバへの配信担当を随時低速コンシューマサーバ専用ブローカサーバ200に切り替えることにより、低速コンシューマサーバへのメッセージの配信処理を低速コンシューマサーバ専用ブローカサーバ200に集約する。
【0030】
一方、低速コンシューマサーバ専用ブローカサーバ200は、自身が配信を担当しているコンシューマサーバ500へのメッセージの配信が正常であることを検知する機能(正常コンシューマサーバ検知機能)を持つ。低速コンシューマサーバ専用ブローカサーバ200に低速コンシューマサーバへのメッセージ配信処理が移るにつれて、低速コンシューマサーバ専用ブローカサーバ200のリソース利用量が増大する(切り替えによる配信負荷の偏り)。これをこのまま放置すると、低速コンシューマサーバ専用ブローカサーバ200が配信を担当する正常コンシューマサーバへのメッセージの配信性能に影響を与えてしまうことになる。そこで、低速コンシューマサーバ専用ブローカサーバ200のリソース利用量が増大した時には、低速コンシューマサーバ専用ブローカサーバ200が正常コンシューマサーバとして検知したコンシューマサーバ500へのメッセージの配信処理を、正常配信用ブローカサーバ100に切り替えることで、上記の配信負荷の偏りの問題を解決する。
【0031】
本実施形態において、ストレージ装置105は、通常、ブローカサーバ200のI/Oインタフェース204に接続され、低速コンシューマサーバ専用ブローカサーバ(ブローカサーバ200)によりメッセージを永続化するために利用される。ブローカサーバ200に障害が発生した場合には、ブローカサーバ100によりメッセージ配信処理が引き継がれるため、ストレージ装置105は、ブローカサーバ100のI/Oインタフェース104に接続され、ブローカサーバ100によりメッセージを永続化するために利用される。
【0032】
図2は、ソフトウェアモジュールを主としたブローカサーバの構成を示す簡略ブロック図である。ここではブローカサーバ100を例に説明するが、ブローカサーバ200も同様に構成される。したがって、特に記載のない場合ブローカサーバ200についてもその説明を流用することができる。
【0033】
本実施形態において、ブローカサーバ100は、データ送受信部111、メッセージ管理部112、配信処理部113、切替処理部114、管理インタフェース115を有する。これらの各処理機能部は、ソフトウェアモジュールの形で提供され、メモリ102上に配置される。これらのプログラムモジュールをCPU101で実行することにより、後述する各種の機能が実現される。
【0034】
メモリ102には、プロデューササーバ400から配信され、コンシューマサーバ500に配信するメッセージを一時的に保持しておくメッセージバッファ116、ブローカサーバ情報を管理するブローカサーバ表610、コンシューマサーバ500の情報とメッセージの配信を担当するブローカサーバの対応関係を管理する配信担当表620、配信メッセージを一意に識別するメッセージIDを取得するために使われるメッセージIDカウンタ630、各ブローカサーバが配信を担当しているコンシューマサーバ500に対するメッセージ配信状況を表す配信状況表640、及び低速あるいは正常コンシューマサーバ検知判定と検知処理開始判定に用いられるしきい値のセットが設定される判定しきい値表710が保持される。ブローカサーバ表610、配信担当表620、メッセージIDカウンタ630、配信状況表640、及び判定しきい値表710は、メッセージの配信処理および配信を担当するブローカサーバの切替処理に用いられる。
【0035】
本実施形態では、配信メッセージに付加するメッセージIDとして、プロデューササーバ400からの配信順に1ずつ大きくなる通番を用いる。このため、メッセージIDカウンタ630には、プロデューササーバから受け取ったメッセージにメッセージIDを付加する都度値がインクリメント(1加算)されるカウント値が保持される。メッセージIDは、メッセージを一意に特定できる識別子であれば良く、本実施形態におけるようなカウンタ値以外の方法により決定してもよい。その他の各表に保持される情報については後述する。
【0036】
ブローカサーバ100は、データ送受信部111を介して、他方のブローカサーバ200、プロデューササーバ400、コンシューマサーバ500とデータ通信を行う。以降記載する処理手順等の説明においては簡単のためにデータ送受信部111の処理を割愛するが、外部との通信は、データ送受信部111を経由して行われるものとする。
【0037】
ブローカサーバ100は、メッセージ配信を実施するためメッセージ管理部112と配信処理部113を有する。ブローカサーバ100では、プロデューササーバ400からのメッセージをメッセージ管理部112において受信する。メッセージ管理部112は、受信したメッセージをメッセージバッファ116に格納する。メッセージバッファ116に格納されたメッセージは、配信処理部113によりコンシューマサーバ500に送信される。また、メッセージ管理部112は、自ブローカサーバが低速コンシューマ専用ブローカサーバとして機能しているときにメッセージバッファ116がバッファ溢れを起こしそうになった場合、メッセージバッファ116に保持されている一部のメッセージをストレージ装置105に移動する永続化処理を実施する。
【0038】
切替処理部114は、ブローカサーバ100の状態を監視して低速コンシューマサーバの発生を検知し、低速コンシューマサーバへのメッセージの配信を担当するブローカサーバを切り替える。ブローカサーバ100とブローカサーバ200の間で配信担当を切り替える場合、ブローカサーバ100の切替処理部114は、ブローカサーバ200の切替処理部との間で通信して切替処理を行う。本実施形態においては、正常配信用ブローカサーバ100の切替処理部114は、メッセージバッファ116が溢れる前に低速コンシューマサーバの発生を検知し、その配信担当をブローカサーバ200に切り替える。このためメッセージバッファ溢れはブローカサーバ200においてのみ発生する。故に、ストレージ装置105へのメッセージ永続化処理はブローカサーバ200のみが実施する。
【0039】
管理インタフェース115は、切替処理部114における切替検知開始と切替対象コンシューマサーバの検知の基準となるパラメータの設定機能を提供する。管理インタフェース115は、GUI(Graphical User Interface)であることが望ましいが、CUI(Character User Interface)であってもよい。また、管理インタフェース115は、一方のブローカサーバのみに提供され、ブローカサーバ間での通信により、他方のブローカサーバに設定を反映するように構成されてもよい。
【0040】
以降、各ブローカサーバの機能部111〜116について、ブローカサーバ100の機能部とブローカサーバ200の機能部とを区別する場合、ブローカサーバ100の機能部については、メッセージ管理部112−1、配信処理部113−1のように添え字“−1”を参照番号に付し、ブローカサーバの機能部については、メッセージ管理部112−2、配信処理部113−2のように添え字“−2”を参照番号に付して区別する。
【0041】
なお、ブローカサーバ表610、配信担当表620、メッセージIDカウンタ630、配信状況表640、及び判定しきい値表710の各テーブルは、それぞれのブローカサーバのメモリ102および202に保持されているが、ブローカサーバ100及び200の双方から共有可能なメモリ領域、あるいは記憶装置を設け、両者でこれらの情報を共用するようにしてもよい。
【0042】
図3は、ブローカサーバ表のテーブル構成を示す概念図である。ブローカサーバ表610は、任意のブローカサーバが他のブローカサーバの情報を確認するために用いられる。このため、ブローカサーバ100は、ブローカサーバ200との間でブローカサーバ表610に設定される情報の同期をとって両者が保持するブローカサーバ表の一貫性を維持している。このような情報の同期については公知の技術を利用することができるため、ここではその説明を省略する。なお、ブローカサーバ間で同期を取ることに代えて、ブローカサーバ100及びブローカサーバ200の双方から参照可能な記憶領域を設け、両者でブローカサーバ管理表610を共有するように構成してもよい。
【0043】
ブローカサーバ表610は、ブローカサーバ名611、IPアドレス612、種別613を含む。ブローカサーバ名611は、メッセージ配信システム10に含まれるブローカサーバ100、200に与えられたサーバ名である。IPアドレス612は、ブローカサーバ名611で特定されるブローカサーバに割り当てられているIPアドレスである。種別613は、ブローカサーバ100、200の配信担当種別を表し、本実施形態においては、「正常配信用」あるいは「低速コンシューマサーバ専用」の2種類がある。ブローカサーバ100、200は、自身の種別を確認することで、配信担当種別に応じた処理を選択することが可能である。また、他のブローカサーバの種別を確認することで、コンシューマサーバ担当の切替先となるブローカサーバを識別することができる。なお、本実施形態では、ブローカサーバ100、200のアドレスとしてIPアドレスを用いているが、メッセージの配信先となるアドレスを特定できる識別子であれば、これ以外の異なる情報を用いてもよい。
【0044】
図4は、配信担当表のテーブル構成を示す概念図である。配信担当表620は、各ブローカサーバ100、200によって管理され、各ブローカサーバ100、200が、自身の配信担当コンシューマサーバ500を確認するために用いられる。また、配信担当外のコンシューマサーバ500からメッセージの配信要求等があった場合に、担当のブローカサーバを特定し、その要求をリダイレクトする際に利用してもよい。そのため配信担当表620は、配信担当外のコンシューマサーバ500に関する情報を含んでもよい。
【0045】
配信担当表620は、コンシューマサーバ名621、IPアドレス622、担当ブローカサーバ名623を含む。コンシューマサーバ名621は、メッセージ配信システム10が配信を受け持っているコンシューマサーバ500のサーバ名である。IPアドレス622は、コンシューマサーバ名621で特定されるコンシューマサーバ500のIPアドレスである。担当ブローカサーバ名623は、各コンシューマサーバ500への配信を担当するブローカサーバ100、200のサーバ名である。担当ブローカサーバ名623は、ブローカサーバ表610におけるブローカサーバ名611に対応したものである。
【0046】
図5は、配信状況表のテーブル構成を示す概念図である。配信状況表640は、コンシューマサーバ名641、配信済メッセージID642、最終配信時刻643、状態644を含み、各ブローカサーバ100、200によって管理される。
【0047】
コンシューマサーバ名641は、各ブローカサーバ100、200が配信を担当するコンシューマサーバ500のサーバ名であり、配信担当表620におけるコンシューマサーバ名621に対応したものである。配信済メッセージID642は、コンシューマサーバ500への配信が完了した末尾メッセージのメッセージIDである。ブローカサーバ100、200は、コンシューマサーバ500からメッセージの受取確認応答を得る度に配信完了済メッセージID642をインクリメントする。
【0048】
ここで、本実施形態におけるメッセージ配信形態について説明する。本実施形態において、各コンシューマサーバ500は、ブローカサーバ100に届いたメッセージをすべて受け取り、各コンシューマサーバ500が受け取るメッセージの配信順序が保たれる。このため、各コンシューマサーバ500へのメッセージ配信処理において、ブローカサーバ100、200は、コンシューマサーバ500からのメッセージの受取確認応答を受け取った後、次のメッセージを配信する。なお、これはメッセージの配信形態の一例であり、本実施形態に示す配信形態以外の方法によりメッセージを配信するようにしてもよい。
【0049】
最終配信時刻643は、ブローカサーバ100、200がコンシューマサーバに対してメッセージ送信を行った時刻を示す。ブローカサーバ100、200は、メッセージ送信の都度、最終配信時刻643を更新する。最終配信時刻643は、コンシューマサーバ500への再送処理を行う基準として用いられる。したがって、後述する再送処理を実施する上での基準となりうる情報であれば、これを最終配信時刻の代わりに用いてもよい。
【0050】
状態644は、コンシューマサーバ500の状態を示す情報である。コンシューマサーバ500の状態には、「正常待機」、「Ack待ち」の2種類がある。ブローカサーバ100、200は、状態644が「正常待機」であるとき、コンシューマサーバ500に次のメッセージを送り、状態644を「Ack待ち」に更新する。また、ブローカサーバ100、200は、コンシューマサーバ500からメッセージの受取確認応答を得たとき、状態644を「正常待機」に更新する。状態644が「Ack待ち」であるとき、ブローカサーバ100、200は、メッセージの再送処理を行う。
【0051】
図6は、判定しきい値表のテーブル構成を示す概念図である。判定しきい値表710は、バッファ利用率しきい値711、メッセージ切替オフセット712、及びリソース利用率しきい値713を含む。リソース利用率しきい値713は副項目として、CPU利用率714、ネットワーク利用率715、メモリ利用率716、及び判定方式717を含む。
【0052】
判定しきい値表710は、ブローカサーバ100における低速コンシューマサーバ検知処理時、あるいはブローカサーバ200における正常コンシューマサーバ検知処理に用いられる。本実施例において、判定しきい値表710の各値は、管理インタフェース115を介してオペレータにより入力された値が設定されるものとするが、配信遅延の状態に応じてこれらしきい値を計算して設定し、あるいは、予め定められた値が設定されていてもよい。
【0053】
バッファ利用率しきい値711は、ブローカサーバ100における低速コンシューマサーバ検知処理の開始判定基準となるメッセージバッファの利用率である。メッセージ切替オフセット712は、各コンシューマサーバ500への配信進度(遅延または正常)を判定する際に基準となるID値に幅を持たせるための値であり、ブローカサーバ100における低速コンシューマサーバ判定と、ブローカサーバ200における正常コンシューマサーバ判定の両方に用いられる。
【0054】
リソース利用率しきい値713は、ブローカサーバ200における負荷増大により、正常コンシューマサーバへのメッセージの配信をブローカサーバ100に切り替える必要があるかどうかを判断するための基準である。ブローカサーバ200のリソース利用率がリソース利用率しきい値713を超えた場合に、正常コンシューマサーバの検知処理を開始する。判定方式717は状態「AND」と「OR」の2種類があり、CPU利用率714、ネットワーク利用率715、メモリ利用率716を用いた判定の論理演算に利用される。
【0055】
図7は、正常時におけるメッセージ配信処理を示すシーケンス図である。
【0056】
処理S801〜S813は、ブローカサーバ100がプロデューササーバ400からメッセージを受信した際のメッセージ登録処理シーケンスである。
【0057】
ブローカサーバ100のメッセージ管理部112−1は、プロデューササーバ400から新規メッセージを受け取ると(S801)、切替処理部114−1に対してメッセージ受取を通知する(S802)。切替処理部114−1は、メッセージ受取通知を得ると、メッセージ管理部112−1に対して通知受取完了を返す(S803)。その後ブローカサーバ100は、低速コンシューマサーバ発生検知処理を開始する(S804)。
【0058】
メッセージ管理部112−1は、切替処理部114−1から通知受取完了通知を受け取った後、受信したメッセージにメッセージIDを付加し(S805)、このメッセージをメッセージバッファ116−1に格納する(S806)。さらに、メッセージ管理部112−1は、ブローカサーバ200のメッセージ管理部112−1に対して受信したメッセージの複製を送付し、これを登録するように要求する。本実施形態ではこれ以降の処理によりメッセージの二重化を実現している(S807)。
【0059】
ブローカサーバ200のメッセージ管理部112−2は、ブローカサーバ100からメッセージ複製登録要求を受けると、切替処理部114−2にメッセージ受取を通知する(S808)。切替処理部114−2は、メッセージ受取通知を得ると、メッセージ管理部112−1に対して通知受取完了を返し(S809)、ブローカサーバ100への切戻対象となる正常コンシューマサーバの検知処理を開始する(S810)。ここで、S807におけるメッセージ複製登録要求処理と、S801における新規メッセージの受取処理は、説明を理解し易くするために、別の処理名としているが、後述するように実質的に同様の処理によって実現される。
【0060】
メッセージ管理部112−2は、切替処理部114−2からの受取完了通知後に、受信したメッセージの複製をメッセージバッファ116−2に格納する(S811)。ここで、ブローカサーバ200は、その役割上、配信が遅延したメッセージを保持しておく必要があり、保持するメッセージ量の増大に伴って、メッセージバッファ116−2の容量が不足する場合がある。このような場合、メッセージ管理部112−2は、メッセージバッファ116−2から溢れたメッセージをストレージ装置105に書き込むメッセージ永続化処理を行う(S812)。
【0061】
以上のメッセージ保持処理が終了した後、ブローカサーバ200のメッセージ管理部112−2は、ブローカサーバ100のメッセージ管理部112−1にメッセージ登録完了を通知してメッセージ登録処理を完了する(S813)。
【0062】
処理S814〜S827は、ブローカサーバ100におけるコンシューマサーバ500へのメッセージ配信処理シーケンスである。
【0063】
ブローカサーバ100の配信処理部113−1は、配信担当表620を参照し、自身が配信を担当するコンシューマサーバ500の情報を取得する(S814)。配信処理部113−1は、情報を取得したコンシューマサーバ500について、配信状況表640からメッセージの配信状況を取得する(S815)。取得した配信状況を基に配信処理部113−1は、メッセージ管理部112−1に対して、メッセージIDを指定してメッセージの取得を要求する(S816)。メッセージ管理部112−1は、メッセージの取得要求を受けると、指定されたメッセージIDを持つメッセージをメッセージバッファ116−1から読み出し(S817)、配信処理部113−2にそのメッセージを送る(S818)。
【0064】
配信処理部113−1は、取得したメッセージをコンシューマサーバ500に対して送信する(S819)。コンシューマサーバ500からメッセージの受取確認応答(Ack)が返ってくると(S820)、配信処理部113−1は、配信状況表640を更新する(S821)。配信処理部113−1は、さらに配信状況表640を参照し、その時点で削除可能なメッセージのメッセージIDを取得し(S822)、メッセージ管理部112−1に取得した削除可能なメッセージのメッセージIDを通知する(S823)。
【0065】
メッセージ管理部112−1は、通知されたメッセージIDのメッセージがメッセージバッファ116−1に残っている場合、バッファ溢れが生じないようそのメッセージを削除するために、ブローカサーバ200のメッセージ管理部112−2に対して、そのメッセージの削除可否を確認する。本実施形態においては、ブローカサーバ200との間でメッセージを二重化し、また、ブローカサーバ200においてメッセージを永続化することで信頼性を保証している。このため、ブローカサーバ100がメッセージを削除する際には、ブローカサーバ200側でもメッセージの配信が終わっているか、あるいはメッセージの永続化が済んでいるか確認する必要がある。
【0066】
ブローカサーバ200のメッセージ管理部112−2は、ブローカサーバ100のメッセージ管理部112−1から確認要求を受けると、自身においてすべてのコンシューマサーバ500へのメッセージの配信が完了しているか、あるいはメッセージがストレージ装置105に永続化されているか調べ、削除の可否を確認する(S825)。メッセージの削除が可能であることが確認できた場合、メッセージ管理部112−2は、ブローカサーバ100のメッセージ管理部112−1にメッセージの削除許可を通知する(S826)。メッセージ管理部112−1は、削除許可通知を受け取った後、メッセージバッファ116−1から該当するメッセージを削除する(S827)。
【0067】
図示していないが、上述した処理のうちS814〜S827の処理は、ブローカサーバ200側でも同様におこなわれ、ブローカサーバ200が担当するコンシューマサーバ500にメッセージが配信される。なお、ブローカサーバ200側でのメッセージの削除処理においては、他のブローカサーバへの削除の確認、即ち、S824〜S826の処理は不要となる。また、永続化が済んでいないメッセージの削除処理は、自身によるメッセージ配信がすべて完了した時点、及びブローカサーバ100からの削除確認要求の受信時点の遅い方を契機に行なわれる。
【0068】
ここでは、ブローカサーバが配信を担当するある1台のコンシューマサーバに対する配信処理を例に説明したが、ブローカサーバは、S814〜S827の配信処理をブローカサーバが担当するすべてのコンシューマサーバに対して繰り返し実施する。また、ここでは、メッセージの登録処理と配信処理を一連の処理として説明しているが、メッセージの配信処理を登録処理とは非同期に行うようにしてもよい。同様に、メッセージ削除処理(S822〜S827)をメッセージ配信処理の中で行っているが、S814〜S821の処理とS822〜S827の処理を分離し、それぞれを互いに独立して実行するようにしてもかまわない。
【0069】
図8は、メッセージ管理部によるメッセージ登録処理の手順を示すフローチャートである。図8に示す処理は、図7のシーケンス図におけるS801〜S813の処理に対応している。
【0070】
メッセージ管理部112は、プロデューササーバ400から配信されたメッセージ、あるいは、他のブローカサーバからメッセージの複製を含む登録要求を受け取ると(S901)、切替処理部114にメッセージを受け取ったことを通知し、通知完了応答があるまで待つ(S902)。
【0071】
メッセージ管理部112は、切替処理部114から通知完了を受け取ると(S903)、受信したメッセージにメッセージIDが付加されているか否かを判定する。受け取ったメッセージがプロデューササーバ400から送られてきた新規メッセージである場合、受け取ったメッセージにメッセージIDは付加されていないため、メッセージ管理部112におけるS904の判定結果は「メッセージIDなし」となる。一方、通常運用時に低速コンシューマサーバ専用ブローカサーバとして動作するブローカサーバ200のように、他のブローカサーバからメッセージの複製を受け取る場合は、受け取ったメッセージには既にメッセージIDが付加されているため、判定結果は「メッセージIDあり」となる(S904)。
【0072】
S904において、「メッセージIDなし」と判定された場合、メッセージ管理部112は、メッセージIDカウンタ630から現在のメッセージIDを取得し(S905)、取得したメッセージIDをメッセージに付加する(S906)。その後、メッセージIDカウンタ630の値を次に受け取る新規メッセージに付加するメッセージIDとするために、1加算する(S907)。
【0073】
S904〜907によりメッセージに付加した後、あるいは、S904において、「メッセージIDなし」と判定された場合、メッセージ管理部112は、受け取ったメッセージをメッセージバッファ116に格納する(S908)。次に、メッセージ管理部112は、自ブローカサーバがストレージ装置105に接続されているか否か判定する。ストレージ装置105は、通常運用時、低速コンシューマサーバ専用ブローカサーバとして動作するブローカサーバ200に接続されており、正常配信用ブローカサーバとして動作するブローカサーバ100は、ストレージ装置105に接続されていない。このため、通常運用時、ブローカサーバ100のメッセージ管理部112−1における判定結果は「NO」となり、S913の処理に移る。一方、通常運用時のブローカサーバ200のメッセージ管理部112−2における判定結果は「YES」となる(S909)。
【0074】
自ブローカサーバにストレージ装置105が接続されている場合、メッセージ管理部112は、メッセージバッファ116の利用率をバッファ内のメッセージ量とバッファの最大サイズの比によって求め、その利用率がしきい値を超えているか否か判定する(S910)。
【0075】
メッセージバッファ116の利用率がしきい値を超えている場合、メッセージ管理部112は、バッファ溢れを防ぐためにメッセージバッファ116に保持されているメッセージを、付加されているメッセージIDの古いものから順に読み出し、ストレージ装置105に格納する(S911)。ストレージ装置に格納されたメッセージは、メッセージバッファ116から削除される(S912)。本実施形態では、S911、S912によるストレージ装置105へのメッセージの移動は、あらかじめ定められた一定量ずつ行われるが、メッセージバッファ116に保持されているすべてのメッセージを移動の対象としてもかまわないし、移動するメッセージ量が可変であってもかまわない。
【0076】
S913で、メッセージ管理部112は、受け取ったメッセージの複製を送付すべきブローカサーバがあるか否かを判定する。メッセージ管理部112は、ブローカサーバ表610を参照し、自ブローカサーバのブローカサーバ名611に対応する種別613が「正常配信用」である場合、他の「低速コンシューマサーバ専用」のブローカサーバにメッセージの複製を送付する必要がある。通常運用時において、ブローカサーバ100は、ブローカサーバ200にメッセージ複製を送る必要があり、ブローカサーバ200は、他のブローカサーバにメッセージの複製を送付する必要がない。したがって、メッセージ管理部112−1における判定結果は「YES」となり、メッセージ管理部112−2における判定結果は「NO」となる。なお、他にブローカサーバが複数台設けられ、第3の実施形態として後述するように通常時にメッセージの配信を行わないブローカサーバが存在する場合、あるいは、メッセージごとに配信先のコンシューマサーバが異なりメッセージ配信先となるコンシューマサーバを担当していないブローカサーバが存在する場合、そのようなブローカサーバへはメッセージの複製を送付する必要はない。このような場合、メッセージ管理部112は、配信担当表620を参照することにより、メッセージの複製の配信が必要なブローカサーバを判別することができる。
【0077】
S913における判定結果が「YES」であった場合、メッセージ管理部112は、メッセージの複製を送付する必要があるブローカサーバのメッセージ管理部112にメッセージの複製を送付する(S914)。その後、メッセージの複製を送付したブローカサーバからのメッセージ登録完了応答を待ち、メッセージ登録完了応答が返ってくるとそれを受け取る(S915)。
【0078】
S913で複製の送付が不要と判定された場合、あるいは、S915でメッセージの複製を送付したブローカサーバからのメッセージ登録完了応答を受信した後、メッセージ管理部112は、S901で受け取ったメッセージの送信元であるプロデューササーバ、あるいは、ブローカサーバにメッセージ登録完了応答を送り、メッセージ登録処理を終える(S916)。
【0079】
ここではプロデューササーバに対する登録完了応答処理と登録処理とを一連の処理として説明したが、これらは必ずしも一連の処理として実施される必要はなく、たとえば、登録応答処理を登録処理とは独立した処理として非同期で行い、複数の登録完了応答をまとめて送るようにしてもよい。
【0080】
図9は、配信処理部によるメッセージ配信処理の流れを示すフローチャートである。図9に示す処理は、図7のシーケンス図におけるS814〜S819の処理 に対応している。
【0081】
配信処理部113は、配信担当表620から配信を担当する1台のコンシューマサーバ500のコンシューマサーバ名を取得する(S1001)。次に、取得したコンシューマサーバ名に該当するコンシューマサーバ500の配信状況を配信担当表640から取得する(S1002)。コンシューマサーバ500の配信状況を取得すると、配信処理部113は、その状態644に設定されている情報が「正常待機」であるか「Ack待ち」であるか判定する(S1003)。
【0082】
S1003において、コンシューマサーバ500が「正常待機」状態であると判定された場合、配信処理部113は、配信済メッセージID642で特定されるメッセージの次のメッセージのメッセージID(配信済メッセージID+1)を指定して、メッセージ管理部にメッセージ取得を要求する(S1004)。メッセージ管理部から要求したメッセージを受け取ると、配信処理部113は、そのメッセージをコンシューマサーバ500に送信する(S1005)。その後、配信処理部113は、メッセージを配信したコンシューマサーバ500に関連する配信状況表640の状態644を「Ack待ち」に、最終配信時刻643をその時点における現在時刻に、それぞれ変更する(S1006)。
【0083】
一方、S1003において、コンシューマサーバ500の状態が「Ack待ち」と判定された場合、配信処理部113は、その時点における現在時刻と最終配信時刻643に設定されている時刻の差を計算して、所定時間以上「Ack待ち」の状態が続いているか否かを判定する(S1007)。「Ack待ち」の状態となってから所定時間が経過していない場合、配信処理部113は、特に処理を行わずそのコンシューマサーバへのメッセージ配信処理を終える。
【0084】
S1007において、「Ack待ち」の状態が所定時間以上続いていると判断された場合、配信処理部113は、再び配信済メッセージID642で特定されるメッセージの次のメッセージを取得し(S1008)、そのメッセージを配信する(S1009)。
【0085】
配信処理部113は、以上の配信処理手順を自ブローカサーバが配信を担当するすべてのコンシューマサーバに対して順次行う。なお、各コンシューマサーバへの配信処理はマルチスレッドなどによって並列に実行してもよい。また、上述したメッセージ配信処理は、すべてのブローカサーバにおいて同様に実施される。
【0086】
図10は、コンシューマサーバからのメッセージ受取確認応答時に、配信処理部により行われる処理の手順を示すフローチャートである。図10に示す処理は、図7のシーケンス図におけるS820〜S823に対応している。
【0087】
配信処理部113は、コンシューマサーバ500から受取確認応答を受信すると(S1101)、受取確認応答を受信したコンシューマサーバ500について、配信状況表640の配信済メッセージID642に設定されているメッセージIDの値をインクリメントし、状態644を「正常待機」に変更する(S1102)。次に、配信処理部113は、配信状況表640からすべてのコンシューマサーバ500への配信が完了したメッセージのメッセージID、すなわち全コンシューマサーバの配信済メッセージID642の最小値を取得し(S1103)、メッセージ管理部112に取得したメッセージIDを削除可能なメッセージIDとして通知する(S1104)。
【0088】
図11は、正常配信用ブローカサーバのメッセージ管理部によるメッセージ削除処理の手順を示すフローチャートである。図11に示す処理は、図7のシーケンス図におけるS823〜S827に対応している。
【0089】
メッセージ管理部112−1は、配信処理部113−1より全コンシューマサーバへの配信が完了した削除可能なメッセージのメッセージIDを受領すると(S1201)、メッセージバッファ116−1に保持されている最古のメッセージに付加されたメッセージIDを取得し、配信処理部113−1から受け取ったメッセージIDと比較し、削除可能なメッセージがバッファに存在するかどうか確認する。メッセージバッファ116−1にある最古メッセージのメッセージIDが配信処理部113−1から受け取ったメッセージID以下の値であれば、削除可能なメッセージがメッセージバッファ116−1に存在する(S1202)。
【0090】
S1202において、削除可能なメッセージがメッセージバッファ116−1に存在すると判断された場合、メッセージ管理部112−1は、低速コンシューマサーバ専用のブローカ200のメッセージ管理部112−2に削除可能メッセージIDを通知して削除可能であることの確認を要求する(S1203)。その後、メッセージ管理部112−1は、ブローカサーバ200のメッセージ管理部112−2から確認結果を受信すると(S1204)、その結果に基づいてメッセージを削除可能であるか否か判定する(S1205)。
【0091】
S1205における判定の結果が「YES」の場合、メッセージ管理部112−1は、メッセージバッファ116−1から、S1201において受領したメッセージIDのメッセージ及び、それよりも古いメッセージ(受領したメッセージIDの値以下のIDを持つメッセージ)を削除する(S1206)。S1202、またはS1205における判定結果が「NO」である場合、メッセージ管理部112−1は、メッセージを削除せずにメッセージ削除処理を終える。
【0092】
S1203で削除可否の確認を要求されたブローカサーバ200のメッセージ管理部112−2では、受信したメッセージIDのメッセージ及びそれよりも古いメッセージについて、該当するメッセージがメッセージバッファ116−2に存在しない(ストレージ装置105への永続化が完了している)、あるいは、ブローカサーバ200が配信を担当するすべてのコンシューマサーバ500への配信が完了しているか確認し、削除可否を応答する。また、メッセージ管理部112−2における他の確認応答の方法として、該当するメッセージがメッセージバッファ116−2に存在する場合に、それらのメッセージをストレージ装置105に移動して永続化した後、削除可能の応答をするようにしてもよい。後者の方法によれば、正常配信用ブローカサーバ100のメッセージバッファ116−1をより有効に利用することができるようになる。
【0093】
図12は、メッセージ管理部によるメッセージ取得処理の手順を示すフローチャートである。
【0094】
メッセージ管理部112は、配信処理部113からメッセージIDの指定を含むメッセージ取得要求を受け取ると(S1301)、メッセージバッファ116から指定されたメッセージIDを持つメッセージを読み出す(S1302)。正常配信用ブローカサーバの場合未配信のコンシューマサーバ500があれば、そのメッセージは必ずメッセージバッファ116に保持されているが、低速コンシューマサーバ専用ブローカサーバでは、永続化によりメッセージがメッセージバッファ116からストレージ装置105に移動されている場合がある。このため、メッセージ管理部112は、S1302において、メッセージの読み出しに成功したか否か判定する(S1303)。
【0095】
S1303において、メッセージバッファ116からメッセージの読み出しに失敗したと判定された場合、メッセージ管理部112は、ストレージ装置105から指定されたメッセージIDを持つメッセージを取得する(S1304)。
【0096】
メッセージ管理部112は、S1304でストレージ装置から読み出したメッセージ、あるいは、S1303の判定の結果が肯定的であった場合には、メッセージバッファ116から読み出したメッセージを配信処理部113に返す(S1305)。
【0097】
図13は、正常配信用ブローカサーバ(ブローカサーバ100)が担当するコンシューマサーバの中に低速コンシューマサーバが発生した場合に行われる、低速コンシューマサーバへのメッセージ配信担当ブローカサーバの切替処理を示すシーケンス図である。
【0098】
ブローカサーバ100の切替処理部114−1が、ブローカサーバ100が配信担当となっているコンシューマサーバ500の中に、少なくとも1台の低速コンシューマサーバが発生したことを検知すると切替処理が開始される(S1401)。低速コンシューマサーバの具体的な検知処理については後述する。切替処理部114−1は、低速コンシューマサーバの発生を検知すると、配信処理部113−1に対して、検知した低速コンシューマサーバを指定して配信担当の切り替えを要求する(S1402)。
【0099】
配信処理部113−1は、切替要求を受けると、配信担当表620を更新し、指定されたコンシューマサーバ名621に対応する担当ブローカサーバ名623をブローカサーバ100のサーバ名からブローカサーバ200のサーバ名に変更する(S1403)。その後、配信処理部113−1は、配信状況表640から当該コンシューマサーバの配信状況を取得して(S1404)、それを切替処理部114−1に返す(S1405)。
【0100】
ここで配信担当表620を更新することにより、低速コンシューマサーバは、ブローカサーバ100の担当外となるので、配信処理部113−1は、当該コンシューマサーバへのメッセージ配信処理は停止する。切替処理の実施中に、当該コンシューマサーバからブローカサーバ100に対して、Ackや再送要求が送られてきた場合、それらAckや再送要求を切替先のブローカサーバ200に対してリダイレクトするか、あるいは、切替先のブローカサーバ200が配信担当となるまでそれらを無視する。
【0101】
次に、切替処理部114−1は、ブローカサーバ200の切替処理部114−2に対して、低速コンシューマサーバへの配信担当ブローカサーバの切り替えを要求する。この際、切替処理部114−1は、配信処理部113−1から受け取った当該コンシューマサーバの配信状況を切替処理部114−2に切替要求と共に送る(S1406)。
【0102】
ブローカサーバ200の切替処理部114−2は、切替要求を受けると、配信処理部113−2に対して低速コンシューマサーバの配信担当を切り替えるよう要求する(S1407)。
【0103】
配信処理部113−2は、切替処理部114−2から切替要求を受けると、配信状況表640にブローカサーバ100から送られてきた配信状況を登録し(S1408)、配信担当表620を更新する(S1409)。以上の処理を行った時点で、低速コンシューマサーバへのメッセージの配信を担当するブローカサーバがブローカサーバ200となり、ブローカサーバ200による当該コンシューマサーバへのメッセージ配信処理が開始される。
【0104】
切替処理部114−2は、以上の処理の後、ブローカサーバ100の切替処理部114−1に対して、切替準備が完了したことを通知する(S1410)。
【0105】
ブローカサーバ100の切替処理部114−1は、切替準備完了の通知を受け取ると、配信処理部113−1に対して当該コンシューマサーバの配信情報を削除するように要求する(S1411)。配信処理部113−1は、配信状況表640から当該コンシューマサーバの配信状況を削除する(S1412)。
【0106】
以上で切替処理は終了する。なお、本実施形態では、切り替えの対象となったコンシューマサーバに対して、配信を担当するブローカサーバが切り替わったことを特に通知をすることなくメッセージ配信処理を再開しているが、切替時点でコンシューマサーバに対しメッセージの配信を担当するブローカサーバが切り替わったことを明示的に通知するようにしてもよい。
【0107】
図14は、正常配信用ブローカサーバの切替処理部による低速コンシューマサーバ検知処理、及び切替処理の手順を示すフローチャートである。図中、S1501〜S1508が低速コンシューマサーバの検知処理、S1509〜1512が切替処理となる。なお、低速コンシューマサーバの検知処理は、図13のS1401に、切替処理は、図13のS1402〜1412の処理にそれぞれ対応する。
【0108】
低速コンシューマサーバの検知・切替処理において、切替処理部114−1は、まず、メッセージバッファ116−1に保持されているメッセージの量を取得する(S1501)。そして、取得したメッセージ量とメッセージバッファ116−1のサイズの比を計算してメッセージバッファ116−1の利用率を得て、判定しきい値表710に設定されているバッファ利用率しきい値711を超えているか否か判定する。メッセージバッファ116−1の利用率が一定量を超え、バッファ溢れの恐れがあるときに、これ以降の処理において低速コンシューマサーバの検知処理が実施される(S1502)。
【0109】
S1502において、メッセージバッファ116−1の利用率がバッファ利用率しきい値711以下であれば、バッファ溢れの恐れがないため、切替処理部114−1は、何もせずに検知・切替処理を終了する。メッセージバッファ116−1の利用率がバッファ利用率しきい値711を超えている場合、切替処理部114−1は、低速コンシューマサーバを検知するため、配信担当表620からブローカサーバ100がメッセージの配信を担当しているすべてのコンシューマサーバ500を含むリストを取得する(S1503)。そして、切替処理部114−1は、取得した担当コンシューマサーバのリストに未処理のコンシューマサーバが残されているか判定する(S1504)。
【0110】
本実施形態では、配信の遅延が発生しているか否かは、コンシューマサーバ500へのメッセージの配信状況に基づいて判定される。具体的には、配信の完了していないメッセージが相対的に多くなっているコンシューマサーバ500が低速コンシューマサーバとして検知される。この判定を行うために、切替処理部114−1は、リストに未処理のコンシューマサーバ500が残っている場合、次のコンシューマサーバ名を取得し(S1505)、配信状況表640からそのコンシューマサーバ500への配信状況を得る(S1506)。
【0111】
切替処理部114−1は、S1506で取得した配信状況から配信済メッセージID642を取り出し、遅延判定基準IDと比較し、メッセージ配信の遅延が発生しているか否か判定する。ここで用いられる遅延判定基準IDとしては、メッセージバッファ116−1に保持されているメッセージの中で最古のメッセージのメッセージIDの値に、しきい値表710のメッセージ切替オフセット712に設定されている値を加えたものが用いられる。配信済メッセージID642の値が遅延判定基準ID以下の場合、配信が完了していない滞留メッセージの量が相対的に多くなっており、配信が遅延しているものとして判定される。配信に遅延が発生していなければ、処理はS1504に戻る(S1507)。
【0112】
S1507において、メッセージの配信に遅延が発生していると判定された場合、切替処理部114−1は、そのコンシューマサーバ500を低速コンシューマサーバのリストに加え、S1504の処理に戻る(S1508)。
【0113】
S1504において、担当コンシューマサーバのリストにあるすべてのコンシューマサーバ500の処理を終えている場合、切替処理部114−1は、配信処理部113−1に検知処理において検知した低速コンシューマサーバのリストと共に低速コンシューマサーバへのメッセージの配信切替を要求する。配信処理部113−1は、配信切替要求に応答して、リスト中の低速コンシューマサーバに対応する配信状況を取得し、切替処理部114−1に返す。これにより、切替処理部114−1は、低速コンシューマサーバの配信状況を取得する(S1509)。
【0114】
次に、切替処理部114−1は、ブローカサーバ200の切替処理部114−2に対して、低速コンシューマサーバの配信状況を含む配信切替要求を送信し、切替準備完了を待つ(S1510)。切替処理部114−1は、切替処理部114−2から切替準備完了通知を受け取ると(S1511)、配信処理部113−1に対して、配信状況表640から低速コンシューマサーバに関する配信状況を削除するように要求し、配信処理部113−1により該当する情報が配信状況表640から削除される(S1512)。
【0115】
本実施形態においては、メッセージバッファ116への滞留メッセージ数とその配信状態に応じて、低速コンシューマサーバを検知しているが、他の基準で低速コンシューマサーバを検知してもよい。例えば、滞留しているメッセージの総サイズ、ネットワーク状態、応答時間、他のリソースの利用率、あるいはこれらの組み合わせによって低速コンシューマサーバを検出するようにしてもよい。
【0116】
図15は、切替処理時に、正常配信用ブローカサーバの配信処理部により実施される配信担当解除処理の手順を示すフローチャートである。図15に示す処理は、図13のシーケンス図におけるS1403〜S1404およびS1412 に対応している。
【0117】
配信処理部113−1は、切替処理部114−1から低速コンシューマサーバの配信切替要求を受け取ると(S1601)、配信担当表620上で、配信切替要求と共に受け取った切り替えの対象となる低速コンシューマサーバのサーバ名621に対応する担当ブローカサーバ名623を切替先のブローカサーバ名(ここではブローカサーバ200のサーバ名)に変更し、自ブローカサーバの担当配信先から除外する(S1602)。配信処理部113−1は、その後、配信状況表640から当該低速コンシューマサーバの配信状況を取得して切替処理部114−1に渡す(S1603)。
【0118】
ブローカサーバ200側の切替準備が完了し、切替処理部114−1から配信状況の削除要求を受けると(S1604)、配信処理部113−1は、配信状況表640から当該する低速コンシューマサーバの配信状況を削除する(S1605)。
【0119】
図16は、切替処理時に、低速コンシューマサーバ専用ブローカサーバの配信処理部により実施される配信担当登録処理の手順を示すフローチャートである。図16に示す処理は図13のシーケンス図におけるS1408〜S1409の処理に対応している。
【0120】
配信処理部113−2は、切替処理部114−2から切替対象となる低速コンシューマサーバの配信状況を含む低速コンシューマサーバの配信切替要求を受け取ると(S1701)、配信状況表640に受け取った低速コンシューマサーバの配信状況を登録する(S1702)。その後、配信担当表620上の当該低速コンシューマサーバのサーバ名621に対応する担当ブローカサーバ名623を自ブローカサーバ名(ここではブローカサーバ200のサーバ名)に変更し、当該低速コンシューマサーバを自身の担当配信先に加える(S1703)。その後、配信処理部113−2は、切替処理部114−2に対して切替準備が完了した旨を通知する(S1704)。
【0121】
図17は、低速コンシューマサーバ専用ブローカサーバの負荷増大時に行われる正常コンシューマサーバの配信担当ブローカサーバの切替処理を示すシーケンス図である。
【0122】
図17に示す処理は、ブローカサーバ200の負荷が大きくなり、切替処理部114−2により、ブローカサーバ200が配信担当となっているコンシューマサーバ500の中で正常配信用ブローカサーバへの切り替えが可能な正常コンシューマサーバが検知されることにより開始される。正常コンシューマサーバの具体的な検知処理については後述する(S1801)。切替処理部114−2は、正常コンシューマサーバを検知すると、配信処理部113−2に対して、検知した正常コンシューマサーバを指定して配信担当の切り替えを要求する(S1802)。以降、低速コンシューマサーバを担当するブローカサーバの切替処理におけるS1403〜S1406と同様、切替処理部114−2により、配信担当表620を更新し(S1803)、配信担当を切り替えるコンシューマサーバへの配信状況を取得して(S1804)、それを切替処理部114−2に渡す(S1805)。切替処理部114−2は、受け取った配信状況を含む切替要求をブローカサーバ100の切替処理部114−1に送る(S1806)。
【0123】
ブローカサーバ100の切替処理部114−1は、切替要求を受けると、配信処理部112−1に配信担当の切り替えを要求する(S1807)。その後、S1408〜S1410と同様に、配信処理部113−1により配信状況表640に配信状況を登録(S1808)、配信担当表620を更新して(S1809)、切替処理部112−1により切替準備完了をブローカサーバ200に返す(S1810)。
【0124】
その後、ブローカサーバ200では、S1411〜S1412と同様に、切替処理部114−2から配信処理部113−2に配信状況の削除要求がなされ(S1811)、配信処理部113−2により該当するコンシューマサーバについて、配信状況表640から配信状況を削除する(S1812)。
【0125】
図18は、低速コンシューマ専用ブローカサーバの切替処理部による正常コンシューマサーバ検知処理と配信担当の切替処理の手順を示すフローチャートである。
【0126】
低速コンシューマサーバへのメッセージの配信を担当するブローカサーバを正常配信用ブローカサーバ100から低速コンシューマサーバ専用ブローカサーバ200に切り替えていくに伴い、ブローカサーバ200が配信を担当するコンシューマサーバの数が増加し、リソース利用量が増大しいく。リソース利用量の増大は、ブローカサーバ200の配信処理の性能劣化を引き起こす要因となる。そこで、本実施形態では、ブローカサーバ200のリソース利用量が一定量を超えた場合に、ブローカサーバ200が担当している正常コンシューマサーバへのメッセージ配信を正常配信用ブローカサーバ100から行うように切り替えることで、配信負荷の偏りを防ぐ。
【0127】
正常コンシューマサーバの検知・切替処理において、切替処理部114−2は、ブローカサーバ200のリソース利用率を算出する(S1901)。切替処理部114−2は、算出したリソース利用率と判定しきい値表710のリソース利用率しきい値713に設定されたしきい値とを比較し、リソース利用率がリソース利用率しきい値713を超えているか否か判定する。判定結果が「NO」であれば、切替処理部114−2は、切替処理を開始せずに処理を終了する(S1902)。
【0128】
S1902において、リソース利用率がリソース利用率しきい値713を超えていた場合、正常コンシューマサーバの検知処理(S1903〜S1908)が実施される。以降、正常コンシューマサーバの検知処理、及び切替処理では、図14に示した低速コンシューマサーバの検知処理(S1503〜S1912)とほぼ同様の処理が実施される。図18では、図14における処理と同じ処理については、同じ参照番号が用いられている。以下、図14と相違する部分を主に説明する。
【0129】
正常コンシューマサーバの検知処理では、S1503で取得された担当コンシューマサーバのリストのコンシューマサーバについて、S1505、S1506で得られた配信状況に基づいて滞留メッセージ数が所定数以下となっている場合に正常コンシューマサーバと判定する。具体的には、配信状況として得られた配信済メッセージID642と正常判定基準IDとを比較し、配信済メッセージID642が正常判定基準ID以降のものである場合に、正常コンシューマサーバとして判定する。本実施形態では、正常判定基準IDとして、メッセージバッファ116−2に保持されたメッセージの中で最新のメッセージのメッセージIDの値から、しきい値表710のメッセージ切替オフセット712に設定されている値を引いたものが用いられる。比較の結果正常コンシューマサーバでないと判定された場合、処理はS1504に戻る(S1907)。
【0130】
S1907において、検知対象のコンシューマサーバが正常コンシューマサーバであると判定された場合、切替処理部114−2は、そのコンシューマサーバ500を定常コンシューマサーバのリストに加え、S1504の処理に戻る(S1908)。
【0131】
S1504で担当コンシューマサーバのリストにあるすべてのコンシューマサーバ500の処理を終えていると判断された後、S1509と同様にして正常コンシューマサーバの配信状況を取得し(S1909)、S1510、1511において、ブローカサーバ100との間で配信切替の要求、切替完了通知の受信が行われる。その後、切替処理部114−2は、配信処理部113−2に対して、配信状況表640から正常コンシューマサーバに関する配信状況を削除するように要求し、配信処理部113−1により該当する情報が配信状況表640から削除される(S1512)。
【0132】
本実施形態では、S1507において用いられる遅延判定基準IDの算出と、S1907において用いられる正常判定基準IDの算出に同じメッセージ切替オフセットを用いているが、両者を異なるオフセット値を用いて算出するようにしてもかまわない。
【0133】
図17に示す切替処理においてブローカサーバ200の配信処理部113−2により実施される配信担当解除処理、及びブローカサーバ100の配信処理部113−1により実施される配信担当登録処理は、それぞれ、図15、図16に示すフローチャートに従って実施される。ただし、この場合、ブローカサーバ100とブローカサーバ200の関係が、図14に示す切替処理と入れ替わっていることに留意されたい。
【0134】
本実施形態では、低速コンシューマサーバ、及び正常コンシューマサーバの検知において、検知処理を実施するブローカサーバとそれによってメッセージが配信されるコンシューマサーバの状態を基準として判定が行われる。メッセージ配信システム全体の処理を最適化するという観点から、自ブローカサーバの状態に加えて、切替先のブローカサーバの状態も判定基準として用いるようにしてもよい。さらに、コンシューマサーバの状態の判定にはブローカサーバ側が管理する配信状況を基にしているが、より高精度に把握するためにコンシューマサーバ内部のリソース利用率等コンシューマサーバ側が有する状態を用いても良い。
【0135】
また、上述した実施形態では、自ブローカサーバにおいて配信を担当するコンシューマサーバの切り替えを行うか否か判断しているがこれは任意のサーバが実施可能である。例えば、切替先のブローカサーバが自身のリソース利用率に余裕があるときに、低速コンシューマサーバの検知および切替元のブローカサーバに対する切替要求をしても良い。さらにブローカサーバ間で連携して担当するブローカサーバの切り替えを行っているが、これも任意のサーバ上で行うことができ、例えば、各ブローカサーバの状況を監視する管理サーバを設け、管理サーバ上の配信状況監視プログラム等のメッセージ配信システム監視部がブローカサーバ間で担当するコンシューマサーバを切り替えるようにしてもよい。この場合、例えば、管理サーバによって各ブローカサーバにおけるメッセージの配信状況、バッファ内のメッセージ滞留数や滞留量、あるいは、各ブローカサーバの処理負荷の状況等を収集する。そして、収集した情報に基づいて、管理サーバにより上述した実施形態と同様の手法で低速コンシューマサーバや正常コンシューマサーバを検知する。低速コンシューマサーバ、あるいは、正常コンシューマサーバが検知された場合、管理サーバにより、そのコンシューマサーバへのメッセージ配信を担当させる他のブローカサーバを決定してそれらブローカサーバ間で、そのコンシューマサーバへメッセージ配信を行うブローカサーバを切り替える。
【0136】
さらに、正常コンシューマサーバと低速コンシューマサーバの切替処理は別々に行われているが、低速コンシューマサーバを検知しその配信担当を移す際に、切替先のブローカサーバからその代わりとなる正常コンシューマサーバを受け取るといったように、ブローカサーバ間で担当するコンシューマサーバを交換するようにしてもかまわない。
【0137】
図19は、いずれかのブローカサーバに障害が生じた際に実施されるフェールオーバ処理の手順を示したフローチャートである。本実施形態において、一方のブローカサーバに障害が生じた際には、他方のブローカサーバがその配信業務を引き継ぎ、フェールオーバする。正常配信用ブローカサーバであるブローカサーバ100に障害が発生した場合と、低速コンシューマサーバ専用ブローカサーバであるブローカサーバ200に障害が発生した場合とでは、ストレージ装置105が接続されているか否かによる処理の相違がある。
【0138】
通常運用時、ブローカサーバ100及びブローカサーバ200の切替処理部114は、ハートビートにより、相手方のブローカサーバが正常に稼働しているか否か相互に診断を行っている。切替処理部114が、ブローカサーバ200からのハートビートが途切れたことを検出し、他方のブローカサーバに障害が発生したことを検知すると、フェールオーバ処理が開始される(S2001)。
【0139】
切替処理部114は、まず配信処理部113にフェールオーバ要求を出す(S2002)。配信処理部113は、フェールオーバ要求を受けると(S2003)、メッセージ管理部112−1に対し、さらに、フェールオーバ要求を伝える(S2004)。
【0140】
メッセージ管理部112は、フェールオーバ要求を受けると(S2005)、自ブローカサーバにストレージ装置105が接続されているか否か判別する。本実施形態では、正常配信用ブローカサーバ100にはストレージ装置105が接続されておらず、低速コンシューマサーバ専用ブローカサーバとして機能しているブローカサーバ200にストレージ装置が接続されている。このため、ブローカサーバ200に障害が生じた際のメッセージ管理部112−1による判別結果は否定的なもの(「NO」)となり、ブローカサーバ100に障害が生じた際のメッセージ管理部112−2による判別結果は肯定的なもの(「YES」)となる(S2006)。
【0141】
S2006における判別結果が「NO」であれば、メッセージ管理部112は、自ブローカサーバとストレージ装置105との間の接続を確立し、自ブローカサーバにおいてストレージ装置105保持されている未配信先コンシューマサーバが存在するメッセージの利用を可能にするとともに、自ブローカサーバのメッセージバッファ116に保持されているメッセージの永続化を可能にする(S2007)。この後、メッセージ管理部112は、配信処理部113にフェールオーバ完了通知を返す。また、S2006における判別結果が「YES」である場合、メッセージ管理部112は、S2007の処理を実施せずに、フェールオーバ完了通知を配信処理部113に返す(S2008)。
【0142】
配信処理部113は、フェールオーバ完了通知が戻ってくると(S2009)、配信状況表640に障害が発生したブローカサーバが配信を担当していたコンシューマサーバ500へのメッセージの配信状況を新規に追加する(S2010)。配信処理部113は、さらに、配信担当表620の当該コンシューマサーバの配信担当ブローカサーバ名623を自ブローカサーバのサーバ名に変更した後(S2011)、切替処理部114にフェールオーバの完了を通知する(S2012)切替処理部114が配信処理部113からフェールオーバ完了通知を受領すると、フェールオーバ処理は終了する(S2013)。
【0143】
本実施形態では2台のブローカサーバを用いて両現用系のシステムを構成しているが、実行系と待機系ブローカサーバによるHA構成を採ることも可能である。たとえば、HA構成のシステムにおいて、通常状態では実行系のブローカサーバのみでメッセージを配信し、低速コンシューマサーバが発生したときには、待機系ブローカサーバを低速コンシューマサーバ専用ブローカサーバとして動作させ、低速コンシューマサーバへのメッセージ配信のみを待機系ブローカサーバで行うようにすることもできる。この場合、低速コンシューマサーバ専用ブローカサーバは、正常コンシューマサーバへのメッセージ配信を行わないため、正常コンシューマサーバに対してより安定した配信性能を提供可能となる。
【0144】
また、ブローカサーバの台数は、2台である必要はなく、3台以上のブローカサーバを用いてもよい。この場合には、正常配信用ブローカサーバ、低速コンシューマサーバ専用ブローカサーバそれぞれに少なくとも1台のブローカサーバを割り当てればよく、他の負荷分散技術との組み合わせによって、さらに配信性能を向上させることも可能である。
【0145】
さらに、本実施形態では、初期状態において、低速コンシューマサーバ専用ブローカサーバと正常配信用ブローカサーバのそれぞれが担当するコンシューマサーバを任意のサーバとしている。予めコンシューマサーバの仕様等が判っており、低速コンシューマサーバとなる可能性のあるコンシューマサーバを判別できる場合は、予め、そのような低速コンシューマサーバとなる可能性のあるコンシューマサーバへのメッセージ配信を低速コンシューマサーバ専用ブローカサーバに、他のコンシューマサーバへのメッセージ配信を正常配信用ブローカサーバにそれぞれ割り当てておき、システムの運用中は、上述した実施形態と同様、メッセージの配信状況に応じて各ブローカサーバが担当するコンシューマサーバを切り替えるようにしてもよい。具体的には、例えば、コンシューマサーバの仕様から、基準となる所定の性能以下の性能を持つコンシューマサーバを低速コンシューマサーバ専用ブローカサーバに、所定の性能以上の性能を持つコンシューマサーバを正常用ブローカサーバに割り当てる。例えば、コンシューマサーバが備えるプロセッサの処理性能(プロセッサの種類)、クロック周波数、メモリ容量など、一般の計算機における処理性能に影響する要素を考慮してコンシューマサーバの性能を判断することができる。この場合、切替処理の発生回数の減少が見込めるため、ブローカサーバの配信性能はより安定する。
【0146】
さらに、本実施形態では、各コンシューマサーバがブローカサーバにより配信されるすべてのメッセージを受け取ることを想定しているが、各コンシューマサーバに配信するメッセージの種類(トピック)をあらかじめブローカサーバに登録(Subscribe)しておき、各コンシューマサーバに、登録されたトピックに応じたメッセージのみを配信(Publish)するようなPublish/Subscribe型の配信形態をとってもかまわない。この場合、配信担当の切替をトピック毎に実施するようにしてもよい。
【0147】
以上説明した実施形態によれば、低速コンシューマサーバへのメッセージ配信、コンシューマサーバへの配信が完了していないメッセージの永続化処理が、低速コンシューマサーバ専用ブローカサーバに集約される。これにより、正常コンシューマサーバへのメッセージ配信処理に対する永続化処理やメッセージの再送処理による処理負荷の増大の影響を低減することができる。また、低速なコンシューマサーバにメッセージの配信を行わないブローカサーバにおけるメッセージ配信の処理負荷の見積もりが容易となり、安価な計算機でも性能要件を満たすことが可能になる等、コスト面での効果もある。
【0148】
また、すべてのブローカサーバがストレージ装置を持つ必要はなく、メッセージの永続化が複数のブローカサーバで重複して行われることもなくなるため、ストレージ装置の容量を有効に活用することができる。これにより、ストレージ装置の構成コストを抑制することが可能となる。また、メッセージをストレージ装置に書き込むブローカサーバの台数が減少するため、複数台のブローカサーバでストレージ装置を共有した場合に生じていた書き込み排他制御を抑止することができる。さらに、ストレージ装置に書き込みをするブローカサーバが1台であれば、書き込み処理をシーケンシャルに行うことが可能である。以上のように、低速コンシューマサーバが発生した際の性能劣化を抑えることができるという効果もある。

<第2の実施形態>
第1の実施形態におけるメッセージ配信システム10は、両現用系のシステム構成であるため、低速コンシューマサーバ専用ブローカサーバ200がメッセージの配信を担当するコンシューマサーバ500には、低速コンシューマサーバと正常コンシューマサーバが混在している。本実施形態では、低速コンシューマサーバ専用ブローカは、正常時には処理を行わず、低速コンシューマサーバが発生している間だけ低速コンシューマサーバへのメッセージの配信を行い、正常コンシューマサーバへのメッセージ配信を行うブローカサーバと低速コンシューマサーバへのメッセージ配信を行うブローカサーバとを分離する。
【0149】
また、第1の実施形態では、2台のブローカサーバによりメッセージを二重化して保持することで高信頼化を図っているが、本実施形態では、メッセージ二重化およびメッセージ複製の常時送付を行わず、ブローカサーバ間で担当するコンシューマサーバの切り替えが発生した時に、メッセージの複製を一括してブローカサーバに送付することで、各ブローカサーバは配信を担当するコンシューマサーバに必要なメッセージを確保する。
【0150】
本実施形態における計算機システムも第1の実施形態と同様、図1に示す構成を有する。以下では、第1の実施形態と相違する部分について主に説明を行い、第1の実施形態と共通する部分については説明を省略する。また、以降の説明において参照される図において、第1の実施形態と共通する部分については、第1の実施形態の説明において参照した図において用いられている参照番号と同一の参照番号が用いられる。
【0151】
図20は、本実施形態において、第1の実施形態における判定しきい値表710に換えて用いられる判定しきい値表の概念図である。
【0152】
判定しきい値表720は、バッファ利用率しきい値721、メッセージ切替オフセット722、検知開始タイマーしきい値723を含む。
バッファ利用率しきい値721、メッセージ切替オフセット722は、バッファ利用率しきい値711、メッセージ切替オフセット712と同様に、低速コンシューマサーバ、あるいは正常コンシューマサーバの検知、及び検知処理の開始タイミングを判定するために用いられる。
【0153】
検知開始タイマーしきい値723は、正常コンシューマサーバの検知処理の開始トリガとなる時間を示すしきい値が設定される。本実施形態では、検知開始タイマーしきい値723に設定された時間間隔で、正常コンシューマサーバの検知処理が実施される。
【0154】
図21は、本実施形態の正常配信用ブローカサーバの切替処理部において、低速コンシューマサーバへのメッセージ配信を担当するブローカサーバの切替処理を示すシーケンス図である。
【0155】
ブローカサーバ100において低速コンシューマサーバが検知されてから切替準備完了が通知されるまでの処理(S1401〜S1410)は、第1の実施形態と同様である。本実施形態では、通常のメッセージ配信処理において、ブローカサーバ100からブローカサーバ200へのメッセージ複製の送付が行われないため、低速コンシューマサーバ専用ブローカサーバ200の切替準備が完了した後で、低速コンシューマサーバへの配信が必要なメッセージの複製がブローカサーバ100からブローカサーバ200に送られる。
【0156】
ブローカサーバ100の切替処理部114−1は、ブローカサーバ200から切替準備完了通知を受け取ると、メッセージ管理部112−1に対して低速コンシューマサーバへの配信が必要なメッセージの複製の取得要求をする(S2211)。メッセージ管理部112−1は、切替対象となる低速コンシューマサーバに未配信であるメッセージをメッセージバッファ116−1から読み出し(S2212)、それらのメッセージを切替処理部114−1に渡す(S2213)。切替処理部114−1は、メッセージ管理部112−1から受け取ったメッセージの複製を、ブローカサーバ200のメッセージ管理部112−1に送付してメッセージの複製の登録を要求する(S2214)。
【0157】
メッセージ管理部112−2は、受信したメッセージの複製をメッセージバッファ116−2に格納する(S2215)。メッセージバッファ116−2がバッファ溢れを起こす恐れがある場合、メッセージ管理部112−2は、メッセージの一部をストレージ装置105に書き込み、メッセージの永続化を行う(S2216)。その後、メッセージ管理部112−2は、ブローカサーバ100の切替処理部114−1に対してメッセージ複製の登録完了を通知する(S2217)。また、メッセージ管理部112−2は、配信処理部113−2に対して低速コンシューマサーバへの配信開始を要求する(S2218)。
【0158】
以降、配信処理部113−2により低速コンシューマサーバへのメッセージ配信が実施される。なお、ブローカサーバ200が低速コンシューマサーバへのメッセージ配信の開始以降、ブローカサーバ200が低速コンシューマサーバにメッセージの配信を行っている間、新たにプロデューササーバ400から配信されるメッセージについては、ブローカサーバ200が受信しているメッセージの複製を配信し終える等、所定のタイミングでブローカサーバ100からブローカサーバ200に一括してメッセージの複製を送信するか、あるいは、第1の実施形態と同様に、逐次、ブローカサーバ100からブローカサーバ200にメッセージの複製を送信するようにすればよい。
【0159】
本実施形態では、処理S2216においてバッファ溢れの恐れがある場合に、一部のメッセージをストレージ装置105に書き込んでいるが、メッセージバッファ116−2に書き込めなかったメッセージのみ、あるいは、メッセージバッファ116−2から溢れたメッセージを含め、メッセージバッファ116−2に保持されている一部あるいはすべてのメッセージをストレージ装置105に書き込むようにしてもよい。
【0160】
図22は、本実施形態において正常配信用ブローカサーバの切替処理部により実施される低速コンシューマサーバの検知処理と配信担当の切替処理の手順を示すフローチャートである。本実施形態における低速コンシューマサーバの検知処理と切替処理は、図14に示した第1の実施形態における低速コンシューマサーバの検知処理、切替処理と同様におこなわれるが、第1の実施形態における切替処理のS1511とS1512との間でメッセージの複製が一括してブローカサーバ200に送信される点で相違している。
【0161】
S1511においてブローカサーバ200の切替処理部114−2から切替準備完了通知を受け取った切替処理部114−1は、配信状況表640から切替対象となるすべての低速コンシューマサーバの配信状況から配信済メッセージID642の最小値を取得する。配信済メッセージID642の最小値は、配信処理部640から取得してもよいし、準備完了通知を受け取る際にブローカサーバ200から取得してもよい(S2312)。次に、切替処理部114−1は、配信済メッセージIDの最小値からメッセージIDカウンタ613に保持されているID値までのメッセージの複製を一括してメッセージ管理部112−1から取得する(S2313)。切替処理部114−1は、取得したメッセージの複製をブローカサーバ200のメッセージ管理部112−2にむけて一括して送信し(S2314)、メッセージ管理部112−2から登録処理の完了が通知されるまで待つ(S2315)。メッセージ管理部112−2から登録処理の完了を受信した後、切替処理部114−1は、第1の実施形態と同様に削除処理を行う(S1512)。
【0162】
図23は、正常コンシューマサーバの配信担当ブローカサーバの切替処理を示すシーケンス図である。
【0163】
本実施形態では、ブローカサーバ100とブローカサーバ200との間でメッセージの二重化が行われておらず、メッセージバッファ116に保持されるメッセージが同期していない。このため、ブローカサーバ200が切替要求を発行しようとする時点でブローカサーバ100のメッセージバッファ116−1に切替対象となるコンシューマサーバへの未配信メッセージが残されていない可能性がある。そこで、本実施形態では、切替処理部114−2が切替要求を発行する前に、ブローカサーバ間で配信担当を引き継ぐ条件を決定し、その条件が満たされた後、配信担当の切り替えが行われる。
【0164】
具体的に、ブローカサーバ200の切替処理部114−2は、正常コンシューマサーバの発生を検知すると(S1801)、ブローカサーバ100の切替処理部114−1に対して、検出した正常コンシューマサーバへのメッセージの配信担当の切替準備を要求する(S2402)。
【0165】
切替処理部114−1は、メッセージ管理部112−1に対して、配信の引き継ぎを行う契機となるメッセージ(配信切替メッセージ)を決定するように要求する。本実施形態では、配信切替メッセージとして切替準備の要求時点における最新メッセージ(メッセージIDが「メッセージIDカウンタ630に設定されている値−1」であるメッセージ)を用いるものとする(S2403)。メッセージ管理部112−1は、要求を受けるとメッセージIDカウンタ630からID値を取り出し、配信切替メッセージとするメッセージのメッセージID(配信切替メッセージID)を取得し(S2404)、それを切替処理部114−1に返す。以降、メッセージ管理部112−1は、配信切替メッセージID以降のメッセージを、切替対象となる正常コンシューマサーバへの配信が完了するまでメッセージバッファ116−1から削除せずに保持する(S2405)。切替処理部114−1は、メッセージ管理部112-1から受け取った配信切替メッセージIDをブローカサーバ200の切替処理部114−2に通知する(S2406)。
【0166】
切替処理部114−2は、配信切替メッセージIDを受けた後、切替対象となるコンシューマサーバに対して、配信切替メッセージIDを持つメッセージの配信が完了するまで配信処理を継続する。この間のメッセージの配信処理については、図25を参照しつつ後述する。
【0167】
ブローカサーバ100から受け取った配信切替メッセージIDのメッセージまでメッセージの配信が完了すると、切替処理部114−2は、配信処理部113−2から配信完了通知を受け取り、配信担当の切替を開始する(S2407)。以降、S2048〜S2418は第1の実施形態と同様にして切替処理が行われる(S1802〜S1812)。
【0168】
本実施形態では、2段階で切替処理が実施されるため、切替準備要求時から実際に切り替えが行われるまでに配信状況が変化する場合が想定される。例えば、ブローカサーバ100のメッセージバッファ116−1が溢れて配信切替メッセージが保持できなくなったり、切替対象となる正常コンシューマサーバに再び遅延が生じたりすることが考えられ、このような場合、切替処理を継続することが困難となる。このため、本実施形態では、これらの状況が生じたときには、切替処理をキャンセルする。
【0169】
図24は、切替処理部による正常コンシューマサーバ検知処理と配信担当の切替処理の手順を示すフローチャートである。
【0170】
ブローカサーバ200の切替処理部114−2は、検知開始タイマーのカウント値が検知開始タイマーしきい値723に設定された値を超えているか否か判定する(S2501)。検知開始タイマーのカウント値が検知開始タイマーしきい値に設定された値を超えている場合、第1の実施形態と同様の処理により正常コンシューマサーバの検知処理を実施する(S1903〜S1908)。
【0171】
S1904ですべての担当コンシューマサーバについて正常コンシューマサーバとなっているか否かの判定が行われたと判定された後、切替処理部114−2は、ブローカサーバ100の切替処理部114−1に対して、正常コンシューマサーバのリストを含む配信切替準備要求を送信する(S2508)。切替処理部114−2は、ブローカサーバ100の切替処理部114−1から配信切替メッセージIDを受け取ると(S2509)、切替中であることを示すフラグ(切替中フラグ)を「ON」にし、切替対象となっている正常コンシューマサーバに対する配信切替メッセージIDまでのメッセージ配信処理を配信処理部113−2に依頼する。なお、切替中フラグの初期状態は「OFF」である。また、切替中フラグが「ON」である間、切替処理部114−2は、新たな正常コンシューマサーバの検知処理を実施しない(S2510)。
【0172】
配信切替メッセージまでメッセージの配信が完了すると、切替処理部114−2は、配信処理部113−2から配信完了通知を受ける(S2511)。配信完了通知を受け取った以降、切替処理部114−2は、第1の実施形態と同様の処理により、正常ブローカサーバへのメッセージ配信担当ブローカサーバの切り替えを実施する(S1909〜S1912)。
【0173】
配信担当ブローカサーバの切替処理が完了した後、切替処理部114−2は、再び正常コンシューマサーバ検知処理を行うため、検知開始タイマーをリセットし(S2516)、切替中フラグを「OFF」にして切替処理を終了する(S2517)。
【0174】
図25は、配信処理部によるメッセージ配信処理を示すフローチャートである。
【0175】
配信処理部113−2は、切替中フラグを参照して、切替処理中であるか否か確認する。切替中フラグが「OFF」のときには、図9に示す第1の実施形態における配信処理と同様にして通常の配信処理を行う(S2601)。
【0176】
切替中フラグが「ON」の場合、配信処理部113−2は、配信切替メッセージまでメッセージの配信が完了したか否かを判定する。具体的には、配信状況表640から切替要求中のすべての正常コンシューマサーバについて配信状況を取得し(S2603)、それらすべてのコンシューマサーバについて、配信済メッセージID644が配信切替メッセージID−1と等しいか否かを判定する(S2604)。
【0177】
S2604での判定結果が「YES」の場合、配信処理部113−2は、切替処理部114−2に対して配信切替メッセージまでメッセージの配信が完了したことを通知する(S2605)。
【0178】
S2604での判定結果が「NO」の場合は、配信切替メッセージまでメッセージの配信が完了していない正常コンシューマサーバが存在するため、メッセージ配信処理が行われる。メッセージ配信処理は、図10に示す第1の実施形態とほぼ同様に行われるが、切替中フラグが「ON」の場合、切替要求中の正常コンシューマサーバに対しては配信切替メッセージ以降のメッセージの配信は不要となる。このため、S1003においてメッセージの配信先のコンシューマサーバが「正常待機」状態と判定されたとき、配信処理部113−2は、そのコンシューマサーバに対して配信切替メッセージまでメッセージの配信が完了しているか否か、すなわち、配信済メッセージID644が配信切替メッセージID−1に等しいか否か判定する。配信切替メッセージまでメッセージの配信が完了している場合、配信処理部113−2は、そのコンシューマサーバに対してメッセージ配信を行わず、そうでなければ、S1004〜S1006の処理を実施し、メッセージを配信する(S2609)。
【0179】
本実施形態では、以上説明した処理以外の処理については、第1の実施形態と同様の処理が行われる。なお、上述した正常コンシューマサーバの配信担当の切替処理では、配信切替メッセージを決め、そのメッセージまでのメッセージ配信をブローカサーバ200で行った後切り替えが行われるが、低速コンシューマサーバの配信担当切替処理のように、正常コンシューマサーバの検知後に、必要なメッセージをブローカサーバ200からブローカサーバ100に転送して切替処理を実施するようにすることもできる。
【0180】
以上説明した第2の実施形態によっても第1の実施形態と同様の効果を得ることができる。さらに、第2の実施形態では、第1の実施形態のようにブローカサーバ間でメッセージ多重化していないため、メッセージバッファの内容は両ブローカサーバ間で同一とする必要はない。このため、正常配信用ブローカサーバは、自身が担当するコンシューマサーバへの配信完了したメッセージを配信完了後すぐに削除することでメッセージバッファを有効に利用することが可能となる。これにより、システム全体としてメッセージ配信に必要なメモリ容量を抑制することができ、システムをより安価に構築することができる。また正常配信用ブローカサーバを2台以上設けた場合、それぞれが持つメッセージバッファ領域を有効に利用できるため、処理の負荷分散がより有効に働くことが期待できる。
【0181】
また、本実施形態では、ブローカサーバ間で、必要な時に必要なメッセージの複製だけを一括して送ればよいため、メッセージの複製の転送に要するデータ通信量と処理時間を削減することができる。
【0182】
<第3の実施形態>
第1の実施形態において、低速コンシューマサーバの数が増大すると、低速コンシューマサーバ専用ブローカサーバ200にかかる負荷が増大し、低速コンシューマサーバへの配信処理がさらに遅れてしまうことが考えられる。低速コンシューマサーバに対しては元々配信が遅延しているため、このような問題が生じる可能性は少ないが、低速コンシューマサーバ専用ブローカサーバ200にかかる負荷の増大が、正常コンシューマサーバに復帰する直前の低速コンシューマサーバに対し配信遅延を発生させてしまう可能性がある。
【0183】
本実施形態では、第1の実施形態において、さらに低速コンシューマサーバ専用ブローカサーバを追加することで、上記問題に対応できるようにしている。以下では、第2の実施形態と同様、第1の実施形態と相違する部分について主に説明を行ない、第1の実施形態と共通する部分については説明を省略する。また、以降の説明において参照される図において、第1の実施形態と共通する部分については、第1の実施形態の説明において参照した図において用いられている参照番号と同一の参照番号が用いられる。
【0184】
図26は第3の実施形態にける計算機システムのハードウェア構成を示す簡略化されたブロック図である。
【0185】
本実施形態の計算機システムは、メッセージ配信システム20を構成するブローカサーバに新たに低速コンシューマ専用ブローカサーバとして機能するブローカサーバ300を有することと、ストレージ装置105に換えて、ブローカサーバ200、300によって共有される共有ストレージ装置800を有する点を除くと、第1の実施形態の計算機システムと同様に構成される。
【0186】
ブローカサーバ300は、ブローカサーバ100、ブローカサーバ200と同様にCPU301、メモリ302、通信インタフェース303、及びI/Oインタフェース304を有して構成される。ブローカサーバ300は、図2に示すブローカサーバ100と同様のプログラムモジュール、テーブル類を有する。以下、特にブローカサーバ100、ブローカサーバ200が有するプログラムモジュール、テーブル類と区別する必要がある場合には、メッセージ管理部112−3、配信処理部113−3のように、参照番号に添え字“−3”を付加して引用する。
【0187】
本実施形態において、ブローカサーバ300は、通常時においてはメッセージの配信処理を行わない。ブローカサーバ200が担当する低速コンシューマサーバの数が増加し、ブローカサーバ200の負荷が増大した場合、ブローカサーバ300は、一部の低速コンシューマサーバの配信担当をブローカサーバ200から引き継ぎ、メッセージ配信処理を行い、ブローカサーバ200との間で、低速コンシューマサーバへのメッセージ配信処理の負荷を分散させる。
【0188】
共有ストレージ装置800は、通常時、ブローカサーバ200、及びブローカサーバ300の双方に接続され、両者から共有して利用される。また、ブローカサーバ200、またはブローカサーバ300に障害が生じた際、ブローカサーバ100から共有ストレージ800に格納されているデータにアクセスできるよう、ブローカサーバ100とも接続可能となるように構成される。なお、本実施形態では、ストレージ装置800における排他制御によるアクセス性能の低下を抑止するため、ブローカサーバ200だけがストレージ装置へのメッセージ書き込みを行い、ブローカサーバ300はストレージ装置からのメッセージ読み出しのみを行うように処理が行われる。
【0189】
本実施形態では、通常時のメッセージ配信、ブローカサーバ100とブローカサーバ200との間の配信担当の切替処理など、特に断りのない限りにおいて第1の実施形態と同様に行われる。したがって、ここでは、ブローカサーバ200とブローカサーバ300との間で行われる配信担当の切替処理についてのみ説明を行う。以降では第3の実施形態におけるコンシューマサーバ配信担当切替処理について説明する。ブローカサーバ100とブローカサーバ200の間でのコンシューマサーバ配信担当切替処理については、第1の実施形態に記載したとおりである。その他の処理についても、特に断りがない限りは第1の実施形態と同一である。
【0190】
図27は、本実施形態において切替あるいは切戻コンシューマサーバ検知処理開始判定に用いられるしきい値のセットを示す判定しきい値表の概念図である。判定しきい値表730は、ブローカサーバ200とブローカサーバ300により、両ブローカサーバ間の配信担当切替処理に用いられる。なお、ブローカサーバ100とブローカサーバ200との間の配信担当切替処理では、本実施形態においても、第1の実施形態において説明した判定しきい値表710が用いられる。
【0191】
判定しきい値表730は、切替用リソース利用率しきい値731、切戻用リソース利用率しきい値736を含む。両リソース利用率しきい値は第1の実施形態における判定しきい値表710のリソース利用率しきい値713と同様、CPU利用率732、737、ネットワーク利用率733、738、メモリ利用率734、739、及び判定方式735、740をその副項目として有する。判定しきい値表730の各値は、他のテーブルと同様に、ブローカサーバの管理インタフェースを介して、その設定値を変更することが可能である。
【0192】
切替用リソース利用率しきい値731は、ブローカサーバ200における切替コンシューマサーバ検知処理開始の基準として用いられる。ブローカサーバ200のリソース利用率がしきい値731を上回るとき、負荷が増大しているとみなされ、切替対象となるコンシューマサーバを決定し、そのコンシューマサーバへのメッセージ配信処理がブローカサーバ300に引き継がれる。
【0193】
切戻用リソース利用率しきい値736は、ブローカサーバ300における切戻コンシューマサーバ検知処理開始の基準として用いられる。ブローカサーバ200のリソース利用率がしきい値736を下回るとき、負荷の増大が解消されたとみなされ、ブローカサーバ300が担当しているコンシューマサーバへのメッセージの配信処理がブローカサーバ200に戻される。
【0194】
図28は、本実施形態における、低速コンシューマサーバの配信担当切替処理、低速コンシューマサーバへのメッセージ配信処理、及びメッセージ削除処理を示すシーケンス図である。
【0195】
ブローカサーバ200からブローカサーバ300への低速コンシューマサーバの配信担当切替処理は、ブローカサーバ200の負荷増大を契機に行われる。この切替処理では、ブローカサーバ200上で、最も配信が遅延しているコンシューマサーバから順に1台ずつ配信担当がブローカサーバ300に引き継がれる。
【0196】
処理S1401〜S1412は、ブローカサーバ200における低速コンシューマサーバ検知の後、低速コンシューマサーバに対するメッセージ配信処理をブローカサーバ200からブローカサーバ300に切り替える切替処理である。この切替処理は、図13に示す第1の実施形態においてブローカサーバ100により実施される処理がブローカサーバ200で、同じく、第1の実施形態においてブローカサーバ200により実施される処理がブローカサーバ300でそれぞれ行われる点を除けば、第1の実施形態の切替処理と同様のものである。
【0197】
処理S814〜S815、S3116〜S3117、及びS818〜S827からなる一連の処理は、ブローカサーバ300における低速コンシューマサーバへのメッセージ配信処理である。S814〜S815、及びS818〜S827で実施される処理は、図7に示した第1の実施形態におけるブローカサーバ100によるメッセージ配信処理(S814〜S827)において対応する参照番号の処理と同一の処理である。ブローカサーバ300により実施されるメッセージ配信処理は、その主体がブローカサーバ300であること、メッセージの読み出し処理(図7におけるS817)が後述するS3116、S3117の処理に置き換えられていること、及び、削除確認のための処理(図7におけるS824〜S826)の処理が実施されないことを除き、図7に示すメッセージ配信処理と同様に行われる。
【0198】
本実施形態においては、配信が最も遅れているコンシューマサーバから順にブローカサーバ300に割り当てられる。このため、ブローカサーバ300のメッセージ管理部112−3は、S3116において、共有ストレージ装置800に格納されている古いメッセージから順にメッセージを取得する。メッセージ取得の際、メッセージ管理部112−3は、共有ストレージ装置800から要求されたメッセージID以降のメッセージを一定量一括して取得し、メッセージバッファ116−3に一定期間保持する。そして、S3117において、メッセージバッファ116−3に保持したメッセージを適宜コンシューマサーバに配信する。メッセージをメッセージバッファ116−3に一時的に保持することで、再送処理や他のコンシューマサーバに同じメッセージを配信する際に、ストレージ装置からのメッセージの読み込みを避けることができる。また、要求されたメッセージID以降のメッセージを一括して取得しておくことで、後で配信するメッセージの先読みと、ストレージ装置からのシーケンシャルな読み出し処理による処理効率の向上を期待できる。
【0199】
処理S3024〜S3035は、ブローカサーバ200におけるメッセージ削除処理である。本実施形態では、複数台のブローカサーバで共有ストレージ装置800に格納されているメッセージを利用するため、メッセージ削除時には、少なくとも共有ストレージ装置800を利用するすべてのブローカサーバにとって、削除対象としているメッセージが不要であることを確認しなければならない。
【0200】
ブローカサーバ200のメッセージ管理部112−2は、あらかじめ定められたタイミングでメッセージ削除処理を開始する。メッセージ削除処理は、所定時間間隔で定期的に行われるか、あるいは、ストレージ装置800の利用状況やブローカサーバ200の負荷状況等、あらかじめ所定の基準を設けておき、その基準に従ったタイミングで開始されてよい(S3024)。
【0201】
メッセージ管理部112−2は、メッセージ削除処理を開始すると、配信処理部112−2に対して削除可能なメッセージのメッセージIDを要求する(S3025)。配信処理部113−2は、この要求を受けると、配信状況表640から配信済メッセージID642の最小値を削除可能メッセージIDとして取得し(S3026)、この削除可能メッセージIDをメッセージ管理部112−2に渡す(S3027)。メッセージ管理部112−2は、削除可能メッセージIDを受け取った後、他のブローカサーバのメッセージ管理部112から同様に削除可能メッセージIDの取得を要求する(S3028)。
【0202】
削除可能メッセージIDの要求を受けた他のブローカサーバのメッセージ処理部112は、ブローカサーバ200のメッセージ管理部112−2と同様、配信処理部113に削除可能メッセージIDを要求し(S3029)、配信処理部113からそれを受け取ると(S3030、S3031)、その削除可能メッセージIDをブローカサーバ200のメッセージ管理部112−2に返す(S2032)。なお、図28ではブローカサーバ300のみについてこの処理を図示しているが、他のブローカサーバが存在する場合は、同様の処理が他のブローカサーバを対象として行われる。
【0203】
ブローカサーバ200の配信処理部113−2は、メッセージ配信システム20内の全ブローカサーバからの削除可能メッセージIDを取得した後、それらのID値の中で最小のID値をシステム全体としての削除可能メッセージIDとして得る(S3034)。メッセージ管理部112−2は、S3034で得た削除可能メッセージID以前のメッセージIDを持つメッセージを共有ストレージ装置800から削除する(S3035)。
【0204】
図29は、切替対象コンシューマサーバの検知処理と配信担当の切替処理の手順を示すフローチャートである。
【0205】
切替処理部114−2は、ブローカサーバ200のリソース利用率を求め(S3101)、リソース利用率が切替用リソース利用率しきい値731を上回るか否か判定する(S3102)。
【0206】
リソース利用率が切替用リソース利用率しきい値731以下である場合は、低速コンシューマサーバの切替処理は行われない。一方、リソース利用率が切替用リソース利用率しきい値731を上回る場合、切替処理部114−2は、配信担当表620から担当コンシューマサーバのリストを取得し(S3103)、配信状況表640から担当する全てのコンシューマサーバについて配信状況を取得する(S3104)。その後、切替処理部114−2は、担当する全てのコンシューマサーバの配信済メッセージID642を確かめ、配信済メッセージID642の中で値が最も小さい配信済メッセージIDを持つコンシューマサーバを切替対象の低速コンシューマサーバとして選定する(S3105)。
【0207】
以降、切替処理部113−2は、選定したコンシューマサーバの配信担当切替処理を実施する。この処理は、図14に示す第1の実施形態におけるブローカサーバ100による配信担当切替処理S1509〜S1512と同様にして行われる。この場合、第1の実施形態におけるブローカサーバ100の処理をブローカサーバ200が、同じく、ブローカサーバ200の処理をブローカサーバ300が実施することに留意されたい。
【0208】
図30は、ブローカサーバ300からブローカサーバ200への配信担当の切戻処理を示すシーケンス図である。本実施形態では、ブローカサーバ200の負荷増大が解消したことを契機に、ブローカサーバ300のコンシューマサーバの切戻処理を行う。切り戻しは、ブローカサーバ300において、最も配信処理が進んでいるコンシューマサーバから順に1台ずつ行われる。
【0209】
ブローカサーバ300の切替処理部114−3は、ブローカサーバ200の切替処理部114−2に対して、リソース利用率を要求する(S3201)。ブローカサーバ200の切替処理部114−2は、切替処理部114−3からの要求に応じて、ブローカサーバ200のリソース利用率を求め(S3202)、ブローカサーバ300の切替処理部114−3に求めたリソース利用率を返す(S3203)。
【0210】
ブローカサーバ300の切替処理部114−3は、受け取ったリソース利用率に基づいて、ブローカサーバ200に対してコンシューマサーバを切り戻すことが可能かどうかを判定し、切り戻すことが可能な場合には切り戻しの対象とすることのできるコンシューマサーバを検知する(S3204)。
【0211】
S3204以降は、図17に示す第1の実施形態における、ブローカサーバ200からブローカサーバ100への正常コンシューマサーバの配信担当切替処理S1802〜S1812と同様に行われる。この場合、第1の実施形態におけるブローカサーバ100の処理をブローカサーバ200が、同じく、ブローカサーバ200の処理をブローカサーバ300が実施し、切り戻しの対象としているコンシューマサーバが実施例1の正常ブローカサーバに対応していることに留意されたい。
【0212】
図31は、切替処理部による切戻コンシューマサーバの検知処理と配信担当の切替処理の手順を示すフローチャートである。
【0213】
切替処理部114−3は、ブローカサーバ200からリソース利用率を取得し(S3101)、ブローカサーバ200から取得したリソース利用率と切戻用リソース利用率しきい値736に設定されているしきい値とを比較し、取得したリソース利用率が切戻用リソース利用率しきい値736に設定されているしきい値を下回るか否か判定する(S3302)。
【0214】
取得したリソース利用率が切戻用リソース利用率しきい値736に設定されているしきい値を上回る場合には、コンシューマサーバの切戻処理は行われない。一方、取得したリソース利用率が切戻用リソース利用率しきい値736に設定されているしきい値を下回る場合、切替処理部114−3は、配信担当表620から担当コンシューマサーバのリストを取得し(S3303)、配信状況表640から担当する全てのコンシューマサーバの配信状況を取得する(S3304)。その後、切替処理部114−3は、担当する全てのコンシューマサーバの配信済メッセージID642を確かめ、配信済メッセージID642の中で値が最も大きい配信済メッセージIDを持つコンシューマサーバを切戻対象のコンシューマサーバとして選定する(S3305)。
【0215】
以降、切替処理部113−3は、選定したコンシューマサーバの配信担当の切戻処理を実施する。この処理は、図18に示す第1の実施形態におけるブローカサーバ100による配信担当切替処理S1909〜S1912と同様にして行われる。この場合、第1の実施形態におけるブローカサーバ100の処理をブローカサーバ200が、同じく、ブローカサーバ200の処理をブローカサーバ300が実施し、切り戻しの対象としているコンシューマサーバが実施例1の正常ブローカサーバに対応していることに留意されたい。
【0216】
以上説明した第3の実施形態によっても第1の実施形態と同様の効果を得ることができる。さらに本実施形態では、低速コンシューマサーバへのメッセージ配信が複数の低速コンシューマサーバ専用ブローカサーバで行われるので、これら複数のブローカサーバ間で低速コンシューマサーバへの配信処理の負荷を分散させることができる。
【0217】
第3の実施形態では、第1の実施形態の計算機システムに低速コンシューマサーバ専用ブローカサーバを追加したものであるが、第2の実施形態の計算機システムにおいて、同様にして低速コンシューマサーバ専用ブローカを追加することもできる。また、第3の実施形態においては、2台の低速コンシューマサーバ専用ブローカサーバを用いているが、低速コンシューマサーバ専用ブローカサーバを3台以上にしてもかまわない。さらに、第3の実施形態では、追加されたブローカサーバを低速コンシューマサーバ専用ブローカサーバの負荷が増大するまでは配信処理を実施させずに起動状態で待機させているが、通常時に配信処理を実施するようにしてもかまわない。例えば、ブローカサーバ200の負荷が小さいうちは、ブローカサーバ300を正常配信用ブローカサーバとして正常コンシューマサーバの配信処理の負荷分散のために稼働させ、ブローカサーバ200の負荷が増加してきたら、ブローカサーバ300を低速コンシューマサーバ専用ブローカとして用いるといったように、ブローカサーバの種別を動的に変更してもよい。
【0218】
以上説明した実施形態によれば、低速コンシューマサーバへのメッセージの配信処理、あるいは、ストレージ装置へのメッセージの永続化の処理を特定のブローカサーバに集約しているので、低速コンシューマサーバの発生時においても、ブローカサーバのリソースを効率的に利用することができ、コンシューマサーバへの安定したメッセージの配信を実現することができる。
【0219】
本発明は、上述した各実施形態によって限定されるものではなく、本発明の主旨から逸脱しない範囲において様々な態様を採りえるものであることは言うまでもない。たとえば、上述した各実施形態では、各コンシューマサーバへ配信すべきメッセージの滞留量を得るために、各コンシューマサーバに配信した最後のメッセージのメッセージIDを利用しているが、各コンシューマサーバへの配信が完了していないメッセージの数を直接的に管理してもよい。具体的には、各コンシューマサーバへのメッセージ配信時にカウンタ値をインクリメントし、確認応答受信時にそのカウンタをデクリメントすることで、滞留メッセージ量を直接的に管理することができる。
【0220】
また、上述した実施形態では、プロデューササーバから配信されるメッセージを1台の通常配信用ブローカサーバのみが受信し、そのコピーを他のブローカサーバに転送しているが、プロデューササーバにおいて、その配信順と共に個々のメッセージを一意に識別できる識別子を配信メッセージに付加し、すべてのブローカサーバでプロデューササーバからの配信メッセージを受信するようにしてもよい。このようにすることで、正常配信用ブローカサーバの処理負荷をさらに低減させ、より効率よくメッセージ配信を行うことが可能となる。
【符号の説明】
【0221】
100、200、300・・・ブローカサーバ
400・・・プロデューササーバ
500・・・コンシューマサーバ
105・・・ストレージ装置
106・・・ネットワーク
800・・・共有ストレージ装置
111・・・データ送受信部
112・・・メッセージ管理部
113・・・配信処理部
114・・・切替処理部
115・・・管理インタフェース
116・・・メッセージバッファ
610・・・ブローカサーバ表
620・・・配信担当表
630・・・メッセージIDカウンタ
640・・・配信状況表
710、720、730・・・判定しきい値表

【特許請求の範囲】
【請求項1】
第1及び第2の計算機と、少なくとも前記第2の計算機からアクセス可能な記憶装置とを備え、配信元から送られるメッセージを受信し、ネットワークを介して前記メッセージを複数の配信先計算機に配信するメッセージ配信システムにおけるメッセージの配信方法であって、
前記第1の計算機により配信元から送られてくるメッセージを受信し、
前記複数の配信先計算機の少なくとも一部の配信先計算機に前記メッセージを配信するとともに、前記少なくとも一部の配信先計算機へのメッセージの配信状況を取得し、
前記配信状況に基づいて、前記少なくとも一部の配信先計算機の中に配信が遅延している低速配信先計算機が存在するか否かを判別し、
前記低速配信先計算機が存在する場合、前記第1の計算機から前記第2の計算機に前記低速配信先計算機へのメッセージの配信を担当する計算機の切り替えを要求し、
前記第2の計算機により前記低速配信先計算機へのメッセージの配信を引き継ぎ、前記低速配信先計算機にメッセージを配信するメッセージ配信方法。
【請求項2】
請求項1記載のメッセージ配信方法において、前記受信するステップは、前記メッセージを前記第2の計算機に転送するステップを含み、
前記低速配信先計算機にメッセージを配信するステップは、前記転送するステップによって前記第1の計算機から転送された前記メッセージを前記低速配信先計算機に配信することを特徴とするメッセージ配信方法。
【請求項3】
請求項2記載のメッセージ配信方法は、さらに、
前記第2の計算機において、前記第1の計算機から転送された前記メッセージを前記第2の計算機が有するバッファメモリに保持するステップと、
前記バッファメモリの使用状況を監視し、前記バッファメモリの使用率が所定の使用率を超えた場合に少なくとも一部のメッセージを前記記憶装置に格納するステップとを含むことを特徴とするメッセージ配信方法。
【請求項4】
請求項1記載のメッセージ配信方法は、さらに、
前記第2の計算機において、前記第2の計算機がメッセージの配信を行っている配信先計算機の各々について、メッセージの配信状況を取得し、
前記第2の計算機がメッセージの配信を行っている配信先計算機の各々について取得した前記配信状況に基づいて、メッセージの配信が正常に行われている正常配信先計算機の有無を判別し、
正常配信先計算機が存在する場合、前記第2の計算機から前記第1の計算機に前記正常配信先計算機へのメッセージの配信を担当する計算機の切り替えを要求し、
前記第1の計算機により前記正常配信先計算機へのメッセージの配信を引き継ぎ、前記正常配信先計算機にメッセージを配信することを特徴とするメッセージ配信方法。
【請求項5】
請求項4記載のメッセージ配信方法は、さらに、
前記メッセージを前記第1の計算機が有するバッファメモリに保持するステップと、
前記バッファメモリの使用状況を監視するステップとを含み、
前記バッファメモリの使用率が所定の使用率を超えたときに前記低速配信先計算機が存在するか否かを判別するステップ以降の処理が実施されることを特徴とするメッセージの配信方法。
【請求項6】
請求項5記載のメッセージ配信方法において、前記低速配信先計算機が存在するか否かを判別するステップは、前記配信状況として、各配信先計算機へ配信すべきメッセージの滞留数、滞留量、各配信先計算機との間のネットワークの状態、及び、各配信先計算機からの応答時間のいずれかを用いることを特徴とするメッセージ配信方法。
【請求項7】
請求項4記載のメッセージ配信方法において、前記正常配信先計算機の有無を判別するステップは、前記第2の計算機がメッセージを配信している配信先計算機の各々について取得した前記配信状況に基づいて、配信すべきメッセージの滞留量が所定数以下となっている配信先計算機を前記正常配信先計算機とすることを特徴とするメッセージ配信方法。
【請求項8】
請求項7記載のメッセージ配信方法において、さらに、
前記第2の計算機において、前記第2の計算機のリソースの利用率を取得するステップと、
前記リソースの利用率が所定の利用率を超えたときに前記正常配信先計算機の有無を判別するステップ以降の処理が実施されることを特徴とするメッセージ配信方法。
【請求項9】
請求項7記載のメッセージ配信方法において、前記正常配信先計算機の有無を判別するステップ以降の処理は、予め設定された時間間隔で実施されることを特徴とするメッセージ配信方法。
【請求項10】
請求項4記載のメッセージ配信方法において、前記メッセージ配信システムはさらに第3の計算機を有し、前記メッセージ配信方法は、さらに、
前記第2の計算機がメッセージを配信している配信先計算機の各々について取得した前記配信状況に基づいて、前記第2の計算機がメッセージを配信している配信先計算機の中に配信が遅延している第2の低速配信先計算機が存在するか否かを判別し、
前記第2の低速配信先計算機が存在する場合、前記第2の計算機から前記第3の計算機に前記第2の低速配信先計算機へのメッセージの配信を担当する計算機の切り替えを要求し、
前記第3の計算機により前記第2の低速配信先計算機へのメッセージの配信を引き継ぎ、前記第2の低速配信先計算機にメッセージを配信するメッセージの配信方法。
【請求項11】
複数の配信先計算機にネットワークを介してメッセージを配信するメッセージ配信システムであって、
受信したメッセージを保持する第1のバッファと、前記複数の配信先計算機の各々へのメッセージの配信を担当する計算機を指定する第1の担当情報と、前記第1の担当情報で自計算機が担当として割り当てられている第1の担当配信先計算機へのメッセージの配信状況を示す第1の配信状況情報とを保持するメモリ、
配信元から送られてくるメッセージを受信して前記第1のバッファに保持する第1のメッセージ管理手段、
前記第1の担当配信先計算機に前記メッセージを配信して前記第1の配信状況情報を更新する第1の配信手段、及び、
前記第1の配信状況情報に基づいて、前記第1の担当配信先計算機の中に配信が遅延している低速配信先計算機が存在するか否かを判別し、前記低速配信先計算機が存在する場合、前記低速配信先計算機へのメッセージの配信を担当する計算機の切り替えを要求する第1の切替手段を有する第1の計算機と、
受信したメッセージを保持する第2のバッファ、前記複数の配信先計算機の各々へのメッセージの配信を担当する計算機を指定する第2の担当情報、及び、前記第2の担当情報で自計算機が担当として割り当てられている第2の担当配信先計算機へのメッセージの配信状況を示す第2の配信状況情報を保持するメモリ、
配信元から送られてくるメッセージを受信して前記第2のバッファに保持する第2のメッセージ管理手段、及び
前記第1の計算機からの前記要求に応じて前記低速配信先計算機を前記第2の担当配信先計算機に含まれるよう前記第2の担当情報を更新し、前記第2の担当配信先計算機にメッセージを配信して前記第2の配信状況情報を更新する第2の配信手段を有する第2の計算機と、
を有するメッセージ配信システム。
【請求項12】
前記第1のメッセージ管理部は、前記配信元から送られてくるメッセージのコピーを前記第2の計算機に送信し、
前記第2のメッセージ管理部は、前記メッセージのコピーを前記配信元から送られてくるメッセージを受信して前記第2のバッファに格納することを特徴とする請求項11記載のメッセージ配信システム。
【請求項13】
前記第2のメッセージ管理部は、前記第2のバッファの使用状況を監視し、前記バッファの使用率が所定の使用率を超えた場合、少なくとも一部のメッセージを前記ストレージ装置に格納することを特徴とする請求項12記載のメッセージ配信システム。
【請求項14】
前記第2の計算機は、さらに、前記第2の配信状況情報に基づいて、メッセージの配信が正常に行われている正常配信先計算機の有無を判別し、前記正常配信先計算機が存在する場合、前記正常配信先計算機へのメッセージの配信を担当する計算機の切り替えを要求する第2の切替手段を有し、
前記第1の配信手段は、前記第2の計算機からの前記要求に応じて前記正常配信先計算機を前記第1の担当配信先計算機に含まれるよう前記第1の担当情報を更新することを特徴とする請求項11記載のメッセージ配信システム。
【請求項15】
前記第1の切替手段は、前記第1のバッファの使用状況を監視し、前記第1のバッファの使用率が所定の使用率を超えたときに前記低速配信先計算機が存在するか否かを判別して前記切り替えを要求することを特徴とする請求項14記載のメッセージ配信システム。
【請求項16】
前記第1の切替手段は、前記低速配信先計算機が存在するか否かの判別を、各配信先計算機へ配信すべきメッセージの滞留数、滞留量、各配信先計算機との間のネットワークの状態、及び、各配信先計算機からの応答時間のいずれかを用いて行うことを特徴とする請求項15記載のメッセージ配信システム。
【請求項17】
前記第2の切替手段は、前記第2の計算機のリソースの利用率を取得し、前記リソースの利用率が所定の利用率を超えたときに前記正常配信先計算機の有無を判別し、前記計算機の切り替えを要求することを特徴とする請求項14記載のメッセージ配信システム。
【請求項18】
前記第2の切替手段は、前記第2の配信状況情報に基づいて、配信すべきメッセージの滞留量が所定数以下となっている前記第2の担当配信先計算機を前記正常配信先計算機として判別することを特徴とする請求項17記載のメッセージ配信システム。
【請求項19】
請求項14記載のメッセージ配信システムは、さらに、
受信したメッセージを保持する第3のバッファ、前記複数の配信先計算機の各々へのメッセージの配信を担当する計算機を指定する第3の担当情報、及び、前記第3の担当情報で自計算機が担当として割り当てられている第3の担当配信先計算機へのメッセージの配信状況を示す第2の配信状況情報を保持するメモリ、
配信元から送られてくるメッセージを受信して前記第3のバッファに保持する第3のメッセージ管理手段、及び
前記第3の担当配信先計算機にメッセージを配信して前記第3の配信状況情報を更新する第3の配信手段を有する第3の計算機を備え、
前記第2の切替手段は、前記第2の配信状況情報に基づいて、前記第2の担当配信先計算機の中に配信が遅延している第2の低速配信先計算機が存在するか否かを判別し、前記第2の低速配信先計算機が存在する場合、前記第2の低速配信先計算機へのメッセージの配信を担当する計算機の切り替えを要求し、
前記第3の配信手段は、前記第2の計算機からの前記要求に応じて前記第2の低速配信先計算機を前記第3の担当配信先計算機に含まれるよう前記第3の担当情報を更新することを特徴とするメッセージ配信システム。
【請求項20】
複数の配信先計算機にネットワークを介してメッセージを配信するメッセージ配信計算機であって、
メッセージを保持するバッファ、前記複数の配信先計算機の中でメッセージの配信が遅延している低速配信先計算機へのメッセージの配信を担当する計算機であるか否かを示す計算機情報、前記複数の配信先計算機の各々へメッセージの配信を担当する計算機を指定する担当情報、及び、前記担当情報で自計算機が担当として割り当てられている前記配信先計算機へのメッセージの配信状況情報を保持するメモリと、
送られてくるメッセージを受信して前記バッファに格納するメッセージ管理手段と、
前記担当情報に基づいて前記メッセージ管理手段が受信したメッセージを自計算機が担当する配信先計算機に配信して前記配信状況情報を更新する配信処理手段と、
自計算機が前記低速配信先計算機へのメッセージ配信を担当しない場合、前記配信状況情報に基づいて、前記担当する配信先計算機の中に存在する前記低速配信先計算機を検知して、検知された低速配信先計算機へのメッセージの配信を担当する計算機の切り替えを要求し、自計算機が前記低速配信先計算機へのメッセージ配信を担当する場合、前記配信状況情報に基づいて、前記担当する配信先計算機の中に存在する配信が遅延していない正常配信先サーバを検知して、当該正常配信先計算機へのメッセージの配信を担当する計算機の切り替えを要求する切替手段とを有するメッセージ配信計算機。

【図1】
image rotate

【図2】
image rotate

【図3】
image rotate

【図4】
image rotate

【図5】
image rotate

【図6】
image rotate

【図7】
image rotate

【図8】
image rotate

【図9】
image rotate

【図10】
image rotate

【図11】
image rotate

【図12】
image rotate

【図13】
image rotate

【図14】
image rotate

【図15】
image rotate

【図16】
image rotate

【図17】
image rotate

【図18】
image rotate

【図19】
image rotate

【図20】
image rotate

【図21】
image rotate

【図22】
image rotate

【図23】
image rotate

【図24】
image rotate

【図25】
image rotate

【図26】
image rotate

【図27】
image rotate

【図28】
image rotate

【図29】
image rotate

【図30】
image rotate

【図31】
image rotate


【公開番号】特開2011−170572(P2011−170572A)
【公開日】平成23年9月1日(2011.9.1)
【国際特許分類】
【出願番号】特願2010−33063(P2010−33063)
【出願日】平成22年2月18日(2010.2.18)
【出願人】(000005108)株式会社日立製作所 (27,607)
【Fターム(参考)】