implemented connection pool in SSH subagent
authorVictor Kirhenshtein <victor@netxms.org>
Wed, 3 Aug 2016 07:18:32 +0000 (10:18 +0300)
committerVictor Kirhenshtein <victor@netxms.org>
Wed, 3 Aug 2016 07:18:32 +0000 (10:18 +0300)
src/agent/subagents/ssh/Makefile.am
src/agent/subagents/ssh/handlers.cpp
src/agent/subagents/ssh/main.cpp
src/agent/subagents/ssh/session.cpp
src/agent/subagents/ssh/sp.cpp [new file with mode: 0644]
src/agent/subagents/ssh/ssh_subagent.h

index 27769a2..a38924f 100644 (file)
@@ -1,7 +1,7 @@
 SUBAGENT = ssh
 
 pkglib_LTLIBRARIES = ssh.la
-ssh_la_SOURCES = handlers.cpp main.cpp session.cpp
+ssh_la_SOURCES = handlers.cpp main.cpp session.cpp sp.cpp
 ssh_la_CPPFLAGS=-I@top_srcdir@/include
 ssh_la_LDFLAGS = -module -avoid-version -export-symbols ../subagent.sym
 ssh_la_LIBADD = ../../libnxagent/libnxagent.la ../../../libnetxms/libnetxms.la -lssh
index 8dd8626..7a7ae32 100644 (file)
@@ -48,10 +48,10 @@ LONG H_SSHCommand(const TCHAR *param, const TCHAR *arg, TCHAR *value, AbstractCo
       return SYSINFO_RC_UNSUPPORTED;
 
    LONG rc = SYSINFO_RC_ERROR;
-   SSHSession ssh(addr, port);
-   if (ssh.connect(login, password))
+   SSHSession *ssh = AcquireSession(addr, port, login, password);
+   if (ssh != NULL)
    {
-      StringList *output = ssh.execute(command);
+      StringList *output = ssh->execute(command);
       if (output != NULL)
       {
          if (output->size() > 0)
@@ -61,7 +61,7 @@ LONG H_SSHCommand(const TCHAR *param, const TCHAR *arg, TCHAR *value, AbstractCo
          }
          delete output;
       }
-      ssh.disconnect();
+      ReleaseSession(ssh);
    }
    return rc;
 }
@@ -92,17 +92,17 @@ LONG H_SSHCommandList(const TCHAR *param, const TCHAR *arg, StringList *value, A
       return SYSINFO_RC_UNSUPPORTED;
 
    LONG rc = SYSINFO_RC_ERROR;
-   SSHSession ssh(addr, port);
-   if (ssh.connect(login, password))
+   SSHSession *ssh = AcquireSession(addr, port, login, password);
+   if (ssh != NULL)
    {
-      StringList *output = ssh.execute(command);
+      StringList *output = ssh->execute(command);
       if (output != NULL)
       {
          value->addAll(output);
          rc = SYSINFO_RC_SUCCESS;
          delete output;
       }
-      ssh.disconnect();
+      ReleaseSession(ssh);
    }
    return rc;
 }
index fae2d1c..975cf91 100644 (file)
 **/
 
 #include "ssh_subagent.h"
+#include <libssh/callbacks.h>
+
+/**
+ * Configuration options
+ */
+UINT32 g_sshSessionIdleTimeout = 300;
 
 /**
  * Configuration file template
  */
 static NX_CFG_TEMPLATE m_cfgTemplate[] =
 {
+   { _T("SessionIdleTimeout"), CT_LONG, 0, 0, 0, 0, &g_sshSessionIdleTimeout },
        { _T(""), CT_END_OF_LIST, 0, 0, 0, 0, NULL }
 };
 
+#if defined(_WIN32) || _USE_GNU_PTH
+
+/**
+ * Mutex creation callback
+ */
+static int cb_mutex_init(void **mutex)
+{
+   *mutex = MutexCreate();
+   return 0;
+}
+
+/**
+ * Mutex destruction callback
+ */
+static int cb_mutex_destroy(void **mutex)
+{
+   MutexDestroy(*((MUTEX *)mutex));
+   return 0;
+}
+
+/**
+ * Mutex lock callback
+ */
+static int cb_mutex_lock(void **mutex)
+{
+   MutexLock(*((MUTEX *)mutex));
+   return 0;
+}
+
+/**
+ * Mutex unlock callback
+ */
+static int cb_mutex_unlock(void **mutex)
+{
+   MutexUnlock(*((MUTEX *)mutex));
+   return 0;
+}
+
+/**
+ * Thread ID callback
+ */
+static unsigned long cb_thread_id()
+{
+   return (unsigned long)GetCurrentThreadId();
+}
+
+/**
+ * Custom callbacks for libssh threading support
+ */
+static struct ssh_threads_callbacks_struct s_threadCallbacks =
+   {
+      "netxms", cb_mutex_init, cb_mutex_destroy, cb_mutex_lock, cb_mutex_unlock, cb_thread_id
+   };
+
+#endif
+
 /**
  * Subagent initialization
  */
 static BOOL SubagentInit(Config *config)
 {
+#if !defined(_WIN32) && !_USE_GNU_PTH
+   ssh_threads_set_callbacks(ssh_threads_get_noop());
+#else
+   ssh_threads_set_callbacks(&s_threadCallbacks);
+#endif
    ssh_init();
    nxlog_debug(2, _T("SSH: using libssh version %hs"), ssh_version(0));
+   InitializeSessionPool();
        return TRUE;
 }
 
