Initial agent-side code for agent to server connections
authorVictor Kirhenshtein <victor@netxms.org>
Thu, 8 Sep 2016 09:16:53 +0000 (12:16 +0300)
committerVictor Kirhenshtein <victor@netxms.org>
Thu, 8 Sep 2016 09:16:53 +0000 (12:16 +0300)
include/nms_agent.h
include/nms_cscp.h
src/agent/core/Makefile.am
src/agent/core/messages.mc
src/agent/core/nxagentd.cpp
src/agent/core/nxagentd.h
src/agent/core/session.cpp
src/agent/core/tunnel.cpp [new file with mode: 0644]
src/libnetxms/nxcp.cpp

index dabb17b..e050fcc 100644 (file)
@@ -57,6 +57,7 @@
  * Constants
  */
 #define AGENT_LISTEN_PORT        4700
+#define AGENT_TUNNEL_PORT        4703
 #define AGENT_PROTOCOL_VERSION   2
 #define MAX_RESULT_LENGTH        256
 #define MAX_CMD_LEN              256
index 04c0721..377218f 100644 (file)
@@ -318,7 +318,7 @@ typedef struct
 #define CMD_CREATE_ACTION              0x0051
 #define CMD_DELETE_ACTION              0x0052
 #define CMD_ACTION_DATA                0x0053
-//UNUSED: #define CMD_GET_CONTAINER_CAT_LIST     0x0054
+#define CMD_SETUP_AGENT_TUNNEL         0x0054
 //UNUSED: #define CMD_CONTAINER_CAT_DATA         0x0055
 //UNUSED: #define CMD_DELETE_CONTAINER_CAT       0x0056
 //UNUSED: #define CMD_CREATE_CONTAINER_CAT       0x0057
index 12262e2..8463bd3 100644 (file)
@@ -6,7 +6,7 @@ nxagentd_SOURCES = messages.c actions.cpp appagent.cpp comm.cpp config.cpp \
                    nxagentd.cpp policy.cpp push.cpp register.cpp sa.cpp \
                    sd.cpp session.cpp snmpproxy.cpp snmptrapproxy.cpp \
                    static_subagents.cpp subagent.cpp sysinfo.cpp syslog.cpp \
-                  tools.cpp trap.cpp upgrade.cpp watchdog.cpp
+                  tools.cpp trap.cpp tunnel.cpp upgrade.cpp watchdog.cpp
 if USE_INTERNAL_EXPAT
 nxagentd_LDADD = ../../appagent/libappagent.la ../libnxagent/libnxagent.la @top_srcdir@/src/db/libnxdb/libnxdb.la @top_srcdir@/src/libnetxms/libnetxms.la @top_srcdir@/src/snmp/libnxsnmp/libnxsnmp.la @top_srcdir@/src/libexpat/libexpat/libnxexpat.la @SUBAGENT_LIBS@
 else
index 6b443c2..87ced8e 100644 (file)
@@ -326,4 +326,10 @@ Language=English
 Data collection database schema upgrade failed
 .
 
+MessageId=
+SymbolicName=MSG_INVALID_TUNNEL_CONFIG
+Language=English
+Invalid server connection configuration record "%1"
+.
+
 ;#endif
index f58b9e4..44a5b99 100644 (file)
@@ -65,6 +65,7 @@ THREAD_RESULT THREAD_CALL SNMPTrapReceiver(void *);
 THREAD_RESULT THREAD_CALL SNMPTrapSender(void *);
 THREAD_RESULT THREAD_CALL SyslogReceiver(void *);
 THREAD_RESULT THREAD_CALL SyslogSender(void *);
+THREAD_RESULT THREAD_CALL TunnelManager(void *);
 
 void ShutdownTrapSender();
 void ShutdownSNMPTrapSender();
@@ -85,6 +86,8 @@ BOOL RegisterOnServer(const TCHAR *pszServer);
 
 void UpdatePolicyInventory();
 
