implemented message compression in NXCP; compression is used on agent connections...
authorVictor Kirhenshtein <victor@netxms.org>
Thu, 9 Feb 2017 10:23:42 +0000 (12:23 +0200)
committerVictor Kirhenshtein <victor@netxms.org>
Thu, 9 Feb 2017 10:23:42 +0000 (12:23 +0200)
24 files changed:
ChangeLog
include/netxmsdb.h
include/nms_cscp.h
include/nxcpapi.h
sql/setup.in
src/agent/core/nxagentd.h
src/agent/core/session.cpp
src/agent/core/snmptrapproxy.cpp
src/java/client/netxms-base/src/main/java/org/netxms/base/NXCPCodes.java
src/java/client/netxms-client/src/main/java/org/netxms/client/NXCObjectModificationData.java
src/java/client/netxms-client/src/main/java/org/netxms/client/NXCSession.java
src/java/client/netxms-client/src/main/java/org/netxms/client/constants/AgentCompressionMode.java [new file with mode: 0644]
src/java/client/netxms-client/src/main/java/org/netxms/client/objects/AbstractNode.java
src/java/client/netxms-client/src/test/java/org/netxms/client/PStorageTest.java
src/java/netxms-eclipse/ObjectManager/src/org/netxms/ui/eclipse/objectmanager/propertypages/Agent.java
src/libnetxms/message.cpp
src/server/core/node.cpp
src/server/core/session.cpp
src/server/include/nms_objects.h
src/server/include/nxsrvapi.h
src/server/libnxsrv/agent.cpp
src/server/tools/nxdbmgr/upgrade.cpp
src/snmp/libnxsnmp/zfile.cpp
webui/webapp/ObjectManager/src/org/netxms/ui/eclipse/objectmanager/propertypages/Agent.java

index 7062bdf..33bf98d 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -13,7 +13,8 @@
 - Container autobind mode and script can be set from NXSL
 - Events generated when interface expected state changed
 - Situations functionality is replaced with persistent storage that is included in each execution environment
-- Fixed issues: NX-60, NX-630, NX-916, NX-1151, NX-1161
+- Compression support in communication protocol
+- Fixed issues: NX-60, NX-630, NX-916, NX-1151, NX-1161, NX-1171
 
 
 *
index 84c103e..9b62f7b 100644 (file)
@@ -23,6 +23,6 @@
 #ifndef _netxmsdb_h
 #define _netxmsdb_h
 
-#define DB_FORMAT_VERSION   434
+#define DB_FORMAT_VERSION   435
 
 #endif
index 7e0cd43..1a6d65c 100644 (file)
@@ -26,7 +26,7 @@
 /**
  * Constants
  */
-#define NXCP_VERSION                   3
+#define NXCP_VERSION                   4
 
 #define SERVER_LISTEN_PORT_FOR_CLIENTS 4701
 #define SERVER_LISTEN_PORT_FOR_MOBILES 4747
@@ -1163,6 +1163,8 @@ typedef struct
 #define VID_NUM_PSTORAGE            ((UINT32)567)
 #define VID_COMMAND_ID              ((UINT32)568)
 #define VID_HOSTNAME                ((UINT32)569)
+#define VID_ENABLE_COMPRESSION      ((UINT32)570)
+#define VID_AGENT_COMPRESSION_MODE  ((UINT32)571)
 
 // Base variabe for single threshold in message
 #define VID_THRESHOLD_BASE          ((UINT32)0x00800000)
index 6b614d2..d6b5a5a 100644 (file)
@@ -72,7 +72,7 @@ public:
    NXCPMessage(NXCP_MESSAGE *rawMag, int version = NXCP_VERSION);
    ~NXCPMessage();
 
-   NXCP_MESSAGE *createMessage() const;
+   NXCP_MESSAGE *createMessage(bool allowCompression = false) const;
 
    UINT16 getCode() const { return m_code; }
    void setCode(UINT16 code) { m_code = code; }
index a82855f..80280e1 100644 (file)
@@ -39,7 +39,8 @@ INSERT INTO config (var_name,var_value,is_visible,need_server_restart,data_type,
 INSERT INTO config (var_name,var_value,is_visible,need_server_restart,data_type,description) VALUES ('DBLockInfo','',0,0,'S','');
 INSERT INTO config (var_name,var_value,is_visible,need_server_restart,data_type,description) VALUES ('DBLockPID','0',0,0,'I','');
 INSERT INTO config (var_name,var_value,is_visible,need_server_restart,data_type,description) VALUES ('DBLockStatus','UNLOCKED',0,1,'S','');
-INSERT INTO config (var_name,var_value,is_visible,need_server_restart,data_type,description) VALUES ('DefaultAgentCacheMode','2',1,1,'I','');
+INSERT INTO config (var_name,var_value,is_visible,need_server_restart,data_type,description) VALUES ('DefaultAgentCacheMode','2',1,1,'C','Default agent cache mode');
+INSERT INTO config (var_name,var_value,is_visible,need_server_restart,data_type,description) VALUES ('DefaultAgentProtocolCompressionMode','1',1,0,'C','Default agent protocol compression mode');
 INSERT INTO config (var_name,var_value,is_visible,need_server_restart,data_type,description) VALUES ('DefaultConsoleDateFormat','dd.MM.yyyy',1,0,'S','Default date display format for GUI.');
 INSERT INTO config (var_name,var_value,is_visible,need_server_restart,data_type,description) VALUES ('DefaultConsoleShortTimeFormat','HH:mm',1,0,'S','Default short time display format for GUI.');
 INSERT INTO config (var_name,var_value,is_visible,need_server_restart,data_type,description) VALUES ('DefaultConsoleTimeFormat','HH:mm:ss',1,0,'S','Default long time display format for GUI.');
@@ -206,6 +207,10 @@ INSERT INTO config_values (var_name,var_value) VALUES ('SNMPTrapPort','65535');
 INSERT INTO config_values (var_name,var_value) VALUES ('SyslogListenPort','65535');
 INSERT INTO config_values (var_name,var_value) VALUES ('XMPPPort','65535');
 INSERT INTO config_values (var_name,var_value) VALUES ('AllowedCiphers','63');
+INSERT INTO config_values (var_name,var_value,var_description) VALUES ('DefaultAgentCacheMode','1','On');
+INSERT INTO config_values (var_name,var_value,var_description) VALUES ('DefaultAgentCacheMode','2','Off');
+INSERT INTO config_values (var_name,var_value,var_description) VALUES ('DefaultAgentProtocolCompressionMode','1','Enabled');
+INSERT INTO config_values (var_name,var_value,var_description) VALUES ('DefaultAgentProtocolCompressionMode','2','Disabled');
 INSERT INTO config_values (var_name,var_value,var_description) VALUES ('DefaultInterfaceExpectedState','0','UP');
 INSERT INTO config_values (var_name,var_value,var_description) VALUES ('DefaultInterfaceExpectedState','1','DOWN');
 INSERT INTO config_values (var_name,var_value,var_description) VALUES ('DefaultInterfaceExpectedState','2','IGNORE');
index 7e96235..f7c57ca 100644 (file)
@@ -347,7 +347,8 @@ private:
    bool m_ipv6Aware;
    bool m_bulkReconciliationSupported;
    HashMap<UINT32, DownloadFileInfo> m_downloadFileMap;
-   StreamCompressor *m_compressor;
+   StreamCompressor *m_compressor;  // stream compressor for file transfer
+   bool m_allowCompression;   // allow compression for structured messages
        NXCPEncryptionContext *m_pCtx;
    time_t m_ts;               // Last activity timestamp
    SOCKET m_hProxySocket;     // Socket for proxy connection
@@ -388,7 +389,7 @@ public:
    void disconnect();
 
    virtual bool sendMessage(NXCPMessage *msg);
-   virtual void postMessage(NXCPMessage *msg) { if (!m_disconnected) m_sendQueue->put(msg->createMessage()); }
+   virtual void postMessage(NXCPMessage *msg) { if (!m_disconnected) m_sendQueue->put(msg->createMessage(m_allowCompression)); }
    virtual bool sendRawMessage(NXCP_MESSAGE *msg);
    virtual void postRawMessage(NXCP_MESSAGE *msg) { if (!m_disconnected) m_sendQueue->put(nx_memdup(msg, ntohl(msg->size))); }
        virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset);
