libnxcc: correct handling of CMD_REQUEST_COMPLETED
[public/netxms.git] / src / libnxcc / comm.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2015 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU Lesser General Public License as published by
7 ** the Free Software Foundation; either version 3 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU Lesser General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: comm.cpp
20 **
21 **/
22
23 #include "libnxcc.h"
24
/**
 * Externals (declared here, defined elsewhere in libnxcc)
 */

/**
 * Thread pool task scheduled by ChangeClusterNodeState() when a peer with a
 * lower node ID than this node reaches CONNECTED state; arg is that peer's
 * ClusterNodeInfo pointer.
 */
void ClusterNodeJoin(void *arg);

/**
 * Handler for CMD_JOIN_CLUSTER received from a peer. Called from the receiver
 * thread, which deletes msg after this function returns (ownership of msg is
 * not transferred).
 */
void ProcessClusterJoinRequest(ClusterNodeInfo *node, NXCPMessage *msg);
30
/**
 * Keepalive interval in milliseconds: the keepalive thread sleeps this long
 * between rounds, and receiver threads use three intervals as read timeout.
 */
#define KEEPALIVE_INTERVAL 200

/**
 * Thread handles (created by ClusterJoin(), joined by ClusterDisconnect())
 */
static THREAD s_listenerThread = INVALID_THREAD_HANDLE;
static THREAD s_connectorThread = INVALID_THREAD_HANDLE;
static THREAD s_keepaliveThread = INVALID_THREAD_HANDLE;

/**
 * Command ID counter; atomically incremented to produce the IDs of outgoing
 * notifications and commands.
 */
static VolatileCounter s_commandId = 1;

/**
 * Join condition: set on CN_NEW_MASTER notification or via SetJoinCondition();
 * ClusterJoin() waits on it for up to one minute.
 * NOTE(review): created with ConditionCreate(TRUE) - presumably a manual-reset
 * (broadcast) condition; confirm against libnetxms.
 */
static CONDITION s_joinCondition = ConditionCreate(TRUE);
52
53 /**
54 * Mark as joined
55 */
56 void SetJoinCondition()
57 {
58 ConditionSet(s_joinCondition);
59 }
60
61 /**
62 * Process cluster notification
63 */
64 static void ProcessClusterNotification(ClusterNodeInfo *node, ClusterNotificationCode code)
65 {
66 ClusterDebug(4, _T("ProcessClusterNotification: code %d from node %d [%s]"), code, node->m_id, (const TCHAR *)node->m_addr->toString());
67 switch(code)
68 {
69 case CN_NEW_MASTER:
70 ClusterDebug(3, _T("Node %d [%s] is new master"), node->m_id, (const TCHAR *)node->m_addr->toString());
71 node->m_master = true;
72 ChangeClusterNodeState(node, CLUSTER_NODE_UP);
73 ConditionSet(s_joinCondition);
74 break;
75 case CN_NODE_RUNNING:
76 ChangeClusterNodeState(node, CLUSTER_NODE_UP);
77 break;
78 }
79 }
80
/**
 * Receiver thread stop data: allocated with malloc() by ClusterReceiverThread()
 * when a read fails, passed to ReceiverThreadShutdownCB(), which frees it.
 */
