implemented helper class SocketListener; server listeners converted to use SocketListener
author Victor Kirhenshtein <victor@netxms.org>
Mon, 25 Sep 2017 10:57:56 +0000 (13:57 +0300)
committer Victor Kirhenshtein <victor@netxms.org>
Mon, 25 Sep 2017 10:57:56 +0000 (13:57 +0300)
include/Makefile.am
include/socket_listener.h [new file with mode: 0644]
src/libnetxms/Makefile.am
src/libnetxms/socket_listener.cpp [new file with mode: 0644]
src/server/core/client.cpp
src/server/core/main.cpp
src/server/core/mdconn.cpp
src/server/core/mdsession.cpp
src/server/core/session.cpp
src/server/core/tunnel.cpp
src/server/include/nms_core.h

index 609b06f..13c1289 100644 (file)
@@ -53,6 +53,7 @@ include_HEADERS = \
        nxtools.h \
        nxtux.h \
        rwlock.h \
+       socket_listener.h \
        strophe.h \
        unicode.h \
        uthash.h \
diff --git a/include/socket_listener.h b/include/socket_listener.h
new file mode 100644 (file)
index 0000000..bb205b2
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+** NetXMS - Network Management System
+** Copyright (C) 2003-2017 Victor Kirhenshtein
+**
+** This program is free software; you can redistribute it and/or modify
+** it under the terms of the GNU Lesser General Public License as published
+** by the Free Software Foundation; either version 3 of the License, or
+** (at your option) any later version.
+**
+** This program is distributed in the hope that it will be useful,
+** but WITHOUT ANY WARRANTY; without even the implied warranty of
+** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+** GNU General Public License for more details.
+**
+** You should have received a copy of the GNU Lesser General Public License
+** along with this program; if not, write to the Free Software
+** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+**
+** File: socket_listener.h
+**
+**/
+
+#ifndef _socket_listener_h_
+#define _socket_listener_h_
+
+#include <nms_util.h>
+
+/**
+ * Connection processing result, returned by SocketListener::processConnection()
+ * to tell the listener's main loop what to do with the accepted socket.
+ */
+enum ConnectionProcessingResult
+{
+   CPR_COMPLETED = 0,   // processing finished - main loop shuts down and closes the socket
+   CPR_BACKGROUND = 1   // main loop leaves the socket open (presumably the handler keeps using it and closes it later)
+};
+
+/**
+ * Maximum length of listener name
+ */
+#define MAX_LISTENER_NAME_LEN    64
+
+/**
+ * Socket listener. Holds up to two listening TCP sockets (IPv4 and IPv6) on the
+ * same port, accepts incoming connections in mainLoop() and passes them to
+ * overridable hooks. Subclasses customize behavior by overriding
+ * isConnectionAllowed(), processConnection() and isStopConditionReached().
+ *
+ * Typical use: construct, optionally call setName()/setListenAddress(),
+ * call initialize(), then run mainLoop() until it returns.
+ */
+class LIBNETXMS_EXPORTABLE SocketListener
+{
+private:
+   SOCKET m_socketV4;               // IPv4 listening socket, INVALID_SOCKET when not open
+   SOCKET m_socketV6;               // IPv6 listening socket, INVALID_SOCKET when not open
+   UINT16 m_port;                   // TCP port to listen on
+   TCHAR *m_listenAddress;          // bind address (copy owned by this object, released with free()); NULL if not set
+   bool m_allowV4;                  // create IPv4 socket in initialize()
+   bool m_allowV6;                  // create IPv6 socket in initialize()
+   bool m_stop;                     // set by shutdown(), checked by mainLoop()
+   TCHAR m_name[MAX_LISTENER_NAME_LEN];   // listener name, used as prefix in log messages
+   UINT32 m_acceptErrors;           // number of failed accept() calls
+   UINT32 m_acceptedConnections;    // number of connections that passed isConnectionAllowed()
+   UINT32 m_rejectedConnections;    // number of connections refused by isConnectionAllowed()
+
+   // Copying is disabled (declared, intentionally not defined): the object owns
+   // socket handles and a heap-allocated address string, so a member-wise copy
+   // would close the sockets and free the string twice.
+   SocketListener(const SocketListener& src);
+   SocketListener& operator =(const SocketListener& src);
+
+protected:
+   /**
+    * Decide whether connection from given peer should be served.
+    * Called by mainLoop() for every accepted connection.
+    */
+   virtual bool isConnectionAllowed(const InetAddress& peer);
+
+   /**
+    * Handle accepted connection. When CPR_COMPLETED is returned mainLoop()
+    * shuts down and closes the socket; otherwise the socket is left open.
+    */
+   virtual ConnectionProcessingResult processConnection(SOCKET s, const InetAddress& peer);
+
+   /**
+    * Additional stop condition, checked by mainLoop() together with the
+    * internal stop flag.
+    */
+   virtual bool isStopConditionReached();
+
+public:
+   SocketListener(UINT16 port, bool allowV4 = true, bool allowV6 = true);
+   virtual ~SocketListener();
+
+   // Address family selection; sockets are created in initialize()
+   void enableIPv4(bool enabled) { m_allowV4 = enabled; }
+   void enableIPv6(bool enabled) { m_allowV6 = enabled; }
+
+   // Set listener name (copied into fixed-size buffer of MAX_LISTENER_NAME_LEN characters)
+   void setName(const TCHAR *name) { _tcslcpy(m_name, name, MAX_LISTENER_NAME_LEN); }
+
+   // Set address to bind to; previous value is released and the new one is copied
+   void setListenAddress(const TCHAR *addr) { free(m_listenAddress); m_listenAddress = _tcsdup_ex(addr); }
+
+   // Counters maintained by mainLoop()
+   UINT32 getAcceptErrors() const { return m_acceptErrors; }
+   UINT32 getAcceptedConnections() const { return m_acceptedConnections; }
+   UINT32 getRejectedConnections() const { return m_rejectedConnections; }
+
+   bool initialize();   // create, bind and start listening on sockets
+   void mainLoop();     // accept connections until stop is requested
+   void shutdown();     // request mainLoop() to stop
+};
+
+#endif   /* _socket_listener_h_ */
index d74c246..53e6e28 100644 (file)
@@ -6,7 +6,7 @@ SOURCES = array.cpp base64.cpp bytestream.cpp cc_mb.cpp cc_ucs2.cpp \
          msgrecv.cpp msgwq.cpp net.cpp nxcp.cpp nxproc.cpp nxproc_unix.cpp \
          pa.cpp parisc_atomic.cpp \
           qsort.c queue.cpp rbuffer.cpp rwlock.cpp scandir.c serial.cpp \
-         sha1.cpp sha2.cpp solaris9_atomic.c spoll.cpp streamcomp.cpp \
+         sha1.cpp sha2.cpp socket_listener.cpp solaris9_atomic.c spoll.cpp streamcomp.cpp \
          string.cpp stringlist.cpp strlcat.c strlcpy.c strmap.cpp \
          strmapbase.cpp strptime.c \
          strset.cpp strtoll.c strtoull.c table.cpp threads.cpp timegm.c \
