all pollers converted to single thread pool
author Victor Kirhenshtein <victor@netxms.org>
Tue, 16 Jun 2015 19:58:45 +0000 (22:58 +0300)
committer Victor Kirhenshtein <victor@netxms.org>
Tue, 16 Jun 2015 19:58:45 +0000 (22:58 +0300)
15 files changed:
include/nxcpapi.h
src/java/client/netxms-client/src/main/java/org/netxms/client/datacollection/GraphItemStyle.java
src/server/core/bizservice.cpp
src/server/core/cluster.cpp
src/server/core/condition.cpp
src/server/core/datacoll.cpp
src/server/core/main.cpp
src/server/core/node.cpp
src/server/core/np.cpp
src/server/core/poll.cpp
src/server/core/session.cpp
src/server/include/nms_core.h
src/server/include/nms_dcoll.h
src/server/include/nms_objects.h
src/server/include/nxmodule.h

index 5b7b04a..24d4c29 100644 (file)
@@ -402,6 +402,40 @@ public:
    virtual size_t compressBufferSize(size_t dataSize);
 };
 
+#if 0
+/**
+ * NXCP message consumer interface
+ */
+class LIBNETXMS_EXPORTABLE MessageConsumer
+{
+public:
+   virtual SOCKET getSocket() = 0;
+   virtual void processMessage(NXCPMessage *msg) = 0;
+};
+
+/**
+ * Socket receiver - manages receiving NXCP messages from multiple sockets
+ */
+class LIBNETXMS_EXPORTABLE SocketReceiver
+{
+private:
+   THREAD m_thread;
+   HashMap<SOCKET, MessageConsumer> *m_consumers;
+
+   static int m_maxSocketsPerThread;
+   static ObjectArray<SocketReceiver> *m_receivers;
+
+public:
+   static void start();
+   static void shutdown();
+
+   static void addConsumer(MessageConsumer *mc);
+   static void removeConsumer(MessageConsumer *mc);
+
+   static String getDiagInfo();
+};
+#endif
+
 #else    /* __cplusplus */
 
 typedef void NXCPMessage;
index 86f4494..bb090c6 100644 (file)
@@ -194,7 +194,17 @@ void BusinessService::lockForPolling()
 /**
  * A callback for poller threads
  */
-void BusinessService::poll(ClientSession *pSession, UINT32 dwRqId, int nPoller)
+void BusinessService::poll(PollerInfo *poller)
+{
+   poller->startExecution();
+   poll(NULL, 0, poller);
+   delete poller;
+}
+
+/**
+ * Status poll
+ */
+void BusinessService::poll(ClientSession *pSession, UINT32 dwRqId, PollerInfo *poller)
 {
        DbgPrintf(5, _T("Started polling of business service %s [%d]"), m_name, (int)m_id);
        m_lastPollTime = time(NULL);
index cab9db6..1e71efe 100644 (file)
@@ -464,10 +464,20 @@ bool Cluster::isVirtualAddr(const InetAddress& addr)
        return bRet;
 }
 
+/**
+ * Entry point for status poller thread
+ */
+void Cluster::statusPoll(PollerInfo *poller)
+{
+   poller->startExecution();
+   statusPoll(NULL, 0, poller);
+   delete poller;
+}
+
 /**
  * Status poll
  */
-void Cluster::statusPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller)
+void Cluster::statusPoll(ClientSession *pSession, UINT32 dwRqId, PollerInfo *poller)
 {
        UINT32 i, j, k, dwPollListSize;
        InterfaceList *pIfList;
@@ -492,7 +502,7 @@ void Cluster::statusPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller)
        DbgPrintf(6, _T("CLUSTER STATUS POLL [%s]: Polling member nodes"), m_name);
        for(i = 0, bAllDown = TRUE; i < dwPollListSize; i++)
        {
-               ((Node *)ppPollList[i])->statusPoll(pSession, dwRqId, nPoller);
+               ((Node *)ppPollList[i])->statusPoll(pSession, dwRqId, poller);
                if (!((Node *)ppPollList[i])->isDown())
                        bAllDown = FALSE;
        }
index 29c0a5d..55b71a8 100644 (file)
@@ -364,20 +364,21 @@ UINT32 Condition::modifyFromMessageInternal(NXCPMessage *pRequest)
  */
 void Condition::lockForPoll()
 {
-   incRefCount();
    m_queuedForPolling = TRUE;
 }
 
 /**
- * This method should be called by poller thread when poll finish
+ * Poller entry point
  */
-void Condition::endPoll()
+void Condition::doPoll(PollerInfo *poller)
 {
+   poller->startExecution();
+   check();
    lockProperties();
    m_queuedForPolling = FALSE;
    m_lastPoll = time(NULL);
    unlockProperties();
-   decRefCount();
+   delete poller;
 }
 
 /**
index b086b10..df27d00 100644 (file)
@@ -30,8 +30,6 @@
 /**
  * Externals
  */
-extern Queue g_statusPollQueue;
-extern Queue g_configPollQueue;
 extern Queue g_syslogProcessingQueue;
 extern Queue g_syslogWriteQueue;
 
@@ -43,8 +41,6 @@ double g_dAvgDBWriterQueueSize = 0;
 double g_dAvgIDataWriterQueueSize = 0;
 double g_dAvgRawDataWriterQueueSize = 0;
 double g_dAvgDBAndIDataWriterQueueSize = 0;
-double g_dAvgStatusPollerQueueSize = 0;
-double g_dAvgConfigPollerQueueSize = 0;
 double g_dAvgSyslogProcessingQueueSize = 0;
 double g_dAvgSyslogWriterQueueSize = 0;
 UINT32 g_dwAvgDCIQueuingTime = 0;
@@ -360,17 +356,14 @@ static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
    UINT32 i, currPos = 0;
    UINT32 pollerQS[12], dbWriterQS[12];
    UINT32 iDataWriterQS[12], rawDataWriterQS[12], dbAndIDataWriterQS[12];
-   UINT32 statusPollerQS[12], configPollerQS[12];
    UINT32 syslogProcessingQS[12], syslogWriterQS[12];
-   double sum1, sum2, sum3, sum4, sum5, sum6, sum7, sum8, sum9;
+   double sum1, sum2, sum3, sum4, sum5, sum8, sum9;
 
    memset(pollerQS, 0, sizeof(UINT32) * 12);
    memset(dbWriterQS, 0, sizeof(UINT32) * 12);
    memset(iDataWriterQS, 0, sizeof(UINT32) * 12);
    memset(rawDataWriterQS, 0, sizeof(UINT32) * 12);
    memset(dbAndIDataWriterQS, 0, sizeof(UINT32) * 12);
-   memset(statusPollerQS, 0, sizeof(UINT32) * 12);
-   memset(configPollerQS, 0, sizeof(UINT32) * 12);
    memset(syslogProcessingQS, 0, sizeof(UINT32) * 12);
    memset(syslogWriterQS, 0, sizeof(UINT32) * 12);
    g_dAvgPollerQueueSize = 0;
@@ -378,8 +371,6 @@ static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
    g_dAvgIDataWriterQueueSize = 0;
    g_dAvgRawDataWriterQueueSize = 0;
    g_dAvgDBAndIDataWriterQueueSize = 0;
-   g_dAvgStatusPollerQueueSize = 0;
-   g_dAvgConfigPollerQueueSize = 0;
    g_dAvgSyslogProcessingQueueSize = 0;
    g_dAvgSyslogWriterQueueSize = 0;
    while(!IsShutdownInProgress())
@@ -393,8 +384,6 @@ static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
       iDataWriterQS[currPos] = g_dciDataWriterQueue->size();
       rawDataWriterQS[currPos] = g_dciRawDataWriterQueue->size();
       dbAndIDataWriterQS[currPos] = g_dbWriterQueue->size() + g_dciDataWriterQueue->size() + g_dciRawDataWriterQueue->size();
-      statusPollerQS[currPos] = g_statusPollQueue.size();
-      configPollerQS[currPos] = g_configPollQueue.size();
       syslogProcessingQS[currPos] = g_syslogProcessingQueue.size();
       syslogWriterQS[currPos] = g_syslogWriteQueue.size();
       currPos++;
@@ -402,15 +391,13 @@ static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
          currPos = 0;
 
       // Calculate new averages
-      for(i = 0, sum1 = 0, sum2 = 0, sum3 = 0, sum4 = 0, sum5 = 0, sum6 = 0, sum7 = 0, sum8 = 0, sum9 = 0; i < 12; i++)
+      for(i = 0, sum1 = 0, sum2 = 0, sum3 = 0, sum4 = 0, sum5 = 0, sum8 = 0, sum9 = 0; i < 12; i++)
       {
          sum1 += pollerQS[i];
          sum2 += dbWriterQS[i];
          sum3 += iDataWriterQS[i];
          sum4 += rawDataWriterQS[i];
          sum5 += dbAndIDataWriterQS[i];
-         sum6 += statusPollerQS[i];
-         sum7 += configPollerQS[i];
          sum8 += syslogProcessingQS[i];
          sum9 += syslogWriterQS[i];
       }
@@ -419,8 +406,6 @@ static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
       g_dAvgIDataWriterQueueSize = sum3 / 12;
       g_dAvgRawDataWriterQueueSize = sum4 / 12;
       g_dAvgDBAndIDataWriterQueueSize = sum5 / 12;
-      g_dAvgStatusPollerQueueSize = sum6 / 12;
-      g_dAvgConfigPollerQueueSize = sum7 / 12;
       g_dAvgSyslogProcessingQueueSize = sum8 / 12;
       g_dAvgSyslogWriterQueueSize = sum9 / 12;
    }
index e5364c1..1e08894 100644 (file)
@@ -66,18 +66,12 @@ extern const TCHAR *g_szMessages[];
 /**
  * Externals
  */
-extern Queue g_statusPollQueue;
-extern Queue g_configPollQueue;
-extern Queue g_instancePollQueue;
-extern Queue g_topologyPollQueue;
-extern Queue g_routePollQueue;
-extern Queue g_discoveryPollQueue;
 extern Queue g_nodePollerQueue;
