- ObjectsGlobalLock() and ObjectsGlobalUnlock() removed
author    Victor Kirhenshtein <victor@netxms.org>
          Sun, 23 Jan 2005 09:50:15 +0000 (09:50 +0000)
committer Victor Kirhenshtein <victor@netxms.org>
          Sun, 23 Jan 2005 09:50:15 +0000 (09:50 +0000)
- Added multiple status and configuration poller threads
- Fixed deadlock sometimes caused by CMD_GET_DCI_DATA message handler
- Internal locking mechanisms reviewed

13 files changed:
ChangeLog
TODO
src/server/core/datacoll.cpp
src/server/core/discovery.cpp
src/server/core/hk.cpp
src/server/core/main.cpp
src/server/core/node.cpp
src/server/core/objects.cpp
src/server/core/session.cpp
src/server/core/status.cpp
src/server/core/syncer.cpp
src/server/include/nms_dcoll.h
src/server/include/nms_objects.h

index dd856ef..6243685 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -4,6 +4,9 @@
 
 - Added new object class - NetworkService, for simplified network service
   health checking
+- Server internal synchronization mechanisms improved to increase
+  stability and performance
+- Fixed deadlock sometimes caused by retrieving DCI collected data
 
 
 *
diff --git a/TODO b/TODO
index 584249e..010f6fc 100644 (file)
--- a/TODO
+++ b/TODO
@@ -36,8 +36,6 @@ SERVER:
 - Remove system-dependent code for getting interface list and ARP cache
   from management server. Server should rely on SNMP or native agent for
   this functionality.
-- Chech if ObjectGlobalLock()/ObjectGlobalUnlock() really needed or we can
-  use just index locking
 - Check what happens if DCI copied while being polled
 - On startup, server must read last poll time from database for each DCI
 - Optimize SNMP subsystem
index 1a15b11..3ce8524 100644 (file)
 #include "nxcore.h"
 
 
+//
+// Constants
+//
+
+#define ITEM_POLLING_INTERVAL    2
+
+
+//
+// Externals
+//
+
+extern Queue g_statusPollQueue;
+extern Queue g_configPollQueue;
+
+
 //
 // Global data
 //
 
 double g_dAvgPollerQueueSize = 0;
 double g_dAvgDBWriterQueueSize = 0;
+double g_dAvgStatusPollerQueueSize = 0;
+double g_dAvgConfigPollerQueueSize = 0;
+DWORD g_dwAvgDCIQueuingTime = 0;
 
 
 //
@@ -91,6 +109,9 @@ static THREAD_RESULT THREAD_CALL DataCollector(void *pArg)
          // Update item's last poll time and clear busy flag so item can be polled again
          pItem->SetLastPollTime(currTime);
          pItem->SetBusyFlag(FALSE);
