cluster join works in different combinations
author Victor Kirhenshtein <victor@netxms.org>
Wed, 28 Oct 2015 22:47:48 +0000 (00:47 +0200)
committer Victor Kirhenshtein <victor@netxms.org>
Wed, 28 Oct 2015 22:47:48 +0000 (00:47 +0200)
include/nms_cscp.h
include/nxcc.h
src/libnetxms/nxcp.cpp
src/libnxcc/ceh.cpp
src/libnxcc/comm.cpp
src/libnxcc/join.cpp
src/libnxcc/libnxcc.h

index 837cf1f..62a681d 100644 (file)
@@ -558,6 +558,7 @@ typedef struct
 #define CMD_ENTER_MAINT_MODE           0x0145
 #define CMD_LEAVE_MAINT_MODE           0x0146
 #define CMD_JOIN_CLUSTER               0x0147
+#define CMD_CLUSTER_NOTIFY             0x0148   /* cluster-internal notification; code carried in VID_NOTIFICATION_CODE */
 
 #define CMD_RS_LIST_REPORTS            0x1100
 #define CMD_RS_GET_REPORT              0x1101
index 2134616..eb69936 100644 (file)
@@ -32,7 +32,8 @@ enum ClusterNodeState
 {
    CLUSTER_NODE_DOWN = 0,
    CLUSTER_NODE_CONNECTED = 1,
-   CLUSTER_NODE_UP = 2
+   CLUSTER_NODE_SYNC = 2,   // accepted into cluster as secondary, not yet UP; entering this state calls onNodeJoin()
+   CLUSTER_NODE_UP = 3      // entering this state calls onNodeUp()
 };
 
 /**
@@ -45,6 +46,7 @@ public:
    virtual ~ClusterEventHandler();
    
    virtual void onNodeJoin(UINT32 nodeId);
+   virtual void onNodeUp(UINT32 nodeId);   // called when node state changes to CLUSTER_NODE_UP
    virtual void onNodeDisconnect(UINT32 nodeId);
    virtual void onShutdown();
    
@@ -61,6 +63,7 @@ void LIBNXCC_EXPORTABLE ClusterShutdown();
 void LIBNXCC_EXPORTABLE ClusterSetDebugCallback(void (*cb)(int, const TCHAR *, va_list));
 
 bool LIBNXCC_EXPORTABLE ClusterIsMasterNode();
+bool LIBNXCC_EXPORTABLE ClusterAllNodesConnected();   // true if every configured node is in CONNECTED state or higher
 
 void LIBNXCC_EXPORTABLE ClusterNotify(NXCPMessage *msg);
 int LIBNXCC_EXPORTABLE ClusterSendCommand(NXCPMessage *msg);
index 610d2d1..219a187 100644 (file)
@@ -362,10 +362,11 @@ TCHAR LIBNETXMS_EXPORTABLE *NXCPMessageCodeName(WORD code, TCHAR *pszBuffer)
       _T("CMD_REMOVE_SCHEDULE"),
       _T("CMD_ENTER_MAINT_MODE"),
       _T("CMD_LEAVE_MAINT_MODE"),
-      _T("CMD_JOIN_CLUSTER")
+      _T("CMD_JOIN_CLUSTER"),
+      _T("CMD_CLUSTER_NOTIFY")
    };
 
-   if ((code >= CMD_LOGIN) && (code <= CMD_JOIN_CLUSTER))
+   if ((code >= CMD_LOGIN) && (code <= CMD_CLUSTER_NOTIFY))   // upper bound must be the last command listed in pszMsgNames
       _tcscpy(pszBuffer, pszMsgNames[code - CMD_LOGIN]);
    else
       _sntprintf(pszBuffer, 64, _T("CMD_0x%04X"), code);
index 7991c47..e7b3e65 100644 (file)
@@ -44,6 +44,13 @@ void ClusterEventHandler::onNodeJoin(UINT32 nodeId)
 }
 
 /**
+ * Node up handler (default implementation does nothing)
+ */
+void ClusterEventHandler::onNodeUp(UINT32 nodeId)
+{
+}
+
+/**
  * Node disconnect handler
  */
 void ClusterEventHandler::onNodeDisconnect(UINT32 nodeId)
