Fixed problem with scheduled file upload. Fixed some memory leaks. Fixes #1187
[public/netxms.git] / src / server / core / job.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2011 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: job.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24 #include <math.h>
25
26 /**
27 * Externals
28 */
29 void UnregisterJob(UINT32 jobId);
30
31
32 /**
33 * Constructor
34 */
35 ServerJob::ServerJob(const TCHAR *type, const TCHAR *description, UINT32 node, UINT32 userId, bool createOnHold, int retryCount)
36 {
37 m_id = CreateUniqueId(IDG_JOB);
38 m_userId = userId;
39 m_type = _tcsdup(CHECK_NULL(type));
40 m_description = _tcsdup(CHECK_NULL(description));
41 m_status = createOnHold ? JOB_ON_HOLD : JOB_PENDING;
42 m_lastStatusChange = time(NULL);
43 m_autoCancelDelay = 0;
44 m_remoteNode = node;
45 m_resolvedObject = FindObjectById(m_remoteNode);
46 m_progress = 0;
47 m_failureMessage = NULL;
48 m_owningQueue = NULL;
49 m_workerThread = INVALID_THREAD_HANDLE;
50 m_lastNotification = 0;
51 m_notificationLock = MutexCreate();
52 m_blockNextJobsOnFailure = false;
53 m_retryCount = retryCount == -1 ? ConfigReadInt(_T("JobRetryCount"), 5) : retryCount;
54 m_isValid = (m_resolvedObject != NULL) && (m_resolvedObject->getObjectClass() == OBJECT_NODE);
55
56 createHistoryRecord();
57 }
58
59 /**
60 * Constructor that creates class from serialized string
61 */
62 ServerJob::ServerJob(const TCHAR* params, UINT32 node, UINT32 userId)
63 {
64 m_id = CreateUniqueId(IDG_JOB);
65 m_userId = userId;
66 m_type = _tcsdup(_T(""));
67 m_status = JOB_PENDING;
68 m_description = _tcsdup(_T(""));
69 m_lastStatusChange = time(NULL);
70 m_autoCancelDelay = 0;
71 m_remoteNode = node;
72 m_resolvedObject = FindObjectById(m_remoteNode);
73 m_progress = 0;
74 m_failureMessage = NULL;
75 m_owningQueue = NULL;
76 m_workerThread = INVALID_THREAD_HANDLE;
77 m_lastNotification = 0;
78 m_notificationLock = MutexCreate();
79 m_blockNextJobsOnFailure = false;
80 m_retryCount = ConfigReadInt(_T("JobRetryCount"), 5);
81 m_isValid = (m_resolvedObject != NULL) && (m_resolvedObject->getObjectClass() == OBJECT_NODE);
82
83 createHistoryRecord();
84 }
85
86 /**
87 * Destructor
88 */
89 ServerJob::~ServerJob()
90 {
91 UnregisterJob(m_id);
92
93 ThreadJoin(m_workerThread);
94
95 safe_free(m_type);
96 safe_free(m_description);
97 safe_free(m_failureMessage);
98 MutexDestroy(m_notificationLock);
99 }
100
101
102 /**
103 * Send notification to clients
104 */
105 void ServerJob::sendNotification(ClientSession *session, void *arg)
106 {
107 ServerJob *job = (ServerJob *)arg;
108 if (job->m_resolvedObject->checkAccessRights(session->getUserId(), OBJECT_ACCESS_READ))
109 session->postMessage(&job->m_notificationMessage);
110 }
111
112
113 /**
114 * Notify clients
115 */
116 void ServerJob::notifyClients(bool isStatusChange)
117 {
118 if (m_resolvedObject == NULL)
119 return;
120
121 time_t t = time(NULL);
122 if (!isStatusChange && (t - m_lastNotification < 3))
123 return; // Don't send progress notifications often then every 3 seconds
124 m_lastNotification = t;
125
126 MutexLock(m_notificationLock);
127 m_notificationMessage.setCode(CMD_JOB_CHANGE_NOTIFICATION);
128 fillMessage(&m_notificationMessage);
129 EnumerateClientSessions(ServerJob::sendNotification, this);
130 MutexUnlock(m_notificationLock);
131 }
132
133
134 /**
135 * Change status
136 */
137 void ServerJob::changeStatus(ServerJobStatus newStatus)
138 {
139 m_status = newStatus;
140 m_lastStatusChange = time(NULL);
141 notifyClients(true);
142 }
143
144
145 /**
146 * Set owning queue
147 */
148 void ServerJob::setOwningQueue(ServerJobQueue *queue)
149 {
150 m_owningQueue = queue;
151 notifyClients(true);
152 }
153
154
155 /**
156 * Update progress
157 */
158 void ServerJob::markProgress(int pctCompleted)
159 {
160 if ((pctCompleted > m_progress) && (pctCompleted <= 100))
161 {
162 m_progress = pctCompleted;
163 notifyClients(false);
164 }
165 }
166
167 /**
168 * Worker thread starter
169 */
170 THREAD_RESULT THREAD_CALL ServerJob::WorkerThreadStarter(void *arg)
171 {
172 ServerJob *job = (ServerJob *)arg;
173 DbgPrintf(4, _T("Job %d started"), job->m_id);
174 job->updateHistoryRecord(true);
175 ServerJobResult result = job->run();
176
177 switch(result)
178 {
179 case JOB_RESULT_SUCCESS:
180 job->changeStatus(JOB_COMPLETED);
181 break;
182 case JOB_RESULT_FAILED:
183 if (job->m_status == JOB_CANCEL_PENDING)
184 job->changeStatus(JOB_CANCELLED);
185 else
186 job->changeStatus(JOB_FAILED);
187 break;
188 case JOB_RESULT_RESCHEDULE:
189 job->rescheduleExecution();
190 job->changeStatus(JOB_FAILED);
191 break;
192 }
193 job->m_workerThread = INVALID_THREAD_HANDLE;
194
195 DbgPrintf(4, _T("Job %d finished, status=%s"), job->m_id, (job->m_status == JOB_COMPLETED) ? _T("COMPLETED") : ((job->m_status == JOB_CANCELLED) ? _T("CANCELLED") : _T("FAILED")));
196 job->updateHistoryRecord(false);
197
198 if (job->m_owningQueue != NULL)
199 job->m_owningQueue->jobCompleted(job);
200 return THREAD_OK;
201 }
202
203 /**
204 * Start job
205 */
206 void ServerJob::start()
207 {
208 m_status = JOB_ACTIVE;
209 m_workerThread = ThreadCreateEx(WorkerThreadStarter, 0, this);
210 }
211
212 /**
213 * Cancel job
214 */
215 bool ServerJob::cancel()
216 {
217 switch(m_status)
218 {
219 case JOB_COMPLETED:
220 case JOB_CANCEL_PENDING:
221 return false;
222 case JOB_ACTIVE:
223 if (!onCancel())
224 return false;
225 changeStatus(JOB_CANCEL_PENDING);
226 return true;
227 default:
228 changeStatus(JOB_CANCELLED);
229 return true;
230 }
231 }
232
233
234 /**
235 * Hold job
236 */
237 bool ServerJob::hold()
238 {
239 if (m_status == JOB_PENDING)
240 {
241 changeStatus(JOB_ON_HOLD);
242 return true;
243 }
244 return false;
245 }
246
247
248 /**
249 * Unhold job
250 */
251 bool ServerJob::unhold()
252 {
253 if (m_status == JOB_ON_HOLD)
254 {
255 changeStatus(JOB_PENDING);
256 return true;
257 }
258 return false;
259 }
260
261
262 /**
263 * Default run (empty)
264 */
265 ServerJobResult ServerJob::run()
266 {
267 return JOB_RESULT_SUCCESS;
268 }
269
270
271 /**
272 * Default cancel handler
273 */
274 bool ServerJob::onCancel()
275 {
276 return false;
277 }
278
279
280 /**
281 * Set failure message
282 */
283 void ServerJob::setFailureMessage(const TCHAR *msg)
284 {
285 safe_free(m_failureMessage);
286 m_failureMessage = (msg != NULL) ? _tcsdup(msg) : NULL;
287 }
288
289
290 /**
291 * Set description
292 */
293 void ServerJob::setDescription(const TCHAR *description)
294 {
295 safe_free(m_description);
296 m_description = _tcsdup(description);
297 }
298
299
300 /**
301 * Fill NXCP message with job's data
302 */
303 void ServerJob::fillMessage(NXCPMessage *msg)
304 {
305 msg->setField(VID_JOB_ID, m_id);
306 msg->setField(VID_USER_ID, m_userId);
307 msg->setField(VID_JOB_TYPE, m_type);
308 msg->setField(VID_OBJECT_ID, m_remoteNode);
309 msg->setField(VID_DESCRIPTION, CHECK_NULL_EX(m_description));
310 msg->setField(VID_JOB_STATUS, (WORD)m_status);
311 msg->setField(VID_JOB_PROGRESS, (WORD)m_progress);
312 if (m_status == JOB_FAILED)
313 msg->setField(VID_FAILURE_MESSAGE, (m_failureMessage != NULL) ? m_failureMessage : _T("Internal error"));
314 else
315 msg->setField(VID_FAILURE_MESSAGE, CHECK_NULL_EX(m_failureMessage));
316 }
317
318 /**
319 * Create record in job history table
320 */
321 void ServerJob::createHistoryRecord()
322 {
323 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
324
325 DB_STATEMENT hStmt = DBPrepare(hdb,
326 _T("INSERT INTO job_history (id,time_created,time_started,time_finished,job_type,")
327 _T("description,node_id,user_id,status) VALUES (?,?,0,0,?,?,?,?,?)"));
328 if (hStmt != NULL)
329 {
330 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
331 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (UINT32)time(NULL));
332 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_type, DB_BIND_STATIC);
333 DBBind(hStmt, 4, DB_SQLTYPE_VARCHAR, CHECK_NULL_EX(m_description), DB_BIND_STATIC);
334 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, m_remoteNode);
335 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, m_userId);
336 DBBind(hStmt, 7, DB_SQLTYPE_INTEGER, (LONG)m_status);
337 DBExecute(hStmt);
338 DBFreeStatement(hStmt);
339 }
340 DBConnectionPoolReleaseConnection(hdb);
341 }
342
343 /**
344 * Update job history record
345 */
346 void ServerJob::updateHistoryRecord(bool onStart)
347 {
348 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
349
350 DB_STATEMENT hStmt = DBPrepare(hdb,
351 onStart ?
352 _T("UPDATE job_history SET time_started=?,status=?,description=?,additional_info=? WHERE id=?") :
353 _T("UPDATE job_history SET time_finished=?,status=?,description=?,additional_info=?,failure_message=? WHERE id=?"));
354
355 if (hStmt != NULL)
356 {
357 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (UINT32)time(NULL));
358 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_status);
359 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, CHECK_NULL_EX(m_description), DB_BIND_STATIC);
360 DBBind(hStmt, 4, DB_SQLTYPE_VARCHAR, getAdditionalInfo(), DB_BIND_TRANSIENT);
361 if (onStart)
362 {
363 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, m_id);
364 }
365 else
366 {
367 DBBind(hStmt, 5, DB_SQLTYPE_VARCHAR, CHECK_NULL_EX(m_failureMessage), DB_BIND_STATIC);
368 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, m_id);
369 }
370 DBExecute(hStmt);
371 DBFreeStatement(hStmt);
372 }
373 DBConnectionPoolReleaseConnection(hdb);
374 }
375
376 /**
377 * Get additional info for logging
378 */
379 const TCHAR *ServerJob::getAdditionalInfo()
380 {
381 return _T("");
382 }
383
384 /**
385 * Serializes job parameters into TCHAR line separated by ';'
386 */
387 TCHAR *ServerJob::serializeParameters()
388 {
389 return _tcsdup(_T(""));
390 }
391
392 /**
393 * Schedules execution in 10 minutes
394 */
395 void ServerJob::rescheduleExecution()
396 {
397 }
398
399
400
401 /**
402 * Returns next job execution interval in minutes
403 * Each next execution time will be twce bigger than the previous one
404 */
405 int ServerJob::getNextJobExecutionTime()
406 {
407 return pow(2.0f, (4 - m_retryCount)) * JOB_RESCHEDULE_OFSET;
408 }