improved channel closure detection in agent tunnels
author Victor Kirhenshtein <victor@netxms.org>
Sun, 23 Apr 2017 20:49:29 +0000 (23:49 +0300)
committer Victor Kirhenshtein <victor@netxms.org>
Sun, 23 Apr 2017 20:49:29 +0000 (23:49 +0300)
src/agent/core/tunnel.cpp
src/libnetxms/cch.cpp
src/libnetxms/message.cpp
src/server/core/tunnel.cpp
src/server/include/agent_tunnel.h

index 23aa500..a572165 100644 (file)
@@ -35,6 +35,11 @@ bool RegisterSession(CommSession *session);
 class Tunnel;
 
 /**
+ * Unique channel ID
+ */
+static VolatileCounter s_nextChannelId = 0;
+
+/**
  * Tunnel communication channel
  */
 class TunnelCommChannel : public AbstractCommChannel
@@ -55,7 +60,7 @@ protected:
    virtual ~TunnelCommChannel();
 
 public:
-   TunnelCommChannel(Tunnel *tunnel, UINT32 id);
+   TunnelCommChannel(Tunnel *tunnel);
 
    virtual int send(const void *data, size_t size, MUTEX mutex = INVALID_MUTEX_HANDLE);
    virtual int recv(void *buffer, size_t size, UINT32 timeout = INFINITE);
@@ -87,7 +92,7 @@ private:
    THREAD m_recvThread;
    MsgWaitQueue *m_queue;
    TCHAR m_debugId[64];
-   TunnelCommChannel **m_channels;
+   RefCountHashMap<UINT32, TunnelCommChannel> m_channels;
    MUTEX m_channelLock;
 
    Tunnel(const TCHAR *hostname, UINT16 port);
@@ -128,7 +133,7 @@ public:
 /**
  * Tunnel constructor
  */
-Tunnel::Tunnel(const TCHAR *hostname, UINT16 port)
+Tunnel::Tunnel(const TCHAR *hostname, UINT16 port) : m_channels(true)
 {
    m_hostname = _tcsdup(hostname);
    m_port = port;
@@ -142,7 +147,6 @@ Tunnel::Tunnel(const TCHAR *hostname, UINT16 port)
    m_recvThread = INVALID_THREAD_HANDLE;
    m_queue = NULL;
    _sntprintf(m_debugId, 64, _T("TUN-%s"), m_hostname);
-   m_channels = (TunnelCommChannel **)calloc(g_dwMaxSessions, sizeof(TunnelCommChannel *));
    m_channelLock = MutexCreate();
 }
 
@@ -159,7 +163,6 @@ Tunnel::~Tunnel()
    if (m_context != NULL)
       SSL_CTX_free(m_context);
    MutexDestroy(m_sslLock);
-   free(m_channels);
    MutexDestroy(m_channelLock);
    free(m_hostname);
 }
@@ -190,14 +193,14 @@ void Tunnel::disconnect()
 
    Array channels(g_dwMaxSessions, 16, false);
    MutexLock(m_channelLock);
-   for(UINT32 i = 0; i < g_dwMaxSessions; i++)
+   Iterator<TunnelCommChannel> *it = m_channels.iterator();
+   while(it->hasNext())
    {
-      if (m_channels[i] != NULL)
-      {
-         channels.add(m_channels[i]);
-         m_channels[i]->incRefCount();
-      }
+      TunnelCommChannel *c = it->next();
+      channels.add(c);
+      c->incRefCount();
    }
+   delete it;
    MutexUnlock(m_channelLock);
 
    for(int i = 0; i < channels.size(); i++)
@@ -252,13 +255,16 @@ void Tunnel::recvThread()
                createSession(msg);
                break;
             case CMD_CHANNEL_DATA:
-               if (msg->isBinary() && (msg->getId() < g_dwMaxSessions))
+               if (msg->isBinary())
                {
                   MutexLock(m_channelLock);
-                  TunnelCommChannel *channel = m_channels[msg->getId()];
+                  TunnelCommChannel *channel = m_channels.get(msg->getId());
+                  MutexUnlock(m_channelLock);
                   if (channel != NULL)
+                  {
                      channel->putData(msg->getBinaryData(), msg->getBinaryDataSize());
-                  MutexUnlock(m_channelLock);
+                     channel->decRefCount();
+                  }
                }
                break;
             case CMD_CLOSE_CHANNEL:
@@ -890,16 +896,11 @@ TunnelCommChannel *Tunnel::createChannel()
 {
    TunnelCommChannel *channel = NULL;
    MutexLock(m_channelLock);
-   for(UINT32 i = 0; i < g_dwMaxSessions; i++)
+   if (m_channels.size() < g_dwMaxSessions)
    {
-      if (m_channels[i] == NULL)
-      {
-         channel = new TunnelCommChannel(this, i);
-         m_channels[i] = channel;
-         channel->incRefCount();
-         debugPrintf(5, _T("New channel created (ID=%d)"), i);
-         break;
-      }
+      channel = new TunnelCommChannel(this);
+      m_channels.set(channel->getId(), channel);
+      debugPrintf(5, _T("New channel created (ID=%d)"), channel->getId());
    }
    MutexUnlock(m_channelLock);
    return channel;
@@ -912,13 +913,13 @@ void Tunnel::processChannelCloseRequest(NXCPMessage *request)
 {
    UINT32 id = request->getFieldAsUInt32(VID_CHANNEL_ID);
    debugPrintf(5, _T("Close request for channel %d"), id);
-   if (id < g_dwMaxSessions)
+   MutexLock(m_channelLock);
+   TunnelCommChannel *channel = m_channels.get(id);
+   MutexUnlock(m_channelLock);
+   if (channel != NULL)
    {
-      MutexLock(m_channelLock);
-      TunnelCommChannel *channel = m_channels[id];
-      MutexUnlock(m_channelLock);
-      if (channel != NULL)
-         channel->close();
+      channel->close();
+      channel->decRefCount();
    }
 }
 
@@ -927,14 +928,22 @@ void Tunnel::processChannelCloseRequest(NXCPMessage *request)
  */
 void Tunnel::closeChannel(TunnelCommChannel *channel)
 {
+   UINT32 id = 0;
    MutexLock(m_channelLock);
-   if (m_channels[channel->getId()] == channel)
+   if (m_channels.contains(channel->getId()))
    {
-      debugPrintf(5, _T("Channel %d closed"), channel->getId());
-      m_channels[channel->getId()] = NULL;
-      channel->decRefCount();
+      id = channel->getId();
+      debugPrintf(5, _T("Channel %d closed"), id);
+      m_channels.remove(id);
    }
    MutexUnlock(m_channelLock);
+
+   if (id != 0)
+   {
+      NXCPMessage msg(CMD_CLOSE_CHANNEL, 0);
+      msg.setField(VID_CHANNEL_ID, id);
+      sendMessage(&msg);
+   }
 }
 
 /**
@@ -973,10 +982,10 @@ Tunnel *Tunnel::createFromConfig(TCHAR *config)
 /**
  * Channel constructor
  */
-TunnelCommChannel::TunnelCommChannel(Tunnel *tunnel, UINT32 id)
+TunnelCommChannel::TunnelCommChannel(Tunnel *tunnel) : AbstractCommChannel()
 {
+   m_id = InterlockedIncrement(&s_nextChannelId);
    m_tunnel = tunnel;
-   m_id = id;
    m_active = true;
    m_closed = 0;
    m_allocated = 256 * 1024;
index 0f2d8ef..d009360 100644 (file)
@@ -26,7 +26,7 @@
 /**
  * Abstract communication channel constructor
  */
-AbstractCommChannel::AbstractCommChannel()
+AbstractCommChannel::AbstractCommChannel() : RefCountObject()
 {
 }
 
index 7b75d34..778a35b 100644 (file)
@@ -309,7 +309,7 @@ NXCPMessage::NXCPMessage(NXCP_MESSAGE *msg, int version)
 NXCPMessage::~NXCPMessage()
 {
    deleteAllFields();
-   safe_free(m_data);
+   free(m_data);
 }
 
 /**
index ac4e78f..7a93781 100644 (file)
@@ -320,6 +320,9 @@ void AgentTunnel::recvThread()
                }
             }
             break;
+         case CMD_CLOSE_CHANNEL:    // channel close notification
+            processChannelClose(msg->getFieldAsUInt32(VID_CHANNEL_ID));
+            break;
          default:
             m_queue.put(msg);
             msg = NULL; // prevent message deletion
@@ -328,6 +331,15 @@ void AgentTunnel::recvThread()
       delete msg;
    }
    UnregisterTunnel(this);
+
+   // shutdown all channels
+   MutexLock(m_channelLock);
+   Iterator<AgentTunnelCommChannel> *it = m_channels.iterator();
+   while(it->hasNext())
+      it->next()->shutdown();
+   delete it;
+   MutexUnlock(m_channelLock);
+
    debugPrintf(5, _T("Receiver thread stopped"));
 }
 
@@ -595,6 +607,23 @@ AgentTunnelCommChannel *AgentTunnel::createChannel()
 }
 
 /**
+ * Process channel close notification from agent
+ */
+void AgentTunnel::processChannelClose(UINT32 channelId)
+{
+   debugPrintf(4, _T("processChannelClose: notification of channel %d closure"), channelId);
+
+   MutexLock(m_channelLock);
+   AgentTunnelCommChannel *ch = m_channels.get(channelId);
+   MutexUnlock(m_channelLock);
+   if (ch != NULL)
+   {
+      ch->shutdown();
+      ch->decRefCount();
+   }
+}
+
+/**
  * Close channel
  */
 void AgentTunnel::closeChannel(AgentTunnelCommChannel *channel)
index 31684e7..7df8087 100644 (file)
@@ -110,6 +110,7 @@ protected:
    NXCPMessage *waitForMessage(UINT16 code, UINT32 id) { return m_queue.waitForMessage(code, id, 5000); }
 
    void processCertificateRequest(NXCPMessage *request);
+   void processChannelClose(UINT32 channelId);
 
    void setup(const NXCPMessage *request);