agent: added options to disable IPv4 or IPv6
[public/netxms.git] / src / agent / core / comm.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2014 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: comm.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
25
26 //
27 // Global variables
28 //
29
30 UINT32 g_dwAcceptErrors = 0;
31 UINT32 g_dwAcceptedConnections = 0;
32 UINT32 g_dwRejectedConnections = 0;
33 CommSession **g_pSessionList = NULL;
34 MUTEX g_hSessionListAccess;
35
36 /**
37 * Static data
38 */
39 static MUTEX m_mutexWatchdogActive = INVALID_MUTEX_HANDLE;
40
41 /**
42 * Initialize session list
43 */
44 void InitSessionList()
45 {
46 // Create session list and it's access mutex
47 g_dwMaxSessions = min(max(g_dwMaxSessions, 2), 1024);
48 g_pSessionList = (CommSession **)malloc(sizeof(CommSession *) * g_dwMaxSessions);
49 memset(g_pSessionList, 0, sizeof(CommSession *) * g_dwMaxSessions);
50 g_hSessionListAccess = MutexCreate();
51 }
52
53 /**
54 * Validates server's address
55 */
56 static bool IsValidServerAddress(const InetAddress &addr, bool *pbMasterServer, bool *pbControlServer)
57 {
58 for(int i = 0; i < g_serverList.size(); i++)
59 {
60 ServerInfo *s = g_serverList.get(i);
61 if (s->match(addr))
62 {
63 *pbMasterServer = s->isMaster();
64 *pbControlServer = s->isControl();
65 return true;
66 }
67 }
68 return false;
69 }
70
71 /**
72 * Register new session in list
73 */
74 static BOOL RegisterSession(CommSession *pSession)
75 {
76 UINT32 i;
77
78 MutexLock(g_hSessionListAccess);
79 for(i = 0; i < g_dwMaxSessions; i++)
80 if (g_pSessionList[i] == NULL)
81 {
82 g_pSessionList[i] = pSession;
83 pSession->setIndex(i);
84 MutexUnlock(g_hSessionListAccess);
85 return TRUE;
86 }
87
88 MutexUnlock(g_hSessionListAccess);
89 nxlog_write(MSG_TOO_MANY_SESSIONS, EVENTLOG_WARNING_TYPE, NULL);
90 return FALSE;
91 }
92
93 /**
94 * Unregister session
95 */
96 void UnregisterSession(UINT32 dwIndex)
97 {
98 MutexLock(g_hSessionListAccess);
99 g_pSessionList[dwIndex] = NULL;
100 MutexUnlock(g_hSessionListAccess);
101 }
102
103 /**
104 * TCP/IP Listener
105 */
106 THREAD_RESULT THREAD_CALL ListenerThread(void *)
107 {
108 // Create socket(s)
109 SOCKET hSocket = (g_dwFlags & AF_DISABLE_IPV4) ? INVALID_SOCKET : socket(AF_INET, SOCK_STREAM, 0);
110 #ifdef WITH_IPV6
111 SOCKET hSocket6 = (g_dwFlags & AF_DISABLE_IPV6) ? INVALID_SOCKET : socket(AF_INET6, SOCK_STREAM, 0);
112 #endif
113 if (((hSocket == INVALID_SOCKET) && !(g_dwFlags & AF_DISABLE_IPV4))
114 #ifdef WITH_IPV6
115 && ((hSocket6 == INVALID_SOCKET) && !(g_dwFlags & AF_DISABLE_IPV6))
116 #endif
117 )
118 {
119 nxlog_write(MSG_SOCKET_ERROR, EVENTLOG_ERROR_TYPE, "e", WSAGetLastError());
120 exit(1);
121 }
122
123 if (!(g_dwFlags & AF_DISABLE_IPV4))
124 {
125 SetSocketExclusiveAddrUse(hSocket);
126 SetSocketReuseFlag(hSocket);
127 #ifndef _WIN32
128 fcntl(hSocket, F_SETFD, fcntl(hSocket, F_GETFD) | FD_CLOEXEC);
129 #endif
130 }
131
132 #ifdef WITH_IPV6
133 if (!(g_dwFlags & AF_DISABLE_IPV6))
134 {
135 SetSocketExclusiveAddrUse(hSocket6);
136 SetSocketReuseFlag(hSocket6);
137 #ifndef _WIN32
138 fcntl(hSocket6, F_SETFD, fcntl(hSocket6, F_GETFD) | FD_CLOEXEC);
139 #endif
140 }
141 #endif
142
143 // Fill in local address structure
144 struct sockaddr_in servAddr;
145 memset(&servAddr, 0, sizeof(struct sockaddr_in));
146 servAddr.sin_family = AF_INET;
147
148 #ifdef WITH_IPV6
149 struct sockaddr_in6 servAddr6;
150 memset(&servAddr6, 0, sizeof(struct sockaddr_in6));
151 servAddr6.sin6_family = AF_INET6;
152 #endif
153
154 if (!_tcscmp(g_szListenAddress, _T("*")))
155 {
156 servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
157 #ifdef WITH_IPV6
158 memset(servAddr6.sin6_addr.s6_addr, 0, 16);
159 #endif
160 }
161 else
162 {
163 InetAddress bindAddress = InetAddress::resolveHostName(g_szListenAddress, AF_INET);
164 if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET))
165 {
166 servAddr.sin_addr.s_addr = bindAddress.getAddressV4();
167 }
168 else
169 {
170 servAddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
171 }
172 #ifdef WITH_IPV6
173 bindAddress = InetAddress::resolveHostName(g_szListenAddress, AF_INET6);
174 if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET6))
175 {
176 memcpy(servAddr6.sin6_addr.s6_addr, bindAddress.getAddressV6(), 16);
177 }
178 else
179 {
180 memset(servAddr6.sin6_addr.s6_addr, 0, 15);
181 servAddr6.sin6_addr.s6_addr[15] = 1;
182 }
183 #endif
184 }
185 servAddr.sin_port = htons(g_wListenPort);
186 #ifdef WITH_IPV6
187 servAddr6.sin6_port = htons(g_wListenPort);
188 #endif
189
190 // Bind socket
191 TCHAR buffer[64];
192 int bindFailures = 0;
193 if (!(g_dwFlags & AF_DISABLE_IPV4))
194 {
195 DebugPrintf(INVALID_INDEX, 1, _T("Trying to bind on %s:%d"), SockaddrToStr((struct sockaddr *)&servAddr, buffer), ntohs(servAddr.sin_port));
196 if (bind(hSocket, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
197 {
198 nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "e", WSAGetLastError());
199 bindFailures++;
200 }
201 }
202 else
203 {
204 bindFailures++;
205 }
206
207 #ifdef WITH_IPV6
208 if (!(g_dwFlags & AF_DISABLE_IPV6))
209 {
210 DebugPrintf(INVALID_INDEX, 1, _T("Trying to bind on [%s]:%d"), SockaddrToStr((struct sockaddr *)&servAddr6, buffer), ntohs(servAddr6.sin6_port));
211 if (bind(hSocket6, (struct sockaddr *)&servAddr6, sizeof(struct sockaddr_in6)) != 0)
212 {
213 nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "e", WSAGetLastError());
214 bindFailures++;
215 }
216 }
217 else
218 {
219 bindFailures++;
220 }
221 #else
222 bindFailures++;
223 #endif
224
225 // Abort if cannot bind to socket
226 if (bindFailures == 2)
227 {
228 exit(1);
229 }
230
231 // Set up queue
232 if (!(g_dwFlags & AF_DISABLE_IPV4))
233 {
234 listen(hSocket, SOMAXCONN);
235 nxlog_write(MSG_LISTENING, EVENTLOG_INFORMATION_TYPE, "ad", ntohl(servAddr.sin_addr.s_addr), g_wListenPort);
236 }
237 #ifdef WITH_IPV6
238 if (!(g_dwFlags & AF_DISABLE_IPV6))
239 {
240 listen(hSocket6, SOMAXCONN);
241 nxlog_write(MSG_LISTENING, EVENTLOG_INFORMATION_TYPE, "Hd", servAddr6.sin6_addr.s6_addr, g_wListenPort);
242 }
243 #endif
244
245 // Wait for connection requests
246 int errorCount = 0;
247 while(!(g_dwFlags & AF_SHUTDOWN))
248 {
249 struct timeval tv;
250 tv.tv_sec = 1;
251 tv.tv_usec = 0;
252
253 fd_set rdfs;
254 FD_ZERO(&rdfs);
255 if (hSocket != INVALID_SOCKET)
256 FD_SET(hSocket, &rdfs);
257 #ifdef WITH_IPV6
258 if (hSocket6 != INVALID_SOCKET)
259 FD_SET(hSocket6, &rdfs);
260 #endif
261
262 #if defined(WITH_IPV6) && !defined(_WIN32)
263 SOCKET nfds = 0;
264 if (hSocket != INVALID_SOCKET)
265 nfds = hSocket;
266 if ((hSocket6 != INVALID_SOCKET) && (hSocket6 > nfds))
267 nfds = hSocket6;
268 int nRet = select(SELECT_NFDS(nfds + 1), &rdfs, NULL, NULL, &tv);
269 #else
270 int nRet = select(SELECT_NFDS(hSocket + 1), &rdfs, NULL, NULL, &tv);
271 #endif
272 if ((nRet > 0) && (!(g_dwFlags & AF_SHUTDOWN)))
273 {
274 char clientAddr[128];
275 socklen_t size = 128;
276 #ifdef WITH_IPV6
277 SOCKET hClientSocket = accept(FD_ISSET(hSocket, &rdfs) ? hSocket : hSocket6, (struct sockaddr *)clientAddr, &size);
278 #else
279 SOCKET hClientSocket = accept(hSocket, (struct sockaddr *)clientAddr, &size);
280 #endif
281 if (hClientSocket == INVALID_SOCKET)
282 {
283 int error = WSAGetLastError();
284
285 if (error != WSAEINTR)
286 nxlog_write(MSG_ACCEPT_ERROR, EVENTLOG_ERROR_TYPE, "e", error);
287 errorCount++;
288 g_dwAcceptErrors++;
289 if (errorCount > 1000)
290 {
291 nxlog_write(MSG_TOO_MANY_ERRORS, EVENTLOG_WARNING_TYPE, NULL);
292 errorCount = 0;
293 }
294 ThreadSleepMs(500);
295 continue;
296 }
297
298 // Socket should be closed on successful exec
299 #ifndef _WIN32
300 fcntl(hClientSocket, F_SETFD, fcntl(hClientSocket, F_GETFD) | FD_CLOEXEC);
301 #endif
302
303 errorCount = 0; // Reset consecutive errors counter
304 InetAddress addr = InetAddress::createFromSockaddr((struct sockaddr *)clientAddr);
305 DebugPrintf(INVALID_INDEX, 5, _T("Incoming connection from %s"), addr.toString(buffer));
306
307 bool masterServer, controlServer;
308 if (IsValidServerAddress(addr, &masterServer, &controlServer))
309 {
310 g_dwAcceptedConnections++;
311 DebugPrintf(INVALID_INDEX, 5, _T("Connection from %s accepted"), buffer);
312
313 // Create new session structure and threads
314 CommSession *session = new CommSession(hClientSocket, addr, masterServer, controlServer);
315
316 if (!RegisterSession(session))
317 {
318 delete session;
319 }
320 else
321 {
322 session->run();
323 }
324 }
325 else // Unauthorized connection
326 {
327 g_dwRejectedConnections++;
328 shutdown(hClientSocket, SHUT_RDWR);
329 closesocket(hClientSocket);
330 DebugPrintf(INVALID_INDEX, 5, _T("Connection from %s rejected"), buffer);
331 }
332 }
333 else if (nRet == -1)
334 {
335 int error = WSAGetLastError();
336
337 // On AIX, select() returns ENOENT after SIGINT for unknown reason
338 #ifdef _WIN32
339 if (error != WSAEINTR)
340 #else
341 if ((error != EINTR) && (error != ENOENT))
342 #endif
343 {
344 nxlog_write(MSG_SELECT_ERROR, EVENTLOG_ERROR_TYPE, "e", error);
345 ThreadSleepMs(100);
346 }
347 }
348 }
349
350 // Wait for watchdog thread
351 MutexLock(m_mutexWatchdogActive);
352 MutexUnlock(m_mutexWatchdogActive);
353 MutexDestroy(m_mutexWatchdogActive);
354
355 MutexDestroy(g_hSessionListAccess);
356 free(g_pSessionList);
357 closesocket(hSocket);
358 DebugPrintf(INVALID_INDEX, 1, _T("Listener thread terminated"));
359 return THREAD_OK;
360 }
361
362 /**
363 * Session watchdog thread
364 */
365 THREAD_RESULT THREAD_CALL SessionWatchdog(void *)
366 {
367 m_mutexWatchdogActive = MutexCreate();
368 MutexLock(m_mutexWatchdogActive);
369
370 ThreadSleep(5);
371 while(!(g_dwFlags & AF_SHUTDOWN))
372 {
373 MutexLock(g_hSessionListAccess);
374 time_t now = time(NULL);
375 for(UINT32 i = 0; i < g_dwMaxSessions; i++)
376 if (g_pSessionList[i] != NULL)
377 {
378 if (g_pSessionList[i]->getTimeStamp() < (now - (time_t)g_dwIdleTimeout))
379 {
380 DebugPrintf(i, 5, _T("Session disconnected by watchdog (last activity timestamp is ") TIME_T_FMT _T(")"), g_pSessionList[i]->getTimeStamp());
381 g_pSessionList[i]->disconnect();
382 }
383 }
384 MutexUnlock(g_hSessionListAccess);
385 ThreadSleep(5);
386 }
387
388 // Disconnect all sessions
389 MutexLock(g_hSessionListAccess);
390 for(UINT32 i = 0; i < g_dwMaxSessions; i++)
391 if (g_pSessionList[i] != NULL)
392 g_pSessionList[i]->disconnect();
393 MutexUnlock(g_hSessionListAccess);
394
395 ThreadSleep(1);
396 MutexUnlock(m_mutexWatchdogActive);
397 DebugPrintf(INVALID_INDEX, 1, _T("Session Watchdog thread terminated"));
398
399 return THREAD_OK;
400 }
401
402 /**
403 * Handler for Agent.ActiveConnections parameter
404 */
405 LONG H_ActiveConnections(const TCHAR *pszCmd, const TCHAR *pArg, TCHAR *pValue)
406 {
407 int nCounter;
408 UINT32 i;
409
410 MutexLock(g_hSessionListAccess);
411 for(i = 0, nCounter = 0; i < g_dwMaxSessions; i++)
412 if (g_pSessionList[i] != NULL)
413 nCounter++;
414 MutexUnlock(g_hSessionListAccess);
415 ret_int(pValue, nCounter);
416 return SYSINFO_RC_SUCCESS;
417 }