implemented compression for binary NXCP messages; flag for marking binary messages...
authorVictor Kirhenshtein <victor@netxms.org>
Tue, 14 Feb 2017 14:28:17 +0000 (16:28 +0200)
committerVictor Kirhenshtein <victor@netxms.org>
Tue, 14 Feb 2017 14:28:17 +0000 (16:28 +0200)
16 files changed:
include/nms_cscp.h
include/nxcpapi.h
src/agent/core/session.cpp
src/java/client/netxms-base/src/main/java/org/netxms/base/NXCPMessage.java
src/java/client/netxms-base/src/test/java/org/netxms/base/NXCPMessageTest.java
src/libnetxms/message.cpp
src/libnetxms/nxcp.cpp
src/libnetxms/streamcomp.cpp
src/server/core/alarm.cpp
src/server/core/npe.cpp
src/server/core/session.cpp
src/server/include/nms_core.h
src/server/include/nxsrvapi.h
src/server/libnxsrv/agent.cpp
src/server/tools/nxupload/nxupload.cpp
tests/test-libnetxms/nxcp.cpp

index 1a6d65c..ac7e746 100644 (file)
@@ -225,13 +225,14 @@ typedef struct
 /**
  * Message flags
  */
-#define MF_BINARY          0x0001
-#define MF_END_OF_FILE     0x0002
-#define MF_DONT_ENCRYPT    0x0004
-#define MF_END_OF_SEQUENCE 0x0008
-#define MF_REVERSE_ORDER   0x0010
-#define MF_CONTROL         0x0020
-#define MF_COMPRESSED      0x0040
+#define MF_BINARY             0x0001
+#define MF_END_OF_FILE        0x0002
+#define MF_DONT_ENCRYPT       0x0004
+#define MF_END_OF_SEQUENCE    0x0008
+#define MF_REVERSE_ORDER      0x0010
+#define MF_CONTROL            0x0020
+#define MF_COMPRESSED         0x0040
+#define MF_COMPRESSED_STREAM  0x0080
 
 /**
  * Message (command) codes
index 81f7202..3747108 100644 (file)
@@ -88,7 +88,7 @@ public:
    bool isReverseOrder() const { return (m_flags & MF_REVERSE_ORDER) ? true : false; }
    bool isBinary() const { return (m_flags & MF_BINARY) ? true : false; }
    bool isControl() const { return (m_flags & MF_CONTROL) ? true : false; }
-   bool isCompressed() const { return (m_flags & MF_COMPRESSED) ? true : false; }
+   bool isCompressedStream() const { return (m_flags & MF_COMPRESSED_STREAM) ? true : false; }
 
    const BYTE *getBinaryData() const { return m_data; }
    size_t getBinaryDataSize() const { return m_dataSize; }
@@ -364,12 +364,12 @@ public:
 };
 
 /**
- * NXCP compression methods
+ * NXCP stresam compression methods
  */
