9bad42196fd309d85c4f0a951d8fef3711086192
[public/netxms.git] / src / server / core / dbwrite.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2011 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: dbwrite.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25
26 //
27 // Constants
28 //
29
30 #define MAX_DB_WRITERS 16
31
32
33 //
34 // Global variables
35 //
36
37 Queue *g_pLazyRequestQueue = NULL;
38 Queue *g_pIDataInsertQueue = NULL;
39
40
41 //
42 // Static data
43 //
44
45 static int m_iNumWriters = 1;
46 static THREAD m_hWriteThreadList[MAX_DB_WRITERS];
47 static THREAD m_hIDataWriterThread;
48
49
50 //
51 // Put SQL request into queue for later execution
52 //
53
54 void NXCORE_EXPORTABLE QueueSQLRequest(const TCHAR *query)
55 {
56 DELAYED_SQL_REQUEST *rq = (DELAYED_SQL_REQUEST *)malloc(sizeof(DELAYED_SQL_REQUEST) + (_tcslen(query) + 1) * sizeof(TCHAR));
57 rq->query = (TCHAR *)&rq->bindings[0];
58 _tcscpy(rq->query, query);
59 rq->bindCount = 0;
60 g_pLazyRequestQueue->Put(rq);
61 DbgPrintf(8, _T("SQL request queued: %s"), query);
62 }
63
64 /**
65 * Put parameterized SQL request into queue for later execution
66 */
67 void NXCORE_EXPORTABLE QueueSQLRequest(const TCHAR *query, int bindCount, int *sqlTypes, const TCHAR **values)
68 {
69 int size = sizeof(DELAYED_SQL_REQUEST) + ((int)_tcslen(query) + 1) * sizeof(TCHAR) + bindCount * sizeof(TCHAR *) + bindCount;
70 for(int i = 0; i < bindCount; i++)
71 size += ((int)_tcslen(values[i]) + 1) * sizeof(TCHAR) + sizeof(TCHAR *);
72 DELAYED_SQL_REQUEST *rq = (DELAYED_SQL_REQUEST *)malloc(size);
73
74 BYTE *base = (BYTE *)&rq->bindings[bindCount];
75 int pos = 0;
76 int align = sizeof(TCHAR *);
77
78 rq->query = (TCHAR *)base;
79 _tcscpy(rq->query, query);
80 rq->bindCount = bindCount;
81 pos += ((int)_tcslen(query) + 1) * sizeof(TCHAR);
82
83 rq->sqlTypes = &base[pos];
84 pos += bindCount;
85 if (pos % align != 0)
86 pos += align - pos % align;
87
88 for(int i = 0; i < bindCount; i++)
89 {
90 rq->sqlTypes[i] = (BYTE)sqlTypes[i];
91 rq->bindings[i] = (TCHAR *)&base[pos];
92 _tcscpy(rq->bindings[i], values[i]);
93 pos += ((int)_tcslen(values[i]) + 1) * sizeof(TCHAR);
94 if (pos % align != 0)
95 pos += align - pos % align;
96 }
97
98 g_pLazyRequestQueue->Put(rq);
99 DbgPrintf(8, _T("SQL request queued: %s"), query);
100 }
101
102
103 //
104 // Queue INSERT request for idata_xxx table
105 //
106
107 void QueueIDataInsert(time_t timestamp, UINT32 nodeId, UINT32 dciId, const TCHAR *value)
108 {
109 DELAYED_IDATA_INSERT *rq = (DELAYED_IDATA_INSERT *)malloc(sizeof(DELAYED_IDATA_INSERT));
110 rq->timestamp = timestamp;
111 rq->nodeId = nodeId;
112 rq->dciId = dciId;
113 nx_strncpy(rq->value, value, MAX_RESULT_LENGTH);
114 g_pIDataInsertQueue->Put(rq);
115 }
116
117 /**
118 * Database "lazy" write thread
119 */
120 static THREAD_RESULT THREAD_CALL DBWriteThread(void *arg)
121 {
122 DB_HANDLE hdb;
123
124 if (g_flags & AF_ENABLE_MULTIPLE_DB_CONN)
125 {
126 TCHAR errorText[DBDRV_MAX_ERROR_TEXT];
127 hdb = DBConnect(g_dbDriver, g_szDbServer, g_szDbName, g_szDbLogin, g_szDbPassword, g_szDbSchema, errorText);
128 if (hdb == NULL)
129 {
130 nxlog_write(MSG_DB_CONNFAIL, EVENTLOG_ERROR_TYPE, "s", errorText);
131 return THREAD_OK;
132 }
133 }
134 else
135 {
136 hdb = g_hCoreDB;
137 }
138
139 while(1)
140 {
141 DELAYED_SQL_REQUEST *rq = (DELAYED_SQL_REQUEST *)g_pLazyRequestQueue->GetOrBlock();
142 if (rq == INVALID_POINTER_VALUE) // End-of-job indicator
143 break;
144
145 if (rq->bindCount == 0)
146 {
147 DBQuery(hdb, rq->query);
148 }
149 else
150 {
151 DB_STATEMENT hStmt = DBPrepare(hdb, rq->query);
152 if (hStmt != NULL)
153 {
154 for(int i = 0; i < rq->bindCount; i++)
155 {
156 DBBind(hStmt, i + 1, (int)rq->sqlTypes[i], rq->bindings[i], DB_BIND_STATIC);
157 }
158 DBExecute(hStmt);
159 DBFreeStatement(hStmt);
160 }
161 }
162 free(rq);
163 }
164
165 if (g_flags & AF_ENABLE_MULTIPLE_DB_CONN)
166 {
167 DBDisconnect(hdb);
168 }
169 return THREAD_OK;
170 }
171
172 /**
173 * Database "lazy" write thread for idata_xxx INSERTs
174 */
175 static THREAD_RESULT THREAD_CALL IDataWriteThread(void *arg)
176 {
177 DB_HANDLE hdb;
178
179 if (g_flags & AF_ENABLE_MULTIPLE_DB_CONN)
180 {
181 TCHAR errorText[DBDRV_MAX_ERROR_TEXT];
182 hdb = DBConnect(g_dbDriver, g_szDbServer, g_szDbName, g_szDbLogin, g_szDbPassword, g_szDbSchema, errorText);
183 if (hdb == NULL)
184 {
185 nxlog_write(MSG_DB_CONNFAIL, EVENTLOG_ERROR_TYPE, "s", errorText);
186 return THREAD_OK;
187 }
188 }
189 else
190 {
191 hdb = g_hCoreDB;
192 }
193
194 while(1)
195 {
196 DELAYED_IDATA_INSERT *rq = (DELAYED_IDATA_INSERT *)g_pIDataInsertQueue->GetOrBlock();
197 if (rq == INVALID_POINTER_VALUE) // End-of-job indicator
198 break;
199
200 if (DBBegin(hdb))
201 {
202 int count = 0;
203 while(1)
204 {
205 TCHAR query[256];
206 BOOL success;
207
208 _sntprintf(query, 256, _T("INSERT INTO idata_%d (item_id,idata_timestamp,idata_value) VALUES (?,?,?)"), (int)rq->nodeId);
209 DB_STATEMENT hStmt = DBPrepare(hdb, query);
210 if (hStmt != NULL)
211 {
212 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, rq->dciId);
213 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (INT64)rq->timestamp);
214 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, rq->value, DB_BIND_STATIC);
215 success = DBExecute(hStmt);
216 DBFreeStatement(hStmt);
217 }
218 else
219 {
220 success = FALSE;
221 }
222 free(rq);
223
224 count++;
225 if (!success || (count > 1000))
226 break;
227
228 rq = (DELAYED_IDATA_INSERT *)g_pIDataInsertQueue->Get();
229 if (rq == NULL)
230 break;
231 if (rq == INVALID_POINTER_VALUE) // End-of-job indicator
232 goto stop;
233 }
234 DBCommit(hdb);
235 }
236 else
237 {
238 free(rq);
239 }
240 }
241
242 stop:
243 if (g_flags & AF_ENABLE_MULTIPLE_DB_CONN)
244 {
245 DBDisconnect(hdb);
246 }
247 return THREAD_OK;
248 }
249
250 /**
251 * Start writer thread
252 */
253 void StartDBWriter()
254 {
255 int i;
256
257 if (g_flags & AF_ENABLE_MULTIPLE_DB_CONN)
258 {
259 m_iNumWriters = ConfigReadInt(_T("NumberOfDatabaseWriters"), 1);
260 if (m_iNumWriters < 1)
261 m_iNumWriters = 1;
262 if (m_iNumWriters > MAX_DB_WRITERS)
263 m_iNumWriters = MAX_DB_WRITERS;
264 }
265
266 for(i = 0; i < m_iNumWriters; i++)
267 m_hWriteThreadList[i] = ThreadCreateEx(DBWriteThread, 0, NULL);
268
269 m_hIDataWriterThread = ThreadCreateEx(IDataWriteThread, 0, NULL);
270 }
271
272 /**
273 * Stop writer thread and wait while all queries will be executed
274 */
275 void StopDBWriter()
276 {
277 int i;
278
279 for(i = 0; i < m_iNumWriters; i++)
280 g_pLazyRequestQueue->Put(INVALID_POINTER_VALUE);
281 for(i = 0; i < m_iNumWriters; i++)
282 ThreadJoin(m_hWriteThreadList[i]);
283
284 g_pIDataInsertQueue->Put(INVALID_POINTER_VALUE);
285 ThreadJoin(m_hIDataWriterThread);
286 }