String class refactored; background log writer option implemented; fixed incorrect...
[public/netxms.git] / src / server / core / dctable.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2014 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: dctable.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 /**
26 * Column ID cache
27 */
28 TC_ID_MAP_ENTRY *DCTable::m_cache = NULL;
29 int DCTable::m_cacheSize = 0;
30 int DCTable::m_cacheAllocated = 0;
31 MUTEX DCTable::m_cacheMutex = MutexCreate();
32
33 /**
34 * Compare cache element's name to string key
35 */
36 static int CompareCacheElements(const void *key, const void *element)
37 {
38 return _tcsicmp((const TCHAR *)key, ((TC_ID_MAP_ENTRY *)element)->name);
39 }
40
41 /**
42 * Compare names of two cache elements
43 */
44 static int CompareCacheElements2(const void *e1, const void *e2)
45 {
46 return _tcsicmp(((TC_ID_MAP_ENTRY *)e1)->name, ((TC_ID_MAP_ENTRY *)e2)->name);
47 }
48
49 /**
50 * Get column ID from column name
51 */
52 INT32 DCTable::columnIdFromName(const TCHAR *name)
53 {
54 TC_ID_MAP_ENTRY buffer;
55
56 // check that column name is valid
57 if ((name == NULL) || (*name == 0))
58 return 0;
59
60 MutexLock(m_cacheMutex);
61
62 TC_ID_MAP_ENTRY *entry = (TC_ID_MAP_ENTRY *)bsearch(name, m_cache, m_cacheSize, sizeof(TC_ID_MAP_ENTRY), CompareCacheElements);
63 if (entry == NULL)
64 {
65 // Not in cache, go to database
66 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
67
68 DB_STATEMENT hStmt = DBPrepare(hdb, _T("SELECT column_id FROM dct_column_names WHERE column_name=?"));
69 if (hStmt != NULL)
70 {
71 DBBind(hStmt, 1, DB_SQLTYPE_VARCHAR, name, DB_BIND_STATIC);
72 DB_RESULT hResult = DBSelectPrepared(hStmt);
73 if (hResult != NULL)
74 {
75 entry = &buffer;
76 nx_strncpy(entry->name, name, MAX_COLUMN_NAME);
77 if (DBGetNumRows(hResult) > 0)
78 {
79 // found in database
80 entry->id = DBGetFieldLong(hResult, 0, 0);
81 }
82 else
83 {
84 // no such column name in database
85 entry->id = CreateUniqueId(IDG_DCT_COLUMN);
86
87 // update database
88 DB_STATEMENT hStmt2 = DBPrepare(hdb, _T("INSERT INTO dct_column_names (column_id,column_name) VALUES (?,?)"));
89 if (hStmt2 != NULL)
90 {
91 DBBind(hStmt2, 1, DB_SQLTYPE_INTEGER, entry->id);
92 DBBind(hStmt2, 2, DB_SQLTYPE_VARCHAR, name, DB_BIND_STATIC);
93 DBExecute(hStmt2);
94 DBFreeStatement(hStmt2);
95 }
96 }
97
98 DBFreeResult(hResult);
99
100 // Add to cache
101 if (m_cacheSize == m_cacheAllocated)
102 {
103 m_cacheAllocated += 16;
104 m_cache = (TC_ID_MAP_ENTRY *)realloc(m_cache, sizeof(TC_ID_MAP_ENTRY) * m_cacheAllocated);
105 }
106 memcpy(&m_cache[m_cacheSize++], entry, sizeof(TC_ID_MAP_ENTRY));
107 qsort(m_cache, m_cacheSize, sizeof(TC_ID_MAP_ENTRY), CompareCacheElements2);
108
109 DbgPrintf(6, _T("DCTable::columnIdFromName(): column name %s added to cache, ID=%d"), name, (int)entry->id);
110 }
111 DBFreeStatement(hStmt);
112 }
113
114 DBConnectionPoolReleaseConnection(hdb);
115 }
116
117 MutexUnlock(m_cacheMutex);
118 return (entry != NULL) ? entry->id : 0;
119 }
120
121 /**
122 * Default constructor
123 */
124 DCTable::DCTable() : DCObject()
125 {
126 m_columns = new ObjectArray<DCTableColumn>(8, 8, true);
127 m_thresholds = new ObjectArray<DCTableThreshold>(0, 4, true);
128 m_lastValue = NULL;
129 }
130
131 /**
132 * Copy constructor
133 */
134 DCTable::DCTable(const DCTable *src) : DCObject(src)
135 {
136 m_columns = new ObjectArray<DCTableColumn>(src->m_columns->size(), 8, true);
137 for(int i = 0; i < src->m_columns->size(); i++)
138 m_columns->add(new DCTableColumn(src->m_columns->get(i)));
139 m_thresholds = new ObjectArray<DCTableThreshold>(src->m_thresholds->size(), 4, true);
140 for(int i = 0; i < src->m_thresholds->size(); i++)
141 m_thresholds->add(new DCTableThreshold(src->m_thresholds->get(i)));
142 m_lastValue = NULL;
143 }
144
145 /**
146 * Constructor for creating new DCTable from scratch
147 */
148 DCTable::DCTable(UINT32 id, const TCHAR *name, int source, int pollingInterval, int retentionTime,
149 Template *node, const TCHAR *description, const TCHAR *systemTag)
150 : DCObject(id, name, source, pollingInterval, retentionTime, node, description, systemTag)
151 {
152 m_columns = new ObjectArray<DCTableColumn>(8, 8, true);
153 m_thresholds = new ObjectArray<DCTableThreshold>(0, 4, true);
154 m_lastValue = NULL;
155 }
156
157 /**
158 * Constructor for creating DCTable from database
159 * Assumes that fields in SELECT query are in following order:
160 * item_id,template_id,template_item_id,name,
161 * description,flags,source,snmp_port,polling_interval,retention_time,
162 * status,system_tag,resource_id,proxy_node,perftab_settings,
163 * transformation_script,comments
164 */
165 DCTable::DCTable(DB_RESULT hResult, int iRow, Template *pNode) : DCObject()
166 {
167 m_id = DBGetFieldULong(hResult, iRow, 0);
168 m_dwTemplateId = DBGetFieldULong(hResult, iRow, 1);
169 m_dwTemplateItemId = DBGetFieldULong(hResult, iRow, 2);
170 DBGetField(hResult, iRow, 3, m_name, MAX_ITEM_NAME);
171 DBGetField(hResult, iRow, 4, m_szDescription, MAX_DB_STRING);
172 m_flags = (WORD)DBGetFieldLong(hResult, iRow, 5);
173 m_source = (BYTE)DBGetFieldLong(hResult, iRow, 6);
174 m_snmpPort = (WORD)DBGetFieldLong(hResult, iRow, 7);
175 m_iPollingInterval = DBGetFieldLong(hResult, iRow, 8);
176 m_iRetentionTime = DBGetFieldLong(hResult, iRow, 9);
177 m_status = (BYTE)DBGetFieldLong(hResult, iRow, 10);
178 DBGetField(hResult, iRow, 11, m_systemTag, MAX_DB_STRING);
179 m_dwResourceId = DBGetFieldULong(hResult, iRow, 12);
180 m_dwProxyNode = DBGetFieldULong(hResult, iRow, 13);
181 m_pszPerfTabSettings = DBGetField(hResult, iRow, 14, NULL, 0);
182 TCHAR *pszTmp = DBGetField(hResult, iRow, 15, NULL, 0);
183 m_comments = DBGetField(hResult, iRow, 16, NULL, 0);
184 setTransformationScript(pszTmp);
185 free(pszTmp);
186
187 m_pNode = pNode;
188 m_columns = new ObjectArray<DCTableColumn>(8, 8, true);
189 m_lastValue = NULL;
190
191 DB_STATEMENT hStmt = DBPrepare(g_hCoreDB, _T("SELECT column_name,flags,snmp_oid,display_name FROM dc_table_columns WHERE table_id=? ORDER BY sequence_number"));
192 if (hStmt != NULL)
193 {
194 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
195 DB_RESULT hColumnList = DBSelectPrepared(hStmt);
196 if (hColumnList != NULL)
197 {
198 int count = DBGetNumRows(hColumnList);
199 for(int i = 0; i < count; i++)
200 m_columns->add(new DCTableColumn(hColumnList, i));
201 DBFreeResult(hColumnList);
202 }
203 DBFreeStatement(hStmt);
204 }
205
206 loadCustomSchedules();
207
208 m_thresholds = new ObjectArray<DCTableThreshold>(0, 4, true);
209 loadThresholds();
210 }
211
212 /**
213 * Create DCTable from import file
214 */
215 DCTable::DCTable(ConfigEntry *config, Template *owner) : DCObject(config, owner)
216 {
217 ConfigEntry *columnsRoot = config->findEntry(_T("columns"));
218 if (columnsRoot != NULL)
219 {
220 ObjectArray<ConfigEntry> *columns = columnsRoot->getSubEntries(_T("column#*"));
221 m_columns = new ObjectArray<DCTableColumn>(columns->size(), 8, true);
222 for(int i = 0; i < columns->size(); i++)
223 {
224 m_columns->add(new DCTableColumn(columns->get(i)));
225 }
226 delete columns;
227 }
228 else
229 {
230 m_columns = new ObjectArray<DCTableColumn>(8, 8, true);
231 }
232
233 ConfigEntry *thresholdsRoot = config->findEntry(_T("thresholds"));
234 if (thresholdsRoot != NULL)
235 {
236 ObjectArray<ConfigEntry> *thresholds = thresholdsRoot->getSubEntries(_T("threshold#*"));
237 m_thresholds = new ObjectArray<DCTableThreshold>(thresholds->size(), 8, true);
238 for(int i = 0; i < thresholds->size(); i++)
239 {
240 m_thresholds->add(new DCTableThreshold(thresholds->get(i)));
241 }
242 delete thresholds;
243 }
244 else
245 {
246 m_thresholds = new ObjectArray<DCTableThreshold>(0, 4, true);
247 }
248
249 m_lastValue = NULL;
250 }
251
252 /**
253 * Destructor
254 */
255 DCTable::~DCTable()
256 {
257 delete m_columns;
258 delete m_thresholds;
259 if (m_lastValue != NULL)
260 m_lastValue->decRefCount();
261 }
262
263 /**
264 * Clean expired data
265 */
266 void DCTable::deleteExpiredData()
267 {
268 TCHAR query[256];
269 time_t now;
270
271 now = time(NULL);
272
273 lock();
274 _sntprintf(query, 256, _T("DELETE FROM tdata_%d WHERE (item_id=%d) AND (tdata_timestamp<%ld)"),
275 (int)m_pNode->getId(), (int)m_id, (long)(now - (time_t)m_iRetentionTime * 86400));
276 unlock();
277
278 QueueSQLRequest(query);
279 }
280
281 /**
282 * Delete all collected data
283 */
284 bool DCTable::deleteAllData()
285 {
286 TCHAR szQuery[256];
287 bool success;
288
289 lock();
290 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
291 _sntprintf(szQuery, 256, _T("DELETE FROM tdata_%d WHERE item_id=%d"), m_pNode->getId(), (int)m_id);
292 success = DBQuery(hdb, szQuery) ? true : false;
293 DBConnectionPoolReleaseConnection(hdb);
294 unlock();
295 return success;
296 }
297
298 /**
299 * Process new collected value. Should return true on success.
300 * If returns false, current poll result will be converted into data collection error.
301 *
302 * @return true on success
303 */
304 bool DCTable::processNewValue(time_t nTimeStamp,const void *value, bool *updateStatus)
305 {
306 *updateStatus = false;
307 lock();
308
309 // Normally m_pNode shouldn't be NULL for polled items, but who knows...
310 if (m_pNode == NULL)
311 {
312 unlock();
313 return false;
314 }
315
316 // Transform input value
317 // Cluster can have only aggregated data, and transformation
318 // should not be used on aggregation
319 if ((m_pNode->getObjectClass() != OBJECT_CLUSTER) || (m_flags & DCF_TRANSFORM_AGGREGATED))
320 {
321 if (!transform((Table *)value))
322 {
323 unlock();
324 return false;
325 }
326 }
327
328 m_dwErrorCount = 0;
329 if (m_lastValue != NULL)
330 m_lastValue->decRefCount();
331 m_lastValue = (Table *)value;
332 m_lastValue->setTitle(m_szDescription);
333 m_lastValue->setSource(m_source);
334
335 // Copy required fields into local variables
336 UINT32 tableId = m_id;
337 UINT32 nodeId = m_pNode->getId();
338 bool save = (m_flags & DCF_NO_STORAGE) == 0;
339
340 unlock();
341
342 // Save data to database
343 // Object is unlocked, so only local variables can be used
344 if (save)
345 {
346 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
347 if (!DBBegin(hdb))
348 {
349 DBConnectionPoolReleaseConnection(hdb);
350 return true;
351 }
352
353 INT64 recordId = ((INT64)time(NULL) << 30) | (((INT64)tableId & 0xFFFF) << 14);
354 BOOL success = FALSE;
355 Table *data = (Table *)value;
356
357 TCHAR query[256];
358 _sntprintf(query, 256, _T("INSERT INTO tdata_%d (item_id,tdata_timestamp,record_id) VALUES (?,?,?)"), (int)nodeId);
359 DB_STATEMENT hStmt = DBPrepare(hdb, query);
360 if (hStmt != NULL)
361 {
362 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, tableId);
363 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (INT32)nTimeStamp);
364 DBBind(hStmt, 3, DB_SQLTYPE_BIGINT, recordId);
365 success = DBExecute(hStmt);
366 DBFreeStatement(hStmt);
367 }
368
369 if (success)
370 {
371 _sntprintf(query, 256, _T("INSERT INTO tdata_records_%d (record_id,row_id,instance) VALUES (?,?,?)"), (int)nodeId);
372 DB_STATEMENT hStmt = DBPrepare(hdb, query);
373 if (hStmt != NULL)
374 {
375 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, recordId);
376 for(int row = 0; row < data->getNumRows(); row++)
377 {
378 TCHAR instance[MAX_RESULT_LENGTH];
379 data->buildInstanceString(row, instance, MAX_RESULT_LENGTH);
380 DBBind(hStmt, 2, DB_SQLTYPE_BIGINT, recordId | (INT64)row);
381 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, instance, DB_BIND_STATIC);
382 success = DBExecute(hStmt);
383 if (!success)
384 break;
385 }
386 DBFreeStatement(hStmt);
387 }
388 }
389
390 if (success)
391 {
392 _sntprintf(query, 256, _T("INSERT INTO tdata_rows_%d (row_id,column_id,value) VALUES (?,?,?)"), (int)nodeId);
393 DB_STATEMENT hStmt = DBPrepare(hdb, query);
394 if (hStmt != NULL)
395 {
396 for(int col = 0; col < data->getNumColumns(); col++)
397 {
398 INT32 colId = columnIdFromName(data->getColumnName(col));
399 if (colId == 0)
400 continue; // cannot get column ID
401
402 for(int row = 0; row < data->getNumRows(); row++)
403 {
404 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, recordId | (INT64)row);
405 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, colId);
406 const TCHAR *s = data->getAsString(row, col);
407 if ((s == NULL) || (_tcslen(s) < MAX_DB_STRING))
408 {
409 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, s, DB_BIND_STATIC);
410 }
411 else
412 {
413 TCHAR *sp = (TCHAR *)nx_memdup(s, MAX_DB_STRING * sizeof(TCHAR));
414 sp[MAX_DB_STRING - 1] = 0;
415 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, sp, DB_BIND_DYNAMIC);
416 }
417 success = DBExecute(hStmt);
418 if (!success)
419 break;
420 }
421 }
422 DBFreeStatement(hStmt);
423 }
424 }
425
426 if (success)
427 DBCommit(hdb);
428 else
429 DBRollback(hdb);
430
431 DBConnectionPoolReleaseConnection(hdb);
432 }
433 checkThresholds((Table *)value);
434
435 if (g_flags & AF_PERFDATA_STORAGE_DRIVER_LOADED)
436 PerfDataStorageRequest(this, nTimeStamp, (Table *)value);
437
438 return true;
439 }
440
441 /**
442 * Transform received value
443 */
444 bool DCTable::transform(Table *value)
445 {
446 if (m_transformationScript == NULL)
447 return true;
448
449 NXSL_Value *nxslValue = new NXSL_Value(new NXSL_Object(&g_nxslStaticTableClass, value));
450 m_transformationScript->setGlobalVariable(_T("$object"), new NXSL_Value(new NXSL_Object(&g_nxslNetObjClass, m_pNode)));
451 if (m_pNode->getObjectClass() == OBJECT_NODE)
452 {
453 m_transformationScript->setGlobalVariable(_T("$node"), new NXSL_Value(new NXSL_Object(&g_nxslNodeClass, m_pNode)));
454 }
455 m_transformationScript->setGlobalVariable(_T("$dci"), new NXSL_Value(new NXSL_Object(&g_nxslDciClass, this)));
456 m_transformationScript->setGlobalVariable(_T("$isCluster"), new NXSL_Value((m_pNode->getObjectClass() == OBJECT_CLUSTER) ? 1 : 0));
457
458 bool success = true;
459 if (!m_transformationScript->run(1, &nxslValue))
460 {
461 if (m_transformationScript->getErrorCode() == NXSL_ERR_EXECUTION_ABORTED)
462 {
463 DbgPrintf(6, _T("Transformation script for DCI \"%s\" [%d] on node %s [%d] aborted"),
464 m_szDescription, m_id, (m_pNode != NULL) ? m_pNode->getName() : _T("(null)"), (m_pNode != NULL) ? m_pNode->getId() : 0);
465 }
466 else
467 {
468 TCHAR szBuffer[1024];
469
470 _sntprintf(szBuffer, 1024, _T("DCI::%s::%d::TransformationScript"),
471 (m_pNode != NULL) ? m_pNode->getName() : _T("(null)"), m_id);
472 PostEvent(EVENT_SCRIPT_ERROR, g_dwMgmtNode, "ssd", szBuffer, m_transformationScript->getErrorText(), m_id);
473 }
474 success = false;
475 }
476 return success;
477 }
478
479 /**
480 * Check thresholds
481 */
482 void DCTable::checkThresholds(Table *value)
483 {
484 static const TCHAR *paramNames[] = { _T("dciName"), _T("dciDescription"), _T("dciId"), _T("row"), _T("instance") };
485
486 lock();
487 for(int row = 0; row < value->getNumRows(); row++)
488 {
489 TCHAR instance[MAX_RESULT_LENGTH];
490 value->buildInstanceString(row, instance, MAX_RESULT_LENGTH);
491 for(int i = 0; i < m_thresholds->size(); i++)
492 {
493 DCTableThreshold *t = m_thresholds->get(i);
494 ThresholdCheckResult result = t->check(value, row, instance);
495 switch(result)
496 {
497 case ACTIVATED:
498 PostEventWithNames(t->getActivationEvent(), m_pNode->getId(), "ssids", paramNames, m_name, m_szDescription, m_id, row, instance);
499 if (!(m_flags & DCF_ALL_THRESHOLDS))
500 i = m_thresholds->size(); // Stop processing (for current row)
501 break;
502 case DEACTIVATED:
503 PostEventWithNames(t->getDeactivationEvent(), m_pNode->getId(), "ssids", paramNames, m_name, m_szDescription, m_id, row, instance);
504 break;
505 case ALREADY_ACTIVE:
506 i = m_thresholds->size(); // Threshold condition still true, stop processing
507 break;
508 default:
509 break;
510 }
511 }
512 }
513 unlock();
514 }
515
516 /**
517 * Process new data collection error
518 */
519 void DCTable::processNewError()
520 {
521 m_dwErrorCount++;
522 }
523
524 /**
525 * Save to database
526 */
527 BOOL DCTable::saveToDB(DB_HANDLE hdb)
528 {
529 DB_STATEMENT hStmt;
530 if (IsDatabaseRecordExist(hdb, _T("dc_tables"), _T("item_id"), m_id))
531 {
532 hStmt = DBPrepare(hdb, _T("UPDATE dc_tables SET node_id=?,template_id=?,template_item_id=?,name=?,")
533 _T("description=?,flags=?,source=?,snmp_port=?,polling_interval=?,")
534 _T("retention_time=?,status=?,system_tag=?,resource_id=?,proxy_node=?,")
535 _T("perftab_settings=?,transformation_script=?,comments=? WHERE item_id=?"));
536 }
537 else
538 {
539 hStmt = DBPrepare(hdb, _T("INSERT INTO dc_tables (node_id,template_id,template_item_id,name,")
540 _T("description,flags,source,snmp_port,polling_interval,")
541 _T("retention_time,status,system_tag,resource_id,proxy_node,perftab_settings,")
542 _T("transformation_script,comments,item_id) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));
543 }
544 if (hStmt == NULL)
545 return FALSE;
546
547 lock();
548
549 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (m_pNode == NULL) ? (UINT32)0 : m_pNode->getId());
550 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, m_dwTemplateId);
551 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, m_dwTemplateItemId);
552 DBBind(hStmt, 4, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
553 DBBind(hStmt, 5, DB_SQLTYPE_VARCHAR, m_szDescription, DB_BIND_STATIC);
554 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (UINT32)m_flags);
555 DBBind(hStmt, 7, DB_SQLTYPE_INTEGER, (INT32)m_source);
556 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, (UINT32)m_snmpPort);
557 DBBind(hStmt, 9, DB_SQLTYPE_INTEGER, (INT32)m_iPollingInterval);
558 DBBind(hStmt, 10, DB_SQLTYPE_INTEGER, (INT32)m_iRetentionTime);
559 DBBind(hStmt, 11, DB_SQLTYPE_INTEGER, (INT32)m_status);
560 DBBind(hStmt, 12, DB_SQLTYPE_VARCHAR, m_systemTag, DB_BIND_STATIC);
561 DBBind(hStmt, 13, DB_SQLTYPE_INTEGER, m_dwResourceId);
562 DBBind(hStmt, 14, DB_SQLTYPE_INTEGER, m_dwProxyNode);
563 DBBind(hStmt, 15, DB_SQLTYPE_TEXT, m_pszPerfTabSettings, DB_BIND_STATIC);
564 DBBind(hStmt, 16, DB_SQLTYPE_TEXT, m_transformationScriptSource, DB_BIND_STATIC);
565 DBBind(hStmt, 17, DB_SQLTYPE_TEXT, m_comments, DB_BIND_STATIC);
566 DBBind(hStmt, 18, DB_SQLTYPE_INTEGER, m_id);
567
568 BOOL result = DBExecute(hStmt);
569 DBFreeStatement(hStmt);
570
571 if (result)
572 {
573 // Save column configuration
574 hStmt = DBPrepare(hdb, _T("DELETE FROM dc_table_columns WHERE table_id=?"));
575 if (hStmt != NULL)
576 {
577 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
578 result = DBExecute(hStmt);
579 DBFreeStatement(hStmt);
580 }
581 else
582 {
583 result = FALSE;
584 }
585
586 if (result && (m_columns->size() > 0))
587 {
588 hStmt = DBPrepare(hdb, _T("INSERT INTO dc_table_columns (table_id,sequence_number,column_name,snmp_oid,flags,display_name) VALUES (?,?,?,?,?,?)"));
589 if (hStmt != NULL)
590 {
591 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
592 for(int i = 0; i < m_columns->size(); i++)
593 {
594 DCTableColumn *column = m_columns->get(i);
595 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (INT32)(i + 1));
596 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, column->getName(), DB_BIND_STATIC);
597 SNMP_ObjectId *oid = column->getSnmpOid();
598 DBBind(hStmt, 4, DB_SQLTYPE_VARCHAR, (oid != NULL) ? oid->getValueAsText() : NULL, DB_BIND_STATIC);
599 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (INT32)column->getFlags());
600 DBBind(hStmt, 6, DB_SQLTYPE_VARCHAR, column->getDisplayName(), DB_BIND_STATIC);
601
602 result = DBExecute(hStmt);
603 if (!result)
604 break;
605 }
606 DBFreeStatement(hStmt);
607 }
608 else
609 {
610 result = FALSE;
611 }
612 }
613 }
614
615 saveThresholds(hdb);
616
617 unlock();
618 return result ? DCObject::saveToDB(hdb) : FALSE;
619 }
620
621 /**
622 * Load thresholds from database
623 */
624 bool DCTable::loadThresholds()
625 {
626 DB_STATEMENT hStmt = DBPrepare(g_hCoreDB, _T("SELECT id,activation_event,deactivation_event FROM dct_thresholds WHERE table_id=? ORDER BY sequence_number"));
627 if (hStmt == NULL)
628 return false;
629
630 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
631 DB_RESULT hResult = DBSelectPrepared(hStmt);
632 if (hResult != NULL)
633 {
634 int count = DBGetNumRows(hResult);
635 for(int i = 0; i < count; i++)
636 {
637 DCTableThreshold *t = new DCTableThreshold(hResult, i);
638 m_thresholds->add(t);
639 }
640 DBFreeResult(hResult);
641 }
642 DBFreeStatement(hStmt);
643 return true;
644 }
645
646 /**
647 * Save thresholds to database
648 */
649 bool DCTable::saveThresholds(DB_HANDLE hdb)
650 {
651 DB_STATEMENT hStmt = DBPrepare(hdb, _T("DELETE FROM dct_threshold_conditions WHERE threshold_id=?"));
652 if (hStmt == NULL)
653 return false;
654
655 for(int i = 0; i < m_thresholds->size(); i++)
656 {
657 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_thresholds->get(i)->getId());
658 DBExecute(hStmt);
659 }
660 DBFreeStatement(hStmt);
661
662 hStmt = DBPrepare(hdb, _T("DELETE FROM dct_thresholds WHERE table_id=?"));
663 if (hStmt == NULL)
664 return false;
665 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
666 DBExecute(hStmt);
667 DBFreeStatement(hStmt);
668
669 for(int i = 0; i < m_thresholds->size(); i++)
670 m_thresholds->get(i)->saveToDatabase(hdb, m_id, i);
671 return true;
672 }
673
674 /**
675 * Delete table object and collected data from database
676 */
677 void DCTable::deleteFromDatabase()
678 {
679 TCHAR szQuery[256];
680
681 DCObject::deleteFromDatabase();
682
683 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM tdata_%d WHERE item_id=%d"), m_pNode->getId(), (int)m_id);
684 QueueSQLRequest(szQuery);
685
686 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM dc_tables WHERE item_id=%d"), (int)m_id);
687 QueueSQLRequest(szQuery);
688 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM dc_table_columns WHERE table_id=%d"), (int)m_id);
689 QueueSQLRequest(szQuery);
690
691 for(int i = 0; i < m_thresholds->size(); i++)
692 {
693 _sntprintf(szQuery, 256, _T("DELETE FROM dct_threshold_conditions WHERE threshold_id=%d"), (int)m_thresholds->get(i)->getId());
694 QueueSQLRequest(szQuery);
695 }
696
697 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM dct_thresholds WHERE table_id=%d"), (int)m_id);
698 QueueSQLRequest(szQuery);
699 }
700
701 /**
702 * Create NXCP message with item data
703 */
704 void DCTable::createMessage(NXCPMessage *pMsg)
705 {
706 DCObject::createMessage(pMsg);
707
708 lock();
709 pMsg->setField(VID_NUM_COLUMNS, (UINT32)m_columns->size());
710 UINT32 varId = VID_DCI_COLUMN_BASE;
711 for(int i = 0; i < m_columns->size(); i++)
712 {
713 DCTableColumn *column = m_columns->get(i);
714 pMsg->setField(varId++, column->getName());
715 pMsg->setField(varId++, column->getFlags());
716 SNMP_ObjectId *oid = column->getSnmpOid();
717 if (oid != NULL)
718 pMsg->setFieldFromInt32Array(varId++, (UINT32)oid->getLength(), oid->getValue());
719 else
720 varId++;
721 pMsg->setField(varId++, column->getDisplayName());
722 varId += 6;
723 }
724
725 pMsg->setField(VID_NUM_THRESHOLDS, (UINT32)m_thresholds->size());
726 varId = VID_DCI_THRESHOLD_BASE;
727 for(int i = 0; i < m_thresholds->size(); i++)
728 {
729 varId = m_thresholds->get(i)->fillMessage(pMsg, varId);
730 }
731
732 unlock();
733 }
734
735 /**
736 * Update data collection object from NXCP message
737 */
738 void DCTable::updateFromMessage(NXCPMessage *pMsg)
739 {
740 DCObject::updateFromMessage(pMsg);
741
742 lock();
743
744 m_columns->clear();
745 int count = (int)pMsg->getFieldAsUInt32(VID_NUM_COLUMNS);
746 UINT32 varId = VID_DCI_COLUMN_BASE;
747 for(int i = 0; i < count; i++)
748 {
749 m_columns->add(new DCTableColumn(pMsg, varId));
750 varId += 10;
751 }
752
753 count = (int)pMsg->getFieldAsUInt32(VID_NUM_THRESHOLDS);
754 ObjectArray<DCTableThreshold> *newThresholds = new ObjectArray<DCTableThreshold>(count, 8, true);
755 varId = VID_DCI_THRESHOLD_BASE;
756 for(int i = 0; i < count; i++)
757 {
758 DCTableThreshold *t = new DCTableThreshold(pMsg, &varId);
759 newThresholds->add(t);
760 for(int j = 0; j < m_thresholds->size(); j++)
761 {
762 DCTableThreshold *old = m_thresholds->get(j);
763 if (old->getId() == t->getId())
764 {
765 t->copyState(old);
766 break;
767 }
768 }
769 }
770 delete m_thresholds;
771 m_thresholds = newThresholds;
772
773 unlock();
774 }
775
776 /**
777 * Get last collected value
778 */
779 void DCTable::fillLastValueMessage(NXCPMessage *msg)
780 {
781 lock();
782 if (m_lastValue != NULL)
783 {
784 m_lastValue->fillMessage(*msg, 0, -1);
785 }
786 unlock();
787 }
788
789 /**
790 * Get summary of last collected value (to show along simple DCI values)
791 */
792 void DCTable::fillLastValueSummaryMessage(NXCPMessage *pMsg, UINT32 dwId)
793 {
794 lock();
795 pMsg->setField(dwId++, m_id);
796 pMsg->setField(dwId++, m_name);
797 pMsg->setField(dwId++, m_szDescription);
798 pMsg->setField(dwId++, (WORD)m_source);
799 pMsg->setField(dwId++, (WORD)DCI_DT_NULL); // compatibility: data type
800 pMsg->setField(dwId++, _T("")); // compatibility: value
801 pMsg->setField(dwId++, (UINT32)m_tLastPoll);
802 pMsg->setField(dwId++, (WORD)(matchClusterResource() ? m_status : ITEM_STATUS_DISABLED)); // show resource-bound DCIs as inactive if cluster resource is not on this node
803 pMsg->setField(dwId++, (WORD)getType());
804 pMsg->setField(dwId++, m_dwErrorCount);
805 pMsg->setField(dwId++, m_dwTemplateItemId);
806 pMsg->setField(dwId++, (WORD)0); // compatibility: number of thresholds
807
808 unlock();
809 }
810
811 /**
812 * Get data type of given column
813 */
814 int DCTable::getColumnDataType(const TCHAR *name)
815 {
816 int dt = DCI_DT_STRING;
817 bool found = false;
818
819 lock();
820
821 // look in column definition first
822 for(int i = 0; i < m_columns->size(); i++)
823 {
824 DCTableColumn *column = m_columns->get(i);
825 if (!_tcsicmp(column->getName(), name))
826 {
827 dt = column->getDataType();
828 break;
829 }
830 }
831
832 // use last values if not found in definitions
833 if (!found && (m_lastValue != NULL))
834 {
835 int index = m_lastValue->getColumnIndex(name);
836 if (index != -1)
837 dt = m_lastValue->getColumnDataType(index);
838 }
839
840 unlock();
841 return dt;
842 }
843
844 /**
845 * Get last collected value
846 */
847 Table *DCTable::getLastValue()
848 {
849 lock();
850 Table *value;
851 if (m_lastValue != NULL)
852 {
853 value = m_lastValue;
854 value->incRefCount();
855 }
856 else
857 {
858 value = NULL;
859 }
860 unlock();
861 return value;
862 }
863
864 /**
865 * Update destination value from source value
866 */
867 #define RECALCULATE_VALUE(dst, src, func, count) \
868 { \
869 switch(func) \
870 { \
871 case DCF_FUNCTION_MIN: \
872 if (src < dst) dst = src; \
873 break; \
874 case DCF_FUNCTION_MAX: \
875 if (src > dst) dst = src; \
876 break; \
877 case DCF_FUNCTION_SUM: \
878 dst += src; \
879 break; \
880 case DCF_FUNCTION_AVG: \
881 dst = (dst * count + src) / (count + 1); \
882 break; \
883 } \
884 }
885
886 /**
887 * Merge values
888 */
889 void DCTable::mergeValues(Table *dest, Table *src, int count)
890 {
891 for(int sRow = 0; sRow < src->getNumRows(); sRow++)
892 {
893 TCHAR instance[MAX_RESULT_LENGTH];
894
895 src->buildInstanceString(sRow, instance, MAX_RESULT_LENGTH);
896 int dRow = dest->findRowByInstance(instance);
897 if (dRow >= 0)
898 {
899 for(int j = 0; j < m_columns->size(); j++)
900 {
901 DCTableColumn *cd = m_columns->get(j);
902 if ((cd == NULL) || cd->isInstanceColumn() || (cd->getDataType() == DCI_DT_STRING))
903 continue;
904 int column = dest->getColumnIndex(cd->getName());
905 if (column == -1)
906 continue;
907
908 if (cd->getDataType() == DCI_DT_FLOAT)
909 {
910 double sval = src->getAsDouble(sRow, column);
911 double dval = dest->getAsDouble(dRow, column);
912
913 RECALCULATE_VALUE(dval, sval, cd->getAggregationFunction(), count);
914
915 dest->setAt(dRow, column, dval);
916 }
917 else if ((cd->getDataType() == DCI_DT_UINT) || (cd->getDataType() == DCI_DT_UINT64))
918 {
919 UINT64 sval = src->getAsUInt64(sRow, column);
920 UINT64 dval = dest->getAsUInt64(dRow, column);
921
922 RECALCULATE_VALUE(dval, sval, cd->getAggregationFunction(), count);
923
924 dest->setAt(dRow, column, dval);
925 }
926 else
927 {
928 INT64 sval = src->getAsInt64(sRow, column);
929 INT64 dval = dest->getAsInt64(dRow, column);
930
931 RECALCULATE_VALUE(dval, sval, cd->getAggregationFunction(), count);
932
933 dest->setAt(dRow, column, dval);
934 }
935 }
936 }
937 else
938 {
939 // no such instance
940 dest->copyRow(src, sRow);
941 }
942 }
943 }
944
945 /**
946 * Update columns in resulting table according to definition
947 */
948 void DCTable::updateResultColumns(Table *t)
949 {
950 lock();
951 for(int i = 0; i < m_columns->size(); i++)
952 {
953 DCTableColumn *col = m_columns->get(i);
954 int index = t->getColumnIndex(col->getName());
955 if (index != -1)
956 {
957 TableColumnDefinition *cd = t->getColumnDefinitions()->get(index);
958 if (cd != NULL)
959 {
960 cd->setDataType(col->getDataType());
961 cd->setInstanceColumn(col->isInstanceColumn());
962 cd->setDisplayName(col->getDisplayName());
963 }
964 }
965 }
966 unlock();
967 }
968
969 /**
970 * Update from template item
971 */
972 void DCTable::updateFromTemplate(DCObject *src)
973 {
974 DCObject::updateFromTemplate(src);
975
976 if (src->getType() != DCO_TYPE_TABLE)
977 {
978 DbgPrintf(2, _T("INTERNAL ERROR: DCTable::updateFromTemplate(%d, %d): source type is %d"), (int)m_id, (int)src->getId(), src->getType());
979 return;
980 }
981
982 lock();
983 DCTable *table = (DCTable *)src;
984
985 m_columns->clear();
986 for(int i = 0; i < table->m_columns->size(); i++)
987 m_columns->add(new DCTableColumn(table->m_columns->get(i)));
988
989 m_thresholds->clear();
990 for(int i = 0; i < table->m_thresholds->size(); i++)
991 m_thresholds->add(new DCTableThreshold(table->m_thresholds->get(i)));
992
993 unlock();
994 }
995
996 /**
997 * Create management pack record
998 */
999 void DCTable::createNXMPRecord(String &str)
1000 {
1001 UINT32 i;
1002
1003 lock();
1004
1005 str.appendFormattedString(_T("\t\t\t\t<dctable id=\"%d\">\n")
1006 _T("\t\t\t\t\t<name>%s</name>\n")
1007 _T("\t\t\t\t\t<description>%s</description>\n")
1008 _T("\t\t\t\t\t<origin>%d</origin>\n")
1009 _T("\t\t\t\t\t<interval>%d</interval>\n")
1010 _T("\t\t\t\t\t<retention>%d</retention>\n")
1011 _T("\t\t\t\t\t<systemTag>%s</systemTag>\n")
1012 _T("\t\t\t\t\t<flags>%d</flags>\n")
1013 _T("\t\t\t\t\t<snmpPort>%d</snmpPort>\n"),
1014 (int)m_id, (const TCHAR *)EscapeStringForXML2(m_name),
1015 (const TCHAR *)EscapeStringForXML2(m_szDescription),
1016 (int)m_source, m_iPollingInterval, m_iRetentionTime,
1017 (const TCHAR *)EscapeStringForXML2(m_systemTag),
1018 (int)m_flags, (int)m_snmpPort);
1019
1020 if (m_transformationScriptSource != NULL)
1021 {
1022 str += _T("\t\t\t\t\t<transformation>");
1023 str.appendPreallocated(EscapeStringForXML(m_transformationScriptSource, -1));
1024 str += _T("</transformation>\n");
1025 }
1026
1027 if (m_dwNumSchedules > 0)
1028 {
1029 str += _T("\t\t\t\t\t<schedules>\n");
1030 for(i = 0; i < m_dwNumSchedules; i++)
1031 str.appendFormattedString(_T("\t\t\t\t\t\t<schedule>%s</schedule>\n"), (const TCHAR *)EscapeStringForXML2(m_ppScheduleList[i]));
1032 str += _T("\t\t\t\t\t</schedules>\n");
1033 }
1034
1035 if (m_columns != NULL)
1036 {
1037 str += _T("\t\t\t\t\t<columns>\n");
1038 for(i = 0; i < (UINT32)m_columns->size(); i++)
1039 {
1040 m_columns->get(i)->createNXMPRecord(str, i + 1);
1041 }
1042 str += _T("\t\t\t\t\t</columns>\n");
1043 }
1044
1045 if (m_thresholds != NULL)
1046 {
1047 str += _T("\t\t\t\t\t<thresholds>\n");
1048 for(i = 0; i < (UINT32)m_thresholds->size(); i++)
1049 {
1050 m_thresholds->get(i)->createNXMPRecord(str, i + 1);
1051 }
1052 str += _T("\t\t\t\t\t</thresholds>\n");
1053 }
1054
1055 if (m_pszPerfTabSettings != NULL)
1056 {
1057 str += _T("\t\t\t\t\t<perfTabSettings>");
1058 str.appendPreallocated(EscapeStringForXML(m_pszPerfTabSettings, -1));
1059 str += _T("</perfTabSettings>\n");
1060 }
1061
1062 unlock();
1063 str += _T("\t\t\t\t</dctable>\n");
1064 }