data collection switched to thread pool
authorVictor Kirhenshtein <victor@netxms.org>
Thu, 28 Sep 2017 11:21:35 +0000 (14:21 +0300)
committerVictor Kirhenshtein <victor@netxms.org>
Thu, 28 Sep 2017 11:21:35 +0000 (14:21 +0300)
contrib/templates/netxms_server.xml
src/server/core/console.cpp
src/server/core/datacoll.cpp
src/server/core/dctarget.cpp
src/server/core/debug.cpp
src/server/core/main.cpp
src/server/core/node.cpp
src/server/core/session.cpp
src/server/include/nms_core.h
src/server/include/nms_dcoll.h
src/server/include/nms_objects.h

index e4c7fa6..e5415d4 100644 (file)
                        <dataCollection>
                                <dci id="21">
                                        <guid>2588c71e-271c-4b74-86ea-c0043671d9c0</guid>
-                                       <name>Server.AverageDCPollerQueueSize</name>
-                                       <description>NetXMS server: data collection poller&apos;s request queue</description>
+                                       <name>Server.AverageDataCollectorQueueSize</name>
+                                       <description>NetXMS server: data collector&apos;s request queue</description>
                                        <dataType>5</dataType>
                                        <samples>0</samples>
                                        <origin>0</origin>
index 939ab15..cb031ac 100644 (file)
  * Externals
  */
 extern Queue g_nodePollerQueue;
-extern Queue g_dataCollectionQueue;
 extern Queue g_dciCacheLoaderQueue;
 extern Queue g_syslogProcessingQueue;
 extern Queue g_syslogWriteQueue;
 extern ThreadPool *g_pollerThreadPool;
 extern ThreadPool *g_schedulerThreadPool;
+extern ThreadPool *g_dataCollectorThreadPool;
 
 void ShowPredictionEngines(CONSOLE_CTX console);
 void ShowAgentTunnels(CONSOLE_CTX console);
@@ -742,15 +742,17 @@ int ProcessConsoleCommand(const TCHAR *pszCmdLine, CONSOLE_CTX pCtx)
       }
       else if (IsCommand(_T("QUEUES"), szBuffer, 1))
       {
-         ShowQueueStats(pCtx, &g_dataCollectionQueue, _T("Data collector"));
+         ShowThreadPoolPendingQueue(pCtx, g_dataCollectorThreadPool, _T("Data collector"));
          ShowQueueStats(pCtx, &g_dciCacheLoaderQueue, _T("DCI cache loader"));
          ShowQueueStats(pCtx, g_dbWriterQueue, _T("Database writer"));
          ShowQueueStats(pCtx, g_dciDataWriterQueue, _T("Database writer (IData)"));
          ShowQueueStats(pCtx, g_dciRawDataWriterQueue, _T("Database writer (raw DCI values)"));
          ShowQueueStats(pCtx, g_pEventQueue, _T("Event processor"));
-         ShowQueueStats(pCtx, &g_nodePollerQueue, _T("Node poller"));
+         ShowThreadPoolPendingQueue(pCtx, g_pollerThreadPool, _T("Poller"));
+         ShowQueueStats(pCtx, &g_nodePollerQueue, _T("Node discovery poller"));
          ShowQueueStats(pCtx, &g_syslogProcessingQueue, _T("Syslog processing"));
          ShowQueueStats(pCtx, &g_syslogWriteQueue, _T("Syslog writer"));
+         ShowThreadPoolPendingQueue(pCtx, g_schedulerThreadPool, _T("Scheduler"));
          ConsolePrintf(pCtx, _T("\n"));
       }
       else if (IsCommand(_T("ROUTING-TABLE"), szBuffer, 1))
@@ -820,6 +822,7 @@ int ProcessConsoleCommand(const TCHAR *pszCmdLine, CONSOLE_CTX pCtx)
       {
          ShowThreadPool(pCtx, g_mainThreadPool);
          ShowThreadPool(pCtx, g_pollerThreadPool);
+         ShowThreadPool(pCtx, g_dataCollectorThreadPool);
          ShowThreadPool(pCtx, g_schedulerThreadPool);
          ShowThreadPool(pCtx, g_agentConnectionThreadPool);
       }