index 1c127f0..32b69ce 100644 (file)
@@ -132,6 +132,7 @@ CommSession::CommSession(SOCKET hSocket, const InetAddress &serverAddr, bool mas
    m_bulkReconciliationSupported = false;
    m_disconnected = false;
    m_compressor = NULL;
+   m_allowCompression = false;
    m_pCtx = NULL;
    m_ts = time(NULL);
    m_socketWriteMutex = MutexCreate();
@@ -339,13 +340,13 @@ void CommSession::readThread()
 
             if (msg->getCode() == CMD_GET_NXCP_CAPS)
             {
-               NXCP_MESSAGE *pMsg = (NXCP_MESSAGE *)malloc(NXCP_HEADER_SIZE);
-               pMsg->id = htonl(msg->getId());
-               pMsg->code = htons((WORD)CMD_NXCP_CAPS);
-               pMsg->flags = htons(MF_CONTROL);
-               pMsg->numFields = htonl(NXCP_VERSION << 24);
-               pMsg->size = htonl(NXCP_HEADER_SIZE);
-               sendRawMessage(pMsg, m_pCtx);
+               NXCP_MESSAGE *response = (NXCP_MESSAGE *)malloc(NXCP_HEADER_SIZE);
+               response->id = htonl(msg->getId());
+               response->code = htons((WORD)CMD_NXCP_CAPS);
+               response->flags = htons(MF_CONTROL);
+               response->numFields = htonl(NXCP_VERSION << 24);
+               response->size = htonl(NXCP_HEADER_SIZE);
+               sendRawMessage(response, m_pCtx);
             }
             delete msg;
          }
@@ -463,13 +464,18 @@ bool CommSession::sendRawMessage(NXCP_MESSAGE *msg, NXCPEncryptionContext *ctx)
    }
 
    bool success = true;
-   TCHAR buffer[128];
-   debugPrintf(6, _T("Sending message %s (size %d)"), NXCPMessageCodeName(ntohs(msg->code), buffer), ntohl(msg->size));
-   if (nxlog_get_debug_level() >= 8)
+   if (nxlog_get_debug_level() >= 6)
    {
-      String msgDump = NXCPMessage::dump(msg, NXCP_VERSION);
-      debugPrintf(8, _T("Outgoing message dump:\n%s"), (const TCHAR *)msgDump);
+      TCHAR buffer[128];
+      debugPrintf(6, _T("Sending message %s (size %d; %s)"), NXCPMessageCodeName(ntohs(msg->code), buffer), ntohl(msg->size),
+                  ntohs(msg->flags) & MF_COMPRESSED ? _T("compressed") : _T("uncompressed"));
+      if (nxlog_get_debug_level() >= 8)
+      {
+         String msgDump = NXCPMessage::dump(msg, NXCP_VERSION);
+         debugPrintf(8, _T("Outgoing message dump:\n%s"), (const TCHAR *)msgDump);
+      }
    }
+
    if ((ctx != NULL) && (ctx != PROXY_ENCRYPTION_CTX))
    {
       NXCP_ENCRYPTED_MESSAGE *enMsg = ctx->encryptMessage(msg);
@@ -489,8 +495,12 @@ bool CommSession::sendRawMessage(NXCP_MESSAGE *msg, NXCPEncryptionContext *ctx)
          success = false;
       }
    }
+
        if (!success)
+       {
+      TCHAR buffer[128];
           debugPrintf(6, _T("CommSession::SendRawMessage() for %s (size %d) failed"), NXCPMessageCodeName(ntohs(msg->code), buffer), ntohl(msg->size));
+       }
    free(msg);
    return success;
 }
@@ -503,7 +513,7 @@ bool CommSession::sendMessage(NXCPMessage *msg)
    if (m_disconnected)
       return false;
 