@@ -45,6 +114,8 @@ static BOOL SubagentInit(Config *config)
  */
 static void SubagentShutdown()
 {
+   ShutdownSessionPool();
+   ssh_finalize();
 }
 
 /**
index 52a5b1f..d3b0a50 100644 (file)
@@ -17,7 +17,6 @@
 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 **
 ** File: session.cpp
-**
 **/
 
 #include "ssh_subagent.h"
 /**
  * SSH session constructor
  */
-SSHSession::SSHSession(const InetAddress& addr, UINT16 port)
+SSHSession::SSHSession(const InetAddress& addr, UINT16 port, INT32 id)
 {
+   m_id = id;
    m_addr = addr;
    m_port = port;
    m_session = NULL;
+   m_lastAccess = 0;
+   m_user[0] = 0;
+   m_busy = false;
+   _sntprintf(m_name, MAX_SSH_SESSION_NAME_LEN, _T("nobody@%s:%d/%d"), (const TCHAR *)m_addr.toString(), m_port, m_id);
 }
 
 /**
@@ -40,6 +44,33 @@ SSHSession::~SSHSession()
    disconnect();
 }
 
+/**
+ * Check if session match for given target
+ */
+bool SSHSession::match(const InetAddress& addr, UINT16 port, const TCHAR *user) const
+{
+   return addr.equals(m_addr) && ((unsigned int)port == m_port) && !_tcscmp(m_user, user);
+}
+
+/**
+ * Acquire session
+ */
+bool SSHSession::acquire()
+{
+   if (m_busy || !isConnected())
+      return false;
+   m_busy = true;
+   return true;
+}
+
+/**
+ * Release session
+ */
+void SSHSession::release()
+{
+   m_busy = false;
+}
+
 /**
  * Connect to server
  */
@@ -89,7 +120,13 @@ bool SSHSession::connect(const TCHAR *user, const TCHAR *password)
       nxlog_debug(6, _T("SSH: connect to %s:%d failed"), (const TCHAR *)m_addr.toString(), m_port);
    }
 
-   if (!success)
+   if (success)
+   {
+      nx_strncpy(m_user, user, MAX_SSH_LOGIN_LEN);
+      _sntprintf(m_name, MAX_SSH_SESSION_NAME_LEN, _T("%s@%s:%d/%d"), m_user, (const TCHAR *)m_addr.toString(), m_port, m_id);
+      m_lastAccess = time(NULL);
+   }
+   else
    {
       if (ssh_is_connected(m_session))
          ssh_disconnect(m_session);
@@ -187,5 +224,6 @@ StringList *SSHSession::execute(const TCHAR *command)
       nxlog_debug(6, _T("SSH: cannot open channel on %s:%d"), (const TCHAR *)m_addr.toString(), m_port);
    }
    ssh_channel_free(channel);
+   m_lastAccess = time(NULL);
    return output;
 }
