update thread in client session removed
author Victor Kirhenshtein <victor@netxms.org>
Thu, 18 May 2017 16:08:24 +0000 (19:08 +0300)
committer Victor Kirhenshtein <victor@netxms.org>
Thu, 18 May 2017 16:08:35 +0000 (19:08 +0300)
src/server/core/session.cpp
src/server/include/nms_core.h

index fc2f5eb..4564bf4 100644 (file)
@@ -212,29 +212,18 @@ THREAD_RESULT THREAD_CALL ClientSession::processingThreadStarter(void *pArg)
 }
 
 /**
- * Information update processing thread starter
- */
-THREAD_RESULT THREAD_CALL ClientSession::updateThreadStarter(void *pArg)
-{
-   ((ClientSession *)pArg)->updateThread();
-   return THREAD_OK;
-}
-
-/**
  * Client session class constructor
  */
 ClientSession::ClientSession(SOCKET hSocket, struct sockaddr *addr)
 {
-   m_pSendQueue = new Queue;
-   m_pMessageQueue = new Queue;
-   m_pUpdateQueue = new Queue;
+   m_sendQueue = new Queue;
+   m_requestQueue = new Queue;
    m_hSocket = hSocket;
    m_id = -1;
    m_state = SESSION_STATE_INIT;
    m_pCtx = NULL;
    m_hWriteThread = INVALID_THREAD_HANDLE;
    m_hProcessingThread = INVALID_THREAD_HANDLE;
-   m_hUpdateThread = INVALID_THREAD_HANDLE;
        m_mutexSocketWrite = MutexCreate();
    m_mutexSendObjects = MutexCreate();
    m_mutexSendAlarms = MutexCreate();
@@ -283,9 +272,8 @@ ClientSession::~ClientSession()
 {
    if (m_hSocket != -1)
       closesocket(m_hSocket);
-   delete m_pSendQueue;
-   delete m_pMessageQueue;
-   delete m_pUpdateQueue;
+   delete m_sendQueue;
+   delete m_requestQueue;
        free(m_clientAddr);
        MutexDestroy(m_mutexSocketWrite);
    MutexDestroy(m_mutexSendObjects);
@@ -332,7 +320,6 @@ void ClientSession::run()
 {
    m_hWriteThread = ThreadCreateEx(writeThreadStarter, 0, this);
    m_hProcessingThread = ThreadCreateEx(processingThreadStarter, 0, this);
-   m_hUpdateThread = ThreadCreateEx(updateThreadStarter, 0, this);
    ThreadCreate(readThreadStarter, 0, this);
 }
 
@@ -558,7 +545,7 @@ void ClientSession::readThread()
                        }
                        else
          {
-            m_pMessageQueue->put(msg);
+            m_requestQueue->put(msg);
          }
       }
    }
@@ -566,20 +553,16 @@ void ClientSession::readThread()
    // Mark as terminated (sendMessage calls will not work after that point)
    m_dwFlags |= CSF_TERMINATED;
 
-       // Finish update thread first
-   m_pUpdateQueue->put(INVALID_POINTER_VALUE);
-   ThreadJoin(m_hUpdateThread);
-
    // Notify other threads to exit
    NXCP_MESSAGE *rawMsg;
-   while((rawMsg = (NXCP_MESSAGE *)m_pSendQueue->get()) != NULL)
+   while((rawMsg = (NXCP_MESSAGE *)m_sendQueue->get()) != NULL)
       free(rawMsg);
-   m_pSendQueue->put(INVALID_POINTER_VALUE);
+   m_sendQueue->put(INVALID_POINTER_VALUE);
 
    NXCPMessage *msg;
-       while((msg = (NXCPMessage *)m_pMessageQueue->get()) != NULL)
+       while((msg = (NXCPMessage *)m_requestQueue->get()) != NULL)
                delete msg;
-   m_pMessageQueue->put(INVALID_POINTER_VALUE);
+   m_requestQueue->put(INVALID_POINTER_VALUE);
 
    // Wait for other threads to finish
    ThreadJoin(m_hWriteThread);