+
+         // Decrement node's usage counter
+         pNode->DecRefCount();
       }
       else     /* pNode == NULL */
       {
@@ -111,14 +132,16 @@ static THREAD_RESULT THREAD_CALL DataCollector(void *pArg)
 
 static THREAD_RESULT THREAD_CALL ItemPoller(void *pArg)
 {
-   DWORD i, dwElapsed, dwWatchdogId;
+   DWORD i, dwSum, dwWatchdogId, dwCurrPos = 0;
+   DWORD dwTimingHistory[60 / ITEM_POLLING_INTERVAL];
    INT64 qwStart;
 
    dwWatchdogId = WatchdogAddThread("Item Poller", 20);
+   memset(dwTimingHistory, 0, sizeof(DWORD) * (60 / ITEM_POLLING_INTERVAL));
 
    while(!ShutdownInProgress())
    {
-      if (SleepAndCheckForShutdown(2))
+      if (SleepAndCheckForShutdown(ITEM_POLLING_INTERVAL))
          break;      // Shutdown has arrived
       WatchdogNotify(dwWatchdogId);
 
@@ -128,7 +151,16 @@ static THREAD_RESULT THREAD_CALL ItemPoller(void *pArg)
          ((Node *)g_pNodeIndexByAddr[i].pObject)->QueueItemsForPolling(m_pItemQueue);
       RWLockUnlock(g_rwlockNodeIndex);
 
-      dwElapsed = (DWORD)(GetCurrentTimeMs() - qwStart);
+      // Record time (ms) spent queuing DCIs in this pass into the ring buffer
+      dwTimingHistory[dwCurrPos] = (DWORD)(GetCurrentTimeMs() - qwStart);
+      dwCurrPos++;
+      if (dwCurrPos == (60 / ITEM_POLLING_INTERVAL))
+         dwCurrPos = 0;
+
+      // Average over last minute; history starts zeroed, so first minute reads low
+      for(i = 0, dwSum = 0; i < (60 / ITEM_POLLING_INTERVAL); i++)
+         dwSum += dwTimingHistory[i];
+      g_dwAvgDCIQueuingTime = dwSum / (60 / ITEM_POLLING_INTERVAL);
    }
    DbgPrintf(AF_DEBUG_DC, "Item poller thread terminated");
    return THREAD_OK;
@@ -141,13 +173,19 @@ static THREAD_RESULT THREAD_CALL ItemPoller(void *pArg)
 
 static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
 {
-   DWORD i, dwPollerQS[12], dwDBWriterQS[12], dwCurrPos = 0;
-   double dSum1, dSum2;
+   DWORD i, dwCurrPos = 0;
+   DWORD dwPollerQS[12], dwDBWriterQS[12];
+   DWORD dwStatusPollerQS[12], dwConfigPollerQS[12];
+   double dSum1, dSum2, dSum3, dSum4;
 
    memset(dwPollerQS, 0, sizeof(DWORD) * 12);
    memset(dwDBWriterQS, 0, sizeof(DWORD) * 12);
+   memset(dwStatusPollerQS, 0, sizeof(DWORD) * 12);
+   memset(dwConfigPollerQS, 0, sizeof(DWORD) * 12);
    g_dAvgPollerQueueSize = 0;
    g_dAvgDBWriterQueueSize = 0;
+   g_dAvgStatusPollerQueueSize = 0;
+   g_dAvgConfigPollerQueueSize = 0;
    while(!ShutdownInProgress())
    {
       if (SleepAndCheckForShutdown(5))
@@ -156,18 +194,24 @@ static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
       // Get current values
       dwPollerQS[dwCurrPos] = m_pItemQueue->Size();
       dwDBWriterQS[dwCurrPos] = g_pLazyRequestQueue->Size();
+      dwStatusPollerQS[dwCurrPos] = g_statusPollQueue.Size();
+      dwConfigPollerQS[dwCurrPos] = g_configPollQueue.Size();
       dwCurrPos++;
       if (dwCurrPos == 12)
          dwCurrPos = 0;
 
       // Calculate new averages
-      for(i = 0, dSum1 = 0, dSum2 = 0; i < 12; i++)
+      for(i = 0, dSum1 = 0, dSum2 = 0, dSum3 = 0, dSum4 = 0; i < 12; i++)
       {
          dSum1 += dwPollerQS[i];
          dSum2 += dwDBWriterQS[i];
+         dSum3 += dwStatusPollerQS[i];
+         dSum4 += dwConfigPollerQS[i];
       }
       g_dAvgPollerQueueSize = dSum1 / 12;
       g_dAvgDBWriterQueueSize = dSum2 / 12;
+      g_dAvgStatusPollerQueueSize = dSum3 / 12;
+      g_dAvgConfigPollerQueueSize = dSum4 / 12;
    }
    return THREAD_OK;
 }
index 79ec280..eb168f3 100644 (file)
@@ -64,6 +64,18 @@ void CheckForMgmtNode(void)
                                          "Server.AverageDBWriterQueueSize", 
                                          DS_INTERNAL, DCI_DT_FLOAT, 60, 30, pNode,
                                          "Average length of database writer's request queue for last minute"));
+               pNode->AddItem(new DCItem(CreateUniqueId(IDG_ITEM), 
+                                         "Server.AverageDCIQueuingTime", 
+                                         DS_INTERNAL, DCI_DT_UINT, 60, 30, pNode,
+                                         "Average time to queue DCI for polling for last minute"));
+               pNode->AddItem(new DCItem(CreateUniqueId(IDG_ITEM), 
+                                         "Server.AverageStatusPollerQueueSize", 
+                                         DS_INTERNAL, DCI_DT_FLOAT, 60, 30, pNode,
+                                         "Average length of status poller queue for last minute"));
+               pNode->AddItem(new DCItem(CreateUniqueId(IDG_ITEM), 
+                                         "Server.AverageConfigurationPollerQueueSize", 
+                                         DS_INTERNAL, DCI_DT_FLOAT, 60, 30, pNode,
+                                         "Average length of configuration poller queue for last minute"));
                break;
             }
       }
index 9cf1b1f..0099970 100644 (file)
@@ -53,7 +53,7 @@ static void CleanDeletedObjects(void)
             {
                // No records with that source ID, so we can purge this object
                sprintf(szQuery, "DELETE FROM deleted_objects WHERE object_id=%ld", dwObjectId);
-               DBQuery(g_hCoreDB, szQuery);
+               QueueSQLRequest(szQuery);
                DbgPrintf(AF_DEBUG_HOUSEKEEPER, "*HK* Deleted object with id %ld was purged", dwObjectId);
             }
             DBFreeAsyncResult(hAsyncResult);
