00001 00002 // // 00003 // DDSParentServer // 00004 // // 00005 // Package: DDS (Data Dispatcher System). // 00006 // // 00007 // S. Kasahara 2/2001 // 00008 // // 00009 // Purpose: This class is part of the MINOS data dispatcher system. It is // 00010 // the main class of the parent server, which resides on the same // 00011 // local network as the online data generator. The DDSParentServer // 00012 // handles the setup of a ROOT TServerSocket to listen for // 00013 // connections from potential clients. When a client connection is // 00014 // accepted, DDSParentServer spawns a child server (managed by // 00015 // DDSChildServer class) to handle the client data requests. // 00016 // // 00017 // Based in part on ideas from the LHCb OTT online monitoring system. // 00019 00020 #include <cstdlib> 00021 #include <unistd.h> // fork used to spawn child process 00022 #ifndef MACOSX 00023 #include <wait.h> // waitpid used to check status of exited children 00024 #else 00025 #include <sys/wait.h> // waitpid used to check status of exited children 00026 #endif 00027 #include <errno.h> // errno used to catalog errors in system calls 00028 00029 #include "TMessage.h" 00030 #include "TSystem.h" 00031 00032 #include "MessageService/MsgService.h" 00033 00034 #include "Dispatcher/DDS.h" 00035 #include "Dispatcher/DDSParentServer.h" 00036 #include "Dispatcher/DDSClientId.h" 00037 00038 std::ostream& operator << (std::ostream& ms, DDSParentServer* ps) 00039 {return ps->Print(ms);} 00040 00041 ClassImp(DDSParentServer) 00042 00043 // Definition of static data members 00044 // ********************************* 00045 00046 CVSID("$Id: DDSParentServer.cxx,v 1.16 2009/02/28 21:46:12 gmieg Exp $"); 00047 00048 // Definition of methods (alphabetical order) 00049 // ****************************************** 00050 00051 //_____________________________________________________________________________ 00052 bool DDSParentServer::Authorize() const { 00053 // Purpose: Authorize currently 
connected client to use dispatcher. 00054 // Address of currently connected client must have been previously 00055 // stored in fClientAddress. 00056 // 00057 // Argument: none. 00058 // 00059 // Return: true if client is authorized, else false. 00060 // 00061 // Contact: S. Kasahara 00062 // 00063 // Notes: Currently a dummy routine, always returns true. 00064 // 00065 00066 // perform check on client address, if ok then 00067 return true; 00068 00069 } 00070 00071 //_____________________________________________________________________________ 00072 UInt_t DDSParentServer::CheckChildStatus() { 00073 // Purpose: Check status of any exited child processes spawned by 00074 // parent. This has the important side benefit of clearing 00075 // "zombies" (child processes that have exited but have status 00076 // values left to be read). The fNumChild counter of currently 00077 // active children is decremented for each newly exited child. 00078 // 00079 // Argument: none. 00080 // 00081 // Return: Number of remaining active child processes. 00082 // 00083 // Contact: S. Kasahara 00084 // 00085 // Notes: Another way to do this would be for the parent server to maintain 00086 // a list of spawned child processes and inquire each child's status 00087 // according to its process id. 
00088 00089 int childpid; // pid number of exited child processes 00090 Int_t childstat; // exit status of child process 00091 00092 bool newexit = false; 00093 while ((childpid = waitpid(-1, &childstat, WNOHANG)) > 0) { 00094 // This child has exited 00095 if ( !newexit ) { 00096 newexit = true; 00097 MSG("DDS",Msg::kInfo) 00098 << "PS: ParentServer reporting on status of exited children: " << endl; 00099 } 00100 00101 fNumChild--; // decrement number of active children 00102 00103 if ( childstat != 0 ) { 00104 // the child process exited with an error status 00105 MSG("DDS",Msg::kWarning) << "PS: Child process " << childpid << 00106 " has exited with error status " << WEXITSTATUS(childstat) 00107 << "." << endl; 00108 } 00109 else { 00110 // if in info mode report all child exits 00111 MSG("DDS",Msg::kInfo) << "PS: Child process " << childpid << 00112 " has exited successfully." << endl; 00113 } 00114 fPSStatus.RemoveClientId(childpid); 00115 } 00116 00117 if ( newexit ) { 00118 MSG("DDS",Msg::kInfo) << "PS: Number of children currently running = " 00119 << fNumChild << "." << endl; 00120 } 00121 00122 return fNumChild; 00123 00124 } 00125 00126 00127 //_____________________________________________________________________________ 00128 DDSParentServer::DDSParentServer(UInt_t port, UInt_t maxchild, Int_t logLevel, 00129 Int_t maxInactive, Int_t backlog): 00130 fTServerSocket(0),fMaxChild(maxchild),fLogLevel(logLevel), 00131 fMaxInactive(maxInactive),fNumChild(0),fShutdown(false),fTSocket(0), 00132 fMessageIn(0), fMessageOut(), fPSStatus() { 00133 // Purpose: Constructor. This constructor creates a TServerSocket 00134 // attached to the specified port. A TServerSocket is created 00135 // and bound to the specified port. 00136 // 00137 // Arguments: port: port number assigned to server socket. 
If port = 0 00138 // (default), the serversocket will be connected to the 00139 // named service "MinosDDS", which should be assigned to 00140 // a port in the /etc/services file. 00141 // maxchild: maximum number of concurrent child servers allowed 00142 // (default = kMaxChild). 00143 // logLevel: message verbose level 00144 // maxInactive: max inactive time before connection shutdown 00145 // backlog: maximum number of clients the kernel should queue 00146 // up on this port for pending connections (default is 00147 // TServerSocket::kDefaultBacklog). 00148 // 00149 // Return: n/a. 00150 // 00151 // Contact: S. Kasahara 00152 // 00153 // Notes: Use IsValid() to check if DDSParentServer was created successfully. 00154 // 00155 00156 // Create server socket 00157 // The second argument kTRUE sets the SO_REUSEADDR socket option 00158 // (kFALSE by default). When set true, reuse allows the listening server to 00159 // start and bind its port even if previously established connections 00160 // exist that use this port as their local port (such as if the parent 00161 // starts, spawns a child, dies, and needs to be restarted with the child 00162 // still serving from that port). 00163 if (!port) { 00164 // User has not specified an alternative port. server socket will be 00165 // bound to the port assigned to the named service "MinosDDS" 00166 fTServerSocket = new TServerSocket("MinosDDS",kTRUE,backlog); 00167 } 00168 else { 00169 fTServerSocket = new TServerSocket(port,kTRUE,backlog); 00170 } 00171 00172 // Check validity of socket 00173 if(!fTServerSocket || !fTServerSocket -> IsValid()) { 00174 // An error occured during the creation of the server socket 00175 00176 if ( port ) { 00177 MSG("DDS",Msg::kWarning) 00178 << "PS: Unable to create parent server socket connected to port " 00179 << port << "." << endl; 00180 } 00181 else { 00182 MSG("DDS",Msg::kWarning) 00183 << "PS: Unable to create parent server socket connected to named service " 00184 << "MinosDDS." 
<< endl; 00185 } 00186 if(fTServerSocket)delete fTServerSocket; fTServerSocket = 0; 00187 fShutdown = true; 00188 } 00189 else { 00190 // Store local address of the server socket. 00191 fServerAddress = fTServerSocket -> GetLocalInetAddress(); 00192 } 00193 00194 } 00195 00196 //_____________________________________________________________________________ 00197 DDSParentServer::~DDSParentServer() { 00198 // Purpose: Destructor. 00199 // 00200 // Arguments: n/a. 00201 // 00202 // Return: n/a. 00203 // 00204 // Contact: S. Kasahara 00205 // 00206 00207 if ( IsValid() ) { 00208 delete fTServerSocket; fTServerSocket = 0; 00209 } 00210 // fTSocket should always be deleted and cleared in DDSParentServer::Run() 00211 if ( fTSocket ) delete fTSocket; fTSocket = 0; 00212 00213 } 00214 00215 //_____________________________________________________________________________ 00216 bool DDSParentServer::IsLocalClient() const { 00217 // Purpose: Determine if currently connected client with address given 00218 // in fClientAddress is local. 00219 // 00220 // Arguments: none. 00221 // 00222 // Return: true if local. 00223 // 00224 // Contact: S. Kasahara 00225 // 00226 // Notes: This method currently determines that a client is local only if it 00227 // resides on the same host as the parent server. The definition 00228 // of a local client will eventually be expanded to include other 00229 // hosts in the same local area network (once I know how to do that). 00230 // Having local client status bestows special privileges on that 00231 // client, such as: 00232 // a) local client child servers are run at higher priority than 00233 // remote client child servers. 00234 // b) only local clients may shutdown the parent server. 
00235 // 00236 00237 bool local = false; 00238 00239 TString localhost(gSystem->GetHostByName(gSystem->HostName()).GetHostName()); 00240 00241 if(!localhost.CompareTo(fClientAddress.GetHostName(),TString::kIgnoreCase)){ 00242 local = true; 00243 } 00244 00245 return local; 00246 00247 } 00248 00249 //_____________________________________________________________________________ 00250 std::ostream& DDSParentServer::Print(std::ostream& ms) const { 00251 // Purpose: Print DDSParentServer status on std::ostream. 00252 // 00253 // Arguments: ms std::ostream to print on. 00254 // 00255 // Return: std::ostream reference. 00256 // 00257 // Contact: S. Kasahara 00258 // 00259 00260 if ( IsValid() ) { 00261 ms << "PS: local addr " 00262 // Print the local host name, address, port number 00263 << fServerAddress.GetHostName() 00264 << "(port " << fServerAddress.GetPort() << ")" 00265 << ", maxchild " << fMaxChild 00266 << ", loglvl " << Msg::LevelAsString(fLogLevel) 00267 << ", maxinactive(sec) " << fMaxInactive 00268 << ", currentchild " << fNumChild << "." << endl; 00269 } 00270 else { 00271 ms << "PS: socket is not connected." << endl; 00272 } 00273 00274 return ms; 00275 00276 } 00277 00278 //_____________________________________________________________________________ 00279 void DDSParentServer::Run() { 00280 // Purpose: This is the main method of the DDSParentServer. It 00281 // listens for connections from new dispatcher clients. 00282 // When a new connection with a client is established, 00283 // the parent server will receive, process and respond to 00284 // one and only one service request from the client before 00285 // before disconnecting from that client. This allows 00286 // the parent server to remain open for listening to new connection 00287 // requests. 
Each request received (of type DDS::EMessageType) 00288 // is responded to in return with a single message of type 00289 // DDS::EMessageType sent to the client upon completion of 00290 // servicing the request. In this way the client can remain in 00291 // sync with the parent server's processing of its request. 00292 // 00293 // The services which the client can currently request are: 00294 // DDS::kData == Request for data. This request spawns a separate 00295 // child server to handle the client. 00296 // DDS::kShutdown == Request to shutdown parent server. 00297 // Note that client must be local to 00298 // have authorization to shutdown parent 00299 // server. 00300 // DDS:kStatus = Request for parent server status. Returns 00301 // kOk if alive. 00302 // 00303 // The return status with which the parent server can respond 00304 // to the client may be one of the following: 00305 // DDS::kPermissionDenied == Unauthorized client access denied. 00306 // DDS::kMessageUnknown == Unrecognized message from client. 00307 // DDS::kSystemError == Error occured in system call 00308 // (e.g. fork failed when spawning child server). 00309 // DDS::kSocketError == An error return was received on the 00310 // socket receipt of the client's message. 00311 // DDS::kSaturated == Maximum number of child servers reached. 00312 // DDS::kOk == Parent server status ok or service completed ok. 00313 // 00314 // Arguments: none. 00315 // 00316 // Return: none. 00317 // 00318 // Contact: S. Kasahara 00319 // 00320 // Notes: The DDSParentServer will remain in the Run method until it 00321 // receives the DDS::kShutdown message from a local 00322 // client. 00323 // 00324 00325 while ( !fShutdown ) { 00326 // Wait for new connection to parent server 00327 fTSocket = fTServerSocket -> Accept(); 00328 if ( !fTSocket ) { 00329 MSG("DDS",Msg::kWarning) 00330 << "PS: An error occured in TServerSocket::Accept while attempting to\n" 00331 << "accept a new client connection." 
<< endl; 00332 continue; // loop and try again 00333 } 00334 00335 fClientAddress = fTSocket -> GetInetAddress(); 00336 00337 00338 // Handle connected socket request 00339 if( ( fTSocket -> Recv(fMessageIn) ) > 0 ) { 00340 if ( fMessageIn -> What() != DDS::kStatus ) { 00341 MSG("DDS",Msg::kInfo) << "PS: Connected to new client at " 00342 << fClientAddress.GetHostName() << "/" 00343 << fClientAddress.GetHostAddress() 00344 << "(port " << fClientAddress.GetPort() << ")" 00345 << endl; 00346 } 00347 fMessageOut.Reset(); 00348 00349 if ( !Authorize() ) { 00350 // Client is not authorized to access parent server 00351 fMessageOut.Reset(DDS::kPermissionDenied); 00352 } 00353 else { 00354 00355 switch ( fMessageIn -> What()) { 00356 00357 case DDS::kStatus: 00358 // Request for parentserver status 00359 Status(); 00360 break; 00361 00362 case DDS::kData: 00363 // Request for data 00364 fMessageOut.Reset(SpawnChildServer()); 00365 break; 00366 00367 case DDS::kShutdown: 00368 // Request to shutdown parent server 00369 fMessageOut.Reset(Shutdown()); 00370 break; 00371 00372 default: 00373 MSG("DDS",Msg::kWarning) << "PS: Unknown client message: " 00374 << fMessageIn->What() << " received.\n" 00375 <<"Unable to process client request." 
<< endl; 00376 fMessageOut.Reset(DDS::kMessageUnknown); 00377 break; 00378 } // end of switch block 00379 00380 } // end of fMessageIn processing 00381 00382 delete fMessageIn; fMessageIn = 0; 00383 } 00384 else { 00385 MSG("DDS",Msg::kInfo) << "PS: Connected to new client at " 00386 << fClientAddress.GetHostName() << "/" 00387 << fClientAddress.GetHostAddress() 00388 << "(port " << fClientAddress.GetPort() << ")" 00389 << endl; 00390 // Error return on TSocket::Recv call 00391 MSG("DDS",Msg::kWarning) 00392 << "PS: Error returned from TSocket::Recv of client message.\n" << endl; 00393 fMessageOut.Reset(DDS::kSocketError); 00394 } // end of socket received fMessageIn block 00395 00396 fTSocket -> Send(fMessageOut); // send return status to client 00397 // Client only allowed one chance to send valid message to parent server 00398 // to avoid backlog waiting for one client to get it right; child server 00399 // is more tolerant 00400 delete fTSocket; fTSocket=0; // close and delete TSocket object 00401 CheckChildStatus(); // check status of any exited children 00402 00403 } // while server processing block 00404 00405 } 00406 00407 //_____________________________________________________________________________ 00408 DDS::EMessageType DDSParentServer::Shutdown() { 00409 // Purpose: This Service shuts down the parent server. This routine is 00410 // activated when the DDS::kShutdown message is received in 00411 // DDSParentServer::Run(). 00412 // 00413 // Arguments: none. 00414 // 00415 // Return: DDS::EMessageType containing the return status of this service. 00416 // This service can return one of 2 messages: 00417 // DDS::kOk (service provided ok) 00418 // DDS::kPermissionDenied (client doesn't have 00419 // permission to request shutdown) 00420 // 00421 // Contact: S. Kasahara 00422 // 00423 // Notes: Client must be local (as determined by IsLocalClient()) to have 00424 // authority to shutdown parent server. 
00425 00426 DDS::EMessageType msgrc = DDS::kOk; // default return value 00427 00428 if ( IsLocalClient() ) { 00429 fShutdown = true; // stops processing loop in Run 00430 } 00431 else { 00432 MSG("DDS",Msg::kWarning) 00433 << "PS: Nonlocal client at address:\n" << fClientAddress.GetHostName() 00434 << "/" << fClientAddress.GetHostAddress() << "(port " 00435 << fClientAddress.GetPort() << ")\n" 00436 << "has requested shutdown of parent server on host " 00437 << gSystem->GetHostByName(gSystem->HostName()).GetHostName() 00438 << ".\nPermission denied."<< endl; 00439 msgrc = DDS::kPermissionDenied; 00440 } 00441 00442 return msgrc; 00443 00444 } 00445 00446 //_____________________________________________________________________________ 00447 DDS::EMessageType DDSParentServer::Status() { 00448 // Purpose: This Service returns the parent server status. This routine is 00449 // activated when the DDS::kStatus message is received in 00450 // DDSParentServer::Run(). 00451 // 00452 // Arguments: none. 00453 // 00454 // Return: DDS::EMessageType containing the return status of this service. 00455 // This service can return one message: 00456 // DDS::kOk (service provided ok) 00457 // 00458 // Contact: S. Kasahara 00459 // 00460 // Notes: Eventually could send status object back to client. Currently 00461 // always returns okay (e.g. this server is up and listening). 00462 00463 DDS::EMessageType msgrc = DDS::kOk; // default return value 00464 CheckChildStatus(); // first check for any newly returned children 00465 if( fNumChild >= fMaxChild ) { 00466 msgrc = DDS::kSaturated; 00467 } 00468 fMessageOut.Reset(msgrc); 00469 fMessageOut.WriteObject(&fPSStatus); 00470 00471 return msgrc; 00472 00473 } 00474 00475 //_____________________________________________________________________________ 00476 DDS::EMessageType DDSParentServer::SpawnChildServer() { 00477 // Purpose: This Service spawns a child server to handle client request 00478 // for near-online data. 
This routine is activated when the 00479 // DDS:kData message is received in DDSParentServer::Run(). 00480 // 00481 // Arguments: none. 00482 // 00483 // Return: DDS::EMessageType containing the return status of this 00484 // service. This service can return one of 00485 // 3 messages: 00486 // DDS::kOk (service provided ok) 00487 // DDS::kSaturated (max no. of children reached) 00488 // DDS::kSystemError (fork or exec failed) 00489 // 00490 // Contact: S. Kasahara 00491 // 00492 // Notes: Local client child servers are executed at a higher priority 00493 // than remote clients. A successfully spawned child increments 00494 // the fNumChild counter by 1. 00495 // 00496 00497 DDS::EMessageType msgrc = DDS::kOk; // default return value 00498 00499 DDSClientId* ddsId = dynamic_cast<DDSClientId*> 00500 (fMessageIn -> ReadObject(fMessageIn->GetClass())); 00501 00502 if ( !ddsId ) { 00503 MSG("DDS",Msg::kWarning) 00504 << "PS: SpawnChildServer failed to received DDSClientId from client." 00505 << "\nClient sw likely out-of-date. Refuse to spawn childserver." << endl; 00506 msgrc = DDS::kError; 00507 return msgrc; 00508 } 00509 00510 // First check to see if parent has reached the saturation level with 00511 // the number of supported children. 00512 CheckChildStatus(); // first check for any newly returned children 00513 if( fNumChild == fMaxChild ) { 00514 MSG("DDS",Msg::kWarning) << "PS: Unable to spawn new child per request\n" 00515 << "because number of child servers running (" << fNumChild 00516 << ") is maximum allowed." << endl; 00517 msgrc = DDS::kSaturated; 00518 } 00519 else { 00520 // Fork a child process to handle the new client 00521 int pid; 00522 if( (pid = fork()) < 0) { 00523 // Error in fork, unable to spawn child 00524 MSG("DDS",Msg::kError) 00525 << "PS: Unable to spawn child. 
Fork failed with error " 00526 << strerror(errno) << endl; 00527 msgrc = DDS::kSystemError; 00528 } 00529 else if ( pid > 0 ) { 00530 // Successful fork, report new child process id 00531 MSG("DDS",Msg::kInfo) << "PS: Spawned child with process id " << pid 00532 << " to handle client request." << endl; 00533 fNumChild++; 00534 00535 // However, wait one second to check to see if execlp failed to start 00536 // ChildServer properly. This is done so that the correct status can be 00537 // reported back to the client if the execlp failed. 00538 gSystem -> Sleep(1000); 00539 int childpid; // pid number of exited child processes 00540 Int_t childstat; // exit status of child process 00541 childpid = waitpid(pid, &childstat, WNOHANG); 00542 if (childpid > 0) { 00543 // Spawned child process exited prematurely 00544 fNumChild--; 00545 msgrc = DDS::kSystemError; 00546 } 00547 else { 00548 // Successfully spawned child. Enter pid in ClientId 00549 ddsId -> Connected(pid); 00550 fPSStatus.SetClientId(pid,ddsId); 00551 MSG("DDS",Msg::kInfo) << fPSStatus << endl; 00552 } 00553 } 00554 00555 if (pid == 0) { 00556 // In the child process. 00557 // Must Close serversocket in child because reference count of 00558 // serversocket is incremented by 1 when child is created. Close 00559 // just decrements reference count of serversocket by 1 (child won't 00560 // use server socket). 00561 fTServerSocket -> Close(); 00562 Int_t sockfd = fTSocket->GetDescriptor(); //connected socket descriptor 00563 char csockfd[20]; 00564 sprintf(csockfd,"%i",sockfd); // convert to character string 00565 // Determine the priority level at which the child server will run. 00566 // Local clients are served at a priority level higher than remote 00567 // clients. 00568 Int_t niceincr = (IsLocalClient()) ? 
0 : 5; 00569 char cniceincr[20]; 00570 sprintf(cniceincr,"%i",niceincr); // convert to character string 00571 char cloglevel[20]; 00572 sprintf(cloglevel,"%i",fLogLevel); // convert loglevel to char string 00573 char cmaxinactive[20]; 00574 sprintf(cmaxinactive,"%i",fMaxInactive); // maxinactive to char string 00575 // Pass the program which handles the child the connected socket 00576 // descriptor 00577 execlp("ddschildserver","ddschildserver", csockfd, cniceincr,cloglevel, 00578 cmaxinactive,(char *)0); 00579 // Only reach this point if execlp fails 00580 MSG("DDS",Msg::kError) << "PS: execlp of ChildServer\n" 00581 << "failed with error: " << strerror(errno) << ". Please check that\n" 00582 <<"the location of the ChildServer is in the calling process search PATH.\n" 00583 << endl; 00584 exit(1); 00585 } // end of child processing 00586 00587 } 00588 00589 return msgrc; 00590 00591 } 00592 00593 00594 00595 00596 00597 00598 00599
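// 1.3.9.1  (stray version stamp trailing the listing; apparently left by the tool that generated it -- not part of the program)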