index f7e1a0a..11f0ce7 100644 (file)
  */
 extern Queue g_syslogProcessingQueue;
 extern Queue g_syslogWriteQueue;
+extern ThreadPool *g_pollerThreadPool;
+
+/**
+ * Thread pool for data collectors
+ */
+ThreadPool *g_dataCollectorThreadPool = NULL;
 
 /**
  * Global data
  */
+double g_dAvgDataCollectorQueueSize = 0;
 double g_dAvgPollerQueueSize = 0;
 double g_dAvgDBWriterQueueSize = 0;
 double g_dAvgIDataWriterQueueSize = 0;
@@ -44,7 +51,6 @@ double g_dAvgDBAndIDataWriterQueueSize = 0;
 double g_dAvgSyslogProcessingQueueSize = 0;
 double g_dAvgSyslogWriterQueueSize = 0;
 UINT32 g_dwAvgDCIQueuingTime = 0;
-Queue g_dataCollectionQueue(4096, 256);
 Queue g_dciCacheLoaderQueue;
 
 /**
@@ -223,117 +229,112 @@ static void *GetTableData(DataCollectionTarget *dcTarget, DCTable *table, UINT32
 /**
  * Data collector
  */
-static THREAD_RESULT THREAD_CALL DataCollector(void *pArg)
+void DataCollector(void *arg)
 {
-   ThreadSetName("DataCollector");
-
-   UINT32 dwError;
+   DCObject *pItem = static_cast<DCObject*>(arg);
+   DataCollectionTarget *target = static_cast<DataCollectionTarget*>(pItem->getOwner());
 
-   TCHAR *pBuffer = (TCHAR *)malloc(MAX_LINE_SIZE * sizeof(TCHAR));
-   while(!IsShutdownInProgress())
+   if (pItem->isScheduledForDeletion())
    {
-      DCObject *pItem = (DCObject *)g_dataCollectionQueue.getOrBlock();
-               DataCollectionTarget *target = (DataCollectionTarget *)pItem->getOwner();
-
-               if (pItem->isScheduledForDeletion())
-               {
-             nxlog_debug(7, _T("DataCollector(): about to destroy DC object %d \"%s\" owner=%d"),
-                                   pItem->getId(), pItem->getName(), (target != NULL) ? (int)target->getId() : -1);
-                       pItem->deleteFromDatabase();
-                       delete pItem;
-                       continue;
-               }
+      nxlog_debug(7, _T("DataCollector(): about to destroy DC object %d \"%s\" owner=%d"),
+                  pItem->getId(), pItem->getName(), (target != NULL) ? (int)target->getId() : -1);
+      pItem->deleteFromDatabase();
+      delete pItem;
+      target->decRefCount();
+      return;
+   }
 
-               if (target == NULL)
-               {
-         nxlog_debug(3, _T("DataCollector: attempt to collect information for non-existing node (DCI=%d \"%s\")"),
-                     pItem->getId(), pItem->getName());
+   if (target == NULL)
+   {
+      nxlog_debug(3, _T("DataCollector: attempt to collect information for non-existing node (DCI=%d \"%s\")"),
+                  pItem->getId(), pItem->getName());
 
-         // Update item's last poll time and clear busy flag so item can be polled again
-         pItem->setLastPollTime(time(NULL));
-         pItem->clearBusyFlag();
-                  continue;
-               }
+      // Update item's last poll time and clear busy flag so item can be polled again
+      pItem->setLastPollTime(time(NULL));
+      pItem->clearBusyFlag();
+      return;
+   }
 
-      DbgPrintf(8, _T("DataCollector(): processing DC object %d \"%s\" owner=%d sourceNode=%d"),
-                         pItem->getId(), pItem->getName(), (target != NULL) ? (int)target->getId() : -1, pItem->getSourceNode());
-      UINT32 sourceNodeId = target->getEffectiveSourceNode(pItem);
-               if (sourceNodeId != 0)
-               {
-                       Node *sourceNode = (Node *)FindObjectById(sourceNodeId, OBJECT_NODE);
-                       if (sourceNode != NULL)
-                       {
-                               if (((target->getObjectClass() == OBJECT_CHASSIS) && (((Chassis *)target)->getControllerId() == sourceNodeId)) ||
-                                   sourceNode->isTrustedNode(target->getId()))
-                               {
-                                       target = sourceNode;
-                                       target->incRefCount();
-                               }
-                               else
-                               {
-               // Change item's status to "not supported"
-               pItem->setStatus(ITEM_STATUS_NOT_SUPPORTED, true);
-               target->decRefCount();
-               target = NULL;
-                               }
-                       }
-                       else
-                       {
+   DbgPrintf(8, _T("DataCollector(): processing DC object %d \"%s\" owner=%d sourceNode=%d"),
+             pItem->getId(), pItem->getName(), (target != NULL) ? (int)target->getId() : -1, pItem->getSourceNode());
+   UINT32 sourceNodeId = target->getEffectiveSourceNode(pItem);
+   if (sourceNodeId != 0)
+   {
+      Node *sourceNode = (Node *)FindObjectById(sourceNodeId, OBJECT_NODE);
+      if (sourceNode != NULL)
+      {
+         if (((target->getObjectClass() == OBJECT_CHASSIS) && (((Chassis *)target)->getControllerId() == sourceNodeId)) ||
+             sourceNode->isTrustedNode(target->getId()))
+         {
+            target = sourceNode;
+            target->incRefCount();
+         }
+         else
+         {
+            // Change item's status to "not supported"
+            pItem->setStatus(ITEM_STATUS_NOT_SUPPORTED, true);
             target->decRefCount();
             target = NULL;
-                       }
-               }
+         }
+      }
+      else
+      {
+         target->decRefCount();
+         target = NULL;
+      }
+   }
 
-      time_t currTime = time(NULL);
-      if (target != NULL)
+   time_t currTime = time(NULL);
+   if (target != NULL)
+   {
+      if (!IsShutdownInProgress())
       {
-         if (!IsShutdownInProgress())
+         void *data;
+         TCHAR buffer[MAX_LINE_SIZE];
+         UINT32 error;
+         switch(pItem->getType())
          {
-            void *data;
-            switch(pItem->getType())
-            {
-               case DCO_TYPE_ITEM:
-                  data = GetItemData(target, (DCItem *)pItem, pBuffer, &dwError);
-                  break;
-               case DCO_TYPE_TABLE:
-                  data = GetTableData(target, (DCTable *)pItem, &dwError);
-                  break;
-               default:
-                  data = NULL;
-                  dwError = DCE_NOT_SUPPORTED;
-                  break;
-            }
+            case DCO_TYPE_ITEM:
+               data = GetItemData(target, (DCItem *)pItem, buffer, &error);
+               break;
+            case DCO_TYPE_TABLE:
+               data = GetTableData(target, (DCTable *)pItem, &error);
+               break;
+            default:
+               data = NULL;
+               error = DCE_NOT_SUPPORTED;
+               break;
+         }
 
-            // Transform and store received value into database or handle error
-            switch(dwError)
-            {
-               case DCE_SUCCESS:
-                  if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
-                     pItem->setStatus(ITEM_STATUS_ACTIVE, true);
-                  if (!((DataCollectionTarget *)pItem->getOwner())->processNewDCValue(pItem, currTime, data))
-                  {
-                     // value processing failed, convert to data collection error
-                     pItem->processNewError(false);
-                  }
-                  break;
-               case DCE_COLLECTION_ERROR:
-                  if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
-                     pItem->setStatus(ITEM_STATUS_ACTIVE, true);
-                  pItem->processNewError(false);
-                  break;
-               case DCE_NO_SUCH_INSTANCE:
-                  if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
-                     pItem->setStatus(ITEM_STATUS_ACTIVE, true);
-                  pItem->processNewError(true);
-                  break;
-               case DCE_COMM_ERROR:
+         // Transform and store received value into database or handle error
+         switch(error)
+         {
+            case DCE_SUCCESS:
+               if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
+                  pItem->setStatus(ITEM_STATUS_ACTIVE, true);
+               if (!((DataCollectionTarget *)pItem->getOwner())->processNewDCValue(pItem, currTime, data))
+               {
+                  // value processing failed, convert to data collection error
                   pItem->processNewError(false);
-                  break;
-               case DCE_NOT_SUPPORTED:
-                  // Change item's status
-                  pItem->setStatus(ITEM_STATUS_NOT_SUPPORTED, true);
-                  break;
-            }
+               }
+               break;
+            case DCE_COLLECTION_ERROR:
+               if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
+                  pItem->setStatus(ITEM_STATUS_ACTIVE, true);
+               pItem->processNewError(false);
+               break;
+            case DCE_NO_SUCH_INSTANCE:
+               if (pItem->getStatus() == ITEM_STATUS_NOT_SUPPORTED)
+                  pItem->setStatus(ITEM_STATUS_ACTIVE, true);
+               pItem->processNewError(true);
+               break;
+            case DCE_COMM_ERROR:
+               pItem->processNewError(false);
+               break;
+            case DCE_NOT_SUPPORTED:
+               // Change item's status
+               pItem->setStatus(ITEM_STATUS_NOT_SUPPORTED, true);
+               break;
          }
 
          // Send session notification when force poll is performed
@@ -343,29 +344,25 @@ static THREAD_RESULT THREAD_CALL DataCollector(void *pArg)
             session->notify(NX_NOTIFY_FORCE_DCI_POLL, pItem->getOwnerId());
             session->decRefCount();
          }
-
-         // Decrement node's usage counter
-         target->decRefCount();
-                       if ((pItem->getSourceNode() != 0) && (pItem->getOwner() != NULL))
-                       {
-                               pItem->getOwner()->decRefCount();
-                       }
       }
-      else     /* target == NULL */
+
+      // Decrement node's usage counter
+      target->decRefCount();
+      if ((pItem->getSourceNode() != 0) && (pItem->getOwner() != NULL))
       {
-                       Template *n = pItem->getOwner();
-         nxlog_debug(5, _T("DataCollector: attempt to collect information for non-existing or inaccessible node (DCI=%d \"%s\" target=%d sourceNode=%d)"),
-                                   pItem->getId(), pItem->getName(), (n != NULL) ? (int)n->getId() : -1, sourceNodeId);
+         pItem->getOwner()->decRefCount();
       }
-
-               // Update item's last poll time and clear busy flag so item can be polled again
-      pItem->setLastPollTime(currTime);
-      pItem->clearBusyFlag();
+   }
+   else     /* target == NULL */
+   {
+      Template *n = pItem->getOwner();
+      nxlog_debug(5, _T("DataCollector: attempt to collect information for non-existing or inaccessible node (DCI=%d \"%s\" target=%d sourceNode=%d)"),
+                  pItem->getId(), pItem->getName(), (n != NULL) ? (int)n->getId() : -1, sourceNodeId);
    }
 