@@ -72,17 +72,20 @@ static void DeleteEmptySubnets(void)
 {
    DWORD i;
 
-   ObjectsGlobalLock();
+   RWLockReadLock(g_rwlockIdIndex, INFINITE);
 
    // Walk through subnets and delete empty ones
-   for(i = 0; i < g_dwSubnetAddrIndexSize; i++)
-      if (g_pSubnetIndexByAddr[i].pObject->IsEmpty())
+   for(i = 0; i < g_dwIdIndexSize; i++)
+      if (g_pIndexById[i].pObject->Type() == OBJECT_SUBNET)
       {
-         PostEvent(EVENT_SUBNET_DELETED, g_pSubnetIndexByAddr[i].pObject->Id(), NULL);
-         g_pSubnetIndexByAddr[i].pObject->Delete();
+         if (g_pIndexById[i].pObject->IsEmpty() && !g_pIndexById[i].pObject->IsDeleted())
+         {
+            PostEvent(EVENT_SUBNET_DELETED, g_pIndexById[i].pObject->Id(), NULL);
+            g_pIndexById[i].pObject->Delete();
+         }
       }
 
-   ObjectsGlobalUnlock();
+   RWLockUnlock(g_rwlockIdIndex);
 
 
index cfd4ce6..15d2f48 100644 (file)
@@ -45,8 +45,7 @@ THREAD_RESULT THREAD_CALL HouseKeeper(void *pArg);
 THREAD_RESULT THREAD_CALL DiscoveryThread(void *pArg);
 THREAD_RESULT THREAD_CALL Syncer(void *pArg);
 THREAD_RESULT THREAD_CALL NodePoller(void *pArg);
-THREAD_RESULT THREAD_CALL StatusPoller(void *pArg);
-THREAD_RESULT THREAD_CALL ConfigurationPoller(void *pArg);
+THREAD_RESULT THREAD_CALL NodePollManager(void *pArg);
 THREAD_RESULT THREAD_CALL EventProcessor(void *pArg);
 THREAD_RESULT THREAD_CALL WatchdogThread(void *pArg);
 THREAD_RESULT THREAD_CALL ClientListener(void *pArg);
@@ -295,8 +294,7 @@ BOOL NXCORE_EXPORTABLE Initialize(void)
    ThreadCreate(HouseKeeper, 0, NULL);
    ThreadCreate(Syncer, 0, NULL);
    ThreadCreate(NodePoller, 0, NULL);
-   ThreadCreate(StatusPoller, 0, NULL);
-   ThreadCreate(ConfigurationPoller, 0, NULL);
+   ThreadCreate(NodePollManager, 0, NULL);
    ThreadCreate(ClientListener, 0, NULL);
 
    // Start network discovery thread if required
@@ -477,7 +475,6 @@ static BOOL ProcessCommand(char *pszCmdLine)
       DbgTestRWLock(g_rwlockNodeIndex, "g_hMutexNodeIndex");
       DbgTestRWLock(g_rwlockSubnetIndex, "g_hMutexSubnetIndex");
       DbgTestRWLock(g_rwlockInterfaceIndex, "g_hMutexInterfaceIndex");
-      DbgTestMutex(g_hMutexObjectAccess, "g_hMutexObjectAccess");
       printf("\n");
    }
    else if (IsCommand("WD", szBuffer, 2))
index 8a31b59..ae79cc0 100644 (file)
@@ -32,6 +32,7 @@ Node::Node()
 {
    m_dwFlags = 0;
    m_dwDiscoveryFlags = 0;
+   m_dwDynamicFlags = 0;
    m_dwNodeType = NODE_TYPE_GENERIC;
    m_wAgentPort = AGENT_LISTEN_PORT;
    m_wAuthMethod = AUTH_NONE;
@@ -62,6 +63,7 @@ Node::Node(DWORD dwAddr, DWORD dwFlags, DWORD dwDiscoveryFlags)
 {
    m_dwIpAddr = dwAddr;
    m_dwFlags = dwFlags;
+   m_dwDynamicFlags = 0;
    m_dwNodeType = NODE_TYPE_GENERIC;
    m_dwDiscoveryFlags = dwDiscoveryFlags;
    m_wAgentPort = AGENT_LISTEN_PORT;
@@ -671,6 +673,7 @@ void Node::StatusPoll(ClientSession *pSession, DWORD dwRqId)
    SendPollerMsg(dwRqId, "Finished status poll for node %s\r\n"
                          "Node status after poll is %s\r\n", m_szName, g_pszStatusName[m_iStatus]);
    m_pPollRequestor = NULL;
+   m_dwDynamicFlags &= ~NDF_QUEUED_FOR_STATUS_POLL;
    PollerUnlock();
 }
 
@@ -779,7 +782,7 @@ void Node::ConfigurationPoll(ClientSession *pSession, DWORD dwRqId)
                PostEvent(EVENT_INTERFACE_DELETED, m_dwId, "dsaa", pInterface->IfIndex(),
                          pInterface->Name(), pInterface->IpAddr(), pInterface->IpNetMask());
                DeleteInterface(pInterface);
-               i = 0;   // Restart loop
+               i = 0xFFFFFFFF;   // Restart loop
                bHasChanges = TRUE;
             }
          }
