16d32ed6b4f3a6f5d3c8f2f672a75cf576f1334e
[public/netxms.git] / src / server / core / dctable.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2016 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: dctable.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 /**
26 * Column ID cache
27 */
28 TC_ID_MAP_ENTRY *DCTable::m_cache = NULL;
29 int DCTable::m_cacheSize = 0;
30 int DCTable::m_cacheAllocated = 0;
31 MUTEX DCTable::m_cacheMutex = MutexCreate();
32
33 /**
34 * Compare cache element's name to string key
35 */
36 static int CompareCacheElements(const void *key, const void *element)
37 {
38 return _tcsicmp((const TCHAR *)key, ((TC_ID_MAP_ENTRY *)element)->name);
39 }
40
41 /**
42 * Compare names of two cache elements
43 */
44 static int CompareCacheElements2(const void *e1, const void *e2)
45 {
46 return _tcsicmp(((TC_ID_MAP_ENTRY *)e1)->name, ((TC_ID_MAP_ENTRY *)e2)->name);
47 }
48
49 /**
50 * Get column ID from column name
51 */
52 INT32 DCTable::columnIdFromName(const TCHAR *name)
53 {
54 TC_ID_MAP_ENTRY buffer;
55
56 // check that column name is valid
57 if ((name == NULL) || (*name == 0))
58 return 0;
59
60 MutexLock(m_cacheMutex);
61
62 TC_ID_MAP_ENTRY *entry = (TC_ID_MAP_ENTRY *)bsearch(name, m_cache, m_cacheSize, sizeof(TC_ID_MAP_ENTRY), CompareCacheElements);
63 if (entry == NULL)
64 {
65 // Not in cache, go to database
66 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
67
68 DB_STATEMENT hStmt = DBPrepare(hdb, _T("SELECT column_id FROM dct_column_names WHERE column_name=?"));
69 if (hStmt != NULL)
70 {
71 DBBind(hStmt, 1, DB_SQLTYPE_VARCHAR, name, DB_BIND_STATIC);
72 DB_RESULT hResult = DBSelectPrepared(hStmt);
73 if (hResult != NULL)
74 {
75 entry = &buffer;
76 nx_strncpy(entry->name, name, MAX_COLUMN_NAME);
77 if (DBGetNumRows(hResult) > 0)
78 {
79 // found in database
80 entry->id = DBGetFieldLong(hResult, 0, 0);
81 }
82 else
83 {
84 // no such column name in database
85 entry->id = CreateUniqueId(IDG_DCT_COLUMN);
86
87 // update database
88 DB_STATEMENT hStmt2 = DBPrepare(hdb, _T("INSERT INTO dct_column_names (column_id,column_name) VALUES (?,?)"));
89 if (hStmt2 != NULL)
90 {
91 DBBind(hStmt2, 1, DB_SQLTYPE_INTEGER, entry->id);
92 DBBind(hStmt2, 2, DB_SQLTYPE_VARCHAR, name, DB_BIND_STATIC);
93 DBExecute(hStmt2);
94 DBFreeStatement(hStmt2);
95 }
96 }
97
98 DBFreeResult(hResult);
99
100 // Add to cache
101 if (m_cacheSize == m_cacheAllocated)
102 {
103 m_cacheAllocated += 16;
104 m_cache = (TC_ID_MAP_ENTRY *)realloc(m_cache, sizeof(TC_ID_MAP_ENTRY) * m_cacheAllocated);
105 }
106 memcpy(&m_cache[m_cacheSize++], entry, sizeof(TC_ID_MAP_ENTRY));
107 qsort(m_cache, m_cacheSize, sizeof(TC_ID_MAP_ENTRY), CompareCacheElements2);
108
109 DbgPrintf(6, _T("DCTable::columnIdFromName(): column name %s added to cache, ID=%d"), name, (int)entry->id);
110 }
111 DBFreeStatement(hStmt);
112 }
113
114 DBConnectionPoolReleaseConnection(hdb);
115 }
116
117 MutexUnlock(m_cacheMutex);
118 return (entry != NULL) ? entry->id : 0;
119 }
120
121 /**
122 * Default constructor
123 */
124 DCTable::DCTable() : DCObject()
125 {
126 m_columns = new ObjectArray<DCTableColumn>(8, 8, true);
127 m_thresholds = new ObjectArray<DCTableThreshold>(0, 4, true);
128 m_lastValue = NULL;
129 }
130
131 /**
132 * Copy constructor
133 */
134 DCTable::DCTable(const DCTable *src) : DCObject(src)
135 {
136 m_columns = new ObjectArray<DCTableColumn>(src->m_columns->size(), 8, true);
137 for(int i = 0; i < src->m_columns->size(); i++)
138 m_columns->add(new DCTableColumn(src->m_columns->get(i)));
139 m_thresholds = new ObjectArray<DCTableThreshold>(src->m_thresholds->size(), 4, true);
140 for(int i = 0; i < src->m_thresholds->size(); i++)
141 m_thresholds->add(new DCTableThreshold(src->m_thresholds->get(i)));
142 m_lastValue = NULL;
143 }
144
145 /**
146 * Constructor for creating new DCTable from scratch
147 */
148 DCTable::DCTable(UINT32 id, const TCHAR *name, int source, int pollingInterval, int retentionTime,
149 Template *node, const TCHAR *description, const TCHAR *systemTag)
150 : DCObject(id, name, source, pollingInterval, retentionTime, node, description, systemTag)
151 {
152 m_columns = new ObjectArray<DCTableColumn>(8, 8, true);
153 m_thresholds = new ObjectArray<DCTableThreshold>(0, 4, true);
154 m_lastValue = NULL;
155 }
156
157 /**
158 * Constructor for creating DCTable from database
159 * Assumes that fields in SELECT query are in following order:
160 * item_id,template_id,template_item_id,name,
161 * description,flags,source,snmp_port,polling_interval,retention_time,
162 * status,system_tag,resource_id,proxy_node,perftab_settings,
163 * transformation_script,comments,guid
164 */
165 DCTable::DCTable(DB_HANDLE hdb, DB_RESULT hResult, int iRow, Template *pNode) : DCObject()
166 {
167 m_id = DBGetFieldULong(hResult, iRow, 0);
168 m_dwTemplateId = DBGetFieldULong(hResult, iRow, 1);
169 m_dwTemplateItemId = DBGetFieldULong(hResult, iRow, 2);
170 DBGetField(hResult, iRow, 3, m_name, MAX_ITEM_NAME);
171 DBGetField(hResult, iRow, 4, m_description, MAX_DB_STRING);
172 m_flags = (WORD)DBGetFieldLong(hResult, iRow, 5);
173 m_source = (BYTE)DBGetFieldLong(hResult, iRow, 6);
174 m_snmpPort = (WORD)DBGetFieldLong(hResult, iRow, 7);
175 m_iPollingInterval = DBGetFieldLong(hResult, iRow, 8);
176 m_iRetentionTime = DBGetFieldLong(hResult, iRow, 9);
177 m_status = (BYTE)DBGetFieldLong(hResult, iRow, 10);
178 DBGetField(hResult, iRow, 11, m_systemTag, MAX_DB_STRING);
179 m_dwResourceId = DBGetFieldULong(hResult, iRow, 12);
180 m_sourceNode = DBGetFieldULong(hResult, iRow, 13);
181 m_pszPerfTabSettings = DBGetField(hResult, iRow, 14, NULL, 0);
182 TCHAR *pszTmp = DBGetField(hResult, iRow, 15, NULL, 0);
183 m_comments = DBGetField(hResult, iRow, 16, NULL, 0);
184 m_guid = DBGetFieldGUID(hResult, iRow, 17);
185 setTransformationScript(pszTmp);
186 free(pszTmp);
187
188 m_owner = pNode;
189 m_columns = new ObjectArray<DCTableColumn>(8, 8, true);
190 m_lastValue = NULL;
191
192 DB_STATEMENT hStmt = DBPrepare(hdb, _T("SELECT column_name,flags,snmp_oid,display_name FROM dc_table_columns WHERE table_id=? ORDER BY sequence_number"));
193 if (hStmt != NULL)
194 {
195 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
196 DB_RESULT hColumnList = DBSelectPrepared(hStmt);
197 if (hColumnList != NULL)
198 {
199 int count = DBGetNumRows(hColumnList);
200 for(int i = 0; i < count; i++)
201 m_columns->add(new DCTableColumn(hColumnList, i));
202 DBFreeResult(hColumnList);
203 }
204 DBFreeStatement(hStmt);
205 }
206
207 loadCustomSchedules(hdb);
208
209 m_thresholds = new ObjectArray<DCTableThreshold>(0, 4, true);
210 loadThresholds(hdb);
211 }
212
213 /**
214 * Create DCTable from import file
215 */
216 DCTable::DCTable(ConfigEntry *config, Template *owner) : DCObject(config, owner)
217 {
218 ConfigEntry *columnsRoot = config->findEntry(_T("columns"));
219 if (columnsRoot != NULL)
220 {
221 ObjectArray<ConfigEntry> *columns = columnsRoot->getSubEntries(_T("column#*"));
222 m_columns = new ObjectArray<DCTableColumn>(columns->size(), 8, true);
223 for(int i = 0; i < columns->size(); i++)
224 {
225 m_columns->add(new DCTableColumn(columns->get(i)));
226 }
227 delete columns;
228 }
229 else
230 {
231 m_columns = new ObjectArray<DCTableColumn>(8, 8, true);
232 }
233
234 ConfigEntry *thresholdsRoot = config->findEntry(_T("thresholds"));
235 if (thresholdsRoot != NULL)
236 {
237 ObjectArray<ConfigEntry> *thresholds = thresholdsRoot->getSubEntries(_T("threshold#*"));
238 m_thresholds = new ObjectArray<DCTableThreshold>(thresholds->size(), 8, true);
239 for(int i = 0; i < thresholds->size(); i++)
240 {
241 m_thresholds->add(new DCTableThreshold(thresholds->get(i)));
242 }
243 delete thresholds;
244 }
245 else
246 {
247 m_thresholds = new ObjectArray<DCTableThreshold>(0, 4, true);
248 }
249
250 m_lastValue = NULL;
251 }
252
253 /**
254 * Destructor
255 */
256 DCTable::~DCTable()
257 {
258 delete m_columns;
259 delete m_thresholds;
260 if (m_lastValue != NULL)
261 m_lastValue->decRefCount();
262 }
263
264 /**
265 * Clean expired data
266 */
267 void DCTable::deleteExpiredData()
268 {
269 TCHAR query[256];
270 time_t now;
271
272 now = time(NULL);
273
274 lock();
275 _sntprintf(query, 256, _T("DELETE FROM tdata_%d WHERE (item_id=%d) AND (tdata_timestamp<%ld)"),
276 (int)m_owner->getId(), (int)m_id, (long)(now - (time_t)m_iRetentionTime * 86400));
277 unlock();
278
279 QueueSQLRequest(query);
280 }
281
282 /**
283 * Delete all collected data
284 */
285 bool DCTable::deleteAllData()
286 {
287 TCHAR szQuery[256];
288 bool success;
289
290 lock();
291 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
292 _sntprintf(szQuery, 256, _T("DELETE FROM tdata_%d WHERE item_id=%d"), m_owner->getId(), (int)m_id);
293 success = DBQuery(hdb, szQuery) ? true : false;
294 DBConnectionPoolReleaseConnection(hdb);
295 unlock();
296 return success;
297 }
298
299 /**
300 * Process new collected value. Should return true on success.
301 * If returns false, current poll result will be converted into data collection error.
302 *
303 * @return true on success
304 */
305 bool DCTable::processNewValue(time_t timestamp, const void *value, bool *updateStatus)
306 {
307 *updateStatus = false;
308 lock();
309
310 // Normally m_owner shouldn't be NULL for polled items, but who knows...
311 if (m_owner == NULL)
312 {
313 unlock();
314 return false;
315 }
316
317 // Transform input value
318 // Cluster can have only aggregated data, and transformation
319 // should not be used on aggregation
320 if ((m_owner->getObjectClass() != OBJECT_CLUSTER) || (m_flags & DCF_TRANSFORM_AGGREGATED))
321 {
322 if (!transform((Table *)value))
323 {
324 unlock();
325 ((Table *)value)->decRefCount();
326 return false;
327 }
328 }
329
330 m_dwErrorCount = 0;
331 if (m_lastValue != NULL)
332 m_lastValue->decRefCount();
333 m_lastValue = (Table *)value;
334 m_lastValue->setTitle(m_description);
335 m_lastValue->setSource(m_source);
336
337 // Copy required fields into local variables
338 UINT32 tableId = m_id;
339 UINT32 nodeId = m_owner->getId();
340 bool save = (m_flags & DCF_NO_STORAGE) == 0;
341
342 unlock();
343
344 // Save data to database
345 // Object is unlocked, so only local variables can be used
346 if (save)
347 {
348 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
349 if (!DBBegin(hdb))
350 {
351 DBConnectionPoolReleaseConnection(hdb);
352 return true;
353 }
354
355 INT64 recordId = ((INT64)timestamp << 30) | (((INT64)tableId & 0xFFFF) << 14);
356 BOOL success = FALSE;
357 Table *data = (Table *)value;
358
359 TCHAR query[256];
360 _sntprintf(query, 256, _T("INSERT INTO tdata_%d (item_id,tdata_timestamp,record_id) VALUES (?,?,?)"), (int)nodeId);
361 DB_STATEMENT hStmt = DBPrepare(hdb, query);
362 if (hStmt != NULL)
363 {
364 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, tableId);
365 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (INT32)timestamp);
366 DBBind(hStmt, 3, DB_SQLTYPE_BIGINT, recordId);
367 success = DBExecute(hStmt);
368 DBFreeStatement(hStmt);
369 }
370
371 if (success)
372 {
373 _sntprintf(query, 256, _T("INSERT INTO tdata_records_%d (record_id,row_id,instance) VALUES (?,?,?)"), (int)nodeId);
374 DB_STATEMENT hStmt = DBPrepare(hdb, query);
375 if (hStmt != NULL)
376 {
377 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, recordId);
378 for(int row = 0; row < data->getNumRows(); row++)
379 {
380 TCHAR instance[MAX_RESULT_LENGTH];
381 data->buildInstanceString(row, instance, MAX_RESULT_LENGTH);
382 DBBind(hStmt, 2, DB_SQLTYPE_BIGINT, recordId | (INT64)row);
383 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, instance, DB_BIND_STATIC);
384 success = DBExecute(hStmt);
385 if (!success)
386 break;
387 }
388 DBFreeStatement(hStmt);
389 }
390 }
391
392 if (success)
393 {
394 _sntprintf(query, 256, _T("INSERT INTO tdata_rows_%d (row_id,column_id,value) VALUES (?,?,?)"), (int)nodeId);
395 DB_STATEMENT hStmt = DBPrepare(hdb, query);
396 if (hStmt != NULL)
397 {
398 for(int col = 0; col < data->getNumColumns(); col++)
399 {
400 INT32 colId = columnIdFromName(data->getColumnName(col));
401 if (colId == 0)
402 continue; // cannot get column ID
403
404 for(int row = 0; row < data->getNumRows(); row++)
405 {
406 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, recordId | (INT64)row);
407 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, colId);
408 const TCHAR *s = data->getAsString(row, col);
409 if ((s == NULL) || (_tcslen(s) < MAX_DB_STRING))
410 {
411 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, s, DB_BIND_STATIC);
412 }
413 else
414 {
415 TCHAR *sp = (TCHAR *)nx_memdup(s, MAX_DB_STRING * sizeof(TCHAR));
416 sp[MAX_DB_STRING - 1] = 0;
417 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, sp, DB_BIND_DYNAMIC);
418 }
419 success = DBExecute(hStmt);
420 if (!success)
421 break;
422 }
423 }
424 DBFreeStatement(hStmt);
425 }
426 }
427
428 if (success)
429 DBCommit(hdb);
430 else
431 DBRollback(hdb);
432
433 DBConnectionPoolReleaseConnection(hdb);
434 }
435 if ((g_offlineDataRelevanceTime <= 0) || (timestamp > (time(NULL) - g_offlineDataRelevanceTime)))
436 checkThresholds((Table *)value);
437
438 if (g_flags & AF_PERFDATA_STORAGE_DRIVER_LOADED)
439 PerfDataStorageRequest(this, timestamp, (Table *)value);
440
441 return true;
442 }
443
444 /**
445 * Transform received value
446 */
447 bool DCTable::transform(Table *value)
448 {
449 if (m_transformationScript == NULL)
450 return true;
451
452 bool success = false;
453 NXSL_VM *vm = new NXSL_VM(new NXSL_ServerEnv());
454 if (vm->load(m_transformationScript))
455 {
456 NXSL_Value *nxslValue = new NXSL_Value(new NXSL_Object(&g_nxslStaticTableClass, value));
457 vm->setGlobalVariable(_T("$object"), m_owner->createNXSLObject());
458 if (m_owner->getObjectClass() == OBJECT_NODE)
459 {
460 vm->setGlobalVariable(_T("$node"), m_owner->createNXSLObject());
461 }
462 vm->setGlobalVariable(_T("$dci"), createNXSLObject());
463 vm->setGlobalVariable(_T("$isCluster"), new NXSL_Value((m_owner->getObjectClass() == OBJECT_CLUSTER) ? 1 : 0));
464
465 // remove lock from DCI for script execution to avoid deadlocks
466 unlock();
467 success = vm->run(1, &nxslValue);
468 lock();
469 if (!success)
470 {
471 if (vm->getErrorCode() == NXSL_ERR_EXECUTION_ABORTED)
472 {
473 DbgPrintf(6, _T("Transformation script for DCI \"%s\" [%d] on node %s [%d] aborted"),
474 m_description, m_id, getOwnerName(), getOwnerId());
475 }
476 else
477 {
478 TCHAR buffer[1024];
479 _sntprintf(buffer, 1024, _T("DCI::%s::%d::TransformationScript"), getOwnerName(), m_id);
480 PostDciEvent(EVENT_SCRIPT_ERROR, g_dwMgmtNode, m_id, "ssd", buffer, vm->getErrorText(), m_id);
481 nxlog_write(MSG_TRANSFORMATION_SCRIPT_EXECUTION_ERROR, NXLOG_WARNING, "dsdss",
482 getOwnerId(), getOwnerName(), m_id, m_name, vm->getErrorText());
483 }
484 }
485 }
486 else
487 {
488 TCHAR buffer[1024];
489 _sntprintf(buffer, 1024, _T("DCI::%s::%d::TransformationScript"), getOwnerName(), m_id);
490 PostDciEvent(EVENT_SCRIPT_ERROR, g_dwMgmtNode, m_id, "ssd", buffer, vm->getErrorText(), m_id);
491 nxlog_write(MSG_TRANSFORMATION_SCRIPT_EXECUTION_ERROR, NXLOG_WARNING, "dsdss",
492 getOwnerId(), getOwnerName(), m_id, m_name, vm->getErrorText());
493 }
494 delete vm;
495 return success;
496 }
497
498 /**
499 * Check thresholds
500 */
501 void DCTable::checkThresholds(Table *value)
502 {
503 static const TCHAR *paramNames[] = { _T("dciName"), _T("dciDescription"), _T("dciId"), _T("row"), _T("instance") };
504
505 lock();
506 for(int row = 0; row < value->getNumRows(); row++)
507 {
508 TCHAR instance[MAX_RESULT_LENGTH];
509 value->buildInstanceString(row, instance, MAX_RESULT_LENGTH);
510 for(int i = 0; i < m_thresholds->size(); i++)
511 {
512 DCTableThreshold *t = m_thresholds->get(i);
513 ThresholdCheckResult result = t->check(value, row, instance);
514 switch(result)
515 {
516 case ACTIVATED:
517 PostDciEventWithNames(t->getActivationEvent(), m_owner->getId(), m_id, "ssids", paramNames, m_name, m_description, m_id, row, instance);
518 if (!(m_flags & DCF_ALL_THRESHOLDS))
519 i = m_thresholds->size(); // Stop processing (for current row)
520 break;
521 case DEACTIVATED:
522 PostDciEventWithNames(t->getDeactivationEvent(), m_owner->getId(), m_id, "ssids", paramNames, m_name, m_description, m_id, row, instance);
523 break;
524 case ALREADY_ACTIVE:
525 i = m_thresholds->size(); // Threshold condition still true, stop processing
526 break;
527 default:
528 break;
529 }
530 }
531 }
532 unlock();
533 }
534
535 /**
536 * Process new data collection error
537 */
538 void DCTable::processNewError(bool noInstance)
539 {
540 m_dwErrorCount++;
541 }
542
543 /**
544 * Save to database
545 */
546 bool DCTable::saveToDatabase(DB_HANDLE hdb)
547 {
548 DB_STATEMENT hStmt;
549 if (IsDatabaseRecordExist(hdb, _T("dc_tables"), _T("item_id"), m_id))
550 {
551 hStmt = DBPrepare(hdb, _T("UPDATE dc_tables SET node_id=?,template_id=?,template_item_id=?,name=?,")
552 _T("description=?,flags=?,source=?,snmp_port=?,polling_interval=?,")
553 _T("retention_time=?,status=?,system_tag=?,resource_id=?,proxy_node=?,")
554 _T("perftab_settings=?,transformation_script=?,comments=?,guid=? WHERE item_id=?"));
555 }
556 else
557 {
558 hStmt = DBPrepare(hdb, _T("INSERT INTO dc_tables (node_id,template_id,template_item_id,name,")
559 _T("description,flags,source,snmp_port,polling_interval,")
560 _T("retention_time,status,system_tag,resource_id,proxy_node,perftab_settings,")
561 _T("transformation_script,comments,guid,item_id) ")
562 _T("VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));
563 }
564 if (hStmt == NULL)
565 return FALSE;
566
567 lock();
568
569 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (m_owner == NULL) ? (UINT32)0 : m_owner->getId());
570 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, m_dwTemplateId);
571 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, m_dwTemplateItemId);
572 DBBind(hStmt, 4, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
573 DBBind(hStmt, 5, DB_SQLTYPE_VARCHAR, m_description, DB_BIND_STATIC);
574 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (UINT32)m_flags);
575 DBBind(hStmt, 7, DB_SQLTYPE_INTEGER, (INT32)m_source);
576 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, (UINT32)m_snmpPort);
577 DBBind(hStmt, 9, DB_SQLTYPE_INTEGER, (INT32)m_iPollingInterval);
578 DBBind(hStmt, 10, DB_SQLTYPE_INTEGER, (INT32)m_iRetentionTime);
579 DBBind(hStmt, 11, DB_SQLTYPE_INTEGER, (INT32)m_status);
580 DBBind(hStmt, 12, DB_SQLTYPE_VARCHAR, m_systemTag, DB_BIND_STATIC);
581 DBBind(hStmt, 13, DB_SQLTYPE_INTEGER, m_dwResourceId);
582 DBBind(hStmt, 14, DB_SQLTYPE_INTEGER, m_sourceNode);
583 DBBind(hStmt, 15, DB_SQLTYPE_TEXT, m_pszPerfTabSettings, DB_BIND_STATIC);
584 DBBind(hStmt, 16, DB_SQLTYPE_TEXT, m_transformationScriptSource, DB_BIND_STATIC);
585 DBBind(hStmt, 17, DB_SQLTYPE_TEXT, m_comments, DB_BIND_STATIC);
586 DBBind(hStmt, 18, DB_SQLTYPE_VARCHAR, m_guid);
587 DBBind(hStmt, 19, DB_SQLTYPE_INTEGER, m_id);
588
589 bool result = DBExecute(hStmt);
590 DBFreeStatement(hStmt);
591
592 if (result)
593 {
594 // Save column configuration
595 hStmt = DBPrepare(hdb, _T("DELETE FROM dc_table_columns WHERE table_id=?"));
596 if (hStmt != NULL)
597 {
598 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
599 result = DBExecute(hStmt);
600 DBFreeStatement(hStmt);
601 }
602 else
603 {
604 result = false;
605 }
606
607 if (result && (m_columns->size() > 0))
608 {
609 hStmt = DBPrepare(hdb, _T("INSERT INTO dc_table_columns (table_id,sequence_number,column_name,snmp_oid,flags,display_name) VALUES (?,?,?,?,?,?)"));
610 if (hStmt != NULL)
611 {
612 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
613 for(int i = 0; i < m_columns->size(); i++)
614 {
615 DCTableColumn *column = m_columns->get(i);
616 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (INT32)(i + 1));
617 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, column->getName(), DB_BIND_STATIC);
618 SNMP_ObjectId *oid = column->getSnmpOid();
619 DBBind(hStmt, 4, DB_SQLTYPE_VARCHAR, (oid != NULL) ? (const TCHAR *)oid->toString() : NULL, DB_BIND_TRANSIENT);
620 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (INT32)column->getFlags());
621 DBBind(hStmt, 6, DB_SQLTYPE_VARCHAR, column->getDisplayName(), DB_BIND_STATIC);
622
623 result = DBExecute(hStmt);
624 if (!result)
625 break;
626 }
627 DBFreeStatement(hStmt);
628 }
629 else
630 {
631 result = false;
632 }
633 }
634 }
635
636 saveThresholds(hdb);
637
638 unlock();
639 return result ? DCObject::saveToDatabase(hdb) : false;
640 }
641
642 /**
643 * Load thresholds from database
644 */
645 bool DCTable::loadThresholds(DB_HANDLE hdb)
646 {
647 DB_STATEMENT hStmt = DBPrepare(hdb, _T("SELECT id,activation_event,deactivation_event FROM dct_thresholds WHERE table_id=? ORDER BY sequence_number"));
648 if (hStmt == NULL)
649 return false;
650
651 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
652 DB_RESULT hResult = DBSelectPrepared(hStmt);
653 if (hResult != NULL)
654 {
655 int count = DBGetNumRows(hResult);
656 for(int i = 0; i < count; i++)
657 {
658 DCTableThreshold *t = new DCTableThreshold(hdb, hResult, i);
659 m_thresholds->add(t);
660 }
661 DBFreeResult(hResult);
662 }
663 DBFreeStatement(hStmt);
664 return true;
665 }
666
667 /**
668 * Save thresholds to database
669 */
670 bool DCTable::saveThresholds(DB_HANDLE hdb)
671 {
672 DB_STATEMENT hStmt = DBPrepare(hdb, _T("DELETE FROM dct_threshold_conditions WHERE threshold_id=?"));
673 if (hStmt == NULL)
674 return false;
675
676 for(int i = 0; i < m_thresholds->size(); i++)
677 {
678 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_thresholds->get(i)->getId());
679 DBExecute(hStmt);
680 }
681 DBFreeStatement(hStmt);
682
683 hStmt = DBPrepare(hdb, _T("DELETE FROM dct_thresholds WHERE table_id=?"));
684 if (hStmt == NULL)
685 return false;
686 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
687 DBExecute(hStmt);
688 DBFreeStatement(hStmt);
689
690 for(int i = 0; i < m_thresholds->size(); i++)
691 m_thresholds->get(i)->saveToDatabase(hdb, m_id, i);
692 return true;
693 }
694
695 /**
696 * Delete table object and collected data from database
697 */
698 void DCTable::deleteFromDatabase()
699 {
700 TCHAR szQuery[256];
701
702 DCObject::deleteFromDatabase();
703
704 if(m_owner->getObjectClass() != OBJECT_TEMPLATE)
705 {
706 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM tdata_%d WHERE item_id=%d"), m_owner->getId(), (int)m_id);
707 QueueSQLRequest(szQuery);
708 }
709
710 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM dc_tables WHERE item_id=%d"), (int)m_id);
711 QueueSQLRequest(szQuery);
712 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM dc_table_columns WHERE table_id=%d"), (int)m_id);
713 QueueSQLRequest(szQuery);
714
715 for(int i = 0; i < m_thresholds->size(); i++)
716 {
717 _sntprintf(szQuery, 256, _T("DELETE FROM dct_threshold_conditions WHERE threshold_id=%d"), (int)m_thresholds->get(i)->getId());
718 QueueSQLRequest(szQuery);
719 }
720
721 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM dct_thresholds WHERE table_id=%d"), (int)m_id);
722 QueueSQLRequest(szQuery);
723 }
724
725 /**
726 * Create NXCP message with item data
727 */
728 void DCTable::createMessage(NXCPMessage *pMsg)
729 {
730 DCObject::createMessage(pMsg);
731
732 lock();
733 pMsg->setField(VID_NUM_COLUMNS, (UINT32)m_columns->size());
734 UINT32 varId = VID_DCI_COLUMN_BASE;
735 for(int i = 0; i < m_columns->size(); i++)
736 {
737 DCTableColumn *column = m_columns->get(i);
738 pMsg->setField(varId++, column->getName());
739 pMsg->setField(varId++, column->getFlags());
740 SNMP_ObjectId *oid = column->getSnmpOid();
741 if (oid != NULL)
742 pMsg->setFieldFromInt32Array(varId++, (UINT32)oid->length(), oid->value());
743 else
744 varId++;
745 pMsg->setField(varId++, column->getDisplayName());
746 varId += 6;
747 }
748
749 pMsg->setField(VID_NUM_THRESHOLDS, (UINT32)m_thresholds->size());
750 varId = VID_DCI_THRESHOLD_BASE;
751 for(int i = 0; i < m_thresholds->size(); i++)
752 {
753 varId = m_thresholds->get(i)->fillMessage(pMsg, varId);
754 }
755
756 unlock();
757 }
758
759 /**
760 * Update data collection object from NXCP message
761 */
762 void DCTable::updateFromMessage(NXCPMessage *pMsg)
763 {
764 DCObject::updateFromMessage(pMsg);
765
766 lock();
767
768 m_columns->clear();
769 int count = (int)pMsg->getFieldAsUInt32(VID_NUM_COLUMNS);
770 UINT32 varId = VID_DCI_COLUMN_BASE;
771 for(int i = 0; i < count; i++)
772 {
773 m_columns->add(new DCTableColumn(pMsg, varId));
774 varId += 10;
775 }
776
777 count = (int)pMsg->getFieldAsUInt32(VID_NUM_THRESHOLDS);
778 ObjectArray<DCTableThreshold> *newThresholds = new ObjectArray<DCTableThreshold>(count, 8, true);
779 varId = VID_DCI_THRESHOLD_BASE;
780 for(int i = 0; i < count; i++)
781 {
782 DCTableThreshold *t = new DCTableThreshold(pMsg, &varId);
783 newThresholds->add(t);
784 for(int j = 0; j < m_thresholds->size(); j++)
785 {
786 DCTableThreshold *old = m_thresholds->get(j);
787 if (old->getId() == t->getId())
788 {
789 t->copyState(old);
790 break;
791 }
792 }
793 }
794 delete m_thresholds;
795 m_thresholds = newThresholds;
796
797 unlock();
798 }
799
800 /**
801 * Get last collected value
802 */
803 void DCTable::fillLastValueMessage(NXCPMessage *msg)
804 {
805 lock();
806 if (m_lastValue != NULL)
807 {
808 m_lastValue->fillMessage(*msg, 0, -1);
809 }
810 unlock();
811 }
812
813 /**
814 * Get summary of last collected value (to show along simple DCI values)
815 */
816 void DCTable::fillLastValueSummaryMessage(NXCPMessage *pMsg, UINT32 dwId)
817 {
818 lock();
819 pMsg->setField(dwId++, m_id);
820 pMsg->setField(dwId++, m_name);
821 pMsg->setField(dwId++, m_description);
822 pMsg->setField(dwId++, (WORD)m_source);
823 pMsg->setField(dwId++, (WORD)DCI_DT_NULL); // compatibility: data type
824 pMsg->setField(dwId++, _T("")); // compatibility: value
825 pMsg->setField(dwId++, (UINT32)m_tLastPoll);
826 pMsg->setField(dwId++, (WORD)(matchClusterResource() ? m_status : ITEM_STATUS_DISABLED)); // show resource-bound DCIs as inactive if cluster resource is not on this node
827 pMsg->setField(dwId++, (WORD)getType());
828 pMsg->setField(dwId++, m_dwErrorCount);
829 pMsg->setField(dwId++, m_dwTemplateItemId);
830 pMsg->setField(dwId++, (WORD)0); // compatibility: number of thresholds
831
832 unlock();
833 }
834
835 /**
836 * Get data type of given column
837 */
838 int DCTable::getColumnDataType(const TCHAR *name)
839 {
840 int dt = DCI_DT_STRING;
841 bool found = false;
842
843 lock();
844
845 // look in column definition first
846 for(int i = 0; i < m_columns->size(); i++)
847 {
848 DCTableColumn *column = m_columns->get(i);
849 if (!_tcsicmp(column->getName(), name))
850 {
851 dt = column->getDataType();
852 break;
853 }
854 }
855
856 // use last values if not found in definitions
857 if (!found && (m_lastValue != NULL))
858 {
859 int index = m_lastValue->getColumnIndex(name);
860 if (index != -1)
861 dt = m_lastValue->getColumnDataType(index);
862 }
863
864 unlock();
865 return dt;
866 }
867
868 /**
869 * Get last collected value
870 */
871 Table *DCTable::getLastValue()
872 {
873 lock();
874 Table *value;
875 if (m_lastValue != NULL)
876 {
877 value = m_lastValue;
878 value->incRefCount();
879 }
880 else
881 {
882 value = NULL;
883 }
884 unlock();
885 return value;
886 }
887
/**
 * Update destination value from source value.
 *
 * Folds "src" into the running aggregate "dst" according to aggregation
 * function "func":
 *    DCF_FUNCTION_MIN - keep the smaller value
 *    DCF_FUNCTION_MAX - keep the larger value
 *    DCF_FUNCTION_SUM - add source to destination
 *    DCF_FUNCTION_AVG - running average, where "count" is the number of
 *                       values already merged into dst
 * Any other function code leaves dst unchanged.
 *
 * All arguments are parenthesized in the expansion so that expressions can
 * be passed safely. "dst" must be a modifiable lvalue. "dst", "src" and
 * "count" may be evaluated more than once, so they must not have side
 * effects. The do/while(0) wrapper makes the macro behave as a single
 * statement; it must be followed by a semicolon.
 */
