syslog message write to database optimized
[public/netxms.git] / src / libnetxms / queue.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2013 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU Lesser General Public License as published
7 ** by the Free Software Foundation; either version 3 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU Lesser General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: queue.cpp
20 **
21 **/
22
23 #include "libnetxms.h"
24 #include <nxqueue.h>
25
26 /**
27 * Queue constructor
28 */
29 Queue::Queue(UINT32 dwInitialSize, UINT32 dwBufferIncrement)
30 {
31 m_dwBufferSize = dwInitialSize;
32 m_dwBufferIncrement = dwBufferIncrement;
33 commonInit();
34 }
35
36 /**
37 * Default queue constructor
38 */
39 Queue::Queue()
40 {
41 m_dwBufferSize = 256;
42 m_dwBufferIncrement = 32;
43 commonInit();
44 }
45
46 /**
47 * Common initialization (used by all constructors)
48 */
49 void Queue::commonInit()
50 {
51 m_mutexQueueAccess = MutexCreate();
52 m_condWakeup = ConditionCreate(FALSE);
53 m_dwNumElements = 0;
54 m_dwFirst = 0;
55 m_dwLast = 0;
56 m_pElements = (void **)malloc(sizeof(void *) * m_dwBufferSize);
57 m_bShutdownFlag = FALSE;
58 }
59
60 /**
61 * Destructor
62 */
63 Queue::~Queue()
64 {
65 MutexDestroy(m_mutexQueueAccess);
66 ConditionDestroy(m_condWakeup);
67 safe_free(m_pElements);
68 }
69
70 /**
71 * Put new element into queue
72 */
73 void Queue::Put(void *pElement)
74 {
75 lock();
76 if (m_dwNumElements == m_dwBufferSize)
77 {
78 // Extend buffer
79 m_dwBufferSize += m_dwBufferIncrement;
80 m_pElements = (void **)realloc(m_pElements, sizeof(void *) * m_dwBufferSize);
81
82 // Move free space
83 memmove(&m_pElements[m_dwFirst + m_dwBufferIncrement], &m_pElements[m_dwFirst],
84 sizeof(void *) * (m_dwBufferSize - m_dwFirst - m_dwBufferIncrement));
85 m_dwFirst += m_dwBufferIncrement;
86 }
87 m_pElements[m_dwLast++] = pElement;
88 if (m_dwLast == m_dwBufferSize)
89 m_dwLast = 0;
90 m_dwNumElements++;
91 ConditionSet(m_condWakeup);
92 unlock();
93 }
94
95 /**
96 * Insert new element into the beginning of a queue
97 */
98 void Queue::Insert(void *pElement)
99 {
100 lock();
101 if (m_dwNumElements == m_dwBufferSize)
102 {
103 // Extend buffer
104 m_dwBufferSize += m_dwBufferIncrement;
105 m_pElements = (void **)realloc(m_pElements, sizeof(void *) * m_dwBufferSize);
106
107 // Move free space
108 memmove(&m_pElements[m_dwFirst + m_dwBufferIncrement], &m_pElements[m_dwFirst],
109 sizeof(void *) * (m_dwBufferSize - m_dwFirst - m_dwBufferIncrement));
110 m_dwFirst += m_dwBufferIncrement;
111 }
112 if (m_dwFirst == 0)
113 m_dwFirst = m_dwBufferSize;
114 m_pElements[--m_dwFirst] = pElement;
115 m_dwNumElements++;
116 ConditionSet(m_condWakeup);
117 unlock();
118 }
119
120 /**
121 * Get object from queue. Return NULL if queue is empty
122 */
123 void *Queue::Get()
124 {
125 void *pElement = NULL;
126
127 lock();
128 if (m_bShutdownFlag)
129 {
130 pElement = INVALID_POINTER_VALUE;
131 }
132 else
133 {
134 while((m_dwNumElements > 0) && (pElement == NULL))
135 {
136 pElement = m_pElements[m_dwFirst++];
137 if (m_dwFirst == m_dwBufferSize)
138 m_dwFirst = 0;
139 m_dwNumElements--;
140 }
141 }
142 unlock();
143 return pElement;
144 }
145
146 /**
147 * Get object from queue or block if queue if empty
148 */
149 void *Queue::GetOrBlock()
150 {
151 void *pElement;
152
153 pElement = Get();
154 if (pElement != NULL)
155 {
156 return pElement;
157 }
158
159 do
160 {
161 ConditionWait(m_condWakeup, INFINITE);
162 pElement = Get();
163 } while(pElement == NULL);
164 return pElement;
165 }
166
167 /**
168 * Clear queue
169 */
170 void Queue::Clear()
171 {
172 lock();
173 m_dwNumElements = 0;
174 m_dwFirst = 0;
175 m_dwLast = 0;
176 unlock();
177 }
178
179 /**
180 * Set shutdown flag
181 * When this flag is set, Get() always return INVALID_POINTER_VALUE
182 */
183 void Queue::SetShutdownMode()
184 {
185 lock();
186 m_bShutdownFlag = TRUE;
187 ConditionSet(m_condWakeup);
188 unlock();
189 }
190
191 /**
192 * Find element in queue using given key and comparator
193 * Returns pointer to element or NULL if element was not found.
194 * Element remains in the queue
195 */
196 void *Queue::find(void *key, QUEUE_COMPARATOR comparator)
197 {
198 void *element = NULL;
199 UINT32 i, pos;
200
201 lock();
202 for(i = 0, pos = m_dwFirst; i < m_dwNumElements; i++)
203 {
204 if ((m_pElements[pos] != NULL) && comparator(key, m_pElements[pos]))
205 {
206 element = m_pElements[pos];
207 break;
208 }
209 pos++;
210 if (pos == m_dwBufferSize)
211 pos = 0;
212 }
213 unlock();
214 return element;
215 }
216
217 /**
218 * Find element in queue using given key and comparator and remove it.
219 * Returns true if element was removed.
220 */
221 bool Queue::remove(void *key, QUEUE_COMPARATOR comparator)
222 {
223 bool success = false;
224 UINT32 i, pos;
225
226 lock();
227 for(i = 0, pos = m_dwFirst; i < m_dwNumElements; i++)
228 {
229 if ((m_pElements[pos] != NULL) && comparator(key, m_pElements[pos]))
230 {
231 m_pElements[pos] = NULL;
232 success = true;
233 break;
234 }
235 pos++;
236 if (pos == m_dwBufferSize)
237 pos = 0;
238 }
239 unlock();
240 return success;
241 }