-   return sendRawMessage(msg->createMessage(), m_pCtx);
+   return sendRawMessage(msg->createMessage(m_allowCompression), m_pCtx);
 }
 
 /**
@@ -677,7 +687,12 @@ void CommSession::processingThread()
                // Servers before 2.0 use VID_ENABLED
                m_ipv6Aware = request->isFieldExist(VID_IPV6_SUPPORT) ? request->getFieldAsBoolean(VID_IPV6_SUPPORT) : request->getFieldAsBoolean(VID_ENABLED);
                m_bulkReconciliationSupported = request->getFieldAsBoolean(VID_BULK_RECONCILIATION);
+               m_allowCompression = request->getFieldAsBoolean(VID_ENABLE_COMPRESSION);
                response.setField(VID_RCC, ERR_SUCCESS);
+               debugPrintf(1, _T("Server capabilities: IPv6: %s; bulk reconciliation: %s; compression: %s"),
+                           m_ipv6Aware ? _T("yes") : _T("no"),
+                           m_bulkReconciliationSupported ? _T("yes") : _T("no"),
+                           m_allowCompression ? _T("yes") : _T("no"));
                break;
             case CMD_SET_SERVER_ID:
                m_serverId = request->getFieldAsUInt64(VID_SERVER_ID);
index c4dd99a..164931d 100644 (file)
@@ -162,7 +162,7 @@ THREAD_RESULT THREAD_CALL SNMPTrapSender(void *pArg)
 
       if (g_dwFlags & AF_SUBAGENT_LOADER)
       {
-         sent = SendRawMessageToMasterAgent(msg->createMessage());
+         sent = SendMessageToMasterAgent(msg);
       }
       else
       {
@@ -173,7 +173,7 @@ THREAD_RESULT THREAD_CALL SNMPTrapSender(void *pArg)
             {
                if (g_pSessionList[i]->canAcceptTraps())
                {
-                  g_pSessionList[i]->sendRawMessage(msg->createMessage());
+                  g_pSessionList[i]->sendMessage(msg);
                   sent = true;
                }
             }
index 708a325..9865ac0 100644 (file)
@@ -950,6 +950,8 @@ public class NXCPCodes
        public static final long VID_NUM_PSTORAGE = 567;
    public static final long VID_COMMAND_ID = 568;
    public static final long VID_HOSTNAME = 569;
+   public static final long VID_ENABLE_COMPRESSION = 570;
+   public static final long VID_AGENT_COMPRESSION_MODE = 571;
 
        public static final long VID_ACL_USER_BASE = 0x00001000L;
        public static final long VID_ACL_USER_LAST = 0x00001FFFL;
index ec7adb0..ea7f7f5 100644 (file)
@@ -33,6 +33,7 @@ import org.netxms.base.GeoLocation;
 import org.netxms.base.InetAddressEx;
 import org.netxms.base.PostalAddress;
 import org.netxms.client.constants.AgentCacheMode;
+import org.netxms.client.constants.AgentCompressionMode;
 import org.netxms.client.constants.ObjectStatus;
 import org.netxms.client.dashboards.DashboardElement;
 import org.netxms.client.datacollection.ConditionDciInfo;
@@ -48,73 +49,74 @@ import org.netxms.client.objects.ClusterResource;
 public class NXCObjectModificationData
 {
        // Modification flags
-       public static final int NAME                 = 1;
-       public static final int ACL                  = 2;
-       public static final int CUSTOM_ATTRIBUTES    = 3;
-       public static final int AUTOBIND_FILTER      = 4;
-       public static final int LINK_COLOR           = 5;
-       public static final int POLICY_CONFIG        = 6;
-       public static final int VERSION              = 7;
-       public static final int DESCRIPTION          = 8;
-       public static final int AGENT_PORT           = 9;
-       public static final int AGENT_AUTH           = 10;
-       public static final int SNMP_VERSION         = 11;
-       public static final int SNMP_AUTH            = 12;
-       public static final int AGENT_PROXY          = 13;
-       public static final int SNMP_PROXY           = 14;
-       public static final int TRUSTED_NODES        = 15;
-       public static final int GEOLOCATION          = 16;
-       public static final int PRIMARY_IP           = 17;
-       public static final int SNMP_PORT            = 18;
-       public static final int MAP_LAYOUT           = 19;
-       public static final int MAP_BACKGROUND       = 20;
-       public static final int MAP_CONTENT          = 21;
-       public static final int IMAGE                = 22;
-       public static final int ICMP_PROXY           = 23;
-       public static final int COLUMN_COUNT         = 24;
-       public static final int DASHBOARD_ELEMENTS   = 25;
-       public static final int SCRIPT               = 26;
-       public static final int ACTIVATION_EVENT     = 27;
-       public static final int DEACTIVATION_EVENT   = 28;
-       public static final int SOURCE_OBJECT        = 29;
-       public static final int ACTIVE_STATUS        = 30;
-       public static final int INACTIVE_STATUS      = 31;
-       public static final int DCI_LIST             = 32;
-       public static final int DRILL_DOWN_OBJECT_ID = 33;
-       public static final int IP_ADDRESS           = 34;
-       public static final int IP_PROTOCOL          = 35;
-       public static final int IP_PORT              = 36;
-       public static final int SERVICE_TYPE         = 37;
-       public static final int POLLER_NODE          = 38;
-       public static final int REQUIRED_POLLS       = 39;
-       public static final int REQUEST              = 40;
-       public static final int RESPONSE             = 41;
-       public static final int OBJECT_FLAGS         = 42;
-       public static final int IFXTABLE_POLICY      = 43;
-       public static final int REPORT_DEFINITION    = 44;
-       public static final int CLUSTER_RESOURCES    = 45;
-       public static final int PRIMARY_NAME         = 46;
-       public static final int STATUS_CALCULATION   = 47;
-       public static final int CLUSTER_NETWORKS     = 48;
-       public static final int EXPECTED_STATE       = 49;
-       public static final int CONNECTION_ROUTING   = 50;
-       public static final int DISCOVERY_RADIUS     = 51;
-       public static final int HEIGHT               = 52;
-       public static final int FILTER               = 53;
-   public static final int PEER_GATEWAY         = 54;
-   public static final int VPN_NETWORKS         = 55;
-   public static final int POSTAL_ADDRESS       = 56;
-   public static final int AGENT_CACHE_MODE     = 57;
-   public static final int MAPOBJ_DISP_MODE     = 58;
-   public static final int RACK_PLACEMENT       = 59;
-   public static final int DASHBOARD_LIST       = 60;
-   public static final int RACK_NUMB_SCHEME     = 61;
-   public static final int CONTROLLER_ID        = 62;
-   public static final int CHASSIS_ID           = 63;
-   public static final int SSH_PROXY            = 64;
-   public static final int SSH_LOGIN            = 65;
-   public static final int SSH_PASSWORD         = 66;
-   public static final int ZONE_PROXY           = 67;
+       public static final int NAME                   = 1;
+       public static final int ACL                    = 2;
+       public static final int CUSTOM_ATTRIBUTES      = 3;
+       public static final int AUTOBIND_FILTER        = 4;
+       public static final int LINK_COLOR             = 5;
+       public static final int POLICY_CONFIG          = 6;
+       public static final int VERSION                = 7;
+       public static final int DESCRIPTION            = 8;
+       public static final int AGENT_PORT             = 9;
+       public static final int AGENT_AUTH             = 10;
+       public static final int SNMP_VERSION           = 11;
+       public static final int SNMP_AUTH              = 12;
+       public static final int AGENT_PROXY            = 13;
+       public static final int SNMP_PROXY             = 14;
+       public static final int TRUSTED_NODES          = 15;
+       public static final int GEOLOCATION            = 16;
+       public static final int PRIMARY_IP             = 17;
+       public static final int SNMP_PORT              = 18;
+       public static final int MAP_LAYOUT             = 19;
+       public static final int MAP_BACKGROUND         = 20;
+       public static final int MAP_CONTENT            = 21;
+       public static final int IMAGE                  = 22;
+       public static final int ICMP_PROXY             = 23;
+       public static final int COLUMN_COUNT           = 24;
+       public static final int DASHBOARD_ELEMENTS     = 25;
+       public static final int SCRIPT                 = 26;
+       public static final int ACTIVATION_EVENT       = 27;
+       public static final int DEACTIVATION_EVENT     = 28;
+       public static final int SOURCE_OBJECT          = 29;
+       public static final int ACTIVE_STATUS          = 30;
+       public static final int INACTIVE_STATUS        = 31;
+       public static final int DCI_LIST               = 32;
+       public static final int DRILL_DOWN_OBJECT_ID   = 33;
+       public static final int IP_ADDRESS             = 34;
+       public static final int IP_PROTOCOL            = 35;
+       public static final int IP_PORT                = 36;
+       public static final int SERVICE_TYPE           = 37;
+       public static final int POLLER_NODE            = 38;
+       public static final int REQUIRED_POLLS         = 39;
+       public static final int REQUEST                = 40;
+       public static final int RESPONSE               = 41;
+       public static final int OBJECT_FLAGS           = 42;
+       public static final int IFXTABLE_POLICY        = 43;
+       public static final int REPORT_DEFINITION      = 44;
+       public static final int CLUSTER_RESOURCES      = 45;
+       public static final int PRIMARY_NAME           = 46;
+       public static final int STATUS_CALCULATION     = 47;
+       public static final int CLUSTER_NETWORKS       = 48;
+       public static final int EXPECTED_STATE         = 49;
+       public static final int CONNECTION_ROUTING     = 50;
+       public static final int DISCOVERY_RADIUS       = 51;
+       public static final int HEIGHT                 = 52;
+       public static final int FILTER                 = 53;
+   public static final int PEER_GATEWAY           = 54;
+   public static final int VPN_NETWORKS           = 55;
+   public static final int POSTAL_ADDRESS         = 56;
+   public static final int AGENT_CACHE_MODE       = 57;
+   public static final int MAPOBJ_DISP_MODE       = 58;
+   public static final int RACK_PLACEMENT         = 59;
+   public static final int DASHBOARD_LIST         = 60;
+   public static final int RACK_NUMB_SCHEME       = 61;
+   public static final int CONTROLLER_ID          = 62;
+   public static final int CHASSIS_ID             = 63;
+   public static final int SSH_PROXY              = 64;
+   public static final int SSH_LOGIN              = 65;
+   public static final int SSH_PASSWORD           = 66;
+   public static final int ZONE_PROXY             = 67;
+   public static final int AGENT_COMPRESSION_MODE = 68;
        
        private Set<Integer> fieldSet;
        private long objectId;
@@ -193,6 +195,7 @@ public class NXCObjectModificationData
        private List<InetAddressEx> remoteNetworks;
        private PostalAddress postalAddress;
        private AgentCacheMode agentCacheMode;
+   private AgentCompressionMode agentCompressionMode;
        private MapObjectDisplayMode mapObjectDisplayMode;
        private long rackId;
        private UUID rackImage;
@@ -1051,7 +1054,9 @@ public class NXCObjectModificationData
        }
 
        /**
-        * @return the nodeFlags
+        * Get object flags
+        * 
+        * @return object flags
         */
        public int getObjectFlags()
        {
@@ -1059,18 +1064,31 @@ public class NXCObjectModificationData
        }
 
    /**
-    * @return the objectFlagsMask
+    * Get object flags mask
+    * 
+    * @return the object flags mask
     */
    public int getObjectFlagsMask()
    {
       return objectFlagsMask;
    }
 
