debug tags updated
[public/netxms.git] / src / db / libnxdb / dbcp.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Database Abstraction Library
4 ** Copyright (C) 2008-2015 Raden Solutions
5 **
6 ** This program is free software; you can redistribute it and/or modify
7 ** it under the terms of the GNU Lesser General Public License as published by
8 ** the Free Software Foundation; either version 3 of the License, or
9 ** (at your option) any later version.
10 **
11 ** This program is distributed in the hope that it will be useful,
12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 ** GNU General Public License for more details.
15 **
16 ** You should have received a copy of the GNU Lesser General Public License
17 ** along with this program; if not, write to the Free Software
18 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 **
20 ** File: dbcp.cpp
21 **
22 **/
23
24 #include "libnxdb.h"
25
// Set to true by DBConnectionPoolStartup() once the pool is populated and the
// maintenance thread is created; set back to false by DBConnectionPoolShutdown()
static bool s_initialized = false;
// Connection parameters captured at startup and passed to DBConnect() for every pooled connection
static DB_DRIVER m_driver;
static TCHAR m_server[256];
static TCHAR m_login[256];
static TCHAR m_password[256];
static TCHAR m_dbName[256];
static TCHAR m_schema[256];

// Number of connections opened at startup; the shrink pass never closes entries stored below this index
static int m_basePoolSize;
// Upper limit for the number of entries in m_connections when acquiring on demand
static int m_maxPoolSize;
// Idle time (compared against time() differences, i.e. seconds) after which an extra connection may be closed
static int m_cooldownTime;
// Connection age (seconds) after which an idle connection is reconnected; values <= 0 disable the reset pass
static int m_connectionTTL;

// Guards m_connections and the fields of the entries stored in it
static MUTEX m_poolAccessMutex = INVALID_MUTEX_HANDLE;
// All pooled connections; made owner of its entries in DBConnectionPoolStartup()
static ObjectArray<PoolConnectionInfo> m_connections;
// Handle of the maintenance thread, joined on shutdown
static THREAD m_maintThread = INVALID_THREAD_HANDLE;
// Set on shutdown; the maintenance thread exits when its wait on this condition succeeds
static CONDITION m_condShutdown = INVALID_CONDITION_HANDLE;
// Pulsed on every release; acquirers wait on it when no connection could be obtained
static CONDITION m_condRelease = INVALID_CONDITION_HANDLE;

