fixed Windows build errors
[public/netxms.git] / src / libnetxms / msgwq.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** NetXMS Foundation Library
4 ** Copyright (C) 2003-2016 Victor Kirhenshtein
5 **
6 ** This program is free software; you can redistribute it and/or modify
7 ** it under the terms of the GNU Lesser General Public License as published
8 ** by the Free Software Foundation; either version 3 of the License, or
9 ** (at your option) any later version.
10 **
11 ** This program is distributed in the hope that it will be useful,
12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 ** GNU General Public License for more details.
15 **
16 ** You should have received a copy of the GNU Lesser General Public License
17 ** along with this program; if not, write to the Free Software
18 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 **
20 ** File: msgwq.cpp
21 **
22 **/
23
24 #include "libnetxms.h"
25
/**
 * Interval between checking messages TTL in milliseconds.
 * The housekeeper thread waits this long between runs, and the same value
 * is subtracted from the remaining TTL of every queued element on each run.
 */
#define TTL_CHECK_INTERVAL 30000

/**
 * Buffer allocation step: number of elements by which the element array
 * grows when it is full. The housekeeper also shrinks an emptied, oversized
 * array back to this many elements.
 */
#define ALLOCATION_STEP 16
35
/**
 * Housekeeper data (static members shared by all MsgWaitQueue instances)
 */
