handling of compressed NXCP streams changed for compatibility with old agents
authorVictor Kirhenshtein <victor@netxms.org>
Thu, 16 Feb 2017 13:14:04 +0000 (15:14 +0200)
committerVictor Kirhenshtein <victor@netxms.org>
Thu, 16 Feb 2017 13:14:04 +0000 (15:14 +0200)
include/nms_cscp.h
include/nxcpapi.h
src/java/client/netxms-base/src/main/java/org/netxms/base/NXCPMessage.java
src/java/client/netxms-client/src/main/java/org/netxms/client/NXCSession.java
src/java/netxms-eclipse/FileManager/src/org/netxms/ui/eclipse/filemanager/views/AgentFileManager.java
src/libnetxms/message.cpp
src/libnetxms/nxcp.cpp
src/server/core/session.cpp
src/server/include/nxsrvapi.h

index ac7e746..671f825 100644 (file)
@@ -232,7 +232,7 @@ typedef struct
 #define MF_REVERSE_ORDER      0x0010
 #define MF_CONTROL            0x0020
 #define MF_COMPRESSED         0x0040
-#define MF_COMPRESSED_STREAM  0x0080
+#define MF_STREAM             0x0080
 
 /**
  * Message (command) codes
index e0ea982..863d066 100644 (file)
@@ -88,7 +88,7 @@ public:
    bool isReverseOrder() const { return (m_flags & MF_REVERSE_ORDER) ? true : false; }
    bool isBinary() const { return (m_flags & MF_BINARY) ? true : false; }
    bool isControl() const { return (m_flags & MF_CONTROL) ? true : false; }
-   bool isCompressedStream() const { return (m_flags & MF_COMPRESSED_STREAM) ? true : false; }
+   bool isCompressedStream() const { return ((m_flags & (MF_COMPRESSED | MF_STREAM)) == (MF_COMPRESSED | MF_STREAM)) ? true : false; }
 
    const BYTE *getBinaryData() const { return m_data; }
    size_t getBinaryDataSize() const { return m_dataSize; }
index 936a5a4..9697cbf 100644 (file)
@@ -51,7 +51,7 @@ public class NXCPMessage
        public static final int MF_REVERSE_ORDER = 0x0010;
        public static final int MF_CONTROL = 0x0020;
    public static final int MF_COMPRESSED = 0x0040;
-   public static final int MF_COMPRESSED_STREAM = 0x0080;
+   public static final int MF_STREAM = 0x0080;
        
        private int messageCode;
        private int messageFlags;
@@ -154,7 +154,7 @@ public class NXCPMessage
                {
                        final int size = inputStream.readInt();
                        binaryData = new byte[size];
-         if ((messageFlags & MF_COMPRESSED) == MF_COMPRESSED)
+         if (((messageFlags & MF_COMPRESSED) == MF_COMPRESSED) && ((messageFlags & MF_STREAM) == 0))
          {
             // Compressed message
             inputStream.skip(4);  // skip original message length
@@ -632,7 +632,7 @@ public class NXCPMessage
                {
                        byte[] payload = binaryData;
                        boolean compressed = false;
-                       if (allowCompression && ((messageFlags & MF_COMPRESSED_STREAM) == 0) && (binaryData.length > 128))
+                       if (allowCompression && ((messageFlags & MF_STREAM) == 0) && (binaryData.length > 128))
                        {
                           ByteArrayOutputStream compDataByteStream = new ByteArrayOutputStream();
             byte[] length = new byte[4];
@@ -863,25 +863,43 @@ public class NXCPMessage
        }
 
    /**
-    * Return true if message has "compressed stream" flag set
+    * Return true if message has "stream" flags set
+    * @return "compressed stream" flag
+    */
+   public boolean isStream()
+   {
+      return (messageFlags & MF_STREAM) == MF_STREAM;
+   }
+   
+   /**
+    * Return true if message has "compressed" and "stream" flags set
     * @return "compressed stream" flag
     */
    public boolean isCompressedStream()
    {
-      return (messageFlags & MF_COMPRESSED_STREAM) == MF_COMPRESSED_STREAM;
+      return (messageFlags & (MF_COMPRESSED | MF_STREAM)) == (MF_COMPRESSED | MF_STREAM);
    }
    
    /**
-    * Set "compressed stream" message flag
+    * Set stream related message flags
     * 
-    * @param isCompressedStream true to set "compressed stream" message flag
+    * @param isStream true to set "stream" message flag
+    * @param isCompressed true to set "compressed" message flag (ignored if isStream is false)
     */
