all pollers converted to single thread pool
[public/netxms.git] / src / server / core / poll.cpp
index 61d8fab..c0855ed 100644 (file)
 #include "nxcore.h"
 
 /**
- * Poller state structure
+ * Node poller queue (polls new nodes)
  */
-struct __poller_state
+Queue g_nodePollerQueue;
+
+/**
+ * Thread pool for pollers
+ */
+ThreadPool *g_pollerThreadPool = NULL;
+
+/**
+ * Active pollers
+ */
+static HashMap<UINT64, PollerInfo> s_pollers(false);
+static MUTEX s_pollerLock = MutexCreate();
+
+/**
+ * Poller info destructor - will unregister poller and decrease ref count on object
+ */
+PollerInfo::~PollerInfo()
 {
-   int iType;
-   time_t timestamp;
-   TCHAR szMsg[128];
-   TCHAR szInfo[128];
-};
+   MutexLock(s_pollerLock);
+   s_pollers.remove(CAST_FROM_POINTER(this, UINT64));
+   MutexUnlock(s_pollerLock);
+   m_object->decRefCount();
+}
 
 /**
- * Poller queues
+ * Register active poller
  */
-Queue g_statusPollQueue;
-Queue g_configPollQueue;
-Queue g_instancePollQueue;
-Queue g_topologyPollQueue;
-Queue g_routePollQueue;
-Queue g_discoveryPollQueue;
-Queue g_nodePollerQueue;
-Queue g_conditionPollerQueue;
-Queue g_businessServicePollerQueue;
+PollerInfo *RegisterPoller(PollerType type, NetObj *object)
+{
+   PollerInfo *p = new PollerInfo(type, object);
+   object->incRefCount();
+   MutexLock(s_pollerLock);
+   s_pollers.set(CAST_FROM_POINTER(p, UINT64), p);
+   MutexUnlock(s_pollerLock);
+   return p;
+}
 
 /**
- * Static data
+ * Show poller information on console
  */
