Java API can receive compressed files from agents
authorVictor Kirhenshtein <victor@netxms.org>
Thu, 16 Feb 2017 14:33:04 +0000 (16:33 +0200)
committerVictor Kirhenshtein <victor@netxms.org>
Thu, 16 Feb 2017 14:33:04 +0000 (16:33 +0200)
13 files changed:
include/nms_agent.h
src/agent/core/nxagentd.cpp
src/agent/core/nxagentd.h
src/agent/core/session.cpp
src/agent/libnxagent/bridge.cpp
src/agent/subagents/filemgr/filemgr.cpp
src/agent/subagents/filemgr/filemgr.h
src/java/client/netxms-client/src/main/java/org/netxms/client/NXCReceivedFile.java
src/java/client/netxms-client/src/main/java/org/netxms/client/NXCSession.java
src/java/netxms-eclipse/FileManager/src/org/netxms/ui/eclipse/filemanager/views/AgentFileManager.java
src/server/core/download_job.cpp
src/server/core/session.cpp
webui/webapp/FileManager/src/org/netxms/ui/eclipse/filemanager/views/AgentFileManager.java

index 415d8fd..104026b 100644 (file)
@@ -527,7 +527,7 @@ public:
    virtual void postMessage(NXCPMessage *msg) = 0;
    virtual bool sendRawMessage(NXCP_MESSAGE *msg) = 0;
    virtual void postRawMessage(NXCP_MESSAGE *msg) = 0;
-       virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset) = 0;
+       virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset, bool allowCompression) = 0;
    virtual UINT32 doRequest(NXCPMessage *msg, UINT32 timeout) = 0;
    virtual NXCPMessage *doRequestEx(NXCPMessage *msg, UINT32 timeout) = 0;
    virtual UINT32 generateRequestId() = 0;
