Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

DDSClient.cxx

Go to the documentation of this file.
00001 
00002 //                                                                           //
00003 // DDSClient                                                                 //
00004 //                                                                           //
00005 // Package: DDS (Data Dispatcher System).                                    //
00006 //                                                                           //
00007 // S. Kasahara 5/2001                                                        //
00008 //                                                                           //
00009 // Purpose: This class is the interface to be used by the client to receive  //
00010 //          near-online data from the data dispatcher server.                //
00011 //                                                                           //
00013 
00014 #include <errno.h>
00015 #include "TClass.h"
00016 #include "TSocket.h"  
00017 #include "MessageService/MsgService.h"
00018 #include "MinosObjectMap/MomNavigator.h"
00019 
00020 #include "Dispatcher/DDSClientId.h"
00021 #include "Dispatcher/DDSClient.h"
00022 #include "Dispatcher/DDSSubscription.h"
00023 #include "Dispatcher/DDSPSStatus.h"
00024 
00025 #include "Record/RecMinos.h" // for repacking temptags
00026 
00027 std::ostream& operator << (std::ostream& ms, DDSClient* cs) { 
00028                                              return cs->Print(ms); } 
00029 
00030 ClassImp(DDSClient)
00031 
00032 // Definition of static data members
00033 // *********************************
00034 
00035 CVSID("$Id: DDSClient.cxx,v 1.16 2007/08/24 05:23:48 schubert Exp $");
00036  
00037 // Definition of methods (alphabetical order)
00038 // ******************************************
00039 
00040 DDSClient::DDSClient(const char* serverhost, UInt_t serverport, 
00041                      DDS::EMessageType msgrequest, 
00042                      DDS::EClientType clienttype, string clientname) : 
00043                      fStatus(DDS::kSocketError), fTSocket(0), 
00044                      fSubscription(0), fMessageIn(0), fClientId(0),
00045                      fPSStatus(0) {
00046   // Purpose: Default constructor for DDSClient class.  This constructor 
00047   //          creates a full-duplex communication TSocket attached to 
00048   //          the serverport at serverhost.  If serverport = 0 (default),
00049   //          the socket will be connected to the named service "MinosDDS"
00050   //          instead.
00051   //  
00052   // Argument: serverhost  host name of dispatcher parent server
00053   //                       (e.g. "kasahara.hep.umn.edu").
00054   //           serverport  port number of dispatcher parent server.  If
00055   //                       serverport = 0 (default), socket will attempt to
00056   //                       connect to named service "MinosDDS" instead.
00057   //           msgrequest  initial service request message sent to parent 
00058   //                       server.  The services which the client can
00059   //                       currently request from the parent server are:
00060   //                       DDS::kData == Request for data (default).
00061   //                       DDS::kShutdown == Request to shutdown parent server.
00062   //                       DDS::kStatus = Request for ddsparentserver status.
00063   //                       (Note that to request shutdown of the parent server,
00064   //                        the client must be local to the parent or the 
00065   //                        service will be denied.)
00066   //
00067   // Return: n/a.
00068   //
00069   // Contact: S. Kasahara
00070   //
00071   // Notes: Use IsValid() to check if the DDSClient was created 
00072   //        successfully.
00073   //  
00074   
00075   // Customize message service to print time stamp for all log levels
00076   // for DDS stream
00077   MsgStream* msdds = MsgService::Instance() -> GetStream("DDS");
00078   for ( int ilvl = Msg::kMinLogLevel ; ilvl < Msg::kNLogLevel; ilvl++ ) {
00079     msdds -> AddFormat(ilvl,Msg::kTime);
00080   }
00081   
00082   // Connect to dispatcher parent server
00083   if ( !serverport ) {
00084     // User has not specified an alternative port for the parent server.  Will
00085     // attempt to connect to the port assigned to the named service "MinosDDS".
00086     fTSocket = new TSocket(serverhost,"MinosDDS");
00087   }
00088   else {
00089     fTSocket = new TSocket(serverhost,serverport);   
00090   }
00091 
00092   if ( !fTSocket || !fTSocket -> IsValid() ) {
00093     // An error occured during the creation of the TSocket
00094     if ( serverport ) {
00095       MSG("DDS",Msg::kWarning) 
00096         << "Unable to create socket connected to server host\n"
00097         << serverhost << " and port " << serverport << "." << endl;
00098     }
00099     else {
00100       MSG("DDS",Msg::kWarning)
00101         << "Unable to create socket connected to server host\n"
00102         << serverhost << " and named service MinosDDS." << endl;
00103     }
00104     if (fTSocket) delete fTSocket; fTSocket = 0;
00105   }
00106 
00107   else {
00108 
00109     // Store the addresses of the client and server
00110     fClientAddress = fTSocket -> GetLocalInetAddress();
00111     fServerAddress = fTSocket -> GetInetAddress();
00112 
00113     // Construct Client identity information. This is sent to ddsparentserver
00114     fClientId = new DDSClientId(clienttype,clientname);
00115     
00116     // Send initiating message to parent server. Client is only allowed
00117     // one message exchange with parent server.
00118     fMessageOut.Reset(msgrequest | kMESS_ACK);
00119     fMessageOut.WriteObject(fClientId); 
00120 
00121     DDS::EMessageType msgrc = SendMessage();
00122     fStatus = msgrc;
00123     
00124     if ( msgrc != DDS::kOk ) {
00125       MSG("DDS",Msg::kWarning)
00126         << "The error return message " << DDS::AsString(msgrc) 
00127         << "\nwas received from the dispatcher parent server in response to\n"
00128         << "the client's request for " << DDS::AsString(msgrequest) << "." 
00129         << endl;
00130       if (fTSocket) delete fTSocket; fTSocket = 0;
00131     }
00132     if (msgrequest == DDS::kShutdown || msgrequest == DDS::kStatus ) {
00133       // Socket is no longer valid if shutdown of parent server was requested.
00134       if ( fTSocket ) delete fTSocket; fTSocket = 0;
00135       if ( msgrequest == DDS::kStatus ) {
00136         fPSStatus = dynamic_cast<DDSPSStatus*>
00137             (fMessageIn -> ReadObject(fMessageIn->GetClass()));
00138         if ( !fPSStatus ) {
00139           MSG("DDS",Msg::kWarning) 
00140             << "Request for parentserver report status failed to receive "
00141             << "\nDDSPSStatus object." << endl;
00142         }
00143       }
00144     }
00145   }
00146 
00147   if ( IsValid() ) {
00148     // If socket has been created successfully, create default subscription
00149     fSubscription = new DDSSubscription();
00150   }
00151   
00152 }
00153   
00154 
00155 DDSClient::~DDSClient() {
00156   // Purpose: Destructor.
00157   //
00158   // Argument: n/a.
00159   //
00160   // Return: n/a.
00161   //
00162   // Contact: S. Kasahara
00163   //
00164 
00165   if ( IsValid() ) Shutdown();
00166   if ( fSubscription ) delete fSubscription; fSubscription = 0;
00167   if ( fMessageIn ) delete fMessageIn; fMessageIn = 0;
00168   if ( fClientId ) delete fClientId; fClientId = 0;
00169   if ( fPSStatus ) delete fPSStatus; fPSStatus = 0;
00170   
00171 }
00172 
00173 DDS::EMessageType DDSClient::GoToFile(std::string filename,
00174                                       std::string& returnedfilename) {
00175   // 
00176   // Purpose: Advance to serving data from file filename.
00177   //
00178   // Arguments:filename  filename of interest, e.g. F00001782_0000.mdaq.root.
00179   //                     Specifying a relative path means that the path
00180   //                     will be assumed relative to the data source
00181   //                     directory defined on the server.
00182   //                     filename may be a symbolic link file, e.g.
00183   //                     'currentfile' will advance to the file pointed
00184   //                     to by 'currentfile' in the data source directory.
00185   //
00186   //           returnedfilename  filename actually advanced to on the server 
00187   //                             side.
00188   //
00189   // Return: DDS::kOk if returnedfilename is not empty, else errorcode.
00190   //
00191   // Contact: S. Kasahara
00192   //
00193   // Notes:  If file is not found, the server will be set to serve
00194   //         data from the first file alphanumerically beyond the requested
00195   //         filename.
00196   //         Special arguments: "next" (advance to next file)
00197   //
00198 
00199   DDS::EMessageType msgrc;  // return status of service request
00200   char newfilename[512];
00201 
00202   if ( this -> IsValid() ) {
00203     // reset message buffer and tell it type of msg
00204     fMessageOut.Reset(DDS::kGoToFile | kMESS_ACK);
00205     fMessageOut << filename.c_str();  // load filename into message buffer
00206 
00207     // Send fMessageOut to childserver
00208     msgrc = SendMessage();
00209     if ( msgrc == DDS::kOk ) {
00210       // fMessageIn should contain newfilename string
00211       (*fMessageIn) >> newfilename;
00212     }
00213   }
00214   else {
00215     MSG("DDS",Msg::kWarning) 
00216     << "GoToFile requested with invalid socket connection." << endl;
00217     msgrc = DDS::kSocketError; 
00218   }
00219   returnedfilename = newfilename;
00220   return msgrc;
00221 
00222 }
00223 
00224 DDS::EMessageType DDSClient::GoToNextFile(std::string& returnedfilename) {
00225   // 
00226   // Purpose: Advance to serving data from next file in directory.
00227   //          This request only makes sense when user has subscribed in
00228   //          DDS::kAll mode.
00229   //
00230   // Arguments: returnedfilename.  Name of file actually advanced to.
00231   //
00232   // Return: DDS::kOk if returnedfilename is not empty, else errorcode.
00233   //
00234   // Contact: S. Kasahara
00235   //
00236 
00237   return this -> GoToFile("next",returnedfilename);
00238 
00239 }
00240 
00241 DDS::EMessageType DDSClient::Next(MomNavigator*& mom,UInt_t waittime,
00242                                   UInt_t advanceby) {
00243   // 
00244   // Purpose: Request and receive next available entry satisfying client's
00245   //          subscription from child server.
00246   //
00247   // Arguments:mom       pointer to MomNavigator object to be filled with
00248   //                     retrieved MomNavigator from dispatcher server.
00249   //                     See Notes. 
00250   //           waittime  time(sec) that client wishes to wait for next 
00251   //                     available entry satisfying subscription before 
00252   //                     timing out (default = 120).
00253   //           advanceby number of record sets to advance by through 
00254   //                     stream(s) (default = 1).
00255   //
00256   // Return: DDS::EMessageType  return status of service request.
00257   //
00258   // Contact: S. Kasahara
00259   //
00260   // Notes:  For historic reasons, the user can specify the input 
00261   //         MomNavigator* mom pointer in one of two ways:
00262   //         1)mom == 0.  In this case, a new MomNavigator is created and mom
00263   //                      is set to point to it.  
00264   //         2)mom == a pointer to an existing MomNavigator object.  In this
00265   //                  case, objects retrieved by the dispatcher are added to
00266   //                  the existing mom object.
00267   //         In both cases, the user owns and must delete the MomNavigator
00268   //         object pointed to by mom.
00269   //
00270 
00271   DDS::EMessageType msgrc;  // return status of service request
00272   // Local copy of Mom used to receive collection of objects from childserver
00273   MomNavigator* localMom = 0;  
00274 
00275   if ( IsValid() ) {
00276     // reset message buffer and tell it type of msg
00277     fMessageOut.Reset(DDS::kNext | kMESS_ACK); 
00278     fMessageOut << waittime;  // load requested waittime into message buffer
00279     fMessageOut << advanceby; // load requested advanceby interval
00280     // Send fMessageOut to childserver
00281     msgrc = SendMessage();
00282     if ( msgrc == DDS::kOk ) {
00283       // fMessageIn should contain object of type MomNavigator
00284       localMom = dynamic_cast<MomNavigator*>
00285                  (fMessageIn -> ReadObject(fMessageIn->GetClass()));
00286       if (!localMom) {
00287         MSG("DDS",Msg::kWarning) 
00288         << "Next failed to receive expected MomNavigator object." << endl;
00289       }
00290       else {
00291         // if user hasn't supplied a mom pointer, initialize one here
00292         if (!mom) mom = new MomNavigator(); // this is dumb, but historic
00293         // Loop over Mom fragments to repack fTempTags object in record
00294         // that otherwise wouldn't survive i/o.  This is ugly.
00295         // Add received records to users Mom
00296         TObjArray* objArray =const_cast<TObjArray*>
00297                                (localMom -> GetFragmentArray());
00298         Int_t nrecord = objArray -> GetEntriesFast();
00299         for ( Int_t irec=0; irec < nrecord/2; irec++ ) {
00300           //record is removed from localMom to pass to usr's mom for ownership 
00301           RecMinos* record = dynamic_cast<RecMinos*>(objArray->RemoveAt(irec));
00302           Registry* temptags=dynamic_cast<Registry*>
00303                                (objArray->At(irec+nrecord/2));
00304           record->GetTempTags().Set("stream",temptags->GetCharString("stream"));
00305           record->GetTempTags().Set("tree",temptags->GetCharString("tree"));
00306           record->GetTempTags().Set("file",temptags->GetCharString("file"));
00307           record->GetTempTags().Set("index",temptags->GetInt("index"));
00308           record->GetTempTags().SetName(temptags->GetName());
00309           mom -> AdoptFragment(record); // new owner of record
00310         }
00311         delete localMom; localMom=0; // clears array and deletes temptags
00312       }
00313     } 
00314   }
00315   else {
00316     MSG("DDS",Msg::kWarning) 
00317     << "Next requested with invalid socket connection." << endl;
00318     msgrc = DDS::kSocketError; 
00319   }
00320 
00321   return msgrc;
00322 
00323 }
00324 
00325 std::ostream& DDSClient::Print(std::ostream& ms) const {
00326   // Purpose: Print DDSClient status on std::ostream.
00327   //
00328   // Argument: ms  std::ostream to print on.
00329   //
00330   // Return: std::ostream reference.
00331   //
00332   // Contact: S. Kasahara
00333   //
00334 
00335   if ( IsValid() ) {
00336     ms << "Client socket connected from local address:" << endl;
00337     // Print the local host name, address, and port number
00338     ms << fClientAddress.GetHostName() << "/"<< fClientAddress.GetHostAddress()
00339        << "(port " << fClientAddress.GetPort() << ")." << endl;
00340     ms << "to a child server at remote address:" << endl;
00341     // Print the child server's host name, address, and port number
00342     ms << fServerAddress.GetHostName() << "/"<< fServerAddress.GetHostAddress()
00343        << "(port " << fServerAddress.GetPort() << ")." << endl;
00344   }
00345   else {
00346     ms << "Client to child server socket is not connected." << endl;
00347   }
00348 
00349   return ms;
00350 
00351 }
00352 
00353 DDS::EMessageType DDSClient::SendMessage() {
00354   //
00355   // Purpose: Send message previously loaded in fMessageOut to child server.
00356   //
00357   // Argument: none.
00358   //
00359   // Return: msgrc of type DDS::EMessageType received from server. Complete
00360   //         message received from server is stored in fMessageIn.
00361   //
00362   // Contact: S. Kasahara
00363   //
00364 
00365   DDS::EMessageType msgrc = DDS::kSocketError;  // default return code
00366   // Delete previously retrieved message if any
00367   if (fMessageIn) delete fMessageIn; fMessageIn = 0;
00368 
00369   // Send requested message to server
00370   Int_t rc = fTSocket -> Send(fMessageOut);
00371   if (rc < 0) {
00372     MSG("DDS",Msg::kWarning) << "\nReceived socket error code "
00373                  << rc << " in send of message to server." << endl; 
00374     //  if ( errno == EPIPE ) {
00375       // Broken pipe between child server and client is fatal
00376       MSG("DDS",Msg::kWarning) 
00377       << "\nSocket will be shutdown." << endl;
00378       if ( fTSocket ) delete fTSocket; fTSocket = 0;
00379     // }
00380   }
00381   else {
00382     // Wait for return message from user
00383     Int_t rc = fTSocket -> Recv(fMessageIn);
00384     if (rc < 0) {
00385       MSG("DDS",Msg::kWarning) << "\nReceived socket error code "
00386                  << rc << " in send of message to server." << endl; 
00387       //if ( errno == EPIPE ) {
00388         // Broken pipe between child server and client is fatal
00389         MSG("DDS",Msg::kWarning) 
00390         << "\nSocket will be shutdown." << endl;
00391         if ( fTSocket ) delete fTSocket; fTSocket = 0;
00392       // }
00393     }
00394     else {
00395       if ( ! fMessageIn ) {
00396         MSG("DDS",Msg::kWarning) 
00397           << "Received empty fMessageIn from fTSocket->Recv()"
00398           << endl;
00399       }
00400       else 
00401         msgrc = static_cast< DDS::EMessageType > (fMessageIn -> What());
00402     }
00403   }
00404 
00405   return msgrc;
00406 
00407 }
00408 
00409 
00410 DDS::EMessageType DDSClient::Shutdown() {
00411   //
00412   // Purpose: Send child server the shutdown message.
00413   //
00414   // Argument: none.
00415   //
00416   // Return: DDS::EMessageType  return status of service request
00417   //
00418   // Contact: S. Kasahara
00419   //
00420 
00421   DDS::EMessageType msgrc;  // return status of service request
00422 
00423   if ( IsValid() ) {
00424     // reset message buffer & tell it msg type
00425     fMessageOut.Reset(DDS::kShutdown | kMESS_ACK); 
00426     // Send fMessageOut to childserver
00427     msgrc = SendMessage();
00428   }
00429   else {
00430     MSG("DDS",Msg::kWarning) 
00431     << "Shutdown requested with invalid socket connection." << endl;
00432     msgrc = DDS::kSocketError; 
00433   }
00434 
00435   if (fTSocket) delete fTSocket; fTSocket = 0;
00436 
00437   return msgrc;
00438 
00439 }
00440 
00441 DDS::EMessageType DDSClient::Subscribe() {
00442   //
00443   // Purpose: Submit subscription object to the dispatcher child server.
00444   //
00445   // Argument: none.
00446   //
00447   // Return: DDS::EMessageType  return status of service request
00448   //
00449   // Contact: S. Kasahara
00450   //
00451   // Notes: The subscription object of type DDSSubscription can be retrieved 
00452   //        and modified by the client via the DDSClient::GetSubscription() 
00453   //        method.  There is no limit to the number of times that the
00454   //        subscription object can be modified and resubmitted to the server.
00455   //
00456 
00457   DDS::EMessageType msgrc;  // return status of service request
00458 
00459   if ( IsValid() ) {
00460     // reset buffer & tell it msg type
00461     fMessageOut.Reset(DDS::kSubscribe | kMESS_ACK); 
00462     fMessageOut.WriteObject(fSubscription); 
00463     msgrc = SendMessage(); // Submit subscription to server
00464   }
00465   else {
00466     MSG("DDS",Msg::kWarning) 
00467     << "Subscribe requested with invalid socket connection." << endl;
00468     msgrc = DDS::kSocketError; 
00469   }
00470 
00471   return msgrc;
00472 
00473 }
00474 
00475 
00476 
00477 
00478 
00479 
00480 
00481 

Generated on Mon Feb 15 11:06:36 2010 for loon by  doxygen 1.3.9.1