change log updated
[public/netxms.git] / include / nms_threads.h
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2017 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU Lesser General Public License as published
7 ** by the Free Software Foundation; either version 3 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU Lesser General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: nms_threads.h
20 **
21 **/
22
23 #ifndef _nms_threads_h_
24 #define _nms_threads_h_
25
26 #ifdef __cplusplus
27
28 #define NMS_THREADS_H_INCLUDED
29
30 #if defined(_WIN32)
31
32 #ifndef UNDER_CE
33 #include <process.h>
34 #endif
35
36 //
37 // Related datatypes and constants
38 //
39
// Sentinel values the wrappers in this section compare against to detect unusable handles.
// Note that the condition sentinel is INVALID_HANDLE_VALUE, not NULL.
#define INVALID_MUTEX_HANDLE        (NULL)
#define INVALID_CONDITION_HANDLE    INVALID_HANDLE_VALUE
#define INVALID_THREAD_HANDLE       (NULL)

// Thread function result type and thread identifier type
#ifdef UNDER_CE
typedef UINT32 THREAD_RESULT;
typedef UINT32 THREAD_ID;
#else
typedef unsigned int THREAD_RESULT;
typedef unsigned int THREAD_ID;
#endif

// Result value for thread functions (name suggests normal completion)
#define THREAD_OK       0
53
54 #ifdef UNDER_CE
55 #define THREAD_CALL
56 #else
57 #define THREAD_CALL __stdcall
58
// Signature of a thread entry point: takes one opaque argument, returns THREAD_RESULT
extern "C" typedef THREAD_RESULT (THREAD_CALL *ThreadFunction)(void *);

// Mutex is a pointer to a CRITICAL_SECTION; condition is a plain HANDLE
typedef CRITICAL_SECTION *MUTEX;
typedef HANDLE CONDITION;

/**
 * Thread descriptor: handle and identifier of a thread created by ThreadCreateEx().
 * Allocated with malloc() there and released by ThreadJoin() / ThreadDetach().
 */
struct netxms_thread_t
{
   HANDLE handle;
   THREAD_ID id;
};
typedef struct netxms_thread_t *THREAD;

/**
 * Entry point and argument of a new thread; a heap-allocated instance is handed
 * to SEHThreadStarter through _beginthreadex() by ThreadCreate() / ThreadCreateEx().
 */
typedef struct
{
   ThreadFunction start_address;
   void *args;
} THREAD_START_DATA;
75
// Structured exception handling support. Only the declarations are visible here;
// descriptions are inferred from names and usage in this header - confirm against
// the implementation.

// Thread entry point passed to _beginthreadex(); receives a THREAD_START_DATA pointer
THREAD_RESULT LIBNETXMS_EXPORTABLE THREAD_CALL SEHThreadStarter(void *);
// Filter function used by LIBNETXMS_EXCEPTION_HANDLER
int LIBNETXMS_EXPORTABLE ___ExceptionHandler(EXCEPTION_POINTERS *pInfo);

// Presumably installs process-wide exception handling callbacks and dump settings
void LIBNETXMS_EXPORTABLE SetExceptionHandler(BOOL (*pfHandler)(EXCEPTION_POINTERS *),
                                              void (*pfWriter)(const TCHAR *), const TCHAR *pszDumpDir,
                                              const TCHAR *pszBaseProcessName, DWORD dwLogMsgCode,
                                              BOOL writeFullDump, BOOL printToScreen);
BOOL LIBNETXMS_EXPORTABLE SEHDefaultConsoleHandler(EXCEPTION_POINTERS *pInfo);
TCHAR LIBNETXMS_EXPORTABLE *SEHExceptionName(DWORD code);
void LIBNETXMS_EXPORTABLE SEHShowCallStack(CONTEXT *pCtx);

// Signatures match the pfWriter / pfHandler parameters of SetExceptionHandler()
void LIBNETXMS_EXPORTABLE SEHServiceExceptionDataWriter(const TCHAR *pszText);
BOOL LIBNETXMS_EXPORTABLE SEHServiceExceptionHandler(EXCEPTION_POINTERS *pInfo);
89
// Closes a __try block opened by the caller: the expansion starts with the closing
// brace, filters the exception through ___ExceptionHandler() and, if the handler
// block is entered, terminates the process with exit code 99.
#define LIBNETXMS_EXCEPTION_HANDLER \
   } __except(___ExceptionHandler((EXCEPTION_POINTERS *)_exception_info())) { ExitProcess(99); }
92
93 #endif
94
95
96 //
97 // Inline functions
98 //
99
/**
 * Initialize threading library. This variant has an empty body.
 */
