added communication channel abstraction layer; added tunnel reset command
author Victor Kirhenshtein <victor@netxms.org>
Mon, 6 Mar 2017 16:55:29 +0000 (18:55 +0200)
committer Victor Kirhenshtein <victor@netxms.org>
Mon, 6 Mar 2017 16:55:29 +0000 (18:55 +0200)
15 files changed:
include/nms_common.h
include/nms_cscp.h
include/nms_util.h
include/nxcpapi.h
src/agent/core/comm.cpp
src/agent/core/nxagentd.h
src/agent/core/session.cpp
src/agent/core/tunnel.cpp
src/java/client/netxms-base/src/main/java/org/netxms/base/NXCPCodes.java
src/libnetxms/Makefile.am
src/libnetxms/Makefile.w32
src/libnetxms/cch.cpp [new file with mode: 0644]
src/libnetxms/msgrecv.cpp
src/libnetxms/nxcp.cpp
src/libnetxms/tools.cpp

index 3248022..854ffa6 100644 (file)
@@ -758,7 +758,7 @@ inline int _close(int fd) { return ::close(fd); }
 // Socket compatibility
 typedef int SOCKET;
 
-#define closesocket(x) close(x)
+#define closesocket(x) _close(x)
 #define WSAGetLastError() (errno)
 
 #define WSAEINTR        EINTR
index 24b2eab..a890e8d 100644 (file)
@@ -580,6 +580,8 @@ typedef struct
 #define CMD_BULK_ALARM_STATE_CHANGE    0x0159
 #define CMD_GET_FOLDER_SIZE            0x015A
 #define CMD_FIND_HOSTNAME_LOCATION     0x015B
+#define CMD_RESET_TUNNEL               0x015C
+#define CMD_CREATE_SESSION             0x015D
 
 #define CMD_RS_LIST_REPORTS            0x1100
 #define CMD_RS_GET_REPORT              0x1101
index 915fbc9..0d24ef2 100644 (file)
@@ -1571,6 +1571,46 @@ public:
    void reset();
 };
 