@@ -838,6 +841,7 @@ void Node::ConfigurationPoll(ClientSession *pSession, DWORD dwRqId)
                             "Finished configuration poll for node %s\r\n"
                             "Node configuration was%schanged after poll\r\n"),
                  m_szName, bHasChanges ? _T(" ") : _T(" not "));
+   m_dwDynamicFlags &= ~NDF_QUEUED_FOR_CONFIG_POLL;
    PollerUnlock();
    DbgPrintf(AF_DEBUG_DISCOVERY, "Finished configuration poll for node %s (ID: %d)", m_szName, m_dwId);
 
@@ -951,6 +955,18 @@ DWORD Node::GetInternalItem(const char *szParam, DWORD dwBufSize, char *szBuffer
       {
          sprintf(szBuffer, "%f", g_dAvgDBWriterQueueSize);
       }
+      else if (!stricmp(szParam, "Server.AverageStatusPollerQueueSize"))
+      {
+         sprintf(szBuffer, "%f", g_dAvgStatusPollerQueueSize);
+      }
+      else if (!stricmp(szParam, "Server.AverageConfigurationPollerQueueSize"))
+      {
+         sprintf(szBuffer, "%f", g_dAvgConfigPollerQueueSize);
+      }
+      else if (!stricmp(szParam, "Server.AverageDCIQueuingTime"))
+      {
+         sprintf(szBuffer, "%lu", (unsigned long)g_dwAvgDCIQueuingTime);
+      }
       else
       {
          dwError = DCE_NOT_SUPPORTED;
@@ -1031,6 +1047,7 @@ void Node::QueueItemsForPolling(Queue *pPollerQueue)
       if (m_ppItems[i]->ReadyForPolling(currTime))
       {
          m_ppItems[i]->SetBusyFlag(TRUE);
+         m_dwRefCount++;   // Increment reference count for each queued DCI
          pPollerQueue->Put(m_ppItems[i]);
       }
    }
index d5924f2..321346d 100644 (file)
@@ -41,7 +41,6 @@ DWORD g_dwNodeAddrIndexSize = 0;
 INDEX *g_pInterfaceIndexByAddr = NULL;
 DWORD g_dwInterfaceAddrIndexSize = 0;
 
-MUTEX g_hMutexObjectAccess;
 RWLOCK g_rwlockIdIndex;
 RWLOCK g_rwlockNodeIndex;
 RWLOCK g_rwlockSubnetIndex;
@@ -54,7 +53,7 @@ char *g_pszStatusName[] = { "Normal", "Warning", "Minor", "Major", "Critical",
                             "Unknown", "Unmanaged", "Disabled", "Testing" };
 char *g_szClassName[]={ "Generic", "Subnet", "Node", "Interface",
                         "Network", "Container", "Zone", "ServiceRoot",
-                        "Template", "TemplateGroup", "TemplateRoot" };
+                        "Template", "TemplateGroup", "TemplateRoot", "NetworkService" };
 
 
 //