inline void InitThreadLibrary()
{
   // nothing to initialize
}
103
104 inline void ThreadSleep(int iSeconds)
105 {
106 Sleep((UINT32)iSeconds * 1000); // Convert to milliseconds
107 }
108
109 inline void ThreadSleepMs(UINT32 dwMilliseconds)
110 {
111 Sleep(dwMilliseconds);
112 }
113
114 inline bool ThreadCreate(ThreadFunction start_address, int stack_size, void *args)
115 {
116 HANDLE hThread;
117 THREAD_ID dwThreadId;
118
119 #ifdef UNDER_CE
120 hThread = CreateThread(NULL, (UINT32)stack_size, start_address, args, 0, &dwThreadId);
121 #else
122 THREAD_START_DATA *data = (THREAD_START_DATA *)malloc(sizeof(THREAD_START_DATA));
123 data->start_address = start_address;
124 data->args = args;
125 hThread = (HANDLE)_beginthreadex(NULL, stack_size, SEHThreadStarter, data, 0, &dwThreadId);
126 #endif
127 if (hThread != NULL)
128 CloseHandle(hThread);
129 return (hThread != NULL);
130 }
131
132 inline THREAD ThreadCreateEx(ThreadFunction start_address, int stack_size, void *args)
133 {
134 THREAD thread;
135
136 thread = (THREAD)malloc(sizeof(struct netxms_thread_t));
137 #ifdef UNDER_CE
138 thread->handle = CreateThread(NULL, (UINT32)stack_size, start_address, args, 0, &thread->id);
139 if (thread->handle == NULL)
140 {
141 #else
142 THREAD_START_DATA *data = (THREAD_START_DATA *)malloc(sizeof(THREAD_START_DATA));
143 data->start_address = start_address;
144 data->args = args;
145 thread->handle = (HANDLE)_beginthreadex(NULL, stack_size, SEHThreadStarter, data, 0, &thread->id);
146 if ((thread->handle == (HANDLE)-1) || (thread->handle == 0))
147 {
148 free(data);
149 #endif
150 free(thread);
151 thread = INVALID_THREAD_HANDLE;
152 }
153 return thread;
154 }
155
// Argument block for the 0x406D1388 exception raised by ThreadSetName().
// Packed on 8 byte boundary; ThreadSetName() passes it as an array of ULONG_PTR.
#pragma pack(push,8)
typedef struct tagTHREADNAME_INFO
{
   DWORD dwType; // Must be 0x1000.
   LPCSTR szName; // Pointer to name (in user addr space).
   DWORD dwThreadID; // Thread ID (-1=caller thread).
   DWORD dwFlags; // Reserved for future use, must be zero.
} THREADNAME_INFO;
#pragma pack(pop)
165
166 /**
167 * Set thread name. Thread can be set to INVALID_THREAD_HANDLE to change name of current thread.
168 */
169 inline void ThreadSetName(THREAD thread, const char *name)
170 {
171 THREADNAME_INFO info;
172 info.dwType = 0x1000;
173 info.szName = name;
174 info.dwThreadID = (thread != INVALID_THREAD_HANDLE) ? thread->id : (DWORD)-1;
175 info.dwFlags = 0;
176 #pragma warning(push)
177 #pragma warning(disable: 6320 6322)
178 __try
179 {
180 RaiseException(0x406D1388, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*)&info);
181 }
182 __except(EXCEPTION_EXECUTE_HANDLER)
183 {
184 }
185 #pragma warning(pop)
186 }
187
188 /**
189 * Set name for current thread
190 */
191 inline void ThreadSetName(const char *name)
192 {
193 ThreadSetName(INVALID_THREAD_HANDLE, name);
194 }
195
196 /**
197 * Exit thread
198 */
199 inline void ThreadExit()
200 {
201 #ifdef UNDER_CE
202 ExitThread(0);
203 #else
204 _endthreadex(0);
205 #endif
206 }
207
208 inline void ThreadJoin(THREAD thread)
209 {
210 if (thread != INVALID_THREAD_HANDLE)
211 {
212 WaitForSingleObject(thread->handle, INFINITE);
213 CloseHandle(thread->handle);
214 free(thread);
215 }
216 }
217
218 inline void ThreadDetach(THREAD thread)
219 {
220 if (thread != INVALID_THREAD_HANDLE)
221 {
222 CloseHandle(thread->handle);
223 free(thread);
224 }
225 }
226
227 inline THREAD_ID ThreadId(THREAD thread)
228 {
229 return (thread != INVALID_THREAD_HANDLE) ? thread->id : 0;
230 }
231
232 inline MUTEX MutexCreate()
233 {
234 MUTEX mutex = (MUTEX)malloc(sizeof(CRITICAL_SECTION));
235 InitializeCriticalSectionAndSpinCount(mutex, 4000);
236 return mutex;
237 }
238
239 inline MUTEX MutexCreateRecursive()
240 {
241 return MutexCreate();
242 }
243
244 inline void MutexDestroy(MUTEX mutex)
245 {
246 DeleteCriticalSection(mutex);
247 free(mutex);
248 }
249
250 inline bool MutexLock(MUTEX mutex)
251 {
252 if (mutex == INVALID_MUTEX_HANDLE)
253 return false;
254 EnterCriticalSection(mutex);
255 return true;
256 }
257
258 inline bool MutexTryLock(MUTEX mutex)
259 {
260 if (mutex == INVALID_MUTEX_HANDLE)
261 return false;
262 return TryEnterCriticalSection(mutex) ? true : false;
263 }
264
265 inline void MutexUnlock(MUTEX mutex)
266 {
267 LeaveCriticalSection(mutex);
268 }
269
270 inline CONDITION ConditionCreate(bool bBroadcast)
271 {
272 return CreateEvent(NULL, bBroadcast, FALSE, NULL);
273 }
274
275 inline void ConditionDestroy(CONDITION hCond)
276 {
277 CloseHandle(hCond);
278 }
279
280 inline void ConditionSet(CONDITION hCond)
281 {
282 SetEvent(hCond);
283 }
284
285 inline void ConditionReset(CONDITION hCond)
286 {
287 ResetEvent(hCond);
288 }
289
290 inline void ConditionPulse(CONDITION hCond)
291 {
292 PulseEvent(hCond);
293 }
294
295 inline bool ConditionWait(CONDITION hCond, UINT32 dwTimeOut)
296 {
297 if (hCond == INVALID_CONDITION_HANDLE)
298 return false;
299 return WaitForSingleObject(hCond, dwTimeOut) == WAIT_OBJECT_0;
300 }
301
302 #elif defined(_USE_GNU_PTH)
303
304 /****************************************************************************/
305 /* GNU Pth */
306 /****************************************************************************/
307
308 //
309 // Related datatypes and constants
310 //
311
// Thread and mutex handles map directly to Pth objects
typedef pth_t THREAD;
typedef pth_mutex_t * MUTEX;

