00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #ifndef SEMAINE_COMPONENTS_META_METAMESSENGER_H
00011 #define SEMAINE_COMPONENTS_META_METAMESSENGER_H
00012
00013 #include <list>
00014
00015 #include <semaine/config.h>
00016
00017 #include <decaf/util/concurrent/Concurrent.h>
00018 #include <decaf/util/concurrent/Mutex.h>
00019
00020 #include <cms/MessageListener.h>
00021 #include <cms/MessageProducer.h>
00022 #include <cms/MessageConsumer.h>
00023
00024 #include <semaine/cms/IOBase.h>
00025 #include <semaine/cms/receiver/Receiver.h>
00026 #include <semaine/cms/sender/Sender.h>
00027
00028 using namespace cms;
00029 using namespace semaine::cms;
00030
00031 namespace semaine {
00032 namespace components {
00033 namespace meta {
00034
00035 class Statistics;
00036
00037 class MetaMessenger : public IOBase, public MessageListener, public decaf::util::concurrent::Mutex
00038 {
00039 public:
00040
00041 static const std::string COMPONENT_NAME;
00042 static const std::string COMPONENT_STATE;
00043 static const std::string COMPONENT_STATE_DETAILS;
00044 static const std::string RECEIVE_TOPICS;
00045 static const std::string SEND_TOPICS;
00046 static const std::string IS_INPUT;
00047 static const std::string IS_OUTPUT;
00048 static const std::string LAST_SEEN_ALIVE;
00049 static const std::string AVERAGE_ACT_TIME;
00050 static const std::string AVERAGE_REACT_TIME;
00051 static const std::string AVERAGE_TRANSMIT_TIME;
00052 static const std::string TOTAL_MESSAGES_RECEIVED;
00053
00054
00055 static const std::string SYSTEM_READY;
00056 static const std::string SYSTEM_READY_TIME;
00057 static const std::string PING;
00058 static const std::string REPORT_TOPICS;
00059
00060 static const long long TIMEOUT_PERIOD;
00061
00062 MetaMessenger(const std::string & componentName) throw(CMSException);
00063
00064 MetaMessenger(const std::string & cmsUrl, const std::string & cmsUser, const std::string & cmsPassword, const std::string & componentName) throw(CMSException);
00065
00066 void reportState(const std::string & state, const std::string & message = "", const std::exception * exc = NULL)
00067 throw(CMSException);
00068
00069 void reportTopics(std::list<semaine::cms::receiver::Receiver *> & receivers,
00070 std::list<semaine::cms::sender::Sender *> & senders,
00071 bool isInput, bool isOutput)
00072 throw(CMSException);
00073
00074 virtual void onMessage(const Message * m);
00075
00076 bool isSystemReady();
00077
00086 long long getTime();
00087
00088 void IamAlive();
00089
00090 Statistics * statistics() { return _statistics; }
00091
00092
00093 private:
00094 const std::string componentName;
00095 MessageProducer * producer;
00096 MessageConsumer * consumer;
00097 bool systemReady;
00098 long long timeDelta;
00099 long long lastSeenAlive;
00100 std::string lastSeenState;
00101 std::string receiveTopics;
00102 std::string sendTopics;
00103 bool isInput;
00104 bool isOutput;
00105 Statistics * _statistics;
00106
00107 void setSystemReady(bool ready);
00108 void setTime(long long systemTime);
00109 void _reportTopics()
00110 throw(CMSException);
00111
00112 };
00113
00114
00115 class Statistics
00116 {
00117 public:
00118 Statistics(int memory) : memory(memory), messagesReceived(0)
00119 {}
00120
00121 void actTime(long millis);
00122
00123 void reactTime(long millis);
00124
00125 void transmitTime(long millis);
00126
00127 void countMessageReceived() { messagesReceived++; }
00128
00129 int getTotalMessagesReceived() { return messagesReceived; }
00130
00131 long avgActTime() { return average(actTimes); }
00132
00133 long avgReactTime() { return average(reactTimes); }
00134
00135 long avgTransmitTime() { return average(transmitTimes); }
00136
00137 private:
00138 size_t memory;
00139 std::list<long> actTimes;
00140 std::list<long> reactTimes;
00141 std::list<long> transmitTimes;
00142 int messagesReceived;
00143
00144 long average(std::list<long> & values);
00145
00146 };
00147
00148 }
00149 }
00150 }
00151
00152
00153
00154 #endif
00155