#define RECALCULATE_VALUE(dst, src, func, count) \
   do \
   { \
      switch(func) \
      { \
         case DCF_FUNCTION_MIN: \
            if ((src) < (dst)) (dst) = (src); \
            break; \
         case DCF_FUNCTION_MAX: \
            if ((src) > (dst)) (dst) = (src); \
            break; \
         case DCF_FUNCTION_SUM: \
            (dst) += (src); \
            break; \
         case DCF_FUNCTION_AVG: \
            (dst) = ((dst) * (count) + (src)) / ((count) + 1); \
            break; \
      } \
   } while(0)
909
910 /**
911 * Merge values
912 */
913 void DCTable::mergeValues(Table *dest, Table *src, int count)
914 {
915 for(int sRow = 0; sRow < src->getNumRows(); sRow++)
916 {
917 TCHAR instance[MAX_RESULT_LENGTH];
918
919 src->buildInstanceString(sRow, instance, MAX_RESULT_LENGTH);
920 int dRow = dest->findRowByInstance(instance);
921 if (dRow >= 0)
922 {
923 for(int j = 0; j < m_columns->size(); j++)
924 {
925 DCTableColumn *cd = m_columns->get(j);
926 if ((cd == NULL) || cd->isInstanceColumn() || (cd->getDataType() == DCI_DT_STRING))
927 continue;
928 int column = dest->getColumnIndex(cd->getName());
929 if (column == -1)
930 continue;
931
932 if (cd->getDataType() == DCI_DT_FLOAT)
933 {
934 double sval = src->getAsDouble(sRow, column);
935 double dval = dest->getAsDouble(dRow, column);
936
937 RECALCULATE_VALUE(dval, sval, cd->getAggregationFunction(), count);
938
939 dest->setAt(dRow, column, dval);
940 }
941 else if ((cd->getDataType() == DCI_DT_UINT) || (cd->getDataType() == DCI_DT_UINT64))
942 {
943 UINT64 sval = src->getAsUInt64(sRow, column);
944 UINT64 dval = dest->getAsUInt64(dRow, column);
945
946 RECALCULATE_VALUE(dval, sval, cd->getAggregationFunction(), count);
947
948 dest->setAt(dRow, column, dval);
949 }
950 else
951 {
952 INT64 sval = src->getAsInt64(sRow, column);
953 INT64 dval = dest->getAsInt64(dRow, column);
954
955 RECALCULATE_VALUE(dval, sval, cd->getAggregationFunction(), count);
956
957 dest->setAt(dRow, column, dval);
958 }
959 }
960 }
961 else
962 {
963 // no such instance
964 dest->copyRow(src, sRow);
965 }
966 }
967 }
968
969 /**
970 * Update columns in resulting table according to definition
971 */
972 void DCTable::updateResultColumns(Table *t)
973 {
974 lock();
975 for(int i = 0; i < m_columns->size(); i++)
976 {
977 DCTableColumn *col = m_columns->get(i);
978 int index = t->getColumnIndex(col->getName());
979 if (index != -1)
980 {
981 TableColumnDefinition *cd = t->getColumnDefinitions()->get(index);
982 if (cd != NULL)
983 {
984 cd->setDataType(col->getDataType());
985 cd->setInstanceColumn(col->isInstanceColumn());
986 cd->setDisplayName(col->getDisplayName());
987 }
988 }
989 }
990 unlock();
991 }
992
993 /**
994 * Update from template item
995 */
996 void DCTable::updateFromTemplate(DCObject *src)
997 {
998 DCObject::updateFromTemplate(src);
999
1000 if (src->getType() != DCO_TYPE_TABLE)
1001 {
1002 DbgPrintf(2, _T("INTERNAL ERROR: DCTable::updateFromTemplate(%d, %d): source type is %d"), (int)m_id, (int)src->getId(), src->getType());
1003 return;
1004 }
1005
1006 lock();
1007 DCTable *table = (DCTable *)src;
1008
1009 m_columns->clear();
1010 for(int i = 0; i < table->m_columns->size(); i++)
1011 m_columns->add(new DCTableColumn(table->m_columns->get(i)));
1012
1013 m_thresholds->clear();
1014 for(int i = 0; i < table->m_thresholds->size(); i++)
1015 m_thresholds->add(new DCTableThreshold(table->m_thresholds->get(i)));
1016
1017 unlock();
1018 }
1019
1020 /**
1021 * Create management pack record
1022 */
1023 void DCTable::createExportRecord(String &str)
1024 {
1025 lock();
1026
1027 str.appendFormattedString(_T("\t\t\t\t<dctable id=\"%d\">\n")
1028 _T("\t\t\t\t\t<guid>%s</guid>\n")
1029 _T("\t\t\t\t\t<name>%s</name>\n")
1030 _T("\t\t\t\t\t<description>%s</description>\n")
1031 _T("\t\t\t\t\t<origin>%d</origin>\n")
1032 _T("\t\t\t\t\t<interval>%d</interval>\n")
1033 _T("\t\t\t\t\t<retention>%d</retention>\n")
1034 _T("\t\t\t\t\t<systemTag>%s</systemTag>\n")
1035 _T("\t\t\t\t\t<flags>%d</flags>\n")
1036 _T("\t\t\t\t\t<snmpPort>%d</snmpPort>\n"),
1037 (int)m_id, (const TCHAR *)m_guid.toString(),
1038 (const TCHAR *)EscapeStringForXML2(m_name),
1039 (const TCHAR *)EscapeStringForXML2(m_description),
1040 (int)m_source, m_iPollingInterval, m_iRetentionTime,
1041 (const TCHAR *)EscapeStringForXML2(m_systemTag),
1042 (int)m_flags, (int)m_snmpPort);
1043
1044 if (m_transformationScriptSource != NULL)
1045 {
1046 str.append(_T("\t\t\t\t\t<transformation>"));
1047 str.appendPreallocated(EscapeStringForXML(m_transformationScriptSource, -1));
1048 str.append(_T("</transformation>\n"));
1049 }
1050
1051 if ((m_schedules != NULL) && (m_schedules->size() > 0))
1052 {
1053 str.append(_T("\t\t\t\t\t<schedules>\n"));
1054 for(int i = 0; i < m_schedules->size(); i++)
1055 {
1056 str.append(_T("\t\t\t\t\t\t<schedule>"));
1057 str.append(EscapeStringForXML2(m_schedules->get(i)));
1058 str.append(_T("</schedule>\n"));
1059 }
1060 str.append(_T("\t\t\t\t\t</schedules>\n"));
1061 }
1062
1063 if (m_columns != NULL)
1064 {
1065 str += _T("\t\t\t\t\t<columns>\n");
1066 for(int i = 0; i < m_columns->size(); i++)
1067 {
1068 m_columns->get(i)->createNXMPRecord(str, i + 1);
1069 }
1070 str += _T("\t\t\t\t\t</columns>\n");
1071 }
1072
1073 if (m_thresholds != NULL)
1074 {
1075 str += _T("\t\t\t\t\t<thresholds>\n");
1076 for(int i = 0; i < m_thresholds->size(); i++)
1077 {
1078 m_thresholds->get(i)->createNXMPRecord(str, i + 1);
1079 }
1080 str += _T("\t\t\t\t\t</thresholds>\n");
1081 }
1082
1083 if (m_pszPerfTabSettings != NULL)
1084 {
1085 str.append(_T("\t\t\t\t\t<perfTabSettings>"));
1086 str.appendPreallocated(EscapeStringForXML(m_pszPerfTabSettings, -1));
1087 str.append(_T("</perfTabSettings>\n"));
1088 }
1089
1090 unlock();
1091 str.append(_T("\t\t\t\t</dctable>\n"));
1092 }
1093
1094 /**
1095 * Create DCObject from import file
1096 */
1097 void DCTable::updateFromImport(ConfigEntry *config)
1098 {
1099 DCObject::updateFromImport(config);
1100
1101 lock();
1102 m_columns->clear();
1103 ConfigEntry *columnsRoot = config->findEntry(_T("columns"));
1104 if (columnsRoot != NULL)
1105 {
1106 ObjectArray<ConfigEntry> *columns = columnsRoot->getSubEntries(_T("column#*"));
1107 for(int i = 0; i < columns->size(); i++)
1108 {
1109 m_columns->add(new DCTableColumn(columns->get(i)));
1110 }
1111 delete columns;
1112 }
1113
1114 m_thresholds->clear();
1115 ConfigEntry *thresholdsRoot = config->findEntry(_T("thresholds"));
1116 if (thresholdsRoot != NULL)
1117 {
1118 ObjectArray<ConfigEntry> *thresholds = thresholdsRoot->getSubEntries(_T("threshold#*"));
1119 for(int i = 0; i < thresholds->size(); i++)
1120 {
1121 m_thresholds->add(new DCTableThreshold(thresholds->get(i)));
1122 }
1123 delete thresholds;
1124 }
1125 unlock();
1126 }
1127
1128 /**
1129 * Should return true if object has (or can have) value
1130 */
1131 bool DCTable::hasValue()
1132 {
1133 if (m_owner->getObjectClass() == OBJECT_CLUSTER)
1134 return isAggregateOnCluster();
1135 return true;
1136 }