Reworked Node flags. Instance, status and configuration polls functionality moved...
[public/netxms.git] / src / server / core / poll.cpp
CommitLineData
91b3cba9 1/*
5039dede 2** NetXMS - Network Management System
a191c634 3** Copyright (C) 2003-2017 Victor Kirhenshtein
5039dede
AK
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: poll.cpp
20**
21**/
22
23#include "nxcore.h"
24
8f3acc9b 25/**
208d7427 26 * Node poller queue (polls new nodes)
8f3acc9b 27 */
208d7427
VK
28Queue g_nodePollerQueue;
29
30/**
31 * Thread pool for pollers
32 */
33ThreadPool *g_pollerThreadPool = NULL;
34
35/**
36 * Active pollers
37 */
38static HashMap<UINT64, PollerInfo> s_pollers(false);
e30c2442 39static Mutex s_pollerLock;
208d7427
VK
40
41/**
42 * Poller info destructor - will unregister poller and decrease ref count on object
43 */
44PollerInfo::~PollerInfo()
5039dede 45{
e30c2442 46 s_pollerLock.lock();
208d7427 47 s_pollers.remove(CAST_FROM_POINTER(this, UINT64));
e30c2442 48 s_pollerLock.unlock();
208d7427
VK
49 m_object->decRefCount();
50}
5039dede 51
8f3acc9b 52/**
208d7427 53 * Register active poller
8f3acc9b 54 */
208d7427
VK
55PollerInfo *RegisterPoller(PollerType type, NetObj *object)
56{
57 PollerInfo *p = new PollerInfo(type, object);
58 object->incRefCount();
e30c2442 59 s_pollerLock.lock();
208d7427 60 s_pollers.set(CAST_FROM_POINTER(p, UINT64), p);
e30c2442 61 s_pollerLock.unlock();
208d7427
VK
62 return p;
63}
5039dede 64
8f3acc9b 65/**
208d7427 66 * Show poller information on console
8f3acc9b 67 */
208d7427
VK
68static EnumerationCallbackResult ShowPollerInfo(const void *key, const void *object, void *arg)
69{
5eb5bae3 70 static const TCHAR *pollerType[] = { _T("STAT"), _T("CONF"), _T("INST"), _T("ROUT"), _T("DISC"), _T("BSVC"), _T("COND"), _T("TOPO") };
208d7427
VK
71
72 PollerInfo *p = (PollerInfo *)object;
73 NetObj *o = p->getObject();
74
75 TCHAR name[32];
76 nx_strncpy(name, o->getName(), 31);
91b3cba9 77 ConsolePrintf((CONSOLE_CTX)arg, _T("%s | %9d | %-30s | %s\n"), pollerType[p->getType()], o->getId(), name, p->getStatus());
208d7427
VK
78
79 return _CONTINUE;
80}
81
82/**
83 * Get poller diagnostic
84 */
85void ShowPollers(CONSOLE_CTX console)
86{
87 ConsoleWrite(console, _T("Type | Object ID | Object name | Status\n")
88 _T("-----+-----------+--------------------------------+--------------------------\n"));
e30c2442 89 s_pollerLock.lock();
208d7427 90 s_pollers.forEach(ShowPollerInfo, console);
e30c2442 91 s_pollerLock.unlock();
208d7427 92}
5039dede 93
9f3db00f
VK
94/**
95 * Helper for AddThreadPoolMonitoringParameters
96 */
97inline TCHAR *BuildParamName(const TCHAR *format, const TCHAR *pool, TCHAR *buffer)
98{
99 _sntprintf(buffer, 256, format, pool);
100 return buffer;
101}
102
8f3acc9b
VK
103/**
104 * Create management node object
105 */
c30c0c0f 106static void CreateManagementNode(const InetAddress& addr)
d749debc 107{
f7694811
VK
108 TCHAR buffer[256];
109
91b3cba9 110 Node *node = new Node(addr, 0, NC_IS_LOCAL_MGMT, 0, 0, 0, 0, 0);
e7450f3b 111 NetObjInsert(node, true, false);
9f3db00f 112 node->setName(GetLocalHostName(buffer, 256));
208d7427 113
9f3db00f 114 PollerInfo *p = RegisterPoller(POLLER_TYPE_CONFIGURATION, node);
208d7427 115 p->startExecution();
9f3db00f 116 node->configurationPoll(NULL, 0, p, addr.getMaskBits());
208d7427
VK
117 delete p;
118
9f3db00f
VK
119 node->unhide();
120 g_dwMgmtNode = node->getId(); // Set local management node ID
121 PostEvent(EVENT_NODE_ADDED, node->getId(), NULL);
f7694811
VK
122
123 // Bind to the root of service tree
1f8be1f4
VK
124 g_pServiceRoot->addChild(node);
125 node->addParent(g_pServiceRoot);
d749debc
VK
126}
127
8f3acc9b 128/**
efe76251 129 * Callback to clear incorrectly set local management node flag
8f3acc9b 130 */
d5e19c61
VK
131static void CheckMgmtFlagCallback(NetObj *object, void *data)
132{
c42b4551 133 if ((g_dwMgmtNode != object->getId()) && ((Node *)object)->isLocalManagement())
d5e19c61
VK
134 {
135 ((Node *)object)->clearLocalMgmtFlag();
91b3cba9 136 DbgPrintf(2, _T("Incorrectly set flag NC_IS_LOCAL_MGMT cleared from node %s [%d]"),
c42b4551 137 object->getName(), object->getId());
d5e19c61
VK
138 }
139}
140
efe76251
VK
141/**
142 * Comparator to find management node object in existing nodes
143 */
d5e19c61
VK
144static bool LocalMgmtNodeComparator(NetObj *object, void *data)
145{
146 return ((Node *)object)->isLocalManagement();
147}
148
efe76251
VK
149/**
150 * Check if management server node presented in node list
151 */
040c45fa 152void CheckForMgmtNode()
5039dede 153{
98762401 154 InterfaceList *pIfList;
9f3db00f 155 Node *node;
5039dede
AK
156 int i;
157
158 pIfList = GetLocalInterfaceList();
159 if (pIfList != NULL)
160 {
a6312bd6 161 for(i = 0; i < pIfList->size(); i++)
fcec42a9 162 {
c30c0c0f
VK
163 InterfaceInfo *iface = pIfList->get(i);
164 if (iface->type == IFTYPE_SOFTWARE_LOOPBACK)
fcec42a9 165 continue;
9f3db00f 166 if ((node = FindNodeByIP(0, &iface->ipAddrList)) != NULL)
5039dede
AK
167 {
168 // Check management node flag
91b3cba9 169 if (!(node->getFlags() & NC_IS_LOCAL_MGMT))
5039dede 170 {
9f3db00f 171 node->setLocalMgmtFlag();
91b3cba9 172 DbgPrintf(1, _T("Local management node %s [%d] was not have NC_IS_LOCAL_MGMT flag set"), node->getName(), node->getId());
5039dede 173 }
9f3db00f 174 g_dwMgmtNode = node->getId(); // Set local management node ID
5039dede
AK
175 break;
176 }
fcec42a9 177 }
a6312bd6 178 if (i == pIfList->size()) // No such node
5039dede
AK
179 {
180 // Find interface with IP address
a6312bd6 181 for(i = 0; i < pIfList->size(); i++)
fcec42a9 182 {
c30c0c0f
VK
183 InterfaceInfo *iface = pIfList->get(i);
184 if ((iface->type == IFTYPE_SOFTWARE_LOOPBACK) || (iface->ipAddrList.size() == 0))
185 continue;
186
187 for(int j = 0; j < iface->ipAddrList.size(); j++)
5039dede 188 {
c30c0c0f
VK
189 const InetAddress& addr = iface->ipAddrList.get(j);
190 if (addr.isValidUnicast())
191 {
192 CreateManagementNode(addr);
1a5cb2b0 193 i = pIfList->size(); // stop walking interface list
c30c0c0f
VK
194 break;
195 }
5039dede 196 }
fcec42a9 197 }
5039dede 198 }
98762401 199 delete pIfList;
5039dede 200 }
4d0c32f3 201
d749debc
VK
202 if (g_dwMgmtNode != 0)
203 {
91b3cba9 204 // Check that other nodes does not have NC_IS_LOCAL_MGMT flag set
d5e19c61 205 g_idxNodeById.forEach(CheckMgmtFlagCallback, NULL);
d749debc
VK
206 }
207 else
208 {
209 // Management node cannot be found or created. This can happen
210 // if management node currently does not have IP addresses (for example,
211 // it's a Windows machine which is disconnected from the network).
91b3cba9 212 // In this case, try to find any node with NC_IS_LOCAL_MGMT flag, or create
d749debc 213 // new one without interfaces
d5e19c61
VK
214 NetObj *mgmtNode = g_idxNodeById.find(LocalMgmtNodeComparator, NULL);
215 if (mgmtNode != NULL)
d749debc 216 {
c42b4551 217 g_dwMgmtNode = mgmtNode->getId();
d749debc 218 }
d5e19c61 219 else
d749debc 220 {
c30c0c0f 221 CreateManagementNode(InetAddress());
d749debc
VK
222 }
223 }
5039dede
AK
224}
225
efe76251
VK
226/**
227 * Comparator for poller queue elements
228 */
85f1fea1
VK
229static bool PollerQueueElementComparator(void *key, void *element)
230{
c75e9ee4 231 return ((InetAddress *)key)->equals(((NEW_NODE *)element)->ipAddr);
85f1fea1
VK
232}
233
4875c4df
VK
234/**
235 * Check potential new node from sysog or SNMP trap
236 */
a191c634 237void CheckPotentialNode(const InetAddress& ipAddr, UINT32 zoneUIN)
4875c4df
VK
238{
239 TCHAR buffer[64];
a191c634 240 nxlog_debug(6, _T("CheckPotentialNode(): checking address %s in zone %d"), ipAddr.toString(buffer), zoneUIN);
4875c4df
VK
241 if (!ipAddr.isValid() || ipAddr.isBroadcast() || ipAddr.isLoopback() || ipAddr.isMulticast())
242 {
243 nxlog_debug(6, _T("CheckPotentialNode(): potential node %s rejected (IP address is not a valid unicast address)"), ipAddr.toString(buffer));
244 return;
245 }
246
a191c634 247 Node *curr = FindNodeByIP(zoneUIN, ipAddr);
4875c4df
VK
248 if (curr != NULL)
249 {
250 nxlog_debug(6, _T("CheckPotentialNode(): potential node %s rejected (IP address already known at node %s [%d])"),
251 ipAddr.toString(buffer), curr->getName(), curr->getId());
252 return;
253 }
254
a191c634 255 if (IsClusterIP(zoneUIN, ipAddr))
4875c4df
VK
256 {
257 nxlog_debug(6, _T("CheckPotentialNode(): potential node %s rejected (IP address is known as cluster resource address)"), ipAddr.toString(buffer));
258 return;
259 }
260
261 if (g_nodePollerQueue.find((void *)&ipAddr, PollerQueueElementComparator) != NULL)
262 {
263 nxlog_debug(6, _T("CheckPotentialNode(): potential node %s rejected (IP address already queued for polling)"), ipAddr.toString(buffer));
264 return;
265 }
266
a191c634 267 Subnet *subnet = FindSubnetForNode(zoneUIN, ipAddr);
4875c4df
VK
268 if (subnet != NULL)
269 {
270 if (!subnet->getIpAddress().equals(ipAddr) && !ipAddr.isSubnetBroadcast(subnet->getIpAddress().getMaskBits()))
271 {
272 NEW_NODE *pInfo = (NEW_NODE *)malloc(sizeof(NEW_NODE));
273 pInfo->ipAddr = ipAddr;
274 pInfo->ipAddr.setMaskBits(subnet->getIpAddress().getMaskBits());
a191c634 275 pInfo->zoneUIN = zoneUIN;
4875c4df
VK
276 pInfo->ignoreFilter = FALSE;
277 memset(pInfo->bMacAddr, 0, MAC_ADDR_LENGTH);
278 nxlog_debug(5, _T("CheckPotentialNode(): new node queued: %s/%d"), pInfo->ipAddr.toString(buffer), pInfo->ipAddr.getMaskBits());
279 g_nodePollerQueue.put(pInfo);
280 }
281 else
282 {
283 nxlog_debug(6, _T("CheckPotentialNode(): potential node %s rejected (IP address is a base or broadcast address of existing subnet)"), ipAddr.toString(buffer));
284 }
285 }
286 else
287 {
288 NEW_NODE *pInfo = (NEW_NODE *)malloc(sizeof(NEW_NODE));
289 pInfo->ipAddr = ipAddr;
a191c634 290 pInfo->zoneUIN = zoneUIN;
4875c4df
VK
291 pInfo->ignoreFilter = FALSE;
292 memset(pInfo->bMacAddr, 0, MAC_ADDR_LENGTH);
293 nxlog_debug(5, _T("CheckPotentialNode(): new node queued: %s/%d"), pInfo->ipAddr.toString(buffer), pInfo->ipAddr.getMaskBits());
294 g_nodePollerQueue.put(pInfo);
295 }
296}
297
efe76251
VK
298/**
299 * Check potential new node from ARP cache or routing table
300 */
c75e9ee4 301static void CheckPotentialNode(Node *node, const InetAddress& ipAddr, UINT32 ifIndex, BYTE *macAddr = NULL)
5039dede 302{
4875c4df
VK
303 TCHAR buffer[64];
304 nxlog_debug(6, _T("DiscoveryPoller(): checking potential node %s at %s:%d"), ipAddr.toString(buffer), node->getName(), ifIndex);
2ef51625
VK
305 if (!ipAddr.isValid() || ipAddr.isBroadcast() || ipAddr.isLoopback() || ipAddr.isMulticast())
306 {
307 nxlog_debug(6, _T("DiscoveryPoller(): potential node %s rejected (IP address is not a valid unicast address)"), ipAddr.toString(buffer));
308 return;
309 }
e4a64da2 310
a191c634 311 Node *curr = FindNodeByIP(node->getZoneUIN(), ipAddr);
2ef51625 312 if (curr != NULL)
5039dede 313 {
2ef51625
VK
314 nxlog_debug(6, _T("DiscoveryPoller(): potential node %s rejected (IP address already known at node %s [%d])"),
315 ipAddr.toString(buffer), curr->getName(), curr->getId());
316 return;
317 }
c30c0c0f 318
a191c634 319 if (IsClusterIP(node->getZoneUIN(), ipAddr))
2ef51625
VK
320 {
321 nxlog_debug(6, _T("DiscoveryPoller(): potential node %s rejected (IP address is known as cluster resource address)"), ipAddr.toString(buffer));
322 return;
323 }
324
325 if (g_nodePollerQueue.find((void *)&ipAddr, PollerQueueElementComparator) != NULL)
326 {
327 nxlog_debug(6, _T("DiscoveryPoller(): potential node %s rejected (IP address already queued for polling)"), ipAddr.toString(buffer));
328 return;
329 }
330
331 Interface *pInterface = node->findInterfaceByIndex(ifIndex);
332 if (pInterface != NULL)
333 {
334 const InetAddress& interfaceAddress = pInterface->getIpAddressList()->findSameSubnetAddress(ipAddr);
335 if (interfaceAddress.isValidUnicast())
336 {
337 nxlog_debug(6, _T("DiscoveryPoller(): interface found: %s [%d] addr=%s/%d ifIndex=%d"),
338 pInterface->getName(), pInterface->getId(), interfaceAddress.toString(buffer), interfaceAddress.getMaskBits(), pInterface->getIfIndex());
339 if (!ipAddr.isSubnetBroadcast(interfaceAddress.getMaskBits()))
340 {
341 NEW_NODE *pInfo = (NEW_NODE *)malloc(sizeof(NEW_NODE));
342 pInfo->ipAddr = ipAddr;
343 pInfo->ipAddr.setMaskBits(interfaceAddress.getMaskBits());
a191c634 344 pInfo->zoneUIN = node->getZoneUIN();
2ef51625
VK
345 pInfo->ignoreFilter = FALSE;
346 if (macAddr == NULL)
347 memset(pInfo->bMacAddr, 0, MAC_ADDR_LENGTH);
348 else
349 memcpy(pInfo->bMacAddr, macAddr, MAC_ADDR_LENGTH);
350 DbgPrintf(5, _T("DiscoveryPoller(): new node queued: %s/%d"),
351 pInfo->ipAddr.toString(buffer), pInfo->ipAddr.getMaskBits());
352 g_nodePollerQueue.put(pInfo);
c30c0c0f
VK
353 }
354 else
355 {
2ef51625 356 nxlog_debug(6, _T("DiscoveryPoller(): potential node %s rejected - broadcast/multicast address"), ipAddr.toString(buffer));
5039dede 357 }
2ef51625
VK
358 }
359 else
360 {
361 nxlog_debug(6, _T("DiscoveryPoller(): interface object found but IP address not found"));
362 }
363 }
364 else
365 {
366 nxlog_debug(6, _T("DiscoveryPoller(): interface object not found"));
5039dede 367 }
e4a64da2
VK
368}
369
efe76251
VK
370/**
371 * Check host route
372 * Host will be added if it is directly connected
373 */
e4a64da2
VK
374static void CheckHostRoute(Node *node, ROUTE *route)
375{
376 TCHAR buffer[16];
377 Interface *iface;
378
379 DbgPrintf(6, _T("DiscoveryPoller(): checking host route %s at %d"), IpToStr(route->dwDestAddr, buffer), route->dwIfIndex);
c75e9ee4 380 iface = node->findInterfaceByIndex(route->dwIfIndex);
c30c0c0f 381 if ((iface != NULL) && iface->getIpAddressList()->findSameSubnetAddress(route->dwDestAddr).isValidUnicast())
e4a64da2
VK
382 {
383 CheckPotentialNode(node, route->dwDestAddr, route->dwIfIndex);
384 }
385 else
386 {
387 DbgPrintf(6, _T("DiscoveryPoller(): interface object not found for host route"));
388 }
5039dede
AK
389}
390
efe76251
VK
391/**
392 * Discovery poller
393 */
208d7427 394static void DiscoveryPoller(void *arg)
5039dede 395{
208d7427
VK
396 PollerInfo *poller = (PollerInfo *)arg;
397 poller->startExecution();
5039dede 398
88dc9091
VK
399 if (IsShutdownInProgress())
400 {
401 delete poller;
402 return;
403 }
404
9f3db00f 405 Node *node = (Node *)poller->getObject();
91b3cba9 406 if (node->getRuntimeFlags() & DCDF_DELETE_IN_PROGRESS)
208d7427 407 {
9f3db00f 408 node->setDiscoveryPollTimeStamp();
208d7427
VK
409 delete poller;
410 return;
411 }
040c45fa 412
208d7427 413 DbgPrintf(4, _T("Starting discovery poll for node %s (%s) in zone %d"),
a191c634 414 node->getName(), (const TCHAR *)node->getIpAddress().toString(), (int)node->getZoneUIN());
040c45fa 415
c77bdd19 416 // Retrieve and analyze node's ARP cache
9f3db00f 417 ARP_CACHE *pArpCache = node->getArpCache();
208d7427 418 if (pArpCache != NULL)
040c45fa 419 {
208d7427
VK
420 for(UINT32 i = 0; i < pArpCache->dwNumEntries; i++)
421 if (memcmp(pArpCache->pEntries[i].bMacAddr, "\xFF\xFF\xFF\xFF\xFF\xFF", 6)) // Ignore broadcast addresses
9f3db00f 422 CheckPotentialNode(node, pArpCache->pEntries[i].ipAddr, pArpCache->pEntries[i].dwIndex, pArpCache->pEntries[i].bMacAddr);
208d7427 423 DestroyArpCache(pArpCache);
040c45fa 424 }
d5e8ff90 425
c77bdd19 426 // Retrieve and analyze node's routing table
208d7427 427 DbgPrintf(5, _T("Discovery poll for node %s (%s) - reading routing table"),
9f3db00f
VK
428 node->getName(), (const TCHAR *)node->getIpAddress().toString());
429 ROUTING_TABLE *rt = node->getRoutingTable();
208d7427
VK
430 if (rt != NULL)
431 {
432 for(int i = 0; i < rt->iNumEntries; i++)
433 {
9f3db00f 434 CheckPotentialNode(node, rt->pRoutes[i].dwNextHop, rt->pRoutes[i].dwIfIndex);
208d7427 435 if ((rt->pRoutes[i].dwDestMask == 0xFFFFFFFF) && (rt->pRoutes[i].dwDestAddr != 0))
9f3db00f 436 CheckHostRoute(node, &rt->pRoutes[i]);
208d7427
VK
437 }
438 DestroyRoutingTable(rt);
439 }
d5e8ff90 440
208d7427 441 DbgPrintf(4, _T("Finished discovery poll for node %s (%s)"),
9f3db00f
VK
442 node->getName(), (const TCHAR *)node->getIpAddress().toString());
443 node->setDiscoveryPollTimeStamp();
208d7427 444 delete poller;
d5e8ff90
VK
445}
446
efe76251
VK
447/**
448 * Check given address range with ICMP ping for new nodes
449 */
ba889094 450static void CheckRange(const InetAddressListElement& range)
5039dede 451{
ba889094 452 if (range.getBaseAddress().getFamily() != AF_INET)
5039dede 453 {
ba889094
VK
454 DbgPrintf(4, _T("Active discovery on range %s skipped - only IPv4 ranges supported"), (const TCHAR *)range.toString());
455 return;
456 }
457
458 UINT32 from = range.getBaseAddress().getAddressV4();
459 UINT32 to;
460 if (range.getType() == InetAddressListElement_SUBNET)
461 {
462 from++;
463 to = range.getBaseAddress().getSubnetBroadcast().getAddressV4() - 1;
5039dede
AK
464 }
465 else
466 {
ba889094 467 to = range.getEndAddress().getAddressV4();
5039dede 468 }
5039dede 469
fbcf3182
VK
470 TCHAR ipAddr1[16], ipAddr2[16];
471 DbgPrintf(4, _T("Starting active discovery check on range %s - %s"), IpToStr(from, ipAddr1), IpToStr(to, ipAddr2));
472
88dc9091 473 for(UINT32 curr = from; (curr <= to) && !IsShutdownInProgress(); curr++)
5039dede 474 {
fbcf3182
VK
475 InetAddress addr = InetAddress(curr);
476 if (IcmpPing(addr, 3, g_icmpPingTimeout, NULL, g_icmpPingSize) == ICMP_SUCCESS)
5039dede 477 {
fbcf3182
VK
478 DbgPrintf(5, _T("Active discovery - node %s responds to ICMP ping"), addr.toString(ipAddr1));
479 if (FindNodeByIP(0, addr) == NULL)
5039dede
AK
480 {
481 Subnet *pSubnet;
482
fbcf3182 483 pSubnet = FindSubnetForNode(0, addr);
5039dede
AK
484 if (pSubnet != NULL)
485 {
91b3cba9 486 if (!pSubnet->getIpAddress().equals(addr) &&
fbcf3182 487 !addr.isSubnetBroadcast(pSubnet->getIpAddress().getMaskBits()))
5039dede
AK
488 {
489 NEW_NODE *pInfo;
490
491 pInfo = (NEW_NODE *)malloc(sizeof(NEW_NODE));
fbcf3182 492 pInfo->ipAddr = addr;
c75e9ee4 493 pInfo->ipAddr.setMaskBits(pSubnet->getIpAddress().getMaskBits());
a191c634 494 pInfo->zoneUIN = 0; /* FIXME: add correct zone ID */
5039dede 495 pInfo->ignoreFilter = FALSE;
baa5324c 496 memset(pInfo->bMacAddr, 0, MAC_ADDR_LENGTH);
19dbc8ef 497 g_nodePollerQueue.put(pInfo);
5039dede
AK
498 }
499 }
500 else
501 {
502 NEW_NODE *pInfo;
503
504 pInfo = (NEW_NODE *)malloc(sizeof(NEW_NODE));
fbcf3182 505 pInfo->ipAddr = addr;
a191c634 506 pInfo->zoneUIN = 0; /* FIXME: add correct zone ID */
5039dede 507 pInfo->ignoreFilter = FALSE;
baa5324c 508 memset(pInfo->bMacAddr, 0, MAC_ADDR_LENGTH);
19dbc8ef 509 g_nodePollerQueue.put(pInfo);
5039dede
AK
510 }
511 }
512 }
513 }
514
fbcf3182 515 DbgPrintf(4, _T("Finished active discovery check on range %s - %s"), IpToStr(from, ipAddr1), IpToStr(to, ipAddr2));
5039dede
AK
516}
517
efe76251
VK
518/**
519 * Active discovery poller thread
520 */
5039dede
AK
521static THREAD_RESULT THREAD_CALL ActiveDiscoveryPoller(void *arg)
522{
208d7427 523 int nInterval = ConfigReadInt(_T("ActiveDiscoveryInterval"), 7200);
5039dede
AK
524
525 // Main loop
89135050 526 while(!IsShutdownInProgress())
5039dede 527 {
5039dede
AK
528 if (SleepAndCheckForShutdown(nInterval))
529 break;
530
c8076b19 531 if (!(g_flags & AF_ACTIVE_NETWORK_DISCOVERY))
5039dede
AK
532 continue;
533
9bd1bace
VK
534 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
535 DB_RESULT hResult = DBSelect(hdb, _T("SELECT addr_type,addr1,addr2 FROM address_lists WHERE list_type=1"));
536 DBConnectionPoolReleaseConnection(hdb);
5039dede
AK
537 if (hResult != NULL)
538 {
208d7427 539 int nRows = DBGetNumRows(hResult);
88dc9091 540 for(int i = 0; (i < nRows) && !IsShutdownInProgress(); i++)
5039dede 541 {
ba889094 542 CheckRange(InetAddressListElement(hResult, i));
5039dede
AK
543 }
544 DBFreeResult(hResult);
545 }
546 }
5039dede
AK
547 return THREAD_OK;
548}
549
efe76251
VK
550/**
551 * Callback for queueing objects for polling
552 */
d5e19c61
VK
553static void QueueForPolling(NetObj *object, void *data)
554{
83b1c107 555 WatchdogNotify(*((UINT32 *)data));
88dc9091
VK
556 if (IsShutdownInProgress())
557 return;
91b3cba9 558 if(object->isDataCollectionTarget()) //common check for node, cluster and sensor
559 {
560 DataCollectionTarget *target = (DataCollectionTarget *)object;
561 if (target->isReadyForStatusPoll())
562 {
563 target->lockForStatusPoll();
564 DbgPrintf(6, _T("Data Collection Target %d \"%s\" queued for status poll"), (int)target->getId(), target->getName());
565 ThreadPoolExecute(g_pollerThreadPool, target, &DataCollectionTarget::statusPoll, RegisterPoller(POLLER_TYPE_STATUS, target));
566 }
567 if (target->isReadyForConfigurationPoll())
568 {
569 target->lockForConfigurationPoll();
570 DbgPrintf(6, _T("Data Collection Target %d \"%s\" queued for configuration poll"), (int)target->getId(), target->getName());
571 ThreadPoolExecute(g_pollerThreadPool, target, &DataCollectionTarget::configurationPoll, RegisterPoller(POLLER_TYPE_CONFIGURATION, target));
572 }
573 if (target->isReadyForInstancePoll())
574 {
575 target->lockForInstancePoll();
576 DbgPrintf(6, _T("Data Collection Target %d \"%s\" queued for instance discovery poll"), (int)target->getId(), target->getName());
577 ThreadPoolExecute(g_pollerThreadPool, target, &DataCollectionTarget::instanceDiscoveryPoll, RegisterPoller(POLLER_TYPE_INSTANCE_DISCOVERY, target));
578 }
579 }
88dc9091 580
c42b4551 581 switch(object->getObjectClass())
d5e19c61
VK
582 {
583 case OBJECT_NODE:
584 {
585 Node *node = (Node *)object;
d5e19c61
VK
586 if (node->isReadyForRoutePoll())
587 {
d5e19c61 588 node->lockForRoutePoll();
c42b4551 589 DbgPrintf(6, _T("Node %d \"%s\" queued for routing table poll"), (int)node->getId(), node->getName());
208d7427 590 ThreadPoolExecute(g_pollerThreadPool, node, &Node::routingTablePoll, RegisterPoller(POLLER_TYPE_ROUTING_TABLE, node));
d5e19c61
VK
591 }
592 if (node->isReadyForDiscoveryPoll())
593 {
d5e19c61 594 node->lockForDiscoveryPoll();
c42b4551 595 DbgPrintf(6, _T("Node %d \"%s\" queued for discovery poll"), (int)node->getId(), node->getName());
208d7427 596 ThreadPoolExecute(g_pollerThreadPool, DiscoveryPoller, RegisterPoller(POLLER_TYPE_DISCOVERY, node));
d5e19c61
VK
597 }
598 if (node->isReadyForTopologyPoll())
599 {
d5e19c61 600 node->lockForTopologyPoll();
c42b4551 601 DbgPrintf(6, _T("Node %d \"%s\" queued for topology poll"), (int)node->getId(), node->getName());
208d7427 602 ThreadPoolExecute(g_pollerThreadPool, node, &Node::topologyPoll, RegisterPoller(POLLER_TYPE_TOPOLOGY, node));
d5e19c61
VK
603 }
604 }
605 break;
606 case OBJECT_CONDITION:
607 {
9f06d008 608 ConditionObject *cond = (ConditionObject *)object;
9fddfb91 609 if (cond->isReadyForPoll())
d5e19c61 610 {
9fddfb91 611 cond->lockForPoll();
c42b4551 612 DbgPrintf(6, _T("Condition %d \"%s\" queued for poll"), (int)object->getId(), object->getName());
9f06d008 613 ThreadPoolExecute(g_pollerThreadPool, cond, &ConditionObject::doPoll, RegisterPoller(POLLER_TYPE_CONDITION, cond));
d5e19c61
VK
614 }
615 }
616 break;
5a4dc0c6 617 case OBJECT_BUSINESSSERVICE:
d5e8ff90 618 {
5a4dc0c6 619 BusinessService *service = (BusinessService *)object;
d5e8ff90
VK
620 if (service->isReadyForPolling())
621 {
d5e8ff90 622 service->lockForPolling();
c42b4551 623 DbgPrintf(6, _T("Business service %d \"%s\" queued for poll"), (int)object->getId(), object->getName());
208d7427 624 ThreadPoolExecute(g_pollerThreadPool, service, &BusinessService::poll, RegisterPoller(POLLER_TYPE_BUSINESS_SERVICE, service));
d5e8ff90
VK
625 }
626 }
627 break;
d5e19c61
VK
628 default:
629 break;
630 }
631}
632
8f3acc9b
VK
633/**
634 * Node and condition queuing thread
635 */
5039dede
AK
636THREAD_RESULT THREAD_CALL PollManager(void *pArg)
637{
208d7427 638 g_pollerThreadPool = ThreadPoolCreate(ConfigReadInt(_T("PollerThreadPoolBaseSize"), 10), ConfigReadInt(_T("PollerThreadPoolMaxSize"), 250), _T("POLLERS"));
d5e8ff90 639
5039dede 640 // Start active discovery poller
208d7427 641 ThreadCreate(ActiveDiscoveryPoller, 0, NULL);
5039dede 642
83b1c107 643 UINT32 watchdogId = WatchdogAddThread(_T("Poll Manager"), 5);
208d7427 644 int counter = 0;
5039dede 645
83b1c107
VK
646 WatchdogStartSleep(watchdogId);
647 while(!SleepAndCheckForShutdown(5))
5039dede 648 {
208d7427 649 WatchdogNotify(watchdogId);
5039dede
AK
650
651 // Check for management node every 10 minutes
208d7427
VK
652 counter++;
653 if (counter % 120 == 0)
5039dede 654 {
208d7427 655 counter = 0;
5039dede
AK
656 CheckForMgmtNode();
657 }
658
91b3cba9 659 // Walk through objects and queue them for status
5039dede 660 // and/or configuration poll
83b1c107
VK
661 g_idxObjectById.forEach(QueueForPolling, &watchdogId);
662 WatchdogStartSleep(watchdogId);
5039dede
AK
663 }
664
208d7427
VK
665 g_nodePollerQueue.clear();
666 g_nodePollerQueue.put(INVALID_POINTER_VALUE);
5039dede 667
208d7427 668 ThreadPoolDestroy(g_pollerThreadPool);
35f836fe 669 DbgPrintf(1, _T("PollManager: main thread terminated"));
5039dede
AK
670 return THREAD_OK;
671}
672
8f3acc9b
VK
673/**
674 * Reset discovery poller after configuration change
675 */
040c45fa 676void ResetDiscoveryPoller()
5039dede 677{
5039dede
AK
678 NEW_NODE *pInfo;
679
208d7427 680 // Clear node poller queue
19dbc8ef 681 while((pInfo = (NEW_NODE *)g_nodePollerQueue.get()) != NULL)
5039dede
AK
682 {
683 if (pInfo != INVALID_POINTER_VALUE)
684 free(pInfo);
685 }
686
687 // Reload discovery parameters
35f836fe
VK
688 g_dwDiscoveryPollingInterval = ConfigReadInt(_T("DiscoveryPollingInterval"), 900);
689 if (ConfigReadInt(_T("RunNetworkDiscovery"), 0))
c8076b19 690 g_flags |= AF_ENABLE_NETWORK_DISCOVERY;
5039dede 691 else
c8076b19 692 g_flags &= ~AF_ENABLE_NETWORK_DISCOVERY;
5039dede 693
35f836fe 694 if (ConfigReadInt(_T("ActiveNetworkDiscovery"), 0))
c8076b19 695 g_flags |= AF_ACTIVE_NETWORK_DISCOVERY;
5039dede 696 else
c8076b19 697 g_flags &= ~AF_ACTIVE_NETWORK_DISCOVERY;
e02953a4
VK
698
699 if (ConfigReadInt(_T("UseSNMPTrapsForDiscovery"), 0))
c8076b19 700 g_flags |= AF_SNMP_TRAP_DISCOVERY;
e02953a4 701 else
c8076b19 702 g_flags &= ~AF_SNMP_TRAP_DISCOVERY;
727b90ff
VK
703
704 if (ConfigReadInt(_T("UseSyslogForDiscovery"), 0))
705 g_flags |= AF_SYSLOG_DISCOVERY;
706 else
707 g_flags &= ~AF_SYSLOG_DISCOVERY;
5039dede 708}