#include <IoInputModule.h>
Inheritance diagram for IoInputModule:

Public Member Functions | |
| IoInputModule () | |
| ~IoInputModule () | |
| void | BeginJob () |
| void | EndJob () |
| const Registry & | DefaultConfig () const |
| void | Config (const Registry &r) |
| JobCResult | Get () |
| JobCResult | Next (int n=1) |
| JobCResult | Prev (int n=1) |
| JobCResult | GoTo (int run, int snarl, int searchDir=0) |
| JobCResult | GoTo (const VldContext &vld) |
| void | List (const char *streamlist="*") const |
| void | AddFile (const char *filename, const char *streamlist="*", int at=-1) |
| void | RemoveFile (const char *filename, const char *streamlist="*") |
| JobCResult | NextFile (int n=1, const char *streamlist="*") |
| JobCResult | PrevFile (int n=1, const char *streamlist="*") |
| JobCResult | GoToFile (int i, const char *streamlist="*") |
| JobCResult | GoToFile (const char *filename, const char *streamlist="*") |
| void | DefineStream (const char *stream, const char *tree) |
| void | Select (const char *stream, const char *select, bool isRequired=false) |
| void | SetSequenceMode (const char *stream, Per::ESequenceMode sequenceMode=Per::kKey) |
| void | SetPerOwnedDisabled (const char *stream, bool perowneddisabled=true) |
| void | SetTestMode (const char *stream, bool testMode) |
| void | SetWindow (const char *stream, double lower, double upper) |
| void | SetMaxFileRepeat (const char *stream, int numRepeat) |
| void | SetMeanMom (const char *stream, double mean) |
| void | SetPushRandom (const char *stream, bool setRandom) |
| void | SetRandomSeed (int rSeed) |
| Int_t | GetCurrentRun () const |
| Int_t | GetLastRun () const |
| const char * | GetCurrentFile (const char *streamname="*") const |
| const char * | GetLastFile (const char *streamname="*") const |
| Int_t | GetCurrentSnarl () const |
Private Member Functions | |
| void | LoadFilesFromCommandLine () |
| int | ReadHeader () |
| void | UpdateDDSConfig () |
| void | UpdateFormatConfig () |
| void | UpdateFileList () |
| void | UpdateStreamConfig () |
| int | OpenStreamItr () |
| void | CloseStreamItr () |
| JobCResult | Get (MomNavigator *) |
Private Attributes | |
| std::map< std::string, std::string > | fStreamSelectionMap |
| std::map< std::string, bool > | fStreamRequiredMap |
| std::map< std::string, std::string > | fStreamDefMap |
| std::map< std::string, Per::ESequenceMode > | fStreamSeqModeMap |
| std::map< std::string, bool > | fStreamPerOwnedDisabledMap |
| std::map< std::string, bool > | fStreamTestModeMap |
| std::map< std::string, std::pair< double, double > > | fStreamWindowMap |
| std::list< IoFileListItem > | fFileList |
| IoDataStreamItr * | fDataStreamItr |
| std::string | fFormat |
| std::string | fStreamList |
| std::map< std::string, int > | fStreamMaxRepeatMap |
| std::map< std::string, double > | fStreamMeanMap |
| std::map< std::string, bool > | fStreamPushRandomMap |
| int | fRandomSeed |
| std::string | fServer |
| int | fPort |
| int | fTimeOut |
| int | fDataSource |
| int | fKeepUpMode |
| int | fMaxSyncDelay |
| bool | fOffLine |
| int | fMaxRetry |
| int | fRetryDelay |
| DDS::EClientType | fClientType |
| string | fClientName |
| JobCResult | fStatus |
| int | fLastRun |
| int | fLastSnarl |
| int | fCurrentRun |
| int | fCurrentSnarl |
| std::map< std::string, std::string > | fCurrentFileMap |
| std::map< std::string, std::string > | fLastFileMap |
| std::string | fLastBeginFile |
| std::string | fLastEndFile |
| bool | fLoadedCommandLineFiles |
| TStopwatch | fStopwatch |
|
|
Definition at line 66 of file IoInputModule.cxx. References fStopwatch.
00066 :
00067 fDataStreamItr(0),
00068 fFormat(""),
00069 fStreamList(""),
00070 fServer(""),
00071 fPort(0),
00072 fTimeOut(0),
00073 fDataSource(0),
00074 fKeepUpMode(0),
00075 fMaxSyncDelay(0),
00076 fOffLine(false),
00077 fMaxRetry(0),
00078 fRetryDelay(1),
00079 fClientType(DDS::kUnknownClientType),
00080 fClientName(""),
00081 fStatus(JobCResult::kAOK),
00082 fLastRun(-1),
00083 fLastSnarl(-1),
00084 fCurrentRun(-1),
00085 fCurrentSnarl(-1),
00086 fLoadedCommandLineFiles(false)
00087 #ifdef SITE_HAS_SAM
00088 ,fsamProject(0)
00089 #endif
00090 { fStopwatch.Reset(); fStopwatch.Stop(); }
|
|
|
Definition at line 94 of file IoInputModule.cxx. References fDataStreamItr. 00095 {
00096 if ( fDataStreamItr ) { delete fDataStreamItr; fDataStreamItr = 0; }
00097 }
|
|
||||||||||||||||
|
Reimplemented from JobCInputModule. Definition at line 497 of file IoInputModule.cxx. References VldTimeStamp::AsString(), fDataStreamItr, fFileList, Registry::Get(), JobCModule::GetConfig(), IoFileListItem::GetFileStreamMap(), and MSG. Referenced by LoadFilesFromCommandLine(), IoInputModuleValidate::TestAdd(), and UpdateFileList(). 00498 {
00499 //======================================================================
00500 // Add to the list of attached streams at the position "at". -1 = end
00501 // of list
00502 //======================================================================
00503
00504
00505 // Find out by checking the format of the filepath whether this
00506 // is a SAM job. Format will be SAM:samdataset or SAM_FILE:file
00507
00508 const char *s1 = "SAM";
00509 if ( strstr(filepath,s1) != NULL ) {
00510
00511 #ifdef SITE_HAS_SAM
00512
00513 // SAM options
00514
00515 const char* tmps;
00516 int tmpi;
00517
00518 Registry& r = GetConfig();
00519 if (r.Get("Station",tmps)) {fStation = tmps;}
00520 if (r.Get("SnapShotVers",tmpi)){fSnapShotVers = tmpi;}
00521 if (r.Get("WorkGroupName",tmps)) {fWorkGroupName = tmps;}
00522 if (r.Get("ApplicationName",tmps)) {fApplicationName = tmps;}
00523 if (r.Get("ApplicationVers",tmps)) {fApplicationVers = tmps;}
00524 if (r.Get("ProjectName",tmps)) {fProjectName = tmps;}
00525 if (r.Get("MaxNumberOfFiles",tmpi)) {fMaxNumberOfFiles = tmpi;}
00526 if (r.Get("StartNewProject",tmpi)) {fStartNewProject = tmpi;}
00527
00528 // Decide whether we have a dataset or a single file
00529 // dataset will be SAM, file will be SAM_FILE
00530 // Get the samdataset or file from filepath
00531
00532 std::string temp = filepath;
00533 size_t pos = temp.find(":")+1;
00534 std::string sam_access_type = temp.substr(0,pos-1);
00535 if (sam_access_type == "SAM") {
00536 std::string samdataset = temp.substr(pos,temp.length()-pos);
00537
00538
00539 std::string projectname;
00540
00541 // If this is a new project then append time-stamp
00542 if (fStartNewProject == 1) {
00543 // Construct full project name including timestamp
00544 // Get timestamp as a string
00545 VldTimeStamp ts;
00546 std::string timestamp = ts.AsString("lc");
00547 // Replace blank with "-"
00548 size_t pos = timestamp.find(" ");
00549 timestamp.replace(pos,1,"-");
00550 // Replace : with - as DbServer does not like : in project names
00551 pos = timestamp.find(":");
00552 while ( pos != string::npos ) {
00553 timestamp.replace(pos,1,"-");
00554 pos = timestamp.find(":",pos+1);
00555 }
00556 // Append timestamp to fProjectName taken from registry
00557 fProjectName.append("-");
00558 projectname = fProjectName+timestamp;
00559 }
00560 // Otherwise use project name that is supplied
00561 else if(fStartNewProject == 0) {
00562 projectname = fProjectName;
00563 }
00564
00565
00566 MSG("Io",Msg::kDebug) << "Sam Station " << fStation << " Snap Shot " <<
00567 fSnapShotVers << " Work Group Name " << fWorkGroupName
00568 << " Application Name "
00569 << fApplicationName << " Application Version " << fApplicationVers <<
00570 " Project Name " << projectname << endl;
00571
00572
00573 // Define snapshot version
00574
00575 long snapshot;
00576 // Snapshot version = 0 means create New Snapshot
00577 if ( fSnapShotVers == 0 ) {
00578 snapshot = sam::SamProject::NewSnapshotVersion;
00579 }
00580 // Snapshot version < 0 means use last one created
00581 else if (fSnapShotVers < 0) {
00582 snapshot = sam::SamProject::LatestSnapshotVersion;
00583 }
00584 else if (fSnapShotVers > 0 ) {
00585 // Use specified SnapShot version
00586 snapshot = fSnapShotVers;
00587 }
00588
00589 MSG("Io",Msg::kDebug) << "SnapShot Version " << snapshot << endl;
00590
00591 if (fStartNewProject == 1) {
00592
00593 // Create SAM project
00594
00595 fsamProject = new sam::SamProject(projectname,fStation);
00596
00597 // Start SAM project
00598
00599 std::list<std::string> projectMasterArgList;
00600
00601 try {
00602 MSG("Io",Msg::kInfo) << "Starting SAM Project " << projectname <<
00603 " on station " << fStation << endl;
00604 fsamProject->startProject(fWorkGroupName,samdataset,snapshot,
00605 projectMasterArgList);
00606 }
00607 catch(const sam::SamProject::StartProjectRequestRejected& ex) {
00608 MSG("Io",Msg::kInfo) << "Rejected start SAM project request "
00609 << ex << endl;
00610 }
00611 }
00612
00613 // Start SAM consumer to deliver files
00614
00615 const int projectMasterTimeout(60);
00616 const std::string processDescription("Loon Analysis Process");
00617
00618 try{
00619
00620
00621 sam::SamConsumer fsamConsumer(projectname,fStation,fWorkGroupName,
00622 fApplicationName,fApplicationVers,
00623 processDescription,
00624 fMaxNumberOfFiles,
00625 projectMasterTimeout);
00626
00627 MSG("Io",Msg::kInfo) << "Started SAM Consumer" << endl;
00628
00629 // Now get files. Format of returned files depends on whether
00630 // SAM cache is local disk, dcache disk or AFS file space
00631
00632 std::map<std::string,std::string> filelist;
00633 map<std::string,std::string>::iterator fitr;
00634
00635 int location;
00636 int length;
00637 int comp;
00638 std::string fileonly;
00639 std::string restOfPath;
00640 std::string afsroot("afsroot:");
00641 try {
00642 while(true) {
00643 std::string filename = fsamConsumer.getFile().getFullFileName();
00644 // The files need to be sorted as they come back in an undefined
00645 // order. Split them into a filename and the rest of the path.
00646 // Put them in a map and then iterate over key which is filename
00647 // - guarantees correct order.
00648
00649 MSG("Io",Msg::kDebug) << "Filename " << filename << endl;
00650 location = filename.find_last_of("/");
00651 length = filename.length();
00652 fileonly = filename.substr(location+1,length-1);
00653 // Need to look for afsroot: at start of path. If it is there, then
00654 // remove it; the rest of the path is the AFS path.
00655 comp = filename.compare(0,8,afsroot);
00656 if (comp == 0 ) {
00657 restOfPath = filename.substr(8,location-8);
00658 }
00659 else {
00660 restOfPath = filename.substr(0,location);
00661 }
00662
00663 restOfPath.append("/");
00664 filelist.insert(make_pair(fileonly,restOfPath));
00665
00666 MSG("Io",Msg::kDebug) << "File Only " << fileonly << " Rest Of Path "
00667 << restOfPath << endl;
00668
00669 // Release file
00670
00671 fsamConsumer.releaseFile();
00672 }
00673 }
00674 catch(const sam::SamConsumer::EndOfFileStreamReached& ex) {
00675 MSG("Io",Msg::kDebug) << "End of File Stream reached" << endl;
00676 }
00677
00678 // Got all files. Now need to add them to file list. Iterate over map
00679
00680 std::string sfile;
00681 const char *samfile = 0;
00682 for (fitr = filelist.begin(); fitr != filelist.end(); fitr++) {
00683 sfile = (fitr->second+fitr->first);
00684 samfile = sfile.data();
00685 MSG("Io",Msg::kInfo) << "Adding File " << samfile << endl;
00686 // Add file to file list
00687 IoFileListItem iofile(samfile,at,streamlist);
00688 fFileList.push_back(iofile);
00689 }
00690 }
00691 catch(const sam::SamConsumer::InitializationError& ex) {
00692 MSG("Io",Msg::kInfo) << "Rejected start SAM Consumer request "
00693 << ex << endl;
00694 }
00695
00696
00697 if (fsamProject) {
00698 try {
00699 MSG("Io",Msg::kInfo) << "Requesting end of SAM project " << endl;
00700 fsamProject->endProject();
00701 }
00702 catch(const sam::SamProject::EndProjectRequestRejected& ex) {
00703 MSG("Io",Msg::kInfo) << "SAM Project end request rejected "<< ex << endl;
00704
00705 }
00706 catch(const sam::SamProject::EndProjectRequestFailed& ex) {
00707 MSG("Io",Msg::kInfo) << "SAM Project end request failed "<< ex << endl;
00708 }
00709 }
00710 }
00711 else if (sam_access_type == "SAM_FILE") {
00712
00713 // User just wants a single file
00714 std::string samfile = temp.substr(pos,temp.length()-pos);
00715
00716 // Use sam_locate to get pnfs path
00717
00718 sam::LocationList samFiles;
00719 try {
00720 MSG("Io",Msg::kInfo) << "Locating file " << samfile << endl;
00721 samFiles = sam::locate(samfile);
00722 // Now need to translate pnfs path into dcache path
00723 // insert fnal.gov/usr/ after /pnfs/
00724 samFiles[0].insert(6,"fnal.gov/usr/");
00725 // There are two dcache ports so choose one at random
00726 TRandom3 rand(0);
00727 Double_t r = rand.Rndm();
00728 if (r <= 0.5) {
00729 samFiles[0].insert(0,"dcap://fndca1.fnal.gov:24125");
00730 }
00731 else if (r > 0.5) {
00732 samFiles[0].insert(0,"dcap://fndca1.fnal.gov:24136");
00733 }
00734 // Finally append filename
00735 samFiles[0].append("/");
00736 samFiles[0].append(samfile);
00737 MSG("Io",Msg::kInfo) << "Adding file " << samFiles[0].c_str() << " to input list" << endl;
00738 // Add file to file list
00739 IoFileListItem iofile(samFiles[0].c_str(),at,streamlist);
00740 fFileList.push_back(iofile);
00741 }
00742 catch(const sam::exception::DataFileNotFound& ex) {
00743 MSG("Io",Msg::kInfo) << ex << endl;
00744 }
00745 }
00746
00747 #endif // End of ifdef SITE_HAS_SAM
00748
00749 }
00750 else {
00751
00752 // Add file to file list
00753 IoFileListItem iofile(filepath,at,streamlist);
00754 fFileList.push_back(iofile);
00755
00756 if ( !fDataStreamItr ) return;
00757
00758 // Add files to stream managed lists
00759 const IoFileListItem::FileStreamMap& filemap = iofile.GetFileStreamMap();
00760
00761 if ( at < 0 ) {
00762 IoFileListItem::FileStreamMapConstItr itr = filemap.begin();
00763 for ( ; itr != filemap.end(); itr++ ) {
00764 std::string filename = itr -> first;
00765 std::string streamlist = itr -> second;
00766 fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
00767 }
00768 }
00769 else {
00770 // Apply files in reverse to have first file in list inserted at pos At
00771 IoFileListItem::FileStreamMap::const_reverse_iterator itr=filemap.rbegin();
00772 for ( ; itr != filemap.rend(); itr++ ) {
00773 std::string filename = itr -> first;
00774 std::string streamlist = itr -> second;
00775 fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
00776 }
00777 }
00778 }
00779 }
|
|
|
Implement for notification of begin of job. Reimplemented from JobCModule. Definition at line 101 of file IoInputModule.cxx. References LoadFilesFromCommandLine(). 00102 {
00103 this->LoadFilesFromCommandLine();
00104 // Delay opening files until first action (Next,Prev,etc.) call
00105 }
|
|
|
Definition at line 1516 of file IoInputModule.cxx. References fDataStreamItr, fStatus, IoDataStreamItr::GetFormat(), MSG, JobCResult::SetEndFile(), JobCResult::SetEndOfInputStream(), and JobCResult::SetEndRun(). Referenced by OpenStreamItr(), UpdateDDSConfig(), and UpdateFormatConfig(). 01517 {
01518 //======================================================================
01519 // Close the currently opened stream
01520 //======================================================================
01521 if (fDataStreamItr) {
01522 MSG("Io",Msg::kDebug)
01523 << "Close stream itr of format " << fDataStreamItr->GetFormat() << endl;
01524 delete fDataStreamItr;
01525 fDataStreamItr = 0;
01526 fStatus.SetEndRun();
01527 fStatus.SetEndFile();
01528 fStatus.SetEndOfInputStream();
01529 }
01530 }
|
|
|
Return the actual configuration. If your module directly pulls its configuration from the fConfig Registry, you don't need to override this. Override if you have local config variables. Reimplemented from JobCModule. Definition at line 176 of file IoInputModule.cxx. References fClientName, fClientType, fDataSource, fFormat, fKeepUpMode, fMaxRetry, fMaxSyncDelay, fOffLine, fPort, fRetryDelay, fServer, fStreamList, fTimeOut, Registry::Get(), DDS::GetClientType(), DDS::GetDataSourceCode(), DDS::GetKeepUpCode(), MSG, UpdateDDSConfig(), UpdateFormatConfig(), and UpdateStreamConfig(). Referenced by IoInputModuleValidate::IoInputModuleValidate(). 00177 {
00178 //======================================================================
00179 // Configure the module based on the contents of the registry r
00180 //======================================================================
00181 const char* tmps;
00182 int tmpi;
00183 int tmpb; // bools
00184
00185 MSG("Io",Msg::kDebug) << "Config IoInputModule with r=" << r << "\n";
00186
00187 // Input data stream configuration
00188 bool doFormatConfig = false;
00189 if (r.Get("Format", tmps)) { fFormat = tmps; doFormatConfig = true; }
00190 if (doFormatConfig) this->UpdateFormatConfig();
00191 bool doStreamConfig = false;
00192 if (r.Get("Streams",tmps)) { fStreamList = tmps; doStreamConfig = true; }
00193 if (doStreamConfig) this->UpdateStreamConfig();
00194
00195 // DDS options
00196 bool doDDSConfig = false;
00197 if (r.Get("DDSServer", tmps)) { fServer = tmps; doDDSConfig = true; }
00198 if (r.Get("DDSPort", tmpi)) { fPort = tmpi; doDDSConfig = true; }
00199 if (r.Get("DDSTimeOut",tmpi)) { fTimeOut = tmpi; doDDSConfig = true; }
00200 if (r.Get("DDSClientType",tmps)) { fClientType = DDS::GetClientType(tmps);
00201 doDDSConfig = true; }
00202 if (r.Get("DDSClientName",tmps)) { fClientName = tmps;
00203 doDDSConfig = true; }
00204 if (r.Get("DDSDataSource",tmps)){fDataSource = DDS::GetDataSourceCode(tmps);
00205 doDDSConfig = true;}
00206 if (r.Get("DDSKeepUpMode",tmps)) { fKeepUpMode = DDS::GetKeepUpCode(tmps);
00207 doDDSConfig = true; }
00208 if (r.Get("DDSMaxSyncDelay",tmpi)){fMaxSyncDelay = tmpi; doDDSConfig = true;}
00209 if (r.Get("DDSOffLine",tmpb)) {fOffLine = tmpb; doDDSConfig = true;}
00210 if (r.Get("DDSMaxRetry",tmpi)) {fMaxRetry = tmpi; doDDSConfig = true; }
00211 if (r.Get("DDSRetryDelay",tmpi)) {fRetryDelay = tmpi; doDDSConfig = true; }
00212 if (doDDSConfig) this->UpdateDDSConfig();
00213
00214
00215 }
|
|
|
Get the default configuration registry. This should normally be overridden. One useful idiom is to implement it like:

const Registry& MyModule::DefaultConfig() const {
  static Registry cfg; // is never destroyed
  if (cfg.Size()) return cfg; // already filled it
  // set defaults:
  cfg.Set("TheAnswer",42);
  cfg.Set("Units","unknown");
  return cfg;
}

Reimplemented from JobCModule. Definition at line 121 of file IoInputModule.cxx. References gSystem(), Registry::LockValues(), MSG, Registry::Set(), and Registry::UnLockValues(). Referenced by IoInputModuleValidate::IoInputModuleValidate(). 00122 {
00123 //======================================================================
00124 // Get the default configuration for this module
00125 //======================================================================
00126 static Registry r;
00127 r.SetName("INPUT.config");
00128
00129 r.UnLockValues();
00130
00131 MSG("Io",Msg::kDebug) << "Loading default config\n";
00132
00133 // Stream config
00134 r.Set("Format" ,"input");
00135 r.Set("Streams","DaqMonitor,DaqSnarl,LightInjection");
00136 // r.Set("Streams","DaqMonitor,DaqSnarl,LightInjection,DcsMonitor,DcsAlarm");
00137
00138 // DDS config
00139 r.Set("DDSServer", "daqdds.minos-soudan.org");
00140 r.Set("DDSPort", DDS::kPort);
00141 r.Set("DDSTimeOut", 120);
00142 r.Set("DDSDataSource","Daq");
00143 r.Set("DDSKeepUpMode", "FileKeepUp");
00144 r.Set("DDSMaxSyncDelay",15);
00145 r.Set("DDSOffLine",false);
00146 r.Set("DDSMaxRetry",0);
00147 r.Set("DDSRetryDelay",1);
00148 r.Set("DDSClientType","Unknown");
00149 r.Set("DDSClientName","");
00150
00151 #ifdef SITE_HAS_SAM
00152
00153 // SAM config
00154 r.Set("Station","minos");
00155 r.Set("SnapShotVers",0);
00156 r.Set("WorkGroupName","minos");
00157 r.Set("ApplicationName","loon");
00158 r.Set("ApplicationVers","dev");
00159 r.Set("MaxNumberOfFiles",0);
00160 r.Set("StartNewProject",1);
00161
00162 // Create default project name
00163 // Get $USER
00164 const char* username = gSystem->Getenv("USER");
00165 if (!username) username = "unknown";
00166 r.Set("ProjectName",username);
00167
00168 #endif
00169
00170 r.LockValues();
00171 return r;
00172 }
|
|
||||||||||||
|
Reimplemented from JobCInputModule. Definition at line 945 of file IoInputModule.cxx. References IoDataStreamItr::DefineStream(), fDataStreamItr, and fStreamDefMap. 00945 {
00946 //======================================================================
00947 // Define stream to serve specified tree
00948 //======================================================================
00949 // Insert the definition into the map
00950 fStreamDefMap[stream] = tree;
00951
00952 // Pass the info on to the data stream
00953 if ( fDataStreamItr ) {
00954 fDataStreamItr->DefineStream(stream, tree);
00955 }
00956
00957 }
|
|
|
Implement for notification of end of job. Reimplemented from JobCModule. Definition at line 110 of file IoInputModule.cxx. References fStopwatch, and MSG. 00111 {
00112 fStopwatch.Stop();
00113 MSG("Io",Msg::kDebug) << "IoInputModule::EndJob, Time(sec), Real "
00114 << fStopwatch.RealTime() << ", CPU "
00115 << fStopwatch.CpuTime() << endl;
00116
00117 }
|
|
|
Implement if your module needs to read data from some external source and fill mom. Reimplemented from JobCInputModule. Definition at line 105 of file IoInputModule.h. 00105 { return JobCResult::kAOK; }
|
|
|
Reimplemented from JobCInputModule. Definition at line 219 of file IoInputModule.cxx. References JobCResult::EndOfInputStream(), fDataStreamItr, fFormat, fStatus, fStopwatch, JobCInputModule::GetMom(), IoDataStreamItr::LoadRecords(), MSG, OpenStreamItr(), ReadHeader(), JobCResult::SetEndFile(), JobCResult::SetEndOfInputStream(), JobCResult::SetEndRun(), and UtilString::ToUpper(). Referenced by GoTo(), Next(), Prev(), and IoInputModuleValidate::TestGet(). 00220 {
00221 //======================================================================
00222 // Load the data records at the current position in the input stream
00223 //======================================================================
00224
00225
00226 if ( fDataStreamItr==0 ) {
00227 if ( this->OpenStreamItr()==0 ) {
00228 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00229 return fStatus;
00230 }
00231 return this->Get();
00232 }
00233
00234 MSG("Io",Msg::kVerbose) << "IoInputModule::Get " << endl;
00235
00236 fStopwatch.Start(false);
00237 MomNavigator* mom = this->GetMom();
00238 assert(mom);
00239 mom -> Clear(); // Moving on so clear contents of Mom
00240
00241 int nrecord = fDataStreamItr->LoadRecords(mom);
00242 bool isDDS = (UtilString::ToUpper(fFormat) == "DDS");
00243 // special treatment required because dds doesn't separate advance from load
00244 if ( isDDS && !nrecord ) fStatus.SetEndOfInputStream();
00245
00246 this->ReadHeader(); // sets beginrun/endrun beginfile/endfile fStatus bits
00247 if ( fStatus.EndOfInputStream() )
00248 { fStatus.SetEndFile(); fStatus.SetEndRun(); }
00249 MSG("Io",Msg::kVerbose)
00250 << "IoInputModule::Get returning status " << fStatus << endl;
00251 fStopwatch.Stop();
00252 return fStatus;
00253 }
|
|
|
Return the currently opened input file. This should not be overridden. Use this in BeginFile(). Reimplemented from JobCInputModule. Definition at line 1068 of file IoInputModule.cxx. References done(), fCurrentFileMap, fDataStreamItr, IoDataStreamItr::GetCurrentFile(), mapStrStrItr_t, and MSG. 01069 {
01070 MSG("Io",Msg::kDebug) << "IoInputModule::GetCurrentFile()" << endl;
01071 mapStrStrItr_t it, done=fCurrentFileMap.end();
01072 std::string strmstring = streamname;
01073
01074 for (it = fCurrentFileMap.begin(); it !=done; ++it) {
01075 MSG("Io",Msg::kVerbose)
01076 << "stream: " << setw(16) << it->first
01077 << " file: " << it->second
01078 <<endl;
01079 }
01080
01081 for (it = fCurrentFileMap.begin(); it!=done; ++it) {
01082 if ( strmstring == it->first ) return it->second.c_str();
01083 }
01084 // Sue's original approach
01085 if (!fDataStreamItr) return 0;
01086 return fDataStreamItr->GetCurrentFile(streamname);
01087 }
|
|
|
Return the current run #. This should not be overridden. Use this in BeginRun(). Reimplemented from JobCInputModule. Definition at line 87 of file IoInputModule.h. 00087 { return fCurrentRun; };
|
|
|
Reimplemented from JobCInputModule. Definition at line 91 of file IoInputModule.h. 00091 { return fCurrentSnarl; };
|
|
|
Return the previously opened input file. This should not be overridden. Use this in EndFile(). Reimplemented from JobCInputModule. Definition at line 1089 of file IoInputModule.cxx. References done(), fLastFileMap, mapStrStrItr_t, and MSG. 01090 {
01091 MSG("Io",Msg::kInfo) << "IoInputModule::GetLastFile()" << endl;
01092 mapStrStrItr_t it, done = fLastFileMap.end();
01093 std::string strmstring = streamname;
01094
01095 for (it = fLastFileMap.begin(); it!=done; ++it) {
01096 MSG("Io",Msg::kVerbose)
01097 << "stream: " << setw(16) << it->first
01098 << " file: " << it->second
01099 <<endl;
01100 }
01101
01102 for (it = fLastFileMap.begin(); it!=done; ++it) {
01103 if ( strmstring == it->first ) return it->second.c_str();
01104 }
01105 return 0;
01106
01107 }
|
|
|
Return the previous run #. This should not be overridden. Use this in EndRun(). Reimplemented from JobCInputModule. Definition at line 88 of file IoInputModule.h. 00088 { return fLastRun; };
|
|
|
Reimplemented from JobCInputModule. Definition at line 440 of file IoInputModule.cxx. References fDataStreamItr, CallDepth::fsDepth, fStatus, Get(), GoTo(), MSG, and OpenStreamItr(). 00441 {
00442 //======================================================================
00443 // Go to records that match validity context. If vld is not found, this
00444 // goes to the record set one beyond the requested validity.
00445 //======================================================================
00446 CallDepth d;
00447
00448 // Set the input status to "all clear"
00449 if (d.fsDepth==1) fStatus = gsAllClear;
00450
00451 if (fDataStreamItr==0) {
00452 if (this->OpenStreamItr()==0) {
00453 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00454 return fStatus;
00455 }
00456 return this->GoTo(vld);
00457 }
00458
00459 fStatus |= fDataStreamItr -> GoTo(vld);
00460
00461 // Load the current event
00462 return this->Get();
00463
00464 }
|
|
||||||||||||||||
|
Reimplemented from JobCInputModule. Definition at line 372 of file IoInputModule.cxx. References JobCResult::BeginOfInputStream(), JobCResult::EndOfInputStream(), fCurrentRun, fCurrentSnarl, fDataStreamItr, fLastRun, CallDepth::fsDepth, fStatus, Get(), MSG, Next(), OpenStreamItr(), Prev(), and run(). Referenced by GoTo(), and IoInputModuleValidate::TestGoToEOF(). 00373 {
00374 CallDepth d;
00375
00376 if (d.fsDepth==1) fStatus = gsAllClear;
00377
00378 if (fDataStreamItr==0) {
00379 if (this->OpenStreamItr()==0) {
00380 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00381 return fStatus;
00382 }
00383 return this->GoTo(run,snarl,searchDir);
00384 }
00385
00386 if ( run == fCurrentRun && snarl == fCurrentSnarl ) return this -> Get();
00387
00388 int dir = searchDir;
00389 if (dir==0) {
00390 if (run>fLastRun) { dir = 1; }
00391 else if (run<fLastRun || (run==fLastRun && fCurrentRun < 0)) { dir = -1; }
00392 else {
00393 if (snarl>fLastSnarl) { dir = 1; }
00394 else { dir = -1; }
00395 }
00396 }
00397
00398 // Move position in the stream looking for run/event number
00399 while ( 1 ) {
00400 if ( dir > 0 ) {
00401 this->Next();
00402 if (fCurrentRun>run) {
00403 MSG("Io",Msg::kWarning) <<
00404 "Went to run "<<fCurrentRun<<
00405 " without finding run="<<run<<" snarl="<<snarl<<"\n";
00406 return fStatus;
00407 }
00408 if (fCurrentRun==run && fCurrentSnarl>snarl) {
00409 MSG("Io",Msg::kWarning) <<
00410 "Went to run="<<fCurrentRun<<" snarl="<<fCurrentSnarl<<
00411 " without finding run="<<run<<" snarl="<<snarl<<"\n";
00412 return fStatus;
00413 }
00414 }
00415 if ( dir <0 ) {
00416 this->Prev();
00417 if (fCurrentRun<run) {
00418 MSG("Io",Msg::kWarning) <<
00419 "Went to run "<<fCurrentRun<<
00420 " without finding run="<<run<<" snarl="<<snarl<<"\n";
00421 return fStatus;
00422 }
00423 if (fCurrentRun==run && fCurrentSnarl<snarl) {
00424 MSG("Io",Msg::kWarning) <<
00425 "Went to run="<<fCurrentRun<<" snarl="<<fCurrentSnarl<<
00426 " without finding run="<<run<<" snarl="<<snarl<<"\n";
00427 return fStatus;
00428 }
00429 }
00430 // Check if we're done
00431 if (fCurrentRun == run && fCurrentSnarl == snarl) return fStatus;
00432 if (dir>0 && fStatus.EndOfInputStream()) return fStatus;
00433 if (dir<0 && fStatus.BeginOfInputStream()) return fStatus;
00434 }
00435 return fStatus;
00436 }
|
|
||||||||||||
|
Reimplemented from JobCInputModule. Definition at line 884 of file IoInputModule.cxx. References fDataStreamItr, CallDepth::fsDepth, fStatus, GoToFile(), MSG, and OpenStreamItr(). 00884 {
00885 //======================================================================
00886 // Move the stream to a named file
00887 //======================================================================
00888 CallDepth d;
00889
00890 if (d.fsDepth==1) fStatus = gsAllClear;
00891
00892 if (fDataStreamItr==0) {
00893 if (this->OpenStreamItr()==0) {
00894 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00895 return fStatus;
00896 }
00897 return this->GoToFile(filename,streamlist);
00898 }
00899
00900 fStatus |= fDataStreamItr -> GoToFile(filename,streamlist);
00901
00902 return fStatus;
00903 }
|
|
||||||||||||
|
Reimplemented from JobCInputModule. Definition at line 859 of file IoInputModule.cxx. References fDataStreamItr, CallDepth::fsDepth, fStatus, MSG, and OpenStreamItr(). Referenced by GoToFile(), and IoInputModuleValidate::TestGoToFile(). 00860 {
00861 //======================================================================
00862 // Move the stream to the nth file in the list (n=0 is first)
00863 //======================================================================
00864 CallDepth d;
00865
00866 if (d.fsDepth==1) fStatus = gsAllClear;
00867
00868 if (fDataStreamItr==0) {
00869 if (this->OpenStreamItr()==0) {
00870 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00871 return fStatus;
00872 }
00873 return this->GoToFile(n,streamlist);
00874 }
00875
00876 fStatus |= fDataStreamItr -> GoToFile(n,streamlist);
00877
00878 return fStatus;
00879
00880 }
|
|
|
Reimplemented from JobCInputModule. Definition at line 468 of file IoInputModule.cxx. References fDataStreamItr, fFileList, fFormat, and MSGSTREAM. Referenced by IoInputModuleValidate::TestAdd(), IoInputModuleValidate::TestGoToEOF(), IoInputModuleValidate::TestGoToFile(), IoInputModuleValidate::TestNext(), IoInputModuleValidate::TestNextFile(), IoInputModuleValidate::TestPrevFile(), IoInputModuleValidate::TestRemove(), and IoInputModuleValidate::TestStreams(). 00469 {
00470 //======================================================================
00471 // Print list of files loaded
00472 //======================================================================
00473
00474 MsgStream& m = MSGSTREAM("Io",Msg::kInfo);
00475
00476 m << "IoInputModule using data format " << fFormat << endl;
00477
00478 if ( fDataStreamItr ) {
00479 fDataStreamItr -> ListFile(std::cout,streamlist);
00480 return;
00481 }
00482
00483 // data stream not yet open, list files from fFileList
00484 m << "File Name\tStream List " << endl;
00485 m << "=========\t=========== " << endl;
00486 std::list<IoFileListItem>::const_iterator itr = fFileList.begin();
00487 std::list<IoFileListItem>::const_iterator itrEnd = fFileList.end();
00488
00489 for ( ; itr != itrEnd; itr++ ) {
00490 m << *itr;
00491 }
00492
00493 }
|
|
|
Definition at line 1111 of file IoInputModule.cxx. References AddFile(), fLoadedCommandLineFiles, JobCEnv::GetFileName(), JobCEnv::GetNfile(), and JobCEnv::Instance(). Referenced by BeginJob(). 01112 {
01113 //======================================================================
01114 // Load the files listed on the program command line (as held by JobCEnv)
01115 //======================================================================
01116 JobCEnv& jce = JobCEnv::Instance();
01117 if (!fLoadedCommandLineFiles) { // guard so the files are added only once
01118 for (int i=0; i<jce.GetNfile(); ++i) {
01119 this->AddFile(jce.GetFileName(i)); // uses AddFile's default streamlist and position
01120 }
01121 fLoadedCommandLineFiles = true;
01122 }
01123 }
|
|
|
Reimplemented from JobCInputModule. Definition at line 257 of file IoInputModule.cxx. References JobCResult::EndOfInputStream(), fDataStreamItr, CallDepth::fsDepth, fStatus, fStopwatch, Get(), JobCInputModule::GetMom(), IoDataStreamItr::Increment(), MSG, NextFile(), and OpenStreamItr(). Referenced by GoTo(), IoInputModuleValidate::TestGet(), and IoInputModuleValidate::TestNext(). 00258 {
00259 //======================================================================
00260 // Advance the position in the stream n record sets. Load the records
00261 // at the last position. Returns the result of Get(), or fStatus if
00262 //  the stream iterator could not be opened.
00263 CallDepth d; // Keep track of the call depth
00264
00265 // Set the input status to "all clear" since advancing
00266 if (d.fsDepth==1) fStatus = gsAllClear; // only at call depth 1, so nested calls keep accumulated flags
00267
00268 MSG("Io",Msg::kVerbose) << "IoInputModule::Next " << n << endl;
00269
00270 if ( fDataStreamItr==0 ) { // no stream yet: open one, then retry this call
00271 if (this->OpenStreamItr()==0) {
00272 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00273 return fStatus;
00274 }
00275 return this->Next(n);
00276 }
00277
00278 fStopwatch.Start(false);
00279 MomNavigator* mom = this->GetMom();
00280 assert(mom);
00281 mom -> Clear(); // Moving on so clear contents of Mom
00282
00283 // Advance the position in the input stream until we run out of
00284 // records and files
00285 int nstep = 0; // steps actually taken by the last Increment()
00286 int ndone = 0; // total steps taken so far
00287 int ntry = 0; // steps still requested
00288 while ( ndone < n ) {
00289 ntry = n - ndone;
00290 nstep = fDataStreamItr->Increment(ntry);
00291
00292 if ( nstep < ntry ) {
00293 // Reached end of file, load next one
00294 fStatus |= this->NextFile();
00295
00296 // If this is the end of the input stream, we're done.
00297 if ( fStatus.EndOfInputStream() ) {
00298 fStopwatch.Stop();
00299 return this->Get();
00300 }
00301 }
00302 ndone += nstep;
00303 }
00304
00305 // Load the current event
00306 fStopwatch.Stop();
00307 return this->Get();
00308 }
|
|
||||||||||||
|
Reimplemented from JobCInputModule. Definition at line 805 of file IoInputModule.cxx. References fDataStreamItr, CallDepth::fsDepth, fStatus, IoDataStreamItr::GetCurrentFile(), MSG, and OpenStreamItr(). Referenced by Next(), and IoInputModuleValidate::TestNextFile(). 00806 {
00807 //======================================================================
00808 // Move to the next file in the list (move by n positions); returns fStatus
00809 //======================================================================
00810 CallDepth d;
00811
00812 if (d.fsDepth==1) fStatus = gsAllClear; // reset status only at call depth 1
00813
00814 if (fDataStreamItr==0) { // no stream yet: open one, then retry this call
00815 if (this->OpenStreamItr()==0) {
00816 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00817 return fStatus;
00818 }
00819 return this->NextFile(n,streamlist);
00820 }
00821
00822 fStatus |= fDataStreamItr -> NextFile(n,streamlist); // merge the stream's result flags into the module status
00823
00824 MSG("Io",Msg::kDebug)
00825 << "status is " << fStatus
00826 << " current file is " << fDataStreamItr->GetCurrentFile() << endl;
00827
00828 return fStatus;
00829
00830 }
|
|
|
Definition at line 1473 of file IoInputModule.cxx. References CloseStreamItr(), IoDataStreamFactory::CreateDataStreamItr(), fDataStreamItr, fFormat, fStatus, IoDataStreamItr::GetFormat(), MSG, JobCResult::SetBeginFile(), JobCResult::SetBeginOfInputStream(), JobCResult::SetBeginRun(), JobCResult::SetEndFile(), JobCResult::SetEndOfInputStream(), JobCResult::SetEndRun(), UtilString::ToUpper(), UpdateDDSConfig(), UpdateFileList(), and UpdateStreamConfig(). Referenced by Get(), GoTo(), GoToFile(), Next(), NextFile(), Prev(), and PrevFile(). 01474 {
01475 //======================================================================
01476 // Open a new stream iterator. Returns 1 on success, 0 on failure.
01477 //======================================================================
01478 if (fDataStreamItr) this->CloseStreamItr(); // drop any existing iterator first
01479
01480 bool isDDS = (UtilString::ToUpper(fFormat) == "DDS"); // case-insensitive format check
01481 std::string src;
01482 if ( isDDS ) src = fServer; // only DDS gets a source name; otherwise src stays empty
01483
01484 fDataStreamItr = IoDataStreamFactory::CreateDataStreamItr(src.c_str(),
01485 fFormat.c_str(),fPort,fMaxRetry,fRetryDelay,
01486 fClientType,fClientName);
01487
01488
01489 if (fDataStreamItr == 0) {
01490 MSG("Io",Msg::kWarning) << "Failed to open stream '" << src << "'" <<
01491 " using format '" << fFormat << "'" << endl;
01492 fStatus.SetEndRun(); // failure is reported as end of run, file and input stream
01493 fStatus.SetEndFile();
01494 fStatus.SetEndOfInputStream();
01495 return 0;
01496 }
01497
01498 fStatus.SetBeginOfInputStream(); // a freshly opened stream starts a new stream, file and run
01499 fStatus.SetBeginFile();
01500 fStatus.SetBeginRun();
01501
01502 // Configure the file and module
01503 this->UpdateStreamConfig(); // this should be called before filelist
01504 this->UpdateFileList();
01505 this->UpdateDDSConfig(); // NOTE(review): UpdateDDSConfig() calls CloseStreamItr() when it decides to reinit; confirm fDataStreamItr is still valid below
01506 fFormat = fDataStreamItr->GetFormat(); // adopt the format reported by the iterator
01507
01508 MSG("Io",Msg::kDebug)
01509 << "Opened stream itr of format " << fDataStreamItr->GetFormat() << endl;
01510
01511 return 1;
01512 }
|
|
|
Reimplemented from JobCInputModule. Definition at line 312 of file IoInputModule.cxx. References JobCResult::BeginOfInputStream(), IoDataStreamItr::Decrement(), JobCResult::EndOfInputStream(), fDataStreamItr, CallDepth::fsDepth, fStatus, fStopwatch, Get(), JobCInputModule::GetMom(), IoDataStreamItr::GoToEOF(), MSG, OpenStreamItr(), and PrevFile(). Referenced by GoTo(), and IoInputModuleValidate::TestPrev(). 00313 {
00314 //======================================================================
00315 // Back up n positions in the input data stream. Load the records at
00316 // the current position. Returns the result of Get(), or fStatus if
00317 //  the stream iterator could not be opened.
00318 CallDepth d;
00319
00320 // Set the input status to "all clear"
00321 if (d.fsDepth == 1) fStatus = gsAllClear; // only at call depth 1, so nested calls keep accumulated flags
00322
00323 MSG("Io",Msg::kVerbose) << "IoInputModule::Prev " << n << endl;
00324
00325 if (fDataStreamItr==0) { // no stream yet: open one, then retry this call
00326 if (this->OpenStreamItr()==0) {
00327 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00328 return fStatus;
00329 }
00330 return this->Prev(n);
00331 }
00332
00333 // Back up the position in the input stream until we run out of
00334 // records and files
00335 fStopwatch.Start(false);
00336 MomNavigator* mom = this->GetMom();
00337 assert(mom);
00338 mom -> Clear(); // Moving on so clear contents of Mom
00339
00340 int nstep = 0; // steps actually taken by the last Decrement()
00341 int ndone = 0; // total steps taken so far
00342 int ntry = 0; // steps still requested
00343 while (ndone < n) {
00344 ntry = n - ndone;
00345 nstep = fDataStreamItr->Decrement(ntry);
00346
00347 if (nstep < ntry) {
00348 // Reached start of file, load previous file.
00349 fStatus |= this->PrevFile();
00350
00351 // If there is no previous file, then we're done.
00352 if ( fStatus.BeginOfInputStream() || fStatus.EndOfInputStream() ) {
00353 fStopwatch.Stop(); // must precede the return (was unreachable after it), matching Next()
00354 return this->Get(); // may be end if failed to open stream itr
00355 }
00356
00357 // Move the position to the end of the current file so we can
00358 // walk backwards over it
00359 fDataStreamItr->GoToEOF();
00360 }
00361 ndone += nstep;
00362 }
00363
00364 // Load the current event
00365 fStopwatch.Stop();
00366 return this->Get();
00367
00368 }
|
|
||||||||||||
|
Reimplemented from JobCInputModule. Definition at line 834 of file IoInputModule.cxx. References fDataStreamItr, CallDepth::fsDepth, fStatus, MSG, and OpenStreamItr(). Referenced by Prev(), and IoInputModuleValidate::TestPrevFile(). 00835 {
00836 //======================================================================
00837 // Move to the previous file in the list (move back by n files); returns fStatus
00838 //======================================================================
00839 CallDepth d;
00840
00841 if (d.fsDepth==1) fStatus = gsAllClear; // reset status only at call depth 1
00842
00843 if (fDataStreamItr==0) { // no stream yet: open one, then retry this call
00844 if (this->OpenStreamItr()==0) {
00845 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00846 return fStatus;
00847 }
00848 return this->PrevFile(n,streamlist);
00849 }
00850
00851 fStatus |= fDataStreamItr -> PrevFile(n,streamlist); // merge the stream's result flags into the module status
00852
00853 return fStatus;
00854
00855 }
|
|
|
Definition at line 1127 of file IoInputModule.cxx. References MomNavigator::At(), JobCResult::BeginFile(), done(), fCurrentFileMap, fCurrentRun, fCurrentSnarl, fLastFileMap, fLastRun, fLastSnarl, fStatus, Registry::Get(), MomNavigator::GetFragmentArray(), RecRecord::GetHeader(), RecMinos::GetHeader(), JobCInputModule::GetMom(), RecDataHeader::GetRun(), CandHeader::GetRun(), RawDaqHeader::GetRun(), RecPhysicsHeader::GetSnarl(), CandHeader::GetSnarl(), RawDaqSnarlHeader::GetSnarl(), RecRecord::GetTempTags(), RecMinos::GetTempTags(), mapStrStrItr_t, MSG, run(), JobCResult::SetBeginFile(), JobCResult::SetBeginRun(), JobCResult::SetEndFile(), and JobCResult::SetEndRun(). Referenced by Get(). 01128 {
01129 //======================================================================
01130 // Read temptags to get file name and update the per-stream file maps.
01131 // Read header information to get run/snarl info.
01132 // if found Run and Snarl info return 2,
01133 // if only Run info return 1,
01134 // else return 0. Also sets Begin/End File and Run flags in fStatus.
01135 //======================================================================
01136 const MomNavigator* mom = this->GetMom();
01137 assert(mom);
01138
01139 // BeginFile/EndFile boundaries may not be in synch across the different
01140 // data streams. The definition used here is to set file boundary true if
01141 // the file has changed for any of the managed streams.
01142
01143 const TObjArray* momarray = mom->GetFragmentArray();
01144 for (int i=0; i < momarray->GetEntriesFast(); ++i) {
01145 TObject* obj = momarray->At(i);
01146 if (!obj) continue;
01147 Registry* temptags = 0;
01148 if ( RecMinos* record = dynamic_cast<RecMinos*>(obj) ) {
01149 temptags = &(record->GetTempTags());
01150 }
01151 else if ( RecRecord* record = dynamic_cast<RecRecord*>(obj) ) {
01152 temptags = &(record->GetTempTags());
01153 }
01154 if ( ! temptags ) continue; // neither record type: nothing to read
01155
01156 // on a named stream?
01157 const char* tagstream = 0;
01158 if ( ! temptags->Get("stream",tagstream) ) continue;
01159
01160 // stream managed by i/o?
01161 std::string streamname(tagstream);
01162 const char* tagnewfile = 0;
01163 if ( ! temptags->Get("file",tagnewfile) ) continue;
01164
01165 std::string lstfilename = fLastFileMap[streamname]; // operator[] creates an empty entry for a stream not seen before
01166 std::string curfilename = fCurrentFileMap[streamname];
01167 std::string newfilename(tagnewfile);
01168
01169 if ( newfilename != curfilename ) {
01170
01171 std::string starcur = fCurrentFileMap.begin()->first; // first key of each map, used only in the debug output
01172 std::string starlast = fLastFileMap.begin()->first;
01173
01174 MSG("Io",Msg::kDebug)
01175 << "SetBeginFile on streamname '" << streamname << "'" << endl
01176 << " current '" << fCurrentFileMap[streamname] << "'" << endl
01177 << " last '" << fLastFileMap[streamname] << "'" << endl
01178 << " current['" << starcur << "'] '"
01179 << fCurrentFileMap[starcur] << "'" << endl
01180 << " last['" << starlast << "'] '"
01181 << fLastFileMap[starlast] << "'" << endl
01182 << " new '" << newfilename << "' != "
01183 << " cur '" << curfilename << "'" << endl
01184 << " update \"*\" ? "
01185 << (( newfilename != fCurrentFileMap["*"] ) ? "yes":"no")
01186 << endl;
01187
01188 // update "*" stream first
01189 if ( newfilename != fCurrentFileMap["*"] ) {
01190 fStatus.SetBeginFile();
01191 //if ( lstfilename != "" ) fStatus.SetEndFile();
01192 if ( fCurrentFileMap["*"] != "" ) fStatus.SetEndFile(); // EndFile only if a file was already current
01193
01194 fLastFileMap["*"] = fCurrentFileMap["*"];
01195 fCurrentFileMap["*"] = newfilename;
01196
01197 MSG("Io",Msg::kDebug)
01198 << "SetBeginFile on '*'" << endl
01199 << " current['" << starcur << "'] '"
01200 << fCurrentFileMap[starcur] << "'" << endl
01201 << " last['" << starlast << "'] '"
01202 << fLastFileMap[starlast] << "'" << endl;
01203 }
01204 // update this named stream
01205 fLastFileMap[streamname] = curfilename;
01206 fCurrentFileMap[streamname] = newfilename;
01207
01208 // if a stream on a file moved on then presumably all the
01209 // other streams on the same file have also been exhausted
01210 // and are going to move on -- help them along so we don't
01211 // have to wait until the next record on each such stream
01212 // is the one for the next VldContext
01213 mapStrStrItr_t it, done = fCurrentFileMap.end();
01214 for (it = fCurrentFileMap.begin(); it != done; ++it) {
01215 if ( it->second == curfilename ) {
01216 std::string altstream = it->first;
01217 fLastFileMap[altstream] = curfilename;
01218 fCurrentFileMap[altstream] = newfilename; // key already exists, so the map is not resized while iterating
01219 }
01220 }
01221
01222 } // new != cur filename
01223
01224 MSG("Io",Msg::kVerbose)
01225 << " stream '" << streamname << "' set fLastFileMap to '"
01226 << curfilename << "', fCurrentFileMap to '"
01227 << newfilename << "'"
01228 << " * '" << fLastFileMap["*"] << "' '" << fCurrentFileMap["*"] << "'"
01229 << endl;
01230
01231 } // loop over records
01232
01233 // update for EOF/EOJ condition (which doesn't come through this
01234 // function) if it isn't a file change
01235 // this won't work if the file has just one record
01236 // ...the whole procedure is fundamentally flawed -- it shouldn't be
01237 // based on what we find in "mom" but rather the stream/file
01238 // management classes...
01239 if ( ! fStatus.BeginFile() ) fLastFileMap["*"] = fCurrentFileMap["*"];
01240
01241 // BeginRun/EndRun
01242 int run = -1; // default and flag value
01243 int snarl = -1; // default and flag value
01244 for (int i=0; i < momarray->GetEntriesFast(); ++i) {
01245 TObject* obj = momarray->At(i);
01246 if (!obj) continue;
01247 if ( RecMinos* record = dynamic_cast<RecMinos*>(obj) ) {
01248 // old style
01249
01250 // all DAQ generated records can supply run #
01251 const RawDaqHeader* rdh
01252 = dynamic_cast<const RawDaqHeader*>(record->GetHeader());
01253 if (rdh) run = rdh->GetRun();
01254
01255 // but only DaqSnarl records can supply snarl #
01256 const RawDaqSnarlHeader* rdsh
01257 = dynamic_cast<const RawDaqSnarlHeader*>(record->GetHeader());
01258 if (rdsh) snarl = rdsh->GetSnarl();
01259
01260 if (!rdh) {
01261 // not a DAQ record, perhaps it's a CandRecord
01262 const CandHeader* candhdr
01263 = dynamic_cast<const CandHeader*>(record->GetHeader());
01264 if (candhdr) {
01265 run = candhdr->GetRun();
01266 snarl = candhdr->GetSnarl();
01267 }
01268 }
01269 }
01270 else if ( RecRecord* record = dynamic_cast<RecRecord*>(obj) ) {
01271 // New style
01272 const RecPhysicsHeader* rph
01273 = dynamic_cast<const RecPhysicsHeader*>(&(record->GetHeader()));
01274 if ( rph ) {
01275 run = rph->GetRun();
01276 snarl = rph->GetSnarl();
01277 }
01278 }
01279 // break early only if determined snarl in case it is one of those
01280 // crazy record sets with a DaqMonitor and a DaqSnarl record.
01281 if ( snarl >= 0 ) break;
01282 }
01283 const int prevSnarl = fCurrentSnarl; // saved before the overwrite below so fLastSnarl gets the previous value
01284 // set the status flags based on what was extracted
01285 fCurrentSnarl = snarl;
01286 if ( run < 0 ) {
01287 fCurrentRun = -1;
01288 return 0;
01289 }
01290 if ( run != fCurrentRun ) {
01291 fStatus.SetBeginRun();
01292 if ( fLastRun >= 0 ) fStatus.SetEndRun(); // NOTE(review): tests fLastRun, whereas the file logic above tests the current value; confirm intended
01293 }
01294 fLastRun = fCurrentRun;
01295 fCurrentRun = run;
01296 if ( snarl >= 0 ) {
01297 fLastSnarl = prevSnarl; // was fCurrentSnarl, which already held the new snarl at this point
01298 fCurrentSnarl = snarl;
01299 return 2;
01300 }
01301 return 1;
01302 }
|
|
||||||||||||
|
Reimplemented from JobCInputModule. Definition at line 783 of file IoInputModule.cxx. References fDataStreamItr, fFileList, IoFileListItem::GetNumFile(), and IoFileListItem::RemoveFile(). Referenced by IoInputModuleValidate::TestRemove(), and UpdateFileList(). 00783 {
00784 //======================================================================
00785 // Remove the file "filename" from the list of input data files.
00786 //======================================================================
00787 // An open data stream is told first, then the module's own fFileList is updated
00788 if ( fDataStreamItr ) fDataStreamItr -> RemoveFile(filename,streamlist);
00789
00790 // Walk fFileList from back to front, dropping items left with no files
00791 std::list<IoFileListItem>::iterator itr = fFileList.end();
00792 while ( !fFileList.empty() && itr != fFileList.begin() ) {
00793 itr--;
00794 IoFileListItem& iofile = *itr;
00795 iofile.RemoveFile(filename,streamlist);
00796 if ( iofile.GetNumFile() == 0 ) itr = fFileList.erase(itr); // erase() invalidates itr; continue from the iterator it returns (the element after, already visited)
00797 }
00798
00799 return;
00800
00801 }
|
|
||||||||||||||||
|
Reimplemented from JobCInputModule. Definition at line 907 of file IoInputModule.cxx. References fDataStreamItr, fStreamRequiredMap, fStreamSelectionMap, and IoDataStreamItr::Select(). 00909 {
00910 //======================================================================
00911 // Add/Change the selection cuts (and the required flag) for a stream
00912 //======================================================================
00913 // Insert the selection into the map; UpdateStreamConfig() re-applies both maps to the iterator
00914 fStreamSelectionMap[stream] = select;
00915 fStreamRequiredMap[stream] = isRequired;
00916
00917 // Pass the info on to the data stream, if one is currently open
00918 if (fDataStreamItr) {
00919 fDataStreamItr->Select(stream, select,isRequired);
00920 }
00921
00922 }
|
|
||||||||||||
|
Reimplemented from JobCInputModule. Definition at line 1011 of file IoInputModule.cxx. References fDataStreamItr, fStreamMaxRepeatMap, and IoDataStreamItr::SetMaxFileRepeat(). 01012 {
01013 //======================================================================
01014 // Define maximum number of times to reuse a file in the stream before
01015 // loading the next one; for kSequential and kRandom sequence modes
01016 //======================================================================
01017 fStreamMaxRepeatMap[stream] = numRepeat; // remembered per stream; read back by UpdateStreamConfig()
01018
01019 // Pass it on to the data stream, if one is currently open
01020 if ( fDataStreamItr ) fDataStreamItr->SetMaxFileRepeat(stream,numRepeat);
01021 }
|
|
||||||||||||
|
Reimplemented from JobCInputModule. Definition at line 1025 of file IoInputModule.cxx. References fDataStreamItr, fStreamMeanMap, and IoDataStreamItr::SetMeanMom(). 01026 {
01027 //======================================================================
01028 // Define mean number of events to push to mom for this stream
01029 // for kSequential and kRandom sequence modes
01030 //======================================================================
01031 fStreamMeanMap[stream] = mean; // remembered per stream; read back by UpdateStreamConfig()
01032
01033 // Pass it on to the data stream, if one is currently open
01034 if ( fDataStreamItr ) fDataStreamItr->SetMeanMom(stream,mean);
01035 }
|
|
||||||||||||
|
Reimplemented from JobCInputModule. Definition at line 926 of file IoInputModule.cxx. References fDataStreamItr, fStreamPerOwnedDisabledMap, and IoDataStreamItr::SetPerOwnedDisabled(). Referenced by UpdateStreamConfig(). 00928 {
00929 //======================================================================
00930 // Used to disable Persistency Ownership of records for this stream
00931 //======================================================================
00932 // Insert the perowneddisabled bool into the map (read back by UpdateStreamConfig())
00933 fStreamPerOwnedDisabledMap[stream] = perowneddisabled;
00934
00935 // Pass the info on to the data stream, if one is currently open
00936 if (fDataStreamItr) {
00937 fDataStreamItr->SetPerOwnedDisabled(stream, perowneddisabled);
00938 }
00939
00940 }
|
|
||||||||||||
|
Reimplemented from JobCInputModule. Definition at line 1039 of file IoInputModule.cxx. References fDataStreamItr, fStreamPushRandomMap, and IoDataStreamItr::SetPushRandom(). 01040 {
01041 //======================================================================
01042 // Define whether to push a random or constant number of events to mom
01043 // for this stream for kSequential and kRandom sequence modes
01044 //======================================================================
01045 fStreamPushRandomMap[stream] = setRandom; // remembered per stream; read back by UpdateStreamConfig()
01046
01047 // Pass it on to the data stream, if one is currently open
01048 if ( fDataStreamItr ) fDataStreamItr->SetPushRandom(stream,setRandom);
01049 }
|
|
|
Reimplemented from JobCInputModule. Definition at line 1053 of file IoInputModule.cxx. References fDataStreamItr, fRandomSeed, and IoDataStreamItr::SetRandomSeed(). 01054 {
01055 //======================================================================
01056 // Set the random seed for SetPushRandom(stream,true) case
01057 // for kSequential and kRandom sequence modes
01058 //======================================================================
01059 fRandomSeed = rSeed; // one seed for the module (not per stream); read back by UpdateStreamConfig()
01060
01061 // Pass it on to the data stream, if one is currently open
01062 if ( fDataStreamItr ) fDataStreamItr->SetRandomSeed(rSeed);
01063
01064 }
|
|
||||||||||||
|
Reimplemented from JobCInputModule. Definition at line 961 of file IoInputModule.cxx. References fDataStreamItr, fStreamSeqModeMap, and IoDataStreamItr::SetSequenceMode(). 00962 {
00963 //======================================================================
00964 // Define stream sequence mode
00965 //======================================================================
00966 // Insert the sequence mode into the map (read back by UpdateStreamConfig())
00967 fStreamSeqModeMap[stream] = sequenceMode;
00968
00969 // Pass the info on to the data stream, if one is currently open
00970 if ( fDataStreamItr ) {
00971 fDataStreamItr->SetSequenceMode(stream, sequenceMode);
00972 }
00973
00974 }
|
|
||||||||||||
|
Reimplemented from JobCInputModule. Definition at line 978 of file IoInputModule.cxx. References fDataStreamItr, fStreamTestModeMap, and IoDataStreamItr::SetTestMode(). 00979 {
00980 //======================================================================
00981 // Define stream test mode
00982 //======================================================================
00983 // Insert the test mode into the map (read back by UpdateStreamConfig())
00984 fStreamTestModeMap[stream] = testMode;
00985
00986 // Pass the info on to the data stream, if one is currently open
00987 if ( fDataStreamItr ) {
00988 fDataStreamItr->SetTestMode(stream, testMode);
00989 }
00990
00991 }
|
|
||||||||||||||||
|
Reimplemented from JobCInputModule. Definition at line 995 of file IoInputModule.cxx. References fDataStreamItr, fStreamWindowMap, and IoDataStreamItr::SetWindow(). 00996 {
00997 //======================================================================
00998 // Define stream window if kWindow sequence mode is used
00999 //======================================================================
01000 fStreamWindowMap[stream] = std::pair<double,double>(lower,upper); // stored as (lower, upper); read back by UpdateStreamConfig()
01001
01002 // Pass the info on to the data stream, if one is currently open
01003 if ( fDataStreamItr ) {
01004 fDataStreamItr->SetWindow(stream, lower, upper);
01005 }
01006
01007 }
|
|
|
Definition at line 1306 of file IoInputModule.cxx. References CloseStreamItr(), fClientName, fClientType, fDataSource, fDataStreamItr, fKeepUpMode, fMaxSyncDelay, fOffLine, fPort, fServer, fTimeOut, IoDDSStreamItr::GetClientName(), IoDDSStreamItr::GetClientType(), IoDDSStreamItr::GetPort(), IoDataStreamItr::GetSourceName(), and IoDDSStreamItr::SetTimeOut(). Referenced by Config(), and OpenStreamItr(). 01306 {
01307 //======================================================================
01308 // Update the dispatcher configuration (no-op unless a DDS stream is open)
01309 //======================================================================
01310 if ( fDataStreamItr == 0 ) return;
01311
01312 IoDDSStreamItr* ddsItr = dynamic_cast<IoDDSStreamItr*>(fDataStreamItr);
01313 if ( ! ddsItr ) return; // open stream is not a DDS iterator: nothing to configure
01314
01315 ddsItr->SetTimeOut(fTimeOut); // applied whether or not a reinit follows
01316
01317 // Need to reinitialize dispatcher if server hostname, port, clienttype
01318 // or clientname have changed
01319 bool reinit = (fServer != ddsItr->GetSourceName()
01320 || fPort != ddsItr->GetPort()
01321 || fClientType != ddsItr->GetClientType()
01322 || fClientName != ddsItr->GetClientName() );
01323
01324 if ( reinit ) {
01325 this -> CloseStreamItr(); // wait for next action to reopen
01326 }
01327 else {
01328 ddsItr -> SetKeepUpMode(fKeepUpMode); // connection unchanged: push the remaining settings to the live iterator
01329 ddsItr -> SetMaxSyncDelay(fMaxSyncDelay);
01330 ddsItr -> SetDataSource(fDataSource);
01331 ddsItr -> SetOffLine(fOffLine);
01332 }
01333
01334 }
|
|
|
Definition at line 1357 of file IoInputModule.cxx. References AddFile(), fDataStreamItr, fFileList, IoFileListItem::GetAt(), IoFileListItem::GetFileStreamMap(), and RemoveFile(). Referenced by OpenStreamItr(). 01358 {
01359 //======================================================================
01360 // Update the file list: rebuild the open stream's files from fFileList
01361 //======================================================================
01362
01363 if ( fDataStreamItr == 0 ) return;
01364
01365 // Fresh start. This should typically only be called when data stream
01366 // iterator is newly opened (i.e. when the format changes)
01367 fDataStreamItr -> RemoveFile("*");
01368
01369 std::list<IoFileListItem>::iterator itr = fFileList.begin();
01370 for ( ; itr != fFileList.end(); itr++ ) {
01371 IoFileListItem& iofile = *itr;
01372 const IoFileListItem::FileStreamMap& filemap = iofile.GetFileStreamMap();
01373 int at = iofile.GetAt();
01374
01375 if ( at < 0 ) { // negative position: add the files in map order
01376 IoFileListItem::FileStreamMapConstItr itr = filemap.begin(); // note: shadows the outer list iterator inside this scope
01377 for ( ; itr != filemap.end(); itr++ ) {
01378 std::string filename = itr -> first;
01379 std::string streamlist = itr -> second;
01380 fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
01381 }
01382 }
01383 else {
01384 // Apply files in reverse to have first file in list inserted at pos At
01385 IoFileListItem::FileStreamMap::const_reverse_iterator itr
01386 = filemap.rbegin(); // note: shadows the outer list iterator inside this scope
01387 for ( ; itr != filemap.rend(); itr++ ) {
01388 std::string filename = itr -> first;
01389 std::string streamlist = itr -> second;
01390 fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
01391 }
01392 }
01393 }
01394
01395 return;
01396
01397 }
|
|
|
Definition at line 1339 of file IoInputModule.cxx. References CloseStreamItr(), fDataStreamItr, fFormat, and IoDataStreamItr::GetFormat(). Referenced by Config(). 01339 {
01340 //======================================================================
01341 // Update the stream itr to match requested format
01342 //======================================================================
01343 // Nothing to do when no stream is open
01344 if ( fDataStreamItr == 0 ) return;
01345
01346 bool reopen = ( fFormat != fDataStreamItr->GetFormat() ); // requested format differs from the open iterator's
01347 if ( reopen ) {
01348 this -> CloseStreamItr(); // wait for next action to reopen
01349 }
01350
01351 return;
01352
01353 }
|
|
|
Definition at line 1401 of file IoInputModule.cxx. References IoDataStreamItr::DefineStream(), fDataStreamItr, fRandomSeed, fStreamDefMap, fStreamList, fStreamMaxRepeatMap, fStreamMeanMap, fStreamPerOwnedDisabledMap, fStreamPushRandomMap, fStreamRequiredMap, fStreamSelectionMap, fStreamSeqModeMap, fStreamTestModeMap, fStreamWindowMap, mapStrStrItr_t, IoDataStreamItr::Select(), IoDataStreamItr::SetMaxFileRepeat(), IoDataStreamItr::SetMeanMom(), SetPerOwnedDisabled(), IoDataStreamItr::SetPushRandom(), IoDataStreamItr::SetRandomSeed(), IoDataStreamItr::SetSequenceMode(), IoDataStreamItr::SetTestMode(), IoDataStreamItr::SetWindow(), and IoDataStreamItr::Streams(). Referenced by Config(), and OpenStreamItr(). 01402 {
01403 //======================================================================
01404 // Set the stream and selection cuts for the open streams
01405 //======================================================================
01406 if (fDataStreamItr==0) return; // nothing to configure until a stream is open
01407
01408 // Define streams
01409 mapStrStrItr_t itr;
01410 for ( itr = fStreamDefMap.begin(); itr != fStreamDefMap.end(); itr++ ) {
01411 fDataStreamItr->DefineStream((itr->first).c_str(), // Stream
01412 (itr->second).c_str()); // Definition
01413 }
01414
01415 // Set the streams to be activated
01416 fDataStreamItr->Streams(fStreamList.c_str());
01417 // Set the selection cuts for each stream
01418 for (itr=fStreamSelectionMap.begin();itr!=fStreamSelectionMap.end();++itr) {
01419 std::map<std::string,bool>::const_iterator reqitr
01420 = fStreamRequiredMap.find(itr->first);
01421 bool isrequired = false; // default when the stream has no entry in fStreamRequiredMap
01422 if ( reqitr != fStreamRequiredMap.end() ) isrequired = reqitr -> second;
01423 fDataStreamItr->Select((itr->first).c_str(), // Stream
01424 (itr->second).c_str(), // Selection
01425 isrequired); // IsRequired
01426 }
01427
01428 // Set the test mode for each stream
01429 std::map<std::string,bool>::const_iterator testitr;
01430 for ( testitr=fStreamTestModeMap.begin(); testitr !=fStreamTestModeMap.end();
01431 ++testitr ) {
01432 fDataStreamItr->SetTestMode((testitr->first).c_str(), // Stream
01433 testitr->second); // TestMode
01434 }
01435
01436 // Disable per owned for each stream requested
01437 std::map<std::string,bool>::const_iterator disableitr;
01438 for ( disableitr = fStreamPerOwnedDisabledMap.begin();
01439 disableitr != fStreamPerOwnedDisabledMap.end(); ++disableitr ) {
01440 fDataStreamItr -> SetPerOwnedDisabled((disableitr->first).c_str(),
01441 disableitr->second);
01442 }
01443
01444
01445 // Set the sequence mode for each stream
01446 bool setRandom = false; // becomes true once the random seed has been sent, so it is sent at most once
01447 std::map<std::string,Per::ESequenceMode>::const_iterator seqitr;
01448 for ( seqitr=fStreamSeqModeMap.begin(); seqitr!=fStreamSeqModeMap.end();
01449 ++seqitr ) {
01450 fDataStreamItr->SetSequenceMode((seqitr->first).c_str(), // Stream
01451 seqitr->second); // Sequence Mode
01452 pair<double,double> window = fStreamWindowMap[seqitr->first]; // operator[] yields a default pair if no window was set for the stream
01453 fDataStreamItr->SetWindow((seqitr->first).c_str(), // Stream
01454 window.first,window.second);
01455 if (seqitr->second == Per::kSequential ||
01456 seqitr->second == Per::kRandom ) {
01457 if (!setRandom) {
01458 setRandom = true;
01459 fDataStreamItr->SetRandomSeed(fRandomSeed);
01460 }
01461 int repeat = fStreamMaxRepeatMap[seqitr->first]; // these three lookups also default-insert when the stream has no entry
01462 fDataStreamItr->SetMaxFileRepeat( (seqitr->first).c_str(), repeat );
01463 double mean = fStreamMeanMap[seqitr->first];
01464 fDataStreamItr->SetMeanMom( (seqitr->first).c_str(), mean );
01465 bool pushRand = fStreamPushRandomMap[seqitr->first];
01466 fDataStreamItr->SetPushRandom( (seqitr->first).c_str(), pushRand );
01467 } // end if kSeq or kRand
01468 }
01469 }
|
|
|
Definition at line 138 of file IoInputModule.h. Referenced by Config(), and UpdateDDSConfig(). |
|
|
Definition at line 137 of file IoInputModule.h. Referenced by Config(), and UpdateDDSConfig(). |
|
|
Definition at line 146 of file IoInputModule.h. Referenced by GetCurrentFile(), and ReadHeader(). |
|
|
Definition at line 144 of file IoInputModule.h. Referenced by GoTo(), and ReadHeader(). |
|
|
Definition at line 145 of file IoInputModule.h. Referenced by GoTo(), and ReadHeader(). |
|
|
Definition at line 131 of file IoInputModule.h. Referenced by Config(), and UpdateDDSConfig(). |
|
|
Definition at line 117 of file IoInputModule.h. Referenced by AddFile(), CloseStreamItr(), DefineStream(), Get(), GetCurrentFile(), GoTo(), GoToFile(), List(), Next(), NextFile(), OpenStreamItr(), Prev(), PrevFile(), RemoveFile(), Select(), SetMaxFileRepeat(), SetMeanMom(), SetPerOwnedDisabled(), SetPushRandom(), SetRandomSeed(), SetSequenceMode(), SetTestMode(), SetWindow(), UpdateDDSConfig(), UpdateFileList(), UpdateFormatConfig(), UpdateStreamConfig(), and ~IoInputModule(). |
|
|
Definition at line 116 of file IoInputModule.h. Referenced by AddFile(), List(), RemoveFile(), and UpdateFileList(). |
|
|
Definition at line 119 of file IoInputModule.h. Referenced by Config(), Get(), List(), OpenStreamItr(), and UpdateFormatConfig(). |
|
|
Definition at line 132 of file IoInputModule.h. Referenced by Config(), and UpdateDDSConfig(). |
|
|
Definition at line 148 of file IoInputModule.h. |
|
|
Definition at line 149 of file IoInputModule.h. |
|
|
Definition at line 147 of file IoInputModule.h. Referenced by GetLastFile(), and ReadHeader(). |
|
|
Definition at line 142 of file IoInputModule.h. Referenced by GoTo(), and ReadHeader(). |
|
|
Definition at line 143 of file IoInputModule.h. Referenced by ReadHeader(). |
|
|
Definition at line 150 of file IoInputModule.h. Referenced by LoadFilesFromCommandLine(). |
|
|
Definition at line 135 of file IoInputModule.h. Referenced by Config(). |
|
|
Definition at line 133 of file IoInputModule.h. Referenced by Config(), and UpdateDDSConfig(). |
|
|
Definition at line 134 of file IoInputModule.h. Referenced by Config() and UpdateDDSConfig(). |
|
|
Definition at line 129 of file IoInputModule.h. Referenced by Config() and UpdateDDSConfig(). |
|
|
Definition at line 125 of file IoInputModule.h. Referenced by SetRandomSeed() and UpdateStreamConfig(). |
|
|
Definition at line 136 of file IoInputModule.h. Referenced by Config(). |
|
|
Definition at line 128 of file IoInputModule.h. Referenced by Config() and UpdateDDSConfig(). |
|
|
Definition at line 141 of file IoInputModule.h. Referenced by CloseStreamItr(), Get(), GoTo(), GoToFile(), Next(), NextFile(), OpenStreamItr(), Prev(), PrevFile(), and ReadHeader(). |
|
|
Definition at line 151 of file IoInputModule.h. Referenced by EndJob(), Get(), IoInputModule(), Next(), and Prev(). |
|
|
Definition at line 110 of file IoInputModule.h. Referenced by DefineStream() and UpdateStreamConfig(). |
|
|
Definition at line 120 of file IoInputModule.h. Referenced by Config() and UpdateStreamConfig(). |
|
|
Definition at line 122 of file IoInputModule.h. Referenced by SetMaxFileRepeat() and UpdateStreamConfig(). |
|
|
Definition at line 123 of file IoInputModule.h. Referenced by SetMeanMom() and UpdateStreamConfig(). |
|
|
Definition at line 112 of file IoInputModule.h. Referenced by SetPerOwnedDisabled() and UpdateStreamConfig(). |
|
|
Definition at line 124 of file IoInputModule.h. Referenced by SetPushRandom() and UpdateStreamConfig(). |
|
|
Definition at line 109 of file IoInputModule.h. Referenced by Select() and UpdateStreamConfig(). |
|
|
Definition at line 108 of file IoInputModule.h. Referenced by Select() and UpdateStreamConfig(). |
|
|
Definition at line 111 of file IoInputModule.h. Referenced by SetSequenceMode() and UpdateStreamConfig(). |
|
|
Definition at line 113 of file IoInputModule.h. Referenced by SetTestMode() and UpdateStreamConfig(). |
|
|
Definition at line 114 of file IoInputModule.h. Referenced by SetWindow() and UpdateStreamConfig(). |
|
|
Definition at line 130 of file IoInputModule.h. Referenced by Config() and UpdateDDSConfig(). |
Generated by Doxygen 1.3.9.1