diff --git a/src/libnetxms/socket_listener.cpp b/src/libnetxms/socket_listener.cpp
new file mode 100644 (file)
index 0000000..0155f92
--- /dev/null
@@ -0,0 +1,349 @@
+/*
+** NetXMS - Network Management System
+** Utility Library
+** Copyright (C) 2003-2017 Victor Kirhenshtein
+**
+** This program is free software; you can redistribute it and/or modify
+** it under the terms of the GNU Lesser General Public License as published
+** by the Free Software Foundation; either version 3 of the License, or
+** (at your option) any later version.
+**
+** This program is distributed in the hope that it will be useful,
+** but WITHOUT ANY WARRANTY; without even the implied warranty of
+** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+** GNU General Public License for more details.
+**
+** You should have received a copy of the GNU Lesser General Public License
+** along with this program; if not, write to the Free Software
+** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+**
+** File: socket_listener.cpp
+**/
+
+#include "libnetxms.h"
+#include <socket_listener.h>
+
+/**
+ * Constructor. Only stores configuration and resets state; no sockets are
+ * created until initialize() is called.
+ *
+ * @param port TCP port to listen on
+ * @param allowV4 true to listen on IPv4
+ * @param allowV6 true to listen on IPv6
+ */
+SocketListener::SocketListener(UINT16 port, bool allowV4, bool allowV6)
+{
+   m_listenAddress = NULL;
+   m_port = port;
+   m_allowV4 = allowV4;
+   m_allowV6 = allowV6;
+   m_socketV4 = INVALID_SOCKET;
+   m_socketV6 = INVALID_SOCKET;
+   m_stop = false;
+   m_name[0] = 0;    // name is printed in log messages even if setName() was never called
+   m_acceptErrors = 0;
+   m_acceptedConnections = 0;
+   m_rejectedConnections = 0;
+}
+
+/**
+ * Destructor. Raises the stop flag, closes whichever listening sockets are
+ * still open and releases the stored listen address.
+ */
+SocketListener::~SocketListener()
+{
+   shutdown();
+
+   // Both sockets get identical treatment: close if open, then mark as closed
+   SOCKET *handles[2] = { &m_socketV4, &m_socketV6 };
+   for(int i = 0; i < 2; i++)
+   {
+      if (*handles[i] == INVALID_SOCKET)
+         continue;
+      closesocket(*handles[i]);
+      *handles[i] = INVALID_SOCKET;
+   }
+
+   free(m_listenAddress);
+}
+
+/**
+ * Initialize listener: create, configure, bind and start listening on a socket
+ * for every enabled address family (IPv6 only in builds with WITH_IPV6).
+ *
+ * Listen address that is NULL, empty, or "*" means "any address". Otherwise it
+ * is resolved separately for each family; if resolution fails for a family,
+ * that family is bound to its loopback address.
+ *
+ * A socket that cannot be bound or put into listening state is closed, so on
+ * return every socket that is not INVALID_SOCKET is actually listening.
+ *
+ * @return true if at least one socket is listening, false otherwise
+ */
+bool SocketListener::initialize()
+{
+   // Shared by address formatting and error text retrieval; sized to match
+   // the length passed to GetLastSocketErrorText()
+   TCHAR buffer[256];
+
+   // Create socket(s); report failure per family while the error code is still current
+   m_socketV4 = m_allowV4 ? socket(AF_INET, SOCK_STREAM, 0) : INVALID_SOCKET;
+   if (m_allowV4 && (m_socketV4 == INVALID_SOCKET))
+   {
+      nxlog_write_generic(NXLOG_ERROR, _T("SocketListener/%s: socket() call failed (%s)"), m_name, GetLastSocketErrorText(buffer, 256));
+   }
+#ifdef WITH_IPV6
+   m_socketV6 = m_allowV6 ? socket(AF_INET6, SOCK_STREAM, 0) : INVALID_SOCKET;
+   if (m_allowV6 && (m_socketV6 == INVALID_SOCKET))
+   {
+      nxlog_write_generic(NXLOG_ERROR, _T("SocketListener/%s: socket() call failed (%s)"), m_name, GetLastSocketErrorText(buffer, 256));
+   }
+#endif
+
+   // Nothing to work with: all requested sockets failed or every usable family is disabled
+   if ((m_socketV4 == INVALID_SOCKET) && (m_socketV6 == INVALID_SOCKET))
+      return false;
+
+   if (m_socketV4 != INVALID_SOCKET)
+   {
+      SetSocketExclusiveAddrUse(m_socketV4);
+      SetSocketReuseFlag(m_socketV4);
+#ifndef _WIN32
+      // Do not leak listening socket into child processes
+      fcntl(m_socketV4, F_SETFD, fcntl(m_socketV4, F_GETFD) | FD_CLOEXEC);
+#endif
+   }
+
+#ifdef WITH_IPV6
+   if (m_socketV6 != INVALID_SOCKET)
+   {
+      SetSocketExclusiveAddrUse(m_socketV6);
+      SetSocketReuseFlag(m_socketV6);
+#ifndef _WIN32
+      // Do not leak listening socket into child processes
+      fcntl(m_socketV6, F_SETFD, fcntl(m_socketV6, F_GETFD) | FD_CLOEXEC);
+#endif
+#ifdef IPV6_V6ONLY
+      // IPv4 is served by its own socket, so keep IPv6 socket IPv6-only
+      int on = 1;
+      setsockopt(m_socketV6, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&on, sizeof(int));
+#endif
+   }
+#endif
+
+   // Fill in local address structure
+   struct sockaddr_in servAddr;
+   memset(&servAddr, 0, sizeof(struct sockaddr_in));
+   servAddr.sin_family = AF_INET;
+
+#ifdef WITH_IPV6
+   struct sockaddr_in6 servAddr6;
+   memset(&servAddr6, 0, sizeof(struct sockaddr_in6));
+   servAddr6.sin6_family = AF_INET6;
+#endif
+
+   if ((m_listenAddress == NULL) || (*m_listenAddress == 0) || !_tcscmp(m_listenAddress, _T("*")))
+   {
+      // Listen on any address
+      servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+#ifdef WITH_IPV6
+      memset(servAddr6.sin6_addr.s6_addr, 0, 16);
+#endif
+   }
+   else
+   {
+      InetAddress bindAddress = InetAddress::resolveHostName(m_listenAddress, AF_INET);
+      if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET))
+      {
+         servAddr.sin_addr.s_addr = htonl(bindAddress.getAddressV4());
+      }
+      else
+      {
+         // No usable IPv4 address - fall back to loopback
+         servAddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+      }
+#ifdef WITH_IPV6
+      bindAddress = InetAddress::resolveHostName(m_listenAddress, AF_INET6);
+      if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET6))
+      {
+         memcpy(servAddr6.sin6_addr.s6_addr, bindAddress.getAddressV6(), 16);
+      }
+      else
+      {
+         // No usable IPv6 address - fall back to loopback (::1)
+         memset(servAddr6.sin6_addr.s6_addr, 0, 15);
+         servAddr6.sin6_addr.s6_addr[15] = 1;
+      }
+#endif
+   }
+   servAddr.sin_port = htons(m_port);
+#ifdef WITH_IPV6
+   servAddr6.sin6_port = htons(m_port);
+#endif
+
+   // Bind socket(s). A socket that cannot be bound is closed immediately:
+   // calling listen() on an unbound socket could make it listen on an
+   // arbitrary system-assigned port.
+   if (m_socketV4 != INVALID_SOCKET)
+   {
+      nxlog_debug(1, _T("SocketListener/%s: Trying to bind on %s:%d"), m_name, SockaddrToStr((struct sockaddr *)&servAddr, buffer), ntohs(servAddr.sin_port));
+      if (bind(m_socketV4, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
+      {
+         nxlog_write_generic(NXLOG_ERROR, _T("SocketListener/%s: cannot bind IPv4 socket (%s)"), m_name, GetLastSocketErrorText(buffer, 256));
+         closesocket(m_socketV4);
+         m_socketV4 = INVALID_SOCKET;
+      }
+   }
+
+#ifdef WITH_IPV6
+   if (m_socketV6 != INVALID_SOCKET)
+   {
+      nxlog_debug(1, _T("SocketListener/%s: Trying to bind on [%s]:%d"), m_name, SockaddrToStr((struct sockaddr *)&servAddr6, buffer), ntohs(servAddr6.sin6_port));
+      if (bind(m_socketV6, (struct sockaddr *)&servAddr6, sizeof(struct sockaddr_in6)) != 0)
+      {
+         nxlog_write_generic(NXLOG_ERROR, _T("SocketListener/%s: cannot bind IPv6 socket (%s)"), m_name, GetLastSocketErrorText(buffer, 256));
+         closesocket(m_socketV6);
+         m_socketV6 = INVALID_SOCKET;
+      }
+   }
+#endif
+
+   // No socket could be bound
+   if ((m_socketV4 == INVALID_SOCKET) && (m_socketV6 == INVALID_SOCKET))
+      return false;
+
+   // Set up queue
+   if (m_socketV4 != INVALID_SOCKET)
+   {
+      if (listen(m_socketV4, SOMAXCONN) == 0)
+      {
+         nxlog_write_generic(NXLOG_INFO, _T("SocketListener/%s: listening on %s:%d"), m_name, SockaddrToStr((struct sockaddr *)&servAddr, buffer), (int)m_port);
+      }
+      else
+      {
+         nxlog_write_generic(NXLOG_ERROR, _T("SocketListener/%s: listen() call failed for IPv4 socket (%s)"), m_name, GetLastSocketErrorText(buffer, 256));
+         closesocket(m_socketV4);
+         m_socketV4 = INVALID_SOCKET;
+      }
+   }
+#ifdef WITH_IPV6
+   if (m_socketV6 != INVALID_SOCKET)
+   {
+      if (listen(m_socketV6, SOMAXCONN) == 0)
+      {
+         nxlog_write_generic(NXLOG_INFO, _T("SocketListener/%s: listening on [%s]:%d"), m_name, SockaddrToStr((struct sockaddr *)&servAddr6, buffer), (int)m_port);
+      }
+      else
+      {
+         nxlog_write_generic(NXLOG_ERROR, _T("SocketListener/%s: listen() call failed for IPv6 socket (%s)"), m_name, GetLastSocketErrorText(buffer, 256));
+         closesocket(m_socketV6);
+         m_socketV6 = INVALID_SOCKET;
+      }
+   }
+#endif
+
+   // Success only if there is something for mainLoop() to poll
+   return (m_socketV4 != INVALID_SOCKET) || (m_socketV6 != INVALID_SOCKET);
+}
+
+/**
+ * Main listener loop. Polls listening sockets with a one second timeout until
+ * shutdown() is called or isStopConditionReached() returns true. Each accepted
+ * connection is checked with isConnectionAllowed() and, if allowed, passed to
+ * processConnection(); rejected connections are closed immediately.
+ * At most one connection is accepted per poll cycle.
+ */
+void SocketListener::mainLoop()
+{
+   SocketPoller sp;
+   int errorCount = 0;   // consecutive accept() failures, used to throttle logging
+   while(!m_stop && !isStopConditionReached())
+   {
+      sp.reset();
+      if (m_socketV4 != INVALID_SOCKET)
+         sp.add(m_socketV4);
+#ifdef WITH_IPV6
+      if (m_socketV6 != INVALID_SOCKET)
+         sp.add(m_socketV6);
+#endif
+
+      int nRet = sp.poll(1000);
+      if ((nRet > 0) && !m_stop && !isStopConditionReached())
+      {
+         char clientAddr[128];
+         socklen_t size = 128;
+#ifdef WITH_IPV6
+         // Only ask the poller about IPv4 socket if it was actually added;
+         // it is INVALID_SOCKET when IPv4 is disabled or failed to initialize
+         SOCKET listenSocket = ((m_socketV4 != INVALID_SOCKET) && sp.isSet(m_socketV4)) ? m_socketV4 : m_socketV6;
+         SOCKET hClientSocket = accept(listenSocket, (struct sockaddr *)clientAddr, &size);
+#else
+         SOCKET hClientSocket = accept(m_socketV4, (struct sockaddr *)clientAddr, &size);
+#endif
+         if (hClientSocket == INVALID_SOCKET)
+         {
+            int error = WSAGetLastError();
+            if (error != WSAEINTR)
+            {
+               // Log only the first error in a row to avoid flooding the log
+               if (errorCount == 0)
+               {
+                  TCHAR buffer[256];
+                  nxlog_write_generic(NXLOG_WARNING, _T("SocketListener/%s: accept() call failed (%s)"), m_name, GetLastSocketErrorText(buffer, 256));
+               }
+               errorCount++;
+            }
+            m_acceptErrors++;
+            if (errorCount > 1000)
+            {
+               nxlog_write_generic(NXLOG_WARNING, _T("SocketListener/%s: multiple consecutive accept() errors"), m_name);
+               errorCount = 0;
+            }
+            ThreadSleepMs(500);
+            continue;
+         }
+
+         // Socket should be closed on successful exec
+#ifndef _WIN32
+         fcntl(hClientSocket, F_SETFD, fcntl(hClientSocket, F_GETFD) | FD_CLOEXEC);
+#endif
+
+         errorCount = 0;     // Reset consecutive errors counter
+         InetAddress addr = InetAddress::createFromSockaddr((struct sockaddr *)clientAddr);
+         TCHAR buffer[256];
+         nxlog_debug(5, _T("SocketListener/%s: Incoming connection from %s"), m_name, addr.toString(buffer));
+
+         if (isConnectionAllowed(addr))
+         {
+            m_acceptedConnections++;
+            nxlog_debug(5, _T("SocketListener/%s: Connection from %s accepted"), m_name, buffer);
+            // CPR_COMPLETED means the handler is done with the socket, so close it here;
+            // for any other result the socket is left open
+            if (processConnection(hClientSocket, addr) == CPR_COMPLETED)
+            {
+               ::shutdown(hClientSocket, SHUT_RDWR);
+               closesocket(hClientSocket);
+            }
+         }
+         else     // Unauthorized connection
+         {
+            m_rejectedConnections++;
+            ::shutdown(hClientSocket, SHUT_RDWR);
+            closesocket(hClientSocket);
+            nxlog_debug(5, _T("SocketListener/%s: Connection from %s rejected"), m_name, buffer);
+         }
+      }
+      else if (nRet == -1)
+      {
+         int error = WSAGetLastError();
+
+         // On AIX, select() returns ENOENT after SIGINT for unknown reason
+#ifdef _WIN32
+         if (error != WSAEINTR)
+#else
+         if ((error != EINTR) && (error != ENOENT))
+#endif
+         {
+            TCHAR buffer[256];
+            nxlog_write_generic(NXLOG_ERROR, _T("SocketListener/%s: select() call failed (%s)"), m_name, GetLastSocketErrorText(buffer, 256));
+            ThreadSleepMs(100);   // avoid busy loop if poll keeps failing
+         }
+      }
+   }
+}
+
+/**
+ * Shutdown listener. Only raises the stop flag that mainLoop() checks on each
+ * iteration (it polls with a one second timeout); listening sockets are not
+ * closed here, that is done by the destructor.
+ * NOTE(review): m_stop is a plain bool with no synchronization visible here -
+ * confirm this is acceptable if shutdown() is called from another thread.
+ */
+void SocketListener::shutdown()
+{
+   m_stop = true;
+}
+
+/**
+ * Check if incoming connection is allowed. Called by mainLoop() for every
+ * accepted connection; when false is returned the connection is closed and
+ * counted as rejected. Default implementation allows all peers.
+ *
+ * @param peer address of connecting peer (unused by default implementation)
+ * @return true if connection should be processed
+ */
+bool SocketListener::isConnectionAllowed(const InetAddress& peer)
+{
+   return true;
+}
+
+/**
+ * Process incoming connection. Called by mainLoop() for each allowed
+ * connection. Default implementation does nothing and returns CPR_COMPLETED,
+ * which makes mainLoop() shut down and close the socket.
+ *
+ * @param s accepted client socket (unused by default implementation)
+ * @param peer address of connecting peer (unused by default implementation)
+ * @return CPR_COMPLETED
+ */
+ConnectionProcessingResult SocketListener::processConnection(SOCKET s, const InetAddress& peer)
+{
+   return CPR_COMPLETED;
+}
+
+/**
+ * Check if external stop condition is reached. Checked by mainLoop() together
+ * with the internal stop flag, both before polling and before accepting a
+ * connection. Default implementation never requests a stop.
+ *
+ * @return true if mainLoop() should exit
+ */
+bool SocketListener::isStopConditionReached()
+{
+   return false;
+}
index 819292f..59edae4 100644 (file)
 **/
 
 #include "nxcore.h"
+#include <socket_listener.h>
 
 /**
  * Static data
  */
-static ClientSession *m_pSessionList[MAX_CLIENT_SESSIONS];
-static RWLOCK m_rwlockSessionListAccess;
+static ClientSession *s_sessionList[MAX_CLIENT_SESSIONS];
+static RWLOCK s_sessionListLock;
 
 /**
  * Register new session in list
@@ -35,17 +36,17 @@ static BOOL RegisterClientSession(ClientSession *pSession)
 {
    UINT32 i;
 
-   RWLockWriteLock(m_rwlockSessionListAccess, INFINITE);
+   RWLockWriteLock(s_sessionListLock, INFINITE);
    for(i = 0; i < MAX_CLIENT_SESSIONS; i++)
-      if (m_pSessionList[i] == NULL)
+      if (s_sessionList[i] == NULL)
       {
-         m_pSessionList[i] = pSession;
+         s_sessionList[i] = pSession;
          pSession->setId(i);
-         RWLockUnlock(m_rwlockSessionListAccess);
+         RWLockUnlock(s_sessionListLock);
          return TRUE;
       }
 
-   RWLockUnlock(m_rwlockSessionListAccess);
+   RWLockUnlock(s_sessionListLock);
    nxlog_write(MSG_TOO_MANY_SESSIONS, EVENTLOG_WARNING_TYPE, NULL);
    return FALSE;
 }
@@ -55,9 +56,9 @@ static BOOL RegisterClientSession(ClientSession *pSession)
  */
 void UnregisterClientSession(int id)
 {
-   RWLockWriteLock(m_rwlockSessionListAccess, INFINITE);
-   m_pSessionList[id] = NULL;
-   RWLockUnlock(m_rwlockSessionListAccess);
+   RWLockWriteLock(s_sessionListLock, INFINITE);
+   s_sessionList[id] = NULL;
+   RWLockUnlock(s_sessionListLock);
 }
 
 /**
@@ -81,12 +82,12 @@ static THREAD_RESULT THREAD_CALL ClientKeepAliveThread(void *)
          break;
 
       msg.setField(VID_TIMESTAMP, (UINT32)time(NULL));
-      RWLockReadLock(m_rwlockSessionListAccess, INFINITE);
+      RWLockReadLock(s_sessionListLock, INFINITE);
       for(i = 0; i < MAX_CLIENT_SESSIONS; i++)
-         if (m_pSessionList[i] != NULL)
-            if (m_pSessionList[i]->isAuthenticated())
-               m_pSessionList[i]->postMessage(&msg);
-      RWLockUnlock(m_rwlockSessionListAccess);
+         if (s_sessionList[i] != NULL)
+            if (s_sessionList[i]->isAuthenticated())
+               s_sessionList[i]->postMessage(&msg);
+      RWLockUnlock(s_sessionListLock);
    }
 
    DbgPrintf(1, _T("Client keep-alive thread terminated"));
@@ -98,206 +99,70 @@ static THREAD_RESULT THREAD_CALL ClientKeepAliveThread(void *)
  */
 void InitClientListeners()
 {
+   memset(s_sessionList, 0, sizeof(s_sessionList));
+
    // Create session list access rwlock
-   m_rwlockSessionListAccess = RWLockCreate();
+   s_sessionListLock = RWLockCreate();
 
    // Start client keep-alive thread
    ThreadCreate(ClientKeepAliveThread, 0, NULL);
 }
 
 /**
- * Listener thread
+ * Client listener class
  */
-THREAD_RESULT THREAD_CALL ClientListener(void *arg)
+class ClientListener : public SocketListener
 {
-   SOCKET sock, sockClient;
-   struct sockaddr_in servAddr;
-   int errorCount = 0;
-   socklen_t iSize;
-   WORD wListenPort;
-   ClientSession *pSession;
-
-   // Read configuration
-   wListenPort = (WORD)ConfigReadInt(_T("ClientListenerPort"), SERVER_LISTEN_PORT_FOR_CLIENTS);
-
-   // Create socket
-   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
-   {
-      nxlog_write(MSG_SOCKET_FAILED, EVENTLOG_ERROR_TYPE, "s", _T("ClientListener"));
-      return THREAD_OK;
-   }
+protected:
+   virtual ConnectionProcessingResult processConnection(SOCKET s, const InetAddress& peer);
+   virtual bool isStopConditionReached();
 
-       SetSocketExclusiveAddrUse(sock);
-       SetSocketReuseFlag(sock);
-#ifndef _WIN32
-   fcntl(sock, F_SETFD, fcntl(sock, F_GETFD) | FD_CLOEXEC);
-#endif
+public:
+   ClientListener(UINT16 port) : SocketListener(port) { setName(_T("Clients")); }
+};
 
-   // Fill in local address structure
-   memset(&servAddr, 0, sizeof(struct sockaddr_in));
-   servAddr.sin_family = AF_INET;
-   servAddr.sin_addr.s_addr = !_tcscmp(g_szListenAddress, _T("*")) ? 0 : htonl(InetAddress::resolveHostName(g_szListenAddress, AF_INET).getAddressV4());
-   servAddr.sin_port = htons(wListenPort);
+/**
+ * Listener stop condition: the listener loop is asked to stop as soon as
+ * IsShutdownInProgress() reports true.
+ */
+bool ClientListener::isStopConditionReached()
+{
+   return IsShutdownInProgress();
+}
 
-   // Bind socket
-   if (bind(sock, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
+/**
+ * Process incoming connection
+ */
+ConnectionProcessingResult ClientListener::processConnection(SOCKET s, const InetAddress& peer)
+{
+   SetSocketNonBlocking(s);
+   ClientSession *session = new ClientSession(s, peer);
+   if (RegisterClientSession(session))
    {
-      nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "dse", wListenPort, _T("ClientListener"), WSAGetLastError());
-      closesocket(sock);
-      /* TODO: we should initiate shutdown procedure here */
-      return THREAD_OK;
+      session->run();
    }
-
-   // Set up queue
-   listen(sock, SOMAXCONN);
-       nxlog_write(MSG_LISTENING_FOR_CLIENTS, EVENTLOG_INFORMATION_TYPE, "ad", ntohl(servAddr.sin_addr.s_addr), wListenPort);
-
-   // Wait for connection requests
-   while(!IsShutdownInProgress())
+   else
    {
-      iSize = sizeof(struct sockaddr_in);
-      if ((sockClient = accept(sock, (struct sockaddr *)&servAddr, &iSize)) == -1)
-      {
-         int error;
-
-#ifdef _WIN32
-         error = WSAGetLastError();
-         if (error != WSAEINTR)
-#else
-         error = errno;
-         if (error != EINTR)
-#endif
-            nxlog_write(MSG_ACCEPT_ERROR, NXLOG_ERROR, "e", error);
-         errorCount++;
-         if (errorCount > 1000)
-         {
-            nxlog_write(MSG_TOO_MANY_ACCEPT_ERRORS, NXLOG_WARNING, NULL);
-            errorCount = 0;
-         }
-         ThreadSleepMs(500);
-                       continue;
-      }
-
-      errorCount = 0;     // Reset consecutive errors counter
-               SetSocketNonBlocking(sockClient);
-
-      // Create new session structure and threads
-      pSession = new ClientSession(sockClient, (struct sockaddr *)&servAddr);
-      if (!RegisterClientSession(pSession))
-      {
-         delete pSession;
-      }
-      else
-      {
-         pSession->run();
-      }
+      delete session;
    }
-
-   closesocket(sock);
-   return THREAD_OK;
+   return CPR_BACKGROUND;
 }
 
-#ifdef WITH_IPV6
-
 /**
- * Listener thread - IPv6
+ * Listener thread
  */
-THREAD_RESULT THREAD_CALL ClientListenerIPv6(void *arg)
+THREAD_RESULT THREAD_CALL ClientListenerThread(void *arg)
 {
-   SOCKET sock, sockClient;
-   struct sockaddr_in6 servAddr;
-   int errorCount = 0;
-   socklen_t iSize;
-   WORD wListenPort;
-   ClientSession *pSession;
-
-   // Read configuration
-   wListenPort = (WORD)ConfigReadInt(_T("ClientListenerPort"), SERVER_LISTEN_PORT_FOR_CLIENTS);
-
-   // Create socket
-   if ((sock = socket(AF_INET6, SOCK_STREAM, 0)) == INVALID_SOCKET)
-   {
-      nxlog_write(MSG_SOCKET_FAILED, EVENTLOG_ERROR_TYPE, "s", _T("ClientListenerIPv6"));
+   UINT16 listenPort = (UINT16)ConfigReadInt(_T("ClientListenerPort"), SERVER_LISTEN_PORT_FOR_CLIENTS);
+   ClientListener listener(listenPort);
+   listener.setListenAddress(g_szListenAddress);
+   if (!listener.initialize())
       return THREAD_OK;
-   }
-
-       SetSocketExclusiveAddrUse(sock);
-       SetSocketReuseFlag(sock);
-#ifdef IPV6_V6ONLY
-   int on = 1;
-   setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&on, sizeof(int));
-#endif
-#ifndef _WIN32
-   fcntl(sock, F_SETFD, fcntl(sock, F_GETFD) | FD_CLOEXEC);
-#endif
-
-   // Fill in local address structure
-   memset(&servAddr, 0, sizeof(struct sockaddr_in6));
-   servAddr.sin6_family = AF_INET6;
-   if (_tcscmp(g_szListenAddress, _T("*")))
-      memcpy(servAddr.sin6_addr.s6_addr, InetAddress::resolveHostName(g_szListenAddress, AF_INET6).getAddressV6(), 16);
-   servAddr.sin6_port = htons(wListenPort);
-
-   // Bind socket
-   if (bind(sock, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in6)) != 0)
-   {
-      nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "dse", wListenPort, _T("ClientListenerIPv6"), WSAGetLastError());
-      closesocket(sock);
-      /* TODO: we should initiate shutdown procedure here */
-      return THREAD_OK;
-   }
-
-   // Set up queue
-   listen(sock, SOMAXCONN);
-       nxlog_write(MSG_LISTENING_FOR_CLIENTS, EVENTLOG_INFORMATION_TYPE, "Hd", servAddr.sin6_addr.s6_addr, wListenPort);
 
