Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

IoOutputModule.cxx

Go to the documentation of this file.
00001 
00002 // $Id: IoOutputModule.cxx,v 1.26 2005/11/28 04:06:11 schubert Exp $
00003 //
00004 // messier@huhepl.harvard.edu
00006 #include "IoModules/IoOutputModule.h"
00007 
00008 #include <vector>
00009 #include <algorithm>
00010 
00011 #include "MessageService/MsgService.h"
00012 #include "JobControl/JobCEnv.h"
00013 #include "JobControl/JobCommand.h"
00014 #include "JobControl/JobCModuleRegistry.h"
00015 #include "Persistency/PerOutputStreamManager.h"
00016 #include "Persistency/PerOutputStream.h"
00017 #include "Registry/Registry.h"
00018 #include "Util/UtilString.h"  // StringTok method
00019 
00020 CVSID("$Id: IoOutputModule.cxx,v 1.26 2005/11/28 04:06:11 schubert Exp $");
00021 JOBMODULE(IoOutputModule,"Output","Write data to output streams");
00022 
00023 //......................................................................
00024 
00025 IoOutputModule::IoOutputModule() :
00026   fOutputStreamManager(new PerOutputStreamManager),
00027   fAutoSaveInt(0),
00028   fAutoSaveTime(0),
00029   fAutoSaveBytes(10000000), // default is to save tree every 10 Mbytes
00030   fFileName(""),
00031   fDefaultFileName(""),
00032   fAccessMode(Per::kNew),
00033   fOutputFileOpen(false)
00034 { 
00035   // Initialize fStreamList with default list of output streams 
00036   this->SetStreams("DaqSnarl,DaqMonitor,LightInjection,Cand,Config,SimSnarl");
00037 }
00038 
00039 //......................................................................
00040 
00041 IoOutputModule::~IoOutputModule() 
00042 {
00043   // If we have output stream manager, delete it
00044   if (fOutputStreamManager) {
00045     delete fOutputStreamManager;
00046     fOutputStreamManager = 0;
00047   }
00048 }
00049 
00050 //......................................................................
00051 
00052 void IoOutputModule::BeginJob() 
00053 {
00054   // Open and configure the default output streams.
00055 
00056   // Define and enable default list of output streams, but don't override
00057   // user definitions if specified
00058   if (!fOutputStreamManager->GetOpenedStream("DaqSnarl"))
00059     this -> DefineStream("DaqSnarl","RawRecord","","DaqSnarl");
00060   if (!fOutputStreamManager->GetOpenedStream("DaqMonitor"))
00061     this -> DefineStream("DaqMonitor","RawRecord","","DaqMonitor");
00062   if (!fOutputStreamManager->GetOpenedStream("LightInjection"))
00063     this -> DefineStream("LightInjection","RawRecord","","LightInjection");
00064   if (!fOutputStreamManager->GetOpenedStream("Cand"))
00065     this -> DefineStream("Cand","CandRecord","PrimaryCandidateRecord");
00066   if (!fOutputStreamManager->GetOpenedStream("SimSnarl"))
00067     this -> DefineStream("SimSnarl","SimSnarlRecord");
00068   if (!fOutputStreamManager->GetOpenedStream("Config"))
00069     this -> DefineStream("Config","ConfigRecord");
00070   if (!fOutputStreamManager->GetOpenedStream("NtpSR"))
00071     this -> DefineStream("NtpSR","NtpSRRecord");
00072   if (!fOutputStreamManager->GetOpenedStream("NtpMC"))
00073     this -> DefineStream("NtpMC","NtpMCRecord");
00074   if (!fOutputStreamManager->GetOpenedStream("NtpTH"))
00075     this -> DefineStream("NtpTH","NtpTHRecord");
00076   if (!fOutputStreamManager->GetOpenedStream("NtpSt"))
00077     this -> DefineStream("NtpSt","NtpStRecord");
00078   
00079   this -> EnableStreamList();
00080   
00081   const char* saveOpt = fOutputStreamManager->GetPrintOpt();
00082   fOutputStreamManager->SetPrintOpt("brief"); // must be a better way
00083   MSG("Io",Msg::kDebug) << "BeginJob. Status of output stream manager:\n"
00084                         << *fOutputStreamManager << endl;
00085   fOutputStreamManager->SetPrintOpt(saveOpt);
00086 }
00087 
00088 //......................................................................
00089 
00090 const Registry& IoOutputModule::DefaultConfig() const
00091 {
00092 //======================================================================
00093 // Configure the module given the values held in registry r
00094 //======================================================================
00095   static Registry r;
00096   std::string name = this->GetName();
00097   name += ".config.default";
00098   r.SetName(name.c_str());
00099 
00100   MSG("Io",Msg::kDebug) << "Loading default config\n";
00101 
00102   r.UnLockValues();
00103   r.Set("AutoSaveInt",    0);
00104   r.Set("AutoSaveTime",   0);
00105   r.Set("AutoSaveBytes", 10000000);
00106   r.Set("AccessMode",     "Recreate");
00107   r.Set("Streams",  "DaqSnarl,DaqMonitor,LightInjection,Cand,Config,SimSnarl");
00108 
00109   // Grab the default output file from the command line
00110   r.Set("FileName",       JobCEnv::Instance().GetDefaultOutputFileName());
00111 
00112   // Set a fall back if nothing is given on the command line
00113   r.Set("DefaultFileName","out.root");
00114 
00115   r.LockValues();
00116 
00117   return r;
00118 }
00119 
00120 //......................................................................
00121 
00122 void IoOutputModule::Config(const Registry& r)
00123 {
00124 //======================================================================
00125 // Configure the module given the values held in registry r
00126 //======================================================================
00127   int         tmpi;
00128   const char* tmps;
00129 
00130   MSG("Io",Msg::kDebug) << "Config IoOutputModule with r=" << r << "\n";
00131 
00132   bool doAutoSaveConfig = false;
00133   if (r.Get("AutoSaveInt",tmpi)) {
00134     fAutoSaveInt = tmpi;
00135     doAutoSaveConfig = true;
00136   }
00137   if (r.Get("AutoSaveTime",tmpi)) {
00138     fAutoSaveTime = tmpi;
00139     doAutoSaveConfig = true;
00140   }
00141   if (r.Get("AutoSaveBytes",tmpi)) {
00142     fAutoSaveBytes = tmpi;
00143     doAutoSaveConfig = true;
00144   }
00145 
00146   if ( doAutoSaveConfig && fOutputStreamManager ) fOutputStreamManager
00147           ->SetAutoSave("*",fAutoSaveInt,fAutoSaveTime,fAutoSaveBytes);
00148 
00149   bool doFileConfig = false;
00150   if (r.Get("FileName", tmps)) {
00151     fFileName = tmps;
00152     doFileConfig = true;
00153   }
00154   if (r.Get("AccessMode", tmps)) {
00155     tmpi = Per::GetAccessMode(tmps);
00156     if ( tmpi > 0 ) {
00157       fAccessMode = static_cast<Per::EAccessMode>(tmpi);
00158       doFileConfig = true;
00159     }
00160     else {
00161       MSG("Io",Msg::kWarning) << "Invalid file accessmode " << tmps
00162                               << " will be ignored. " << endl;
00163     }
00164   }
00165   if ( doFileConfig ) {
00166     // Close file because filename or accessmode has changed.  Wait until
00167     // IoOutputModule::Put call to reopen
00168     this -> CloseFile();
00169   }
00170   if ( r.Get("Streams",tmps) ) {
00171     this -> SetStreams(tmps);
00172   }
00173   if (r.Get("DefaultFileName", tmps)) {
00174     fDefaultFileName = tmps;
00175   }
00176 }
00177 
00178 //......................................................................
00179 
00180 void IoOutputModule::EndJob()
00181 {
00182   // Close current output file
00183   this->CloseFile();
00184 
00185   // Close all streams
00186   fOutputStreamManager->CloseStream();
00187 }
00188 
00189 //......................................................................
00190 
00191 JobCResult IoOutputModule::Put(const MomNavigator* mom)
00192 { 
00193   if (fOutputFileOpen==false) this->OpenFile();
00194   fOutputStreamManager->Put(mom);
00195   return JobCResult::kAOK;
00196 }
00197 
00198 //......................................................................
00199 
00200 void IoOutputModule::HandleCommand(JobCommand* cmd) 
00201 {
00202   if (!cmd->HaveCmd()) return;
00203   string s = cmd->PopCmd();
00204   if (s == "Open") {
00205     if (cmd->HaveOpt()) {
00206       fFileName = cmd->PopOpt(); // Set the file name
00207       // Determine the access mode
00208       string mode = "New";
00209       fAccessMode = Per::kNew;
00210       if (cmd->HaveOpt()) {
00211         mode = cmd->PopOpt();
00212         if      (mode == "New")      fAccessMode = Per::kRead;
00213         else if (mode == "Recreate") fAccessMode = Per::kRecreate;
00214         else if (mode == "Update")   fAccessMode = Per::kUpdate;
00215         else {
00216           MSG("Io",Msg::kError) << 
00217             mode.c_str() << " is not a valid output mode\n";
00218           return;
00219         }
00220       }
00221     }
00222     else {
00223       MSG("Io",Msg::kError) <<
00224         "Open requires at least a file name argument.\n";
00225     }
00226     return;
00227   }
00228   if (s == "Close") { 
00229     this->CloseFile(); 
00230     return; 
00231   }
00232   if (s == "AutoSave") {
00233     if (cmd->HaveOpt()) {
00234       fAutoSaveInt = cmd->PopIntOpt();
00235       return;
00236     }
00237     else {
00238       MSG("Io",Msg::kWarning) << "AutoSave requires one argument.\n";
00239       return;
00240     }
00241   }
00242   if (s == "AddStreams") {
00243     string streamlist;
00244     if (cmd->HaveOpt()) streamlist = cmd->PopOpt();
00245     this -> AddStreams(streamlist);
00246     return;
00247   }
00248   if (s == "DefineStream") {
00249     string streamname,classname,username,instreamnm;
00250     Int_t splitlevel=Per::kRecSplit;
00251     if (cmd->HaveOpt()) {
00252       streamname = cmd->PopOpt(); 
00253       if (cmd->HaveOpt()) {
00254         classname = cmd->PopOpt(); if (classname == "<null>") classname="";
00255         if (cmd->HaveOpt()) {
00256           username = cmd->PopOpt(); if (username == "<null>") username="";
00257           if ( cmd->HaveOpt() ) {
00258             instreamnm = cmd->PopOpt(); if (instreamnm=="<null>")instreamnm="";
00259             if ( cmd->HaveOpt() ) {
00260               splitlevel = cmd->PopIntOpt();
00261             }
00262           }
00263         }
00264         this -> DefineStream(streamname,classname,username,instreamnm,
00265                              splitlevel);
00266       }
00267     }
00268     if (streamname.empty() || classname.empty()) {
00269       MSG("Io",Msg::kError) <<
00270       "DefineStream requires at least streamname,recordclassname arguments.\n";
00271     }
00272     return;
00273   }
00274 
00275   if (s == "RemoveStreams") {
00276     string streamlist;
00277     if (cmd->HaveOpt()) streamlist = cmd->PopOpt();
00278     this -> RemoveStreams(streamlist);
00279     return;
00280   }
00281   if (s == "SetStreams") {
00282     string streamlist;
00283     if (cmd->HaveOpt()) streamlist = cmd->PopOpt();
00284     this -> SetStreams(streamlist);
00285     return;
00286   }
00287   if (s == "Set") {
00288     const char* optc = cmd->PopOpt();
00289     if (optc) {
00290       string opt(optc);
00291       if ( opt == "Streams") {
00292         string streamlist;
00293         if (cmd->HaveOpt()) streamlist = cmd->PopOpt();
00294         this -> SetStreams(streamlist);
00295         return;
00296       }
00297       MSG("Io",Msg::kWarning) << "Unknown option '" << opt << "'.\n";
00298     }
00299     else {
00300       MSG("Io",Msg::kWarning) << "Set requires additional arguments.\n";
00301     }
00302   }
00303   // Errors fall through to here
00304   MSG("Io",Msg::kError) << "Illegal command: " << s.c_str() << "\n";
00305 }
00306 
00307 //......................................................................
00308 
00309 bool IoOutputModule::OpenFile() 
00310 {
00311   // If no file name has been given by this time, use the default as a
00312   // fall back
00313   if (fFileName=="") fFileName = fDefaultFileName;
00314 
00315   fOutputFileOpen = false;
00316 
00317   // set new file on all managed output streams
00318   if (fOutputStreamManager->SetFile("*",fFileName,fAccessMode)) {
00319     MSG("Io",Msg::kDebug) << "OpenFile " << fFileName 
00320       << " with accessmode " << Per::AsString(fAccessMode) << "." << endl;
00321     fOutputFileOpen = true;
00322   }
00323   return fOutputFileOpen;
00324 }
00325 
00326 //......................................................................
00327 
00328 bool IoOutputModule::CloseFile()
00329 {
00330   MSG("Io",Msg::kDebug) << "CloseFile." << endl;
00331   fOutputStreamManager->Write("*",false);
00332   fOutputStreamManager->CloseFile();
00333   fOutputFileOpen = false;
00334   return true;
00335 }
00336 
00337 //......................................................................
00338 
00339 int IoOutputModule::AddStreams(std::string streamList)
00340 {
00341   //
00342   // Purpose: Add output streams on the input streamList to the list of
00343   //          enabled streams.
00344   // 
00345   // Argument: streamlist  list of streams separated by delimiters
00346   //
00347   // Return: Number of streams remaining open.
00348   //
00349   // Contact: S.Kasahara
00350   // 
00351   // Notes: Streams must be defined via the DefineStream method.
00352   //
00353 
00354 
00355   // Parse input streamList into vector of requested output streams.
00356   std::vector<std::string> addList;
00357   UtilString::StringTok(addList,streamList,":,;/ ");
00358   
00359   vector<string>::iterator addItr,stItr;
00360   for ( addItr = addList.begin(); addItr != addList.end(); addItr++ ) {
00361     string addStreamName = *addItr;
00362     stItr = find(fStreamList.begin(),fStreamList.end(),addStreamName);
00363     if (stItr == fStreamList.end())fStreamList.push_back(addStreamName);
00364   }
00365   
00366   Int_t nenabled = this -> EnableStreamList();
00367   MSG("Io",Msg::kVerbose) << "AddStreams " << streamList << "." << endl;
00368  
00369   return nenabled;
00370 
00371 }
00372 
00373 //......................................................................
00374 
00375 int IoOutputModule::DefineStream(std::string streamName,std::string className, 
00376           std::string userName,std::string inputStreamName,Int_t splitLevel)
00377 {
00378   //
00379   // Purpose: Define and open output stream.
00380   // 
00381   // Argument:   streamName   name of output stream
00382   //              className   class of records to persist
00383   //               userName   TNamed name of record (optional, default="")
00384   //        inputStreamName   origin of record  (optional, default="")
00385   //             splitLevel   splitLevel of tree (optional, def=Per::kRecSplit)
00386   //
00387   // Return: 1.
00388   //
00389   // Contact: S.Kasahara
00390   // 
00391   // Notes: If stream with name streamName has already been defined it will
00392   //        be overridden with new definition.  classname,username,
00393   //        and inputstreamname are the key used to extract the record(s) 
00394   //        from mom.
00395   //
00396 
00397   if ( fOutputStreamManager->GetOpenedStream(streamName) ) {
00398     // Close existing stream before proceeding
00399     fOutputStreamManager->CloseStream(streamName);
00400   }
00401 
00402   // Open output stream
00403   PerOutputStream* outStream = fOutputStreamManager ->
00404     OpenStream(streamName,streamName,className,userName,inputStreamName,
00405                                                         splitLevel);
00406 
00407   outStream->SetAutoSave(fAutoSaveInt,fAutoSaveTime,fAutoSaveBytes);
00408 
00409   return 1;
00410 
00411 }
00412 
00413 //......................................................................
00414 
00415 int IoOutputModule::EnableStreamList() 
00416 {
00417   //
00418   // Purpose: Enable all streams on fStreamList.  
00419   // 
00420   // Argument: none.
00421   //
00422   // Return: Number of streams remaining open.
00423   //
00424   // Contact: S.Kasahara
00425   // 
00426   // Notes: Streams must have been previously defined via the 
00427   //        DefineStream method.
00428   //
00429 
00430   // Attach "Associated" streams. An example is the Config stream is
00431   // associated with the Cand stream.
00432   this -> AttachAssociatedStreams();
00433 
00434   // Begin by enabling all defined streams
00435   fOutputStreamManager->SetEnable();
00436   Int_t nenabled = fOutputStreamManager->GetNumStream();
00437 
00438   // Disable streams that are not on requested list
00439   const PerStreamManager::StreamMap& smap 
00440                             = fOutputStreamManager->GetStreamMap();
00441   for ( PerStreamManager::StreamMapConstItr citr  = smap.begin();
00442                                             citr != smap.end(); ++citr ) {
00443     string streamName = citr->first;
00444     typedef vector<string>::const_iterator vs_citer;
00445     vs_citer vpos = find(fStreamList.begin(),fStreamList.end(),streamName);
00446     if ( vpos == fStreamList.end() ) {
00447       fOutputStreamManager->SetEnable(streamName,false);
00448       nenabled--;
00449     }
00450   }
00451 
00452   return nenabled;
00453 }
00454 
00455 //......................................................................
00456 
00457 int IoOutputModule::RemoveStreams(std::string streamList)
00458 {
00459   //
00460   // Purpose: Remove output streams on the streamList from the list of
00461   //          enabled streams.
00462   // 
00463   // Argument: streamlist  list of streams separated by delimiters
00464   //
00465   // Return: Number of streams remaining open.
00466   //
00467   // Contact: S.Kasahara
00468   // 
00469   // Notes: Streams must be defined via the DefineStream method.
00470   //
00471 
00472 
00473   // Parse input streamList into vector of requested output streams.
00474   std::vector<std::string> removeList;
00475   UtilString::StringTok(removeList,streamList,":,;/ ");
00476   
00477   vector<string>::iterator rmItr,stItr;
00478   for ( rmItr = removeList.begin(); rmItr != removeList.end(); rmItr++ ) {
00479     stItr = find(fStreamList.begin(),fStreamList.end(),*rmItr);
00480     if (stItr != fStreamList.end())fStreamList.erase(stItr);
00481   }
00482 
00483   Int_t nenabled = this -> EnableStreamList();
00484   MSG("Io",Msg::kVerbose) << "RemoveStreams " << streamList << "." << endl;
00485 
00486   return nenabled;
00487 
00488 }
00489 
00490 //......................................................................
00491 
00492 int IoOutputModule::SetStreams(string streamList)
00493 {
00494   //
00495   // Purpose: Modify enabled output streams to match those on the streamlist
00496   // 
00497   // Argument: streamlist  list of streams separated by delimiters
00498   //
00499   // Return: Number of streams remaining open.
00500   //
00501   // Contact: S.Kasahara
00502   // 
00503   // Notes: Streams must be defined via the DefineStream method.
00504   //
00505 
00506 
00507   MSG("Io",Msg::kVerbose) << "SetStreams " << streamList << "." << endl;
00508 
00509   // Parse input streamList into vector of requested output streams.
00510   fStreamList.clear();
00511   UtilString::StringTok(fStreamList,streamList,":,;/ ");
00512 
00513   Int_t nenabled = this -> EnableStreamList();
00514  
00515   return nenabled;
00516 
00517 }
00518 
00519 //......................................................................
00520 
00521 int IoOutputModule::AttachAssociatedStreams() {
00522   //
00523   // Purpose: Private method to attach "Associated" streams to output
00524   //          stream list
00525   //
00526   // Return: Number of attached associated streams.
00527   // 
00528 
00529   int nassoc = 0;
00530 
00531   // Since associated streams may in turn have associated streams,
00532   // this is done as a while loop until no more streams are added
00533   bool isNewStream = true;
00534   while (isNewStream) {
00535     isNewStream = false;
00536     std::vector<std::string>::const_iterator s_citer;
00537     for (s_citer = fStreamList.begin();s_citer != fStreamList.end();s_citer++){
00538       std::string streamname = *s_citer;
00539       std::string assocstreams 
00540                           = Per::GetAssociatedStreamList(streamname.c_str());
00541       if ( !assocstreams.empty() ) {
00542         // parse delimiter separated streamlist into a vector of streamnames
00543         std::vector<std::string> assocstreamlist;
00544         UtilString::StringTok(assocstreamlist,
00545                               std::string(assocstreams),":,;/ ");
00546         std::vector<std::string>::const_iterator a_citer;
00547         // Add associated streams to fStreamList that aren't already present
00548         for ( a_citer = assocstreamlist.begin(); 
00549               a_citer != assocstreamlist.end(); a_citer++ ) {
00550           std::string assocstreamname = *a_citer;
00551           if ( find(fStreamList.begin(),fStreamList.end(),assocstreamname)
00552                == fStreamList.end() ) {
00553             MSG("Io",Msg::kVerbose) << "Attach associated stream " 
00554                                     << assocstreamname.c_str() << "." << endl;
00555             fStreamList.push_back(assocstreamname);
00556             nassoc++;
00557             isNewStream = true;
00558           }
00559         }
00560       }
00561     }
00562   }
00563   
00564   return nassoc;
00565 
00566 }
00567 
00568 
00569 
00571 
00572 
00573 
00574 
00575 
00576 
00577 
00578 

Generated on Mon Feb 15 11:06:48 2010 for loon by  doxygen 1.3.9.1