-enum NXCPCompressionMethod
+enum NXCPStreamCompressionMethod
 {
-   NXCP_COMPRESSION_NONE = 0,
-   NXCP_COMPRESSION_LZ4 = 1
+   NXCP_STREAM_COMPRESSION_NONE = 0,
+   NXCP_STREAM_COMPRESSION_LZ4 = 1
 };
 
 /**
@@ -384,7 +384,7 @@ public:
    virtual size_t decompress(const BYTE *in, size_t inSize, const BYTE **out) = 0;
    virtual size_t compressBufferSize(size_t dataSize) = 0;
 
-   static StreamCompressor *create(NXCPCompressionMethod method, bool compress, size_t maxBlockSize);
+   static StreamCompressor *create(NXCPStreamCompressionMethod method, bool compress, size_t maxBlockSize);
 };
 
 /**
@@ -488,13 +488,13 @@ int LIBNETXMS_EXPORTABLE RecvNXCPMessageEx(SOCKET hSocket, NXCP_MESSAGE **msgBuf
                                            NXCPEncryptionContext **ppCtx,
                                            BYTE **decryptionBuffer, UINT32 dwTimeout,
                                                                                                                 UINT32 maxMsgSize);
-NXCP_MESSAGE LIBNETXMS_EXPORTABLE *CreateRawNXCPMessage(WORD wCode, UINT32 dwId, WORD flags,
-                                                        UINT32 dwDataSize, void *pData,
-                                                        NXCP_MESSAGE *pBuffer);
+NXCP_MESSAGE LIBNETXMS_EXPORTABLE *CreateRawNXCPMessage(UINT16 code, UINT32 id, UINT16 flags,
+                                                        const void *data, size_t dataSize,
+                                                        NXCP_MESSAGE *buffer, bool allowCompression);
 BOOL LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 dwId, const TCHAR *pszFile,
                                            NXCPEncryptionContext *pCtx, long offset,
                                                                                                                 void (* progressCallback)(INT64, void *), void *cbArg,
-                                                                                                                MUTEX mutex, NXCPCompressionMethod compressionMethod = NXCP_COMPRESSION_NONE);
+                                                                                                                MUTEX mutex, NXCPStreamCompressionMethod compressionMethod = NXCP_STREAM_COMPRESSION_NONE);
 BOOL LIBNETXMS_EXPORTABLE NXCPGetPeerProtocolVersion(SOCKET hSocket, int *pnVersion, MUTEX mutex);
 
 TCHAR LIBNETXMS_EXPORTABLE *NXCPMessageCodeName(UINT16 wCode, TCHAR *buffer);
index 32b69ce..39e0b88 100644 (file)
@@ -269,12 +269,12 @@ void CommSession::readThread()
                {
                   const BYTE *data;
                   int dataSize;
-                  if (msg->isCompressed())
+                  if (msg->isCompressedStream())
                   {
                      const BYTE *in = msg->getBinaryData();
                      if (m_compressor == NULL)
                      {
-                        NXCPCompressionMethod method = (NXCPCompressionMethod)(*in);
+                        NXCPStreamCompressionMethod method = (NXCPStreamCompressionMethod)(*in);
                         m_compressor = StreamCompressor::create(method, false, FILE_BUFFER_SIZE);
                         if (m_compressor == NULL)
                         {
index 5f962c9..02694e3 100644 (file)
@@ -51,6 +51,7 @@ public class NXCPMessage
        public static final int MF_REVERSE_ORDER = 0x0010;
        public static final int MF_CONTROL = 0x0020;
    public static final int MF_COMPRESSED = 0x0040;
+   public static final int MF_COMPRESSED_STREAM = 0x0080;
        
        private int messageCode;
        private int messageFlags;
@@ -153,6 +154,12 @@ public class NXCPMessage
                {
                        final int size = inputStream.readInt();
                        binaryData = new byte[size];
+         if ((messageFlags & MF_COMPRESSED) == MF_COMPRESSED)
+         {
+            // Compressed message
+            inputStream.skip(4);  // skip original message length
+            inputStream = new NXCPDataInputStream(new InflaterInputStream(inputStream));
+         }
                        inputStream.readFully(binaryData);
                }
                else if ((messageFlags & MF_CONTROL) == MF_CONTROL)
@@ -623,15 +630,38 @@ public class NXCPMessage
                }
                else if ((messageFlags & MF_BINARY) == MF_BINARY) 
                {
-                       outputStream.writeShort(messageCode); // wCode
-                       outputStream.writeShort(messageFlags); // wFlags
-                       final int length = binaryData.length;
+                       byte[] payload = binaryData;
+                       boolean compressed = false;
+                       if (allowCompression && ((messageFlags & MF_COMPRESSED_STREAM) == 0) && (binaryData.length > 128))
+                       {
+                          ByteArrayOutputStream compDataByteStream = new ByteArrayOutputStream();
+            byte[] length = new byte[4];
+            final int unpackedPadding = (8 - ((binaryData.length + HEADER_SIZE) % 8)) & 7;
+            intToBytes(unpackedPadding + HEADER_SIZE, length, 0);
+            compDataByteStream.write(length);   // unpacked message size
+            
+            DeflaterOutputStream deflaterStream = new DeflaterOutputStream(compDataByteStream, new Deflater(JZlib.Z_BEST_COMPRESSION));
+            deflaterStream.write(binaryData);
+            deflaterStream.close();
+
+            byte[] compPayload = compDataByteStream.toByteArray();
+            if (compPayload.length < binaryData.length)
+            {
+               payload = compPayload;
+               compressed = true;
+            }
+                       }
+
+         outputStream.writeShort(messageCode); // wCode
+                       outputStream.writeShort(compressed ? (messageFlags | MF_COMPRESSED) : messageFlags); // wFlags
+                          
+                       final int length = payload.length;
                        final int padding = (8 - ((length + HEADER_SIZE) % 8)) & 7;
                        final int packetSize = length + HEADER_SIZE + padding;
                        outputStream.writeInt(packetSize); // dwSize (padded to 8 bytes boundaries)
                        outputStream.writeInt((int)messageId); // dwId
-                       outputStream.writeInt(length); // dwNumVars, here used for real size of the payload (w/o headers and padding)
-                       outputStream.write(binaryData);
+                       outputStream.writeInt(binaryData.length); // dwNumVars, here used for real size of the payload (w/o headers and padding)
+                       outputStream.write(payload);
                        for (int i = 0; i < padding; i++)
                                outputStream.writeByte(0);
                }
index 7c03ca8..696e07c 100644 (file)
@@ -93,7 +93,24 @@ public class NXCPMessageTest extends TestCase
       assertEquals(10, msg2.findField(2).getAsInteger().intValue());
       assertEquals(20, msg2.findField(3).getAsInteger().intValue());
       assertEquals(123456789L, msg2.findField(4).getAsInteger().longValue());
-      assertEquals(true, Arrays.equals(byteTest, msg2.findField(5).getAsBinary()));
+      assertTrue(Arrays.equals(byteTest, msg2.findField(5).getAsBinary()));
+   }
+   
+   public void testCompressedBinaryMessage() throws Exception
+   {
+      final byte[] byteTest = Arrays.copyOf(new byte[] { 0x10, 0x20, 0x30, 0x40, 0x50 }, 500);
+      NXCPMessage msg1 = new NXCPMessage(1, 100);
+      msg1.setBinaryMessage(true);
+      msg1.setBinaryData(byteTest);
+      
+      final byte[] bytes = msg1.createNXCPMessage(true);
+      assertEquals(40, bytes.length);
+      
+      final NXCPMessage msg2 = new NXCPMessage(bytes, null);
+      assertEquals(1, msg2.getMessageCode());
+      assertEquals(100L, msg2.getMessageId());
+      assertTrue(msg2.isBinaryMessage());
+      assertTrue(Arrays.equals(byteTest, msg2.getBinaryData()));
    }
    
        /**
index 00c62b6..22d1971 100644 (file)
@@ -142,7 +142,39 @@ NXCPMessage::NXCPMessage(NXCP_MESSAGE *msg, int version)
    if (m_flags & MF_BINARY)
    {
       m_dataSize = (size_t)ntohl(msg->numFields);
-      m_data = (BYTE *)nx_memdup(msg->fields, m_dataSize);
+      if ((m_flags & MF_COMPRESSED) && (m_version >= 4))
+      {
+         m_flags &= ~MF_COMPRESSED; // clear "compressed" flag so it will not be mistakenly re-sent
+
+         z_stream stream;
+         stream.zalloc = Z_NULL;
+         stream.zfree = Z_NULL;
+         stream.opaque = Z_NULL;
+         stream.avail_in = (size_t)ntohl(msg->size) - NXCP_HEADER_SIZE - 4;
+         stream.next_in = (BYTE *)msg + NXCP_HEADER_SIZE + 4;
+         if (inflateInit(&stream) != Z_OK)
+         {
+            nxlog_debug(6, _T("NXCPMessage: inflateInit() failed"));
+            return;
+         }
+
+         m_data = (BYTE *)malloc(m_dataSize);
+         stream.next_out = m_data;
+         stream.avail_out = m_dataSize;
+
+         if (inflate(&stream, Z_FINISH) != Z_STREAM_END)
+         {
+            inflateEnd(&stream);
+            TCHAR buffer[256];
+            nxlog_debug(6, _T("NXCPMessage: failed to decompress binary message %s with ID %d"), NXCPMessageCodeName(m_code, buffer), m_id);
+            return;
+         }
+         inflateEnd(&stream);
+      }
+      else
+      {
+         m_data = (BYTE *)nx_memdup(msg->fields, m_dataSize);
+      }
    }
    else
    {
@@ -768,6 +800,8 @@ NXCP_MESSAGE *NXCPMessage::createMessage(bool allowCompression) const
    // Calculate message size
    size_t size = NXCP_HEADER_SIZE;
    UINT32 fieldCount = 0;
+   BYTE *compressedData = NULL;
+   size_t compressedDataSize = 0;
    if (m_flags & MF_BINARY)
    {
       size += m_dataSize;
@@ -860,7 +894,7 @@ NXCP_MESSAGE *NXCPMessage::createMessage(bool allowCompression) const
    }
 
    // Compress message payload if requested. Compression supported starting with NXCP version 4.
-   if ((m_version >= 4) && allowCompression && (size > 128))
+   if ((m_version >= 4) && allowCompression && (size > 128) && !(m_flags & MF_COMPRESSED_STREAM))
    {
       z_stream stream;
       stream.zalloc = Z_NULL;
index 1d643d8..18d727f 100644 (file)
@@ -24,6 +24,7 @@
 #include "libnetxms.h"
 #include <nxcpapi.h>
 #include <nxstat.h>
+#include <zlib.h>
 
 /**
  * Additional message name resolvers
@@ -599,32 +600,69 @@ int LIBNETXMS_EXPORTABLE RecvNXCPMessage(SOCKET hSocket, NXCP_MESSAGE *msgBuffer
 
 /**
  * Create NXCP message with raw data (MF_BINARY flag)
- * If pBuffer is NULL, new buffer is allocated with malloc()
- * Buffer should be of dwDataSize + NXCP_HEADER_SIZE + 8 bytes.
+ * If buffer is NULL, new buffer is allocated with malloc()
+ * Buffer should be at least dataSize + NXCP_HEADER_SIZE + 8 bytes.
  */
