fixed Windows build errors
[public/netxms.git] / src / libnetxms / msgwq.cpp
CommitLineData
5039dede
AK
1/*
2** NetXMS - Network Management System
3** NetXMS Foundation Library
c73c3ba9 4** Copyright (C) 2003-2016 Victor Kirhenshtein
5039dede
AK
5**
6** This program is free software; you can redistribute it and/or modify
68f384ea
VK
7** it under the terms of the GNU Lesser General Public License as published
8** by the Free Software Foundation; either version 3 of the License, or
5039dede
AK
9** (at your option) any later version.
10**
11** This program is distributed in the hope that it will be useful,
12** but WITHOUT ANY WARRANTY; without even the implied warranty of
13** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14** GNU General Public License for more details.
15**
68f384ea 16** You should have received a copy of the GNU Lesser General Public License
5039dede
AK
17** along with this program; if not, write to the Free Software
18** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19**
20** File: msgwq.cpp
21**
22**/
23
24#include "libnetxms.h"
25
9113e749
VK
26/**
27 * Interval between checking messages TTL in milliseconds
28 */
d87ddcc2 29#define TTL_CHECK_INTERVAL 30000
5039dede 30
9113e749 31/**
f128c07c
VK
32 * Buffer allocation step
33 */
34#define ALLOCATION_STEP 16
35
36/**
d87ddcc2
VK
37 * Housekeeper data
38 */
c73c3ba9 39Mutex MsgWaitQueue::m_housekeeperLock;
669fcb92 40HashMap<UINT64, MsgWaitQueue> *MsgWaitQueue::m_activeQueues = new HashMap<UINT64, MsgWaitQueue>(false);
c73c3ba9 41Condition MsgWaitQueue::m_shutdownCondition(true);
d87ddcc2
VK
42THREAD MsgWaitQueue::m_housekeeperThread = INVALID_THREAD_HANDLE;
43
44/**
9113e749
VK
45 * Constructor
46 */
5039dede
AK
47MsgWaitQueue::MsgWaitQueue()
48{
f128c07c
VK
49 m_holdTime = 30000; // Default message TTL is 30 seconds
50 m_size = 0;
51 m_allocated = 0;
52 m_elements = NULL;
14149881 53 m_sequence = 1;
ef8a3c32 54#if defined(_WIN32)
f128c07c
VK
55 InitializeCriticalSectionAndSpinCount(&m_mutex, 4000);
56 memset(m_wakeupEvents, 0, MAX_MSGQUEUE_WAITERS * sizeof(HANDLE));
57 m_wakeupEvents[0] = CreateEvent(NULL, FALSE, FALSE, NULL);
58 memset(m_waiters, 0, MAX_MSGQUEUE_WAITERS);
ef8a3c32
VK
59#elif defined(_USE_GNU_PTH)
60 pth_mutex_init(&m_mutex);
61 pth_cond_init(&m_wakeupCondition);
f128c07c
VK
62#else
63 pthread_mutex_init(&m_mutex, NULL);
64 pthread_cond_init(&m_wakeupCondition, NULL);
65#endif
d87ddcc2
VK
66
67 // register new queue
c73c3ba9 68 m_housekeeperLock.lock();
669fcb92 69 m_activeQueues->set(CAST_FROM_POINTER(this, UINT64), this);
d87ddcc2
VK
70 if (m_housekeeperThread == INVALID_THREAD_HANDLE)
71 {
72 m_housekeeperThread = ThreadCreateEx(MsgWaitQueue::housekeeperThread, 0, NULL);
73 }
c73c3ba9 74 m_housekeeperLock.unlock();
5039dede
AK
75}
76
9113e749
VK
77/**
78 * Destructor
79 */
5039dede
AK
80MsgWaitQueue::~MsgWaitQueue()
81{
d87ddcc2 82 // unregister queue
c73c3ba9 83 m_housekeeperLock.lock();
669fcb92 84 m_activeQueues->remove(CAST_FROM_POINTER(this, UINT64));
c73c3ba9 85 m_housekeeperLock.unlock();
5039dede 86
c17f6cbc 87 clear();
f128c07c 88 safe_free(m_elements);
f128c07c 89
ef8a3c32 90#if defined(_WIN32)
f128c07c
VK
91 DeleteCriticalSection(&m_mutex);
92 for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
93 if (m_wakeupEvents[i] != NULL)
94 CloseHandle(m_wakeupEvents[i]);
ef8a3c32
VK
95#elif defined(_USE_GNU_PTH)
96 // nothing to do if libpth is used
f128c07c
VK
97#else
98 pthread_mutex_destroy(&m_mutex);
99 pthread_cond_destroy(&m_wakeupCondition);
100#endif
5039dede
AK
101}
102
9113e749
VK
103/**
104 * Clear queue
105 */
c17f6cbc 106void MsgWaitQueue::clear()
5039dede 107{
c17f6cbc 108 lock();
5039dede 109
f128c07c
VK
110 for(int i = 0; i < m_allocated; i++)
111 {
112 if (m_elements[i].msg == NULL)
113 continue;
114
115 if (m_elements[i].isBinary)
5039dede 116 {
f128c07c 117 safe_free(m_elements[i].msg);
5039dede
AK
118 }
119 else
120 {
b368969c 121 delete (NXCPMessage *)(m_elements[i].msg);
5039dede 122 }
f128c07c
VK
123 }
124 m_size = 0;
125 m_allocated = 0;
126 safe_free_and_null(m_elements);
c17f6cbc 127 unlock();
5039dede
AK
128}
129
f128c07c
VK
130/**
131 * Put message into queue
132 */
b368969c 133void MsgWaitQueue::put(NXCPMessage *pMsg)
5039dede 134{
c17f6cbc 135 lock();
5039dede 136
f128c07c
VK
137 int pos;
138 if (m_size == m_allocated)
139 {
140 pos = m_allocated;
141 m_allocated += ALLOCATION_STEP;
142 m_elements = (WAIT_QUEUE_ELEMENT *)realloc(m_elements, sizeof(WAIT_QUEUE_ELEMENT) * m_allocated);
143 memset(&m_elements[pos], 0, sizeof(WAIT_QUEUE_ELEMENT) * ALLOCATION_STEP);
144 }
145 else
146 {
147 for(pos = 0; m_elements[pos].msg != NULL; pos++);
148 }
5039dede 149
b368969c 150 m_elements[pos].code = pMsg->getCode();
f128c07c 151 m_elements[pos].isBinary = 0;
b368969c 152 m_elements[pos].id = pMsg->getId();
f128c07c
VK
153 m_elements[pos].ttl = m_holdTime;
154 m_elements[pos].msg = pMsg;
14149881 155 m_elements[pos].sequence = m_sequence++;
f128c07c
VK
156 m_size++;
157
ef8a3c32 158#if defined(_WIN32)
f128c07c
VK
159 for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
160 if (m_waiters[i])
161 SetEvent(m_wakeupEvents[i]);
ef8a3c32
VK
162#elif defined(_USE_GNU_PTH)
163 pth_cond_notify(&m_wakeupCondition, TRUE);
f128c07c
VK
164#else
165 pthread_cond_broadcast(&m_wakeupCondition);
166#endif
5039dede 167
c17f6cbc 168 unlock();
5039dede
AK
169}
170
f128c07c
VK
171/**
172 * Put raw message into queue
173 */
b368969c 174void MsgWaitQueue::put(NXCP_MESSAGE *pMsg)
5039dede 175{
c17f6cbc 176 lock();
5039dede 177
f128c07c
VK
178 int pos;
179 if (m_size == m_allocated)
180 {
181 pos = m_allocated;
182 m_allocated += ALLOCATION_STEP;
183 m_elements = (WAIT_QUEUE_ELEMENT *)realloc(m_elements, sizeof(WAIT_QUEUE_ELEMENT) * m_allocated);
184 memset(&m_elements[pos], 0, sizeof(WAIT_QUEUE_ELEMENT) * ALLOCATION_STEP);
185 }
186 else
187 {
188 for(pos = 0; m_elements[pos].msg != NULL; pos++);
189 }
190
b368969c 191 m_elements[pos].code = pMsg->code;
f128c07c 192 m_elements[pos].isBinary = 1;
b368969c 193 m_elements[pos].id = pMsg->id;
f128c07c
VK
194 m_elements[pos].ttl = m_holdTime;
195 m_elements[pos].msg = pMsg;
14149881 196 m_elements[pos].sequence = m_sequence++;
f128c07c
VK
197 m_size++;
198
199#ifdef _WIN32
200 for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
201 if (m_waiters[i])
202 SetEvent(m_wakeupEvents[i]);
ef8a3c32
VK
203#elif defined(_USE_GNU_PTH)
204 pth_cond_notify(&m_wakeupCondition, TRUE);
f128c07c
VK
205#else
206 pthread_cond_broadcast(&m_wakeupCondition);
207#endif
5039dede 208
f128c07c 209 unlock();
5039dede
AK
210}
211
967893bb
VK
212/**
213 * Wait for message with specific code and ID
214 * Function return pointer to the message on success or
215 * NULL on timeout or error
216 */
f128c07c 217void *MsgWaitQueue::waitForMessageInternal(UINT16 isBinary, UINT16 wCode, UINT32 dwId, UINT32 dwTimeOut)
5039dede 218{
f128c07c
VK
219 lock();
220
221#ifdef _WIN32
222 int slot = -1;
223#endif
5039dede
AK
224
225 do
226 {
14149881
VK
227 UINT64 minSeq = _ULL(0xFFFFFFFFFFFFFFFF);
228 int index = -1;
f128c07c 229 for(int i = 0; i < m_allocated; i++)
5039dede 230 {
f128c07c
VK
231 if ((m_elements[i].msg != NULL) &&
232 (m_elements[i].id == dwId) &&
233 (m_elements[i].code == wCode) &&
234 (m_elements[i].isBinary == isBinary))
5039dede 235 {
14149881
VK
236 if (m_elements[i].sequence < minSeq)
237 {
238 minSeq = m_elements[i].sequence;
239 index = i;
240 }
5039dede
AK
241 }
242 }
5039dede 243
14149881
VK
244 if (index != -1)
245 {
246 void *msg = m_elements[index].msg;
247 m_elements[index].msg = NULL;
248 m_size--;
249#ifdef _WIN32
250 if (slot != -1)
251 m_waiters[slot] = 0; // release waiter slot
252#endif
253 unlock();
254 return msg;
255 }
256
f128c07c
VK
257 INT64 startTime = GetCurrentTimeMs();
258
ef8a3c32 259#if defined(_WIN32)
f128c07c
VK
260 // Find free slot if needed
261 if (slot == -1)
262 {
263 for(slot = 0; slot < MAX_MSGQUEUE_WAITERS; slot++)
264 if (!m_waiters[slot])
265 {
266 m_waiters[slot] = 1;
267 if (m_wakeupEvents[slot] == NULL)
268 m_wakeupEvents[slot] = CreateEvent(NULL, FALSE, FALSE, NULL);
269 break;
270 }
271
272 if (slot == MAX_MSGQUEUE_WAITERS)
273 {
274 slot = -1;
275 }
276 }
277
278 LeaveCriticalSection(&m_mutex);
279 if (slot != -1)
280 WaitForSingleObject(m_wakeupEvents[slot], dwTimeOut);
281 else
282 Sleep(50); // Just sleep if there are no waiter slots (highly unlikely during normal operation)
283 EnterCriticalSection(&m_mutex);
ef8a3c32 284#elif HAVE_PTHREAD_COND_RELTIMEDWAIT_NP || defined(_NETWARE)
f128c07c
VK
285 struct timespec ts;
286
287 ts.tv_sec = dwTimeOut / 1000;
288 ts.tv_nsec = (dwTimeOut % 1000) * 1000000;
289#ifdef _NETWARE
290 pthread_cond_timedwait(&m_wakeupCondition, &m_mutex, &ts);
291#else
292 pthread_cond_reltimedwait_np(&m_wakeupCondition, &m_mutex, &ts);
293#endif
ef8a3c32
VK
294#elif defined(_USE_GNU_PTH)
295 pth_event_t ev = pth_event(PTH_EVENT_TIME, pth_timeout(dwTimeOut / 1000, (dwTimeOut % 1000) * 1000));
296 pth_cond_await(&m_wakeupCondition, &m_mutex, ev);
297 pth_event_free(ev, PTH_FREE_ALL);
f128c07c
VK
298#else
299 struct timeval now;
300 struct timespec ts;
301
302 gettimeofday(&now, NULL);
303 ts.tv_sec = now.tv_sec + (dwTimeOut / 1000);
304
305 now.tv_usec += (dwTimeOut % 1000) * 1000;
306 ts.tv_sec += now.tv_usec / 1000000;
307 ts.tv_nsec = (now.tv_usec % 1000000) * 1000;
308
309 pthread_cond_timedwait(&m_wakeupCondition, &m_mutex, &ts);
f128c07c
VK
310#endif /* _WIN32 */
311
312 UINT32 sleepTime = (UINT32)(GetCurrentTimeMs() - startTime);
313 dwTimeOut -= min(sleepTime, dwTimeOut);
5039dede
AK
314 } while(dwTimeOut > 0);
315
4e65e21f 316#ifdef _WIN32
f128c07c
VK
317 if (slot != -1)
318 m_waiters[slot] = 0; // release waiter slot
4e65e21f
VK
319#endif
320
f128c07c 321 unlock();
5039dede
AK
322 return NULL;
323}
324
f128c07c 325/**
d87ddcc2 326 * Housekeeping run
f128c07c 327 */
d87ddcc2 328void MsgWaitQueue::housekeeperRun()
5039dede 329{
d87ddcc2
VK
330 lock();
331 if (m_size > 0)
5039dede 332 {
d87ddcc2
VK
333 for(int i = 0; i < m_allocated; i++)
334 {
335 if (m_elements[i].msg == NULL)
336 continue;
f128c07c 337
d87ddcc2
VK
338 if (m_elements[i].ttl <= TTL_CHECK_INTERVAL)
339 {
340 if (m_elements[i].isBinary)
5039dede 341 {
d87ddcc2 342 safe_free(m_elements[i].msg);
5039dede
AK
343 }
344 else
345 {
d87ddcc2 346 delete (NXCPMessage *)(m_elements[i].msg);
5039dede 347 }
d87ddcc2
VK
348 m_elements[i].msg = NULL;
349 m_size--;
350 }
351 else
14149881 352 {
d87ddcc2 353 m_elements[i].ttl -= TTL_CHECK_INTERVAL;
14149881 354 }
d87ddcc2
VK
355 }
356
357 // compact queue if possible
358 if ((m_allocated > ALLOCATION_STEP) && (m_size == 0))
359 {
360 m_allocated = ALLOCATION_STEP;
361 free(m_elements);
362 m_elements = (WAIT_QUEUE_ELEMENT *)calloc(m_allocated, sizeof(WAIT_QUEUE_ELEMENT));
34c8306e 363 }
5039dede 364 }
d87ddcc2
VK
365 unlock();
366}
367
368/**
369 * Callback for enumerating active queues
370 */
371EnumerationCallbackResult MsgWaitQueue::houseKeeperCallback(const void *key, const void *object, void *arg)
372{
373 ((MsgWaitQueue *)object)->housekeeperRun();
374 return _CONTINUE;
5039dede
AK
375}
376
f128c07c 377/**
d87ddcc2 378 * Housekeeper thread
f128c07c 379 */
d87ddcc2 380THREAD_RESULT THREAD_CALL MsgWaitQueue::housekeeperThread(void *arg)
5039dede 381{
c73c3ba9 382 while(!m_shutdownCondition.wait(TTL_CHECK_INTERVAL))
d87ddcc2 383 {
c73c3ba9 384 m_housekeeperLock.lock();
669fcb92 385 m_activeQueues->forEach(MsgWaitQueue::houseKeeperCallback, NULL);
c73c3ba9 386 m_housekeeperLock.unlock();
d87ddcc2 387 }
5039dede
AK
388 return THREAD_OK;
389}
d87ddcc2
VK
390
391/**
392 * Shutdown message wait queue background tasks
393 */
394void MsgWaitQueue::shutdown()
395{
c73c3ba9 396 m_shutdownCondition.set();
d87ddcc2 397 ThreadJoin(m_housekeeperThread);
c73c3ba9 398 m_housekeeperLock.lock();
d87ddcc2 399 m_housekeeperThread = INVALID_THREAD_HANDLE;
c73c3ba9 400 m_housekeeperLock.unlock();
669fcb92 401 delete m_activeQueues;
d87ddcc2
VK
402}
403
404/**
405 * Diag info callback
406 */
407EnumerationCallbackResult MsgWaitQueue::diagInfoCallback(const void *key, const void *object, void *arg)
408{
409 MsgWaitQueue *q = (MsgWaitQueue *)object;
410 TCHAR buffer[256];
411 _sntprintf(buffer, 256, _T(" %p size=%d holdTime=%d\n"), q, q->m_size, q->m_holdTime);
412 ((String *)arg)->append(buffer);
413 return _CONTINUE;
414}
415
416/**
417 * Get diagnostic info
418 */
419String MsgWaitQueue::getDiagInfo()
420{
421 String out;
c73c3ba9 422 m_housekeeperLock.lock();
669fcb92 423 out.append(m_activeQueues->size());
d87ddcc2
VK
424 out.append(_T(" active queues\nHousekeeper thread state is "));
425 out.append((m_housekeeperThread != INVALID_THREAD_HANDLE) ? _T("RUNNING\n") : _T("STOPPED\n"));
669fcb92 426 if (m_activeQueues->size() > 0)
d87ddcc2
VK
427 {
428 out.append(_T("Active queues:\n"));
669fcb92 429 m_activeQueues->forEach(MsgWaitQueue::diagInfoCallback, &out);
d87ddcc2 430 }
c73c3ba9 431 m_housekeeperLock.unlock();
d87ddcc2
VK
432 return out;
433}