GammaWare  Head Version for release 0.9
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
WatcherProducer.cpp
Go to the documentation of this file.
1 
2 
3 
4 #ifndef Gw_WatcherProducer
5 #include "WatcherProducer.h"
6 #endif
7 
8 // root includes
9 #ifndef ROOT_TROOT
10 #include "TROOT.h"
11 #endif
12 #ifndef ROOT_TSystem
13 #include "TSystem.h"
14 #endif
15 #ifndef ROOT_TH3
16 #include "TH3.h"
17 #endif
18 
19 // ADF include
20 #ifndef ADF_FrameIO
21 #include "FrameIO.h"
22 #endif
23 
24 using namespace ADF;
25 using namespace Gw;
26 
27 //#define APPLY_PATCH
28 
29 #ifdef APPLY_PATCH
30 
31 #include <stdio.h> /* for printf() and fprintf() */
32 #include <stdlib.h> /* for malloc */
33 #include <sys/socket.h> /* for socket(), connect(), send(), and recv() */
34 #include <arpa/inet.h> /* for sockaddr_in and inet_addr() */
35 #include <string.h> /* for memset() */
36 #include <unistd.h> /* for close() */
37 #include <errno.h>
38 #include <netinet/in.h>
39 #include <netdb.h>
40 
41 // command to know if a port is open : netstat -a | egrep 'Proto|LISTEN'
42 
43 namespace {
44  int sock = 0;
45 }
46 #endif
47 
48 void WatcherProducer::process_reset (UInt_t */*error_code*/)
49 {
50  CloseSocket();
51 }
52 
53 WatcherProducer::~WatcherProducer()
54 {
55  UInt_t error_code;
56  process_reset(&error_code);
57 }
58 
59 Bool_t WatcherProducer::SetConnection(const Char_t *machine, UInt_t port)
60 {
61  fMachine = machine;
62  fPort = port;
63 
64  return true;
65 /*
66  if ( fSocket )
67  { delete fSocket; }
68  fSocket = new TSocket(machine,port);
69 
70  return fSocket->IsValid(); */
71 }
72 
73 Bool_t WatcherProducer::OpenSocket()
74 {
75  // just in case
76 #ifdef APPLY_PATCH
77  struct sockaddr_in servAddr; /* server address */
78  struct sockaddr_in sin_me;
79  int one;
80  struct hostent *host_addr;
81  unsigned tmp;
82 
83  one=1;
84 // printf("toto 1\n");
85  sock = socket(AF_INET, SOCK_STREAM, 0);
86  if (sock == 0) {
87  strerror(errno);
88  return false;
89  }
90 // printf("toto 2\n");
91 
92  memset((char *)&sin_me, 0, sizeof(sin_me));
93 // printf("toto 3\n");
94  sin_me.sin_port = 0; /* free choice */
95  memset(&servAddr, 0, sizeof(servAddr));
96 // printf("toto 4 %s \n",fMachine.Data());
97  servAddr.sin_family = AF_INET; /* Internet address family */
98  host_addr = gethostbyname(fMachine.Data());
99 // printf("toto 5\n");
100  servAddr.sin_family = host_addr->h_addrtype;
101 
102  memcpy((char *)&tmp,host_addr->h_addr,host_addr->h_length);
103 // memcpy((char *)&tmp,host_addr->h_addr,sizeof(unsigned));
104 
105 // printf("toto 6\n");
106  servAddr.sin_addr.s_addr = tmp;
107  servAddr.sin_port = htons(fPort); /* Server port */
108 // printf("toto 7\n");
109  setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
110  bind(sock, (struct sockaddr *)&sin_me, sizeof(sin_me));
111 // printf("titi %d %d \n",servAddr.sin_port,fPort);
112 
113  Int_t val = 0;
114  fcntl(sock, F_SETFL, O_NONBLOCK);
115  if ( ( val = connect(sock, (struct sockaddr *) &servAddr, sizeof(servAddr)) ) < 0)
116  {
117  close(sock);
118 
119  printf("toto 3 %d %s \n",val,strerror(errno));
120  return false;
121  }
122 
123 #else
124  CloseSocket();
125  // printf(" --> TRYING TO OPEN %s %d \n",fMachine.Data(),fPort);
126  if ( (fSocket = new TSocket(fMachine.Data(),fPort)) ) {
127  return fSocket->IsValid();
128  }
129 #endif
130  return true;
131 }
132 
133 Bool_t WatcherProducer::CloseSocket()
134 {
135 #ifdef APPLY_PATCH
136  if ( sock )
137  close(sock);
138 #else
139  if ( fSocket )
140  { delete fSocket; fSocket = 0x0; }
141 #endif
142  return true;
143 }
144 
145 void WatcherProducer::SetTimeout(Long_t timeout)
146 {
147  fTimeout = timeout;
148 }
149 
150 UInt_t WatcherProducer::ProcessBlock (FrameBlock &block)
151 {
152  // 0 ok
153  // 1 means socket is not good
154  // 2 means the real size of the next buffer is 0
155  // 3 means failed to get the size of the next buf from the socket
156  // 4 means too large buffer
157  // 5 means fails to get the whole buffer from the socket
158  UInt_t error_code = 0u; Int_t bsize = 0; Int_t ret;
159 
160  // check the connection
161  if ( !OpenSocket() )
162  return 1u;
163 
165 /*
166  ret = fSocket->RecvRaw(&bsize, sizeof(UInt_t),kDontBlock);
167  if ( gDebug )
168  Log << debug << " Receive size of the buffer " << ret << dolog;
169 
170  if ( ret == sizeof(UInt_t) && bsize > 0u ) {
171  // if possible get the full buffer and ask ProcessBlock to treat it
172  fBuffer.Offset();
173  if ( fBuffer.Reserve(bsize) ) {
174  if ( gDebug )
175  Log << debug << " Ask for the buffer " << dolog;
176 
177  ret = fSocket->RecvRaw(fBuffer.Address(), bsize);
178  if ( gDebug )
179  Log << debug << " Receive buffer with size " << ret << dolog;
180  if ( ret == bsize ) {
181  // used linked with fBuffer, so no need to allocated a large block
182  block.SetBlock(fBuffer.Address(),bsize,'-');
183  if ( gDebug )
184  Log << info << "A new block has been received " << bsize << dolog;
185  }
186  else error_code = 5u;
187  }
188  else error_code = 4u;
189  }
190  else {
191  if ( bsize == 0 )
192  error_code = 2u;
193  else
194  error_code = 3u;
195  }
196 */
198 
200 /*
201  if ( gDebug )
202  Log << debug << " Wait " << fTimeout << " to see if there are data " << dolog;
203 
204  switch ( fSocket->Select(TSocket::kRead,fTimeout) ) {
205  case -1:
206  error_code = 1u;
207  break;
208  case 0:
209  error_code = 2u;
210  break;
211  default:
212  ret = fSocket->RecvRaw(&bsize, sizeof(UInt_t),kDontBlock);
213  if ( gDebug )
214  Log << debug << " Receive size of the buffer " << ret << dolog;
215 
216  if ( ret == sizeof(UInt_t) && bsize > 0 ) {
217  // if possible get the full buffer and ask ProcessBlock to treat it
218  fBuffer.Offset();
219  if ( fBuffer.Reserve(bsize) ) {
220  if ( gDebug )
221  Log << debug << " Ask for the buffer " << dolog;
222  ret = fSocket->RecvRaw(fBuffer.Address(), bsize);
223  if ( gDebug )
224  Log << debug << " Receive buffer with size " << ret << dolog;
225  if ( ret == bsize ) {
226  // used linked with fBuffer, so no need to allocated a large block
227  block.SetBlock(fBuffer.Address(),bsize,'-');
228  if ( gDebug )
229  Log << info << "A new block has been received " << bsize << dolog;
230  }
231  else error_code = 6u;
232  }
233  else error_code = 5u;
234  }
235  else {
236  if ( bsize == 0 )
237  error_code = 3u;
238  else
239  error_code = 4u;
240  }
241  break;
242  }
243 */
245 
247 #ifdef APPLY_PATCH
248  ret = recv (sock, &bsize, sizeof (UInt_t), 0);
249 // ret = fSocket->RecvRaw(&bsize, sizeof(UInt_t),kDontBlock);
250  if ( gDebug )
251  Log << debug << " Receive size of the buffer " << ret << dolog;
252 
253  if ( ret == sizeof(UInt_t) && bsize > 0 ) {
254  // if possible get the full buffer and ask ProcessBlock to treat it
255  fBuffer.Offset();
256  if ( fBuffer.Reserve(bsize) ) {
257  if ( gDebug )
258  Log << debug << " Ask for the buffer " << dolog;
259  // new code !
260  int total_received_data = 0;
261  while (total_received_data < bsize)
262  {
263  ret = recv (sock, fBuffer.CurrentAddress(),bsize, 0);
264 // ret = fSocket->RecvRaw(fBuffer.CurrentAddress(), bsize);
265  total_received_data += ret;
266  if ( ret < 1 ) {
267  break;
268  }
269  fBuffer.SetOffset(total_received_data);
270  // buf += received_data ;
271  }
272  if ( total_received_data == bsize ) {
273  // used linked with fBuffer, so no need to allocated a large block
274  block.SetBlock(fBuffer.Address(),bsize,'-');
275  if ( gDebug )
276  Log << info << "A new block has been received " << bsize << dolog;
277  }
278  else error_code = 5u;
279  }
280  else error_code = 4u;
281  }
282  else {
283  if ( bsize == 0 )
284  error_code = 2u;
285  else
286  error_code = 3u;
287  }
288 #else
289  ret = fSocket->RecvRaw(&bsize, sizeof(UInt_t),kDontBlock);
290  if ( gDebug )
291  Log << debug << " Receive size of the buffer " << ret << dolog;
292 
293  if ( ret == sizeof(UInt_t) && bsize > 0 ) {
294  // if possible get the full buffer and ask ProcessBlock to treat it
295  fBuffer.Offset();
296  if ( fBuffer.Reserve(bsize) ) {
297  if ( gDebug )
298  Log << debug << " Ask for the buffer " << dolog;
299 
300  switch ( fSocket->Select(TSocket::kRead,2) )
301  {
302  case 0:
303  Log << info << "--> timeout " << dolog;
304  error_code = 2u;
305  break;
306  case -1:
307  Log << info << "--> Error " << dolog;
308  error_code = 5u;
309  break;
310  default:
311  // fSocket->RecvRaw(&bsize, sizeof(Int_t));
312  // cout << " Size expected " << bsize << endl;
313  ret = fSocket->RecvRaw(fBuffer.Address(), bsize);
314  if ( ret == bsize ) {
315  block.SetBlock(fBuffer.Address(),bsize,'-');
316  }
317  else
318  Log << info << " Size received too small " << ret << " expected " << bsize << dolog;
319  // cout << " Size received " << ret << endl;
320 
321  break;
322  }
323 
324  // new code !
325 /*
326  int total_received_data = 0;
327  while (total_received_data < bsize)
328  {
329  ret = fSocket->RecvRaw(fBuffer.CurrentAddress(), bsize);
330  total_received_data += ret;
331  if ( ret < 1 ) {
332  break;
333  }
334  fBuffer.SetOffset(total_received_data);
335  // buf += received_data ;
336  }
337  if ( total_received_data == bsize ) {
338  // used linked with fBuffer, so no need to allocated a large block
339  block.SetBlock(fBuffer.Address(),bsize,'-');
340  if ( gDebug )
341  Log << info << "A new block has been received " << bsize << dolog;
342  }
343  else error_code = 5u;
344 */
345  }
346  else error_code = 4u;
347  }
348  else {
349  if ( bsize == 0 )
350  error_code = 2u;
351  else
352  error_code = 3u;
353  }
354 
355 #endif
356 
358 
359  // Narval way to work: the connection is open only to transfert one buffer
360  CloseSocket();
361  // return error
362  return error_code;
363 }
364 
365 
366 
printf("******************************************************************** \n")
LogMessage & info(LogMessage &)
manipulator to modify the LogMessage
A Block of Frames.
Definition: FrameBlock.h:43
LogMessage & debug(LogMessage &)
virtual Bool_t SetBlock(Char_t *=0x0, UInt_t=0u, char= 'r', char= 'l', UInt_t=0u)=0
Attach a external block to this.
header file for FrameIO.cpp
LogMessage & dolog(LogMessage &)