d043c788e418783f043bc34ae9ca28c2021476d9
[public/netxms.git] / src / server / core / upload_job.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2014 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: upload_job.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 /**
26 * Static members
27 */
28 int FileUploadJob::m_activeJobs = 0;
29 int FileUploadJob::m_maxActiveJobs = 10;
30 MUTEX FileUploadJob::m_sharedDataMutex = INVALID_MUTEX_HANDLE;
31
32 /**
33 * Scheduled file upload
34 */
35 void ScheduledFileUpload(const ScheduledTaskParameters *params)
36 {
37 Node *object = (Node *)FindObjectById(params->m_objectId, OBJECT_NODE);
38 if (object != NULL)
39 {
40 if (object->checkAccessRights(params->m_userId, OBJECT_ACCESS_CONTROL))
41 {
42
43 ServerJob *job = new FileUploadJob(params->m_params, params->m_objectId, params->m_userId);
44 if (!AddJob(job))
45 {
46 delete job;
47 DbgPrintf(4, _T("ScheduledUploadFile: Failed to add job(incorrect parameters or no such object)."));
48 }
49 }
50 else
51 DbgPrintf(4, _T("ScheduledUploadFile: Access to node %s denied"), object->getName());
52 }
53 else
54 DbgPrintf(4, _T("ScheduledUploadFile: Node with id=\'%d\' not found"), params->m_userId);
55 }
56
57 /**
58 * Static initializer
59 */
60 void FileUploadJob::init()
61 {
62 m_sharedDataMutex = MutexCreate();
63 m_maxActiveJobs = ConfigReadInt(_T("MaxActiveUploadJobs"), 10);
64 RegisterSchedulerTaskHandler(_T("Upload.File"), ScheduledFileUpload, SYSTEM_ACCESS_SCHEDULE_FILE_UPLOAD);
65 }
66
67 /**
68 * Constructor
69 */
70 FileUploadJob::FileUploadJob(Node *node, const TCHAR *localFile, const TCHAR *remoteFile, UINT32 userId, bool createOnHold)
71 : ServerJob(_T("UPLOAD_FILE"), _T("Upload file to managed node"), node->getId(), userId, createOnHold)
72 {
73 m_node = node;
74 node->incRefCount();
75
76 TCHAR buffer[1024];
77 _sntprintf(buffer, 1024, _T("Upload file %s"), GetCleanFileName(localFile));
78 setDescription(buffer);
79
80 m_localFile = _tcsdup(localFile);
81 m_remoteFile = (remoteFile != NULL) ? _tcsdup(remoteFile) : NULL;
82
83 _sntprintf(buffer, 1024, _T("Local file: %s; Remote file: %s"), m_localFile, CHECK_NULL(m_remoteFile));
84 m_info = _tcsdup(buffer);
85
86 m_fileSize = 0;
87 }
88
89 FileUploadJob::FileUploadJob(TCHAR* params, UINT32 node, UINT32 userId)
90 : ServerJob(_T("UPLOAD_FILE"), _T("Upload file to managed node"), node, userId, false)
91 {
92 m_node = (Node *)FindObjectById(node, OBJECT_NODE);
93 if(m_node != NULL)
94 m_node->incRefCount();
95
96
97 StringList fileList(params, _T(","));
98 if(fileList.size() < 2)
99 {
100 setIsValid(false);
101 return;
102 }
103
104 if(fileList.size() == 3)
105 m_retryCount = _tcstol(fileList.get(2), NULL, 0);
106
107 TCHAR buffer[1024];
108 _sntprintf(buffer, 1024, _T("Upload file %s"), GetCleanFileName(fileList.get(0)));
109 setDescription(buffer);
110
111 m_localFile = _tcsdup(fileList.get(0));
112 m_remoteFile = (fileList.size() == 3) ? _tcsdup(fileList.get(1)) : NULL;
113
114 _sntprintf(buffer, 1024, _T("Local file: %s; Remote file: %s"), m_localFile, CHECK_NULL(fileList.get(1)));
115 m_info = _tcsdup(buffer);
116
117 m_fileSize = 0;
118 }
119
120 /**
121 * Destructor
122 */
123 FileUploadJob::~FileUploadJob()
124 {
125 m_node->decRefCount();
126 safe_free(m_localFile);
127 safe_free(m_remoteFile);
128 safe_free(m_info);
129 }
130
131 /**
132 * Run job
133 */
134 ServerJobResult FileUploadJob::run()
135 {
136 ServerJobResult success = JOB_RESULT_FAILED;
137
138 while(true)
139 {
140 MutexLock(m_sharedDataMutex);
141 if (m_activeJobs < m_maxActiveJobs)
142 {
143 m_activeJobs++;
144 MutexUnlock(m_sharedDataMutex);
145 break;
146 }
147 MutexUnlock(m_sharedDataMutex);
148 ThreadSleep(5);
149 }
150
151 AgentConnectionEx *conn = m_node->createAgentConnection();
152 if (conn != NULL)
153 {
154 m_fileSize = (INT64)FileSize(m_localFile);
155 UINT32 rcc = conn->uploadFile(m_localFile, m_remoteFile, uploadCallback, this);
156 if (rcc == ERR_SUCCESS)
157 {
158 success = JOB_RESULT_SUCCESS;
159 }
160 else
161 {
162 setFailureMessage(AgentErrorCodeToText(rcc));
163 }
164 conn->decRefCount();
165 }
166 else
167 {
168 setFailureMessage(_T("Agent connection not available"));
169 }
170
171 MutexLock(m_sharedDataMutex);
172 m_activeJobs--;
173 MutexUnlock(m_sharedDataMutex);
174
175 if(success == JOB_RESULT_FAILED && m_retryCount-- > 0)
176 {
177 TCHAR description[256];
178 _sntprintf(description, 256, _T("File upload failed. Wainting %d minutes to restart job."), getNextJobExecutionTime()/60);
179 setDescription(description);
180 success = JOB_RESULT_RESCHEDULE;
181 }
182
183 return success;
184 }
185
186 /**
187 * Upload progress callback
188 */
189 void FileUploadJob::uploadCallback(INT64 size, void *arg)
190 {
191 if (((FileUploadJob *)arg)->m_fileSize > 0)
192 ((FileUploadJob *)arg)->markProgress((int)(size * _LL(100) / ((FileUploadJob *)arg)->m_fileSize));
193 else
194 ((FileUploadJob *)arg)->markProgress(100);
195 }
196
197 /**
198 * Get additional info for logging
199 */
200 const TCHAR *FileUploadJob::getAdditionalInfo()
201 {
202 return m_info;
203 }
204
205 /**
206 * Serializes job parameters into TCHAR line separated by ';'
207 */
208 const TCHAR *FileUploadJob::serializeParameters()
209 {
210 String params;
211 params.append(m_localFile);
212 params.append(_T(','));
213 params.append(CHECK_NULL_EX(m_remoteFile));
214 params.append(_T(','));
215 params.append(m_retryCount);
216 return _tcsdup(params.getBuffer());
217 }
218
219 /**
220 * Schedules execution in 10 minutes
221 */
222 void FileUploadJob::rescheduleExecution()
223 {
224 AddOneTimeScheduledTask(_T("Policy.Uninstall"), time(NULL) + getNextJobExecutionTime(), serializeParameters(), 0, getRemoteNode(), SYSTEM_ACCESS_FULL, SCHEDULED_TASK_SYSTEM);//TODO: change to correct user
225 }
226