-   public void setCompressedStream(boolean isCompressedStream)
+   public void setStream(boolean isStream, boolean isCompressed)
    {
-      if (isCompressedStream)
-         messageFlags |= MF_COMPRESSED_STREAM;
+      if (isStream)
+      {
+         messageFlags |= MF_STREAM;
+         if (isCompressed)
+            messageFlags |= MF_COMPRESSED;
+         else
+            messageFlags &= ~MF_COMPRESSED;
+      }
       else
-         messageFlags &= ~MF_COMPRESSED_STREAM;
+      {
+         messageFlags &= ~(MF_COMPRESSED | MF_STREAM);
+      }
    }
    
        /* (non-Javadoc)
index e24a557..ac73501 100644 (file)
@@ -1415,32 +1415,36 @@ public class NXCSession
     *
     * @param requestId request ID
     * @param file source file to be sent
+    * @param listener progress listener
+    * @param allowStreamCompression true if data stream compression is allowed
     * @throws IOException
     * @throws NXCException
     */
-   protected void sendFile(final long requestId, final File file, ProgressListener listener) throws IOException, NXCException
+   protected void sendFile(final long requestId, final File file, ProgressListener listener, boolean allowStreamCompression) throws IOException, NXCException
    {
       if (listener != null) 
          listener.setTotalWorkAmount(file.length());
       final InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
-      sendFileStream(requestId, inputStream, listener);
+      sendFileStream(requestId, inputStream, listener, allowStreamCompression);
       inputStream.close();
    }
 
    /**
     * Send block of data as binary message
     *
-    * @param requestId
-    * @param data
+    * @param requestId request ID
+    * @param data file data
+    * @param listener progress listener
+    * @param allowStreamCompression true if data stream compression is allowed
     * @throws IOException
     * @throws NXCException
     */
-   protected void sendFile(final long requestId, final byte[] data, ProgressListener listener) throws IOException, NXCException
+   protected void sendFile(final long requestId, final byte[] data, ProgressListener listener, boolean allowStreamCompression) throws IOException, NXCException
    {
       if (listener != null) 
          listener.setTotalWorkAmount(data.length);
       final InputStream inputStream = new ByteArrayInputStream(data);
-      sendFileStream(requestId, inputStream, listener);
+      sendFileStream(requestId, inputStream, listener, allowStreamCompression);
       inputStream.close();
    }
 
@@ -1448,17 +1452,20 @@ public class NXCSession
     * Send binary message, data loaded from provided input stream and splitted
     * into chunks of {@value FILE_BUFFER_SIZE} bytes
     *
-    * @param requestId
-    * @param inputStream
+    * @param requestId request ID
+    * @param inputStream data input stream
+    * @param listener progress listener
+    * @param allowStreamCompression true if data stream compression is allowed
     * @throws IOException
     * @throws NXCException
     */
