Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

PerInputStreamManager.cxx

Go to the documentation of this file.
00001 
00002 //
00003 // PerInputStreamManager
00004 //
00005 // Package: Per (Persistency).
00006 //
00007 // S. Kasahara 09/2001
00008 //
00009 // Purpose: Input stream manager class used to manage PerInputStreams.
00010 //
00012 
00013 #include <iostream>
00014 
00015 #include "TTree.h"
00016 #include "TTreeFormula.h"
00017 #include "TMath.h"  //  max,min
00018 #include "TRandom3.h"
00019 
00020 #include "Persistency/PerInputStream.h"
00021 #include "Persistency/PerInputStreamManager.h"
00022 #include "MessageService/MsgService.h"
00023 #include "MinosObjectMap/MomNavigator.h"
00024 #include "Record/RecMinos.h"
00025 #include "Record/RecMinosHdr.h"
00026 #include "Record/RecRecord.h"
00027 
00028 #include <Conventions/Munits.h>
00029 
00030 std::ostream& operator<<(std::ostream& os, const PerInputStreamManager& pism) 
00031                                                {return pism.Print(os);}
00032 
00033 ClassImp(PerInputStreamManager)
        // NOTE(review): ClassImp is presumably the ROOT class-implementation
        // macro for PerInputStreamManager -- confirm against the class header.
00034 
00035 //   Definition of static data members
00036 //   *********************************
        // NOTE(review): no static data member definitions are visible here;
        // only the CVS revision id string below.