-   // Wait for connection requests
-   while(!IsShutdownInProgress())
-   {
-      iSize = sizeof(struct sockaddr_in6);
-      if ((sockClient = accept(sock, (struct sockaddr *)&servAddr, &iSize)) == -1)
-      {
-         int error;
-
-#ifdef _WIN32
-         error = WSAGetLastError();
-         if (error != WSAEINTR)
-#else
-         error = errno;
-         if (error != EINTR)
-#endif
-            nxlog_write(MSG_ACCEPT_ERROR, EVENTLOG_ERROR_TYPE, "e", error);
-         errorCount++;
-         if (errorCount > 1000)
-         {
-            nxlog_write(MSG_TOO_MANY_ACCEPT_ERRORS, EVENTLOG_WARNING_TYPE, NULL);
-            errorCount = 0;
-         }
-         ThreadSleepMs(500);
-                       continue;
-      }
-
-      errorCount = 0;     // Reset consecutive errors counter
-               SetSocketNonBlocking(sockClient);
-
-      // Create new session structure and threads
-      pSession = new ClientSession(sockClient, (struct sockaddr *)&servAddr);
-      if (!RegisterClientSession(pSession))
-      {
-         delete pSession;
-      }
-      else
-      {
-         pSession->run();
-      }
-   }
-
-   closesocket(sock);
+   listener.mainLoop();
+   listener.shutdown();
    return THREAD_OK;
 }
 