-   private void sendFileStream(final long requestId, final InputStream inputStream, ProgressListener listener) throws IOException, NXCException
+   private void sendFileStream(final long requestId, final InputStream inputStream, ProgressListener listener, boolean allowStreamCompression) throws IOException, NXCException
    {
       NXCPMessage msg = new NXCPMessage(NXCPCodes.CMD_FILE_DATA, requestId);
       msg.setBinaryMessage(true);
       
-      Deflater compressor = allowCompression ? new Deflater(9) : null;
+      Deflater compressor = allowStreamCompression ? new Deflater(9) : null;
+      msg.setStream(true, allowStreamCompression);
 
       boolean success = false;
       final byte[] buffer = new byte[FILE_BUFFER_SIZE];
@@ -1485,7 +1492,6 @@ public class NXCSession
             payload[2] = (byte)((bytesRead >> 8) & 0xFF);   // uncompressed length, high bits
             payload[3] = (byte)(bytesRead & 0xFF);   // uncompressed length, low bits
             msg.setBinaryData(payload);
-            msg.setCompressedStream(true);
          }
          else
          {
@@ -7396,7 +7402,7 @@ public class NXCSession
       final UUID imageGuid = response.getFieldAsUUID(NXCPCodes.VID_GUID);
       image.setGuid(imageGuid);
 
-      sendFile(msg.getMessageId(), image.getBinaryData(), listener);
+      sendFile(msg.getMessageId(), image.getBinaryData(), listener, allowCompression);
 
       waitForRCC(msg.getMessageId());
 
@@ -7439,7 +7445,7 @@ public class NXCSession
       sendMessage(msg);
       waitForRCC(msg.getMessageId());
 
-      sendFile(msg.getMessageId(), image.getBinaryData(), listener);
+      sendFile(msg.getMessageId(), image.getBinaryData(), listener, allowCompression);
 
       waitForRCC(msg.getMessageId());
    }
@@ -7658,8 +7664,7 @@ public class NXCSession
    }
 
    /**
-    * Start file upload from server's file store to agent. Returns ID of upload
-    * job.
+    * Start file upload from server's file store to agent. Returns ID of upload job.
     *
     * @param nodeId         node object ID
     * @param serverFileName file name in server's file store
@@ -7704,7 +7709,7 @@ public class NXCSession
       msg.setField(NXCPCodes.VID_MODIFICATION_TIME, new Date(localFile.lastModified()));
       sendMessage(msg);
       waitForRCC(msg.getMessageId());
-      sendFile(msg.getMessageId(), localFile, listener);
+      sendFile(msg.getMessageId(), localFile, listener, allowCompression);
    }
    
    /**
@@ -7730,10 +7735,10 @@ public class NXCSession
       msg.setField(NXCPCodes.VID_FILE_NAME, remoteFileName);
       msg.setField(NXCPCodes.VID_MODIFICATION_TIME, new Date(localFile.lastModified()));
       sendMessage(msg);
-      waitForRCC(msg.getMessageId());
-      sendFile(msg.getMessageId(), localFile, listener);
+      NXCPMessage response = waitForRCC(msg.getMessageId());
+      sendFile(msg.getMessageId(), localFile, listener, response.getFieldAsBoolean(NXCPCodes.VID_ENABLE_COMPRESSION));
    }
-   
+
    /**
     * Create folder on remote system via agent
     *
@@ -8246,7 +8251,7 @@ public class NXCSession
       sendMessage(msg);
       final NXCPMessage response = waitForRCC(msg.getMessageId());
       final long id = response.getFieldAsInt64(NXCPCodes.VID_PACKAGE_ID);
-      sendFile(msg.getMessageId(), pkgFile, listener);
+      sendFile(msg.getMessageId(), pkgFile, listener, allowCompression);
       waitForRCC(msg.getMessageId());
       return id;
    }
index 6d053f4..ab9ded7 100644 (file)
@@ -872,7 +872,7 @@ public class AgentFileManager extends ViewPart
             {
                File folder = dlg.getLocalFile();
                session.createFolderOnAgent(objectId, upladFolder.getFullName()+"/"+dlg.getRemoteFileName()); //$NON-NLS-1$
-               listFilesForFolder(folder, upladFolder.getFullName()+"/"+dlg.getRemoteFileName(), monitor); //$NON-NLS-1$
+               uploadFilesInFolder(folder, upladFolder.getFullName()+"/"+dlg.getRemoteFileName(), monitor); //$NON-NLS-1$
                
                upladFolder.setChildren(session.listAgentFiles(upladFolder, upladFolder.getFullName(), objectId));
                runInUIThread(new Runnable() {
@@ -902,14 +902,14 @@ public class AgentFileManager extends ViewPart
     * @throws NXCException
     * @throws IOException
     */
