Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

ConsumerExport.cc

Go to the documentation of this file.
00001 //-------------------------------------------------------------------------
00002 // File: ConsumerExport.cc
00003 //
00004 // The ConsumerExport class provides methods for the communication of the
00005 // consumer with its display server.
00006 //
00007 // 07/05/2000 Started    Wolfgang Wagner 
00008 //
00009 //-------------------------------------------------------------------------
00010 //****************************************************************************
00011 // RCS Current Revision Record
00012 //-----------------------------------------------------------------------------
00013 // $Source: /cvs/minoscvs/rep1/minossoft/CDFMonitoringFwk/ConsumerExport.cc,v $
00014 // $Revision: 1.9 $
00015 // $Date: 2009/02/28 21:46:11 $
00016 // $Author: gmieg $
00017 // $State: Exp $
00018 // $Locker:  $
00019 //*****************************************************************************
00020 
00021 // Comment on Error Handling:
00022 // If a socket->send fails the socket is closed immediately.
00023 
00024 #include "ConsumerExport.h"
00025 
00026 #include <sys/wait.h>
00027 #include <signal.h>
00028 #include <unistd.h>
00029 
00030 #include <cstdlib>
00031 #include <string>
00032 
00033 #include "TConsumerInfo.h"
00034 #include "ServerProtocol.h"
00035 
00036 using std::string;
00037 using std::cout;
00038 using std::endl;
00039 using std::cerr;
00040 
00041 // #define CONSUMEREXPORT_DEBUG
00042 
00043 const int serverStartFailCode = 20;  
00044 static pid_t serverPid = 0;
00045 static bool serverStartFailed = false;
00046 static bool brokenPipe = false;
00047 
00048 extern "C"
00049 {
00050   static void sigChildHandler(int sig)
00051   {
00052     const string myName = string("ConsumerExport#sigChildHandler");
00053     bool locDebug = false;
00054 #ifdef CONSUMEREXPORT_DEBUG
00055     locDebug = true;
00056 #endif
00057 
00058     if (locDebug) {
00059       cout << myName << ": Got signal ";
00060       if (sig == SIGCHLD) cout << "SIGCHLD (";
00061       cout << "= " << sig << ")." << endl;
00062     }
00063     pid_t waitRes;
00064     int status;
00065     waitRes = waitpid(serverPid, &status, WNOHANG);         
00066     if (locDebug)  cout << myName << ": Returned process: pid = " 
00067                         << waitRes << "." << endl;
00068     if (waitRes == serverPid) {
00069       // The returned process was the display server. 
00070       if (WIFEXITED(status)) {
00071         if (locDebug) cout << myName << ": Normal termination, exit status = " 
00072                            << WEXITSTATUS(status) << endl;
00073         if (WEXITSTATUS(status) == serverStartFailCode) {
00074           cerr << myName <<  ": Failed to start the Server. The forked\n"
00075                << "child process could not find the Server program."
00076                << endl;
00077           serverStartFailed = true;
00078         }
00079       } 
00080       else {
00081         if (WIFSIGNALED(status)) {
00082           cerr << myName << ": Abnormal termination, signal number = "
00083                << WTERMSIG(status) << 
00084 #ifdef WCOREDUMP
00085           ( WCOREDUMP(status) ? ". Core file generated." : "." );
00086 #else
00087           "."; 
00088 #endif
00089           cout << endl;
00090         }
00091       }
00092     }
00093     else {
00094       if (locDebug) cout << "Process other than the server returned." << endl;
00095     }
00096  
00097     return;
00098   }
00099   //------------------------------------------------------------------------
00100   static void pipeBroken(int sig)
00101   {    
00102     const string myName = string("ConsumerExport#pipeBroken");
00103     cerr << myName << ": Got signal SIGPIPE (" << sig << ")."  << endl;  
00104     brokenPipe = true;
00105   } 
00106 
00107 }
00108 
00109 ConsumerExport::ConsumerExport(const int firstPort=9050, 
00110   const bool restartEnable, const int reportToStateManager) 
00111   : _debug(false), 
00112     _restartEnable(restartEnable), 
00113     _sendingEnable(true),
00114     _reportToStateManager(reportToStateManager), 
00115     _port(firstPort), 
00116     _serverPid(1), 
00117     _canvasString(0), 
00118     _consMonitor(new TMonitor), _consSS(0), _consSock(0)       
00119 {
00120   const int maxPort=9999;
00121   const string myName = string("ConsumerExport::Constructor");
00122   
00123 #ifdef CONSUMEREXPORT_DEBUG
00124     _debug = true;
00125 #endif
00126 
00127   if ((int)signal(SIGCHLD, sigChildHandler) == -1) 
00128     cerr << myName << ": ERROR setting signal handler for SIGCHLD." << endl;
00129   if ((int)signal(SIGPIPE,pipeBroken) == -1) 
00130     cerr << myName << ": ERROR setting signal handler for SIGPIPE." << endl;
00131 
00132   _consSS      = new TServerSocket(_port,kTRUE);
00133   // Open server socket, Testing different ports  
00134   while( (!_consSS->IsValid()) && (_port < maxPort) ) {  
00135     delete _consSS;
00136     _port++;
00137     _consSS = new TServerSocket(_port,kTRUE);
00138   }
00139   if(_port == maxPort) {
00140     cerr << myName << ": Maximum port number reached. No free port "
00141          << "was found."
00142          << " ==>> RETURN NOW ! " << endl;   
00143     _sendingEnable = false;
00144   }
00145   if (_consSS) {
00146     if (!_consSS->IsValid()) {
00147       cerr << myName << ": Server socket is not valid." << endl;
00148       _sendingEnable = false;
00149       delete _consSS;
00150       _consSS = 0;
00151     }  
00152     else {
00153       if ( (startServer() > 0) && (!serverStartFailed) ) connectServer();
00154       else {
00155         cerr << myName << ": ERROR: Failed to start the server program."  
00156              << endl;  
00157         cerr << myName << ": The Consumer will continue without sending "
00158              << "monitoring information to the server." << endl;
00159         _sendingEnable = false;
00160         delete _consSS;
00161         _consSS = 0;
00162       }
00163     }
00164   }
00165   else {
00166     _sendingEnable = false;
00167   }
00168   if (_debug) cout << myName << ": End of Constructor." << endl;
00169 }
00170 
00171 
00172 ConsumerExport::~ConsumerExport()
00173 {
00174   if (_consSock) {
00175     _consSock->Send(DspEndConnection.c_str());
00176     _consSock->Close();
00177   }
00178 
00179   delete [] _canvasString;
00180 
00181 }
00182 
00183 
00184 int ConsumerExport::_maxStringLength=1000;
00185 
00186 
00187 int ConsumerExport::send(TConsumerInfo *consinfo, Bool_t modFlag)
00188 { 
00189   // This method sends out the consumer info and the whole list of objects 
00190   // stored in consumer info via the socket to the display server. 
00191   // The protocol with a leading "CONSUMER SEND"
00192   // and a trailing "CONSUMER FINISHED" is implemented.
00193   //
00194   // Return value: number of messages which were successfully sent
00195   //               If the socket is no longer valid: -1
00196   //
00197   // If the number of consequetively failed messages exceeds a limit 
00198   // (_failedLimit) the socket is closed. As a consequence IsValid
00199   // returns false and the send-method returns -1.
00200   //   
00201   const string myName = string("ConsumerExport::send()");
00202   
00203   static Bool_t firstCall=kTRUE;
00204   int rtvalue=0; 
00205   Int_t sendRes=0;
00206  
00207   // cout << myName << ": begin of function." << endl;
00208   if (_sendingEnable && !brokenPipe) {
00209     if (_consSock && _consSock->IsValid() && !brokenPipe) { 
00210       TMessage mess(kMESS_OBJECT);
00211       mess.Reset();
00212       //-----------------------------------------------------
00213       // 1.) Send start key word "CONSUMER SEND"
00214       sendRes=_consSock->Send(DspConsumerSend.c_str());
00215       if (sendRes == -1) {
00216         errorReact(sendRes);
00217         return -4;
00218       } 
00219       //-----------------------------------------------------
00220       // 2.) send the consumer info
00221       if (_consSock && _consSock->IsValid() && !brokenPipe) {
00222         if (consinfo) {
00223           mess.Reset();
00224           mess.WriteObject(consinfo);
00225           sendRes = _consSock->Send(mess);
00226           if (sendRes == -1) {
00227             errorReact(sendRes);
00228             return -5;
00229           }
00230           else {
00231             rtvalue++;
00232           }
00233         }
00234         else {
00235           cerr << myName << ": Pointer to TConsumerInfo is NULL." << endl;
00236           return -3;
00237         } 
00238       }
00239       //-----------------------------------------------------
00240       // 3.) send all objects in the TConsumerInfo list of objects
00241       if ((consinfo->list())->GetSize() > 0) {
00242         const char* ctrlString;
00243         if (firstCall || modFlag || consinfo->isModified()) {
00244           // Create a list of all objects to be sent. 
00245           // Find out whether the default string length for the canvases
00246           // is large enough. 
00247           int actualLength=lengthNeeded(consinfo->list());
00248           if (_maxStringLength < actualLength) {
00249             _maxStringLength = actualLength;       
00250           }
00251           createCanvasString(consinfo->list());
00252           firstCall  = kFALSE;
00253           ctrlString = _canvasString;
00254           consinfo->setModified(false);
00255         }
00256         else {
00257           ctrlString = "Unmodified";
00258         }       
00259         if (_consSock && _consSock->IsValid() && !brokenPipe) {
00260           mess.Reset();
00261           // 4.) Send the string with the objects' name
00262           sendRes=_consSock->Send(ctrlString);
00263           if (sendRes == -1) errorReact(sendRes);
00264           else {
00265             rtvalue++;
00266           }
00267         }  
00268         send(consinfo->list());
00269       }
00270       //-----------------------------------------------------
00271       // 5.) Send the finish string 
00272       mess.Reset();
00273       if (_consSock->IsValid()) {
00274         sendRes=_consSock->Send(DspConsumerFinish.c_str());
00275         if (sendRes == -1) errorReact(sendRes);
00276       }
00277     }
00278     else {
00279       cerr << myName << ": ERROR: Socket to display server is no "
00280            << "longer valid." << endl;
00281       if (_consSS && _restartEnable) {
00282         // It makes only sense to try to reconnect if the consumer server
00283         // socket is still there.  
00284         reestablishServer();
00285         firstCall = kTRUE;
00286       }
00287       else {
00288         // Disable the sending. 
00289         _sendingEnable = false;
00290       }
00291       rtvalue = -1;         
00292     }
00293   }
00294   else { // Sending is disabled.
00295     rtvalue = -2;
00296   } 
00297 
00298   return rtvalue;
00299 }
00300 
00301 
00302 pid_t ConsumerExport::getServerPid()
00303 {
00304   return _serverPid;
00305 }
00306 
00307 
00308 int ConsumerExport::send(TList *objlist)
00309 { 
00310   // This method sends out a whole list of objects via the socket 
00311   // to the display server.
00312   //
00313   // Return value: number of objects in the list which were sent
00314   //               If the socket is no longer valid: -1
00315   //
00316   // If the number of consequtively failed messages exceeds a limit 
00317   // (_failedLimit) the socket is closed. As a consequence IsValid
00318   // returns false and the send-method returns -1.
00319   // 
00320   int rtvalue=0; 
00321   Int_t sendRes=0;
00322 
00323   if (_sendingEnable) {
00324     if (_consSock && _consSock->IsValid() && !brokenPipe) { 
00325       TMessage mess(kMESS_OBJECT);
00326       //-----------------------------------------------------
00327       TListIter iter(objlist);
00328       while (TObject *obj = iter()) {
00329         if (_consSock && _consSock->IsValid() && !brokenPipe) { 
00330           mess.Reset();
00331           mess.WriteObject(obj);
00332           sendRes=_consSock->Send(mess);
00333           if (sendRes == -1) errorReact(sendRes);
00334           else {
00335             rtvalue++;
00336           } 
00337         }    
00338       }
00339     }
00340     else {
00341       cerr << "ConsumerExport: ERROR: Socket to display server is no "
00342            << "longer valid." << endl;
00343       rtvalue = -1;         
00344     } 
00345   }
00346   else { // Sending is disabled.
00347     rtvalue = -2;
00348   } 
00349 
00350   return rtvalue;
00351 }
00352 
00353 int ConsumerExport::send(const char *messString)
00354 {
00355   // This method simply sends out a string to the consumer.
00356   //
00357   // Return value:  1 if no error occured
00358   //                0 if the send of the string failed
00359   //               -1 if the socket is no longer valid
00360   // 
00361   int rtvalue=0;  
00362   Int_t sendRes=0;
00363 
00364   if (_sendingEnable) {
00365     if( _consSock && _consSock->IsValid()) { 
00366       sendRes=_consSock->Send(messString);
00367       if (sendRes == -1) errorReact(sendRes);
00368       else {
00369         rtvalue++;
00370       } 
00371     }
00372     else {
00373       cout << "ConsumerExport: ERROR: Socket to display server is no "
00374            << "longer valid." << endl;
00375       rtvalue = -1;         
00376     } 
00377   }
00378   else { // Sending is disabled.
00379     rtvalue = -2;
00380   } 
00381 
00382   return rtvalue;
00383 }
00384 
00385 int ConsumerExport::send(const TObject *object)
00386 {
00387   // This method sends a single objects via the socket to the display server.
00388   // The protocol with a leading "CONSUMER SEND" and a trailing 
00389   // "CONSUMER FINISHED" is implemented.
00390   //
00391   // Return value:  1 if no error occured
00392   //                0 if the send failed       
00393   //               -1 if the socket is no longer valid
00394   int rtvalue=0; 
00395   Int_t sendRes=0;
00396 
00397   if (_sendingEnable) {
00398     if (_consSock && _consSock->IsValid()) { 
00399       TMessage mess(kMESS_OBJECT);
00400       mess.Reset();
00401       sendRes=_consSock->Send(DspConsumerSend.c_str());
00402       if (sendRes == -1) errorReact(sendRes); 
00403       //-----------------------------------------------------    
00404       if (_consSock->IsValid()) {
00405         mess.Reset();
00406         mess.WriteObject(object);
00407         sendRes=_consSock->Send(mess);
00408         if (sendRes == -1) errorReact(sendRes);
00409         else {
00410           rtvalue=1;
00411         }
00412       }
00413       //-----------------------------------------------------
00414       mess.Reset();
00415       sendRes=_consSock->Send(DspConsumerFinish.c_str());
00416       if (sendRes == -1) errorReact(sendRes);
00417     }
00418     else {
00419       cout << "ConsumerExport: ERROR: Socket to display server is no "
00420            << "longer valid." << endl;
00421       rtvalue = -1;         
00422     }
00423   }
00424   else { // Sending is disabled.
00425     rtvalue = -2;
00426   } 
00427 
00428   return rtvalue;
00429 } 
00430 
00431 
00432 void ConsumerExport::errorReact(const Int_t sendRes)
00433 {
00434   // This method implements the error reaction if the sending of a message  
00435   // fails.
00436 
00437   cerr << "ConsumerExport: ERROR: Socket-Send returned with "
00438        << sendRes << ".\n" 
00439        << "               ==> Socket connection to the Display Server is \n"
00440        << "                   going to be closed IMMEDIATELY."
00441        << endl;
00442   _consSock->Close();
00443   _consSock = 0;
00444 }
00445 
00446 
00447 void ConsumerExport::createCanvasString(TList *canList)
00448 {
00449   delete [] _canvasString;
00450   _canvasString = new char[_maxStringLength];
00451 
00452   strcpy(_canvasString, "Modified");
00453   TListIter iter(canList);
00454   while (TObject *obj = iter()) {
00455     strcat(_canvasString, "$");
00456     strcat(_canvasString, obj->GetName());
00457   } 
00458 
00459 }
00460 
00461 int ConsumerExport::lengthNeeded(TList *canList)
00462 {
00463   int length=0;
00464 
00465   TListIter iter(canList);
00466   while (TObject *obj = iter()) {
00467     length += strlen(obj->GetName());
00468     length++;      // Take also the $ signs into account
00469   } 
00470   
00471   length += strlen("Unmodified");
00472 
00473   length += 10;   // Add some extra margin 
00474 
00475   return length;
00476 }
00477 
00478 
00479 int ConsumerExport::startServer()
00480 {
00481   // Starts the display server using fork and exclp.
00482 
00483   const string myName = string("ConsumerExport::startServer()");
00484 
00485   int rtValue = 0;
00486   char portChar[128];
00487   char reportToSmChar[128];
00488   char fullPath[128];
00489   char fullPath2[128];
00490 
00491   sprintf(fullPath, "Server");
00492   sprintf(portChar, "%d", _port);
00493   sprintf(reportToSmChar, "%d", _reportToStateManager);
00494   if ( (_serverPid = fork()) < 0) { 
00495     cerr << myName << ": ERROR: fork failed." << endl;  
00496     rtValue = _serverPid;
00497   }
00498   else {  // Fork successful.
00499     rtValue = _serverPid;
00500     if (_serverPid == 0) {  // Child process
00501       int execRes = 0;
00502       if (_debug) cout << myName << ": fullPath = " << fullPath << endl;
00503       if ( (execRes = execlp(fullPath, "Server", portChar, reportToSmChar,
00504            (char *) 0)) < 0) {  
00505         if (getenv("CONSUMERBINDIR")) {
00506           if (_debug) cout << "consumerbindir:" << getenv("CONSUMERBINDIR") 
00507                            << endl;
00508           sprintf(fullPath, "%s/Server", getenv("CONSUMERBINDIR"));
00509           if ( (execRes = execlp(fullPath, "Server", portChar, reportToSmChar,
00510                (char *) 0)) < 0) {  
00511             if (getenv("CDFSOFT2_DIR")) {
00512               // cout << "CDFSOFT2_DIR = " << getenv("CDFSOFT2_DIR") << endl; 
00513               sprintf(fullPath2, "%s/bin/%s/Server", getenv("CDFSOFT2_DIR"),
00514                 getenv("BFARCH"));
00515               if ( (execRes = execlp(fullPath2, "Server", portChar, 
00516                    reportToSmChar, (char *) 0)) < 0) {  
00517                 cerr << myName << ": ERROR: execlp failed. rt = " << execRes 
00518                      << endl;
00519                 cerr << myName << ": ERROR: Could not find the Server "
00520                      << "Executable.\n Three different possibilities were "
00521                      << "checked, all of them failed." << endl;
00522                 cerr << myName << "1.) Default PATH. " << endl;  
00523                 cerr << myName << "2.) using $CONSUMERBINDIR: " << fullPath
00524                      << "." << endl;
00525                 cerr << myName << "3.) using the release directory "
00526                      << "$CDFSOFT2_DIR: " << fullPath2 << endl; 
00527               }
00528             }
00529           } 
00530         } 
00531         else {
00532           if (getenv("CDFSOFT2_DIR")) {
00533             // cout << "CDFSOFT2_DIR = " << getenv("CDFSOFT2_DIR") << endl; 
00534             sprintf(fullPath2, "%s/bin/%s/Server", getenv("CDFSOFT2_DIR"),
00535             getenv("BFARCH"));
00536             if ( (execRes = 
00537                   execlp(fullPath2, "Server", portChar, (char *) 0)) < 0) {  
00538               cerr << myName << ": ERROR: execlp failed. rt = " << execRes << endl;
00539               cerr << myName << ": ERROR: Could not find the Server "
00540                    << "Executable. Three different possibilities\n were "
00541                    << "checked. All of them FAILED !!!" << endl;
00542               cerr << myName << ": 1.) Server not in PATH." << endl;  
00543               cerr << myName << ": 2.) $CONSUMERBINDIR is not set." << endl;
00544               cerr << myName << ": 3.) Using the release directory "
00545                      << "$CDFSOFT2_DIR: " << fullPath2 << endl; 
00546               cerr << myName << ": Therefore, the child process EXISTS NOW!"
00547                    << endl;
00548               // Since the Server program could not be found anywhere,
00549               // the child has to exit.
00550               exit(serverStartFailCode); 
00551             }
00552           }
00553         }
00554       }
00555     }
00556     else {  // Mother process
00557       serverPid = _serverPid;
00558       sleep(1);
00559       if (serverStartFailed) rtValue = -10;
00560     }
00561   }
00562   time_t locTime; 
00563   _serverStartTime = time(&locTime); 
00564 
00565   return rtValue;
00566 }
00567 
00568 
00569 int ConsumerExport::connectServer()
00570 {
00571   const string myName = string("ConsumerExport::connectServer()");
00572   int res = 1;
00573 
00574   if (_debug) 
00575     cout << myName << ": Waiting for display server to connect." << endl;
00576   if (_consSS) {
00577     _consSock = _consSS->Accept();
00578     if (_debug) cout << myName << ": Accept returned." << endl; 
00579     if (_consSock && _consSock->IsValid()) {
00580       if (_debug) { 
00581         cout << myName << ": Connection to display server okay." << endl; 
00582       }
00583       _consSock->SetOption(kNoDelay,1);
00584       _consMonitor->Add(_consSock);
00585       res = 0;
00586       brokenPipe = false;
00587     }
00588     else {
00589       cerr << myName << ": ERROR: the socket connection between display "
00590            << "server and \nconsumer failed." << endl;
00591       res = -1;
00592     }        
00593   }
00594   else {
00595     res = -2;
00596     cerr << myName << ": ERROR: consumer server socket not valid." 
00597          << endl;
00598     cerr << myName << ": Further reconnect trials don't make sense." << endl;
00599   }
00600 
00601   return res;
00602 }
00603 
00604 
00605 int ConsumerExport::reestablishServer()
00606 {
00607   const string myName = string("ConsumerExport::reestablishServer()");  
00608   const int minReconnectTime = 30;
00609   int rtValue = 1;
00610   bool locDebug = false;   
00611 #ifdef CONSUMEREXPORT_DEBUG
00612   locDebug = true;
00613 #endif
00614   // Try to restart and reconnect to the display server,
00615   // but only if at least a minimum difference passed since the last
00616   // restart.   
00617   time_t nowTime;
00618   time(&nowTime);
00619   if ( (difftime(nowTime, _serverStartTime) > minReconnectTime) &&
00620        _restartEnable) {
00621     // Check whether the previous server existed.
00622     pid_t waitRes;
00623     int status;
00624     waitRes = waitpid(_serverPid, &status, WNOHANG);         
00625     if (locDebug) cout << myName << ": waitRes = " << waitRes << endl;
00626     if ( ((!WIFEXITED(status)) && (!WIFSIGNALED(status))) 
00627 #ifdef WIFSTOPPED
00628 || WIFSTOPPED(status) 
00629 #endif 
00630 #ifdef WIFCONTINUED
00631 || WIFCONTINUED(status) 
00632 #endif
00633       ) {
00634       // The server did not terminat neither normally nor abnormally.
00635       // In this case we try to kill the process to make sure
00636       // it terminated before starting a new process.
00637       cerr << myName << ": Server was still alive. Need to kill it."
00638            << endl; 
00639       kill(_serverPid, SIGKILL);
00640     }
00641     sleep(2);
00642     // Check /proc
00643     char procFileName[200];
00644     sprintf(procFileName, "/proc/%d/stat", ((int)_serverPid));
00645     FILE* fp;
00646     if ((fp = fopen(procFileName, "r"))) {
00647       cerr << myName << ": Server was still alive (/proc entry). "
00648            << "Need to kill it." << endl; 
00649       kill(_serverPid, SIGKILL);
00650       fclose(fp);
00651     }
00652     cout << myName << ": Going to restart the server." << endl;
00653     _serverStartTime = time(&nowTime);
00654     if (startServer() != -1) rtValue = connectServer();
00655     sleep(1);
00656   }
00657   else {
00658     rtValue = -2;
00659   }
00660 
00661   return rtValue;
00662 }

Generated on Mon Feb 15 11:06:33 2010 for loon by  doxygen 1.3.9.1