/**
 * Condition object: Pth condition variable plus the mutex protecting the flags
 */
struct netxms_condition_t
{
   pth_cond_t cond;
   pth_mutex_t mutex;
   bool broadcast;   // passed to pth_cond_notify(); when false, a successful wait clears isSet
   bool isSet;       // set by ConditionSet(), cleared by ConditionReset() / ConditionPulse()
};
typedef struct netxms_condition_t * CONDITION;
322
// Sentinel values for unusable handles (all handles are pointers in this variant)
#define INVALID_MUTEX_HANDLE        (NULL)
#define INVALID_CONDITION_HANDLE    (NULL)
#define INVALID_THREAD_HANDLE       (NULL)

// Timeout value that ConditionWait() treats as "wait without time limit"
#ifndef INFINITE
#define INFINITE 0xFFFFFFFF
#endif

// Thread functions return an untyped pointer
typedef void *THREAD_RESULT;

#define THREAD_OK       ((void *)0)
// No calling convention decoration in this variant
#define THREAD_CALL

// Signature of a thread entry point
extern "C" typedef THREAD_RESULT (THREAD_CALL *ThreadFunction)(void *);
337
338
339 //
340 // Inline functions
341 //
342
343 inline void InitThreadLibrary()
344 {
345 if (!pth_init())
346 {
347 perror("pth_init() failed");
348 exit(200);
349 }
350 }
351
352 inline void ThreadSleep(int nSeconds)
353 {
354 pth_sleep(nSeconds);
355 }
356
357 inline void ThreadSleepMs(UINT32 dwMilliseconds)
358 {
359 pth_usleep(dwMilliseconds * 1000);
360 }
361
362 inline bool ThreadCreate(ThreadFunction start_address, int stack_size, void *args)
363 {
364 THREAD id;
365
366 if ((id = pth_spawn(PTH_ATTR_DEFAULT, start_address, args)) != NULL)
367 {
368 pth_attr_set(pth_attr_of(id), PTH_ATTR_JOINABLE, 0);
369 return TRUE;
370 }
371 else
372 {
373 return FALSE;
374 }
375 }
376
377 inline THREAD ThreadCreateEx(ThreadFunction start_address, int stack_size, void *args)
378 {
379 THREAD id;
380
381 if ((id = pth_spawn(PTH_ATTR_DEFAULT, start_address, args)) != NULL)
382 {
383 return id;
384 }
385 else
386 {
387 return INVALID_THREAD_HANDLE;
388 }
389 }
390
391 /**
392 * Set thread name
393 */
394 inline void ThreadSetName(THREAD thread, const char *name)
395 {
396 }
397
398 /**
399 * Set name for current thread
400 */
401 inline void ThreadSetName(const char *name)
402 {
403 ThreadSetName(INVALID_THREAD_HANDLE, name);
404 }
405
406 /**
407 * Exit thread
408 */
409 inline void ThreadExit(void)
410 {
411 pth_exit(NULL);
412 }
413
414 inline void ThreadJoin(THREAD hThread)
415 {
416 if (hThread != INVALID_THREAD_HANDLE)
417 pth_join(hThread, NULL);
418 }
419
420 inline void ThreadDetach(THREAD hThread)
421 {
422 if (hThread != INVALID_THREAD_HANDLE)
423 pth_detach(hThread);
424 }
425
426 inline MUTEX MutexCreate(void)
427 {
428 MUTEX mutex;
429
430 mutex = (MUTEX)malloc(sizeof(pth_mutex_t));
431 if (mutex != NULL)
432 {
433 pth_mutex_init(mutex);
434 }
435 return mutex;
436 }
437
438 inline MUTEX MutexCreateRecursive()
439 {
440 MUTEX mutex;
441
442 // In libpth, recursive locking is explicitly supported,
443 // so we just create mutex
444 mutex = (MUTEX)malloc(sizeof(pth_mutex_t));
445 if (mutex != NULL)
446 {
447 pth_mutex_init(mutex);
448 }
449 return mutex;
450 }
451
452 inline void MutexDestroy(MUTEX mutex)
453 {
454 if (mutex != NULL)
455 free(mutex);
456 }
457
458 inline bool MutexLock(MUTEX mutex)
459 {
460 return (mutex != NULL) ? (pth_mutex_acquire(mutex, FALSE, NULL) != 0) : false;
461 }
462
463 inline bool MutexTryLock(MUTEX mutex)
464 {
465 return (mutex != NULL) ? (pth_mutex_acquire(mutex, TRUE, NULL) != 0) : false;
466 }
467
468 inline void MutexUnlock(MUTEX mutex)
469 {
470 if (mutex != NULL)
471 pth_mutex_release(mutex);
472 }
473
474 inline CONDITION ConditionCreate(bool bBroadcast)
475 {
476 CONDITION cond;
477
478 cond = (CONDITION)malloc(sizeof(struct netxms_condition_t));
479 if (cond != NULL)
480 {
481 pth_cond_init(&cond->cond);
482 pth_mutex_init(&cond->mutex);
483 cond->broadcast = bBroadcast;
484 cond->isSet = FALSE;
485 }
486
487 return cond;
488 }
489
490 inline void ConditionDestroy(CONDITION cond)
491 {
492 if (cond != INVALID_CONDITION_HANDLE)
493 {
494 free(cond);
495 }
496 }
497
498 inline void ConditionSet(CONDITION cond)
499 {
500 if (cond != INVALID_CONDITION_HANDLE)
501 {
502 pth_mutex_acquire(&cond->mutex, FALSE, NULL);
503 cond->isSet = TRUE;
504 pth_cond_notify(&cond->cond, cond->broadcast);
505 pth_mutex_release(&cond->mutex);
506 }
507 }
508
509 inline void ConditionReset(CONDITION cond)
510 {
511 if (cond != INVALID_CONDITION_HANDLE)
512 {
513 pth_mutex_acquire(&cond->mutex, FALSE, NULL);
514 cond->isSet = FALSE;
515 pth_mutex_release(&cond->mutex);
516 }
517 }
518
519 inline void ConditionPulse(CONDITION cond)
520 {
521 if (cond != INVALID_CONDITION_HANDLE)
522 {
523 pth_mutex_acquire(&cond->mutex, FALSE, NULL);
524 pth_cond_notify(&cond->cond, cond->broadcast);
525 cond->isSet = FALSE;
526 pth_mutex_release(&cond->mutex);
527 }
528 }
529
530 inline bool ConditionWait(CONDITION cond, UINT32 dwTimeOut)
531 {
532 bool ret = false;
533
534 if (cond != NULL)
535 {
536 int retcode;
537
538 pth_mutex_acquire(&cond->mutex, FALSE, NULL);
539 if (cond->isSet)
540 {
541 ret = true;
542 if (!cond->broadcast)
543 cond->isSet = FALSE;
544 }
545 else
546 {
547 if (dwTimeOut != INFINITE)
548 {
549 pth_event_t ev;
550
551 ev = pth_event(PTH_EVENT_TIME, pth_timeout(dwTimeOut / 1000, (dwTimeOut % 1000) * 1000));
552 retcode = pth_cond_await(&cond->cond, &cond->mutex, ev);
553 pth_event_free(ev, PTH_FREE_ALL);
554 }
555 else
556 {
557 retcode = pth_cond_await(&cond->cond, &cond->mutex, NULL);
558 }
559
560 if (retcode)
561 {
562 if (!cond->broadcast)
563 cond->isSet = FALSE;
564 ret = true;
565 }
566 }
567
568 pth_mutex_release(&cond->mutex);
569 }
570
571 return ret;
572 }
573
574 inline UINT32 GetCurrentProcessId()
575 {
576 return getpid();
577 }
578
579 inline THREAD GetCurrentThreadId()
580 {
581 return pth_self();
582 }
583
584 #else /* not _WIN32 && not _USE_GNU_PTH */
585
586 /****************************************************************************/
587 /* pthreads */
588 /****************************************************************************/
589
590 #include <pthread.h>
591 #include <errno.h>
592 #include <sys/time.h>
593
594 #if HAVE_PTHREAD_NP_H && !defined(_IPSO)
595 #include <pthread_np.h>
596 #endif
597
// Recursive mutex support is enabled only when the build configuration reports
// a function for setting mutex type, a declaration of pthread_mutexattr_settype,
// and at least one of the known "recursive" type constants.
#if (HAVE_PTHREAD_MUTEXATTR_SETTYPE || HAVE___PTHREAD_MUTEXATTR_SETTYPE || HAVE_PTHREAD_MUTEXATTR_SETKIND_NP) && \
    HAVE_DECL_PTHREAD_MUTEXATTR_SETTYPE && \
    (HAVE_DECL_PTHREAD_MUTEX_RECURSIVE || \
     HAVE_DECL_PTHREAD_MUTEX_RECURSIVE_NP || \
     HAVE_DECL_MUTEX_TYPE_COUNTING_FAST)