// Tag passed to nxlog_debug_tag() for all messages emitted by this file
#define DEBUG_TAG _T("db.cpool")
46
47 /**
48 * Create connections on pool initialization
49 */
50 static bool DBConnectionPoolPopulate()
51 {
52 TCHAR errorText[DBDRV_MAX_ERROR_TEXT];
53 bool success = false;
54
55 MutexLock(m_poolAccessMutex);
56 for(int i = 0; i < m_basePoolSize; i++)
57 {
58 PoolConnectionInfo *conn = new PoolConnectionInfo;
59 conn->handle = DBConnect(m_driver, m_server, m_dbName, m_login, m_password, m_schema, errorText);
60 if (conn->handle != NULL)
61 {
62 conn->inUse = false;
63 conn->connectTime = time(NULL);
64 conn->lastAccessTime = conn->connectTime;
65 conn->usageCount = 0;
66 conn->srcFile[0] = 0;
67 conn->srcLine = 0;
68 m_connections.add(conn);
69 success = true;
70 }
71 else
72 {
73 nxlog_debug_tag(DEBUG_TAG, 3, _T("Cannot create DB connection %d (%s)"), i, errorText);
74 delete conn;
75 }
76 }
77 MutexUnlock(m_poolAccessMutex);
78 return success;
79 }
80
81 /**
82 * Shrink connection pool up to base size when possible
83 */
84 static void DBConnectionPoolShrink()
85 {
86 MutexLock(m_poolAccessMutex);
87
88 time_t now = time(NULL);
89 for(int i = m_basePoolSize; i < m_connections.size(); i++)
90 {
91 PoolConnectionInfo *conn = m_connections.get(i);
92 if (!conn->inUse && (now - conn->lastAccessTime > m_cooldownTime))
93 {
94 DBDisconnect(conn->handle);
95 m_connections.remove(i);
96 i--;
97 }
98 }
99
100 MutexUnlock(m_poolAccessMutex);
101 }
102
103 /*
104 * Reset connection
105 */
106 static bool ResetConnection(PoolConnectionInfo *conn)
107 {
108 time_t now = time(NULL);
109 DBDisconnect(conn->handle);
110
111 TCHAR errorText[DBDRV_MAX_ERROR_TEXT];
112 conn->handle = DBConnect(m_driver, m_server, m_dbName, m_login, m_password, m_schema, errorText);
113 if (conn->handle != NULL)
114 {
115 conn->connectTime = now;
116 conn->lastAccessTime = now;
117 conn->usageCount = 0;
118
119 nxlog_debug_tag(DEBUG_TAG, 3, _T("Connection %p reconnected"), conn->handle);
120 return true;
121 }
122 else
123 {
124 nxlog_debug_tag(DEBUG_TAG, 3, _T("Connection %p reconnect failure (%s)"), conn->handle, errorText);
125 return false;
126 }
127 }
128
129 /**
130 * Callback for sorting reset list
131 */
132 static int ResetListSortCallback(const PoolConnectionInfo **e1, const PoolConnectionInfo **e2)
133 {
134 return (*e1)->usageCount > (*e2)->usageCount ? -1 : ((*e1)->usageCount == (*e2)->usageCount ? 0 : 1);
135 }
136
137 /**
138 * Reset expired connections
139 */
140 static void ResetExpiredConnections()
141 {
142 time_t now = time(NULL);
143
144 MutexLock(m_poolAccessMutex);
145
146 int i, availCount = 0;
147 ObjectArray<PoolConnectionInfo> reconnList(m_connections.size(), 16, false);
148 for(i = 0; i < m_connections.size(); i++)
149 {
150 PoolConnectionInfo *conn = m_connections.get(i);
151 if (!conn->inUse)
152 {
153 availCount++;
154 if (now - conn->connectTime > m_connectionTTL)
155 {
156 reconnList.add(conn);
157 }
158 }
159 }
160
161 int count = std::min(availCount / 2 + 1, reconnList.size()); // reset no more than 50% of available connections
162 if (count < reconnList.size())
163 {
164 reconnList.sort(ResetListSortCallback);
165 while(reconnList.size() > count)
166 reconnList.remove(count);
167 }
168
169 for(i = 0; i < count; i++)
170 reconnList.get(i)->inUse = true;
171 MutexUnlock(m_poolAccessMutex);
172
173 // do reconnects
174 for(i = 0; i < count; i++)
175 {
176 PoolConnectionInfo *conn = reconnList.get(i);
177 bool success = ResetConnection(conn);
178 MutexLock(m_poolAccessMutex);
179 if (success)
180 {
181 conn->inUse = false;
182 }
183 else
184 {
185 m_connections.remove(conn);
186 }
187 MutexUnlock(m_poolAccessMutex);
188 }
189 }
190
191 /**
192 * Pool maintenance thread
193 */
194 static THREAD_RESULT THREAD_CALL MaintenanceThread(void *arg)
195 {
196 ThreadSetName("DBPoolMaint");
197 nxlog_debug_tag(DEBUG_TAG, 1, _T("Database Connection Pool maintenance thread started"));
198
199 while(!ConditionWait(m_condShutdown, (m_connectionTTL > 0) ? m_connectionTTL * 750 : 300000))
200 {
201 DBConnectionPoolShrink();
202 if (m_connectionTTL > 0)
203 {
204 ResetExpiredConnections();
205 }
206 }
207
208 nxlog_debug_tag(DEBUG_TAG, 1, _T("Database Connection Pool maintenance thread stopped"));
209 return THREAD_OK;
210 }
211
212 /**
213 * Start connection pool
214 */
215 bool LIBNXDB_EXPORTABLE DBConnectionPoolStartup(DB_DRIVER driver, const TCHAR *server, const TCHAR *dbName,
216 const TCHAR *login, const TCHAR *password, const TCHAR *schema,
217 int basePoolSize, int maxPoolSize, int cooldownTime,
218 int connTTL)
219 {
220 if (s_initialized)
221 return true; // already initialized
222
223 m_driver = driver;
224 nx_strncpy(m_server, CHECK_NULL_EX(server), 256);
225 nx_strncpy(m_dbName, CHECK_NULL_EX(dbName), 256);
226 nx_strncpy(m_login, CHECK_NULL_EX(login), 256);
227 nx_strncpy(m_password, CHECK_NULL_EX(password), 256);
228 nx_strncpy(m_schema, CHECK_NULL_EX(schema), 256);
229
230 m_basePoolSize = basePoolSize;
231 m_maxPoolSize = maxPoolSize;
232 m_cooldownTime = cooldownTime;
233 m_connectionTTL = connTTL;
234
235 m_poolAccessMutex = MutexCreate();
236 m_connections.setOwner(true);
237 m_condShutdown = ConditionCreate(TRUE);
238 m_condRelease = ConditionCreate(FALSE);
239
240 if (!DBConnectionPoolPopulate())
241 {
242 // cannot open at least one connection
243 ConditionDestroy(m_condShutdown);
244 ConditionDestroy(m_condRelease);
245 MutexDestroy(m_poolAccessMutex);
246 return false;
247 }
248
249 m_maintThread = ThreadCreateEx(MaintenanceThread, 0, NULL);
250
251 s_initialized = true;
252 nxlog_debug_tag(DEBUG_TAG, 1, _T("Database Connection Pool initialized"));
253
254 return true;
255 }
256
257 /**
258 * Shutdown connection pool
259 */
260 void LIBNXDB_EXPORTABLE DBConnectionPoolShutdown()
261 {
262 if (!s_initialized)
263 return;
264
265 ConditionSet(m_condShutdown);
266 ThreadJoin(m_maintThread);
267
268 ConditionDestroy(m_condShutdown);
269 ConditionDestroy(m_condRelease);
270 MutexDestroy(m_poolAccessMutex);
271
272 for(int i = 0; i < m_connections.size(); i++)
273 {
274 DBDisconnect(m_connections.get(i)->handle);
275 }
276
277 m_connections.clear();
278
279 s_initialized = false;
280 nxlog_debug_tag(DEBUG_TAG, 1, _T("Database Connection Pool terminated"));
281 }
282
283 /**
284 * Acquire connection from pool. This function never fails - if it's impossible to acquire
285 * pooled connection, calling thread will be suspended until there will be connection available.
286 */
287 DB_HANDLE LIBNXDB_EXPORTABLE __DBConnectionPoolAcquireConnection(const char *srcFile, int srcLine)
288 {
289 retry:
290 MutexLock(m_poolAccessMutex);
291
292 DB_HANDLE handle = NULL;
293
294 // find less used connection
295 UINT32 count = 0xFFFFFFFF;
296 int index = -1;
297 for(int i = 0; (i < m_connections.size()) && (count > 0); i++)
298 {
299 PoolConnectionInfo *conn = m_connections.get(i);
300 if (!conn->inUse && (conn->usageCount < count))
301 {
302 count = conn->usageCount;
303 index = i;
304 }
305 }
306
307 if (index > -1)
308 {
309 PoolConnectionInfo *conn = m_connections.get(index);
310 handle = conn->handle;
311 conn->inUse = true;
312 conn->lastAccessTime = time(NULL);
313 conn->usageCount++;
314 strncpy(conn->srcFile, srcFile, 128);
315 conn->srcLine = srcLine;
316 }
317 else if (m_connections.size() < m_maxPoolSize)
318 {
319 TCHAR errorText[DBDRV_MAX_ERROR_TEXT];
320 PoolConnectionInfo *conn = new PoolConnectionInfo;
321 conn->handle = DBConnect(m_driver, m_server, m_dbName, m_login, m_password, m_schema, errorText);
322 if (conn->handle != NULL)
323 {
324 conn->inUse = true;
325 conn->connectTime = time(NULL);
326 conn->lastAccessTime = conn->connectTime;
327 conn->usageCount = 0;
328 strncpy(conn->srcFile, srcFile, 128);
329 conn->srcLine = srcLine;
330 m_connections.add(conn);
331 handle = conn->handle;
332 }
333 else
334 {
335 nxlog_debug_tag(DEBUG_TAG, 3, _T("Cannot create additional DB connection (%s)"), errorText);
336 delete conn;
337 }
338 }
339
340 MutexUnlock(m_poolAccessMutex);
341
342 if (handle == NULL)
343 {
344 nxlog_debug_tag(DEBUG_TAG, 1, _T("Database connection pool exhausted (call from %hs:%d)"), srcFile, srcLine);
345 ConditionWait(m_condRelease, 10000);
346 nxlog_debug_tag(DEBUG_TAG, 5, _T("Retry acquire connection (call from %hs:%d)"), srcFile, srcLine);
347 goto retry;
348 }
349
350 nxlog_debug_tag(DEBUG_TAG, 7, _T("Handle %p acquired (call from %hs:%d)"), handle, srcFile, srcLine);
351 return handle;
352 }
353
354 /**
355 * Release acquired connection
356 */
357 void LIBNXDB_EXPORTABLE DBConnectionPoolReleaseConnection(DB_HANDLE handle)
358 {
359 MutexLock(m_poolAccessMutex);
360
361 for(int i = 0; i < m_connections.size(); i++)
362 {
363 PoolConnectionInfo *conn = m_connections.get(i);
364 if (conn->handle == handle)
365 {
366 conn->inUse = false;
367 conn->lastAccessTime = time(NULL);
368 conn->srcFile[0] = 0;
369 conn->srcLine = 0;
370 break;
371 }
372 }
373
374 MutexUnlock(m_poolAccessMutex);
375
376 nxlog_debug_tag(DEBUG_TAG, 7, _T("Handle %p released"), handle);
377 ConditionPulse(m_condRelease);
378 }
379
380 /**
381 * Get current size of DB connection pool
382 */
383 int LIBNXDB_EXPORTABLE DBConnectionPoolGetSize()
384 {
385 MutexLock(m_poolAccessMutex);
386 int size = m_connections.size();
387 MutexUnlock(m_poolAccessMutex);
388 return size;
389 }
390
391 /**
392 * Get number of acquired connections in DB connection pool
393 */
394 int LIBNXDB_EXPORTABLE DBConnectionPoolGetAcquiredCount()
395 {
396 int count = 0;
397 MutexLock(m_poolAccessMutex);
398 for(int i = 0; i < m_connections.size(); i++)
399 if (m_connections.get(i)->inUse)
400 count++;
401 MutexUnlock(m_poolAccessMutex);
402 return count;
403 }
404
405 /**
406 * Get copy of active DB connections.
407 * Returned list must be deleted by the caller.
408 */
409 ObjectArray<PoolConnectionInfo> LIBNXDB_EXPORTABLE *DBConnectionPoolGetConnectionList()
410 {
411 ObjectArray<PoolConnectionInfo> *list = new ObjectArray<PoolConnectionInfo>(32, 32, true);
412 MutexLock(m_poolAccessMutex);
413 for(int i = 0; i < m_connections.size(); i++)
414 if (m_connections.get(i)->inUse)
415 {
416 list->add((PoolConnectionInfo *)nx_memdup(m_connections.get(i), sizeof(PoolConnectionInfo)));
417 }
418 MutexUnlock(m_poolAccessMutex);
419 return list;
420 }