49f56a6c5a3a14c05daea946809381fad4373625
[public/netxms.git] / src / server / core / schedule.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2015 Raden Solutions
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: schedule.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 #define NEVER 0
26
27 #define SCHEDULE_DISABLED 1
28 #define SCHEDULE_EXECUTED 2
29 #define SCHEDULE_IN_PROGRES 4
30 #define SCHEDULE_INTERNAL 5
31 //#define SCHEDULE_
32
33 class Schedule;
34 class ScheduleCallback;
35
36 /**
37 * Scheduled task execution pool
38 */
39 static ThreadPool *s_taskSchedullPool = NULL;
40 /**
41 * Static fields
42 */
43 static StringObjectMap<ScheduleCallback> s_callbacks(true);
44 static ObjectArray<Schedule> s_cronSchedules(5, 5, true);
45 static ObjectArray<Schedule> s_oneTimeSchedules(5, 5, true);
46 static CONDITION s_cond = ConditionCreate(false);
47
48 /**
49 * Static functions
50 */
51 void InitializeTaskScheduler();
52 void CloseTaskScheduler();
53 static THREAD_RESULT THREAD_CALL OneTimeEventThread(void *arg);
54 static THREAD_RESULT THREAD_CALL CronCheckThread(void *arg);
55 static bool IsItTime(struct tm *currTime, const TCHAR *schedule, time_t currTimestamp);
56 static int ScheduleListSortCallback(const void *e1, const void *e2);
57
58 /**
59 * Mutex for shedule structures
60 */
61 static MUTEX s_cronScheduleLock = MutexCreate();
62 static MUTEX s_oneTimeScheduleLock = MutexCreate();
63
64 class ScheduleCallback
65 {
66 public:
67 scheduled_action_executor m_func;
68 ScheduleCallback(scheduled_action_executor func) { m_func = func; }
69 };
70
71 class Schedule
72 {
73 private:
74 UINT32 m_id;
75 TCHAR *m_taskId;
76 TCHAR *m_schedule;
77 TCHAR *m_params;
78 time_t m_executionTime;
79 time_t m_lastExecution;
80 int m_flags;
81
82 public:
83
84 Schedule(int id, const TCHAR *taskId, const TCHAR *schedule, const TCHAR *params);
85 Schedule(int id, const TCHAR *taskId, time_t executionTime, const TCHAR *params);
86 Schedule(DB_RESULT hResult, int row);
87
88 ~Schedule(){ delete m_taskId; delete m_schedule; delete m_params; }
89
90 UINT32 getId() { return m_id; }
91 const TCHAR *getTaskId() { return m_taskId; }
92 const TCHAR *getSchedule() { return m_schedule; }
93 const TCHAR *getParams() { return m_params; }
94 time_t getExecutionTime() { return m_executionTime; }
95
96 void setLastExecutionTime(time_t time) { m_lastExecution = time; };
97 void setExecutionTime(time_t time) { m_executionTime = time; }
98 void setFlag(int flag) { m_flags |= flag; }
99 void removeFlag(int flag) { m_flags &= ~flag; }
100
101 void update(const TCHAR *taskId, const TCHAR *schedule, const TCHAR *params);
102 void update(const TCHAR *taskId, time_t nextExecution, const TCHAR *params);
103
104 void saveToDatabase(bool newObject);
105 void run(ScheduleCallback *callback);
106
107 bool isInProgress() { return (m_flags & SCHEDULE_IN_PROGRES) > 0 ? true : false; }
108 };
109
110 Schedule::Schedule(int id, const TCHAR *taskId, const TCHAR *schedule, const TCHAR *params)
111 {
112 m_id = id;
113 m_taskId = _tcsdup(CHECK_NULL_EX(taskId));
114 m_schedule = _tcsdup(CHECK_NULL_EX(schedule));
115 m_params = _tcsdup(CHECK_NULL_EX(params));
116 m_executionTime = NEVER;
117 m_lastExecution = NEVER;
118 m_flags = 0;
119 }
120
121 Schedule::Schedule(int id, const TCHAR *taskId, time_t executionTime, const TCHAR *params)
122 {
123 m_id = id;
124 m_taskId = _tcsdup(CHECK_NULL_EX(taskId));
125 m_schedule = _tcsdup(_T(""));
126 m_params = _tcsdup(CHECK_NULL_EX(params));
127 m_executionTime = executionTime;
128 m_lastExecution = NEVER;
129 m_flags = 0;
130 }
131
132 Schedule::Schedule(DB_RESULT hResult, int row)
133 {
134 m_id = DBGetFieldULong(hResult, row, 0);
135 m_taskId = DBGetField(hResult, row, 1, NULL, 0);
136 m_schedule = DBGetField(hResult, row, 2, NULL, 0);
137 m_params = DBGetField(hResult, row, 3, NULL, 0);
138 m_executionTime = DBGetFieldULong(hResult, row, 4);
139 m_lastExecution = DBGetFieldULong(hResult, row, 5);
140 m_flags = DBGetFieldULong(hResult, row, 6);
141 }
142
143 void Schedule::update(const TCHAR *taskId, const TCHAR *schedule, const TCHAR *params)
144 {
145 safe_free(m_taskId);
146 m_taskId = _tcsdup(taskId);
147 safe_free(m_schedule);
148 m_schedule = _tcsdup(schedule);
149 safe_free(m_params);
150 m_params = _tcsdup(params);
151 }
152
153 void Schedule::update(const TCHAR *taskId, time_t nextExecution, const TCHAR *params)
154 {
155 safe_free(m_taskId);
156 m_taskId = _tcsdup(taskId);
157 safe_free(m_params);
158 m_params = _tcsdup(params);
159 m_executionTime = nextExecution;
160 }
161
162 void Schedule::saveToDatabase(bool newObject)
163 {
164 DB_HANDLE db = DBConnectionPoolAcquireConnection();
165 DB_STATEMENT hStmt;
166
167 if (newObject)
168 {
169 hStmt = DBPrepare(db,
170 _T("INSERT INTO schedule (taskId,shedule,params,execution_time,")
171 _T("last_execution_time,flags, id) VALUES (?,?,?,?,?,?,?)"));
172 }
173 else
174 {
175 hStmt = DBPrepare(db,
176 _T("UPDATE schedule SET taskId=?,shedule=?,params=?,")
177 _T("execution_time=?,last_execution_time=?,flags=? ")
178 _T("WHERE id=?"));
179 }
180
181 DBBind(hStmt, 1, DB_SQLTYPE_VARCHAR, m_taskId, DB_BIND_STATIC);
182 DBBind(hStmt, 2, DB_SQLTYPE_VARCHAR, m_schedule, DB_BIND_STATIC);
183 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_params, DB_BIND_STATIC);
184 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (UINT32)m_executionTime);
185 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (UINT32)m_lastExecution);
186 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_flags);
187 DBBind(hStmt, 7, DB_SQLTYPE_INTEGER, (LONG)m_id);
188
189 if (hStmt == NULL)
190 return;
191
192 DBExecute(hStmt);
193 DBFreeStatement(hStmt);
194 DBConnectionPoolReleaseConnection(db);
195 }
196
197 void Schedule::run(ScheduleCallback *callback)
198 {
199 setFlag(SCHEDULE_IN_PROGRES);
200 callback->m_func(m_params);
201 setLastExecutionTime(time(NULL));
202 MutexLock(s_oneTimeScheduleLock);
203 setExecutionTime(NEVER);
204 removeFlag(SCHEDULE_IN_PROGRES);
205 setFlag(SCHEDULE_EXECUTED);
206 saveToDatabase(false);
207 s_oneTimeSchedules.sort(ScheduleListSortCallback);
208 MutexUnlock(s_oneTimeScheduleLock);
209 }
210
211
212 /**
213 * Callback for sorting reset list
214 */
215 static int ScheduleListSortCallback(const void *e1, const void *e2)
216 {
217 Schedule * s1 = *((Schedule**)e1);
218 Schedule * s2 = *((Schedule**)e2);
219
220 if (s1->getExecutionTime() == s2->getExecutionTime())
221 {
222 return 0;
223 }
224
225 if (((s1->getExecutionTime() < s2->getExecutionTime()) && (s1->getExecutionTime() != 0)) || (s2->getExecutionTime() == 0))
226 {
227 return -1;
228 }
229 else
230 {
231 return 1;
232 }
233 }
234
235 void InitializeTaskScheduler()
236 {
237 s_taskSchedullPool = ThreadPoolCreate(1, 64, _T("TASKSCHEDULL"));
238 //read from DB configuration
239 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
240 DB_RESULT hResult = DBSelect(hdb, _T("SELECT id,taskId,shedule,params,execution_time,last_execution_time,flags FROM schedule"));
241 if (hResult != NULL)
242 {
243 int count = DBGetNumRows(hResult);
244 for(int i = 0; i < count; i++)
245 {
246 Schedule *sh = new Schedule(hResult, i);
247 if(!_tcscmp(sh->getSchedule(), _T("")))
248 {
249 DbgPrintf(7, _T("InitializeTaskScheduler: Add one time shedule %d, %d"), sh->getId(), sh->getExecutionTime());
250 s_oneTimeSchedules.add(sh);
251 }
252 else
253 {
254 DbgPrintf(7, _T("InitializeTaskScheduler: Add cron shedule %d, %s"), sh->getId(), sh->getSchedule());
255 s_cronSchedules.add(sh);
256 }
257 }
258 DBFreeResult(hResult);
259 }
260 DBConnectionPoolReleaseConnection(hdb);
261 s_oneTimeSchedules.sort(ScheduleListSortCallback);
262 //start threads that will start cron and one time tasks threads
263 ThreadCreate(OneTimeEventThread, 0, NULL);
264 ThreadCreate(CronCheckThread, 0, NULL);
265 }
266
267 void CloseTaskScheduler()
268 {
269 ConditionSet(s_cond);
270 s_cond = INVALID_CONDITION_HANDLE;
271 ConditionDestroy(s_cond);
272 ThreadPoolDestroy(s_taskSchedullPool);
273 MutexDestroy(s_cronScheduleLock);
274 MutexDestroy(s_oneTimeScheduleLock);
275 }
276
277 void AddSchedulleTaskHandler(const TCHAR *id, scheduled_action_executor exec)
278 {
279 s_callbacks.set(id, new ScheduleCallback(exec));
280 DbgPrintf(6, _T("AddSchedulleTaskHandler: Add shedule callback %s"), id);
281 }
282
283 void AddSchedule(const TCHAR *task, const TCHAR *schedule, const TCHAR *params)
284 {
285 DbgPrintf(7, _T("AddSchedule: Add cron shedule %s, %s, %s"), task, schedule, params);
286 MutexLock(s_cronScheduleLock);
287 Schedule *sh = new Schedule(CreateUniqueId(IDG_SCHEDULE), task, schedule, params);
288 sh->saveToDatabase(true);
289 s_cronSchedules.add(sh);
290 MutexUnlock(s_cronScheduleLock);
291 }
292
293 void AddOneTimeAction(const TCHAR *task, time_t nextExecutionTime, const TCHAR *params)
294 {
295 DbgPrintf(7, _T("AddOneTimeAction: Add one time shedule %s, %d, %s"), task, nextExecutionTime, params);
296 MutexLock(s_oneTimeScheduleLock);
297 Schedule *sh = new Schedule(CreateUniqueId(IDG_SCHEDULE), task, nextExecutionTime, params);
298 sh->saveToDatabase(true);
299 s_oneTimeSchedules.add(sh);
300 s_oneTimeSchedules.sort(ScheduleListSortCallback);
301 MutexUnlock(s_oneTimeScheduleLock);
302 ConditionSet(s_cond);
303 }
304
305 void UpdateSchedule(int id, const TCHAR *task, const TCHAR *schedule, const TCHAR *params)
306 {
307 DbgPrintf(7, _T("UpdateSchedule: update cron shedule %d, %s, %s, %s"), id, task, schedule, params);
308 MutexLock(s_cronScheduleLock);
309 for (int i = 0; i < s_cronSchedules.size(); i++)
310 {
311 if (s_cronSchedules.get(i)->getId() == id)
312 {
313 s_cronSchedules.get(i)->update(task, schedule, params);
314 s_cronSchedules.get(i)->saveToDatabase(false);
315 break;
316 }
317 }
318 MutexUnlock(s_cronScheduleLock);
319
320 }
321
322 void UpdateOneTimeAction(int id, const TCHAR *task, time_t nextExecutionTime, const TCHAR *params)
323 {
324 DbgPrintf(7, _T("UpdateOneTimeAction: update one time shedule %d, %s, %d, %s"), id, task, nextExecutionTime, params);
325 bool found = true;
326 MutexLock(s_oneTimeScheduleLock);
327 for (int i = 0; i < s_oneTimeSchedules.size(); i++)
328 {
329 if (s_oneTimeSchedules.get(i)->getId() == id)
330 {
331 s_oneTimeSchedules.get(i)->update(task, nextExecutionTime, params);
332 s_oneTimeSchedules.get(i)->saveToDatabase(false);
333 s_oneTimeSchedules.sort(ScheduleListSortCallback);
334 found = true;
335 break;
336 }
337 }
338 MutexUnlock(s_oneTimeScheduleLock);
339
340 if(found)
341 ConditionSet(s_cond);
342 }
343
344 void DeleteFromDB(UINT32 id)
345 {
346 DB_HANDLE db = DBConnectionPoolAcquireConnection();
347 TCHAR query[256];
348 _sntprintf(query, 256, _T("DELETE FROM schedule WHERE id = %s"), id);
349 DBQuery(db, query);
350 DB_STATEMENT hStmt;
351 DBConnectionPoolReleaseConnection(db);
352 }
353
354 void RemoveSchedule(UINT32 id, bool alreadyLocked)
355 {
356 DbgPrintf(7, _T("RemoveSchedule: shedule(%d) removed"), id);
357 bool found = false;
358
359 MutexLock(s_cronScheduleLock);
360 for (int i = 0; i < s_cronSchedules.size(); i++)
361 {
362 if (s_cronSchedules.get(i)->getId() == id)
363 {
364 s_cronSchedules.remove(i);
365 found = true;
366 break;
367 }
368 }
369 MutexUnlock(s_cronScheduleLock);
370
371 if(found)
372 {
373 DeleteFromDB(id);
374 return;
375 }
376
377 MutexLock(s_oneTimeScheduleLock);
378 for (int i = 0; i < s_oneTimeSchedules.size(); i++)
379 {
380 if (s_oneTimeSchedules.get(i)->getId() == id)
381 {
382 s_oneTimeSchedules.remove(i);
383 s_oneTimeSchedules.sort(ScheduleListSortCallback);
384 found = true;
385 break;
386 }
387 }
388 MutexUnlock(s_oneTimeScheduleLock);
389
390
391 if (found)
392 {
393 ConditionSet(s_cond);
394 DeleteFromDB(id);
395 }
396 }
397
398 static THREAD_RESULT THREAD_CALL OneTimeEventThread(void *arg)
399 {
400 int sleepTime = 1;
401 DbgPrintf(7, _T("OneTimeEventThread: started"));
402 while(true)
403 {
404 if(!ConditionWait(s_cond, sleepTime) && (g_flags & AF_SHUTDOWN))
405 break;
406
407 //ConditionReset(s_cond);
408 time_t now = time(NULL);
409 struct tm currLocal;
410 memcpy(&currLocal, localtime(&now), sizeof(struct tm));
411
412 MutexLock(s_oneTimeScheduleLock);
413 for(int i = 0; i < s_oneTimeSchedules.size(); i++)
414 {
415 Schedule *sh = s_oneTimeSchedules.get(i);
416 ScheduleCallback *callback = s_callbacks.get(sh->getTaskId());
417 if(callback == NULL)
418 {
419 DbgPrintf(3, _T("OneTimeEventThread: One time execution function with taskId=\'%s\' not found"), sh->getTaskId());
420 continue;
421 }
422
423 if(sh->isInProgress())
424 continue;
425
426 //execute all timmers that is expected to execute now
427 if(sh->getExecutionTime() != 0 && now >= sh->getExecutionTime())
428 {
429 DbgPrintf(7, _T("OneTimeEventThread: run shedule id=\'%d\', execution time =\'%d\'"), sh->getId(), sh->getExecutionTime());
430 ThreadPoolExecute(s_taskSchedullPool, sh, &Schedule::run, callback);
431 }
432 else
433 {
434 break;
435 }
436 }
437
438 sleepTime = INFINITE;
439
440 for(int i = 0; i < s_oneTimeSchedules.size(); i++)
441 {
442 Schedule *sh = s_oneTimeSchedules.get(i);
443 if(sh->getExecutionTime() == NEVER)
444 break;
445
446 if(now >= sh->getExecutionTime())
447 continue;
448
449 sleepTime = sh->getExecutionTime() - now;
450 sleepTime = sleepTime < 0 ? 0 : sleepTime * 1000;
451 break;
452 }
453
454 DbgPrintf(7, _T("OneTimeEventThread: thread will sleep for %d"), sleepTime);
455 MutexUnlock(s_oneTimeScheduleLock);
456 }
457 DbgPrintf(3, _T("OneTimeEventThread: stopped"));
458 }
459
460 static THREAD_RESULT THREAD_CALL CronCheckThread(void *arg)
461 {
462 DbgPrintf(3, _T("CronCheckThread: started"));
463 do {
464 time_t now = time(NULL);
465 struct tm currLocal;
466 memcpy(&currLocal, localtime(&now), sizeof(struct tm));
467
468 MutexLock(s_cronScheduleLock);
469 for(int i = 0; i < s_cronSchedules.size(); i++)
470 {
471 Schedule *sh = s_cronSchedules.get(i);
472 ScheduleCallback *callback = s_callbacks.get(sh->getTaskId());
473 if(callback == NULL)
474 {
475 DbgPrintf(3, _T("CronCheckThread: Cron execution function with taskId=\'%s\' not found"), sh->getTaskId());
476 continue;
477 }
478 if(IsItTime(&currLocal, sh->getSchedule(), now))
479 {
480 DbgPrintf(7, _T("OneTimeEventThread: run shedule id=\'%d\', shedule=\'%s\'"), sh->getId(), sh->getSchedule());
481 ThreadPoolExecute(s_taskSchedullPool, sh, &Schedule::run, callback);
482 }
483 }
484 MutexUnlock(s_cronScheduleLock);
485 } while(!SleepAndCheckForShutdown(60)); //sleep 1 minute
486 DbgPrintf(3, _T("CronCheckThread: stopped"));
487 }
488
489 static bool IsItTime(struct tm *currTime, const TCHAR *schedule, time_t currTimestamp)
490 {
491 TCHAR value[256];
492
493 // Minute
494 const TCHAR *curr = ExtractWord(schedule, value);
495 if (!MatchScheduleElement(value, currTime->tm_min, 59, NULL))
496 return false;
497
498 // Hour
499 curr = ExtractWord(curr, value);
500 if (!MatchScheduleElement(value, currTime->tm_hour, 23, NULL))
501 return false;
502
503 // Day of month
504 curr = ExtractWord(curr, value);
505 if (!MatchScheduleElement(value, currTime->tm_mday, GetLastMonthDay(currTime), NULL))
506 return false;
507
508 // Month
509 curr = ExtractWord(curr, value);
510 if (!MatchScheduleElement(value, currTime->tm_mon + 1, 12, NULL))
511 return false;
512
513 // Day of week
514 curr = ExtractWord(curr, value);
515 for(int i = 0; value[i] != 0; i++)
516 if (value[i] == _T('7'))
517 value[i] = _T('0');
518 if (!MatchScheduleElement(value, currTime->tm_wday, 7, currTime))
519 return false;
520
521 return true;
522 }