removed unused local variables (data collectors refactoring)
[public/netxms.git] / src / server / core / datacoll.cpp
CommitLineData
6da535d9 1/*
5039dede 2** NetXMS - Network Management System
2885ad2e 3** Copyright (C) 2003-2017 Victor Kirhenshtein
5039dede
AK
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: datacoll.cpp
20**
21**/
22
23#include "nxcore.h"
24
46ee6286
VK
25/**
26 * Interval between DCI polling
27 */
6da535d9 28#define ITEM_POLLING_INTERVAL 1
5039dede 29
46ee6286
VK
30/**
31 * Externals
32 */
f1784ab6
VK
33extern Queue g_syslogProcessingQueue;
34extern Queue g_syslogWriteQueue;
b239e165
VK
35extern ThreadPool *g_pollerThreadPool;
36
37/**
38 * Thread pool for data collectors
39 */
40ThreadPool *g_dataCollectorThreadPool = NULL;
5039dede 41
6fd6de0a
VK
42/**
43 * Global data
44 */
b239e165 45double g_dAvgDataCollectorQueueSize = 0;
5039dede
AK
46double g_dAvgPollerQueueSize = 0;
47double g_dAvgDBWriterQueueSize = 0;
cf084617 48double g_dAvgIDataWriterQueueSize = 0;
76fcb995 49double g_dAvgRawDataWriterQueueSize = 0;
cf084617 50double g_dAvgDBAndIDataWriterQueueSize = 0;
f1784ab6
VK
51double g_dAvgSyslogProcessingQueueSize = 0;
52double g_dAvgSyslogWriterQueueSize = 0;
967893bb 53UINT32 g_dwAvgDCIQueuingTime = 0;
d140955e 54Queue g_dciCacheLoaderQueue;
5039dede 55
171c2fd6
VK
56/**
57 * Collect data for DCI
58 */
967893bb 59static void *GetItemData(DataCollectionTarget *dcTarget, DCItem *pItem, TCHAR *pBuffer, UINT32 *error)
df94e0ce 60{
c42b4551 61 if (dcTarget->getObjectClass() == OBJECT_CLUSTER)
df94e0ce 62 {
85ae39bc
VK
63 if (pItem->isAggregateOnCluster())
64 {
65 *error = ((Cluster *)dcTarget)->collectAggregatedData(pItem, pBuffer);
66 }
67 else
68 {
3cd6f508 69 *error = DCE_IGNORE;
85ae39bc
VK
70 }
71 }
72 else
73 {
74 switch(pItem->getDataSource())
75 {
76 case DS_INTERNAL: // Server internal parameters (like status)
77 *error = dcTarget->getInternalItem(pItem->getName(), MAX_LINE_SIZE, pBuffer);
78 break;
79 case DS_SNMP_AGENT:
c42b4551 80 if (dcTarget->getObjectClass() == OBJECT_NODE)
6da535d9 81 *error = ((Node *)dcTarget)->getItemFromSNMP(pItem->getSnmpPort(), pItem->getName(), MAX_LINE_SIZE,
85ae39bc
VK
82 pBuffer, pItem->isInterpretSnmpRawValue() ? (int)pItem->getSnmpRawValueType() : SNMP_RAWTYPE_NONE);
83 else
84 *error = DCE_NOT_SUPPORTED;
85 break;
86 case DS_CHECKPOINT_AGENT:
c42b4551 87 if (dcTarget->getObjectClass() == OBJECT_NODE)
85ae39bc
VK
88 *error = ((Node *)dcTarget)->getItemFromCheckPointSNMP(pItem->getName(), MAX_LINE_SIZE, pBuffer);
89 else
90 *error = DCE_NOT_SUPPORTED;
91 break;
92 case DS_NATIVE_AGENT:
c42b4551 93 if (dcTarget->getObjectClass() == OBJECT_NODE)
85ae39bc 94 *error = ((Node *)dcTarget)->getItemFromAgent(pItem->getName(), MAX_LINE_SIZE, pBuffer);
ce9e00cc
VK
95 else if (dcTarget->getObjectClass() == OBJECT_SENSOR)
96 *error = ((Sensor *)dcTarget)->getItemFromAgent(pItem->getName(), MAX_LINE_SIZE, pBuffer);
85ae39bc
VK
97 else
98 *error = DCE_NOT_SUPPORTED;
99 break;
100 case DS_WINPERF:
c42b4551 101 if (dcTarget->getObjectClass() == OBJECT_NODE)
85ae39bc
VK
102 {
103 TCHAR name[MAX_PARAM_NAME];
1d919454 104 _sntprintf(name, MAX_PARAM_NAME, _T("PDH.CounterValue(\"%s\",%d)"), (const TCHAR *)EscapeStringForAgent(pItem->getName()), pItem->getSampleCount());
85ae39bc
VK
105 *error = ((Node *)dcTarget)->getItemFromAgent(name, MAX_LINE_SIZE, pBuffer);
106 }
107 else
108 {
109 *error = DCE_NOT_SUPPORTED;
110 }
111 break;
241541f4
VK
112 case DS_SSH:
113 if (dcTarget->getObjectClass() == OBJECT_NODE)
114 {
115 UINT32 proxyId = ((Node *)dcTarget)->getSshProxy();
116 if (proxyId == 0)
117 {
118 if (IsZoningEnabled())
119 {
a191c634 120 Zone *zone = FindZoneByUIN(((Node *)dcTarget)->getZoneUIN());
43b62436
VK
121 if ((zone != NULL) && (zone->getProxyNodeId() != 0))
122 proxyId = zone->getProxyNodeId();
241541f4
VK
123 else
124 proxyId = g_dwMgmtNode;
125 }
126 else
127 {
128 proxyId = g_dwMgmtNode;
129 }
130 }
131 Node *proxy = (Node *)FindObjectById(proxyId, OBJECT_NODE);
132 if (proxy != NULL)
133 {
134 TCHAR name[MAX_PARAM_NAME], ipAddr[64];
135 _sntprintf(name, MAX_PARAM_NAME, _T("SSH.Command(%s,\"%s\",\"%s\",\"%s\")"),
136 ((Node *)dcTarget)->getIpAddress().toString(ipAddr),
137 (const TCHAR *)EscapeStringForAgent(((Node *)dcTarget)->getSshLogin()),
138 (const TCHAR *)EscapeStringForAgent(((Node *)dcTarget)->getSshPassword()),
139 (const TCHAR *)EscapeStringForAgent(pItem->getName()));
140 *error = proxy->getItemFromAgent(name, MAX_LINE_SIZE, pBuffer);
141 }
142 else
143 {
144 *error = DCE_COMM_ERROR;
145 }
146 }
147 else
148 {
149 *error = DCE_NOT_SUPPORTED;
150 }
151 break;
1d0d82b3 152 case DS_SMCLP:
c42b4551 153 if (dcTarget->getObjectClass() == OBJECT_NODE)
03bc96df 154 {
1d0d82b3 155 *error = ((Node *)dcTarget)->getItemFromSMCLP(pItem->getName(), MAX_LINE_SIZE, pBuffer);
03bc96df 156 }
85ae39bc
VK
157 else
158 {
159 *error = DCE_NOT_SUPPORTED;
160 }
161 break;
17b1ab4a 162 case DS_SCRIPT:
7a69b18b 163 *error = dcTarget->getScriptItem(pItem->getName(), MAX_LINE_SIZE, pBuffer, (DataCollectionTarget *)pItem->getOwner());
17b1ab4a 164 break;
85ae39bc
VK
165 default:
166 *error = DCE_NOT_SUPPORTED;
167 break;
168 }
df94e0ce
VK
169 }
170 return pBuffer;
171}
172
171c2fd6
VK
173/**
174 * Collect data for table
175 */
967893bb 176static void *GetTableData(DataCollectionTarget *dcTarget, DCTable *table, UINT32 *error)
df94e0ce
VK
177{
178 Table *result = NULL;
c42b4551 179 if (dcTarget->getObjectClass() == OBJECT_CLUSTER)
a0ddfb29
VK
180 {
181 if (table->isAggregateOnCluster())
182 {
183 *error = ((Cluster *)dcTarget)->collectAggregatedData(table, &result);
184 }
185 else
186 {
3cd6f508 187 *error = DCE_IGNORE;
a0ddfb29
VK
188 }
189 }
190 else
df94e0ce 191 {
a0ddfb29
VK
192 switch(table->getDataSource())
193 {
194 case DS_NATIVE_AGENT:
c42b4551 195 if (dcTarget->getObjectClass() == OBJECT_NODE)
a0ddfb29
VK
196 {
197 *error = ((Node *)dcTarget)->getTableFromAgent(table->getName(), &result);
198 if ((*error == DCE_SUCCESS) && (result != NULL))
199 table->updateResultColumns(result);
200 }
201 else
202 {
203 *error = DCE_NOT_SUPPORTED;
204 }
205 break;
db117859 206 case DS_SNMP_AGENT:
c42b4551 207 if (dcTarget->getObjectClass() == OBJECT_NODE)
db117859
VK
208 {
209 *error = ((Node *)dcTarget)->getTableFromSNMP(table->getSnmpPort(), table->getName(), table->getColumns(), &result);
210 if ((*error == DCE_SUCCESS) && (result != NULL))
211 table->updateResultColumns(result);
212 }
213 else
214 {
215 *error = DCE_NOT_SUPPORTED;
216 }
217 break;
3f61dbd4 218 case DS_SCRIPT:
7a69b18b 219 *error = dcTarget->getScriptTable(table->getName(), &result, (DataCollectionTarget *)table->getOwner());
3f61dbd4 220 break;
a0ddfb29
VK
221 default:
222 *error = DCE_NOT_SUPPORTED;
223 break;
224 }
225 }
df94e0ce
VK
226 return result;
227}
228
6fd6de0a
VK
229/**
230 * Data collector
231 */
b239e165 232void DataCollector(void *arg)
5039dede 233{
b239e165
VK
234 DCObject *pItem = static_cast<DCObject*>(arg);
235 DataCollectionTarget *target = static_cast<DataCollectionTarget*>(pItem->getOwner());
5039dede 236
b239e165 237 if (pItem->isScheduledForDeletion())
5039dede 238 {
b239e165
VK
239 nxlog_debug(7, _T("DataCollector(): about to destroy DC object %d \"%s\" owner=%d"),
240 pItem->getId(), pItem->getName(), (target != NULL) ? (int)target->getId() : -1);
241 pItem->deleteFromDatabase();
242 delete pItem;
243 target->decRefCount();
244 return;
245 }
ced80327 246
b239e165
VK
247 if (target == NULL)
248 {
249 nxlog_debug(3, _T("DataCollector: attempt to collect information for non-existing node (DCI=%d \"%s\")"),
250 pItem->getId(), pItem->getName());
46e2b370 251
b239e165
VK
252 // Update item's last poll time and clear busy flag so item can be polled again
253 pItem->setLastPollTime(time(NULL));
254 pItem->clearBusyFlag();
255 return;
256 }
46e2b370 257
b239e165
VK
258 DbgPrintf(8, _T("DataCollector(): processing DC object %d \"%s\" owner=%d sourceNode=%d"),
259 pItem->getId(), pItem->getName(), (target != NULL) ? (int)target->getId() : -1, pItem->getSourceNode());
260 UINT32 sourceNodeId = target->getEffectiveSourceNode(pItem);
261 if (sourceNodeId != 0)
262 {
263 Node *sourceNode = (Node *)FindObjectById(sourceNodeId, OBJECT_NODE);
264 if (sourceNode != NULL)
265 {
266 if (((target->getObjectClass() == OBJECT_CHASSIS) && (((Chassis *)target)->getControllerId() == sourceNodeId)) ||
267 sourceNode->isTrustedNode(target->getId()))
268 {
269 target = sourceNode;
270 target->incRefCount();
271 }
272 else
273 {
274 // Change item's status to "not supported"
275 pItem->setStatus(ITEM_STATUS_NOT_SUPPORTED, true);
46e2b370
VK
276 target->decRefCount();
277 target = NULL;
b239e165
VK
278 }
279 }
280 else
281 {
282 target->decRefCount();
283 target = NULL;
284 }
285 }
5039dede 286
b239e165
VK
287 time_t currTime = time(NULL);
288 if (target != NULL)
289 {
290 if (!IsShutdownInProgress())
5039dede 291 {
b239e165
VK
292 void *data;
293 TCHAR buffer[MAX_LINE_SIZE];
294 UINT32 error;
295 switch(pItem->getType())
5039dede 296 {
b239e165
VK
297 case DCO_TYPE_ITEM:
298 data = GetItemData(target, (DCItem *)pItem, buffer, &error);
299 break;
300 case DCO_TYPE_TABLE:
301 data = GetTableData(target, (DCTable *)pItem, &error);
302 break;
303 default:
304 data = NULL;
305 error = DCE_NOT_SUPPORTED;
306 break;
307 }
88dc9091 308
b239e165
VK
309 // Transform and store received value into database or handle error
310 switch(error)
311 {
312 case DCE_SUCCESS:
313 if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
314 pItem->setStatus(ITEM_STATUS_ACTIVE, true);
315 if (!((DataCollectionTarget *)pItem->getOwner())->processNewDCValue(pItem, currTime, data))
316 {
317 // value processing failed, convert to data collection error
a8164c20 318 pItem->processNewError(false);
b239e165
VK
319 }
320 break;
321 case DCE_COLLECTION_ERROR:
322 if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
323 pItem->setStatus(ITEM_STATUS_ACTIVE, true);
324 pItem->processNewError(false);
325 break;
326 case DCE_NO_SUCH_INSTANCE:
327 if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
328 pItem->setStatus(ITEM_STATUS_ACTIVE, true);
329 pItem->processNewError(true);
330 break;
331 case DCE_COMM_ERROR:
332 pItem->processNewError(false);
333 break;
334 case DCE_NOT_SUPPORTED:
335 // Change item's status
336 pItem->setStatus(ITEM_STATUS_NOT_SUPPORTED, true);
337 break;
5039dede
AK
338 }
339
6da535d9
EJ
340 // Send session notification when force poll is performed
341 if (pItem->getPollingSession() != NULL)
342 {
343 ClientSession *session = pItem->processForcePoll();
690f7fa5 344 session->notify(NX_NOTIFY_FORCE_DCI_POLL, pItem->getOwnerId());
6da535d9
EJ
345 session->decRefCount();
346 }
5039dede 347 }
b239e165
VK
348
349 // Decrement node's usage counter
350 target->decRefCount();
351 if ((pItem->getSourceNode() != 0) && (pItem->getOwner() != NULL))
5039dede 352 {
b239e165 353 pItem->getOwner()->decRefCount();
5039dede 354 }
b239e165
VK
355 }
356 else /* target == NULL */
357 {
358 Template *n = pItem->getOwner();
359 nxlog_debug(5, _T("DataCollector: attempt to collect information for non-existing or inaccessible node (DCI=%d \"%s\" target=%d sourceNode=%d)"),
360 pItem->getId(), pItem->getName(), (n != NULL) ? (int)n->getId() : -1, sourceNodeId);
5039dede
AK
361 }
362
b239e165
VK
363 // Update item's last poll time and clear busy flag so item can be polled again
364 pItem->setLastPollTime(currTime);
365 pItem->clearBusyFlag();
5039dede
AK
366}
367
6fd6de0a
VK
368/**
369 * Callback for queueing DCIs
370 */
6aba3998
VK
371static void QueueItems(NetObj *object, void *data)
372{
88dc9091
VK
373 if (IsShutdownInProgress())
374 return;
375
fa179b75 376 WatchdogNotify(*((UINT32 *)data));
2885ad2e
VK
377 nxlog_debug(8, _T("ItemPoller: calling DataCollectionTarget::queueItemsForPolling for object %s [%d]"),
378 object->getName(), object->getId());
b239e165 379 ((DataCollectionTarget *)object)->queueItemsForPolling();
6aba3998
VK
380}
381
6fd6de0a 382/**
6da535d9 383 * Item poller thread: check nodes' items and put into the
6fd6de0a
VK
384 * data collector queue when data polling required
385 */
5039dede
AK
386static THREAD_RESULT THREAD_CALL ItemPoller(void *pArg)
387{
930a2a62
VK
388 ThreadSetName("ItemPoller");
389
3c15eedb 390 UINT32 dwSum, currPos = 0;
967893bb 391 UINT32 dwTimingHistory[60 / ITEM_POLLING_INTERVAL];
5039dede
AK
392 INT64 qwStart;
393
fa179b75 394 UINT32 watchdogId = WatchdogAddThread(_T("Item Poller"), 10);
967893bb 395 memset(dwTimingHistory, 0, sizeof(UINT32) * (60 / ITEM_POLLING_INTERVAL));
5039dede 396
89135050 397 while(!IsShutdownInProgress())
5039dede
AK
398 {
399 if (SleepAndCheckForShutdown(ITEM_POLLING_INTERVAL))
400 break; // Shutdown has arrived
3c15eedb 401 WatchdogNotify(watchdogId);
5039dede
AK
402 DbgPrintf(8, _T("ItemPoller: wakeup"));
403
5039dede 404 qwStart = GetCurrentTimeMs();
fa179b75
VK
405 g_idxNodeById.forEach(QueueItems, &watchdogId);
406 g_idxClusterById.forEach(QueueItems, &watchdogId);
407 g_idxMobileDeviceById.forEach(QueueItems, &watchdogId);
408 g_idxChassisById.forEach(QueueItems, &watchdogId);
ce9e00cc 409 g_idxSensorById.forEach(QueueItems, &watchdogId);
5039dede
AK
410
411 // Save last poll time
f1784ab6
VK
412 dwTimingHistory[currPos] = (UINT32)(GetCurrentTimeMs() - qwStart);
413 currPos++;
414 if (currPos == (60 / ITEM_POLLING_INTERVAL))
415 currPos = 0;
5039dede
AK
416
417 // Calculate new average for last minute
6aba3998
VK
418 dwSum = 0;
419 for(int i = 0; i < (60 / ITEM_POLLING_INTERVAL); i++)
5039dede
AK
420 dwSum += dwTimingHistory[i];
421 g_dwAvgDCIQueuingTime = dwSum / (60 / ITEM_POLLING_INTERVAL);
422 }
35f836fe 423 DbgPrintf(1, _T("Item poller thread terminated"));
5039dede
AK
424 return THREAD_OK;
425}
426
6fd6de0a
VK
427/**
428 * Statistics collection thread
429 */
5039dede
AK
430static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
431{
930a2a62
VK
432 ThreadSetName("StatCollector");
433
f1784ab6 434 UINT32 i, currPos = 0;
b239e165 435 UINT32 pollerQS[12], dataCollectorQS[12], dbWriterQS[12];
76fcb995 436 UINT32 iDataWriterQS[12], rawDataWriterQS[12], dbAndIDataWriterQS[12];
f1784ab6 437 UINT32 syslogProcessingQS[12], syslogWriterQS[12];
b239e165 438 double sum1, sum2, sum3, sum4, sum5, sum8, sum9, sum10;
5039dede 439
f1784ab6 440 memset(pollerQS, 0, sizeof(UINT32) * 12);
b239e165 441 memset(dataCollectorQS, 0, sizeof(UINT32) * 12);
f1784ab6
VK
442 memset(dbWriterQS, 0, sizeof(UINT32) * 12);
443 memset(iDataWriterQS, 0, sizeof(UINT32) * 12);
76fcb995 444 memset(rawDataWriterQS, 0, sizeof(UINT32) * 12);
f1784ab6 445 memset(dbAndIDataWriterQS, 0, sizeof(UINT32) * 12);
f1784ab6
VK
446 memset(syslogProcessingQS, 0, sizeof(UINT32) * 12);
447 memset(syslogWriterQS, 0, sizeof(UINT32) * 12);
b239e165 448 g_dAvgDataCollectorQueueSize = 0;
5039dede 449 g_dAvgDBWriterQueueSize = 0;
cf084617 450 g_dAvgIDataWriterQueueSize = 0;
76fcb995 451 g_dAvgRawDataWriterQueueSize = 0;
cf084617 452 g_dAvgDBAndIDataWriterQueueSize = 0;
f1784ab6
VK
453 g_dAvgSyslogProcessingQueueSize = 0;
454 g_dAvgSyslogWriterQueueSize = 0;
b239e165 455 g_dAvgPollerQueueSize = 0;
7b9f85db 456 while(!SleepAndCheckForShutdown(5))
5039dede 457 {
7b9f85db
VK
458 if (!(g_flags & AF_SERVER_INITIALIZED))
459 continue;
5039dede
AK
460
461 // Get current values
b239e165
VK
462 ThreadPoolInfo poolInfo;
463 ThreadPoolGetInfo(g_dataCollectorThreadPool, &poolInfo);
464 dataCollectorQS[currPos] = (poolInfo.activeRequests > poolInfo.curThreads) ? poolInfo.activeRequests - poolInfo.curThreads : 0;
465
466 ThreadPoolGetInfo(g_pollerThreadPool, &poolInfo);
467 pollerQS[currPos] = (poolInfo.activeRequests > poolInfo.curThreads) ? poolInfo.activeRequests - poolInfo.curThreads : 0;
468
19dbc8ef
VK
469 dbWriterQS[currPos] = g_dbWriterQueue->size();
470 iDataWriterQS[currPos] = g_dciDataWriterQueue->size();
471 rawDataWriterQS[currPos] = g_dciRawDataWriterQueue->size();
472 dbAndIDataWriterQS[currPos] = g_dbWriterQueue->size() + g_dciDataWriterQueue->size() + g_dciRawDataWriterQueue->size();
19dbc8ef
VK
473 syslogProcessingQS[currPos] = g_syslogProcessingQueue.size();
474 syslogWriterQS[currPos] = g_syslogWriteQueue.size();
f1784ab6
VK
475 currPos++;
476 if (currPos == 12)
477 currPos = 0;
5039dede
AK
478
479 // Calculate new averages
b239e165 480 for(i = 0, sum1 = 0, sum2 = 0, sum3 = 0, sum4 = 0, sum5 = 0, sum8 = 0, sum9 = 0, sum10 = 0; i < 12; i++)
5039dede 481 {
b239e165 482 sum1 += dataCollectorQS[i];
f1784ab6
VK
483 sum2 += dbWriterQS[i];
484 sum3 += iDataWriterQS[i];
76fcb995
VK
485 sum4 += rawDataWriterQS[i];
486 sum5 += dbAndIDataWriterQS[i];
76fcb995
VK
487 sum8 += syslogProcessingQS[i];
488 sum9 += syslogWriterQS[i];
b239e165 489 sum10 += pollerQS[i];
5039dede 490 }
b239e165 491 g_dAvgDataCollectorQueueSize = sum1 / 12;
f1784ab6
VK
492 g_dAvgDBWriterQueueSize = sum2 / 12;
493 g_dAvgIDataWriterQueueSize = sum3 / 12;
76fcb995
VK
494 g_dAvgRawDataWriterQueueSize = sum4 / 12;
495 g_dAvgDBAndIDataWriterQueueSize = sum5 / 12;
76fcb995
VK
496 g_dAvgSyslogProcessingQueueSize = sum8 / 12;
497 g_dAvgSyslogWriterQueueSize = sum9 / 12;
b239e165 498 g_dAvgPollerQueueSize = sum10 / 12;
5039dede
AK
499 }
500 return THREAD_OK;
501}
502
6fd6de0a 503/**
d140955e
VK
504 * DCI cache loader
505 */
506THREAD_RESULT THREAD_CALL CacheLoader(void *arg)
507{
930a2a62 508 ThreadSetName("CacheLoader");
d140955e
VK
509 DbgPrintf(2, _T("DCI cache loader thread started"));
510 while(true)
511 {
5eecfc25
VK
512 DCObjectInfo *ref = (DCObjectInfo *)g_dciCacheLoaderQueue.getOrBlock();
513 if (ref == INVALID_POINTER_VALUE)
d140955e
VK
514 break;
515
5eecfc25
VK
516 NetObj *object = FindObjectById(ref->getOwnerId());
517 if ((object != NULL) && object->isDataCollectionTarget())
518 {
519 object->incRefCount();
520 DCObject *dci = static_cast<DataCollectionTarget*>(object)->getDCObjectById(ref->getId(), true);
17dec829 521 if ((dci != NULL) && (dci->getType() == DCO_TYPE_ITEM))
5eecfc25
VK
522 {
523 DbgPrintf(6, _T("Loading cache for DCI %s [%d] on %s [%d]"),
524 ref->getName(), ref->getId(), object->getName(), object->getId());
525 static_cast<DCItem*>(dci)->reloadCache();
526 }
527 object->decRefCount();
528 }
3629bf1c 529 delete ref;
d140955e
VK
530 }
531 DbgPrintf(2, _T("DCI cache loader thread stopped"));
532 return THREAD_OK;
533}
534
535/**
ca0a0eb7
VK
536 * Threads
537 */
538static THREAD s_itemPollerThread = INVALID_THREAD_HANDLE;
539static THREAD s_statCollectorThread = INVALID_THREAD_HANDLE;
540static THREAD s_cacheLoaderThread = INVALID_THREAD_HANDLE;
541
542/**
6fd6de0a
VK
543 * Initialize data collection subsystem
544 */
b239e165 545void InitDataCollector()
5039dede 546{
ad221861
VK
547 g_dataCollectorThreadPool = ThreadPoolCreate(
548 ConfigReadInt(_T("DataCollector.ThreadPool.BaseSize"), 10),
549 ConfigReadInt(_T("DataCollector.ThreadPool.MaxSize"), 250),
550 _T("DATACOLL"));
5039dede 551
ca0a0eb7
VK
552 s_itemPollerThread = ThreadCreateEx(ItemPoller, 0, NULL);
553 s_statCollectorThread = ThreadCreateEx(StatCollector, 0, NULL);
554 s_cacheLoaderThread = ThreadCreateEx(CacheLoader, 0, NULL);
555}
556
557/**
558 * Stop data collection
559 */
560void StopDataCollection()
561{
562 ThreadJoin(s_itemPollerThread);
563 ThreadJoin(s_statCollectorThread);
564 ThreadJoin(s_cacheLoaderThread);
565 ThreadPoolDestroy(g_dataCollectorThreadPool);
5039dede
AK
566}
567
d140955e
VK
568/**
569 * Update parameter list from node
570 */
6aba3998
VK
571static void UpdateParamList(NetObj *object, void *data)
572{
86c126f5 573 ObjectArray<AgentParameterDefinition> *fullList = (ObjectArray<AgentParameterDefinition> *)data;
6aba3998 574
86c126f5 575 ObjectArray<AgentParameterDefinition> *paramList;
cc8ce218
VK
576 ((Node *)object)->openParamList(&paramList);
577 if ((paramList != NULL) && (paramList->size() > 0))
6aba3998 578 {
cc8ce218 579 for(int i = 0; i < paramList->size(); i++)
889d7ff7 580 {
86c126f5
VK
581 int j;
582 for(j = 0; j < fullList->size(); j++)
889d7ff7 583 {
86c126f5 584 if (!_tcsicmp(paramList->get(i)->getName(), fullList->get(j)->getName()))
6aba3998
VK
585 break;
586 }
587
86c126f5 588 if (j == fullList->size())
6aba3998 589 {
86c126f5 590 fullList->add(new AgentParameterDefinition(paramList->get(i)));
889d7ff7 591 }
889d7ff7 592 }
6aba3998 593 }
f1ff4cc9 594 ((Node *)object)->closeParamList();
6aba3998
VK
595}
596
d140955e
VK
597/**
598 * Update table list from node
599 */
074498ac
VK
600static void UpdateTableList(NetObj *object, void *data)
601{
86c126f5 602 ObjectArray<AgentTableDefinition> *fullList = (ObjectArray<AgentTableDefinition> *)data;
074498ac 603
86c126f5 604 ObjectArray<AgentTableDefinition> *tableList;
074498ac
VK
605 ((Node *)object)->openTableList(&tableList);
606 if ((tableList != NULL) && (tableList->size() > 0))
607 {
074498ac
VK
608 for(int i = 0; i < tableList->size(); i++)
609 {
86c126f5
VK
610 int j;
611 for(j = 0; j < fullList->size(); j++)
074498ac 612 {
86c126f5 613 if (!_tcsicmp(tableList->get(i)->getName(), fullList->get(j)->getName()))
074498ac
VK
614 break;
615 }
616
86c126f5 617 if (j == fullList->size())
074498ac 618 {
86c126f5 619 fullList->add(new AgentTableDefinition(tableList->get(i)));
074498ac
VK
620 }
621 }
622 }
623 ((Node *)object)->closeTableList();
624}
625
d140955e
VK
626/**
627 * Write full (from all nodes) agent parameters list to NXCP message
628 */
b368969c 629void WriteFullParamListToMessage(NXCPMessage *pMsg, WORD flags)
6aba3998
VK
630{
631 // Gather full parameter list
074498ac
VK
632 if (flags & 0x01)
633 {
86c126f5 634 ObjectArray<AgentParameterDefinition> fullList(64, 64, true);
074498ac
VK
635 g_idxNodeById.forEach(UpdateParamList, &fullList);
636
637 // Put list into the message
b368969c 638 pMsg->setField(VID_NUM_PARAMETERS, (UINT32)fullList.size());
967893bb 639 UINT32 varId = VID_PARAM_LIST_BASE;
86c126f5 640 for(int i = 0; i < fullList.size(); i++)
074498ac 641 {
86c126f5 642 varId += fullList.get(i)->fillMessage(pMsg, varId);
074498ac 643 }
074498ac
VK
644 }
645
646 // Gather full table list
647 if (flags & 0x02)
648 {
86c126f5 649 ObjectArray<AgentTableDefinition> fullList(64, 64, true);
074498ac
VK
650 g_idxNodeById.forEach(UpdateTableList, &fullList);
651
652 // Put list into the message
b368969c 653 pMsg->setField(VID_NUM_TABLES, (UINT32)fullList.size());
967893bb 654 UINT32 varId = VID_TABLE_LIST_BASE;
86c126f5 655 for(int i = 0; i < fullList.size(); i++)
074498ac 656 {
86c126f5 657 varId += fullList.get(i)->fillMessage(pMsg, varId);
074498ac 658 }
074498ac 659 }
5039dede 660}
9fddfb91
VK
661
662/**
663 * Get type of data collection object
664 */
665int GetDCObjectType(UINT32 nodeId, UINT32 dciId)
666{
667 Node *node = (Node *)FindObjectById(nodeId, OBJECT_NODE);
668 if (node != NULL)
669 {
670 DCObject *dco = node->getDCObjectById(dciId);
671 if (dco != NULL)
672 {
673 return dco->getType();
674 }
675 }
676 return DCO_TYPE_ITEM; // default
677}