2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2016 Victor Kirhenshtein
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26 * Interval between DCI polling
28 #define ITEM_POLLING_INTERVAL 1
33 extern Queue g_syslogProcessingQueue
;
34 extern Queue g_syslogWriteQueue
;
39 double g_dAvgPollerQueueSize
= 0;
40 double g_dAvgDBWriterQueueSize
= 0;
41 double g_dAvgIDataWriterQueueSize
= 0;
42 double g_dAvgRawDataWriterQueueSize
= 0;
43 double g_dAvgDBAndIDataWriterQueueSize
= 0;
44 double g_dAvgSyslogProcessingQueueSize
= 0;
45 double g_dAvgSyslogWriterQueueSize
= 0;
46 UINT32 g_dwAvgDCIQueuingTime
= 0;
47 Queue
g_dataCollectionQueue(4096, 256);
48 Queue g_dciCacheLoaderQueue
;
51 * Collect data for DCI
53 static void *GetItemData(DataCollectionTarget
*dcTarget
, DCItem
*pItem
, TCHAR
*pBuffer
, UINT32
*error
)
55 if (dcTarget
->getObjectClass() == OBJECT_CLUSTER
)
57 if (pItem
->isAggregateOnCluster())
59 *error
= ((Cluster
*)dcTarget
)->collectAggregatedData(pItem
, pBuffer
);
68 switch(pItem
->getDataSource())
70 case DS_INTERNAL
: // Server internal parameters (like status)
71 *error
= dcTarget
->getInternalItem(pItem
->getName(), MAX_LINE_SIZE
, pBuffer
);
74 if (dcTarget
->getObjectClass() == OBJECT_NODE
)
75 *error
= ((Node
*)dcTarget
)->getItemFromSNMP(pItem
->getSnmpPort(), pItem
->getName(), MAX_LINE_SIZE
,
76 pBuffer
, pItem
->isInterpretSnmpRawValue() ? (int)pItem
->getSnmpRawValueType() : SNMP_RAWTYPE_NONE
);
78 *error
= DCE_NOT_SUPPORTED
;
80 case DS_CHECKPOINT_AGENT
:
81 if (dcTarget
->getObjectClass() == OBJECT_NODE
)
82 *error
= ((Node
*)dcTarget
)->getItemFromCheckPointSNMP(pItem
->getName(), MAX_LINE_SIZE
, pBuffer
);
84 *error
= DCE_NOT_SUPPORTED
;
87 if (dcTarget
->getObjectClass() == OBJECT_NODE
)
88 *error
= ((Node
*)dcTarget
)->getItemFromAgent(pItem
->getName(), MAX_LINE_SIZE
, pBuffer
);
90 *error
= DCE_NOT_SUPPORTED
;
93 if (dcTarget
->getObjectClass() == OBJECT_NODE
)
95 TCHAR name
[MAX_PARAM_NAME
];
96 _sntprintf(name
, MAX_PARAM_NAME
, _T("PDH.CounterValue(\"%s\",%d)"), pItem
->getName(), pItem
->getSampleCount());
97 *error
= ((Node
*)dcTarget
)->getItemFromAgent(name
, MAX_LINE_SIZE
, pBuffer
);
101 *error
= DCE_NOT_SUPPORTED
;
105 if (dcTarget
->getObjectClass() == OBJECT_NODE
)
107 *error
= ((Node
*)dcTarget
)->getItemFromSMCLP(pItem
->getName(), MAX_LINE_SIZE
, pBuffer
);
111 *error
= DCE_NOT_SUPPORTED
;
115 *error
= dcTarget
->getScriptItem(pItem
->getName(), MAX_LINE_SIZE
, pBuffer
);
118 *error
= DCE_NOT_SUPPORTED
;
126 * Collect data for table
128 static void *GetTableData(DataCollectionTarget
*dcTarget
, DCTable
*table
, UINT32
*error
)
130 Table
*result
= NULL
;
131 if (dcTarget
->getObjectClass() == OBJECT_CLUSTER
)
133 if (table
->isAggregateOnCluster())
135 *error
= ((Cluster
*)dcTarget
)->collectAggregatedData(table
, &result
);
144 switch(table
->getDataSource())
146 case DS_NATIVE_AGENT
:
147 if (dcTarget
->getObjectClass() == OBJECT_NODE
)
149 *error
= ((Node
*)dcTarget
)->getTableFromAgent(table
->getName(), &result
);
150 if ((*error
== DCE_SUCCESS
) && (result
!= NULL
))
151 table
->updateResultColumns(result
);
155 *error
= DCE_NOT_SUPPORTED
;
159 if (dcTarget
->getObjectClass() == OBJECT_NODE
)
161 *error
= ((Node
*)dcTarget
)->getTableFromSNMP(table
->getSnmpPort(), table
->getName(), table
->getColumns(), &result
);
162 if ((*error
== DCE_SUCCESS
) && (result
!= NULL
))
163 table
->updateResultColumns(result
);
167 *error
= DCE_NOT_SUPPORTED
;
171 *error
= DCE_NOT_SUPPORTED
;
181 static THREAD_RESULT THREAD_CALL
DataCollector(void *pArg
)
185 TCHAR
*pBuffer
= (TCHAR
*)malloc(MAX_LINE_SIZE
* sizeof(TCHAR
));
186 while(!IsShutdownInProgress())
188 DCObject
*pItem
= (DCObject
*)g_dataCollectionQueue
.getOrBlock();
189 DataCollectionTarget
*target
= (DataCollectionTarget
*)pItem
->getOwner();
191 if (pItem
->isScheduledForDeletion())
193 nxlog_debug(7, _T("DataCollector(): about to destroy DC object %d \"%s\" owner=%d"),
194 pItem
->getId(), pItem
->getName(), (target
!= NULL
) ? (int)target
->getId() : -1);
195 pItem
->deleteFromDatabase();
202 nxlog_debug(3, _T("DataCollector: attempt to collect information for non-existing node (DCI=%d \"%s\")"),
203 pItem
->getId(), pItem
->getName());
205 // Update item's last poll time and clear busy flag so item can be polled again
206 pItem
->setLastPollTime(time(NULL
));
207 pItem
->setBusyFlag(FALSE
);
211 DbgPrintf(8, _T("DataCollector(): processing DC object %d \"%s\" owner=%d sourceNode=%d"),
212 pItem
->getId(), pItem
->getName(), (target
!= NULL
) ? (int)target
->getId() : -1, pItem
->getSourceNode());
213 UINT32 sourceNodeId
= target
->getEffectiveSourceNode(pItem
);
214 if (sourceNodeId
!= 0)
216 Node
*sourceNode
= (Node
*)FindObjectById(sourceNodeId
, OBJECT_NODE
);
217 if (sourceNode
!= NULL
)
219 if (((target
->getObjectClass() == OBJECT_CHASSIS
) && (((Chassis
*)target
)->getControllerId() == sourceNodeId
)) ||
220 sourceNode
->isTrustedNode(target
->getId()))
223 target
->incRefCount();
227 // Change item's status to _T("not supported")
228 pItem
->setStatus(ITEM_STATUS_NOT_SUPPORTED
, true);
229 target
->decRefCount();
235 target
->decRefCount();
240 time_t currTime
= time(NULL
);
243 if (!IsShutdownInProgress())
246 switch(pItem
->getType())
249 data
= GetItemData(target
, (DCItem
*)pItem
, pBuffer
, &dwError
);
252 data
= GetTableData(target
, (DCTable
*)pItem
, &dwError
);
256 dwError
= DCE_NOT_SUPPORTED
;
260 // Transform and store received value into database or handle error
264 if (pItem
->getStatus() == ITEM_STATUS_NOT_SUPPORTED
)
265 pItem
->setStatus(ITEM_STATUS_ACTIVE
, true);
266 if (!((DataCollectionTarget
*)pItem
->getOwner())->processNewDCValue(pItem
, currTime
, data
))
268 // value processing failed, convert to data collection error
269 pItem
->processNewError(false);
273 if (pItem
->getStatus() == ITEM_STATUS_NOT_SUPPORTED
)
274 pItem
->setStatus(ITEM_STATUS_ACTIVE
, true);
275 pItem
->processNewError(false);
277 case DCE_NO_SUCH_INSTANCE
:
278 pItem
->processNewError(true);
280 case DCE_NOT_SUPPORTED
:
281 // Change item's status
282 pItem
->setStatus(ITEM_STATUS_NOT_SUPPORTED
, true);
287 // Decrement node's usage counter
288 target
->decRefCount();
289 if ((pItem
->getSourceNode() != 0) && (pItem
->getOwner() != NULL
))
291 pItem
->getOwner()->decRefCount();
294 else /* target == NULL */
296 Template
*n
= pItem
->getOwner();
297 nxlog_debug(5, _T("DataCollector: attempt to collect information for non-existing or inaccessible node (DCI=%d \"%s\" target=%d sourceNode=%d)"),
298 pItem
->getId(), pItem
->getName(), (n
!= NULL
) ? (int)n
->getId() : -1, sourceNodeId
);
301 // Update item's last poll time and clear busy flag so item can be polled again
302 pItem
->setLastPollTime(currTime
);
303 pItem
->setBusyFlag(FALSE
);
307 DbgPrintf(1, _T("Data collector thread terminated"));
312 * Callback for queueing DCIs
314 static void QueueItems(NetObj
*object
, void *data
)
316 if (IsShutdownInProgress())
319 DbgPrintf(8, _T("ItemPoller: calling DataCollectionTarget::queueItemsForPolling for object %s [%d]"),
320 object
->getName(), object
->getId());
321 ((DataCollectionTarget
*)object
)->queueItemsForPolling(&g_dataCollectionQueue
);
325 * Item poller thread: check nodes' items and put into the
326 * data collector queue when data polling required
328 static THREAD_RESULT THREAD_CALL
ItemPoller(void *pArg
)
330 UINT32 dwSum
, dwWatchdogId
, currPos
= 0;
331 UINT32 dwTimingHistory
[60 / ITEM_POLLING_INTERVAL
];
334 dwWatchdogId
= WatchdogAddThread(_T("Item Poller"), 20);
335 memset(dwTimingHistory
, 0, sizeof(UINT32
) * (60 / ITEM_POLLING_INTERVAL
));
337 while(!IsShutdownInProgress())
339 if (SleepAndCheckForShutdown(ITEM_POLLING_INTERVAL
))
340 break; // Shutdown has arrived
341 WatchdogNotify(dwWatchdogId
);
342 DbgPrintf(8, _T("ItemPoller: wakeup"));
344 qwStart
= GetCurrentTimeMs();
345 g_idxNodeById
.forEach(QueueItems
, NULL
);
346 g_idxClusterById
.forEach(QueueItems
, NULL
);
347 g_idxMobileDeviceById
.forEach(QueueItems
, NULL
);
348 g_idxChassisById
.forEach(QueueItems
, NULL
);
350 // Save last poll time
351 dwTimingHistory
[currPos
] = (UINT32
)(GetCurrentTimeMs() - qwStart
);
353 if (currPos
== (60 / ITEM_POLLING_INTERVAL
))
356 // Calculate new average for last minute
358 for(int i
= 0; i
< (60 / ITEM_POLLING_INTERVAL
); i
++)
359 dwSum
+= dwTimingHistory
[i
];
360 g_dwAvgDCIQueuingTime
= dwSum
/ (60 / ITEM_POLLING_INTERVAL
);
362 DbgPrintf(1, _T("Item poller thread terminated"));
367 * Statistics collection thread
369 static THREAD_RESULT THREAD_CALL
StatCollector(void *pArg
)
371 UINT32 i
, currPos
= 0;
372 UINT32 pollerQS
[12], dbWriterQS
[12];
373 UINT32 iDataWriterQS
[12], rawDataWriterQS
[12], dbAndIDataWriterQS
[12];
374 UINT32 syslogProcessingQS
[12], syslogWriterQS
[12];
375 double sum1
, sum2
, sum3
, sum4
, sum5
, sum8
, sum9
;
377 memset(pollerQS
, 0, sizeof(UINT32
) * 12);
378 memset(dbWriterQS
, 0, sizeof(UINT32
) * 12);
379 memset(iDataWriterQS
, 0, sizeof(UINT32
) * 12);
380 memset(rawDataWriterQS
, 0, sizeof(UINT32
) * 12);
381 memset(dbAndIDataWriterQS
, 0, sizeof(UINT32
) * 12);
382 memset(syslogProcessingQS
, 0, sizeof(UINT32
) * 12);
383 memset(syslogWriterQS
, 0, sizeof(UINT32
) * 12);
384 g_dAvgPollerQueueSize
= 0;
385 g_dAvgDBWriterQueueSize
= 0;
386 g_dAvgIDataWriterQueueSize
= 0;
387 g_dAvgRawDataWriterQueueSize
= 0;
388 g_dAvgDBAndIDataWriterQueueSize
= 0;
389 g_dAvgSyslogProcessingQueueSize
= 0;
390 g_dAvgSyslogWriterQueueSize
= 0;
391 while(!IsShutdownInProgress())
393 if (SleepAndCheckForShutdown(5))
394 break; // Shutdown has arrived
396 // Get current values
397 pollerQS
[currPos
] = g_dataCollectionQueue
.size();
398 dbWriterQS
[currPos
] = g_dbWriterQueue
->size();
399 iDataWriterQS
[currPos
] = g_dciDataWriterQueue
->size();
400 rawDataWriterQS
[currPos
] = g_dciRawDataWriterQueue
->size();
401 dbAndIDataWriterQS
[currPos
] = g_dbWriterQueue
->size() + g_dciDataWriterQueue
->size() + g_dciRawDataWriterQueue
->size();
402 syslogProcessingQS
[currPos
] = g_syslogProcessingQueue
.size();
403 syslogWriterQS
[currPos
] = g_syslogWriteQueue
.size();
408 // Calculate new averages
409 for(i
= 0, sum1
= 0, sum2
= 0, sum3
= 0, sum4
= 0, sum5
= 0, sum8
= 0, sum9
= 0; i
< 12; i
++)
412 sum2
+= dbWriterQS
[i
];
413 sum3
+= iDataWriterQS
[i
];
414 sum4
+= rawDataWriterQS
[i
];
415 sum5
+= dbAndIDataWriterQS
[i
];
416 sum8
+= syslogProcessingQS
[i
];
417 sum9
+= syslogWriterQS
[i
];
419 g_dAvgPollerQueueSize
= sum1
/ 12;
420 g_dAvgDBWriterQueueSize
= sum2
/ 12;
421 g_dAvgIDataWriterQueueSize
= sum3
/ 12;
422 g_dAvgRawDataWriterQueueSize
= sum4
/ 12;
423 g_dAvgDBAndIDataWriterQueueSize
= sum5
/ 12;
424 g_dAvgSyslogProcessingQueueSize
= sum8
/ 12;
425 g_dAvgSyslogWriterQueueSize
= sum9
/ 12;
433 THREAD_RESULT THREAD_CALL
CacheLoader(void *arg
)
435 DbgPrintf(2, _T("DCI cache loader thread started"));
438 DCItem
*dci
= (DCItem
*)g_dciCacheLoaderQueue
.getOrBlock();
439 if (dci
== INVALID_POINTER_VALUE
)
442 DbgPrintf(6, _T("Loading cache for DCI %s [%d] on %s [%d]"),
443 dci
->getName(), dci
->getId(), dci
->getOwnerName(), dci
->getOwnerId());
445 dci
->getOwner()->decRefCount();
447 DbgPrintf(2, _T("DCI cache loader thread stopped"));
452 * Initialize data collection subsystem
454 BOOL
InitDataCollector()
456 int i
, iNumCollectors
;
458 // Start data collection threads
459 iNumCollectors
= ConfigReadInt(_T("NumberOfDataCollectors"), 10);
460 for(i
= 0; i
< iNumCollectors
; i
++)
461 ThreadCreate(DataCollector
, 0, NULL
);
463 ThreadCreate(ItemPoller
, 0, NULL
);
464 ThreadCreate(StatCollector
, 0, NULL
);
465 ThreadCreate(CacheLoader
, 0, NULL
);
471 * Update parameter list from node
473 static void UpdateParamList(NetObj
*object
, void *data
)
475 ObjectArray
<AgentParameterDefinition
> *fullList
= (ObjectArray
<AgentParameterDefinition
> *)data
;
477 ObjectArray
<AgentParameterDefinition
> *paramList
;
478 ((Node
*)object
)->openParamList(¶mList
);
479 if ((paramList
!= NULL
) && (paramList
->size() > 0))
481 for(int i
= 0; i
< paramList
->size(); i
++)
484 for(j
= 0; j
< fullList
->size(); j
++)
486 if (!_tcsicmp(paramList
->get(i
)->getName(), fullList
->get(j
)->getName()))
490 if (j
== fullList
->size())
492 fullList
->add(new AgentParameterDefinition(paramList
->get(i
)));
496 ((Node
*)object
)->closeParamList();
500 * Update table list from node
502 static void UpdateTableList(NetObj
*object
, void *data
)
504 ObjectArray
<AgentTableDefinition
> *fullList
= (ObjectArray
<AgentTableDefinition
> *)data
;
506 ObjectArray
<AgentTableDefinition
> *tableList
;
507 ((Node
*)object
)->openTableList(&tableList
);
508 if ((tableList
!= NULL
) && (tableList
->size() > 0))
510 for(int i
= 0; i
< tableList
->size(); i
++)
513 for(j
= 0; j
< fullList
->size(); j
++)
515 if (!_tcsicmp(tableList
->get(i
)->getName(), fullList
->get(j
)->getName()))
519 if (j
== fullList
->size())
521 fullList
->add(new AgentTableDefinition(tableList
->get(i
)));
525 ((Node
*)object
)->closeTableList();
529 * Write full (from all nodes) agent parameters list to NXCP message
531 void WriteFullParamListToMessage(NXCPMessage
*pMsg
, WORD flags
)
533 // Gather full parameter list
536 ObjectArray
<AgentParameterDefinition
> fullList(64, 64, true);
537 g_idxNodeById
.forEach(UpdateParamList
, &fullList
);
539 // Put list into the message
540 pMsg
->setField(VID_NUM_PARAMETERS
, (UINT32
)fullList
.size());
541 UINT32 varId
= VID_PARAM_LIST_BASE
;
542 for(int i
= 0; i
< fullList
.size(); i
++)
544 varId
+= fullList
.get(i
)->fillMessage(pMsg
, varId
);
548 // Gather full table list
551 ObjectArray
<AgentTableDefinition
> fullList(64, 64, true);
552 g_idxNodeById
.forEach(UpdateTableList
, &fullList
);
554 // Put list into the message
555 pMsg
->setField(VID_NUM_TABLES
, (UINT32
)fullList
.size());
556 UINT32 varId
= VID_TABLE_LIST_BASE
;
557 for(int i
= 0; i
< fullList
.size(); i
++)
559 varId
+= fullList
.get(i
)->fillMessage(pMsg
, varId
);
565 * Get type of data collection object
567 int GetDCObjectType(UINT32 nodeId
, UINT32 dciId
)
569 Node
*node
= (Node
*)FindObjectById(nodeId
, OBJECT_NODE
);
572 DCObject
*dco
= node
->getDCObjectById(dciId
);
575 return dco
->getType();
578 return DCO_TYPE_ITEM
; // default