#define CMD_ENTER_MAINT_MODE 0x0145
#define CMD_LEAVE_MAINT_MODE 0x0146
#define CMD_JOIN_CLUSTER 0x0147
+#define CMD_CLUSTER_NOTIFY 0x0148
#define CMD_RS_LIST_REPORTS 0x1100
#define CMD_RS_GET_REPORT 0x1101
{
CLUSTER_NODE_DOWN = 0,
CLUSTER_NODE_CONNECTED = 1,
- CLUSTER_NODE_UP = 2
+ CLUSTER_NODE_SYNC = 2,
+ CLUSTER_NODE_UP = 3
};
/**
virtual ~ClusterEventHandler();
virtual void onNodeJoin(UINT32 nodeId);
+ virtual void onNodeUp(UINT32 nodeId);
virtual void onNodeDisconnect(UINT32 nodeId);
virtual void onShutdown();
void LIBNXCC_EXPORTABLE ClusterSetDebugCallback(void (*cb)(int, const TCHAR *, va_list));
bool LIBNXCC_EXPORTABLE ClusterIsMasterNode();
+bool LIBNXCC_EXPORTABLE ClusterAllNodesConnected();
void LIBNXCC_EXPORTABLE ClusterNotify(NXCPMessage *msg);
int LIBNXCC_EXPORTABLE ClusterSendCommand(NXCPMessage *msg);
_T("CMD_REMOVE_SCHEDULE"),
_T("CMD_ENTER_MAINT_MODE"),
_T("CMD_LEAVE_MAINT_MODE"),
- _T("CMD_JOIN_CLUSTER")
+ _T("CMD_JOIN_CLUSTER"),
+ _T("CMD_CLUSTER_NOTIFY")
};
- if ((code >= CMD_LOGIN) && (code <= CMD_JOIN_CLUSTER))
+ if ((code >= CMD_LOGIN) && (code <= CMD_CLUSTER_NOTIFY))
_tcscpy(pszBuffer, pszMsgNames[code - CMD_LOGIN]);
else
_sntprintf(pszBuffer, 64, _T("CMD_0x%04X"), code);
}
/**
+ * Node up handler
+ *
+ * Default implementation does nothing. The global event handler's onNodeUp()
+ * is invoked with the node's ID when a node's state changes to CLUSTER_NODE_UP.
+ */
+void ClusterEventHandler::onNodeUp(UINT32 nodeId)
+{
+}
+
+/**
* Node disconnect handler
*/
void ClusterEventHandler::onNodeDisconnect(UINT32 nodeId)
static CONDITION s_joinCondition = ConditionCreate(TRUE);
/**
- * Change cluster node state
+ * Process cluster notification
+ *
+ * Handles notification code received from given node. For CN_NEW_MASTER the
+ * sending node is marked as master, its state is changed to CLUSTER_NODE_UP,
+ * and the join condition is set. Any other code is silently ignored.
*/
-static void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state);
+static void ProcessClusterNotification(ClusterNodeInfo *node, ClusterNotificationCode code)
+{
+ switch(code)
+ {
+ case CN_NEW_MASTER:
+ ClusterDebug(3, _T("Node %d [%s] is new master"), node->m_id, (const TCHAR *)node->m_addr->toString());
+ node->m_master = true;
+ ChangeClusterNodeState(node, CLUSTER_NODE_UP);
+ ConditionSet(s_joinCondition);
+ break;
+ }
+}
/**
* Node receiver thread
switch(msg->getCode())
{
- case CMD_KEEPALIVE:
- delete msg;
+ case CMD_CLUSTER_NOTIFY:
+ ProcessClusterNotification(node, (ClusterNotificationCode)msg->getFieldAsInt16(VID_NOTIFICATION_CODE));
break;
case CMD_JOIN_CLUSTER:
ProcessClusterJoinRequest(node, msg);
delete msg;
+ if (g_nxccMasterNode)
+ ConditionSet(s_joinCondition);
+ break;
+ case CMD_KEEPALIVE:
+ delete msg;
break;
default:
if (g_nxccEventHandler->onMessage(msg, node->m_id))
/**
* Change cluster node state
*/
-static void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state)
+void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state)
{
- static const TCHAR *stateNames[] = { _T("DOWN"), _T("CONNECTED"), _T("UP") };
+ static const TCHAR *stateNames[] = { _T("DOWN"), _T("CONNECTED"), _T("SYNC"), _T("UP") };
if (node->m_state == state)
return;
node->m_receiverThread = INVALID_THREAD_HANDLE;
g_nxccEventHandler->onNodeDisconnect(node->m_id);
break;
- case CLUSTER_NODE_UP:
+ case CLUSTER_NODE_SYNC:
g_nxccEventHandler->onNodeJoin(node->m_id);
break;
+ case CLUSTER_NODE_UP:
+ g_nxccEventHandler->onNodeUp(node->m_id);
+ break;
}
}
}
/**
+ * Send message to cluster node
+ *
+ * Serializes the message and, if the node currently has a valid socket, sends
+ * it while holding the node's mutex. On send failure the socket is shut down
+ * and marked invalid, and the node is switched to CLUSTER_NODE_DOWN state.
+ * If the node has no valid socket the message is dropped without any report.
+ *
+ * @param node destination node
+ * @param msg message to send; it is not deleted here, ownership stays with the caller
+ */
+void ClusterSendMessage(ClusterNodeInfo *node, NXCPMessage *msg)
+{
+   NXCP_MESSAGE *rawMsg = msg->createMessage();
+   MutexLock(node->m_mutex);
+   if (node->m_socket != INVALID_SOCKET)
+   {
+      if (SendEx(node->m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) <= 0)
+      {
+         // Log under this function's own name (the text used to say ClusterSendResponse)
+         ClusterDebug(5, _T("ClusterSendMessage: send failed for peer %d [%s]"), node->m_id, (const TCHAR *)node->m_addr->toString());
+         shutdown(node->m_socket, SHUT_RDWR);
+         node->m_socket = INVALID_SOCKET; // current socket will be closed by receiver
+         ChangeClusterNodeState(node, CLUSTER_NODE_DOWN);
+      }
+   }
+   MutexUnlock(node->m_mutex);
+   free(rawMsg);
+}
+
+/**
* Send notification to all connected nodes
*/
void LIBNXCC_EXPORTABLE ClusterNotify(NXCPMessage *msg)
msg.setCode(CMD_REQUEST_COMPLETED);
msg.setId(requestId);
- NXCP_MESSAGE *rawMsg = msg.createMessage();
- MutexLock(node->m_mutex);
- if (node->m_socket != INVALID_SOCKET)
- {
- if (SendEx(node->m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) <= 0)
+ ClusterSendMessage(node, &msg);
+}
+
+/**
+ * Check if all cluster nodes connected
+ *
+ * Every entry of g_nxccNodes with non-zero ID is counted as a known node;
+ * a node is counted as connected when its state is CLUSTER_NODE_CONNECTED
+ * or any higher state value. Returns true when both counts are equal, which
+ * is also the case when there are no known nodes at all.
+ */
+bool LIBNXCC_EXPORTABLE ClusterAllNodesConnected()
+{
+ int total = 0, connected = 0;
+ for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
+ if (g_nxccNodes[i].m_id > 0)
{
- ClusterDebug(5, _T("ClusterSendResponse: send failed for peer %d [%s]"), nodeId, (const TCHAR *)node->m_addr->toString());
- shutdown(node->m_socket, SHUT_RDWR);
- node->m_socket = INVALID_SOCKET; // current socket will be closed by receiver
- ChangeClusterNodeState(node, CLUSTER_NODE_DOWN);
+ total++;
+ if (g_nxccNodes[i].m_state >= CLUSTER_NODE_CONNECTED)
+ connected++;
}
- }
- MutexUnlock(node->m_mutex);
- free(rawMsg);
+ return total == connected;
}
/**
else
{
// no other nodes, declare self as master
- g_nxccMasterNode = true;
ClusterDebug(1, _T("ClusterJoin: cannot contact other nodes, declaring self as master"));
+ PromoteClusterNode();
}
return true;
msg.setField(VID_IS_MASTER, (INT16)(g_nxccMasterNode ? 1 : 0));
NXCPMessage *response = ClusterSendDirectCommandEx(node->m_id, &msg);
+ if (response != NULL)
+ {
+ ClusterJoinResponse r = (ClusterJoinResponse)response->getFieldAsInt16(VID_RCC);
+ ClusterDebug(4, _T("ClusterNodeJoin: join response from node %d [%s]: %d"), node->m_id, (const TCHAR *)node->m_addr->toString(), r);
+ switch(r)
+ {
+ case CJR_ACCEPTED_AS_SECONDARY:
+ ChangeClusterNodeState(node, CLUSTER_NODE_UP);
+ node->m_master = response->getFieldAsBoolean(VID_IS_MASTER);
+ break;
+ case CJR_ACCEPTED_AS_MASTER:
+ ChangeClusterNodeState(node, CLUSTER_NODE_SYNC);
+ break;
+ }
+ delete response;
+ }
}
/**
* Process join request received from other node
+ *
+ * Builds a CMD_REQUEST_COMPLETED response with the request's ID and this
+ * node's own master flag (VID_IS_MASTER), then selects the response code
+ * (VID_RCC) from the local and remote master flags:
+ *  - local master, remote not master: CJR_ACCEPTED_AS_SECONDARY, remote node goes to SYNC state
+ *  - neither is master: CJR_WAIT_FOR_MASTER, node state is not changed
+ *  - remote master, local not master: CJR_ACCEPTED_AS_MASTER, remote node goes to UP state
+ *  - both are master: CJR_SPLIT_BRAIN, node state is not changed
+ * The response is sent before the promotion check at the end of the function.
+ *
+ * @param node node which sent the request
+ * @param request join request message (not deleted here)
*/
-void ProcessClusterJoinRequest(ClusterNodeInfo *node, NXCPMessage *msg)
+void ProcessClusterJoinRequest(ClusterNodeInfo *node, NXCPMessage *request)
{
ClusterDebug(4, _T("ProcessClusterJoinRequest: request from node %d [%s]"), node->m_id, (const TCHAR *)node->m_addr->toString());
+ NXCPMessage response;
+ response.setCode(CMD_REQUEST_COMPLETED);
+ response.setId(request->getId());
+
+ response.setField(VID_IS_MASTER, (INT16)(g_nxccMasterNode ? 1 : 0));
+
+ bool remoteMaster = request->getFieldAsBoolean(VID_IS_MASTER);
+ if (g_nxccMasterNode && !remoteMaster)
+ {
+ response.setField(VID_RCC, (INT16)CJR_ACCEPTED_AS_SECONDARY);
+ ClusterDebug(4, _T("ProcessClusterJoinRequest: node %d [%s] accepted into running cluster"), node->m_id, (const TCHAR *)node->m_addr->toString());
+ ChangeClusterNodeState(node, CLUSTER_NODE_SYNC);
+ }
+ else if (!g_nxccMasterNode && !remoteMaster)
+ {
+ response.setField(VID_RCC, (INT16)CJR_WAIT_FOR_MASTER);
+ ClusterDebug(4, _T("ProcessClusterJoinRequest: waiting for master"));
+ }
+ else if (!g_nxccMasterNode && remoteMaster)
+ {
+ response.setField(VID_RCC, (INT16)CJR_ACCEPTED_AS_MASTER);
+ ClusterDebug(4, _T("ProcessClusterJoinRequest: joined running cluster with node %d [%s] as master"), node->m_id, (const TCHAR *)node->m_addr->toString());
+ ChangeClusterNodeState(node, CLUSTER_NODE_UP);
+ }
+ else
+ {
+ response.setField(VID_RCC, (INT16)CJR_SPLIT_BRAIN);
+ ClusterDebug(4, _T("ProcessClusterJoinRequest: split-brain condition detected"));
+ }
+
+ ClusterSendMessage(node, &response);
+
+ // Promote this node to master if all cluster nodes are already connected
+ // and there is no master yet (new cluster)
+ if (!g_nxccMasterNode && !remoteMaster && ClusterAllNodesConnected())
+ {
+ PromoteClusterNode();
+ }
+}
+
+/**
+ * Promote this node to master if possible
+ *
+ * Promotion is refused when any known node with an ID lower than this node's
+ * ID is in CLUSTER_NODE_CONNECTED state or higher - such a node is treated as
+ * having higher priority. Otherwise the local master flag is set and a
+ * CMD_CLUSTER_NOTIFY message carrying this node's ID and the CN_NEW_MASTER
+ * code is passed to ClusterNotify().
+ */
+void PromoteClusterNode()
+{
+   for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
+   {
+      if ((g_nxccNodes[i].m_id > 0) && (g_nxccNodes[i].m_id < g_nxccNodeId) && (g_nxccNodes[i].m_state >= CLUSTER_NODE_CONNECTED))
+      {
+         ClusterDebug(4, _T("PromoteClusterNode: found connected node with higher priority"));
+         // Must leave the function here: a plain break only exits the loop and
+         // this node would still declare itself master
+         return;
+      }
+   }
+
+   ClusterDebug(4, _T("PromoteClusterNode: promote this node to master"));
+   g_nxccMasterNode = true;
+
+   NXCPMessage msg;
+   msg.setCode(CMD_CLUSTER_NOTIFY);
+   msg.setField(VID_NODE_ID, g_nxccNodeId);
+   msg.setField(VID_NOTIFICATION_CODE, (INT16)CN_NEW_MASTER);
+   ClusterNotify(&msg);
}
};
/**
+ * Cluster join responses
+ *
+ * Value placed into VID_RCC field of the response to CMD_JOIN_CLUSTER:
+ *  - CJR_ACCEPTED_AS_SECONDARY: responder is master, requester is not
+ *  - CJR_ACCEPTED_AS_MASTER: requester is master, responder is not
+ *  - CJR_WAIT_FOR_MASTER: neither side is master
+ *  - CJR_SPLIT_BRAIN: both sides claim to be master
+ */
+enum ClusterJoinResponse
+{
+ CJR_ACCEPTED_AS_SECONDARY = 0,
+ CJR_ACCEPTED_AS_MASTER = 1,
+ CJR_WAIT_FOR_MASTER = 2,
+ CJR_SPLIT_BRAIN = 3
+};
+
+/**
+ * Cluster notification codes
+ *
+ * Carried in VID_NOTIFICATION_CODE field of CMD_CLUSTER_NOTIFY message.
+ * CN_NEW_MASTER is sent by a node after it promotes itself to master.
+ */
+enum ClusterNotificationCode
+{
+ CN_NEW_MASTER = 1
+};
+
+/**
* Internal functions
*/
void ClusterDebug(int level, const TCHAR *format, ...);
void ClusterDisconnect();
+void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state);
+void ClusterSendMessage(ClusterNodeInfo *node, NXCPMessage *msg);
+
+void PromoteClusterNode();
/**
* Global cluster node settings