-   free(pBuffer);
-   DbgPrintf(1, _T("Data collector thread terminated"));
-   return THREAD_OK;
+   // Update item's last poll time and clear busy flag so item can be polled again
+   pItem->setLastPollTime(currTime);
+   pItem->clearBusyFlag();
 }
 
 /**
@@ -379,7 +376,7 @@ static void QueueItems(NetObj *object, void *data)
    WatchdogNotify(*((UINT32 *)data));
        nxlog_debug(8, _T("ItemPoller: calling DataCollectionTarget::queueItemsForPolling for object %s [%d]"),
                                   object->getName(), object->getId());
-       ((DataCollectionTarget *)object)->queueItemsForPolling(&g_dataCollectionQueue);
+       ((DataCollectionTarget *)object)->queueItemsForPolling();
 }
 
 /**
@@ -435,32 +432,40 @@ static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
    ThreadSetName("StatCollector");
 
    UINT32 i, currPos = 0;
-   UINT32 pollerQS[12], dbWriterQS[12];
+   UINT32 pollerQS[12], dataCollectorQS[12], dbWriterQS[12];
    UINT32 iDataWriterQS[12], rawDataWriterQS[12], dbAndIDataWriterQS[12];
    UINT32 syslogProcessingQS[12], syslogWriterQS[12];
-   double sum1, sum2, sum3, sum4, sum5, sum8, sum9;
+   double sum1, sum2, sum3, sum4, sum5, sum8, sum9, sum10;
 
    memset(pollerQS, 0, sizeof(UINT32) * 12);
+   memset(dataCollectorQS, 0, sizeof(UINT32) * 12);
    memset(dbWriterQS, 0, sizeof(UINT32) * 12);
    memset(iDataWriterQS, 0, sizeof(UINT32) * 12);
    memset(rawDataWriterQS, 0, sizeof(UINT32) * 12);
    memset(dbAndIDataWriterQS, 0, sizeof(UINT32) * 12);
    memset(syslogProcessingQS, 0, sizeof(UINT32) * 12);
    memset(syslogWriterQS, 0, sizeof(UINT32) * 12);
-   g_dAvgPollerQueueSize = 0;
+   g_dAvgDataCollectorQueueSize = 0;
    g_dAvgDBWriterQueueSize = 0;
    g_dAvgIDataWriterQueueSize = 0;
    g_dAvgRawDataWriterQueueSize = 0;
    g_dAvgDBAndIDataWriterQueueSize = 0;
    g_dAvgSyslogProcessingQueueSize = 0;
    g_dAvgSyslogWriterQueueSize = 0;
+   g_dAvgPollerQueueSize = 0;
    while(!IsShutdownInProgress())
    {
       if (SleepAndCheckForShutdown(5))
          break;      // Shutdown has arrived
 
       // Get current values
-      pollerQS[currPos] = g_dataCollectionQueue.size();
+      ThreadPoolInfo poolInfo;
+      ThreadPoolGetInfo(g_dataCollectorThreadPool, &poolInfo);
+      dataCollectorQS[currPos] = (poolInfo.activeRequests > poolInfo.curThreads) ? poolInfo.activeRequests - poolInfo.curThreads : 0;
+
+      ThreadPoolGetInfo(g_pollerThreadPool, &poolInfo);
+      pollerQS[currPos] = (poolInfo.activeRequests > poolInfo.curThreads) ? poolInfo.activeRequests - poolInfo.curThreads : 0;
+
       dbWriterQS[currPos] = g_dbWriterQueue->size();
       iDataWriterQS[currPos] = g_dciDataWriterQueue->size();
       rawDataWriterQS[currPos] = g_dciRawDataWriterQueue->size();
@@ -472,23 +477,25 @@ static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
          currPos = 0;
 
       // Calculate new averages
-      for(i = 0, sum1 = 0, sum2 = 0, sum3 = 0, sum4 = 0, sum5 = 0, sum8 = 0, sum9 = 0; i < 12; i++)
+      for(i = 0, sum1 = 0, sum2 = 0, sum3 = 0, sum4 = 0, sum5 = 0, sum8 = 0, sum9 = 0, sum10 = 0; i < 12; i++)
       {
-         sum1 += pollerQS[i];
+         sum1 += dataCollectorQS[i];
          sum2 += dbWriterQS[i];
          sum3 += iDataWriterQS[i];
          sum4 += rawDataWriterQS[i];
          sum5 += dbAndIDataWriterQS[i];
          sum8 += syslogProcessingQS[i];
          sum9 += syslogWriterQS[i];
+         sum10 += pollerQS[i];
       }
-      g_dAvgPollerQueueSize = sum1 / 12;
+      g_dAvgDataCollectorQueueSize = sum1 / 12;
       g_dAvgDBWriterQueueSize = sum2 / 12;
       g_dAvgIDataWriterQueueSize = sum3 / 12;
       g_dAvgRawDataWriterQueueSize = sum4 / 12;
       g_dAvgDBAndIDataWriterQueueSize = sum5 / 12;
       g_dAvgSyslogProcessingQueueSize = sum8 / 12;
       g_dAvgSyslogWriterQueueSize = sum9 / 12;
+      g_dAvgPollerQueueSize = sum10 / 12;
    }
    return THREAD_OK;
 }
@@ -528,20 +535,15 @@ THREAD_RESULT THREAD_CALL CacheLoader(void *arg)
 /**
  * Initialize data collection subsystem
  */