-extern Queue g_conditionPollerQueue;
 extern Queue g_dataCollectionQueue;
 extern Queue g_dciCacheLoaderQueue;
 extern Queue g_syslogProcessingQueue;
 extern Queue g_syslogWriteQueue;
+extern ThreadPool *g_pollerThreadPool;
 
 void InitClientListeners();
 void InitMobileDeviceListeners();
@@ -1295,13 +1289,16 @@ int ProcessConsoleCommand(const TCHAR *pszCmdLine, CONSOLE_CTX pCtx)
                   switch(pollType)
                   {
                      case 1:
-                        node->configurationPoll(NULL, 0, -1, 0);
+                                           node->lockForConfigurationPoll();
+                        ThreadPoolExecute(g_pollerThreadPool, node, &Node::configurationPoll, RegisterPoller(POLLER_TYPE_CONFIGURATION, node));
                         break;
                      case 2:
-                        node->statusPoll(NULL, 0, -1);
+                                               node->lockForStatusPoll();
+                        ThreadPoolExecute(g_pollerThreadPool, node, &Node::statusPoll, RegisterPoller(POLLER_TYPE_STATUS, node));
                         break;
                      case 3:
-                        node->topologyPoll(NULL, 0, -1);
+                                               node->lockForTopologyPoll();
+                        ThreadPoolExecute(g_pollerThreadPool, node, &Node::topologyPoll, RegisterPoller(POLLER_TYPE_TOPOLOGY, node));
                         break;
                   }
                                   }
@@ -1557,24 +1554,17 @@ int ProcessConsoleCommand(const TCHAR *pszCmdLine, CONSOLE_CTX pCtx)
                }
                else if (IsCommand(_T("POLLERS"), szBuffer, 1))
                {
-                       ShowPollerState(pCtx);
+                       ShowPollers(pCtx);
                }
                else if (IsCommand(_T("QUEUES"), szBuffer, 1))
                {
-                       ShowQueueStats(pCtx, &g_conditionPollerQueue, _T("Condition poller"));
-                       ShowQueueStats(pCtx, &g_configPollQueue, _T("Configuration poller"));
-                       ShowQueueStats(pCtx, &g_instancePollQueue, _T("Instance discovery poller"));
-                       ShowQueueStats(pCtx, &g_topologyPollQueue, _T("Topology poller"));
                        ShowQueueStats(pCtx, &g_dataCollectionQueue, _T("Data collector"));
                        ShowQueueStats(pCtx, &g_dciCacheLoaderQueue, _T("DCI cache loader"));
                        ShowQueueStats(pCtx, g_dbWriterQueue, _T("Database writer"));
                        ShowQueueStats(pCtx, g_dciDataWriterQueue, _T("Database writer (IData)"));
                        ShowQueueStats(pCtx, g_dciRawDataWriterQueue, _T("Database writer (raw DCI values)"));
                        ShowQueueStats(pCtx, g_pEventQueue, _T("Event processor"));
-                       ShowQueueStats(pCtx, &g_discoveryPollQueue, _T("Network discovery poller"));
                        ShowQueueStats(pCtx, &g_nodePollerQueue, _T("Node poller"));
-                       ShowQueueStats(pCtx, &g_routePollQueue, _T("Routing table poller"));
-                       ShowQueueStats(pCtx, &g_statusPollQueue, _T("Status poller"));
                        ShowQueueStats(pCtx, &g_syslogProcessingQueue, _T("Syslog processing"));
                        ShowQueueStats(pCtx, &g_syslogWriteQueue, _T("Syslog writer"));
                        ConsolePrintf(pCtx, _T("\n"));
index 95aa900..4fc2d71 100644 (file)
 
 #include "nxcore.h"
 
-/**
- * Externals
- */
-extern Queue g_statusPollQueue;
-extern Queue g_configPollQueue;
-extern Queue g_instancePollQueue;
-extern Queue g_topologyPollQueue;
-extern Queue g_routePollQueue;
-extern Queue g_discoveryPollQueue;
-
 /**
  * Node class default constructor
  */
@@ -1100,10 +1090,28 @@ void Node::calculateCompoundStatus(BOOL bForcedRecalc)
       PostEvent(dwEventCodes[m_iStatus], m_id, "d", iOldStatus);
 }
 
+/**
+ * Status poller entry point
+ */
+void Node::statusPoll(PollerInfo *poller)
+{
+   poller->startExecution();
+   statusPoll(NULL, 0, poller);
+
+   // Check if the node has to be deleted due to long downtime
+       time_t unreachableDeleteDays = (time_t)ConfigReadInt(_T("DeleteUnreachableNodesPeriod"), 0);
+       if ((unreachableDeleteDays > 0) && (m_downSince > 0) && 
+                (time(NULL) - m_downSince > unreachableDeleteDays * 24 * 3600))
+       {
+               deleteObject();
+       }
+   delete poller;
+}
+
 /**
  * Perform status poll on node
  */
