#include <IoDDSStreamItr.h>
Inheritance diagram for IoDDSStreamItr:

Public Member Functions | |
| IoDDSStreamItr (const char *server, unsigned int port=DDS::kPort, unsigned int nretry=0, unsigned int retrydelay=1, DDS::EClientType=DDS::kUnknownClientType, string clientname="") | |
| ~IoDDSStreamItr () | |
| bool | IsValid () const |
| const char * | GetFormat () const |
| int | LoadRecords (MomNavigator *m) |
| int | Increment (int n=1, MomNavigator *m=0) |
| int | Decrement (int n=1, MomNavigator *m=0) |
| JobCResult | GoTo (const VldContext &vld, MomNavigator *m=0) |
| int | GoToEOF () |
| int | Streams (const char *streamlist) |
| int | Select (const char *stream, const char *selection, bool isRequired=false) |
| void | SetTimeOut (unsigned int seconds) |
| void | SetClientType (DDS::EClientType clienttype) |
| void | SetClientName (string clientname) |
| void | AddFile (const char *filename, int at=-1, const char *streamlist="*") |
| const char * | GetCurrentFile (const char *streamlist="*") const |
| JobCResult | GoToFile (const char *filename, const char *streamlist="*") |
| JobCResult | GoToFile (int i, const char *streamlist="*") |
| JobCResult | NextFile (int n=1, const char *streamlist="*") |
| JobCResult | PrevFile (int n=1, const char *streamlist="*") |
| void | RemoveFile (const char *filename="*", const char *streamlist="*") |
| std::ostream & | ListFile (std::ostream &ostream, const char *streamlist="*") const |
| int | SetMaxSyncDelay (unsigned int seconds) |
| int | SetDataSource (unsigned int datasource) |
| int | SetKeepUpMode (unsigned int keepupmode) |
| int | SetOffLine (bool offLine=true) |
| int | Subscribe () |
| int | GetPort () const |
| DDS::EClientType | GetClientType () const |
| string | GetClientName () const |
| bool | IsModified () const |
Private Member Functions | |
| void | InitDDSClient () |
| void | ShutdownDDSClient () |
Private Attributes | |
| unsigned int | fPort |
| unsigned int | fTimeOut |
| unsigned int | fMaxRetry |
| unsigned int | fRetryDelay |
| DDSClient * | fDDSClient |
| bool | fModified |
| DDS::EClientType | fClientType |
| string | fClientName |
|
||||||||||||||||||||||||||||
|
Definition at line 141 of file IoDDSStreamItr.cxx. References InitDDSClient(), port, server, SetClientName(), SetClientType(), and IoDataStreamItr::SetSourceName(). 00144 : 00145 fPort(port),fTimeOut(120),fMaxRetry(nretry),fRetryDelay(delay), 00146 fDDSClient(0),fModified(true),fClientType(clienttype), 00147 fClientName(clientname) 00148 { 00149 this->SetSourceName(server); 00150 this->SetClientType(clienttype); 00151 this->SetClientName(clientname); 00152 this->InitDDSClient(); 00153 }
|
|
|
Definition at line 157 of file IoDDSStreamItr.cxx. References ShutdownDDSClient(). 00158 {
00159 this->ShutdownDDSClient();
00160 }
|
|
||||||||||||||||
|
Reimplemented from IoDataStreamItr. Definition at line 96 of file IoDDSStreamItr.cxx. 00097 {
00098 return;
00099 }
|
|
||||||||||||
|
Implements IoDataStreamItr. Definition at line 287 of file IoDDSStreamItr.cxx. References MSG. 00288 {
00289 MSG("Io",Msg::kWarning) <<
00290 "DDS Stream does not support Decrement(" << n << ").\n";
00291 return 0;
00292 }
|
|
|
Definition at line 67 of file IoDDSStreamItr.h. Referenced by IoInputModule::UpdateDDSConfig(). 00067 { return fClientName; }
|
|
|
Definition at line 66 of file IoDDSStreamItr.h. Referenced by IoInputModule::UpdateDDSConfig(). 00066 { return fClientType; }
|
|
|
Reimplemented from IoDataStreamItr. Definition at line 117 of file IoDDSStreamItr.cxx. References MSG. 00117 {
00118 MSG("Io",Msg::kWarning) << " DDS Stream does not support GetCurrentFile"
00119 << endl;
00120 return "";
00121 }
|
|
|
Implements IoDataStreamItr. Definition at line 33 of file IoDDSStreamItr.h. 00033 { return "dds"; }
|
|
|
Definition at line 65 of file IoDDSStreamItr.h. Referenced by IoInputModule::UpdateDDSConfig(). 00065 { return fPort; }
|
|
||||||||||||
|
Implements IoDataStreamItr. Definition at line 296 of file IoDDSStreamItr.cxx. References MSG. 00297 {
00298 MSG("Io",Msg::kWarning) <<
00299 "DDS Stream does not support GoTo(" << vld << ").\n";
00300 return JobCResult::kWarning;
00301 }
|
|
|
Reimplemented from IoDataStreamItr. Definition at line 305 of file IoDDSStreamItr.cxx. References MSG. 00306 {
00307 MSG("Io",Msg::kWarning) <<
00308 "DDS Stream does not support GoToEOF().\n";
00309 return 0;
00310 }
|
|
||||||||||||
|
Reimplemented from IoDataStreamItr. Definition at line 125 of file IoDDSStreamItr.cxx. References MSG. 00126 {
00127 MSG("Io",Msg::kWarning) << " DDS Stream does not support GoToFile n" << endl;
00128 return JobCResult::kWarning;
00129 }
|
|
||||||||||||
|
Reimplemented from IoDataStreamItr. Definition at line 24 of file IoDDSStreamItr.cxx. References DDS::AsString(), fDDSClient, DDSClient::GoToFile(), IsModified(), IsValid(), MSG, and Subscribe(). 00025 {
00026 //======================================================================
00027 // Go to file filename. filename may be a symbolic link, e.g. "currentfile"
00028 // will advance to the target of the symbolic link "currentfile" in the
00029 // data source directory. If filename is not absolute fullfilepathname,
00030 // its path is assumed relative to the data source directory defined on
00031 // the server. If filename is missing in this directory, will position
00032 // pointer to the next file in the alphanumerically sorted list in the
00033 // data source directory. Returns JobCResult::kAOK or JobCResult::kWarning
00034 // if error return from DDS, or JobCResult::kEndOfInputStream if not
00035 // connected.
00036
00037 if ( !IsValid() ) return JobCResult::kEndOfInputStream;
00038
00039 if ( this -> IsModified() ) { // modified since last subscription
00040 bool isOk = this -> Subscribe();
00041 if ( !isOk ) {
00042 MSG("Io",Msg::kWarning)<<" Subscription submit to server failed."<< endl;
00043 return JobCResult::kWarning;
00044 }
00045 }
00046
00047 std::string returnedfilename;
00048 DDS::EMessageType msgrc = fDDSClient->GoToFile(filename,returnedfilename);
00049 if (msgrc != DDS::kOk) {
00050 MSG("Io",Msg::kWarning) <<
00051 "An error message " << DDS::AsString(msgrc) <<
00052 " was received from DDSClient::GoToFile " << filename << "." << endl;
00053 return JobCResult::kWarning;
00054 }
00055
00056 return JobCResult::kAOK;
00057
00058 }
|
|
||||||||||||
|
Implements IoDataStreamItr. Definition at line 251 of file IoDDSStreamItr.cxx. References DDS::AsString(), fDDSClient, fTimeOut, IsModified(), MSG, DDSClient::Next(), and Subscribe(). 00252 {
00253 //======================================================================
00254 // Advance the position in the stream n spaces.
00255 //=====================================================================
00256 if (n==0) return 0;
00257
00258 if ( this -> IsModified() ) { // modified since last subscription
00259 bool isOk = this -> Subscribe();
00260 if ( !isOk ) return 0;
00261 }
00262
00263 // Advance by n-1 since Get() is implemented by Next()...
00264 if (n==1) return 1;
00265
00266 // For advances beyond just one...
00267 MomNavigator* mtmp = 0;
00268 bool dodel = false;
00269 if (m==0) { mtmp = new MomNavigator; dodel = true; }
00270 else { mtmp = m; }
00271
00272 DDS::EMessageType rtn = fDDSClient->Next(mtmp, fTimeOut, n-1);
00273
00274 if (dodel) { delete mtmp; mtmp = 0; }
00275
00276 if (rtn != DDS::kOk) {
00277 MSG("Io",Msg::kWarning) << "DDSClient::Next returned error message: "
00278 << DDS::AsString(rtn) << "." << endl;
00279 return 0;
00280 }
00281
00282 return n;
00283 }
|
|
|
Definition at line 164 of file IoDDSStreamItr.cxx. References fDDSClient, fMaxRetry, fModified, fPort, fRetryDelay, IoDataStreamItr::GetSourceName(), gSystem(), DDSClient::IsValid(), MSG, and ShutdownDDSClient(). Referenced by IoDDSStreamItr(). 00165 {
00166 //======================================================================
00167 // Set up the data dispatcher client
00168 //======================================================================
00169
00170 unsigned int attempt = fMaxRetry+1;
00171 fDDSClient = 0;
00172 while (!fDDSClient && attempt) {
00173 fDDSClient = new DDSClient(this->GetSourceName(),fPort,DDS::kData,
00174 fClientType,fClientName);
00175 --attempt;
00176
00177 if (fDDSClient==0 || !fDDSClient->IsValid()) {
00178 // Warning here...
00179 MSG("Io",Msg::kError)
00180 << "Unable to create DDSClient to host " << this->GetSourceName()
00181 << " on port " << fPort << "," << endl
00182 << " will attempt " << attempt << " more times." << endl;
00183 this->ShutdownDDSClient();
00184 gSystem->Sleep(1000*fRetryDelay);
00185 }
00186 }
00187 fModified = true;
00188
00189 }
|
|
|
Definition at line 68 of file IoDDSStreamItr.h. Referenced by GoToFile(), Increment(), LoadRecords(), and NextFile(). 00068 { return fModified; }
|
|
|
Implements IoDataStreamItr. Definition at line 32 of file IoDDSStreamItr.h. Referenced by GoToFile(), NextFile(), Select(), SetDataSource(), SetKeepUpMode(), SetMaxSyncDelay(), SetOffLine(), Streams(), and Subscribe(). 00032 { return ( fDDSClient ) ? true : false; }
|
|
||||||||||||
|
Reimplemented from IoDataStreamItr. Definition at line 133 of file IoDDSStreamItr.cxx. 00135 {
00136 return os;
00137 }
|
|
|
Implements IoDataStreamItr. Definition at line 227 of file IoDDSStreamItr.cxx. References DDS::AsString(), fDDSClient, fTimeOut, IsModified(), MSG, DDSClient::Next(), and Subscribe(). 00228 {
00229 //======================================================================
00230 // Load the record at the current position in the stream
00231 //======================================================================
00232
00233 if ( this -> IsModified() ) { // modified since last subscription
00234 bool isOk = this -> Subscribe();
00235 if ( !isOk ) return 0;
00236 }
00237
00238 // DDSClient only has a Next
00239 DDS::EMessageType rtn = fDDSClient->Next(mom, fTimeOut, 1);
00240 if (rtn != DDS::kOk) {
00241 MSG("Io",Msg::kWarning) << "DDSClient::Next returned error message: "
00242 << DDS::AsString(rtn) << "." << endl;
00243 return 0;
00244 }
00245
00246 return 1;
00247 }
|
|
||||||||||||
|
Reimplemented from IoDataStreamItr. Definition at line 62 of file IoDDSStreamItr.cxx. References DDS::AsString(), fDDSClient, DDSClient::GoToNextFile(), IsModified(), IsValid(), MSG, and Subscribe(). 00062 {
00063 //======================================================================
00064 // Advance by n files in data source directory. Returns JobCResult::kAOK
00065 // or JobCResult::kWarning if error returned from DDS, or
00066 // JobCResult::kEndOfInputStream if not connected.
00067 //======================================================================
00068
00069 if ( !IsValid() ) return JobCResult::kEndOfInputStream;
00070
00071 if ( this -> IsModified() ) { // modified since last subscription
00072 bool isOk = this -> Subscribe();
00073 if ( !isOk ) {
00074 MSG("Io",Msg::kWarning)<<" Subscription submit to server failed."<< endl;
00075 return JobCResult::kWarning;
00076 }
00077 }
00078
00079 for ( int i = 0; i < n; i++ ) {
00080 std::string returnedfilename;
00081 DDS::EMessageType msgrc = fDDSClient->GoToNextFile(returnedfilename);
00082 if (msgrc != DDS::kOk) {
00083 MSG("Io",Msg::kWarning) <<
00084 "An error message " << DDS::AsString(msgrc) <<
00085 " was received from DDSClient::GoToNextFile." << endl;
00086 return JobCResult::kWarning;
00087 }
00088 }
00089
00090 return JobCResult::kAOK;
00091
00092 }
|
|
||||||||||||
|
Reimplemented from IoDataStreamItr. Definition at line 110 of file IoDDSStreamItr.cxx. References MSG. 00110 {
00111 MSG("Io",Msg::kWarning) << " DDS Stream does not support PrevFile" << endl;
00112 return JobCResult::kWarning;
00113 }
|
|
||||||||||||
|
Reimplemented from IoDataStreamItr. Definition at line 103 of file IoDDSStreamItr.cxx. 00104 {
00105 return;
00106 }
|
|
||||||||||||||||
|
Reimplemented from IoDataStreamItr. Definition at line 415 of file IoDDSStreamItr.cxx. References fDDSClient, fModified, and IsValid(). 00417 {
00418 // Subscription is modified with the new selection cut but is not sent
00419 // until the IoDDSStreamItr::Subscribe method is invoked.
00420 int isOpen = 0;
00421 if ( this->IsValid() ) {
00422 fDDSClient -> GetSubscription() -> SetSelection(stream,selection);
00423 isOpen = 1;
00424 }
00425 fModified = true;
00426
00427 return isOpen;
00428
00429 }
|
|
|
Definition at line 394 of file IoDDSStreamItr.cxx. References fClientName. Referenced by IoDDSStreamItr(). 00395 { fClientName = clientname; }
|
|
|
Definition at line 389 of file IoDDSStreamItr.cxx. References fClientType. Referenced by IoDDSStreamItr(). 00390 { fClientType = clienttype; }
|
|
|
Definition at line 349 of file IoDDSStreamItr.cxx. References fDDSClient, fModified, and IsValid(). 00350 {
00351 // Subscription is modified with the new data source but is not sent
00352 // until the IoDDSStreamItr::Subscribe method is invoked.
00353 int isOpen = 0;
00354 if ( this -> IsValid() ) {
00355 DDS::EDataSource sourceEnum = static_cast<DDS::EDataSource>(dataSource);
00356 fDDSClient -> GetSubscription() -> SetDataSource(sourceEnum);
00357 isOpen = 1;
00358 }
00359
00360 fModified = true;
00361 return isOpen;
00362
00363 }
|
|
|
Definition at line 367 of file IoDDSStreamItr.cxx. References fDDSClient, fModified, and IsValid(). 00368 {
00369 // Subscription is modified with the new keep up mode but is not sent
00370 // until the IoDDSStreamItr::Subscribe method is invoked.
00371 int isOpen = 0;
00372 if ( this -> IsValid() ) {
00373 DDS::EKeepUpMode keepUpEnum = static_cast<DDS::EKeepUpMode>(keepUpMode);
00374 fDDSClient -> GetSubscription() -> SetKeepUpMode(keepUpEnum);
00375 isOpen = 1;
00376 }
00377
00378 fModified = true;
00379 return isOpen;
00380
00381 }
|
|
|
Definition at line 314 of file IoDDSStreamItr.cxx. References fDDSClient, fModified, and IsValid(). 00315 {
00316 // Subscription is modified with the new max sync delay but is not sent
00317 // until the IoDDSStreamItr::Subscribe method is invoked.
00318 int isOpen = 0;
00319 if ( this -> IsValid() ) {
00320 fDDSClient -> GetSubscription() -> SetMaxSyncDelay(maxSyncDelay);
00321 isOpen = 1;
00322 }
00323
00324 fModified = true;
00325 return isOpen;
00326
00327 }
|
|
|
Definition at line 332 of file IoDDSStreamItr.cxx. References fDDSClient, fModified, and IsValid(). 00333 {
00334 // Subscription is modified with the new offline status but is not sent
00335 // until the IoDDSStreamItr::Subscribe method is invoked.
00336 int isOpen = 0;
00337 if ( this -> IsValid() ) {
00338 fDDSClient -> GetSubscription() -> SetOffLine(offLine);
00339 isOpen = 1;
00340 }
00341
00342 fModified = true;
00343 return isOpen;
00344
00345 }
|
|
|
Reimplemented from IoDataStreamItr. Definition at line 385 of file IoDDSStreamItr.cxx. References fTimeOut. Referenced by IoInputModule::UpdateDDSConfig(). 00385 { fTimeOut = timeout; }
|
|
|
Definition at line 220 of file IoDDSStreamItr.cxx. References fDDSClient. Referenced by InitDDSClient(), Subscribe(), and ~IoDDSStreamItr(). 00221 {
00222 if (fDDSClient) { delete fDDSClient; fDDSClient = 0; }
00223 }
|
|
|
Reimplemented from IoDataStreamItr. Definition at line 399 of file IoDDSStreamItr.cxx. References fDDSClient, fModified, and IsValid(). 00400 {
00401 // Subscription is modified with the new streamList but is not sent
00402 // until the IoDDSStreamItr::Subscribe method is invoked.
00403 Int_t numStream = 0;
00404 if ( this->IsValid() ) {
00405 numStream = fDDSClient -> GetSubscription() -> SetStreams(streamList);
00406 }
00407
00408 fModified = true;
00409 return numStream;
00410
00411 }
|
|
|
Definition at line 192 of file IoDDSStreamItr.cxx. References DDS::AsString(), fDDSClient, fModified, fPort, IoDataStreamItr::GetSourceName(), IsValid(), MSG, ShutdownDDSClient(), and DDSClient::Subscribe(). Referenced by GoToFile(), Increment(), LoadRecords(), and NextFile(). 00193 {
00194 //======================================================================
00195 // Submit subscription to server. Returns 1 if successful, else 0.
00196 //======================================================================
00197 int isOk = 0;
00198 if ( !IsValid() ) return isOk;
00199
00200 DDS::EMessageType msgrc = fDDSClient->Subscribe();
00201 if (msgrc != DDS::kOk) {
00202 MSG("Io",Msg::kError) <<
00203 "An error message " << DDS::AsString(msgrc) <<
00204 " was received from DDS::Subscribe." <<
00205 "( host " << this->GetSourceName() <<
00206 ", port " << fPort << ")." << endl;
00207 this->ShutdownDDSClient();
00208 }
00209 else {
00210 isOk = 1;
00211 fModified = false;
00212 }
00213
00214 return isOk;
00215
00216 }
|
|
|
Definition at line 81 of file IoDDSStreamItr.h. Referenced by SetClientName(). |
|
|
Definition at line 80 of file IoDDSStreamItr.h. Referenced by SetClientType(). |
|
|
Definition at line 78 of file IoDDSStreamItr.h. Referenced by GoToFile(), Increment(), InitDDSClient(), LoadRecords(), NextFile(), Select(), SetDataSource(), SetKeepUpMode(), SetMaxSyncDelay(), SetOffLine(), ShutdownDDSClient(), Streams(), and Subscribe(). |
|
|
Definition at line 76 of file IoDDSStreamItr.h. Referenced by InitDDSClient(). |
|
|
Definition at line 79 of file IoDDSStreamItr.h. Referenced by InitDDSClient(), Select(), SetDataSource(), SetKeepUpMode(), SetMaxSyncDelay(), SetOffLine(), Streams(), and Subscribe(). |
|
|
Definition at line 74 of file IoDDSStreamItr.h. Referenced by InitDDSClient(), and Subscribe(). |
|
|
Definition at line 77 of file IoDDSStreamItr.h. Referenced by InitDDSClient(). |
|
|
Definition at line 75 of file IoDDSStreamItr.h. Referenced by Increment(), LoadRecords(), and SetTimeOut(). |
1.3.9.1