Fixed problem with file sheduled file upload. Fixed some memory leaks Fixes #1187
[public/netxms.git] / src / server / core / job.cpp
CommitLineData
76b4edb5 1/*
ab621f39 2** NetXMS - Network Management System
509bb045 3** Copyright (C) 2003-2011 Victor Kirhenshtein
ab621f39
VK
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: job.cpp
20**
21**/
22
23#include "nxcore.h"
82deb2d7 24#include <math.h>
ab621f39 25
a2871ffc 26/**
27 * Externals
28 */
967893bb 29void UnregisterJob(UINT32 jobId);
27de5dab
VK
30
31
a2871ffc 32/**
33 * Constructor
34 */
82deb2d7 35ServerJob::ServerJob(const TCHAR *type, const TCHAR *description, UINT32 node, UINT32 userId, bool createOnHold, int retryCount)
ab621f39 36{
24dc5346 37 m_id = CreateUniqueId(IDG_JOB);
0edd0ab0 38 m_userId = userId;
ab621f39
VK
39 m_type = _tcsdup(CHECK_NULL(type));
40 m_description = _tcsdup(CHECK_NULL(description));
509bb045 41 m_status = createOnHold ? JOB_ON_HOLD : JOB_PENDING;
3929b1ca
VK
42 m_lastStatusChange = time(NULL);
43 m_autoCancelDelay = 0;
ab621f39 44 m_remoteNode = node;
8134d3a3 45 m_resolvedObject = FindObjectById(m_remoteNode);
ab621f39
VK
46 m_progress = 0;
47 m_failureMessage = NULL;
48 m_owningQueue = NULL;
49 m_workerThread = INVALID_THREAD_HANDLE;
3929b1ca
VK
50 m_lastNotification = 0;
51 m_notificationLock = MutexCreate();
8134d3a3 52 m_blockNextJobsOnFailure = false;
82deb2d7
TD
53 m_retryCount = retryCount == -1 ? ConfigReadInt(_T("JobRetryCount"), 5) : retryCount;
54 m_isValid = (m_resolvedObject != NULL) && (m_resolvedObject->getObjectClass() == OBJECT_NODE);
55
56 createHistoryRecord();
57}
58
59/**
60 * Constructor that creates class from serialized string
61 */
62ServerJob::ServerJob(const TCHAR* params, UINT32 node, UINT32 userId)
63{
64 m_id = CreateUniqueId(IDG_JOB);
65 m_userId = userId;
66 m_type = _tcsdup(_T(""));
67 m_status = JOB_PENDING;
68 m_description = _tcsdup(_T(""));
69 m_lastStatusChange = time(NULL);
70 m_autoCancelDelay = 0;
71 m_remoteNode = node;
72 m_resolvedObject = FindObjectById(m_remoteNode);
73 m_progress = 0;
74 m_failureMessage = NULL;
75 m_owningQueue = NULL;
76 m_workerThread = INVALID_THREAD_HANDLE;
77 m_lastNotification = 0;
78 m_notificationLock = MutexCreate();
79 m_blockNextJobsOnFailure = false;
80 m_retryCount = ConfigReadInt(_T("JobRetryCount"), 5);
81 m_isValid = (m_resolvedObject != NULL) && (m_resolvedObject->getObjectClass() == OBJECT_NODE);
82
878b4261 83 createHistoryRecord();
ab621f39
VK
84}
85
a2871ffc 86/**
87 * Destructor
88 */
ab621f39
VK
89ServerJob::~ServerJob()
90{
27de5dab
VK
91 UnregisterJob(m_id);
92
ab621f39
VK
93 ThreadJoin(m_workerThread);
94
95 safe_free(m_type);
96 safe_free(m_description);
712dd47d 97 safe_free(m_failureMessage);
3929b1ca
VK
98 MutexDestroy(m_notificationLock);
99}
100
101
a2871ffc 102/**
103 * Send notification to clients
104 */
3929b1ca
VK
105void ServerJob::sendNotification(ClientSession *session, void *arg)
106{
107 ServerJob *job = (ServerJob *)arg;
6b8e9f96 108 if (job->m_resolvedObject->checkAccessRights(session->getUserId(), OBJECT_ACCESS_READ))
d3a7cf4c 109 session->postMessage(&job->m_notificationMessage);
3929b1ca
VK
110}
111
112
a2871ffc 113/**
114 * Notify clients
115 */
3929b1ca
VK
116void ServerJob::notifyClients(bool isStatusChange)
117{
8134d3a3
VK
118 if (m_resolvedObject == NULL)
119 return;
120
3929b1ca 121 time_t t = time(NULL);
901a5a9b
VK
122 if (!isStatusChange && (t - m_lastNotification < 3))
123 return; // Don't send progress notifications often then every 3 seconds
124 m_lastNotification = t;
3929b1ca 125
c17f6cbc 126 MutexLock(m_notificationLock);
b368969c 127 m_notificationMessage.setCode(CMD_JOB_CHANGE_NOTIFICATION);
3929b1ca 128 fillMessage(&m_notificationMessage);
8134d3a3 129 EnumerateClientSessions(ServerJob::sendNotification, this);
3929b1ca
VK
130 MutexUnlock(m_notificationLock);
131}
132
133
a2871ffc 134/**
135 * Change status
136 */
3929b1ca
VK
137void ServerJob::changeStatus(ServerJobStatus newStatus)
138{
139 m_status = newStatus;
140 m_lastStatusChange = time(NULL);
141 notifyClients(true);
ab621f39
VK
142}
143
144
a2871ffc 145/**
146 * Set owning queue
147 */
ab621f39
VK
148void ServerJob::setOwningQueue(ServerJobQueue *queue)
149{
150 m_owningQueue = queue;
3929b1ca 151 notifyClients(true);
ab621f39
VK
152}
153
154
76b4edb5 155/**
156 * Update progress
157 */
ab621f39
VK
158void ServerJob::markProgress(int pctCompleted)
159{
160 if ((pctCompleted > m_progress) && (pctCompleted <= 100))
3929b1ca 161 {
ab621f39 162 m_progress = pctCompleted;
3929b1ca
VK
163 notifyClients(false);
164 }
ab621f39
VK
165}
166
a2871ffc 167/**
168 * Worker thread starter
169 */
ab621f39
VK
170THREAD_RESULT THREAD_CALL ServerJob::WorkerThreadStarter(void *arg)
171{
129a1ce0
VK
172 ServerJob *job = (ServerJob *)arg;
173 DbgPrintf(4, _T("Job %d started"), job->m_id);
878b4261 174 job->updateHistoryRecord(true);
82deb2d7 175 ServerJobResult result = job->run();
ab621f39 176
82deb2d7 177 switch(result)
129a1ce0 178 {
82deb2d7
TD
179 case JOB_RESULT_SUCCESS:
180 job->changeStatus(JOB_COMPLETED);
181 break;
182 case JOB_RESULT_FAILED:
183 if (job->m_status == JOB_CANCEL_PENDING)
184 job->changeStatus(JOB_CANCELLED);
185 else
186 job->changeStatus(JOB_FAILED);
187 break;
188 case JOB_RESULT_RESCHEDULE:
189 job->rescheduleExecution();
190 job->changeStatus(JOB_FAILED);
191 break;
129a1ce0
VK
192 }
193 job->m_workerThread = INVALID_THREAD_HANDLE;
ab621f39 194
878b4261
VK
195 DbgPrintf(4, _T("Job %d finished, status=%s"), job->m_id, (job->m_status == JOB_COMPLETED) ? _T("COMPLETED") : ((job->m_status == JOB_CANCELLED) ? _T("CANCELLED") : _T("FAILED")));
196 job->updateHistoryRecord(false);
ab621f39 197
129a1ce0
VK
198 if (job->m_owningQueue != NULL)
199 job->m_owningQueue->jobCompleted(job);
ab621f39
VK
200 return THREAD_OK;
201}
202
a2871ffc 203/**
204 * Start job
205 */
ab621f39
VK
206void ServerJob::start()
207{
208 m_status = JOB_ACTIVE;
209 m_workerThread = ThreadCreateEx(WorkerThreadStarter, 0, this);
210}
211
a2871ffc 212/**
213 * Cancel job
214 */
ab621f39
VK
215bool ServerJob::cancel()
216{
f40831eb
VK
217 switch(m_status)
218 {
219 case JOB_COMPLETED:
f2665675 220 case JOB_CANCEL_PENDING:
f40831eb
VK
221 return false;
222 case JOB_ACTIVE:
f2665675
VK
223 if (!onCancel())
224 return false;
225 changeStatus(JOB_CANCEL_PENDING);
226 return true;
f40831eb 227 default:
3929b1ca 228 changeStatus(JOB_CANCELLED);
f40831eb
VK
229 return true;
230 }
ab621f39
VK
231}
232
233
a2871ffc 234/**
235 * Hold job
236 */
509bb045
VK
237bool ServerJob::hold()
238{
239 if (m_status == JOB_PENDING)
240 {
241 changeStatus(JOB_ON_HOLD);
242 return true;
243 }
244 return false;
245}
246
247
a2871ffc 248/**
249 * Unhold job
250 */
509bb045
VK
251bool ServerJob::unhold()
252{
253 if (m_status == JOB_ON_HOLD)
254 {
255 changeStatus(JOB_PENDING);
256 return true;
257 }
258 return false;
259}
260
261
a2871ffc 262/**
263 * Default run (empty)
264 */
82deb2d7 265ServerJobResult ServerJob::run()
ab621f39 266{
82deb2d7 267 return JOB_RESULT_SUCCESS;
ab621f39
VK
268}
269
270
a2871ffc 271/**
272 * Default cancel handler
273 */
ab621f39
VK
274bool ServerJob::onCancel()
275{
276 return false;
277}
278
279
a2871ffc 280/**
281 * Set failure message
282 */
ab621f39
VK
283void ServerJob::setFailureMessage(const TCHAR *msg)
284{
285 safe_free(m_failureMessage);
286 m_failureMessage = (msg != NULL) ? _tcsdup(msg) : NULL;
287}
3929b1ca
VK
288
289
a2871ffc 290/**
291 * Set description
292 */
3929b1ca 293void ServerJob::setDescription(const TCHAR *description)
76b4edb5 294{
3929b1ca 295 safe_free(m_description);
76b4edb5 296 m_description = _tcsdup(description);
3929b1ca
VK
297}
298
299
a2871ffc 300/**
301 * Fill NXCP message with job's data
302 */
b368969c 303void ServerJob::fillMessage(NXCPMessage *msg)
3929b1ca 304{
b368969c
VK
305 msg->setField(VID_JOB_ID, m_id);
306 msg->setField(VID_USER_ID, m_userId);
307 msg->setField(VID_JOB_TYPE, m_type);
308 msg->setField(VID_OBJECT_ID, m_remoteNode);
309 msg->setField(VID_DESCRIPTION, CHECK_NULL_EX(m_description));
310 msg->setField(VID_JOB_STATUS, (WORD)m_status);
311 msg->setField(VID_JOB_PROGRESS, (WORD)m_progress);
8134d3a3 312 if (m_status == JOB_FAILED)
b368969c 313 msg->setField(VID_FAILURE_MESSAGE, (m_failureMessage != NULL) ? m_failureMessage : _T("Internal error"));
8134d3a3 314 else
b368969c 315 msg->setField(VID_FAILURE_MESSAGE, CHECK_NULL_EX(m_failureMessage));
3929b1ca 316}
878b4261 317
77c78cbf
VK
318/**
319 * Create record in job history table
320 */
878b4261
VK
321void ServerJob::createHistoryRecord()
322{
323 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
76b4edb5 324
325 DB_STATEMENT hStmt = DBPrepare(hdb,
878b4261
VK
326 _T("INSERT INTO job_history (id,time_created,time_started,time_finished,job_type,")
327 _T("description,node_id,user_id,status) VALUES (?,?,0,0,?,?,?,?,?)"));
328 if (hStmt != NULL)
329 {
330 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
967893bb 331 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (UINT32)time(NULL));
878b4261
VK
332 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_type, DB_BIND_STATIC);
333 DBBind(hStmt, 4, DB_SQLTYPE_VARCHAR, CHECK_NULL_EX(m_description), DB_BIND_STATIC);
334 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, m_remoteNode);
335 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, m_userId);
336 DBBind(hStmt, 7, DB_SQLTYPE_INTEGER, (LONG)m_status);
337 DBExecute(hStmt);
338 DBFreeStatement(hStmt);
339 }
340 DBConnectionPoolReleaseConnection(hdb);
341}
342
77c78cbf
VK
343/**
344 * Update job history record
345 */
878b4261
VK
346void ServerJob::updateHistoryRecord(bool onStart)
347{
348 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
76b4edb5 349
350 DB_STATEMENT hStmt = DBPrepare(hdb,
351 onStart ?
352 _T("UPDATE job_history SET time_started=?,status=?,description=?,additional_info=? WHERE id=?") :
24dc5346 353 _T("UPDATE job_history SET time_finished=?,status=?,description=?,additional_info=?,failure_message=? WHERE id=?"));
76b4edb5 354
878b4261
VK
355 if (hStmt != NULL)
356 {
967893bb 357 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (UINT32)time(NULL));
878b4261 358 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_status);
24dc5346
VK
359 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, CHECK_NULL_EX(m_description), DB_BIND_STATIC);
360 DBBind(hStmt, 4, DB_SQLTYPE_VARCHAR, getAdditionalInfo(), DB_BIND_TRANSIENT);
878b4261
VK
361 if (onStart)
362 {
24dc5346 363 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, m_id);
878b4261
VK
364 }
365 else
366 {
24dc5346
VK
367 DBBind(hStmt, 5, DB_SQLTYPE_VARCHAR, CHECK_NULL_EX(m_failureMessage), DB_BIND_STATIC);
368 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, m_id);
878b4261
VK
369 }
370 DBExecute(hStmt);
371 DBFreeStatement(hStmt);
372 }
373 DBConnectionPoolReleaseConnection(hdb);
374}
24dc5346 375
77c78cbf
VK
376/**
377 * Get additional info for logging
378 */
24dc5346
VK
379const TCHAR *ServerJob::getAdditionalInfo()
380{
381 return _T("");
382}
82deb2d7
TD
383
384/**
385 * Serializes job parameters into TCHAR line separated by ';'
386 */
c9b9393b 387TCHAR *ServerJob::serializeParameters()
82deb2d7
TD
388{
389 return _tcsdup(_T(""));
390}
391
392/**
393 * Schedules execution in 10 minutes
394 */
395void ServerJob::rescheduleExecution()
396{
397}
398
399
400
401/**
402 * Returns next job execution interval in minutes
403 * Each next execution time will be twce bigger than the previous one
404 */
405int ServerJob::getNextJobExecutionTime()
406{
84233487 407 return pow(2.0f, (4 - m_retryCount)) * JOB_RESCHEDULE_OFSET;
82deb2d7 408}