-#endif
-
 /**
  * Dump client sessions to screen
  */
@@ -310,26 +175,26 @@ void DumpClientSessions(CONSOLE_CTX pCtx)
        static const TCHAR *pszClientType[] = { _T("DESKTOP"), _T("WEB"), _T("MOBILE"), _T("TABLET"), _T("APP") };
 
    ConsolePrintf(pCtx, _T("ID  STATE                    CIPHER   CLTYPE  USER [CLIENT]\n"));
-   RWLockReadLock(m_rwlockSessionListAccess, INFINITE);
+   RWLockReadLock(s_sessionListLock, INFINITE);
    for(i = 0, iCount = 0; i < MAX_CLIENT_SESSIONS; i++)
-      if (m_pSessionList[i] != NULL)
+      if (s_sessionList[i] != NULL)
       {
          TCHAR webServer[256] = _T("");
-         if (m_pSessionList[i]->getClientType() == CLIENT_TYPE_WEB)
+         if (s_sessionList[i]->getClientType() == CLIENT_TYPE_WEB)
          {
-            _sntprintf(webServer, 256, _T(" (%s)"), m_pSessionList[i]->getWebServerAddress());
+            _sntprintf(webServer, 256, _T(" (%s)"), s_sessionList[i]->getWebServerAddress());
          }
          ConsolePrintf(pCtx, _T("%-3d %-24s %-8s %-7s %s%s [%s]\n"), i,
-                       (m_pSessionList[i]->getState() != SESSION_STATE_PROCESSING) ?
-                         pszStateName[m_pSessionList[i]->getState()] :
-                         NXCPMessageCodeName(m_pSessionList[i]->getCurrentCmd(), szBuffer),
-                                               pszCipherName[m_pSessionList[i]->getCipher() + 1],
-                                                         pszClientType[m_pSessionList[i]->getClientType()],
-                       m_pSessionList[i]->getSessionName(), webServer,
-                       m_pSessionList[i]->getClientInfo());
+                       (s_sessionList[i]->getState() != SESSION_STATE_PROCESSING) ?
+                         pszStateName[s_sessionList[i]->getState()] :
+                         NXCPMessageCodeName(s_sessionList[i]->getCurrentCmd(), szBuffer),
+                                               pszCipherName[s_sessionList[i]->getCipher() + 1],
+                                                         pszClientType[s_sessionList[i]->getClientType()],
+                       s_sessionList[i]->getSessionName(), webServer,
+                       s_sessionList[i]->getClientInfo());
          iCount++;
       }
-   RWLockUnlock(m_rwlockSessionListAccess);
+   RWLockUnlock(s_sessionListLock);
    ConsolePrintf(pCtx, _T("\n%d active session%s\n\n"), iCount, iCount == 1 ? _T("") : _T("s"));
 }
 
@@ -339,17 +204,17 @@ void DumpClientSessions(CONSOLE_CTX pCtx)
 bool NXCORE_EXPORTABLE KillClientSession(int id)
 {
    bool success = false;
-   RWLockReadLock(m_rwlockSessionListAccess, INFINITE);
+   RWLockReadLock(s_sessionListLock, INFINITE);
    for(int i = 0; i < MAX_CLIENT_SESSIONS; i++)
    {
-      if ((m_pSessionList[i] != NULL) && (m_pSessionList[i]->getId() == id))
+      if ((s_sessionList[i] != NULL) && (s_sessionList[i]->getId() == id))
       {
-         m_pSessionList[i]->kill();
+         s_sessionList[i]->kill();
          success = true;
          break;
       }
    }
-   RWLockUnlock(m_rwlockSessionListAccess);
+   RWLockUnlock(s_sessionListLock);
    return success;
 }
 
