dc3776b54eefcc226f30078c4b60a922413aa26f
[public/netxms.git] / src / libnetxms / queue.cpp
1 /* $Id$ */
2 /*
3 ** NetXMS - Network Management System
4 ** Copyright (C) 2003, 2004, 2005, 2006, 2007 Victor Kirhenshtein
5 **
6 ** This program is free software; you can redistribute it and/or modify
7 ** it under the terms of the GNU General Public License as published by
8 ** the Free Software Foundation; either version 2 of the License, or
9 ** (at your option) any later version.
10 **
11 ** This program is distributed in the hope that it will be useful,
12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 ** GNU General Public License for more details.
15 **
16 ** You should have received a copy of the GNU General Public License
17 ** along with this program; if not, write to the Free Software
18 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 **
20 ** File: queue.cpp
21 **
22 **/
23
24 #include "libnetxms.h"
25 #include <nxqueue.h>
26
27
28 //
29 // Queue constructor
30 //
31
32 Queue::Queue(DWORD dwInitialSize, DWORD dwBufferIncrement)
33 {
34 m_dwBufferSize = dwInitialSize;
35 m_dwBufferIncrement = dwBufferIncrement;
36 CommonInit();
37 }
38
39
40 //
41 // Default queue constructor
42 //
43
44 Queue::Queue()
45 {
46 m_dwBufferSize = 256;
47 m_dwBufferIncrement = 32;
48 CommonInit();
49 }
50
51
52 //
53 // Common initialization (used by all constructors)
54 //
55
56 void Queue::CommonInit()
57 {
58 m_mutexQueueAccess = MutexCreate();
59 m_condWakeup = ConditionCreate(FALSE);
60 m_dwNumElements = 0;
61 m_dwFirst = 0;
62 m_dwLast = 0;
63 m_pElements = (void **)malloc(sizeof(void *) * m_dwBufferSize);
64 m_bShutdownFlag = FALSE;
65 }
66
67
68 //
69 // Destructor
70 //
71
72 Queue::~Queue()
73 {
74 MutexDestroy(m_mutexQueueAccess);
75 ConditionDestroy(m_condWakeup);
76 safe_free(m_pElements);
77 }
78
79
80 //
81 // Put new element into queue
82 //
83
84 void Queue::Put(void *pElement)
85 {
86 Lock();
87 if (m_dwNumElements == m_dwBufferSize)
88 {
89 // Extend buffer
90 m_dwBufferSize += m_dwBufferIncrement;
91 m_pElements = (void **)realloc(m_pElements, sizeof(void *) * m_dwBufferSize);
92
93 // Move free space
94 memmove(&m_pElements[m_dwFirst + m_dwBufferIncrement], &m_pElements[m_dwFirst],
95 sizeof(void *) * (m_dwBufferSize - m_dwFirst - m_dwBufferIncrement));
96 m_dwFirst += m_dwBufferIncrement;
97 }
98 m_pElements[m_dwLast++] = pElement;
99 if (m_dwLast == m_dwBufferSize)
100 m_dwLast = 0;
101 m_dwNumElements++;
102 ConditionSet(m_condWakeup);
103 Unlock();
104 }
105
106
107 //
108 // Insert new element into the beginning of a queue
109 //
110
111 void Queue::Insert(void *pElement)
112 {
113 Lock();
114 if (m_dwNumElements == m_dwBufferSize)
115 {
116 // Extend buffer
117 m_dwBufferSize += m_dwBufferIncrement;
118 m_pElements = (void **)realloc(m_pElements, sizeof(void *) * m_dwBufferSize);
119
120 // Move free space
121 memmove(&m_pElements[m_dwFirst + m_dwBufferIncrement], &m_pElements[m_dwFirst],
122 sizeof(void *) * (m_dwBufferSize - m_dwFirst - m_dwBufferIncrement));
123 m_dwFirst += m_dwBufferIncrement;
124 }
125 if (m_dwFirst == 0)
126 m_dwFirst = m_dwBufferSize - 1;
127 m_pElements[--m_dwFirst] = pElement;
128 m_dwNumElements++;
129 ConditionSet(m_condWakeup);
130 Unlock();
131 }
132
133
134 //
135 // Get object from queue. Return NULL if queue is empty
136 //
137
138 void *Queue::Get()
139 {
140 void *pElement = NULL;
141
142 Lock();
143 if (m_bShutdownFlag)
144 {
145 pElement = INVALID_POINTER_VALUE;
146 }
147 else if (m_dwNumElements != 0)
148 {
149 pElement = m_pElements[m_dwFirst++];
150 if (m_dwFirst == m_dwBufferSize)
151 m_dwFirst = 0;
152 m_dwNumElements--;
153 }
154 Unlock();
155 return pElement;
156 }
157
158
159 //
160 // Get object from queue or block if queue if empty
161 //
162
163 void *Queue::GetOrBlock()
164 {
165 void *pElement;
166
167 pElement = Get();
168 if (pElement != NULL)
169 {
170 return pElement;
171 }
172
173 do
174 {
175 ConditionWait(m_condWakeup, INFINITE);
176 pElement = Get();
177 } while(pElement == NULL);
178 return pElement;
179 }
180
181
182 //
183 // Clear queue
184 //
185
186 void Queue::Clear()
187 {
188 Lock();
189 m_dwNumElements = 0;
190 m_dwFirst = 0;
191 m_dwLast = 0;
192 Unlock();
193 }
194
195
196 //
197 // Set shutdown flag
198 // When this flag is set, Get() always return INVALID_POINTER_VALUE
199 //
200
201 void Queue::SetShutdownMode()
202 {
203 Lock();
204 m_bShutdownFlag = TRUE;
205 ConditionSet(m_condWakeup);
206 Unlock();
207 }