ecfd3d3bdd965883883ef46a9b92154b3a88dea9
[public/netxms.git] / src / server / core / datacoll.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2017 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: datacoll.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
/**
 * Interval between DCI polling passes of the item poller thread, in seconds
 */
28 #define ITEM_POLLING_INTERVAL 1
29
30 /**
31 * Externals
32 */
33 extern Queue g_syslogProcessingQueue;
34 extern Queue g_syslogWriteQueue;
35
36 /**
37 * Global data
38 */
39 double g_dAvgPollerQueueSize = 0;
40 double g_dAvgDBWriterQueueSize = 0;
41 double g_dAvgIDataWriterQueueSize = 0;
42 double g_dAvgRawDataWriterQueueSize = 0;
43 double g_dAvgDBAndIDataWriterQueueSize = 0;
44 double g_dAvgSyslogProcessingQueueSize = 0;
45 double g_dAvgSyslogWriterQueueSize = 0;
46 UINT32 g_dwAvgDCIQueuingTime = 0;
47 Queue g_dataCollectionQueue(4096, 256);
48 Queue g_dciCacheLoaderQueue;
49
50 /**
51 * Collect data for DCI
52 */
53 static void *GetItemData(DataCollectionTarget *dcTarget, DCItem *pItem, TCHAR *pBuffer, UINT32 *error)
54 {
55 if (dcTarget->getObjectClass() == OBJECT_CLUSTER)
56 {
57 if (pItem->isAggregateOnCluster())
58 {
59 *error = ((Cluster *)dcTarget)->collectAggregatedData(pItem, pBuffer);
60 }
61 else
62 {
63 *error = DCE_IGNORE;
64 }
65 }
66 else
67 {
68 switch(pItem->getDataSource())
69 {
70 case DS_INTERNAL: // Server internal parameters (like status)
71 *error = dcTarget->getInternalItem(pItem->getName(), MAX_LINE_SIZE, pBuffer);
72 break;
73 case DS_SNMP_AGENT:
74 if (dcTarget->getObjectClass() == OBJECT_NODE)
75 *error = ((Node *)dcTarget)->getItemFromSNMP(pItem->getSnmpPort(), pItem->getName(), MAX_LINE_SIZE,
76 pBuffer, pItem->isInterpretSnmpRawValue() ? (int)pItem->getSnmpRawValueType() : SNMP_RAWTYPE_NONE);
77 else
78 *error = DCE_NOT_SUPPORTED;
79 break;
80 case DS_CHECKPOINT_AGENT:
81 if (dcTarget->getObjectClass() == OBJECT_NODE)
82 *error = ((Node *)dcTarget)->getItemFromCheckPointSNMP(pItem->getName(), MAX_LINE_SIZE, pBuffer);
83 else
84 *error = DCE_NOT_SUPPORTED;
85 break;
86 case DS_NATIVE_AGENT:
87 if (dcTarget->getObjectClass() == OBJECT_NODE)
88 *error = ((Node *)dcTarget)->getItemFromAgent(pItem->getName(), MAX_LINE_SIZE, pBuffer);
89 else if (dcTarget->getObjectClass() == OBJECT_SENSOR)
90 *error = ((Sensor *)dcTarget)->getItemFromAgent(pItem->getName(), MAX_LINE_SIZE, pBuffer);
91 else
92 *error = DCE_NOT_SUPPORTED;
93 break;
94 case DS_WINPERF:
95 if (dcTarget->getObjectClass() == OBJECT_NODE)
96 {
97 TCHAR name[MAX_PARAM_NAME];
98 _sntprintf(name, MAX_PARAM_NAME, _T("PDH.CounterValue(\"%s\",%d)"), (const TCHAR *)EscapeStringForAgent(pItem->getName()), pItem->getSampleCount());
99 *error = ((Node *)dcTarget)->getItemFromAgent(name, MAX_LINE_SIZE, pBuffer);
100 }
101 else
102 {
103 *error = DCE_NOT_SUPPORTED;
104 }
105 break;
106 case DS_SSH:
107 if (dcTarget->getObjectClass() == OBJECT_NODE)
108 {
109 UINT32 proxyId = ((Node *)dcTarget)->getSshProxy();
110 if (proxyId == 0)
111 {
112 if (IsZoningEnabled())
113 {
114 Zone *zone = FindZoneByUIN(((Node *)dcTarget)->getZoneUIN());
115 if ((zone != NULL) && (zone->getProxyNodeId() != 0))
116 proxyId = zone->getProxyNodeId();
117 else
118 proxyId = g_dwMgmtNode;
119 }
120 else
121 {
122 proxyId = g_dwMgmtNode;
123 }
124 }
125 Node *proxy = (Node *)FindObjectById(proxyId, OBJECT_NODE);
126 if (proxy != NULL)
127 {
128 TCHAR name[MAX_PARAM_NAME], ipAddr[64];
129 _sntprintf(name, MAX_PARAM_NAME, _T("SSH.Command(%s,\"%s\",\"%s\",\"%s\")"),
130 ((Node *)dcTarget)->getIpAddress().toString(ipAddr),
131 (const TCHAR *)EscapeStringForAgent(((Node *)dcTarget)->getSshLogin()),
132 (const TCHAR *)EscapeStringForAgent(((Node *)dcTarget)->getSshPassword()),
133 (const TCHAR *)EscapeStringForAgent(pItem->getName()));
134 *error = proxy->getItemFromAgent(name, MAX_LINE_SIZE, pBuffer);
135 }
136 else
137 {
138 *error = DCE_COMM_ERROR;
139 }
140 }
141 else
142 {
143 *error = DCE_NOT_SUPPORTED;
144 }
145 break;
146 case DS_SMCLP:
147 if (dcTarget->getObjectClass() == OBJECT_NODE)
148 {
149 *error = ((Node *)dcTarget)->getItemFromSMCLP(pItem->getName(), MAX_LINE_SIZE, pBuffer);
150 }
151 else
152 {
153 *error = DCE_NOT_SUPPORTED;
154 }
155 break;
156 case DS_SCRIPT:
157 *error = dcTarget->getScriptItem(pItem->getName(), MAX_LINE_SIZE, pBuffer, (DataCollectionTarget *)pItem->getOwner());
158 break;
159 default:
160 *error = DCE_NOT_SUPPORTED;
161 break;
162 }
163 }
164 return pBuffer;
165 }
166
167 /**
168 * Collect data for table
169 */
170 static void *GetTableData(DataCollectionTarget *dcTarget, DCTable *table, UINT32 *error)
171 {
172 Table *result = NULL;
173 if (dcTarget->getObjectClass() == OBJECT_CLUSTER)
174 {
175 if (table->isAggregateOnCluster())
176 {
177 *error = ((Cluster *)dcTarget)->collectAggregatedData(table, &result);
178 }
179 else
180 {
181 *error = DCE_IGNORE;
182 }
183 }
184 else
185 {
186 switch(table->getDataSource())
187 {
188 case DS_NATIVE_AGENT:
189 if (dcTarget->getObjectClass() == OBJECT_NODE)
190 {
191 *error = ((Node *)dcTarget)->getTableFromAgent(table->getName(), &result);
192 if ((*error == DCE_SUCCESS) && (result != NULL))
193 table->updateResultColumns(result);
194 }
195 else
196 {
197 *error = DCE_NOT_SUPPORTED;
198 }
199 break;
200 case DS_SNMP_AGENT:
201 if (dcTarget->getObjectClass() == OBJECT_NODE)
202 {
203 *error = ((Node *)dcTarget)->getTableFromSNMP(table->getSnmpPort(), table->getName(), table->getColumns(), &result);
204 if ((*error == DCE_SUCCESS) && (result != NULL))
205 table->updateResultColumns(result);
206 }
207 else
208 {
209 *error = DCE_NOT_SUPPORTED;
210 }
211 break;
212 case DS_SCRIPT:
213 *error = dcTarget->getScriptTable(table->getName(), &result, (DataCollectionTarget *)table->getOwner());
214 break;
215 default:
216 *error = DCE_NOT_SUPPORTED;
217 break;
218 }
219 }
220 return result;
221 }
222
223 /**
224 * Data collector
225 */
226 static THREAD_RESULT THREAD_CALL DataCollector(void *pArg)
227 {
228 UINT32 dwError;
229
230 TCHAR *pBuffer = (TCHAR *)malloc(MAX_LINE_SIZE * sizeof(TCHAR));
231 while(!IsShutdownInProgress())
232 {
233 DCObject *pItem = (DCObject *)g_dataCollectionQueue.getOrBlock();
234 DataCollectionTarget *target = (DataCollectionTarget *)pItem->getOwner();
235
236 if (pItem->isScheduledForDeletion())
237 {
238 nxlog_debug(7, _T("DataCollector(): about to destroy DC object %d \"%s\" owner=%d"),
239 pItem->getId(), pItem->getName(), (target != NULL) ? (int)target->getId() : -1);
240 pItem->deleteFromDatabase();
241 delete pItem;
242 continue;
243 }
244
245 if (target == NULL)
246 {
247 nxlog_debug(3, _T("DataCollector: attempt to collect information for non-existing node (DCI=%d \"%s\")"),
248 pItem->getId(), pItem->getName());
249
250 // Update item's last poll time and clear busy flag so item can be polled again
251 pItem->setLastPollTime(time(NULL));
252 pItem->clearBusyFlag();
253 continue;
254 }
255
256 DbgPrintf(8, _T("DataCollector(): processing DC object %d \"%s\" owner=%d sourceNode=%d"),
257 pItem->getId(), pItem->getName(), (target != NULL) ? (int)target->getId() : -1, pItem->getSourceNode());
258 UINT32 sourceNodeId = target->getEffectiveSourceNode(pItem);
259 if (sourceNodeId != 0)
260 {
261 Node *sourceNode = (Node *)FindObjectById(sourceNodeId, OBJECT_NODE);
262 if (sourceNode != NULL)
263 {
264 if (((target->getObjectClass() == OBJECT_CHASSIS) && (((Chassis *)target)->getControllerId() == sourceNodeId)) ||
265 sourceNode->isTrustedNode(target->getId()))
266 {
267 target = sourceNode;
268 target->incRefCount();
269 }
270 else
271 {
272 // Change item's status to "not supported"
273 pItem->setStatus(ITEM_STATUS_NOT_SUPPORTED, true);
274 target->decRefCount();
275 target = NULL;
276 }
277 }
278 else
279 {
280 target->decRefCount();
281 target = NULL;
282 }
283 }
284
285 time_t currTime = time(NULL);
286 if (target != NULL)
287 {
288 if (!IsShutdownInProgress())
289 {
290 void *data;
291 switch(pItem->getType())
292 {
293 case DCO_TYPE_ITEM:
294 data = GetItemData(target, (DCItem *)pItem, pBuffer, &dwError);
295 break;
296 case DCO_TYPE_TABLE:
297 data = GetTableData(target, (DCTable *)pItem, &dwError);
298 break;
299 default:
300 data = NULL;
301 dwError = DCE_NOT_SUPPORTED;
302 break;
303 }
304
305 // Transform and store received value into database or handle error
306 switch(dwError)
307 {
308 case DCE_SUCCESS:
309 if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
310 pItem->setStatus(ITEM_STATUS_ACTIVE, true);
311 if (!((DataCollectionTarget *)pItem->getOwner())->processNewDCValue(pItem, currTime, data))
312 {
313 // value processing failed, convert to data collection error
314 pItem->processNewError(false);
315 }
316 break;
317 case DCE_COLLECTION_ERROR:
318 if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
319 pItem->setStatus(ITEM_STATUS_ACTIVE, true);
320 pItem->processNewError(false);
321 break;
322 case DCE_NO_SUCH_INSTANCE:
323 if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
324 pItem->setStatus(ITEM_STATUS_ACTIVE, true);
325 pItem->processNewError(true);
326 break;
327 case DCE_COMM_ERROR:
328 pItem->processNewError(false);
329 break;
330 case DCE_NOT_SUPPORTED:
331 // Change item's status
332 pItem->setStatus(ITEM_STATUS_NOT_SUPPORTED, true);
333 break;
334 }
335 }
336
337 // Send session notification when force poll is performed
338 if (pItem->getPollingSession() != NULL)
339 {
340 ClientSession *session = pItem->processForcePoll();
341 session->notify(NX_NOTIFY_FORCE_DCI_POLL, pItem->getOwnerId());
342 session->decRefCount();
343 }
344
345 // Decrement node's usage counter
346 target->decRefCount();
347 if ((pItem->getSourceNode() != 0) && (pItem->getOwner() != NULL))
348 {
349 pItem->getOwner()->decRefCount();
350 }
351 }
352 else /* target == NULL */
353 {
354 Template *n = pItem->getOwner();
355 nxlog_debug(5, _T("DataCollector: attempt to collect information for non-existing or inaccessible node (DCI=%d \"%s\" target=%d sourceNode=%d)"),
356 pItem->getId(), pItem->getName(), (n != NULL) ? (int)n->getId() : -1, sourceNodeId);
357 }
358
359 // Update item's last poll time and clear busy flag so item can be polled again
360 pItem->setLastPollTime(currTime);
361 pItem->clearBusyFlag();
362 }
363
364 free(pBuffer);
365 DbgPrintf(1, _T("Data collector thread terminated"));
366 return THREAD_OK;
367 }
368
369 /**
370 * Callback for queueing DCIs
371 */
372 static void QueueItems(NetObj *object, void *data)
373 {
374 if (IsShutdownInProgress())
375 return;
376
377 WatchdogNotify(*((UINT32 *)data));
378 nxlog_debug(8, _T("ItemPoller: calling DataCollectionTarget::queueItemsForPolling for object %s [%d]"),
379 object->getName(), object->getId());
380 ((DataCollectionTarget *)object)->queueItemsForPolling(&g_dataCollectionQueue);
381 }
382
383 /**
384 * Item poller thread: check nodes' items and put into the
385 * data collector queue when data polling required
386 */
387 static THREAD_RESULT THREAD_CALL ItemPoller(void *pArg)
388 {
389 UINT32 dwSum, currPos = 0;
390 UINT32 dwTimingHistory[60 / ITEM_POLLING_INTERVAL];
391 INT64 qwStart;
392
393 UINT32 watchdogId = WatchdogAddThread(_T("Item Poller"), 10);
394 memset(dwTimingHistory, 0, sizeof(UINT32) * (60 / ITEM_POLLING_INTERVAL));
395
396 while(!IsShutdownInProgress())
397 {
398 if (SleepAndCheckForShutdown(ITEM_POLLING_INTERVAL))
399 break; // Shutdown has arrived
400 WatchdogNotify(watchdogId);
401 DbgPrintf(8, _T("ItemPoller: wakeup"));
402
403 qwStart = GetCurrentTimeMs();
404 g_idxNodeById.forEach(QueueItems, &watchdogId);
405 g_idxClusterById.forEach(QueueItems, &watchdogId);
406 g_idxMobileDeviceById.forEach(QueueItems, &watchdogId);
407 g_idxChassisById.forEach(QueueItems, &watchdogId);
408 g_idxSensorById.forEach(QueueItems, &watchdogId);
409
410 // Save last poll time
411 dwTimingHistory[currPos] = (UINT32)(GetCurrentTimeMs() - qwStart);
412 currPos++;
413 if (currPos == (60 / ITEM_POLLING_INTERVAL))
414 currPos = 0;
415
416 // Calculate new average for last minute
417 dwSum = 0;
418 for(int i = 0; i < (60 / ITEM_POLLING_INTERVAL); i++)
419 dwSum += dwTimingHistory[i];
420 g_dwAvgDCIQueuingTime = dwSum / (60 / ITEM_POLLING_INTERVAL);
421 }
422 DbgPrintf(1, _T("Item poller thread terminated"));
423 return THREAD_OK;
424 }
425
426 /**
427 * Statistics collection thread
428 */
429 static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
430 {
431 UINT32 i, currPos = 0;
432 UINT32 pollerQS[12], dbWriterQS[12];
433 UINT32 iDataWriterQS[12], rawDataWriterQS[12], dbAndIDataWriterQS[12];
434 UINT32 syslogProcessingQS[12], syslogWriterQS[12];
435 double sum1, sum2, sum3, sum4, sum5, sum8, sum9;
436
437 memset(pollerQS, 0, sizeof(UINT32) * 12);
438 memset(dbWriterQS, 0, sizeof(UINT32) * 12);
439 memset(iDataWriterQS, 0, sizeof(UINT32) * 12);
440 memset(rawDataWriterQS, 0, sizeof(UINT32) * 12);
441 memset(dbAndIDataWriterQS, 0, sizeof(UINT32) * 12);
442 memset(syslogProcessingQS, 0, sizeof(UINT32) * 12);
443 memset(syslogWriterQS, 0, sizeof(UINT32) * 12);
444 g_dAvgPollerQueueSize = 0;
445 g_dAvgDBWriterQueueSize = 0;
446 g_dAvgIDataWriterQueueSize = 0;
447 g_dAvgRawDataWriterQueueSize = 0;
448 g_dAvgDBAndIDataWriterQueueSize = 0;
449 g_dAvgSyslogProcessingQueueSize = 0;
450 g_dAvgSyslogWriterQueueSize = 0;
451 while(!IsShutdownInProgress())
452 {
453 if (SleepAndCheckForShutdown(5))
454 break; // Shutdown has arrived
455
456 // Get current values
457 pollerQS[currPos] = g_dataCollectionQueue.size();
458 dbWriterQS[currPos] = g_dbWriterQueue->size();
459 iDataWriterQS[currPos] = g_dciDataWriterQueue->size();
460 rawDataWriterQS[currPos] = g_dciRawDataWriterQueue->size();
461 dbAndIDataWriterQS[currPos] = g_dbWriterQueue->size() + g_dciDataWriterQueue->size() + g_dciRawDataWriterQueue->size();
462 syslogProcessingQS[currPos] = g_syslogProcessingQueue.size();
463 syslogWriterQS[currPos] = g_syslogWriteQueue.size();
464 currPos++;
465 if (currPos == 12)
466 currPos = 0;
467
468 // Calculate new averages
469 for(i = 0, sum1 = 0, sum2 = 0, sum3 = 0, sum4 = 0, sum5 = 0, sum8 = 0, sum9 = 0; i < 12; i++)
470 {
471 sum1 += pollerQS[i];
472 sum2 += dbWriterQS[i];
473 sum3 += iDataWriterQS[i];
474 sum4 += rawDataWriterQS[i];
475 sum5 += dbAndIDataWriterQS[i];
476 sum8 += syslogProcessingQS[i];
477 sum9 += syslogWriterQS[i];
478 }
479 g_dAvgPollerQueueSize = sum1 / 12;
480 g_dAvgDBWriterQueueSize = sum2 / 12;
481 g_dAvgIDataWriterQueueSize = sum3 / 12;
482 g_dAvgRawDataWriterQueueSize = sum4 / 12;
483 g_dAvgDBAndIDataWriterQueueSize = sum5 / 12;
484 g_dAvgSyslogProcessingQueueSize = sum8 / 12;
485 g_dAvgSyslogWriterQueueSize = sum9 / 12;
486 }
487 return THREAD_OK;
488 }
489
490 /**
491 * DCI cache loader
492 */
493 THREAD_RESULT THREAD_CALL CacheLoader(void *arg)
494 {
495 DbgPrintf(2, _T("DCI cache loader thread started"));
496 while(true)
497 {
498 DCObjectInfo *ref = (DCObjectInfo *)g_dciCacheLoaderQueue.getOrBlock();
499 if (ref == INVALID_POINTER_VALUE)
500 break;
501
502 NetObj *object = FindObjectById(ref->getOwnerId());
503 if ((object != NULL) && object->isDataCollectionTarget())
504 {
505 object->incRefCount();
506 DCObject *dci = static_cast<DataCollectionTarget*>(object)->getDCObjectById(ref->getId(), true);
507 if ((dci != NULL) && (dci->getType() == DCO_TYPE_ITEM))
508 {
509 DbgPrintf(6, _T("Loading cache for DCI %s [%d] on %s [%d]"),
510 ref->getName(), ref->getId(), object->getName(), object->getId());
511 static_cast<DCItem*>(dci)->reloadCache();
512 }
513 object->decRefCount();
514 }
515 delete ref;
516 }
517 DbgPrintf(2, _T("DCI cache loader thread stopped"));
518 return THREAD_OK;
519 }
520
521 /**
522 * Initialize data collection subsystem
523 */
524 BOOL InitDataCollector()
525 {
526 int i, iNumCollectors;
527
528 // Start data collection threads
529 iNumCollectors = ConfigReadInt(_T("NumberOfDataCollectors"), 10);
530 for(i = 0; i < iNumCollectors; i++)
531 ThreadCreate(DataCollector, 0, NULL);
532
533 ThreadCreate(ItemPoller, 0, NULL);
534 ThreadCreate(StatCollector, 0, NULL);
535 ThreadCreate(CacheLoader, 0, NULL);
536
537 return TRUE;
538 }
539
540 /**
541 * Update parameter list from node
542 */
543 static void UpdateParamList(NetObj *object, void *data)
544 {
545 ObjectArray<AgentParameterDefinition> *fullList = (ObjectArray<AgentParameterDefinition> *)data;
546
547 ObjectArray<AgentParameterDefinition> *paramList;
548 ((Node *)object)->openParamList(&paramList);
549 if ((paramList != NULL) && (paramList->size() > 0))
550 {
551 for(int i = 0; i < paramList->size(); i++)
552 {
553 int j;
554 for(j = 0; j < fullList->size(); j++)
555 {
556 if (!_tcsicmp(paramList->get(i)->getName(), fullList->get(j)->getName()))
557 break;
558 }
559
560 if (j == fullList->size())
561 {
562 fullList->add(new AgentParameterDefinition(paramList->get(i)));
563 }
564 }
565 }
566 ((Node *)object)->closeParamList();
567 }
568
569 /**
570 * Update table list from node
571 */
572 static void UpdateTableList(NetObj *object, void *data)
573 {
574 ObjectArray<AgentTableDefinition> *fullList = (ObjectArray<AgentTableDefinition> *)data;
575
576 ObjectArray<AgentTableDefinition> *tableList;
577 ((Node *)object)->openTableList(&tableList);
578 if ((tableList != NULL) && (tableList->size() > 0))
579 {
580 for(int i = 0; i < tableList->size(); i++)
581 {
582 int j;
583 for(j = 0; j < fullList->size(); j++)
584 {
585 if (!_tcsicmp(tableList->get(i)->getName(), fullList->get(j)->getName()))
586 break;
587 }
588
589 if (j == fullList->size())
590 {
591 fullList->add(new AgentTableDefinition(tableList->get(i)));
592 }
593 }
594 }
595 ((Node *)object)->closeTableList();
596 }
597
598 /**
599 * Write full (from all nodes) agent parameters list to NXCP message
600 */
601 void WriteFullParamListToMessage(NXCPMessage *pMsg, WORD flags)
602 {
603 // Gather full parameter list
604 if (flags & 0x01)
605 {
606 ObjectArray<AgentParameterDefinition> fullList(64, 64, true);
607 g_idxNodeById.forEach(UpdateParamList, &fullList);
608
609 // Put list into the message
610 pMsg->setField(VID_NUM_PARAMETERS, (UINT32)fullList.size());
611 UINT32 varId = VID_PARAM_LIST_BASE;
612 for(int i = 0; i < fullList.size(); i++)
613 {
614 varId += fullList.get(i)->fillMessage(pMsg, varId);
615 }
616 }
617
618 // Gather full table list
619 if (flags & 0x02)
620 {
621 ObjectArray<AgentTableDefinition> fullList(64, 64, true);
622 g_idxNodeById.forEach(UpdateTableList, &fullList);
623
624 // Put list into the message
625 pMsg->setField(VID_NUM_TABLES, (UINT32)fullList.size());
626 UINT32 varId = VID_TABLE_LIST_BASE;
627 for(int i = 0; i < fullList.size(); i++)
628 {
629 varId += fullList.get(i)->fillMessage(pMsg, varId);
630 }
631 }
632 }
633
634 /**
635 * Get type of data collection object
636 */
637 int GetDCObjectType(UINT32 nodeId, UINT32 dciId)
638 {
639 Node *node = (Node *)FindObjectById(nodeId, OBJECT_NODE);
640 if (node != NULL)
641 {
642 DCObject *dco = node->getDCObjectById(dciId);
643 if (dco != NULL)
644 {
645 return dco->getType();
646 }
647 }
648 return DCO_TYPE_ITEM; // default
649 }