GammaWare  Head Version for release 0.9
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
WatcherServer.C
Go to the documentation of this file.
1 /***************************************************************************
2  * Copyright (C) 2010 by Olivier Stezowski *
3  * stezow(AT)ipnl.in2p3.fr *
4  * *
5  * This program is free software; you can redistribute it and/or modify *
6  * it under the terms of the GNU General Public License as published by *
7  * the Free Software Foundation; either version 2 of the License, or *
8  * (at your option) any later version. *
9  * *
10  * This program is distributed in the hope that it will be useful, *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13  * GNU General Public License for more details. *
14  * *
15  * You should have received a copy of the GNU General Public License *
16  * along with this program; if not, write to the *
17  * Free Software Foundation, Inc., *
18  * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
19  ***************************************************************************/
20 
21 // ADF includes
22 
23 #include "GwLogCollector.h"
24 
25 #ifndef ADF_FrameBlock
26 #include "FrameBlock.h"
27 #endif
28 #ifndef ADF_NarvalInterface
29 #include "NarvalInterface.h"
30 #endif
31 
32 // ROOT includes
33 #ifndef ROOT_TServerSocket
34 #include "TServerSocket.h"
35 #endif
36 #ifndef ROOT_TMonitor
37 #include "TMonitor.h"
38 #endif
39 #ifndef ROOT_TList
40 #include "TList.h"
41 #endif
42 
43 // system include
44 #include <string>
45 
46 using namespace std;
47 using namespace ADF;
48 
49 template <typename Prod_actor>
51 {
52 private:
53  LogMessage fLog;
54 
55 private:
56  Prod_actor *fProducer;
57  OneBlock fBProd;
58  string fPathProd;
59 
60 private:
61  UInt_t fPort;
62  string fUrl;
63 
64 private:
65  TServerSocket *fServ; // server socket
66  TMonitor *fMon; // socket monitor
67  TList *fSockets; // list of open spy sockets
68 
69 protected:
70  void HandleSocket(TSocket *s);
71 
72 public:
73  WatcherServer(const char *path, UInt_t bsize = aMByte);
74  ~WatcherServer();
75 
76  Bool_t Init();
77  Bool_t Run(Int_t askedloop = -1);
78 };
79 
80 template <typename Prod_actor>
81 WatcherServer<Prod_actor>::WatcherServer(const char *path, UInt_t bsize):
82  fLog("WatcherServer"),
83  fProducer(0x0),
84  fBProd(bsize,ConfAgent::kWrite),
85  fPathProd(path),
86  fPort(9090),
87  fUrl("localhost"),
88  fServ(0x0),
89  fMon(0x0),
90  fSockets( new TList() )
91 {
92  fSockets->SetOwner(true);
93 }
94 
95 template <typename Prod_actor>
97 {
98  //
99  if ( fProducer )
100  delete fProducer;
101  if ( fMon )
102  fMon->Remove(fServ);
103  if ( fServ )
104  delete fServ;
105  if ( fMon )
106  delete fMon;
107  if ( fSockets )
108  delete fSockets;
109 }
110 
111 template <typename Prod_actor>
113 {
114  UInt_t err = 0u;
115  fLog.SetProcessMethod("Init");
116 
117  // just in case a actor forget to call it and it is required.
118  NarvalInterface::process_config(fPathProd.data(),&err);
119  if ( err > 0u ) {
120  fLog << warning << " in NarvalInterface::process_config " << nline;
121  }
122 
123  err = 0u;
124  // it creates and inits the different actors
125  // producer
126  Prod_actor::process_config(fPathProd.data(),&err);
127  if ( err > 0u ) {
128  fLog << error << " in process_config for Producer " << dolog;
129  return false;
130  }
131  fProducer = new Prod_actor();
132  fProducer->process_initialise(&err);
133  if ( err > 0u ) {
134  fLog << error << " in process_initialse for Producer " << dolog;
135  return false;
136  }
137 
138  // connection to send buffers
139  fServ = new TServerSocket(fPort, kTRUE);
140 
141  // Add server socket to monitor so we are notified when a client needs to be
142  // accepted
143  fMon = new TMonitor();
144  fMon->Add(fServ);
145 
146  return true;
147 }
148 
149 
150 template <typename Prod_actor>
151 Bool_t WatcherServer<Prod_actor>::Run(Int_t askedloop)
152 {
153  printf("************** EMULATOR MAIN LOOP ************** \n");
154  // loop on events
155  Int_t nb_loop = 0, nb_loop_throwaway = 0; UInt_t err = 0u;
156  while ( 1 ) {
157  // to get an input block
158  if ( gDebug )
159  cout << " Get Block " << endl;
160  err = fProducer->ProcessBlock(fBProd);
161  if ( gDebug )
162  cout << " Block " << fBProd.GetSize() << endl;
163 
164  if ( err != 0u )
165  break;
166  if ( fBProd.GetSize() == 0u )
167  break;
168 
169  TSocket *s;
170  // if ( (s = fMon->Select(2000)) != (TSocket*)-1 )
171  if ( (s = fMon->Select(1)) != (TSocket*)-1 ) {
172  HandleSocket(s);
173  }
174  else {
175  nb_loop_throwaway++;
176  if ( gDebug )
177  cout << " Empty cycle " << nb_loop_throwaway << endl;
178  }
179  nb_loop++;
180 
181  if ( nb_loop == askedloop )
182  break;
183  // stop from ROOT
184 // if (gROOT->IsInterrupted())
185 // break;
186  }
187  printf("nb_loop %d, nb_loop_unsent %d, error %d\n",nb_loop,nb_loop_throwaway,err);
188  printf("************** EMULATOR MAIN LOOP ************** \n");
189  return true;
190 }
191 
192 template <typename Prod_actor>
194 {
195  UInt_t size;
196  if ( gDebug )
197  printf("HANDLE SOCKET \n");
198 
199  // accept new connection from spy and send it data
200  if (s->IsA() == TServerSocket::Class()) {
201  TSocket *sock = ((TServerSocket*)s)->Accept();
202  fMon->Add(sock);
203  fSockets->Add(sock);
204  if ( gDebug )
205  fLog << info << "accepted connection from "
206  << sock->GetInetAddress().GetHostName() << dolog;
207 
208  if ( gDebug )
209  printf("SEND DATA \n");
210  size = fBProd.GetSize() ;
211  sock->SendRaw(&size,sizeof(size));
212  if ( size > 0u )
213  sock->SendRaw(fBProd.GetBufferIO()->GetAddress(),size);
214  }
215  else { // means a socket wants to close the connection
216  fMon->Remove(s);
217  fSockets->Remove(s);
218  if ( gDebug )
219  fLog << info << "closed connection from "
220  << s->GetInetAddress().GetHostName() << dolog;
221  delete s;
222  }
223 }
224 
225 #ifndef ADF_BasicAFP
226 #include "BasicAFP.h"
227 #endif
228 
229 #ifndef ADF_RootAFP
230 #include "RootAFP.h"
231 #endif
232 
233 void RunWatcherServer(const char *path, Int_t nbloop = -1, UInt_t bsize = aMByte)
234 {
235  WatcherServer< RootAFP > aserver(path,bsize);
236 
237  // Init the Producer Actor
238  aserver.Init();
239 
240  // loop
241  aserver.Run(nbloop);
242 }
243 
244 void InfiniteWatcherServer(const char *path, UInt_t bsize = aMByte, UInt_t nb_times = 10000u)
245 {
246  for (UInt_t i = 0u; i <nb_times ; i++) {
247  RunWatcherServer(path,-1,bsize);
248  }
249 }
250 
251 
252 
253 
printf("******************************************************************** \n")
header file for FrameBlock.cpp
LogMessage & error(LogMessage &)
LogMessage & warning(LogMessage &)
LogMessage & nline(LogMessage &)
Bool_t Run(Int_t askedloop=-1)
WatcherServer(const char *path, UInt_t bsize=aMByte)
Definition: WatcherServer.C:81
const UInt_t aMByte
Definition: BaseBuffer.h:36
header file for BasicAFP.cpp
LogMessage & info(LogMessage &)
manipulator to modify the LogMessage
ConfAgent.
Definition: ConfAgent.h:63
LogMessage & dolog(LogMessage &)
void RunWatcherServer(const char *path, Int_t nbloop=-1, UInt_t bsize=aMByte)
Base class for a Log message.
Definition: ADFLogMessage.h:80
header file for RootAFP.cpp
ADF::LogMessage & endl(ADF::LogMessage &log)
header file for NarvalInterface.cpp
void InfiniteWatcherServer(const char *path, UInt_t bsize=aMByte, UInt_t nb_times=10000u)
const Int_t size
Definition: BenchIO.C:24
void HandleSocket(TSocket *s)
It implements an in-memory block of Frames.
Definition: FrameBlock.h:206