/*
** NetXMS - Network Management System
** NetXMS Foundation Library
** Copyright (C) 2003-2016 Victor Kirhenshtein
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU Lesser General Public License as published
** by the Free Software Foundation; either version 3 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU Lesser General Public License for more details.
**
** You should have received a copy of the GNU Lesser General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
**
** File: tp.cpp
**
**/

#include "libnetxms.h"
#include <nxqueue.h>

/**
 * Load average calculation
 */
#define FP_SHIFT  11              /* number of bits of precision */
#define FP_1      (1 << FP_SHIFT) /* 1.0 as fixed-point */
#define EXP_1     1884            /* 1/exp(5sec/1min) as fixed-point */
#define EXP_5     2014            /* 1/exp(5sec/5min) */
#define EXP_15    2037            /* 1/exp(5sec/15min) */
#define CALC_LOAD(load, exp, n) do { load *= exp; load += n * (FP_1 - exp); load >>= FP_SHIFT; } while(0)

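/**
 * How the smoothing works: CALC_LOAD is the classic UNIX-style fixed-point
 * exponential moving average
 *
 *    load' = (load * exp + n * (FP_1 - exp)) >> FP_SHIFT
 *
 * With FP_SHIFT = 11, FP_1 = 2048 and EXP_1 = 2048 / exp(5/60) ~= 1884, so
 * FP_1 - EXP_1 = 164. Starting from load = 0 with one active request
 * (n = 2048, i.e. 1.0), a single 5-second tick gives
 * load = (0 * 1884 + 2048 * 164) >> 11 = 164, which is about 0.08 -- the
 * average moves roughly 8% of the way toward the instantaneous value.
 */
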
/**
 * Worker thread idle timeout in milliseconds (5 minutes)
 */
#define THREAD_IDLE_TIMEOUT   300000

/**
 * Worker thread data. Each worker is registered in the pool's thread map
 * using its WorkerThreadInfo pointer (cast to UINT64) as the key.
 */
struct WorkerThreadInfo
{
   ThreadPool *pool;
   THREAD handle;
};

/**
 * Thread pool
 */
struct ThreadPool
{
   int minThreads;                  // lower bound on worker thread count
   int maxThreads;                  // upper bound on worker thread count
   VolatileCounter activeRequests;  // number of queued or executing requests
   MUTEX mutex;                     // protects the thread map
   THREAD maintThread;
   CONDITION maintThreadStop;       // signalled to stop the maintenance thread
   HashMap<UINT64, WorkerThreadInfo> *threads;  // active workers, keyed by WorkerThreadInfo pointer
   Queue *queue;                    // shared work queue
   StringObjectMap<Queue> *serializationQueues; // per-key queues for serialized execution
   MUTEX serializationLock;         // protects serializationQueues
   TCHAR *name;                     // pool name, used for registry lookup and logging
   bool shutdownMode;
   INT32 loadAverage[3];            // 1/5/15 minute load averages in fixed point
};

/**
 * Thread pool registry
 */
static StringObjectMap<ThreadPool> s_registry(false);
static MUTEX s_registryLock = MutexCreate();

/**
 * Thread work request. A request with func == NULL is a stop indicator:
 * inactivityStop distinguishes voluntary shutdown of a single idle worker
 * from pool destruction.
 */
struct WorkRequest
{
   ThreadPoolWorkerFunction func;
   void *arg;
   bool inactivityStop;
};

/**
 * Worker function to join stopped thread
 */
static void JoinWorkerThread(void *arg)
{
   ThreadJoin(((WorkerThreadInfo *)arg)->handle);
   delete (WorkerThreadInfo *)arg;
}

/**
 * Worker thread function. Blocks on the shared work queue and executes
 * requests until it receives a stop indicator (a request with func == NULL).
 */
static THREAD_RESULT THREAD_CALL WorkerThread(void *arg)
{
   ThreadPool *p = ((WorkerThreadInfo *)arg)->pool;
   Queue *q = p->queue;
   while(true)
   {
      WorkRequest *rq = (WorkRequest *)q->getOrBlock();

      if (rq->func == NULL) // stop indicator
      {
         if (rq->inactivityStop)
         {
            MutexLock(p->mutex);
            p->threads->remove(CAST_FROM_POINTER(arg, UINT64));
            MutexUnlock(p->mutex);

            nxlog_debug(3, _T("Stopping worker thread in thread pool %s due to inactivity"), p->name);

            // Re-post this request as a join request so that another worker
            // reaps this thread's handle and frees its WorkerThreadInfo
            free(rq);
            rq = (WorkRequest *)malloc(sizeof(WorkRequest));
            rq->func = JoinWorkerThread;
            rq->arg = arg;
            InterlockedIncrement(&p->activeRequests);
            p->queue->put(rq);
         }
         break;
      }

      rq->func(rq->arg);
      free(rq);
      InterlockedDecrement(&p->activeRequests);
   }
   return THREAD_OK;
}

/**
 * Thread pool maintenance thread
 */
static THREAD_RESULT THREAD_CALL MaintenanceThread(void *arg)
{
   ThreadPool *p = (ThreadPool *)arg;
   int count = 0;
   while(!ConditionWait(p->maintThreadStop, 5000))
   {
      INT32 requestCount = (INT32)p->activeRequests << FP_SHIFT;
      CALC_LOAD(p->loadAverage[0], EXP_1, requestCount);
      CALC_LOAD(p->loadAverage[1], EXP_5, requestCount);
      CALC_LOAD(p->loadAverage[2], EXP_15, requestCount);

      count++;
      if (count == 12) // do pool check once per minute (12 x 5 second ticks)
      {
         MutexLock(p->mutex);
         INT32 threadCount = p->threads->size();
         MutexUnlock(p->mutex);
         if ((threadCount > p->minThreads) && (p->loadAverage[1] < 1024 * threadCount)) // 5 minute load average < 0.5 * thread count (1024 is 0.5 in fixed point)
         {
            WorkRequest *rq = (WorkRequest *)malloc(sizeof(WorkRequest));
            rq->func = NULL;
            rq->arg = NULL;
            rq->inactivityStop = true;
            p->queue->put(rq);
         }
         count = 0;
      }
   }
   nxlog_debug(3, _T("Maintenance thread for thread pool %s stopped"), p->name);
   return THREAD_OK;
}

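/**
 * A note on the shrink threshold used above: since FP_1 = 2048, the value
 * 1024 * threadCount represents 0.5 * threadCount in fixed point. With
 * 8 worker threads, for example, a thread is retired only when the 5-minute
 * load average falls below 4.0 concurrent requests, i.e. less than half a
 * request per thread on average, and only while the pool is above its
 * minimum size.
 */
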
/**
 * Create thread pool
 */
ThreadPool LIBNETXMS_EXPORTABLE *ThreadPoolCreate(int minThreads, int maxThreads, const TCHAR *name)
{
   ThreadPool *p = (ThreadPool *)malloc(sizeof(ThreadPool));
   p->minThreads = minThreads;
   p->maxThreads = maxThreads;
   p->activeRequests = 0;
   p->threads = new HashMap<UINT64, WorkerThreadInfo>();
   p->queue = new Queue(64, 64);
   p->mutex = MutexCreate();
   p->maintThreadStop = ConditionCreate(TRUE);
   p->serializationQueues = new StringObjectMap<Queue>(true);
   p->serializationQueues->setIgnoreCase(false);
   p->serializationLock = MutexCreate();
   p->name = (name != NULL) ? _tcsdup(name) : _tcsdup(_T("NONAME"));
   p->shutdownMode = false;
   p->loadAverage[0] = 0;
   p->loadAverage[1] = 0;
   p->loadAverage[2] = 0;

   for(int i = 0; i < p->minThreads; i++)
   {
      WorkerThreadInfo *wt = new WorkerThreadInfo;
      wt->pool = p;
      wt->handle = ThreadCreateEx(WorkerThread, 0, wt);
      p->threads->set(CAST_FROM_POINTER(wt, UINT64), wt);
   }

   p->maintThread = ThreadCreateEx(MaintenanceThread, 0, p);

   MutexLock(s_registryLock);
   s_registry.set(p->name, p);
   MutexUnlock(s_registryLock);

   nxlog_debug(1, _T("Thread pool %s initialized (min=%d, max=%d)"), p->name, p->minThreads, p->maxThreads);
   return p;
}

/**
 * Callback for joining all worker threads on pool destruction
 */
static EnumerationCallbackResult ThreadPoolDestroyCallback(const void *key, const void *object, void *arg)
{
   ThreadJoin(((WorkerThreadInfo *)object)->handle);
   return _CONTINUE;
}

/**
 * Destroy thread pool
 */
void LIBNETXMS_EXPORTABLE ThreadPoolDestroy(ThreadPool *p)
{
   nxlog_debug(3, _T("Stopping threads in thread pool %s"), p->name);

   MutexLock(s_registryLock);
   s_registry.remove(p->name);
   MutexUnlock(s_registryLock);

   MutexLock(p->mutex);
   p->shutdownMode = true;
   MutexUnlock(p->mutex);

   ConditionSet(p->maintThreadStop);
   ThreadJoin(p->maintThread);

   // Stop indicator is allocated on the stack; workers receiving it
   // break out of their loop without freeing it
   WorkRequest rq;
   rq.func = NULL;
   rq.inactivityStop = false;
   for(int i = 0; i < p->threads->size(); i++)
      p->queue->put(&rq);
   p->threads->forEach(ThreadPoolDestroyCallback, NULL);

   nxlog_debug(1, _T("Thread pool %s destroyed"), p->name);
   p->threads->setOwner(true);
   delete p->threads;
   delete p->queue;
   delete p->serializationQueues;
   MutexDestroy(p->serializationLock);
   MutexDestroy(p->mutex);
   free(p->name);
   free(p);
}

/**
 * Execute task as soon as possible
 */
void LIBNETXMS_EXPORTABLE ThreadPoolExecute(ThreadPool *p, ThreadPoolWorkerFunction f, void *arg)
{
   if (InterlockedIncrement(&p->activeRequests) > p->threads->size())
   {
      // all workers appear busy - start an additional thread
      // if the pool has not reached its upper limit yet
      bool started = false;
      MutexLock(p->mutex);
      if (p->threads->size() < p->maxThreads)
      {
         WorkerThreadInfo *wt = new WorkerThreadInfo;
         wt->pool = p;
         wt->handle = ThreadCreateEx(WorkerThread, 0, wt);
         p->threads->set(CAST_FROM_POINTER(wt, UINT64), wt);
         started = true;
      }
      MutexUnlock(p->mutex);
      if (started)
         nxlog_debug(3, _T("New thread started in thread pool %s"), p->name);
   }

   WorkRequest *rq = (WorkRequest *)malloc(sizeof(WorkRequest));
   rq->func = f;
   rq->arg = arg;
   p->queue->put(rq);
}

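/**
 * Illustrative usage sketch (not part of the library; ProcessItem, pool and
 * itemData are hypothetical names). A caller typically creates a pool once,
 * submits work items for asynchronous execution, and destroys the pool on
 * shutdown:
 *
 *    static void ProcessItem(void *arg)
 *    {
 *       // ... do the work ...
 *       free(arg);  // in this sketch the worker owns its argument
 *    }
 *
 *    ThreadPool *pool = ThreadPoolCreate(4, 32, _T("EXAMPLE"));
 *    ThreadPoolExecute(pool, ProcessItem, itemData);
 *    ...
 *    ThreadPoolDestroy(pool);
 */
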
/**
 * Request serialization data
 */
struct RequestSerializationData
{
   TCHAR *key;
   ThreadPool *pool;
   Queue *queue;
};

/**
 * Worker function to process serialized requests. The serialization lock is
 * taken around each dequeue so that the empty-queue check and the removal of
 * the queue from the map are atomic with respect to ThreadPoolExecuteSerialized.
 */
static void ProcessSerializedRequests(void *arg)
{
   RequestSerializationData *data = (RequestSerializationData *)arg;
   while(true)
   {
      MutexLock(data->pool->serializationLock);
      WorkRequest *rq = (WorkRequest *)data->queue->get();
      if (rq == NULL)
      {
         data->pool->serializationQueues->remove(data->key);
         MutexUnlock(data->pool->serializationLock);
         break;
      }
      MutexUnlock(data->pool->serializationLock);

      rq->func(rq->arg);
      free(rq);
   }
   free(data->key);
   delete data;
}

/**
 * Execute task serialized (it will not start before the previous task
 * with the same key completes)
 */
void LIBNETXMS_EXPORTABLE ThreadPoolExecuteSerialized(ThreadPool *p, const TCHAR *key, ThreadPoolWorkerFunction f, void *arg)
{
   MutexLock(p->serializationLock);

   Queue *q = p->serializationQueues->get(key);
   if (q == NULL)
   {
      q = new Queue(8, 8);
      p->serializationQueues->set(key, q);

      RequestSerializationData *data = new RequestSerializationData;
      data->key = _tcsdup(key);
      data->pool = p;
      data->queue = q;
      ThreadPoolExecute(p, ProcessSerializedRequests, data);
   }

   WorkRequest *rq = (WorkRequest *)malloc(sizeof(WorkRequest));
   rq->func = f;
   rq->arg = arg;
   q->put(rq);

   MutexUnlock(p->serializationLock);
}

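/**
 * Illustrative usage sketch for serialized execution (pool, ApplyUpdate and
 * the update arguments are hypothetical names). Tasks sharing a key run
 * strictly one after another, while tasks with different keys may run
 * concurrently:
 *
 *    // updates for the same node are applied in submission order
 *    ThreadPoolExecuteSerialized(pool, _T("node-42"), ApplyUpdate, update1);
 *    ThreadPoolExecuteSerialized(pool, _T("node-42"), ApplyUpdate, update2);
 *    // a different key is free to execute in parallel
 *    ThreadPoolExecuteSerialized(pool, _T("node-17"), ApplyUpdate, update3);
 */
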
/**
 * Schedule task for execution using absolute time
 */
void LIBNETXMS_EXPORTABLE ThreadPoolScheduleAbsolute(ThreadPool *p, time_t runTime, ThreadPoolWorkerFunction f, void *arg)
{
   // not yet implemented
}

/**
 * Schedule task for execution using relative time
 */
void LIBNETXMS_EXPORTABLE ThreadPoolScheduleRelative(ThreadPool *p, UINT32 delay, ThreadPoolWorkerFunction f, void *arg)
{
   // not yet implemented
}

/**
 * Get pool information
 */
void LIBNETXMS_EXPORTABLE ThreadPoolGetInfo(ThreadPool *p, ThreadPoolInfo *info)
{
   MutexLock(p->mutex);
   info->name = p->name;
   info->minThreads = p->minThreads;
   info->maxThreads = p->maxThreads;
   info->curThreads = p->threads->size();
   info->activeRequests = p->activeRequests;
   info->load = (info->curThreads > 0) ? info->activeRequests * 100 / info->curThreads : 0;  // percent, guarded against empty pool
   info->usage = info->curThreads * 100 / info->maxThreads;
   info->loadAvg[0] = (double)p->loadAverage[0] / FP_1;
   info->loadAvg[1] = (double)p->loadAverage[1] / FP_1;
   info->loadAvg[2] = (double)p->loadAverage[2] / FP_1;
   MutexUnlock(p->mutex);
}

/**
 * Get pool information by name
 */
bool LIBNETXMS_EXPORTABLE ThreadPoolGetInfo(const TCHAR *name, ThreadPoolInfo *info)
{
   MutexLock(s_registryLock);
   ThreadPool *p = s_registry.get(name);
   if (p != NULL)
      ThreadPoolGetInfo(p, info);
   MutexUnlock(s_registryLock);
   return p != NULL;
}

/**
 * Get all thread pool names
 */
StringList LIBNETXMS_EXPORTABLE *ThreadPoolGetAllPools()
{
   MutexLock(s_registryLock);
   StringList *list = s_registry.keys();
   MutexUnlock(s_registryLock);
   return list;
}