struct ReceiverThreadStopData
{
   ClusterNodeInfo *node;  // node whose receiver thread failed
   SOCKET s;               // socket that failed; compared with node->m_socket before teardown
};
89
90 /**
91 * Shutdown callback for receiver thread
92 */
93 static void ReceiverThreadShutdownCB(void *arg)
94 {
95 ReceiverThreadStopData *data = (ReceiverThreadStopData *)arg;
96
97 MutexLock(data->node->m_mutex);
98 if (data->node->m_socket == data->s)
99 {
100 shutdown(data->s, SHUT_RDWR);
101 data->node->m_socket = INVALID_SOCKET;
102 ChangeClusterNodeState(data->node, CLUSTER_NODE_DOWN);
103 }
104 MutexUnlock(data->node->m_mutex);
105 free(data);
106 ClusterDebug(6, _T("Cluster receiver thread shutdown callback completed"));
107 }
108
109 /**
110 * Node receiver thread
111 */
112 static THREAD_RESULT THREAD_CALL ClusterReceiverThread(void *arg)
113 {
114 ClusterNodeInfo *node = (ClusterNodeInfo *)arg;
115 SOCKET s = node->m_socket;
116 ClusterDebug(5, _T("Receiver thread started for cluster node %d [%s] on socket %d"), node->m_id, (const TCHAR *)node->m_addr->toString(), (int)s);
117
118 SocketMessageReceiver receiver(s, 8192, 4194304);
119
120 while(!g_nxccShutdown)
121 {
122 MutexLock(node->m_mutex);
123 SOCKET cs = node->m_socket;
124 MutexUnlock(node->m_mutex);
125
126 if (cs != s)
127 break; // socket was changed
128
129 MessageReceiverResult result;
130 NXCPMessage *msg = receiver.readMessage(KEEPALIVE_INTERVAL * 3, &result);
131 if (msg != NULL)
132 {
133 if (msg->getCode() != CMD_KEEPALIVE)
134 {
135 TCHAR buffer[128];
136 ClusterDebug(7, _T("ClusterReceiverThread: message %s from node %d [%s]"),
137 NXCPMessageCodeName(msg->getCode(), buffer), node->m_id, (const TCHAR *)node->m_addr->toString());
138 }
139
140 switch(msg->getCode())
141 {
142 case CMD_CLUSTER_NOTIFY:
143 ProcessClusterNotification(node, (ClusterNotificationCode)msg->getFieldAsInt16(VID_NOTIFICATION_CODE));
144 break;
145 case CMD_JOIN_CLUSTER:
146 ProcessClusterJoinRequest(node, msg);
147 delete msg;
148 break;
149 case CMD_KEEPALIVE:
150 delete msg;
151 break;
152 case CMD_REQUEST_COMPLETED:
153 node->m_msgWaitQueue->put(msg);
154 break;
155 default:
156 if (g_nxccEventHandler->onMessage(msg, node->m_id))
157 delete msg;
158 else
159 node->m_msgWaitQueue->put(msg);
160 break;
161 }
162 }
163 else
164 {
165 ClusterDebug(5, _T("Receiver error for cluster node %d [%s] on socket %d: %s"),
166 node->m_id, (const TCHAR *)node->m_addr->toString(), (int)s, AbstractMessageReceiver::resultToText(result));
167 ReceiverThreadStopData *data = (ReceiverThreadStopData *)malloc(sizeof(ReceiverThreadStopData));
168 data->node = node;
169 data->s = s;
170 ThreadPoolExecute(g_nxccThreadPool, ReceiverThreadShutdownCB, data);
171 break;
172 }
173 }
174
175 closesocket(s);
176 ClusterDebug(5, _T("Receiver thread stopped for cluster node %d [%s] on socket %d"), node->m_id, (const TCHAR *)node->m_addr->toString(), (int)s);
177 return THREAD_OK;
178 }
179
180 /**
181 * Find cluster node by IP
182 */
183 static int FindClusterNode(const InetAddress& addr)
184 {
185 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
186 if ((g_nxccNodes[i].m_id != 0) && g_nxccNodes[i].m_addr->equals(addr))
187 return i;
188 return -1;
189 }
190
191 /**
192 * Find cluster node by ID
193 */
194 static int FindClusterNode(UINT32 id)
195 {
196 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
197 if (g_nxccNodes[i].m_id == id)
198 return i;
199 return -1;
200 }
201
202 /**
203 * Change cluster node state
204 */
205 void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state)
206 {
207 static const TCHAR *stateNames[] = { _T("DOWN"), _T("CONNECTED"), _T("SYNC"), _T("UP") };
208
209 if (node->m_state == state)
210 return;
211
212 node->m_state = state;
213 ClusterDebug(1, _T("Cluster node %d [%s] changed state to %s"), node->m_id, (const TCHAR *)node->m_addr->toString(), stateNames[state]);
214 switch(state)
215 {
216 case CLUSTER_NODE_CONNECTED:
217 node->m_receiverThread = ThreadCreateEx(ClusterReceiverThread, 0, node);
218 if (node->m_id < g_nxccNodeId)
219 ThreadPoolExecute(g_nxccThreadPool, ClusterNodeJoin, node);
220 break;
221 case CLUSTER_NODE_DOWN:
222 ThreadJoin(node->m_receiverThread);
223 node->m_receiverThread = INVALID_THREAD_HANDLE;
224 g_nxccEventHandler->onNodeDisconnect(node->m_id);
225 if (node->m_master)
226 {
227 node->m_master = false;
228 PromoteClusterNode();
229 }
230 break;
231 case CLUSTER_NODE_SYNC:
232 g_nxccEventHandler->onNodeJoin(node->m_id);
233 break;
234 case CLUSTER_NODE_UP:
235 g_nxccEventHandler->onNodeUp(node->m_id);
236 break;
237 }
238 }
239
240 /**
241 * Listener thread
242 */
243 static THREAD_RESULT THREAD_CALL ClusterListenerThread(void *arg)
244 {
245 SOCKET s = CAST_FROM_POINTER(arg, SOCKET);
246 while(!g_nxccShutdown)
247 {
248 struct timeval tv;
249 tv.tv_sec = 1;
250 tv.tv_usec = 0;
251
252 fd_set rdfs;
253 FD_ZERO(&rdfs);
254 FD_SET(s, &rdfs);
255 int rc = select(SELECT_NFDS(s + 1), &rdfs, NULL, NULL, &tv);
256 if ((rc > 0) && !g_nxccShutdown)
257 {
258 char clientAddr[128];
259 socklen_t size = 128;
260 SOCKET in = accept(s, (struct sockaddr *)clientAddr, &size);
261 if (in == INVALID_SOCKET)
262 {
263 ClusterDebug(5, _T("ClusterListenerThread: accept() failure"));
264 continue;
265 }
266
267 #ifndef _WIN32
268 fcntl(in, F_SETFD, fcntl(in, F_GETFD) | FD_CLOEXEC);
269 #endif
270
271 InetAddress addr = InetAddress::createFromSockaddr((struct sockaddr *)clientAddr);
272 ClusterDebug(5, _T("ClusterListenerThread: incoming connection from %s"), (const TCHAR *)addr.toString());
273
274 int idx = FindClusterNode(addr);
275 if (idx == -1)
276 {
277 ClusterDebug(5, _T("ClusterListenerThread: incoming connection rejected (unknown IP address)"));
278 closesocket(in);
279 continue;
280 }
281
282 MutexLock(g_nxccNodes[idx].m_mutex);
283 if (g_nxccNodes[idx].m_socket == INVALID_SOCKET)
284 {
285 g_nxccNodes[idx].m_socket = in;
286 ClusterDebug(5, _T("Cluster peer node %d [%s] connected"),
287 g_nxccNodes[idx].m_id, (const TCHAR *)g_nxccNodes[idx].m_addr->toString());
288 ChangeClusterNodeState(&g_nxccNodes[idx], CLUSTER_NODE_CONNECTED);
289 }
290 else
291 {
292 ClusterDebug(5, _T("Cluster connection from peer %d [%s] discarded because connection already present"),
293 g_nxccNodes[idx].m_id, (const TCHAR *)g_nxccNodes[idx].m_addr->toString());
294 closesocket(in);
295 }
296 MutexUnlock(g_nxccNodes[idx].m_mutex);
297 }
298 }
299
300 closesocket(s);
301 ClusterDebug(1, _T("Cluster listener thread stopped"));
302 return THREAD_OK;
303 }
304
/**
 * Connector thread: every 500 ms, tries to open an outgoing connection to each
 * configured peer that currently has no socket.
 *
 * The node mutex is released while ConnectToHost() runs, so the node is not
 * locked during a potentially blocking connect. It is re-acquired before the
 * new socket is installed. Because the listener thread may have installed an
 * incoming connection in the meantime, m_socket is re-checked, and the new
 * socket is closed if a connection is already present.
 */
