eee4ea07bcd594d48a741bed4f05dba3d042d95a
[public/netxms.git] / src / libnetxms / queue.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2013 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU Lesser General Public License as published
7 ** by the Free Software Foundation; either version 3 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU Lesser General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: queue.cpp
20 **
21 **/
22
23 #include "libnetxms.h"
24 #include <nxqueue.h>
25
26 /**
27 * Queue constructor
28 */
29 Queue::Queue(UINT32 dwInitialSize, UINT32 dwBufferIncrement)
30 {
31 m_dwBufferSize = dwInitialSize;
32 m_dwBufferIncrement = dwBufferIncrement;
33 CommonInit();
34 }
35
36 /**
37 * Default queue constructor
38 */
39 Queue::Queue()
40 {
41 m_dwBufferSize = 256;
42 m_dwBufferIncrement = 32;
43 CommonInit();
44 }
45
46
47 //
48 // Common initialization (used by all constructors)
49 //
50
51 void Queue::CommonInit()
52 {
53 m_mutexQueueAccess = MutexCreate();
54 m_condWakeup = ConditionCreate(FALSE);
55 m_dwNumElements = 0;
56 m_dwFirst = 0;
57 m_dwLast = 0;
58 m_pElements = (void **)malloc(sizeof(void *) * m_dwBufferSize);
59 m_bShutdownFlag = FALSE;
60 }
61
62
63 //
64 // Destructor
65 //
66
67 Queue::~Queue()
68 {
69 MutexDestroy(m_mutexQueueAccess);
70 ConditionDestroy(m_condWakeup);
71 safe_free(m_pElements);
72 }
73
74
75 //
76 // Put new element into queue
77 //
78
79 void Queue::Put(void *pElement)
80 {
81 Lock();
82 if (m_dwNumElements == m_dwBufferSize)
83 {
84 // Extend buffer
85 m_dwBufferSize += m_dwBufferIncrement;
86 m_pElements = (void **)realloc(m_pElements, sizeof(void *) * m_dwBufferSize);
87
88 // Move free space
89 memmove(&m_pElements[m_dwFirst + m_dwBufferIncrement], &m_pElements[m_dwFirst],
90 sizeof(void *) * (m_dwBufferSize - m_dwFirst - m_dwBufferIncrement));
91 m_dwFirst += m_dwBufferIncrement;
92 }
93 m_pElements[m_dwLast++] = pElement;
94 if (m_dwLast == m_dwBufferSize)
95 m_dwLast = 0;
96 m_dwNumElements++;
97 ConditionSet(m_condWakeup);
98 Unlock();
99 }
100
101
102 //
103 // Insert new element into the beginning of a queue
104 //
105
106 void Queue::Insert(void *pElement)
107 {
108 Lock();
109 if (m_dwNumElements == m_dwBufferSize)
110 {
111 // Extend buffer
112 m_dwBufferSize += m_dwBufferIncrement;
113 m_pElements = (void **)realloc(m_pElements, sizeof(void *) * m_dwBufferSize);
114
115 // Move free space
116 memmove(&m_pElements[m_dwFirst + m_dwBufferIncrement], &m_pElements[m_dwFirst],
117 sizeof(void *) * (m_dwBufferSize - m_dwFirst - m_dwBufferIncrement));
118 m_dwFirst += m_dwBufferIncrement;
119 }
120 if (m_dwFirst == 0)
121 m_dwFirst = m_dwBufferSize;
122 m_pElements[--m_dwFirst] = pElement;
123 m_dwNumElements++;
124 ConditionSet(m_condWakeup);
125 Unlock();
126 }
127
128
129 //
130 // Get object from queue. Return NULL if queue is empty
131 //
132
133 void *Queue::Get()
134 {
135 void *pElement = NULL;
136
137 Lock();
138 if (m_bShutdownFlag)
139 {
140 pElement = INVALID_POINTER_VALUE;
141 }
142 else
143 {
144 while((m_dwNumElements > 0) && (pElement == NULL))
145 {
146 pElement = m_pElements[m_dwFirst++];
147 if (m_dwFirst == m_dwBufferSize)
148 m_dwFirst = 0;
149 m_dwNumElements--;
150 }
151 }
152 Unlock();
153 return pElement;
154 }
155
156
157 //
158 // Get object from queue or block if queue if empty
159 //
160
161 void *Queue::GetOrBlock()
162 {
163 void *pElement;
164
165 pElement = Get();
166 if (pElement != NULL)
167 {
168 return pElement;
169 }
170
171 do
172 {
173 ConditionWait(m_condWakeup, INFINITE);
174 pElement = Get();
175 } while(pElement == NULL);
176 return pElement;
177 }
178
179
180 //
181 // Clear queue
182 //
183
184 void Queue::Clear()
185 {
186 Lock();
187 m_dwNumElements = 0;
188 m_dwFirst = 0;
189 m_dwLast = 0;
190 Unlock();
191 }
192
193
194 //
195 // Set shutdown flag
196 // When this flag is set, Get() always return INVALID_POINTER_VALUE
197 //
198
199 void Queue::SetShutdownMode()
200 {
201 Lock();
202 m_bShutdownFlag = TRUE;
203 ConditionSet(m_condWakeup);
204 Unlock();
205 }
206
207
208 //
209 // Find element in queue using given key and comparator
210 // Returns pointer to element or NULL if element was not found.
211 // Element remains in the queue
212 //
213
214 void *Queue::find(void *key, QUEUE_COMPARATOR comparator)
215 {
216 void *element = NULL;
217 UINT32 i, pos;
218
219 Lock();
220 for(i = 0, pos = m_dwFirst; i < m_dwNumElements; i++)
221 {
222 if ((m_pElements[pos] != NULL) && comparator(key, m_pElements[pos]))
223 {
224 element = m_pElements[pos];
225 break;
226 }
227 pos++;
228 if (pos == m_dwBufferSize)
229 pos = 0;
230 }
231 Unlock();
232 return element;
233 }
234
235
236 //
237 // Find element in queue using given key and comparator
238 // Returns pointer to element or NULL if element was not found.
239 // Element remains in the queue
240 //
241
242 bool Queue::remove(void *key, QUEUE_COMPARATOR comparator)
243 {
244 bool success = false;
245 UINT32 i, pos;
246
247 Lock();
248 for(i = 0, pos = m_dwFirst; i < m_dwNumElements; i++)
249 {
250 if ((m_pElements[pos] != NULL) && comparator(key, m_pElements[pos]))
251 {
252 m_pElements[pos] = NULL;
253 success = true;
254 break;
255 }
256 pos++;
257 if (pos == m_dwBufferSize)
258 pos = 0;
259 }
260 Unlock();
261 return success;
262 }