Fixed merge conflict
[public/netxms.git] / include / nms_threads.h
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2017 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU Lesser General Public License as published
7 ** by the Free Software Foundation; either version 3 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU Lesser General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: nms_threads.h
20 **
21 **/
22
23 #ifndef _nms_threads_h_
24 #define _nms_threads_h_
25
26 #ifdef __cplusplus
27
28 #define NMS_THREADS_H_INCLUDED
29
30 #if defined(_WIN32)
31
32 #ifndef UNDER_CE
33 #include <process.h>
34 #endif
35
36 //
37 // Related datatypes and constants
38 //
39
40 #define INVALID_MUTEX_HANDLE (NULL)
41 #define INVALID_CONDITION_HANDLE INVALID_HANDLE_VALUE
42 #define INVALID_THREAD_HANDLE (NULL)
43
44 #ifdef UNDER_CE
45 typedef UINT32 THREAD_RESULT;
46 typedef UINT32 THREAD_ID;
47 #else
48 typedef unsigned int THREAD_RESULT;
49 typedef unsigned int THREAD_ID;
50 #endif
51
52 #define THREAD_OK 0
53
54 #ifdef UNDER_CE
55 #define THREAD_CALL
56 #else
57 #define THREAD_CALL __stdcall
58
59 extern "C" typedef THREAD_RESULT (THREAD_CALL *ThreadFunction)(void *);
60
61 typedef CRITICAL_SECTION *MUTEX;
62 typedef HANDLE CONDITION;
63 struct netxms_thread_t
64 {
65 HANDLE handle;
66 THREAD_ID id;
67 };
68 typedef struct netxms_thread_t *THREAD;
69
70 typedef struct
71 {
72 ThreadFunction start_address;
73 void *args;
74 } THREAD_START_DATA;
75
76 THREAD_RESULT LIBNETXMS_EXPORTABLE THREAD_CALL SEHThreadStarter(void *);
77 int LIBNETXMS_EXPORTABLE ___ExceptionHandler(EXCEPTION_POINTERS *pInfo);
78
79 void LIBNETXMS_EXPORTABLE SetExceptionHandler(BOOL (*pfHandler)(EXCEPTION_POINTERS *),
80 void (*pfWriter)(const TCHAR *), const TCHAR *pszDumpDir,
81 const TCHAR *pszBaseProcessName, DWORD dwLogMsgCode,
82 BOOL writeFullDump, BOOL printToScreen);
83 BOOL LIBNETXMS_EXPORTABLE SEHDefaultConsoleHandler(EXCEPTION_POINTERS *pInfo);
84 TCHAR LIBNETXMS_EXPORTABLE *SEHExceptionName(DWORD code);
85 void LIBNETXMS_EXPORTABLE SEHShowCallStack(CONTEXT *pCtx);
86
87 void LIBNETXMS_EXPORTABLE SEHServiceExceptionDataWriter(const TCHAR *pszText);
88 BOOL LIBNETXMS_EXPORTABLE SEHServiceExceptionHandler(EXCEPTION_POINTERS *pInfo);
89
90 #define LIBNETXMS_EXCEPTION_HANDLER \
91 } __except(___ExceptionHandler((EXCEPTION_POINTERS *)_exception_info())) { ExitProcess(99); }
92
93 #endif
94
95
96 //
97 // Inline functions
98 //
99
/**
 * Initialize thread library. Nothing to do on Windows; the function exists
 * so that callers can use the same API on every platform.
 */
