Added function that fills message with schedule data
[public/netxms.git] / src / server / core / schedule.cpp
CommitLineData
0a145c10 1/*
2** NetXMS - Network Management System
3** Copyright (C) 2003-2015 Raden Solutions
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: schedule.cpp
20**
21**/
22
23#include "nxcore.h"
24
25#define NEVER 0
26
27#define SCHEDULE_DISABLED 1
28#define SCHEDULE_EXECUTED 2
29#define SCHEDULE_IN_PROGRES 4
30#define SCHEDULE_INTERNAL 5
31//#define SCHEDULE_
32
33class Schedule;
34class ScheduleCallback;
35
36/**
37 * Scheduled task execution pool
38 */
39static ThreadPool *s_taskSchedullPool = NULL;
40/**
41 * Static fields
42 */
43static StringObjectMap<ScheduleCallback> s_callbacks(true);
44static ObjectArray<Schedule> s_cronSchedules(5, 5, true);
45static ObjectArray<Schedule> s_oneTimeSchedules(5, 5, true);
46static CONDITION s_cond = ConditionCreate(false);
47
48/**
49 * Static functions
50 */
51void InitializeTaskScheduler();
52void CloseTaskScheduler();
53static THREAD_RESULT THREAD_CALL OneTimeEventThread(void *arg);
54static THREAD_RESULT THREAD_CALL CronCheckThread(void *arg);
55static bool IsItTime(struct tm *currTime, const TCHAR *schedule, time_t currTimestamp);
56static int ScheduleListSortCallback(const void *e1, const void *e2);
57
58/**
59 * Mutex for shedule structures
60 */
61static MUTEX s_cronScheduleLock = MutexCreate();
62static MUTEX s_oneTimeScheduleLock = MutexCreate();
63
64class ScheduleCallback
65{
66public:
67 scheduled_action_executor m_func;
68 ScheduleCallback(scheduled_action_executor func) { m_func = func; }
69};
70
71class Schedule
72{
73private:
74 UINT32 m_id;
75 TCHAR *m_taskId;
76 TCHAR *m_schedule;
77 TCHAR *m_params;
78 time_t m_executionTime;
79 time_t m_lastExecution;
80 int m_flags;
81
82public:
83
84 Schedule(int id, const TCHAR *taskId, const TCHAR *schedule, const TCHAR *params);
85 Schedule(int id, const TCHAR *taskId, time_t executionTime, const TCHAR *params);
86 Schedule(DB_RESULT hResult, int row);
87
88 ~Schedule(){ delete m_taskId; delete m_schedule; delete m_params; }
89
90 UINT32 getId() { return m_id; }
91 const TCHAR *getTaskId() { return m_taskId; }
92 const TCHAR *getSchedule() { return m_schedule; }
93 const TCHAR *getParams() { return m_params; }
94 time_t getExecutionTime() { return m_executionTime; }
95
96 void setLastExecutionTime(time_t time) { m_lastExecution = time; };
97 void setExecutionTime(time_t time) { m_executionTime = time; }
98 void setFlag(int flag) { m_flags |= flag; }
99 void removeFlag(int flag) { m_flags &= ~flag; }
100
101 void update(const TCHAR *taskId, const TCHAR *schedule, const TCHAR *params);
102 void update(const TCHAR *taskId, time_t nextExecution, const TCHAR *params);
103
104 void saveToDatabase(bool newObject);
105 void run(ScheduleCallback *callback);
4ede0acd 106 void fillMessage(NXCPMessage *msg);
0a145c10 107
108 bool isInProgress() { return (m_flags & SCHEDULE_IN_PROGRES) > 0 ? true : false; }
109};
110
111Schedule::Schedule(int id, const TCHAR *taskId, const TCHAR *schedule, const TCHAR *params)
112{
113 m_id = id;
114 m_taskId = _tcsdup(CHECK_NULL_EX(taskId));
115 m_schedule = _tcsdup(CHECK_NULL_EX(schedule));
116 m_params = _tcsdup(CHECK_NULL_EX(params));
117 m_executionTime = NEVER;
118 m_lastExecution = NEVER;
119 m_flags = 0;
120}
121
122Schedule::Schedule(int id, const TCHAR *taskId, time_t executionTime, const TCHAR *params)
123{
124 m_id = id;
125 m_taskId = _tcsdup(CHECK_NULL_EX(taskId));
126 m_schedule = _tcsdup(_T(""));
127 m_params = _tcsdup(CHECK_NULL_EX(params));
128 m_executionTime = executionTime;
129 m_lastExecution = NEVER;
130 m_flags = 0;
131}
132
133Schedule::Schedule(DB_RESULT hResult, int row)
134{
135 m_id = DBGetFieldULong(hResult, row, 0);
136 m_taskId = DBGetField(hResult, row, 1, NULL, 0);
137 m_schedule = DBGetField(hResult, row, 2, NULL, 0);
138 m_params = DBGetField(hResult, row, 3, NULL, 0);
139 m_executionTime = DBGetFieldULong(hResult, row, 4);
140 m_lastExecution = DBGetFieldULong(hResult, row, 5);
141 m_flags = DBGetFieldULong(hResult, row, 6);
142}
143
144void Schedule::update(const TCHAR *taskId, const TCHAR *schedule, const TCHAR *params)
145{
146 safe_free(m_taskId);
147 m_taskId = _tcsdup(taskId);
148 safe_free(m_schedule);
149 m_schedule = _tcsdup(schedule);
150 safe_free(m_params);
151 m_params = _tcsdup(params);
152}
153
154void Schedule::update(const TCHAR *taskId, time_t nextExecution, const TCHAR *params)
155{
156 safe_free(m_taskId);
157 m_taskId = _tcsdup(taskId);
158 safe_free(m_params);
159 m_params = _tcsdup(params);
160 m_executionTime = nextExecution;
161}
162
163void Schedule::saveToDatabase(bool newObject)
164{
165 DB_HANDLE db = DBConnectionPoolAcquireConnection();
166 DB_STATEMENT hStmt;
167
168 if (newObject)
169 {
170 hStmt = DBPrepare(db,
171 _T("INSERT INTO schedule (taskId,shedule,params,execution_time,")
172 _T("last_execution_time,flags, id) VALUES (?,?,?,?,?,?,?)"));
173 }
174 else
175 {
176 hStmt = DBPrepare(db,
177 _T("UPDATE schedule SET taskId=?,shedule=?,params=?,")
178 _T("execution_time=?,last_execution_time=?,flags=? ")
179 _T("WHERE id=?"));
180 }
181
182 DBBind(hStmt, 1, DB_SQLTYPE_VARCHAR, m_taskId, DB_BIND_STATIC);
183 DBBind(hStmt, 2, DB_SQLTYPE_VARCHAR, m_schedule, DB_BIND_STATIC);
184 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_params, DB_BIND_STATIC);
185 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (UINT32)m_executionTime);
186 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (UINT32)m_lastExecution);
187 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_flags);
188 DBBind(hStmt, 7, DB_SQLTYPE_INTEGER, (LONG)m_id);
189
190 if (hStmt == NULL)
191 return;
192
193 DBExecute(hStmt);
194 DBFreeStatement(hStmt);
195 DBConnectionPoolReleaseConnection(db);
196}
197
198void Schedule::run(ScheduleCallback *callback)
199{
4ede0acd 200 bool oneTimeShedule = !_tcscmp(m_schedule, _T(""));
201
0a145c10 202 setFlag(SCHEDULE_IN_PROGRES);
203 callback->m_func(m_params);
204 setLastExecutionTime(time(NULL));
4ede0acd 205
206 if(oneTimeShedule)
207 {
208 MutexLock(s_oneTimeScheduleLock);
209 setExecutionTime(NEVER);
210 }
211
0a145c10 212 removeFlag(SCHEDULE_IN_PROGRES);
213 setFlag(SCHEDULE_EXECUTED);
214 saveToDatabase(false);
4ede0acd 215 if(oneTimeShedule)
216 {
217 s_oneTimeSchedules.sort(ScheduleListSortCallback);
218 MutexUnlock(s_oneTimeScheduleLock);
219 }
0a145c10 220}
221
4ede0acd 222void Schedule::fillMessage(NXCPMessage *msg)
223{
224 msg->setField(VID_SCHEDULE_ID, m_id);
225 msg->setField(VID_TASK_ID, m_taskId);
226 msg->setField(VID_SCHEDULE, m_schedule);
227 msg->setField(VID_PARAMETER, m_params);
228 msg->setFieldFromTime(VID_EXECUTION_TIME, m_executionTime);
229 msg->setFieldFromTime(VID_LAST_EXECUTION_TIME, m_lastExecution);
230 msg->setField(VID_FLAGS, (UINT32)m_flags);
231}
0a145c10 232
233/**
234 * Callback for sorting reset list
235 */
236static int ScheduleListSortCallback(const void *e1, const void *e2)
237{
238 Schedule * s1 = *((Schedule**)e1);
239 Schedule * s2 = *((Schedule**)e2);
240
241 if (s1->getExecutionTime() == s2->getExecutionTime())
242 {
243 return 0;
244 }
245
246 if (((s1->getExecutionTime() < s2->getExecutionTime()) && (s1->getExecutionTime() != 0)) || (s2->getExecutionTime() == 0))
247 {
248 return -1;
249 }
250 else
251 {
252 return 1;
253 }
254}
255
4ede0acd 256
257/**
258 * Initialize task sheduler - read all shedules form database and start threads for one time and crom shedules
259 */
0a145c10 260void InitializeTaskScheduler()
261{
262 s_taskSchedullPool = ThreadPoolCreate(1, 64, _T("TASKSCHEDULL"));
263 //read from DB configuration
264 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
265 DB_RESULT hResult = DBSelect(hdb, _T("SELECT id,taskId,shedule,params,execution_time,last_execution_time,flags FROM schedule"));
266 if (hResult != NULL)
267 {
268 int count = DBGetNumRows(hResult);
269 for(int i = 0; i < count; i++)
270 {
271 Schedule *sh = new Schedule(hResult, i);
272 if(!_tcscmp(sh->getSchedule(), _T("")))
273 {
274 DbgPrintf(7, _T("InitializeTaskScheduler: Add one time shedule %d, %d"), sh->getId(), sh->getExecutionTime());
275 s_oneTimeSchedules.add(sh);
276 }
277 else
278 {
279 DbgPrintf(7, _T("InitializeTaskScheduler: Add cron shedule %d, %s"), sh->getId(), sh->getSchedule());
280 s_cronSchedules.add(sh);
281 }
282 }
283 DBFreeResult(hResult);
284 }
285 DBConnectionPoolReleaseConnection(hdb);
286 s_oneTimeSchedules.sort(ScheduleListSortCallback);
287 //start threads that will start cron and one time tasks threads
288 ThreadCreate(OneTimeEventThread, 0, NULL);
289 ThreadCreate(CronCheckThread, 0, NULL);
290}
291
4ede0acd 292/**
293 * Stop all sheduler threads and free all memory
294 */
0a145c10 295void CloseTaskScheduler()
296{
297 ConditionSet(s_cond);
298 s_cond = INVALID_CONDITION_HANDLE;
299 ConditionDestroy(s_cond);
300 ThreadPoolDestroy(s_taskSchedullPool);
301 MutexDestroy(s_cronScheduleLock);
302 MutexDestroy(s_oneTimeScheduleLock);
303}
304
4ede0acd 305/**
306 * Function that adds to list task handler function
307 */
0a145c10 308void AddSchedulleTaskHandler(const TCHAR *id, scheduled_action_executor exec)
309{
310 s_callbacks.set(id, new ScheduleCallback(exec));
311 DbgPrintf(6, _T("AddSchedulleTaskHandler: Add shedule callback %s"), id);
312}
313
4ede0acd 314/**
315 * Scheduled task creation function
316 */
0a145c10 317void AddSchedule(const TCHAR *task, const TCHAR *schedule, const TCHAR *params)
318{
319 DbgPrintf(7, _T("AddSchedule: Add cron shedule %s, %s, %s"), task, schedule, params);
320 MutexLock(s_cronScheduleLock);
321 Schedule *sh = new Schedule(CreateUniqueId(IDG_SCHEDULE), task, schedule, params);
322 sh->saveToDatabase(true);
323 s_cronSchedules.add(sh);
324 MutexUnlock(s_cronScheduleLock);
325}
326
4ede0acd 327/**
328 * One time action creation function
329 */
0a145c10 330void AddOneTimeAction(const TCHAR *task, time_t nextExecutionTime, const TCHAR *params)
331{
332 DbgPrintf(7, _T("AddOneTimeAction: Add one time shedule %s, %d, %s"), task, nextExecutionTime, params);
333 MutexLock(s_oneTimeScheduleLock);
334 Schedule *sh = new Schedule(CreateUniqueId(IDG_SCHEDULE), task, nextExecutionTime, params);
335 sh->saveToDatabase(true);
336 s_oneTimeSchedules.add(sh);
337 s_oneTimeSchedules.sort(ScheduleListSortCallback);
338 MutexUnlock(s_oneTimeScheduleLock);
339 ConditionSet(s_cond);
340}
341
4ede0acd 342/**
343 * Scheduled actionUpdate
344 */
0a145c10 345void UpdateSchedule(int id, const TCHAR *task, const TCHAR *schedule, const TCHAR *params)
346{
347 DbgPrintf(7, _T("UpdateSchedule: update cron shedule %d, %s, %s, %s"), id, task, schedule, params);
348 MutexLock(s_cronScheduleLock);
349 for (int i = 0; i < s_cronSchedules.size(); i++)
350 {
351 if (s_cronSchedules.get(i)->getId() == id)
352 {
353 s_cronSchedules.get(i)->update(task, schedule, params);
354 s_cronSchedules.get(i)->saveToDatabase(false);
355 break;
356 }
357 }
358 MutexUnlock(s_cronScheduleLock);
359
360}
361
4ede0acd 362/**
363 * One time action update
364 */
0a145c10 365void UpdateOneTimeAction(int id, const TCHAR *task, time_t nextExecutionTime, const TCHAR *params)
366{
367 DbgPrintf(7, _T("UpdateOneTimeAction: update one time shedule %d, %s, %d, %s"), id, task, nextExecutionTime, params);
368 bool found = true;
369 MutexLock(s_oneTimeScheduleLock);
370 for (int i = 0; i < s_oneTimeSchedules.size(); i++)
371 {
372 if (s_oneTimeSchedules.get(i)->getId() == id)
373 {
374 s_oneTimeSchedules.get(i)->update(task, nextExecutionTime, params);
375 s_oneTimeSchedules.get(i)->saveToDatabase(false);
376 s_oneTimeSchedules.sort(ScheduleListSortCallback);
377 found = true;
378 break;
379 }
380 }
381 MutexUnlock(s_oneTimeScheduleLock);
382
383 if(found)
384 ConditionSet(s_cond);
385}
386
4ede0acd 387/**
388 * Removes shedule form Database by id
389 */
0a145c10 390void DeleteFromDB(UINT32 id)
391{
392 DB_HANDLE db = DBConnectionPoolAcquireConnection();
393 TCHAR query[256];
394 _sntprintf(query, 256, _T("DELETE FROM schedule WHERE id = %s"), id);
395 DBQuery(db, query);
396 DB_STATEMENT hStmt;
397 DBConnectionPoolReleaseConnection(db);
398}
399
4ede0acd 400/**
401 * Removes shedule by id
402 */
0a145c10 403void RemoveSchedule(UINT32 id, bool alreadyLocked)
404{
405 DbgPrintf(7, _T("RemoveSchedule: shedule(%d) removed"), id);
406 bool found = false;
407
408 MutexLock(s_cronScheduleLock);
409 for (int i = 0; i < s_cronSchedules.size(); i++)
410 {
411 if (s_cronSchedules.get(i)->getId() == id)
412 {
413 s_cronSchedules.remove(i);
414 found = true;
415 break;
416 }
417 }
418 MutexUnlock(s_cronScheduleLock);
419
420 if(found)
421 {
422 DeleteFromDB(id);
423 return;
424 }
425
426 MutexLock(s_oneTimeScheduleLock);
427 for (int i = 0; i < s_oneTimeSchedules.size(); i++)
428 {
429 if (s_oneTimeSchedules.get(i)->getId() == id)
430 {
431 s_oneTimeSchedules.remove(i);
432 s_oneTimeSchedules.sort(ScheduleListSortCallback);
433 found = true;
434 break;
435 }
436 }
437 MutexUnlock(s_oneTimeScheduleLock);
438
439
440 if (found)
441 {
442 ConditionSet(s_cond);
443 DeleteFromDB(id);
444 }
445}
446
4ede0acd 447/**
448 * Fills message with shedule list
449 */
450void GetSheduleList(NXCPMessage *msg)
451{
452 int sheduleCount = 0;
453 int base = VID_SCHEDULE_LIST_BASE;
454
455 MutexLock(s_oneTimeScheduleLock);
456 for(int i = 0; i < s_oneTimeSchedules.size(); i++, base+=10)
457 {
458 s_oneTimeSchedules.get(i)->fillMessage(msg);
459 }
460 sheduleCount += s_oneTimeSchedules.size();
461 MutexUnlock(s_oneTimeScheduleLock);
462
463 MutexLock(s_cronScheduleLock);
464 for(int i = 0; i < s_cronSchedules.size(); i++, base+=10)
465 {
466 s_cronSchedules.get(i)->fillMessage(msg);
467 }
468 sheduleCount += s_cronSchedules.size();
469 MutexUnlock(s_cronScheduleLock);
470
471 msg->setField(VID_SCHEDULE_COUNT, sheduleCount);
472}
473
474/**
475 * Thread that checks one time shedules and executes them
476 */
0a145c10 477static THREAD_RESULT THREAD_CALL OneTimeEventThread(void *arg)
478{
479 int sleepTime = 1;
480 DbgPrintf(7, _T("OneTimeEventThread: started"));
481 while(true)
482 {
483 if(!ConditionWait(s_cond, sleepTime) && (g_flags & AF_SHUTDOWN))
484 break;
485
486 //ConditionReset(s_cond);
487 time_t now = time(NULL);
488 struct tm currLocal;
489 memcpy(&currLocal, localtime(&now), sizeof(struct tm));
490
491 MutexLock(s_oneTimeScheduleLock);
492 for(int i = 0; i < s_oneTimeSchedules.size(); i++)
493 {
494 Schedule *sh = s_oneTimeSchedules.get(i);
495 ScheduleCallback *callback = s_callbacks.get(sh->getTaskId());
496 if(callback == NULL)
497 {
498 DbgPrintf(3, _T("OneTimeEventThread: One time execution function with taskId=\'%s\' not found"), sh->getTaskId());
499 continue;
500 }
501
502 if(sh->isInProgress())
503 continue;
504
505 //execute all timmers that is expected to execute now
506 if(sh->getExecutionTime() != 0 && now >= sh->getExecutionTime())
507 {
508 DbgPrintf(7, _T("OneTimeEventThread: run shedule id=\'%d\', execution time =\'%d\'"), sh->getId(), sh->getExecutionTime());
509 ThreadPoolExecute(s_taskSchedullPool, sh, &Schedule::run, callback);
510 }
511 else
512 {
513 break;
514 }
515 }
516
517 sleepTime = INFINITE;
518
519 for(int i = 0; i < s_oneTimeSchedules.size(); i++)
520 {
521 Schedule *sh = s_oneTimeSchedules.get(i);
522 if(sh->getExecutionTime() == NEVER)
523 break;
524
525 if(now >= sh->getExecutionTime())
526 continue;
527
528 sleepTime = sh->getExecutionTime() - now;
529 sleepTime = sleepTime < 0 ? 0 : sleepTime * 1000;
530 break;
531 }
532
533 DbgPrintf(7, _T("OneTimeEventThread: thread will sleep for %d"), sleepTime);
534 MutexUnlock(s_oneTimeScheduleLock);
535 }
536 DbgPrintf(3, _T("OneTimeEventThread: stopped"));
537}
538
4ede0acd 539/**
540 * Wakes up for execution of one time shedule or for recolculation new wake up timestamp
541 */
0a145c10 542static THREAD_RESULT THREAD_CALL CronCheckThread(void *arg)
543{
544 DbgPrintf(3, _T("CronCheckThread: started"));
545 do {
546 time_t now = time(NULL);
547 struct tm currLocal;
548 memcpy(&currLocal, localtime(&now), sizeof(struct tm));
549
550 MutexLock(s_cronScheduleLock);
551 for(int i = 0; i < s_cronSchedules.size(); i++)
552 {
553 Schedule *sh = s_cronSchedules.get(i);
554 ScheduleCallback *callback = s_callbacks.get(sh->getTaskId());
555 if(callback == NULL)
556 {
557 DbgPrintf(3, _T("CronCheckThread: Cron execution function with taskId=\'%s\' not found"), sh->getTaskId());
558 continue;
559 }
560 if(IsItTime(&currLocal, sh->getSchedule(), now))
561 {
562 DbgPrintf(7, _T("OneTimeEventThread: run shedule id=\'%d\', shedule=\'%s\'"), sh->getId(), sh->getSchedule());
563 ThreadPoolExecute(s_taskSchedullPool, sh, &Schedule::run, callback);
564 }
565 }
566 MutexUnlock(s_cronScheduleLock);
567 } while(!SleepAndCheckForShutdown(60)); //sleep 1 minute
568 DbgPrintf(3, _T("CronCheckThread: stopped"));
569}
570
4ede0acd 571/**
572 * Checks if it is time to execute cron shedule
573 */
0a145c10 574static bool IsItTime(struct tm *currTime, const TCHAR *schedule, time_t currTimestamp)
575{
576 TCHAR value[256];
577
578 // Minute
579 const TCHAR *curr = ExtractWord(schedule, value);
580 if (!MatchScheduleElement(value, currTime->tm_min, 59, NULL))
581 return false;
582
583 // Hour
584 curr = ExtractWord(curr, value);
585 if (!MatchScheduleElement(value, currTime->tm_hour, 23, NULL))
586 return false;
587
588 // Day of month
589 curr = ExtractWord(curr, value);
590 if (!MatchScheduleElement(value, currTime->tm_mday, GetLastMonthDay(currTime), NULL))
591 return false;
592
593 // Month
594 curr = ExtractWord(curr, value);
595 if (!MatchScheduleElement(value, currTime->tm_mon + 1, 12, NULL))
596 return false;
597
598 // Day of week
599 curr = ExtractWord(curr, value);
600 for(int i = 0; value[i] != 0; i++)
601 if (value[i] == _T('7'))
602 value[i] = _T('0');
603 if (!MatchScheduleElement(value, currTime->tm_wday, 7, currTime))
604 return false;
605
606 return true;
607}