@@ -358,15 +223,15 @@ bool NXCORE_EXPORTABLE KillClientSession(int id)
  */
 void NXCORE_EXPORTABLE EnumerateClientSessions(void (*pHandler)(ClientSession *, void *), void *pArg)
 {
-   RWLockReadLock(m_rwlockSessionListAccess, INFINITE);
+   RWLockReadLock(s_sessionListLock, INFINITE);
    for(int i = 0; i < MAX_CLIENT_SESSIONS; i++)
    {
-      if ((m_pSessionList[i] != NULL) && !m_pSessionList[i]->isTerminated())
+      if ((s_sessionList[i] != NULL) && !s_sessionList[i]->isTerminated())
       {
-         pHandler(m_pSessionList[i], pArg);
+         pHandler(s_sessionList[i], pArg);
       }
    }
-   RWLockUnlock(m_rwlockSessionListAccess);
+   RWLockUnlock(s_sessionListLock);
 }
 
 /**
@@ -389,14 +254,14 @@ void SendUserDBUpdate(int code, UINT32 id, UserDatabaseObject *object)
          break;
    }
 
-   RWLockReadLock(m_rwlockSessionListAccess, INFINITE);
+   RWLockReadLock(s_sessionListLock, INFINITE);
    for(int i = 0; i < MAX_CLIENT_SESSIONS; i++)
-      if ((m_pSessionList[i] != NULL) &&
-          m_pSessionList[i]->isAuthenticated() &&
-          !m_pSessionList[i]->isTerminated() &&
-          m_pSessionList[i]->isSubscribedTo(NXC_CHANNEL_USERDB))
-         m_pSessionList[i]->postMessage(&msg);
-   RWLockUnlock(m_rwlockSessionListAccess);
+      if ((s_sessionList[i] != NULL) &&
+          s_sessionList[i]->isAuthenticated() &&
+          !s_sessionList[i]->isTerminated() &&
+          s_sessionList[i]->isSubscribedTo(NXC_CHANNEL_USERDB))
+         s_sessionList[i]->postMessage(&msg);
+   RWLockUnlock(s_sessionListLock);
 }
 
 /**
@@ -404,14 +269,14 @@ void SendUserDBUpdate(int code, UINT32 id, UserDatabaseObject *object)
  */
 void NXCORE_EXPORTABLE NotifyClientGraphUpdate(NXCPMessage *update, UINT32 graphId)
 {
-   RWLockReadLock(m_rwlockSessionListAccess, INFINITE);
+   RWLockReadLock(s_sessionListLock, INFINITE);
    for(int i = 0; i < MAX_CLIENT_SESSIONS; i++)
-      if ((m_pSessionList[i] != NULL) &&
-          m_pSessionList[i]->isAuthenticated() &&
-          !m_pSessionList[i]->isTerminated() &&
-          (GetGraphAccessCheckResult(graphId, m_pSessionList[i]->getUserId()) == RCC_SUCCESS))
-         m_pSessionList[i]->postMessage(update);
-   RWLockUnlock(m_rwlockSessionListAccess);
+      if ((s_sessionList[i] != NULL) &&
+          s_sessionList[i]->isAuthenticated() &&
+          !s_sessionList[i]->isTerminated() &&
+          (GetGraphAccessCheckResult(graphId, s_sessionList[i]->getUserId()) == RCC_SUCCESS))
+         s_sessionList[i]->postMessage(update);
+   RWLockUnlock(s_sessionListLock);
 }
 
 /**
@@ -419,17 +284,17 @@ void NXCORE_EXPORTABLE NotifyClientGraphUpdate(NXCPMessage *update, UINT32 graph
  */
 void NXCORE_EXPORTABLE NotifyClientSessions(UINT32 dwCode, UINT32 dwData)
 {
-   RWLockReadLock(m_rwlockSessionListAccess, INFINITE);
+   RWLockReadLock(s_sessionListLock, INFINITE);
    for(int i = 0; i < MAX_CLIENT_SESSIONS; i++)
    {
-      if ((m_pSessionList[i] != NULL) &&
-          m_pSessionList[i]->isAuthenticated() &&
-          !m_pSessionList[i]->isTerminated())
+      if ((s_sessionList[i] != NULL) &&
+          s_sessionList[i]->isAuthenticated() &&
+          !s_sessionList[i]->isTerminated())
       {
-         m_pSessionList[i]->notify(dwCode, dwData);
+         s_sessionList[i]->notify(dwCode, dwData);
       }
    }
-   RWLockUnlock(m_rwlockSessionListAccess);
+   RWLockUnlock(s_sessionListLock);
 }
 
 /**
@@ -437,11 +302,11 @@ void NXCORE_EXPORTABLE NotifyClientSessions(UINT32 dwCode, UINT32 dwData)
  */
 void NXCORE_EXPORTABLE NotifyClientSession(UINT32 sessionId, UINT32 dwCode, UINT32 dwData)
 {
-   RWLockReadLock(m_rwlockSessionListAccess, INFINITE);
+   RWLockReadLock(s_sessionListLock, INFINITE);
    for(int i = 0; i < MAX_CLIENT_SESSIONS; i++)
-      if ((m_pSessionList[i] != NULL) && (m_pSessionList[i]->getId() == sessionId))
-         m_pSessionList[i]->notify(dwCode, dwData);
-   RWLockUnlock(m_rwlockSessionListAccess);
+      if ((s_sessionList[i] != NULL) && (s_sessionList[i]->getId() == sessionId))
+         s_sessionList[i]->notify(dwCode, dwData);
+   RWLockUnlock(s_sessionListLock);
 }
 
 /**
@@ -451,16 +316,16 @@ int GetSessionCount(bool includeSystemAccount)
 {
    int i, nCount;
 
-   RWLockReadLock(m_rwlockSessionListAccess, INFINITE);
+   RWLockReadLock(s_sessionListLock, INFINITE);
    for(i = 0, nCount = 0; i < MAX_CLIENT_SESSIONS; i++)
    {
-      if ((m_pSessionList[i] != NULL) &&
-          (includeSystemAccount || (m_pSessionList[i]->getUserId() != 0)))
+      if ((s_sessionList[i] != NULL) &&
+          (includeSystemAccount || (s_sessionList[i]->getUserId() != 0)))
       {
          nCount++;
       }
    }
-   RWLockUnlock(m_rwlockSessionListAccess);
+   RWLockUnlock(s_sessionListLock);
    return nCount;
 }
 
@@ -470,14 +335,14 @@ int GetSessionCount(bool includeSystemAccount)
 bool IsLoggedIn(UINT32 dwUserId)
 {
    bool result = false;
-   RWLockReadLock(m_rwlockSessionListAccess, INFINITE);
+   RWLockReadLock(s_sessionListLock, INFINITE);
    for(int i = 0; i < MAX_CLIENT_SESSIONS; i++)
-      if (m_pSessionList[i] != NULL && m_pSessionList[i]->getUserId() == dwUserId)
+      if (s_sessionList[i] != NULL && s_sessionList[i]->getUserId() == dwUserId)
       {
          result = true;
          break;
       }
-   RWLockUnlock(m_rwlockSessionListAccess);
+   RWLockUnlock(s_sessionListLock);
    return result;
 }
 
@@ -486,16 +351,16 @@ bool IsLoggedIn(UINT32 dwUserId)
  */
 void CloseOtherSessions(UINT32 userId, UINT32 thisSession)
 {
-   RWLockReadLock(m_rwlockSessionListAccess, INFINITE);
+   RWLockReadLock(s_sessionListLock, INFINITE);
    for(int i = 0; i < MAX_CLIENT_SESSIONS; i++)
    {
-      if ((m_pSessionList[i] != NULL) &&
-          (m_pSessionList[i]->getUserId() == userId) &&
-          (m_pSessionList[i]->getId() != thisSession))
+      if ((s_sessionList[i] != NULL) &&
+          (s_sessionList[i]->getUserId() == userId) &&
+          (s_sessionList[i]->getId() != thisSession))
       {
-         nxlog_debug(4, _T("CloseOtherSessions(%d,%d): disconnecting session %d"), userId, thisSession, m_pSessionList[i]->getId());
-         m_pSessionList[i]->kill();
+         nxlog_debug(4, _T("CloseOtherSessions(%d,%d): disconnecting session %d"), userId, thisSession, s_sessionList[i]->getId());
+         s_sessionList[i]->kill();
       }
    }
-   RWLockUnlock(m_rwlockSessionListAccess);
+   RWLockUnlock(s_sessionListLock);
 }
index 37e1d71..73013c7 100644 (file)
@@ -104,10 +104,8 @@ THREAD_RESULT THREAD_CALL NodePoller(void *);
 THREAD_RESULT THREAD_CALL PollManager(void *);
 THREAD_RESULT THREAD_CALL EventProcessor(void *);
 THREAD_RESULT THREAD_CALL WatchdogThread(void *);
-THREAD_RESULT THREAD_CALL ClientListener(void *);
-THREAD_RESULT THREAD_CALL ClientListenerIPv6(void *);
-THREAD_RESULT THREAD_CALL MobileDeviceListener(void *);
-THREAD_RESULT THREAD_CALL MobileDeviceListenerIPv6(void *);
+THREAD_RESULT THREAD_CALL ClientListenerThread(void *);
+THREAD_RESULT THREAD_CALL MobileDeviceListenerThread(void *);
 THREAD_RESULT THREAD_CALL ISCListener(void *);
 THREAD_RESULT THREAD_CALL LocalAdminListener(void *);
 THREAD_RESULT THREAD_CALL SNMPTrapReceiver(void *);
@@ -991,17 +989,11 @@ retry_db_lock:
       RemoveScheduledTaskByHandlerId(ALARM_SUMMARY_EMAIL_TASK_ID);
 
        // Allow clients to connect
-       ThreadCreate(ClientListener, 0, NULL);
-#ifdef WITH_IPV6
-       ThreadCreate(ClientListenerIPv6, 0, NULL);
-#endif
+       ThreadCreate(ClientListenerThread, 0, NULL);
 
        // Allow mobile devices to connect
        InitMobileDeviceListeners();
-       ThreadCreate(MobileDeviceListener, 0, NULL);
-#ifdef WITH_IPV6
-       ThreadCreate(MobileDeviceListenerIPv6, 0, NULL);
-#endif
+       ThreadCreate(MobileDeviceListenerThread, 0, NULL);
 
        // Agent tunnels
    s_tunnelListenerThread = ThreadCreateEx(TunnelListener, 0, NULL);
index 474ba94..b48499a 100644 (file)
@@ -1,6 +1,6 @@
 /* 
 ** NetXMS - Network Management System
-** Copyright (C) 2003-2016 Victor Kirhenshtein
+** Copyright (C) 2003-2017 Victor Kirhenshtein
 **
 ** This program is free software; you can redistribute it and/or modify
 ** it under the terms of the GNU General Public License as published by
 **/
 
 #include "nxcore.h"
+#include <socket_listener.h>
 
 /**
  * Static data
  */
-static MobileDeviceSession *m_pSessionList[MAX_DEVICE_SESSIONS];
-static RWLOCK m_rwlockSessionListAccess;
+static MobileDeviceSession *s_sessionList[MAX_DEVICE_SESSIONS];
+static RWLOCK s_sessionListLock;
 
 /**
  * Register new session in list
  */
 static BOOL RegisterMobileDeviceSession(MobileDeviceSession *pSession)
 {
-   UINT32 i;
-
-   RWLockWriteLock(m_rwlockSessionListAccess, INFINITE);
-   for(i = 0; i < MAX_DEVICE_SESSIONS; i++)
-      if (m_pSessionList[i] == NULL)
+   RWLockWriteLock(s_sessionListLock, INFINITE);
+   for(int i = 0; i < MAX_DEVICE_SESSIONS; i++)
+      if (s_sessionList[i] == NULL)
       {
-         m_pSessionList[i] = pSession;
+         s_sessionList[i] = pSession;
          pSession->setId(i + MAX_CLIENT_SESSIONS);
-         RWLockUnlock(m_rwlockSessionListAccess);
+         RWLockUnlock(s_sessionListLock);
          return TRUE;
       }
 
-   RWLockUnlock(m_rwlockSessionListAccess);
+   RWLockUnlock(s_sessionListLock);
    nxlog_write(MSG_TOO_MANY_MD_SESSIONS, EVENTLOG_WARNING_TYPE, NULL);
    return FALSE;
 }
@@ -55,9 +54,9 @@ static BOOL RegisterMobileDeviceSession(MobileDeviceSession *pSession)
  */
 void UnregisterMobileDeviceSession(int id)
 {
-   RWLockWriteLock(m_rwlockSessionListAccess, INFINITE);
-   m_pSessionList[id - MAX_CLIENT_SESSIONS] = NULL;
-   RWLockUnlock(m_rwlockSessionListAccess);
+   RWLockWriteLock(s_sessionListLock, INFINITE);
+   s_sessionList[id - MAX_CLIENT_SESSIONS] = NULL;
+   RWLockUnlock(s_sessionListLock);
 }
 
 /**
@@ -65,203 +64,65 @@ void UnregisterMobileDeviceSession(int id)
  */
 void InitMobileDeviceListeners()
 {
-   // Create session list access rwlock
-   m_rwlockSessionListAccess = RWLockCreate();
+   memset(s_sessionList, 0, sizeof(s_sessionList));
+   s_sessionListLock = RWLockCreate();
 }
 
 /**
- * Listener thread
+ * Mobile device listener class
  */
