min/max calls replaced with std::min/std::max
[public/netxms.git] / src / agent / core / comm.cpp
CommitLineData
5039dede
AK
1/*
2** NetXMS multiplatform core agent
eae56922 3** Copyright (C) 2003-2017 Victor Kirhenshtein
5039dede
AK
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: comm.cpp
20**
21**/
22
23#include "nxagentd.h"
24
da8453a5
VK
25/**
26 * Statistics
27 */
28UINT32 g_acceptErrors = 0;
29UINT32 g_acceptedConnections = 0;
30UINT32 g_rejectedConnections = 0;
5039dede 31
da8453a5
VK
32/**
33 * Session list
34 */
5039dede
AK
35CommSession **g_pSessionList = NULL;
36MUTEX g_hSessionListAccess;
37
5e60b759
VK
38/**
39 * Static data
40 */
5039dede 41static MUTEX m_mutexWatchdogActive = INVALID_MUTEX_HANDLE;
b368969c 42static VolatileCounter s_messageId = (INT32)time(NULL);
09ecddac
VK
43
44/**
45 * Generate new message ID
46 */
47UINT32 GenerateMessageId()
48{
49 return (UINT32)InterlockedIncrement(&s_messageId);
50}
5039dede 51
13dbd3dd
VK
52/**
53 * Initialize session list
54 */
6aab2983
AK
55void InitSessionList()
56{
3785aead
VK
57 if (g_dwMaxSessions == 0) // default value
58 {
59 g_dwMaxSessions = (g_dwFlags & (AF_ENABLE_PROXY | AF_ENABLE_SNMP_PROXY)) ? 1024 : 32;
60 }
61 else
62 {
78032263 63 g_dwMaxSessions = MIN(MAX(g_dwMaxSessions, 2), 4096);
3785aead
VK
64 }
65 nxlog_debug(2, _T("Maximum number of sessions set to %d"), g_dwMaxSessions);
6aab2983
AK
66 g_pSessionList = (CommSession **)malloc(sizeof(CommSession *) * g_dwMaxSessions);
67 memset(g_pSessionList, 0, sizeof(CommSession *) * g_dwMaxSessions);
68 g_hSessionListAccess = MutexCreate();
69}
70
13dbd3dd 71/**
13783551
VK
72 * Destroy session list
73 */
74void DestroySessionList()
75{
76 MutexDestroy(g_hSessionListAccess);
77 free(g_pSessionList);
78}
79
80/**
13dbd3dd
VK
81 * Validates server's address
82 */
e3ff8ad1 83bool IsValidServerAddress(const InetAddress &addr, bool *pbMasterServer, bool *pbControlServer)
5039dede 84{
9319c166 85 for(int i = 0; i < g_serverList.size(); i++)
968031d9 86 {
9319c166
VK
87 ServerInfo *s = g_serverList.get(i);
88 if (s->match(addr))
5039dede 89 {
9319c166
VK
90 *pbMasterServer = s->isMaster();
91 *pbControlServer = s->isControl();
92 return true;
5039dede 93 }
968031d9 94 }
9319c166 95 return false;
5039dede
AK
96}
97
5e60b759
VK
98/**
99 * Register new session in list
100 */
e3ff8ad1 101bool RegisterSession(CommSession *session)
5039dede 102{
c17f6cbc 103 MutexLock(g_hSessionListAccess);
e3ff8ad1 104 for(UINT32 i = 0; i < g_dwMaxSessions; i++)
5039dede
AK
105 if (g_pSessionList[i] == NULL)
106 {
e3ff8ad1
VK
107 g_pSessionList[i] = session;
108 session->setIndex(i);
5039dede 109 MutexUnlock(g_hSessionListAccess);
e3ff8ad1 110 return true;
5039dede
AK
111 }
112
113 MutexUnlock(g_hSessionListAccess);
114 nxlog_write(MSG_TOO_MANY_SESSIONS, EVENTLOG_WARNING_TYPE, NULL);
e3ff8ad1 115 return false;
5039dede
AK
116}
117
5e60b759
VK
118/**
119 * Unregister session
120 */
da8453a5 121void UnregisterSession(UINT32 index, UINT32 id)
5039dede 122{
c17f6cbc 123 MutexLock(g_hSessionListAccess);
da8453a5
VK
124 if ((g_pSessionList[index] != NULL) && (g_pSessionList[index]->getId() == id))
125 {
126 g_pSessionList[index]->debugPrintf(4, _T("Session unregistered"));
127 g_pSessionList[index] = NULL;
128 }
5039dede
AK
129 MutexUnlock(g_hSessionListAccess);
130}
131
1a93e64a 132/**
34b9a3e7
VK
133 * Enumerates active agent sessions. Callback will be called for each valid session.
134 * Callback must return _STOP to stop enumeration or _CONTINUE to continue.
135 *
136 * @return true if enumeration was stopped by callback
137 */
138bool EnumerateSessions(EnumerationCallbackResult (* callback)(AbstractCommSession *, void *), void *data)
139{
140 bool result = false;
141 MutexLock(g_hSessionListAccess);
142 for(UINT32 i = 0; i < g_dwMaxSessions; i++)
143 {
144 if (g_pSessionList[i] == NULL)
145 continue;
146
147 if (callback(g_pSessionList[i], data) == _STOP)
148 {
149 result = true;
150 break;
151 }
152 }
153 MutexUnlock(g_hSessionListAccess);
154 return result;
155}
156
157/**
6fbaa926
VK
158 * Find server session. Caller must call decRefCount() for session object when finished.
159 */
160AbstractCommSession *FindServerSession(UINT64 serverId)
161{
162 AbstractCommSession *session = NULL;
163 MutexLock(g_hSessionListAccess);
164 for(UINT32 i = 0; i < g_dwMaxSessions; i++)
165 {
d1dfba1a 166 if ((g_pSessionList[i] != NULL) && (g_pSessionList[i]->getServerId() == serverId))
6fbaa926
VK
167 {
168 session = g_pSessionList[i];
169 session->incRefCount();
170 break;
171 }
172 }
173 MutexUnlock(g_hSessionListAccess);
174 return session;
175}
176
177/**
a1273b42
VK
178 * Find server session using comparator callback. Caller must call decRefCount() for session object when finished.
179 */
180AbstractCommSession *FindServerSession(bool (*comparator)(AbstractCommSession *, void *), void *userData)
181{
182 AbstractCommSession *session = NULL;
183 MutexLock(g_hSessionListAccess);
184 for(UINT32 i = 0; i < g_dwMaxSessions; i++)
185 {
186 if ((g_pSessionList[i] != NULL) && (comparator(g_pSessionList[i], userData)))
187 {
188 session = g_pSessionList[i];
189 session->incRefCount();
190 break;
191 }
192 }
193 MutexUnlock(g_hSessionListAccess);
194 return session;
195}
196
197/**
1a93e64a
VK
198 * TCP/IP Listener
199 */
5039dede
AK
200THREAD_RESULT THREAD_CALL ListenerThread(void *)
201{
c7853753 202 // Create socket(s)
6239e6a4 203 SOCKET hSocket = (g_dwFlags & AF_DISABLE_IPV4) ? INVALID_SOCKET : socket(AF_INET, SOCK_STREAM, 0);
c7853753 204#ifdef WITH_IPV6
6239e6a4 205 SOCKET hSocket6 = (g_dwFlags & AF_DISABLE_IPV6) ? INVALID_SOCKET : socket(AF_INET6, SOCK_STREAM, 0);
c7853753 206#endif
6239e6a4 207 if (((hSocket == INVALID_SOCKET) && !(g_dwFlags & AF_DISABLE_IPV4))
c7853753 208#ifdef WITH_IPV6
6239e6a4 209 && ((hSocket6 == INVALID_SOCKET) && !(g_dwFlags & AF_DISABLE_IPV6))
c7853753
VK
210#endif
211 )
5039dede
AK
212 {
213 nxlog_write(MSG_SOCKET_ERROR, EVENTLOG_ERROR_TYPE, "e", WSAGetLastError());
214 exit(1);
215 }
216
6239e6a4
VK
217 if (!(g_dwFlags & AF_DISABLE_IPV4))
218 {
219 SetSocketExclusiveAddrUse(hSocket);
220 SetSocketReuseFlag(hSocket);
5039dede 221#ifndef _WIN32
6239e6a4 222 fcntl(hSocket, F_SETFD, fcntl(hSocket, F_GETFD) | FD_CLOEXEC);
5039dede 223#endif
6239e6a4 224 }
5039dede 225
c7853753 226#ifdef WITH_IPV6
6239e6a4
VK
227 if (!(g_dwFlags & AF_DISABLE_IPV6))
228 {
b8adb0f2
VK
229 SetSocketExclusiveAddrUse(hSocket6);
230 SetSocketReuseFlag(hSocket6);
c7853753 231#ifndef _WIN32
6239e6a4 232 fcntl(hSocket6, F_SETFD, fcntl(hSocket6, F_GETFD) | FD_CLOEXEC);
c7853753 233#endif
b8adb0f2
VK
234#ifdef IPV6_V6ONLY
235 int on = 1;
4864fd4a 236 setsockopt(hSocket6, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&on, sizeof(int));
b8adb0f2 237#endif
6239e6a4 238 }
c7853753
VK
239#endif
240
5039dede 241 // Fill in local address structure
c7853753 242 struct sockaddr_in servAddr;
5039dede
AK
243 memset(&servAddr, 0, sizeof(struct sockaddr_in));
244 servAddr.sin_family = AF_INET;
c7853753
VK
245
246#ifdef WITH_IPV6
247 struct sockaddr_in6 servAddr6;
248 memset(&servAddr6, 0, sizeof(struct sockaddr_in6));
249 servAddr6.sin6_family = AF_INET6;
250#endif
251
252 if (!_tcscmp(g_szListenAddress, _T("*")))
5039dede
AK
253 {
254 servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
c7853753
VK
255#ifdef WITH_IPV6
256 memset(servAddr6.sin6_addr.s6_addr, 0, 16);
257#endif
5039dede
AK
258 }
259 else
260 {
c7853753
VK
261 InetAddress bindAddress = InetAddress::resolveHostName(g_szListenAddress, AF_INET);
262 if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET))
263 {
b4533910 264 servAddr.sin_addr.s_addr = htonl(bindAddress.getAddressV4());
c7853753
VK
265 }
266 else
267 {
268 servAddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
269 }
270#ifdef WITH_IPV6
271 bindAddress = InetAddress::resolveHostName(g_szListenAddress, AF_INET6);
272 if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET6))
273 {
274 memcpy(servAddr6.sin6_addr.s6_addr, bindAddress.getAddressV6(), 16);
275 }
276 else
277 {
278 memset(servAddr6.sin6_addr.s6_addr, 0, 15);
279 servAddr6.sin6_addr.s6_addr[15] = 1;
280 }
281#endif
5039dede
AK
282 }
283 servAddr.sin_port = htons(g_wListenPort);
c7853753
VK
284#ifdef WITH_IPV6
285 servAddr6.sin6_port = htons(g_wListenPort);
286#endif
5039dede
AK
287
288 // Bind socket
c7853753
VK
289 TCHAR buffer[64];
290 int bindFailures = 0;
6239e6a4
VK
291 if (!(g_dwFlags & AF_DISABLE_IPV4))
292 {
da8453a5 293 DebugPrintf(1, _T("Trying to bind on %s:%d"), SockaddrToStr((struct sockaddr *)&servAddr, buffer), ntohs(servAddr.sin_port));
6239e6a4
VK
294 if (bind(hSocket, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
295 {
296 nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "e", WSAGetLastError());
297 bindFailures++;
298 }
299 }
300 else
5039dede 301 {
c7853753
VK
302 bindFailures++;
303 }
304
305#ifdef WITH_IPV6
6239e6a4
VK
306 if (!(g_dwFlags & AF_DISABLE_IPV6))
307 {
da8453a5 308 DebugPrintf(1, _T("Trying to bind on [%s]:%d"), SockaddrToStr((struct sockaddr *)&servAddr6, buffer), ntohs(servAddr6.sin6_port));
6239e6a4
VK
309 if (bind(hSocket6, (struct sockaddr *)&servAddr6, sizeof(struct sockaddr_in6)) != 0)
310 {
311 nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "e", WSAGetLastError());
312 bindFailures++;
313 }
314 }
315 else
c7853753 316 {
c7853753
VK
317 bindFailures++;
318 }
319#else
320 bindFailures++;
321#endif
322
323 // Abort if cannot bind to socket
324 if (bindFailures == 2)
325 {
5039dede
AK
326 exit(1);
327 }
328
329 // Set up queue
6239e6a4
VK
330 if (!(g_dwFlags & AF_DISABLE_IPV4))
331 {
b8adb0f2
VK
332 if (listen(hSocket, SOMAXCONN) == 0)
333 {
334 nxlog_write(MSG_LISTENING, EVENTLOG_INFORMATION_TYPE, "ad", ntohl(servAddr.sin_addr.s_addr), g_wListenPort);
335 }
336 else
337 {
338 closesocket(hSocket);
339 hSocket = INVALID_SOCKET;
340 }
6239e6a4 341 }
c7853753 342#ifdef WITH_IPV6
6239e6a4
VK
343 if (!(g_dwFlags & AF_DISABLE_IPV6))
344 {
b8adb0f2
VK
345 if (listen(hSocket6, SOMAXCONN) == 0)
346 {
347 nxlog_write(MSG_LISTENING, EVENTLOG_INFORMATION_TYPE, "Hd", servAddr6.sin6_addr.s6_addr, g_wListenPort);
348 }
349 else
350 {
351 closesocket(hSocket6);
352 hSocket6 = INVALID_SOCKET;
353 }
6239e6a4 354 }
c7853753 355#endif
5039dede 356
5039dede 357 // Wait for connection requests
fdd1d56b 358 SocketPoller sp;
c7853753 359 int errorCount = 0;
5039dede
AK
360 while(!(g_dwFlags & AF_SHUTDOWN))
361 {
fdd1d56b 362 sp.reset();
6239e6a4 363 if (hSocket != INVALID_SOCKET)
fdd1d56b 364 sp.add(hSocket);
c7853753 365#ifdef WITH_IPV6
6239e6a4 366 if (hSocket6 != INVALID_SOCKET)
fdd1d56b 367 sp.add(hSocket6);
c7853753
VK
368#endif
369
fdd1d56b 370 int nRet = sp.poll(1000);
5039dede
AK
371 if ((nRet > 0) && (!(g_dwFlags & AF_SHUTDOWN)))
372 {
c7853753 373 char clientAddr[128];
202729e9 374 socklen_t size = 128;
c7853753 375#ifdef WITH_IPV6
fdd1d56b 376 SOCKET hClientSocket = accept(sp.isSet(hSocket) ? hSocket : hSocket6, (struct sockaddr *)clientAddr, &size);
c7853753
VK
377#else
378 SOCKET hClientSocket = accept(hSocket, (struct sockaddr *)clientAddr, &size);
379#endif
380 if (hClientSocket == INVALID_SOCKET)
5039dede
AK
381 {
382 int error = WSAGetLastError();
383
384 if (error != WSAEINTR)
385 nxlog_write(MSG_ACCEPT_ERROR, EVENTLOG_ERROR_TYPE, "e", error);
c7853753 386 errorCount++;
da8453a5 387 g_acceptErrors++;
c7853753 388 if (errorCount > 1000)
5039dede
AK
389 {
390 nxlog_write(MSG_TOO_MANY_ERRORS, EVENTLOG_WARNING_TYPE, NULL);
c7853753 391 errorCount = 0;
5039dede
AK
392 }
393 ThreadSleepMs(500);
394 continue;
395 }
396
397 // Socket should be closed on successful exec
398#ifndef _WIN32
399 fcntl(hClientSocket, F_SETFD, fcntl(hClientSocket, F_GETFD) | FD_CLOEXEC);
400#endif
401
c7853753
VK
402 errorCount = 0; // Reset consecutive errors counter
403 InetAddress addr = InetAddress::createFromSockaddr((struct sockaddr *)clientAddr);
da8453a5 404 DebugPrintf(5, _T("Incoming connection from %s"), addr.toString(buffer));
5039dede 405
9319c166
VK
406 bool masterServer, controlServer;
407 if (IsValidServerAddress(addr, &masterServer, &controlServer))
5039dede 408 {
da8453a5
VK
409 g_acceptedConnections++;
410 DebugPrintf(5, _T("Connection from %s accepted"), buffer);
5039dede
AK
411
412 // Create new session structure and threads
eae56922
VK
413 SocketCommChannel *channel = new SocketCommChannel(hClientSocket);
414 CommSession *session = new CommSession(channel, addr, masterServer, controlServer);
415 channel->decRefCount();
bf3b7f79 416
c7853753 417 if (!RegisterSession(session))
5039dede 418 {
c7853753 419 delete session;
5039dede
AK
420 }
421 else
422 {
da8453a5 423 DebugPrintf(9, _T("Session registered for %s"), buffer);
c7853753 424 session->run();
5039dede
AK
425 }
426 }
427 else // Unauthorized connection
428 {
da8453a5 429 g_rejectedConnections++;
5039dede
AK
430 shutdown(hClientSocket, SHUT_RDWR);
431 closesocket(hClientSocket);
da8453a5 432 DebugPrintf(5, _T("Connection from %s rejected"), buffer);
5039dede
AK
433 }
434 }
435 else if (nRet == -1)
436 {
437 int error = WSAGetLastError();
438
439 // On AIX, select() returns ENOENT after SIGINT for unknown reason
440#ifdef _WIN32
441 if (error != WSAEINTR)
442#else
443 if ((error != EINTR) && (error != ENOENT))
444#endif
445 {
446 nxlog_write(MSG_SELECT_ERROR, EVENTLOG_ERROR_TYPE, "e", error);
447 ThreadSleepMs(100);
448 }
449 }
450 }
451
452 // Wait for watchdog thread
c17f6cbc 453 MutexLock(m_mutexWatchdogActive);
5039dede
AK
454 MutexUnlock(m_mutexWatchdogActive);
455 MutexDestroy(m_mutexWatchdogActive);
456
5039dede 457 closesocket(hSocket);
13783551
VK
458#ifdef WITH_IPV6
459 closesocket(hSocket6);
460#endif
da8453a5 461 DebugPrintf(1, _T("Listener thread terminated"));
5039dede
AK
462 return THREAD_OK;
463}
464
1a93e64a
VK
465/**
466 * Session watchdog thread
467 */
5039dede
AK
468THREAD_RESULT THREAD_CALL SessionWatchdog(void *)
469{
5039dede 470 m_mutexWatchdogActive = MutexCreate();
c17f6cbc 471 MutexLock(m_mutexWatchdogActive);
5039dede 472
da8453a5 473 while(!AgentSleepAndCheckForShutdown(5000))
5039dede 474 {
c17f6cbc 475 MutexLock(g_hSessionListAccess);
5e60b759
VK
476 time_t now = time(NULL);
477 for(UINT32 i = 0; i < g_dwMaxSessions; i++)
5039dede
AK
478 if (g_pSessionList[i] != NULL)
479 {
e4a64da2 480 if (g_pSessionList[i]->getTimeStamp() < (now - (time_t)g_dwIdleTimeout))
4685a2ad 481 {
da8453a5 482 g_pSessionList[i]->debugPrintf(4, _T("Session disconnected by watchdog (last activity timestamp is ") UINT64_FMT _T(")"), (UINT64)g_pSessionList[i]->getTimeStamp());
e4a64da2 483 g_pSessionList[i]->disconnect();
da8453a5 484 g_pSessionList[i] = NULL;
4685a2ad 485 }
5039dede
AK
486 }
487 MutexUnlock(g_hSessionListAccess);
488 }
489
490 // Disconnect all sessions
c17f6cbc 491 MutexLock(g_hSessionListAccess);
5e60b759 492 for(UINT32 i = 0; i < g_dwMaxSessions; i++)
5039dede 493 if (g_pSessionList[i] != NULL)
e4a64da2 494 g_pSessionList[i]->disconnect();
5039dede
AK
495 MutexUnlock(g_hSessionListAccess);
496
497 ThreadSleep(1);
498 MutexUnlock(m_mutexWatchdogActive);
da8453a5 499 DebugPrintf(1, _T("Session Watchdog thread terminated"));
5039dede
AK
500
501 return THREAD_OK;
502}
503
5e60b759
VK
504/**
505 * Handler for Agent.ActiveConnections parameter
506 */
060c5a11 507LONG H_ActiveConnections(const TCHAR *pszCmd, const TCHAR *pArg, TCHAR *pValue, AbstractCommSession *session)
5039dede
AK
508{
509 int nCounter;
967893bb 510 UINT32 i;
5039dede 511
c17f6cbc 512 MutexLock(g_hSessionListAccess);
5039dede
AK
513 for(i = 0, nCounter = 0; i < g_dwMaxSessions; i++)
514 if (g_pSessionList[i] != NULL)
515 nCounter++;
516 MutexUnlock(g_hSessionListAccess);
517 ret_int(pValue, nCounter);
518 return SYSINFO_RC_SUCCESS;
519}