+void ParseTunnelList(TCHAR *list);
+
 #if !defined(_WIN32)
 void InitStaticSubagents();
 #endif
@@ -203,6 +206,7 @@ static TCHAR *m_pszShExtParamList = NULL;
 static TCHAR *m_pszParamProviderList = NULL;
 static TCHAR *m_pszExtSubagentList = NULL;
 static TCHAR *m_pszAppAgentList = NULL;
+static TCHAR *s_serverConnectionList = NULL;
 static UINT32 s_enabledCiphers = 0xFFFF;
 static THREAD s_sessionWatchdogThread = INVALID_THREAD_HANDLE;
 static THREAD s_listenerThread = INVALID_THREAD_HANDLE;
@@ -212,6 +216,7 @@ static THREAD s_snmpTrapSenderThread = INVALID_THREAD_HANDLE;
 static THREAD s_syslogReceiverThread = INVALID_THREAD_HANDLE;
 static THREAD s_syslogSenderThread = INVALID_THREAD_HANDLE;
 static THREAD s_masterAgentListenerThread = INVALID_THREAD_HANDLE;
+static THREAD s_tunnelManagerThread = INVALID_THREAD_HANDLE;
 static TCHAR s_processToWaitFor[MAX_PATH] = _T("");
 static TCHAR s_dumpDir[MAX_PATH] = _T("C:\\");
 static UINT64 s_maxLogSize = 16384 * 1024;
@@ -282,6 +287,7 @@ static NX_CFG_TEMPLATE m_cfgTemplate[] =
    { _T("PlatformSuffix"), CT_STRING, 0, 0, MAX_PSUFFIX_LENGTH, 0, g_szPlatformSuffix, NULL },
    { _T("RequireAuthentication"), CT_BOOLEAN, 0, 0, AF_REQUIRE_AUTH, 0, &g_dwFlags, NULL },
    { _T("RequireEncryption"), CT_BOOLEAN, 0, 0, AF_REQUIRE_ENCRYPTION, 0, &g_dwFlags, NULL },
+   { _T("ServerConnection"), CT_STRING_LIST, '\n', 0, 0, 0, &s_serverConnectionList, NULL },
    { _T("Servers"), CT_STRING_LIST, ',', 0, 0, 0, &m_pszServerList, NULL },
    { _T("SessionIdleTimeout"), CT_LONG, 0, 0, 0, 0, &g_dwIdleTimeout, NULL },
    { _T("SessionAgentPort"), CT_WORD, 0, 0, 0, 0, &g_sessionAgentPort, NULL },
@@ -805,6 +811,10 @@ BOOL Initialize()
                if (!InitParameterList())
                        return FALSE;
 
+               // Parse outgoing server connection (tunnel) list
+      if (s_serverConnectionList != NULL)
+         ParseTunnelList(s_serverConnectionList);
+
                // Parse server lists
                if (m_pszMasterServerList != NULL)
                        ParseServerList(m_pszMasterServerList, true, true);
@@ -1030,7 +1040,9 @@ BOOL Initialize()
       {
          RegisterOnServer(g_szRegistrar);
       }
-   }
+
+      s_tunnelManagerThread = ThreadCreateEx(TunnelManager, 0, NULL);
+       }
 
 #if defined(_WIN32)
    s_shutdownCondition = ConditionCreate(TRUE);
@@ -1082,6 +1094,7 @@ void Shutdown()
                ShutdownTrapSender();
                ThreadJoin(s_sessionWatchdogThread);
                ThreadJoin(s_listenerThread);
+               ThreadJoin(s_tunnelManagerThread);
        }
        ThreadJoin(s_eventSenderThread);
        if (g_dwFlags & AF_ENABLE_SNMP_TRAP_PROXY)
index e5f3977..0e897c7 100644 (file)
 #define MAX_PSUFFIX_LENGTH 32
 #define MAX_SERVERS        32
 #define MAX_ESA_USER_NAME  64
+#define MAX_AGENT_MSG_SIZE 4194304
+
 
 #define AF_DAEMON                   0x00000001
 #define AF_USE_SYSLOG               0x00000002
