- Fixed server crash sometimes caused by DCI deletion
[public/netxms.git] / src / server / core / dcitem.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003, 2004, 2005 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** $module: dcitem.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25
26 //
27 // Default constructor for DCItem
28 //
29
30 DCItem::DCItem()
31 {
32 m_dwId = 0;
33 m_dwTemplateId = 0;
34 m_dwNumThresholds = 0;
35 m_ppThresholdList = NULL;
36 m_iBusy = 0;
37 m_iDataType = DCI_DT_INT;
38 m_iPollingInterval = 3600;
39 m_iRetentionTime = 0;
40 m_iDeltaCalculation = DCM_ORIGINAL_VALUE;
41 m_iSource = DS_INTERNAL;
42 m_iStatus = ITEM_STATUS_NOT_SUPPORTED;
43 m_szName[0] = 0;
44 m_szDescription[0] = 0;
45 m_szInstance[0] = 0;
46 m_tLastPoll = 0;
47 m_pszFormula = _tcsdup(_T(""));
48 m_pNode = NULL;
49 m_hMutex = MutexCreate();
50 m_dwCacheSize = 0;
51 m_ppValueCache = NULL;
52 }
53
54
55 //
56 // Create DCItem from another DCItem
57 //
58
59 DCItem::DCItem(const DCItem *pSrc)
60 {
61 DWORD i;
62
63 m_dwId = pSrc->m_dwId;
64 m_dwTemplateId = pSrc->m_dwTemplateId;
65 m_iBusy = 0;
66 m_iDataType = pSrc->m_iDataType;
67 m_iPollingInterval = pSrc->m_iPollingInterval;
68 m_iRetentionTime = pSrc->m_iRetentionTime;
69 m_iDeltaCalculation = pSrc->m_iDeltaCalculation;
70 m_iSource = pSrc->m_iSource;
71 m_iStatus = pSrc->m_iStatus;
72 _tcscpy(m_szName, pSrc->m_szName);
73 _tcscpy(m_szDescription, pSrc->m_szDescription);
74 _tcscpy(m_szInstance, pSrc->m_szInstance);
75 m_tLastPoll = 0;
76 m_pszFormula = _tcsdup(pSrc->m_pszFormula);
77 m_pNode = NULL;
78 m_hMutex = MutexCreate();
79 m_dwCacheSize = 0;
80 m_ppValueCache = NULL;
81
82 // Copy thresholds
83 m_dwNumThresholds = pSrc->m_dwNumThresholds;
84 m_ppThresholdList = (Threshold **)malloc(sizeof(Threshold *) * m_dwNumThresholds);
85 for(i = 0; i < m_dwNumThresholds; i++)
86 {
87 m_ppThresholdList[i] = new Threshold(pSrc->m_ppThresholdList[i]);
88 m_ppThresholdList[i]->CreateId();
89 }
90
91 UpdateCacheSize();
92 }
93
94
95 //
96 // Constructor for creating DCItem from database
97 // Assumes that fields in SELECT query are in following order:
98 // item_id,name,source,datatype,polling_interval,retention_time,status,
99 // delta_calculation,transformation,template_id,description,instance
100 //
101
102 DCItem::DCItem(DB_RESULT hResult, int iRow, Template *pNode)
103 {
104 m_dwId = DBGetFieldULong(hResult, iRow, 0);
105 strncpy(m_szName, DBGetField(hResult, iRow, 1), MAX_ITEM_NAME);
106 DecodeSQLString(m_szName);
107 m_iSource = (BYTE)DBGetFieldLong(hResult, iRow, 2);
108 m_iDataType = (BYTE)DBGetFieldLong(hResult, iRow, 3);
109 m_iPollingInterval = DBGetFieldLong(hResult, iRow, 4);
110 m_iRetentionTime = DBGetFieldLong(hResult, iRow, 5);
111 m_iStatus = (BYTE)DBGetFieldLong(hResult, iRow, 6);
112 m_iDeltaCalculation = (BYTE)DBGetFieldLong(hResult, iRow, 7);
113 m_pszFormula = strdup(DBGetField(hResult, iRow, 8));
114 m_dwTemplateId = DBGetFieldULong(hResult, iRow, 9);
115 strncpy(m_szDescription, DBGetField(hResult, iRow, 10), MAX_DB_STRING);
116 DecodeSQLString(m_szDescription);
117 strncpy(m_szInstance, DBGetField(hResult, iRow, 11), MAX_DB_STRING);
118 DecodeSQLString(m_szInstance);
119 m_iBusy = 0;
120 m_tLastPoll = 0;
121 m_dwNumThresholds = 0;
122 m_ppThresholdList = NULL;
123 m_pNode = pNode;
124 m_hMutex = MutexCreate();
125 m_dwCacheSize = 0;
126 m_ppValueCache = NULL;
127 }
128
129
130 //
131 // Constructor for creating new DCItem from scratch
132 //
133
134 DCItem::DCItem(DWORD dwId, char *szName, int iSource, int iDataType,
135 int iPollingInterval, int iRetentionTime, Template *pNode,
136 char *pszDescription)
137 {
138 m_dwId = dwId;
139 m_dwTemplateId = 0;
140 strncpy(m_szName, szName, MAX_ITEM_NAME);
141 if (pszDescription != NULL)
142 strncpy(m_szDescription, pszDescription, MAX_DB_STRING);
143 else
144 strcpy(m_szDescription, m_szName);
145 m_szInstance[0] = 0;
146 m_iSource = iSource;
147 m_iDataType = iDataType;
148 m_iPollingInterval = iPollingInterval;
149 m_iRetentionTime = iRetentionTime;
150 m_iDeltaCalculation = DCM_ORIGINAL_VALUE;
151 m_iStatus = ITEM_STATUS_ACTIVE;
152 m_iBusy = 0;
153 m_tLastPoll = 0;
154 m_pszFormula = strdup("");
155 m_dwNumThresholds = 0;
156 m_ppThresholdList = NULL;
157 m_pNode = pNode;
158 m_hMutex = MutexCreate();
159 m_dwCacheSize = 0;
160 m_ppValueCache = NULL;
161
162 UpdateCacheSize();
163 }
164
165
166 //
167 // Destructor for DCItem
168 //
169
170 DCItem::~DCItem()
171 {
172 DWORD i;
173
174 for(i = 0; i < m_dwNumThresholds; i++)
175 delete m_ppThresholdList[i];
176 safe_free(m_ppThresholdList);
177 safe_free(m_pszFormula);
178 for(i = 0; i < m_dwCacheSize; i++)
179 delete m_ppValueCache[i];
180 safe_free(m_ppValueCache);
181 MutexDestroy(m_hMutex);
182 }
183
184
185 //
186 // Load data collection items thresholds from database
187 //
188
189 BOOL DCItem::LoadThresholdsFromDB(void)
190 {
191 DWORD i;
192 char szQuery[256];
193 DB_RESULT hResult;
194 BOOL bResult = FALSE;
195
196 sprintf(szQuery, "SELECT threshold_id,fire_value,rearm_value,check_function,"
197 "check_operation,parameter_1,parameter_2,event_code FROM thresholds "
198 "WHERE item_id=%ld ORDER BY sequence_number", m_dwId);
199 hResult = DBSelect(g_hCoreDB, szQuery);
200 if (hResult != NULL)
201 {
202 m_dwNumThresholds = DBGetNumRows(hResult);
203 if (m_dwNumThresholds > 0)
204 {
205 m_ppThresholdList = (Threshold **)malloc(sizeof(Threshold *) * m_dwNumThresholds);
206 for(i = 0; i < m_dwNumThresholds; i++)
207 m_ppThresholdList[i] = new Threshold(hResult, i, this);
208 }
209 DBFreeResult(hResult);
210 bResult = TRUE;
211 }
212
213 UpdateCacheSize();
214
215 return bResult;
216 }
217
218
219 //
220 // Save to database
221 //
222
223 BOOL DCItem::SaveToDB(void)
224 {
225 TCHAR *pszEscName, *pszEscFormula, *pszEscDescr, *pszEscInstance, szQuery[1024];
226 DB_RESULT hResult;
227 BOOL bNewObject = TRUE, bResult;
228
229 Lock();
230
231 // Check for object's existence in database
232 sprintf(szQuery, "SELECT item_id FROM items WHERE item_id=%ld", m_dwId);
233 hResult = DBSelect(g_hCoreDB, szQuery);
234 if (hResult != 0)
235 {
236 if (DBGetNumRows(hResult) > 0)
237 bNewObject = FALSE;
238 DBFreeResult(hResult);
239 }
240
241 // Prepare and execute query
242 pszEscName = EncodeSQLString(m_szName);
243 pszEscFormula = EncodeSQLString(m_pszFormula);
244 pszEscDescr = EncodeSQLString(m_szDescription);
245 pszEscInstance = EncodeSQLString(m_szInstance);
246 if (bNewObject)
247 sprintf(szQuery, "INSERT INTO items (item_id,node_id,template_id,name,description,source,"
248 "datatype,polling_interval,retention_time,status,delta_calculation,"
249 "transformation,instance) VALUES (%ld,%ld,%ld,'%s','%s',%d,%d,%ld,%ld,%d,%d,'%s','%s')",
250 m_dwId, (m_pNode == NULL) ? 0 : m_pNode->Id(), m_dwTemplateId,
251 pszEscName, pszEscDescr, m_iSource, m_iDataType, m_iPollingInterval,
252 m_iRetentionTime, m_iStatus, m_iDeltaCalculation, pszEscFormula, pszEscInstance);
253 else
254 sprintf(szQuery, "UPDATE items SET node_id=%ld,template_id=%ld,name='%s',source=%d,"
255 "datatype=%d,polling_interval=%ld,retention_time=%ld,status=%d,"
256 "delta_calculation=%d,transformation='%s',description='%s',"
257 "instance='%s' WHERE item_id=%ld",
258 (m_pNode == NULL) ? 0 : m_pNode->Id(), m_dwTemplateId,
259 pszEscName, m_iSource, m_iDataType, m_iPollingInterval,
260 m_iRetentionTime, m_iStatus, m_iDeltaCalculation, pszEscFormula,
261 pszEscDescr, pszEscInstance, m_dwId);
262 bResult = DBQuery(g_hCoreDB, szQuery);
263 free(pszEscName);
264 free(pszEscFormula);
265 free(pszEscDescr);
266 free(pszEscInstance);
267
268 // Save thresholds
269 if (bResult)
270 {
271 DWORD i;
272
273 for(i = 0; i < m_dwNumThresholds; i++)
274 m_ppThresholdList[i]->SaveToDB(i);
275 }
276
277 // Delete non-existing thresholds
278 sprintf(szQuery, "SELECT threshold_id FROM thresholds WHERE item_id=%ld", m_dwId);
279 hResult = DBSelect(g_hCoreDB, szQuery);
280 if (hResult != 0)
281 {
282 int i, iNumRows;
283 DWORD j, dwId;
284
285 iNumRows = DBGetNumRows(hResult);
286 for(i = 0; i < iNumRows; i++)
287 {
288 dwId = DBGetFieldULong(hResult, i, 0);
289 for(j = 0; j < m_dwNumThresholds; j++)
290 if (m_ppThresholdList[j]->Id() == dwId)
291 break;
292 if (j == m_dwNumThresholds)
293 {
294 sprintf(szQuery, "DELETE FROM thresholds WHERE threshold_id=%ld", dwId);
295 DBQuery(g_hCoreDB, szQuery);
296 }
297 }
298 DBFreeResult(hResult);
299 }
300
301 Unlock();
302 return bResult;
303 }
304
305
306 //
307 // Check last value for threshold breaches
308 //
309
310 void DCItem::CheckThresholds(ItemValue &value)
311 {
312 DWORD i, iResult;
313 ItemValue checkValue;
314
315 for(i = 0; i < m_dwNumThresholds; i++)
316 {
317 iResult = m_ppThresholdList[i]->Check(value, m_ppValueCache, checkValue);
318 switch(iResult)
319 {
320 case THRESHOLD_REACHED:
321 PostEvent(m_ppThresholdList[i]->EventCode(), m_pNode->Id(), "ssssis", m_szName,
322 m_szDescription, m_ppThresholdList[i]->StringValue(),
323 (const char *)checkValue, m_dwId, m_szInstance);
324 i = m_dwNumThresholds; // Stop processing
325 break;
326 case THRESHOLD_REARMED:
327 PostEvent(EVENT_THRESHOLD_REARMED, m_pNode->Id(), "ssi", m_szName,
328 m_szDescription, m_dwId);
329 break;
330 case NO_ACTION:
331 if (m_ppThresholdList[i]->IsReached())
332 i = m_dwNumThresholds; // Threshold condition still true, stop processing
333 break;
334 }
335 }
336 }
337
338
339 //
340 // Create CSCP message with item data
341 //
342
343 void DCItem::CreateMessage(CSCPMessage *pMsg)
344 {
345 DCI_THRESHOLD dct;
346 DWORD i, dwId;
347
348 Lock();
349 pMsg->SetVariable(VID_DCI_ID, m_dwId);
350 pMsg->SetVariable(VID_NAME, m_szName);
351 pMsg->SetVariable(VID_DESCRIPTION, m_szDescription);
352 pMsg->SetVariable(VID_INSTANCE, m_szInstance);
353 pMsg->SetVariable(VID_POLLING_INTERVAL, (DWORD)m_iPollingInterval);
354 pMsg->SetVariable(VID_RETENTION_TIME, (DWORD)m_iRetentionTime);
355 pMsg->SetVariable(VID_DCI_SOURCE_TYPE, (WORD)m_iSource);
356 pMsg->SetVariable(VID_DCI_DATA_TYPE, (WORD)m_iDataType);
357 pMsg->SetVariable(VID_DCI_STATUS, (WORD)m_iStatus);
358 pMsg->SetVariable(VID_DCI_DELTA_CALCULATION, (WORD)m_iDeltaCalculation);
359 pMsg->SetVariable(VID_DCI_FORMULA, m_pszFormula);
360 pMsg->SetVariable(VID_NUM_THRESHOLDS, m_dwNumThresholds);
361 for(i = 0, dwId = VID_DCI_THRESHOLD_BASE; i < m_dwNumThresholds; i++, dwId++)
362 {
363 m_ppThresholdList[i]->CreateMessage(&dct);
364 pMsg->SetVariable(dwId, (BYTE *)&dct, sizeof(DCI_THRESHOLD));
365 }
366 Unlock();
367 }
368
369
370 //
371 // Delete item and collected data from database
372 //
373
374 void DCItem::DeleteFromDB(void)
375 {
376 char szQuery[256];
377
378 sprintf(szQuery, "DELETE FROM items WHERE item_id=%d", m_dwId);
379 QueueSQLRequest(szQuery);
380 sprintf(szQuery, "DELETE FROM idata_%d WHERE item_id=%d", m_pNode->Id(), m_dwId);
381 QueueSQLRequest(szQuery);
382 sprintf(szQuery, "DELETE FROM thresholds WHERE item_id=%d", m_dwId);
383 QueueSQLRequest(szQuery);
384 }
385
386
387 //
388 // Update item from CSCP message
389 //
390
391 void DCItem::UpdateFromMessage(CSCPMessage *pMsg, DWORD *pdwNumMaps,
392 DWORD **ppdwMapIndex, DWORD **ppdwMapId)
393 {
394 DWORD i, j, dwNum, dwId;
395 DCI_THRESHOLD *pNewThresholds;
396 Threshold **ppNewList;
397
398 Lock();
399
400 pMsg->GetVariableStr(VID_NAME, m_szName, MAX_ITEM_NAME);
401 pMsg->GetVariableStr(VID_DESCRIPTION, m_szDescription, MAX_DB_STRING);
402 pMsg->GetVariableStr(VID_INSTANCE, m_szInstance, MAX_DB_STRING);
403 m_iSource = (BYTE)pMsg->GetVariableShort(VID_DCI_SOURCE_TYPE);
404 m_iDataType = (BYTE)pMsg->GetVariableShort(VID_DCI_DATA_TYPE);
405 m_iPollingInterval = pMsg->GetVariableLong(VID_POLLING_INTERVAL);
406 m_iRetentionTime = pMsg->GetVariableLong(VID_RETENTION_TIME);
407 m_iStatus = (BYTE)pMsg->GetVariableShort(VID_DCI_STATUS);
408 m_iDeltaCalculation = (BYTE)pMsg->GetVariableShort(VID_DCI_DELTA_CALCULATION);
409 safe_free(m_pszFormula);
410 m_pszFormula = pMsg->GetVariableStr(VID_DCI_FORMULA);
411
412 // Update thresholds
413 dwNum = pMsg->GetVariableLong(VID_NUM_THRESHOLDS);
414 pNewThresholds = (DCI_THRESHOLD *)malloc(sizeof(DCI_THRESHOLD) * dwNum);
415 *ppdwMapIndex = (DWORD *)malloc(dwNum * sizeof(DWORD));
416 *ppdwMapId = (DWORD *)malloc(dwNum * sizeof(DWORD));
417 *pdwNumMaps = 0;
418
419 // Read all thresholds from message
420 for(i = 0, dwId = VID_DCI_THRESHOLD_BASE; i < dwNum; i++, dwId++)
421 {
422 pMsg->GetVariableBinary(dwId, (BYTE *)&pNewThresholds[i], sizeof(DCI_THRESHOLD));
423 pNewThresholds[i].dwId = ntohl(pNewThresholds[i].dwId);
424 }
425
426 // Check if some thresholds was deleted, and reposition others if needed
427 ppNewList = (Threshold **)malloc(sizeof(Threshold *) * dwNum);
428 for(i = 0; i < m_dwNumThresholds; i++)
429 {
430 for(j = 0; j < dwNum; j++)
431 if (m_ppThresholdList[i]->Id() == pNewThresholds[j].dwId)
432 break;
433 if (j == dwNum)
434 {
435 // No threshold with that id in new list, delete it
436 delete m_ppThresholdList[i];
437 m_dwNumThresholds--;
438 memmove(&m_ppThresholdList[i], &m_ppThresholdList[i + 1], sizeof(Threshold *) * (m_dwNumThresholds - i));
439 i--;
440 }
441 else
442 {
443 // Move existing thresholds to appropriate positions in new list
444 ppNewList[j] = m_ppThresholdList[i];
445 }
446 }
447 safe_free(m_ppThresholdList);
448 m_ppThresholdList = ppNewList;
449 m_dwNumThresholds = dwNum;
450
451 // Add or update thresholds
452 for(i = 0; i < dwNum; i++)
453 {
454 if (pNewThresholds[i].dwId == 0) // New threshold?
455 {
456 m_ppThresholdList[i] = new Threshold(this);
457 m_ppThresholdList[i]->CreateId();
458
459 // Add index -> id mapping
460 (*ppdwMapIndex)[*pdwNumMaps] = i;
461 (*ppdwMapId)[*pdwNumMaps] = m_ppThresholdList[i]->Id();
462 (*pdwNumMaps)++;
463 }
464 m_ppThresholdList[i]->UpdateFromMessage(&pNewThresholds[i]);
465 }
466
467 safe_free(pNewThresholds);
468 UpdateCacheSize();
469 Unlock();
470 }
471
472
473 //
474 // Process new value
475 //
476
477 void DCItem::NewValue(DWORD dwTimeStamp, const char *pszOriginalValue)
478 {
479 char szQuery[MAX_LINE_SIZE + 128];
480 ItemValue rawValue, *pValue;
481
482 Lock();
483
484 // Normally m_pNode shouldn't be NULL for polled items,
485 // but who knows...
486 if (m_pNode == NULL)
487 {
488 Unlock();
489 return;
490 }
491
492 // Create new ItemValue object and transform it as needed
493 pValue = new ItemValue(pszOriginalValue);
494 if (m_tLastPoll == 0)
495 m_prevRawValue = *pValue; // Delta should be zero for first poll
496 rawValue = *pValue;
497 Transform(*pValue, (long)(dwTimeStamp - m_tLastPoll));
498 m_prevRawValue = rawValue;
499
500 // Save transformed value to database
501 sprintf(szQuery, "INSERT INTO idata_%ld (item_id,idata_timestamp,idata_value)"
502 " VALUES (%ld,%ld,'%s')", m_pNode->Id(), m_dwId, dwTimeStamp,
503 pValue->String());
504 QueueSQLRequest(szQuery);
505
506 // Check thresholds and add value to cache
507 CheckThresholds(*pValue);
508
509 if (m_dwCacheSize > 0)
510 {
511 delete m_ppValueCache[m_dwCacheSize - 1];
512 memmove(&m_ppValueCache[1], m_ppValueCache, sizeof(ItemValue *) * (m_dwCacheSize - 1));
513 m_ppValueCache[0] = pValue;
514 }
515 else
516 {
517 delete pValue;
518 }
519
520 Unlock();
521 }
522
523
524 //
525 // Transform received value
526 //
527
528 void DCItem::Transform(ItemValue &value, long nElapsedTime)
529 {
530 switch(m_iDeltaCalculation)
531 {
532 case DCM_SIMPLE:
533 switch(m_iDataType)
534 {
535 case DCI_DT_INT:
536 value = (long)value - (long)m_prevRawValue;
537 break;
538 case DCI_DT_UINT:
539 value = (DWORD)value - (DWORD)m_prevRawValue;
540 break;
541 case DCI_DT_INT64:
542 value = (INT64)value - (INT64)m_prevRawValue;
543 break;
544 case DCI_DT_UINT64:
545 value = (QWORD)value - (QWORD)m_prevRawValue;
546 break;
547 case DCI_DT_FLOAT:
548 value = (double)value - (double)m_prevRawValue;
549 break;
550 case DCI_DT_STRING:
551 value = (long)((strcmp((const TCHAR *)value, (const TCHAR *)m_prevRawValue) == 0) ? 0 : 1);
552 break;
553 default:
554 // Delta calculation is not supported for other types
555 break;
556 }
557 break;
558 case DCM_AVERAGE_PER_MINUTE:
559 nElapsedTime /= 60; // Convert to minutes
560 case DCM_AVERAGE_PER_SECOND:
561 // Check elapsed time to prevent divide-by-zero exception
562 if (nElapsedTime == 0)
563 nElapsedTime++;
564
565 switch(m_iDataType)
566 {
567 case DCI_DT_INT:
568 value = ((long)value - (long)m_prevRawValue) / nElapsedTime;
569 break;
570 case DCI_DT_UINT:
571 value = ((DWORD)value - (DWORD)m_prevRawValue) / (DWORD)nElapsedTime;
572 break;
573 case DCI_DT_INT64:
574 value = ((INT64)value - (INT64)m_prevRawValue) / (INT64)nElapsedTime;
575 break;
576 case DCI_DT_UINT64:
577 value = ((QWORD)value - (QWORD)m_prevRawValue) / (QWORD)nElapsedTime;
578 break;
579 case DCI_DT_FLOAT:
580 value = ((double)value - (double)m_prevRawValue) / (double)nElapsedTime;
581 break;
582 case DCI_DT_STRING:
583 // I don't see any meaning in "average delta per second (minute)" for string
584 // values, so result will be 0 if there are no difference between
585 // current and previous values, and 1 otherwise
586 value = (long)((strcmp((const TCHAR *)value, (const TCHAR *)m_prevRawValue) == 0) ? 0 : 1);
587 break;
588 default:
589 // Delta calculation is not supported for other types
590 break;
591 }
592 break;
593 default: // Default is no transformation
594 break;
595 }
596 }
597
598
599 //
600 // Set new ID
601 //
602
603 void DCItem::SetId(DWORD dwNewId)
604 {
605 DWORD i;
606
607 Lock();
608 m_dwId = dwNewId;
609 for(i = 0; i < m_dwNumThresholds; i++)
610 m_ppThresholdList[i]->BindToItem(m_dwId);
611 Unlock();
612 }
613
614
615 //
616 // Update required cache size depending on thresholds
617 //
618
619 void DCItem::UpdateCacheSize(void)
620 {
621 DWORD i, dwRequiredSize;
622
623 // Minimum cache size is 1 (so GetLastValue can work)
624 for(i = 0, dwRequiredSize = 1; i < m_dwNumThresholds; i++)
625 if (dwRequiredSize < m_ppThresholdList[i]->RequiredCacheSize())
626 dwRequiredSize = m_ppThresholdList[i]->RequiredCacheSize();
627 if (dwRequiredSize < m_dwCacheSize)
628 {
629 m_dwCacheSize = dwRequiredSize;
630 if (m_dwCacheSize > 0)
631 {
632 m_ppValueCache = (ItemValue **)realloc(m_ppValueCache, sizeof(ItemValue *) * m_dwCacheSize);
633 }
634 else
635 {
636 safe_free(m_ppValueCache);
637 m_ppValueCache = NULL;
638 }
639 }
640 else if (dwRequiredSize > m_dwCacheSize)
641 {
642 // Expand cache
643 m_ppValueCache = (ItemValue **)realloc(m_ppValueCache, sizeof(ItemValue *) * dwRequiredSize);
644 for(i = m_dwCacheSize; i < dwRequiredSize; i++)
645 m_ppValueCache[i] = NULL;
646
647 // Load missing values from database
648 if (m_pNode != NULL)
649 {
650 DB_ASYNC_RESULT hResult;
651 char szBuffer[MAX_DB_STRING];
652 BOOL bHasData;
653
654 switch(g_dwDBSyntax)
655 {
656 case DB_SYNTAX_MSSQL:
657 sprintf(szBuffer, "SELECT TOP %ld idata_value FROM idata_%ld "
658 "WHERE item_id=%ld ORDER BY idata_timestamp DESC",
659 m_dwCacheSize, m_pNode->Id(), m_dwId);
660 break;
661 case DB_SYNTAX_MYSQL:
662 case DB_SYNTAX_PGSQL:
663 sprintf(szBuffer, "SELECT idata_value FROM idata_%ld "
664 "WHERE item_id=%ld ORDER BY idata_timestamp DESC LIMIT %ld",
665 m_pNode->Id(), m_dwId, m_dwCacheSize);
666 break;
667 default:
668 sprintf(szBuffer, "SELECT idata_value FROM idata_%ld "
669 "WHERE item_id=%ld ORDER BY idata_timestamp DESC",
670 m_pNode->Id(), m_dwId);
671 break;
672 }
673 hResult = DBAsyncSelect(g_hCoreDB, szBuffer);
674 if (hResult != NULL)
675 {
676 // Skip already cached values
677 for(i = 0, bHasData = TRUE; (i < m_dwCacheSize) && bHasData; i++)
678 bHasData = DBFetch(hResult);
679
680 // Create new cache entries
681 for(; (i < dwRequiredSize) && bHasData; i++)
682 {
683 bHasData = DBFetch(hResult);
684 if (bHasData)
685 {
686 m_ppValueCache[i] = new ItemValue(DBGetFieldAsync(hResult, 0, szBuffer, MAX_DB_STRING));
687 }
688 else
689 {
690 m_ppValueCache[i] = new ItemValue(_T("")); // Empty value
691 }
692 }
693
694 // Fill up cache with empty values if we don't have enough values in database
695 for(; i < dwRequiredSize; i++)
696 m_ppValueCache[i] = new ItemValue(_T(""));
697
698 DBFreeAsyncResult(hResult);
699 }
700 else
701 {
702 // Error reading data from database, fill cache with empty values
703 for(i = 0; i < dwRequiredSize; i++)
704 m_ppValueCache[i] = new ItemValue(_T(""));
705 }
706 }
707 m_dwCacheSize = dwRequiredSize;
708 }
709 }
710
711
712 //
713 // Put last value into CSCP message
714 //
715
716 void DCItem::GetLastValue(CSCPMessage *pMsg, DWORD dwId)
717 {
718 pMsg->SetVariable(dwId++, m_dwId);
719 pMsg->SetVariable(dwId++, m_szName);
720 pMsg->SetVariable(dwId++, m_szDescription);
721 if (m_dwCacheSize > 0)
722 {
723 pMsg->SetVariable(dwId++, (WORD)m_iDataType);
724 pMsg->SetVariable(dwId++, (TCHAR *)m_ppValueCache[0]->String());
725 pMsg->SetVariable(dwId++, (DWORD)m_tLastPoll);
726 }
727 else
728 {
729 pMsg->SetVariable(dwId++, (WORD)DCI_DT_NULL);
730 pMsg->SetVariable(dwId++, _T(""));
731 pMsg->SetVariable(dwId++, (DWORD)0);
732 }
733 }
734
735
736 //
737 // Clean expired data
738 //
739
740 void DCItem::CleanData(void)
741 {
742 TCHAR szQuery[256];
743 time_t now;
744
745 now = time(NULL);
746 Lock();
747 _sntprintf(szQuery, 256, _T("DELETE FROM idata_%ld WHERE (item_id=%ld) AND (idata_timestamp<%ld)"),
748 m_pNode->Id(), m_dwId, now - (DWORD)m_iRetentionTime * 86400);
749 Unlock();
750 QueueSQLRequest(szQuery);
751 }
752
753
754 //
755 // Prepare item for deletion
756 //
757
758 void DCItem::PrepareForDeletion(void)
759 {
760 Lock();
761
762 m_iStatus = ITEM_STATUS_DISABLED; // Prevent future polls
763
764 // Wait until current poll ends, if any
765 // while() is not a very good solution, and probably need to be
766 // rewrited using conditions
767 while(m_iBusy)
768 ThreadSleepMs(100);
769
770 Unlock();
771 }