fixed memory leak in thread pools
[public/netxms.git] / src / libnetxms / tp.cpp
CommitLineData
5d3459af
VK
1/*
2** NetXMS - Network Management System
3** NetXMS Foundation Library
eb077f61 4** Copyright (C) 2003-2016 Victor Kirhenshtein
5d3459af
VK
5**
6** This program is free software; you can redistribute it and/or modify
7** it under the terms of the GNU Lesser General Public License as published
8** by the Free Software Foundation; either version 3 of the License, or
9** (at your option) any later version.
10**
11** This program is distributed in the hope that it will be useful,
12** but WITHOUT ANY WARRANTY; without even the implied warranty of
13** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14** GNU General Public License for more details.
15**
16** You should have received a copy of the GNU Lesser General Public License
17** along with this program; if not, write to the Free Software
18** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19**
20** File: tp.cpp
21**
22**/
23
24#include "libnetxms.h"
25#include <nxqueue.h>
26
7e1816e5
VK
/**
 * Load average calculation.
 *
 * Load averages are kept as fixed-point numbers with FP_SHIFT fractional bits
 * and are updated every 5 seconds using exponential decay constants for
 * 1, 5 and 15 minute windows.
 */
#define FP_SHIFT  11              /* nr of bits of precision */
#define FP_1      (1 << FP_SHIFT) /* 1.0 as fixed-point */
#define EXP_1     1884            /* 1/exp(5sec/1min) as fixed-point */
#define EXP_5     2014            /* 1/exp(5sec/5min) */
#define EXP_15    2037            /* 1/exp(5sec/15min) */

/**
 * Update fixed-point load average "load" with new sample "n" (also fixed-point)
 * using decay constant "exp": load = (load * exp + n * (1.0 - exp)) / 1.0
 *
 * Intermediate products are computed in 64 bits: both "load" and "n" carry
 * FP_SHIFT fractional bits and are multiplied by another ~11 bit value, so a
 * 32 bit intermediate overflows as soon as the load exceeds roughly 500.
 * All macro arguments are parenthesized so that expressions can be passed safely.
 */
#define CALC_LOAD(load, exp, n) do { (load) = (INT32)((((INT64)(load) * (exp)) + ((INT64)(n) * (FP_1 - (exp)))) >> FP_SHIFT); } while(0)

/**
 * Worker thread idle timeout in milliseconds
 */
