Changelog update
[public/netxms.git] / src / libnetxms / queue.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2015 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU Lesser General Public License as published
7 ** by the Free Software Foundation; either version 3 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU Lesser General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: queue.cpp
20 **
21 **/
22
23 #include "libnetxms.h"
24 #include <nxqueue.h>
25
26 /**
27 * Queue constructor
28 */
29 Queue::Queue(UINT32 initialSize, UINT32 bufferIncrement)
30 {
31 m_initialSize = initialSize;
32 m_bufferSize = initialSize;
33 m_bufferIncrement = bufferIncrement;
34 commonInit();
35 }
36
37 /**
38 * Default queue constructor
39 */
40 Queue::Queue()
41 {
42 m_initialSize = 256;
43 m_bufferSize = 256;
44 m_bufferIncrement = 32;
45 commonInit();
46 }
47
48 /**
49 * Common initialization (used by all constructors)
50 */
51 void Queue::commonInit()
52 {
53 m_mutexQueueAccess = MutexCreate();
54 m_condWakeup = ConditionCreate(FALSE);
55 m_numElements = 0;
56 m_first = 0;
57 m_last = 0;
58 m_elements = (void **)malloc(sizeof(void *) * m_bufferSize);
59 m_shutdownFlag = FALSE;
60 }
61
62 /**
63 * Destructor
64 */
65 Queue::~Queue()
66 {
67 MutexDestroy(m_mutexQueueAccess);
68 ConditionDestroy(m_condWakeup);
69 safe_free(m_elements);
70 }
71
72 /**
73 * Put new element into queue
74 */
75 void Queue::put(void *pElement)
76 {
77 lock();
78 if (m_numElements == m_bufferSize)
79 {
80 // Extend buffer
81 m_bufferSize += m_bufferIncrement;
82 m_elements = (void **)realloc(m_elements, sizeof(void *) * m_bufferSize);
83
84 // Move free space
85 memmove(&m_elements[m_first + m_bufferIncrement], &m_elements[m_first],
86 sizeof(void *) * (m_bufferSize - m_first - m_bufferIncrement));
87 m_first += m_bufferIncrement;
88 }
89 m_elements[m_last++] = pElement;
90 if (m_last == m_bufferSize)
91 m_last = 0;
92 m_numElements++;
93 ConditionSet(m_condWakeup);
94 unlock();
95 }
96
97 /**
98 * Insert new element into the beginning of a queue
99 */
100 void Queue::insert(void *pElement)
101 {
102 lock();
103 if (m_numElements == m_bufferSize)
104 {
105 // Extend buffer
106 m_bufferSize += m_bufferIncrement;
107 m_elements = (void **)realloc(m_elements, sizeof(void *) * m_bufferSize);
108
109 // Move free space
110 memmove(&m_elements[m_first + m_bufferIncrement], &m_elements[m_first],
111 sizeof(void *) * (m_bufferSize - m_first - m_bufferIncrement));
112 m_first += m_bufferIncrement;
113 }
114 if (m_first == 0)
115 m_first = m_bufferSize;
116 m_elements[--m_first] = pElement;
117 m_numElements++;
118 ConditionSet(m_condWakeup);
119 unlock();
120 }
121
122 /**
123 * Get object from queue. Return NULL if queue is empty
124 */
125 void *Queue::get()
126 {
127 void *pElement = NULL;
128
129 lock();
130 if (m_shutdownFlag)
131 {
132 pElement = INVALID_POINTER_VALUE;
133 }
134 else
135 {
136 while((m_numElements > 0) && (pElement == NULL))
137 {
138 pElement = m_elements[m_first++];
139 if (m_first == m_bufferSize)
140 m_first = 0;
141 m_numElements--;
142 }
143 shrink();
144 }
145 unlock();
146 return pElement;
147 }
148
149 /**
150 * Get object from queue or block with timeout if queue if empty
151 */
152 void *Queue::getOrBlock(UINT32 timeout)
153 {
154 void *pElement = get();
155 if (pElement != NULL)
156 {
157 return pElement;
158 }
159
160 do
161 {
162 if (!ConditionWait(m_condWakeup, timeout))
163 break;
164 pElement = get();
165 } while(pElement == NULL);
166 return pElement;
167 }
168
169 /**
170 * Clear queue
171 */
172 void Queue::clear()
173 {
174 lock();
175 m_numElements = 0;
176 m_first = 0;
177 m_last = 0;
178 shrink();
179 unlock();
180 }
181
182 /**
183 * Set shutdown flag
184 * When this flag is set, Get() always return INVALID_POINTER_VALUE
185 */
186 void Queue::setShutdownMode()
187 {
188 lock();
189 m_shutdownFlag = TRUE;
190 ConditionSet(m_condWakeup);
191 unlock();
192 }
193
194 /**
195 * Find element in queue using given key and comparator
196 * Returns pointer to element or NULL if element was not found.
197 * Element remains in the queue
198 */
199 void *Queue::find(void *key, QUEUE_COMPARATOR comparator)
200 {
201 void *element = NULL;
202 UINT32 i, pos;
203
204 lock();
205 for(i = 0, pos = m_first; i < m_numElements; i++)
206 {
207 if ((m_elements[pos] != NULL) && (m_elements[pos] != INVALID_POINTER_VALUE) && comparator(key, m_elements[pos]))
208 {
209 element = m_elements[pos];
210 break;
211 }
212 pos++;
213 if (pos == m_bufferSize)
214 pos = 0;
215 }
216 unlock();
217 return element;
218 }
219
220 /**
221 * Find element in queue using given key and comparator and remove it.
222 * Returns true if element was removed.
223 */
224 bool Queue::remove(void *key, QUEUE_COMPARATOR comparator)
225 {
226 bool success = false;
227 UINT32 i, pos;
228
229 lock();
230 for(i = 0, pos = m_first; i < m_numElements; i++)
231 {
232 if ((m_elements[pos] != NULL) && comparator(key, m_elements[pos]))
233 {
234 m_elements[pos] = NULL;
235 success = true;
236 break;
237 }
238 pos++;
239 if (pos == m_bufferSize)
240 pos = 0;
241 }
242 unlock();
243 return success;
244 }
245
246 /**
247 * Shrink queue if possible
248 */
249 void Queue::shrink()
250 {
251 if ((m_bufferSize == m_initialSize) || (m_numElements > m_initialSize / 2) || ((m_numElements > 0) && (m_last < m_first)))
252 return;
253
254 if ((m_numElements > 0) && (m_first > 0))
255 {
256 memmove(&m_elements[0], &m_elements[m_first], sizeof(void *) * m_numElements);
257 m_last -= m_first;
258 m_first = 0;
259 }
260 m_bufferSize = m_initialSize;
261 m_elements = (void **)realloc(m_elements, m_bufferSize * sizeof(void *));
262 }