Added JobRetryCount server configuration parameter
[public/netxms.git] / src / server / core / job.cpp
CommitLineData
76b4edb5 1/*
ab621f39 2** NetXMS - Network Management System
509bb045 3** Copyright (C) 2003-2011 Victor Kirhenshtein
ab621f39
VK
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: job.cpp
20**
21**/
22
23#include "nxcore.h"
24
25
a2871ffc 26/**
27 * Externals
28 */
967893bb 29void UnregisterJob(UINT32 jobId);
27de5dab
VK
30
31
a2871ffc 32/**
33 * Constructor
34 */
967893bb 35ServerJob::ServerJob(const TCHAR *type, const TCHAR *description, UINT32 node, UINT32 userId, bool createOnHold)
ab621f39 36{
24dc5346 37 m_id = CreateUniqueId(IDG_JOB);
0edd0ab0 38 m_userId = userId;
ab621f39
VK
39 m_type = _tcsdup(CHECK_NULL(type));
40 m_description = _tcsdup(CHECK_NULL(description));
509bb045 41 m_status = createOnHold ? JOB_ON_HOLD : JOB_PENDING;
3929b1ca
VK
42 m_lastStatusChange = time(NULL);
43 m_autoCancelDelay = 0;
ab621f39 44 m_remoteNode = node;
8134d3a3 45 m_resolvedObject = FindObjectById(m_remoteNode);
ab621f39
VK
46 m_progress = 0;
47 m_failureMessage = NULL;
48 m_owningQueue = NULL;
49 m_workerThread = INVALID_THREAD_HANDLE;
3929b1ca
VK
50 m_lastNotification = 0;
51 m_notificationLock = MutexCreate();
8134d3a3 52 m_blockNextJobsOnFailure = false;
878b4261 53 createHistoryRecord();
ab621f39
VK
54}
55
a2871ffc 56/**
57 * Destructor
58 */
ab621f39
VK
59ServerJob::~ServerJob()
60{
27de5dab
VK
61 UnregisterJob(m_id);
62
ab621f39
VK
63 ThreadJoin(m_workerThread);
64
65 safe_free(m_type);
66 safe_free(m_description);
712dd47d 67 safe_free(m_failureMessage);
3929b1ca
VK
68 MutexDestroy(m_notificationLock);
69}
70
71
a2871ffc 72/**
73 * Send notification to clients
74 */
3929b1ca
VK
75void ServerJob::sendNotification(ClientSession *session, void *arg)
76{
77 ServerJob *job = (ServerJob *)arg;
6b8e9f96 78 if (job->m_resolvedObject->checkAccessRights(session->getUserId(), OBJECT_ACCESS_READ))
d3a7cf4c 79 session->postMessage(&job->m_notificationMessage);
3929b1ca
VK
80}
81
82
a2871ffc 83/**
84 * Notify clients
85 */
3929b1ca
VK
86void ServerJob::notifyClients(bool isStatusChange)
87{
8134d3a3
VK
88 if (m_resolvedObject == NULL)
89 return;
90
3929b1ca 91 time_t t = time(NULL);
901a5a9b
VK
92 if (!isStatusChange && (t - m_lastNotification < 3))
93 return; // Don't send progress notifications often then every 3 seconds
94 m_lastNotification = t;
3929b1ca 95
c17f6cbc 96 MutexLock(m_notificationLock);
b368969c 97 m_notificationMessage.setCode(CMD_JOB_CHANGE_NOTIFICATION);
3929b1ca 98 fillMessage(&m_notificationMessage);
8134d3a3 99 EnumerateClientSessions(ServerJob::sendNotification, this);
3929b1ca
VK
100 MutexUnlock(m_notificationLock);
101}
102
103
a2871ffc 104/**
105 * Change status
106 */
3929b1ca
VK
107void ServerJob::changeStatus(ServerJobStatus newStatus)
108{
109 m_status = newStatus;
110 m_lastStatusChange = time(NULL);
111 notifyClients(true);
ab621f39
VK
112}
113
114
a2871ffc 115/**
116 * Set owning queue
117 */
ab621f39
VK
118void ServerJob::setOwningQueue(ServerJobQueue *queue)
119{
120 m_owningQueue = queue;
3929b1ca 121 notifyClients(true);
ab621f39
VK
122}
123
124
76b4edb5 125/**
126 * Update progress
127 */
ab621f39
VK
128void ServerJob::markProgress(int pctCompleted)
129{
130 if ((pctCompleted > m_progress) && (pctCompleted <= 100))
3929b1ca 131 {
ab621f39 132 m_progress = pctCompleted;
3929b1ca
VK
133 notifyClients(false);
134 }
ab621f39
VK
135}
136
a2871ffc 137/**
138 * Worker thread starter
139 */
ab621f39
VK
140THREAD_RESULT THREAD_CALL ServerJob::WorkerThreadStarter(void *arg)
141{
129a1ce0
VK
142 ServerJob *job = (ServerJob *)arg;
143 DbgPrintf(4, _T("Job %d started"), job->m_id);
878b4261 144 job->updateHistoryRecord(true);
ab621f39 145
129a1ce0
VK
146 if (job->run())
147 {
3929b1ca 148 job->changeStatus(JOB_COMPLETED);
129a1ce0 149 }
ab621f39 150 else
129a1ce0 151 {
878b4261
VK
152 if (job->m_status == JOB_CANCEL_PENDING)
153 job->changeStatus(JOB_CANCELLED);
154 else
901a5a9b 155 job->changeStatus(JOB_FAILED);
129a1ce0
VK
156 }
157 job->m_workerThread = INVALID_THREAD_HANDLE;
ab621f39 158
878b4261
VK
159 DbgPrintf(4, _T("Job %d finished, status=%s"), job->m_id, (job->m_status == JOB_COMPLETED) ? _T("COMPLETED") : ((job->m_status == JOB_CANCELLED) ? _T("CANCELLED") : _T("FAILED")));
160 job->updateHistoryRecord(false);
ab621f39 161
129a1ce0
VK
162 if (job->m_owningQueue != NULL)
163 job->m_owningQueue->jobCompleted(job);
ab621f39
VK
164 return THREAD_OK;
165}
166
a2871ffc 167/**
168 * Start job
169 */
ab621f39
VK
170void ServerJob::start()
171{
172 m_status = JOB_ACTIVE;
173 m_workerThread = ThreadCreateEx(WorkerThreadStarter, 0, this);
174}
175
a2871ffc 176/**
177 * Cancel job
178 */
ab621f39
VK
179bool ServerJob::cancel()
180{
f40831eb
VK
181 switch(m_status)
182 {
183 case JOB_COMPLETED:
f2665675 184 case JOB_CANCEL_PENDING:
f40831eb
VK
185 return false;
186 case JOB_ACTIVE:
f2665675
VK
187 if (!onCancel())
188 return false;
189 changeStatus(JOB_CANCEL_PENDING);
190 return true;
f40831eb 191 default:
3929b1ca 192 changeStatus(JOB_CANCELLED);
f40831eb
VK
193 return true;
194 }
ab621f39
VK
195}
196
197
a2871ffc 198/**
199 * Hold job
200 */
509bb045
VK
201bool ServerJob::hold()
202{
203 if (m_status == JOB_PENDING)
204 {
205 changeStatus(JOB_ON_HOLD);
206 return true;
207 }
208 return false;
209}
210
211
a2871ffc 212/**
213 * Unhold job
214 */
509bb045
VK
215bool ServerJob::unhold()
216{
217 if (m_status == JOB_ON_HOLD)
218 {
219 changeStatus(JOB_PENDING);
220 return true;
221 }
222 return false;
223}
224
225
a2871ffc 226/**
227 * Default run (empty)
228 */
ab621f39
VK
229bool ServerJob::run()
230{
231 return true;
232}
233
234
a2871ffc 235/**
236 * Default cancel handler
237 */
ab621f39
VK
238bool ServerJob::onCancel()
239{
240 return false;
241}
242
243
a2871ffc 244/**
245 * Set failure message
246 */
ab621f39
VK
247void ServerJob::setFailureMessage(const TCHAR *msg)
248{
249 safe_free(m_failureMessage);
250 m_failureMessage = (msg != NULL) ? _tcsdup(msg) : NULL;
251}
3929b1ca
VK
252
253
a2871ffc 254/**
255 * Set description
256 */
3929b1ca 257void ServerJob::setDescription(const TCHAR *description)
76b4edb5 258{
3929b1ca 259 safe_free(m_description);
76b4edb5 260 m_description = _tcsdup(description);
3929b1ca
VK
261}
262
263
a2871ffc 264/**
265 * Fill NXCP message with job's data
266 */
b368969c 267void ServerJob::fillMessage(NXCPMessage *msg)
3929b1ca 268{
b368969c
VK
269 msg->setField(VID_JOB_ID, m_id);
270 msg->setField(VID_USER_ID, m_userId);
271 msg->setField(VID_JOB_TYPE, m_type);
272 msg->setField(VID_OBJECT_ID, m_remoteNode);
273 msg->setField(VID_DESCRIPTION, CHECK_NULL_EX(m_description));
274 msg->setField(VID_JOB_STATUS, (WORD)m_status);
275 msg->setField(VID_JOB_PROGRESS, (WORD)m_progress);
8134d3a3 276 if (m_status == JOB_FAILED)
b368969c 277 msg->setField(VID_FAILURE_MESSAGE, (m_failureMessage != NULL) ? m_failureMessage : _T("Internal error"));
8134d3a3 278 else
b368969c 279 msg->setField(VID_FAILURE_MESSAGE, CHECK_NULL_EX(m_failureMessage));
3929b1ca 280}
878b4261 281
77c78cbf
VK
282/**
283 * Create record in job history table
284 */
878b4261
VK
285void ServerJob::createHistoryRecord()
286{
287 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
76b4edb5 288
289 DB_STATEMENT hStmt = DBPrepare(hdb,
878b4261
VK
290 _T("INSERT INTO job_history (id,time_created,time_started,time_finished,job_type,")
291 _T("description,node_id,user_id,status) VALUES (?,?,0,0,?,?,?,?,?)"));
292 if (hStmt != NULL)
293 {
294 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
967893bb 295 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (UINT32)time(NULL));
878b4261
VK
296 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_type, DB_BIND_STATIC);
297 DBBind(hStmt, 4, DB_SQLTYPE_VARCHAR, CHECK_NULL_EX(m_description), DB_BIND_STATIC);
298 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, m_remoteNode);
299 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, m_userId);
300 DBBind(hStmt, 7, DB_SQLTYPE_INTEGER, (LONG)m_status);
301 DBExecute(hStmt);
302 DBFreeStatement(hStmt);
303 }
304 DBConnectionPoolReleaseConnection(hdb);
305}
306
77c78cbf
VK
307/**
308 * Update job history record
309 */
878b4261
VK
310void ServerJob::updateHistoryRecord(bool onStart)
311{
312 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
76b4edb5 313
314 DB_STATEMENT hStmt = DBPrepare(hdb,
315 onStart ?
316 _T("UPDATE job_history SET time_started=?,status=?,description=?,additional_info=? WHERE id=?") :
24dc5346 317 _T("UPDATE job_history SET time_finished=?,status=?,description=?,additional_info=?,failure_message=? WHERE id=?"));
76b4edb5 318
878b4261
VK
319 if (hStmt != NULL)
320 {
967893bb 321 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (UINT32)time(NULL));
878b4261 322 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_status);
24dc5346
VK
323 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, CHECK_NULL_EX(m_description), DB_BIND_STATIC);
324 DBBind(hStmt, 4, DB_SQLTYPE_VARCHAR, getAdditionalInfo(), DB_BIND_TRANSIENT);
878b4261
VK
325 if (onStart)
326 {
24dc5346 327 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, m_id);
878b4261
VK
328 }
329 else
330 {
24dc5346
VK
331 DBBind(hStmt, 5, DB_SQLTYPE_VARCHAR, CHECK_NULL_EX(m_failureMessage), DB_BIND_STATIC);
332 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, m_id);
878b4261
VK
333 }
334 DBExecute(hStmt);
335 DBFreeStatement(hStmt);
336 }
337 DBConnectionPoolReleaseConnection(hdb);
338}
24dc5346 339
77c78cbf
VK
340/**
341 * Get additional info for logging
342 */
24dc5346
VK
343const TCHAR *ServerJob::getAdditionalInfo()
344{
345 return _T("");
346}