00001
00002
00003
00004
00006 #include "IoModules/IoOutputModule.h"
00007
00008 #include <vector>
00009 #include <algorithm>
00010
00011 #include "MessageService/MsgService.h"
00012 #include "JobControl/JobCEnv.h"
00013 #include "JobControl/JobCommand.h"
00014 #include "JobControl/JobCModuleRegistry.h"
00015 #include "Persistency/PerOutputStreamManager.h"
00016 #include "Persistency/PerOutputStream.h"
00017 #include "Registry/Registry.h"
00018 #include "Util/UtilString.h"
00019
00020 CVSID("$Id: IoOutputModule.cxx,v 1.26 2005/11/28 04:06:11 schubert Exp $");
00021 JOBMODULE(IoOutputModule,"Output","Write data to output streams");
00022
00023
00024
00025 IoOutputModule::IoOutputModule() :
00026 fOutputStreamManager(new PerOutputStreamManager),
00027 fAutoSaveInt(0),
00028 fAutoSaveTime(0),
00029 fAutoSaveBytes(10000000),
00030 fFileName(""),
00031 fDefaultFileName(""),
00032 fAccessMode(Per::kNew),
00033 fOutputFileOpen(false)
00034 {
00035
00036 this->SetStreams("DaqSnarl,DaqMonitor,LightInjection,Cand,Config,SimSnarl");
00037 }
00038
00039
00040
00041 IoOutputModule::~IoOutputModule()
00042 {
00043
00044 if (fOutputStreamManager) {
00045 delete fOutputStreamManager;
00046 fOutputStreamManager = 0;
00047 }
00048 }
00049
00050
00051
00052 void IoOutputModule::BeginJob()
00053 {
00054
00055
00056
00057
00058 if (!fOutputStreamManager->GetOpenedStream("DaqSnarl"))
00059 this -> DefineStream("DaqSnarl","RawRecord","","DaqSnarl");
00060 if (!fOutputStreamManager->GetOpenedStream("DaqMonitor"))
00061 this -> DefineStream("DaqMonitor","RawRecord","","DaqMonitor");
00062 if (!fOutputStreamManager->GetOpenedStream("LightInjection"))
00063 this -> DefineStream("LightInjection","RawRecord","","LightInjection");
00064 if (!fOutputStreamManager->GetOpenedStream("Cand"))
00065 this -> DefineStream("Cand","CandRecord","PrimaryCandidateRecord");
00066 if (!fOutputStreamManager->GetOpenedStream("SimSnarl"))
00067 this -> DefineStream("SimSnarl","SimSnarlRecord");
00068 if (!fOutputStreamManager->GetOpenedStream("Config"))
00069 this -> DefineStream("Config","ConfigRecord");
00070 if (!fOutputStreamManager->GetOpenedStream("NtpSR"))
00071 this -> DefineStream("NtpSR","NtpSRRecord");
00072 if (!fOutputStreamManager->GetOpenedStream("NtpMC"))
00073 this -> DefineStream("NtpMC","NtpMCRecord");
00074 if (!fOutputStreamManager->GetOpenedStream("NtpTH"))
00075 this -> DefineStream("NtpTH","NtpTHRecord");
00076 if (!fOutputStreamManager->GetOpenedStream("NtpSt"))
00077 this -> DefineStream("NtpSt","NtpStRecord");
00078
00079 this -> EnableStreamList();
00080
00081 const char* saveOpt = fOutputStreamManager->GetPrintOpt();
00082 fOutputStreamManager->SetPrintOpt("brief");
00083 MSG("Io",Msg::kDebug) << "BeginJob. Status of output stream manager:\n"
00084 << *fOutputStreamManager << endl;
00085 fOutputStreamManager->SetPrintOpt(saveOpt);
00086 }
00087
00088
00089
00090 const Registry& IoOutputModule::DefaultConfig() const
00091 {
00092
00093
00094
00095 static Registry r;
00096 std::string name = this->GetName();
00097 name += ".config.default";
00098 r.SetName(name.c_str());
00099
00100 MSG("Io",Msg::kDebug) << "Loading default config\n";
00101
00102 r.UnLockValues();
00103 r.Set("AutoSaveInt", 0);
00104 r.Set("AutoSaveTime", 0);
00105 r.Set("AutoSaveBytes", 10000000);
00106 r.Set("AccessMode", "Recreate");
00107 r.Set("Streams", "DaqSnarl,DaqMonitor,LightInjection,Cand,Config,SimSnarl");
00108
00109
00110 r.Set("FileName", JobCEnv::Instance().GetDefaultOutputFileName());
00111
00112
00113 r.Set("DefaultFileName","out.root");
00114
00115 r.LockValues();
00116
00117 return r;
00118 }
00119
00120
00121
00122 void IoOutputModule::Config(const Registry& r)
00123 {
00124
00125
00126
00127 int tmpi;
00128 const char* tmps;
00129
00130 MSG("Io",Msg::kDebug) << "Config IoOutputModule with r=" << r << "\n";
00131
00132 bool doAutoSaveConfig = false;
00133 if (r.Get("AutoSaveInt",tmpi)) {
00134 fAutoSaveInt = tmpi;
00135 doAutoSaveConfig = true;
00136 }
00137 if (r.Get("AutoSaveTime",tmpi)) {
00138 fAutoSaveTime = tmpi;
00139 doAutoSaveConfig = true;
00140 }
00141 if (r.Get("AutoSaveBytes",tmpi)) {
00142 fAutoSaveBytes = tmpi;
00143 doAutoSaveConfig = true;
00144 }
00145
00146 if ( doAutoSaveConfig && fOutputStreamManager ) fOutputStreamManager
00147 ->SetAutoSave("*",fAutoSaveInt,fAutoSaveTime,fAutoSaveBytes);
00148
00149 bool doFileConfig = false;
00150 if (r.Get("FileName", tmps)) {
00151 fFileName = tmps;
00152 doFileConfig = true;
00153 }
00154 if (r.Get("AccessMode", tmps)) {
00155 tmpi = Per::GetAccessMode(tmps);
00156 if ( tmpi > 0 ) {
00157 fAccessMode = static_cast<Per::EAccessMode>(tmpi);
00158 doFileConfig = true;
00159 }
00160 else {
00161 MSG("Io",Msg::kWarning) << "Invalid file accessmode " << tmps
00162 << " will be ignored. " << endl;
00163 }
00164 }
00165 if ( doFileConfig ) {
00166
00167
00168 this -> CloseFile();
00169 }
00170 if ( r.Get("Streams",tmps) ) {
00171 this -> SetStreams(tmps);
00172 }
00173 if (r.Get("DefaultFileName", tmps)) {
00174 fDefaultFileName = tmps;
00175 }
00176 }
00177
00178
00179
00180 void IoOutputModule::EndJob()
00181 {
00182
00183 this->CloseFile();
00184
00185
00186 fOutputStreamManager->CloseStream();
00187 }
00188
00189
00190
00191 JobCResult IoOutputModule::Put(const MomNavigator* mom)
00192 {
00193 if (fOutputFileOpen==false) this->OpenFile();
00194 fOutputStreamManager->Put(mom);
00195 return JobCResult::kAOK;
00196 }
00197
00198
00199
00200 void IoOutputModule::HandleCommand(JobCommand* cmd)
00201 {
00202 if (!cmd->HaveCmd()) return;
00203 string s = cmd->PopCmd();
00204 if (s == "Open") {
00205 if (cmd->HaveOpt()) {
00206 fFileName = cmd->PopOpt();
00207
00208 string mode = "New";
00209 fAccessMode = Per::kNew;
00210 if (cmd->HaveOpt()) {
00211 mode = cmd->PopOpt();
00212 if (mode == "New") fAccessMode = Per::kRead;
00213 else if (mode == "Recreate") fAccessMode = Per::kRecreate;
00214 else if (mode == "Update") fAccessMode = Per::kUpdate;
00215 else {
00216 MSG("Io",Msg::kError) <<
00217 mode.c_str() << " is not a valid output mode\n";
00218 return;
00219 }
00220 }
00221 }
00222 else {
00223 MSG("Io",Msg::kError) <<
00224 "Open requires at least a file name argument.\n";
00225 }
00226 return;
00227 }
00228 if (s == "Close") {
00229 this->CloseFile();
00230 return;
00231 }
00232 if (s == "AutoSave") {
00233 if (cmd->HaveOpt()) {
00234 fAutoSaveInt = cmd->PopIntOpt();
00235 return;
00236 }
00237 else {
00238 MSG("Io",Msg::kWarning) << "AutoSave requires one argument.\n";
00239 return;
00240 }
00241 }
00242 if (s == "AddStreams") {
00243 string streamlist;
00244 if (cmd->HaveOpt()) streamlist = cmd->PopOpt();
00245 this -> AddStreams(streamlist);
00246 return;
00247 }
00248 if (s == "DefineStream") {
00249 string streamname,classname,username,instreamnm;
00250 Int_t splitlevel=Per::kRecSplit;
00251 if (cmd->HaveOpt()) {
00252 streamname = cmd->PopOpt();
00253 if (cmd->HaveOpt()) {
00254 classname = cmd->PopOpt(); if (classname == "<null>") classname="";
00255 if (cmd->HaveOpt()) {
00256 username = cmd->PopOpt(); if (username == "<null>") username="";
00257 if ( cmd->HaveOpt() ) {
00258 instreamnm = cmd->PopOpt(); if (instreamnm=="<null>")instreamnm="";
00259 if ( cmd->HaveOpt() ) {
00260 splitlevel = cmd->PopIntOpt();
00261 }
00262 }
00263 }
00264 this -> DefineStream(streamname,classname,username,instreamnm,
00265 splitlevel);
00266 }
00267 }
00268 if (streamname.empty() || classname.empty()) {
00269 MSG("Io",Msg::kError) <<
00270 "DefineStream requires at least streamname,recordclassname arguments.\n";
00271 }
00272 return;
00273 }
00274
00275 if (s == "RemoveStreams") {
00276 string streamlist;
00277 if (cmd->HaveOpt()) streamlist = cmd->PopOpt();
00278 this -> RemoveStreams(streamlist);
00279 return;
00280 }
00281 if (s == "SetStreams") {
00282 string streamlist;
00283 if (cmd->HaveOpt()) streamlist = cmd->PopOpt();
00284 this -> SetStreams(streamlist);
00285 return;
00286 }
00287 if (s == "Set") {
00288 const char* optc = cmd->PopOpt();
00289 if (optc) {
00290 string opt(optc);
00291 if ( opt == "Streams") {
00292 string streamlist;
00293 if (cmd->HaveOpt()) streamlist = cmd->PopOpt();
00294 this -> SetStreams(streamlist);
00295 return;
00296 }
00297 MSG("Io",Msg::kWarning) << "Unknown option '" << opt << "'.\n";
00298 }
00299 else {
00300 MSG("Io",Msg::kWarning) << "Set requires additional arguments.\n";
00301 }
00302 }
00303
00304 MSG("Io",Msg::kError) << "Illegal command: " << s.c_str() << "\n";
00305 }
00306
00307
00308
00309 bool IoOutputModule::OpenFile()
00310 {
00311
00312
00313 if (fFileName=="") fFileName = fDefaultFileName;
00314
00315 fOutputFileOpen = false;
00316
00317
00318 if (fOutputStreamManager->SetFile("*",fFileName,fAccessMode)) {
00319 MSG("Io",Msg::kDebug) << "OpenFile " << fFileName
00320 << " with accessmode " << Per::AsString(fAccessMode) << "." << endl;
00321 fOutputFileOpen = true;
00322 }
00323 return fOutputFileOpen;
00324 }
00325
00326
00327
00328 bool IoOutputModule::CloseFile()
00329 {
00330 MSG("Io",Msg::kDebug) << "CloseFile." << endl;
00331 fOutputStreamManager->Write("*",false);
00332 fOutputStreamManager->CloseFile();
00333 fOutputFileOpen = false;
00334 return true;
00335 }
00336
00337
00338
00339 int IoOutputModule::AddStreams(std::string streamList)
00340 {
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354
00355
00356 std::vector<std::string> addList;
00357 UtilString::StringTok(addList,streamList,":,;/ ");
00358
00359 vector<string>::iterator addItr,stItr;
00360 for ( addItr = addList.begin(); addItr != addList.end(); addItr++ ) {
00361 string addStreamName = *addItr;
00362 stItr = find(fStreamList.begin(),fStreamList.end(),addStreamName);
00363 if (stItr == fStreamList.end())fStreamList.push_back(addStreamName);
00364 }
00365
00366 Int_t nenabled = this -> EnableStreamList();
00367 MSG("Io",Msg::kVerbose) << "AddStreams " << streamList << "." << endl;
00368
00369 return nenabled;
00370
00371 }
00372
00373
00374
00375 int IoOutputModule::DefineStream(std::string streamName,std::string className,
00376 std::string userName,std::string inputStreamName,Int_t splitLevel)
00377 {
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397 if ( fOutputStreamManager->GetOpenedStream(streamName) ) {
00398
00399 fOutputStreamManager->CloseStream(streamName);
00400 }
00401
00402
00403 PerOutputStream* outStream = fOutputStreamManager ->
00404 OpenStream(streamName,streamName,className,userName,inputStreamName,
00405 splitLevel);
00406
00407 outStream->SetAutoSave(fAutoSaveInt,fAutoSaveTime,fAutoSaveBytes);
00408
00409 return 1;
00410
00411 }
00412
00413
00414
00415 int IoOutputModule::EnableStreamList()
00416 {
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432 this -> AttachAssociatedStreams();
00433
00434
00435 fOutputStreamManager->SetEnable();
00436 Int_t nenabled = fOutputStreamManager->GetNumStream();
00437
00438
00439 const PerStreamManager::StreamMap& smap
00440 = fOutputStreamManager->GetStreamMap();
00441 for ( PerStreamManager::StreamMapConstItr citr = smap.begin();
00442 citr != smap.end(); ++citr ) {
00443 string streamName = citr->first;
00444 typedef vector<string>::const_iterator vs_citer;
00445 vs_citer vpos = find(fStreamList.begin(),fStreamList.end(),streamName);
00446 if ( vpos == fStreamList.end() ) {
00447 fOutputStreamManager->SetEnable(streamName,false);
00448 nenabled--;
00449 }
00450 }
00451
00452 return nenabled;
00453 }
00454
00455
00456
00457 int IoOutputModule::RemoveStreams(std::string streamList)
00458 {
00459
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469
00470
00471
00472
00473
00474 std::vector<std::string> removeList;
00475 UtilString::StringTok(removeList,streamList,":,;/ ");
00476
00477 vector<string>::iterator rmItr,stItr;
00478 for ( rmItr = removeList.begin(); rmItr != removeList.end(); rmItr++ ) {
00479 stItr = find(fStreamList.begin(),fStreamList.end(),*rmItr);
00480 if (stItr != fStreamList.end())fStreamList.erase(stItr);
00481 }
00482
00483 Int_t nenabled = this -> EnableStreamList();
00484 MSG("Io",Msg::kVerbose) << "RemoveStreams " << streamList << "." << endl;
00485
00486 return nenabled;
00487
00488 }
00489
00490
00491
00492 int IoOutputModule::SetStreams(string streamList)
00493 {
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507 MSG("Io",Msg::kVerbose) << "SetStreams " << streamList << "." << endl;
00508
00509
00510 fStreamList.clear();
00511 UtilString::StringTok(fStreamList,streamList,":,;/ ");
00512
00513 Int_t nenabled = this -> EnableStreamList();
00514
00515 return nenabled;
00516
00517 }
00518
00519
00520
00521 int IoOutputModule::AttachAssociatedStreams() {
00522
00523
00524
00525
00526
00527
00528
00529 int nassoc = 0;
00530
00531
00532
00533 bool isNewStream = true;
00534 while (isNewStream) {
00535 isNewStream = false;
00536 std::vector<std::string>::const_iterator s_citer;
00537 for (s_citer = fStreamList.begin();s_citer != fStreamList.end();s_citer++){
00538 std::string streamname = *s_citer;
00539 std::string assocstreams
00540 = Per::GetAssociatedStreamList(streamname.c_str());
00541 if ( !assocstreams.empty() ) {
00542
00543 std::vector<std::string> assocstreamlist;
00544 UtilString::StringTok(assocstreamlist,
00545 std::string(assocstreams),":,;/ ");
00546 std::vector<std::string>::const_iterator a_citer;
00547
00548 for ( a_citer = assocstreamlist.begin();
00549 a_citer != assocstreamlist.end(); a_citer++ ) {
00550 std::string assocstreamname = *a_citer;
00551 if ( find(fStreamList.begin(),fStreamList.end(),assocstreamname)
00552 == fStreamList.end() ) {
00553 MSG("Io",Msg::kVerbose) << "Attach associated stream "
00554 << assocstreamname.c_str() << "." << endl;
00555 fStreamList.push_back(assocstreamname);
00556 nassoc++;
00557 isNewStream = true;
00558 }
00559 }
00560 }
00561 }
00562 }
00563
00564 return nassoc;
00565
00566 }
00567
00568
00569
00571
00572
00573
00574
00575
00576
00577
00578