index f62d783..b5268f3 100644 (file)
@@ -51,9 +51,23 @@ static VolatileCounter s_commandId = 1;
 static CONDITION s_joinCondition = ConditionCreate(TRUE);
 
 /**
- * Change cluster node state
+ * Process cluster notification (CMD_CLUSTER_NOTIFY) received from given node
  */
-static void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state);
+static void ProcessClusterNotification(ClusterNodeInfo *node, ClusterNotificationCode code)
+{
+   switch(code)
+   {
+      case CN_NEW_MASTER:
+         ClusterDebug(3, _T("Node %d [%s] is new master"), node->m_id, (const TCHAR *)node->m_addr->toString());
+         node->m_master = true;
+         ChangeClusterNodeState(node, CLUSTER_NODE_UP);
+         ConditionSet(s_joinCondition);   // cluster now has a master; presumably wakes a pending join - TODO confirm
+         break;
+      default:
+         ClusterDebug(4, _T("Unknown cluster notification code %d from node %d [%s]"), (int)code, node->m_id, (const TCHAR *)node->m_addr->toString());
+         break;
+   }
+}
 
 /**
  * Node receiver thread
@@ -88,12 +99,18 @@ static THREAD_RESULT THREAD_CALL ClusterReceiverThread(void *arg)
 
          switch(msg->getCode())
          {
-            case CMD_KEEPALIVE:
-               delete msg;
+            case CMD_CLUSTER_NOTIFY:
+               ProcessClusterNotification(node, (ClusterNotificationCode)msg->getFieldAsInt16(VID_NOTIFICATION_CODE));
+               delete msg;   // not passed to event handler, so it must be freed here
                break;
             case CMD_JOIN_CLUSTER:
                ProcessClusterJoinRequest(node, msg);
                delete msg;
+               if (g_nxccMasterNode)
+                  ConditionSet(s_joinCondition);
+               break;
+            case CMD_KEEPALIVE:
+               delete msg;
                break;
             default:
                if (g_nxccEventHandler->onMessage(msg, node->m_id))
@@ -148,9 +164,9 @@ static int FindClusterNode(UINT32 id)
 /**
  * Change cluster node state
  */
-static void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state)
+void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state)
 {
-   static const TCHAR *stateNames[] = { _T("DOWN"), _T("CONNECTED"), _T("UP") };
+   static const TCHAR *stateNames[] = { _T("DOWN"), _T("CONNECTED"), _T("SYNC"), _T("UP") };   // listed in ClusterNodeState value order (DOWN=0 .. UP=3)
 
    if (node->m_state == state)
       return;
@@ -169,9 +185,12 @@ static void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state
          node->m_receiverThread = INVALID_THREAD_HANDLE;
          g_nxccEventHandler->onNodeDisconnect(node->m_id);
          break;
-      case CLUSTER_NODE_UP:
+      case CLUSTER_NODE_SYNC:
          g_nxccEventHandler->onNodeJoin(node->m_id);
          break;
+      case CLUSTER_NODE_UP:
+         g_nxccEventHandler->onNodeUp(node->m_id);
+         break;
    }
 }
 
@@ -330,6 +349,29 @@ static THREAD_RESULT THREAD_CALL ClusterKeepaliveThread(void *arg)
 }
 
 /**
+ * Send message to cluster node. Caller retains ownership of msg.
+ * On send failure the socket is shut down and node state is changed to
+ * CLUSTER_NODE_DOWN; the socket itself is closed by the receiver thread.
+ */
+void ClusterSendMessage(ClusterNodeInfo *node, NXCPMessage *msg)
+{
+   NXCP_MESSAGE *rawMsg = msg->createMessage();
+   MutexLock(node->m_mutex);
+   if (node->m_socket != INVALID_SOCKET)
+   {
+      if (SendEx(node->m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) <= 0)
+      {
+         ClusterDebug(5, _T("ClusterSendMessage: send failed for peer %d [%s]"), node->m_id, (const TCHAR *)node->m_addr->toString());
+         shutdown(node->m_socket, SHUT_RDWR);
+         node->m_socket = INVALID_SOCKET; // current socket will be closed by receiver
+         ChangeClusterNodeState(node, CLUSTER_NODE_DOWN);
+      }
+   }
+   MutexUnlock(node->m_mutex);
+   free(rawMsg);
+}
+
+/**
  * Send notification to all connected nodes
  */
 void LIBNXCC_EXPORTABLE ClusterNotify(NXCPMessage *msg)