-NXCP_MESSAGE LIBNETXMS_EXPORTABLE *CreateRawNXCPMessage(WORD code, UINT32 id, WORD flags,
-                                                        UINT32 dwDataSize, void *pData,
-                                                        NXCP_MESSAGE *pBuffer)
+NXCP_MESSAGE LIBNETXMS_EXPORTABLE *CreateRawNXCPMessage(UINT16 code, UINT32 id, UINT16 flags,
+                                                        const void *data, size_t dataSize,
+                                                        NXCP_MESSAGE *buffer, bool allowCompression)
 {
-   NXCP_MESSAGE *pMsg;
-   UINT32 dwPadding;
-
-   if (pBuffer == NULL)
-      pMsg = (NXCP_MESSAGE *)malloc(dwDataSize + NXCP_HEADER_SIZE + 8);
-   else
-      pMsg = pBuffer;
+   NXCP_MESSAGE *msg = (buffer == NULL) ? (NXCP_MESSAGE *)malloc(dataSize + NXCP_HEADER_SIZE + 8) : buffer;
 
    // Message should be aligned to 8 bytes boundary
-   dwPadding = (8 - ((dwDataSize + NXCP_HEADER_SIZE) % 8)) & 7;
+   size_t padding = (8 - ((dataSize + NXCP_HEADER_SIZE) % 8)) & 7;
 
-   pMsg->code = htons(code);
-   pMsg->flags = htons(MF_BINARY | flags);
-   pMsg->id = htonl(id);
-   pMsg->size = htonl(dwDataSize + NXCP_HEADER_SIZE + dwPadding);
-   pMsg->numFields = htonl(dwDataSize);   // numFields contains actual data size for binary message
-   memcpy(pMsg->fields, pData, dwDataSize);
+   msg->code = htons(code);
+   msg->flags = htons(MF_BINARY | flags);
+   msg->id = htonl(id);
+   size_t msgSize = dataSize + NXCP_HEADER_SIZE + padding;
+   msg->size = htonl((UINT32)msgSize);
+   msg->numFields = htonl((UINT32)dataSize);   // numFields contains actual data size for binary message
 
-   return pMsg;
+   if (allowCompression)
+   {
+      z_stream stream;
+      stream.zalloc = Z_NULL;
+      stream.zfree = Z_NULL;
+      stream.opaque = Z_NULL;
+      stream.avail_in = 0;
+      stream.next_in = Z_NULL;
+      if (deflateInit(&stream, 9) == Z_OK)
+      {
+         stream.next_in = (BYTE *)data;
+         stream.avail_in = dataSize;
+         stream.next_out = (BYTE *)msg->fields + 4;
+         stream.avail_out = dataSize + padding - 4;
+         if (deflate(&stream, Z_FINISH) == Z_STREAM_END)
+         {
+            size_t compMsgSize = dataSize - stream.avail_out + NXCP_HEADER_SIZE + 4;
+            // Message should be aligned to 8 bytes boundary
+            compMsgSize += (8 - (compMsgSize % 8)) & 7;
+            if (compMsgSize < msgSize - 4)
+            {
+               msg->flags |= htons(MF_COMPRESSED);
+               memcpy((BYTE *)msg + NXCP_HEADER_SIZE, &msg->size, 4); // Save size of uncompressed message
+               msg->size = htonl((UINT32)compMsgSize);
+            }
+            else
+            {
+               // compression produce message of same size
+               memcpy(msg->fields, data, dataSize);
+            }
+         }
+         else
+         {
+            // compression failed, send uncompressed message
+            memcpy(msg->fields, data, dataSize);
+         }
+         deflateEnd(&stream);
+      }
+   }
+   else
+   {
+      memcpy(msg->fields, data, dataSize);
+   }
+   return msg;
 }
 
 /**
@@ -633,7 +671,7 @@ NXCP_MESSAGE LIBNETXMS_EXPORTABLE *CreateRawNXCPMessage(WORD code, UINT32 id, WO
 BOOL LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 id, const TCHAR *pszFile,
                                            NXCPEncryptionContext *pCtx, long offset,
                                                                                                                 void (* progressCallback)(INT64, void *), void *cbArg,
-                                                                                                                MUTEX mutex, NXCPCompressionMethod compressionMethod)
+                                                                                                                MUTEX mutex, NXCPStreamCompressionMethod compressionMethod)
 {
    int hFile, iBytes;
        INT64 bytesTransferred = 0;
@@ -642,7 +680,7 @@ BOOL LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 id, const TCHA
    NXCP_MESSAGE *pMsg;
    NXCP_ENCRYPTED_MESSAGE *pEnMsg;
 
-   StreamCompressor *compressor = (compressionMethod != NXCP_COMPRESSION_NONE) ? StreamCompressor::create(compressionMethod, true, FILE_BUFFER_SIZE) : NULL;
+   StreamCompressor *compressor = (compressionMethod != NXCP_STREAM_COMPRESSION_NONE) ? StreamCompressor::create(compressionMethod, true, FILE_BUFFER_SIZE) : NULL;
    BYTE *compBuffer = (compressor != NULL) ? (BYTE *)malloc(FILE_BUFFER_SIZE) : NULL;
 
    hFile = _topen(pszFile, O_RDONLY | O_BINARY);
@@ -661,7 +699,7 @@ BOOL LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 id, const TCHA
          pMsg = (NXCP_MESSAGE *)malloc(NXCP_HEADER_SIZE + 8 + ((compressor != NULL) ? compressor->compressBufferSize(FILE_BUFFER_SIZE) + 4 : FILE_BUFFER_SIZE));
                        pMsg->id = htonl(id);
                        pMsg->code = htons(CMD_FILE_DATA);
-         pMsg->flags = htons(MF_BINARY | ((compressionMethod != NXCP_COMPRESSION_NONE) ? MF_COMPRESSED : 0));
+         pMsg->flags = htons(MF_BINARY | ((compressionMethod != NXCP_STREAM_COMPRESSION_NONE) ? MF_COMPRESSED_STREAM : 0));
 
                        while(1)
                        {
@@ -727,7 +765,7 @@ BOOL LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 id, const TCHA
                _close(hFile);
        }
 
-   safe_free(compBuffer);
+   free(compBuffer);
    delete compressor;
 
    // If file upload failed, send CMD_ABORT_FILE_TRANSFER
index 832eee1..9998b4c 100644 (file)
@@ -35,13 +35,13 @@ StreamCompressor::~StreamCompressor()
 /**
  * Create compressor object for given method
  */
