all pollers converted to single thread pool
[public/netxms.git] / src / server / core / poll.cpp
CommitLineData
5039dede
AK
1/*
2** NetXMS - Network Management System
805171de 3** Copyright (C) 2003-2015 Victor Kirhenshtein
5039dede
AK
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: poll.cpp
20**
21**/
22
23#include "nxcore.h"
24
8f3acc9b 25/**
208d7427 26 * Node poller queue (polls new nodes)
8f3acc9b 27 */
208d7427
VK
28Queue g_nodePollerQueue;
29
30/**
31 * Thread pool for pollers
32 */
33ThreadPool *g_pollerThreadPool = NULL;
34
35/**
36 * Active pollers
37 */
38static HashMap<UINT64, PollerInfo> s_pollers(false);
39static MUTEX s_pollerLock = MutexCreate();
40
41/**
42 * Poller info destructor - will unregister poller and decrease ref count on object
43 */
44PollerInfo::~PollerInfo()
5039dede 45{
208d7427
VK
46 MutexLock(s_pollerLock);
47 s_pollers.remove(CAST_FROM_POINTER(this, UINT64));
48 MutexUnlock(s_pollerLock);
49 m_object->decRefCount();
50}
5039dede 51
8f3acc9b 52/**
208d7427 53 * Register active poller
8f3acc9b 54 */
208d7427
VK
55PollerInfo *RegisterPoller(PollerType type, NetObj *object)
56{
57 PollerInfo *p = new PollerInfo(type, object);
58 object->incRefCount();
59 MutexLock(s_pollerLock);
60 s_pollers.set(CAST_FROM_POINTER(p, UINT64), p);
61 MutexUnlock(s_pollerLock);
62 return p;
63}
5039dede 64
8f3acc9b 65/**
208d7427 66 * Show poller information on console
8f3acc9b 67 */
208d7427
VK
68static EnumerationCallbackResult ShowPollerInfo(const void *key, const void *object, void *arg)
69{
70 static TCHAR *pollerType[] = { _T("STAT"), _T("CONF"), _T("INST"), _T("ROUT"), _T("DISC"), _T("BSVC"), _T("COND"), _T("TOPO") };
71
72 PollerInfo *p = (PollerInfo *)object;
73 NetObj *o = p->getObject();
74
75 TCHAR name[32];
76 nx_strncpy(name, o->getName(), 31);
77 ConsolePrintf((CONSOLE_CTX)arg, _T("%s | %9d | %-30s | %s\n"), pollerType[p->getType()], o->getId(), name, p->getStatus());
78
79 return _CONTINUE;
80}
81
82/**
83 * Get poller diagnostic
84 */
85void ShowPollers(CONSOLE_CTX console)
86{
87 ConsoleWrite(console, _T("Type | Object ID | Object name | Status\n")
88 _T("-----+-----------+--------------------------------+--------------------------\n"));
89 MutexLock(s_pollerLock);
90 s_pollers.forEach(ShowPollerInfo, console);
91 MutexUnlock(s_pollerLock);
92}
5039dede 93
8f3acc9b
VK
94/**
95 * Create management node object
96 */
c30c0c0f 97static void CreateManagementNode(const InetAddress& addr)
d749debc 98{
f7694811
VK
99 TCHAR buffer[256];
100
c30c0c0f 101 Node *pNode = new Node(addr, NF_IS_LOCAL_MGMT, 0, 0, 0);
d749debc 102 NetObjInsert(pNode, TRUE);
f7694811 103 pNode->setName(GetLocalHostName(buffer, 256));
208d7427
VK
104
105 PollerInfo *p = RegisterPoller(POLLER_TYPE_CONFIGURATION, pNode);
106 p->startExecution();
107 pNode->configurationPoll(NULL, 0, p, addr.getMaskBits());
108 delete p;
109
478d4ff4 110 pNode->unhide();
c42b4551
VK
111 g_dwMgmtNode = pNode->getId(); // Set local management node ID
112 PostEvent(EVENT_NODE_ADDED, pNode->getId(), NULL);
f7694811
VK
113
114 // Bind to the root of service tree
115 g_pServiceRoot->AddChild(pNode);
116 pNode->AddParent(g_pServiceRoot);
d749debc
VK
117
118 // Add default data collection items
f05a8a45
VK
119 int pollingInterval = ConfigReadInt(_T("DefaultDCIPollingInterval"), 60);
120 int retentionTime = ConfigReadInt(_T("DefaultDCIRetentionTime"), 30);
16d6f798 121 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM), _T("Status"),
f05a8a45 122 DS_INTERNAL, DCI_DT_INT, pollingInterval, retentionTime, pNode));
16d6f798
VK
123 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
124 _T("Server.AverageDCPollerQueueSize"),
f05a8a45 125 DS_INTERNAL, DCI_DT_FLOAT, pollingInterval, retentionTime, pNode,
16d6f798
VK
126 _T("Data collection poller's request queue for last minute")));
127 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
128 _T("Server.AverageDBWriterQueueSize"),
f05a8a45 129 DS_INTERNAL, DCI_DT_FLOAT, pollingInterval, retentionTime, pNode,
16d6f798
VK
130 _T("Database writer's request queue for last minute")));
131 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
132 _T("Server.AverageDBWriterQueueSize.IData"),
f05a8a45 133 DS_INTERNAL, DCI_DT_FLOAT, pollingInterval, retentionTime, pNode,
16d6f798
VK
134 _T("Database writer's request queue (DCI data) for last minute")));
135 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
136 _T("Server.AverageDBWriterQueueSize.Other"),
f05a8a45 137 DS_INTERNAL, DCI_DT_FLOAT, pollingInterval, retentionTime, pNode,
16d6f798
VK
138 _T("Database writer's request queue (other queries) for last minute")));
139 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
140 _T("Server.AverageDCIQueuingTime"),
f05a8a45 141 DS_INTERNAL, DCI_DT_UINT, pollingInterval, retentionTime, pNode,
16d6f798
VK
142 _T("Average time to queue DCI for polling for last minute")));
143 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
144 _T("Server.AverageStatusPollerQueueSize"),
f05a8a45 145 DS_INTERNAL, DCI_DT_FLOAT, pollingInterval, retentionTime, pNode,
16d6f798
VK
146 _T("Status poller queue for last minute")));
147 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
148 _T("Server.AverageConfigurationPollerQueueSize"),
f05a8a45 149 DS_INTERNAL, DCI_DT_FLOAT, pollingInterval, retentionTime, pNode,
16d6f798 150 _T("Configuration poller queue for last minute")));
f1784ab6
VK
151 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
152 _T("Server.AverageSyslogProcessingQueueSize"),
153 DS_INTERNAL, DCI_DT_FLOAT, pollingInterval, retentionTime, pNode,
154 _T("Syslog processing queue for last minute")));
155 pNode->addDCObject(new DCItem(CreateUniqueId(IDG_ITEM),
156 _T("Server.AverageSyslogWriterQueueSize"),
157 DS_INTERNAL, DCI_DT_FLOAT, pollingInterval, retentionTime, pNode,
158 _T("Syslog writer queue for last minute")));
3c8e169e
VK
159 DCItem *pEventsPerMinuteDCI = new DCItem(CreateUniqueId(IDG_ITEM),
160 _T("Server.TotalEventsProcessed"),
161 DS_INTERNAL, DCI_DT_UINT, pollingInterval, retentionTime, pNode,
162 _T("Events processed for last minute"));
163 pEventsPerMinuteDCI->setDeltaCalcMethod(DCM_AVERAGE_PER_MINUTE);
164 pNode->addDCObject(pEventsPerMinuteDCI);
d749debc
VK
165}
166
8f3acc9b 167/**
efe76251 168 * Callback to clear incorrectly set local management node flag
8f3acc9b 169 */
d5e19c61
VK
170static void CheckMgmtFlagCallback(NetObj *object, void *data)
171{
c42b4551 172 if ((g_dwMgmtNode != object->getId()) && ((Node *)object)->isLocalManagement())
d5e19c61
VK
173 {
174 ((Node *)object)->clearLocalMgmtFlag();
175 DbgPrintf(2, _T("Incorrectly set flag NF_IS_LOCAL_MGMT cleared from node %s [%d]"),
c42b4551 176 object->getName(), object->getId());
d5e19c61
VK
177 }
178}
179
efe76251
VK
180/**
181 * Comparator to find management node object in existing nodes
182 */
d5e19c61
VK
183static bool LocalMgmtNodeComparator(NetObj *object, void *data)
184{
185 return ((Node *)object)->isLocalManagement();
186}
187
efe76251
VK
188/**
189 * Check if management server node presented in node list
190 */
040c45fa 191void CheckForMgmtNode()
5039dede 192{
98762401 193 InterfaceList *pIfList;
5039dede
AK
194 Node *pNode;
195 int i;
196
197 pIfList = GetLocalInterfaceList();
198 if (pIfList != NULL)
199 {
a6312bd6 200 for(i = 0; i < pIfList->size(); i++)
fcec42a9 201 {
c30c0c0f
VK
202 InterfaceInfo *iface = pIfList->get(i);
203 if (iface->type == IFTYPE_SOFTWARE_LOOPBACK)
fcec42a9 204 continue;
c30c0c0f 205 if ((pNode = FindNodeByIP(0, &iface->ipAddrList)) != NULL)
5039dede
AK
206 {
207 // Check management node flag
7c521895 208 if (!(pNode->getFlags() & NF_IS_LOCAL_MGMT))
5039dede 209 {
7c521895 210 pNode->setLocalMgmtFlag();
c42b4551 211 DbgPrintf(1, _T("Local management node %s [%d] was not have NF_IS_LOCAL_MGMT flag set"), pNode->getName(), pNode->getId());
5039dede 212 }
c42b4551 213 g_dwMgmtNode = pNode->getId(); // Set local management node ID
5039dede
AK
214 break;
215 }
fcec42a9 216 }
a6312bd6 217 if (i == pIfList->size()) // No such node
5039dede
AK
218 {
219 // Find interface with IP address
a6312bd6 220 for(i = 0; i < pIfList->size(); i++)
fcec42a9 221 {
c30c0c0f
VK
222 InterfaceInfo *iface = pIfList->get(i);
223 if ((iface->type == IFTYPE_SOFTWARE_LOOPBACK) || (iface->ipAddrList.size() == 0))
224 continue;
225
226 for(int j = 0; j < iface->ipAddrList.size(); j++)
5039dede 227 {
c30c0c0f
VK
228 const InetAddress& addr = iface->ipAddrList.get(j);
229 if (addr.isValidUnicast())
230 {
231 CreateManagementNode(addr);
1a5cb2b0 232 i = pIfList->size(); // stop walking interface list
c30c0c0f
VK
233 break;
234 }
5039dede 235 }
fcec42a9 236 }
5039dede 237 }
98762401 238 delete pIfList;
5039dede 239 }
4d0c32f3 240
d749debc
VK
241 if (g_dwMgmtNode != 0)
242 {
243 // Check that other nodes does not have NF_IS_LOCAL_MGMT flag set
d5e19c61 244 g_idxNodeById.forEach(CheckMgmtFlagCallback, NULL);
d749debc
VK
245 }
246 else
247 {
248 // Management node cannot be found or created. This can happen
249 // if management node currently does not have IP addresses (for example,
250 // it's a Windows machine which is disconnected from the network).
251 // In this case, try to find any node with NF_IS_LOCAL_MGMT flag, or create
252 // new one without interfaces
d5e19c61
VK
253 NetObj *mgmtNode = g_idxNodeById.find(LocalMgmtNodeComparator, NULL);
254 if (mgmtNode != NULL)
d749debc 255 {
c42b4551 256 g_dwMgmtNode = mgmtNode->getId();
d749debc 257 }
d5e19c61 258 else
d749debc 259 {
c30c0c0f 260 CreateManagementNode(InetAddress());
d749debc
VK
261 }
262 }
5039dede
AK
263}
264
efe76251
VK
265/**
266 * Comparator for poller queue elements
267 */
85f1fea1
VK
268static bool PollerQueueElementComparator(void *key, void *element)
269{
c75e9ee4 270 return ((InetAddress *)key)->equals(((NEW_NODE *)element)->ipAddr);
85f1fea1
VK
271}
272
efe76251
VK
273/**
274 * Check potential new node from ARP cache or routing table
275 */
c75e9ee4 276static void CheckPotentialNode(Node *node, const InetAddress& ipAddr, UINT32 ifIndex, BYTE *macAddr = NULL)
5039dede 277{
c75e9ee4 278 TCHAR buffer[64];
e4a64da2 279
c75e9ee4
VK
280 DbgPrintf(6, _T("DiscoveryPoller(): checking potential node %s at %d"), ipAddr.toString(buffer), ifIndex);
281 if (ipAddr.isValid() && !ipAddr.isBroadcast() && !ipAddr.isLoopback() && !ipAddr.isMulticast() &&
89135050 282 (FindNodeByIP(node->getZoneId(), ipAddr) == NULL) && !IsClusterIP(node->getZoneId(), ipAddr) &&
c75e9ee4 283 (g_nodePollerQueue.find((void *)&ipAddr, PollerQueueElementComparator) == NULL))
5039dede 284 {
c75e9ee4 285 Interface *pInterface = node->findInterfaceByIndex(ifIndex);
c30c0c0f 286 if (pInterface != NULL)
5039dede 287 {
c30c0c0f
VK
288 const InetAddress& interfaceAddress = pInterface->getIpAddressList()->findSameSubnetAddress(ipAddr);
289 if (interfaceAddress.isValidUnicast())
5039dede 290 {
c30c0c0f
VK
291 DbgPrintf(6, _T("DiscoveryPoller(): interface found: %s [%d] addr=%s/%d ifIndex=%d"),
292 pInterface->getName(), pInterface->getId(), interfaceAddress.toString(buffer), interfaceAddress.getMaskBits(), pInterface->getIfIndex());
293 if (!ipAddr.isSubnetBroadcast(interfaceAddress.getMaskBits()))
294 {
295 NEW_NODE *pInfo;
296 TCHAR buffer[64];
297
298 pInfo = (NEW_NODE *)malloc(sizeof(NEW_NODE));
299 pInfo->ipAddr = ipAddr;
300 pInfo->ipAddr.setMaskBits(interfaceAddress.getMaskBits());
301 pInfo->zoneId = node->getZoneId();
302 pInfo->ignoreFilter = FALSE;
303 if (macAddr == NULL)
304 memset(pInfo->bMacAddr, 0, MAC_ADDR_LENGTH);
305 else
306 memcpy(pInfo->bMacAddr, macAddr, MAC_ADDR_LENGTH);
307 DbgPrintf(5, _T("DiscoveryPoller(): new node queued: %s/%d"),
308 pInfo->ipAddr.toString(buffer), pInfo->ipAddr.getMaskBits());
19dbc8ef 309 g_nodePollerQueue.put(pInfo);
c30c0c0f
VK
310 }
311 else
312 {
313 DbgPrintf(6, _T("DiscoveryPoller(): potential node %s rejected - broadcast/multicast address"), ipAddr.toString(buffer));
314 }
315 }
316 else
317 {
318 DbgPrintf(6, _T("DiscoveryPoller(): interface object found but IP address not found"));
5039dede 319 }
e4a64da2
VK
320 }
321 else
322 {
323 DbgPrintf(6, _T("DiscoveryPoller(): interface object not found"));
5039dede
AK
324 }
325 }
e4a64da2
VK
326 else
327 {
c75e9ee4 328 DbgPrintf(6, _T("DiscoveryPoller(): potential node %s rejected"), ipAddr.toString(buffer));
e4a64da2
VK
329 }
330}
331
efe76251
VK
332/**
333 * Check host route
334 * Host will be added if it is directly connected
335 */
e4a64da2
VK
336static void CheckHostRoute(Node *node, ROUTE *route)
337{
338 TCHAR buffer[16];
339 Interface *iface;
340
341 DbgPrintf(6, _T("DiscoveryPoller(): checking host route %s at %d"), IpToStr(route->dwDestAddr, buffer), route->dwIfIndex);
c75e9ee4 342 iface = node->findInterfaceByIndex(route->dwIfIndex);
c30c0c0f 343 if ((iface != NULL) && iface->getIpAddressList()->findSameSubnetAddress(route->dwDestAddr).isValidUnicast())
e4a64da2
VK
344 {
345 CheckPotentialNode(node, route->dwDestAddr, route->dwIfIndex);
346 }
347 else
348 {
349 DbgPrintf(6, _T("DiscoveryPoller(): interface object not found for host route"));
350 }
5039dede
AK
351}
352
efe76251
VK
353/**
354 * Discovery poller
355 */
208d7427 356static void DiscoveryPoller(void *arg)
5039dede 357{
208d7427
VK
358// TCHAR szBuffer[MAX_OBJECT_NAME + 64], szIpAddr[64];
359// ARP_CACHE *pArpCache;
360// ROUTING_TABLE *rt;
5039dede 361
208d7427
VK
362 PollerInfo *poller = (PollerInfo *)arg;
363 poller->startExecution();
5039dede 364
208d7427
VK
365 Node *pNode = (Node *)poller->getObject();
366 if (pNode->getRuntimeFlags() & NDF_DELETE_IN_PROGRESS)
367 {
7c521895 368 pNode->setDiscoveryPollTimeStamp();
208d7427
VK
369 delete poller;
370 return;
371 }
040c45fa 372
208d7427
VK
373 DbgPrintf(4, _T("Starting discovery poll for node %s (%s) in zone %d"),
374 pNode->getName(), (const TCHAR *)pNode->getIpAddress().toString(), (int)pNode->getZoneId());
040c45fa 375
208d7427
VK
376 // Retrieve and analize node's ARP cache
377 ARP_CACHE *pArpCache = pNode->getArpCache();
378 if (pArpCache != NULL)
040c45fa 379 {
208d7427
VK
380 for(UINT32 i = 0; i < pArpCache->dwNumEntries; i++)
381 if (memcmp(pArpCache->pEntries[i].bMacAddr, "\xFF\xFF\xFF\xFF\xFF\xFF", 6)) // Ignore broadcast addresses
382 CheckPotentialNode(pNode, pArpCache->pEntries[i].ipAddr, pArpCache->pEntries[i].dwIndex, pArpCache->pEntries[i].bMacAddr);
383 DestroyArpCache(pArpCache);
040c45fa 384 }
d5e8ff90 385
208d7427
VK
386 // Retrieve and analize node's routing table
387 DbgPrintf(5, _T("Discovery poll for node %s (%s) - reading routing table"),
388 pNode->getName(), (const TCHAR *)pNode->getIpAddress().toString());
389 ROUTING_TABLE *rt = pNode->getRoutingTable();
390 if (rt != NULL)
391 {
392 for(int i = 0; i < rt->iNumEntries; i++)
393 {
394 CheckPotentialNode(pNode, rt->pRoutes[i].dwNextHop, rt->pRoutes[i].dwIfIndex);
395 if ((rt->pRoutes[i].dwDestMask == 0xFFFFFFFF) && (rt->pRoutes[i].dwDestAddr != 0))
396 CheckHostRoute(pNode, &rt->pRoutes[i]);
397 }
398 DestroyRoutingTable(rt);
399 }
d5e8ff90 400
208d7427
VK
401 DbgPrintf(4, _T("Finished discovery poll for node %s (%s)"),
402 pNode->getName(), (const TCHAR *)pNode->getIpAddress().toString());
403 pNode->setDiscoveryPollTimeStamp();
404 delete poller;
d5e8ff90
VK
405}
406
efe76251
VK
407/**
408 * Check given address range with ICMP ping for new nodes
409 */
967893bb 410static void CheckRange(int nType, UINT32 dwAddr1, UINT32 dwAddr2)
5039dede 411{
967893bb 412 UINT32 dwAddr, dwFrom, dwTo;
5039dede
AK
413 TCHAR szIpAddr1[16], szIpAddr2[16];
414
415 if (nType == 0)
416 {
417 dwFrom = (dwAddr1 & dwAddr2) + 1;
418 dwTo = dwFrom | ~dwAddr2 - 1;
419 }
420 else
421 {
422 dwFrom = dwAddr1;
423 dwTo = dwAddr2;
424 }
425 DbgPrintf(4, _T("Starting active discovery check on range %s - %s"),
426 IpToStr(dwFrom, szIpAddr1), IpToStr(dwTo, szIpAddr2));
427
428 for(dwAddr = dwFrom; dwAddr <= dwTo; dwAddr++)
429 {
c59466d2 430 if (IcmpPing(htonl(dwAddr), 3, g_icmpPingTimeout, NULL, g_icmpPingSize) == ICMP_SUCCESS)
5039dede
AK
431 {
432 DbgPrintf(5, _T("Active discovery - node %s responds to ICMP ping"),
433 IpToStr(dwAddr, szIpAddr1));
89135050 434 if (FindNodeByIP(0, dwAddr) == NULL)
5039dede
AK
435 {
436 Subnet *pSubnet;
437
89135050 438 pSubnet = FindSubnetForNode(0, dwAddr);
5039dede
AK
439 if (pSubnet != NULL)
440 {
c75e9ee4
VK
441 if (!pSubnet->getIpAddress().equals(dwAddr) &&
442 !InetAddress(dwAddr).isSubnetBroadcast(pSubnet->getIpAddress().getMaskBits()))
5039dede
AK
443 {
444 NEW_NODE *pInfo;
445
446 pInfo = (NEW_NODE *)malloc(sizeof(NEW_NODE));
c75e9ee4
VK
447 pInfo->ipAddr = dwAddr;
448 pInfo->ipAddr.setMaskBits(pSubnet->getIpAddress().getMaskBits());
dc25b21c 449 pInfo->zoneId = 0; /* FIXME: add correct zone ID */
5039dede 450 pInfo->ignoreFilter = FALSE;
baa5324c 451 memset(pInfo->bMacAddr, 0, MAC_ADDR_LENGTH);
19dbc8ef 452 g_nodePollerQueue.put(pInfo);
5039dede
AK
453 }
454 }
455 else
456 {
457 NEW_NODE *pInfo;
458
459 pInfo = (NEW_NODE *)malloc(sizeof(NEW_NODE));
c75e9ee4 460 pInfo->ipAddr = dwAddr;
dc25b21c 461 pInfo->zoneId = 0; /* FIXME: add correct zone ID */
5039dede 462 pInfo->ignoreFilter = FALSE;
baa5324c 463 memset(pInfo->bMacAddr, 0, MAC_ADDR_LENGTH);
19dbc8ef 464 g_nodePollerQueue.put(pInfo);
5039dede
AK
465 }
466 }
467 }
468 }
469
470 DbgPrintf(4, _T("Finished active discovery check on range %s - %s"),
471 IpToStr(dwFrom, szIpAddr1), IpToStr(dwTo, szIpAddr2));
472}
473
efe76251
VK
474/**
475 * Active discovery poller thread
476 */
5039dede
AK
477static THREAD_RESULT THREAD_CALL ActiveDiscoveryPoller(void *arg)
478{
208d7427 479 int nInterval = ConfigReadInt(_T("ActiveDiscoveryInterval"), 7200);
5039dede
AK
480
481 // Main loop
89135050 482 while(!IsShutdownInProgress())
5039dede 483 {
5039dede
AK
484 if (SleepAndCheckForShutdown(nInterval))
485 break;
486
c8076b19 487 if (!(g_flags & AF_ACTIVE_NETWORK_DISCOVERY))
5039dede
AK
488 continue;
489
208d7427 490 DB_RESULT hResult = DBSelect(g_hCoreDB, _T("SELECT addr_type,addr1,addr2 FROM address_lists WHERE list_type=1"));
5039dede
AK
491 if (hResult != NULL)
492 {
208d7427
VK
493 int nRows = DBGetNumRows(hResult);
494 for(int i = 0; i < nRows; i++)
5039dede
AK
495 {
496 CheckRange(DBGetFieldLong(hResult, i, 0),
497 DBGetFieldIPAddr(hResult, i, 1),
498 DBGetFieldIPAddr(hResult, i, 2));
499 }
500 DBFreeResult(hResult);
501 }
502 }
5039dede
AK
503 return THREAD_OK;
504}
505
efe76251
VK
506/**
507 * Callback for queueing objects for polling
508 */
d5e19c61
VK
509static void QueueForPolling(NetObj *object, void *data)
510{
c42b4551 511 switch(object->getObjectClass())
d5e19c61
VK
512 {
513 case OBJECT_NODE:
514 {
515 Node *node = (Node *)object;
516 if (node->isReadyForConfigurationPoll())
517 {
d5e19c61 518 node->lockForConfigurationPoll();
c42b4551 519 DbgPrintf(6, _T("Node %d \"%s\" queued for configuration poll"), (int)node->getId(), node->getName());
208d7427 520 ThreadPoolExecute(g_pollerThreadPool, node, &Node::configurationPoll, RegisterPoller(POLLER_TYPE_CONFIGURATION, node));
d5e19c61 521 }
805171de
VK
522 if (node->isReadyForInstancePoll())
523 {
805171de
VK
524 node->lockForInstancePoll();
525 DbgPrintf(6, _T("Node %d \"%s\" queued for instance discovery poll"), (int)node->getId(), node->getName());
208d7427 526 ThreadPoolExecute(g_pollerThreadPool, node, &Node::instanceDiscoveryPoll, RegisterPoller(POLLER_TYPE_INSTANCE_DISCOVERY, node));
805171de 527 }
d5e19c61
VK
528 if (node->isReadyForStatusPoll())
529 {
d5e19c61 530 node->lockForStatusPoll();
c42b4551 531 DbgPrintf(6, _T("Node %d \"%s\" queued for status poll"), (int)node->getId(), node->getName());
208d7427 532 ThreadPoolExecute(g_pollerThreadPool, node, &Node::statusPoll, RegisterPoller(POLLER_TYPE_STATUS, node));
d5e19c61
VK
533 }
534 if (node->isReadyForRoutePoll())
535 {
d5e19c61 536 node->lockForRoutePoll();
c42b4551 537 DbgPrintf(6, _T("Node %d \"%s\" queued for routing table poll"), (int)node->getId(), node->getName());
208d7427 538 ThreadPoolExecute(g_pollerThreadPool, node, &Node::routingTablePoll, RegisterPoller(POLLER_TYPE_ROUTING_TABLE, node));
d5e19c61
VK
539 }
540 if (node->isReadyForDiscoveryPoll())
541 {
d5e19c61 542 node->lockForDiscoveryPoll();
c42b4551 543 DbgPrintf(6, _T("Node %d \"%s\" queued for discovery poll"), (int)node->getId(), node->getName());
208d7427 544 ThreadPoolExecute(g_pollerThreadPool, DiscoveryPoller, RegisterPoller(POLLER_TYPE_DISCOVERY, node));
d5e19c61
VK
545 }
546 if (node->isReadyForTopologyPoll())
547 {
d5e19c61 548 node->lockForTopologyPoll();
c42b4551 549 DbgPrintf(6, _T("Node %d \"%s\" queued for topology poll"), (int)node->getId(), node->getName());
208d7427 550 ThreadPoolExecute(g_pollerThreadPool, node, &Node::topologyPoll, RegisterPoller(POLLER_TYPE_TOPOLOGY, node));
d5e19c61
VK
551 }
552 }
553 break;
554 case OBJECT_CONDITION:
555 {
556 Condition *cond = (Condition *)object;
9fddfb91 557 if (cond->isReadyForPoll())
d5e19c61 558 {
9fddfb91 559 cond->lockForPoll();
c42b4551 560 DbgPrintf(6, _T("Condition %d \"%s\" queued for poll"), (int)object->getId(), object->getName());
208d7427 561 ThreadPoolExecute(g_pollerThreadPool, cond, &Condition::doPoll, RegisterPoller(POLLER_TYPE_CONDITION, cond));
d5e19c61
VK
562 }
563 }
564 break;
565 case OBJECT_CLUSTER:
566 {
567 Cluster *cluster = (Cluster *)object;
568 if (cluster->isReadyForStatusPoll())
569 {
d5e19c61 570 cluster->lockForStatusPoll();
c42b4551 571 DbgPrintf(6, _T("Cluster %d \"%s\" queued for status poll"), (int)cluster->getId(), cluster->getName());
208d7427 572 ThreadPoolExecute(g_pollerThreadPool, cluster, &Cluster::statusPoll, RegisterPoller(POLLER_TYPE_STATUS, cluster));
d5e19c61
VK
573 }
574 }
575 break;
5a4dc0c6 576 case OBJECT_BUSINESSSERVICE:
d5e8ff90 577 {
5a4dc0c6 578 BusinessService *service = (BusinessService *)object;
d5e8ff90
VK
579 if (service->isReadyForPolling())
580 {
d5e8ff90 581 service->lockForPolling();
c42b4551 582 DbgPrintf(6, _T("Business service %d \"%s\" queued for poll"), (int)object->getId(), object->getName());
208d7427 583 ThreadPoolExecute(g_pollerThreadPool, service, &BusinessService::poll, RegisterPoller(POLLER_TYPE_BUSINESS_SERVICE, service));
d5e8ff90
VK
584 }
585 }
586 break;
d5e19c61
VK
587 default:
588 break;
589 }
590}
591
8f3acc9b
VK
592/**
593 * Node and condition queuing thread
594 */
5039dede
AK
595THREAD_RESULT THREAD_CALL PollManager(void *pArg)
596{
208d7427 597 g_pollerThreadPool = ThreadPoolCreate(ConfigReadInt(_T("PollerThreadPoolBaseSize"), 10), ConfigReadInt(_T("PollerThreadPoolMaxSize"), 250), _T("POLLERS"));
d5e8ff90 598
5039dede 599 // Start active discovery poller
208d7427 600 ThreadCreate(ActiveDiscoveryPoller, 0, NULL);
5039dede 601
208d7427
VK
602 UINT32 watchdogId = WatchdogAddThread(_T("Poll Manager"), 60);
603 int counter = 0;
5039dede 604
89135050 605 while(!IsShutdownInProgress())
5039dede
AK
606 {
607 if (SleepAndCheckForShutdown(5))
608 break; // Shutdown has arrived
208d7427 609 WatchdogNotify(watchdogId);
5039dede
AK
610
611 // Check for management node every 10 minutes
208d7427
VK
612 counter++;
613 if (counter % 120 == 0)
5039dede 614 {
208d7427 615 counter = 0;
5039dede
AK
616 CheckForMgmtNode();
617 }
618
619 // Walk through objects and queue them for status
620 // and/or configuration poll
d5e19c61 621 g_idxObjectById.forEach(QueueForPolling, NULL);
5039dede
AK
622 }
623
208d7427
VK
624 g_nodePollerQueue.clear();
625 g_nodePollerQueue.put(INVALID_POINTER_VALUE);
5039dede 626
208d7427 627 ThreadPoolDestroy(g_pollerThreadPool);
35f836fe 628 DbgPrintf(1, _T("PollManager: main thread terminated"));
5039dede
AK
629 return THREAD_OK;
630}
631
8f3acc9b
VK
632/**
633 * Reset discovery poller after configuration change
634 */
040c45fa 635void ResetDiscoveryPoller()
5039dede 636{
5039dede
AK
637 NEW_NODE *pInfo;
638
208d7427 639 // Clear node poller queue
19dbc8ef 640 while((pInfo = (NEW_NODE *)g_nodePollerQueue.get()) != NULL)
5039dede
AK
641 {
642 if (pInfo != INVALID_POINTER_VALUE)
643 free(pInfo);
644 }
645
646 // Reload discovery parameters
35f836fe
VK
647 g_dwDiscoveryPollingInterval = ConfigReadInt(_T("DiscoveryPollingInterval"), 900);
648 if (ConfigReadInt(_T("RunNetworkDiscovery"), 0))
c8076b19 649 g_flags |= AF_ENABLE_NETWORK_DISCOVERY;
5039dede 650 else
c8076b19 651 g_flags &= ~AF_ENABLE_NETWORK_DISCOVERY;
5039dede 652
35f836fe 653 if (ConfigReadInt(_T("ActiveNetworkDiscovery"), 0))
c8076b19 654 g_flags |= AF_ACTIVE_NETWORK_DISCOVERY;
5039dede 655 else
c8076b19 656 g_flags &= ~AF_ACTIVE_NETWORK_DISCOVERY;
e02953a4
VK
657
658 if (ConfigReadInt(_T("UseSNMPTrapsForDiscovery"), 0))
c8076b19 659 g_flags |= AF_SNMP_TRAP_DISCOVERY;
e02953a4 660 else
c8076b19 661 g_flags &= ~AF_SNMP_TRAP_DISCOVERY;
5039dede 662}