@@ -63,7 +62,6 @@ char *g_szClassName[]={ "Generic", "Subnet", "Node", "Interface",
 
 void ObjectsInit(void)
 {
-   g_hMutexObjectAccess = MutexCreate();
    g_rwlockIdIndex = RWLockCreate();
    g_rwlockNodeIndex = RWLockCreate();
    g_rwlockSubnetIndex = RWLockCreate();
@@ -83,26 +81,6 @@ void ObjectsInit(void)
 }
 
 
-//
-// Lock write access to all objects
-//
-
-void ObjectsGlobalLock(void)
-{
-   MutexLock(g_hMutexObjectAccess, INFINITE);
-}
-
-
-//
-// Unlock write access to all objects
-//
-
-void ObjectsGlobalUnlock(void)
-{
-   MutexUnlock(g_hMutexObjectAccess);
-}
-
-
 //
 // Function to compare two indexes
 //
@@ -186,7 +164,6 @@ static void DeleteObjectFromIndex(INDEX **ppIndex, DWORD *pdwIndexSize, DWORD dw
 
 void NetObjInsert(NetObj *pObject, BOOL bNewObject)
 {
-   ObjectsGlobalLock();
    if (bNewObject)   
    {
       // Assign unique ID to new object
@@ -239,7 +216,6 @@ void NetObjInsert(NetObj *pObject, BOOL bNewObject)
             break;
       }
    }
-   ObjectsGlobalUnlock();
 }
 
 
@@ -287,7 +263,7 @@ void NetObjDeleteFromIndexes(NetObj *pObject)
 // Get IP netmask for object of any class
 //
 
-DWORD GetObjectNetmask(NetObj *pObject)
+static DWORD GetObjectNetmask(NetObj *pObject)
 {
    switch(pObject->Type())
    {
@@ -301,33 +277,6 @@ DWORD GetObjectNetmask(NetObj *pObject)
 }
 
 
-//
-// Delete object (final step)
-// This function should be called only by syncer thread when access to objects are locked.
-// Object will be removed from index by ID and destroyed.
-//
-
-void NetObjDelete(NetObj *pObject)
-{
-   char szQuery[256], szIpAddr[16], szNetMask[16];
-
-   // Write object to deleted objects table
-   _sntprintf(szQuery, 256, _T("INSERT INTO deleted_objects (object_id,object_class,name,"
-                               "ip_addr,ip_netmask) VALUES (%ld,%ld,'%s','%s','%s')"),
-              pObject->Id(), pObject->Type(), pObject->Name(), 
-              IpToStr(pObject->IpAddr(), szIpAddr),
-              IpToStr(GetObjectNetmask(pObject), szNetMask));
-   DBQuery(g_hCoreDB, szQuery);
-
-   // Delete object from index by ID
-   RWLockWriteLock(g_rwlockIdIndex, INFINITE);
-   DeleteObjectFromIndex(&g_pIndexById, &g_dwIdIndexSize, pObject->Id());
-   RWLockUnlock(g_rwlockIdIndex);
-            
-   delete pObject;
-}
-
-
 //
 // Find node by IP address
 //
@@ -643,15 +592,11 @@ void DeleteUserFromAllObjects(DWORD dwUserId)
 {
    DWORD i;
 
-   ObjectsGlobalLock();
-
    // Walk through all objects
    RWLockReadLock(g_rwlockIdIndex, INFINITE);
    for(i = 0; i < g_dwIdIndexSize; i++)
       g_pIndexById[i].pObject->DropUserAccess(dwUserId);
    RWLockUnlock(g_rwlockIdIndex);
-
-   ObjectsGlobalUnlock();
 }
 
 
@@ -739,3 +684,28 @@ BOOL IsValidParentClass(int iChildClass, int iParentClass)
    }
    return FALSE;
 }
+
+
+//
+// Delete object (final step)
+// This function should be called ONLY from the syncer thread.
+// Caller must hold a write lock on the object index by ID (g_rwlockIdIndex).
+// Object is logged to deleted_objects, removed from index by ID and destroyed.
+//
+
+void NetObjDelete(NetObj *pObject)
+{
+   char szQuery[256], szIpAddr[16], szNetMask[16];
+
+   // Write object to deleted objects table (NOTE: name is not SQL-escaped)
+   _sntprintf(szQuery, 256, _T("INSERT INTO deleted_objects (object_id,object_class,name,"
+                               "ip_addr,ip_netmask) VALUES (%ld,%ld,'%s','%s','%s')"),
+              pObject->Id(), pObject->Type(), pObject->Name(), 
+              IpToStr(pObject->IpAddr(), szIpAddr),
+              IpToStr(GetObjectNetmask(pObject), szNetMask));
+   DBQuery(g_hCoreDB, szQuery);
+
+   // Delete object from index by ID and object itself
+   DeleteObjectFromIndex(&g_pIndexById, &g_dwIdIndexSize, pObject->Id());
+   delete pObject;
+}
index 9a09ea8..fa2203a 100644 (file)
@@ -1167,7 +1167,7 @@ void ClientSession::SendAllObjects(DWORD dwRqId)
    msg.SetCode(CMD_OBJECT);
 
    // Send objects, one per message
-   ObjectsGlobalLock();
+   RWLockReadLock(g_rwlockIdIndex, INFINITE);
    for(i = 0; i < g_dwIdIndexSize; i++)
       if (g_pIndexById[i].pObject->CheckAccessRights(m_dwUserId, OBJECT_ACCESS_READ))
       {
@@ -1175,7 +1175,7 @@ void ClientSession::SendAllObjects(DWORD dwRqId)
          SendMessage(&msg);
          msg.DeleteAllVariables();
       }
-   ObjectsGlobalUnlock();
+   RWLockUnlock(g_rwlockIdIndex);
 
    // Send end of list notification
    msg.SetCode(CMD_OBJECT_LIST_END);