+       /**
+        * Set object flags
+        * 
+        * @param objectFlags
+        */
        public void setObjectFlags(int objectFlags)
        {
           setObjectFlags(objectFlags, 0xFFFFFFFF);
        }
 
+   /**
+    * Set selected object flags. Only flags with corresponding mask bits set will be changed.
+    * 
+    * @param objectFlags new object flags
+    * @param objectFlagsMask object flag mask
+    */
    public void setObjectFlags(int objectFlags, int objectFlagsMask)
    {
       this.objectFlags = objectFlags;
@@ -1476,7 +1494,9 @@ public class NXCObjectModificationData
    }
 
    /**
-    * @return the agentCacheMode
+    * Get agent cache mode
+    * 
+    * @return agent cache mode
     */
    public AgentCacheMode getAgentCacheMode()
    {
@@ -1484,7 +1504,9 @@ public class NXCObjectModificationData
    }
 
    /**
-    * @param agentCacheMode the agentCacheMode to set
+    * Set agent cache mode
+    * 
+    * @param agentCacheMode new agent cache mode
     */
    public void setAgentCacheMode(AgentCacheMode agentCacheMode)
    {
@@ -1493,6 +1515,27 @@ public class NXCObjectModificationData
    }
 
    /**
+    * Get agent compression mode
+    * 
+    * @return agent compression mode
+    */
+   public AgentCompressionMode getAgentCompressionMode()
+   {
+      return agentCompressionMode;
+   }
+
+   /**
+    * Set agent compression mode
+    * 
+    * @param agentCompressionMode new agent compression mode
+    */
+   public void setAgentCompressionMode(AgentCompressionMode agentCompressionMode)
+   {
+      this.agentCompressionMode = agentCompressionMode;
+      fieldSet.add(AGENT_COMPRESSION_MODE);
+   }
+
+   /**
     * @return the mapObjectDisplayMode
     */
    public MapObjectDisplayMode getMapObjectDisplayMode()
index 41b0953..92b78ce 100644 (file)
@@ -4791,6 +4791,11 @@ public class NXCSession
          msg.setFieldInt16(NXCPCodes.VID_AGENT_CACHE_MODE, data.getAgentCacheMode().getValue());
       }
       