// Held while the queue registry or the housekeeper thread handle is accessed
Mutex MsgWaitQueue::m_housekeeperLock;
// Registry of live queues, keyed by the queue object's address.
// NOTE(review): the "false" argument presumably means the map does not own
// (and will not delete) the registered queues - confirm against HashMap.
HashMap<UINT64, MsgWaitQueue> *MsgWaitQueue::m_activeQueues = new HashMap<UINT64, MsgWaitQueue>(false);
// Set by shutdown() to make the housekeeper thread leave its loop
Condition MsgWaitQueue::m_shutdownCondition(true);
// Stays INVALID_THREAD_HANDLE until the first queue is constructed
THREAD MsgWaitQueue::m_housekeeperThread = INVALID_THREAD_HANDLE;
43
44 /**
45 * Constructor
46 */
47 MsgWaitQueue::MsgWaitQueue()
48 {
49 m_holdTime = 30000; // Default message TTL is 30 seconds
50 m_size = 0;
51 m_allocated = 0;
52 m_elements = NULL;
53 m_sequence = 1;
54 #if defined(_WIN32)
55 InitializeCriticalSectionAndSpinCount(&m_mutex, 4000);
56 memset(m_wakeupEvents, 0, MAX_MSGQUEUE_WAITERS * sizeof(HANDLE));
57 m_wakeupEvents[0] = CreateEvent(NULL, FALSE, FALSE, NULL);
58 memset(m_waiters, 0, MAX_MSGQUEUE_WAITERS);
59 #elif defined(_USE_GNU_PTH)
60 pth_mutex_init(&m_mutex);
61 pth_cond_init(&m_wakeupCondition);
62 #else
63 pthread_mutex_init(&m_mutex, NULL);
64 pthread_cond_init(&m_wakeupCondition, NULL);
65 #endif
66
67 // register new queue
68 m_housekeeperLock.lock();
69 m_activeQueues->set(CAST_FROM_POINTER(this, UINT64), this);
70 if (m_housekeeperThread == INVALID_THREAD_HANDLE)
71 {
72 m_housekeeperThread = ThreadCreateEx(MsgWaitQueue::housekeeperThread, 0, NULL);
73 }
74 m_housekeeperLock.unlock();
75 }
76
77 /**
78 * Destructor
79 */
80 MsgWaitQueue::~MsgWaitQueue()
81 {
82 // unregister queue
83 m_housekeeperLock.lock();
84 m_activeQueues->remove(CAST_FROM_POINTER(this, UINT64));
85 m_housekeeperLock.unlock();
86
87 clear();
88 safe_free(m_elements);
89
90 #if defined(_WIN32)
91 DeleteCriticalSection(&m_mutex);
92 for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
93 if (m_wakeupEvents[i] != NULL)
94 CloseHandle(m_wakeupEvents[i]);
95 #elif defined(_USE_GNU_PTH)
96 // nothing to do if libpth is used
97 #else
98 pthread_mutex_destroy(&m_mutex);
99 pthread_cond_destroy(&m_wakeupCondition);
100 #endif
101 }
102
103 /**
104 * Clear queue
105 */
106 void MsgWaitQueue::clear()
107 {
108 lock();
109
110 for(int i = 0; i < m_allocated; i++)
111 {
112 if (m_elements[i].msg == NULL)
113 continue;
114
115 if (m_elements[i].isBinary)
116 {
117 safe_free(m_elements[i].msg);
118 }
119 else
120 {
121 delete (NXCPMessage *)(m_elements[i].msg);
122 }
123 }
124 m_size = 0;
125 m_allocated = 0;
126 safe_free_and_null(m_elements);
127 unlock();
128 }
129
130 /**
131 * Put message into queue
132 */
133 void MsgWaitQueue::put(NXCPMessage *pMsg)
134 {
135 lock();
136
137 int pos;
138 if (m_size == m_allocated)
139 {
140 pos = m_allocated;
141 m_allocated += ALLOCATION_STEP;
142 m_elements = (WAIT_QUEUE_ELEMENT *)realloc(m_elements, sizeof(WAIT_QUEUE_ELEMENT) * m_allocated);
143 memset(&m_elements[pos], 0, sizeof(WAIT_QUEUE_ELEMENT) * ALLOCATION_STEP);
144 }
145 else
146 {
147 for(pos = 0; m_elements[pos].msg != NULL; pos++);
148 }
149
150 m_elements[pos].code = pMsg->getCode();
151 m_elements[pos].isBinary = 0;
152 m_elements[pos].id = pMsg->getId();
153 m_elements[pos].ttl = m_holdTime;
154 m_elements[pos].msg = pMsg;
155 m_elements[pos].sequence = m_sequence++;
156 m_size++;
157
158 #if defined(_WIN32)
159 for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
160 if (m_waiters[i])
161 SetEvent(m_wakeupEvents[i]);
162 #elif defined(_USE_GNU_PTH)
163 pth_cond_notify(&m_wakeupCondition, TRUE);
164 #else
165 pthread_cond_broadcast(&m_wakeupCondition);
166 #endif
167
168 unlock();
169 }
170
171 /**
172 * Put raw message into queue
173 */
174 void MsgWaitQueue::put(NXCP_MESSAGE *pMsg)
175 {
176 lock();
177
178 int pos;
179 if (m_size == m_allocated)
180 {
181 pos = m_allocated;
182 m_allocated += ALLOCATION_STEP;
183 m_elements = (WAIT_QUEUE_ELEMENT *)realloc(m_elements, sizeof(WAIT_QUEUE_ELEMENT) * m_allocated);
184 memset(&m_elements[pos], 0, sizeof(WAIT_QUEUE_ELEMENT) * ALLOCATION_STEP);
185 }
186 else
187 {
188 for(pos = 0; m_elements[pos].msg != NULL; pos++);
189 }
190
191 m_elements[pos].code = pMsg->code;
192 m_elements[pos].isBinary = 1;
193 m_elements[pos].id = pMsg->id;
194 m_elements[pos].ttl = m_holdTime;
195 m_elements[pos].msg = pMsg;
196 m_elements[pos].sequence = m_sequence++;
197 m_size++;
198
199 #ifdef _WIN32
200 for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
201 if (m_waiters[i])
202 SetEvent(m_wakeupEvents[i]);
203 #elif defined(_USE_GNU_PTH)
204 pth_cond_notify(&m_wakeupCondition, TRUE);
205 #else
206 pthread_cond_broadcast(&m_wakeupCondition);
207 #endif
208
209 unlock();
210 }
211
212 /**
213 * Wait for message with specific code and ID
214 * Function return pointer to the message on success or
215 * NULL on timeout or error
216 */
217 void *MsgWaitQueue::waitForMessageInternal(UINT16 isBinary, UINT16 wCode, UINT32 dwId, UINT32 dwTimeOut)
218 {
219 lock();
220
221 #ifdef _WIN32
222 int slot = -1;
223 #endif
224
225 do
226 {
227 UINT64 minSeq = _ULL(0xFFFFFFFFFFFFFFFF);
228 int index = -1;
229 for(int i = 0; i < m_allocated; i++)
230 {
231 if ((m_elements[i].msg != NULL) &&
232 (m_elements[i].id == dwId) &&
233 (m_elements[i].code == wCode) &&
234 (m_elements[i].isBinary == isBinary))
235 {
236 if (m_elements[i].sequence < minSeq)
237 {
238 minSeq = m_elements[i].sequence;
239 index = i;
240 }
241 }
242 }
243
244 if (index != -1)
245 {
246 void *msg = m_elements[index].msg;
247 m_elements[index].msg = NULL;
248 m_size--;
249 #ifdef _WIN32
250 if (slot != -1)
251 m_waiters[slot] = 0; // release waiter slot
252 #endif
253 unlock();
254 return msg;
255 }
256
257 INT64 startTime = GetCurrentTimeMs();
258
259 #if defined(_WIN32)
260 // Find free slot if needed
261 if (slot == -1)
262 {
263 for(slot = 0; slot < MAX_MSGQUEUE_WAITERS; slot++)
264 if (!m_waiters[slot])
265 {
266 m_waiters[slot] = 1;
267 if (m_wakeupEvents[slot] == NULL)
268 m_wakeupEvents[slot] = CreateEvent(NULL, FALSE, FALSE, NULL);
269 break;
270 }
271
272 if (slot == MAX_MSGQUEUE_WAITERS)
273 {
274 slot = -1;
275 }
276 }
277
278 LeaveCriticalSection(&m_mutex);
279 if (slot != -1)
280 WaitForSingleObject(m_wakeupEvents[slot], dwTimeOut);
281 else
282 Sleep(50); // Just sleep if there are no waiter slots (highly unlikely during normal operation)
283 EnterCriticalSection(&m_mutex);
284 #elif HAVE_PTHREAD_COND_RELTIMEDWAIT_NP || defined(_NETWARE)
285 struct timespec ts;
286
287 ts.tv_sec = dwTimeOut / 1000;
288 ts.tv_nsec = (dwTimeOut % 1000) * 1000000;
289 #ifdef _NETWARE
290 pthread_cond_timedwait(&m_wakeupCondition, &m_mutex, &ts);
291 #else
292 pthread_cond_reltimedwait_np(&m_wakeupCondition, &m_mutex, &ts);
293 #endif
294 #elif defined(_USE_GNU_PTH)
295 pth_event_t ev = pth_event(PTH_EVENT_TIME, pth_timeout(dwTimeOut / 1000, (dwTimeOut % 1000) * 1000));
296 pth_cond_await(&m_wakeupCondition, &m_mutex, ev);
297 pth_event_free(ev, PTH_FREE_ALL);
298 #else
299 struct timeval now;
300 struct timespec ts;
301
302 gettimeofday(&now, NULL);
303 ts.tv_sec = now.tv_sec + (dwTimeOut / 1000);
304
305 now.tv_usec += (dwTimeOut % 1000) * 1000;
306 ts.tv_sec += now.tv_usec / 1000000;
307 ts.tv_nsec = (now.tv_usec % 1000000) * 1000;
308
309 pthread_cond_timedwait(&m_wakeupCondition, &m_mutex, &ts);
310 #endif /* _WIN32 */
311
312 UINT32 sleepTime = (UINT32)(GetCurrentTimeMs() - startTime);
313 dwTimeOut -= min(sleepTime, dwTimeOut);
314 } while(dwTimeOut > 0);
315
316 #ifdef _WIN32
317 if (slot != -1)
318 m_waiters[slot] = 0; // release waiter slot
319 #endif
320
321 unlock();
322 return NULL;
323 }
324
325 /**
326 * Housekeeping run
327 */
328 void MsgWaitQueue::housekeeperRun()
329 {
330 lock();
331 if (m_size > 0)
332 {
333 for(int i = 0; i < m_allocated; i++)
334 {
335 if (m_elements[i].msg == NULL)
336 continue;
337
338 if (m_elements[i].ttl <= TTL_CHECK_INTERVAL)
339 {
340 if (m_elements[i].isBinary)
341 {
342 safe_free(m_elements[i].msg);
343 }
344 else
345 {
346 delete (NXCPMessage *)(m_elements[i].msg);
347 }
348 m_elements[i].msg = NULL;
349 m_size--;
350 }
351 else
352 {
353 m_elements[i].ttl -= TTL_CHECK_INTERVAL;
354 }
355 }
356
357 // compact queue if possible
358 if ((m_allocated > ALLOCATION_STEP) && (m_size == 0))
359 {
360 m_allocated = ALLOCATION_STEP;
361 free(m_elements);
362 m_elements = (WAIT_QUEUE_ELEMENT *)calloc(m_allocated, sizeof(WAIT_QUEUE_ELEMENT));
363 }
364 }
365 unlock();
366 }
367
368 /**
369 * Callback for enumerating active queues
370 */
371 EnumerationCallbackResult MsgWaitQueue::houseKeeperCallback(const void *key, const void *object, void *arg)
372 {
373 ((MsgWaitQueue *)object)->housekeeperRun();
374 return _CONTINUE;
375 }
376
377 /**
378 * Housekeeper thread
379 */
380 THREAD_RESULT THREAD_CALL MsgWaitQueue::housekeeperThread(void *arg)
381 {
382 while(!m_shutdownCondition.wait(TTL_CHECK_INTERVAL))
383 {
384 m_housekeeperLock.lock();
385 m_activeQueues->forEach(MsgWaitQueue::houseKeeperCallback, NULL);
386 m_housekeeperLock.unlock();
387 }
388 return THREAD_OK;
389 }
390
391 /**
392 * Shutdown message wait queue background tasks
393 */
394 void MsgWaitQueue::shutdown()
395 {
396 m_shutdownCondition.set();
397 ThreadJoin(m_housekeeperThread);
398 m_housekeeperLock.lock();
399 m_housekeeperThread = INVALID_THREAD_HANDLE;
400 m_housekeeperLock.unlock();
401 delete m_activeQueues;
402 }
403
404 /**
405 * Diag info callback
406 */
407 EnumerationCallbackResult MsgWaitQueue::diagInfoCallback(const void *key, const void *object, void *arg)
408 {
409 MsgWaitQueue *q = (MsgWaitQueue *)object;
410 TCHAR buffer[256];
411 _sntprintf(buffer, 256, _T(" %p size=%d holdTime=%d\n"), q, q->m_size, q->m_holdTime);
412 ((String *)arg)->append(buffer);
413 return _CONTINUE;
414 }
415
416 /**
417 * Get diagnostic info
418 */
419 String MsgWaitQueue::getDiagInfo()
420 {
421 String out;
422 m_housekeeperLock.lock();
423 out.append(m_activeQueues->size());
424 out.append(_T(" active queues\nHousekeeper thread state is "));
425 out.append((m_housekeeperThread != INVALID_THREAD_HANDLE) ? _T("RUNNING\n") : _T("STOPPED\n"));
426 if (m_activeQueues->size() > 0)
427 {
428 out.append(_T("Active queues:\n"));
429 m_activeQueues->forEach(MsgWaitQueue::diagInfoCallback, &out);
430 }
431 m_housekeeperLock.unlock();
432 return out;
433 }