implemented single housekeeping thread for all message wait queues
authorVictor Kirhenshtein <victor@netxms.org>
Mon, 15 Jun 2015 17:45:46 +0000 (20:45 +0300)
committerVictor Kirhenshtein <victor@netxms.org>
Mon, 15 Jun 2015 17:46:11 +0000 (20:46 +0300)
13 files changed:
include/nxcpapi.h
src/agent/core/nxagentd.cpp
src/client/libnxclient/main.cpp
src/client/nxalarm/nxalarm.cpp
src/client/nxevent/nxevent.cpp
src/client/nxpush/nxpush.cpp
src/client/nxsms/nxsms.cpp
src/libnetxms/msgwq.cpp
src/server/core/main.cpp
src/server/tools/nxaction/nxaction.cpp
src/server/tools/nxap/nxap.cpp
src/server/tools/nxget/nxget.cpp
src/server/tools/nxupload/nxupload.cpp

index 0d85646..5b7b04a 100644 (file)
@@ -169,15 +169,12 @@ private:
    pthread_mutex_t m_mutex;
    pthread_cond_t m_wakeupCondition;
 #endif
-   CONDITION m_stopCondition;
    UINT32 m_holdTime;
    int m_size;
    int m_allocated;
    WAIT_QUEUE_ELEMENT *m_elements;
    UINT64 m_sequence;
-   THREAD m_hHkThread;
 
-   void housekeeperThread();
    void *waitForMessageInternal(UINT16 isBinary, UINT16 code, UINT32 id, UINT32 timeout);
 
    void lock()
@@ -198,7 +195,15 @@ private:
 #endif
    }
 
-   static THREAD_RESULT THREAD_CALL mwqThreadStarter(void *);
+   void housekeeperRun();
+
+   static MUTEX m_housekeeperLock;
+   static HashMap<UINT64, MsgWaitQueue> *m_activeQueues;
+   static CONDITION m_shutdownCondition;
+   static THREAD m_housekeeperThread;
+   static EnumerationCallbackResult houseKeeperCallback(const void *key, const void *object, void *arg);
+   static THREAD_RESULT THREAD_CALL housekeeperThread(void *);
+   static EnumerationCallbackResult diagInfoCallback(const void *key, const void *object, void *arg);
 
 public:
    MsgWaitQueue();
@@ -217,6 +222,9 @@ public:
 
    void clear();
    void setHoldTime(UINT32 holdTime) { m_holdTime = holdTime; }
+
+   static void shutdown();
+   static String getDiagInfo();
 };
 
 /**
index 1d14a22..373bea5 100644 (file)
@@ -1075,6 +1075,8 @@ void Shutdown()
       ThreadJoin(s_snmpTrapSenderThread);
        }
 
+   MsgWaitQueue::shutdown();
+
    UnloadAllSubAgents();
    CloseLocalDatabase();
    nxlog_write(MSG_AGENT_STOPPED, EVENTLOG_INFORMATION_TYPE, NULL);
index 50a8114..97de1e5 100644 (file)
@@ -71,6 +71,7 @@ bool LIBNXCLIENT_EXPORTABLE NXCInitialize()
  */
 void LIBNXCLIENT_EXPORTABLE NXCShutdown()
 {
+   MsgWaitQueue::shutdown();
 }
 
 /**
index 0c9bf7c..3fa161a 100644 (file)
@@ -251,6 +251,7 @@ int main(int argc, char *argv[])
        {
                _tprintf(_T("Unable to connect to server: %s\n"), NXCGetErrorText(rcc));
       delete session;
+      NXCShutdown();
                return 2;
        }
 
@@ -342,5 +343,6 @@ int main(int argc, char *argv[])
        }
 
    delete session;
+   NXCShutdown();
        return (rcc == RCC_SUCCESS) ? 0 : 5;
 }
index 9e7dc8a..c7bfef0 100644 (file)
@@ -87,6 +87,7 @@ static DWORD SendEvent(int iNumArgs, char **pArgList, BOOL bEncrypt)
             _tprintf(_T("Unable to send event: %s\n"), NXCGetErrorText(dwResult));
       }
       delete session;
+      NXCShutdown();
    }
    return dwResult;
 }
index 5b5ef93..f6326c8 100644 (file)
@@ -249,6 +249,7 @@ static BOOL Startup()
          s_session->setCommandTimeout(5 * 1000);
                        ret = TRUE;
                }
+      NXCShutdown();
        }
 
        return ret;
index 7449f1a..ccf473d 100644 (file)
@@ -153,6 +153,7 @@ int main(int argc, char *argv[])
        {
                _tprintf(_T("Unable to connect to server: %s\n"), NXCGetErrorText(rcc));
       delete session;
+      NXCShutdown();
                return 2;
        }
 
@@ -173,5 +174,6 @@ int main(int argc, char *argv[])
        }
 
    delete session;
+   NXCShutdown();
        return (rcc == RCC_SUCCESS) ? 0 : 5;
 }
index 528f886..1c12be3 100644 (file)
 /** 
  * Interval between checking messages TTL in milliseconds
  */
