/*
** NetXMS - Network Management System
** NetXMS Foundation Library
** Copyright (C) 2003-2016 Victor Kirhenshtein
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU Lesser General Public License as published
** by the Free Software Foundation; either version 3 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU Lesser General Public License for more details.
**
** You should have received a copy of the GNU Lesser General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
**
** File: tp.cpp
**
**/

#include "libnetxms.h"
#include <nxqueue.h>

/**
 * Load average calculation
 */
#define FP_SHIFT 11              /* nr of bits of precision */
#define FP_1     (1 << FP_SHIFT) /* 1.0 as fixed-point */
#define EXP_1    1884            /* 1/exp(5sec/1min) as fixed-point */
#define EXP_5    2014            /* 1/exp(5sec/5min) */
#define EXP_15   2037            /* 1/exp(5sec/15min) */
#define CALC_LOAD(load, exp, n) do { load *= exp; load += n * (FP_1 - exp); load >>= FP_SHIFT; } while(0)
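
/*
 * CALC_LOAD keeps an exponentially decaying average in 11-bit fixed point,
 * sampled every 5 seconds (the same scheme the UNIX load average uses):
 *
 *    load = load * e + n * (1 - e),  where e = EXP_x / FP_1
 *
 * The decay constants follow from e = 1/exp(interval/window); e.g. for the
 * 1-minute window, 2048 / exp(5.0/60.0) = 1884 = EXP_1. A stored value of
 * FP_1 (2048) therefore corresponds to a load average of 1.0.
 */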

/**
 * Worker thread idle timeout in milliseconds
 */
#define THREAD_IDLE_TIMEOUT 300000

/**
 * Worker thread data
 */
struct WorkerThreadInfo
{
   ThreadPool *pool;
   THREAD handle;
};

/**
 * Thread pool
 */
struct ThreadPool
{
   int minThreads;
   int maxThreads;
   VolatileCounter activeRequests;
   MUTEX mutex;
   THREAD maintThread;
   CONDITION maintThreadStop;
   HashMap<UINT64, WorkerThreadInfo> *threads;
   Queue *queue;
   StringObjectMap<Queue> *serializationQueues;
   MUTEX serializationLock;
   TCHAR *name;
   bool shutdownMode;
   INT32 loadAverage[3];
};

/**
 * Thread pool registry
 */
static StringObjectMap<ThreadPool> s_registry(false);
static MUTEX s_registryLock = MutexCreate();

/**
 * Thread work request
 */
struct WorkRequest
{
   ThreadPoolWorkerFunction func;
   void *arg;
   bool inactivityStop;
};
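
/*
 * Note: a worker thread cannot join itself, so a worker that retires due to
 * inactivity removes itself from the thread map and enqueues a JoinWorkerThread
 * request; whichever worker picks that request up reaps the exiting thread.
 */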

/**
 * Worker function to join stopped thread
 */
static void JoinWorkerThread(void *arg)
{
   ThreadJoin(((WorkerThreadInfo *)arg)->handle);
   delete (WorkerThreadInfo *)arg;
}

/**
 * Worker thread function
 */
static THREAD_RESULT THREAD_CALL WorkerThread(void *arg)
{
   ThreadPool *p = ((WorkerThreadInfo *)arg)->pool;
   Queue *q = p->queue;
   while(true)
   {
      WorkRequest *rq = (WorkRequest *)q->getOrBlock();

      if (rq->func == NULL) // stop indicator
      {
         if (rq->inactivityStop)
         {
            MutexLock(p->mutex);
            p->threads->remove(CAST_FROM_POINTER(arg, UINT64));
            MutexUnlock(p->mutex);

            nxlog_debug(3, _T("Stopping worker thread in thread pool %s due to inactivity"), p->name);

            free(rq);
            rq = (WorkRequest *)malloc(sizeof(WorkRequest));
            rq->func = JoinWorkerThread;
            rq->arg = arg;
            InterlockedIncrement(&p->activeRequests);
            p->queue->put(rq);
         }
         break;
      }

      rq->func(rq->arg);
      free(rq);
      InterlockedDecrement(&p->activeRequests);
   }
   return THREAD_OK;
}

/**
 * Thread pool maintenance thread
 */
static THREAD_RESULT THREAD_CALL MaintenanceThread(void *arg)
{
   ThreadPool *p = (ThreadPool *)arg;
   int count = 0;
   while(!ConditionWait(p->maintThreadStop, 5000))
   {
      INT32 requestCount = (INT32)p->activeRequests << FP_SHIFT;
      CALC_LOAD(p->loadAverage[0], EXP_1, requestCount);
      CALC_LOAD(p->loadAverage[1], EXP_5, requestCount);
      CALC_LOAD(p->loadAverage[2], EXP_15, requestCount);

      count++;
      if (count == 12) // do pool check once per minute
      {
         MutexLock(p->mutex);
         INT32 threadCount = p->threads->size();
         MutexUnlock(p->mutex);
         if ((threadCount > p->minThreads) && (p->loadAverage[1] < 1024 * threadCount)) // 5 minute load average < 0.5 * thread count
         {
            WorkRequest *rq = (WorkRequest *)malloc(sizeof(WorkRequest));
            rq->func = NULL;
            rq->arg = NULL;
            rq->inactivityStop = true;
            p->queue->put(rq);
         }
         count = 0;
      }
   }
   nxlog_debug(3, _T("Maintenance thread for thread pool %s stopped"), p->name);
   return THREAD_OK;
}

/**
 * Create thread pool
 */
ThreadPool LIBNETXMS_EXPORTABLE *ThreadPoolCreate(int minThreads, int maxThreads, const TCHAR *name)
{
   ThreadPool *p = (ThreadPool *)malloc(sizeof(ThreadPool));
   p->minThreads = minThreads;
   p->maxThreads = maxThreads;
   p->activeRequests = 0;
   p->threads = new HashMap<UINT64, WorkerThreadInfo>();
   p->queue = new Queue(64, 64);
   p->mutex = MutexCreate();
   p->maintThreadStop = ConditionCreate(TRUE);
   p->serializationQueues = new StringObjectMap<Queue>(true);
   p->serializationQueues->setIgnoreCase(false);
   p->serializationLock = MutexCreate();
   p->name = (name != NULL) ? _tcsdup(name) : _tcsdup(_T("NONAME"));
   p->shutdownMode = false;
   p->loadAverage[0] = 0;
   p->loadAverage[1] = 0;
   p->loadAverage[2] = 0;

   for(int i = 0; i < p->minThreads; i++)
   {
      WorkerThreadInfo *wt = new WorkerThreadInfo;
      wt->pool = p;
      wt->handle = ThreadCreateEx(WorkerThread, 0, wt);
      p->threads->set(CAST_FROM_POINTER(wt, UINT64), wt);
   }

   p->maintThread = ThreadCreateEx(MaintenanceThread, 0, p);

   MutexLock(s_registryLock);
   s_registry.set(p->name, p);
   MutexUnlock(s_registryLock);

   nxlog_debug(1, _T("Thread pool %s initialized (min=%d, max=%d)"), p->name, p->minThreads, p->maxThreads);
   return p;
}
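
/*
 * Illustrative usage sketch (not part of the library; MyTask and taskArg
 * are hypothetical):
 *
 *    ThreadPool *pool = ThreadPoolCreate(4, 32, _T("MAIN"));
 *    ThreadPoolExecute(pool, MyTask, taskArg);                         // run as soon as a worker is free
 *    ThreadPoolExecuteSerialized(pool, _T("node42"), MyTask, taskArg); // FIFO relative to same key
 *    ThreadPoolDestroy(pool);
 */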

/**
 * Callback for joining all worker threads on pool destruction
 */
static EnumerationCallbackResult ThreadPoolDestroyCallback(const void *key, const void *object, void *arg)
{
   ThreadJoin(((WorkerThreadInfo *)object)->handle);
   return _CONTINUE;
}

/**
 * Destroy thread pool
 */
void LIBNETXMS_EXPORTABLE ThreadPoolDestroy(ThreadPool *p)
{
   nxlog_debug(3, _T("Stopping threads in thread pool %s"), p->name);

   MutexLock(s_registryLock);
   s_registry.remove(p->name);
   MutexUnlock(s_registryLock);

   MutexLock(p->mutex);
   p->shutdownMode = true;
   MutexUnlock(p->mutex);

   ConditionSet(p->maintThreadStop);
   ThreadJoin(p->maintThread);
   ConditionDestroy(p->maintThreadStop);

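   // The stop indicator can safely live on the stack: workers only free a
   // stop request when inactivityStop is set, and every worker is joined
   // below before rq goes out of scope.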
   WorkRequest rq;
   rq.func = NULL;
   rq.inactivityStop = false;
   for(int i = 0; i < p->threads->size(); i++)
      p->queue->put(&rq);
   p->threads->forEach(ThreadPoolDestroyCallback, NULL);

   nxlog_debug(1, _T("Thread pool %s destroyed"), p->name);
   p->threads->setOwner(true);
   delete p->threads;
   delete p->queue;
   delete p->serializationQueues;
   MutexDestroy(p->serializationLock);
   MutexDestroy(p->mutex);
   free(p->name);
   free(p);
}

/**
 * Execute task as soon as possible
 */
void LIBNETXMS_EXPORTABLE ThreadPoolExecute(ThreadPool *p, ThreadPoolWorkerFunction f, void *arg)
{
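   // Fast-path heuristic: threads->size() is read here without taking the
   // pool mutex, so the comparison may act on a slightly stale count. That
   // only affects when a new worker is started; thread creation itself is
   // re-checked under the mutex against maxThreads.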
   if (InterlockedIncrement(&p->activeRequests) > p->threads->size())
   {
      bool started = false;
      MutexLock(p->mutex);
      if (p->threads->size() < p->maxThreads)
      {
         WorkerThreadInfo *wt = new WorkerThreadInfo;
         wt->pool = p;
         wt->handle = ThreadCreateEx(WorkerThread, 0, wt);
         p->threads->set(CAST_FROM_POINTER(wt, UINT64), wt);
         started = true;
      }
      MutexUnlock(p->mutex);
      if (started)
         nxlog_debug(3, _T("New thread started in thread pool %s"), p->name);
   }

   WorkRequest *rq = (WorkRequest *)malloc(sizeof(WorkRequest));
   rq->func = f;
   rq->arg = arg;
   p->queue->put(rq);
}

/**
 * Request serialization data
 */
struct RequestSerializationData
{
   TCHAR *key;
   ThreadPool *pool;
   Queue *queue;
};
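
/*
 * Serialized execution keeps one plain Queue per key: the first request for
 * a key creates the queue and submits ProcessSerializedRequests as a single
 * ordinary pool task that drains it, so requests with the same key run
 * strictly one at a time in FIFO order. The queue is removed under
 * serializationLock in the same critical section that observes it empty,
 * which prevents a new submission from racing against queue teardown.
 */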

/**
 * Worker function to process serialized requests
 */
static void ProcessSerializedRequests(void *arg)
{
   RequestSerializationData *data = (RequestSerializationData *)arg;
   while(true)
   {
      MutexLock(data->pool->serializationLock);
      WorkRequest *rq = (WorkRequest *)data->queue->get();
      if (rq == NULL)
      {
         data->pool->serializationQueues->remove(data->key);
         MutexUnlock(data->pool->serializationLock);
         break;
      }
      MutexUnlock(data->pool->serializationLock);

      rq->func(rq->arg);
      free(rq);
   }
   free(data->key);
   delete data;
}

/**
 * Execute task serialized (not before previous task with same key ends)
 */
void LIBNETXMS_EXPORTABLE ThreadPoolExecuteSerialized(ThreadPool *p, const TCHAR *key, ThreadPoolWorkerFunction f, void *arg)
{
   MutexLock(p->serializationLock);

   Queue *q = p->serializationQueues->get(key);
   if (q == NULL)
   {
      q = new Queue(8, 8);
      p->serializationQueues->set(key, q);

      RequestSerializationData *data = new RequestSerializationData;
      data->key = _tcsdup(key);
      data->pool = p;
      data->queue = q;
      ThreadPoolExecute(p, ProcessSerializedRequests, data);
   }

   WorkRequest *rq = (WorkRequest *)malloc(sizeof(WorkRequest));
   rq->func = f;
   rq->arg = arg;
   q->put(rq);

   MutexUnlock(p->serializationLock);
}
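
/*
 * NOTE: scheduled execution is not implemented yet; both functions below
 * are placeholders that accept the call and do nothing.
 */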

/**
 * Schedule task for execution using absolute time
 */
void LIBNETXMS_EXPORTABLE ThreadPoolScheduleAbsolute(ThreadPool *p, time_t runTime, ThreadPoolWorkerFunction f, void *arg)
{
}

/**
 * Schedule task for execution using relative time
 */
void LIBNETXMS_EXPORTABLE ThreadPoolScheduleRelative(ThreadPool *p, UINT32 delay, ThreadPoolWorkerFunction f, void *arg)
{
}

/**
 * Get pool information
 */
void LIBNETXMS_EXPORTABLE ThreadPoolGetInfo(ThreadPool *p, ThreadPoolInfo *info)
{
   MutexLock(p->mutex);
   info->name = p->name;
   info->minThreads = p->minThreads;
   info->maxThreads = p->maxThreads;
   info->curThreads = p->threads->size();
   info->activeRequests = p->activeRequests;
   info->load = info->activeRequests * 100 / info->curThreads;
   info->usage = info->curThreads * 100 / info->maxThreads;
   info->loadAvg[0] = (double)p->loadAverage[0] / FP_1;
   info->loadAvg[1] = (double)p->loadAverage[1] / FP_1;
   info->loadAvg[2] = (double)p->loadAverage[2] / FP_1;
   MutexUnlock(p->mutex);
}

/**
 * Get pool information by name
 */
bool LIBNETXMS_EXPORTABLE ThreadPoolGetInfo(const TCHAR *name, ThreadPoolInfo *info)
{
   MutexLock(s_registryLock);
   ThreadPool *p = s_registry.get(name);
   if (p != NULL)
      ThreadPoolGetInfo(p, info);
   MutexUnlock(s_registryLock);
   return p != NULL;
}

/**
 * Get all thread pool names
 */
StringList LIBNETXMS_EXPORTABLE *ThreadPoolGetAllPools()
{
   MutexLock(s_registryLock);
   StringList *list = s_registry.keys();
   MutexUnlock(s_registryLock);
   return list;
}