-StreamCompressor *StreamCompressor::create(NXCPCompressionMethod method, bool compress, size_t maxBlockSize)
+StreamCompressor *StreamCompressor::create(NXCPStreamCompressionMethod method, bool compress, size_t maxBlockSize)
 {
    switch(method)
    {
-      case NXCP_COMPRESSION_LZ4:
+      case NXCP_STREAM_COMPRESSION_LZ4:
          return new LZ4StreamCompressor(compress, maxBlockSize);
-      case NXCP_COMPRESSION_NONE:
+      case NXCP_STREAM_COMPRESSION_NONE:
          return new DummyStreamCompressor();
    }
    return NULL;
index 81f3e64..52767fe 100644 (file)
@@ -942,13 +942,12 @@ UINT32 Alarm::openHelpdeskIssue(TCHAR *hdref)
       if (rcc == RCC_SUCCESS)
       {
          m_helpDeskState = ALARM_HELPDESK_OPEN;
+         nx_strncpy(hdref, m_helpDeskRef, MAX_HELPDESK_REF_LEN);
          NotifyClients(NX_NOTIFY_ALARM_CHANGED, this);
          updateInDatabase();
          if (hdref != NULL)
             nx_strncpy(hdref, m_helpDeskRef, MAX_HELPDESK_REF_LEN);
-
-         DbgPrintf(5, _T("Helpdesk issue created for alarm %d, reference \"%s\""), m_alarmId, m_helpDeskRef);
-
+         nxlog_debug(5, _T("Helpdesk issue created for alarm %d, reference \"%s\""), m_alarmId, m_helpDeskRef);
       }
    }
    else
