2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2017 Victor Kirhenshtein
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26 * Interval between DCI polling
28 #define ITEM_POLLING_INTERVAL 1
33 extern Queue g_syslogProcessingQueue
;
34 extern Queue g_syslogWriteQueue
;
35 extern ThreadPool
*g_pollerThreadPool
;
38 * Thread pool for data collectors
40 ThreadPool
*g_dataCollectorThreadPool
= NULL
;
45 double g_dAvgDataCollectorQueueSize
= 0;
46 double g_dAvgPollerQueueSize
= 0;
47 double g_dAvgDBWriterQueueSize
= 0;
48 double g_dAvgIDataWriterQueueSize
= 0;
49 double g_dAvgRawDataWriterQueueSize
= 0;
50 double g_dAvgDBAndIDataWriterQueueSize
= 0;
51 double g_dAvgSyslogProcessingQueueSize
= 0;
52 double g_dAvgSyslogWriterQueueSize
= 0;
53 UINT32 g_dwAvgDCIQueuingTime
= 0;
54 Queue g_dciCacheLoaderQueue
;
57 * Collect data for DCI
59 static void *GetItemData(DataCollectionTarget
*dcTarget
, DCItem
*pItem
, TCHAR
*pBuffer
, UINT32
*error
)
61 if (dcTarget
->getObjectClass() == OBJECT_CLUSTER
)
63 if (pItem
->isAggregateOnCluster())
65 *error
= ((Cluster
*)dcTarget
)->collectAggregatedData(pItem
, pBuffer
);
74 switch(pItem
->getDataSource())
76 case DS_INTERNAL
: // Server internal parameters (like status)
77 *error
= dcTarget
->getInternalItem(pItem
->getName(), MAX_LINE_SIZE
, pBuffer
);
80 if (dcTarget
->getObjectClass() == OBJECT_NODE
)
81 *error
= ((Node
*)dcTarget
)->getItemFromSNMP(pItem
->getSnmpPort(), pItem
->getName(), MAX_LINE_SIZE
,
82 pBuffer
, pItem
->isInterpretSnmpRawValue() ? (int)pItem
->getSnmpRawValueType() : SNMP_RAWTYPE_NONE
);
84 *error
= DCE_NOT_SUPPORTED
;
86 case DS_CHECKPOINT_AGENT
:
87 if (dcTarget
->getObjectClass() == OBJECT_NODE
)
88 *error
= ((Node
*)dcTarget
)->getItemFromCheckPointSNMP(pItem
->getName(), MAX_LINE_SIZE
, pBuffer
);
90 *error
= DCE_NOT_SUPPORTED
;
93 if (dcTarget
->getObjectClass() == OBJECT_NODE
)
94 *error
= ((Node
*)dcTarget
)->getItemFromAgent(pItem
->getName(), MAX_LINE_SIZE
, pBuffer
);
95 else if (dcTarget
->getObjectClass() == OBJECT_SENSOR
)
96 *error
= ((Sensor
*)dcTarget
)->getItemFromAgent(pItem
->getName(), MAX_LINE_SIZE
, pBuffer
);
98 *error
= DCE_NOT_SUPPORTED
;
101 if (dcTarget
->getObjectClass() == OBJECT_NODE
)
103 TCHAR name
[MAX_PARAM_NAME
];
104 _sntprintf(name
, MAX_PARAM_NAME
, _T("PDH.CounterValue(\"%s\",%d)"), (const TCHAR
*)EscapeStringForAgent(pItem
->getName()), pItem
->getSampleCount());
105 *error
= ((Node
*)dcTarget
)->getItemFromAgent(name
, MAX_LINE_SIZE
, pBuffer
);
109 *error
= DCE_NOT_SUPPORTED
;
113 if (dcTarget
->getObjectClass() == OBJECT_NODE
)
115 UINT32 proxyId
= ((Node
*)dcTarget
)->getSshProxy();
118 if (IsZoningEnabled())
120 Zone
*zone
= FindZoneByUIN(((Node
*)dcTarget
)->getZoneUIN());
121 if ((zone
!= NULL
) && (zone
->getProxyNodeId() != 0))
122 proxyId
= zone
->getProxyNodeId();
124 proxyId
= g_dwMgmtNode
;
128 proxyId
= g_dwMgmtNode
;
131 Node
*proxy
= (Node
*)FindObjectById(proxyId
, OBJECT_NODE
);
134 TCHAR name
[MAX_PARAM_NAME
], ipAddr
[64];
135 _sntprintf(name
, MAX_PARAM_NAME
, _T("SSH.Command(%s,\"%s\",\"%s\",\"%s\")"),
136 ((Node
*)dcTarget
)->getIpAddress().toString(ipAddr
),
137 (const TCHAR
*)EscapeStringForAgent(((Node
*)dcTarget
)->getSshLogin()),
138 (const TCHAR
*)EscapeStringForAgent(((Node
*)dcTarget
)->getSshPassword()),
139 (const TCHAR
*)EscapeStringForAgent(pItem
->getName()));
140 *error
= proxy
->getItemFromAgent(name
, MAX_LINE_SIZE
, pBuffer
);
144 *error
= DCE_COMM_ERROR
;
149 *error
= DCE_NOT_SUPPORTED
;
153 if (dcTarget
->getObjectClass() == OBJECT_NODE
)
155 *error
= ((Node
*)dcTarget
)->getItemFromSMCLP(pItem
->getName(), MAX_LINE_SIZE
, pBuffer
);
159 *error
= DCE_NOT_SUPPORTED
;
163 *error
= dcTarget
->getScriptItem(pItem
->getName(), MAX_LINE_SIZE
, pBuffer
, (DataCollectionTarget
*)pItem
->getOwner());
166 *error
= DCE_NOT_SUPPORTED
;
174 * Collect data for table
176 static void *GetTableData(DataCollectionTarget
*dcTarget
, DCTable
*table
, UINT32
*error
)
178 Table
*result
= NULL
;
179 if (dcTarget
->getObjectClass() == OBJECT_CLUSTER
)
181 if (table
->isAggregateOnCluster())
183 *error
= ((Cluster
*)dcTarget
)->collectAggregatedData(table
, &result
);
192 switch(table
->getDataSource())
194 case DS_NATIVE_AGENT
:
195 if (dcTarget
->getObjectClass() == OBJECT_NODE
)
197 *error
= ((Node
*)dcTarget
)->getTableFromAgent(table
->getName(), &result
);
198 if ((*error
== DCE_SUCCESS
) && (result
!= NULL
))
199 table
->updateResultColumns(result
);
203 *error
= DCE_NOT_SUPPORTED
;
207 if (dcTarget
->getObjectClass() == OBJECT_NODE
)
209 *error
= ((Node
*)dcTarget
)->getTableFromSNMP(table
->getSnmpPort(), table
->getName(), table
->getColumns(), &result
);
210 if ((*error
== DCE_SUCCESS
) && (result
!= NULL
))
211 table
->updateResultColumns(result
);
215 *error
= DCE_NOT_SUPPORTED
;
219 *error
= dcTarget
->getScriptTable(table
->getName(), &result
, (DataCollectionTarget
*)table
->getOwner());
222 *error
= DCE_NOT_SUPPORTED
;
232 void DataCollector(void *arg
)
234 DCObject
*pItem
= static_cast<DCObject
*>(arg
);
235 DataCollectionTarget
*target
= static_cast<DataCollectionTarget
*>(pItem
->getOwner());
237 if (pItem
->isScheduledForDeletion())
239 nxlog_debug(7, _T("DataCollector(): about to destroy DC object %d \"%s\" owner=%d"),
240 pItem
->getId(), pItem
->getName(), (target
!= NULL
) ? (int)target
->getId() : -1);
241 pItem
->deleteFromDatabase();
243 target
->decRefCount();
249 nxlog_debug(3, _T("DataCollector: attempt to collect information for non-existing node (DCI=%d \"%s\")"),
250 pItem
->getId(), pItem
->getName());
252 // Update item's last poll time and clear busy flag so item can be polled again
253 pItem
->setLastPollTime(time(NULL
));
254 pItem
->clearBusyFlag();
258 DbgPrintf(8, _T("DataCollector(): processing DC object %d \"%s\" owner=%d sourceNode=%d"),
259 pItem
->getId(), pItem
->getName(), (target
!= NULL
) ? (int)target
->getId() : -1, pItem
->getSourceNode());
260 UINT32 sourceNodeId
= target
->getEffectiveSourceNode(pItem
);
261 if (sourceNodeId
!= 0)
263 Node
*sourceNode
= (Node
*)FindObjectById(sourceNodeId
, OBJECT_NODE
);
264 if (sourceNode
!= NULL
)
266 if (((target
->getObjectClass() == OBJECT_CHASSIS
) && (((Chassis
*)target
)->getControllerId() == sourceNodeId
)) ||
267 sourceNode
->isTrustedNode(target
->getId()))
270 target
->incRefCount();
274 // Change item's status to "not supported"
275 pItem
->setStatus(ITEM_STATUS_NOT_SUPPORTED
, true);
276 target
->decRefCount();
282 target
->decRefCount();
287 time_t currTime
= time(NULL
);
290 if (!IsShutdownInProgress())
293 TCHAR buffer
[MAX_LINE_SIZE
];
295 switch(pItem
->getType())
298 data
= GetItemData(target
, (DCItem
*)pItem
, buffer
, &error
);
301 data
= GetTableData(target
, (DCTable
*)pItem
, &error
);
305 error
= DCE_NOT_SUPPORTED
;
309 // Transform and store received value into database or handle error
313 if (pItem
->getStatus() == ITEM_STATUS_NOT_SUPPORTED
)
314 pItem
->setStatus(ITEM_STATUS_ACTIVE
, true);
315 if (!((DataCollectionTarget
*)pItem
->getOwner())->processNewDCValue(pItem
, currTime
, data
))
317 // value processing failed, convert to data collection error
318 pItem
->processNewError(false);
321 case DCE_COLLECTION_ERROR
:
322 if (pItem
->getStatus() == ITEM_STATUS_NOT_SUPPORTED
)
323 pItem
->setStatus(ITEM_STATUS_ACTIVE
, true);
324 pItem
->processNewError(false);
326 case DCE_NO_SUCH_INSTANCE
:
327 if (pItem
->getStatus() == ITEM_STATUS_NOT_SUPPORTED
)
328 pItem
->setStatus(ITEM_STATUS_ACTIVE
, true);
329 pItem
->processNewError(true);
332 pItem
->processNewError(false);
334 case DCE_NOT_SUPPORTED
:
335 // Change item's status
336 pItem
->setStatus(ITEM_STATUS_NOT_SUPPORTED
, true);
340 // Send session notification when force poll is performed
341 if (pItem
->getPollingSession() != NULL
)
343 ClientSession
*session
= pItem
->processForcePoll();
344 session
->notify(NX_NOTIFY_FORCE_DCI_POLL
, pItem
->getOwnerId());
345 session
->decRefCount();
349 // Decrement node's usage counter
350 target
->decRefCount();
351 if ((pItem
->getSourceNode() != 0) && (pItem
->getOwner() != NULL
))
353 pItem
->getOwner()->decRefCount();
356 else /* target == NULL */
358 Template
*n
= pItem
->getOwner();
359 nxlog_debug(5, _T("DataCollector: attempt to collect information for non-existing or inaccessible node (DCI=%d \"%s\" target=%d sourceNode=%d)"),
360 pItem
->getId(), pItem
->getName(), (n
!= NULL
) ? (int)n
->getId() : -1, sourceNodeId
);
363 // Update item's last poll time and clear busy flag so item can be polled again
364 pItem
->setLastPollTime(currTime
);
365 pItem
->clearBusyFlag();
369 * Callback for queueing DCIs
371 static void QueueItems(NetObj
*object
, void *data
)
373 if (IsShutdownInProgress())
376 WatchdogNotify(*((UINT32
*)data
));
377 nxlog_debug(8, _T("ItemPoller: calling DataCollectionTarget::queueItemsForPolling for object %s [%d]"),
378 object
->getName(), object
->getId());
379 ((DataCollectionTarget
*)object
)->queueItemsForPolling();
383 * Item poller thread: check nodes' items and put into the
384 * data collector queue when data polling required
386 static THREAD_RESULT THREAD_CALL
ItemPoller(void *pArg
)
388 ThreadSetName("ItemPoller");
390 UINT32 dwSum
, currPos
= 0;
391 UINT32 dwTimingHistory
[60 / ITEM_POLLING_INTERVAL
];
394 UINT32 watchdogId
= WatchdogAddThread(_T("Item Poller"), 10);
395 memset(dwTimingHistory
, 0, sizeof(UINT32
) * (60 / ITEM_POLLING_INTERVAL
));
397 while(!IsShutdownInProgress())
399 if (SleepAndCheckForShutdown(ITEM_POLLING_INTERVAL
))
400 break; // Shutdown has arrived
401 WatchdogNotify(watchdogId
);
402 DbgPrintf(8, _T("ItemPoller: wakeup"));
404 qwStart
= GetCurrentTimeMs();
405 g_idxNodeById
.forEach(QueueItems
, &watchdogId
);
406 g_idxClusterById
.forEach(QueueItems
, &watchdogId
);
407 g_idxMobileDeviceById
.forEach(QueueItems
, &watchdogId
);
408 g_idxChassisById
.forEach(QueueItems
, &watchdogId
);
409 g_idxSensorById
.forEach(QueueItems
, &watchdogId
);
411 // Save last poll time
412 dwTimingHistory
[currPos
] = (UINT32
)(GetCurrentTimeMs() - qwStart
);
414 if (currPos
== (60 / ITEM_POLLING_INTERVAL
))
417 // Calculate new average for last minute
419 for(int i
= 0; i
< (60 / ITEM_POLLING_INTERVAL
); i
++)
420 dwSum
+= dwTimingHistory
[i
];
421 g_dwAvgDCIQueuingTime
= dwSum
/ (60 / ITEM_POLLING_INTERVAL
);
423 DbgPrintf(1, _T("Item poller thread terminated"));
428 * Statistics collection thread
430 static THREAD_RESULT THREAD_CALL
StatCollector(void *pArg
)
432 ThreadSetName("StatCollector");
434 UINT32 i
, currPos
= 0;
435 UINT32 pollerQS
[12], dataCollectorQS
[12], dbWriterQS
[12];
436 UINT32 iDataWriterQS
[12], rawDataWriterQS
[12], dbAndIDataWriterQS
[12];
437 UINT32 syslogProcessingQS
[12], syslogWriterQS
[12];
438 double sum1
, sum2
, sum3
, sum4
, sum5
, sum8
, sum9
, sum10
;
440 memset(pollerQS
, 0, sizeof(UINT32
) * 12);
441 memset(dataCollectorQS
, 0, sizeof(UINT32
) * 12);
442 memset(dbWriterQS
, 0, sizeof(UINT32
) * 12);
443 memset(iDataWriterQS
, 0, sizeof(UINT32
) * 12);
444 memset(rawDataWriterQS
, 0, sizeof(UINT32
) * 12);
445 memset(dbAndIDataWriterQS
, 0, sizeof(UINT32
) * 12);
446 memset(syslogProcessingQS
, 0, sizeof(UINT32
) * 12);
447 memset(syslogWriterQS
, 0, sizeof(UINT32
) * 12);
448 g_dAvgDataCollectorQueueSize
= 0;
449 g_dAvgDBWriterQueueSize
= 0;
450 g_dAvgIDataWriterQueueSize
= 0;
451 g_dAvgRawDataWriterQueueSize
= 0;
452 g_dAvgDBAndIDataWriterQueueSize
= 0;
453 g_dAvgSyslogProcessingQueueSize
= 0;
454 g_dAvgSyslogWriterQueueSize
= 0;
455 g_dAvgPollerQueueSize
= 0;
456 while(!IsShutdownInProgress())
458 if (SleepAndCheckForShutdown(5))
459 break; // Shutdown has arrived
461 // Get current values
462 ThreadPoolInfo poolInfo
;
463 ThreadPoolGetInfo(g_dataCollectorThreadPool
, &poolInfo
);
464 dataCollectorQS
[currPos
] = (poolInfo
.activeRequests
> poolInfo
.curThreads
) ? poolInfo
.activeRequests
- poolInfo
.curThreads
: 0;
466 ThreadPoolGetInfo(g_pollerThreadPool
, &poolInfo
);
467 pollerQS
[currPos
] = (poolInfo
.activeRequests
> poolInfo
.curThreads
) ? poolInfo
.activeRequests
- poolInfo
.curThreads
: 0;
469 dbWriterQS
[currPos
] = g_dbWriterQueue
->size();
470 iDataWriterQS
[currPos
] = g_dciDataWriterQueue
->size();
471 rawDataWriterQS
[currPos
] = g_dciRawDataWriterQueue
->size();
472 dbAndIDataWriterQS
[currPos
] = g_dbWriterQueue
->size() + g_dciDataWriterQueue
->size() + g_dciRawDataWriterQueue
->size();
473 syslogProcessingQS
[currPos
] = g_syslogProcessingQueue
.size();
474 syslogWriterQS
[currPos
] = g_syslogWriteQueue
.size();
479 // Calculate new averages
480 for(i
= 0, sum1
= 0, sum2
= 0, sum3
= 0, sum4
= 0, sum5
= 0, sum8
= 0, sum9
= 0, sum10
= 0; i
< 12; i
++)
482 sum1
+= dataCollectorQS
[i
];
483 sum2
+= dbWriterQS
[i
];
484 sum3
+= iDataWriterQS
[i
];
485 sum4
+= rawDataWriterQS
[i
];
486 sum5
+= dbAndIDataWriterQS
[i
];
487 sum8
+= syslogProcessingQS
[i
];
488 sum9
+= syslogWriterQS
[i
];
489 sum10
+= pollerQS
[i
];
491 g_dAvgDataCollectorQueueSize
= sum1
/ 12;
492 g_dAvgDBWriterQueueSize
= sum2
/ 12;
493 g_dAvgIDataWriterQueueSize
= sum3
/ 12;
494 g_dAvgRawDataWriterQueueSize
= sum4
/ 12;
495 g_dAvgDBAndIDataWriterQueueSize
= sum5
/ 12;
496 g_dAvgSyslogProcessingQueueSize
= sum8
/ 12;
497 g_dAvgSyslogWriterQueueSize
= sum9
/ 12;
498 g_dAvgPollerQueueSize
= sum10
/ 12;
506 THREAD_RESULT THREAD_CALL
CacheLoader(void *arg
)
508 ThreadSetName("CacheLoader");
509 DbgPrintf(2, _T("DCI cache loader thread started"));
512 DCObjectInfo
*ref
= (DCObjectInfo
*)g_dciCacheLoaderQueue
.getOrBlock();
513 if (ref
== INVALID_POINTER_VALUE
)
516 NetObj
*object
= FindObjectById(ref
->getOwnerId());
517 if ((object
!= NULL
) && object
->isDataCollectionTarget())
519 object
->incRefCount();
520 DCObject
*dci
= static_cast<DataCollectionTarget
*>(object
)->getDCObjectById(ref
->getId(), true);
521 if ((dci
!= NULL
) && (dci
->getType() == DCO_TYPE_ITEM
))
523 DbgPrintf(6, _T("Loading cache for DCI %s [%d] on %s [%d]"),
524 ref
->getName(), ref
->getId(), object
->getName(), object
->getId());
525 static_cast<DCItem
*>(dci
)->reloadCache();
527 object
->decRefCount();
531 DbgPrintf(2, _T("DCI cache loader thread stopped"));
536 * Initialize data collection subsystem
538 void InitDataCollector()
540 int i
, iNumCollectors
;
542 g_dataCollectorThreadPool
= ThreadPoolCreate(ConfigReadInt(_T("DataCollectorThreadPoolBaseSize"), 10), ConfigReadInt(_T("DataCollectorThreadPoolMaxSize"), 250), _T("DATACOLL"));
544 ThreadCreate(ItemPoller
, 0, NULL
);
545 ThreadCreate(StatCollector
, 0, NULL
);
546 ThreadCreate(CacheLoader
, 0, NULL
);
550 * Update parameter list from node
552 static void UpdateParamList(NetObj
*object
, void *data
)
554 ObjectArray
<AgentParameterDefinition
> *fullList
= (ObjectArray
<AgentParameterDefinition
> *)data
;
556 ObjectArray
<AgentParameterDefinition
> *paramList
;
557 ((Node
*)object
)->openParamList(¶mList
);
558 if ((paramList
!= NULL
) && (paramList
->size() > 0))
560 for(int i
= 0; i
< paramList
->size(); i
++)
563 for(j
= 0; j
< fullList
->size(); j
++)
565 if (!_tcsicmp(paramList
->get(i
)->getName(), fullList
->get(j
)->getName()))
569 if (j
== fullList
->size())
571 fullList
->add(new AgentParameterDefinition(paramList
->get(i
)));
575 ((Node
*)object
)->closeParamList();
579 * Update table list from node
581 static void UpdateTableList(NetObj
*object
, void *data
)
583 ObjectArray
<AgentTableDefinition
> *fullList
= (ObjectArray
<AgentTableDefinition
> *)data
;
585 ObjectArray
<AgentTableDefinition
> *tableList
;
586 ((Node
*)object
)->openTableList(&tableList
);
587 if ((tableList
!= NULL
) && (tableList
->size() > 0))
589 for(int i
= 0; i
< tableList
->size(); i
++)
592 for(j
= 0; j
< fullList
->size(); j
++)
594 if (!_tcsicmp(tableList
->get(i
)->getName(), fullList
->get(j
)->getName()))
598 if (j
== fullList
->size())
600 fullList
->add(new AgentTableDefinition(tableList
->get(i
)));
604 ((Node
*)object
)->closeTableList();
608 * Write full (from all nodes) agent parameters list to NXCP message
610 void WriteFullParamListToMessage(NXCPMessage
*pMsg
, WORD flags
)
612 // Gather full parameter list
615 ObjectArray
<AgentParameterDefinition
> fullList(64, 64, true);
616 g_idxNodeById
.forEach(UpdateParamList
, &fullList
);
618 // Put list into the message
619 pMsg
->setField(VID_NUM_PARAMETERS
, (UINT32
)fullList
.size());
620 UINT32 varId
= VID_PARAM_LIST_BASE
;
621 for(int i
= 0; i
< fullList
.size(); i
++)
623 varId
+= fullList
.get(i
)->fillMessage(pMsg
, varId
);
627 // Gather full table list
630 ObjectArray
<AgentTableDefinition
> fullList(64, 64, true);
631 g_idxNodeById
.forEach(UpdateTableList
, &fullList
);
633 // Put list into the message
634 pMsg
->setField(VID_NUM_TABLES
, (UINT32
)fullList
.size());
635 UINT32 varId
= VID_TABLE_LIST_BASE
;
636 for(int i
= 0; i
< fullList
.size(); i
++)
638 varId
+= fullList
.get(i
)->fillMessage(pMsg
, varId
);
644 * Get type of data collection object
646 int GetDCObjectType(UINT32 nodeId
, UINT32 dciId
)
648 Node
*node
= (Node
*)FindObjectById(nodeId
, OBJECT_NODE
);
651 DCObject
*dco
= node
->getDCObjectById(dciId
);
654 return dco
->getType();
657 return DCO_TYPE_ITEM
; // default