2c2b8b8afc10c363539c02ff5ba0abcb41cc13f1
[public/netxms.git] / src / db / libnxdb / dbcp.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Database Abstraction Library
4 ** Copyright (C) 2008-2015 Raden Solutions
5 **
6 ** This program is free software; you can redistribute it and/or modify
7 ** it under the terms of the GNU Lesser General Public License as published by
8 ** the Free Software Foundation; either version 3 of the License, or
9 ** (at your option) any later version.
10 **
11 ** This program is distributed in the hope that it will be useful,
12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 ** GNU General Public License for more details.
15 **
16 ** You should have received a copy of the GNU Lesser General Public License
17 ** along with this program; if not, write to the Free Software
18 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 **
20 ** File: dbcp.cpp
21 **
22 **/
23
24 #include "libnxdb.h"
25
26 static bool s_initialized = false;
27 static DB_DRIVER m_driver;
28 static TCHAR m_server[256];
29 static TCHAR m_login[256];
30 static TCHAR m_password[256];
31 static TCHAR m_dbName[256];
32 static TCHAR m_schema[256];
33
34 static int m_basePoolSize;
35 static int m_maxPoolSize;
36 static int m_cooldownTime;
37 static int m_connectionTTL;
38
39 static MUTEX m_poolAccessMutex = INVALID_MUTEX_HANDLE;
40 static ObjectArray<PoolConnectionInfo> m_connections;
41 static THREAD m_maintThread = INVALID_THREAD_HANDLE;
42 static CONDITION m_condShutdown = INVALID_CONDITION_HANDLE;
43 static CONDITION m_condRelease = INVALID_CONDITION_HANDLE;
44
45 /**
46 * Create connections on pool initialization
47 */
48 static bool DBConnectionPoolPopulate()
49 {
50 TCHAR errorText[DBDRV_MAX_ERROR_TEXT];
51 bool success = false;
52
53 MutexLock(m_poolAccessMutex);
54 for(int i = 0; i < m_basePoolSize; i++)
55 {
56 PoolConnectionInfo *conn = new PoolConnectionInfo;
57 conn->handle = DBConnect(m_driver, m_server, m_dbName, m_login, m_password, m_schema, errorText);
58 if (conn->handle != NULL)
59 {
60 conn->inUse = false;
61 conn->connectTime = time(NULL);
62 conn->lastAccessTime = conn->connectTime;
63 conn->usageCount = 0;
64 conn->srcFile[0] = 0;
65 conn->srcLine = 0;
66 m_connections.add(conn);
67 success = true;
68 }
69 else
70 {
71 __DBDbgPrintf(3, _T("Database Connection Pool: cannot create DB connection %d (%s)"), i, errorText);
72 delete conn;
73 }
74 }
75 MutexUnlock(m_poolAccessMutex);
76 return success;
77 }
78
79 /**
80 * Shrink connection pool up to base size when possible
81 */
82 static void DBConnectionPoolShrink()
83 {
84 MutexLock(m_poolAccessMutex);
85
86 time_t now = time(NULL);
87 for(int i = m_basePoolSize; i < m_connections.size(); i++)
88 {
89 PoolConnectionInfo *conn = m_connections.get(i);
90 if (!conn->inUse && (now - conn->lastAccessTime > m_cooldownTime))
91 {
92 DBDisconnect(conn->handle);
93 m_connections.remove(i);
94 i--;
95 }
96 }
97
98 MutexUnlock(m_poolAccessMutex);
99 }
100
101 /*
102 * Reset connection
103 */
104 static bool ResetConnection(PoolConnectionInfo *conn)
105 {
106 time_t now = time(NULL);
107 DBDisconnect(conn->handle);
108
109 TCHAR errorText[DBDRV_MAX_ERROR_TEXT];
110 conn->handle = DBConnect(m_driver, m_server, m_dbName, m_login, m_password, m_schema, errorText);
111 if (conn->handle != NULL)
112 {
113 conn->connectTime = now;
114 conn->lastAccessTime = now;
115 conn->usageCount = 0;
116
117 __DBDbgPrintf(3, _T("Database Connection Pool: connection %p reconnected"), conn->handle);
118 return true;
119 }
120 else
121 {
122 __DBDbgPrintf(3, _T("Database Connection Pool: connection %p reconnect failure (%s)"), conn->handle, errorText);
123 return false;
124 }
125 }
126
127 /**
128 * Callback for sorting reset list
129 */
130 static int ResetListSortCallback(const void *e1, const void *e2)
131 {
132 return ((PoolConnectionInfo *)e1)->usageCount > ((PoolConnectionInfo *)e2)->usageCount ? -1 :
133 (((PoolConnectionInfo *)e1)->usageCount == ((PoolConnectionInfo *)e2)->usageCount ? 0 : 1);
134 }
135
136 /**
137 * Reset expired connections
138 */
139 static void ResetExpiredConnections()
140 {
141 time_t now = time(NULL);
142
143 MutexLock(m_poolAccessMutex);
144
145 int i, availCount = 0;
146 ObjectArray<PoolConnectionInfo> reconnList(m_connections.size(), 16, false);
147 for(i = 0; i < m_connections.size(); i++)
148 {
149 PoolConnectionInfo *conn = m_connections.get(i);
150 if (!conn->inUse)
151 {
152 availCount++;
153 if (now - conn->connectTime > m_connectionTTL)
154 {
155 reconnList.add(conn);
156 }
157 }
158 }
159
160 int count = min(availCount / 2 + 1, reconnList.size()); // reset no more than 50% of available connections
161 if (count < reconnList.size())
162 {
163 reconnList.sort(ResetListSortCallback);
164 while(reconnList.size() > count)
165 reconnList.remove(count);
166 }
167
168 for(i = 0; i < count; i++)
169 reconnList.get(i)->inUse = true;
170 MutexUnlock(m_poolAccessMutex);
171
172 // do reconnects
173 for(i = 0; i < count; i++)
174 {
175 PoolConnectionInfo *conn = reconnList.get(i);
176 bool success = ResetConnection(conn);
177 MutexLock(m_poolAccessMutex);
178 if (success)
179 {
180 conn->inUse = false;
181 }
182 else
183 {
184 m_connections.remove(conn);
185 }
186 MutexUnlock(m_poolAccessMutex);
187 }
188 }
189
190 /**
191 * Pool maintenance thread
192 */
193 static THREAD_RESULT THREAD_CALL MaintenanceThread(void *arg)
194 {
195 __DBDbgPrintf(1, _T("Database Connection Pool maintenance thread started"));
196
197 while(!ConditionWait(m_condShutdown, (m_connectionTTL > 0) ? m_connectionTTL * 750 : 300000))
198 {
199 DBConnectionPoolShrink();
200 if (m_connectionTTL > 0)
201 {
202 ResetExpiredConnections();
203 }
204 }
205
206 __DBDbgPrintf(1, _T("Database Connection Pool maintenance thread stopped"));
207 return THREAD_OK;
208 }
209
210 /**
211 * Start connection pool
212 */
213 bool LIBNXDB_EXPORTABLE DBConnectionPoolStartup(DB_DRIVER driver, const TCHAR *server, const TCHAR *dbName,
214 const TCHAR *login, const TCHAR *password, const TCHAR *schema,
215 int basePoolSize, int maxPoolSize, int cooldownTime,
216 int connTTL)
217 {
218 if (s_initialized)
219 return true; // already initialized
220
221 m_driver = driver;
222 nx_strncpy(m_server, CHECK_NULL_EX(server), 256);
223 nx_strncpy(m_dbName, CHECK_NULL_EX(dbName), 256);
224 nx_strncpy(m_login, CHECK_NULL_EX(login), 256);
225 nx_strncpy(m_password, CHECK_NULL_EX(password), 256);
226 nx_strncpy(m_schema, CHECK_NULL_EX(schema), 256);
227
228 m_basePoolSize = basePoolSize;
229 m_maxPoolSize = maxPoolSize;
230 m_cooldownTime = cooldownTime;
231 m_connectionTTL = connTTL;
232
233 m_poolAccessMutex = MutexCreate();
234 m_connections.setOwner(true);
235 m_condShutdown = ConditionCreate(TRUE);
236 m_condRelease = ConditionCreate(FALSE);
237
238 if (!DBConnectionPoolPopulate())
239 {
240 // cannot open at least one connection
241 ConditionDestroy(m_condShutdown);
242 ConditionDestroy(m_condRelease);
243 MutexDestroy(m_poolAccessMutex);
244 return false;
245 }
246
247 m_maintThread = ThreadCreateEx(MaintenanceThread, 0, NULL);
248
249 s_initialized = true;
250 __DBDbgPrintf(1, _T("Database Connection Pool initialized"));
251
252 return true;
253 }
254
255 /**
256 * Shutdown connection pool
257 */
258 void LIBNXDB_EXPORTABLE DBConnectionPoolShutdown()
259 {
260 if (!s_initialized)
261 return;
262
263 ConditionSet(m_condShutdown);
264 ThreadJoin(m_maintThread);
265
266 ConditionDestroy(m_condShutdown);
267 ConditionDestroy(m_condRelease);
268 MutexDestroy(m_poolAccessMutex);
269
270 for(int i = 0; i < m_connections.size(); i++)
271 {
272 DBDisconnect(m_connections.get(i)->handle);
273 }
274
275 m_connections.clear();
276
277 s_initialized = false;
278 __DBDbgPrintf(1, _T("Database Connection Pool terminated"));
279
280 }
281
282 /**
283 * Acquire connection from pool. This function never fails - if it's impossible to acquire
284 * pooled connection, calling thread will be suspended until there will be connection available.
285 */
286 DB_HANDLE LIBNXDB_EXPORTABLE __DBConnectionPoolAcquireConnection(const char *srcFile, int srcLine)
287 {
288 retry:
289 MutexLock(m_poolAccessMutex);
290
291 DB_HANDLE handle = NULL;
292
293 // find less used connection
294 UINT32 count = 0xFFFFFFFF;
295 int index = -1;
296 for(int i = 0; (i < m_connections.size()) && (count > 0); i++)
297 {
298 PoolConnectionInfo *conn = m_connections.get(i);
299 if (!conn->inUse && (conn->usageCount < count))
300 {
301 count = conn->usageCount;
302 index = i;
303 }
304 }
305
306 if (index > -1)
307 {
308 PoolConnectionInfo *conn = m_connections.get(index);
309 handle = conn->handle;
310 conn->inUse = true;
311 conn->lastAccessTime = time(NULL);
312 conn->usageCount++;
313 strncpy(conn->srcFile, srcFile, 128);
314 conn->srcLine = srcLine;
315 }
316 else if (m_connections.size() < m_maxPoolSize)
317 {
318 TCHAR errorText[DBDRV_MAX_ERROR_TEXT];
319 PoolConnectionInfo *conn = new PoolConnectionInfo;
320 conn->handle = DBConnect(m_driver, m_server, m_dbName, m_login, m_password, m_schema, errorText);
321 if (conn->handle != NULL)
322 {
323 conn->inUse = true;
324 conn->connectTime = time(NULL);
325 conn->lastAccessTime = conn->connectTime;
326 conn->usageCount = 0;
327 strncpy(conn->srcFile, srcFile, 128);
328 conn->srcLine = srcLine;
329 m_connections.add(conn);
330 handle = conn->handle;
331 }
332 else
333 {
334 __DBDbgPrintf(3, _T("Database Connection Pool: cannot create additional DB connection (%s)"), errorText);
335 delete conn;
336 }
337 }
338
339 MutexUnlock(m_poolAccessMutex);
340
341 if (handle == NULL)
342 {
343 __DBDbgPrintf(1, _T("Database Connection Pool exhausted (call from %hs:%d)"), srcFile, srcLine);
344 ConditionWait(m_condRelease, 10000);
345 __DBDbgPrintf(5, _T("Database Connection Pool: retry acquire connection (call from %hs:%d)"), srcFile, srcLine);
346 goto retry;
347 }
348
349 __DBDbgPrintf(7, _T("Database Connection Pool: handle %p acquired (call from %hs:%d)"), handle, srcFile, srcLine);
350 return handle;
351 }
352
353 /**
354 * Release acquired connection
355 */
356 void LIBNXDB_EXPORTABLE DBConnectionPoolReleaseConnection(DB_HANDLE handle)
357 {
358 MutexLock(m_poolAccessMutex);
359
360 for(int i = 0; i < m_connections.size(); i++)
361 {
362 PoolConnectionInfo *conn = m_connections.get(i);
363 if (conn->handle == handle)
364 {
365 conn->inUse = false;
366 conn->lastAccessTime = time(NULL);
367 conn->srcFile[0] = 0;
368 conn->srcLine = 0;
369 break;
370 }
371 }
372
373 MutexUnlock(m_poolAccessMutex);
374
375 __DBDbgPrintf(7, _T("Database Connection Pool: handle %p released"), handle);
376 ConditionPulse(m_condRelease);
377 }
378
379 /**
380 * Get current size of DB connection pool
381 */
382 int LIBNXDB_EXPORTABLE DBConnectionPoolGetSize()
383 {
384 MutexLock(m_poolAccessMutex);
385 int size = m_connections.size();
386 MutexUnlock(m_poolAccessMutex);
387 return size;
388 }
389
390 /**
391 * Get number of acquired connections in DB connection pool
392 */
393 int LIBNXDB_EXPORTABLE DBConnectionPoolGetAcquiredCount()
394 {
395 int count = 0;
396 MutexLock(m_poolAccessMutex);
397 for(int i = 0; i < m_connections.size(); i++)
398 if (m_connections.get(i)->inUse)
399 count++;
400 MutexUnlock(m_poolAccessMutex);
401 return count;
402 }
403
404 /**
405 * Get copy of active DB connections.
406 * Returned list must be deleted by the caller.
407 */
408 ObjectArray<PoolConnectionInfo> LIBNXDB_EXPORTABLE *DBConnectionPoolGetConnectionList()
409 {
410 ObjectArray<PoolConnectionInfo> *list = new ObjectArray<PoolConnectionInfo>(32, 32, true);
411 MutexLock(m_poolAccessMutex);
412 for(int i = 0; i < m_connections.size(); i++)
413 if (m_connections.get(i)->inUse)
414 {
415 list->add((PoolConnectionInfo *)nx_memdup(m_connections.get(i), sizeof(PoolConnectionInfo)));
416 }
417 MutexUnlock(m_poolAccessMutex);
418 return list;
419 }