all pollers converted to single thread pool
[public/netxms.git] / src / server / core / datacoll.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2015 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: datacoll.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 /**
26 * Interval between DCI polling
27 */
28 #define ITEM_POLLING_INTERVAL 1
29
30 /**
31 * Externals
32 */
33 extern Queue g_syslogProcessingQueue;
34 extern Queue g_syslogWriteQueue;
35
36 /**
37 * Global data
38 */
39 double g_dAvgPollerQueueSize = 0;
40 double g_dAvgDBWriterQueueSize = 0;
41 double g_dAvgIDataWriterQueueSize = 0;
42 double g_dAvgRawDataWriterQueueSize = 0;
43 double g_dAvgDBAndIDataWriterQueueSize = 0;
44 double g_dAvgSyslogProcessingQueueSize = 0;
45 double g_dAvgSyslogWriterQueueSize = 0;
46 UINT32 g_dwAvgDCIQueuingTime = 0;
47 Queue g_dataCollectionQueue(4096, 256);
48 Queue g_dciCacheLoaderQueue;
49
50 /**
51 * Collect data for DCI
52 */
53 static void *GetItemData(DataCollectionTarget *dcTarget, DCItem *pItem, TCHAR *pBuffer, UINT32 *error)
54 {
55 if (dcTarget->getObjectClass() == OBJECT_CLUSTER)
56 {
57 if (pItem->isAggregateOnCluster())
58 {
59 *error = ((Cluster *)dcTarget)->collectAggregatedData(pItem, pBuffer);
60 }
61 else
62 {
63 *error = DCE_IGNORE;
64 }
65 }
66 else
67 {
68 switch(pItem->getDataSource())
69 {
70 case DS_INTERNAL: // Server internal parameters (like status)
71 *error = dcTarget->getInternalItem(pItem->getName(), MAX_LINE_SIZE, pBuffer);
72 break;
73 case DS_SNMP_AGENT:
74 if (dcTarget->getObjectClass() == OBJECT_NODE)
75 *error = ((Node *)dcTarget)->getItemFromSNMP(pItem->getSnmpPort(), pItem->getName(), MAX_LINE_SIZE,
76 pBuffer, pItem->isInterpretSnmpRawValue() ? (int)pItem->getSnmpRawValueType() : SNMP_RAWTYPE_NONE);
77 else
78 *error = DCE_NOT_SUPPORTED;
79 break;
80 case DS_CHECKPOINT_AGENT:
81 if (dcTarget->getObjectClass() == OBJECT_NODE)
82 *error = ((Node *)dcTarget)->getItemFromCheckPointSNMP(pItem->getName(), MAX_LINE_SIZE, pBuffer);
83 else
84 *error = DCE_NOT_SUPPORTED;
85 break;
86 case DS_NATIVE_AGENT:
87 if (dcTarget->getObjectClass() == OBJECT_NODE)
88 *error = ((Node *)dcTarget)->getItemFromAgent(pItem->getName(), MAX_LINE_SIZE, pBuffer);
89 else
90 *error = DCE_NOT_SUPPORTED;
91 break;
92 case DS_WINPERF:
93 if (dcTarget->getObjectClass() == OBJECT_NODE)
94 {
95 TCHAR name[MAX_PARAM_NAME];
96 _sntprintf(name, MAX_PARAM_NAME, _T("PDH.CounterValue(\"%s\",%d)"), pItem->getName(), pItem->getSampleCount());
97 *error = ((Node *)dcTarget)->getItemFromAgent(name, MAX_LINE_SIZE, pBuffer);
98 }
99 else
100 {
101 *error = DCE_NOT_SUPPORTED;
102 }
103 break;
104 case DS_SMCLP:
105 if (dcTarget->getObjectClass() == OBJECT_NODE)
106 {
107 *error = ((Node *)dcTarget)->getItemFromSMCLP(pItem->getName(), MAX_LINE_SIZE, pBuffer);
108 }
109 else
110 {
111 *error = DCE_NOT_SUPPORTED;
112 }
113 break;
114 case DS_SCRIPT:
115 *error = dcTarget->getScriptItem(pItem->getName(), MAX_LINE_SIZE, pBuffer);
116 break;
117 default:
118 *error = DCE_NOT_SUPPORTED;
119 break;
120 }
121 }
122 return pBuffer;
123 }
124
125 /**
126 * Collect data for table
127 */
128 static void *GetTableData(DataCollectionTarget *dcTarget, DCTable *table, UINT32 *error)
129 {
130 Table *result = NULL;
131 if (dcTarget->getObjectClass() == OBJECT_CLUSTER)
132 {
133 if (table->isAggregateOnCluster())
134 {
135 *error = ((Cluster *)dcTarget)->collectAggregatedData(table, &result);
136 }
137 else
138 {
139 *error = DCE_IGNORE;
140 }
141 }
142 else
143 {
144 switch(table->getDataSource())
145 {
146 case DS_NATIVE_AGENT:
147 if (dcTarget->getObjectClass() == OBJECT_NODE)
148 {
149 *error = ((Node *)dcTarget)->getTableFromAgent(table->getName(), &result);
150 if ((*error == DCE_SUCCESS) && (result != NULL))
151 table->updateResultColumns(result);
152 }
153 else
154 {
155 *error = DCE_NOT_SUPPORTED;
156 }
157 break;
158 case DS_SNMP_AGENT:
159 if (dcTarget->getObjectClass() == OBJECT_NODE)
160 {
161 *error = ((Node *)dcTarget)->getTableFromSNMP(table->getSnmpPort(), table->getName(), table->getColumns(), &result);
162 if ((*error == DCE_SUCCESS) && (result != NULL))
163 table->updateResultColumns(result);
164 }
165 else
166 {
167 *error = DCE_NOT_SUPPORTED;
168 }
169 break;
170 default:
171 *error = DCE_NOT_SUPPORTED;
172 break;
173 }
174 }
175 return result;
176 }
177
178 /**
179 * Data collector
180 */
181 static THREAD_RESULT THREAD_CALL DataCollector(void *pArg)
182 {
183 UINT32 dwError;
184
185 TCHAR *pBuffer = (TCHAR *)malloc(MAX_LINE_SIZE * sizeof(TCHAR));
186 while(!IsShutdownInProgress())
187 {
188 DCObject *pItem = (DCObject *)g_dataCollectionQueue.getOrBlock();
189 DataCollectionTarget *target = (DataCollectionTarget *)pItem->getTarget();
190
191 if (pItem->isScheduledForDeletion())
192 {
193 DbgPrintf(7, _T("DataCollector(): about to destroy DC object %d \"%s\" owner=%d"),
194 pItem->getId(), pItem->getName(), (target != NULL) ? (int)target->getId() : -1);
195 pItem->deleteFromDatabase();
196 delete pItem;
197 continue;
198 }
199
200 DbgPrintf(8, _T("DataCollector(): processing DC object %d \"%s\" owner=%d proxy=%d"),
201 pItem->getId(), pItem->getName(), (target != NULL) ? (int)target->getId() : -1, pItem->getProxyNode());
202 if (pItem->getProxyNode() != 0)
203 {
204 NetObj *object = FindObjectById(pItem->getProxyNode(), OBJECT_NODE);
205 if (object != NULL)
206 {
207 if (object->isTrustedNode((target != NULL) ? target->getId() : 0))
208 {
209 target = (Node *)object;
210 target->incRefCount();
211 }
212 else
213 {
214 // Change item's status to _T("not supported")
215 pItem->setStatus(ITEM_STATUS_NOT_SUPPORTED, true);
216
217 if (target != NULL)
218 {
219 target->decRefCount();
220 target = NULL;
221 }
222 }
223 }
224 else
225 {
226 if (target != NULL)
227 {
228 target->decRefCount();
229 target = NULL;
230 }
231 }
232 }
233
234 time_t currTime = time(NULL);
235 if (target != NULL)
236 {
237 void *data;
238
239 switch(pItem->getType())
240 {
241 case DCO_TYPE_ITEM:
242 data = GetItemData(target, (DCItem *)pItem, pBuffer, &dwError);
243 break;
244 case DCO_TYPE_TABLE:
245 data = GetTableData(target, (DCTable *)pItem, &dwError);
246 break;
247 default:
248 data = NULL;
249 dwError = DCE_NOT_SUPPORTED;
250 break;
251 }
252
253 // Transform and store received value into database or handle error
254 switch(dwError)
255 {
256 case DCE_SUCCESS:
257 if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
258 pItem->setStatus(ITEM_STATUS_ACTIVE, true);
259 if (!((DataCollectionTarget *)pItem->getTarget())->processNewDCValue(pItem, currTime, data))
260 {
261 // value processing failed, convert to data collection error
262 pItem->processNewError();
263 }
264 break;
265 case DCE_COMM_ERROR:
266 if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
267 pItem->setStatus(ITEM_STATUS_ACTIVE, true);
268 pItem->processNewError();
269 break;
270 case DCE_NOT_SUPPORTED:
271 // Change item's status
272 pItem->setStatus(ITEM_STATUS_NOT_SUPPORTED, true);
273 break;
274 }
275
276 // Decrement node's usage counter
277 target->decRefCount();
278 if ((pItem->getProxyNode() != 0) && (pItem->getTarget() != NULL))
279 {
280 pItem->getTarget()->decRefCount();
281 }
282 }
283 else /* target == NULL */
284 {
285 Template *n = pItem->getTarget();
286 DbgPrintf(3, _T("*** DataCollector: Attempt to collect information for non-existing node (DCI=%d \"%s\" target=%d proxy=%d)"),
287 pItem->getId(), pItem->getName(), (n != NULL) ? (int)n->getId() : -1, pItem->getProxyNode());
288 }
289
290 // Update item's last poll time and clear busy flag so item can be polled again
291 pItem->setLastPollTime(currTime);
292 pItem->setBusyFlag(FALSE);
293 }
294
295 free(pBuffer);
296 DbgPrintf(1, _T("Data collector thread terminated"));
297 return THREAD_OK;
298 }
299
300 /**
301 * Callback for queueing DCIs
302 */
303 static void QueueItems(NetObj *object, void *data)
304 {
305 DbgPrintf(8, _T("ItemPoller: calling DataCollectionTarget::queueItemsForPolling for object %s [%d]"),
306 object->getName(), object->getId());
307 ((DataCollectionTarget *)object)->queueItemsForPolling(&g_dataCollectionQueue);
308 }
309
310 /**
311 * Item poller thread: check nodes' items and put into the
312 * data collector queue when data polling required
313 */
314 static THREAD_RESULT THREAD_CALL ItemPoller(void *pArg)
315 {
316 UINT32 dwSum, dwWatchdogId, currPos = 0;
317 UINT32 dwTimingHistory[60 / ITEM_POLLING_INTERVAL];
318 INT64 qwStart;
319
320 dwWatchdogId = WatchdogAddThread(_T("Item Poller"), 20);
321 memset(dwTimingHistory, 0, sizeof(UINT32) * (60 / ITEM_POLLING_INTERVAL));
322
323 while(!IsShutdownInProgress())
324 {
325 if (SleepAndCheckForShutdown(ITEM_POLLING_INTERVAL))
326 break; // Shutdown has arrived
327 WatchdogNotify(dwWatchdogId);
328 DbgPrintf(8, _T("ItemPoller: wakeup"));
329
330 qwStart = GetCurrentTimeMs();
331 g_idxNodeById.forEach(QueueItems, NULL);
332 g_idxClusterById.forEach(QueueItems, NULL);
333 g_idxMobileDeviceById.forEach(QueueItems, NULL);
334
335 // Save last poll time
336 dwTimingHistory[currPos] = (UINT32)(GetCurrentTimeMs() - qwStart);
337 currPos++;
338 if (currPos == (60 / ITEM_POLLING_INTERVAL))
339 currPos = 0;
340
341 // Calculate new average for last minute
342 dwSum = 0;
343 for(int i = 0; i < (60 / ITEM_POLLING_INTERVAL); i++)
344 dwSum += dwTimingHistory[i];
345 g_dwAvgDCIQueuingTime = dwSum / (60 / ITEM_POLLING_INTERVAL);
346 }
347 DbgPrintf(1, _T("Item poller thread terminated"));
348 return THREAD_OK;
349 }
350
351 /**
352 * Statistics collection thread
353 */
354 static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
355 {
356 UINT32 i, currPos = 0;
357 UINT32 pollerQS[12], dbWriterQS[12];
358 UINT32 iDataWriterQS[12], rawDataWriterQS[12], dbAndIDataWriterQS[12];
359 UINT32 syslogProcessingQS[12], syslogWriterQS[12];
360 double sum1, sum2, sum3, sum4, sum5, sum8, sum9;
361
362 memset(pollerQS, 0, sizeof(UINT32) * 12);
363 memset(dbWriterQS, 0, sizeof(UINT32) * 12);
364 memset(iDataWriterQS, 0, sizeof(UINT32) * 12);
365 memset(rawDataWriterQS, 0, sizeof(UINT32) * 12);
366 memset(dbAndIDataWriterQS, 0, sizeof(UINT32) * 12);
367 memset(syslogProcessingQS, 0, sizeof(UINT32) * 12);
368 memset(syslogWriterQS, 0, sizeof(UINT32) * 12);
369 g_dAvgPollerQueueSize = 0;
370 g_dAvgDBWriterQueueSize = 0;
371 g_dAvgIDataWriterQueueSize = 0;
372 g_dAvgRawDataWriterQueueSize = 0;
373 g_dAvgDBAndIDataWriterQueueSize = 0;
374 g_dAvgSyslogProcessingQueueSize = 0;
375 g_dAvgSyslogWriterQueueSize = 0;
376 while(!IsShutdownInProgress())
377 {
378 if (SleepAndCheckForShutdown(5))
379 break; // Shutdown has arrived
380
381 // Get current values
382 pollerQS[currPos] = g_dataCollectionQueue.size();
383 dbWriterQS[currPos] = g_dbWriterQueue->size();
384 iDataWriterQS[currPos] = g_dciDataWriterQueue->size();
385 rawDataWriterQS[currPos] = g_dciRawDataWriterQueue->size();
386 dbAndIDataWriterQS[currPos] = g_dbWriterQueue->size() + g_dciDataWriterQueue->size() + g_dciRawDataWriterQueue->size();
387 syslogProcessingQS[currPos] = g_syslogProcessingQueue.size();
388 syslogWriterQS[currPos] = g_syslogWriteQueue.size();
389 currPos++;
390 if (currPos == 12)
391 currPos = 0;
392
393 // Calculate new averages
394 for(i = 0, sum1 = 0, sum2 = 0, sum3 = 0, sum4 = 0, sum5 = 0, sum8 = 0, sum9 = 0; i < 12; i++)
395 {
396 sum1 += pollerQS[i];
397 sum2 += dbWriterQS[i];
398 sum3 += iDataWriterQS[i];
399 sum4 += rawDataWriterQS[i];
400 sum5 += dbAndIDataWriterQS[i];
401 sum8 += syslogProcessingQS[i];
402 sum9 += syslogWriterQS[i];
403 }
404 g_dAvgPollerQueueSize = sum1 / 12;
405 g_dAvgDBWriterQueueSize = sum2 / 12;
406 g_dAvgIDataWriterQueueSize = sum3 / 12;
407 g_dAvgRawDataWriterQueueSize = sum4 / 12;
408 g_dAvgDBAndIDataWriterQueueSize = sum5 / 12;
409 g_dAvgSyslogProcessingQueueSize = sum8 / 12;
410 g_dAvgSyslogWriterQueueSize = sum9 / 12;
411 }
412 return THREAD_OK;
413 }
414
415 /**
416 * DCI cache loader
417 */
418 THREAD_RESULT THREAD_CALL CacheLoader(void *arg)
419 {
420 DbgPrintf(2, _T("DCI cache loader thread started"));
421 while(true)
422 {
423 DCItem *dci = (DCItem *)g_dciCacheLoaderQueue.getOrBlock();
424 if (dci == INVALID_POINTER_VALUE)
425 break;
426
427 DbgPrintf(6, _T("Loading cache for DCI %s [%d] on %s [%d]"),
428 dci->getName(), dci->getId(), dci->getNode()->getName(), dci->getNode()->getId());
429 dci->reloadCache();
430 dci->getNode()->decRefCount();
431 }
432 DbgPrintf(2, _T("DCI cache loader thread stopped"));
433 return THREAD_OK;
434 }
435
436 /**
437 * Initialize data collection subsystem
438 */
439 BOOL InitDataCollector()
440 {
441 int i, iNumCollectors;
442
443 // Start data collection threads
444 iNumCollectors = ConfigReadInt(_T("NumberOfDataCollectors"), 10);
445 for(i = 0; i < iNumCollectors; i++)
446 ThreadCreate(DataCollector, 0, NULL);
447
448 ThreadCreate(ItemPoller, 0, NULL);
449 ThreadCreate(StatCollector, 0, NULL);
450 ThreadCreate(CacheLoader, 0, NULL);
451
452 return TRUE;
453 }
454
455 /**
456 * Update parameter list from node
457 */
458 static void UpdateParamList(NetObj *object, void *data)
459 {
460 ObjectArray<AgentParameterDefinition> *fullList = (ObjectArray<AgentParameterDefinition> *)data;
461
462 ObjectArray<AgentParameterDefinition> *paramList;
463 ((Node *)object)->openParamList(&paramList);
464 if ((paramList != NULL) && (paramList->size() > 0))
465 {
466 for(int i = 0; i < paramList->size(); i++)
467 {
468 int j;
469 for(j = 0; j < fullList->size(); j++)
470 {
471 if (!_tcsicmp(paramList->get(i)->getName(), fullList->get(j)->getName()))
472 break;
473 }
474
475 if (j == fullList->size())
476 {
477 fullList->add(new AgentParameterDefinition(paramList->get(i)));
478 }
479 }
480 }
481 ((Node *)object)->closeParamList();
482 }
483
484 /**
485 * Update table list from node
486 */
487 static void UpdateTableList(NetObj *object, void *data)
488 {
489 ObjectArray<AgentTableDefinition> *fullList = (ObjectArray<AgentTableDefinition> *)data;
490
491 ObjectArray<AgentTableDefinition> *tableList;
492 ((Node *)object)->openTableList(&tableList);
493 if ((tableList != NULL) && (tableList->size() > 0))
494 {
495 for(int i = 0; i < tableList->size(); i++)
496 {
497 int j;
498 for(j = 0; j < fullList->size(); j++)
499 {
500 if (!_tcsicmp(tableList->get(i)->getName(), fullList->get(j)->getName()))
501 break;
502 }
503
504 if (j == fullList->size())
505 {
506 fullList->add(new AgentTableDefinition(tableList->get(i)));
507 }
508 }
509 }
510 ((Node *)object)->closeTableList();
511 }
512
513 /**
514 * Write full (from all nodes) agent parameters list to NXCP message
515 */
516 void WriteFullParamListToMessage(NXCPMessage *pMsg, WORD flags)
517 {
518 // Gather full parameter list
519 if (flags & 0x01)
520 {
521 ObjectArray<AgentParameterDefinition> fullList(64, 64, true);
522 g_idxNodeById.forEach(UpdateParamList, &fullList);
523
524 // Put list into the message
525 pMsg->setField(VID_NUM_PARAMETERS, (UINT32)fullList.size());
526 UINT32 varId = VID_PARAM_LIST_BASE;
527 for(int i = 0; i < fullList.size(); i++)
528 {
529 varId += fullList.get(i)->fillMessage(pMsg, varId);
530 }
531 }
532
533 // Gather full table list
534 if (flags & 0x02)
535 {
536 ObjectArray<AgentTableDefinition> fullList(64, 64, true);
537 g_idxNodeById.forEach(UpdateTableList, &fullList);
538
539 // Put list into the message
540 pMsg->setField(VID_NUM_TABLES, (UINT32)fullList.size());
541 UINT32 varId = VID_TABLE_LIST_BASE;
542 for(int i = 0; i < fullList.size(); i++)
543 {
544 varId += fullList.get(i)->fillMessage(pMsg, varId);
545 }
546 }
547 }
548
549 /**
550 * Get type of data collection object
551 */
552 int GetDCObjectType(UINT32 nodeId, UINT32 dciId)
553 {
554 Node *node = (Node *)FindObjectById(nodeId, OBJECT_NODE);
555 if (node != NULL)
556 {
557 DCObject *dco = node->getDCObjectById(dciId);
558 if (dco != NULL)
559 {
560 return dco->getType();
561 }
562 }
563 return DCO_TYPE_ITEM; // default
564 }