038a4fbfda2c420a31bf71a6a2c0964a2fba566b
[public/netxms.git] / src / agent / core / comm.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2014 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: comm.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
25
26 //
27 // Global variables
28 //
29
// Incremented by the listener thread each time accept() fails
30 UINT32 g_dwAcceptErrors = 0;
// Incremented by the listener thread for each connection whose peer address
// matched an entry in the server list
31 UINT32 g_dwAcceptedConnections = 0;
// Incremented by the listener thread for each connection that was closed
// because the peer address did not match any entry in the server list
32 UINT32 g_dwRejectedConnections = 0;
// Session table: g_dwMaxSessions slots allocated by InitSessionList();
// a NULL slot is free
33 CommSession **g_pSessionList = NULL;
// Mutex taken around every read and write of g_pSessionList in this file
34 MUTEX g_hSessionListAccess;
35
36 /**
37 * Static data
38 */
// Created and locked by SessionWatchdog() when it starts and unlocked just
// before it returns; ListenerThread() locks it on shutdown to wait for the
// watchdog to finish, then destroys it
39 static MUTEX m_mutexWatchdogActive = INVALID_MUTEX_HANDLE;
40
41 /**
42 * Initialize session list
43 */
44 void InitSessionList()
45 {
46 // Create session list and it's access mutex
47 g_dwMaxSessions = min(max(g_dwMaxSessions, 2), 1024);
48 g_pSessionList = (CommSession **)malloc(sizeof(CommSession *) * g_dwMaxSessions);
49 memset(g_pSessionList, 0, sizeof(CommSession *) * g_dwMaxSessions);
50 g_hSessionListAccess = MutexCreate();
51 }
52
53 /**
54 * Validates server's address
55 */
56 static bool IsValidServerAddress(const InetAddress &addr, bool *pbMasterServer, bool *pbControlServer)
57 {
58 for(int i = 0; i < g_serverList.size(); i++)
59 {
60 ServerInfo *s = g_serverList.get(i);
61 if (s->match(addr))
62 {
63 *pbMasterServer = s->isMaster();
64 *pbControlServer = s->isControl();
65 return true;
66 }
67 }
68 return false;
69 }
70
71 /**
72 * Register new session in list
73 */
74 static BOOL RegisterSession(CommSession *pSession)
75 {
76 UINT32 i;
77
78 MutexLock(g_hSessionListAccess);
79 for(i = 0; i < g_dwMaxSessions; i++)
80 if (g_pSessionList[i] == NULL)
81 {
82 g_pSessionList[i] = pSession;
83 pSession->setIndex(i);
84 MutexUnlock(g_hSessionListAccess);
85 return TRUE;
86 }
87
88 MutexUnlock(g_hSessionListAccess);
89 nxlog_write(MSG_TOO_MANY_SESSIONS, EVENTLOG_WARNING_TYPE, NULL);
90 return FALSE;
91 }
92
93 /**
94 * Unregister session
95 */
96 void UnregisterSession(UINT32 dwIndex)
97 {
98 MutexLock(g_hSessionListAccess);
99 g_pSessionList[dwIndex] = NULL;
100 MutexUnlock(g_hSessionListAccess);
101 }
102
103 /**
104 * TCP/IP Listener
105 */
106 THREAD_RESULT THREAD_CALL ListenerThread(void *)
107 {
108 // Create socket(s)
109 SOCKET hSocket = socket(AF_INET, SOCK_STREAM, 0);
110 #ifdef WITH_IPV6
111 SOCKET hSocket6 = socket(AF_INET6, SOCK_STREAM, 0);
112 #endif
113 if ((hSocket == INVALID_SOCKET)
114 #ifdef WITH_IPV6
115 && (hSocket6 == INVALID_SOCKET)
116 #endif
117 )
118 {
119 nxlog_write(MSG_SOCKET_ERROR, EVENTLOG_ERROR_TYPE, "e", WSAGetLastError());
120 exit(1);
121 }
122
123 SetSocketExclusiveAddrUse(hSocket);
124 SetSocketReuseFlag(hSocket);
125 #ifndef _WIN32
126 fcntl(hSocket, F_SETFD, fcntl(hSocket, F_GETFD) | FD_CLOEXEC);
127 #endif
128
129 #ifdef WITH_IPV6
130 SetSocketExclusiveAddrUse(hSocket6);
131 SetSocketReuseFlag(hSocket6);
132 #ifndef _WIN32
133 fcntl(hSocket6, F_SETFD, fcntl(hSocket6, F_GETFD) | FD_CLOEXEC);
134 #endif
135 #endif
136
137 // Fill in local address structure
138 struct sockaddr_in servAddr;
139 memset(&servAddr, 0, sizeof(struct sockaddr_in));
140 servAddr.sin_family = AF_INET;
141
142 #ifdef WITH_IPV6
143 struct sockaddr_in6 servAddr6;
144 memset(&servAddr6, 0, sizeof(struct sockaddr_in6));
145 servAddr6.sin6_family = AF_INET6;
146 #endif
147
148 if (!_tcscmp(g_szListenAddress, _T("*")))
149 {
150 servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
151 #ifdef WITH_IPV6
152 memset(servAddr6.sin6_addr.s6_addr, 0, 16);
153 #endif
154 }
155 else
156 {
157 InetAddress bindAddress = InetAddress::resolveHostName(g_szListenAddress, AF_INET);
158 if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET))
159 {
160 servAddr.sin_addr.s_addr = bindAddress.getAddressV4();
161 }
162 else
163 {
164 servAddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
165 }
166 #ifdef WITH_IPV6
167 bindAddress = InetAddress::resolveHostName(g_szListenAddress, AF_INET6);
168 if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET6))
169 {
170 memcpy(servAddr6.sin6_addr.s6_addr, bindAddress.getAddressV6(), 16);
171 }
172 else
173 {
174 memset(servAddr6.sin6_addr.s6_addr, 0, 15);
175 servAddr6.sin6_addr.s6_addr[15] = 1;
176 }
177 #endif
178 }
179 servAddr.sin_port = htons(g_wListenPort);
180 #ifdef WITH_IPV6
181 servAddr6.sin6_port = htons(g_wListenPort);
182 #endif
183
184 // Bind socket
185 TCHAR buffer[64];
186 int bindFailures = 0;
187 DebugPrintf(INVALID_INDEX, 1, _T("Trying to bind on %s:%d"), SockaddrToStr((struct sockaddr *)&servAddr, buffer), ntohs(servAddr.sin_port));
188 if (bind(hSocket, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
189 {
190 nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "e", WSAGetLastError());
191 bindFailures++;
192 }
193
194 #ifdef WITH_IPV6
195 DebugPrintf(INVALID_INDEX, 1, _T("Trying to bind on [%s]:%d"), SockaddrToStr((struct sockaddr *)&servAddr6, buffer), ntohs(servAddr6.sin6_port));
196 if (bind(hSocket6, (struct sockaddr *)&servAddr6, sizeof(struct sockaddr_in6)) != 0)
197 {
198 nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "e", WSAGetLastError());
199 bindFailures++;
200 }
201 #else
202 bindFailures++;
203 #endif
204
205 // Abort if cannot bind to socket
206 if (bindFailures == 2)
207 {
208 exit(1);
209 }
210
211 // Set up queue
212 listen(hSocket, SOMAXCONN);
213 nxlog_write(MSG_LISTENING, EVENTLOG_INFORMATION_TYPE, "ad", ntohl(servAddr.sin_addr.s_addr), g_wListenPort);
214 #ifdef WITH_IPV6
215 listen(hSocket6, SOMAXCONN);
216 nxlog_write(MSG_LISTENING, EVENTLOG_INFORMATION_TYPE, "Hd", servAddr6.sin6_addr.s6_addr, g_wListenPort);
217 #endif
218
219 // Wait for connection requests
220 int errorCount = 0;
221 while(!(g_dwFlags & AF_SHUTDOWN))
222 {
223 struct timeval tv;
224 tv.tv_sec = 1;
225 tv.tv_usec = 0;
226
227 fd_set rdfs;
228 FD_ZERO(&rdfs);
229 FD_SET(hSocket, &rdfs);
230 #ifdef WITH_IPV6
231 FD_SET(hSocket6, &rdfs);
232 #endif
233
234 #if defined(WITH_IPV6) && !defined(_WIN32)
235 int nRet = select(SELECT_NFDS(max(hSocket, hSocket6) + 1), &rdfs, NULL, NULL, &tv);
236 #else
237 int nRet = select(SELECT_NFDS(hSocket + 1), &rdfs, NULL, NULL, &tv);
238 #endif
239 if ((nRet > 0) && (!(g_dwFlags & AF_SHUTDOWN)))
240 {
241 char clientAddr[128];
242 socklen_t size = 128;
243 #ifdef WITH_IPV6
244 SOCKET hClientSocket = accept(FD_ISSET(hSocket, &rdfs) ? hSocket : hSocket6, (struct sockaddr *)clientAddr, &size);
245 #else
246 SOCKET hClientSocket = accept(hSocket, (struct sockaddr *)clientAddr, &size);
247 #endif
248 if (hClientSocket == INVALID_SOCKET)
249 {
250 int error = WSAGetLastError();
251
252 if (error != WSAEINTR)
253 nxlog_write(MSG_ACCEPT_ERROR, EVENTLOG_ERROR_TYPE, "e", error);
254 errorCount++;
255 g_dwAcceptErrors++;
256 if (errorCount > 1000)
257 {
258 nxlog_write(MSG_TOO_MANY_ERRORS, EVENTLOG_WARNING_TYPE, NULL);
259 errorCount = 0;
260 }
261 ThreadSleepMs(500);
262 continue;
263 }
264
265 // Socket should be closed on successful exec
266 #ifndef _WIN32
267 fcntl(hClientSocket, F_SETFD, fcntl(hClientSocket, F_GETFD) | FD_CLOEXEC);
268 #endif
269
270 errorCount = 0; // Reset consecutive errors counter
271 InetAddress addr = InetAddress::createFromSockaddr((struct sockaddr *)clientAddr);
272 DebugPrintf(INVALID_INDEX, 5, _T("Incoming connection from %s"), addr.toString(buffer));
273
274 bool masterServer, controlServer;
275 if (IsValidServerAddress(addr, &masterServer, &controlServer))
276 {
277 g_dwAcceptedConnections++;
278 DebugPrintf(INVALID_INDEX, 5, _T("Connection from %s accepted"), buffer);
279
280 // Create new session structure and threads
281 CommSession *session = new CommSession(hClientSocket, addr, masterServer, controlServer);
282
283 if (!RegisterSession(session))
284 {
285 delete session;
286 }
287 else
288 {
289 session->run();
290 }
291 }
292 else // Unauthorized connection
293 {
294 g_dwRejectedConnections++;
295 shutdown(hClientSocket, SHUT_RDWR);
296 closesocket(hClientSocket);
297 DebugPrintf(INVALID_INDEX, 5, _T("Connection from %s rejected"), buffer);
298 }
299 }
300 else if (nRet == -1)
301 {
302 int error = WSAGetLastError();
303
304 // On AIX, select() returns ENOENT after SIGINT for unknown reason
305 #ifdef _WIN32
306 if (error != WSAEINTR)
307 #else
308 if ((error != EINTR) && (error != ENOENT))
309 #endif
310 {
311 nxlog_write(MSG_SELECT_ERROR, EVENTLOG_ERROR_TYPE, "e", error);
312 ThreadSleepMs(100);
313 }
314 }
315 }
316
317 // Wait for watchdog thread
318 MutexLock(m_mutexWatchdogActive);
319 MutexUnlock(m_mutexWatchdogActive);
320 MutexDestroy(m_mutexWatchdogActive);
321
322 MutexDestroy(g_hSessionListAccess);
323 free(g_pSessionList);
324 closesocket(hSocket);
325 DebugPrintf(INVALID_INDEX, 1, _T("Listener thread terminated"));
326 return THREAD_OK;
327 }
328
329 /**
330 * Session watchdog thread
331 */
332 THREAD_RESULT THREAD_CALL SessionWatchdog(void *)
333 {
334 m_mutexWatchdogActive = MutexCreate();
335 MutexLock(m_mutexWatchdogActive);
336
337 ThreadSleep(5);
338 while(!(g_dwFlags & AF_SHUTDOWN))
339 {
340 MutexLock(g_hSessionListAccess);
341 time_t now = time(NULL);
342 for(UINT32 i = 0; i < g_dwMaxSessions; i++)
343 if (g_pSessionList[i] != NULL)
344 {
345 if (g_pSessionList[i]->getTimeStamp() < (now - (time_t)g_dwIdleTimeout))
346 {
347 DebugPrintf(i, 5, _T("Session disconnected by watchdog (last activity timestamp is ") TIME_T_FMT _T(")"), g_pSessionList[i]->getTimeStamp());
348 g_pSessionList[i]->disconnect();
349 }
350 }
351 MutexUnlock(g_hSessionListAccess);
352 ThreadSleep(5);
353 }
354
355 // Disconnect all sessions
356 MutexLock(g_hSessionListAccess);
357 for(UINT32 i = 0; i < g_dwMaxSessions; i++)
358 if (g_pSessionList[i] != NULL)
359 g_pSessionList[i]->disconnect();
360 MutexUnlock(g_hSessionListAccess);
361
362 ThreadSleep(1);
363 MutexUnlock(m_mutexWatchdogActive);
364 DebugPrintf(INVALID_INDEX, 1, _T("Session Watchdog thread terminated"));
365
366 return THREAD_OK;
367 }
368
369 /**
370 * Handler for Agent.ActiveConnections parameter
371 */
372 LONG H_ActiveConnections(const TCHAR *pszCmd, const TCHAR *pArg, TCHAR *pValue)
373 {
374 int nCounter;
375 UINT32 i;
376
377 MutexLock(g_hSessionListAccess);
378 for(i = 0, nCounter = 0; i < g_dwMaxSessions; i++)
379 if (g_pSessionList[i] != NULL)
380 nCounter++;
381 MutexUnlock(g_hSessionListAccess);
382 ret_int(pValue, nCounter);
383 return SYSINFO_RC_SUCCESS;
384 }