diff --git a/src/agent/subagents/ssh/sp.cpp b/src/agent/subagents/ssh/sp.cpp
new file mode 100644 (file)
index 0000000..b6484d7
--- /dev/null
@@ -0,0 +1,130 @@
+/*
+** NetXMS SSH subagent
+** Copyright (C) 2004-2016 Victor Kirhenshtein
+**
+** This program is free software; you can redistribute it and/or modify
+** it under the terms of the GNU General Public License as published by
+** the Free Software Foundation; either version 2 of the License, or
+** (at your option) any later version.
+**
+** This program is distributed in the hope that it will be useful,
+** but WITHOUT ANY WARRANTY; without even the implied warranty of
+** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+** GNU General Public License for more details.
+**
+** You should have received a copy of the GNU General Public License
+** along with this program; if not, write to the Free Software
+** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+**
+** File: sp.cpp
+**/
+
+#include "ssh_subagent.h"
+
+/**
+ * Session pool
+ */
+static ObjectArray<SSHSession> s_sessions(16, 16, true);
+static MUTEX s_lock = MutexCreate();
+static VolatileCounter s_sessionId = 0;
+static CONDITION s_shutdownCondition = ConditionCreate(TRUE);
+static THREAD s_housekeeperThread = INVALID_THREAD_HANDLE;
+
+/**
+ * Acquire SSH session
+ */
+SSHSession *AcquireSession(const InetAddress& addr, UINT16 port, const TCHAR *user, const TCHAR *password)
+{
+   MutexLock(s_lock);
+   for(int i = 0; i < s_sessions.size(); i++)
+   {
+      SSHSession *s = s_sessions.get(i);
+      if (s->match(addr, port, user) && s->acquire())
+      {
+         nxlog_debug(7, _T("SSH: acquired existing session %s"), s->getName());
+         MutexUnlock(s_lock);
+         return s;
+      }
+   }
+   MutexUnlock(s_lock);
+
+   // No matching sessions, create new one
+   SSHSession *session = new SSHSession(addr, port, InterlockedIncrement(&s_sessionId));
+   if (!session->connect(user, password))
+   {
+      delete session;
+      return NULL;
+   }
+
+   session->acquire();
+   MutexLock(s_lock);
+   s_sessions.add(session);
+   MutexUnlock(s_lock);
+   return session;
+}
+
+/**
+ * Release SSH session
+ */
+void ReleaseSession(SSHSession *session)
+{
+   MutexLock(s_lock);
+   session->release();
+   if (!session->isConnected())
+   {
+      nxlog_debug(7, _T("SSH: disconnected session %s removed"), session->getName());
+      s_sessions.remove(session);
+   }
+   MutexUnlock(s_lock);
+}
+
+/**
+ * Housekeeping thread
+ */
+static THREAD_RESULT THREAD_CALL HousekeeperThread(void *arg)
+{
+   ObjectArray<SSHSession> deleteList(16, 16, true);
+   while(!ConditionWait(s_shutdownCondition, 30000))
+   {
+      MutexLock(s_lock);
+      time_t now = time(NULL);
+      for(int i = 0; i < s_sessions.size(); i++)
+      {
+         SSHSession *s = s_sessions.get(i);
+         if (!s->isBusy() && (!s->isConnected() || (now - s->getLastAccessTime() > g_sshSessionIdleTimeout)))
+         {
+            nxlog_debug(7, _T("SSH: session %s removed by housekeeper"), s->getName());
+            s_sessions.unlink(i);
+            i--;
+            deleteList.add(s);
+         }
+      }
+      MutexUnlock(s_lock);
+      deleteList.clear();
+   }
+   return THREAD_OK;
+}
+
+/**
+ * Initialize SSH session pool
+ */
+void InitializeSessionPool()
+{
+   s_housekeeperThread = ThreadCreateEx(HousekeeperThread, 0, NULL);
+   nxlog_debug(5, _T("SSH: connection pool initialized"));
+}
+
+/**
+ * Shutdown SSH session pool
+ */
+void ShutdownSessionPool()
+{
+   ConditionSet(s_shutdownCondition);
+   ThreadJoin(s_housekeeperThread);
+
+   MutexLock(s_lock);
+   s_sessions.clear();
+   MutexUnlock(s_lock);
+
+   nxlog_debug(5, _T("SSH: connection pool closed"));
+}
index 1437b00..1c7eb2d 100644 (file)
 #include <nms_agent.h>
 #include <libssh/libssh.h>
 
+#define MAX_SSH_SESSION_NAME_LEN (MAX_SSH_LOGIN_LEN + MAX_DNS_NAME + 16)
+
 /**
  * SSH session
  */
 class SSHSession
 {
 private:
+   INT32 m_id;
    InetAddress m_addr;
    unsigned int m_port;
+   TCHAR m_user[MAX_SSH_LOGIN_LEN];
    ssh_session m_session;
+   time_t m_lastAccess;
+   bool m_busy;
+   TCHAR m_name[MAX_SSH_SESSION_NAME_LEN];
 
 public:
-   SSHSession(const InetAddress& addr, UINT16 port);
+   SSHSession(const InetAddress& addr, UINT16 port, INT32 id = 0);
    ~SSHSession();
 
    bool connect(const TCHAR *user, const TCHAR *password);
    void disconnect();
+   bool isConnected() const { return (m_session != NULL) && ssh_is_connected(m_session); }
+
+   const TCHAR *getName() const { return m_name; }
+   time_t getLastAccessTime() const { return m_lastAccess; }
+   bool isBusy() const { return m_busy; }
+
+   bool match(const InetAddress& addr, UINT16 port, const TCHAR *user) const;
+
+   bool acquire();
+   void release();
+
    StringList *execute(const TCHAR *command);
 };
 
+/* Session pool */
+void InitializeSessionPool();
+void ShutdownSessionPool();
+SSHSession *AcquireSession(const InetAddress& addr, UINT16 port, const TCHAR *user, const TCHAR *password);
+void ReleaseSession(SSHSession *session);
+
 /* handlers */
 LONG H_SSHCommand(const TCHAR *param, const TCHAR *arg, TCHAR *value, AbstractCommSession *session);
 LONG H_SSHCommandList(const TCHAR *param, const TCHAR *arg, StringList *value, AbstractCommSession *session);
 
+/* globals */
+extern UINT32 g_sshSessionIdleTimeout;
+
 #endif