2e6b954c69c275fa443b12de51dce738c184438e
[public/netxms.git] / src / libnxcc / comm.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2015 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU Lesser General Public License as published by
7 ** the Free Software Foundation; either version 3 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU Lesser General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: comm.cpp
20 **
21 **/
22
23 #include "libnxcc.h"
24
25 /**
26 * Externals
27 */
28 void ClusterNodeJoin(void *arg);
29 void ProcessClusterJoinRequest(ClusterNodeInfo *node, NXCPMessage *msg);
30
31 /**
32 * Keepalive interval
33 */
34 #define KEEPALIVE_INTERVAL 200
35
36 /**
37 * Thread handles
38 */
39 static THREAD s_listenerThread = INVALID_THREAD_HANDLE;
40 static THREAD s_connectorThread = INVALID_THREAD_HANDLE;
41 static THREAD s_keepaliveThread = INVALID_THREAD_HANDLE;
42
43 /**
44 * Command ID
45 */
46 static VolatileCounter s_commandId = 1;
47
48 /**
49 * Join condition
50 */
51 static CONDITION s_joinCondition = ConditionCreate(TRUE);
52
53 /**
54 * Mark as joined
55 */
56 void SetJoinCondition()
57 {
58 ConditionSet(s_joinCondition);
59 }
60
61 /**
62 * Process cluster notification
63 */
64 static void ProcessClusterNotification(ClusterNodeInfo *node, ClusterNotificationCode code)
65 {
66 ClusterDebug(4, _T("ProcessClusterNotification: code %d from node %d [%s]"), code, node->m_id, (const TCHAR *)node->m_addr->toString());
67 switch(code)
68 {
69 case CN_NEW_MASTER:
70 ClusterDebug(3, _T("Node %d [%s] is new master"), node->m_id, (const TCHAR *)node->m_addr->toString());
71 node->m_master = true;
72 ChangeClusterNodeState(node, CLUSTER_NODE_UP);
73 ConditionSet(s_joinCondition);
74 break;
75 case CN_NODE_RUNNING:
76 ChangeClusterNodeState(node, CLUSTER_NODE_UP);
77 break;
78 }
79 }
80
81 /**
82 * Receiver thread stop data
83 */
84 struct ReceiverThreadStopData
85 {
86 ClusterNodeInfo *node;
87 SOCKET s;
88 };
89
90 /**
91 * Shutdown callback for receiver thread
92 */
93 static void ReceiverThreadShutdownCB(void *arg)
94 {
95 ReceiverThreadStopData *data = (ReceiverThreadStopData *)arg;
96
97 MutexLock(data->node->m_mutex);
98 if (data->node->m_socket == data->s)
99 {
100 shutdown(data->s, SHUT_RDWR);
101 data->node->m_socket = INVALID_SOCKET;
102 ChangeClusterNodeState(data->node, CLUSTER_NODE_DOWN);
103 }
104 MutexUnlock(data->node->m_mutex);
105 free(data);
106 ClusterDebug(6, _T("Cluster receiver thread shutdown callback completed"));
107 }
108
109 /**
110 * Node receiver thread
111 */
112 static THREAD_RESULT THREAD_CALL ClusterReceiverThread(void *arg)
113 {
114 ClusterNodeInfo *node = (ClusterNodeInfo *)arg;
115 SOCKET s = node->m_socket;
116 ClusterDebug(5, _T("Receiver thread started for cluster node %d [%s] on socket %d"), node->m_id, (const TCHAR *)node->m_addr->toString(), (int)s);
117
118 SocketMessageReceiver receiver(s, 8192, 4194304);
119
120 while(!g_nxccShutdown)
121 {
122 MutexLock(node->m_mutex);
123 SOCKET cs = node->m_socket;
124 MutexUnlock(node->m_mutex);
125
126 if (cs != s)
127 break; // socket was changed
128
129 MessageReceiverResult result;
130 NXCPMessage *msg = receiver.readMessage(KEEPALIVE_INTERVAL * 3, &result);
131 if (msg != NULL)
132 {
133 if (msg->getCode() != CMD_KEEPALIVE)
134 {
135 TCHAR buffer[128];
136 ClusterDebug(7, _T("ClusterReceiverThread: message %s from node %d [%s]"),
137 NXCPMessageCodeName(msg->getCode(), buffer), node->m_id, (const TCHAR *)node->m_addr->toString());
138 }
139
140 switch(msg->getCode())
141 {
142 case CMD_CLUSTER_NOTIFY:
143 ProcessClusterNotification(node, (ClusterNotificationCode)msg->getFieldAsInt16(VID_NOTIFICATION_CODE));
144 break;
145 case CMD_JOIN_CLUSTER:
146 ProcessClusterJoinRequest(node, msg);
147 delete msg;
148 break;
149 case CMD_KEEPALIVE:
150 delete msg;
151 break;
152 default:
153 if (g_nxccEventHandler->onMessage(msg, node->m_id))
154 delete msg;
155 else
156 node->m_msgWaitQueue->put(msg);
157 break;
158 }
159 }
160 else
161 {
162 ClusterDebug(5, _T("Receiver error for cluster node %d [%s] on socket %d: %s"),
163 node->m_id, (const TCHAR *)node->m_addr->toString(), (int)s, AbstractMessageReceiver::resultToText(result));
164 ReceiverThreadStopData *data = (ReceiverThreadStopData *)malloc(sizeof(ReceiverThreadStopData));
165 data->node = node;
166 data->s = s;
167 ThreadPoolExecute(g_nxccThreadPool, ReceiverThreadShutdownCB, data);
168 break;
169 }
170 }
171
172 closesocket(s);
173 ClusterDebug(5, _T("Receiver thread stopped for cluster node %d [%s] on socket %d"), node->m_id, (const TCHAR *)node->m_addr->toString(), (int)s);
174 return THREAD_OK;
175 }
176
177 /**
178 * Find cluster node by IP
179 */
180 static int FindClusterNode(const InetAddress& addr)
181 {
182 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
183 if ((g_nxccNodes[i].m_id != 0) && g_nxccNodes[i].m_addr->equals(addr))
184 return i;
185 return -1;
186 }
187
188 /**
189 * Find cluster node by ID
190 */
191 static int FindClusterNode(UINT32 id)
192 {
193 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
194 if (g_nxccNodes[i].m_id == id)
195 return i;
196 return -1;
197 }
198
199 /**
200 * Change cluster node state
201 */
202 void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state)
203 {
204 static const TCHAR *stateNames[] = { _T("DOWN"), _T("CONNECTED"), _T("SYNC"), _T("UP") };
205
206 if (node->m_state == state)
207 return;
208
209 node->m_state = state;
210 ClusterDebug(1, _T("Cluster node %d [%s] changed state to %s"), node->m_id, (const TCHAR *)node->m_addr->toString(), stateNames[state]);
211 switch(state)
212 {
213 case CLUSTER_NODE_CONNECTED:
214 node->m_receiverThread = ThreadCreateEx(ClusterReceiverThread, 0, node);
215 if (node->m_id < g_nxccNodeId)
216 ThreadPoolExecute(g_nxccThreadPool, ClusterNodeJoin, node);
217 break;
218 case CLUSTER_NODE_DOWN:
219 ThreadJoin(node->m_receiverThread);
220 node->m_receiverThread = INVALID_THREAD_HANDLE;
221 g_nxccEventHandler->onNodeDisconnect(node->m_id);
222 if (node->m_master)
223 {
224 node->m_master = false;
225 PromoteClusterNode();
226 }
227 break;
228 case CLUSTER_NODE_SYNC:
229 g_nxccEventHandler->onNodeJoin(node->m_id);
230 break;
231 case CLUSTER_NODE_UP:
232 g_nxccEventHandler->onNodeUp(node->m_id);
233 break;
234 }
235 }
236
237 /**
238 * Listener thread
239 */
240 static THREAD_RESULT THREAD_CALL ClusterListenerThread(void *arg)
241 {
242 SOCKET s = CAST_FROM_POINTER(arg, SOCKET);
243 while(!g_nxccShutdown)
244 {
245 struct timeval tv;
246 tv.tv_sec = 1;
247 tv.tv_usec = 0;
248
249 fd_set rdfs;
250 FD_ZERO(&rdfs);
251 FD_SET(s, &rdfs);
252 int rc = select(SELECT_NFDS(s + 1), &rdfs, NULL, NULL, &tv);
253 if ((rc > 0) && !g_nxccShutdown)
254 {
255 char clientAddr[128];
256 socklen_t size = 128;
257 SOCKET in = accept(s, (struct sockaddr *)clientAddr, &size);
258 if (in == INVALID_SOCKET)
259 {
260 ClusterDebug(5, _T("ClusterListenerThread: accept() failure"));
261 continue;
262 }
263
264 #ifndef _WIN32
265 fcntl(in, F_SETFD, fcntl(in, F_GETFD) | FD_CLOEXEC);
266 #endif
267
268 InetAddress addr = InetAddress::createFromSockaddr((struct sockaddr *)clientAddr);
269 ClusterDebug(5, _T("ClusterListenerThread: incoming connection from %s"), (const TCHAR *)addr.toString());
270
271 int idx = FindClusterNode(addr);
272 if (idx == -1)
273 {
274 ClusterDebug(5, _T("ClusterListenerThread: incoming connection rejected (unknown IP address)"));
275 closesocket(in);
276 continue;
277 }
278
279 MutexLock(g_nxccNodes[idx].m_mutex);
280 if (g_nxccNodes[idx].m_socket == INVALID_SOCKET)
281 {
282 g_nxccNodes[idx].m_socket = in;
283 ClusterDebug(5, _T("Cluster peer node %d [%s] connected"),
284 g_nxccNodes[idx].m_id, (const TCHAR *)g_nxccNodes[idx].m_addr->toString());
285 ChangeClusterNodeState(&g_nxccNodes[idx], CLUSTER_NODE_CONNECTED);
286 }
287 else
288 {
289 ClusterDebug(5, _T("Cluster connection from peer %d [%s] discarded because connection already present"),
290 g_nxccNodes[idx].m_id, (const TCHAR *)g_nxccNodes[idx].m_addr->toString());
291 closesocket(in);
292 }
293 MutexUnlock(g_nxccNodes[idx].m_mutex);
294 }
295 }
296
297 closesocket(s);
298 ClusterDebug(1, _T("Cluster listener thread stopped"));
299 return THREAD_OK;
300 }
301
302 /**
303 * Connector thread
304 */
305 static THREAD_RESULT THREAD_CALL ClusterConnectorThread(void *arg)
306 {
307 ClusterDebug(1, _T("Cluster connector thread started"));
308
309 while(!g_nxccShutdown)
310 {
311 ThreadSleepMs(500);
312 if (g_nxccShutdown)
313 break;
314
315 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
316 {
317 MutexLock(g_nxccNodes[i].m_mutex);
318 if ((g_nxccNodes[i].m_id != 0) && (g_nxccNodes[i].m_socket == INVALID_SOCKET))
319 {
320 MutexUnlock(g_nxccNodes[i].m_mutex);
321 SOCKET s = ConnectToHost(*g_nxccNodes[i].m_addr, g_nxccNodes[i].m_port, 500);
322 MutexLock(g_nxccNodes[i].m_mutex);
323 if (s != INVALID_SOCKET)
324 {
325 if (g_nxccNodes[i].m_socket == INVALID_SOCKET)
326 {
327 g_nxccNodes[i].m_socket = s;
328 ClusterDebug(5, _T("Cluster peer node %d [%s] connected"),
329 g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
330 ChangeClusterNodeState(&g_nxccNodes[i], CLUSTER_NODE_CONNECTED);
331 }
332 else
333 {
334 ClusterDebug(5, _T("Cluster connection established with peer %d [%s] but discarded because connection already present"),
335 g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
336 closesocket(s);
337 }
338 }
339 }
340 MutexUnlock(g_nxccNodes[i].m_mutex);
341 }
342 }
343
344 ClusterDebug(1, _T("Cluster connector thread stopped"));
345 return THREAD_OK;
346 }
347
348 /**
349 * Cluster keepalive thread
350 */
351 static THREAD_RESULT THREAD_CALL ClusterKeepaliveThread(void *arg)
352 {
353 ClusterDebug(1, _T("Cluster keepalive thread started"));
354
355 NXCPMessage msg;
356 msg.setCode(CMD_KEEPALIVE);
357 msg.setField(VID_NODE_ID, g_nxccNodeId);
358 NXCP_MESSAGE *rawMsg = msg.createMessage();
359
360 while(!g_nxccShutdown)
361 {
362 ThreadSleepMs(KEEPALIVE_INTERVAL);
363 if (g_nxccShutdown)
364 break;
365
366 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
367 {
368 if (g_nxccNodes[i].m_id == 0)
369 continue; // empty slot
370
371 MutexLock(g_nxccNodes[i].m_mutex);
372 if (g_nxccNodes[i].m_socket != INVALID_SOCKET)
373 {
374 if (SendEx(g_nxccNodes[i].m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) <= 0)
375 {
376 ClusterDebug(5, _T("ClusterKeepaliveThread: send failed for peer %d [%s]"),
377 g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
378 shutdown(g_nxccNodes[i].m_socket, SHUT_RDWR);
379 g_nxccNodes[i].m_socket = INVALID_SOCKET; // current socket will be closed by receiver
380 ChangeClusterNodeState(&g_nxccNodes[i], CLUSTER_NODE_DOWN);
381 }
382 }
383 MutexUnlock(g_nxccNodes[i].m_mutex);
384 }
385 }
386
387 ClusterDebug(1, _T("Cluster keepalive thread stopped"));
388 return THREAD_OK;
389 }
390
391 /**
392 * Send message to cluster node
393 */
394 void ClusterSendMessage(ClusterNodeInfo *node, NXCPMessage *msg)
395 {
396 NXCP_MESSAGE *rawMsg = msg->createMessage();
397 MutexLock(node->m_mutex);
398 if (node->m_socket != INVALID_SOCKET)
399 {
400 if (SendEx(node->m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) <= 0)
401 {
402 ClusterDebug(5, _T("ClusterSendResponse: send failed for peer %d [%s]"), node->m_id, (const TCHAR *)node->m_addr->toString());
403 shutdown(node->m_socket, SHUT_RDWR);
404 node->m_socket = INVALID_SOCKET; // current socket will be closed by receiver
405 ChangeClusterNodeState(node, CLUSTER_NODE_DOWN);
406 }
407 }
408 MutexUnlock(node->m_mutex);
409 free(rawMsg);
410 }
411
412 /**
413 * Send notification to all connected nodes
414 */
415 void LIBNXCC_EXPORTABLE ClusterNotify(INT16 code)
416 {
417 NXCPMessage msg;
418 msg.setCode(CMD_CLUSTER_NOTIFY);
419 msg.setId((UINT32)InterlockedIncrement(&s_commandId));
420 msg.setField(VID_NOTIFICATION_CODE, code);
421 msg.setField(VID_NODE_ID, g_nxccNodeId);
422 msg.setField(VID_IS_MASTER, (INT16)(g_nxccMasterNode ? 1 : 0));
423 ClusterNotify(&msg);
424 }
425
426 /**
427 * Send notification to all connected nodes
428 */
429 void LIBNXCC_EXPORTABLE ClusterNotify(NXCPMessage *msg)
430 {
431 NXCP_MESSAGE *rawMsg = msg->createMessage();
432
433 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
434 {
435 if (g_nxccNodes[i].m_id == 0)
436 continue; // empty slot
437
438 MutexLock(g_nxccNodes[i].m_mutex);
439 if (g_nxccNodes[i].m_socket != INVALID_SOCKET)
440 {
441 if (SendEx(g_nxccNodes[i].m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) <= 0)
442 {
443 ClusterDebug(5, _T("ClusterNotify: send failed for peer %d [%s]"),
444 g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
445 shutdown(g_nxccNodes[i].m_socket, SHUT_RDWR);
446 g_nxccNodes[i].m_socket = INVALID_SOCKET; // current socket will be closed by receiver
447 ChangeClusterNodeState(&g_nxccNodes[i], CLUSTER_NODE_DOWN);
448 }
449 }
450 MutexUnlock(g_nxccNodes[i].m_mutex);
451 }
452
453 free(rawMsg);
454 }
455
456 /**
457 * Direct notify with just notification code
458 */
459 void LIBNXCC_EXPORTABLE ClusterDirectNotify(UINT32 nodeId, INT16 code)
460 {
461 int index = FindClusterNode(nodeId);
462 if (index != -1)
463 ClusterDirectNotify(&g_nxccNodes[index], code);
464 }
465
466 /**
467 * Direct notify with just notification code
468 */
469 void ClusterDirectNotify(ClusterNodeInfo *node, INT16 code)
470 {
471 NXCPMessage msg;
472 msg.setCode(CMD_CLUSTER_NOTIFY);
473 msg.setId((UINT32)InterlockedIncrement(&s_commandId));
474 msg.setField(VID_NOTIFICATION_CODE, code);
475 msg.setField(VID_NODE_ID, g_nxccNodeId);
476 msg.setField(VID_IS_MASTER, (INT16)(g_nxccMasterNode ? 1 : 0));
477 ClusterSendMessage(node, &msg);
478 }
479
480 /**
481 * Send command to all connected nodes and wait for response
482 *
483 * @return number of execution errors
484 */
485 int LIBNXCC_EXPORTABLE ClusterSendCommand(NXCPMessage *msg)
486 {
487 UINT32 requestId = (UINT32)InterlockedIncrement(&s_commandId);
488 msg->setId(requestId);
489 NXCP_MESSAGE *rawMsg = msg->createMessage();
490
491 bool waitFlags[CLUSTER_MAX_NODE_ID];
492 memset(waitFlags, 0, sizeof(waitFlags));
493
494 int errors = 0;
495 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
496 {
497 if (g_nxccNodes[i].m_id == 0)
498 continue; // empty slot
499
500 MutexLock(g_nxccNodes[i].m_mutex);
501 if (g_nxccNodes[i].m_socket != INVALID_SOCKET)
502 {
503 if (SendEx(g_nxccNodes[i].m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) > 0)
504 {
505 waitFlags[i] = true;
506 }
507 else
508 {
509 ClusterDebug(5, _T("ClusterCommand: send failed for peer %d [%s]"),
510 g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
511 shutdown(g_nxccNodes[i].m_socket, SHUT_RDWR);
512 g_nxccNodes[i].m_socket = INVALID_SOCKET; // current socket will be closed by receiver
513 ChangeClusterNodeState(&g_nxccNodes[i], CLUSTER_NODE_DOWN);
514 errors++;
515 }
516 }
517 MutexUnlock(g_nxccNodes[i].m_mutex);
518 }
519
520 free(rawMsg);
521
522 // Collect responses
523 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
524 {
525 if (!waitFlags[i])
526 continue;
527 NXCPMessage *response = g_nxccNodes[i].m_msgWaitQueue->waitForMessage(CMD_REQUEST_COMPLETED, requestId, g_nxccCommandTimeout);
528 if (response != NULL)
529 {
530 UINT32 rcc = response->getFieldAsInt32(VID_RCC);
531 if (rcc != NXCC_RCC_SUCCESS)
532 {
533 ClusterDebug(5, _T("ClusterCommand: failed request to peer %d [%s] RCC=%d"),
534 g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString(), rcc);
535 errors++;
536 }
537 delete response;
538 }
539 else
540 {
541 ClusterDebug(5, _T("ClusterCommand: timed out request to peer %d [%s]"),
542 g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
543 errors++;
544 }
545 }
546
547 return errors;
548 }
549
550 /**
551 * Send command to specific node and wait for response
552 *
553 * @return request completion code
554 */
555 UINT32 LIBNXCC_EXPORTABLE ClusterSendDirectCommand(UINT32 nodeId, NXCPMessage *msg)
556 {
557 NXCPMessage *response = ClusterSendDirectCommandEx(nodeId, msg);
558 if (response == NULL)
559 return NXCC_RCC_TIMEOUT;
560
561 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
562 if (rcc != 0)
563 {
564 ClusterDebug(5, _T("ClusterDirectCommand: failed request to peer %d: rcc=%d"), nodeId, rcc);
565 }
566 delete response;
567 return rcc;
568 }
569
570 /**
571 * Send command to specific node and wait for response
572 *
573 * @return request completion code
574 */
575 NXCPMessage LIBNXCC_EXPORTABLE *ClusterSendDirectCommandEx(UINT32 nodeId, NXCPMessage *msg)
576 {
577 int index = FindClusterNode(nodeId);
578 if (index == -1)
579 {
580 NXCPMessage *response = new NXCPMessage();
581 response->setField(VID_RCC, NXCC_RCC_INVALID_NODE);
582 return response;
583 }
584
585 ClusterNodeInfo *node = &g_nxccNodes[index];
586
587 UINT32 requestId = (UINT32)InterlockedIncrement(&s_commandId);
588 msg->setId(requestId);
589 NXCP_MESSAGE *rawMsg = msg->createMessage();
590
591 bool sent = false;
592 MutexLock(node->m_mutex);
593 if (node->m_socket != INVALID_SOCKET)
594 {
595 if (SendEx(node->m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) > 0)
596 {
597 sent = true;
598 }
599 else
600 {
601 ClusterDebug(5, _T("ClusterDirectCommand: send failed for peer %d [%s]"), nodeId, (const TCHAR *)node->m_addr->toString());
602 shutdown(node->m_socket, SHUT_RDWR);
603 node->m_socket = INVALID_SOCKET; // current socket will be closed by receiver
604 ChangeClusterNodeState(node, CLUSTER_NODE_DOWN);
605 }
606 }
607 MutexUnlock(node->m_mutex);
608
609 free(rawMsg);
610
611 // Wait for responses
612 NXCPMessage *response;
613 if (sent)
614 {
615 response = node->m_msgWaitQueue->waitForMessage(CMD_REQUEST_COMPLETED, requestId, g_nxccCommandTimeout);
616 }
617 else
618 {
619 response = new NXCPMessage();
620 response->setField(VID_RCC, NXCC_RCC_COMM_FAILURE);
621 }
622
623 return response;
624 }
625
626 /**
627 * Send response to cluster peer
628 */
629 void LIBNXCC_EXPORTABLE ClusterSendResponse(UINT32 nodeId, UINT32 requestId, UINT32 rcc)
630 {
631 int index = FindClusterNode(nodeId);
632 if (index == -1)
633 return;
634
635 ClusterNodeInfo *node = &g_nxccNodes[index];
636
637 NXCPMessage msg;
638 msg.setCode(CMD_REQUEST_COMPLETED);
639 msg.setId(requestId);
640
641 ClusterSendMessage(node, &msg);
642 }
643
644 /**
645 * Send response to cluster peer
646 */
647 void LIBNXCC_EXPORTABLE ClusterSendResponseEx(UINT32 nodeId, UINT32 requestId, NXCPMessage *msg)
648 {
649 int index = FindClusterNode(nodeId);
650 if (index == -1)
651 return;
652
653 ClusterNodeInfo *node = &g_nxccNodes[index];
654
655 msg->setCode(CMD_REQUEST_COMPLETED);
656 msg->setId(requestId);
657
658 ClusterSendMessage(node, msg);
659 }
660
661 /**
662 * Check if all cluster nodes connected
663 */
664 bool LIBNXCC_EXPORTABLE ClusterAllNodesConnected()
665 {
666 int total = 0, connected = 0;
667 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
668 if (g_nxccNodes[i].m_id > 0)
669 {
670 total++;
671 if (g_nxccNodes[i].m_state >= CLUSTER_NODE_CONNECTED)
672 connected++;
673 }
674 return total == connected;
675 }
676
677 /**
678 * Join cluster
679 *
680 * @return true on successful join
681 */
682 bool LIBNXCC_EXPORTABLE ClusterJoin()
683 {
684 if (!g_nxccInitialized)
685 return false;
686
687 SOCKET s = socket(AF_INET, SOCK_STREAM, 0);
688 if (s == INVALID_SOCKET)
689 {
690 ClusterDebug(1, _T("ClusterJoin: cannot create socket"));
691 return false;
692 }
693
694 SetSocketExclusiveAddrUse(s);
695 SetSocketReuseFlag(s);
696
697 struct sockaddr_in servAddr;
698 memset(&servAddr, 0, sizeof(struct sockaddr_in));
699 servAddr.sin_family = AF_INET;
700 servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
701 servAddr.sin_port = htons((UINT16)(47000 + g_nxccNodeId));
702 if (bind(s, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
703 {
704 ClusterDebug(1, _T("ClusterJoin: cannot bind listening socket (%s)"), _tcserror(WSAGetLastError()));
705 closesocket(s);
706 return false;
707 }
708
709 if (listen(s, SOMAXCONN) == 0)
710 {
711 ClusterDebug(1, _T("ClusterJoin: listening on port %d"), (int)ntohs(servAddr.sin_port));
712 }
713 else
714 {
715 ClusterDebug(1, _T("ClusterJoin: listen() failed (%s)"), _tcserror(WSAGetLastError()));
716 closesocket(s);
717 return false;
718 }
719
720 s_listenerThread = ThreadCreateEx(ClusterListenerThread, 0, CAST_TO_POINTER(s, void *));
721 s_connectorThread = ThreadCreateEx(ClusterConnectorThread, 0, NULL);
722 s_keepaliveThread = ThreadCreateEx(ClusterKeepaliveThread, 0, NULL);
723
724 ClusterDebug(1, _T("ClusterJoin: waiting for other nodes"));
725 if (ConditionWait(s_joinCondition, 60000)) // wait 1 minute for other nodes to join
726 {
727 ClusterDebug(1, _T("ClusterJoin: successfully joined"));
728 }
729 else
730 {
731 // no other nodes, declare self as master
732 ClusterDebug(1, _T("ClusterJoin: cannot contact other nodes, declaring self as master"));
733 PromoteClusterNode();
734 }
735
736 return true;
737 }
738
739 /**
740 * Disconnect all sockets
741 */
742 void ClusterDisconnect()
743 {
744 ThreadJoin(s_listenerThread);
745 ThreadJoin(s_connectorThread);
746 ThreadJoin(s_keepaliveThread);
747 }
748
749 /**
750 * Set current node as running
751 */
752 void LIBNXCC_EXPORTABLE ClusterSetRunning()
753 {
754 g_nxccNeedSync = false;
755 ClusterNotify(CN_NODE_RUNNING);
756 }