added cluster communication library (unfinished, used by SH)
authorVictor Kirhenshtein <victor@netxms.org>
Thu, 3 Sep 2015 12:22:27 +0000 (15:22 +0300)
committerVictor Kirhenshtein <victor@netxms.org>
Thu, 3 Sep 2015 12:22:27 +0000 (15:22 +0300)
17 files changed:
configure.ac
include/Makefile.am
include/nxcc.h [new file with mode: 0644]
src/libnxcc/.project [new file with mode: 0644]
src/libnxcc/.settings/language.settings.xml [new file with mode: 0644]
src/libnxcc/Makefile.am [new file with mode: 0644]
src/libnxcc/ceh.cpp [new file with mode: 0644]
src/libnxcc/comm.cpp [new file with mode: 0644]
src/libnxcc/libnxcc.h [new file with mode: 0644]
src/libnxcc/main.cpp [new file with mode: 0644]
src/server/drivers/catalyst/catalyst.cpp
tests/Makefile.am
tests/test-libnxcc/.project [new file with mode: 0644]
tests/test-libnxcc/.settings/language.settings.xml [new file with mode: 0644]
tests/test-libnxcc/Makefile.am [copied from tests/Makefile.am with 64% similarity]
tests/test-libnxcc/test-libnxcc [new file with mode: 0755]
tests/test-libnxcc/test-libnxcc.cpp [new file with mode: 0644]

index fbbc925..f9230d1 100644 (file)
@@ -4,7 +4,7 @@
 # Configure script
 #
 
-AC_INIT([NetXMS], [2.0-RC1], [bugs@netxms.org])
+AC_INIT([NetXMS], [2.0-RC2], [bugs@netxms.org])
 AC_CONFIG_AUX_DIR(config)
 AC_CONFIG_HEADERS(config.h)
 AM_INIT_AUTOMAKE