-static __poller_state *m_pPollerState = NULL;
-static int m_iNumPollers = 0;
+static EnumerationCallbackResult ShowPollerInfo(const void *key, const void *object, void *arg)
+{
+   static TCHAR *pollerType[] = { _T("STAT"), _T("CONF"), _T("INST"), _T("ROUT"), _T("DISC"), _T("BSVC"), _T("COND"), _T("TOPO") };
+
+   PollerInfo *p = (PollerInfo *)object;
+   NetObj *o = p->getObject();
+
+   TCHAR name[32];
+   nx_strncpy(name, o->getName(), 31);
+   ConsolePrintf((CONSOLE_CTX)arg, _T("%s | %9d | %-30s | %s\n"), pollerType[p->getType()], o->getId(), name, p->getStatus()); 
+
+   return _CONTINUE;
+}
+
+/**
+ * Get poller diagnostic
+ */
+void ShowPollers(CONSOLE_CTX console)
+{
+   ConsoleWrite(console, _T("Type | Object ID | Object name                    | Status\n")
+                         _T("-----+-----------+--------------------------------+--------------------------\n"));
+   MutexLock(s_pollerLock);
+   s_pollers.forEach(ShowPollerInfo, console);
+   MutexUnlock(s_pollerLock);
+}
 
 /**
  * Create management node object
@@ -62,7 +101,12 @@ static void CreateManagementNode(const InetAddress& addr)
        Node *pNode = new Node(addr, NF_IS_LOCAL_MGMT, 0, 0, 0);
    NetObjInsert(pNode, TRUE);
        pNode->setName(GetLocalHostName(buffer, 256));
-   pNode->configurationPoll(NULL, 0, -1, addr.getMaskBits());
+
+   PollerInfo *p = RegisterPoller(POLLER_TYPE_CONFIGURATION, pNode);
+   p->startExecution();
+   pNode->configurationPoll(NULL, 0, p, addr.getMaskBits());
+   delete p;
+
    pNode->unhide();
    g_dwMgmtNode = pNode->getId();   // Set local management node ID
    PostEvent(EVENT_NODE_ADDED, pNode->getId(), NULL);
@@ -218,204 +262,6 @@ void CheckForMgmtNode()
        }
 }
 
-/**
- * Set poller's state
- */
-static void SetPollerState(int nIdx, const TCHAR *pszMsg)
-{
-   nx_strncpy(m_pPollerState[nIdx].szMsg, pszMsg, 128);
-   m_pPollerState[nIdx].szInfo[0] = 0;
-   m_pPollerState[nIdx].timestamp = time(NULL);
-}
-
-/**
- * Set poller's info
- */
-void SetPollerInfo(int nIdx, const TCHAR *pszMsg)
-{
-   if (nIdx != -1)
-   {
-      nx_strncpy(m_pPollerState[nIdx].szInfo, pszMsg, 128);
-      m_pPollerState[nIdx].timestamp = time(NULL);
-   }
-}
-
-/**
- * Display current poller threads state
- */
-void ShowPollerState(CONSOLE_CTX pCtx)
-{
-   int i;
-   TCHAR szTime[64];
-   struct tm *ltm;
-
-   ConsolePrintf(pCtx, _T("PT  TIME                   STATE\n"));
-   for(i = 0; i < m_iNumPollers; i++)
-   {
-      ltm = localtime(&m_pPollerState[i].timestamp);
-      if (ltm != NULL)
-         _tcsftime(szTime, 64, _T("%d/%b/%Y %H:%M:%S"), ltm);
-      else
-         _tcscpy(szTime, _T("<error>             "));
-      if (m_pPollerState[i].szInfo[0] != 0)
-         ConsolePrintf(pCtx, _T("%c   %s   %s - %s\n"), m_pPollerState[i].iType, 
-                       szTime, m_pPollerState[i].szMsg, m_pPollerState[i].szInfo);
-      else
-         ConsolePrintf(pCtx, _T("%c   %s   %s\n"), m_pPollerState[i].iType, 
-                       szTime, m_pPollerState[i].szMsg);
-   }
-   ConsolePrintf(pCtx, _T("\n"));
-}
-
-/**
- * Status poll thread
- */
-static THREAD_RESULT THREAD_CALL StatusPoller(void *arg)
-{
-   NetObj *pObject;
-   TCHAR szBuffer[MAX_OBJECT_NAME + 64];
-
-       int unreachableDeleteDays = ConfigReadInt(_T("DeleteUnreachableNodesPeriod"), 0);
-       if (unreachableDeleteDays > 10000)
-       {
-               DbgPrintf(1, _T("Extremely high value of DeleteUnreachableNodesPeriod (%d), using 10000 instead)"), unreachableDeleteDays);
-               unreachableDeleteDays = 10000;
-       }
-
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'S';
-   SetPollerState((long)arg, _T("init"));
-
-   // Main loop
-   while(!IsShutdownInProgress())
-   {
-      SetPollerState((long)arg, _T("wait"));
-      pObject = (NetObj *)g_statusPollQueue.getOrBlock();
-      if (pObject == INVALID_POINTER_VALUE)
-         break;   // Shutdown indicator
-
-      _sntprintf(szBuffer, MAX_OBJECT_NAME + 64, _T("poll: %s [%d]"), pObject->getName(), pObject->getId());
-      SetPollerState((long)arg, szBuffer);
-               if (pObject->getObjectClass() == OBJECT_NODE)
-               {
-                       ((Node *)pObject)->statusPoll(NULL, 0, (long)arg);
-                       // Check if the node has to be deleted due to long downtime
-                       if ((unreachableDeleteDays > 0) && (((Node *)pObject)->getDownTime() > 0) && 
-                                (time(NULL) - ((Node *)pObject)->getDownTime() > unreachableDeleteDays * 24 * 3600))
-                       {
-                               ((Node*)pObject)->deleteObject();
-                       }
-               }
-               else if (pObject->getObjectClass() == OBJECT_CLUSTER)
-               {
-                       ((Cluster *)pObject)->statusPoll(NULL, 0, (long)arg);
-               }
-      pObject->decRefCount();
-   }
-   SetPollerState((long)arg, _T("finished"));
-   return THREAD_OK;
-}
-
-/**
- * Configuration poller
- */
-static THREAD_RESULT THREAD_CALL ConfigurationPoller(void *arg)
-{
-   Node *pNode;
-   TCHAR szBuffer[MAX_OBJECT_NAME + 64];
-
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'C';
-   SetPollerState((long)arg, _T("init"));
-
-   // Wait one minute to give status pollers chance to run first
-   ThreadSleep(60);
-
-   // Main loop
-   while(!IsShutdownInProgress())
-   {
-      SetPollerState((long)arg, _T("wait"));
-      pNode = (Node *)g_configPollQueue.getOrBlock();
-      if (pNode == INVALID_POINTER_VALUE)
-         break;   // Shutdown indicator
-
-      _sntprintf(szBuffer, MAX_OBJECT_NAME + 64, _T("poll: %s [%d]"), pNode->getName(), pNode->getId());
-      SetPollerState((long)arg, szBuffer);
-      ObjectTransactionStart();
-      pNode->configurationPoll(NULL, 0, (long)arg, 0);
-      ObjectTransactionEnd();
-      pNode->decRefCount();
-   }
-   SetPollerState((long)arg, _T("finished"));
-   return THREAD_OK;
-}
-
-/**
- * Instance discovery poller
- */
-static THREAD_RESULT THREAD_CALL InstanceDiscoveryPoller(void *arg)
-{
-   Node *pNode;
-   TCHAR szBuffer[MAX_OBJECT_NAME + 64];
-
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'I';
-   SetPollerState((long)arg, _T("init"));
-
-   // Wait two minutes to give status and configuration pollers chance to run first
-   ThreadSleep(120);
-
-   // Main loop
-   while(!IsShutdownInProgress())
-   {
-      SetPollerState((long)arg, _T("wait"));
-      pNode = (Node *)g_instancePollQueue.getOrBlock();
-      if (pNode == INVALID_POINTER_VALUE)
-         break;   // Shutdown indicator
-
-      _sntprintf(szBuffer, MAX_OBJECT_NAME + 64, _T("poll: %s [%d]"), pNode->getName(), pNode->getId());
-      SetPollerState((long)arg, szBuffer);
-      ObjectTransactionStart();
-      pNode->instanceDiscoveryPoll(NULL, 0, (long)arg);
-      ObjectTransactionEnd();
-      pNode->decRefCount();
-   }
-   SetPollerState((long)arg, _T("finished"));
-   return THREAD_OK;
-}
-
-/**
- * Routing table poller
- */
-static THREAD_RESULT THREAD_CALL RoutePoller(void *arg)
-{
-   Node *pNode;
-   TCHAR szBuffer[MAX_OBJECT_NAME + 64];
-
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'R';
-   SetPollerState((long)arg, _T("init"));
-
-   // Wait two minutes to give status and configuration pollers chance to run first
-   ThreadSleep(120);
-
-   // Main loop
-   while(!IsShutdownInProgress())
-   {
-      SetPollerState((long)arg, _T("wait"));
-      pNode = (Node *)g_routePollQueue.getOrBlock();
-      if (pNode == INVALID_POINTER_VALUE)
-         break;   // Shutdown indicator
-
-      _sntprintf(szBuffer, MAX_OBJECT_NAME + 64, _T("poll: %s [%d]"), pNode->getName(), pNode->getId());
-      SetPollerState((long)arg, szBuffer);
-      pNode->updateRoutingTable();
-      pNode->decRefCount();
-   }
-   SetPollerState((long)arg, _T("finished"));
-   return THREAD_OK;
-}
-
 /**
  * Comparator for poller queue elements
  */