#define HAVE_RECURSIVE_MUTEXES 1

// MUTEX_RECURSIVE_FLAG: first available constant that selects recursive mutex type
#if HAVE_DECL_PTHREAD_MUTEX_RECURSIVE
#define MUTEX_RECURSIVE_FLAG PTHREAD_MUTEX_RECURSIVE
#elif HAVE_DECL_PTHREAD_MUTEX_RECURSIVE_NP
#define MUTEX_RECURSIVE_FLAG PTHREAD_MUTEX_RECURSIVE_NP
#elif HAVE_DECL_MUTEX_TYPE_COUNTING_FAST
#define MUTEX_RECURSIVE_FLAG MUTEX_TYPE_COUNTING_FAST
#else
#error Constant used to declare recursive mutex is not known
#endif

// MUTEXATTR_SETTYPE: function used by MutexCreateRecursive() to apply MUTEX_RECURSIVE_FLAG
#if HAVE_PTHREAD_MUTEXATTR_SETTYPE || HAVE_DECL_PTHREAD_MUTEXATTR_SETTYPE
#define MUTEXATTR_SETTYPE pthread_mutexattr_settype
#elif HAVE___PTHREAD_MUTEXATTR_SETTYPE
#define MUTEXATTR_SETTYPE __pthread_mutexattr_settype
#else
#define MUTEXATTR_SETTYPE pthread_mutexattr_setkind_np
#endif

