fixed server crash when reconciling table DCI data with agent
[public/netxms.git] / src / server / core / dctable.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2016 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: dctable.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 /**
26 * Column ID cache
27 */
28 TC_ID_MAP_ENTRY *DCTable::m_cache = NULL;
29 int DCTable::m_cacheSize = 0;
30 int DCTable::m_cacheAllocated = 0;
31 MUTEX DCTable::m_cacheMutex = MutexCreate();
32
33 /**
34 * Compare cache element's name to string key
35 */
36 static int CompareCacheElements(const void *key, const void *element)
37 {
38 return _tcsicmp((const TCHAR *)key, ((TC_ID_MAP_ENTRY *)element)->name);
39 }
40
41 /**
42 * Compare names of two cache elements
43 */
44 static int CompareCacheElements2(const void *e1, const void *e2)
45 {
46 return _tcsicmp(((TC_ID_MAP_ENTRY *)e1)->name, ((TC_ID_MAP_ENTRY *)e2)->name);
47 }
48
49 /**
50 * Get column ID from column name
51 */
52 INT32 DCTable::columnIdFromName(const TCHAR *name)
53 {
54 TC_ID_MAP_ENTRY buffer;
55
56 // check that column name is valid
57 if ((name == NULL) || (*name == 0))
58 return 0;
59
60 MutexLock(m_cacheMutex);
61
62 TC_ID_MAP_ENTRY *entry = (TC_ID_MAP_ENTRY *)bsearch(name, m_cache, m_cacheSize, sizeof(TC_ID_MAP_ENTRY), CompareCacheElements);
63 if (entry == NULL)
64 {
65 // Not in cache, go to database
66 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
67
68 DB_STATEMENT hStmt = DBPrepare(hdb, _T("SELECT column_id FROM dct_column_names WHERE column_name=?"));
69 if (hStmt != NULL)
70 {
71 DBBind(hStmt, 1, DB_SQLTYPE_VARCHAR, name, DB_BIND_STATIC);
72 DB_RESULT hResult = DBSelectPrepared(hStmt);
73 if (hResult != NULL)
74 {
75 entry = &buffer;
76 nx_strncpy(entry->name, name, MAX_COLUMN_NAME);
77 if (DBGetNumRows(hResult) > 0)
78 {
79 // found in database
80 entry->id = DBGetFieldLong(hResult, 0, 0);
81 }
82 else
83 {
84 // no such column name in database
85 entry->id = CreateUniqueId(IDG_DCT_COLUMN);
86
87 // update database
88 DB_STATEMENT hStmt2 = DBPrepare(hdb, _T("INSERT INTO dct_column_names (column_id,column_name) VALUES (?,?)"));
89 if (hStmt2 != NULL)
90 {
91 DBBind(hStmt2, 1, DB_SQLTYPE_INTEGER, entry->id);
92 DBBind(hStmt2, 2, DB_SQLTYPE_VARCHAR, name, DB_BIND_STATIC);
93 DBExecute(hStmt2);
94 DBFreeStatement(hStmt2);
95 }
96 }
97
98 DBFreeResult(hResult);
99
100 // Add to cache
101 if (m_cacheSize == m_cacheAllocated)
102 {
103 m_cacheAllocated += 16;
104 m_cache = (TC_ID_MAP_ENTRY *)realloc(m_cache, sizeof(TC_ID_MAP_ENTRY) * m_cacheAllocated);
105 }
106 memcpy(&m_cache[m_cacheSize++], entry, sizeof(TC_ID_MAP_ENTRY));
107 qsort(m_cache, m_cacheSize, sizeof(TC_ID_MAP_ENTRY), CompareCacheElements2);
108
109 DbgPrintf(6, _T("DCTable::columnIdFromName(): column name %s added to cache, ID=%d"), name, (int)entry->id);
110 }
111 DBFreeStatement(hStmt);
112 }
113
114 DBConnectionPoolReleaseConnection(hdb);
115 }
116
117 MutexUnlock(m_cacheMutex);
118 return (entry != NULL) ? entry->id : 0;
119 }
120
121 /**
122 * Default constructor
123 */
124 DCTable::DCTable() : DCObject()
125 {
126 m_columns = new ObjectArray<DCTableColumn>(8, 8, true);
127 m_thresholds = new ObjectArray<DCTableThreshold>(0, 4, true);
128 m_lastValue = NULL;
129 }
130
131 /**
132 * Copy constructor
133 */
134 DCTable::DCTable(const DCTable *src) : DCObject(src)
135 {
136 m_columns = new ObjectArray<DCTableColumn>(src->m_columns->size(), 8, true);
137 for(int i = 0; i < src->m_columns->size(); i++)
138 m_columns->add(new DCTableColumn(src->m_columns->get(i)));
139 m_thresholds = new ObjectArray<DCTableThreshold>(src->m_thresholds->size(), 4, true);
140 for(int i = 0; i < src->m_thresholds->size(); i++)
141 m_thresholds->add(new DCTableThreshold(src->m_thresholds->get(i)));
142 m_lastValue = NULL;
143 }
144
145 /**
146 * Constructor for creating new DCTable from scratch
147 */
148 DCTable::DCTable(UINT32 id, const TCHAR *name, int source, int pollingInterval, int retentionTime,
149 Template *node, const TCHAR *description, const TCHAR *systemTag)
150 : DCObject(id, name, source, pollingInterval, retentionTime, node, description, systemTag)
151 {
152 m_columns = new ObjectArray<DCTableColumn>(8, 8, true);
153 m_thresholds = new ObjectArray<DCTableThreshold>(0, 4, true);
154 m_lastValue = NULL;
155 }
156
157 /**
158 * Constructor for creating DCTable from database
159 * Assumes that fields in SELECT query are in following order:
160 * item_id,template_id,template_item_id,name,
161 * description,flags,source,snmp_port,polling_interval,retention_time,
162 * status,system_tag,resource_id,proxy_node,perftab_settings,
163 * transformation_script,comments,guid
164 */
165 DCTable::DCTable(DB_HANDLE hdb, DB_RESULT hResult, int iRow, Template *pNode) : DCObject()
166 {
167 m_id = DBGetFieldULong(hResult, iRow, 0);
168 m_dwTemplateId = DBGetFieldULong(hResult, iRow, 1);
169 m_dwTemplateItemId = DBGetFieldULong(hResult, iRow, 2);
170 DBGetField(hResult, iRow, 3, m_name, MAX_ITEM_NAME);
171 DBGetField(hResult, iRow, 4, m_description, MAX_DB_STRING);
172 m_flags = (WORD)DBGetFieldLong(hResult, iRow, 5);
173 m_source = (BYTE)DBGetFieldLong(hResult, iRow, 6);
174 m_snmpPort = (WORD)DBGetFieldLong(hResult, iRow, 7);
175 m_iPollingInterval = DBGetFieldLong(hResult, iRow, 8);
176 m_iRetentionTime = DBGetFieldLong(hResult, iRow, 9);
177 m_status = (BYTE)DBGetFieldLong(hResult, iRow, 10);
178 DBGetField(hResult, iRow, 11, m_systemTag, MAX_DB_STRING);
179 m_dwResourceId = DBGetFieldULong(hResult, iRow, 12);
180 m_sourceNode = DBGetFieldULong(hResult, iRow, 13);
181 m_pszPerfTabSettings = DBGetField(hResult, iRow, 14, NULL, 0);
182 TCHAR *pszTmp = DBGetField(hResult, iRow, 15, NULL, 0);
183 m_comments = DBGetField(hResult, iRow, 16, NULL, 0);
184 m_guid = DBGetFieldGUID(hResult, iRow, 17);
185 setTransformationScript(pszTmp);
186 free(pszTmp);
187
188 m_owner = pNode;
189 m_columns = new ObjectArray<DCTableColumn>(8, 8, true);
190 m_lastValue = NULL;
191
192 DB_STATEMENT hStmt = DBPrepare(hdb, _T("SELECT column_name,flags,snmp_oid,display_name FROM dc_table_columns WHERE table_id=? ORDER BY sequence_number"));
193 if (hStmt != NULL)
194 {
195 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
196 DB_RESULT hColumnList = DBSelectPrepared(hStmt);
197 if (hColumnList != NULL)
198 {
199 int count = DBGetNumRows(hColumnList);
200 for(int i = 0; i < count; i++)
201 m_columns->add(new DCTableColumn(hColumnList, i));
202 DBFreeResult(hColumnList);
203 }
204 DBFreeStatement(hStmt);
205 }
206
207 loadCustomSchedules(hdb);
208
209 m_thresholds = new ObjectArray<DCTableThreshold>(0, 4, true);
210 loadThresholds(hdb);
211 }
212
213 /**
214 * Create DCTable from import file
215 */
216 DCTable::DCTable(ConfigEntry *config, Template *owner) : DCObject(config, owner)
217 {
218 ConfigEntry *columnsRoot = config->findEntry(_T("columns"));
219 if (columnsRoot != NULL)
220 {
221 ObjectArray<ConfigEntry> *columns = columnsRoot->getSubEntries(_T("column#*"));
222 m_columns = new ObjectArray<DCTableColumn>(columns->size(), 8, true);
223 for(int i = 0; i < columns->size(); i++)
224 {
225 m_columns->add(new DCTableColumn(columns->get(i)));
226 }
227 delete columns;
228 }
229 else
230 {
231 m_columns = new ObjectArray<DCTableColumn>(8, 8, true);
232 }
233
234 ConfigEntry *thresholdsRoot = config->findEntry(_T("thresholds"));
235 if (thresholdsRoot != NULL)
236 {
237 ObjectArray<ConfigEntry> *thresholds = thresholdsRoot->getSubEntries(_T("threshold#*"));
238 m_thresholds = new ObjectArray<DCTableThreshold>(thresholds->size(), 8, true);
239 for(int i = 0; i < thresholds->size(); i++)
240 {
241 m_thresholds->add(new DCTableThreshold(thresholds->get(i)));
242 }
243 delete thresholds;
244 }
245 else
246 {
247 m_thresholds = new ObjectArray<DCTableThreshold>(0, 4, true);
248 }
249
250 m_lastValue = NULL;
251 }
252
253 /**
254 * Destructor
255 */
256 DCTable::~DCTable()
257 {
258 delete m_columns;
259 delete m_thresholds;
260 if (m_lastValue != NULL)
261 m_lastValue->decRefCount();
262 }
263
264 /**
265 * Clean expired data
266 */
267 void DCTable::deleteExpiredData()
268 {
269 TCHAR query[256];
270 time_t now;
271
272 now = time(NULL);
273
274 lock();
275 _sntprintf(query, 256, _T("DELETE FROM tdata_%d WHERE (item_id=%d) AND (tdata_timestamp<%ld)"),
276 (int)m_owner->getId(), (int)m_id, (long)(now - (time_t)m_iRetentionTime * 86400));
277 unlock();
278
279 QueueSQLRequest(query);
280 }
281
282 /**
283 * Delete all collected data
284 */
285 bool DCTable::deleteAllData()
286 {
287 TCHAR szQuery[256];
288 bool success;
289
290 lock();
291 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
292 _sntprintf(szQuery, 256, _T("DELETE FROM tdata_%d WHERE item_id=%d"), m_owner->getId(), (int)m_id);
293 success = DBQuery(hdb, szQuery) ? true : false;
294 DBConnectionPoolReleaseConnection(hdb);
295 unlock();
296 return success;
297 }
298
299 /**
300 * Process new collected value. Should return true on success.
301 * If returns false, current poll result will be converted into data collection error.
302 *
303 * @return true on success
304 */
305 bool DCTable::processNewValue(time_t timestamp, const void *value, bool *updateStatus)
306 {
307 *updateStatus = false;
308 lock();
309
310 // Normally m_owner shouldn't be NULL for polled items, but who knows...
311 if (m_owner == NULL)
312 {
313 unlock();
314 return false;
315 }
316
317 // Transform input value
318 // Cluster can have only aggregated data, and transformation
319 // should not be used on aggregation
320 if ((m_owner->getObjectClass() != OBJECT_CLUSTER) || (m_flags & DCF_TRANSFORM_AGGREGATED))
321 {
322 if (!transform((Table *)value))
323 {
324 unlock();
325 ((Table *)value)->decRefCount();
326 return false;
327 }
328 }
329
330 m_dwErrorCount = 0;
331 if (m_lastValue != NULL)
332 m_lastValue->decRefCount();
333 m_lastValue = (Table *)value;
334 m_lastValue->setTitle(m_description);
335 m_lastValue->setSource(m_source);
336
337 // Copy required fields into local variables
338 UINT32 tableId = m_id;
339 UINT32 nodeId = m_owner->getId();
340 bool save = (m_flags & DCF_NO_STORAGE) == 0;
341
342 ((Table *)value)->incRefCount();
343
344 unlock();
345
346 // Save data to database
347 // Object is unlocked, so only local variables can be used
348 if (save)
349 {
350 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
351 if (!DBBegin(hdb))
352 {
353 DBConnectionPoolReleaseConnection(hdb);
354 return true;
355 }
356
357 INT64 recordId = ((INT64)timestamp << 30) | (((INT64)tableId & 0xFFFF) << 14);
358 BOOL success = FALSE;
359 Table *data = (Table *)value;
360
361 TCHAR query[256];
362 _sntprintf(query, 256, _T("INSERT INTO tdata_%d (item_id,tdata_timestamp,record_id) VALUES (?,?,?)"), (int)nodeId);
363 DB_STATEMENT hStmt = DBPrepare(hdb, query);
364 if (hStmt != NULL)
365 {
366 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, tableId);
367 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (INT32)timestamp);
368 DBBind(hStmt, 3, DB_SQLTYPE_BIGINT, recordId);
369 success = DBExecute(hStmt);
370 DBFreeStatement(hStmt);
371 }
372
373 if (success)
374 {
375 _sntprintf(query, 256, _T("INSERT INTO tdata_records_%d (record_id,row_id,instance) VALUES (?,?,?)"), (int)nodeId);
376 DB_STATEMENT hStmt = DBPrepare(hdb, query);
377 if (hStmt != NULL)
378 {
379 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, recordId);
380 for(int row = 0; row < data->getNumRows(); row++)
381 {
382 TCHAR instance[MAX_RESULT_LENGTH];
383 data->buildInstanceString(row, instance, MAX_RESULT_LENGTH);
384 DBBind(hStmt, 2, DB_SQLTYPE_BIGINT, recordId | (INT64)row);
385 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, instance, DB_BIND_STATIC);
386 success = DBExecute(hStmt);
387 if (!success)
388 break;
389 }
390 DBFreeStatement(hStmt);
391 }
392 }
393
394 if (success)
395 {
396 _sntprintf(query, 256, _T("INSERT INTO tdata_rows_%d (row_id,column_id,value) VALUES (?,?,?)"), (int)nodeId);
397 DB_STATEMENT hStmt = DBPrepare(hdb, query);
398 if (hStmt != NULL)
399 {
400 for(int col = 0; col < data->getNumColumns(); col++)
401 {
402 INT32 colId = columnIdFromName(data->getColumnName(col));
403 if (colId == 0)
404 continue; // cannot get column ID
405
406 for(int row = 0; row < data->getNumRows(); row++)
407 {
408 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, recordId | (INT64)row);
409 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, colId);
410 const TCHAR *s = data->getAsString(row, col);
411 if ((s == NULL) || (_tcslen(s) < MAX_DB_STRING))
412 {
413 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, s, DB_BIND_STATIC);
414 }
415 else
416 {
417 TCHAR *sp = (TCHAR *)nx_memdup(s, MAX_DB_STRING * sizeof(TCHAR));
418 sp[MAX_DB_STRING - 1] = 0;
419 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, sp, DB_BIND_DYNAMIC);
420 }
421 success = DBExecute(hStmt);
422 if (!success)
423 break;
424 }
425 }
426 DBFreeStatement(hStmt);
427 }
428 }
429
430 if (success)
431 DBCommit(hdb);
432 else
433 DBRollback(hdb);
434
435 DBConnectionPoolReleaseConnection(hdb);
436 }
437 if ((g_offlineDataRelevanceTime <= 0) || (timestamp > (time(NULL) - g_offlineDataRelevanceTime)))
438 checkThresholds((Table *)value);
439
440 if (g_flags & AF_PERFDATA_STORAGE_DRIVER_LOADED)
441 PerfDataStorageRequest(this, timestamp, (Table *)value);
442
443 ((Table *)value)->decRefCount();
444 return true;
445 }
446
447 /**
448 * Transform received value
449 */
450 bool DCTable::transform(Table *value)
451 {
452 if (m_transformationScript == NULL)
453 return true;
454
455 bool success = false;
456 NXSL_VM *vm = new NXSL_VM(new NXSL_ServerEnv());
457 if (vm->load(m_transformationScript))
458 {
459 NXSL_Value *nxslValue = new NXSL_Value(new NXSL_Object(&g_nxslStaticTableClass, value));
460 vm->setGlobalVariable(_T("$object"), m_owner->createNXSLObject());
461 if (m_owner->getObjectClass() == OBJECT_NODE)
462 {
463 vm->setGlobalVariable(_T("$node"), m_owner->createNXSLObject());
464 }
465 vm->setGlobalVariable(_T("$dci"), createNXSLObject());
466 vm->setGlobalVariable(_T("$isCluster"), new NXSL_Value((m_owner->getObjectClass() == OBJECT_CLUSTER) ? 1 : 0));
467
468 // remove lock from DCI for script execution to avoid deadlocks
469 unlock();
470 success = vm->run(1, &nxslValue);
471 lock();
472 if (!success)
473 {
474 if (vm->getErrorCode() == NXSL_ERR_EXECUTION_ABORTED)
475 {
476 DbgPrintf(6, _T("Transformation script for DCI \"%s\" [%d] on node %s [%d] aborted"),
477 m_description, m_id, getOwnerName(), getOwnerId());
478 }
479 else
480 {
481 TCHAR buffer[1024];
482 _sntprintf(buffer, 1024, _T("DCI::%s::%d::TransformationScript"), getOwnerName(), m_id);
483 PostDciEvent(EVENT_SCRIPT_ERROR, g_dwMgmtNode, m_id, "ssd", buffer, vm->getErrorText(), m_id);
484 nxlog_write(MSG_TRANSFORMATION_SCRIPT_EXECUTION_ERROR, NXLOG_WARNING, "dsdss",
485 getOwnerId(), getOwnerName(), m_id, m_name, vm->getErrorText());
486 }
487 }
488 }
489 else
490 {
491 TCHAR buffer[1024];
492 _sntprintf(buffer, 1024, _T("DCI::%s::%d::TransformationScript"), getOwnerName(), m_id);
493 PostDciEvent(EVENT_SCRIPT_ERROR, g_dwMgmtNode, m_id, "ssd", buffer, vm->getErrorText(), m_id);
494 nxlog_write(MSG_TRANSFORMATION_SCRIPT_EXECUTION_ERROR, NXLOG_WARNING, "dsdss",
495 getOwnerId(), getOwnerName(), m_id, m_name, vm->getErrorText());
496 }
497 delete vm;
498 return success;
499 }
500
501 /**
502 * Check thresholds
503 */
504 void DCTable::checkThresholds(Table *value)
505 {
506 static const TCHAR *paramNames[] = { _T("dciName"), _T("dciDescription"), _T("dciId"), _T("row"), _T("instance") };
507
508 lock();
509 for(int row = 0; row < value->getNumRows(); row++)
510 {
511 TCHAR instance[MAX_RESULT_LENGTH];
512 value->buildInstanceString(row, instance, MAX_RESULT_LENGTH);
513 for(int i = 0; i < m_thresholds->size(); i++)
514 {
515 DCTableThreshold *t = m_thresholds->get(i);
516 ThresholdCheckResult result = t->check(value, row, instance);
517 switch(result)
518 {
519 case ACTIVATED:
520 PostDciEventWithNames(t->getActivationEvent(), m_owner->getId(), m_id, "ssids", paramNames, m_name, m_description, m_id, row, instance);
521 if (!(m_flags & DCF_ALL_THRESHOLDS))
522 i = m_thresholds->size(); // Stop processing (for current row)
523 break;
524 case DEACTIVATED:
525 PostDciEventWithNames(t->getDeactivationEvent(), m_owner->getId(), m_id, "ssids", paramNames, m_name, m_description, m_id, row, instance);
526 break;
527 case ALREADY_ACTIVE:
528 i = m_thresholds->size(); // Threshold condition still true, stop processing
529 break;
530 default:
531 break;
532 }
533 }
534 }
535 unlock();
536 }
537
538 /**
539 * Process new data collection error
540 */
541 void DCTable::processNewError(bool noInstance)
542 {
543 m_dwErrorCount++;
544 }
545
546 /**
547 * Save to database
548 */
549 bool DCTable::saveToDatabase(DB_HANDLE hdb)
550 {
551 DB_STATEMENT hStmt;
552 if (IsDatabaseRecordExist(hdb, _T("dc_tables"), _T("item_id"), m_id))
553 {
554 hStmt = DBPrepare(hdb, _T("UPDATE dc_tables SET node_id=?,template_id=?,template_item_id=?,name=?,")
555 _T("description=?,flags=?,source=?,snmp_port=?,polling_interval=?,")
556 _T("retention_time=?,status=?,system_tag=?,resource_id=?,proxy_node=?,")
557 _T("perftab_settings=?,transformation_script=?,comments=?,guid=? WHERE item_id=?"));
558 }
559 else
560 {
561 hStmt = DBPrepare(hdb, _T("INSERT INTO dc_tables (node_id,template_id,template_item_id,name,")
562 _T("description,flags,source,snmp_port,polling_interval,")
563 _T("retention_time,status,system_tag,resource_id,proxy_node,perftab_settings,")
564 _T("transformation_script,comments,guid,item_id) ")
565 _T("VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));
566 }
567 if (hStmt == NULL)
568 return FALSE;
569
570 lock();
571
572 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (m_owner == NULL) ? (UINT32)0 : m_owner->getId());
573 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, m_dwTemplateId);
574 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, m_dwTemplateItemId);
575 DBBind(hStmt, 4, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
576 DBBind(hStmt, 5, DB_SQLTYPE_VARCHAR, m_description, DB_BIND_STATIC);
577 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (UINT32)m_flags);
578 DBBind(hStmt, 7, DB_SQLTYPE_INTEGER, (INT32)m_source);
579 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, (UINT32)m_snmpPort);
580 DBBind(hStmt, 9, DB_SQLTYPE_INTEGER, (INT32)m_iPollingInterval);
581 DBBind(hStmt, 10, DB_SQLTYPE_INTEGER, (INT32)m_iRetentionTime);
582 DBBind(hStmt, 11, DB_SQLTYPE_INTEGER, (INT32)m_status);
583 DBBind(hStmt, 12, DB_SQLTYPE_VARCHAR, m_systemTag, DB_BIND_STATIC);
584 DBBind(hStmt, 13, DB_SQLTYPE_INTEGER, m_dwResourceId);
585 DBBind(hStmt, 14, DB_SQLTYPE_INTEGER, m_sourceNode);
586 DBBind(hStmt, 15, DB_SQLTYPE_TEXT, m_pszPerfTabSettings, DB_BIND_STATIC);
587 DBBind(hStmt, 16, DB_SQLTYPE_TEXT, m_transformationScriptSource, DB_BIND_STATIC);
588 DBBind(hStmt, 17, DB_SQLTYPE_TEXT, m_comments, DB_BIND_STATIC);
589 DBBind(hStmt, 18, DB_SQLTYPE_VARCHAR, m_guid);
590 DBBind(hStmt, 19, DB_SQLTYPE_INTEGER, m_id);
591
592 bool result = DBExecute(hStmt);
593 DBFreeStatement(hStmt);
594
595 if (result)
596 {
597 // Save column configuration
598 hStmt = DBPrepare(hdb, _T("DELETE FROM dc_table_columns WHERE table_id=?"));
599 if (hStmt != NULL)
600 {
601 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
602 result = DBExecute(hStmt);
603 DBFreeStatement(hStmt);
604 }
605 else
606 {
607 result = false;
608 }
609
610 if (result && (m_columns->size() > 0))
611 {
612 hStmt = DBPrepare(hdb, _T("INSERT INTO dc_table_columns (table_id,sequence_number,column_name,snmp_oid,flags,display_name) VALUES (?,?,?,?,?,?)"));
613 if (hStmt != NULL)
614 {
615 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
616 for(int i = 0; i < m_columns->size(); i++)
617 {
618 DCTableColumn *column = m_columns->get(i);
619 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (INT32)(i + 1));
620 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, column->getName(), DB_BIND_STATIC);
621 SNMP_ObjectId *oid = column->getSnmpOid();
622 DBBind(hStmt, 4, DB_SQLTYPE_VARCHAR, (oid != NULL) ? (const TCHAR *)oid->toString() : NULL, DB_BIND_TRANSIENT);
623 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (INT32)column->getFlags());
624 DBBind(hStmt, 6, DB_SQLTYPE_VARCHAR, column->getDisplayName(), DB_BIND_STATIC);
625
626 result = DBExecute(hStmt);
627 if (!result)
628 break;
629 }
630 DBFreeStatement(hStmt);
631 }
632 else
633 {
634 result = false;
635 }
636 }
637 }
638
639 saveThresholds(hdb);
640
641 unlock();
642 return result ? DCObject::saveToDatabase(hdb) : false;
643 }
644
645 /**
646 * Load thresholds from database
647 */
648 bool DCTable::loadThresholds(DB_HANDLE hdb)
649 {
650 DB_STATEMENT hStmt = DBPrepare(hdb, _T("SELECT id,activation_event,deactivation_event FROM dct_thresholds WHERE table_id=? ORDER BY sequence_number"));
651 if (hStmt == NULL)
652 return false;
653
654 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
655 DB_RESULT hResult = DBSelectPrepared(hStmt);
656 if (hResult != NULL)
657 {
658 int count = DBGetNumRows(hResult);
659 for(int i = 0; i < count; i++)
660 {
661 DCTableThreshold *t = new DCTableThreshold(hdb, hResult, i);
662 m_thresholds->add(t);
663 }
664 DBFreeResult(hResult);
665 }
666 DBFreeStatement(hStmt);
667 return true;
668 }
669
670 /**
671 * Save thresholds to database
672 */
673 bool DCTable::saveThresholds(DB_HANDLE hdb)
674 {
675 DB_STATEMENT hStmt = DBPrepare(hdb, _T("DELETE FROM dct_threshold_conditions WHERE threshold_id=?"));
676 if (hStmt == NULL)
677 return false;
678
679 for(int i = 0; i < m_thresholds->size(); i++)
680 {
681 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_thresholds->get(i)->getId());
682 DBExecute(hStmt);
683 }
684 DBFreeStatement(hStmt);
685
686 hStmt = DBPrepare(hdb, _T("DELETE FROM dct_thresholds WHERE table_id=?"));
687 if (hStmt == NULL)
688 return false;
689 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
690 DBExecute(hStmt);
691 DBFreeStatement(hStmt);
692
693 for(int i = 0; i < m_thresholds->size(); i++)
694 m_thresholds->get(i)->saveToDatabase(hdb, m_id, i);
695 return true;
696 }
697
698 /**
699 * Delete table object and collected data from database
700 */
701 void DCTable::deleteFromDatabase()
702 {
703 TCHAR szQuery[256];
704
705 DCObject::deleteFromDatabase();
706
707 if(m_owner->getObjectClass() != OBJECT_TEMPLATE)
708 {
709 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM tdata_%d WHERE item_id=%d"), m_owner->getId(), (int)m_id);
710 QueueSQLRequest(szQuery);
711 }
712
713 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM dc_tables WHERE item_id=%d"), (int)m_id);
714 QueueSQLRequest(szQuery);
715 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM dc_table_columns WHERE table_id=%d"), (int)m_id);
716 QueueSQLRequest(szQuery);
717
718 for(int i = 0; i < m_thresholds->size(); i++)
719 {
720 _sntprintf(szQuery, 256, _T("DELETE FROM dct_threshold_conditions WHERE threshold_id=%d"), (int)m_thresholds->get(i)->getId());
721 QueueSQLRequest(szQuery);
722 }
723
724 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM dct_thresholds WHERE table_id=%d"), (int)m_id);
725 QueueSQLRequest(szQuery);
726 }
727
728 /**
729 * Create NXCP message with item data
730 */
731 void DCTable::createMessage(NXCPMessage *pMsg)
732 {
733 DCObject::createMessage(pMsg);
734
735 lock();
736 pMsg->setField(VID_NUM_COLUMNS, (UINT32)m_columns->size());
737 UINT32 varId = VID_DCI_COLUMN_BASE;
738 for(int i = 0; i < m_columns->size(); i++)
739 {
740 DCTableColumn *column = m_columns->get(i);
741 pMsg->setField(varId++, column->getName());
742 pMsg->setField(varId++, column->getFlags());
743 SNMP_ObjectId *oid = column->getSnmpOid();
744 if (oid != NULL)
745 pMsg->setFieldFromInt32Array(varId++, (UINT32)oid->length(), oid->value());
746 else
747 varId++;
748 pMsg->setField(varId++, column->getDisplayName());
749 varId += 6;
750 }
751
752 pMsg->setField(VID_NUM_THRESHOLDS, (UINT32)m_thresholds->size());
753 varId = VID_DCI_THRESHOLD_BASE;
754 for(int i = 0; i < m_thresholds->size(); i++)
755 {
756 varId = m_thresholds->get(i)->fillMessage(pMsg, varId);
757 }
758
759 unlock();
760 }
761
762 /**
763 * Update data collection object from NXCP message
764 */
765 void DCTable::updateFromMessage(NXCPMessage *pMsg)
766 {
767 DCObject::updateFromMessage(pMsg);
768
769 lock();
770
771 m_columns->clear();
772 int count = (int)pMsg->getFieldAsUInt32(VID_NUM_COLUMNS);
773 UINT32 varId = VID_DCI_COLUMN_BASE;
774 for(int i = 0; i < count; i++)
775 {
776 m_columns->add(new DCTableColumn(pMsg, varId));
777 varId += 10;
778 }
779
780 count = (int)pMsg->getFieldAsUInt32(VID_NUM_THRESHOLDS);
781 ObjectArray<DCTableThreshold> *newThresholds = new ObjectArray<DCTableThreshold>(count, 8, true);
782 varId = VID_DCI_THRESHOLD_BASE;
783 for(int i = 0; i < count; i++)
784 {
785 DCTableThreshold *t = new DCTableThreshold(pMsg, &varId);
786 newThresholds->add(t);
787 for(int j = 0; j < m_thresholds->size(); j++)
788 {
789 DCTableThreshold *old = m_thresholds->get(j);
790 if (old->getId() == t->getId())
791 {
792 t->copyState(old);
793 break;
794 }
795 }
796 }
797 delete m_thresholds;
798 m_thresholds = newThresholds;
799
800 unlock();
801 }
802
803 /**
804 * Get last collected value
805 */
806 void DCTable::fillLastValueMessage(NXCPMessage *msg)
807 {
808 lock();
809 if (m_lastValue != NULL)
810 {
811 m_lastValue->fillMessage(*msg, 0, -1);
812 }
813 unlock();
814 }
815
816 /**
817 * Get summary of last collected value (to show along simple DCI values)
818 */
819 void DCTable::fillLastValueSummaryMessage(NXCPMessage *pMsg, UINT32 dwId)
820 {
821 lock();
822 pMsg->setField(dwId++, m_id);
823 pMsg->setField(dwId++, m_name);
824 pMsg->setField(dwId++, m_description);
825 pMsg->setField(dwId++, (WORD)m_source);
826 pMsg->setField(dwId++, (WORD)DCI_DT_NULL); // compatibility: data type
827 pMsg->setField(dwId++, _T("")); // compatibility: value
828 pMsg->setField(dwId++, (UINT32)m_tLastPoll);
829 pMsg->setField(dwId++, (WORD)(matchClusterResource() ? m_status : ITEM_STATUS_DISABLED)); // show resource-bound DCIs as inactive if cluster resource is not on this node
830 pMsg->setField(dwId++, (WORD)getType());
831 pMsg->setField(dwId++, m_dwErrorCount);
832 pMsg->setField(dwId++, m_dwTemplateItemId);
833 pMsg->setField(dwId++, (WORD)0); // compatibility: number of thresholds
834
835 unlock();
836 }
837
838 /**
839 * Get data type of given column
840 */
841 int DCTable::getColumnDataType(const TCHAR *name)
842 {
843 int dt = DCI_DT_STRING;
844 bool found = false;
845
846 lock();
847
848 // look in column definition first
849 for(int i = 0; i < m_columns->size(); i++)
850 {
851 DCTableColumn *column = m_columns->get(i);
852 if (!_tcsicmp(column->getName(), name))
853 {
854 dt = column->getDataType();
855 break;
856 }
857 }
858
859 // use last values if not found in definitions
860 if (!found && (m_lastValue != NULL))
861 {
862 int index = m_lastValue->getColumnIndex(name);
863 if (index != -1)
864 dt = m_lastValue->getColumnDataType(index);
865 }
866
867 unlock();
868 return dt;
869 }
870
871 /**
872 * Get last collected value
873 */
874 Table *DCTable::getLastValue()
875 {
876 lock();
877 Table *value;
878 if (m_lastValue != NULL)
879 {
880 value = m_lastValue;
881 value->incRefCount();
882 }
883 else
884 {
885 value = NULL;
886 }
887 unlock();
888 return value;
889 }
890
/**
 * Update destination value from source value.
 *
 * dst    - aggregated value so far (modified in place)
 * src    - new sample to fold in
 * func   - aggregation function (DCF_FUNCTION_MIN/MAX/SUM/AVG); any other
 *          value leaves dst unchanged
 * count  - number of samples already folded into dst (used by AVG only)
 */