+      if (data.isFieldSet(NXCObjectModificationData.AGENT_COMPRESSION_MODE))
+      {
+         msg.setFieldInt16(NXCPCodes.VID_AGENT_COMPRESSION_MODE, data.getAgentCompressionMode().getValue());
+      }
+      
       if (data.isFieldSet(NXCObjectModificationData.MAPOBJ_DISP_MODE))
       {
          msg.setFieldInt16(NXCPCodes.VID_DISPLAY_MODE, data.getMapObjectDisplayMode().getValue());
diff --git a/src/java/client/netxms-client/src/main/java/org/netxms/client/constants/AgentCompressionMode.java b/src/java/client/netxms-client/src/main/java/org/netxms/client/constants/AgentCompressionMode.java
new file mode 100644 (file)
index 0000000..74370df
--- /dev/null
@@ -0,0 +1,75 @@
+/**
+ * NetXMS - open source network management system
+ * Copyright (C) 2003-2017 Victor Kirhenshtein
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+package org.netxms.client.constants;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.netxms.base.Logger;
+
+/**
+ * Agent protocol compression mode
+ */
+public enum AgentCompressionMode
+{
+   DEFAULT(0),
+   ENABLED(1),
+   DISABLED(2);
+
+   private int value;
+   private static Map<Integer, AgentCompressionMode> lookupTable = new HashMap<Integer, AgentCompressionMode>();
+
+   static
+   {
+      for(AgentCompressionMode element : AgentCompressionMode.values())
+      {
+         lookupTable.put(element.value, element);
+      }
+   }
+
+   /**
+    * @param value
+    */
+   private AgentCompressionMode(int value)
+   {
+      this.value = value;
+   }
+
+   /**
+    * @return
+    */
+   public int getValue()
+   {
+      return value;
+   }
+
+   /**
+    * @param value
+    * @return
+    */
+   public static AgentCompressionMode getByValue(int value)
+   {
+      final AgentCompressionMode element = lookupTable.get(value);
+      if (element == null)
+      {
+         Logger.warning(AgentCompressionMode.class.getName(), "Unknown element " + value);
+         return DEFAULT; // fallback
+      }
+      return element;
+   }
+}
index 2146060..8969a8f 100644 (file)
@@ -26,6 +26,7 @@ import org.netxms.base.NXCPMessage;
 import org.netxms.client.MacAddress;
 import org.netxms.client.NXCSession;
 import org.netxms.client.constants.AgentCacheMode;
+import org.netxms.client.constants.AgentCompressionMode;
 import org.netxms.client.constants.NodeType;
 
 /**
@@ -103,6 +104,7 @@ public abstract class AbstractNode extends DataCollectionTarget implements RackE
        protected int agentPort;
        protected int agentAuthMethod;
        protected AgentCacheMode agentCacheMode;
+       protected AgentCompressionMode agentCompressionMode;
        protected String agentSharedSecret;
        protected String agentVersion;
        protected String platformName;
@@ -174,6 +176,7 @@ public abstract class AbstractNode extends DataCollectionTarget implements RackE
                agentAuthMethod = msg.getFieldAsInt32(NXCPCodes.VID_AUTH_METHOD);
                agentSharedSecret = msg.getFieldAsString(NXCPCodes.VID_SHARED_SECRET);
       agentCacheMode = AgentCacheMode.getByValue(msg.getFieldAsInt32(NXCPCodes.VID_AGENT_CACHE_MODE));
+      agentCompressionMode = AgentCompressionMode.getByValue(msg.getFieldAsInt32(NXCPCodes.VID_AGENT_COMPRESSION_MODE));
                agentVersion = msg.getFieldAsString(NXCPCodes.VID_AGENT_VERSION);
                platformName = msg.getFieldAsString(NXCPCodes.VID_PLATFORM_NAME);
                snmpAuthName = msg.getFieldAsString(NXCPCodes.VID_SNMP_AUTH_OBJECT);
@@ -320,6 +323,14 @@ public abstract class AbstractNode extends DataCollectionTarget implements RackE
    }
 
    /**
+    * @return the agentCompressionMode
+    */
+   public AgentCompressionMode getAgentCompressionMode()
+   {
+      return agentCompressionMode;
+   }
+
+   /**
         * @return the agentVersion
         */
        public String getAgentVersion()
index 0e0049b..56da2f5 100644 (file)
@@ -26,13 +26,16 @@ import org.eclipse.swt.events.SelectionListener;
 import org.eclipse.swt.layout.FormAttachment;
 import org.eclipse.swt.layout.FormData;
 import org.eclipse.swt.layout.FormLayout;
+import org.eclipse.swt.layout.GridLayout;
 import org.eclipse.swt.widgets.Button;
 import org.eclipse.swt.widgets.Combo;
 import org.eclipse.swt.widgets.Composite;
 import org.eclipse.swt.widgets.Control;
+import org.eclipse.swt.widgets.Group;
 import org.eclipse.ui.dialogs.PropertyPage;
 import org.netxms.client.NXCObjectModificationData;
 import org.netxms.client.NXCSession;
+import org.netxms.client.constants.AgentCompressionMode;
 import org.netxms.client.objects.AbstractNode;
 import org.netxms.ui.eclipse.jobs.ConsoleJob;
 import org.netxms.ui.eclipse.objectbrowser.widgets.ObjectSelector;
@@ -53,6 +56,9 @@ public class Agent extends PropertyPage
    private Combo agentAuthMethod;
    private Button agentForceEncryption;
    private ObjectSelector agentProxy;
+   private Button radioAgentCompressionDefault;
+   private Button radioAgentCompressionEnabled;
+   private Button radioAgentCompressionDisabled;
 
    /* (non-Javadoc)
     * @see org.eclipse.jface.preference.PreferencePage#createContents(org.eclipse.swt.widgets.Composite)
@@ -127,10 +133,50 @@ public class Agent extends PropertyPage
       agentSharedSecret.setLayoutData(fd);
       agentSharedSecret.getTextControl().setEnabled(node.getAgentAuthMethod() != AbstractNode.AGENT_AUTH_NONE);
       
+      /* agent compression */
+      Group agentCompressionGroup = new Group(dialogArea, SWT.NONE);
+      agentCompressionGroup.setText("Protocol compression mode");
+      GridLayout layout = new GridLayout();
+      layout.horizontalSpacing = WidgetHelper.DIALOG_SPACING;
+      layout.numColumns = 3;
+      layout.makeColumnsEqualWidth = true;
+      agentCompressionGroup.setLayout(layout);
+      fd = new FormData();
+      fd.left = new FormAttachment(0, 0);
+      fd.right = new FormAttachment(100, 0);
+      fd.top = new FormAttachment(agentSharedSecret, 0, SWT.BOTTOM);
+      agentCompressionGroup.setLayoutData(fd);
+      
+      radioAgentCompressionDefault = new Button(agentCompressionGroup, SWT.RADIO);
+      radioAgentCompressionDefault.setText("Default");
+      radioAgentCompressionDefault.setSelection(node.getAgentCompressionMode() == AgentCompressionMode.DEFAULT);
+
+      radioAgentCompressionEnabled = new Button(agentCompressionGroup, SWT.RADIO);
+      radioAgentCompressionEnabled.setText("Enabled");
+      radioAgentCompressionEnabled.setSelection(node.getAgentCompressionMode() == AgentCompressionMode.ENABLED);
+
+      radioAgentCompressionDisabled = new Button(agentCompressionGroup, SWT.RADIO);
+      radioAgentCompressionDisabled.setText("Disabled");
+      radioAgentCompressionDisabled.setSelection(node.getAgentCompressionMode() == AgentCompressionMode.DISABLED);
+
       return dialogArea;
    }
 
    /**
+    * Collect agent compression mode from radio buttons
+    * 
+    * @return
+    */
+   private AgentCompressionMode collectAgentCompressionMode()
+   {
+      if (radioAgentCompressionEnabled.getSelection())
+         return AgentCompressionMode.ENABLED;
+      if (radioAgentCompressionDisabled.getSelection())
+         return AgentCompressionMode.DISABLED;
+      return AgentCompressionMode.DEFAULT;
+   }
+   
+   /**
     * Apply changes
     * 
     * @param isApply true if update operation caused by "Apply" button
@@ -156,14 +202,8 @@ public class Agent extends PropertyPage
       md.setAgentProxy(agentProxy.getObjectId());
       md.setAgentAuthMethod(agentAuthMethod.getSelectionIndex());
       md.setAgentSecret(agentSharedSecret.getText());
-      
-      /* TODO: sync in some way with "Polling" page */
-      int flags = node.getFlags();
-      if (agentForceEncryption.getSelection())
-         flags |= AbstractNode.NF_FORCE_ENCRYPTION;
-      else
-         flags &= ~AbstractNode.NF_FORCE_ENCRYPTION;
-      md.setObjectFlags(flags);
+      md.setAgentCompressionMode(collectAgentCompressionMode());
+      md.setObjectFlags(agentForceEncryption.getSelection() ? AbstractNode.NF_FORCE_ENCRYPTION : 0, AbstractNode.NF_FORCE_ENCRYPTION);
 
       final NXCSession session = (NXCSession)ConsoleSharedData.getSession();
       new ConsoleJob(String.format("Updating agent communication settings for node %s", node.getObjectName()), null, Activator.PLUGIN_ID, null) {
index 599e68b..00c62b6 100644 (file)
@@ -1,7 +1,7 @@
 /* 
 ** NetXMS - Network Management System
 ** NetXMS Foundation Library
-** Copyright (C) 2003-2016 Victor Kirhenshtein
+** Copyright (C) 2003-2017 Victor Kirhenshtein
 **
 ** This program is free software; you can redistribute it and/or modify
 ** it under the terms of the GNU Lesser General Public License as published
@@ -23,6 +23,7 @@
 #include "libnetxms.h"
 #include <nxcpapi.h>
 #include <uthash.h>
+#include <zlib.h>
 
 /**
  * Calculate field size
@@ -148,23 +149,60 @@ NXCPMessage::NXCPMessage(NXCP_MESSAGE *msg, int version)
       m_data = NULL;
       m_dataSize = 0;
 
+      BYTE *msgData;
+      size_t msgDataSize;
+      if ((m_flags & MF_COMPRESSED) && (m_version >= 4))
+      {
+         m_flags &= ~MF_COMPRESSED; // clear "compressed" flag so it will not be mistakenly re-sent
+         msgDataSize = (size_t)ntohl(*((UINT32 *)((BYTE *)msg + NXCP_HEADER_SIZE))) - NXCP_HEADER_SIZE;
+
+         z_stream stream;
+         stream.zalloc = Z_NULL;
+         stream.zfree = Z_NULL;
+         stream.opaque = Z_NULL;
+         stream.avail_in = (size_t)ntohl(msg->size) - NXCP_HEADER_SIZE - 4;
+         stream.next_in = (BYTE *)msg + NXCP_HEADER_SIZE + 4;
+         if (inflateInit(&stream) != Z_OK)
+         {
+            nxlog_debug(6, _T("NXCPMessage: inflateInit() failed"));
+            return;
+         }
+
+         msgData = (BYTE *)malloc(msgDataSize);
+         stream.next_out = msgData;
+         stream.avail_out = msgDataSize;
+
+         if (inflate(&stream, Z_FINISH) != Z_STREAM_END)
+         {
+            inflateEnd(&stream);
+            TCHAR buffer[256];
+            nxlog_debug(6, _T("NXCPMessage: failed to decompress message %s with ID %d"), NXCPMessageCodeName(m_code, buffer), m_id);
+            return;
+         }
+         inflateEnd(&stream);
+      }
+      else
+      {
+         msgData = (BYTE *)msg + NXCP_HEADER_SIZE;
+         msgDataSize = (size_t)ntohl(msg->size) - NXCP_HEADER_SIZE;
+      }
+
       int fieldCount = (int)ntohl(msg->numFields);
-      size_t size = (size_t)ntohl(msg->size);
-      size_t pos = NXCP_HEADER_SIZE;
+      size_t pos = 0;
       for(int f = 0; f < fieldCount; f++)
       {
-         NXCP_MESSAGE_FIELD *field = (NXCP_MESSAGE_FIELD *)(((BYTE *)msg) + pos);
+         NXCP_MESSAGE_FIELD *field = (NXCP_MESSAGE_FIELD *)(msgData + pos);
 
          // Validate position inside message
-         if (pos > size - 8)
+         if (pos > msgDataSize - 8)
             break;
-         if ((pos > size - 12) && 
+         if ((pos > msgDataSize - 12) &&
              ((field->type == NXCP_DT_STRING) || (field->type == NXCP_DT_BINARY)))
             break;
 
-         // Calculate and validate variable size
+         // Calculate and validate field size
          size_t fieldSize = CalculateFieldSize(field, true);
-         if (pos + fieldSize > size)
+         if (pos + fieldSize > msgDataSize)
             break;
 
          // Create new entry
@@ -214,6 +252,8 @@ NXCPMessage::NXCPMessage(NXCP_MESSAGE *msg, int version)
             pos += fieldSize;
       }
 
+      if (msgData != (BYTE *)msg + NXCP_HEADER_SIZE)
+         free(msgData);
    }
 }
 
@@ -723,7 +763,7 @@ uuid NXCPMessage::getFieldAsGUID(UINT32 fieldId) const
 /**
  * Build protocol message ready to be send over the wire
  */
-NXCP_MESSAGE *NXCPMessage::createMessage() const
+NXCP_MESSAGE *NXCPMessage::createMessage(bool allowCompression) const
 {
    // Calculate message size
    size_t size = NXCP_HEADER_SIZE;
@@ -818,6 +858,42 @@ NXCP_MESSAGE *NXCPMessage::createMessage() const
             field = (NXCP_MESSAGE_FIELD *)((char *)field + fieldSize);
       }
    }