@@ -515,7 +515,7 @@ AC_ARG_ENABLE(64bit,
 AC_ARG_WITH(dist,
 [AS_HELP_STRING(--with-dist,for maintainers only)],
        DB_DRIVERS=" mysql pgsql odbc mssql sqlite oracle db2 informix"
-       MODULES="jansson libexpat libstrophe libtre zlib libnetxms install sqlite snmp libnxsl libnxmb libnxlp db server agent libnxmap client nxscript nxcproxy nxlptest tools"
+       MODULES="jansson libexpat libstrophe libtre zlib libnetxms install sqlite snmp libnxsl libnxmb libnxlp libnxcc db server agent libnxmap client nxscript nxcproxy nxlptest tools"
        SUBAGENT_DIRS="linux ds18x20 freebsd openbsd netbsd sunos aix ipso hpux odbcquery informix oracle lmsensors darwin rpi java ubntlw netsvc db2 tuxedo mongodb"
        SMSDRV_DIRS="kannel websms"
    HDLINK_DIRS="jira"
@@ -716,7 +716,7 @@ if test $? = 0; then
                AC_MSG_ERROR(You must select at least one database driver when building server.)
        fi
        BUILD_SERVER="yes"
-       MODULES="$MODULES libnxmap libnxsl server nxscript"
+       MODULES="$MODULES libnxmap libnxsl libnxcc server nxscript"
        TOP_LEVEL_MODULES="$TOP_LEVEL_MODULES sql images"
        CONTRIB_MODULES="$CONTRIB_MODULES mibs backgrounds music"
        if test "x$XMPP_SUPPORT" = "xyes"; then
@@ -799,7 +799,7 @@ fi
 
 check_substr "$COMPONENTS" "sdk"
 if test $? = 0; then
-       MODULES="$MODULES libnxmap client libnxsl libnxmb libnxlp db nxscript"
+       MODULES="$MODULES libnxmap client libnxsl libnxmb libnxlp libnxcc db nxscript"
 fi
 
 
@@ -2942,6 +2942,7 @@ AC_CONFIG_FILES([
        src/libexpat/Makefile
        src/libexpat/libexpat/Makefile
        src/libnetxms/Makefile
+       src/libnxcc/Makefile
        src/libnxlp/Makefile
        src/libnxmap/Makefile
        src/libnxmb/Makefile
@@ -3018,6 +3019,7 @@ AC_CONFIG_FILES([
        tests/Makefile
        tests/include/Makefile
        tests/test-libnetxms/Makefile
+       tests/test-libnxcc/Makefile
        tests/test-libnxdb/Makefile
        tools/Makefile
 ])
index be69d50..8b99aac 100644 (file)
@@ -31,6 +31,7 @@ include_HEADERS = \
        nms_threads.h \
        nms_util.h \
        nxappc_internal.h \
+       nxcc.h \
        nxcldefs.h \
        nxclient.h \
        nxclobj.h \
diff --git a/include/nxcc.h b/include/nxcc.h
new file mode 100644 (file)
index 0000000..925ceb6
--- /dev/null
@@ -0,0 +1,57 @@
+#ifndef _nxcc_h_
+#define _nxcc_h_
+
+#ifdef _WIN32
+#ifdef LIBNXCC_EXPORTS
+#define LIBNXCC_EXPORTABLE __declspec(dllexport)
+#else
+#define LIBNXCC_EXPORTABLE __declspec(dllimport)
+#endif
+#else    /* _WIN32 */
+#define LIBNXCC_EXPORTABLE
+#endif
+
+#include <nms_common.h>
+#include <nms_util.h>
+#include <nms_threads.h>
+#include <nxcpapi.h>
+#include <nxconfig.h>
+
+/**
+ * Cluster node states
+ */
+enum ClusterNodeState
+{
+   CLUSTER_NODE_DOWN = 0,
+   CLUSTER_NODE_CONNECTED = 1,
+   CLUSTER_NODE_UP = 2
+};
+
+/**
+ * Cluster node event handler
+ */
+class ClusterEventHandler
+{
+public:
+   ClusterEventHandler();
+   virtual ~ClusterEventHandler();
+   
+   virtual void onNodeJoin(UINT32 nodeId);
+   virtual void onNodeDisconnect(UINT32 nodeId);
+   virtual void onShutdown();
+   
+   virtual void onMessage(NXCPMessage *msg, UINT32 sourceNodeId);
+};
+
+/**
+ * API functions
+ */
+bool LIBNXCC_EXPORTABLE ClusterInit(Config *config, const TCHAR *section, ClusterEventHandler *eventHandler);
+bool LIBNXCC_EXPORTABLE ClusterJoin();
+void LIBNXCC_EXPORTABLE ClusterShutdown();
+
+void LIBNXCC_EXPORTABLE ClusterSetDebugCallback(void (*cb)(int, const TCHAR *, va_list));
+
+bool LIBNXCC_EXPORTABLE ClusterIsMasterNode();
+
+#endif
diff --git a/src/libnxcc/.project b/src/libnxcc/.project
new file mode 100644 (file)
index 0000000..53a5f73
--- /dev/null
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+       <name>libnxcc</name>
+       <comment></comment>
+       <projects>
+       </projects>
+       <buildSpec>
+               <buildCommand>
+                       <name>org.eclipse.cdt.managedbuilder.core.genmakebuilder</name>
+                       <triggers>clean,full,incremental,</triggers>
+                       <arguments>
+                       </arguments>
+               </buildCommand>
+               <buildCommand>
+                       <name>org.eclipse.cdt.managedbuilder.core.ScannerConfigBuilder</name>
+                       <triggers>full,incremental,</triggers>
+                       <arguments>
+                       </arguments>
+               </buildCommand>
+       </buildSpec>
+       <natures>
+               <nature>org.eclipse.cdt.core.cnature</nature>
+               <nature>org.eclipse.cdt.core.ccnature</nature>
+               <nature>org.eclipse.cdt.managedbuilder.core.managedBuildNature</nature>
+               <nature>org.eclipse.cdt.managedbuilder.core.ScannerConfigNature</nature>
+       </natures>
+       <filteredResources>
+               <filter>
+                       <id>1441204968565</id>
+                       <name></name>
+                       <type>6</type>
+                       <matcher>
+                               <id>org.eclipse.ui.ide.multiFilter</id>
+                               <arguments>1.0-name-matches-false-false-*.la</arguments>
+                       </matcher>
+               </filter>
+               <filter>
+                       <id>1441204968566</id>
+                       <name></name>
+                       <type>6</type>
+                       <matcher>
+                               <id>org.eclipse.ui.ide.multiFilter</id>
+                               <arguments>1.0-name-matches-false-false-*.lo</arguments>
+                       </matcher>
+               </filter>
+               <filter>
+                       <id>1441204968568</id>
+                       <name></name>
+                       <type>6</type>
+                       <matcher>
+                               <id>org.eclipse.ui.ide.multiFilter</id>
+                               <arguments>1.0-name-matches-false-false-*.o</arguments>
+                       </matcher>
+               </filter>
+       </filteredResources>
+</projectDescription>
diff --git a/src/libnxcc/.settings/language.settings.xml b/src/libnxcc/.settings/language.settings.xml
new file mode 100644 (file)
index 0000000..7cda2be
--- /dev/null
@@ -0,0 +1,12 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<project>
+       <configuration id="cdt.managedbuild.toolchain.gnu.base.607754065" name="Default">
+               <extension point="org.eclipse.cdt.core.LanguageSettingsProvider">
+                       <provider class="org.eclipse.cdt.core.language.settings.providers.LanguageSettingsGenericProvider" id="org.eclipse.cdt.ui.UserLanguageSettingsProvider" name="CDT User Setting Entries" prefer-non-shared="true"/>
+                       <provider-reference id="org.eclipse.cdt.core.ReferencedProjectsLanguageSettingsProvider" ref="shared-provider"/>
+                       <provider copy-of="extension" id="org.eclipse.cdt.managedbuilder.core.GCCBuildCommandParser"/>
+                       <provider-reference id="org.eclipse.cdt.managedbuilder.core.GCCBuiltinSpecsDetector" ref="shared-provider"/>
+                       <provider-reference id="org.eclipse.cdt.managedbuilder.core.MBSLanguageSettingsProvider" ref="shared-provider"/>
+               </extension>
+       </configuration>
+</project>
diff --git a/src/libnxcc/Makefile.am b/src/libnxcc/Makefile.am
new file mode 100644 (file)
index 0000000..08e156b
--- /dev/null
@@ -0,0 +1,12 @@
+SOURCES = ceh.cpp comm.cpp main.cpp
+
+lib_LTLIBRARIES = libnxcc.la
+
+libnxcc_la_SOURCES = $(SOURCES)
+libnxcc_la_CPPFLAGS=-I@top_srcdir@/include
+libnxcc_la_LDFLAGS = -version-info $(NETXMS_LIBRARY_VERSION)
+libnxcc_la_LIBADD = @top_srcdir@/src/libnetxms/libnetxms.la
+
+EXTRA_DIST = \
+       libnxcc.vcproj \
+       libnxcc.h
diff --git a/src/libnxcc/ceh.cpp b/src/libnxcc/ceh.cpp
new file mode 100644 (file)
index 0000000..aeb23ae
--- /dev/null
@@ -0,0 +1,43 @@
+#include "libnxcc.h"
+
+/**
+ * Constructor
+ */
+ClusterEventHandler::ClusterEventHandler()
+{
+}
+
+/**
+ * Destructor
+ */
+ClusterEventHandler::~ClusterEventHandler()
+{
+}
+
+/**
+ * Node join handler
+ */
+void ClusterEventHandler::onNodeJoin(UINT32 nodeId)
+{
+}
+
+/**
+ * Node disconnect handler
+ */
+void ClusterEventHandler::onNodeDisconnect(UINT32 nodeId)
+{
+}
+
+/**
+ * Shutdown handler
+ */
+void ClusterEventHandler::onShutdown()
+{
+}
+
+/**
+ * Incoming message handler
+ */
+void ClusterEventHandler::onMessage(NXCPMessage *msg, UINT32 sourceNodeId)
+{
+}
diff --git a/src/libnxcc/comm.cpp b/src/libnxcc/comm.cpp
new file mode 100644 (file)
index 0000000..e753df9
--- /dev/null
@@ -0,0 +1,334 @@
+#include "libnxcc.h"
+
+/**
+ * Keepalive interval
+ */
+#define KEEPALIVE_INTERVAL    200
+
+/**
+ * Thread handles
+ */
+static THREAD s_listenerThread = INVALID_THREAD_HANDLE;
+static THREAD s_connectorThread = INVALID_THREAD_HANDLE;
+static THREAD s_keepaliveThread = INVALID_THREAD_HANDLE;
+
+/**
+ * Join condition
+ */
+static CONDITION s_joinCondition = ConditionCreate(TRUE);
+
+/**
+ * Change cluster node state
+ */
+static void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state);
+
+/**
+ * Node receiver thread
+ */
+static THREAD_RESULT THREAD_CALL ClusterReceiverThread(void *arg)
+{
+   ClusterNodeInfo *node = (ClusterNodeInfo *)arg;
+   SOCKET s = node->m_socket;
+   ClusterDebug(5, _T("Receiver thread started for cluster node %d [%s] on socket %d"), node->m_id, (const TCHAR *)node->m_addr->toString(), (int)s);
+
+   SocketMessageReceiver receiver(s, 8192, 4194304);
+
+   while(!g_nxccShutdown)
+   {
+      MutexLock(node->m_mutex);
+      SOCKET cs = node->m_socket;
+      MutexUnlock(node->m_mutex);
+
+      if (cs != s)
+         break;   // socket was changed
+
+      MessageReceiverResult result;
+      NXCPMessage *msg = receiver.readMessage(KEEPALIVE_INTERVAL * 3, &result);
+      if (msg != NULL)
+      {
+         g_nxccEventHandler->onMessage(msg, node->m_id);
+         delete msg;
+      }
+      else
+      {
+         ClusterDebug(5, _T("Receiver error for cluster node %d [%s] on socket %d: %s"),
+            node->m_id, (const TCHAR *)node->m_addr->toString(), (int)s, AbstractMessageReceiver::resultToText(result));
+         MutexLock(node->m_mutex);
+         if (node->m_socket == s)
+         {
+            shutdown(s, SHUT_RDWR);
+            node->m_socket = INVALID_SOCKET;
+            ChangeClusterNodeState(node, CLUSTER_NODE_DOWN);
+         }
+         MutexUnlock(node->m_mutex);
+      }
+   }
+
+   closesocket(s);
+   ClusterDebug(5, _T("Receiver thread stopped for cluster node %d [%s] on socket %d"), node->m_id, (const TCHAR *)node->m_addr->toString(), (int)s);
+   return THREAD_OK;
+}
+
+/**
+ * Find cluster node by IP
+ */
+static int FindClusterNode(const InetAddress& addr)
+{
+   for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
+      if ((g_nxccNodes[i].m_id != 0) && g_nxccNodes[i].m_addr->equals(addr))
+         return i;
+   return -1;
+}
+
+/**
+ * Change cluster node state
+ */
+static void ChangeClusterNodeState(ClusterNodeInfo *node, ClusterNodeState state)
+{
+   static const TCHAR *stateNames[] = { _T("DOWN"), _T("CONNECTED"), _T("UP") };
+
+   if (node->m_state == state)
+      return;
+
+   node->m_state = state;
+   ClusterDebug(1, _T("Cluster node %d [%s] changed state to %s"), node->m_id, (const TCHAR *)node->m_addr->toString(), stateNames[state]);
+   switch(state)
+   {
+      case CLUSTER_NODE_CONNECTED:
+         node->m_receiverThread = ThreadCreateEx(ClusterReceiverThread, 0, node);
+         break;
+      case CLUSTER_NODE_DOWN:
+         ThreadJoin(node->m_receiverThread);
+         node->m_receiverThread = INVALID_THREAD_HANDLE;
+         g_nxccEventHandler->onNodeDisconnect(node->m_id);
+         break;
+      case CLUSTER_NODE_UP:
+         g_nxccEventHandler->onNodeJoin(node->m_id);
+         break;
+   }
+}
+
+/**
+ * Listener thread
+ */
+static THREAD_RESULT THREAD_CALL ClusterListenerThread(void *arg)
+{
+   SOCKET s = CAST_FROM_POINTER(arg, SOCKET);
+   while(!g_nxccShutdown)
+   {
+      struct timeval tv;
+      tv.tv_sec = 1;
+      tv.tv_usec = 0;
+
+      fd_set rdfs;
+      FD_ZERO(&rdfs);
+      FD_SET(s, &rdfs);
+      int rc = select(SELECT_NFDS(s + 1), &rdfs, NULL, NULL, &tv);
+      if ((rc > 0) && !g_nxccShutdown)
+      {
+         char clientAddr[128];
+         socklen_t size = 128;
+         SOCKET in = accept(s, (struct sockaddr *)clientAddr, &size);
+         if (in == INVALID_SOCKET)
+         {
+            ClusterDebug(5, _T("ClusterListenerThread: accept() failure"));
+            continue;
+         }
+
+#ifndef _WIN32
+         fcntl(in, F_SETFD, fcntl(in, F_GETFD) | FD_CLOEXEC);
+#endif
+
+         InetAddress addr = InetAddress::createFromSockaddr((struct sockaddr *)clientAddr);
+         ClusterDebug(5, _T("Incoming connection from %s"), (const TCHAR *)addr.toString());
+
+         int idx = FindClusterNode(addr);
+         if (idx == -1)
+         {
+            ClusterDebug(5, _T("ClusterListenerThread: incoming connection rejected (unknown IP address)"));
+            closesocket(in);
+            continue;
+         }
+
+         MutexLock(g_nxccNodes[idx].m_mutex);
+         if (g_nxccNodes[idx].m_socket == INVALID_SOCKET)
+         {
+            g_nxccNodes[idx].m_socket = in;
+            ClusterDebug(5, _T("Cluster peer node %d [%s] connected"),
+               g_nxccNodes[idx].m_id, (const TCHAR *)g_nxccNodes[idx].m_addr->toString());
+            ChangeClusterNodeState(&g_nxccNodes[idx], CLUSTER_NODE_CONNECTED);
+         }
+         else
+         {
+            ClusterDebug(5, _T("Cluster connection from peer %d [%s] discarded because connection already present"),
+               g_nxccNodes[idx].m_id, (const TCHAR *)g_nxccNodes[idx].m_addr->toString());
+            closesocket(s);
+         }
+         MutexUnlock(g_nxccNodes[idx].m_mutex);
+      }
+   }
+
+   closesocket(s);
+   ClusterDebug(1, _T("Cluster listener thread stopped"));
+   return THREAD_OK;
+}
+
+/**
+ * Connector thread
+ */
+static THREAD_RESULT THREAD_CALL ClusterConnectorThread(void *arg)
+{
+   ClusterDebug(1, _T("Cluster connector thread started"));
+
+   while(!g_nxccShutdown)
+   {
+      ThreadSleepMs(500);
+      if (g_nxccShutdown)
+         break;
+
+      for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
+      {
+         MutexLock(g_nxccNodes[i].m_mutex);
+         if ((g_nxccNodes[i].m_id != 0) && (g_nxccNodes[i].m_socket == INVALID_SOCKET))
+         {
+            MutexUnlock(g_nxccNodes[i].m_mutex);
+            SOCKET s = ConnectToHost(*g_nxccNodes[i].m_addr, g_nxccNodes[i].m_port, 500);
+            MutexLock(g_nxccNodes[i].m_mutex);
+            if (s != INVALID_SOCKET)
+            {
+               if (g_nxccNodes[i].m_socket == INVALID_SOCKET)
+               {
+                  g_nxccNodes[i].m_socket = s;
+                  ClusterDebug(5, _T("Cluster peer node %d [%s] connected"),
+                     g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
+                  ChangeClusterNodeState(&g_nxccNodes[i], CLUSTER_NODE_CONNECTED);
+               }
+               else
+               {
+                  ClusterDebug(5, _T("Cluster connection established with peer %d [%s] but discarded because connection already present"),
+                     g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
+                  closesocket(s);
+               }
+            }
+         }
+         MutexUnlock(g_nxccNodes[i].m_mutex);
+      }
+   }
+
+   ClusterDebug(1, _T("Cluster connector thread stopped"));
+   return THREAD_OK;
+}
+
+/**
+ * Cluster keepalive thread
+ */
+static THREAD_RESULT THREAD_CALL ClusterKeepaliveThread(void *arg)
+{
+   ClusterDebug(1, _T("Cluster keepalive thread started"));
+
+   NXCPMessage msg;
+   msg.setCode(CMD_KEEPALIVE);
+   msg.setField(VID_NODE_ID, g_nxccNodeId);
+   NXCP_MESSAGE *rawMsg = msg.createMessage();
+
+   while(!g_nxccShutdown)
+   {
+      ThreadSleepMs(KEEPALIVE_INTERVAL);
+      if (g_nxccShutdown)
+         break;
+
+      for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
+      {
+         if (g_nxccNodes[i].m_id == 0)
+            continue;   // empty slot
+
+         MutexLock(g_nxccNodes[i].m_mutex);
+         if (g_nxccNodes[i].m_socket != INVALID_SOCKET)
+         {
+            if (SendEx(g_nxccNodes[i].m_socket, rawMsg, ntohl(rawMsg->size), 0, NULL) <= 0)
+            {
+               ClusterDebug(5, _T("ClusterKeepaliveThread: send failed for peer %d [%s]"),
+                  g_nxccNodes[i].m_id, (const TCHAR *)g_nxccNodes[i].m_addr->toString());
+               shutdown(g_nxccNodes[i].m_socket, SHUT_RDWR);
+               g_nxccNodes[i].m_socket = INVALID_SOCKET; // current socket will be closed by receiver
+               ChangeClusterNodeState(&g_nxccNodes[i], CLUSTER_NODE_DOWN);
+            }
+         }
+         MutexUnlock(g_nxccNodes[i].m_mutex);
+      }
+   }
+
+   ClusterDebug(1, _T("Cluster keepalive thread stopped"));
+   return THREAD_OK;
+}
+
+/**
+ * Join cluster
+ *
+ * @return true on successful join
+ */
+bool LIBNXCC_EXPORTABLE ClusterJoin()
+{
+   if (!g_nxccInitialized)
+      return false;
+
+   SOCKET s = socket(AF_INET, SOCK_STREAM, 0);
+   if (s == INVALID_SOCKET)
+   {
+      ClusterDebug(1, _T("ClusterJoin: cannot create socket"));
+      return false;
+   }
+
+   SetSocketExclusiveAddrUse(s);
+   SetSocketReuseFlag(s);
+
+   struct sockaddr_in servAddr;
+   memset(&servAddr, 0, sizeof(struct sockaddr_in));
+   servAddr.sin_family = AF_INET;
+   servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+   servAddr.sin_port = htons((UINT16)(47000 + g_nxccNodeId));
+   if (bind(s, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
+   {
+      ClusterDebug(1, _T("ClusterJoin: cannot bind listening socket (%s)"), _tcserror(WSAGetLastError()));
+      closesocket(s);
+      return false;
+   }
+
+   if (listen(s, SOMAXCONN) == 0)
+   {
+      ClusterDebug(1, _T("ClusterJoin: listening on port %d"), (int)g_nxccListenPort);
+   }
+   else
+   {
+      ClusterDebug(1, _T("ClusterJoin: listen() failed (%s)"), _tcserror(WSAGetLastError()));
+      closesocket(s);
+      return false;
+   }
+
+   s_listenerThread = ThreadCreateEx(ClusterListenerThread, 0, CAST_TO_POINTER(s, void *));
+   s_connectorThread = ThreadCreateEx(ClusterConnectorThread, 0, NULL);
+   s_keepaliveThread = ThreadCreateEx(ClusterKeepaliveThread, 0, NULL);
+
+   if (ConditionWait(s_joinCondition, 60000))  // wait 1 minute for other nodes to join
+   {
+
+   }
+   else
+   {
+      // no other nodes, declare self as master
+      g_nxccMasterNode = true;
+      ClusterDebug(1, _T("ClusterJoin: cannot contact other nodes, declaring self as master"));
+   }
+
+   return true;
+}
+
+/**
+ * Disconnect all sockets
+ */
+void ClusterDisconnect()
+{
+   ThreadJoin(s_listenerThread);
+   ThreadJoin(s_connectorThread);
+   ThreadJoin(s_keepaliveThread);
+}
diff --git a/src/libnxcc/libnxcc.h b/src/libnxcc/libnxcc.h
new file mode 100644 (file)
index 0000000..8bbc1ae
--- /dev/null
@@ -0,0 +1,51 @@
+#ifndef _libnxcc_h_
+#define _libnxcc_h_
+
+#include <nxcc.h>
+
+/**
+ * Max node ID
+ */
+#define CLUSTER_MAX_NODE_ID   32
+
+/**
+ * Max sockets per node
+ */
+#define CLUSTER_MAX_SOCKETS   16
+
+/**
+ * Node information
+ */
+struct ClusterNodeInfo
+{
+   UINT32 m_id;
+   InetAddress *m_addr;
+   UINT16 m_port;
+   SOCKET m_socket;
+   THREAD m_thread;
+   ClusterNodeState m_state;
+   bool m_master;
+   MUTEX m_mutex;
+   THREAD m_receiverThread;
+};
+
+/**
+ * Internal functions
+ */
+void ClusterDebug(int level, const TCHAR *format, ...);
+
+void ClusterDisconnect();
+
+/**
+ * Global cluster node settings
+ */
+extern UINT32 g_nxccNodeId;
+extern ClusterEventHandler *g_nxccEventHandler;
+extern ClusterNodeState g_nxccState;
+extern ClusterNodeInfo g_nxccNodes[CLUSTER_MAX_NODE_ID];
+extern bool g_nxccInitialized;
+extern bool g_nxccMasterNode;
+extern bool g_nxccShutdown;
+extern UINT16 g_nxccListenPort;
+
+#endif
diff --git a/src/libnxcc/main.cpp b/src/libnxcc/main.cpp
new file mode 100644 (file)
index 0000000..3fc08ac
--- /dev/null
@@ -0,0 +1,165 @@
+#include "libnxcc.h"
+
+/**
+ * Global cluster node settings
+ */
+UINT32 g_nxccNodeId = 0;
+ClusterEventHandler *g_nxccEventHandler = NULL;
+ClusterNodeState g_nxccState = CLUSTER_NODE_DOWN;
+bool g_nxccInitialized = false;
+bool g_nxccMasterNode = false;
+bool g_nxccShutdown = false;
+UINT16 g_nxccListenPort = 47000;
+
+/**
+ * Other cluster nodes
+ */
+ClusterNodeInfo g_nxccNodes[CLUSTER_MAX_NODE_ID];
+
+/**
+ * Debug callback
+ */
+static void (*s_debugCallback)(int, const TCHAR *, va_list) = NULL;
+
+/**
+ * Set debug callback
+ */
+void LIBNXCC_EXPORTABLE ClusterSetDebugCallback(void (*cb)(int, const TCHAR *, va_list))
+{
+   s_debugCallback = cb;
+}
+
+/**
+ * Debug output
+ */
+void ClusterDebug(int level, const TCHAR *format, ...)
+{
+   if (s_debugCallback == NULL)
+      return;
+
+   va_list args;
+   va_start(args, format);
+   s_debugCallback(level, format, args);
+   va_end(args);
+}
+
+/**
+ * Add cluster peer node from config
+ */
+static bool AddPeerNode(TCHAR *cfg)
+{
+   TCHAR *s = _tcschr(cfg, _T(':'));
+   if (s == NULL)
+   {
+      ClusterDebug(1, _T("ClusterInit: invalid peer node configuration record \"%s\""), cfg);
+      return false;
+   }
+
+   *s = 0;
+   s++;
+   UINT32 id = _tcstol(cfg, NULL, 0);
+   if ((id < 1) || (id > CLUSTER_MAX_NODE_ID) || (id == g_nxccNodeId))
+   {
+      ClusterDebug(1, _T("ClusterInit: invalid peer node ID %d"), id);
+      return false;
+   }
+
+   g_nxccNodes[id].m_id = id;
+   g_nxccNodes[id].m_addr = new InetAddress(InetAddress::resolveHostName(s));
+   g_nxccNodes[id].m_port = (UINT16)(47000 + id);
+   g_nxccNodes[id].m_socket = INVALID_SOCKET;
+   ClusterDebug(1, _T("ClusterInit: added peer node %d"), id);
+   return true;
+}
+
+/**
+ * Cluster configuration template
+ */
+static TCHAR *s_peerNodeList = NULL;
+static NX_CFG_TEMPLATE s_clusterConfigTemplate[] =
+{
+   { _T("NodeId"), CT_LONG, 0, 0, 0, 0, &g_nxccNodeId, NULL },
+   { _T("PeerNode"), CT_STRING_LIST, '\n', 0, 0, 0, &s_peerNodeList, NULL },
+   { _T(""), CT_END_OF_LIST, 0, 0, 0, 0, NULL, NULL }
+};
+
+/**
+ * Init cluster node
+ */
+bool LIBNXCC_EXPORTABLE ClusterInit(Config *config, const TCHAR *section, ClusterEventHandler *eventHandler)
+{
+   if (!config->parseTemplate(section, s_clusterConfigTemplate))
+      return false;
+
+   if ((g_nxccNodeId < 1) || (g_nxccNodeId > CLUSTER_MAX_NODE_ID))
+      return false;
+
+   memset(g_nxccNodes, 0, sizeof(g_nxccNodes));
+   for(int i = 0; i < CLUSTER_MAX_NODE_ID; i++)
+   {
+      g_nxccNodes[i].m_mutex = MutexCreate();
+      g_nxccNodes[i].m_socket = INVALID_SOCKET;
+      g_nxccNodes[i].m_receiverThread = INVALID_THREAD_HANDLE;
+   }
+
+   if (s_peerNodeList != NULL)
+   {
+      TCHAR *curr, *next;
+      for(curr = next = s_peerNodeList; next != NULL && (*curr != 0); curr = next + 1)
+      {
+         next = _tcschr(curr, _T('\n'));
+         if (next != NULL)
+            *next = 0;
+         StrStrip(curr);
+         if (!AddPeerNode(curr))
+         {
+            free(s_peerNodeList);
+            s_peerNodeList = NULL;
+            return false;
+         }
+      }
+      free(s_peerNodeList);
+      s_peerNodeList = NULL;
+   }
+
+   g_nxccEventHandler = eventHandler;
+   g_nxccInitialized = true;
+   return true;
+}
+
+/**
+ * Shutdown cluster
+ */
+void LIBNXCC_EXPORTABLE ClusterShutdown()
+{
+   if (!g_nxccInitialized || g_nxccShutdown)
+      return;
+
+   g_nxccShutdown = true;
+   ClusterDisconnect();
+}
+
+/**
+ * Check if this node is master node
+ */
+bool LIBNXCC_EXPORTABLE ClusterIsMasterNode()
+{
+   return g_nxccMasterNode;
+}
+
+#ifdef _WIN32
+
+/**
+ * DLL entry point
+ */
+BOOL WINAPI DllMain(HINSTANCE hInstance, DWORD dwReason, LPVOID lpReserved)
+{
+   if (dwReason == DLL_PROCESS_ATTACH)
+   {
+      DisableThreadLibraryCalls(hInstance);
+      SEHInit();
+   }
+   return TRUE;
+}
+
+#endif   /* _WIN32 */
index 1e8df8c..f8b9213 100644 (file)
@@ -120,7 +120,6 @@ InterfaceList *CatalystDriver::getInterfaces(SNMP_Transport *snmp, StringMap *at
  */
 DECLARE_NDD_ENTRY_POINT(s_driverName, CatalystDriver);
 
-
 /**
  * DLL entry point
  */
index dabdeba..0825584 100644 (file)
@@ -8,4 +8,4 @@
 # WITHOUT ANY WARRANTY, to the extent permitted by law; without even the
 # implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 
-SUBDIRS = include test-libnetxms test-libnxdb
+SUBDIRS = include test-libnetxms test-libnxdb test-libnxcc
diff --git a/tests/test-libnxcc/.project b/tests/test-libnxcc/.project
new file mode 100644 (file)
index 0000000..350df4e
--- /dev/null
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+       <name>test-libnxcc</name>
+       <comment></comment>
+       <projects>
+       </projects>
+       <buildSpec>
+               <buildCommand>
+                       <name>org.eclipse.cdt.managedbuilder.core.genmakebuilder</name>
+                       <triggers>clean,full,incremental,</triggers>
+                       <arguments>
+                       </arguments>
+               </buildCommand>
+               <buildCommand>
+                       <name>org.eclipse.cdt.managedbuilder.core.ScannerConfigBuilder</name>
+                       <triggers>full,incremental,</triggers>
+                       <arguments>
+                       </arguments>
+               </buildCommand>
+       </buildSpec>
+       <natures>
+               <nature>org.eclipse.cdt.core.cnature</nature>
+               <nature>org.eclipse.cdt.core.ccnature</nature>
+               <nature>org.eclipse.cdt.managedbuilder.core.managedBuildNature</nature>
+               <nature>org.eclipse.cdt.managedbuilder.core.ScannerConfigNature</nature>
+       </natures>
+       <filteredResources>
+               <filter>
+                       <id>1441271991719</id>
+                       <name></name>
+                       <type>6</type>
+                       <matcher>
+                               <id>org.eclipse.ui.ide.multiFilter</id>
+                               <arguments>1.0-name-matches-false-false-*.o</arguments>
+                       </matcher>
+               </filter>
+       </filteredResources>
+</projectDescription>
diff --git a/tests/test-libnxcc/.settings/language.settings.xml b/tests/test-libnxcc/.settings/language.settings.xml
new file mode 100644 (file)
index 0000000..e055f78
--- /dev/null
@@ -0,0 +1,12 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<project>
+       <configuration id="cdt.managedbuild.toolchain.gnu.base.895971687" name="Default">
+               <extension point="org.eclipse.cdt.core.LanguageSettingsProvider">
+                       <provider class="org.eclipse.cdt.core.language.settings.providers.LanguageSettingsGenericProvider" id="org.eclipse.cdt.ui.UserLanguageSettingsProvider" name="CDT User Setting Entries" prefer-non-shared="true"/>
+                       <provider-reference id="org.eclipse.cdt.core.ReferencedProjectsLanguageSettingsProvider" ref="shared-provider"/>
+                       <provider copy-of="extension" id="org.eclipse.cdt.managedbuilder.core.GCCBuildCommandParser"/>
+                       <provider-reference id="org.eclipse.cdt.managedbuilder.core.GCCBuiltinSpecsDetector" ref="shared-provider"/>
+                       <provider-reference id="org.eclipse.cdt.managedbuilder.core.MBSLanguageSettingsProvider" ref="shared-provider"/>
+               </extension>
+       </configuration>
+</project>
similarity index 64%
copy from tests/Makefile.am
copy to tests/test-libnxcc/Makefile.am
index dabdeba..1c65943 100644 (file)
@@ -8,4 +8,9 @@
 # WITHOUT ANY WARRANTY, to the extent permitted by law; without even the
 # implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 
-SUBDIRS = include test-libnetxms test-libnxdb
+bin_PROGRAMS = test-libnxcc
+test_libnxcc_SOURCES = test-libnxcc.cpp
+test_libnxcc_CPPFLAGS = -I@top_srcdir@/include -I../include
+test_libnxcc_LDADD = @top_srcdir@/src/libnxcc/libnxcc.la @top_srcdir@/src/libnetxms/libnetxms.la
+
+EXTRA_DIST = test-libnxcc.vcproj
diff --git a/tests/test-libnxcc/test-libnxcc b/tests/test-libnxcc/test-libnxcc
new file mode 100755 (executable)
index 0000000..3944edf
--- /dev/null
@@ -0,0 +1,228 @@
+#! /bin/bash
+
+# test-libnxcc - temporary wrapper script for .libs/test-libnxcc
+# Generated by libtool (GNU libtool) 2.4.2 Debian-2.4.2-1.7ubuntu1
+#
+# The test-libnxcc program cannot be directly executed until all the libtool
+# libraries that it depends on are installed.
+#
+# This wrapper script should never be moved out of the build directory.
+# If it is, it will not operate correctly.
+
+# Sed substitution that helps us do robust quoting.  It backslashifies
+# metacharacters that are still active within double-quoted strings.
+sed_quote_subst='s/\([`"$\\]\)/\\\1/g'
+
+# Be Bourne compatible
+if test -n "${ZSH_VERSION+set}" && (emulate sh) >/dev/null 2>&1; then
+  emulate sh
+  NULLCMD=:
+  # Zsh 3.x and 4.x performs word splitting on ${1+"$@"}, which
+  # is contrary to our usage.  Disable this feature.
+  alias -g '${1+"$@"}'='"$@"'
+  setopt NO_GLOB_SUBST
+else
+  case `(set -o) 2>/dev/null` in *posix*) set -o posix;; esac
+fi
+BIN_SH=xpg4; export BIN_SH # for Tru64
+DUALCASE=1; export DUALCASE # for MKS sh
+
+# The HP-UX ksh and POSIX shell print the target directory to stdout
+# if CDPATH is set.
+(unset CDPATH) >/dev/null 2>&1 && unset CDPATH
+
+relink_command="(cd /home/victor/Source/NetXMS/tests/test-libnxcc/; { test -z \"\${LIBRARY_PATH+set}\" || unset LIBRARY_PATH || { LIBRARY_PATH=; export LIBRARY_PATH; }; }; { test -z \"\${COMPILER_PATH+set}\" || unset COMPILER_PATH || { COMPILER_PATH=; export COMPILER_PATH; }; }; { test -z \"\${GCC_EXEC_PREFIX+set}\" || unset GCC_EXEC_PREFIX || { GCC_EXEC_PREFIX=; export GCC_EXEC_PREFIX; }; }; { test -z \"\${LD_RUN_PATH+set}\" || unset LD_RUN_PATH || { LD_RUN_PATH=; export LD_RUN_PATH; }; }; { test -z \"\${LD_LIBRARY_PATH+set}\" || unset LD_LIBRARY_PATH || { LD_LIBRARY_PATH=; export LD_LIBRARY_PATH; }; }; PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/instantclient_12_1; export PATH; g++ -g -O2 -fno-rtti -fno-exceptions -o \$progdir/\$file test_libnxcc-test-libnxcc.o  -L/usr/local/lib ../../src/libnxcc/.libs/libnxcc.so ../../src/libnetxms/.libs/libnetxms.so -lz -lresolv -ldl -lpthread -lcrypto -Wl,-rpath -Wl,/home/victor/Source/NetXMS/src/libnxcc/.libs -Wl,-rpath -Wl,/home/victor/Source/NetXMS/src/libnetxms/.libs -Wl,-rpath -Wl,/opt/stronghold/lib)"
+
+# This environment variable determines our operation mode.
+if test "$libtool_install_magic" = "%%%MAGIC variable%%%"; then
+  # install mode needs the following variables:
+  generated_by_libtool_version='2.4.2'
+  notinst_deplibs=' ../../src/libnxcc/libnxcc.la ../../src/libnetxms/libnetxms.la'
+else
+  # When we are sourced in execute mode, $file and $ECHO are already set.
+  if test "$libtool_execute_magic" != "%%%MAGIC variable%%%"; then
+    file="$0"
+
+# A function that is used when there is no print builtin or printf.
+func_fallback_echo ()
+{
+  eval 'cat <<_LTECHO_EOF
+$1
+_LTECHO_EOF'
+}
+    ECHO="printf %s\\n"
+  fi
+
+# Very basic option parsing. These options are (a) specific to
+# the libtool wrapper, (b) are identical between the wrapper
+# /script/ and the wrapper /executable/ which is used only on
+# windows platforms, and (c) all begin with the string --lt-
+# (application programs are unlikely to have options which match
+# this pattern).
+#
+# There are only two supported options: --lt-debug and
+# --lt-dump-script. There is, deliberately, no --lt-help.
+#
+# The first argument to this parsing function should be the
+# script's ../../libtool value, followed by no.
+lt_option_debug=
+func_parse_lt_options ()
+{
+  lt_script_arg0=$0
+  shift
+  for lt_opt
+  do
+    case "$lt_opt" in
+    --lt-debug) lt_option_debug=1 ;;
+    --lt-dump-script)
+        lt_dump_D=`$ECHO "X$lt_script_arg0" | /bin/sed -e 's/^X//' -e 's%/[^/]*$%%'`
+        test "X$lt_dump_D" = "X$lt_script_arg0" && lt_dump_D=.
+        lt_dump_F=`$ECHO "X$lt_script_arg0" | /bin/sed -e 's/^X//' -e 's%^.*/%%'`
+        cat "$lt_dump_D/$lt_dump_F"
+        exit 0
+      ;;
+    --lt-*)
+        $ECHO "Unrecognized --lt- option: '$lt_opt'" 1>&2
+        exit 1
+      ;;
+    esac
+  done
+
+  # Print the debug banner immediately:
+  if test -n "$lt_option_debug"; then
+    echo "test-libnxcc:test-libnxcc:${LINENO}: libtool wrapper (GNU libtool) 2.4.2 Debian-2.4.2-1.7ubuntu1" 1>&2
+  fi
+}
+
+# Used when --lt-debug. Prints its arguments to stdout
+# (redirection is the responsibility of the caller)
+func_lt_dump_args ()
+{
+  lt_dump_args_N=1;
+  for lt_arg
+  do
+    $ECHO "test-libnxcc:test-libnxcc:${LINENO}: newargv[$lt_dump_args_N]: $lt_arg"
+    lt_dump_args_N=`expr $lt_dump_args_N + 1`
+  done
+}
+
+# Core function for launching the target application
+func_exec_program_core ()
+{
+
+      if test -n "$lt_option_debug"; then
+        $ECHO "test-libnxcc:test-libnxcc:${LINENO}: newargv[0]: $progdir/$program" 1>&2
+        func_lt_dump_args ${1+"$@"} 1>&2
+      fi
+      exec "$progdir/$program" ${1+"$@"}
+
+      $ECHO "$0: cannot exec $program $*" 1>&2
+      exit 1
+}
+
+# A function to encapsulate launching the target application
+# Strips options in the --lt-* namespace from $@ and
+# launches target application with the remaining arguments.
+func_exec_program ()
+{
+  case " $* " in
+  *\ --lt-*)
+    for lt_wr_arg
+    do
+      case $lt_wr_arg in
+      --lt-*) ;;
+      *) set x "$@" "$lt_wr_arg"; shift;;
+      esac
+      shift
+    done ;;
+  esac
+  func_exec_program_core ${1+"$@"}
+}
+
+  # Parse options
+  func_parse_lt_options "$0" ${1+"$@"}
+
+  # Find the directory that this script lives in.
+  thisdir=`$ECHO "$file" | /bin/sed 's%/[^/]*$%%'`
+  test "x$thisdir" = "x$file" && thisdir=.
+
+  # Follow symbolic links until we get to the real thisdir.
+  file=`ls -ld "$file" | /bin/sed -n 's/.*-> //p'`
+  while test -n "$file"; do
+    destdir=`$ECHO "$file" | /bin/sed 's%/[^/]*$%%'`
+
+    # If there was a directory component, then change thisdir.
+    if test "x$destdir" != "x$file"; then
+      case "$destdir" in
+      [\\/]* | [A-Za-z]:[\\/]*) thisdir="$destdir" ;;
+      *) thisdir="$thisdir/$destdir" ;;
+      esac
+    fi
+
+    file=`$ECHO "$file" | /bin/sed 's%^.*/%%'`
+    file=`ls -ld "$thisdir/$file" | /bin/sed -n 's/.*-> //p'`
+  done
+
+  # Usually 'no', except on cygwin/mingw when embedded into
+  # the cwrapper.
+  WRAPPER_SCRIPT_BELONGS_IN_OBJDIR=no
+  if test "$WRAPPER_SCRIPT_BELONGS_IN_OBJDIR" = "yes"; then
+    # special case for '.'
+    if test "$thisdir" = "."; then
+      thisdir=`pwd`
+    fi
+    # remove .libs from thisdir
+    case "$thisdir" in
+    *[\\/].libs ) thisdir=`$ECHO "$thisdir" | /bin/sed 's%[\\/][^\\/]*$%%'` ;;
+    .libs )   thisdir=. ;;
+    esac
+  fi
+
+  # Try to get the absolute directory name.
+  absdir=`cd "$thisdir" && pwd`
+  test -n "$absdir" && thisdir="$absdir"
+
+  program=lt-'test-libnxcc'
+  progdir="$thisdir/.libs"
+
+  if test ! -f "$progdir/$program" ||
+     { file=`ls -1dt "$progdir/$program" "$progdir/../$program" 2>/dev/null | /bin/sed 1q`; \
+       test "X$file" != "X$progdir/$program"; }; then
+
+    file="$$-$program"
+
+    if test ! -d "$progdir"; then
+      mkdir "$progdir"
+    else
+      rm -f "$progdir/$file"
+    fi
+
+    # relink executable if necessary
+    if test -n "$relink_command"; then
+      if relink_command_output=`eval $relink_command 2>&1`; then :
+      else
+       printf %s\n "$relink_command_output" >&2
+       rm -f "$progdir/$file"
+       exit 1
+      fi
+    fi
+
+    mv -f "$progdir/$file" "$progdir/$program" 2>/dev/null ||
+    { rm -f "$progdir/$program";
+      mv -f "$progdir/$file" "$progdir/$program"; }
+    rm -f "$progdir/$file"
+  fi
+
+  if test -f "$progdir/$program"; then
+    if test "$libtool_execute_magic" != "%%%MAGIC variable%%%"; then
+      # Run the actual program with our arguments.
+      func_exec_program ${1+"$@"}
+    fi
+  else
+    # The program doesn't exist.
+    $ECHO "$0: error: \`$progdir/$program' does not exist" 1>&2
+    $ECHO "This script is just a wrapper for $program." 1>&2
+    $ECHO "See the libtool documentation for more information." 1>&2
+    exit 1
+  fi
+fi
diff --git a/tests/test-libnxcc/test-libnxcc.cpp b/tests/test-libnxcc/test-libnxcc.cpp
new file mode 100644 (file)
index 0000000..2bdbbdb
--- /dev/null
@@ -0,0 +1,77 @@
+#include <nms_common.h>
+#include <nms_util.h>
+#include <nxcc.h>
+#include <testtools.h>
+
+static MUTEX cbLock = MutexCreate();
+
+static void DebugCallback(int level, const TCHAR *format, va_list args)
+{
+   MutexLock(cbLock);
+   _vtprintf(format, args);
+   _tprintf(_T("\n"));
+   MutexUnlock(cbLock);
+}
+
+class EventHandler : public ClusterEventHandler
+{
+public:
+   EventHandler() : ClusterEventHandler() { }
+   virtual ~EventHandler() { }
+
+   virtual void onNodeJoin(UINT32 nodeId)
+   {
+      _tprintf(_T("** Node joined: %d\n"), nodeId);
+   }
+
+   virtual void onNodeDisconnect(UINT32 nodeId)
+   {
+      _tprintf(_T("** Node disconnected: %d\n"), nodeId);
+   }
+
+   virtual void onShutdown()
+   {
+      _tprintf(_T("** cluster shutdown\n"));
+   }
+
+   virtual void onMessage(NXCPMessage *msg, UINT32 sourceNodeId)
+   {
+
+   }
+};
+
+/**
+ * main()
+ */
+int main(int argc, char *argv[])
+{
+   if (argc < 2)
+   {
+      _tprintf(_T("Please specify node ID\n"));
+      return 1;
+   }
+
+#ifdef _WIN32
+   WSADATA wsaData;
+   WSAStartup(MAKEWORD(2, 2), &wsaData);
+#endif
+
+   UINT32 nodeId = strtoul(argv[1], NULL, 0);
+
+   Config *config = new Config();
+   config->setValue(_T("/CLUSTER/NodeId"), nodeId);
+   config->setValue(_T("/CLUSTER/PeerNode"), (nodeId == 1) ? _T("2:127.0.0.1") : _T("1:127.0.0.1"));
+
+   ClusterSetDebugCallback(DebugCallback);
+   ClusterInit(config, _T("CLUSTER"), new EventHandler());
+
+   ClusterJoin();
+   _tprintf(_T("CLUSTER RUNNING\n"));
+
+   ThreadSleep(60);
+
+   ClusterShutdown();
+   _tprintf(_T("CLUSTER SHUTDOWN\n"));
+
+   return 0;
+}