@@ -507,167 +353,55 @@ static void CheckHostRoute(Node *node, ROUTE *route)
 /**
  * Discovery poller
  */
-static THREAD_RESULT THREAD_CALL DiscoveryPoller(void *arg)
+static void DiscoveryPoller(void *arg)
 {
-   Node *pNode;
-   TCHAR szBuffer[MAX_OBJECT_NAME + 64], szIpAddr[64];
-   ARP_CACHE *pArpCache;
-       ROUTING_TABLE *rt;
-       UINT32 i;
+//   TCHAR szBuffer[MAX_OBJECT_NAME + 64], szIpAddr[64];
+//   ARP_CACHE *pArpCache;
+//     ROUTING_TABLE *rt;
 
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'D';
-   SetPollerState((long)arg, _T("init"));
-
-   // Wait two minutes to give status and configuration pollers chance to run first
-   ThreadSleep(120);
-
-   // Main loop
-   while(!IsShutdownInProgress())
-   {
-      SetPollerState((long)arg, _T("wait"));
-      pNode = (Node *)g_discoveryPollQueue.getOrBlock();
-      if (pNode == INVALID_POINTER_VALUE)
-         break;   // Shutdown indicator
-
-               if (pNode->getRuntimeFlags() & NDF_DELETE_IN_PROGRESS)
-               {
-             pNode->setDiscoveryPollTimeStamp();
-             pNode->decRefCount();
-                       continue;
-               }
-
-      _sntprintf(szBuffer, MAX_OBJECT_NAME + 64, _T("poll: %s [%d]"), pNode->getName(), pNode->getId());
-      SetPollerState((long)arg, szBuffer);
-
-      DbgPrintf(4, _T("Starting discovery poll for node %s (%s) in zone %d"),
-                         pNode->getName(), pNode->getIpAddress().toString(szIpAddr), (int)pNode->getZoneId());
-
-      // Retrieve and analize node's ARP cache
-      pArpCache = pNode->getArpCache();
-      if (pArpCache != NULL)
-      {
-         for(i = 0; i < pArpCache->dwNumEntries; i++)
-                               if (memcmp(pArpCache->pEntries[i].bMacAddr, "\xFF\xFF\xFF\xFF\xFF\xFF", 6))     // Ignore broadcast addresses
-                                       CheckPotentialNode(pNode, pArpCache->pEntries[i].ipAddr, pArpCache->pEntries[i].dwIndex, pArpCache->pEntries[i].bMacAddr);
-         DestroyArpCache(pArpCache);
-      }
+   PollerInfo *poller = (PollerInfo *)arg;
+   poller->startExecution();
 
-               // Retrieve and analize node's routing table
-      DbgPrintf(5, _T("Discovery poll for node %s (%s) - reading routing table"),
-                pNode->getName(), pNode->getIpAddress().toString(szIpAddr));
-               rt = pNode->getRoutingTable();
-               if (rt != NULL)
-               {
-                       for(i = 0; i < (UINT32)rt->iNumEntries; i++)
-                       {
-                               CheckPotentialNode(pNode, rt->pRoutes[i].dwNextHop, rt->pRoutes[i].dwIfIndex);
-                               if ((rt->pRoutes[i].dwDestMask == 0xFFFFFFFF) && (rt->pRoutes[i].dwDestAddr != 0))
-                                       CheckHostRoute(pNode, &rt->pRoutes[i]);
-                       }
-                       DestroyRoutingTable(rt);
-               }
-
-      DbgPrintf(4, _T("Finished discovery poll for node %s (%s)"),
-                pNode->getName(), pNode->getIpAddress().toString(szIpAddr));
+   Node *pNode = (Node *)poller->getObject();
+       if (pNode->getRuntimeFlags() & NDF_DELETE_IN_PROGRESS)
+       {
       pNode->setDiscoveryPollTimeStamp();
-      pNode->decRefCount();
-   }
-   g_nodePollerQueue.clear();
-   g_nodePollerQueue.put(INVALID_POINTER_VALUE);
-   SetPollerState((long)arg, _T("finished"));
-   return THREAD_OK;
-}
-
-/**
- * Condition poller
- */
-static THREAD_RESULT THREAD_CALL ConditionPoller(void *arg)
-{
-   Condition *pCond;
-   TCHAR szBuffer[MAX_OBJECT_NAME + 64];
-
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'N';
-   SetPollerState((long)arg, _T("init"));
-
-   // Main loop
-   while(!IsShutdownInProgress())
-   {
-      SetPollerState((long)arg, _T("wait"));
-      pCond = (Condition *)g_conditionPollerQueue.getOrBlock();
-      if (pCond == INVALID_POINTER_VALUE)
-         break;   // Shutdown indicator
-
-      _sntprintf(szBuffer, MAX_OBJECT_NAME + 64, _T("poll: %s [%d]"), pCond->getName(), pCond->getId());
-      SetPollerState((long)arg, szBuffer);
-      pCond->check();
-      pCond->endPoll();
-   }
-   SetPollerState((long)arg, _T("finished"));
-   return THREAD_OK;
-}
-
-/**
- * Topology poller
- */
-static THREAD_RESULT THREAD_CALL TopologyPoller(void *arg)
-{
-   TCHAR szBuffer[MAX_OBJECT_NAME + 64];
-
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'T';
-   SetPollerState((long)arg, _T("init"));
+      delete poller;
+      return;
+       }
 
-   // Wait two minutes to give configuration pollers chance to run first
-   ThreadSleep(120);
+   DbgPrintf(4, _T("Starting discovery poll for node %s (%s) in zone %d"),
+                 pNode->getName(), (const TCHAR *)pNode->getIpAddress().toString(), (int)pNode->getZoneId());
 
-   // Main loop
-   while(!IsShutdownInProgress())
+   // Retrieve and analize node's ARP cache
+   ARP_CACHE *pArpCache = pNode->getArpCache();
+   if (pArpCache != NULL)
    {
-      SetPollerState((long)arg, _T("wait"));
-      Node *node = (Node *)g_topologyPollQueue.getOrBlock();
-      if (node == INVALID_POINTER_VALUE)
-         break;   // Shutdown indicator
-
-      _sntprintf(szBuffer, MAX_OBJECT_NAME + 64, _T("poll: %s [%d]"), node->getName(), node->getId());
-      SetPollerState((long)arg, szBuffer);
-               node->topologyPoll(NULL, 0, CAST_FROM_POINTER(arg, int));
-      node->decRefCount();
+      for(UINT32 i = 0; i < pArpCache->dwNumEntries; i++)
+                       if (memcmp(pArpCache->pEntries[i].bMacAddr, "\xFF\xFF\xFF\xFF\xFF\xFF", 6))     // Ignore broadcast addresses
+                               CheckPotentialNode(pNode, pArpCache->pEntries[i].ipAddr, pArpCache->pEntries[i].dwIndex, pArpCache->pEntries[i].bMacAddr);
+      DestroyArpCache(pArpCache);
    }
-   SetPollerState((long)arg, _T("finished"));
-   return THREAD_OK;
-}
-
-/**
- * Business service poller
- */
-static THREAD_RESULT THREAD_CALL BusinessServicePoller(void *arg)
-{
-   TCHAR szBuffer[MAX_OBJECT_NAME + 64];
-
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'B';
-   SetPollerState((long)arg, _T("init"));
 
-   // Wait two minutes to give configuration pollers chance to run first
-   ThreadSleep(120);
-
-   // Main loop
-   while(!IsShutdownInProgress())
-   {
-      SetPollerState((long)arg, _T("wait"));
-               BusinessService *service = (BusinessService *)g_businessServicePollerQueue.getOrBlock();
-      if (service == INVALID_POINTER_VALUE)
-         break;   // Shutdown indicator
+       // Retrieve and analize node's routing table
+   DbgPrintf(5, _T("Discovery poll for node %s (%s) - reading routing table"),
+             pNode->getName(), (const TCHAR *)pNode->getIpAddress().toString());
+       ROUTING_TABLE *rt = pNode->getRoutingTable();
+       if (rt != NULL)
+       {
+               for(int i = 0; i < rt->iNumEntries; i++)
+               {
+                       CheckPotentialNode(pNode, rt->pRoutes[i].dwNextHop, rt->pRoutes[i].dwIfIndex);
+                       if ((rt->pRoutes[i].dwDestMask == 0xFFFFFFFF) && (rt->pRoutes[i].dwDestAddr != 0))
+                               CheckHostRoute(pNode, &rt->pRoutes[i]);
+               }
+               DestroyRoutingTable(rt);
+       }
 
-      _sntprintf(szBuffer, MAX_OBJECT_NAME + 64, _T("poll: %s [%d]"), service->getName(), service->getId());
-      SetPollerState((long)arg, szBuffer);
-               service->poll(NULL, 0, CAST_FROM_POINTER(arg, int));
-      service->decRefCount();
-   }
-   SetPollerState((long)arg, _T("finished"));
-   return THREAD_OK;
+   DbgPrintf(4, _T("Finished discovery poll for node %s (%s)"),
+             pNode->getName(), (const TCHAR *)pNode->getIpAddress().toString());
+   pNode->setDiscoveryPollTimeStamp();
+   delete poller;
 }
 
 /**
@@ -742,31 +476,22 @@ static void CheckRange(int nType, UINT32 dwAddr1, UINT32 dwAddr2)
  */
 static THREAD_RESULT THREAD_CALL ActiveDiscoveryPoller(void *arg)
 {
-   int i, nRows, nInterval;
-   DB_RESULT hResult;
-
-   // Initialize state info
-   m_pPollerState[(long)arg].iType = 'A';
-   SetPollerState((long)arg, _T("init"));
-
-   nInterval = ConfigReadInt(_T("ActiveDiscoveryInterval"), 7200);
+   int nInterval = ConfigReadInt(_T("ActiveDiscoveryInterval"), 7200);
 
    // Main loop
    while(!IsShutdownInProgress())
    {
-      SetPollerState((long)arg, _T("wait"));
       if (SleepAndCheckForShutdown(nInterval))
          break;
 
       if (!(g_flags & AF_ACTIVE_NETWORK_DISCOVERY))
          continue;
 
-      SetPollerState((long)arg, _T("check"));
-      hResult = DBSelect(g_hCoreDB, _T("SELECT addr_type,addr1,addr2 FROM address_lists WHERE list_type=1"));
+      DB_RESULT hResult = DBSelect(g_hCoreDB, _T("SELECT addr_type,addr1,addr2 FROM address_lists WHERE list_type=1"));
       if (hResult != NULL)
       {
-         nRows = DBGetNumRows(hResult);
-         for(i = 0; i < nRows; i++)
+         int nRows = DBGetNumRows(hResult);
+         for(int i = 0; i < nRows; i++)
          {
             CheckRange(DBGetFieldLong(hResult, i, 0),
                        DBGetFieldIPAddr(hResult, i, 1),
@@ -775,7 +500,6 @@ static THREAD_RESULT THREAD_CALL ActiveDiscoveryPoller(void *arg)
          DBFreeResult(hResult);
       }
    }
-   SetPollerState((long)arg, _T("finished"));
    return THREAD_OK;
 }
 
@@ -791,45 +515,39 @@ static void QueueForPolling(NetObj *object, void *data)
                                Node *node = (Node *)object;
                                if (node->isReadyForConfigurationPoll())
                                {
-                                       node->incRefCount();
                                        node->lockForConfigurationPoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for configuration poll"), (int)node->getId(), node->getName());
-                                       g_configPollQueue.put(node);
+               ThreadPoolExecute(g_pollerThreadPool, node, &Node::configurationPoll, RegisterPoller(POLLER_TYPE_CONFIGURATION, node));
                                }
                                if (node->isReadyForInstancePoll())
                                {
-                                       node->incRefCount();
                                        node->lockForInstancePoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for instance discovery poll"), (int)node->getId(), node->getName());
-                                       g_instancePollQueue.put(node);
+               ThreadPoolExecute(g_pollerThreadPool, node, &Node::instanceDiscoveryPoll, RegisterPoller(POLLER_TYPE_INSTANCE_DISCOVERY, node));
                                }
                                if (node->isReadyForStatusPoll())
                                {
-                                       node->incRefCount();
                                        node->lockForStatusPoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for status poll"), (int)node->getId(), node->getName());
-                                       g_statusPollQueue.put(node);
+               ThreadPoolExecute(g_pollerThreadPool, node, &Node::statusPoll, RegisterPoller(POLLER_TYPE_STATUS, node));
                                }
                                if (node->isReadyForRoutePoll())
                                {
-                                       node->incRefCount();
                                        node->lockForRoutePoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for routing table poll"), (int)node->getId(), node->getName());
-                                       g_routePollQueue.put(node);
+               ThreadPoolExecute(g_pollerThreadPool, node, &Node::routingTablePoll, RegisterPoller(POLLER_TYPE_ROUTING_TABLE, node));
                                }
                                if (node->isReadyForDiscoveryPoll())
                                {
-                                       node->incRefCount();
                                        node->lockForDiscoveryPoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for discovery poll"), (int)node->getId(), node->getName());
-                                       g_discoveryPollQueue.put(node);
+               ThreadPoolExecute(g_pollerThreadPool, DiscoveryPoller, RegisterPoller(POLLER_TYPE_DISCOVERY, node));
                                }
                                if (node->isReadyForTopologyPoll())
                                {
-                                       node->incRefCount();
                                        node->lockForTopologyPoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for topology poll"), (int)node->getId(), node->getName());
-                                       g_topologyPollQueue.put(node);
+               ThreadPoolExecute(g_pollerThreadPool, node, &Node::topologyPoll, RegisterPoller(POLLER_TYPE_TOPOLOGY, node));
                                }
                        }
                        break;
@@ -840,7 +558,7 @@ static void QueueForPolling(NetObj *object, void *data)
                                {
                                        cond->lockForPoll();
                                        DbgPrintf(6, _T("Condition %d \"%s\" queued for poll"), (int)object->getId(), object->getName());
-                                       g_conditionPollerQueue.put(cond);
+               ThreadPoolExecute(g_pollerThreadPool, cond, &Condition::doPoll, RegisterPoller(POLLER_TYPE_CONDITION, cond));
                                }
                        }
                        break;
