#include <DDSClient.h>
Public Member Functions | |
| DDSClient (const char *serverhost, UInt_t serverport=0, DDS::EMessageType initmsg=DDS::kData, DDS::EClientType type=DDS::kUnknownClientType, std::string clientname="") | |
| virtual | ~DDSClient () |
| DDS::EMessageType | GetStatus () const |
| const DDSPSStatus * | GetPSStatus () const |
| const DDSClientId * | GetClientId () const |
| DDSSubscription * | GetSubscription () const |
| bool | IsValid () const |
| std::ostream & | Print (std::ostream &ms) const |
| DDS::EMessageType | GoToFile (std::string filename, std::string &returnedfilename) |
| DDS::EMessageType | GoToNextFile (std::string &returnedfilename) |
| DDS::EMessageType | Next (MomNavigator *&mom, UInt_t waittime=120, UInt_t advanceby=1) |
| DDS::EMessageType | Shutdown () |
| DDS::EMessageType | Subscribe () |
Private Member Functions | |
| DDS::EMessageType | SendMessage () |
Private Attributes | |
| DDS::EMessageType | fStatus |
| TSocket * | fTSocket |
| DDSSubscription * | fSubscription |
| TMessage * | fMessageIn |
| TMessage | fMessageOut |
| TInetAddress | fClientAddress |
| TInetAddress | fServerAddress |
| DDSClientId * | fClientId |
| DDSPSStatus * | fPSStatus |
|
||||||||||||||||||||||||
|
Definition at line 40 of file DDSClient.cxx. References DDS::AsString(), MsgService::Instance(), and MSG. 00042 : 00043 fStatus(DDS::kSocketError), fTSocket(0), 00044 fSubscription(0), fMessageIn(0), fClientId(0), 00045 fPSStatus(0) { 00046 // Purpose: Default constructor for DDSClient class. This constructor 00047 // creates a full-duplex communication TSocket attached to 00048 // the serverport at serverhost. If serverport = 0 (default), 00049 // the socket will be connected to the named service "MinosDDS" 00050 // instead. 00051 // 00052 // Argument: serverhost host name of dispatcher parent server 00053 // (e.g. "kasahara.hep.umn.edu"). 00054 // serverport port number of dispatcher parent server. If 00055 // serverport = 0 (default), socket will attempt to 00056 // connect to named service "MinosDDS" instead. 00057 // msgrequest initial service request message sent to parent 00058 // server. The services which the client can 00059 // currently request from the parent server are: 00060 // DDS::kData == Request for data (default). 00061 // DDS::kShutdown == Request to shutdown parent server. 00062 // DDS::kStatus = Request for ddsparentserver status. 00063 // (Note that to request shutdown of the parent server, 00064 // the client must be local to the parent or the 00065 // service will be denied.) 00066 // 00067 // Return: n/a. 00068 // 00069 // Contact: S. Kasahara 00070 // 00071 // Notes: Use IsValid() to check if the DDSClient was created 00072 // successfully. 00073 // 00074 00075 // Customize message service to print time stamp for all log levels 00076 // for DDS stream 00077 MsgStream* msdds = MsgService::Instance() -> GetStream("DDS"); 00078 for ( int ilvl = Msg::kMinLogLevel ; ilvl < Msg::kNLogLevel; ilvl++ ) { 00079 msdds -> AddFormat(ilvl,Msg::kTime); 00080 } 00081 00082 // Connect to dispatcher parent server 00083 if ( !serverport ) { 00084 // User has not specified an alternative port for the parent server. Will 00085 // attempt to connect to the port assigned to the named service "MinosDDS". 00086 fTSocket = new TSocket(serverhost,"MinosDDS"); 00087 } 00088 else { 00089 fTSocket = new TSocket(serverhost,serverport); 00090 } 00091 00092 if ( !fTSocket || !fTSocket -> IsValid() ) { 00093 // An error occured during the creation of the TSocket 00094 if ( serverport ) { 00095 MSG("DDS",Msg::kWarning) 00096 << "Unable to create socket connected to server host\n" 00097 << serverhost << " and port " << serverport << "." << endl; 00098 } 00099 else { 00100 MSG("DDS",Msg::kWarning) 00101 << "Unable to create socket connected to server host\n" 00102 << serverhost << " and named service MinosDDS." << endl; 00103 } 00104 if (fTSocket) delete fTSocket; fTSocket = 0; 00105 } 00106 00107 else { 00108 00109 // Store the addresses of the client and server 00110 fClientAddress = fTSocket -> GetLocalInetAddress(); 00111 fServerAddress = fTSocket -> GetInetAddress(); 00112 00113 // Construct Client identity information. This is sent to ddsparentserver 00114 fClientId = new DDSClientId(clienttype,clientname); 00115 00116 // Send initiating message to parent server. Client is only allowed 00117 // one message exchange with parent server. 00118 fMessageOut.Reset(msgrequest | kMESS_ACK); 00119 fMessageOut.WriteObject(fClientId); 00120 00121 DDS::EMessageType msgrc = SendMessage(); 00122 fStatus = msgrc; 00123 00124 if ( msgrc != DDS::kOk ) { 00125 MSG("DDS",Msg::kWarning) 00126 << "The error return message " << DDS::AsString(msgrc) 00127 << "\nwas received from the dispatcher parent server in response to\n" 00128 << "the client's request for " << DDS::AsString(msgrequest) << "." 00129 << endl; 00130 if (fTSocket) delete fTSocket; fTSocket = 0; 00131 } 00132 if (msgrequest == DDS::kShutdown || msgrequest == DDS::kStatus ) { 00133 // Socket is no longer valid if shutdown of parent server was requested. 00134 if ( fTSocket ) delete fTSocket; fTSocket = 0; 00135 if ( msgrequest == DDS::kStatus ) { 00136 fPSStatus = dynamic_cast<DDSPSStatus*> 00137 (fMessageIn -> ReadObject(fMessageIn->GetClass())); 00138 if ( !fPSStatus ) { 00139 MSG("DDS",Msg::kWarning) 00140 << "Request for parentserver report status failed to receive " 00141 << "\nDDSPSStatus object." << endl; 00142 } 00143 } 00144 } 00145 } 00146 00147 if ( IsValid() ) { 00148 // If socket has been created successfully, create default subscription 00149 fSubscription = new DDSSubscription(); 00150 } 00151 00152 }
|
|
|
Definition at line 155 of file DDSClient.cxx. References fClientId, fMessageIn, fPSStatus, fSubscription, IsValid(), and Shutdown(). 00155 {
00156 // Purpose: Destructor.
00157 //
00158 // Argument: n/a.
00159 //
00160 // Return: n/a.
00161 //
00162 // Contact: S. Kasahara
00163 //
00164
00165 if ( IsValid() ) Shutdown();
00166 if ( fSubscription ) delete fSubscription; fSubscription = 0;
00167 if ( fMessageIn ) delete fMessageIn; fMessageIn = 0;
00168 if ( fClientId ) delete fClientId; fClientId = 0;
00169 if ( fPSStatus ) delete fPSStatus; fPSStatus = 0;
00170
00171 }
|
|
|
Definition at line 44 of file DDSClient.h. 00044 { return fClientId; }
|
|
|
Definition at line 43 of file DDSClient.h. Referenced by main(). 00043 { return fPSStatus; }
|
|
|
Definition at line 42 of file DDSClient.h. Referenced by main(). 00042 { return fStatus; }
|
|
|
Definition at line 45 of file DDSClient.h. Referenced by ReadDispatcherModule::ConnectToServer(), and main(). 00045 { return fSubscription; }
|
|
||||||||||||
|
Definition at line 173 of file DDSClient.cxx. References fMessageOut, IsValid(), MSG, and SendMessage(). Referenced by IoDDSStreamItr::GoToFile(), and GoToNextFile(). 00174 {
00175 //
00176 // Purpose: Advance to serving data from file filename.
00177 //
00178 // Arguments:filename filename of interest, e.g. F00001782_0000.mdaq.root.
00179 // Specifying a relative path means that the path
00180 // will be assumed relative to the data source
00181 // directory defined on the server.
00182 // filename may be a symbolic link file, e.g.
00183 // 'currentfile' will advance to the file pointed
00184 // to by 'currentfile' in the data source directory.
00185 //
00186 // returnedfilename filename actually advanced to on the server
00187 // side.
00188 //
00189 // Return: DDS::kOk if returnedfilename is not empty, else errorcode.
00190 //
00191 // Contact: S. Kasahara
00192 //
00193 // Notes: If file is not found, the server will be set to serve
00194 // data from the first file alphanumerically beyond the requested
00195 // filename.
00196 // Special arguments: "next" (advance to next file)
00197 //
00198
00199 DDS::EMessageType msgrc; // return status of service request
00200 char newfilename[512];
00201
00202 if ( this -> IsValid() ) {
00203 // reset message buffer and tell it type of msg
00204 fMessageOut.Reset(DDS::kGoToFile | kMESS_ACK);
00205 fMessageOut << filename.c_str(); // load filename into message buffer
00206
00207 // Send fMessageOut to childserver
00208 msgrc = SendMessage();
00209 if ( msgrc == DDS::kOk ) {
00210 // fMessageIn should contain newfilename string
00211 (*fMessageIn) >> newfilename;
00212 }
00213 }
00214 else {
00215 MSG("DDS",Msg::kWarning)
00216 << "GoToFile requested with invalid socket connection." << endl;
00217 msgrc = DDS::kSocketError;
00218 }
00219 returnedfilename = newfilename;
00220 return msgrc;
00221
00222 }
|
|
|
Definition at line 224 of file DDSClient.cxx. References GoToFile(). Referenced by IoDDSStreamItr::NextFile(). 00224 {
00225 //
00226 // Purpose: Advance to serving data from next file in directory.
00227 // This request only makes sense when user has subscribed in
00228 // DDS::kAll mode.
00229 //
00230 // Arguments: returnedfilename. Name of file actually advanced to.
00231 //
00232 // Return: DDS::kOk if returnedfilename is not empty, else errorcode.
00233 //
00234 // Contact: S. Kasahara
00235 //
00236
00237 return this -> GoToFile("next",returnedfilename);
00238
00239 }
|
|
|
Definition at line 46 of file DDSClient.h. Referenced by GoToFile(), IoDDSStreamItr::InitDDSClient(), main(), Next(), Print(), Shutdown(), Subscribe(), and ~DDSClient(). 00046 { return (fTSocket != (TSocket*)0) ? true : false; }
|
|
||||||||||||||||
|
Definition at line 241 of file DDSClient.cxx. References fMessageIn, fMessageOut, Registry::GetCharString(), Registry::GetInt(), RecMinos::GetTempTags(), IsValid(), MSG, SendMessage(), and Registry::Set(). Referenced by IoDDSStreamItr::Increment(), ReadDispatcherModule::IsNewEventReady(), and IoDDSStreamItr::LoadRecords(). 00242 {
00243 //
00244 // Purpose: Request and receive next available entry satisfying client's
00245 // subscription from child server.
00246 //
00247 // Arguments:mom pointer to MomNavigator object to be filled with
00248 // retrieved MomNavigator from dispatcher server.
00249 // See Notes.
00250 // waittime time(sec) that client wishes to wait for next
00251 // available entry satisfying subscription before
00252 // timing out (default = 120).
00253 // advanceby number of record sets to advance by through
00254 // stream(s) (default = 1).
00255 //
00256 // Return: DDS::EMessageType return status of service request.
00257 //
00258 // Contact: S. Kasahara
00259 //
00260 // Notes: For historic reasons, the user can specify the input
00261 // MomNavigator* mom pointer in one of two ways:
00262 // 1)mom == 0. In this case, a new MomNavigator is created and mom
00263 // is set to point to it.
00264 // 2)mom == a pointer to an existing MomNavigator object. In this
00265 // case, objects retrieved by the dispatcher are added to
00266 // the existing mom object.
00267 // In both cases, the user owns and must delete the MomNavigator
00268 // object pointed to by mom.
00269 //
00270
00271 DDS::EMessageType msgrc; // return status of service request
00272 // Local copy of Mom used to receive collection of objects from childserver
00273 MomNavigator* localMom = 0;
00274
00275 if ( IsValid() ) {
00276 // reset message buffer and tell it type of msg
00277 fMessageOut.Reset(DDS::kNext | kMESS_ACK);
00278 fMessageOut << waittime; // load requested waittime into message buffer
00279 fMessageOut << advanceby; // load requested advanceby interval
00280 // Send fMessageOut to childserver
00281 msgrc = SendMessage();
00282 if ( msgrc == DDS::kOk ) {
00283 // fMessageIn should contain object of type MomNavigator
00284 localMom = dynamic_cast<MomNavigator*>
00285 (fMessageIn -> ReadObject(fMessageIn->GetClass()));
00286 if (!localMom) {
00287 MSG("DDS",Msg::kWarning)
00288 << "Next failed to receive expected MomNavigator object." << endl;
00289 }
00290 else {
00291 // if user hasn't supplied a mom pointer, initialize one here
00292 if (!mom) mom = new MomNavigator(); // this is dumb, but historic
00293 // Loop over Mom fragments to repack fTempTags object in record
00294 // that otherwise wouldn't survive i/o. This is ugly.
00295 // Add received records to users Mom
00296 TObjArray* objArray =const_cast<TObjArray*>
00297 (localMom -> GetFragmentArray());
00298 Int_t nrecord = objArray -> GetEntriesFast();
00299 for ( Int_t irec=0; irec < nrecord/2; irec++ ) {
00300 //record is removed from localMom to pass to usr's mom for ownership
00301 RecMinos* record = dynamic_cast<RecMinos*>(objArray->RemoveAt(irec));
00302 Registry* temptags=dynamic_cast<Registry*>
00303 (objArray->At(irec+nrecord/2));
00304 record->GetTempTags().Set("stream",temptags->GetCharString("stream"));
00305 record->GetTempTags().Set("tree",temptags->GetCharString("tree"));
00306 record->GetTempTags().Set("file",temptags->GetCharString("file"));
00307 record->GetTempTags().Set("index",temptags->GetInt("index"));
00308 record->GetTempTags().SetName(temptags->GetName());
00309 mom -> AdoptFragment(record); // new owner of record
00310 }
00311 delete localMom; localMom=0; // clears array and deletes temptags
00312 }
00313 }
00314 }
00315 else {
00316 MSG("DDS",Msg::kWarning)
00317 << "Next requested with invalid socket connection." << endl;
00318 msgrc = DDS::kSocketError;
00319 }
00320
00321 return msgrc;
00322
00323 }
|
|
|
Definition at line 325 of file DDSClient.cxx. References fClientAddress, fServerAddress, and IsValid(). Referenced by operator<<(). 00325 {
00326 // Purpose: Print DDSClient status on std::ostream.
00327 //
00328 // Argument: ms std::ostream to print on.
00329 //
00330 // Return: std::ostream reference.
00331 //
00332 // Contact: S. Kasahara
00333 //
00334
00335 if ( IsValid() ) {
00336 ms << "Client socket connected from local address:" << endl;
00337 // Print the local host name, address, and port number
00338 ms << fClientAddress.GetHostName() << "/"<< fClientAddress.GetHostAddress()
00339 << "(port " << fClientAddress.GetPort() << ")." << endl;
00340 ms << "to a child server at remote address:" << endl;
00341 // Print the child server's host name, address, and port number
00342 ms << fServerAddress.GetHostName() << "/"<< fServerAddress.GetHostAddress()
00343 << "(port " << fServerAddress.GetPort() << ")." << endl;
00344 }
00345 else {
00346 ms << "Client to child server socket is not connected." << endl;
00347 }
00348
00349 return ms;
00350
00351 }
|
|
|
Definition at line 353 of file DDSClient.cxx. References fMessageIn, fMessageOut, fTSocket, and MSG. Referenced by GoToFile(), Next(), Shutdown(), and Subscribe(). 00353 {
00354 //
00355 // Purpose: Send message previously loaded in fMessageOut to child server.
00356 //
00357 // Argument: none.
00358 //
00359 // Return: msgrc of type DDS::EMessageType received from server. Complete
00360 // message received from server is stored in fMessageIn.
00361 //
00362 // Contact: S. Kasahara
00363 //
00364
00365 DDS::EMessageType msgrc = DDS::kSocketError; // default return code
00366 // Delete previously retrieved message if any
00367 if (fMessageIn) delete fMessageIn; fMessageIn = 0;
00368
00369 // Send requested message to server
00370 Int_t rc = fTSocket -> Send(fMessageOut);
00371 if (rc < 0) {
00372 MSG("DDS",Msg::kWarning) << "\nReceived socket error code "
00373 << rc << " in send of message to server." << endl;
00374 // if ( errno == EPIPE ) {
00375 // Broken pipe between child server and client is fatal
00376 MSG("DDS",Msg::kWarning)
00377 << "\nSocket will be shutdown." << endl;
00378 if ( fTSocket ) delete fTSocket; fTSocket = 0;
00379 // }
00380 }
00381 else {
00382 // Wait for return message from user
00383 Int_t rc = fTSocket -> Recv(fMessageIn);
00384 if (rc < 0) {
00385 MSG("DDS",Msg::kWarning) << "\nReceived socket error code "
00386 << rc << " in send of message to server." << endl;
00387 //if ( errno == EPIPE ) {
00388 // Broken pipe between child server and client is fatal
00389 MSG("DDS",Msg::kWarning)
00390 << "\nSocket will be shutdown." << endl;
00391 if ( fTSocket ) delete fTSocket; fTSocket = 0;
00392 // }
00393 }
00394 else {
00395 if ( ! fMessageIn ) {
00396 MSG("DDS",Msg::kWarning)
00397 << "Received empty fMessageIn from fTSocket->Recv()"
00398 << endl;
00399 }
00400 else
00401 msgrc = static_cast< DDS::EMessageType > (fMessageIn -> What());
00402 }
00403 }
00404
00405 return msgrc;
00406
00407 }
|
|
|
Definition at line 410 of file DDSClient.cxx. References fMessageOut, fTSocket, IsValid(), MSG, and SendMessage(). Referenced by ReadDispatcherModule::IsNewEventReady(), and ~DDSClient(). 00410 {
00411 //
00412 // Purpose: Send child server the shutdown message.
00413 //
00414 // Argument: none.
00415 //
00416 // Return: DDS::EMessageType return status of service request
00417 //
00418 // Contact: S. Kasahara
00419 //
00420
00421 DDS::EMessageType msgrc; // return status of service request
00422
00423 if ( IsValid() ) {
00424 // reset message buffer & tell it msg type
00425 fMessageOut.Reset(DDS::kShutdown | kMESS_ACK);
00426 // Send fMessageOut to childserver
00427 msgrc = SendMessage();
00428 }
00429 else {
00430 MSG("DDS",Msg::kWarning)
00431 << "Shutdown requested with invalid socket connection." << endl;
00432 msgrc = DDS::kSocketError;
00433 }
00434
00435 if (fTSocket) delete fTSocket; fTSocket = 0;
00436
00437 return msgrc;
00438
00439 }
|
|
|
Definition at line 441 of file DDSClient.cxx. References fMessageOut, fSubscription, IsValid(), MSG, and SendMessage(). Referenced by ReadDispatcherModule::ConnectToServer(), main(), and IoDDSStreamItr::Subscribe(). 00441 {
00442 //
00443 // Purpose: Submit subscription object to the dispatcher child server.
00444 //
00445 // Argument: none.
00446 //
00447 // Return: DDS::EMessageType return status of service request
00448 //
00449 // Contact: S. Kasahara
00450 //
00451 // Notes: The subscription object of type DDSSubscription can be retrieved
00452 // and modified by the client via the DDSClient::GetSubscription()
00453 // method. There is no limit to the number of times that the
00454 // subscription object can be modified and resubmitted to the server.
00455 //
00456
00457 DDS::EMessageType msgrc; // return status of service request
00458
00459 if ( IsValid() ) {
00460 // reset buffer & tell it msg type
00461 fMessageOut.Reset(DDS::kSubscribe | kMESS_ACK);
00462 fMessageOut.WriteObject(fSubscription);
00463 msgrc = SendMessage(); // Submit subscription to server
00464 }
00465 else {
00466 MSG("DDS",Msg::kWarning)
00467 << "Subscribe requested with invalid socket connection." << endl;
00468 msgrc = DDS::kSocketError;
00469 }
00470
00471 return msgrc;
00472
00473 }
|
|
|
Definition at line 68 of file DDSClient.h. Referenced by Print(). |
|
|
Definition at line 70 of file DDSClient.h. Referenced by ~DDSClient(). |
|
|
Definition at line 66 of file DDSClient.h. Referenced by Next(), SendMessage(), and ~DDSClient(). |
|
|
Definition at line 67 of file DDSClient.h. Referenced by GoToFile(), Next(), SendMessage(), Shutdown(), and Subscribe(). |
|
|
Definition at line 71 of file DDSClient.h. Referenced by ~DDSClient(). |
|
|
Definition at line 69 of file DDSClient.h. Referenced by Print(). |
|
|
Definition at line 63 of file DDSClient.h. |
|
|
Definition at line 65 of file DDSClient.h. Referenced by Subscribe(), and ~DDSClient(). |
|
|
Definition at line 64 of file DDSClient.h. Referenced by SendMessage(), and Shutdown(). |
1.3.9.1