00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "DisplayServer.h"
00024
00025 #include <unistd.h>
00026 #include <iostream>
00027 #include <cstring>
00028
00029 #include "TMessage.h"
00030 #include "TString.h"
00031
00032 #include "MessageStorage.h"
00033 #include "SocketUtils.h"
00034 #include "ServerProtocol.h"
00035
00036 using std::string;
00037 using std::cout;
00038 using std::endl;
00039 using std::cerr;
00040
00041
00042 DisplayServer::DisplayServer(char* host, int consPort, TROOT* rt,
00043 ConsumerList* cl, bool ut) : TNamed("DisplayServer","DisplayServer"),
00044 _hostName(host),
00045 _consumerPort(consPort), _clientPort(9091), _error(DS_OK), _debugFlags(0x0),
00046 _updateTimeDiff(120000), _useTimer(ut),
00047 _ss(0), _conSock(0),
00048 _currInfo(0), _conList(cl), _glRoot(rt), _updateTimer(0)
00049 {
00050
00051 _conSock = new TSocket((TString)(_hostName.c_str()), _consumerPort);
00052
00053 if (_conSock->IsValid()) {
00054 cout << _clName << ": Connected successfully to CONSUMER socket at port "
00055 << _consumerPort << endl;
00056 cout << spaces << "on host " << _hostName << "."
00057 << endl;
00058 }
00059 else {
00060 cerr << _clName << ": Consumer socket at port " << _consumerPort
00061 << " on host\n" << spaces << _hostName << " is not valid!" << endl;
00062 _error = DS_NO_CONNECTION;
00063 }
00064
00065 if (_error < 0) ;
00066 else {
00067 _consMoni.Add(_conSock);
00068 if (openClientServerSocket() < 0) {
00069 cerr << _clName << ": ERROR could not open server socket for clients."
00070 << endl;
00071 }
00072 }
00073
00074 TConsumerInfo* currInfo = new TConsumerInfo("Display Server", 1);
00075 delete currInfo;
00076 currInfo = 0;
00077
00078 }
00079
00080
00081 const string DisplayServer::_clName = string("DisplayServer");
00082
00083
00084 DisplayServer::~DisplayServer()
00085 {
00086 }
00087
00088
00089 int DisplayServer::pollConsumer()
00090 {
00091 static int consSendCycle=0;
00092 static int numConsPoll=0;
00093 static string oldNameString;
00094 TConsumerInfo* newInfo = 0;
00095
00096 if (!_consMoni.GetActive()) {
00097 return 0;
00098 }
00099
00100 TSocket* sock0 = _consMoni.Select(50);
00101 numConsPoll++;
00102 if (sock0 != (TSocket*)-1) {
00103 if (sock0 && _conSock) {
00104 if (sock0->GetInetAddress().GetHostName() !=
00105 _conSock->GetInetAddress().GetHostName()) {
00106 cerr << "DisplayServer::pollConsumer(): Expected consumer "
00107 << "socket to be selected."
00108 << endl;
00109 return -4;
00110 }
00111 }
00112
00113
00114
00115
00116 numConsPoll=0;
00117
00118 SocketUtils sockUt0(_glRoot, &_consMoni, sock0);
00119 string ctrlString = sockUt0.getStringMessage();
00120
00121
00122
00123 if (ctrlString == DspEndConnection) {
00124 string consumerName;
00125 if (_currInfo) consumerName = string(_currInfo->GetTitle());
00126 else consumerName = "Consumer";
00127 cout << _clName << ": INFO: " << consumerName << " sent message: "
00128 << ctrlString << "\n" << spaces
00129 << "==> Going to terminate connection to " << consumerName;
00130 cout << " running on\n"
00131 << spaces << sock0->GetInetAddress().GetHostName() << endl;
00132 sockUt0.closeSocket();
00133
00134 _conList->setStatus(0,ConsumerList::Finished);
00135
00136 sleep(1);
00137 if (_ss) {
00138 _ss->Close();
00139 _glRoot->GetListOfSockets()->Remove(_ss);
00140 delete _ss;
00141 _ss = 0;
00142 }
00143
00144
00145 TIter next(_glRoot->GetListOfSockets());
00146 TSocket* socktmp = 0;
00147 while ((socktmp = (TSocket*)next())) {
00148 socktmp->Send(DspEndConnection.c_str());
00149 sleep(1);
00150 socktmp->Close();
00151 _glRoot->GetListOfSockets()->Remove(socktmp);
00152 }
00153 return 2;
00154 }
00155 else {
00156 if (ctrlString == DspConsumerSend) {
00157 consSendCycle++;
00158
00159
00160 _glRoot->DeleteAll();
00161 TMessage* conmess = 0;
00162
00163
00164 sock0->Recv(conmess);
00165 if (conmess) {
00166 if (conmess->What() == kMESS_OBJECT) {
00167 TObject *rcvobject = (TObject*)conmess->ReadObject(
00168 conmess->GetClass());
00169
00170
00171
00172
00173 if(rcvobject->InheritsFrom("TConsumerInfo")) {
00174 newInfo = (TConsumerInfo*) rcvobject;
00175 if (newInfo) {
00176 delete _currInfo;
00177 _currInfo = newInfo;
00178 }
00179 }
00180 else {
00181 cerr << _clName << ": pollConsumer(): ERROR: "
00182 << "Did not receive the TConsumerInfo!"
00183 << endl;
00184 return -1;
00185 }
00186 }
00187 delete conmess;
00188 }
00189 else {
00190
00191 cerr << _clName << ": pollConsumer(): ERROR: "
00192 << "Didn't receive a valid message." << endl;
00193 return -2;
00194 }
00195
00196
00197 string namesString = sockUt0.getStringMessage();
00198
00199
00200 if (strncmp(namesString.c_str(), "Modified", 8) == 0) {
00201 if (_debugFlags & 0x4) {
00202 cout << _clName << ": pollConsumer(): DEBUG INFO: "
00203 << "Going to edit the storage list." << endl;
00204 }
00205 editStorageList(namesString);
00206 setRequireUpdateBits();
00207 }
00208 else {
00209 if (strncmp(namesString.c_str(), "Unmodified", 10) != 0) {
00210 cerr << _clName << ": pollConsumer(): ERROR: "
00211 << "Expected key word 'Unmodified'." << endl;
00212 return -3;
00213 }
00214 }
00215
00216
00217 if (_currList.GetSize() > 0) {
00218 TListIter storeIter(&_currList);
00219 Int_t recvRes = 0;
00220 while (MessageStorage *mesStor = (MessageStorage*)storeIter()) {
00221 if (sock0) recvRes = sock0->Recv(conmess);
00222 if ((recvRes != -1) && (recvRes != 0)) {
00223 if (conmess->What() == kMESS_OBJECT) {
00224 mesStor->updateMessage(conmess);
00225
00226
00227
00228 }
00229 else {
00230 cerr << _clName << ": pollConsumer(): Expected an object."
00231 << endl;
00232 continue;
00233 }
00234 }
00235 else {
00236 cerr << _clName << ": ERROR reading object messages!" << endl;
00237 cerr << _clName << ": Closing socket!" << endl;
00238 sockUt0.closeSocket();
00239 continue;
00240 }
00241
00242
00243 conmess = 0;
00244 }
00245 }
00246 else {
00247 cerr << _clName << ": pollConsumer(): Storage list empty."
00248 << endl;
00249 }
00250
00251
00252 string endString = sockUt0.getStringMessage();
00253 if (endString != DspConsumerFinish)
00254 cerr << _clName << ": pollConsumer(): ERROR: "
00255 << "Expected different end string. Got " << endString
00256 << endl;
00257
00258 if (consSendCycle == 1) {
00259 updateStateManager();
00260 if (!_updateTimer && _useTimer) {
00261 _updateTimer = new TTimer(this, _updateTimeDiff);
00262 _updateTimer->Reset();
00263 _updateTimer->TurnOn();
00264 }
00265 }
00266 }
00267 else {
00268
00269
00270 }
00271 }
00272 }
00273
00274 return 1;
00275 }
00276
00277
00278 void DisplayServer::connectClient()
00279 {
00280
00281 TSocket* sock1=0;
00282 sock1 = _ss->Accept();
00283 if (sock1 != (TSocket*)-1) {
00284 string clientHostname(sock1->GetInetAddress().GetHostName());
00285 int clientPort = sock1->GetInetAddress().GetPort();
00286 cout << _clName << ": connectClient(): Client connected from "
00287 << clientHostname << " on port " << clientPort << "." << endl;
00288 _clientMoni.Add(sock1);
00289 char portChar[20];
00290 sprintf(portChar, "%d", clientPort);
00291 string id = clientHostname + string(":") + string(portChar);
00292 _clientMap[id] = false;
00293 }
00294
00295 return;
00296 }
00297
00298
00299 void DisplayServer::pollClients()
00300 {
00301 const string myName("DisplayServer::pollClients()");
00302
00303 if (!_clientMoni.GetActive()) return;
00304
00305
00306 TSocket* sock2 = 0;
00307 sock2 = _clientMoni.Select(50);
00308 if (sock2 != (TSocket*)-1) {
00309 SocketUtils sockUt2(_glRoot, &_clientMoni, sock2);
00310 string request = sockUt2.getStringMessage();
00311
00312
00313
00314 if (request.size() == 0) {
00315 cerr << myName << ": request empty" << endl;
00316 return;
00317 }
00318 if (request == DspEndConnection) {
00319 cout << myName << ": Terminating connection to "
00320 << sockUt2.getHostname() << "." << endl;
00321 sockUt2.closeSocket();
00322 #ifndef IRIX6
00323 _clientMap.erase(sockUt2.getId());
00324 #else
00325
00326
00327 map<string,bool>::iterator cm_itr = _clientMap.find(sockUt2.getId());
00328 _clientMap.erase(cm_itr);
00329 #endif
00330 return;
00331 }
00332 if (request == "ConsumerInfo") {
00333 TMessage infMess(kMESS_OBJECT);
00334 infMess.Reset();
00335 infMess.WriteObject(_currInfo);
00336 int sendRes = 0;
00337 sendRes = sock2->Send(infMess);
00338 if (sendRes == -1) {
00339 cerr << myName << "Error sending TConsumerInfo." << endl;
00340 sockUt2.closeSocket();
00341 }
00342 if (_clientMap[sockUt2.getId()]) _clientMap[sockUt2.getId()] = false;
00343 }
00344 else {
00345
00346
00347 if (_clientMap[sockUt2.getId()]) {
00348 Int_t sendRes=0;
00349 sendRes = sock2->Send(DspRequestNewInfo.c_str());
00350 if ( sendRes == -1) {
00351 cerr << myName << ": ERROR: Socket->Send returned "
00352 << "with " << sendRes << ".\n"
00353 << " ==> The socket will be closed."
00354 << endl;
00355
00356 sockUt2.closeSocket();
00357 }
00358 }
00359 else {
00360 Bool_t objFound = kFALSE;
00361 TListIter iter(&_currList);
00362 while (MessageStorage* obj = (MessageStorage*) iter()) {
00363 if (obj->GetName() == request) {
00364 objFound = kTRUE;
00365 TMessage sendMess(kMESS_OBJECT);
00366 TMessage* storeMess = obj->getMessage();
00367 Int_t size = storeMess->BufferSize();
00368 Int_t bufSize = size+200;
00369 char* buffer = new char[bufSize];
00370
00371
00372 storeMess->ReadBuf(buffer,bufSize);
00373 storeMess->Reset();
00374 sendMess.WriteBuf(buffer,bufSize);
00375 Int_t sendRes=0;
00376 sendRes=sock2->Send(sendMess);
00377 if ( sendRes == -1) {
00378 cerr << myName << ": ERROR: pollClients(): "
00379 << "Socket->Send returned with " << sendRes << ".\n"
00380 << " ==> The socket will be closed."
00381 << endl;
00382
00383 sockUt2.closeSocket();
00384 }
00385
00386
00387 delete [] buffer;
00388 continue;
00389 }
00390 }
00391 if (!objFound) {
00392 Int_t sendRes=0;
00393 sendRes = sock2->Send(DspObjectNotFound.c_str());
00394 if ( sendRes == -1) {
00395 cerr << myName << ": ERROR: Socket->Send returned "
00396 << "with " << sendRes << ".\n"
00397 << " ==> The socket will be closed."
00398 << endl;
00399
00400 sockUt2.closeSocket();
00401 }
00402 }
00403 }
00404 }
00405 }
00406
00407 return;
00408 }
00409
00410
00411 TConsumerInfo* DisplayServer::getCurrentInfo()
00412 {
00413 return _currInfo;
00414 }
00415
00416
00417 void DisplayServer::editStorageList(const string& nameString)
00418 {
00419 const string myName("DisplayServer::editStorageList()");
00420 char* help1 = 0;
00421 char* help2 = 0;
00422
00423
00424
00425 if (_debugFlags & 0x8) {
00426 cout << "Outdated Size = " << _outdatedList.GetSize() << " list size = "
00427 << _currList.GetSize() << endl;
00428 }
00429 TListIter iter1(&_currList);
00430 while (MessageStorage* obj = (MessageStorage*) iter1()) {
00431 Bool_t found = kFALSE;
00432
00433
00434
00435 char* cpyName = new char[nameString.size()+1];
00436 strcpy(cpyName, nameString.c_str());
00437 help1 = const_cast<char *>(strchr(cpyName,'$'));
00438 while ((help2 = const_cast<char*>(strtok(help1, "$"))) && (!found)) {
00439 if (obj->GetName() == string(help2)) found = kTRUE;
00440 help1 = 0;
00441 }
00442 if (!found) {
00443
00444
00445 TListIter iterOld(&_outdatedList);
00446 Bool_t oldFound = kFALSE;
00447 while (MessageStorage* oldObj = (MessageStorage*) iterOld()) {
00448 if (oldObj->GetName() == obj->GetName()) {
00449 oldObj->updateMessage(obj->getMessage());
00450 oldFound = kTRUE;
00451 continue;
00452 }
00453 }
00454 if (!oldFound) {
00455 cout << _clName << "::editStorageList(): INFO: The object '"
00456 << obj->GetName() << "' was removed\nfrom the object list"
00457 << " by the consumer.\nThe last copy is kept by the Display"
00458 << " Server." << endl;
00459 _outdatedList.AddLast(obj);
00460 }
00461 }
00462 }
00463 _currList.Delete();
00464 if (_currList.GetSize() > 0) {
00465 cerr << myName << ": list size should be 0." << endl;
00466 }
00467
00468
00469
00470
00471 string cpy2Name(nameString);
00472 help1 = const_cast<char*>(strchr(cpy2Name.c_str(),'$'));
00473 while ((help2 = const_cast<char*>(strtok(help1, "$")))) {
00474 if (_debugFlags & 0x2) {
00475 cout << myName << ": Found String: " << help2 << endl;
00476 }
00477 MessageStorage *storage = new MessageStorage(help2);
00478 _currList.AddLast(storage);
00479 help1 = 0;
00480 }
00481
00482 return;
00483 }
00484
00485
00486 void DisplayServer::updateStateManager()
00487 {
00488
00489
00490 const string myName("DisplayServer::updateStateManager()");
00491
00492 if (_currInfo) {
00493 _conList->addEntry(_currInfo->GetTitle(), (TString)(_hostName.c_str()),
00494 _clientPort, _currInfo->nevents(), _currInfo->runnumber(),
00495 ConsumerList::Running);
00496
00497 if (_debugFlags & 0x10)
00498 cout << myName << ": Consumer information will be sent to\n"
00499 << spaces << "the State Manager running on b0dap30.fnal.gov" << endl;
00500
00501
00502 }
00503 else {
00504 cerr << myName << ": Consumer Info not available." << endl;
00505 _conList->addEntry("Unknown", (TString)(_hostName.c_str()), _clientPort,
00506 0, 0, ConsumerList::Running);
00507
00508 }
00509
00510 return;
00511 }
00512
00513
00514 Bool_t DisplayServer::HandleTimer(TTimer* )
00515 {
00516 updateStateManager();
00517
00518 return kTRUE;
00519 }
00520
00521
00522 void DisplayServer::setRequireUpdateBits()
00523 {
00524 map<string,bool>::iterator it;
00525 for (it=_clientMap.begin(); it!=_clientMap.end(); ++it) {
00526
00527
00528 (*it).second = true;
00529 }
00530 return;
00531 }
00532
00533
00534 int DisplayServer::openClientServerSocket()
00535 {
00536 const string myName("DisplayServer::openClientServerSocket()");
00537
00538 int rtVal = DS_OK;
00539
00540
00541 _ss = new TServerSocket(_clientPort, kTRUE);
00542
00543 while (! _ss->IsValid()) {
00544 delete _ss;
00545 _clientPort++;
00546 _ss = new TServerSocket(_clientPort, kTRUE);
00547 }
00548 if (!(_ss->IsValid())) {
00549 cout << myName << ": server socket not valid." << endl;
00550 _error = DS_BAD_CLIENT_SSOCKET;
00551 rtVal = DS_BAD_CLIENT_SSOCKET;
00552 }
00553 else {
00554 cout << barrier << "\n\n"
00555 << _clName << ": Server socket for HISTO DISPLAY clients at PORT "
00556 << _clientPort << endl;
00557 cout << spaces << "available.\n\n"
00558 << spaces << "Use " << _hostName << ":" << _clientPort << endl;
00559 cout << spaces << "as input stream to connect from the HistoDisplay" << endl;
00560 cout << spaces << "to the Display Server.\n\n"
00561 << barrier << "\n"
00562 << endl;
00563
00564 _ss->SetOption(kNoBlock,1);
00565 }
00566
00567 return rtVal;
00568 }
00569
00570
00571
00572
00573
00574
00575
00576
00577
00578
00579