added new NXCP message receiver type - TlsMessageReceiver; unfinished agent tunnel...
authorVictor Kirhenshtein <victor@netxms.org>
Wed, 1 Mar 2017 21:54:43 +0000 (23:54 +0200)
committerVictor Kirhenshtein <victor@netxms.org>
Wed, 1 Mar 2017 21:54:53 +0000 (23:54 +0200)
include/nxcpapi.h
src/agent/core/tunnel.cpp
src/libnetxms/msgrecv.cpp
src/server/core/Makefile.am
src/server/core/client.cpp
src/server/core/main.cpp
src/server/core/tunnel.cpp [new file with mode: 0644]
src/server/libnxsrv/messages.mc

index 863d066..6ba15c9 100644 (file)
 #include <wincrypt.h>
 #endif
 
+#ifdef _WITH_ENCRYPTION
+#include <openssl/ssl.h>
+#endif
+
 /**
  * Temporary buffer structure for RecvNXCPMessage() function
  */
@@ -344,6 +348,27 @@ public:
    virtual ~SocketMessageReceiver();
 };
 
+#ifdef _WITH_ENCRYPTION
+
+/**
+ * Message receiver - SSL/TLS implementation
+ */
+class LIBNETXMS_EXPORTABLE TlsMessageReceiver : public AbstractMessageReceiver
+{
+private:
+   SOCKET m_socket;
+   SSL *m_ssl;
+
+protected:
+   virtual int readBytes(BYTE *buffer, size_t size, UINT32 timeout);
+
+public:
+   TlsMessageReceiver(SOCKET socket, SSL *ssl, size_t initialSize, size_t maxSize);
+   virtual ~TlsMessageReceiver();
+};
+
+#endif /* _WITH_ENCRYPTION */
+
 /**
  * Message receiver - UNIX socket/named pipe implementation
  */
