#include <PerInputStreamManager.h>
Inheritance diagram for PerInputStreamManager:

Public Member Functions | |
| PerInputStreamManager () | |
| virtual | ~PerInputStreamManager () |
| VldContext | GetCurrentVld () const |
| VldContext | GetLastEntryVld () const |
| PerInputStream * | GetOpenedStream (std::string streamname) const |
| UInt_t | GetMaxSyncDelay () const |
| bool | IsBegin () const |
| bool | IsEnd () const |
| bool | IsFileEnd () const |
| bool | IsValidSelectionString () const |
| std::ostream & | Print (std::ostream &s, const char *option="") const |
| void | CloseFile (string streamName="*") |
| void | CloseStream (string streamName="*") |
| Int_t | Get (MomNavigator *mom) |
| Int_t | Next (MomNavigator *mom=0, UInt_t advanceby=1) |
| PerInputStream * | OpenStream (std::string streamName, std::string treeName) |
| Int_t | Previous (MomNavigator *mom=0, UInt_t rewindby=1) |
| VldContext | RecordsAt (MomNavigator *mom, const VldContext &vld) |
| bool | SetFile (string streamName, string fullFilePathName, Per::EAccessMode accessmode) |
| void | SetFileEnd (bool isFileEnd=true) |
| void | SetMaxSyncDelay (UInt_t maxSyncDelay) |
| void | SetSelection (std::string streamName="*", std::string selection="", bool isRequired=false) |
| void | SetSequenceMode (std::string streamName="*", Per::ESequenceMode seqMode=Per::kKey) |
| void | SetPerOwnedDisabled (std::string streamName="*", bool perOwnedDisabled=true) |
| void | SetWindow (std::string streamName="*", double lower=0, double upper=0) |
| void | SetUpdateMode (bool updatemode) |
| void | SetMaxFileRepeat (std::string streamName, int numRepeat=0) |
| void | SetMeanMom (std::string streamName="*", double mm=0.) |
| void | SetPushRandom (std::string streamName="*", bool tf=true) |
| void | SetRandomSeed (int rSeed=0) |
| int | AddFile (std::string fullfilepathname, int at=-1, std::string streamname="*") |
| int | GoToFile (int n, std::string streamname="*") |
| int | GoToFile (std::string fullfilepathname, std::string streamname="*") |
| int | NextFile (int n=1, std::string streamname="*") |
| int | PrevFile (int n=1, std::string streamname="*") |
| int | RemoveFile (std::string fullfilepathname="*", std::string streamname="*") |
| int | IsBeginOfFiles (std::string streamname="*") const |
| int | IsEndOfFiles (std::string streamname="*") const |
| std::ostream & | ListFile (std::ostream &os, std::string streamname="*") const |
Private Member Functions | |
| int | AdvanceLowerBoundTags (const VldContext &vld) |
| int | AdvanceWindowTags (const VldContext &vld) |
| int | AdvanceSequentialTags () |
| int | AdvanceRecordTags () |
| int | LoadRecord (MomNavigator *mom) |
| int | LoadRecordWithTag (const PerInputStream &instream, const PerRecordTags &tag, MomNavigator *mom) |
| void | RemoveAllFragments (MomNavigator *mom, std::string streamName) |
| void | RemoveFragmentsNotInWindow (MomNavigator *mom, std::string streamName, const VldContext &loVld, const VldContext &hiVld) |
| int | RewindRecordTags () |
| VldContext | GetCurrentKeyVld (bool isMin=true) const |
| void | UpdateTreeFormula () |
| void | SetCurrentVld (const VldContext &vld) |
| bool | IsSelectedSet () |
Private Attributes | |
| VldContext | fCurrentVld |
| std::string | fGlobalSelection |
| TTreeFormula * | fTTreeFormula |
| std::map< std::string, std::string > | fFormulaMap |
| bool | fUpdateMode |
| UInt_t | fMaxSyncDelay |
| bool | fIsNewCurrentVldToSelect |
| bool | fIsSelectedSet |
| std::map< std::string, VldContext > | fLastServed |
| TRandom3 * | fRanGen |
| int | fRandomSeed |
| OWNED HERE BUT USED BY PERINPUTSTREAMs. | |
| bool | fRandomOverride |
|
|
Definition at line 1213 of file PerInputStreamManager.cxx. References fRandomSeed, fRanGen, and MSG. 01213 : fCurrentVld(Per::GetVldEnd()), 01214 fGlobalSelection(""),fTTreeFormula(0),fFormulaMap(), 01215 fUpdateMode(false),fMaxSyncDelay(15), fIsNewCurrentVldToSelect(true), 01216 fIsSelectedSet(false), fRandomSeed(0), fRandomOverride(false) { 01217 // 01218 // Purpose: Default constructor. 01219 // 01220 // Arguments: none. 01221 // 01222 // Return: n/a. 01223 // 01224 // Contact: S. Kasahara 01225 // 01226 fRanGen = new TRandom3(0); 01227 MSG("Per",Msg::kDebug) 01228 << "PerInputStreamManager ctor : Random Seed set to " 01229 << fRandomSeed << endl; 01230 }
|
|
|
Definition at line 1234 of file PerInputStreamManager.cxx. References fFormulaMap, fRanGen, and fTTreeFormula. 01234 {
01235 //
01236 // Purpose: Default constructor.
01237 //
01238 // Arguments: none.
01239 //
01240 // Return: n/a.
01241 //
01242 // oCntact: S. Kasahara
01243 //
01244
01245 if ( fTTreeFormula ) delete fTTreeFormula; fTTreeFormula = 0;
01246 fFormulaMap.clear();
01247
01248 if ( fRanGen ) delete fRanGen; fRanGen = 0;
01249
01250 }
|
|
||||||||||||||||
|
Definition at line 45 of file PerInputStreamManager.cxx. References PerStreamManager::GetNumStreamOpen(), GetOpenedStream(), MSG, and SetCurrentVld(). Referenced by IoInputStreamItr::AddFile(), PerValidate::StreamFileAdd(), and PerValidate::StreamMgrParallelFileSeq(). 00046 {
00047 //
00048 // Purpose: Add file to file list at position at of specified stream(s).
00049 //
00050 // Arguments: fullfilepathname
00051 // at = position in file list (-1 => end of list)
00052 // streamname (if "*" (default), apply to all streams)
00053 //
00054 // Return: number of streams on which action is successfully applied.
00055 //
00056 // Contact: S. Kasahara
00057
00058 int oldnstream = this -> GetNumStreamOpen();
00059
00060 int nstream = 0;
00061 if ( streamname == "*" ) {
00062 for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00063 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00064 nstream += instream -> AddFile(fullfilepathname,at);
00065 }
00066 }
00067 else {
00068 PerInputStream* instream = this -> GetOpenedStream(streamname);
00069 if ( instream ) nstream += instream -> AddFile(fullfilepathname,at);
00070 else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open."
00071 << endl;
00072 }
00073 // reset if first, otherwise leave it where it is
00074 if ( oldnstream == 0 && nstream > 0 ) SetCurrentVld(Per::GetVldBegin());
00075
00076 return nstream;
00077
00078 }
|
|
|
Definition at line 82 of file PerInputStreamManager.cxx. References PerInputStream::GetSequenceMode(). Referenced by AdvanceRecordTags(), and RewindRecordTags(). 00082 {
00083 // Purpose: Advance record tags in Per::kLowerBound streams to
00084 // be equal to or less than the keyVld given in the argument.
00085 //
00086 // Argument: none.
00087 //
00088 // Return: 1 if successful, else 0 (=> waiting for new records on streams
00089 // when reading in update mode).
00090 //
00091 // Contact: S. Kasahara
00092 //
00093 // Notes: To return successful when in update mode requires that the
00094 // method is able to see at least one record set with validity
00095 // equal to or greater than the specified input validity, or that
00096 // the method can see that the open file has been closed by
00097 // the writer.
00098 //
00099
00100 int nFail = 0;
00101 for ( StreamMapItr sitr=fStreamMap.begin(); sitr!=fStreamMap.end(); ++sitr ){
00102 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00103 if ( instream->GetSequenceMode() != Per::kLowerBound ) continue;
00104 if ( !instream -> AdvanceLowerBoundTags(keyVld) ) nFail++;
00105 }
00106
00107 if ( nFail ) return 0;
00108 return 1;
00109
00110 }
|
|
|
Definition at line 163 of file PerInputStreamManager.cxx. References AdvanceLowerBoundTags(), AdvanceWindowTags(), fCurrentVld, PerInputStream::fLastEntryVld, fLastServed, fMaxSyncDelay, fUpdateMode, VldTimeStamp::GetSec(), PerInputStream::GetSequenceMode(), VldContext::GetTimeStamp(), PerRecordTags::GetVldContext(), Per::GetVldEnd(), PerRecordTags::IsBegin(), PerRecordTags::IsEnd(), PerInputStream::IsEndOfFiles(), PerInputStream::IsFileEnd(), MSG, PerInputStream::NextFile(), PerInputStream::NextTags(), RewindRecordTags(), and SetCurrentVld(). Referenced by Next(), and RecordsAt(). 00163 {
00164 // Purpose: Advance record tags in managed streams to one past
00165 // fCurrentVld.
00166 //
00167 // Argument: none.
00168 //
00169 // Return: 1 if successful, else 0 (=> end of all
00170 // streams or waiting for new records on streams when
00171 // reading in update mode).
00172 //
00173 // Contact: S. Kasahara
00174 //
00175 // Notes: This method checks each stream's current record set to see if the
00176 // set has a vld that matches or is less than fCurrentVld. If so,
00177 // it advances one record set on that stream.
00178 // At the end of the method, if advance has been successful,
00179 // fCurrentVld is set to point to the lowest validity
00180 // of all the current stream record tags, or Per::GetVldEnd()
00181 // if at the end of all streams.
00182
00183 int nFail = 0;
00184 VldContext newVld = Per::GetVldEnd();
00185 int nKey = 0;
00186 int nSeq = 0;
00187 bool allSeqDone = true;
00188
00189 for ( StreamMapItr sitr=fStreamMap.begin();sitr != fStreamMap.end();++sitr ){
00190 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00191 if ( instream->GetSequenceMode() != Per::kKey ) continue;
00192 nKey++;
00193 PerRecordTags tags = instream -> GetTags();
00194 VldContext lastvld = fLastServed[sitr->first];
00195 bool isdone = false;
00196 while ( !isdone ) {
00197 isdone = true;
00198 if ( tags.IsBegin() || (tags.GetVldContext() <= fCurrentVld
00199 || lastvld == tags.GetVldContext())
00200 || (tags.IsEnd() && !instream->IsEndOfFiles()) ) {
00201 tags = instream->NextTags();
00202 if ( tags.IsEnd() ) {
00203 if ( fUpdateMode && !instream->IsFileEnd() ) nFail++;
00204 else if (instream->NextFile()) isdone = false;
00205 }
00206 }
00207 }
00208 if ( tags.GetVldContext() < newVld ) newVld = tags.GetVldContext();
00209 }
00210 // if there is no kKey stream, look for a kSequential stream
00211 if (!nKey) {
00212 for ( StreamMapItr sitr=fStreamMap.begin();
00213 sitr != fStreamMap.end();++sitr ){
00214 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00215 if ( instream->GetSequenceMode() != Per::kSequential ) continue;
00216 nSeq++;
00217 PerRecordTags tags = instream -> GetTags();
00218 VldContext lastvld = fLastServed[sitr->first];
00219 bool isdone = false;
00220 while ( !isdone ) {
00221 isdone = true;
00222 if ( tags.IsBegin() ||
00223 ( tags.IsEnd() && !instream->IsEndOfFiles() ) ) {
00224 tags = instream->NextTags();
00225 if ( tags.IsEnd() ) {
00226 if ( fUpdateMode && !instream->IsFileEnd() ) nFail++;
00227 else if (instream->NextFile()) isdone = false;
00228 }
00229 }
00230 }
00231 if ( tags.GetVldContext() < newVld ) newVld = tags.GetVldContext();
00232 if ( !( tags.IsEnd() && instream->IsEndOfFiles() ) ) allSeqDone = false;
00233 } // end for loop over the stream map
00234 }
00235
00236 if ( !nKey && !nSeq) {
00237 // Must be at least one key stream in list of managed streams
00238 MSG("Per",Msg::kWarning)
00239 << "No stream of sequence mode Per::kKey or Per::kSequential "
00240 << "found.\n Must be at least one key or sequential stream to "
00241 << "facilitate sequencing."
00242 << std::endl;
00243 return 0;
00244 }
00245
00246 // Now sequence any Per::kLowerBound or Per:kWindow streams relative
00247 // to newVld (kSequential and kRandom streams advance themselves)
00248 if ( !this -> AdvanceLowerBoundTags(newVld) ) nFail++;
00249 if ( !this -> AdvanceWindowTags(newVld) ) nFail++;
00250
00251 if ( nFail ) {
00252 // One or more streams failed to advance but file is not closed and
00253 // update mode has been requested (e.g. client is dispatcher)
00254 // Check to see if we should declare a successful advance anyway because
00255 // one or more streams indicate that the condition of fMaxSyncDelay has
00256 // been met.
00257 MSG("Per",Msg::kVerbose) << nFail << " streams failed to advance\n"
00258 << " checking time to tree end against maxsyncdelay "
00259 << fMaxSyncDelay << "." << endl;
00260
00261 Int_t maxTimeToTreeEnd = fMaxSyncDelay - 1;
00262 for( StreamMapItr sitr=fStreamMap.begin(); sitr!=fStreamMap.end();++sitr ){
00263 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00264 if ( instream -> IsOpen() ) {
00265 Int_t timeToTreeEnd = 0;
00266 if ( instream -> fLastEntryVld.IsValid() ) {
00267 timeToTreeEnd = (Int_t)(instream->fLastEntryVld.GetTimeStamp().GetSec())
00268 - (Int_t)(newVld.GetTimeStamp().GetSec());
00269 }
00270 MSG("Per",Msg::kVerbose) << sitr->first << " time to end "
00271 << timeToTreeEnd << "(sec)" << endl;
00272 maxTimeToTreeEnd = TMath::Max(maxTimeToTreeEnd,timeToTreeEnd);
00273 if ( timeToTreeEnd < (Int_t)fMaxSyncDelay ) instream -> UpdateTree();
00274 }
00275 }
00276 if ( maxTimeToTreeEnd < (Int_t)fMaxSyncDelay ) {
00277 if ( fCurrentVld != newVld ) {
00278 SetCurrentVld(newVld);
00279 this -> RewindRecordTags(); // to push back to where we were
00280 }
00281 return 0; // failure
00282 }
00283 }
00284
00285 for ( StreamMapItr sitr = fStreamMap.begin();
00286 sitr != fStreamMap.end(); ++sitr) {
00287 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00288 if ( instream->GetSequenceMode() != Per::kKey &&
00289 instream->GetSequenceMode() != Per::kSequential ) continue;
00290 PerRecordTags tags = instream -> GetTags();
00291 if ( tags.GetVldContext() == newVld ) {
00292 fLastServed[sitr->first] = newVld;
00293 }
00294 }
00295
00296 if ( (newVld == fCurrentVld && !nSeq ) ) {
00297 return 0; // failure
00298 }
00299
00300 if ( nSeq && allSeqDone ) {
00301 return 0; // failure
00302 }
00303
00304 SetCurrentVld(newVld); // equals Per::GetVldEnd() when end of all streams reached
00305 return 1; // success
00306
00307 }
|
|
|
Definition at line 138 of file PerInputStreamManager.cxx. References PerInputStream::GetSequenceMode(). 00138 {
00139 // Purpose: Attempt to advance record tags in Per::kSequential &
00140 // Per::kRandom streams to make the stream active and at an acceptable
00141 // record regardless of VldContext.
00142 //
00143 // Argument: None.
00144 //
00145 // Return: 1 if successful, else 0 (=> waiting for new records on streams
00146 // when reading in update mode).
00147 //
00148
00149 int nFail = 0;
00150 for ( StreamMapItr sitr=fStreamMap.begin();sitr!=fStreamMap.end(); sitr++ ) {
00151 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00152 if (instream->GetSequenceMode() != Per::kSequential &&
00153 instream->GetSequenceMode() != Per::kRandom ) continue;
00154 if ( !instream -> AdvanceTagsList() ) nFail++;
00155 }
00156
00157 if (nFail) return 0;
00158 return 1;
00159 }
|
|
|
Definition at line 114 of file PerInputStreamManager.cxx. References PerInputStream::GetSequenceMode(). Referenced by AdvanceRecordTags(), and RewindRecordTags(). 00114 {
00115 // Purpose: Attempt to advance record tags in Per::kWindow streams
00116 // such that their VldContext is in the window around the key
00117 // VldContext. The window is defined in the individual streams.
00118 //
00119 // Argument: VldContext of the key record.
00120 //
00121 // Return: 1 if successful, else 0 (=> waiting for new records on streams
00122 // when reading in update mode).
00123 //
00124
00125 int nFail = 0;
00126 for ( StreamMapItr sitr=fStreamMap.begin();sitr!=fStreamMap.end(); sitr++ ) {
00127 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
00128 if (instream->GetSequenceMode() != Per::kWindow) continue;
00129 if ( !instream -> AdvanceWindowTags(keyVld) ) nFail++;
00130 }
00131
00132 if (nFail) return 0;
00133 return 1;
00134 }
|
|
|
Reimplemented from PerStreamManager. Definition at line 311 of file PerInputStreamManager.cxx. References PerStreamManager::CloseFile(), PerStreamManager::GetNumStreamOpen(), and SetCurrentVld(). Referenced by DemoInputModule::EndFile(), PerValidate::StreamMgrFileChangeSeq(), and PerValidate::StreamMgrParallelFileSeq(). 00311 {
00312 // Purpose: Close current file serving specified stream(s).
00313 //
00314 // Argument: streamName string name of stream on which to close file.
00315 // if streamname="*" (default), all
00316 // streams will have their files closed.
00317 //
00318 // Return: none.
00319 //
00320 // Contact: S. Kasahara
00321 //
00322
00323 PerStreamManager::CloseFile(streamName);
00324 // When all files have been closed, reset fCurrentVld to reflect this
00325 if ( this->GetNumStreamOpen() <= 0 ) SetCurrentVld(Per::GetVldEnd());
00326
00327 }
|
|
|
Reimplemented from PerStreamManager. Definition at line 331 of file PerInputStreamManager.cxx. References PerStreamManager::CloseStream(), fLastServed, PerStreamManager::GetNumStreamOpen(), and SetCurrentVld(). Referenced by IoInputStreamItr::DefineStream(), DemoInputModule::EndJob(), DDSChildServer::Shutdown(), IoInputStreamItr::Streams(), DDSChildServer::Subscribe(), DDSChildServer::~DDSChildServer(), and IoInputStreamItr::~IoInputStreamItr(). 00331 {
00332 // Purpose: Close specified stream(s).
00333 //
00334 // Argument: streamName string name of stream to close.
00335 // if streamName="*" (default), all
00336 // streams will be closed.
00337 //
00338 // Return: none.
00339 //
00340 // Contact: S. Kasahara
00341 //
00342
00343 PerStreamManager::CloseStream(streamName);
00344 // When all files have been closed, reset fCurrentVld to reflect this
00345 if ( this->GetNumStreamOpen() <= 0 ) SetCurrentVld(Per::GetVldEnd());
00346 if ( streamName == "*" ) fLastServed.clear();
00347 else {
00348 std::map<std::string,VldContext>::iterator itr
00349 = fLastServed.find(streamName);
00350 if ( itr != fLastServed.end() ) fLastServed.erase(itr);
00351 }
00352
00353 }
|
|
|
Definition at line 357 of file PerInputStreamManager.cxx. References IsSelectedSet(), and LoadRecord(). Referenced by DDSChildServer::Get(), IoInputStreamItr::LoadRecords(), RemoveAllFragments(), and RemoveFragmentsNotInWindow(). 00357 {
00358 // Purpose: Retrieve current record set from managed record streams
00359 // with no advancing.
00360 //
00361 // Argument: mom pointer to MomNavigator
00362 //
00363 // Return: number of records retrieved.
00364 //
00365 // Contact: S. Kasahara
00366 //
00367 // Notes: If one record in a record set (sharing common vldcontext) is
00368 // rejected due to a selection cut, then entire set is rejected.
00369 //
00370
00371 if ( !mom ) return 0;
00372
00373 Int_t nrecord = 0;
00374 if ( IsSelectedSet() ) {
00375 nrecord = this -> LoadRecord(mom);
00376 }
00377
00378 return nrecord;
00379
00380 }
|
|
|
Definition at line 1138 of file PerInputStreamManager.cxx. References PerInputStream::GetTags(), Per::GetVldBegin(), PerRecordTags::GetVldContext(), Per::GetVldEnd(), and PerInputStream::IsOpen(). Referenced by GoToFile(), NextFile(), and PrevFile(). 01138 {
01139 //
01140 // Purpose: Determine current minimum or maximum validity of managed
01141 // key stream record block tags
01142 //
01143 // Arguments: isMin = true (default) => return minimum validity, else
01144 // returns max validity
01145 //
01146 // Contact: S. Kasahara
01147 //
01148
01149 VldContext currentVld = Per::GetVldEnd();
01150 if ( !isMin ) currentVld = Per::GetVldBegin();
01151
01152 for( StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++ ) {
01153 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
01154 if ( instream && instream->IsOpen() ) {
01155 VldContext tagVld = instream->GetTags().GetVldContext();
01156 if ( isMin && tagVld < currentVld ) currentVld = tagVld;
01157 else if ( !isMin && tagVld > currentVld ) currentVld = tagVld;
01158 }
01159 }
01160
01161 return currentVld;
01162
01163 }
|
|
|
Definition at line 36 of file PerInputStreamManager.h. Referenced by DDSChildServer::Next(). 00036 { return fCurrentVld; }
|
|
|
Definition at line 384 of file PerInputStreamManager.cxx. References PerInputStream::GetLastEntryVld(), and Per::GetVldBegin(). Referenced by DDSChildServer::Next(). 00384 {
00385 //
00386 // Purpose: Return last entry vld of any open stream. This is useful
00387 // when reading record sets from an open file, e.g. when
00388 // used in context of dispatcher, to determine how close we
00389 // are to the file end.
00390 //
00391
00392 VldContext lastentryvld = Per::GetVldBegin();
00393 for (StreamMapConstItr itr=fStreamMap.begin(); itr!=fStreamMap.end(); itr++){
00394 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00395 instream -> UpdateTree();
00396 if ( instream -> GetLastEntryVld() > lastentryvld )
00397 lastentryvld = instream->GetLastEntryVld();
00398 }
00399
00400 return lastentryvld;
00401
00402 }
|
|
|
Definition at line 39 of file PerInputStreamManager.h. 00039 { return fMaxSyncDelay; }
|
|
|
Reimplemented from PerStreamManager. Definition at line 131 of file PerInputStreamManager.h. References PerStreamManager::GetOpenedStream(). Referenced by AddFile(), IoInputStreamItr::AddFile(), IoInputStreamItr::DefineStream(), GoToFile(), IsBeginOfFiles(), IsEndOfFiles(), ListFile(), NextFile(), PrevFile(), RemoveFile(), IoInputStreamItr::Select(), SetMaxFileRepeat(), IoInputStreamItr::SetMaxFileRepeat(), SetMeanMom(), IoInputStreamItr::SetMeanMom(), SetPerOwnedDisabled(), IoInputStreamItr::SetPerOwnedDisabled(), SetPushRandom(), IoInputStreamItr::SetPushRandom(), SetSelection(), SetSequenceMode(), IoInputStreamItr::SetSequenceMode(), IoInputStreamItr::SetTestMode(), SetWindow(), IoInputStreamItr::SetWindow(), IoInputStreamItr::Streams(), PerValidate::StreamTagsSeq(), and DDSChildServer::Subscribe(). 00131 {
00132 return (dynamic_cast<PerInputStream*>
00133 (PerStreamManager::GetOpenedStream(streamname))); }
|
|
||||||||||||
|
Definition at line 463 of file PerInputStreamManager.cxx. References GetCurrentKeyVld(), GetOpenedStream(), PerInputStream::GetTags(), PerRecordTags::GetVldContext(), Per::GetVldEnd(), GoToFile(), MSG, and SetCurrentVld(). 00464 {
00465 //
00466 // Purpose: Move to specified file in file list for requested stream(s).
00467 //
00468 // Arguments: fullfilepathname = file name(if "" move to first file in list)
00469 // streamname = name of stream to apply movement. if "*"(default)
00470 // will apply to all managed streams.
00471 //
00472 // Return: number of streams on which action is successfully applied.
00473 //
00474 // Contact: S. Kasahara
00475 //
00476 // Notes: After moving to file, the current record ptr is positioned at
00477 // the first record of the file.
00478
00479 VldContext newVld = Per::GetVldEnd();
00480
00481 int nstream = 0;
00482 if ( streamname == "*" ) {
00483 for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00484 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00485 if ( instream -> GoToFile(fullfilepathname) ) {
00486 nstream++;
00487 instream -> NextTags(); // must advance to first record of file
00488 newVld = instream->GetTags().GetVldContext();
00489 }
00490 }
00491 }
00492 else {
00493 PerInputStream* instream = this -> GetOpenedStream(streamname);
00494 if ( instream ) {
00495 if ( instream -> GoToFile(fullfilepathname) ) {
00496 nstream++;
00497 instream -> NextTags(); // must advance to first record of file
00498 newVld = instream->GetTags().GetVldContext();
00499 }
00500 }
00501 else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open."
00502 << endl;
00503 }
00504
00505 if ( nstream ) {
00506 // Setting vld to lower or upper bound is determined by movement direction
00507 if ( newVld > fCurrentVld ) {
00508 VldContext minVld = this -> GetCurrentKeyVld(true);
00509 if ( minVld > fCurrentVld ) SetCurrentVld(minVld);
00510 }
00511 else {
00512 VldContext maxVld = this -> GetCurrentKeyVld(false);
00513 if ( maxVld < fCurrentVld ) SetCurrentVld(maxVld);
00514 }
00515 }
00516
00517 return nstream;
00518
00519 }
|
|
||||||||||||
|
Definition at line 406 of file PerInputStreamManager.cxx. References GetCurrentKeyVld(), GetOpenedStream(), PerInputStream::GetTags(), PerRecordTags::GetVldContext(), Per::GetVldEnd(), MSG, and SetCurrentVld(). Referenced by GoToFile(), IoInputStreamItr::GoToFile(), PerValidate::StreamFileGoToByIndex(), and PerValidate::StreamFileGoToByName(). 00406 {
00407 //
00408 // Purpose: Move to nth files in file list for requested stream(s).
00409 //
00410 // Arguments: n = file number (0 is first)
00411 // streamname = name of stream to apply movement. if "*"(default)
00412 // will apply to all managed streams.
00413 //
00414 // Return: number of streams on which action is successfully applied.
00415 //
00416 // Contact: S. Kasahara
00417 //
00418
00419 VldContext newVld = Per::GetVldEnd();
00420
00421 int nstream = 0;
00422 if ( streamname == "*" ) {
00423 for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00424 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00425 if ( instream -> GoToFile(n) ) {
00426 nstream++;
00427 instream -> NextTags(); // must advance to first record of file
00428 newVld = instream->GetTags().GetVldContext();
00429 }
00430 }
00431 }
00432 else {
00433 PerInputStream* instream = this -> GetOpenedStream(streamname);
00434 if ( instream ) {
00435 if ( instream -> GoToFile(n) ) {
00436 nstream++;
00437 instream -> NextTags(); // must adavnce to first record of file
00438 newVld = instream->GetTags().GetVldContext();
00439 }
00440 }
00441 else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open."
00442 << endl;
00443 }
00444
00445 if ( nstream ) {
00446 // Setting vld to lower or upper bound is determined by movement direction
00447 if ( newVld > fCurrentVld ) {
00448 VldContext minVld = this -> GetCurrentKeyVld(true);
00449 if ( minVld > fCurrentVld ) SetCurrentVld(minVld);
00450 }
00451 else {
00452 VldContext maxVld = this -> GetCurrentKeyVld(false);
00453 if ( maxVld < fCurrentVld ) SetCurrentVld(maxVld);
00454 }
00455 }
00456
00457 return nstream;
00458
00459 }
|
|
|
Definition at line 40 of file PerInputStreamManager.h. References fCurrentVld, and Per::IsBegin(). Referenced by DemoInputModule::IsBegin(), IsSelectedSet(), LoadRecord(), and Previous(). 00040 {return Per::IsBegin(fCurrentVld); }
|
|
|
Definition at line 523 of file PerInputStreamManager.cxx. References GetOpenedStream(), and MSG. Referenced by IoInputStreamItr::PrevFile(). 00523 {
00524 // Purpose: Return number of streams that have reached begin of filelist.
00525 //
00526 // Argument: name of stream to check status. If "*" (default), check all
00527 // managed streams
00528 //
00529 // Contact: S. Kasahara
00530 //
00531
00532 int nbegin = 0;
00533
00534 if ( streamname != "*" ) {
00535 PerInputStream* instream = this -> GetOpenedStream(streamname);
00536 if ( instream ) {
00537 nbegin = instream -> IsBeginOfFiles();
00538 }
00539 else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open."
00540 << endl;
00541 }
00542 else {
00543 for ( StreamMapConstItr citr = fStreamMap.begin();
00544 citr!= fStreamMap.end(); ++citr ) {
00545 PerInputStream* instream = dynamic_cast<PerInputStream*>(citr->second);
00546 nbegin += instream -> IsBeginOfFiles();
00547 }
00548 }
00549
00550 return nbegin;
00551
00552 }
|
|
|
Definition at line 41 of file PerInputStreamManager.h. References fCurrentVld, and Per::IsEnd(). Referenced by DemoInputModule::IsEnd(), IsSelectedSet(), LoadRecord(), and Next(). 00041 { return Per::IsEnd(fCurrentVld); }
|
|
|
Definition at line 556 of file PerInputStreamManager.cxx. References GetOpenedStream(), and MSG. Referenced by IoInputStreamItr::NextFile(). 00556 {
00557 // Purpose: Return number of streams that have reached end of filelist.
00558 //
00559 // Argument: name of stream to check status. If "*" (default), check all
00560 // managed streams
00561 //
00562 // Contact: S. Kasahara
00563 //
00564
00565
00566 int nend = 0;
00567
00568 if ( streamname != "*" ) {
00569 PerInputStream* instream = this -> GetOpenedStream(streamname);
00570 if ( instream ) {
00571 nend = instream -> IsEndOfFiles();
00572 }
00573 else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open."
00574 << endl;
00575 }
00576 else {
00577 for ( StreamMapConstItr citr = fStreamMap.begin();
00578 citr!= fStreamMap.end(); ++citr ) {
00579 PerInputStream* instream = dynamic_cast<PerInputStream*>(citr->second);
00580 nend += instream -> IsEndOfFiles();
00581 }
00582 }
00583
00584 return nend;
00585
00586 }
|
|
|
Definition at line 590 of file PerInputStreamManager.cxx. References PerInputStream::IsFileEnd(). Referenced by Next(), and DDSChildServer::Next(). 00590 {
00591 // Purpose: Check if input file has been closed by writer on all
00592 // managed streams.
00593 //
00594 // Return: true if file end detected on all streams.
00595 //
00596 // Contact: S. Kasahara
00597 //
00598 // Notes: Invokes PerInputStream::IsFileEnd for all streams.
00599 // This method is useful for a user (e.g. dispatcher)
00600 // reading open files. Note that detecting file closure
00601 // by the writer does not mean that the user has reached
00602 // the end of the entries in the current record map.
00603 // Use IsEnd()&&IsFileEnd() to determine if the user has
00604 // reached the end of the current record map *and* all
00605 // all files have been closed by the writer.
00606 //
00607
00608 bool isFileEnd = true;
00609
00610 for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
00611 PerInputStream* instream = dynamic_cast<PerInputStream*>(citr->second);
00612 if (!instream->IsFileEnd()) isFileEnd = false;
00613 }
00614
00615 return isFileEnd;
00616
00617 }
|
|
|
Definition at line 621 of file PerInputStreamManager.cxx. References fGlobalSelection, fIsNewCurrentVldToSelect, fIsSelectedSet, PerInputStream::fSelection, PerStream::fTObject, PerStream::fTTree, PerInputStream::fTTreeFormula, fTTreeFormula, PerRecordTags::GetIndexHi(), PerRecordTags::GetIndexLo(), PerInputStream::GetTags(), PerRecordTags::GetVldContext(), IsBegin(), IsEnd(), PerInputStream::IsRequired(), MSG, PerStream::Reset(), and UpdateTreeFormula(). Referenced by Get(), Next(), Previous(), and RecordsAt(). 00621 {
00622 // Purpose: Check to see if record set corresponding to fCurrentVld
00623 // has passed user specified selection cuts.
00624 //
00625 // Arguments: none.
00626 //
00627 // Return: true if all records passed selection cut.
00628 //
00629 // Contact: S. Kasahara
00630 //
00631 // Notes: If multiple streams have selection cuts applied, the record set
00632 // is rejected based on an AND of the different
00633 // selection cuts, i.e. rejecting one record rejects the entire
00634 // set.
00635
00636 if ( !fIsNewCurrentVldToSelect ) return fIsSelectedSet;
00637
00638 fIsNewCurrentVldToSelect = false;
00639
00640 if ( this -> IsEnd() || this -> IsBegin() ) {
00641 fIsSelectedSet = false;
00642 return fIsSelectedSet;
00643 }
00644
00645 fIsSelectedSet = true;
00646
00647 if ( !fGlobalSelection.empty() ) {
00648 MSG("Per",Msg::kVerbose) << "Testing globally applied selection cut "
00649 << fGlobalSelection << endl;
00650 // Selection cut was applied globally
00651 this -> UpdateTreeFormula();
00652 if ( fTTreeFormula ) {
00653 for(StreamMapConstItr itr =fStreamMap.begin();
00654 itr!=fStreamMap.end();++itr) {
00655 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00656 const PerRecordTags& tags = instream->GetTags();
00657 Int_t idx = -1;
00658 if ( tags.GetVldContext() == fCurrentVld ) idx = tags.GetIndexLo();
00659 TTree* ttree = instream->fTTree;
00660 if ( ttree ) {
00661 ttree -> LoadTree(idx);
00662 if ( !instream->fTObject ) {
00663 instream -> fTBranch -> SetAddress(&(instream->fTObject));
00664 }
00665 }
00666 }
00667 Int_t arraySize = fTTreeFormula->GetNdata();
00668 bool ispass = false;
00669 for ( int jent = 0; jent < arraySize; jent++ ) {
00670 if ( fTTreeFormula -> EvalInstance(jent) != 0 ) ispass = true;
00671 }
00672 if ( !ispass) fIsSelectedSet = false;
00673 for (StreamMapConstItr itr = fStreamMap.begin();
00674 itr!= fStreamMap.end(); ++itr) {
00675 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00676 instream -> Reset(true);
00677 }
00678 }
00679 }
00680 else {
00681 for(StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();++itr) {
00682 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00683 const PerRecordTags& tags = instream->GetTags();
00684 TTreeFormula* treeformula = instream->fTTreeFormula;
00685 if ( tags.GetVldContext() == fCurrentVld ) {
00686 if ( treeformula ) {
00687 MSG("Per",Msg::kVerbose) << "Testing locally applied selection cut "
00688 << instream->fSelection << " on stream " << itr->first << endl;
00689 TTree* ttree = instream -> fTTree;
00690 if ( ttree ) {
00691 for (Int_t idx=tags.GetIndexLo();idx <= tags.GetIndexHi(); idx++) {
00692 ttree -> LoadTree(idx);
00693 if ( !instream->fTObject ) {
00694 instream -> fTBranch -> SetAddress(&(instream->fTObject));
00695 }
00696 Int_t arraySize = treeformula->GetNdata();
00697 bool ispass = false;
00698 for ( int jent = 0; jent < arraySize; jent++ ) {
00699 if ( treeformula -> EvalInstance(jent) != 0 ) ispass = true;
00700 }
00701 if ( !ispass) fIsSelectedSet = false;
00702 instream->Reset(true);
00703 }
00704 }
00705 }
00706 }
00707 // Check to see if record set is missing a record from a required stream
00708 else if ( instream->IsRequired() ) fIsSelectedSet = false;
00709 }
00710
00711 }
00712
00713 if (!fIsSelectedSet)
00714 MSG("Per",Msg::kVerbose) << "Record set failed selection cut" << endl;
00715 else
00716 MSG("Per",Msg::kVerbose) << "Record set passed selection cut" << endl;
00717
00718 return fIsSelectedSet;
00719
00720 }
|
|
|
Definition at line 724 of file PerInputStreamManager.cxx. Referenced by DDSChildServer::Next(). 00724 {
00725 // OR of PerInputStream::IsValidSelectionString() results, e.g.
00726 // if any one stream's application of selection string failed, return
00727 // false
00728
00729 for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00730 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00731 bool isValid = instream -> IsValidSelectionString();
00732 if ( !isValid ) return false;
00733 }
00734
00735 return true;
00736
00737 }
|
|
||||||||||||
|
Definition at line 741 of file PerInputStreamManager.cxx. References GetOpenedStream(). Referenced by IoInputStreamItr::ListFile(), PerValidate::StreamFileAdd(), PerValidate::StreamFileGoToByIndex(), PerValidate::StreamFileGoToByName(), PerValidate::StreamFileNext(), PerValidate::StreamFilePrev(), PerValidate::StreamFileRemove(), and PerValidate::StreamMgrFileList(). 00742 {
00743 //
00744 // Purpose: Print the contents of the fFileList for requested stream(s).
00745 //
00746 // Arguments: ostream reference to print on and name of stream(s). If
00747 // streamname == "*" (default), lists files on all streams.
00748 //
00749 // Contact: S. Kasahara
00750 //
00751
00752 if ( streamname == "*" ) {
00753 for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
00754 os << "Stream " << itr->first << ":" << endl;
00755 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00756 instream -> ListFile(os);
00757 }
00758 }
00759 else {
00760 PerInputStream* instream = this -> GetOpenedStream(streamname);
00761 if ( instream ) {
00762 os << "Stream " << streamname << ":" << endl;
00763 instream -> ListFile(os);
00764 }
00765 else os << "Stream " << streamname << " not open." << endl;
00766 }
00767
00768 return os;
00769
00770 }
|
|
|
Definition at line 774 of file PerInputStreamManager.cxx. References PerInputStream::AdvanceTags(), fCurrentVld, fRandomSeed, fRanGen, PerInputStream::fSumTags, PerInputStream::GetMeanMom(), PerInputStream::GetPushRandom(), Per::GetSequenceMode(), Per::GetVldBegin(), PerRecordTags::GetVldContext(), Per::GetVldEnd(), PerRecordTags::IsBegin(), IsBegin(), PerRecordTags::IsEnd(), IsEnd(), LoadRecordWithTag(), MSG, RemoveAllFragments(), and RemoveFragmentsNotInWindow(). Referenced by Get(), Next(), Previous(), and RecordsAt(). 00774 {
00775 // Purpose: Loads records with fCurrentVld in input MomNavigator
00776 // object.
00777 //
00778 // Argument: pointer to MomNavigator.
00779 //
00780 // Return: Number of records loaded.
00781 //
00782 // Contact: S. Kasahara
00783 //
00784 // Notes: LoadRecord does not apply user selection cuts to the record set.
00785 //
00786
00787 MSG("Per",Msg::kVerbose) << "Load Record Set w/currentVld " << fCurrentVld
00788 << endl;
00789
00790 Int_t nrecord = 0;
00791 Int_t lastNRecord = 0;
00792 if ( !mom || this -> IsBegin() || this -> IsEnd() ) return nrecord;
00793
00794 for ( StreamMapItr itr=fStreamMap.begin(); itr!=fStreamMap.end(); ++itr ) {
00795 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
00796 std::string streamName = itr->first;
00797
00798 if ( instream -> GetSequenceMode() == Per::kKey ) {
00799 const PerRecordTags& tags = instream -> GetTags();
00800 if ( tags.GetVldContext() == fCurrentVld ) {
00801 nrecord += this -> LoadRecordWithTag(*instream,tags,mom);
00802 }
00803 }
00804 else if ( instream -> GetSequenceMode() == Per::kLowerBound ) {
00805 const PerRecordTags& tags = instream -> GetTags();
00806 // First clean Mom of previously loaded object from this stream if not this tag
00807 VldContext loVld = tags.GetVldContext();
00808 VldContext hiVld = tags.GetVldContext();
00809 this -> RemoveFragmentsNotInWindow(mom,streamName,loVld,hiVld);
00810 if ( !tags.IsBegin() && !tags.IsEnd() ) {
00811 nrecord += this -> LoadRecordWithTag(*instream,tags,mom);
00812 }
00813 }
00814 else if ( instream -> GetSequenceMode() == Per::kWindow ) {
00815 std::vector<PerRecordTags> tagslist = instream -> GetWindowTags();
00816 // First clean Mom of any objects not in range of tags on this list
00817 VldContext loVld = Per::GetVldEnd(); // set in this way all records will be removed
00818 VldContext hiVld = Per::GetVldBegin();
00819 if ( !tagslist.empty() ) {
00820 loVld = (tagslist[0]).GetVldContext();
00821 hiVld = (tagslist[tagslist.size()-1]).GetVldContext();
00822 }
00823 this -> RemoveFragmentsNotInWindow(mom,streamName,loVld,hiVld);
00824 if ( !tagslist.empty() ) {
00825 std::vector<PerRecordTags>::iterator itr;
00826 for ( itr = tagslist.begin(); itr != tagslist.end(); itr++ ) {
00827 const PerRecordTags& tags = *itr;
00828 nrecord += this -> LoadRecordWithTag(*instream,tags,mom);
00829 }
00830 }
00831 }
00832 else if ( instream -> GetSequenceMode() == Per::kSequential ||
00833 instream -> GetSequenceMode() == Per::kRandom ){
00834 // clear Mom of all entries of this sort
00835 this->RemoveAllFragments(mom,streamName);
00836
00837 double meanMom = instream->GetMeanMom();
00838
00839 if (!fRandomOverride) {
00840 fRandomSeed =
00841 int(10000*fStreamMap.size()+100*meanMom+(instream->fSumTags)+42);
00842 fRanGen->SetSeed(fRandomSeed);
00843 }
00844
00845 int numEvts =
00846 ( instream->GetPushRandom() )?
00847 fRanGen->Poisson(meanMom) : int(ceil(meanMom)) ;
00848
00849 int iEvt = 0;
00850 while (iEvt < numEvts) {
00851 const PerRecordTags& tags = instream -> GetTags();
00852 int iSuccess = this -> LoadRecordWithTag(*instream,tags,mom);
00853 iEvt += iSuccess;
00854 nrecord += iSuccess;
00855 if ( !instream->AdvanceTags() ) break;
00856 } // end loop over # events to push
00857 } // end if Per::kSequential || Per::kRandom
00858 lastNRecord = nrecord;
00859 } // end loop over stream map
00860
00861 return nrecord;
00862
00863 }
|
|
||||||||||||||||
|
Definition at line 943 of file PerInputStreamManager.cxx. References fCurrentVld, PerStream::GetClassName(), PerRecordTags::GetFileName(), PerRecordTags::GetIndexHi(), PerRecordTags::GetIndexLo(), PerInputStream::GetSequenceMode(), PerStream::GetStreamName(), RecRecord::GetTempTags(), RecMinos::GetTempTags(), PerRecordTags::GetTreeName(), Registry::LockValues(), MSG, Registry::Set(), and Registry::UnLockValues(). Referenced by LoadRecord(). 00945 {
00946 // Purpose: Loads records from this tag in input MomNavigator
00947 // object.
00948 //
00949 // Argument: pointer to MomNavigator.
00950 //
00951 // Return: Number of records loaded.
00952 //
00953 // Contact: S. Kasahara
00954 //
00955 // Notes: LoadRecord does not apply user selection cuts to the record set.
00956 //
00957
00958 int nrecord = 0;
00959
00960 // Load the records in the tag set
00961 for (Int_t idx = tags.GetIndexLo(); idx <= tags.GetIndexHi(); idx++) {
00962 std::string streamName = instream.GetStreamName();
00963 TObject* object = mom -> GetFragmentByInputTag(streamName.c_str(),
00964 (tags.GetTreeName()).c_str(),idx,(tags.GetFileName()).c_str());
00965 if ( object ) continue; // object already in Mom don't bother
00966 object = (const_cast<PerInputStream&>(instream)).GetObject(idx,false);
00967 if ( object ) {
00968 Registry* temptags = 0;
00969 if ( RecMinos* record = dynamic_cast<RecMinos*>(object) ) {
00970 temptags = &(record->GetTempTags());
00971 if ( instream.GetSequenceMode() != Per::kKey &&
00972 instream.GetSequenceMode() != Per::kSequential &&
00973 instream.GetSequenceMode() != Per::kRandom )
00974 record -> SetTransient(false);
00975 }
00976 else if ( RecRecord* record = dynamic_cast<RecRecord*>(object) ) {
00977 temptags = &(record->GetTempTags());
00978 if ( instream.GetSequenceMode() != Per::kKey &&
00979 instream.GetSequenceMode() != Per::kSequential &&
00980 instream.GetSequenceMode() != Per::kRandom )
00981 record -> SetTransient(false);
00982 }
00983
00984 if ( temptags ) {
00985 // Stamp record with i/o tags
00986 temptags->UnLockValues();
00987 std::string tagname = instream.GetClassName()+".fTempTags";
00988 temptags->SetName(tagname.c_str());
00989 temptags->Set("stream",streamName.c_str());
00990 temptags->Set("tree",(tags.GetTreeName()).c_str());
00991 temptags->Set("index",idx);
00992 temptags->Set("file",(tags.GetFileName()).c_str());
00993 temptags->LockValues();
00994 }
00995 // And add record to Mom, mom now owns record
00996 if ( nrecord == 0) {
00997 MSG("Per",Msg::kVerbose) << "..Loading records with Vld"
00998 << fCurrentVld << " from stream "
00999 << streamName.c_str() << ":" << endl;
01000 }
01001 MSG("Per",Msg::kVerbose) << "....Record " << nrecord << " from"
01002 << " stream " << streamName
01003 << " tree " << tags.GetTreeName()
01004 << " index " << idx
01005 << " file "<< tags.GetFileName()
01006 << endl;
01007 mom -> AdoptFragment(object);
01008
01009 nrecord++;
01010 } // object retrieved
01011 } // loop over tag tree indices
01012
01013 return nrecord;
01014
01015 }
|
|
||||||||||||
|
Definition at line 1019 of file PerInputStreamManager.cxx. References AdvanceRecordTags(), fCurrentVld, fUpdateMode, IsEnd(), IsFileEnd(), IsSelectedSet(), LoadRecord(), MSG, and RewindRecordTags(). Referenced by IoInputStreamItr::Increment(), DemoInputModule::Next(), DDSChildServer::Next(), PerValidate::StreamMgrFileChangeSeq(), PerValidate::StreamMgrParallelFileSeq(), PerValidate::StreamMgrSelectionSeq(), PerValidate::StreamMgrSkipByThreeSeq(), PerValidate::StreamMgrSkipByTwoSeq(), PerValidate::StreamMgrTagsSeq(), and PerValidate::StreamMgrTagsSeqZigZag(). 01019 {
01020 // Purpose: Retrieve next set of records from managed streams of
01021 // common VldContext which satisfy user's
01022 // selection cuts (if specified). The records are
01023 // extracted from the input streams and loaded into the
01024 // input MomNavigator object.
01025 //
01026 // Argument: mom pointer to MomNavigator. (If set 0 (default), the
01027 // method will advance to the record set of interest
01028 // but not load the records. The user can then
01029 // retrieve the record set with the Get method.)
01030 // advanceby number of selected record sets to advance by
01031 // (default = 1). Can be used to skip over record
01032 // sets.
01033 //
01034 // Return: number of selected record sets advanced.
01035 //
01036 // Contact: S. Kasahara
01037 //
01038 // Notes: Entries are subject to any user-specified selection strings
01039 // as described under PerInputStream::GetObject and
01040 // PerInputStreamManager::IsSelectedSet.
01041 //
01042
01043 MSG("Per",Msg::kVerbose) << "Next advance by " << advanceby << " sets."
01044 << " Start at vld " << fCurrentVld << endl;
01045
01046 if ( this -> IsEnd() ) return 0;
01047 VldContext saveCurrentVld = fCurrentVld;
01048
01049 // Search for entry satisfying selection cuts
01050 UInt_t nselect = 0;
01051 while ( nselect < advanceby && this -> AdvanceRecordTags() ) {
01052 MSG("Per",Msg::kVerbose) << "Advanced to vld " << fCurrentVld
01053 << endl;
01054 if ( IsSelectedSet() ) {
01055 nselect++;
01056 if ( nselect >= advanceby ) {
01057 // Load record in Mom
01058 if ( mom ) this -> LoadRecord(mom);
01059 }
01060 }
01061 }
01062
01063 if ( advanceby > 1 ) {
01064 // current vld is left at last record served if in updatemode
01065 if ( nselect < advanceby &&
01066 fUpdateMode &&
01067 !IsFileEnd() ) {
01068 while ( fCurrentVld > saveCurrentVld ) {
01069 this -> RewindRecordTags();
01070 }
01071 // At this point fCurrentVld should be back to original
01072 if ( saveCurrentVld != fCurrentVld ) {
01073 MSG("Per",Msg::kWarning)
01074 << "Failure to rewind to original state in Next method." << endl;
01075 }
01076 nselect = 0;
01077 }
01078 }
01079
01080 return (Int_t)nselect;
01081
01082 }
|
|
||||||||||||
|
Definition at line 1086 of file PerInputStreamManager.cxx. References GetCurrentKeyVld(), GetOpenedStream(), MSG, and SetCurrentVld(). Referenced by IoInputStreamItr::NextFile(), and PerValidate::StreamFileNext(). 01086 {
01087 //
01088 // Purpose: Move stream(s) forward n files in file list. If n exceeds
01089 // number of files to end of list, ptr is left at eof marker.
01090 //
01091 // Arguments: n = number of files to advance (default = 1)
01092 // streamname = name of stream to apply movement. if "*"(default)
01093 // will apply to all managed streams.
01094 //
01095 // Return: number of streams on which action is successfully applied.
01096 //
01097 // Contact: S. Kasahara
01098 //
01099 // Notes: After moving to file, the current record ptr is positioned at
01100 // the first record of the file.
01101
01102 MSG("Per",Msg::kVerbose) << "PerInputStreamManager::NextFile("
01103 << n << "," << streamname << ") called"
01104 << endl;
01105
01106 int nstream = 0;
01107 if ( streamname == "*" ) {
01108 for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
01109 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
01110 if ( instream -> NextFile(n) ) {
01111 nstream++;
01112 instream -> NextTags(); // must advance to first record of file
01113 }
01114 }
01115 }
01116 else {
01117 PerInputStream* instream = this -> GetOpenedStream(streamname);
01118 if ( instream ) {
01119 if ( instream -> NextFile(n) ) {
01120 nstream++;
01121 instream -> NextTags(); // must advance to first record of file
01122 }
01123 }
01124 else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open."
01125 << endl;
01126 }
01127
01128 // Recalculate fCurrentVld to point to lower bound of new set of tags
01129 VldContext minVld = this -> GetCurrentKeyVld(true);
01130 if ( minVld > fCurrentVld ) SetCurrentVld(minVld);
01131
01132 return nstream;
01133
01134 }
|
|
||||||||||||
|
Definition at line 1167 of file PerInputStreamManager.cxx. References fLastServed, fUpdateMode, Per::GetDefSequenceMode(), Per::GetVldEnd(), MSG, SetSequenceMode(), and SetUpdateMode(). Referenced by DemoInputModule::BeginJob(), IoInputStreamItr::DefineStream(), PerValidate::RunAllTests(), IoInputStreamItr::Streams(), and DDSChildServer::Subscribe(). 01168 {
01169 //
01170 // Purpose: Open input stream with name streamName serving tree treeName.
01171 //
01172 // Arguments: streamName string name of stream to be opened. Names of
01173 // managed streams must be unique.
01174 // treeName string name of tree served by this stream.
01175 //
01176 // Return: pointer to PerInputStream. If unable to open stream (because
01177 // stream with this streamName has already been opened), returns
01178 // (PerInputStream*)0.
01179 //
01180 // Contact: S. Kasahara
01181 //
01182 // Notes: PerInputStream objects are owned by the PerInputStreamManager
01183 // and should only be deleted through the
01184 // PerStreamManager::CloseStream method.
01185
01186 bool openok = false;
01187
01188 PerInputStream* stream=dynamic_cast<PerInputStream*>(fStreamMap[streamName]);
01189 if ( !stream ) {
01190 // Stream not found in map, need to create new one
01191 stream = new PerInputStream(treeName);
01192 stream -> SetStreamName(streamName);
01193 stream -> SetUpdateMode(fUpdateMode);
01194 Per::ESequenceMode seqMode = Per::GetDefSequenceMode(streamName.c_str());
01195 stream -> SetSequenceMode(seqMode);
01196 fStreamMap[streamName] = stream;
01197 fLastServed[streamName] = Per::GetVldEnd();
01198 openok = true;
01199 }
01200 else {
01201 MSG("Per",Msg::kWarning)
01202 <<"Stream manager failed to open\nrequested stream "
01203 << streamName << " because name conflicts with previously opened stream."
01204 << endl;
01205 }
01206
01207 return (openok) ? stream : (PerInputStream*)0;
01208
01209 }
|
|
||||||||||||
|
Definition at line 1298 of file PerInputStreamManager.cxx. References GetCurrentKeyVld(), GetOpenedStream(), MSG, and SetCurrentVld(). Referenced by IoInputStreamItr::PrevFile(), RewindRecordTags(), PerValidate::StreamFileNext(), and PerValidate::StreamFilePrev(). 01298 {
01299 //
01300 // Purpose: Move stream(s) back n files in file list. If n exceeds
01301 // number of files to beginning of list, ptr is left at first
01302 // file.
01303 //
01304 // Arguments: n = number of files to rewind (default = 1)
01305 // streamname = name of stream to apply movement. if "*"(default)
01306 // will apply to all managed streams.
01307 //
01308 // Return: number of streams on which action is successfully applied.
01309 //
01310 // Contact: S. Kasahara
01311 //
01312 // Notes: After moving to file, the current record ptr is positioned at
01313 // the first record of the file.
01314
01315 int nstream = 0;
01316 if ( streamname == "*" ) {
01317 for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
01318 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
01319 if ( instream -> PrevFile(n) ) {
01320 nstream++;
01321 instream -> NextTags(); // position at first record of file
01322 }
01323 }
01324 }
01325 else {
01326 PerInputStream* instream = this -> GetOpenedStream(streamname);
01327 if ( instream ) {
01328 if ( instream -> PrevFile(n) ) {
01329 nstream++;
01330 instream -> NextTags(); // position at first record of file
01331 }
01332 }
01333 else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open."
01334 << endl;
01335 }
01336
01337 // Recalculate fCurrentVld to point to upper bound of new set of tags
01338 VldContext maxVld = this -> GetCurrentKeyVld(false);
01339 if ( maxVld < fCurrentVld ) SetCurrentVld(maxVld);
01340
01341 return nstream;
01342
01343 }
|
|
||||||||||||
|
Definition at line 1254 of file PerInputStreamManager.cxx. References IsBegin(), IsSelectedSet(), LoadRecord(), and RewindRecordTags(). Referenced by IoInputStreamItr::Decrement(), DDSChildServer::Next(), DemoInputModule::Previous(), PerValidate::StreamMgrFileChangeSeq(), PerValidate::StreamMgrParallelFileSeq(), PerValidate::StreamMgrSelectionSeq(), PerValidate::StreamMgrSkipByThreeSeq(), PerValidate::StreamMgrSkipByTwoSeq(), PerValidate::StreamMgrTagsSeq(), and PerValidate::StreamMgrTagsSeqZigZag(). 01254 {
01255 // Purpose: Retrieve previous set of records from managed streams of
01256 // common VldContext which satisfy user's
01257 // selection cuts (if specified). The records are
01258 // extracted from the input streams and loaded into the
01259 // input MomNavigator object.
01260 //
01261 // Argument: mom pointer to MomNavigator (default=0 means
01262 // streams are rewound but no record set is loaded.
01263 // The user can then use Get to retrieve the record
01264 // set.)
01265 // rewindby number of selected record sets to rewind by
01266 // (default = 1). Can be used to skip over record
01267 // sets.
01268 //
01269 // Return: number of record sets rewound.
01270 //
01271 // Contact: S. Kasahara
01272 //
01273 // Notes: Entries are subject to any user-specified selection strings
01274 // as described under PerInputStream::GetObject and
01275 // PerInputStreamManager::IsSelectedSet.
01276 //
01277
01278 if ( this -> IsBegin() ) return 0;
01279
01280 UInt_t nselect = 0;
01281 // Search for entry satisfying selection cuts
01282 while ( nselect < rewindby && this -> RewindRecordTags() ) {
01283 if ( IsSelectedSet() ) {
01284 nselect++;
01285 if ( nselect >= rewindby ) {
01286 // Load record in Mom
01287 if ( mom ) this -> LoadRecord(mom);
01288 }
01289 }
01290 }
01291
01292 return (Int_t)nselect;
01293
01294 }
|
|
||||||||||||
|
Definition at line 1347 of file PerInputStreamManager.cxx. References Per::AsString(), PerStream::GetEntry(), PerStream::GetFullFilePathName(), PerStream::GetNumEntries(), PerInputStream::GetSequenceMode(), PerInputStream::IsFileEnd(), and PerStreamManager::Print(). Referenced by operator<<(), and DemoInputModule::Print(). 01348 {
01349 //
01350 // Purpose: Print status of input stream manager on std::ostream.
01351 //
01352 // Arguments: os std::ostream to display on.
01353 // option verbosity level ("" (default), or "brief")
01354 //
01355 // Return: std::ostream reference.
01356 //
01357 // Contact: S. Kasahara
01358 //
01359
01360 TString opt = option;
01361 opt.ToLower();
01362 if ( opt == "brief" || opt.IsNull() && fPrintOpt == "brief" ) {
01363 Int_t nenabled = 0;
01364 for ( StreamMapConstItr citr = fStreamMap.begin();
01365 citr != fStreamMap.end(); ++ citr ) {
01366 PerInputStream* instream = dynamic_cast<PerInputStream*>(citr->second);
01367 if ( instream -> IsEnabled() && instream -> IsOpen() ) {
01368 os << " " << nenabled << ")" << citr->first << " serving tree "
01369 << instream -> GetTreeName()
01370 << " w/" << instream->GetNumEntries()
01371 << " entries (current entry = " << instream->GetEntry()
01372 << ") using sequence mode "
01373 << Per::AsString(instream->GetSequenceMode())
01374 << "\n from the "
01375 << ((instream->IsFileEnd()) ? "closed file " : "open file ")
01376 << instream->GetFullFilePathName() << "." << endl;
01377 nenabled++;
01378 }
01379 }
01380 }
01381 else {
01382 os << "PerInput";
01383 PerStreamManager::Print(os);
01384 }
01385 return os;
01386
01387 }
|
|
||||||||||||
|
Definition at line 1391 of file PerInputStreamManager.cxx. References AdvanceRecordTags(), fCurrentVld, IsSelectedSet(), LoadRecord(), and RewindRecordTags(). Referenced by IoInputStreamItr::GoTo(), IoInputStreamItr::GoToEOF(), and PerValidate::StreamMgrParallelFileSeq(). 01392 {
01393 // Purpose: Retrieve record set with requested VldContext from managed
01394 // record streams. Record set is subject to selection
01395 // cuts as specified by user. The records are extracted from
01396 // the input streams and loaded into the input MomNavigator
01397 // object. If exact match not found, the record set corresponding
01398 // to the validity just beyond that requested is loaded.
01399 //
01400 // Argument: mom pointer to MomNavigator
01401 // (if == 0, streams are moved to vld but records are
01402 // not loaded)
01403 // vld VldContext of requested record set
01404 //
01405 // Return: VldContext of new record set (= Per::GetVldEnd() or
01406 // Per::GetVldBegin() if end-of-input-streams or
01407 // begin-of-input-streams respectively).
01408 //
01409 // Contact: S. Kasahara
01410 //
01411
01412 if ( vld == fCurrentVld && IsSelectedSet() ) {
01413 if ( mom ) this -> LoadRecord(mom);
01414 return fCurrentVld;
01415 }
01416
01417 if ( vld < fCurrentVld ) {
01418 while ( RewindRecordTags() ) {
01419 if ( vld >= fCurrentVld && IsSelectedSet() ) {
01420 if ( mom ) this -> LoadRecord(mom);
01421 return fCurrentVld;
01422 }
01423 }
01424 }
01425 else {
01426 while ( AdvanceRecordTags() ) {
01427 if ( vld <= fCurrentVld && IsSelectedSet() ) {
01428 if ( mom ) this -> LoadRecord(mom);
01429 return fCurrentVld;
01430 }
01431 }
01432 }
01433
01434 return fCurrentVld; // begin-or-end-of-streams
01435
01436 }
|
|
||||||||||||
|
Definition at line 908 of file PerInputStreamManager.cxx. References Get(), MomNavigator::GetFragmentArray(), and DataUtil::GetTempTags(). Referenced by LoadRecord(). 00909 {
00910 // Purpose: Remove fragments of stream streamName from mom
00911 //
00912 // Argument: pointer to MomNavigator & streamName
00913 //
00914 // Return: none
00915 //
00916 // Contact: S. Kasahara & K. Arms
00917 //
00918
00919 TObjArray* fragarray=const_cast<TObjArray*>(mom->GetFragmentArray());
00920 for (int idx = fragarray->GetEntriesFast()-1; idx >= 0; idx--) {
00921 TObject* oldobject = fragarray -> At(idx);
00922 Registry* oldiotags = 0;
00923 if ( RecMinos* oldrecord = dynamic_cast<RecMinos*>(oldobject) ) {
00924 oldiotags = &(oldrecord -> GetTempTags());
00925 }
00926 else if(RecRecord* oldrecord = dynamic_cast<RecRecord*>(oldobject)){
00927 oldiotags = &(oldrecord -> GetTempTags());
00928 }
00929 if ( oldiotags ) {
00930 const char* oldtagstream = 0;
00931 if ( oldiotags -> Get("stream",oldtagstream) &&
00932 strcmp(streamName.c_str(),oldtagstream) == 0 ) {
00933 fragarray->RemoveAt(idx);
00934 delete oldobject; oldobject = 0;
00935 } // end if tags are from stream streamName
00936 } // end if tags exist
00937 } // end for loop over mom fragment array
00938
00939 }
|
|
||||||||||||
|
Definition at line 1440 of file PerInputStreamManager.cxx. References GetOpenedStream(), and MSG. Referenced by IoInputStreamItr::RemoveFile(), PerValidate::StreamFileRemove(), and PerValidate::StreamMgrParallelFileSeq(). 01441 {
01442 //
01443 // Purpose: Remove file from file list of specified stream(s).
01444 //
01445 // Arguments: fullfilepathname (if "*" (default), remove all files.)
01446 // streamname (if "*" (default), apply to all streams)
01447 //
01448 // Return: number of streams on which action is successfully applied.
01449 //
01450 // Contact: S. Kasahara
01451 //
01452
01453 int nstream = 0;
01454 if ( streamname == "*" ) {
01455 for (StreamMapConstItr itr=fStreamMap.begin();itr!=fStreamMap.end();itr++){
01456 PerInputStream* instream = dynamic_cast<PerInputStream*>(itr->second);
01457 nstream += instream -> RemoveFile(fullfilepathname);
01458 }
01459 }
01460 else {
01461 PerInputStream* instream = this -> GetOpenedStream(streamname);
01462 if ( instream ) nstream += instream -> RemoveFile(fullfilepathname);
01463 else MSG("Per",Msg::kWarning) << "Stream " << streamname << " not open."
01464 << endl;
01465 }
01466
01467 return nstream;
01468
01469 }
|
|
||||||||||||||||||||
|
Definition at line 867 of file PerInputStreamManager.cxx. References Get(), MomNavigator::GetFragmentArray(), DataUtil::GetTempTags(), and GetVldContext(). Referenced by LoadRecord(). 00869 {
00870 // Purpose: Remove fragments < loVld || > hiVld
00871 //
00872 // Argument: pointer to MomNavigator.
00873 //
00874 // Return: none
00875 //
00876 // Contact: S. Kasahara
00877 //
00878
00879 TObjArray* fragarray=const_cast<TObjArray*>(mom->GetFragmentArray());
00880 for (int idx = fragarray->GetEntriesFast()-1; idx >= 0; idx--) {
00881 TObject* oldobject = fragarray -> At(idx);
00882 Registry* oldiotags = 0;
00883 const VldContext* oldvldc = 0;
00884 if ( RecMinos* oldrecord = dynamic_cast<RecMinos*>(oldobject) ) {
00885 oldiotags = &(oldrecord -> GetTempTags());
00886 oldvldc = oldrecord -> GetVldContext();
00887 }
00888 else if(RecRecord* oldrecord = dynamic_cast<RecRecord*>(oldobject)){
00889 oldiotags = &(oldrecord -> GetTempTags());
00890 oldvldc = &(oldrecord -> GetHeader().GetVldContext());
00891 }
00892 if ( oldiotags ) {
00893 const char* oldtagstream = 0;
00894 if ( oldiotags -> Get("stream",oldtagstream)
00895 && strcmp(streamName.c_str(),oldtagstream) == 0 ) {
00896 if ( oldvldc && (*oldvldc < loVld || *oldvldc > hiVld) ) {
00897 fragarray->RemoveAt(idx);
00898 delete oldobject; oldobject = 0;
00899 }
00900 }
00901 }
00902 }
00903
00904 }
|
|
|
Definition at line 1473 of file PerInputStreamManager.cxx. References AdvanceLowerBoundTags(), AdvanceWindowTags(), fCurrentVld, fLastServed, PerInputStream::GetSequenceMode(), PerInputStream::GetTags(), Per::GetVldBegin(), PerRecordTags::GetVldContext(), PerRecordTags::IsBegin(), PerInputStream::IsBeginOfFiles(), PerRecordTags::IsEnd(), MSG, PrevFile(), PerInputStream::PrevTags(), and SetCurrentVld(). Referenced by AdvanceRecordTags(), Next(), Previous(), and RecordsAt(). 01473 {
01474 // Purpose: Rewind record tags in managed streams to one previous to
01475 // fCurrentVld.
01476 //
01477 // Argument: none.
01478 //
01479 // Return: 1 if success, else 0 (end of all
01480 // streams reached)
01481 //
01482 // Contact: S. Kasahara
01483 //
01484 // Notes: This method checks each stream's current record set to see if the
01485 // set has a vld that matches or exceeds fCurrentVld. If so, it
01486 // rewinds one record set on that stream.
01487 // At the end of the method, if rewind has been successful,
01488 // fCurrentVld is set to point to the highest validity
01489 // of all the current stream record tags, or Per::GetVldBegin()
01490 // if at the beginning of all streams.
01491
01492 VldContext newVld = Per::GetVldBegin();
01493 int nKey = 0;
01494 int nSeq = 0;
01495 bool allSeqDone = true;
01496
01497 for ( StreamMapItr sitr=fStreamMap.begin();sitr != fStreamMap.end();++sitr ){
01498 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
01499 if ( instream->GetSequenceMode() != Per::kKey ) continue;
01500 nKey++;
01501 PerRecordTags tags = instream->GetTags();
01502 bool isdone = false;
01503 while ( !isdone ) {
01504 isdone = true;
01505 if ( tags.IsEnd() || tags.GetVldContext() >= fCurrentVld
01506 || (tags.IsBegin() && !instream->IsBeginOfFiles()) ) {
01507 tags = instream->PrevTags();
01508 if ( tags.IsBegin() ) {
01509 if ( instream -> PrevFile() ) isdone = false;
01510 }
01511 }
01512 }
01513 if ( tags.GetVldContext() > newVld ) newVld = tags.GetVldContext();
01514 }
01515
01516 // if there is no kKey stream, look for a kSequential stream
01517 if ( !nKey ) {
01518 for ( StreamMapItr sitr=fStreamMap.begin();
01519 sitr != fStreamMap.end();++sitr ){
01520 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
01521 if ( instream->GetSequenceMode() != Per::kSequential ) continue;
01522 nSeq++;
01523 PerRecordTags tags = instream->GetTags();
01524 bool isdone = false;
01525 while ( !isdone ) {
01526 isdone = true;
01527 if ( tags.IsEnd() ||
01528 ( tags.IsBegin() && !instream->IsBeginOfFiles() ) ) {
01529 tags = instream->PrevTags();
01530 if ( tags.IsBegin() ) {
01531 if ( instream -> PrevFile() ) isdone = false;
01532 }
01533 }
01534 }
01535 if ( tags.GetVldContext() > newVld ) newVld = tags.GetVldContext();
01536 if ( !( tags.IsBegin() && instream->IsBeginOfFiles() ) ) allSeqDone = false;
01537 } // end loop over the stream map
01538 } // end if no kKey stream
01539
01540 if ( !nKey && !nSeq) {
01541 // Must be at least one key stream in list of managed streams
01542 MSG("Per",Msg::kWarning)
01543 << "No stream of sequence mode Per::kKey or Per::kSequential "
01544 << "found.\n Must be at least one key or sequential stream to "
01545 << "facilitate sequencing."
01546 << std::endl;
01547 return 0;
01548 }
01549
01550 if ( fCurrentVld == newVld && !nSeq ) {
01551 return 0; // failure
01552 }
01553
01554 if ( nSeq && allSeqDone ) {
01555 return 0; // failure
01556 }
01557
01558 for ( StreamMapItr sitr = fStreamMap.begin();
01559 sitr != fStreamMap.end(); ++sitr) {
01560 PerInputStream* instream = dynamic_cast<PerInputStream*>(sitr->second);
01561 if ( instream->GetSequenceMode() != Per::kKey &&
01562 instream->GetSequenceMode() != Per::kSequential ) continue;
01563 fLastServed[sitr->first] = Per::GetVldBegin();
01564 }
01565
01566 SetCurrentVld(newVld); //equals Per::GetVldBegin() when begin of all strms reached
01567 this -> AdvanceLowerBoundTags(fCurrentVld);
01568 this -> AdvanceWindowTags(fCurrentVld);
01569 // kSequential and kRandom streams advance themselves as appropriate
01570
01571 return 1;
01572
01573 }
|
|
|
Definition at line 1610 of file PerInputStreamManager.cxx. References fCurrentVld, and fIsNewCurrentVldToSelect. Referenced by AddFile(), AdvanceRecordTags(), CloseFile(), CloseStream(), GoToFile(), NextFile(), PrevFile(), RewindRecordTags(), and SetFile(). 01610 {
01611 // Set fCurrentVld to vld, and mark that IsSelected needs to be
01612 // applied to this validity
01613
01614 if ( vld != fCurrentVld ) fIsNewCurrentVldToSelect = true;
01615 fCurrentVld = vld;
01616
01617 }
|
|
||||||||||||||||
|
Reimplemented from PerStreamManager. Definition at line 1577 of file PerInputStreamManager.cxx. References PerStreamManager::GetNumStreamOpen(), SetCurrentVld(), and PerStreamManager::SetFile(). Referenced by DemoInputModule::BeginFile(), PerValidate::RunAllTests(), PerValidate::StreamMgrFileChangeSeq(), and PerValidate::StreamMgrParallelFileSeq(). 01578 {
01579 // Purpose: Sets new file for specified stream(s).
01580 //
01581 // Argument: streamName string name of stream to set file.
01582 // if streamName="*" (default), all
01583 // streams will have their file set.
01584 // fullFilePathName string new filename.
01585 // accessMode Per::EAccessMode accessMode in which to
01586 // open file.
01587 //
01588 // Return: bool set true if PerInputStream::SetFile returned true for at
01589 // least one of the requested streams.
01590 //
01591 // Contact: S. Kasahara
01592 //
01593 // Notes: Invokes PerInputStream::SetFile for each requested stream.
01594 //
01595
01596 Int_t oldNumStream = this -> GetNumStreamOpen();
01597 bool openok
01598 = PerStreamManager::SetFile(streamName,fullFilePathName,accessMode);
01599 // reset if first, otherwise leave it where it is
01600 if ( openok && oldNumStream == 0) {
01601 SetCurrentVld(Per::GetVldBegin());
01602 }
01603
01604 return openok;
01605
01606 }
|
|
|
Definition at line 1621 of file PerInputStreamManager.cxx. Referenced by DDSChildServer::Next(). 01621 {
01622 // Purpose: Force file end on input file. Can be used for recovering
01623 // aborted files.
01624 //
01625 // Arguments: isFileEnd true (default) to force file end
01626 //
01627 // Return: none.
01628 //
01629 // Contact: S. Kasahara
01630 //
01631
01632 for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
01633 PerInputStream* instream = dynamic_cast<PerInputStream*>(citr -> second);
01634 instream -> SetFileEnd(isFileEnd);
01635 }
01636
01637 return;
01638
01639 }
|
|
||||||||||||
|
Definition at line 1726 of file PerInputStreamManager.cxx. References GetOpenedStream(). Referenced by IoInputStreamItr::SetMaxFileRepeat(). 01728 {
01729 // Purpose: Set the maximum number of times to reuse files
01730 // from this stream before moving to the next file
01731 // (only used for kSequential & kRandom stream sequence
01732 // modes)
01733 //
01734 // Argument: stream name of interest, max number (int >= 0)
01735 //
01736 // Return: none
01737 //
01738 // Note: Invokes PerInPutStream::SetMaxFileRepeat() for each stream
01739 //
01740
01741 if (numRepeat < 0) numRepeat = 0;
01742
01743 if (streamName == "*") {
01744 // Set info for all streams
01745 for ( StreamMapConstItr citr = fStreamMap.begin();
01746 citr!= fStreamMap.end(); ++citr ) {
01747 ((PerInputStream*)(citr -> second)) -> SetMaxFileRepeat(numRepeat);
01748 } // end loop over streams
01749 }
01750 else {
01751 // Set info for specified streamName
01752 PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01753 stream -> SetMaxFileRepeat(numRepeat);
01754 }
01755 }
|
|
|
Definition at line 57 of file PerInputStreamManager.h. References fMaxSyncDelay. Referenced by DDSChildServer::Subscribe(). 00057 { fMaxSyncDelay = maxSyncDelay; }
|
|
||||||||||||
|
Definition at line 1759 of file PerInputStreamManager.cxx. References GetOpenedStream(). Referenced by IoInputStreamItr::SetMeanMom(). 01760 {
01761 // Purpose: Set the mean number of events to push to Mom from this stream
01762 //
01763 // Argument: stream name of interest, mean number (double >= 0.)
01764 //
01765 // Return: none
01766 //
01767 // Note: Invokes PerInPutStream::SetMeanMom() for each stream
01768 //
01769
01770 if (mm < 0.) return;
01771
01772 if (streamName == "*") {
01773 // Set info for all streams
01774 for ( StreamMapConstItr citr = fStreamMap.begin();
01775 citr!= fStreamMap.end(); ++citr ) {
01776 ((PerInputStream*)(citr -> second)) -> SetMeanMom(mm);
01777 } // end loop over streams
01778 }
01779 else {
01780 // Set info for specified streamName
01781 PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01782 stream -> SetMeanMom(mm);
01783 }
01784 }
|
|
||||||||||||
|
Definition at line 1866 of file PerInputStreamManager.cxx. References GetOpenedStream(). Referenced by IoInputStreamItr::SetPerOwnedDisabled(). 01868 {
01869 // Purpose: Set PerOwnedDisabled for requested stream(s).
01870 //
01871 // Argument: streamName string name of stream. if streamName="*"(default),
01872 // apply to all streams.
01873 // perowneddisabled bool (default = true). Argument passed to
01874 // SetPerOwnedDisabled for each
01875 // requested stream.
01876 //
01877 // Return: none.
01878 //
01879 // Contact: S. Kasahara
01880 //
01881
01882 if (streamName == "*") {
01883 // Invoke SetPerOwnedDisabled on all streams
01884 for ( StreamMapConstItr citr = fStreamMap.begin();
01885 citr!= fStreamMap.end(); ++citr )
01886 ((PerInputStream*)(citr -> second))
01887 -> SetPerOwnedDisabled(perowneddisabled);
01888 }
01889 else {
01890 // Invoke SetPerOwnedDisabled on stream of specified streamName
01891 PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01892 if (stream) stream -> SetPerOwnedDisabled(perowneddisabled);
01893 }
01894
01895 }
|
|
||||||||||||
|
Definition at line 1788 of file PerInputStreamManager.cxx. References GetOpenedStream(). Referenced by IoInputStreamItr::SetPushRandom(). 01789 {
01790 // Purpose: Set whether to push a constant (false) or random (true)
01791 // number of events to Mom from this stream
01792 //
01793 // Argument: stream name of interest, bool
01794 //
01795 // Return: none
01796 //
01797 // Note: Invokes PerInPutStream::SetPushRandom() for each stream
01798 //
01799
01800 if (streamName == "*") {
01801 // Set info for all streams
01802 for ( StreamMapConstItr citr = fStreamMap.begin();
01803 citr!= fStreamMap.end(); ++citr ) {
01804 ((PerInputStream*)(citr -> second)) -> SetPushRandom(tf);
01805 } // end loop over streams
01806 }
01807 else {
01808 // Set info for specified streamName
01809 PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01810 stream -> SetPushRandom(tf);
01811 }
01812 }
|
|
|
Definition at line 1816 of file PerInputStreamManager.cxx. References fRandomOverride, fRandomSeed, fRanGen, and MSG. Referenced by IoInputStreamItr::SetRandomSeed(). 01817 {
01818 // User input method. If code modifies the random seed, use fRanGen->SetSeed()
01819 fRanGen->SetSeed(rSeed);
01820 fRandomSeed = rSeed;
01821 fRandomOverride = true;
01822 MSG("Per",Msg::kInfo)
01823 << "PerInputStreamManager::SetRandomSeed : Random Seed set to "
01824 << ((rSeed==0)? fRanGen->GetSeed() : fRandomSeed) << endl;
01825
01826 }
|
|
||||||||||||||||
|
Definition at line 1643 of file PerInputStreamManager.cxx. References fFormulaMap, fGlobalSelection, fTTreeFormula, and GetOpenedStream(). Referenced by IoInputStreamItr::Select(), and PerValidate::StreamMgrSelectionSeq(). 01644 {
01645 // Purpose: Set selection string for specified stream(s).
01646 //
01647 // Argument: streamName string name of stream to set selection..
01648 // if streamName="*" (default), all
01649 // streams will have selection set.
01650 // selection string selection cut to be applied to TTree
01651 // served by this stream. See PerInputStream::
01652 // SetSelection for more information.
01653 //
01654 // Return: none.
01655 //
01656 // Contact: S. Kasahara
01657 //
01658 // Notes: Invokes PerInputStream::SetSelection for each requested stream.
01659 //
01660
01661 if (streamName == "*") {
01662 // Apply selection string globally
01663 fGlobalSelection = selection;
01664 fFormulaMap.clear();
01665 if ( fTTreeFormula ) delete fTTreeFormula; fTTreeFormula = 0;
01666 // New TTreeFormula will be built at time of application
01667 }
01668 else {
01669 // SetSelection on stream of specified streamName
01670 PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01671 if (stream) {
01672 stream -> SetSelection(selection);
01673 stream -> SetRequired(isRequired);
01674 }
01675 }
01676
01677 }
|
|
||||||||||||
|
Definition at line 1681 of file PerInputStreamManager.cxx. References fRanGen, and GetOpenedStream(). Referenced by OpenStream(), and IoInputStreamItr::SetSequenceMode(). 01682 {
01683 // Purpose: Set sequence mode for specified stream(s).
01684 // For kSequential and kRandom streams, a pointer to the
01685 // Manager's random numner generator is passed as well
01686 //
01687 // Argument: streamName string name of stream to set selection..
01688 // if streamName="*" (default), all
01689 // streams will have selection set.
01690 // sequenceMode Per::kKey (default), Per::kLowerBound,
01691 // Per::kWindow, Per::kSequential, or
01692 // Per::kRandom
01693 //
01694 // Return: none.
01695 //
01696 // Contact: S. Kasahara & K. Arms
01697 //
01698 // Notes: Invokes PerInputStream::SetSequenceMode for each requested
01699 // stream.
01700 //
01701
01702
01703 if (streamName == "*") {
01704 // Set selection on all streams
01705 for ( StreamMapConstItr citr = fStreamMap.begin();
01706 citr!= fStreamMap.end(); ++citr ) {
01707 ((PerInputStream*)(citr -> second)) -> SetSequenceMode(sequenceMode);
01708 if (sequenceMode == Per::kSequential ||
01709 sequenceMode == Per::kRandom )
01710 ((PerInputStream*)(citr -> second)) -> SetRandomGenerator(fRanGen);
01711 } // end for loop
01712 } else {
01713 // SetSelection on stream of specified streamName
01714 PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01715 if (stream) {
01716 stream -> SetSequenceMode(sequenceMode);
01717 if (sequenceMode == Per::kSequential ||
01718 sequenceMode == Per::kRandom )
01719 stream -> SetRandomGenerator(fRanGen);
01720 } // end if stream is OK
01721 } // end else
01722 }
|
|
|
Definition at line 1899 of file PerInputStreamManager.cxx. References fUpdateMode. Referenced by OpenStream(), and PerValidate::RunAllTests(). 01899 {
01900 // Purpose: Set update mode true or false. true should be set
01901 // when reading open files (e.g. user is dispatcher).
01902 //
01903 // Argument: updateMode bool true => stream tree will be updated when
01904 // end of entries is reached. Appropriate
01905 // for read of open files. Default when
01906 // stream is opened is false.
01907 //
01908 // Return: none.
01909 //
01910 // Contact: S. Kasahara
01911 //
01912
01913 fUpdateMode = updateMode;
01914 for ( StreamMapConstItr citr = fStreamMap.begin();
01915 citr!= fStreamMap.end(); ++citr )
01916 ((PerInputStream*)(citr -> second)) -> SetUpdateMode(updateMode);
01917
01918 }
|
|
||||||||||||||||
|
Definition at line 1830 of file PerInputStreamManager.cxx. References GetOpenedStream(). Referenced by IoInputStreamItr::SetWindow(). 01832 {
01833 // Purpose: Set window to use if kWindow sequence mode is used.
01834 //
01835 // Argument: streamName string name of stream to set window.
01836 // if streamName="*" (default), all
01837 // streams will have selection set.
01838 // lower double define window relative to
01839 // upper double the key stream VldContext
01840 //
01841 // Return: none.
01842 //
01843 // Contact: S. Kasahara, B. Viren
01844 //
01845 // Notes: Invokes PerInputStream::SetWindow for each requested
01846 // stream.
01847 //
01848
01849
01850 if (streamName == "*") {
01851 // Set selection on all streams
01852 for ( StreamMapConstItr citr = fStreamMap.begin();
01853 citr!= fStreamMap.end(); ++citr )
01854 ((PerInputStream*)(citr -> second)) -> SetWindow(lower,upper);
01855 }
01856 else {
01857 // SetSelection on stream of specified streamName
01858 PerInputStream* stream = (PerInputStream*)GetOpenedStream(streamName);
01859 if (stream) stream -> SetWindow(lower,upper);
01860 }
01861
01862 }
|
|
|
Definition at line 1922 of file PerInputStreamManager.cxx. References fFormulaMap, fGlobalSelection, PerStream::fTTree, fTTreeFormula, PerStream::GetFullFilePathName(), PerStream::GetNumEntries(), PerStreamManager::GetNumStream(), and PerStreamManager::GetNumStreamOpen(). Referenced by IsSelectedSet(). 01922 {
01923 // Purpose: Private method, called by IsSelectedSet when selection
01924 // cut has been applied globally.
01925 //
01926 // Return: none.
01927 //
01928 // Contact: S. Kasahara
01929 //
01930
01931 if ( fGlobalSelection.empty() ) return;
01932 if ( this -> GetNumStreamOpen() <= 0 ) {
01933 if ( fTTreeFormula ) delete fTTreeFormula; fTTreeFormula = 0;
01934 fFormulaMap.clear();
01935 return;
01936 }
01937
01938 bool rebuild = false;
01939 if ( !fTTreeFormula ) rebuild = true;
01940 else {
01941 // Loop through formula map checking for differences
01942 if ( this -> GetNumStream() != fFormulaMap.size() ) rebuild = true;
01943 else {
01944 for ( StreamMapConstItr citr = fStreamMap.begin();
01945 citr!= fStreamMap.end(); ++citr) {
01946 PerInputStream* stream = (PerInputStream*)(citr->second);
01947 std::string streamname = citr->first;
01948 if ( fFormulaMap[streamname] != stream->GetFullFilePathName() ) {
01949 rebuild = true;
01950 break;
01951 }
01952 }
01953 }
01954 }
01955
01956 if ( !rebuild ) return;
01957 // if ( fTTreeFormula ) delete fTTreeFormula; fTTreeFormula = 0;
01958 // If file boundary has been crossed on one of the streams, must
01959 // update treeformula
01960 fFormulaMap.clear();
01961 std::string minstreamnm = "";
01962 Int_t minentries = -1;
01963 for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
01964 PerInputStream* stream = (PerInputStream*)(citr -> second);
01965 if ( stream->fTTree != 0 &&
01966 (stream->GetNumEntries() < minentries || minstreamnm.empty()) ) {
01967 minentries = stream->GetNumEntries();
01968 minstreamnm = citr->first;
01969 }
01970 fFormulaMap.insert(make_pair(citr->first,stream->GetFullFilePathName()));
01971 }
01972
01973
01974 // Smallest tree becomes master, else root will complain
01975 TTree* mastertree = ((PerInputStream*)fStreamMap[minstreamnm])->fTTree;
01976 for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
01977 if ( citr->first == minstreamnm ) continue;
01978 PerInputStream* stream = (PerInputStream*)(citr->second);
01979 if ( stream->fTTree != 0 ) mastertree -> AddFriend(stream->fTTree);
01980 }
01981 if ( !fTTreeFormula ) {
01982 fTTreeFormula
01983 = new TTreeFormula("PerMgrSelection",fGlobalSelection.c_str(),mastertree);
01984 }
01985 else {
01986 fTTreeFormula -> SetTree(mastertree);
01987 fTTreeFormula -> UpdateFormulaLeaves();
01988 }
01989 if ( fTTreeFormula -> GetNdim() <= 0 ) {
01990 // TTreeformula build failed
01991 delete fTTreeFormula; fTTreeFormula = 0;
01992 fFormulaMap.clear();
01993 }
01994
01995 for(StreamMapConstItr citr=fStreamMap.begin();citr!=fStreamMap.end();++citr){
01996 if ( citr->first == minstreamnm ) continue;
01997 PerInputStream* stream = (PerInputStream*)(citr->second);
01998 if ( stream->fTTree != 0 ) mastertree -> RemoveFriend(stream->fTTree);
01999 }
02000
02001 return;
02002
02003 }
|
|
|
Definition at line 108 of file PerInputStreamManager.h. Referenced by AdvanceRecordTags(), IsBegin(), IsEnd(), LoadRecord(), LoadRecordWithTag(), Next(), RecordsAt(), RewindRecordTags(), and SetCurrentVld(). |
|
|
Definition at line 111 of file PerInputStreamManager.h. Referenced by SetSelection(), UpdateTreeFormula(), and ~PerInputStreamManager(). |
|
|
Definition at line 109 of file PerInputStreamManager.h. Referenced by IsSelectedSet(), SetSelection(), and UpdateTreeFormula(). |
|
|
Definition at line 116 of file PerInputStreamManager.h. Referenced by IsSelectedSet(), and SetCurrentVld(). |
|
|
Definition at line 117 of file PerInputStreamManager.h. Referenced by IsSelectedSet(). |
|
|
Definition at line 118 of file PerInputStreamManager.h. Referenced by AdvanceRecordTags(), CloseStream(), OpenStream(), and RewindRecordTags(). |
|
|
Definition at line 115 of file PerInputStreamManager.h. Referenced by AdvanceRecordTags(), and SetMaxSyncDelay(). |
|
|
Definition at line 124 of file PerInputStreamManager.h. Referenced by SetRandomSeed(). |
|
|
OWNED HERE BUT USED BY PERINPUTSTREAMs.
Definition at line 123 of file PerInputStreamManager.h. Referenced by LoadRecord(), PerInputStreamManager(), and SetRandomSeed(). |
|
|
Definition at line 122 of file PerInputStreamManager.h. Referenced by LoadRecord(), PerInputStreamManager(), SetRandomSeed(), SetSequenceMode(), and ~PerInputStreamManager(). |
|
|
Definition at line 110 of file PerInputStreamManager.h. Referenced by IsSelectedSet(), SetSelection(), UpdateTreeFormula(), and ~PerInputStreamManager(). |
|
|
Definition at line 114 of file PerInputStreamManager.h. Referenced by AdvanceRecordTags(), Next(), OpenStream(), and SetUpdateMode(). |
1.3.9.1