@@ -777,7 +777,7 @@ void LIBNXAGENT_EXPORTABLE AgentSendTrap2(UINT32 dwEvent, const TCHAR *eventName
 bool LIBNXAGENT_EXPORTABLE AgentEnumerateSessions(EnumerationCallbackResult (* callback)(AbstractCommSession *, void *), void *data);
 AbstractCommSession LIBNXAGENT_EXPORTABLE *AgentFindServerSession(UINT64 serverId);
 
-bool LIBNXAGENT_EXPORTABLE AgentSendFileToServer(void *session, UINT32 requestId, const TCHAR *file, long offset);
+bool LIBNXAGENT_EXPORTABLE AgentSendFileToServer(void *session, UINT32 requestId, const TCHAR *file, long offset, bool allowCompression);
 
 bool LIBNXAGENT_EXPORTABLE AgentPushParameterData(const TCHAR *parameter, const TCHAR *value);
 bool LIBNXAGENT_EXPORTABLE AgentPushParameterDataInt32(const TCHAR *parameter, LONG value);
index a33c7b4..0ac6b10 100644 (file)
@@ -104,7 +104,7 @@ void LIBNXAGENT_EXPORTABLE InitSubAgentAPI(void (* writeLog)(int, int, const TCH
                                            void (* sendTrap2)(UINT32, const TCHAR *, int, TCHAR **),
                                            bool (* enumerateSessions)(EnumerationCallbackResult (*)(AbstractCommSession *, void *), void*),
                                            AbstractCommSession *(* findServerSession)(UINT64),
-                                           bool (* sendFile)(void *, UINT32, const TCHAR *, long),
+                                           bool (* sendFile)(void *, UINT32, const TCHAR *, long, bool),
                                            bool (* pushData)(const TCHAR *, const TCHAR *, UINT32, time_t),
                                            DB_HANDLE (* getLocalDatabaseHandle)(),
                                            CONDITION shutdownCondition, const TCHAR *dataDirectory);
@@ -650,11 +650,11 @@ static void LoadPlatformSubagent()
 /**
  * Send file to server (subagent API)
  */
-static bool SendFileToServer(void *session, UINT32 requestId, const TCHAR *file, long offset)
+static bool SendFileToServer(void *session, UINT32 requestId, const TCHAR *file, long offset, bool allowCompression)
 {
        if (session == NULL)
                return false;
-       return ((CommSession *)session)->sendFile(requestId, file, offset);
+       return ((CommSession *)session)->sendFile(requestId, file, offset, allowCompression);
 }
 
 /**
index b257f37..c3eb012 100644 (file)
@@ -373,7 +373,7 @@ public:
    virtual void postMessage(NXCPMessage *msg) { if (!m_disconnected) m_sendQueue->put(msg->createMessage(m_allowCompression)); }
    virtual bool sendRawMessage(NXCP_MESSAGE *msg);
    virtual void postRawMessage(NXCP_MESSAGE *msg) { if (!m_disconnected) m_sendQueue->put(nx_memdup(msg, ntohl(msg->size))); }
-       virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset);
+       virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset, bool allowCompression);
    virtual UINT32 doRequest(NXCPMessage *msg, UINT32 timeout);
    virtual NXCPMessage *doRequestEx(NXCPMessage *msg, UINT32 timeout);
    virtual UINT32 generateRequestId();
@@ -431,7 +431,7 @@ public:
    virtual void postMessage(NXCPMessage *pMsg) { }
    virtual bool sendRawMessage(NXCP_MESSAGE *pMsg) { return false; }
    virtual void postRawMessage(NXCP_MESSAGE *pMsg) { }
-   virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset) { return false; }
+   virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset, bool allowCompression) { return false; }
    virtual UINT32 doRequest(NXCPMessage *msg, UINT32 timeout) { return RCC_NOT_IMPLEMENTED; }
    virtual NXCPMessage *doRequestEx(NXCPMessage *msg, UINT32 timeout) { return NULL; }
    virtual UINT32 generateRequestId() { return 0; }
index 80262a7..4d9f853 100644 (file)
@@ -926,11 +926,12 @@ static void SendFileProgressCallback(INT64 bytesTransferred, void *cbArg)
 /**
  * Send file to server
  */
-bool CommSession::sendFile(UINT32 requestId, const TCHAR *file, long offset)
+bool CommSession::sendFile(UINT32 requestId, const TCHAR *file, long offset, bool allowCompression)
 {
    if (m_disconnected)
       return false;
-       return SendFileOverNXCP(m_hSocket, requestId, file, m_pCtx, offset, SendFileProgressCallback, this, m_socketWriteMutex) ? true : false;
+       return SendFileOverNXCP(m_hSocket, requestId, file, m_pCtx, offset,
+                SendFileProgressCallback, this, m_socketWriteMutex, allowCompression ? NXCP_STREAM_COMPRESSION_DEFLATE : NXCP_STREAM_COMPRESSION_NONE) ? true : false;
 }
 
 /**
index b173d31..fc8cd11 100644 (file)
@@ -30,7 +30,7 @@ static void (* s_fpSendTrap1)(UINT32, const TCHAR *, const char *, va_list) = NU
 static void (* s_fpSendTrap2)(UINT32, const TCHAR *, int, TCHAR **) = NULL;
 static AbstractCommSession *(* s_fpFindServerSession)(UINT64) = NULL;
 static bool (* s_fpEnumerateSessions)(EnumerationCallbackResult (*)(AbstractCommSession *, void *), void *) = NULL;
-static bool (* s_fpSendFile)(void *, UINT32, const TCHAR *, long) = NULL;
+static bool (* s_fpSendFile)(void *, UINT32, const TCHAR *, long, bool) = NULL;
 static bool (* s_fpPushData)(const TCHAR *, const TCHAR *, UINT32, time_t) = NULL;
 static CONDITION s_agentShutdownCondition = INVALID_CONDITION_HANDLE;
 static const TCHAR *s_dataDirectory = NULL;
@@ -44,7 +44,7 @@ void LIBNXAGENT_EXPORTABLE InitSubAgentAPI(void (* writeLog)(int, int, const TCH
                                            void (* sendTrap2)(UINT32, const TCHAR *, int, TCHAR **),
                                            bool (* enumerateSessions)(EnumerationCallbackResult (*)(AbstractCommSession *, void *), void*),
                                            AbstractCommSession *(* findServerSession)(UINT64),
-                                           bool (* sendFile)(void *, UINT32, const TCHAR *, long),
+                                           bool (* sendFile)(void *, UINT32, const TCHAR *, long, bool),
                                            bool (* pushData)(const TCHAR *, const TCHAR *, UINT32, time_t),
                                            DB_HANDLE (* getLocalDatabaseHandle)(),
                                            CONDITION shutdownCondition, const TCHAR *dataDirectory)
@@ -165,11 +165,11 @@ bool LIBNXAGENT_EXPORTABLE AgentEnumerateSessions(EnumerationCallbackResult (* c
 /**
  * Send file to server
  */
-bool LIBNXAGENT_EXPORTABLE AgentSendFileToServer(void *session, UINT32 requestId, const TCHAR *file, long offset)
+bool LIBNXAGENT_EXPORTABLE AgentSendFileToServer(void *session, UINT32 requestId, const TCHAR *file, long offset, bool allowCompression)
 {
        if ((s_fpSendFile == NULL) || (session == NULL) || (file == NULL))
                return FALSE;
-       return s_fpSendFile(session, requestId, file, offset);
+       return s_fpSendFile(session, requestId, file, offset, allowCompression);
 }
 
 /**
index 7aaa2da..b45a529 100644 (file)
@@ -1,6 +1,6 @@
 /*
  ** File management subagent
- ** Copyright (C) 2014-2016 Raden Solutions
+ ** Copyright (C) 2014-2017 Raden Solutions
  **
  ** This program is free software; you can redistribute it and/or modify
  ** it under the terms of the GNU General Public License as published by
@@ -598,10 +598,10 @@ static BOOL MoveFile(TCHAR* oldName, TCHAR* newName)
 {
    MessageData *data = (MessageData *)dataStruct;
 
-   AgentWriteDebugLog(5, _T("CommSession::getLocalFile(): request for file \"%s\", follow = %s"),
-               data->fileName, data->follow ? _T("true") : _T("false"));
-   BOOL result = AgentSendFileToServer(data->session, data->id, data->fileName, (int)data->offset);
-   if (data->follow && result)
+   AgentWriteDebugLog(5, _T("CommSession::getLocalFile(): request for file \"%s\", follow = %s, compress = %s"),
+               data->fileName, data->follow ? _T("true") : _T("false"), data->allowCompression ? _T("true") : _T("false"));
+   bool success = AgentSendFileToServer(data->session, data->id, data->fileName, (int)data->offset, data->allowCompression);
+   if (data->follow && success)
    {
       g_monitorFileList.add(data->fileNameCode);
       FollowData *flData = new FollowData(data->fileName, data->fileNameCode, 0, data->session->getServerAddress());
@@ -613,6 +613,9 @@ static BOOL MoveFile(TCHAR* oldName, TCHAR* newName)
    return THREAD_OK;
 }
 
+/**
+ * Get folder information
+ */
 static void GetFolderInfo(const TCHAR *folder, UINT64 *fileCount, UINT64 *folderSize)
 {
    _TDIR *dir = _topendir(folder);
@@ -881,7 +884,8 @@ static BOOL ProcessCommands(UINT32 command, NXCPMessage *request, NXCPMessage *r
             MessageData *data = new MessageData();
             data->fileName = _tcsdup(fileName);
             data->fileNameCode = fileNameCode;
-            data->follow = request->getFieldAsUInt16(VID_FILE_FOLLOW) ? true : false;
+            data->follow = request->getFieldAsBoolean(VID_FILE_FOLLOW);
+            data->allowCompression = request->getFieldAsBoolean(VID_ENABLE_COMPRESSION);
             data->id = request->getId();
             data->offset = request->getFieldAsUInt32(VID_FILE_OFFSET);
             data->session = session;
index e9019dc..d537196 100644 (file)
@@ -76,6 +76,7 @@ struct MessageData
    TCHAR *fileName;
    TCHAR *fileNameCode;
    bool follow;
+   bool allowCompression;
    UINT32 id;
    long offset;
        AbstractCommSession *session;
index aa99622..114119b 100644 (file)
@@ -21,6 +21,9 @@ package org.netxms.client;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import org.netxms.base.Logger;
+import com.jcraft.jzlib.Inflater;
+import com.jcraft.jzlib.JZlib;
 
 /**
  * Represents file received from server
@@ -39,7 +42,8 @@ public class NXCReceivedFile
        private int status;
        private long timestamp;
        private long size;
-       private IOException exception;
+       private Exception exception;
+       private Inflater decompressor = null;
        
        /**
         * Create new received file with given id
@@ -68,17 +72,42 @@ public class NXCReceivedFile
         * Write data to file
         * @param data
         */
-       protected void writeData(final byte[] data)
+       protected void writeData(final byte[] data, boolean compressedStream)
        {
                if (status == OPEN)
                {
                        try
                        {
-                               stream.write(data);
+                          if (compressedStream)
+                          {
+                             if (data[0] != 2)
+                                throw new IOException("Unsupported stream compression method " + (int)data[0]);
+                             
+                             if (decompressor == null)
+                             {
+                                decompressor = new Inflater();
+                                Logger.debug(getClass().getName(), "Decompressor created for file " + file.getAbsolutePath());
+                             }
+                             decompressor.setInput(data, 4, data.length - 4, false);
+
+               int dataLength = (((int)data[2] << 8) & 0xFF00) | ((int)data[3] & 0xFF);
+               byte[] uncompressedData = new byte[dataLength];
+                             decompressor.setOutput(uncompressedData);
+                             
+                             int rc = decompressor.inflate(JZlib.Z_SYNC_FLUSH);
+                             if ((rc != JZlib.Z_OK) && (rc != JZlib.Z_STREAM_END))
+                                throw new IOException("Decompression error " + rc);
+               stream.write(uncompressedData);
+                          }
+                          else
+                          {
+                             stream.write(data);
+                          }
                                size += data.length;
                        }
-                       catch(IOException e)
+                       catch(Exception e)
                        {
+                          Logger.error(getClass().getName(), "Exception during file processing", e);
                                try
                                {
                                        stream.close();
@@ -114,6 +143,9 @@ public class NXCReceivedFile
                }
        }
        
+       /**
+        * Abort file transfer
+        */
        protected void abortTransfer()
        {
                if (status == OPEN)
@@ -177,6 +209,6 @@ public class NXCReceivedFile
         */
        public IOException getException()
        {
-               return (exception != null) ? exception : new IOException();
+               return ((exception != null) && (exception instanceof IOException)) ? (IOException)exception : new IOException(exception);
        }
 }
index ac73501..dfce172 100644 (file)
@@ -726,7 +726,7 @@ public class NXCSession
                receivedFiles.put(id, file);
             }
          }
-         file.writeData(msg.getBinaryData());
+         file.writeData(msg.getBinaryData(), msg.isCompressedStream());
          notifyProgressListener(id, msg.getBinaryData().length);
          if (msg.isEndOfFile())
          {
@@ -7040,9 +7040,11 @@ public class NXCSession
       }
       while(ttw > 0);
 
-      if (ttw == 0) throw new NXCException(RCC.TIMEOUT);
+      if (ttw == 0) 
+         throw new NXCException(RCC.TIMEOUT);
 
-      if (file.getStatus() == NXCReceivedFile.FAILED) throw file.getException();
+      if (file.getStatus() == NXCReceivedFile.FAILED) 
+         throw file.getException();
 
       return file.getFile();
    }
index ab9ded7..a76f8e5 100644 (file)
@@ -978,7 +978,7 @@ public class AgentFileManager extends ViewPart
    }
 
    /**
-    * Starts file tail view&messages
+    * Starts file tail
     */
    private void tailFile(final boolean followChanges, final int offset)
    {
index 17a1a95..e363cdf 100644 (file)
@@ -72,9 +72,9 @@ FileDownloadJob::~FileDownloadJob()
 {
        m_node->decRefCount();
        m_session->decRefCount();
-       safe_free(m_localFile);
-       safe_free(m_remoteFile);
-       safe_free(m_info);
+       free(m_localFile);
+       free(m_remoteFile);
+       free(m_info);
 }
 
 /**
@@ -150,7 +150,7 @@ ServerJobResult FileDownloadJob::run()
                                msg.setId(conn->generateRequestId());
                                msg.setField(VID_FILE_NAME, m_remoteFile);
 
-            //default - get parameters
+            // default - get parameters
             if (m_maxFileSize > 0)
             {
                msg.setField(VID_FILE_OFFSET, (UINT32)(-((int)m_maxFileSize)));
@@ -161,6 +161,7 @@ ServerJobResult FileDownloadJob::run()
             }
             msg.setField(VID_FILE_FOLLOW, m_follow);
             msg.setField(VID_NAME, m_localFile);
+            msg.setField(VID_ENABLE_COMPRESSION, (m_session == NULL) || m_session->isCompressionEnabled());
 
                                response = conn->customRequest(&msg, m_localFile, false, progressCallback, fileResendCallback, this);
                                if (response != NULL)
index efbd9e8..df0228a 100644 (file)
@@ -10915,8 +10915,9 @@ void ClientSession::getAgentFile(NXCPMessage *request)
                        if (object->getObjectClass() == OBJECT_NODE)
                        {
                                request->getFieldAsString(VID_FILE_NAME, remoteFile, MAX_PATH);
-            bool follow = request->getFieldAsUInt16(VID_FILE_FOLLOW) ? true : false;
-                               FileDownloadJob *job = new FileDownloadJob((Node *)object, remoteFile, request->getFieldAsUInt32(VID_FILE_SIZE_LIMIT), follow, this, request->getId());
+            bool follow = request->getFieldAsBoolean(VID_FILE_FOLLOW);
+                               FileDownloadJob *job = new FileDownloadJob((Node *)object, remoteFile,
+                                        request->getFieldAsUInt32(VID_FILE_SIZE_LIMIT), follow, this, request->getId());
                                msg.setField(VID_NAME, job->getLocalFileName());
                                if (AddJob(job))
                                {
index 2042d64..91aff0a 100644 (file)
@@ -864,7 +864,7 @@ public class AgentFileManager extends ViewPart
             {
                File folder = dlg.getLocalFile();
                session.createFolderOnAgent(objectId, upladFolder.getFullName()+"/"+dlg.getRemoteFileName()); //$NON-NLS-1$
-               listFilesForFolder(folder, upladFolder.getFullName()+"/"+dlg.getRemoteFileName(), monitor); //$NON-NLS-1$
+               uploadFilesInFolder(folder, upladFolder.getFullName()+"/"+dlg.getRemoteFileName(), monitor); //$NON-NLS-1$
                
                upladFolder.setChildren(session.listAgentFiles(upladFolder, upladFolder.getFullName(), objectId));
                runInUIThread(new Runnable() {
@@ -895,14 +895,14 @@ public class AgentFileManager extends ViewPart
     * @throws NXCException
     * @throws IOException
     */
-   public void listFilesForFolder(final File folder, final String uploadFolder, final IProgressMonitor monitor) throws NXCException, IOException 
+   public void uploadFilesInFolder(final File folder, final String uploadFolder, final IProgressMonitor monitor) throws NXCException, IOException 
    {
       for(final File fileEntry : folder.listFiles())
       {
           if (fileEntry.isDirectory()) 
           {
              session.createFolderOnAgent(objectId, uploadFolder + "/" + fileEntry.getName()); //$NON-NLS-1$
-             listFilesForFolder(fileEntry, uploadFolder + "/" + fileEntry.getName(), monitor); //$NON-NLS-1$
+             uploadFilesInFolder(fileEntry, uploadFolder + "/" + fileEntry.getName(), monitor); //$NON-NLS-1$
           } 
           else 
           {
@@ -971,7 +971,7 @@ public class AgentFileManager extends ViewPart
    }
 
    /**
-    * Starts file tail view&messages
+    * Starts file tail
     */
    private void tailFile(final boolean followChanges, final int offset)
    {
@@ -1023,6 +1023,7 @@ public class AgentFileManager extends ViewPart
                      final IWorkbenchWindow window = PlatformUI.getWorkbench().getActiveWorkbenchWindow();
                      MessageDialogHelper.openError(window.getShell(), Messages.get().AgentFileManager_Error,
                            String.format(Messages.get().AgentFileManager_OpenViewError, e.getLocalizedMessage()));
+                     Activator.logError("Exception in AgentFileManager.tailFile", e);
                   }
                }
             });