528f8867e257c6bd6eef1646438d886241d9f302
[public/netxms.git] / src / libnetxms / msgwq.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** NetXMS Foundation Library
4 ** Copyright (C) 2003-2011 Victor Kirhenshtein
5 **
6 ** This program is free software; you can redistribute it and/or modify
7 ** it under the terms of the GNU Lesser General Public License as published
8 ** by the Free Software Foundation; either version 3 of the License, or
9 ** (at your option) any later version.
10 **
11 ** This program is distributed in the hope that it will be useful,
12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 ** GNU General Public License for more details.
15 **
16 ** You should have received a copy of the GNU Lesser General Public License
17 ** along with this program; if not, write to the Free Software
18 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 **
20 ** File: msgwq.cpp
21 **
22 **/
23
24 #include "libnetxms.h"
25
/**
 * Period, in milliseconds, at which the housekeeper thread wakes up
 * to age queued messages; also the amount subtracted from each
 * message's TTL on every pass
 */
#define TTL_CHECK_INTERVAL 500

/**
 * Number of elements by which the queue's element buffer grows
 * (and the size it is shrunk back to when the queue becomes empty)
 */
#define ALLOCATION_STEP 16
35
36 /**
37 * Constructor
38 */
39 MsgWaitQueue::MsgWaitQueue()
40 {
41 m_holdTime = 30000; // Default message TTL is 30 seconds
42 m_size = 0;
43 m_allocated = 0;
44 m_elements = NULL;
45 m_sequence = 1;
46 m_stopCondition = ConditionCreate(FALSE);
47 #ifdef _WIN32
48 InitializeCriticalSectionAndSpinCount(&m_mutex, 4000);
49 memset(m_wakeupEvents, 0, MAX_MSGQUEUE_WAITERS * sizeof(HANDLE));
50 m_wakeupEvents[0] = CreateEvent(NULL, FALSE, FALSE, NULL);
51 memset(m_waiters, 0, MAX_MSGQUEUE_WAITERS);
52 #else
53 pthread_mutex_init(&m_mutex, NULL);
54 pthread_cond_init(&m_wakeupCondition, NULL);
55 #endif
56 m_hHkThread = ThreadCreateEx(mwqThreadStarter, 0, this);
57 }
58
59 /**
60 * Destructor
61 */
62 MsgWaitQueue::~MsgWaitQueue()
63 {
64 ConditionSet(m_stopCondition);
65
66 // Wait for housekeeper thread to terminate
67 ThreadJoin(m_hHkThread);
68
69 // Housekeeper thread stopped, proceed with object destruction
70 clear();
71 safe_free(m_elements);
72 ConditionDestroy(m_stopCondition);
73
74 #ifdef _WIN32
75 DeleteCriticalSection(&m_mutex);
76 for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
77 if (m_wakeupEvents[i] != NULL)
78 CloseHandle(m_wakeupEvents[i]);
79 #else
80 pthread_mutex_destroy(&m_mutex);
81 pthread_cond_destroy(&m_wakeupCondition);
82 #endif
83 }
84
85 /**
86 * Clear queue
87 */
88 void MsgWaitQueue::clear()
89 {
90 lock();
91
92 for(int i = 0; i < m_allocated; i++)
93 {
94 if (m_elements[i].msg == NULL)
95 continue;
96
97 if (m_elements[i].isBinary)
98 {
99 safe_free(m_elements[i].msg);
100 }
101 else
102 {
103 delete (NXCPMessage *)(m_elements[i].msg);
104 }
105 }
106 m_size = 0;
107 m_allocated = 0;
108 safe_free_and_null(m_elements);
109 unlock();
110 }
111
112 /**
113 * Put message into queue
114 */
115 void MsgWaitQueue::put(NXCPMessage *pMsg)
116 {
117 lock();
118
119 int pos;
120 if (m_size == m_allocated)
121 {
122 pos = m_allocated;
123 m_allocated += ALLOCATION_STEP;
124 m_elements = (WAIT_QUEUE_ELEMENT *)realloc(m_elements, sizeof(WAIT_QUEUE_ELEMENT) * m_allocated);
125 memset(&m_elements[pos], 0, sizeof(WAIT_QUEUE_ELEMENT) * ALLOCATION_STEP);
126 }
127 else
128 {
129 for(pos = 0; m_elements[pos].msg != NULL; pos++);
130 }
131
132 m_elements[pos].code = pMsg->getCode();
133 m_elements[pos].isBinary = 0;
134 m_elements[pos].id = pMsg->getId();
135 m_elements[pos].ttl = m_holdTime;
136 m_elements[pos].msg = pMsg;
137 m_elements[pos].sequence = m_sequence++;
138 m_size++;
139
140 #ifdef _WIN32
141 for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
142 if (m_waiters[i])
143 SetEvent(m_wakeupEvents[i]);
144 #else
145 pthread_cond_broadcast(&m_wakeupCondition);
146 #endif
147
148 unlock();
149 }
150
151 /**
152 * Put raw message into queue
153 */
154 void MsgWaitQueue::put(NXCP_MESSAGE *pMsg)
155 {
156 lock();
157
158 int pos;
159 if (m_size == m_allocated)
160 {
161 pos = m_allocated;
162 m_allocated += ALLOCATION_STEP;
163 m_elements = (WAIT_QUEUE_ELEMENT *)realloc(m_elements, sizeof(WAIT_QUEUE_ELEMENT) * m_allocated);
164 memset(&m_elements[pos], 0, sizeof(WAIT_QUEUE_ELEMENT) * ALLOCATION_STEP);
165 }
166 else
167 {
168 for(pos = 0; m_elements[pos].msg != NULL; pos++);
169 }
170
171 m_elements[pos].code = pMsg->code;
172 m_elements[pos].isBinary = 1;
173 m_elements[pos].id = pMsg->id;
174 m_elements[pos].ttl = m_holdTime;
175 m_elements[pos].msg = pMsg;
176 m_elements[pos].sequence = m_sequence++;
177 m_size++;
178
179 #ifdef _WIN32
180 for(int i = 0; i < MAX_MSGQUEUE_WAITERS; i++)
181 if (m_waiters[i])
182 SetEvent(m_wakeupEvents[i]);
183 #else
184 pthread_cond_broadcast(&m_wakeupCondition);
185 #endif
186
187 unlock();
188 }
189
190 /**
191 * Wait for message with specific code and ID
192 * Function return pointer to the message on success or
193 * NULL on timeout or error
194 */
195 void *MsgWaitQueue::waitForMessageInternal(UINT16 isBinary, UINT16 wCode, UINT32 dwId, UINT32 dwTimeOut)
196 {
197 lock();
198
199 #ifdef _WIN32
200 int slot = -1;
201 #endif
202
203 do
204 {
205 UINT64 minSeq = _ULL(0xFFFFFFFFFFFFFFFF);
206 int index = -1;
207 for(int i = 0; i < m_allocated; i++)
208 {
209 if ((m_elements[i].msg != NULL) &&
210 (m_elements[i].id == dwId) &&
211 (m_elements[i].code == wCode) &&
212 (m_elements[i].isBinary == isBinary))
213 {
214 if (m_elements[i].sequence < minSeq)
215 {
216 minSeq = m_elements[i].sequence;
217 index = i;
218 }
219 }
220 }
221
222 if (index != -1)
223 {
224 void *msg = m_elements[index].msg;
225 m_elements[index].msg = NULL;
226 m_size--;
227 #ifdef _WIN32
228 if (slot != -1)
229 m_waiters[slot] = 0; // release waiter slot
230 #endif
231 unlock();
232 return msg;
233 }
234
235 INT64 startTime = GetCurrentTimeMs();
236
237 #ifdef _WIN32
238 // Find free slot if needed
239 if (slot == -1)
240 {
241 for(slot = 0; slot < MAX_MSGQUEUE_WAITERS; slot++)
242 if (!m_waiters[slot])
243 {
244 m_waiters[slot] = 1;
245 if (m_wakeupEvents[slot] == NULL)
246 m_wakeupEvents[slot] = CreateEvent(NULL, FALSE, FALSE, NULL);
247 break;
248 }
249
250 if (slot == MAX_MSGQUEUE_WAITERS)
251 {
252 slot = -1;
253 }
254 }
255
256 LeaveCriticalSection(&m_mutex);
257 if (slot != -1)
258 WaitForSingleObject(m_wakeupEvents[slot], dwTimeOut);
259 else
260 Sleep(50); // Just sleep if there are no waiter slots (highly unlikely during normal operation)
261 EnterCriticalSection(&m_mutex);
262 #else
263 #if HAVE_PTHREAD_COND_RELTIMEDWAIT_NP || defined(_NETWARE)
264 struct timespec ts;
265
266 ts.tv_sec = dwTimeOut / 1000;
267 ts.tv_nsec = (dwTimeOut % 1000) * 1000000;
268 #ifdef _NETWARE
269 pthread_cond_timedwait(&m_wakeupCondition, &m_mutex, &ts);
270 #else
271 pthread_cond_reltimedwait_np(&m_wakeupCondition, &m_mutex, &ts);
272 #endif
273 #else
274 struct timeval now;
275 struct timespec ts;
276
277 gettimeofday(&now, NULL);
278 ts.tv_sec = now.tv_sec + (dwTimeOut / 1000);
279
280 now.tv_usec += (dwTimeOut % 1000) * 1000;
281 ts.tv_sec += now.tv_usec / 1000000;
282 ts.tv_nsec = (now.tv_usec % 1000000) * 1000;
283
284 pthread_cond_timedwait(&m_wakeupCondition, &m_mutex, &ts);
285 #endif /* HAVE_PTHREAD_COND_RELTIMEDWAIT_NP */
286 #endif /* _WIN32 */
287
288 UINT32 sleepTime = (UINT32)(GetCurrentTimeMs() - startTime);
289 dwTimeOut -= min(sleepTime, dwTimeOut);
290 } while(dwTimeOut > 0);
291
292 #ifdef _WIN32
293 if (slot != -1)
294 m_waiters[slot] = 0; // release waiter slot
295 #endif
296
297 unlock();
298 return NULL;
299 }
300
301 /**
302 * Housekeeping thread
303 */
304 void MsgWaitQueue::housekeeperThread()
305 {
306 while(!ConditionWait(m_stopCondition, TTL_CHECK_INTERVAL))
307 {
308 lock();
309 if (m_size > 0)
310 {
311 for(int i = 0; i < m_allocated; i++)
312 {
313 if (m_elements[i].msg == NULL)
314 continue;
315
316 if (m_elements[i].ttl <= TTL_CHECK_INTERVAL)
317 {
318 if (m_elements[i].isBinary)
319 {
320 safe_free(m_elements[i].msg);
321 }
322 else
323 {
324 delete (NXCPMessage *)(m_elements[i].msg);
325 }
326 m_elements[i].msg = NULL;
327 m_size--;
328 }
329 else
330 {
331 m_elements[i].ttl -= TTL_CHECK_INTERVAL;
332 }
333 }
334
335 // compact queue if possible
336 if ((m_allocated > ALLOCATION_STEP) && (m_size == 0))
337 {
338 m_allocated = ALLOCATION_STEP;
339 free(m_elements);
340 m_elements = (WAIT_QUEUE_ELEMENT *)calloc(m_allocated, sizeof(WAIT_QUEUE_ELEMENT));
341 }
342 }
343 unlock();
344 }
345 }
346
347 /**
348 * Housekeeper thread starter
349 */
350 THREAD_RESULT THREAD_CALL MsgWaitQueue::mwqThreadStarter(void *arg)
351 {
352 ((MsgWaitQueue *)arg)->housekeeperThread();
353 return THREAD_OK;
354 }