@@ -849,10 +567,9 @@ static void QueueForPolling(NetObj *object, void *data)
                                Cluster *cluster = (Cluster *)object;
                                if (cluster->isReadyForStatusPoll())
                                {
-                                       cluster->incRefCount();
                                        cluster->lockForStatusPoll();
                                        DbgPrintf(6, _T("Cluster %d \"%s\" queued for status poll"), (int)cluster->getId(), cluster->getName());
-                                       g_statusPollQueue.put(cluster);
+               ThreadPoolExecute(g_pollerThreadPool, cluster, &Cluster::statusPoll, RegisterPoller(POLLER_TYPE_STATUS, cluster));
                                }
                        }
                        break;
@@ -861,10 +578,9 @@ static void QueueForPolling(NetObj *object, void *data)
                                BusinessService *service = (BusinessService *)object;
                                if (service->isReadyForPolling())
                                {
-                                       service->incRefCount();
                                        service->lockForPolling();
                                        DbgPrintf(6, _T("Business service %d \"%s\" queued for poll"), (int)object->getId(), object->getName());
-                                       g_businessServicePollerQueue.put(service);
+               ThreadPoolExecute(g_pollerThreadPool, service, &BusinessService::poll, RegisterPoller(POLLER_TYPE_BUSINESS_SERVICE, service));
                                }
                        }
                        break;
