fixes in cluster library
authorVictor Kirhenshtein <victor@netxms.org>
Thu, 29 Oct 2015 19:50:10 +0000 (21:50 +0200)
committerVictor Kirhenshtein <victor@netxms.org>
Thu, 29 Oct 2015 19:50:10 +0000 (21:50 +0200)
include/nxcc.h
src/libnxcc/comm.cpp
src/libnxcc/join.cpp
src/libnxcc/libnxcc.h
src/libnxcc/main.cpp
tests/test-libnxcc/test-libnxcc.cpp

index eb69936..e67b494 100644 (file)
@@ -58,14 +58,19 @@ public:
  */
 bool LIBNXCC_EXPORTABLE ClusterInit(Config *config, const TCHAR *section, ClusterEventHandler *eventHandler);
 bool LIBNXCC_EXPORTABLE ClusterJoin();
+void LIBNXCC_EXPORTABLE ClusterSetRunning();
 void LIBNXCC_EXPORTABLE ClusterShutdown();
 
 void LIBNXCC_EXPORTABLE ClusterSetDebugCallback(void (*cb)(int, const TCHAR *, va_list));
 
+UINT32 LIBNXCC_EXPORTABLE ClusterGetLocalNodeId();
 bool LIBNXCC_EXPORTABLE ClusterIsMasterNode();
+bool LIBNXCC_EXPORTABLE ClusterIsSyncNeeded();
 bool LIBNXCC_EXPORTABLE ClusterAllNodesConnected();
 
 void LIBNXCC_EXPORTABLE ClusterNotify(NXCPMessage *msg);
+void LIBNXCC_EXPORTABLE ClusterNotify(INT16 code);
+void LIBNXCC_EXPORTABLE ClusterDirectNotify(UINT32 nodeId, INT16 code);
 int LIBNXCC_EXPORTABLE ClusterSendCommand(NXCPMessage *msg);
 UINT32 LIBNXCC_EXPORTABLE ClusterSendDirectCommand(UINT32 nodeId, NXCPMessage *msg);
 NXCPMessage LIBNXCC_EXPORTABLE *ClusterSendDirectCommandEx(UINT32 nodeId, NXCPMessage *msg);