-BOOL InitDataCollector()
+void InitDataCollector()
 {
    int i, iNumCollectors;
 
-   // Start data collection threads
-   iNumCollectors = ConfigReadInt(_T("NumberOfDataCollectors"), 10);
-   for(i = 0; i < iNumCollectors; i++)
-      ThreadCreate(DataCollector, 0, NULL);
+   g_dataCollectorThreadPool = ThreadPoolCreate(ConfigReadInt(_T("DataCollectorThreadPoolBaseSize"), 10), ConfigReadInt(_T("DataCollectorThreadPoolMaxSize"), 250), _T("DATACOLL"));
 
    ThreadCreate(ItemPoller, 0, NULL);
    ThreadCreate(StatCollector, 0, NULL);
    ThreadCreate(CacheLoader, 0, NULL);
-
-   return TRUE;
 }
 
 /**
index 4552481..d5114c4 100644 (file)
 #include "nxcore.h"
 
 /**
+ * Data collector thread pool
+ */
+extern ThreadPool *g_dataCollectorThreadPool;
+
+/**
+ * Data collector worker
+ */
+void DataCollector(void *arg);
+
+/**
  * Default constructor
  */
 DataCollectionTarget::DataCollectionTarget() : Template()
@@ -530,7 +540,7 @@ bool DataCollectionTarget::isDataCollectionDisabled()
 /**
  * Put items which requires polling into the queue
  */
