00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00012
00013 #include <iostream>
00014
00015 #include "TTree.h"
00016 #include "TTreeFormula.h"
00017 #include "TMath.h"
00018 #include "TRandom3.h"
00019
00020 #include "Persistency/PerInputStream.h"
00021 #include "Persistency/PerInputStreamManager.h"
00022 #include "MessageService/MsgService.h"
00023 #include "MinosObjectMap/MomNavigator.h"
00024 #include "Record/RecMinos.h"
00025 #include "Record/RecMinosHdr.h"
00026 #include "Record/RecRecord.h"
00027
00028 #include <Conventions/Munits.h>
00029
00030 std::ostream& operator<<(std::ostream& os, const PerInputStreamManager& pism)
00031 {return pism.Print(os);}
00032
00033 ClassImp(PerInputStreamManager)
00034
00035
00036
00037
00038 CVSID("$Id: PerInputStreamManager.cxx,v 1.64 2009/04/29 23:55:53 arms Exp $");
00039
00040
00041
00042
00043
00044
00045 int PerInputStreamManager::AddFile(std::string fullfilepathname, int at,
00046 std::string streamname) {
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058 int oldnstream = this -> GetNumStreamOpen();
00059
00060 int nstream = 0;
00061 if ( streamname == "*" ) {
00062 for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00063 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00064 nstream += instream -> AddFile(fullfilepathname,at);
00065 }
00066 }
00067 else {
00068 PerInputStream* instream = this -> GetOpenedStream(streamname);
00069 if ( instream ) nstream += instream -> AddFile(fullfilepathname,at);
00070 else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open."
00071 << endl;
00072 }
00073
00074 if ( oldnstream == 0 && nstream > 0 ) SetCurrentVld(Per::GetVldBegin());
00075
00076 return nstream;
00077
00078 }
00079
00080
00081
00082 int PerInputStreamManager::AdvanceLowerBoundTags(const VldContext& keyVld) {
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100 int nFail = 0;
00101 for ( StreamMapItr sitr=fStreamMap.begin(); sitr!=fStreamMap.end(); ++sitr ){
00102 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00103 if ( instream->GetSequenceMode() != Per::kLowerBound ) continue;
00104 if ( !instream -> AdvanceLowerBoundTags(keyVld) ) nFail++;
00105 }
00106
00107 if ( nFail ) return 0;
00108 return 1;
00109
00110 }
00111
00112
00113
00114 int PerInputStreamManager::AdvanceWindowTags(const VldContext& keyVld) {
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125 int nFail = 0;
00126 for ( StreamMapItr sitr=fStreamMap.begin();sitr!=fStreamMap.end(); sitr++ ) {
00127 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00128 if (instream->GetSequenceMode() != Per::kWindow) continue;
00129 if ( !instream -> AdvanceWindowTags(keyVld) ) nFail++;
00130 }
00131
00132 if (nFail) return 0;
00133 return 1;
00134 }
00135
00136
00137
00138 int PerInputStreamManager::AdvanceSequentialTags() {
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149 int nFail = 0;
00150 for ( StreamMapItr sitr=fStreamMap.begin();sitr!=fStreamMap.end(); sitr++ ) {
00151 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00152 if (instream->GetSequenceMode() != Per::kSequential &&
00153 instream->GetSequenceMode() != Per::kRandom ) continue;
00154 if ( !instream -> AdvanceTagsList() ) nFail++;
00155 }
00156
00157 if (nFail) return 0;
00158 return 1;
00159 }
00160
00161
00162
00163 int PerInputStreamManager::AdvanceRecordTags() {
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183 int nFail = 0;
00184 VldContext newVld = Per::GetVldEnd();
00185 int nKey = 0;
00186 int nSeq = 0;
00187 bool allSeqDone = true;
00188
00189 for ( StreamMapItr sitr=fStreamMap.begin();sitr != fStreamMap.end();++sitr ){
00190 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00191 if ( instream->GetSequenceMode() != Per::kKey ) continue;
00192 nKey++;
00193 PerRecordTags tags = instream -> GetTags();
00194 VldContext lastvld = fLastServed[sitr->first];
00195 bool isdone = false;
00196 while ( !isdone ) {
00197 isdone = true;
00198 if ( tags.IsBegin() || (tags.GetVldContext() <= fCurrentVld
00199 || lastvld == tags.GetVldContext())
00200 || (tags.IsEnd() && !instream->IsEndOfFiles()) ) {
00201 tags = instream->NextTags();
00202 if ( tags.IsEnd() ) {
00203 if ( fUpdateMode && !instream->IsFileEnd() ) nFail++;
00204 else if (instream->NextFile()) isdone = false;
00205 }
00206 }
00207 }
00208 if ( tags.GetVldContext() < newVld ) newVld = tags.GetVldContext();
00209 }
00210
00211 if (!nKey) {
00212 for ( StreamMapItr sitr=fStreamMap.begin();
00213 sitr != fStreamMap.end();++sitr ){
00214 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00215 if ( instream->GetSequenceMode() != Per::kSequential ) continue;
00216 nSeq++;
00217 PerRecordTags tags = instream -> GetTags();
00218 VldContext lastvld = fLastServed[sitr->first];
00219 bool isdone = false;
00220 while ( !isdone ) {
00221 isdone = true;
00222 if ( tags.IsBegin() ||
00223 ( tags.IsEnd() && !instream->IsEndOfFiles() ) ) {
00224 tags = instream->NextTags();
00225 if ( tags.IsEnd() ) {
00226 if ( fUpdateMode && !instream->IsFileEnd() ) nFail++;
00227 else if (instream->NextFile()) isdone = false;
00228 }
00229 }
00230 }
00231 if ( tags.GetVldContext() < newVld ) newVld = tags.GetVldContext();
00232 if ( !( tags.IsEnd() && instream->IsEndOfFiles() ) ) allSeqDone = false;
00233 }
00234 }
00235
00236 if ( !nKey && !nSeq) {
00237
00238 MSG("Per",Msg::kWarning)
00239 << "No stream of sequence mode Per::kKey or Per::kSequential "
00240 << "found.\n Must be at least one key or sequential stream to "
00241 << "facilitate sequencing."
00242 << std::endl;
00243 return 0;
00244 }
00245
00246
00247
00248 if ( !this -> AdvanceLowerBoundTags(newVld) ) nFail++;
00249 if ( !this -> AdvanceWindowTags(newVld) ) nFail++;
00250
00251 if ( nFail ) {
00252
00253
00254
00255
00256
00257 MSG("Per",Msg::kVerbose) << nFail << " streams failed to advance\n"
00258 << " checking time to tree end against maxsyncdelay "
00259 << fMaxSyncDelay << "." << endl;
00260
00261 Int_t maxTimeToTreeEnd = fMaxSyncDelay - 1;
00262 for( StreamMapItr sitr=fStreamMap.begin(); sitr!=fStreamMap.end();++sitr ){
00263 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00264 if ( instream -> IsOpen() ) {
00265 Int_t timeToTreeEnd = 0;
00266 if ( instream -> fLastEntryVld.IsValid() ) {
00267 timeToTreeEnd = (Int_t)(instream->fLastEntryVld.GetTimeStamp().GetSec())
00268 - (Int_t)(newVld.GetTimeStamp().GetSec());
00269 }
00270 MSG("Per",Msg::kVerbose) << sitr->first << " time to end "
00271 << timeToTreeEnd << "(sec)" << endl;
00272 maxTimeToTreeEnd = TMath::Max(maxTimeToTreeEnd,timeToTreeEnd);
00273 if ( timeToTreeEnd < (Int_t)fMaxSyncDelay ) instream -> UpdateTree();
00274 }
00275 }
00276 if ( maxTimeToTreeEnd < (Int_t)fMaxSyncDelay ) {
00277 if ( fCurrentVld != newVld ) {
00278 SetCurrentVld(newVld);
00279 this -> RewindRecordTags();
00280 }
00281 return 0;
00282 }
00283 }
00284
00285 for ( StreamMapItr sitr = fStreamMap.begin();
00286 sitr != fStreamMap.end(); ++sitr) {
00287 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00288 if ( instream->GetSequenceMode() != Per::kKey &&
00289 instream->GetSequenceMode() != Per::kSequential ) continue;
00290 PerRecordTags tags = instream -> GetTags();
00291 if ( tags.GetVldContext() == newVld ) {
00292 fLastServed[sitr->first] = newVld;
00293 }
00294 }
00295
00296 if ( (newVld == fCurrentVld && !nSeq ) ) {
00297 return 0;
00298 }
00299
00300 if ( nSeq && allSeqDone ) {
00301 return 0;
00302 }
00303
00304 SetCurrentVld(newVld);
00305 return 1;
00306
00307 }
00308
00309
00310
00311 void PerInputStreamManager::CloseFile(string streamName) {
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323 PerStreamManager::CloseFile(streamName);
00324
00325 if ( this->GetNumStreamOpen() <= 0 ) SetCurrentVld(Per::GetVldEnd());
00326
00327 }
00328
00329
00330
00331 void PerInputStreamManager::CloseStream(string streamName) {
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343 PerStreamManager::CloseStream(streamName);
00344
00345 if ( this->GetNumStreamOpen() <= 0 ) SetCurrentVld(Per::GetVldEnd());
00346 if ( streamName == "*" ) fLastServed.clear();
00347 else {
00348 std::map<std::string,VldContext>::iterator itr
00349 = fLastServed.find(streamName);
00350 if ( itr != fLastServed.end() ) fLastServed.erase(itr);
00351 }
00352
00353 }
00354
00355
00356
00357 Int_t PerInputStreamManager::Get(MomNavigator* mom) {
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371 if ( !mom ) return 0;
00372
00373 Int_t nrecord = 0;
00374 if ( IsSelectedSet() ) {
00375 nrecord = this -> LoadRecord(mom);
00376 }
00377
00378 return nrecord;
00379
00380 }
00381
00382
00383
00384 VldContext PerInputStreamManager::GetLastEntryVld() const {
00385
00386
00387
00388
00389
00390
00391
00392 VldContext lastentryvld = Per::GetVldBegin();
00393 for (StreamMapConstItr itr=fStreamMap.begin(); itr!=fStreamMap.end(); itr++){
00394 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00395 instream -> UpdateTree();
00396 if ( instream -> GetLastEntryVld() > lastentryvld )
00397 lastentryvld = instream->GetLastEntryVld();
00398 }
00399
00400 return lastentryvld;
00401
00402 }
00403
00404
00405
00406 int PerInputStreamManager::GoToFile(int n, std::string streamname) {
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419 VldContext newVld = Per::GetVldEnd();
00420
00421 int nstream = 0;
00422 if ( streamname == "*" ) {
00423 for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00424 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00425 if ( instream -> GoToFile(n) ) {
00426 nstream++;
00427 instream -> NextTags();
00428 newVld = instream->GetTags().GetVldContext();
00429 }
00430 }
00431 }
00432 else {
00433 PerInputStream* instream = this -> GetOpenedStream(streamname);
00434 if ( instream ) {
00435 if ( instream -> GoToFile(n) ) {
00436 nstream++;
00437 instream -> NextTags();
00438 newVld = instream->GetTags().GetVldContext();
00439 }
00440 }
00441 else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open."
00442 << endl;
00443 }
00444
00445 if ( nstream ) {
00446
00447 if ( newVld > fCurrentVld ) {
00448 VldContext minVld = this -> GetCurrentKeyVld(true);
00449 if ( minVld > fCurrentVld ) SetCurrentVld(minVld);
00450 }
00451 else {
00452 VldContext maxVld = this -> GetCurrentKeyVld(false);
00453 if ( maxVld < fCurrentVld ) SetCurrentVld(maxVld);
00454 }
00455 }
00456
00457 return nstream;
00458
00459 }
00460
00461
00462
00463 int PerInputStreamManager::GoToFile(std::string fullfilepathname,
00464 std::string streamname) {
00465
00466
00467
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478
00479 VldContext newVld = Per::GetVldEnd();
00480
00481 int nstream = 0;
00482 if ( streamname == "*" ) {
00483 for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00484 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00485 if ( instream -> GoToFile(fullfilepathname) ) {
00486 nstream++;
00487 instream -> NextTags();
00488 newVld = instream->GetTags().GetVldContext();
00489 }
00490 }
00491 }
00492 else {
00493 PerInputStream* instream = this -> GetOpenedStream(streamname);
00494 if ( instream ) {
00495 if ( instream -> GoToFile(fullfilepathname) ) {
00496 nstream++;
00497 instream -> NextTags();
00498 newVld = instream->GetTags().GetVldContext();
00499 }
00500 }
00501 else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open."
00502 << endl;
00503 }
00504
00505 if ( nstream ) {
00506
00507 if ( newVld > fCurrentVld ) {
00508 VldContext minVld = this -> GetCurrentKeyVld(true);
00509 if ( minVld > fCurrentVld ) SetCurrentVld(minVld);
00510 }
00511 else {
00512 VldContext maxVld = this -> GetCurrentKeyVld(false);
00513 if ( maxVld < fCurrentVld ) SetCurrentVld(maxVld);
00514 }
00515 }
00516
00517 return nstream;
00518
00519 }
00520
00521
00522
00523 int PerInputStreamManager::IsBeginOfFiles(std::string streamname) const {
00524
00525
00526
00527
00528
00529
00530
00531
00532 int nbegin = 0;
00533
00534 if ( streamname != "*" ) {
00535 PerInputStream* instream = this -> GetOpenedStream(streamname);
00536 if ( instream ) {
00537 nbegin = instream -> IsBeginOfFiles();
00538 }
00539 else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open."
00540 << endl;
00541 }
00542 else {
00543 for ( StreamMapConstItr citr = fStreamMap.begin();
00544 citr!= fStreamMap.end(); ++citr ) {
00545 PerInputStream* instream = dynamic_cast<PerInputStream*>(citr->second);
00546 nbegin += instream -> IsBeginOfFiles();
00547 }
00548 }
00549
00550 return nbegin;
00551
00552 }
00553
00554
00555
00556 int PerInputStreamManager::IsEndOfFiles(std::string streamname) const {
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566 int nend = 0;
00567
00568 if ( streamname != "*" ) {
00569 PerInputStream* instream = this -> GetOpenedStream(streamname);
00570 if ( instream ) {
00571 nend = instream -> IsEndOfFiles();
00572 }
00573 else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open."
00574 << endl;
00575 }
00576 else {
00577 for ( StreamMapConstItr citr = fStreamMap.begin();
00578 citr!= fStreamMap.end(); ++citr ) {
00579 PerInputStream* instream = dynamic_cast<PerInputStream*>(citr->second);
00580 nend += instream -> IsEndOfFiles();
00581 }
00582 }
00583
00584 return nend;
00585
00586 }
00587
00588
00589
00590 bool PerInputStreamManager::IsFileEnd() const {
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608 bool isFileEnd = true;
00609
00610 for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
00611 PerInputStream* instream = dynamic_cast<PerInputStream*>(citr->second);
00612 if (!instream->IsFileEnd()) isFileEnd = false;
00613 }
00614
00615 return isFileEnd;
00616
00617 }
00618
00619
00620
00621 bool PerInputStreamManager::IsSelectedSet() {
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636 if ( !fIsNewCurrentVldToSelect ) return fIsSelectedSet;
00637
00638 fIsNewCurrentVldToSelect = false;
00639
00640 if ( this -> IsEnd() || this -> IsBegin() ) {
00641 fIsSelectedSet = false;
00642 return fIsSelectedSet;
00643 }
00644
00645 fIsSelectedSet = true;
00646
00647 if ( !fGlobalSelection.empty() ) {
00648 MSG("Per",Msg::kVerbose) << "Testing globally applied selection cut "
00649 << fGlobalSelection << endl;
00650
00651 this -> UpdateTreeFormula();
00652 if ( fTTreeFormula ) {
00653 for(StreamMapConstItr itr =fStreamMap.begin();
00654 itr!=fStreamMap.end();++itr) {
00655 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00656 const PerRecordTags& tags = instream->GetTags();
00657 Int_t idx = -1;
00658 if ( tags.GetVldContext() == fCurrentVld ) idx = tags.GetIndexLo();
00659 TTree* ttree = instream->fTTree;
00660 if ( ttree ) {
00661 ttree -> LoadTree(idx);
00662 if ( !instream->fTObject ) {
00663 instream -> fTBranch -> SetAddress(&(instream->fTObject));
00664 }
00665 }
00666 }
00667 Int_t arraySize = fTTreeFormula->GetNdata();
00668 bool ispass = false;
00669 for ( int jent = 0; jent < arraySize; jent++ ) {
00670 if ( fTTreeFormula -> EvalInstance(jent) != 0 ) ispass = true;
00671 }
00672 if ( !ispass) fIsSelectedSet = false;
00673 for (StreamMapConstItr itr = fStreamMap.begin();
00674 itr!= fStreamMap.end(); ++itr) {
00675 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00676 instream -> Reset(true);
00677 }
00678 }
00679 }
00680 else {
00681 for(StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();++itr) {
00682 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00683 const PerRecordTags& tags = instream->GetTags();
00684 TTreeFormula* treeformula = instream->fTTreeFormula;
00685 if ( tags.GetVldContext() == fCurrentVld ) {
00686 if ( treeformula ) {
00687 MSG("Per",Msg::kVerbose) << "Testing locally applied selection cut "
00688 << instream->fSelection << " on stream " << itr->first << endl;
00689 TTree* ttree = instream -> fTTree;
00690 if ( ttree ) {
00691 for (Int_t idx=tags.GetIndexLo();idx <= tags.GetIndexHi(); idx++) {
00692 ttree -> LoadTree(idx);
00693 if ( !instream->fTObject ) {
00694 instream -> fTBranch -> SetAddress(&(instream->fTObject));
00695 }
00696 Int_t arraySize = treeformula->GetNdata();
00697 bool ispass = false;
00698 for ( int jent = 0; jent < arraySize; jent++ ) {
00699 if ( treeformula -> EvalInstance(jent) != 0 ) ispass = true;
00700 }
00701 if ( !ispass) fIsSelectedSet = false;
00702 instream->Reset(true);
00703 }
00704 }
00705 }
00706 }
00707
00708 else if ( instream->IsRequired() ) fIsSelectedSet = false;
00709 }
00710
00711 }
00712
00713 if (!fIsSelectedSet)
00714 MSG("Per",Msg::kVerbose) << "Record set failed selection cut" << endl;
00715 else
00716 MSG("Per",Msg::kVerbose) << "Record set passed selection cut" << endl;
00717
00718 return fIsSelectedSet;
00719
00720 }
00721
00722
00723
00724 bool PerInputStreamManager::IsValidSelectionString() const {
00725
00726
00727
00728
00729 for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00730 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00731 bool isValid = instream -> IsValidSelectionString();
00732 if ( !isValid ) return false;
00733 }
00734
00735 return true;
00736
00737 }
00738
00739
00740
00741 std::ostream& PerInputStreamManager::ListFile(std::ostream& os,
00742 std::string streamname) const {
00743
00744
00745
00746
00747
00748
00749
00750
00751
00752 if ( streamname == "*" ) {
00753 for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00754 os << "Stream " << itr->first << ":" << endl;
00755 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00756 instream -> ListFile(os);
00757 }
00758 }
00759 else {
00760 PerInputStream* instream = this -> GetOpenedStream(streamname);
00761 if ( instream ) {
00762 os << "Stream " << streamname << ":" << endl;
00763 instream -> ListFile(os);
00764 }
00765 else os << "Stream " << streamname << " not open." << endl;
00766 }
00767
00768 return os;
00769
00770 }
00771
00772
00773
00774 Int_t PerInputStreamManager::LoadRecord(MomNavigator* mom) {
00775
00776
00777
00778
00779
00780
00781
00782
00783
00784
00785
00786
00787 MSG("Per",Msg::kVerbose) << "Load Record Set w/currentVld " << fCurrentVld
00788 << endl;
00789
00790 Int_t nrecord = 0;
00791 Int_t lastNRecord = 0;
00792 if ( !mom || this -> IsBegin() || this -> IsEnd() ) return nrecord;
00793
00794 for ( StreamMapItr itr=fStreamMap.begin(); itr!=fStreamMap.end(); ++itr ) {
00795 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00796 std::string streamName = itr->first;
00797
00798 if ( instream -> GetSequenceMode() == Per::kKey ) {
00799 const PerRecordTags& tags = instream -> GetTags();
00800 if ( tags.GetVldContext() == fCurrentVld ) {
00801 nrecord += this -> LoadRecordWithTag(*instream,tags,mom);
00802 }
00803 }
00804 else if ( instream -> GetSequenceMode() == Per::kLowerBound ) {
00805 const PerRecordTags& tags = instream -> GetTags();
00806
00807 VldContext loVld = tags.GetVldContext();
00808 VldContext hiVld = tags.GetVldContext();
00809 this -> RemoveFragmentsNotInWindow(mom,streamName,loVld,hiVld);
00810 if ( !tags.IsBegin() && !tags.IsEnd() ) {
00811 nrecord += this -> LoadRecordWithTag(*instream,tags,mom);
00812 }
00813 }
00814 else if ( instream -> GetSequenceMode() == Per::kWindow ) {
00815 std::vector<PerRecordTags> tagslist = instream -> GetWindowTags();
00816
00817 VldContext loVld = Per::GetVldEnd();
00818 VldContext hiVld = Per::GetVldBegin();
00819 if ( !tagslist.empty() ) {
00820 loVld = (tagslist[0]).GetVldContext();
00821 hiVld = (tagslist[tagslist.size()-1]).GetVldContext();
00822 }
00823 this -> RemoveFragmentsNotInWindow(mom,streamName,loVld,hiVld);
00824 if ( !tagslist.empty() ) {
00825 std::vector<PerRecordTags>::iterator itr;
00826 for ( itr = tagslist.begin(); itr != tagslist.end(); itr++ ) {
00827 const PerRecordTags& tags = *itr;
00828 nrecord += this -> LoadRecordWithTag(*instream,tags,mom);
00829 }
00830 }
00831 }
00832 else if ( instream -> GetSequenceMode() == Per::kSequential ||
00833 instream -> GetSequenceMode() == Per::kRandom ){
00834
00835 this->RemoveAllFragments(mom,streamName);
00836
00837 double meanMom = instream->GetMeanMom();
00838
00839 if (!fRandomOverride) {
00840 fRandomSeed =
00841 int(10000*fStreamMap.size()+100*meanMom+(instream->fSumTags)+42);
00842 fRanGen->SetSeed(fRandomSeed);
00843 }
00844
00845 int numEvts =
00846 ( instream->GetPushRandom() )?
00847 fRanGen->Poisson(meanMom) : int(ceil(meanMom)) ;
00848
00849 int iEvt = 0;
00850 while (iEvt < numEvts) {
00851 const PerRecordTags& tags = instream -> GetTags();
00852 int iSuccess = this -> LoadRecordWithTag(*instream,tags,mom);
00853 iEvt += iSuccess;
00854 nrecord += iSuccess;
00855 if ( !instream->AdvanceTags() ) break;
00856 }
00857 }
00858 lastNRecord = nrecord;
00859 }
00860
00861 return nrecord;
00862
00863 }
00864
00865
00866
00867 void PerInputStreamManager::RemoveFragmentsNotInWindow(
00868 MomNavigator* mom,std::string streamName,
00869 const VldContext& loVld, const VldContext& hiVld) {
00870
00871
00872
00873
00874
00875
00876
00877
00878
00879 TObjArray* fragarray=const_cast<TObjArray*>(mom->GetFragmentArray());
00880 for (int idx = fragarray->GetEntriesFast()-1; idx >= 0; idx--) {
00881 TObject* oldobject = fragarray -> At(idx);
00882 Registry* oldiotags = 0;
00883 const VldContext* oldvldc = 0;
00884 if ( RecMinos* oldrecord = dynamic_cast<RecMinos*>(oldobject) ) {
00885 oldiotags = &(oldrecord -> GetTempTags());
00886 oldvldc = oldrecord -> GetVldContext();
00887 }
00888 else if(RecRecord* oldrecord = dynamic_cast<RecRecord*>(oldobject)){
00889 oldiotags = &(oldrecord -> GetTempTags());
00890 oldvldc = &(oldrecord -> GetHeader().GetVldContext());
00891 }
00892 if ( oldiotags ) {
00893 const char* oldtagstream = 0;
00894 if ( oldiotags -> Get("stream",oldtagstream)
00895 && strcmp(streamName.c_str(),oldtagstream) == 0 ) {
00896 if ( oldvldc && (*oldvldc < loVld || *oldvldc > hiVld) ) {
00897 fragarray->RemoveAt(idx);
00898 delete oldobject; oldobject = 0;
00899 }
00900 }
00901 }
00902 }
00903
00904 }
00905
00906
00907
00908 void PerInputStreamManager::RemoveAllFragments(
00909 MomNavigator* mom,std::string streamName ) {
00910
00911
00912
00913
00914
00915
00916
00917
00918
00919 TObjArray* fragarray=const_cast<TObjArray*>(mom->GetFragmentArray());
00920 for (int idx = fragarray->GetEntriesFast()-1; idx >= 0; idx--) {
00921 TObject* oldobject = fragarray -> At(idx);
00922 Registry* oldiotags = 0;
00923 if ( RecMinos* oldrecord = dynamic_cast<RecMinos*>(oldobject) ) {
00924 oldiotags = &(oldrecord -> GetTempTags());
00925 }
00926 else if(RecRecord* oldrecord = dynamic_cast<RecRecord*>(oldobject)){
00927 oldiotags = &(oldrecord -> GetTempTags());
00928 }
00929 if ( oldiotags ) {
00930 const char* oldtagstream = 0;
00931 if ( oldiotags -> Get("stream",oldtagstream) &&
00932 strcmp(streamName.c_str(),oldtagstream) == 0 ) {
00933 fragarray->RemoveAt(idx);
00934 delete oldobject; oldobject = 0;
00935 }
00936 }
00937 }
00938
00939 }
00940
00941
00942
00943 Int_t PerInputStreamManager::LoadRecordWithTag(const PerInputStream& instream,
00944 const PerRecordTags& tags,
00945 MomNavigator* mom) {
00946
00947
00948
00949
00950
00951
00952
00953
00954
00955
00956
00957
00958 int nrecord = 0;
00959
00960
00961 for (Int_t idx = tags.GetIndexLo(); idx <= tags.GetIndexHi(); idx++) {
00962 std::string streamName = instream.GetStreamName();
00963 TObject* object = mom -> GetFragmentByInputTag(streamName.c_str(),
00964 (tags.GetTreeName()).c_str(),idx,(tags.GetFileName()).c_str());
00965 if ( object ) continue;
00966 object = (const_cast<PerInputStream&>(instream)).GetObject(idx,false);
00967 if ( object ) {
00968 Registry* temptags = 0;
00969 if ( RecMinos* record = dynamic_cast<RecMinos*>(object) ) {
00970 temptags = &(record->GetTempTags());
00971 if ( instream.GetSequenceMode() != Per::kKey &&
00972 instream.GetSequenceMode() != Per::kSequential &&
00973 instream.GetSequenceMode() != Per::kRandom )
00974 record -> SetTransient(false);
00975 }
00976 else if ( RecRecord* record = dynamic_cast<RecRecord*>(object) ) {
00977 temptags = &(record->GetTempTags());
00978 if ( instream.GetSequenceMode() != Per::kKey &&
00979 instream.GetSequenceMode() != Per::kSequential &&
00980 instream.GetSequenceMode() != Per::kRandom )
00981 record -> SetTransient(false);
00982 }
00983
00984 if ( temptags ) {
00985
00986 temptags->UnLockValues();
00987 std::string tagname = instream.GetClassName()+".fTempTags";
00988 temptags->SetName(tagname.c_str());
00989 temptags->Set("stream",streamName.c_str());
00990 temptags->Set("tree",(tags.GetTreeName()).c_str());
00991 temptags->Set("index",idx);
00992 temptags->Set("file",(tags.GetFileName()).c_str());
00993 temptags->LockValues();
00994 }
00995
00996 if ( nrecord == 0) {
00997 MSG("Per",Msg::kVerbose) << "..Loading records with Vld"
00998 << fCurrentVld << " from stream "
00999 << streamName.c_str() << ":" << endl;
01000 }
01001 MSG("Per",Msg::kVerbose) << "....Record " << nrecord << " from"
01002 << " stream " << streamName
01003 << " tree " << tags.GetTreeName()
01004 << " index " << idx
01005 << " file "<< tags.GetFileName()
01006 << endl;
01007 mom -> AdoptFragment(object);
01008
01009 nrecord++;
01010 }
01011 }
01012
01013 return nrecord;
01014
01015 }
01016
01017
01018
01019 Int_t PerInputStreamManager::Next(MomNavigator* mom, UInt_t advanceby) {
01020
01021
01022
01023
01024
01025
01026
01027
01028
01029
01030
01031
01032
01033
01034
01035
01036
01037
01038
01039
01040
01041
01042
01043 MSG("Per",Msg::kVerbose) << "Next advance by " << advanceby << " sets."
01044 << " Start at vld " << fCurrentVld << endl;
01045
01046 if ( this -> IsEnd() ) return 0;
01047 VldContext saveCurrentVld = fCurrentVld;
01048
01049
01050 UInt_t nselect = 0;
01051 while ( nselect < advanceby && this -> AdvanceRecordTags() ) {
01052 MSG("Per",Msg::kVerbose) << "Advanced to vld " << fCurrentVld
01053 << endl;
01054 if ( IsSelectedSet() ) {
01055 nselect++;
01056 if ( nselect >= advanceby ) {
01057
01058 if ( mom ) this -> LoadRecord(mom);
01059 }
01060 }
01061 }
01062
01063 if ( advanceby > 1 ) {
01064
01065 if ( nselect < advanceby &&
01066 fUpdateMode &&
01067 !IsFileEnd() ) {
01068 while ( fCurrentVld > saveCurrentVld ) {
01069 this -> RewindRecordTags();
01070 }
01071
01072 if ( saveCurrentVld != fCurrentVld ) {
01073 MSG("Per",Msg::kWarning)
01074 << "Failure to rewind to original state in Next method." << endl;
01075 }
01076 nselect = 0;
01077 }
01078 }
01079
01080 return (Int_t)nselect;
01081
01082 }
01083
01084
01085
01086 int PerInputStreamManager::NextFile(int n, std::string streamname) {
01087
01088
01089
01090
01091
01092
01093
01094
01095
01096
01097
01098
01099
01100
01101
01102 MSG("Per",Msg::kVerbose) << "PerInputStreamManager::NextFile("
01103 << n << "," << streamname << ") called"
01104 << endl;
01105
01106 int nstream = 0;
01107 if ( streamname == "*" ) {
01108 for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
01109 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
01110 if ( instream -> NextFile(n) ) {
01111 nstream++;
01112 instream -> NextTags();
01113 }
01114 }
01115 }
01116 else {
01117 PerInputStream* instream = this -> GetOpenedStream(streamname);
01118 if ( instream ) {
01119 if ( instream -> NextFile(n) ) {
01120 nstream++;
01121 instream -> NextTags();
01122 }
01123 }
01124 else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open."
01125 << endl;
01126 }
01127
01128
01129 VldContext minVld = this -> GetCurrentKeyVld(true);
01130 if ( minVld > fCurrentVld ) SetCurrentVld(minVld);
01131
01132 return nstream;
01133
01134 }
01135
01136
01137
01138 VldContext PerInputStreamManager::GetCurrentKeyVld(bool isMin) const {
01139
01140
01141
01142
01143
01144
01145
01146
01147
01148
01149 VldContext currentVld = Per::GetVldEnd();
01150 if ( !isMin ) currentVld = Per::GetVldBegin();
01151
01152 for( StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++ ) {
01153 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
01154 if ( instream && instream->IsOpen() ) {
01155 VldContext tagVld = instream->GetTags().GetVldContext();
01156 if ( isMin && tagVld < currentVld ) currentVld = tagVld;
01157 else if ( !isMin && tagVld > currentVld ) currentVld = tagVld;
01158 }
01159 }
01160
01161 return currentVld;
01162
01163 }
01164
01165
01166
01167 PerInputStream* PerInputStreamManager::OpenStream(std::string streamName,
01168 std::string treeName) {
01169
01170
01171
01172
01173
01174
01175
01176
01177
01178
01179
01180
01181
01182
01183
01184
01185
01186 bool openok = false;
01187
01188 PerInputStream* stream=dynamic_cast<PerInputStream*>(fStreamMap[streamName]);
01189 if ( !stream ) {
01190
01191 stream = new PerInputStream(treeName);
01192 stream -> SetStreamName(streamName);
01193 stream -> SetUpdateMode(fUpdateMode);
01194 Per::ESequenceMode seqMode = Per::GetDefSequenceMode(streamName.c_str());
01195 stream -> SetSequenceMode(seqMode);
01196 fStreamMap[streamName] = stream;
01197 fLastServed[streamName] = Per::GetVldEnd();
01198 openok = true;
01199 }
01200 else {
01201 MSG("Per",Msg::kWarning)
01202 <<"Stream manager failed to open\nrequested stream "
01203 << streamName << " because name conflicts with previously opened stream."
01204 << endl;
01205 }
01206
01207 return (openok) ? stream : (PerInputStream*)0;
01208
01209 }
01210
01211
01212
01213 PerInputStreamManager::PerInputStreamManager() : fCurrentVld(Per::GetVldEnd()),
01214 fGlobalSelection(""),fTTreeFormula(0),fFormulaMap(),
01215 fUpdateMode(false),fMaxSyncDelay(15), fIsNewCurrentVldToSelect(true),
01216 fIsSelectedSet(false), fRandomSeed(0), fRandomOverride(false) {
01217
01218
01219
01220
01221
01222
01223
01224
01225
01226 fRanGen = new TRandom3(0);
01227 MSG("Per",Msg::kDebug)
01228 << "PerInputStreamManager ctor : Random Seed set to "
01229 << fRandomSeed << endl;
01230 }
01231
01232
01233
01234 PerInputStreamManager::~PerInputStreamManager() {
01235
01236
01237
01238
01239
01240
01241
01242
01243
01244
01245 if ( fTTreeFormula ) delete fTTreeFormula; fTTreeFormula = 0;
01246 fFormulaMap.clear();
01247
01248 if ( fRanGen ) delete fRanGen; fRanGen = 0;
01249
01250 }
01251
01252
01253
01254 Int_t PerInputStreamManager::Previous(MomNavigator* mom, UInt_t rewindby) {
01255
01256
01257
01258
01259
01260
01261
01262
01263
01264
01265
01266
01267
01268
01269
01270
01271
01272
01273
01274
01275
01276
01277
01278 if ( this -> IsBegin() ) return 0;
01279
01280 UInt_t nselect = 0;
01281
01282 while ( nselect < rewindby && this -> RewindRecordTags() ) {
01283 if ( IsSelectedSet() ) {
01284 nselect++;
01285 if ( nselect >= rewindby ) {
01286
01287 if ( mom ) this -> LoadRecord(mom);
01288 }
01289 }
01290 }
01291
01292 return (Int_t)nselect;
01293
01294 }
01295
01296
01297
01298 int PerInputStreamManager::PrevFile(int n, std::string streamname) {
01299
01300
01301
01302
01303
01304
01305
01306
01307
01308
01309
01310
01311
01312
01313
01314
01315 int nstream = 0;
01316 if ( streamname == "*" ) {
01317 for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
01318 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
01319 if ( instream -> PrevFile(n) ) {
01320 nstream++;
01321 instream -> NextTags();
01322 }
01323 }
01324 }
01325 else {
01326 PerInputStream* instream = this -> GetOpenedStream(streamname);
01327 if ( instream ) {
01328 if ( instream -> PrevFile(n) ) {
01329 nstream++;
01330 instream -> NextTags();
01331 }
01332 }
01333 else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open."
01334 << endl;
01335 }
01336
01337
01338 VldContext maxVld = this -> GetCurrentKeyVld(false);
01339 if ( maxVld < fCurrentVld ) SetCurrentVld(maxVld);
01340
01341 return nstream;
01342
01343 }
01344
01345
01346
01347 std::ostream& PerInputStreamManager::Print(std::ostream& os,
01348 const char* option) const {
01349
01350
01351
01352
01353
01354
01355
01356
01357
01358
01359
01360 TString opt = option;
01361 opt.ToLower();
01362 if ( opt == "brief" || opt.IsNull() && fPrintOpt == "brief" ) {
01363 Int_t nenabled = 0;
01364 for ( StreamMapConstItr citr = fStreamMap.begin();
01365 citr != fStreamMap.end(); ++ citr ) {
01366 PerInputStream* instream = dynamic_cast<PerInputStream*>(citr->second);
01367 if ( instream -> IsEnabled() && instream -> IsOpen() ) {
01368 os << " " << nenabled << ")" << citr->first << " serving tree "
01369 << instream -> GetTreeName()
01370 << " w/" << instream->GetNumEntries()
01371 << " entries (current entry = " << instream->GetEntry()
01372 << ") using sequence mode "
01373 << Per::AsString(instream->GetSequenceMode())
01374 << "\n from the "
01375 << ((instream->IsFileEnd()) ? "closed file " : "open file ")
01376 << instream->GetFullFilePathName() << "." << endl;
01377 nenabled++;
01378 }
01379 }
01380 }
01381 else {
01382 os << "PerInput";
01383 PerStreamManager::Print(os);
01384 }
01385 return os;
01386
01387 }
01388
01389
01390
01391 VldContext PerInputStreamManager::RecordsAt(MomNavigator* mom,
01392 const VldContext& vld) {
01393
01394
01395
01396
01397
01398
01399
01400
01401
01402
01403
01404
01405
01406
01407
01408
01409
01410
01411
01412 if ( vld == fCurrentVld && IsSelectedSet() ) {
01413 if ( mom ) this -> LoadRecord(mom);
01414 return fCurrentVld;
01415 }
01416
01417 if ( vld < fCurrentVld ) {
01418 while ( RewindRecordTags() ) {
01419 if ( vld >= fCurrentVld && IsSelectedSet() ) {
01420 if ( mom ) this -> LoadRecord(mom);
01421 return fCurrentVld;
01422 }
01423 }
01424 }
01425 else {
01426 while ( AdvanceRecordTags() ) {
01427 if ( vld <= fCurrentVld && IsSelectedSet() ) {
01428 if ( mom ) this -> LoadRecord(mom);
01429 return fCurrentVld;
01430 }
01431 }
01432 }
01433
01434 return fCurrentVld;
01435
01436 }
01437
01438
01439
01440 int PerInputStreamManager::RemoveFile(std::string fullfilepathname,
01441 std::string streamname) {
01442
01443
01444
01445
01446
01447
01448
01449
01450
01451
01452
01453 int nstream = 0;
01454 if ( streamname == "*" ) {
01455 for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
01456 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
01457 nstream += instream -> RemoveFile(fullfilepathname);
01458 }
01459 }
01460 else {
01461 PerInputStream* instream = this -> GetOpenedStream(streamname);
01462 if ( instream ) nstream += instream -> RemoveFile(fullfilepathname);
01463 else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open."
01464 << endl;
01465 }
01466
01467 return nstream;
01468
01469 }
01470
01471
01472
01473 int PerInputStreamManager::RewindRecordTags() {
01474
01475
01476
01477
01478
01479
01480
01481
01482
01483
01484
01485
01486
01487
01488
01489
01490
01491
01492 VldContext newVld = Per::GetVldBegin();
01493 int nKey = 0;
01494 int nSeq = 0;
01495 bool allSeqDone = true;
01496
01497 for ( StreamMapItr sitr=fStreamMap.begin();sitr != fStreamMap.end();++sitr ){
01498 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
01499 if ( instream->GetSequenceMode() != Per::kKey ) continue;
01500 nKey++;
01501 PerRecordTags tags = instream->GetTags();
01502 bool isdone = false;
01503 while ( !isdone ) {
01504 isdone = true;
01505 if ( tags.IsEnd() || tags.GetVldContext() >= fCurrentVld
01506 || (tags.IsBegin() && !instream->IsBeginOfFiles()) ) {
01507 tags = instream->PrevTags();
01508 if ( tags.IsBegin() ) {
01509 if ( instream -> PrevFile() ) isdone = false;
01510 }
01511 }
01512 }
01513 if ( tags.GetVldContext() > newVld ) newVld = tags.GetVldContext();
01514 }
01515
01516
01517 if ( !nKey ) {
01518 for ( StreamMapItr sitr=fStreamMap.begin();
01519 sitr != fStreamMap.end();++sitr ){
01520 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
01521 if ( instream->GetSequenceMode() != Per::kSequential ) continue;
01522 nSeq++;
01523 PerRecordTags tags = instream->GetTags();
01524 bool isdone = false;
01525 while ( !isdone ) {
01526 isdone = true;
01527 if ( tags.IsEnd() ||
01528 ( tags.IsBegin() && !instream->IsBeginOfFiles() ) ) {
01529 tags = instream->PrevTags();
01530 if ( tags.IsBegin() ) {
01531 if ( instream -> PrevFile() ) isdone = false;
01532 }
01533 }
01534 }
01535 if ( tags.GetVldContext() > newVld ) newVld = tags.GetVldContext();
01536 if ( !( tags.IsBegin() && instream->IsBeginOfFiles() ) ) allSeqDone = false;
01537 }
01538 }
01539
01540 if ( !nKey && !nSeq) {
01541
01542 MSG("Per",Msg::kWarning)
01543 << "No stream of sequence mode Per::kKey or Per::kSequential "
01544 << "found.\n Must be at least one key or sequential stream to "
01545 << "facilitate sequencing."
01546 << std::endl;
01547 return 0;
01548 }
01549
01550 if ( fCurrentVld == newVld && !nSeq ) {
01551 return 0;
01552 }
01553
01554 if ( nSeq && allSeqDone ) {
01555 return 0;
01556 }
01557
01558 for ( StreamMapItr sitr = fStreamMap.begin();
01559 sitr != fStreamMap.end(); ++sitr) {
01560 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
01561 if ( instream->GetSequenceMode() != Per::kKey &&
01562 instream->GetSequenceMode() != Per::kSequential ) continue;
01563 fLastServed[sitr->first] = Per::GetVldBegin();
01564 }
01565
01566 SetCurrentVld(newVld);
01567 this -> AdvanceLowerBoundTags(fCurrentVld);
01568 this -> AdvanceWindowTags(fCurrentVld);
01569
01570
01571 return 1;
01572
01573 }
01574
01575
01576
01577 bool PerInputStreamManager::SetFile(string streamName,string fullFilePathName,
01578 Per::EAccessMode accessMode) {
01579
01580
01581
01582
01583
01584
01585
01586
01587
01588
01589
01590
01591
01592
01593
01594
01595
01596 Int_t oldNumStream = this -> GetNumStreamOpen();
01597 bool openok
01598 = PerStreamManager::SetFile(streamName,fullFilePathName,accessMode);
01599
01600 if ( openok && oldNumStream == 0) {
01601 SetCurrentVld(Per::GetVldBegin());
01602 }
01603
01604 return openok;
01605
01606 }
01607
01608
01609
01610 void PerInputStreamManager::SetCurrentVld(const VldContext& vld) {
01611
01612
01613
01614 if ( vld != fCurrentVld ) fIsNewCurrentVldToSelect = true;
01615 fCurrentVld = vld;
01616
01617 }
01618
01619
01620
01621 void PerInputStreamManager::SetFileEnd(bool isFileEnd) {
01622
01623
01624
01625
01626
01627
01628
01629
01630
01631
01632 for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
01633 PerInputStream* instream = dynamic_cast<PerInputStream*>(citr -> second);
01634 instream -> SetFileEnd(isFileEnd);
01635 }
01636
01637 return;
01638
01639 }
01640
01641
01642
01643 void PerInputStreamManager::SetSelection(string streamName, string selection,
01644 bool isRequired) {
01645
01646
01647
01648
01649
01650
01651
01652
01653
01654
01655
01656
01657
01658
01659
01660
01661 if (streamName == "*") {
01662
01663 fGlobalSelection = selection;
01664 fFormulaMap.clear();
01665 if ( fTTreeFormula ) delete fTTreeFormula; fTTreeFormula = 0;
01666
01667 }
01668 else {
01669
01670 PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01671 if (stream) {
01672 stream -> SetSelection(selection);
01673 stream -> SetRequired(isRequired);
01674 }
01675 }
01676
01677 }
01678
01679
01680
01681 void PerInputStreamManager::SetSequenceMode(string streamName,
01682 Per::ESequenceMode sequenceMode) {
01683
01684
01685
01686
01687
01688
01689
01690
01691
01692
01693
01694
01695
01696
01697
01698
01699
01700
01701
01702
01703 if (streamName == "*") {
01704
01705 for ( StreamMapConstItr citr = fStreamMap.begin();
01706 citr!= fStreamMap.end(); ++citr ) {
01707 ((PerInputStream*)(citr -> second)) -> SetSequenceMode(sequenceMode);
01708 if (sequenceMode == Per::kSequential ||
01709 sequenceMode == Per::kRandom )
01710 ((PerInputStream*)(citr -> second)) -> SetRandomGenerator(fRanGen);
01711 }
01712 } else {
01713
01714 PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01715 if (stream) {
01716 stream -> SetSequenceMode(sequenceMode);
01717 if (sequenceMode == Per::kSequential ||
01718 sequenceMode == Per::kRandom )
01719 stream -> SetRandomGenerator(fRanGen);
01720 }
01721 }
01722 }
01723
01724
01725
01726 void PerInputStreamManager::SetMaxFileRepeat(std::string streamName,
01727 int numRepeat )
01728 {
01729
01730
01731
01732
01733
01734
01735
01736
01737
01738
01739
01740
01741 if (numRepeat < 0) numRepeat = 0;
01742
01743 if (streamName == "*") {
01744
01745 for ( StreamMapConstItr citr = fStreamMap.begin();
01746 citr!= fStreamMap.end(); ++citr ) {
01747 ((PerInputStream*)(citr -> second)) -> SetMaxFileRepeat(numRepeat);
01748 }
01749 }
01750 else {
01751
01752 PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01753 stream -> SetMaxFileRepeat(numRepeat);
01754 }
01755 }
01756
01757
01758
01759 void PerInputStreamManager::SetMeanMom(std::string streamName,double mm)
01760 {
01761
01762
01763
01764
01765
01766
01767
01768
01769
01770 if (mm < 0.) return;
01771
01772 if (streamName == "*") {
01773
01774 for ( StreamMapConstItr citr = fStreamMap.begin();
01775 citr!= fStreamMap.end(); ++citr ) {
01776 ((PerInputStream*)(citr -> second)) -> SetMeanMom(mm);
01777 }
01778 }
01779 else {
01780
01781 PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01782 stream -> SetMeanMom(mm);
01783 }
01784 }
01785
01786
01787
01788 void PerInputStreamManager::SetPushRandom(std::string streamName,bool tf)
01789 {
01790
01791
01792
01793
01794
01795
01796
01797
01798
01799
01800 if (streamName == "*") {
01801
01802 for ( StreamMapConstItr citr = fStreamMap.begin();
01803 citr!= fStreamMap.end(); ++citr ) {
01804 ((PerInputStream*)(citr -> second)) -> SetPushRandom(tf);
01805 }
01806 }
01807 else {
01808
01809 PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01810 stream -> SetPushRandom(tf);
01811 }
01812 }
01813
01814
01815
01816 void PerInputStreamManager::SetRandomSeed(int rSeed)
01817 {
01818
01819 fRanGen->SetSeed(rSeed);
01820 fRandomSeed = rSeed;
01821 fRandomOverride = true;
01822 MSG("Per",Msg::kInfo)
01823 << "PerInputStreamManager::SetRandomSeed : Random Seed set to "
01824 << ((rSeed==0)? fRanGen->GetSeed() : fRandomSeed) << endl;
01825
01826 }
01827
01828
01829
01830 void PerInputStreamManager::SetWindow(string streamName,
01831 double lower, double upper)
01832 {
01833
01834
01835
01836
01837
01838
01839
01840
01841
01842
01843
01844
01845
01846
01847
01848
01849
01850 if (streamName == "*") {
01851
01852 for ( StreamMapConstItr citr = fStreamMap.begin();
01853 citr!= fStreamMap.end(); ++citr )
01854 ((PerInputStream*)(citr -> second)) -> SetWindow(lower,upper);
01855 }
01856 else {
01857
01858 PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01859 if (stream) stream -> SetWindow(lower,upper);
01860 }
01861
01862 }
01863
01864
01865
01866 void PerInputStreamManager::SetPerOwnedDisabled(string streamName,
01867 bool perowneddisabled)
01868 {
01869
01870
01871
01872
01873
01874
01875
01876
01877
01878
01879
01880
01881
01882 if (streamName == "*") {
01883
01884 for ( StreamMapConstItr citr = fStreamMap.begin();
01885 citr!= fStreamMap.end(); ++citr )
01886 ((PerInputStream*)(citr -> second))
01887 -> SetPerOwnedDisabled(perowneddisabled);
01888 }
01889 else {
01890
01891 PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01892 if (stream) stream -> SetPerOwnedDisabled(perowneddisabled);
01893 }
01894
01895 }
01896
01897
01898
01899 void PerInputStreamManager::SetUpdateMode(bool updateMode) {
01900
01901
01902
01903
01904
01905
01906
01907
01908
01909
01910
01911
01912
01913 fUpdateMode = updateMode;
01914 for ( StreamMapConstItr citr = fStreamMap.begin();
01915 citr!= fStreamMap.end(); ++citr )
01916 ((PerInputStream*)(citr -> second)) -> SetUpdateMode(updateMode);
01917
01918 }
01919
01920
01921
01922 void PerInputStreamManager::UpdateTreeFormula() {
01923
01924
01925
01926
01927
01928
01929
01930
01931 if ( fGlobalSelection.empty() ) return;
01932 if ( this -> GetNumStreamOpen() <= 0 ) {
01933 if ( fTTreeFormula ) delete fTTreeFormula; fTTreeFormula = 0;
01934 fFormulaMap.clear();
01935 return;
01936 }
01937
01938 bool rebuild = false;
01939 if ( !fTTreeFormula ) rebuild = true;
01940 else {
01941
01942 if ( this -> GetNumStream() != fFormulaMap.size() ) rebuild = true;
01943 else {
01944 for ( StreamMapConstItr citr = fStreamMap.begin();
01945 citr!= fStreamMap.end(); ++citr) {
01946 PerInputStream* stream = (PerInputStream*)(citr->second);
01947 std::string streamname = citr->first;
01948 if ( fFormulaMap[streamname] != stream->GetFullFilePathName() ) {
01949 rebuild = true;
01950 break;
01951 }
01952 }
01953 }
01954 }
01955
01956 if ( !rebuild ) return;
01957
01958
01959
01960 fFormulaMap.clear();
01961 std::string minstreamnm = "";
01962 Int_t minentries = -1;
01963 for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
01964 PerInputStream* stream = (PerInputStream*)(citr -> second);
01965 if ( stream->fTTree != 0 &&
01966 (stream->GetNumEntries() < minentries || minstreamnm.empty()) ) {
01967 minentries = stream->GetNumEntries();
01968 minstreamnm = citr->first;
01969 }
01970 fFormulaMap.insert(make_pair(citr->first,stream->GetFullFilePathName()));
01971 }
01972
01973
01974
01975 TTree* mastertree = ((PerInputStream*)fStreamMap[minstreamnm])->fTTree;
01976 for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
01977 if ( citr->first == minstreamnm ) continue;
01978 PerInputStream* stream = (PerInputStream*)(citr->second);
01979 if ( stream->fTTree != 0 ) mastertree -> AddFriend(stream->fTTree);
01980 }
01981 if ( !fTTreeFormula ) {
01982 fTTreeFormula
01983 = new TTreeFormula("PerMgrSelection",fGlobalSelection.c_str(),mastertree);
01984 }
01985 else {
01986 fTTreeFormula -> SetTree(mastertree);
01987 fTTreeFormula -> UpdateFormulaLeaves();
01988 }
01989 if ( fTTreeFormula -> GetNdim() <= 0 ) {
01990
01991 delete fTTreeFormula; fTTreeFormula = 0;
01992 fFormulaMap.clear();
01993 }
01994
01995 for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
01996 if ( citr->first == minstreamnm ) continue;
01997 PerInputStream* stream = (PerInputStream*)(citr->second);
01998 if ( stream->fTTree != 0 ) mastertree -> RemoveFriend(stream->fTTree);
01999 }
02000
02001 return;
02002
02003 }