-THREAD_RESULT THREAD_CALL MobileDeviceListener(void *arg)
+class MobileDeviceListener : public SocketListener
 {
-   SOCKET sock, sockClient;
-   struct sockaddr_in servAddr;
-   int errorCount = 0;
-   socklen_t iSize;
-   WORD wListenPort;
-   MobileDeviceSession *pSession;
-
-   // Read configuration
-   wListenPort = (WORD)ConfigReadInt(_T("MobileDeviceListenerPort"), SERVER_LISTEN_PORT_FOR_MOBILES);
-
-   // Create socket
-   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
-   {
-      nxlog_write(MSG_SOCKET_FAILED, EVENTLOG_ERROR_TYPE, "s", _T("MobileDeviceListener"));
-      return THREAD_OK;
-   }
+protected:
+   virtual ConnectionProcessingResult processConnection(SOCKET s, const InetAddress& peer);
+   virtual bool isStopConditionReached();
 
-       SetSocketExclusiveAddrUse(sock);
-       SetSocketReuseFlag(sock);
-#ifndef _WIN32
-   fcntl(sock, F_SETFD, fcntl(sock, F_GETFD) | FD_CLOEXEC);
-#endif
+public:
+   MobileDeviceListener(UINT16 port) : SocketListener(port) { setName(_T("MobileDevices")); }
+};
 
-   // Fill in local address structure
-   memset(&servAddr, 0, sizeof(struct sockaddr_in));
-   servAddr.sin_family = AF_INET;
-   servAddr.sin_addr.s_addr = !_tcscmp(g_szListenAddress, _T("*")) ? 0 : htonl(InetAddress::resolveHostName(g_szListenAddress, AF_INET).getAddressV4());
-   servAddr.sin_port = htons(wListenPort);
+/**
+ * Listener stop condition
+ */
+bool MobileDeviceListener::isStopConditionReached()
+{
+   return IsShutdownInProgress();
+}
 
-   // Bind socket
-   if (bind(sock, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
+/**
+ * Process incoming connection
+ */
+ConnectionProcessingResult MobileDeviceListener::processConnection(SOCKET s, const InetAddress& peer)
+{
+   SetSocketNonBlocking(s);
+   MobileDeviceSession *session = new MobileDeviceSession(s, peer);
+   if (RegisterMobileDeviceSession(session))
    {
-      nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "dse", wListenPort, _T("MobileDeviceListener"), WSAGetLastError());
-      closesocket(sock);
-      /* TODO: we should initiate shutdown procedure here */
-      return THREAD_OK;
+      session->run();
    }
-
-   // Set up queue
-   listen(sock, SOMAXCONN);
-       nxlog_write(MSG_LISTENING_FOR_MOBILE_DEVICES, EVENTLOG_INFORMATION_TYPE, "ad", ntohl(servAddr.sin_addr.s_addr), wListenPort);
-
-   // Wait for connection requests
-   while(!IsShutdownInProgress())
+   else
    {
-      iSize = sizeof(struct sockaddr_in);
-      if ((sockClient = accept(sock, (struct sockaddr *)&servAddr, &iSize)) == -1)
-      {
-         int error;
-
-#ifdef _WIN32
-         error = WSAGetLastError();
-         if (error != WSAEINTR)
-#else
-         error = errno;
-         if (error != EINTR)
-#endif
-            nxlog_write(MSG_ACCEPT_ERROR, EVENTLOG_ERROR_TYPE, "e", error);
-         errorCount++;
-         if (errorCount > 1000)
-         {
-            nxlog_write(MSG_TOO_MANY_ACCEPT_ERRORS, EVENTLOG_WARNING_TYPE, NULL);
-            errorCount = 0;
-         }
-         ThreadSleepMs(500);
-                       continue;
-      }
-
-      errorCount = 0;     // Reset consecutive errors counter
-               SetSocketNonBlocking(sockClient);
-
-      // Create new session structure and threads
-      pSession = new MobileDeviceSession(sockClient, (struct sockaddr *)&servAddr);
-      if (!RegisterMobileDeviceSession(pSession))
-      {
-         delete pSession;
-      }
-      else
-      {
-         pSession->run();
-      }
+      delete session;
    }
-
-   closesocket(sock);
-   return THREAD_OK;
+   return CPR_BACKGROUND;
 }
 
 /**
- * Listener thread - IPv6
+ * Listener thread
  */
-#ifdef WITH_IPV6
-
-THREAD_RESULT THREAD_CALL MobileDeviceListenerIPv6(void *arg)
+THREAD_RESULT THREAD_CALL MobileDeviceListenerThread(void *arg)
 {
-   SOCKET sock, sockClient;
-   struct sockaddr_in6 servAddr;
-   int errorCount = 0;
-   socklen_t iSize;
-   WORD wListenPort;
-   MobileDeviceSession *pSession;
-
-   // Read configuration
-   wListenPort = (WORD)ConfigReadInt(_T("MobileDeviceListenerPort"), SERVER_LISTEN_PORT_FOR_MOBILES);
-
-   // Create socket
-   if ((sock = socket(AF_INET6, SOCK_STREAM, 0)) == INVALID_SOCKET)
-   {
-      nxlog_write(MSG_SOCKET_FAILED, EVENTLOG_ERROR_TYPE, "s", _T("MobileDeviceListenerIPv6"));
-      return THREAD_OK;
-   }
-
-   SetSocketExclusiveAddrUse(sock);
-   SetSocketReuseFlag(sock);
-#ifdef IPV6_V6ONLY
-   int on = 1;
-   setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&on, sizeof(int));
-#endif
-#ifndef _WIN32
-   fcntl(sock, F_SETFD, fcntl(sock, F_GETFD) | FD_CLOEXEC);
-#endif
-
-   // Fill in local address structure
-   memset(&servAddr, 0, sizeof(struct sockaddr_in6));
-   servAddr.sin6_family = AF_INET6;
-   if (_tcscmp(g_szListenAddress, _T("*")))
-      memcpy(servAddr.sin6_addr.s6_addr, InetAddress::resolveHostName(g_szListenAddress, AF_INET6).getAddressV6(), 16);
-   servAddr.sin6_port = htons(wListenPort);
-
-   // Bind socket
-   if (bind(sock, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in6)) != 0)
-   {
-      nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "dse", wListenPort, _T("MobileDeviceListenerIPv6"), WSAGetLastError());
-      closesocket(sock);
-      /* TODO: we should initiate shutdown procedure here */
+   UINT16 listenPort = (UINT16)ConfigReadInt(_T("MobileDeviceListenerPort"), SERVER_LISTEN_PORT_FOR_MOBILES);
+   MobileDeviceListener listener(listenPort);
+   listener.setListenAddress(g_szListenAddress);
+   if (!listener.initialize())
       return THREAD_OK;
-   }
-
-   // Set up queue
-   listen(sock, SOMAXCONN);
-       nxlog_write(MSG_LISTENING_FOR_MOBILE_DEVICES, EVENTLOG_INFORMATION_TYPE, "Hd", servAddr.sin6_addr.s6_addr, wListenPort);
 
-   // Wait for connection requests
-   while(!IsShutdownInProgress())
-   {
-      iSize = sizeof(struct sockaddr_in6);
-      if ((sockClient = accept(sock, (struct sockaddr *)&servAddr, &iSize)) == -1)
-      {
-         int error;
-
-#ifdef _WIN32
-         error = WSAGetLastError();
-         if (error != WSAEINTR)
-#else
-         error = errno;
-         if (error != EINTR)
-#endif
-            nxlog_write(MSG_ACCEPT_ERROR, EVENTLOG_ERROR_TYPE, "e", error);
-         errorCount++;
-         if (errorCount > 1000)
-         {
-            nxlog_write(MSG_TOO_MANY_ACCEPT_ERRORS, EVENTLOG_WARNING_TYPE, NULL);
-            errorCount = 0;
-         }
-         ThreadSleepMs(500);
-                       continue;
-      }
-
-      errorCount = 0;     // Reset consecutive errors counter
-               SetSocketNonBlocking(sockClient);
-
-      // Create new session structure and threads
-      pSession = new MobileDeviceSession(sockClient, (struct sockaddr *)&servAddr);
-      if (!RegisterMobileDeviceSession(pSession))
-      {
-         delete pSession;
-      }
-      else
-      {
-         pSession->run();
-      }
-   }
-
-   closesocket(sock);
+   listener.mainLoop();
+   listener.shutdown();
    return THREAD_OK;
 }
 
-#endif
-
 /**
  * Dump client sessions to screen
  */
@@ -273,19 +134,19 @@ void DumpMobileDeviceSessions(CONSOLE_CTX pCtx)
    static const TCHAR *pszCipherName[] = { _T("NONE"), _T("AES-256"), _T("BLOWFISH"), _T("IDEA"), _T("3DES"), _T("AES-128") };
 
    ConsolePrintf(pCtx, _T("ID  STATE                    CIPHER   USER [CLIENT]\n"));
-   RWLockReadLock(m_rwlockSessionListAccess, INFINITE);
+   RWLockReadLock(s_sessionListLock, INFINITE);
    for(i = 0, iCount = 0; i < MAX_DEVICE_SESSIONS; i++)
-      if (m_pSessionList[i] != NULL)
+      if (s_sessionList[i] != NULL)
       {
          ConsolePrintf(pCtx, _T("%-3d %-24s %-8s %s [%s]\n"), i, 
-                       (m_pSessionList[i]->getState() != SESSION_STATE_PROCESSING) ?
-                         pszStateName[m_pSessionList[i]->getState()] :
-                         NXCPMessageCodeName(m_pSessionList[i]->getCurrentCmd(), szBuffer),
-                                               pszCipherName[m_pSessionList[i]->getCipher() + 1],
-                       m_pSessionList[i]->getUserName(),
-                       m_pSessionList[i]->getClientInfo());
+                       (s_sessionList[i]->getState() != SESSION_STATE_PROCESSING) ?
+                         pszStateName[s_sessionList[i]->getState()] :
+                         NXCPMessageCodeName(s_sessionList[i]->getCurrentCmd(), szBuffer),
+                                               pszCipherName[s_sessionList[i]->getCipher() + 1],
+                       s_sessionList[i]->getUserName(),
+                       s_sessionList[i]->getClientInfo());
          iCount++;
       }
