00001
00002
00003
00004
00005
00006
00007
00009 #include "IoModules/IoDDSStreamItr.h"
00010
00011 #include "Dispatcher/DDSClient.h"
00012 #include "Dispatcher/DDSSubscription.h"
00013 #include "MessageService/MsgService.h"
00014 #include "MinosObjectMap/MomNavigator.h"
00015 #include "Persistency/Per.h"
00016 #include "Validity/VldContext.h"
00017
00018 #include "TSystem.h"
00019
00020 CVSID("$Id: IoDDSStreamItr.cxx,v 1.16 2007/08/24 05:37:46 schubert Exp $");
00021
00022
00023
00024 JobCResult IoDDSStreamItr::GoToFile(const char* filename,
00025 const char* ) {
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 if ( !IsValid() ) return JobCResult::kEndOfInputStream;
00038
00039 if ( this -> IsModified() ) {
00040 bool isOk = this -> Subscribe();
00041 if ( !isOk ) {
00042 MSG("Io",Msg::kWarning)<<" Subscription submit to server failed."<< endl;
00043 return JobCResult::kWarning;
00044 }
00045 }
00046
00047 std::string returnedfilename;
00048 DDS::EMessageType msgrc = fDDSClient->GoToFile(filename,returnedfilename);
00049 if (msgrc != DDS::kOk) {
00050 MSG("Io",Msg::kWarning) <<
00051 "An error message " << DDS::AsString(msgrc) <<
00052 " was received from DDSClient::GoToFile " << filename << "." << endl;
00053 return JobCResult::kWarning;
00054 }
00055
00056 return JobCResult::kAOK;
00057
00058 }
00059
00060
00061
00062 JobCResult IoDDSStreamItr::NextFile(int n, const char* ) {
00063
00064
00065
00066
00067
00068
00069 if ( !IsValid() ) return JobCResult::kEndOfInputStream;
00070
00071 if ( this -> IsModified() ) {
00072 bool isOk = this -> Subscribe();
00073 if ( !isOk ) {
00074 MSG("Io",Msg::kWarning)<<" Subscription submit to server failed."<< endl;
00075 return JobCResult::kWarning;
00076 }
00077 }
00078
00079 for ( int i = 0; i < n; i++ ) {
00080 std::string returnedfilename;
00081 DDS::EMessageType msgrc = fDDSClient->GoToNextFile(returnedfilename);
00082 if (msgrc != DDS::kOk) {
00083 MSG("Io",Msg::kWarning) <<
00084 "An error message " << DDS::AsString(msgrc) <<
00085 " was received from DDSClient::GoToNextFile." << endl;
00086 return JobCResult::kWarning;
00087 }
00088 }
00089
00090 return JobCResult::kAOK;
00091
00092 }
00093
00094
00095
00096 void IoDDSStreamItr::AddFile(const char* , int ,
00097 const char* ) {
00098 return;
00099 }
00100
00101
00102
00103 void IoDDSStreamItr::RemoveFile(const char* ,
00104 const char* ) {
00105 return;
00106 }
00107
00108
00109
00110 JobCResult IoDDSStreamItr::PrevFile(int , const char* ){
00111 MSG("Io",Msg::kWarning) << " DDS Stream does not support PrevFile" << endl;
00112 return JobCResult::kWarning;
00113 }
00114
00115
00116
00117 const char* IoDDSStreamItr::GetCurrentFile(const char* ) const{
00118 MSG("Io",Msg::kWarning) << " DDS Stream does not support GetCurrentFile"
00119 << endl;
00120 return "";
00121 }
00122
00123
00124
00125 JobCResult IoDDSStreamItr::GoToFile(int ,
00126 const char* ) {
00127 MSG("Io",Msg::kWarning) << " DDS Stream does not support GoToFile n" << endl;
00128 return JobCResult::kWarning;
00129 }
00130
00131
00132
00133 std::ostream& IoDDSStreamItr::ListFile(std::ostream& os,
00134 const char* ) const
00135 {
00136 return os;
00137 }
00138
00139
00140
00141 IoDDSStreamItr::IoDDSStreamItr(const char* server, unsigned int port,
00142 unsigned int nretry, unsigned int delay,
00143 DDS::EClientType clienttype,
00144 string clientname) :
00145 fPort(port),fTimeOut(120),fMaxRetry(nretry),fRetryDelay(delay),
00146 fDDSClient(0),fModified(true),fClientType(clienttype),
00147 fClientName(clientname)
00148 {
00149 this->SetSourceName(server);
00150 this->SetClientType(clienttype);
00151 this->SetClientName(clientname);
00152 this->InitDDSClient();
00153 }
00154
00155
00156
00157 IoDDSStreamItr::~IoDDSStreamItr()
00158 {
00159 this->ShutdownDDSClient();
00160 }
00161
00162
00163
00164 void IoDDSStreamItr::InitDDSClient()
00165 {
00166
00167
00168
00169
00170 unsigned int attempt = fMaxRetry+1;
00171 fDDSClient = 0;
00172 while (!fDDSClient && attempt) {
00173 fDDSClient = new DDSClient(this->GetSourceName(),fPort,DDS::kData,
00174 fClientType,fClientName);
00175 --attempt;
00176
00177 if (fDDSClient==0 || !fDDSClient->IsValid()) {
00178
00179 MSG("Io",Msg::kError)
00180 << "Unable to create DDSClient to host " << this->GetSourceName()
00181 << " on port " << fPort << "," << endl
00182 << " will attempt " << attempt << " more times." << endl;
00183 this->ShutdownDDSClient();
00184 gSystem->Sleep(1000*fRetryDelay);
00185 }
00186 }
00187 fModified = true;
00188
00189 }
00190
00191
00192 int IoDDSStreamItr::Subscribe()
00193 {
00194
00195
00196
00197 int isOk = 0;
00198 if ( !IsValid() ) return isOk;
00199
00200 DDS::EMessageType msgrc = fDDSClient->Subscribe();
00201 if (msgrc != DDS::kOk) {
00202 MSG("Io",Msg::kError) <<
00203 "An error message " << DDS::AsString(msgrc) <<
00204 " was received from DDS::Subscribe." <<
00205 "( host " << this->GetSourceName() <<
00206 ", port " << fPort << ")." << endl;
00207 this->ShutdownDDSClient();
00208 }
00209 else {
00210 isOk = 1;
00211 fModified = false;
00212 }
00213
00214 return isOk;
00215
00216 }
00217
00218
00219
00220 void IoDDSStreamItr::ShutdownDDSClient()
00221 {
00222 if (fDDSClient) { delete fDDSClient; fDDSClient = 0; }
00223 }
00224
00225
00226
00227 int IoDDSStreamItr::LoadRecords(MomNavigator* mom)
00228 {
00229
00230
00231
00232
00233 if ( this -> IsModified() ) {
00234 bool isOk = this -> Subscribe();
00235 if ( !isOk ) return 0;
00236 }
00237
00238
00239 DDS::EMessageType rtn = fDDSClient->Next(mom, fTimeOut, 1);
00240 if (rtn != DDS::kOk) {
00241 MSG("Io",Msg::kWarning) << "DDSClient::Next returned error message: "
00242 << DDS::AsString(rtn) << "." << endl;
00243 return 0;
00244 }
00245
00246 return 1;
00247 }
00248
00249
00250
00251 int IoDDSStreamItr::Increment(int n, MomNavigator* m)
00252 {
00253
00254
00255
00256 if (n==0) return 0;
00257
00258 if ( this -> IsModified() ) {
00259 bool isOk = this -> Subscribe();
00260 if ( !isOk ) return 0;
00261 }
00262
00263
00264 if (n==1) return 1;
00265
00266
00267 MomNavigator* mtmp = 0;
00268 bool dodel = false;
00269 if (m==0) { mtmp = new MomNavigator; dodel = true; }
00270 else { mtmp = m; }
00271
00272 DDS::EMessageType rtn = fDDSClient->Next(mtmp, fTimeOut, n-1);
00273
00274 if (dodel) { delete mtmp; mtmp = 0; }
00275
00276 if (rtn != DDS::kOk) {
00277 MSG("Io",Msg::kWarning) << "DDSClient::Next returned error message: "
00278 << DDS::AsString(rtn) << "." << endl;
00279 return 0;
00280 }
00281
00282 return n;
00283 }
00284
00285
00286
00287 int IoDDSStreamItr::Decrement(int n, MomNavigator* )
00288 {
00289 MSG("Io",Msg::kWarning) <<
00290 "DDS Stream does not support Decrement(" << n << ").\n";
00291 return 0;
00292 }
00293
00294
00295
00296 JobCResult IoDDSStreamItr::GoTo(const VldContext& vld, MomNavigator* )
00297 {
00298 MSG("Io",Msg::kWarning) <<
00299 "DDS Stream does not support GoTo(" << vld << ").\n";
00300 return JobCResult::kWarning;
00301 }
00302
00303
00304
00305 int IoDDSStreamItr::GoToEOF()
00306 {
00307 MSG("Io",Msg::kWarning) <<
00308 "DDS Stream does not support GoToEOF().\n";
00309 return 0;
00310 }
00311
00312
00313
00314 int IoDDSStreamItr::SetMaxSyncDelay(unsigned int maxSyncDelay)
00315 {
00316
00317
00318 int isOpen = 0;
00319 if ( this -> IsValid() ) {
00320 fDDSClient -> GetSubscription() -> SetMaxSyncDelay(maxSyncDelay);
00321 isOpen = 1;
00322 }
00323
00324 fModified = true;
00325 return isOpen;
00326
00327 }
00328
00329
00330
00331
00332 int IoDDSStreamItr::SetOffLine(bool offLine)
00333 {
00334
00335
00336 int isOpen = 0;
00337 if ( this -> IsValid() ) {
00338 fDDSClient -> GetSubscription() -> SetOffLine(offLine);
00339 isOpen = 1;
00340 }
00341
00342 fModified = true;
00343 return isOpen;
00344
00345 }
00346
00347
00348
00349 int IoDDSStreamItr::SetDataSource(unsigned int dataSource)
00350 {
00351
00352
00353 int isOpen = 0;
00354 if ( this -> IsValid() ) {
00355 DDS::EDataSource sourceEnum = static_cast<DDS::EDataSource>(dataSource);
00356 fDDSClient -> GetSubscription() -> SetDataSource(sourceEnum);
00357 isOpen = 1;
00358 }
00359
00360 fModified = true;
00361 return isOpen;
00362
00363 }
00364
00365
00366
00367 int IoDDSStreamItr::SetKeepUpMode(unsigned int keepUpMode)
00368 {
00369
00370
00371 int isOpen = 0;
00372 if ( this -> IsValid() ) {
00373 DDS::EKeepUpMode keepUpEnum = static_cast<DDS::EKeepUpMode>(keepUpMode);
00374 fDDSClient -> GetSubscription() -> SetKeepUpMode(keepUpEnum);
00375 isOpen = 1;
00376 }
00377
00378 fModified = true;
00379 return isOpen;
00380
00381 }
00382
00383
00384
00385 void IoDDSStreamItr::SetTimeOut(unsigned int timeout) { fTimeOut = timeout; }
00386
00387
00388
00389 void IoDDSStreamItr::SetClientType(DDS::EClientType clienttype)
00390 { fClientType = clienttype; }
00391
00392
00393
00394 void IoDDSStreamItr::SetClientName(string clientname)
00395 { fClientName = clientname; }
00396
00397
00398
00399 int IoDDSStreamItr::Streams(const char* streamList)
00400 {
00401
00402
00403 Int_t numStream = 0;
00404 if ( this->IsValid() ) {
00405 numStream = fDDSClient -> GetSubscription() -> SetStreams(streamList);
00406 }
00407
00408 fModified = true;
00409 return numStream;
00410
00411 }
00412
00413
00414
00415 int IoDDSStreamItr::Select(const char* stream, const char* selection,
00416 bool )
00417 {
00418
00419
00420 int isOpen = 0;
00421 if ( this->IsValid() ) {
00422 fDDSClient -> GetSubscription() -> SetSelection(stream,selection);
00423 isOpen = 1;
00424 }
00425 fModified = true;
00426
00427 return isOpen;
00428
00429 }
00430
00432
00433
00434
00435
00436
00437
00438
00439