index b5268f3..3dc297f 100644 (file)
@@ -51,10 +51,19 @@ static VolatileCounter s_commandId = 1;
 static CONDITION s_joinCondition = ConditionCreate(TRUE);
 
 /**
+ * Mark as joined
+ */
+void SetJoinCondition()
+{
+   ConditionSet(s_joinCondition);
+}
+
+/**
  * Process cluster notification
  */
 static void ProcessClusterNotification(ClusterNodeInfo *node, ClusterNotificationCode code)
 {
+   ClusterDebug(4, _T("ProcessClusterNotification: code %d from node %d [%s]"), code, node->m_id, (const TCHAR *)node->m_addr->toString());
    switch(code)
    {
       case CN_NEW_MASTER:
@@ -63,7 +72,38 @@ static void ProcessClusterNotification(ClusterNodeInfo *node, ClusterNotificatio
          ChangeClusterNodeState(node, CLUSTER_NODE_UP);
          ConditionSet(s_joinCondition);
          break;
+      case CN_NODE_RUNNING:
+         ChangeClusterNodeState(node, CLUSTER_NODE_UP);
+         break;
+   }
+}
+
+/**
+ * Receiver thread stop data
+ */
+struct ReceiverThreadStopData
+{
+   ClusterNodeInfo *node;
+   SOCKET s;
+};
+
+/**
+ * Shutdown callback for receiver thread
+ */
+static void ReceiverThreadShutdownCB(void *arg)
+{
+   ReceiverThreadStopData *data = (ReceiverThreadStopData *)arg;
+
+   MutexLock(data->node->m_mutex);
+   if (data->node->m_socket == data->s)
+   {
+      shutdown(data->s, SHUT_RDWR);
+      data->node->m_socket = INVALID_SOCKET;
+      ChangeClusterNodeState(data->node, CLUSTER_NODE_DOWN);
    }
+   MutexUnlock(data->node->m_mutex);
+   free(data);
+   ClusterDebug(6, _T("Cluster receiver thread shutdown callback completed"));
 }
 
 /**
@@ -105,8 +145,6 @@ static THREAD_RESULT THREAD_CALL ClusterReceiverThread(void *arg)
             case CMD_JOIN_CLUSTER:
                ProcessClusterJoinRequest(node, msg);
                delete msg;
-               if (g_nxccMasterNode)
-                  ConditionSet(s_joinCondition);
                break;
             case CMD_KEEPALIVE:
                delete msg;
@@ -123,14 +161,11 @@ static THREAD_RESULT THREAD_CALL ClusterReceiverThread(void *arg)
       {
          ClusterDebug(5, _T("Receiver error for cluster node %d [%s] on socket %d: %s"),
             node->m_id, (const TCHAR *)node->m_addr->toString(), (int)s, AbstractMessageReceiver::resultToText(result));
-         MutexLock(node->m_mutex);
-         if (node->m_socket == s)
-         {
-            shutdown(s, SHUT_RDWR);
-            node->m_socket = INVALID_SOCKET;
-            ChangeClusterNodeState(node, CLUSTER_NODE_DOWN);
-         }
-         MutexUnlock(node->m_mutex);
+         ReceiverThreadStopData *data = (ReceiverThreadStopData *)malloc(sizeof(ReceiverThreadStopData));
+         data->node = node;
+         data->s = s;
+         ThreadPoolExecute(g_nxccThreadPool, ReceiverThreadShutdownCB, data);
+         break;
       }
    }
 
@@ -184,6 +219,11 @@ void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state)
          ThreadJoin(node->m_receiverThread);
          node->m_receiverThread = INVALID_THREAD_HANDLE;
          g_nxccEventHandler->onNodeDisconnect(node->m_id);
+         if (node->m_master)
+         {
+            node->m_master = false;
+            PromoteClusterNode();
+         }
          break;
       case CLUSTER_NODE_SYNC:
          g_nxccEventHandler->onNodeJoin(node->m_id);
@@ -226,7 +266,7 @@ static THREAD_RESULT THREAD_CALL ClusterListenerThread(void *arg)
 #endif
 
          InetAddress addr = InetAddress::createFromSockaddr((struct sockaddr *)clientAddr);
-         ClusterDebug(5, _T("Incoming connection from %s"), (const TCHAR *)addr.toString());
+         ClusterDebug(5, _T("ClusterListenerThread: incoming connection from %s"), (const TCHAR *)addr.toString());
 
          int idx = FindClusterNode(addr);
          if (idx == -1)
@@ -248,7 +288,7 @@ static THREAD_RESULT THREAD_CALL ClusterListenerThread(void *arg)
          {
             ClusterDebug(5, _T("Cluster connection from peer %d [%s] discarded because connection already present"),
                g_nxccNodes[idx].m_id, (const TCHAR *)g_nxccNodes[idx].m_addr->toString());
-            closesocket(s);
+            closesocket(in);
          }
          MutexUnlock(g_nxccNodes[idx].m_mutex);
       }
@@ -372,6 +412,20 @@ void ClusterSendMessage(ClusterNodeInfo *node, NXCPMessage *msg)
 /**
  * Send notification to all connected nodes
  */
+void LIBNXCC_EXPORTABLE ClusterNotify(INT16 code)
+{
+   NXCPMessage msg;
+   msg.setCode(CMD_CLUSTER_NOTIFY);
+   msg.setId((UINT32)InterlockedIncrement(&s_commandId));
+   msg.setField(VID_NOTIFICATION_CODE, code);
+   msg.setField(VID_NODE_ID, g_nxccNodeId);
+   msg.setField(VID_IS_MASTER, (INT16)(g_nxccMasterNode ? 1 : 0));
+   ClusterNotify(&msg);
+}
+
+/**
+ * Send notification to all connected nodes
+ */
 void LIBNXCC_EXPORTABLE ClusterNotify(NXCPMessage *msg)
 {
    NXCP_MESSAGE *rawMsg = msg->createMessage();
@@ -400,6 +454,30 @@ void LIBNXCC_EXPORTABLE ClusterNotify(NXCPMessage *msg)
 }
 
 /**
+ * Direct notify with just notification code
+ */
+void LIBNXCC_EXPORTABLE ClusterDirectNotify(UINT32 nodeId, INT16 code)
+{
+   int index = FindClusterNode(nodeId);
+   if (index != -1)
+      ClusterDirectNotify(&g_nxccNodes[index], code);
+}
+
+/**
+ * Direct notify with just notification code
+ */
+void ClusterDirectNotify(ClusterNodeInfo *node, INT16 code)
+{
+   NXCPMessage msg;
+   msg.setCode(CMD_CLUSTER_NOTIFY);
+   msg.setId((UINT32)InterlockedIncrement(&s_commandId));
+   msg.setField(VID_NOTIFICATION_CODE, code);
+   msg.setField(VID_NODE_ID, g_nxccNodeId);
+   msg.setField(VID_IS_MASTER, (INT16)(g_nxccMasterNode ? 1 : 0));
+   ClusterSendMessage(node, &msg);
+}
+
+/**
  * Send command to all connected nodes and wait for response
  *
  * @return number of execution errors
@@ -650,3 +728,12 @@ void ClusterDisconnect()
    ThreadJoin(s_connectorThread);
    ThreadJoin(s_keepaliveThread);
 }
+
+/**
+ * Set current node as running
+ */
+void LIBNXCC_EXPORTABLE ClusterSetRunning()
+{
+   g_nxccNeedSync = false;
+   ClusterNotify(CN_NODE_RUNNING);
+}
index 2519884..ac154a1 100644 (file)
@@ -27,6 +27,8 @@
  */
 void ClusterNodeJoin(void *arg)
 {
+   static const TCHAR *rspNames[] = { _T("ACCEPTED AS SECONDARY"), _T("ACCEPTED AS MASTER"), _T("WAIT FOR MASTER"), _T("SPLIT BRAIN") };
+
    ClusterNodeInfo *node = (ClusterNodeInfo *)arg;
    ClusterDebug(4, _T("ClusterNodeJoin: requesting join from from node %d [%s]"), node->m_id, (const TCHAR *)node->m_addr->toString());
 
@@ -39,15 +41,18 @@ void ClusterNodeJoin(void *arg)
    if (response != NULL)
    {
       ClusterJoinResponse r = (ClusterJoinResponse)response->getFieldAsInt16(VID_RCC);
-      ClusterDebug(4, _T("ClusterNodeJoin: join response from node %d [%s]: %d"), node->m_id, (const TCHAR *)node->m_addr->toString(), r);
+      ClusterDebug(4, _T("ClusterNodeJoin: join response from node %d [%s]: %s"), node->m_id, (const TCHAR *)node->m_addr->toString(), rspNames[r]);
       switch(r)
       {
          case CJR_ACCEPTED_AS_SECONDARY:
             ChangeClusterNodeState(node, CLUSTER_NODE_UP);
             node->m_master = response->getFieldAsBoolean(VID_IS_MASTER);
+            g_nxccNeedSync = true;
+            SetJoinCondition();
             break;
          case CJR_ACCEPTED_AS_MASTER:
             ChangeClusterNodeState(node, CLUSTER_NODE_SYNC);
+            SetJoinCondition();
             break;
       }
       delete response;
@@ -82,8 +87,11 @@ void ProcessClusterJoinRequest(ClusterNodeInfo *node, NXCPMessage *request)
    else if (!g_nxccMasterNode && remoteMaster)
    {
       response.setField(VID_RCC, (INT16)CJR_ACCEPTED_AS_MASTER);
-      ClusterDebug(4, _T("ProcessClusterJoinRequest: joined running cluster with node %d [%s] as master"), node->m_id, (const TCHAR *)node->m_addr->toString());
+      ClusterDebug(4, _T("ProcessClusterJoinRequest: joined running cluster as secondary with node %d [%s] as master"), node->m_id, (const TCHAR *)node->m_addr->toString());
+      node->m_master = true;
       ChangeClusterNodeState(node, CLUSTER_NODE_UP);
+      g_nxccNeedSync = true;
+      SetJoinCondition();
    }
    else
    {
@@ -102,15 +110,15 @@ void ProcessClusterJoinRequest(ClusterNodeInfo *node, NXCPMessage *request)
 }
 
 /**
- * Promote this node to master if possible
+ * Promote cluster node - callback for running on separate thread
  */
-void PromoteClusterNode()
+static void PromoteClusterNodeCB(void *arg)
 {
    for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
       if ((g_nxccNodes[i].m_id > 0) && (g_nxccNodes[i].m_id < g_nxccNodeId) && (g_nxccNodes[i].m_state >= CLUSTER_NODE_CONNECTED))
       {
          ClusterDebug(4, _T("PromoteClusterNode: found connected node with higher priority"));
-         break;
+         return;
       }
 
    ClusterDebug(4, _T("PromoteClusterNode: promote this node to master"));
@@ -121,4 +129,14 @@ void PromoteClusterNode()
    msg.setField(VID_NODE_ID, g_nxccNodeId);
    msg.setField(VID_NOTIFICATION_CODE, (INT16)CN_NEW_MASTER);
    ClusterNotify(&msg);
+
+   SetJoinCondition();
+}
+
+/**
+ * Promote this node to master if possible (always executed on separate thread)
+ */
+void PromoteClusterNode()
+{
+   ThreadPoolExecute(g_nxccThreadPool, PromoteClusterNodeCB, NULL);
 }
index ab300ff..bb6e74d 100644 (file)
@@ -64,11 +64,12 @@ enum ClusterJoinResponse
 };
 
 /**
- * Cluster notification codes
+ * Standard cluster notification codes
  */
 enum ClusterNotificationCode
 {
-   CN_NEW_MASTER = 1
+   CN_NEW_MASTER = 1,
+   CN_NODE_RUNNING = 2
 };
 
 /**
@@ -79,9 +80,12 @@ void ClusterDebug(int level, const TCHAR *format, ...);
 void ClusterDisconnect();
 void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state);
 void ClusterSendMessage(ClusterNodeInfo *node, NXCPMessage *msg);
+void ClusterDirectNotify(ClusterNodeInfo *node, INT16 code);
 
 void PromoteClusterNode();
 
+void SetJoinCondition();
+
 /**
  * Global cluster node settings
  */
@@ -92,6 +96,7 @@ extern ClusterNodeInfo g_nxccNodes[CLUSTER_MAX_NODE_ID];
 extern bool g_nxccInitialized;
 extern bool g_nxccMasterNode;
 extern bool g_nxccShutdown;
+extern bool g_nxccNeedSync;
 extern UINT16 g_nxccListenPort;
 extern UINT32 g_nxccCommandTimeout;
 extern ThreadPool *g_nxccThreadPool;
index a6c50df..329da07 100644 (file)
@@ -31,6 +31,7 @@ ClusterNodeState g_nxccState = CLUSTER_NODE_DOWN;
 bool g_nxccInitialized = false;
 bool g_nxccMasterNode = false;
 bool g_nxccShutdown = false;
+bool g_nxccNeedSync = false;
 UINT16 g_nxccListenPort = 47000;
 UINT32 g_nxccCommandTimeout = 500;
 
@@ -185,6 +186,22 @@ bool LIBNXCC_EXPORTABLE ClusterIsMasterNode()
    return g_nxccMasterNode;
 }
 
+/**
+ * Check if synchronization needed
+ */
+bool LIBNXCC_EXPORTABLE ClusterIsSyncNeeded()
+{
+   return g_nxccNeedSync;
+}
+
+/**
+ * Get local node ID
+ */
+UINT32 LIBNXCC_EXPORTABLE ClusterGetLocalNodeId()
+{
+   return g_nxccNodeId;
+}
+
 #ifdef _WIN32
 
 /**
index c4ff3f4..ad1e520 100644 (file)
@@ -84,9 +84,10 @@ int main(int argc, char *argv[])
    config->setValue(_T("/CLUSTER/PeerNode"), (s_nodeId == 1) ? _T("2:127.0.0.1") : _T("1:127.0.0.1"));
 
    ClusterSetDebugCallback(DebugCallback);
-   ClusterInit(config, _T("CLUSTER"), new EventHandler());
+   AssertTrue(ClusterInit(config, _T("CLUSTER"), new EventHandler()));
 
-   ClusterJoin();
+   AssertTrue(ClusterJoin());
+   ClusterSetRunning();
    _tprintf(_T("CLUSTER RUNNING\n"));
 
    ThreadSleep(1);