+
+   // Compress message payload if requested. Compression supported starting with NXCP version 4.
+   if ((m_version >= 4) && allowCompression && (size > 128))
+   {
+      z_stream stream;
+      stream.zalloc = Z_NULL;
+      stream.zfree = Z_NULL;
+      stream.opaque = Z_NULL;
+      stream.avail_in = 0;
+      stream.next_in = Z_NULL;
+      if (deflateInit(&stream, 9) == Z_OK)
+      {
+         size_t compBufferSize = deflateBound(&stream, size - NXCP_HEADER_SIZE);
+         BYTE *compressedMsg = (BYTE *)malloc(compBufferSize + NXCP_HEADER_SIZE + 4);
+         stream.next_in = (BYTE *)msg->fields;
+         stream.avail_in = size - NXCP_HEADER_SIZE;
+         stream.next_out = compressedMsg + NXCP_HEADER_SIZE + 4;
+         stream.avail_out = compBufferSize;
+         if (deflate(&stream, Z_FINISH) == Z_STREAM_END)
+         {
+            size_t compMsgSize = compBufferSize - stream.avail_out + NXCP_HEADER_SIZE + 4;
+            // Message should be aligned to 8 bytes boundary
+            compMsgSize += (8 - (compMsgSize % 8)) & 7;
+            if (compMsgSize < size - 4)
+            {
+               memcpy(compressedMsg, msg, NXCP_HEADER_SIZE);
+               free(msg);
+               msg = (NXCP_MESSAGE *)compressedMsg;
+               msg->flags |= htons(MF_COMPRESSED);
+               memcpy((BYTE *)msg + NXCP_HEADER_SIZE, &msg->size, 4); // Save size of uncompressed message
+               msg->size = htonl((UINT32)compMsgSize);
+            }
+         }
+         deflateEnd(&stream);
+      }
+   }
    return msg;
 }
 
index c09956e..d870e5f 100644 (file)
@@ -2312,7 +2312,7 @@ bool Node::confPollAgent(UINT32 dwRqId)
    bool hasChanges = false;
 
    sendPollerMsg(dwRqId, _T("   Checking NetXMS agent...\r\n"));
-   AgentConnection *pAgentConn = new AgentConnectionEx(m_id, m_ipAddress, m_agentPort, m_agentAuthMethod, m_szSharedSecret);
+   AgentConnection *pAgentConn = new AgentConnectionEx(m_id, m_ipAddress, m_agentPort, m_agentAuthMethod, m_szSharedSecret, isAgentCompressionAllowed());
    setAgentProxy(pAgentConn);
    pAgentConn->setCommandTimeout(g_agentCommandTimeout);
    DbgPrintf(5, _T("ConfPoll(%s): checking for NetXMS agent - connecting"), m_name);
@@ -3622,7 +3622,7 @@ bool Node::connectToAgent(UINT32 *error, UINT32 *socketError, bool *newConnectio
    // Create new agent connection object if needed
    if (m_agentConnection == NULL)
    {
-      m_agentConnection = new AgentConnectionEx(m_id, m_ipAddress, m_agentPort, m_agentAuthMethod, m_szSharedSecret);
+      m_agentConnection = new AgentConnectionEx(m_id, m_ipAddress, m_agentPort, m_agentAuthMethod, m_szSharedSecret, isAgentCompressionAllowed());
       DbgPrintf(7, _T("Node::connectToAgent(%s [%d]): new agent connection created"), m_name, m_id);
    }
    else
@@ -4666,6 +4666,7 @@ void Node::fillMessageInternal(NXCPMessage *pMsg)
    pMsg->setField(VID_SSH_PASSWORD, m_sshPassword);
    pMsg->setField(VID_PORT_ROW_COUNT, m_portRowCount);
    pMsg->setField(VID_PORT_NUMBERING_SCHEME, m_portNumberingScheme);
+   pMsg->setField(VID_AGENT_COMPRESSION_MODE, m_agentCompressionMode);
 }
 
 /**
@@ -4924,6 +4925,9 @@ UINT32 Node::modifyFromMessageInternal(NXCPMessage *pRequest)
    if (pRequest->isFieldExist(VID_SSH_PASSWORD))
       pRequest->getFieldAsString(VID_SSH_PASSWORD, m_sshPassword, MAX_SSH_PASSWORD_LEN);
 
+   if (pRequest->isFieldExist(VID_AGENT_COMPRESSION_MODE))
+      m_agentCompressionMode = pRequest->getFieldAsInt16(VID_AGENT_COMPRESSION_MODE);
+
    return DataCollectionTarget::modifyFromMessageInternal(pRequest);
 }
 
@@ -5181,13 +5185,11 @@ UINT32 Node::checkNetworkService(UINT32 *pdwStatus, const InetAddress& ipAddr, i
        (!(m_dwDynamicFlags & NDF_AGENT_UNREACHABLE)) &&
        (!(m_dwDynamicFlags & NDF_UNREACHABLE)))
    {
-      AgentConnection *pConn;
-
-      pConn = createAgentConnection();
-      if (pConn != NULL)
+      AgentConnection *conn = createAgentConnection();
+      if (conn != NULL)
       {
-         dwError = pConn->checkNetworkService(pdwStatus, ipAddr, iServiceType, wPort, wProto, pszRequest, pszResponse, responseTime);
-         pConn->decRefCount();
+         dwError = conn->checkNetworkService(pdwStatus, ipAddr, iServiceType, wPort, wProto, pszRequest, pszResponse, responseTime);
+         conn->decRefCount();
       }
    }
    return dwError;
@@ -5243,7 +5245,7 @@ AgentConnectionEx *Node::createAgentConnection(bool sendServerId)
        (m_dwDynamicFlags & NDF_UNREACHABLE))
       return NULL;
 
-   AgentConnectionEx *conn = new AgentConnectionEx(m_id, m_ipAddress, m_agentPort, m_agentAuthMethod, m_szSharedSecret);
+   AgentConnectionEx *conn = new AgentConnectionEx(m_id, m_ipAddress, m_agentPort, m_agentAuthMethod, m_szSharedSecret, isAgentCompressionAllowed());
    setAgentProxy(conn);
    conn->setCommandTimeout(g_agentCommandTimeout);
    if (!conn->connect(g_pServerKey, FALSE, NULL, NULL, sendServerId ? g_serverId : 0))
@@ -7737,3 +7739,13 @@ void Node::setSshCredentials(const TCHAR *login, const TCHAR *password)
    setModified();
    unlockProperties();
 }
+
+/**
+ * Check if compression allowed for agent
+ */
+bool Node::isAgentCompressionAllowed()
+{
+   if (m_agentCompressionMode == NODE_AGENT_COMPRESSION_DEFAULT)
+      return ConfigReadInt(_T("DefaultAgentProtocolCompressionMode"), NODE_AGENT_COMPRESSION_ENABLED) == NODE_AGENT_COMPRESSION_ENABLED;
+   return m_agentCompressionMode == NODE_AGENT_COMPRESSION_ENABLED;
+}
index 6f50c18..00a6ed5 100644 (file)
@@ -12744,11 +12744,9 @@ void ClientSession::closeConsole(UINT32 rqId)
        sendMessage(&msg);
 }
 