#define THREAD_IDLE_TIMEOUT   300000
41
42/**
43 * Worker thread data
44 */
45struct WorkerThreadInfo
46{
47 ThreadPool *pool;
48 THREAD handle;
49};
50
5d3459af
VK
51/**
52 * Thread pool
53 */
54struct ThreadPool
55{
56 int minThreads;
57 int maxThreads;
5d3459af
VK
58 VolatileCounter activeRequests;
59 MUTEX mutex;
7e1816e5
VK
60 THREAD maintThread;
61 CONDITION maintThreadStop;
62 HashMap<UINT64, WorkerThreadInfo> *threads;
5d3459af 63 Queue *queue;
e1415980
VK
64 StringObjectMap<Queue> *serializationQueues;
65 MUTEX serializationLock;
5d3459af 66 TCHAR *name;
7e1816e5
VK
67 bool shutdownMode;
68 INT32 loadAverage[3];
5d3459af
VK
69};
70
374afd7b
VK
71/**
72 * Thread pool registry
73 */
74static StringObjectMap<ThreadPool> s_registry(false);
75static MUTEX s_registryLock = MutexCreate();
76
5d3459af
VK
77/**
78 * Thread work request
79 */
80struct WorkRequest
81{
82 ThreadPoolWorkerFunction func;
83 void *arg;
7e1816e5 84 bool inactivityStop;
5d3459af
VK
85};
86
7e1816e5
VK
87/**
88 * Worker function to join stopped thread
89 */
90static void JoinWorkerThread(void *arg)
91{
92 ThreadJoin(((WorkerThreadInfo *)arg)->handle);
93 delete (WorkerThreadInfo *)arg;
94}
95
5d3459af
VK
96/**
97 * Worker thread function
98 */
99static THREAD_RESULT THREAD_CALL WorkerThread(void *arg)
100{
7e1816e5
VK
101 ThreadPool *p = ((WorkerThreadInfo *)arg)->pool;
102 Queue *q = p->queue;
5d3459af
VK
103 while(true)
104 {
19dbc8ef 105 WorkRequest *rq = (WorkRequest *)q->getOrBlock();
7e1816e5
VK
106
107 if (rq->func == NULL) // stop indicator
108 {
109 if (rq->inactivityStop)
110 {
111 MutexLock(p->mutex);
112 p->threads->remove(CAST_FROM_POINTER(arg, UINT64));
113 MutexUnlock(p->mutex);
114
2df047f4 115 nxlog_debug(3, _T("Stopping worker thread in thread pool %s due to inactivity"), p->name);
7e1816e5
VK
116
117 free(rq);
118 rq = (WorkRequest *)malloc(sizeof(WorkRequest));
119 rq->func = JoinWorkerThread;
120 rq->arg = arg;
121 InterlockedIncrement(&p->activeRequests);
122 p->queue->put(rq);
123 }
124 break;
125 }
126
5d3459af
VK
127 rq->func(rq->arg);
128 free(rq);
7e1816e5 129 InterlockedDecrement(&p->activeRequests);
5d3459af
VK
130 }
131 return THREAD_OK;
132}
133
7e1816e5
VK
134/**
135 * Thread pool maintenance thread
136 */
137static THREAD_RESULT THREAD_CALL MaintenanceThread(void *arg)
138{
139 ThreadPool *p = (ThreadPool *)arg;
140 int count = 0;
141 while(!ConditionWait(p->maintThreadStop, 5000))
142 {
143 INT32 requestCount = (INT32)p->activeRequests << FP_SHIFT;
144 CALC_LOAD(p->loadAverage[0], EXP_1, requestCount);
145 CALC_LOAD(p->loadAverage[1], EXP_5, requestCount);
146 CALC_LOAD(p->loadAverage[2], EXP_15, requestCount);
147
148 count++;
149 if (count == 12) // do pool check once per minute
150 {
151 MutexLock(p->mutex);
152 INT32 threadCount = p->threads->size();
153 MutexUnlock(p->mutex);
154 if ((threadCount > p->minThreads) && (p->loadAverage[1] < 1024 * threadCount)) // 5 minutes load average < 0.5 * thread count
155 {
156 WorkRequest *rq = (WorkRequest *)malloc(sizeof(WorkRequest));
157 rq->func = NULL;
158 rq->arg = NULL;
159 rq->inactivityStop = true;
160 p->queue->put(rq);
161 }
162 count = 0;
163 }
164 }
2df047f4 165 nxlog_debug(3, _T("Maintenance thread for thread pool %s stopped"), p->name);
7e1816e5
VK
166 return THREAD_OK;
167}
168
5d3459af
VK
169/**
170 * Create thread pool
171 */
172ThreadPool LIBNETXMS_EXPORTABLE *ThreadPoolCreate(int minThreads, int maxThreads, const TCHAR *name)
173{
174 ThreadPool *p = (ThreadPool *)malloc(sizeof(ThreadPool));
175 p->minThreads = minThreads;
176 p->maxThreads = maxThreads;
5d3459af 177 p->activeRequests = 0;
7e1816e5 178 p->threads = new HashMap<UINT64, WorkerThreadInfo>();
5d3459af
VK
179 p->queue = new Queue(64, 64);
180 p->mutex = MutexCreate();
7e1816e5 181 p->maintThreadStop = ConditionCreate(TRUE);
e1415980
VK
182 p->serializationQueues = new StringObjectMap<Queue>(true);
183 p->serializationQueues->setIgnoreCase(false);
184 p->serializationLock = MutexCreate();
5d3459af 185 p->name = (name != NULL) ? _tcsdup(name) : _tcsdup(_T("NONAME"));
7e1816e5
VK
186 p->shutdownMode = false;
187 p->loadAverage[0] = 0;
188 p->loadAverage[1] = 0;
189 p->loadAverage[2] = 0;
5d3459af 190
7e1816e5
VK
191 for(int i = 0; i < p->minThreads; i++)
192 {
193 WorkerThreadInfo *wt = new WorkerThreadInfo;
194 wt->pool = p;
195 wt->handle = ThreadCreateEx(WorkerThread, 0, wt);
196 p->threads->set(CAST_FROM_POINTER(wt, UINT64), wt);
197 }
5d3459af 198
7e1816e5 199 p->maintThread = ThreadCreateEx(MaintenanceThread, 0, p);
374afd7b
VK
200
201 MutexLock(s_registryLock);
202 s_registry.set(p->name, p);
203 MutexUnlock(s_registryLock);
204
2df047f4 205 nxlog_debug(1, _T("Thread pool %s initialized (min=%d, max=%d)"), p->name, p->minThreads, p->maxThreads);
5d3459af
VK
206 return p;
207}
208
7e1816e5
VK
209/**
210 * Callback for joining all worker threads on pool destruction
211 */
212static EnumerationCallbackResult ThreadPoolDestroyCallback(const void *key, const void *object, void *arg)
213{
214 ThreadJoin(((WorkerThreadInfo *)object)->handle);
215 return _CONTINUE;
216}
217
5d3459af
VK
218/**
219 * Destroy thread pool
220 */
221void LIBNETXMS_EXPORTABLE ThreadPoolDestroy(ThreadPool *p)
222{
2df047f4 223 nxlog_debug(3, _T("Stopping threads in thread pool %s"), p->name);
5d3459af 224
374afd7b
VK
225 MutexLock(s_registryLock);
226 s_registry.remove(p->name);
227 MutexUnlock(s_registryLock);
228
7e1816e5
VK
229 MutexLock(p->mutex);
230 p->shutdownMode = true;
231 MutexUnlock(p->mutex);
232
233 ConditionSet(p->maintThreadStop);
234 ThreadJoin(p->maintThread);
aea481fe 235 ConditionDestroy(p->maintThreadStop);
7e1816e5 236
5d3459af
VK
237 WorkRequest rq;
238 rq.func = NULL;
7e1816e5
VK
239 rq.inactivityStop = false;
240 for(int i = 0; i < p->threads->size(); i++)
19dbc8ef 241 p->queue->put(&rq);
7e1816e5 242 p->threads->forEach(ThreadPoolDestroyCallback, NULL);
5d3459af 243
2df047f4 244 nxlog_debug(1, _T("Thread pool %s destroyed"), p->name);
7e1816e5
VK
245 p->threads->setOwner(true);
246 delete p->threads;
5d3459af 247 delete p->queue;
e1415980
VK
248 delete p->serializationQueues;
249 MutexDestroy(p->serializationLock);
5d3459af
VK
250 MutexDestroy(p->mutex);
251 free(p->name);
252 free(p);
253}
254
255/**
256 * Execute task as soon as possible
257 */
258void LIBNETXMS_EXPORTABLE ThreadPoolExecute(ThreadPool *p, ThreadPoolWorkerFunction f, void *arg)
259{
7e1816e5 260 if (InterlockedIncrement(&p->activeRequests) > p->threads->size())
5d3459af
VK
261 {
262 bool started = false;
263 MutexLock(p->mutex);
7e1816e5 264 if (p->threads->size() < p->maxThreads)
5d3459af 265 {
7e1816e5
VK
266 WorkerThreadInfo *wt = new WorkerThreadInfo;
267 wt->pool = p;
268 wt->handle = ThreadCreateEx(WorkerThread, 0, wt);
269 p->threads->set(CAST_FROM_POINTER(wt, UINT64), wt);
270 started = true;
5d3459af
VK
271 }
272 MutexUnlock(p->mutex);
273 if (started)
2df047f4 274 nxlog_debug(3, _T("New thread started in thread pool %s"), p->name);
5d3459af
VK
275 }
276
277 WorkRequest *rq = (WorkRequest *)malloc(sizeof(WorkRequest));
278 rq->func = f;
279 rq->arg = arg;
19dbc8ef 280 p->queue->put(rq);
5d3459af
VK
281}
282
e1415980
VK
283/**
284 * Request serialization data
285 */
286struct RequestSerializationData
287{
288 TCHAR *key;
289 ThreadPool *pool;
290 Queue *queue;
291};
292
293/**
294 * Worker function to process serialized requests
295 */
296static void ProcessSerializedRequests(void *arg)
297{
298 RequestSerializationData *data = (RequestSerializationData *)arg;
299 while(true)
300 {
301 MutexLock(data->pool->serializationLock);
302 WorkRequest *rq = (WorkRequest *)data->queue->get();
303 if (rq == NULL)
304 {
305 data->pool->serializationQueues->remove(data->key);
306 MutexUnlock(data->pool->serializationLock);
307 break;
308 }
309 MutexUnlock(data->pool->serializationLock);
c1bc623e
VK
310
311 rq->func(rq->arg);
312 free(rq);
e1415980
VK
313 }
314 free(data->key);
315 delete data;
316}
317
318/**
319 * Execute task serialized (not before previous task with same key ends)
320 */
321void LIBNETXMS_EXPORTABLE ThreadPoolExecuteSerialized(ThreadPool *p, const TCHAR *key, ThreadPoolWorkerFunction f, void *arg)
322{
323 MutexLock(p->serializationLock);
324
325 Queue *q = p->serializationQueues->get(key);
326 if (q == NULL)
327 {
328 q = new Queue(8, 8);
329 p->serializationQueues->set(key, q);
330
331 RequestSerializationData *data = new RequestSerializationData;
332 data->key = _tcsdup(key);
333 data->pool = p;
334 data->queue = q;
335 ThreadPoolExecute(p, ProcessSerializedRequests, data);
336 }
337
338 WorkRequest *rq = (WorkRequest *)malloc(sizeof(WorkRequest));
339 rq->func = f;
340 rq->arg = arg;
341 q->put(rq);
342
343 MutexUnlock(p->serializationLock);
344}
345
5d3459af
VK
346/**
347 * Schedule task for execution using absolute time
348 */
349void LIBNETXMS_EXPORTABLE ThreadPoolScheduleAbsolute(ThreadPool *p, time_t runTime, ThreadPoolWorkerFunction *f, void *arg)
350{
351}
352
353/**
354 * Schedule task for execution using relative time
355 */
356void LIBNETXMS_EXPORTABLE ThreadPoolScheduleRelative(ThreadPool *p, UINT32 delay, ThreadPoolWorkerFunction *f, void *arg)
357{
358}
359
360/**
361 * Get pool information
362 */
363void LIBNETXMS_EXPORTABLE ThreadPoolGetInfo(ThreadPool *p, ThreadPoolInfo *info)
364{
7e1816e5 365 MutexLock(p->mutex);
5d3459af
VK
366 info->name = p->name;
367 info->minThreads = p->minThreads;
368 info->maxThreads = p->maxThreads;
7e1816e5 369 info->curThreads = p->threads->size();
5d3459af
VK
370 info->activeRequests = p->activeRequests;
371 info->load = info->activeRequests * 100 / info->curThreads;
372 info->usage = info->curThreads * 100 / info->maxThreads;
7e1816e5
VK
373 info->loadAvg[0] = (double)p->loadAverage[0] / FP_1;
374 info->loadAvg[1] = (double)p->loadAverage[1] / FP_1;
375 info->loadAvg[2] = (double)p->loadAverage[2] / FP_1;
376 MutexUnlock(p->mutex);
5d3459af 377}
374afd7b
VK
378
379/**
380 * Get pool information by name
381 */
382bool LIBNETXMS_EXPORTABLE ThreadPoolGetInfo(const TCHAR *name, ThreadPoolInfo *info)
383{
384 MutexLock(s_registryLock);
385 ThreadPool *p = s_registry.get(name);
386 if (p != NULL)
387 ThreadPoolGetInfo(p, info);
388 MutexUnlock(s_registryLock);
389 return p != NULL;
390}
391
392/**
393 * Get all thread pool names
394 */
395StringList LIBNETXMS_EXPORTABLE *ThreadPoolGetAllPools()
396{
397 MutexLock(s_registryLock);
398 StringList *list = s_registry.keys();
399 MutexUnlock(s_registryLock);
400 return list;
401}