inline void InitThreadLibrary()
{
   // intentionally empty
}
103
104 inline void ThreadSleep(int iSeconds)
105 {
106 Sleep((UINT32)iSeconds * 1000); // Convert to milliseconds
107 }
108
109 inline void ThreadSleepMs(UINT32 dwMilliseconds)
110 {
111 Sleep(dwMilliseconds);
112 }
113
114 inline bool ThreadCreate(ThreadFunction start_address, int stack_size, void *args)
115 {
116 HANDLE hThread;
117 THREAD_ID dwThreadId;
118
119 #ifdef UNDER_CE
120 hThread = CreateThread(NULL, (UINT32)stack_size, start_address, args, 0, &dwThreadId);
121 #else
122 THREAD_START_DATA *data = (THREAD_START_DATA *)malloc(sizeof(THREAD_START_DATA));
123 data->start_address = start_address;
124 data->args = args;
125 hThread = (HANDLE)_beginthreadex(NULL, stack_size, SEHThreadStarter, data, 0, &dwThreadId);
126 #endif
127 if (hThread != NULL)
128 CloseHandle(hThread);
129 return (hThread != NULL);
130 }
131
132 inline THREAD ThreadCreateEx(ThreadFunction start_address, int stack_size, void *args)
133 {
134 THREAD thread;
135
136 thread = (THREAD)malloc(sizeof(struct netxms_thread_t));
137 #ifdef UNDER_CE
138 thread->handle = CreateThread(NULL, (UINT32)stack_size, start_address, args, 0, &thread->id);
139 if (thread->handle == NULL)
140 {
141 #else
142 THREAD_START_DATA *data = (THREAD_START_DATA *)malloc(sizeof(THREAD_START_DATA));
143 data->start_address = start_address;
144 data->args = args;
145 thread->handle = (HANDLE)_beginthreadex(NULL, stack_size, SEHThreadStarter, data, 0, &thread->id);
146 if ((thread->handle == (HANDLE)-1) || (thread->handle == 0))
147 {
148 free(data);
149 #endif
150 free(thread);
151 thread = INVALID_THREAD_HANDLE;
152 }
153 return thread;
154 }
155
156 inline void ThreadExit()
157 {
158 #ifdef UNDER_CE
159 ExitThread(0);
160 #else
161 _endthreadex(0);
162 #endif
163 }
164
165 inline void ThreadJoin(THREAD thread)
166 {
167 if (thread != INVALID_THREAD_HANDLE)
168 {
169 WaitForSingleObject(thread->handle, INFINITE);
170 CloseHandle(thread->handle);
171 free(thread);
172 }
173 }
174
175 inline void ThreadDetach(THREAD thread)
176 {
177 if (thread != INVALID_THREAD_HANDLE)
178 {
179 CloseHandle(thread->handle);
180 free(thread);
181 }
182 }
183
184 inline THREAD_ID ThreadId(THREAD thread)
185 {
186 return (thread != INVALID_THREAD_HANDLE) ? thread->id : 0;
187 }
188
189 inline MUTEX MutexCreate()
190 {
191 MUTEX mutex = (MUTEX)malloc(sizeof(CRITICAL_SECTION));
192 InitializeCriticalSectionAndSpinCount(mutex, 4000);
193 return mutex;
194 }
195
196 inline MUTEX MutexCreateRecursive()
197 {
198 return MutexCreate();
199 }
200
201 inline void MutexDestroy(MUTEX mutex)
202 {
203 DeleteCriticalSection(mutex);
204 free(mutex);
205 }
206
207 inline bool MutexLock(MUTEX mutex)
208 {
209 if (mutex == INVALID_MUTEX_HANDLE)
210 return false;
211 EnterCriticalSection(mutex);
212 return true;
213 }
214
215 inline bool MutexTryLock(MUTEX mutex)
216 {
217 if (mutex == INVALID_MUTEX_HANDLE)
218 return false;
219 return TryEnterCriticalSection(mutex) ? true : false;
220 }
221
222 inline void MutexUnlock(MUTEX mutex)
223 {
224 LeaveCriticalSection(mutex);
225 }
226
227 inline CONDITION ConditionCreate(bool bBroadcast)
228 {
229 return CreateEvent(NULL, bBroadcast, FALSE, NULL);
230 }
231
232 inline void ConditionDestroy(CONDITION hCond)
233 {
234 CloseHandle(hCond);
235 }
236
237 inline void ConditionSet(CONDITION hCond)
238 {
239 SetEvent(hCond);
240 }
241
242 inline void ConditionReset(CONDITION hCond)
243 {
244 ResetEvent(hCond);
245 }
246
247 inline void ConditionPulse(CONDITION hCond)
248 {
249 PulseEvent(hCond);
250 }
251
252 inline bool ConditionWait(CONDITION hCond, UINT32 dwTimeOut)
253 {
254 if (hCond == INVALID_CONDITION_HANDLE)
255 return false;
256 return WaitForSingleObject(hCond, dwTimeOut) == WAIT_OBJECT_0;
257 }
258
259 #elif defined(_USE_GNU_PTH)
260
261 /****************************************************************************/
262 /* GNU Pth */
263 /****************************************************************************/
264
265 //
266 // Related datatypes and constants
267 //
268
269 typedef pth_t THREAD;
270 typedef pth_mutex_t * MUTEX;
271 struct netxms_condition_t
272 {
273 pth_cond_t cond;
274 pth_mutex_t mutex;
275 bool broadcast;
276 bool isSet;
277 };
278 typedef struct netxms_condition_t * CONDITION;
279
280 #define INVALID_MUTEX_HANDLE (NULL)
281 #define INVALID_CONDITION_HANDLE (NULL)
282 #define INVALID_THREAD_HANDLE (NULL)
283
284 #ifndef INFINITE
285 #define INFINITE 0xFFFFFFFF
286 #endif
287
288 typedef void *THREAD_RESULT;
289
290 #define THREAD_OK ((void *)0)
291 #define THREAD_CALL
292
293 extern "C" typedef THREAD_RESULT (THREAD_CALL *ThreadFunction)(void *);
294
295
296 //
297 // Inline functions
298 //
299
300 inline void InitThreadLibrary()
301 {
302 if (!pth_init())
303 {
304 perror("pth_init() failed");
305 exit(200);
306 }
307 }
308
309 inline void ThreadSleep(int nSeconds)
310 {
311 pth_sleep(nSeconds);
312 }
313
314 inline void ThreadSleepMs(UINT32 dwMilliseconds)
315 {
316 pth_usleep(dwMilliseconds * 1000);
317 }
318
319 inline bool ThreadCreate(ThreadFunction start_address, int stack_size, void *args)
320 {
321 THREAD id;
322
323 if ((id = pth_spawn(PTH_ATTR_DEFAULT, start_address, args)) != NULL)
324 {
325 pth_attr_set(pth_attr_of(id), PTH_ATTR_JOINABLE, 0);
326 return TRUE;
327 }
328 else
329 {
330 return FALSE;
331 }
332 }
333
334 inline THREAD ThreadCreateEx(ThreadFunction start_address, int stack_size, void *args)
335 {
336 THREAD id;
337
338 if ((id = pth_spawn(PTH_ATTR_DEFAULT, start_address, args)) != NULL)
339 {
340 return id;
341 }
342 else
343 {
344 return INVALID_THREAD_HANDLE;
345 }
346 }
347
348 inline void ThreadExit(void)
349 {
350 pth_exit(NULL);
351 }
352
353 inline void ThreadJoin(THREAD hThread)
354 {
355 if (hThread != INVALID_THREAD_HANDLE)
356 pth_join(hThread, NULL);
357 }
358
359 inline void ThreadDetach(THREAD hThread)
360 {
361 if (hThread != INVALID_THREAD_HANDLE)
362 pth_detach(hThread);
363 }
364
365 inline MUTEX MutexCreate(void)
366 {
367 MUTEX mutex;
368
369 mutex = (MUTEX)malloc(sizeof(pth_mutex_t));
370 if (mutex != NULL)
371 {
372 pth_mutex_init(mutex);
373 }
374 return mutex;
375 }
376
377 inline MUTEX MutexCreateRecursive()
378 {
379 MUTEX mutex;
380
381 // In libpth, recursive locking is explicitly supported,
382 // so we just create mutex
383 mutex = (MUTEX)malloc(sizeof(pth_mutex_t));
384 if (mutex != NULL)
385 {
386 pth_mutex_init(mutex);
387 }
388 return mutex;
389 }
390
391 inline void MutexDestroy(MUTEX mutex)
392 {
393 if (mutex != NULL)
394 free(mutex);
395 }
396
397 inline bool MutexLock(MUTEX mutex)
398 {
399 return (mutex != NULL) ? (pth_mutex_acquire(mutex, FALSE, NULL) != 0) : false;
400 }
401
402 inline bool MutexTryLock(MUTEX mutex)
403 {
404 return (mutex != NULL) ? (pth_mutex_acquire(mutex, TRUE, NULL) != 0) : false;
405 }
406
407 inline void MutexUnlock(MUTEX mutex)
408 {
409 if (mutex != NULL)
410 pth_mutex_release(mutex);
411 }
412
413 inline CONDITION ConditionCreate(bool bBroadcast)
414 {
415 CONDITION cond;
416
417 cond = (CONDITION)malloc(sizeof(struct netxms_condition_t));
418 if (cond != NULL)
419 {
420 pth_cond_init(&cond->cond);
421 pth_mutex_init(&cond->mutex);
422 cond->broadcast = bBroadcast;
423 cond->isSet = FALSE;
424 }
425
426 return cond;
427 }
428
429 inline void ConditionDestroy(CONDITION cond)
430 {
431 if (cond != INVALID_CONDITION_HANDLE)
432 {
433 free(cond);
434 }
435 }
436
437 inline void ConditionSet(CONDITION cond)
438 {
439 if (cond != INVALID_CONDITION_HANDLE)
440 {
441 pth_mutex_acquire(&cond->mutex, FALSE, NULL);
442 cond->isSet = TRUE;
443 pth_cond_notify(&cond->cond, cond->broadcast);
444 pth_mutex_release(&cond->mutex);
445 }
446 }
447
448 inline void ConditionReset(CONDITION cond)
449 {
450 if (cond != INVALID_CONDITION_HANDLE)
451 {
452 pth_mutex_acquire(&cond->mutex, FALSE, NULL);
453 cond->isSet = FALSE;
454 pth_mutex_release(&cond->mutex);
455 }
456 }
457
458 inline void ConditionPulse(CONDITION cond)
459 {
460 if (cond != INVALID_CONDITION_HANDLE)
461 {
462 pth_mutex_acquire(&cond->mutex, FALSE, NULL);
463 pth_cond_notify(&cond->cond, cond->broadcast);
464 cond->isSet = FALSE;
465 pth_mutex_release(&cond->mutex);
466 }
467 }
468
469 inline bool ConditionWait(CONDITION cond, UINT32 dwTimeOut)
470 {
471 bool ret = false;
472
473 if (cond != NULL)
474 {
475 int retcode;
476
477 pth_mutex_acquire(&cond->mutex, FALSE, NULL);
478 if (cond->isSet)
479 {
480 ret = true;
481 if (!cond->broadcast)
482 cond->isSet = FALSE;
483 }
484 else
485 {
486 if (dwTimeOut != INFINITE)
487 {
488 pth_event_t ev;
489
490 ev = pth_event(PTH_EVENT_TIME, pth_timeout(dwTimeOut / 1000, (dwTimeOut % 1000) * 1000));
491 retcode = pth_cond_await(&cond->cond, &cond->mutex, ev);
492 pth_event_free(ev, PTH_FREE_ALL);
493 }
494 else
495 {
496 retcode = pth_cond_await(&cond->cond, &cond->mutex, NULL);
497 }
498
499 if (retcode)
500 {
501 if (!cond->broadcast)
502 cond->isSet = FALSE;
503 ret = true;
504 }
505 }
506
507 pth_mutex_release(&cond->mutex);
508 }
509
510 return ret;
511 }
512
513 inline UINT32 GetCurrentProcessId()
514 {
515 return getpid();
516 }
517
518 inline THREAD GetCurrentThreadId()
519 {
520 return pth_self();
521 }
522
523 #else /* not _WIN32 && not _USE_GNU_PTH */
524
525 /****************************************************************************/
526 /* pthreads */
527 /****************************************************************************/
528
529 #include <pthread.h>
530 #include <errno.h>
531 #include <sys/time.h>
532
533 #if HAVE_PTHREAD_NP_H && !defined(_IPSO)
534 #include <pthread_np.h>
535 #endif
536
537 #if (HAVE_PTHREAD_MUTEXATTR_SETTYPE || HAVE___PTHREAD_MUTEXATTR_SETTYPE || HAVE_PTHREAD_MUTEXATTR_SETKIND_NP) && \
538 HAVE_DECL_PTHREAD_MUTEXATTR_SETTYPE && \
539 (HAVE_DECL_PTHREAD_MUTEX_RECURSIVE || \
540 HAVE_DECL_PTHREAD_MUTEX_RECURSIVE_NP || \
541 HAVE_DECL_MUTEX_TYPE_COUNTING_FAST)
542
543 #define HAVE_RECURSIVE_MUTEXES 1
544
545 #if HAVE_DECL_PTHREAD_MUTEX_RECURSIVE
546 #define MUTEX_RECURSIVE_FLAG PTHREAD_MUTEX_RECURSIVE
547 #elif HAVE_DECL_PTHREAD_MUTEX_RECURSIVE_NP
548 #define MUTEX_RECURSIVE_FLAG PTHREAD_MUTEX_RECURSIVE_NP
549 #elif HAVE_DECL_MUTEX_TYPE_COUNTING_FAST
550 #define MUTEX_RECURSIVE_FLAG MUTEX_TYPE_COUNTING_FAST
551 #else
552 #error Constant used to declare recursive mutex is not known
553 #endif
554
555 #if HAVE_PTHREAD_MUTEXATTR_SETTYPE || HAVE_DECL_PTHREAD_MUTEXATTR_SETTYPE
556 #define MUTEXATTR_SETTYPE pthread_mutexattr_settype
557 #elif HAVE___PTHREAD_MUTEXATTR_SETTYPE
558 #define MUTEXATTR_SETTYPE __pthread_mutexattr_settype
559 #else
560 #define MUTEXATTR_SETTYPE pthread_mutexattr_setkind_np
561 #endif
562
563 #endif
564
565 //
566 // Related datatypes and constants
567 //
568
569 typedef pthread_t THREAD;
// Mutex object; extra bookkeeping fields exist only when the platform has
// no native recursive mutexes
struct netxms_mutex_t
{
   pthread_mutex_t mutex;   // underlying pthread mutex
#ifndef HAVE_RECURSIVE_MUTEXES
   bool isRecursive;        // set by MutexCreateRecursive(), cleared by MutexCreate()
   pthread_t owner;         // presumably the thread currently holding the lock - not used in this header
#endif
};
578 typedef netxms_mutex_t * MUTEX;
// Condition object built from a pthread condition variable plus state flags
struct netxms_condition_t
{
   pthread_cond_t cond;     // condition variable waiters block on
   pthread_mutex_t mutex;   // protects isSet and is used together with cond
   bool broadcast;          // value given to ConditionCreate(); selects broadcast vs. signal
   bool isSet;              // set by ConditionSet(), cleared by ConditionReset()/ConditionPulse()
};
586 typedef struct netxms_condition_t * CONDITION;
587
588 #define INVALID_MUTEX_HANDLE (NULL)
589 #define INVALID_CONDITION_HANDLE (NULL)
590 #define INVALID_THREAD_HANDLE 0
591
592 #ifndef INFINITE
593 #define INFINITE 0xFFFFFFFF
594 #endif
595
596 typedef void *THREAD_RESULT;
597
598 #define THREAD_OK ((void *)0)
599 #define THREAD_CALL
600
601 extern "C" typedef THREAD_RESULT (THREAD_CALL *ThreadFunction)(void *);
602
603
604 //
605 // Inline functions
606 //
607
/**
 * Initialize thread library. Nothing to do for pthreads; the function
 * exists so that callers can use the same API on every platform.
 */
inline void InitThreadLibrary()
{
   // intentionally empty
}
611
/**
 * Suspend calling thread for given number of seconds.
 *
 * @param nSeconds sleep time in seconds
 */
inline void ThreadSleep(int nSeconds)
{
#ifdef _NETWARE
   sleep(nSeconds);
#else
   // select() without any descriptor sets is used purely as a timer
   struct timeval delay;
   delay.tv_usec = 0;
   delay.tv_sec = nSeconds;

   select(1, NULL, NULL, NULL, &delay);
#endif
}
625
626 inline void ThreadSleepMs(UINT32 dwMilliseconds)
627 {
628 #if HAVE_NANOSLEEP && HAVE_DECL_NANOSLEEP
629 struct timespec interval, remainder;
630
631 interval.tv_sec = dwMilliseconds / 1000;
632 interval.tv_nsec = (dwMilliseconds % 1000) * 1000000; // milli -> nano
633 nanosleep(&interval, &remainder);
634 #else
635 usleep(dwMilliseconds * 1000); // Convert to microseconds
636 #endif
637 }
638
639 inline THREAD ThreadCreateEx(ThreadFunction start_address, int stack_size, void *args)
640 {
641 THREAD id;
642
643 if (stack_size <= 0)
644 {
645 // TODO: Find out minimal stack size
646 stack_size = 1024 * 1024; // 1MB
647 // set stack size to 1mb (it's windows default - and application works,
648 // we need to investigate more on this)
649 }
650 pthread_attr_t attr;
651 pthread_attr_init(&attr);
652 pthread_attr_setstacksize(&attr, stack_size);
653
654 if (pthread_create(&id, &attr, start_address, args) != 0)
655 {
656 id = INVALID_THREAD_HANDLE;
657 }
658
659 pthread_attr_destroy(&attr);
660
661 return id;
662 }
663
664 inline bool ThreadCreate(ThreadFunction start_address, int stack_size, void *args)
665 {
666 THREAD id = ThreadCreateEx(start_address, stack_size, args);
667
668 if (id != INVALID_THREAD_HANDLE)
669 {
670 pthread_detach(id);
671 return TRUE;
672 }
673
674 return FALSE;
675 }
676
/**
 * Terminate calling thread with NULL exit value.
 */
inline void ThreadExit()
{
   pthread_exit(NULL);
}
681
682 inline void ThreadJoin(THREAD hThread)
683 {
684 if (hThread != INVALID_THREAD_HANDLE)
685 pthread_join(hThread, NULL);
686 }
687
688 inline void ThreadDetach(THREAD hThread)
689 {
690 if (hThread != INVALID_THREAD_HANDLE)
691 pthread_detach(hThread);
692 }
693
694 inline MUTEX MutexCreate()
695 {
696 MUTEX mutex;
697
698 mutex = (MUTEX)malloc(sizeof(netxms_mutex_t));
699 if (mutex != NULL)
700 {
701 pthread_mutex_init(&mutex->mutex, NULL);
702 #ifndef HAVE_RECURSIVE_MUTEXES
703 mutex->isRecursive = FALSE;
704 #endif
705 }
706 return mutex;
707 }
708
709 inline MUTEX MutexCreateRecursive()
710 {
711 MUTEX mutex;
712
713 mutex = (MUTEX)malloc(sizeof(netxms_mutex_t));
714 if (mutex != NULL)
715 {
716 #ifdef HAVE_RECURSIVE_MUTEXES
717 pthread_mutexattr_t a;
718
719 pthread_mutexattr_init(&a);
720 MUTEXATTR_SETTYPE(&a, MUTEX_RECURSIVE_FLAG);
721 pthread_mutex_init(&mutex->mutex, &a);
722 pthread_mutexattr_destroy(&a);
723 #else
724 mutex->isRecursive = TRUE;
725 #error FIXME: implement recursive mutexes
726 #endif
727 }
728 return mutex;
729 }
730
731 inline void MutexDestroy(MUTEX mutex)
732 {
733 if (mutex != NULL)
734 {
735 pthread_mutex_destroy(&mutex->mutex);
736 free(mutex);
737 }
738 }
739
740 inline bool MutexLock(MUTEX mutex)
741 {
742 return (mutex != NULL) ? (pthread_mutex_lock(&mutex->mutex) == 0) : false;
743 }
744
745 inline bool MutexTryLock(MUTEX mutex)
746 {
747 return (mutex != NULL) ? (pthread_mutex_trylock(&mutex->mutex) == 0) : false;
748 }
749
750 inline void MutexUnlock(MUTEX mutex)
751 {
752 if (mutex != NULL)
753 pthread_mutex_unlock(&mutex->mutex);
754 }
755
756 inline CONDITION ConditionCreate(bool bBroadcast)
757 {
758 CONDITION cond;
759
760 cond = (CONDITION)malloc(sizeof(struct netxms_condition_t));
761 if (cond != NULL)
762 {
763 pthread_cond_init(&cond->cond, NULL);
764 pthread_mutex_init(&cond->mutex, NULL);
765 cond->broadcast = bBroadcast;
766 cond->isSet = FALSE;
767 }
768
769 return cond;
770 }
771
772 inline void ConditionDestroy(CONDITION cond)
773 {
774 if (cond != INVALID_CONDITION_HANDLE)
775 {
776 pthread_cond_destroy(&cond->cond);
777 pthread_mutex_destroy(&cond->mutex);
778 free(cond);
779 }
780 }
781
782 inline void ConditionSet(CONDITION cond)
783 {
784 if (cond != INVALID_CONDITION_HANDLE)
785 {
786 pthread_mutex_lock(&cond->mutex);
787 cond->isSet = TRUE;
788 if (cond->broadcast)
789 {
790 pthread_cond_broadcast(&cond->cond);
791 }
792 else
793 {
794 pthread_cond_signal(&cond->cond);
795 }
796 pthread_mutex_unlock(&cond->mutex);
797 }
798 }
799
800 inline void ConditionReset(CONDITION cond)
801 {
802 if (cond != INVALID_CONDITION_HANDLE)
803 {
804 pthread_mutex_lock(&cond->mutex);
805 cond->isSet = FALSE;
806 pthread_mutex_unlock(&cond->mutex);
807 }
808 }
809
810 inline void ConditionPulse(CONDITION cond)
811 {
812 if (cond != INVALID_CONDITION_HANDLE)
813 {
814 pthread_mutex_lock(&cond->mutex);
815 if (cond->broadcast)
816 {
817 pthread_cond_broadcast(&cond->cond);
818 }
819 else
820 {
821 pthread_cond_signal(&cond->cond);
822 }
823 cond->isSet = FALSE;
824 pthread_mutex_unlock(&cond->mutex);
825 }
826 }
827
828 inline bool ConditionWait(CONDITION cond, UINT32 dwTimeOut)
829 {
830 bool ret = FALSE;
831
832 if (cond != NULL)
833 {
834 int retcode;
835
836 pthread_mutex_lock(&cond->mutex);
837 if (cond->isSet)
838 {
839 ret = true;
840 if (!cond->broadcast)
841 cond->isSet = FALSE;
842 }
843 else
844 {
845 if (dwTimeOut != INFINITE)
846 {
847 #if HAVE_PTHREAD_COND_RELTIMEDWAIT_NP || defined(_NETWARE)
848 struct timespec timeout;
849
850 timeout.tv_sec = dwTimeOut / 1000;
851 timeout.tv_nsec = (dwTimeOut % 1000) * 1000000;
852 #ifdef _NETWARE
853 retcode = pthread_cond_timedwait(&cond->cond, &cond->mutex, &timeout);
854 #else
855 retcode = pthread_cond_reltimedwait_np(&cond->cond, &cond->mutex, &timeout);
856 #endif
857 #else
858 struct timeval now;
859 struct timespec timeout;
860
861 // note.
862 // mili - 10^-3
863 // micro - 10^-6
864 // nano - 10^-9
865
866 // FIXME there should be more accurate way
867 gettimeofday(&now, NULL);
868 timeout.tv_sec = now.tv_sec + (dwTimeOut / 1000);
869
870 now.tv_usec += (dwTimeOut % 1000) * 1000;
871 timeout.tv_sec += now.tv_usec / 1000000;
872 timeout.tv_nsec = (now.tv_usec % 1000000) * 1000;
873
874 retcode = pthread_cond_timedwait(&cond->cond, &cond->mutex, &timeout);
875 #endif
876 }
877 else
878 {
879 retcode = pthread_cond_wait(&cond->cond, &cond->mutex);
880 }
881
882 if (retcode == 0)
883 {
884 if (!cond->broadcast)
885 cond->isSet = FALSE;
886 ret = true;
887 }
888 }
889
890 pthread_mutex_unlock(&cond->mutex);
891 }
892
893 return ret;
894 }
895
896 inline UINT32 GetCurrentProcessId()
897 {
898 return getpid();
899 }
900
901 inline THREAD GetCurrentThreadId()
902 {
903 return pthread_self();
904 }
905
906 #endif /* _WIN32 */
907
908 #include <rwlock.h>
909
910 /**
911 * String list
912 */
913 class StringList;
914
915 /**
916 * Thread pool
917 */
918 struct ThreadPool;
919
920 /**
921 * Thread pool information
922 */
923 struct ThreadPoolInfo
924 {
925 const TCHAR *name; // pool name
926 int minThreads; // min threads
927 int maxThreads; // max threads
928 int curThreads; // current threads
929 int activeRequests; // number of active requests
930 int usage; // Pool usage in %
931 int load; // Pool current load in % (can be more than 100% if there are more requests then threads available)
932 double loadAvg[3]; // Pool load average
933 };
934
935 /**
936 * Worker function for thread pool
937 */
938 typedef void (* ThreadPoolWorkerFunction)(void *);
939
940 /* Thread pool functions */
941 ThreadPool LIBNETXMS_EXPORTABLE *ThreadPoolCreate(int minThreads, int maxThreads, const TCHAR *name);
942 void LIBNETXMS_EXPORTABLE ThreadPoolDestroy(ThreadPool *p);
943 void LIBNETXMS_EXPORTABLE ThreadPoolExecute(ThreadPool *p, ThreadPoolWorkerFunction f, void *arg);
944 void LIBNETXMS_EXPORTABLE ThreadPoolExecuteSerialized(ThreadPool *p, const TCHAR *key, ThreadPoolWorkerFunction f, void *arg);
945 void LIBNETXMS_EXPORTABLE ThreadPoolScheduleAbsolute(ThreadPool *p, time_t runTime, ThreadPoolWorkerFunction f, void *arg);
946 void LIBNETXMS_EXPORTABLE ThreadPoolScheduleRelative(ThreadPool *p, UINT32 delay, ThreadPoolWorkerFunction f, void *arg);
947 void LIBNETXMS_EXPORTABLE ThreadPoolGetInfo(ThreadPool *p, ThreadPoolInfo *info);
948 bool LIBNETXMS_EXPORTABLE ThreadPoolGetInfo(const TCHAR *name, ThreadPoolInfo *info);
949 StringList LIBNETXMS_EXPORTABLE *ThreadPoolGetAllPools();
950
951 /**
952 * Wrapper data for ThreadPoolExecute
953 */
template <typename T, typename R> class __ThreadPoolExecute_WrapperData
{
public:
   T *m_object;            // object the member function is invoked on
   void (T::*m_func)(R);   // member function to invoke
   R m_arg;                // argument forwarded to m_func

   // Members are set up in the initializer list so that R does not have to
   // be default-constructible and is not constructed and then assigned
   __ThreadPoolExecute_WrapperData(T *object, void (T::*func)(R), R arg)
      : m_object(object), m_func(func), m_arg(arg)
   {
   }
};
963
964 /**
965 * Wrapper for ThreadPoolExecute
966 */
967 template <typename T, typename R> void __ThreadPoolExecute_Wrapper(void *arg)
968 {
969 __ThreadPoolExecute_WrapperData<T, R> *wd = static_cast<__ThreadPoolExecute_WrapperData<T, R> *>(arg);
970 ((*wd->m_object).*(wd->m_func))(wd->m_arg);
971 delete wd;
972 }
973
974 /**
975 * Execute task as soon as possible (use class member with one argument)
976 */
977 template <typename T, typename R> inline void ThreadPoolExecute(ThreadPool *p, T *object, void (T::*f)(R), R arg)
978 {
979 ThreadPoolExecute(p, __ThreadPoolExecute_Wrapper<T,R>, new __ThreadPoolExecute_WrapperData<T, R>(object, f, arg));
980 }
981
982 /* Interlocked increment/decrement functions */
983 #ifdef _WIN32
984
985 typedef volatile LONG VolatileCounter;
986
987 #else
988
989 #if defined(__sun)
990
991 typedef volatile uint32_t VolatileCounter;
992
993 #if !HAVE_ATOMIC_INC_32_NV
994 extern "C" volatile uint32_t solaris9_atomic_inc32(volatile uint32_t *v);
995 #endif
996
997 #if !HAVE_ATOMIC_DEC_32_NV
998 extern "C" volatile uint32_t solaris9_atomic_dec32(volatile uint32_t *v);
999 #endif
1000
1001 /**
1002 * Atomically increment 32-bit value by 1
1003 */
1004 inline VolatileCounter InterlockedIncrement(VolatileCounter *v)
1005 {
1006 #if HAVE_ATOMIC_INC_32_NV
1007 return atomic_inc_32_nv(v);
1008 #else
1009 return solaris9_atomic_inc32(v);
1010 #endif
1011 }
1012
1013 /**
1014 * Atomically decrement 32-bit value by 1
1015 */
1016 inline VolatileCounter InterlockedDecrement(VolatileCounter *v)
1017 {
1018 #if HAVE_ATOMIC_DEC_32_NV
1019 return atomic_dec_32_nv(v);
1020 #else
1021 return solaris9_atomic_dec32(v);
1022 #endif
1023 }
1024
1025 #elif defined(__HP_aCC)
1026
1027 typedef volatile uint32_t VolatileCounter;
1028
1029 #if defined(__hppa) && !HAVE_ATOMIC_H
1030 VolatileCounter parisc_atomic_inc(VolatileCounter *v);
1031 VolatileCounter parisc_atomic_dec(VolatileCounter *v);
1032 #endif
1033
1034 /**
1035 * Atomically increment 32-bit value by 1
1036 */
1037 inline VolatileCounter InterlockedIncrement(VolatileCounter *v)
1038 {
1039 #if HAVE_ATOMIC_H
1040 return atomic_inc_32(v) + 1;
1041 #else
1042 #ifdef __hppa
1043 return parisc_atomic_inc(v);
1044 #else
1045 _Asm_mf(_DFLT_FENCE);
1046 return (uint32_t)_Asm_fetchadd(_FASZ_W, _SEM_ACQ, (void *)v, +1, _LDHINT_NONE) + 1;
1047 #endif
1048 #endif
1049 }
1050
1051 /**
1052 * Atomically decrement 32-bit value by 1
1053 */
1054 inline VolatileCounter InterlockedDecrement(VolatileCounter *v)
1055 {
1056 #if HAVE_ATOMIC_H
1057 return atomic_dec_32(v) - 1;
1058 #else
1059 #ifdef __hppa
1060 return parisc_atomic_inc(v);
1061 #else
1062 _Asm_mf(_DFLT_FENCE);
1063 return (uint32_t)_Asm_fetchadd(_FASZ_W, _SEM_ACQ, (void *)v, -1, _LDHINT_NONE) - 1;
1064 #endif
1065 #endif
1066 }
1067
1068 #else /* not Solaris nor HP-UX */
1069
1070 typedef volatile INT32 VolatileCounter;
1071
1072 /**
1073 * Atomically increment 32-bit value by 1
1074 */
1075 inline VolatileCounter InterlockedIncrement(VolatileCounter *v)
1076 {
1077 #if (defined(__IBMC__) || defined(__IBMCPP__)) && !HAVE_DECL___SYNC_ADD_AND_FETCH
1078 VolatileCounter oldval;
1079 do
1080 {
1081 oldval = __lwarx(v);
1082 } while(__stwcx(v, oldval + 1) == 0);
1083 return oldval + 1;
1084 #elif defined(__GNUC__) && ((__GNUC__ < 4) || (__GNUC_MINOR__ < 1)) && (defined(__i386__) || defined(__x86_64__))
1085 VolatileCounter temp = 1;
1086 __asm__ __volatile__("lock; xaddl %0,%1" : "+r" (temp), "+m" (*v) : : "memory");
1087 return temp + 1;
1088 #else
1089 return __sync_add_and_fetch(v, 1);
1090 #endif
1091 }
1092
1093 /**
1094 * Atomically decrement 32-bit value by 1
1095 */
1096 inline VolatileCounter InterlockedDecrement(VolatileCounter *v)
1097 {
1098 #if (defined(__IBMC__) || defined(__IBMCPP__)) && !HAVE_DECL___SYNC_SUB_AND_FETCH
1099 VolatileCounter oldval;
1100 do
1101 {
1102 oldval = __lwarx(v);
1103 } while(__stwcx(v, oldval - 1) == 0);
1104 return oldval - 1;
1105 #elif defined(__GNUC__) && ((__GNUC__ < 4) || (__GNUC_MINOR__ < 1)) && (defined(__i386__) || defined(__x86_64__))
1106 VolatileCounter temp = -1;
1107 __asm__ __volatile__("lock; xaddl %0,%1" : "+r" (temp), "+m" (*v) : : "memory");
1108 return temp - 1;
1109 #else
1110 return __sync_sub_and_fetch(v, 1);
1111 #endif
1112 }
1113
1114 #endif /* __sun */
1115
1116 #endif /* _WIN32 */
1117
1118 /**
1119 * Wrappers for mutex
1120 */
1121 class LIBNETXMS_EXPORTABLE Mutex
1122 {
1123 private:
1124 MUTEX m_mutex;
1125 VolatileCounter *m_refCount;
1126
1127 public:
1128 Mutex();
1129 Mutex(const Mutex& src);
1130 ~Mutex();
1131
1132 Mutex& operator =(const Mutex &src);
1133
1134 void lock() { MutexLock(m_mutex); }
1135 bool tryLock() { return MutexTryLock(m_mutex); }
1136 void unlock() { MutexUnlock(m_mutex); }
1137 };
1138
1139 /**
1140 * Wrappers for read/write lock
1141 */
1142 class LIBNETXMS_EXPORTABLE RWLock
1143 {
1144 private:
1145 RWLOCK m_rwlock;
1146 VolatileCounter *m_refCount;
1147
1148 public:
1149 RWLock();
1150 RWLock(const RWLock& src);
1151 ~RWLock();
1152
1153 RWLock& operator =(const RWLock &src);
1154
1155 void readLock(UINT32 timeout = INFINITE) { RWLockReadLock(m_rwlock, timeout); }
1156 void writeLock(UINT32 timeout = INFINITE) { RWLockWriteLock(m_rwlock, timeout); }
1157 void unlock() { RWLockUnlock(m_rwlock); }
1158 };
1159
1160 /**
1161 * Wrappers for condition
1162 */
1163 class LIBNETXMS_EXPORTABLE Condition
1164 {
1165 private:
1166 CONDITION m_condition;
1167 VolatileCounter *m_refCount;
1168
1169 public:
1170 Condition(bool broadcast);
1171 Condition(const Condition& src);
1172 ~Condition();
1173
1174 Condition& operator =(const Condition &src);
1175
1176 void set() { ConditionSet(m_condition); }
1177 void pulse() { ConditionPulse(m_condition); }
1178 void reset() { ConditionReset(m_condition); }
1179 bool wait(UINT32 timeout = INFINITE) { return ConditionWait(m_condition, timeout); }
1180 };
1181
1182 #endif /* __cplusplus */
1183
1184 #endif /* _nms_threads_h_ */