-
-//
-// Process console command
-//
-
+/**
+ * Process console command
+ */
 void ClientSession::processConsoleCommand(NXCPMessage *request)
 {
        NXCPMessage msg;
index f7ef185..a6f0021 100644 (file)
@@ -139,8 +139,8 @@ protected:
    virtual ~AgentConnectionEx();
 
 public:
-   AgentConnectionEx(UINT32 nodeId, InetAddress ipAddr, WORD port = AGENT_LISTEN_PORT, int authMethod = AUTH_NONE, const TCHAR *secret = NULL) :
-            AgentConnection(ipAddr, port, authMethod, secret) { m_nodeId = nodeId; }
+   AgentConnectionEx(UINT32 nodeId, InetAddress ipAddr, WORD port = AGENT_LISTEN_PORT, int authMethod = AUTH_NONE, const TCHAR *secret = NULL, bool allowCompression = true) :
+            AgentConnection(ipAddr, port, authMethod, secret, allowCompression) { m_nodeId = nodeId; }
 
    UINT32 deployPolicy(AgentPolicy *policy);
    UINT32 uninstallPolicy(AgentPolicy *policy);
@@ -1296,6 +1296,16 @@ enum NodeType
 };
 
 /**
+ * Node agent compression modes
+ */
+enum NodeAgentCompressionMode
+{
+   NODE_AGENT_COMPRESSION_DEFAULT = 0,
+   NODE_AGENT_COMPRESSION_ENABLED = 1,
+   NODE_AGENT_COMPRESSION_DISABLED = 2
+};
+
+/**
  * Node
  */
 class NXCORE_EXPORTABLE Node : public DataCollectionTarget
@@ -1332,6 +1342,7 @@ protected:
    UINT16 m_agentPort;
    INT16 m_agentAuthMethod;
    INT16 m_agentCacheMode;
+   INT16 m_agentCompressionMode;  // agent compression mode (enabled/disabled/default)
    TCHAR m_szSharedSecret[MAX_SECRET_LENGTH];
    INT16 m_iStatusPollType;
    INT16 m_snmpVersion;
@@ -1427,9 +1438,11 @@ protected:
    void checkOSPFSupport(SNMP_Transport *pTransport);
        void addVrrpInterfaces(InterfaceList *ifList);
        BOOL resolveName(BOOL useOnlyDNS);
-   void setAgentProxy(AgentConnection *pConn);
        void setPrimaryIPAddress(const InetAddress& addr);
 
+   void setAgentProxy(AgentConnection *conn);
+   bool isAgentCompressionAllowed();
+
    UINT32 getInterfaceCount(Interface **ppInterface);
 
    void checkInterfaceNames(InterfaceList *pIfList);
index 2f0f798..6b2e4ea 100644 (file)
@@ -495,6 +495,7 @@ private:
        bool m_deleteFileOnDownloadFailure;
        void (*m_sendToClientMessageCallback)(NXCP_MESSAGE*, void *);
        bool m_fileUploadInProgress;
+       bool m_allowCompression;
 
    void receiverThread();
    static THREAD_RESULT THREAD_CALL receiverThreadStarter(void *);
@@ -535,7 +536,7 @@ protected:
    virtual ~AgentConnection();
 
 public:
-   AgentConnection(InetAddress addr, WORD port = AGENT_LISTEN_PORT, int authMethod = AUTH_NONE, const TCHAR *secret = NULL);
+   AgentConnection(InetAddress addr, WORD port = AGENT_LISTEN_PORT, int authMethod = AUTH_NONE, const TCHAR *secret = NULL, bool allowCompression = true);
 
    void incRefCount() { InterlockedIncrement(&m_userRefCount); }
    void decRefCount() { if (InterlockedDecrement(&m_userRefCount) == 0) { disconnect(); decInternalRefCount(); } }
index a8b0f14..627b033 100644 (file)
@@ -72,7 +72,7 @@ THREAD_RESULT THREAD_CALL AgentConnection::receiverThreadStarter(void *pArg)
 /**
  * Constructor for AgentConnection
  */
