fixed memory leak in alarms; fixed invalid memory access on server shutdown; updated...
[public/netxms.git] / src / server / core / schedule.cpp
CommitLineData
/*
** NetXMS - Network Management System
** Copyright (C) 2003-2016 Raden Solutions
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
**
** File: schedule.cpp
**
**/
22
bf3cf439 23#include "nxcore.h"
0a145c10 24
ec13a467
VK
25/**
26 * Static fields
27 */
28static StringObjectMap<SchedulerCallback> s_callbacks(true);
29static ObjectArray<ScheduledTask> s_cronSchedules(5, 5, true);
30static ObjectArray<ScheduledTask> s_oneTimeSchedules(5, 5, true);
1d455218 31static CONDITION s_wakeupCondition = ConditionCreate(false);
ec13a467
VK
32static MUTEX s_cronScheduleLock = MutexCreate();
33static MUTEX s_oneTimeScheduleLock = MutexCreate();
34
0a145c10 35/**
36 * Scheduled task execution pool
37 */
c6e191d2
VK
38ThreadPool *g_schedulerThreadPool = NULL;
39
ec13a467
VK
40/**
41 * Create recurrent task object
42 */
43ScheduledTask::ScheduledTask(int id, const TCHAR *taskId, const TCHAR *schedule, const TCHAR *params, UINT32 owner, UINT32 objectId, UINT32 flags)
0a145c10 44{
45 m_id = id;
ec13a467 46 m_taskHandlerId = _tcsdup(CHECK_NULL_EX(taskId));
0a145c10 47 m_schedule = _tcsdup(CHECK_NULL_EX(schedule));
48 m_params = _tcsdup(CHECK_NULL_EX(params));
49 m_executionTime = NEVER;
50 m_lastExecution = NEVER;
a44a910c 51 m_flags = flags;
fd72d846 52 m_owner = owner;
b3beb7f4 53 m_objectId = objectId;
0a145c10 54}
55
ec13a467
VK
56/**
57 * Create one-time execution task object
58 */
59ScheduledTask::ScheduledTask(int id, const TCHAR *taskId, time_t executionTime, const TCHAR *params, UINT32 owner, UINT32 objectId, UINT32 flags)
0a145c10 60{
61 m_id = id;
ec13a467 62 m_taskHandlerId = _tcsdup(CHECK_NULL_EX(taskId));
0a145c10 63 m_schedule = _tcsdup(_T(""));
64 m_params = _tcsdup(CHECK_NULL_EX(params));
65 m_executionTime = executionTime;
66 m_lastExecution = NEVER;
a44a910c 67 m_flags = flags;
fd72d846 68 m_owner = owner;
b3beb7f4 69 m_objectId = objectId;
0a145c10 70}
71
ec13a467
VK
72/**
73 * Create task object from database record
74 */
75ScheduledTask::ScheduledTask(DB_RESULT hResult, int row)
0a145c10 76{
77 m_id = DBGetFieldULong(hResult, row, 0);
ec13a467 78 m_taskHandlerId = DBGetField(hResult, row, 1, NULL, 0);
0a145c10 79 m_schedule = DBGetField(hResult, row, 2, NULL, 0);
80 m_params = DBGetField(hResult, row, 3, NULL, 0);
81 m_executionTime = DBGetFieldULong(hResult, row, 4);
82 m_lastExecution = DBGetFieldULong(hResult, row, 5);
83 m_flags = DBGetFieldULong(hResult, row, 6);
fd72d846 84 m_owner = DBGetFieldULong(hResult, row, 7);
b3beb7f4 85 m_objectId = DBGetFieldULong(hResult, row, 8);
0a145c10 86}
87
ec13a467
VK
88/**
89 * Destructor
90 */
91ScheduledTask::~ScheduledTask()
0a145c10 92{
82deb2d7
TD
93 safe_free(m_taskHandlerId);
94 safe_free(m_schedule);
95 safe_free(m_params);
ec13a467
VK
96}
97
98/**
99 * Update task
100 */
101void ScheduledTask::update(const TCHAR *taskHandlerId, const TCHAR *schedule, const TCHAR *params, UINT32 owner, UINT32 objectId, UINT32 flags)
102{
82deb2d7 103 safe_free(m_taskHandlerId);
ec13a467 104 m_taskHandlerId = _tcsdup(CHECK_NULL_EX(taskHandlerId));
82deb2d7 105 safe_free(m_schedule);
ec13a467 106 m_schedule = _tcsdup(CHECK_NULL_EX(schedule));
82deb2d7 107 safe_free(m_params);
ec13a467 108 m_params = _tcsdup(CHECK_NULL_EX(params));
fd72d846 109 m_owner = owner;
b3beb7f4 110 m_objectId = objectId;
111 m_flags = flags;
0a145c10 112}
113
ec13a467
VK
114/**
115 * Update task
116 */
117void ScheduledTask::update(const TCHAR *taskHandlerId, time_t nextExecution, const TCHAR *params, UINT32 owner, UINT32 objectId, UINT32 flags)
0a145c10 118{
ec13a467 119 free(m_taskHandlerId);
82deb2d7 120 m_taskHandlerId = _tcsdup(CHECK_NULL_EX(taskHandlerId));
35ee2459 121 free(m_schedule);
122 m_schedule = _tcsdup(_T(""));
82deb2d7 123 free(m_params);
ec13a467 124 m_params = _tcsdup(CHECK_NULL_EX(params));
0a145c10 125 m_executionTime = nextExecution;
fd72d846 126 m_owner = owner;
b3beb7f4 127 m_objectId = objectId;
128 m_flags = flags;
0a145c10 129}
130
ec13a467
VK
131/**
132 * Save task to database
133 */
134void ScheduledTask::saveToDatabase(bool newObject)
0a145c10 135{
136 DB_HANDLE db = DBConnectionPoolAcquireConnection();
137 DB_STATEMENT hStmt;
138
139 if (newObject)
140 {
141 hStmt = DBPrepare(db,
c6e191d2 142 _T("INSERT INTO scheduled_tasks (taskId,schedule,params,execution_time,")
b3beb7f4 143 _T("last_execution_time,flags,owner,object_id,id) VALUES (?,?,?,?,?,?,?,?,?)"));
0a145c10 144 }
145 else
146 {
147 hStmt = DBPrepare(db,
c6e191d2 148 _T("UPDATE scheduled_tasks SET taskId=?,schedule=?,params=?,")
b3beb7f4 149 _T("execution_time=?,last_execution_time=?,flags=?,owner=?,object_id=? ")
0a145c10 150 _T("WHERE id=?"));
151 }
152
ec13a467 153 DBBind(hStmt, 1, DB_SQLTYPE_VARCHAR, m_taskHandlerId, DB_BIND_STATIC);
0a145c10 154 DBBind(hStmt, 2, DB_SQLTYPE_VARCHAR, m_schedule, DB_BIND_STATIC);
155 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_params, DB_BIND_STATIC);
156 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (UINT32)m_executionTime);
157 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (UINT32)m_lastExecution);
158 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_flags);
fd72d846 159 DBBind(hStmt, 7, DB_SQLTYPE_INTEGER, m_owner);
b3beb7f4 160 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, m_objectId);
161 DBBind(hStmt, 9, DB_SQLTYPE_INTEGER, (LONG)m_id);
0a145c10 162
163 if (hStmt == NULL)
164 return;
165
166 DBExecute(hStmt);
167 DBFreeStatement(hStmt);
168 DBConnectionPoolReleaseConnection(db);
b3beb7f4 169 NotifyClientSessions(NX_NOTIFY_SCHEDULE_UPDATE,0);
0a145c10 170}
171
ec13a467
VK
172/**
173 * Scheduled task comparator (used for task sorting)
174 */
175static int ScheduledTaskComparator(const void *e1, const void *e2)
176{
177 ScheduledTask * s1 = *((ScheduledTask**)e1);
178 ScheduledTask * s2 = *((ScheduledTask**)e2);
179
180 //Executed schedules schould go down
181 if(s1->checkFlag(SCHEDULED_TASK_EXECUTED) != s2->checkFlag(SCHEDULED_TASK_EXECUTED))
182 {
183 if(s1->checkFlag(SCHEDULED_TASK_EXECUTED))
184 {
185 return 1;
186 }
187 else
188 {
189 return -1;
190 }
191 }
192
193 //Schedules with execution time 0 should go down, others should be compared
194 if (s1->getExecutionTime() == s2->getExecutionTime())
195 {
196 return 0;
197 }
198
199 if (((s1->getExecutionTime() < s2->getExecutionTime()) && (s1->getExecutionTime() != 0)) || (s2->getExecutionTime() == 0))
200 {
201 return -1;
202 }
203 else
204 {
205 return 1;
206 }
207}
208
209/**
210 * Run scheduled task
211 */
212void ScheduledTask::run(SchedulerCallback *callback)
0a145c10 213{
c6e191d2 214 bool oneTimeSchedule = !_tcscmp(m_schedule, _T(""));
4ede0acd 215
ec13a467 216 setFlag(SCHEDULED_TASK_RUNNING);
b3beb7f4 217 NotifyClientSessions(NX_NOTIFY_SCHEDULE_UPDATE,0);
ec13a467 218 ScheduledTaskParameters param(m_params, m_owner, m_objectId);
4fe83ebd 219 callback->m_func(&param);
0a145c10 220 setLastExecutionTime(time(NULL));
4ede0acd 221
c6e191d2 222 if (oneTimeSchedule)
4ede0acd 223 {
224 MutexLock(s_oneTimeScheduleLock);
4ede0acd 225 }
226
ec13a467
VK
227 removeFlag(SCHEDULED_TASK_RUNNING);
228 setFlag(SCHEDULED_TASK_EXECUTED);
0a145c10 229 saveToDatabase(false);
c6e191d2 230 if (oneTimeSchedule)
4ede0acd 231 {
ec13a467 232 s_oneTimeSchedules.sort(ScheduledTaskComparator);
4ede0acd 233 MutexUnlock(s_oneTimeScheduleLock);
234 }
fd72d846 235
6fbcf3b4 236 if (oneTimeSchedule && checkFlag(SCHEDULED_TASK_SYSTEM))
ec13a467 237 RemoveScheduledTask(m_id, 0, SYSTEM_ACCESS_FULL);
0a145c10 238}
239
ec13a467
VK
240/**
241 * Fill NXCP message with task data
242 */
243void ScheduledTask::fillMessage(NXCPMessage *msg)
4ede0acd 244{
bf3cf439 245 msg->setField(VID_SCHEDULED_TASK_ID, m_id);
ec13a467 246 msg->setField(VID_TASK_HANDLER, m_taskHandlerId);
4ede0acd 247 msg->setField(VID_SCHEDULE, m_schedule);
248 msg->setField(VID_PARAMETER, m_params);
249 msg->setFieldFromTime(VID_EXECUTION_TIME, m_executionTime);
250 msg->setFieldFromTime(VID_LAST_EXECUTION_TIME, m_lastExecution);
251 msg->setField(VID_FLAGS, (UINT32)m_flags);
fd72d846 252 msg->setField(VID_OWNER, m_owner);
b3beb7f4 253 msg->setField(VID_OBJECT_ID, m_objectId);
4ede0acd 254}
0a145c10 255
ec13a467
VK
256/**
257 * Fill NXCP message with task data
258 */
259void ScheduledTask::fillMessage(NXCPMessage *msg, UINT32 base)
a44a910c 260{
261 msg->setField(base, m_id);
ec13a467 262 msg->setField(base + 1, m_taskHandlerId);
bf3cf439
VK
263 msg->setField(base + 2, m_schedule);
264 msg->setField(base + 3, m_params);
265 msg->setFieldFromTime(base + 4, m_executionTime);
266 msg->setFieldFromTime(base + 5, m_lastExecution);
267 msg->setField(base + 6, m_flags);
268 msg->setField(base + 7, m_owner);
b3beb7f4 269 msg->setField(base + 8, m_objectId);
fd72d846 270}
271
bf3cf439
VK
272/**
273 * Check if user can access this scheduled task
274 */
381e0ed6 275bool ScheduledTask::canAccess(UINT32 userId, UINT64 systemAccess)
fd72d846 276{
bf3cf439 277 if (systemAccess & SYSTEM_ACCESS_ALL_SCHEDULED_TASKS)
fd72d846 278 return true;
fd72d846 279
e49a868f 280 if(systemAccess & SYSTEM_ACCESS_USER_SCHEDULED_TASKS)
ec13a467 281 return !checkFlag(SCHEDULED_TASK_SYSTEM);
fd72d846 282
bf3cf439 283 if (systemAccess & SYSTEM_ACCESS_OWN_SCHEDULED_TASKS)
381e0ed6 284 return userId == m_owner;
bf3cf439
VK
285
286 return false;
a44a910c 287}
288
4ede0acd 289/**
290 * Function that adds to list task handler function
291 */
3e71685f 292void RegisterSchedulerTaskHandler(const TCHAR *id, scheduled_action_executor exec, UINT64 accessRight)
0a145c10 293{
ec13a467 294 s_callbacks.set(id, new SchedulerCallback(exec, accessRight));
c6e191d2 295 DbgPrintf(6, _T("Registered scheduler task %s"), id);
0a145c10 296}
297
4ede0acd 298/**
299 * Scheduled task creation function
300 */
ec13a467 301UINT32 AddScheduledTask(const TCHAR *task, const TCHAR *schedule, const TCHAR *params, UINT32 owner, UINT32 objectId, UINT64 systemRights, UINT32 flags)
0a145c10 302{
e49a868f 303 if ((systemRights & (SYSTEM_ACCESS_ALL_SCHEDULED_TASKS | SYSTEM_ACCESS_USER_SCHEDULED_TASKS | SYSTEM_ACCESS_OWN_SCHEDULED_TASKS)) == 0)
fd72d846 304 return RCC_ACCESS_DENIED;
c6e191d2 305 DbgPrintf(7, _T("AddSchedule: Add cron schedule %s, %s, %s"), task, schedule, params);
0a145c10 306 MutexLock(s_cronScheduleLock);
ec13a467 307 ScheduledTask *sh = new ScheduledTask(CreateUniqueId(IDG_SCHEDULED_TASK), task, schedule, params, owner, objectId, flags);
0a145c10 308 sh->saveToDatabase(true);
309 s_cronSchedules.add(sh);
310 MutexUnlock(s_cronScheduleLock);
fd72d846 311 return RCC_SUCCESS;
0a145c10 312}
313
4ede0acd 314/**
c6e191d2 315 * One time schedule creation function
4ede0acd 316 */
ec13a467 317UINT32 AddOneTimeScheduledTask(const TCHAR *task, time_t nextExecutionTime, const TCHAR *params, UINT32 owner, UINT32 objectId, UINT64 systemRights, UINT32 flags)
0a145c10 318{
e49a868f 319 if ((systemRights & (SYSTEM_ACCESS_ALL_SCHEDULED_TASKS | SYSTEM_ACCESS_USER_SCHEDULED_TASKS | SYSTEM_ACCESS_OWN_SCHEDULED_TASKS)) == 0)
fd72d846 320 return RCC_ACCESS_DENIED;
c6e191d2 321 DbgPrintf(7, _T("AddOneTimeAction: Add one time schedule %s, %d, %s"), task, nextExecutionTime, params);
0a145c10 322 MutexLock(s_oneTimeScheduleLock);
ec13a467 323 ScheduledTask *sh = new ScheduledTask(CreateUniqueId(IDG_SCHEDULED_TASK), task, nextExecutionTime, params, owner, objectId, flags);
0a145c10 324 sh->saveToDatabase(true);
325 s_oneTimeSchedules.add(sh);
ec13a467 326 s_oneTimeSchedules.sort(ScheduledTaskComparator);
0a145c10 327 MutexUnlock(s_oneTimeScheduleLock);
1d455218 328 ConditionSet(s_wakeupCondition);
fd72d846 329 return RCC_SUCCESS;
0a145c10 330}
331
4ede0acd 332/**
333 * Scheduled actionUpdate
334 */
ec13a467 335UINT32 UpdateScheduledTask(int id, const TCHAR *task, const TCHAR *schedule, const TCHAR *params, UINT32 owner, UINT32 objectId, UINT64 systemAccessRights, UINT32 flags)
0a145c10 336{
c6e191d2 337 DbgPrintf(7, _T("UpdateSchedule: update cron schedule %d, %s, %s, %s"), id, task, schedule, params);
0a145c10 338 MutexLock(s_cronScheduleLock);
fd72d846 339 UINT32 rcc = RCC_SUCCESS;
35ee2459 340 bool found = false;
0a145c10 341 for (int i = 0; i < s_cronSchedules.size(); i++)
342 {
6e36577c 343 if (s_cronSchedules.get(i)->getId() == id)
0a145c10 344 {
6fbcf3b4 345 if (!s_cronSchedules.get(i)->canAccess(owner, systemAccessRights))
fd72d846 346 {
347 rcc = RCC_ACCESS_DENIED;
348 break;
349 }
b3beb7f4 350 s_cronSchedules.get(i)->update(task, schedule, params, owner, objectId, flags);
0a145c10 351 s_cronSchedules.get(i)->saveToDatabase(false);
35ee2459 352 found = true;
0a145c10 353 break;
354 }
355 }
356 MutexUnlock(s_cronScheduleLock);
35ee2459 357
7833a69f 358 if (!found)
35ee2459 359 {
360 //check in different que and if exists - remove from one and add to another
361 MutexLock(s_oneTimeScheduleLock);
362 for (int i = 0; i < s_oneTimeSchedules.size(); i++)
363 {
364 if (s_oneTimeSchedules.get(i)->getId() == id)
365 {
366 if(!s_oneTimeSchedules.get(i)->canAccess(owner, systemAccessRights))
367 {
368 rcc = RCC_ACCESS_DENIED;
369 break;
370 }
371 ScheduledTask *st = s_oneTimeSchedules.get(i);
372 s_oneTimeSchedules.unlink(i);
373 s_oneTimeSchedules.sort(ScheduledTaskComparator);
374 st->update(task, schedule, params, owner, objectId, flags);
375 st->saveToDatabase(false);
376
377 MutexLock(s_cronScheduleLock);
378 s_cronSchedules.add(st);
379 MutexUnlock(s_cronScheduleLock);
380
35ee2459 381 break;
382 }
383 }
384 MutexUnlock(s_oneTimeScheduleLock);
385 }
386
fd72d846 387 return rcc;
0a145c10 388}
389
4ede0acd 390/**
391 * One time action update
392 */
ec13a467 393UINT32 UpdateOneTimeScheduledTask(int id, const TCHAR *task, time_t nextExecutionTime, const TCHAR *params, UINT32 owner, UINT32 objectId, UINT64 systemAccessRights, UINT32 flags)
0a145c10 394{
c6e191d2 395 DbgPrintf(7, _T("UpdateOneTimeAction: update one time schedule %d, %s, %d, %s"), id, task, nextExecutionTime, params);
6e36577c 396 bool found = false;
0a145c10 397 MutexLock(s_oneTimeScheduleLock);
fd72d846 398 UINT32 rcc = RCC_SUCCESS;
0a145c10 399 for (int i = 0; i < s_oneTimeSchedules.size(); i++)
400 {
401 if (s_oneTimeSchedules.get(i)->getId() == id)
402 {
fd72d846 403 if(!s_oneTimeSchedules.get(i)->canAccess(owner, systemAccessRights))
404 {
405 rcc = RCC_ACCESS_DENIED;
406 break;
407 }
b3beb7f4 408 s_oneTimeSchedules.get(i)->update(task, nextExecutionTime, params, owner, objectId, flags);
0a145c10 409 s_oneTimeSchedules.get(i)->saveToDatabase(false);
ec13a467 410 s_oneTimeSchedules.sort(ScheduledTaskComparator);
0a145c10 411 found = true;
412 break;
413 }
414 }
415 MutexUnlock(s_oneTimeScheduleLock);
416
6fbcf3b4 417 if (!found)
35ee2459 418 {
6fbcf3b4 419 //check in different queue and if exists - remove from one and add to another
35ee2459 420 MutexLock(s_cronScheduleLock);
421 for (int i = 0; i < s_cronSchedules.size(); i++)
422 {
6e36577c 423 if (s_cronSchedules.get(i)->getId() == id)
35ee2459 424 {
6e36577c 425 if(!s_cronSchedules.get(i)->canAccess(owner, systemAccessRights))
35ee2459 426 {
427 rcc = RCC_ACCESS_DENIED;
428 break;
429 }
430 ScheduledTask *st = s_cronSchedules.get(i);
431 s_cronSchedules.unlink(i);
432 st->update(task, nextExecutionTime, params, owner, objectId, flags);
433 st->saveToDatabase(false);
434
435 MutexLock(s_oneTimeScheduleLock);
436 s_oneTimeSchedules.add(st);
437 s_oneTimeSchedules.sort(ScheduledTaskComparator);
438 MutexUnlock(s_oneTimeScheduleLock);
439
440 found = true;
441 break;
442 }
443 }
444 MutexUnlock(s_cronScheduleLock);
445 }
446
0a145c10 447 if(found)
1d455218 448 ConditionSet(s_wakeupCondition);
fd72d846 449 return rcc;
0a145c10 450}
451
4ede0acd 452/**
c6e191d2 453 * Removes scheduled task from database by id
4ede0acd 454 */
c6e191d2 455static void DeleteScheduledTaskFromDB(UINT32 id)
0a145c10 456{
457 DB_HANDLE db = DBConnectionPoolAcquireConnection();
458 TCHAR query[256];
b3beb7f4 459 _sntprintf(query, 256, _T("DELETE FROM scheduled_tasks WHERE id = %d"), id);
0a145c10 460 DBQuery(db, query);
0a145c10 461 DBConnectionPoolReleaseConnection(db);
b3beb7f4 462 NotifyClientSessions(NX_NOTIFY_SCHEDULE_UPDATE,0);
0a145c10 463}
464
4ede0acd 465/**
ec13a467 466 * Removes scheduled task by id
4ede0acd 467 */
ec13a467 468UINT32 RemoveScheduledTask(UINT32 id, UINT32 user, UINT64 systemRights)
0a145c10 469{
c6e191d2 470 DbgPrintf(7, _T("RemoveSchedule: schedule(%d) removed"), id);
0a145c10 471 bool found = false;
fd72d846 472 UINT32 rcc = RCC_SUCCESS;
0a145c10 473
474 MutexLock(s_cronScheduleLock);
475 for (int i = 0; i < s_cronSchedules.size(); i++)
476 {
477 if (s_cronSchedules.get(i)->getId() == id)
478 {
b3beb7f4 479 if(!s_cronSchedules.get(i)->canAccess(user, systemRights))
fd72d846 480 {
481 rcc = RCC_ACCESS_DENIED;
482 break;
483 }
0a145c10 484 s_cronSchedules.remove(i);
485 found = true;
486 break;
487 }
488 }
489 MutexUnlock(s_cronScheduleLock);
490
491 if(found)
492 {
c6e191d2 493 DeleteScheduledTaskFromDB(id);
fd72d846 494 return rcc;
0a145c10 495 }
496
497 MutexLock(s_oneTimeScheduleLock);
498 for (int i = 0; i < s_oneTimeSchedules.size(); i++)
499 {
500 if (s_oneTimeSchedules.get(i)->getId() == id)
501 {
b3beb7f4 502 if(!s_cronSchedules.get(i)->canAccess(user, systemRights))
fd72d846 503 {
504 rcc = RCC_ACCESS_DENIED;
505 break;
506 }
0a145c10 507 s_oneTimeSchedules.remove(i);
ec13a467 508 s_oneTimeSchedules.sort(ScheduledTaskComparator);
0a145c10 509 found = true;
510 break;
511 }
512 }
513 MutexUnlock(s_oneTimeScheduleLock);
514
0a145c10 515 if (found)
516 {
1d455218 517 ConditionSet(s_wakeupCondition);
c6e191d2 518 DeleteScheduledTaskFromDB(id);
0a145c10 519 }
fd72d846 520 return rcc;
0a145c10 521}
522
4ede0acd 523/**
ec13a467 524 * Fills message with scheduled tasks list
4ede0acd 525 */
381e0ed6 526void GetSheduledTasks(NXCPMessage *msg, UINT32 userId, UINT64 systemRights)
4ede0acd 527{
c6e191d2 528 int scheduleCount = 0;
4ede0acd 529 int base = VID_SCHEDULE_LIST_BASE;
530
531 MutexLock(s_oneTimeScheduleLock);
4744022d 532 for(int i = 0; i < s_oneTimeSchedules.size(); i++)
4ede0acd 533 {
381e0ed6 534 if (s_oneTimeSchedules.get(i)->canAccess(userId, systemRights))
4744022d 535 {
fd72d846 536 s_oneTimeSchedules.get(i)->fillMessage(msg, base);
4744022d 537 scheduleCount++;
381e0ed6 538 base += 10;
4744022d 539 }
4ede0acd 540 }
4ede0acd 541 MutexUnlock(s_oneTimeScheduleLock);
542
543 MutexLock(s_cronScheduleLock);
4744022d 544 for(int i = 0; i < s_cronSchedules.size(); i++)
4ede0acd 545 {
381e0ed6 546 if (s_cronSchedules.get(i)->canAccess(userId, systemRights))
4744022d 547 {
fd72d846 548 s_cronSchedules.get(i)->fillMessage(msg, base);
4744022d 549 scheduleCount++;
381e0ed6 550 base += 10;
4744022d 551 }
4ede0acd 552 }
4ede0acd 553 MutexUnlock(s_cronScheduleLock);
554
c6e191d2 555 msg->setField(VID_SCHEDULE_COUNT, scheduleCount);
4ede0acd 556}
557
a44a910c 558/**
ec13a467 559 * Fills message with task handlers list
a44a910c 560 */
ec13a467 561void GetSchedulerTaskHandlers(NXCPMessage *msg, UINT64 accessRights)
a44a910c 562{
4744022d 563 UINT32 base = VID_CALLBACK_BASE;
564 int count = 0;
a44a910c 565
566 StringList *keyList = s_callbacks.keys();
4744022d 567 for(int i = 0; i < keyList->size(); i++)
fd72d846 568 {
569 if(accessRights & s_callbacks.get(keyList->get(i))->m_accessRight)
4744022d 570 {
fd72d846 571 msg->setField(base, keyList->get(i));
4744022d 572 count++;
573 base++;
574 }
fd72d846 575 }
4744022d 576 delete keyList;
577 msg->setField(VID_CALLBACK_COUNT, (UINT32)count);
a44a910c 578}
579
580/**
ec13a467 581 * Creates scheduled task from message
a44a910c 582 */
ec13a467 583UINT32 CreateScehduledTaskFromMsg(NXCPMessage *request, UINT32 owner, UINT64 systemAccessRights)
a44a910c 584{
bf3cf439 585 TCHAR *taskId = request->getFieldAsString(VID_TASK_HANDLER);
a44a910c 586 TCHAR *schedule = NULL;
587 time_t nextExecutionTime = 0;
588 TCHAR *params = request->getFieldAsString(VID_PARAMETER);
589 int flags = request->getFieldAsInt32(VID_FLAGS);
b3beb7f4 590 int objectId = request->getFieldAsInt32(VID_OBJECT_ID);
fd72d846 591 UINT32 result;
6fbcf3b4 592 if (request->isFieldExist(VID_SCHEDULE))
a44a910c 593 {
594 schedule = request->getFieldAsString(VID_SCHEDULE);
ec13a467 595 result = AddScheduledTask(taskId, schedule, params, owner, objectId, systemAccessRights, flags);
a44a910c 596 }
597 else
598 {
599 nextExecutionTime = request->getFieldAsTime(VID_EXECUTION_TIME);
ec13a467 600 result = AddOneTimeScheduledTask(taskId, nextExecutionTime, params, owner, objectId, systemAccessRights, flags);
a44a910c 601 }
6fbcf3b4
VK
602 free(taskId);
603 free(schedule);
604 free(params);
fd72d846 605 return result;
a44a910c 606}
607
608/**
ec13a467 609 * Update scheduled task from message
a44a910c 610 */
ec13a467 611UINT32 UpdateScheduledTaskFromMsg(NXCPMessage *request, UINT32 owner, UINT64 systemAccessRights)
a44a910c 612{
bf3cf439
VK
613 UINT32 id = request->getFieldAsInt32(VID_SCHEDULED_TASK_ID);
614 TCHAR *taskId = request->getFieldAsString(VID_TASK_HANDLER);
a44a910c 615 TCHAR *schedule = NULL;
616 time_t nextExecutionTime = 0;
617 TCHAR *params = request->getFieldAsString(VID_PARAMETER);
b3beb7f4 618 UINT32 flags = request->getFieldAsInt32(VID_FLAGS);
619 UINT32 objectId = request->getFieldAsInt32(VID_OBJECT_ID);
fd72d846 620 UINT32 rcc;
a44a910c 621 if(request->isFieldExist(VID_SCHEDULE))
622 {
623 schedule = request->getFieldAsString(VID_SCHEDULE);
ec13a467 624 rcc = UpdateScheduledTask(id, taskId, schedule, params, owner, objectId, systemAccessRights, flags);
a44a910c 625 }
626 else
627 {
628 nextExecutionTime = request->getFieldAsTime(VID_EXECUTION_TIME);
ec13a467 629 rcc = UpdateOneTimeScheduledTask(id, taskId, nextExecutionTime, params, owner, objectId, systemAccessRights, flags);
a44a910c 630 }
631 safe_free(taskId);
632 safe_free(schedule);
633 safe_free(params);
fd72d846 634 return rcc;
a44a910c 635}
636
4ede0acd 637/**
c6e191d2 638 * Thread that checks one time schedules and executes them
4ede0acd 639 */
0a145c10 640static THREAD_RESULT THREAD_CALL OneTimeEventThread(void *arg)
641{
642 int sleepTime = 1;
643 DbgPrintf(7, _T("OneTimeEventThread: started"));
644 while(true)
645 {
1d455218 646 ConditionWait(s_wakeupCondition, sleepTime);
61955a69 647 if(g_flags & AF_SHUTDOWN)
0a145c10 648 break;
649
0a145c10 650 time_t now = time(NULL);
651 struct tm currLocal;
652 memcpy(&currLocal, localtime(&now), sizeof(struct tm));
653
654 MutexLock(s_oneTimeScheduleLock);
655 for(int i = 0; i < s_oneTimeSchedules.size(); i++)
656 {
ec13a467
VK
657 ScheduledTask *sh = s_oneTimeSchedules.get(i);
658 if (sh->checkFlag(SCHEDULED_TASK_DISABLED))
ca6c1f0c 659 continue;
660
ec13a467 661 if (sh->checkFlag(SCHEDULED_TASK_EXECUTED))
b3beb7f4 662 break;
663
ec13a467 664 SchedulerCallback *callback = s_callbacks.get(sh->getTaskHandlerId());
0a145c10 665 if(callback == NULL)
666 {
ec13a467 667 DbgPrintf(3, _T("OneTimeEventThread: One time execution function with taskId=\'%s\' not found"), sh->getTaskHandlerId());
0a145c10 668 continue;
669 }
670
ec13a467 671 if (sh->isRunning())
0a145c10 672 continue;
673
674 //execute all timmers that is expected to execute now
675 if(sh->getExecutionTime() != 0 && now >= sh->getExecutionTime())
676 {
c6e191d2 677 DbgPrintf(7, _T("OneTimeEventThread: run schedule id=\'%d\', execution time =\'%d\'"), sh->getId(), sh->getExecutionTime());
ec13a467 678 ThreadPoolExecute(g_schedulerThreadPool, sh, &ScheduledTask::run, callback);
0a145c10 679 }
680 else
681 {
682 break;
683 }
684 }
685
686 sleepTime = INFINITE;
687
688 for(int i = 0; i < s_oneTimeSchedules.size(); i++)
689 {
ec13a467 690 ScheduledTask *sh = s_oneTimeSchedules.get(i);
0a145c10 691 if(sh->getExecutionTime() == NEVER)
692 break;
693
694 if(now >= sh->getExecutionTime())
695 continue;
696
40b2c2b3 697 sleepTime = (int)(sh->getExecutionTime() - now);
0a145c10 698 sleepTime = sleepTime < 0 ? 0 : sleepTime * 1000;
699 break;
700 }
701
702 DbgPrintf(7, _T("OneTimeEventThread: thread will sleep for %d"), sleepTime);
703 MutexUnlock(s_oneTimeScheduleLock);
704 }
705 DbgPrintf(3, _T("OneTimeEventThread: stopped"));
4fe83ebd 706 return THREAD_OK;
0a145c10 707}
708
4ede0acd 709/**
c6e191d2 710 * Checks if it is time to execute cron schedule
4ede0acd 711 */
ec13a467 712static bool IsTimeToRun(struct tm *currTime, const TCHAR *schedule, time_t currTimestamp)
0a145c10 713{
714 TCHAR value[256];
715
716 // Minute
717 const TCHAR *curr = ExtractWord(schedule, value);
718 if (!MatchScheduleElement(value, currTime->tm_min, 59, NULL))
719 return false;
720
721 // Hour
722 curr = ExtractWord(curr, value);
723 if (!MatchScheduleElement(value, currTime->tm_hour, 23, NULL))
724 return false;
725
726 // Day of month
727 curr = ExtractWord(curr, value);
728 if (!MatchScheduleElement(value, currTime->tm_mday, GetLastMonthDay(currTime), NULL))
729 return false;
730
731 // Month
732 curr = ExtractWord(curr, value);
733 if (!MatchScheduleElement(value, currTime->tm_mon + 1, 12, NULL))
734 return false;
735
736 // Day of week
7833a69f 737 ExtractWord(curr, value);
0a145c10 738 for(int i = 0; value[i] != 0; i++)
739 if (value[i] == _T('7'))
740 value[i] = _T('0');
741 if (!MatchScheduleElement(value, currTime->tm_wday, 7, currTime))
742 return false;
743
744 return true;
745}
ec13a467
VK
746
747/**
748 * Wakes up for execution of one time schedule or for recalculation new wake up timestamp
749 */
750static THREAD_RESULT THREAD_CALL CronCheckThread(void *arg)
751{
752 DbgPrintf(3, _T("CronCheckThread: started"));
753 do {
754 time_t now = time(NULL);
755 struct tm currLocal;
756 memcpy(&currLocal, localtime(&now), sizeof(struct tm));
757
758 MutexLock(s_cronScheduleLock);
759 for(int i = 0; i < s_cronSchedules.size(); i++)
760 {
761 ScheduledTask *sh = s_cronSchedules.get(i);
6fbcf3b4 762 if (sh->checkFlag(SCHEDULED_TASK_DISABLED))
ec13a467
VK
763 continue;
764 SchedulerCallback *callback = s_callbacks.get(sh->getTaskHandlerId());
6fbcf3b4 765 if (callback == NULL)
ec13a467
VK
766 {
767 DbgPrintf(3, _T("CronCheckThread: Cron execution function with taskId=\'%s\' not found"), sh->getTaskHandlerId());
768 continue;
769 }
770 if (IsTimeToRun(&currLocal, sh->getSchedule(), now))
771 {
772 DbgPrintf(7, _T("CronCheckThread: run schedule id=\'%d\', schedule=\'%s\'"), sh->getId(), sh->getSchedule());
773 ThreadPoolExecute(g_schedulerThreadPool, sh, &ScheduledTask::run, callback);
774 }
775 }
776 MutexUnlock(s_cronScheduleLock);
777 } while(!SleepAndCheckForShutdown(60)); //sleep 1 minute
778 DbgPrintf(3, _T("CronCheckThread: stopped"));
779 return THREAD_OK;
780}
781
1d455218
VK
782/**
783 * Scheduler thread handles
784 */
785static THREAD s_oneTimeEventThread = INVALID_THREAD_HANDLE;
786static THREAD s_cronSchedulerThread = INVALID_THREAD_HANDLE;
787
ec13a467
VK
788/**
789 * Initialize task scheduler - read all schedules form database and start threads for one time and crom schedules
790 */
791void InitializeTaskScheduler()
792{
793 g_schedulerThreadPool = ThreadPoolCreate(1, 64, _T("SCHEDULER"));
794 //read from DB configuration
795 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
796 DB_RESULT hResult = DBSelect(hdb, _T("SELECT id,taskId,schedule,params,execution_time,last_execution_time,flags,owner,object_id FROM scheduled_tasks"));
797 if (hResult != NULL)
798 {
799 int count = DBGetNumRows(hResult);
800 for(int i = 0; i < count; i++)
801 {
802 ScheduledTask *sh = new ScheduledTask(hResult, i);
803 if(!_tcscmp(sh->getSchedule(), _T("")))
804 {
805 DbgPrintf(7, _T("InitializeTaskScheduler: Add one time schedule %d, %d"), sh->getId(), sh->getExecutionTime());
806 s_oneTimeSchedules.add(sh);
807 }
808 else
809 {
810 DbgPrintf(7, _T("InitializeTaskScheduler: Add cron schedule %d, %s"), sh->getId(), sh->getSchedule());
811 s_cronSchedules.add(sh);
812 }
813 }
814 DBFreeResult(hResult);
815 }
816 DBConnectionPoolReleaseConnection(hdb);
817 s_oneTimeSchedules.sort(ScheduledTaskComparator);
1d455218
VK
818
819 s_oneTimeEventThread = ThreadCreateEx(OneTimeEventThread, 0, NULL);
820 s_cronSchedulerThread = ThreadCreateEx(CronCheckThread, 0, NULL);
ec13a467
VK
821}
822
823/**
824 * Stop all scheduler threads and free all memory
825 */
826void CloseTaskScheduler()
827{
1d455218
VK
828 ConditionSet(s_wakeupCondition);
829 ThreadJoin(s_oneTimeEventThread);
830 ThreadJoin(s_cronSchedulerThread);
831 ConditionDestroy(s_wakeupCondition);
ec13a467
VK
832 ThreadPoolDestroy(g_schedulerThreadPool);
833 MutexDestroy(s_cronScheduleLock);
834 MutexDestroy(s_oneTimeScheduleLock);
835}