-void DataCollectionTarget::queueItemsForPolling(Queue *pollerQueue)
+void DataCollectionTarget::queueItemsForPolling()
 {
    if ((m_status == STATUS_UNMANAGED) || isDataCollectionDisabled() || m_isDeleted)
       return;  // Do not collect data for unmanaged objects or if data collection is disabled
@@ -545,7 +555,22 @@ void DataCollectionTarget::queueItemsForPolling(Queue *pollerQueue)
       {
          object->setBusyFlag();
          incRefCount();   // Increment reference count for each queued DCI
-         pollerQueue->put(object);
+
+         if ((object->getDataSource() == DS_NATIVE_AGENT) ||
+             (object->getDataSource() == DS_WINPERF) ||
+             (object->getDataSource() == DS_SSH) ||
+             (object->getDataSource() == DS_SMCLP))
+         {
+            TCHAR key[32];
+            _sntprintf(key, 32, _T("%08X/%s"),
+                     m_id, (object->getDataSource() == DS_SSH) ? _T("ssh") :
+                              (object->getDataSource() == DS_SMCLP) ? _T("smclp") : _T("agent"));
+            ThreadPoolExecuteSerialized(g_dataCollectorThreadPool, key, DataCollector, object);
+         }
+         else
+         {
+            ThreadPoolExecute(g_dataCollectorThreadPool, DataCollector, object);
+         }
                        nxlog_debug(8, _T("DataCollectionTarget(%s)->QueueItemsForPolling(): item %d \"%s\" added to queue"), m_name, object->getId(), object->getName());
       }
    }