+/**
+ * Abstract communication channel - reference counted (RefCountObject) transport interface
+ */
+class LIBNETXMS_EXPORTABLE AbstractCommChannel : public RefCountObject
+{
+protected:
+   virtual ~AbstractCommChannel();  // protected; users in this change release channels with decRefCount()
+
+public:
+   AbstractCommChannel();
+
+   virtual int send(const void *data, size_t size, MUTEX mutex = INVALID_MUTEX_HANDLE) = 0;  // callers treat result <= 0 as failure
+   virtual int recv(void *buffer, size_t size, UINT32 timeout = INFINITE) = 0;  // callers stop reading on result <= 0
+   virtual int poll(UINT32 timeout, bool write = false) = 0;  // callers stop on result <= 0
+   virtual int shutdown() = 0;
+   virtual void close() = 0;
+};
+
+/**
+ * Socket communication channel - AbstractCommChannel implementation backed by a SOCKET
+ */
+class LIBNETXMS_EXPORTABLE SocketCommChannel : public AbstractCommChannel
+{
+private:
+   SOCKET m_socket;  // underlying socket; set to INVALID_SOCKET by close()
+   bool m_owner;     // if true, destructor closes m_socket when it is still valid
+
+protected:
+   virtual ~SocketCommChannel();
+
+public:
+   SocketCommChannel(SOCKET socket, bool owner = true);
+
+   virtual int send(const void *data, size_t size, MUTEX mutex = INVALID_MUTEX_HANDLE);
+   virtual int recv(void *buffer, size_t size, UINT32 timeout = INFINITE);
+   virtual int poll(UINT32 timeout, bool write = false);
+   virtual int shutdown();
+   virtual void close();
+};
+
 #endif   /* __cplusplus */
 
 /**
index e65390c..3344456 100644 (file)
@@ -349,6 +349,22 @@ public:
    virtual ~SocketMessageReceiver();
 };
 
+/**
+ * Message receiver - comm channel implementation (message bytes are read via AbstractCommChannel::recv)
+ */
+class LIBNETXMS_EXPORTABLE CommChannelMessageReceiver : public AbstractMessageReceiver
+{
+private:
+   AbstractCommChannel *m_channel;  // source channel; constructor takes a reference, destructor releases it
+
+protected:
+   virtual int readBytes(BYTE *buffer, size_t size, UINT32 timeout);
+
+public:
+   CommChannelMessageReceiver(AbstractCommChannel *channel, size_t initialSize, size_t maxSize);
+   virtual ~CommChannelMessageReceiver();
+};
+
 #ifdef _WITH_ENCRYPTION
 
 /**
@@ -541,11 +557,15 @@ int LIBNETXMS_EXPORTABLE RecvNXCPMessageEx(SOCKET hSocket, NXCP_MESSAGE **msgBuf
 NXCP_MESSAGE LIBNETXMS_EXPORTABLE *CreateRawNXCPMessage(UINT16 code, UINT32 id, UINT16 flags,
                                                         const void *data, size_t dataSize,
                                                         NXCP_MESSAGE *buffer, bool allowCompression);
-BOOL LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 dwId, const TCHAR *pszFile,
+bool LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 dwId, const TCHAR *pszFile,
                                            NXCPEncryptionContext *pCtx, long offset,
                                                                                                                 void (* progressCallback)(INT64, void *), void *cbArg,
                                                                                                                 MUTEX mutex, NXCPStreamCompressionMethod compressionMethod = NXCP_STREAM_COMPRESSION_NONE);
-BOOL LIBNETXMS_EXPORTABLE NXCPGetPeerProtocolVersion(SOCKET hSocket, int *pnVersion, MUTEX mutex);
+bool LIBNETXMS_EXPORTABLE SendFileOverNXCP(AbstractCommChannel *channel, UINT32 dwId, const TCHAR *pszFile,
+                                           NXCPEncryptionContext *pCtx, long offset,
+                                           void (* progressCallback)(INT64, void *), void *cbArg,
+                                           MUTEX mutex, NXCPStreamCompressionMethod compressionMethod = NXCP_STREAM_COMPRESSION_NONE);
+bool LIBNETXMS_EXPORTABLE NXCPGetPeerProtocolVersion(SOCKET hSocket, int *pnVersion, MUTEX mutex);
 
 TCHAR LIBNETXMS_EXPORTABLE *NXCPMessageCodeName(UINT16 wCode, TCHAR *buffer);
 void LIBNETXMS_EXPORTABLE NXCPRegisterMessageNameResolver(NXCPMessageNameResolver r);
index d09b101..b7fec42 100644 (file)
@@ -1,6 +1,6 @@
 /* 
 ** NetXMS multiplatform core agent
-** Copyright (C) 2003-2016 Victor Kirhenshtein
+** Copyright (C) 2003-2017 Victor Kirhenshtein
 **
 ** This program is free software; you can redistribute it and/or modify
 ** it under the terms of the GNU General Public License as published by
@@ -412,7 +412,9 @@ THREAD_RESULT THREAD_CALL ListenerThread(void *)
             DebugPrintf(5, _T("Connection from %s accepted"), buffer);
 
             // Create new session structure and threads
-            CommSession *session = new CommSession(hClientSocket, addr, masterServer, controlServer);
+            SocketCommChannel *channel = new SocketCommChannel(hClientSocket);
+            CommSession *session = new CommSession(channel, addr, masterServer, controlServer);
+            channel->decRefCount();
                        
             if (!RegisterSession(session))
             {
index f5c0872..e7ac9a1 100644 (file)
@@ -310,7 +310,7 @@ class CommSession : public AbstractCommSession
 private:
    UINT32 m_id;
    UINT32 m_index;
-   SOCKET m_hSocket;
+   AbstractCommChannel *m_channel;
    Queue *m_sendQueue;
    Queue *m_processingQueue;
    THREAD m_hWriteThread;
@@ -348,7 +348,7 @@ private:
    void recvFile(NXCPMessage *pRequest, NXCPMessage *pMsg);
    void getLocalFile(NXCPMessage *pRequest, NXCPMessage *pMsg);
    void cancelFileMonitoring(NXCPMessage *pRequest, NXCPMessage *pMsg);
-   UINT32 upgrade(NXCPMessage *pRequest);
+   UINT32 upgrade(NXCPMessage *request);
    UINT32 setupProxyConnection(NXCPMessage *pRequest);
 
    void readThread();
@@ -363,7 +363,7 @@ private:
    static THREAD_RESULT THREAD_CALL proxyReadThreadStarter(void *);
 
 public:
-   CommSession(SOCKET hSocket, const InetAddress &serverAddr, bool masterServer, bool controlServer);
+   CommSession(AbstractCommChannel *channel, const InetAddress &serverAddr, bool masterServer, bool controlServer);
    virtual ~CommSession();
 
    void run();
index e03d2de..b1bab17 100644 (file)
@@ -113,13 +113,14 @@ THREAD_RESULT THREAD_CALL CommSession::proxyReadThreadStarter(void *pArg)
 /**
  * Client session class constructor
  */