index 3fab125..8482e0e 100644 (file)
@@ -218,8 +218,8 @@ bool GetPredictedData(ClientSession *session, const NXCPMessage *request, NXCPMe
    // Prepare and send raw message with fetched data
    NXCP_MESSAGE *msg =
       CreateRawNXCPMessage(CMD_DCI_DATA, request->getId(), 0,
-                           rows * s_rowSize[dataType] + sizeof(DCI_DATA_HEADER),
-                           pData, NULL);
+                           pData, rows * s_rowSize[dataType] + sizeof(DCI_DATA_HEADER),
+                           NULL, session->isCompressionEnabled());
    free(pData);
    session->sendRawMessage(msg);
    free(msg);
index a318f1c..3c9c6d1 100644 (file)
@@ -4289,8 +4289,8 @@ bool ClientSession::getCollectedDataFromDB(NXCPMessage *request, NXCPMessage *re
       // Prepare and send raw message with fetched data
       NXCP_MESSAGE *msg =
          CreateRawNXCPMessage(CMD_DCI_DATA, request->getId(), 0,
-                              s_rowSize[dataType] + sizeof(DCI_DATA_HEADER),
-                              pData, NULL);
+                              pData, s_rowSize[dataType] + sizeof(DCI_DATA_HEADER),
+                              NULL, isCompressionEnabled());
       free(pData);
       sendRawMessage(msg);
       free(msg);