00037 
00038 CVSID("$Id: PerInputStreamManager.cxx,v 1.64 2009/04/29 23:55:53 arms Exp $");
00039 
00040 // Definition of methods (alphabetical order)
00041 // ******************************************
00042 
00043 //.....................................................................
00044   
00045 int PerInputStreamManager::AddFile(std::string fullfilepathname, int at,
00046                                    std::string streamname) {
00047   //
00048   //  Purpose:  Add file to file list at position at of specified stream(s).
00049   //
00050   //  Arguments: fullfilepathname
00051   //             at = position in file list (-1 => end of list)
00052   //             streamname (if "*" (default), apply to all streams)
00053   //             
00054   //  Return: number of streams on which action is successfully applied.
00055   //
00056   //  Contact:   S. Kasahara
00057 
00058   int oldnstream = this -> GetNumStreamOpen();
00059 
00060   int nstream = 0;
00061   if ( streamname == "*" ) {
00062     for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00063       PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00064       nstream += instream -> AddFile(fullfilepathname,at); 
00065     }
00066   }
00067   else {
00068     PerInputStream* instream = this -> GetOpenedStream(streamname);
00069     if ( instream ) nstream += instream -> AddFile(fullfilepathname,at);
00070     else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open." 
00071                                   << endl;
00072   }
00073   // reset if first, otherwise leave it where it is
00074   if ( oldnstream == 0 && nstream > 0 ) SetCurrentVld(Per::GetVldBegin());
00075 
00076   return nstream;
00077 
00078 }
00079 
00080 //.....................................................................
00081 
00082 int PerInputStreamManager::AdvanceLowerBoundTags(const VldContext& keyVld) {
00083   //  Purpose:  Advance record tags in Per::kLowerBound streams to
00084   //            be equal to or less than the keyVld given in the argument.
00085   //
00086   //  Argument: none.
00087   //
00088   //  Return:  1 if successful, else 0 (=> waiting for new records on streams 
00089   //           when reading in update mode).   
00090   //
00091   //  Contact:   S. Kasahara
00092   //
00093   //  Notes: To return successful when in update mode requires that the
00094   //         method is able to see at least one record set with validity
00095   //         equal to or greater than the specified input validity, or that
00096   //         the method can see that the open file has been closed by
00097   //         the writer.
00098   //
00099 
00100   int nFail = 0;
00101   for ( StreamMapItr sitr=fStreamMap.begin(); sitr!=fStreamMap.end(); ++sitr ){
00102     PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00103     if ( instream->GetSequenceMode() != Per::kLowerBound ) continue;
00104     if ( !instream -> AdvanceLowerBoundTags(keyVld) ) nFail++;
00105   }
00106 
00107   if ( nFail ) return 0;
00108   return 1;
00109 
00110 }
00111 
00112 //.....................................................................
00113 
00114 int PerInputStreamManager::AdvanceWindowTags(const VldContext& keyVld) {
00115   //  Purpose: Attempt to advance record tags in Per::kWindow streams
00116   //  such that their VldContext is in the window around the key
00117   //  VldContext.  The window is defined in the individual streams.
00118   //
00119   //  Argument: VldContext of the key record.
00120   //
00121   //  Return:  1 if successful, else 0 (=> waiting for new records on streams 
00122   //           when reading in update mode).   
00123   //
00124 
00125   int nFail = 0;
00126   for ( StreamMapItr sitr=fStreamMap.begin();sitr!=fStreamMap.end(); sitr++ ) {
00127     PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00128     if (instream->GetSequenceMode() != Per::kWindow) continue;
00129     if ( !instream -> AdvanceWindowTags(keyVld) ) nFail++;
00130   }
00131   
00132   if (nFail) return 0;
00133   return 1;
00134 }
00135 
00136 //.....................................................................
00137 
00138 int PerInputStreamManager::AdvanceSequentialTags() {
00139   //  Purpose: Attempt to advance record tags in Per::kSequential &
00140   //  Per::kRandom streams to make the stream active and at an acceptable
00141   //  record regardless of VldContext.
00142   //
00143   //  Argument: None.
00144   //
00145   //  Return:  1 if successful, else 0 (=> waiting for new records on streams 
00146   //           when reading in update mode).   
00147   //
00148 
00149   int nFail = 0;
00150   for ( StreamMapItr sitr=fStreamMap.begin();sitr!=fStreamMap.end(); sitr++ ) {
00151     PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00152     if (instream->GetSequenceMode() != Per::kSequential &&
00153         instream->GetSequenceMode() != Per::kRandom      ) continue;
00154     if ( !instream -> AdvanceTagsList() ) nFail++;
00155   }
00156   
00157   if (nFail) return 0;
00158   return 1;
00159 }
00160 
00161 //.....................................................................
00162 
00163 int PerInputStreamManager::AdvanceRecordTags() {
00164   //  Purpose:  Advance record tags in managed streams to one past
00165   //            fCurrentVld.
00166   //
00167   //  Argument: none.
00168   //
00169   //  Return:  1 if successful, else 0 (=> end of all 
00170   //           streams or waiting for new records on streams when 
00171   //           reading in update mode).
00172   //
00173   //  Contact:   S. Kasahara
00174   //
00175   //  Notes: This method checks each stream's current record set to see if the 
00176   //         set has a vld that matches or is less than fCurrentVld.  If so, 
00177   //         it advances one record set on that stream.  
00178   //         At the end of the method, if advance has been successful,
00179   //         fCurrentVld is set to point to the lowest validity
00180   //         of all the current stream record tags, or Per::GetVldEnd()
00181   //         if at the end of all streams. 
        //         Per::kKey streams drive the sequencing; Per::kSequential
        //         streams are used only when no Per::kKey stream exists.
        //         NOTE(review): every dynamic_cast result below is used
        //         unchecked.
00182 
00183   int nFail = 0;
00184   VldContext newVld = Per::GetVldEnd();
00185   int  nKey = 0;
00186   int  nSeq = 0;
00187   bool allSeqDone = true;
00188 
        // Pass 1: step each Per::kKey stream forward and keep the lowest
        // resulting vld in newVld.
00189   for ( StreamMapItr sitr=fStreamMap.begin();sitr != fStreamMap.end();++sitr ){
00190     PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00191     if ( instream->GetSequenceMode() != Per::kKey ) continue;
00192     nKey++;
00193     PerRecordTags tags = instream -> GetTags();
00194     VldContext lastvld = fLastServed[sitr->first];
00195     bool isdone = false;
00196     while ( !isdone ) {
00197       isdone = true;
          // Step forward when the tags are at the begin marker, are at or
          // below fCurrentVld, equal the vld last served from this stream,
          // or sit at the end of a file while more files remain.
00198       if ( tags.IsBegin() || (tags.GetVldContext() <= fCurrentVld 
00199                           || lastvld == tags.GetVldContext())   
00200                           || (tags.IsEnd() && !instream->IsEndOfFiles()) ) {
00201         tags = instream->NextTags();
00202         if ( tags.IsEnd() ) {
              // In update mode with the file not yet ended, count a failure
              // (caller must wait for new records); otherwise try the next
              // file and go round the loop again if one was opened.
00203           if ( fUpdateMode && !instream->IsFileEnd() ) nFail++;
00204           else if (instream->NextFile()) isdone = false;
00205         }
00206       }
00207     }
00208     if ( tags.GetVldContext() < newVld ) newVld = tags.GetVldContext();
00209   }
00210   // if there is no kKey stream, look for a kSequential stream
00211   if (!nKey) {
00212     for ( StreamMapItr sitr=fStreamMap.begin();
00213           sitr != fStreamMap.end();++sitr ){
00214       PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00215       if ( instream->GetSequenceMode() != Per::kSequential ) continue;
00216       nSeq++;
00217       PerRecordTags tags = instream -> GetTags();
          // NOTE(review): lastvld is not read in this loop, but the
          // operator[] lookup still inserts a default entry for this
          // stream into fLastServed if none exists.
00218       VldContext lastvld = fLastServed[sitr->first];
00219       bool isdone = false;
00220       while ( !isdone ) {
00221         isdone = true;
            // Unlike the key streams, a sequential stream is stepped only
            // when at the begin marker or at the end of a file with more
            // files remaining; fCurrentVld is not consulted.
00222         if ( tags.IsBegin()                               ||
00223              ( tags.IsEnd() && !instream->IsEndOfFiles() ) ) {
00224           tags = instream->NextTags();
00225           if ( tags.IsEnd() ) {
00226             if ( fUpdateMode && !instream->IsFileEnd() ) nFail++;
00227             else if (instream->NextFile()) isdone = false;
00228           }
00229         }
00230       }
00231       if ( tags.GetVldContext() < newVld ) newVld = tags.GetVldContext();
          // A single sequential stream that is not at the end of its last
          // file keeps the set of sequential streams alive.
00232       if ( !( tags.IsEnd() && instream->IsEndOfFiles() ) ) allSeqDone = false;
00233     } // end for loop over the stream map
00234   }
00235 
00236   if ( !nKey && !nSeq) {
00237     // Must be at least one key or sequential stream in list of managed streams
00238     MSG("Per",Msg::kWarning)
00239       << "No stream of sequence mode Per::kKey or Per::kSequential "
00240       << "found.\n Must be at least one key or sequential stream to "
00241       << "facilitate sequencing." 
00242       << std::endl;
00243     return 0;
00244   }
00245 
00246   // Now sequence any Per::kLowerBound or Per::kWindow streams relative
00247   // to newVld (kSequential and kRandom streams advance themselves)
00248   if ( !this -> AdvanceLowerBoundTags(newVld) ) nFail++;
00249   if ( !this -> AdvanceWindowTags(newVld) ) nFail++;
00250 
00251   if ( nFail ) {
00252     // One or more streams failed to advance but file is not closed and 
00253     // update mode has been requested (e.g. client is dispatcher)
00254     // Check to see if we should declare a successful advance anyway because
00255     // one or more streams indicate that the condition of fMaxSyncDelay has
00256     // been met.
00257     MSG("Per",Msg::kVerbose) << nFail << " streams failed to advance\n" 
00258                              << " checking time to tree end against maxsyncdelay " 
00259                              << fMaxSyncDelay << "." << endl;
00260     
          // Starts one below fMaxSyncDelay so that, when no open stream
          // raises it, the failure branch further down is taken.
00261     Int_t maxTimeToTreeEnd = fMaxSyncDelay - 1;
00262     for( StreamMapItr sitr=fStreamMap.begin(); sitr!=fStreamMap.end();++sitr ){
00263       PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00264       if ( instream -> IsOpen() ) {
            // Seconds between newVld and the stream's last known entry;
            // left at 0 when fLastEntryVld is not valid.
00265         Int_t timeToTreeEnd = 0;
00266         if ( instream -> fLastEntryVld.IsValid() ) {
00267           timeToTreeEnd = (Int_t)(instream->fLastEntryVld.GetTimeStamp().GetSec())
00268             - (Int_t)(newVld.GetTimeStamp().GetSec());
00269         }
00270         MSG("Per",Msg::kVerbose) << sitr->first << " time to end " 
00271                                  << timeToTreeEnd << "(sec)" << endl;
00272         maxTimeToTreeEnd = TMath::Max(maxTimeToTreeEnd,timeToTreeEnd);
            // UpdateTree() is called only on streams still inside the
            // sync-delay window.
00273         if ( timeToTreeEnd < (Int_t)fMaxSyncDelay ) instream -> UpdateTree();
00274       }
00275     }
          // No open stream is fMaxSyncDelay or more seconds ahead of newVld:
          // report failure.  Otherwise fall through and treat the advance
          // as successful despite nFail.
00276     if ( maxTimeToTreeEnd < (Int_t)fMaxSyncDelay ) {
00277       if ( fCurrentVld != newVld ) {
00278         SetCurrentVld(newVld);
00279         this -> RewindRecordTags(); // to push back to where we were
00280       }
00281       return 0; // failure
00282     }  
00283   }
00284 
        // Record newVld as the vld last served for each key/sequential
        // stream whose current tags carry exactly that vld.
00285   for ( StreamMapItr sitr = fStreamMap.begin();
00286         sitr != fStreamMap.end(); ++sitr)  {
00287     PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00288     if ( instream->GetSequenceMode() != Per::kKey       &&
00289          instream->GetSequenceMode() != Per::kSequential ) continue;
00290     PerRecordTags tags = instream -> GetTags();
00291     if ( tags.GetVldContext() == newVld ) {
00292       fLastServed[sitr->first] = newVld;
00293     }
00294   }
00295 
        // With key streams only, an unchanged vld means nothing advanced.
00296   if ( (newVld == fCurrentVld && !nSeq ) ) {
00297     return 0; // failure
00298   }
00299 
        // Sequential streams were used and all are at the end of their
        // last file.
00300   if ( nSeq && allSeqDone ) {
00301     return 0; // failure
00302   }
00303 
00304   SetCurrentVld(newVld); // equals Per::GetVldEnd() when end of all streams reached
00305   return 1; // success
00306 
00307 }
00308 
00309 //.....................................................................
00310 
00311 void PerInputStreamManager::CloseFile(string streamName) {
00312   //  Purpose:  Close current file serving specified stream(s).
00313   //
00314   //  Argument: streamName string  name of stream on which to close file.
00315   //                               if streamname="*" (default), all
00316   //                               streams will have their files closed.
00317   //
00318   //  Return:  none.
00319   //
00320   //  Contact:   S. Kasahara
00321   //
00322 
00323   PerStreamManager::CloseFile(streamName);
00324   // When all files have been closed, reset fCurrentVld to reflect this
00325   if ( this->GetNumStreamOpen() <= 0 ) SetCurrentVld(Per::GetVldEnd()); 
00326 
00327 }
00328 
00329 //.....................................................................
00330 
00331 void PerInputStreamManager::CloseStream(string streamName) {
00332   //  Purpose:  Close specified stream(s).
00333   //
00334   //  Argument: streamName string  name of stream to close.
00335   //                               if streamName="*" (default), all
00336   //                               streams will be closed.
00337   //
00338   //  Return:  none.
00339   //
00340   //  Contact:   S. Kasahara
00341   //
00342 
00343   PerStreamManager::CloseStream(streamName);
00344   // When all files have been closed, reset fCurrentVld to reflect this
00345   if ( this->GetNumStreamOpen() <= 0 ) SetCurrentVld(Per::GetVldEnd()); 
00346   if ( streamName == "*" ) fLastServed.clear();
00347   else {
00348     std::map<std::string,VldContext>::iterator itr 
00349                                             = fLastServed.find(streamName);
00350     if ( itr != fLastServed.end() ) fLastServed.erase(itr);
00351   }
00352   
00353 }
00354 
00355 //.....................................................................
00356 
00357 Int_t PerInputStreamManager::Get(MomNavigator* mom) {
00358   //  Purpose:  Retrieve current record set from managed record streams
00359   //            with no advancing.  
00360   //
00361   //  Argument: mom        pointer to MomNavigator
00362   //
00363   //  Return:  number of records retrieved.  
00364   //
00365   //  Contact:   S. Kasahara
00366   //
00367   //  Notes:  If one record in a record set (sharing common vldcontext) is 
00368   //          rejected due to a selection cut, then entire set is rejected.
00369   //
00370 
00371   if ( !mom ) return 0;
00372 
00373   Int_t nrecord = 0;
00374   if ( IsSelectedSet() ) {
00375     nrecord = this -> LoadRecord(mom);
00376   }
00377 
00378   return nrecord;
00379 
00380 }
00381 
00382 //.....................................................................
00383 
00384 VldContext PerInputStreamManager::GetLastEntryVld() const {
00385   //
00386   //  Purpose:  Return last entry vld of any open stream.  This is useful
00387   //            when reading record sets from an open file, e.g. when
00388   //            used in context of dispatcher, to determine how close we
00389   //            are to the file end.
00390   //
00391 
00392   VldContext lastentryvld = Per::GetVldBegin();
00393   for (StreamMapConstItr itr=fStreamMap.begin(); itr!=fStreamMap.end(); itr++){
00394     PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00395     instream -> UpdateTree();
00396     if ( instream -> GetLastEntryVld() > lastentryvld ) 
00397       lastentryvld = instream->GetLastEntryVld();
00398   }
00399 
00400   return lastentryvld;
00401 
00402 }
00403 
00404 //.....................................................................
00405 
00406 int PerInputStreamManager::GoToFile(int n, std::string streamname) {
00407   //
00408   //  Purpose:  Move to nth files in file list for requested stream(s).
00409   //
00410   //  Arguments: n = file number (0 is first)
00411   //             streamname = name of stream to apply movement. if "*"(default)
00412   //                          will apply to all managed streams.
00413   //             
00414   //  Return: number of streams on which action is successfully applied.
00415   //
00416   //  Contact:   S. Kasahara
00417   // 
00418 
00419   VldContext newVld = Per::GetVldEnd();
00420 
00421   int nstream = 0;
00422   if ( streamname == "*" ) {
00423     for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00424       PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00425       if ( instream -> GoToFile(n) ) {
00426         nstream++;
00427         instream -> NextTags(); // must advance to first record of file
00428         newVld = instream->GetTags().GetVldContext();
00429       } 
00430     }
00431   }
00432   else {
00433     PerInputStream* instream = this -> GetOpenedStream(streamname);
00434     if ( instream ) {
00435       if ( instream -> GoToFile(n) ) {
00436         nstream++;
00437         instream -> NextTags(); // must adavnce to first record of file
00438         newVld = instream->GetTags().GetVldContext();
00439       }
00440     }
00441     else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open." 
00442                                   << endl;
00443   }
00444 
00445   if ( nstream ) {
00446     // Setting vld to lower or upper bound is determined by movement direction
00447     if ( newVld > fCurrentVld ) {
00448       VldContext minVld = this -> GetCurrentKeyVld(true);
00449       if ( minVld > fCurrentVld ) SetCurrentVld(minVld);
00450     }
00451     else {
00452       VldContext maxVld = this -> GetCurrentKeyVld(false);
00453       if ( maxVld < fCurrentVld ) SetCurrentVld(maxVld);
00454     }
00455   }
00456 
00457   return nstream;
00458 
00459 }
00460 
00461 //.....................................................................
00462 
00463 int PerInputStreamManager::GoToFile(std::string fullfilepathname, 
00464                                     std::string streamname) {
00465   //
00466   //  Purpose:  Move to specified file in file list for requested stream(s).
00467   //
00468   //  Arguments: fullfilepathname = file name(if "" move to first file in list)
00469   //             streamname = name of stream to apply movement. if "*"(default)
00470   //                          will apply to all managed streams.
00471   //             
00472   //  Return: number of streams on which action is successfully applied.
00473   //
00474   //  Contact:   S. Kasahara
00475   // 
00476   //  Notes: After moving to file, the current record ptr is positioned at
00477   //         the first record of the file.
00478 
00479   VldContext newVld = Per::GetVldEnd();
00480 
00481   int nstream = 0;
00482   if ( streamname == "*" ) {
00483     for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00484       PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00485       if ( instream -> GoToFile(fullfilepathname) ) {
00486         nstream++;
00487         instream -> NextTags(); // must advance to first record of file
00488         newVld = instream->GetTags().GetVldContext();
00489       } 
00490     }
00491   }
00492   else {
00493     PerInputStream* instream = this -> GetOpenedStream(streamname);
00494     if ( instream ) {
00495       if ( instream -> GoToFile(fullfilepathname) ) {
00496         nstream++;
00497         instream -> NextTags(); // must advance to first record of file
00498         newVld = instream->GetTags().GetVldContext();
00499       }
00500     }
00501     else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open." 
00502                                   << endl;
00503   }
00504 
00505   if ( nstream ) {
00506     // Setting vld to lower or upper bound is determined by movement direction
00507     if ( newVld > fCurrentVld ) {
00508       VldContext minVld = this -> GetCurrentKeyVld(true);
00509       if ( minVld > fCurrentVld ) SetCurrentVld(minVld);
00510     }
00511     else {
00512       VldContext maxVld = this -> GetCurrentKeyVld(false);
00513       if ( maxVld < fCurrentVld ) SetCurrentVld(maxVld);
00514     }
00515   }
00516 
00517   return nstream;
00518 
00519 }
00520 
00521 //.....................................................................
00522 
00523 int PerInputStreamManager::IsBeginOfFiles(std::string streamname) const {
00524   //  Purpose:  Return number of streams that have reached begin of filelist.
00525   //
00526   //  Argument: name of stream to check status. If "*" (default), check all 
00527   //            managed streams
00528   //
00529   //  Contact:   S. Kasahara
00530   //
00531 
00532   int nbegin = 0;  
00533 
00534   if ( streamname != "*" ) {
00535     PerInputStream* instream = this -> GetOpenedStream(streamname);
00536     if ( instream ) {
00537       nbegin = instream -> IsBeginOfFiles();
00538     }
00539     else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open." 
00540                                   << endl;
00541   }
00542   else { 
00543     for ( StreamMapConstItr citr = fStreamMap.begin();
00544                             citr!= fStreamMap.end(); ++citr ) { 
00545       PerInputStream* instream = dynamic_cast<PerInputStream*>(citr->second);
00546       nbegin += instream -> IsBeginOfFiles();
00547     }
00548   }
00549 
00550   return nbegin;
00551 
00552 }
00553 
00554 //.....................................................................
00555 
00556 int PerInputStreamManager::IsEndOfFiles(std::string streamname) const {
00557   //  Purpose:  Return number of streams that have reached end of filelist.
00558   //
00559   //  Argument: name of stream to check status. If "*" (default), check all 
00560   //            managed streams
00561   //
00562   //  Contact:   S. Kasahara
00563   //
00564 
00565  
00566   int nend = 0;  
00567 
00568   if ( streamname != "*" ) {
00569     PerInputStream* instream = this -> GetOpenedStream(streamname);
00570     if ( instream ) {
00571       nend = instream -> IsEndOfFiles();
00572     }
00573     else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open." 
00574                                   << endl;
00575   }
00576   else { 
00577     for ( StreamMapConstItr citr = fStreamMap.begin();
00578                             citr!= fStreamMap.end(); ++citr ) { 
00579       PerInputStream* instream = dynamic_cast<PerInputStream*>(citr->second);
00580       nend += instream -> IsEndOfFiles();
00581     }
00582   }
00583 
00584   return nend;
00585 
00586 }
00587 
00588 //.....................................................................
00589 
00590 bool PerInputStreamManager::IsFileEnd() const {
00591   //  Purpose:  Check if input file has been closed by writer on all
00592   //            managed streams.
00593   //
00594   //  Return:  true if file end detected on all streams.
00595   //
00596   //  Contact:   S. Kasahara
00597   //
00598   //  Notes:  Invokes PerInputStream::IsFileEnd for all streams.
00599   //          This method is useful for a user (e.g. dispatcher)
00600   //          reading open files.  Note that detecting file closure
00601   //          by the writer does not mean that the user has reached
00602   //          the end of the entries in the current record map.
00603   //          Use IsEnd()&&IsFileEnd() to determine if the user has
00604   //          reached the end of the current record map *and* all
00605   //          all files have been closed by the writer.
00606   //
00607 
00608   bool isFileEnd = true;  
00609 
00610   for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
00611     PerInputStream* instream = dynamic_cast<PerInputStream*>(citr->second);
00612     if (!instream->IsFileEnd()) isFileEnd = false;
00613   }
00614 
00615   return isFileEnd;
00616 
00617 }
00618 
00619 //.....................................................................
00620 
00621 bool PerInputStreamManager::IsSelectedSet() {
00622   //  Purpose:  Check to see if record set corresponding to fCurrentVld
00623   //            has passed user specified selection cuts.
00624   //
00625   //  Arguments: none.
00626   //
00627   //  Return:  true if all records passed selection cut.
00628   //
00629   //  Contact:   S. Kasahara
00630   //
00631   //  Notes: If multiple streams have selection cuts applied, the record set
00632   //         is rejected based on an AND of the different
00633   //         selection cuts, i.e. rejecting one record rejects the entire
00634   //         set.
        //         The result is cached in fIsSelectedSet and only recomputed
        //         while fIsNewCurrentVldToSelect is set.
        //         NOTE(review): every dynamic_cast result below is used
        //         unchecked.
00635 
        // Nothing new to evaluate: hand back the cached answer.
00636   if ( !fIsNewCurrentVldToSelect ) return fIsSelectedSet;
00637   
00638   fIsNewCurrentVldToSelect = false;
00639 
        // At the begin/end position the set is never selected.
00640   if ( this -> IsEnd() || this -> IsBegin() ) {
00641     fIsSelectedSet = false;
00642     return fIsSelectedSet;
00643   }
00644   
00645   fIsSelectedSet = true;  
00646 
00647   if ( !fGlobalSelection.empty() ) {
00648     MSG("Per",Msg::kVerbose) << "Testing globally applied selection cut " 
00649                              << fGlobalSelection << endl;
00650     // Selection cut was applied globally
00651     this -> UpdateTreeFormula();
00652     if ( fTTreeFormula ) {
            // Position every stream's tree before evaluating the single
            // manager-level formula.
00653       for(StreamMapConstItr itr =fStreamMap.begin();
00654                             itr!=fStreamMap.end();++itr) {
00655         PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00656         const PerRecordTags& tags = instream->GetTags();
              // NOTE(review): a stream whose tags do not carry fCurrentVld
              // gets LoadTree(-1) -- confirm that is the intended way to
              // mark "no entry" for that tree.
00657         Int_t idx = -1;
00658         if ( tags.GetVldContext() == fCurrentVld ) idx = tags.GetIndexLo();
00659         TTree* ttree = instream->fTTree;
00660         if ( ttree ) {
00661           ttree -> LoadTree(idx);
                // Attach the branch to the stream's object pointer when no
                // object is set yet.
                // NOTE(review): fTBranch is dereferenced without a null
                // check.
00662           if ( !instream->fTObject ) {
00663             instream -> fTBranch -> SetAddress(&(instream->fTObject));
00664           }
00665         }
00666       }
            // The set passes if any instance of the formula is non-zero.
00667       Int_t arraySize = fTTreeFormula->GetNdata();
00668       bool ispass = false;
00669       for ( int jent = 0; jent < arraySize; jent++ ) {
00670         if ( fTTreeFormula -> EvalInstance(jent) != 0 ) ispass = true;  
00671       }
00672       if ( !ispass) fIsSelectedSet = false;
            // Reset(true) is called on every stream once the formula has
            // been evaluated.
00673       for (StreamMapConstItr itr = fStreamMap.begin();
00674                              itr!= fStreamMap.end(); ++itr) {
00675         PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00676         instream -> Reset(true);
00677       }
00678     }
          // NOTE(review): unlike the per-stream branch below, this branch
          // performs no IsRequired() check for streams missing from the set.
00679   }
00680   else {
          // No global selection: apply each stream's own formula, if any.
00681     for(StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();++itr) {
00682       PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00683       const PerRecordTags& tags = instream->GetTags();
00684       TTreeFormula* treeformula = instream->fTTreeFormula;
00685       if ( tags.GetVldContext() == fCurrentVld ) {
00686         if ( treeformula ) {
00687           MSG("Per",Msg::kVerbose) << "Testing locally applied selection cut " 
00688                 << instream->fSelection << " on stream " << itr->first << endl;
00689           TTree* ttree = instream -> fTTree;
00690           if ( ttree ) {
                  // Every entry in [IndexLo, IndexHi] must pass; an entry
                  // passes if any formula instance is non-zero.
00691             for (Int_t idx=tags.GetIndexLo();idx <= tags.GetIndexHi(); idx++) {
00692               ttree -> LoadTree(idx);
00693               if ( !instream->fTObject ) {
00694                 instream -> fTBranch -> SetAddress(&(instream->fTObject));
00695               }
00696               Int_t arraySize = treeformula->GetNdata();
00697               bool ispass = false;
00698               for ( int jent = 0; jent < arraySize; jent++ ) {
00699                 if ( treeformula -> EvalInstance(jent) != 0 ) ispass = true;
00700               }
00701               if ( !ispass) fIsSelectedSet = false;
00702               instream->Reset(true);
00703             }
00704           }
00705         }
00706       }
00707       // Check to see if record set is missing a record from a required stream 
00708       else if ( instream->IsRequired() ) fIsSelectedSet = false;  
00709     }
00710 
00711   }
00712   
00713   if (!fIsSelectedSet) 
00714      MSG("Per",Msg::kVerbose) << "Record set failed selection cut" << endl;
00715   else 
00716      MSG("Per",Msg::kVerbose) << "Record set passed selection cut" << endl;
00717 
00718   return fIsSelectedSet;
00719 
00720 }
00721 
00722 //.....................................................................
00723 
00724 bool PerInputStreamManager::IsValidSelectionString() const {
00725   // OR of PerInputStream::IsValidSelectionString() results, e.g.
00726   // if any one stream's application of selection string failed, return
00727   // false
00728 
00729   for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00730     PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00731     bool isValid = instream -> IsValidSelectionString();
00732     if ( !isValid ) return false;
00733   }
00734 
00735   return true;
00736   
00737 }
00738 
00739 //.....................................................................
00740 
00741 std::ostream& PerInputStreamManager::ListFile(std::ostream& os, 
00742                                        std::string streamname) const {
00743  //
00744  //  Purpose:  Print the contents of the fFileList for requested stream(s).
00745  //
00746  //  Arguments: ostream reference to print on and name of stream(s). If
00747  //             streamname == "*" (default), lists files on all streams.
00748  //
00749  //  Contact:   S. Kasahara
00750  // 
00751 
00752   if ( streamname == "*" ) {
00753     for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00754       os << "Stream " << itr->first << ":" << endl;
00755       PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00756       instream -> ListFile(os);
00757     }
00758   }
00759   else {
00760     PerInputStream* instream = this -> GetOpenedStream(streamname);
00761     if ( instream ) {
00762       os << "Stream " << streamname << ":" << endl;
00763       instream -> ListFile(os);
00764     }
00765     else os << "Stream " << streamname << " not open." << endl;
00766   }
00767 
00768   return os;
00769 
00770 }
00771 
00772 //.....................................................................
00773 
00774 Int_t PerInputStreamManager::LoadRecord(MomNavigator* mom) {
00775   //  Purpose:  Loads records with fCurrentVld in input MomNavigator
00776   //            object.
00777   //
00778   //  Argument: pointer to MomNavigator.  
00779   //
00780   //  Return:  Number of records loaded. 
00781   //
00782   //  Contact:   S. Kasahara
00783   //
00784   //  Notes: LoadRecord does not apply user selection cuts to the record set.
00785   //
00786 
00787   MSG("Per",Msg::kVerbose) << "Load Record Set w/currentVld " << fCurrentVld 
00788                            << endl;
00789   
00790   Int_t nrecord = 0;
00791   Int_t lastNRecord = 0;
00792   if ( !mom || this -> IsBegin() || this -> IsEnd() ) return nrecord;
00793 
00794   for ( StreamMapItr itr=fStreamMap.begin(); itr!=fStreamMap.end(); ++itr ) {
00795     PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00796     std::string streamName = itr->first;
00797 
00798     if ( instream -> GetSequenceMode() == Per::kKey ) {
00799       const PerRecordTags& tags = instream -> GetTags();
00800       if ( tags.GetVldContext() == fCurrentVld ) {
00801         nrecord += this -> LoadRecordWithTag(*instream,tags,mom);
00802       }
00803     }
00804     else if ( instream -> GetSequenceMode() == Per::kLowerBound ) {
00805       const PerRecordTags& tags = instream -> GetTags();
00806       // First clean Mom of previously loaded object from this stream if not this tag
00807       VldContext loVld = tags.GetVldContext();
00808       VldContext hiVld = tags.GetVldContext();
00809       this -> RemoveFragmentsNotInWindow(mom,streamName,loVld,hiVld);
00810       if ( !tags.IsBegin() && !tags.IsEnd() ) {
00811         nrecord += this -> LoadRecordWithTag(*instream,tags,mom);
00812       }
00813     }
00814     else if ( instream -> GetSequenceMode() == Per::kWindow ) {
00815       std::vector<PerRecordTags> tagslist = instream -> GetWindowTags();
00816       // First clean Mom of any objects not in range of tags on this list
00817       VldContext loVld = Per::GetVldEnd(); // set in this way all records will be removed
00818       VldContext hiVld = Per::GetVldBegin();
00819       if ( !tagslist.empty() ) {
00820         loVld = (tagslist[0]).GetVldContext();
00821         hiVld = (tagslist[tagslist.size()-1]).GetVldContext();
00822       }
00823       this ->  RemoveFragmentsNotInWindow(mom,streamName,loVld,hiVld);
00824       if ( !tagslist.empty() ) {
00825         std::vector<PerRecordTags>::iterator itr;
00826         for ( itr = tagslist.begin(); itr != tagslist.end(); itr++ ) {
00827           const PerRecordTags& tags = *itr;
00828           nrecord += this -> LoadRecordWithTag(*instream,tags,mom);
00829         }
00830       }
00831     }
00832     else if ( instream -> GetSequenceMode() == Per::kSequential ||
00833               instream -> GetSequenceMode() == Per::kRandom      ){
00834       // clear Mom of all entries of this sort
00835       this->RemoveAllFragments(mom,streamName);
00836 
00837       double meanMom = instream->GetMeanMom();
00838 
00839       if (!fRandomOverride) {
00840         fRandomSeed =
00841           int(10000*fStreamMap.size()+100*meanMom+(instream->fSumTags)+42);
00842         fRanGen->SetSeed(fRandomSeed);
00843       }
00844 
00845       int    numEvts =
00846         ( instream->GetPushRandom() )?
00847         fRanGen->Poisson(meanMom) : int(ceil(meanMom)) ;
00848 
00849       int iEvt = 0;
00850       while (iEvt < numEvts) {
00851         const PerRecordTags& tags = instream -> GetTags();
00852         int iSuccess = this -> LoadRecordWithTag(*instream,tags,mom);
00853         iEvt    += iSuccess;
00854         nrecord += iSuccess;
00855         if ( !instream->AdvanceTags() ) break;
00856       } // end loop over # events to push
00857     } // end if Per::kSequential || Per::kRandom
00858     lastNRecord = nrecord;
00859   } // end loop over stream map
00860 
00861   return nrecord;
00862 
00863 }
00864 
00865 //.....................................................................
00866 
00867 void PerInputStreamManager::RemoveFragmentsNotInWindow(
00868                 MomNavigator* mom,std::string streamName, 
00869                 const VldContext& loVld, const VldContext& hiVld) {
00870   //  Purpose:  Remove fragments < loVld || > hiVld
00871   //
00872   //  Argument: pointer to MomNavigator.  
00873   //
00874   //  Return:  none
00875   //
00876   //  Contact:   S. Kasahara
00877   //
00878 
00879   TObjArray* fragarray=const_cast<TObjArray*>(mom->GetFragmentArray());
00880   for (int idx = fragarray->GetEntriesFast()-1; idx >= 0; idx--) {
00881     TObject* oldobject = fragarray -> At(idx);
00882     Registry* oldiotags = 0;
00883     const VldContext* oldvldc = 0;
00884     if ( RecMinos* oldrecord = dynamic_cast<RecMinos*>(oldobject) ) {
00885       oldiotags = &(oldrecord -> GetTempTags());
00886       oldvldc = oldrecord -> GetVldContext();
00887     }
00888     else if(RecRecord* oldrecord = dynamic_cast<RecRecord*>(oldobject)){
00889       oldiotags = &(oldrecord -> GetTempTags());
00890       oldvldc = &(oldrecord -> GetHeader().GetVldContext());
00891     }
00892     if ( oldiotags ) {
00893       const char* oldtagstream = 0;
00894       if ( oldiotags -> Get("stream",oldtagstream) 
00895         && strcmp(streamName.c_str(),oldtagstream) == 0 ) {
00896         if ( oldvldc && (*oldvldc < loVld || *oldvldc > hiVld) ) {
00897           fragarray->RemoveAt(idx);
00898           delete oldobject; oldobject = 0;
00899         }
00900       }
00901     }
00902   }
00903   
00904 }
00905 
00906 //.....................................................................
00907 
00908 void PerInputStreamManager::RemoveAllFragments(
00909                  MomNavigator* mom,std::string streamName ) {
00910   //  Purpose:  Remove fragments of stream streamName from mom
00911   //
00912   //  Argument: pointer to MomNavigator & streamName  
00913   //
00914   //  Return:  none
00915   //
00916   //  Contact:   S. Kasahara & K. Arms
00917   //
00918 
00919   TObjArray* fragarray=const_cast<TObjArray*>(mom->GetFragmentArray());
00920   for (int idx = fragarray->GetEntriesFast()-1; idx >= 0; idx--) {
00921     TObject* oldobject = fragarray -> At(idx);
00922     Registry* oldiotags = 0;
00923     if ( RecMinos* oldrecord = dynamic_cast<RecMinos*>(oldobject) ) {
00924       oldiotags = &(oldrecord -> GetTempTags());
00925     }
00926     else if(RecRecord* oldrecord = dynamic_cast<RecRecord*>(oldobject)){
00927       oldiotags = &(oldrecord -> GetTempTags());
00928     }
00929     if ( oldiotags ) {
00930       const char* oldtagstream = 0;
00931       if ( oldiotags -> Get("stream",oldtagstream)     &&
00932            strcmp(streamName.c_str(),oldtagstream) == 0 ) {
00933           fragarray->RemoveAt(idx);
00934           delete oldobject; oldobject = 0;
00935       } // end if tags are from stream streamName
00936     } // end if tags exist
00937   } // end for loop over mom fragment array
00938   
00939 }
00940 
00941 //.....................................................................
00942   
00943 Int_t PerInputStreamManager::LoadRecordWithTag(const PerInputStream& instream,
00944                                                const PerRecordTags& tags, 
00945                                                MomNavigator* mom) {
00946   //  Purpose:  Loads records from this tag in input MomNavigator
00947   //            object.
00948   //
00949   //  Argument: pointer to MomNavigator.  
00950   //
00951   //  Return:  Number of records loaded. 
00952   //
00953   //  Contact:   S. Kasahara
00954   //
00955   //  Notes: LoadRecord does not apply user selection cuts to the record set.
00956   //
00957 
00958   int nrecord = 0;
00959 
00960   // Load the records in the tag set
00961   for (Int_t idx = tags.GetIndexLo(); idx <= tags.GetIndexHi(); idx++) {
00962     std::string streamName = instream.GetStreamName();
00963     TObject* object = mom -> GetFragmentByInputTag(streamName.c_str(),
00964             (tags.GetTreeName()).c_str(),idx,(tags.GetFileName()).c_str());
00965     if ( object ) continue;  // object already in Mom don't bother
00966     object = (const_cast<PerInputStream&>(instream)).GetObject(idx,false);
00967     if ( object ) {
00968       Registry* temptags = 0;
00969       if ( RecMinos* record = dynamic_cast<RecMinos*>(object) ) {
00970         temptags = &(record->GetTempTags());
00971         if ( instream.GetSequenceMode() != Per::kKey        &&
00972              instream.GetSequenceMode() != Per::kSequential &&
00973              instream.GetSequenceMode() != Per::kRandom      )
00974           record -> SetTransient(false);
00975       }
00976       else if ( RecRecord* record = dynamic_cast<RecRecord*>(object) ) {
00977         temptags = &(record->GetTempTags());
00978         if ( instream.GetSequenceMode() != Per::kKey        &&
00979              instream.GetSequenceMode() != Per::kSequential &&
00980              instream.GetSequenceMode() != Per::kRandom      )
00981           record -> SetTransient(false);
00982       }
00983         
00984       if ( temptags ) {
00985         // Stamp record with i/o tags
00986         temptags->UnLockValues();
00987         std::string tagname = instream.GetClassName()+".fTempTags";
00988         temptags->SetName(tagname.c_str());
00989         temptags->Set("stream",streamName.c_str());
00990         temptags->Set("tree",(tags.GetTreeName()).c_str());
00991         temptags->Set("index",idx);
00992         temptags->Set("file",(tags.GetFileName()).c_str());
00993         temptags->LockValues();
00994       }
00995       // And add record to Mom, mom now owns record
00996       if ( nrecord == 0) {
00997         MSG("Per",Msg::kVerbose) << "..Loading records with Vld" 
00998                                  <<  fCurrentVld << " from stream " 
00999                                  << streamName.c_str() << ":" << endl;
01000       }
01001       MSG("Per",Msg::kVerbose) << "....Record " << nrecord << " from"
01002                                << " stream " << streamName
01003                                << " tree " << tags.GetTreeName()
01004                                << " index " << idx
01005                                << " file "<< tags.GetFileName()
01006                                << endl;
01007       mom -> AdoptFragment(object);
01008         
01009       nrecord++;
01010     } // object retrieved
01011   } // loop over tag tree indices
01012 
01013   return nrecord; 
01014 
01015 }
01016 
01017 //.....................................................................
01018 
01019 Int_t PerInputStreamManager::Next(MomNavigator* mom, UInt_t advanceby) {
01020   //  Purpose:  Retrieve next set of records from managed streams of
01021   //            common VldContext which satisfy user's  
01022   //            selection cuts (if specified).  The records are 
01023   //            extracted from the input streams and loaded into the
01024   //            input MomNavigator object.    
01025   //
01026   //  Argument: mom        pointer to MomNavigator. (If set 0 (default), the 
01027   //                       method will advance to the record set of interest
01028   //                       but not load the records.  The user can then
01029   //                       retrieve the record set with the Get method.)
01030   //            advanceby  number of selected record sets to advance by
01031   //                       (default = 1).  Can be used to skip over record
01032   //                       sets.
01033   //
01034   //  Return:  number of selected record sets advanced.
01035   //
01036   //  Contact:   S. Kasahara
01037   //
01038   //  Notes:  Entries are subject to any user-specified selection strings
01039   //          as described under PerInputStream::GetObject and 
01040   //          PerInputStreamManager::IsSelectedSet.
01041   //
01042 
01043   MSG("Per",Msg::kVerbose) << "Next advance by " << advanceby << " sets." 
01044                            << " Start at vld " << fCurrentVld << endl;
01045   
01046   if ( this -> IsEnd() ) return 0;
01047   VldContext saveCurrentVld = fCurrentVld; 
01048 
01049   // Search for entry satisfying selection cuts
01050   UInt_t nselect = 0;
01051   while ( nselect < advanceby && this -> AdvanceRecordTags() ) {
01052     MSG("Per",Msg::kVerbose) << "Advanced to vld " << fCurrentVld 
01053                              << endl;
01054     if ( IsSelectedSet() ) {
01055       nselect++;
01056       if ( nselect >= advanceby ) {
01057         // Load record in Mom
01058         if ( mom ) this -> LoadRecord(mom);
01059       }
01060     }
01061   }
01062 
01063   if ( advanceby > 1 ) {
01064     // current vld is left at last record served if in updatemode
01065     if ( nselect < advanceby && 
01066          fUpdateMode         && 
01067          !IsFileEnd()         ) {
01068       while ( fCurrentVld > saveCurrentVld ) {
01069         this -> RewindRecordTags();
01070       }
01071       // At this point fCurrentVld should be back to original
01072       if ( saveCurrentVld != fCurrentVld ) {
01073         MSG("Per",Msg::kWarning) 
01074           << "Failure to rewind to original state in Next method." << endl;
01075       }
01076       nselect = 0; 
01077     }
01078   }
01079 
01080   return (Int_t)nselect;
01081 
01082 }
01083 
01084 //.....................................................................
01085 
01086 int PerInputStreamManager::NextFile(int n, std::string streamname) {
01087   //
01088   //  Purpose:  Move stream(s) forward n files in file list.  If n exceeds 
01089   //            number of files to end of list, ptr is left at eof marker.  
01090   //
01091   //  Arguments: n = number of files to advance (default = 1)
01092   //             streamname = name of stream to apply movement. if "*"(default)
01093   //                          will apply to all managed streams.
01094   //             
01095   //  Return: number of streams on which action is successfully applied.
01096   //
01097   //  Contact:   S. Kasahara
01098   // 
01099   //  Notes: After moving to file, the current record ptr is positioned at
01100   //         the first record of the file.
01101 
01102   MSG("Per",Msg::kVerbose) << "PerInputStreamManager::NextFile("
01103                            << n << "," << streamname << ") called"
01104                            << endl;
01105 
01106   int nstream = 0;
01107   if ( streamname == "*" ) {
01108     for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
01109       PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
01110       if ( instream -> NextFile(n) ) {
01111         nstream++;
01112         instream -> NextTags(); // must advance to first record of file
01113       } 
01114     }
01115   }
01116   else {
01117     PerInputStream* instream = this -> GetOpenedStream(streamname);
01118     if ( instream ) { 
01119       if ( instream -> NextFile(n) ) {
01120         nstream++;
01121         instream -> NextTags(); // must advance to first record of file
01122       }
01123     }
01124     else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open." 
01125                                   << endl;
01126   }
01127 
01128   // Recalculate fCurrentVld to point to lower bound of new set of tags
01129   VldContext minVld = this -> GetCurrentKeyVld(true);
01130   if ( minVld > fCurrentVld ) SetCurrentVld(minVld);
01131 
01132   return nstream;
01133 
01134 }
01135 
01136 //.....................................................................
01137 
01138 VldContext PerInputStreamManager::GetCurrentKeyVld(bool isMin) const {
01139   // 
01140   // Purpose:  Determine current minimum or maximum validity of managed 
01141   //           key stream record block tags
01142   //
01143   // Arguments: isMin = true (default) => return minimum validity, else
01144   //            returns max validity
01145   //
01146   // Contact: S. Kasahara
01147   //
01148 
01149   VldContext currentVld = Per::GetVldEnd();
01150   if ( !isMin ) currentVld = Per::GetVldBegin();
01151 
01152   for( StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++ ) {
01153     PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
01154     if ( instream && instream->IsOpen() ) {
01155       VldContext tagVld = instream->GetTags().GetVldContext();
01156       if ( isMin && tagVld < currentVld ) currentVld = tagVld;
01157       else if ( !isMin && tagVld > currentVld ) currentVld = tagVld;
01158     } 
01159   }
01160 
01161   return currentVld;
01162 
01163 }
01164 
01165 //.....................................................................
01166 
01167 PerInputStream* PerInputStreamManager::OpenStream(std::string streamName, 
01168                                                   std::string treeName) {
01169   //
01170   //  Purpose:  Open input stream with name streamName serving tree treeName.
01171   //
01172   //  Arguments: streamName  string   name of stream to be opened. Names of 
01173   //                                  managed streams must be unique.
01174   //             treeName    string   name of tree served by this stream.
01175   //
01176   //  Return:  pointer to PerInputStream. If unable to open stream (because
01177   //           stream with this streamName has already been opened), returns
01178   //           (PerInputStream*)0.
01179   //
01180   //  Contact:   S. Kasahara
01181   //
01182   //  Notes: PerInputStream objects are owned by the PerInputStreamManager 
01183   //         and should only be deleted through the 
01184   //         PerStreamManager::CloseStream method.
01185 
01186   bool openok = false;
01187 
01188   PerInputStream* stream=dynamic_cast<PerInputStream*>(fStreamMap[streamName]);
01189   if ( !stream ) {
01190     // Stream not found in map, need to create new one
01191     stream = new PerInputStream(treeName);
01192     stream -> SetStreamName(streamName);
01193     stream -> SetUpdateMode(fUpdateMode);
01194     Per::ESequenceMode seqMode = Per::GetDefSequenceMode(streamName.c_str());
01195     stream -> SetSequenceMode(seqMode);
01196     fStreamMap[streamName] = stream;
01197     fLastServed[streamName] = Per::GetVldEnd();
01198     openok = true;
01199   }
01200   else {
01201     MSG("Per",Msg::kWarning)
01202       <<"Stream manager failed to open\nrequested stream "
01203       << streamName << " because name conflicts with previously opened stream."
01204       << endl;
01205   }
01206 
01207   return (openok) ? stream : (PerInputStream*)0;
01208 
01209 }
01210 
01211 //.....................................................................
01212 
01213 PerInputStreamManager::PerInputStreamManager() : fCurrentVld(Per::GetVldEnd()),
01214       fGlobalSelection(""),fTTreeFormula(0),fFormulaMap(),
01215       fUpdateMode(false),fMaxSyncDelay(15), fIsNewCurrentVldToSelect(true), 
01216       fIsSelectedSet(false), fRandomSeed(0), fRandomOverride(false) {
01217   //
01218   //  Purpose:  Default constructor.
01219   //
01220   //  Arguments: none.
01221   //
01222   //  Return:  n/a.
01223   //
01224   //  Contact:   S. Kasahara
01225   //
01226   fRanGen = new TRandom3(0);
01227   MSG("Per",Msg::kDebug)
01228     << "PerInputStreamManager ctor : Random Seed set to "
01229     << fRandomSeed << endl;
01230 }
01231 
01232 //.....................................................................
01233 
01234 PerInputStreamManager::~PerInputStreamManager() {
01235   //
01236   //  Purpose:  Default constructor.
01237   //
01238   //  Arguments: none.
01239   //
01240   //  Return:  n/a.
01241   //
01242   //  oCntact:   S. Kasahara
01243   //
01244 
01245  if ( fTTreeFormula ) delete fTTreeFormula; fTTreeFormula = 0;
01246  fFormulaMap.clear();
01247 
01248  if ( fRanGen ) delete fRanGen; fRanGen = 0;
01249 
01250 }
01251 
01252 //.....................................................................
01253 
01254 Int_t PerInputStreamManager::Previous(MomNavigator* mom, UInt_t rewindby) {
01255   //  Purpose:  Retrieve previous set of records from managed streams of
01256   //            common VldContext which satisfy user's  
01257   //            selection cuts (if specified).  The records are 
01258   //            extracted from the input streams and loaded into the
01259   //            input MomNavigator object.    
01260   //
01261   //  Argument: mom        pointer to MomNavigator (default=0 means 
01262   //                       streams are rewound but no record set is loaded.
01263   //                       The user can then use Get to retrieve the record 
01264   //                       set.)
01265   //            rewindby   number of selected record sets to rewind by
01266   //                       (default = 1).  Can be used to skip over record
01267   //                       sets.
01268   //
01269   //  Return:  number of record sets rewound.  
01270   //
01271   //  Contact:   S. Kasahara
01272   //
01273   //  Notes:  Entries are subject to any user-specified selection strings
01274   //          as described under PerInputStream::GetObject and 
01275   //          PerInputStreamManager::IsSelectedSet.
01276   //
01277 
01278   if ( this -> IsBegin() ) return 0;
01279 
01280   UInt_t nselect = 0;
01281   // Search for entry satisfying selection cuts
01282   while ( nselect < rewindby && this -> RewindRecordTags() ) {
01283     if ( IsSelectedSet() ) {
01284       nselect++;
01285       if ( nselect >= rewindby ) {
01286         // Load record in Mom
01287         if ( mom ) this -> LoadRecord(mom);
01288       }
01289     }
01290   }
01291 
01292   return (Int_t)nselect;
01293 
01294 }
01295 
01296 //.....................................................................
01297 
01298 int PerInputStreamManager::PrevFile(int n, std::string streamname) {
01299   //
01300   //  Purpose:  Move stream(s) back n files in file list.  If n exceeds 
01301   //            number of files to beginning of list, ptr is left at first
01302   //            file.
01303   //
01304   //  Arguments: n = number of files to rewind (default = 1)
01305   //             streamname = name of stream to apply movement. if "*"(default)
01306   //                          will apply to all managed streams.
01307   //             
01308   //  Return: number of streams on which action is successfully applied.
01309   //
01310   //  Contact:   S. Kasahara
01311   // 
01312   //  Notes: After moving to file, the current record ptr is positioned at
01313   //         the first record of the file.
01314 
01315   int nstream = 0;
01316   if ( streamname == "*" ) {
01317     for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
01318       PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
01319       if ( instream -> PrevFile(n) ) {
01320         nstream++;
01321         instream -> NextTags(); // position at first record of file
01322       } 
01323     }
01324   }
01325   else {
01326     PerInputStream* instream = this -> GetOpenedStream(streamname);
01327     if ( instream ) {
01328       if ( instream -> PrevFile(n) ) {
01329         nstream++;
01330         instream -> NextTags(); // position at first record of file
01331       }
01332     }
01333     else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open." 
01334                                   << endl;
01335   }
01336 
01337   // Recalculate fCurrentVld to point to upper bound of new set of tags
01338   VldContext maxVld = this -> GetCurrentKeyVld(false);
01339   if ( maxVld < fCurrentVld ) SetCurrentVld(maxVld);
01340 
01341   return nstream;
01342 
01343 }
01344 
01345 //.....................................................................
01346 
01347 std::ostream& PerInputStreamManager::Print(std::ostream& os, 
01348                                            const char* option) const {
01349   //
01350   //  Purpose:  Print status of input stream manager on std::ostream.
01351   //
01352   //  Arguments: os           std::ostream to display on.
01353   //             option       verbosity level ("" (default), or "brief")
01354   //
01355   //  Return:  std::ostream reference.
01356   //
01357   //  Contact:   S. Kasahara
01358   //
01359 
01360   TString opt = option;
01361   opt.ToLower();
01362   if ( opt == "brief" || opt.IsNull() && fPrintOpt == "brief" ) {
01363     Int_t nenabled = 0;
01364     for ( StreamMapConstItr citr  = fStreamMap.begin();
01365                             citr != fStreamMap.end(); ++ citr ) {
01366       PerInputStream* instream = dynamic_cast<PerInputStream*>(citr->second);
01367       if ( instream -> IsEnabled() && instream -> IsOpen() ) {
01368         os << "    " << nenabled << ")" << citr->first << " serving tree " 
01369            << instream -> GetTreeName()
01370            << " w/" << instream->GetNumEntries() 
01371            << " entries (current entry = " << instream->GetEntry()
01372            << ") using sequence mode " 
01373            << Per::AsString(instream->GetSequenceMode())
01374            << "\n      from the " 
01375            << ((instream->IsFileEnd()) ? "closed file " : "open file ")  
01376            << instream->GetFullFilePathName() << "." << endl;
01377         nenabled++;
01378       }
01379     }
01380   }
01381   else {
01382     os << "PerInput";
01383     PerStreamManager::Print(os);
01384   }
01385   return os;
01386 
01387 }
01388 
01389 //.....................................................................
01390 
01391 VldContext PerInputStreamManager::RecordsAt(MomNavigator* mom,
01392                                             const VldContext& vld) {
01393   //  Purpose:  Retrieve record set with requested VldContext from managed
01394   //            record streams.  Record set is subject to selection
01395   //            cuts as specified by user.  The records are extracted from
01396   //            the input streams and loaded into the input MomNavigator
01397   //            object.  If exact match not found, the record set corresponding
01398   //            to the validity just beyond that requested is loaded.
01399   //
01400   //  Argument: mom        pointer to MomNavigator
01401   //                       (if == 0, streams are moved to vld but records are
01402   //                        not loaded)
01403   //            vld        VldContext of requested record set
01404   //
01405   //  Return:  VldContext of new record set (= Per::GetVldEnd() or 
01406   //           Per::GetVldBegin() if end-of-input-streams or 
01407   //           begin-of-input-streams respectively).
01408   //
01409   //  Contact:   S. Kasahara
01410   //
01411 
01412   if ( vld == fCurrentVld && IsSelectedSet() ) {
01413     if ( mom ) this -> LoadRecord(mom);
01414     return fCurrentVld;
01415   }
01416 
01417   if ( vld < fCurrentVld ) {
01418     while ( RewindRecordTags() ) {
01419       if ( vld >= fCurrentVld && IsSelectedSet() ) {
01420         if ( mom ) this -> LoadRecord(mom);
01421         return fCurrentVld;
01422       }
01423     }
01424   }
01425   else {
01426     while ( AdvanceRecordTags() ) {
01427       if ( vld <= fCurrentVld && IsSelectedSet() ) {
01428         if ( mom ) this -> LoadRecord(mom);
01429         return fCurrentVld;
01430       }
01431     }
01432   }
01433 
01434   return fCurrentVld; // begin-or-end-of-streams
01435 
01436 }
01437 
01438 //.....................................................................
01439 
01440 int PerInputStreamManager::RemoveFile(std::string fullfilepathname, 
01441                                       std::string streamname) {
01442   //
01443   //  Purpose:  Remove file from file list of specified stream(s).
01444   //
01445   //  Arguments: fullfilepathname (if "*" (default), remove all files.)
01446   //             streamname (if "*" (default), apply to all streams)
01447   //             
01448   //  Return: number of streams on which action is successfully applied.
01449   //         
01450   //  Contact:   S. Kasahara
01451   // 
01452 
01453   int nstream = 0;
01454   if ( streamname == "*" ) {
01455     for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
01456       PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
01457       nstream += instream -> RemoveFile(fullfilepathname); 
01458     }
01459   }
01460   else {
01461     PerInputStream* instream = this -> GetOpenedStream(streamname);
01462     if ( instream ) nstream += instream -> RemoveFile(fullfilepathname);
01463     else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open." 
01464                                   << endl;
01465   }
01466 
01467   return nstream;
01468 
01469 }
01470 
01471 //.....................................................................
01472 
01473 int PerInputStreamManager::RewindRecordTags() {
01474   //  Purpose:  Rewind record tags in managed streams to one previous to
01475   //            fCurrentVld.
01476   //
01477   //  Argument: none.
01478   //
01479   //  Return:  1 if success, else 0 (end of all 
01480   //           streams reached)
01481   //
01482   //  Contact:   S. Kasahara
01483   //
01484   //  Notes: This method checks each stream's current record set to see if the 
01485   //         set has a vld that matches or exceeds fCurrentVld.  If so, it 
01486   //         rewinds one record set on that stream.  
01487   //         At the end of the method, if rewind has been successful,
01488   //         fCurrentVld is set to point to the highest validity
01489   //         of all the current stream record tags, or Per::GetVldBegin()
01490   //         if at the beginning of all streams. 
01491 
01492   VldContext newVld = Per::GetVldBegin();
01493   int nKey = 0;
01494   int nSeq = 0;
01495   bool allSeqDone = true;
01496 
01497   for ( StreamMapItr sitr=fStreamMap.begin();sitr != fStreamMap.end();++sitr ){
01498     PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
01499     if ( instream->GetSequenceMode() != Per::kKey ) continue;
01500     nKey++;
01501     PerRecordTags tags = instream->GetTags();
01502     bool isdone = false;
01503     while ( !isdone ) {
01504       isdone = true;
01505       if ( tags.IsEnd() || tags.GetVldContext() >= fCurrentVld 
01506                         || (tags.IsBegin() && !instream->IsBeginOfFiles()) ) {
01507         tags = instream->PrevTags();
01508         if ( tags.IsBegin() ) {
01509           if ( instream -> PrevFile() ) isdone = false;
01510         }
01511       }
01512     }
01513     if ( tags.GetVldContext() > newVld ) newVld = tags.GetVldContext();
01514   }
01515 
01516   // if there is no kKey stream, look for a kSequential stream
01517   if ( !nKey ) {
01518     for ( StreamMapItr sitr=fStreamMap.begin();
01519           sitr != fStreamMap.end();++sitr ){
01520       PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
01521       if ( instream->GetSequenceMode() != Per::kSequential ) continue;
01522       nSeq++;
01523       PerRecordTags tags = instream->GetTags();
01524       bool isdone = false;
01525       while ( !isdone ) {
01526         isdone = true;
01527         if ( tags.IsEnd()                                     ||
01528              ( tags.IsBegin() && !instream->IsBeginOfFiles() ) ) {
01529           tags = instream->PrevTags();
01530           if ( tags.IsBegin() ) {
01531             if ( instream -> PrevFile() ) isdone = false;
01532           }
01533         }
01534       }
01535       if ( tags.GetVldContext() > newVld ) newVld = tags.GetVldContext();
01536       if ( !( tags.IsBegin() && instream->IsBeginOfFiles() ) ) allSeqDone = false;
01537     } // end loop over the stream map
01538   } // end if no kKey stream
01539 
01540   if ( !nKey && !nSeq) {
01541     // Must be at least one key stream in list of managed streams
01542     MSG("Per",Msg::kWarning)
01543       << "No stream of sequence mode Per::kKey or Per::kSequential "
01544       << "found.\n Must be at least one key or sequential stream to "
01545       << "facilitate sequencing."
01546       << std::endl;
01547     return 0;
01548   }
01549 
01550   if ( fCurrentVld == newVld && !nSeq ) {
01551     return 0; // failure
01552   }
01553 
01554   if ( nSeq && allSeqDone ) {
01555     return 0; // failure
01556   }
01557 
01558   for ( StreamMapItr sitr = fStreamMap.begin();
01559         sitr != fStreamMap.end(); ++sitr)  {
01560     PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
01561     if ( instream->GetSequenceMode() != Per::kKey       &&
01562          instream->GetSequenceMode() != Per::kSequential ) continue;
01563     fLastServed[sitr->first] = Per::GetVldBegin();
01564   }
01565 
01566   SetCurrentVld(newVld); //equals Per::GetVldBegin() when begin of all strms reached
01567   this -> AdvanceLowerBoundTags(fCurrentVld);
01568   this -> AdvanceWindowTags(fCurrentVld);
01569   // kSequential and kRandom streams advance themselves as appropriate
01570 
01571   return 1;
01572 
01573 }
01574 
01575 //.....................................................................
01576 
01577 bool PerInputStreamManager::SetFile(string streamName,string fullFilePathName,
01578                                     Per::EAccessMode accessMode) {
01579   //  Purpose:  Sets new file for specified stream(s).
01580   //
01581   //  Argument: streamName  string  name of stream to set file.
01582   //                                if streamName="*" (default), all
01583   //                                streams will have their file set.
01584   //            fullFilePathName string  new filename.
01585   //            accessMode          Per::EAccessMode accessMode in which to
01586   //                                open file.
01587   //
01588   //  Return:  bool  set true if PerInputStream::SetFile returned true for at
01589   //                 least one of the requested streams.
01590   //
01591   //  Contact:   S. Kasahara
01592   //
01593   //  Notes:  Invokes PerInputStream::SetFile for each requested stream.
01594   //
01595 
01596    Int_t oldNumStream = this -> GetNumStreamOpen();
01597    bool openok 
01598    = PerStreamManager::SetFile(streamName,fullFilePathName,accessMode);
01599    // reset if first, otherwise leave it where it is
01600    if ( openok && oldNumStream == 0) {
01601      SetCurrentVld(Per::GetVldBegin());
01602    } 
01603 
01604    return openok;
01605 
01606 }
01607 
01608 //.....................................................................
01609 
01610 void PerInputStreamManager::SetCurrentVld(const VldContext& vld) {
01611   // Set fCurrentVld to vld, and mark that IsSelected needs to be
01612   // applied to this validity
01613  
01614   if ( vld != fCurrentVld ) fIsNewCurrentVldToSelect = true;
01615   fCurrentVld = vld;
01616 
01617 }
01618 
01619 //.....................................................................
01620   
01621 void PerInputStreamManager::SetFileEnd(bool isFileEnd) {
01622   //  Purpose:  Force file end on input file.  Can be used for recovering
01623   //            aborted files.
01624   //
01625   //  Arguments: isFileEnd  true (default) to force file end
01626   //
01627   //  Return:  none.
01628   //
01629   //  Contact:   S. Kasahara
01630   //
01631 
01632   for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
01633     PerInputStream* instream = dynamic_cast<PerInputStream*>(citr -> second);
01634     instream -> SetFileEnd(isFileEnd);
01635   }
01636 
01637   return;
01638 
01639 }
01640 
01641 //.....................................................................
01642 
01643 void PerInputStreamManager::SetSelection(string streamName, string selection,
01644                                          bool isRequired) {
01645   //  Purpose:  Set selection string for specified stream(s).
01646   //
01647   //  Argument: streamName  string  name of stream to set selection..
01648   //                                if streamName="*" (default), all
01649   //                                streams will have selection set.
01650   //            selection   string  selection cut to be applied to TTree
01651   //                                served by this stream. See PerInputStream::
01652   //                                SetSelection for more information.
01653   //
01654   //  Return:  none.
01655   //
01656   //  Contact:   S. Kasahara
01657   //
01658   //  Notes:  Invokes PerInputStream::SetSelection for each requested stream.
01659   //
01660 
01661   if (streamName == "*") {
01662     // Apply selection string globally
01663     fGlobalSelection = selection;
01664     fFormulaMap.clear();
01665     if ( fTTreeFormula ) delete fTTreeFormula; fTTreeFormula = 0;
01666     // New TTreeFormula will be built at time of application
01667   }
01668   else {
01669     // SetSelection on stream of specified streamName
01670     PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01671     if (stream) { 
01672       stream -> SetSelection(selection);
01673       stream -> SetRequired(isRequired);
01674     }
01675   }
01676 
01677 }
01678 
01679 //.....................................................................
01680 
01681 void PerInputStreamManager::SetSequenceMode(string streamName, 
01682                                             Per::ESequenceMode sequenceMode) {
01683   //  Purpose:  Set sequence mode for specified stream(s).
01684   //            For kSequential and kRandom streams, a pointer to the
01685   //            Manager's random numner generator is passed as well
01686   //
01687   //  Argument: streamName  string  name of stream to set selection..
01688   //                                if streamName="*" (default), all
01689   //                                streams will have selection set.
01690   //            sequenceMode        Per::kKey (default), Per::kLowerBound,
01691   //                                Per::kWindow, Per::kSequential, or
01692   //                                Per::kRandom
01693   //
01694   //  Return:  none.
01695   //
01696   //  Contact:   S. Kasahara & K. Arms
01697   //
01698   //  Notes:  Invokes PerInputStream::SetSequenceMode for each requested 
01699   //          stream.
01700   //
01701 
01702 
01703   if (streamName == "*") {
01704     // Set selection on all streams
01705     for ( StreamMapConstItr citr = fStreamMap.begin(); 
01706           citr!= fStreamMap.end(); ++citr ) {
01707       ((PerInputStream*)(citr -> second)) -> SetSequenceMode(sequenceMode);
01708       if (sequenceMode == Per::kSequential ||
01709           sequenceMode == Per::kRandom      )
01710         ((PerInputStream*)(citr -> second)) -> SetRandomGenerator(fRanGen);
01711     } // end for loop
01712   } else {
01713     // SetSelection on stream of specified streamName
01714     PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01715     if (stream) {
01716       stream -> SetSequenceMode(sequenceMode);
01717       if (sequenceMode == Per::kSequential ||
01718           sequenceMode == Per::kRandom      )
01719         stream -> SetRandomGenerator(fRanGen);
01720     } // end if stream is OK
01721   } // end else
01722 }
01723 
01724 //.....................................................................
01725 
01726 void PerInputStreamManager::SetMaxFileRepeat(std::string streamName,
01727                                              int         numRepeat )
01728 {
01729   // Purpose: Set the maximum number of times to reuse files
01730   //          from this stream before moving to the next file
01731   //          (only used for kSequential & kRandom stream sequence
01732   //          modes)
01733   //
01734   // Argument: stream name of interest, max number (int >= 0)
01735   //
01736   // Return: none
01737   //
01738   // Note: Invokes PerInPutStream::SetMaxFileRepeat() for each stream
01739   //
01740 
01741   if (numRepeat < 0) numRepeat = 0;
01742 
01743   if (streamName == "*") {
01744     // Set info for all streams
01745     for ( StreamMapConstItr citr = fStreamMap.begin(); 
01746           citr!= fStreamMap.end(); ++citr ) {
01747       ((PerInputStream*)(citr -> second)) -> SetMaxFileRepeat(numRepeat);
01748     } // end loop over streams
01749   }
01750   else {
01751     // Set info for specified streamName
01752     PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01753     stream -> SetMaxFileRepeat(numRepeat);
01754   }
01755 }
01756 
01757 //.....................................................................
01758 
01759 void PerInputStreamManager::SetMeanMom(std::string streamName,double mm)
01760 {
01761   // Purpose: Set the mean number of events to push to Mom from this stream
01762   //
01763   // Argument: stream name of interest, mean number (double >= 0.)
01764   //
01765   // Return: none
01766   //
01767   // Note: Invokes PerInPutStream::SetMeanMom() for each stream
01768   //
01769 
01770   if (mm < 0.) return;
01771 
01772   if (streamName == "*") {
01773     // Set info for all streams
01774     for ( StreamMapConstItr citr = fStreamMap.begin(); 
01775           citr!= fStreamMap.end(); ++citr ) {
01776       ((PerInputStream*)(citr -> second)) -> SetMeanMom(mm);
01777     } // end loop over streams
01778   }
01779   else {
01780     // Set info for specified streamName
01781     PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01782     stream -> SetMeanMom(mm);
01783   }
01784 }
01785 
01786 //.....................................................................
01787 
01788 void PerInputStreamManager::SetPushRandom(std::string streamName,bool tf)
01789 {
01790   // Purpose: Set whether to push a constant (false) or random (true)
01791   //          number of events to Mom from this stream
01792   //
01793   // Argument: stream name of interest, bool
01794   //
01795   // Return: none
01796   //
01797   // Note: Invokes PerInPutStream::SetPushRandom() for each stream
01798   //
01799 
01800   if (streamName == "*") {
01801     // Set info for all streams
01802     for ( StreamMapConstItr citr = fStreamMap.begin(); 
01803           citr!= fStreamMap.end(); ++citr ) {
01804       ((PerInputStream*)(citr -> second)) -> SetPushRandom(tf);
01805     } // end loop over streams
01806   }
01807   else {
01808     // Set info for specified streamName
01809     PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01810     stream -> SetPushRandom(tf);
01811   }
01812 }
01813 
01814 //.....................................................................
01815 
01816 void PerInputStreamManager::SetRandomSeed(int rSeed)
01817 { 
01818   // User input method. If code modifies the random seed, use fRanGen->SetSeed()
01819   fRanGen->SetSeed(rSeed);
01820   fRandomSeed = rSeed;
01821   fRandomOverride = true;
01822   MSG("Per",Msg::kInfo)
01823     << "PerInputStreamManager::SetRandomSeed : Random Seed set to "
01824     << ((rSeed==0)? fRanGen->GetSeed() : fRandomSeed) << endl;
01825 
01826 }
01827 
01828 //.....................................................................
01829 
01830 void PerInputStreamManager::SetWindow(string streamName, 
01831                                       double lower, double upper)
01832 {
01833   //  Purpose:  Set window to use if kWindow sequence mode is used.
01834   //
01835   //  Argument: streamName  string  name of stream to set window.
01836   //                                if streamName="*" (default), all
01837   //                                streams will have selection set.
01838   //            lower       double  define window relative to 
01839   //            upper       double  the key stream VldContext
01840   //
01841   //  Return:  none.
01842   //
01843   //  Contact:   S. Kasahara, B. Viren
01844   //
01845   //  Notes: Invokes PerInputStream::SetWindow for each requested
01846   //  stream.
01847   //
01848 
01849 
01850   if (streamName == "*") {
01851     // Set selection on all streams
01852     for ( StreamMapConstItr citr = fStreamMap.begin(); 
01853                             citr!= fStreamMap.end(); ++citr ) 
01854       ((PerInputStream*)(citr -> second)) -> SetWindow(lower,upper);
01855   }
01856   else {
01857     // SetSelection on stream of specified streamName
01858     PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01859     if (stream) stream -> SetWindow(lower,upper);
01860   }
01861 
01862 }
01863 
01864 //.....................................................................
01865 
01866 void PerInputStreamManager::SetPerOwnedDisabled(string streamName, 
01867                                                 bool perowneddisabled)
01868 {
01869   //  Purpose:  Set PerOwnedDisabled for requested stream(s).
01870   //
01871   //  Argument: streamName  string  name of stream. if streamName="*"(default),
01872   //                                apply to all streams.
01873   //            perowneddisabled  bool  (default = true). Argument passed to 
01874   //                                     SetPerOwnedDisabled for each
01875   //                                     requested stream.
01876   //
01877   //  Return:  none.
01878   //
01879   //  Contact:   S. Kasahara
01880   //
01881 
01882   if (streamName == "*") {
01883     // Invoke SetPerOwnedDisabled on all streams
01884     for ( StreamMapConstItr citr = fStreamMap.begin(); 
01885                             citr!= fStreamMap.end(); ++citr ) 
01886       ((PerInputStream*)(citr -> second)) 
01887                       -> SetPerOwnedDisabled(perowneddisabled);
01888   }
01889   else {
01890     // Invoke SetPerOwnedDisabled on stream of specified streamName
01891     PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01892     if (stream) stream -> SetPerOwnedDisabled(perowneddisabled);
01893   }
01894 
01895 }
01896 
01897 //.....................................................................
01898 
01899 void PerInputStreamManager::SetUpdateMode(bool updateMode) {
01900   //  Purpose:  Set update mode true or false. true should be set
01901   //            when reading open files (e.g. user is dispatcher).
01902   //
01903   //  Argument: updateMode  bool  true => stream tree will be updated when
01904   //                              end of entries is reached. Appropriate
01905   //                              for read of open files.  Default when
01906   //                              stream is opened is false.
01907   //
01908   //  Return:  none.
01909   //
01910   //  Contact:   S. Kasahara
01911   //
01912 
01913   fUpdateMode = updateMode;
01914   for ( StreamMapConstItr citr = fStreamMap.begin(); 
01915                           citr!= fStreamMap.end(); ++citr ) 
01916     ((PerInputStream*)(citr -> second)) -> SetUpdateMode(updateMode);
01917 
01918 }
01919 
01920 //.....................................................................
01921 
01922 void PerInputStreamManager::UpdateTreeFormula() {
01923   //  Purpose:  Private method, called by IsSelectedSet when selection
01924   //            cut has been applied globally.
01925   //
01926   //  Return:  none.
01927   //
01928   //  Contact:   S. Kasahara
01929   //
01930 
01931   if ( fGlobalSelection.empty() ) return;
01932   if ( this -> GetNumStreamOpen() <= 0 ) {
01933     if ( fTTreeFormula ) delete fTTreeFormula; fTTreeFormula = 0;
01934     fFormulaMap.clear();
01935     return;
01936   }
01937 
01938   bool rebuild = false;
01939   if ( !fTTreeFormula ) rebuild = true;
01940   else {
01941     // Loop through formula map checking for differences
01942     if ( this -> GetNumStream() != fFormulaMap.size() ) rebuild = true;
01943     else {
01944       for ( StreamMapConstItr citr = fStreamMap.begin();
01945                               citr!= fStreamMap.end(); ++citr) {
01946         PerInputStream* stream = (PerInputStream*)(citr->second);
01947         std::string streamname = citr->first;
01948         if ( fFormulaMap[streamname] != stream->GetFullFilePathName() ) {
01949           rebuild = true;
01950           break;
01951         }
01952       }
01953     }
01954   }
01955 
01956   if ( !rebuild ) return;  
01957   //  if ( fTTreeFormula ) delete fTTreeFormula; fTTreeFormula = 0;
01958   // If file boundary has been crossed on one of the streams, must
01959   // update treeformula
01960   fFormulaMap.clear();
01961   std::string minstreamnm = "";
01962   Int_t minentries = -1; 
01963   for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
01964     PerInputStream* stream = (PerInputStream*)(citr -> second);
01965     if ( stream->fTTree != 0  && 
01966         (stream->GetNumEntries() < minentries || minstreamnm.empty()) ) {
01967       minentries  = stream->GetNumEntries();
01968       minstreamnm = citr->first;
01969     }
01970     fFormulaMap.insert(make_pair(citr->first,stream->GetFullFilePathName()));
01971   }
01972 
01973 
01974   // Smallest tree becomes master, else root will complain
01975   TTree* mastertree = ((PerInputStream*)fStreamMap[minstreamnm])->fTTree;
01976   for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
01977     if ( citr->first == minstreamnm ) continue;
01978     PerInputStream* stream = (PerInputStream*)(citr->second);
01979     if ( stream->fTTree != 0 ) mastertree -> AddFriend(stream->fTTree);
01980   }
01981   if ( !fTTreeFormula ) {
01982     fTTreeFormula 
01983     = new TTreeFormula("PerMgrSelection",fGlobalSelection.c_str(),mastertree);
01984   }
01985   else {
01986     fTTreeFormula -> SetTree(mastertree);
01987     fTTreeFormula -> UpdateFormulaLeaves();
01988   }
01989   if ( fTTreeFormula -> GetNdim() <= 0 ) {
01990     // TTreeformula build failed
01991     delete fTTreeFormula; fTTreeFormula = 0;
01992     fFormulaMap.clear();
01993   }
01994 
01995   for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
01996     if ( citr->first == minstreamnm ) continue;
01997     PerInputStream* stream = (PerInputStream*)(citr->second);
01998     if ( stream->fTTree != 0 ) mastertree -> RemoveFriend(stream->fTTree);
01999   }
02000 
02001   return;
02002 
02003 }

Generated on Mon Feb 15 11:07:19 2010 for loon by  doxygen 1.3.9.1