changes in cluster join code
authorVictor Kirhenshtein <victor@netxms.org>
Mon, 26 Oct 2015 12:04:43 +0000 (14:04 +0200)
committerVictor Kirhenshtein <victor@netxms.org>
Mon, 26 Oct 2015 12:04:43 +0000 (14:04 +0200)
include/nms_cscp.h
include/nxcc.h
src/libnetxms/nxcp.cpp
src/libnxcc/Makefile.am
src/libnxcc/ceh.cpp
src/libnxcc/comm.cpp
src/libnxcc/join.cpp [copied from src/libnxcc/ceh.cpp with 52% similarity]
src/libnxcc/libnxcc.h
src/libnxcc/main.cpp
tests/test-libnxcc/test-libnxcc.cpp

index 4bf2b6d..837cf1f 100644 (file)
@@ -557,6 +557,7 @@ typedef struct
 #define CMD_REMOVE_SCHEDULE            0x0144
 #define CMD_ENTER_MAINT_MODE           0x0145
 #define CMD_LEAVE_MAINT_MODE           0x0146
+#define CMD_JOIN_CLUSTER               0x0147
 
 #define CMD_RS_LIST_REPORTS            0x1100
 #define CMD_RS_GET_REPORT              0x1101
@@ -1102,6 +1103,7 @@ typedef struct
 #define VID_DASHBOARDS              ((UINT32)528)
 #define VID_OWNER                   ((UINT32)529)
 #define VID_MAINTENANCE_MODE        ((UINT32)530)
+#define VID_IS_MASTER               ((UINT32)531)
 
 // Base variabe for single threshold in message
 #define VID_THRESHOLD_BASE          ((UINT32)0x00800000)
index 6624e62..2134616 100644 (file)
 #include <nxconfig.h>
 
 /**
+ * Cluster error codes
+ */
+#define NXCC_RCC_SUCCESS         0
+#define NXCC_RCC_INVALID_NODE    1
+#define NXCC_RCC_TIMEOUT         2
+#define NXCC_RCC_COMM_FAILURE    3
+
+/**
  * Cluster node states
  */
 enum ClusterNodeState
@@ -40,7 +48,7 @@ public:
    virtual void onNodeDisconnect(UINT32 nodeId);
    virtual void onShutdown();
    
-   virtual void onMessage(NXCPMessage *msg, UINT32 sourceNodeId);
+   virtual bool onMessage(NXCPMessage *msg, UINT32 sourceNodeId);
 };
 
 /**
@@ -54,4 +62,10 @@ void LIBNXCC_EXPORTABLE ClusterSetDebugCallback(void (*cb)(int, const TCHAR *, v
 
 bool LIBNXCC_EXPORTABLE ClusterIsMasterNode();
 
+void LIBNXCC_EXPORTABLE ClusterNotify(NXCPMessage *msg);
+int LIBNXCC_EXPORTABLE ClusterSendCommand(NXCPMessage *msg);
+UINT32 LIBNXCC_EXPORTABLE ClusterSendDirectCommand(UINT32 nodeId, NXCPMessage *msg);
+NXCPMessage LIBNXCC_EXPORTABLE *ClusterSendDirectCommandEx(UINT32 nodeId, NXCPMessage *msg);
+void LIBNXCC_EXPORTABLE ClusterSendResponse(UINT32 nodeId, UINT32 requestId, UINT32 rcc);
+
 #endif
index fa257c1..610d2d1 100644 (file)
@@ -361,10 +361,11 @@ TCHAR LIBNETXMS_EXPORTABLE *NXCPMessageCodeName(WORD code, TCHAR *pszBuffer)
       _T("CMD_UPDATE_SCHEDULE"),
       _T("CMD_REMOVE_SCHEDULE"),
       _T("CMD_ENTER_MAINT_MODE"),
-      _T("CMD_LEAVE_MAINT_MODE")
+      _T("CMD_LEAVE_MAINT_MODE"),
+      _T("CMD_JOIN_CLUSTER")
    };
 
-   if ((code >= CMD_LOGIN) && (code <= CMD_LEAVE_MAINT_MODE))
+   if ((code >= CMD_LOGIN) && (code <= CMD_JOIN_CLUSTER))
       _tcscpy(pszBuffer, pszMsgNames[code - CMD_LOGIN]);
    else
       _sntprintf(pszBuffer, 64, _T("CMD_0x%04X"), code);
index 08e156b..add3eb1 100644 (file)
@@ -1,4 +1,4 @@
-SOURCES = ceh.cpp comm.cpp main.cpp
+SOURCES = ceh.cpp comm.cpp join.cpp main.cpp
 
 lib_LTLIBRARIES = libnxcc.la
 
index 832147d..7991c47 100644 (file)
@@ -59,7 +59,9 @@ void ClusterEventHandler::onShutdown()
 
 /**
  * Incoming message handler
+ * Should return true if message was processed
  */