-   RWLockUnlock(m_rwlockSessionListAccess);
+   RWLockUnlock(s_sessionListLock);
    ConsolePrintf(pCtx, _T("\n%d active session%s\n\n"), iCount, iCount == 1 ? _T("") : _T("s"));
 }
index 0515f23..7a4e884 100644 (file)
@@ -82,7 +82,7 @@ THREAD_RESULT THREAD_CALL MobileDeviceSession::processingThreadStarter(void *pAr
 /**
  * Mobile device session class constructor
  */
-MobileDeviceSession::MobileDeviceSession(SOCKET hSocket, struct sockaddr *addr)
+MobileDeviceSession::MobileDeviceSession(SOCKET hSocket, const InetAddress& addr)
 {
    m_pSendQueue = new Queue;
    m_pMessageQueue = new Queue;
@@ -94,13 +94,8 @@ MobileDeviceSession::MobileDeviceSession(SOCKET hSocket, struct sockaddr *addr)
    m_hWriteThread = INVALID_THREAD_HANDLE;
    m_hProcessingThread = INVALID_THREAD_HANDLE;
        m_mutexSocketWrite = MutexCreate();
-       m_clientAddr = (struct sockaddr *)nx_memdup(addr, (addr->sa_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6));
-       if (addr->sa_family == AF_INET)
-               IpToStr(ntohl(((struct sockaddr_in *)m_clientAddr)->sin_addr.s_addr), m_szHostName);
-#ifdef WITH_IPV6
-       else
-               Ip6ToStr(((struct sockaddr_in6 *)m_clientAddr)->sin6_addr.s6_addr, m_szHostName);
-#endif
+       m_clientAddr = addr;
+       m_clientAddr.toString(m_szHostName);
    _tcscpy(m_szUserName, _T("<not logged in>"));
        _tcscpy(m_szClientInfo, _T("n/a"));
    m_dwUserId = INVALID_INDEX;
@@ -122,8 +117,7 @@ MobileDeviceSession::~MobileDeviceSession()
       closesocket(m_hSocket);
    delete m_pSendQueue;
    delete m_pMessageQueue;
-   safe_free(m_pMsgBuffer);
-       safe_free(m_clientAddr);
+   free(m_pMsgBuffer);
        MutexDestroy(m_mutexSocketWrite);
        if (m_pCtx != NULL)
                m_pCtx->decRefCount();
index ce28c41..c848ee4 100644 (file)
@@ -214,7 +214,7 @@ THREAD_RESULT THREAD_CALL ClientSession::processingThreadStarter(void *pArg)
 /**
  * Client session class constructor
  */
-ClientSession::ClientSession(SOCKET hSocket, struct sockaddr *addr)
+ClientSession::ClientSession(SOCKET hSocket, const InetAddress& addr)
 {
    m_sendQueue = new Queue;
    m_requestQueue = new Queue;
@@ -234,13 +234,8 @@ ClientSession::ClientSession(SOCKET hSocket, struct sockaddr *addr)
    m_subscriptions = new StringObjectMap<UINT32>(true);
    m_dwFlags = 0;
        m_clientType = CLIENT_TYPE_DESKTOP;
-       m_clientAddr = (struct sockaddr *)nx_memdup(addr, (addr->sa_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6));
-       if (addr->sa_family == AF_INET)
-               IpToStr(ntohl(((struct sockaddr_in *)m_clientAddr)->sin_addr.s_addr), m_workstation);
-#ifdef WITH_IPV6
-       else
-               Ip6ToStr(((struct sockaddr_in6 *)m_clientAddr)->sin6_addr.s6_addr, m_workstation);
-#endif
+       m_clientAddr = addr;
+       m_clientAddr.toString(m_workstation);
    m_webServerAddress[0] = 0;
    m_loginName[0] = 0;
    _tcscpy(m_sessionName, _T("<not logged in>"));
@@ -274,7 +269,6 @@ ClientSession::~ClientSession()
       closesocket(m_hSocket);
    delete m_sendQueue;
    delete m_requestQueue;
-       free(m_clientAddr);
        MutexDestroy(m_mutexSocketWrite);
    MutexDestroy(m_mutexSendObjects);
    MutexDestroy(m_mutexSendAlarms);
@@ -6202,14 +6196,13 @@ void ClientSession::onTrap(NXCPMessage *pRequest)
        }
    else   // Client is the source
        {
-      InetAddress addr = InetAddress::createFromSockaddr(m_clientAddr);
-      if (addr.isLoopback())
+      if (m_clientAddr.isLoopback())
       {
                        object = FindObjectById(g_dwMgmtNode);
       }
       else
       {
-                       object = FindNodeByIP(0, addr);
+                       object = FindNodeByIP(0, m_clientAddr);
       }
        }
    if (object != NULL)
@@ -8828,7 +8821,7 @@ void ClientSession::sendConfigForAgent(NXCPMessage *pRequest)
    wMinor = pRequest->getFieldAsUInt16(VID_VERSION_MINOR);
    wRelease = pRequest->getFieldAsUInt16(VID_VERSION_RELEASE);
    DbgPrintf(3, _T("Finding config for agent at %s: platform=\"%s\", version=\"%d.%d.%d\""),
-             SockaddrToStr(m_clientAddr, szBuffer), szPlatform, (int)wMajor, (int)wMinor, (int)wRelease);
+             m_clientAddr.toString(szBuffer), szPlatform, (int)wMajor, (int)wMinor, (int)wRelease);
 
    DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
    hResult = DBSelect(hdb, _T("SELECT config_id,config_file,config_filter FROM agent_configs ORDER BY sequence_number"));
@@ -8854,7 +8847,7 @@ void ClientSession::sendConfigForAgent(NXCPMessage *pRequest)
             // $4 - minor version number
             // $5 - release number
             NXSL_Value *ppArgList[5];
-            ppArgList[0] = new NXSL_Value(SockaddrToStr(m_clientAddr, szBuffer));
+            ppArgList[0] = new NXSL_Value(m_clientAddr.toString(szBuffer));
             ppArgList[1] = new NXSL_Value(szPlatform);
             ppArgList[2] = new NXSL_Value((LONG)wMajor);
             ppArgList[3] = new NXSL_Value((LONG)wMinor);
@@ -8868,7 +8861,7 @@ void ClientSession::sendConfigForAgent(NXCPMessage *pRequest)
                if (pValue->getValueAsInt32() != 0)
                {
                   DbgPrintf(3, _T("Configuration script %d matched for agent %s, sending config"),
-                            dwCfgId, SockaddrToStr(m_clientAddr, szBuffer));
+                            dwCfgId, m_clientAddr.toString(szBuffer));
                   msg.setField(VID_RCC, (WORD)0);
                   pszText = DBGetField(hResult, i, 1, NULL, 0);
                   DecodeSQLStringAndSetVariable(&msg, VID_CONFIG_FILE, pszText);
@@ -8880,7 +8873,7 @@ void ClientSession::sendConfigForAgent(NXCPMessage *pRequest)
                else
                {
                   DbgPrintf(3, _T("Configuration script %d not matched for agent %s"),
-                            dwCfgId, SockaddrToStr(m_clientAddr, szBuffer));
+                            dwCfgId, m_clientAddr.toString(szBuffer));
                }
             }
             else
