00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00015 #include "TriD/stat/ReadDispatcherModule.h"
00016 #include "MessageService/MsgService.h"
00017 #include "MinosObjectMap/MomNavigator.h"
00018 #include "JobControl/JobCModuleRegistry.h"
00019
00020 #include "TIterator.h"
00021 #include "TROOT.h"
00022 #include "TSystem.h"
00023 #include "RawData/RawRecord.h"
00024 #include "Persistency/Per.h"
00025 #include "Dispatcher/DDS.h"
00026 #include "Dispatcher/DDSClient.h"
00027 #include "Dispatcher/DDSSubscription.h"
00028 #include "DataUtil/DumpMom.h"
00029 #include "RawData/RawRecord.h"
00030 #include "RawData/RawDaqSnarlHeader.h"
00031 #include "RawData/RawDigitDataBlock.h"
00032 #include "RawData/RawBlockRegistry.h"
00033 #include "RawData/RawBlockId.h"
00034 #include "RawData/RawDigit.h"
00035
00036
00037 JOBMODULE(ReadDispatcherModule, "ReadDispatcherModule",
00038 "ReadDispatcherModule");
00039 CVSID("$Id: ReadDispatcherModule.cxx,v 1.3 2007/03/01 16:59:43 rhatcher Exp $");
00040
00041
00042 ReadDispatcherModule::ReadDispatcherModule()
00043 {
00044 fClient = 0;
00045 fConnected = false;
00046 fSurrogateMom = 0;
00047 }
00048
00049
00050
00051
00052 ReadDispatcherModule::~ReadDispatcherModule()
00053 {
00054 if(fClient) delete fClient;
00055 }
00056
00057
00058 Int_t ReadDispatcherModule::ConnectToServer( void )
00059 {
00060
00061
00062 if(fClient) delete fClient;
00063 fConnected = false;
00064
00065
00066
00067 fClient = new DDSClient(fDDSServer.c_str(),fDDSPort);
00068
00069
00070 if (! fClient -> IsValid() ) {
00071 MSG("ReadDisp",Msg::kInfo) << "Error in creation of socket connected to server." << endl;
00072 delete fClient; fClient = 0;
00073 return 1;
00074 }
00075 else {
00076 MSG("ReadDisp",Msg::kInfo) << "Successfully connected to dispatcher server.\n"
00077 << fClient << endl;
00078 }
00079
00080
00081 fClient->GetSubscription()
00082 ->SetDataSource((DDS::EDataSource)DDS::GetDataSourceCode(fDDSDataSource.c_str()));
00083
00084
00085 fClient->GetSubscription()
00086 ->SetStreams(fStreams.c_str());
00087
00088 fClient->GetSubscription()->
00089 SetKeepUpMode((DDS::EKeepUpMode)DDS::GetKeepUpCode(fDDSKeepUpMode.c_str()));
00090
00091
00092 fClient->GetSubscription()->
00093 SetSelection(fStreams.c_str(),fSelectRule.c_str());
00094
00095
00096
00097 DDS::EMessageType msgrc = fClient->Subscribe();
00098 if (msgrc != DDS::kOk) {
00099 MSG("ReadDisp",Msg::kInfo) << "An error message " << DDS::AsString(msgrc)
00100 << " was received from DDS::Subscribe." << endl;
00101 return 1;
00102 }
00103
00104 fConnected = true;
00105 return 0;
00106 }
00107
00108 Bool_t ReadDispatcherModule::IsNewEventReady( void )
00109 {
00110
00111
00112
00113
00114
00115 if(!fConnected) ConnectToServer();
00116
00117 if(!fConnected) return false;
00118
00119 DDS::EMessageType msgrc = DDS::kOk;
00120
00121 MomNavigator* newMom = 0;
00122 msgrc = fClient->Next(newMom,fDDSTimeOut);
00123 if (msgrc == DDS::kOk ) {
00124
00125
00126 if( newMom ) {
00127
00128
00129
00130 if(newMom->GetFragmentArray()->IsEmpty()){
00131 cout << "Empty mom." << endl;
00132 delete newMom;
00133 } else {
00134
00135
00136
00137
00138 if( fSurrogateMom ) delete fSurrogateMom;
00139 fSurrogateMom = newMom;
00140
00141 if(MsgService::Instance()->GetStream("TriD")->IsActive(Msg::kDebug))
00142 DataUtil::dump_mom(newMom,std::cout);
00143
00144 return true;
00145 }
00146 }
00147
00148 } else {
00149 if(msgrc != DDS::kTimeoutNewRecord) {
00150
00151 MSG("ReadDisp",Msg::kInfo) << "Dispatcher Error "
00152 << DDS::AsString(msgrc)
00153 << " Will try reconnect."
00154 << endl;
00155
00156 fClient->Shutdown();
00157 delete fClient;
00158 fClient = 0;
00159 fConnected = false;
00160 }
00161 }
00162
00163 if(fSurrogateMom) return true;
00164 return false;
00165 }
00166
00167
00168
00169 JobCResult ReadDispatcherModule::Get()
00170 {
00171
00172
00173
00174
00175
00176
00177 return JobCResult::kAOK;
00178 }
00179
00180
00181 JobCResult ReadDispatcherModule::Get(MomNavigator* mom)
00182 {
00183
00184
00185
00186
00187
00188
00189 if(! fSurrogateMom) {
00190
00191 IsNewEventReady();
00192 }
00193
00194 if(! fSurrogateMom) return JobCResult::kFailed;
00195
00196
00197
00198
00199 mom->Clear();
00200 TObjArray* objArray =const_cast<TObjArray*>
00201 (fSurrogateMom -> GetFragmentArray());
00202 Int_t nrecord = objArray -> GetEntries();
00203 for ( Int_t irec=0; irec < nrecord; irec++ ) {
00204
00205 RecMinos* record = dynamic_cast<RecMinos*>(objArray->RemoveAt(irec));
00206 mom -> AdoptFragment(record);
00207 }
00208 delete fSurrogateMom; fSurrogateMom=0;
00209
00210
00211 return JobCResult::kPassed;
00212 }
00213
00214
00215
00216 const Registry& ReadDispatcherModule::DefaultConfig() const
00217 {
00218
00219
00220
00221 static Registry r;
00222
00223
00224 std::string name = this->GetName();
00225 name += ".config.default";
00226 r.SetName(name.c_str());
00227
00228
00229 r.UnLockValues();
00230 r.Set("DDSServer", "localhost");
00231 r.Set("DDSPort", 9090);
00232 r.Set("Streams", "DaqSnarl");
00233 r.Set("DDSKeepUpMode", "RecordKeepUp");
00234 r.Set("DDSDataSource", "Daq");
00235 r.Set("DDSTimeOut", 1);
00236 r.Set("SelectRule", "true");
00237 r.LockValues();
00238
00239 return r;
00240 }
00241
00242
00243
00244 void ReadDispatcherModule::Config(const Registry& r)
00245 {
00246
00247
00248
00249 int tmpi;
00250 const char* tmps;
00251
00252 if (r.Get("DDSServer",tmps)) { fDDSServer = tmps; }
00253 if (r.Get("DDSPort",tmpi)) { fDDSPort = tmpi; }
00254 if (r.Get("Streams",tmps)) { fStreams = tmps; }
00255 if (r.Get("DDSKeepUpMode",tmps)) { fDDSKeepUpMode = tmps; }
00256 if (r.Get("DDSDataSource",tmps)) { fDDSDataSource = tmps; }
00257 if (r.Get("DDSTimeOut",tmpi)) { fDDSTimeOut = tmpi; }
00258 if (r.Get("SelectRule",tmps)) { fSelectRule = tmps; }
00259 }
00260