#endif
625
626 //
627 // Related datatypes and constants
628 //
629
// Thread handle is the native pthread_t
typedef pthread_t THREAD;

/**
 * Mutex object wrapping a pthread mutex. The extra fields exist only when
 * HAVE_RECURSIVE_MUTEXES is not defined; isRecursive is set by the create
 * functions, owner is not touched by any code in this header.
 */
struct netxms_mutex_t
{
   pthread_mutex_t mutex;
#ifndef HAVE_RECURSIVE_MUTEXES
   bool isRecursive;
   pthread_t owner;
#endif
};
typedef netxms_mutex_t * MUTEX;
/**
 * Condition object: pthread condition variable plus the mutex protecting the flags
 */
struct netxms_condition_t
{
   pthread_cond_t cond;
   pthread_mutex_t mutex;
   bool broadcast;   // selects pthread_cond_broadcast() over pthread_cond_signal(); when false, a successful wait clears isSet
   bool isSet;       // set by ConditionSet(), cleared by ConditionReset() / ConditionPulse()
};
typedef struct netxms_condition_t * CONDITION;
648
// Sentinel values for unusable handles; thread sentinel is integer 0
// because THREAD is pthread_t here
#define INVALID_MUTEX_HANDLE        (NULL)
#define INVALID_CONDITION_HANDLE    (NULL)
#define INVALID_THREAD_HANDLE       0

// Timeout value that ConditionWait() treats as "wait without time limit"
#ifndef INFINITE
#define INFINITE 0xFFFFFFFF
#endif

// Thread functions return an untyped pointer
typedef void *THREAD_RESULT;

#define THREAD_OK       ((void *)0)
// No calling convention decoration in this variant
#define THREAD_CALL

// Signature of a thread entry point
extern "C" typedef THREAD_RESULT (THREAD_CALL *ThreadFunction)(void *);
663
664
665 //
666 // Inline functions
667 //
668
/**
 * Initialize threading library. This variant has an empty body.
 */
inline void InitThreadLibrary()
{
   // nothing to initialize
}
672
/**
 * Suspend calling thread for given number of seconds.
 * Uses sleep() when _NETWARE is defined, otherwise select() with empty
 * descriptor sets and a timeout.
 */
inline void ThreadSleep(int nSeconds)
{
#ifndef _NETWARE
   struct timeval delay;
   delay.tv_usec = 0;
   delay.tv_sec = nSeconds;

   select(1, NULL, NULL, NULL, &delay);
#else
   sleep(nSeconds);
#endif
}
686
687 inline void ThreadSleepMs(UINT32 dwMilliseconds)
688 {
689 #if HAVE_NANOSLEEP && HAVE_DECL_NANOSLEEP
690 struct timespec interval, remainder;
691
692 interval.tv_sec = dwMilliseconds / 1000;
693 interval.tv_nsec = (dwMilliseconds % 1000) * 1000000; // milli -> nano
694 nanosleep(&interval, &remainder);
695 #else
696 usleep(dwMilliseconds * 1000); // Convert to microseconds
697 #endif
698 }
699
700 inline THREAD ThreadCreateEx(ThreadFunction start_address, int stack_size, void *args)
701 {
702 THREAD id;
703
704 if (stack_size <= 0)
705 {
706 // TODO: Find out minimal stack size
707 stack_size = 1024 * 1024; // 1MB
708 // set stack size to 1mb (it's windows default - and application works,
709 // we need to investigate more on this)
710 }
711 pthread_attr_t attr;
712 pthread_attr_init(&attr);
713 pthread_attr_setstacksize(&attr, stack_size);
714
715 if (pthread_create(&id, &attr, start_address, args) != 0)
716 {
717 id = INVALID_THREAD_HANDLE;
718 }
719
720 pthread_attr_destroy(&attr);
721
722 return id;
723 }
724
725 inline bool ThreadCreate(ThreadFunction start_address, int stack_size, void *args)
726 {
727 THREAD id = ThreadCreateEx(start_address, stack_size, args);
728
729 if (id != INVALID_THREAD_HANDLE)
730 {
731 pthread_detach(id);
732 return TRUE;
733 }
734
735 return FALSE;
736 }
737
738 /**
739 * Set thread name
740 */
741 inline void ThreadSetName(THREAD thread, const char *name)
742 {
743 #if HAVE_PTHREAD_SETNAME_NP
744 pthread_setname_np((thread != INVALID_THREAD_HANDLE) ? thread : pthread_self(), name);
745 #endif
746 }
747
748 /**
749 * Set name for current thread
750 */
751 inline void ThreadSetName(const char *name)
752 {
753 ThreadSetName(INVALID_THREAD_HANDLE, name);
754 }
755
/**
 * Terminate calling thread, passing NULL as its exit value
 */