@@ -2219,14 +2219,14 @@ void ClientSession::GetCollectedData(CSCPMessage *pRequest)
             sprintf(&szCond[iPos], " AND idata_timestamp<=%d", dwTimeTo);
          }
 
+         // Get item's data type to determine actual row size
+         iType = ((Node *)pObject)->GetItemType(dwItemId);
+
          sprintf(szQuery, "SELECT idata_timestamp,idata_value FROM idata_%d WHERE item_id=%d%s ORDER BY idata_timestamp DESC",
                  dwObjectId, dwItemId, szCond);
          hResult = DBAsyncSelect(g_hCoreDB, szQuery);
          if (hResult != NULL)
          {
-            // Get item's data type to determine actual row size
-            iType = ((Node *)pObject)->GetItemType(dwItemId);
-
             // Allocate initial memory block and prepare data header
             pData = (DCI_DATA_HEADER *)malloc(dwAllocatedRows * m_dwRowSize[iType] + sizeof(DCI_DATA_HEADER));
             pData->dwDataType = htonl((DWORD)iType);
index 119de2d..1fbe133 100644 (file)
 #include "nxcore.h"
 
 
+//
+// Global data
+//
+
+Queue g_statusPollQueue;
+Queue g_configPollQueue;
+
+
 //
 // Status poll thread
 //
 
-THREAD_RESULT THREAD_CALL StatusPoller(void *arg)
+static THREAD_RESULT THREAD_CALL StatusPoller(void *arg)
 {
    Node *pNode;
-   DWORD dwWatchdogId;
-
-   dwWatchdogId = WatchdogAddThread("Status Poller", 60);
 
    while(!ShutdownInProgress())
    {
-      if (SleepAndCheckForShutdown(5))
-         break;      // Shutdown has arrived
-      WatchdogNotify(dwWatchdogId);
+      pNode = (Node *)g_statusPollQueue.GetOrBlock();
+      if (pNode == INVALID_POINTER_VALUE)
+         break;   // Shutdown indicator
 
-      // Walk through nodes and do status poll
-      for(DWORD i = 0; i < g_dwNodeAddrIndexSize; i++)
-      {
-         pNode = (Node *)g_pNodeIndexByAddr[i].pObject;
-         if (pNode->ReadyForStatusPoll())
-         {
-            pNode->StatusPoll(NULL, 0);
-            WatchdogNotify(dwWatchdogId);
-         }
-      }
+      pNode->StatusPoll(NULL, 0);
+      pNode->DecRefCount();
    }
    return THREAD_OK;
 }
@@ -59,29 +56,80 @@ THREAD_RESULT THREAD_CALL StatusPoller(void *arg)
 // Configuration poll thread
 //
 
-THREAD_RESULT THREAD_CALL ConfigurationPoller(void *arg)
+static THREAD_RESULT THREAD_CALL ConfigurationPoller(void *arg)
+{
+   Node *pNode;
+
+   while(!ShutdownInProgress())
+   {
+      pNode = (Node *)g_configPollQueue.GetOrBlock();
+      if (pNode == INVALID_POINTER_VALUE)
+         break;   // Shutdown indicator
+
+      pNode->ConfigurationPoll(NULL, 0);
+      pNode->DecRefCount();
+   }
+   return THREAD_OK;
+}
+
+
+//
+// Node queuing thread
+//
+
+THREAD_RESULT THREAD_CALL NodePollManager(void *pArg)
 {
    Node *pNode;
    DWORD dwWatchdogId;
+   int i, iNumStatusPollers, iNumConfigPollers;
+
+   // Start status pollers
+   iNumStatusPollers = ConfigReadInt("NumberOfStatusPollers", 10);
+   for(i = 0; i < iNumStatusPollers; i++)
+      ThreadCreate(StatusPoller, 0, NULL);
+
+   // Start configuration pollers
+   iNumConfigPollers = ConfigReadInt("NumberOfConfigurationPollers", 4);
+   for(i = 0; i < iNumConfigPollers; i++)
+      ThreadCreate(ConfigurationPoller, 0, NULL);
 
-   dwWatchdogId = WatchdogAddThread("Configuration Poller", 120);
+   dwWatchdogId = WatchdogAddThread("Node Poll Manager", 60);
 
    while(!ShutdownInProgress())
    {
-      if (SleepAndCheckForShutdown(30))
+      if (SleepAndCheckForShutdown(5))
          break;      // Shutdown has arrived
       WatchdogNotify(dwWatchdogId);
 
-      // Walk through nodes and do configuration poll
+      // Walk through nodes and queue them for status
+      // and/or configuration poll (flag must match queue)
+      RWLockReadLock(g_rwlockNodeIndex, INFINITE);
       for(DWORD i = 0; i < g_dwNodeAddrIndexSize; i++)
       {
          pNode = (Node *)g_pNodeIndexByAddr[i].pObject;
          if (pNode->ReadyForConfigurationPoll())
          {
-            pNode->ConfigurationPoll(NULL, 0);
-            WatchdogNotify(dwWatchdogId);
+            pNode->IncRefCount();
+            pNode->LockForConfigurationPoll();
+            g_configPollQueue.Put(pNode);
+         }
+         if (pNode->ReadyForStatusPoll())
+         {
+            pNode->IncRefCount();
+            pNode->LockForStatusPoll();
+            g_statusPollQueue.Put(pNode);
          }
       }
+      RWLockUnlock(g_rwlockNodeIndex);
    }
+
+   // Send stop signal to all pollers
+   g_statusPollQueue.Clear();
+   g_configPollQueue.Clear();
+   for(i = 0; i < iNumStatusPollers; i++)
+      g_statusPollQueue.Put(INVALID_POINTER_VALUE);
+   for(i = 0; i < iNumConfigPollers; i++)
+      g_configPollQueue.Put(INVALID_POINTER_VALUE);
+
    return THREAD_OK;
 }
index 46159fc..5f1a355 100644 (file)
 #include "nxcore.h"
 
 
+//
+// Externals
+//
+
+void NetObjDelete(NetObj *pObject);
+
+
 //
 // Save objects to database
 //
@@ -31,9 +38,9 @@ void SaveObjects(void)
 {
    DWORD i;
 
-   ObjectsGlobalLock();
-
    // Delete objects marked for deletion
+   RWLockWriteLock(g_rwlockIdIndex, INFINITE);
+delete_loop_start:;
    for(i = 0; i < g_dwIdIndexSize; i++)
       if (g_pIndexById[i].pObject->IsDeleted())
          if (g_pIndexById[i].pObject->RefCount() == 0)
@@ -42,20 +49,21 @@ void SaveObjects(void)
                       g_pIndexById[i].pObject->Id(), g_pIndexById[i].pObject->Name());
             g_pIndexById[i].pObject->DeleteFromDB();
             NetObjDelete(g_pIndexById[i].pObject);
-            i = 0xFFFFFFFF;   // Restart loop
+            goto delete_loop_start;   // Restart loop
          }
          else
          {
             DbgPrintf(AF_DEBUG_HOUSEKEEPER, "* Syncer * Unable to delete object with id %d because it is being referenced %d time(s)",
                       g_pIndexById[i].pObject->Id(), g_pIndexById[i].pObject->RefCount());
          }