#define RECALCULATE_VALUE(dst, src, func, count) \
do \
{ \
   switch(func) \
   { \
      case DCF_FUNCTION_SUM: \
         dst += src; \
         break; \
      case DCF_FUNCTION_AVG: \
         dst = (dst * count + src) / (count + 1); \
         break; \
      case DCF_FUNCTION_MAX: \
         if (src > dst) dst = src; \
         break; \
      case DCF_FUNCTION_MIN: \
         if (src < dst) dst = src; \
         break; \
      default: \
         break; \
   } \
} while(0)
912
913 /**
914 * Merge values
915 */
916 void DCTable::mergeValues(Table *dest, Table *src, int count)
917 {
918 for(int sRow = 0; sRow < src->getNumRows(); sRow++)
919 {
920 TCHAR instance[MAX_RESULT_LENGTH];
921
922 src->buildInstanceString(sRow, instance, MAX_RESULT_LENGTH);
923 int dRow = dest->findRowByInstance(instance);
924 if (dRow >= 0)
925 {
926 for(int j = 0; j < m_columns->size(); j++)
927 {
928 DCTableColumn *cd = m_columns->get(j);
929 if ((cd == NULL) || cd->isInstanceColumn() || (cd->getDataType() == DCI_DT_STRING))
930 continue;
931 int column = dest->getColumnIndex(cd->getName());
932 if (column == -1)
933 continue;
934
935 if (cd->getDataType() == DCI_DT_FLOAT)
936 {
937 double sval = src->getAsDouble(sRow, column);
938 double dval = dest->getAsDouble(dRow, column);
939
940 RECALCULATE_VALUE(dval, sval, cd->getAggregationFunction(), count);
941
942 dest->setAt(dRow, column, dval);
943 }
944 else if ((cd->getDataType() == DCI_DT_UINT) || (cd->getDataType() == DCI_DT_UINT64))
945 {
946 UINT64 sval = src->getAsUInt64(sRow, column);
947 UINT64 dval = dest->getAsUInt64(dRow, column);
948
949 RECALCULATE_VALUE(dval, sval, cd->getAggregationFunction(), count);
950
951 dest->setAt(dRow, column, dval);
952 }
953 else
954 {
955 INT64 sval = src->getAsInt64(sRow, column);
956 INT64 dval = dest->getAsInt64(dRow, column);
957
958 RECALCULATE_VALUE(dval, sval, cd->getAggregationFunction(), count);
959
960 dest->setAt(dRow, column, dval);
961 }
962 }
963 }
964 else
965 {
966 // no such instance
967 dest->copyRow(src, sRow);
968 }
969 }
970 }
971
972 /**
973 * Update columns in resulting table according to definition
974 */
975 void DCTable::updateResultColumns(Table *t)
976 {
977 lock();
978 for(int i = 0; i < m_columns->size(); i++)
979 {
980 DCTableColumn *col = m_columns->get(i);
981 int index = t->getColumnIndex(col->getName());
982 if (index != -1)
983 {
984 TableColumnDefinition *cd = t->getColumnDefinitions()->get(index);
985 if (cd != NULL)
986 {
987 cd->setDataType(col->getDataType());
988 cd->setInstanceColumn(col->isInstanceColumn());
989 cd->setDisplayName(col->getDisplayName());
990 }
991 }
992 }
993 unlock();
994 }
995
996 /**
997 * Update from template item
998 */
999 void DCTable::updateFromTemplate(DCObject *src)
1000 {
1001 DCObject::updateFromTemplate(src);
1002
1003 if (src->getType() != DCO_TYPE_TABLE)
1004 {
1005 DbgPrintf(2, _T("INTERNAL ERROR: DCTable::updateFromTemplate(%d, %d): source type is %d"), (int)m_id, (int)src->getId(), src->getType());
1006 return;
1007 }
1008
1009 lock();
1010 DCTable *table = (DCTable *)src;
1011
1012 m_columns->clear();
1013 for(int i = 0; i < table->m_columns->size(); i++)
1014 m_columns->add(new DCTableColumn(table->m_columns->get(i)));
1015
1016 m_thresholds->clear();
1017 for(int i = 0; i < table->m_thresholds->size(); i++)
1018 m_thresholds->add(new DCTableThreshold(table->m_thresholds->get(i)));
1019
1020 unlock();
1021 }
1022
1023 /**
1024 * Create management pack record
1025 */
1026 void DCTable::createExportRecord(String &str)
1027 {
1028 lock();
1029
1030 str.appendFormattedString(_T("\t\t\t\t<dctable id=\"%d\">\n")
1031 _T("\t\t\t\t\t<guid>%s</guid>\n")
1032 _T("\t\t\t\t\t<name>%s</name>\n")
1033 _T("\t\t\t\t\t<description>%s</description>\n")
1034 _T("\t\t\t\t\t<origin>%d</origin>\n")
1035 _T("\t\t\t\t\t<interval>%d</interval>\n")
1036 _T("\t\t\t\t\t<retention>%d</retention>\n")
1037 _T("\t\t\t\t\t<systemTag>%s</systemTag>\n")
1038 _T("\t\t\t\t\t<flags>%d</flags>\n")
1039 _T("\t\t\t\t\t<snmpPort>%d</snmpPort>\n"),
1040 (int)m_id, (const TCHAR *)m_guid.toString(),
1041 (const TCHAR *)EscapeStringForXML2(m_name),
1042 (const TCHAR *)EscapeStringForXML2(m_description),
1043 (int)m_source, m_iPollingInterval, m_iRetentionTime,
1044 (const TCHAR *)EscapeStringForXML2(m_systemTag),
1045 (int)m_flags, (int)m_snmpPort);
1046
1047 if (m_transformationScriptSource != NULL)
1048 {
1049 str.append(_T("\t\t\t\t\t<transformation>"));
1050 str.appendPreallocated(EscapeStringForXML(m_transformationScriptSource, -1));
1051 str.append(_T("</transformation>\n"));
1052 }
1053
1054 if ((m_schedules != NULL) && (m_schedules->size() > 0))
1055 {
1056 str.append(_T("\t\t\t\t\t<schedules>\n"));
1057 for(int i = 0; i < m_schedules->size(); i++)
1058 {
1059 str.append(_T("\t\t\t\t\t\t<schedule>"));
1060 str.append(EscapeStringForXML2(m_schedules->get(i)));
1061 str.append(_T("</schedule>\n"));
1062 }
1063 str.append(_T("\t\t\t\t\t</schedules>\n"));
1064 }
1065
1066 if (m_columns != NULL)
1067 {
1068 str += _T("\t\t\t\t\t<columns>\n");
1069 for(int i = 0; i < m_columns->size(); i++)
1070 {
1071 m_columns->get(i)->createNXMPRecord(str, i + 1);
1072 }
1073 str += _T("\t\t\t\t\t</columns>\n");
1074 }
1075
1076 if (m_thresholds != NULL)
1077 {
1078 str += _T("\t\t\t\t\t<thresholds>\n");
1079 for(int i = 0; i < m_thresholds->size(); i++)
1080 {
1081 m_thresholds->get(i)->createNXMPRecord(str, i + 1);
1082 }
1083 str += _T("\t\t\t\t\t</thresholds>\n");
1084 }
1085
1086 if (m_pszPerfTabSettings != NULL)
1087 {
1088 str.append(_T("\t\t\t\t\t<perfTabSettings>"));
1089 str.appendPreallocated(EscapeStringForXML(m_pszPerfTabSettings, -1));
1090 str.append(_T("</perfTabSettings>\n"));
1091 }
1092
1093 unlock();
1094 str.append(_T("\t\t\t\t</dctable>\n"));
1095 }
1096
1097 /**
1098 * Create DCObject from import file
1099 */
1100 void DCTable::updateFromImport(ConfigEntry *config)
1101 {
1102 DCObject::updateFromImport(config);
1103
1104 lock();
1105 m_columns->clear();
1106 ConfigEntry *columnsRoot = config->findEntry(_T("columns"));
1107 if (columnsRoot != NULL)
1108 {
1109 ObjectArray<ConfigEntry> *columns = columnsRoot->getSubEntries(_T("column#*"));
1110 for(int i = 0; i < columns->size(); i++)
1111 {
1112 m_columns->add(new DCTableColumn(columns->get(i)));
1113 }
1114 delete columns;
1115 }
1116
1117 m_thresholds->clear();
1118 ConfigEntry *thresholdsRoot = config->findEntry(_T("thresholds"));
1119 if (thresholdsRoot != NULL)
1120 {
1121 ObjectArray<ConfigEntry> *thresholds = thresholdsRoot->getSubEntries(_T("threshold#*"));
1122 for(int i = 0; i < thresholds->size(); i++)
1123 {
1124 m_thresholds->add(new DCTableThreshold(thresholds->get(i)));
1125 }
1126 delete thresholds;
1127 }
1128 unlock();
1129 }
1130
1131 /**
1132 * Should return true if object has (or can have) value
1133 */
1134 bool DCTable::hasValue()
1135 {
1136 if (m_owner->getObjectClass() == OBJECT_CLUSTER)
1137 return isAggregateOnCluster();
1138 return true;
1139 }