channel management on agent side; AgentConnection class updated to use AbstractCommun...
authorVictor Kirhenshtein <victor@netxms.org>
Mon, 6 Mar 2017 21:22:00 +0000 (23:22 +0200)
committerVictor Kirhenshtein <victor@netxms.org>
Mon, 6 Mar 2017 21:22:00 +0000 (23:22 +0200)
15 files changed:
include/nms_cscp.h
include/nxcpapi.h
src/agent/core/comm.cpp
src/agent/core/tunnel.cpp
src/java/client/netxms-base/src/main/java/org/netxms/base/NXCPCodes.java
src/libnetxms/nxcp.cpp
src/server/core/download_job.cpp
src/server/core/node.cpp
src/server/include/nxcore_jobs.h
src/server/include/nxsrvapi.h
src/server/libnxsrv/agent.cpp
src/server/tools/nxaction/nxaction.cpp
src/server/tools/nxap/nxap.cpp
src/server/tools/nxget/nxget.cpp
src/server/tools/nxupload/nxupload.cpp

index a890e8d..670b858 100644 (file)
@@ -581,7 +581,9 @@ typedef struct
 #define CMD_GET_FOLDER_SIZE            0x015A
 #define CMD_FIND_HOSTNAME_LOCATION     0x015B
 #define CMD_RESET_TUNNEL               0x015C
-#define CMD_CREATE_SESSION             0x015D
+#define CMD_CREATE_CHANNEL             0x015D
+#define CMD_CHANNEL_DATA               0x015E
+#define CMD_CLOSE_CHANNEL              0x015F
 
 #define CMD_RS_LIST_REPORTS            0x1100
 #define CMD_RS_GET_REPORT              0x1101
@@ -1170,6 +1172,7 @@ typedef struct
 #define VID_AGENT_COMPRESSION_MODE  ((UINT32)571)
 #define VID_TRAP_TYPE               ((UINT32)572)
 #define VID_IS_ACTIVE               ((UINT32)573)
+#define VID_CHANNEL_ID              ((UINT32)574)
 
 // Base variabe for single threshold in message
 #define VID_THRESHOLD_BASE          ((UINT32)0x00800000)
index 3344456..170b0d0 100644 (file)
@@ -549,11 +549,20 @@ int LIBNETXMS_EXPORTABLE RecvNXCPMessage(SOCKET hSocket, NXCP_MESSAGE *pMsg,
                                          NXCP_BUFFER *pBuffer, UINT32 dwMaxMsgSize,
                                          NXCPEncryptionContext **ppCtx,
                                          BYTE *pDecryptionBuffer, UINT32 dwTimeout);
+int LIBNETXMS_EXPORTABLE RecvNXCPMessage(AbstractCommChannel *channel, NXCP_MESSAGE *pMsg,
+                                         NXCP_BUFFER *pBuffer, UINT32 dwMaxMsgSize,
+                                         NXCPEncryptionContext **ppCtx,
+                                         BYTE *pDecryptionBuffer, UINT32 dwTimeout);
 int LIBNETXMS_EXPORTABLE RecvNXCPMessageEx(SOCKET hSocket, NXCP_MESSAGE **msgBuffer,
                                            NXCP_BUFFER *nxcpBuffer, UINT32 *bufferSize,
                                            NXCPEncryptionContext **ppCtx,
                                            BYTE **decryptionBuffer, UINT32 dwTimeout,
                                                                                                                 UINT32 maxMsgSize);
+int LIBNETXMS_EXPORTABLE RecvNXCPMessageEx(AbstractCommChannel *channel, NXCP_MESSAGE **msgBuffer,
+                                           NXCP_BUFFER *nxcpBuffer, UINT32 *bufferSize,
+                                           NXCPEncryptionContext **ppCtx,
+                                           BYTE **decryptionBuffer, UINT32 dwTimeout,
+                                           UINT32 maxMsgSize);
 NXCP_MESSAGE LIBNETXMS_EXPORTABLE *CreateRawNXCPMessage(UINT16 code, UINT32 id, UINT16 flags,
                                                         const void *data, size_t dataSize,
                                                         NXCP_MESSAGE *buffer, bool allowCompression);
@@ -565,7 +574,8 @@ bool LIBNETXMS_EXPORTABLE SendFileOverNXCP(AbstractCommChannel *channel, UINT32
                                            NXCPEncryptionContext *pCtx, long offset,
                                            void (* progressCallback)(INT64, void *), void *cbArg,
                                            MUTEX mutex, NXCPStreamCompressionMethod compressionMethod = NXCP_STREAM_COMPRESSION_NONE);
-bool LIBNETXMS_EXPORTABLE NXCPGetPeerProtocolVersion(SOCKET hSocket, int *pnVersion, MUTEX mutex);
+bool LIBNETXMS_EXPORTABLE NXCPGetPeerProtocolVersion(SOCKET s, int *pnVersion, MUTEX mutex);
+bool LIBNETXMS_EXPORTABLE NXCPGetPeerProtocolVersion(AbstractCommChannel *channel, int *pnVersion, MUTEX mutex);
 
 TCHAR LIBNETXMS_EXPORTABLE *NXCPMessageCodeName(UINT16 wCode, TCHAR *buffer);
 void LIBNETXMS_EXPORTABLE NXCPRegisterMessageNameResolver(NXCPMessageNameResolver r);
index b7fec42..9bad887 100644 (file)
@@ -80,7 +80,7 @@ void DestroySessionList()
 /**
  * Validates server's address
  */
