added DEFLATE stream compression method
authorVictor Kirhenshtein <victor@netxms.org>
Thu, 16 Feb 2017 09:45:34 +0000 (11:45 +0200)
committerVictor Kirhenshtein <victor@netxms.org>
Thu, 16 Feb 2017 09:45:34 +0000 (11:45 +0200)
include/nxcpapi.h
src/agent/libnxagent/dfile_info.cpp
src/libnetxms/nxcp.cpp
src/libnetxms/streamcomp.cpp
src/server/tools/nxupload/nxupload.cpp

index 3747108..e0ea982 100644 (file)
@@ -369,7 +369,8 @@ public:
 enum NXCPStreamCompressionMethod
 {
    NXCP_STREAM_COMPRESSION_NONE = 0,
-   NXCP_STREAM_COMPRESSION_LZ4 = 1
+   NXCP_STREAM_COMPRESSION_LZ4 = 1,
+   NXCP_STREAM_COMPRESSION_DEFLATE = 2
 };
 
 /**
@@ -429,6 +430,28 @@ public:
    virtual size_t compressBufferSize(size_t dataSize);
 };
 
+struct z_stream_s;
+
+/**
+ * Deflate stream compressor
+ */
+class LIBNETXMS_EXPORTABLE DeflateStreamCompressor : public StreamCompressor
+{
+private:
+   z_stream_s *m_stream;
+   BYTE *m_buffer;
+   size_t m_bufferSize;
+   bool m_compress;
+
+public:
+   DeflateStreamCompressor(bool compress, size_t maxBlockSize);
+   virtual ~DeflateStreamCompressor();
+
+   virtual size_t compress(const BYTE *in, size_t inSize, BYTE *out, size_t maxOutSize);
+   virtual size_t decompress(const BYTE *in, size_t inSize, const BYTE **out);
+   virtual size_t compressBufferSize(size_t dataSize);
+};
+
 #if 0
 /**
  * NXCP message consumer interface
index 777b9e1..6513bac 100644 (file)
@@ -58,6 +58,8 @@ bool DownloadFileInfo::open()
  */
 bool DownloadFileInfo::write(const BYTE *data, size_t dataSize, bool compressedStream)
 {
+   static const TCHAR *compressionMethods[] = { _T("NONE"), _T("LZ4"), _T("DEFLATE") };
+
    if (!compressedStream)
       return _write(m_file, data, (int)dataSize) == dataSize;
 
@@ -65,13 +67,14 @@ bool DownloadFileInfo::write(const BYTE *data, size_t dataSize, bool compressedS
    {
       NXCPStreamCompressionMethod method = (NXCPStreamCompressionMethod)(*data);
       m_compressor = StreamCompressor::create(method, false, FILE_BUFFER_SIZE);
+      const TCHAR *methodName = (((int)method >= 0) && ((int)method <= 2)) ? compressionMethods[method] : _T("UNKNOWN");
       if (m_compressor != NULL)
       {
-         nxlog_debug(5, _T("DownloadFileInfo(%s): created stream compressor for method %d"), m_fileName, (int)method);
+         nxlog_debug(5, _T("DownloadFileInfo(%s): created stream compressor for method %s"), m_fileName, methodName);
       }
       else
       {
-         nxlog_debug(5, _T("DownloadFileInfo(%s): unable to create stream compressor for method %d"), m_fileName, (int)method);
+         nxlog_debug(5, _T("DownloadFileInfo(%s): unable to create stream compressor for method %s"), m_fileName, methodName);
          return false;
       }
    }
index d4c885f..f78bf67 100644 (file)
@@ -701,7 +701,7 @@ BOOL LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 id, const TCHA
                        pMsg->code = htons(CMD_FILE_DATA);
          pMsg->flags = htons(MF_BINARY | ((compressionMethod != NXCP_STREAM_COMPRESSION_NONE) ? MF_COMPRESSED_STREAM : 0));
 
-                       while(1)
+                       while(true)
                        {
             if (compressor != NULL)
             {
index b90ac14..fdd2f44 100644 (file)
@@ -23,6 +23,7 @@
 
 #include "libnetxms.h"
 #include <nxcpapi.h>
+#include <zlib.h>
 #include "lz4.h"
 
 /**
@@ -39,6 +40,8 @@ StreamCompressor *StreamCompressor::create(NXCPStreamCompressionMethod method, b
 {
    switch(method)
    {
+      case NXCP_STREAM_COMPRESSION_DEFLATE:
+         return new DeflateStreamCompressor(compress, maxBlockSize);
       case NXCP_STREAM_COMPRESSION_LZ4:
          return new LZ4StreamCompressor(compress, maxBlockSize);
       case NXCP_STREAM_COMPRESSION_NONE:
@@ -132,7 +135,6 @@ size_t LZ4StreamCompressor::compress(const BYTE *in, size_t inSize, BYTE *out, s
    if (LZ4_saveDict(m_stream.encoder, m_buffer, 65536) == 0)
       return 0;
 
-_tprintf(_T("compressed: %d -> %d\n"),(int)inSize, (int)bytes);
    return bytes;
 }
 
@@ -162,3 +164,104 @@ size_t LZ4StreamCompressor::compressBufferSize(size_t dataSize)
 {
    return LZ4_compressBound((int)dataSize);
 }
+
+/**
+ * Create new DEFLATE stream compressor
+ */
+DeflateStreamCompressor::DeflateStreamCompressor(bool compress, size_t maxBlockSize)
+{
+   m_compress = compress;
+   m_stream = (z_stream_s *)malloc(sizeof(z_stream_s));
+   m_stream->zalloc = Z_NULL;
+   m_stream->zfree = Z_NULL;
+   m_stream->opaque = Z_NULL;
+   m_stream->avail_in = 0;
+   m_stream->next_in = Z_NULL;
+   if (compress)
+   {
+      m_buffer = NULL;
+      if (deflateInit(m_stream, 9) != Z_OK)
+      {
+         nxlog_debug(5, _T("DeflateStreamCompressor: deflateInit() failed"));
+         free(m_stream);
+         m_stream = NULL;
+      }
+   }
+   else
+   {
+      m_bufferSize = maxBlockSize * 2;
+      m_buffer = (BYTE *)malloc(m_bufferSize);
+      if (inflateInit(m_stream) != Z_OK)
+      {
+         nxlog_debug(5, _T("DeflateStreamCompressor: inflateInit() failed"));
+         free(m_stream);
+         m_stream = NULL;
+      }
+   }
+}
+
+/**
+ * DEFLATE stream compressor destructor
+ */
+DeflateStreamCompressor::~DeflateStreamCompressor()
+{
+   if (m_stream != NULL)
+   {
+      if (m_compress)
+         deflateEnd(m_stream);
+      else
+         inflateEnd(m_stream);
+      free(m_stream);
+   }
+   free(m_buffer);
+}
+
+/**
+ * DEFLATE compressor: compress
+ */
+size_t DeflateStreamCompressor::compress(const BYTE *in, size_t inSize, BYTE *out, size_t maxOutSize)
+{
+   if (m_stream == NULL)
+      return 0;
+
+   m_stream->avail_in = inSize;
+   m_stream->next_in = (BYTE *)in;
+   m_stream->avail_out = maxOutSize;
+   m_stream->next_out = out;
+   if (deflate(m_stream, Z_SYNC_FLUSH) != Z_OK)
+   {
+      nxlog_debug(5, _T("DeflateStreamCompressor: deflate() failed"));
+      return 0;
+   }
+   return maxOutSize - m_stream->avail_out;
+}
+
+/**
+ * DEFLATE compressor: decompress
+ */
+size_t DeflateStreamCompressor::decompress(const BYTE *in, size_t inSize, const BYTE **out)
+{
+   if (m_stream == NULL)
+      return 0;
+
+   m_stream->avail_in = inSize;
+   m_stream->next_in = (BYTE *)in;
+   m_stream->avail_out = m_bufferSize;
+   m_stream->next_out = m_buffer;
+   int rc = inflate(m_stream, Z_SYNC_FLUSH);
+   if ((rc != Z_OK) && (rc != Z_STREAM_END))
+   {
+      nxlog_debug(5, _T("DeflateStreamCompressor: inflate() failed"));
+      return 0;
+   }
+   *out = m_buffer;
+   return m_bufferSize - m_stream->avail_out;
+}
+
+/**
+ * Get required compress buffer size
+ */
+size_t DeflateStreamCompressor::compressBufferSize(size_t dataSize)
+{
+   return (m_stream != NULL) ? deflateBound(m_stream, dataSize) : 0;
+}
index 1250de7..87bff3b 100644 (file)
@@ -1,6 +1,6 @@
 /* 
 ** nxupload - command line tool used to upload files to NetXMS agent
-** Copyright (C) 2004-2015 Victor Kirhenshtein
+** Copyright (C) 2004-2017 Victor Kirhenshtein
 **
 ** This program is free software; you can redistribute it and/or modify
 ** it under the terms of the GNU General Public License as published by
@@ -142,7 +142,7 @@ int main(int argc, char *argv[])
 
    // Parse command line
    opterr = 1;
-       while((ch = getopt(argc, argv, "a:d:e:hK:p:qs:uvw:W:z")) != -1)
+       while((ch = getopt(argc, argv, "a:d:e:hK:p:qs:uvw:W:zZ")) != -1)
    {
       switch(ch)
       {
@@ -172,7 +172,8 @@ int main(int argc, char *argv[])
                      _T("   -v           : Display version and exit.\n")
                      _T("   -w <seconds> : Set command timeout (default is 5 seconds)\n")
                      _T("   -W <seconds> : Set connection timeout (default is 30 seconds)\n")
-                     _T("   -z           : Compress data stream.\n")
+                     _T("   -z           : Compress data stream with LZ4.\n")
+                     _T("   -Z           : Compress data stream with DEFLATE.\n")
                      _T("\n"), 
 #ifdef _WITH_ENCRYPTION
                      szKeyFile,
@@ -286,6 +287,9 @@ int main(int argc, char *argv[])
          case 'z':
             compression = NXCP_STREAM_COMPRESSION_LZ4;
             break;
+         case 'Z':
+            compression = NXCP_STREAM_COMPRESSION_DEFLATE;
+            break;
          case '?':
             bStart = FALSE;
             break;