inline void ThreadExit()
{
   pthread_exit(NULL);
}
763
764 inline void ThreadJoin(THREAD hThread)
765 {
766 if (hThread != INVALID_THREAD_HANDLE)
767 pthread_join(hThread, NULL);
768 }
769
770 inline void ThreadDetach(THREAD hThread)
771 {
772 if (hThread != INVALID_THREAD_HANDLE)
773 pthread_detach(hThread);
774 }
775
776 inline MUTEX MutexCreate()
777 {
778 MUTEX mutex;
779
780 mutex = (MUTEX)malloc(sizeof(netxms_mutex_t));
781 if (mutex != NULL)
782 {
783 pthread_mutex_init(&mutex->mutex, NULL);
784 #ifndef HAVE_RECURSIVE_MUTEXES
785 mutex->isRecursive = FALSE;
786 #endif
787 }
788 return mutex;
789 }
790
791 inline MUTEX MutexCreateRecursive()
792 {
793 MUTEX mutex;
794
795 mutex = (MUTEX)malloc(sizeof(netxms_mutex_t));
796 if (mutex != NULL)
797 {
798 #ifdef HAVE_RECURSIVE_MUTEXES
799 pthread_mutexattr_t a;
800
801 pthread_mutexattr_init(&a);
802 MUTEXATTR_SETTYPE(&a, MUTEX_RECURSIVE_FLAG);
803 pthread_mutex_init(&mutex->mutex, &a);
804 pthread_mutexattr_destroy(&a);
805 #else
806 mutex->isRecursive = TRUE;
807 #error FIXME: implement recursive mutexes
808 #endif
809 }
810 return mutex;
811 }
812
813 inline void MutexDestroy(MUTEX mutex)
814 {
815 if (mutex != NULL)
816 {
817 pthread_mutex_destroy(&mutex->mutex);
818 free(mutex);
819 }
820 }
821
822 inline bool MutexLock(MUTEX mutex)
823 {
824 return (mutex != NULL) ? (pthread_mutex_lock(&mutex->mutex) == 0) : false;
825 }
826
827 inline bool MutexTryLock(MUTEX mutex)
828 {
829 return (mutex != NULL) ? (pthread_mutex_trylock(&mutex->mutex) == 0) : false;
830 }
831
832 inline void MutexUnlock(MUTEX mutex)
833 {
834 if (mutex != NULL)
835 pthread_mutex_unlock(&mutex->mutex);
836 }
837
838 inline CONDITION ConditionCreate(bool bBroadcast)
839 {
840 CONDITION cond;
841
842 cond = (CONDITION)malloc(sizeof(struct netxms_condition_t));
843 if (cond != NULL)
844 {
845 pthread_cond_init(&cond->cond, NULL);
846 pthread_mutex_init(&cond->mutex, NULL);
847 cond->broadcast = bBroadcast;
848 cond->isSet = FALSE;
849 }
850
851 return cond;
852 }
853
854 inline void ConditionDestroy(CONDITION cond)
855 {
856 if (cond != INVALID_CONDITION_HANDLE)
857 {
858 pthread_cond_destroy(&cond->cond);
859 pthread_mutex_destroy(&cond->mutex);
860 free(cond);
861 }
862 }
863
864 inline void ConditionSet(CONDITION cond)
865 {
866 if (cond != INVALID_CONDITION_HANDLE)
867 {
868 pthread_mutex_lock(&cond->mutex);
869 cond->isSet = TRUE;
870 if (cond->broadcast)
871 {
872 pthread_cond_broadcast(&cond->cond);
873 }
874 else
875 {
876 pthread_cond_signal(&cond->cond);
877 }
878 pthread_mutex_unlock(&cond->mutex);
879 }
880 }
881
882 inline void ConditionReset(CONDITION cond)
883 {
884 if (cond != INVALID_CONDITION_HANDLE)
885 {
886 pthread_mutex_lock(&cond->mutex);
887 cond->isSet = FALSE;
888 pthread_mutex_unlock(&cond->mutex);
889 }
890 }
891
892 inline void ConditionPulse(CONDITION cond)
893 {
894 if (cond != INVALID_CONDITION_HANDLE)
895 {
896 pthread_mutex_lock(&cond->mutex);
897 if (cond->broadcast)
898 {
899 pthread_cond_broadcast(&cond->cond);
900 }
901 else
902 {
903 pthread_cond_signal(&cond->cond);
904 }
905 cond->isSet = FALSE;
906 pthread_mutex_unlock(&cond->mutex);
907 }
908 }
909
/**
 * Wait for condition.
 *
 * @param cond condition object (NULL yields false)
 * @param dwTimeOut timeout in milliseconds, or INFINITE to wait without time limit
 * @return true if condition was already set or the wait call returned 0;
 *         false otherwise (including NULL handle)
 */