-   public void listFilesForFolder(final File folder, final String uploadFolder, final IProgressMonitor monitor) throws NXCException, IOException 
+   public void uploadFilesInFolder(final File folder, final String uploadFolder, final IProgressMonitor monitor) throws NXCException, IOException 
    {
       for(final File fileEntry : folder.listFiles())
       {
           if (fileEntry.isDirectory()) 
           {
              session.createFolderOnAgent(objectId, uploadFolder + "/" + fileEntry.getName()); //$NON-NLS-1$
-             listFilesForFolder(fileEntry, uploadFolder + "/" + fileEntry.getName(), monitor); //$NON-NLS-1$
+             uploadFilesInFolder(fileEntry, uploadFolder + "/" + fileEntry.getName(), monitor); //$NON-NLS-1$
           } 
           else 
           {
index c14cfaf..4ac68cf 100644 (file)
@@ -142,7 +142,7 @@ NXCPMessage::NXCPMessage(NXCP_MESSAGE *msg, int version)
    if (m_flags & MF_BINARY)
    {
       m_dataSize = (size_t)ntohl(msg->numFields);
-      if ((m_flags & MF_COMPRESSED) && (m_version >= 4))
+      if ((m_flags & MF_COMPRESSED) && !(m_flags & MF_STREAM) && (m_version >= 4))
       {
          m_flags &= ~MF_COMPRESSED; // clear "compressed" flag so it will not be mistakenly re-sent
 
@@ -894,7 +894,7 @@ NXCP_MESSAGE *NXCPMessage::createMessage(bool allowCompression) const
    }
 
    // Compress message payload if requested. Compression supported starting with NXCP version 4.
-   if ((m_version >= 4) && allowCompression && (size > 128) && !(m_flags & MF_COMPRESSED_STREAM))
+   if ((m_version >= 4) && allowCompression && (size > 128) && !(m_flags & MF_STREAM))
    {
       z_stream stream;
       stream.zalloc = Z_NULL;
index f78bf67..19edb46 100644 (file)
@@ -699,7 +699,7 @@ BOOL LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 id, const TCHA
          pMsg = (NXCP_MESSAGE *)malloc(NXCP_HEADER_SIZE + 8 + ((compressor != NULL) ? compressor->compressBufferSize(FILE_BUFFER_SIZE) + 4 : FILE_BUFFER_SIZE));
                        pMsg->id = htonl(id);
                        pMsg->code = htons(CMD_FILE_DATA);
-         pMsg->flags = htons(MF_BINARY | ((compressionMethod != NXCP_STREAM_COMPRESSION_NONE) ? MF_COMPRESSED_STREAM : 0));
+         pMsg->flags = htons(MF_BINARY | MF_STREAM | ((compressionMethod != NXCP_STREAM_COMPRESSION_NONE) ? MF_COMPRESSED : 0));
 
                        while(true)
                        {
index c319f5e..efbd9e8 100644 (file)
@@ -13829,6 +13829,7 @@ void ClientSession::uploadUserFileToAgent(NXCPMessage *request)
                   if (rcc == RCC_SUCCESS)
                   {
                      response->setCode(CMD_REQUEST_COMPLETED);
+                     response->setField(VID_ENABLE_COMPRESSION, conn->isCompressionAllowed());
                      responseMessage = response;
 
                      //Add line in audit log
index 6f5659a..4659438 100644 (file)
@@ -543,9 +543,10 @@ public:
 
    bool connect(RSA *pServerKey = NULL, BOOL bVerbose = FALSE, UINT32 *pdwError = NULL, UINT32 *pdwSocketError = NULL, UINT64 serverId = 0);
    void disconnect();
-   bool isConnected() { return m_isConnected; }
-       int getProtocolVersion() { return m_nProtocolVersion; }
+   bool isConnected() const { return m_isConnected; }
+       int getProtocolVersion() const { return m_nProtocolVersion; }
        SOCKET getSocket() { return m_hSocket; }
+       bool isCompressionAllowed() const { return m_allowCompression && (m_nProtocolVersion >= 4); }
 
    bool sendMessage(NXCPMessage *pMsg);
    bool sendRawMessage(NXCP_MESSAGE *pMsg);