all pollers converted to single thread pool
[public/netxms.git] / src / server / core / node.cpp
index 95aa900..4fc2d71 100644 (file)
 
 #include "nxcore.h"
 
-/**
- * Externals
- */
-extern Queue g_statusPollQueue;
-extern Queue g_configPollQueue;
-extern Queue g_instancePollQueue;
-extern Queue g_topologyPollQueue;
-extern Queue g_routePollQueue;
-extern Queue g_discoveryPollQueue;
-
 /**
  * Node class default constructor
  */
@@ -1100,10 +1090,28 @@ void Node::calculateCompoundStatus(BOOL bForcedRecalc)
       PostEvent(dwEventCodes[m_iStatus], m_id, "d", iOldStatus);
 }
 
+/**
+ * Status poller entry point: marks the poller as executing, runs a status poll with no client session and request ID 0, applies the unreachable-node deletion check, then deletes the poller object
+ */
+void Node::statusPoll(PollerInfo *poller)
+{
+   poller->startExecution();
+   statusPoll(NULL, 0, poller);   // NULL session / request ID 0: poll is not tied to a client request
+
+   // Delete this node if it has been down for more than DeleteUnreachableNodesPeriod days; a value of 0 (the default passed to ConfigReadInt here) disables the check
+       time_t unreachableDeleteDays = (time_t)ConfigReadInt(_T("DeleteUnreachableNodesPeriod"), 0);
+       if ((unreachableDeleteDays > 0) && (m_downSince > 0) && 
+                (time(NULL) - m_downSince > unreachableDeleteDays * 24 * 3600))   // days converted to seconds for comparison with elapsed down time
+       {
+               deleteObject();
+       }
+   delete poller;   // poller is destroyed here, so the caller must not use it after this call returns
+}
+
 /**
  * Perform status poll on node
  */