@@ -10210,31 +10203,22 @@ void ClientSession::registerAgent(NXCPMessage *pRequest)
 
        if (ConfigReadInt(_T("EnableAgentRegistration"), 0))
        {
-               if (m_clientAddr->sa_family == AF_INET)
-               {
-                       node = FindNodeByIP(0, ntohl(((struct sockaddr_in *)m_clientAddr)->sin_addr.s_addr));
-                       if (node != NULL)
-                       {
-                               // Node already exist, force configuration poll
-                               node->setRecheckCapsFlag();
-                               node->forceConfigurationPoll();
-                       }
-                       else
-                       {
-                               NEW_NODE *info;
-
-                               info = (NEW_NODE *)malloc(sizeof(NEW_NODE));
-            info->ipAddr = InetAddress::createFromSockaddr(m_clientAddr);
-                               info->zoneUIN = 0;      // Add to default zone
-                               info->ignoreFilter = TRUE;              // Ignore discovery filters and add node anyway
-                               g_nodePollerQueue.put(info);
-                       }
-                       msg.setField(VID_RCC, RCC_SUCCESS);
-               }
-               else
-               {
-                       msg.setField(VID_RCC, RCC_NOT_IMPLEMENTED);
-               }
+      node = FindNodeByIP(0, m_clientAddr);
+      if (node != NULL)
+      {
+         // Node already exists, force configuration poll
+         node->setRecheckCapsFlag();
+         node->forceConfigurationPoll();
+      }
+      else
+      {
+         NEW_NODE *info = (NEW_NODE *)malloc(sizeof(NEW_NODE));
+         info->ipAddr = m_clientAddr;
+         info->zoneUIN = 0;    // Add to default zone
+         info->ignoreFilter = TRUE;            // Ignore discovery filters and add node anyway
+         g_nodePollerQueue.put(info);
+      }
+      msg.setField(VID_RCC, RCC_SUCCESS);
        }
        else
        {
index 3148487..c1e571c 100644 (file)
@@ -20,6 +20,7 @@
 **/
 
 #include "nxcore.h"
+#include <socket_listener.h>
 #include <agent_tunnel.h>
 
 #define MAX_MSG_SIZE    268435456
@@ -1068,213 +1069,56 @@ failure:
 static Mutex s_tunnelListenerLock;
 
 /**
- * Tunnel listener
+ * Tunnel listener class
  */
-THREAD_RESULT THREAD_CALL TunnelListener(void *arg)
+class TunnelListener : public SocketListener
 {
-   s_tunnelListenerLock.lock();
-   UINT16 port = (UINT16)ConfigReadULong(_T("AgentTunnelListenPort"), 4703);
-
-   // Create socket(s)
-   SOCKET hSocket = socket(AF_INET, SOCK_STREAM, 0);
-#ifdef WITH_IPV6
-   SOCKET hSocket6 = socket(AF_INET6, SOCK_STREAM, 0);
-   if ((hSocket == INVALID_SOCKET) && (hSocket6 == INVALID_SOCKET))
-#else
-   if (hSocket == INVALID_SOCKET)
-#endif
-   {
-      nxlog_write(MSG_SOCKET_FAILED, EVENTLOG_ERROR_TYPE, "s", _T("TunnelListener"));
-      return THREAD_OK;
-   }
-
-   SetSocketExclusiveAddrUse(hSocket);
-   SetSocketReuseFlag(hSocket);
-#ifndef _WIN32
-   fcntl(hSocket, F_SETFD, fcntl(hSocket, F_GETFD) | FD_CLOEXEC);
-#endif
-
-#ifdef WITH_IPV6
-   SetSocketExclusiveAddrUse(hSocket6);
-   SetSocketReuseFlag(hSocket6);
-#ifndef _WIN32
-   fcntl(hSocket6, F_SETFD, fcntl(hSocket6, F_GETFD) | FD_CLOEXEC);
-#endif
-#ifdef IPV6_V6ONLY
-   int on = 1;
-   setsockopt(hSocket6, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&on, sizeof(int));
-#endif
-#endif
-
-   // Fill in local address structure
-   struct sockaddr_in servAddr;
-   memset(&servAddr, 0, sizeof(struct sockaddr_in));
-   servAddr.sin_family = AF_INET;
+protected:
+   virtual ConnectionProcessingResult processConnection(SOCKET s, const InetAddress& peer);
+   virtual bool isStopConditionReached();
 
-#ifdef WITH_IPV6
-   struct sockaddr_in6 servAddr6;
-   memset(&servAddr6, 0, sizeof(struct sockaddr_in6));
-   servAddr6.sin6_family = AF_INET6;
-#endif
-
-   if (!_tcscmp(g_szListenAddress, _T("*")))
-   {
-      servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
-#ifdef WITH_IPV6
-      memset(servAddr6.sin6_addr.s6_addr, 0, 16);
-#endif
-   }
-   else
-   {
-      InetAddress bindAddress = InetAddress::resolveHostName(g_szListenAddress, AF_INET);
-      if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET))
-      {
-         servAddr.sin_addr.s_addr = htonl(bindAddress.getAddressV4());
-      }
-      else
-      {
-         servAddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
-      }
-#ifdef WITH_IPV6
-      bindAddress = InetAddress::resolveHostName(g_szListenAddress, AF_INET6);
-      if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET6))
-      {
-         memcpy(servAddr6.sin6_addr.s6_addr, bindAddress.getAddressV6(), 16);
-      }
-      else
-      {
-         memset(servAddr6.sin6_addr.s6_addr, 0, 15);
-         servAddr6.sin6_addr.s6_addr[15] = 1;
-      }
-#endif
-   }
-   servAddr.sin_port = htons(port);
-#ifdef WITH_IPV6
-   servAddr6.sin6_port = htons(port);
-#endif
+public:
+   TunnelListener(UINT16 port) : SocketListener(port) { setName(_T("AgentTunnels")); }
+};
 
-   // Bind socket
-   TCHAR buffer[64];
-   int bindFailures = 0;
-   nxlog_debug(1, _T("Trying to bind on %s:%d"), SockaddrToStr((struct sockaddr *)&servAddr, buffer), ntohs(servAddr.sin_port));
-   if (bind(hSocket, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
-   {
-      nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "dse", port, _T("TunnelListener"), WSAGetLastError());
-      bindFailures++;
-   }
+/**
+ * Listener stop condition
+ */
+bool TunnelListener::isStopConditionReached()
+{
+   return IsShutdownInProgress();
+}
 
-#ifdef WITH_IPV6
-   nxlog_debug(1, _T("Trying to bind on [%s]:%d"), SockaddrToStr((struct sockaddr *)&servAddr6, buffer), ntohs(servAddr6.sin6_port));
-   if (bind(hSocket6, (struct sockaddr *)&servAddr6, sizeof(struct sockaddr_in6)) != 0)
-   {
-      nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "dse", port, _T("TunnelListener"), WSAGetLastError());
-      bindFailures++;
-   }
-#else
-   bindFailures++;
-#endif
+/**
+ * Process incoming connection
+ */
+ConnectionProcessingResult TunnelListener::processConnection(SOCKET s, const InetAddress& peer)
+{
+   ConnectionRequest *request = new ConnectionRequest();
+   request->sock = s;
+   request->addr = peer;
+   ThreadPoolExecute(g_mainThreadPool, SetupTunnel, request);
+   return CPR_BACKGROUND;
+}
 
-   // Abort if cannot bind to socket
-   if (bindFailures == 2)
+/**
+ * Tunnel listener
+ */
+THREAD_RESULT THREAD_CALL TunnelListenerThread(void *arg)
+{
+   s_tunnelListenerLock.lock();
+   UINT16 listenPort = (UINT16)ConfigReadULong(_T("AgentTunnelListenPort"), 4703);
+   TunnelListener listener(listenPort);
+   listener.setListenAddress(g_szListenAddress);
+   if (!listener.initialize())
    {
+      s_tunnelListenerLock.unlock();
       return THREAD_OK;
    }
 
-   // Set up queue
-   if (listen(hSocket, SOMAXCONN) == 0)
-   {
-      nxlog_write(MSG_LISTENING_FOR_AGENTS, NXLOG_INFO, "ad", ntohl(servAddr.sin_addr.s_addr), port);
-   }
-   else
-   {
-      closesocket(hSocket);
-      hSocket = INVALID_SOCKET;
-   }
-#ifdef WITH_IPV6
-   if (listen(hSocket6, SOMAXCONN) == 0)
-   {
-      nxlog_write(MSG_LISTENING_FOR_AGENTS, NXLOG_INFO, "Hd", servAddr6.sin6_addr.s6_addr, port);
-   }
-   else
-   {
-      closesocket(hSocket6);
-      hSocket6 = INVALID_SOCKET;
-   }
-#endif
-
-   // Wait for connection requests
-   SocketPoller sp;
-   int errorCount = 0;
-   while(!(g_flags & AF_SHUTDOWN))
-   {
-      sp.reset();
-      if (hSocket != INVALID_SOCKET)
-         sp.add(hSocket);
-#ifdef WITH_IPV6
-      if (hSocket6 != INVALID_SOCKET)
-         sp.add(hSocket6);
-#endif
-
-      int nRet = sp.poll(1000);
-      if ((nRet > 0) && (!(g_flags & AF_SHUTDOWN)))
-      {
-         char clientAddr[128];
-         socklen_t size = 128;
-#ifdef WITH_IPV6
-         SOCKET hClientSocket = accept(sp.isSet(hSocket) ? hSocket : hSocket6, (struct sockaddr *)clientAddr, &size);
-#else
-         SOCKET hClientSocket = accept(hSocket, (struct sockaddr *)clientAddr, &size);
-#endif
-         if (hClientSocket == INVALID_SOCKET)
-         {
-            int error = WSAGetLastError();
-
-            if (error != WSAEINTR)
-               nxlog_write(MSG_ACCEPT_ERROR, NXLOG_ERROR, "e", error);
-            errorCount++;
-            if (errorCount > 1000)
-            {
-               nxlog_write(MSG_TOO_MANY_ACCEPT_ERRORS, NXLOG_WARNING, NULL);
-               errorCount = 0;
-            }
-            ThreadSleepMs(500);
-            continue;
-         }
-
-         // Socket should be closed on successful exec
-#ifndef _WIN32
-         fcntl(hClientSocket, F_SETFD, fcntl(hClientSocket, F_GETFD) | FD_CLOEXEC);
-#endif
-
-         errorCount = 0;     // Reset consecutive errors counter
-         InetAddress addr = InetAddress::createFromSockaddr((struct sockaddr *)clientAddr);
-         nxlog_debug(5, _T("TunnelListener: incoming connection from %s"), addr.toString(buffer));
+   listener.mainLoop();
+   listener.shutdown();
 
-         ConnectionRequest *request = new ConnectionRequest();
-         request->sock = hClientSocket;
-         request->addr = addr;
-         ThreadPoolExecute(g_mainThreadPool, SetupTunnel, request);
-      }
-      else if (nRet == -1)
-      {
-         int error = WSAGetLastError();
-
-         // On AIX, select() returns ENOENT after SIGINT for unknown reason
-#ifdef _WIN32
-         if (error != WSAEINTR)
-#else
-         if ((error != EINTR) && (error != ENOENT))
-#endif
-         {
-            ThreadSleepMs(100);
-         }
-      }
-   }
-
-   closesocket(hSocket);
-#ifdef WITH_IPV6
-   closesocket(hSocket6);
-#endif
    nxlog_debug(1, _T("Tunnel listener thread terminated"));
    s_tunnelListenerLock.unlock();
    return THREAD_OK;
index 0b198ae..d035707 100644 (file)
@@ -332,7 +332,7 @@ private:
    THREAD m_hWriteThread;
    THREAD m_hProcessingThread;
        MUTEX m_mutexSocketWrite;
-       struct sockaddr *m_clientAddr;
+       InetAddress m_clientAddr;
        TCHAR m_szHostName[256]; // IP address of name of conneced host in textual form
    TCHAR m_szUserName[MAX_SESSION_NAME];   // String in form login_name@host
    TCHAR m_szClientInfo[96];  // Client app info string
@@ -360,7 +360,7 @@ private:
    void pushData(NXCPMessage *request);
 
 public:
-   MobileDeviceSession(SOCKET hSocket, struct sockaddr *addr);
+   MobileDeviceSession(SOCKET hSocket, const InetAddress& addr);
    ~MobileDeviceSession();
 
    void run();
@@ -432,7 +432,7 @@ private:
    MUTEX m_mutexSendActions;
        MUTEX m_mutexSendAuditLog;
    MUTEX m_mutexPollerInit;
-       struct sockaddr *m_clientAddr;
+       InetAddress m_clientAddr;
        TCHAR m_workstation[256];      // IP address or name of connected host in textual form
    TCHAR m_webServerAddress[256]; // IP address or name of web server for web sessions
    TCHAR m_loginName[MAX_USER_NAME];
@@ -750,7 +750,7 @@ private:
    void sendObjectUpdate(NetObj *object);
 
 public:
-   ClientSession(SOCKET hSocket, struct sockaddr *addr);
+   ClientSession(SOCKET hSocket, const InetAddress& addr);
    ~ClientSession();
 
    void incRefCount() { InterlockedIncrement(&m_refCount); }