-AgentConnection::AgentConnection(InetAddress addr, WORD port, int authMethod, const TCHAR *secret)
+AgentConnection::AgentConnection(InetAddress addr, WORD port, int authMethod, const TCHAR *secret, bool allowCompression)
 {
    m_internalRefCount = 1;
    m_userRefCount = 1;
@@ -92,6 +92,7 @@ AgentConnection::AgentConnection(InetAddress addr, WORD port, int authMethod, co
    {
       m_szSecret[0] = 0;
    }
+   m_allowCompression = allowCompression;
    m_hSocket = -1;
    m_tLastCommandTime = 0;
    m_dwNumDataLines = 0;
@@ -573,7 +574,7 @@ setup_encryption:
       goto setup_encryption;
    }
 
-   if(serverId != 0)
+   if (serverId != 0)
       setServerId(serverId);
 
    success = true;
@@ -904,6 +905,7 @@ UINT32 AgentConnection::setServerCapabilities()
    msg.setField(VID_ENABLED, (INT16)1);   // Enables IPv6 on pre-2.0 agents
    msg.setField(VID_IPV6_SUPPORT, (INT16)1);
    msg.setField(VID_BULK_RECONCILIATION, (INT16)1);
+   msg.setField(VID_ENABLE_COMPRESSION, (INT16)(m_allowCompression ? 1 : 0));
    msg.setId(dwRqId);
    if (sendMessage(&msg))
       return waitForRCC(dwRqId, m_dwCommandTimeout);
@@ -952,7 +954,7 @@ UINT32 AgentConnection::waitForRCC(UINT32 dwRqId, UINT32 dwTimeOut)
 bool AgentConnection::sendMessage(NXCPMessage *pMsg)
 {
    bool success;
-   NXCP_MESSAGE *pRawMsg = pMsg->createMessage();
+   NXCP_MESSAGE *pRawMsg = pMsg->createMessage(m_allowCompression);
        NXCPEncryptionContext *pCtx = acquireEncryptionContext();
    if (pCtx != NULL)
    {
index 80cd4c4..ef59241 100644 (file)
@@ -747,6 +747,24 @@ static bool SetSchemaVersion(int version)
 }
 
 /**
+ * Upgrade from V434 to V435
+ */
+static BOOL H_UpgradeFromV434(int currVersion, int newVersion)
+{
+   CHK_EXEC(CreateConfigParam(_T("DefaultAgentProtocolCompressionMode"), _T("1"), _T("Default agent protocol compression mode"), 'C', true, false, false, false));
+   static const TCHAR *batch =
+            _T("UPDATE config SET data_type='C',description='Default agent cache mode' WHERE var_name='DefaultAgentCacheMode'\n")
+            _T("INSERT INTO config_values (var_name,var_value,var_description) VALUES ('DefaultAgentCacheMode','1','On')\n")
+            _T("INSERT INTO config_values (var_name,var_value,var_description) VALUES ('DefaultAgentCacheMode','2','Off')\n")
+            _T("INSERT INTO config_values (var_name,var_value,var_description) VALUES ('DefaultAgentProtocolCompressionMode','1','Enabled')\n")
+            _T("INSERT INTO config_values (var_name,var_value,var_description) VALUES ('DefaultAgentProtocolCompressionMode','2','Disabled')\n")
+            _T("<END>");
+   CHK_EXEC(SQLBatch(batch));
+   CHK_EXEC(SetSchemaVersion(435));
+   return TRUE;
+}
+
+/**
  * Upgrade from V433 to V434
  */
 static BOOL H_UpgradeFromV433(int currVersion, int newVersion)
@@ -11364,6 +11382,7 @@ static struct
    { 431, 432, H_UpgradeFromV431 },
    { 432, 433, H_UpgradeFromV432 },
    { 433, 434, H_UpgradeFromV433 },
+   { 434, 435, H_UpgradeFromV434 },
    { 0, 0, NULL }
 };
 
index e810874..ac74329 100644 (file)
@@ -36,6 +36,7 @@ ZFile::ZFile(FILE *pFile, BOOL bCompress, BOOL bWrite)
    m_bCompress = bCompress;
    m_bWrite = bWrite;
    m_pFile = pFile;
+   m_pBufferPos = NULL;
    if (bCompress)
    {
       // Initialize compression stream
index 0e0049b..56da2f5 100644 (file)
@@ -26,13 +26,16 @@ import org.eclipse.swt.events.SelectionListener;
 import org.eclipse.swt.layout.FormAttachment;
 import org.eclipse.swt.layout.FormData;
 import org.eclipse.swt.layout.FormLayout;
+import org.eclipse.swt.layout.GridLayout;
 import org.eclipse.swt.widgets.Button;
 import org.eclipse.swt.widgets.Combo;
 import org.eclipse.swt.widgets.Composite;
 import org.eclipse.swt.widgets.Control;
+import org.eclipse.swt.widgets.Group;
 import org.eclipse.ui.dialogs.PropertyPage;
 import org.netxms.client.NXCObjectModificationData;
 import org.netxms.client.NXCSession;
+import org.netxms.client.constants.AgentCompressionMode;
 import org.netxms.client.objects.AbstractNode;
 import org.netxms.ui.eclipse.jobs.ConsoleJob;
 import org.netxms.ui.eclipse.objectbrowser.widgets.ObjectSelector;
@@ -53,6 +56,9 @@ public class Agent extends PropertyPage
    private Combo agentAuthMethod;
    private Button agentForceEncryption;
    private ObjectSelector agentProxy;
+   private Button radioAgentCompressionDefault;
+   private Button radioAgentCompressionEnabled;
+   private Button radioAgentCompressionDisabled;
 
    /* (non-Javadoc)
     * @see org.eclipse.jface.preference.PreferencePage#createContents(org.eclipse.swt.widgets.Composite)
@@ -127,10 +133,50 @@ public class Agent extends PropertyPage
       agentSharedSecret.setLayoutData(fd);
       agentSharedSecret.getTextControl().setEnabled(node.getAgentAuthMethod() != AbstractNode.AGENT_AUTH_NONE);
       
+      /* agent compression */
+      Group agentCompressionGroup = new Group(dialogArea, SWT.NONE);
+      agentCompressionGroup.setText("Protocol compression mode");
+      GridLayout layout = new GridLayout();
+      layout.horizontalSpacing = WidgetHelper.DIALOG_SPACING;
+      layout.numColumns = 3;
+      layout.makeColumnsEqualWidth = true;
+      agentCompressionGroup.setLayout(layout);
+      fd = new FormData();
+      fd.left = new FormAttachment(0, 0);
+      fd.right = new FormAttachment(100, 0);
+      fd.top = new FormAttachment(agentSharedSecret, 0, SWT.BOTTOM);
+      agentCompressionGroup.setLayoutData(fd);
+      
+      radioAgentCompressionDefault = new Button(agentCompressionGroup, SWT.RADIO);
+      radioAgentCompressionDefault.setText("Default");
+      radioAgentCompressionDefault.setSelection(node.getAgentCompressionMode() == AgentCompressionMode.DEFAULT);
+
+      radioAgentCompressionEnabled = new Button(agentCompressionGroup, SWT.RADIO);
+      radioAgentCompressionEnabled.setText("Enabled");
+      radioAgentCompressionEnabled.setSelection(node.getAgentCompressionMode() == AgentCompressionMode.ENABLED);
+
+      radioAgentCompressionDisabled = new Button(agentCompressionGroup, SWT.RADIO);
+      radioAgentCompressionDisabled.setText("Disabled");
+      radioAgentCompressionDisabled.setSelection(node.getAgentCompressionMode() == AgentCompressionMode.DISABLED);
+
       return dialogArea;
    }
 
    /**
+    * Collect agent compression mode from radio buttons
+    * 
+    * @return
+    */
+   private AgentCompressionMode collectAgentCompressionMode()
+   {
+      if (radioAgentCompressionEnabled.getSelection())
+         return AgentCompressionMode.ENABLED;
+      if (radioAgentCompressionDisabled.getSelection())
+         return AgentCompressionMode.DISABLED;
+      return AgentCompressionMode.DEFAULT;
+   }
+   
+   /**
     * Apply changes
     * 
     * @param isApply true if update operation caused by "Apply" button
@@ -156,14 +202,8 @@ public class Agent extends PropertyPage
       md.setAgentProxy(agentProxy.getObjectId());
       md.setAgentAuthMethod(agentAuthMethod.getSelectionIndex());
       md.setAgentSecret(agentSharedSecret.getText());
-      
-      /* TODO: sync in some way with "Polling" page */
-      int flags = node.getFlags();
-      if (agentForceEncryption.getSelection())
-         flags |= AbstractNode.NF_FORCE_ENCRYPTION;
-      else
-         flags &= ~AbstractNode.NF_FORCE_ENCRYPTION;
-      md.setObjectFlags(flags);
+      md.setAgentCompressionMode(collectAgentCompressionMode());
+      md.setObjectFlags(agentForceEncryption.getSelection() ? AbstractNode.NF_FORCE_ENCRYPTION : 0, AbstractNode.NF_FORCE_ENCRYPTION);
 
       final NXCSession session = (NXCSession)ConsoleSharedData.getSession();
       new ConsoleJob(String.format("Updating agent communication settings for node %s", node.getObjectName()), null, Activator.PLUGIN_ID, null) {