+   RWLockUnlock(g_rwlockIdIndex);
 
    // Save objects
+   RWLockReadLock(g_rwlockIdIndex, INFINITE);
    for(i = 0; i < g_dwIdIndexSize; i++)
       if (g_pIndexById[i].pObject->IsModified())
          g_pIndexById[i].pObject->SaveToDB();
-
-   ObjectsGlobalUnlock();
+   RWLockUnlock(g_rwlockIdIndex);
 }
 
 
index e9d618f..23b479c 100644 (file)
@@ -218,6 +218,9 @@ void DeleteAllItemsForNode(DWORD dwNodeId);
 
 extern double g_dAvgPollerQueueSize;
 extern double g_dAvgDBWriterQueueSize;
+extern double g_dAvgStatusPollerQueueSize;
+extern double g_dAvgConfigPollerQueueSize;
+extern DWORD g_dwAvgDCIQueuingTime;
 
 
 #endif   /* _nms_dcoll_h_ */
index 4d4d012..47c5395 100644 (file)
@@ -62,6 +62,14 @@ extern DWORD g_dwConfigurationPollingInterval;
 #define DF_DEFAULT            (DF_CHECK_INTERFACES | DF_CHECK_ARP)
 
 
+//
+// Node runtime (dynamic) flags
+//
+
+#define NDF_QUEUED_FOR_STATUS_POLL     0x0001
+#define NDF_QUEUED_FOR_CONFIG_POLL     0x0002
+
+
 //
 // Status poll types
 //
@@ -125,8 +133,8 @@ public:
    BOOL IsEmpty(void) { return m_dwChildCount == 0 ? TRUE : FALSE; }
 
    DWORD RefCount(void) { return m_dwRefCount; }
-   void IncRefCount(void) { m_dwRefCount++; }
-   void DecRefCount(void) { if (m_dwRefCount > 0) m_dwRefCount--; }
+   void IncRefCount(void) { Lock(); m_dwRefCount++; Unlock(); }
+   void DecRefCount(void) { Lock(); if (m_dwRefCount > 0) m_dwRefCount--; Unlock(); }
 
    BOOL IsChild(DWORD dwObjectId);
 