index ae7c919..2571476 100644 (file)
@@ -184,6 +184,17 @@ void ShowQueueStats(CONSOLE_CTX console, Queue *pQueue, const TCHAR *pszName)
 }
 
 /**
+ * Show queue stats
+ */
+void ShowThreadPoolPendingQueue(CONSOLE_CTX console, ThreadPool *p, const TCHAR *pszName)
+{
+   ThreadPoolInfo info;
+   ThreadPoolGetInfo(p, &info);
+   int size = (info.activeRequests > info.curThreads) ? info.activeRequests - info.curThreads : 0;
+   ConsolePrintf(console, _T("%-32s : %d\n"), pszName, size);
+}
+
+/**
  * Show thread pool stats
  */
 void ShowThreadPool(CONSOLE_CTX console, ThreadPool *p)
index 4a897d2..dcd0869 100644 (file)
@@ -917,8 +917,7 @@ retry_db_lock:
 
        // Initialize data collection subsystem
    LoadPerfDataStorageDrivers();
-       if (!InitDataCollector())
-               return FALSE;
+       InitDataCollector();
 
        InitLogAccess();
        FileUploadJob::init();
index fa8bab3..b8d030e 100644 (file)
@@ -4515,7 +4515,11 @@ UINT32 Node::getInternalItem(const TCHAR *param, size_t bufSize, TCHAR *buffer)
    }
    else if (m_capabilities & NC_IS_LOCAL_MGMT)
    {
-      if (!_tcsicmp(param, _T("Server.AverageDBWriterQueueSize")))
+      if (!_tcsicmp(param, _T("Server.AverageDataCollectorQueueSize")))
+      {
+         _sntprintf(buffer, bufSize, _T("%f"), g_dAvgDataCollectorQueueSize);
+      }
+      else if (!_tcsicmp(param, _T("Server.AverageDBWriterQueueSize")))
       {
          _sntprintf(buffer, bufSize, _T("%f"), g_dAvgDBAndIDataWriterQueueSize);
       }
@@ -4535,7 +4539,7 @@ UINT32 Node::getInternalItem(const TCHAR *param, size_t bufSize, TCHAR *buffer)
       {
          _sntprintf(buffer, bufSize, _T("%u"), g_dwAvgDCIQueuingTime);
       }
-      else if (!_tcsicmp(param, _T("Server.AverageDCPollerQueueSize")))
+      else if (!_tcsicmp(param, _T("Server.AveragePollerQueueSize")))
       {
          _sntprintf(buffer, bufSize, _T("%f"), g_dAvgPollerQueueSize);
       }
index b5f79d6..0485dc2 100644 (file)
@@ -62,8 +62,8 @@
  * Externals
  */
 extern Queue g_nodePollerQueue;
-extern Queue g_dataCollectionQueue;
 extern Queue g_dciCacheLoaderQueue;
+extern ThreadPool *g_dataCollectorThreadPool;
 
 void UnregisterClientSession(int id);
 void ResetDiscoveryPoller();
@@ -8066,7 +8066,10 @@ void ClientSession::sendServerStats(UINT32 dwRqId)
 #endif
 
        // Queues
-       msg.setField(VID_QSIZE_DCI_POLLER, g_dataCollectionQueue.size());
+   ThreadPoolInfo poolInfo;
+   ThreadPoolGetInfo(g_dataCollectorThreadPool, &poolInfo);
+       msg.setField(VID_QSIZE_DCI_POLLER, (poolInfo.activeRequests > poolInfo.curThreads) ? poolInfo.activeRequests - poolInfo.curThreads : 0);
+
        msg.setField(VID_QSIZE_DCI_CACHE_LOADER, g_dciCacheLoaderQueue.size());
        msg.setField(VID_QSIZE_DBWRITER, g_dbWriterQueue->size());
        msg.setField(VID_QSIZE_EVENT, g_pEventQueue->size());
index 405db80..a6f30e8 100644 (file)
@@ -1183,6 +1183,7 @@ void DumpClientSessions(CONSOLE_CTX console);
 void DumpMobileDeviceSessions(CONSOLE_CTX console);
 void ShowServerStats(CONSOLE_CTX console);
 void ShowQueueStats(CONSOLE_CTX console, Queue *pQueue, const TCHAR *pszName);
+void ShowThreadPoolPendingQueue(CONSOLE_CTX console, ThreadPool *p, const TCHAR *pszName);
 void ShowThreadPool(CONSOLE_CTX console, ThreadPool *p);
 LONG GetThreadPoolStat(ThreadPoolStat stat, const TCHAR *param, TCHAR *value);
 void DumpProcess(CONSOLE_CTX console);
index 30f348f..7588db2 100644 (file)
@@ -689,7 +689,7 @@ public:
 /**
  * Functions
  */
-BOOL InitDataCollector();
+void InitDataCollector();
 void DeleteAllItemsForNode(UINT32 dwNodeId);
 void WriteFullParamListToMessage(NXCPMessage *pMsg, WORD flags);
 int GetDCObjectType(UINT32 nodeId, UINT32 dciId);
@@ -704,6 +704,7 @@ void CalculateItemValueMax(ItemValue &result, int nDataType, int nNumValues, Ite
 /**
  * Global variables
  */
+extern double g_dAvgDataCollectorQueueSize;
 extern double g_dAvgPollerQueueSize;
 extern double g_dAvgDBWriterQueueSize;
 extern double g_dAvgIDataWriterQueueSize;
index e8ef2ee..0f491cf 100644 (file)
@@ -1104,7 +1104,7 @@ public:
    void updateDciCache();
    void updateDCItemCacheSize(UINT32 dciId, UINT32 conditionId = 0);
    void cleanDCIData(DB_HANDLE hdb);
-   void queueItemsForPolling(Queue *pollerQueue);
+   void queueItemsForPolling();
        bool processNewDCValue(DCObject *dco, time_t currTime, const void *value);
        void scheduleItemDataCleanup(UINT32 dciId);
    void scheduleTableDataCleanup(UINT32 dciId);