@@ -878,77 +594,25 @@ static void QueueForPolling(NetObj *object, void *data)
  */
 THREAD_RESULT THREAD_CALL PollManager(void *pArg)
 {
-   UINT32 dwWatchdogId;
-   int i, iCounter, iNumStatusPollers, iNumConfigPollers, numInstancePollers;
-   int nIndex, iNumDiscoveryPollers, iNumRoutePollers;
-   int iNumConditionPollers, iNumTopologyPollers, iNumBusinessServicePollers;
-
-   // Read configuration
-   iNumConditionPollers = ConfigReadInt(_T("NumberOfConditionPollers"), 10);
-   iNumStatusPollers = ConfigReadInt(_T("NumberOfStatusPollers"), 25);
-   iNumConfigPollers = ConfigReadInt(_T("NumberOfConfigurationPollers"), 10);
-   numInstancePollers = ConfigReadInt(_T("NumberOfInstancePollers"), 10);
-   iNumRoutePollers = ConfigReadInt(_T("NumberOfRoutingTablePollers"), 10);
-   iNumDiscoveryPollers = ConfigReadInt(_T("NumberOfDiscoveryPollers"), 1);
-   iNumTopologyPollers = ConfigReadInt(_T("NumberOfTopologyPollers"), 10);
-   iNumBusinessServicePollers = ConfigReadInt(_T("NumberOfBusinessServicePollers"), 10);
-   m_iNumPollers = iNumStatusPollers + iNumConfigPollers + numInstancePollers +
-                   iNumDiscoveryPollers + iNumRoutePollers + iNumConditionPollers + 
-                                                iNumTopologyPollers + iNumBusinessServicePollers + 1;
-   DbgPrintf(2, _T("PollManager: %d pollers to start"), m_iNumPollers);
-
-   // Prepare static data
-   m_pPollerState = (__poller_state *)malloc(sizeof(__poller_state) * m_iNumPollers);
-
-   // Start status pollers
-   for(i = 0, nIndex = 0; i < iNumStatusPollers; i++, nIndex++)
-      ThreadCreate(StatusPoller, 0, CAST_TO_POINTER(nIndex, void *));
-
-   // Start configuration pollers
-   for(i = 0; i < iNumConfigPollers; i++, nIndex++)
-      ThreadCreate(ConfigurationPoller, 0, CAST_TO_POINTER(nIndex, void *));
-
-   // Start instance discovery pollers
-   for(i = 0; i < numInstancePollers; i++, nIndex++)
-      ThreadCreate(InstanceDiscoveryPoller, 0, CAST_TO_POINTER(nIndex, void *));
-
-   // Start routing table pollers
-   for(i = 0; i < iNumRoutePollers; i++, nIndex++)
-      ThreadCreate(RoutePoller, 0, CAST_TO_POINTER(nIndex, void *));
-
-   // Start discovery pollers
-   for(i = 0; i < iNumDiscoveryPollers; i++, nIndex++)
-      ThreadCreate(DiscoveryPoller, 0, CAST_TO_POINTER(nIndex, void *));
-
-   // Start condition pollers
-   for(i = 0; i < iNumConditionPollers; i++, nIndex++)
-      ThreadCreate(ConditionPoller, 0, CAST_TO_POINTER(nIndex, void *));
-
-   // Start topology pollers
-   for(i = 0; i < iNumTopologyPollers; i++, nIndex++)
-      ThreadCreate(TopologyPoller, 0, CAST_TO_POINTER(nIndex, void *));
-
-   // Start business service pollers
-   for(i = 0; i < iNumBusinessServicePollers; i++, nIndex++)
-      ThreadCreate(BusinessServicePoller, 0, CAST_TO_POINTER(nIndex, void *));
+   g_pollerThreadPool = ThreadPoolCreate(ConfigReadInt(_T("PollerThreadPoolBaseSize"), 10), ConfigReadInt(_T("PollerThreadPoolMaxSize"), 250), _T("POLLERS"));
 
    // Start active discovery poller
-   ThreadCreate(ActiveDiscoveryPoller, 0, CAST_TO_POINTER(nIndex, void *));
+   ThreadCreate(ActiveDiscoveryPoller, 0, NULL);
 
-   dwWatchdogId = WatchdogAddThread(_T("Poll Manager"), 60);
-   iCounter = 0;
+   UINT32 watchdogId = WatchdogAddThread(_T("Poll Manager"), 60);
+   int counter = 0;
 
    while(!IsShutdownInProgress())
    {
       if (SleepAndCheckForShutdown(5))
          break;      // Shutdown has arrived
-      WatchdogNotify(dwWatchdogId);
+      WatchdogNotify(watchdogId);
 
       // Check for management node every 10 minutes
-      iCounter++;
-      if (iCounter % 120 == 0)
+      counter++;
+      if (counter % 120 == 0)
       {
-         iCounter = 0;
+         counter = 0;
          CheckForMgmtNode();
       }
 
@@ -957,31 +621,10 @@ THREAD_RESULT THREAD_CALL PollManager(void *pArg)
                g_idxObjectById.forEach(QueueForPolling, NULL);
    }
 
-   // Send stop signal to all pollers
-   g_statusPollQueue.clear();
-   g_statusPollQueue.setShutdownMode();
-
-   g_configPollQueue.clear();
-   g_configPollQueue.setShutdownMode();
-
-   g_instancePollQueue.clear();
-   g_instancePollQueue.setShutdownMode();
-
-   g_discoveryPollQueue.clear();
-   g_discoveryPollQueue.setShutdownMode();
-
-   g_routePollQueue.clear();
-   g_routePollQueue.setShutdownMode();
-
-   g_conditionPollerQueue.clear();
-   g_conditionPollerQueue.setShutdownMode();
-
-   g_topologyPollQueue.clear();
-   g_topologyPollQueue.setShutdownMode();
-
-   g_businessServicePollerQueue.clear();
-   g_businessServicePollerQueue.setShutdownMode();
+   g_nodePollerQueue.clear();
+   g_nodePollerQueue.put(INVALID_POINTER_VALUE);
 
+   ThreadPoolDestroy(g_pollerThreadPool);
    DbgPrintf(1, _T("PollManager: main thread terminated"));
    return THREAD_OK;
 }
@@ -991,18 +634,9 @@ THREAD_RESULT THREAD_CALL PollManager(void *pArg)
  */
 void ResetDiscoveryPoller()
 {
-   Node *pNode;
    NEW_NODE *pInfo;
 
-   // Clear queues
-   while((pNode = (Node *)g_discoveryPollQueue.get()) != NULL)
-   {
-      if (pNode != INVALID_POINTER_VALUE)
-      {
-         pNode->setDiscoveryPollTimeStamp();
-         pNode->decRefCount();
-      }
-   }
+   // Clear node poller queue
    while((pInfo = (NEW_NODE *)g_nodePollerQueue.get()) != NULL)
    {
       if (pInfo != INVALID_POINTER_VALUE)