inline bool ConditionWait(CONDITION cond, UINT32 dwTimeOut)
{
   bool ret = FALSE;

   if (cond != NULL)
   {
      int retcode;

      pthread_mutex_lock(&cond->mutex);
      if (cond->isSet)
      {
         // Already set: no wait needed; non-broadcast condition is consumed
         ret = true;
         if (!cond->broadcast)
            cond->isSet = FALSE;
      }
      else
      {
         if (dwTimeOut != INFINITE)
         {
#if HAVE_PTHREAD_COND_RELTIMEDWAIT_NP || defined(_NETWARE)
            // Relative timeout is passed as is in this branch
            struct timespec timeout;

            timeout.tv_sec = dwTimeOut / 1000;
            timeout.tv_nsec = (dwTimeOut % 1000) * 1000000;
#ifdef _NETWARE
            // NOTE(review): pthread_cond_timedwait() receives a relative value here -
            // confirm that this is what the NetWare implementation expects
            retcode = pthread_cond_timedwait(&cond->cond, &cond->mutex, &timeout);
#else
            retcode = pthread_cond_reltimedwait_np(&cond->cond, &cond->mutex, &timeout);
#endif
#else
            struct timeval now;
            struct timespec timeout;

            // note.
            // mili  - 10^-3
            // micro - 10^-6
            // nano  - 10^-9

            // FIXME there should be more accurate way
            // Absolute deadline = current time + timeout; microsecond overflow
            // is carried into seconds before conversion to nanoseconds
            gettimeofday(&now, NULL);
            timeout.tv_sec = now.tv_sec + (dwTimeOut / 1000);

            now.tv_usec += (dwTimeOut % 1000) * 1000;
            timeout.tv_sec += now.tv_usec / 1000000;
            timeout.tv_nsec = (now.tv_usec % 1000000) * 1000;

            retcode = pthread_cond_timedwait(&cond->cond, &cond->mutex, &timeout);
#endif
         }
         else
         {
            retcode = pthread_cond_wait(&cond->cond, &cond->mutex);
         }

         // Any return of 0 from the wait call is treated as "signalled";
         // isSet is not re-checked after wakeup
         if (retcode == 0)
         {
            if (!cond->broadcast)
               cond->isSet = FALSE;
            ret = true;
         }
      }

      pthread_mutex_unlock(&cond->mutex);
   }

   return ret;
}
977
978 inline UINT32 GetCurrentProcessId()
979 {
980 return getpid();
981 }
982
983 inline THREAD GetCurrentThreadId()
984 {
985 return pthread_self();
986 }
987
988 #endif /* _WIN32 */
989
990 #include <rwlock.h>
991
992 /**
993 * String list
994 */
995 class StringList;
996
997 /**
998 * Thread pool
999 */
1000 struct ThreadPool;
1001
/**
 * Thread pool information (filled by ThreadPoolGetInfo)
 */
struct ThreadPoolInfo
{
   const TCHAR *name;      // pool name
   int minThreads;         // min threads
   int maxThreads;         // max threads
   int curThreads;         // current threads
   int activeRequests;     // number of active requests
   int usage;              // Pool usage in %
   int load;               // Pool current load in % (can be more than 100% if there are more requests than threads available)
   double loadAvg[3];      // Pool load average
};
1016
/**
 * Worker function for thread pool: receives the opaque argument
 * that was supplied together with the function
 */
typedef void (* ThreadPoolWorkerFunction)(void *);

/* Thread pool functions */
/* Only declarations are visible in this header; notes below are inferred from */
/* names and signatures - confirm against the implementation. */
ThreadPool LIBNETXMS_EXPORTABLE *ThreadPoolCreate(int minThreads, int maxThreads, const TCHAR *name);
void LIBNETXMS_EXPORTABLE ThreadPoolDestroy(ThreadPool *p);
// Run f(arg) on the pool; also the target of the member-function template overload below
void LIBNETXMS_EXPORTABLE ThreadPoolExecute(ThreadPool *p, ThreadPoolWorkerFunction f, void *arg);
// Presumably requests sharing the same key are not run concurrently - TODO confirm
void LIBNETXMS_EXPORTABLE ThreadPoolExecuteSerialized(ThreadPool *p, const TCHAR *key, ThreadPoolWorkerFunction f, void *arg);
void LIBNETXMS_EXPORTABLE ThreadPoolScheduleAbsolute(ThreadPool *p, time_t runTime, ThreadPoolWorkerFunction f, void *arg);
// NOTE(review): unit of delay is not visible here - confirm
void LIBNETXMS_EXPORTABLE ThreadPoolScheduleRelative(ThreadPool *p, UINT32 delay, ThreadPoolWorkerFunction f, void *arg);
void LIBNETXMS_EXPORTABLE ThreadPoolGetInfo(ThreadPool *p, ThreadPoolInfo *info);
bool LIBNETXMS_EXPORTABLE ThreadPoolGetInfo(const TCHAR *name, ThreadPoolInfo *info);
StringList LIBNETXMS_EXPORTABLE *ThreadPoolGetAllPools();
1032
/**
 * Wrapper data for ThreadPoolExecute: target object, member function to call
 * on it and the single argument to pass. Instances are created by the
 * ThreadPoolExecute() template and deleted by __ThreadPoolExecute_Wrapper().
 */