@@ -4458,8 +4458,8 @@ read_from_db:
                        // Prepare and send raw message with fetched data
                        NXCP_MESSAGE *msg =
                                CreateRawNXCPMessage(CMD_DCI_DATA, request->getId(), 0,
-                                                                                       rows * s_rowSize[dataType] + sizeof(DCI_DATA_HEADER),
-                                                                                       pData, NULL);
+                                                                                       pData, rows * s_rowSize[dataType] + sizeof(DCI_DATA_HEADER),
+                                                                                       NULL, isCompressionEnabled());
                        free(pData);
                        sendRawMessage(msg);
                        free(msg);
index 76a0f15..608b9dc 100644 (file)
@@ -796,7 +796,8 @@ public:
    bool isAuthenticated() const { return (m_dwFlags & CSF_AUTHENTICATED) ? true : false; }
    bool isTerminated() const { return (m_dwFlags & CSF_TERMINATED) ? true : false; }
    bool isConsoleOpen() const { return (m_dwFlags & CSF_CONSOLE_OPEN) ? true : false; }
-   WORD getCurrentCmd() const { return m_wCurrentCmd; }
+   bool isCompressionEnabled() const { return (m_dwFlags & CSF_COMPRESSION_ENABLED) ? true : false; }
+   UINT16 getCurrentCmd() const { return m_wCurrentCmd; }
    int getCipher() const { return (m_pCtx == NULL) ? -1 : m_pCtx->getCipher(); }
        int getClientType() const { return m_clientType; }
    time_t getLoginTime() const { return m_loginTime; }