index d483537..5f5df2b 100644 (file)
@@ -36,11 +36,6 @@ UINT32 UninstallPolicy(CommSession *session, NXCPMessage *request);
 UINT32 GetPolicyInventory(CommSession *session, NXCPMessage *msg);
 void ClearDataCollectionConfiguration();
 
-/**
- * Max message size
- */
-#define MAX_MSG_SIZE    4194304
-
 /**
  * SNMP proxy thread pool
  */
@@ -206,7 +201,7 @@ void CommSession::disconnect()
  */
 void CommSession::readThread()
 {
-   SocketMessageReceiver receiver(m_hSocket, 4096, MAX_MSG_SIZE);
+   SocketMessageReceiver receiver(m_hSocket, 4096, MAX_AGENT_MSG_SIZE);
    while(true)
    {
       if (!m_proxyConnection)
diff --git a/src/agent/core/tunnel.cpp b/src/agent/core/tunnel.cpp
new file mode 100644 (file)
index 0000000..af65ade
--- /dev/null
@@ -0,0 +1,324 @@
+/*
+** NetXMS multiplatform core agent
+** Copyright (C) 2003-2016 Victor Kirhenshtein
+**
+** This program is free software; you can redistribute it and/or modify
+** it under the terms of the GNU General Public License as published by
+** the Free Software Foundation; either version 2 of the License, or
+** (at your option) any later version.
+**
+** This program is distributed in the hope that it will be useful,
+** but WITHOUT ANY WARRANTY; without even the implied warranty of
+** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+** GNU General Public License for more details.
+**
+** You should have received a copy of the GNU General Public License
+** along with this program; if not, write to the Free Software
+** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+**
+** File: tunnel.cpp
+**
+**/
+
+#include "nxagentd.h"
+
+/**
+ * Tunnel class
+ */
+class Tunnel
+{
+private:
+   InetAddress m_address;
+   UINT16 m_port;
+   TCHAR m_login[MAX_OBJECT_NAME];
+   SOCKET m_socket;
+   bool m_connected;
+   UINT32 m_requestId;
+   THREAD m_recvThread;
+   MsgWaitQueue *m_queue;
+
+   Tunnel(const InetAddress& addr, UINT16 port, const TCHAR *login);
+
+   bool connectToServer();
+   bool sendMessage(const NXCPMessage *msg);
+   NXCPMessage *waitForMessage(UINT16 code, UINT32 id) { return (m_queue != NULL) ? m_queue->waitForMessage(code, id, 5000) : NULL; }
+   void recvThread();
+
+   static THREAD_RESULT THREAD_CALL recvThreadStarter(void *arg);
+
+public:
+   ~Tunnel();
+
+   void checkConnection();
+   void disconnect();
+
+   const InetAddress& getAddress() const { return m_address; }
+   const TCHAR *getLogin() const { return m_login; }
+
+   static Tunnel *createFromConfig(TCHAR *config);
+};
+
+/**
+ * Tunnel constructor
+ */
+Tunnel::Tunnel(const InetAddress& addr, UINT16 port, const TCHAR *login) : m_address(addr)
+{
+   m_port = port;
+   nx_strncpy(m_login, login, MAX_OBJECT_NAME);
+   m_socket = INVALID_SOCKET;
+   m_connected = false;
+   m_requestId = 0;
+   m_recvThread = INVALID_THREAD_HANDLE;
+   m_queue = NULL;
+}
+
+/**
+ * Tunnel destructor
+ */
+Tunnel::~Tunnel()
+{
+   disconnect();
+   if (m_socket != INVALID_SOCKET)
+      closesocket(m_socket);
+}
+
+/**
+ * Force disconnect
+ */
+void Tunnel::disconnect()
+{
+   if (m_socket != INVALID_SOCKET)
+      shutdown(m_socket, SHUT_RDWR);
+   m_connected = false;
+   ThreadJoin(m_recvThread);
+   delete_and_null(m_queue);
+}
+
+/**
+ * Receiver thread starter
+ */
+THREAD_RESULT THREAD_CALL Tunnel::recvThreadStarter(void *arg)
+{
+   ((Tunnel *)arg)->recvThread();
+   return THREAD_OK;
+}
+
+/**
+ * Receiver thread
+ */
+void Tunnel::recvThread()
+{
+   SocketMessageReceiver receiver(m_socket, 8192, MAX_AGENT_MSG_SIZE);
+   while(m_connected)
+   {
+      MessageReceiverResult result;
+      NXCPMessage *msg = receiver.readMessage(1000, &result);
+      if (msg != NULL)
+      {
+         m_queue->put(msg);
+      }
+      else if (result != MSGRECV_TIMEOUT)
+      {
+         nxlog_debug(4, _T("Receiver thread for tunnel %s@%s stopped (%s)"), \
+                     m_login, (const TCHAR *)m_address.toString(), AbstractMessageReceiver::resultToText(result));
+         break;
+      }
+   }
+}
+
+/**
+ * Send message
+ */
+bool Tunnel::sendMessage(const NXCPMessage *msg)
+{
+   if (m_socket == INVALID_SOCKET)
+      return false;
+
+   NXCP_MESSAGE *data = msg->createMessage();
+   bool success = SendEx(m_socket, data, ntohl(data->size), 0, NULL);
+   free(data);
+   return success;
+}
+
+/**
+ * Connect to server
+ */
+bool Tunnel::connectToServer()
+{
+   if (m_socket != INVALID_SOCKET)
+      closesocket(m_socket);
+
+   m_socket = socket(m_address.getFamily(), SOCK_STREAM, 0);
+   if (m_socket == INVALID_SOCKET)
+   {
+      nxlog_debug(4, _T("Cannot create socket for tunnel %s@%s: %s"), m_login, (const TCHAR *)m_address.toString(), _tcserror(WSAGetLastError()));
+      return false;
+   }
+
+   SockAddrBuffer sa;
+   m_address.fillSockAddr(&sa, m_port);
+   if (ConnectEx(m_socket, (struct sockaddr *)&sa, SA_LEN((struct sockaddr *)&sa), 5000) == -1)
+   {
+      nxlog_debug(4, _T("Cannot establish connection for tunnel %s@%s: %s"), m_login, (const TCHAR *)m_address.toString(), _tcserror(WSAGetLastError()));
+      return false;
+   }
+
+   delete m_queue;
+   m_queue = new MsgWaitQueue();
+   m_recvThread = ThreadCreateEx(Tunnel::recvThreadStarter, 0, this);
+
+   m_requestId = 1;
+
+   NXCPMessage msg;
+   msg.setCode(CMD_SETUP_AGENT_TUNNEL);
+   msg.setId(m_requestId++);
+   msg.setField(VID_LOGIN_NAME, m_login);
+   msg.setField(VID_SHARED_SECRET, g_szSharedSecret);
+   sendMessage(&msg);
+
+   NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, msg.getId());
+   if (response == NULL)
+   {
+      nxlog_debug(4, _T("Cannot establish connection for tunnel %s@%s: request timeout"), m_login, (const TCHAR *)m_address.toString());
+      disconnect();
+      return false;
+   }
+
+   UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
+   delete response;
+   if (rcc != ERR_SUCCESS)
+   {
+      nxlog_debug(4, _T("Cannot establish connection for tunnel %s@%s: error %d"), m_login, (const TCHAR *)m_address.toString(), rcc);
+      disconnect();
+      return false;
+   }
+
+   m_connected = true;
+   return true;
+}
+
+/**
+ * Check tunnel connection and connect as needed
+ */
+void Tunnel::checkConnection()
+{
+   if (!m_connected)
+   {
+      if (connectToServer())
+         nxlog_debug(3, _T("Tunnel %s@%s active"), m_login, (const TCHAR *)m_address.toString());
+   }
+   else
+   {
+      NXCPMessage msg;
+      msg.setCode(CMD_KEEPALIVE);
+      msg.setId(m_requestId++);
+      if (sendMessage(&msg))
+      {
+         NXCPMessage *response = waitForMessage(CMD_KEEPALIVE, msg.getId());
+         if (response == NULL)
+         {
+            disconnect();
+            closesocket(m_socket);
+            m_socket = INVALID_SOCKET;
+            nxlog_debug(3, _T("Connection test failed for tunnel %s@%s"), m_login, (const TCHAR *)m_address.toString());
+         }
+         else
+         {
+            delete response;
+         }
+      }
+      else
+      {
+         disconnect();
+         closesocket(m_socket);
+         m_socket = INVALID_SOCKET;
+         nxlog_debug(3, _T("Connection test failed for tunnel %s@%s"), m_login, (const TCHAR *)m_address.toString());
+      }
+   }
+}
+
+/**
+ * Create tunnel object from configuration record
+ */
+Tunnel *Tunnel::createFromConfig(TCHAR *config)
+{
+   TCHAR *a = _tcschr(config, _T('@'));
+   if (a == NULL)
+      return NULL;
+
+   a++;
+   int port = AGENT_TUNNEL_PORT;
+   TCHAR *p = _tcschr(a, _T(':'));
+   if (p != NULL)
+   {
+      *p = 0;
+      p++;
+
+      TCHAR *eptr;
+      int port = _tcstol(p, &eptr, 10);
+      if ((port < 1) || (port > 65535))
+         return NULL;
+   }
+
+   InetAddress addr = InetAddress::resolveHostName(a);
+   if (!addr.isValidUnicast())
+      return NULL;
+
+   return new Tunnel(addr, port, config);
+}
+
+/**
+ * Configured tunnels
+ */
+static ObjectArray<Tunnel> s_tunnels;
+
+/**
+ * Parser server connection (tunnel) list
+ */
+void ParseTunnelList(TCHAR *list)
+{
+   TCHAR *curr, *next;
+   for(curr = next = list; curr != NULL && *curr != 0; curr = next + 1)
+   {
+      next = _tcschr(curr, _T('\n'));
+      if (next != NULL)
+         *next = 0;
+      StrStrip(curr);
+
+      Tunnel *t = Tunnel::createFromConfig(curr);
+      if (t != NULL)
+      {
+         s_tunnels.add(t);
+         nxlog_debug(1, _T("Added server tunnel %s@%s"), t->getLogin(), (const TCHAR *)t->getAddress().toString());
+      }
+      else
+      {
+         nxlog_write(MSG_INVALID_TUNNEL_CONFIG, NXLOG_ERROR, "s", curr);
+      }
+   }
+   free(list);
+}
+
+/**
+ * Tunnel manager
+ */
+THREAD_RESULT THREAD_CALL TunnelManager(void *)
+{
+   if (s_tunnels.size() == 0)
+   {
+      nxlog_debug(3, _T("No tunnels configured, tunnel manager will not start"));
+      return THREAD_OK;
+   }
+
+   nxlog_debug(3, _T("Tunnel manager started"));
+   while(!AgentSleepAndCheckForShutdown(30000))
+   {
+      for(int i = 0; i < s_tunnels.size(); i++)
+      {
+         Tunnel *t = s_tunnels.get(i);
+         t->checkConnection();
+      }
+   }
+   nxlog_debug(3, _T("Tunnel manager stopped"));
+   return THREAD_OK;
+}
index 1537d26..614ce41 100644 (file)
@@ -119,7 +119,7 @@ TCHAR LIBNETXMS_EXPORTABLE *NXCPMessageCodeName(WORD code, TCHAR *pszBuffer)
       _T("CMD_CREATE_ACTION"),
       _T("CMD_DELETE_ACTION"),
       _T("CMD_ACTION_DATA"),
-      _T("<unused>"),
+      _T("CMD_SETUP_AGENT_TUNNEL"),
       _T("<unused>"),
       _T("<unused>"),
       _T("<unused>"),