@@ -520,20 +560,23 @@ void LIBNXCC_EXPORTABLE ClusterSendResponse(UINT32 nodeId, UINT32 requestId, UIN
    msg.setCode(CMD_REQUEST_COMPLETED);
    msg.setId(requestId);
 
-   NXCP_MESSAGE *rawMsg = msg.createMessage();
-   MutexLock(node->m_mutex);
-   if (node->m_socket != INVALID_SOCKET)
-   {
-      if (SendEx(node->m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) <= 0)
+   ClusterSendMessage(node, &msg);
+}
+
+/**
+ * Check if all configured cluster nodes (non-zero ID) are in CONNECTED state or higher
+ */
+bool LIBNXCC_EXPORTABLE ClusterAllNodesConnected()
+{
+   int total = 0, connected = 0;
+   for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
+      if (g_nxccNodes[i].m_id > 0)
       {
-         ClusterDebug(5, _T("ClusterSendResponse: send failed for peer %d [%s]"), nodeId, (const TCHAR *)node->m_addr->toString());
-         shutdown(node->m_socket, SHUT_RDWR);
-         node->m_socket = INVALID_SOCKET; // current socket will be closed by receiver
-         ChangeClusterNodeState(node, CLUSTER_NODE_DOWN);
+         total++;
+         if (g_nxccNodes[i].m_state >= CLUSTER_NODE_CONNECTED)
+            connected++;
       }
-   }
-   MutexUnlock(node->m_mutex);
-   free(rawMsg);
+   return total == connected;
 }
 
 /**
@@ -591,8 +634,8 @@ bool LIBNXCC_EXPORTABLE ClusterJoin()
    else
    {
       // no other nodes, declare self as master
-      g_nxccMasterNode = true;
       ClusterDebug(1, _T("ClusterJoin: cannot contact other nodes, declaring self as master"));
+      PromoteClusterNode();   // use common promotion path (join.cpp) instead of setting g_nxccMasterNode directly
    }
 
    return true;
index 5c1bc3c..2519884 100644 (file)
@@ -36,13 +36,96 @@ void ClusterNodeJoin(void *arg)
    msg.setField(VID_IS_MASTER, (INT16)(g_nxccMasterNode ? 1 : 0));
 
    NXCPMessage *response = ClusterSendDirectCommandEx(node->m_id, &msg);
+   if (response != NULL)
+   {
+      ClusterJoinResponse r = (ClusterJoinResponse)response->getFieldAsInt16(VID_RCC);
+      ClusterDebug(4, _T("ClusterNodeJoin: join response from node %d [%s]: %d"), node->m_id, (const TCHAR *)node->m_addr->toString(), r);
+      switch(r)
+      {
+         case CJR_ACCEPTED_AS_SECONDARY:
+            // set master flag before state change so onNodeUp() handler sees the correct value
+            node->m_master = response->getFieldAsBoolean(VID_IS_MASTER);
+            ChangeClusterNodeState(node, CLUSTER_NODE_UP);
+            break;
+         case CJR_ACCEPTED_AS_MASTER:
+            ChangeClusterNodeState(node, CLUSTER_NODE_SYNC);
+            break;
+         default:
+            // CJR_WAIT_FOR_MASTER, CJR_SPLIT_BRAIN: no state change (response code logged above)
+            break;
+      }
+      delete response;
+   }
 }
 
 /**
  * Process join request received from other node
  */
