implemented single housekeeping thread for all message wait queues
[public/netxms.git] / src / libnetxms / msgwq.cpp
CommitLineData
5039dede
AK
1/*
2** NetXMS - Network Management System
3** NetXMS Foundation Library
c17f6cbc 4** Copyright (C) 2003-2011 Victor Kirhenshtein
5039dede
AK
5**
6** This program is free software; you can redistribute it and/or modify
68f384ea
VK
7** it under the terms of the GNU Lesser General Public License as published
8** by the Free Software Foundation; either version 3 of the License, or
5039dede
AK
9** (at your option) any later version.
10**
11** This program is distributed in the hope that it will be useful,
12** but WITHOUT ANY WARRANTY; without even the implied warranty of
13** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14** GNU General Public License for more details.
15**
68f384ea 16** You should have received a copy of the GNU Lesser General Public License
5039dede
AK
17** along with this program; if not, write to the Free Software
18** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19**
20** File: msgwq.cpp
21**
22**/
23
24#include "libnetxms.h"
25
9113e749
VK
26/**
27 * Interval between checking messages TTL in milliseconds
28 */
d87ddcc2 29#define TTL_CHECK_INTERVAL 30000
5039dede 30
9113e749 31/**
f128c07c
VK
32 * Buffer allocation step
33 */
34#define ALLOCATION_STEP 16
35
36/**
d87ddcc2
VK
37 * Housekeeper data
38 */
39MUTEX MsgWaitQueue::m_housekeeperLock = MutexCreate();
40HashMap<UINT64, MsgWaitQueue> *MsgWaitQueue::m_activeQueues = new HashMap<UINT64, MsgWaitQueue>(false);
41CONDITION MsgWaitQueue::m_shutdownCondition = ConditionCreate(TRUE);
42THREAD MsgWaitQueue::m_housekeeperThread = INVALID_THREAD_HANDLE;
43
44/**
9113e749
VK
45 * Constructor
46 */
5039dede
AK
47MsgWaitQueue::MsgWaitQueue()
48{
f128c07c
VK
49 m_holdTime = 30000; // Default message TTL is 30 seconds
50 m_size = 0;
51 m_allocated = 0;
52 m_elements = NULL;
14149881 53 m_sequence = 1;
f128c07c
VK
54#ifdef _WIN32
55 InitializeCriticalSectionAndSpinCount(&m_mutex, 4000);
56 memset(m_wakeupEvents, 0, MAX_MSGQUEUE_WAITERS * sizeof(HANDLE));
57 m_wakeupEvents[0] = CreateEvent(NULL, FALSE, FALSE, NULL);
58 memset(m_waiters, 0, MAX_MSGQUEUE_WAITERS);
59#else
60 pthread_mutex_init(&m_mutex, NULL);
61 pthread_cond_init(&m_wakeupCondition, NULL);
62#endif
d87ddcc2
VK
63
64 // register new queue
65 MutexLock(m_housekeeperLock);
66 m_activeQueues->set(CAST_FROM_POINTER(this, UINT64), this);
67 if (m_housekeeperThread == INVALID_THREAD_HANDLE)
68 {
69 m_housekeeperThread = ThreadCreateEx(MsgWaitQueue::housekeeperThread, 0, NULL);
70 }
71 MutexUnlock(m_housekeeperLock);
5039dede
AK
72}
73
9113e749
VK
74/**
75 * Destructor
76 */
5039dede
AK
77MsgWaitQueue::~MsgWaitQueue()
78{
d87ddcc2
VK
79 // unregister queue
80 MutexLock(m_housekeeperLock);
81 m_activeQueues->remove(CAST_FROM_POINTER(this, UINT64));
82 MutexUnlock(m_housekeeperLock);
5039dede 83
c17f6cbc 84 clear();
f128c07c 85 safe_free(m_elements);
f128c07c
VK
86
87#ifdef _WIN32
88 DeleteCriticalSection(&m_mutex);
89 for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
90 if (m_wakeupEvents[i] != NULL)
91 CloseHandle(m_wakeupEvents[i]);
92#else
93 pthread_mutex_destroy(&m_mutex);
94 pthread_cond_destroy(&m_wakeupCondition);
95#endif
5039dede
AK
96}
97
9113e749
VK
98/**
99 * Clear queue
100 */
c17f6cbc 101void MsgWaitQueue::clear()
5039dede 102{
c17f6cbc 103 lock();
5039dede 104
f128c07c
VK
105 for(int i = 0; i < m_allocated; i++)
106 {
107 if (m_elements[i].msg == NULL)
108 continue;
109
110 if (m_elements[i].isBinary)
5039dede 111 {
f128c07c 112 safe_free(m_elements[i].msg);
5039dede
AK
113 }
114 else
115 {
b368969c 116 delete (NXCPMessage *)(m_elements[i].msg);
5039dede 117 }
f128c07c
VK
118 }
119 m_size = 0;
120 m_allocated = 0;
121 safe_free_and_null(m_elements);
c17f6cbc 122 unlock();
5039dede
AK
123}
124
f128c07c
VK
125/**
126 * Put message into queue
127 */
b368969c 128void MsgWaitQueue::put(NXCPMessage *pMsg)
5039dede 129{
c17f6cbc 130 lock();
5039dede 131
f128c07c
VK
132 int pos;
133 if (m_size == m_allocated)
134 {
135 pos = m_allocated;
136 m_allocated += ALLOCATION_STEP;
137 m_elements = (WAIT_QUEUE_ELEMENT *)realloc(m_elements, sizeof(WAIT_QUEUE_ELEMENT) * m_allocated);
138 memset(&m_elements[pos], 0, sizeof(WAIT_QUEUE_ELEMENT) * ALLOCATION_STEP);
139 }
140 else
141 {
142 for(pos = 0; m_elements[pos].msg != NULL; pos++);
143 }
5039dede 144
b368969c 145 m_elements[pos].code = pMsg->getCode();
f128c07c 146 m_elements[pos].isBinary = 0;
b368969c 147 m_elements[pos].id = pMsg->getId();
f128c07c
VK
148 m_elements[pos].ttl = m_holdTime;
149 m_elements[pos].msg = pMsg;
14149881 150 m_elements[pos].sequence = m_sequence++;
f128c07c
VK
151 m_size++;
152
153#ifdef _WIN32
154 for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
155 if (m_waiters[i])
156 SetEvent(m_wakeupEvents[i]);
157#else
158 pthread_cond_broadcast(&m_wakeupCondition);
159#endif
5039dede 160
c17f6cbc 161 unlock();
5039dede
AK
162}
163
f128c07c
VK
164/**
165 * Put raw message into queue
166 */
b368969c 167void MsgWaitQueue::put(NXCP_MESSAGE *pMsg)
5039dede 168{
c17f6cbc 169 lock();
5039dede 170
f128c07c
VK
171 int pos;
172 if (m_size == m_allocated)
173 {
174 pos = m_allocated;
175 m_allocated += ALLOCATION_STEP;
176 m_elements = (WAIT_QUEUE_ELEMENT *)realloc(m_elements, sizeof(WAIT_QUEUE_ELEMENT) * m_allocated);
177 memset(&m_elements[pos], 0, sizeof(WAIT_QUEUE_ELEMENT) * ALLOCATION_STEP);
178 }
179 else
180 {
181 for(pos = 0; m_elements[pos].msg != NULL; pos++);
182 }
183
b368969c 184 m_elements[pos].code = pMsg->code;
f128c07c 185 m_elements[pos].isBinary = 1;
b368969c 186 m_elements[pos].id = pMsg->id;
f128c07c
VK
187 m_elements[pos].ttl = m_holdTime;
188 m_elements[pos].msg = pMsg;
14149881 189 m_elements[pos].sequence = m_sequence++;
f128c07c
VK
190 m_size++;
191
192#ifdef _WIN32
193 for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
194 if (m_waiters[i])
195 SetEvent(m_wakeupEvents[i]);
196#else
197 pthread_cond_broadcast(&m_wakeupCondition);
198#endif
5039dede 199
f128c07c 200 unlock();
5039dede
AK
201}
202
967893bb
VK
203/**
204 * Wait for message with specific code and ID
205 * Function return pointer to the message on success or
206 * NULL on timeout or error
207 */
f128c07c 208void *MsgWaitQueue::waitForMessageInternal(UINT16 isBinary, UINT16 wCode, UINT32 dwId, UINT32 dwTimeOut)
5039dede 209{
f128c07c
VK
210 lock();
211
212#ifdef _WIN32
213 int slot = -1;
214#endif
5039dede
AK
215
216 do
217 {
14149881
VK
218 UINT64 minSeq = _ULL(0xFFFFFFFFFFFFFFFF);
219 int index = -1;
f128c07c 220 for(int i = 0; i < m_allocated; i++)
5039dede 221 {
f128c07c
VK
222 if ((m_elements[i].msg != NULL) &&
223 (m_elements[i].id == dwId) &&
224 (m_elements[i].code == wCode) &&
225 (m_elements[i].isBinary == isBinary))
5039dede 226 {
14149881
VK
227 if (m_elements[i].sequence < minSeq)
228 {
229 minSeq = m_elements[i].sequence;
230 index = i;
231 }
5039dede
AK
232 }
233 }
5039dede 234
14149881
VK
235 if (index != -1)
236 {
237 void *msg = m_elements[index].msg;
238 m_elements[index].msg = NULL;
239 m_size--;
240#ifdef _WIN32
241 if (slot != -1)
242 m_waiters[slot] = 0; // release waiter slot
243#endif
244 unlock();
245 return msg;
246 }
247
f128c07c
VK
248 INT64 startTime = GetCurrentTimeMs();
249
250#ifdef _WIN32
251 // Find free slot if needed
252 if (slot == -1)
253 {
254 for(slot = 0; slot < MAX_MSGQUEUE_WAITERS; slot++)
255 if (!m_waiters[slot])
256 {
257 m_waiters[slot] = 1;
258 if (m_wakeupEvents[slot] == NULL)
259 m_wakeupEvents[slot] = CreateEvent(NULL, FALSE, FALSE, NULL);
260 break;
261 }
262
263 if (slot == MAX_MSGQUEUE_WAITERS)
264 {
265 slot = -1;
266 }
267 }
268
269 LeaveCriticalSection(&m_mutex);
270 if (slot != -1)
271 WaitForSingleObject(m_wakeupEvents[slot], dwTimeOut);
272 else
273 Sleep(50); // Just sleep if there are no waiter slots (highly unlikely during normal operation)
274 EnterCriticalSection(&m_mutex);
275#else
276#if HAVE_PTHREAD_COND_RELTIMEDWAIT_NP || defined(_NETWARE)
277 struct timespec ts;
278
279 ts.tv_sec = dwTimeOut / 1000;
280 ts.tv_nsec = (dwTimeOut % 1000) * 1000000;
281#ifdef _NETWARE
282 pthread_cond_timedwait(&m_wakeupCondition, &m_mutex, &ts);
283#else
284 pthread_cond_reltimedwait_np(&m_wakeupCondition, &m_mutex, &ts);
285#endif
286#else
287 struct timeval now;
288 struct timespec ts;
289
290 gettimeofday(&now, NULL);
291 ts.tv_sec = now.tv_sec + (dwTimeOut / 1000);
292
293 now.tv_usec += (dwTimeOut % 1000) * 1000;
294 ts.tv_sec += now.tv_usec / 1000000;
295 ts.tv_nsec = (now.tv_usec % 1000000) * 1000;
296
297 pthread_cond_timedwait(&m_wakeupCondition, &m_mutex, &ts);
298#endif /* HAVE_PTHREAD_COND_RELTIMEDWAIT_NP */
299#endif /* _WIN32 */
300
301 UINT32 sleepTime = (UINT32)(GetCurrentTimeMs() - startTime);
302 dwTimeOut -= min(sleepTime, dwTimeOut);
5039dede
AK
303 } while(dwTimeOut > 0);
304
4e65e21f 305#ifdef _WIN32
f128c07c
VK
306 if (slot != -1)
307 m_waiters[slot] = 0; // release waiter slot
4e65e21f
VK
308#endif
309
f128c07c 310 unlock();
5039dede
AK
311 return NULL;
312}
313
f128c07c 314/**
d87ddcc2 315 * Housekeeping run
f128c07c 316 */
d87ddcc2 317void MsgWaitQueue::housekeeperRun()
5039dede 318{
d87ddcc2
VK
319 lock();
320 if (m_size > 0)
5039dede 321 {
d87ddcc2
VK
322 for(int i = 0; i < m_allocated; i++)
323 {
324 if (m_elements[i].msg == NULL)
325 continue;
f128c07c 326
d87ddcc2
VK
327 if (m_elements[i].ttl <= TTL_CHECK_INTERVAL)
328 {
329 if (m_elements[i].isBinary)
5039dede 330 {
d87ddcc2 331 safe_free(m_elements[i].msg);
5039dede
AK
332 }
333 else
334 {
d87ddcc2 335 delete (NXCPMessage *)(m_elements[i].msg);
5039dede 336 }
d87ddcc2
VK
337 m_elements[i].msg = NULL;
338 m_size--;
339 }
340 else
14149881 341 {
d87ddcc2 342 m_elements[i].ttl -= TTL_CHECK_INTERVAL;
14149881 343 }
d87ddcc2
VK
344 }
345
346 // compact queue if possible
347 if ((m_allocated > ALLOCATION_STEP) && (m_size == 0))
348 {
349 m_allocated = ALLOCATION_STEP;
350 free(m_elements);
351 m_elements = (WAIT_QUEUE_ELEMENT *)calloc(m_allocated, sizeof(WAIT_QUEUE_ELEMENT));
34c8306e 352 }
5039dede 353 }
d87ddcc2
VK
354 unlock();
355}
356
357/**
358 * Callback for enumerating active queues
359 */
360EnumerationCallbackResult MsgWaitQueue::houseKeeperCallback(const void *key, const void *object, void *arg)
361{
362 ((MsgWaitQueue *)object)->housekeeperRun();
363 return _CONTINUE;
5039dede
AK
364}
365
f128c07c 366/**
d87ddcc2 367 * Housekeeper thread
f128c07c 368 */
d87ddcc2 369THREAD_RESULT THREAD_CALL MsgWaitQueue::housekeeperThread(void *arg)
5039dede 370{
d87ddcc2
VK
371 while(!ConditionWait(m_shutdownCondition, TTL_CHECK_INTERVAL))
372 {
373 MutexLock(m_housekeeperLock);
374 m_activeQueues->forEach(MsgWaitQueue::houseKeeperCallback, NULL);
375 MutexUnlock(m_housekeeperLock);
376 }
5039dede
AK
377 return THREAD_OK;
378}
d87ddcc2
VK
379
380/**
381 * Shutdown message wait queue background tasks
382 */
383void MsgWaitQueue::shutdown()
384{
385 ConditionSet(m_shutdownCondition);
386 ThreadJoin(m_housekeeperThread);
387 MutexLock(m_housekeeperLock);
388 m_housekeeperThread = INVALID_THREAD_HANDLE;
389 MutexUnlock(m_housekeeperLock);
390}
391
392/**
393 * Diag info callback
394 */
395EnumerationCallbackResult MsgWaitQueue::diagInfoCallback(const void *key, const void *object, void *arg)
396{
397 MsgWaitQueue *q = (MsgWaitQueue *)object;
398 TCHAR buffer[256];
399 _sntprintf(buffer, 256, _T(" %p size=%d holdTime=%d\n"), q, q->m_size, q->m_holdTime);
400 ((String *)arg)->append(buffer);
401 return _CONTINUE;
402}
403
404/**
405 * Get diagnostic info
406 */
407String MsgWaitQueue::getDiagInfo()
408{
409 String out;
410 MutexLock(m_housekeeperLock);
411 out.append(m_activeQueues->size());
412 out.append(_T(" active queues\nHousekeeper thread state is "));
413 out.append((m_housekeeperThread != INVALID_THREAD_HANDLE) ? _T("RUNNING\n") : _T("STOPPED\n"));
414 if (m_activeQueues->size() > 0)
415 {
416 out.append(_T("Active queues:\n"));
417 m_activeQueues->forEach(MsgWaitQueue::diagInfoCallback, &out);
418 }
419 MutexUnlock(m_housekeeperLock);
420 return out;
421}