fixed broken cached data collection for DCIs with default interval
[public/netxms.git] / src / libnxcc / comm.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2015 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU Lesser General Public License as published by
7 ** the Free Software Foundation; either version 3 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU Lesser General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: comm.cpp
20 **
21 **/
22
23 #include "libnxcc.h"
24
25 /**
26 * Keepalive interval
27 */
28 #define KEEPALIVE_INTERVAL 200
29
30 /**
31 * Thread handles
32 */
33 static THREAD s_listenerThread = INVALID_THREAD_HANDLE;
34 static THREAD s_connectorThread = INVALID_THREAD_HANDLE;
35 static THREAD s_keepaliveThread = INVALID_THREAD_HANDLE;
36
37 /**
38 * Join condition
39 */
40 static CONDITION s_joinCondition = ConditionCreate(TRUE);
41
42 /**
43 * Change cluster node state
44 */
45 static void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state);
46
47 /**
48 * Node receiver thread
49 */
50 static THREAD_RESULT THREAD_CALL ClusterReceiverThread(void *arg)
51 {
52 ClusterNodeInfo *node = (ClusterNodeInfo *)arg;
53 SOCKET s = node->m_socket;
54 ClusterDebug(5, _T("Receiver thread started for cluster node %d [%s] on socket %d"), node->m_id, (const TCHAR *)node->m_addr->toString(), (int)s);
55
56 SocketMessageReceiver receiver(s, 8192, 4194304);
57
58 while(!g_nxccShutdown)
59 {
60 MutexLock(node->m_mutex);
61 SOCKET cs = node->m_socket;
62 MutexUnlock(node->m_mutex);
63
64 if (cs != s)
65 break; // socket was changed
66
67 MessageReceiverResult result;
68 NXCPMessage *msg = receiver.readMessage(KEEPALIVE_INTERVAL * 3, &result);
69 if (msg != NULL)
70 {
71 g_nxccEventHandler->onMessage(msg, node->m_id);
72 delete msg;
73 }
74 else
75 {
76 ClusterDebug(5, _T("Receiver error for cluster node %d [%s] on socket %d: %s"),
77 node->m_id, (const TCHAR *)node->m_addr->toString(), (int)s, AbstractMessageReceiver::resultToText(result));
78 MutexLock(node->m_mutex);
79 if (node->m_socket == s)
80 {
81 shutdown(s, SHUT_RDWR);
82 node->m_socket = INVALID_SOCKET;
83 ChangeClusterNodeState(node, CLUSTER_NODE_DOWN);
84 }
85 MutexUnlock(node->m_mutex);
86 }
87 }
88
89 closesocket(s);
90 ClusterDebug(5, _T("Receiver thread stopped for cluster node %d [%s] on socket %d"), node->m_id, (const TCHAR *)node->m_addr->toString(), (int)s);
91 return THREAD_OK;
92 }
93
94 /**
95 * Find cluster node by IP
96 */
97 static int FindClusterNode(const InetAddress& addr)
98 {
99 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
100 if ((g_nxccNodes[i].m_id != 0) && g_nxccNodes[i].m_addr->equals(addr))
101 return i;
102 return -1;
103 }
104
105 /**
106 * Change cluster node state
107 */
108 static void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state)
109 {
110 static const TCHAR *stateNames[] = { _T("DOWN"), _T("CONNECTED"), _T("UP") };
111
112 if (node->m_state == state)
113 return;
114
115 node->m_state = state;
116 ClusterDebug(1, _T("Cluster node %d [%s] changed state to %s"), node->m_id, (const TCHAR *)node->m_addr->toString(), stateNames[state]);
117 switch(state)
118 {
119 case CLUSTER_NODE_CONNECTED:
120 node->m_receiverThread = ThreadCreateEx(ClusterReceiverThread, 0, node);
121 break;
122 case CLUSTER_NODE_DOWN:
123 ThreadJoin(node->m_receiverThread);
124 node->m_receiverThread = INVALID_THREAD_HANDLE;
125 g_nxccEventHandler->onNodeDisconnect(node->m_id);
126 break;
127 case CLUSTER_NODE_UP:
128 g_nxccEventHandler->onNodeJoin(node->m_id);
129 break;
130 }
131 }
132
133 /**
134 * Listener thread
135 */
136 static THREAD_RESULT THREAD_CALL ClusterListenerThread(void *arg)
137 {
138 SOCKET s = CAST_FROM_POINTER(arg, SOCKET);
139 while(!g_nxccShutdown)
140 {
141 struct timeval tv;
142 tv.tv_sec = 1;
143 tv.tv_usec = 0;
144
145 fd_set rdfs;
146 FD_ZERO(&rdfs);
147 FD_SET(s, &rdfs);
148 int rc = select(SELECT_NFDS(s + 1), &rdfs, NULL, NULL, &tv);
149 if ((rc > 0) && !g_nxccShutdown)
150 {
151 char clientAddr[128];
152 socklen_t size = 128;
153 SOCKET in = accept(s, (struct sockaddr *)clientAddr, &size);
154 if (in == INVALID_SOCKET)
155 {
156 ClusterDebug(5, _T("ClusterListenerThread: accept() failure"));
157 continue;
158 }
159
160 #ifndef _WIN32
161 fcntl(in, F_SETFD, fcntl(in, F_GETFD) | FD_CLOEXEC);
162 #endif
163
164 InetAddress addr = InetAddress::createFromSockaddr((struct sockaddr *)clientAddr);
165 ClusterDebug(5, _T("Incoming connection from %s"), (const TCHAR *)addr.toString());
166
167 int idx = FindClusterNode(addr);
168 if (idx == -1)
169 {
170 ClusterDebug(5, _T("ClusterListenerThread: incoming connection rejected (unknown IP address)"));
171 closesocket(in);
172 continue;
173 }
174
175 MutexLock(g_nxccNodes[idx].m_mutex);
176 if (g_nxccNodes[idx].m_socket == INVALID_SOCKET)
177 {
178 g_nxccNodes[idx].m_socket = in;
179 ClusterDebug(5, _T("Cluster peer node %d [%s] connected"),
180 g_nxccNodes[idx].m_id, (const TCHAR *)g_nxccNodes[idx].m_addr->toString());
181 ChangeClusterNodeState(&g_nxccNodes[idx], CLUSTER_NODE_CONNECTED);
182 }
183 else
184 {
185 ClusterDebug(5, _T("Cluster connection from peer %d [%s] discarded because connection already present"),
186 g_nxccNodes[idx].m_id, (const TCHAR *)g_nxccNodes[idx].m_addr->toString());
187 closesocket(s);
188 }
189 MutexUnlock(g_nxccNodes[idx].m_mutex);
190 }
191 }
192
193 closesocket(s);
194 ClusterDebug(1, _T("Cluster listener thread stopped"));
195 return THREAD_OK;
196 }
197
198 /**
199 * Connector thread
200 */
201 static THREAD_RESULT THREAD_CALL ClusterConnectorThread(void *arg)
202 {
203 ClusterDebug(1, _T("Cluster connector thread started"));
204
205 while(!g_nxccShutdown)
206 {
207 ThreadSleepMs(500);
208 if (g_nxccShutdown)
209 break;
210
211 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
212 {
213 MutexLock(g_nxccNodes[i].m_mutex);
214 if ((g_nxccNodes[i].m_id != 0) && (g_nxccNodes[i].m_socket == INVALID_SOCKET))
215 {
216 MutexUnlock(g_nxccNodes[i].m_mutex);
217 SOCKET s = ConnectToHost(*g_nxccNodes[i].m_addr, g_nxccNodes[i].m_port, 500);
218 MutexLock(g_nxccNodes[i].m_mutex);
219 if (s != INVALID_SOCKET)
220 {
221 if (g_nxccNodes[i].m_socket == INVALID_SOCKET)
222 {
223 g_nxccNodes[i].m_socket = s;
224 ClusterDebug(5, _T("Cluster peer node %d [%s] connected"),
225 g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
226 ChangeClusterNodeState(&g_nxccNodes[i], CLUSTER_NODE_CONNECTED);
227 }
228 else
229 {
230 ClusterDebug(5, _T("Cluster connection established with peer %d [%s] but discarded because connection already present"),
231 g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
232 closesocket(s);
233 }
234 }
235 }
236 MutexUnlock(g_nxccNodes[i].m_mutex);
237 }
238 }
239
240 ClusterDebug(1, _T("Cluster connector thread stopped"));
241 return THREAD_OK;
242 }
243
244 /**
245 * Cluster keepalive thread
246 */
247 static THREAD_RESULT THREAD_CALL ClusterKeepaliveThread(void *arg)
248 {
249 ClusterDebug(1, _T("Cluster keepalive thread started"));
250
251 NXCPMessage msg;
252 msg.setCode(CMD_KEEPALIVE);
253 msg.setField(VID_NODE_ID, g_nxccNodeId);
254 NXCP_MESSAGE *rawMsg = msg.createMessage();
255
256 while(!g_nxccShutdown)
257 {
258 ThreadSleepMs(KEEPALIVE_INTERVAL);
259 if (g_nxccShutdown)
260 break;
261
262 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
263 {
264 if (g_nxccNodes[i].m_id == 0)
265 continue; // empty slot
266
267 MutexLock(g_nxccNodes[i].m_mutex);
268 if (g_nxccNodes[i].m_socket != INVALID_SOCKET)
269 {
270 if (SendEx(g_nxccNodes[i].m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) <= 0)
271 {
272 ClusterDebug(5, _T("ClusterKeepaliveThread: send failed for peer %d [%s]"),
273 g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
274 shutdown(g_nxccNodes[i].m_socket, SHUT_RDWR);
275 g_nxccNodes[i].m_socket = INVALID_SOCKET; // current socket will be closed by receiver
276 ChangeClusterNodeState(&g_nxccNodes[i], CLUSTER_NODE_DOWN);
277 }
278 }
279 MutexUnlock(g_nxccNodes[i].m_mutex);
280 }
281 }
282
283 ClusterDebug(1, _T("Cluster keepalive thread stopped"));
284 return THREAD_OK;
285 }
286
287 /**
288 * Join cluster
289 *
290 * @return true on successful join
291 */
292 bool LIBNXCC_EXPORTABLE ClusterJoin()
293 {
294 if (!g_nxccInitialized)
295 return false;
296
297 SOCKET s = socket(AF_INET, SOCK_STREAM, 0);
298 if (s == INVALID_SOCKET)
299 {
300 ClusterDebug(1, _T("ClusterJoin: cannot create socket"));
301 return false;
302 }
303
304 SetSocketExclusiveAddrUse(s);
305 SetSocketReuseFlag(s);
306
307 struct sockaddr_in servAddr;
308 memset(&servAddr, 0, sizeof(struct sockaddr_in));
309 servAddr.sin_family = AF_INET;
310 servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
311 servAddr.sin_port = htons((UINT16)(47000 + g_nxccNodeId));
312 if (bind(s, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
313 {
314 ClusterDebug(1, _T("ClusterJoin: cannot bind listening socket (%s)"), _tcserror(WSAGetLastError()));
315 closesocket(s);
316 return false;
317 }
318
319 if (listen(s, SOMAXCONN) == 0)
320 {
321 ClusterDebug(1, _T("ClusterJoin: listening on port %d"), (int)g_nxccListenPort);
322 }
323 else
324 {
325 ClusterDebug(1, _T("ClusterJoin: listen() failed (%s)"), _tcserror(WSAGetLastError()));
326 closesocket(s);
327 return false;
328 }
329
330 s_listenerThread = ThreadCreateEx(ClusterListenerThread, 0, CAST_TO_POINTER(s, void *));
331 s_connectorThread = ThreadCreateEx(ClusterConnectorThread, 0, NULL);
332 s_keepaliveThread = ThreadCreateEx(ClusterKeepaliveThread, 0, NULL);
333
334 if (ConditionWait(s_joinCondition, 60000)) // wait 1 minute for other nodes to join
335 {
336
337 }
338 else
339 {
340 // no other nodes, declare self as master
341 g_nxccMasterNode = true;
342 ClusterDebug(1, _T("ClusterJoin: cannot contact other nodes, declaring self as master"));
343 }
344
345 return true;
346 }
347
348 /**
349 * Disconnect all sockets
350 */
351 void ClusterDisconnect()
352 {
353 ThreadJoin(s_listenerThread);
354 ThreadJoin(s_connectorThread);
355 ThreadJoin(s_keepaliveThread);
356 }
357
358 /**
359 * Send notification to all connected nodes
360 */
361 void LIBNXCC_EXPORTABLE ClusterNotify(NXCPMessage *msg)
362 {
363 NXCP_MESSAGE *rawMsg = msg->createMessage();
364
365 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
366 {
367 if (g_nxccNodes[i].m_id == 0)
368 continue; // empty slot
369
370 MutexLock(g_nxccNodes[i].m_mutex);
371 if (g_nxccNodes[i].m_socket != INVALID_SOCKET)
372 {
373 if (SendEx(g_nxccNodes[i].m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) <= 0)
374 {
375 ClusterDebug(5, _T("ClusterKeepaliveThread: send failed for peer %d [%s]"),
376 g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
377 shutdown(g_nxccNodes[i].m_socket, SHUT_RDWR);
378 g_nxccNodes[i].m_socket = INVALID_SOCKET; // current socket will be closed by receiver
379 ChangeClusterNodeState(&g_nxccNodes[i], CLUSTER_NODE_DOWN);
380 }
381 }
382 MutexUnlock(g_nxccNodes[i].m_mutex);
383 }
384
385 free(rawMsg);
386 }