00001
00002
00003
00004
00005
00006
00008
00009 #include "Rotorooter/RotoClientModule.h"
00010 #include "Rotorooter/RotoClientBinaryFile.h"
00011 #include "Rotorooter/RotoClient.h"
00012 #include "Rotorooter/RotoObjectifier.h"
00013
00014 #include "MinosObjectMap/MomNavigator.h"
00015 #include "MessageService/MsgService.h"
00016 #include "JobControl/JobCModuleRegistry.h"
00017 #include "JobControl/JobCommand.h"
00018
00019 #include "RawData/RawRecord.h"
00020 #include "RawData/RawDaqHeader.h"
00021 #include "RawData/RawDataBlock.h"
00022
00023 #include "RawData/RawDaqHeaderBlock.h"
00024 #include "RawData/RawRunStartBlock.h"
00025 #include "RawData/RawRunEndBlock.h"
00026
00027 #include "TSystem.h"
00028
00029 const Char_t *dfltHost = "localhost";
00030 const Int_t dfltPort = 9011;
00031
00032 const Bool_t dfltPrimary = true;
00033 const Int_t dfltBufferWords = 6 * 1024 * 1024;
00034
00035 const UInt_t dbg_DumpHead = 0x0001;
00036 const UInt_t dbg_DumpAddBlock = 0x0002;
00037 const UInt_t dbg_PrintAddBlock = 0x0004;
00038 const UInt_t dbg_PrintHexAddBlock = 0x0008;
00039
00040 ClassImp(RotoClientModule)  // NOTE(review): presumably the ROOT class-implementation macro for this class -- confirm
00041
00042
00043
00044 CVSID("$Id: RotoClientModule.cxx,v 1.17 2007/07/09 00:50:06 rhatcher Exp $");  // NOTE(review): presumably records the CVS revision string of this file -- confirm against the macro definition
00045 JOBMODULE(RotoClientModule, "RotoClientModule",
00046 "Sends RawRecord info to Roto-Rooter");  // NOTE(review): presumably registers the module with JobControl under the given name and description -- confirm
00047
00048
00049
00050 RotoClientModule::RotoClientModule()
00051 : fRotoClient(0), fBufferWords(dfltBufferWords), fBuffer(0),
00052 fUseRototalk(true), fGenFakeRec(true), fDebugFlags(0)
00053 {
00054
00055
00056
00057
00058
00059 fPrimary = dfltPrimary;
00060
00061
00062 const char *ROTOSERVER = gSystem->Getenv("ROTOSERVER");
00063 if (!ROTOSERVER || strlen(ROTOSERVER) == 0) {
00064 ROTOSERVER = dfltHost;
00065 }
00066 SetHostPort(ROTOSERVER,0);
00067
00068 MSG("Roto", Msg::kInfo) << " Initially configured for host \""
00069 << fHost.c_str() << "\" Port " << fPort << endl;
00070 }
00071
00072
00073
00074 RotoClientModule::~RotoClientModule()
00075 {
00076
00077
00078
00079
00080 DestroyClient();
00081 DestroyBuffer();
00082 }
00083
00084
00085
00086 JobCResult RotoClientModule::Put(const MomNavigator *mom)
00087 {
00088
00089
00090
00091
00092 JobCResult jobcStatus(JobCResult::kAOK);
00093
00094 int* iptr_data = GetBuffer();
00095 if (!iptr_data) return JobCResult::kError;
00096
00097 RotoClient* client = GetClient();
00098 if (!client) return JobCResult::kError;
00099
00100 Bool_t ok = true;
00101
00102
00103
00104 TObject *tobj = 0;
00105 TIter reciter = const_cast<MomNavigator*>(mom)->FragmentIter();
00106 while ( (tobj = reciter() ) ) {
00107 RawRecord *rawrec = dynamic_cast<RawRecord*>(tobj);
00108 if (!rawrec) continue;
00109
00110 Char_t* iptr_bytes = (Char_t*) iptr_data;
00111 Int_t maxbytes = fBufferWords * sizeof(Int_t)/sizeof(Char_t);
00112 Int_t nbytes_used =
00113 RotoObjectifier::BufferSquish(rawrec,iptr_bytes,maxbytes);
00114
00115 VldContext vldc = rawrec->GetRawHeader()->GetVldContext();
00116
00117 const RawDaqHeader *rawdaqhead =
00118 dynamic_cast<const RawDaqHeader *>(rawrec->GetRawHeader());
00119 Int_t run = rawdaqhead->GetRun();
00120 Int_t subrun = rawdaqhead->GetSubRun();
00121
00122 ReOpenOutputFile(vldc.GetDetector(),run,subrun,&vldc,fGenFakeRec);
00123
00124
00125 ok &= client->SendRecordBuffer(iptr_bytes,nbytes_used);
00126 }
00127
00128 if (ok) return JobCResult::kPassed;
00129 return jobcStatus;
00130 }
00131
00132
00133 void RotoClientModule::BeginJob()
00134 {
00135
00136
00137 }
00138
00139 void RotoClientModule::EndJob()
00140 {
00141
00142
00143
00144
00145 ReOpenOutputFile(-1,-1,-1,0,fGenFakeRec);
00146 }
00147
00148 void RotoClientModule::BeginFile()
00149 {
00150
00151
00152 }
00153
00154 void RotoClientModule::EndFile()
00155 {
00156
00157
00158 }
00159
00160 void RotoClientModule::BeginRun()
00161 {
00162
00163
00164 }
00165
00166 void RotoClientModule::EndRun()
00167 {
00168
00169
00170 }
00171
00172 void RotoClientModule::Report()
00173 {
00174
00175 MSG("Roto", Msg::kInfo) << "RotoClientModule::Report" << endl;
00176
00177 MSG("Roto", Msg::kInfo) << "RotoClientModule has:" << endl;
00178
00179
00180 if (fRotoClient) {
00181 RotoClient* client = GetClient();
00182 if (client) {
00183 int maxbytes = 1024;
00184 char* buffer = new char[maxbytes];
00185 client->GetRotoStatus(buffer,maxbytes);
00186 if (fPort >= 0) {
00187 MSG("Roto", Msg::kInfo)
00188 << " connected client to \""
00189 << fHost.c_str()
00190 << "\" Port "
00191 << fPort
00192 << endl
00193 << " state_report: \"" << buffer << "\""
00194 << endl;
00195 } else {
00196 MSG("Roto", Msg::kInfo)
00197 << " open binary file \""
00198 << fHost.c_str()
00199 << "\" "
00200 << endl
00201 << buffer
00202 << endl;
00203 }
00204 delete [] buffer;
00205 } else {
00206 MSG("Roto", Msg::kInfo)
00207 << " connected client to \""
00208 << fHost.c_str()
00209 << "\" Port "
00210 << fPort
00211 << endl;
00212 }
00213 } else {
00214 MSG("Roto", Msg::kInfo)
00215 << " no currently connected client"
00216 << endl
00217 << " configured to use to \""
00218 << fHost.c_str()
00219 << "\" Port "
00220 << fPort
00221 << endl;
00222 }
00223
00224
00225 if (fBuffer) {
00226 MSG("Roto", Msg::kInfo)
00227 << " buffer of "
00228 << fBufferWords
00229 << " words"
00230 << endl;
00231 } else {
00232 MSG("Roto", Msg::kInfo)
00233 << " no buffer"
00234 << endl
00235 << " configured to use "
00236 << fBufferWords
00237 << " words"
00238 << endl;
00239 }
00240
00241
00242 MSG("Roto", Msg::kInfo)
00243 << " primary flag set to " << fPrimary
00244 << endl
00245 << " DebugFlags set to 0x" << hex << setw(8) << fDebugFlags << dec
00246 << endl
00247 << endl;
00248 }
00249
00250
00251 void RotoClientModule::HandleCommand(JobCommand *command)
00252 {
00253
00254
00255
00256
00257
00258
00259 TString cmd = command->PopCmd();
00260 if (cmd == "Set") {
00261 TString opt = command->PopOpt();
00262 if (opt == "HostPort") {
00263 string newHost = command->PopOpt();
00264 Int_t newPort = command->PopIntOpt();
00265 if (newHost != fHost || newPort != fPort) {
00266 DestroyClient();
00267 SetHostPort(newHost.c_str(),newPort);
00268 }
00269 }
00270 else if (opt == "BufferWords") {
00271 Int_t newSize = command->PopIntOpt();
00272 if (newSize != fBufferWords) {
00273 DestroyBuffer();
00274 fBufferWords = newSize;
00275 }
00276 }
00277 else if (opt == "Primary") {
00278 fPrimary = (command->PopIntOpt()) ? 1 : 0;
00279 }
00280 else if (opt == "GenFakeRec") {
00281 fGenFakeRec = (command->PopIntOpt()) ? 1 : 0;
00282 }
00283 else if (opt == "DebugFlags") {
00284 fDebugFlags = command->PopIntOpt();
00285 }
00286 else if (opt == "ClientDebugFlags") {
00287 Int_t flags = command->PopIntOpt();
00288 RotoClient::SetDebugFlags(flags);
00289 }
00290 else if (opt == "ObjectifierDebugFlags") {
00291 Int_t flags = command->PopIntOpt();
00292 RotoObjectifier::SetDebugFlags(flags);
00293 }
00294 else {
00295 MSG("Roto", Msg::kWarning)
00296 << "RotoClientModule: Unrecognized Set option " << opt << endl;
00297 }
00298 } else if (cmd == "Report") {
00299 Report();
00300 } else if (cmd == "CrashAndBurn") {
00301 MSG("Roto", Msg::kFatal)
00302 << "RotoClientModule: CrashAndBurn command " << endl;
00303 exit(0);
00304
00305 } else {
00306 MSG("Roto", Msg::kWarning)
00307 << "RotoClientModule: Unrecognized command " << cmd << endl;
00308 }
00309
00310 }
00311
00312
00313 void RotoClientModule::Config(const Registry& r)
00314 {
00315
00316
00317 string newHost = fHost;
00318 int newPort = fPort;
00319 int newSize = fBufferWords;
00320 const char* tmpcs;
00321 int tmpi;
00322
00323 if (r.Get("Host:Port",tmpcs)) {
00324 char host[255];
00325 char* hostptr = host;
00326
00327 while (*tmpcs != ':' && *tmpcs != '\0') *hostptr++ = *tmpcs++;
00328 if (host != hostptr) {
00329 *hostptr = '\0';
00330 newHost = host;
00331 }
00332 if (*tmpcs == ':') {
00333 tmpcs++;
00334 newPort = atoi(tmpcs);
00335 }
00336 }
00337
00338 if (r.Get("BufferWords",tmpi)) newSize = tmpi;
00339 if (r.Get("UseRototalk",tmpi)) fUseRototalk = tmpi;
00340 if (r.Get("Primary",tmpi)) fPrimary = tmpi;
00341 if (r.Get("GenFakeRec",tmpi)) fGenFakeRec = tmpi;
00342 if (r.Get("DebugFlags",tmpi)) fDebugFlags = tmpi;
00343 if (r.Get("ClientDebugFlags",tmpi)) RotoClient::SetDebugFlags(tmpi);
00344
00345 if ( newHost != fHost || newPort != fPort ) {
00346 DestroyClient();
00347 SetHostPort(newHost.c_str(),newPort);
00348 }
00349
00350 if (newSize != fBufferWords) {
00351 DestroyBuffer();
00352 fBufferWords = newSize;
00353 }
00354
00355 }
00356
00357 const Registry& RotoClientModule::DefaultConfig() const
00358 {
00359
00360 static Registry r;
00361
00362 std::string name = this->GetName();
00363 name += ".config.default";
00364 r.SetName(name.c_str());
00365
00366 r.UnLockValues();
00367
00368
00369
00370 char dfltHostPort[255];
00371 sprintf(dfltHostPort,"%s:%d",dfltHost,dfltPort);
00372 r.Set("Host:Port",dfltHostPort);
00373
00374 r.Set("UseRototalk",true);
00375 r.Set("BufferWords",dfltBufferWords);
00376 r.Set("Primary",true);
00377 r.Set("GenFakeRec",false);
00378
00379
00380 r.LockValues();
00381
00382 return r;
00383 }
00384
00385
00386 void RotoClientModule::Help()
00387 {
00388 MSG("Roto", Msg::kInfo)
00389 << "Help for 'RotoClientModule':" << endl
00390 << " RotoClientModule is a JobCModule for flattening a RawRecord" << endl
00391 << " to look like one that would come from the DAQ and then to" << endl
00392 << " write it out, either by sending the buffer to a Roto-rooter" << endl
00393 << " or writing it to a binary file" << endl
00394 << endl
00395 << "Commands implemented:" << endl
00396 << endl
00397 << " /RotoClientModule/Set HostPort <host> <port>" << endl
00398 << " set the output host (string) and port (int)" << endl
00399 << " if (port == -1) host is the name of the binary file" << endl
00400 << " if (port == 0) use default port #" << endl
00401 << endl
00402 << " /RotoClientModule/Set BufferWords <nwords>" << endl
00403 << " sets the buffer size (32 bit words) to use" << endl
00404 << endl
00405 << " /RotoClientModule/Set Primary {0|1}" << endl
00406 << " if not primary then only no writing is done" << endl
00407 << " but Report can be requested from server" << endl
00408 << endl
00409 << " /RotoClientModule/Set DebugFlags <bits>" << endl
00410 << " sets the debug flags bits" << endl
00411 << endl;
00412 }
00413
00414
00415 void RotoClientModule::SetHostPort(const Char_t* host, Int_t port)
00416 {
00417
00418
00419
00420 char* hostBuffer = new Char_t[strlen(host)+1];
00421 strcpy(hostBuffer,host);
00422
00423
00424 char *colon = strchr(hostBuffer,':');
00425 int bport = 0;
00426 if (colon) {
00427 sscanf(colon,":%d",&bport);
00428 *colon = 0;
00429 }
00430 fHost = hostBuffer;
00431 if (fHost == "") fHost = dfltHost;
00432 fPort = port;
00433 if (!fPort) fPort = bport;
00434 if (!fPort) fPort = dfltPort;
00435
00436 delete [] hostBuffer;
00437
00438 }
00439
00440
00441 RotoClient* RotoClientModule::GetClient()
00442 {
00443
00444
00445
00446
00447 if (fRotoClient) return fRotoClient;
00448
00449 if (fPort >= 0) {
00450 fRotoClient = new RotoClient(fHost.c_str(),fPort,
00451 MINOS_ROOTER_DCP,fUseRototalk);
00452 } else {
00453 fRotoClient = new RotoClientBinaryFile(fHost.c_str(),fPort);
00454 }
00455 if (fRotoClient->Connected()) return fRotoClient;
00456
00457 delete fRotoClient;
00458 fRotoClient = 0;
00459
00460 MSG("Roto", Msg::kWarning)
00461 << "RotoClientModule::GetClient for host " << fHost.c_str()
00462 << " port " << fPort << " failed " << endl;
00463 return 0;
00464
00465 }
00466
00467
00468 void RotoClientModule::DestroyClient()
00469 {
00470
00471
00472
00473 if (fRotoClient) { delete fRotoClient; fRotoClient=0; }
00474 }
00475
00476
00477 Int_t* RotoClientModule::GetBuffer()
00478 {
00479
00480
00481
00482 if (fBuffer) return fBuffer;
00483
00484 fBuffer = new Int_t[fBufferWords];
00485 if (fBuffer) return fBuffer;
00486
00487 MSG("Roto", Msg::kWarning)
00488 << "RotoClientModule::GetBuffer for size " << fBufferWords
00489 << " failed " << endl;
00490 return 0;
00491
00492 }
00493
00494
00495 void RotoClientModule::DestroyBuffer()
00496 {
00497
00498
00499 if (fBuffer) { delete [] fBuffer; fBuffer=0; }
00500 }
00501
00502
00503 void RotoClientModule::ReOpenOutputFile(Int_t detector, Int_t run, Int_t subrun,
00504 VldContext* pvldc, Bool_t fakeRunRec)
00505
00506 {
00507
00508
00509
00510
00511 static Int_t last_det = Detector::kUnknown;
00512 static Int_t last_run = -1;
00513 static Int_t last_subrun = -1;
00514 static VldContext last_vldc;
00515 static VldContext start_vldc;
00516
00517 RotoClient* client = GetClient();
00518
00519 if (!client) return;
00520
00521 bool newrun = (run != last_run || subrun != last_subrun);
00522
00523 if (newrun) {
00524
00525 if (last_run != -1 || last_subrun != -1) {
00526
00527 if (fakeRunRec) {
00528
00529
00530
00531 VldTimeStamp end_timestamp(last_vldc.GetTimeStamp());
00532 end_timestamp.Add(VldTimeStamp((time_t)0,+1));
00533 VldContext end_vldc(last_vldc.GetDetector(),
00534 last_vldc.GetSimFlag(),
00535 end_timestamp);
00536
00537 int runtype = 0;
00538 int timeframe = -1;
00539 RawDaqHeaderBlock* daqHdr =
00540 new RawDaqHeaderBlock(end_vldc,last_run,last_subrun,
00541 runtype,timeframe);
00542 RawDataBlock* runEnd =
00543 new RawRunEndBlock(start_vldc,end_timestamp,
00544 last_run,last_subrun,runtype);
00545 SendFakeStartEndRecord(daqHdr,runEnd);
00546 delete daqHdr;
00547 delete runEnd;
00548 }
00549
00550 client->CloseDAQFile(last_det,last_run,last_subrun);
00551 }
00552 }
00553
00554
00555
00556
00557 last_det = detector;
00558 last_run = run;
00559 last_subrun = subrun;
00560 if (pvldc) last_vldc = *pvldc;
00561
00562 if (newrun) {
00563
00564 if (last_run != -1 || last_subrun != -1) {
00565
00566 client->OpenDAQFile(last_det,last_run,last_subrun);
00567
00568 if (fakeRunRec) {
00569
00570
00571 VldTimeStamp start_timestamp(last_vldc.GetTimeStamp());
00572 start_timestamp.Add(VldTimeStamp((time_t)0,-1));
00573 start_vldc = VldContext(last_vldc.GetDetector(),
00574 last_vldc.GetSimFlag(),
00575 start_timestamp);
00576
00577 int runtype = 0;
00578 int timeframe = -1;
00579 RawDaqHeaderBlock* daqHdr =
00580 new RawDaqHeaderBlock(start_vldc,last_run,last_subrun,
00581 runtype,timeframe);
00582 RawDataBlock* runStart =
00583 new RawRunStartBlock(start_vldc,last_run,last_subrun,runtype);
00584 SendFakeStartEndRecord(daqHdr,runStart);
00585 delete daqHdr;
00586 delete runStart;
00587 }
00588 }
00589 }
00590
00591 }
00592
00593 void RotoClientModule::SendFakeStartEndRecord(RawDaqHeaderBlock* headerblk,
00594 RawDataBlock* startend)
00595
00596 {
00597
00598
00599
00600 RotoClient* client = GetClient();
00601
00602 if (!client) return;
00603
00604 int nwords = headerblk->GetSize() + startend->GetSize();
00605 int nbytes = sizeof(Int_t) * nwords;
00606
00607
00608 Int_t* buffer = new Int_t [nwords];
00609 Int_t* iptr = buffer;
00610
00611
00612 iptr = headerblk->AppendToBuffer(iptr);
00613 iptr = startend->AppendToBuffer(iptr);
00614
00615
00616 client->SendRecordBuffer(buffer,nbytes);
00617
00618
00619
00620 delete [] buffer;
00621 }
00622