-#define TTL_CHECK_INTERVAL    500
+#define TTL_CHECK_INTERVAL    30000
 
 /**
  * Buffer allocation step
  */
 #define ALLOCATION_STEP       16
 
+/**
+ * Housekeeper data
+ */
+MUTEX MsgWaitQueue::m_housekeeperLock = MutexCreate();
+HashMap<UINT64, MsgWaitQueue> *MsgWaitQueue::m_activeQueues = new HashMap<UINT64, MsgWaitQueue>(false);
+CONDITION MsgWaitQueue::m_shutdownCondition = ConditionCreate(TRUE);
+THREAD MsgWaitQueue::m_housekeeperThread = INVALID_THREAD_HANDLE;
+
 /**
  * Constructor
  */
@@ -43,7 +51,6 @@ MsgWaitQueue::MsgWaitQueue()
    m_allocated = 0;
    m_elements = NULL;
    m_sequence = 1;
-   m_stopCondition = ConditionCreate(FALSE);
 #ifdef _WIN32
    InitializeCriticalSectionAndSpinCount(&m_mutex, 4000);
    memset(m_wakeupEvents, 0, MAX_MSGQUEUE_WAITERS * sizeof(HANDLE));
@@ -53,7 +60,15 @@ MsgWaitQueue::MsgWaitQueue()
    pthread_mutex_init(&m_mutex, NULL);
    pthread_cond_init(&m_wakeupCondition, NULL);
 #endif
