3fe952ae61963d32db7e99f99670cb68dd022987
[public/netxms.git] / src / server / core / dbwrite.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2017 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: dbwrite.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 /**
26 * Generic DB writer queue
27 */
28 Queue *g_dbWriterQueue = NULL;
29
30 /**
31 * DCI data (idata_* tables) writer queue
32 */
33 Queue *g_dciDataWriterQueue = NULL;
34
35 /**
36 * Raw DCI data writer queue
37 */
38 Queue *g_dciRawDataWriterQueue = NULL;
39
40 /**
41 * Performance counters
42 */
43 UINT64 g_idataWriteRequests = 0;
44 UINT64 g_rawDataWriteRequests = 0;
45 UINT64 g_otherWriteRequests = 0;
46
47 /**
48 * Static data
49 */
50 static THREAD s_writerThread = INVALID_THREAD_HANDLE;
51 static THREAD s_iDataWriterThread = INVALID_THREAD_HANDLE;
52 static THREAD s_rawDataWriterThread = INVALID_THREAD_HANDLE;
53
54 /**
55 * Put SQL request into queue for later execution
56 */
57 void NXCORE_EXPORTABLE QueueSQLRequest(const TCHAR *query)
58 {
59 DELAYED_SQL_REQUEST *rq = (DELAYED_SQL_REQUEST *)malloc(sizeof(DELAYED_SQL_REQUEST) + (_tcslen(query) + 1) * sizeof(TCHAR));
60 rq->query = (TCHAR *)&rq->bindings[0];
61 _tcscpy(rq->query, query);
62 rq->bindCount = 0;
63 g_dbWriterQueue->put(rq);
64 DbgPrintf(8, _T("SQL request queued: %s"), query);
65 g_otherWriteRequests++;
66 }
67
68 /**
69 * Put parameterized SQL request into queue for later execution
70 */
71 void NXCORE_EXPORTABLE QueueSQLRequest(const TCHAR *query, int bindCount, int *sqlTypes, const TCHAR **values)
72 {
73 int size = sizeof(DELAYED_SQL_REQUEST) + ((int)_tcslen(query) + 1) * sizeof(TCHAR) + bindCount * sizeof(TCHAR *) + bindCount;
74 for(int i = 0; i < bindCount; i++)
75 {
76 if (values[i] != NULL)
77 size += ((int)_tcslen(values[i]) + 1) * sizeof(TCHAR) + sizeof(TCHAR *);
78 }
79 DELAYED_SQL_REQUEST *rq = (DELAYED_SQL_REQUEST *)malloc(size);
80
81 BYTE *base = (BYTE *)&rq->bindings[bindCount];
82 int pos = 0;
83 int align = sizeof(TCHAR *);
84
85 rq->query = (TCHAR *)base;
86 _tcscpy(rq->query, query);
87 rq->bindCount = bindCount;
88 pos += ((int)_tcslen(query) + 1) * sizeof(TCHAR);
89
90 rq->sqlTypes = &base[pos];
91 pos += bindCount;
92 if (pos % align != 0)
93 pos += align - pos % align;
94
95 for(int i = 0; i < bindCount; i++)
96 {
97 rq->sqlTypes[i] = (BYTE)sqlTypes[i];
98 if (values[i] != NULL)
99 {
100 rq->bindings[i] = (TCHAR *)&base[pos];
101 _tcscpy(rq->bindings[i], values[i]);
102 pos += ((int)_tcslen(values[i]) + 1) * sizeof(TCHAR);
103 if (pos % align != 0)
104 pos += align - pos % align;
105 }
106 else
107 {
108 rq->bindings[i] = NULL;
109 }
110 }
111
112 g_dbWriterQueue->put(rq);
113 DbgPrintf(8, _T("SQL request queued: %s"), query);
114 g_otherWriteRequests++;
115 }
116
117 /**
118 * Queue INSERT request for idata_xxx table
119 */
120 void QueueIDataInsert(time_t timestamp, UINT32 nodeId, UINT32 dciId, const TCHAR *value)
121 {
122 DELAYED_IDATA_INSERT *rq = (DELAYED_IDATA_INSERT *)malloc(sizeof(DELAYED_IDATA_INSERT));
123 rq->timestamp = timestamp;
124 rq->nodeId = nodeId;
125 rq->dciId = dciId;
126 nx_strncpy(rq->value, value, MAX_RESULT_LENGTH);
127 g_dciDataWriterQueue->put(rq);
128 g_idataWriteRequests++;
129 }
130
131 /**
132 * Queue UPDATE request for raw_dci_values table
133 */
134 void QueueRawDciDataUpdate(time_t timestamp, UINT32 dciId, const TCHAR *rawValue, const TCHAR *transformedValue)
135 {
136 DELAYED_RAW_DATA_UPDATE *rq = (DELAYED_RAW_DATA_UPDATE *)malloc(sizeof(DELAYED_RAW_DATA_UPDATE));
137 rq->timestamp = timestamp;
138 rq->dciId = dciId;
139 nx_strncpy(rq->rawValue, rawValue, MAX_RESULT_LENGTH);
140 nx_strncpy(rq->transformedValue, transformedValue, MAX_RESULT_LENGTH);
141 g_dciRawDataWriterQueue->put(rq);
142 g_rawDataWriteRequests++;
143 }
144
145 /**
146 * Database "lazy" write thread
147 */
148 static THREAD_RESULT THREAD_CALL DBWriteThread(void *arg)
149 {
150 ThreadSetName("DBWriter");
151 while(true)
152 {
153 DELAYED_SQL_REQUEST *rq = (DELAYED_SQL_REQUEST *)g_dbWriterQueue->getOrBlock();
154 if (rq == INVALID_POINTER_VALUE) // End-of-job indicator
155 break;
156
157 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
158
159 if (rq->bindCount == 0)
160 {
161 DBQuery(hdb, rq->query);
162 }
163 else
164 {
165 DB_STATEMENT hStmt = DBPrepare(hdb, rq->query);
166 if (hStmt != NULL)
167 {
168 for(int i = 0; i < rq->bindCount; i++)
169 {
170 DBBind(hStmt, i + 1, (int)rq->sqlTypes[i], rq->bindings[i], DB_BIND_STATIC);
171 }
172 DBExecute(hStmt);
173 DBFreeStatement(hStmt);
174 }
175 }
176 free(rq);
177
178 DBConnectionPoolReleaseConnection(hdb);
179 }
180
181 return THREAD_OK;
182 }
183
184 /**
185 * Database "lazy" write thread for idata_xxx INSERTs
186 */
187 static THREAD_RESULT THREAD_CALL IDataWriteThread(void *arg)
188 {
189 ThreadSetName("DBWriter/IData");
190 int maxRecords = ConfigReadInt(_T("DBWriter.MaxRecordsPerTransaction"), 1000);
191 while(true)
192 {
193 DELAYED_IDATA_INSERT *rq = (DELAYED_IDATA_INSERT *)g_dciDataWriterQueue->getOrBlock();
194 if (rq == INVALID_POINTER_VALUE) // End-of-job indicator
195 break;
196
197 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
198 if (DBBegin(hdb))
199 {
200 int count = 0;
201 while(1)
202 {
203 bool success;
204
205 // For Oracle preparing statement even for one time execution is preferred
206 // For other databases it will actually slow down inserts
207 if (g_dbSyntax == DB_SYNTAX_ORACLE)
208 {
209 TCHAR query[256];
210 _sntprintf(query, 256, _T("INSERT INTO idata_%d (item_id,idata_timestamp,idata_value) VALUES (?,?,?)"), (int)rq->nodeId);
211 DB_STATEMENT hStmt = DBPrepare(hdb, query);
212 if (hStmt != NULL)
213 {
214 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, rq->dciId);
215 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (INT64)rq->timestamp);
216 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, rq->value, DB_BIND_STATIC);
217 success = DBExecute(hStmt);
218 DBFreeStatement(hStmt);
219 }
220 else
221 {
222 success = false;
223 }
224 }
225 else
226 {
227 TCHAR query[1024];
228 _sntprintf(query, 1024, _T("INSERT INTO idata_%d (item_id,idata_timestamp,idata_value) VALUES (%d,%d,%s)"),
229 (int)rq->nodeId, (int)rq->dciId, (int)rq->timestamp, (const TCHAR *)DBPrepareString(hdb, rq->value));
230 success = DBQuery(hdb, query);
231 }
232
233 free(rq);
234
235 count++;
236 if (!success || (count > maxRecords))
237 break;
238
239 rq = (DELAYED_IDATA_INSERT *)g_dciDataWriterQueue->get();
240 if ((rq == NULL) || (rq == INVALID_POINTER_VALUE))
241 break;
242 }
243 DBCommit(hdb);
244 }
245 else
246 {
247 free(rq);
248 }
249 DBConnectionPoolReleaseConnection(hdb);
250 if (rq == INVALID_POINTER_VALUE) // End-of-job indicator
251 break;
252 }
253
254 return THREAD_OK;
255 }
256
257 /**
258 * Database "lazy" write thread for raw_dci_values UPDATEs
259 */
260 static THREAD_RESULT THREAD_CALL RawDataWriteThread(void *arg)
261 {
262 ThreadSetName("DBWriter/RData");
263 int maxRecords = ConfigReadInt(_T("DBWriter.MaxRecordsPerTransaction"), 1000);
264 while(true)
265 {
266 DELAYED_RAW_DATA_UPDATE *rq = (DELAYED_RAW_DATA_UPDATE *)g_dciRawDataWriterQueue->getOrBlock();
267 if (rq == INVALID_POINTER_VALUE) // End-of-job indicator
268 break;
269
270 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
271 if (DBBegin(hdb))
272 {
273 DB_STATEMENT hStmt = DBPrepare(hdb, _T("UPDATE raw_dci_values SET raw_value=?,transformed_value=?,last_poll_time=? WHERE item_id=?"));
274 if (hStmt != NULL)
275 {
276 int count = 0;
277 while(true)
278 {
279 DBBind(hStmt, 1, DB_SQLTYPE_VARCHAR, rq->rawValue, DB_BIND_STATIC);
280 DBBind(hStmt, 2, DB_SQLTYPE_VARCHAR, rq->transformedValue, DB_BIND_STATIC);
281 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (INT64)rq->timestamp);
282 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, rq->dciId);
283 bool success = DBExecute(hStmt);
284
285 free(rq);
286
287 count++;
288 if (!success || (maxRecords > 1000))
289 break;
290
291 rq = (DELAYED_RAW_DATA_UPDATE *)g_dciRawDataWriterQueue->get();
292 if ((rq == NULL) || (rq == INVALID_POINTER_VALUE))
293 break;
294 }
295 DBFreeStatement(hStmt);
296 }
297 DBCommit(hdb);
298 }
299 else
300 {
301 free(rq);
302 }
303 DBConnectionPoolReleaseConnection(hdb);
304 if (rq == INVALID_POINTER_VALUE) // End-of-job indicator
305 break;
306 }
307
308 return THREAD_OK;
309 }
310
311 /**
312 * Start writer thread
313 */
314 void StartDBWriter()
315 {
316 s_writerThread = ThreadCreateEx(DBWriteThread, 0, NULL);
317 s_iDataWriterThread = ThreadCreateEx(IDataWriteThread, 0, NULL);
318 s_rawDataWriterThread = ThreadCreateEx(RawDataWriteThread, 0, NULL);
319 }
320
321 /**
322 * Stop writer thread and wait while all queries will be executed
323 */
324 void StopDBWriter()
325 {
326 g_dbWriterQueue->put(INVALID_POINTER_VALUE);
327 g_dciDataWriterQueue->put(INVALID_POINTER_VALUE);
328 g_dciRawDataWriterQueue->put(INVALID_POINTER_VALUE);
329 ThreadJoin(s_writerThread);
330 ThreadJoin(s_iDataWriterThread);
331 ThreadJoin(s_rawDataWriterThread);
332 }