Collected DCI data recalculation based on stored raw values and current transformatio...
[public/netxms.git] / src / server / core / dcitem.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2017 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: dcitem.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 #ifdef WITH_ZMQ
26 #include "zeromq.h"
27 #endif
28
29 /**
30 * Event parameter names
31 */
32 static const TCHAR *s_paramNamesReach[] = { _T("dciName"), _T("dciDescription"), _T("thresholdValue"), _T("currentValue"), _T("dciId"), _T("instance"), _T("isRepeatedEvent"), _T("dciValue") };
33 static const TCHAR *s_paramNamesRearm[] = { _T("dciName"), _T("dciDescription"), _T("dciId"), _T("instance"), _T("thresholdValue"), _T("currentValue"), _T("dciValue") };
34
35 /**
36 * DCI cache loader queue
37 */
38 extern Queue g_dciCacheLoaderQueue;
39
40 /**
41 * Default constructor for DCItem
42 */
43 DCItem::DCItem() : DCObject()
44 {
45 m_thresholds = NULL;
46 m_dataType = DCI_DT_INT;
47 m_deltaCalculation = DCM_ORIGINAL_VALUE;
48 m_sampleCount = 0;
49 m_cacheSize = 0;
50 m_requiredCacheSize = 0;
51 m_ppValueCache = NULL;
52 m_tPrevValueTimeStamp = 0;
53 m_bCacheLoaded = false;
54 m_nBaseUnits = DCI_BASEUNITS_OTHER;
55 m_nMultiplier = 1;
56 m_customUnitName = NULL;
57 m_snmpRawValueType = SNMP_RAWTYPE_NONE;
58 m_predictionEngine[0] = 0;
59 }
60
61 /**
62 * Create DCItem from another DCItem
63 */
64 DCItem::DCItem(const DCItem *pSrc) : DCObject(pSrc)
65 {
66 m_dataType = pSrc->m_dataType;
67 m_deltaCalculation = pSrc->m_deltaCalculation;
68 m_sampleCount = pSrc->m_sampleCount;
69 m_cacheSize = 0;
70 m_requiredCacheSize = 0;
71 m_ppValueCache = NULL;
72 m_tPrevValueTimeStamp = 0;
73 m_bCacheLoaded = false;
74 m_nBaseUnits = pSrc->m_nBaseUnits;
75 m_nMultiplier = pSrc->m_nMultiplier;
76 m_customUnitName = (pSrc->m_customUnitName != NULL) ? _tcsdup(pSrc->m_customUnitName) : NULL;
77 m_snmpRawValueType = pSrc->m_snmpRawValueType;
78 _tcscpy(m_predictionEngine, pSrc->m_predictionEngine);
79
80 // Copy thresholds
81 if (pSrc->getThresholdCount() > 0)
82 {
83 m_thresholds = new ObjectArray<Threshold>(pSrc->m_thresholds->size(), 8, true);
84 for(int i = 0; i < pSrc->m_thresholds->size(); i++)
85 {
86 Threshold *t = new Threshold(pSrc->m_thresholds->get(i));
87 t->createId();
88 m_thresholds->add(t);
89 }
90 }
91 else
92 {
93 m_thresholds = NULL;
94 }
95 }
96
97 /**
98 * Constructor for creating DCItem from database
99 * Assumes that fields in SELECT query are in following order:
100 * item_id,name,source,datatype,polling_interval,retention_time,status,
101 * delta_calculation,transformation,template_id,description,instance,
102 * template_item_id,flags,resource_id,proxy_node,base_units,unit_multiplier,
103 * custom_units_name,perftab_settings,system_tag,snmp_port,snmp_raw_value_type,
104 * instd_method,instd_data,instd_filter,samples,comments,guid,npe_name
105 */
106 DCItem::DCItem(DB_HANDLE hdb, DB_RESULT hResult, int iRow, Template *pNode) : DCObject()
107 {
108 m_id = DBGetFieldULong(hResult, iRow, 0);
109 DBGetField(hResult, iRow, 1, m_name, MAX_ITEM_NAME);
110 m_source = (BYTE)DBGetFieldLong(hResult, iRow, 2);
111 m_dataType = (BYTE)DBGetFieldLong(hResult, iRow, 3);
112 m_iPollingInterval = DBGetFieldLong(hResult, iRow, 4);
113 m_iRetentionTime = DBGetFieldLong(hResult, iRow, 5);
114 m_status = (BYTE)DBGetFieldLong(hResult, iRow, 6);
115 m_deltaCalculation = (BYTE)DBGetFieldLong(hResult, iRow, 7);
116 TCHAR *pszTmp = DBGetField(hResult, iRow, 8, NULL, 0);
117 setTransformationScript(pszTmp);
118 free(pszTmp);
119 m_dwTemplateId = DBGetFieldULong(hResult, iRow, 9);
120 DBGetField(hResult, iRow, 10, m_description, MAX_DB_STRING);
121 DBGetField(hResult, iRow, 11, m_instance, MAX_DB_STRING);
122 m_dwTemplateItemId = DBGetFieldULong(hResult, iRow, 12);
123 m_thresholds = NULL;
124 m_owner = pNode;
125 m_cacheSize = 0;
126 m_requiredCacheSize = 0;
127 m_ppValueCache = NULL;
128 m_tPrevValueTimeStamp = 0;
129 m_bCacheLoaded = false;
130 m_flags = (WORD)DBGetFieldLong(hResult, iRow, 13);
131 m_dwResourceId = DBGetFieldULong(hResult, iRow, 14);
132 m_sourceNode = DBGetFieldULong(hResult, iRow, 15);
133 m_nBaseUnits = DBGetFieldLong(hResult, iRow, 16);
134 m_nMultiplier = DBGetFieldLong(hResult, iRow, 17);
135 m_customUnitName = DBGetField(hResult, iRow, 18, NULL, 0);
136 m_pszPerfTabSettings = DBGetField(hResult, iRow, 19, NULL, 0);
137 DBGetField(hResult, iRow, 20, m_systemTag, MAX_DB_STRING);
138 m_snmpPort = (WORD)DBGetFieldLong(hResult, iRow, 21);
139 m_snmpRawValueType = (WORD)DBGetFieldLong(hResult, iRow, 22);
140 m_instanceDiscoveryMethod = (WORD)DBGetFieldLong(hResult, iRow, 23);
141 m_instanceDiscoveryData = DBGetField(hResult, iRow, 24, NULL, 0);
142 m_instanceFilterSource = NULL;
143 m_instanceFilter = NULL;
144 pszTmp = DBGetField(hResult, iRow, 25, NULL, 0);
145 setInstanceFilter(pszTmp);
146 free(pszTmp);
147 m_sampleCount = DBGetFieldLong(hResult, iRow, 26);
148 m_comments = DBGetField(hResult, iRow, 27, NULL, 0);
149 m_guid = DBGetFieldGUID(hResult, iRow, 28);
150 DBGetField(hResult, iRow, 29, m_predictionEngine, MAX_NPE_NAME_LEN);
151
152 // Load last raw value from database
153 TCHAR szQuery[256];
154 _sntprintf(szQuery, 256, _T("SELECT raw_value,last_poll_time FROM raw_dci_values WHERE item_id=%d"), m_id);
155 DB_RESULT hTempResult = DBSelect(hdb, szQuery);
156 if (hTempResult != NULL)
157 {
158 if (DBGetNumRows(hTempResult) > 0)
159 {
160 TCHAR szBuffer[MAX_DB_STRING];
161 m_prevRawValue = DBGetField(hTempResult, 0, 0, szBuffer, MAX_DB_STRING);
162 m_tPrevValueTimeStamp = DBGetFieldULong(hTempResult, 0, 1);
163 m_tLastPoll = m_tPrevValueTimeStamp;
164 }
165 DBFreeResult(hTempResult);
166 }
167
168 loadCustomSchedules(hdb);
169 }
170
171 /**
172 * Constructor for creating new DCItem from scratch
173 */
174 DCItem::DCItem(UINT32 dwId, const TCHAR *szName, int iSource, int iDataType,
175 int iPollingInterval, int iRetentionTime, Template *pNode,
176 const TCHAR *pszDescription, const TCHAR *systemTag)
177 : DCObject(dwId, szName, iSource, iPollingInterval, iRetentionTime, pNode, pszDescription, systemTag)
178 {
179 m_dataType = iDataType;
180 m_deltaCalculation = DCM_ORIGINAL_VALUE;
181 m_sampleCount = 0;
182 m_thresholds = NULL;
183 m_cacheSize = 0;
184 m_requiredCacheSize = 0;
185 m_ppValueCache = NULL;
186 m_tPrevValueTimeStamp = 0;
187 m_bCacheLoaded = false;
188 m_nBaseUnits = DCI_BASEUNITS_OTHER;
189 m_nMultiplier = 1;
190 m_customUnitName = NULL;
191 m_snmpRawValueType = SNMP_RAWTYPE_NONE;
192 m_predictionEngine[0] = 0;
193
194 updateCacheSizeInternal(false);
195 }
196
197 /**
198 * Create DCItem from import file
199 */
200 DCItem::DCItem(ConfigEntry *config, Template *owner) : DCObject(config, owner)
201 {
202 m_dataType = (BYTE)config->getSubEntryValueAsInt(_T("dataType"));
203 m_deltaCalculation = (BYTE)config->getSubEntryValueAsInt(_T("delta"));
204 m_sampleCount = (BYTE)config->getSubEntryValueAsInt(_T("samples"));
205 m_cacheSize = 0;
206 m_requiredCacheSize = 0;
207 m_ppValueCache = NULL;
208 m_tPrevValueTimeStamp = 0;
209 m_bCacheLoaded = false;
210 m_nBaseUnits = DCI_BASEUNITS_OTHER;
211 m_nMultiplier = 1;
212 m_customUnitName = NULL;
213 m_snmpRawValueType = (WORD)config->getSubEntryValueAsInt(_T("snmpRawValueType"));
214 nx_strncpy(m_predictionEngine, config->getSubEntryValue(_T("predictionEngine"), 0, _T("")), MAX_NPE_NAME_LEN);
215
216 // for compatibility with old format
217 if (config->getSubEntryValueAsInt(_T("allThresholds")))
218 m_flags |= DCF_ALL_THRESHOLDS;
219 if (config->getSubEntryValueAsInt(_T("rawValueInOctetString")))
220 m_flags |= DCF_RAW_VALUE_OCTET_STRING;
221
222 ConfigEntry *thresholdsRoot = config->findEntry(_T("thresholds"));
223 if (thresholdsRoot != NULL)
224 {
225 ObjectArray<ConfigEntry> *thresholds = thresholdsRoot->getSubEntries(_T("threshold#*"));
226 m_thresholds = new ObjectArray<Threshold>(thresholds->size(), 8, true);
227 for(int i = 0; i < thresholds->size(); i++)
228 {
229 m_thresholds->add(new Threshold(thresholds->get(i), this));
230 }
231 delete thresholds;
232 }
233 else
234 {
235 m_thresholds = NULL;
236 }
237
238 updateCacheSizeInternal(false);
239 }
240
241 /**
242 * Destructor
243 */
244 DCItem::~DCItem()
245 {
246 delete m_thresholds;
247 free(m_customUnitName);
248 clearCache();
249 }
250
251 /**
252 * Delete all thresholds
253 */
254 void DCItem::deleteAllThresholds()
255 {
256 lock();
257 delete_and_null(m_thresholds);
258 unlock();
259 }
260
261 /**
262 * Clear data cache
263 */
264 void DCItem::clearCache()
265 {
266 for(UINT32 i = 0; i < m_cacheSize; i++)
267 delete m_ppValueCache[i];
268 free(m_ppValueCache);
269 m_ppValueCache = NULL;
270 m_cacheSize = 0;
271 }
272
273 /**
274 * Load data collection items thresholds from database
275 */
276 bool DCItem::loadThresholdsFromDB(DB_HANDLE hdb)
277 {
278 bool result = false;
279
280 DB_STATEMENT hStmt = DBPrepare(hdb,
281 _T("SELECT threshold_id,fire_value,rearm_value,check_function,")
282 _T("check_operation,sample_count,script,event_code,current_state,")
283 _T("rearm_event_code,repeat_interval,current_severity,")
284 _T("last_event_timestamp,match_count FROM thresholds WHERE item_id=? ")
285 _T("ORDER BY sequence_number"));
286 if (hStmt != NULL)
287 {
288 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
289 DB_RESULT hResult = DBSelectPrepared(hStmt);
290 if (hResult != NULL)
291 {
292 int count = DBGetNumRows(hResult);
293 if (count > 0)
294 {
295 m_thresholds = new ObjectArray<Threshold>(count, 8, true);
296 for(int i = 0; i < count; i++)
297 m_thresholds->add(new Threshold(hResult, i, this));
298 }
299 DBFreeResult(hResult);
300 result = true;
301 }
302 DBFreeStatement(hStmt);
303 }
304 return result;
305 }
306
307 /**
308 * Save to database
309 */
310 bool DCItem::saveToDatabase(DB_HANDLE hdb)
311 {
312 // Prepare and execute query
313 DB_STATEMENT hStmt;
314 if (IsDatabaseRecordExist(hdb, _T("items"), _T("item_id"), m_id))
315 {
316 hStmt = DBPrepare(hdb,
317 _T("UPDATE items SET node_id=?,template_id=?,name=?,source=?,")
318 _T("datatype=?,polling_interval=?,retention_time=?,status=?,")
319 _T("delta_calculation=?,transformation=?,description=?,")
320 _T("instance=?,template_item_id=?,flags=?,")
321 _T("resource_id=?,proxy_node=?,base_units=?,")
322 _T("unit_multiplier=?,custom_units_name=?,perftab_settings=?,")
323 _T("system_tag=?,snmp_port=?,snmp_raw_value_type=?,")
324 _T("instd_method=?,instd_data=?,instd_filter=?,samples=?,")
325 _T("comments=?,guid=?,npe_name=? WHERE item_id=?"));
326 }
327 else
328 {
329 hStmt = DBPrepare(hdb,
330 _T("INSERT INTO items (node_id,template_id,name,source,")
331 _T("datatype,polling_interval,retention_time,status,delta_calculation,")
332 _T("transformation,description,instance,template_item_id,flags,")
333 _T("resource_id,proxy_node,base_units,unit_multiplier,")
334 _T("custom_units_name,perftab_settings,system_tag,snmp_port,snmp_raw_value_type,")
335 _T("instd_method,instd_data,instd_filter,samples,comments,guid,npe_name,item_id) VALUES ")
336 _T("(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));
337 }
338 if (hStmt == NULL)
339 return false;
340
341 lock();
342
343 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (m_owner == NULL) ? 0 : m_owner->getId());
344 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, m_dwTemplateId);
345 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
346 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (INT32)m_source);
347 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (INT32)m_dataType);
348 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (INT32)m_iPollingInterval);
349 DBBind(hStmt, 7, DB_SQLTYPE_INTEGER, (INT32)m_iRetentionTime);
350 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, (INT32)m_status);
351 DBBind(hStmt, 9, DB_SQLTYPE_INTEGER, (INT32)m_deltaCalculation);
352 DBBind(hStmt, 10, DB_SQLTYPE_TEXT, m_transformationScriptSource, DB_BIND_STATIC);
353 DBBind(hStmt, 11, DB_SQLTYPE_VARCHAR, m_description, DB_BIND_STATIC);
354 DBBind(hStmt, 12, DB_SQLTYPE_VARCHAR, m_instance, DB_BIND_STATIC);
355 DBBind(hStmt, 13, DB_SQLTYPE_INTEGER, m_dwTemplateItemId);
356 DBBind(hStmt, 14, DB_SQLTYPE_INTEGER, (UINT32)m_flags);
357 DBBind(hStmt, 15, DB_SQLTYPE_INTEGER, m_dwResourceId);
358 DBBind(hStmt, 16, DB_SQLTYPE_INTEGER, m_sourceNode);
359 DBBind(hStmt, 17, DB_SQLTYPE_INTEGER, (INT32)m_nBaseUnits);
360 DBBind(hStmt, 18, DB_SQLTYPE_INTEGER, (INT32)m_nMultiplier);
361 DBBind(hStmt, 19, DB_SQLTYPE_VARCHAR, m_customUnitName, DB_BIND_STATIC);
362 DBBind(hStmt, 20, DB_SQLTYPE_VARCHAR, m_pszPerfTabSettings, DB_BIND_STATIC);
363 DBBind(hStmt, 21, DB_SQLTYPE_VARCHAR, m_systemTag, DB_BIND_STATIC);
364 DBBind(hStmt, 22, DB_SQLTYPE_INTEGER, (INT32)m_snmpPort);
365 DBBind(hStmt, 23, DB_SQLTYPE_INTEGER, (INT32)m_snmpRawValueType);
366 DBBind(hStmt, 24, DB_SQLTYPE_INTEGER, (INT32)m_instanceDiscoveryMethod);
367 DBBind(hStmt, 25, DB_SQLTYPE_VARCHAR, m_instanceDiscoveryData, DB_BIND_STATIC);
368 DBBind(hStmt, 26, DB_SQLTYPE_TEXT, m_instanceFilterSource, DB_BIND_STATIC);
369 DBBind(hStmt, 27, DB_SQLTYPE_INTEGER, (INT32)m_sampleCount);
370 DBBind(hStmt, 28, DB_SQLTYPE_TEXT, m_comments, DB_BIND_STATIC);
371 DBBind(hStmt, 29, DB_SQLTYPE_VARCHAR, m_guid);
372 DBBind(hStmt, 30, DB_SQLTYPE_VARCHAR, m_predictionEngine, DB_BIND_STATIC);
373 DBBind(hStmt, 31, DB_SQLTYPE_INTEGER, m_id);
374
375 bool bResult = DBExecute(hStmt);
376 DBFreeStatement(hStmt);
377
378 // Save thresholds
379 if (bResult && (m_thresholds != NULL))
380 {
381 for(int i = 0; i < m_thresholds->size(); i++)
382 m_thresholds->get(i)->saveToDB(hdb, i);
383 }
384
385 // Delete non-existing thresholds
386 TCHAR query[256];
387 _sntprintf(query, 256, _T("SELECT threshold_id FROM thresholds WHERE item_id=%d"), m_id);
388 DB_RESULT hResult = DBSelect(hdb, query);
389 if (hResult != NULL)
390 {
391 int iNumRows = DBGetNumRows(hResult);
392 for(int i = 0; i < iNumRows; i++)
393 {
394 UINT32 dwId = DBGetFieldULong(hResult, i, 0);
395 int j;
396 for(j = 0; j < getThresholdCount(); j++)
397 if (m_thresholds->get(j)->getId() == dwId)
398 break;
399 if (j == getThresholdCount())
400 {
401 _sntprintf(query, 256, _T("DELETE FROM thresholds WHERE threshold_id=%d"), dwId);
402 DBQuery(hdb, query);
403 }
404 }
405 DBFreeResult(hResult);
406 }
407
408 // Create record in raw_dci_values if needed
409 if (!IsDatabaseRecordExist(hdb, _T("raw_dci_values"), _T("item_id"), m_id))
410 {
411 hStmt = DBPrepare(hdb, _T("INSERT INTO raw_dci_values (item_id,raw_value,last_poll_time) VALUES (?,?,?)"));
412 if (hStmt == NULL)
413 {
414 unlock();
415 return false;
416 }
417 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
418 DBBind(hStmt, 2, DB_SQLTYPE_TEXT, m_prevRawValue.getString(), DB_BIND_STATIC, 255);
419 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (INT64)m_tPrevValueTimeStamp);
420 bResult = DBExecute(hStmt);
421 DBFreeStatement(hStmt);
422 }
423
424 unlock();
425 return bResult ? DCObject::saveToDatabase(hdb) : false;
426 }
427
428 /**
429 * Check last value for threshold violations
430 */
431 void DCItem::checkThresholds(ItemValue &value)
432 {
433 if (m_thresholds == NULL)
434 return;
435
436 bool thresholdDeactivated = false;
437 for(int i = 0; i < m_thresholds->size(); i++)
438 {
439 Threshold *t = m_thresholds->get(i);
440 ItemValue checkValue;
441 ThresholdCheckResult result = t->check(value, m_ppValueCache, checkValue, m_owner, this);
442 switch(result)
443 {
444 case ACTIVATED:
445 {
446 PostDciEventWithNames(t->getEventCode(), m_owner->getId(), m_id, "ssssisds",
447 s_paramNamesReach, m_name, m_description, t->getStringValue(),
448 (const TCHAR *)checkValue, m_id, m_instance, 0, (const TCHAR *)value);
449 EventTemplate *evt = FindEventTemplateByCode(t->getEventCode());
450 if (evt != NULL)
451 {
452 t->markLastEvent(evt->getSeverity());
453 evt->decRefCount();
454 }
455 if (!(m_flags & DCF_ALL_THRESHOLDS))
456 i = m_thresholds->size(); // Stop processing
457 }
458 break;
459 case DEACTIVATED:
460 PostDciEventWithNames(t->getRearmEventCode(), m_owner->getId(), m_id, "ssissss",
461 s_paramNamesRearm, m_name, m_description, m_id, m_instance,
462 t->getStringValue(), (const TCHAR *)checkValue, (const TCHAR *)value);
463 if (!(m_flags & DCF_ALL_THRESHOLDS))
464 {
465 // this flag used to re-send activation event for next active threshold
466 thresholdDeactivated = true;
467 }
468 break;
469 case ALREADY_ACTIVE:
470 {
471 // Check if we need to re-sent threshold violation event
472 time_t now = time(NULL);
473 UINT32 repeatInterval = (t->getRepeatInterval() == -1) ? g_thresholdRepeatInterval : (UINT32)t->getRepeatInterval();
474 if (thresholdDeactivated || ((repeatInterval != 0) && (t->getLastEventTimestamp() + (time_t)repeatInterval < now)))
475 {
476 PostDciEventWithNames(t->getEventCode(), m_owner->getId(), m_id, "ssssisds",
477 s_paramNamesReach, m_name, m_description, t->getStringValue(),
478 (const TCHAR *)checkValue, m_id, m_instance, 1, (const TCHAR *)value);
479 EventTemplate *evt = FindEventTemplateByCode(t->getEventCode());
480 if (evt != NULL)
481 {
482 t->markLastEvent(evt->getSeverity());
483 evt->decRefCount();
484 }
485 }
486 }
487 if (!(m_flags & DCF_ALL_THRESHOLDS))
488 {
489 i = m_thresholds->size(); // Threshold condition still true, stop processing
490 }
491 thresholdDeactivated = false;
492 break;
493 default:
494 break;
495 }
496 }
497 }
498
499 /**
500 * Create NXCP message with item data
501 */
502 void DCItem::createMessage(NXCPMessage *pMsg)
503 {
504 DCObject::createMessage(pMsg);
505
506 lock();
507 pMsg->setField(VID_DCI_DATA_TYPE, (WORD)m_dataType);
508 pMsg->setField(VID_DCI_DELTA_CALCULATION, (WORD)m_deltaCalculation);
509 pMsg->setField(VID_SAMPLE_COUNT, (WORD)m_sampleCount);
510 pMsg->setField(VID_BASE_UNITS, (WORD)m_nBaseUnits);
511 pMsg->setField(VID_MULTIPLIER, (UINT32)m_nMultiplier);
512 pMsg->setField(VID_SNMP_RAW_VALUE_TYPE, m_snmpRawValueType);
513 pMsg->setField(VID_NPE_NAME, m_predictionEngine);
514 if (m_customUnitName != NULL)
515 pMsg->setField(VID_CUSTOM_UNITS_NAME, m_customUnitName);
516 if (m_thresholds != NULL)
517 {
518 pMsg->setField(VID_NUM_THRESHOLDS, (UINT32)m_thresholds->size());
519 UINT32 dwId = VID_DCI_THRESHOLD_BASE;
520 for(int i = 0; i < m_thresholds->size(); i++, dwId += 20)
521 m_thresholds->get(i)->createMessage(pMsg, dwId);
522 }
523 else
524 {
525 pMsg->setField(VID_NUM_THRESHOLDS, (UINT32)0);
526 }
527 unlock();
528 }
529
530 /**
531 * Delete item and collected data from database
532 */
533 void DCItem::deleteFromDatabase()
534 {
535 TCHAR szQuery[256];
536
537 DCObject::deleteFromDatabase();
538
539 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM items WHERE item_id=%d"), m_id);
540 QueueSQLRequest(szQuery);
541 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM thresholds WHERE item_id=%d"), m_id);
542 QueueSQLRequest(szQuery);
543 _sntprintf(szQuery, sizeof(szQuery) / sizeof(TCHAR), _T("DELETE FROM raw_dci_values WHERE item_id=%d"), m_id);
544 QueueSQLRequest(szQuery);
545
546 if (m_owner->isDataCollectionTarget())
547 static_cast<DataCollectionTarget*>(m_owner)->scheduleItemDataCleanup(m_id);
548 }
549
550 /**
551 * Update item from NXCP message
552 */
553 void DCItem::updateFromMessage(NXCPMessage *pMsg, UINT32 *pdwNumMaps, UINT32 **ppdwMapIndex, UINT32 **ppdwMapId)
554 {
555 DCObject::updateFromMessage(pMsg);
556
557 lock();
558
559 m_dataType = (BYTE)pMsg->getFieldAsUInt16(VID_DCI_DATA_TYPE);
560 m_deltaCalculation = (BYTE)pMsg->getFieldAsUInt16(VID_DCI_DELTA_CALCULATION);
561 m_sampleCount = (int)pMsg->getFieldAsUInt16(VID_SAMPLE_COUNT);
562 m_nBaseUnits = pMsg->getFieldAsUInt16(VID_BASE_UNITS);
563 m_nMultiplier = (int)pMsg->getFieldAsUInt32(VID_MULTIPLIER);
564 safe_free(m_customUnitName);
565 m_customUnitName = pMsg->getFieldAsString(VID_CUSTOM_UNITS_NAME);
566 m_snmpRawValueType = pMsg->getFieldAsUInt16(VID_SNMP_RAW_VALUE_TYPE);
567 pMsg->getFieldAsString(VID_NPE_NAME, m_predictionEngine, MAX_NPE_NAME_LEN);
568
569 // Update thresholds
570 UINT32 dwNum = pMsg->getFieldAsUInt32(VID_NUM_THRESHOLDS);
571 UINT32 *newThresholds = (UINT32 *)malloc(sizeof(UINT32) * dwNum);
572 *ppdwMapIndex = (UINT32 *)malloc(dwNum * sizeof(UINT32));
573 *ppdwMapId = (UINT32 *)malloc(dwNum * sizeof(UINT32));
574 *pdwNumMaps = 0;
575
576 // Read all new threshold ids from message
577 for(UINT32 i = 0, dwId = VID_DCI_THRESHOLD_BASE; i < dwNum; i++, dwId += 10)
578 {
579 newThresholds[i] = pMsg->getFieldAsUInt32(dwId);
580 }
581
582 // Check if some thresholds was deleted, and reposition others if needed
583 Threshold **ppNewList = (Threshold **)malloc(sizeof(Threshold *) * dwNum);
584 memset(ppNewList, 0, sizeof(Threshold *) * dwNum);
585 for(int i = 0; i < getThresholdCount(); i++)
586 {
587 UINT32 j;
588 for(j = 0; j < dwNum; j++)
589 if (m_thresholds->get(i)->getId() == newThresholds[j])
590 break;
591 if (j == dwNum)
592 {
593 // No threshold with that id in new list, delete it
594 m_thresholds->remove(i);
595 i--;
596 }
597 else
598 {
599 // Move existing thresholds to appropriate positions in new list
600 ppNewList[j] = m_thresholds->get(i);
601 }
602 }
603
604 // Add or update thresholds
605 for(UINT32 i = 0, dwId = VID_DCI_THRESHOLD_BASE; i < dwNum; i++, dwId += 10)
606 {
607 if (newThresholds[i] == 0) // New threshold?
608 {
609 ppNewList[i] = new Threshold(this);
610 ppNewList[i]->createId();
611
612 // Add index -> id mapping
613 (*ppdwMapIndex)[*pdwNumMaps] = i;
614 (*ppdwMapId)[*pdwNumMaps] = ppNewList[i]->getId();
615 (*pdwNumMaps)++;
616 }
617 if (ppNewList[i] != NULL)
618 ppNewList[i]->updateFromMessage(pMsg, dwId);
619 }
620
621 if (dwNum > 0)
622 {
623 if (m_thresholds != NULL)
624 {
625 m_thresholds->setOwner(false);
626 m_thresholds->clear();
627 m_thresholds->setOwner(true);
628 }
629 else
630 {
631 m_thresholds = new ObjectArray<Threshold>((int)dwNum, 8, true);
632 }
633 for(UINT32 i = 0; i < dwNum; i++)
634 {
635 if (ppNewList[i] != NULL)
636 m_thresholds->add(ppNewList[i]);
637 }
638 }
639 else
640 {
641 delete_and_null(m_thresholds);
642 }
643
644 // Update data type in thresholds
645 for(int i = 0; i < getThresholdCount(); i++)
646 m_thresholds->get(i)->setDataType(m_dataType);
647
648 free(ppNewList);
649 free(newThresholds);
650 updateCacheSizeInternal(true);
651 unlock();
652 }
653
654 /**
655 * Process new collected value. Should return true on success.
656 * If returns false, current poll result will be converted into data collection error.
657 *
658 * @return true on success
659 */
660 bool DCItem::processNewValue(time_t tmTimeStamp, const void *originalValue, bool *updateStatus)
661 {
662 ItemValue rawValue, *pValue;
663
664 lock();
665
666 // Normally m_owner shouldn't be NULL for polled items, but who knows...
667 if (m_owner == NULL)
668 {
669 unlock();
670 return false;
671 }
672
673 // Create new ItemValue object and transform it as needed
674 pValue = new ItemValue((const TCHAR *)originalValue, tmTimeStamp);
675 if (m_tPrevValueTimeStamp == 0)
676 m_prevRawValue = *pValue; // Delta should be zero for first poll
677 rawValue = *pValue;
678
679 // Cluster can have only aggregated data, and transformation
680 // should not be used on aggregation
681 if ((m_owner->getObjectClass() != OBJECT_CLUSTER) || (m_flags & DCF_TRANSFORM_AGGREGATED))
682 {
683 if (!transform(*pValue, (tmTimeStamp > m_tPrevValueTimeStamp) ? (tmTimeStamp - m_tPrevValueTimeStamp) : 0))
684 {
685 unlock();
686 return false;
687 }
688 }
689
690 m_dwErrorCount = 0;
691
692 if (isStatusDCO() && (tmTimeStamp > m_tPrevValueTimeStamp) && ((m_cacheSize == 0) || !m_bCacheLoaded || ((UINT32)*pValue != (UINT32)*m_ppValueCache[0])))
693 {
694 *updateStatus = true;
695 }
696 else
697 {
698 *updateStatus = false;
699 }
700
701 if (tmTimeStamp > m_tPrevValueTimeStamp)
702 {
703 m_prevRawValue = rawValue;
704 m_tPrevValueTimeStamp = tmTimeStamp;
705
706 // Save raw value into database
707 QueueRawDciDataUpdate(tmTimeStamp, m_id, (const TCHAR *)originalValue, pValue->getString());
708 }
709
710 // Save transformed value to database
711 if ((m_flags & DCF_NO_STORAGE) == 0)
712 QueueIDataInsert(tmTimeStamp, m_owner->getId(), m_id, (const TCHAR *)originalValue, pValue->getString());
713 if (g_flags & AF_PERFDATA_STORAGE_DRIVER_LOADED)
714 PerfDataStorageRequest(this, tmTimeStamp, pValue->getString());
715
716 // Update prediction engine
717 if (m_predictionEngine[0] != 0)
718 {
719 PredictionEngine *engine = FindPredictionEngine(m_predictionEngine);
720 if (engine != NULL)
721 engine->update(m_id, tmTimeStamp, pValue->getDouble());
722 }
723
724 // Check thresholds and add value to cache
725 if (m_bCacheLoaded && (tmTimeStamp >= m_tPrevValueTimeStamp) &&
726 ((g_offlineDataRelevanceTime <= 0) || (tmTimeStamp > (time(NULL) - g_offlineDataRelevanceTime))))
727 {
728 checkThresholds(*pValue);
729 }
730
731 if ((m_cacheSize > 0) && (tmTimeStamp >= m_tPrevValueTimeStamp))
732 {
733 delete m_ppValueCache[m_cacheSize - 1];
734 memmove(&m_ppValueCache[1], m_ppValueCache, sizeof(ItemValue *) * (m_cacheSize - 1));
735 m_ppValueCache[0] = pValue;
736 }
737 else
738 {
739 delete pValue;
740 }
741
742 unlock();
743
744 #ifdef WITH_ZMQ
745 ZmqPublishData(m_owner->getId(), m_id, m_name, pValue->getString());
746 #endif
747
748 return true;
749 }
750
751 /**
752 * Process new data collection error
753 */
754 void DCItem::processNewError(bool noInstance, time_t now)
755 {
756 lock();
757
758 // Normally m_owner shouldn't be NULL for polled items, but who knows...
759 if (m_owner == NULL)
760 {
761 unlock();
762 return;
763 }
764
765 m_dwErrorCount++;
766
767 for(int i = 0; i < getThresholdCount(); i++)
768 {
769 Threshold *t = m_thresholds->get(i);
770 ThresholdCheckResult result = t->checkError(m_dwErrorCount);
771 switch(result)
772 {
773 case ACTIVATED:
774 {
775 PostDciEventWithNames(t->getEventCode(), m_owner->getId(), m_id, "ssssisds",
776 s_paramNamesReach, m_name, m_description, _T(""), _T(""),
777 m_id, m_instance, 0, _T(""));
778 EventTemplate *evt = FindEventTemplateByCode(t->getEventCode());
779 if (evt != NULL)
780 {
781 t->markLastEvent(evt->getSeverity());
782 evt->decRefCount();
783 }
784 if (!(m_flags & DCF_ALL_THRESHOLDS))
785 {
786 i = m_thresholds->size(); // Stop processing
787 }
788 }
789 break;
790 case DEACTIVATED:
791 PostDciEventWithNames(t->getRearmEventCode(), m_owner->getId(), m_id, "ssissss",
792 s_paramNamesRearm, m_name, m_description, m_id, m_instance, _T(""), _T(""), _T(""));
793 break;
794 case ALREADY_ACTIVE:
795 {
796 // Check if we need to re-sent threshold violation event
797 UINT32 repeatInterval = (t->getRepeatInterval() == -1) ? g_thresholdRepeatInterval : (UINT32)t->getRepeatInterval();
798 if ((repeatInterval != 0) && (t->getLastEventTimestamp() + (time_t)repeatInterval < now))
799 {
800 PostDciEventWithNames(t->getEventCode(), m_owner->getId(), m_id, "ssssisds",
801 s_paramNamesReach, m_name, m_description, _T(""), _T(""),
802 m_id, m_instance, 1, _T(""));
803 EventTemplate *evt = FindEventTemplateByCode(t->getEventCode());
804 if (evt != NULL)
805 {
806 t->markLastEvent(evt->getSeverity());
807 evt->decRefCount();
808 }
809 }
810 }
811 if (!(m_flags & DCF_ALL_THRESHOLDS))
812 {
813 i = m_thresholds->size(); // Threshold condition still true, stop processing
814 }
815 break;
816 default:
817 break;
818 }
819 }
820
821 unlock();
822 }
823
824 /**
825 * Transform received value
826 */
827 bool DCItem::transform(ItemValue &value, time_t nElapsedTime)
828 {
829 bool success = true;
830
831 switch(m_deltaCalculation)
832 {
833 case DCM_SIMPLE:
834 switch(m_dataType)
835 {
836 case DCI_DT_INT:
837 value = (INT32)value - (INT32)m_prevRawValue;
838 break;
839 case DCI_DT_UINT:
840 value = (UINT32)value - (UINT32)m_prevRawValue;
841 break;
842 case DCI_DT_INT64:
843 value = (INT64)value - (INT64)m_prevRawValue;
844 break;
845 case DCI_DT_UINT64:
846 value = (UINT64)value - (UINT64)m_prevRawValue;
847 break;
848 case DCI_DT_FLOAT:
849 value = (double)value - (double)m_prevRawValue;
850 break;
851 case DCI_DT_STRING:
852 value = (INT32)((_tcscmp((const TCHAR *)value, (const TCHAR *)m_prevRawValue) == 0) ? 0 : 1);
853 break;
854 default:
855 // Delta calculation is not supported for other types
856 break;
857 }
858 break;
859 case DCM_AVERAGE_PER_MINUTE:
860 nElapsedTime /= 60; // Convert to minutes
861 /* no break */
862 case DCM_AVERAGE_PER_SECOND:
863 // Check elapsed time to prevent divide-by-zero exception
864 if (nElapsedTime == 0)
865 nElapsedTime++;
866
867 switch(m_dataType)
868 {
869 case DCI_DT_INT:
870 value = ((INT32)value - (INT32)m_prevRawValue) / (INT32)nElapsedTime;
871 break;
872 case DCI_DT_UINT:
873 value = ((UINT32)value - (UINT32)m_prevRawValue) / (UINT32)nElapsedTime;
874 break;
875 case DCI_DT_INT64:
876 value = ((INT64)value - (INT64)m_prevRawValue) / (INT64)nElapsedTime;
877 break;
878 case DCI_DT_UINT64:
879 value = ((UINT64)value - (UINT64)m_prevRawValue) / (UINT64)nElapsedTime;
880 break;
881 case DCI_DT_FLOAT:
882 value = ((double)value - (double)m_prevRawValue) / (double)nElapsedTime;
883 break;
884 case DCI_DT_STRING:
885 // I don't see any meaning in _T("average delta per second (minute)") for string
886 // values, so result will be 0 if there are no difference between
887 // current and previous values, and 1 otherwise
888 value = (INT32)((_tcscmp((const TCHAR *)value, (const TCHAR *)m_prevRawValue) == 0) ? 0 : 1);
889 break;
890 default:
891 // Delta calculation is not supported for other types
892 break;
893 }
894 break;
895 default: // Default is no transformation
896 break;
897 }
898
899 if (m_transformationScript != NULL)
900 {
901 NXSL_VM *vm = new NXSL_VM(new NXSL_ServerEnv());
902 if (vm->load(m_transformationScript))
903 {
904 NXSL_Value *pValue = new NXSL_Value((const TCHAR *)value);
905 vm->setGlobalVariable(_T("$object"), m_owner->createNXSLObject());
906 if (m_owner->getObjectClass() == OBJECT_NODE)
907 {
908 vm->setGlobalVariable(_T("$node"), m_owner->createNXSLObject());
909 }
910 vm->setGlobalVariable(_T("$dci"), createNXSLObject());
911 vm->setGlobalVariable(_T("$isCluster"), new NXSL_Value((m_owner->getObjectClass() == OBJECT_CLUSTER) ? 1 : 0));
912
913 // remove lock from DCI for script execution to avoid deadlocks
914 unlock();
915 success = vm->run(1, &pValue);
916 lock();
917 if (success)
918 {
919 pValue = vm->getResult();
920 if (pValue != NULL)
921 {
922 switch(m_dataType)
923 {
924 case DCI_DT_INT:
925 value = pValue->getValueAsInt32();
926 break;
927 case DCI_DT_UINT:
928 value = pValue->getValueAsUInt32();
929 break;
930 case DCI_DT_INT64:
931 value = pValue->getValueAsInt64();
932 break;
933 case DCI_DT_UINT64:
934 value = pValue->getValueAsUInt64();
935 break;
936 case DCI_DT_FLOAT:
937 value = pValue->getValueAsReal();
938 break;
939 case DCI_DT_STRING:
940 value = CHECK_NULL_EX(pValue->getValueAsCString());
941 break;
942 default:
943 break;
944 }
945 }
946 }
947 else if (vm->getErrorCode() == NXSL_ERR_EXECUTION_ABORTED)
948 {
949 DbgPrintf(6, _T("Transformation script for DCI \"%s\" [%d] on node %s [%d] aborted"),
950 m_description, m_id, getOwnerName(), getOwnerId());
951 }
952 else
953 {
954 TCHAR buffer[1024];
955 _sntprintf(buffer, 1024, _T("DCI::%s::%d::TransformationScript"), getOwnerName(), m_id);
956 PostDciEvent(EVENT_SCRIPT_ERROR, g_dwMgmtNode, m_id, "ssd", buffer, vm->getErrorText(), m_id);
957 nxlog_write(MSG_TRANSFORMATION_SCRIPT_EXECUTION_ERROR, NXLOG_WARNING, "dsdss",
958 getOwnerId(), getOwnerName(), m_id, m_name, vm->getErrorText());
959 }
960 }
961 else
962 {
963 TCHAR buffer[1024];
964 _sntprintf(buffer, 1024, _T("DCI::%s::%d::TransformationScript"), getOwnerName(), m_id);
965 PostDciEvent(EVENT_SCRIPT_ERROR, g_dwMgmtNode, m_id, "ssd", buffer, vm->getErrorText(), m_id);
966 nxlog_write(MSG_TRANSFORMATION_SCRIPT_EXECUTION_ERROR, NXLOG_WARNING, "dsdss",
967 getOwnerId(), getOwnerName(), m_id, m_name, vm->getErrorText());
968 success = false;
969 }
970 delete vm;
971 }
972 return success;
973 }
974
975 /**
976 * Set new ID and node/template association
977 */
978 void DCItem::changeBinding(UINT32 dwNewId, Template *pNewNode, BOOL doMacroExpansion)
979 {
980 DCObject::changeBinding(dwNewId, pNewNode, doMacroExpansion);
981
982 lock();
983 if (dwNewId != 0)
984 {
985 for(int i = 0; i < getThresholdCount(); i++)
986 m_thresholds->get(i)->bindToItem(m_id, m_owner->getId());
987 }
988
989 clearCache();
990 updateCacheSizeInternal(true);
991 unlock();
992 }
993
994 /**
995 * Update required cache size depending on thresholds
996 * dwCondId is an identifier of calling condition object id. If it is not 0,
997 * GetCacheSizeForDCI should be called with bNoLock == TRUE for appropriate
998 * condition object
999 */
1000 void DCItem::updateCacheSizeInternal(bool allowLoad, UINT32 conditionId)
1001 {
1002 // Sanity check
1003 if (m_owner == NULL)
1004 {
1005 nxlog_debug(3, _T("DCItem::updateCacheSize() called for DCI %d when m_owner == NULL"), m_id);
1006 return;
1007 }
1008
1009 // Minimum cache size is 1 for nodes (so GetLastValue can work)
1010 // and it is always 0 for templates
1011 if (((m_owner->isDataCollectionTarget() && (m_owner->getObjectClass() != OBJECT_CLUSTER)) ||
1012 ((m_owner->getObjectClass() == OBJECT_CLUSTER) && isAggregateOnCluster())) &&
1013 (m_instanceDiscoveryMethod == IDM_NONE))
1014 {
1015 UINT32 requiredSize = 1;
1016
1017 // Calculate required cache size
1018 for(int i = 0; i < getThresholdCount(); i++)
1019 if (requiredSize < m_thresholds->get(i)->getRequiredCacheSize())
1020 requiredSize = m_thresholds->get(i)->getRequiredCacheSize();
1021
1022 ObjectArray<NetObj> *conditions = g_idxConditionById.getObjects(true);
1023 for(int i = 0; i < conditions->size(); i++)
1024 {
1025 ConditionObject *c = (ConditionObject *)conditions->get(i);
1026 UINT32 size = c->getCacheSizeForDCI(m_id, conditionId == c->getId());
1027 if (size > requiredSize)
1028 requiredSize = size;
1029 c->decRefCount();
1030 }
1031 delete conditions;
1032
1033 m_requiredCacheSize = requiredSize;
1034 }
1035 else
1036 {
1037 m_requiredCacheSize = 0;
1038 }
1039
1040 nxlog_debug_tag(_T("obj.dc.cache"), 8, _T("DCItem::updateCacheSizeInternal(dci=\"%s\", node=%s [%d]): requiredSize=%d cacheSize=%d"),
1041 m_name, m_owner->getName(), m_owner->getId(), m_requiredCacheSize, m_cacheSize);
1042
1043 // Update cache if needed
1044 if (m_requiredCacheSize < m_cacheSize)
1045 {
1046 // Destroy unneeded values
1047 if (m_cacheSize > 0)
1048 {
1049 for(UINT32 i = m_requiredCacheSize; i < m_cacheSize; i++)
1050 delete m_ppValueCache[i];
1051 }
1052
1053 m_cacheSize = m_requiredCacheSize;
1054 if (m_cacheSize > 0)
1055 {
1056 m_ppValueCache = (ItemValue **)realloc(m_ppValueCache, sizeof(ItemValue *) * m_cacheSize);
1057 }
1058 else
1059 {
1060 free(m_ppValueCache);
1061 m_ppValueCache = NULL;
1062 }
1063 }
1064 else if (m_requiredCacheSize > m_cacheSize)
1065 {
1066 // Load missing values from database
1067 // Skip caching for DCIs where estimated time to fill the cache is less then 5 minutes
1068 // to reduce load on database at server startup
1069 if (allowLoad && (m_owner != NULL) && (((m_requiredCacheSize - m_cacheSize) * m_iPollingInterval > 300) || (m_source == DS_PUSH_AGENT)))
1070 {
1071 m_bCacheLoaded = false;
1072 g_dciCacheLoaderQueue.put(new DCObjectInfo(this));
1073 }
1074 else
1075 {
1076 // will not read data from database, fill cache with empty values
1077 m_ppValueCache = (ItemValue **)realloc(m_ppValueCache, sizeof(ItemValue *) * m_requiredCacheSize);
1078 for(UINT32 i = m_cacheSize; i < m_requiredCacheSize; i++)
1079 m_ppValueCache[i] = new ItemValue(_T(""), 1);
1080 DbgPrintf(7, _T("Cache load skipped for parameter %s [%d]"), m_name, (int)m_id);
1081 m_cacheSize = m_requiredCacheSize;
1082 m_bCacheLoaded = true;
1083 }
1084 }
1085 }
1086
1087 /**
1088 * Reload cache from database
1089 */
1090 void DCItem::reloadCache()
1091 {
1092 TCHAR szBuffer[MAX_DB_STRING];
1093
1094 switch(g_dbSyntax)
1095 {
1096 case DB_SYNTAX_MSSQL:
1097 _sntprintf(szBuffer, MAX_DB_STRING, _T("SELECT TOP %d idata_value,idata_timestamp FROM idata_%d ")
1098 _T("WHERE item_id=%d ORDER BY idata_timestamp DESC"),
1099 m_requiredCacheSize, m_owner->getId(), m_id);
1100 break;
1101 case DB_SYNTAX_ORACLE:
1102 _sntprintf(szBuffer, MAX_DB_STRING, _T("SELECT * FROM (SELECT idata_value,idata_timestamp FROM idata_%d ")
1103 _T("WHERE item_id=%d ORDER BY idata_timestamp DESC) WHERE ROWNUM <= %d"),
1104 m_owner->getId(), m_id, m_requiredCacheSize);
1105 break;
1106 case DB_SYNTAX_MYSQL:
1107 case DB_SYNTAX_PGSQL:
1108 case DB_SYNTAX_SQLITE:
1109 _sntprintf(szBuffer, MAX_DB_STRING, _T("SELECT idata_value,idata_timestamp FROM idata_%d ")
1110 _T("WHERE item_id=%d ORDER BY idata_timestamp DESC LIMIT %d"),
1111 m_owner->getId(), m_id, m_requiredCacheSize);
1112 break;
1113 case DB_SYNTAX_DB2:
1114 _sntprintf(szBuffer, MAX_DB_STRING, _T("SELECT idata_value,idata_timestamp FROM idata_%d ")
1115 _T("WHERE item_id=%d ORDER BY idata_timestamp DESC FETCH FIRST %d ROWS ONLY"),
1116 m_owner->getId(), m_id, m_requiredCacheSize);
1117 break;
1118 default:
1119 _sntprintf(szBuffer, MAX_DB_STRING, _T("SELECT idata_value,idata_timestamp FROM idata_%d ")
1120 _T("WHERE item_id=%d ORDER BY idata_timestamp DESC"),
1121 m_owner->getId(), m_id);
1122 break;
1123 }
1124
1125 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
1126 DB_UNBUFFERED_RESULT hResult = DBSelectUnbuffered(hdb, szBuffer);
1127
1128 lock();
1129
1130 UINT32 i;
1131 for(i = 0; i < m_cacheSize; i++)
1132 delete m_ppValueCache[i];
1133
1134 if (m_cacheSize != m_requiredCacheSize)
1135 {
1136 m_ppValueCache = (ItemValue **)realloc(m_ppValueCache, sizeof(ItemValue *) * m_requiredCacheSize);
1137 }
1138
1139 nxlog_debug_tag(_T("obj.dc.cache"), 8, _T("DCItem::reloadCache(dci=\"%s\", node=%s [%d]): requiredSize=%d cacheSize=%d"),
1140 m_name, m_owner->getName(), m_owner->getId(), m_requiredCacheSize, m_cacheSize);
1141 if (hResult != NULL)
1142 {
1143 // Create cache entries
1144 bool moreData = true;
1145 for(i = 0; (i < m_requiredCacheSize) && moreData; i++)
1146 {
1147 moreData = DBFetch(hResult);
1148 if (moreData)
1149 {
1150 DBGetField(hResult, 0, szBuffer, MAX_DB_STRING);
1151 m_ppValueCache[i] = new ItemValue(szBuffer, DBGetFieldULong(hResult, 1));
1152 }
1153 else
1154 {
1155 m_ppValueCache[i] = new ItemValue(_T(""), 1); // Empty value
1156 }
1157 }
1158
1159 // Fill up cache with empty values if we don't have enough values in database
1160 if (i < m_requiredCacheSize)
1161 {
1162 nxlog_debug_tag(_T("obj.dc.cache"), 8, _T("DCItem::reloadCache(dci=\"%s\", node=%s [%d]): %d values missing in DB"),
1163 m_name, m_owner->getName(), m_owner->getId(), m_requiredCacheSize - i);
1164 for(; i < m_requiredCacheSize; i++)
1165 m_ppValueCache[i] = new ItemValue(_T(""), 1);
1166 }
1167 DBFreeResult(hResult);
1168 }
1169 else
1170 {
1171 // Error reading data from database, fill cache with empty values
1172 for(i = 0; i < m_requiredCacheSize; i++)
1173 m_ppValueCache[i] = new ItemValue(_T(""), 1);
1174 }
1175
1176 m_cacheSize = m_requiredCacheSize;
1177 m_bCacheLoaded = true;
1178 unlock();
1179
1180 DBConnectionPoolReleaseConnection(hdb);
1181 }
1182
1183 /**
1184 * Put last value into CSCP message
1185 */
1186 void DCItem::fillLastValueMessage(NXCPMessage *pMsg, UINT32 dwId)
1187 {
1188 lock();
1189 pMsg->setField(dwId++, m_id);
1190 pMsg->setField(dwId++, m_name);
1191 pMsg->setField(dwId++, m_description);
1192 pMsg->setField(dwId++, (UINT16)m_source);
1193 if (m_cacheSize > 0)
1194 {
1195 pMsg->setField(dwId++, (UINT16)m_dataType);
1196 pMsg->setField(dwId++, m_ppValueCache[0]->getString());
1197 pMsg->setFieldFromTime(dwId++, m_ppValueCache[0]->getTimeStamp());
1198 }
1199 else
1200 {
1201 pMsg->setField(dwId++, (UINT16)DCI_DT_NULL);
1202 pMsg->setField(dwId++, _T(""));
1203 pMsg->setField(dwId++, (UINT32)0);
1204 }
1205 pMsg->setField(dwId++, (WORD)(matchClusterResource() ? m_status : ITEM_STATUS_DISABLED)); // show resource-bound DCIs as inactive if cluster resource is not on this node
1206 pMsg->setField(dwId++, (WORD)getType());
1207 pMsg->setField(dwId++, m_dwErrorCount);
1208 pMsg->setField(dwId++, m_dwTemplateItemId);
1209
1210 int i;
1211 for(i = 0; i < getThresholdCount(); i++)
1212 {
1213 if (m_thresholds->get(i)->isReached())
1214 break;
1215 }
1216 if (i < getThresholdCount())
1217 {
1218 pMsg->setField(dwId++, (WORD)1);
1219 m_thresholds->get(i)->createMessage(pMsg, dwId);
1220 }
1221 else
1222 {
1223 pMsg->setField(dwId++, (WORD)0);
1224 }
1225
1226 unlock();
1227 }
1228
1229 /**
1230 * Get item's last value for use in NXSL
1231 */
1232 NXSL_Value *DCItem::getRawValueForNXSL()
1233 {
1234 lock();
1235 NXSL_Value *value = new NXSL_Value(m_prevRawValue.getString());
1236 unlock();
1237 return value;
1238 }
1239
1240 /**
1241 * Get item's last value for use in NXSL
1242 */
1243 NXSL_Value *DCItem::getValueForNXSL(int nFunction, int nPolls)
1244 {
1245 NXSL_Value *pValue;
1246
1247 lock();
1248 switch(nFunction)
1249 {
1250 case F_LAST:
1251 // cache placeholders will have timestamp 1
1252 pValue = (m_bCacheLoaded && (m_cacheSize > 0) && (m_ppValueCache[0]->getTimeStamp() != 1)) ? new NXSL_Value(m_ppValueCache[0]->getString()) : new NXSL_Value;
1253 break;
1254 case F_DIFF:
1255 if (m_bCacheLoaded && (m_cacheSize >= 2))
1256 {
1257 ItemValue result;
1258 CalculateItemValueDiff(result, m_dataType, *m_ppValueCache[0], *m_ppValueCache[1]);
1259 pValue = new NXSL_Value(result.getString());
1260 }
1261 else
1262 {
1263 pValue = new NXSL_Value;
1264 }
1265 break;
1266 case F_AVERAGE:
1267 if (m_bCacheLoaded && (m_cacheSize > 0))
1268 {
1269 ItemValue result;
1270 CalculateItemValueAverage(result, m_dataType, std::min(m_cacheSize, (UINT32)nPolls), m_ppValueCache);
1271 pValue = new NXSL_Value(result.getString());
1272 }
1273 else
1274 {
1275 pValue = new NXSL_Value;
1276 }
1277 break;
1278 case F_DEVIATION:
1279 if (m_bCacheLoaded && (m_cacheSize > 0))
1280 {
1281 ItemValue result;
1282 CalculateItemValueMD(result, m_dataType, std::min(m_cacheSize, (UINT32)nPolls), m_ppValueCache);
1283 pValue = new NXSL_Value(result.getString());
1284 }
1285 else
1286 {
1287 pValue = new NXSL_Value;
1288 }
1289 break;
1290 case F_ERROR:
1291 pValue = new NXSL_Value((INT32)((m_dwErrorCount >= (UINT32)nPolls) ? 1 : 0));
1292 break;
1293 default:
1294 pValue = new NXSL_Value;
1295 break;
1296 }
1297 unlock();
1298 return pValue;
1299 }
1300
1301 /**
1302 * Get last value
1303 */
1304 const TCHAR *DCItem::getLastValue()
1305 {
1306 lock();
1307 const TCHAR *v = (m_cacheSize > 0) ? (const TCHAR *)m_ppValueCache[0]->getString() : NULL;
1308 unlock();
1309 return v;
1310 }
1311
1312 /**
1313 * Get copy of internal last value object. Caller is responsible for destroying returned object.
1314 */
1315 ItemValue *DCItem::getInternalLastValue()
1316 {
1317 lock();
1318 ItemValue *v = (m_cacheSize > 0) ? new ItemValue(m_ppValueCache[0]) : NULL;
1319 unlock();
1320 return v;
1321 }
1322
1323 /**
1324 * Get aggregate value. Returned value must be deallocated by caller.
1325 *
1326 * @return dynamically allocated value or NULL on error
1327 */
1328 TCHAR *DCItem::getAggregateValue(AggregationFunction func, time_t periodStart, time_t periodEnd)
1329 {
1330 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
1331 TCHAR query[1024];
1332 TCHAR *result = NULL;
1333
1334 static const TCHAR *functions[] = { _T(""), _T("min"), _T("max"), _T("avg"), _T("sum") };
1335
1336 if (g_dbSyntax == DB_SYNTAX_ORACLE)
1337 {
1338 _sntprintf(query, 1024, _T("SELECT %s(coalesce(to_number(idata_value),0)) FROM idata_%u ")
1339 _T("WHERE item_id=? AND idata_timestamp BETWEEN ? AND ?"),
1340 functions[func], m_owner->getId());
1341 }
1342 else if (g_dbSyntax == DB_SYNTAX_MSSQL)
1343 {
1344 _sntprintf(query, 1024, _T("SELECT %s(coalesce(cast(idata_value as float),0)) FROM idata_%u ")
1345 _T("WHERE item_id=? AND (idata_timestamp BETWEEN ? AND ?) AND isnumeric(idata_value)=1"),
1346 functions[func], m_owner->getId());
1347 }
1348 else if (g_dbSyntax == DB_SYNTAX_PGSQL)
1349 {
1350 _sntprintf(query, 1024, _T("SELECT %s(idata_value::double precision) FROM idata_%u ")
1351 _T("WHERE item_id=? AND idata_timestamp BETWEEN ? AND ? AND idata_value~E'^\\\\d+(\\\\.\\\\d+)*$'"),
1352 functions[func], m_owner->getId());
1353 }
1354 else
1355 {
1356 _sntprintf(query, 1024, _T("SELECT %s(coalesce(idata_value,0)) FROM idata_%u ")
1357 _T("WHERE item_id=? and idata_timestamp between ? and ?"),
1358 functions[func], m_owner->getId());
1359 }
1360
1361 DB_STATEMENT hStmt = DBPrepare(hdb, query);
1362 if (hStmt != NULL)
1363 {
1364 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, m_id);
1365 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (INT32)periodStart);
1366 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (INT32)periodEnd);
1367 DB_RESULT hResult = DBSelectPrepared(hStmt);
1368 if (hResult != NULL)
1369 {
1370 if (DBGetNumRows(hResult) == 1)
1371 {
1372 result = DBGetField(hResult, 0, 0, NULL, 0);
1373 }
1374 DBFreeResult(hResult);
1375 }
1376 DBFreeStatement(hStmt);
1377 }
1378
1379 DBConnectionPoolReleaseConnection(hdb);
1380 return result;
1381 }
1382
1383 /**
1384 * Delete all collected data
1385 */
1386 bool DCItem::deleteAllData()
1387 {
1388 TCHAR szQuery[256];
1389 bool success;
1390
1391 lock();
1392 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
1393 _sntprintf(szQuery, 256, _T("DELETE FROM idata_%d WHERE item_id=%d"), m_owner->getId(), m_id);
1394 success = DBQuery(hdb, szQuery) ? true : false;
1395 DBConnectionPoolReleaseConnection(hdb);
1396 clearCache();
1397 updateCacheSizeInternal(true);
1398 unlock();
1399 return success;
1400 }
1401
1402 /**
1403 * Update from template item
1404 */
1405 void DCItem::updateFromTemplate(DCObject *src)
1406 {
1407 DCObject::updateFromTemplate(src);
1408
1409 if (src->getType() != DCO_TYPE_ITEM)
1410 {
1411 DbgPrintf(2, _T("INTERNAL ERROR: DCItem::updateFromTemplate(%d, %d): source type is %d"), (int)m_id, (int)src->getId(), src->getType());
1412 return;
1413 }
1414
1415 lock();
1416 DCItem *item = (DCItem *)src;
1417
1418 m_dataType = item->m_dataType;
1419 m_deltaCalculation = item->m_deltaCalculation;
1420 m_sampleCount = item->m_sampleCount;
1421 m_snmpRawValueType = item->m_snmpRawValueType;
1422
1423 m_nBaseUnits = item->m_nBaseUnits;
1424 m_nMultiplier = item->m_nMultiplier;
1425 safe_free(m_customUnitName);
1426 m_customUnitName = (item->m_customUnitName != NULL) ? _tcsdup(item->m_customUnitName) : NULL;
1427
1428 // Copy thresholds
1429 // ***************************
1430 // First, skip matching thresholds
1431 int count = std::min(getThresholdCount(), item->getThresholdCount());
1432 int i;
1433 for(i = 0; i < count; i++)
1434 if (!m_thresholds->get(i)->compare(item->m_thresholds->get(i)))
1435 break;
1436 count = i; // First unmatched threshold's position
1437
1438 // Delete all original thresholds starting from first unmatched
1439 while(count < getThresholdCount())
1440 m_thresholds->remove(count);
1441
1442 // (Re)create thresholds starting from first unmatched
1443 if ((m_thresholds == NULL) && (item->getThresholdCount() > 0))
1444 m_thresholds = new ObjectArray<Threshold>(item->getThresholdCount(), 8, true);
1445 for(i = count; i < item->getThresholdCount(); i++)
1446 {
1447 Threshold *t = new Threshold(item->m_thresholds->get(i));
1448 t->createId();
1449 t->bindToItem(m_id, m_owner->getId());
1450 m_thresholds->add(t);
1451 }
1452
1453 // Update data type in thresholds
1454 for(i = 0; i < getThresholdCount(); i++)
1455 m_thresholds->get(i)->setDataType(m_dataType);
1456
1457 updateCacheSizeInternal(true);
1458 unlock();
1459 }
1460
1461 /**
1462 * Get list of used events
1463 */
1464 void DCItem::getEventList(IntegerArray<UINT32> *eventList)
1465 {
1466 lock();
1467 if (m_thresholds != NULL)
1468 {
1469 for(int i = 0; i < m_thresholds->size(); i++)
1470 {
1471 eventList->add(m_thresholds->get(i)->getEventCode());
1472 eventList->add(m_thresholds->get(i)->getRearmEventCode());
1473 }
1474 }
1475 unlock();
1476 }
1477
1478 /**
1479 * Create management pack record
1480 */
1481 void DCItem::createExportRecord(String &str)
1482 {
1483 lock();
1484
1485 str.appendFormattedString(_T("\t\t\t\t<dci id=\"%d\">\n")
1486 _T("\t\t\t\t\t<guid>%s</guid>\n")
1487 _T("\t\t\t\t\t<name>%s</name>\n")
1488 _T("\t\t\t\t\t<description>%s</description>\n")
1489 _T("\t\t\t\t\t<dataType>%d</dataType>\n")
1490 _T("\t\t\t\t\t<samples>%d</samples>\n")
1491 _T("\t\t\t\t\t<origin>%d</origin>\n")
1492 _T("\t\t\t\t\t<interval>%d</interval>\n")
1493 _T("\t\t\t\t\t<retention>%d</retention>\n")
1494 _T("\t\t\t\t\t<instance>%s</instance>\n")
1495 _T("\t\t\t\t\t<systemTag>%s</systemTag>\n")
1496 _T("\t\t\t\t\t<delta>%d</delta>\n")
1497 _T("\t\t\t\t\t<flags>%d</flags>\n")
1498 _T("\t\t\t\t\t<snmpRawValueType>%d</snmpRawValueType>\n")
1499 _T("\t\t\t\t\t<snmpPort>%d</snmpPort>\n")
1500 _T("\t\t\t\t\t<instanceDiscoveryMethod>%d</instanceDiscoveryMethod>\n"),
1501 (int)m_id, (const TCHAR *)m_guid.toString(),
1502 (const TCHAR *)EscapeStringForXML2(m_name),
1503 (const TCHAR *)EscapeStringForXML2(m_description),
1504 m_dataType, m_sampleCount, (int)m_source, m_iPollingInterval, m_iRetentionTime,
1505 (const TCHAR *)EscapeStringForXML2(m_instance),
1506 (const TCHAR *)EscapeStringForXML2(m_systemTag),
1507 (int)m_deltaCalculation, (int)m_flags,
1508 (int)m_snmpRawValueType, (int)m_snmpPort, (int)m_instanceDiscoveryMethod);
1509
1510 if (m_transformationScriptSource != NULL)
1511 {
1512 str += _T("\t\t\t\t\t<transformation>");
1513 str.appendPreallocated(EscapeStringForXML(m_transformationScriptSource, -1));
1514 str += _T("</transformation>\n");
1515 }
1516
1517 if ((m_schedules != NULL) && (m_schedules->size() > 0))
1518 {
1519 str += _T("\t\t\t\t\t<schedules>\n");
1520 for(int i = 0; i < m_schedules->size(); i++)
1521 str.appendFormattedString(_T("\t\t\t\t\t\t<schedule>%s</schedule>\n"), (const TCHAR *)EscapeStringForXML2(m_schedules->get(i)));
1522 str += _T("\t\t\t\t\t</schedules>\n");
1523 }
1524
1525 if (m_thresholds != NULL)
1526 {
1527 str += _T("\t\t\t\t\t<thresholds>\n");
1528 for(int i = 0; i < m_thresholds->size(); i++)
1529 {
1530 m_thresholds->get(i)->createNXMPRecord(str, i + 1);
1531 }
1532 str += _T("\t\t\t\t\t</thresholds>\n");
1533 }
1534
1535 if (m_pszPerfTabSettings != NULL)
1536 {
1537 str += _T("\t\t\t\t\t<perfTabSettings>");
1538 str.appendPreallocated(EscapeStringForXML(m_pszPerfTabSettings, -1));
1539 str += _T("</perfTabSettings>\n");
1540 }
1541
1542 if (m_instanceDiscoveryData != NULL)
1543 {
1544 str += _T("\t\t\t\t\t<instanceDiscoveryData>");
1545 str.appendPreallocated(EscapeStringForXML(m_instanceDiscoveryData, -1));
1546 str += _T("</instanceDiscoveryData>\n");
1547 }
1548
1549 if (m_instanceFilterSource != NULL)
1550 {
1551 str += _T("\t\t\t\t\t<instanceFilter>");
1552 str.appendPreallocated(EscapeStringForXML(m_instanceFilterSource, -1));
1553 str += _T("</instanceFilter>\n");
1554 }
1555
1556 unlock();
1557 str += _T("\t\t\t\t</dci>\n");
1558 }
1559
1560 /**
1561 * Add threshold to the list
1562 */
1563 void DCItem::addThreshold(Threshold *pThreshold)
1564 {
1565 if (m_thresholds == NULL)
1566 m_thresholds = new ObjectArray<Threshold>(8, 8, true);
1567 m_thresholds->add(pThreshold);
1568 }
1569
1570 /**
1571 * Enumerate all thresholds
1572 */
1573 BOOL DCItem::enumThresholds(BOOL (* pfCallback)(Threshold *, UINT32, void *), void *pArg)
1574 {
1575 BOOL bRet = TRUE;
1576
1577 lock();
1578 if (m_thresholds != NULL)
1579 {
1580 for(int i = 0; i < m_thresholds->size(); i++)
1581 {
1582 if (!pfCallback(m_thresholds->get(i), i, pArg))
1583 {
1584 bRet = FALSE;
1585 break;
1586 }
1587 }
1588 }
1589 unlock();
1590 return bRet;
1591 }
1592
1593 /**
1594 * Test DCI's transformation script
1595 */
1596 bool DCItem::testTransformation(DataCollectionTarget *object, const TCHAR *script, const TCHAR *value, TCHAR *buffer, size_t bufSize)
1597 {
1598 bool success = false;
1599 NXSL_VM *vm = NXSLCompileAndCreateVM(script, buffer, (int)bufSize, new NXSL_ServerEnv);
1600 if (vm != NULL)
1601 {
1602 NXSL_Value *pValue = new NXSL_Value(value);
1603 vm->setGlobalVariable(_T("$object"), object->createNXSLObject());
1604 if (object->getObjectClass() == OBJECT_NODE)
1605 {
1606 vm->setGlobalVariable(_T("$node"), object->createNXSLObject());
1607 }
1608 //FIXME: vm->setGlobalVariable(_T("$dci"), new NXSL_Value(new NXSL_Object(&g_nxslDciClass, this)));
1609 vm->setGlobalVariable(_T("$isCluster"), new NXSL_Value((object->getObjectClass() == OBJECT_CLUSTER) ? 1 : 0));
1610
1611 if (vm->run(1, &pValue))
1612 {
1613 pValue = vm->getResult();
1614 if (pValue != NULL)
1615 {
1616 if (pValue->isNull())
1617 {
1618 nx_strncpy(buffer, _T("(null)"), bufSize);
1619 }
1620 else if (pValue->isObject())
1621 {
1622 nx_strncpy(buffer, _T("(object)"), bufSize);
1623 }
1624 else if (pValue->isArray())
1625 {
1626 nx_strncpy(buffer, _T("(array)"), bufSize);
1627 }
1628 else
1629 {
1630 const TCHAR *strval;
1631
1632 strval = pValue->getValueAsCString();
1633 nx_strncpy(buffer, CHECK_NULL(strval), bufSize);
1634 }
1635 }
1636 else
1637 {
1638 nx_strncpy(buffer, _T("(null)"), bufSize);
1639 }
1640 success = true;
1641 }
1642 else
1643 {
1644 nx_strncpy(buffer, vm->getErrorText(), bufSize);
1645 }
1646 }
1647 delete vm;
1648 return success;
1649 }
1650
1651 /**
1652 * Fill NXCP message with thresholds
1653 */
1654 void DCItem::fillMessageWithThresholds(NXCPMessage *msg)
1655 {
1656 lock();
1657
1658 msg->setField(VID_NUM_THRESHOLDS, (UINT32)getThresholdCount());
1659 UINT32 id = VID_DCI_THRESHOLD_BASE;
1660 for(int i = 0; i < getThresholdCount(); i++, id += 20)
1661 {
1662 m_thresholds->get(i)->createMessage(msg, id);
1663 }
1664
1665 unlock();
1666 }
1667
1668 /**
1669 * Check if DCI has active threshold
1670 */
1671 bool DCItem::hasActiveThreshold()
1672 {
1673 bool result = false;
1674 lock();
1675 for(int i = 0; i < getThresholdCount(); i++)
1676 {
1677 if (m_thresholds->get(i)->isReached())
1678 {
1679 result = true;
1680 break;
1681 }
1682 }
1683 unlock();
1684 return result;
1685 }
1686
1687 /**
1688 * Get severity of active threshold. If no active threshold exist, returns SEVERITY_NORMAL.
1689 */
1690 int DCItem::getThresholdSeverity()
1691 {
1692 int result = SEVERITY_NORMAL;
1693 lock();
1694 for(int i = 0; i < getThresholdCount(); i++)
1695 {
1696 Threshold *t = m_thresholds->get(i);
1697 if (t->isReached())
1698 {
1699 result = t->getCurrentSeverity();
1700 break;
1701 }
1702 }
1703 unlock();
1704 return result;
1705 }
1706
1707 /**
1708 * Returns true if internal cache is loaded. If data collection object
1709 * does not have cache should return true
1710 */
1711 bool DCItem::isCacheLoaded()
1712 {
1713 return m_bCacheLoaded;
1714 }
1715
1716 /**
1717 * Create DCI from import file
1718 */
1719 void DCItem::updateFromImport(ConfigEntry *config)
1720 {
1721 DCObject::updateFromImport(config);
1722
1723 lock();
1724 m_dataType = (BYTE)config->getSubEntryValueAsInt(_T("dataType"));
1725 m_deltaCalculation = (BYTE)config->getSubEntryValueAsInt(_T("delta"));
1726 m_sampleCount = (BYTE)config->getSubEntryValueAsInt(_T("samples"));
1727 m_snmpRawValueType = (WORD)config->getSubEntryValueAsInt(_T("snmpRawValueType"));
1728
1729 ConfigEntry *thresholdsRoot = config->findEntry(_T("thresholds"));
1730 if (thresholdsRoot != NULL)
1731 {
1732 ObjectArray<ConfigEntry> *thresholds = thresholdsRoot->getSubEntries(_T("threshold#*"));
1733 if (m_thresholds != NULL)
1734 m_thresholds->clear();
1735 else
1736 m_thresholds = new ObjectArray<Threshold>(thresholds->size(), 8, true);
1737 for(int i = 0; i < thresholds->size(); i++)
1738 {
1739 m_thresholds->add(new Threshold(thresholds->get(i), this));
1740 }
1741 delete thresholds;
1742 }
1743 else
1744 {
1745 delete_and_null(m_thresholds);
1746 }
1747
1748 updateCacheSizeInternal(true);
1749 unlock();
1750 }
1751
1752 /*
1753 * Clone DCI
1754 */
1755 DCObject *DCItem::clone()
1756 {
1757 return new DCItem(this);
1758 }
1759
1760 /**
1761 * Serialize object to JSON
1762 */
1763 json_t *DCItem::toJson()
1764 {
1765 json_t *root = DCObject::toJson();
1766 json_object_set_new(root, "deltaCalculation", json_integer(m_deltaCalculation));
1767 json_object_set_new(root, "dataType", json_integer(m_dataType));
1768 json_object_set_new(root, "sampleCount", json_integer(m_sampleCount));
1769 json_object_set_new(root, "thresholds", json_object_array(m_thresholds));
1770 json_object_set_new(root, "prevRawValue", json_string_t(m_prevRawValue));
1771 json_object_set_new(root, "prevValueTimeStamp", json_integer(m_tPrevValueTimeStamp));
1772 json_object_set_new(root, "baseUnits", json_integer(m_nBaseUnits));
1773 json_object_set_new(root, "multiplier", json_integer(m_nMultiplier));
1774 json_object_set_new(root, "customUnitName", json_string_t(m_customUnitName));
1775 json_object_set_new(root, "snmpRawValueType", json_integer(m_snmpRawValueType));
1776 json_object_set_new(root, "predictionEngine", json_string_t(m_predictionEngine));
1777 return root;
1778 }
1779
1780 /**
1781 * Prepare DCI object for recalculation (should be executed on DCI copy)
1782 */
1783 void DCItem::prepareForRecalc()
1784 {
1785 m_tPrevValueTimeStamp = 0;
1786 m_tLastPoll = 0;
1787 updateCacheSizeInternal(false);
1788 }
1789
1790 /**
1791 * Recalculate old value (should be executed on DCI copy)
1792 */
1793 void DCItem::recalculateValue(ItemValue &value)
1794 {
1795 if (m_tPrevValueTimeStamp == 0)
1796 m_prevRawValue = value; // Delta should be zero for first poll
1797 ItemValue rawValue = value;
1798
1799 // Cluster can have only aggregated data, and transformation
1800 // should not be used on aggregation
1801 if ((m_owner->getObjectClass() != OBJECT_CLUSTER) || (m_flags & DCF_TRANSFORM_AGGREGATED))
1802 {
1803 if (!transform(value, (value.getTimeStamp() > m_tPrevValueTimeStamp) ? (value.getTimeStamp() - m_tPrevValueTimeStamp) : 0))
1804 {
1805 return;
1806 }
1807 }
1808
1809 if (value.getTimeStamp() > m_tPrevValueTimeStamp)
1810 {
1811 m_prevRawValue = rawValue;
1812 m_tPrevValueTimeStamp = value.getTimeStamp();
1813 }
1814
1815 if ((m_cacheSize > 0) && (value.getTimeStamp() >= m_tPrevValueTimeStamp))
1816 {
1817 delete m_ppValueCache[m_cacheSize - 1];
1818 memmove(&m_ppValueCache[1], m_ppValueCache, sizeof(ItemValue *) * (m_cacheSize - 1));
1819 m_ppValueCache[0] = new ItemValue(&value);
1820 }
1821
1822 m_tLastPoll = value.getTimeStamp();
1823 }