-void ClusterEventHandler::onMessage(NXCPMessage *msg, UINT32 sourceNodeId)
+bool ClusterEventHandler::onMessage(NXCPMessage *msg, UINT32 sourceNodeId)
 {
+   return false;
 }
index 4eb789d..f62d783 100644 (file)
 #include "libnxcc.h"
 
 /**
+ * Externals
+ */
+void ClusterNodeJoin(void *arg);
+void ProcessClusterJoinRequest(ClusterNodeInfo *node, NXCPMessage *msg);
+
+/**
  * Keepalive interval
  */
 #define KEEPALIVE_INTERVAL    200
@@ -35,6 +41,11 @@ static THREAD s_connectorThread = INVALID_THREAD_HANDLE;
 static THREAD s_keepaliveThread = INVALID_THREAD_HANDLE;
 
 /**
+ * Command ID
+ */
+static VolatileCounter s_commandId = 1;
+
+/**
  * Join condition
  */
 static CONDITION s_joinCondition = ConditionCreate(TRUE);
@@ -68,8 +79,29 @@ static THREAD_RESULT THREAD_CALL ClusterReceiverThread(void *arg)
       NXCPMessage *msg = receiver.readMessage(KEEPALIVE_INTERVAL * 3, &result);
       if (msg != NULL)
       {
-         g_nxccEventHandler->onMessage(msg, node->m_id);
-         delete msg;
+         if (msg->getCode() != CMD_KEEPALIVE)
+         {
+            TCHAR buffer[128];
+            ClusterDebug(7, _T("ClusterReceiverThread: message %s from node %d [%s]"),
+               NXCPMessageCodeName(msg->getCode(), buffer), node->m_id, (const TCHAR *)node->m_addr->toString());
+         }
+
+         switch(msg->getCode())
+         {
+            case CMD_KEEPALIVE:
+               delete msg;
+               break;
+            case CMD_JOIN_CLUSTER:
+               ProcessClusterJoinRequest(node, msg);
+               delete msg;
+               break;
+            default:
+               if (g_nxccEventHandler->onMessage(msg, node->m_id))
+                  delete msg;
+               else
+                  node->m_msgWaitQueue->put(msg);
+               break;
+         }
       }
       else
       {
@@ -103,6 +135,17 @@ static int FindClusterNode(const InetAddress& addr)
 }
 
 /**
+ * Find cluster node by ID
+ */
+static int FindClusterNode(UINT32 id)
+{
+   for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
+      if (g_nxccNodes[i].m_id == id)
+         return i;
+   return -1;
+}
+
+/**
  * Change cluster node state
  */
 static void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state)
