Fixed problem with file sheduled file upload. Fixed some memory leaks Fixes #1187
[public/netxms.git] / src / server / core / upload_job.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2014 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: upload_job.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 /**
26 * Static members
27 */
28 int FileUploadJob::m_activeJobs = 0;
29 int FileUploadJob::m_maxActiveJobs = 10;
30 MUTEX FileUploadJob::m_sharedDataMutex = INVALID_MUTEX_HANDLE;
31
32 /**
33 * Scheduled file upload
34 */
35 void ScheduledFileUpload(const ScheduledTaskParameters *params)
36 {
37 Node *object = (Node *)FindObjectById(params->m_objectId, OBJECT_NODE);
38 if (object != NULL)
39 {
40 if (object->checkAccessRights(params->m_userId, OBJECT_ACCESS_CONTROL))
41 {
42
43 ServerJob *job = new FileUploadJob(params->m_params, params->m_objectId, params->m_userId);
44 if (!AddJob(job))
45 {
46 delete job;
47 DbgPrintf(4, _T("ScheduledUploadFile: Failed to add job(incorrect parameters or no such object)."));
48 }
49 }
50 else
51 DbgPrintf(4, _T("ScheduledUploadFile: Access to node %s denied"), object->getName());
52 }
53 else
54 DbgPrintf(4, _T("ScheduledUploadFile: Node with id=\'%d\' not found"), params->m_userId);
55 }
56
57 /**
58 * Static initializer
59 */
60 void FileUploadJob::init()
61 {
62 m_sharedDataMutex = MutexCreate();
63 m_maxActiveJobs = ConfigReadInt(_T("MaxActiveUploadJobs"), 10);
64 RegisterSchedulerTaskHandler(_T("Upload.File"), ScheduledFileUpload, SYSTEM_ACCESS_SCHEDULE_FILE_UPLOAD);
65 }
66
67 /**
68 * Constructor
69 */
70 FileUploadJob::FileUploadJob(Node *node, const TCHAR *localFile, const TCHAR *remoteFile, UINT32 userId, bool createOnHold)
71 : ServerJob(_T("UPLOAD_FILE"), _T("Upload file to managed node"), node->getId(), userId, createOnHold)
72 {
73 m_node = node;
74 node->incRefCount();
75
76 TCHAR buffer[1024];
77 _sntprintf(buffer, 1024, _T("Upload file %s"), GetCleanFileName(localFile));
78 setDescription(buffer);
79
80 m_localFile = _tcsdup(localFile);
81 setLocalFileFullPath();
82 m_remoteFile = (remoteFile != NULL) ? _tcsdup(remoteFile) : NULL;
83
84 _sntprintf(buffer, 1024, _T("Local file: %s; Remote file: %s"), m_localFile, CHECK_NULL(m_remoteFile));
85 m_info = _tcsdup(buffer);
86
87 m_fileSize = 0;
88 }
89
90 FileUploadJob::FileUploadJob(TCHAR* params, UINT32 node, UINT32 userId)
91 : ServerJob(_T("UPLOAD_FILE"), _T("Upload file to managed node"), node, userId, false)
92 {
93 m_node = (Node *)FindObjectById(node, OBJECT_NODE);
94 if(m_node != NULL)
95 m_node->incRefCount();
96
97 StringList fileList(params, _T(","));
98 if(fileList.size() < 2)
99 {
100 setIsValid(false);
101 return;
102 }
103
104 if(fileList.size() == 3)
105 m_retryCount = _tcstol(fileList.get(2), NULL, 0);
106
107 TCHAR buffer[1024];
108 _sntprintf(buffer, 1024, _T("Upload file %s"), GetCleanFileName(fileList.get(0)));
109 setDescription(buffer);
110
111 m_localFile = _tcsdup(fileList.get(0));
112 setLocalFileFullPath();
113 m_remoteFile = fileList.get(1)[0] != 0 ? _tcsdup(fileList.get(1)) : NULL;
114
115 _sntprintf(buffer, 1024, _T("Local file: %s; Remote file: %s"), m_localFile, CHECK_NULL(fileList.get(1)));
116 m_info = _tcsdup(buffer);
117
118 m_fileSize = 0;
119 }
120
121 void FileUploadJob::setLocalFileFullPath()
122 {
123 int nLen;
124 TCHAR fullPath[MAX_PATH];
125
126 // Create full path to the file store
127 _tcscpy(fullPath, g_netxmsdDataDir);
128 _tcscat(fullPath, DDIR_FILES);
129 _tcscat(fullPath, FS_PATH_SEPARATOR);
130 nLen = (int)_tcslen(fullPath);
131 nx_strncpy(&fullPath[nLen], GetCleanFileName(m_localFile), MAX_PATH - nLen);
132 m_localFileFullPath = _tcsdup(fullPath);
133 }
134
135 /**
136 * Destructor
137 */
138 FileUploadJob::~FileUploadJob()
139 {
140 m_node->decRefCount();
141 safe_free(m_localFile);
142 safe_free(m_localFileFullPath);
143 safe_free(m_remoteFile);
144 safe_free(m_info);
145 }
146
147 /**
148 * Run job
149 */
150 ServerJobResult FileUploadJob::run()
151 {
152 ServerJobResult success = JOB_RESULT_FAILED;
153
154 while(true)
155 {
156 MutexLock(m_sharedDataMutex);
157 if (m_activeJobs < m_maxActiveJobs)
158 {
159 m_activeJobs++;
160 MutexUnlock(m_sharedDataMutex);
161 break;
162 }
163 MutexUnlock(m_sharedDataMutex);
164 ThreadSleep(5);
165 }
166
167 AgentConnectionEx *conn = m_node->createAgentConnection();
168 if (conn != NULL)
169 {
170 m_fileSize = (INT64)FileSize(m_localFileFullPath);
171 UINT32 rcc = conn->uploadFile(m_localFileFullPath, m_remoteFile, uploadCallback, this);
172 if (rcc == ERR_SUCCESS)
173 {
174 success = JOB_RESULT_SUCCESS;
175 }
176 else
177 {
178 setFailureMessage(AgentErrorCodeToText(rcc));
179 }
180 conn->decRefCount();
181 }
182 else
183 {
184 setFailureMessage(_T("Agent connection not available"));
185 }
186
187 MutexLock(m_sharedDataMutex);
188 m_activeJobs--;
189 MutexUnlock(m_sharedDataMutex);
190
191 if(success == JOB_RESULT_FAILED && m_retryCount-- > 0)
192 {
193 TCHAR description[256];
194 _sntprintf(description, 256, _T("File upload failed. Wainting %d minutes to restart job."), getNextJobExecutionTime()/60);
195 setDescription(description);
196 success = JOB_RESULT_RESCHEDULE;
197 }
198
199 return success;
200 }
201
202 /**
203 * Upload progress callback
204 */
205 void FileUploadJob::uploadCallback(INT64 size, void *arg)
206 {
207 if (((FileUploadJob *)arg)->m_fileSize > 0)
208 ((FileUploadJob *)arg)->markProgress((int)(size * _LL(100) / ((FileUploadJob *)arg)->m_fileSize));
209 else
210 ((FileUploadJob *)arg)->markProgress(100);
211 }
212
213 /**
214 * Get additional info for logging
215 */
216 const TCHAR *FileUploadJob::getAdditionalInfo()
217 {
218 return m_info;
219 }
220
221 /**
222 * Serializes job parameters into TCHAR line separated by ';'
223 */
224 TCHAR *FileUploadJob::serializeParameters()
225 {
226 String params;
227 params.append(m_localFile);
228 params.append(_T(','));
229 params.append(CHECK_NULL_EX(m_remoteFile));
230 params.append(_T(','));
231 params.append(m_retryCount);
232 return _tcsdup(params.getBuffer());
233 }
234
235 /**
236 * Schedules execution in 10 minutes
237 */
238 void FileUploadJob::rescheduleExecution()
239 {
240 TCHAR *param = serializeParameters();
241 AddOneTimeScheduledTask(_T("Policy.Uninstall"), time(NULL) + getNextJobExecutionTime(), param, 0, getRemoteNode(), SYSTEM_ACCESS_FULL, SCHEDULED_TASK_SYSTEM);//TODO: change to correct user
242 free(param);
243 }
244