all pollers converted to single thread pool
[public/netxms.git] / src / server / core / poll.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2015 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: poll.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
/**
 * Node poller queue (polls new nodes).
 *
 * Discovery code in this file puts malloc()-allocated NEW_NODE structures
 * into this queue; PollManager() puts INVALID_POINTER_VALUE into it on
 * shutdown.
 */
Queue g_nodePollerQueue;

/**
 * Thread pool for pollers. Created at the start of PollManager() and
 * destroyed when PollManager() exits.
 */
ThreadPool *g_pollerThreadPool = NULL;

/**
 * Active pollers, keyed by the PollerInfo address converted to UINT64.
 * Every access to s_pollers in this file is made with s_pollerLock held.
 */
static HashMap<UINT64, PollerInfo> s_pollers(false);
static MUTEX s_pollerLock = MutexCreate();
40
41 /**
42 * Poller info destructor - will unregister poller and decrease ref count on object
43 */
44 PollerInfo::~PollerInfo()
45 {
46 MutexLock(s_pollerLock);
47 s_pollers.remove(CAST_FROM_POINTER(this, UINT64));
48 MutexUnlock(s_pollerLock);
49 m_object->decRefCount();
50 }
51
52 /**
53 * Register active poller
54 */
55 PollerInfo *RegisterPoller(PollerType type, NetObj *object)
56 {
57 PollerInfo *p = new PollerInfo(type, object);
58 object->incRefCount();
59 MutexLock(s_pollerLock);
60 s_pollers.set(CAST_FROM_POINTER(p, UINT64), p);
61 MutexUnlock(s_pollerLock);
62 return p;
63 }
64
65 /**
66 * Show poller information on console
67 */
68 static EnumerationCallbackResult ShowPollerInfo(const void *key, const void *object, void *arg)
69 {
70 static TCHAR *pollerType[] = { _T("STAT"), _T("CONF"), _T("INST"), _T("ROUT"), _T("DISC"), _T("BSVC"), _T("COND"), _T("TOPO") };
71
72 PollerInfo *p = (PollerInfo *)object;
73 NetObj *o = p->getObject();
74
75 TCHAR name[32];
76 nx_strncpy(name, o->getName(), 31);
77 ConsolePrintf((CONSOLE_CTX)arg, _T("%s | %9d | %-30s | %s\n"), pollerType[p->getType()], o->getId(), name, p->getStatus());
78
79 return _CONTINUE;
80 }
81
82 /**
83 * Get poller diagnostic
84 */
85 void ShowPollers(CONSOLE_CTX console)
86 {
87 ConsoleWrite(console, _T("Type | Object ID | Object name | Status\n")
88 _T("-----+-----------+--------------------------------+--------------------------\n"));
89 MutexLock(s_pollerLock);
90 s_pollers.forEach(ShowPollerInfo, console);
91 MutexUnlock(s_pollerLock);
92 }
93
94 /**
95 * Create management node object
96 */
97 static void CreateManagementNode(const InetAddress& addr)
98 {
99 TCHAR buffer[256];
100
101 Node *pNode = new Node(addr, NF_IS_LOCAL_MGMT, 0, 0, 0);
102 NetObjInsert(pNode, TRUE);
103 pNode->setName(GetLocalHostName(buffer, 256));
104
105 PollerInfo *p = RegisterPoller(POLLER_TYPE_CONFIGURATION, pNode);
106 p->startExecution();
107 pNode->configurationPoll(NULL, 0, p, addr.getMaskBits());
108 delete p;
109
110 pNode->unhide();
111 g_dwMgmtNode = pNode->getId(); // Set local management node ID
112 PostEvent(EVENT_NODE_ADDED, pNode->getId(), NULL);
113
114 // Bind to the root of service tree
115 g_pServiceRoot->AddChild(pNode);
116 pNode->AddParent(g_pServiceRoot);
117
118 // Add default data collection items
119 int pollingInterval = ConfigReadInt(_T("DefaultDCIPollingInterval"), 60);
120 int retentionTime = ConfigReadInt(_T("DefaultDCIRetentionTime"), 30);
121 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM), _T("Status"),
122 DS_INTERNAL, DCI_DT_INT, pollingInterval, retentionTime, pNode));
123 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
124 _T("Server.AverageDCPollerQueueSize"),
125 DS_INTERNAL, DCI_DT_FLOAT, pollingInterval, retentionTime, pNode,
126 _T("Data collection poller's request queue for last minute")));
127 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
128 _T("Server.AverageDBWriterQueueSize"),
129 DS_INTERNAL, DCI_DT_FLOAT, pollingInterval, retentionTime, pNode,
130 _T("Database writer's request queue for last minute")));
131 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
132 _T("Server.AverageDBWriterQueueSize.IData"),
133 DS_INTERNAL, DCI_DT_FLOAT, pollingInterval, retentionTime, pNode,
134 _T("Database writer's request queue (DCI data) for last minute")));
135 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
136 _T("Server.AverageDBWriterQueueSize.Other"),
137 DS_INTERNAL, DCI_DT_FLOAT, pollingInterval, retentionTime, pNode,
138 _T("Database writer's request queue (other queries) for last minute")));
139 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
140 _T("Server.AverageDCIQueuingTime"),
141 DS_INTERNAL, DCI_DT_UINT, pollingInterval, retentionTime, pNode,
142 _T("Average time to queue DCI for polling for last minute")));
143 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
144 _T("Server.AverageStatusPollerQueueSize"),
145 DS_INTERNAL, DCI_DT_FLOAT, pollingInterval, retentionTime, pNode,
146 _T("Status poller queue for last minute")));
147 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
148 _T("Server.AverageConfigurationPollerQueueSize"),
149 DS_INTERNAL, DCI_DT_FLOAT, pollingInterval, retentionTime, pNode,
150 _T("Configuration poller queue for last minute")));
151 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
152 _T("Server.AverageSyslogProcessingQueueSize"),
153 DS_INTERNAL, DCI_DT_FLOAT, pollingInterval, retentionTime, pNode,
154 _T("Syslog processing queue for last minute")));
155 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
156 _T("Server.AverageSyslogWriterQueueSize"),
157 DS_INTERNAL, DCI_DT_FLOAT, pollingInterval, retentionTime, pNode,
158 _T("Syslog writer queue for last minute")));
159 DCItem *pEventsPerMinuteDCI = new DCItem(CreateUniqueId(IDG_ITEM),
160 _T("Server.TotalEventsProcessed"),
161 DS_INTERNAL, DCI_DT_UINT, pollingInterval, retentionTime, pNode,
162 _T("Events processed for last minute"));
163 pEventsPerMinuteDCI->setDeltaCalcMethod(DCM_AVERAGE_PER_MINUTE);
164 pNode->addDCObject(pEventsPerMinuteDCI);
165 }
166
167 /**
168 * Callback to clear incorrectly set local management node flag
169 */
170 static void CheckMgmtFlagCallback(NetObj *object, void *data)
171 {
172 if ((g_dwMgmtNode != object->getId()) && ((Node *)object)->isLocalManagement())
173 {
174 ((Node *)object)->clearLocalMgmtFlag();
175 DbgPrintf(2, _T("Incorrectly set flag NF_IS_LOCAL_MGMT cleared from node %s [%d]"),
176 object->getName(), object->getId());
177 }
178 }
179
180 /**
181 * Comparator to find management node object in existing nodes
182 */
183 static bool LocalMgmtNodeComparator(NetObj *object, void *data)
184 {
185 return ((Node *)object)->isLocalManagement();
186 }
187
188 /**
189 * Check if management server node presented in node list
190 */
191 void CheckForMgmtNode()
192 {
193 InterfaceList *pIfList;
194 Node *pNode;
195 int i;
196
197 pIfList = GetLocalInterfaceList();
198 if (pIfList != NULL)
199 {
200 for(i = 0; i < pIfList->size(); i++)
201 {
202 InterfaceInfo *iface = pIfList->get(i);
203 if (iface->type == IFTYPE_SOFTWARE_LOOPBACK)
204 continue;
205 if ((pNode = FindNodeByIP(0, &iface->ipAddrList)) != NULL)
206 {
207 // Check management node flag
208 if (!(pNode->getFlags() & NF_IS_LOCAL_MGMT))
209 {
210 pNode->setLocalMgmtFlag();
211 DbgPrintf(1, _T("Local management node %s [%d] was not have NF_IS_LOCAL_MGMT flag set"), pNode->getName(), pNode->getId());
212 }
213 g_dwMgmtNode = pNode->getId(); // Set local management node ID
214 break;
215 }
216 }
217 if (i == pIfList->size()) // No such node
218 {
219 // Find interface with IP address
220 for(i = 0; i < pIfList->size(); i++)
221 {
222 InterfaceInfo *iface = pIfList->get(i);
223 if ((iface->type == IFTYPE_SOFTWARE_LOOPBACK) || (iface->ipAddrList.size() == 0))
224 continue;
225
226 for(int j = 0; j < iface->ipAddrList.size(); j++)
227 {
228 const InetAddress& addr = iface->ipAddrList.get(j);
229 if (addr.isValidUnicast())
230 {
231 CreateManagementNode(addr);
232 i = pIfList->size(); // stop walking interface list
233 break;
234 }
235 }
236 }
237 }
238 delete pIfList;
239 }
240
241 if (g_dwMgmtNode != 0)
242 {
243 // Check that other nodes does not have NF_IS_LOCAL_MGMT flag set
244 g_idxNodeById.forEach(CheckMgmtFlagCallback, NULL);
245 }
246 else
247 {
248 // Management node cannot be found or created. This can happen
249 // if management node currently does not have IP addresses (for example,
250 // it's a Windows machine which is disconnected from the network).
251 // In this case, try to find any node with NF_IS_LOCAL_MGMT flag, or create
252 // new one without interfaces
253 NetObj *mgmtNode = g_idxNodeById.find(LocalMgmtNodeComparator, NULL);
254 if (mgmtNode != NULL)
255 {
256 g_dwMgmtNode = mgmtNode->getId();
257 }
258 else
259 {
260 CreateManagementNode(InetAddress());
261 }
262 }
263 }
264
265 /**
266 * Comparator for poller queue elements
267 */
268 static bool PollerQueueElementComparator(void *key, void *element)
269 {
270 return ((InetAddress *)key)->equals(((NEW_NODE *)element)->ipAddr);
271 }
272
273 /**
274 * Check potential new node from ARP cache or routing table
275 */
276 static void CheckPotentialNode(Node *node, const InetAddress& ipAddr, UINT32 ifIndex, BYTE *macAddr = NULL)
277 {
278 TCHAR buffer[64];
279
280 DbgPrintf(6, _T("DiscoveryPoller(): checking potential node %s at %d"), ipAddr.toString(buffer), ifIndex);
281 if (ipAddr.isValid() && !ipAddr.isBroadcast() && !ipAddr.isLoopback() && !ipAddr.isMulticast() &&
282 (FindNodeByIP(node->getZoneId(), ipAddr) == NULL) && !IsClusterIP(node->getZoneId(), ipAddr) &&
283 (g_nodePollerQueue.find((void *)&ipAddr, PollerQueueElementComparator) == NULL))
284 {
285 Interface *pInterface = node->findInterfaceByIndex(ifIndex);
286 if (pInterface != NULL)
287 {
288 const InetAddress& interfaceAddress = pInterface->getIpAddressList()->findSameSubnetAddress(ipAddr);
289 if (interfaceAddress.isValidUnicast())
290 {
291 DbgPrintf(6, _T("DiscoveryPoller(): interface found: %s [%d] addr=%s/%d ifIndex=%d"),
292 pInterface->getName(), pInterface->getId(), interfaceAddress.toString(buffer), interfaceAddress.getMaskBits(), pInterface->getIfIndex());
293 if (!ipAddr.isSubnetBroadcast(interfaceAddress.getMaskBits()))
294 {
295 NEW_NODE *pInfo;
296 TCHAR buffer[64];
297
298 pInfo = (NEW_NODE *)malloc(sizeof(NEW_NODE));
299 pInfo->ipAddr = ipAddr;
300 pInfo->ipAddr.setMaskBits(interfaceAddress.getMaskBits());
301 pInfo->zoneId = node->getZoneId();
302 pInfo->ignoreFilter = FALSE;
303 if (macAddr == NULL)
304 memset(pInfo->bMacAddr, 0, MAC_ADDR_LENGTH);
305 else
306 memcpy(pInfo->bMacAddr, macAddr, MAC_ADDR_LENGTH);
307 DbgPrintf(5, _T("DiscoveryPoller(): new node queued: %s/%d"),
308 pInfo->ipAddr.toString(buffer), pInfo->ipAddr.getMaskBits());
309 g_nodePollerQueue.put(pInfo);
310 }
311 else
312 {
313 DbgPrintf(6, _T("DiscoveryPoller(): potential node %s rejected - broadcast/multicast address"), ipAddr.toString(buffer));
314 }
315 }
316 else
317 {
318 DbgPrintf(6, _T("DiscoveryPoller(): interface object found but IP address not found"));
319 }
320 }
321 else
322 {
323 DbgPrintf(6, _T("DiscoveryPoller(): interface object not found"));
324 }
325 }
326 else
327 {
328 DbgPrintf(6, _T("DiscoveryPoller(): potential node %s rejected"), ipAddr.toString(buffer));
329 }
330 }
331
332 /**
333 * Check host route
334 * Host will be added if it is directly connected
335 */
336 static void CheckHostRoute(Node *node, ROUTE *route)
337 {
338 TCHAR buffer[16];
339 Interface *iface;
340
341 DbgPrintf(6, _T("DiscoveryPoller(): checking host route %s at %d"), IpToStr(route->dwDestAddr, buffer), route->dwIfIndex);
342 iface = node->findInterfaceByIndex(route->dwIfIndex);
343 if ((iface != NULL) && iface->getIpAddressList()->findSameSubnetAddress(route->dwDestAddr).isValidUnicast())
344 {
345 CheckPotentialNode(node, route->dwDestAddr, route->dwIfIndex);
346 }
347 else
348 {
349 DbgPrintf(6, _T("DiscoveryPoller(): interface object not found for host route"));
350 }
351 }
352
353 /**
354 * Discovery poller
355 */
356 static void DiscoveryPoller(void *arg)
357 {
358 // TCHAR szBuffer[MAX_OBJECT_NAME + 64], szIpAddr[64];
359 // ARP_CACHE *pArpCache;
360 // ROUTING_TABLE *rt;
361
362 PollerInfo *poller = (PollerInfo *)arg;
363 poller->startExecution();
364
365 Node *pNode = (Node *)poller->getObject();
366 if (pNode->getRuntimeFlags() & NDF_DELETE_IN_PROGRESS)
367 {
368 pNode->setDiscoveryPollTimeStamp();
369 delete poller;
370 return;
371 }
372
373 DbgPrintf(4, _T("Starting discovery poll for node %s (%s) in zone %d"),
374 pNode->getName(), (const TCHAR *)pNode->getIpAddress().toString(), (int)pNode->getZoneId());
375
376 // Retrieve and analize node's ARP cache
377 ARP_CACHE *pArpCache = pNode->getArpCache();
378 if (pArpCache != NULL)
379 {
380 for(UINT32 i = 0; i < pArpCache->dwNumEntries; i++)
381 if (memcmp(pArpCache->pEntries[i].bMacAddr, "\xFF\xFF\xFF\xFF\xFF\xFF", 6)) // Ignore broadcast addresses
382 CheckPotentialNode(pNode, pArpCache->pEntries[i].ipAddr, pArpCache->pEntries[i].dwIndex, pArpCache->pEntries[i].bMacAddr);
383 DestroyArpCache(pArpCache);
384 }
385
386 // Retrieve and analize node's routing table
387 DbgPrintf(5, _T("Discovery poll for node %s (%s) - reading routing table"),
388 pNode->getName(), (const TCHAR *)pNode->getIpAddress().toString());
389 ROUTING_TABLE *rt = pNode->getRoutingTable();
390 if (rt != NULL)
391 {
392 for(int i = 0; i < rt->iNumEntries; i++)
393 {
394 CheckPotentialNode(pNode, rt->pRoutes[i].dwNextHop, rt->pRoutes[i].dwIfIndex);
395 if ((rt->pRoutes[i].dwDestMask == 0xFFFFFFFF) && (rt->pRoutes[i].dwDestAddr != 0))
396 CheckHostRoute(pNode, &rt->pRoutes[i]);
397 }
398 DestroyRoutingTable(rt);
399 }
400
401 DbgPrintf(4, _T("Finished discovery poll for node %s (%s)"),
402 pNode->getName(), (const TCHAR *)pNode->getIpAddress().toString());
403 pNode->setDiscoveryPollTimeStamp();
404 delete poller;
405 }
406
407 /**
408 * Check given address range with ICMP ping for new nodes
409 */
410 static void CheckRange(int nType, UINT32 dwAddr1, UINT32 dwAddr2)
411 {
412 UINT32 dwAddr, dwFrom, dwTo;
413 TCHAR szIpAddr1[16], szIpAddr2[16];
414
415 if (nType == 0)
416 {
417 dwFrom = (dwAddr1 & dwAddr2) + 1;
418 dwTo = dwFrom | ~dwAddr2 - 1;
419 }
420 else
421 {
422 dwFrom = dwAddr1;
423 dwTo = dwAddr2;
424 }
425 DbgPrintf(4, _T("Starting active discovery check on range %s - %s"),
426 IpToStr(dwFrom, szIpAddr1), IpToStr(dwTo, szIpAddr2));
427
428 for(dwAddr = dwFrom; dwAddr <= dwTo; dwAddr++)
429 {
430 if (IcmpPing(htonl(dwAddr), 3, g_icmpPingTimeout, NULL, g_icmpPingSize) == ICMP_SUCCESS)
431 {
432 DbgPrintf(5, _T("Active discovery - node %s responds to ICMP ping"),
433 IpToStr(dwAddr, szIpAddr1));
434 if (FindNodeByIP(0, dwAddr) == NULL)
435 {
436 Subnet *pSubnet;
437
438 pSubnet = FindSubnetForNode(0, dwAddr);
439 if (pSubnet != NULL)
440 {
441 if (!pSubnet->getIpAddress().equals(dwAddr) &&
442 !InetAddress(dwAddr).isSubnetBroadcast(pSubnet->getIpAddress().getMaskBits()))
443 {
444 NEW_NODE *pInfo;
445
446 pInfo = (NEW_NODE *)malloc(sizeof(NEW_NODE));
447 pInfo->ipAddr = dwAddr;
448 pInfo->ipAddr.setMaskBits(pSubnet->getIpAddress().getMaskBits());
449 pInfo->zoneId = 0; /* FIXME: add correct zone ID */
450 pInfo->ignoreFilter = FALSE;
451 memset(pInfo->bMacAddr, 0, MAC_ADDR_LENGTH);
452 g_nodePollerQueue.put(pInfo);
453 }
454 }
455 else
456 {
457 NEW_NODE *pInfo;
458
459 pInfo = (NEW_NODE *)malloc(sizeof(NEW_NODE));
460 pInfo->ipAddr = dwAddr;
461 pInfo->zoneId = 0; /* FIXME: add correct zone ID */
462 pInfo->ignoreFilter = FALSE;
463 memset(pInfo->bMacAddr, 0, MAC_ADDR_LENGTH);
464 g_nodePollerQueue.put(pInfo);
465 }
466 }
467 }
468 }
469
470 DbgPrintf(4, _T("Finished active discovery check on range %s - %s"),
471 IpToStr(dwFrom, szIpAddr1), IpToStr(dwTo, szIpAddr2));
472 }
473
474 /**
475 * Active discovery poller thread
476 */
477 static THREAD_RESULT THREAD_CALL ActiveDiscoveryPoller(void *arg)
478 {
479 int nInterval = ConfigReadInt(_T("ActiveDiscoveryInterval"), 7200);
480
481 // Main loop
482 while(!IsShutdownInProgress())
483 {
484 if (SleepAndCheckForShutdown(nInterval))
485 break;
486
487 if (!(g_flags & AF_ACTIVE_NETWORK_DISCOVERY))
488 continue;
489
490 DB_RESULT hResult = DBSelect(g_hCoreDB, _T("SELECT addr_type,addr1,addr2 FROM address_lists WHERE list_type=1"));
491 if (hResult != NULL)
492 {
493 int nRows = DBGetNumRows(hResult);
494 for(int i = 0; i < nRows; i++)
495 {
496 CheckRange(DBGetFieldLong(hResult, i, 0),
497 DBGetFieldIPAddr(hResult, i, 1),
498 DBGetFieldIPAddr(hResult, i, 2));
499 }
500 DBFreeResult(hResult);
501 }
502 }
503 return THREAD_OK;
504 }
505
506 /**
507 * Callback for queueing objects for polling
508 */
509 static void QueueForPolling(NetObj *object, void *data)
510 {
511 switch(object->getObjectClass())
512 {
513 case OBJECT_NODE:
514 {
515 Node *node = (Node *)object;
516 if (node->isReadyForConfigurationPoll())
517 {
518 node->lockForConfigurationPoll();
519 DbgPrintf(6, _T("Node %d \"%s\" queued for configuration poll"), (int)node->getId(), node->getName());
520 ThreadPoolExecute(g_pollerThreadPool, node, &Node::configurationPoll, RegisterPoller(POLLER_TYPE_CONFIGURATION, node));
521 }
522 if (node->isReadyForInstancePoll())
523 {
524 node->lockForInstancePoll();
525 DbgPrintf(6, _T("Node %d \"%s\" queued for instance discovery poll"), (int)node->getId(), node->getName());
526 ThreadPoolExecute(g_pollerThreadPool, node, &Node::instanceDiscoveryPoll, RegisterPoller(POLLER_TYPE_INSTANCE_DISCOVERY, node));
527 }
528 if (node->isReadyForStatusPoll())
529 {
530 node->lockForStatusPoll();
531 DbgPrintf(6, _T("Node %d \"%s\" queued for status poll"), (int)node->getId(), node->getName());
532 ThreadPoolExecute(g_pollerThreadPool, node, &Node::statusPoll, RegisterPoller(POLLER_TYPE_STATUS, node));
533 }
534 if (node->isReadyForRoutePoll())
535 {
536 node->lockForRoutePoll();
537 DbgPrintf(6, _T("Node %d \"%s\" queued for routing table poll"), (int)node->getId(), node->getName());
538 ThreadPoolExecute(g_pollerThreadPool, node, &Node::routingTablePoll, RegisterPoller(POLLER_TYPE_ROUTING_TABLE, node));
539 }
540 if (node->isReadyForDiscoveryPoll())
541 {
542 node->lockForDiscoveryPoll();
543 DbgPrintf(6, _T("Node %d \"%s\" queued for discovery poll"), (int)node->getId(), node->getName());
544 ThreadPoolExecute(g_pollerThreadPool, DiscoveryPoller, RegisterPoller(POLLER_TYPE_DISCOVERY, node));
545 }
546 if (node->isReadyForTopologyPoll())
547 {
548 node->lockForTopologyPoll();
549 DbgPrintf(6, _T("Node %d \"%s\" queued for topology poll"), (int)node->getId(), node->getName());
550 ThreadPoolExecute(g_pollerThreadPool, node, &Node::topologyPoll, RegisterPoller(POLLER_TYPE_TOPOLOGY, node));
551 }
552 }
553 break;
554 case OBJECT_CONDITION:
555 {
556 Condition *cond = (Condition *)object;
557 if (cond->isReadyForPoll())
558 {
559 cond->lockForPoll();
560 DbgPrintf(6, _T("Condition %d \"%s\" queued for poll"), (int)object->getId(), object->getName());
561 ThreadPoolExecute(g_pollerThreadPool, cond, &Condition::doPoll, RegisterPoller(POLLER_TYPE_CONDITION, cond));
562 }
563 }
564 break;
565 case OBJECT_CLUSTER:
566 {
567 Cluster *cluster = (Cluster *)object;
568 if (cluster->isReadyForStatusPoll())
569 {
570 cluster->lockForStatusPoll();
571 DbgPrintf(6, _T("Cluster %d \"%s\" queued for status poll"), (int)cluster->getId(), cluster->getName());
572 ThreadPoolExecute(g_pollerThreadPool, cluster, &Cluster::statusPoll, RegisterPoller(POLLER_TYPE_STATUS, cluster));
573 }
574 }
575 break;
576 case OBJECT_BUSINESSSERVICE:
577 {
578 BusinessService *service = (BusinessService *)object;
579 if (service->isReadyForPolling())
580 {
581 service->lockForPolling();
582 DbgPrintf(6, _T("Business service %d \"%s\" queued for poll"), (int)object->getId(), object->getName());
583 ThreadPoolExecute(g_pollerThreadPool, service, &BusinessService::poll, RegisterPoller(POLLER_TYPE_BUSINESS_SERVICE, service));
584 }
585 }
586 break;
587 default:
588 break;
589 }
590 }
591
592 /**
593 * Node and condition queuing thread
594 */
595 THREAD_RESULT THREAD_CALL PollManager(void *pArg)
596 {
597 g_pollerThreadPool = ThreadPoolCreate(ConfigReadInt(_T("PollerThreadPoolBaseSize"), 10), ConfigReadInt(_T("PollerThreadPoolMaxSize"), 250), _T("POLLERS"));
598
599 // Start active discovery poller
600 ThreadCreate(ActiveDiscoveryPoller, 0, NULL);
601
602 UINT32 watchdogId = WatchdogAddThread(_T("Poll Manager"), 60);
603 int counter = 0;
604
605 while(!IsShutdownInProgress())
606 {
607 if (SleepAndCheckForShutdown(5))
608 break; // Shutdown has arrived
609 WatchdogNotify(watchdogId);
610
611 // Check for management node every 10 minutes
612 counter++;
613 if (counter % 120 == 0)
614 {
615 counter = 0;
616 CheckForMgmtNode();
617 }
618
619 // Walk through objects and queue them for status
620 // and/or configuration poll
621 g_idxObjectById.forEach(QueueForPolling, NULL);
622 }
623
624 g_nodePollerQueue.clear();
625 g_nodePollerQueue.put(INVALID_POINTER_VALUE);
626
627 ThreadPoolDestroy(g_pollerThreadPool);
628 DbgPrintf(1, _T("PollManager: main thread terminated"));
629 return THREAD_OK;
630 }
631
632 /**
633 * Reset discovery poller after configuration change
634 */
635 void ResetDiscoveryPoller()
636 {
637 NEW_NODE *pInfo;
638
639 // Clear node poller queue
640 while((pInfo = (NEW_NODE *)g_nodePollerQueue.get()) != NULL)
641 {
642 if (pInfo != INVALID_POINTER_VALUE)
643 free(pInfo);
644 }
645
646 // Reload discovery parameters
647 g_dwDiscoveryPollingInterval = ConfigReadInt(_T("DiscoveryPollingInterval"), 900);
648 if (ConfigReadInt(_T("RunNetworkDiscovery"), 0))
649 g_flags |= AF_ENABLE_NETWORK_DISCOVERY;
650 else
651 g_flags &= ~AF_ENABLE_NETWORK_DISCOVERY;
652
653 if (ConfigReadInt(_T("ActiveNetworkDiscovery"), 0))
654 g_flags |= AF_ACTIVE_NETWORK_DISCOVERY;
655 else
656 g_flags &= ~AF_ACTIVE_NETWORK_DISCOVERY;
657
658 if (ConfigReadInt(_T("UseSNMPTrapsForDiscovery"), 0))
659 g_flags |= AF_SNMP_TRAP_DISCOVERY;
660 else
661 g_flags &= ~AF_SNMP_TRAP_DISCOVERY;
662 }