index 6b2e4ea..6f5659a 100644 (file)
@@ -561,7 +561,7 @@ public:
    UINT32 setServerCapabilities();
    UINT32 setServerId(UINT64 serverId);
    UINT32 execAction(const TCHAR *action, int argc, const TCHAR * const *argv, bool withOutput = false, void (* outputCallback)(ActionCallbackEvent, const TCHAR *, void *) = NULL, void *cbData = NULL);
-   UINT32 uploadFile(const TCHAR *localFile, const TCHAR *destinationFile = NULL, void (* progressCallback)(INT64, void *) = NULL, void *cbArg = NULL, NXCPCompressionMethod compMethod = NXCP_COMPRESSION_NONE);
+   UINT32 uploadFile(const TCHAR *localFile, const TCHAR *destinationFile = NULL, void (* progressCallback)(INT64, void *) = NULL, void *cbArg = NULL, NXCPStreamCompressionMethod compMethod = NXCP_STREAM_COMPRESSION_NONE);
    UINT32 startUpgrade(const TCHAR *pszPkgName);
    UINT32 checkNetworkService(UINT32 *pdwStatus, const InetAddress& addr, int iServiceType, WORD wPort = 0,
                               WORD wProto = 0, const TCHAR *pszRequest = NULL, const TCHAR *pszResponse = NULL, UINT32 *responseTime = NULL);
index 7fd6f11..72c3941 100644 (file)
@@ -1,7 +1,7 @@
 /*
 ** NetXMS - Network Management System
 ** Server Library
-** Copyright (C) 2003-2016 Victor Kirhenshtein
+** Copyright (C) 2003-2017 Victor Kirhenshtein
 **
 ** This program is free software; you can redistribute it and/or modify
 ** it under the terms of the GNU Lesser General Public License as published by
@@ -1314,7 +1314,7 @@ UINT32 AgentConnection::execAction(const TCHAR *action, int argc, const TCHAR *
 /**
  * Upload file to agent
  */
-UINT32 AgentConnection::uploadFile(const TCHAR *localFile, const TCHAR *destinationFile, void (* progressCallback)(INT64, void *), void *cbArg, NXCPCompressionMethod compMethod)
+UINT32 AgentConnection::uploadFile(const TCHAR *localFile, const TCHAR *destinationFile, void (* progressCallback)(INT64, void *), void *cbArg, NXCPStreamCompressionMethod compMethod)
 {
    UINT32 dwRqId, dwResult;
    NXCPMessage msg(m_nProtocolVersion);
index 3d8d97f..1250de7 100644 (file)
@@ -133,7 +133,7 @@ int main(int argc, char *argv[])
    TCHAR szKeyFile[MAX_PATH];
    TCHAR szDestinationFile[MAX_PATH] = {0};
    RSA *pServerKey = NULL;
-   NXCPCompressionMethod compression = NXCP_COMPRESSION_NONE;
+   NXCPStreamCompressionMethod compression = NXCP_STREAM_COMPRESSION_NONE;
 
    InitNetXMSProcess();
 
@@ -284,7 +284,7 @@ int main(int argc, char *argv[])
 #endif
             break;
          case 'z':
-            compression = NXCP_COMPRESSION_LZ4;
+            compression = NXCP_STREAM_COMPRESSION_LZ4;
             break;
          case '?':
             bStart = FALSE;
index 4e04f4e..44937f9 100644 (file)
@@ -4,6 +4,11 @@
 #include <testtools.h>
 
 /**
+ * Test text
+ */
+static TCHAR longText[] = _T("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.");
+
+/**
  * Poster thread
  */
 static THREAD_RESULT THREAD_CALL PosterThread(void *arg)
@@ -65,4 +70,16 @@ void TestMessageClass()
    AssertTrue(!_tcscmp(msg.getFieldAsString(1, buffer, 64), _T("test text")));
 
    EndTest();
+
+   StartTest(_T("NXCP message compression"));
+
+   msg.setField(100, longText);
+   NXCP_MESSAGE *binMsg = msg.createMessage(true);
+   AssertTrue((ntohs(binMsg->flags) & MF_COMPRESSED) != 0);
+
+   NXCPMessage *dmsg = new NXCPMessage(binMsg);
+   AssertTrue(!_tcscmp(dmsg->getFieldAsString(100), longText));
+   delete dmsg;
+
+   EndTest();
 }