-   m_hHkThread = ThreadCreateEx(mwqThreadStarter, 0, this);
+
+   // register new queue
+   MutexLock(m_housekeeperLock);
+   m_activeQueues->set(CAST_FROM_POINTER(this, UINT64), this);
+   if (m_housekeeperThread == INVALID_THREAD_HANDLE)
+   {
+      m_housekeeperThread = ThreadCreateEx(MsgWaitQueue::housekeeperThread, 0, NULL);
+   }
+   MutexUnlock(m_housekeeperLock);
 }
 
 /**
@@ -61,15 +76,13 @@ MsgWaitQueue::MsgWaitQueue()
  */
 MsgWaitQueue::~MsgWaitQueue()
 {
-   ConditionSet(m_stopCondition);
-
-   // Wait for housekeeper thread to terminate
-   ThreadJoin(m_hHkThread);
+   // unregister queue
+   MutexLock(m_housekeeperLock);
+   m_activeQueues->remove(CAST_FROM_POINTER(this, UINT64));
+   MutexUnlock(m_housekeeperLock);
 
-   // Housekeeper thread stopped, proceed with object destruction
    clear();
    safe_free(m_elements);
-   ConditionDestroy(m_stopCondition);
 
 #ifdef _WIN32
    DeleteCriticalSection(&m_mutex);
@@ -299,56 +312,110 @@ void *MsgWaitQueue::waitForMessageInternal(UINT16 isBinary, UINT16 wCode, UINT32
 }
 
 /**
- * Housekeeping thread
+ * Housekeeping run
  */
-void MsgWaitQueue::housekeeperThread()
+void MsgWaitQueue::housekeeperRun()
 {
-   while(!ConditionWait(m_stopCondition, TTL_CHECK_INTERVAL))
+   lock();
+   if (m_size > 0)
    {
-      lock();
-      if (m_size > 0)
-      {
-         for(int i = 0; i < m_allocated; i++)
-                  {
-            if (m_elements[i].msg == NULL)
-               continue;
+      for(int i = 0; i < m_allocated; i++)
+          {
+         if (m_elements[i].msg == NULL)
+            continue;
 
-            if (m_elements[i].ttl <= TTL_CHECK_INTERVAL)
+         if (m_elements[i].ttl <= TTL_CHECK_INTERVAL)
+         {
+            if (m_elements[i].isBinary)
             {
-               if (m_elements[i].isBinary)
-               {
-                  safe_free(m_elements[i].msg);
-               }
-               else
-               {
-                  delete (NXCPMessage *)(m_elements[i].msg);
-               }
-               m_elements[i].msg = NULL;
-               m_size--;
+               safe_free(m_elements[i].msg);
             }
             else
             {
-               m_elements[i].ttl -= TTL_CHECK_INTERVAL;
+               delete (NXCPMessage *)(m_elements[i].msg);
             }
-                  }
-
-         // compact queue if possible
-         if ((m_allocated > ALLOCATION_STEP) && (m_size == 0))
+            m_elements[i].msg = NULL;
+            m_size--;
+         }
+         else
          {
-            m_allocated = ALLOCATION_STEP;
-            free(m_elements);
-            m_elements = (WAIT_QUEUE_ELEMENT *)calloc(m_allocated, sizeof(WAIT_QUEUE_ELEMENT));
+            m_elements[i].ttl -= TTL_CHECK_INTERVAL;
          }
+          }
+
+      // compact queue if possible
+      if ((m_allocated > ALLOCATION_STEP) && (m_size == 0))
+      {
+         m_allocated = ALLOCATION_STEP;
+         free(m_elements);
+         m_elements = (WAIT_QUEUE_ELEMENT *)calloc(m_allocated, sizeof(WAIT_QUEUE_ELEMENT));
       }
-      unlock();
    }
+   unlock();
+}
+
+/**
+ * Callback for enumerating active queues
+ */
+EnumerationCallbackResult MsgWaitQueue::houseKeeperCallback(const void *key, const void *object, void *arg)
+{
+   ((MsgWaitQueue *)object)->housekeeperRun();
+   return _CONTINUE;
 }
 
 /**
- * Housekeeper thread starter
+ * Housekeeper thread
  */
-THREAD_RESULT THREAD_CALL MsgWaitQueue::mwqThreadStarter(void *arg)
+THREAD_RESULT THREAD_CALL MsgWaitQueue::housekeeperThread(void *arg)
 {
-   ((MsgWaitQueue *)arg)->housekeeperThread();
+   while(!ConditionWait(m_shutdownCondition, TTL_CHECK_INTERVAL))
+   {
+      MutexLock(m_housekeeperLock);
+      m_activeQueues->forEach(MsgWaitQueue::houseKeeperCallback, NULL);
+      MutexUnlock(m_housekeeperLock);
+   }
    return THREAD_OK;
 }
+
+/**
+ * Shutdown message wait queue background tasks
+ */
+void MsgWaitQueue::shutdown()
+{
+   ConditionSet(m_shutdownCondition);
+   ThreadJoin(m_housekeeperThread);
+   MutexLock(m_housekeeperLock);
+   m_housekeeperThread = INVALID_THREAD_HANDLE;
+   MutexUnlock(m_housekeeperLock);
+}
+
+/**
+ * Diag info callback
+ */
+EnumerationCallbackResult MsgWaitQueue::diagInfoCallback(const void *key, const void *object, void *arg)
+{
+   MsgWaitQueue *q = (MsgWaitQueue *)object;
+   TCHAR buffer[256];
+   _sntprintf(buffer, 256, _T("   %p size=%d holdTime=%d\n"), q, q->m_size, q->m_holdTime);
+   ((String *)arg)->append(buffer);
+   return _CONTINUE;
+}
+
+/**
+ * Get diagnostic info
+ */
+String MsgWaitQueue::getDiagInfo()
+{
+   String out;
+   MutexLock(m_housekeeperLock);
+   out.append(m_activeQueues->size());
+   out.append(_T(" active queues\nHousekeeper thread state is "));
+   out.append((m_housekeeperThread != INVALID_THREAD_HANDLE) ? _T("RUNNING\n") : _T("STOPPED\n"));
+   if (m_activeQueues->size() > 0)
+   {
+      out.append(_T("Active queues:\n"));
+      m_activeQueues->forEach(MsgWaitQueue::diagInfoCallback, &out);
+   }
+   MutexUnlock(m_housekeeperLock);
+   return out;
+}
index 52c589e..e5364c1 100644 (file)
@@ -1001,6 +1001,7 @@ void NXCORE_EXPORTABLE Shutdown()
        DbgPrintf(1, _T("Event processing stopped"));
 
    ThreadPoolDestroy(g_mainThreadPool);
+   MsgWaitQueue::shutdown();
 
        delete g_pScriptLibrary;
 
@@ -1541,6 +1542,12 @@ int ProcessConsoleCommand(const TCHAR *pszCmdLine, CONSOLE_CTX pCtx)
          }
          ConsolePrintf(pCtx, _T("%d modules loaded\n"), g_dwNumModules);
                }