@@ -623,7 +606,7 @@ void ClientSession::writeThread()
 {
    while(true)
    {
-      NXCP_MESSAGE *rawMsg = (NXCP_MESSAGE *)m_pSendQueue->getOrBlock();
+      NXCP_MESSAGE *rawMsg = (NXCP_MESSAGE *)m_sendQueue->getOrBlock();
       if (rawMsg == INVALID_POINTER_VALUE)    // Session termination indicator
          break;
 
@@ -633,49 +616,6 @@ void ClientSession::writeThread()
 }
 
 /**
- * Update processing thread
- */
-void ClientSession::updateThread()
-{
-   UPDATE_INFO *pUpdate;
-   NXCPMessage msg;
-
-   while(true)
-   {
-      pUpdate = (UPDATE_INFO *)m_pUpdateQueue->getOrBlock();
-      if (pUpdate == INVALID_POINTER_VALUE)    // Session termination indicator
-         break;
-
-      switch(pUpdate->dwCategory)
-      {
-         case INFO_CAT_OBJECT_CHANGE:
-            MutexLock(m_mutexSendObjects);
-            msg.setCode(CMD_OBJECT_UPDATE);
-            if (!((NetObj *)pUpdate->pData)->isDeleted())
-            {
-               ((NetObj *)pUpdate->pData)->fillMessage(&msg);
-               if (m_dwFlags & CSF_SYNC_OBJECT_COMMENTS)
-                  ((NetObj *)pUpdate->pData)->commentsToMessage(&msg);
-            }
-            else
-            {
-               msg.setField(VID_OBJECT_ID, ((NetObj *)pUpdate->pData)->getId());
-               msg.setField(VID_IS_DELETED, (WORD)1);
-            }
-            sendMessage(&msg);
-            MutexUnlock(m_mutexSendObjects);
-            msg.deleteAllFields();
-            ((NetObj *)pUpdate->pData)->decRefCount();
-            break;
-         default:
-            break;
-      }
-
-      free(pUpdate);
-   }
-}
-
-/**
  * Message processing thread
  */
 void ClientSession::processingThread()
@@ -687,7 +627,7 @@ void ClientSession::processingThread()
 
    while(true)
    {
-      pMsg = (NXCPMessage *)m_pMessageQueue->getOrBlock();
+      pMsg = (NXCPMessage *)m_requestQueue->getOrBlock();
       if (pMsg == INVALID_POINTER_VALUE)    // Session termination indicator
          break;
 
@@ -2709,21 +2649,39 @@ void ClientSession::onNewEvent(Event *pEvent)
 }
 
 /**
+ * Send object update message to the client (executed in thread pool); releases the object reference taken by onObjectChange()
+ */
+void ClientSession::sendObjectUpdate(NetObj *object)
+{
+   NXCPMessage msg(CMD_OBJECT_UPDATE, 0);
+   if (!object->isDeleted())
+   {
+      object->fillMessage(&msg);
+      if (m_dwFlags & CSF_SYNC_OBJECT_COMMENTS)   // comments are added only when this session flag is set
+         object->commentsToMessage(&msg);
+   }
+   else
+   {
+      msg.setField(VID_OBJECT_ID, object->getId());   // deleted object: only its ID and the deletion flag are sent
+      msg.setField(VID_IS_DELETED, (UINT16)1);
+   }
+   MutexLock(m_mutexSendObjects);   // lock is held only around the send, not while the message is being built
+   sendMessage(&msg);
+   MutexUnlock(m_mutexSendObjects);
+   object->decRefCount();   // balances incRefCount() done in onObjectChange() before the task was scheduled
+}
+
+/**
  * Handler for object changes
  */
 void ClientSession::onObjectChange(NetObj *object)
 {
-   UPDATE_INFO *pUpdate;
-
-   if (isAuthenticated() && isSubscribedTo(NXC_CHANNEL_OBJECTS))
-      if (object->isDeleted() || object->checkAccessRights(m_dwUserId, OBJECT_ACCESS_READ))
-      {
-         pUpdate = (UPDATE_INFO *)malloc(sizeof(UPDATE_INFO));
-         pUpdate->dwCategory = INFO_CAT_OBJECT_CHANGE;
-         pUpdate->pData = object;
-         object->incRefCount();
-         m_pUpdateQueue->put(pUpdate);
-      }
+   if (isAuthenticated() && isSubscribedTo(NXC_CHANNEL_OBJECTS) &&
+      (object->isDeleted() || object->checkAccessRights(m_dwUserId, OBJECT_ACCESS_READ)))   // deleted objects skip the access check; others need read access for this user
+   {
+      object->incRefCount();   // keep the object referenced until sendObjectUpdate() runs; released there
+      ThreadPoolExecute(g_mainThreadPool, this, &ClientSession::sendObjectUpdate, object);   // NOTE(review): raw "this" goes to the pool; nothing visible here keeps the session alive until the task runs - confirm
+   }
 }
 
 /**
index 8892438..88a50e2 100644 (file)
@@ -206,11 +206,6 @@ typedef void * HSNMPSESSION;
 #define SESSION_STATE_PROCESSING 2
 
 /**
- * Information categories for UPDATE_INFO structure
- */
-#define INFO_CAT_OBJECT_CHANGE   2
-
-/**
  * Certificate types
  */
 enum CertificateType
@@ -418,9 +413,8 @@ class NXCORE_EXPORTABLE ClientSession
 {
 private:
    SOCKET m_hSocket;
-   Queue *m_pSendQueue;
-   Queue *m_pMessageQueue;
-   Queue *m_pUpdateQueue;
+   Queue *m_sendQueue;
+   Queue *m_requestQueue;
    int m_id;
    int m_state;
    WORD m_wCurrentCmd;
@@ -432,7 +426,6 @@ private:
        BYTE m_challenge[CLIENT_CHALLENGE_SIZE];
    THREAD m_hWriteThread;
    THREAD m_hProcessingThread;
-   THREAD m_hUpdateThread;
        MUTEX m_mutexSocketWrite;
    MUTEX m_mutexSendObjects;
    MUTEX m_mutexSendAlarms;
@@ -467,7 +460,6 @@ private:
    static THREAD_RESULT THREAD_CALL readThreadStarter(void *);
    static THREAD_RESULT THREAD_CALL writeThreadStarter(void *);
    static THREAD_RESULT THREAD_CALL processingThreadStarter(void *);
-   static THREAD_RESULT THREAD_CALL updateThreadStarter(void *);
    static void pollerThreadStarter(void *);
 
    DECLARE_THREAD_STARTER(cancelFileMonitoring)
@@ -512,7 +504,6 @@ private:
    void readThread();
    void writeThread();
    void processingThread();
-   void updateThread();
    void pollerThread(Node *pNode, int iPollType, UINT32 dwRqId);
 
    void debugPrintf(int level, const TCHAR *format, ...);
@@ -755,6 +746,7 @@ private:
 
    void alarmUpdateWorker(Alarm *alarm);
    void sendActionDBUpdateMessage(NXCP_MESSAGE *msg);
+   void sendObjectUpdate(NetObj *object);
 
 public:
    ClientSession(SOCKET hSocket, struct sockaddr *addr);
@@ -765,7 +757,7 @@ public:
 
    void run();
 
-   void postMessage(NXCPMessage *pMsg) { if (!isTerminated()) m_pSendQueue->put(pMsg->createMessage((m_dwFlags & CSF_COMPRESSION_ENABLED) != 0)); }
+   void postMessage(NXCPMessage *pMsg) { if (!isTerminated()) m_sendQueue->put(pMsg->createMessage((m_dwFlags & CSF_COMPRESSION_ENABLED) != 0)); }
    bool sendMessage(NXCPMessage *pMsg);
    void sendRawMessage(NXCP_MESSAGE *pMsg);
    void sendPollerMsg(UINT32 dwRqId, const TCHAR *pszMsg);
@@ -814,7 +806,6 @@ public:
    void kill();
    void notify(UINT32 dwCode, UINT32 dwData = 0);
 
-       void queueUpdate(UPDATE_INFO *pUpdate) { m_pUpdateQueue->put(pUpdate); }
    void onNewEvent(Event *pEvent);
    void onSyslogMessage(NX_SYSLOG_RECORD *pRec);
    void onNewSNMPTrap(NXCPMessage *pMsg);