-void ProcessClusterJoinRequest(ClusterNodeInfo *node, NXCPMessage *msg)
+void ProcessClusterJoinRequest(ClusterNodeInfo *node, NXCPMessage *request)
 {
    ClusterDebug(4, _T("ProcessClusterJoinRequest: request from node %d [%s]"), node->m_id, (const TCHAR *)node->m_addr->toString());
 
+   NXCPMessage response;
+   response.setCode(CMD_REQUEST_COMPLETED);
+   response.setId(request->getId());
+
+   response.setField(VID_IS_MASTER, (INT16)(g_nxccMasterNode ? 1 : 0));
+
+   bool remoteMaster = request->getFieldAsBoolean(VID_IS_MASTER);
+   if (g_nxccMasterNode && !remoteMaster)
+   {
+      response.setField(VID_RCC, (INT16)CJR_ACCEPTED_AS_SECONDARY);
+      ClusterDebug(4, _T("ProcessClusterJoinRequest: node %d [%s] accepted into running cluster"), node->m_id, (const TCHAR *)node->m_addr->toString());
+      ChangeClusterNodeState(node, CLUSTER_NODE_SYNC);
+   }
+   else if (!g_nxccMasterNode && !remoteMaster)
+   {
+      response.setField(VID_RCC, (INT16)CJR_WAIT_FOR_MASTER);
+      ClusterDebug(4, _T("ProcessClusterJoinRequest: waiting for master"));
+   }
+   else if (!g_nxccMasterNode && remoteMaster)
+   {
+      response.setField(VID_RCC, (INT16)CJR_ACCEPTED_AS_MASTER);
+      ClusterDebug(4, _T("ProcessClusterJoinRequest: joined running cluster with node %d [%s] as master"), node->m_id, (const TCHAR *)node->m_addr->toString());
+      node->m_master = true;   // record remote master, as ClusterNodeJoin and CN_NEW_MASTER handling do
+      ChangeClusterNodeState(node, CLUSTER_NODE_UP);
+   }
+   else
+   {
+      response.setField(VID_RCC, (INT16)CJR_SPLIT_BRAIN);
+      ClusterDebug(4, _T("ProcessClusterJoinRequest: split-brain condition detected"));
+   }
+
+   ClusterSendMessage(node, &response);
+
+   // Promote this node to master if all cluster nodes already connected
+   // and there is no master yet (new cluster)
+   if (!g_nxccMasterNode && !remoteMaster && ClusterAllNodesConnected())
+   {
+      PromoteClusterNode();
+   }
+}
+
+/**
+ * Promote this node to master if possible. Promotion is skipped when any
+ * connected node has lower ID (higher priority) than this node; otherwise
+ * this node becomes master and CN_NEW_MASTER is sent to connected nodes.
+ */
+void PromoteClusterNode()
+{
+   for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
+      if ((g_nxccNodes[i].m_id > 0) && (g_nxccNodes[i].m_id < g_nxccNodeId) && (g_nxccNodes[i].m_state >= CLUSTER_NODE_CONNECTED))
+      {
+         ClusterDebug(4, _T("PromoteClusterNode: found connected node with higher priority"));
+         return;   // leave promotion to the higher-priority node; promoting here would cause split-brain
+      }
+
+   ClusterDebug(4, _T("PromoteClusterNode: promote this node to master"));
+   g_nxccMasterNode = true;
+
+   NXCPMessage msg;
+   msg.setCode(CMD_CLUSTER_NOTIFY);
+   msg.setField(VID_NODE_ID, g_nxccNodeId);
+   msg.setField(VID_NOTIFICATION_CODE, (INT16)CN_NEW_MASTER);
+   ClusterNotify(&msg);
 }
index 9b27f16..ab300ff 100644 (file)
@@ -53,11 +53,34 @@ struct ClusterNodeInfo
 };
 
 /**
+ * Cluster join responses
+ */
+enum ClusterJoinResponse
+{
+   CJR_ACCEPTED_AS_SECONDARY = 0,   // responder is master, requester accepted as secondary
+   CJR_ACCEPTED_AS_MASTER = 1,      // requester is master, responder (non-master) accepted it
+   CJR_WAIT_FOR_MASTER = 2,         // neither node is master yet
+   CJR_SPLIT_BRAIN = 3              // both nodes claim to be master
+};
+
+/**
+ * Cluster notification codes
+ */
+enum ClusterNotificationCode
+{
+   CN_NEW_MASTER = 1   // sender has promoted itself to master
+};
+
+/**
  * Internal functions
  */
 void ClusterDebug(int level, const TCHAR *format, ...);
 
 void ClusterDisconnect();
+void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state);
+void ClusterSendMessage(ClusterNodeInfo *node, NXCPMessage *msg);
+
+void PromoteClusterNode();
 
 /**
  * Global cluster node settings