acdac0a571a0121cabd1ad12fbe5ea8857838828
[public/netxms.git] / src / server / core / schedule.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2016 Raden Solutions
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: schedule.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 /**
26 * Static fields
27 */
28 static StringObjectMap<SchedulerCallback> s_callbacks(true);
29 static ObjectArray<ScheduledTask> s_cronSchedules(5, 5, true);
30 static ObjectArray<ScheduledTask> s_oneTimeSchedules(5, 5, true);
31 static CONDITION s_cond = ConditionCreate(false);
32 static MUTEX s_cronScheduleLock = MutexCreate();
33 static MUTEX s_oneTimeScheduleLock = MutexCreate();
34
35 /**
36 * Scheduled task execution pool
37 */
38 ThreadPool *g_schedulerThreadPool = NULL;
39
40 /**
41 * Create recurrent task object
42 */
43 ScheduledTask::ScheduledTask(int id, const TCHAR *taskId, const TCHAR *schedule, const TCHAR *params, UINT32 owner, UINT32 objectId, UINT32 flags)
44 {
45 m_id = id;
46 m_taskHandlerId = _tcsdup(CHECK_NULL_EX(taskId));
47 m_schedule = _tcsdup(CHECK_NULL_EX(schedule));
48 m_params = _tcsdup(CHECK_NULL_EX(params));
49 m_executionTime = NEVER;
50 m_lastExecution = NEVER;
51 m_flags = flags;
52 m_owner = owner;
53 m_objectId = objectId;
54 }
55
56 /**
57 * Create one-time execution task object
58 */
59 ScheduledTask::ScheduledTask(int id, const TCHAR *taskId, time_t executionTime, const TCHAR *params, UINT32 owner, UINT32 objectId, UINT32 flags)
60 {
61 m_id = id;
62 m_taskHandlerId = _tcsdup(CHECK_NULL_EX(taskId));
63 m_schedule = _tcsdup(_T(""));
64 m_params = _tcsdup(CHECK_NULL_EX(params));
65 m_executionTime = executionTime;
66 m_lastExecution = NEVER;
67 m_flags = flags;
68 m_owner = owner;
69 m_objectId = objectId;
70 }
71
72 /**
73 * Create task object from database record
74 */
75 ScheduledTask::ScheduledTask(DB_RESULT hResult, int row)
76 {
77 m_id = DBGetFieldULong(hResult, row, 0);
78 m_taskHandlerId = DBGetField(hResult, row, 1, NULL, 0);
79 m_schedule = DBGetField(hResult, row, 2, NULL, 0);
80 m_params = DBGetField(hResult, row, 3, NULL, 0);
81 m_executionTime = DBGetFieldULong(hResult, row, 4);
82 m_lastExecution = DBGetFieldULong(hResult, row, 5);
83 m_flags = DBGetFieldULong(hResult, row, 6);
84 m_owner = DBGetFieldULong(hResult, row, 7);
85 m_objectId = DBGetFieldULong(hResult, row, 8);
86 }
87
88 /**
89 * Destructor
90 */
91 ScheduledTask::~ScheduledTask()
92 {
93 safe_free(m_taskHandlerId);
94 safe_free(m_schedule);
95 safe_free(m_params);
96 }
97
98 /**
99 * Update task
100 */
101 void ScheduledTask::update(const TCHAR *taskHandlerId, const TCHAR *schedule, const TCHAR *params, UINT32 owner, UINT32 objectId, UINT32 flags)
102 {
103 safe_free(m_taskHandlerId);
104 m_taskHandlerId = _tcsdup(CHECK_NULL_EX(taskHandlerId));
105 safe_free(m_schedule);
106 m_schedule = _tcsdup(CHECK_NULL_EX(schedule));
107 safe_free(m_params);
108 m_params = _tcsdup(CHECK_NULL_EX(params));
109 m_owner = owner;
110 m_objectId = objectId;
111 m_flags = flags;
112 }
113
114 /**
115 * Update task
116 */
117 void ScheduledTask::update(const TCHAR *taskHandlerId, time_t nextExecution, const TCHAR *params, UINT32 owner, UINT32 objectId, UINT32 flags)
118 {
119 free(m_taskHandlerId);
120 m_taskHandlerId = _tcsdup(CHECK_NULL_EX(taskHandlerId));
121 free(m_schedule);
122 m_schedule = _tcsdup(_T(""));
123 free(m_params);
124 m_params = _tcsdup(CHECK_NULL_EX(params));
125 m_executionTime = nextExecution;
126 m_owner = owner;
127 m_objectId = objectId;
128 m_flags = flags;
129 }
130
131 /**
132 * Save task to database
133 */
134 void ScheduledTask::saveToDatabase(bool newObject)
135 {
136 DB_HANDLE db = DBConnectionPoolAcquireConnection();
137 DB_STATEMENT hStmt;
138
139 if (newObject)
140 {
141 hStmt = DBPrepare(db,
142 _T("INSERT INTO scheduled_tasks (taskId,schedule,params,execution_time,")
143 _T("last_execution_time,flags,owner,object_id,id) VALUES (?,?,?,?,?,?,?,?,?)"));
144 }
145 else
146 {
147 hStmt = DBPrepare(db,
148 _T("UPDATE scheduled_tasks SET taskId=?,schedule=?,params=?,")
149 _T("execution_time=?,last_execution_time=?,flags=?,owner=?,object_id=? ")
150 _T("WHERE id=?"));
151 }
152
153 DBBind(hStmt, 1, DB_SQLTYPE_VARCHAR, m_taskHandlerId, DB_BIND_STATIC);
154 DBBind(hStmt, 2, DB_SQLTYPE_VARCHAR, m_schedule, DB_BIND_STATIC);
155 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_params, DB_BIND_STATIC);
156 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (UINT32)m_executionTime);
157 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (UINT32)m_lastExecution);
158 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_flags);
159 DBBind(hStmt, 7, DB_SQLTYPE_INTEGER, m_owner);
160 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, m_objectId);
161 DBBind(hStmt, 9, DB_SQLTYPE_INTEGER, (LONG)m_id);
162
163 if (hStmt == NULL)
164 return;
165
166 DBExecute(hStmt);
167 DBFreeStatement(hStmt);
168 DBConnectionPoolReleaseConnection(db);
169 NotifyClientSessions(NX_NOTIFY_SCHEDULE_UPDATE,0);
170 }
171
172 /**
173 * Scheduled task comparator (used for task sorting)
174 */
175 static int ScheduledTaskComparator(const void *e1, const void *e2)
176 {
177 ScheduledTask * s1 = *((ScheduledTask**)e1);
178 ScheduledTask * s2 = *((ScheduledTask**)e2);
179
180 //Executed schedules schould go down
181 if(s1->checkFlag(SCHEDULED_TASK_EXECUTED) != s2->checkFlag(SCHEDULED_TASK_EXECUTED))
182 {
183 if(s1->checkFlag(SCHEDULED_TASK_EXECUTED))
184 {
185 return 1;
186 }
187 else
188 {
189 return -1;
190 }
191 }
192
193 //Schedules with execution time 0 should go down, others should be compared
194 if (s1->getExecutionTime() == s2->getExecutionTime())
195 {
196 return 0;
197 }
198
199 if (((s1->getExecutionTime() < s2->getExecutionTime()) && (s1->getExecutionTime() != 0)) || (s2->getExecutionTime() == 0))
200 {
201 return -1;
202 }
203 else
204 {
205 return 1;
206 }
207 }
208
209 /**
210 * Run scheduled task
211 */
212 void ScheduledTask::run(SchedulerCallback *callback)
213 {
214 bool oneTimeSchedule = !_tcscmp(m_schedule, _T(""));
215
216 setFlag(SCHEDULED_TASK_RUNNING);
217 NotifyClientSessions(NX_NOTIFY_SCHEDULE_UPDATE,0);
218 ScheduledTaskParameters param(m_params, m_owner, m_objectId);
219 callback->m_func(&param);
220 setLastExecutionTime(time(NULL));
221
222 if (oneTimeSchedule)
223 {
224 MutexLock(s_oneTimeScheduleLock);
225 }
226
227 removeFlag(SCHEDULED_TASK_RUNNING);
228 setFlag(SCHEDULED_TASK_EXECUTED);
229 saveToDatabase(false);
230 if (oneTimeSchedule)
231 {
232 s_oneTimeSchedules.sort(ScheduledTaskComparator);
233 MutexUnlock(s_oneTimeScheduleLock);
234 }
235
236 if (oneTimeSchedule && checkFlag(SCHEDULED_TASK_SYSTEM))
237 RemoveScheduledTask(m_id, 0, SYSTEM_ACCESS_FULL);
238 }
239
240 /**
241 * Fill NXCP message with task data
242 */
243 void ScheduledTask::fillMessage(NXCPMessage *msg)
244 {
245 msg->setField(VID_SCHEDULED_TASK_ID, m_id);
246 msg->setField(VID_TASK_HANDLER, m_taskHandlerId);
247 msg->setField(VID_SCHEDULE, m_schedule);
248 msg->setField(VID_PARAMETER, m_params);
249 msg->setFieldFromTime(VID_EXECUTION_TIME, m_executionTime);
250 msg->setFieldFromTime(VID_LAST_EXECUTION_TIME, m_lastExecution);
251 msg->setField(VID_FLAGS, (UINT32)m_flags);
252 msg->setField(VID_OWNER, m_owner);
253 msg->setField(VID_OBJECT_ID, m_objectId);
254 }
255
256 /**
257 * Fill NXCP message with task data
258 */
259 void ScheduledTask::fillMessage(NXCPMessage *msg, UINT32 base)
260 {
261 msg->setField(base, m_id);
262 msg->setField(base + 1, m_taskHandlerId);
263 msg->setField(base + 2, m_schedule);
264 msg->setField(base + 3, m_params);
265 msg->setFieldFromTime(base + 4, m_executionTime);
266 msg->setFieldFromTime(base + 5, m_lastExecution);
267 msg->setField(base + 6, m_flags);
268 msg->setField(base + 7, m_owner);
269 msg->setField(base + 8, m_objectId);
270 }
271
272 /**
273 * Check if user can access this scheduled task
274 */
275 bool ScheduledTask::canAccess(UINT32 userId, UINT64 systemAccess)
276 {
277 if (systemAccess & SYSTEM_ACCESS_ALL_SCHEDULED_TASKS)
278 return true;
279
280 if(systemAccess & SYSTEM_ACCESS_USER_SCHEDULED_TASKS)
281 return !checkFlag(SCHEDULED_TASK_SYSTEM);
282
283 if (systemAccess & SYSTEM_ACCESS_OWN_SCHEDULED_TASKS)
284 return userId == m_owner;
285
286 return false;
287 }
288
289 /**
290 * Function that adds to list task handler function
291 */
292 void RegisterSchedulerTaskHandler(const TCHAR *id, scheduled_action_executor exec, UINT64 accessRight)
293 {
294 s_callbacks.set(id, new SchedulerCallback(exec, accessRight));
295 DbgPrintf(6, _T("Registered scheduler task %s"), id);
296 }
297
298 /**
299 * Scheduled task creation function
300 */
301 UINT32 AddScheduledTask(const TCHAR *task, const TCHAR *schedule, const TCHAR *params, UINT32 owner, UINT32 objectId, UINT64 systemRights, UINT32 flags)
302 {
303 if ((systemRights & (SYSTEM_ACCESS_ALL_SCHEDULED_TASKS | SYSTEM_ACCESS_USER_SCHEDULED_TASKS | SYSTEM_ACCESS_OWN_SCHEDULED_TASKS)) == 0)
304 return RCC_ACCESS_DENIED;
305 DbgPrintf(7, _T("AddSchedule: Add cron schedule %s, %s, %s"), task, schedule, params);
306 MutexLock(s_cronScheduleLock);
307 ScheduledTask *sh = new ScheduledTask(CreateUniqueId(IDG_SCHEDULED_TASK), task, schedule, params, owner, objectId, flags);
308 sh->saveToDatabase(true);
309 s_cronSchedules.add(sh);
310 MutexUnlock(s_cronScheduleLock);
311 return RCC_SUCCESS;
312 }
313
314 /**
315 * One time schedule creation function
316 */
317 UINT32 AddOneTimeScheduledTask(const TCHAR *task, time_t nextExecutionTime, const TCHAR *params, UINT32 owner, UINT32 objectId, UINT64 systemRights, UINT32 flags)
318 {
319 if ((systemRights & (SYSTEM_ACCESS_ALL_SCHEDULED_TASKS | SYSTEM_ACCESS_USER_SCHEDULED_TASKS | SYSTEM_ACCESS_OWN_SCHEDULED_TASKS)) == 0)
320 return RCC_ACCESS_DENIED;
321 DbgPrintf(7, _T("AddOneTimeAction: Add one time schedule %s, %d, %s"), task, nextExecutionTime, params);
322 MutexLock(s_oneTimeScheduleLock);
323 ScheduledTask *sh = new ScheduledTask(CreateUniqueId(IDG_SCHEDULED_TASK), task, nextExecutionTime, params, owner, objectId, flags);
324 sh->saveToDatabase(true);
325 s_oneTimeSchedules.add(sh);
326 s_oneTimeSchedules.sort(ScheduledTaskComparator);
327 MutexUnlock(s_oneTimeScheduleLock);
328 ConditionSet(s_cond);
329 return RCC_SUCCESS;
330 }
331
332 /**
333 * Scheduled actionUpdate
334 */
335 UINT32 UpdateScheduledTask(int id, const TCHAR *task, const TCHAR *schedule, const TCHAR *params, UINT32 owner, UINT32 objectId, UINT64 systemAccessRights, UINT32 flags)
336 {
337 DbgPrintf(7, _T("UpdateSchedule: update cron schedule %d, %s, %s, %s"), id, task, schedule, params);
338 MutexLock(s_cronScheduleLock);
339 UINT32 rcc = RCC_SUCCESS;
340 bool found = false;
341 for (int i = 0; i < s_cronSchedules.size(); i++)
342 {
343 if (s_cronSchedules.get(i)->getId() == id)
344 {
345 if (!s_cronSchedules.get(i)->canAccess(owner, systemAccessRights))
346 {
347 rcc = RCC_ACCESS_DENIED;
348 break;
349 }
350 s_cronSchedules.get(i)->update(task, schedule, params, owner, objectId, flags);
351 s_cronSchedules.get(i)->saveToDatabase(false);
352 found = true;
353 break;
354 }
355 }
356 MutexUnlock(s_cronScheduleLock);
357
358 if (!found)
359 {
360 //check in different que and if exists - remove from one and add to another
361 MutexLock(s_oneTimeScheduleLock);
362 for (int i = 0; i < s_oneTimeSchedules.size(); i++)
363 {
364 if (s_oneTimeSchedules.get(i)->getId() == id)
365 {
366 if(!s_oneTimeSchedules.get(i)->canAccess(owner, systemAccessRights))
367 {
368 rcc = RCC_ACCESS_DENIED;
369 break;
370 }
371 ScheduledTask *st = s_oneTimeSchedules.get(i);
372 s_oneTimeSchedules.unlink(i);
373 s_oneTimeSchedules.sort(ScheduledTaskComparator);
374 st->update(task, schedule, params, owner, objectId, flags);
375 st->saveToDatabase(false);
376
377 MutexLock(s_cronScheduleLock);
378 s_cronSchedules.add(st);
379 MutexUnlock(s_cronScheduleLock);
380
381 break;
382 }
383 }
384 MutexUnlock(s_oneTimeScheduleLock);
385 }
386
387 return rcc;
388 }
389
390 /**
391 * One time action update
392 */
393 UINT32 UpdateOneTimeScheduledTask(int id, const TCHAR *task, time_t nextExecutionTime, const TCHAR *params, UINT32 owner, UINT32 objectId, UINT64 systemAccessRights, UINT32 flags)
394 {
395 DbgPrintf(7, _T("UpdateOneTimeAction: update one time schedule %d, %s, %d, %s"), id, task, nextExecutionTime, params);
396 bool found = false;
397 MutexLock(s_oneTimeScheduleLock);
398 UINT32 rcc = RCC_SUCCESS;
399 for (int i = 0; i < s_oneTimeSchedules.size(); i++)
400 {
401 if (s_oneTimeSchedules.get(i)->getId() == id)
402 {
403 if(!s_oneTimeSchedules.get(i)->canAccess(owner, systemAccessRights))
404 {
405 rcc = RCC_ACCESS_DENIED;
406 break;
407 }
408 s_oneTimeSchedules.get(i)->update(task, nextExecutionTime, params, owner, objectId, flags);
409 s_oneTimeSchedules.get(i)->saveToDatabase(false);
410 s_oneTimeSchedules.sort(ScheduledTaskComparator);
411 found = true;
412 break;
413 }
414 }
415 MutexUnlock(s_oneTimeScheduleLock);
416
417 if (!found)
418 {
419 //check in different queue and if exists - remove from one and add to another
420 MutexLock(s_cronScheduleLock);
421 for (int i = 0; i < s_cronSchedules.size(); i++)
422 {
423 if (s_cronSchedules.get(i)->getId() == id)
424 {
425 if(!s_cronSchedules.get(i)->canAccess(owner, systemAccessRights))
426 {
427 rcc = RCC_ACCESS_DENIED;
428 break;
429 }
430 ScheduledTask *st = s_cronSchedules.get(i);
431 s_cronSchedules.unlink(i);
432 st->update(task, nextExecutionTime, params, owner, objectId, flags);
433 st->saveToDatabase(false);
434
435 MutexLock(s_oneTimeScheduleLock);
436 s_oneTimeSchedules.add(st);
437 s_oneTimeSchedules.sort(ScheduledTaskComparator);
438 MutexUnlock(s_oneTimeScheduleLock);
439
440 found = true;
441 break;
442 }
443 }
444 MutexUnlock(s_cronScheduleLock);
445 }
446
447 if(found)
448 ConditionSet(s_cond);
449 return rcc;
450 }
451
452 /**
453 * Removes scheduled task from database by id
454 */
455 static void DeleteScheduledTaskFromDB(UINT32 id)
456 {
457 DB_HANDLE db = DBConnectionPoolAcquireConnection();
458 TCHAR query[256];
459 _sntprintf(query, 256, _T("DELETE FROM scheduled_tasks WHERE id = %d"), id);
460 DBQuery(db, query);
461 DBConnectionPoolReleaseConnection(db);
462 NotifyClientSessions(NX_NOTIFY_SCHEDULE_UPDATE,0);
463 }
464
465 /**
466 * Removes scheduled task by id
467 */
468 UINT32 RemoveScheduledTask(UINT32 id, UINT32 user, UINT64 systemRights)
469 {
470 DbgPrintf(7, _T("RemoveSchedule: schedule(%d) removed"), id);
471 bool found = false;
472 UINT32 rcc = RCC_SUCCESS;
473
474 MutexLock(s_cronScheduleLock);
475 for (int i = 0; i < s_cronSchedules.size(); i++)
476 {
477 if (s_cronSchedules.get(i)->getId() == id)
478 {
479 if(!s_cronSchedules.get(i)->canAccess(user, systemRights))
480 {
481 rcc = RCC_ACCESS_DENIED;
482 break;
483 }
484 s_cronSchedules.remove(i);
485 found = true;
486 break;
487 }
488 }
489 MutexUnlock(s_cronScheduleLock);
490
491 if(found)
492 {
493 DeleteScheduledTaskFromDB(id);
494 return rcc;
495 }
496
497 MutexLock(s_oneTimeScheduleLock);
498 for (int i = 0; i < s_oneTimeSchedules.size(); i++)
499 {
500 if (s_oneTimeSchedules.get(i)->getId() == id)
501 {
502 if(!s_cronSchedules.get(i)->canAccess(user, systemRights))
503 {
504 rcc = RCC_ACCESS_DENIED;
505 break;
506 }
507 s_oneTimeSchedules.remove(i);
508 s_oneTimeSchedules.sort(ScheduledTaskComparator);
509 found = true;
510 break;
511 }
512 }
513 MutexUnlock(s_oneTimeScheduleLock);
514
515 if (found)
516 {
517 ConditionSet(s_cond);
518 DeleteScheduledTaskFromDB(id);
519 }
520 return rcc;
521 }
522
523 /**
524 * Fills message with scheduled tasks list
525 */
526 void GetSheduledTasks(NXCPMessage *msg, UINT32 userId, UINT64 systemRights)
527 {
528 int scheduleCount = 0;
529 int base = VID_SCHEDULE_LIST_BASE;
530
531 MutexLock(s_oneTimeScheduleLock);
532 for(int i = 0; i < s_oneTimeSchedules.size(); i++)
533 {
534 if (s_oneTimeSchedules.get(i)->canAccess(userId, systemRights))
535 {
536 s_oneTimeSchedules.get(i)->fillMessage(msg, base);
537 scheduleCount++;
538 base += 10;
539 }
540 }
541 MutexUnlock(s_oneTimeScheduleLock);
542
543 MutexLock(s_cronScheduleLock);
544 for(int i = 0; i < s_cronSchedules.size(); i++)
545 {
546 if (s_cronSchedules.get(i)->canAccess(userId, systemRights))
547 {
548 s_cronSchedules.get(i)->fillMessage(msg, base);
549 scheduleCount++;
550 base += 10;
551 }
552 }
553 MutexUnlock(s_cronScheduleLock);
554
555 msg->setField(VID_SCHEDULE_COUNT, scheduleCount);
556 }
557
558 /**
559 * Fills message with task handlers list
560 */
561 void GetSchedulerTaskHandlers(NXCPMessage *msg, UINT64 accessRights)
562 {
563 UINT32 base = VID_CALLBACK_BASE;
564 int count = 0;
565
566 StringList *keyList = s_callbacks.keys();
567 for(int i = 0; i < keyList->size(); i++)
568 {
569 if(accessRights & s_callbacks.get(keyList->get(i))->m_accessRight)
570 {
571 msg->setField(base, keyList->get(i));
572 count++;
573 base++;
574 }
575 }
576 delete keyList;
577 msg->setField(VID_CALLBACK_COUNT, (UINT32)count);
578 }
579
580 /**
581 * Creates scheduled task from message
582 */
583 UINT32 CreateScehduledTaskFromMsg(NXCPMessage *request, UINT32 owner, UINT64 systemAccessRights)
584 {
585 TCHAR *taskId = request->getFieldAsString(VID_TASK_HANDLER);
586 TCHAR *schedule = NULL;
587 time_t nextExecutionTime = 0;
588 TCHAR *params = request->getFieldAsString(VID_PARAMETER);
589 int flags = request->getFieldAsInt32(VID_FLAGS);
590 int objectId = request->getFieldAsInt32(VID_OBJECT_ID);
591 UINT32 result;
592 if (request->isFieldExist(VID_SCHEDULE))
593 {
594 schedule = request->getFieldAsString(VID_SCHEDULE);
595 result = AddScheduledTask(taskId, schedule, params, owner, objectId, systemAccessRights, flags);
596 }
597 else
598 {
599 nextExecutionTime = request->getFieldAsTime(VID_EXECUTION_TIME);
600 result = AddOneTimeScheduledTask(taskId, nextExecutionTime, params, owner, objectId, systemAccessRights, flags);
601 }
602 free(taskId);
603 free(schedule);
604 free(params);
605 return result;
606 }
607
608 /**
609 * Update scheduled task from message
610 */
611 UINT32 UpdateScheduledTaskFromMsg(NXCPMessage *request, UINT32 owner, UINT64 systemAccessRights)
612 {
613 UINT32 id = request->getFieldAsInt32(VID_SCHEDULED_TASK_ID);
614 TCHAR *taskId = request->getFieldAsString(VID_TASK_HANDLER);
615 TCHAR *schedule = NULL;
616 time_t nextExecutionTime = 0;
617 TCHAR *params = request->getFieldAsString(VID_PARAMETER);
618 UINT32 flags = request->getFieldAsInt32(VID_FLAGS);
619 UINT32 objectId = request->getFieldAsInt32(VID_OBJECT_ID);
620 UINT32 rcc;
621 if(request->isFieldExist(VID_SCHEDULE))
622 {
623 schedule = request->getFieldAsString(VID_SCHEDULE);
624 rcc = UpdateScheduledTask(id, taskId, schedule, params, owner, objectId, systemAccessRights, flags);
625 }
626 else
627 {
628 nextExecutionTime = request->getFieldAsTime(VID_EXECUTION_TIME);
629 rcc = UpdateOneTimeScheduledTask(id, taskId, nextExecutionTime, params, owner, objectId, systemAccessRights, flags);
630 }
631 safe_free(taskId);
632 safe_free(schedule);
633 safe_free(params);
634 return rcc;
635 }
636
637 /**
638 * Thread that checks one time schedules and executes them
639 */
640 static THREAD_RESULT THREAD_CALL OneTimeEventThread(void *arg)
641 {
642 int sleepTime = 1;
643 DbgPrintf(7, _T("OneTimeEventThread: started"));
644 while(true)
645 {
646 ConditionWait(s_cond, sleepTime);
647 if(g_flags & AF_SHUTDOWN)
648 break;
649
650 time_t now = time(NULL);
651 struct tm currLocal;
652 memcpy(&currLocal, localtime(&now), sizeof(struct tm));
653
654 MutexLock(s_oneTimeScheduleLock);
655 for(int i = 0; i < s_oneTimeSchedules.size(); i++)
656 {
657 ScheduledTask *sh = s_oneTimeSchedules.get(i);
658 if (sh->checkFlag(SCHEDULED_TASK_DISABLED))
659 continue;
660
661 if (sh->checkFlag(SCHEDULED_TASK_EXECUTED))
662 break;
663
664 SchedulerCallback *callback = s_callbacks.get(sh->getTaskHandlerId());
665 if(callback == NULL)
666 {
667 DbgPrintf(3, _T("OneTimeEventThread: One time execution function with taskId=\'%s\' not found"), sh->getTaskHandlerId());
668 continue;
669 }
670
671 if (sh->isRunning())
672 continue;
673
674 //execute all timmers that is expected to execute now
675 if(sh->getExecutionTime() != 0 && now >= sh->getExecutionTime())
676 {
677 DbgPrintf(7, _T("OneTimeEventThread: run schedule id=\'%d\', execution time =\'%d\'"), sh->getId(), sh->getExecutionTime());
678 ThreadPoolExecute(g_schedulerThreadPool, sh, &ScheduledTask::run, callback);
679 }
680 else
681 {
682 break;
683 }
684 }
685
686 sleepTime = INFINITE;
687
688 for(int i = 0; i < s_oneTimeSchedules.size(); i++)
689 {
690 ScheduledTask *sh = s_oneTimeSchedules.get(i);
691 if(sh->getExecutionTime() == NEVER)
692 break;
693
694 if(now >= sh->getExecutionTime())
695 continue;
696
697 sleepTime = (int)(sh->getExecutionTime() - now);
698 sleepTime = sleepTime < 0 ? 0 : sleepTime * 1000;
699 break;
700 }
701
702 DbgPrintf(7, _T("OneTimeEventThread: thread will sleep for %d"), sleepTime);
703 MutexUnlock(s_oneTimeScheduleLock);
704 }
705 DbgPrintf(3, _T("OneTimeEventThread: stopped"));
706 return THREAD_OK;
707 }
708
709 /**
710 * Checks if it is time to execute cron schedule
711 */
712 static bool IsTimeToRun(struct tm *currTime, const TCHAR *schedule, time_t currTimestamp)
713 {
714 TCHAR value[256];
715
716 // Minute
717 const TCHAR *curr = ExtractWord(schedule, value);
718 if (!MatchScheduleElement(value, currTime->tm_min, 59, NULL))
719 return false;
720
721 // Hour
722 curr = ExtractWord(curr, value);
723 if (!MatchScheduleElement(value, currTime->tm_hour, 23, NULL))
724 return false;
725
726 // Day of month
727 curr = ExtractWord(curr, value);
728 if (!MatchScheduleElement(value, currTime->tm_mday, GetLastMonthDay(currTime), NULL))
729 return false;
730
731 // Month
732 curr = ExtractWord(curr, value);
733 if (!MatchScheduleElement(value, currTime->tm_mon + 1, 12, NULL))
734 return false;
735
736 // Day of week
737 ExtractWord(curr, value);
738 for(int i = 0; value[i] != 0; i++)
739 if (value[i] == _T('7'))
740 value[i] = _T('0');
741 if (!MatchScheduleElement(value, currTime->tm_wday, 7, currTime))
742 return false;
743
744 return true;
745 }
746
747 /**
748 * Wakes up for execution of one time schedule or for recalculation new wake up timestamp
749 */
750 static THREAD_RESULT THREAD_CALL CronCheckThread(void *arg)
751 {
752 DbgPrintf(3, _T("CronCheckThread: started"));
753 do {
754 time_t now = time(NULL);
755 struct tm currLocal;
756 memcpy(&currLocal, localtime(&now), sizeof(struct tm));
757
758 MutexLock(s_cronScheduleLock);
759 for(int i = 0; i < s_cronSchedules.size(); i++)
760 {
761 ScheduledTask *sh = s_cronSchedules.get(i);
762 if (sh->checkFlag(SCHEDULED_TASK_DISABLED))
763 continue;
764 SchedulerCallback *callback = s_callbacks.get(sh->getTaskHandlerId());
765 if (callback == NULL)
766 {
767 DbgPrintf(3, _T("CronCheckThread: Cron execution function with taskId=\'%s\' not found"), sh->getTaskHandlerId());
768 continue;
769 }
770 if (IsTimeToRun(&currLocal, sh->getSchedule(), now))
771 {
772 DbgPrintf(7, _T("CronCheckThread: run schedule id=\'%d\', schedule=\'%s\'"), sh->getId(), sh->getSchedule());
773 ThreadPoolExecute(g_schedulerThreadPool, sh, &ScheduledTask::run, callback);
774 }
775 }
776 MutexUnlock(s_cronScheduleLock);
777 } while(!SleepAndCheckForShutdown(60)); //sleep 1 minute
778 DbgPrintf(3, _T("CronCheckThread: stopped"));
779 return THREAD_OK;
780 }
781
782 /**
783 * Initialize task scheduler - read all schedules form database and start threads for one time and crom schedules
784 */
785 void InitializeTaskScheduler()
786 {
787 g_schedulerThreadPool = ThreadPoolCreate(1, 64, _T("SCHEDULER"));
788 //read from DB configuration
789 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
790 DB_RESULT hResult = DBSelect(hdb, _T("SELECT id,taskId,schedule,params,execution_time,last_execution_time,flags,owner,object_id FROM scheduled_tasks"));
791 if (hResult != NULL)
792 {
793 int count = DBGetNumRows(hResult);
794 for(int i = 0; i < count; i++)
795 {
796 ScheduledTask *sh = new ScheduledTask(hResult, i);
797 if(!_tcscmp(sh->getSchedule(), _T("")))
798 {
799 DbgPrintf(7, _T("InitializeTaskScheduler: Add one time schedule %d, %d"), sh->getId(), sh->getExecutionTime());
800 s_oneTimeSchedules.add(sh);
801 }
802 else
803 {
804 DbgPrintf(7, _T("InitializeTaskScheduler: Add cron schedule %d, %s"), sh->getId(), sh->getSchedule());
805 s_cronSchedules.add(sh);
806 }
807 }
808 DBFreeResult(hResult);
809 }
810 DBConnectionPoolReleaseConnection(hdb);
811 s_oneTimeSchedules.sort(ScheduledTaskComparator);
812 //start threads that will start cron and one time tasks threads
813 ThreadCreate(OneTimeEventThread, 0, NULL);
814 ThreadCreate(CronCheckThread, 0, NULL);
815 }
816
817 /**
818 * Stop all scheduler threads and free all memory
819 */
820 void CloseTaskScheduler()
821 {
822 ConditionSet(s_cond);
823 ConditionDestroy(s_cond);
824 s_cond = INVALID_CONDITION_HANDLE;
825 ThreadPoolDestroy(g_schedulerThreadPool);
826 MutexDestroy(s_cronScheduleLock);
827 MutexDestroy(s_oneTimeScheduleLock);
828 }