00001
00002
00003
00004
00005
00006
00008 #include "TSystem.h"
00009 #include "TRegexp.h"
00010
00011 #include "Dispatcher/DDS.h"
00012 #include "IoModules/IoInputModule.h"
00013 #include <cassert>
00014 #include "MessageService/MsgService.h"
00015 #include "MinosObjectMap/MomNavigator.h"
00016 #include "IoModules/IoDataStreamItr.h"
00017 #include "IoModules/IoDDSStreamItr.h"
00018 #include "IoModules/IoDataStreamFactory.h"
00019 #include "JobControl/JobCInputModule.h"
00020 #include "JobControl/JobCModuleRegistry.h"
00021 #include "JobControl/JobCEnv.h"
00022 #include "RawData/RawRecord.h"
00023 #include "RawData/RawDaqSnarlHeader.h"
00024 #include "CandData/CandHeader.h"
00025 #include "Record/RecRecord.h"
00026 #include "Record/RecPhysicsHeader.h"
00027 #include "Registry/Registry.h"
00028 #include "Validity/VldContext.h"
00029 #include "Validity/VldTimeStamp.h"
00030 #include "Util/UtilString.h"
00031 #include "TSystem.h"
00032 #include "TRandom3.h"
00033
00034 #include <algorithm>
00035 #include <cstring>
00036 #include <string>
00037 #include <map>
00038
00039 #ifdef SITE_HAS_SAM
00040 #include "sam_cpp_api/SamConsumer.hpp"
00041 #include "sam_cpp_api/SamLocate.hpp"
00042 #endif
00043
00044 CVSID("$Id: IoInputModule.cxx,v 1.86 2009/01/05 05:55:13 schubert Exp $");
00045 JOBMODULE(IoInputModule,"INPUT","Read and configure input streams");
00046
00047 typedef std::map<std::string,std::string>::const_iterator mapStrStrItr_t;
00048
00049
00050 static const JobCResult gsAllClear = JobCResult::kAOK;
00051
00052
00053
00054
00055
00056 class CallDepth {
00057 public:
00058 CallDepth() { ++fsDepth; }
00059 ~CallDepth() { --fsDepth; }
00060 static int fsDepth;
00061 };
00062 int CallDepth::fsDepth = 0;
00063
00064
00065
00066 IoInputModule::IoInputModule() :
00067 fDataStreamItr(0),
00068 fFormat(""),
00069 fStreamList(""),
00070 fServer(""),
00071 fPort(0),
00072 fTimeOut(0),
00073 fDataSource(0),
00074 fKeepUpMode(0),
00075 fMaxSyncDelay(0),
00076 fOffLine(false),
00077 fMaxRetry(0),
00078 fRetryDelay(1),
00079 fClientType(DDS::kUnknownClientType),
00080 fClientName(""),
00081 fStatus(JobCResult::kAOK),
00082 fLastRun(-1),
00083 fLastSnarl(-1),
00084 fCurrentRun(-1),
00085 fCurrentSnarl(-1),
00086 fLoadedCommandLineFiles(false)
00087 #ifdef SITE_HAS_SAM
00088 ,fsamProject(0)
00089 #endif
00090 { fStopwatch.Reset(); fStopwatch.Stop(); }
00091
00092
00093
00094 IoInputModule::~IoInputModule()
00095 {
00096 if ( fDataStreamItr ) { delete fDataStreamItr; fDataStreamItr = 0; }
00097 }
00098
00099
00100
00101 void IoInputModule::BeginJob()
00102 {
00103 this->LoadFilesFromCommandLine();
00104
00105 }
00106
00107
00108
00109
00110 void IoInputModule::EndJob()
00111 {
00112 fStopwatch.Stop();
00113 MSG("Io",Msg::kDebug) << "IoInputModule::EndJob, Time(sec), Real "
00114 << fStopwatch.RealTime() << ", CPU "
00115 << fStopwatch.CpuTime() << endl;
00116
00117 }
00118
00119
00120
00121 const Registry& IoInputModule::DefaultConfig() const
00122 {
00123
00124
00125
00126 static Registry r;
00127 r.SetName("INPUT.config");
00128
00129 r.UnLockValues();
00130
00131 MSG("Io",Msg::kDebug) << "Loading default config\n";
00132
00133
00134 r.Set("Format" ,"input");
00135 r.Set("Streams","DaqMonitor,DaqSnarl,LightInjection");
00136
00137
00138
00139 r.Set("DDSServer", "daqdds.minos-soudan.org");
00140 r.Set("DDSPort", DDS::kPort);
00141 r.Set("DDSTimeOut", 120);
00142 r.Set("DDSDataSource","Daq");
00143 r.Set("DDSKeepUpMode", "FileKeepUp");
00144 r.Set("DDSMaxSyncDelay",15);
00145 r.Set("DDSOffLine",false);
00146 r.Set("DDSMaxRetry",0);
00147 r.Set("DDSRetryDelay",1);
00148 r.Set("DDSClientType","Unknown");
00149 r.Set("DDSClientName","");
00150
00151 #ifdef SITE_HAS_SAM
00152
00153
00154 r.Set("Station","minos");
00155 r.Set("SnapShotVers",0);
00156 r.Set("WorkGroupName","minos");
00157 r.Set("ApplicationName","loon");
00158 r.Set("ApplicationVers","dev");
00159 r.Set("MaxNumberOfFiles",0);
00160 r.Set("StartNewProject",1);
00161
00162
00163
00164 const char* username = gSystem->Getenv("USER");
00165 if (!username) username = "unknown";
00166 r.Set("ProjectName",username);
00167
00168 #endif
00169
00170 r.LockValues();
00171 return r;
00172 }
00173
00174
00175
00176 void IoInputModule::Config(const Registry& r)
00177 {
00178
00179
00180
00181 const char* tmps;
00182 int tmpi;
00183 int tmpb;
00184
00185 MSG("Io",Msg::kDebug) << "Config IoInputModule with r=" << r << "\n";
00186
00187
00188 bool doFormatConfig = false;
00189 if (r.Get("Format", tmps)) { fFormat = tmps; doFormatConfig = true; }
00190 if (doFormatConfig) this->UpdateFormatConfig();
00191 bool doStreamConfig = false;
00192 if (r.Get("Streams",tmps)) { fStreamList = tmps; doStreamConfig = true; }
00193 if (doStreamConfig) this->UpdateStreamConfig();
00194
00195
00196 bool doDDSConfig = false;
00197 if (r.Get("DDSServer", tmps)) { fServer = tmps; doDDSConfig = true; }
00198 if (r.Get("DDSPort", tmpi)) { fPort = tmpi; doDDSConfig = true; }
00199 if (r.Get("DDSTimeOut",tmpi)) { fTimeOut = tmpi; doDDSConfig = true; }
00200 if (r.Get("DDSClientType",tmps)) { fClientType = DDS::GetClientType(tmps);
00201 doDDSConfig = true; }
00202 if (r.Get("DDSClientName",tmps)) { fClientName = tmps;
00203 doDDSConfig = true; }
00204 if (r.Get("DDSDataSource",tmps)){fDataSource = DDS::GetDataSourceCode(tmps);
00205 doDDSConfig = true;}
00206 if (r.Get("DDSKeepUpMode",tmps)) { fKeepUpMode = DDS::GetKeepUpCode(tmps);
00207 doDDSConfig = true; }
00208 if (r.Get("DDSMaxSyncDelay",tmpi)){fMaxSyncDelay = tmpi; doDDSConfig = true;}
00209 if (r.Get("DDSOffLine",tmpb)) {fOffLine = tmpb; doDDSConfig = true;}
00210 if (r.Get("DDSMaxRetry",tmpi)) {fMaxRetry = tmpi; doDDSConfig = true; }
00211 if (r.Get("DDSRetryDelay",tmpi)) {fRetryDelay = tmpi; doDDSConfig = true; }
00212 if (doDDSConfig) this->UpdateDDSConfig();
00213
00214
00215 }
00216
00217
00218
00219 JobCResult IoInputModule::Get()
00220 {
00221
00222
00223
00224
00225
00226 if ( fDataStreamItr==0 ) {
00227 if ( this->OpenStreamItr()==0 ) {
00228 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00229 return fStatus;
00230 }
00231 return this->Get();
00232 }
00233
00234 MSG("Io",Msg::kVerbose) << "IoInputModule::Get " << endl;
00235
00236 fStopwatch.Start(false);
00237 MomNavigator* mom = this->GetMom();
00238 assert(mom);
00239 mom -> Clear();
00240
00241 int nrecord = fDataStreamItr->LoadRecords(mom);
00242 bool isDDS = (UtilString::ToUpper(fFormat) == "DDS");
00243
00244 if ( isDDS && !nrecord ) fStatus.SetEndOfInputStream();
00245
00246 this->ReadHeader();
00247 if ( fStatus.EndOfInputStream() )
00248 { fStatus.SetEndFile(); fStatus.SetEndRun(); }
00249 MSG("Io",Msg::kVerbose)
00250 << "IoInputModule::Get returning status " << fStatus << endl;
00251 fStopwatch.Stop();
00252 return fStatus;
00253 }
00254
00255
00256
00257 JobCResult IoInputModule::Next(int n)
00258 {
00259
00260
00261
00262
00263 CallDepth d;
00264
00265
00266 if (d.fsDepth==1) fStatus = gsAllClear;
00267
00268 MSG("Io",Msg::kVerbose) << "IoInputModule::Next " << n << endl;
00269
00270 if ( fDataStreamItr==0 ) {
00271 if (this->OpenStreamItr()==0) {
00272 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00273 return fStatus;
00274 }
00275 return this->Next(n);
00276 }
00277
00278 fStopwatch.Start(false);
00279 MomNavigator* mom = this->GetMom();
00280 assert(mom);
00281 mom -> Clear();
00282
00283
00284
00285 int nstep = 0;
00286 int ndone = 0;
00287 int ntry = 0;
00288 while ( ndone < n ) {
00289 ntry = n - ndone;
00290 nstep = fDataStreamItr->Increment(ntry);
00291
00292 if ( nstep < ntry ) {
00293
00294 fStatus |= this->NextFile();
00295
00296
00297 if ( fStatus.EndOfInputStream() ) {
00298 fStopwatch.Stop();
00299 return this->Get();
00300 }
00301 }
00302 ndone += nstep;
00303 }
00304
00305
00306 fStopwatch.Stop();
00307 return this->Get();
00308 }
00309
00310
00311
00312 JobCResult IoInputModule::Prev(int n)
00313 {
00314
00315
00316
00317
00318 CallDepth d;
00319
00320
00321 if (d.fsDepth == 1) fStatus = gsAllClear;
00322
00323 MSG("Io",Msg::kVerbose) << "IoInputModule::Prev " << n << endl;
00324
00325 if (fDataStreamItr==0) {
00326 if (this->OpenStreamItr()==0) {
00327 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00328 return fStatus;
00329 }
00330 return this->Prev(n);
00331 }
00332
00333
00334
00335 fStopwatch.Start(false);
00336 MomNavigator* mom = this->GetMom();
00337 assert(mom);
00338 mom -> Clear();
00339
00340 int nstep = 0;
00341 int ndone = 0;
00342 int ntry = 0;
00343 while (ndone < n) {
00344 ntry = n - ndone;
00345 nstep = fDataStreamItr->Decrement(ntry);
00346
00347 if (nstep < ntry) {
00348
00349 fStatus |= this->PrevFile();
00350
00351
00352 if ( fStatus.BeginOfInputStream() || fStatus.EndOfInputStream() ) {
00353 return this->Get();
00354 fStopwatch.Stop();
00355 }
00356
00357
00358
00359 fDataStreamItr->GoToEOF();
00360 }
00361 ndone += nstep;
00362 }
00363
00364
00365 fStopwatch.Stop();
00366 return this->Get();
00367
00368 }
00369
00370
00371
00372 JobCResult IoInputModule::GoTo(int run, int snarl, int searchDir)
00373 {
00374 CallDepth d;
00375
00376 if (d.fsDepth==1) fStatus = gsAllClear;
00377
00378 if (fDataStreamItr==0) {
00379 if (this->OpenStreamItr()==0) {
00380 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00381 return fStatus;
00382 }
00383 return this->GoTo(run,snarl,searchDir);
00384 }
00385
00386 if ( run == fCurrentRun && snarl == fCurrentSnarl ) return this -> Get();
00387
00388 int dir = searchDir;
00389 if (dir==0) {
00390 if (run>fLastRun) { dir = 1; }
00391 else if (run<fLastRun || (run==fLastRun && fCurrentRun < 0)) { dir = -1; }
00392 else {
00393 if (snarl>fLastSnarl) { dir = 1; }
00394 else { dir = -1; }
00395 }
00396 }
00397
00398
00399 while ( 1 ) {
00400 if ( dir > 0 ) {
00401 this->Next();
00402 if (fCurrentRun>run) {
00403 MSG("Io",Msg::kWarning) <<
00404 "Went to run "<<fCurrentRun<<
00405 " without finding run="<<run<<" snarl="<<snarl<<"\n";
00406 return fStatus;
00407 }
00408 if (fCurrentRun==run && fCurrentSnarl>snarl) {
00409 MSG("Io",Msg::kWarning) <<
00410 "Went to run="<<fCurrentRun<<" snarl="<<fCurrentSnarl<<
00411 " without finding run="<<run<<" snarl="<<snarl<<"\n";
00412 return fStatus;
00413 }
00414 }
00415 if ( dir <0 ) {
00416 this->Prev();
00417 if (fCurrentRun<run) {
00418 MSG("Io",Msg::kWarning) <<
00419 "Went to run "<<fCurrentRun<<
00420 " without finding run="<<run<<" snarl="<<snarl<<"\n";
00421 return fStatus;
00422 }
00423 if (fCurrentRun==run && fCurrentSnarl<snarl) {
00424 MSG("Io",Msg::kWarning) <<
00425 "Went to run="<<fCurrentRun<<" snarl="<<fCurrentSnarl<<
00426 " without finding run="<<run<<" snarl="<<snarl<<"\n";
00427 return fStatus;
00428 }
00429 }
00430
00431 if (fCurrentRun == run && fCurrentSnarl == snarl) return fStatus;
00432 if (dir>0 && fStatus.EndOfInputStream()) return fStatus;
00433 if (dir<0 && fStatus.BeginOfInputStream()) return fStatus;
00434 }
00435 return fStatus;
00436 }
00437
00438
00439
00440 JobCResult IoInputModule::GoTo(const VldContext& vld)
00441 {
00442
00443
00444
00445
00446 CallDepth d;
00447
00448
00449 if (d.fsDepth==1) fStatus = gsAllClear;
00450
00451 if (fDataStreamItr==0) {
00452 if (this->OpenStreamItr()==0) {
00453 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00454 return fStatus;
00455 }
00456 return this->GoTo(vld);
00457 }
00458
00459 fStatus |= fDataStreamItr -> GoTo(vld);
00460
00461
00462 return this->Get();
00463
00464 }
00465
00466
00467
00468 void IoInputModule::List(const char* streamlist) const
00469 {
00470
00471
00472
00473
00474 MsgStream& m = MSGSTREAM("Io",Msg::kInfo);
00475
00476 m << "IoInputModule using data format " << fFormat << endl;
00477
00478 if ( fDataStreamItr ) {
00479 fDataStreamItr -> ListFile(std::cout,streamlist);
00480 return;
00481 }
00482
00483
00484 m << "File Name\tStream List " << endl;
00485 m << "=========\t=========== " << endl;
00486 std::list<IoFileListItem>::const_iterator itr = fFileList.begin();
00487 std::list<IoFileListItem>::const_iterator itrEnd = fFileList.end();
00488
00489 for ( ; itr != itrEnd; itr++ ) {
00490 m << *itr;
00491 }
00492
00493 }
00494
00495
00496
00497 void IoInputModule::AddFile(const char *filepath, const char* streamlist,
00498 int at) {
00499
00500
00501
00502
00503
00504
00505
00506
00507
00508 const char *s1 = "SAM";
00509 if ( strstr(filepath,s1) != NULL ) {
00510
00511 #ifdef SITE_HAS_SAM
00512
00513
00514
00515 const char* tmps;
00516 int tmpi;
00517
00518 Registry& r = GetConfig();
00519 if (r.Get("Station",tmps)) {fStation = tmps;}
00520 if (r.Get("SnapShotVers",tmpi)){fSnapShotVers = tmpi;}
00521 if (r.Get("WorkGroupName",tmps)) {fWorkGroupName = tmps;}
00522 if (r.Get("ApplicationName",tmps)) {fApplicationName = tmps;}
00523 if (r.Get("ApplicationVers",tmps)) {fApplicationVers = tmps;}
00524 if (r.Get("ProjectName",tmps)) {fProjectName = tmps;}
00525 if (r.Get("MaxNumberOfFiles",tmpi)) {fMaxNumberOfFiles = tmpi;}
00526 if (r.Get("StartNewProject",tmpi)) {fStartNewProject = tmpi;}
00527
00528
00529
00530
00531
00532 std::string temp = filepath;
00533 size_t pos = temp.find(":")+1;
00534 std::string sam_access_type = temp.substr(0,pos-1);
00535 if (sam_access_type == "SAM") {
00536 std::string samdataset = temp.substr(pos,temp.length()-pos);
00537
00538
00539 std::string projectname;
00540
00541
00542 if (fStartNewProject == 1) {
00543
00544
00545 VldTimeStamp ts;
00546 std::string timestamp = ts.AsString("lc");
00547
00548 size_t pos = timestamp.find(" ");
00549 timestamp.replace(pos,1,"-");
00550
00551 pos = timestamp.find(":");
00552 while ( pos != string::npos ) {
00553 timestamp.replace(pos,1,"-");
00554 pos = timestamp.find(":",pos+1);
00555 }
00556
00557 fProjectName.append("-");
00558 projectname = fProjectName+timestamp;
00559 }
00560
00561 else if(fStartNewProject == 0) {
00562 projectname = fProjectName;
00563 }
00564
00565
00566 MSG("Io",Msg::kDebug) << "Sam Station " << fStation << " Snap Shot " <<
00567 fSnapShotVers << " Work Group Name " << fWorkGroupName
00568 << " Application Name "
00569 << fApplicationName << " Application Version " << fApplicationVers <<
00570 " Project Name " << projectname << endl;
00571
00572
00573
00574
00575 long snapshot;
00576
00577 if ( fSnapShotVers == 0 ) {
00578 snapshot = sam::SamProject::NewSnapshotVersion;
00579 }
00580
00581 else if (fSnapShotVers < 0) {
00582 snapshot = sam::SamProject::LatestSnapshotVersion;
00583 }
00584 else if (fSnapShotVers > 0 ) {
00585
00586 snapshot = fSnapShotVers;
00587 }
00588
00589 MSG("Io",Msg::kDebug) << "SnapShot Version " << snapshot << endl;
00590
00591 if (fStartNewProject == 1) {
00592
00593
00594
00595 fsamProject = new sam::SamProject(projectname,fStation);
00596
00597
00598
00599 std::list<std::string> projectMasterArgList;
00600
00601 try {
00602 MSG("Io",Msg::kInfo) << "Starting SAM Project " << projectname <<
00603 " on station " << fStation << endl;
00604 fsamProject->startProject(fWorkGroupName,samdataset,snapshot,
00605 projectMasterArgList);
00606 }
00607 catch(const sam::SamProject::StartProjectRequestRejected& ex) {
00608 MSG("Io",Msg::kInfo) << "Rejected start SAM project request "
00609 << ex << endl;
00610 }
00611 }
00612
00613
00614
00615 const int projectMasterTimeout(60);
00616 const std::string processDescription("Loon Analysis Process");
00617
00618 try{
00619
00620
00621 sam::SamConsumer fsamConsumer(projectname,fStation,fWorkGroupName,
00622 fApplicationName,fApplicationVers,
00623 processDescription,
00624 fMaxNumberOfFiles,
00625 projectMasterTimeout);
00626
00627 MSG("Io",Msg::kInfo) << "Started SAM Consumer" << endl;
00628
00629
00630
00631
00632 std::map<std::string,std::string> filelist;
00633 map<std::string,std::string>::iterator fitr;
00634
00635 int location;
00636 int length;
00637 int comp;
00638 std::string fileonly;
00639 std::string restOfPath;
00640 std::string afsroot("afsroot:");
00641 try {
00642 while(true) {
00643 std::string filename = fsamConsumer.getFile().getFullFileName();
00644
00645
00646
00647
00648
00649 MSG("Io",Msg::kDebug) << "Filename " << filename << endl;
00650 location = filename.find_last_of("/");
00651 length = filename.length();
00652 fileonly = filename.substr(location+1,length-1);
00653
00654
00655 comp = filename.compare(0,8,afsroot);
00656 if (comp == 0 ) {
00657 restOfPath = filename.substr(8,location-8);
00658 }
00659 else {
00660 restOfPath = filename.substr(0,location);
00661 }
00662
00663 restOfPath.append("/");
00664 filelist.insert(make_pair(fileonly,restOfPath));
00665
00666 MSG("Io",Msg::kDebug) << "File Only " << fileonly << " Rest Of Path "
00667 << restOfPath << endl;
00668
00669
00670
00671 fsamConsumer.releaseFile();
00672 }
00673 }
00674 catch(const sam::SamConsumer::EndOfFileStreamReached& ex) {
00675 MSG("Io",Msg::kDebug) << "End of File Stream reached" << endl;
00676 }
00677
00678
00679
00680 std::string sfile;
00681 const char *samfile = 0;
00682 for (fitr = filelist.begin(); fitr != filelist.end(); fitr++) {
00683 sfile = (fitr->second+fitr->first);
00684 samfile = sfile.data();
00685 MSG("Io",Msg::kInfo) << "Adding File " << samfile << endl;
00686
00687 IoFileListItem iofile(samfile,at,streamlist);
00688 fFileList.push_back(iofile);
00689 }
00690 }
00691 catch(const sam::SamConsumer::InitializationError& ex) {
00692 MSG("Io",Msg::kInfo) << "Rejected start SAM Consumer request "
00693 << ex << endl;
00694 }
00695
00696
00697 if (fsamProject) {
00698 try {
00699 MSG("Io",Msg::kInfo) << "Requesting end of SAM project " << endl;
00700 fsamProject->endProject();
00701 }
00702 catch(const sam::SamProject::EndProjectRequestRejected& ex) {
00703 MSG("Io",Msg::kInfo) << "SAM Project end request rejected "<< ex << endl;
00704
00705 }
00706 catch(const sam::SamProject::EndProjectRequestFailed& ex) {
00707 MSG("Io",Msg::kInfo) << "SAM Project end request failed "<< ex << endl;
00708 }
00709 }
00710 }
00711 else if (sam_access_type == "SAM_FILE") {
00712
00713
00714 std::string samfile = temp.substr(pos,temp.length()-pos);
00715
00716
00717
00718 sam::LocationList samFiles;
00719 try {
00720 MSG("Io",Msg::kInfo) << "Locating file " << samfile << endl;
00721 samFiles = sam::locate(samfile);
00722
00723
00724 samFiles[0].insert(6,"fnal.gov/usr/");
00725
00726 TRandom3 rand(0);
00727 Double_t r = rand.Rndm();
00728 if (r <= 0.5) {
00729 samFiles[0].insert(0,"dcap://fndca1.fnal.gov:24125");
00730 }
00731 else if (r > 0.5) {
00732 samFiles[0].insert(0,"dcap://fndca1.fnal.gov:24136");
00733 }
00734
00735 samFiles[0].append("/");
00736 samFiles[0].append(samfile);
00737 MSG("Io",Msg::kInfo) << "Adding file " << samFiles[0].c_str() << " to input list" << endl;
00738
00739 IoFileListItem iofile(samFiles[0].c_str(),at,streamlist);
00740 fFileList.push_back(iofile);
00741 }
00742 catch(const sam::exception::DataFileNotFound& ex) {
00743 MSG("Io",Msg::kInfo) << ex << endl;
00744 }
00745 }
00746
00747 #endif // End of ifdef SITE_HAS_SAM
00748
00749 }
00750 else {
00751
00752
00753 IoFileListItem iofile(filepath,at,streamlist);
00754 fFileList.push_back(iofile);
00755
00756 if ( !fDataStreamItr ) return;
00757
00758
00759 const IoFileListItem::FileStreamMap& filemap = iofile.GetFileStreamMap();
00760
00761 if ( at < 0 ) {
00762 IoFileListItem::FileStreamMapConstItr itr = filemap.begin();
00763 for ( ; itr != filemap.end(); itr++ ) {
00764 std::string filename = itr -> first;
00765 std::string streamlist = itr -> second;
00766 fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
00767 }
00768 }
00769 else {
00770
00771 IoFileListItem::FileStreamMap::const_reverse_iterator itr=filemap.rbegin();
00772 for ( ; itr != filemap.rend(); itr++ ) {
00773 std::string filename = itr -> first;
00774 std::string streamlist = itr -> second;
00775 fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
00776 }
00777 }
00778 }
00779 }
00780
00781
00782
00783 void IoInputModule::RemoveFile(const char* filename, const char* streamlist) {
00784
00785
00786
00787
00788 if ( fDataStreamItr ) fDataStreamItr -> RemoveFile(filename,streamlist);
00789
00790 std::string f(filename);
00791 std::list<IoFileListItem>::iterator itr = fFileList.end();
00792 while ( !fFileList.empty() && itr != fFileList.begin() ) {
00793 itr--;
00794 IoFileListItem& iofile = *itr;
00795 iofile.RemoveFile(filename,streamlist);
00796 if ( iofile.GetNumFile() == 0 ) fFileList.erase(itr);
00797 }
00798
00799 return;
00800
00801 }
00802
00803
00804
00805 JobCResult IoInputModule::NextFile(int n, const char* streamlist)
00806 {
00807
00808
00809
00810 CallDepth d;
00811
00812 if (d.fsDepth==1) fStatus = gsAllClear;
00813
00814 if (fDataStreamItr==0) {
00815 if (this->OpenStreamItr()==0) {
00816 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00817 return fStatus;
00818 }
00819 return this->NextFile(n,streamlist);
00820 }
00821
00822 fStatus |= fDataStreamItr -> NextFile(n,streamlist);
00823
00824 MSG("Io",Msg::kDebug)
00825 << "status is " << fStatus
00826 << " current file is " << fDataStreamItr->GetCurrentFile() << endl;
00827
00828 return fStatus;
00829
00830 }
00831
00832
00833
00834 JobCResult IoInputModule::PrevFile(int n, const char* streamlist)
00835 {
00836
00837
00838
00839 CallDepth d;
00840
00841 if (d.fsDepth==1) fStatus = gsAllClear;
00842
00843 if (fDataStreamItr==0) {
00844 if (this->OpenStreamItr()==0) {
00845 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00846 return fStatus;
00847 }
00848 return this->PrevFile(n,streamlist);
00849 }
00850
00851 fStatus |= fDataStreamItr -> PrevFile(n,streamlist);
00852
00853 return fStatus;
00854
00855 }
00856
00857
00858
00859 JobCResult IoInputModule::GoToFile(int n, const char* streamlist)
00860 {
00861
00862
00863
00864 CallDepth d;
00865
00866 if (d.fsDepth==1) fStatus = gsAllClear;
00867
00868 if (fDataStreamItr==0) {
00869 if (this->OpenStreamItr()==0) {
00870 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00871 return fStatus;
00872 }
00873 return this->GoToFile(n,streamlist);
00874 }
00875
00876 fStatus |= fDataStreamItr -> GoToFile(n,streamlist);
00877
00878 return fStatus;
00879
00880 }
00881
00882
00883
00884 JobCResult IoInputModule::GoToFile(const char* filename, const char*streamlist){
00885
00886
00887
00888 CallDepth d;
00889
00890 if (d.fsDepth==1) fStatus = gsAllClear;
00891
00892 if (fDataStreamItr==0) {
00893 if (this->OpenStreamItr()==0) {
00894 MSG("Io",Msg::kWarning) << "Failed to open input stream itr." << endl;
00895 return fStatus;
00896 }
00897 return this->GoToFile(filename,streamlist);
00898 }
00899
00900 fStatus |= fDataStreamItr -> GoToFile(filename,streamlist);
00901
00902 return fStatus;
00903 }
00904
00905
00906
00907 void IoInputModule::Select(const char* stream, const char* select,
00908 bool isRequired)
00909 {
00910
00911
00912
00913
00914 fStreamSelectionMap[stream] = select;
00915 fStreamRequiredMap[stream] = isRequired;
00916
00917
00918 if (fDataStreamItr) {
00919 fDataStreamItr->Select(stream, select,isRequired);
00920 }
00921
00922 }
00923
00924
00925
00926 void IoInputModule::SetPerOwnedDisabled(const char* stream,
00927 bool perowneddisabled)
00928 {
00929
00930
00931
00932
00933 fStreamPerOwnedDisabledMap[stream] = perowneddisabled;
00934
00935
00936 if (fDataStreamItr) {
00937 fDataStreamItr->SetPerOwnedDisabled(stream, perowneddisabled);
00938 }
00939
00940 }
00941
00942
00943
00944
00945 void IoInputModule::DefineStream(const char* stream, const char* tree) {
00946
00947
00948
00949
00950 fStreamDefMap[stream] = tree;
00951
00952
00953 if ( fDataStreamItr ) {
00954 fDataStreamItr->DefineStream(stream, tree);
00955 }
00956
00957 }
00958
00959
00960
00961 void IoInputModule::SetSequenceMode(const char* stream,
00962 Per::ESequenceMode sequenceMode) {
00963
00964
00965
00966
00967 fStreamSeqModeMap[stream] = sequenceMode;
00968
00969
00970 if ( fDataStreamItr ) {
00971 fDataStreamItr->SetSequenceMode(stream, sequenceMode);
00972 }
00973
00974 }
00975
00976
00977
00978 void IoInputModule::SetTestMode(const char* stream,
00979 bool testMode) {
00980
00981
00982
00983
00984 fStreamTestModeMap[stream] = testMode;
00985
00986
00987 if ( fDataStreamItr ) {
00988 fDataStreamItr->SetTestMode(stream, testMode);
00989 }
00990
00991 }
00992
00993
00994
00995 void IoInputModule::SetWindow(const char* stream, double lower, double upper)
00996 {
00997
00998
00999
01000 fStreamWindowMap[stream] = std::pair<double,double>(lower,upper);
01001
01002
01003 if ( fDataStreamItr ) {
01004 fDataStreamItr->SetWindow(stream, lower, upper);
01005 }
01006
01007 }
01008
01009
01010
01011 void IoInputModule::SetMaxFileRepeat(const char* stream, int numRepeat)
01012 {
01013
01014
01015
01016
01017 fStreamMaxRepeatMap[stream] = numRepeat;
01018
01019
01020 if ( fDataStreamItr ) fDataStreamItr->SetMaxFileRepeat(stream,numRepeat);
01021 }
01022
01023
01024
01025 void IoInputModule::SetMeanMom(const char* stream, double mean)
01026 {
01027
01028
01029
01030
01031 fStreamMeanMap[stream] = mean;
01032
01033
01034 if ( fDataStreamItr ) fDataStreamItr->SetMeanMom(stream,mean);
01035 }
01036
01037
01038
01039 void IoInputModule::SetPushRandom(const char* stream, bool setRandom)
01040 {
01041
01042
01043
01044
01045 fStreamPushRandomMap[stream] = setRandom;
01046
01047
01048 if ( fDataStreamItr ) fDataStreamItr->SetPushRandom(stream,setRandom);
01049 }
01050
01051
01052
01053 void IoInputModule::SetRandomSeed(int rSeed)
01054 {
01055
01056
01057
01058
01059 fRandomSeed = rSeed;
01060
01061
01062 if ( fDataStreamItr ) fDataStreamItr->SetRandomSeed(rSeed);
01063
01064 }
01065
01066
01067
01068 const char* IoInputModule::GetCurrentFile(const char* streamname) const
01069 {
01070 MSG("Io",Msg::kDebug) << "IoInputModule::GetCurrentFile()" << endl;
01071 mapStrStrItr_t it, done=fCurrentFileMap.end();
01072 std::string strmstring = streamname;
01073
01074 for (it = fCurrentFileMap.begin(); it !=done; ++it) {
01075 MSG("Io",Msg::kVerbose)
01076 << "stream: " << setw(16) << it->first
01077 << " file: " << it->second
01078 <<endl;
01079 }
01080
01081 for (it = fCurrentFileMap.begin(); it!=done; ++it) {
01082 if ( strmstring == it->first ) return it->second.c_str();
01083 }
01084
01085 if (!fDataStreamItr) return 0;
01086 return fDataStreamItr->GetCurrentFile(streamname);
01087 }
01088
01089 const char* IoInputModule::GetLastFile(const char* streamname) const
01090 {
01091 MSG("Io",Msg::kInfo) << "IoInputModule::GetLastFile()" << endl;
01092 mapStrStrItr_t it, done = fLastFileMap.end();
01093 std::string strmstring = streamname;
01094
01095 for (it = fLastFileMap.begin(); it!=done; ++it) {
01096 MSG("Io",Msg::kVerbose)
01097 << "stream: " << setw(16) << it->first
01098 << " file: " << it->second
01099 <<endl;
01100 }
01101
01102 for (it = fLastFileMap.begin(); it!=done; ++it) {
01103 if ( strmstring == it->first ) return it->second.c_str();
01104 }
01105 return 0;
01106
01107 }
01108
01109
01110
01111 void IoInputModule::LoadFilesFromCommandLine()
01112 {
01113
01114
01115
01116 JobCEnv& jce = JobCEnv::Instance();
01117 if (!fLoadedCommandLineFiles) {
01118 for (int i=0; i<jce.GetNfile(); ++i) {
01119 this->AddFile(jce.GetFileName(i));
01120 }
01121 fLoadedCommandLineFiles = true;
01122 }
01123 }
01124
01125
01126
01127 int IoInputModule::ReadHeader()
01128 {
01129
01130
01131
01132
01133
01134
01135
01136 const MomNavigator* mom = this->GetMom();
01137 assert(mom);
01138
01139
01140
01141
01142
01143 const TObjArray* momarray = mom->GetFragmentArray();
01144 for (int i=0; i < momarray->GetEntriesFast(); ++i) {
01145 TObject* obj = momarray->At(i);
01146 if (!obj) continue;
01147 Registry* temptags = 0;
01148 if ( RecMinos* record = dynamic_cast<RecMinos*>(obj) ) {
01149 temptags = &(record->GetTempTags());
01150 }
01151 else if ( RecRecord* record = dynamic_cast<RecRecord*>(obj) ) {
01152 temptags = &(record->GetTempTags());
01153 }
01154 if ( ! temptags ) continue;
01155
01156
01157 const char* tagstream = 0;
01158 if ( ! temptags->Get("stream",tagstream) ) continue;
01159
01160
01161 std::string streamname(tagstream);
01162 const char* tagnewfile = 0;
01163 if ( ! temptags->Get("file",tagnewfile) ) continue;
01164
01165 std::string lstfilename = fLastFileMap[streamname];
01166 std::string curfilename = fCurrentFileMap[streamname];
01167 std::string newfilename(tagnewfile);
01168
01169 if ( newfilename != curfilename ) {
01170
01171 std::string starcur = fCurrentFileMap.begin()->first;
01172 std::string starlast = fLastFileMap.begin()->first;
01173
01174 MSG("Io",Msg::kDebug)
01175 << "SetBeginFile on streamname '" << streamname << "'" << endl
01176 << " current '" << fCurrentFileMap[streamname] << "'" << endl
01177 << " last '" << fLastFileMap[streamname] << "'" << endl
01178 << " current['" << starcur << "'] '"
01179 << fCurrentFileMap[starcur] << "'" << endl
01180 << " last['" << starlast << "'] '"
01181 << fLastFileMap[starlast] << "'" << endl
01182 << " new '" << newfilename << "' != "
01183 << " cur '" << curfilename << "'" << endl
01184 << " update \"*\" ? "
01185 << (( newfilename != fCurrentFileMap["*"] ) ? "yes":"no")
01186 << endl;
01187
01188
01189 if ( newfilename != fCurrentFileMap["*"] ) {
01190 fStatus.SetBeginFile();
01191
01192 if ( fCurrentFileMap["*"] != "" ) fStatus.SetEndFile();
01193
01194 fLastFileMap["*"] = fCurrentFileMap["*"];
01195 fCurrentFileMap["*"] = newfilename;
01196
01197 MSG("Io",Msg::kDebug)
01198 << "SetBeginFile on '*'" << endl
01199 << " current['" << starcur << "'] '"
01200 << fCurrentFileMap[starcur] << "'" << endl
01201 << " last['" << starlast << "'] '"
01202 << fLastFileMap[starlast] << "'" << endl;
01203 }
01204
01205 fLastFileMap[streamname] = curfilename;
01206 fCurrentFileMap[streamname] = newfilename;
01207
01208
01209
01210
01211
01212
01213 mapStrStrItr_t it, done = fCurrentFileMap.end();
01214 for (it = fCurrentFileMap.begin(); it != done; ++it) {
01215 if ( it->second == curfilename ) {
01216 std::string altstream = it->first;
01217 fLastFileMap[altstream] = curfilename;
01218 fCurrentFileMap[altstream] = newfilename;
01219 }
01220 }
01221
01222 }
01223
01224 MSG("Io",Msg::kVerbose)
01225 << " stream '" << streamname << "' set fLastFileMap to '"
01226 << curfilename << "', fCurrentFileMap to '"
01227 << newfilename << "'"
01228 << " * '" << fLastFileMap["*"] << "' '" << fCurrentFileMap["*"] << "'"
01229 << endl;
01230
01231 }
01232
01233
01234
01235
01236
01237
01238
01239 if ( ! fStatus.BeginFile() ) fLastFileMap["*"] = fCurrentFileMap["*"];
01240
01241
01242 int run = -1;
01243 int snarl = -1;
01244 for (int i=0; i < momarray->GetEntriesFast(); ++i) {
01245 TObject* obj = momarray->At(i);
01246 if (!obj) continue;
01247 if ( RecMinos* record = dynamic_cast<RecMinos*>(obj) ) {
01248
01249
01250
01251 const RawDaqHeader* rdh
01252 = dynamic_cast<const RawDaqHeader*>(record->GetHeader());
01253 if (rdh) run = rdh->GetRun();
01254
01255
01256 const RawDaqSnarlHeader* rdsh
01257 = dynamic_cast<const RawDaqSnarlHeader*>(record->GetHeader());
01258 if (rdsh) snarl = rdsh->GetSnarl();
01259
01260 if (!rdh) {
01261
01262 const CandHeader* candhdr
01263 = dynamic_cast<const CandHeader*>(record->GetHeader());
01264 if (candhdr) {
01265 run = candhdr->GetRun();
01266 snarl = candhdr->GetSnarl();
01267 }
01268 }
01269 }
01270 else if ( RecRecord* record = dynamic_cast<RecRecord*>(obj) ) {
01271
01272 const RecPhysicsHeader* rph
01273 = dynamic_cast<const RecPhysicsHeader*>(&(record->GetHeader()));
01274 if ( rph ) {
01275 run = rph->GetRun();
01276 snarl = rph->GetSnarl();
01277 }
01278 }
01279
01280
01281 if ( snarl >= 0 ) break;
01282 }
01283
01284
01285 fCurrentSnarl = snarl;
01286 if ( run < 0 ) {
01287 fCurrentRun = -1;
01288 return 0;
01289 }
01290 if ( run != fCurrentRun ) {
01291 fStatus.SetBeginRun();
01292 if ( fLastRun >= 0 ) fStatus.SetEndRun();
01293 }
01294 fLastRun = fCurrentRun;
01295 fCurrentRun = run;
01296 if ( snarl >= 0 ) {
01297 fLastSnarl = fCurrentSnarl;
01298 fCurrentSnarl = snarl;
01299 return 2;
01300 }
01301 return 1;
01302 }
01303
01304
01305
01306 void IoInputModule::UpdateDDSConfig() {
01307
01308
01309
01310 if ( fDataStreamItr == 0 ) return;
01311
01312 IoDDSStreamItr* ddsItr = dynamic_cast<IoDDSStreamItr*>(fDataStreamItr);
01313 if ( ! ddsItr ) return;
01314
01315 ddsItr->SetTimeOut(fTimeOut);
01316
01317
01318
01319 bool reinit = (fServer != ddsItr->GetSourceName()
01320 || fPort != ddsItr->GetPort()
01321 || fClientType != ddsItr->GetClientType()
01322 || fClientName != ddsItr->GetClientName() );
01323
01324 if ( reinit ) {
01325 this -> CloseStreamItr();
01326 }
01327 else {
01328 ddsItr -> SetKeepUpMode(fKeepUpMode);
01329 ddsItr -> SetMaxSyncDelay(fMaxSyncDelay);
01330 ddsItr -> SetDataSource(fDataSource);
01331 ddsItr -> SetOffLine(fOffLine);
01332 }
01333
01334 }
01335
01336
01337
01338
01339 void IoInputModule::UpdateFormatConfig() {
01340
01341
01342
01343
01344 if ( fDataStreamItr == 0 ) return;
01345
01346 bool reopen = ( fFormat != fDataStreamItr->GetFormat() );
01347 if ( reopen ) {
01348 this -> CloseStreamItr();
01349 }
01350
01351 return;
01352
01353 }
01354
01355
01356
01357 void IoInputModule::UpdateFileList()
01358 {
01359
01360
01361
01362
01363 if ( fDataStreamItr == 0 ) return;
01364
01365
01366
01367 fDataStreamItr -> RemoveFile("*");
01368
01369 std::list<IoFileListItem>::iterator itr = fFileList.begin();
01370 for ( ; itr != fFileList.end(); itr++ ) {
01371 IoFileListItem& iofile = *itr;
01372 const IoFileListItem::FileStreamMap& filemap = iofile.GetFileStreamMap();
01373 int at = iofile.GetAt();
01374
01375 if ( at < 0 ) {
01376 IoFileListItem::FileStreamMapConstItr itr = filemap.begin();
01377 for ( ; itr != filemap.end(); itr++ ) {
01378 std::string filename = itr -> first;
01379 std::string streamlist = itr -> second;
01380 fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
01381 }
01382 }
01383 else {
01384
01385 IoFileListItem::FileStreamMap::const_reverse_iterator itr
01386 = filemap.rbegin();
01387 for ( ; itr != filemap.rend(); itr++ ) {
01388 std::string filename = itr -> first;
01389 std::string streamlist = itr -> second;
01390 fDataStreamItr -> AddFile(filename.c_str(),at,streamlist.c_str());
01391 }
01392 }
01393 }
01394
01395 return;
01396
01397 }
01398
01399
01400
01401 void IoInputModule::UpdateStreamConfig()
01402 {
01403
01404
01405
01406 if (fDataStreamItr==0) return;
01407
01408
01409 mapStrStrItr_t itr;
01410 for ( itr = fStreamDefMap.begin(); itr != fStreamDefMap.end(); itr++ ) {
01411 fDataStreamItr->DefineStream((itr->first).c_str(),
01412 (itr->second).c_str());
01413 }
01414
01415
01416 fDataStreamItr->Streams(fStreamList.c_str());
01417
01418 for (itr=fStreamSelectionMap.begin();itr!=fStreamSelectionMap.end();++itr) {
01419 std::map<std::string,bool>::const_iterator reqitr
01420 = fStreamRequiredMap.find(itr->first);
01421 bool isrequired = false;
01422 if ( reqitr != fStreamRequiredMap.end() ) isrequired = reqitr -> second;
01423 fDataStreamItr->Select((itr->first).c_str(),
01424 (itr->second).c_str(),
01425 isrequired);
01426 }
01427
01428
01429 std::map<std::string,bool>::const_iterator testitr;
01430 for ( testitr=fStreamTestModeMap.begin(); testitr !=fStreamTestModeMap.end();
01431 ++testitr ) {
01432 fDataStreamItr->SetTestMode((testitr->first).c_str(),
01433 testitr->second);
01434 }
01435
01436
01437 std::map<std::string,bool>::const_iterator disableitr;
01438 for ( disableitr = fStreamPerOwnedDisabledMap.begin();
01439 disableitr != fStreamPerOwnedDisabledMap.end(); ++disableitr ) {
01440 fDataStreamItr -> SetPerOwnedDisabled((disableitr->first).c_str(),
01441 disableitr->second);
01442 }
01443
01444
01445
01446 bool setRandom = false;
01447 std::map<std::string,Per::ESequenceMode>::const_iterator seqitr;
01448 for ( seqitr=fStreamSeqModeMap.begin(); seqitr!=fStreamSeqModeMap.end();
01449 ++seqitr ) {
01450 fDataStreamItr->SetSequenceMode((seqitr->first).c_str(),
01451 seqitr->second);
01452 pair<double,double> window = fStreamWindowMap[seqitr->first];
01453 fDataStreamItr->SetWindow((seqitr->first).c_str(),
01454 window.first,window.second);
01455 if (seqitr->second == Per::kSequential ||
01456 seqitr->second == Per::kRandom ) {
01457 if (!setRandom) {
01458 setRandom = true;
01459 fDataStreamItr->SetRandomSeed(fRandomSeed);
01460 }
01461 int repeat = fStreamMaxRepeatMap[seqitr->first];
01462 fDataStreamItr->SetMaxFileRepeat( (seqitr->first).c_str(), repeat );
01463 double mean = fStreamMeanMap[seqitr->first];
01464 fDataStreamItr->SetMeanMom( (seqitr->first).c_str(), mean );
01465 bool pushRand = fStreamPushRandomMap[seqitr->first];
01466 fDataStreamItr->SetPushRandom( (seqitr->first).c_str(), pushRand );
01467 }
01468 }
01469 }
01470
01471
01472
01473 int IoInputModule::OpenStreamItr()
01474 {
01475
01476
01477
01478 if (fDataStreamItr) this->CloseStreamItr();
01479
01480 bool isDDS = (UtilString::ToUpper(fFormat) == "DDS");
01481 std::string src;
01482 if ( isDDS ) src = fServer;
01483
01484 fDataStreamItr = IoDataStreamFactory::CreateDataStreamItr(src.c_str(),
01485 fFormat.c_str(),fPort,fMaxRetry,fRetryDelay,
01486 fClientType,fClientName);
01487
01488
01489 if (fDataStreamItr == 0) {
01490 MSG("Io",Msg::kWarning) << "Failed to open stream '" << src << "'" <<
01491 " using format '" << fFormat << "'" << endl;
01492 fStatus.SetEndRun();
01493 fStatus.SetEndFile();
01494 fStatus.SetEndOfInputStream();
01495 return 0;
01496 }
01497
01498 fStatus.SetBeginOfInputStream();
01499 fStatus.SetBeginFile();
01500 fStatus.SetBeginRun();
01501
01502
01503 this->UpdateStreamConfig();
01504 this->UpdateFileList();
01505 this->UpdateDDSConfig();
01506 fFormat = fDataStreamItr->GetFormat();
01507
01508 MSG("Io",Msg::kDebug)
01509 << "Opened stream itr of format " << fDataStreamItr->GetFormat() << endl;
01510
01511 return 1;
01512 }
01513
01514
01515
01516 void IoInputModule::CloseStreamItr()
01517 {
01518
01519
01520
01521 if (fDataStreamItr) {
01522 MSG("Io",Msg::kDebug)
01523 << "Close stream itr of format " << fDataStreamItr->GetFormat() << endl;
01524 delete fDataStreamItr;
01525 fDataStreamItr = 0;
01526 fStatus.SetEndRun();
01527 fStatus.SetEndFile();
01528 fStatus.SetEndOfInputStream();
01529 }
01530 }
01531
01533
01534
01535
01536
01537
01538
01539
01540
01541
01542
01543
01544
01545
01546