@@ -296,6 +304,7 @@ class NXCORE_EXPORTABLE Node : public Template
 protected:
    DWORD m_dwFlags;
    DWORD m_dwDiscoveryFlags;
+   DWORD m_dwDynamicFlags;       // Flags used at runtime by server
    WORD m_wAgentPort;
    WORD m_wAuthMethod;
    DWORD m_dwNodeType;
@@ -314,7 +323,7 @@ protected:
    MUTEX m_hPollerMutex;
    MUTEX m_hAgentAccessMutex;
    AgentConnection *m_pAgentConnection;
-   Node *m_pPollNode;      // Node used for network service polling
+   Node *m_pPollerNode;      // Node used for network service polling
 
    void PollerLock(void) { MutexLock(m_hPollerMutex, INFINITE); }
    void PollerUnlock(void) { MutexUnlock(m_hPollerMutex); }
@@ -335,6 +344,7 @@ public:
 
    DWORD Flags(void) { return m_dwFlags; }
    DWORD DiscoveryFlags(void) { return m_dwDiscoveryFlags; }
+   DWORD RuntimeFlags(void) { return m_dwDynamicFlags; }
 
    BOOL IsSNMPSupported(void) { return m_dwFlags & NF_IS_SNMP ? TRUE : FALSE; }
    BOOL IsNativeAgent(void) { return m_dwFlags & NF_IS_NATIVE_AGENT ? TRUE : FALSE; }
@@ -363,6 +373,8 @@ public:
    BOOL ReadyForStatusPoll(void);
    BOOL ReadyForConfigurationPoll(void);
    BOOL ReadyForDiscoveryPoll(void);
+   void LockForStatusPoll(void);
+   void LockForConfigurationPoll(void);
 
    virtual void CalculateCompoundStatus(void);
 
@@ -379,7 +391,7 @@ public:
    DWORD WakeUp(void);
 
    void AddService(NetworkService *pNetSrv) { AddChild(pNetSrv); pNetSrv->AddParent(this); }
-   Node *GetPollNode(void) { return m_pPollNode; }
+   Node *GetPollerNode(void) { return m_pPollerNode; }
 };
 
 
@@ -390,6 +402,7 @@ public:
 inline BOOL Node::ReadyForStatusPoll(void) 
 { 
    return ((m_iStatus != STATUS_UNMANAGED) && 
+           (!(m_dwDynamicFlags & NDF_QUEUED_FOR_STATUS_POLL)) &&
            ((DWORD)time(NULL) - (DWORD)m_tLastStatusPoll > g_dwStatusPollingInterval))
                ? TRUE : FALSE;
 }
@@ -397,6 +410,7 @@ inline BOOL Node::ReadyForStatusPoll(void)
 inline BOOL Node::ReadyForConfigurationPoll(void) 
 { 
    return ((m_iStatus != STATUS_UNMANAGED) &&
+           (!(m_dwDynamicFlags & NDF_QUEUED_FOR_CONFIG_POLL)) &&
            ((DWORD)time(NULL) - (DWORD)m_tLastConfigurationPoll > g_dwConfigurationPollingInterval))
                ? TRUE : FALSE;
 }
@@ -408,6 +422,20 @@ inline BOOL Node::ReadyForDiscoveryPoll(void)
                ? TRUE : FALSE; 
 }
 
+inline void Node::LockForStatusPoll(void)
+{ 
+   Lock(); 
+   m_dwDynamicFlags |= NDF_QUEUED_FOR_STATUS_POLL; 
+   Unlock(); 
+}
+
+inline void Node::LockForConfigurationPoll(void) 
+{ 
+   Lock(); 
+   m_dwDynamicFlags |= NDF_QUEUED_FOR_CONFIG_POLL; 
+   Unlock(); 
+}
+
 
 //
 // Subnet
@@ -583,8 +611,6 @@ struct CONTAINER_CATEGORY
 //
 
 void ObjectsInit(void);
-void ObjectsGlobalLock(void);
-void ObjectsGlobalUnlock(void);
 
 void NetObjInsert(NetObj *pObject, BOOL bNewObject);
 void NetObjDeleteFromIndexes(NetObj *pObject);
@@ -626,7 +652,6 @@ extern RWLOCK g_rwlockIdIndex;
 extern RWLOCK g_rwlockNodeIndex;
 extern RWLOCK g_rwlockSubnetIndex;
 extern RWLOCK g_rwlockInterfaceIndex;
-extern MUTEX g_hMutexObjectAccess;
 extern DWORD g_dwNumCategories;
 extern CONTAINER_CATEGORY *g_pContainerCatList;
 extern char *g_szClassName[];