Commit | Line | Data |
---|---|---|
6da535d9 | 1 | /* |
5039dede | 2 | ** NetXMS - Network Management System |
2885ad2e | 3 | ** Copyright (C) 2003-2017 Victor Kirhenshtein |
5039dede AK |
4 | ** |
5 | ** This program is free software; you can redistribute it and/or modify | |
6 | ** it under the terms of the GNU General Public License as published by | |
7 | ** the Free Software Foundation; either version 2 of the License, or | |
8 | ** (at your option) any later version. | |
9 | ** | |
10 | ** This program is distributed in the hope that it will be useful, | |
11 | ** but WITHOUT ANY WARRANTY; without even the implied warranty of | |
12 | ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
13 | ** GNU General Public License for more details. | |
14 | ** | |
15 | ** You should have received a copy of the GNU General Public License | |
16 | ** along with this program; if not, write to the Free Software | |
17 | ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. | |
18 | ** | |
19 | ** File: datacoll.cpp | |
20 | ** | |
21 | **/ | |
22 | ||
23 | #include "nxcore.h" | |
24 | ||
46ee6286 VK |
25 | /** |
26 | * Interval between DCI polling | |
27 | */ | |
6da535d9 | 28 | #define ITEM_POLLING_INTERVAL 1 |
5039dede | 29 | |
46ee6286 VK |
30 | /** |
31 | * Externals | |
32 | */ | |
f1784ab6 VK |
33 | extern Queue g_syslogProcessingQueue; |
34 | extern Queue g_syslogWriteQueue; | |
b239e165 VK |
35 | extern ThreadPool *g_pollerThreadPool; |
36 | ||
37 | /** | |
38 | * Thread pool for data collectors | |
39 | */ | |
40 | ThreadPool *g_dataCollectorThreadPool = NULL; | |
5039dede | 41 | |
6fd6de0a VK |
42 | /** |
43 | * Global data | |
44 | */ | |
b239e165 | 45 | double g_dAvgDataCollectorQueueSize = 0; |
5039dede AK |
46 | double g_dAvgPollerQueueSize = 0; |
47 | double g_dAvgDBWriterQueueSize = 0; | |
cf084617 | 48 | double g_dAvgIDataWriterQueueSize = 0; |
76fcb995 | 49 | double g_dAvgRawDataWriterQueueSize = 0; |
cf084617 | 50 | double g_dAvgDBAndIDataWriterQueueSize = 0; |
f1784ab6 VK |
51 | double g_dAvgSyslogProcessingQueueSize = 0; |
52 | double g_dAvgSyslogWriterQueueSize = 0; | |
967893bb | 53 | UINT32 g_dwAvgDCIQueuingTime = 0; |
d140955e | 54 | Queue g_dciCacheLoaderQueue; |
5039dede | 55 | |
171c2fd6 VK |
56 | /** |
57 | * Collect data for DCI | |
58 | */ | |
967893bb | 59 | static void *GetItemData(DataCollectionTarget *dcTarget, DCItem *pItem, TCHAR *pBuffer, UINT32 *error) |
df94e0ce | 60 | { |
c42b4551 | 61 | if (dcTarget->getObjectClass() == OBJECT_CLUSTER) |
df94e0ce | 62 | { |
85ae39bc VK |
63 | if (pItem->isAggregateOnCluster()) |
64 | { | |
65 | *error = ((Cluster *)dcTarget)->collectAggregatedData(pItem, pBuffer); | |
66 | } | |
67 | else | |
68 | { | |
3cd6f508 | 69 | *error = DCE_IGNORE; |
85ae39bc VK |
70 | } |
71 | } | |
72 | else | |
73 | { | |
74 | switch(pItem->getDataSource()) | |
75 | { | |
76 | case DS_INTERNAL: // Server internal parameters (like status) | |
77 | *error = dcTarget->getInternalItem(pItem->getName(), MAX_LINE_SIZE, pBuffer); | |
78 | break; | |
79 | case DS_SNMP_AGENT: | |
c42b4551 | 80 | if (dcTarget->getObjectClass() == OBJECT_NODE) |
6da535d9 | 81 | *error = ((Node *)dcTarget)->getItemFromSNMP(pItem->getSnmpPort(), pItem->getName(), MAX_LINE_SIZE, |
85ae39bc VK |
82 | pBuffer, pItem->isInterpretSnmpRawValue() ? (int)pItem->getSnmpRawValueType() : SNMP_RAWTYPE_NONE); |
83 | else | |
84 | *error = DCE_NOT_SUPPORTED; | |
85 | break; | |
86 | case DS_CHECKPOINT_AGENT: | |
c42b4551 | 87 | if (dcTarget->getObjectClass() == OBJECT_NODE) |
85ae39bc VK |
88 | *error = ((Node *)dcTarget)->getItemFromCheckPointSNMP(pItem->getName(), MAX_LINE_SIZE, pBuffer); |
89 | else | |
90 | *error = DCE_NOT_SUPPORTED; | |
91 | break; | |
92 | case DS_NATIVE_AGENT: | |
c42b4551 | 93 | if (dcTarget->getObjectClass() == OBJECT_NODE) |
85ae39bc | 94 | *error = ((Node *)dcTarget)->getItemFromAgent(pItem->getName(), MAX_LINE_SIZE, pBuffer); |
ce9e00cc VK |
95 | else if (dcTarget->getObjectClass() == OBJECT_SENSOR) |
96 | *error = ((Sensor *)dcTarget)->getItemFromAgent(pItem->getName(), MAX_LINE_SIZE, pBuffer); | |
85ae39bc VK |
97 | else |
98 | *error = DCE_NOT_SUPPORTED; | |
99 | break; | |
100 | case DS_WINPERF: | |
c42b4551 | 101 | if (dcTarget->getObjectClass() == OBJECT_NODE) |
85ae39bc VK |
102 | { |
103 | TCHAR name[MAX_PARAM_NAME]; | |
1d919454 | 104 | _sntprintf(name, MAX_PARAM_NAME, _T("PDH.CounterValue(\"%s\",%d)"), (const TCHAR *)EscapeStringForAgent(pItem->getName()), pItem->getSampleCount()); |
85ae39bc VK |
105 | *error = ((Node *)dcTarget)->getItemFromAgent(name, MAX_LINE_SIZE, pBuffer); |
106 | } | |
107 | else | |
108 | { | |
109 | *error = DCE_NOT_SUPPORTED; | |
110 | } | |
111 | break; | |
241541f4 VK |
112 | case DS_SSH: |
113 | if (dcTarget->getObjectClass() == OBJECT_NODE) | |
114 | { | |
115 | UINT32 proxyId = ((Node *)dcTarget)->getSshProxy(); | |
116 | if (proxyId == 0) | |
117 | { | |
118 | if (IsZoningEnabled()) | |
119 | { | |
a191c634 | 120 | Zone *zone = FindZoneByUIN(((Node *)dcTarget)->getZoneUIN()); |
43b62436 VK |
121 | if ((zone != NULL) && (zone->getProxyNodeId() != 0)) |
122 | proxyId = zone->getProxyNodeId(); | |
241541f4 VK |
123 | else |
124 | proxyId = g_dwMgmtNode; | |
125 | } | |
126 | else | |
127 | { | |
128 | proxyId = g_dwMgmtNode; | |
129 | } | |
130 | } | |
131 | Node *proxy = (Node *)FindObjectById(proxyId, OBJECT_NODE); | |
132 | if (proxy != NULL) | |
133 | { | |
134 | TCHAR name[MAX_PARAM_NAME], ipAddr[64]; | |
135 | _sntprintf(name, MAX_PARAM_NAME, _T("SSH.Command(%s,\"%s\",\"%s\",\"%s\")"), | |
136 | ((Node *)dcTarget)->getIpAddress().toString(ipAddr), | |
137 | (const TCHAR *)EscapeStringForAgent(((Node *)dcTarget)->getSshLogin()), | |
138 | (const TCHAR *)EscapeStringForAgent(((Node *)dcTarget)->getSshPassword()), | |
139 | (const TCHAR *)EscapeStringForAgent(pItem->getName())); | |
140 | *error = proxy->getItemFromAgent(name, MAX_LINE_SIZE, pBuffer); | |
141 | } | |
142 | else | |
143 | { | |
144 | *error = DCE_COMM_ERROR; | |
145 | } | |
146 | } | |
147 | else | |
148 | { | |
149 | *error = DCE_NOT_SUPPORTED; | |
150 | } | |
151 | break; | |
1d0d82b3 | 152 | case DS_SMCLP: |
c42b4551 | 153 | if (dcTarget->getObjectClass() == OBJECT_NODE) |
03bc96df | 154 | { |
1d0d82b3 | 155 | *error = ((Node *)dcTarget)->getItemFromSMCLP(pItem->getName(), MAX_LINE_SIZE, pBuffer); |
03bc96df | 156 | } |
85ae39bc VK |
157 | else |
158 | { | |
159 | *error = DCE_NOT_SUPPORTED; | |
160 | } | |
17b1ab4a VK |
161 | break; |
162 | case DS_SCRIPT: | |
7a69b18b | 163 | *error = dcTarget->getScriptItem(pItem->getName(), MAX_LINE_SIZE, pBuffer, (DataCollectionTarget *)pItem->getOwner()); |
85ae39bc VK |
164 | break; |
165 | default: | |
166 | *error = DCE_NOT_SUPPORTED; | |
167 | break; | |
168 | } | |
df94e0ce VK |
169 | } |
170 | return pBuffer; | |
171 | } | |
172 | ||
171c2fd6 VK |
173 | /** |
174 | * Collect data for table | |
175 | */ | |
967893bb | 176 | static void *GetTableData(DataCollectionTarget *dcTarget, DCTable *table, UINT32 *error) |
df94e0ce VK |
177 | { |
178 | Table *result = NULL; | |
c42b4551 | 179 | if (dcTarget->getObjectClass() == OBJECT_CLUSTER) |
a0ddfb29 VK |
180 | { |
181 | if (table->isAggregateOnCluster()) | |
182 | { | |
183 | *error = ((Cluster *)dcTarget)->collectAggregatedData(table, &result); | |
184 | } | |
185 | else | |
186 | { | |
3cd6f508 | 187 | *error = DCE_IGNORE; |
a0ddfb29 VK |
188 | } |
189 | } | |
190 | else | |
df94e0ce | 191 | { |
a0ddfb29 VK |
192 | switch(table->getDataSource()) |
193 | { | |
194 | case DS_NATIVE_AGENT: | |
c42b4551 | 195 | if (dcTarget->getObjectClass() == OBJECT_NODE) |
a0ddfb29 VK |
196 | { |
197 | *error = ((Node *)dcTarget)->getTableFromAgent(table->getName(), &result); | |
198 | if ((*error == DCE_SUCCESS) && (result != NULL)) | |
199 | table->updateResultColumns(result); | |
200 | } | |
201 | else | |
202 | { | |
203 | *error = DCE_NOT_SUPPORTED; | |
204 | } | |
205 | break; | |
db117859 | 206 | case DS_SNMP_AGENT: |
c42b4551 | 207 | if (dcTarget->getObjectClass() == OBJECT_NODE) |
db117859 VK |
208 | { |
209 | *error = ((Node *)dcTarget)->getTableFromSNMP(table->getSnmpPort(), table->getName(), table->getColumns(), &result); | |
210 | if ((*error == DCE_SUCCESS) && (result != NULL)) | |
211 | table->updateResultColumns(result); | |
212 | } | |
213 | else | |
214 | { | |
215 | *error = DCE_NOT_SUPPORTED; | |
216 | } | |
3f61dbd4 VK |
217 | break; |
218 | case DS_SCRIPT: | |
7a69b18b | 219 | *error = dcTarget->getScriptTable(table->getName(), &result, (DataCollectionTarget *)table->getOwner()); |
db117859 | 220 | break; |
a0ddfb29 VK |
221 | default: |
222 | *error = DCE_NOT_SUPPORTED; | |
223 | break; | |
224 | } | |
225 | } | |
df94e0ce VK |
226 | return result; |
227 | } | |
228 | ||
6fd6de0a VK |
229 | /** |
230 | * Data collector | |
231 | */ | |
b239e165 | 232 | void DataCollector(void *arg) |
5039dede | 233 | { |
b239e165 VK |
234 | DCObject *pItem = static_cast<DCObject*>(arg); |
235 | DataCollectionTarget *target = static_cast<DataCollectionTarget*>(pItem->getOwner()); | |
5039dede | 236 | |
b239e165 | 237 | if (pItem->isScheduledForDeletion()) |
5039dede | 238 | { |
b239e165 VK |
239 | nxlog_debug(7, _T("DataCollector(): about to destroy DC object %d \"%s\" owner=%d"), |
240 | pItem->getId(), pItem->getName(), (target != NULL) ? (int)target->getId() : -1); | |
241 | pItem->deleteFromDatabase(); | |
242 | delete pItem; | |
243 | target->decRefCount(); | |
244 | return; | |
245 | } | |
ced80327 | 246 | |
b239e165 VK |
247 | if (target == NULL) |
248 | { | |
249 | nxlog_debug(3, _T("DataCollector: attempt to collect information for non-existing node (DCI=%d \"%s\")"), | |
250 | pItem->getId(), pItem->getName()); | |
46e2b370 | 251 | |
b239e165 VK |
252 | // Update item's last poll time and clear busy flag so item can be polled again |
253 | pItem->setLastPollTime(time(NULL)); | |
254 | pItem->clearBusyFlag(); | |
255 | return; | |
256 | } | |
46e2b370 | 257 | |
b239e165 VK |
258 | DbgPrintf(8, _T("DataCollector(): processing DC object %d \"%s\" owner=%d sourceNode=%d"), |
259 | pItem->getId(), pItem->getName(), (target != NULL) ? (int)target->getId() : -1, pItem->getSourceNode()); | |
260 | UINT32 sourceNodeId = target->getEffectiveSourceNode(pItem); | |
261 | if (sourceNodeId != 0) | |
262 | { | |
263 | Node *sourceNode = (Node *)FindObjectById(sourceNodeId, OBJECT_NODE); | |
264 | if (sourceNode != NULL) | |
265 | { | |
266 | if (((target->getObjectClass() == OBJECT_CHASSIS) && (((Chassis *)target)->getControllerId() == sourceNodeId)) || | |
267 | sourceNode->isTrustedNode(target->getId())) | |
268 | { | |
269 | target = sourceNode; | |
270 | target->incRefCount(); | |
271 | } | |
272 | else | |
273 | { | |
274 | // Change item's status to "not supported" | |
275 | pItem->setStatus(ITEM_STATUS_NOT_SUPPORTED, true); | |
46e2b370 VK |
276 | target->decRefCount(); |
277 | target = NULL; | |
b239e165 VK |
278 | } |
279 | } | |
280 | else | |
281 | { | |
282 | target->decRefCount(); | |
283 | target = NULL; | |
284 | } | |
285 | } | |
5039dede | 286 | |
b239e165 VK |
287 | time_t currTime = time(NULL); |
288 | if (target != NULL) | |
289 | { | |
290 | if (!IsShutdownInProgress()) | |
5039dede | 291 | { |
b239e165 VK |
292 | void *data; |
293 | TCHAR buffer[MAX_LINE_SIZE]; | |
294 | UINT32 error; | |
295 | switch(pItem->getType()) | |
5039dede | 296 | { |
b239e165 VK |
297 | case DCO_TYPE_ITEM: |
298 | data = GetItemData(target, (DCItem *)pItem, buffer, &error); | |
299 | break; | |
300 | case DCO_TYPE_TABLE: | |
301 | data = GetTableData(target, (DCTable *)pItem, &error); | |
302 | break; | |
303 | default: | |
304 | data = NULL; | |
305 | error = DCE_NOT_SUPPORTED; | |
306 | break; | |
307 | } | |
88dc9091 | 308 | |
b239e165 VK |
309 | // Transform and store received value into database or handle error |
310 | switch(error) | |
311 | { | |
312 | case DCE_SUCCESS: | |
313 | if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED) | |
314 | pItem->setStatus(ITEM_STATUS_ACTIVE, true); | |
315 | if (!((DataCollectionTarget *)pItem->getOwner())->processNewDCValue(pItem, currTime, data)) | |
316 | { | |
317 | // value processing failed, convert to data collection error | |
a8164c20 | 318 | pItem->processNewError(false); |
b239e165 VK |
319 | } |
320 | break; | |
321 | case DCE_COLLECTION_ERROR: | |
322 | if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED) | |
323 | pItem->setStatus(ITEM_STATUS_ACTIVE, true); | |
324 | pItem->processNewError(false); | |
325 | break; | |
326 | case DCE_NO_SUCH_INSTANCE: | |
327 | if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED) | |
328 | pItem->setStatus(ITEM_STATUS_ACTIVE, true); | |
329 | pItem->processNewError(true); | |
330 | break; | |
331 | case DCE_COMM_ERROR: | |
332 | pItem->processNewError(false); | |
333 | break; | |
334 | case DCE_NOT_SUPPORTED: | |
335 | // Change item's status | |
336 | pItem->setStatus(ITEM_STATUS_NOT_SUPPORTED, true); | |
337 | break; | |
5039dede AK |
338 | } |
339 | ||
6da535d9 EJ |
340 | // Send session notification when force poll is performed |
341 | if (pItem->getPollingSession() != NULL) | |
342 | { | |
343 | ClientSession *session = pItem->processForcePoll(); | |
690f7fa5 | 344 | session->notify(NX_NOTIFY_FORCE_DCI_POLL, pItem->getOwnerId()); |
6da535d9 EJ |
345 | session->decRefCount(); |
346 | } | |
5039dede | 347 | } |
b239e165 VK |
348 | |
349 | // Decrement node's usage counter | |
350 | target->decRefCount(); | |
351 | if ((pItem->getSourceNode() != 0) && (pItem->getOwner() != NULL)) | |
5039dede | 352 | { |
b239e165 | 353 | pItem->getOwner()->decRefCount(); |
5039dede | 354 | } |
b239e165 VK |
355 | } |
356 | else /* target == NULL */ | |
357 | { | |
358 | Template *n = pItem->getOwner(); | |
359 | nxlog_debug(5, _T("DataCollector: attempt to collect information for non-existing or inaccessible node (DCI=%d \"%s\" target=%d sourceNode=%d)"), | |
360 | pItem->getId(), pItem->getName(), (n != NULL) ? (int)n->getId() : -1, sourceNodeId); | |
5039dede AK |
361 | } |
362 | ||
b239e165 VK |
363 | // Update item's last poll time and clear busy flag so item can be polled again |
364 | pItem->setLastPollTime(currTime); | |
365 | pItem->clearBusyFlag(); | |
5039dede AK |
366 | } |
367 | ||
6fd6de0a VK |
368 | /** |
369 | * Callback for queueing DCIs | |
370 | */ | |
6aba3998 VK |
371 | static void QueueItems(NetObj *object, void *data) |
372 | { | |
88dc9091 VK |
373 | if (IsShutdownInProgress()) |
374 | return; | |
375 | ||
fa179b75 | 376 | WatchdogNotify(*((UINT32 *)data)); |
2885ad2e VK |
377 | nxlog_debug(8, _T("ItemPoller: calling DataCollectionTarget::queueItemsForPolling for object %s [%d]"), |
378 | object->getName(), object->getId()); | |
b239e165 | 379 | ((DataCollectionTarget *)object)->queueItemsForPolling(); |
6aba3998 VK |
380 | } |
381 | ||
6fd6de0a | 382 | /** |
6da535d9 | 383 | * Item poller thread: check nodes' items and put into the |
6fd6de0a VK |
384 | * data collector queue when data polling required |
385 | */ | |
5039dede AK |
386 | static THREAD_RESULT THREAD_CALL ItemPoller(void *pArg) |
387 | { | |
930a2a62 VK |
388 | ThreadSetName("ItemPoller"); |
389 | ||
3c15eedb | 390 | UINT32 dwSum, currPos = 0; |
967893bb | 391 | UINT32 dwTimingHistory[60 / ITEM_POLLING_INTERVAL]; |
5039dede AK |
392 | INT64 qwStart; |
393 | ||
fa179b75 | 394 | UINT32 watchdogId = WatchdogAddThread(_T("Item Poller"), 10); |
967893bb | 395 | memset(dwTimingHistory, 0, sizeof(UINT32) * (60 / ITEM_POLLING_INTERVAL)); |
5039dede | 396 | |
89135050 | 397 | while(!IsShutdownInProgress()) |
5039dede AK |
398 | { |
399 | if (SleepAndCheckForShutdown(ITEM_POLLING_INTERVAL)) | |
400 | break; // Shutdown has arrived | |
3c15eedb | 401 | WatchdogNotify(watchdogId); |
5039dede AK |
402 | DbgPrintf(8, _T("ItemPoller: wakeup")); |
403 | ||
5039dede | 404 | qwStart = GetCurrentTimeMs(); |
fa179b75 VK |
405 | g_idxNodeById.forEach(QueueItems, &watchdogId); |
406 | g_idxClusterById.forEach(QueueItems, &watchdogId); | |
407 | g_idxMobileDeviceById.forEach(QueueItems, &watchdogId); | |
408 | g_idxChassisById.forEach(QueueItems, &watchdogId); | |
ce9e00cc | 409 | g_idxSensorById.forEach(QueueItems, &watchdogId); |
5039dede AK |
410 | |
411 | // Save last poll time | |
f1784ab6 VK |
412 | dwTimingHistory[currPos] = (UINT32)(GetCurrentTimeMs() - qwStart); |
413 | currPos++; | |
414 | if (currPos == (60 / ITEM_POLLING_INTERVAL)) | |
415 | currPos = 0; | |
5039dede AK |
416 | |
417 | // Calculate new average for last minute | |
6aba3998 VK |
418 | dwSum = 0; |
419 | for(int i = 0; i < (60 / ITEM_POLLING_INTERVAL); i++) | |
5039dede AK |
420 | dwSum += dwTimingHistory[i]; |
421 | g_dwAvgDCIQueuingTime = dwSum / (60 / ITEM_POLLING_INTERVAL); | |
422 | } | |
35f836fe | 423 | DbgPrintf(1, _T("Item poller thread terminated")); |
5039dede AK |
424 | return THREAD_OK; |
425 | } | |
426 | ||
6fd6de0a VK |
427 | /** |
428 | * Statistics collection thread | |
429 | */ | |
5039dede AK |
430 | static THREAD_RESULT THREAD_CALL StatCollector(void *pArg) |
431 | { | |
930a2a62 VK |
432 | ThreadSetName("StatCollector"); |
433 | ||
f1784ab6 | 434 | UINT32 i, currPos = 0; |
b239e165 | 435 | UINT32 pollerQS[12], dataCollectorQS[12], dbWriterQS[12]; |
76fcb995 | 436 | UINT32 iDataWriterQS[12], rawDataWriterQS[12], dbAndIDataWriterQS[12]; |
f1784ab6 | 437 | UINT32 syslogProcessingQS[12], syslogWriterQS[12]; |
b239e165 | 438 | double sum1, sum2, sum3, sum4, sum5, sum8, sum9, sum10; |
5039dede | 439 | |
f1784ab6 | 440 | memset(pollerQS, 0, sizeof(UINT32) * 12); |
b239e165 | 441 | memset(dataCollectorQS, 0, sizeof(UINT32) * 12); |
f1784ab6 VK |
442 | memset(dbWriterQS, 0, sizeof(UINT32) * 12); |
443 | memset(iDataWriterQS, 0, sizeof(UINT32) * 12); | |
76fcb995 | 444 | memset(rawDataWriterQS, 0, sizeof(UINT32) * 12); |
f1784ab6 | 445 | memset(dbAndIDataWriterQS, 0, sizeof(UINT32) * 12); |
f1784ab6 VK |
446 | memset(syslogProcessingQS, 0, sizeof(UINT32) * 12); |
447 | memset(syslogWriterQS, 0, sizeof(UINT32) * 12); | |
b239e165 | 448 | g_dAvgDataCollectorQueueSize = 0; |
5039dede | 449 | g_dAvgDBWriterQueueSize = 0; |
cf084617 | 450 | g_dAvgIDataWriterQueueSize = 0; |
76fcb995 | 451 | g_dAvgRawDataWriterQueueSize = 0; |
cf084617 | 452 | g_dAvgDBAndIDataWriterQueueSize = 0; |
f1784ab6 VK |
453 | g_dAvgSyslogProcessingQueueSize = 0; |
454 | g_dAvgSyslogWriterQueueSize = 0; | |
b239e165 | 455 | g_dAvgPollerQueueSize = 0; |
89135050 | 456 | while(!IsShutdownInProgress()) |
5039dede AK |
457 | { |
458 | if (SleepAndCheckForShutdown(5)) | |
459 | break; // Shutdown has arrived | |
460 | ||
461 | // Get current values | |
b239e165 VK |
462 | ThreadPoolInfo poolInfo; |
463 | ThreadPoolGetInfo(g_dataCollectorThreadPool, &poolInfo); | |
464 | dataCollectorQS[currPos] = (poolInfo.activeRequests > poolInfo.curThreads) ? poolInfo.activeRequests - poolInfo.curThreads : 0; | |
465 | ||
466 | ThreadPoolGetInfo(g_pollerThreadPool, &poolInfo); | |
467 | pollerQS[currPos] = (poolInfo.activeRequests > poolInfo.curThreads) ? poolInfo.activeRequests - poolInfo.curThreads : 0; | |
468 | ||
19dbc8ef VK |
469 | dbWriterQS[currPos] = g_dbWriterQueue->size(); |
470 | iDataWriterQS[currPos] = g_dciDataWriterQueue->size(); | |
471 | rawDataWriterQS[currPos] = g_dciRawDataWriterQueue->size(); | |
472 | dbAndIDataWriterQS[currPos] = g_dbWriterQueue->size() + g_dciDataWriterQueue->size() + g_dciRawDataWriterQueue->size(); | |
19dbc8ef VK |
473 | syslogProcessingQS[currPos] = g_syslogProcessingQueue.size(); |
474 | syslogWriterQS[currPos] = g_syslogWriteQueue.size(); | |
f1784ab6 VK |
475 | currPos++; |
476 | if (currPos == 12) | |
477 | currPos = 0; | |
5039dede AK |
478 | |
479 | // Calculate new averages | |
b239e165 | 480 | for(i = 0, sum1 = 0, sum2 = 0, sum3 = 0, sum4 = 0, sum5 = 0, sum8 = 0, sum9 = 0, sum10 = 0; i < 12; i++) |
5039dede | 481 | { |
b239e165 | 482 | sum1 += dataCollectorQS[i]; |
f1784ab6 VK |
483 | sum2 += dbWriterQS[i]; |
484 | sum3 += iDataWriterQS[i]; | |
76fcb995 VK |
485 | sum4 += rawDataWriterQS[i]; |
486 | sum5 += dbAndIDataWriterQS[i]; | |
76fcb995 VK |
487 | sum8 += syslogProcessingQS[i]; |
488 | sum9 += syslogWriterQS[i]; | |
b239e165 | 489 | sum10 += pollerQS[i]; |
5039dede | 490 | } |
b239e165 | 491 | g_dAvgDataCollectorQueueSize = sum1 / 12; |
f1784ab6 VK |
492 | g_dAvgDBWriterQueueSize = sum2 / 12; |
493 | g_dAvgIDataWriterQueueSize = sum3 / 12; | |
76fcb995 VK |
494 | g_dAvgRawDataWriterQueueSize = sum4 / 12; |
495 | g_dAvgDBAndIDataWriterQueueSize = sum5 / 12; | |
76fcb995 VK |
496 | g_dAvgSyslogProcessingQueueSize = sum8 / 12; |
497 | g_dAvgSyslogWriterQueueSize = sum9 / 12; | |
b239e165 | 498 | g_dAvgPollerQueueSize = sum10 / 12; |
5039dede AK |
499 | } |
500 | return THREAD_OK; | |
501 | } | |
502 | ||
d140955e VK |
503 | /** |
504 | * DCI cache loader | |
505 | */ | |
506 | THREAD_RESULT THREAD_CALL CacheLoader(void *arg) | |
507 | { | |
930a2a62 | 508 | ThreadSetName("CacheLoader"); |
d140955e VK |
509 | DbgPrintf(2, _T("DCI cache loader thread started")); |
510 | while(true) | |
511 | { | |
5eecfc25 VK |
512 | DCObjectInfo *ref = (DCObjectInfo *)g_dciCacheLoaderQueue.getOrBlock(); |
513 | if (ref == INVALID_POINTER_VALUE) | |
d140955e VK |
514 | break; |
515 | ||
5eecfc25 VK |
516 | NetObj *object = FindObjectById(ref->getOwnerId()); |
517 | if ((object != NULL) && object->isDataCollectionTarget()) | |
518 | { | |
519 | object->incRefCount(); | |
520 | DCObject *dci = static_cast<DataCollectionTarget*>(object)->getDCObjectById(ref->getId(), true); | |
17dec829 | 521 | if ((dci != NULL) && (dci->getType() == DCO_TYPE_ITEM)) |
5eecfc25 VK |
522 | { |
523 | DbgPrintf(6, _T("Loading cache for DCI %s [%d] on %s [%d]"), | |
524 | ref->getName(), ref->getId(), object->getName(), object->getId()); | |
525 | static_cast<DCItem*>(dci)->reloadCache(); | |
526 | } | |
527 | object->decRefCount(); | |
528 | } | |
3629bf1c | 529 | delete ref; |
d140955e VK |
530 | } |
531 | DbgPrintf(2, _T("DCI cache loader thread stopped")); | |
532 | return THREAD_OK; | |
533 | } | |
534 | ||
6fd6de0a VK |
535 | /** |
536 | * Initialize data collection subsystem | |
537 | */ | |
b239e165 | 538 | void InitDataCollector() |
5039dede AK |
539 | { |
540 | int i, iNumCollectors; | |
541 | ||
ad221861 VK |
542 | g_dataCollectorThreadPool = ThreadPoolCreate( |
543 | ConfigReadInt(_T("DataCollector.ThreadPool.BaseSize"), 10), | |
544 | ConfigReadInt(_T("DataCollector.ThreadPool.MaxSize"), 250), | |
545 | _T("DATACOLL")); | |
5039dede | 546 | |
5039dede | 547 | ThreadCreate(ItemPoller, 0, NULL); |
5039dede | 548 | ThreadCreate(StatCollector, 0, NULL); |
d140955e | 549 | ThreadCreate(CacheLoader, 0, NULL); |
5039dede AK |
550 | } |
551 | ||
d140955e VK |
552 | /** |
553 | * Update parameter list from node | |
554 | */ | |
6aba3998 VK |
555 | static void UpdateParamList(NetObj *object, void *data) |
556 | { | |
86c126f5 | 557 | ObjectArray<AgentParameterDefinition> *fullList = (ObjectArray<AgentParameterDefinition> *)data; |
6aba3998 | 558 | |
86c126f5 | 559 | ObjectArray<AgentParameterDefinition> *paramList; |
cc8ce218 VK |
560 | ((Node *)object)->openParamList(¶mList); |
561 | if ((paramList != NULL) && (paramList->size() > 0)) | |
6aba3998 | 562 | { |
cc8ce218 | 563 | for(int i = 0; i < paramList->size(); i++) |
889d7ff7 | 564 | { |
86c126f5 VK |
565 | int j; |
566 | for(j = 0; j < fullList->size(); j++) | |
889d7ff7 | 567 | { |
86c126f5 | 568 | if (!_tcsicmp(paramList->get(i)->getName(), fullList->get(j)->getName())) |
6aba3998 VK |
569 | break; |
570 | } | |
571 | ||
86c126f5 | 572 | if (j == fullList->size()) |
6aba3998 | 573 | { |
86c126f5 | 574 | fullList->add(new AgentParameterDefinition(paramList->get(i))); |
889d7ff7 | 575 | } |
889d7ff7 | 576 | } |
6aba3998 | 577 | } |
f1ff4cc9 | 578 | ((Node *)object)->closeParamList(); |
6aba3998 VK |
579 | } |
580 | ||
d140955e VK |
581 | /** |
582 | * Update table list from node | |
583 | */ | |
074498ac VK |
584 | static void UpdateTableList(NetObj *object, void *data) |
585 | { | |
86c126f5 | 586 | ObjectArray<AgentTableDefinition> *fullList = (ObjectArray<AgentTableDefinition> *)data; |
074498ac | 587 | |
86c126f5 | 588 | ObjectArray<AgentTableDefinition> *tableList; |
074498ac VK |
589 | ((Node *)object)->openTableList(&tableList); |
590 | if ((tableList != NULL) && (tableList->size() > 0)) | |
591 | { | |
074498ac VK |
592 | for(int i = 0; i < tableList->size(); i++) |
593 | { | |
86c126f5 VK |
594 | int j; |
595 | for(j = 0; j < fullList->size(); j++) | |
074498ac | 596 | { |
86c126f5 | 597 | if (!_tcsicmp(tableList->get(i)->getName(), fullList->get(j)->getName())) |
074498ac VK |
598 | break; |
599 | } | |
600 | ||
86c126f5 | 601 | if (j == fullList->size()) |
074498ac | 602 | { |
86c126f5 | 603 | fullList->add(new AgentTableDefinition(tableList->get(i))); |
074498ac VK |
604 | } |
605 | } | |
606 | } | |
607 | ((Node *)object)->closeTableList(); | |
608 | } | |
609 | ||
d140955e VK |
610 | /** |
611 | * Write full (from all nodes) agent parameters list to NXCP message | |
612 | */ | |
b368969c | 613 | void WriteFullParamListToMessage(NXCPMessage *pMsg, WORD flags) |
6aba3998 VK |
614 | { |
615 | // Gather full parameter list | |
074498ac VK |
616 | if (flags & 0x01) |
617 | { | |
86c126f5 | 618 | ObjectArray<AgentParameterDefinition> fullList(64, 64, true); |
074498ac VK |
619 | g_idxNodeById.forEach(UpdateParamList, &fullList); |
620 | ||
621 | // Put list into the message | |
b368969c | 622 | pMsg->setField(VID_NUM_PARAMETERS, (UINT32)fullList.size()); |
967893bb | 623 | UINT32 varId = VID_PARAM_LIST_BASE; |
86c126f5 | 624 | for(int i = 0; i < fullList.size(); i++) |
074498ac | 625 | { |
86c126f5 | 626 | varId += fullList.get(i)->fillMessage(pMsg, varId); |
074498ac | 627 | } |
074498ac VK |
628 | } |
629 | ||
630 | // Gather full table list | |
631 | if (flags & 0x02) | |
632 | { | |
86c126f5 | 633 | ObjectArray<AgentTableDefinition> fullList(64, 64, true); |
074498ac VK |
634 | g_idxNodeById.forEach(UpdateTableList, &fullList); |
635 | ||
636 | // Put list into the message | |
b368969c | 637 | pMsg->setField(VID_NUM_TABLES, (UINT32)fullList.size()); |
967893bb | 638 | UINT32 varId = VID_TABLE_LIST_BASE; |
86c126f5 | 639 | for(int i = 0; i < fullList.size(); i++) |
074498ac | 640 | { |
86c126f5 | 641 | varId += fullList.get(i)->fillMessage(pMsg, varId); |
074498ac | 642 | } |
074498ac | 643 | } |
5039dede | 644 | } |
9fddfb91 VK |
645 | |
646 | /** | |
647 | * Get type of data collection object | |
648 | */ | |
649 | int GetDCObjectType(UINT32 nodeId, UINT32 dciId) | |
650 | { | |
651 | Node *node = (Node *)FindObjectById(nodeId, OBJECT_NODE); | |
652 | if (node != NULL) | |
653 | { | |
654 | DCObject *dco = node->getDCObjectById(dciId); | |
655 | if (dco != NULL) | |
656 | { | |
657 | return dco->getType(); | |
658 | } | |
659 | } | |
660 | return DCO_TYPE_ITEM; // default | |
661 | } |