Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

DisplayServer.cc

Go to the documentation of this file.
00001 //-------------------------------------------------------------------------
00002 // File: DisplayServer.cc
00003 //
00004 // The DisplayServer class comprises methods to receive monitoring data from
00005 // the consumers, store them and send them to the HistoDisplay clients.
00006 // The communication between the consumers, the DisplayServer and the
00007 // HistoDisplay is implemented with TSockets.
00008 //
00009 // Author: Wolfgang Wagner 
00010 //
00011 //-------------------------------------------------------------------------
00012 //****************************************************************************
00013 // RCS Current Revision Record
00014 //-----------------------------------------------------------------------------
00015 // $Source: /cvs/minoscvs/rep1/minossoft/CDFMonitoringFwk/DisplayServer.cc,v $
00016 // $Revision: 1.10 $
00017 // $Date: 2003/10/17 20:35:58 $
00018 // $Author: dap56 $
00019 // $State: Exp $
00020 // $Locker:  $
00021 //*****************************************************************************
00022 
00023 #include "DisplayServer.h"
00024 
00025 #include <unistd.h>
00026 #include <iostream>
00027 #include <cstring>
00028 
00029 #include "TMessage.h"
00030 #include "TString.h"
00031 
00032 #include "MessageStorage.h"
00033 #include "SocketUtils.h"
00034 #include "ServerProtocol.h"
00035 
00036 using std::string;
00037 using std::cout;
00038 using std::endl;
00039 using std::cerr;
00040 
00041 
00042 DisplayServer::DisplayServer(char* host, int consPort, TROOT* rt, 
00043   ConsumerList* cl, bool ut) : TNamed("DisplayServer","DisplayServer"),
00044   _hostName(host), 
00045   _consumerPort(consPort), _clientPort(9091), _error(DS_OK), _debugFlags(0x0),
00046   _updateTimeDiff(120000), _useTimer(ut), 
00047   _ss(0), _conSock(0), 
00048   _currInfo(0), _conList(cl), _glRoot(rt), _updateTimer(0)
00049 {
00050   // Connect to the consumer server socket.
00051   _conSock = new TSocket((TString)(_hostName.c_str()), _consumerPort);
00052 
00053   if (_conSock->IsValid()) {
00054     cout << _clName << ": Connected successfully to CONSUMER socket at port "
00055          << _consumerPort << endl;
00056     cout << spaces << "on host " << _hostName << "." 
00057          << endl;
00058   }
00059   else {
00060     cerr << _clName << ": Consumer socket at port " << _consumerPort
00061          << " on host\n" << spaces << _hostName << " is not valid!" << endl;  
00062     _error = DS_NO_CONNECTION;
00063   } 
00064   
00065   if (_error < 0) ;
00066   else {
00067     _consMoni.Add(_conSock);
00068     if (openClientServerSocket() < 0) {
00069       cerr << _clName << ": ERROR could not open server socket for clients."
00070            << endl;
00071     }
00072   }
00073   // The following 3 lines are included to prevent linking problems. -------
00074   TConsumerInfo* currInfo = new TConsumerInfo("Display Server", 1);
00075   delete currInfo;
00076   currInfo = 0;
00077   //------------------------------------------------------------------------
00078 }
00079 
00080 
00081 const string DisplayServer::_clName = string("DisplayServer");
00082 
00083 
00084 DisplayServer::~DisplayServer()
00085 {
00086 }
00087 
00088 
00089 int DisplayServer::pollConsumer()
00090 {
00091   static int consSendCycle=0;
00092   static int numConsPoll=0;   // Count the number of polls 
00093   static string oldNameString;
00094   TConsumerInfo* newInfo = 0;
00095 
00096   if (!_consMoni.GetActive()) {
00097     return 0;
00098   }
00099 
00100   TSocket* sock0 = _consMoni.Select(50);
00101   numConsPoll++;
00102   if (sock0 != (TSocket*)-1) {       
00103     if (sock0 && _conSock) {
00104       if (sock0->GetInetAddress().GetHostName() !=
00105           _conSock->GetInetAddress().GetHostName()) {
00106         cerr << "DisplayServer::pollConsumer(): Expected consumer "
00107              << "socket to be selected."
00108              << endl;
00109         return -4;
00110       }
00111     }      
00112     // For debugging:
00113     // cout << "Returned with valid sock0. # of polls = " 
00114     //      << numConsPoll << endl;
00115     // Reset the number of polls if the connection was successful.
00116     numConsPoll=0;
00117     // Process message
00118     SocketUtils sockUt0(_glRoot, &_consMoni, sock0);
00119     string ctrlString = sockUt0.getStringMessage();
00120     // For debugging only:---------------------------
00121     // cout << _clName << ": Got string: " << ctrlString << endl;
00122     //-----------------------------------------------
00123     if (ctrlString == DspEndConnection) {
00124       string consumerName; 
00125       if (_currInfo) consumerName = string(_currInfo->GetTitle());
00126       else           consumerName = "Consumer";
00127       cout << _clName << ": INFO: " << consumerName << " sent message: "
00128            << ctrlString << "\n" << spaces              
00129            << "==> Going to terminate connection to " << consumerName;
00130       cout << " running on\n" 
00131            << spaces << sock0->GetInetAddress().GetHostName() << endl;
00132       sockUt0.closeSocket();
00133       //---------------------------------------------
00134       _conList->setStatus(0,ConsumerList::Finished);
00135       //      _conList->sendList("b0dap30.fnal.gov",9090);
00136       sleep(1);
00137       if (_ss) {
00138         _ss->Close();
00139         _glRoot->GetListOfSockets()->Remove(_ss);
00140         delete _ss;
00141         _ss = 0;
00142       }
00143       //---------------------------------------------       
00144       // Now there should be only the client sockets left.
00145       TIter next(_glRoot->GetListOfSockets());
00146       TSocket* socktmp = 0;
00147       while ((socktmp = (TSocket*)next())) {
00148         socktmp->Send(DspEndConnection.c_str());
00149         sleep(1);
00150         socktmp->Close();
00151         _glRoot->GetListOfSockets()->Remove(socktmp);
00152       }
00153       return 2;
00154     }
00155     else {
00156       if (ctrlString == DspConsumerSend) {
00157         consSendCycle++;
00158         // For debugging:
00159         // cout << "Send Cycle: " << consSendCycle << endl; 
00160         _glRoot->DeleteAll();
00161         TMessage* conmess = 0;
00162         //---------------------------------------------------------
00163         // 2.) Receive the TConsumerInfo 
00164         sock0->Recv(conmess);  
00165         if (conmess) { 
00166           if (conmess->What() == kMESS_OBJECT) {
00167             TObject *rcvobject = (TObject*)conmess->ReadObject( 
00168                                  conmess->GetClass());
00169             // For debugging: -------------------------------
00170             // cout << _clName << ": objectname = " 
00171             //     << rcvobject->GetName() << endl;
00172             //-----------------------------------------------
00173             if(rcvobject->InheritsFrom("TConsumerInfo")) {
00174               newInfo = (TConsumerInfo*) rcvobject;
00175               if (newInfo) {
00176                 delete _currInfo;
00177                 _currInfo = newInfo;
00178               }  
00179             }
00180             else {
00181               cerr << _clName << ": pollConsumer(): ERROR: "
00182                    << "Did not receive the TConsumerInfo!"
00183                    << endl;
00184               return -1;
00185             }           
00186           }
00187           delete conmess;
00188         }  
00189         else {
00190           // Error:
00191           cerr << _clName << ": pollConsumer(): ERROR: "
00192                << "Didn't receive a valid message." << endl;        
00193           return -2; 
00194         }  
00195         //---------------------------------------------------------
00196         // 3.) Receive the string with the object names.
00197         string namesString = sockUt0.getStringMessage();
00198         // For Debugging: 
00199         // cout << "Got names-String: " << namesString << endl;
00200         if (strncmp(namesString.c_str(), "Modified", 8) == 0) {
00201           if (_debugFlags & 0x4) {
00202             cout << _clName << ": pollConsumer(): DEBUG INFO: "
00203                  << "Going to edit the storage list." << endl;
00204           }             
00205           editStorageList(namesString);
00206           setRequireUpdateBits(); 
00207         }
00208         else { 
00209           if (strncmp(namesString.c_str(), "Unmodified", 10) != 0) {
00210             cerr << _clName << ": pollConsumer(): ERROR: "
00211                  << "Expected key word 'Unmodified'." << endl;
00212             return -3;
00213           }
00214         }  
00215         //-----------------------------------------------------
00216         // 4.) Receive the objects expect to fill the list.
00217         if (_currList.GetSize() > 0) {
00218           TListIter storeIter(&_currList);
00219           Int_t     recvRes = 0;
00220           while (MessageStorage *mesStor = (MessageStorage*)storeIter()) {
00221             if (sock0) recvRes = sock0->Recv(conmess);
00222             if ((recvRes != -1) && (recvRes != 0)) { 
00223               if (conmess->What() == kMESS_OBJECT) {
00224                 mesStor->updateMessage(conmess);
00225                 // For Debugging:
00226                 // cout << _clName << ": pollConsumer(): INFO: "
00227                 //      << "stored " << mesStor->GetName() << endl;
00228               }
00229               else {
00230                 cerr << _clName << ": pollConsumer(): Expected an object."
00231                      << endl;
00232                 continue;
00233               }
00234             }
00235             else {
00236               cerr << _clName << ": ERROR reading object messages!" << endl;
00237               cerr << _clName << ": Closing socket!" <<  endl;
00238               sockUt0.closeSocket();
00239               continue;
00240             }
00241             // Make sure that conmess does not point anymore to
00242             // the objects in the storage list.
00243             conmess = 0;   
00244           }
00245         }  
00246         else {
00247           cerr << _clName << ": pollConsumer(): Storage list empty."
00248                << endl;
00249         }
00250         //---------------------------------------------------------
00251         // 5.) Wait for the end string to be sent.
00252         string endString = sockUt0.getStringMessage();
00253         if (endString != DspConsumerFinish)
00254           cerr << _clName << ": pollConsumer(): ERROR: "
00255                << "Expected different end string. Got " << endString
00256                << endl;
00257         //----------------------------------------------------
00258         if (consSendCycle == 1) {
00259           updateStateManager();
00260           if (!_updateTimer && _useTimer) {
00261             _updateTimer = new TTimer(this, _updateTimeDiff);
00262             _updateTimer->Reset();
00263             _updateTimer->TurnOn(); 
00264           }
00265         }
00266       }
00267       else {
00268         //        cerr << _clName << ": Got an unexpected string from the "
00269         //   << "consumer." << endl;
00270       }
00271     } // else of if (string=="END CONNECTION")
00272   } 
00273 
00274   return 1;
00275 }
00276 
00277 
00278 void DisplayServer::connectClient()
00279 {
00280   // Check for requests by the display clients 
00281   TSocket* sock1=0;
00282   sock1 =  _ss->Accept();
00283   if (sock1 != (TSocket*)-1) {
00284     string clientHostname(sock1->GetInetAddress().GetHostName());
00285     int  clientPort = sock1->GetInetAddress().GetPort();
00286     cout << _clName << ": connectClient(): Client connected from " 
00287          << clientHostname << " on port " << clientPort << "." << endl;
00288     _clientMoni.Add(sock1);
00289     char portChar[20];
00290     sprintf(portChar, "%d", clientPort);
00291     string id = clientHostname + string(":") + string(portChar);
00292     _clientMap[id] = false;
00293   }
00294 
00295   return;
00296 }
00297 
00298 
00299 void DisplayServer::pollClients()
00300 {
00301   const string myName("DisplayServer::pollClients()");
00302 
00303   if (!_clientMoni.GetActive()) return;
00304 
00305   // wait for message 
00306   TSocket* sock2 = 0;
00307   sock2 = _clientMoni.Select(50);
00308   if (sock2 != (TSocket*)-1) {
00309     SocketUtils sockUt2(_glRoot, &_clientMoni, sock2);
00310     string request = sockUt2.getStringMessage();
00311     // For debugging purposes:
00312     // cout << _clName << ": Got requestString: " << request << endl;
00313     // cout << "                Size = " << request.size() << endl;
00314     if (request.size() == 0) {
00315       cerr << myName << ": request empty" << endl;
00316       return;
00317     }  
00318     if (request == DspEndConnection) {
00319       cout << myName << ": Terminating connection to "  
00320            << sockUt2.getHostname() << "." << endl;
00321       sockUt2.closeSocket();
00322 #ifndef IRIX6
00323       _clientMap.erase(sockUt2.getId());
00324 #else
00325       // erase entry from map
00326       // SGI barfs at _clientMap.erase(sockUt2.getId());
00327       map<string,bool>::iterator cm_itr = _clientMap.find(sockUt2.getId());
00328       _clientMap.erase(cm_itr);
00329 #endif
00330       return;
00331     }  
00332     if (request == "ConsumerInfo") {  
00333       TMessage infMess(kMESS_OBJECT);
00334       infMess.Reset();
00335       infMess.WriteObject(_currInfo);
00336       int sendRes = 0;
00337       sendRes = sock2->Send(infMess);
00338       if (sendRes == -1) {
00339         cerr << myName << "Error sending TConsumerInfo." << endl;
00340         sockUt2.closeSocket();
00341       } 
00342       if (_clientMap[sockUt2.getId()]) _clientMap[sockUt2.getId()] = false;      
00343     }
00344     else {
00345       // Assume the received requestString is a request for an object.
00346       // Get object by name and send it
00347       if (_clientMap[sockUt2.getId()]) {
00348         Int_t sendRes=0;
00349         sendRes = sock2->Send(DspRequestNewInfo.c_str());
00350         if ( sendRes == -1) {
00351           cerr << myName << ": ERROR: Socket->Send returned "
00352                << "with " << sendRes << ".\n" 
00353                << "               ==> The socket will be closed."
00354                << endl;
00355           // Close the socket in case of an error.
00356           sockUt2.closeSocket();
00357         }
00358       }        
00359       else {
00360         Bool_t objFound = kFALSE; 
00361         TListIter iter(&_currList);
00362         while (MessageStorage* obj = (MessageStorage*) iter()) {
00363           if (obj->GetName() == request) {
00364             objFound = kTRUE;  
00365             TMessage sendMess(kMESS_OBJECT);
00366             TMessage* storeMess = obj->getMessage();
00367             Int_t size = storeMess->BufferSize();
00368             Int_t bufSize = size+200;  
00369             char* buffer = new char[bufSize];
00370             // For debugging: 
00371             // cout << "Buffer Size:" << storeMess->BufferSize() << endl;
00372             storeMess->ReadBuf(buffer,bufSize);
00373             storeMess->Reset();
00374             sendMess.WriteBuf(buffer,bufSize);
00375             Int_t sendRes=0;
00376             sendRes=sock2->Send(sendMess);
00377             if ( sendRes == -1) {
00378               cerr << myName << ": ERROR: pollClients(): "
00379                    << "Socket->Send returned with " << sendRes << ".\n" 
00380                    << "               ==> The socket will be closed."
00381                    << endl;
00382               // Close the socket in case of an error.
00383               sockUt2.closeSocket();
00384             }
00385             // For debugging:                
00386             // cout << "SendRes = " << sendRes << endl;
00387             delete [] buffer;
00388             continue;
00389           }
00390         }       
00391         if (!objFound) {
00392           Int_t sendRes=0;
00393           sendRes = sock2->Send(DspObjectNotFound.c_str());
00394           if ( sendRes == -1) {
00395             cerr << myName << ": ERROR: Socket->Send returned "
00396                  << "with " << sendRes << ".\n" 
00397                  << "               ==> The socket will be closed."
00398                  << endl;
00399             // Close the socket in case of an error.
00400             sockUt2.closeSocket();
00401           }
00402         }
00403       }          
00404     } // else of if (request == "ConsumerInfo")
00405   } // if (sock2 != (TSocket*)-1)
00406   
00407   return;
00408 }
00409 
00410 
00411 TConsumerInfo* DisplayServer::getCurrentInfo()
00412 {
00413   return _currInfo;  
00414 }
00415 
00416 
00417 void DisplayServer::editStorageList(const string& nameString)
00418 {
00419   const string myName("DisplayServer::editStorageList()");
00420   char* help1 = 0;
00421   char* help2 = 0;
00422 
00423   // Add those objects to "oldList" which are in list but not named
00424   // in nameString anymore.
00425   if (_debugFlags & 0x8) {
00426     cout << "Outdated Size = " << _outdatedList.GetSize() << "  list size = "
00427          << _currList.GetSize() << endl;
00428   }  
00429   TListIter iter1(&_currList);
00430   while (MessageStorage* obj = (MessageStorage*) iter1()) {
00431     Bool_t found = kFALSE;
00432     // For debugging:
00433     //cout << myName << obj->GetName() << endl;
00434     //Find the first $:
00435     char* cpyName = new char[nameString.size()+1];
00436     strcpy(cpyName, nameString.c_str());
00437     help1 = const_cast<char *>(strchr(cpyName,'$'));
00438     while ((help2 = const_cast<char*>(strtok(help1, "$"))) && (!found)) {
00439       if (obj->GetName() == string(help2)) found = kTRUE;        
00440       help1 = 0; 
00441     }
00442     if (!found) {
00443       // Check whether an object of this type exists already in
00444       // oldList.
00445       TListIter iterOld(&_outdatedList);
00446       Bool_t oldFound = kFALSE;
00447       while (MessageStorage* oldObj = (MessageStorage*) iterOld()) {
00448         if (oldObj->GetName() == obj->GetName()) {
00449           oldObj->updateMessage(obj->getMessage());
00450           oldFound = kTRUE;
00451           continue;
00452         }      
00453       }
00454       if (!oldFound) {
00455         cout << _clName << "::editStorageList(): INFO: The object '"
00456              << obj->GetName() << "' was removed\nfrom the object list"
00457              << " by the consumer.\nThe last copy is kept by the Display"
00458              << " Server." << endl; 
00459         _outdatedList.AddLast(obj);
00460       } 
00461     }
00462   }       
00463   _currList.Delete();
00464   if (_currList.GetSize() > 0) {
00465     cerr << myName << ": list size should be 0." << endl;
00466   }  
00467   // Extract the names of the Canvases, which are supposed to be separated
00468   // by $ signs.
00469   // Build the list new.
00470   //Find the first $:
00471   string cpy2Name(nameString);
00472   help1 = const_cast<char*>(strchr(cpy2Name.c_str(),'$'));
00473   while ((help2 = const_cast<char*>(strtok(help1, "$")))) {
00474     if (_debugFlags & 0x2) {
00475       cout << myName << ": Found String: " << help2 << endl;
00476     }  
00477     MessageStorage *storage = new MessageStorage(help2);
00478     _currList.AddLast(storage);    
00479     help1 = 0; 
00480   } 
00481 
00482   return;
00483 }
00484 
00485 
00486 void DisplayServer::updateStateManager()
00487 {
00488   // Fill consumer list and send the list to the state manager. 
00489 
00490   const string myName("DisplayServer::updateStateManager()");
00491 
00492   if (_currInfo) {
00493     _conList->addEntry(_currInfo->GetTitle(), (TString)(_hostName.c_str()),
00494                        _clientPort, _currInfo->nevents(), _currInfo->runnumber(),
00495                        ConsumerList::Running);
00496     //conslist.print();
00497     if (_debugFlags & 0x10)
00498       cout << myName << ": Consumer information will be sent to\n"
00499            << spaces << "the State Manager running on b0dap30.fnal.gov" << endl;
00500     // Send list to StateManager
00501     //    _conList->sendList("b0dap30.fnal.gov",9090);
00502   }
00503   else {
00504     cerr << myName << ": Consumer Info not available." << endl;
00505     _conList->addEntry("Unknown", (TString)(_hostName.c_str()), _clientPort,
00506                        0, 0, ConsumerList::Running);
00507     //    _conList->sendList("b0dap30.fnal.gov",9090);
00508   }
00509 
00510   return;
00511 }
00512 
00513 
00514 Bool_t DisplayServer::HandleTimer(TTimer* /* timer */)
00515 {
00516   updateStateManager();
00517 
00518   return kTRUE;
00519 }
00520 
00521 
00522 void DisplayServer::setRequireUpdateBits()
00523 {
00524   map<string,bool>::iterator it;
00525   for (it=_clientMap.begin(); it!=_clientMap.end(); ++it) {
00526     //string name = (*it).first;
00527     //_clientMap[name] = true;
00528     (*it).second = true;
00529   }
00530   return;
00531 }
00532 
00533 
00534 int DisplayServer::openClientServerSocket() 
00535 {
00536   const string myName("DisplayServer::openClientServerSocket()");
00537 
00538   int rtVal = DS_OK;
00539 
00540   // Open a server socket for the display clients to connect 
00541   _ss = new TServerSocket(_clientPort, kTRUE);
00542   // Open a server socket for the clients, testing different ports
00543   while (! _ss->IsValid()) {  
00544     delete _ss;
00545     _clientPort++;
00546     _ss = new TServerSocket(_clientPort, kTRUE);
00547   }
00548   if (!(_ss->IsValid())) {
00549     cout << myName << ": server socket not valid." << endl;
00550     _error = DS_BAD_CLIENT_SSOCKET;
00551     rtVal  = DS_BAD_CLIENT_SSOCKET;
00552   }
00553   else {    
00554     cout << barrier << "\n\n" 
00555          << _clName << ": Server socket for HISTO DISPLAY clients at PORT " 
00556          << _clientPort << endl; 
00557     cout << spaces << "available.\n\n"
00558          << spaces << "Use   " << _hostName << ":" << _clientPort << endl; 
00559     cout << spaces << "as input stream to connect from the HistoDisplay" << endl;
00560     cout << spaces << "to the Display Server.\n\n"  
00561          << barrier << "\n" 
00562          << endl;
00563     // Disable blocking mode for this server socket  
00564     _ss->SetOption(kNoBlock,1);  
00565   }
00566 
00567   return rtVal;
00568 } 
00569 
00570 
00571 
00572 
00573 
00574 
00575 
00576 
00577 
00578 
00579 

Generated on Mon Feb 15 11:06:37 2010 for loon by  doxygen 1.3.9.1