template <typename T, typename R> class __ThreadPoolExecute_WrapperData
{
public:
   T *m_object;
   void (T::*m_func)(R);
   R m_arg;

   // Members are initialized directly, so R is copy-constructed once and
   // does not have to be default-constructible
   __ThreadPoolExecute_WrapperData(T *object, void (T::*func)(R), R arg) : m_object(object), m_func(func), m_arg(arg) { }
};
1045
1046 /**
1047 * Wrapper for ThreadPoolExecute
1048 */
1049 template <typename T, typename R> void __ThreadPoolExecute_Wrapper(void *arg)
1050 {
1051 __ThreadPoolExecute_WrapperData<T, R> *wd = static_cast<__ThreadPoolExecute_WrapperData<T, R> *>(arg);
1052 ((*wd->m_object).*(wd->m_func))(wd->m_arg);
1053 delete wd;
1054 }
1055
1056 /**
1057 * Execute task as soon as possible (use class member with one argument)
1058 */
1059 template <typename T, typename B, typename R> inline void ThreadPoolExecute(ThreadPool *p, T *object, void (B::*f)(R), R arg)
1060 {
1061 ThreadPoolExecute(p, __ThreadPoolExecute_Wrapper<B,R>, new __ThreadPoolExecute_WrapperData<B, R>(object, f, arg));
1062 }
1063
/**
 * Wrappers for mutex: object interface on top of MutexLock / MutexTryLock /
 * MutexUnlock. Constructors, destructor and assignment are defined elsewhere.
 */
class LIBNETXMS_EXPORTABLE Mutex
{
private:
   MUTEX m_mutex;                // underlying mutex handle
   VolatileCounter *m_refCount;  // NOTE(review): presumably a counter shared between copies of this object - confirm in implementation

public:
   Mutex();
   Mutex(const Mutex& src);
   ~Mutex();

   Mutex& operator =(const Mutex &src);

   // Acquire mutex; result of MutexLock() is discarded
   void lock() { MutexLock(m_mutex); }
   // Try to acquire mutex; returns result of MutexTryLock()
   bool tryLock() { return MutexTryLock(m_mutex); }
   // Release mutex
   void unlock() { MutexUnlock(m_mutex); }
};
1084
/**
 * Wrappers for read/write lock: object interface on top of RWLockReadLock /
 * RWLockWriteLock / RWLockUnlock (declared in rwlock.h). Constructors,
 * destructor and assignment are defined elsewhere.
 */
class LIBNETXMS_EXPORTABLE RWLock
{
private:
   RWLOCK m_rwlock;              // underlying read/write lock handle
   VolatileCounter *m_refCount;  // NOTE(review): presumably a counter shared between copies of this object - confirm in implementation

public:
   RWLock();
   RWLock(const RWLock& src);
   ~RWLock();

   RWLock& operator =(const RWLock &src);

   // NOTE(review): both lock methods return void, so whatever RWLockReadLock() /
   // RWLockWriteLock() report for an expired finite timeout is not visible to the caller
   void readLock(UINT32 timeout = INFINITE) { RWLockReadLock(m_rwlock, timeout); }
   void writeLock(UINT32 timeout = INFINITE) { RWLockWriteLock(m_rwlock, timeout); }
   void unlock() { RWLockUnlock(m_rwlock); }
};
1105
/**
 * Wrappers for condition: object interface on top of ConditionSet /
 * ConditionPulse / ConditionReset / ConditionWait. Constructors, destructor
 * and assignment are defined elsewhere.
 */
class LIBNETXMS_EXPORTABLE Condition
{
private:
   CONDITION m_condition;        // underlying condition handle
   VolatileCounter *m_refCount;  // NOTE(review): presumably a counter shared between copies of this object - confirm in implementation

public:
   Condition(bool broadcast);
   Condition(const Condition& src);
   ~Condition();

   Condition& operator =(const Condition &src);

   void set() { ConditionSet(m_condition); }
   void pulse() { ConditionPulse(m_condition); }
   void reset() { ConditionReset(m_condition); }
   // Returns result of ConditionWait(); waits without time limit by default
   bool wait(UINT32 timeout = INFINITE) { return ConditionWait(m_condition, timeout); }
};
1127
1128 #endif /* __cplusplus */
1129
1130 #endif /* _nms_threads_h_ */