-void Node::statusPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller)
+void Node::statusPoll(ClientSession *pSession, UINT32 dwRqId, PollerInfo *poller)
 {
        if (m_dwDynamicFlags & NDF_DELETE_IN_PROGRESS)
        {
@@ -1120,7 +1128,7 @@ void Node::statusPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller)
    time_t tNow, tExpire;
 
    Queue *pQueue = new Queue;     // Delayed event queue
-   SetPollerInfo(nPoller, _T("wait for lock"));
+   poller->setStatus(_T("wait for lock"));
    pollerLock();
    m_pollRequestor = pSession;
    sendPollerMsg(dwRqId, _T("Starting status poll for node %s\r\n"), m_name);
@@ -1146,7 +1154,7 @@ restart_agent_check:
                pTransport = createSnmpTransport();
                if (pTransport != NULL)
       {
-         SetPollerInfo(nPoller, _T("check SNMP"));
+         poller->setStatus(_T("check SNMP"));
          sendPollerMsg(dwRqId, _T("Checking SNMP agent connectivity\r\n"));
          const TCHAR *testOid = m_customAttributes.get(_T("snmp.testoid"));
          if (testOid == NULL)
@@ -1205,7 +1213,7 @@ restart_agent_check:
    if ((m_dwFlags & NF_IS_NATIVE_AGENT) && (!(m_dwFlags & NF_DISABLE_NXCP)) && m_ipAddress.isValidUnicast())
    {
       DbgPrintf(6, _T("StatusPoll(%s): checking agent"), m_name);
-      SetPollerInfo(nPoller, _T("check agent"));
+      poller->setStatus(_T("check agent"));
       sendPollerMsg(dwRqId, _T("Checking NetXMS agent connectivity\r\n"));
 
                UINT32 error, socketError;
@@ -1248,7 +1256,7 @@ restart_agent_check:
       DbgPrintf(7, _T("StatusPoll(%s): agent check finished"), m_name);
    }
 
-   SetPollerInfo(nPoller, _T("prepare polling list"));
+   poller->setStatus(_T("prepare polling list"));
 
    // Find service poller node object
    lockProperties();
@@ -1292,7 +1300,7 @@ restart_agent_check:
    UnlockChildList();
 
    // Poll interfaces and services
-   SetPollerInfo(nPoller, _T("child poll"));
+   poller->setStatus(_T("child poll"));
    DbgPrintf(7, _T("StatusPoll(%s): starting child object poll"), m_name);
        pCluster = getMyCluster();
        pTransport = createSnmpTransport();
@@ -1361,7 +1369,7 @@ restart_agent_check:
                   {
                      m_dwDynamicFlags |= NDF_UNREACHABLE;
                                m_downSince = time(NULL);
-                          SetPollerInfo(nPoller, _T("check network path"));
+                          poller->setStatus(_T("check network path"));
                                if (checkNetworkPath(dwRqId))
                                {
                              m_dwDynamicFlags |= NDF_NETWORK_PATH_PROBLEM;
@@ -1481,15 +1489,15 @@ restart_agent_check:
                if (g_pModuleList[i].pfStatusPollHook != NULL)
                {
                        DbgPrintf(5, _T("StatusPoll(%s [%d]): calling hook in module %s"), m_name, m_id, g_pModuleList[i].szName);
-                       g_pModuleList[i].pfStatusPollHook(this, pSession, dwRqId, nPoller);
+                       g_pModuleList[i].pfStatusPollHook(this, pSession, dwRqId, poller);
                }
        }
 
        // Execute hook script
-   SetPollerInfo(nPoller, _T("hook"));
+   poller->setStatus(_T("hook"));
        executeHookScript(_T("StatusPoll"));
 
-   SetPollerInfo(nPoller, _T("cleanup"));
+   poller->setStatus(_T("cleanup"));
    if (pPollerNode != NULL)
       pPollerNode->decRefCount();
 
@@ -1540,7 +1548,10 @@ bool Node::checkNetworkPathElement(UINT32 nodeId, const TCHAR *nodeType, bool is
        {
                DbgPrintf(6, _T("Node::checkNetworkPathElement(%s [%d]): forced status poll on node %s [%d]"),
                                         m_name, m_id, node->getName(), node->getId());
-               node->statusPoll(NULL, 0, 0);
+      PollerInfo *poller = RegisterPoller(POLLER_TYPE_STATUS, node);
+      poller->startExecution();
+               node->statusPoll(NULL, 0, poller);
+      delete poller;
                if (node->isDown())
                {
                        DbgPrintf(5, _T("Node::checkNetworkPathElement(%s [%d]): %s %s [%d] is down"),
@@ -1636,7 +1647,10 @@ restart:
                {
                        DbgPrintf(6, _T("Node::checkNetworkPath(%s [%d]): forced status poll on node %s [%d]"),
                                  m_name, m_id, hop->object->getName(), hop->object->getId());
-                       ((Node *)hop->object)->statusPoll(NULL, 0, 0);
+         PollerInfo *poller = RegisterPoller(POLLER_TYPE_STATUS, (Node *)hop->object);
+         poller->startExecution();
+                       ((Node *)hop->object)->statusPoll(NULL, 0, poller);
+         delete poller;
                }
 
       if (((Node *)hop->object)->isDown())
@@ -1782,10 +1796,22 @@ void Node::updatePrimaryIpAddr()
        }
 }
 
+/**
+ * Entry point for configuration poller
+ */
+void Node::configurationPoll(PollerInfo *poller)
+{
+   poller->startExecution();
+   ObjectTransactionStart();
+   configurationPoll(NULL, 0, poller, 0);
+   ObjectTransactionEnd();
+   delete poller;
+}
+
 /**
  * Perform configuration poll on node
  */
-void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller, int maskBits)
+void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, PollerInfo *poller, int maskBits)
 {
        if (m_dwDynamicFlags & NDF_DELETE_IN_PROGRESS)
        {
@@ -1798,7 +1824,7 @@ void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller
    TCHAR szBuffer[4096];
    bool hasChanges = false;
 
-   SetPollerInfo(nPoller, _T("wait for lock"));
+   poller->setStatus(_T("wait for lock"));
    pollerLock();
    m_pollRequestor = pSession;
    sendPollerMsg(dwRqId, _T("Starting configuration poll for node %s\r\n"), m_name);
@@ -1833,7 +1859,7 @@ void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller
    {
                updatePrimaryIpAddr();
 
-      SetPollerInfo(nPoller, _T("capability check"));
+      poller->setStatus(_T("capability check"));
       sendPollerMsg(dwRqId, _T("Checking node's capabilities...\r\n"));
 
                if (confPollAgent(dwRqId))
@@ -1870,7 +1896,7 @@ void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller
       }
 
       // Retrieve interface list
-      SetPollerInfo(nPoller, _T("interface check"));
+      poller->setStatus(_T("interface check"));
       sendPollerMsg(dwRqId, _T("Capability check finished\r\n"));
 
                if (updateInterfaceConfiguration(dwRqId, maskBits))
@@ -1887,7 +1913,7 @@ void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller
                         isMyIP(dwAddr))
                {
                        sendPollerMsg(dwRqId, _T("Node name is an IP address and need to be resolved\r\n"));
-             SetPollerInfo(nPoller, _T("resolving name"));
+             poller->setStatus(_T("resolving name"));
                        if (resolveName(FALSE))
                        {
                                sendPollerMsg(dwRqId, POLLER_INFO _T("Node name resolved to %s\r\n"), m_name);
@@ -1903,7 +1929,7 @@ void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller
                        if (g_flags & AF_SYNC_NODE_NAMES_WITH_DNS)
                        {
                                sendPollerMsg(dwRqId, _T("Syncing node name with DNS\r\n"));
-                     SetPollerInfo(nPoller, _T("resolving name"));
+                     poller->setStatus(_T("resolving name"));
                                if (resolveName(TRUE))
                                {
                                        sendPollerMsg(dwRqId, POLLER_INFO _T("Node name resolved to %s\r\n"), m_name);
@@ -1922,7 +1948,7 @@ void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller
                // Get list of installed products
                if (m_dwFlags & NF_IS_NATIVE_AGENT)
                {
-                       SetPollerInfo(nPoller, _T("software check"));
+                       poller->setStatus(_T("software check"));
                        sendPollerMsg(dwRqId, _T("Reading list of installed software packages\r\n"));
 
                        Table *table;
@@ -1953,20 +1979,20 @@ void Node::configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller
                        if (g_pModuleList[i].pfConfPollHook != NULL)
                        {
                                DbgPrintf(5, _T("ConfigurationPoll(%s [%d]): calling hook in module %s"), m_name, m_id, g_pModuleList[i].szName);
-                               if (g_pModuleList[i].pfConfPollHook(this, pSession, dwRqId, nPoller))
+                               if (g_pModuleList[i].pfConfPollHook(this, pSession, dwRqId, poller))
                hasChanges = true;
                        }
                }
 
                // Execute hook script
-               SetPollerInfo(nPoller, _T("hook"));
+               poller->setStatus(_T("hook"));
                executeHookScript(_T("ConfigurationPoll"));
 
                m_dwDynamicFlags |= NDF_CONFIGURATION_POLL_PASSED;
    }
 
    // Finish configuration poll
-   SetPollerInfo(nPoller, _T("cleanup"));
+   poller->setStatus(_T("cleanup"));
    if (dwRqId == 0)
       m_dwDynamicFlags &= ~NDF_QUEUED_FOR_CONFIG_POLL;
    m_dwDynamicFlags &= ~NDF_RECHECK_CAPABILITIES;
@@ -3010,10 +3036,22 @@ void Node::updateContainerMembership()
    delete containers;
 }
 
+/**
+ * Entry point for instance discovery poller
+ */
+void Node::instanceDiscoveryPoll(PollerInfo *poller)
+{
+   poller->startExecution();
+   ObjectTransactionStart();
+   instanceDiscoveryPoll(NULL, 0, poller);
+   ObjectTransactionEnd();
+   delete poller;
+}
+
 /**
  * Perform instance discovery poll on node
  */
-void Node::instanceDiscoveryPoll(ClientSession *session, UINT32 requestId, int pollerId)
+void Node::instanceDiscoveryPoll(ClientSession *session, UINT32 requestId, PollerInfo *poller)
 {
        if (m_dwDynamicFlags & NDF_DELETE_IN_PROGRESS)
        {
@@ -3022,7 +3060,7 @@ void Node::instanceDiscoveryPoll(ClientSession *session, UINT32 requestId, int p
                return;
        }
 
-   SetPollerInfo(pollerId, _T("wait for lock"));
+   poller->setStatus(_T("wait for lock"));
    pollerLock();
    m_pollRequestor = session;
    sendPollerMsg(requestId, _T("Starting instance discovery poll for node %s\r\n"), m_name);
@@ -3031,11 +3069,11 @@ void Node::instanceDiscoveryPoll(ClientSession *session, UINT32 requestId, int p
    // Check if node is marked as unreachable
    if (!(m_dwDynamicFlags & NDF_UNREACHABLE))
    {
-               SetPollerInfo(pollerId, _T("instance discovery"));
+               poller->setStatus(_T("instance discovery"));
       doInstanceDiscovery(requestId);
 
                // Execute hook script
-               SetPollerInfo(pollerId, _T("hook"));
+               poller->setStatus(_T("hook"));
                executeHookScript(_T("InstancePoll"));
    }
    else
@@ -3047,7 +3085,7 @@ void Node::instanceDiscoveryPoll(ClientSession *session, UINT32 requestId, int p
    m_lastInstancePoll = time(NULL);
 
    // Finish instance discovery poll
-   SetPollerInfo(pollerId, _T("cleanup"));
+   poller->setStatus(_T("cleanup"));
    if (requestId == 0)
       m_dwDynamicFlags &= ~NDF_QUEUED_FOR_INSTANCE_POLL;
    pollerUnlock();
@@ -4026,14 +4064,6 @@ UINT32 Node::getInternalItem(const TCHAR *param, size_t bufSize, TCHAR *buffer)
       {
          _sntprintf(buffer, bufSize, _T("%f"), g_dAvgRawDataWriterQueueSize);
       }
-      else if (!_tcsicmp(param, _T("Server.AverageStatusPollerQueueSize")))
-      {
-         _sntprintf(buffer, bufSize, _T("%f"), g_dAvgStatusPollerQueueSize);
-      }
-      else if (!_tcsicmp(param, _T("Server.AverageConfigurationPollerQueueSize")))
-      {
-         _sntprintf(buffer, bufSize, _T("%f"), g_dAvgConfigPollerQueueSize);
-      }
       else if (!_tcsicmp(param, _T("Server.AverageDCIQueuingTime")))
       {
          _sntprintf(buffer, bufSize, _T("%u"), g_dwAvgDCIQueuingTime);
@@ -5032,6 +5062,16 @@ bool Node::getNextHop(const InetAddress& srcAddr, const InetAddress& destAddr, I
    return nextHopFound;
 }
 
+/**
+ * Entry point for routing table poller
+ */
+void Node::routingTablePoll(PollerInfo *poller)
+{
+   poller->startExecution();
+   updateRoutingTable();
+   delete poller;
+}
+
 /**
  * Update cached routing table
  */
@@ -5132,48 +5172,6 @@ void Node::prepareForDeletion()
    m_dwDynamicFlags |= NDF_POLLING_DISABLED | NDF_DELETE_IN_PROGRESS;
    unlockProperties();
 
-       if (g_statusPollQueue.remove(this, NodeQueueComparator))
-       {
-               m_dwDynamicFlags &= ~NDF_QUEUED_FOR_STATUS_POLL;
-               DbgPrintf(4, _T("Node::PrepareForDeletion(%s [%d]): removed from status poller queue"), m_name, (int)m_id);
-               decRefCount();
-       }
-
-       if (g_configPollQueue.remove(this, NodeQueueComparator))
-       {
-               m_dwDynamicFlags &= ~NDF_QUEUED_FOR_CONFIG_POLL;
-               DbgPrintf(4, _T("Node::PrepareForDeletion(%s [%d]): removed from configuration poller queue"), m_name, (int)m_id);
-               decRefCount();
-       }
-
-       if (g_instancePollQueue.remove(this, NodeQueueComparator))
-       {
-               m_dwDynamicFlags &= ~NDF_QUEUED_FOR_INSTANCE_POLL;
-               DbgPrintf(4, _T("Node::PrepareForDeletion(%s [%d]): removed from instance discovery poller queue"), m_name, (int)m_id);
-               decRefCount();
-       }
-
-       if (g_discoveryPollQueue.remove(this, NodeQueueComparator))
-       {
-               m_dwDynamicFlags &= ~NDF_QUEUED_FOR_DISCOVERY_POLL;
-               DbgPrintf(4, _T("Node::PrepareForDeletion(%s [%d]): removed from discovery poller queue"), m_name, (int)m_id);
-               decRefCount();
-       }
-
-       if (g_routePollQueue.remove(this, NodeQueueComparator))
-       {
-               m_dwDynamicFlags &= ~NDF_QUEUED_FOR_ROUTE_POLL;
-               DbgPrintf(4, _T("Node::PrepareForDeletion(%s [%d]): removed from routing table poller queue"), m_name, (int)m_id);
-               decRefCount();
-       }
-
-       if (g_topologyPollQueue.remove(this, NodeQueueComparator))
-       {
-               m_dwDynamicFlags &= ~NDF_QUEUED_FOR_TOPOLOGY_POLL;
-               DbgPrintf(4, _T("Node::PrepareForDeletion(%s [%d]): removed from topology poller queue"), m_name, (int)m_id);
-               decRefCount();
-       }
-
    // Wait for all pending polls
        DbgPrintf(4, _T("Node::PrepareForDeletion(%s [%d]): waiting for outstanding polls to finish"), m_name, (int)m_id);
    while(1)
@@ -5559,10 +5557,20 @@ void Node::buildIPTopologyInternal(nxmap_ObjList &topology, int nDepth, UINT32 s
    }
 }
 
+/**
+ * Entry point for topology poller
+ */
+void Node::topologyPoll(PollerInfo *poller)
+{
+   poller->startExecution();
+   topologyPoll(NULL, 0, poller);
+   delete poller;
+}
+
 /**
  * Topology poller
  */
-void Node::topologyPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller)
+void Node::topologyPoll(ClientSession *pSession, UINT32 dwRqId, PollerInfo *poller)
 {
        if (m_dwDynamicFlags & NDF_DELETE_IN_PROGRESS)
        {
@@ -5811,12 +5819,12 @@ void Node::topologyPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller)
                if (g_pModuleList[i].pfTopologyPollHook != NULL)
                {
                        DbgPrintf(5, _T("TopologyPoll(%s [%d]): calling hook in module %s"), m_name, m_id, g_pModuleList[i].szName);
-                       g_pModuleList[i].pfTopologyPollHook(this, pSession, dwRqId, nPoller);
+                       g_pModuleList[i].pfTopologyPollHook(this, pSession, dwRqId, poller);
                }
        }
 
        // Execute hook script
-   SetPollerInfo(nPoller, _T("hook"));
+   poller->setStatus(_T("hook"));
        executeHookScript(_T("TopologyPoll"));
 
    sendPollerMsg(dwRqId, _T("Finished topology poll for node %s\r\n"), m_name);
index 53b8354..7072999 100644 (file)
@@ -202,7 +202,12 @@ Node NXCORE_EXPORTABLE *PollNewNode(const InetAddress& ipAddr, UINT32 dwCreation
                ConfigReadInt(_T("DefaultDCIRetentionTime"), 30), pNode));
 
        if (doConfPoll)
-               pNode->configurationPoll(NULL, 0, -1, ipAddr.getMaskBits());
+   {
+      PollerInfo *p = RegisterPoller(POLLER_TYPE_CONFIGURATION, pNode);
+      p->startExecution();
+               pNode->configurationPoll(NULL, 0, p, ipAddr.getMaskBits());
+      delete p;
+   }
 
    pNode->unhide();
    PostEvent(EVENT_NODE_ADDED, pNode->getId(), "d", (int)(discoveredNode ? 1 : 0));
index 61d8fab..c0855ed 100644 (file)
 #include "nxcore.h"
 
 /**
- * Poller state structure
+ * Node poller queue (polls new nodes)
  */
-struct __poller_state
+Queue g_nodePollerQueue;
+
+/**
+ * Thread pool for pollers
+ */
+ThreadPool *g_pollerThreadPool = NULL;
+
+/**
+ * Active pollers
+ */
+static HashMap<UINT64, PollerInfo> s_pollers(false);
+static MUTEX s_pollerLock = MutexCreate();
+
+/**
+ * Poller info destructor - will unregister poller and decrease ref count on object
+ */
+PollerInfo::~PollerInfo()
 {
-   int iType;
-   time_t timestamp;
-   TCHAR szMsg[128];
-   TCHAR szInfo[128];
-};
+   MutexLock(s_pollerLock);
+   s_pollers.remove(CAST_FROM_POINTER(this, UINT64));
+   MutexUnlock(s_pollerLock);
+   m_object->decRefCount();
+}
 
 /**
- * Poller queues
+ * Register active poller
  */
-Queue g_statusPollQueue;
-Queue g_configPollQueue;
-Queue g_instancePollQueue;
-Queue g_topologyPollQueue;
-Queue g_routePollQueue;
-Queue g_discoveryPollQueue;
-Queue g_nodePollerQueue;
-Queue g_conditionPollerQueue;
-Queue g_businessServicePollerQueue;
+PollerInfo *RegisterPoller(PollerType type, NetObj *object)
+{
+   PollerInfo *p = new PollerInfo(type, object);
+   object->incRefCount();
+   MutexLock(s_pollerLock);
+   s_pollers.set(CAST_FROM_POINTER(p, UINT64), p);
+   MutexUnlock(s_pollerLock);
+   return p;
+}
 
 /**
- * Static data
+ * Show poller information on console
  */
-static __poller_state *m_pPollerState = NULL;
-static int m_iNumPollers = 0;
+static EnumerationCallbackResult ShowPollerInfo(const void *key, const void *object, void *arg)
+{
+   static TCHAR *pollerType[] = { _T("STAT"), _T("CONF"), _T("INST"), _T("ROUT"), _T("DISC"), _T("BSVC"), _T("COND"), _T("TOPO") };
+
+   PollerInfo *p = (PollerInfo *)object;
+   NetObj *o = p->getObject();
+
+   TCHAR name[32];
+   nx_strncpy(name, o->getName(), 31);
+   ConsolePrintf((CONSOLE_CTX)arg, _T("%s | %9d | %-30s | %s\n"), pollerType[p->getType()], o->getId(), name, p->getStatus()); 
+
+   return _CONTINUE;
+}
+
+/**
+ * Show active pollers on console
+ */
+void ShowPollers(CONSOLE_CTX console)
+{
+   ConsoleWrite(console, _T("Type | Object ID | Object name                    | Status\n")
+                         _T("-----+-----------+--------------------------------+--------------------------\n"));
+   MutexLock(s_pollerLock);
+   s_pollers.forEach(ShowPollerInfo, console);
+   MutexUnlock(s_pollerLock);
+}
 
 /**
  * Create management node object
@@ -62,7 +101,12 @@ static void CreateManagementNode(const InetAddress& addr)
        Node *pNode = new Node(addr, NF_IS_LOCAL_MGMT, 0, 0, 0);
    NetObjInsert(pNode, TRUE);
        pNode->setName(GetLocalHostName(buffer, 256));
-   pNode->configurationPoll(NULL, 0, -1, addr.getMaskBits());
+
+   PollerInfo *p = RegisterPoller(POLLER_TYPE_CONFIGURATION, pNode);
+   p->startExecution();
+   pNode->configurationPoll(NULL, 0, p, addr.getMaskBits());
+   delete p;
+
    pNode->unhide();
    g_dwMgmtNode = pNode->getId();   // Set local management node ID
    PostEvent(EVENT_NODE_ADDED, pNode->getId(), NULL);
@@ -218,204 +262,6 @@ void CheckForMgmtNode()
        }
 }
 
-/**
- * Set poller's state
- */
-static void SetPollerState(int nIdx, const TCHAR *pszMsg)
-{
-   nx_strncpy(m_pPollerState[nIdx].szMsg, pszMsg, 128);
-   m_pPollerState[nIdx].szInfo[0] = 0;
-   m_pPollerState[nIdx].timestamp = time(NULL);
-}
-
-/**
- * Set poller's info
- */
-void SetPollerInfo(int nIdx, const TCHAR *pszMsg)
-{
-   if (nIdx != -1)
-   {
-      nx_strncpy(m_pPollerState[nIdx].szInfo, pszMsg, 128);
-      m_pPollerState[nIdx].timestamp = time(NULL);
-   }
-}
-
-/**
- * Display current poller threads state
- */
-void ShowPollerState(CONSOLE_CTX pCtx)
-{
-   int i;
-   TCHAR szTime[64];
-   struct tm *ltm;
-
-   ConsolePrintf(pCtx, _T("PT  TIME                   STATE\n"));
-   for(i = 0; i < m_iNumPollers; i++)
-   {
-      ltm = localtime(&m_pPollerState[i].timestamp);
-      if (ltm != NULL)
-         _tcsftime(szTime, 64, _T("%d/%b/%Y %H:%M:%S"), ltm);
-      else
-         _tcscpy(szTime, _T("<error>             "));
-      if (m_pPollerState[i].szInfo[0] != 0)
-         ConsolePrintf(pCtx, _T("%c   %s   %s - %s\n"), m_pPollerState[i].iType, 
-                       szTime, m_pPollerState[i].szMsg, m_pPollerState[i].szInfo);
-      else
-         ConsolePrintf(pCtx, _T("%c   %s   %s\n"), m_pPollerState[i].iType, 
-                       szTime, m_pPollerState[i].szMsg);
-   }
-   ConsolePrintf(pCtx, _T("\n"));
-}
-
-/**
- * Status poll thread
- */
-static THREAD_RESULT THREAD_CALL StatusPoller(void *arg)
-{
-   NetObj *pObject;
-   TCHAR szBuffer[MAX_OBJECT_NAME + 64];
-
-       int unreachableDeleteDays = ConfigReadInt(_T("DeleteUnreachableNodesPeriod"), 0);
-       if (unreachableDeleteDays > 10000)
-       {
-               DbgPrintf(1, _T("Extremely high value of DeleteUnreachableNodesPeriod (%d), using 10000 instead)"), unreachableDeleteDays);
-               unreachableDeleteDays = 10000;
-       }
-
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'S';
-   SetPollerState((long)arg, _T("init"));
-
-   // Main loop
-   while(!IsShutdownInProgress())
-   {
-      SetPollerState((long)arg, _T("wait"));
-      pObject = (NetObj *)g_statusPollQueue.getOrBlock();
-      if (pObject == INVALID_POINTER_VALUE)
-         break;   // Shutdown indicator
-
-      _sntprintf(szBuffer, MAX_OBJECT_NAME + 64, _T("poll: %s [%d]"), pObject->getName(), pObject->getId());
-      SetPollerState((long)arg, szBuffer);
-               if (pObject->getObjectClass() == OBJECT_NODE)
-               {
-                       ((Node *)pObject)->statusPoll(NULL, 0, (long)arg);
-                       // Check if the node has to be deleted due to long downtime
-                       if ((unreachableDeleteDays > 0) && (((Node *)pObject)->getDownTime() > 0) && 
-                                (time(NULL) - ((Node *)pObject)->getDownTime() > unreachableDeleteDays * 24 * 3600))
-                       {
-                               ((Node*)pObject)->deleteObject();
-                       }
-               }
-               else if (pObject->getObjectClass() == OBJECT_CLUSTER)
-               {
-                       ((Cluster *)pObject)->statusPoll(NULL, 0, (long)arg);
-               }
-      pObject->decRefCount();
-   }
-   SetPollerState((long)arg, _T("finished"));
-   return THREAD_OK;
-}
-
-/**
- * Configuration poller
- */
-static THREAD_RESULT THREAD_CALL ConfigurationPoller(void *arg)
-{
-   Node *pNode;
-   TCHAR szBuffer[MAX_OBJECT_NAME + 64];
-
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'C';
-   SetPollerState((long)arg, _T("init"));
-
-   // Wait one minute to give status pollers chance to run first
-   ThreadSleep(60);
-
-   // Main loop
-   while(!IsShutdownInProgress())
-   {
-      SetPollerState((long)arg, _T("wait"));
-      pNode = (Node *)g_configPollQueue.getOrBlock();
-      if (pNode == INVALID_POINTER_VALUE)
-         break;   // Shutdown indicator
-
-      _sntprintf(szBuffer, MAX_OBJECT_NAME + 64, _T("poll: %s [%d]"), pNode->getName(), pNode->getId());
-      SetPollerState((long)arg, szBuffer);
-      ObjectTransactionStart();
-      pNode->configurationPoll(NULL, 0, (long)arg, 0);
-      ObjectTransactionEnd();
-      pNode->decRefCount();
-   }
-   SetPollerState((long)arg, _T("finished"));
-   return THREAD_OK;
-}
-
-/**
- * Instance discovery poller
- */
-static THREAD_RESULT THREAD_CALL InstanceDiscoveryPoller(void *arg)
-{
-   Node *pNode;
-   TCHAR szBuffer[MAX_OBJECT_NAME + 64];
-
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'I';
-   SetPollerState((long)arg, _T("init"));
-
-   // Wait two minutes to give status and configuration pollers chance to run first
-   ThreadSleep(120);
-
-   // Main loop
-   while(!IsShutdownInProgress())
-   {
-      SetPollerState((long)arg, _T("wait"));
-      pNode = (Node *)g_instancePollQueue.getOrBlock();
-      if (pNode == INVALID_POINTER_VALUE)
-         break;   // Shutdown indicator
-
-      _sntprintf(szBuffer, MAX_OBJECT_NAME + 64, _T("poll: %s [%d]"), pNode->getName(), pNode->getId());
-      SetPollerState((long)arg, szBuffer);
-      ObjectTransactionStart();
-      pNode->instanceDiscoveryPoll(NULL, 0, (long)arg);
-      ObjectTransactionEnd();
-      pNode->decRefCount();
-   }
-   SetPollerState((long)arg, _T("finished"));
-   return THREAD_OK;
-}
-
-/**
- * Routing table poller
- */
-static THREAD_RESULT THREAD_CALL RoutePoller(void *arg)
-{
-   Node *pNode;
-   TCHAR szBuffer[MAX_OBJECT_NAME + 64];
-
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'R';
-   SetPollerState((long)arg, _T("init"));
-
-   // Wait two minutes to give status and configuration pollers chance to run first
-   ThreadSleep(120);
-
-   // Main loop
-   while(!IsShutdownInProgress())
-   {
-      SetPollerState((long)arg, _T("wait"));
-      pNode = (Node *)g_routePollQueue.getOrBlock();
-      if (pNode == INVALID_POINTER_VALUE)
-         break;   // Shutdown indicator
-
-      _sntprintf(szBuffer, MAX_OBJECT_NAME + 64, _T("poll: %s [%d]"), pNode->getName(), pNode->getId());
-      SetPollerState((long)arg, szBuffer);
-      pNode->updateRoutingTable();
-      pNode->decRefCount();
-   }
-   SetPollerState((long)arg, _T("finished"));
-   return THREAD_OK;
-}
-
 /**
  * Comparator for poller queue elements
  */
@@ -507,167 +353,55 @@ static void CheckHostRoute(Node *node, ROUTE *route)
 /**
  * Discovery poller
  */
-static THREAD_RESULT THREAD_CALL DiscoveryPoller(void *arg)
+static void DiscoveryPoller(void *arg)
 {
-   Node *pNode;
-   TCHAR szBuffer[MAX_OBJECT_NAME + 64], szIpAddr[64];
-   ARP_CACHE *pArpCache;
-       ROUTING_TABLE *rt;
-       UINT32 i;
+//   TCHAR szBuffer[MAX_OBJECT_NAME + 64], szIpAddr[64];
+//   ARP_CACHE *pArpCache;
+//     ROUTING_TABLE *rt;
 
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'D';
-   SetPollerState((long)arg, _T("init"));
-
-   // Wait two minutes to give status and configuration pollers chance to run first
-   ThreadSleep(120);
-
-   // Main loop
-   while(!IsShutdownInProgress())
-   {
-      SetPollerState((long)arg, _T("wait"));
-      pNode = (Node *)g_discoveryPollQueue.getOrBlock();
-      if (pNode == INVALID_POINTER_VALUE)
-         break;   // Shutdown indicator
-
-               if (pNode->getRuntimeFlags() & NDF_DELETE_IN_PROGRESS)
-               {
-             pNode->setDiscoveryPollTimeStamp();
-             pNode->decRefCount();
-                       continue;
-               }
-
-      _sntprintf(szBuffer, MAX_OBJECT_NAME + 64, _T("poll: %s [%d]"), pNode->getName(), pNode->getId());
-      SetPollerState((long)arg, szBuffer);
-
-      DbgPrintf(4, _T("Starting discovery poll for node %s (%s) in zone %d"),
-                         pNode->getName(), pNode->getIpAddress().toString(szIpAddr), (int)pNode->getZoneId());
-
-      // Retrieve and analize node's ARP cache
-      pArpCache = pNode->getArpCache();
-      if (pArpCache != NULL)
-      {
-         for(i = 0; i < pArpCache->dwNumEntries; i++)
-                               if (memcmp(pArpCache->pEntries[i].bMacAddr, "\xFF\xFF\xFF\xFF\xFF\xFF", 6))     // Ignore broadcast addresses
-                                       CheckPotentialNode(pNode, pArpCache->pEntries[i].ipAddr, pArpCache->pEntries[i].dwIndex, pArpCache->pEntries[i].bMacAddr);
-         DestroyArpCache(pArpCache);
-      }
+   PollerInfo *poller = (PollerInfo *)arg;
+   poller->startExecution();
 
-               // Retrieve and analize node's routing table
-      DbgPrintf(5, _T("Discovery poll for node %s (%s) - reading routing table"),
-                pNode->getName(), pNode->getIpAddress().toString(szIpAddr));
-               rt = pNode->getRoutingTable();
-               if (rt != NULL)
-               {
-                       for(i = 0; i < (UINT32)rt->iNumEntries; i++)
-                       {
-                               CheckPotentialNode(pNode, rt->pRoutes[i].dwNextHop, rt->pRoutes[i].dwIfIndex);
-                               if ((rt->pRoutes[i].dwDestMask == 0xFFFFFFFF) && (rt->pRoutes[i].dwDestAddr != 0))
-                                       CheckHostRoute(pNode, &rt->pRoutes[i]);
-                       }
-                       DestroyRoutingTable(rt);
-               }
-
-      DbgPrintf(4, _T("Finished discovery poll for node %s (%s)"),
-                pNode->getName(), pNode->getIpAddress().toString(szIpAddr));
+   Node *pNode = (Node *)poller->getObject();
+       if (pNode->getRuntimeFlags() & NDF_DELETE_IN_PROGRESS)
+       {
       pNode->setDiscoveryPollTimeStamp();
-      pNode->decRefCount();
-   }
-   g_nodePollerQueue.clear();
-   g_nodePollerQueue.put(INVALID_POINTER_VALUE);
-   SetPollerState((long)arg, _T("finished"));
-   return THREAD_OK;
-}
-
-/**
- * Condition poller
- */
-static THREAD_RESULT THREAD_CALL ConditionPoller(void *arg)
-{
-   Condition *pCond;
-   TCHAR szBuffer[MAX_OBJECT_NAME + 64];
-
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'N';
-   SetPollerState((long)arg, _T("init"));
-
-   // Main loop
-   while(!IsShutdownInProgress())
-   {
-      SetPollerState((long)arg, _T("wait"));
-      pCond = (Condition *)g_conditionPollerQueue.getOrBlock();
-      if (pCond == INVALID_POINTER_VALUE)
-         break;   // Shutdown indicator
-
-      _sntprintf(szBuffer, MAX_OBJECT_NAME + 64, _T("poll: %s [%d]"), pCond->getName(), pCond->getId());
-      SetPollerState((long)arg, szBuffer);
-      pCond->check();
-      pCond->endPoll();
-   }
-   SetPollerState((long)arg, _T("finished"));
-   return THREAD_OK;
-}
-
-/**
- * Topology poller
- */
-static THREAD_RESULT THREAD_CALL TopologyPoller(void *arg)
-{
-   TCHAR szBuffer[MAX_OBJECT_NAME + 64];
-
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'T';
-   SetPollerState((long)arg, _T("init"));
+      delete poller;
+      return;
+       }
 
-   // Wait two minutes to give configuration pollers chance to run first
-   ThreadSleep(120);
+   DbgPrintf(4, _T("Starting discovery poll for node %s (%s) in zone %d"),
+                 pNode->getName(), (const TCHAR *)pNode->getIpAddress().toString(), (int)pNode->getZoneId());
 
-   // Main loop
-   while(!IsShutdownInProgress())
+   // Retrieve and analyze node's ARP cache
+   ARP_CACHE *pArpCache = pNode->getArpCache();
+   if (pArpCache != NULL)
    {
-      SetPollerState((long)arg, _T("wait"));
-      Node *node = (Node *)g_topologyPollQueue.getOrBlock();
-      if (node == INVALID_POINTER_VALUE)
-         break;   // Shutdown indicator
-
-      _sntprintf(szBuffer, MAX_OBJECT_NAME + 64, _T("poll: %s [%d]"), node->getName(), node->getId());
-      SetPollerState((long)arg, szBuffer);
-               node->topologyPoll(NULL, 0, CAST_FROM_POINTER(arg, int));
-      node->decRefCount();
+      for(UINT32 i = 0; i < pArpCache->dwNumEntries; i++)
+                       if (memcmp(pArpCache->pEntries[i].bMacAddr, "\xFF\xFF\xFF\xFF\xFF\xFF", 6))     // Ignore broadcast addresses
+                               CheckPotentialNode(pNode, pArpCache->pEntries[i].ipAddr, pArpCache->pEntries[i].dwIndex, pArpCache->pEntries[i].bMacAddr);
+      DestroyArpCache(pArpCache);
    }
-   SetPollerState((long)arg, _T("finished"));
-   return THREAD_OK;
-}
-
-/**
- * Business service poller
- */
-static THREAD_RESULT THREAD_CALL BusinessServicePoller(void *arg)
-{
-   TCHAR szBuffer[MAX_OBJECT_NAME + 64];
-
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'B';
-   SetPollerState((long)arg, _T("init"));
 
-   // Wait two minutes to give configuration pollers chance to run first
-   ThreadSleep(120);
-
-   // Main loop
-   while(!IsShutdownInProgress())
-   {
-      SetPollerState((long)arg, _T("wait"));
-               BusinessService *service = (BusinessService *)g_businessServicePollerQueue.getOrBlock();
-      if (service == INVALID_POINTER_VALUE)
-         break;   // Shutdown indicator
+       // Retrieve and analyze node's routing table
+   DbgPrintf(5, _T("Discovery poll for node %s (%s) - reading routing table"),
+             pNode->getName(), (const TCHAR *)pNode->getIpAddress().toString());
+       ROUTING_TABLE *rt = pNode->getRoutingTable();
+       if (rt != NULL)
+       {
+               for(int i = 0; i < rt->iNumEntries; i++)
+               {
+                       CheckPotentialNode(pNode, rt->pRoutes[i].dwNextHop, rt->pRoutes[i].dwIfIndex);
+                       if ((rt->pRoutes[i].dwDestMask == 0xFFFFFFFF) && (rt->pRoutes[i].dwDestAddr != 0))
+                               CheckHostRoute(pNode, &rt->pRoutes[i]);
+               }
+               DestroyRoutingTable(rt);
+       }
 
-      _sntprintf(szBuffer, MAX_OBJECT_NAME + 64, _T("poll: %s [%d]"), service->getName(), service->getId());
-      SetPollerState((long)arg, szBuffer);
-               service->poll(NULL, 0, CAST_FROM_POINTER(arg, int));
-      service->decRefCount();
-   }
-   SetPollerState((long)arg, _T("finished"));
-   return THREAD_OK;
+   DbgPrintf(4, _T("Finished discovery poll for node %s (%s)"),
+             pNode->getName(), (const TCHAR *)pNode->getIpAddress().toString());
+   pNode->setDiscoveryPollTimeStamp();
+   delete poller;
 }
 
 /**
@@ -742,31 +476,22 @@ static void CheckRange(int nType, UINT32 dwAddr1, UINT32 dwAddr2)
  */
 static THREAD_RESULT THREAD_CALL ActiveDiscoveryPoller(void *arg)
 {
-   int i, nRows, nInterval;
-   DB_RESULT hResult;
-
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'A';
-   SetPollerState((long)arg, _T("init"));
-
-   nInterval = ConfigReadInt(_T("ActiveDiscoveryInterval"), 7200);
+   int nInterval = ConfigReadInt(_T("ActiveDiscoveryInterval"), 7200);
 
    // Main loop
    while(!IsShutdownInProgress())
    {
-      SetPollerState((long)arg, _T("wait"));
       if (SleepAndCheckForShutdown(nInterval))
          break;
 
       if (!(g_flags & AF_ACTIVE_NETWORK_DISCOVERY))
          continue;
 
-      SetPollerState((long)arg, _T("check"));
-      hResult = DBSelect(g_hCoreDB, _T("SELECT addr_type,addr1,addr2 FROM address_lists WHERE list_type=1"));
+      DB_RESULT hResult = DBSelect(g_hCoreDB, _T("SELECT addr_type,addr1,addr2 FROM address_lists WHERE list_type=1"));
       if (hResult != NULL)
       {
-         nRows = DBGetNumRows(hResult);
-         for(i = 0; i < nRows; i++)
+         int nRows = DBGetNumRows(hResult);
+         for(int i = 0; i < nRows; i++)
          {
             CheckRange(DBGetFieldLong(hResult, i, 0),
                        DBGetFieldIPAddr(hResult, i, 1),
@@ -775,7 +500,6 @@ static THREAD_RESULT THREAD_CALL ActiveDiscoveryPoller(void *arg)
          DBFreeResult(hResult);
       }
    }
-   SetPollerState((long)arg, _T("finished"));
    return THREAD_OK;
 }
 
@@ -791,45 +515,39 @@ static void QueueForPolling(NetObj *object, void *data)
                                Node *node = (Node *)object;
                                if (node->isReadyForConfigurationPoll())
                                {
-                                       node->incRefCount();
                                        node->lockForConfigurationPoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for configuration poll"), (int)node->getId(), node->getName());
-                                       g_configPollQueue.put(node);
+               ThreadPoolExecute(g_pollerThreadPool, node, &Node::configurationPoll, RegisterPoller(POLLER_TYPE_CONFIGURATION, node));
                                }
                                if (node->isReadyForInstancePoll())
                                {
-                                       node->incRefCount();
                                        node->lockForInstancePoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for instance discovery poll"), (int)node->getId(), node->getName());
-                                       g_instancePollQueue.put(node);
+               ThreadPoolExecute(g_pollerThreadPool, node, &Node::instanceDiscoveryPoll, RegisterPoller(POLLER_TYPE_INSTANCE_DISCOVERY, node));
                                }
                                if (node->isReadyForStatusPoll())
                                {
-                                       node->incRefCount();
                                        node->lockForStatusPoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for status poll"), (int)node->getId(), node->getName());
-                                       g_statusPollQueue.put(node);
+               ThreadPoolExecute(g_pollerThreadPool, node, &Node::statusPoll, RegisterPoller(POLLER_TYPE_STATUS, node));
                                }
                                if (node->isReadyForRoutePoll())
                                {
-                                       node->incRefCount();
                                        node->lockForRoutePoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for routing table poll"), (int)node->getId(), node->getName());
-                                       g_routePollQueue.put(node);
+               ThreadPoolExecute(g_pollerThreadPool, node, &Node::routingTablePoll, RegisterPoller(POLLER_TYPE_ROUTING_TABLE, node));
                                }
                                if (node->isReadyForDiscoveryPoll())
                                {
-                                       node->incRefCount();
                                        node->lockForDiscoveryPoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for discovery poll"), (int)node->getId(), node->getName());
-                                       g_discoveryPollQueue.put(node);
+               ThreadPoolExecute(g_pollerThreadPool, DiscoveryPoller, RegisterPoller(POLLER_TYPE_DISCOVERY, node));
                                }
                                if (node->isReadyForTopologyPoll())
                                {
-                                       node->incRefCount();
                                        node->lockForTopologyPoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for topology poll"), (int)node->getId(), node->getName());
-                                       g_topologyPollQueue.put(node);
+               ThreadPoolExecute(g_pollerThreadPool, node, &Node::topologyPoll, RegisterPoller(POLLER_TYPE_TOPOLOGY, node));
                                }
                        }
                        break;
@@ -840,7 +558,7 @@ static void QueueForPolling(NetObj *object, void *data)
                                {
                                        cond->lockForPoll();
                                        DbgPrintf(6, _T("Condition %d \"%s\" queued for poll"), (int)object->getId(), object->getName());
-                                       g_conditionPollerQueue.put(cond);
+               ThreadPoolExecute(g_pollerThreadPool, cond, &Condition::doPoll, RegisterPoller(POLLER_TYPE_CONDITION, cond));
                                }
                        }
                        break;
@@ -849,10 +567,9 @@ static void QueueForPolling(NetObj *object, void *data)
                                Cluster *cluster = (Cluster *)object;
                                if (cluster->isReadyForStatusPoll())
                                {
-                                       cluster->incRefCount();
                                        cluster->lockForStatusPoll();
                                        DbgPrintf(6, _T("Cluster %d \"%s\" queued for status poll"), (int)cluster->getId(), cluster->getName());
-                                       g_statusPollQueue.put(cluster);
+               ThreadPoolExecute(g_pollerThreadPool, cluster, &Cluster::statusPoll, RegisterPoller(POLLER_TYPE_STATUS, cluster));
                                }
                        }
                        break;
@@ -861,10 +578,9 @@ static void QueueForPolling(NetObj *object, void *data)
                                BusinessService *service = (BusinessService *)object;
                                if (service->isReadyForPolling())
                                {
-                                       service->incRefCount();
                                        service->lockForPolling();
                                        DbgPrintf(6, _T("Business service %d \"%s\" queued for poll"), (int)object->getId(), object->getName());
-                                       g_businessServicePollerQueue.put(service);
+               ThreadPoolExecute(g_pollerThreadPool, service, &BusinessService::poll, RegisterPoller(POLLER_TYPE_BUSINESS_SERVICE, service));
                                }
                        }
                        break;
@@ -878,77 +594,25 @@ static void QueueForPolling(NetObj *object, void *data)
  */
 THREAD_RESULT THREAD_CALL PollManager(void *pArg)
 {
-   UINT32 dwWatchdogId;
-   int i, iCounter, iNumStatusPollers, iNumConfigPollers, numInstancePollers;
-   int nIndex, iNumDiscoveryPollers, iNumRoutePollers;
-   int iNumConditionPollers, iNumTopologyPollers, iNumBusinessServicePollers;
-
-   // Read configuration
-   iNumConditionPollers = ConfigReadInt(_T("NumberOfConditionPollers"), 10);
-   iNumStatusPollers = ConfigReadInt(_T("NumberOfStatusPollers"), 25);
-   iNumConfigPollers = ConfigReadInt(_T("NumberOfConfigurationPollers"), 10);
-   numInstancePollers = ConfigReadInt(_T("NumberOfInstancePollers"), 10);
-   iNumRoutePollers = ConfigReadInt(_T("NumberOfRoutingTablePollers"), 10);
-   iNumDiscoveryPollers = ConfigReadInt(_T("NumberOfDiscoveryPollers"), 1);
-   iNumTopologyPollers = ConfigReadInt(_T("NumberOfTopologyPollers"), 10);
-   iNumBusinessServicePollers = ConfigReadInt(_T("NumberOfBusinessServicePollers"), 10);
-   m_iNumPollers = iNumStatusPollers + iNumConfigPollers + numInstancePollers +
-                   iNumDiscoveryPollers + iNumRoutePollers + iNumConditionPollers + 
-                                                iNumTopologyPollers + iNumBusinessServicePollers + 1;
-   DbgPrintf(2, _T("PollManager: %d pollers to start"), m_iNumPollers);
-
-   // Prepare static data
-   m_pPollerState = (__poller_state *)malloc(sizeof(__poller_state) * m_iNumPollers);
-
-   // Start status pollers
-   for(i = 0, nIndex = 0; i < iNumStatusPollers; i++, nIndex++)
-      ThreadCreate(StatusPoller, 0, CAST_TO_POINTER(nIndex, void *));
-
-   // Start configuration pollers
-   for(i = 0; i < iNumConfigPollers; i++, nIndex++)
-      ThreadCreate(ConfigurationPoller, 0, CAST_TO_POINTER(nIndex, void *));
-
-   // Start instance discovery pollers
-   for(i = 0; i < numInstancePollers; i++, nIndex++)
-      ThreadCreate(InstanceDiscoveryPoller, 0, CAST_TO_POINTER(nIndex, void *));
-
-   // Start routing table pollers
-   for(i = 0; i < iNumRoutePollers; i++, nIndex++)
-      ThreadCreate(RoutePoller, 0, CAST_TO_POINTER(nIndex, void *));
-
-   // Start discovery pollers
-   for(i = 0; i < iNumDiscoveryPollers; i++, nIndex++)
-      ThreadCreate(DiscoveryPoller, 0, CAST_TO_POINTER(nIndex, void *));
-
-   // Start condition pollers
-   for(i = 0; i < iNumConditionPollers; i++, nIndex++)
-      ThreadCreate(ConditionPoller, 0, CAST_TO_POINTER(nIndex, void *));
-
-   // Start topology pollers
-   for(i = 0; i < iNumTopologyPollers; i++, nIndex++)
-      ThreadCreate(TopologyPoller, 0, CAST_TO_POINTER(nIndex, void *));
-
-   // Start business service pollers
-   for(i = 0; i < iNumBusinessServicePollers; i++, nIndex++)
-      ThreadCreate(BusinessServicePoller, 0, CAST_TO_POINTER(nIndex, void *));
+   g_pollerThreadPool = ThreadPoolCreate(ConfigReadInt(_T("PollerThreadPoolBaseSize"), 10), ConfigReadInt(_T("PollerThreadPoolMaxSize"), 250), _T("POLLERS"));
 
    // Start active discovery poller
-   ThreadCreate(ActiveDiscoveryPoller, 0, CAST_TO_POINTER(nIndex, void *));
+   ThreadCreate(ActiveDiscoveryPoller, 0, NULL);
 
-   dwWatchdogId = WatchdogAddThread(_T("Poll Manager"), 60);
-   iCounter = 0;
+   UINT32 watchdogId = WatchdogAddThread(_T("Poll Manager"), 60);
+   int counter = 0;
 
    while(!IsShutdownInProgress())
    {
       if (SleepAndCheckForShutdown(5))
          break;      // Shutdown has arrived
-      WatchdogNotify(dwWatchdogId);
+      WatchdogNotify(watchdogId);
 
       // Check for management node every 10 minutes
-      iCounter++;
-      if (iCounter % 120 == 0)
+      counter++;
+      if (counter % 120 == 0)
       {
-         iCounter = 0;
+         counter = 0;
          CheckForMgmtNode();
       }
 
@@ -957,31 +621,10 @@ THREAD_RESULT THREAD_CALL PollManager(void *pArg)
                g_idxObjectById.forEach(QueueForPolling, NULL);
    }
 
-   // Send stop signal to all pollers
-   g_statusPollQueue.clear();
-   g_statusPollQueue.setShutdownMode();
-
-   g_configPollQueue.clear();
-   g_configPollQueue.setShutdownMode();
-
-   g_instancePollQueue.clear();
-   g_instancePollQueue.setShutdownMode();
-
-   g_discoveryPollQueue.clear();
-   g_discoveryPollQueue.setShutdownMode();
-
-   g_routePollQueue.clear();
-   g_routePollQueue.setShutdownMode();
-
-   g_conditionPollerQueue.clear();
-   g_conditionPollerQueue.setShutdownMode();
-
-   g_topologyPollQueue.clear();
-   g_topologyPollQueue.setShutdownMode();
-
-   g_businessServicePollerQueue.clear();
-   g_businessServicePollerQueue.setShutdownMode();
+   g_nodePollerQueue.clear();
+   g_nodePollerQueue.put(INVALID_POINTER_VALUE);
 
+   ThreadPoolDestroy(g_pollerThreadPool);
    DbgPrintf(1, _T("PollManager: main thread terminated"));
    return THREAD_OK;
 }
@@ -991,18 +634,9 @@ THREAD_RESULT THREAD_CALL PollManager(void *pArg)
  */
 void ResetDiscoveryPoller()
 {
-   Node *pNode;
    NEW_NODE *pInfo;
 
-   // Clear queues
-   while((pNode = (Node *)g_discoveryPollQueue.get()) != NULL)
-   {
-      if (pNode != INVALID_POINTER_VALUE)
-      {
-         pNode->setDiscoveryPollTimeStamp();
-         pNode->decRefCount();
-      }
-   }
+   // Clear node poller queue
    while((pInfo = (NEW_NODE *)g_nodePollerQueue.get()) != NULL)
    {
       if (pInfo != INVALID_POINTER_VALUE)
index dbc42e7..cff5a2a 100644 (file)
 /**
  * Externals
  */
-extern Queue g_statusPollQueue;
-extern Queue g_configPollQueue;
-extern Queue g_routePollQueue;
-extern Queue g_discoveryPollQueue;
 extern Queue g_nodePollerQueue;
-extern Queue g_conditionPollerQueue;
 extern Queue g_dataCollectionQueue;
 extern Queue g_dciCacheLoaderQueue;
 
@@ -5867,22 +5862,31 @@ void ClientSession::pollerThread(Node *pNode, int iPollType, UINT32 dwRqId)
    MutexLock(m_mutexPollerInit);
    MutexUnlock(m_mutexPollerInit);
 
+   PollerInfo *poller = NULL;
    switch(iPollType)
    {
       case POLL_STATUS:
-         pNode->statusPoll(this, dwRqId, -1);
+         poller = RegisterPoller(POLLER_TYPE_STATUS, pNode);
+         poller->startExecution();
+         pNode->statusPoll(this, dwRqId, poller);
          break;
       case POLL_CONFIGURATION_FULL:
                        pNode->setRecheckCapsFlag();
          // intentionally no break here
       case POLL_CONFIGURATION_NORMAL:
-         pNode->configurationPoll(this, dwRqId, -1, 0);
+         poller = RegisterPoller(POLLER_TYPE_CONFIGURATION, pNode);
+         poller->startExecution();
+         pNode->configurationPoll(this, dwRqId, poller, 0);
          break;
       case POLL_INSTANCE_DISCOVERY:
-         pNode->instanceDiscoveryPoll(this, dwRqId, -1);
+         poller = RegisterPoller(POLLER_TYPE_INSTANCE_DISCOVERY, pNode);
+         poller->startExecution();
+         pNode->instanceDiscoveryPoll(this, dwRqId, poller);
          break;
       case POLL_TOPOLOGY:
-         pNode->topologyPoll(this, dwRqId, -1);
+         poller = RegisterPoller(POLLER_TYPE_TOPOLOGY, pNode);
+         poller->startExecution();
+         pNode->topologyPoll(this, dwRqId, poller);
          break;
       case POLL_INTERFACE_NAMES:
          pNode->updateInterfaceNames(this, dwRqId);
@@ -5892,6 +5896,7 @@ void ClientSession::pollerThread(Node *pNode, int iPollType, UINT32 dwRqId)
          break;
    }
    pNode->decRefCount();
+   delete poller;
 
    msg.setCode(CMD_POLLING_INFO);
    msg.setId(dwRqId);
@@ -7975,16 +7980,11 @@ void ClientSession::sendServerStats(UINT32 dwRqId)
 #endif
 
        // Queues
-       msg.setField(VID_QSIZE_CONDITION_POLLER, g_conditionPollerQueue.size());
-       msg.setField(VID_QSIZE_CONF_POLLER, g_configPollQueue.size());
        msg.setField(VID_QSIZE_DCI_POLLER, g_dataCollectionQueue.size());
        msg.setField(VID_QSIZE_DCI_CACHE_LOADER, g_dciCacheLoaderQueue.size());
        msg.setField(VID_QSIZE_DBWRITER, g_dbWriterQueue->size());
        msg.setField(VID_QSIZE_EVENT, g_pEventQueue->size());
-       msg.setField(VID_QSIZE_DISCOVERY, g_discoveryPollQueue.size());
        msg.setField(VID_QSIZE_NODE_POLLER, g_nodePollerQueue.size());
-       msg.setField(VID_QSIZE_ROUTE_POLLER, g_routePollQueue.size());
-       msg.setField(VID_QSIZE_STATUS_POLLER, g_statusPollQueue.size());
 
    // Send response
    sendMessage(&msg);
index be8d5ca..5c63923 100644 (file)
@@ -971,8 +971,6 @@ THREAD_RESULT NXCORE_EXPORTABLE THREAD_CALL SignalHandler(void *);
 void DbgTestRWLock(RWLOCK hLock, const TCHAR *szName, CONSOLE_CTX console);
 void DumpClientSessions(CONSOLE_CTX console);
 void DumpMobileDeviceSessions(CONSOLE_CTX console);
-void ShowPollerState(CONSOLE_CTX console);
-void SetPollerInfo(int nIdx, const TCHAR *pszMsg);
 void ShowServerStats(CONSOLE_CTX console);
 void ShowQueueStats(CONSOLE_CTX console, Queue *pQueue, const TCHAR *pszName);
 void ShowThreadPool(CONSOLE_CTX console, ThreadPool *p);
index bb7e774..6bd6b2a 100644 (file)
@@ -606,8 +606,6 @@ extern double g_dAvgDBWriterQueueSize;
 extern double g_dAvgIDataWriterQueueSize;
 extern double g_dAvgRawDataWriterQueueSize;
 extern double g_dAvgDBAndIDataWriterQueueSize;
-extern double g_dAvgStatusPollerQueueSize;
-extern double g_dAvgConfigPollerQueueSize;
 extern double g_dAvgSyslogProcessingQueueSize;
 extern double g_dAvgSyslogWriterQueueSize;
 extern UINT32 g_dwAvgDCIQueuingTime;
index 847438a..7f51fb6 100644 (file)
@@ -111,7 +111,44 @@ bool NXCORE_EXPORTABLE ExecuteQueryOnObject(DB_HANDLE hdb, UINT32 objectId, cons
  * Cluster runtime flags
  */
 #define CLF_QUEUED_FOR_STATUS_POLL     0x0001
-#define CLF_DOWN                                                               0x0002
+#define CLF_DOWN                       0x0002
+
+/**
+ * Poller types (numeric values are used as index into the label table in ShowPollerInfo - keep both in sync)
+ */
+enum PollerType
+{
+   POLLER_TYPE_STATUS = 0,              // shown as "STAT"
+   POLLER_TYPE_CONFIGURATION = 1,       // shown as "CONF"
+   POLLER_TYPE_INSTANCE_DISCOVERY = 2,  // shown as "INST"
+   POLLER_TYPE_ROUTING_TABLE = 3,       // shown as "ROUT"
+   POLLER_TYPE_DISCOVERY = 4,           // shown as "DISC"
+   POLLER_TYPE_BUSINESS_SERVICE = 5,    // shown as "BSVC"
+   POLLER_TYPE_CONDITION = 6,           // shown as "COND"
+   POLLER_TYPE_TOPOLOGY = 7             // shown as "TOPO"
+};
+
+/**
+ * Poller information: type, target object and status text of one poll; instances are created by RegisterPoller
+ */
+class PollerInfo
+{
+private:
+   PollerType m_type;
+   NetObj *m_object;      // object being polled
+   TCHAR m_status[128];   // status text printed by ShowPollers; NOTE(review): written without s_pollerLock while ShowPollers reads it under that lock - confirm this is acceptable
+
+public:
+   PollerInfo(PollerType type, NetObj *object) { m_type = type; m_object = object; _tcscpy(m_status, _T("awaiting execution")); }   // initial status is "awaiting execution"
+   ~PollerInfo();   // defined out of line (not visible in this part of the diff)
+
+   const PollerType getType() const { return m_type; }   // NOTE(review): top-level const on a by-value return type has no effect
+   NetObj *getObject() const { return m_object; }
+   const TCHAR *getStatus() const { return m_status; }   // pointer into this object's own buffer
+
+   void startExecution() { _tcscpy(m_status, _T("started")); }   // status becomes "started"
+   void setStatus(const TCHAR *status) { nx_strncpy(m_status, status, 128); }   // bounded copy into the 128-character buffer
+};
 
 /**
  * Status poll types
@@ -1047,7 +1084,8 @@ public:
        bool isResourceOnNode(UINT32 dwResource, UINT32 dwNode);
    UINT32 getZoneId() { return m_zoneId; }
 
-   void statusPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller);
+   void statusPoll(PollerInfo *poller);
+   void statusPoll(ClientSession *pSession, UINT32 dwRqId, PollerInfo *poller);
    void lockForStatusPoll() { m_dwFlags |= CLF_QUEUED_FOR_STATUS_POLL; }
    bool isReadyForStatusPoll()
    {
@@ -1296,12 +1334,17 @@ public:
 
        void setRecheckCapsFlag() { m_dwDynamicFlags |= NDF_RECHECK_CAPABILITIES; }
    void setDiscoveryPollTimeStamp();
-   void statusPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller);
-   void configurationPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller, int maskBits);
-       void instanceDiscoveryPoll(ClientSession *session, UINT32 requestId, int pollerId);
-       void topologyPoll(ClientSession *pSession, UINT32 dwRqId, int nPoller);
+   void statusPoll(ClientSession *pSession, UINT32 dwRqId, PollerInfo *poller);
+   void statusPoll(PollerInfo *poller);
+   void configurationPoll(PollerInfo *poller);
+   void configurationPoll(ClientSession *pSession, UINT32 dwRqId, PollerInfo *poller, int maskBits);
+       void instanceDiscoveryPoll(PollerInfo *poller);
+       void instanceDiscoveryPoll(ClientSession *session, UINT32 requestId, PollerInfo *poller);
+       void topologyPoll(PollerInfo *poller);
+       void topologyPoll(ClientSession *pSession, UINT32 dwRqId, PollerInfo *poller);
        void resolveVlanPorts(VlanList *vlanList);
        void updateInterfaceNames(ClientSession *pSession, UINT32 dwRqId);
+       void routingTablePoll(PollerInfo *poller);
    void updateRoutingTable();
        void checkSubnetBinding();
    AccessPointState getAccessPointState(AccessPoint *ap, SNMP_Transport *snmpTransport);
@@ -1405,8 +1448,6 @@ inline bool Node::isReadyForStatusPoll()
 {
        if (m_isDeleted)
                return false;
-       //if (GetMyCluster() != NULL)
-       //      return FALSE;   // Cluster nodes should be polled from cluster status poll
    if (m_dwDynamicFlags & NDF_FORCE_STATUS_POLL)
    {
       m_dwDynamicFlags &= ~NDF_FORCE_STATUS_POLL;
@@ -1805,10 +1846,9 @@ public:
    virtual bool deleteFromDatabase(DB_HANDLE hdb);
    virtual BOOL loadFromDatabase(UINT32 dwId);
 
-   void check();
-
    void lockForPoll();
-   void endPoll();
+   void doPoll(PollerInfo *poller);
+   void check();
 
    bool isReadyForPoll()
    {
@@ -2195,7 +2235,8 @@ public:
 
        bool isReadyForPolling();
        void lockForPolling();
-       void poll(ClientSession *pSession, UINT32 dwRqId, int nPoller);
+       void poll(PollerInfo *poller);
+       void poll(ClientSession *pSession, UINT32 dwRqId, PollerInfo *poller);
 
        void getApplicableTemplates(ServiceContainer *target, ObjectArray<SlmCheck> *templates);
 };
@@ -2313,6 +2354,9 @@ bool IsEventSource(int objectClass);
 int DefaultPropagatedStatus(int iObjectStatus);
 int GetDefaultStatusCalculation(int *pnSingleThreshold, int **ppnThresholds);
 
+PollerInfo *RegisterPoller(PollerType type, NetObj *object);
+void ShowPollers(CONSOLE_CTX console);
+
 /**
  * Global variables
  */
index 3be1f7d..ff954fd 100644 (file)
@@ -24,6 +24,7 @@
 #define _nxmodule_h_
 
 #include <nxdbapi.h>
+
 /**
  * Forward declaration of server classes
  */
@@ -32,6 +33,7 @@ class MobileDeviceSession;
 class Node;
 class Event;
 class NetObj;
+class PollerInfo;
 class NXSL_Environment;
 struct NXCORE_LOG;
 
@@ -72,9 +74,9 @@ typedef struct
    BOOL (* pfTrapHandler)(SNMP_PDU *pdu, Node *pNode);
    BOOL (* pfEventHandler)(Event *event);
    void (* pfAlarmChangeHook)(UINT32 changeCode, NXC_ALARM *alarm);
-       void (* pfStatusPollHook)(Node *node, ClientSession *session, UINT32 rqId, int pollerId);
-       bool (* pfConfPollHook)(Node *node, ClientSession *session, UINT32 rqId, int pollerId);
-       void (* pfTopologyPollHook)(Node *node, ClientSession *session, UINT32 rqId, int pollerId);
+       void (* pfStatusPollHook)(Node *node, ClientSession *session, UINT32 rqId, PollerInfo *poller);
+       bool (* pfConfPollHook)(Node *node, ClientSession *session, UINT32 rqId, PollerInfo *poller);
+       void (* pfTopologyPollHook)(Node *node, ClientSession *session, UINT32 rqId, PollerInfo *poller);
        int (* pfCalculateObjectStatus)(NetObj *object);
        BOOL (* pfNetObjInsert)(NetObj *object);
        BOOL (* pfNetObjDelete)(NetObj *object);