@@ -118,6 +161,8 @@ static void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state
    {
       case CLUSTER_NODE_CONNECTED:
          node->m_receiverThread = ThreadCreateEx(ClusterReceiverThread, 0, node);
+         if (node->m_id < g_nxccNodeId)
+            ThreadPoolExecute(g_nxccThreadPool, ClusterNodeJoin, node);
          break;
       case CLUSTER_NODE_DOWN:
          ThreadJoin(node->m_receiverThread);
@@ -285,6 +330,213 @@ static THREAD_RESULT THREAD_CALL ClusterKeepaliveThread(void *arg)
 }
 
 /**
+ * Send notification to all connected nodes
+ */
+void LIBNXCC_EXPORTABLE ClusterNotify(NXCPMessage *msg)
+{
+   NXCP_MESSAGE *rawMsg = msg->createMessage();
+
+   for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
+   {
+      if (g_nxccNodes[i].m_id == 0)
+         continue;   // empty slot
+
+      MutexLock(g_nxccNodes[i].m_mutex);
+      if (g_nxccNodes[i].m_socket != INVALID_SOCKET)
+      {
+         if (SendEx(g_nxccNodes[i].m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) <= 0)
+         {
+            ClusterDebug(5, _T("ClusterNotify: send failed for peer %d [%s]"),
+               g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
+            shutdown(g_nxccNodes[i].m_socket, SHUT_RDWR);
+            g_nxccNodes[i].m_socket = INVALID_SOCKET; // current socket will be closed by receiver
+            ChangeClusterNodeState(&g_nxccNodes[i], CLUSTER_NODE_DOWN);
+         }
+      }
+      MutexUnlock(g_nxccNodes[i].m_mutex);
+   }
+
+   free(rawMsg);
+}
+
+/**
+ * Send command to all connected nodes and wait for response
+ *
+ * @return number of execution errors
+ */
+int LIBNXCC_EXPORTABLE ClusterSendCommand(NXCPMessage *msg)
+{
+   UINT32 requestId = (UINT32)InterlockedIncrement(&s_commandId);
+   msg->setId(requestId);
+   NXCP_MESSAGE *rawMsg = msg->createMessage();
+
+   bool waitFlags[CLUSTER_MAX_NODE_ID];
+   memset(waitFlags, 0, sizeof(waitFlags));
+
+   int errors = 0;
+   for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
+   {
+      if (g_nxccNodes[i].m_id == 0)
+         continue;   // empty slot
+
+      MutexLock(g_nxccNodes[i].m_mutex);
+      if (g_nxccNodes[i].m_socket != INVALID_SOCKET)
+      {
+         if (SendEx(g_nxccNodes[i].m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) > 0)
+         {
+            waitFlags[i] = true;
+         }
+         else
+         {
+            ClusterDebug(5, _T("ClusterCommand: send failed for peer %d [%s]"),
+               g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
+            shutdown(g_nxccNodes[i].m_socket, SHUT_RDWR);
+            g_nxccNodes[i].m_socket = INVALID_SOCKET; // current socket will be closed by receiver
+            ChangeClusterNodeState(&g_nxccNodes[i], CLUSTER_NODE_DOWN);
+            errors++;
+         }
+      }
+      MutexUnlock(g_nxccNodes[i].m_mutex);
+   }
+
+   free(rawMsg);
+
+   // Collect responses
+   for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
+   {
+      if (!waitFlags[i])
+         continue;
+      NXCPMessage *response = g_nxccNodes[i].m_msgWaitQueue->waitForMessage(CMD_REQUEST_COMPLETED, requestId, g_nxccCommandTimeout);
+      if (response != NULL)
+      {
+         UINT32 rcc = response->getFieldAsInt32(VID_RCC);
+         if (rcc != 0)
+         {
+            ClusterDebug(5, _T("ClusterCommand: failed request to peer %d [%s] RCC=%d"),
+               g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString(), rcc);
+            errors++;
+         }
+         delete response;
+      }
+      else
+      {
+         ClusterDebug(5, _T("ClusterCommand: timed out request to peer %d [%s]"),
+            g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
+         errors++;
+      }
+   }
+
+   return errors;
+}
+
+/**
+ * Send command to specific node and wait for response
+ *
+ * @return request completion code
+ */
+UINT32 LIBNXCC_EXPORTABLE ClusterSendDirectCommand(UINT32 nodeId, NXCPMessage *msg)
+{
+   NXCPMessage *response = ClusterSendDirectCommandEx(nodeId, msg);
+   if (response == NULL)
+      return NXCC_RCC_TIMEOUT;
+
+   UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
+   if (rcc != 0)
+   {
+      ClusterDebug(5, _T("ClusterDirectCommand: failed request to peer %d: rcc=%d"), nodeId, rcc);
+   }
+   delete response;
+   return rcc;
+}
+
+/**
+ * Send command to specific node and wait for response
+ *
+ * @return request completion code
+ */
+NXCPMessage LIBNXCC_EXPORTABLE *ClusterSendDirectCommandEx(UINT32 nodeId, NXCPMessage *msg)
+{
+   int index = FindClusterNode(nodeId);
+   if (index == -1)
+   {
+      NXCPMessage *response = new NXCPMessage();
+      response->setField(VID_RCC, NXCC_RCC_INVALID_NODE);
+      return response;
+   }
+
+   ClusterNodeInfo *node = &g_nxccNodes[index];
+
+   UINT32 requestId = (UINT32)InterlockedIncrement(&s_commandId);
+   msg->setId(requestId);
+   NXCP_MESSAGE *rawMsg = msg->createMessage();
+
+   bool sent = false;
+   MutexLock(node->m_mutex);
+   if (node->m_socket != INVALID_SOCKET)
+   {
+      if (SendEx(node->m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) > 0)
+      {
+         sent = true;
+      }
+      else
+      {
+         ClusterDebug(5, _T("ClusterDirectCommand: send failed for peer %d [%s]"), nodeId, (const TCHAR *)node->m_addr->toString());
+         shutdown(node->m_socket, SHUT_RDWR);
+         node->m_socket = INVALID_SOCKET; // current socket will be closed by receiver
+         ChangeClusterNodeState(node, CLUSTER_NODE_DOWN);
+      }
+   }
+   MutexUnlock(node->m_mutex);
+
+   free(rawMsg);
+
+   // Wait for responses
+   NXCPMessage *response;
+   if (sent)
+   {
+      response = node->m_msgWaitQueue->waitForMessage(CMD_REQUEST_COMPLETED, requestId, g_nxccCommandTimeout);
+   }
+   else
+   {
+      response = new NXCPMessage();
+      response->setField(VID_RCC, NXCC_RCC_COMM_FAILURE);
+   }
+
+   return response;
+}
+
+/**
+ * Send response to cluster peer
+ */
+void LIBNXCC_EXPORTABLE ClusterSendResponse(UINT32 nodeId, UINT32 requestId, UINT32 rcc)
+{
+   int index = FindClusterNode(nodeId);
+   if (index == -1)
+      return;
+
+   ClusterNodeInfo *node = &g_nxccNodes[index];
+
+   NXCPMessage msg;
+   msg.setCode(CMD_REQUEST_COMPLETED);
+   msg.setId(requestId);
+
+   NXCP_MESSAGE *rawMsg = msg.createMessage();
+   MutexLock(node->m_mutex);
+   if (node->m_socket != INVALID_SOCKET)
+   {
+      if (SendEx(node->m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) <= 0)
+      {
+         ClusterDebug(5, _T("ClusterSendResponse: send failed for peer %d [%s]"), nodeId, (const TCHAR *)node->m_addr->toString());
+         shutdown(node->m_socket, SHUT_RDWR);
+         node->m_socket = INVALID_SOCKET; // current socket will be closed by receiver
+         ChangeClusterNodeState(node, CLUSTER_NODE_DOWN);
+      }
+   }
+   MutexUnlock(node->m_mutex);
+   free(rawMsg);
+}
+
+/**
  * Join cluster
  *
  * @return true on successful join
@@ -318,7 +570,7 @@ bool LIBNXCC_EXPORTABLE ClusterJoin()
 
    if (listen(s, SOMAXCONN) == 0)
    {
-      ClusterDebug(1, _T("ClusterJoin: listening on port %d"), (int)g_nxccListenPort);
+      ClusterDebug(1, _T("ClusterJoin: listening on port %d"), (int)ntohs(servAddr.sin_port));
    }
    else
    {
@@ -331,9 +583,10 @@ bool LIBNXCC_EXPORTABLE ClusterJoin()
    s_connectorThread = ThreadCreateEx(ClusterConnectorThread, 0, NULL);
    s_keepaliveThread = ThreadCreateEx(ClusterKeepaliveThread, 0, NULL);
 
+   ClusterDebug(1, _T("ClusterJoin: waiting for other nodes"));
    if (ConditionWait(s_joinCondition, 60000))  // wait 1 minute for other nodes to join
    {
-
+      ClusterDebug(1, _T("ClusterJoin: successfully joined"));
    }
    else
    {
@@ -354,33 +607,3 @@ void ClusterDisconnect()
    ThreadJoin(s_connectorThread);
    ThreadJoin(s_keepaliveThread);
 }
-
-/**
- * Send notification to all connected nodes
- */
-void LIBNXCC_EXPORTABLE ClusterNotify(NXCPMessage *msg)
-{
-   NXCP_MESSAGE *rawMsg = msg->createMessage();
-
-   for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
-   {
-      if (g_nxccNodes[i].m_id == 0)
-         continue;   // empty slot
-
-      MutexLock(g_nxccNodes[i].m_mutex);
-      if (g_nxccNodes[i].m_socket != INVALID_SOCKET)
-      {
-         if (SendEx(g_nxccNodes[i].m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) <= 0)
-         {
-            ClusterDebug(5, _T("ClusterKeepaliveThread: send failed for peer %d [%s]"),
-               g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
-            shutdown(g_nxccNodes[i].m_socket, SHUT_RDWR);
-            g_nxccNodes[i].m_socket = INVALID_SOCKET; // current socket will be closed by receiver
-            ChangeClusterNodeState(&g_nxccNodes[i], CLUSTER_NODE_DOWN);
-         }
-      }
-      MutexUnlock(g_nxccNodes[i].m_mutex);
-   }
-
-   free(rawMsg);
-}
similarity index 52%
copy from src/libnxcc/ceh.cpp
copy to src/libnxcc/join.cpp
index 832147d..5c1bc3c 100644 (file)
@@ -1,4 +1,4 @@
-/* 
+/*
 ** NetXMS - Network Management System
 ** Copyright (C) 2003-2015 Victor Kirhenshtein
 **
 ** along with this program; if not, write to the Free Software
 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 **
-** File: ceh.cpp
+** File: join.cpp
 **
-*/
+**/
 
 #include "libnxcc.h"
 
 /**
- * Constructor
+ * Process joining of single cluster node
  */
-ClusterEventHandler::ClusterEventHandler()
+void ClusterNodeJoin(void *arg)
 {
-}
+   ClusterNodeInfo *node = (ClusterNodeInfo *)arg;
+   ClusterDebug(4, _T("ClusterNodeJoin: requesting join from from node %d [%s]"), node->m_id, (const TCHAR *)node->m_addr->toString());
 
-/**
- * Destructor
- */
-ClusterEventHandler::~ClusterEventHandler()
-{
-}
+   NXCPMessage msg;
+   msg.setCode(CMD_JOIN_CLUSTER);
+   msg.setField(VID_NODE_ID, g_nxccNodeId);
+   msg.setField(VID_IS_MASTER, (INT16)(g_nxccMasterNode ? 1 : 0));
 
-/**
- * Node join handler
- */
-void ClusterEventHandler::onNodeJoin(UINT32 nodeId)
-{
+   NXCPMessage *response = ClusterSendDirectCommandEx(node->m_id, &msg);
 }
 
 /**
- * Node disconnect handler
+ * Process join request received from other node
  */
-void ClusterEventHandler::onNodeDisconnect(UINT32 nodeId)
+void ProcessClusterJoinRequest(ClusterNodeInfo *node, NXCPMessage *msg)
 {
-}
+   ClusterDebug(4, _T("ProcessClusterJoinRequest: request from node %d [%s]"), node->m_id, (const TCHAR *)node->m_addr->toString());
 
-/**
- * Shutdown handler
- */
-void ClusterEventHandler::onShutdown()
-{
-}
-
-/**
- * Incoming message handler
- */
-void ClusterEventHandler::onMessage(NXCPMessage *msg, UINT32 sourceNodeId)
-{
 }
index 62e64ae..9b27f16 100644 (file)
@@ -49,6 +49,7 @@ struct ClusterNodeInfo
    bool m_master;
    MUTEX m_mutex;
    THREAD m_receiverThread;
+   MsgWaitQueue *m_msgWaitQueue;
 };
 
 /**
@@ -69,5 +70,7 @@ extern bool g_nxccInitialized;
 extern bool g_nxccMasterNode;
 extern bool g_nxccShutdown;
 extern UINT16 g_nxccListenPort;
+extern UINT32 g_nxccCommandTimeout;
+extern ThreadPool *g_nxccThreadPool;
 
 #endif
index 624cd5b..a6c50df 100644 (file)
@@ -32,6 +32,7 @@ bool g_nxccInitialized = false;
 bool g_nxccMasterNode = false;
 bool g_nxccShutdown = false;
 UINT16 g_nxccListenPort = 47000;
+UINT32 g_nxccCommandTimeout = 500;
 
 /**
  * Other cluster nodes
@@ -39,6 +40,11 @@ UINT16 g_nxccListenPort = 47000;
 ClusterNodeInfo g_nxccNodes[CLUSTER_MAX_NODE_ID];
 
 /**
+ * Cluster thread pool
+ */
+ThreadPool *g_nxccThreadPool;
+
+/**
  * Debug callback
  */
 static void (*s_debugCallback)(int, const TCHAR *, va_list) = NULL;
@@ -90,6 +96,7 @@ static bool AddPeerNode(TCHAR *cfg)
    g_nxccNodes[id].m_addr = new InetAddress(InetAddress::resolveHostName(s));
    g_nxccNodes[id].m_port = (UINT16)(47000 + id);
    g_nxccNodes[id].m_socket = INVALID_SOCKET;
+   g_nxccNodes[id].m_msgWaitQueue = new MsgWaitQueue();
    ClusterDebug(1, _T("ClusterInit: added peer node %d"), id);
    return true;
 }
@@ -100,6 +107,7 @@ static bool AddPeerNode(TCHAR *cfg)
 static TCHAR *s_peerNodeList = NULL;
 static NX_CFG_TEMPLATE s_clusterConfigTemplate[] =
 {
+   { _T("CommandTimeout"), CT_LONG, 0, 0, 0, 0, &g_nxccCommandTimeout, NULL },
    { _T("NodeId"), CT_LONG, 0, 0, 0, 0, &g_nxccNodeId, NULL },
    { _T("PeerNode"), CT_STRING_LIST, '\n', 0, 0, 0, &s_peerNodeList, NULL },
    { _T(""), CT_END_OF_LIST, 0, 0, 0, 0, NULL, NULL }
@@ -116,6 +124,8 @@ bool LIBNXCC_EXPORTABLE ClusterInit(Config *config, const TCHAR *section, Cluste
    if ((g_nxccNodeId < 1) || (g_nxccNodeId > CLUSTER_MAX_NODE_ID))
       return false;
 
+   g_nxccThreadPool = ThreadPoolCreate(1, 16, _T("CLUSTER"));
+
    memset(g_nxccNodes, 0, sizeof(g_nxccNodes));
    for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
    {
@@ -159,6 +169,12 @@ void LIBNXCC_EXPORTABLE ClusterShutdown()
 
    g_nxccShutdown = true;
    ClusterDisconnect();
+
+   for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
+   {
+      MutexDestroy(g_nxccNodes[i].m_mutex);
+      delete g_nxccNodes[i].m_msgWaitQueue;
+   }
 }
 
 /**
index 2bdbbdb..c4ff3f4 100644 (file)
@@ -4,6 +4,7 @@
 #include <testtools.h>
 
 static MUTEX cbLock = MutexCreate();
+static UINT32 s_nodeId;
 
 static void DebugCallback(int level, const TCHAR *format, va_list args)
 {
@@ -34,12 +35,32 @@ public:
       _tprintf(_T("** cluster shutdown\n"));
    }
 
-   virtual void onMessage(NXCPMessage *msg, UINT32 sourceNodeId)
+   virtual bool onMessage(NXCPMessage *msg, UINT32 sourceNodeId)
    {
-
+      if (msg->getCode() == 111)
+      {
+         ClusterSendResponse(sourceNodeId, msg->getId(), NXCC_RCC_SUCCESS);
+      }
+      return false;
    }
 };
 
+static void TestCommand()
+{
+   NXCPMessage msg;
+   msg.setCode(111);
+   int e = ClusterSendCommand(&msg);
+   _tprintf(_T("TestCommand: %d errors\n"), e);
+}
+
+static void TestDirectCommand()
+{
+   NXCPMessage msg;
+   msg.setCode(111);
+   UINT32 rcc = ClusterSendDirectCommand(s_nodeId == 1 ? 2 : 1, &msg);
+   _tprintf(_T("TestDirectCommand: rcc=%d\n"), rcc);
+}
+
 /**
  * main()
  */
@@ -56,11 +77,11 @@ int main(int argc, char *argv[])
    WSAStartup(MAKEWORD(2, 2), &wsaData);
 #endif
 
-   UINT32 nodeId = strtoul(argv[1], NULL, 0);
+   s_nodeId = strtoul(argv[1], NULL, 0);
 
    Config *config = new Config();
-   config->setValue(_T("/CLUSTER/NodeId"), nodeId);
-   config->setValue(_T("/CLUSTER/PeerNode"), (nodeId == 1) ? _T("2:127.0.0.1") : _T("1:127.0.0.1"));
+   config->setValue(_T("/CLUSTER/NodeId"), s_nodeId);
+   config->setValue(_T("/CLUSTER/PeerNode"), (s_nodeId == 1) ? _T("2:127.0.0.1") : _T("1:127.0.0.1"));
 
    ClusterSetDebugCallback(DebugCallback);
    ClusterInit(config, _T("CLUSTER"), new EventHandler());
@@ -68,6 +89,11 @@ int main(int argc, char *argv[])
    ClusterJoin();
    _tprintf(_T("CLUSTER RUNNING\n"));
 
+   ThreadSleep(1);
+
+   TestCommand();
+   TestDirectCommand();
+
    ThreadSleep(60);
 
    ClusterShutdown();