1+ // Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+ // See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+ // All rights not expressly granted are reserved.
4+ //
5+ // This software is distributed under the terms of the GNU General Public
6+ // License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+ //
8+ // In applying this license CERN does not waive the privileges and immunities
9+ // granted to it by virtue of its status as an Intergovernmental Organization
10+ // or submit itself to any jurisdiction.
11+
12+ // /
13+ // / \file Location.cxx
14+ // / \author Julian Myrcha
15+ // /
16+
17+ #include " EventVisualisationDataConverter/Location.h"
18+ #include < fairlogger/Logger.h>
19+ #include < sys/socket.h>
20+ #include < unistd.h>
21+ #include < netdb.h>
22+ #include < fcntl.h>
23+ #include < poll.h>
24+ #include < ctime>
25+
26+ using namespace std ;
27+
28+ namespace o2 ::event_visualisation
29+ {
30+
31+ int connect_with_timeout (const int socket, const struct sockaddr * addr, socklen_t addrlen, const unsigned int timeout_ms)
32+ {
33+ int connection = 0 ;
34+ // Setting O_NONBLOCK
35+ int socket_flags_before;
36+ if ((socket_flags_before = fcntl (socket, F_GETFL, 0 ) < 0 )) {
37+ return -1 ;
38+ }
39+ if (fcntl (socket, F_SETFL, socket_flags_before | O_NONBLOCK) < 0 ) {
40+ return -1 ;
41+ }
42+ do {
43+ if (connect (socket, addr, addrlen) < 0 ) {
44+ if ((errno != EWOULDBLOCK) && (errno != EINPROGRESS)) {
45+ connection = -1 ; // error
46+ } else { // wait for complete
47+ // deadline 'timeout' ms from now
48+ timespec now; // NOLINT(*-pro-type-member-init)
49+ if (clock_gettime (CLOCK_MONOTONIC, &now) < 0 ) {
50+ connection = -1 ;
51+ break ;
52+ }
53+ const timespec deadline = {.tv_sec = now.tv_sec ,
54+ .tv_nsec = now.tv_nsec + timeout_ms * 1000000l };
55+ do {
56+ if (clock_gettime (CLOCK_MONOTONIC, &now) < 0 ) {
57+ connection = -1 ;
58+ break ;
59+ }
60+ // compute remaining deadline
61+ const int ms_until_deadline = static_cast <int >((deadline.tv_sec - now.tv_sec ) * 1000l + (deadline.tv_nsec - now.tv_nsec ) / 1000000l );
62+ if (ms_until_deadline < 0 ) {
63+ connection = 0 ;
64+ break ;
65+ }
66+ pollfd connectionPool[] = {{.fd = socket, .events = POLLOUT}};
67+ connection = poll (connectionPool, 1 , ms_until_deadline);
68+
69+ if (connection > 0 ) { // confirm the success
70+ int error = 0 ;
71+ socklen_t len = sizeof (error);
72+ if (getsockopt (socket, SOL_SOCKET, SO_ERROR, &error, &len) == 0 ) {
73+ errno = error;
74+ }
75+ if (error != 0 ) {
76+ connection = -1 ;
77+ }
78+ }
79+ } while (connection == -1 && errno == EINTR); // If interrupted, try again.
80+ if (connection == 0 ) {
81+ errno = ETIMEDOUT;
82+ connection = -1 ;
83+ }
84+ }
85+ }
86+ } while (false );
87+ // Restore socket state
88+ if (fcntl (socket, F_SETFL, socket_flags_before) < 0 ) {
89+ return -1 ;
90+ }
91+ return connection;
92+ }
93+
94+ void Location::open ()
95+ {
96+ if (this ->mToFile ) {
97+ this ->mOut = new std::ofstream (mFileName , std::ios::out | std::ios::binary);
98+ }
99+ if (this ->mToSocket ) {
100+ // resolve host name
101+ sockaddr_in serverAddress; // NOLINT(*-pro-type-member-init)
102+ serverAddress.sin_family = AF_INET;
103+ serverAddress.sin_port = htons (this ->mPort ); // Port number
104+
105+ // ask once
106+ static auto server = gethostbyname (this ->mHostName .c_str ());
107+ if (server == nullptr ) {
108+ LOGF (info, " Error no such host %s" , this ->mHostName .c_str ());
109+ return ;
110+ };
111+
112+ bcopy ((char *)server->h_addr ,
113+ (char *)&serverAddress.sin_addr .s_addr ,
114+ server->h_length );
115+
116+ // Connect to the server
117+ this ->mClientSocket = socket (AF_INET, SOCK_STREAM, 0 );
118+ if (this ->mClientSocket == -1 ) {
119+ LOGF (info, " Error creating socket" );
120+ return ;
121+ }
122+
123+ if (connect_with_timeout (this ->mClientSocket , (sockaddr*)&serverAddress,
124+ sizeof (serverAddress), this ->mTimeout ) == -1 ) {
125+ LOGF (info, " Error connecting to %s:%d" , this ->mHostName .c_str (), this ->mPort );
126+ ::close (this ->mClientSocket );
127+ this ->mClientSocket = -1 ;
128+ return ;
129+ }
130+ try {
131+ char buf[256 ] = " SEND:" ;
132+ strncpy (buf + 6 , this ->mFileName .c_str (), sizeof (buf) - 7 );
133+ strncpy (buf + sizeof (buf) - 6 , " ALICE" , 6 );
134+ auto real = send (this ->mClientSocket , buf, sizeof (buf), 0 );
135+ if (real != sizeof (buf)) {
136+ throw real;
137+ }
138+ } catch (...) {
139+ ::close (this ->mClientSocket );
140+ this ->mClientSocket = -1 ;
141+ LOGF (info, " Error sending file name to %s:%d" , this ->mHostName .c_str (), this ->mPort );
142+ }
143+ }
144+ }
145+
146+ void Location::close ()
147+ {
148+ if (this ->mToFile && this ->mOut ) {
149+ this ->mOut ->close ();
150+ delete this ->mOut ;
151+ this ->mOut = nullptr ;
152+ }
153+ if (this ->mToSocket && this ->mClientSocket != -1 ) {
154+ ::close (this ->mClientSocket );
155+ this ->mClientSocket = -1 ;
156+ }
157+ }
158+
159+ void Location::write (char * buf, std::streamsize size)
160+ {
161+ if (size == 0 ) {
162+ return ;
163+ }
164+ if (this ->mToFile && this ->mOut ) {
165+ this ->mOut ->write (buf, size);
166+ }
167+ if (this ->mToSocket && this ->mClientSocket != -1 ) {
168+ LOGF (info, " Location::write() socket %s ++++++++++++++++++++++" , fileName ());
169+ try {
170+ auto real = send (this ->mClientSocket , buf, size, 0 );
171+ if (real != size) {
172+ throw real;
173+ }
174+ } catch (...) {
175+ ::close (this ->mClientSocket );
176+ this ->mClientSocket = -1 ;
177+ LOGF (info, " Error sending data to %s:%d" , this ->mHostName .c_str (), this ->mPort );
178+ }
179+ }
180+ }
181+
182+ } // namespace o2::event_visualisation
0 commit comments