static THREAD_RESULT THREAD_CALL ClusterConnectorThread(void *arg)
{
   ClusterDebug(1, _T("Cluster connector thread started"));

   while(!g_nxccShutdown)
   {
      ThreadSleepMs(500);
      if (g_nxccShutdown)
         break;

      for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
      {
         MutexLock(g_nxccNodes[i].m_mutex);
         if ((g_nxccNodes[i].m_id != 0) && (g_nxccNodes[i].m_socket == INVALID_SOCKET))
         {
            // do not hold the node lock during connect
            MutexUnlock(g_nxccNodes[i].m_mutex);
            SOCKET s = ConnectToHost(*g_nxccNodes[i].m_addr, g_nxccNodes[i].m_port, 500);
            MutexLock(g_nxccNodes[i].m_mutex);
            if (s != INVALID_SOCKET)
            {
               // re-check: the listener thread may have accepted a connection meanwhile
               if (g_nxccNodes[i].m_socket == INVALID_SOCKET)
               {
                  g_nxccNodes[i].m_socket = s;
                  ClusterDebug(5, _T("Cluster peer node %d [%s] connected"),
                           g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
                  ChangeClusterNodeState(&g_nxccNodes[i], CLUSTER_NODE_CONNECTED);
               }
               else
               {
                  ClusterDebug(5, _T("Cluster connection established with peer %d [%s] but discarded because connection already present"),
                           g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
                  closesocket(s);
               }
            }
         }
         // matches either the initial lock or the re-acquired one above
         MutexUnlock(g_nxccNodes[i].m_mutex);
      }
   }

   ClusterDebug(1, _T("Cluster connector thread stopped"));
   return THREAD_OK;
}
350
351 /**
352 * Cluster keepalive thread
353 */
354 static THREAD_RESULT THREAD_CALL ClusterKeepaliveThread(void *arg)
355 {
356 ClusterDebug(1, _T("Cluster keepalive thread started"));
357
358 NXCPMessage msg;
359 msg.setCode(CMD_KEEPALIVE);
360 msg.setField(VID_NODE_ID, g_nxccNodeId);
361 NXCP_MESSAGE *rawMsg = msg.createMessage();
362
363 while(!g_nxccShutdown)
364 {
365 ThreadSleepMs(KEEPALIVE_INTERVAL);
366 if (g_nxccShutdown)
367 break;
368
369 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
370 {
371 if (g_nxccNodes[i].m_id == 0)
372 continue; // empty slot
373
374 MutexLock(g_nxccNodes[i].m_mutex);
375 if (g_nxccNodes[i].m_socket != INVALID_SOCKET)
376 {
377 if (SendEx(g_nxccNodes[i].m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) <= 0)
378 {
379 ClusterDebug(5, _T("ClusterKeepaliveThread: send failed for peer %d [%s]"),
380 g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
381 shutdown(g_nxccNodes[i].m_socket, SHUT_RDWR);
382 g_nxccNodes[i].m_socket = INVALID_SOCKET; // current socket will be closed by receiver
383 ChangeClusterNodeState(&g_nxccNodes[i], CLUSTER_NODE_DOWN);
384 }
385 }
386 MutexUnlock(g_nxccNodes[i].m_mutex);
387 }
388 }
389
390 ClusterDebug(1, _T("Cluster keepalive thread stopped"));
391 return THREAD_OK;
392 }
393
394 /**
395 * Send message to cluster node
396 */
397 void ClusterSendMessage(ClusterNodeInfo *node, NXCPMessage *msg)
398 {
399 NXCP_MESSAGE *rawMsg = msg->createMessage();
400 MutexLock(node->m_mutex);
401 if (node->m_socket != INVALID_SOCKET)
402 {
403 if (SendEx(node->m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) <= 0)
404 {
405 ClusterDebug(5, _T("ClusterSendResponse: send failed for peer %d [%s]"), node->m_id, (const TCHAR *)node->m_addr->toString());
406 shutdown(node->m_socket, SHUT_RDWR);
407 node->m_socket = INVALID_SOCKET; // current socket will be closed by receiver
408 ChangeClusterNodeState(node, CLUSTER_NODE_DOWN);
409 }
410 }
411 MutexUnlock(node->m_mutex);
412 free(rawMsg);
413 }
414
415 /**
416 * Send notification to all connected nodes
417 */
418 void LIBNXCC_EXPORTABLE ClusterNotify(INT16 code)
419 {
420 NXCPMessage msg;
421 msg.setCode(CMD_CLUSTER_NOTIFY);
422 msg.setId((UINT32)InterlockedIncrement(&s_commandId));
423 msg.setField(VID_NOTIFICATION_CODE, code);
424 msg.setField(VID_NODE_ID, g_nxccNodeId);
425 msg.setField(VID_IS_MASTER, (INT16)(g_nxccMasterNode ? 1 : 0));
426 ClusterNotify(&msg);
427 }
428
429 /**
430 * Send notification to all connected nodes
431 */
432 void LIBNXCC_EXPORTABLE ClusterNotify(NXCPMessage *msg)
433 {
434 NXCP_MESSAGE *rawMsg = msg->createMessage();
435
436 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
437 {
438 if (g_nxccNodes[i].m_id == 0)
439 continue; // empty slot
440
441 MutexLock(g_nxccNodes[i].m_mutex);
442 if (g_nxccNodes[i].m_socket != INVALID_SOCKET)
443 {
444 if (SendEx(g_nxccNodes[i].m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) <= 0)
445 {
446 ClusterDebug(5, _T("ClusterNotify: send failed for peer %d [%s]"),
447 g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
448 shutdown(g_nxccNodes[i].m_socket, SHUT_RDWR);
449 g_nxccNodes[i].m_socket = INVALID_SOCKET; // current socket will be closed by receiver
450 ChangeClusterNodeState(&g_nxccNodes[i], CLUSTER_NODE_DOWN);
451 }
452 }
453 MutexUnlock(g_nxccNodes[i].m_mutex);
454 }
455
456 free(rawMsg);
457 }
458
459 /**
460 * Direct notify with just notification code
461 */
462 void LIBNXCC_EXPORTABLE ClusterDirectNotify(UINT32 nodeId, INT16 code)
463 {
464 int index = FindClusterNode(nodeId);
465 if (index != -1)
466 ClusterDirectNotify(&g_nxccNodes[index], code);
467 }
468
469 /**
470 * Direct notify with just notification code
471 */
472 void ClusterDirectNotify(ClusterNodeInfo *node, INT16 code)
473 {
474 NXCPMessage msg;
475 msg.setCode(CMD_CLUSTER_NOTIFY);
476 msg.setId((UINT32)InterlockedIncrement(&s_commandId));
477 msg.setField(VID_NOTIFICATION_CODE, code);
478 msg.setField(VID_NODE_ID, g_nxccNodeId);
479 msg.setField(VID_IS_MASTER, (INT16)(g_nxccMasterNode ? 1 : 0));
480 ClusterSendMessage(node, &msg);
481 }
482
483 /**
484 * Send command to all connected nodes and wait for response
485 *
486 * @return number of execution errors
487 */
488 int LIBNXCC_EXPORTABLE ClusterSendCommand(NXCPMessage *msg)
489 {
490 UINT32 requestId = (UINT32)InterlockedIncrement(&s_commandId);
491 msg->setId(requestId);
492 NXCP_MESSAGE *rawMsg = msg->createMessage();
493
494 bool waitFlags[CLUSTER_MAX_NODE_ID];
495 memset(waitFlags, 0, sizeof(waitFlags));
496
497 int errors = 0;
498 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
499 {
500 if (g_nxccNodes[i].m_id == 0)
501 continue; // empty slot
502
503 MutexLock(g_nxccNodes[i].m_mutex);
504 if (g_nxccNodes[i].m_socket != INVALID_SOCKET)
505 {
506 if (SendEx(g_nxccNodes[i].m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) > 0)
507 {
508 waitFlags[i] = true;
509 }
510 else
511 {
512 ClusterDebug(5, _T("ClusterCommand: send failed for peer %d [%s]"),
513 g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
514 shutdown(g_nxccNodes[i].m_socket, SHUT_RDWR);
515 g_nxccNodes[i].m_socket = INVALID_SOCKET; // current socket will be closed by receiver
516 ChangeClusterNodeState(&g_nxccNodes[i], CLUSTER_NODE_DOWN);
517 errors++;
518 }
519 }
520 MutexUnlock(g_nxccNodes[i].m_mutex);
521 }
522
523 free(rawMsg);
524
525 // Collect responses
526 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
527 {
528 if (!waitFlags[i])
529 continue;
530 NXCPMessage *response = g_nxccNodes[i].m_msgWaitQueue->waitForMessage(CMD_REQUEST_COMPLETED, requestId, g_nxccCommandTimeout);
531 if (response != NULL)
532 {
533 UINT32 rcc = response->getFieldAsInt32(VID_RCC);
534 if (rcc != NXCC_RCC_SUCCESS)
535 {
536 ClusterDebug(5, _T("ClusterCommand: failed request to peer %d [%s] RCC=%d"),
537 g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString(), rcc);
538 errors++;
539 }
540 delete response;
541 }
542 else
543 {
544 ClusterDebug(5, _T("ClusterCommand: timed out request to peer %d [%s]"),
545 g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
546 errors++;
547 }
548 }
549
550 return errors;
551 }
552
553 /**
554 * Send command to specific node and wait for response
555 *
556 * @return request completion code
557 */
558 UINT32 LIBNXCC_EXPORTABLE ClusterSendDirectCommand(UINT32 nodeId, NXCPMessage *msg)
559 {
560 NXCPMessage *response = ClusterSendDirectCommandEx(nodeId, msg);
561 if (response == NULL)
562 return NXCC_RCC_TIMEOUT;
563
564 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
565 if (rcc != 0)
566 {
567 ClusterDebug(5, _T("ClusterDirectCommand: failed request to peer %d: rcc=%d"), nodeId, rcc);
568 }
569 delete response;
570 return rcc;
571 }
572
573 /**
574 * Send command to specific node and wait for response
575 *
576 * @return request completion code
577 */
578 NXCPMessage LIBNXCC_EXPORTABLE *ClusterSendDirectCommandEx(UINT32 nodeId, NXCPMessage *msg)
579 {
580 int index = FindClusterNode(nodeId);
581 if (index == -1)
582 {
583 NXCPMessage *response = new NXCPMessage();
584 response->setField(VID_RCC, NXCC_RCC_INVALID_NODE);
585 return response;
586 }
587
588 ClusterNodeInfo *node = &g_nxccNodes[index];
589
590 UINT32 requestId = (UINT32)InterlockedIncrement(&s_commandId);
591 msg->setId(requestId);
592 NXCP_MESSAGE *rawMsg = msg->createMessage();
593
594 bool sent = false;
595 MutexLock(node->m_mutex);
596 if (node->m_socket != INVALID_SOCKET)
597 {
598 if (SendEx(node->m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) > 0)
599 {
600 sent = true;
601 }
602 else
603 {
604 ClusterDebug(5, _T("ClusterDirectCommand: send failed for peer %d [%s]"), nodeId, (const TCHAR *)node->m_addr->toString());
605 shutdown(node->m_socket, SHUT_RDWR);
606 node->m_socket = INVALID_SOCKET; // current socket will be closed by receiver
607 ChangeClusterNodeState(node, CLUSTER_NODE_DOWN);
608 }
609 }
610 MutexUnlock(node->m_mutex);
611
612 free(rawMsg);
613
614 // Wait for responses
615 NXCPMessage *response;
616 if (sent)
617 {
618 response = node->m_msgWaitQueue->waitForMessage(CMD_REQUEST_COMPLETED, requestId, g_nxccCommandTimeout);
619 }
620 else
621 {
622 response = new NXCPMessage();
623 response->setField(VID_RCC, NXCC_RCC_COMM_FAILURE);
624 }
625
626 return response;
627 }
628
629 /**
630 * Send response to cluster peer
631 */
632 void LIBNXCC_EXPORTABLE ClusterSendResponse(UINT32 nodeId, UINT32 requestId, UINT32 rcc)
633 {
634 int index = FindClusterNode(nodeId);
635 if (index == -1)
636 return;
637
638 ClusterNodeInfo *node = &g_nxccNodes[index];
639
640 NXCPMessage msg;
641 msg.setCode(CMD_REQUEST_COMPLETED);
642 msg.setId(requestId);
643
644 ClusterSendMessage(node, &msg);
645 }
646
647 /**
648 * Send response to cluster peer
649 */
650 void LIBNXCC_EXPORTABLE ClusterSendResponseEx(UINT32 nodeId, UINT32 requestId, NXCPMessage *msg)
651 {
652 int index = FindClusterNode(nodeId);
653 if (index == -1)
654 return;
655
656 ClusterNodeInfo *node = &g_nxccNodes[index];
657
658 msg->setCode(CMD_REQUEST_COMPLETED);
659 msg->setId(requestId);
660
661 ClusterSendMessage(node, msg);
662 }
663
664 /**
665 * Check if all cluster nodes connected
666 */
667 bool LIBNXCC_EXPORTABLE ClusterAllNodesConnected()
668 {
669 int total = 0, connected = 0;
670 for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
671 if (g_nxccNodes[i].m_id > 0)
672 {
673 total++;
674 if (g_nxccNodes[i].m_state >= CLUSTER_NODE_CONNECTED)
675 connected++;
676 }
677 return total == connected;
678 }
679
680 /**
681 * Join cluster
682 *
683 * @return true on successful join
684 */
685 bool LIBNXCC_EXPORTABLE ClusterJoin()
686 {
687 if (!g_nxccInitialized)
688 return false;
689
690 SOCKET s = socket(AF_INET, SOCK_STREAM, 0);
691 if (s == INVALID_SOCKET)
692 {
693 ClusterDebug(1, _T("ClusterJoin: cannot create socket"));
694 return false;
695 }
696
697 SetSocketExclusiveAddrUse(s);
698 SetSocketReuseFlag(s);
699
700 struct sockaddr_in servAddr;
701 memset(&servAddr, 0, sizeof(struct sockaddr_in));
702 servAddr.sin_family = AF_INET;
703 servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
704 servAddr.sin_port = htons((UINT16)(47000 + g_nxccNodeId));
705 if (bind(s, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
706 {
707 ClusterDebug(1, _T("ClusterJoin: cannot bind listening socket (%s)"), _tcserror(WSAGetLastError()));
708 closesocket(s);
709 return false;
710 }
711
712 if (listen(s, SOMAXCONN) == 0)
713 {
714 ClusterDebug(1, _T("ClusterJoin: listening on port %d"), (int)ntohs(servAddr.sin_port));
715 }
716 else
717 {
718 ClusterDebug(1, _T("ClusterJoin: listen() failed (%s)"), _tcserror(WSAGetLastError()));
719 closesocket(s);
720 return false;
721 }
722
723 s_listenerThread = ThreadCreateEx(ClusterListenerThread, 0, CAST_TO_POINTER(s, void *));
724 s_connectorThread = ThreadCreateEx(ClusterConnectorThread, 0, NULL);
725 s_keepaliveThread = ThreadCreateEx(ClusterKeepaliveThread, 0, NULL);
726
727 ClusterDebug(1, _T("ClusterJoin: waiting for other nodes"));
728 if (ConditionWait(s_joinCondition, 60000)) // wait 1 minute for other nodes to join
729 {
730 ClusterDebug(1, _T("ClusterJoin: successfully joined"));
731 }
732 else
733 {
734 // no other nodes, declare self as master
735 ClusterDebug(1, _T("ClusterJoin: cannot contact other nodes, declaring self as master"));
736 PromoteClusterNode();
737 }
738
739 return true;
740 }
741
742 /**
743 * Disconnect all sockets
744 */
745 void ClusterDisconnect()
746 {
747 ThreadJoin(s_listenerThread);
748 ThreadJoin(s_connectorThread);
749 ThreadJoin(s_keepaliveThread);
750 }
751
752 /**
753 * Set current node as running
754 */
755 void LIBNXCC_EXPORTABLE ClusterSetRunning()
756 {
757 g_nxccNeedSync = false;
758 ClusterNotify(CN_NODE_RUNNING);
759 }