#include <DDSChildServer.h>
Public Member Functions | |
| DDSChildServer (Int_t sockfd, Int_t maxinactive) | |
| virtual | ~DDSChildServer () |
| bool | IsValid () const |
| std::ostream & | Print (std::ostream &ms) const |
| Int_t | Run () |
Private Member Functions | |
| void | Get () |
| void | GoToFile () |
| bool | IsClientConnected () const |
| void | Next () |
| void | Shutdown () |
| void | Subscribe () |
Private Attributes | |
| TSocket * | fTSocket |
| DDSSubscription * | fSubscription |
| DDSFileHandler * | fFileHandler |
| bool | fShutdown |
| int | fPid |
| MomNavigator * | fMom |
| TMessage * | fMessageIn |
| TMessage | fMessageOut |
| PerInputStreamManager | fInputStreamManager |
| TInetAddress | fServerAddress |
| TInetAddress | fClientAddress |
| Int_t | fMaxInactive |
|
||||||||||||
|
Definition at line 52 of file DDSChildServer.cxx. References gSystem(), and MSG. 00052 : fTSocket(0), 00053 fSubscription(0), fFileHandler(0), fShutdown(false), fPid(0), fMom(0), 00054 fMessageIn(0), fMaxInactive(maxinactive) { 00055 // Purpose: Normal constructor for DDSChildServer object. This constructor 00056 // creates a ROOT TSocket attached to the connected socket 00057 // descriptor sockfd (previously established and authorized 00058 // by the parent server). 00059 // 00060 // Argument: sockfd connected socket descriptor. 00061 // 00062 // Return: n/a. 00063 // 00064 // Contact: S. Kasahara 00065 // 00066 // Notes: Use IsValid() to check if the DDSChildServer was created 00067 // successfully. 00068 // 00069 00070 // Get process id of child server for identification purposes 00071 fPid = gSystem -> GetPid(); 00072 00073 // Create ROOT TSocket from connected socket descriptor 00074 fTSocket = new TSocket(sockfd); 00075 00076 if (!fTSocket || !fTSocket -> IsValid()) { 00077 // An error occured during the creation of the TSocket 00078 MSG("DDS",Msg::kWarning) << "CS_" << fPid 00079 << ": Unable to create child server socket with socket descriptor " 00080 << sockfd << "." << endl; 00081 if (fTSocket) delete fTSocket; fTSocket = 0; 00082 fShutdown = true; 00083 } 00084 else { 00085 fServerAddress = fTSocket -> GetLocalInetAddress(); 00086 fClientAddress = fTSocket -> GetInetAddress(); 00087 } 00088 00089 fInputStreamManager.SetUpdateMode(true); 00090 fInputStreamManager.SetPrintOpt("brief"); 00091 00092 // Remove default plugin (TXNetFile) for "root:" prefaced file so that 00093 // TNetFile will be used to connect to rootd server used with dispatcher 00094 // This was done beginning with root v5.10/00, but apparently should 00095 // no longer be done beginning with root v5.16/00. 00096 if ( gROOT -> GetVersionInt() < 51600 ) 00097 gROOT -> GetPluginManager() -> RemoveHandler("TFile","^root:"); 00098 // Default subscription 00099 fSubscription = new DDSSubscription(); 00100 fFileHandler = new DDSFileHandler(fSubscription->GetDataSource(), 00101 fSubscription->IsOffLine()); 00102 00103 }
|
|
|
Definition at line 105 of file DDSChildServer.cxx. References PerInputStreamManager::CloseStream(), fFileHandler, fInputStreamManager, fMessageIn, fMom, fSubscription, fTSocket, and IsValid(). 00105 {
00106 // Purpose: Destructor.
00107 //
00108 // Argument: n/a.
00109 //
00110 // Return: n/a.
00111 //
00112 // Contact: S. Kasahara
00113 //
00114
00115 fInputStreamManager.CloseStream();
00116 if ( IsValid() ) delete fTSocket; fTSocket = 0;
00117 if ( fSubscription ) delete fSubscription; fSubscription = 0;
00118 if ( fFileHandler ) delete fFileHandler; fFileHandler = 0;
00119 if ( fMom ) delete fMom; fMom = 0;
00120 if ( fMessageIn ) delete fMessageIn; fMessageIn = 0;
00121
00122 }
|
|
|
Definition at line 124 of file DDSChildServer.cxx. References MomNavigator::AdoptFragment(), fInputStreamManager, fMessageOut, fMom, MomNavigator::FragmentIter(), PerInputStreamManager::Get(), and RecMinos::GetTempTags(). Referenced by Next(). 00124 {
00125 // Purpose: For now a private method to respond to a request to load
00126 // a the current record set from the stream manager.
00127 //
00128 // Argument: none.
00129 //
00130 // Return: none.
00131 //
00132 // Contact: S. Kasahara
00133 //
00134
00135 // If very first call to Next(), need to create Mom container
00136 if (!fMom) fMom = new MomNavigator();
00137
00138 // Load current record set in Mom
00139 fInputStreamManager.Get(fMom);
00140 // Because fTempTags stored in records won't survive the i/o trip
00141 // need to unpack them here and repack them on the other side
00142 TIter riter = fMom->FragmentIter();
00143 RecMinos* record = 0;
00144 while ( (record = dynamic_cast<RecMinos*>(riter.Next())) ) {
00145 Registry* temptags = new Registry(record->GetTempTags()); // get copy
00146 fMom->AdoptFragment(temptags); // adopted by mom
00147 }
00148 // Load retrieved mom object into fMessageOut for shipping
00149 fMessageOut.Reset(DDS::kOk);
00150 fMessageOut.WriteObject(fMom);
00151 return;
00152
00153 }
|
|
|
Definition at line 155 of file DDSChildServer.cxx. References fFileHandler, fMessageOut, fPid, DDSFileHandler::GoToFile(), DDSFileHandler::GoToNextFile(), DDSFileHandler::GoToSymLinkFile(), IsValid(), and MSG. Referenced by Run(). 00155 {
00156 // Purpose: This Service responds to a request from the client to
00157 // advance to a user-specified filename.
00158 //
00159 // Argument: none.
00160 //
00161 // Return: none.
00162 //
00163 // Contact: S. Kasahara
00164 //
00165
00166 if ( !fFileHandler || !fFileHandler -> IsValid()) {
00167 fMessageOut.Reset(DDS::kFileError);
00168 MSG("DDS",Msg::kWarning) << "CS_" << fPid
00169 << " GoToFile called but DDSFileHandler is InValid." << endl;
00170 return;
00171 }
00172
00173 // Retrieve filename from fMessageIn
00174 char filename[512];
00175 (*fMessageIn) >> filename;
00176 std::string newfilename;
00177 if ( !strncmp(filename,"next",4) ) {
00178 MSG("DDS",Msg::kDebug) << "CS_" << fPid << " GoToNextFile requested."
00179 << endl;
00180 newfilename = fFileHandler->GoToNextFile();
00181 }
00182 else if ( !strncmp(filename,"symlink",7) ) {
00183 MSG("DDS",Msg::kDebug) << "CS_" << fPid << " GoToSymLinkFile requested."
00184 << endl;
00185 newfilename = fFileHandler->GoToSymLinkFile();
00186 }
00187 else {
00188 MSG("DDS",Msg::kDebug) << "CS_" << fPid << " GoToFile " << filename
00189 << " requested." << endl;
00190 newfilename = fFileHandler->GoToFile(filename);
00191 }
00192
00193 if ( !newfilename.empty() ) {
00194 MSG("DDS",Msg::kVerbose) << "CS_" << fPid
00195 << " PerInputStreamManager status before closing old file:\n"
00196 << fInputStreamManager;
00197 fInputStreamManager.CloseFile();
00198 MSG("DDS",Msg::kDebug) << "CS_" << fPid << " Advancing to " << newfilename
00199 << "." << endl;
00200 if(!fInputStreamManager.SetFile("*",newfilename,Per::kRead)) {
00201 fMessageOut.Reset(DDS::kFileError);
00202 }
00203 else {
00204 fMessageOut.Reset(DDS::kOk);
00205 fMessageOut << newfilename.c_str();
00206 MSG("DDS",Msg::kVerbose) << "CS_" << fPid
00207 << " PerInputStreamManager status after opening new file:\n"
00208 << fInputStreamManager;
00209 }
00210 }
00211 else {
00212 MSG("DDS",Msg::kDebug) << "CS_" << fPid
00213 << " Failed to retrieve new filename." << endl;
00214 fMessageOut.Reset(DDS::kFileError);
00215 }
00216
00217 return;
00218
00219 }
|
|
|
Definition at line 221 of file DDSChildServer.cxx. References fTSocket, gSystem(), and len. Referenced by Next(), and Run(). 00221 {
00222 // Purpose: Check client/server socket connection to make sure that client
00223 // has not departed abruptly.
00224 //
00225 // Argument: none.
00226 //
00227 // Return: true/false.
00228 //
00229 // Contact: S. Kasahara
00230 //
00231
00232 TSystem::ResetErrno();
00233 if (!fTSocket || !fTSocket->IsValid()) return false;
00234
00235 Int_t n;
00236 UInt_t len;
00237 Int_t savevalue;
00238 fTSocket->GetOption(kNoBlock,savevalue);
00239 fTSocket->SetOption(kNoBlock,1); // set no block
00240 Int_t socketdescriptor = fTSocket->GetDescriptor();
00241 n = gSystem->RecvRaw(socketdescriptor, &len, sizeof(UInt_t),kPeek);
00242 fTSocket->SetOption(kNoBlock,savevalue); // reset noblock option
00243
00244 if ( n <= 0 && n != -4) return false; // socket error
00245 return true; // connection is valid
00246
00247 }
|
|
|
Definition at line 42 of file DDSChildServer.h. Referenced by GoToFile(), main(), Next(), Print(), and ~DDSChildServer(). 00042 { return (fTSocket != (TSocket*)0) ? true : false; }
|
|
|
Definition at line 249 of file DDSChildServer.cxx. References DDS::AsString(), MomNavigator::Clear(), fFileHandler, fInputStreamManager, fMessageOut, fMom, fPid, fSubscription, Get(), PerInputStreamManager::GetCurrentVld(), PerStream::GetFullFilePathName(), PerInputStreamManager::GetLastEntryVld(), PerFileManager::GetOpenedFile(), VldTimeStamp::GetSec(), PerStreamManager::GetStreamMap(), DDSFileHandler::GetSymLinkTargetName(), VldContext::GetTimeStamp(), DDSFileHandler::GoToNextFile(), DDSFileHandler::GoToSymLinkFile(), gSystem(), PerFileManager::Instance(), IsClientConnected(), PerInputStreamManager::IsFileEnd(), IsValid(), PerInputStreamManager::IsValidSelectionString(), MSG, DDSFileHandler::NewFileAvailable(), PerInputStreamManager::Next(), PerInputStreamManager::Previous(), PerInputStreamManager::SetFileEnd(), and timer(). Referenced by Run(). 00249 {
00250 // Purpose: This Service responds to a request from the client to
00251 // send the next available entry satisfying the client's
00252 // subscription. This method is activated when the DDS::kNext
00253 // message is received in DDSChildServer::Run().
00254 //
00255 // Argument: none.
00256 //
00257 // Return: none.
00258 //
00259 // Contact: S. Kasahara
00260 //
00261
00262 // Client must have submitted subscription first
00263 if ( !fSubscription ) {
00264 fMessageOut.Reset(DDS::kError);
00265 MSG("DDS",Msg::kWarning) << "CS_" << fPid
00266 << " Next called but no subscription has been submitted." << endl;
00267 return;
00268 }
00269
00270 if ( !fFileHandler || !fFileHandler -> IsValid()) {
00271 fMessageOut.Reset(DDS::kFileError);
00272 MSG("DDS",Msg::kWarning) << "CS_" << fPid
00273 << " Next called but DDSFileHandler is InValid." << endl;
00274 return;
00275 }
00276
00277 // If very first call to Next(), need to create Mom container
00278 if (!fMom) fMom = new MomNavigator();
00279
00280 // Retrieve waittime (secs) and advanceby from fMessageIn and start timer
00281 UInt_t waittime;
00282 (*fMessageIn) >> waittime;
00283 UInt_t advanceby;
00284 (*fMessageIn) >> advanceby;
00285 TStopwatch timer; timer.Start();
00286 bool isNewSet = false; // at least one new record set available
00287
00288 Int_t keepUpWindow = fSubscription -> GetKeepUpWindow();
00289 while ( timer.RealTime() < waittime ) {
00290 timer.Continue();
00291 fMom->Clear(); // clear mom object
00292
00293 DDS::EKeepUpMode keepupmode = fSubscription -> GetKeepUpMode();
00294 if ( (keepupmode == DDS::kFileKeepUp || keepupmode == DDS::kRecordKeepUp)
00295 && fFileHandler->NewFileAvailable() ) {
00296 // Set new file for managed streams
00297 std::string newfilename = fFileHandler->GoToSymLinkFile();
00298 MSG("DDS",Msg::kDebug) << "CS_" << fPid
00299 << " Next detected NewFileAvailable and keepupmode is "
00300 << DDS::AsString(keepupmode) << ", called GoToSymLinkFile." << endl;
00301 if ( !newfilename.empty() ) {
00302 MSG("DDS",Msg::kVerbose) << "CS_" << fPid
00303 << " PerInputStreamManager status before closing old file:\n"
00304 << fInputStreamManager;
00305 fInputStreamManager.CloseFile();
00306 MSG("DDS",Msg::kDebug) << "CS_" << fPid << " Advancing to "
00307 << newfilename << "." << endl;
00308 if(!fInputStreamManager.SetFile("*",newfilename,Per::kRead)) {
00309 fMessageOut.Reset(DDS::kFileError);
00310 return;
00311 }
00312 else {
00313 MSG("DDS",Msg::kVerbose) << "CS_" << fPid
00314 << " PerInputStreamManager status after opening new file:\n"
00315 << fInputStreamManager;
00316 }
00317 }
00318 else {
00319 MSG("DDS",Msg::kDebug) << "CS_" << fPid
00320 << " Failed to retrieve new filename." << endl;
00321 }
00322 }
00323 if ( !fInputStreamManager.IsValidSelectionString() ) {
00324 MSG("DDS",Msg::kDebug) << "CS_" << fPid
00325 << " Has invalid subscription selection string." << endl;
00326 fMessageOut.Reset(DDS::kInvalidSelection);
00327 return;
00328 }
00329
00330 if ( fInputStreamManager.Next(0,advanceby) > 0 ) {
00331 isNewSet = true;
00332 if ( keepupmode != DDS::kRecordKeepUp ) {
00333 // Load current record set in Mom
00334 this -> Get();
00335 return;
00336 }
00337 else {
00338 Int_t lastTime = (Int_t)
00339 (fInputStreamManager.GetLastEntryVld().GetTimeStamp().GetSec());
00340 Int_t currentTime = (Int_t)
00341 (fInputStreamManager.GetCurrentVld().GetTimeStamp().GetSec());
00342 if ( lastTime - currentTime <= keepUpWindow ) {
00343 this -> Get();
00344 return;
00345 }
00346 }
00347 }
00348 else {
00349 // Determine if file has reached end or has been aborted
00350 if ( fFileHandler->NewFileAvailable() &&
00351 !fInputStreamManager.IsFileEnd() ) {
00352 std::string currentfilename = "";
00353 const PerStreamManager::StreamMap& sm_mgr
00354 = fInputStreamManager.GetStreamMap();
00355 PerStreamManager::StreamMapConstItr itr_mgr = sm_mgr.begin();
00356 const PerInputStream* instream = 0;
00357 if ( itr_mgr != sm_mgr.end() ) instream
00358 = dynamic_cast<PerInputStream*>(itr_mgr->second);
00359 if (instream) currentfilename = instream->GetFullFilePathName();
00360 PerFileManager& filemgr = PerFileManager::Instance();
00361 if ( !((filemgr.GetOpenedFile(currentfilename))->HasFileEndKey()) ) {
00362 std::string symlinktargetname = fFileHandler->GetSymLinkTargetName();
00363 if ( symlinktargetname != currentfilename ) {
00364 MSG("DDS",Msg::kDebug) << "CS_" << fPid << " Current file "
00365 << currentfilename << "\n not closed but symlink currentfile "
00366 << symlinktargetname << "\n has moved on."
00367 << " Assume abort, finish current file, and move to next file."
00368 << endl;
00369 fInputStreamManager.SetFileEnd();
00370 }
00371 }
00372 }
00373 else if ( fInputStreamManager.IsFileEnd() ){
00374 // All streams have reached end and writer has closed file
00375 // Attempt to update file for all managed streams
00376 std::string fullfilepathname="";
00377 if ( keepupmode == DDS::kAll) {
00378 fullfilepathname = fFileHandler->GoToNextFile();
00379 if ( !fullfilepathname.empty() ) {
00380 MSG("DDS",Msg::kDebug) << "CS_" << fPid
00381 << " Next call to GoToNextFile retrieved file\n\t"
00382 << fullfilepathname << "." << endl;
00383 }
00384 }
00385 else if ( fFileHandler->NewFileAvailable() ) {
00386 fullfilepathname = fFileHandler->GoToSymLinkFile();
00387 MSG("DDS",Msg::kDebug) << "CS_" << fPid
00388 << " Next call to GoToSymLinkFile retrieved filename\n\t"
00389 << fullfilepathname << "." << endl;
00390 }
00391 if (!fullfilepathname.empty()) {
00392 MSG("DDS",Msg::kVerbose) << "CS_" << fPid
00393 << " PerInputStreamManager status before closing old file:\n"
00394 << fInputStreamManager;
00395 fInputStreamManager.CloseFile();
00396 if(!fInputStreamManager.SetFile("*",fullfilepathname,Per::kRead)) {
00397 fMessageOut.Reset(DDS::kFileError);
00398 return;
00399 }
00400 MSG("DDS",Msg::kVerbose) << "CS_" << fPid
00401 << " PerInputStreamManager status after opening new file:\n"
00402 << fInputStreamManager;
00403 }
00404 else {
00405 if ( keepupmode == DDS::kRecordKeepUp && isNewSet ) {
00406 // File end was reached. Must back up a set and then Get
00407 fInputStreamManager.Previous(0,advanceby);
00408 this -> Get();
00409 return;
00410 }
00411 // Sleep for 1 sec while we wait for new file
00412 gSystem->Sleep(1000);
00413 // After waking up, test socket connection to make sure client is
00414 // still there.
00415 if ( !IsClientConnected() ) {
00416 fMessageOut.Reset(DDS::kSocketError);
00417 return;
00418 }
00419 }
00420 }
00421 else {
00422 if ( keepupmode == DDS::kRecordKeepUp && isNewSet ) {
00423 // don't wait, retrieve most recent new set available
00424 this -> Get();
00425 return;
00426 }
00427 // Sleep for 1 sec while we wait for updated tree
00428 gSystem->Sleep(1000);
00429 // After waking up, test socket connection to make sure client is
00430 // still there.
00431 if ( !IsClientConnected() ) {
00432 fMessageOut.Reset(DDS::kSocketError);
00433 return;
00434 }
00435 }
00436 }
00437 }
00438 timer.Stop();
00439 // Next reached "TimeOut", find out if its because it was waiting for
00440 // a new record from the current file or because it was waiting for a new
00441 // file.
00442 if ( !fInputStreamManager.IsFileEnd() ) {
00443 fMessageOut.Reset(DDS::kTimeoutNewRecord);
00444 }
00445 else {
00446 fMessageOut.Reset(DDS::kTimeoutNewFile);
00447 }
00448
00449 return;
00450
00451 }
|
|
|
Definition at line 453 of file DDSChildServer.cxx. References fClientAddress, fPid, and IsValid(). Referenced by operator<<(). 00453 {
00454 // Purpose: Print DDSChildServer status on std::ostream.
00455 //
00456 // Argument: ms std::ostream to print on.
00457 //
00458 // Return: std::ostream reference.
00459 //
00460 // Contact: S. Kasahara
00461 //
00462
00463 if ( IsValid() ) {
00464 ms << "CS_" << fPid << ": connected to client at "
00465 // Print the client's host name, address, and port number
00466 << fClientAddress.GetHostName() <<"/"<< fClientAddress.GetHostAddress()
00467 << "(port " << fClientAddress.GetPort() << ")." << endl;
00468 }
00469 else {
00470 ms << "CS_" << fPid << ": childserver socket is not connected." << endl;
00471 }
00472
00473 return ms;
00474
00475 }
|
|
|
Definition at line 477 of file DDSChildServer.cxx. References fMaxInactive, fMessageIn, fMessageOut, fPid, fShutdown, fTSocket, GoToFile(), gSystem(), IsClientConnected(), MSG, Next(), Shutdown(), Subscribe(), and timer(). 00477 {
00478 // Purpose: This is the main method of the DDSChildServer. It listens
00479 // for and responds to client service requests. Each request
00480 // received (of type DDS::EMessageType) is responded to in
00481 // return with a single message of type DDS::EMessageType sent
00482 // to the client upon completion of servicing the request.
00483 // In this way the client can remain in sync with the child
00484 // server's processing of its requests.
00485 //
00486 // The services which the client can currently request from the
00487 // child server are:
00488 // DDS::kGoToFile == Request to advance to new file.
00489 // DDS::kNext == Send next entry satisfying subscription.
00490 // DDS::kShutdown == Request to shutdown child server.
00491 // DDS::kSubscribe == Request to submit new subscription.
00492 //
00493 // The return status with which the child server can respond to
00494 // the client are those returned by the service methods:
00495 // GoToFile(),Next(),Shutdown(), and Subscribe() as well as:
00496 // DDS::kMessageUnknown == Unrecognized message received.
00497 // DDS::kSocketError == An error return was received on the
00498 // socket receipt of the client's message.
00499 //
00500 // Argument: none.
00501 //
00502 // Return: Return code = 0 => normal finish
00503 // = 1 => socket error caused premature end
00504 //
00505 // Contact: S. Kasahara
00506 //
00507 // Notes: The DDSChildServer will remain in the Run method until it receives
00508 // the DDS::kShutdown message from its client.
00509 //
00510
00511 Int_t rc = 0;
00512
00513 TStopwatch timer;
00514 timer.Start();
00515
00516 while ( !fShutdown ) {
00517 // Wait for new message from client
00518 fTSocket -> SetOption(kNoBlock,1);
00519 Int_t rettype = fTSocket->Recv(fMessageIn);
00520 fTSocket -> SetOption(kNoBlock,0);
00521 if ( rettype != -4 ) {
00522 timer.Reset();
00523 if ( rettype > 0 ) {
00524 switch ( fMessageIn -> What() ) {
00525
00526 case DDS::kGoToFile:
00527 // Request to send next entry satisfying subscription
00528 GoToFile();
00529 break;
00530
00531 case DDS::kNext:
00532 // Request to send next entry satisfying subscription
00533 Next();
00534 break;
00535
00536 case DDS::kShutdown:
00537 // Request to shutdown child server
00538 Shutdown();
00539 break;
00540
00541 case DDS::kSubscribe:
00542 // Request to submit new subscription
00543 Subscribe();
00544 break;
00545
00546 default:
00547 MSG("DDS",Msg::kWarning)<< "CS_"<< fPid << ": Unknown client message: "
00548 << fMessageIn -> What() << " received.\n"
00549 << "Unable to process client request." << endl;
00550 fMessageOut.Reset(DDS::kMessageUnknown);
00551 break;
00552 } // end of switch block
00553
00554 if ( fMessageIn ) delete fMessageIn; fMessageIn = 0;
00555
00556 } // end of socket received message block
00557 else {
00558 // Error return on TSocket::Recv call
00559 fShutdown = true;
00560 delete fTSocket; fTSocket = 0;
00561 MSG("DDS",Msg::kWarning) << "CS_" << fPid
00562 << ": Socket error detected on TSocket::Recv." <<
00563 "\n ChildServer will be shutdown." << endl;
00564 rc = 1;
00565 }
00566
00567 if ( fTSocket && IsClientConnected() ) {
00568 // send return status & objects to client
00569 if ( fTSocket -> Send(fMessageOut) < 0 ) {
00570 // An error occurred while sending message
00571 if (errno == EPIPE) {
00572 // Broken pipe between child server and client is fatal
00573 fShutdown = true;
00574 delete fTSocket; fTSocket = 0;
00575 MSG("DDS",Msg::kWarning) << "CS_" << fPid
00576 << ": Socket error detected on TSocket::Send."
00577 << "\n ChildServer will be shutdown." << endl;
00578 rc = 1;
00579 }
00580 }
00581 }
00582 else if (!fShutdown) {
00583 // Error detected in socket connection
00584 fShutdown = true;
00585 delete fTSocket; fTSocket = 0;
00586 MSG("DDS",Msg::kWarning) << "CS_" << fPid
00587 << ": Socket error detected indicating client has disconnected.\n"
00588 << " ChildServer will be shutdown."<< endl;
00589 rc = 1;
00590 }
00591 }
00592 else {
00593 if ( timer.RealTime() > fMaxInactive ) {
00594 MSG("DDS",Msg::kWarning) << "CS_" << fPid
00595 << ": Inactivity time limit of "
00596 << fMaxInactive << "(sec) reached.\n"
00597 << "ChildServer will be shutdown." << endl;
00598 Shutdown();
00599 fMessageOut.Reset(DDS::kInactive);
00600 rc = 2;
00601 if ( fTSocket && IsClientConnected() ) fTSocket->Send(fMessageOut);
00602 }
00603 else {
00604 timer.Continue();
00605 gSystem->Sleep(10); // sleep 10 msec so as to avoid excessive cpu
00606 }
00607 }
00608 } // End of loop over client requests
00609
00610 return rc;
00611
00612 }
|
|
|
Definition at line 614 of file DDSChildServer.cxx. References PerInputStreamManager::CloseStream(), fMessageOut, fPid, fShutdown, and MSG. Referenced by Run(). 00614 {
00615 // Purpose: This Service shuts down the child server. This method is
00616 // activated when the DDS::kShutdown message is received in
00617 // DDSChildServer::Run().
00618 //
00619 // Argument: none.
00620 //
00621 // Return: none.
00622 //
00623 // Contact: S. Kasahara
00624 //
00625
00626 fShutdown = true; // stops processing loop in Run
00627 MSG("DDS",Msg::kVerbose) << "CS_" << fPid
00628 << " PerInputStreamManager status before shutdown:\n"
00629 << fInputStreamManager;
00630 fInputStreamManager.CloseStream(); // close all managed streams
00631 fMessageOut.Reset(DDS::kOk);
00632
00633 return;
00634
00635 }
|
|
|
Definition at line 637 of file DDSChildServer.cxx. References PerInputStreamManager::CloseStream(), fFileHandler, fInputStreamManager, fMessageIn, fMessageOut, fPid, fSubscription, DDSFileHandler::GetCurrentFileName(), PerInputStreamManager::GetOpenedStream(), PerStreamManager::GetStreamMap(), MSG, PerInputStreamManager::OpenStream(), DDSSubscription::Reset(), and PerInputStreamManager::SetMaxSyncDelay(). Referenced by Run(). 00637 {
00638 // Purpose: This Service receives and processes a new subscription from
00639 // the client. This method is activated when the DDS::kSubscribe
00640 // message is received in DDSChildServer::Run().
00641 //
00642 // Argument: none.
00643 //
00644 // Return: none.
00645 //
00646 // Contact: S. Kasahara
00647 //
00648
00649 // If previous subscription exists, must delete it
00650 if ( fSubscription) delete fSubscription; fSubscription = 0;
00651
00652 // Attempt to receive DDSSubscription object from fMessageIn
00653 fSubscription = dynamic_cast<DDSSubscription*>
00654 (fMessageIn -> ReadObject(fMessageIn->GetClass()));
00655 if ( !fSubscription ) {
00656 MSG("DDS",Msg::kWarning) << "CS_" << fPid
00657 << "\nSubscribe failed to receive expected DDSSubscription from client."
00658 << endl;
00659 fMessageOut.Reset(DDS::kError);
00660 }
00661 else {
00662 // Subscription received successfully
00663 MSG("DDS",Msg::kDebug) << "CS_" << fPid
00664 << " Subscribe received subscription:\n"
00665 << fSubscription;
00666
00667 fMessageOut.Reset(DDS::kOk);
00668 if (fFileHandler && ( (fSubscription -> GetDataSource())
00669 !=(fFileHandler -> GetDataSource())
00670 || (fSubscription -> IsOffLine())
00671 !=(fFileHandler -> IsOffLine()) ) ) {
00672 delete fFileHandler; fFileHandler=0;
00673 }
00674 if (!fFileHandler)
00675 fFileHandler = new DDSFileHandler(fSubscription -> GetDataSource(),
00676 fSubscription -> IsOffLine());
00677 // Determine current file from filehandler
00678 std::string currentFileName = fFileHandler->GetCurrentFileName();
00679
00680 // Open new streams in subscription list
00681 PerInputStream* instream;
00682 const DDSSubscription::StreamMap& sm_sub = fSubscription->GetStreamMap();
00683 for (DDSSubscription::StreamMapConstItr itr_sub = sm_sub.begin();
00684 itr_sub!= sm_sub.end();++itr_sub) {
00685 string streamName = (itr_sub->first).Data();
00686 if ( !(instream = dynamic_cast<PerInputStream*>
00687 (fInputStreamManager.GetOpenedStream(streamName))) ) {
00688 // For DDS application, streamname & treename are the same;
00689 instream = fInputStreamManager.OpenStream(streamName,streamName);
00690 if (!currentFileName.empty())instream -> SetFile(currentFileName);
00691 }
00692 if ( instream ) instream -> SetSelection((itr_sub->second).Data());
00693 }
00694
00695 fInputStreamManager.SetMaxSyncDelay(fSubscription -> GetMaxSyncDelay());
00696
00697 // Finally, close all streams not on subscription list
00698 const PerStreamManager::StreamMap& sm_mgr
00699 = fInputStreamManager.GetStreamMap();
00700 for ( PerStreamManager::StreamMapConstItr itr_mgr = sm_mgr.begin();
00701 itr_mgr!= sm_mgr.end(); ++itr_mgr) {
00702 std::string streamName = itr_mgr->first;
00703 DDSSubscription::StreamMapConstItr itr_sub
00704 = sm_sub.find(streamName.c_str());
00705 if (itr_sub == sm_sub.end())fInputStreamManager.CloseStream(streamName);
00706 }
00707 }
00708
00709 return;
00710
00711 }
|
|
|
Definition at line 72 of file DDSChildServer.h. Referenced by Print(). |
|
|
Definition at line 61 of file DDSChildServer.h. Referenced by GoToFile(), Next(), Subscribe(), and ~DDSChildServer(). |
|
|
Definition at line 70 of file DDSChildServer.h. Referenced by Get(), Next(), Subscribe(), and ~DDSChildServer(). |
|
|
Definition at line 73 of file DDSChildServer.h. Referenced by Run(). |
|
|
Definition at line 68 of file DDSChildServer.h. Referenced by Run(), Subscribe(), and ~DDSChildServer(). |
|
|
Definition at line 69 of file DDSChildServer.h. Referenced by Get(), GoToFile(), Next(), Run(), Shutdown(), and Subscribe(). |
|
|
Definition at line 67 of file DDSChildServer.h. Referenced by Get(), Next(), and ~DDSChildServer(). |
|
|
Definition at line 63 of file DDSChildServer.h. Referenced by GoToFile(), Next(), Print(), Run(), Shutdown(), and Subscribe(). |
|
|
Definition at line 71 of file DDSChildServer.h. |
|
|
Definition at line 62 of file DDSChildServer.h. Referenced by Run(), and Shutdown(). |
|
|
Definition at line 60 of file DDSChildServer.h. Referenced by Next(), Subscribe(), and ~DDSChildServer(). |
|
|
Definition at line 59 of file DDSChildServer.h. Referenced by IsClientConnected(), Run(), and ~DDSChildServer(). |
1.3.9.1