Raw DCI values saved in history along with corresponding transformed values
[public/netxms.git] / src / server / core / dbwrite.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2017 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: dbwrite.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 /**
26 * Delayed SQL request
27 */
28 struct DELAYED_SQL_REQUEST
29 {
30 TCHAR *query;
31 int bindCount;
32 BYTE *sqlTypes;
33 TCHAR *bindings[1]; /* actual size determined by bindCount field */
34 };
35
36 /**
37 * Delayed request for idata_ INSERT
38 */
39 struct DELAYED_IDATA_INSERT
40 {
41 time_t timestamp;
42 UINT32 nodeId;
43 UINT32 dciId;
44 TCHAR rawValue[MAX_RESULT_LENGTH];
45 TCHAR transformedValue[MAX_RESULT_LENGTH];
46 };
47
48 /**
49 * Delayed request for raw_dci_values UPDATE
50 */
51 struct DELAYED_RAW_DATA_UPDATE
52 {
53 time_t timestamp;
54 UINT32 dciId;
55 TCHAR rawValue[MAX_RESULT_LENGTH];
56 TCHAR transformedValue[MAX_RESULT_LENGTH];
57 };
58
59 /**
60 * Generic DB writer queue
61 */
62 Queue *g_dbWriterQueue = NULL;
63
64 /**
65 * DCI data (idata_* tables) writer queue
66 */
67 Queue *g_dciDataWriterQueue = NULL;
68
69 /**
70 * Raw DCI data writer queue
71 */
72 Queue *g_dciRawDataWriterQueue = NULL;
73
74 /**
75 * Performance counters
76 */
77 UINT64 g_idataWriteRequests = 0;
78 UINT64 g_rawDataWriteRequests = 0;
79 UINT64 g_otherWriteRequests = 0;
80
81 /**
82 * Static data
83 */
84 static THREAD s_writerThread = INVALID_THREAD_HANDLE;
85 static THREAD s_iDataWriterThread = INVALID_THREAD_HANDLE;
86 static THREAD s_rawDataWriterThread = INVALID_THREAD_HANDLE;
87
88 /**
89 * Put SQL request into queue for later execution
90 */
91 void NXCORE_EXPORTABLE QueueSQLRequest(const TCHAR *query)
92 {
93 DELAYED_SQL_REQUEST *rq = (DELAYED_SQL_REQUEST *)malloc(sizeof(DELAYED_SQL_REQUEST) + (_tcslen(query) + 1) * sizeof(TCHAR));
94 rq->query = (TCHAR *)&rq->bindings[0];
95 _tcscpy(rq->query, query);
96 rq->bindCount = 0;
97 g_dbWriterQueue->put(rq);
98 DbgPrintf(8, _T("SQL request queued: %s"), query);
99 g_otherWriteRequests++;
100 }
101
102 /**
103 * Put parameterized SQL request into queue for later execution
104 */
105 void NXCORE_EXPORTABLE QueueSQLRequest(const TCHAR *query, int bindCount, int *sqlTypes, const TCHAR **values)
106 {
107 int size = sizeof(DELAYED_SQL_REQUEST) + ((int)_tcslen(query) + 1) * sizeof(TCHAR) + bindCount * sizeof(TCHAR *) + bindCount;
108 for(int i = 0; i < bindCount; i++)
109 {
110 if (values[i] != NULL)
111 size += ((int)_tcslen(values[i]) + 1) * sizeof(TCHAR) + sizeof(TCHAR *);
112 }
113 DELAYED_SQL_REQUEST *rq = (DELAYED_SQL_REQUEST *)malloc(size);
114
115 BYTE *base = (BYTE *)&rq->bindings[bindCount];
116 int pos = 0;
117 int align = sizeof(TCHAR *);
118
119 rq->query = (TCHAR *)base;
120 _tcscpy(rq->query, query);
121 rq->bindCount = bindCount;
122 pos += ((int)_tcslen(query) + 1) * sizeof(TCHAR);
123
124 rq->sqlTypes = &base[pos];
125 pos += bindCount;
126 if (pos % align != 0)
127 pos += align - pos % align;
128
129 for(int i = 0; i < bindCount; i++)
130 {
131 rq->sqlTypes[i] = (BYTE)sqlTypes[i];
132 if (values[i] != NULL)
133 {
134 rq->bindings[i] = (TCHAR *)&base[pos];
135 _tcscpy(rq->bindings[i], values[i]);
136 pos += ((int)_tcslen(values[i]) + 1) * sizeof(TCHAR);
137 if (pos % align != 0)
138 pos += align - pos % align;
139 }
140 else
141 {
142 rq->bindings[i] = NULL;
143 }
144 }
145
146 g_dbWriterQueue->put(rq);
147 DbgPrintf(8, _T("SQL request queued: %s"), query);
148 g_otherWriteRequests++;
149 }
150
151 /**
152 * Queue INSERT request for idata_xxx table
153 */
154 void QueueIDataInsert(time_t timestamp, UINT32 nodeId, UINT32 dciId, const TCHAR *rawValue, const TCHAR *transformedValue)
155 {
156 DELAYED_IDATA_INSERT *rq = (DELAYED_IDATA_INSERT *)malloc(sizeof(DELAYED_IDATA_INSERT));
157 rq->timestamp = timestamp;
158 rq->nodeId = nodeId;
159 rq->dciId = dciId;
160 _tcslcpy(rq->rawValue, rawValue, MAX_RESULT_LENGTH);
161 _tcslcpy(rq->transformedValue, transformedValue, MAX_RESULT_LENGTH);
162 g_dciDataWriterQueue->put(rq);
163 g_idataWriteRequests++;
164 }
165
166 /**
167 * Queue UPDATE request for raw_dci_values table
168 */
169 void QueueRawDciDataUpdate(time_t timestamp, UINT32 dciId, const TCHAR *rawValue, const TCHAR *transformedValue)
170 {
171 DELAYED_RAW_DATA_UPDATE *rq = (DELAYED_RAW_DATA_UPDATE *)malloc(sizeof(DELAYED_RAW_DATA_UPDATE));
172 rq->timestamp = timestamp;
173 rq->dciId = dciId;
174 _tcslcpy(rq->rawValue, rawValue, MAX_RESULT_LENGTH);
175 _tcslcpy(rq->transformedValue, transformedValue, MAX_RESULT_LENGTH);
176 g_dciRawDataWriterQueue->put(rq);
177 g_rawDataWriteRequests++;
178 }
179
180 /**
181 * Database "lazy" write thread
182 */
183 static THREAD_RESULT THREAD_CALL DBWriteThread(void *arg)
184 {
185 ThreadSetName("DBWriter");
186 while(true)
187 {
188 DELAYED_SQL_REQUEST *rq = (DELAYED_SQL_REQUEST *)g_dbWriterQueue->getOrBlock();
189 if (rq == INVALID_POINTER_VALUE) // End-of-job indicator
190 break;
191
192 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
193
194 if (rq->bindCount == 0)
195 {
196 DBQuery(hdb, rq->query);
197 }
198 else
199 {
200 DB_STATEMENT hStmt = DBPrepare(hdb, rq->query);
201 if (hStmt != NULL)
202 {
203 for(int i = 0; i < rq->bindCount; i++)
204 {
205 DBBind(hStmt, i + 1, (int)rq->sqlTypes[i], rq->bindings[i], DB_BIND_STATIC);
206 }
207 DBExecute(hStmt);
208 DBFreeStatement(hStmt);
209 }
210 }
211 free(rq);
212
213 DBConnectionPoolReleaseConnection(hdb);
214 }
215
216 return THREAD_OK;
217 }
218
219 /**
220 * Database "lazy" write thread for idata_xxx INSERTs
221 */
222 static THREAD_RESULT THREAD_CALL IDataWriteThread(void *arg)
223 {
224 ThreadSetName("DBWriter/IData");
225 int maxRecords = ConfigReadInt(_T("DBWriter.MaxRecordsPerTransaction"), 1000);
226 while(true)
227 {
228 DELAYED_IDATA_INSERT *rq = (DELAYED_IDATA_INSERT *)g_dciDataWriterQueue->getOrBlock();
229 if (rq == INVALID_POINTER_VALUE) // End-of-job indicator
230 break;
231
232 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
233 if (DBBegin(hdb))
234 {
235 int count = 0;
236 while(1)
237 {
238 bool success;
239
240 // For Oracle preparing statement even for one time execution is preferred
241 // For other databases it will actually slow down inserts
242 if (g_dbSyntax == DB_SYNTAX_ORACLE)
243 {
244 TCHAR query[256];
245 _sntprintf(query, 256, _T("INSERT INTO idata_%d (item_id,idata_timestamp,idata_value,raw_value) VALUES (?,?,?,?)"), (int)rq->nodeId);
246 DB_STATEMENT hStmt = DBPrepare(hdb, query);
247 if (hStmt != NULL)
248 {
249 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, rq->dciId);
250 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (INT64)rq->timestamp);
251 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, rq->transformedValue, DB_BIND_STATIC);
252 DBBind(hStmt, 4, DB_SQLTYPE_VARCHAR, rq->rawValue, DB_BIND_STATIC);
253 success = DBExecute(hStmt);
254 DBFreeStatement(hStmt);
255 }
256 else
257 {
258 success = false;
259 }
260 }
261 else
262 {
263 TCHAR query[1024];
264 _sntprintf(query, 1024, _T("INSERT INTO idata_%d (item_id,idata_timestamp,idata_value,raw_value) VALUES (%d,%d,%s,%s)"),
265 (int)rq->nodeId, (int)rq->dciId, (int)rq->timestamp,
266 (const TCHAR *)DBPrepareString(hdb, rq->transformedValue),
267 (const TCHAR *)DBPrepareString(hdb, rq->rawValue));
268 success = DBQuery(hdb, query);
269 }
270
271 free(rq);
272
273 count++;
274 if (!success || (count > maxRecords))
275 break;
276
277 rq = (DELAYED_IDATA_INSERT *)g_dciDataWriterQueue->get();
278 if ((rq == NULL) || (rq == INVALID_POINTER_VALUE))
279 break;
280 }
281 DBCommit(hdb);
282 }
283 else
284 {
285 free(rq);
286 }
287 DBConnectionPoolReleaseConnection(hdb);
288 if (rq == INVALID_POINTER_VALUE) // End-of-job indicator
289 break;
290 }
291
292 return THREAD_OK;
293 }
294
295 /**
296 * Database "lazy" write thread for raw_dci_values UPDATEs
297 */
298 static THREAD_RESULT THREAD_CALL RawDataWriteThread(void *arg)
299 {
300 ThreadSetName("DBWriter/RData");
301 int maxRecords = ConfigReadInt(_T("DBWriter.MaxRecordsPerTransaction"), 1000);
302 while(true)
303 {
304 DELAYED_RAW_DATA_UPDATE *rq = (DELAYED_RAW_DATA_UPDATE *)g_dciRawDataWriterQueue->getOrBlock();
305 if (rq == INVALID_POINTER_VALUE) // End-of-job indicator
306 break;
307
308 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
309 if (DBBegin(hdb))
310 {
311 DB_STATEMENT hStmt = DBPrepare(hdb, _T("UPDATE raw_dci_values SET raw_value=?,transformed_value=?,last_poll_time=? WHERE item_id=?"));
312 if (hStmt != NULL)
313 {
314 int count = 0;
315 while(true)
316 {
317 DBBind(hStmt, 1, DB_SQLTYPE_VARCHAR, rq->rawValue, DB_BIND_STATIC);
318 DBBind(hStmt, 2, DB_SQLTYPE_VARCHAR, rq->transformedValue, DB_BIND_STATIC);
319 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (INT64)rq->timestamp);
320 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, rq->dciId);
321 bool success = DBExecute(hStmt);
322
323 free(rq);
324
325 count++;
326 if (!success || (maxRecords > 1000))
327 break;
328
329 rq = (DELAYED_RAW_DATA_UPDATE *)g_dciRawDataWriterQueue->get();
330 if ((rq == NULL) || (rq == INVALID_POINTER_VALUE))
331 break;
332 }
333 DBFreeStatement(hStmt);
334 }
335 DBCommit(hdb);
336 }
337 else
338 {
339 free(rq);
340 }
341 DBConnectionPoolReleaseConnection(hdb);
342 if (rq == INVALID_POINTER_VALUE) // End-of-job indicator
343 break;
344 }
345
346 return THREAD_OK;
347 }
348
349 /**
350 * Start writer thread
351 */
352 void StartDBWriter()
353 {
354 s_writerThread = ThreadCreateEx(DBWriteThread, 0, NULL);
355 s_iDataWriterThread = ThreadCreateEx(IDataWriteThread, 0, NULL);
356 s_rawDataWriterThread = ThreadCreateEx(RawDataWriteThread, 0, NULL);
357 }
358
359 /**
360 * Stop writer thread and wait while all queries will be executed
361 */
362 void StopDBWriter()
363 {
364 g_dbWriterQueue->put(INVALID_POINTER_VALUE);
365 g_dciDataWriterQueue->put(INVALID_POINTER_VALUE);
366 g_dciRawDataWriterQueue->put(INVALID_POINTER_VALUE);
367 ThreadJoin(s_writerThread);
368 ThreadJoin(s_iDataWriterThread);
369 ThreadJoin(s_rawDataWriterThread);
370 }