-CommSession::CommSession(SOCKET hSocket, const InetAddress &serverAddr, bool masterServer, bool controlServer) : m_downloadFileMap(true)
+CommSession::CommSession(AbstractCommChannel *channel, const InetAddress &serverAddr, bool masterServer, bool controlServer) : m_downloadFileMap(true)
 {
    m_id = InterlockedIncrement(&s_sessionId);
    m_index = INVALID_INDEX;
    m_sendQueue = new Queue;
    m_processingQueue = new Queue;
-   m_hSocket = hSocket;
+   m_channel = channel;
+   m_channel->incRefCount();
    m_hProxySocket = INVALID_SOCKET;
    m_hWriteThread = INVALID_THREAD_HANDLE;
    m_hProcessingThread = INVALID_THREAD_HANDLE;
@@ -152,8 +153,9 @@ CommSession::~CommSession()
    if (m_proxyConnection)
       InterlockedDecrement(&s_activeProxySessions);
 
-   shutdown(m_hSocket, SHUT_RDWR);
-   closesocket(m_hSocket);
+   m_channel->shutdown();
+   m_channel->close();
+   m_channel->decRefCount();
    if (m_hProxySocket != INVALID_SOCKET)
       closesocket(m_hProxySocket);
 
@@ -207,7 +209,7 @@ void CommSession::run()
 void CommSession::disconnect()
 {
        debugPrintf(5, _T("CommSession::disconnect()"));
-   shutdown(m_hSocket, SHUT_RDWR);
+   m_channel->shutdown();
    if (m_hProxySocket != -1)
       shutdown(m_hProxySocket, SHUT_RDWR);
    m_disconnected = true;
@@ -218,7 +220,7 @@ void CommSession::disconnect()
  */
 void CommSession::readThread()
 {
-   SocketMessageReceiver receiver(m_hSocket, 4096, MAX_AGENT_MSG_SIZE);
+   CommChannelMessageReceiver receiver(m_channel, 4096, MAX_AGENT_MSG_SIZE);
    while(!m_disconnected)
    {
       if (!m_proxyConnection)
@@ -385,9 +387,7 @@ void CommSession::readThread()
       }
       else  // m_proxyConnection
       {
-         SocketPoller sp;
-         sp.add(m_hSocket);
-         int rc = sp.poll((g_dwIdleTimeout + 1) * 1000);
+         int rc = m_channel->poll((g_dwIdleTimeout + 1) * 1000);
          if (rc <= 0)
             break;
          if (rc > 0)
@@ -396,7 +396,7 @@ void CommSession::readThread()
             m_ts = time(NULL);
 
             char buffer[32768];
-            rc = recv(m_hSocket, buffer, 32768, 0);
+            rc = m_channel->recv(buffer, 32768);
             if (rc <= 0)
                break;
             SendEx(m_hProxySocket, buffer, rc, 0, NULL);
@@ -449,7 +449,7 @@ bool CommSession::sendRawMessage(NXCP_MESSAGE *msg, NXCPEncryptionContext *ctx)
       NXCP_ENCRYPTED_MESSAGE *enMsg = ctx->encryptMessage(msg);
       if (enMsg != NULL)
       {
-         if (SendEx(m_hSocket, (const char *)enMsg, ntohl(enMsg->size), 0, m_socketWriteMutex) <= 0)
+         if (m_channel->send(enMsg, ntohl(enMsg->size), m_socketWriteMutex) <= 0)
          {
             success = false;
          }
@@ -458,7 +458,7 @@ bool CommSession::sendRawMessage(NXCP_MESSAGE *msg, NXCPEncryptionContext *ctx)
    }
    else
    {
-      if (SendEx(m_hSocket, (const char *)msg, ntohl(msg->size), 0, m_socketWriteMutex) <= 0)
+      if (m_channel->send(msg, ntohl(msg->size), m_socketWriteMutex) <= 0)
       {
          success = false;
       }
@@ -935,21 +935,21 @@ bool CommSession::sendFile(UINT32 requestId, const TCHAR *file, long offset, boo
 {
    if (m_disconnected)
       return false;
-       return SendFileOverNXCP(m_hSocket, requestId, file, m_pCtx, offset,
-                SendFileProgressCallback, this, m_socketWriteMutex, allowCompression ? NXCP_STREAM_COMPRESSION_DEFLATE : NXCP_STREAM_COMPRESSION_NONE) ? true : false;
+       return SendFileOverNXCP(m_channel, requestId, file, m_pCtx, offset,
+                SendFileProgressCallback, this, m_socketWriteMutex, allowCompression ? NXCP_STREAM_COMPRESSION_DEFLATE : NXCP_STREAM_COMPRESSION_NONE);
 }
 
 /**
  * Upgrade agent from package in the file store
  */
-UINT32 CommSession::upgrade(NXCPMessage *pRequest)
+UINT32 CommSession::upgrade(NXCPMessage *request)
 {
    if (m_masterServer)
    {
       TCHAR szPkgName[MAX_PATH], szFullPath[MAX_PATH];
 
       szPkgName[0] = 0;
-      pRequest->getFieldAsString(VID_FILE_NAME, szPkgName, MAX_PATH);
+      request->getFieldAsString(VID_FILE_NAME, szPkgName, MAX_PATH);
       BuildFullPath(szPkgName, szFullPath);
 
       //Create line in registry file with upgrade file name to delete it after system start
@@ -1131,7 +1131,7 @@ void CommSession::proxyReadThread()
          rc = recv(m_hProxySocket, buffer, 32768, 0);
          if (rc <= 0)
             break;
-         SendEx(m_hSocket, buffer, rc, 0, m_socketWriteMutex);
+         m_channel->send(buffer, rc, m_socketWriteMutex);
       }
    }
    disconnect();
index 8c57ef9..642af9c 100644 (file)
@@ -35,6 +35,7 @@ private:
    SSL *m_ssl;
    MUTEX m_sslLock;
    bool m_connected;
+   bool m_reset;
    VolatileCounter m_requestId;
    THREAD m_recvThread;
    MsgWaitQueue *m_queue;
@@ -79,6 +80,7 @@ Tunnel::Tunnel(const InetAddress& addr, UINT16 port) : m_address(addr)
    m_ssl = NULL;
    m_sslLock = MutexCreate();
    m_connected = false;
+   m_reset = false;
    m_requestId = 0;
    m_recvThread = INVALID_THREAD_HANDLE;
    m_queue = NULL;
@@ -151,12 +153,22 @@ void Tunnel::recvThread()
             TCHAR buffer[64];
             debugPrintf(6, _T("Received message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
          }
+
+         if (msg->getCode() == CMD_RESET_TUNNEL)
+         {
+            m_reset = true;
+            debugPrintf(4, _T("Receiver thread stopped (tunnel reset)"));
+            break;
+         }
+
          switch(msg->getCode())
          {
             case CMD_BIND_AGENT_TUNNEL:
                ThreadPoolExecute(g_commThreadPool, this, &Tunnel::processBindRequest, msg);
                msg = NULL; // prevent message deletion
                break;
+            case CMD_CREATE_SESSION:
+               break;
             default:
                m_queue->put(msg);
                msg = NULL; // prevent message deletion
@@ -412,7 +424,17 @@ bool Tunnel::connectToServer()
  */
 void Tunnel::checkConnection()
 {
-   if (!m_connected)
+   if (m_reset)
+   {
+      m_reset = false;
+      disconnect();
+      closesocket(m_socket);
+      m_socket = INVALID_SOCKET;
+      debugPrintf(3, _T("Resetting tunnel"));
+      if (connectToServer())
+         debugPrintf(3, _T("Tunnel is active"));
+   }
+   else if (!m_connected)
    {
       if (connectToServer())
          debugPrintf(3, _T("Tunnel is active"));
index 6937630..e3dd483 100644 (file)
@@ -367,6 +367,8 @@ public class NXCPCodes
    public static final int CMD_BULK_ALARM_STATE_CHANGE = 0x0159;
    public static final int CMD_GET_FOLDER_SIZE = 0x015A;
    public static final int CMD_FIND_HOSTNAME_LOCATION = 0x015B;
+   public static final int CMD_RESET_TUNNEL = 0x015C;
+   public static final int CMD_CREATE_SESSION = 0x015D;
 
        // CMD_RS_ - Reporting Server related codes
        public static final int CMD_RS_LIST_REPORTS = 0x1100;
index 82f2b93..fba0706 100644 (file)
@@ -1,5 +1,5 @@
 SOURCES = array.cpp base64.cpp bytestream.cpp cc_mb.cpp cc_ucs2.cpp \
-          cc_ucs4.cpp cc_utf8.cpp config.cpp crypto.cpp \
+          cc_ucs4.cpp cc_utf8.cpp cch.cpp config.cpp crypto.cpp \
          dirw_unix.c geolocation.cpp getopt.c dload.cpp hash.cpp \
          hashmapbase.cpp ice.c icmp.cpp icmp6.cpp iconv.cpp \
          inet_pton.c inetaddr.cpp log.cpp lz4.c main.cpp md5.cpp message.cpp \
index ef3f2ac..364ca3d 100644 (file)
@@ -1,7 +1,7 @@
 TARGET = libnetxms.dll
 TYPE = dll
 SOURCES = array.cpp base64.cpp bytestream.cpp cc_mb.cpp cc_ucs2.cpp \
-       cc_ucs4.cpp cc_utf8.cpp config.cpp crypto.cpp dir.c \
+       cc_ucs4.cpp cc_utf8.cpp cch.cpp config.cpp crypto.cpp dir.c \
        dirw.c dload.cpp geolocation.cpp getopt.c hash.cpp \
        hashmapbase.cpp ice.c icmp.cpp \
        inetaddr.cpp log.cpp lz4.c main.cpp md5.cpp message.cpp \
diff --git a/src/libnetxms/cch.cpp b/src/libnetxms/cch.cpp
new file mode 100644 (file)
index 0000000..0f2d8ef
--- /dev/null
@@ -0,0 +1,102 @@
+/*
+** NetXMS - Network Management System
+** NetXMS Foundation Library
+** Copyright (C) 2003-2017 Victor Kirhenshtein
+**
+** This program is free software; you can redistribute it and/or modify
+** it under the terms of the GNU Lesser General Public License as published
+** by the Free Software Foundation; either version 3 of the License, or
+** (at your option) any later version.
+**
+** This program is distributed in the hope that it will be useful,
+** but WITHOUT ANY WARRANTY; without even the implied warranty of
+** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+** GNU General Public License for more details.
+**
+** You should have received a copy of the GNU Lesser General Public License
+** along with this program; if not, write to the Free Software
+** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+**
+** File: cch.cpp
+**
+**/
+
+#include "libnetxms.h"
+
+/**
+ * Abstract communication channel constructor (the base class declares no data members to set up)
+ */
+AbstractCommChannel::AbstractCommChannel()
+{
+}
+
+/**
+ * Abstract communication channel destructor (nothing to release at this level)
+ */
+AbstractCommChannel::~AbstractCommChannel()
+{
+}
+
+/**
+ * Socket communication channel constructor: wraps given socket; owner flag tells destructor whether to close it
+ */
+SocketCommChannel::SocketCommChannel(SOCKET socket, bool owner)
+   : AbstractCommChannel(), m_socket(socket), m_owner(owner)
+{
+   // all state is set by the initializer list
+}
+
+/** Socket communication channel destructor: closes the socket only when this
+ *  channel owns it and it has not already been marked invalid. */
+SocketCommChannel::~SocketCommChannel()
+{
+   if (!m_owner || (m_socket == INVALID_SOCKET))
+      return;  // borrowed or already closed socket - nothing to release
+   closesocket(m_socket);
+}
+
+/**
+ * Send data: forwards the buffer to SendEx() on the channel socket (flags 0), passing the mutex through
+ */
+int SocketCommChannel::send(const void *data, size_t size, MUTEX mutex)
+{
+   return SendEx(m_socket, data, size, 0, mutex);
+}
+
+/**
+ * Receive data: forwards to RecvEx() on the channel socket (flags 0) with the given timeout
+ */
+int SocketCommChannel::recv(void *buffer, size_t size, UINT32 timeout)
+{
+   return RecvEx(m_socket, buffer, size, 0, timeout);
+}
+
+/** Poll channel: adds the socket to a SocketPoller (constructed with the write
+ *  flag) and returns its poll(timeout) result, or -1 if the socket is invalid. */
+int SocketCommChannel::poll(UINT32 timeout, bool write)
+{
+   if (m_socket != INVALID_SOCKET)
+   {
+      SocketPoller poller(write);
+      poller.add(m_socket);
+      return poller.poll(timeout);
+   }
+   return -1;
+}
+
+/** Shutdown channel: ::shutdown(SHUT_RDWR) on the socket, or -1 if the socket is invalid. */
+int SocketCommChannel::shutdown()
+{
+   if (m_socket == INVALID_SOCKET)
+      return -1;
+   return ::shutdown(m_socket, SHUT_RDWR);
+}
+
+/** Close channel: closes the socket if it is still open and marks it invalid,
+ *  so a repeated close() or the destructor will not close the descriptor again. */
+void SocketCommChannel::close()
+{
+   if (m_socket != INVALID_SOCKET)
+      closesocket(m_socket);
+   m_socket = INVALID_SOCKET;
+}
index 05dbb9a..cc08f45 100644 (file)
@@ -1,7 +1,7 @@
 /*
 ** NetXMS - Network Management System
 ** NetXMS Foundation Library
-** Copyright (C) 2003-2016 Victor Kirhenshtein
+** Copyright (C) 2003-2017 Victor Kirhenshtein
 **
 ** This program is free software; you can redistribute it and/or modify
 ** it under the terms of the GNU Lesser General Public License as published
@@ -197,6 +197,31 @@ int SocketMessageReceiver::readBytes(BYTE *buffer, size_t size, UINT32 timeout)
    return RecvEx(m_socket, buffer, size, 0, timeout);
 }
 
+/**
+ * Communication channel message receiver constructor: stores the channel and takes a reference on it
+ */
+CommChannelMessageReceiver::CommChannelMessageReceiver(AbstractCommChannel *channel, size_t initialSize, size_t maxSize)
+   : AbstractMessageReceiver(initialSize, maxSize), m_channel(channel)
+{
+   m_channel->incRefCount();
+}
+
+/**
+ * Communication channel message receiver destructor: releases the reference taken by the constructor
+ */
+CommChannelMessageReceiver::~CommChannelMessageReceiver()
+{
+   m_channel->decRefCount();
+}
+
+/**
+ * Read bytes from communication channel: returns the channel's recv() result for the given buffer, size and timeout
+ */
+int CommChannelMessageReceiver::readBytes(BYTE *buffer, size_t size, UINT32 timeout)
+{
+   return m_channel->recv(buffer, size, timeout);
+}
+
 #ifdef _WITH_ENCRYPTION
 
 /**
index d66b245..beb0815 100644 (file)
@@ -384,10 +384,13 @@ TCHAR LIBNETXMS_EXPORTABLE *NXCPMessageCodeName(WORD code, TCHAR *pszBuffer)
       _T("CMD_BULK_TERMINATE_ALARMS"),
       _T("CMD_BULK_RESOLVE_ALARMS"),
       _T("CMD_BULK_ALARM_STATE_CHANGE"),
-      _T("CMD_GET_FOLDER_SIZE")
+      _T("CMD_GET_FOLDER_SIZE"),
+      _T("CMD_FIND_HOSTNAME_LOCATION"),
+      _T("CMD_RESET_TUNNEL"),
+      _T("CMD_CREATE_SESSION")
    };
 
-   if ((code >= CMD_LOGIN) && (code <= CMD_GET_FOLDER_SIZE))
+   if ((code >= CMD_LOGIN) && (code <= CMD_CREATE_SESSION))
    {
       _tcscpy(pszBuffer, pszMsgNames[code - CMD_LOGIN]);
    }
@@ -666,9 +669,23 @@ NXCP_MESSAGE LIBNETXMS_EXPORTABLE *CreateRawNXCPMessage(UINT16 code, UINT32 id,
 }
 
 /**
- * Send file over CSCP
+ * Send file over NXCP
  */
-BOOL LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 id, const TCHAR *pszFile,
+bool LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 id, const TCHAR *pszFile,
+                                           NXCPEncryptionContext *pCtx, long offset,
+                                           void (* progressCallback)(INT64, void *), void *cbArg,
+                                           MUTEX mutex, NXCPStreamCompressionMethod compressionMethod)
+{
+   SocketCommChannel *ch = new SocketCommChannel(hSocket, false);  // owner = false: wrapper's destructor leaves caller's socket open
+   bool result = SendFileOverNXCP(ch, id, pszFile, pCtx, offset, progressCallback, cbArg, mutex, compressionMethod);  // delegate to channel-based overload
+   ch->decRefCount();  // drop this function's reference to the temporary wrapper
+   return result;
+}
+
+/**
+ * Send file over NXCP
+ */
+bool LIBNETXMS_EXPORTABLE SendFileOverNXCP(AbstractCommChannel *channel, UINT32 id, const TCHAR *pszFile,
                                            NXCPEncryptionContext *pCtx, long offset,
                                                                                                                 void (* progressCallback)(INT64, void *), void *cbArg,
                                                                                                                 MUTEX mutex, NXCPStreamCompressionMethod compressionMethod)
@@ -676,7 +693,7 @@ BOOL LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 id, const TCHA
    int hFile, iBytes;
        INT64 bytesTransferred = 0;
    UINT32 dwPadding;
-   BOOL bResult = FALSE;
+   bool success = false;
    NXCP_MESSAGE *pMsg;
    NXCP_ENCRYPTED_MESSAGE *pEnMsg;
 
@@ -737,13 +754,13 @@ BOOL LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 id, const TCHA
                pEnMsg = pCtx->encryptMessage(pMsg);
                                        if (pEnMsg != NULL)
                                        {
-                                               SendEx(hSocket, (char *)pEnMsg, ntohl(pEnMsg->size), 0, mutex);
+                                          channel->send(pEnMsg, ntohl(pEnMsg->size), mutex);
                                                free(pEnMsg);
                                        }
                                }
                                else
                                {
-                                       if (SendEx(hSocket, (char *)pMsg, (UINT32)iBytes + NXCP_HEADER_SIZE + dwPadding, 0, mutex) <= 0)
+                                       if (channel->send(pMsg, (UINT32)iBytes + NXCP_HEADER_SIZE + dwPadding, mutex) <= 0)
                                                break;  // Send error
                                }
                                if (progressCallback != NULL)
@@ -755,7 +772,7 @@ BOOL LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 id, const TCHA
                                if (bytesToRead <= 0)
                                {
                                        // End of file
-                                       bResult = TRUE;
+                                  success = true;
                                        break;
                                }
                        }
@@ -769,7 +786,7 @@ BOOL LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 id, const TCHA
    delete compressor;
 
    // If file upload failed, send CMD_ABORT_FILE_TRANSFER
-   if (!bResult)
+   if (!success)
    {
       NXCP_MESSAGE msg;
 
@@ -783,28 +800,28 @@ BOOL LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 id, const TCHA
          pEnMsg = pCtx->encryptMessage(&msg);
          if (pEnMsg != NULL)
          {
-            SendEx(hSocket, (char *)pEnMsg, ntohl(pEnMsg->size), 0, mutex);
+            channel->send(pEnMsg, ntohl(pEnMsg->size), mutex);
             free(pEnMsg);
          }
       }
       else
       {
-         SendEx(hSocket, (char *)&msg, NXCP_HEADER_SIZE, 0, mutex);
+         channel->send(&msg, NXCP_HEADER_SIZE, mutex);
       }
    }
 
-   return bResult;
+   return success;
 }
 
 /**
  * Get version of NXCP used by peer
  */
-BOOL LIBNETXMS_EXPORTABLE NXCPGetPeerProtocolVersion(SOCKET hSocket, int *pnVersion, MUTEX mutex)
+bool LIBNETXMS_EXPORTABLE NXCPGetPeerProtocolVersion(SOCKET hSocket, int *pnVersion, MUTEX mutex)
 {
    NXCP_MESSAGE msg;
    NXCPEncryptionContext *pDummyCtx = NULL;
    NXCP_BUFFER *pBuffer;
-   BOOL bRet = FALSE;
+   bool success = false;
    int nSize;
 
    msg.id = 0;
@@ -821,7 +838,7 @@ BOOL LIBNETXMS_EXPORTABLE NXCPGetPeerProtocolVersion(SOCKET hSocket, int *pnVers
           (ntohs(msg.code) == CMD_NXCP_CAPS) &&
           (ntohs(msg.flags) & MF_CONTROL))
       {
-         bRet = TRUE;
+         success = true;
          *pnVersion = ntohl(msg.numFields) >> 24;
       }
       else if ((nSize == 1) || (nSize == 3) || (nSize >= NXCP_HEADER_SIZE))
@@ -829,10 +846,10 @@ BOOL LIBNETXMS_EXPORTABLE NXCPGetPeerProtocolVersion(SOCKET hSocket, int *pnVers
          // We don't receive any answer or receive invalid answer -
          // assume that peer doesn't understand CMD_GET_NXCP_CAPS message
          // and set version number to 1
-         bRet = TRUE;
+         success = true;
          *pnVersion = 1;
       }
       free(pBuffer);
    }
-   return bRet;
+   return success;
 }
index 76c43a1..03f90f2 100644 (file)
@@ -1096,7 +1096,7 @@ int LIBNETXMS_EXPORTABLE SendEx(SOCKET hSocket, const void *data, size_t len, in
        int nLeft = (int)len;
        int nRet;
 
-       if (mutex != NULL)
+       if (mutex != INVALID_MUTEX_HANDLE)
                MutexLock(mutex);
 
        do
@@ -1131,7 +1131,7 @@ retry:
                nLeft -= nRet;
        } while (nLeft > 0);
 
-       if (mutex != NULL)
+       if (mutex != INVALID_MUTEX_HANDLE)
                MutexUnlock(mutex);
 
        return nLeft == 0 ? (int)len : nRet;