-static bool IsValidServerAddress(const InetAddress &addr, bool *pbMasterServer, bool *pbControlServer)
+bool IsValidServerAddress(const InetAddress &addr, bool *pbMasterServer, bool *pbControlServer)
 {
    for(int i = 0; i < g_serverList.size(); i++)
        {
@@ -98,23 +98,21 @@ static bool IsValidServerAddress(const InetAddress &addr, bool *pbMasterServer,
 /**
  * Register new session in list
  */
-static BOOL RegisterSession(CommSession *pSession)
+bool RegisterSession(CommSession *session)
 {
-   UINT32 i;
-
    MutexLock(g_hSessionListAccess);
-   for(i = 0; i < g_dwMaxSessions; i++)
+   for(UINT32 i = 0; i < g_dwMaxSessions; i++)
       if (g_pSessionList[i] == NULL)
       {
-         g_pSessionList[i] = pSession;
-         pSession->setIndex(i);
+         g_pSessionList[i] = session;
+         session->setIndex(i);
          MutexUnlock(g_hSessionListAccess);
-         return TRUE;
+         return true;
       }
 
    MutexUnlock(g_hSessionListAccess);
    nxlog_write(MSG_TOO_MANY_SESSIONS, EVENTLOG_WARNING_TYPE, NULL);
-   return FALSE;
+   return false;
 }
 
 /**
index 642af9c..96e91d6 100644 (file)
 #include "nxagentd.h"
 
 /**
+ * Check if server address is valid
+ */
+bool IsValidServerAddress(const InetAddress &addr, bool *pbMasterServer, bool *pbControlServer);
+
+/**
+ * Register session
+ */
+bool RegisterSession(CommSession *session);
+
+class Tunnel;
+
+/**
+ * Tunnel communication channel
+ */
+class TunnelCommChannel : public AbstractCommChannel
+{
+private:
+   Tunnel *m_tunnel;
+   UINT32 m_id;
+   bool m_active;
+   BYTE *m_buffer;
+   size_t m_allocated;
+   size_t m_head;
+   size_t m_size;
+   MUTEX m_bufferLock;
+   CONDITION m_dataCondition;
+
+protected:
+   virtual ~TunnelCommChannel();
+
+public:
+   TunnelCommChannel(Tunnel *tunnel, UINT32 id);
+
+   virtual int send(const void *data, size_t size, MUTEX mutex = INVALID_MUTEX_HANDLE);
+   virtual int recv(void *buffer, size_t size, UINT32 timeout = INFINITE);
+   virtual int poll(UINT32 timeout, bool write = false);
+   virtual int shutdown() { return 0; }
+   virtual void close();
+
+   UINT32 getId() const { return m_id; }
+
+   void putData(const BYTE *data, size_t size);
+};
+
+/**
  * Tunnel class
  */
 class Tunnel
@@ -40,6 +85,8 @@ private:
    THREAD m_recvThread;
    MsgWaitQueue *m_queue;
    TCHAR m_debugId[64];
+   TunnelCommChannel **m_channels;
+   MUTEX m_channelLock;
 
    Tunnel(const InetAddress& addr, UINT16 port);
 
@@ -48,6 +95,7 @@ private:
    NXCPMessage *waitForMessage(UINT16 code, UINT32 id) { return (m_queue != NULL) ? m_queue->waitForMessage(code, id, 5000) : NULL; }
 
    void processBindRequest(NXCPMessage *request);
+   void createSession(NXCPMessage *request);
 
    X509_REQ *createCertificateRequest(const char *cn, EVP_PKEY **pkey);
    bool saveCertificate(X509 *cert, EVP_PKEY *key);
@@ -64,6 +112,10 @@ public:
    void checkConnection();
    void disconnect();
 
+   TunnelCommChannel *createChannel();
+   void closeChannel(TunnelCommChannel *channel);
+   int sendChannelData(UINT32 id, const void *data, size_t len);
+
    const InetAddress& getAddress() const { return m_address; }
 
    static Tunnel *createFromConfig(TCHAR *config);
@@ -85,6 +137,8 @@ Tunnel::Tunnel(const InetAddress& addr, UINT16 port) : m_address(addr)
    m_recvThread = INVALID_THREAD_HANDLE;
    m_queue = NULL;
    _sntprintf(m_debugId, 64, _T("TUN-%s"), (const TCHAR *)addr.toString());
+   m_channels = (TunnelCommChannel **)malloc(sizeof(TunnelCommChannel *) * g_dwMaxSessions);
+   m_channelLock = MutexCreate();
 }
 
 /**
@@ -92,6 +146,17 @@ Tunnel::Tunnel(const InetAddress& addr, UINT16 port) : m_address(addr)
  */
 Tunnel::~Tunnel()
 {
+   MutexLock(m_channelLock);
+   for(UINT32 i = 0; i < g_dwMaxSessions; i++)
+   {
+      if (m_channels[i] != NULL)
+      {
+         m_channels[i]->decRefCount();
+         m_channels[i] = NULL;
+      }
+   }
+   MutexUnlock(m_channelLock);
+
    disconnect();
    if (m_socket != INVALID_SOCKET)
       closesocket(m_socket);
@@ -100,6 +165,8 @@ Tunnel::~Tunnel()
    if (m_context != NULL)
       SSL_CTX_free(m_context);
    MutexDestroy(m_sslLock);
+   free(m_channels);
+   MutexDestroy(m_channelLock);
 }
 
 /**
@@ -167,7 +234,28 @@ void Tunnel::recvThread()
                ThreadPoolExecute(g_commThreadPool, this, &Tunnel::processBindRequest, msg);
                msg = NULL; // prevent message deletion
                break;
-            case CMD_CREATE_SESSION:
+            case CMD_CREATE_CHANNEL:
+               createSession(msg);
+               break;
+            case CMD_CHANNEL_DATA:
+               if (msg->isBinary() && (msg->getId() >= 0) && (msg->getId() < g_dwMaxSessions))
+               {
+                  MutexLock(m_channelLock);
+                  TunnelCommChannel *channel = m_channels[msg->getId()];
+                  if (channel != NULL)
+                     channel->putData(msg->getBinaryData(), msg->getBinaryDataSize());
+                  MutexUnlock(m_channelLock);
+               }
+               break;
+            case CMD_CLOSE_CHANNEL:
+               if ((msg->getId() >= 0) && (msg->getId() < g_dwMaxSessions))
+               {
+                  MutexLock(m_channelLock);
+                  TunnelCommChannel *channel = m_channels[msg->getId()];
+                  MutexUnlock(m_channelLock);
+                  if (channel != NULL)
+                     closeChannel(channel);
+               }
                break;
             default:
                m_queue->put(msg);
@@ -670,6 +758,88 @@ void Tunnel::processBindRequest(NXCPMessage *request)
 }
 
 /**
+ * Create new session
+ */
+void Tunnel::createSession(NXCPMessage *request)
+{
+   NXCPMessage response(CMD_REQUEST_COMPLETED, request->getId());
+
+   // Assume that peer always have minimal access, so don't check return value
+   bool masterServer, controlServer;
+   IsValidServerAddress(m_address, &masterServer, &controlServer);
+
+   TunnelCommChannel *channel = createChannel();
+   if (channel != NULL)
+   {
+      CommSession *session = new CommSession(channel, m_address, masterServer, controlServer);
+      if (RegisterSession(session))
+      {
+         response.setField(VID_RCC, ERR_SUCCESS);
+         response.setField(VID_CHANNEL_ID, channel->getId());
+         debugPrintf(9, _T("New session registered"));
+         session->run();
+      }
+      else
+      {
+         delete session;
+         response.setField(VID_RCC, ERR_OUT_OF_RESOURCES);
+      }
+   }
+   else
+   {
+      response.setField(VID_RCC, ERR_OUT_OF_RESOURCES);
+   }
+
+   sendMessage(&response);
+}
+
+/**
+ * Create channel
+ */
+TunnelCommChannel *Tunnel::createChannel()
+{
+   TunnelCommChannel *channel = NULL;
+   MutexLock(m_channelLock);
+   for(UINT32 i = 0; i < g_dwMaxSessions; i++)
+   {
+      if (m_channels[i] == NULL)
+      {
+         channel = new TunnelCommChannel(this, i);
+         m_channels[i] = channel;
+         debugPrintf(5, _T("New channel created (ID=%d)"), i);
+         break;
+      }
+   }
+   MutexUnlock(m_channelLock);
+   return channel;
+}
+
+/**
+ * Close channel
+ */
+void Tunnel::closeChannel(TunnelCommChannel *channel)
+{
+   MutexLock(m_channelLock);
+   m_channels[channel->getId()] = NULL;
+   MutexUnlock(m_channelLock);
+   debugPrintf(5, _T("Channel %d closed"), channel->getId());
+   channel->decRefCount();
+}
+
+/**
+ * Send channel data
+ */
+int Tunnel::sendChannelData(UINT32 id, const void *data, size_t len)
+{
+   NXCP_MESSAGE *msg = CreateRawNXCPMessage(CMD_CHANNEL_DATA, id, 0, data, len, NULL, false);
+   MutexLock(m_sslLock);
+   int rc = SSL_write(m_ssl, msg, ntohl(msg->size));
+   MutexUnlock(m_sslLock);
+   free(msg);
+   return rc;
+}
+
+/**
  * Create tunnel object from configuration record
  */
 Tunnel *Tunnel::createFromConfig(TCHAR *config)
@@ -695,6 +865,108 @@ Tunnel *Tunnel::createFromConfig(TCHAR *config)
 }
 
 /**
+ * Channel constructor
+ */
+TunnelCommChannel::TunnelCommChannel(Tunnel *tunnel, UINT32 id)
+{
+   m_tunnel = tunnel;
+   m_id = id;
+   m_active = true;
+   m_allocated = 256 * 1024;
+   m_head = 0;
+   m_size = 0;
+   m_buffer = (BYTE *)malloc(m_allocated);
+   m_bufferLock = MutexCreate();
+   m_dataCondition = ConditionCreate(TRUE);
+}
+
+/**
+ * Channel destructor
+ */
+TunnelCommChannel::~TunnelCommChannel()
+{
+   free(m_buffer);
+   MutexDestroy(m_bufferLock);
+   ConditionDestroy(m_dataCondition);
+}
+
+/**
+ * Send data
+ */
+int TunnelCommChannel::send(const void *data, size_t size, MUTEX mutex)
+{
+   return m_active ? m_tunnel->sendChannelData(m_id, data, size) : -1;
+}
+
+/**
+ * Receive data
+ */
+int TunnelCommChannel::recv(void *buffer, size_t size, UINT32 timeout)
+{
+   if (!m_active)
+      return 0;
+
+   if (!ConditionWait(m_dataCondition, timeout))
+      return -2;
+
+   MutexLock(m_bufferLock);
+   size_t bytes = min(size, m_size);
+   memcpy(buffer, &m_buffer[m_head], bytes);
+   m_size -= bytes;
+   if (m_size == 0)
+   {
+      m_head = 0;
+      ConditionReset(m_dataCondition);
+   }
+   else
+   {
+      m_head += bytes;
+   }
+   MutexUnlock(m_bufferLock);
+   return (int)bytes;
+}
+
+/**
+ * Poll for data
+ */
+int TunnelCommChannel::poll(UINT32 timeout, bool write)
+{
+   if (write)
+      return 1;
+
+   if (!m_active)
+      return -1;
+
+   return ConditionWait(m_dataCondition, timeout) ? 1 : 0;
+}
+
+/**
+ * Close channel
+ */
+void TunnelCommChannel::close()
+{
+   m_active = false;
+   m_tunnel->closeChannel(this);
+}
+
+/**
+ * Put data into buffer
+ */
+void TunnelCommChannel::putData(const BYTE *data, size_t size)
+{
+   MutexLock(m_bufferLock);
+   if (m_head + m_size + size > m_allocated)
+   {
+      m_allocated = m_head + m_size + size;
+      m_buffer = (BYTE *)realloc(m_buffer, m_allocated);
+   }
+   memcpy(&m_buffer[m_head + m_size], data, size);
+   m_size += size;
+   MutexUnlock(m_bufferLock);
+   ConditionSet(m_dataCondition);
+}
+
+/**
  * Configured tunnels
  */
 static ObjectArray<Tunnel> s_tunnels;
index e3dd483..a1b6111 100644 (file)
@@ -369,6 +369,7 @@ public class NXCPCodes
    public static final int CMD_FIND_HOSTNAME_LOCATION = 0x015B;
    public static final int CMD_RESET_TUNNEL = 0x015C;
    public static final int CMD_CREATE_SESSION = 0x015D;
+   public static final int CMD_CHANNEL_DATA = 0x015E;
 
        // CMD_RS_ - Reporting Server related codes
        public static final int CMD_RS_LIST_REPORTS = 0x1100;
index beb0815..f38ed3d 100644 (file)
@@ -387,10 +387,12 @@ TCHAR LIBNETXMS_EXPORTABLE *NXCPMessageCodeName(WORD code, TCHAR *pszBuffer)
       _T("CMD_GET_FOLDER_SIZE"),
       _T("CMD_FIND_HOSTNAME_LOCATION"),
       _T("CMD_RESET_TUNNEL"),
-      _T("CMD_CREATE_SESSION")
+      _T("CMD_CREATE_CHANNEL"),
+      _T("CMD_CHANNEL_DATA"),
+      _T("CMD_CLOSE_CHANNEL")
    };
 
