GammaWare  Head Version for release 0.9
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ALoadedActor.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  * Copyright (C) 2004-2006 by Olivier Stezowski & Christian Finck *
3  * stezow(AT)ipnl.in2p3.fr, cfinck(AT)ires.in2p3.fr *
4  * *
5  * This program is free software; you can redistribute it and/or modify *
6  * it under the terms of the GNU General Public License as published by *
7  * the Free Software Foundation; either version 2 of the License, or *
8  * (at your option) any later version. *
9  * *
10  * This program is distributed in the hope that it will be useful, *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13  * GNU General Public License for more details. *
14  * *
15  * You should have received a copy of the GNU General Public License *
16  * along with this program; if not, write to the *
17  * Free Software Foundation, Inc., *
18  * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19  ***************************************************************************/
20 
23 #include "ALoadedActor.h"
24 #include <stdexcept>
25 
26 //#define L_DEBUG
27 
28 using namespace Gw;
29 
// Member-initializer list and (empty) body of a constructor that stores a loaded actor
// and a configuration function.
// NOTE(review): the line carrying the constructor signature is missing from this listing;
// presumably AConfigurator(ALoadedActor *lactor, ConfigFunction f) - confirm against ALoadedActor.h.
31  fError(0u), // start with error code 0
32  fRefToLoadedActor(lactor), // the loaded actor whose globals are read when configuring
33  fConfigFunction(f) // the function called with the configuration path
34 {
35  ;
36 }
37 
// Calls the stored configuration function with the "ConfDir" global of the referenced
// loaded actor and the address of fError.
// NOTE(review): the signature line is missing from this listing; presumably this is
// AConfigurator::Configure() - confirm against ALoadedActor.h.
39 {
40  // now call the conf method, get path from the loaded actor
41  NamedItem<std::string> conf_path("ConfDir","./"); // constructed with the value "./"
42  // ask the loaded actor for its "ConfDir" global; the returned flag is ignored here
43  fRefToLoadedActor->GetGlobal(conf_path);
44  //
45  std::string path =
46  conf_path.Get();
 // NOTE(review): before C++11 std::string::data() is not guaranteed to be null terminated;
 // if the configuration function expects a C string, c_str() would be the safe call - confirm.
 // NOTE(review): neither fRefToLoadedActor nor fConfigFunction is checked against null here.
47  fConfigFunction(path.data(),&fError);
48 }
49 
50 
51 Gw::ALoadedActor::ALoadedActor(const Char_t *name) :
52  fLog("ALoadedActor"),
53  fName(name),
54  fGlobals(),
55  fGeneration(0x0),
56  fConfigurator(0x0),
57  fInputs(),
58  fOutputs(),
59  fDepth(-1),
60  fActor(0x0),
61  fActorType( kUnknown ),
62  fActorStatus( kUndefined ),
63  fLastError(0u)
64 {
65  ;
66 }
67 
// Installs new_conf as the current configurator and returns the one that was installed
// before (possibly 0x0). Nothing is deleted here.
// NOTE(review): the signature line is missing from this listing; presumably
// AConfigurator *Gw::ALoadedActor::SetConfigurator(AConfigurator *new_conf) - confirm against ALoadedActor.h.
69 {
70  AConfigurator *old_conf = fConfigurator; fConfigurator = new_conf;
71  // hand the previous configurator back to the caller
72  return old_conf;
73 }
74 
75 
77 
// Appends aconnection to the list of input connections of this loaded actor.
// NOTE(review): the signature line is missing from this listing; presumably
// Gw::ALoadedActor::AddInput(AnActorConnection *aconnection) - confirm against ALoadedActor.h.
80 {
81  // only records the connection: no block is attached to the embedded actor
82  // in this method, and the pointer is not checked
83  fInputs.push_back(aconnection);
84 }
85 
// Appends aconnection to the list of output connections of this loaded actor.
// NOTE(review): the signature line is missing from this listing; presumably
// Gw::ALoadedActor::AddOutput(AnActorConnection *aconnection) - confirm against ALoadedActor.h.
87 {
88  // only records the connection: no block is attached to the embedded actor
89  // in this method, and the pointer is not checked
90  fOutputs.push_back(aconnection);
91 }
92 
// Recursive depth computation through the inputs of this actor; `depth` is an in/out value.
// With no input, depth is left untouched. Otherwise depth is incremented, every input actor
// is visited starting from that incremented value, and depth ends up as the largest value
// reported by the inputs.
// NOTE(review): the signature line is missing from this listing; the recursive call below
// shows the name ComputeDepth and an Int_t argument passed so that it can be modified.
94 {
95  if ( GetNbInput() == 0 )
96  return;
97 
98  depth++;
99 
 // max_depth starts at 0, so a result below 0 can never be reported
100  Int_t tmp_depth = depth, max_depth = 0;
101 
102  for (Int_t i = 0; i < GetNbInput() ; i++) {
103  ALoadedActor *node = GetInput(i);
104 
 // every branch starts from the same incremented depth
105  tmp_depth = depth;
106  node->ComputeDepth(tmp_depth);
107 
108  if ( tmp_depth > max_depth ) {
109  max_depth = tmp_depth;
110  }
111  }
112  depth = max_depth;
113 }
114 
115 void Gw::ALoadedActor::SetGlobals( const std::vector < ANamedItem *> &globals )
116 {
117  for (size_t i = 0; i < globals.size(); i++) {
118  Bool_t add_new = true;
119  for (size_t j = 0; j < fGlobals.size(); j++) {
120  //
121  if ( globals[i]->GetName() == fGlobals[j]->GetName() ) {
122  // change is possible means same name same type
123  if ( fGlobals[j]->Set(globals[i]) ) {
124  add_new = false;
125  break;
126  }
127  }
128  }
129  if ( add_new ) {
130  fGlobals.push_back( globals[i]->MakeAClone() );
131  }
132 
133  }
134  if ( fGeneration == 0x0 ) {
135  for (size_t j = 0; j < fGlobals.size(); j++) {
136  if ( fGlobals[j]->GetName() == std::string("Generation") ) {
137  fGeneration = dynamic_cast< NamedItem<Short_t> * > ( fGlobals[j] );
138  }
139  }
140  }
141 }
142 
// Sets one global: updates the first stored global with the same name that accepts the
// value, otherwise appends a clone of `global`. Afterwards, if no generation item is known
// yet, looks for a global named "Generation" and keeps it as fGeneration.
// NOTE(review): the signature line is missing from this listing; presumably
// Gw::ALoadedActor::SetGlobal(const ANamedItem &global) - confirm against ALoadedActor.h.
144 {
145  Bool_t add_new = true;
146  //
147  for (size_t j = 0; j < fGlobals.size(); j++) {
148  //
149  if ( global.GetName() == fGlobals[j]->GetName() ) {
150  // change is possible means same name same type
151  if ( fGlobals[j]->Set(&global) ) {
152  add_new = false;
153  break;
154  }
155  }
156  }
 // same name but Set() refused, or unknown name: a clone is stored
157  if ( add_new ) {
158  fGlobals.push_back( global.MakeAClone() );
159  }
160  if ( fGeneration == 0x0 ) {
161  for (size_t j = 0; j < fGlobals.size(); j++) {
162  if ( fGlobals[j]->GetName() == std::string("Generation") ) {
 // the cast gives 0x0 when the item is not a NamedItem<Short_t>; the loop does not
 // stop at the first match, so the last item named "Generation" decides
163  fGeneration = dynamic_cast< NamedItem<Short_t> * > ( fGlobals[j] );
164  }
165  }
166  }
167 }
168 
// Copies into `global` the value of the first stored global with the same name that
// `global` accepts. Returns true when a value has been copied, false otherwise.
// NOTE(review): the signature line is missing from this listing; presumably
// Bool_t Gw::ALoadedActor::GetGlobal(ANamedItem &global) const - confirm against ALoadedActor.h.
170 {
171  Bool_t is_set = false;
172  //
173  for (size_t j = 0; j < fGlobals.size(); j++) {
174  //
175  if ( global.GetName() == fGlobals[j]->GetName() ) {
176  // change is possible means same name same type
177  if ( global.Set(fGlobals[j]) ) {
178  is_set = true;
179  break;
180  }
181  }
182  }
183  return is_set;
184 }
185 
186 Short_t Gw::ALoadedActor::IsLinked(ALoadedActor *actor, AnActorConnection *&connection) const
187 {
188  Short_t result = 0;
189 
190  // std::cout << "Input size " << fInputs.size() << " for " << << std::endl;
191  for (UInt_t i = 0; i < fInputs.size(); i++) {
192  if ( fInputs[i]->GetProducer() == actor ) {
193  connection = fInputs[i];
194  result = -1;
195 
196  return result;
197  }
198  }
199 
200  // std::cout << "Output size " << fOutputs.size() << std::endl;
201  for (UInt_t i = 0; i < fOutputs.size(); i++) {
202  if ( fOutputs[i]->GetConsumer() == actor ) {
203  connection = fOutputs[i];
204  result = +1;
205 
206  return result;
207  }
208  }
209  connection = 0x0; return result;
210 }
211 
// Resets the configurator error, runs the configurator and stores the resulting error code
// as the last error of this loaded actor.
// NOTE(review): the signature line is missing from this listing; presumably
// Gw::ALoadedActor::ConfigureActor() - confirm against ALoadedActor.h.
// NOTE(review): fConfigurator is used without a null check here.
213 {
214  fConfigurator->SetError(0u);
215  fConfigurator->Configure();
216 
217  SetLastError(fConfigurator->GetError());
218 }
219 
// Asks the configurator for the embedded actor, deduces the kind of actor from the maximum
// number of inputs/outputs it declares, and attaches the connection blocks to it when the
// generation is > 0.
//
// Throws std::bad_alloc when there is no configurator, when Register() returns 0x0 or when
// the configurator reports an error. In the other failure cases the actor type is left
// unchanged (or set to kUnknown) and the last error is set.
// NOTE(review): the signature line is missing from this listing; the log tag below reads
// "RegisterActor()".
// NOTE(review): fGeneration is dereferenced without a null check in several branches.
221 {
222  fLog.SetProcessMethod("RegisterActor()");
223 
224  if ( fConfigurator == 0x0 )
225  throw std::bad_alloc();
226 
227  fConfigurator->SetError(0u);
228  fActor =
229  fConfigurator->Register();
230 
231  if ( fActor == 0x0 || fConfigurator->GetError() != 0u ) {
232  SetLastError(fConfigurator->GetError());
233  //
234  throw std::bad_alloc();
235  }
236 
237  if ( fActor->GetMaxInput() == 0 && fActor->GetMaxOutput() == 0 ) { // virtual or not well defined
238  //
239  fLog << error << "The registered actor " << GetName() << " cannot have any input and output " << dolog;
240  //
241  fActorType = kUnknown;
 // NOTE(review): a non-zero configurator error has already thrown above, so the value
 // stored here appears to be 0 although an error has just been logged - confirm intent.
242  SetLastError(fConfigurator->GetError());
243  //
244  return;
245  }
246  SetLastError(0u);
247  if ( fActor->GetMaxInput() > 1 || fActor->GetMaxOutput() > 1 ) { // Shunter
248  UInt_t wrong = 0u;
249  // input and output are attached only when the generation is > 0 (test below)
250  if ( fGeneration->Get() > 0 ) {
251  for (size_t i = 0; i < fInputs.size(); i++) {
252  if ( fActor->AttachInput(fInputs[i]->GetBlock('c') ) < 0 ) {// cannot be attached
253  wrong = 1;
254  break;
255  }
256  }
 // the outputs are attached even if an input could not be attached
257  for (size_t i = 0; i < fOutputs.size(); i++) {
258  if ( fActor->AttachOutput(fOutputs[i]->GetBlock('p') ) < 0 ) {// cannot be attached
259  wrong = 2;
260  break;
261  }
262  }
263  }
264  else wrong = 1; // too low generation for that
265  //
266  if ( wrong == 0 )
267  fActorType = kShunter;
268  else {
269  SetLastError(1u);
270  }
271  }
272  else {
273  if ( fActor->GetMaxInput() == 0 ) { // producer
274  if ( fInputs.size() != 0 || fOutputs.size() != 1 ) {
275  fLog << error << "The registered producer " << GetName() << " has not the right number of [input and/or output] [" << fInputs.size() << "," << fOutputs.size() << "]" << nline;
276  SetLastError(1u);
277  }
278  else {
279  if ( fGeneration->Get() > 0 ) {
280  fActor->AttachOutput(fOutputs[0]->GetBlock('p'));
281  }
282  fActorType = kProducer;
283  }
284  }
285  else {
286  if ( fActor->GetMaxOutput() == 0 ) { //consumer
287  if ( fInputs.size() != 1 || fOutputs.size() != 0 ) {
288  fLog << error << "The registered consumer " << GetName() << " has not the right number of [input and/or output] [" << fInputs.size() << "," << fOutputs.size() << "]" << nline;
289  SetLastError(1u);
290  }
291  else {
292  if ( fGeneration->Get() > 0 ) {
293  fActor->AttachInput(fInputs[0]->GetBlock('c'));
294  }
295  fActorType = kConsumer;
296  }
297  }
 // at most one input and one output, none of them zero: filter
298  else {
299  if ( fInputs.size() != 1 || fOutputs.size() != 1 ) {
300  fLog << error << "The registered filter " << GetName() << " has not the right number of [input and/or output] [" << fInputs.size() << "," << fOutputs.size() << "]" << nline;
301  SetLastError(1u);
302  }
303  else {
304  if ( fGeneration->Get() > 0 ) {
305  fActor->AttachInput(fInputs[0]->GetBlock('c'));
306  fActor->AttachOutput(fOutputs[0]->GetBlock('p'));
307  }
308  fActorType = kFilter;
309  }
310  }
311  }
312  }
 // dolog is also sent on the paths where nothing was written to the log
313  fLog << dolog;
314 }
315 
// Calls process_initialise() on the embedded actor and stores the error code it reports
// as the last error of this loaded actor.
// NOTE(review): the signature line is missing from this listing; presumably
// Gw::ALoadedActor::InitActor() - confirm against ALoadedActor.h.
// NOTE(review): fActor is used without a null check here.
317 {
318  UInt_t error_code = 0u;
319  fActor->process_initialise(&error_code);
320 
321  SetLastError(error_code);
322 }
323 
325 {
326  NarvalProducer *p; NarvalConsumer *c; NarvalFilter *f; FrameBlock *block_in, *block_out; UInt_t error_code = 0u, size_in = 0u, size_out = 0u, used_size_out = 0u; char *input_buffer, *output_buffer;
327 
328  switch ( generation ) {
329  case -2:
330  // std::cout << " Generation -2" << std::endl;
331  case -1:
332  // std::cout << " Generation -1" << std::endl;
333  switch (fActorType) {
334  case kProducer:
335  p = dynamic_cast<NarvalProducer *> (fActor); block_out = fOutputs[0]->GetBlock('p');
336 
337  output_buffer = block_out->GetBufferIO()->Address();
338  size_out = block_out->GetMaxSize();
339  //std::cout << " proceess_produc 1" << std::endl;
340  p->process_block(output_buffer,size_out,&used_size_out,&error_code);
341 
342  //std::cout << " proceess_produc 2" << size_out << " " << used_size_out << std::endl;
343 
344  // the filled memory is again re attached to the block. It takes care of setting correctly size, eob etc ...
345  // do not rewind since it is passed to the next actor
346  block_out->SetBlock(output_buffer,used_size_out,'-');
347  if ( used_size_out > 0u )
348  SetStatus(ALoadedActor::kHasRun);
349 
350  break;
351 
352  case kConsumer:
353  c = dynamic_cast<NarvalConsumer *> (fActor); block_in = fInputs[0]->GetBlock('c');
354 
355  input_buffer = block_in->GetBufferIO()->Address();
356  size_in = block_in->GetSize();
357  //std::cout << " proceess_consu 1" << std::endl;
358  if ( size_in > 0u ) {
359  c->process_block(input_buffer,size_in,&error_code);
360  SetStatus(ALoadedActor::kHasRun);
361  }
362  else SetStatus(ALoadedActor::kIdle);
363 
364  //std::cout << " proceess_consu 2" << size_in << std::endl;
365  break;
366 
367  case kFilter:
368  f = dynamic_cast<NarvalFilter *> (fActor); block_in = fInputs[0]->GetBlock('c'); block_out = fOutputs[0]->GetBlock('p');
369 
370  input_buffer = block_out->GetBufferIO()->Address();
371  size_in = block_in->GetSize();
372  output_buffer = block_out->GetBufferIO()->Address();
373  size_out = block_out->GetMaxSize();
374 
375  if ( size_in > 0u ) {
376  f->process_block(input_buffer,size_in,output_buffer,size_out,&used_size_out,&error_code);
377  if ( used_size_out > 0u )
378  SetStatus(ALoadedActor::kHasRun);
379  }
380  else SetStatus(ALoadedActor::kIdle);
381 
382  // the filled memory is again re attached to the block. It takes care of setting correctly size, eob etc ...
383  // do not rewind since it is passed to the next actor
384  block_out->SetBlock(output_buffer,used_size_out,'-');
385 
386  break;
387  default:
388  throw std::logic_error("Unknown actor for emulator generation 0,1");
389  break;
390  }
391  break;
392  case 0:
393  // std::cout << " Generation 0" << std::endl;
394 
395  {
396  FrameBlock *block, *block_in, *block_out;
397  switch (fActorType) {
398  case kProducer:
399  block = fOutputs[0]->GetBlock('p'); block->Rewind(ConfAgent::kWrite);
400  if ( GetStatus() == kDoRunning ) {
401  p = dynamic_cast<NarvalProducer *> (fActor);
402  SetStatus(kRunning);
403  error_code = p->ProcessBlock((*block));
404 #ifdef L_DEBUG
405  std::cout << "Actor " << GetName() << " has produced " << block->GetSize() << " bytes " << std::endl;
406 #endif
407  if ( block->GetSize() ) { // producer are specific, could produce empty
408  SetStatus(ALoadedActor::kHasRun);
409  }
410  /*
411  if ( error_code > 0u ){
412  SetStatus(kFinished);
413  }
414  else {
415  if ( block->GetSize() ) {
416  SetStatus(ALoadedActor::kHasRun);
417  }
418  SetStatus(ALoadedActor::kIdle);
419  }
420  */
421  }
422  break;
423  case kConsumer:
424  c = dynamic_cast<NarvalConsumer *> (fActor); block_in = fInputs[0]->GetBlock('c');
425 #ifdef L_DEBUG
426  std::cout << "cunsumer " << GetName() << " has " << block_in->GetSize() << " bytes to consume " << std::endl;
427 #endif
428  if ( block_in->GetSize() ) {
429  SetStatus(kRunning);
430  error_code = c->ProcessBlock((*block_in));
431  if ( block_in->IsEoB() )
432  SetStatus(ALoadedActor::kHasRun);
433  /*
434  if ( error_code > 0u ) {
435  SetStatus(kFinished);
436  }
437  else {
438  SetStatus(ALoadedActor::kHasRun);
439  }*/
440  }
441  break;
442  case kFilter:
443  f = dynamic_cast<NarvalFilter *> (fActor); block_in = fInputs[0]->GetBlock('c'); block_out = fOutputs[0]->GetBlock('p');
444 #ifdef L_DEBUG
445  std::cout << "Filter " << GetName() << " has " << block_in->GetSize() << " bytes to consume " << std::endl;
446 #endif
447 
448  SetStatus(ALoadedActor::kRunning);
449  error_code = f->ProcessBlock((*block_in),(*block_out));
450 
451 #ifdef L_DEBUG
452  std::cout << "Filter " << GetName() << " has produced " << block_out->GetSize() << " bytes " << std::endl;
453 #endif
454  if ( block_out->GetSize() )
455  SetStatus(ALoadedActor::kHasRun);
456  else {
457  ;
458  }
459 
460 /*
461  if ( block_in->GetSize() ) {
462  SetStatus(ALoadedActor::kRunning);
463  error_code = f->ProcessBlock((*block_in),(*block_out));
464 
465 #ifdef L_DEBUG
466  std::cout << "Filter " << GetName() << " has produced " << block_out->GetSize() << " bytes " << std::endl;
467 #endif
468  if ( block_out->GetSize() )
469  SetStatus(ALoadedActor::kHasRun);
470 
471  if ( error_code > 0u ) {
472  SetStatus(kFinished);
473  }
474  else {
475  if ( block_out->GetSize() )
476  SetStatus(ALoadedActor::kHasRun);
477  }
478 
479  }
480  else {
481  block_out->Rewind(ConfAgent::kWrite); // force empty out
482  }
483  */
484  break;
485  default:
486  throw std::logic_error("Unknown actor for emulator generation 0");
487  break;
488  }
489  }
490  break;
491  default:
492  break;
493  }
494  if ( error_code )
495  SetStatus(ALoadedActor::kDead);
496 
497  SetLastError(error_code);
498 }
499 
// Runs the embedded actor once.
//
// For a generation < 1 the work is delegated to ProcessActorGeneration_2_1_0(). Otherwise
// ProcessBlocks() is called on the embedded actor only if every non-lazy input is ready for
// the consumer and every non-lazy output is ready for the producer; the status is then
// kHasRun if the offset of at least one block has moved, kIdle if not. The status is also
// kIdle when the actor has not been called.
//
// A std::exception thrown inside the try block is caught and only printed on std::cout.
// NOTE(review): the signature line is missing from this listing; presumably
// Gw::ALoadedActor::ProcessActor() - confirm against ALoadedActor.h.
// NOTE(review): fGeneration is dereferenced without a null check, and outside the try block.
501 {
502 // std::cout << " Call of (ProcessActor) " << fName << std::endl ;
503 
504  // to be used to check if the actors has done something
505  std::vector <UInt_t> inputs_off, outputs_off;
506 
507  // to deal with generation of Emulator
508  Short_t generation = fGeneration->Get();
509 
510  try
511  {
512  // to deal with generation for which the interface depends of the kind of actor
513  if ( generation < 1 ) {
514  ProcessActorGeneration_2_1_0(generation);
515  return;
516  }
517 
518  // count the connections that are ready: ok_val must reach ok_condition, i.e. the
519  // number of connections that are not lazy, for the embedded actor to be called.
520  // The offset of every block is recorded to see later on if something has moved
521  UInt_t ok_condition = fInputs.size() + fOutputs.size(), ok_val = 0u, nb_lazy = 0u, identical; inputs_off.clear(); outputs_off.clear();
522  for (UInt_t i = 0; i < fInputs.size(); i++) {
523  //
524  inputs_off.push_back(fInputs[i]->GetBlock('c')->GetOffset());
525  if ( fInputs[i]->IsLazy() ) {
526  nb_lazy++;
527  }
528  if ( fInputs[i]->IsReadyforConsumer() ) {
 // a lazy connection is not counted but is still set ready
529  if ( !fInputs[i]->IsLazy() )
530  ok_val++;
531  fInputs[i]->SetReadyforConsumer();
532  }
533  }
534  for (UInt_t i = 0; i < fOutputs.size(); i++) {
535  //
536  outputs_off.push_back(fOutputs[i]->GetBlock('p')->GetOffset());
537 
538  if ( fOutputs[i]->IsLazy() ) {
539  nb_lazy++;
540  }
541  if ( fOutputs[i]->IsReadyforProducer() ) {
 // a lazy connection is not counted but is still set ready
542  if ( !fOutputs[i]->IsLazy() )
543  ok_val++;
544  fOutputs[i]->SetReadyforProducer();
545  }
546  }
547  ok_condition -= nb_lazy;
548 
549  // now call processing of the embedded actor
550  if ( ok_condition == ok_val ) {
551  fActor->ProcessBlocks();
552 
553  // check if at least one of the input / output has changed
554 
555  identical = 0u;
556  for (UInt_t i = 0; i < fInputs.size(); i++) {
557  if ( inputs_off[i] == fInputs[i]->GetBlock('c')->GetOffset() )
558  identical++;
559  }
560  for (UInt_t i = 0; i < fOutputs.size(); i++) {
561  if ( outputs_off[i] == fOutputs[i]->GetBlock('p')->GetOffset() )
562  identical++;
563  }
564  if ( identical == (inputs_off.size()+outputs_off.size()) ) // additional check with position before / now of the offset in all the blocks
565  SetStatus(ALoadedActor::kIdle);
566  else
567  SetStatus(ALoadedActor::kHasRun);
568 
569  // SetStatus(ALoadedActor::kHasRun);
570  }
571  else {
572  SetStatus(ALoadedActor::kIdle); // nothing has been done ==> kIdle
573 // if ( ok_condition == 0 )
574  }
575  // to add : check all input and output ... if freed, means DLActor is Idle
576  /*
577  for (UInt_t i = 0; i < fInputs.size(); i++) {
578  //
579  if ( fInputs[i]->IsReadyForConsumer() ) {
580  }
581  }
582  for (UInt_t i = 0; i < fOutputs.size(); i++) {
583 
584  // the block to be treated
585  FrameBlock *block = fOutputs[i]->GetBlock('p');
586  //
587  if ( fOutputs[i]->IsReadyForProducer() ) {
588 
589  }
590  }
591  */
592 
593  // SetLastError(error_code);
594  }
595  catch (const std::exception& ex)
596  {
 // the exception is only reported: status and last error are not modified here
597  std::cout << " ERROR " << ex.what() << std::endl ;
598  }
599 }
600 
601 
void(* ConfigFunction)(const Char_t *, UInt_t *)
the function used to configure the underlying actor. Added to allow configuration by calling explicit...
Definition: ALoadedActor.h:35
virtual Long64_t GetSize(UInt_t=0u) const
size of the current block
Definition: FrameBlock.h:141
virtual Bool_t Set(const ANamedItem *)=0
change value of this named item using the passed one. it checks if asked, it check first if of the sa...
virtual void ConfigureActor()
configure this actor i.e call static methods. The way it is done depends on the Globals set by using ...
void ProcessActorGeneration_2_1_0(Short_t generation)
to deal with generation for which the interface depends of the kind of actor
Short_t IsLinked(ALoadedActor *, AnActorConnection *&) const
try to see if the two actors are linked: 0 no, -1 yes as input, +1 yes as output
virtual void InitActor()
Init this actor. The way it is done depends on the Globals set by using SetEnvironment (most of the t...
It defines the interface needed to be a narval actor (producer).
virtual ANamedItem * MakeAClone() const =0
clone that named item
LogMessage & error(LogMessage &)
virtual Long64_t GetMaxSize(UInt_t=0u) const
max size of the block
Definition: FrameBlock.h:144
void ComputeDepth(Int_t &depth)
compute depth (the highest one) of this in the data flow.
A Block of Frames.
Definition: FrameBlock.h:43
header file for ALoadedActor.cpp
virtual void AddOutput(AnActorConnection *)
Add one output line to the actor. It does not check if possible ... it has to be done by the user befo...
ALoadedActor(const Char_t *name)
LogMessage & nline(LogMessage &)
virtual void process_block(void *input_buffer, UInt_t size_of_input_buffer, UInt_t *error_code)
Narval interface, Ask the algorithm to process the input data block.
virtual void process_block(void *input_buffer, UInt_t size_of_input_buffer, void *output_buffer, UInt_t size_of_output_buffer, UInt_t *used_size_of_output_buffer, UInt_t *error_code)
Ask the algorithm to process the data block.
std::string GetName() const
change value of this named item using the passed one. it checks if asked, it check first if of the sa...
Definition: DataHandling.h:219
AConfigurator(ALoadedActor *, ConfigFunction)
virtual void RegisterActor()
Register this actor. The way it is done depends on the Globals set by using SetEnvironment (most of t...
Bool_t IsEoB() const
Definition: FrameBlock.h:86
An actor loaded in an emulator.
Definition: ALoadedActor.h:88
It defines the interface needed to be a consumer.
virtual void AddInput(AnActorConnection *)
Add one input line to the actor. It does not check if possible ... it has to be done by the user befo...
LogMessage & dolog(LogMessage &)
virtual Data_T Get() const
Get the data.
Definition: DataHandling.h:296
virtual BufferIO * GetBufferIO(UInt_t which=0u)=0
To get the address of the underlying buffer.
ANamedItem.
Definition: DataHandling.h:188
virtual UInt_t ProcessBlock(FrameBlock &)=0
virtual method to be implemented
virtual void ProcessActor()
do the job
virtual Bool_t SetBlock(Char_t *=0x0, UInt_t=0u, char= 'r', char= 'l', UInt_t=0u)=0
Attach a external block to this.
ADF::LogMessage & endl(ADF::LogMessage &log)
virtual void SetGlobals(const std::vector< ANamedItem * > &)
change all globals simultaneously
virtual Bool_t GetGlobal(ANamedItem &) const
to get back one global
virtual AConfigurator * SetConfigurator(AConfigurator *)
adopt it so it is delete by this. It returns the previous one so that the user can delete it once the...
virtual void SetGlobal(const ANamedItem &)
change one global
virtual UInt_t ProcessBlock(FrameBlock &)=0
process one block, virtual method to be implemented
Char_t * Address()
for classes that needs it to write directly data into it.
Definition: BufferIO.h:98
virtual UInt_t ProcessBlock(FrameBlock &, FrameBlock &)=0
virtual method to be implemented
virtual void Rewind(ConfAgent::EMode mode=ConfAgent::kRead)=0
Rewind to be ready to be used again.
base class in charge of configuring, allocating the true actor loaded in a LoadedActor ...
Definition: ALoadedActor.h:43
virtual void process_block(void *output_buffer, UInt_t size_of_output_buffer, UInt_t *used_size_of_output_buffer, UInt_t *error_code)
Ask the algorithm to process the data block.
It defines the interface needed to be a narval actor (Filter).
Base class that connects two actors in a general topology.
virtual void Configure()
configure environment for the actor.