Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

IoDDSStreamItr.cxx

Go to the documentation of this file.
00001 
00002 // $Id: IoDDSStreamItr.cxx,v 1.16 2007/08/24 05:37:46 schubert Exp $
00003 //
00004 // Read data from a dispatcher stream
00005 //
00006 // messier@huhepl.harvard.edu
00007 // schubert@hep.umn.edu
00009 #include "IoModules/IoDDSStreamItr.h"
00010 
00011 #include "Dispatcher/DDSClient.h"
00012 #include "Dispatcher/DDSSubscription.h"
00013 #include "MessageService/MsgService.h"
00014 #include "MinosObjectMap/MomNavigator.h"
00015 #include "Persistency/Per.h"
00016 #include "Validity/VldContext.h"
00017 
00018 #include "TSystem.h"
00019 
00020 CVSID("$Id: IoDDSStreamItr.cxx,v 1.16 2007/08/24 05:37:46 schubert Exp $");
00021 
00022 //......................................................................
00023 
00024 JobCResult IoDDSStreamItr::GoToFile(const char* filename, 
00025                                     const char* /* streamlist */) {
00026 //======================================================================
00027 // Go to file filename. filename may be a symbolic link, e.g. "currentfile"
00028 // will advance to the target of the symbolic link "currentfile" in the
00029 // data source directory.  If filename is not absolute fullfilepathname,
00030 // its path is assumed relative to the data source directory defined on
00031 // the server.  If filename is missing in this directory, will position
00032 // pointer to the next file in the alphanumerically sorted list in the
00033 // data source directory.  Returns JobCResult::kAOK or JobCResult::kWarning
00034 // if error return from DDS, or JobCResult::kEndOfInputStream if not
00035 // connected.
00036 
00037   if ( !IsValid() ) return JobCResult::kEndOfInputStream; 
00038 
00039   if ( this -> IsModified() ) { // modified since last subscription
00040     bool isOk = this -> Subscribe(); 
00041     if ( !isOk ) {
00042       MSG("Io",Msg::kWarning)<<" Subscription submit to server failed."<< endl;
00043       return JobCResult::kWarning;
00044     }
00045   }
00046 
00047   std::string returnedfilename;
00048   DDS::EMessageType msgrc = fDDSClient->GoToFile(filename,returnedfilename);
00049   if (msgrc != DDS::kOk) { 
00050     MSG("Io",Msg::kWarning) << 
00051       "An error message " << DDS::AsString(msgrc) << 
00052       " was received from DDSClient::GoToFile " << filename << "." << endl;
00053     return JobCResult::kWarning;
00054   }
00055 
00056   return JobCResult::kAOK;
00057 
00058 }
00059 
00060 //......................................................................
00061 
00062 JobCResult IoDDSStreamItr::NextFile(int n, const char* /* streamlist */) {
00063 //======================================================================
00064 // Advance by n files in data source directory. Returns JobCResult::kAOK
00065 // or JobCResult::kWarning if error returned from DDS, or
00066 // JobCResult::kEndOfInputStream if not connected.
00067 //======================================================================
00068 
00069   if ( !IsValid() ) return JobCResult::kEndOfInputStream;
00070 
00071   if ( this -> IsModified() ) { // modified since last subscription
00072     bool isOk = this -> Subscribe(); 
00073     if ( !isOk ) {
00074       MSG("Io",Msg::kWarning)<<" Subscription submit to server failed."<< endl;
00075       return JobCResult::kWarning;
00076     }
00077   }
00078 
00079   for ( int i = 0; i < n; i++ ) {
00080     std::string returnedfilename;
00081     DDS::EMessageType msgrc = fDDSClient->GoToNextFile(returnedfilename);
00082     if (msgrc != DDS::kOk) {
00083       MSG("Io",Msg::kWarning) << 
00084         "An error message " << DDS::AsString(msgrc) << 
00085         " was received from DDSClient::GoToNextFile." << endl;
00086       return JobCResult::kWarning;
00087     }
00088   }
00089 
00090   return JobCResult::kAOK;
00091 
00092 }
00093 
00094 //......................................................................
00095 
00096 void IoDDSStreamItr::AddFile(const char* /* filename */, int /* at */,
00097                             const char* /* streamlist */) {
00098   return;
00099 }
00100 
00101 //......................................................................
00102 
00103 void IoDDSStreamItr::RemoveFile(const char* /* filename */, 
00104                                const char* /* streamlist */) {
00105   return;
00106 }
00107 
00108 //......................................................................
00109 
00110 JobCResult IoDDSStreamItr::PrevFile(int /* n */, const char* /* streamlist */){
00111   MSG("Io",Msg::kWarning) << " DDS Stream does not support PrevFile" << endl;
00112   return JobCResult::kWarning;
00113 }
00114 
00115 //......................................................................
00116 
00117 const char* IoDDSStreamItr::GetCurrentFile(const char* /* streamlist */) const{
00118   MSG("Io",Msg::kWarning) << " DDS Stream does not support GetCurrentFile" 
00119                           << endl;
00120   return "";
00121 }
00122 
00123 //......................................................................
00124 
00125 JobCResult IoDDSStreamItr::GoToFile(int /* n */, 
00126                             const char* /* streamlist */) {
00127   MSG("Io",Msg::kWarning) << " DDS Stream does not support GoToFile n" << endl;
00128   return JobCResult::kWarning;
00129 }
00130 
00131 //......................................................................
00132 
00133 std::ostream& IoDDSStreamItr::ListFile(std::ostream& os, 
00134                             const char* /* streamlist */) const
00135 {
00136   return os;
00137 }
00138 
00139 //......................................................................
00140 
00141 IoDDSStreamItr::IoDDSStreamItr(const char* server, unsigned int port,
00142                                unsigned int nretry, unsigned int delay,
00143                                DDS::EClientType clienttype,
00144                                string clientname) :
00145   fPort(port),fTimeOut(120),fMaxRetry(nretry),fRetryDelay(delay),
00146   fDDSClient(0),fModified(true),fClientType(clienttype),
00147   fClientName(clientname) 
00148 {
00149   this->SetSourceName(server);
00150   this->SetClientType(clienttype);
00151   this->SetClientName(clientname);
00152   this->InitDDSClient();
00153 }
00154 
00155 //......................................................................
00156 
00157 IoDDSStreamItr::~IoDDSStreamItr() 
00158 {
00159   this->ShutdownDDSClient();
00160 }
00161 
00162 //......................................................................
00163 
00164 void IoDDSStreamItr::InitDDSClient() 
00165 {
00166 //======================================================================
00167 // Set up the data dispatcher client
00168 //======================================================================
00169 
00170   unsigned int attempt = fMaxRetry+1;
00171   fDDSClient = 0;
00172   while (!fDDSClient && attempt) {
00173     fDDSClient = new DDSClient(this->GetSourceName(),fPort,DDS::kData,
00174                                fClientType,fClientName);
00175     --attempt;
00176 
00177     if (fDDSClient==0 || !fDDSClient->IsValid()) {
00178       // Warning here...
00179       MSG("Io",Msg::kError)
00180         << "Unable to create DDSClient to host " << this->GetSourceName()
00181         << " on port " << fPort << "," << endl
00182         << "   will attempt " << attempt << " more times." << endl; 
00183       this->ShutdownDDSClient();
00184       gSystem->Sleep(1000*fRetryDelay);
00185     }
00186   }
00187   fModified = true;
00188   
00189 }
00190 
00191 //......................................................................
00192 int IoDDSStreamItr::Subscribe() 
00193 {
00194 //======================================================================
00195 // Submit subscription to server.  Returns 1 if successful, else 0.
00196 //======================================================================
00197   int isOk = 0;
00198   if ( !IsValid() ) return isOk;
00199 
00200   DDS::EMessageType msgrc = fDDSClient->Subscribe();
00201   if (msgrc != DDS::kOk) {
00202     MSG("Io",Msg::kError) << 
00203       "An error message " << DDS::AsString(msgrc) << 
00204       " was received from DDS::Subscribe." <<
00205       "( host " << this->GetSourceName() << 
00206       ", port " << fPort << ")." << endl;
00207     this->ShutdownDDSClient();
00208   }
00209   else {
00210     isOk = 1;
00211     fModified = false;
00212   }
00213 
00214   return isOk;
00215 
00216 }
00217 
00218 //......................................................................
00219 
00220 void IoDDSStreamItr::ShutdownDDSClient()
00221 {
00222   if (fDDSClient) { delete fDDSClient; fDDSClient = 0; }
00223 }
00224 
00225 //......................................................................
00226   
00227 int IoDDSStreamItr::LoadRecords(MomNavigator* mom) 
00228 {
00229 //======================================================================
00230 // Load the record at the current position in the stream
00231 //======================================================================
00232 
00233   if ( this -> IsModified() ) { // modified since last subscription
00234     bool isOk = this -> Subscribe(); 
00235     if ( !isOk ) return 0;
00236   }
00237 
00238   // DDSClient only has a Next
00239   DDS::EMessageType rtn = fDDSClient->Next(mom, fTimeOut, 1);
00240   if (rtn != DDS::kOk) {
00241     MSG("Io",Msg::kWarning) << "DDSClient::Next returned error message: "
00242                             << DDS::AsString(rtn) << "." << endl;
00243     return 0;
00244   }
00245   
00246   return 1;
00247 }
00248 
00249 //......................................................................
00250 
00251 int IoDDSStreamItr::Increment(int n, MomNavigator* m)
00252 {
00253 //======================================================================
00254 // Advance the position in the stream n spaces.
00255 //=====================================================================
00256   if (n==0) return 0;
00257 
00258   if ( this -> IsModified() ) { // modified since last subscription
00259     bool isOk = this -> Subscribe(); 
00260     if ( !isOk ) return 0;
00261   }
00262   
00263   // Advance by n-1 since Get() is implemented by Next()...
00264   if (n==1) return 1;
00265 
00266   // For advances beyond just one...
00267   MomNavigator* mtmp = 0;
00268   bool dodel = false;
00269   if (m==0) { mtmp = new MomNavigator; dodel = true; }
00270   else      { mtmp = m; }
00271 
00272   DDS::EMessageType rtn = fDDSClient->Next(mtmp, fTimeOut, n-1);
00273 
00274   if (dodel) { delete mtmp; mtmp = 0; }
00275 
00276   if (rtn != DDS::kOk) {
00277     MSG("Io",Msg::kWarning) << "DDSClient::Next returned error message: "
00278                             << DDS::AsString(rtn) << "." << endl;
00279     return 0;
00280   }
00281 
00282   return n;
00283 }
00284 
00285 //......................................................................
00286 
00287 int IoDDSStreamItr::Decrement(int n, MomNavigator* /* mom */)
00288 {
00289   MSG("Io",Msg::kWarning) << 
00290     "DDS Stream does not support Decrement(" << n << ").\n";
00291   return 0;
00292 }
00293 
00294 //......................................................................
00295 
00296 JobCResult IoDDSStreamItr::GoTo(const VldContext& vld, MomNavigator* /* mom */)
00297 {
00298   MSG("Io",Msg::kWarning) << 
00299     "DDS Stream does not support GoTo(" << vld << ").\n";
00300   return JobCResult::kWarning;
00301 }
00302 
00303 //......................................................................
00304 
00305 int IoDDSStreamItr::GoToEOF() 
00306 {
00307   MSG("Io",Msg::kWarning) << 
00308     "DDS Stream does not support GoToEOF().\n";
00309   return 0;
00310 }
00311 
00312 //......................................................................
00313 
00314 int IoDDSStreamItr::SetMaxSyncDelay(unsigned int maxSyncDelay) 
00315 { 
00316   // Subscription is modified with the new max sync delay but is not sent
00317   // until the IoDDSStreamItr::Subscribe method is invoked.
00318   int isOpen = 0;
00319   if ( this -> IsValid() ) {
00320     fDDSClient -> GetSubscription() -> SetMaxSyncDelay(maxSyncDelay);
00321     isOpen = 1;
00322   }
00323 
00324   fModified = true;
00325   return isOpen;
00326 
00327 }
00328 
00329 
00330 //......................................................................
00331 
00332 int IoDDSStreamItr::SetOffLine(bool offLine) 
00333 { 
00334   // Subscription is modified with the new offline status but is not sent
00335   // until the IoDDSStreamItr::Subscribe method is invoked.
00336   int isOpen = 0;
00337   if ( this -> IsValid() ) {
00338     fDDSClient -> GetSubscription() -> SetOffLine(offLine);
00339     isOpen = 1;
00340   }
00341 
00342   fModified = true;
00343   return isOpen;
00344 
00345 }
00346 
00347 //......................................................................
00348 
00349 int IoDDSStreamItr::SetDataSource(unsigned int dataSource) 
00350 { 
00351   // Subscription is modified with the new data source but is not sent
00352   // until the IoDDSStreamItr::Subscribe method is invoked.
00353   int isOpen = 0;
00354   if ( this -> IsValid() ) {
00355     DDS::EDataSource sourceEnum = static_cast<DDS::EDataSource>(dataSource);
00356     fDDSClient -> GetSubscription() -> SetDataSource(sourceEnum);
00357     isOpen = 1;
00358   }
00359 
00360   fModified = true;
00361   return isOpen;
00362 
00363 }
00364 
00365 //......................................................................
00366 
00367 int IoDDSStreamItr::SetKeepUpMode(unsigned int keepUpMode) 
00368 { 
00369   // Subscription is modified with the new keep up mode but is not sent
00370   // until the IoDDSStreamItr::Subscribe method is invoked.
00371   int isOpen = 0;
00372   if ( this -> IsValid() ) {
00373     DDS::EKeepUpMode keepUpEnum = static_cast<DDS::EKeepUpMode>(keepUpMode);
00374     fDDSClient -> GetSubscription() -> SetKeepUpMode(keepUpEnum);
00375     isOpen = 1;
00376   }
00377 
00378   fModified = true;
00379   return isOpen;
00380 
00381 }
00382 
00383 //......................................................................
00384 
00385 void IoDDSStreamItr::SetTimeOut(unsigned int timeout) { fTimeOut = timeout; }
00386 
00387 //......................................................................
00388 
00389 void IoDDSStreamItr::SetClientType(DDS::EClientType clienttype) 
00390                                                 { fClientType = clienttype; }
00391 
00392 //......................................................................
00393 
00394 void IoDDSStreamItr::SetClientName(string clientname) 
00395                                                 { fClientName = clientname; }
00396 
00397 //......................................................................
00398 
00399 int IoDDSStreamItr::Streams(const char* streamList) 
00400 { 
00401   // Subscription is modified with the new streamList but is not sent
00402   // until the IoDDSStreamItr::Subscribe method is invoked.
00403   Int_t numStream = 0;
00404   if ( this->IsValid() ) {
00405     numStream = fDDSClient -> GetSubscription() -> SetStreams(streamList);
00406   }
00407 
00408   fModified = true;
00409   return numStream;
00410 
00411 }
00412 
00413 //......................................................................
00414 
00415 int IoDDSStreamItr::Select(const char* stream, const char* selection,
00416                            bool /* isRequired */) 
00417 { 
00418   // Subscription is modified with the new selection cut but is not sent
00419   // until the IoDDSStreamItr::Subscribe method is invoked.
00420   int isOpen = 0;
00421   if ( this->IsValid() ) {
00422     fDDSClient -> GetSubscription() -> SetSelection(stream,selection);
00423     isOpen = 1;
00424   }
00425   fModified = true;
00426 
00427   return isOpen;
00428 
00429 }
00430 
00432 
00433 
00434 
00435 
00436 
00437 
00438 
00439 

Generated on Mon Feb 15 11:06:48 2010 for loon by  doxygen 1.3.9.1