-   if ((code >= CMD_LOGIN) && (code <= CMD_CREATE_SESSION))
+   if ((code >= CMD_LOGIN) && (code <= CMD_CLOSE_CHANNEL))
    {
       _tcscpy(pszBuffer, pszMsgNames[code - CMD_LOGIN]);
    }
@@ -444,7 +446,7 @@ void LIBNETXMS_EXPORTABLE NXCPUnregisterMessageNameResolver(NXCPMessageNameResol
  *   2 Message decryption failed
  *   3 Receive timeout
  */
-int LIBNETXMS_EXPORTABLE RecvNXCPMessageEx(SOCKET hSocket, NXCP_MESSAGE **msgBuffer,
+int LIBNETXMS_EXPORTABLE RecvNXCPMessageEx(AbstractCommChannel *channel, NXCP_MESSAGE **msgBuffer,
                                            NXCP_BUFFER *nxcpBuffer, UINT32 *bufferSize,
                                            NXCPEncryptionContext **ppCtx,
                                            BYTE **decryptionBuffer, UINT32 dwTimeout,
@@ -475,8 +477,8 @@ int LIBNETXMS_EXPORTABLE RecvNXCPMessageEx(SOCKET hSocket, NXCP_MESSAGE **msgBuf
 
          // Receive new portion of data from the network
          // and append it to existing data in buffer
-                       iErr = RecvEx(hSocket, &nxcpBuffer->buffer[nxcpBuffer->bufferSize],
-                       NXCP_TEMP_BUF_SIZE - nxcpBuffer->bufferSize, 0, dwTimeout);
+                       iErr = channel->recv(&nxcpBuffer->buffer[nxcpBuffer->bufferSize],
+                       NXCP_TEMP_BUF_SIZE - nxcpBuffer->bufferSize, dwTimeout);
          if (iErr <= 0)
             return (iErr == -2) ? 3 : iErr;
          nxcpBuffer->bufferSize += (UINT32)iErr;
@@ -516,8 +518,8 @@ int LIBNETXMS_EXPORTABLE RecvNXCPMessageEx(SOCKET hSocket, NXCP_MESSAGE **msgBuf
        nxcpBuffer->bufferPos = 0;
    do
    {
-               iErr = RecvEx(hSocket, &nxcpBuffer->buffer[nxcpBuffer->bufferSize],
-                             NXCP_TEMP_BUF_SIZE - nxcpBuffer->bufferSize, 0, dwTimeout);
+               iErr = channel->recv(&nxcpBuffer->buffer[nxcpBuffer->bufferSize],
+                                    NXCP_TEMP_BUF_SIZE - nxcpBuffer->bufferSize, dwTimeout);
       if (iErr <= 0)
          return (iErr == -2) ? 3 : iErr;
 
@@ -589,6 +591,18 @@ decrypt_message:
    return bSkipMsg ? 1 : (int)dwMsgSize;
 }
 
+int LIBNETXMS_EXPORTABLE RecvNXCPMessageEx(SOCKET hSocket, NXCP_MESSAGE **msgBuffer,
+                                           NXCP_BUFFER *nxcpBuffer, UINT32 *bufferSize,
+                                           NXCPEncryptionContext **ppCtx,
+                                           BYTE **decryptionBuffer, UINT32 dwTimeout,
+                                           UINT32 maxMsgSize)
+{
+   SocketCommChannel *channel = new SocketCommChannel(hSocket, false);
+   int result = RecvNXCPMessageEx(channel, msgBuffer, nxcpBuffer, bufferSize, ppCtx, decryptionBuffer, dwTimeout, maxMsgSize);
+   channel->decRefCount();
+   return result;
+}
+
 int LIBNETXMS_EXPORTABLE RecvNXCPMessage(SOCKET hSocket, NXCP_MESSAGE *msgBuffer,
                                          NXCP_BUFFER *nxcpBuffer, UINT32 bufferSize,
                                          NXCPEncryptionContext **ppCtx,
@@ -601,6 +615,18 @@ int LIBNETXMS_EXPORTABLE RecvNXCPMessage(SOCKET hSocket, NXCP_MESSAGE *msgBuffer
                                 (decryptionBuffer != NULL) ? &db : NULL, dwTimeout, bufferSize);
 }
 
+int LIBNETXMS_EXPORTABLE RecvNXCPMessage(AbstractCommChannel *channel, NXCP_MESSAGE *msgBuffer,
+                                         NXCP_BUFFER *nxcpBuffer, UINT32 bufferSize,
+                                         NXCPEncryptionContext **ppCtx,
+                                         BYTE *decryptionBuffer, UINT32 dwTimeout)
+{
+   NXCP_MESSAGE *mb = msgBuffer;
+   UINT32 bs = bufferSize;
+   BYTE *db = decryptionBuffer;
+   return RecvNXCPMessageEx(channel, (msgBuffer != NULL) ? &mb : NULL, nxcpBuffer, &bs, ppCtx,
+                            (decryptionBuffer != NULL) ? &db : NULL, dwTimeout, bufferSize);
+}
+
 /**
  * Create NXCP message with raw data (MF_BINARY flag)
  * If buffer is NULL, new buffer is allocated with malloc()
@@ -816,7 +842,7 @@ bool LIBNETXMS_EXPORTABLE SendFileOverNXCP(AbstractCommChannel *channel, UINT32
 /**
  * Get version of NXCP used by peer
  */
-bool LIBNETXMS_EXPORTABLE NXCPGetPeerProtocolVersion(SOCKET hSocket, int *pnVersion, MUTEX mutex)
+bool LIBNETXMS_EXPORTABLE NXCPGetPeerProtocolVersion(AbstractCommChannel *channel, int *pnVersion, MUTEX mutex)
 {
    NXCP_MESSAGE msg;
    NXCPEncryptionContext *pDummyCtx = NULL;
@@ -829,11 +855,11 @@ bool LIBNETXMS_EXPORTABLE NXCPGetPeerProtocolVersion(SOCKET hSocket, int *pnVers
    msg.size = htonl(NXCP_HEADER_SIZE);
    msg.code = htons(CMD_GET_NXCP_CAPS);
    msg.flags = htons(MF_CONTROL);
-   if (SendEx(hSocket, &msg, NXCP_HEADER_SIZE, 0, mutex) == NXCP_HEADER_SIZE)
+   if (channel->send(&msg, NXCP_HEADER_SIZE, mutex) == NXCP_HEADER_SIZE)
    {
       pBuffer = (NXCP_BUFFER *)malloc(sizeof(NXCP_BUFFER));
       RecvNXCPMessage(0, NULL, pBuffer, 0, NULL, NULL, 0);
-      nSize = RecvNXCPMessage(hSocket, &msg, pBuffer, NXCP_HEADER_SIZE, &pDummyCtx, NULL, 30000);
+      nSize = RecvNXCPMessage(channel, &msg, pBuffer, NXCP_HEADER_SIZE, &pDummyCtx, NULL, 30000);
       if ((nSize == NXCP_HEADER_SIZE) &&
           (ntohs(msg.code) == CMD_NXCP_CAPS) &&
           (ntohs(msg.flags) & MF_CONTROL))
@@ -853,3 +879,14 @@ bool LIBNETXMS_EXPORTABLE NXCPGetPeerProtocolVersion(SOCKET hSocket, int *pnVers
    }
    return success;
 }
+
+/**
+ * Get version of NXCP used by peer
+ */
+bool LIBNETXMS_EXPORTABLE NXCPGetPeerProtocolVersion(SOCKET s, int *pnVersion, MUTEX mutex)
+{
+   SocketCommChannel *channel = new SocketCommChannel(s, false);
+   bool success = NXCPGetPeerProtocolVersion(channel, pnVersion, mutex);
+   channel->decRefCount();
+   return success;
+}
index e363cdf..e22c7a8 100644 (file)
@@ -31,6 +31,8 @@ FileDownloadJob::FileDownloadJob(Node *node, const TCHAR *remoteFile, UINT32 max
        m_session = session;
        session->incRefCount();
 
+       m_agentConnection = NULL;
+
        m_node = node;
        node->incRefCount();
 
@@ -58,26 +60,28 @@ FileDownloadJob::FileDownloadJob(Node *node, const TCHAR *remoteFile, UINT32 max
 }
 
 /**
- * Returns localFileName
- */
-const TCHAR *FileDownloadJob::getLocalFileName()
-{
-   return m_localFile;
-}
-
-/**
  * Destructor for download job
  */
 FileDownloadJob::~FileDownloadJob()
 {
        m_node->decRefCount();
        m_session->decRefCount();
+       if (m_agentConnection != NULL)
+          m_agentConnection->decRefCount();
        free(m_localFile);
        free(m_remoteFile);
        free(m_info);
 }
 
 /**
+ * Returns localFileName
+ */
+const TCHAR *FileDownloadJob::getLocalFileName()
+{
+   return m_localFile;
+}
+
+/**
  * Send message callback
  */
 void FileDownloadJob::fileResendCallback(NXCP_MESSAGE *msg, void *arg)
@@ -117,19 +121,18 @@ ServerJobResult FileDownloadJob::run()
       m_follow = false;
    }
 
-   AgentConnection *conn = m_node->createAgentConnection();
-       if (conn != NULL)
+   m_agentConnection = m_node->createAgentConnection();
+       if (m_agentConnection != NULL)
        {
-               NXCPMessage msg(conn->getProtocolVersion()), *response;
+               NXCPMessage msg(m_agentConnection->getProtocolVersion()), *response;
 
-               m_socket = conn->getSocket();
-               conn->setDeleteFileOnDownloadFailure(false);
+               m_agentConnection->setDeleteFileOnDownloadFailure(false);
 
                DbgPrintf(5, _T("FileDownloadJob: Sending file stat request for file %s@%s"), m_remoteFile, m_node->getName());
                msg.setCode(CMD_GET_FILE_DETAILS);
-               msg.setId(conn->generateRequestId());
+               msg.setId(m_agentConnection->generateRequestId());
                msg.setField(VID_FILE_NAME, m_remoteFile);
-               response = conn->customRequest(&msg);
+               response = m_agentConnection->customRequest(&msg);
                if (response != NULL)
                {
                        NXCPMessage notify;
@@ -147,7 +150,7 @@ ServerJobResult FileDownloadJob::run()
 
                                DbgPrintf(5, _T("FileDownloadJob: Sending download request for file %s@%s"), m_remoteFile, m_node->getName());
                                msg.setCode(CMD_GET_AGENT_FILE);
-                               msg.setId(conn->generateRequestId());
+                               msg.setId(m_agentConnection->generateRequestId());
                                msg.setField(VID_FILE_NAME, m_remoteFile);
 
             // default - get parameters
@@ -163,7 +166,7 @@ ServerJobResult FileDownloadJob::run()
             msg.setField(VID_NAME, m_localFile);
             msg.setField(VID_ENABLE_COMPRESSION, (m_session == NULL) || m_session->isCompressionEnabled());
 
-                               response = conn->customRequest(&msg, m_localFile, false, progressCallback, fileResendCallback, this);
+                               response = m_agentConnection->customRequest(&msg, m_localFile, false, progressCallback, fileResendCallback, this);
                                if (response != NULL)
                                {
                                        rcc = response->getFieldAsUInt32(VID_RCC);
@@ -215,7 +218,7 @@ ServerJobResult FileDownloadJob::run()
                m_session->sendMessage(&response);
                if (m_follow)
                {
-         g_monitoringList.addMonitoringFile(newFile, m_node, conn);
+         g_monitoringList.addMonitoringFile(newFile, m_node, m_agentConnection);
                }
                else
                {
@@ -249,8 +252,11 @@ ServerJobResult FileDownloadJob::run()
                m_session->sendMessage(&response);
        }
 
-       if (conn != NULL)
-          conn->decRefCount();
+       if (m_agentConnection != NULL)
+       {
+          m_agentConnection->decRefCount();
+          m_agentConnection = NULL;
+       }
        return success;
 }
 
@@ -259,7 +265,8 @@ ServerJobResult FileDownloadJob::run()
  */
 bool FileDownloadJob::onCancel()
 {
-       shutdown(m_socket, SHUT_RDWR);
+   if (m_agentConnection != NULL)
+      m_agentConnection->disconnect();
        return true;
 }
 
index 92b355a..d7d5140 100644 (file)
@@ -3786,7 +3786,7 @@ bool Node::connectToAgent(UINT32 *error, UINT32 *socketError, bool *newConnectio
    setAgentProxy(m_agentConnection);
    m_agentConnection->setCommandTimeout(g_agentCommandTimeout);
    DbgPrintf(7, _T("Node::connectToAgent(%s [%d]): calling connect on port %d"), m_name, m_id, (int)m_agentPort);
-   bool success = m_agentConnection->connect(g_pServerKey, FALSE, error, socketError, g_serverId);
+   bool success = m_agentConnection->connect(g_pServerKey, error, socketError, g_serverId);
    if (success)
    {
       UINT32 rcc = m_agentConnection->setServerId(g_serverId);
@@ -5387,7 +5387,7 @@ AgentConnectionEx *Node::createAgentConnection(bool sendServerId)
    AgentConnectionEx *conn = new AgentConnectionEx(m_id, m_ipAddress, m_agentPort, m_agentAuthMethod, m_szSharedSecret, isAgentCompressionAllowed());
    setAgentProxy(conn);
    conn->setCommandTimeout(g_agentCommandTimeout);
-   if (!conn->connect(g_pServerKey, FALSE, NULL, NULL, sendServerId ? g_serverId : 0))
+   if (!conn->connect(g_pServerKey, NULL, NULL, sendServerId ? g_serverId : 0))
    {
       conn->decRefCount();
       conn = NULL;
index 6368186..aab1d47 100644 (file)
@@ -219,13 +219,13 @@ class FileDownloadJob : public ServerJob
 private:
        Node *m_node;
        ClientSession *m_session;
+       AgentConnection *m_agentConnection;
        UINT32 m_requestId;
        TCHAR *m_localFile;
        TCHAR *m_remoteFile;
        TCHAR *m_info;
        INT64 m_fileSize;
        INT64 m_currentSize;
-       SOCKET m_socket;
        UINT32 m_maxFileSize;
        bool m_follow;
 
index 4659438..9717535 100644 (file)
@@ -465,7 +465,7 @@ private:
    int m_iAuthMethod;
    char m_szSecret[MAX_SECRET_LENGTH];
    time_t m_tLastCommandTime;
-   SOCKET m_hSocket;
+   AbstractCommChannel *m_channel;
    UINT32 m_dwNumDataLines;
    VolatileCounter m_requestId;
    UINT32 m_dwCommandTimeout;
@@ -493,7 +493,7 @@ private:
        void (*m_downloadProgressCallback)(size_t, void *);
        void *m_downloadProgressCallbackArg;
        bool m_deleteFileOnDownloadFailure;
-       void (*m_sendToClientMessageCallback)(NXCP_MESSAGE*, void *);
+       void (*m_sendToClientMessageCallback)(NXCP_MESSAGE *, void *);
        bool m_fileUploadInProgress;
        bool m_allowCompression;
 
@@ -513,8 +513,10 @@ protected:
    UINT32 authenticate(BOOL bProxyData);
    UINT32 setupProxyConnection();
    const InetAddress& getIpAddr() { return m_addr; }
-       UINT32 prepareFileDownload(const TCHAR *fileName, UINT32 rqId, bool append, void (*downloadProgressCallback)(size_t, void *), void (*fileResendCallback)(NXCP_MESSAGE*, void *), void *cbArg);
+       UINT32 prepareFileDownload(const TCHAR *fileName, UINT32 rqId, bool append,
+                void (*downloadProgressCallback)(size_t, void *), void (*fileResendCallback)(NXCP_MESSAGE *, void *), void *cbArg);
 
+       virtual AbstractCommChannel *createChannel();
    virtual void printMsg(const TCHAR *format, ...);
    virtual void onTrap(NXCPMessage *pMsg);
    virtual void onSyslogMessage(NXCPMessage *pMsg);
@@ -541,11 +543,10 @@ public:
    void incRefCount() { InterlockedIncrement(&m_userRefCount); }
    void decRefCount() { if (InterlockedDecrement(&m_userRefCount) == 0) { disconnect(); decInternalRefCount(); } }
 
-   bool connect(RSA *pServerKey = NULL, BOOL bVerbose = FALSE, UINT32 *pdwError = NULL, UINT32 *pdwSocketError = NULL, UINT64 serverId = 0);
+   bool connect(RSA *pServerKey = NULL, UINT32 *pdwError = NULL, UINT32 *pdwSocketError = NULL, UINT64 serverId = 0);
    void disconnect();
    bool isConnected() const { return m_isConnected; }
        int getProtocolVersion() const { return m_nProtocolVersion; }
-       SOCKET getSocket() { return m_hSocket; }
        bool isCompressionAllowed() const { return m_allowCompression && (m_nProtocolVersion >= 4); }
 
    bool sendMessage(NXCPMessage *pMsg);
@@ -561,8 +562,11 @@ public:
    UINT32 nop();
    UINT32 setServerCapabilities();
    UINT32 setServerId(UINT64 serverId);
-   UINT32 execAction(const TCHAR *action, int argc, const TCHAR * const *argv, bool withOutput = false, void (* outputCallback)(ActionCallbackEvent, const TCHAR *, void *) = NULL, void *cbData = NULL);
-   UINT32 uploadFile(const TCHAR *localFile, const TCHAR *destinationFile = NULL, void (* progressCallback)(INT64, void *) = NULL, void *cbArg = NULL, NXCPStreamCompressionMethod compMethod = NXCP_STREAM_COMPRESSION_NONE);
+   UINT32 execAction(const TCHAR *action, int argc, const TCHAR * const *argv, bool withOutput = false,
+            void (* outputCallback)(ActionCallbackEvent, const TCHAR *, void *) = NULL, void *cbData = NULL);
+   UINT32 uploadFile(const TCHAR *localFile, const TCHAR *destinationFile = NULL,
+            void (* progressCallback)(INT64, void *) = NULL, void *cbArg = NULL,
+            NXCPStreamCompressionMethod compMethod = NXCP_STREAM_COMPRESSION_NONE);
    UINT32 startUpgrade(const TCHAR *pszPkgName);
    UINT32 checkNetworkService(UINT32 *pdwStatus, const InetAddress& addr, int iServiceType, WORD wPort = 0,
                               WORD wProto = 0, const TCHAR *pszRequest = NULL, const TCHAR *pszResponse = NULL, UINT32 *responseTime = NULL);
@@ -576,8 +580,9 @@ public:
    UINT32 takeScreenshot(const TCHAR *sessionName, BYTE **data, size_t *size);
 
        UINT32 generateRequestId() { return (UINT32)InterlockedIncrement(&m_requestId); }
-       NXCPMessage *customRequest(NXCPMessage *pRequest, const TCHAR *recvFile = NULL, bool append = false, void (*downloadProgressCallback)(size_t, void *) = NULL,
-                                  void (*fileResendCallback)(NXCP_MESSAGE*, void *) = NULL, void *cbArg = NULL);
+       NXCPMessage *customRequest(NXCPMessage *pRequest, const TCHAR *recvFile = NULL, bool append = false,
+                void (*downloadProgressCallback)(size_t, void *) = NULL,
+                void (*fileResendCallback)(NXCP_MESSAGE *, void *) = NULL, void *cbArg = NULL);
 
    UINT32 getNumDataLines() { return m_dwNumDataLines; }
    const TCHAR *getDataLine(UINT32 dwIndex) { return dwIndex < m_dwNumDataLines ? m_ppDataLines[dwIndex] : _T("(error)"); }
index ac196c4..77b93f4 100644 (file)
@@ -93,7 +93,7 @@ AgentConnection::AgentConnection(InetAddress addr, WORD port, int authMethod, co
       m_szSecret[0] = 0;
    }
    m_allowCompression = allowCompression;
-   m_hSocket = -1;
+   m_channel = NULL;
    m_tLastCommandTime = 0;
    m_dwNumDataLines = 0;
    m_ppDataLines = NULL;
@@ -170,100 +170,95 @@ void AgentConnection::printMsg(const TCHAR *format, ...)
  */
 void AgentConnection::receiverThread()
 {
-       UINT32 msgBufferSize = 1024;
-   NXCPMessage *pMsg;
-   NXCP_MESSAGE *pRawMsg;
-   NXCP_BUFFER *pMsgBuffer;
-   BYTE *pDecryptionBuffer = NULL;
-   int error;
-   TCHAR szBuffer[128];
-       SOCKET nSocket;
+   UINT32 msgBufferSize = 1024;
 
    // Initialize raw message receiving function
-   pMsgBuffer = (NXCP_BUFFER *)malloc(sizeof(NXCP_BUFFER));
-   RecvNXCPMessage(0, NULL, pMsgBuffer, 0, NULL, NULL, 0);
+   NXCP_BUFFER *msgBuffer = (NXCP_BUFFER *)malloc(sizeof(NXCP_BUFFER));
+   RecvNXCPMessage(0, NULL, msgBuffer, 0, NULL, NULL, 0);
 
    // Allocate space for raw message
-   pRawMsg = (NXCP_MESSAGE *)malloc(msgBufferSize);
+   NXCP_MESSAGE *rawMsg = (NXCP_MESSAGE *)malloc(msgBufferSize);
 #ifdef _WITH_ENCRYPTION
-   pDecryptionBuffer = (BYTE *)malloc(msgBufferSize);
+   BYTE *decryptionBuffer = (BYTE *)malloc(msgBufferSize);
+#else
+   BYTE *decryptionBuffer = NULL;
 #endif
 
-   // Message receiving loop
-   while(1)
+   while(true)
    {
-               // Shrink buffer after receiving large message
-               if (msgBufferSize > 131072)
-               {
-                       msgBufferSize = 131072;
-                  pRawMsg = (NXCP_MESSAGE *)realloc(pRawMsg, msgBufferSize);
-                       if (pDecryptionBuffer != NULL)
-                          pDecryptionBuffer = (BYTE *)realloc(pDecryptionBuffer, msgBufferSize);
-               }
+      // Shrink buffer after receiving large message
+      if (msgBufferSize > 131072)
+      {
+         msgBufferSize = 131072;
+         rawMsg = (NXCP_MESSAGE *)realloc(rawMsg, msgBufferSize);
+         if (decryptionBuffer != NULL)
+            decryptionBuffer = (BYTE *)realloc(decryptionBuffer, msgBufferSize);
+      }
 
       // Receive raw message
-      lock();
-               nSocket = m_hSocket;
-               unlock();
-      if ((error = RecvNXCPMessageEx(nSocket, &pRawMsg, pMsgBuffer, &msgBufferSize,
-                                     &m_pCtx, (pDecryptionBuffer != NULL) ? &pDecryptionBuffer : NULL,
-                                                                                           m_dwRecvTimeout, MAX_MSG_SIZE)) <= 0)
-               {
-                       if ((error != 0) && (WSAGetLastError() != WSAESHUTDOWN))
-                               DbgPrintf(6, _T("AgentConnection::ReceiverThread(): RecvNXCPMessage() failed: error=%d, socket_error=%d"), error, WSAGetLastError());
+      int rc = RecvNXCPMessageEx(m_channel, &rawMsg, msgBuffer, &msgBufferSize,
+                                 &m_pCtx, (decryptionBuffer != NULL) ? &decryptionBuffer : NULL,
+                                 m_dwRecvTimeout, MAX_MSG_SIZE);
+      if (rc <= 0)
+      {
+         if ((rc != 0) && (WSAGetLastError() != WSAESHUTDOWN))
+            DbgPrintf(6, _T("AgentConnection::ReceiverThread(): RecvNXCPMessage() failed: error=%d, socket_error=%d"), rc, WSAGetLastError());
          break;
-               }
+      }
 
       // Check if we get too large message
-      if (error == 1)
+      if (rc == 1)
       {
-         printMsg(_T("Received too large message %s (%d bytes)"),
-                  NXCPMessageCodeName(ntohs(pRawMsg->code), szBuffer),
-                  ntohl(pRawMsg->size));
+         TCHAR buffer[64];
+         printMsg(_T("Received too large message %s (%d bytes)"), NXCPMessageCodeName(ntohs(rawMsg->code), buffer), ntohl(rawMsg->size));
          continue;
       }
 
       // Check if we are unable to decrypt message
-      if (error == 2)
+      if (rc == 2)
       {
          printMsg(_T("Unable to decrypt received message"));
          continue;
       }
 
       // Check for timeout
-      if (error == 3)
+      if (rc == 3)
       {
-                       if (m_fileUploadInProgress)
-                               continue;       // Receive timeout may occur when uploading large files via slow links
+         if (m_fileUploadInProgress)
+            continue;   // Receive timeout may occur when uploading large files via slow links
          printMsg(_T("Timed out waiting for message"));
          break;
       }
 
       // Check that actual received packet size is equal to encoded in packet
-      if ((int)ntohl(pRawMsg->size) != error)
+      if ((int)ntohl(rawMsg->size) != rc)
       {
-         printMsg(_T("RecvMsg: Bad packet length [size=%d ActualSize=%d]"), ntohl(pRawMsg->size), error);
+         printMsg(_T("RecvMsg: Bad packet length [size=%d ActualSize=%d]"), ntohl(rawMsg->size), rc);
          continue;   // Bad packet, wait for next
       }
 
-               if (ntohs(pRawMsg->flags) & MF_BINARY)
-               {
+      if (ntohs(rawMsg->flags) & MF_BINARY)
+      {
          // Convert message header to host format
-         pRawMsg->id = ntohl(pRawMsg->id);
-         pRawMsg->code = ntohs(pRawMsg->code);
-         pRawMsg->numFields = ntohl(pRawMsg->numFields);
-         DbgPrintf(6, _T("Received raw message %s from agent at %s"),
-            NXCPMessageCodeName(pRawMsg->code, szBuffer), (const TCHAR *)m_addr.toString());
+         rawMsg->id = ntohl(rawMsg->id);
+         rawMsg->code = ntohs(rawMsg->code);
+         rawMsg->numFields = ntohl(rawMsg->numFields);
+         if (nxlog_get_debug_level() >= 6)
+         {
+            TCHAR buffer[64];
+            nxlog_debug(6, _T("Received raw message %s from agent at %s"),
+               NXCPMessageCodeName(rawMsg->code, buffer), (const TCHAR *)m_addr.toString());
+         }
 
-                       if ((pRawMsg->code == CMD_FILE_DATA) && (pRawMsg->id == m_dwDownloadRequestId))
-                       {
+         if ((rawMsg->code == CMD_FILE_DATA) && (rawMsg->id == m_dwDownloadRequestId))
+         {
             if (m_sendToClientMessageCallback != NULL)
             {
-               pRawMsg->code = ntohs(pRawMsg->code);
-               pRawMsg->numFields = ntohl(pRawMsg->numFields);
-               m_sendToClientMessageCallback(pRawMsg, m_downloadProgressCallbackArg);
+               rawMsg->code = ntohs(rawMsg->code);
+               rawMsg->numFields = ntohl(rawMsg->numFields);
+               m_sendToClientMessageCallback(rawMsg, m_downloadProgressCallbackArg);
 
-               if (ntohs(pRawMsg->flags) & MF_END_OF_FILE)
+               if (ntohs(rawMsg->flags) & MF_END_OF_FILE)
                {
                   onFileDownload(true);
                }
@@ -271,7 +266,7 @@ void AgentConnection::receiverThread()
                {
                   if (m_downloadProgressCallback != NULL)
                   {
-                     m_downloadProgressCallback(pRawMsg->size - (NXCP_HEADER_SIZE + 8), m_downloadProgressCallbackArg);
+                     m_downloadProgressCallback(rawMsg->size - (NXCP_HEADER_SIZE + 8), m_downloadProgressCallbackArg);
                   }
                }
             }
@@ -279,9 +274,9 @@ void AgentConnection::receiverThread()
             {
                if (m_hCurrFile != -1)
                {
-                  if (_write(m_hCurrFile, pRawMsg->fields, pRawMsg->numFields) == (int)pRawMsg->numFields)
+                  if (_write(m_hCurrFile, rawMsg->fields, rawMsg->numFields) == (int)rawMsg->numFields)
                   {
-                     if (ntohs(pRawMsg->flags) & MF_END_OF_FILE)
+                     if (ntohs(rawMsg->flags) & MF_END_OF_FILE)
                      {
                         _close(m_hCurrFile);
                         m_hCurrFile = -1;
@@ -306,15 +301,15 @@ void AgentConnection::receiverThread()
                   onFileDownload(false);
                }
             }
-                       }
+         }
 
-                       if((pRawMsg->code == CMD_ABORT_FILE_TRANSFER) && (pRawMsg->id == m_dwDownloadRequestId))
-                       {
+         if ((rawMsg->code == CMD_ABORT_FILE_TRANSFER) && (rawMsg->id == m_dwDownloadRequestId))
+         {
             if (m_sendToClientMessageCallback != NULL)
             {
-               pRawMsg->code = ntohs(pRawMsg->code);
-               pRawMsg->numFields = ntohl(pRawMsg->numFields);
-               m_sendToClientMessageCallback(pRawMsg, m_downloadProgressCallbackArg);
+               rawMsg->code = ntohs(rawMsg->code);
+               rawMsg->numFields = ntohl(rawMsg->numFields);
+               m_sendToClientMessageCallback(rawMsg, m_downloadProgressCallbackArg);
 
                onFileDownload(false);
             }
@@ -326,90 +321,90 @@ void AgentConnection::receiverThread()
 
                onFileDownload(false);
             }
-                       }
-               }
-               else
-               {
-                       // Create message object from raw message
-                       pMsg = new NXCPMessage(pRawMsg, m_nProtocolVersion);
-                       switch(pMsg->getCode())
-                       {
-                               case CMD_REQUEST_COMPLETED:
+         }
+      }
+      else
+      {
+         // Create message object from raw message
+         NXCPMessage *msg = new NXCPMessage(rawMsg, m_nProtocolVersion);
+         switch(msg->getCode())
+         {
+            case CMD_REQUEST_COMPLETED:
             case CMD_SESSION_KEY:
-                                       m_pMsgWaitQueue->put(pMsg);
-                                       break;
-                               case CMD_TRAP:
+               m_pMsgWaitQueue->put(msg);
+               break;
+            case CMD_TRAP:
                if (g_agentConnectionThreadPool != NULL)
                {
                   incInternalRefCount();
-                  ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onTrapCallback, pMsg);
+                  ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onTrapCallback, msg);
                }
                else
                {
-                  delete pMsg;
+                  delete msg;
                }
-                                       break;
+               break;
             case CMD_SYSLOG_RECORDS:
                if (g_agentConnectionThreadPool != NULL)
                {
                   incInternalRefCount();
-                  ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onSyslogMessageCallback, pMsg);
+                  ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onSyslogMessageCallback, msg);
                }
                else
                {
-                  delete pMsg;
+                  delete msg;
                }
                break;
-                               case CMD_PUSH_DCI_DATA:
-                                  if (g_agentConnectionThreadPool != NULL)
-                                  {
-                                     incInternalRefCount();
-                  ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onDataPushCallback, pMsg);
-                                  }
-                                  else
-                                  {
-                  delete pMsg;
-                                  }
-                                       break;
-                               case CMD_DCI_DATA:
+            case CMD_PUSH_DCI_DATA:
                if (g_agentConnectionThreadPool != NULL)
                {
                   incInternalRefCount();
-                  ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::processCollectedDataCallback, pMsg);
+                  ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onDataPushCallback, msg);
+               }
+               else
+               {
+                  delete msg;
+               }
+               break;
+            case CMD_DCI_DATA:
+               if (g_agentConnectionThreadPool != NULL)
+               {
+                  incInternalRefCount();
+                  ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::processCollectedDataCallback, msg);
                }
                else
                {
                   NXCPMessage response;
                   response.setCode(CMD_REQUEST_COMPLETED);
-                  response.setId(pMsg->getId());
+                  response.setId(msg->getId());
                   response.setField(VID_RCC, ERR_INTERNAL_ERROR);
                   sendMessage(&response);
-                  delete pMsg;
+                  delete msg;
                }
-                                       break;
+               break;
             case CMD_FILE_MONITORING:
-               onFileMonitoringData(pMsg);
-                                       delete pMsg;
+               onFileMonitoringData(msg);
+               delete msg;
                break;
             case CMD_SNMP_TRAP:
                if (g_agentConnectionThreadPool != NULL)
                {
                   incInternalRefCount();
-                  ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onSnmpTrapCallback, pMsg);
+                  ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onSnmpTrapCallback, msg);
                }
                else
                {
-                  delete pMsg;
+                  delete msg;
                }
                break;
-                               default:
-                                       if (processCustomMessage(pMsg))
-                                               delete pMsg;
-                                       else
-                                               m_pMsgWaitQueue->put(pMsg);
-                                       break;
-                       }
-               }
+            default:
+               if (processCustomMessage(msg))
+                  delete msg;
+               else
+                  m_pMsgWaitQueue->put(msg);
+               break;
+         }
+      }
    }
 
    // Close socket and mark connection as disconnected
@@ -421,10 +416,9 @@ void AgentConnection::receiverThread()
                onFileDownload(false);
        }
 
-       if (error == 0)
-      shutdown(m_hSocket, SHUT_RDWR);
-   closesocket(m_hSocket);
-   m_hSocket = -1;
+       m_channel->close();
+       m_channel->decRefCount();
+       m_channel = NULL;
        if (m_pCtx != NULL)
        {
                m_pCtx->decRefCount();
@@ -432,18 +426,43 @@ void AgentConnection::receiverThread()
        }
    m_isConnected = false;
    unlock();
+}
 
-   free(pRawMsg);
-   free(pMsgBuffer);
-#ifdef _WITH_ENCRYPTION
-   free(pDecryptionBuffer);
-#endif
+/**
+ * Create channel. Default implementation creates socket channel.
+ */
+AbstractCommChannel *AgentConnection::createChannel()
+{
+   // Create socket
+   SOCKET s = socket(m_bUseProxy ? m_proxyAddr.getFamily() : m_addr.getFamily(), SOCK_STREAM, 0);
+   if (s == INVALID_SOCKET)
+   {
+      printMsg(_T("Call to socket() failed"));
+      return NULL;
+   }
+
+   // Fill in address structure
+   SockAddrBuffer sb;
+   struct sockaddr *sa = m_bUseProxy ? m_proxyAddr.fillSockAddr(&sb, m_wProxyPort) : m_addr.fillSockAddr(&sb, m_wPort);
+
+   // Connect to server
+   if ((sa == NULL) || (ConnectEx(s, sa, SA_LEN(sa), m_connectionTimeout) == -1))
+   {
+      TCHAR buffer[64];
+      printMsg(_T("Cannot establish connection with agent at %s:%d"),
+         m_bUseProxy ? m_proxyAddr.toString(buffer) : m_addr.toString(buffer),
+         (int)(m_bUseProxy ? m_wProxyPort : m_wPort));
+      closesocket(s);
+      return NULL;
+   }
+
+   return new SocketCommChannel(s);
 }
 
 /**
  * Connect to agent
  */
-bool AgentConnection::connect(RSA *pServerKey, BOOL bVerbose, UINT32 *pdwError, UINT32 *pdwSocketError, UINT64 serverId)
+bool AgentConnection::connect(RSA *pServerKey, UINT32 *pdwError, UINT32 *pdwSocketError, UINT64 serverId)
 {
    TCHAR szBuffer[256];
    bool success = false;
@@ -465,36 +484,22 @@ bool AgentConnection::connect(RSA *pServerKey, BOOL bVerbose, UINT32 *pdwError,
    ThreadJoin(m_hReceiverThread);
    m_hReceiverThread = INVALID_THREAD_HANDLE;
 
-   // Check if we need to close existing socket
-   if (m_hSocket != -1)
-      closesocket(m_hSocket);
-
-   struct sockaddr *sa;
-
-   // Create socket
-   m_hSocket = socket(m_bUseProxy ? m_proxyAddr.getFamily() : m_addr.getFamily(), SOCK_STREAM, 0);
-   if (m_hSocket == INVALID_SOCKET)
+   // Check if we need to close existing channel
+   if (m_channel != NULL)
    {
-      printMsg(_T("Call to socket() failed"));
-      goto connect_cleanup;
+      m_channel->decRefCount();
+      m_channel = NULL;
    }
 
-   // Fill in address structure
-   SockAddrBuffer sb;
-   sa = m_bUseProxy ? m_proxyAddr.fillSockAddr(&sb, m_wProxyPort) : m_addr.fillSockAddr(&sb, m_wPort);
-
-   // Connect to server
-       if ((sa == NULL) || (ConnectEx(m_hSocket, sa, SA_LEN(sa), m_connectionTimeout) == -1))
+   m_channel = createChannel();
+   if (m_channel == NULL)
    {
-      if (bVerbose)
-         printMsg(_T("Cannot establish connection with agent at %s:%d"),
-            m_bUseProxy ? m_proxyAddr.toString(szBuffer) : m_addr.toString(szBuffer),
-            (int)(m_bUseProxy ? m_wProxyPort : m_wPort));
+      printMsg(_T("Cannot create communication channel"));
       dwError = ERR_CONNECT_FAILED;
       goto connect_cleanup;
    }
 
-   if (!NXCPGetPeerProtocolVersion(m_hSocket, &m_nProtocolVersion, m_mutexSocketWrite))
+   if (!NXCPGetPeerProtocolVersion(m_channel, &m_nProtocolVersion, m_mutexSocketWrite))
    {
       dwError = ERR_INTERNAL_ERROR;
       goto connect_cleanup;
@@ -587,17 +592,17 @@ connect_cleanup:
                        *pdwSocketError = (UINT32)WSAGetLastError();
 
       lock();
-      if (m_hSocket != -1)
-         shutdown(m_hSocket, SHUT_RDWR);
+      if (m_channel != NULL)
+         m_channel->shutdown();
       unlock();
       ThreadJoin(m_hReceiverThread);
       m_hReceiverThread = INVALID_THREAD_HANDLE;
 
       lock();
-      if (m_hSocket != -1)
+      if (m_channel != NULL)
       {
-         closesocket(m_hSocket);
-         m_hSocket = -1;
+         m_channel->close();
+         m_channel = NULL;
       }
 
                if (m_pCtx != NULL)
@@ -627,9 +632,9 @@ void AgentConnection::disconnect()
                onFileDownload(false);
        }
 
-   if (m_hSocket != -1)
+   if (m_channel != NULL)
    {
-      shutdown(m_hSocket, SHUT_RDWR);
+      m_channel->shutdown();
    }
    destroyResultData();
    m_isConnected = false;
@@ -959,14 +964,14 @@ UINT32 AgentConnection::waitForRCC(UINT32 dwRqId, UINT32 dwTimeOut)
 bool AgentConnection::sendMessage(NXCPMessage *pMsg)
 {
    bool success;
-   NXCP_MESSAGE *pRawMsg = pMsg->createMessage(m_allowCompression);
+   NXCP_MESSAGE *rawMsg = pMsg->createMessage(m_allowCompression);
        NXCPEncryptionContext *pCtx = acquireEncryptionContext();
    if (pCtx != NULL)
    {
-      NXCP_ENCRYPTED_MESSAGE *pEnMsg = pCtx->encryptMessage(pRawMsg);
+      NXCP_ENCRYPTED_MESSAGE *pEnMsg = pCtx->encryptMessage(rawMsg);
       if (pEnMsg != NULL)
       {
-         success = (SendEx(m_hSocket, (char *)pEnMsg, ntohl(pEnMsg->size), 0, m_mutexSocketWrite) == (int)ntohl(pEnMsg->size));
+         success = (m_channel->send(pEnMsg, ntohl(pEnMsg->size), m_mutexSocketWrite) == (int)ntohl(pEnMsg->size));
          free(pEnMsg);
       }
       else
@@ -977,9 +982,9 @@ bool AgentConnection::sendMessage(NXCPMessage *pMsg)
    }
    else
    {
-      success = (SendEx(m_hSocket, (char *)pRawMsg, ntohl(pRawMsg->size), 0, m_mutexSocketWrite) == (int)ntohl(pRawMsg->size));
+      success = (m_channel->send(rawMsg, ntohl(rawMsg->size), m_mutexSocketWrite) == (int)ntohl(rawMsg->size));
    }
-   free(pRawMsg);
+   free(rawMsg);
    return success;
 }
 
@@ -989,14 +994,14 @@ bool AgentConnection::sendMessage(NXCPMessage *pMsg)
 bool AgentConnection::sendRawMessage(NXCP_MESSAGE *pMsg)
 {
    bool success;
-   NXCP_MESSAGE *pRawMsg = pMsg;
+   NXCP_MESSAGE *rawMsg = pMsg;
        NXCPEncryptionContext *pCtx = acquireEncryptionContext();
    if (pCtx != NULL)
    {
-      NXCP_ENCRYPTED_MESSAGE *pEnMsg = pCtx->encryptMessage(pRawMsg);
+      NXCP_ENCRYPTED_MESSAGE *pEnMsg = pCtx->encryptMessage(rawMsg);
       if (pEnMsg != NULL)
       {
-         success = (SendEx(m_hSocket, (char *)pEnMsg, ntohl(pEnMsg->size), 0, m_mutexSocketWrite) == (int)ntohl(pEnMsg->size));
+         success = (m_channel->send(pEnMsg, ntohl(pEnMsg->size), m_mutexSocketWrite) == (int)ntohl(pEnMsg->size));
          free(pEnMsg);
       }
       else
@@ -1007,7 +1012,7 @@ bool AgentConnection::sendRawMessage(NXCP_MESSAGE *pMsg)
    }
    else
    {
-      success = (SendEx(m_hSocket, (char *)pRawMsg, ntohl(pRawMsg->size), 0, m_mutexSocketWrite) == (int)ntohl(pRawMsg->size));
+      success = (m_channel->send(rawMsg, ntohl(rawMsg->size), m_mutexSocketWrite) == (int)ntohl(rawMsg->size));
    }
    return success;
 }
@@ -1367,7 +1372,7 @@ UINT32 AgentConnection::uploadFile(const TCHAR *localFile, const TCHAR *destinat
                localFile, (compMethod == NXCP_STREAM_COMPRESSION_NONE) ? _T("without") : _T("with"));
                m_fileUploadInProgress = true;
                NXCPEncryptionContext *ctx = acquireEncryptionContext();
-      if (SendFileOverNXCP(m_hSocket, dwRqId, localFile, ctx, 0, progressCallback, cbArg, m_mutexSocketWrite, compMethod))
+      if (SendFileOverNXCP(m_channel, dwRqId, localFile, ctx, 0, progressCallback, cbArg, m_mutexSocketWrite, compMethod))
          dwResult = waitForRCC(dwRqId, m_dwCommandTimeout);
       else
          dwResult = ERR_IO_FAILURE;
@@ -1890,8 +1895,8 @@ UINT32 AgentConnection::takeScreenshot(const TCHAR *sessionName, BYTE **data, si
 /**
  * Send custom request to agent
  */
-NXCPMessage *AgentConnection::customRequest(NXCPMessage *pRequest, const TCHAR *recvFile, bool append, void (*downloadProgressCallback)(size_t, void *),
-                                                                                                                 void (*fileResendCallback)(NXCP_MESSAGE*, void *), void *cbArg)
+NXCPMessage *AgentConnection::customRequest(NXCPMessage *pRequest, const TCHAR *recvFile, bool append,
+         void (*downloadProgressCallback)(size_t, void *), void (*fileResendCallback)(NXCP_MESSAGE *, void *), void *cbArg)
 {
    UINT32 dwRqId, rcc;
        NXCPMessage *msg = NULL;
@@ -1955,7 +1960,7 @@ NXCPMessage *AgentConnection::customRequest(NXCPMessage *pRequest, const TCHAR *
  * Prepare for file download
  */
 UINT32 AgentConnection::prepareFileDownload(const TCHAR *fileName, UINT32 rqId, bool append, void (*downloadProgressCallback)(size_t, void *),
-                                             void (*fileResendCallback)(NXCP_MESSAGE *, void *), void *cbArg)
+                                             void (* fileResendCallback)(NXCP_MESSAGE *, void *), void *cbArg)
 {
    if (fileResendCallback == NULL)
    {
index aab726c..57aa5f7 100644 (file)
@@ -45,7 +45,7 @@ static void OutputCallback(ActionCallbackEvent e, const TCHAR *data, void *arg)
 int main(int argc, char *argv[])
 {
    char *eptr;
-   BOOL bStart = TRUE, bVerbose = TRUE;
+   BOOL bStart = TRUE;
    bool showOutput = false;
    int i, ch, iExitCode = 3;
    int iAuthMethod = AUTH_NONE;
@@ -67,7 +67,7 @@ int main(int argc, char *argv[])
 
    // Parse command line
    opterr = 1;
-       while((ch = getopt(argc, argv, "a:e:hK:op:qs:vw:W:")) != -1)
+   while((ch = getopt(argc, argv, "a:e:hK:op:s:vw:W:")) != -1)
    {
       switch(ch)
       {
@@ -91,7 +91,6 @@ int main(int argc, char *argv[])
 #endif
                      _T("   -o           : Show action's output.\n")
                      _T("   -p <port>    : Specify agent's port number. Default is %d.\n")
-                     _T("   -q           : Quiet mode.\n")
                      _T("   -s <secret>  : Specify shared secret for authentication.\n")
                      _T("   -v           : Display version and exit.\n")
                      _T("   -w <seconds> : Set command timeout (default is 5 seconds)\n")
@@ -133,9 +132,6 @@ int main(int argc, char *argv[])
                wPort = (WORD)i;
             }
             break;
-         case 'q':   // Quiet mode
-            bVerbose = FALSE;
-            break;
          case 's':   // Shared secret
 #ifdef UNICODE
                                MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, optarg, -1, szSecret, MAX_SECRET_LENGTH);
@@ -263,7 +259,7 @@ int main(int argc, char *argv[])
             conn->setConnectionTimeout(dwConnTimeout);
             conn->setCommandTimeout(dwTimeout);
             conn->setEncryptionPolicy(iEncryptionPolicy);
-            if (conn->connect(pServerKey, bVerbose, &dwError))
+            if (conn->connect(pServerKey, &dwError))
             {
                UINT32 dwError;
 
@@ -283,13 +279,10 @@ int main(int argc, char *argv[])
 #else
                dwError = conn->execAction(argv[optind + 1], argc - optind - 2, &argv[optind + 2], showOutput, OutputCallback);
 #endif
-               if (bVerbose)
-               {
-                  if (dwError == ERR_SUCCESS)
-                     _tprintf(_T("Action executed successfully\n"));
-                  else
-                     _tprintf(_T("%d: %s\n"), dwError, AgentErrorCodeToText(dwError));
-               }
+               if (dwError == ERR_SUCCESS)
+                  _tprintf(_T("Action executed successfully\n"));
+               else
+                  _tprintf(_T("%d: %s\n"), dwError, AgentErrorCodeToText(dwError));
                iExitCode = (dwError == ERR_SUCCESS) ? 0 : 1;
             }
             else
index ff36a57..8d2a649 100644 (file)
@@ -81,7 +81,7 @@ static int UninstallPolicy(AgentConnection *conn, const uuid& guid)
 int main(int argc, char *argv[])
 {
    char *eptr;
-   BOOL bStart = TRUE, bVerbose = TRUE;
+   BOOL bStart = TRUE;
    int i, ch, iExitCode = 3, action = -1;
    int iAuthMethod = AUTH_NONE;
 #ifdef _WITH_ENCRYPTION
@@ -103,7 +103,7 @@ int main(int argc, char *argv[])
 
    // Parse command line
    opterr = 1;
-       while((ch = getopt(argc, argv, "a:e:hK:lp:qs:u:vw:W:")) != -1)
+   while((ch = getopt(argc, argv, "a:e:hK:lp:s:u:vw:W:")) != -1)
    {
       switch(ch)
       {
@@ -127,7 +127,6 @@ int main(int argc, char *argv[])
                      _T("                  (default is %s).\n")
 #endif
                      _T("   -p <port>    : Specify agent's port number. Default is %d.\n")
-                     _T("   -q           : Quiet mode.\n")
                      _T("   -s <secret>  : Specify shared secret for authentication.\n")
                      _T("   -v           : Display version and exit.\n")
                      _T("   -w <seconds> : Set command timeout (default is 5 seconds)\n")
@@ -166,9 +165,6 @@ int main(int argc, char *argv[])
                wPort = (WORD)i;
             }
             break;
-         case 'q':   // Quiet mode
-            bVerbose = FALSE;
-            break;
          case 's':   // Shared secret
 #ifdef UNICODE
                                MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, optarg, -1, szSecret, MAX_SECRET_LENGTH);
@@ -315,7 +311,7 @@ int main(int argc, char *argv[])
             conn->setConnectionTimeout(dwConnTimeout);
             conn->setCommandTimeout(dwTimeout);
             conn->setEncryptionPolicy(iEncryptionPolicy);
-            if (conn->connect(pServerKey, bVerbose, &dwError))
+            if (conn->connect(pServerKey, &dwError))
             {
                                        if (action == 0)
                                        {
index f2cee93..249eaa4 100644 (file)
@@ -47,11 +47,6 @@ enum Operation
 };
 
 /**
- * Verbose flag
- */
-static BOOL s_verbose = TRUE;
-
-/**
  * Get single parameter
  */
 static int Get(AgentConnection *pConn, const TCHAR *pszParam, BOOL bShowName)
@@ -291,7 +286,7 @@ int main(int argc, char *argv[])
 
    // Parse command line
    opterr = 1;
-       while((ch = getopt(argc, argv, "a:A:bCe:Ehi:IK:lno:O:p:P:qr:R:s:S:t:Tvw:W:X:Z:")) != -1)
+   while((ch = getopt(argc, argv, "a:A:bCe:Ehi:IK:lno:O:p:P:r:R:s:S:t:Tvw:W:X:Z:")) != -1)
    {
       switch(ch)
       {
@@ -325,7 +320,6 @@ int main(int argc, char *argv[])
                      _T("   -O <port>    : Proxy agent's port number. Default is %d.\n")
                      _T("   -p <port>    : Agent's port number. Default is %d.\n")
                      _T("   -P <port>    : Network service port (to be used wth -S option).\n")
-                     _T("   -q           : Quiet mode.\n")
                      _T("   -r <string>  : Service check request string.\n")
                      _T("   -R <string>  : Service check expected response string.\n")
                      _T("   -s <secret>  : Shared secret for authentication.\n")
@@ -417,9 +411,6 @@ int main(int argc, char *argv[])
                   wServicePort = (WORD)i;
             }
             break;
-         case 'q':   // Quiet mode
-            s_verbose = FALSE;
-            break;
          case 'r':   // Service check request string
 #ifdef UNICODE
                 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, optarg, -1, szRequest, MAX_DB_STRING);
@@ -646,7 +637,7 @@ int main(int argc, char *argv[])
             conn->setEncryptionPolicy(iEncryptionPolicy);
             if (useProxy)
                conn->setProxy(proxyAddr, proxyPort, proxyAuth, szProxySecret);
-            if (conn->connect(pServerKey, s_verbose, &dwError))
+            if (conn->connect(pServerKey, &dwError))
             {
                do
                {
index e656a07..9c4d266 100644 (file)
@@ -360,7 +360,7 @@ int main(int argc, char *argv[])
                                conn->setConnectionTimeout(dwConnTimeout);
             conn->setCommandTimeout(dwTimeout);
             conn->setEncryptionPolicy(iEncryptionPolicy);
-            if (conn->connect(pServerKey, bVerbose, &dwError))
+            if (conn->connect(pServerKey, &dwError))
             {
                UINT32 dwError;