+               else if (IsCommand(_T("MSGWQ"), szBuffer, 2))
+               {
+         String text = MsgWaitQueue::getDiagInfo();
+         ConsoleWrite(pCtx, text);
+         ConsoleWrite(pCtx, _T("\n"));
+      }
                else if (IsCommand(_T("OBJECTS"), szBuffer, 1))
                {
                        // Get filter
@@ -1940,6 +1947,7 @@ int ProcessConsoleCommand(const TCHAR *pszCmdLine, CONSOLE_CTX pCtx)
             _T("   show heap                 - Show heap information\n")
                                _T("   show index <index>        - Show internal index\n")
                                _T("   show modules              - Show loaded server modules\n")
+            _T("   show msgwq                - Show message wait queues information\n")
                                _T("   show objects [<filter>]   - Dump network objects to screen\n")
                                _T("   show pollers              - Show poller threads state information\n")
                                _T("   show queues               - Show internal queues statistics\n")
index aa6005c..944eca7 100644 (file)
@@ -296,5 +296,6 @@ int main(int argc, char *argv[])
       }
    }
 
+   MsgWaitQueue::shutdown();
    return iExitCode;
 }
index a09576f..8bf4ffe 100644 (file)
@@ -335,5 +335,6 @@ int main(int argc, char *argv[])
       }
    }
 
+   MsgWaitQueue::shutdown();
    return iExitCode;
 }
index 5ae47b4..7b9ddd5 100644 (file)
@@ -704,5 +704,6 @@ int main(int argc, char *argv[])
       }
    }
 
+   MsgWaitQueue::shutdown();
    return iExitCode;
 }
index e72ad4c..30efaf3 100644 (file)
@@ -404,5 +404,6 @@ int main(int argc, char *argv[])
       }
    }
 
+   MsgWaitQueue::shutdown();
    return iExitCode;
 }