-void Node::statusPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller)
+void Node::statusPoll(ClientSession *pSession, UINT32 dwRqId, PollerInfo *poller)
 {
        if (m_dwDynamicFlags & NDF_DELETE_IN_PROGRESS)
        {
@@ -1120,7 +1128,7 @@ void Node::statusPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller)
    time_t tNow, tExpire;
 
    Queue *pQueue = new Queue;     // Delayed event queue
-   SetPollerInfo(nPoller, _T("wait for lock"));
+   poller->setStatus(_T("wait for lock"));
    pollerLock();
    m_pollRequestor = pSession;
    sendPollerMsg(dwRqId, _T("Starting status poll for node %s\r\n"), m_name);
@@ -1146,7 +1154,7 @@ restart_agent_check:
                pTransport = createSnmpTransport();
                if (pTransport != NULL)
       {
-         SetPollerInfo(nPoller, _T("check SNMP"));
+         poller->setStatus(_T("check SNMP"));
          sendPollerMsg(dwRqId, _T("Checking SNMP agent connectivity\r\n"));
          const TCHAR *testOid = m_customAttributes.get(_T("snmp.testoid"));
          if (testOid == NULL)
@@ -1205,7 +1213,7 @@ restart_agent_check:
    if ((m_dwFlags & NF_IS_NATIVE_AGENT) && (!(m_dwFlags & NF_DISABLE_NXCP)) && m_ipAddress.isValidUnicast())
    {
       DbgPrintf(6, _T("StatusPoll(%s): checking agent"), m_name);
-      SetPollerInfo(nPoller, _T("check agent"));
+      poller->setStatus(_T("check agent"));
       sendPollerMsg(dwRqId, _T("Checking NetXMS agent connectivity\r\n"));
 
                UINT32 error, socketError;
@@ -1248,7 +1256,7 @@ restart_agent_check:
       DbgPrintf(7, _T("StatusPoll(%s): agent check finished"), m_name);
    }
 
-   SetPollerInfo(nPoller, _T("prepare polling list"));
+   poller->setStatus(_T("prepare polling list"));
 
    // Find service poller node object
    lockProperties();
@@ -1292,7 +1300,7 @@ restart_agent_check:
    UnlockChildList();
 
    // Poll interfaces and services
-   SetPollerInfo(nPoller, _T("child poll"));
+   poller->setStatus(_T("child poll"));
    DbgPrintf(7, _T("StatusPoll(%s): starting child object poll"), m_name);
        pCluster = getMyCluster();
        pTransport = createSnmpTransport();
@@ -1361,7 +1369,7 @@ restart_agent_check:
                   {
                      m_dwDynamicFlags |= NDF_UNREACHABLE;
                                m_downSince = time(NULL);
-                          SetPollerInfo(nPoller, _T("check network path"));
+                          poller->setStatus(_T("check network path"));
                                if (checkNetworkPath(dwRqId))
                                {
                              m_dwDynamicFlags |= NDF_NETWORK_PATH_PROBLEM;
@@ -1481,15 +1489,15 @@ restart_agent_check:
                if (g_pModuleList[i].pfStatusPollHook != NULL)
                {
                        DbgPrintf(5, _T("StatusPoll(%s [%d]): calling hook in module %s"), m_name, m_id, g_pModuleList[i].szName);
-                       g_pModuleList[i].pfStatusPollHook(this, pSession, dwRqId, nPoller);
+                       g_pModuleList[i].pfStatusPollHook(this, pSession, dwRqId, poller);
                }
        }
 
        // Execute hook script
-   SetPollerInfo(nPoller, _T("hook"));
+   poller->setStatus(_T("hook"));
        executeHookScript(_T("StatusPoll"));
 
-   SetPollerInfo(nPoller, _T("cleanup"));
+   poller->setStatus(_T("cleanup"));
    if (pPollerNode != NULL)
       pPollerNode->decRefCount();
 
@@ -1540,7 +1548,10 @@ bool Node::checkNetworkPathElement(UINT32 nodeId, const TCHAR *nodeType, bool is
        {
                DbgPrintf(6, _T("Node::checkNetworkPathElement(%s [%d]): forced status poll on node %s [%d]"),
                                         m_name, m_id, node->getName(), node->getId());
-               node->statusPoll(NULL, 0, 0);
+      PollerInfo *poller = RegisterPoller(POLLER_TYPE_STATUS, node);
+      poller->startExecution();
+               node->statusPoll(NULL, 0, poller);
+      delete poller;
                if (node->isDown())
                {
                        DbgPrintf(5, _T("Node::checkNetworkPathElement(%s [%d]): %s %s [%d] is down"),
@@ -1636,7 +1647,10 @@ restart:
                {
                        DbgPrintf(6, _T("Node::checkNetworkPath(%s [%d]): forced status poll on node %s [%d]"),
                                  m_name, m_id, hop->object->getName(), hop->object->getId());
-                       ((Node *)hop->object)->statusPoll(NULL, 0, 0);
+         PollerInfo *poller = RegisterPoller(POLLER_TYPE_STATUS, (Node *)hop->object);
+         poller->startExecution();
+                       ((Node *)hop->object)->statusPoll(NULL, 0, poller);
+         delete poller;
                }
 
       if (((Node *)hop->object)->isDown())
@@ -1782,10 +1796,22 @@ void Node::updatePrimaryIpAddr()
        }
 }
 
+/**
+ * Entry point for configuration poller: marks the poller as executing, runs a configuration poll (no client session, request ID 0, maskBits 0) between ObjectTransactionStart() and ObjectTransactionEnd(), then deletes the poller object
+ */
+void Node::configurationPoll(PollerInfo *poller)
+{
+   poller->startExecution();
+   ObjectTransactionStart();
+   configurationPoll(NULL, 0, poller, 0);   // NULL session / request ID 0: poll is not tied to a client request
+   ObjectTransactionEnd();
+   delete poller;   // poller is destroyed here, so the caller must not use it after this call returns
+}
+
 /**
  * Perform configuration poll on node
  */
-void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller, int maskBits)
+void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, PollerInfo *poller, int maskBits)
 {
        if (m_dwDynamicFlags & NDF_DELETE_IN_PROGRESS)
        {
@@ -1798,7 +1824,7 @@ void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller
    TCHAR szBuffer[4096];
    bool hasChanges = false;
 
-   SetPollerInfo(nPoller, _T("wait for lock"));
+   poller->setStatus(_T("wait for lock"));
    pollerLock();
    m_pollRequestor = pSession;
    sendPollerMsg(dwRqId, _T("Starting configuration poll for node %s\r\n"), m_name);
@@ -1833,7 +1859,7 @@ void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller
    {
                updatePrimaryIpAddr();
 
-      SetPollerInfo(nPoller, _T("capability check"));
+      poller->setStatus(_T("capability check"));
       sendPollerMsg(dwRqId, _T("Checking node's capabilities...\r\n"));
 
                if (confPollAgent(dwRqId))
@@ -1870,7 +1896,7 @@ void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller
       }
 
       // Retrieve interface list
-      SetPollerInfo(nPoller, _T("interface check"));
+      poller->setStatus(_T("interface check"));
       sendPollerMsg(dwRqId, _T("Capability check finished\r\n"));
 
                if (updateInterfaceConfiguration(dwRqId, maskBits))
@@ -1887,7 +1913,7 @@ void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller
                         isMyIP(dwAddr))
                {
                        sendPollerMsg(dwRqId, _T("Node name is an IP address and need to be resolved\r\n"));
-             SetPollerInfo(nPoller, _T("resolving name"));
+             poller->setStatus(_T("resolving name"));
                        if (resolveName(FALSE))
                        {
                                sendPollerMsg(dwRqId, POLLER_INFO _T("Node name resolved to %s\r\n"), m_name);
@@ -1903,7 +1929,7 @@ void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller
                        if (g_flags & AF_SYNC_NODE_NAMES_WITH_DNS)
                        {
                                sendPollerMsg(dwRqId, _T("Syncing node name with DNS\r\n"));
-                     SetPollerInfo(nPoller, _T("resolving name"));
+                     poller->setStatus(_T("resolving name"));
                                if (resolveName(TRUE))
                                {
                                        sendPollerMsg(dwRqId, POLLER_INFO _T("Node name resolved to %s\r\n"), m_name);
@@ -1922,7 +1948,7 @@ void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller
                // Get list of installed products
                if (m_dwFlags & NF_IS_NATIVE_AGENT)
                {
-                       SetPollerInfo(nPoller, _T("software check"));
+                       poller->setStatus(_T("software check"));
                        sendPollerMsg(dwRqId, _T("Reading list of installed software packages\r\n"));
 
                        Table *table;
@@ -1953,20 +1979,20 @@ void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller
                        if (g_pModuleList[i].pfConfPollHook != NULL)
                        {
                                DbgPrintf(5, _T("ConfigurationPoll(%s [%d]): calling hook in module %s"), m_name, m_id, g_pModuleList[i].szName);
-                               if (g_pModuleList[i].pfConfPollHook(this, pSession, dwRqId, nPoller))
+                               if (g_pModuleList[i].pfConfPollHook(this, pSession, dwRqId, poller))
                hasChanges = true;
                        }
                }
 
                // Execute hook script
-               SetPollerInfo(nPoller, _T("hook"));
+               poller->setStatus(_T("hook"));
                executeHookScript(_T("ConfigurationPoll"));
 
                m_dwDynamicFlags |= NDF_CONFIGURATION_POLL_PASSED;
    }
 
    // Finish configuration poll
-   SetPollerInfo(nPoller, _T("cleanup"));
+   poller->setStatus(_T("cleanup"));
    if (dwRqId == 0)
       m_dwDynamicFlags &= ~NDF_QUEUED_FOR_CONFIG_POLL;
    m_dwDynamicFlags &= ~NDF_RECHECK_CAPABILITIES;
@@ -3010,10 +3036,22 @@ void Node::updateContainerMembership()
    delete containers;
 }
 
+/**
+ * Entry point for instance discovery poller: marks the poller as executing, runs an instance discovery poll (no client session, request ID 0) between ObjectTransactionStart() and ObjectTransactionEnd(), then deletes the poller object
+ */
+void Node::instanceDiscoveryPoll(PollerInfo *poller)
+{
+   poller->startExecution();
+   ObjectTransactionStart();
+   instanceDiscoveryPoll(NULL, 0, poller);   // NULL session / request ID 0: poll is not tied to a client request
+   ObjectTransactionEnd();
+   delete poller;   // poller is destroyed here, so the caller must not use it after this call returns
+}
+
 /**
  * Perform instance discovery poll on node
  */
-void Node::instanceDiscoveryPoll(ClientSession *session, UINT32 requestId, int pollerId)
+void Node::instanceDiscoveryPoll(ClientSession *session, UINT32 requestId, PollerInfo *poller)
 {
        if (m_dwDynamicFlags & NDF_DELETE_IN_PROGRESS)
        {
@@ -3022,7 +3060,7 @@ void Node::instanceDiscoveryPoll(ClientSession *session, UINT32 requestId, int p
                return;
        }
 
-   SetPollerInfo(pollerId, _T("wait for lock"));
+   poller->setStatus(_T("wait for lock"));
    pollerLock();
    m_pollRequestor = session;
    sendPollerMsg(requestId, _T("Starting instance discovery poll for node %s\r\n"), m_name);
@@ -3031,11 +3069,11 @@ void Node::instanceDiscoveryPoll(ClientSession *session, UINT32 requestId, int p
    // Check if node is marked as unreachable
    if (!(m_dwDynamicFlags & NDF_UNREACHABLE))
    {
-               SetPollerInfo(pollerId, _T("instance discovery"));
+               poller->setStatus(_T("instance discovery"));
       doInstanceDiscovery(requestId);
 
                // Execute hook script
-               SetPollerInfo(pollerId, _T("hook"));
+               poller->setStatus(_T("hook"));
                executeHookScript(_T("InstancePoll"));
    }
    else
@@ -3047,7 +3085,7 @@ void Node::instanceDiscoveryPoll(ClientSession *session, UINT32 requestId, int p
    m_lastInstancePoll = time(NULL);
 
    // Finish instance discovery poll
-   SetPollerInfo(pollerId, _T("cleanup"));
+   poller->setStatus(_T("cleanup"));
    if (requestId == 0)
       m_dwDynamicFlags &= ~NDF_QUEUED_FOR_INSTANCE_POLL;
    pollerUnlock();
@@ -4026,14 +4064,6 @@ UINT32 Node::getInternalItem(const TCHAR *param, size_t bufSize, TCHAR *buffer)
       {
          _sntprintf(buffer, bufSize, _T("%f"), g_dAvgRawDataWriterQueueSize);
       }
-      else if (!_tcsicmp(param, _T("Server.AverageStatusPollerQueueSize")))
-      {
-         _sntprintf(buffer, bufSize, _T("%f"), g_dAvgStatusPollerQueueSize);
-      }
-      else if (!_tcsicmp(param, _T("Server.AverageConfigurationPollerQueueSize")))
-      {
-         _sntprintf(buffer, bufSize, _T("%f"), g_dAvgConfigPollerQueueSize);
-      }
       else if (!_tcsicmp(param, _T("Server.AverageDCIQueuingTime")))
       {
          _sntprintf(buffer, bufSize, _T("%u"), g_dwAvgDCIQueuingTime);
@@ -5032,6 +5062,16 @@ bool Node::getNextHop(const InetAddress& srcAddr, const InetAddress& destAddr, I
    return nextHopFound;
 }
 
+/**
+ * Entry point for routing table poller: marks the poller as executing, calls updateRoutingTable(), then deletes the poller object
+ */
+void Node::routingTablePoll(PollerInfo *poller)
+{
+   poller->startExecution();
+   updateRoutingTable();
+   delete poller;   // poller is destroyed here, so the caller must not use it after this call returns
+}
+
 /**
  * Update cached routing table
  */
@@ -5132,48 +5172,6 @@ void Node::prepareForDeletion()
    m_dwDynamicFlags |= NDF_POLLING_DISABLED | NDF_DELETE_IN_PROGRESS;
    unlockProperties();
 
-       if (g_statusPollQueue.remove(this, NodeQueueComparator))
-       {
-               m_dwDynamicFlags &= ~NDF_QUEUED_FOR_STATUS_POLL;
-               DbgPrintf(4, _T("Node::PrepareForDeletion(%s [%d]): removed from status poller queue"), m_name, (int)m_id);
-               decRefCount();
-       }
-
-       if (g_configPollQueue.remove(this, NodeQueueComparator))
-       {
-               m_dwDynamicFlags &= ~NDF_QUEUED_FOR_CONFIG_POLL;
-               DbgPrintf(4, _T("Node::PrepareForDeletion(%s [%d]): removed from configuration poller queue"), m_name, (int)m_id);
-               decRefCount();
-       }
-
-       if (g_instancePollQueue.remove(this, NodeQueueComparator))
-       {
-               m_dwDynamicFlags &= ~NDF_QUEUED_FOR_INSTANCE_POLL;
-               DbgPrintf(4, _T("Node::PrepareForDeletion(%s [%d]): removed from instance discovery poller queue"), m_name, (int)m_id);
-               decRefCount();
-       }
-
-       if (g_discoveryPollQueue.remove(this, NodeQueueComparator))
-       {
-               m_dwDynamicFlags &= ~NDF_QUEUED_FOR_DISCOVERY_POLL;
-               DbgPrintf(4, _T("Node::PrepareForDeletion(%s [%d]): removed from discovery poller queue"), m_name, (int)m_id);
-               decRefCount();
-       }
-
-       if (g_routePollQueue.remove(this, NodeQueueComparator))
-       {
-               m_dwDynamicFlags &= ~NDF_QUEUED_FOR_ROUTE_POLL;
-               DbgPrintf(4, _T("Node::PrepareForDeletion(%s [%d]): removed from routing table poller queue"), m_name, (int)m_id);
-               decRefCount();
-       }
-
-       if (g_topologyPollQueue.remove(this, NodeQueueComparator))
-       {
-               m_dwDynamicFlags &= ~NDF_QUEUED_FOR_TOPOLOGY_POLL;
-               DbgPrintf(4, _T("Node::PrepareForDeletion(%s [%d]): removed from topology poller queue"), m_name, (int)m_id);
-               decRefCount();
-       }
-
    // Wait for all pending polls
        DbgPrintf(4, _T("Node::PrepareForDeletion(%s [%d]): waiting for outstanding polls to finish"), m_name, (int)m_id);
    while(1)
@@ -5559,10 +5557,20 @@ void Node::buildIPTopologyInternal(nxmap_ObjList &topology, int nDepth, UINT32 s
    }
 }
 
+/**
+ * Entry point for topology poller: marks the poller as executing, runs a topology poll with no client session and request ID 0, then deletes the poller object
+ */
+void Node::topologyPoll(PollerInfo *poller)
+{
+   poller->startExecution();
+   topologyPoll(NULL, 0, poller);   // NULL session / request ID 0: poll is not tied to a client request
+   delete poller;   // poller is destroyed here, so the caller must not use it after this call returns
+}
+
 /**
  * Topology poller
  */
-void Node::topologyPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller)
+void Node::topologyPoll(ClientSession *pSession, UINT32 dwRqId, PollerInfo *poller)
 {
        if (m_dwDynamicFlags & NDF_DELETE_IN_PROGRESS)
        {
@@ -5811,12 +5819,12 @@ void Node::topologyPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller)
                if (g_pModuleList[i].pfTopologyPollHook != NULL)
                {
                        DbgPrintf(5, _T("TopologyPoll(%s [%d]): calling hook in module %s"), m_name, m_id, g_pModuleList[i].szName);
-                       g_pModuleList[i].pfTopologyPollHook(this, pSession, dwRqId, nPoller);
+                       g_pModuleList[i].pfTopologyPollHook(this, pSession, dwRqId, poller);
                }
        }
 
        // Execute hook script
-   SetPollerInfo(nPoller, _T("hook"));
+   poller->setStatus(_T("hook"));
        executeHookScript(_T("TopologyPoll"));
 
    sendPollerMsg(dwRqId, _T("Finished topology poll for node %s\r\n"), m_name);