index 3419f68..e74505b 100644 (file)
@@ -1,6 +1,6 @@
 /*
 ** NetXMS multiplatform core agent
-** Copyright (C) 2003-2016 Victor Kirhenshtein
+** Copyright (C) 2003-2017 Victor Kirhenshtein
 **
 ** This program is free software; you can redistribute it and/or modify
 ** it under the terms of the GNU General Public License as published by
@@ -32,6 +32,8 @@ private:
    UINT16 m_port;
    TCHAR m_login[MAX_OBJECT_NAME];
    SOCKET m_socket;
+   SSL_CTX *m_context;
+   SSL *m_ssl;
    bool m_connected;
    UINT32 m_requestId;
    THREAD m_recvThread;
@@ -66,6 +68,8 @@ Tunnel::Tunnel(const InetAddress& addr, UINT16 port, const TCHAR *login) : m_add
    m_port = port;
    nx_strncpy(m_login, login, MAX_OBJECT_NAME);
    m_socket = INVALID_SOCKET;
+   m_context = NULL;
+   m_ssl = NULL;
    m_connected = false;
    m_requestId = 0;
    m_recvThread = INVALID_THREAD_HANDLE;
@@ -80,6 +84,10 @@ Tunnel::~Tunnel()
    disconnect();
    if (m_socket != INVALID_SOCKET)
       closesocket(m_socket);
+   if (m_ssl != NULL)
+      SSL_free(m_ssl);
+   if (m_context != NULL)
+      SSL_CTX_free(m_context);
 }
 
 /**
@@ -108,7 +116,7 @@ THREAD_RESULT THREAD_CALL Tunnel::recvThreadStarter(void *arg)
  */
 void Tunnel::recvThread()
 {
-   SocketMessageReceiver receiver(m_socket, 8192, MAX_AGENT_MSG_SIZE);
+   TlsMessageReceiver receiver(m_socket, m_ssl, 8192, MAX_AGENT_MSG_SIZE);
    while(m_connected)
    {
       MessageReceiverResult result;
@@ -135,7 +143,7 @@ bool Tunnel::sendMessage(const NXCPMessage *msg)
       return false;
 
    NXCP_MESSAGE *data = msg->createMessage(true);
-   bool success = (SendEx(m_socket, data, ntohl(data->size), 0, NULL) == ntohl(data->size));
+   bool success = (SSL_write(m_ssl, data, ntohl(data->size)) == ntohl(data->size));
    free(data);
    return success;
 }
@@ -145,9 +153,19 @@ bool Tunnel::sendMessage(const NXCPMessage *msg)
  */
 bool Tunnel::connectToServer()
 {
+   // Cleanup from previous connection attemp
    if (m_socket != INVALID_SOCKET)
       closesocket(m_socket);
+   if (m_ssl != NULL)
+      SSL_free(m_ssl);
+   if (m_context != NULL)
+      SSL_CTX_free(m_context);
 
+   m_socket = INVALID_SOCKET;
+   m_context = NULL;
+   m_ssl = NULL;
+
+   // Create socket and connect
    m_socket = socket(m_address.getFamily(), SOCK_STREAM, 0);
    if (m_socket == INVALID_SOCKET)
    {
@@ -163,12 +181,66 @@ bool Tunnel::connectToServer()
       return false;
    }
 
+   // Setup secure connection
+   const SSL_METHOD *method = SSLv23_method();
+   if (method == NULL)
+   {
+      nxlog_debug(4, _T("Cannot obtain TLS method for tunnel %s@%s"), m_login, (const TCHAR *)m_address.toString());
+      return false;
+   }
+
+   m_context = SSL_CTX_new(method);
+   if (method == NULL)
+   {
+      nxlog_debug(4, _T("Cannot create context for tunnel %s@%s"), m_login, (const TCHAR *)m_address.toString());
+      return false;
+   }
+   SSL_CTX_set_options(m_context, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_NO_COMPRESSION);
+
+   m_ssl = SSL_new(m_context);
+   if (m_ssl == NULL)
+   {
+      nxlog_debug(4, _T("Cannot create SSL object for tunnel %s@%s"), m_login, (const TCHAR *)m_address.toString());
+      return false;
+   }
+
+   SSL_set_connect_state(m_ssl);
+   SSL_set_bio(m_ssl, BIO_new_socket(m_socket, 0), BIO_new_socket(m_socket, 0));
+
+   int rc = SSL_do_handshake(m_ssl);
+   if (rc != 1)
+   {
+      char buffer[128];
+      nxlog_debug(4, _T("TLS handshake failed for tunnel %s@%s (%hs)"),
+               m_login, (const TCHAR *)m_address.toString(), ERR_error_string(SSL_get_error(m_ssl, rc), buffer));
+      return false;
+   }
+
+   // Check server vertificate
+   X509 *cert = SSL_get_peer_certificate(m_ssl);
+   if (cert == NULL)
+   {
+      nxlog_debug(4, _T("Server certificate not provided for tunnel %s@%s"), m_login, (const TCHAR *)m_address.toString());
+      return false;
+   }
+
+   char *subj = X509_NAME_oneline(X509_get_subject_name(cert), NULL ,0);
+   char *issuer = X509_NAME_oneline(X509_get_issuer_name(cert), NULL ,0);
+   nxlog_debug(4, _T("Tunnel %s@%s: certificate subject is %hs"), m_login, (const TCHAR *)m_address.toString(), subj);
+   nxlog_debug(4, _T("Tunnel %s@%s: certificate issuer is %hs"), m_login, (const TCHAR *)m_address.toString(), issuer);
+   OPENSSL_free(subj);
+   OPENSSL_free(issuer);
+
+   X509_free(cert);
+
+   // Setup receiver
    delete m_queue;
    m_queue = new MsgWaitQueue();
    m_recvThread = ThreadCreateEx(Tunnel::recvThreadStarter, 0, this);
 
    m_requestId = 1;
 
+   // Do handshake
    NXCPMessage msg;
    msg.setCode(CMD_SETUP_AGENT_TUNNEL);
    msg.setId(m_requestId++);
index 129d118..7114867 100644 (file)
@@ -197,6 +197,42 @@ int SocketMessageReceiver::readBytes(BYTE *buffer, size_t size, UINT32 timeout)
    return RecvEx(m_socket, buffer, size, 0, timeout);
 }
 
+#ifdef _WITH_ENCRYPTION
+
+/**
+ * TLS message receiver constructor
+ */
+TlsMessageReceiver::TlsMessageReceiver(SOCKET socket, SSL *ssl, size_t initialSize, size_t maxSize) : AbstractMessageReceiver(initialSize, maxSize)
+{
+   m_socket = socket;
+   m_ssl = ssl;
+}
+
+/**
+ * TLS message receiver destructor
+ */
+TlsMessageReceiver::~TlsMessageReceiver()
+{
+}
+
+/**
+ * Read bytes from TLS connection
+ */
+int TlsMessageReceiver::readBytes(BYTE *buffer, size_t size, UINT32 timeout)
+{
+   if (SSL_pending(m_ssl) == 0)
+   {
+      SocketPoller sp;
+      sp.add(m_socket);
+      int rc = sp.poll(timeout);
+      if (rc <= 0)
+         return rc;
+   }
+   return SSL_read(m_ssl, buffer, size);
+}
+
+#endif /* _WITH_ENCRYPTION */
+
 /**
  * Pipe message receiver constructor
  */
index 515d1db..51ad8fc 100644 (file)
@@ -31,7 +31,7 @@ libnxcore_la_SOURCES =  accesspoint.cpp acl.cpp actions.cpp addrlist.cpp \
                        session.cpp slmcheck.cpp smclp.cpp \
                        sms.cpp snmp.cpp snmptrap.cpp stp.cpp subnet.cpp summary_email.cpp \
                        svccontainer.cpp swpkg.cpp syncer.cpp syslogd.cpp \
-                       template.cpp tools.cpp tracert.cpp \
+                       template.cpp tools.cpp tracert.cpp tunnel.cpp \
                        uniroot.cpp upload_job.cpp uptimecalc.cpp userdb.cpp \
                        userdb_objects.cpp vpnconn.cpp vrrp.cpp watchdog.cpp \
                        winperf.cpp xmpp.cpp zeromq.cpp zone.cpp
index 52ad4f4..61df0ab 100644 (file)
@@ -167,11 +167,11 @@ THREAD_RESULT THREAD_CALL ClientListener(void *arg)
          error = errno;
          if (error != EINTR)
 #endif
-            nxlog_write(MSG_ACCEPT_ERROR, EVENTLOG_ERROR_TYPE, "e", error);
+            nxlog_write(MSG_ACCEPT_ERROR, NXLOG_ERROR, "e", error);
          errorCount++;
          if (errorCount > 1000)
          {
-            nxlog_write(MSG_TOO_MANY_ACCEPT_ERRORS, EVENTLOG_WARNING_TYPE, NULL);
+            nxlog_write(MSG_TOO_MANY_ACCEPT_ERRORS, NXLOG_WARNING, NULL);
             errorCount = 0;
          }
          ThreadSleepMs(500);
index 97e78e7..5b4fd1c 100644 (file)
@@ -73,6 +73,7 @@ void LoadPerfDataStorageDrivers();
 void ImportLocalConfiguration();
 void RegisterPredictionEngines();
 void ExecuteStartupScripts();
+void CloseAgentTunnels();
 
 void ExecuteScheduledScript(const ScheduledTaskParameters *param);
 void MaintenanceModeEnter(const ScheduledTaskParameters *params);
@@ -113,6 +114,7 @@ THREAD_RESULT THREAD_CALL BeaconPoller(void *);
 THREAD_RESULT THREAD_CALL JobManagerThread(void *);
 THREAD_RESULT THREAD_CALL UptimeCalculator(void *);
 THREAD_RESULT THREAD_CALL ReportingServerConnector(void *);
+THREAD_RESULT THREAD_CALL TunnelListener(void *arg);
 
 /**
  * Global variables
@@ -163,6 +165,7 @@ InetAddressList g_peerNodeAddrList;
 static CONDITION m_condShutdown = INVALID_CONDITION_HANDLE;
 static THREAD m_thPollManager = INVALID_THREAD_HANDLE;
 static THREAD m_thSyncer = INVALID_THREAD_HANDLE;
+static THREAD s_tunnelListenerThread = INVALID_THREAD_HANDLE;
 static int m_nShutdownReason = SHUTDOWN_DEFAULT;
 static StringSet s_components;
 
@@ -967,6 +970,9 @@ retry_db_lock:
        ThreadCreate(MobileDeviceListenerIPv6, 0, NULL);
 #endif
 
+       // Agent tunnels
+   s_tunnelListenerThread = ThreadCreateEx(TunnelListener, 0, NULL);
+
        // Start uptime calculator for SLM
        ThreadCreate(UptimeCalculator, 0, NULL);
 
@@ -1033,6 +1039,9 @@ void NXCORE_EXPORTABLE Shutdown()
        // Wait for critical threads
        ThreadJoin(m_thPollManager);
        ThreadJoin(m_thSyncer);
+       ThreadJoin(s_tunnelListenerThread);
+
+       CloseAgentTunnels();
 
        // Call shutdown functions for the modules
    // CALL_ALL_MODULES cannot be used here because it checks for shutdown flag
diff --git a/src/server/core/tunnel.cpp b/src/server/core/tunnel.cpp
new file mode 100644 (file)
index 0000000..e56aef8
--- /dev/null
@@ -0,0 +1,239 @@
+/*
+** NetXMS - Network Management System
+** Copyright (C) 2003-2017 Victor Kirhenshtein
+**
+** This program is free software; you can redistribute it and/or modify
+** it under the terms of the GNU General Public License as published by
+** the Free Software Foundation; either version 2 of the License, or
+** (at your option) any later version.
+**
+** This program is distributed in the hope that it will be useful,
+** but WITHOUT ANY WARRANTY; without even the implied warranty of
+** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+** GNU General Public License for more details.
+**
+** You should have received a copy of the GNU General Public License
+** along with this program; if not, write to the Free Software
+** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+**
+** File: tunnel.cpp
+**/
+
+#include "nxcore.h"
+
+/**
+ * Tunnel listener
+ */
+THREAD_RESULT THREAD_CALL TunnelListener(void *arg)
+{
+   UINT16 port = (UINT16)ConfigReadULong(_T("AgentTunnelListenPort"), 4703);
+
+   // Create socket(s)
+   SOCKET hSocket = socket(AF_INET, SOCK_STREAM, 0);
+#ifdef WITH_IPV6
+   SOCKET hSocket6 = socket(AF_INET6, SOCK_STREAM, 0);
+#endif
+   if ((hSocket == INVALID_SOCKET)
+#ifdef WITH_IPV6
+       && (hSocket6 == INVALID_SOCKET)
+#endif
+      )
+   {
+      nxlog_write(MSG_SOCKET_FAILED, EVENTLOG_ERROR_TYPE, "s", _T("TunnelListener"));
+      return THREAD_OK;
+   }
+
+   SetSocketExclusiveAddrUse(hSocket);
+   SetSocketReuseFlag(hSocket);
+#ifndef _WIN32
+   fcntl(hSocket, F_SETFD, fcntl(hSocket, F_GETFD) | FD_CLOEXEC);
+#endif
+
+#ifdef WITH_IPV6
+   SetSocketExclusiveAddrUse(hSocket6);
+   SetSocketReuseFlag(hSocket6);
+#ifndef _WIN32
+   fcntl(hSocket6, F_SETFD, fcntl(hSocket6, F_GETFD) | FD_CLOEXEC);
+#endif
+#ifdef IPV6_V6ONLY
+   int on = 1;
+   setsockopt(hSocket6, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&on, sizeof(int));
+#endif
+#endif
+
+   // Fill in local address structure
+   struct sockaddr_in servAddr;
+   memset(&servAddr, 0, sizeof(struct sockaddr_in));
+   servAddr.sin_family = AF_INET;
+
+#ifdef WITH_IPV6
+   struct sockaddr_in6 servAddr6;
+   memset(&servAddr6, 0, sizeof(struct sockaddr_in6));
+   servAddr6.sin6_family = AF_INET6;
+#endif
+
+   if (!_tcscmp(g_szListenAddress, _T("*")))
+   {
+      servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+#ifdef WITH_IPV6
+      memset(servAddr6.sin6_addr.s6_addr, 0, 16);
+#endif
+   }
+   else
+   {
+      InetAddress bindAddress = InetAddress::resolveHostName(g_szListenAddress, AF_INET);
+      if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET))
+      {
+         servAddr.sin_addr.s_addr = htonl(bindAddress.getAddressV4());
+      }
+      else
+      {
+         servAddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+      }
+#ifdef WITH_IPV6
+      bindAddress = InetAddress::resolveHostName(g_szListenAddress, AF_INET6);
+      if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET6))
+      {
+         memcpy(servAddr6.sin6_addr.s6_addr, bindAddress.getAddressV6(), 16);
+      }
+      else
+      {
+         memset(servAddr6.sin6_addr.s6_addr, 0, 15);
+         servAddr6.sin6_addr.s6_addr[15] = 1;
+      }
+#endif
+   }
+   servAddr.sin_port = htons(port);
+#ifdef WITH_IPV6
+   servAddr6.sin6_port = htons(port);
+#endif
+
+   // Bind socket
+   TCHAR buffer[64];
+   int bindFailures = 0;
+   nxlog_debug(1, _T("Trying to bind on %s:%d"), SockaddrToStr((struct sockaddr *)&servAddr, buffer), ntohs(servAddr.sin_port));
+   if (bind(hSocket, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
+   {
+      nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "e", WSAGetLastError());
+      bindFailures++;
+   }
+
+#ifdef WITH_IPV6
+   nxlog_debug(1, _T("Trying to bind on [%s]:%d"), SockaddrToStr((struct sockaddr *)&servAddr6, buffer), ntohs(servAddr6.sin6_port));
+   if (bind(hSocket6, (struct sockaddr *)&servAddr6, sizeof(struct sockaddr_in6)) != 0)
+   {
+      nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "dse", port, _T("TunnelListener"), WSAGetLastError());
+      bindFailures++;
+   }
+#else
+   bindFailures++;
+#endif
+
+   // Abort if cannot bind to socket
+   if (bindFailures == 2)
+   {
+      return THREAD_OK;
+   }
+
+   // Set up queue
+   if (listen(hSocket, SOMAXCONN) == 0)
+   {
+      nxlog_write(MSG_LISTENING_FOR_AGENTS, NXLOG_INFO, "ad", ntohl(servAddr.sin_addr.s_addr), port);
+   }
+   else
+   {
+      closesocket(hSocket);
+      hSocket = INVALID_SOCKET;
+   }
+#ifdef WITH_IPV6
+   if (listen(hSocket6, SOMAXCONN) == 0)
+   {
+      nxlog_write(MSG_LISTENING_FOR_AGENTS, NXLOG_INFO, "Hd", servAddr6.sin6_addr.s6_addr, port);
+   }
+   else
+   {
+      closesocket(hSocket6);
+      hSocket6 = INVALID_SOCKET;
+   }
+#endif
+
+   // Wait for connection requests
+   SocketPoller sp;
+   int errorCount = 0;
+   while(!(g_flags & AF_SHUTDOWN))
+   {
+      sp.reset();
+      if (hSocket != INVALID_SOCKET)
+         sp.add(hSocket);
+#ifdef WITH_IPV6
+      if (hSocket6 != INVALID_SOCKET)
+         sp.add(hSocket6);
+#endif
+
+      int nRet = sp.poll(1000);
+      if ((nRet > 0) && (!(g_flags & AF_SHUTDOWN)))
+      {
+         char clientAddr[128];
+         socklen_t size = 128;
+#ifdef WITH_IPV6
+         SOCKET hClientSocket = accept(sp.isSet(hSocket) ? hSocket : hSocket6, (struct sockaddr *)clientAddr, &size);
+#else
+         SOCKET hClientSocket = accept(hSocket, (struct sockaddr *)clientAddr, &size);
+#endif
+         if (hClientSocket == INVALID_SOCKET)
+         {
+            int error = WSAGetLastError();
+
+            if (error != WSAEINTR)
+               nxlog_write(MSG_ACCEPT_ERROR, NXLOG_ERROR, "e", error);
+            errorCount++;
+            if (errorCount > 1000)
+            {
+               nxlog_write(MSG_TOO_MANY_ACCEPT_ERRORS, NXLOG_WARNING, NULL);
+               errorCount = 0;
+            }
+            ThreadSleepMs(500);
+            continue;
+         }
+
+         // Socket should be closed on successful exec
+#ifndef _WIN32
+         fcntl(hClientSocket, F_SETFD, fcntl(hClientSocket, F_GETFD) | FD_CLOEXEC);
+#endif
+
+         errorCount = 0;     // Reset consecutive errors counter
+         InetAddress addr = InetAddress::createFromSockaddr((struct sockaddr *)clientAddr);
+         nxlog_debug(5, _T("TunnelListener: incoming connection from %s"), addr.toString(buffer));
+
+         shutdown(hClientSocket, SHUT_RDWR);
+      }
+      else if (nRet == -1)
+      {
+         int error = WSAGetLastError();
+
+         // On AIX, select() returns ENOENT after SIGINT for unknown reason
+#ifdef _WIN32
+         if (error != WSAEINTR)
+#else
+         if ((error != EINTR) && (error != ENOENT))
+#endif
+         {
+            ThreadSleepMs(100);
+         }
+      }
+   }
+
+   closesocket(hSocket);
+#ifdef WITH_IPV6
+   closesocket(hSocket6);
+#endif
+   nxlog_debug(1, _T("Tunnel listener thread terminated"));
+   return THREAD_OK;
+}
+
+/**
+ * Close all active agent tunnels
+ */
+void CloseAgentTunnels()
+{
+}
index 957f0cb..cee8f42 100644 (file)
@@ -1046,4 +1046,10 @@ Language=English
 Thread "%1" returned to running state
 .
 
+MessageId=
+SymbolicName=MSG_LISTENING_FOR_AGENTS
+Language=English
+Listening for agent connections on TCP socket %1:%2
+.
+
 ;#endif