implemented single housekeeping thread for all message wait queues
[public/netxms.git] / src / libnetxms / msgwq.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** NetXMS Foundation Library
4 ** Copyright (C) 2003-2011 Victor Kirhenshtein
5 **
6 ** This program is free software; you can redistribute it and/or modify
7 ** it under the terms of the GNU Lesser General Public License as published
8 ** by the Free Software Foundation; either version 3 of the License, or
9 ** (at your option) any later version.
10 **
11 ** This program is distributed in the hope that it will be useful,
12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 ** GNU General Public License for more details.
15 **
16 ** You should have received a copy of the GNU Lesser General Public License
17 ** along with this program; if not, write to the Free Software
18 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 **
20 ** File: msgwq.cpp
21 **
22 **/
23
24 #include "libnetxms.h"
25
26 /**
27 * Interval between checking messages TTL in milliseconds
28 */
29 #define TTL_CHECK_INTERVAL 30000
30
31 /**
32 * Buffer allocation step
33 */
34 #define ALLOCATION_STEP 16
35
36 /**
37 * Housekeeper data
38 */
39 MUTEX MsgWaitQueue::m_housekeeperLock = MutexCreate();
40 HashMap<UINT64, MsgWaitQueue> *MsgWaitQueue::m_activeQueues = new HashMap<UINT64, MsgWaitQueue>(false);
41 CONDITION MsgWaitQueue::m_shutdownCondition = ConditionCreate(TRUE);
42 THREAD MsgWaitQueue::m_housekeeperThread = INVALID_THREAD_HANDLE;
43
44 /**
45 * Constructor
46 */
47 MsgWaitQueue::MsgWaitQueue()
48 {
49 m_holdTime = 30000; // Default message TTL is 30 seconds
50 m_size = 0;
51 m_allocated = 0;
52 m_elements = NULL;
53 m_sequence = 1;
54 #ifdef _WIN32
55 InitializeCriticalSectionAndSpinCount(&m_mutex, 4000);
56 memset(m_wakeupEvents, 0, MAX_MSGQUEUE_WAITERS * sizeof(HANDLE));
57 m_wakeupEvents[0] = CreateEvent(NULL, FALSE, FALSE, NULL);
58 memset(m_waiters, 0, MAX_MSGQUEUE_WAITERS);
59 #else
60 pthread_mutex_init(&m_mutex, NULL);
61 pthread_cond_init(&m_wakeupCondition, NULL);
62 #endif
63
64 // register new queue
65 MutexLock(m_housekeeperLock);
66 m_activeQueues->set(CAST_FROM_POINTER(this, UINT64), this);
67 if (m_housekeeperThread == INVALID_THREAD_HANDLE)
68 {
69 m_housekeeperThread = ThreadCreateEx(MsgWaitQueue::housekeeperThread, 0, NULL);
70 }
71 MutexUnlock(m_housekeeperLock);
72 }
73
74 /**
75 * Destructor
76 */
77 MsgWaitQueue::~MsgWaitQueue()
78 {
79 // unregister queue
80 MutexLock(m_housekeeperLock);
81 m_activeQueues->remove(CAST_FROM_POINTER(this, UINT64));
82 MutexUnlock(m_housekeeperLock);
83
84 clear();
85 safe_free(m_elements);
86
87 #ifdef _WIN32
88 DeleteCriticalSection(&m_mutex);
89 for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
90 if (m_wakeupEvents[i] != NULL)
91 CloseHandle(m_wakeupEvents[i]);
92 #else
93 pthread_mutex_destroy(&m_mutex);
94 pthread_cond_destroy(&m_wakeupCondition);
95 #endif
96 }
97
98 /**
99 * Clear queue
100 */
101 void MsgWaitQueue::clear()
102 {
103 lock();
104
105 for(int i = 0; i < m_allocated; i++)
106 {
107 if (m_elements[i].msg == NULL)
108 continue;
109
110 if (m_elements[i].isBinary)
111 {
112 safe_free(m_elements[i].msg);
113 }
114 else
115 {
116 delete (NXCPMessage *)(m_elements[i].msg);
117 }
118 }
119 m_size = 0;
120 m_allocated = 0;
121 safe_free_and_null(m_elements);
122 unlock();
123 }
124
125 /**
126 * Put message into queue
127 */
128 void MsgWaitQueue::put(NXCPMessage *pMsg)
129 {
130 lock();
131
132 int pos;
133 if (m_size == m_allocated)
134 {
135 pos = m_allocated;
136 m_allocated += ALLOCATION_STEP;
137 m_elements = (WAIT_QUEUE_ELEMENT *)realloc(m_elements, sizeof(WAIT_QUEUE_ELEMENT) * m_allocated);
138 memset(&m_elements[pos], 0, sizeof(WAIT_QUEUE_ELEMENT) * ALLOCATION_STEP);
139 }
140 else
141 {
142 for(pos = 0; m_elements[pos].msg != NULL; pos++);
143 }
144
145 m_elements[pos].code = pMsg->getCode();
146 m_elements[pos].isBinary = 0;
147 m_elements[pos].id = pMsg->getId();
148 m_elements[pos].ttl = m_holdTime;
149 m_elements[pos].msg = pMsg;
150 m_elements[pos].sequence = m_sequence++;
151 m_size++;
152
153 #ifdef _WIN32
154 for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
155 if (m_waiters[i])
156 SetEvent(m_wakeupEvents[i]);
157 #else
158 pthread_cond_broadcast(&m_wakeupCondition);
159 #endif
160
161 unlock();
162 }
163
164 /**
165 * Put raw message into queue
166 */
167 void MsgWaitQueue::put(NXCP_MESSAGE *pMsg)
168 {
169 lock();
170
171 int pos;
172 if (m_size == m_allocated)
173 {
174 pos = m_allocated;
175 m_allocated += ALLOCATION_STEP;
176 m_elements = (WAIT_QUEUE_ELEMENT *)realloc(m_elements, sizeof(WAIT_QUEUE_ELEMENT) * m_allocated);
177 memset(&m_elements[pos], 0, sizeof(WAIT_QUEUE_ELEMENT) * ALLOCATION_STEP);
178 }
179 else
180 {
181 for(pos = 0; m_elements[pos].msg != NULL; pos++);
182 }
183
184 m_elements[pos].code = pMsg->code;
185 m_elements[pos].isBinary = 1;
186 m_elements[pos].id = pMsg->id;
187 m_elements[pos].ttl = m_holdTime;
188 m_elements[pos].msg = pMsg;
189 m_elements[pos].sequence = m_sequence++;
190 m_size++;
191
192 #ifdef _WIN32
193 for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
194 if (m_waiters[i])
195 SetEvent(m_wakeupEvents[i]);
196 #else
197 pthread_cond_broadcast(&m_wakeupCondition);
198 #endif
199
200 unlock();
201 }
202
203 /**
204 * Wait for message with specific code and ID
205 * Function return pointer to the message on success or
206 * NULL on timeout or error
207 */
208 void *MsgWaitQueue::waitForMessageInternal(UINT16 isBinary, UINT16 wCode, UINT32 dwId, UINT32 dwTimeOut)
209 {
210 lock();
211
212 #ifdef _WIN32
213 int slot = -1;
214 #endif
215
216 do
217 {
218 UINT64 minSeq = _ULL(0xFFFFFFFFFFFFFFFF);
219 int index = -1;
220 for(int i = 0; i < m_allocated; i++)
221 {
222 if ((m_elements[i].msg != NULL) &&
223 (m_elements[i].id == dwId) &&
224 (m_elements[i].code == wCode) &&
225 (m_elements[i].isBinary == isBinary))
226 {
227 if (m_elements[i].sequence < minSeq)
228 {
229 minSeq = m_elements[i].sequence;
230 index = i;
231 }
232 }
233 }
234
235 if (index != -1)
236 {
237 void *msg = m_elements[index].msg;
238 m_elements[index].msg = NULL;
239 m_size--;
240 #ifdef _WIN32
241 if (slot != -1)
242 m_waiters[slot] = 0; // release waiter slot
243 #endif
244 unlock();
245 return msg;
246 }
247
248 INT64 startTime = GetCurrentTimeMs();
249
250 #ifdef _WIN32
251 // Find free slot if needed
252 if (slot == -1)
253 {
254 for(slot = 0; slot < MAX_MSGQUEUE_WAITERS; slot++)
255 if (!m_waiters[slot])
256 {
257 m_waiters[slot] = 1;
258 if (m_wakeupEvents[slot] == NULL)
259 m_wakeupEvents[slot] = CreateEvent(NULL, FALSE, FALSE, NULL);
260 break;
261 }
262
263 if (slot == MAX_MSGQUEUE_WAITERS)
264 {
265 slot = -1;
266 }
267 }
268
269 LeaveCriticalSection(&m_mutex);
270 if (slot != -1)
271 WaitForSingleObject(m_wakeupEvents[slot], dwTimeOut);
272 else
273 Sleep(50); // Just sleep if there are no waiter slots (highly unlikely during normal operation)
274 EnterCriticalSection(&m_mutex);
275 #else
276 #if HAVE_PTHREAD_COND_RELTIMEDWAIT_NP || defined(_NETWARE)
277 struct timespec ts;
278
279 ts.tv_sec = dwTimeOut / 1000;
280 ts.tv_nsec = (dwTimeOut % 1000) * 1000000;
281 #ifdef _NETWARE
282 pthread_cond_timedwait(&m_wakeupCondition, &m_mutex, &ts);
283 #else
284 pthread_cond_reltimedwait_np(&m_wakeupCondition, &m_mutex, &ts);
285 #endif
286 #else
287 struct timeval now;
288 struct timespec ts;
289
290 gettimeofday(&now, NULL);
291 ts.tv_sec = now.tv_sec + (dwTimeOut / 1000);
292
293 now.tv_usec += (dwTimeOut % 1000) * 1000;
294 ts.tv_sec += now.tv_usec / 1000000;
295 ts.tv_nsec = (now.tv_usec % 1000000) * 1000;
296
297 pthread_cond_timedwait(&m_wakeupCondition, &m_mutex, &ts);
298 #endif /* HAVE_PTHREAD_COND_RELTIMEDWAIT_NP */
299 #endif /* _WIN32 */
300
301 UINT32 sleepTime = (UINT32)(GetCurrentTimeMs() - startTime);
302 dwTimeOut -= min(sleepTime, dwTimeOut);
303 } while(dwTimeOut > 0);
304
305 #ifdef _WIN32
306 if (slot != -1)
307 m_waiters[slot] = 0; // release waiter slot
308 #endif
309
310 unlock();
311 return NULL;
312 }
313
314 /**
315 * Housekeeping run
316 */
317 void MsgWaitQueue::housekeeperRun()
318 {
319 lock();
320 if (m_size > 0)
321 {
322 for(int i = 0; i < m_allocated; i++)
323 {
324 if (m_elements[i].msg == NULL)
325 continue;
326
327 if (m_elements[i].ttl <= TTL_CHECK_INTERVAL)
328 {
329 if (m_elements[i].isBinary)
330 {
331 safe_free(m_elements[i].msg);
332 }
333 else
334 {
335 delete (NXCPMessage *)(m_elements[i].msg);
336 }
337 m_elements[i].msg = NULL;
338 m_size--;
339 }
340 else
341 {
342 m_elements[i].ttl -= TTL_CHECK_INTERVAL;
343 }
344 }
345
346 // compact queue if possible
347 if ((m_allocated > ALLOCATION_STEP) && (m_size == 0))
348 {
349 m_allocated = ALLOCATION_STEP;
350 free(m_elements);
351 m_elements = (WAIT_QUEUE_ELEMENT *)calloc(m_allocated, sizeof(WAIT_QUEUE_ELEMENT));
352 }
353 }
354 unlock();
355 }
356
357 /**
358 * Callback for enumerating active queues
359 */
360 EnumerationCallbackResult MsgWaitQueue::houseKeeperCallback(const void *key, const void *object, void *arg)
361 {
362 ((MsgWaitQueue *)object)->housekeeperRun();
363 return _CONTINUE;
364 }
365
366 /**
367 * Housekeeper thread
368 */
369 THREAD_RESULT THREAD_CALL MsgWaitQueue::housekeeperThread(void *arg)
370 {
371 while(!ConditionWait(m_shutdownCondition, TTL_CHECK_INTERVAL))
372 {
373 MutexLock(m_housekeeperLock);
374 m_activeQueues->forEach(MsgWaitQueue::houseKeeperCallback, NULL);
375 MutexUnlock(m_housekeeperLock);
376 }
377 return THREAD_OK;
378 }
379
380 /**
381 * Shutdown message wait queue background tasks
382 */
383 void MsgWaitQueue::shutdown()
384 {
385 ConditionSet(m_shutdownCondition);
386 ThreadJoin(m_housekeeperThread);
387 MutexLock(m_housekeeperLock);
388 m_housekeeperThread = INVALID_THREAD_HANDLE;
389 MutexUnlock(m_housekeeperLock);
390 }
391
392 /**
393 * Diag info callback
394 */
395 EnumerationCallbackResult MsgWaitQueue::diagInfoCallback(const void *key, const void *object, void *arg)
396 {
397 MsgWaitQueue *q = (MsgWaitQueue *)object;
398 TCHAR buffer[256];
399 _sntprintf(buffer, 256, _T(" %p size=%d holdTime=%d\n"), q, q->m_size, q->m_holdTime);
400 ((String *)arg)->append(buffer);
401 return _CONTINUE;
402 }
403
404 /**
405 * Get diagnostic info
406 */
407 String MsgWaitQueue::getDiagInfo()
408 {
409 String out;
410 MutexLock(m_housekeeperLock);
411 out.append(m_activeQueues->size());
412 out.append(_T(" active queues\nHousekeeper thread state is "));
413 out.append((m_housekeeperThread != INVALID_THREAD_HANDLE) ? _T("RUNNING\n") : _T("STOPPED\n"));
414 if (m_activeQueues->size() > 0)
415 {
416 out.append(_T("Active queues:\n"));
417 m_activeQueues->forEach(MsgWaitQueue::diagInfoCallback, &out);
418 }
419 MutexUnlock(m_housekeeperLock);
420 return out;
421 }