00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00013
00014 #include <errno.h>
00015 #include "TClass.h"
00016 #include "TSocket.h"
00017 #include "MessageService/MsgService.h"
00018 #include "MinosObjectMap/MomNavigator.h"
00019
00020 #include "Dispatcher/DDSClientId.h"
00021 #include "Dispatcher/DDSClient.h"
00022 #include "Dispatcher/DDSSubscription.h"
00023 #include "Dispatcher/DDSPSStatus.h"
00024
00025 #include "Record/RecMinos.h"
00026
00027 std::ostream& operator << (std::ostream& ms, DDSClient* cs) {
00028 return cs->Print(ms); }
00029
00030 ClassImp(DDSClient)
00031
00032
00033
00034
00035 CVSID("$Id: DDSClient.cxx,v 1.16 2007/08/24 05:23:48 schubert Exp $");
00036
00037
00038
00039
00040 DDSClient::DDSClient(const char* serverhost, UInt_t serverport,
00041 DDS::EMessageType msgrequest,
00042 DDS::EClientType clienttype, string clientname) :
00043 fStatus(DDS::kSocketError), fTSocket(0),
00044 fSubscription(0), fMessageIn(0), fClientId(0),
00045 fPSStatus(0) {
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077 MsgStream* msdds = MsgService::Instance() -> GetStream("DDS");
00078 for ( int ilvl = Msg::kMinLogLevel ; ilvl < Msg::kNLogLevel; ilvl++ ) {
00079 msdds -> AddFormat(ilvl,Msg::kTime);
00080 }
00081
00082
00083 if ( !serverport ) {
00084
00085
00086 fTSocket = new TSocket(serverhost,"MinosDDS");
00087 }
00088 else {
00089 fTSocket = new TSocket(serverhost,serverport);
00090 }
00091
00092 if ( !fTSocket || !fTSocket -> IsValid() ) {
00093
00094 if ( serverport ) {
00095 MSG("DDS",Msg::kWarning)
00096 << "Unable to create socket connected to server host\n"
00097 << serverhost << " and port " << serverport << "." << endl;
00098 }
00099 else {
00100 MSG("DDS",Msg::kWarning)
00101 << "Unable to create socket connected to server host\n"
00102 << serverhost << " and named service MinosDDS." << endl;
00103 }
00104 if (fTSocket) delete fTSocket; fTSocket = 0;
00105 }
00106
00107 else {
00108
00109
00110 fClientAddress = fTSocket -> GetLocalInetAddress();
00111 fServerAddress = fTSocket -> GetInetAddress();
00112
00113
00114 fClientId = new DDSClientId(clienttype,clientname);
00115
00116
00117
00118 fMessageOut.Reset(msgrequest | kMESS_ACK);
00119 fMessageOut.WriteObject(fClientId);
00120
00121 DDS::EMessageType msgrc = SendMessage();
00122 fStatus = msgrc;
00123
00124 if ( msgrc != DDS::kOk ) {
00125 MSG("DDS",Msg::kWarning)
00126 << "The error return message " << DDS::AsString(msgrc)
00127 << "\nwas received from the dispatcher parent server in response to\n"
00128 << "the client's request for " << DDS::AsString(msgrequest) << "."
00129 << endl;
00130 if (fTSocket) delete fTSocket; fTSocket = 0;
00131 }
00132 if (msgrequest == DDS::kShutdown || msgrequest == DDS::kStatus ) {
00133
00134 if ( fTSocket ) delete fTSocket; fTSocket = 0;
00135 if ( msgrequest == DDS::kStatus ) {
00136 fPSStatus = dynamic_cast<DDSPSStatus*>
00137 (fMessageIn -> ReadObject(fMessageIn->GetClass()));
00138 if ( !fPSStatus ) {
00139 MSG("DDS",Msg::kWarning)
00140 << "Request for parentserver report status failed to receive "
00141 << "\nDDSPSStatus object." << endl;
00142 }
00143 }
00144 }
00145 }
00146
00147 if ( IsValid() ) {
00148
00149 fSubscription = new DDSSubscription();
00150 }
00151
00152 }
00153
00154
00155 DDSClient::~DDSClient() {
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165 if ( IsValid() ) Shutdown();
00166 if ( fSubscription ) delete fSubscription; fSubscription = 0;
00167 if ( fMessageIn ) delete fMessageIn; fMessageIn = 0;
00168 if ( fClientId ) delete fClientId; fClientId = 0;
00169 if ( fPSStatus ) delete fPSStatus; fPSStatus = 0;
00170
00171 }
00172
00173 DDS::EMessageType DDSClient::GoToFile(std::string filename,
00174 std::string& returnedfilename) {
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199 DDS::EMessageType msgrc;
00200 char newfilename[512];
00201
00202 if ( this -> IsValid() ) {
00203
00204 fMessageOut.Reset(DDS::kGoToFile | kMESS_ACK);
00205 fMessageOut << filename.c_str();
00206
00207
00208 msgrc = SendMessage();
00209 if ( msgrc == DDS::kOk ) {
00210
00211 (*fMessageIn) >> newfilename;
00212 }
00213 }
00214 else {
00215 MSG("DDS",Msg::kWarning)
00216 << "GoToFile requested with invalid socket connection." << endl;
00217 msgrc = DDS::kSocketError;
00218 }
00219 returnedfilename = newfilename;
00220 return msgrc;
00221
00222 }
00223
00224 DDS::EMessageType DDSClient::GoToNextFile(std::string& returnedfilename) {
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237 return this -> GoToFile("next",returnedfilename);
00238
00239 }
00240
00241 DDS::EMessageType DDSClient::Next(MomNavigator*& mom,UInt_t waittime,
00242 UInt_t advanceby) {
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271 DDS::EMessageType msgrc;
00272
00273 MomNavigator* localMom = 0;
00274
00275 if ( IsValid() ) {
00276
00277 fMessageOut.Reset(DDS::kNext | kMESS_ACK);
00278 fMessageOut << waittime;
00279 fMessageOut << advanceby;
00280
00281 msgrc = SendMessage();
00282 if ( msgrc == DDS::kOk ) {
00283
00284 localMom = dynamic_cast<MomNavigator*>
00285 (fMessageIn -> ReadObject(fMessageIn->GetClass()));
00286 if (!localMom) {
00287 MSG("DDS",Msg::kWarning)
00288 << "Next failed to receive expected MomNavigator object." << endl;
00289 }
00290 else {
00291
00292 if (!mom) mom = new MomNavigator();
00293
00294
00295
00296 TObjArray* objArray =const_cast<TObjArray*>
00297 (localMom -> GetFragmentArray());
00298 Int_t nrecord = objArray -> GetEntriesFast();
00299 for ( Int_t irec=0; irec < nrecord/2; irec++ ) {
00300
00301 RecMinos* record = dynamic_cast<RecMinos*>(objArray->RemoveAt(irec));
00302 Registry* temptags=dynamic_cast<Registry*>
00303 (objArray->At(irec+nrecord/2));
00304 record->GetTempTags().Set("stream",temptags->GetCharString("stream"));
00305 record->GetTempTags().Set("tree",temptags->GetCharString("tree"));
00306 record->GetTempTags().Set("file",temptags->GetCharString("file"));
00307 record->GetTempTags().Set("index",temptags->GetInt("index"));
00308 record->GetTempTags().SetName(temptags->GetName());
00309 mom -> AdoptFragment(record);
00310 }
00311 delete localMom; localMom=0;
00312 }
00313 }
00314 }
00315 else {
00316 MSG("DDS",Msg::kWarning)
00317 << "Next requested with invalid socket connection." << endl;
00318 msgrc = DDS::kSocketError;
00319 }
00320
00321 return msgrc;
00322
00323 }
00324
00325 std::ostream& DDSClient::Print(std::ostream& ms) const {
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335 if ( IsValid() ) {
00336 ms << "Client socket connected from local address:" << endl;
00337
00338 ms << fClientAddress.GetHostName() << "/"<< fClientAddress.GetHostAddress()
00339 << "(port " << fClientAddress.GetPort() << ")." << endl;
00340 ms << "to a child server at remote address:" << endl;
00341
00342 ms << fServerAddress.GetHostName() << "/"<< fServerAddress.GetHostAddress()
00343 << "(port " << fServerAddress.GetPort() << ")." << endl;
00344 }
00345 else {
00346 ms << "Client to child server socket is not connected." << endl;
00347 }
00348
00349 return ms;
00350
00351 }
00352
00353 DDS::EMessageType DDSClient::SendMessage() {
00354
00355
00356
00357
00358
00359
00360
00361
00362
00363
00364
00365 DDS::EMessageType msgrc = DDS::kSocketError;
00366
00367 if (fMessageIn) delete fMessageIn; fMessageIn = 0;
00368
00369
00370 Int_t rc = fTSocket -> Send(fMessageOut);
00371 if (rc < 0) {
00372 MSG("DDS",Msg::kWarning) << "\nReceived socket error code "
00373 << rc << " in send of message to server." << endl;
00374
00375
00376 MSG("DDS",Msg::kWarning)
00377 << "\nSocket will be shutdown." << endl;
00378 if ( fTSocket ) delete fTSocket; fTSocket = 0;
00379
00380 }
00381 else {
00382
00383 Int_t rc = fTSocket -> Recv(fMessageIn);
00384 if (rc < 0) {
00385 MSG("DDS",Msg::kWarning) << "\nReceived socket error code "
00386 << rc << " in send of message to server." << endl;
00387
00388
00389 MSG("DDS",Msg::kWarning)
00390 << "\nSocket will be shutdown." << endl;
00391 if ( fTSocket ) delete fTSocket; fTSocket = 0;
00392
00393 }
00394 else {
00395 if ( ! fMessageIn ) {
00396 MSG("DDS",Msg::kWarning)
00397 << "Received empty fMessageIn from fTSocket->Recv()"
00398 << endl;
00399 }
00400 else
00401 msgrc = static_cast< DDS::EMessageType > (fMessageIn -> What());
00402 }
00403 }
00404
00405 return msgrc;
00406
00407 }
00408
00409
00410 DDS::EMessageType DDSClient::Shutdown() {
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421 DDS::EMessageType msgrc;
00422
00423 if ( IsValid() ) {
00424
00425 fMessageOut.Reset(DDS::kShutdown | kMESS_ACK);
00426
00427 msgrc = SendMessage();
00428 }
00429 else {
00430 MSG("DDS",Msg::kWarning)
00431 << "Shutdown requested with invalid socket connection." << endl;
00432 msgrc = DDS::kSocketError;
00433 }
00434
00435 if (fTSocket) delete fTSocket; fTSocket = 0;
00436
00437 return msgrc;
00438
00439 }
00440
00441 DDS::EMessageType DDSClient::Subscribe() {
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457 DDS::EMessageType msgrc;
00458
00459 if ( IsValid() ) {
00460
00461 fMessageOut.Reset(DDS::kSubscribe | kMESS_ACK);
00462 fMessageOut.WriteObject(fSubscription);
00463 msgrc = SendMessage();
00464 }
00465 else {
00466 MSG("DDS",Msg::kWarning)
00467 << "Subscribe requested with invalid socket connection." << endl;
00468 msgrc = DDS::kSocketError;
00469 }
00470
00471 return msgrc;
00472
00473 }
00474
00475
00476
00477
00478
00479
00480
00481