00001
00002
00003
00004
00005
00006
00007
00009 #include "IoModules/IoInputStreamItr.h"
00010
00011 #include <algorithm>
00012 #include "TClass.h"
00013 #include "TKey.h"
00014 #include "TFile.h"
00015 #include "TROOT.h"
00016 #include "TSystem.h"
00017 #include "TRegexp.h"
00018 #include "MessageService/MsgService.h"
00019 #include "Persistency/PerFile.h"
00020 #include "Persistency/PerFileManager.h"
00021 #include "Util/UtilString.h"
00022
00023 CVSID("$Id: IoInputStreamItr.cxx,v 1.17 2009/01/05 05:55:13 schubert Exp $");
00024
00025
00026
00027 IoInputStreamItr::IoInputStreamItr(const char* streamlist) : fIsValid(true) {
00028
00029
00030
00031 this -> Streams(streamlist);
00032 }
00033
00034
00035
00036 IoInputStreamItr::~IoInputStreamItr() {
00037
00038
00039
00040 MSG("Io",Msg::kDebug) <<
00041 "Close streams " << this->GetSourceName() << "\n";
00042 fInputStreamManager.CloseStream();
00043 }
00044
00045
00046
00047 int IoInputStreamItr::LoadRecords(MomNavigator* m) {
00048
00049
00050
00051
00052 int nrecord = fInputStreamManager.Get(m);
00053 return nrecord;
00054 }
00055
00056
00057
00058 int IoInputStreamItr::Increment(int n, MomNavigator* m) {
00059
00060
00061
00062
00063 return fInputStreamManager.Next(m,n);
00064 }
00065
00066
00067
00068 int IoInputStreamItr::Decrement(int n, MomNavigator* m) {
00069
00070
00071
00072 return fInputStreamManager.Previous(m,n);
00073 }
00074
00075
00076
00077 JobCResult IoInputStreamItr::GoTo(const VldContext& vld, MomNavigator* m) {
00078
00079
00080
00081
00082
00083
00084
00085
00086 const VldContext& newVld = fInputStreamManager.RecordsAt(m,vld);
00087 if ( newVld == Per::GetVldEnd() ) return JobCResult::kEndOfInputStream;
00088 if ( newVld == Per::GetVldBegin() ) return JobCResult::kBeginOfInputStream;
00089 return JobCResult::kAOK;
00090 }
00091
00092
00093
00094 int IoInputStreamItr::GoToEOF() {
00095
00096
00097
00098
00099 fInputStreamManager.RecordsAt((MomNavigator*)0,Per::GetVldEnd());
00100 return 1;
00101
00102 }
00103
00104
00105
00106 int IoInputStreamItr::DefineStream(const char* streamname,const char* treename)
00107 {
00108
00109
00110
00111
00112 MSG("Io",Msg::kDebug) <<
00113 "Define stream " << streamname << " to serve tree " << treename << endl;
00114
00115 if ( fInputStreamManager.GetOpenedStream(streamname) ) {
00116 fInputStreamManager.CloseStream(streamname);
00117 }
00118 fInputStreamManager.OpenStream(streamname,treename);
00119
00120 return 1;
00121 }
00122
00123
00124
00125 int IoInputStreamItr::Streams(const char* streams) {
00126
00127
00128
00129
00130
00131
00132 this -> SetSourceName(streams);
00133 MSG("Io",Msg::kDebug) << "Set streams " << streams << endl;
00134
00135
00136 std::vector<std::string> streamlist;
00137 UtilString::StringTok(streamlist,std::string(streams),":,;/ ");
00138
00139 std::vector<std::string>::iterator strItr;
00140
00141 for ( strItr = streamlist.begin(); strItr != streamlist.end(); strItr++ ) {
00142 std::string streamname = *strItr;
00143 if ( !fInputStreamManager.GetOpenedStream(streamname) ) {
00144 fInputStreamManager.OpenStream(streamname,streamname);
00145 }
00146 }
00147
00148
00149 const PerStreamManager::StreamMap& smap = fInputStreamManager.GetStreamMap();
00150 for ( PerStreamManager::StreamMapConstItr citr = smap.begin();
00151 citr!= smap.end(); ++citr ) {
00152 std::string streamname = citr->first;
00153 typedef std::vector<std::string>::const_iterator vs_citer;
00154 vs_citer vpos = std::find(streamlist.begin(),streamlist.end(),streamname);
00155 if ( vpos == streamlist.end() ) {
00156 fInputStreamManager.CloseStream(streamname);
00157 }
00158 }
00159
00160
00161
00162
00163 bool isNewStream = true;
00164 while ( isNewStream ) {
00165 isNewStream = false;
00166 for ( PerStreamManager::StreamMapConstItr citr = smap.begin();
00167 citr!= smap.end(); ++citr ) {
00168 std::string streamname = citr->first;
00169 std::string assocstreams
00170 = Per::GetAssociatedStreamList(streamname.c_str());
00171 if ( !assocstreams.empty() ) {
00172
00173 std::vector<std::string> assocstreamlist;
00174 UtilString::StringTok(assocstreamlist,
00175 std::string(assocstreams),":,;/ ");
00176 std::vector<std::string>::iterator astrItr;
00177
00178 for ( astrItr = assocstreamlist.begin();
00179 astrItr!= assocstreamlist.end(); astrItr++ ) {
00180 std::string assocstreamname = *astrItr;
00181 if ( !fInputStreamManager.GetOpenedStream(assocstreamname) ) {
00182 fInputStreamManager.OpenStream(assocstreamname,assocstreamname);
00183 isNewStream = true;
00184 }
00185 }
00186 }
00187 }
00188 }
00189
00190
00191 return fInputStreamManager.GetNumStream();
00192 }
00193
00194
00195
00196 int IoInputStreamItr::Select(const char* stream, const char* selection,
00197 bool isRequired) {
00198
00199
00200
00201
00202 int isOpen = 0;
00203 MSG("Io",Msg::kDebug) <<
00204 "Set selection string " << selection << " for stream " << stream << endl;
00205
00206 if ( std::string(stream) == "*" ) {
00207 fInputStreamManager.SetSelection(stream,selection,isRequired);
00208 isOpen = 1;
00209 }
00210 else if ( fInputStreamManager.GetOpenedStream(stream) ) {
00211 fInputStreamManager.SetSelection(stream,selection,isRequired);
00212 isOpen = 1;
00213 }
00214 return isOpen;
00215 }
00216
00217
00218
00219 int IoInputStreamItr::SetPerOwnedDisabled(const char* stream,
00220 bool perowneddisabled) {
00221
00222
00223
00224
00225 int isOpen = 0;
00226 MSG("Io",Msg::kDebug) << "SetPerOwnedDisabled " << perowneddisabled
00227 << " for stream " << stream << endl;
00228
00229 if ( std::string(stream) == "*" ) {
00230 fInputStreamManager.SetPerOwnedDisabled(stream,perowneddisabled);
00231 isOpen = 1;
00232 }
00233 else if ( fInputStreamManager.GetOpenedStream(stream) ) {
00234 fInputStreamManager.SetPerOwnedDisabled(stream,perowneddisabled);
00235 isOpen = 1;
00236 }
00237 return isOpen;
00238 }
00239
00240
00241
00242 int IoInputStreamItr::SetSequenceMode(const char* stream,
00243 Per::ESequenceMode sequenceMode) {
00244
00245
00246
00247
00248 int isOpen = 0;
00249 MSG("Io",Msg::kDebug) <<
00250 "Set sequence mode " << Per::AsString(sequenceMode) << " for stream "
00251 << stream << endl;
00252
00253 if ( fInputStreamManager.GetOpenedStream(stream) ) {
00254 fInputStreamManager.SetSequenceMode(stream,sequenceMode);
00255 isOpen = 1;
00256 }
00257 return isOpen;
00258 }
00259
00260
00261
00262 int IoInputStreamItr::SetTestMode(const char* stream,
00263 bool testMode) {
00264
00265
00266
00267
00268 int isOpen = 0;
00269 std::string testModeStr = "false";
00270 if ( testMode ) testModeStr = "true";
00271
00272 MSG("Io",Msg::kDebug) <<
00273 "Set test mode " << testModeStr.c_str() << " for stream "
00274 << stream << endl;
00275
00276 if ( std::string(stream) == "*" ) {
00277 fInputStreamManager.SetTestMode(stream,testMode);
00278 isOpen = 1;
00279 }
00280 else if ( fInputStreamManager.GetOpenedStream(stream) ) {
00281 fInputStreamManager.SetTestMode(stream,testMode);
00282 isOpen = 1;
00283 }
00284 return isOpen;
00285
00286 }
00287
00288 int IoInputStreamItr::SetWindow(const char* stream, double lower, double upper)
00289 {
00290
00291
00292
00293
00294 int isOpen = 0;
00295 MSG("Io",Msg::kDebug) <<
00296 "Set window [" << lower << ", " << upper << "] for stream "
00297 << stream << endl;
00298
00299 if ( fInputStreamManager.GetOpenedStream(stream) ) {
00300 fInputStreamManager.SetWindow(stream,lower,upper);
00301 isOpen = 1;
00302 }
00303 return isOpen;
00304 }
00305
00306 int IoInputStreamItr::SetMaxFileRepeat(const char* stream, int numRepeat)
00307 {
00308
00309
00310
00311
00312
00313 int isOpen = 0;
00314 if (numRepeat < 0) numRepeat = 0;
00315 MSG("Io",Msg::kDebug)
00316 << "Set number of times to reuse each file from stream " << stream
00317 << " = " << numRepeat << endl;
00318
00319 if ( fInputStreamManager.GetOpenedStream(stream) ) {
00320 fInputStreamManager.SetMaxFileRepeat(stream,numRepeat);
00321 isOpen = 1;
00322 }
00323 return isOpen;
00324 }
00325
00326 int IoInputStreamItr::SetMeanMom(const char* stream, double mean)
00327 {
00328
00329
00330
00331
00332 int isOpen = 0;
00333 MSG("Io",Msg::kDebug)
00334 << "Set mean number of events to push to Mom from stream " << stream
00335 << " = " << mean << endl;
00336
00337 if ( fInputStreamManager.GetOpenedStream(stream) ) {
00338 fInputStreamManager.SetMeanMom(stream,mean);
00339 isOpen = 1;
00340 }
00341 return isOpen;
00342 }
00343
00344 int IoInputStreamItr::SetPushRandom(const char* stream, bool setRandom)
00345 {
00346
00347
00348
00349
00350 int isOpen = 0;
00351 MSG("Io",Msg::kDebug)
00352 << "Set push " << ((setRandom)? "random " : "constant ")
00353 << "number of events to push to Mom from stream " << stream
00354 << endl;
00355
00356 if ( fInputStreamManager.GetOpenedStream(stream) ) {
00357 fInputStreamManager.SetPushRandom(stream,setRandom);
00358 isOpen = 1;
00359 }
00360 return isOpen;
00361 }
00362
00363 void IoInputStreamItr::SetRandomSeed(int rSeed)
00364 {
00365
00366
00367
00368
00369 MSG("Io",Msg::kDebug)
00370 << "Set InputStreamManager random seed to " << rSeed << endl;
00371
00372 fInputStreamManager.SetRandomSeed(rSeed);
00373
00374 }
00375
00376
00377 void IoInputStreamItr::AddFile(const char* fullfilepathname, int at,
00378 const char* streamlist ) {
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399 std::vector<std::string> vstreams;
00400 UtilString::StringTok(vstreams,streamlist,":,;/ ");
00401 std::vector<std::string>::iterator vsItr;
00402
00403 int nstream = 0;
00404 for ( vsItr = vstreams.begin(); vsItr != vstreams.end(); vsItr++ ) {
00405 std::string streamname = *vsItr;
00406 if ( fInputStreamManager.GetOpenedStream(streamname) ) {
00407 nstream += fInputStreamManager.AddFile(fullfilepathname,at,streamname);
00408 }
00409 std::string assocstreams
00410 = Per::GetAssociatedStreamList(streamname.c_str());
00411 if ( !assocstreams.empty() ) {
00412
00413 std::vector<std::string> assocstreamlist;
00414 UtilString::StringTok(assocstreamlist,
00415 std::string(assocstreams),":,;/ ");
00416 std::vector<std::string>::iterator astrItr;
00417
00418 for ( astrItr = assocstreamlist.begin();
00419 astrItr!= assocstreamlist.end(); astrItr++ ) {
00420 std::string assocstreamname = *astrItr;
00421
00422
00423 typedef std::vector<std::string>::const_iterator vs_citer;
00424 vs_citer aspos = std::find(vstreams.begin(),vstreams.end(),
00425 assocstreamname);
00426 if ( aspos == vstreams.end() ) {
00427 if ( fInputStreamManager.GetOpenedStream(assocstreamname) ) {
00428 nstream
00429 +=fInputStreamManager.AddFile(fullfilepathname,at,assocstreamname);
00430 }
00431 }
00432 }
00433 }
00434 }
00435
00436 return;
00437 }
00438
00439 const char* IoInputStreamItr::GetCurrentFile(const char* streamname) const {
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451 return (fInputStreamManager.GetCurrentFile(streamname)).c_str();
00452
00453 }
00454
00455 JobCResult IoInputStreamItr::GoToFile( int n, const char* streamlist ) {
00456
00457
00458
00459
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469
00470
00471
00472
00473 int nstream = 0;
00474
00475
00476 std::vector<std::string> vstreams;
00477 UtilString::StringTok(vstreams,streamlist,":,;/ ");
00478 std::vector<std::string>::iterator vsItr;
00479
00480 for ( vsItr = vstreams.begin(); vsItr != vstreams.end(); vsItr++ ) {
00481 std::string streamname = *vsItr;
00482 nstream += fInputStreamManager.GoToFile(n,streamname);
00483 }
00484
00485 if ( !nstream ) return JobCResult::kWarning;
00486 return JobCResult::kAOK;
00487
00488 }
00489
00490 JobCResult IoInputStreamItr::GoToFile(const char* filename,
00491 const char* streamlist) {
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507
00508
00509 int nstream = 0;
00510
00511
00512 std::vector<std::string> vstreams;
00513 UtilString::StringTok(vstreams,streamlist,":,;/ ");
00514 std::vector<std::string>::iterator vsItr;
00515
00516 for ( vsItr = vstreams.begin(); vsItr != vstreams.end(); vsItr++ ) {
00517 std::string streamname = *vsItr;
00518 nstream += fInputStreamManager.GoToFile(filename,streamname);
00519 }
00520
00521 if ( !nstream ) return JobCResult::kWarning;
00522 return JobCResult::kAOK;
00523 }
00524
00525 JobCResult IoInputStreamItr::NextFile(int n, const char* streamlist) {
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543
00544 int nstream = 0;
00545
00546
00547 std::vector<std::string> vstreams;
00548 UtilString::StringTok(vstreams,streamlist,":,;/ ");
00549 std::vector<std::string>::iterator vsItr;
00550
00551 if ( vstreams.empty() ) return JobCResult::kAOK;
00552
00553 bool isWarning = false;
00554 for ( vsItr = vstreams.begin(); vsItr != vstreams.end(); vsItr++ ) {
00555 std::string streamname = *vsItr;
00556 int nOK = fInputStreamManager.NextFile(n,streamname);
00557 if ( !nOK ) {
00558
00559 if ( !fInputStreamManager.IsEndOfFiles(streamname) ) isWarning = true;
00560 }
00561 else nstream += nOK;
00562 }
00563
00564 if ( !nstream ) {
00565 if ( isWarning ) return JobCResult::kWarning;
00566 else return JobCResult::kEndOfInputStream;
00567 }
00568
00569 return JobCResult::kAOK;
00570
00571 }
00572
00573 JobCResult IoInputStreamItr::PrevFile(int n, const char* streamlist) {
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592 int nstream = 0;
00593
00594
00595 std::vector<std::string> vstreams;
00596 UtilString::StringTok(vstreams,streamlist,":,;/ ");
00597 std::vector<std::string>::iterator vsItr;
00598
00599 if ( vstreams.empty() ) return JobCResult::kAOK;
00600
00601 bool isWarning = false;
00602 for ( vsItr = vstreams.begin(); vsItr != vstreams.end(); vsItr++ ) {
00603 std::string streamname = *vsItr;
00604 int nOK = fInputStreamManager.PrevFile(n,streamname);
00605 if ( !nOK ) {
00606
00607 if ( !fInputStreamManager.IsBeginOfFiles(streamname) ) isWarning = true;
00608 }
00609 else nstream += nOK;
00610 }
00611
00612 if ( !nstream ) {
00613 if ( isWarning ) return JobCResult::kWarning;
00614 else return JobCResult::kBeginOfInputStream;
00615 }
00616
00617 return JobCResult::kAOK;
00618
00619 }
00620
00621 void IoInputStreamItr::RemoveFile(const char* filename,const char* streamlist){
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638 int nstream = 0;
00639
00640
00641 std::vector<std::string> vstreams;
00642 UtilString::StringTok(vstreams,streamlist,":,;/ ");
00643 std::vector<std::string>::iterator vsItr;
00644
00645 for ( vsItr = vstreams.begin(); vsItr != vstreams.end(); vsItr++ ) {
00646 std::string streamname = *vsItr;
00647 nstream += fInputStreamManager.RemoveFile(filename,streamname);
00648 }
00649
00650 return;
00651 }
00652
00653 std::ostream& IoInputStreamItr::ListFile(std::ostream& os,
00654 const char* streamlist) const {
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665
00666
00667
00668
00669 std::vector<std::string> vstreams;
00670 UtilString::StringTok(vstreams,streamlist,":,;/ ");
00671 std::vector<std::string>::iterator vsItr;
00672
00673 for ( vsItr = vstreams.begin(); vsItr != vstreams.end(); vsItr++ ) {
00674 std::string streamname = *vsItr;
00675 fInputStreamManager.ListFile(os,streamname);
00676 }
00677
00678 return os;
00679 }
00680
00681
00682